Merge pull request #34532 from ROCmSoftwarePlatform/r1.15-rccl-upstream-patch

[ROCm] r1.15 rccl upstream patch
This commit is contained in:
Mihai Maruseac 2020-01-20 21:48:26 -08:00 committed by GitHub
commit 360b2e318a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 292 additions and 94 deletions

View File

@ -249,6 +249,23 @@ class BaseGPUDevice::StreamGroupFactory {
VLOG(2) << "Created stream[" << stream_group_within_gpu
<< "] = " << group->compute;
#if TENSORFLOW_USE_ROCM
// ROCm streams are lightweight and will not necessarily trigger device
// queue init until they are first used. For optimal performance,
// compute and nccl streams must be immediate siblings.
group->nccl = new se::Stream(executor);
group->nccl->Init();
VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
<< "] = " << group->nccl;
// ROCm streams are lightweight and will not necessarily trigger device
// queue init until they are first used. For optimal performance,
// compute and nccl streams must be immediate siblings.
// Force underlying resource creation now.
group->compute->ThenWaitFor(group->nccl);
group->nccl->ThenWaitFor(group->compute);
#endif
group->host_to_device = new se::Stream(executor);
group->host_to_device->Init();
VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
@ -371,8 +388,12 @@ Status BaseGPUDevice::Init(const SessionOptions& options) {
streams_.push_back(StreamGroupFactory::Global().GetOrCreate(
tf_gpu_id_, i, executor_, options.config.gpu_options()));
device_contexts_.push_back(new GPUDeviceContext(
i, streams_.back()->compute, streams_.back()->host_to_device,
streams_.back()->device_to_host, streams_.back()->device_to_device));
i, streams_.back()->compute,
#if TENSORFLOW_USE_ROCM
streams_.back()->nccl,
#endif
streams_.back()->host_to_device, streams_.back()->device_to_host,
streams_.back()->device_to_device));
}
em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,

View File

@ -137,6 +137,9 @@ class BaseGPUDevice : public LocalDevice {
friend class GPUDeviceTestHelper;
struct StreamGroup {
se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
se::Stream* nccl = nullptr;
#endif
se::Stream* host_to_device = nullptr;
se::Stream* device_to_host = nullptr;
gtl::InlinedVector<se::Stream*, 4> device_to_device;

View File

@ -30,18 +30,28 @@ class GPUDeviceContext : public DeviceContext {
public:
// Does not take ownership of streams.
GPUDeviceContext(int stream_id, se::Stream* stream,
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream,
#endif
se::Stream* host_to_device_stream,
se::Stream* device_to_host_stream,
gtl::InlinedVector<se::Stream*, 4> device_to_device_stream)
: stream_id_(stream_id),
stream_(stream),
#if TENSORFLOW_USE_ROCM
nccl_stream_(nccl_stream),
#endif
host_to_device_stream_(host_to_device_stream),
device_to_host_stream_(device_to_host_stream),
device_to_device_stream_(device_to_device_stream) {}
device_to_device_stream_(device_to_device_stream) {
}
~GPUDeviceContext() override {}
se::Stream* stream() const override { return stream_; }
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream() const { return nccl_stream_; }
#endif
se::Stream* host_to_device_stream() const { return host_to_device_stream_; }
se::Stream* device_to_host_stream() const { return device_to_host_stream_; }
se::Stream* device_to_device_stream(int index) const {
@ -72,6 +82,10 @@ class GPUDeviceContext : public DeviceContext {
// The default primary stream to use for this context.
// All the memory belongs to this stream.
se::Stream* stream_;
#if TENSORFLOW_USE_ROCM
// The stream to use for nccl operations.
se::Stream* nccl_stream_;
#endif
// The stream to use for copying data from host into GPU.
se::Stream* host_to_device_stream_;
// The stream to use for copying data from GPU to host.

View File

@ -194,6 +194,17 @@ tf_cc_test(
],
)
# virtual targets since nested select statements not possible
tf_kernel_library(
name = "virtual_nccl",
deps = if_cuda(["@local_config_nccl//:nccl"]),
)
tf_kernel_library(
name = "virtual_rccl",
deps = if_rocm(["@local_config_rocm//rocm:rccl"]),
)
tf_kernel_library(
name = "collective_ops",
srcs = if_nccl([
@ -213,7 +224,8 @@ tf_kernel_library(
"//tensorflow/core:protos_all_cc",
"//tensorflow/core/profiler/lib:traceme",
] + if_nccl([
"@local_config_nccl//:nccl",
":virtual_nccl",
":virtual_rccl",
"//tensorflow/core/nccl:nccl_lib",
]),
)
@ -382,11 +394,14 @@ cc_library(
tf_kernel_library(
name = "nccl_kernels",
srcs = if_cuda([
srcs = if_cuda_or_rocm([
"nccl_ops.cc",
]),
deps = if_cuda([
"@local_config_nccl//:nccl",
]) + if_rocm([
"@local_config_rocm//rocm:rccl",
]) + if_cuda_or_rocm([
"//tensorflow/core/nccl:nccl_lib",
"//tensorflow/core:framework",
"//tensorflow/core:gpu_headers_lib",

View File

@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl.h"
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@ -79,4 +79,4 @@ const string NcclBase::NcclCollectiveKey(const string& exec_key, int step_id) {
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/framework/collective.h"
namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
class NcclBase : public CollectiveImplementationInterface {
public:
@ -44,7 +44,7 @@ class NcclBase : public CollectiveImplementationInterface {
const CollectiveParams* col_params_; // Not owned
};
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow
#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_H_

View File

@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_broadcaster.h"
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@ -32,9 +32,8 @@ void NcclBroadcaster::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1)
<< "NcclBroadcast calling NcclManager::AddBroadcastSend/Recv num_tasks "
<< col_params_->group.num_tasks << " current task "
@ -80,4 +79,4 @@ REGISTER_COLLECTIVE(NcclBroadcast, NcclBroadcaster);
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"
namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
class NcclBroadcaster : public NcclBase {
public:
@ -29,7 +29,7 @@ class NcclBroadcaster : public NcclBase {
void Run(StatusCallback done) override;
};
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow
#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_BROADCASTER_H_

View File

@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_gatherer.h"
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@ -32,9 +32,8 @@ void NcclGatherer::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1) << "NcclGatherer calling NcclManager::AddToAllGather num_tasks "
<< col_params_->group.num_tasks << " current task "
<< col_params_->instance.task_names[col_params_->default_rank]
@ -70,4 +69,4 @@ REGISTER_COLLECTIVE(NcclGather, NcclGatherer);
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"
namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
class NcclGatherer : public NcclBase {
public:
@ -29,7 +29,7 @@ class NcclGatherer : public NcclBase {
void Run(StatusCallback done) override;
};
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow
#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_GATHERER_H_

View File

@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_reducer.h"
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@ -109,8 +109,8 @@ void NcclReducer::Run(StatusCallback done) {
nccl_done.Notify();
};
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
compute_stream->parent(), compute_stream, gpu_info,
col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done_callback));
VLOG(1) << "NcclReducer calling NcclManager::AddToAllReduce num_tasks "
<< col_params_->group.num_tasks << " current task "
@ -182,4 +182,4 @@ REGISTER_COLLECTIVE(NcclReduce, NcclReducer);
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"
namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
class NcclReducer : public NcclBase {
public:
@ -29,7 +29,7 @@ class NcclReducer : public NcclBase {
void Run(StatusCallback done) override;
};
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow
#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_REDUCER_H_

View File

@ -13,11 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#if GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include <vector>
#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#endif
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@ -104,8 +108,8 @@ class NcclAllReduceOpKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddToAllReduce(
std::move(participant),
@ -136,8 +140,8 @@ class NcclReduceSendKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceSend(
std::move(participant),
@ -173,8 +177,8 @@ class NcclReduceRecvKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceRecv(
std::move(participant),
@ -208,8 +212,8 @@ class NcclBroadcastSendKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastSend(
std::move(participant), {GetCollectiveKey(c),
@ -245,8 +249,8 @@ class NcclBroadcastRecvKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, /*input=*/nullptr, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
/*input=*/nullptr, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastRecv(
std::move(participant), {GetCollectiveKey(c),
@ -276,4 +280,4 @@ REGISTER_KERNEL_BUILDER(Name("NcclReduce").Device(DEVICE_GPU), NcclStubKernel);
} // namespace
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -5,6 +5,8 @@
load("//tensorflow:tensorflow.bzl", "tf_cuda_cc_test")
load("//tensorflow:tensorflow.bzl", "tf_copts")
load("@local_config_cuda//cuda:build_defs.bzl", "if_cuda")
load("@local_config_rocm//rocm:build_defs.bzl", "if_rocm")
load("//tensorflow:tensorflow.bzl", "if_cuda_or_rocm")
load(
"//tensorflow/core/platform:default/build_config_root.bzl",
"tf_cuda_tests_tags",
@ -19,18 +21,22 @@ exports_files(["LICENSE"])
cc_library(
name = "nccl_lib",
srcs = if_cuda([
srcs = if_cuda_or_rocm([
"nccl_manager.cc",
"nccl_rewrite.cc",
]),
hdrs = if_cuda([
hdrs = if_cuda_or_rocm([
"nccl_manager.h",
]),
copts = tf_copts(),
deps = if_cuda([
"@local_config_nccl//:nccl",
]) + if_rocm([
"@local_config_rocm//rocm:rccl",
"//tensorflow/core:gpu_runtime",
]) + if_cuda_or_rocm([
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/memory",
"@local_config_nccl//:nccl",
"//tensorflow/core:core_cpu",
"//tensorflow/core:framework",
"//tensorflow/core:gpu_headers_lib",
@ -46,14 +52,19 @@ tf_cuda_cc_test(
srcs = ["nccl_manager_test.cc"],
tags = tf_cuda_tests_tags() + [
"no_cuda_on_cpu_tap", # TODO(b/120284216): re-enable multi_gpu
"rocm_multi_gpu", # this test fails on ROCm unless 4 GPUs are used
],
deps = [
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core:testlib",
] + if_cuda([
] + if_cuda_or_rocm([
":nccl_lib",
]) + if_cuda([
"@local_config_nccl//:nccl",
"//tensorflow/core:cuda",
]) + if_rocm([
"@local_config_rocm//rocm:rccl",
"//tensorflow/core:rocm",
]),
)

View File

@ -16,15 +16,33 @@ limitations under the License.
#include <utility>
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/cuda.h"
#include "tensorflow/core/platform/env.h"
#if GOOGLE_CUDA
#include "tensorflow/core/platform/cuda.h"
#elif TENSORFLOW_USE_ROCM
#include "tensorflow/core/platform/rocm.h"
#endif
namespace tensorflow {
#if GOOGLE_CUDA
using se::cuda::ScopedActivateExecutorContext;
#elif TENSORFLOW_USE_ROCM
using se::rocm::ScopedActivateExecutorContext;
// Local hipify of cuda symbols
#define cudaError_t hipError_t
#define cudaStream_t hipStream_t
#define cudaGetErrorString hipGetErrorString
#define cudaGetDevice hipGetDevice
#define cudaSetDevice hipSetDevice
#define cudaSuccess hipSuccess
int NcclManager::instance_count = 0;
#endif
#define NCCL_RETURN_IF_ERROR(...) \
do { \
ncclResult_t nccl_status = (__VA_ARGS__); \
@ -41,8 +59,6 @@ namespace tensorflow {
} \
} while (0)
using se::cuda::ScopedActivateExecutorContext;
// Contains data for a single stream used for nccl communication; this includes
// a background thread that calls NcclManager::LoopKernelLaunches.
struct NcclManager::NcclStream : public core::RefCounted {
@ -54,7 +70,12 @@ struct NcclManager::NcclStream : public core::RefCounted {
// The stream on which to run the nccl collective.
// This is a different stream than the tensorflow compute stream.
#if TENSORFLOW_USE_ROCM
// On ROCm, we borrow the nccl stream from the device context.
se::Stream* stream = nullptr;
#else
std::unique_ptr<se::Stream> stream;
#endif
// `mu` protects access to `pending_launches_`, which is the list of
// collectives ready but whose kernels are yet to be launched. When the
@ -140,6 +161,16 @@ struct NcclManager::Collective : public core::RefCounted {
single_node(num_local_devices_in == num_global_devices_in),
communicator_key(communicator_key_in) {
participants.reserve(num_local_devices_in);
#if TENSORFLOW_USE_ROCM
// On ROCm platform, this allows caller to either use the singleton instance
// or to manage one non-singleton NcclManager instance.
// For example, the nccl_manager_test will use both paradigms in the same
// executable, but not running concurrently (which would hang otherwise).
if (NcclManager::instance_count > 1) {
status = errors::Internal(
"ROCm cannot use multi-node NCCL collectives on a single node");
}
#endif
}
const string collective_key; // A unique key for debugging.
@ -178,9 +209,17 @@ struct NcclManager::Collective : public core::RefCounted {
Status status;
};
NcclManager::NcclManager() { VLOG(2) << "New NcclManager " << this; }
NcclManager::NcclManager() {
VLOG(2) << "New NcclManager " << this;
#if TENSORFLOW_USE_ROCM
++instance_count;
#endif
}
NcclManager::~NcclManager() {
VLOG(2) << "~NcclManager " << this;
#if TENSORFLOW_USE_ROCM
--instance_count;
#endif
for (auto& it : device_to_comm_streams_) {
for (NcclStream* nccl_stream : it.second) {
{
@ -194,6 +233,12 @@ NcclManager::~NcclManager() {
}
NcclManager* NcclManager::instance() {
static NcclManager* instance = new NcclManager();
#if TENSORFLOW_USE_ROCM
// singleton does not count against total instances
// see comment above in Collective constructor concerning ROCm platform
static std::once_flag once;
std::call_once(once, [] { --NcclManager::instance_count; });
#endif
return instance;
}
@ -205,14 +250,14 @@ string NcclManager::GenerateCommunicatorKey() {
Status NcclManager::GetCommunicator(NcclManager::Collective* collective,
NcclManager::Communicator** communicator) {
// Sort by executor to make ordering of executors deterministic.
// Sort by global rank to make ordering of participants deterministic.
std::sort(collective->participants.begin(), collective->participants.end(),
[](const std::unique_ptr<Participant>& a,
const std::unique_ptr<Participant>& b) {
if (a->executor == b->executor) {
return a->global_rank < b->global_rank;
}
if (a->global_rank == b->global_rank) {
return a->executor < b->executor;
}
return a->global_rank < b->global_rank;
});
mutex_lock l(mu_);
@ -298,8 +343,12 @@ Status NcclManager::GetCommunicator(NcclManager::Collective* collective,
if (nccl_stream == nullptr) {
nccl_stream = new NcclStream();
nccl_stream->executor = executor;
#if TENSORFLOW_USE_ROCM
nccl_stream->stream = collective->participants[i]->context->nccl_stream();
#else
nccl_stream->stream.reset(new se::Stream(executor));
nccl_stream->stream->Init();
#endif
streams.emplace_back(nccl_stream);
used_streams.insert(nccl_stream);
@ -589,7 +638,11 @@ void NcclManager::RunCollective(Collective* collective) {
}
void NcclManager::LoopKernelLaunches(NcclStream* nccl_stream) {
#if TENSORFLOW_USE_ROCM
se::Stream* comm_stream = nccl_stream->stream;
#else
se::Stream* comm_stream = nccl_stream->stream.get();
#endif
ScopedActivateExecutorContext scoped_context(nccl_stream->executor);
const cudaStream_t* cu_stream = reinterpret_cast<const cudaStream_t*>(
comm_stream->implementation()->GpuStreamMemberHack());
@ -709,4 +762,4 @@ void NcclManager::LoopKernelLaunches(NcclStream* nccl_stream) {
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -15,7 +15,7 @@ limitations under the License.
#ifndef TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
#define TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include <vector>
@ -28,8 +28,14 @@ limitations under the License.
#include "absl/container/flat_hash_map.h"
#include "absl/memory/memory.h"
#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#endif
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
@ -49,6 +55,10 @@ class NcclManager {
static NcclManager* instance();
#if TENSORFLOW_USE_ROCM
static int instance_count;
#endif
// Calls `ncclGetUniqueId` and returns the id as a string. The returned value
// may be shared with other participants on different nodes and passed in to
// multi-node collective invocations.
@ -57,12 +67,15 @@ class NcclManager {
// A participant in a Collective.
struct Participant {
Participant(se::StreamExecutor* executor, se::Stream* tensor_stream,
EventMgr* event_mgr, int gpu_device_id, const Tensor* input,
Tensor* output, int global_rank, DoneCallback done_callback)
const DeviceBase::GpuDeviceInfo* info, const Tensor* input, Tensor* output,
int global_rank, DoneCallback done_callback)
: executor(executor),
tensor_stream(tensor_stream),
event_mgr(event_mgr),
gpu_device_id(gpu_device_id),
event_mgr(info->event_mgr),
gpu_device_id(info->gpu_id),
#if TENSORFLOW_USE_ROCM
context(static_cast<GPUDeviceContext*>(info->default_context)),
#endif
input(input),
input_event(nullptr),
output(output),
@ -97,6 +110,10 @@ class NcclManager {
const int gpu_device_id;
#if TENSORFLOW_USE_ROCM
GPUDeviceContext* const context;
#endif
// Owned by the caller, who must keep it live until `done_callback` is
// called. Is NULL for participants that only receive data.
const Tensor* input;
@ -245,6 +262,6 @@ class NcclManager {
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#endif // TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_

View File

@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
#include "tensorflow/core/nccl/nccl_manager.h"
@ -303,11 +303,11 @@ class NcclManagerTest : public ::testing::Test {
for (int local_rank = 0; local_rank < num_ranks_per_node;
++local_rank) {
auto* device = this->GetDevice(local_rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
const int global_rank = node * num_ranks_per_node + local_rank;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[global_rank], &test_case->outs[global_rank],
global_rank, this->CreateDoneCallback(test_case.get()));
node_states[node].nccl_manager.AddToAllReduce(
@ -351,7 +351,7 @@ class NcclManagerTest : public ::testing::Test {
src_global_rank, local_rank, &node_states,
&collective_key, &communicator_key, &test_case]() {
auto* device = this->GetDevice(local_rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
const int global_rank = node * num_ranks_per_node + local_rank;
auto* input = global_rank == src_global_rank
@ -361,7 +361,7 @@ class NcclManagerTest : public ::testing::Test {
? nullptr
: &test_case->outs[global_rank];
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(), input,
device->executor(), stream, info, input,
output, global_rank, this->CreateDoneCallback(test_case.get()));
if (global_rank == src_global_rank) {
VLOG(1) << "AddBroadcastSend node " << node << " global_rank "
@ -442,10 +442,10 @@ TYPED_TEST(NcclManagerTest, BasicSumReduction) {
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
VLOG(2) << "rank " << rank << " device " << device->name();
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], /*global_rank=*/-1,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddToAllReduce(
@ -508,11 +508,11 @@ TYPED_TEST(NcclManagerTest, MultipleCallers) {
case_and_rank.pop_back();
}
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
typename TestFixture::TestCase* test_case = test_cases[test_num].get();
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], /*global_rank=*/-1,
this->CreateDoneCallback(test_case));
NcclManager::instance()->AddToAllReduce(
@ -551,10 +551,10 @@ TYPED_TEST(NcclManagerTest, BasicAllGather) {
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
VLOG(2) << "rank " << rank << " device " << device->name();
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], rank,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddToAllGather(
@ -585,7 +585,12 @@ TYPED_TEST(NcclManagerTest, InPlaceBroadcast) {
// Test broadcast with increasing ranks.
TYPED_TEST(NcclManagerTest, BroadcastWithDifferentRanks) {
for (int num_ranks = 4; num_ranks <= 8; ++num_ranks) {
#if TENSORFLOW_USE_ROCM
for (int num_ranks = 1; num_ranks <= 4; ++num_ranks)
#else
for (int num_ranks = 4; num_ranks <= 8; ++num_ranks)
#endif
{
const int src_rank = static_cast<int>(random::New64() % num_ranks);
for (int in_place_idx = 0; in_place_idx <= 1; ++in_place_idx) {
const bool in_place = in_place_idx == 0;
@ -603,12 +608,14 @@ TEST(NcclManagerTest, CommunicatorKey) {
EXPECT_EQ(communicator_key.size(), NCCL_UNIQUE_ID_BYTES);
}
#if !TENSORFLOW_USE_ROCM
// This test creates `num_nodes` NcclManagers to simulate a multi-node
// environment. It works on a single node and reuses GPUs. It enqueues NCCL
// kernels on separate stream per rank.
TYPED_TEST(NcclManagerTest, MultiNode) {
this->RunMultiNodeAllReduceTest(/*num_nodes=*/2, /*num_ranks_per_node=*/4);
}
#endif
// Tests that specifying `communicator_key` with a single node NCCL collective
// works well.
@ -618,9 +625,15 @@ TYPED_TEST(NcclManagerTest, MultiNodeSingle) {
// Multi-node broadcast.
TYPED_TEST(NcclManagerTest, MultiNodeBroadcast) {
#if TENSORFLOW_USE_ROCM
this->RunMultiNodeBroadcastTest(/*num_nodes=*/1, /*num_ranks_per_node=*/4,
/*src_node=*/0, /*src_local_rank=*/3,
/*in_place=*/true);
#else
this->RunMultiNodeBroadcastTest(/*num_nodes=*/4, /*num_ranks_per_node=*/8,
/*src_node=*/2, /*src_local_rank=*/3,
/*in_place=*/true);
#endif
}
// Checks that we return error status if a collective_key is used for different
@ -633,10 +646,10 @@ TYPED_TEST(NcclManagerTest, ConsistentCollectiveType) {
TensorShape({2, 3}), 0.0f));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], /*global_rank=*/-1,
this->CreateDoneCallback(test_case.get()));
if (rank == 0) {
@ -670,10 +683,10 @@ TYPED_TEST(NcclManagerTest, ConsistentCommunicatorKey) {
TensorShape({2, 3}), 0.0f));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], /*global_rank=*/-1,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddToAllReduce(
@ -699,11 +712,11 @@ TYPED_TEST(NcclManagerTest, ConsistentNumberOfDevices) {
TensorShape({2, 3}), 0.0f));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
int num_devices = rank == 0 ? num_ranks : num_ranks + 1;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->ins[rank], &test_case->outs[rank], /*global_rank=*/-1,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddToAllReduce(std::move(participant),
@ -728,10 +741,10 @@ TYPED_TEST(NcclManagerTest, BroadcastNoSource) {
/*src_rank=*/-1, false));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(), nullptr,
device->executor(), stream, info, nullptr,
&test_case->outs[rank], rank,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddBroadcastRecv(std::move(participant),
@ -755,10 +768,10 @@ TYPED_TEST(NcclManagerTest, BroadcastMultipleSends) {
/*src_rank=*/-1, false));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->outs[rank], &test_case->outs[rank], rank,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddBroadcastSend(std::move(participant),
@ -783,10 +796,10 @@ TYPED_TEST(NcclManagerTest, BroadcastInconsistentSource) {
/*src_rank=*/-1, false));
for (int rank = 0; rank < num_ranks; ++rank) {
auto* device = this->GetDevice(rank);
auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
auto* info = device->tensorflow_gpu_device_info();
auto* stream = device->tensorflow_gpu_device_info()->stream;
auto participant = absl::make_unique<NcclManager::Participant>(
device->executor(), stream, event_mgr, device->gpu_id(),
device->executor(), stream, info,
&test_case->outs[rank], &test_case->outs[rank], rank,
this->CreateDoneCallback(test_case.get()));
NcclManager::instance()->AddBroadcastRecv(std::move(participant),
@ -802,4 +815,4 @@ TYPED_TEST(NcclManagerTest, BroadcastInconsistentSource) {
} // namespace tensorflow
#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

View File

@ -35,10 +35,11 @@ export TF_GPU_COUNT=${N_GPUS}
yes "" | $PYTHON_BIN_PATH configure.py
# Run bazel test command. Double test timeouts to avoid flakes.
# Run tests requiring more than one GPU separately.
bazel test \
--config=rocm \
-k \
--test_tag_filters=gpu,-no_gpu,-no_rocm,-benchmark-test,-no_oss,-oss_serial, \
--test_tag_filters=gpu,-no_gpu,-no_rocm,-benchmark-test,-no_oss,-oss_serial,-rocm_multi_gpu, \
--test_timeout 600,900,2400,7200 \
--test_output=errors \
--jobs=${N_JOBS} \
@ -51,6 +52,20 @@ bazel test \
-//tensorflow/contrib/... \
-//tensorflow/lite/... \
-//tensorflow/python/compiler/tensorrt/... \
&& \
bazel test \
--config=rocm \
-k \
--test_tag_filters=-no_rocm,rocm_multi_gpu, \
--test_timeout 600,900,2400,7200 \
--test_output=errors \
--jobs=${N_JOBS} \
--local_test_jobs=1 \
--test_sharding_strategy=disabled \
-- \
//tensorflow/... \
-//tensorflow/compiler/... \
-//tensorflow/contrib/... \
-//tensorflow/lite/... \
-//tensorflow/python/compiler/tensorrt/...

View File

@ -84,6 +84,18 @@ cc_library(
visibility = ["//visibility:public"],
)
cc_library(
name = "rccl",
srcs = ["rocm/lib/%{rccl_lib}"],
data = ["rocm/lib/%{rccl_lib}"],
includes = [
".",
"rocm/include",
],
linkstatic = 1,
visibility = ["//visibility:public"],
)
cc_library(
name = "rocm",
visibility = ["//visibility:public"],

View File

@ -201,6 +201,9 @@ def _rocm_include_path(repository_ctx, rocm_config):
# Add MIOpen headers
inc_dirs.append("/opt/rocm/miopen/include")
# Add RCCL headers
inc_dirs.append("/opt/rocm/rccl/include")
# Add hcc headers
inc_dirs.append("/opt/rocm/hcc/include")
inc_dirs.append("/opt/rocm/hcc/compiler/lib/clang/7.0.0/include/")
@ -220,7 +223,7 @@ def _rocm_include_path(repository_ctx, rocm_config):
return inc_dirs
def _enable_rocm(repository_ctx):
def enable_rocm(repository_ctx):
if "TF_NEED_ROCM" in repository_ctx.os.environ:
enable_rocm = repository_ctx.os.environ["TF_NEED_ROCM"].strip()
return enable_rocm == "1"
@ -472,6 +475,12 @@ def _find_libs(repository_ctx, rocm_config):
cpu_value,
rocm_config.rocm_toolkit_path + "/miopen",
),
"rccl": _find_rocm_lib(
"rccl",
repository_ctx,
cpu_value,
rocm_config.rocm_toolkit_path + "/rccl",
),
}
def _get_rocm_config(repository_ctx):
@ -554,6 +563,7 @@ def _create_dummy_repository(repository_ctx):
"%{hip_lib}": _lib_name("hip", cpu_value),
"%{rocblas_lib}": _lib_name("rocblas", cpu_value),
"%{miopen_lib}": _lib_name("miopen", cpu_value),
"%{rccl_lib}": _lib_name("rccl", cpu_value),
"%{rocfft_lib}": _lib_name("rocfft", cpu_value),
"%{hiprand_lib}": _lib_name("hiprand", cpu_value),
"%{copy_rules}": "",
@ -695,6 +705,12 @@ def _create_local_rocm_repository(repository_ctx):
src_dir = rocm_toolkit_path + "/miopen/include",
out_dir = "rocm/include/miopen",
),
make_copy_dir_rule(
repository_ctx,
name = "rccl-include",
src_dir = rocm_toolkit_path + "/rccl/include",
out_dir = "rocm/include/rccl",
),
]
rocm_libs = _find_libs(repository_ctx, rocm_config)
@ -731,11 +747,13 @@ def _create_local_rocm_repository(repository_ctx):
"%{rocfft_lib}": rocm_libs["rocfft"].file_name,
"%{hiprand_lib}": rocm_libs["hiprand"].file_name,
"%{miopen_lib}": rocm_libs["miopen"].file_name,
"%{rccl_lib}": rocm_libs["rccl"].file_name,
"%{copy_rules}": "\n".join(copy_rules),
"%{rocm_headers}": ('":rocm-include",\n' +
'":rocfft-include",\n' +
'":rocblas-include",\n' +
'":miopen-include",'),
'":miopen-include",\n' +
'":rccl-include",'),
},
)
@ -878,7 +896,7 @@ def _create_remote_rocm_repository(repository_ctx, remote_config_repo):
def _rocm_autoconf_impl(repository_ctx):
"""Implementation of the rocm_autoconf repository rule."""
if not _enable_rocm(repository_ctx):
if not enable_rocm(repository_ctx):
_create_dummy_repository(repository_ctx)
elif _TF_ROCM_CONFIG_REPO in repository_ctx.os.environ:
_create_remote_rocm_repository(

View File

@ -19,6 +19,10 @@ load(
"find_cuda_config",
"get_cpu_value",
)
load(
"//third_party/gpus:rocm_configure.bzl",
"enable_rocm",
)
_CUDA_TOOLKIT_PATH = "CUDA_TOOLKIT_PATH"
_NCCL_HDR_PATH = "NCCL_HDR_PATH"
@ -62,8 +66,8 @@ def _label(file):
def _nccl_configure_impl(repository_ctx):
"""Implementation of the nccl_configure repository rule."""
if (not enable_cuda(repository_ctx) or
get_cpu_value(repository_ctx) not in ("Linux", "FreeBSD")):
if ((not enable_cuda(repository_ctx) and not enable_rocm(repository_ctx))
or get_cpu_value(repository_ctx) not in ("Linux", "FreeBSD")):
# Add a dummy build file to make bazel query happy.
repository_ctx.file("BUILD", _NCCL_DUMMY_BUILD_CONTENT)
return