diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc
index 368489d9dc7..545bd1c13f3 100644
--- a/tensorflow/core/data/service/dispatcher_impl.cc
+++ b/tensorflow/core/data/service/dispatcher_impl.cc
@@ -186,7 +186,7 @@ Status DataServiceDispatcherImpl::Start() {
 
 Status DataServiceDispatcherImpl::RestoreSplitProvider(
     const Job& job, std::unique_ptr<SplitProvider>& restored)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   int64 index = job.distributed_epoch_state.value().split_provider_index;
   VLOG(1) << "Restoring split provider for job " << job.job_id << " to index "
           << index;
@@ -341,7 +341,7 @@ Status DataServiceDispatcherImpl::GetSplit(const GetSplitRequest* request,
 
 Status DataServiceDispatcherImpl::MakeSplitProvider(
     int64 dataset_id, std::unique_ptr<SplitProvider>& split_provider)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::shared_ptr<const Dataset> dataset;
   TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset));
   std::shared_ptr<const DatasetDef> dataset_def;
@@ -393,7 +393,7 @@ Status DataServiceDispatcherImpl::GetOrRegisterDataset(
 Status DataServiceDispatcherImpl::RegisterDataset(uint64 fingerprint,
                                                   const DatasetDef& dataset,
                                                   int64& dataset_id)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   dataset_id = state_.NextAvailableDatasetId();
   Update update;
   RegisterDatasetUpdate* register_dataset = update.mutable_register_dataset();
@@ -487,7 +487,7 @@ Status DataServiceDispatcherImpl::ReleaseJobClient(
 // Validates that the job matches the given processing_mode and dataset_id.
 Status DataServiceDispatcherImpl::ValidateMatchingJob(
     std::shared_ptr<const Job> job, ProcessingMode processing_mode,
-    int64 dataset_id) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    int64 dataset_id) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   DCHECK(job->named_job_key.has_value());
   std::string job_name = job->named_job_key->name;
   if (job->processing_mode != processing_mode) {
@@ -506,7 +506,7 @@ Status DataServiceDispatcherImpl::ValidateMatchingJob(
 Status DataServiceDispatcherImpl::CreateJob(
     int64 dataset_id, ProcessingMode processing_mode,
     absl::optional<NamedJobKey> named_job_key, std::shared_ptr<const Job>& job)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   switch (processing_mode) {
     case ProcessingMode::PARALLEL_EPOCHS:
     case ProcessingMode::DISTRIBUTED_EPOCH:
@@ -535,7 +535,7 @@ Status DataServiceDispatcherImpl::CreateJob(
 }
 
 Status DataServiceDispatcherImpl::CreateTasksForWorker(
-    const std::string& worker_address) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    const std::string& worker_address) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::vector<std::shared_ptr<const Job>> jobs = state_.ListJobs();
   for (const auto& job : jobs) {
     if (job->finished) {
@@ -549,7 +549,7 @@ Status DataServiceDispatcherImpl::CreateTasksForWorker(
 
 Status DataServiceDispatcherImpl::AcquireJobClientId(
     const std::shared_ptr<const Job>& job, int64& job_client_id)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   job_client_id = state_.NextAvailableJobClientId();
   Update update;
   AcquireJobClientUpdate* acquire_job_client =
@@ -563,7 +563,7 @@ Status DataServiceDispatcherImpl::AcquireJobClientId(
 Status DataServiceDispatcherImpl::CreateTasksForJob(
     std::shared_ptr<const Job> job,
     std::vector<std::shared_ptr<const Task>>& tasks)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::vector<std::shared_ptr<const Worker>> workers = state_.ListWorkers();
   tasks.clear();
   tasks.reserve(workers.size());
@@ -578,7 +578,7 @@ Status DataServiceDispatcherImpl::CreateTasksForJob(
 Status DataServiceDispatcherImpl::CreateTask(std::shared_ptr<const Job> job,
                                              const std::string& worker_address,
                                              std::shared_ptr<const Task>& task)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   int64 task_id = state_.NextAvailableTaskId();
   Update update;
   CreateTaskUpdate* create_task = update.mutable_create_task();
@@ -593,7 +593,7 @@ Status DataServiceDispatcherImpl::CreateTask(std::shared_ptr<const Job> job,
 }
 
 Status DataServiceDispatcherImpl::AssignTasks(
-    std::vector<std::shared_ptr<const Task>> tasks) LOCKS_EXCLUDED(mu_) {
+    std::vector<std::shared_ptr<const Task>> tasks) TF_LOCKS_EXCLUDED(mu_) {
   for (const auto& task : tasks) {
     TF_RETURN_IF_ERROR(AssignTask(task));
   }
@@ -602,7 +602,7 @@ Status DataServiceDispatcherImpl::AssignTasks(
 
 Status DataServiceDispatcherImpl::GetOrCreateWorkerStub(
     const std::string& worker_address, WorkerService::Stub*& out_stub)
-    LOCKS_EXCLUDED(mu_) {
+    TF_LOCKS_EXCLUDED(mu_) {
   {
     mutex_lock l(mu_);
     auto it = worker_stubs_.find(worker_address);
@@ -627,7 +627,7 @@ Status DataServiceDispatcherImpl::GetOrCreateWorkerStub(
 }
 
 Status DataServiceDispatcherImpl::AssignTask(std::shared_ptr<const Task> task)
-    LOCKS_EXCLUDED(mu_) {
+    TF_LOCKS_EXCLUDED(mu_) {
   VLOG(2) << "Started assigning task " << task->task_id << " to worker "
           << task->worker_address;
   grpc::ClientContext client_ctx;
@@ -710,7 +710,7 @@ Status DataServiceDispatcherImpl::GetWorkers(const GetWorkersRequest* request,
   return Status::OK();
 }
 
-Status DataServiceDispatcherImpl::CheckStarted() LOCKS_EXCLUDED(mu_) {
+Status DataServiceDispatcherImpl::CheckStarted() TF_LOCKS_EXCLUDED(mu_) {
   mutex_lock l(mu_);
   if (!started_) {
     return errors::Unavailable("Dispatcher has not started yet.");
@@ -721,7 +721,7 @@ Status DataServiceDispatcherImpl::CheckStarted() LOCKS_EXCLUDED(mu_) {
 Status DataServiceDispatcherImpl::RecordSplitProduced(int64 job_id,
                                                        int64 repetition,
                                                        bool finished)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   Update update;
   ProduceSplitUpdate* produce_split = update.mutable_produce_split();
   produce_split->set_job_id(job_id);
@@ -731,12 +731,12 @@ Status DataServiceDispatcherImpl::RecordSplitProduced(int64 job_id,
 }
 
 Status DataServiceDispatcherImpl::ApplyWithoutJournaling(const Update& update)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   return state_.Apply(update);
 }
 
 Status DataServiceDispatcherImpl::Apply(const Update& update)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   if (journal_writer_.has_value()) {
     TF_RETURN_IF_ERROR(journal_writer_.value()->Write(update));
   }
@@ -764,7 +764,7 @@ void DataServiceDispatcherImpl::JobGcThread() {
   }
 }
 
-Status DataServiceDispatcherImpl::GcOldJobs() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+Status DataServiceDispatcherImpl::GcOldJobs() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::vector<std::shared_ptr<const Job>> jobs = state_.ListJobs();
   int64 now = env_->NowMicros();
   for (const auto& job : jobs) {
@@ -791,7 +791,7 @@ Status DataServiceDispatcherImpl::GcOldJobs() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
 
 Status DataServiceDispatcherImpl::GetDatasetDef(
     int64 dataset_id, std::shared_ptr<const DatasetDef>& dataset_def)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::shared_ptr<const Dataset> dataset;
   TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset));
   return GetDatasetDef(*dataset, dataset_def);
@@ -799,7 +799,7 @@ Status DataServiceDispatcherImpl::GetDatasetDef(
 
 Status DataServiceDispatcherImpl::GetDatasetDef(
     const Dataset& dataset, std::shared_ptr<const DatasetDef>& dataset_def)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::string key = DatasetKey(dataset.dataset_id, dataset.fingerprint);
   return dataset_store_->Get(key, dataset_def);
 }
diff --git a/tensorflow/core/data/service/dispatcher_impl.h b/tensorflow/core/data/service/dispatcher_impl.h
index f893c1d3995..2d19813c080 100644
--- a/tensorflow/core/data/service/dispatcher_impl.h
+++ b/tensorflow/core/data/service/dispatcher_impl.h
@@ -83,42 +83,42 @@ class DataServiceDispatcherImpl {
   // `restored`.
   Status RestoreSplitProvider(const DispatcherState::Job& job,
                               std::unique_ptr<SplitProvider>& restored)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Makes a split provider for the specified `dataset_id`, and stores it in
   // `split_provider`.
   Status MakeSplitProvider(int64 dataset_id,
                            std::unique_ptr<SplitProvider>& split_provider)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Registers a dataset with the given fingerprint, storing the new dataset's
   // id in `dataset_id`.
   Status RegisterDataset(uint64 fingerprint, const DatasetDef& dataset,
-                         int64& dataset_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+                         int64& dataset_id) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Gets a worker's stub from `worker_stubs_`, or if none exists, creates a
   // stub and stores it in `worker_stubs_`. A borrowed pointer to the stub is
   // stored in `out_stub`.
   Status GetOrCreateWorkerStub(const std::string& worker_address,
                                WorkerService::Stub*& out_stub)
-      LOCKS_EXCLUDED(mu_);
+      TF_LOCKS_EXCLUDED(mu_);
   // Creates a job and stores it in `job`. This method updates the
   // dispatcher state with the new job, but does not assign tasks to workers.
   Status CreateJob(int64 dataset_id, ProcessingMode processing_mode,
                    absl::optional<DispatcherState::NamedJobKey> named_job_key,
                    std::shared_ptr<const DispatcherState::Job>& job)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Creates tasks for the specified worker, one task for every unfinished job.
   Status CreateTasksForWorker(const std::string& worker_address);
   // Acquires a job client id to read from the given job and sets
   // `job_client_id`.
   Status AcquireJobClientId(
       const std::shared_ptr<const DispatcherState::Job>& job,
-      int64& job_client_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      int64& job_client_id) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Creates one task for each worker, for the given job. The created tasks are
   // stored in `tasks`. This method only updates dispatcher metadata with the
   // new tasks, but doesn't assign the tasks to the workers.
   Status CreateTasksForJob(
       std::shared_ptr<const DispatcherState::Job> job,
       std::vector<std::shared_ptr<const DispatcherState::Task>>& tasks)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Creates a new task for a job, storing the created task in `task`.
   Status CreateTask(std::shared_ptr<const DispatcherState::Job> job,
@@ -128,40 +128,40 @@ class DataServiceDispatcherImpl {
   // `worker_address` fields.
   Status AssignTasks(
       std::vector<std::shared_ptr<const DispatcherState::Task>> tasks)
-      LOCKS_EXCLUDED(mu_);
+      TF_LOCKS_EXCLUDED(mu_);
   // Assigns a task to the worker indicated by its `worker_address` field.
   Status AssignTask(std::shared_ptr<const DispatcherState::Task> task)
-      LOCKS_EXCLUDED(mu_);
+      TF_LOCKS_EXCLUDED(mu_);
   // Validates that an existing job matches the given processing_mode and
   // dataset_id, returning an error status describing any difference.
   Status ValidateMatchingJob(std::shared_ptr<const DispatcherState::Job> job,
                              ProcessingMode processing_mode, int64 dataset_id)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Checks that the dispatcher has started, returning UNAVAILABLE if it hasn't.
-  Status CheckStarted() LOCKS_EXCLUDED(mu_);
+  Status CheckStarted() TF_LOCKS_EXCLUDED(mu_);
   // Records that a split was produced by a call to `GetSplit`.
   Status RecordSplitProduced(int64 job_id, int64 repetition, bool finished)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Applies a state update, updating both the journal and the in-memory state.
-  Status Apply(const Update& update) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+  Status Apply(const Update& update) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Applies a state update, but doesn't update the journal. Only meant to be
   // used when recovering state when the dispatcher starts.
   Status ApplyWithoutJournaling(const Update& update)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // A thread which periodically checks for jobs to clean up.
   void JobGcThread();
   // Scans for old jobs and marks them as finished.
-  Status GcOldJobs() EXCLUSIVE_LOCKS_REQUIRED(mu_);
+  Status GcOldJobs() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Gets a `DatasetDef` from `dataset_store_` for the given dataset id, and
   // stores it in `dataset_def`.
   Status GetDatasetDef(int64 dataset_id,
                        std::shared_ptr<const DatasetDef>& dataset_def)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Gets a `DatasetDef` from `dataset_store_` for the given dataset, and
   // stores it in `dataset_def`.
   Status GetDatasetDef(const DispatcherState::Dataset& dataset,
                        std::shared_ptr<const DatasetDef>& dataset_def)
-      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   const experimental::DispatcherConfig& config_;
   Env* env_;
diff --git a/tensorflow/core/data/service/worker_impl.cc b/tensorflow/core/data/service/worker_impl.cc
index 4621e1e8a80..7814fd287f4 100644
--- a/tensorflow/core/data/service/worker_impl.cc
+++ b/tensorflow/core/data/service/worker_impl.cc
@@ -103,7 +103,7 @@ Status DataServiceWorkerImpl::ProcessTask(const ProcessTaskRequest* request,
 }
 
 Status DataServiceWorkerImpl::ProcessTaskInternal(const TaskDef& task_def)
-    EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::unique_ptr<Task>& task = tasks_[task_def.task_id()];
   if (task) {
     VLOG(1) << "Received request to process already-processed task "
@@ -245,7 +245,7 @@ Status DataServiceWorkerImpl::GetWorkerTasks(
   return Status::OK();
 }
 
-void DataServiceWorkerImpl::TaskCompletionThread() LOCKS_EXCLUDED(mu_) {
+void DataServiceWorkerImpl::TaskCompletionThread() TF_LOCKS_EXCLUDED(mu_) {
   while (true) {
     {
       mutex_lock l(mu_);
@@ -269,7 +269,7 @@ void DataServiceWorkerImpl::TaskCompletionThread() LOCKS_EXCLUDED(mu_) {
   }
 }
 
-Status DataServiceWorkerImpl::SendTaskUpdates() LOCKS_EXCLUDED(mu_) {
+Status DataServiceWorkerImpl::SendTaskUpdates() TF_LOCKS_EXCLUDED(mu_) {
   std::vector<TaskProgress> task_progress;
   {
     mutex_lock l(mu_);
@@ -292,7 +292,7 @@ Status DataServiceWorkerImpl::SendTaskUpdates() LOCKS_EXCLUDED(mu_) {
   return Status::OK();
 }
 
-void DataServiceWorkerImpl::HeartbeatThread() LOCKS_EXCLUDED(mu_) {
+void DataServiceWorkerImpl::HeartbeatThread() TF_LOCKS_EXCLUDED(mu_) {
   while (true) {
     int64 next_heartbeat_micros =
         Env::Default()->NowMicros() + (config_.heartbeat_interval_ms() * 1000);
@@ -321,7 +321,7 @@ void DataServiceWorkerImpl::HeartbeatThread() LOCKS_EXCLUDED(mu_) {
   }
 }
 
-Status DataServiceWorkerImpl::Heartbeat() LOCKS_EXCLUDED(mu_) {
+Status DataServiceWorkerImpl::Heartbeat() TF_LOCKS_EXCLUDED(mu_) {
   std::vector<int64> current_tasks;
   {
     mutex_lock l(mu_);
diff --git a/tensorflow/core/data/service/worker_impl.h b/tensorflow/core/data/service/worker_impl.h
index 16a0ba0cd93..75424cf2485 100644
--- a/tensorflow/core/data/service/worker_impl.h
+++ b/tensorflow/core/data/service/worker_impl.h
@@ -68,16 +68,17 @@ class DataServiceWorkerImpl {
   };
 
   // Sends task status to the dispatcher and checks for dispatcher commands.
-  Status SendTaskUpdates() LOCKS_EXCLUDED(mu_);
+  Status SendTaskUpdates() TF_LOCKS_EXCLUDED(mu_);
   // Creates an iterator to process a task.
-  Status ProcessTaskInternal(const TaskDef& task) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+  Status ProcessTaskInternal(const TaskDef& task)
+      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   Status EnsureTaskInitialized(Task& task);
   // A thread for notifying the dispatcher when tasks complete.
-  void TaskCompletionThread() LOCKS_EXCLUDED(mu_);
+  void TaskCompletionThread() TF_LOCKS_EXCLUDED(mu_);
   // A thread for doing periodic heartbeats to the dispatcher.
-  void HeartbeatThread() LOCKS_EXCLUDED(mu_);
+  void HeartbeatThread() TF_LOCKS_EXCLUDED(mu_);
   // Performs a heartbeat to the dispatcher.
-  Status Heartbeat() LOCKS_EXCLUDED(mu_);
+  Status Heartbeat() TF_LOCKS_EXCLUDED(mu_);
 
   const experimental::WorkerConfig config_;
   // The worker's own address.
diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
index 626f0a3730f..47f0f588991 100644
--- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
+++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
@@ -375,7 +375,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
     }
 
-    void UpdateTasks() LOCKS_EXCLUDED(mu_) {
+    void UpdateTasks() TF_LOCKS_EXCLUDED(mu_) {
      VLOG(3) << "Updating tasks";
       std::vector<TaskInfo> tasks;
       bool job_finished;
@@ -431,7 +431,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
     }
 
-    void UpdateWorkerThreads(IteratorContext* ctx) LOCKS_EXCLUDED(mu_) {
+    void UpdateWorkerThreads(IteratorContext* ctx) TF_LOCKS_EXCLUDED(mu_) {
       mutex_lock l(mu_);
       while (num_running_worker_threads_ < max_outstanding_requests_) {
         num_running_worker_threads_++;
@@ -631,7 +631,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
 
     bool job_finished_ = false;
     std::vector<std::unique_ptr<Thread>> worker_threads_ TF_GUARDED_BY(mu_);
-    std::unique_ptr<Thread> task_thread_manager_ GUARDED_BY(mu_);
+    std::unique_ptr<Thread> task_thread_manager_ TF_GUARDED_BY(mu_);
   };
 
   const int64 dataset_id_;
diff --git a/tensorflow/core/kernels/data/split_utils.h b/tensorflow/core/kernels/data/split_utils.h
index 82fd4e8c0a4..ac0552b1995 100644
--- a/tensorflow/core/kernels/data/split_utils.h
+++ b/tensorflow/core/kernels/data/split_utils.h
@@ -35,7 +35,7 @@ class IndexSplitProvider : public SplitProvider {
 
  private:
   mutex mu_;
-  int64 i_ GUARDED_BY(mu_);
+  int64 i_ TF_GUARDED_BY(mu_);
   const int64 n_;
 };
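
Note (not part of the patch): the change migrates the tf.data service code from the un-prefixed Clang thread-safety macros to TensorFlow's TF_-prefixed wrappers declared in tensorflow/core/platform/thread_annotations.h, keeping the same -Wthread-safety semantics while avoiding collisions with identically named macros from other libraries. A minimal usage sketch follows; the Counter class and its members are hypothetical and only illustrate the three annotations touched by the diff.

#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

// Hypothetical class, not taken from the tf.data service code.
class Counter {
 public:
  // TF_LOCKS_EXCLUDED: callers must not already hold `mu_`, because the
  // method acquires it itself.
  void Increment() TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    IncrementLocked();
  }

 private:
  // TF_EXCLUSIVE_LOCKS_REQUIRED: `mu_` must already be held when this
  // helper runs.
  void IncrementLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ++value_; }

  mutex mu_;
  // TF_GUARDED_BY: `value_` is protected by `mu_`; Clang's thread-safety
  // analysis warns on unguarded access.
  int64 value_ TF_GUARDED_BY(mu_) = 0;
};

}  // namespace tensorflow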