Improve variable naming consistency in data_service_dataset_op
PiperOrigin-RevId: 309314268
Change-Id: I4c445fe65291020c0bf4fa43e4648b589d26e8f0
commit 41f64f0788
parent 8d1992fd27
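All of the `task_handler` -> `task_thread` renames below refer to pointers into the iterator's per-task bookkeeping entries. As a reading aid, here is a minimal sketch of that entry type as it can be inferred from the fields the diff touches; the exact member types, default values, and any additional members in the real `TaskThread` declaration may differ.

// Sketch only: member names come from the hunks below; types are inferred
// from usage (task.id(), task.worker_address(), worker->GetElement(...),
// ctx->StartThread(...)) and may not match the actual declaration.
struct TaskThread {
  int64 task_id;
  std::string address;  // worker address for this task
  std::unique_ptr<DataServiceWorkerClient> worker;  // created lazily in RunTaskThread
  bool end_of_sequence = false;  // set when the task is exhausted or cancelled
  bool finished = false;         // set by the `done` callback
  std::unique_ptr<Thread> thread;  // runs RunTaskThread
};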
@@ -232,9 +232,8 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
     // TODO(aaudibert): Instead of polling, have master send updates when
     // the list of tasks changes.
     void TaskThreadManager(std::unique_ptr<IteratorContext> ctx) {
-      VLOG(3) << "Starting task handler manager";
+      VLOG(3) << "Starting task thread manager";
       DataServiceMasterClient master(dataset()->address_, dataset()->protocol_);
-
       uint64 next_check = Env::Default()->NowMicros();
       while (true) {
         {
@@ -242,7 +241,8 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
           // All units are microseconds.
           while (!cancelled_ && Env::Default()->NowMicros() < next_check) {
             int64 remaining_time = next_check - Env::Default()->NowMicros();
-            VLOG(3) << "Task manager waiting for " << remaining_time << "us";
+            VLOG(3) << "Task thread manager waiting for " << remaining_time
+                    << "us";
             cv_.wait_for(l, std::chrono::microseconds(remaining_time));
           }
           if (cancelled_) {
@@ -258,7 +258,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
 
     void UpdateTaskThreads(DataServiceMasterClient* master,
                            IteratorContext* ctx) LOCKS_EXCLUDED(mu_) {
-      VLOG(3) << "Updating task handler threads";
+      VLOG(3) << "Updating task threads";
       std::vector<TaskInfo> tasks;
       bool job_finished;
       Status s = master->GetTasks(job_id_, &tasks, &job_finished);
@@ -276,23 +276,23 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
           continue;
         }
         task_threads_[task.id()] = absl::make_unique<TaskThread>();
-        TaskThread* task_handler = task_threads_[task.id()].get();
-        task_handler->task_id = task.id();
-        task_handler->address = task.worker_address();
+        TaskThread* task_thread = task_threads_[task.id()].get();
+        task_thread->task_id = task.id();
+        task_thread->address = task.worker_address();
         num_unfinished_tasks_++;
         outstanding_requests_++;
-        auto done = [this, task_handler]() {
+        auto done = [this, task_thread]() {
           mutex_lock l(mu_);
           num_unfinished_tasks_--;
           outstanding_requests_--;
           cv_.notify_all();
-          task_handler->finished = true;
-          VLOG(3) << "Task thread " << task_handler->task_id << " finished";
+          task_thread->finished = true;
+          VLOG(3) << "Task thread " << task_thread->task_id << " finished";
         };
-        task_handler->thread =
-            ctx->StartThread("tf-data-service-task_handler",
-                             [this, task_handler, done = std::move(done)]() {
-                               RunTaskThread(task_handler, std::move(done));
+        task_thread->thread =
+            ctx->StartThread("tf-data-service-task_thread",
+                             [this, task_thread, done = std::move(done)]() {
+                               RunTaskThread(task_thread, std::move(done));
                              });
       }
       // Mark deleted tasks and clean up finished task threads.
@@ -315,32 +315,30 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
     }
 
-    void RunTaskThread(TaskThread* task_handler, std::function<void()> done) {
+    void RunTaskThread(TaskThread* task_thread, std::function<void()> done) {
       auto cleanup = gtl::MakeCleanup([done = std::move(done)]() { done(); });
-      VLOG(3) << "Starting task handler thread for task "
-              << task_handler->task_id << " with worker address "
-              << task_handler->address;
+      VLOG(3) << "Starting task thread for task " << task_thread->task_id
+              << " with worker address " << task_thread->address;
       while (true) {
-        if (!task_handler->worker) {
-          Status s = CreateDataServiceWorkerClient(task_handler->address,
-                                                   dataset()->protocol_,
-                                                   &task_handler->worker);
+        if (!task_thread->worker) {
+          Status s = CreateDataServiceWorkerClient(
+              task_thread->address, dataset()->protocol_, &task_thread->worker);
           if (!s.ok()) {
             LOG(WARNING) << "Failed to create a worker client for "
-                         << task_handler->address << ": " << s;
+                         << task_thread->address << ": " << s;
           }
         }
         {
           mutex_lock l(mu_);
-          if (task_handler->end_of_sequence) {
-            VLOG(3) << "Task thread " << task_handler->task_id
+          if (task_thread->end_of_sequence) {
+            VLOG(3) << "Task thread" << task_thread->task_id
                     << " reached end_of_sequence";
             return;
           }
           outstanding_requests_--;
           while (!cancelled_ && results_.size() + outstanding_requests_ >=
                                     max_outstanding_requests_) {
-            VLOG(3) << "Task thread for task " << task_handler->task_id
+            VLOG(3) << "Task thread for task " << task_thread->task_id
                     << " waiting. results_.size()=" << results_.size()
                     << " outstanding_requests_=" << outstanding_requests_;
             cv_.wait(l);
@@ -353,10 +351,10 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
         // TODO(aaudibert): add backoff and max retries.
         int64 deadline_micros =
             Env::Default()->NowMicros() + kRetryTimeoutMicros;
-        Status s = FetchElement(task_handler, deadline_micros);
+        Status s = FetchElement(task_thread, deadline_micros);
         if (!s.ok()) {
           LOG(WARNING) << "Failed to fetch element from worker at "
-                       << task_handler->address << ": " << s;
+                       << task_thread->address << ": " << s;
         }
       }
     }
@@ -366,13 +364,13 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
     // If the task reaches end_of_sequence or is cancelled (e.g. due to a
     // worker dying), FetchElement returns Status::OK() without adding to
     // `results_`.
-    Status FetchElement(TaskThread* task_handler, int64 deadline_micros) {
-      VLOG(3) << "Fetching an element for task id " << task_handler->task_id;
+    Status FetchElement(TaskThread* task_thread, int64 deadline_micros) {
+      VLOG(3) << "Fetching an element for task id " << task_thread->task_id;
       CompressedElement compressed;
       bool end_of_sequence;
       for (int num_retries = 0;; ++num_retries) {
-        Status s = task_handler->worker->GetElement(
-            task_handler->task_id, &compressed, &end_of_sequence);
+        Status s = task_thread->worker->GetElement(
+            task_thread->task_id, &compressed, &end_of_sequence);
         if (s.ok()) {
           break;
         }
@@ -385,7 +383,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
           mutex_lock l(mu_);
           // If `UpdateTaskThreads` finds that the task has been cancelled, it
           // will set end_of_sequence to `true`.
-          if (task_handler->end_of_sequence || cancelled_) {
+          if (task_thread->end_of_sequence || cancelled_) {
            return Status::OK();
          }
        }
@@ -411,12 +409,12 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
       mutex_lock l(mu_);
       if (end_of_sequence) {
-        task_handler->end_of_sequence = true;
+        task_thread->end_of_sequence = true;
         return Status::OK();
       }
       results_.push(std::move(element));
       cv_.notify_all();
-      VLOG(3) << "Fetched an element for task id " << task_handler->task_id;
+      VLOG(3) << "Fetched an element for task id " << task_thread->task_id;
       return Status::OK();
     }
 
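FetchElement is touched by three of the hunks above (old lines -366, -385, and -411). For readability, here is the same function stitched back together from only the lines this diff shows, using the new variable name. The "..." comments mark code the diff elides (including the declaration of `element`, error filtering, and the retry/backoff handling), and the placement of the cancellation check inside the retry loop is inferred from the old line numbers, so treat this as a reading aid rather than the actual TensorFlow source.

// Reconstructed from the hunks above; "..." marks lines this diff does not
// show. Not the complete function.
Status FetchElement(TaskThread* task_thread, int64 deadline_micros) {
  VLOG(3) << "Fetching an element for task id " << task_thread->task_id;
  CompressedElement compressed;
  bool end_of_sequence;
  for (int num_retries = 0;; ++num_retries) {
    Status s = task_thread->worker->GetElement(
        task_thread->task_id, &compressed, &end_of_sequence);
    if (s.ok()) {
      break;
    }
    // ... error filtering and deadline_micros check elided in this diff ...
    {
      mutex_lock l(mu_);
      // If `UpdateTaskThreads` finds that the task has been cancelled, it
      // will set end_of_sequence to `true`.
      if (task_thread->end_of_sequence || cancelled_) {
        return Status::OK();
      }
    }
    // ... retry wait/backoff elided ...
  }
  // ... declaration of `element` and decompression of `compressed` elided ...
  mutex_lock l(mu_);
  if (end_of_sequence) {
    task_thread->end_of_sequence = true;
    return Status::OK();
  }
  results_.push(std::move(element));
  cv_.notify_all();
  VLOG(3) << "Fetched an element for task id " << task_thread->task_id;
  return Status::OK();
}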