Fix an out-of-order execution issue. For a multi-device function, don't send a packed input to the function device until all of its underlying remote handles are ready on their remote devices. Otherwise, on a remote worker, the request to execute a remote component function could be enqueued before the request that produces one of the function's inputs.
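A minimal Python sketch of the user-visible scenario (not part of this change; it mirrors the new remote_test case below and assumes a cluster with workers task:1 and task:2 has already been connected, e.g. via tf.config.experimental_connect_to_cluster; pack_eager_tensors is the non-public helper the test itself uses):

    import tensorflow as tf
    from tensorflow.python.framework import ops

    with tf.device('/job:worker/replica:0/task:2/device:CPU:0'):
      # Queue extra work so the input produced below is likely still pending
      # when the component function request arrives on worker:2.
      pending = [tf.zeros([2]) for _ in range(20)]
      func_input = tf.zeros([2])

    # Pack the (possibly still pending) remote handle into a packed handle.
    packed_input = ops.pack_eager_tensors([func_input])

    @tf.function
    def add_two(x):
      # The component function placed on task:2 consumes the packed input;
      # with this fix, its execution request is not sent until the underlying
      # remote handle is ready on worker:2.
      with tf.device('/job:worker/replica:0/task:2/device:CPU:0'):
        y = x + tf.constant(1.0)
      return y + tf.constant(1.0)

    with tf.device('/job:worker/replica:0/task:1/device:CPU:0'):
      result = add_two(packed_input)  # expected value: [2.0, 2.0]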

PiperOrigin-RevId: 347079883
Change-Id: I7929f27539890ce6e55c09d1ef79847101187b28
Yujing Zhang 2020-12-11 14:49:21 -08:00 committed by TensorFlower Gardener
parent 63b7178b4a
commit 4867489c44
6 changed files with 65 additions and 11 deletions


@@ -20,10 +20,11 @@ namespace {
void TestRemoteExecuteSilentCopiesFunc(bool async, bool remote,
bool heavy_load_on_streaming_rpc,
bool remote_func_outputs = false) {
bool remote_func_outputs = false,
bool has_packed_input = false) {
return TestRemoteExecuteSilentCopies(async, remote, /*func=*/true,
heavy_load_on_streaming_rpc,
remote_func_outputs);
remote_func_outputs, has_packed_input);
}
TEST(CAPI, RemoteExecuteSilentCopiesAsyncFunc) {
@@ -60,5 +61,14 @@ TEST(CAPI, RemoteExecuteSilentCopiesLocalAsyncFuncOrdering) {
TestRemoteExecuteSilentCopiesFunc(/*async=*/true, /*remote=*/false,
/*heavy_load_on_streaming_rpc=*/true);
}
TEST(CAPI, RemoteExecuteSilentCopiesRemoteAsyncPackedInputFuncOrdering) {
// A remote (packed) input may not be ready when we start running a function.
// Test that the function execution waits until the remote input is ready.
TestRemoteExecuteSilentCopiesFunc(/*async=*/true, /*remote=*/true,
/*heavy_load_on_streaming_rpc=*/true,
/*remote_func_outputs=*/true,
/*has_packed_input=*/true);
}
} // namespace


@@ -68,7 +68,9 @@ string MatMulFunction(const string& matmul_device) {
void TestRemoteExecuteSilentCopies(bool async, bool remote, bool func,
bool heavy_load_on_streaming_rpc,
bool remote_func_outputs) {
bool remote_func_outputs,
bool has_packed_input) {
CHECK(!has_packed_input || func);
tensorflow::ServerDef server_def = GetServerDef(3);
// This server def has the task index set to 0.
@@ -123,6 +125,15 @@ void TestRemoteExecuteSilentCopies(bool async, bool remote, bool func,
TFE_TensorHandleCopyToDevice(h1_task0, ctx, task2_name, status);
ASSERT_EQ(TF_GetCode(status), TF_OK) << TF_Message(status);
TFE_TensorHandle* packed_handle = nullptr;
if (has_packed_input) {
int num_replicas = 1;
std::vector<TFE_TensorHandle*> packed_handles = {h1_task2};
packed_handle = TFE_CreatePackedTensorHandle(ctx, packed_handles.data(),
&num_replicas, status);
ASSERT_EQ(TF_GetCode(status), TF_OK) << TF_Message(status);
}
TFE_Op* matmul = nullptr;
if (func) {
const string matmul_device = remote_func_outputs ? task2_name : "";
@@ -135,7 +146,7 @@ void TestRemoteExecuteSilentCopies(bool async, bool remote, bool func,
ASSERT_EQ(TF_GetCode(status), TF_OK) << TF_Message(status);
TFE_OpAddInput(matmul, h0_task0, status);
ASSERT_EQ(TF_GetCode(status), TF_OK) << TF_Message(status);
TFE_OpAddInput(matmul, h1_task2, status);
TFE_OpAddInput(matmul, has_packed_input ? packed_handle : h1_task2, status);
ASSERT_EQ(TF_GetCode(status), TF_OK) << TF_Message(status);
} else {
// Handles are on task0 (local), and task2, but op is on task1.
@@ -194,6 +205,9 @@ void TestRemoteExecuteSilentCopies(bool async, bool remote, bool func,
TFE_DeleteTensorHandle(h0_task0);
TFE_DeleteTensorHandle(h1_task0);
if (packed_handle) {
TFE_DeleteTensorHandle(packed_handle);
}
TFE_DeleteTensorHandle(h1_task2);
TFE_DeleteTensorHandle(retvals[0]);
for (auto* h : handles_task0) {


@@ -21,6 +21,7 @@ limitations under the License.
// is not ready when we start running an op or a function.
void TestRemoteExecuteSilentCopies(bool async, bool remote, bool func,
bool heavy_load_on_streaming_rpc,
bool remote_func_outputs = false);
bool remote_func_outputs = false,
bool has_packed_input = false);
#endif // TENSORFLOW_C_EAGER_C_API_REMOTE_TEST_UTIL_H_


@@ -920,11 +920,11 @@ Status EagerRemoteExecute(EagerOperation* op, TensorHandle** retvals,
}
}
auto* input_handle = remote_op->add_op_inputs()->mutable_remote_handle();
// For a multi-device function, a remote RunComponentFunction request is
// not sent through StreamingEnqueueAsync. It could arrive at a remote
// worker before a remote execution request which produces an input of the
// component function. So we wait until the remote input is ready before
// serializing it.
// For a remote component function, a function execution request and an
// input generation request may come from different workers. We need to
// guarantee that the input generation request is processed before the
// function execution request, so wait until the remote input is ready
// before sending it to the multi-device function device.
const bool wait_until_ready = op->is_function();
TF_RETURN_IF_ERROR(ctx.RemoteMgr()->SerializeRemoteTensorHandle(
input, wait_until_ready, input_handle, input_device,


@@ -333,8 +333,13 @@ Status SerializePackedHandle(const uint64 op_id, TensorHandle* packed_handle,
const bool serialize_resource_dtype_and_shape =
(i == 0) && (h->dtype == DT_RESOURCE) &&
(!ctx->OnSameTask(src_device, target_device));
// For a remote component function, a function execution request and an
// input generation request may come from different workers. We need to
// guarantee that the input generation request is processed before the
// function execution request, so wait until the underlying remote handles
// are ready before sending a packed handle to the function device.
TF_RETURN_IF_ERROR(ctx->RemoteMgr()->SerializeRemoteTensorHandle(
h, /*wait_until_ready=*/false,
h, /*wait_until_ready=*/true,
op->add_handles()->mutable_remote_handle(), src_device,
absl::get<Device*>(h->DeviceOrHostCPU(*ctx))->name(),
serialize_resource_dtype_and_shape));


@@ -390,6 +390,30 @@ class MultiWorkersTest(test.TestCase, parameterized.TestCase):
self.assertAllEqual(remote_function(constant_op.constant([1.0])), [3.0])
def testMultiDeviceFunctionExecutionOrderingWithPackedInput(self):
shape = [2]
with ops.device('/job:worker/replica:0/task:2/device:CPU:0'):
# Send 20 remote requests to simulate heavy load on worker:2.
unused_values = []
for _ in range(20):
unused_values.append(array_ops.zeros(shape))
func_input = array_ops.zeros(shape)
packed_input = ops.pack_eager_tensors([func_input])
@def_function.function
def func(packed_input):
# When worker:2 receives the component function request, packed_input
# should be ready on worker:2.
with ops.device('/job:worker/replica:0/task:2/device:CPU:0'):
ret = packed_input + constant_op.constant(1.0)
return ret + constant_op.constant(1.0)
# Run the function on worker:1.
with ops.device('/job:worker/replica:0/task:1/device:CPU:0'):
self.assertAllEqual(func(packed_input).numpy(),
array_ops.ones(shape).numpy() * 2)
def testMultiDeviceFunctionWithPackedVariable(self):
with ops.device('/job:worker/replica:0/task:0/device:CPU:0'):
var0 = resource_variable_ops.ResourceVariable(1.0)