Use a separate mutex for updating context state.
Instead of reusing the lock that guards the context map, we should use a separate mutex for accessing and updating the state of the EagerContext. The two locks serve different purposes, and separating them avoids deadlocks between executing long-running functions and other accesses to the context map.

PiperOrigin-RevId: 285239370
Change-Id: I497e857775101fa1f5773c693330dc0611c3426b
commit d5cee81f13
parent 88dbc2d867
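The scheme this change moves to is, in essence, a reader–writer split: context updates take `context_update_mu_` exclusively, enqueue handling takes it shared, and `contexts_mu_` is left guarding only map membership. Below is a minimal standalone sketch of that split, using std::shared_mutex in place of TensorFlow's mutex / tf_shared_lock; the class, members, and method names are illustrative, not TensorFlow APIs.

#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

struct Context { std::string state; };

class ContextRegistry {
 public:
  std::shared_ptr<Context> Find(uint64_t id) {
    std::lock_guard<std::mutex> l(contexts_mu_);  // map membership only
    auto it = contexts_.find(id);
    return it == contexts_.end() ? nullptr : it->second;
  }

  // Writer path (cf. UpdateContext): exclusive on the state mutex, so no
  // reader observes a half-applied update. contexts_mu_ is NOT held across
  // the (possibly slow) update, so Find() never stalls behind it.
  void UpdateState(const std::shared_ptr<Context>& ctx, std::string s) {
    std::unique_lock<std::shared_mutex> l(context_update_mu_);
    ctx->state = std::move(s);
  }

  // Reader path (cf. Enqueue): shared, so concurrent readers don't serialize.
  std::string ReadState(const std::shared_ptr<Context>& ctx) {
    std::shared_lock<std::shared_mutex> l(context_update_mu_);
    return ctx->state;
  }

 private:
  std::mutex contexts_mu_;               // adding/removing map entries
  std::shared_mutex context_update_mu_;  // reading/updating context state
  std::unordered_map<uint64_t, std::shared_ptr<Context>> contexts_;
};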
@@ -277,14 +277,12 @@ Status EagerServiceImpl::UpdateContext(const UpdateContextRequest* request,
       eager::CreateClusterFLR(request->context_id(), ctx, worker_session.get());

   {
-    // Hold `contexts_mu_` exclusively, wait for all pending nodes to finish
-    // (implicitly calling WaitForAllPendingNodes inside `ctx->ClearCaches`),
-    // and update the context state.
-    // This lock prevents other threads from handling enqueue requests at the
-    // same time. Each enqueue request will be processed either with context
-    // state before or after the update, but the exact ordering needs to be
-    // determined by the client if desired.
-    mutex_lock lock(contexts_mu_);
+    // Hold `context_update_mu_` exclusively to update the context state.
+    // This lock prevents other threads from processing an enqueued request
+    // at the same time. Each enqueue request will be processed either with
+    // the context state before or after the update, but the exact ordering
+    // needs to be enforced by the client if desired.
+    mutex_lock l(context_update_mu_);
     ctx->ClearCaches();
     Status s = ctx->UpdateRemoteWorker(
         device_mgr, std::move(remote_eager_workers),
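Design note on the hunk above (this follows from reading the diff rather than from the commit message itself): `mutex_lock` takes `context_update_mu_` exclusively, so the whole update, including `ctx->ClearCaches()` with its implicit wait for all pending nodes, is atomic with respect to enqueue handlers holding the shared lock. Because `contexts_mu_` is no longer held across that potentially long wait, other RPCs can still look contexts up in the map in the meantime, which is presumably the deadlock between long-running functions and context map accesses that the commit message describes.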
@@ -420,9 +418,6 @@ Status EagerServiceImpl::Enqueue(const EnqueueRequest* request,
   TF_RETURN_IF_ERROR(GetServerContext(request->context_id(), &context));
   core::ScopedUnref context_unref(context);

-  // Acquire shared lock to prevent handling enqueue requests while updating
-  // context (see UpdateContext).
-  tf_shared_lock lock(contexts_mu_);
   EagerExecutor& executor =
       stream_id == kInvalidStreamId
           ? context->Context()->Executor()
@@ -431,6 +426,9 @@ Status EagerServiceImpl::Enqueue(const EnqueueRequest* request,
   Status s;
   for (const auto& item : request->queue()) {
     auto* queue_response = response->add_queue_response();
+    // Acquire shared lock to prevent handling enqueue requests while updating
+    // context (see UpdateContext).
+    tf_shared_lock l(context_update_mu_);
     if (item.has_operation()) {
       s = ExecuteOp(item.operation(), context->Context(), &executor,
                     queue_response);
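Note that the shared lock is taken inside the loop, once per queue item. Here is a sketch of why that matters, with illustrative names and std::shared_mutex standing in for TF's mutex: releasing the shared lock between items lets a pending exclusive UpdateContext acquire the lock mid-request instead of waiting for the entire queue to drain.

#include <mutex>
#include <shared_mutex>
#include <vector>

std::shared_mutex context_update_mu;  // stands in for context_update_mu_

void ProcessQueue(const std::vector<int>& items) {
  for (int item : items) {
    // Shared lock scoped to a single item: it is released at the end of each
    // iteration, so a writer blocked on the exclusive lock can slot in
    // between items rather than only after the whole request.
    std::shared_lock<std::shared_mutex> l(context_update_mu);
    // ... handle `item` under the shared lock ...
    (void)item;
  }
}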
@@ -213,6 +213,12 @@ class EagerServiceImpl {
   mutex contexts_mu_;
   std::unordered_map<uint64, ServerContext*> contexts_ GUARDED_BY(contexts_mu_);

+  // Mutex to guard access to the EagerContexts in `contexts_`. Unlike
+  // `contexts_mu_`, which guards adding items to and removing them from the
+  // map, this mutex prevents concurrent reads and updates of the state of an
+  // EagerContext inside the map.
+  mutex context_update_mu_;
+
   std::unique_ptr<Thread> gc_thread_;
   mutex gc_thread_shutdown_mu_;
   condition_variable gc_thread_cv_;
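For completeness: the sketch below shows what the GUARDED_BY annotation on `contexts_` buys. It uses Abseil's spelling of the Clang thread-safety attributes (my assumption is that TF's GUARDED_BY macro wraps the same attribute); the class and method names are illustrative. Compiling with clang -Wthread-safety flags any access to the annotated member made without holding the named mutex.

#include <cstdint>
#include <unordered_map>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Contexts {
 public:
  void Put(uint64_t id, int state) {
    absl::MutexLock l(&mu_);  // OK: mu_ is held while touching contexts_
    contexts_[id] = state;
  }

  // int Get(uint64_t id) { return contexts_[id]; }
  //   ^ would trigger a -Wthread-safety warning: mu_ not held.

 private:
  absl::Mutex mu_;
  std::unordered_map<uint64_t, int> contexts_ ABSL_GUARDED_BY(mu_);
};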