Remove the internal lazy_remote_inputs_copy flag.

PiperOrigin-RevId: 350591934
Change-Id: Ia1023a6dc6d20309248d2eb2e2300a6a55a7c2ac
Yujing Zhang, 2021-01-07 10:36:08 -08:00, committed by TensorFlower Gardener
parent ce9122eb7b
commit 13d37279f1
32 changed files with 71 additions and 221 deletions

View File

@@ -142,7 +142,7 @@ TFE_Context* TFE_NewContext(const TFE_ContextOptions* opts, TF_Status* status) {
opts->session_options.options,
static_cast<tensorflow::ContextDevicePlacementPolicy>(
opts->device_placement_policy),
- opts->async, opts->lazy_remote_inputs_copy, device_mgr.release(),
+ opts->async, device_mgr.release(),
/*device_mgr_owned*/ true, r);
#if !defined(IS_MOBILE_PLATFORM)
eager_context->SetDistributedManager(

View File

@@ -482,11 +482,6 @@ TFE_MonitoringSamplerCell* TFE_MonitoringGetCellSampler2(
static_cast<void*>(sampler->sampler->GetCell(label1, label2)));
}
- void TFE_ContextOptionsSetLazyRemoteInputsCopy(TFE_ContextOptions* options,
- bool lazy_copy) {
- options->lazy_remote_inputs_copy = lazy_copy;
- }
void TFE_ContextOptionsSetTfrt(TFE_ContextOptions* options, bool use_tfrt) {
options->use_tfrt = use_tfrt;
}
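For C API callers, migrating is a one-line deletion; lazy copying of remote function inputs is now unconditional. A minimal sketch of a hypothetical caller, assuming a valid TF_Status* named status:

    TFE_ContextOptions* opts = TFE_NewContextOptions();
    // TFE_ContextOptionsSetLazyRemoteInputsCopy(opts, true);  // removed; now always on
    TFE_Context* ctx = TFE_NewContext(opts, status);
    TFE_DeleteContextOptions(opts);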

View File

@@ -265,10 +265,6 @@ TF_CAPI_EXPORT extern void TFE_MonitoringDeleteSampler2(
TF_CAPI_EXPORT extern TFE_MonitoringSamplerCell* TFE_MonitoringGetCellSampler2(
TFE_MonitoringSampler2* sampler, const char* label1, const char* label2);
- // Sets whether to copy the remote inputs of a function lazily.
- TF_CAPI_EXPORT extern void TFE_ContextOptionsSetLazyRemoteInputsCopy(
- TFE_ContextOptions*, bool lazy_copy);
// Sets whether to use TFRT
TF_CAPI_EXPORT extern void TFE_ContextOptionsSetTfrt(TFE_ContextOptions*,
bool use_tfrt);

View File

@@ -32,8 +32,6 @@ struct TFE_ContextOptions {
bool async = false;
TFE_ContextDevicePlacementPolicy device_placement_policy{
TFE_DEVICE_PLACEMENT_SILENT};
- // If true, lazily copy the remote inputs of a function to the target devices.
- bool lazy_remote_inputs_copy = true;
// If true, use TFRT backend
bool use_tfrt = false;
};

View File

@@ -45,8 +45,7 @@ EagerContextPtr CreateTestingEagerContext(DeviceMgr* device_mgr) {
return EagerContextPtr(new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /* async= */ false,
- /* lazy_copy_function_remote_inputs= */ false, device_mgr,
+ /* async= */ false, device_mgr,
/* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
/* cluster_flr= */ nullptr));
}

View File

@@ -76,8 +76,7 @@ auto* eager_context_created =
EagerContext::EagerContext(
const SessionOptions& opts,
ContextDevicePlacementPolicy default_device_placement_policy, bool async,
- const bool lazy_copy_function_remote_inputs, const DeviceMgr* device_mgr,
- bool device_mgr_owned, Rendezvous* rendezvous,
+ const DeviceMgr* device_mgr, bool device_mgr_owned, Rendezvous* rendezvous,
DistributedFunctionLibraryRuntime* cluster_flr)
: ImmediateExecutionContext(kEager),
opts_(opts),
@@ -95,7 +94,6 @@ EagerContext::EagerContext(
default_executor_(async),
log_memory_(LogMemory::IsEnabled()),
env_(opts.env),
- lazy_copy_function_remote_inputs_(lazy_copy_function_remote_inputs),
use_send_tensor_rpc_(false),
pin_small_ops_to_cpu_(ReadBoolFromEnvVar(
"TF_EAGER_ENABLE_SMALL_TENSOR_CPU_PINNING", false)) {
@@ -326,7 +324,7 @@ Status EagerContext::SelectDevice(DeviceNameUtils::ParsedName preferred,
void EagerContext::ResetClusterFLR(
DistributedFunctionLibraryRuntime* cluster_flr) {
- cluster_flr_.Reset(cluster_flr, lazy_copy_function_remote_inputs_);
+ cluster_flr_.Reset(cluster_flr, /*owned=*/true);
}
EagerExecutor& EagerContext::Executor() {
@@ -410,10 +408,6 @@ ContextDevicePlacementPolicy EagerContext::GetDevicePlacementPolicy() const {
return default_device_placement_policy_;
}
- bool EagerContext::LazyCopyFunctionRemoteInputs() const {
- return lazy_copy_function_remote_inputs_;
- }
#if !defined(IS_MOBILE_PLATFORM)
std::vector<string> EagerContext::GetRemoteContexts() {
tf_shared_lock l(remote_state_mu_);

View File

@@ -96,8 +96,7 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
EagerContext(const SessionOptions& opts,
ContextDevicePlacementPolicy default_device_placement_policy,
- bool async, const bool lazy_copy_function_remote_inputs,
- const DeviceMgr* device_mgr, bool device_mgr_owned,
+ bool async, const DeviceMgr* device_mgr, bool device_mgr_owned,
Rendezvous* rendezvous,
DistributedFunctionLibraryRuntime* cluster_flr = nullptr);
@@ -190,8 +189,6 @@ class EagerContext : public ImmediateExecutionContext, public core::RefCounted {
Status SelectDevice(DeviceNameUtils::ParsedName preferred,
const NodeDef& ndef, Device** out) const;
- bool LazyCopyFunctionRemoteInputs() const;
bool FindFunctionByName(const string& name) const;
Status FindFunctionOpData(const string& name,
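Every construction site now passes one argument fewer. A minimal sketch of the updated call, assuming a device manager held in a std::unique_ptr<DeviceMgr> named device_mgr, mirroring the test fixtures below:

    tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
        tensorflow::SessionOptions(),
        tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
        /*async=*/false, device_mgr.get(), /*device_mgr_owned=*/false,
        /*rendezvous=*/nullptr, /*cluster_flr=*/nullptr);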

View File

@@ -217,7 +217,6 @@ tensorflow::Status CreateRemoteContexts(
tensorflow::uint64 context_id, tensorflow::uint64 context_view_id,
int keep_alive_secs, const tensorflow::ServerDef& server_def,
tensorflow::eager::EagerClientCache* remote_eager_workers, bool async,
- const bool lazy_copy_remote_function_inputs,
const tensorflow::eager::CreateContextRequest& base_request) {
int num_remote_workers = remote_workers.size();
tensorflow::BlockingCounter counter(num_remote_workers);
@@ -269,8 +268,9 @@
}
request.set_async(async);
request.set_keep_alive_secs(keep_alive_secs);
- request.set_lazy_copy_remote_function_inputs(
- lazy_copy_remote_function_inputs);
+ // TODO(b/134094971): Deprecate lazy_copy_remote_function_inputs once the
+ // server no longer reads its value.
+ request.set_lazy_copy_remote_function_inputs(true);
eager_client->CreateContextAsync(
&request, response,
@@ -557,7 +557,7 @@ tensorflow::Status UpdateContextWithServerDef(
const tensorflow::Status s = CreateRemoteContexts(
context, remote_workers, context_id, context_view_id, keep_alive_secs,
server_def, remote_eager_workers.get(), context->Executor().Async(),
- context->LazyCopyFunctionRemoteInputs(), base_request);
+ base_request);
// NOTE: the remote tasks could fail after `GetAllRemoteDevices` and cause
// the CreateRemoteContexts to fail. We currently only log instead of
// directly returning the error, since returning here will cause the server
@@ -582,8 +582,7 @@ tensorflow::Status UpdateContextWithServerDef(
sg.Update(CreateRemoteContexts(
context, added_workers, context_id, context_view_id + 1,
keep_alive_secs, server_def, remote_eager_workers.get(),
- context->Executor().Async(), context->LazyCopyFunctionRemoteInputs(),
- base_request));
+ context->Executor().Async(), base_request));
}
if (!existing_workers.empty()) {
if (VLOG_IS_ON(1)) {

View File

@@ -58,12 +58,11 @@ class EagerContextTest : public ::testing::Test {
ContextDevicePlacementPolicy policy) {
ASSERT_EQ(context_, nullptr);
InitDeviceManager();
- context_ = new EagerContext(
- opts, policy,
- /* async */ false,
- /* lazy_copy_function_remote_inputs */ false, device_manager_,
- /* device_mgr_owned */ false, /* rendezvous */ nullptr,
- /* cluster_flr */ nullptr);
+ context_ =
+ new EagerContext(opts, policy,
+ /* async */ false, device_manager_,
+ /* device_mgr_owned */ false, /* rendezvous */ nullptr,
+ /* cluster_flr */ nullptr);
}
protected:

View File

@@ -77,7 +77,7 @@ TEST(CustomDevice, TestTensorHandle) {
core::RefCountPtr<EagerContext> ctx(new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &device_mgr, false, nullptr, nullptr));
+ &device_mgr, false, nullptr, nullptr));
std::string device_name = "/job:localhost/replica:0/task:0/device:CUSTOM:15";
TestCustomDevice device(device_name);
core::RefCountPtr<TestCustomDeviceTensorHandle> tensor(

View File

@@ -48,7 +48,7 @@ TEST(EagerOpRewriteRegistryTest, RegisterRewritePass) {
tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &device_mgr, false, nullptr, nullptr);
+ &device_mgr, false, nullptr, nullptr);
EagerOperation orig_op(ctx);
std::unique_ptr<tensorflow::EagerOperation> out_op;
EXPECT_EQ(Status::OK(),

View File

@@ -28,7 +28,7 @@ TEST(EagerOperationTest, DeviceName) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &device_mgr, false, nullptr, nullptr);
+ &device_mgr, false, nullptr, nullptr);
auto op = new EagerOperation(ctx);

View File

@@ -197,8 +197,7 @@ Status ValidateInputTypeAndPlacement(
return errors::InvalidArgument("expected ", kernel->num_inputs(),
" inputs, got ", n_inputs);
}
- const bool skip_remote_copy =
- ctx->LazyCopyFunctionRemoteInputs() && kernel->IsFunction();
+ const bool is_function = kernel->IsFunction();
if (n_inputs > 0) {
const DataType* input_types = &kernel->input_dtypes()[0];
TensorHandle* const* handles = &op->Inputs()[0];
@@ -229,7 +228,7 @@
}
Device* handle_device = absl::get<Device*>(handle_device_variant);
const bool maybe_copy =
- !skip_remote_copy || handle->Type() != TensorHandle::REMOTE;
+ !is_function || handle->Type() != TensorHandle::REMOTE;
// If the input is already on the right device, then nothing to do.
if (expected_device != handle_device && maybe_copy) {
TF_RETURN_IF_ERROR(CopyInputToExpectedDevice(ctx, op, kernel->device(),
@@ -432,23 +431,8 @@ Status GetOrCreateKernelAndDevice(
profiler::TraceMe activity("EagerCopyToDeviceAndAddCacheKey",
profiler::TraceMeLevel::kInfo);
input_dev_ptrs.reserve(op->Inputs().size());
- // When LazyCopyFunctionRemoteInputs is disabled, all inputs need to be on
- // local devices, since we execute a remote function through worker service,
- // which doesn't accept remote inputs.
for (int i = 0, end = op->Inputs().size(); i < end; i++) {
TensorHandle* input = op->Inputs()[i];
- if (!ctx.LazyCopyFunctionRemoteInputs() &&
- input->Type() == TensorHandle::REMOTE) {
- TensorHandle* handle = nullptr;
- TF_RETURN_IF_ERROR(
- EagerCopyToDevice(input, &ctx, &op->Executor(),
- device == nullptr ? ctx.HostCPU() : device,
- /*mirror=*/true, &handle));
- op->UpdateInput(i, handle);
- // Unref handle since it has a ref as an input now
- handle->Unref();
- input = handle;
- }
// Get device for this input, and add it to 'cache_key'.
Device* input_device;
@@ -549,9 +533,7 @@ Status GetOrCreateKernelAndDevice(
<< "Full node_def=" << ndef.DebugString();
std::function<int64()> get_op_id = nullptr;
#if !defined(IS_MOBILE_PLATFORM)
- if (ctx.LazyCopyFunctionRemoteInputs()) {
- get_op_id = [&ctx]() { return ctx.RemoteMgr()->NextOpId(); };
- }
+ get_op_id = [&ctx]() { return ctx.RemoteMgr()->NextOpId(); };
#endif // IS_MOBILE_PLATFORM
kernel.reset(new KernelAndDeviceFunc(
flr, ctx.pflr(), std::move(input_dev_ptrs),
@@ -569,9 +551,8 @@
ctx.GetCollectiveExecutorHandle(), ctx.HostCPU()));
}
- TF_RETURN_IF_ERROR(kernel->Init(
- {ctx.LogDevicePlacement(), ctx.LazyCopyFunctionRemoteInputs()}, ndef,
- graph_collector));
+ TF_RETURN_IF_ERROR(
+ kernel->Init(ctx.LogDevicePlacement(), ndef, graph_collector));
if (op->is_function()) {
ctx.AddKernelToCache(cache_key, kernel.get());
@@ -873,8 +854,7 @@ Status EagerRemoteExecute(EagerOperation* op, TensorHandle** retvals,
{
profiler::TraceMe activity("CopyInputToExpectedDevice",
profiler::TraceMeLevel::kInfo);
- const bool eagerly_copy_function_remote_inputs =
- !ctx.LazyCopyFunctionRemoteInputs() || !op->is_function();
+ const bool is_function = op->is_function();
for (int i = 0, end = op->Inputs().size(); i < end; i++) {
tensorflow::TensorHandle* input = op->Inputs()[i];
tensorflow::Device* input_device = absl::get<Device*>(input->device());
@@ -887,8 +867,7 @@
// explicitly copy, and instead depend on the copy to happen locally
// when the op is executed on the device.
!ctx.OnSameTask(op_device, input_device)) {
- if (eagerly_copy_function_remote_inputs ||
- input_device_or_cpu->IsLocal()) {
+ if (!is_function || input_device_or_cpu->IsLocal()) {
tensorflow::Device* remote_cpu_device;
TF_RETURN_IF_ERROR(
ctx.CPUDeviceOnTask(op_device, &remote_cpu_device));
@@ -967,19 +946,14 @@
id, i, remote_task, output_dtypes[i], op_device, &ctx, unknown_device);
}
- if (ctx.LazyCopyFunctionRemoteInputs()) {
- // Store the data type and shape of a remote resource variable on the
- // corresponding remote TensorHandle (output of 'VarHandleOp').
- // If the variable is an input of a remote function, the function may need
- // the type and shape during function instantiation. When
- // LazyCopyFunctionRemoteInputs is enabled, we no longer copy the resource
- // handle (contains the type and shape) of the variable to the default
- // function device. Instead, we store the type and shape on eager master
- // and sent them to the default function device along with the
- // EnqueueRequest.
- TF_RETURN_IF_ERROR(
- StoreResourceDtypesAndShapes(*remote_op, output_dtypes, retvals));
- }
+ // Store the data type and shape of a remote resource variable on the
+ // corresponding remote TensorHandle (output of 'VarHandleOp').
+ // If the variable is an input of a remote function, the function may need
+ // the type and shape during function instantiation. Store the type and
+ // shape on the eager master and send them to the default function device
+ // along with the EnqueueRequest.
+ TF_RETURN_IF_ERROR(
+ StoreResourceDtypesAndShapes(*remote_op, output_dtypes, retvals));
auto& executor = op->Executor();
DVLOG(4) << "Execute remote eager op: " << op->Name()

View File

@@ -68,7 +68,7 @@ TEST(ExecuteNodeTest, ExecuteNodeArgs) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &device_mgr, false, nullptr, nullptr);
+ &device_mgr, false, nullptr, nullptr);
// Set a RemoteMgr to the EagerContext.
auto remote_mgr = absl::make_unique<eager::RemoteMgr>(

View File

@@ -97,7 +97,8 @@ KernelAndDeviceFunc::~KernelAndDeviceFunc() {
}
}
- Status KernelAndDeviceOp::Init(const Context& ctx, const NodeDef& ndef,
+ Status KernelAndDeviceOp::Init(const bool log_device_placement,
+ const NodeDef& ndef,
GraphCollector* graph_collector) {
OpKernel* k = nullptr;
if (flr_ == nullptr) {
@@ -129,7 +130,7 @@ Status KernelAndDeviceOp::Init(const Context& ctx, const NodeDef& ndef,
return Status::OK();
}
- Status KernelAndDeviceFunc::InstantiateFunc(const Context& ctx,
+ Status KernelAndDeviceFunc::InstantiateFunc(const bool log_device_placement,
const NodeDef& ndef,
GraphCollector* graph_collector) {
const OpDef* op_def = nullptr;
@@ -212,18 +213,19 @@ Status KernelAndDeviceFunc::InstantiateFunc(const Context& ctx,
->mutable_optimizer_options()
->set_do_function_inlining(true);
- options.config_proto.set_log_device_placement(ctx.log_device_placement);
+ options.config_proto.set_log_device_placement(log_device_placement);
TF_RETURN_IF_ERROR(
pflr_->Instantiate(ndef.op(), AttrSlice(ndef), options, &handle_));
return pflr_->IsCrossProcess(handle_, &is_cross_process_);
}
- Status KernelAndDeviceFunc::Init(const Context& ctx, const NodeDef& ndef,
+ Status KernelAndDeviceFunc::Init(const bool log_device_placement,
+ const NodeDef& ndef,
GraphCollector* graph_collector) {
- TF_RETURN_IF_ERROR(InstantiateFunc(ctx, ndef, graph_collector));
- return pflr_->GetOutputDevices(handle_, &output_devices_,
- ctx.eager_lazy_copy);
+ TF_RETURN_IF_ERROR(
+ InstantiateFunc(log_device_placement, ndef, graph_collector));
+ return pflr_->GetOutputDevices(handle_, &output_devices_);
}
namespace {

View File

@@ -97,16 +97,11 @@ typedef absl::variant<Tensor, TensorShape> EagerKernelRet;
// https://www.tensorflow.org/code/tensorflow/core/kernels/ops_testutil.h
class KernelAndDevice : public core::RefCounted {
public:
- struct Context {
- bool log_device_placement = false;
- bool eager_lazy_copy = false;
- };
// Populates this with a kernel appropriate for 'ndef'.
//
// The provided FunctionLibraryRuntime MUST outlive all calls to
// Run() on the returned KernelAndDevice.
- virtual Status Init(const Context& ctx, const NodeDef& ndef,
+ virtual Status Init(const bool log_device_placement, const NodeDef& ndef,
GraphCollector* graph_collector) = 0;
// Non-multi-device functions are run using regular CallOp and look like
@@ -205,7 +200,7 @@ class KernelAndDeviceOp final : public KernelAndDevice {
~KernelAndDeviceOp() override {}
- Status Init(const Context& ctx, const NodeDef& ndef,
+ Status Init(const bool log_device_placement, const NodeDef& ndef,
GraphCollector* graph_collector) override;
Status Run(ScopedStepContainer* step_container, const EagerKernelArgs& inputs,
@@ -290,10 +285,10 @@ class KernelAndDeviceFunc : public KernelAndDevice {
bool IsCrossProcess() override { return is_cross_process_; }
- Status InstantiateFunc(const Context& ctx, const NodeDef& ndef,
+ Status InstantiateFunc(const bool log_device_placement, const NodeDef& ndef,
GraphCollector* graph_collector);
- Status Init(const Context& ctx, const NodeDef& ndef,
+ Status Init(const bool log_device_placement, const NodeDef& ndef,
GraphCollector* graph_collector) override;
Status Run(ScopedStepContainer* step_container, const EagerKernelArgs& inputs,
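With the KernelAndDevice::Context struct gone, call sites pass log_device_placement as a plain bool. A sketch of the updated call pattern, with kernel and ctx standing in for the caller's KernelAndDevice and EagerContext:

    TF_RETURN_IF_ERROR(kernel->Init(ctx.LogDevicePlacement(), ndef,
                                    /*graph_collector=*/nullptr));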

View File

@@ -38,13 +38,12 @@ class EagerOpRewriteTest : public ::testing::Test {
absl::make_unique<StaticDeviceMgr>(DeviceFactory::NewDevice(
"CPU", {}, "/job:localhost/replica:0/task:0/device:CPU:0"));
bool async = false;
- bool lazy_remote_tensor_copy = false;
tensorflow::Rendezvous* rendezvous =
new tensorflow::IntraProcessRendezvous(device_mgr.get());
eager_ctx_ = new tensorflow::EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- async, lazy_remote_tensor_copy, device_mgr.get(), false, rendezvous);
+ async, device_mgr.get(), false, rendezvous);
EagerExecutor executor_(false);
std::unique_ptr<tensorflow::EagerOperation> op(

View File

@@ -83,12 +83,11 @@ class PlacementTest : public ::testing::Test {
ContextDevicePlacementPolicy policy) {
ASSERT_EQ(context_, nullptr);
InitDeviceManager();
- context_ = new EagerContext(
- opts, policy,
- /* async */ false,
- /* lazy_copy_function_remote_inputs */ false, device_manager_,
- /* device_mgr_owned */ false, /* rendezvous */ nullptr,
- /* cluster_flr */ nullptr);
+ context_ =
+ new EagerContext(opts, policy,
+ /* async */ false, device_manager_,
+ /* device_mgr_owned */ false, /* rendezvous */ nullptr,
+ /* cluster_flr */ nullptr);
}
protected:

View File

@@ -39,7 +39,7 @@ TEST(TensorHandle_ShapeTest, AsyncShape) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &device_mgr, false, nullptr, nullptr);
+ &device_mgr, false, nullptr, nullptr);
TensorHandle* sync_th =
TensorHandle::CreateLocalHandle(std::move(t), nullptr, nullptr, ctx);
TensorHandle* async_th = TensorHandle::CreateEmptyLocalHandle(
@@ -105,8 +105,7 @@ class PackedTensorHandleTest : public ::testing::Test {
context_ = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /* async= */ false,
- /* lazy_copy_function_remote_inputs= */ false, device_mgr_,
+ /* async= */ false, device_mgr_,
/* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
/* cluster_flr= */ nullptr);
}
@@ -256,7 +255,7 @@ TEST(TensorHandle_ResourceDeviceTest, OnLocalDevice) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &local_device_mgr, false, nullptr, nullptr);
+ &local_device_mgr, false, nullptr, nullptr);
tensorflow::DataType dtype = DT_RESOURCE;
TensorShape shape = {2};
@@ -288,7 +287,7 @@ TEST(TensorHandle_ResourceDeviceTest, OnRemoteDevice) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &local_device_mgr, false, nullptr, nullptr);
+ &local_device_mgr, false, nullptr, nullptr);
std::unique_ptr<Device> d0(
CreateDevice("CPU", "/job:worker/task:0/device:CPU:0", false));
@@ -342,8 +341,7 @@ class RemoteTensorHandleTest : public ::testing::Test {
context_ = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /* async= */ false,
- /* lazy_copy_function_remote_inputs= */ false, device_mgr_,
+ /* async= */ false, device_mgr_,
/* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
/* cluster_flr= */ nullptr);
}
@@ -382,8 +380,7 @@ TEST_F(RemoteTensorHandleTest, UnknownRemoteDevice) {
EagerContext* context = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /* async= */ false,
- /* lazy_copy_function_remote_inputs= */ false, &device_mgr,
+ /* async= */ false, &device_mgr,
/* device_mgr_owned= */ false, /* rendezvous= */ nullptr,
/* cluster_flr= */ nullptr);
@@ -418,7 +415,7 @@ TEST(TensorHandle_DeviceNameTest, OnLocalDevice) {
auto ctx = new EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT, false,
- false, &local_device_mgr, false, nullptr, nullptr);
+ &local_device_mgr, false, nullptr, nullptr);
Device* dcpu = local_device_mgr.ListDevices()[0];
Device* dgpu = local_device_mgr.ListDevices()[1];

View File

@@ -995,8 +995,8 @@ Status ProcessFunctionLibraryRuntime::InstantiateMultiDevice(
}
Status ProcessFunctionLibraryRuntime::GetOutputDevices(
- FunctionLibraryRuntime::Handle handle, std::vector<Device*>* output_devices,
- const bool eager_lazy_copy) const {
+ FunctionLibraryRuntime::Handle handle,
+ std::vector<Device*>* output_devices) const {
MultiDeviceFunctionData* data = IsMultiDevice(handle);
if (data == nullptr) {
return errors::InvalidArgument(
@@ -1015,16 +1015,6 @@ Status ProcessFunctionLibraryRuntime::GetOutputDevices(
Device* target_device = nullptr;
Device* host = nullptr;
if (target_flr == nullptr) {
- if (!eager_lazy_copy) {
- return errors::Unimplemented(
- "Currently, outputting tensors on remote devices is not supported."
- "The ",
- comp_data.ret_indices[0],
- "-th return value of the function outputs to target_device: ",
- target,
- " Please copy the tensor to local device explicitly using "
- "tf.identity and return the new Tensor instead.");
- }
if (!data->has_remote_outputs) {
data->has_remote_outputs = true;
}

View File

@@ -150,8 +150,7 @@ class ProcessFunctionLibraryRuntime {
// is set to the device backing the resource.
// REQUIRES: `handle` identifies a multi-device function.
Status GetOutputDevices(FunctionLibraryRuntime::Handle handle,
- std::vector<Device*>* output_devices,
- const bool eager_lazy_copy) const;
+ std::vector<Device*>* output_devices) const;
// Returns true if function with handle `handle` was instantiated on device
// `device_name`. Returns false for multi-device functions.
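GetOutputDevices drops its eager_lazy_copy parameter; the lazy path is now the only one, so remote outputs are always permitted. A sketch of the simplified call, as issued from KernelAndDeviceFunc::Init above (handle_ and pflr_ are that class's members):

    std::vector<Device*> output_devices;
    TF_RETURN_IF_ERROR(pflr_->GetOutputDevices(handle_, &output_devices));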

View File

@@ -284,12 +284,8 @@ void EagerClusterFunctionLibraryRuntime::CleanUp(
DistributedFunctionLibraryRuntime* CreateClusterFLR(
const uint64 context_id, EagerContext* ctx, WorkerSession* worker_session) {
- if (ctx->LazyCopyFunctionRemoteInputs()) {
- return new EagerClusterFunctionLibraryRuntime(
- context_id, ctx, worker_session->remote_device_mgr());
- } else {
- return worker_session->cluster_flr();
- }
+ return new EagerClusterFunctionLibraryRuntime(
+ context_id, ctx, worker_session->remote_device_mgr());
}
} // namespace eager

View File

@@ -274,8 +274,7 @@ Status EagerServiceImpl::CreateContext(const CreateContextRequest* request,
opts.config = request->server_def().default_session_config();
tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
opts, tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- request->async(), request->lazy_copy_remote_function_inputs(), device_mgr,
- false, r, worker_session->cluster_flr());
+ request->async(), device_mgr, false, r, worker_session->cluster_flr());
// Ownership will be transferred to the ServerContext, or else in an error
// case ctx will be deleted by this unref.
core::ScopedUnref unref_ctx(ctx);

View File

@@ -1220,9 +1220,7 @@ TEST_F(EagerServiceImplTest, RequestsToMasterTest) {
tensorflow::EagerContext* ctx = new tensorflow::EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /*async=*/false,
- /*lazy_copy_function_remote_inputs=*/false, device_mgr_.get(), false,
- rendezvous);
+ /*async=*/false, device_mgr_.get(), false, rendezvous);
const uint64 context_id = random::New64();
// Set RemoteMgr to ctx.

View File

@@ -58,7 +58,7 @@ Status CreateUncachedKernelAndDeviceOp(
ctx.HostCPU()));
const NodeDef& ndef = op->MutableAttrs()->BuildNodeDef();
- return kernel->get()->Init({ctx.LogDevicePlacement()}, ndef,
+ return kernel->get()->Init(ctx.LogDevicePlacement(), ndef,
/*graph_collector=*/nullptr);
}

View File

@@ -54,9 +54,7 @@ class RemoteMgrTest : public ::testing::Test {
ctx_ = new tensorflow::EagerContext(
SessionOptions(),
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /*async=*/false,
- /*lazy_copy_function_remote_inputs=*/false, device_mgr.release(), true,
- rendezvous, nullptr);
+ /*async=*/false, device_mgr.release(), true, rendezvous, nullptr);
}
~RemoteMgrTest() override { ctx_->Unref(); }

View File

@@ -46,8 +46,8 @@ tensorflow::Status DelegateData::Prepare(
eager_context_ = new tensorflow::EagerContext(
session_options,
tensorflow::ContextDevicePlacementPolicy::DEVICE_PLACEMENT_SILENT,
- /*async=*/false, /*lazy_copy_function_remote_inputs=*/false,
- device_mgr.release(), /*device_mgr_owned*/ true, rendezvous, nullptr);
+ /*async=*/false, device_mgr.release(), /*device_mgr_owned*/ true,
+ rendezvous, nullptr);
return tensorflow::Status();
}

View File

@@ -419,7 +419,6 @@ class Context(object):
if execution_mode is None:
execution_mode = SYNC
self._default_is_async = execution_mode == ASYNC
- self._lazy_remote_inputs_copy = None
self._use_tfrt = is_tfrt_enabled()
self._server_def = server_def
self._collective_ops_server_def = None
@@ -521,9 +520,6 @@
opts, self._mirroring_policy)
if self._default_is_async == ASYNC:
pywrap_tfe.TFE_ContextOptionsSetAsync(opts, True)
- if self._lazy_remote_inputs_copy is not None:
- pywrap_tfe.TFE_ContextOptionsSetLazyRemoteInputsCopy(
- opts, self._lazy_remote_inputs_copy)
if self._use_tfrt is not None:
pywrap_tfe.TFE_ContextOptionsSetTfrt(opts, self._use_tfrt)
context_handle = pywrap_tfe.TFE_NewContext(opts)
@@ -1177,10 +1173,6 @@ class Context(object):
A packed EagerTensor.
"""
self.ensure_initialized()
- if self._lazy_remote_inputs_copy is not None and (
- not self._lazy_remote_inputs_copy):
- raise ValueError("Packing eager tensors is not supported when "
- "lazy_remote_inputs_copy is disabled.")
return pywrap_tfe.TFE_Py_PackEagerTensors(self._handle, tensors)
def remove_function(self, name):
@@ -1669,22 +1661,6 @@ class Context(object):
pywrap_tfe.TFE_ContextSetThreadLocalDevicePlacementPolicy(
self._handle, self._device_policy)
- @property
- def lazy_remote_inputs_copy(self):
- return self._lazy_remote_inputs_copy
- @lazy_remote_inputs_copy.setter
- def lazy_remote_inputs_copy(self, lazy_copy):
- """Sets whether to copy remote inputs lazily for functions."""
- if not isinstance(lazy_copy, bool):
- raise ValueError("Expecting a boolean but got %s" % type(lazy_copy))
- if self._lazy_remote_inputs_copy != lazy_copy:
- if self._initialized:
- raise ValueError(
- "lazy_remote_inputs_copy should be set before being initialized.")
- self._lazy_remote_inputs_copy = lazy_copy
@property
def use_tfrt(self):
return self._use_tfrt

View File

@@ -233,21 +233,6 @@ class RemoteExecutionTest(test.TestCase, parameterized.TestCase):
"/job:%s/replica:0/task:0/device:CPU:0" % JOB_NAME)
- class RemoteExecutionWithoutLazyRemoteInputsCopyTest(RemoteExecutionTest):
- @classmethod
- def setUpClass(cls):
- super(RemoteExecutionWithoutLazyRemoteInputsCopyTest, cls).setUpClass()
- context._reset_context()
- context.context().lazy_remote_inputs_copy = False
- @classmethod
- def tearDownClass(cls):
- super(RemoteExecutionWithoutLazyRemoteInputsCopyTest, cls).tearDownClass()
- context._reset_context()
- context.context().lazy_remote_inputs_copy = True
if __name__ == "__main__":
ops.enable_eager_execution()
test.main()

View File

@@ -66,7 +66,6 @@ class SingleWorkerTest(test.TestCase, parameterized.TestCase):
# Reset the context to avoid polluting other test cases.
context._reset_context()
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceFunctionBasic(self):
@def_function.function
@@ -81,7 +80,6 @@ class SingleWorkerTest(test.TestCase, parameterized.TestCase):
self.assertAllEqual(basic(constant_op.constant([2])).numpy(), [5])
self.assertAllEqual(basic(constant_op.constant([1])).numpy(), [4])
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceFunctionVariable(self):
with ops.device('/job:worker/replica:0/task:0/cpu:0'):
variable_b = variables.Variable(1)
@@ -148,7 +146,6 @@ class SingleWorkerTest(test.TestCase, parameterized.TestCase):
self.assertIn('Dimensions must be equal', cm.exception.message)
- @test_util.eager_lazy_remote_copy_on_and_off
def testShapeError_Function(self):
@def_function.function
@@ -179,7 +176,6 @@ class SingleWorkerTest(test.TestCase, parameterized.TestCase):
with ops.device('/job:worker/task:0'):
self.assertAllEqual(func(), 1)
- @test_util.eager_lazy_remote_copy_on_and_off
def testRemoteCall(self):
@def_function.function(
@@ -306,7 +302,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
# Reset the context to avoid polluting other test cases.
context._reset_context()
- @test_util.eager_lazy_remote_copy_on_and_off
def testReturnRemoteArgument(self):
@def_function.function
@@ -376,7 +371,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
else:
os.environ[remote_async_env_var] = default_streaming
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceFunctionOnLocalDevice(self):
with ops.device('/job:worker/replica:0/task:1'):
variable_b = variables.Variable(1.0)
@@ -444,7 +438,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
# Run the function on a local worker
self.assertAllEqual(add_variables().numpy(), 3.0)
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceFunctionOnRemoteDeviceWithWait(self):
with ops.device('/job:worker/replica:0/task:1'):
variable_b = variables.Variable([1.0])
@@ -480,7 +473,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
with ops.device('/job:worker/replica:0/task:2'):
self.assertAllEqual(remote_function2(constant_op.constant([3.0])), [7.0])
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceFunctionOnRemoteDevice(self):
with ops.device('/job:worker/replica:0/task:1'):
variable_b = variables.Variable(1.0)
@@ -518,7 +510,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
self.assertAllEqual(rets[0].numpy(), [2])
self.assertAllEqual(rets[1].numpy(), 2)
- @test_util.eager_lazy_remote_copy_on_and_off
def testMultiDeviceWhileLoopOnRemoteDevice(self):
with ops.device('/job:worker/replica:0/task:1'):
variable_b = variables.Variable(1.0)
@@ -540,7 +531,6 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
with ops.device('/job:worker/replica:0/task:0/device:GPU:0'):
self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
- @test_util.eager_lazy_remote_copy_on_and_off
def testSimpleParameterServer(self):
with ops.device('/job:worker/task:2/device:CPU:0'):
@@ -585,7 +575,6 @@ class MultiJobsTest(test.TestCase, parameterized.TestCase):
# Reset the context to avoid polluting other test cases.
context._reset_context()
- @test_util.eager_lazy_remote_copy_on_and_off
def testSimpleParameterServer(self):
remote.connect_to_cluster(self._cluster)
@@ -606,7 +595,6 @@ class MultiJobsTest(test.TestCase, parameterized.TestCase):
self.assertAllEqual(worker_fn(), 8)
# TODO(b/152224115): Re-enable this test.
- @test_util.eager_lazy_remote_copy_on_and_off
def DISABLED_testSimpleParameterServerWithDeviceFilters(self):
cluster_device_filters = server_lib.ClusterDeviceFilters()
for i in range(2):
@@ -653,7 +641,6 @@ class MultiJobsTest(test.TestCase, parameterized.TestCase):
# subsequent tests.
del v1, v2
- @test_util.eager_lazy_remote_copy_on_and_off
def testConnectWithClusterResolver(self):
remote.connect_to_cluster(self._cluster_resolver)
@@ -672,12 +659,10 @@ class MultiJobsTest(test.TestCase, parameterized.TestCase):
with ops.device('/job:my_worker/task:1/device:CPU:0'):
self.assertAllEqual(worker_fn(), 8)
- @test_util.eager_lazy_remote_copy_on_and_off
def testConnectToClusterTwiceOk(self):
remote.connect_to_cluster(self._cluster_resolver)
remote.connect_to_cluster(self._cluster_resolver)
- @test_util.eager_lazy_remote_copy_on_and_off
def testConnectToClusterOnMismatchedDevice(self):
remote.connect_to_cluster(self._cluster_resolver)
@@ -687,12 +672,10 @@ class MultiJobsTest(test.TestCase, parameterized.TestCase):
with self.assertRaises(ValueError):
remote.connect_to_cluster(self._cluster_resolver)
- @test_util.eager_lazy_remote_copy_on_and_off
def testConnectToClusterWithLocalMaster(self):
local_resolver = SimpleClusterResolver(ClusterSpec({}), master='local')
remote.connect_to_cluster(local_resolver)
- @test_util.eager_lazy_remote_copy_on_and_off
def testConnectToClusterInGraphModeWillFail(self):
ops.disable_eager_execution()
with self.assertRaises(ValueError):

View File

@@ -1107,21 +1107,6 @@ def run_in_async_and_sync_mode(f):
return decorator
- def eager_lazy_remote_copy_on_and_off(f):
- """Execute the test method w/o lazy tensor copy for function remote inputs."""
- @parameterized.named_parameters([("WithLazyRemoteCopy", True), ("", False)])
- @functools.wraps(f)
- def decorator(self, lazily_remote_copy, *args, **kwargs):
- if lazily_remote_copy:
- context.context().lazy_remote_inputs_copy = True
- else:
- context.context().lazy_remote_inputs_copy = False
- f(self, *args, **kwargs)
- return decorator
def run_in_graph_and_eager_modes(func=None,
config=None,
use_gpu=True,

View File

@@ -1006,8 +1006,6 @@ PYBIND11_MODULE(_pywrap_tfe, m) {
});
m.def("TFE_ContextOptionsSetDevicePlacementPolicy",
&TFE_ContextOptionsSetDevicePlacementPolicy);
- m.def("TFE_ContextOptionsSetLazyRemoteInputsCopy",
- &TFE_ContextOptionsSetLazyRemoteInputsCopy);
m.def("TFE_ContextOptionsSetTfrt", &TFE_ContextOptionsSetTfrt);
m.def("TFE_ContextOptionsSetAsync", &TFE_ContextOptionsSetAsync);
m.def("TFE_DeleteContextOptions", &TFE_DeleteContextOptions,