Prefix thread annotations with ABSL_.

PiperOrigin-RevId: 342696677
Change-Id: I8923e0d6aae1a29d8674e1196339e8e224baf25c
Andrew Audibert 2020-11-16 12:36:45 -08:00 committed by TensorFlower Gardener
parent 655efe4f8a
commit 1d1ad47792
9 changed files with 54 additions and 56 deletions
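For context: GUARDED_BY, EXCLUSIVE_LOCKS_REQUIRED, and LOCKS_EXCLUDED are Clang thread-safety-analysis annotations; absl/base/thread_annotations.h provides the ABSL_-prefixed spellings, and the unprefixed spellings are the older, unnamespaced forms of the same macros. A minimal sketch of how the annotations touched by this diff are meant to be used (the Counter class below is illustrative only, not part of this commit):

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Counter {
 public:
  // Acquires mu_ itself, so callers must not already hold it.
  void Increment() ABSL_LOCKS_EXCLUDED(mu_) {
    absl::MutexLock l(&mu_);
    IncrementLocked();
  }

 private:
  // Assumes the caller already holds mu_ and never locks it itself.
  void IncrementLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ++count_; }

  absl::Mutex mu_;
  int count_ ABSL_GUARDED_BY(mu_) = 0;  // All access requires holding mu_.
};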

View File

@@ -561,7 +561,7 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
// CompositeDevice.
// TODO(b/145922293): Consider taking device names as keys.
absl::flat_hash_map<uint64, std::unique_ptr<CompositeDevice>>
-composite_devices_ ABSL_GUARDED_BY(composite_devices_mu_);
+composite_devices_ GUARDED_BY(composite_devices_mu_);
FunctionLibraryDefinition func_lib_def_{OpRegistry::Global(), {}};

View File

@@ -186,7 +186,7 @@ Status DataServiceDispatcherImpl::Start() {
Status DataServiceDispatcherImpl::RestoreSplitProvider(
const Job& job, std::unique_ptr<SplitProvider>& restored)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
int64 index = job.distributed_epoch_state.value().split_provider_index;
VLOG(1) << "Restoring split provider for job " << job.job_id << " to index "
<< index;
@@ -341,7 +341,7 @@ Status DataServiceDispatcherImpl::GetSplit(const GetSplitRequest* request,
Status DataServiceDispatcherImpl::MakeSplitProvider(
int64 dataset_id, std::unique_ptr<SplitProvider>& split_provider)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::shared_ptr<const Dataset> dataset;
TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset));
std::shared_ptr<const DatasetDef> dataset_def;
@@ -393,7 +393,7 @@ Status DataServiceDispatcherImpl::GetOrRegisterDataset(
Status DataServiceDispatcherImpl::RegisterDataset(uint64 fingerprint,
const DatasetDef& dataset,
int64& dataset_id)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
dataset_id = state_.NextAvailableDatasetId();
Update update;
RegisterDatasetUpdate* register_dataset = update.mutable_register_dataset();
@@ -487,7 +487,7 @@ Status DataServiceDispatcherImpl::ReleaseJobClient(
// Validates that the job matches the given processing_mode and dataset_id.
Status DataServiceDispatcherImpl::ValidateMatchingJob(
std::shared_ptr<const Job> job, ProcessingMode processing_mode,
-int64 dataset_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+int64 dataset_id) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
DCHECK(job->named_job_key.has_value());
std::string job_name = job->named_job_key->name;
if (job->processing_mode != processing_mode) {
@@ -506,7 +506,7 @@ Status DataServiceDispatcherImpl::ValidateMatchingJob(
Status DataServiceDispatcherImpl::CreateJob(
int64 dataset_id, ProcessingMode processing_mode,
absl::optional<NamedJobKey> named_job_key, std::shared_ptr<const Job>& job)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
switch (processing_mode) {
case ProcessingMode::PARALLEL_EPOCHS:
case ProcessingMode::DISTRIBUTED_EPOCH:
@@ -535,7 +535,7 @@ Status DataServiceDispatcherImpl::CreateJob(
}
Status DataServiceDispatcherImpl::CreateTasksForWorker(
-const std::string& worker_address) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+const std::string& worker_address) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::vector<std::shared_ptr<const Job>> jobs = state_.ListJobs();
for (const auto& job : jobs) {
if (job->finished) {
@@ -549,7 +549,7 @@ Status DataServiceDispatcherImpl::CreateTasksForWorker(
Status DataServiceDispatcherImpl::AcquireJobClientId(
const std::shared_ptr<const Job>& job, int64& job_client_id)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
job_client_id = state_.NextAvailableJobClientId();
Update update;
AcquireJobClientUpdate* acquire_job_client =
@@ -563,7 +563,7 @@ Status DataServiceDispatcherImpl::AcquireJobClientId(
Status DataServiceDispatcherImpl::CreateTasksForJob(
std::shared_ptr<const Job> job,
std::vector<std::shared_ptr<const Task>>& tasks)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::vector<std::shared_ptr<const Worker>> workers = state_.ListWorkers();
tasks.clear();
tasks.reserve(workers.size());
@@ -578,7 +578,7 @@ Status DataServiceDispatcherImpl::CreateTasksForJob(
Status DataServiceDispatcherImpl::CreateTask(std::shared_ptr<const Job> job,
const std::string& worker_address,
std::shared_ptr<const Task>& task)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
int64 task_id = state_.NextAvailableTaskId();
Update update;
CreateTaskUpdate* create_task = update.mutable_create_task();
@@ -593,7 +593,7 @@ Status DataServiceDispatcherImpl::CreateTask(std::shared_ptr<const Job> job,
}
Status DataServiceDispatcherImpl::AssignTasks(
-std::vector<std::shared_ptr<const Task>> tasks) ABSL_LOCKS_EXCLUDED(mu_) {
+std::vector<std::shared_ptr<const Task>> tasks) LOCKS_EXCLUDED(mu_) {
for (const auto& task : tasks) {
TF_RETURN_IF_ERROR(AssignTask(task));
}
@@ -602,7 +602,7 @@ Status DataServiceDispatcherImpl::AssignTasks(
Status DataServiceDispatcherImpl::GetOrCreateWorkerStub(
const std::string& worker_address, WorkerService::Stub*& out_stub)
-ABSL_LOCKS_EXCLUDED(mu_) {
+LOCKS_EXCLUDED(mu_) {
{
mutex_lock l(mu_);
auto it = worker_stubs_.find(worker_address);
@@ -627,7 +627,7 @@ Status DataServiceDispatcherImpl::GetOrCreateWorkerStub(
}
Status DataServiceDispatcherImpl::AssignTask(std::shared_ptr<const Task> task)
-ABSL_LOCKS_EXCLUDED(mu_) {
+LOCKS_EXCLUDED(mu_) {
VLOG(2) << "Started assigning task " << task->task_id << " to worker "
<< task->worker_address;
grpc::ClientContext client_ctx;
@@ -710,7 +710,7 @@ Status DataServiceDispatcherImpl::GetWorkers(const GetWorkersRequest* request,
return Status::OK();
}
-Status DataServiceDispatcherImpl::CheckStarted() ABSL_LOCKS_EXCLUDED(mu_) {
+Status DataServiceDispatcherImpl::CheckStarted() LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
if (!started_) {
return errors::Unavailable("Dispatcher has not started yet.");
@@ -721,7 +721,7 @@ Status DataServiceDispatcherImpl::CheckStarted() ABSL_LOCKS_EXCLUDED(mu_) {
Status DataServiceDispatcherImpl::RecordSplitProduced(int64 job_id,
int64 repetition,
bool finished)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
Update update;
ProduceSplitUpdate* produce_split = update.mutable_produce_split();
produce_split->set_job_id(job_id);
@@ -731,12 +731,12 @@ Status DataServiceDispatcherImpl::RecordSplitProduced(int64 job_id,
}
Status DataServiceDispatcherImpl::ApplyWithoutJournaling(const Update& update)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return state_.Apply(update);
}
Status DataServiceDispatcherImpl::Apply(const Update& update)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (journal_writer_.has_value()) {
TF_RETURN_IF_ERROR(journal_writer_.value()->Write(update));
}
@@ -764,8 +764,7 @@ void DataServiceDispatcherImpl::JobGcThread() {
}
}
-Status DataServiceDispatcherImpl::GcOldJobs()
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+Status DataServiceDispatcherImpl::GcOldJobs() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::vector<std::shared_ptr<const Job>> jobs = state_.ListJobs();
int64 now = env_->NowMicros();
for (const auto& job : jobs) {
@@ -792,7 +791,7 @@ Status DataServiceDispatcherImpl::GcOldJobs()
Status DataServiceDispatcherImpl::GetDatasetDef(
int64 dataset_id, std::shared_ptr<const DatasetDef>& dataset_def)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::shared_ptr<const Dataset> dataset;
TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset));
return GetDatasetDef(*dataset, dataset_def);
@@ -800,7 +799,7 @@ Status DataServiceDispatcherImpl::GetDatasetDef(
Status DataServiceDispatcherImpl::GetDatasetDef(
const Dataset& dataset, std::shared_ptr<const DatasetDef>& dataset_def)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::string key = DatasetKey(dataset.dataset_id, dataset.fingerprint);
return dataset_store_->Get(key, dataset_def);
}
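The convention visible throughout the DataServiceDispatcherImpl hunks above: public entry points that acquire mu_ themselves are annotated LOCKS_EXCLUDED(mu_), while private helpers invoked under the lock are annotated EXCLUSIVE_LOCKS_REQUIRED(mu_), so the analysis can check every call site. A self-contained sketch of that pattern, written against absl::Mutex for brevity rather than TensorFlow's mutex/mutex_lock wrappers (the Dispatcher class and its members here are hypothetical):

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Dispatcher {
 public:
  // Entry points lock mu_ themselves; calling them with mu_ already held
  // would self-deadlock, hence LOCKS_EXCLUDED.
  void HandleRequest() ABSL_LOCKS_EXCLUDED(mu_) {
    absl::MutexLock l(&mu_);
    ApplyUpdate();  // OK: mu_ is held at this call site.
  }

 private:
  // Internal helpers assume the caller holds mu_ and never lock it again.
  void ApplyUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ++num_updates_; }

  absl::Mutex mu_;
  int num_updates_ ABSL_GUARDED_BY(mu_) = 0;
};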

View File

@@ -83,42 +83,42 @@ class DataServiceDispatcherImpl {
// `restored`.
Status RestoreSplitProvider(const DispatcherState::Job& job,
std::unique_ptr<SplitProvider>& restored)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Makes a split provider for the specified `dataset_id`, and stores it in
// `split_provider`.
Status MakeSplitProvider(int64 dataset_id,
std::unique_ptr<SplitProvider>& split_provider)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Registers a dataset with the given fingerprint, storing the new dataset's
// id in `dataset_id`.
Status RegisterDataset(uint64 fingerprint, const DatasetDef& dataset,
-int64& dataset_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+int64& dataset_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Gets a worker's stub from `worker_stubs_`, or if none exists, creates a
// stub and stores it in `worker_stubs_`. A borrowed pointer to the stub is
// stored in `out_stub`.
Status GetOrCreateWorkerStub(const std::string& worker_address,
WorkerService::Stub*& out_stub)
-ABSL_LOCKS_EXCLUDED(mu_);
+LOCKS_EXCLUDED(mu_);
// Creates a job and stores it in `job`. This method updates the
// dispatcher state with the new job, but does not assign tasks to workers.
Status CreateJob(int64 dataset_id, ProcessingMode processing_mode,
absl::optional<DispatcherState::NamedJobKey> named_job_key,
std::shared_ptr<const DispatcherState::Job>& job)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Creates tasks for the specified worker, one task for every unfinished job.
Status CreateTasksForWorker(const std::string& worker_address);
// Acquires a job client id to read from the given job and sets
// `job_client_id`.
Status AcquireJobClientId(
const std::shared_ptr<const DispatcherState::Job>& job,
-int64& job_client_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+int64& job_client_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Creates one task for each worker, for the given job. The created tasks are
// stored in `tasks`. This method only updates dispatcher metadata with the
// new tasks, but doesn't assign the tasks to the workers.
Status CreateTasksForJob(
std::shared_ptr<const DispatcherState::Job> job,
std::vector<std::shared_ptr<const DispatcherState::Task>>& tasks)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Creates a new task for a job, storing the created task in `task`.
Status CreateTask(std::shared_ptr<const DispatcherState::Job> job,
@@ -128,40 +128,40 @@ class DataServiceDispatcherImpl {
// `worker_address` fields.
Status AssignTasks(
std::vector<std::shared_ptr<const DispatcherState::Task>> tasks)
-ABSL_LOCKS_EXCLUDED(mu_);
+LOCKS_EXCLUDED(mu_);
// Assigns a task to the worker indicated by its `worker_address` field.
Status AssignTask(std::shared_ptr<const DispatcherState::Task> task)
-ABSL_LOCKS_EXCLUDED(mu_);
+LOCKS_EXCLUDED(mu_);
// Validates that an existing job matches the given processing_mode and
// dataset_id, returning an error status describing any difference.
Status ValidateMatchingJob(std::shared_ptr<const DispatcherState::Job> job,
ProcessingMode processing_mode, int64 dataset_id)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Checks that the dispatcher has started, returning UNAVAILABLE if it hasn't.
-Status CheckStarted() ABSL_LOCKS_EXCLUDED(mu_);
+Status CheckStarted() LOCKS_EXCLUDED(mu_);
// Records that a split was produced by a call to `GetSplit`.
Status RecordSplitProduced(int64 job_id, int64 repetition, bool finished)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Applies a state update, updating both the journal and the in-memory state.
-Status Apply(const Update& update) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+Status Apply(const Update& update) EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Applies a state update, but doesn't update the journal. Only meant to be
// used when recovering state when the dispatcher starts.
Status ApplyWithoutJournaling(const Update& update)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// A thread which periodically checks for jobs to clean up.
void JobGcThread();
// Scans for old jobs and marks them as finished.
-Status GcOldJobs() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+Status GcOldJobs() EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Gets a `DatasetDef` from `dataset_store_` for the given dataset id, and
// stores it in `dataset_def`.
Status GetDatasetDef(int64 dataset_id,
std::shared_ptr<const DatasetDef>& dataset_def)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Gets a `DatasetDef` from `dataset_store_` for the given dataset, and
// stores it in `dataset_def`.
Status GetDatasetDef(const DispatcherState::Dataset& dataset,
std::shared_ptr<const DatasetDef>& dataset_def)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+EXCLUSIVE_LOCKS_REQUIRED(mu_);
const experimental::DispatcherConfig& config_;
Env* env_;

View File

@@ -103,7 +103,7 @@ Status DataServiceWorkerImpl::ProcessTask(const ProcessTaskRequest* request,
}
Status DataServiceWorkerImpl::ProcessTaskInternal(const TaskDef& task_def)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::unique_ptr<Task>& task = tasks_[task_def.task_id()];
if (task) {
VLOG(1) << "Received request to process already-processed task "
@@ -245,7 +245,7 @@ Status DataServiceWorkerImpl::GetWorkerTasks(
return Status::OK();
}
-void DataServiceWorkerImpl::TaskCompletionThread() ABSL_LOCKS_EXCLUDED(mu_) {
+void DataServiceWorkerImpl::TaskCompletionThread() LOCKS_EXCLUDED(mu_) {
while (true) {
{
mutex_lock l(mu_);
@@ -269,7 +269,7 @@ void DataServiceWorkerImpl::TaskCompletionThread() ABSL_LOCKS_EXCLUDED(mu_) {
}
}
-Status DataServiceWorkerImpl::SendTaskUpdates() ABSL_LOCKS_EXCLUDED(mu_) {
+Status DataServiceWorkerImpl::SendTaskUpdates() LOCKS_EXCLUDED(mu_) {
std::vector<TaskProgress> task_progress;
{
mutex_lock l(mu_);
@@ -292,7 +292,7 @@ Status DataServiceWorkerImpl::SendTaskUpdates() ABSL_LOCKS_EXCLUDED(mu_) {
return Status::OK();
}
-void DataServiceWorkerImpl::HeartbeatThread() ABSL_LOCKS_EXCLUDED(mu_) {
+void DataServiceWorkerImpl::HeartbeatThread() LOCKS_EXCLUDED(mu_) {
while (true) {
int64 next_heartbeat_micros =
Env::Default()->NowMicros() + (config_.heartbeat_interval_ms() * 1000);
@@ -321,7 +321,7 @@ void DataServiceWorkerImpl::HeartbeatThread() ABSL_LOCKS_EXCLUDED(mu_) {
}
}
-Status DataServiceWorkerImpl::Heartbeat() ABSL_LOCKS_EXCLUDED(mu_) {
+Status DataServiceWorkerImpl::Heartbeat() LOCKS_EXCLUDED(mu_) {
std::vector<int64> current_tasks;
{
mutex_lock l(mu_);

View File

@@ -68,17 +68,16 @@ class DataServiceWorkerImpl {
};
// Sends task status to the dispatcher and checks for dispatcher commands.
-Status SendTaskUpdates() ABSL_LOCKS_EXCLUDED(mu_);
+Status SendTaskUpdates() LOCKS_EXCLUDED(mu_);
// Creates an iterator to process a task.
-Status ProcessTaskInternal(const TaskDef& task)
-ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+Status ProcessTaskInternal(const TaskDef& task) EXCLUSIVE_LOCKS_REQUIRED(mu_);
Status EnsureTaskInitialized(Task& task);
// A thread for notifying the dispatcher when tasks complete.
-void TaskCompletionThread() ABSL_LOCKS_EXCLUDED(mu_);
+void TaskCompletionThread() LOCKS_EXCLUDED(mu_);
// A thread for doing periodic heartbeats to the dispatcher.
-void HeartbeatThread() ABSL_LOCKS_EXCLUDED(mu_);
+void HeartbeatThread() LOCKS_EXCLUDED(mu_);
// Performs a heartbeat to the dispatcher.
-Status Heartbeat() ABSL_LOCKS_EXCLUDED(mu_);
+Status Heartbeat() LOCKS_EXCLUDED(mu_);
const experimental::WorkerConfig config_;
// The worker's own address.

View File

@@ -375,7 +375,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
}
}
-void UpdateTasks() ABSL_LOCKS_EXCLUDED(mu_) {
+void UpdateTasks() LOCKS_EXCLUDED(mu_) {
VLOG(3) << "Updating tasks";
std::vector<TaskInfo> tasks;
bool job_finished;
@@ -431,7 +431,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
}
}
-void UpdateWorkerThreads(IteratorContext* ctx) ABSL_LOCKS_EXCLUDED(mu_) {
+void UpdateWorkerThreads(IteratorContext* ctx) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
while (num_running_worker_threads_ < max_outstanding_requests_) {
num_running_worker_threads_++;
@@ -631,7 +631,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
bool job_finished_ = false;
std::vector<std::unique_ptr<Thread>> worker_threads_ TF_GUARDED_BY(mu_);
-std::unique_ptr<Thread> task_thread_manager_ ABSL_GUARDED_BY(mu_);
+std::unique_ptr<Thread> task_thread_manager_ GUARDED_BY(mu_);
};
const int64 dataset_id_;

View File

@@ -35,7 +35,7 @@ class IndexSplitProvider : public SplitProvider {
private:
mutex mu_;
-int64 i_ ABSL_GUARDED_BY(mu_);
+int64 i_ GUARDED_BY(mu_);
const int64 n_;
};

View File

@@ -150,7 +150,7 @@ class IfOp : public AsyncOpKernel {
mutex mu_;
std::unordered_map<FunctionLibraryRuntime*, std::pair<FHandle, FHandle>>
-handles_ ABSL_GUARDED_BY(mu_);
+handles_ GUARDED_BY(mu_);
class State {
public:
@@ -395,7 +395,7 @@ class WhileOp : public AsyncOpKernel {
mutex mu_;
std::unordered_map<FunctionLibraryRuntime*, std::pair<FHandle, FHandle>>
-handles_ ABSL_GUARDED_BY(mu_);
+handles_ GUARDED_BY(mu_);
static Status CondResultToBool(OpKernelContext* ctx,
const FunctionLibraryRuntime::Options& opts,

View File

@@ -126,7 +126,7 @@ class ProfilerServiceImpl : public grpc::ProfilerService::Service {
mutex mutex_;
absl::flat_hash_map<std::string, bool> stop_signals_per_session_
-ABSL_GUARDED_BY(mutex_);
+GUARDED_BY(mutex_);
};
} // namespace
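A final practical note, as a sketch under the assumption that Clang is the compiler: whichever spelling is used, these annotations expand to nothing on their own and only pay off when the translation unit is built with -Wthread-safety, at which point an unguarded access is diagnosed at compile time instead of surfacing as a data race. Illustrative only (example.cc and the names below are hypothetical):

// Build (illustrative): clang++ -std=c++17 -Wthread-safety -c example.cc
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

absl::Mutex mu;
int counter ABSL_GUARDED_BY(mu) = 0;

void Bad() {
  ++counter;  // Diagnosed: writing 'counter' requires holding mutex 'mu'.
}

void Good() ABSL_LOCKS_EXCLUDED(mu) {
  absl::MutexLock l(&mu);
  ++counter;  // OK: mu is held here.
}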