Rename WorkerCacheInterface::CreateWorker to GetOrCreateWorker().

Change-Id: I7d3a0c40f7a578a6eef97f82bad13bb6ac2f9cbc
This commit is contained in:
jiakai 2019-05-23 17:54:08 +08:00
parent 592fa18bc1
commit fda82aa165
17 changed files with 23 additions and 29 deletions

View File

@ -142,7 +142,7 @@ class GdrRemoteRendezvous : public BaseRemoteRendezvous {
}
WorkerSession* sess = session();
WorkerInterface* rwi = sess->worker_cache->CreateWorker(src_worker);
WorkerInterface* rwi = sess->worker_cache->GetOrCreateWorker(src_worker);
if (rwi == nullptr) {
Status s = errors::Internal("No worker known as ", src_worker);
done(s, Args(), recv_args, Tensor{}, false);

View File

@ -32,7 +32,7 @@ class CancellableCall {
: cancel_mgr_(cancel_mgr),
remote_worker_(remote_worker),
wc_(wc),
wi_(wc_->CreateWorker(remote_worker_)) {}
wi_(wc_->GetOrCreateWorker(remote_worker_)) {}
virtual ~CancellableCall() { wc_->ReleaseWorker(remote_worker_, wi_); }

View File

@ -127,7 +127,7 @@ Status ClusterFunctionLibraryRuntime::Instantiate(
VLOG(1) << "CFLR::Instantiate: " << function_name << " on " << options.target
<< " (this: " << this << ")";
WorkerInterface* wi =
worker_session_->worker_cache->CreateWorker(options.target);
worker_session_->worker_cache->GetOrCreateWorker(options.target);
if (wi == nullptr) {
std::vector<string> workers;

View File

@ -97,7 +97,7 @@ void DeviceResolverDistributed::RefreshRemoteAttributes(
const string& device, const string& task, const StatusCallback& done) {
GetStatusRequest* req = new GetStatusRequest;
GetStatusResponse* resp = new GetStatusResponse;
WorkerInterface* worker = worker_cache_->CreateWorker(task);
WorkerInterface* worker = worker_cache_->GetOrCreateWorker(task);
CHECK(worker) << "Failed to get worker for " << task;
worker->GetStatusAsync(
req, resp, [this, device, task, req, resp, worker, done](Status s) {

View File

@ -629,7 +629,7 @@ void Master::CleanupWorkers(const ResetRequest& reset) {
int c = 0;
for (int i = 0; i < num_workers; ++i) {
const string& worker_name = worker_names[i];
auto worker = env_->worker_cache->CreateWorker(worker_name);
auto worker = env_->worker_cache->GetOrCreateWorker(worker_name);
if (worker) {
worker->CleanupAllAsync(
&req, &resp[i], [this, &n, worker_name, worker, c](Status s) {

View File

@ -444,7 +444,7 @@ Status MasterSession::ReffedClientGraph::DoRegisterPartitions(
Part* part = &partitions_.back();
part->name = name_def.first;
TrackFeedsAndFetches(part, name_def.second, popts);
part->worker = worker_cache_->CreateWorker(part->name);
part->worker = worker_cache_->GetOrCreateWorker(part->name);
if (part->worker == nullptr) {
s = errors::NotFound("worker ", part->name);
break;
@ -1279,7 +1279,7 @@ Status MasterSession::CreateWorkerSessions(
// Create all the workers & kick off the computations.
for (size_t i = 0; i < worker_names.size(); ++i) {
workers[i].name = &worker_names[i];
workers[i].worker = worker_cache->CreateWorker(worker_names[i]);
workers[i].worker = worker_cache->GetOrCreateWorker(worker_names[i]);
workers[i].request.set_session_handle(handle_);
if (session_opts_.config.experimental()
.share_cluster_devices_in_session()) {
@ -1377,7 +1377,7 @@ Status MasterSession::DeleteWorkerSessions() {
// Create all the workers & kick off the computations.
for (size_t i = 0; i < worker_names.size(); ++i) {
workers[i].name = &worker_names[i];
workers[i].worker = worker_cache->CreateWorker(worker_names[i]);
workers[i].worker = worker_cache->GetOrCreateWorker(worker_names[i]);
workers[i].request.set_session_handle(handle_);
// Since the worker may have gone away, set a timeout to avoid blocking the
// session-close operation.

View File

@ -66,7 +66,7 @@ void AsRemoteDevices(
void NewRemoteDevices(Env* env, WorkerCacheInterface* worker_cache,
const string& worker_name, NewRemoteDevicesDone done) {
WorkerInterface* wi = worker_cache->CreateWorker(worker_name);
WorkerInterface* wi = worker_cache->GetOrCreateWorker(worker_name);
if (wi == nullptr) {
std::vector<Device*> empty;
done(errors::NotFound("Device ", worker_name, " is not found."), &empty);

View File

@ -53,7 +53,7 @@ class RemoteDeviceTest : public ::testing::Test {
NewGrpcChannelCache(spec, channel_func));
worker_cache_.reset(NewGrpcWorkerCache(channel_cache));
remote_name_ = "/job:localhost/replica:0/task:0";
wi_ = worker_cache_->CreateWorker(remote_name_);
wi_ = worker_cache_->GetOrCreateWorker(remote_name_);
}
~RemoteDeviceTest() override {

View File

@ -69,7 +69,7 @@ class GrpcWorkerCache : public WorkerCachePartial {
channel_cache_->ListWorkersInJob(job_name, workers);
}
WorkerInterface* CreateWorker(const string& target) override {
WorkerInterface* GetOrCreateWorker(const string& target) override {
if (target == local_target_) {
return local_worker_;
} else {

View File

@ -232,7 +232,7 @@ void RpcRemoteRendezvous::RecvFromRemoteAsync(
// The worker will be released in a subsequent call to
// `sess->worker_cache->ReleaseWorker()` (if the call has not yet been
// initialized) or `call->ReleaseWorker()` (if it has been initialized).
WorkerInterface* rwi = sess->worker_cache->CreateWorker(call->src_worker_);
WorkerInterface* rwi = sess->worker_cache->GetOrCreateWorker(call->src_worker_);
if (s.ok() && rwi == nullptr) {
s = errors::Internal("No worker known as ", call->src_worker_);
}

View File

@ -52,7 +52,7 @@ class DummyWorkerCache : public WorkerCacheInterface {
void ListWorkers(std::vector<string>* workers) const override {}
void ListWorkersInJob(const string& job_name,
std::vector<string>* workers) const override {}
WorkerInterface* CreateWorker(const string& target) override {
WorkerInterface* GetOrCreateWorker(const string& target) override {
return nullptr;
}
bool GetDeviceLocalityNonBlocking(const string& device,

View File

@ -80,7 +80,7 @@ void RpcCollectiveExecutorMgr::RefreshStepIdSequenceAsync(
gks->next_step_id_ = NewRandomStepId();
done(Status::OK());
} else {
WorkerInterface* wi = worker_cache_->CreateWorker(group_leader_);
WorkerInterface* wi = worker_cache_->GetOrCreateWorker(group_leader_);
GetStepSequenceRequest* req = new GetStepSequenceRequest;
GetStepSequenceResponse* resp = new GetStepSequenceResponse;
req->add_graph_key(graph_key);

View File

@ -152,7 +152,7 @@ class TestWorkerCache : public WorkerCacheInterface {
}
}
WorkerInterface* CreateWorker(const string& target) override {
WorkerInterface* GetOrCreateWorker(const string& target) override {
auto it = workers_.find(target);
if (it != workers_.end()) {
return it->second;

View File

@ -43,12 +43,9 @@ class WorkerCacheInterface {
// or can be constructed, returns a pointer to a WorkerInterface object
// wrapping that channel. The returned value must be destroyed by
// calling `this->ReleaseWorker(target, ret)`
// TODO(mrry): rename this to GetOrCreateWorker() or something that
// makes it more obvious that this method returns a potentially
// shared object.
virtual WorkerInterface* CreateWorker(const string& target) = 0;
virtual WorkerInterface* GetOrCreateWorker(const string& target) = 0;
// Release a worker previously returned by this->CreateWorker(target).
// Release a worker previously returned by this->GetOrCreateWorker(target).
//
// TODO(jeff,sanjay): Consider moving target into WorkerInterface.
// TODO(jeff,sanjay): Unify all worker-cache impls and factor out a

View File

@ -65,7 +65,7 @@ Status WorkerCachePartial::RefreshDeviceStatus(const string& device_name) {
auto deleter = [this, &task](WorkerInterface* wi) {
ReleaseWorker(task, wi);
};
std::unique_ptr<WorkerInterface, decltype(deleter)> rwi(CreateWorker(task),
std::unique_ptr<WorkerInterface, decltype(deleter)> rwi(GetOrCreateWorker(task),
deleter);
if (s.ok() && !rwi) {
s = errors::Internal("RefreshDeviceStatus, unknown worker task: ", task);

View File

@ -41,14 +41,11 @@ class WorkerCacheWrapper : public WorkerCacheInterface {
// or can be constructed, returns a pointer to a WorkerInterface object
// wrapping that channel. The returned value must be destroyed by
// calling `this->ReleaseWorker(target, ret)`
// TODO(mrry): rename this to GetOrCreateWorker() or something that
// makes it more obvious that this method returns a potentially
// shared object.
virtual WorkerInterface* CreateWorker(const string& target) {
return wrapped_->CreateWorker(target);
virtual WorkerInterface* GetOrCreateWorker(const string& target) {
return wrapped_->GetOrCreateWorker(target);
}
// Release a worker previously returned by this->CreateWorker(target).
// Release a worker previously returned by this->GetOrCreateWorker(target).
//
// TODO(jeff,sanjay): Consider moving target into WorkerInterface.
// TODO(jeff,sanjay): Unify all worker-cache impls and factor out a

View File

@ -42,14 +42,14 @@ class WorkerFreeListCache : public WorkerCacheInterface {
wrapped_->ListWorkersInJob(job_name, workers);
}
WorkerInterface* CreateWorker(const string& target) override {
WorkerInterface* GetOrCreateWorker(const string& target) override {
mutex_lock l(mu_);
auto p = workers_.find(target);
if (p != workers_.end()) {
return p->second.worker;
}
WorkerState state;
state.worker = wrapped_->CreateWorker(target);
state.worker = wrapped_->GetOrCreateWorker(target);
if (state.worker != nullptr) {
workers_.insert(std::make_pair(target, state));
}