Use CreateWorkerSession and DeleteWorkerSession for all distributed sessions.

This change adds a phase to the session creation protocol: the master now contacts all workers to register a session handle and create a "WorkerSession" on each worker before it first registers or runs a graph on any worker. Subsequent requests to a worker ensure that the worker has the session handle registered before performing the request, and an AbortedError is raised if the worker has not (e.g. because it restarted after a failure).

As a result, more failure cases are covered by the high-level APIs (tf.estimator, Slim, etc.) that recreate the session on receiving an AbortedError. Previously, there was a possible race condition in which a PS task could restart between variable initialization and the first step, leading to a FailedPreconditionError ("Attempting to use uninitialized value") that would not be handled by the high-level APIs.

PiperOrigin-RevId: 193694958
This commit is contained in:
Derek Murray 2018-04-20 11:13:16 -07:00 committed by TensorFlower Gardener
parent b3f379e907
commit 49f3469d95

View File

@@ -89,6 +89,10 @@ class MasterSession::ReffedClientGraph : public core::RefCounted {
~ReffedClientGraph() override {
if (should_deregister_) {
DeregisterPartitions();
} else {
for (Part& part : partitions_) {
worker_cache_->ReleaseWorker(part.name, part.worker);
}
}
}
@@ -1174,14 +1178,8 @@ Status MasterSession::Create(GraphDef* graph_def,
TF_RETURN_IF_ERROR(GraphExecutionState::MakeForBaseGraph(
graph_def, execution_options, &execution_state_));
}
// TODO(b/36574172): Remove these conditions when ClusterSpec
// propagation is supported in all servers.
if (options.cluster_def != nullptr ||
session_opts_.config.isolate_session_state()) {
should_delete_worker_sessions_ = true;
return CreateWorkerSessions(options);
}
return Status::OK();
should_delete_worker_sessions_ = true;
return CreateWorkerSessions(options);
}
Status MasterSession::CreateWorkerSessions(