From d5cee81f135b37af2e92686a4b9627142cfa7583 Mon Sep 17 00:00:00 2001 From: Haoyu Zhang Date: Thu, 12 Dec 2019 11:56:29 -0800 Subject: [PATCH] Use a separate mutex for updating context state. Instead of reusing the lock for accessing the context map, we should use a separate mutex for accessing / updating the state of EagerContext. The two locks serve different purposes. This separation avoids deadlocks between executing long-running functions and various other context map accesses. PiperOrigin-RevId: 285239370 Change-Id: I497e857775101fa1f5773c693330dc0611c3426b --- .../eager/eager_service_impl.cc | 20 +++++++++---------- .../eager/eager_service_impl.h | 6 ++++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/tensorflow/core/distributed_runtime/eager/eager_service_impl.cc b/tensorflow/core/distributed_runtime/eager/eager_service_impl.cc index b1bf5bb63e4..7e4e8fed16c 100644 --- a/tensorflow/core/distributed_runtime/eager/eager_service_impl.cc +++ b/tensorflow/core/distributed_runtime/eager/eager_service_impl.cc @@ -277,14 +277,12 @@ Status EagerServiceImpl::UpdateContext(const UpdateContextRequest* request, eager::CreateClusterFLR(request->context_id(), ctx, worker_session.get()); { - // Hold `contexts_mu_` exclusively, wait for all pending nodes to finish - // (implicitly calling WaitForAllPendingNodes inside `ctx->ClearCaches`), - // and update the context state. - // This lock prevents other threads from handling enqueue requests at the - // same time. Each enqueue request will be processed either with context - // state before or after the update, but the exact ordering needs to be - // determined by the client if desired. - mutex_lock lock(contexts_mu_); + // Hold `context_update_mu_` exclusively to update the context state. This lock + // prevents other threads from processing an enqueued request at the same + // time.
Each enqueue request will be processed either with context state + // before or after the update, but the exact ordering needs to be enforced + // by the client if desired. + mutex_lock l(context_update_mu_); ctx->ClearCaches(); Status s = ctx->UpdateRemoteWorker( device_mgr, std::move(remote_eager_workers), @@ -420,9 +418,6 @@ Status EagerServiceImpl::Enqueue(const EnqueueRequest* request, TF_RETURN_IF_ERROR(GetServerContext(request->context_id(), &context)); core::ScopedUnref context_unref(context); - // Acquire shared lock to prevent handling enqueue requests while updating - // context (see UpdateContext). - tf_shared_lock lock(contexts_mu_); EagerExecutor& executor = stream_id == kInvalidStreamId ? context->Context()->Executor() @@ -431,6 +426,9 @@ Status EagerServiceImpl::Enqueue(const EnqueueRequest* request, Status s; for (const auto& item : request->queue()) { auto* queue_response = response->add_queue_response(); + // Acquire shared lock to prevent handling enqueue requests while updating + // context (see UpdateContext). + tf_shared_lock l(context_update_mu_); if (item.has_operation()) { s = ExecuteOp(item.operation(), context->Context(), &executor, queue_response); diff --git a/tensorflow/core/distributed_runtime/eager/eager_service_impl.h b/tensorflow/core/distributed_runtime/eager/eager_service_impl.h index 3223f7cdca6..16ca9bbcc3d 100644 --- a/tensorflow/core/distributed_runtime/eager/eager_service_impl.h +++ b/tensorflow/core/distributed_runtime/eager/eager_service_impl.h @@ -213,6 +213,12 @@ class EagerServiceImpl { mutex contexts_mu_; std::unordered_map contexts_ GUARDED_BY(contexts_mu_); + // Mutex to guard access to EagerContext in `contexts_`. Different from + // `contexts_mu_` which guards adding / removing item from the map, this mutex + // is supposed to be used to avoid concurrent reading/updating the state of an + // EagerContext inside the map. 
+ mutex context_update_mu_; + std::unique_ptr<Thread> gc_thread_; mutex gc_thread_shutdown_mu_; condition_variable gc_thread_cv_;