Change RecvBufRespExtra.tensor_content to a repeated string and fill it with many small strings instead of one large one, when using gRPC.

Typing tensor_content as a Cord instead of a single string leads to
roughly a 20% speedup in a 2-worker (8 v100 GPUs each) benchmark training of
resnet50 using collective all-reduce for gradient reduction and gRPC for
all inter-worker transport.  It is hypothesized that without the Cord
type gRPC is stalling incoming RecvBuf RPCs as it repeatedly reallocates
and copies the strings.  Using a Cord to receive the value leads to much
better flow control.

Unfortunately, proto3 does not yet support [ctype=CORD], so we can't
use that simple and effective optimization.  This CL changes
tensor_content to a sequence of strings and sets a max single-string
size of 4KB, the likely page size.  (This default can be changed via
ConfigProto.experimental.recv_buf_max_chunk.) It achieves roughly a
12% speedup on the benchmark test.  The speedups are highly dependent
on topology and network weather since the major effect is believed to
be on flow control.

PiperOrigin-RevId: 219322231
This commit is contained in:
A. Unique TensorFlower 2018-10-30 10:24:22 -07:00 committed by TensorFlower Gardener
parent b4b3c41094
commit 462a79b7fe
11 changed files with 94 additions and 25 deletions

View File

@ -61,6 +61,15 @@ class RecvBufCall : public CancellableCall {
RecvBufResponse resp_;
};
// Copies the chunked bytes carried in `extra.tensor_content()` into the
// flat buffer backing `cpu_tensor`, concatenating the chunks in order.
// The caller is responsible for ensuring the chunks' total size matches
// the tensor's byte size.
void PopulateTensorFromExtra(const RecvBufRespExtra& extra,
                             Tensor* cpu_tensor) {
  char* dst = reinterpret_cast<char*>(DMAHelper::base(cpu_tensor));
  for (const auto& chunk : extra.tensor_content()) {
    memcpy(dst, chunk.data(), chunk.size());
    dst += chunk.size();
  }
}
} // namespace
void CollectiveRemoteAccessDistributed::RecvFromPeer(
@ -95,7 +104,10 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
// them into the destination tensor here.
RecvBufRespExtra extra;
state->call->resp_.transport_options().UnpackTo(&extra);
int64 num_bytes = extra.tensor_content().size();
int64 num_bytes = 0;
for (const auto& chunk : extra.tensor_content()) {
num_bytes += chunk.size();
}
if (num_bytes != to_tensor->TotalBytes()) {
done(errors::Internal("RecvBufResponse returned ", num_bytes,
" bytes where to_tensor expected ",
@ -118,8 +130,7 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
cpu_attr.set_gpu_compatible(true);
Tensor* cpu_tensor = new Tensor(cpu_dev->GetAllocator(cpu_attr),
to_tensor->dtype(), to_tensor->shape());
memcpy(DMAHelper::base(cpu_tensor), extra.tensor_content().data(),
num_bytes);
PopulateTensorFromExtra(extra, cpu_tensor);
// Then copy it to the GPU.
CopyTensor::ViaDMA("", // edge name (non-existent)
nullptr /*send_dev_ctx*/, to_device_ctx, cpu_dev,
@ -135,8 +146,7 @@ void CollectiveRemoteAccessDistributed::RecvFromPeer(
return;
} else {
// CPU device
memcpy(DMAHelper::base(to_tensor), extra.tensor_content().data(),
num_bytes);
PopulateTensorFromExtra(extra, to_tensor);
}
}
if (!s.ok() && errors::IsFailedPrecondition(s)) {

View File

@ -104,7 +104,7 @@ class FakeWorker : public TestWorkerInterface {
// bytes in the response.
RecvBufRespExtra extra;
int64 num_bytes = h->prod_value->TotalBytes();
extra.set_tensor_content(string(
extra.add_tensor_content(string(
reinterpret_cast<const char*>(DMAHelper::base(h->prod_value)),
num_bytes));
response->mutable_transport_options()->PackFrom(extra);

View File

@ -194,8 +194,8 @@ Status GrpcServer::Init(
MaybeMutateBuilder(&builder);
master_impl_ = CreateMaster(&master_env_);
master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
worker_impl_ =
worker_func ? worker_func(&worker_env_) : NewGrpcWorker(&worker_env_);
worker_impl_ = worker_func ? worker_func(&worker_env_)
: NewGrpcWorker(&worker_env_, config);
worker_service_ =
NewGrpcWorkerService(worker_impl_.get(), &builder).release();
eager_service_ = new eager::GrpcEagerServiceImpl(&worker_env_, &builder);

View File

@ -418,8 +418,13 @@ class GrpcWorkerService : public AsyncServiceInterface {
} // namespace
GrpcWorker::GrpcWorker(WorkerEnv* worker_env)
: Worker(worker_env), recent_request_ids_(100000) {}
GrpcWorker::GrpcWorker(WorkerEnv* worker_env, const ConfigProto& config)
: Worker(worker_env),
recent_request_ids_(100000),
recv_buf_max_chunk_(
config.experimental().recv_buf_max_chunk() > 0
? config.experimental().recv_buf_max_chunk()
: (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {}
// GrpcRecvTensorAsync: unlike the other Worker methods, which use protocol
// buffers for a response object, to avoid extra protocol buffer serialization
@ -505,6 +510,33 @@ void GrpcWorker::GrpcRecvTensorAsync(CallOptions* opts,
});
}
namespace {
// If RecvBufRespExtra.tensor_content is a single large string, then gRPC
// can stall on the recv side when the string buffer needs to be enlarged,
// since the size is not sent in advance. Changing this field to a sequence
// of small strings costs some extra time on the send side, since we do
// some otherwise unnecessary copies, but it improves runtime overall by
// improving flow control. Best performance is likely achieved with a
// max_chunk_bytes equal to the memory page size.
//
// TODO(tucker): When proto3 supports [ctype=CORD] then change
// RecvBufRespExtra.tensor_content to a cord instead of a repeated string,
// and remove this function.
// Packs `num_bytes` of `tensor`'s data into `response`'s transport_options
// as a sequence of strings of at most `max_chunk_bytes` each.  A
// non-positive `max_chunk_bytes` produces a single chunk holding all bytes.
void SetTensorInRecvBufResp(int64 max_chunk_bytes, const Tensor* tensor,
                            int64 num_bytes, RecvBufResponse* response) {
  RecvBufRespExtra extra;
  const char* src = reinterpret_cast<const char*>(DMAHelper::base(tensor));
  for (int64 remaining = num_bytes; remaining > 0;) {
    const int64 chunk_bytes = (max_chunk_bytes > 0)
                                  ? std::min(remaining, max_chunk_bytes)
                                  : remaining;
    extra.add_tensor_content(std::string(src, chunk_bytes));
    src += chunk_bytes;
    remaining -= chunk_bytes;
  }
  response->mutable_transport_options()->PackFrom(extra);
}
} // namespace
void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
RecvBufResponse* response, StatusCallback done) {
// This is a generic, low performance implementation appropriate for grpc.
@ -551,11 +583,8 @@ void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
[this, num_bytes, response, done, hook,
cpu_tensor](const Status& s) {
if (s.ok()) {
RecvBufRespExtra extra;
extra.set_tensor_content(reinterpret_cast<const char*>(
DMAHelper::base(cpu_tensor)),
num_bytes);
response->mutable_transport_options()->PackFrom(extra);
SetTensorInRecvBufResp(recv_buf_max_chunk_, cpu_tensor,
num_bytes, response);
}
response->set_send_start_micros(env_->env->NowMicros());
done(s);
@ -566,11 +595,8 @@ void GrpcWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
}
} else {
// Tensor is on CPU.
RecvBufRespExtra extra;
extra.set_tensor_content(reinterpret_cast<const char*>(
DMAHelper::base(hook->prod_value)),
num_bytes);
response->mutable_transport_options()->PackFrom(extra);
SetTensorInRecvBufResp(recv_buf_max_chunk_, hook->prod_value,
num_bytes, response);
}
}
response->set_send_start_micros(env_->env->NowMicros());
@ -608,8 +634,9 @@ void GrpcWorker::LoggingAsync(const LoggingRequest* request,
WorkerEnv* GrpcWorker::env() { return env_; }
std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* env) {
return std::unique_ptr<GrpcWorker>(new GrpcWorker(env));
// Factory for a GrpcWorker bound to `env` and configured from `config`;
// the caller owns the returned worker.
std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* env,
                                          const ConfigProto& config) {
  std::unique_ptr<GrpcWorker> worker(new GrpcWorker(env, config));
  return worker;
}
std::unique_ptr<AsyncServiceInterface> NewGrpcWorkerService(

View File

@ -27,12 +27,13 @@ class ServerBuilder;
namespace tensorflow {
class AsyncServiceInterface;
class ConfigProto;
struct WorkerEnv;
struct WorkerSession;
class GrpcWorker : public Worker {
public:
GrpcWorker(WorkerEnv* env);
GrpcWorker(WorkerEnv* env, const ConfigProto& config);
// Specialized version of RecvTensor for gRPC, which avoids a copy.
virtual void GrpcRecvTensorAsync(CallOptions* opts,
@ -50,9 +51,11 @@ class GrpcWorker : public Worker {
private:
RecentRequestIds recent_request_ids_;
const int32 recv_buf_max_chunk_;
};
std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* worker_env);
std::unique_ptr<GrpcWorker> NewGrpcWorker(WorkerEnv* worker_env,
const ConfigProto& config);
// Returns an implementation of WorkerService rpc service.
std::unique_ptr<AsyncServiceInterface> NewGrpcWorkerService(

View File

@ -400,6 +400,11 @@ message ConfigProto {
// Which executor to use, the default executor will be used
// if it is an empty string or "DEFAULT"
string executor_type = 3;
// Guidance to formatting of large RecvBuf fields for transfer.
// Any positive value sets the max chunk size. 0 defaults to 4096.
// Any negative value indicates no max, i.e. one chunk only.
int32 recv_buf_max_chunk = 4;
};
Experimental experimental = 16;

View File

@ -4,5 +4,5 @@ package tensorflow;
// Extra data needed on a non-RDMA RecvBufResponse.
message RecvBufRespExtra {
bytes tensor_content = 1;
repeated bytes tensor_content = 1;
};

View File

@ -14,6 +14,12 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_STRING
}
field {
name: "recv_buf_max_chunk"
number: 4
label: LABEL_OPTIONAL
type: TYPE_INT32
}
reserved_range {
start: 2
end: 3

View File

@ -137,6 +137,12 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_STRING
}
field {
name: "recv_buf_max_chunk"
number: 4
label: LABEL_OPTIONAL
type: TYPE_INT32
}
reserved_range {
start: 2
end: 3

View File

@ -14,6 +14,12 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_STRING
}
field {
name: "recv_buf_max_chunk"
number: 4
label: LABEL_OPTIONAL
type: TYPE_INT32
}
reserved_range {
start: 2
end: 3

View File

@ -137,6 +137,12 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_STRING
}
field {
name: "recv_buf_max_chunk"
number: 4
label: LABEL_OPTIONAL
type: TYPE_INT32
}
reserved_range {
start: 2
end: 3