From d21fe75e1420aa2fb16db1148cf39f5e3093cebe Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 6 Aug 2020 12:41:18 -0700 Subject: [PATCH] [tf.data service] Retry creation of jobs. This lets us recover from errors when dispatchers are restarted during job creation. PiperOrigin-RevId: 325287906 Change-Id: I50f3348b72e988146dd09f2953a1ab1c102381b8 --- .../core/kernels/data/experimental/BUILD | 1 + .../experimental/data_service_dataset_op.cc | 21 ++++++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/tensorflow/core/kernels/data/experimental/BUILD b/tensorflow/core/kernels/data/experimental/BUILD index b7a3b774cc5..a4682e09b2a 100644 --- a/tensorflow/core/kernels/data/experimental/BUILD +++ b/tensorflow/core/kernels/data/experimental/BUILD @@ -168,6 +168,7 @@ tf_kernel_library( "//tensorflow/core/data:compression_utils", "//tensorflow/core/data:dataset_proto_cc", "//tensorflow/core/data/service:data_service", + "//tensorflow/core/data/service:grpc_util", "//tensorflow/core/distributed_runtime/rpc:grpc_util", "//tensorflow/core/kernels/data:dataset_utils", "//tensorflow/core/kernels/data:name_utils", diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 233a61f440e..ca73799bd24 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -23,6 +23,7 @@ limitations under the License. #include "absl/strings/str_cat.h" #include "tensorflow/core/data/dataset.pb.h" #include "tensorflow/core/data/service/data_service.h" +#include "tensorflow/core/data/service/grpc_util.h" #include "tensorflow/core/distributed_runtime/rpc/grpc_util.h" #include "tensorflow/core/framework/dataset.h" #include "tensorflow/core/framework/model.h" @@ -210,13 +211,23 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { &deregister_fn_)); DataServiceDispatcherClient dispatcher(dataset()->address_, dataset()->protocol_); + int64 deadline_micros = ctx->env()->NowMicros() + kRetryTimeoutMicros; if (dataset()->job_name_.empty()) { - TF_RETURN_IF_ERROR(dispatcher.CreateJob( - dataset()->dataset_id_, dataset()->processing_mode_, &job_id_)); + TF_RETURN_IF_ERROR(grpc_util::Retry( + [&]() { + return dispatcher.CreateJob(dataset()->dataset_id_, + dataset()->processing_mode_, + &job_id_); + }, + "create job", deadline_micros)); } else { - TF_RETURN_IF_ERROR(dispatcher.GetOrCreateJob( - dataset()->dataset_id_, dataset()->processing_mode_, - dataset()->job_name_, iterator_index_, &job_id_)); + TF_RETURN_IF_ERROR(grpc_util::Retry( + [&]() { + return dispatcher.GetOrCreateJob( + dataset()->dataset_id_, dataset()->processing_mode_, + dataset()->job_name_, iterator_index_, &job_id_); + }, + "get or create job", deadline_micros)); } VLOG(1) << "Created data service job with id " << job_id_; return Status::OK();