[tf.data service] Retry creation of jobs.
This lets us recover from errors when dispatchers are restarted during job creation. PiperOrigin-RevId: 325287906 Change-Id: I50f3348b72e988146dd09f2953a1ab1c102381b8
This commit is contained in:
parent
4d59bcb41e
commit
d21fe75e14
@ -168,6 +168,7 @@ tf_kernel_library(
|
|||||||
"//tensorflow/core/data:compression_utils",
|
"//tensorflow/core/data:compression_utils",
|
||||||
"//tensorflow/core/data:dataset_proto_cc",
|
"//tensorflow/core/data:dataset_proto_cc",
|
||||||
"//tensorflow/core/data/service:data_service",
|
"//tensorflow/core/data/service:data_service",
|
||||||
|
"//tensorflow/core/data/service:grpc_util",
|
||||||
"//tensorflow/core/distributed_runtime/rpc:grpc_util",
|
"//tensorflow/core/distributed_runtime/rpc:grpc_util",
|
||||||
"//tensorflow/core/kernels/data:dataset_utils",
|
"//tensorflow/core/kernels/data:dataset_utils",
|
||||||
"//tensorflow/core/kernels/data:name_utils",
|
"//tensorflow/core/kernels/data:name_utils",
|
||||||
|
@ -23,6 +23,7 @@ limitations under the License.
|
|||||||
#include "absl/strings/str_cat.h"
|
#include "absl/strings/str_cat.h"
|
||||||
#include "tensorflow/core/data/dataset.pb.h"
|
#include "tensorflow/core/data/dataset.pb.h"
|
||||||
#include "tensorflow/core/data/service/data_service.h"
|
#include "tensorflow/core/data/service/data_service.h"
|
||||||
|
#include "tensorflow/core/data/service/grpc_util.h"
|
||||||
#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
|
#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
|
||||||
#include "tensorflow/core/framework/dataset.h"
|
#include "tensorflow/core/framework/dataset.h"
|
||||||
#include "tensorflow/core/framework/model.h"
|
#include "tensorflow/core/framework/model.h"
|
||||||
@ -210,13 +211,23 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
|
|||||||
&deregister_fn_));
|
&deregister_fn_));
|
||||||
DataServiceDispatcherClient dispatcher(dataset()->address_,
|
DataServiceDispatcherClient dispatcher(dataset()->address_,
|
||||||
dataset()->protocol_);
|
dataset()->protocol_);
|
||||||
|
int64 deadline_micros = ctx->env()->NowMicros() + kRetryTimeoutMicros;
|
||||||
if (dataset()->job_name_.empty()) {
|
if (dataset()->job_name_.empty()) {
|
||||||
TF_RETURN_IF_ERROR(dispatcher.CreateJob(
|
TF_RETURN_IF_ERROR(grpc_util::Retry(
|
||||||
dataset()->dataset_id_, dataset()->processing_mode_, &job_id_));
|
[&]() {
|
||||||
|
return dispatcher.CreateJob(dataset()->dataset_id_,
|
||||||
|
dataset()->processing_mode_,
|
||||||
|
&job_id_);
|
||||||
|
},
|
||||||
|
"create job", deadline_micros));
|
||||||
} else {
|
} else {
|
||||||
TF_RETURN_IF_ERROR(dispatcher.GetOrCreateJob(
|
TF_RETURN_IF_ERROR(grpc_util::Retry(
|
||||||
|
[&]() {
|
||||||
|
return dispatcher.GetOrCreateJob(
|
||||||
dataset()->dataset_id_, dataset()->processing_mode_,
|
dataset()->dataset_id_, dataset()->processing_mode_,
|
||||||
dataset()->job_name_, iterator_index_, &job_id_));
|
dataset()->job_name_, iterator_index_, &job_id_);
|
||||||
|
},
|
||||||
|
"get or create job", deadline_micros));
|
||||||
}
|
}
|
||||||
VLOG(1) << "Created data service job with id " << job_id_;
|
VLOG(1) << "Created data service job with id " << job_id_;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user