TFE copy across gpus

PiperOrigin-RevId: 179622259
This commit is contained in:
Alexandre Passos 2017-12-19 16:51:25 -08:00 committed by TensorFlower Gardener
parent 1ddb053077
commit ef523e77ca
3 changed files with 87 additions and 41 deletions

View File

@@ -33,7 +33,7 @@ tf_cuda_library(
"//tensorflow/core:lib_internal", "//tensorflow/core:lib_internal",
"//tensorflow/core:protos_all_cc", "//tensorflow/core:protos_all_cc",
], ],
}), }) + ["//tensorflow/core:gpu_runtime"],
) )
tf_cuda_library( tf_cuda_library(
@@ -55,6 +55,10 @@ tf_cuda_library(
tf_cuda_cc_test( tf_cuda_cc_test(
name = "c_api_test", name = "c_api_test",
srcs = ["c_api_test.cc"], srcs = ["c_api_test.cc"],
tags = [
"guitar",
"multi_gpu",
],
deps = [ deps = [
":c_api", ":c_api",
"//tensorflow/core:lib", "//tensorflow/core:lib",

View File

@@ -25,6 +25,7 @@ limitations under the License.
#include "tensorflow/c/c_api_internal.h" #include "tensorflow/c/c_api_internal.h"
#include "tensorflow/c/eager/c_api_internal.h" #include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/c/eager/runtime.h" #include "tensorflow/c/eager/runtime.h"
#include "tensorflow/core/common_runtime/copy_tensor.h"
#include "tensorflow/core/common_runtime/device_factory.h" #include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/device_mgr.h" #include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/common_runtime/function.h"
@@ -167,18 +168,6 @@ TFE_TensorHandle* TFE_TensorHandleCopyToDevice(TFE_TensorHandle* h,
if (is_same_device) { if (is_same_device) {
return new TFE_TensorHandle(h->t, dst_cpu ? nullptr : dstd); return new TFE_TensorHandle(h->t, dst_cpu ? nullptr : dstd);
} }
const bool src_cpu = IsCPU(srcd);
if (src_cpu == dst_cpu) {
TF_SetStatus(
status, TF_INVALID_ARGUMENT,
tensorflow::strings::StrCat(
"TFE_TensorHandleCopyToDevice requires either the source "
"TFE_TensorHandle be on or the destination device be on CPU "
"or be the same (they are ",
DeviceName(srcd), " and ", DeviceName(dstd), " in this call)")
.c_str());
return nullptr;
}
tensorflow::Tensor* src = &(h->t); tensorflow::Tensor* src = &(h->t);
if (!dst_cpu && !tensorflow::DataTypeCanUseMemcpy(src->dtype())) { if (!dst_cpu && !tensorflow::DataTypeCanUseMemcpy(src->dtype())) {
TF_SetStatus( TF_SetStatus(
@@ -189,26 +178,19 @@ TFE_TensorHandle* TFE_TensorHandleCopyToDevice(TFE_TensorHandle* h,
.c_str()); .c_str());
return nullptr; return nullptr;
} }
if (src_cpu) { tensorflow::Tensor dst(dstd->GetAllocator(tensorflow::AllocatorAttributes()),
tensorflow::Tensor dst( src->dtype(), src->shape());
dstd->GetAllocator(tensorflow::AllocatorAttributes()), src->dtype(), if (src->shape().num_elements() == 0) {
src->shape()); return new TFE_TensorHandle(dst, dst_cpu ? nullptr : dstd);
if (src->shape().num_elements() == 0) { }
return new TFE_TensorHandle(dst, dstd); tensorflow::DeviceContext* src_device_context = nullptr;
} if (!IsCPU(srcd)) {
tensorflow::Notification n; src_device_context = srcd->tensorflow_gpu_device_info()->default_context;
dstd->tensorflow_gpu_device_info()->default_context->CopyCPUTensorToDevice( }
src, dstd, &dst, [status, &n](const tensorflow::Status& s) { tensorflow::DeviceContext* dst_device_context = nullptr;
status->status = s; if (!dst_cpu) {
n.Notify(); dst_device_context = dstd->tensorflow_gpu_device_info()->default_context;
});
n.WaitForNotification();
return (TF_GetCode(status) == TF_OK) ? new TFE_TensorHandle(dst, dstd)
: nullptr;
} }
CHECK(dst_cpu);
tensorflow::Tensor dst(src->dtype(), src->shape());
tensorflow::Notification n;
// TODO(ashankar): The Sync() call below may be more aggressive than // TODO(ashankar): The Sync() call below may be more aggressive than
// necessary. It is based on knowledge of implementation details - that // necessary. It is based on knowledge of implementation details - that
// GPU devices are implemented using 3 streams - one for host->device copies, // GPU devices are implemented using 3 streams - one for host->device copies,
@@ -217,16 +199,18 @@ TFE_TensorHandle* TFE_TensorHandleCopyToDevice(TFE_TensorHandle* h,
// but more than necessary (since it waits for operations that might have // but more than necessary (since it waits for operations that might have
// nothing to do with this tensor to complete). // nothing to do with this tensor to complete).
status->status = srcd->Sync(); status->status = srcd->Sync();
if (!status->status.ok()) return nullptr; tensorflow::Notification n;
srcd->tensorflow_gpu_device_info()->default_context->CopyDeviceTensorToCPU( tensorflow::CopyTensor::ViaDMA("copy", src_device_context, dst_device_context,
src, "IGNORE_MY_TENSOR_NAME", srcd, &dst, srcd, dstd, tensorflow::AllocatorAttributes(),
[status, &n](const tensorflow::Status& s) { tensorflow::AllocatorAttributes(), src, &dst,
status->status = s; [status, &n](const tensorflow::Status& s) {
n.Notify(); status->status = s;
}); n.Notify();
});
n.WaitForNotification(); n.WaitForNotification();
return (TF_GetCode(status) == TF_OK) ? new TFE_TensorHandle(dst, nullptr) return (TF_GetCode(status) == TF_OK)
: nullptr; ? new TFE_TensorHandle(dst, dst_cpu ? nullptr : dstd)
: nullptr;
} }
TFE_Op* TFE_NewOp(TFE_Context* ctx, const char* op_or_function_name, TFE_Op* TFE_NewOp(TFE_Context* ctx, const char* op_or_function_name,

View File

@@ -216,6 +216,64 @@ TEST(CAPI, TensorHandleCopyBetweenDevices) {
EXPECT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get()); EXPECT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get());
} }
TEST(CAPI, TensorHandleCopyBetweenTwoGPUDevices) {
std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)> status(
TF_NewStatus(), TF_DeleteStatus);
TFE_ContextOptions* opts = TFE_NewContextOptions();
TFE_Context* ctx = TFE_NewContext(opts, status.get());
TFE_DeleteContextOptions(opts);
ASSERT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get());
TFE_TensorHandle* hcpu = TestMatrixTensorHandle();
TF_Tensor* t = TFE_TensorHandleResolve(hcpu, status.get());
ASSERT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get());
TF_DeviceList* devices = TFE_ContextListDevices(ctx, status.get());
ASSERT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get());
const int num_devices = TF_DeviceListCount(devices);
const char* kCPUDevice = "CPU:0";
if (num_devices < 3) {
TF_DeleteDeviceList(devices);
TF_DeleteTensor(t);
TFE_DeleteTensorHandle(hcpu);
TFE_DeleteContext(ctx, status.get());
return;
}
const string gpu_1_name(TF_DeviceListName(devices, 1, status.get()));
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
const string gpu_2_name(TF_DeviceListName(devices, 2, status.get()));
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
TFE_TensorHandle* hdevice =
TFE_TensorHandleCopyToDevice(hcpu, ctx, gpu_1_name.c_str(), status.get());
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
TFE_TensorHandle* hdevice2 = TFE_TensorHandleCopyToDevice(
hdevice, ctx, gpu_2_name.c_str(), status.get());
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
TFE_DeleteTensorHandle(hdevice);
// Copy back to CPU
TFE_TensorHandle* hcopy =
TFE_TensorHandleCopyToDevice(hdevice2, ctx, kCPUDevice, status.get());
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
TFE_DeleteTensorHandle(hdevice2);
// Ensure that the contents are the same!
TF_Tensor* tcopy = TFE_TensorHandleResolve(hcopy, status.get());
TFE_DeleteTensorHandle(hcopy);
ASSERT_TRUE(TF_GetCode(status.get()) == TF_OK);
EXPECT_EQ(TF_TensorByteSize(t), TF_TensorByteSize(tcopy));
EXPECT_EQ(
0, memcmp(TF_TensorData(t), TF_TensorData(tcopy), TF_TensorByteSize(t)));
TF_DeleteTensor(tcopy);
TF_DeleteDeviceList(devices);
TF_DeleteTensor(t);
TFE_DeleteTensorHandle(hcpu);
TFE_DeleteContext(ctx, status.get());
EXPECT_EQ(TF_OK, TF_GetCode(status.get())) << TF_Message(status.get());
}
TEST(CAPI, TensorHandleSilentCopy) { TEST(CAPI, TensorHandleSilentCopy) {
std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)> status( std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)> status(
TF_NewStatus(), TF_DeleteStatus); TF_NewStatus(), TF_DeleteStatus);