From 5c4b8790ca2144c9cde4c02cac46ffe12c76e9ef Mon Sep 17 00:00:00 2001 From: Haoyu Zhang Date: Fri, 17 Jul 2020 10:10:53 -0700 Subject: [PATCH] Fix two memory leaks and enable asan for C API remote tests. PiperOrigin-RevId: 321801325 Change-Id: Id579f93e167c9665b4ca740eee160da801ca0694 --- tensorflow/c/c_api_experimental.cc | 4 ++-- tensorflow/c/eager/BUILD | 2 -- .../core/distributed_runtime/master_env.h | 4 ++-- .../rpc/grpc_server_lib.cc | 22 +++++++++++-------- .../core/distributed_runtime/session_mgr.cc | 4 ++++ .../core/distributed_runtime/session_mgr.h | 2 ++ .../core/distributed_runtime/worker_env.h | 2 +- 7 files changed, 24 insertions(+), 16 deletions(-) diff --git a/tensorflow/c/c_api_experimental.cc b/tensorflow/c/c_api_experimental.cc index 831c6a0ad40..531dcd367de 100644 --- a/tensorflow/c/c_api_experimental.cc +++ b/tensorflow/c/c_api_experimental.cc @@ -525,12 +525,12 @@ tensorflow::Status EnableCollectiveOps(const tensorflow::ServerDef& server_def, LOG_AND_RETURN_IF_ERROR(context->StoreCollectiveOpsServer( std::move(new_server), grpc_server->worker_env()->device_mgr, - grpc_server->worker_env()->collective_executor_mgr)); + grpc_server->worker_env()->collective_executor_mgr.get())); } else { LOG_AND_RETURN_IF_ERROR(grpc_server->UpdateServerDef(server_def)); LOG_AND_RETURN_IF_ERROR(context->StoreCollectiveOpsServer( /*new_server=*/nullptr, grpc_server->worker_env()->device_mgr, - grpc_server->worker_env()->collective_executor_mgr)); + grpc_server->worker_env()->collective_executor_mgr.get())); } return tensorflow::Status::OK(); #undef LOG_AND_RETURN_IF_ERROR diff --git a/tensorflow/c/eager/BUILD b/tensorflow/c/eager/BUILD index a77e76644b8..f56f8ad0a4b 100644 --- a/tensorflow/c/eager/BUILD +++ b/tensorflow/c/eager/BUILD @@ -514,7 +514,6 @@ tf_cuda_cc_test( extra_copts = tfe_xla_copts(), tags = [ "no_windows", - "noasan", # leaks gRPC server instances ], deps = [ ":c_api", @@ -581,7 +580,6 @@ tf_cuda_cc_test( extra_copts = tfe_xla_copts(), tags = [ 
"no_windows", - "noasan", # leaks gRPC server instances ], deps = [ ":c_api", diff --git a/tensorflow/core/distributed_runtime/master_env.h b/tensorflow/core/distributed_runtime/master_env.h index 837ccd1dd48..64b73dfac22 100644 --- a/tensorflow/core/distributed_runtime/master_env.h +++ b/tensorflow/core/distributed_runtime/master_env.h @@ -62,7 +62,7 @@ struct WorkerCacheFactoryOptions { struct MasterEnv { Env* env = nullptr; - // Object from which WorkerInterface instances can be obtained. + // Object from which WorkerInterface instances can be obtained. Not owned. WorkerCacheInterface* worker_cache = nullptr; // The operation definitions to use. Must be filled before use. @@ -93,7 +93,7 @@ struct MasterEnv { worker_cache_factory; // Generates per-step CollectiveExecutors and has access to utilities - // supporting collective operations. + // supporting collective operations. Not owned. CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr; }; diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc index c0b4d0ef6ec..9fe6eef5610 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc +++ b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc @@ -267,9 +267,9 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) { CHECK_NE(nullptr, worker_cache); if (opts.collective_mgr_func) { - worker_env_.collective_executor_mgr = - opts.collective_mgr_func(config, &worker_env_, worker_cache); - if (!worker_env_.collective_executor_mgr) { + worker_env_.collective_executor_mgr.reset( + opts.collective_mgr_func(config, &worker_env_, worker_cache)); + if (worker_env_.collective_executor_mgr == nullptr) { return errors::Internal( "collective_mgr_func did not return CollectiveExecutorMgr"); } @@ -281,9 +281,9 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) { new CollectiveParamResolverDistributed(config, worker_env_.device_mgr, dev_resolver.get(), 
worker_cache, default_worker_name)); - worker_env_.collective_executor_mgr = new RpcCollectiveExecutorMgr( + worker_env_.collective_executor_mgr.reset(new RpcCollectiveExecutorMgr( config, worker_env_.device_mgr, std::move(dev_resolver), - std::move(param_resolver), worker_cache, default_worker_name); + std::move(param_resolver), worker_cache, default_worker_name)); } // Set up worker environment. @@ -299,7 +299,8 @@ Status GrpcServer::Init(const GrpcServerOptions& opts) { // Finish setting up master environment. master_env_.ops = OpRegistry::Global(); master_env_.worker_cache = worker_cache; - master_env_.collective_executor_mgr = worker_env_.collective_executor_mgr; + master_env_.collective_executor_mgr = + worker_env_.collective_executor_mgr.get(); StatsPublisherFactory stats_factory = opts.stats_factory; master_env_.master_session_factory = [config, stats_factory]( @@ -433,6 +434,8 @@ Status GrpcServer::UpdateServerDef(const ServerDef& server_def) { return errors::InvalidArgument( "Failed to build worker cache with the provided server def."); } + // Transfer ownership of worker_cache to worker_env_.session_mgr. 
+ worker_env_.session_mgr->ResetDefaultWorkerCache(worker_cache); string default_worker_name; string unused; @@ -447,13 +450,14 @@ Status GrpcServer::UpdateServerDef(const ServerDef& server_def) { new CollectiveParamResolverDistributed( server_def_.default_session_config(), worker_env_.device_mgr, dev_resolver.get(), worker_cache, default_worker_name)); - worker_env_.collective_executor_mgr = new RpcCollectiveExecutorMgr( + worker_env_.collective_executor_mgr.reset(new RpcCollectiveExecutorMgr( server_def_.default_session_config(), worker_env_.device_mgr, std::move(dev_resolver), std::move(param_resolver), worker_cache, - default_worker_name); + default_worker_name)); master_env_.worker_cache = worker_cache; - master_env_.collective_executor_mgr = worker_env_.collective_executor_mgr; + master_env_.collective_executor_mgr = + worker_env_.collective_executor_mgr.get(); return Status::OK(); } diff --git a/tensorflow/core/distributed_runtime/session_mgr.cc b/tensorflow/core/distributed_runtime/session_mgr.cc index 1d9a22a5817..37f47848f75 100644 --- a/tensorflow/core/distributed_runtime/session_mgr.cc +++ b/tensorflow/core/distributed_runtime/session_mgr.cc @@ -144,6 +144,10 @@ Status SessionMgr::CreateSession( return Status::OK(); } +void SessionMgr::ResetDefaultWorkerCache(WorkerCacheInterface* worker_cache) { + default_worker_cache_.reset(worker_cache); +} + Status SessionMgr::UpdateSession( const string& session, const ServerDef& server_def, const protobuf::RepeatedPtrField<DeviceAttributes>& diff --git a/tensorflow/core/distributed_runtime/session_mgr.h b/tensorflow/core/distributed_runtime/session_mgr.h index 8c438dbd83e..a9467708870 100644 --- a/tensorflow/core/distributed_runtime/session_mgr.h +++ b/tensorflow/core/distributed_runtime/session_mgr.h @@ -53,6 +53,8 @@ class SessionMgr { const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes, bool isolate_session_state); + void ResetDefaultWorkerCache(WorkerCacheInterface* worker_cache); + // Updates state (worker cache, devices) of
worker session identified by // session name (`session`) based on a new server_def and set of devices. Status UpdateSession(const string& session, const ServerDef& server_def, diff --git a/tensorflow/core/distributed_runtime/worker_env.h b/tensorflow/core/distributed_runtime/worker_env.h index ecc3313d0ce..b308c5e7a18 100644 --- a/tensorflow/core/distributed_runtime/worker_env.h +++ b/tensorflow/core/distributed_runtime/worker_env.h @@ -60,7 +60,7 @@ struct WorkerEnv { // Generates per-step CollectiveExecutors and has access to utilities // supporting collective operations. - CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr; + std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr; // A pool of threads for scheduling compute work. thread::ThreadPool* compute_pool = nullptr;