diff --git a/tensorflow/c/c_api_experimental.cc b/tensorflow/c/c_api_experimental.cc
index 831c6a0ad40..531dcd367de 100644
--- a/tensorflow/c/c_api_experimental.cc
+++ b/tensorflow/c/c_api_experimental.cc
@@ -525,12 +525,12 @@ tensorflow::Status EnableCollectiveOps(const tensorflow::ServerDef& server_def,
 
     LOG_AND_RETURN_IF_ERROR(context->StoreCollectiveOpsServer(
         std::move(new_server), grpc_server->worker_env()->device_mgr,
-        grpc_server->worker_env()->collective_executor_mgr));
+        grpc_server->worker_env()->collective_executor_mgr.get()));
   } else {
     LOG_AND_RETURN_IF_ERROR(grpc_server->UpdateServerDef(server_def));
     LOG_AND_RETURN_IF_ERROR(context->StoreCollectiveOpsServer(
         /*new_server=*/nullptr, grpc_server->worker_env()->device_mgr,
-        grpc_server->worker_env()->collective_executor_mgr));
+        grpc_server->worker_env()->collective_executor_mgr.get()));
   }
   return tensorflow::Status::OK();
 #undef LOG_AND_RETURN_IF_ERROR
diff --git a/tensorflow/c/eager/BUILD b/tensorflow/c/eager/BUILD
index a77e76644b8..f56f8ad0a4b 100644
--- a/tensorflow/c/eager/BUILD
+++ b/tensorflow/c/eager/BUILD
@@ -514,7 +514,6 @@ tf_cuda_cc_test(
     extra_copts = tfe_xla_copts(),
     tags = [
         "no_windows",
-        "noasan",  # leaks gRPC server instances
     ],
     deps = [
         ":c_api",
@@ -581,7 +580,6 @@ tf_cuda_cc_test(
     extra_copts = tfe_xla_copts(),
     tags = [
         "no_windows",
-        "noasan",  # leaks gRPC server instances
     ],
     deps = [
         ":c_api",
diff --git a/tensorflow/core/distributed_runtime/master_env.h b/tensorflow/core/distributed_runtime/master_env.h
index 837ccd1dd48..64b73dfac22 100644
--- a/tensorflow/core/distributed_runtime/master_env.h
+++ b/tensorflow/core/distributed_runtime/master_env.h
@@ -62,7 +62,7 @@ struct WorkerCacheFactoryOptions {
 struct MasterEnv {
   Env* env = nullptr;
 
-  // Object from which WorkerInterface instances can be obtained.
+  // Object from which WorkerInterface instances can be obtained. Not owned.
   WorkerCacheInterface* worker_cache = nullptr;
 
   // The operation definitions to use.  Must be filled before use.
@@ -93,7 +93,7 @@ struct MasterEnv {
       worker_cache_factory;
 
   // Generates per-step CollectiveExecutors and has access to utilities
-  // supporting collective operations.
+  // supporting collective operations. Not owned.
   CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr;
 };
 
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
index c0b4d0ef6ec..9fe6eef5610 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
@@ -267,9 +267,9 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) {
   CHECK_NE(nullptr, worker_cache);
 
   if (opts.collective_mgr_func) {
-    worker_env_.collective_executor_mgr =
-        opts.collective_mgr_func(config, &worker_env_, worker_cache);
-    if (!worker_env_.collective_executor_mgr) {
+    worker_env_.collective_executor_mgr.reset(
+        opts.collective_mgr_func(config, &worker_env_, worker_cache));
+    if (worker_env_.collective_executor_mgr == nullptr) {
       return errors::Internal(
           "collective_mgr_func did not return CollectiveExecutorMgr");
     }
@@ -281,9 +281,9 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) {
         new CollectiveParamResolverDistributed(config, worker_env_.device_mgr,
                                                dev_resolver.get(), worker_cache,
                                                default_worker_name));
-    worker_env_.collective_executor_mgr = new RpcCollectiveExecutorMgr(
+    worker_env_.collective_executor_mgr.reset(new RpcCollectiveExecutorMgr(
         config, worker_env_.device_mgr, std::move(dev_resolver),
-        std::move(param_resolver), worker_cache, default_worker_name);
+        std::move(param_resolver), worker_cache, default_worker_name));
   }
 
   // Set up worker environment.
@@ -299,7 +299,8 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) {
   // Finish setting up master environment.
   master_env_.ops = OpRegistry::Global();
   master_env_.worker_cache = worker_cache;
-  master_env_.collective_executor_mgr = worker_env_.collective_executor_mgr;
+  master_env_.collective_executor_mgr =
+      worker_env_.collective_executor_mgr.get();
   StatsPublisherFactory stats_factory = opts.stats_factory;
   master_env_.master_session_factory =
       [config, stats_factory](
@@ -433,6 +434,8 @@ Status GrpcServer::UpdateServerDef(const ServerDef& server_def) {
     return errors::InvalidArgument(
         "Failed to build worker cache with the provided server def.");
   }
+  // Transfer ownership of worker_cache to worker_env_.session_mgr.
+  worker_env_.session_mgr->ResetDefaultWorkerCache(worker_cache);
 
   string default_worker_name;
   string unused;
@@ -447,13 +450,14 @@ Status GrpcServer::UpdateServerDef(const ServerDef& server_def) {
       new CollectiveParamResolverDistributed(
           server_def_.default_session_config(), worker_env_.device_mgr,
           dev_resolver.get(), worker_cache, default_worker_name));
-  worker_env_.collective_executor_mgr = new RpcCollectiveExecutorMgr(
+  worker_env_.collective_executor_mgr.reset(new RpcCollectiveExecutorMgr(
       server_def_.default_session_config(), worker_env_.device_mgr,
       std::move(dev_resolver), std::move(param_resolver), worker_cache,
-      default_worker_name);
+      default_worker_name));
 
   master_env_.worker_cache = worker_cache;
-  master_env_.collective_executor_mgr = worker_env_.collective_executor_mgr;
+  master_env_.collective_executor_mgr =
+      worker_env_.collective_executor_mgr.get();
   return Status::OK();
 }
 
diff --git a/tensorflow/core/distributed_runtime/session_mgr.cc b/tensorflow/core/distributed_runtime/session_mgr.cc
index 1d9a22a5817..37f47848f75 100644
--- a/tensorflow/core/distributed_runtime/session_mgr.cc
+++ b/tensorflow/core/distributed_runtime/session_mgr.cc
@@ -144,6 +144,10 @@ Status SessionMgr::CreateSession(
   return Status::OK();
 }
 
+void SessionMgr::ResetDefaultWorkerCache(WorkerCacheInterface* worker_cache) {
+  default_worker_cache_.reset(worker_cache);
+}
+
 Status SessionMgr::UpdateSession(
     const string& session, const ServerDef& server_def,
     const protobuf::RepeatedPtrField<DeviceAttributes>&
diff --git a/tensorflow/core/distributed_runtime/session_mgr.h b/tensorflow/core/distributed_runtime/session_mgr.h
index 8c438dbd83e..a9467708870 100644
--- a/tensorflow/core/distributed_runtime/session_mgr.h
+++ b/tensorflow/core/distributed_runtime/session_mgr.h
@@ -53,6 +53,8 @@ class SessionMgr {
       const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes,
       bool isolate_session_state);
 
+  void ResetDefaultWorkerCache(WorkerCacheInterface* worker_cache);
+
   // Updates state (worker cache, devices) of worker session identified by
   // session name (`session`) based on a new server_def and set of devices.
   Status UpdateSession(const string& session, const ServerDef& server_def,
diff --git a/tensorflow/core/distributed_runtime/worker_env.h b/tensorflow/core/distributed_runtime/worker_env.h
index ecc3313d0ce..b308c5e7a18 100644
--- a/tensorflow/core/distributed_runtime/worker_env.h
+++ b/tensorflow/core/distributed_runtime/worker_env.h
@@ -60,7 +60,7 @@ struct WorkerEnv {
 
   // Generates per-step CollectiveExecutors and has access to utilities
   // supporting collective operations.
-  CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr;
+  std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr;
 
   // A pool of threads for scheduling compute work.
   thread::ThreadPool* compute_pool = nullptr;