Move cache_rpc_response from GrpcWorkerService to GrpcWorker, since the rpc cache is maintained by GrpcWorker.
Imported from GitHub PR #29379 PiperOrigin-RevId: 257005058
This commit is contained in:
parent
07323ccf85
commit
29c522fd3e
@ -370,11 +370,6 @@ class GrpcWorkerService : public AsyncServiceInterface {
|
|||||||
GrpcWorkerServiceOptions options)
|
GrpcWorkerServiceOptions options)
|
||||||
: is_shutdown_(false) {
|
: is_shutdown_(false) {
|
||||||
builder->RegisterService(&worker_service_);
|
builder->RegisterService(&worker_service_);
|
||||||
// TODO(jingdong): it would be cleaner to move this option to GrpcWorker
|
|
||||||
// since the cache is maintained by GrpcWorker now.
|
|
||||||
if (options.cache_rpc_response) {
|
|
||||||
worker->EnableResponseCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < options.num_serving_threads; i++) {
|
for (int i = 0; i < options.num_serving_threads; i++) {
|
||||||
threads_.emplace_back(
|
threads_.emplace_back(
|
||||||
@ -428,7 +423,11 @@ GrpcWorker::GrpcWorker(WorkerEnv* worker_env, const ConfigProto& config)
|
|||||||
recv_buf_max_chunk_(
|
recv_buf_max_chunk_(
|
||||||
config.experimental().recv_buf_max_chunk() > 0
|
config.experimental().recv_buf_max_chunk() > 0
|
||||||
? config.experimental().recv_buf_max_chunk()
|
? config.experimental().recv_buf_max_chunk()
|
||||||
: (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {}
|
: (config.experimental().recv_buf_max_chunk() < 0 ? 0 : 4096)) {
|
||||||
|
if (config.rpc_options().cache_rpc_response()) {
|
||||||
|
EnableResponseCache();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void GrpcWorker::EnableResponseCache() {
|
void GrpcWorker::EnableResponseCache() {
|
||||||
VLOG(1) << "Enabling gRPC tensor response cache.";
|
VLOG(1) << "Enabling gRPC tensor response cache.";
|
||||||
|
@ -75,14 +75,6 @@ struct GrpcWorkerServiceOptions {
|
|||||||
// default queue depth for a method.
|
// default queue depth for a method.
|
||||||
std::unordered_map<int, int> queue_depth;
|
std::unordered_map<int, int> queue_depth;
|
||||||
int num_serving_threads = 8;
|
int num_serving_threads = 8;
|
||||||
|
|
||||||
// Setting cache_rpc_response to true will enable sender side caching of
|
|
||||||
// response for RecvTensorAsync and RecvBufAsync to allow receiver to retry
|
|
||||||
// requests . This is only necessary when the network fabric is experiencing a
|
|
||||||
// significant error rate. Without it we'll fail a step on an network error,
|
|
||||||
// while with it we'll be able to complete long steps (like complex
|
|
||||||
// initializations) in the face of some network errors during RecvTensor.
|
|
||||||
bool cache_rpc_response = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Returns an implementation of WorkerService rpc service.
|
// Returns an implementation of WorkerService rpc service.
|
||||||
|
@ -327,6 +327,14 @@ message RPCOptions {
|
|||||||
// If compression_algorithm is set, the compression level to be used.
|
// If compression_algorithm is set, the compression level to be used.
|
||||||
// From 0 (no compression), up to 3.
|
// From 0 (no compression), up to 3.
|
||||||
int32 compression_level = 3;
|
int32 compression_level = 3;
|
||||||
|
|
||||||
|
// Setting cache_rpc_response to true will enable sender side caching of
|
||||||
|
// response for RecvTensorAsync and RecvBufAsync to allow receiver to retry
|
||||||
|
// requests . This is only necessary when the network fabric is experiencing a
|
||||||
|
// significant error rate. Without it we'll fail a step on an network error,
|
||||||
|
// while with it we'll be able to complete long steps (like complex
|
||||||
|
// initializations) in the face of some network errors during RecvTensor.
|
||||||
|
bool cache_rpc_response = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metadata about the session.
|
// Metadata about the session.
|
||||||
|
Loading…
Reference in New Issue
Block a user