From 462a79b7fe98ad71dccbcf691a06a4a7f48ee382 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower" <gardener@tensorflow.org>
Date: Tue, 30 Oct 2018 10:24:22 -0700
Subject: [PATCH] Change RecvBufRespExtra.tensor_content to a repeated string
 and fill it with many small strings instead of one large one, when using
 gRPC.

Typing tensor_content as a Cord instead of a single string leads to roughly
a 20% speedup in a 2-worker (8 V100 GPUs each) benchmark training of
resnet50 using collective all-reduce for gradient reduction and gRPC for all
inter-worker transport. It is hypothesized that without the Cord type,
gRPC stalls incoming RecvBuf RPCs as it repeatedly reallocates and copies
the string; using a Cord to receive the value gives much better flow
control.

Unfortunately, proto3 does not yet support [ctype=CORD], so we can't use
that simple and effective optimization. Instead, this CL changes
tensor_content to a sequence of strings and sets a max single-string size
of 4KB, the likely memory page size. (This default can be changed via
ConfigProto.experimental.recv_buf_max_chunk.) It achieves roughly a 12%
speedup on the benchmark test. The speedups are highly dependent on
topology and network weather, since the major effect is believed to be on
flow control.
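For context, a usage sketch, not part of this change: assuming the standard
proto3 C++ setters generated for the new field (set_recv_buf_max_chunk on
ConfigProto.Experimental), a job could cap the chunk size as below; the 8KB
value is purely illustrative.

    tensorflow::ConfigProto config;
    // Positive values cap each tensor_content chunk; 8KB here is illustrative.
    config.mutable_experimental()->set_recv_buf_max_chunk(8192);
    // 0 keeps the 4KB default; any negative value sends a single chunk.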

PiperOrigin-RevId: 219322231
---
 .../collective_rma_distributed.cc             | 20 +++++--
 .../collective_rma_distributed_test.cc        |  2 +-
 .../rpc/grpc_server_lib.cc                    |  4 +-
 .../rpc/grpc_worker_service.cc                | 55 ++++++++++++++-----
 .../rpc/grpc_worker_service.h                 |  7 ++-
 tensorflow/core/protobuf/config.proto         |  5 ++
 .../core/protobuf/transport_options.proto     |  2 +-
 ...nsorflow.-config-proto.-experimental.pbtxt |  6 ++
 .../golden/v1/tensorflow.-config-proto.pbtxt  |  6 ++
 ...nsorflow.-config-proto.-experimental.pbtxt |  6 ++
 .../golden/v2/tensorflow.-config-proto.pbtxt  |  6 ++
 11 files changed, 94 insertions(+), 25 deletions(-)

diff --git a/tensorflow/core/distributed_runtime/collective_rma_distributed.cc b/tensorflow/core/distributed_runtime/collective_rma_distributed.cc
index 805e023b0f3..9087703cb55 100644
--- a/tensorflow/core/distributed_runtime/collective_rma_distributed.cc
+++ b/tensorflow/core/distributed_runtime/collective_rma_distributed.cc
@@ -61,6 +61,15 @@ class RecvBufCall : public CancellableCall {
   RecvBufResponse resp_;
 };
 
+void PopulateTensorFromExtra(const RecvBufRespExtra& extra,
+                             Tensor* cpu_tensor) {
+  char* head = reinterpret_cast<char*>(DMAHelper::base(cpu_tensor));
+  for (const auto& tensor_content_chunk : extra.tensor_content()) {
+    memcpy(head, tensor_content_chunk.data(),
+           tensor_content_chunk.size());
+    head += tensor_content_chunk.size();
+  }
+}
 }  // namespace
 
 void CollectiveRemoteAccessDistributed::RecvFromPeer(
@@ -95,7 +104,10 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
       // them into the destination tensor here.
       RecvBufRespExtra extra;
       state->call->resp_.transport_options().UnpackTo(&extra);
-      int64 num_bytes = extra.tensor_content().size();
+      int64 num_bytes = 0;
+      for (const auto& chunk : extra.tensor_content()) {
+        num_bytes += chunk.size();
+      }
       if (num_bytes != to_tensor->TotalBytes()) {
         done(errors::Internal("RecvBufResponse returned ", num_bytes,
                               " bytes where to_tensor expected ",
@@ -118,8 +130,7 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
         cpu_attr.set_gpu_compatible(true);
         Tensor* cpu_tensor = new Tensor(cpu_dev->GetAllocator(cpu_attr),
                                         to_tensor->dtype(), to_tensor->shape());
-        memcpy(DMAHelper::base(cpu_tensor), extra.tensor_content().data(),
-               num_bytes);
+        PopulateTensorFromExtra(extra, cpu_tensor);
         // Then copy it to the GPU.
         CopyTensor::ViaDMA("",  // edge name (non-existent)
                            nullptr /*send_dev_ctx*/, to_device_ctx, cpu_dev,
@@ -135,8 +146,7 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
         return;
       } else {
         // CPU device
-        memcpy(DMAHelper::base(to_tensor), extra.tensor_content().data(),
-               num_bytes);
+        PopulateTensorFromExtra(extra, to_tensor);
       }
     }
     if (!s.ok() && errors::IsFailedPrecondition(s)) {
diff --git a/tensorflow/core/distributed_runtime/collective_rma_distributed_test.cc b/tensorflow/core/distributed_runtime/collective_rma_distributed_test.cc
index bfd312410cb..33e1c8f2c33 100644
--- a/tensorflow/core/distributed_runtime/collective_rma_distributed_test.cc
+++ b/tensorflow/core/distributed_runtime/collective_rma_distributed_test.cc
@@ -104,7 +104,7 @@ class FakeWorker : public TestWorkerInterface {
       // bytes in the response.
       RecvBufRespExtra extra;
       int64 num_bytes = h->prod_value->TotalBytes();
-      extra.set_tensor_content(string(
+      extra.add_tensor_content(string(
           reinterpret_cast<const char*>(DMAHelper::base(h->prod_value)),
           num_bytes));
       response->mutable_transport_options()->PackFrom(extra);
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
index c4f2247145c..63d438c6155 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
@@ -194,8 +194,8 @@ Status GrpcServer::Init(
   MaybeMutateBuilder(&builder);
   master_impl_ = CreateMaster(&master_env_);
   master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
-  worker_impl_ =
-      worker_func ? worker_func(&worker_env_) : NewGrpcWorker(&worker_env_);
+  worker_impl_ = worker_func ? worker_func(&worker_env_)
+                             : NewGrpcWorker(&worker_env_, config);
   worker_service_ =
       NewGrpcWorkerService(worker_impl_.get(), &builder).release();
   eager_service_ = new eager::GrpcEagerServiceImpl(&worker_env_, &builder);
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
index 1b6d796bd43..de80992095d 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc
@@ -418,8 +418,13 @@ class GrpcWorkerService : public AsyncServiceInterface {
 
 }  // namespace
 
-GrpcWorker::GrpcWorker(WorkerEnv* worker_env)
-    : Worker(worker_env), recent_request_ids_(100000) {}
+GrpcWorker::GrpcWorker(WorkerEnv* worker_env, const ConfigProto& config)
+    : Worker(worker_env),
+      recent_request_ids_(100000),
+      recv_buf_max_chunk_(
+          config.experimental().recv_buf_max_chunk() > 0
+              ? config.experimental().recv_buf_max_chunk()
+              : (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {}
 
 // GrpcRecvTensorAsync: unlike the other Worker methods, which use protocol
 // buffers for a response object, to avoid extra protocol buffer serialization
@@ -505,6 +510,33 @@ void GrpcWorker::GrpcRecvTensorAsync(CallOptions* opts,
       });
 }
 
+namespace {
+// If RecvBufRespExtra.tensor_content is a single large string, then gRPC
+// can stall on the recv side when the string buffer needs to be enlarged,
+// since the size is not sent in advance.  Changing this field to a sequence
+// of small strings costs some extra time on the send side, since we do
+// some otherwise unnecessary copies, but it improves runtime overall by
+// improving flow control.  Best performance is likely achieved with a
+// max_chunk_bytes equal to the memory page size.
+//
+// TODO(tucker): When proto3 supports [ctype=CORD] then change
+// RecvBufRespExtra.tensor_content to a cord instead of a repeated string,
+// and remove this function.
+void SetTensorInRecvBufResp(int64 max_chunk_bytes, const Tensor* tensor,
+                            int64 num_bytes, RecvBufResponse* response) {
+  RecvBufRespExtra extra;
+  const char* head = reinterpret_cast<const char*>(DMAHelper::base(tensor));
+  while (num_bytes > 0) {
+    int64 bytes =
+        max_chunk_bytes > 0 ? std::min(num_bytes, max_chunk_bytes) : num_bytes;
+    extra.add_tensor_content(std::string(head, bytes));
+    head += bytes;
+    num_bytes -= bytes;
+  }
+  response->mutable_transport_options()->PackFrom(extra);
+}
+}  // namespace
+
 void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
                               RecvBufResponse* response, StatusCallback done) {
   // This is a generic, low performance implementation appropriate for grpc.
@@ -551,11 +583,8 @@ void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
             [this, num_bytes, response, done, hook,
              cpu_tensor](const Status& s) {
               if (s.ok()) {
-                RecvBufRespExtra extra;
-                extra.set_tensor_content(reinterpret_cast<const char*>(
-                                             DMAHelper::base(cpu_tensor)),
-                                         num_bytes);
-                response->mutable_transport_options()->PackFrom(extra);
+                SetTensorInRecvBufResp(recv_buf_max_chunk_, cpu_tensor,
+                                       num_bytes, response);
               }
               response->set_send_start_micros(env_->env->NowMicros());
               done(s);
@@ -566,11 +595,8 @@ void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
       }
     } else {
       // Tensor is on CPU.
-      RecvBufRespExtra extra;
-      extra.set_tensor_content(reinterpret_cast<const char*>(
-                                   DMAHelper::base(hook->prod_value)),
-                               num_bytes);
-      response->mutable_transport_options()->PackFrom(extra);
+      SetTensorInRecvBufResp(recv_buf_max_chunk_, hook->prod_value,
+                             num_bytes, response);
     }
   }
   response->set_send_start_micros(env_->env->NowMicros());
@@ -608,8 +634,9 @@ void GrpcWorker::LoggingAsync(const LoggingRequest* request,
 
 WorkerEnv* GrpcWorker::env() { return env_; }
 
-std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* env) {
-  return std::unique_ptr<GrpcWorker>(new GrpcWorker(env));
+std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* env,
+                                          const ConfigProto& config) {
+  return std::unique_ptr<GrpcWorker>(new GrpcWorker(env, config));
 }
 
 std::unique_ptr<AsyncServiceInterface> NewGrpcWorkerService(
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
index d9e48524dea..996617d385d 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h
@@ -27,12 +27,13 @@ class ServerBuilder;
 
 namespace tensorflow {
 class AsyncServiceInterface;
+class ConfigProto;
 struct WorkerEnv;
 struct WorkerSession;
 
 class GrpcWorker : public Worker {
  public:
-  GrpcWorker(WorkerEnv* env);
+  GrpcWorker(WorkerEnv* env, const ConfigProto& config);
 
   // Specialized version of RecvTensor for gRPC, which avoids a copy.
   virtual void GrpcRecvTensorAsync(CallOptions* opts,
@@ -50,9 +51,11 @@ class GrpcWorker : public Worker {
 
  private:
   RecentRequestIds recent_request_ids_;
+  const int32 recv_buf_max_chunk_;
 };
 
-std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* worker_env);
+std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* worker_env,
+                                          const ConfigProto& config);
 
 // Returns an implementation of WorkerService rpc service.
 std::unique_ptr<AsyncServiceInterface> NewGrpcWorkerService(
diff --git a/tensorflow/core/protobuf/config.proto b/tensorflow/core/protobuf/config.proto
index 104ab039cb7..1963a5b56e0 100644
--- a/tensorflow/core/protobuf/config.proto
+++ b/tensorflow/core/protobuf/config.proto
@@ -400,6 +400,11 @@ message ConfigProto {
     // Which executor to use, the default executor will be used
     // if it is an empty string or "DEFAULT"
     string executor_type = 3;
+
+    // Guidance to formatting of large RecvBuf fields for transfer.
+    // Any positive value sets the max chunk size.  0 defaults to 4096.
+    // Any negative value indicates no max, i.e. one chunk only.
+    int32 recv_buf_max_chunk = 4;
   };
 
   Experimental experimental = 16;
diff --git a/tensorflow/core/protobuf/transport_options.proto b/tensorflow/core/protobuf/transport_options.proto
index d7b1bddbbe3..1d32475e9b9 100644
--- a/tensorflow/core/protobuf/transport_options.proto
+++ b/tensorflow/core/protobuf/transport_options.proto
@@ -4,5 +4,5 @@ package tensorflow;
 
 // Extra data needed on a non-RDMA RecvBufResponse.
 message RecvBufRespExtra {
-  bytes tensor_content = 1;
+  repeated bytes tensor_content = 1;
 };
diff --git a/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.-experimental.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.-experimental.pbtxt
index 9f6dcd8fdb0..f7491649c22 100644
--- a/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.-experimental.pbtxt
+++ b/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.-experimental.pbtxt
@@ -14,6 +14,12 @@ tf_proto {
       label: LABEL_OPTIONAL
      type: TYPE_STRING
    }
+  field {
+    name: "recv_buf_max_chunk"
+    number: 4
+    label: LABEL_OPTIONAL
+    type: TYPE_INT32
+  }
   reserved_range {
     start: 2
     end: 3
diff --git a/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.pbtxt
index f3a515163df..53b532beab3 100644
--- a/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.pbtxt
+++ b/tensorflow/tools/api/golden/v1/tensorflow.-config-proto.pbtxt
@@ -137,6 +137,12 @@ tf_proto {
         label: LABEL_OPTIONAL
        type: TYPE_STRING
      }
+      field {
+        name: "recv_buf_max_chunk"
+        number: 4
+        label: LABEL_OPTIONAL
+        type: TYPE_INT32
+      }
       reserved_range {
         start: 2
         end: 3
diff --git a/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.-experimental.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.-experimental.pbtxt
index 9f6dcd8fdb0..f7491649c22 100644
--- a/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.-experimental.pbtxt
+++ b/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.-experimental.pbtxt
@@ -14,6 +14,12 @@ tf_proto {
     label: LABEL_OPTIONAL
     type: TYPE_STRING
   }
+  field {
+    name: "recv_buf_max_chunk"
+    number: 4
+    label: LABEL_OPTIONAL
+    type: TYPE_INT32
+  }
   reserved_range {
     start: 2
     end: 3
diff --git a/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.pbtxt
index f3a515163df..53b532beab3 100644
--- a/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.pbtxt
+++ b/tensorflow/tools/api/golden/v2/tensorflow.-config-proto.pbtxt
@@ -137,6 +137,12 @@ tf_proto {
       label: LABEL_OPTIONAL
       type: TYPE_STRING
     }
+      field {
+        name: "recv_buf_max_chunk"
+        number: 4
+        label: LABEL_OPTIONAL
+        type: TYPE_INT32
+      }
       reserved_range {
         start: 2
         end: 3
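
Reviewer note, illustrative only and not part of the patch: a minimal,
self-contained model of the send-side chunking and recv-side reassembly
introduced above. Plain std::string and int64_t stand in for the proto field
and TF's int64; the function names here are hypothetical.

    #include <algorithm>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    // Send side: split num_bytes starting at head into chunks of at most
    // max_chunk_bytes, mirroring SetTensorInRecvBufResp above
    // (max_chunk_bytes <= 0 means a single chunk).
    std::vector<std::string> ChunkBuffer(const char* head, int64_t num_bytes,
                                         int64_t max_chunk_bytes) {
      std::vector<std::string> chunks;
      while (num_bytes > 0) {
        const int64_t bytes = max_chunk_bytes > 0
                                  ? std::min(num_bytes, max_chunk_bytes)
                                  : num_bytes;
        chunks.emplace_back(head, bytes);
        head += bytes;
        num_bytes -= bytes;
      }
      return chunks;
    }

    // Recv side: copy the chunks back into one contiguous buffer, mirroring
    // PopulateTensorFromExtra above.
    void ReassembleBuffer(const std::vector<std::string>& chunks, char* dst) {
      for (const std::string& chunk : chunks) {
        std::memcpy(dst, chunk.data(), chunk.size());
        dst += chunk.size();
      }
    }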