Move XlaAllocator to stream_executor and rename it to TfAllocatorAdapter.

The new package allows usage from StreamExecutor (SE), and the new name reflects
what the class actually does.

PiperOrigin-RevId: 249501148
George Karpenkov 2019-05-22 12:49:19 -07:00 committed by TensorFlower Gardener
parent 9cc84f78b8
commit 9067ab24bf
14 changed files with 392 additions and 257 deletions
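
A minimal before/after sketch of what the rename means at a call site (not part of the diff; platform and tf_allocator are placeholders for whatever the caller already holds, e.g. the platform and device allocator used in the jit hunks below):

// Hedged sketch only; the constructor shape is unchanged, only the package
// and class name move.
//
// Before this change:
//   auto alloc = absl::make_unique<tensorflow::XlaAllocator>(platform,
//                                                            tf_allocator);
// After this change:
auto alloc =
    absl::make_unique<se::TfAllocatorAdapter>(platform, tf_allocator);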

View File

@ -258,10 +258,8 @@ cc_library(
name = "xla_launch_util",
srcs = ["xla_launch_util.cc"],
hdrs = ["xla_launch_util.h"],
# TODO(skyewm): remove this once XlaAllocator is factored out.
visibility = [
":internal",
"//tensorflow/compiler/xla/python:__pkg__",
],
deps = [
":common",

View File

@ -29,6 +29,7 @@ cc_library(
"//tensorflow/core:lib",
"//tensorflow/core:state_ops_op_lib",
"//tensorflow/core:stream_executor_no_cuda",
"//tensorflow/stream_executor:tf_allocator_adapter",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/memory",
],

View File

@ -61,7 +61,7 @@ XlaPlatformInfo PlatformInfoFromContext(OpKernelConstruction* ctx) {
DeviceType device_type = ctx->device_type();
se::Platform::Id platform_id = nullptr;
const XlaDevice::Metadata* xla_device_metadata = nullptr;
std::unique_ptr<XlaAllocator> xla_allocator;
std::unique_ptr<se::TfAllocatorAdapter> xla_allocator;
se::DeviceMemoryAllocator* device_allocator = nullptr;
if (ctx->device_type() == DeviceType(DEVICE_CPU)) {
@ -93,7 +93,7 @@ XlaPlatformInfo PlatformInfoFromContext(OpKernelConstruction* ctx) {
se::MultiPlatformManager::PlatformWithId(platform_id);
OP_REQUIRES_OK_RETURN(ctx, XlaPlatformInfo(), maybe_platform.status());
xla_allocator = absl::make_unique<XlaAllocator>(
xla_allocator = absl::make_unique<se::TfAllocatorAdapter>(
maybe_platform.ValueOrDie(), ctx->device()->GetAllocator({}));
}

View File

@ -27,6 +27,7 @@ limitations under the License.
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/util/stream_executor_util.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"
namespace tensorflow {
@ -36,11 +37,11 @@ class XlaPlatformInfo {
public:
XlaPlatformInfo() : device_type_("") {}
XlaPlatformInfo(XlaPlatformInfo&&) = default;
explicit XlaPlatformInfo(const DeviceType device_type,
se::Platform::Id platform_id,
const XlaDevice::Metadata* xla_device_metadata,
std::unique_ptr<XlaAllocator> xla_allocator,
se::DeviceMemoryAllocator* device_allocator)
explicit XlaPlatformInfo(
const DeviceType device_type, se::Platform::Id platform_id,
const XlaDevice::Metadata* xla_device_metadata,
std::unique_ptr<se::TfAllocatorAdapter> xla_allocator,
se::DeviceMemoryAllocator* device_allocator)
: device_type_(device_type),
platform_id_(platform_id),
xla_device_metadata_(xla_device_metadata),
@ -84,8 +85,8 @@ class XlaPlatformInfo {
// then device_allocator_ is the xla::Backend's memory allocator and
// xla_allocator_ is null. If the op is placed on a regular CPU or GPU device
// then device_allocator_ is null and xla_allocator_ points to an appropriate
// XlaAllocator instance.
std::unique_ptr<XlaAllocator> xla_allocator_;
// se::TfAllocatorAdapter instance.
std::unique_ptr<se::TfAllocatorAdapter> xla_allocator_;
se::DeviceMemoryAllocator* device_allocator_;
TF_DISALLOW_COPY_AND_ASSIGN(XlaPlatformInfo);

View File

@ -167,32 +167,6 @@ Status SnapshotResourceVariables(OpKernelContext* ctx,
return Status::OK();
}
XlaAllocator::XlaAllocator(const se::Platform* platform, Allocator* wrapped)
: se::DeviceMemoryAllocator(platform), wrapped_(wrapped) {}
XlaAllocator::~XlaAllocator() {}
xla::StatusOr<se::OwningDeviceMemory> XlaAllocator::Allocate(
int device_ordinal, uint64 size, bool retry_on_failure) {
AllocationAttributes attrs;
attrs.no_retry_on_failure = !retry_on_failure;
void* data = nullptr;
if (size != 0) {
data = wrapped_->AllocateRaw(Allocator::kAllocatorAlignment, size, attrs);
if (data == nullptr) {
return errors::ResourceExhausted(
"Out of memory while trying to allocate ", size, " bytes.");
}
}
return se::OwningDeviceMemory(se::DeviceMemoryBase(data, size),
device_ordinal, this);
}
Status XlaAllocator::Deallocate(int device_ordinal, se::DeviceMemoryBase mem) {
wrapped_->DeallocateRaw(mem.opaque());
return Status::OK();
}
XlaComputationLaunchContext::XlaComputationLaunchContext(
xla::LocalClient* client, se::DeviceMemoryAllocator* xla_allocator,
bool allocate_xla_tensors, bool use_multiple_streams)

View File

@ -32,7 +32,6 @@ limitations under the License.
#include "tensorflow/stream_executor/device_memory_allocator.h"
namespace tensorflow {
class XlaAllocator;
// Struct that represents a possibly-absent Tensor.
struct OptionalTensor {
@ -104,74 +103,6 @@ class VariableInfo {
Status LockVariables(absl::Span<VariableInfo> variables)
EXCLUSIVE_LOCK_FUNCTION();
// Adapter class that wraps a Tensorflow allocator as an XLA allocator.
// Assumes that the Tensorflow allocator permits asynchronous deallocation:
// see comment on `AllowsAsynchronousDeallocation()`.
class XlaAllocator : public se::DeviceMemoryAllocator {
public:
XlaAllocator(const se::Platform* platform, Allocator* wrapped);
~XlaAllocator() override;
xla::StatusOr<se::OwningDeviceMemory> Allocate(
int device_ordinal, uint64 size, bool retry_on_failure) override;
Status Deallocate(int device_ordinal, se::DeviceMemoryBase mem) override;
// The Tensorflow BFC allocator used on GPU allows host-side deallocation
// before GPU execution takes place. Tensorflow uses the ordering of the main
// compute stream to enforce a happens-before relationship between a memory
// allocation and code that reuses the same memory. If Tensorflow adds
// support for multiple GPU streams or allocators with different ordering
// requirements, this code may need to change.
// (This attribute has no effect on CPU.)
bool AllowsAsynchronousDeallocation() const override { return true; }
private:
Allocator* wrapped_;
};
// Adapter class that wraps per-device TF allocators as an XLA allocator.
// Assumes that the Tensorflow allocator permits asynchronous deallocation;
// see comment on `AllowsAsynchronousDeallocation()`.
class MultiDeviceAdapter : public se::DeviceMemoryAllocator {
public:
MultiDeviceAdapter(
const se::Platform* platform,
std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators)
: DeviceMemoryAllocator(platform),
tf_allocators_(std::move(tf_allocators)) {
for (const auto& tf_allocator : tf_allocators_) {
per_device_allocators_.emplace_back(platform, tf_allocator.get());
}
}
xla::StatusOr<se::OwningDeviceMemory> Allocate(
int device_ordinal, uint64 size, bool retry_on_failure) override {
CHECK_LT(device_ordinal, per_device_allocators_.size());
return per_device_allocators_[device_ordinal].Allocate(device_ordinal, size,
retry_on_failure);
}
Status Deallocate(int device_ordinal, se::DeviceMemoryBase mem) override {
CHECK_LT(device_ordinal, per_device_allocators_.size());
return per_device_allocators_[device_ordinal].Deallocate(device_ordinal,
mem);
}
// The Tensorflow BFC allocator used on GPU allows host-side deallocation
// before GPU execution takes place. Tensorflow uses the ordering of the main
// compute stream to enforce a happens-before relationship between a memory
// allocation and code that reuses the same memory. If Tensorflow adds
// support for multiple GPU streams or allocators with different ordering
// requirements, this code may need to change.
// (This attribute has no effect on CPU.)
bool AllowsAsynchronousDeallocation() const override { return true; }
private:
std::vector<tensorflow::XlaAllocator> per_device_allocators_;
// The wrapped TF allocators backing per_device_allocators_ (XlaAllocator does
// not take ownership of its underlying Allocator).
std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators_;
};
// Helper class to perform the marshalling of TensorFlow inputs and outputs to
// ShapedBuffers suitable for passing to an XLA computation.
class XlaComputationLaunchContext {

View File

@ -145,7 +145,6 @@ cc_library(
":shared_device_buffer",
":types",
":worker_thread",
"//tensorflow/compiler/jit:xla_launch_util",
"//tensorflow/compiler/xla:executable_run_options",
"//tensorflow/compiler/xla:literal",
"//tensorflow/compiler/xla:literal_util",
@ -166,6 +165,7 @@ cc_library(
"//tensorflow/core:gpu_mem_allocator",
"//tensorflow/core:lib",
"//tensorflow/core/profiler/lib:traceme",
"//tensorflow/stream_executor:tf_allocator_adapter",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",

View File

@ -71,7 +71,6 @@ limitations under the License.
#include "absl/synchronization/notification.h"
#include "absl/time/time.h"
#include "include/pybind11/pybind11.h"
#include "tensorflow/compiler/jit/xla_launch_util.h"
#include "tensorflow/compiler/xla/client/client_library.h"
#include "tensorflow/compiler/xla/client/xla_computation.h"
#include "tensorflow/compiler/xla/executable_run_options.h"
@ -88,6 +87,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/gpu/gpu_mem_allocator.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/profiler/lib/traceme.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"
namespace xla {
@ -174,9 +174,8 @@ void Device::ThenExecuteOnWorkerThread(se::Stream* stream,
[this, callback]() { worker_thread_->Schedule(std::move(callback)); });
}
static StatusOr<std::unique_ptr<tensorflow::MultiDeviceAdapter>>
CreateBFCAllocator(se::Platform* platform, LocalClient* client,
double memory_fraction) {
static StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateBFCAllocator(
se::Platform* platform, LocalClient* client, double memory_fraction) {
CHECK_GT(client->backend().device_count(), 0);
std::vector<std::unique_ptr<tensorflow::Allocator>> allocators;
for (se::StreamExecutor* executor : client->backend().stream_executors()) {
@ -203,8 +202,8 @@ CreateBFCAllocator(se::Platform* platform, LocalClient* client,
absl::StrCat("GPU_", device_ordinal, "_bfc"));
allocators.emplace_back(std::move(gpu_bfc_allocator));
}
return absl::make_unique<tensorflow::MultiDeviceAdapter>(
platform, std::move(allocators));
return absl::make_unique<se::MultiDeviceAdapter>(platform,
std::move(allocators));
}
StatusOr<std::shared_ptr<PyLocalClient>> PyLocalClient::Get(

View File

@ -1000,7 +1000,6 @@ cc_library(
name = "allocator",
srcs = [
"framework/allocator.cc",
"framework/allocator_registry.cc",
"framework/allocator_registry.h",
"framework/numeric_types.h",
"framework/tracking_allocator.cc",
@ -1012,12 +1011,37 @@ cc_library(
],
features = ["parse_headers"],
visibility = ["//visibility:public"],
deps = [
":lib",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/types:optional",
"//third_party/eigen3",
] + if_static(extra_deps = [":allocator_registry_impl"]),
alwayslink = 1,
)
# This target will be included in libtensorflow_framework.so via the
# framework_internal_impl target.
# All other dependencies on this target need to go through the if_static guard,
# as otherwise duplicate registration in the registry will cause crashes.
cc_library(
name = "allocator_registry_impl",
srcs = [
"framework/allocator.h",
"framework/allocator_registry.cc",
"framework/allocator_registry.h",
"framework/cpu_allocator_impl.cc",
"framework/numeric_types.h",
"framework/tracking_allocator.h",
"framework/type_traits.h",
],
deps = [
":lib",
"//third_party/eigen3",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/types:optional",
],
alwayslink = 1,
)
cc_library(
@ -2873,6 +2897,7 @@ tf_cuda_library(
"**/*test*",
"**/*main.cc",
"framework/allocator.cc",
"framework/cpu_allocator_impl.cc",
"framework/allocator_registry.cc",
"framework/tracking_allocator.cc",
"example/example_parser_configuration.*",
@ -2906,6 +2931,7 @@ tf_cuda_library(
],
}),
deps = [
":allocator_registry_impl",
":allocator",
":feature_util",
":lib",

View File

@ -42,38 +42,9 @@ constexpr size_t Allocator::kAllocatorAlignment;
Allocator::~Allocator() {}
// If true, cpu allocator collects more stats.
static bool cpu_allocator_collect_stats = false;
// If true, cpu allocator collects full stats.
static bool cpu_allocator_collect_full_stats = false;
// Individual allocations large than this amount will trigger a warning.
static const double kLargeAllocationWarningThreshold = 0.1;
// If cpu_allocator_collect_stats is true, warn when the total allocated memory
// exceeds this threshold.
static const double kTotalAllocationWarningThreshold = 0.5;
static const int kMaxSingleAllocationWarnings = 5;
static const int kMaxTotalAllocationWarnings = 1;
// Cache first invocation to port::AvailableRam, as it can be expensive.
static int64_t LargeAllocationWarningBytes() {
static int64_t value = static_cast<int64>(port::AvailableRam() *
kLargeAllocationWarningThreshold);
return value;
}
static int64_t TotalAllocationWarningBytes() {
static int64_t value = static_cast<int64>(port::AvailableRam() *
kTotalAllocationWarningThreshold);
return value;
}
void EnableCPUAllocatorStats(bool enable) {
cpu_allocator_collect_stats = enable;
}
bool CPUAllocatorStatsEnabled() { return cpu_allocator_collect_stats; }
void EnableCPUAllocatorFullStats(bool enable) {
cpu_allocator_collect_full_stats = enable;
}
@ -85,120 +56,6 @@ string AllocatorAttributes::DebugString() const {
" gpu_compatible=", gpu_compatible(), ")");
}
namespace {
// A default Allocator for CPU devices. ProcessState::GetCPUAllocator() will
// return a different version that may perform better, but may also lack the
// optional stats triggered by the functions above. TODO(tucker): migrate all
// uses of cpu_allocator() except tests to use ProcessState instead.
class CPUAllocator : public Allocator {
public:
CPUAllocator()
: single_allocation_warning_count_(0),
total_allocation_warning_count_(0) {}
~CPUAllocator() override {}
string Name() override { return "cpu"; }
void* AllocateRaw(size_t alignment, size_t num_bytes) override {
if (num_bytes > LargeAllocationWarningBytes() &&
single_allocation_warning_count_ < kMaxSingleAllocationWarnings) {
++single_allocation_warning_count_;
LOG(WARNING) << "Allocation of " << num_bytes << " exceeds "
<< 100 * kLargeAllocationWarningThreshold
<< "% of system memory.";
}
void* p = port::AlignedMalloc(num_bytes, alignment);
if (cpu_allocator_collect_stats) {
const std::size_t alloc_size = port::MallocExtension_GetAllocatedSize(p);
mutex_lock l(mu_);
++stats_.num_allocs;
stats_.bytes_in_use += alloc_size;
stats_.peak_bytes_in_use =
std::max<int64>(stats_.peak_bytes_in_use, stats_.bytes_in_use);
stats_.largest_alloc_size =
std::max<int64>(stats_.largest_alloc_size, alloc_size);
if (stats_.bytes_in_use > TotalAllocationWarningBytes() &&
total_allocation_warning_count_ < kMaxTotalAllocationWarnings) {
++total_allocation_warning_count_;
LOG(WARNING) << "Total allocated memory " << stats_.bytes_in_use
<< "exceeds " << 100 * kTotalAllocationWarningThreshold
<< "% of system memory";
}
}
return p;
}
void DeallocateRaw(void* ptr) override {
if (cpu_allocator_collect_stats) {
const std::size_t alloc_size =
port::MallocExtension_GetAllocatedSize(ptr);
mutex_lock l(mu_);
stats_.bytes_in_use -= alloc_size;
}
port::AlignedFree(ptr);
}
absl::optional<AllocatorStats> GetStats() override {
mutex_lock l(mu_);
return stats_;
}
void ClearStats() override {
mutex_lock l(mu_);
stats_.num_allocs = 0;
stats_.peak_bytes_in_use = stats_.bytes_in_use;
stats_.largest_alloc_size = 0;
}
size_t AllocatedSizeSlow(const void* ptr) const override {
return port::MallocExtension_GetAllocatedSize(ptr);
}
private:
mutex mu_;
AllocatorStats stats_ GUARDED_BY(mu_);
// Use <atomic> for single allocations to avoid mutex contention when
// statistics are disabled.
std::atomic<int> single_allocation_warning_count_;
int total_allocation_warning_count_ GUARDED_BY(mu_);
TF_DISALLOW_COPY_AND_ASSIGN(CPUAllocator);
};
class CPUAllocatorFactory : public AllocatorFactory {
public:
Allocator* CreateAllocator() override { return new CPUAllocator; }
SubAllocator* CreateSubAllocator(int numa_node) override {
return new CPUSubAllocator(new CPUAllocator);
}
private:
class CPUSubAllocator : public SubAllocator {
public:
explicit CPUSubAllocator(CPUAllocator* cpu_allocator)
: SubAllocator({}, {}), cpu_allocator_(cpu_allocator) {}
void* Alloc(size_t alignment, size_t num_bytes) override {
return cpu_allocator_->AllocateRaw(alignment, num_bytes);
}
void Free(void* ptr, size_t num_bytes) override {
cpu_allocator_->DeallocateRaw(ptr);
}
private:
CPUAllocator* cpu_allocator_;
};
};
REGISTER_MEM_ALLOCATOR("DefaultCPUAllocator", 100, CPUAllocatorFactory);
} // namespace
Allocator* cpu_allocator_base() {
static Allocator* cpu_alloc =
AllocatorFactoryRegistry::singleton()->GetAllocator();

View File

@ -0,0 +1,176 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <atomic>
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/allocator_registry.h"
#include "tensorflow/core/framework/tracking_allocator.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
namespace tensorflow {
// If true, cpu allocator collects more stats.
static bool cpu_allocator_collect_stats = false;
void EnableCPUAllocatorStats(bool enable) {
cpu_allocator_collect_stats = enable;
}
bool CPUAllocatorStatsEnabled() { return cpu_allocator_collect_stats; }
static const int kMaxTotalAllocationWarnings = 1;
static const int kMaxSingleAllocationWarnings = 5;
// If cpu_allocator_collect_stats is true, warn when the total allocated memory
// exceeds this threshold.
static const double kTotalAllocationWarningThreshold = 0.5;
// Individual allocations larger than this amount will trigger a warning.
static const double kLargeAllocationWarningThreshold = 0.1;
// Cache first invocation to port::AvailableRam, as it can be expensive.
static int64_t LargeAllocationWarningBytes() {
static int64_t value = static_cast<int64>(port::AvailableRam() *
kLargeAllocationWarningThreshold);
return value;
}
static int64_t TotalAllocationWarningBytes() {
static int64_t value = static_cast<int64>(port::AvailableRam() *
kTotalAllocationWarningThreshold);
return value;
}
namespace {
// A default Allocator for CPU devices. ProcessState::GetCPUAllocator() will
// return a different version that may perform better, but may also lack the
// optional stats triggered by the functions above. TODO(tucker): migrate all
// uses of cpu_allocator() except tests to use ProcessState instead.
class CPUAllocator : public Allocator {
public:
CPUAllocator()
: single_allocation_warning_count_(0),
total_allocation_warning_count_(0) {}
~CPUAllocator() override {}
string Name() override { return "cpu"; }
void* AllocateRaw(size_t alignment, size_t num_bytes) override {
if (num_bytes > LargeAllocationWarningBytes() &&
single_allocation_warning_count_ < kMaxSingleAllocationWarnings) {
++single_allocation_warning_count_;
LOG(WARNING) << "Allocation of " << num_bytes << " exceeds "
<< 100 * kLargeAllocationWarningThreshold
<< "% of system memory.";
}
void* p = port::AlignedMalloc(num_bytes, alignment);
if (cpu_allocator_collect_stats) {
const std::size_t alloc_size = port::MallocExtension_GetAllocatedSize(p);
mutex_lock l(mu_);
++stats_.num_allocs;
stats_.bytes_in_use += alloc_size;
stats_.peak_bytes_in_use =
std::max<int64>(stats_.peak_bytes_in_use, stats_.bytes_in_use);
stats_.largest_alloc_size =
std::max<int64>(stats_.largest_alloc_size, alloc_size);
if (stats_.bytes_in_use > TotalAllocationWarningBytes() &&
total_allocation_warning_count_ < kMaxTotalAllocationWarnings) {
++total_allocation_warning_count_;
LOG(WARNING) << "Total allocated memory " << stats_.bytes_in_use
<< "exceeds " << 100 * kTotalAllocationWarningThreshold
<< "% of system memory";
}
}
return p;
}
void DeallocateRaw(void* ptr) override {
if (cpu_allocator_collect_stats) {
const std::size_t alloc_size =
port::MallocExtension_GetAllocatedSize(ptr);
mutex_lock l(mu_);
stats_.bytes_in_use -= alloc_size;
}
port::AlignedFree(ptr);
}
absl::optional<AllocatorStats> GetStats() override {
mutex_lock l(mu_);
return stats_;
}
void ClearStats() override {
mutex_lock l(mu_);
stats_.num_allocs = 0;
stats_.peak_bytes_in_use = stats_.bytes_in_use;
stats_.largest_alloc_size = 0;
}
size_t AllocatedSizeSlow(const void* ptr) const override {
return port::MallocExtension_GetAllocatedSize(ptr);
}
private:
mutex mu_;
AllocatorStats stats_ GUARDED_BY(mu_);
// Use <atomic> for single allocations to avoid mutex contention when
// statistics are disabled.
std::atomic<int> single_allocation_warning_count_;
int total_allocation_warning_count_ GUARDED_BY(mu_);
TF_DISALLOW_COPY_AND_ASSIGN(CPUAllocator);
};
class CPUAllocatorFactory : public AllocatorFactory {
public:
Allocator* CreateAllocator() override { return new CPUAllocator; }
SubAllocator* CreateSubAllocator(int numa_node) override {
return new CPUSubAllocator(new CPUAllocator);
}
private:
class CPUSubAllocator : public SubAllocator {
public:
explicit CPUSubAllocator(CPUAllocator* cpu_allocator)
: SubAllocator({}, {}), cpu_allocator_(cpu_allocator) {}
void* Alloc(size_t alignment, size_t num_bytes) override {
return cpu_allocator_->AllocateRaw(alignment, num_bytes);
}
void Free(void* ptr, size_t num_bytes) override {
cpu_allocator_->DeallocateRaw(ptr);
}
private:
CPUAllocator* cpu_allocator_;
};
};
REGISTER_MEM_ALLOCATOR("DefaultCPUAllocator", 100, CPUAllocatorFactory);
} // namespace
} // namespace tensorflow
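
For context on the stats machinery moved into this file, a hedged usage sketch (assuming cpu_allocator_base() is declared in allocator.h, as its definition in allocator.cc above suggests); stats are recorded only while the toggle is enabled:

#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/platform/logging.h"

namespace tensorflow {

// Sketch only: allocate once with stats collection enabled, then read the
// counters back through the Allocator interface shown above.
void SketchCpuAllocatorStats() {
  EnableCPUAllocatorStats(true);
  Allocator* a = cpu_allocator_base();
  void* p = a->AllocateRaw(Allocator::kAllocatorAlignment, 1 << 20);
  if (absl::optional<AllocatorStats> stats = a->GetStats()) {
    LOG(INFO) << "num_allocs=" << stats->num_allocs
              << " bytes_in_use=" << stats->bytes_in_use;
  }
  a->DeallocateRaw(p);
  EnableCPUAllocatorStats(false);
}

}  // namespace tensorflow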

View File

@ -687,6 +687,27 @@ cc_library(
],
)
cc_library(
name = "tf_allocator_adapter",
srcs = ["tf_allocator_adapter.cc"],
hdrs = ["tf_allocator_adapter.h"],
deps = [
":device_memory",
":device_memory_allocator",
":platform",
"//tensorflow/core:allocator",
"//tensorflow/core:lib",
"//tensorflow/stream_executor/lib",
"//tensorflow/stream_executor/platform",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/types:optional",
"@com_google_absl//absl/types:span",
],
)
tf_cc_test(
name = "stream_test",
size = "small",

View File

@ -0,0 +1,51 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/stream_executor/tf_allocator_adapter.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/stream_executor/lib/error.h"
namespace stream_executor {
TfAllocatorAdapter::TfAllocatorAdapter(const Platform *platform,
tensorflow::Allocator *wrapped)
: DeviceMemoryAllocator(platform), wrapped_(wrapped) {}
TfAllocatorAdapter::~TfAllocatorAdapter() {}
port::StatusOr<OwningDeviceMemory> TfAllocatorAdapter::Allocate(
int device_ordinal, uint64 size, bool retry_on_failure) {
tensorflow::AllocationAttributes attrs;
attrs.no_retry_on_failure = !retry_on_failure;
void *data = nullptr;
if (size != 0) {
data = wrapped_->AllocateRaw(tensorflow::Allocator::kAllocatorAlignment,
size, attrs);
if (data == nullptr) {
return tensorflow::errors::ResourceExhausted(
"Out of memory while trying to allocate ", size, " bytes.");
}
}
return OwningDeviceMemory(DeviceMemoryBase(data, size), device_ordinal, this);
}
port::Status TfAllocatorAdapter::Deallocate(int device_ordinal,
DeviceMemoryBase mem) {
wrapped_->DeallocateRaw(mem.opaque());
return port::Status::OK();
}
} // namespace stream_executor
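
A minimal wiring sketch for the adapter implemented above (not part of the diff; platform and tf_allocator are assumed to come from the caller, as in the jit hunks earlier). Because TfAllocatorAdapter derives from se::DeviceMemoryAllocator, it can be passed wherever XLA expects that interface, for example via xla::ExecutableRunOptions:

#include "tensorflow/compiler/xla/executable_run_options.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"

// Sketch under the assumptions above; retry_on_failure and OOM handling
// follow the Allocate() implementation in this file.
stream_executor::TfAllocatorAdapter adapter(platform, tf_allocator);

xla::ExecutableRunOptions run_options;
run_options.set_allocator(&adapter);  // expects a DeviceMemoryAllocator*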

View File

@ -0,0 +1,100 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
#define TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/stream_executor/device_memory.h"
#include "tensorflow/stream_executor/device_memory_allocator.h"
#include "tensorflow/stream_executor/lib/statusor.h"
#include "tensorflow/stream_executor/platform.h"
namespace stream_executor {
// Adapter class that wraps a Tensorflow allocator.
//
// Assumes that the Tensorflow allocator permits asynchronous deallocation:
// see comment on `AllowsAsynchronousDeallocation()`.
class TfAllocatorAdapter : public DeviceMemoryAllocator {
public:
TfAllocatorAdapter(const Platform *platform, tensorflow::Allocator *wrapped);
~TfAllocatorAdapter() override;
port::StatusOr<OwningDeviceMemory> Allocate(int device_ordinal, uint64 size,
bool retry_on_failure) override;
port::Status Deallocate(int device_ordinal, DeviceMemoryBase mem) override;
// The Tensorflow BFC allocator used on GPU allows host-side deallocation
// before GPU execution takes place. Tensorflow uses the ordering of the main
// compute stream to enforce a happens-before relationship between a memory
// allocation and code that reuses the same memory. If Tensorflow adds
// support for multiple GPU streams or allocators with different ordering
// requirements, this code may need to change.
// (This attribute has no effect on CPU.)
bool AllowsAsynchronousDeallocation() const override { return true; }
private:
tensorflow::Allocator *wrapped_;
};
// Adapter class that wraps per-device TF allocators as an XLA allocator.
// Assumes that the Tensorflow allocator permits asynchronous deallocation;
// see comment on `AllowsAsynchronousDeallocation()`.
class MultiDeviceAdapter : public DeviceMemoryAllocator {
public:
MultiDeviceAdapter(
const Platform *platform,
std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators)
: DeviceMemoryAllocator(platform),
tf_allocators_(std::move(tf_allocators)) {
for (const auto &tf_allocator : tf_allocators_) {
per_device_allocators_.emplace_back(platform, tf_allocator.get());
}
}
port::StatusOr<OwningDeviceMemory> Allocate(int device_ordinal, uint64 size,
bool retry_on_failure) override {
CHECK_LT(device_ordinal, per_device_allocators_.size());
return per_device_allocators_[device_ordinal].Allocate(device_ordinal, size,
retry_on_failure);
}
port::Status Deallocate(int device_ordinal, DeviceMemoryBase mem) override {
CHECK_LT(device_ordinal, per_device_allocators_.size());
return per_device_allocators_[device_ordinal].Deallocate(device_ordinal,
mem);
}
// The Tensorflow BFC allocator used on GPU allows host-side deallocation
// before GPU execution takes place. Tensorflow uses the ordering of the main
// compute stream to enforce a happens-before relationship between a memory
// allocation and code that reuses the same memory. If Tensorflow adds
// support for multiple GPU streams or allocators with different ordering
// requirements, this code may need to change.
// (This attribute has no effect on CPU.)
bool AllowsAsynchronousDeallocation() const override { return true; }
private:
std::vector<TfAllocatorAdapter> per_device_allocators_;
// The wrapped TF allocators backing per_device_allocators_
// (TfAllocatorAdapter does not take ownership of its underlying Allocator).
std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators_;
};
} // namespace stream_executor
#endif // TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
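
Finally, a hedged sketch of how the python client hunk earlier uses this header: one TF allocator per stream executor, wrapped into a single se::MultiDeviceAdapter whose vector index doubles as the device ordinal. MakeDeviceAllocator is a hypothetical stand-in for the GPUBFCAllocator construction in the real pyclient code; client and platform come from the XLA LocalClient setup.

// Sketch only; MakeDeviceAllocator is hypothetical.
std::vector<std::unique_ptr<tensorflow::Allocator>> allocators;
for (se::StreamExecutor* executor : client->backend().stream_executors()) {
  allocators.push_back(MakeDeviceAllocator(executor));
}
auto adapter = absl::make_unique<se::MultiDeviceAdapter>(
    platform, std::move(allocators));
// adapter->Allocate(/*device_ordinal=*/i, size, /*retry_on_failure=*/true)
// dispatches to allocators[i], so the vector must be ordered by ordinal.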