diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
index 3855d4af895..cfa3a1b3338 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
@@ -370,11 +370,6 @@ class GrpcWorkerService : public AsyncServiceInterface {
                     GrpcWorkerServiceOptions options)
       : is_shutdown_(false) {
     builder->RegisterService(&worker_service_);
-    // TODO(jingdong): it would be cleaner to move this option to GrpcWorker
-    // since the cache is maintained by GrpcWorker now.
-    if (options.cache_rpc_response) {
-      worker->EnableResponseCache();
-    }
 
     for (int i = 0; i < options.num_serving_threads; i++) {
       threads_.emplace_back(
@@ -428,7 +423,11 @@ GrpcWorker::GrpcWorker(WorkerEnv* worker_env, const ConfigProto& config)
       recv_buf_max_chunk_(
           config.experimental().recv_buf_max_chunk() > 0
               ? config.experimental().recv_buf_max_chunk()
-              : (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {}
+              : (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {
+  if (config.rpc_options().cache_rpc_response()) {
+    EnableResponseCache();
+  }
+}
 
 void GrpcWorker::EnableResponseCache() {
   VLOG(1) << "Enabling gRPC tensor response cache.";
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
index da97f09f92e..e683540065c 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
@@ -75,14 +75,6 @@ struct GrpcWorkerServiceOptions {
   // default queue depth for a method.
   std::unordered_map<int, int> queue_depth;
   int num_serving_threads = 8;
-
-  // Setting cache_rpc_response to true will enable sender side caching of
-  // response for RecvTensorAsync and RecvBufAsync to allow receiver to retry
-  // requests . This is only necessary when the network fabric is experiencing a
-  // significant error rate. Without it we'll fail a step on an network error,
-  // while with it we'll be able to complete long steps (like complex
-  // initializations) in the face of some network errors during RecvTensor.
-  bool cache_rpc_response = false;
 };
 
 // Returns an implementation of WorkerService rpc service.
diff --git a/tensorflow/core/protobuf/config.proto b/tensorflow/core/protobuf/config.proto
index dd237d47d6e..e0283e07eac 100644
--- a/tensorflow/core/protobuf/config.proto
+++ b/tensorflow/core/protobuf/config.proto
@@ -327,6 +327,14 @@ message RPCOptions {
   // If compression_algorithm is set, the compression level to be used.
   // From 0 (no compression), up to 3.
   int32 compression_level = 3;
+
+  // Setting cache_rpc_response to true enables sender-side caching of
+  // responses to RecvTensorAsync and RecvBufAsync so that the receiver can
+  // retry requests. This is only necessary when the network fabric is
+  // experiencing a significant error rate. Without it, a step fails on any
+  // network error; with it, long steps (like complex initializations) can
+  // complete despite some network errors during RecvTensor.
+  bool cache_rpc_response = 4;
 }
 
 // Metadata about the session.
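
Usage note (commentary, not part of the diff): since cache_rpc_response now
lives on RPCOptions rather than GrpcWorkerServiceOptions, callers enable the
cache through the ConfigProto that GrpcWorker's constructor already receives.
A minimal C++ sketch under that assumption; the helper name
MakeConfigWithRpcCache is illustrative, not part of TensorFlow:

#include "tensorflow/core/protobuf/config.pb.h"

tensorflow::ConfigProto MakeConfigWithRpcCache() {
  tensorflow::ConfigProto config;
  // After this change the flag is read in GrpcWorker's constructor, which
  // calls EnableResponseCache() itself; GrpcWorkerService no longer needs to.
  config.mutable_rpc_options()->set_cache_rpc_response(true);
  return config;
}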