Remove experimental mirroring policy APIs.
Mirroring has been enabled by default for a long time, and all of the runtime code that inspected the flag has long since been removed. This change just cleans up the leftover API surface.

PiperOrigin-RevId: 328421064
Change-Id: I193b67f1f12f96ebb8b6822a780d88f4b82e77af
Parent: a8a19d7f51
Commit: c74b95e771
Changed files:
  tensorflow/c/eager/
  tensorflow/c/experimental/saved_model/core/
  tensorflow/core/common_runtime/eager/
    context.cc, context.h, context_test.cc, eager_op_rewrite_registry_test.cc,
    eager_operation_test.cc, execute_node_test.cc, mkl_eager_op_rewrite_test.cc,
    placement_test.cc, tensor_handle_test.cc
  tensorflow/core/distributed_runtime/eager/
  tensorflow/lite/delegates/flex/
  tensorflow/python/
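Before the hunks, a minimal orientation sketch. It is assembled from the call sites in the diff below and is not a complete program: MakeContextBefore and MakeContextAfter are hypothetical helper names, and device_mgr stands for whatever DeviceMgr the caller already owns. The only mechanical change at direct EagerContext construction sites is that the ContextMirroringPolicy argument disappears.

    // Hedged sketch only; parameter names and order are taken from the diff below.
    #include "tensorflow/core/common_runtime/eager/context.h"

    // Pre-change call shape (only compiles against the tree before this commit).
    tensorflow::EagerContext* MakeContextBefore(const tensorflow::DeviceMgr* device_mgr) {
      return new tensorflow::EagerContext(
          tensorflow::SessionOptions(),
          tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
          tensorflow::ContextMirroringPolicy::MIRRORING_NONE,  // argument removed below
          /*async=*/false, /*lazy_copy_function_remote_inputs=*/false,
          device_mgr, /*device_mgr_owned=*/false, /*rendezvous=*/nullptr,
          /*custom_kernel_creator=*/nullptr);
    }

    // Post-change call shape: identical except the mirroring argument is dropped.
    tensorflow::EagerContext* MakeContextAfter(const tensorflow::DeviceMgr* device_mgr) {
      return new tensorflow::EagerContext(
          tensorflow::SessionOptions(),
          tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
          /*async=*/false, /*lazy_copy_function_remote_inputs=*/false,
          device_mgr, /*device_mgr_owned=*/false, /*rendezvous=*/nullptr,
          /*custom_kernel_creator=*/nullptr);
    }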
@@ -745,7 +745,6 @@ TFE_Context* TFE_NewContext(const TFE_ContextOptions* opts, TF_Status* status) {
       opts->session_options.options,
       static_cast<tensorflow::ContextDevicePlacementPolicy>(
           opts->device_placement_policy),
-      static_cast<tensorflow::ContextMirroringPolicy>(opts->mirroring_policy),
       opts->async, opts->lazy_remote_inputs_copy, device_mgr.release(),
       /*device_mgr_owned*/ true, r,
       tensorflow::GetDefaultCustomKernelCreator()));
@@ -486,29 +486,6 @@ TFE_MonitoringSamplerCell* TFE_MonitoringGetCellSampler2(
       static_cast<void*>(sampler->sampler->GetCell(label1, label2)));
 }
 
-void TFE_ContextOptionsSetMirroringPolicy(TFE_ContextOptions* options,
-                                          TFE_ContextMirroringPolicy policy) {
-  options->mirroring_policy = policy;
-}
-
-void TFE_ContextSetThreadLocalMirroringPolicy(
-    TFE_Context* ctx, TFE_ContextMirroringPolicy policy) {
-  tensorflow::EagerContext* context =
-      tensorflow::ContextFromInterface(tensorflow::unwrap(ctx));
-  context->SetThreadLocalMirroringPolicy(
-      static_cast<tensorflow::ContextMirroringPolicy>(policy));
-}
-
-// Note: this function looks up a thread local policy. So it should be called in
-// the appropriate client thread. In particular, in async mode, it may not be
-// safe to call this function from the async EagerExecutor threads.
-extern TFE_ContextMirroringPolicy TFE_ContextGetMirroringPolicy(
-    TFE_Context* ctx) {
-  tensorflow::EagerContext* context =
-      tensorflow::ContextFromInterface(tensorflow::unwrap(ctx));
-  return static_cast<TFE_ContextMirroringPolicy>(context->GetMirroringPolicy());
-}
-
 void TFE_ContextOptionsSetLazyRemoteInputsCopy(TFE_ContextOptions* options,
                                                bool lazy_copy) {
   options->lazy_remote_inputs_copy = lazy_copy;
@@ -265,33 +265,6 @@ TF_CAPI_EXPORT extern void TFE_MonitoringDeleteSampler2(
 TF_CAPI_EXPORT extern TFE_MonitoringSamplerCell* TFE_MonitoringGetCellSampler2(
     TFE_MonitoringSampler2* sampler, const char* label1, const char* label2);
 
-// LINT.IfChange
-// Note: Keep in sync with internal copy of enum in eager/context.h.
-typedef enum TFE_ContextMirroringPolicy {
-  // Do not maintain mirrors in a TensorHandle, instead make new TensorHandle
-  // copies with their own lifetime.
-  TFE_MIRRORING_NONE = 0,
-  // Mirroring any remote tensor handles, associating them with the lifetime of
-  // the local TensorHandle.
-  TFE_MIRRORING_ALL = 1,
-} TFE_ContextMirroringPolicy;
-// LINT.ThenChange(//tensorflow/core/common_runtime/eager/context.h)
-
-TF_CAPI_EXPORT extern void TFE_ContextOptionsSetMirroringPolicy(
-    TFE_ContextOptions*, TFE_ContextMirroringPolicy);
-
-// Sets a thread-local mirroring policy. After this call, other calls to
-// TFE_Execute in the same thread will use the mirroring policy specified here
-// instead of the mirroring policy used to construct the context. This has no
-// effect on the mirroring policy used by other program threads.
-TF_CAPI_EXPORT extern void TFE_ContextSetThreadLocalMirroringPolicy(
-    TFE_Context*, TFE_ContextMirroringPolicy);
-
-// Returns the mirroring policy to be used by this context in the current
-// thread.
-TF_CAPI_EXPORT extern TFE_ContextMirroringPolicy TFE_ContextGetMirroringPolicy(
-    TFE_Context*);
-
 // Sets whether to copy the remote inputs of a function lazily.
 TF_CAPI_EXPORT extern void TFE_ContextOptionsSetLazyRemoteInputsCopy(
     TFE_ContextOptions*, bool lazy_copy);
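For context, this is roughly how a client used the experimental surface that the header above loses. A hedged sketch only: the three mirroring-policy calls are the APIs removed by this commit, while TFE_NewContextOptions, TFE_NewContext, TFE_DeleteContextOptions, TFE_DeleteContext and the TF_Status helpers are assumed from the stable eager C API in tensorflow/c/eager/c_api.h.

    #include "tensorflow/c/eager/c_api.h"
    #include "tensorflow/c/eager/c_api_experimental.h"

    void ConfigureMirroring(void) {
      TF_Status* status = TF_NewStatus();
      TFE_ContextOptions* opts = TFE_NewContextOptions();
      // Context-wide default (removed API).
      TFE_ContextOptionsSetMirroringPolicy(opts, TFE_MIRRORING_ALL);
      TFE_Context* ctx = TFE_NewContext(opts, status);
      TFE_DeleteContextOptions(opts);

      // Override the default for the calling thread only (removed API).
      TFE_ContextSetThreadLocalMirroringPolicy(ctx, TFE_MIRRORING_NONE);
      // Read back the effective policy for this thread (removed API).
      TFE_ContextMirroringPolicy current = TFE_ContextGetMirroringPolicy(ctx);
      (void)current;

      TFE_DeleteContext(ctx);
      TF_DeleteStatus(status);
    }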
@@ -32,7 +32,6 @@ struct TFE_ContextOptions {
   bool async = false;
   TFE_ContextDevicePlacementPolicy device_placement_policy{
       TFE_DEVICE_PLACEMENT_SILENT};
-  TFE_ContextMirroringPolicy mirroring_policy{TFE_MIRRORING_NONE};
   // If true, lazily copy the remote inputs of a function to the target devices.
   bool lazy_remote_inputs_copy = true;
   // If true, use TFRT backend
@@ -45,7 +45,6 @@ EagerContextPtr CreateTestingEagerContext(DeviceMgr* device_mgr) {
   return EagerContextPtr(new EagerContext(
       SessionOptions(),
       tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE,
       /* async= */ false,
       /* lazy_copy_function_remote_inputs= */ false, device_mgr,
       /* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
@@ -72,8 +72,7 @@ auto* eager_context_created =
 
 EagerContext::EagerContext(
     const SessionOptions& opts,
-    ContextDevicePlacementPolicy default_device_placement_policy,
-    ContextMirroringPolicy default_mirroring_policy, bool async,
+    ContextDevicePlacementPolicy default_device_placement_policy, bool async,
     const bool lazy_copy_function_remote_inputs, const DeviceMgr* device_mgr,
     bool device_mgr_owned, Rendezvous* rendezvous,
     const CustomKernelCreator* custom_kernel_creator,
@@ -81,7 +80,6 @@ EagerContext::EagerContext(
     : ImmediateExecutionContext(kEager),
       opts_(opts),
       default_device_placement_policy_(default_device_placement_policy),
-      default_mirroring_policy_(default_mirroring_policy),
       local_device_manager_(device_mgr, device_mgr_owned),
       host_cpu_device_(device_mgr->HostCPU()),
       rendezvous_(rendezvous),
@@ -403,25 +401,6 @@ ContextDevicePlacementPolicy EagerContext::GetDevicePlacementPolicy() const {
   return default_device_placement_policy_;
 }
 
-void EagerContext::SetThreadLocalMirroringPolicy(
-    ContextMirroringPolicy policy) {
-  mutex_lock ml(policy_map_mu_);
-  mirroring_policy_[std::this_thread::get_id()] = policy;
-}
-
-ContextMirroringPolicy EagerContext::GetMirroringPolicy() const {
-  tf_shared_lock l(policy_map_mu_);
-  auto policy_map_it = mirroring_policy_.find(std::this_thread::get_id());
-  if (policy_map_it != mirroring_policy_.end()) {
-    return policy_map_it->second;
-  }
-  return default_mirroring_policy_;
-}
-
-bool EagerContext::MirrorTensors() const {
-  return GetMirroringPolicy() == MIRRORING_ALL;
-}
-
 bool EagerContext::LazyCopyFunctionRemoteInputs() const {
   return lazy_copy_function_remote_inputs_;
 }
@@ -94,18 +94,6 @@ enum ContextDevicePlacementPolicy {
 };
 // LINT.ThenChange(//tensorflow/c/eager/c_api.h)
 
-// LINT.IfChange
-// Note: Keep in sync with exported copy of enum in eager/c_api_experimental.h.
-enum ContextMirroringPolicy {
-  // Do not maintain mirrors in a TensorHandle, instead make new TensorHandle
-  // copies with their own lifetime.
-  MIRRORING_NONE = 0,
-  // Mirroring any remote tensor handles, associating them with the lifetime of
-  // the local TensorHandle.
-  MIRRORING_ALL = 1,
-};
-// LINT.ThenChange(//tensorflow/c/eager/c_api_experimental.h)
-
 class RunMetadataListener {
  public:
   virtual ~RunMetadataListener() {}
@@ -149,8 +137,7 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
 
   EagerContext(const SessionOptions& opts,
                ContextDevicePlacementPolicy default_device_placement_policy,
-               ContextMirroringPolicy default_mirroring_policy, bool async,
-               const bool lazy_copy_function_remote_inputs,
+               bool async, const bool lazy_copy_function_remote_inputs,
                const DeviceMgr* device_mgr, bool device_mgr_owned,
                Rendezvous* rendezvous,
                const CustomKernelCreator* custom_kernel_creator,
@@ -234,14 +221,6 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
   Status SelectDevice(DeviceNameUtils::ParsedName preferred,
                       const NodeDef& ndef, Device** out) const;
 
-  // Sets the implicit copy policy for the current thread.
-  void SetThreadLocalMirroringPolicy(ContextMirroringPolicy);
-
-  // Returns the implicit copy policy for the current thread.
-  ContextMirroringPolicy GetMirroringPolicy() const;
-
-  bool MirrorTensors() const;
-
   bool LazyCopyFunctionRemoteInputs() const;
 
   bool FindFunctionByName(const string& name) const;
@@ -557,15 +536,12 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
 
   SessionOptions opts_;
   const ContextDevicePlacementPolicy default_device_placement_policy_;
-  const ContextMirroringPolicy default_mirroring_policy_;
 
   // Note: we cannot use C++11 thread_local here as there is no concept of a
   // thread-local-object-local variable in C++11.
   mutable mutex policy_map_mu_;
   std::unordered_map<std::thread::id, ContextDevicePlacementPolicy>
       device_placement_policy_ TF_GUARDED_BY(policy_map_mu_);
-  std::unordered_map<std::thread::id, ContextMirroringPolicy> mirroring_policy_
-      TF_GUARDED_BY(policy_map_mu_);
 
   OwnedOrUnownedHelper<const DeviceMgr> local_device_manager_;
   // Maintain copy of all previously created local device managers.
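The comment retained above ("we cannot use C++11 thread_local ...") is why both policies live in maps keyed by thread id: a thread_local member would be per thread but shared across every EagerContext instance, not per object and per thread. A standalone illustration of that pattern, not TensorFlow code:

    #include <mutex>
    #include <thread>
    #include <unordered_map>

    enum Policy { kDefault = 0, kOverride = 1 };

    // Per-object AND per-thread setting: each instance keeps its own map
    // keyed by std::this_thread::get_id(), guarded by a mutex.
    class PerThreadSetting {
     public:
      explicit PerThreadSetting(Policy default_policy) : default_(default_policy) {}

      void SetForCurrentThread(Policy p) {
        std::lock_guard<std::mutex> lock(mu_);
        per_thread_[std::this_thread::get_id()] = p;
      }

      Policy Get() const {
        std::lock_guard<std::mutex> lock(mu_);
        auto it = per_thread_.find(std::this_thread::get_id());
        // Fall back to the object-wide default when this thread never set one.
        return it != per_thread_.end() ? it->second : default_;
      }

     private:
      const Policy default_;
      mutable std::mutex mu_;
      std::unordered_map<std::thread::id, Policy> per_thread_;
    };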
@@ -56,7 +56,6 @@ class EagerContextTest : public ::testing::Test {
     InitDeviceManager();
     context_ = new EagerContext(
         opts, policy,
-        /* default_mirroring_policy */ MIRRORING_NONE,
         /* async */ false,
         /* lazy_copy_function_remote_inputs */ false, device_manager_,
         /* device_mgr_owned */ false, /* rendezvous */ nullptr,
@@ -47,9 +47,8 @@ TEST(EagerOpRewriteRegistryTest, RegisterRewritePass) {
       "CPU", {}, "/job:localhost/replica:0/task:0/device:CPU:0"));
   tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &device_mgr, false, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &device_mgr, false, nullptr, nullptr);
   EagerOperation orig_op(ctx);
   std::unique_ptr<tensorflow::EagerOperation> out_op;
   EXPECT_EQ(Status::OK(),
@@ -27,9 +27,8 @@ TEST(EagerOperationTest, DeviceName) {
       "CPU", {}, "/job:localhost/replica:0/task:0/device:CPU:0"));
   auto ctx = new EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &device_mgr, false, nullptr, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &device_mgr, false, nullptr, nullptr, nullptr);
 
   auto op = new EagerOperation(ctx);
 
@@ -67,9 +67,8 @@ TEST(ExecuteNodeTest, ExecuteNodeArgs) {
 
   auto ctx = new EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &device_mgr, false, nullptr, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &device_mgr, false, nullptr, nullptr, nullptr);
 
   // Set a RemoteMgr to the EagerContext.
   auto remote_mgr = absl::make_unique<eager::RemoteMgr>(
@@ -40,8 +40,7 @@ class EagerOpRewriteTest : public ::testing::Test {
     tensorflow::EagerContext* eager_ctx = new tensorflow::EagerContext(
         SessionOptions(),
         tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-        tensorflow::ContextMirroringPolicy::MIRRORING_NONE, async,
-        lazy_remote_tensor_copy, device_mgr.get(), false, rendezvous,
+        async, lazy_remote_tensor_copy, device_mgr.get(), false, rendezvous,
         GetDefaultCustomKernelCreator());
 
     EagerExecutor executor_(false);
@@ -85,7 +85,6 @@ class PlacementTest : public ::testing::Test {
     InitDeviceManager();
     context_ = new EagerContext(
         opts, policy,
-        /* default_mirroring_policy */ MIRRORING_NONE,
         /* async */ false,
         /* lazy_copy_function_remote_inputs */ false, device_manager_,
         /* device_mgr_owned */ false, /* rendezvous */ nullptr,
@@ -38,9 +38,8 @@ TEST(TensorHandle_ShapeTest, AsyncShape) {
       "CPU", {}, "/job:localhost/replica:0/task:0/device:CPU:0"));
   auto ctx = new EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &device_mgr, false, nullptr, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &device_mgr, false, nullptr, nullptr, nullptr);
   TensorHandle* sync_th =
       TensorHandle::CreateLocalHandle(std::move(t), nullptr, nullptr, ctx);
   TensorHandle* async_th = TensorHandle::CreateEmptyLocalHandle(
@@ -106,7 +105,7 @@ class PackedTensorHandleTest : public ::testing::Test {
     context_ = new EagerContext(
         SessionOptions(),
         tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-        tensorflow::ContextMirroringPolicy::MIRRORING_NONE, /* async= */ false,
+        /* async= */ false,
         /* lazy_copy_function_remote_inputs= */ false, device_mgr_,
         /* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
         /* custom_kernel_creator= */ nullptr,
@@ -257,9 +256,8 @@ TEST(TensorHandle_ResourceDeviceTest, OnLocalDevice) {
   StaticDeviceMgr local_device_mgr(std::move(d0));
   auto ctx = new EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &local_device_mgr, false, nullptr, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &local_device_mgr, false, nullptr, nullptr, nullptr);
 
   tensorflow::DataType dtype = DT_RESOURCE;
   TensorShape shape = {2};
@@ -290,9 +288,8 @@ TEST(TensorHandle_ResourceDeviceTest, OnRemoteDevice) {
   StaticDeviceMgr local_device_mgr(std::move(d_local));
   auto ctx = new EagerContext(
       SessionOptions(),
-      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, false, false,
-      &local_device_mgr, false, nullptr, nullptr, nullptr);
+      tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
+      false, &local_device_mgr, false, nullptr, nullptr, nullptr);
 
   std::unique_ptr<Device> d0(
       CreateDevice("CPU", "/job:worker/task:0/device:CPU:0", false));
@@ -346,7 +343,7 @@ class RemoteTensorHandleTest : public ::testing::Test {
     context_ = new EagerContext(
         SessionOptions(),
         tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-        tensorflow::ContextMirroringPolicy::MIRRORING_NONE, /* async= */ false,
+        /* async= */ false,
         /* lazy_copy_function_remote_inputs= */ false, device_mgr_,
         /* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
         /* custom_kernel_creator= */ nullptr,
@@ -387,7 +384,7 @@ TEST_F(RemoteTensorHandleTest, UnknownRemoteDevice) {
   EagerContext* context = new EagerContext(
       SessionOptions(),
       tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, /* async= */ false,
+      /* async= */ false,
      /* lazy_copy_function_remote_inputs= */ false, &device_mgr,
      /* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
      /* custom_kernel_creator= */ nullptr,
@@ -278,9 +278,8 @@ Status EagerServiceImpl::CreateContext(const CreateContextRequest* request,
   opts.config = request->server_def().default_session_config();
   tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
       opts, tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, request->async(),
-      request->lazy_copy_remote_function_inputs(), device_mgr, false, r,
-      GetDefaultCustomKernelCreator(), worker_session->cluster_flr());
+      request->async(), request->lazy_copy_remote_function_inputs(), device_mgr,
+      false, r, GetDefaultCustomKernelCreator(), worker_session->cluster_flr());
   // Ownership will be transferred to the ServerContext, or else in an error
   // case ctx will be deleted by this unref.
   core::ScopedUnref unref_ctx(ctx);
@@ -1218,7 +1218,7 @@ TEST_F(EagerServiceImplTest, RequestsToMasterTest) {
   tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
       SessionOptions(),
       tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE, /*async=*/false,
+      /*async=*/false,
       /*lazy_copy_function_remote_inputs=*/false, device_mgr_.get(), false,
       rendezvous, GetDefaultCustomKernelCreator());
   const uint64 context_id = random::New64();
@@ -54,7 +54,7 @@ class RemoteMgrTest : public ::testing::Test {
     ctx_ = new tensorflow::EagerContext(
         SessionOptions(),
         tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-        tensorflow::ContextMirroringPolicy::MIRRORING_NONE, /*async=*/false,
+        /*async=*/false,
         /*lazy_copy_function_remote_inputs=*/false, device_mgr.release(), true,
         rendezvous, GetDefaultCustomKernelCreator(), nullptr);
   }
@@ -46,7 +46,6 @@ tensorflow::Status DelegateData::Prepare(
   eager_context_ = new tensorflow::EagerContext(
       session_options,
       tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
-      tensorflow::ContextMirroringPolicy::MIRRORING_NONE,
      /*async=*/false, /*lazy_copy_function_remote_inputs=*/false,
      device_mgr.release(), /*device_mgr_owned*/ true, rendezvous, nullptr);
   return tensorflow::Status();
@@ -775,7 +775,6 @@ class Cluster(object):
       device_filters.set_device_filters(
           "ps", i, ["/job:worker", "/job:%s" % client_name])
 
-    context.context().mirroring_policy = context.MIRRORING_ALL
     # Allow at most one outstanding RPC for each worker at a certain time. This
     # is to simplify worker failure handling in the runtime
     os.environ["TF_ENABLE_EAGER_CLIENT_STREAMING_ENQUEUE"] = "False"
@@ -68,9 +68,6 @@ DEVICE_PLACEMENT_SILENT_FOR_INT32 = (
 SYNC = 0
 ASYNC = 1
 
-MIRRORING_NONE = pywrap_tfe.TFE_MIRRORING_NONE
-MIRRORING_ALL = pywrap_tfe.TFE_MIRRORING_ALL
-
 _KEEP_ALIVE_SECS = 600
 
 _python_eager_context_create_counter = monitoring.Counter(
@@ -1648,27 +1645,6 @@ class Context(object):
       pywrap_tfe.TFE_ContextSetThreadLocalDevicePlacementPolicy(
           self._handle, self._device_policy)
 
-  @property
-  def mirroring_policy(self):
-    # Only get the policy from the context if it has already been initialized
-    if self._context_handle is not None:
-      return pywrap_tfe.TFE_ContextGetMirroringPolicy(self._handle)
-
-    return self._mirroring_policy
-
-  @mirroring_policy.setter
-  def mirroring_policy(self, policy):
-    if policy is None:
-      policy = MIRRORING_NONE
-
-    if self._mirroring_policy is None or self._mirroring_policy != policy:
-      self._mirroring_policy = policy
-
-      # Only set the policy if the context has already been initialized
-      if self._context_handle is not None:
-        pywrap_tfe.TFE_ContextSetThreadLocalMirroringPolicy(
-            self._handle, self._mirroring_policy)
-
   @property
   def lazy_remote_inputs_copy(self):
     return self._lazy_remote_inputs_copy
@@ -2104,18 +2080,6 @@ def device_policy(policy):
     ctx.device_policy = old_policy
 
 
-@tf_contextlib.contextmanager
-def mirroring_policy(policy):
-  """Context manager for setting mirroring policy for current thread."""
-  ctx = context()
-  old_policy = ctx.mirroring_policy
-  try:
-    ctx.mirroring_policy = policy
-    yield
-  finally:
-    ctx.mirroring_policy = old_policy
-
-
 def set_execution_mode(mode):
   """Sets execution mode for the current thread."""
   context().execution_mode = mode
@@ -92,7 +92,7 @@ class RemoteWorkerMicroBenchmarks(test.Benchmark):
         wall_time=mean_us,
         extras={"examples_per_sec": num_iters / total_time})
 
-  def benchmark_send_mirroring_off(self):
+  def benchmark_send(self):
     remote.connect_to_remote_host(self._cached_server_target1)
 
     x = random_ops.random_uniform((2, 2)).cpu()
@@ -105,34 +105,13 @@ class RemoteWorkerMicroBenchmarks(test.Benchmark):
       with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
         return remote_func(m)
 
-    context.context().mirroring_policy = context.MIRRORING_NONE
-    self._run(lambda: func(x))
-    # NOTE(b/136184459): Force garbage collecting hanging resources before
-    # subsequent calls to set_server_def, to ensure the destroy resource ops are
-    # executed when their corresponding device and manager are still available.
-    gc.collect()
-
-  def benchmark_send_mirroring_on(self):
-    remote.connect_to_remote_host(self._cached_server_target1)
-
-    x = random_ops.random_uniform((2, 2)).cpu()
-
-    @def_function.function
-    def remote_func(m):
-      return math_ops.matmul(m, m)
-
-    def func(m):
-      with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
-        return remote_func(m)
-
-    context.context().mirroring_policy = context.MIRRORING_ALL
     self._run(lambda: func(x))
     # NOTE(b/136184459): Force garbage collecting hanging resources before
     # subsequent calls to set_server_def, to ensure the destroy resource ops are
     # executed when their corresponding device and manager are still available.
     gc.collect()
 
-  def benchmark_worker_mirroring_off(self):
+  def benchmark_worker_recv(self):
     remote.connect_to_remote_host(
         [self._cached_server_target1, self._cached_server_target2])
 
@@ -147,29 +126,6 @@ class RemoteWorkerMicroBenchmarks(test.Benchmark):
       with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
         return remote_func()
 
-    context.context().mirroring_policy = context.MIRRORING_NONE
-    self._run(func)
-    # NOTE(b/136184459): Force garbage collecting hanging resources before
-    # subsequent calls to set_server_def, to ensure the destroy resource ops are
-    # executed when their corresponding device and manager are still available.
-    gc.collect()
-
-  def benchmark_worker_mirroring_on(self):
-    remote.connect_to_remote_host(
-        [self._cached_server_target1, self._cached_server_target2])
-
-    with ops.device("job:worker/replica:0/task:1/device:CPU:0"):
-      v = variables.Variable(1.0)
-
-    @def_function.function
-    def remote_func():
-      return 1.0 + v
-
-    def func():
-      with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
-        return remote_func()
-
-    context.context().mirroring_policy = context.MIRRORING_ALL
     self._run(func)
     # NOTE(b/136184459): Force garbage collecting hanging resources before
     # subsequent calls to set_server_def, to ensure the destroy resource ops are
@@ -468,17 +468,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
       c = a + 1.0
       return c
 
-    context.context().mirroring_policy = context.MIRRORING_NONE
-
-    with ops.device('/job:worker/replica:0/task:0'):
-      self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
-
-    if test_util.is_gpu_available():
-      with ops.device('/job:worker/replica:0/task:0/device:GPU:0'):
-        self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
-
-    context.context().mirroring_policy = context.MIRRORING_ALL
-
     with ops.device('/job:worker/replica:0/task:0'):
       self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
 
@@ -520,17 +509,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
 
       return control_flow_ops.while_loop_v2(lambda _, d: d < 1, body, [i, 0])[0]
 
-    context.context().mirroring_policy = context.MIRRORING_NONE
-
-    with ops.device('/job:worker/replica:0/task:0'):
-      self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
-
-    if test_util.is_gpu_available():
-      with ops.device('/job:worker/replica:0/task:0/device:GPU:0'):
-        self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
-
-    context.context().mirroring_policy = context.MIRRORING_ALL
-
     with ops.device('/job:worker/replica:0/task:0'):
       self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
 
@@ -544,19 +544,11 @@ PYBIND11_MODULE(_pywrap_tfe, m) {
     return TFE_ContextGetDevicePlacementPolicy(
         tensorflow::InputTFE_Context(ctx));
   });
-  m.def("TFE_ContextGetMirroringPolicy", [](py::handle& ctx) {
-    return TFE_ContextGetMirroringPolicy(tensorflow::InputTFE_Context(ctx));
-  });
   m.def("TFE_ContextSetThreadLocalDevicePlacementPolicy",
         [](py::handle& ctx, TFE_ContextDevicePlacementPolicy policy) {
           TFE_ContextSetThreadLocalDevicePlacementPolicy(
               tensorflow::InputTFE_Context(ctx), policy);
         });
-  m.def("TFE_ContextSetThreadLocalMirroringPolicy",
-        [](py::handle& ctx, TFE_ContextMirroringPolicy policy) {
-          TFE_ContextSetThreadLocalMirroringPolicy(
-              tensorflow::InputTFE_Context(ctx), policy);
-        });
   m.def("TFE_ContextSetServerDef", [](py::handle& ctx, int keep_alive_secs,
                                       py::bytes proto) {
     tensorflow::Safe_TF_StatusPtr status =
@@ -862,8 +854,6 @@ PYBIND11_MODULE(_pywrap_tfe, m) {
   m.def("TFE_ContextOptionsSetLazyRemoteInputsCopy",
         &TFE_ContextOptionsSetLazyRemoteInputsCopy);
   m.def("TFE_ContextOptionsSetTfrt", &TFE_ContextOptionsSetTfrt);
-  m.def("TFE_ContextOptionsSetMirroringPolicy",
-        &TFE_ContextOptionsSetMirroringPolicy);
   m.def("TFE_ContextOptionsSetAsync", &TFE_ContextOptionsSetAsync);
   m.def("TFE_DeleteContextOptions", &TFE_DeleteContextOptions,
         py::return_value_policy::reference);
@@ -1312,9 +1302,4 @@ PYBIND11_MODULE(_pywrap_tfe, m) {
       .value("TF_ATTR_PLACEHOLDER", TF_ATTR_PLACEHOLDER)
       .value("TF_ATTR_FUNC", TF_ATTR_FUNC)
       .export_values();
-
-  py::enum_<TFE_ContextMirroringPolicy>(m, "TFE_ContextMirroringPolicy")
-      .value("TFE_MIRRORING_NONE", TFE_MIRRORING_NONE)
-      .value("TFE_MIRRORING_ALL", TFE_MIRRORING_ALL)
-      .export_values();
 };
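For readers who have not touched the Python wrapper before, a generic pybind11 sketch (illustrative only, not TensorFlow code) of the binding shape deleted above: a C-style enum plus setter/getter functions exposed to Python through py::enum_ and m.def. Removing the C symbols therefore requires removing these registrations as well.

    #include <pybind11/pybind11.h>

    namespace py = pybind11;

    // A policy enum and a module-level setting, standing in for the
    // TFE_ContextMirroringPolicy wiring removed by this commit.
    enum MirrorPolicy { MIRROR_NONE = 0, MIRROR_ALL = 1 };
    static MirrorPolicy g_policy = MIRROR_NONE;

    PYBIND11_MODULE(mirror_example, m) {
      py::enum_<MirrorPolicy>(m, "MirrorPolicy")
          .value("MIRROR_NONE", MIRROR_NONE)
          .value("MIRROR_ALL", MIRROR_ALL)
          .export_values();
      // Free functions bound the same way the removed TFE_* calls were.
      m.def("set_policy", [](MirrorPolicy p) { g_policy = p; });
      m.def("get_policy", []() { return g_policy; });
    }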
@@ -109,10 +109,6 @@ def initialize_tpu_system(cluster_resolver=None):
     context.context()._clear_caches()  # pylint: disable=protected-access
 
     serialized_topology = output.numpy()
-
-    # TODO(b/134094971): Remove this when lazy tensor copy in multi-device
-    # function has been implemented.
-    context.context().mirroring_policy = context.MIRRORING_ALL
   elif not ops.executing_eagerly_outside_functions():
     master = cluster_resolver.master()
     cluster_spec = cluster_resolver.cluster_spec()