From 41f64f07889773040e408f50da18da706dfb1eea Mon Sep 17 00:00:00 2001 From: Andrew Audibert <aaudibert@google.com> Date: Thu, 30 Apr 2020 15:43:50 -0700 Subject: [PATCH] Improve variable naming consistency in data_service_dataset_op PiperOrigin-RevId: 309314268 Change-Id: I4c445fe65291020c0bf4fa43e4648b589d26e8f0 --- .../experimental/data_service_dataset_op.cc | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 80425215121..ba5c3a54871 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -232,9 +232,8 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { // TODO(aaudibert): Instead of polling, have master send updates when // the list of tasks changes. void TaskThreadManager(std::unique_ptr<IteratorContext> ctx) { - VLOG(3) << "Starting task handler manager"; + VLOG(3) << "Starting task thread manager"; DataServiceMasterClient master(dataset()->address_, dataset()->protocol_); - uint64 next_check = Env::Default()->NowMicros(); while (true) { { @@ -242,7 +241,8 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { // All units are microseconds. while (!cancelled_ && Env::Default()->NowMicros() < next_check) { int64 remaining_time = next_check - Env::Default()->NowMicros(); - VLOG(3) << "Task manager waiting for " << remaining_time << "us"; + VLOG(3) << "Task thread manager waiting for " << remaining_time + << "us"; cv_.wait_for(l, std::chrono::microseconds(remaining_time)); } if (cancelled_) { @@ -258,7 +258,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { void UpdateTaskThreads(DataServiceMasterClient* master, IteratorContext* ctx) LOCKS_EXCLUDED(mu_) { - VLOG(3) << "Updating task handler threads"; + VLOG(3) << "Updating task threads"; std::vector<TaskInfo> tasks; bool job_finished; Status s = master->GetTasks(job_id_, &tasks, &job_finished); @@ -276,23 +276,23 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { continue; } task_threads_[task.id()] = absl::make_unique<TaskThread>(); - TaskThread* task_handler = task_threads_[task.id()].get(); - task_handler->task_id = task.id(); - task_handler->address = task.worker_address(); + TaskThread* task_thread = task_threads_[task.id()].get(); + task_thread->task_id = task.id(); + task_thread->address = task.worker_address(); num_unfinished_tasks_++; outstanding_requests_++; - auto done = [this, task_handler]() { + auto done = [this, task_thread]() { mutex_lock l(mu_); num_unfinished_tasks_--; outstanding_requests_--; cv_.notify_all(); - task_handler->finished = true; - VLOG(3) << "Task thread " << task_handler->task_id << " finished"; + task_thread->finished = true; + VLOG(3) << "Task thread " << task_thread->task_id << " finished"; }; - task_handler->thread = - ctx->StartThread("tf-data-service-task_handler", - [this, task_handler, done = std::move(done)]() { - RunTaskThread(task_handler, std::move(done)); + task_thread->thread = + ctx->StartThread("tf-data-service-task_thread", + [this, task_thread, done = std::move(done)]() { + RunTaskThread(task_thread, std::move(done)); }); } // Mark deleted tasks and clean up finished task threads. @@ -315,32 +315,30 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { } } - void RunTaskThread(TaskThread* task_handler, std::function<void()> done) { + void RunTaskThread(TaskThread* task_thread, std::function<void()> done) { auto cleanup = gtl::MakeCleanup([done = std::move(done)]() { done(); }); - VLOG(3) << "Starting task handler thread for task " - << task_handler->task_id << " with worker address " - << task_handler->address; + VLOG(3) << "Starting task thread for task " << task_thread->task_id + << " with worker address " << task_thread->address; while (true) { - if (!task_handler->worker) { - Status s = CreateDataServiceWorkerClient(task_handler->address, - dataset()->protocol_, - &task_handler->worker); + if (!task_thread->worker) { + Status s = CreateDataServiceWorkerClient( + task_thread->address, dataset()->protocol_, &task_thread->worker); if (!s.ok()) { LOG(WARNING) << "Failed to create a worker client for " - << task_handler->address << ": " << s; + << task_thread->address << ": " << s; } } { mutex_lock l(mu_); - if (task_handler->end_of_sequence) { - VLOG(3) << "Task thread " << task_handler->task_id + if (task_thread->end_of_sequence) { + VLOG(3) << "Task thread" << task_thread->task_id << " reached end_of_sequence"; return; } outstanding_requests_--; while (!cancelled_ && results_.size() + outstanding_requests_ >= max_outstanding_requests_) { - VLOG(3) << "Task thread for task " << task_handler->task_id + VLOG(3) << "Task thread for task " << task_thread->task_id << " waiting. results_.size()=" << results_.size() << " outstanding_requests_=" << outstanding_requests_; cv_.wait(l); @@ -353,10 +351,10 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { // TODO(aaudibert): add backoff and max retries. int64 deadline_micros = Env::Default()->NowMicros() + kRetryTimeoutMicros; - Status s = FetchElement(task_handler, deadline_micros); + Status s = FetchElement(task_thread, deadline_micros); if (!s.ok()) { LOG(WARNING) << "Failed to fetch element from worker at " - << task_handler->address << ": " << s; + << task_thread->address << ": " << s; } } } @@ -366,13 +364,13 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { // If the task reaches end_of_sequence or is cancelled (e.g. due to a // worker dying), FetchElement returns Status::OK() without adding to // `results_`. - Status FetchElement(TaskThread* task_handler, int64 deadline_micros) { - VLOG(3) << "Fetching an element for task id " << task_handler->task_id; + Status FetchElement(TaskThread* task_thread, int64 deadline_micros) { + VLOG(3) << "Fetching an element for task id " << task_thread->task_id; CompressedElement compressed; bool end_of_sequence; for (int num_retries = 0;; ++num_retries) { - Status s = task_handler->worker->GetElement( - task_handler->task_id, &compressed, &end_of_sequence); + Status s = task_thread->worker->GetElement( + task_thread->task_id, &compressed, &end_of_sequence); if (s.ok()) { break; } @@ -385,7 +383,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { mutex_lock l(mu_); // If `UpdateTaskThreads` finds that the task has been cancelled, it // will set end_of_sequence to `true`. - if (task_handler->end_of_sequence || cancelled_) { + if (task_thread->end_of_sequence || cancelled_) { return Status::OK(); } } @@ -411,12 +409,12 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { } mutex_lock l(mu_); if (end_of_sequence) { - task_handler->end_of_sequence = true; + task_thread->end_of_sequence = true; return Status::OK(); } results_.push(std::move(element)); cv_.notify_all(); - VLOG(3) << "Fetched an element for task id " << task_handler->task_id; + VLOG(3) << "Fetched an element for task id " << task_thread->task_id; return Status::OK(); }