From 9067ab24bf4044bf63abb62f7e6122d9491b69d0 Mon Sep 17 00:00:00 2001
From: George Karpenkov <cheshire@google.com>
Date: Wed, 22 May 2019 12:49:19 -0700
Subject: [PATCH] Move XlaAllocator to stream_executor and rename it to
 TfAllocatorAdapter.

The new package allows usage from SE, and the new name reflects what the
class does.

PiperOrigin-RevId: 249501148
---
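Reviewer note (not part of the commit message; text between the "---" separator and the diffstat is dropped by git am). Since the diff below only moves and renames code, here is a minimal orientation sketch of how the renamed adapter is used. It is an illustration under stated assumptions, not code from this change: the "Host" platform name, the use of the process CPU allocator, and ExampleUse() are all hypothetical.

// Illustrative sketch only. Wraps a TF allocator so it can be passed
// anywhere a se::DeviceMemoryAllocator is expected.
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/stream_executor/multi_platform_manager.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"

namespace se = stream_executor;

void ExampleUse() {  // Hypothetical driver, for illustration.
  // Assumption: the "Host" platform is registered; any platform works.
  se::Platform* platform =
      se::MultiPlatformManager::PlatformWithName("Host").ValueOrDie();

  // The adapter does not take ownership of the wrapped TF allocator.
  se::TfAllocatorAdapter adapter(platform, tensorflow::cpu_allocator());

  // Allocate 1 KiB on device ordinal 0. The returned OwningDeviceMemory
  // returns the buffer to the adapter when it goes out of scope.
  se::OwningDeviceMemory mem =
      adapter.Allocate(/*device_ordinal=*/0, /*size=*/1024,
                       /*retry_on_failure=*/true)
          .ValueOrDie();
}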
 tensorflow/compiler/jit/BUILD                 |   2 -
 tensorflow/compiler/jit/kernels/BUILD         |   1 +
 tensorflow/compiler/jit/kernels/xla_ops.cc    |   4 +-
 tensorflow/compiler/jit/kernels/xla_ops.h     |  15 +-
 tensorflow/compiler/jit/xla_launch_util.cc    |  26 ---
 tensorflow/compiler/jit/xla_launch_util.h     |  69 ------
 tensorflow/compiler/xla/python/BUILD          |   2 +-
 .../compiler/xla/python/local_client.cc       |  11 +-
 tensorflow/core/BUILD                         |  28 ++-
 tensorflow/core/framework/allocator.cc        | 143 --------------
 .../core/framework/cpu_allocator_impl.cc      | 176 ++++++++++++++++++
 tensorflow/stream_executor/BUILD              |  21 +++
 .../stream_executor/tf_allocator_adapter.cc   |  51 +++++
 .../stream_executor/tf_allocator_adapter.h    | 100 ++++++++++
 14 files changed, 392 insertions(+), 257 deletions(-)
 create mode 100644 tensorflow/core/framework/cpu_allocator_impl.cc
 create mode 100644 tensorflow/stream_executor/tf_allocator_adapter.cc
 create mode 100644 tensorflow/stream_executor/tf_allocator_adapter.h

diff --git a/tensorflow/compiler/jit/BUILD b/tensorflow/compiler/jit/BUILD
index 1735d429f5a..af99a678bc2 100644
--- a/tensorflow/compiler/jit/BUILD
+++ b/tensorflow/compiler/jit/BUILD
@@ -258,10 +258,8 @@ cc_library(
     name = "xla_launch_util",
     srcs = ["xla_launch_util.cc"],
     hdrs = ["xla_launch_util.h"],
-    # TODO(skyewm): remove this once XlaAllocator is factored out.
     visibility = [
         ":internal",
-        "//tensorflow/compiler/xla/python:__pkg__",
     ],
     deps = [
         ":common",
diff --git a/tensorflow/compiler/jit/kernels/BUILD b/tensorflow/compiler/jit/kernels/BUILD
index 3524da23fb3..19b6c657796 100644
--- a/tensorflow/compiler/jit/kernels/BUILD
+++ b/tensorflow/compiler/jit/kernels/BUILD
@@ -29,6 +29,7 @@ cc_library(
         "//tensorflow/core:lib",
        "//tensorflow/core:state_ops_op_lib",
        "//tensorflow/core:stream_executor_no_cuda",
+        "//tensorflow/stream_executor:tf_allocator_adapter",
        "@com_google_absl//absl/container:flat_hash_map",
        "@com_google_absl//absl/memory",
     ],
diff --git a/tensorflow/compiler/jit/kernels/xla_ops.cc b/tensorflow/compiler/jit/kernels/xla_ops.cc
index 6df0991e354..e825a77b1d1 100644
--- a/tensorflow/compiler/jit/kernels/xla_ops.cc
+++ b/tensorflow/compiler/jit/kernels/xla_ops.cc
@@ -61,7 +61,7 @@ XlaPlatformInfo PlatformInfoFromContext(OpKernelConstruction* ctx) {
   DeviceType device_type = ctx->device_type();
   se::Platform::Id platform_id = nullptr;
   const XlaDevice::Metadata* xla_device_metadata = nullptr;
-  std::unique_ptr<XlaAllocator> xla_allocator;
+  std::unique_ptr<se::TfAllocatorAdapter> xla_allocator;
   se::DeviceMemoryAllocator* device_allocator = nullptr;
 
   if (ctx->device_type() == DeviceType(DEVICE_CPU)) {
@@ -93,7 +93,7 @@ XlaPlatformInfo PlatformInfoFromContext(OpKernelConstruction* ctx) {
         se::MultiPlatformManager::PlatformWithId(platform_id);
     OP_REQUIRES_OK_RETURN(ctx, XlaPlatformInfo(), maybe_platform.status());
 
-    xla_allocator = absl::make_unique<XlaAllocator>(
+    xla_allocator = absl::make_unique<se::TfAllocatorAdapter>(
         maybe_platform.ValueOrDie(), ctx->device()->GetAllocator({}));
   }
 
diff --git a/tensorflow/compiler/jit/kernels/xla_ops.h b/tensorflow/compiler/jit/kernels/xla_ops.h
index eaa686780e4..3a1009ec8a7 100644
--- a/tensorflow/compiler/jit/kernels/xla_ops.h
+++ b/tensorflow/compiler/jit/kernels/xla_ops.h
@@ -27,6 +27,7 @@ limitations under the License.
 #include "tensorflow/core/framework/tensor.h"
 #include "tensorflow/core/platform/macros.h"
 #include "tensorflow/core/util/stream_executor_util.h"
+#include "tensorflow/stream_executor/tf_allocator_adapter.h"
 
 namespace tensorflow {
 
@@ -36,11 +37,11 @@ class XlaPlatformInfo {
  public:
   XlaPlatformInfo() : device_type_("") {}
   XlaPlatformInfo(XlaPlatformInfo&&) = default;
-  explicit XlaPlatformInfo(const DeviceType device_type,
-                           se::Platform::Id platform_id,
-                           const XlaDevice::Metadata* xla_device_metadata,
-                           std::unique_ptr<XlaAllocator> xla_allocator,
-                           se::DeviceMemoryAllocator* device_allocator)
+  explicit XlaPlatformInfo(
+      const DeviceType device_type, se::Platform::Id platform_id,
+      const XlaDevice::Metadata* xla_device_metadata,
+      std::unique_ptr<se::TfAllocatorAdapter> xla_allocator,
+      se::DeviceMemoryAllocator* device_allocator)
       : device_type_(device_type),
         platform_id_(platform_id),
         xla_device_metadata_(xla_device_metadata),
@@ -84,8 +85,8 @@ class XlaPlatformInfo {
   // then device_allocator_ is the xla::Backend's memory allocator and
   // xla_allocator_ is null. If the op is placed on a regular CPU or GPU device
   // then device_allocator_ is null and xla_allocator_ points to an appropriate
-  // XlaAllocator instance.
-  std::unique_ptr<XlaAllocator> xla_allocator_;
+  // se::TfAllocatorAdapter instance.
+  std::unique_ptr<se::TfAllocatorAdapter> xla_allocator_;
   se::DeviceMemoryAllocator* device_allocator_;
 
   TF_DISALLOW_COPY_AND_ASSIGN(XlaPlatformInfo);
diff --git a/tensorflow/compiler/jit/xla_launch_util.cc b/tensorflow/compiler/jit/xla_launch_util.cc
index 3bb698b33d6..d66c80fea90 100644
--- a/tensorflow/compiler/jit/xla_launch_util.cc
+++ b/tensorflow/compiler/jit/xla_launch_util.cc
@@ -167,32 +167,6 @@ Status SnapshotResourceVariables(OpKernelContext* ctx,
   return Status::OK();
 }
 
-XlaAllocator::XlaAllocator(const se::Platform* platform, Allocator* wrapped)
-    : se::DeviceMemoryAllocator(platform), wrapped_(wrapped) {}
-
-XlaAllocator::~XlaAllocator() {}
-
-xla::StatusOr<se::OwningDeviceMemory> XlaAllocator::Allocate(
-    int device_ordinal, uint64 size, bool retry_on_failure) {
-  AllocationAttributes attrs;
-  attrs.no_retry_on_failure = !retry_on_failure;
-  void* data = nullptr;
-  if (size != 0) {
-    data = wrapped_->AllocateRaw(Allocator::kAllocatorAlignment, size, attrs);
-    if (data == nullptr) {
-      return errors::ResourceExhausted(
-          "Out of memory while trying to allocate ", size, " bytes.");
-    }
-  }
-  return se::OwningDeviceMemory(se::DeviceMemoryBase(data, size),
-                                device_ordinal, this);
-}
-
-Status XlaAllocator::Deallocate(int device_ordinal, se::DeviceMemoryBase mem) {
-  wrapped_->DeallocateRaw(mem.opaque());
-  return Status::OK();
-}
-
 XlaComputationLaunchContext::XlaComputationLaunchContext(
     xla::LocalClient* client, se::DeviceMemoryAllocator* xla_allocator,
     bool allocate_xla_tensors, bool use_multiple_streams)
diff --git a/tensorflow/compiler/jit/xla_launch_util.h b/tensorflow/compiler/jit/xla_launch_util.h
index 4cb020ffe34..429ff0a065c 100644
--- a/tensorflow/compiler/jit/xla_launch_util.h
+++ b/tensorflow/compiler/jit/xla_launch_util.h
@@ -32,7 +32,6 @@ limitations under the License.
 #include "tensorflow/stream_executor/device_memory_allocator.h"
 
 namespace tensorflow {
-class XlaAllocator;
 
 // Struct that represents a possibly-absent Tensor.
 struct OptionalTensor {
@@ -104,74 +103,6 @@ class VariableInfo {
 Status LockVariables(absl::Span<VariableInfo> variables)
     EXCLUSIVE_LOCK_FUNCTION();
 
-// Adapter class that wraps a Tensorflow allocator as an XLA allocator.
-// Assumes that the Tensorflow allocator permits asynchronous deallocation:
-// see comment on `AllowsAsynchronousDeallocation()`.
-class XlaAllocator : public se::DeviceMemoryAllocator {
- public:
-  XlaAllocator(const se::Platform* platform, Allocator* wrapped);
-  ~XlaAllocator() override;
-  xla::StatusOr<se::OwningDeviceMemory> Allocate(
-      int device_ordinal, uint64 size, bool retry_on_failure) override;
-  Status Deallocate(int device_ordinal, se::DeviceMemoryBase mem) override;
-
-  // The Tensorflow BFC allocator used on GPU allows host-side deallocation
-  // before GPU execution takes place. Tensorflow uses the ordering of the main
-  // compute stream to enforce a happens-before relationship between a memory
-  // allocation and code that reuses the same memory. If Tensorflow adds
-  // support for multiple GPU streams or allocators with different ordering
-  // requirements, this code may need to change.
-  // (This attribute has no effect on CPU.)
-  bool AllowsAsynchronousDeallocation() const override { return true; }
-
- private:
-  Allocator* wrapped_;
-};
-
-// Adapter class that wraps per-device TF allocators as an XLA allocator.
-// Assumes that the Tensorflow allocator permits asynchronous deallocation;
-// see comment on `AllowsAsynchronousDeallocation()`.
-class MultiDeviceAdapter : public se::DeviceMemoryAllocator {
- public:
-  MultiDeviceAdapter(
-      const se::Platform* platform,
-      std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators)
-      : DeviceMemoryAllocator(platform),
-        tf_allocators_(std::move(tf_allocators)) {
-    for (const auto& tf_allocator : tf_allocators_) {
-      per_device_allocators_.emplace_back(platform, tf_allocator.get());
-    }
-  }
-
-  xla::StatusOr<se::OwningDeviceMemory> Allocate(
-      int device_ordinal, uint64 size, bool retry_on_failure) override {
-    CHECK_LT(device_ordinal, per_device_allocators_.size());
-    return per_device_allocators_[device_ordinal].Allocate(device_ordinal, size,
-                                                           retry_on_failure);
-  }
-
-  Status Deallocate(int device_ordinal, se::DeviceMemoryBase mem) override {
-    CHECK_LT(device_ordinal, per_device_allocators_.size());
-    return per_device_allocators_[device_ordinal].Deallocate(device_ordinal,
-                                                             mem);
-  }
-
-  // The Tensorflow BFC allocator used on GPU allows host-side deallocation
-  // before GPU execution takes place. Tensorflow uses the ordering of the main
-  // compute stream to enforce a happens-before relationship between a memory
-  // allocation and code that reuses the same memory. If Tensorflow adds
-  // support for multiple GPU streams or allocators with different ordering
-  // requirements, this code may need to change.
-  // (This attribute has no effect on CPU.)
-  bool AllowsAsynchronousDeallocation() const override { return true; }
-
- private:
-  std::vector<tensorflow::XlaAllocator> per_device_allocators_;
-  // The wrapped TF allocators backing per_device_allocators_ (XlaAllocator does
-  // not take ownership of its underlying Allocator).
-  std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators_;
-};
-
 // Helper class to perform the marshalling of TensorFlow inputs and outputs to
 // ShapedBuffers suitable for passing to an XLA computation.
 class XlaComputationLaunchContext {
diff --git a/tensorflow/compiler/xla/python/BUILD b/tensorflow/compiler/xla/python/BUILD
index 45a3a264fd6..d628d961489 100644
--- a/tensorflow/compiler/xla/python/BUILD
+++ b/tensorflow/compiler/xla/python/BUILD
@@ -145,7 +145,6 @@ cc_library(
         ":shared_device_buffer",
        ":types",
        ":worker_thread",
-        "//tensorflow/compiler/jit:xla_launch_util",
        "//tensorflow/compiler/xla:executable_run_options",
        "//tensorflow/compiler/xla:literal",
        "//tensorflow/compiler/xla:literal_util",
@@ -166,6 +165,7 @@ cc_library(
         "//tensorflow/core:gpu_mem_allocator",
        "//tensorflow/core:lib",
        "//tensorflow/core/profiler/lib:traceme",
+        "//tensorflow/stream_executor:tf_allocator_adapter",
        "@com_google_absl//absl/memory",
        "@com_google_absl//absl/strings:str_format",
        "@com_google_absl//absl/synchronization",
diff --git a/tensorflow/compiler/xla/python/local_client.cc b/tensorflow/compiler/xla/python/local_client.cc
index 26b3af298e1..6a0802ea9da 100644
--- a/tensorflow/compiler/xla/python/local_client.cc
+++ b/tensorflow/compiler/xla/python/local_client.cc
@@ -71,7 +71,6 @@ limitations under the License.
 #include "absl/synchronization/notification.h"
 #include "absl/time/time.h"
 #include "include/pybind11/pybind11.h"
-#include "tensorflow/compiler/jit/xla_launch_util.h"
 #include "tensorflow/compiler/xla/client/client_library.h"
 #include "tensorflow/compiler/xla/client/xla_computation.h"
 #include "tensorflow/compiler/xla/executable_run_options.h"
@@ -88,6 +87,7 @@ limitations under the License.
 #include "tensorflow/core/common_runtime/gpu/gpu_mem_allocator.h"
 #include "tensorflow/core/platform/types.h"
 #include "tensorflow/core/profiler/lib/traceme.h"
+#include "tensorflow/stream_executor/tf_allocator_adapter.h"
 
 namespace xla {
 
@@ -174,9 +174,8 @@ void Device::ThenExecuteOnWorkerThread(se::Stream* stream,
       [this, callback]() { worker_thread_->Schedule(std::move(callback)); });
 }
 
-static StatusOr<std::unique_ptr<tensorflow::MultiDeviceAdapter>>
-CreateBFCAllocator(se::Platform* platform, LocalClient* client,
-                   double memory_fraction) {
+static StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateBFCAllocator(
+    se::Platform* platform, LocalClient* client, double memory_fraction) {
   CHECK_GT(client->backend().device_count(), 0);
   std::vector<std::unique_ptr<tensorflow::Allocator>> allocators;
   for (se::StreamExecutor* executor : client->backend().stream_executors()) {
@@ -203,8 +202,8 @@ CreateBFCAllocator(se::Platform* platform, LocalClient* client,
                        absl::StrCat("GPU_", device_ordinal, "_bfc"));
     allocators.emplace_back(std::move(gpu_bfc_allocator));
   }
-  return absl::make_unique<tensorflow::MultiDeviceAdapter>(
-      platform, std::move(allocators));
+  return absl::make_unique<se::MultiDeviceAdapter>(platform,
+                                                   std::move(allocators));
 }
 
 StatusOr<std::shared_ptr<PyLocalClient>> PyLocalClient::Get(
diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD
index 1862176764a..357d0795ff8 100644
--- a/tensorflow/core/BUILD
+++ b/tensorflow/core/BUILD
@@ -1000,7 +1000,6 @@ cc_library(
     name = "allocator",
     srcs = [
         "framework/allocator.cc",
-        "framework/allocator_registry.cc",
        "framework/allocator_registry.h",
        "framework/numeric_types.h",
        "framework/tracking_allocator.cc",
@@ -1012,12 +1011,37 @@ cc_library(
     ],
     features = ["parse_headers"],
     visibility = ["//visibility:public"],
+    deps = [
+        ":lib",
+        "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/types:optional",
+        "//third_party/eigen3",
+    ] + if_static(extra_deps = [":allocator_registry_impl"]),
+    alwayslink = 1,
+)
+
+# This target will be included in libtensorflow_framework.so via the
+# framework_internal_impl target.
+# All other dependencies on this target need to go through the if_static
+# guard, as otherwise duplicate registration in the registry will cause crashes.
+cc_library(
+    name = "allocator_registry_impl",
+    srcs = [
+        "framework/allocator.h",
+        "framework/allocator_registry.cc",
+        "framework/allocator_registry.h",
+        "framework/cpu_allocator_impl.cc",
+        "framework/numeric_types.h",
+        "framework/tracking_allocator.h",
+        "framework/type_traits.h",
+    ],
     deps = [
         ":lib",
        "//third_party/eigen3",
        "@com_google_absl//absl/strings",
        "@com_google_absl//absl/types:optional",
     ],
+    alwayslink = 1,
 )
 
 cc_library(
@@ -2873,6 +2897,7 @@ tf_cuda_library(
         "**/*test*",
        "**/*main.cc",
        "framework/allocator.cc",
+        "framework/cpu_allocator_impl.cc",
        "framework/allocator_registry.cc",
        "framework/tracking_allocator.cc",
        "example/example_parser_configuration.*",
@@ -2906,6 +2931,7 @@ tf_cuda_library(
         ],
     }),
     deps = [
+        ":allocator_registry_impl",
        ":allocator",
        ":feature_util",
        ":lib",
diff --git a/tensorflow/core/framework/allocator.cc b/tensorflow/core/framework/allocator.cc
index 7ab87d1f8a4..1f1d29f6394 100644
--- a/tensorflow/core/framework/allocator.cc
+++ b/tensorflow/core/framework/allocator.cc
@@ -42,38 +42,9 @@ constexpr size_t Allocator::kAllocatorAlignment;
 
 Allocator::~Allocator() {}
 
-// If true, cpu allocator collects more stats.
-static bool cpu_allocator_collect_stats = false;
 // If true, cpu allocator collects full stats.
 static bool cpu_allocator_collect_full_stats = false;
 
-// Individual allocations large than this amount will trigger a warning.
-static const double kLargeAllocationWarningThreshold = 0.1;
-
-// If cpu_allocator_collect_stats is true, warn when the total allocated memory
-// exceeds this threshold.
-static const double kTotalAllocationWarningThreshold = 0.5;
-
-static const int kMaxSingleAllocationWarnings = 5;
-static const int kMaxTotalAllocationWarnings = 1;
-
-// Cache first invocation to port::AvailableRam, as it can be expensive.
-static int64_t LargeAllocationWarningBytes() {
-  static int64_t value = static_cast<int64>(port::AvailableRam() *
-                                            kLargeAllocationWarningThreshold);
-  return value;
-}
-
-static int64_t TotalAllocationWarningBytes() {
-  static int64_t value = static_cast<int64>(port::AvailableRam() *
-                                            kTotalAllocationWarningThreshold);
-  return value;
-}
-
-void EnableCPUAllocatorStats(bool enable) {
-  cpu_allocator_collect_stats = enable;
-}
-bool CPUAllocatorStatsEnabled() { return cpu_allocator_collect_stats; }
 void EnableCPUAllocatorFullStats(bool enable) {
   cpu_allocator_collect_full_stats = enable;
 }
@@ -85,120 +56,6 @@ string AllocatorAttributes::DebugString() const {
                          " gpu_compatible=", gpu_compatible(), ")");
 }
 
-namespace {
-// A default Allocator for CPU devices. ProcessState::GetCPUAllocator() will
-// return a different version that may perform better, but may also lack the
-// optional stats triggered by the functions above. TODO(tucker): migrate all
-// uses of cpu_allocator() except tests to use ProcessState instead.
-class CPUAllocator : public Allocator {
- public:
-  CPUAllocator()
-      : single_allocation_warning_count_(0),
-        total_allocation_warning_count_(0) {}
-
-  ~CPUAllocator() override {}
-
-  string Name() override { return "cpu"; }
-
-  void* AllocateRaw(size_t alignment, size_t num_bytes) override {
-    if (num_bytes > LargeAllocationWarningBytes() &&
-        single_allocation_warning_count_ < kMaxSingleAllocationWarnings) {
-      ++single_allocation_warning_count_;
-      LOG(WARNING) << "Allocation of " << num_bytes << " exceeds "
-                   << 100 * kLargeAllocationWarningThreshold
-                   << "% of system memory.";
-    }
-
-    void* p = port::AlignedMalloc(num_bytes, alignment);
-    if (cpu_allocator_collect_stats) {
-      const std::size_t alloc_size = port::MallocExtension_GetAllocatedSize(p);
-      mutex_lock l(mu_);
-      ++stats_.num_allocs;
-      stats_.bytes_in_use += alloc_size;
-      stats_.peak_bytes_in_use =
-          std::max<int64>(stats_.peak_bytes_in_use, stats_.bytes_in_use);
-      stats_.largest_alloc_size =
-          std::max<int64>(stats_.largest_alloc_size, alloc_size);
-
-      if (stats_.bytes_in_use > TotalAllocationWarningBytes() &&
-          total_allocation_warning_count_ < kMaxTotalAllocationWarnings) {
-        ++total_allocation_warning_count_;
-        LOG(WARNING) << "Total allocated memory " << stats_.bytes_in_use
-                     << "exceeds " << 100 * kTotalAllocationWarningThreshold
-                     << "% of system memory";
-      }
-    }
-    return p;
-  }
-
-  void DeallocateRaw(void* ptr) override {
-    if (cpu_allocator_collect_stats) {
-      const std::size_t alloc_size =
-          port::MallocExtension_GetAllocatedSize(ptr);
-      mutex_lock l(mu_);
-      stats_.bytes_in_use -= alloc_size;
-    }
-    port::AlignedFree(ptr);
-  }
-
-  absl::optional<AllocatorStats> GetStats() override {
-    mutex_lock l(mu_);
-    return stats_;
-  }
-
-  void ClearStats() override {
-    mutex_lock l(mu_);
-    stats_.num_allocs = 0;
-    stats_.peak_bytes_in_use = stats_.bytes_in_use;
-    stats_.largest_alloc_size = 0;
-  }
-
-  size_t AllocatedSizeSlow(const void* ptr) const override {
-    return port::MallocExtension_GetAllocatedSize(ptr);
-  }
-
- private:
-  mutex mu_;
-  AllocatorStats stats_ GUARDED_BY(mu_);
-
-  // Use <atomic> for single allocations to avoid mutex contention when
-  // statistics are disabled.
-  std::atomic<int> single_allocation_warning_count_;
-  int total_allocation_warning_count_ GUARDED_BY(mu_);
-
-  TF_DISALLOW_COPY_AND_ASSIGN(CPUAllocator);
-};
-
-class CPUAllocatorFactory : public AllocatorFactory {
- public:
-  Allocator* CreateAllocator() override { return new CPUAllocator; }
-
-  SubAllocator* CreateSubAllocator(int numa_node) override {
-    return new CPUSubAllocator(new CPUAllocator);
-  }
-
- private:
-  class CPUSubAllocator : public SubAllocator {
-   public:
-    explicit CPUSubAllocator(CPUAllocator* cpu_allocator)
-        : SubAllocator({}, {}), cpu_allocator_(cpu_allocator) {}
-
-    void* Alloc(size_t alignment, size_t num_bytes) override {
-      return cpu_allocator_->AllocateRaw(alignment, num_bytes);
-    }
-
-    void Free(void* ptr, size_t num_bytes) override {
-      cpu_allocator_->DeallocateRaw(ptr);
-    }
-
-   private:
-    CPUAllocator* cpu_allocator_;
-  };
-};
-
-REGISTER_MEM_ALLOCATOR("DefaultCPUAllocator", 100, CPUAllocatorFactory);
-}  // namespace
-
 Allocator* cpu_allocator_base() {
   static Allocator* cpu_alloc =
       AllocatorFactoryRegistry::singleton()->GetAllocator();
diff --git a/tensorflow/core/framework/cpu_allocator_impl.cc b/tensorflow/core/framework/cpu_allocator_impl.cc
new file mode 100644
index 00000000000..e2bab9900e1
--- /dev/null
+++ b/tensorflow/core/framework/cpu_allocator_impl.cc
@@ -0,0 +1,176 @@
+/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include <atomic>
+
+#include "tensorflow/core/framework/allocator.h"
+#include "tensorflow/core/framework/allocator_registry.h"
+#include "tensorflow/core/framework/tracking_allocator.h"
+#include "tensorflow/core/lib/strings/strcat.h"
+#include "tensorflow/core/lib/strings/stringprintf.h"
+#include "tensorflow/core/platform/mem.h"
+#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+// If true, cpu allocator collects more stats.
+static bool cpu_allocator_collect_stats = false;
+
+void EnableCPUAllocatorStats(bool enable) {
+  cpu_allocator_collect_stats = enable;
+}
+bool CPUAllocatorStatsEnabled() { return cpu_allocator_collect_stats; }
+
+static const int kMaxTotalAllocationWarnings = 1;
+
+static const int kMaxSingleAllocationWarnings = 5;
+
+// If cpu_allocator_collect_stats is true, warn when the total allocated memory
+// exceeds this threshold.
+static const double kTotalAllocationWarningThreshold = 0.5;
+
+// Individual allocations larger than this amount will trigger a warning.
+static const double kLargeAllocationWarningThreshold = 0.1;
+
+// Cache first invocation to port::AvailableRam, as it can be expensive.
+static int64_t LargeAllocationWarningBytes() {
+  static int64_t value = static_cast<int64>(port::AvailableRam() *
+                                            kLargeAllocationWarningThreshold);
+  return value;
+}
+
+static int64_t TotalAllocationWarningBytes() {
+  static int64_t value = static_cast<int64>(port::AvailableRam() *
+                                            kTotalAllocationWarningThreshold);
+  return value;
+}
+
+namespace {
+
+// A default Allocator for CPU devices. ProcessState::GetCPUAllocator() will
+// return a different version that may perform better, but may also lack the
+// optional stats triggered by the functions above. TODO(tucker): migrate all
+// uses of cpu_allocator() except tests to use ProcessState instead.
+class CPUAllocator : public Allocator {
+ public:
+  CPUAllocator()
+      : single_allocation_warning_count_(0),
+        total_allocation_warning_count_(0) {}
+
+  ~CPUAllocator() override {}
+
+  string Name() override { return "cpu"; }
+
+  void* AllocateRaw(size_t alignment, size_t num_bytes) override {
+    if (num_bytes > LargeAllocationWarningBytes() &&
+        single_allocation_warning_count_ < kMaxSingleAllocationWarnings) {
+      ++single_allocation_warning_count_;
+      LOG(WARNING) << "Allocation of " << num_bytes << " exceeds "
+                   << 100 * kLargeAllocationWarningThreshold
+                   << "% of system memory.";
+    }
+
+    void* p = port::AlignedMalloc(num_bytes, alignment);
+    if (cpu_allocator_collect_stats) {
+      const std::size_t alloc_size = port::MallocExtension_GetAllocatedSize(p);
+      mutex_lock l(mu_);
+      ++stats_.num_allocs;
+      stats_.bytes_in_use += alloc_size;
+      stats_.peak_bytes_in_use =
+          std::max<int64>(stats_.peak_bytes_in_use, stats_.bytes_in_use);
+      stats_.largest_alloc_size =
+          std::max<int64>(stats_.largest_alloc_size, alloc_size);
+
+      if (stats_.bytes_in_use > TotalAllocationWarningBytes() &&
+          total_allocation_warning_count_ < kMaxTotalAllocationWarnings) {
+        ++total_allocation_warning_count_;
+        LOG(WARNING) << "Total allocated memory " << stats_.bytes_in_use
+                     << " exceeds " << 100 * kTotalAllocationWarningThreshold
+                     << "% of system memory";
+      }
+    }
+    return p;
+  }
+
+  void DeallocateRaw(void* ptr) override {
+    if (cpu_allocator_collect_stats) {
+      const std::size_t alloc_size =
+          port::MallocExtension_GetAllocatedSize(ptr);
+      mutex_lock l(mu_);
+      stats_.bytes_in_use -= alloc_size;
+    }
+    port::AlignedFree(ptr);
+  }
+
+  absl::optional<AllocatorStats> GetStats() override {
+    mutex_lock l(mu_);
+    return stats_;
+  }
+
+  void ClearStats() override {
+    mutex_lock l(mu_);
+    stats_.num_allocs = 0;
+    stats_.peak_bytes_in_use = stats_.bytes_in_use;
+    stats_.largest_alloc_size = 0;
+  }
+
+  size_t AllocatedSizeSlow(const void* ptr) const override {
+    return port::MallocExtension_GetAllocatedSize(ptr);
+  }
+
+ private:
+  mutex mu_;
+  AllocatorStats stats_ GUARDED_BY(mu_);
+
+  // Use <atomic> for single allocations to avoid mutex contention when
+  // statistics are disabled.
+  std::atomic<int> single_allocation_warning_count_;
+  int total_allocation_warning_count_ GUARDED_BY(mu_);
+
+  TF_DISALLOW_COPY_AND_ASSIGN(CPUAllocator);
+};
+
+class CPUAllocatorFactory : public AllocatorFactory {
+ public:
+  Allocator* CreateAllocator() override { return new CPUAllocator; }
+
+  SubAllocator* CreateSubAllocator(int numa_node) override {
+    return new CPUSubAllocator(new CPUAllocator);
+  }
+
+ private:
+  class CPUSubAllocator : public SubAllocator {
+   public:
+    explicit CPUSubAllocator(CPUAllocator* cpu_allocator)
+        : SubAllocator({}, {}), cpu_allocator_(cpu_allocator) {}
+
+    void* Alloc(size_t alignment, size_t num_bytes) override {
+      return cpu_allocator_->AllocateRaw(alignment, num_bytes);
+    }
+
+    void Free(void* ptr, size_t num_bytes) override {
+      cpu_allocator_->DeallocateRaw(ptr);
+    }
+
+   private:
+    CPUAllocator* cpu_allocator_;
+  };
+};
+
+REGISTER_MEM_ALLOCATOR("DefaultCPUAllocator", 100, CPUAllocatorFactory);
+}  // namespace
+
+}  // namespace tensorflow
diff --git a/tensorflow/stream_executor/BUILD b/tensorflow/stream_executor/BUILD
index a09091626e8..e43aaf526cd 100644
--- a/tensorflow/stream_executor/BUILD
+++ b/tensorflow/stream_executor/BUILD
@@ -687,6 +687,27 @@ cc_library(
     ],
 )
 
+cc_library(
+    name = "tf_allocator_adapter",
+    srcs = ["tf_allocator_adapter.cc"],
+    hdrs = ["tf_allocator_adapter.h"],
+    deps = [
+        ":device_memory",
+        ":device_memory_allocator",
+        ":platform",
+        "//tensorflow/core:allocator",
+        "//tensorflow/core:lib",
+        "//tensorflow/stream_executor/lib",
+        "//tensorflow/stream_executor/platform",
+        "@com_google_absl//absl/base:core_headers",
+        "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/strings:str_format",
+        "@com_google_absl//absl/synchronization",
+        "@com_google_absl//absl/types:optional",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
 tf_cc_test(
     name = "stream_test",
     size = "small",
diff --git a/tensorflow/stream_executor/tf_allocator_adapter.cc b/tensorflow/stream_executor/tf_allocator_adapter.cc
new file mode 100644
index 00000000000..892673d63e6
--- /dev/null
+++ b/tensorflow/stream_executor/tf_allocator_adapter.cc
@@ -0,0 +1,51 @@
+/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/stream_executor/tf_allocator_adapter.h"
+
+#include "tensorflow/core/lib/core/errors.h"
+#include "tensorflow/stream_executor/lib/error.h"
+
+namespace stream_executor {
+
+TfAllocatorAdapter::TfAllocatorAdapter(const Platform *platform,
+                                       tensorflow::Allocator *wrapped)
+    : DeviceMemoryAllocator(platform), wrapped_(wrapped) {}
+
+TfAllocatorAdapter::~TfAllocatorAdapter() {}
+
+port::StatusOr<OwningDeviceMemory> TfAllocatorAdapter::Allocate(
+    int device_ordinal, uint64 size, bool retry_on_failure) {
+  tensorflow::AllocationAttributes attrs;
+  attrs.no_retry_on_failure = !retry_on_failure;
+  void *data = nullptr;
+  if (size != 0) {
+    data = wrapped_->AllocateRaw(tensorflow::Allocator::kAllocatorAlignment,
+                                 size, attrs);
+    if (data == nullptr) {
+      return tensorflow::errors::ResourceExhausted(
+          "Out of memory while trying to allocate ", size, " bytes.");
+    }
+  }
+  return OwningDeviceMemory(DeviceMemoryBase(data, size), device_ordinal, this);
+}
+
+port::Status TfAllocatorAdapter::Deallocate(int device_ordinal,
+                                            DeviceMemoryBase mem) {
+  wrapped_->DeallocateRaw(mem.opaque());
+  return port::Status::OK();
+}
+
+}  // namespace stream_executor
diff --git a/tensorflow/stream_executor/tf_allocator_adapter.h b/tensorflow/stream_executor/tf_allocator_adapter.h
new file mode 100644
index 00000000000..3ab15d2ae66
--- /dev/null
+++ b/tensorflow/stream_executor/tf_allocator_adapter.h
@@ -0,0 +1,100 @@
+/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
+#define TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
+
+#include "tensorflow/core/framework/allocator.h"
+#include "tensorflow/stream_executor/device_memory.h"
+#include "tensorflow/stream_executor/device_memory_allocator.h"
+#include "tensorflow/stream_executor/lib/statusor.h"
+#include "tensorflow/stream_executor/platform.h"
+
+namespace stream_executor {
+
+// Adapter class that wraps a Tensorflow allocator.
+//
+// Assumes that the Tensorflow allocator permits asynchronous deallocation:
+// see comment on `AllowsAsynchronousDeallocation()`.
+class TfAllocatorAdapter : public DeviceMemoryAllocator {
+ public:
+  TfAllocatorAdapter(const Platform *platform, tensorflow::Allocator *wrapped);
+  ~TfAllocatorAdapter() override;
+
+  port::StatusOr<OwningDeviceMemory> Allocate(int device_ordinal, uint64 size,
+                                              bool retry_on_failure) override;
+
+  port::Status Deallocate(int device_ordinal, DeviceMemoryBase mem) override;
+
+  // The Tensorflow BFC allocator used on GPU allows host-side deallocation
+  // before GPU execution takes place. Tensorflow uses the ordering of the main
+  // compute stream to enforce a happens-before relationship between a memory
+  // allocation and code that reuses the same memory. If Tensorflow adds
+  // support for multiple GPU streams or allocators with different ordering
+  // requirements, this code may need to change.
+  // (This attribute has no effect on CPU.)
+  bool AllowsAsynchronousDeallocation() const override { return true; }
+
+ private:
+  tensorflow::Allocator *wrapped_;
+};
+
+// Adapter class that wraps per-device TF allocators as an XLA allocator.
+// Assumes that the Tensorflow allocator permits asynchronous deallocation;
+// see comment on `AllowsAsynchronousDeallocation()`.
+class MultiDeviceAdapter : public DeviceMemoryAllocator {
+ public:
+  MultiDeviceAdapter(
+      const Platform *platform,
+      std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators)
+      : DeviceMemoryAllocator(platform),
+        tf_allocators_(std::move(tf_allocators)) {
+    for (const auto &tf_allocator : tf_allocators_) {
+      per_device_allocators_.emplace_back(platform, tf_allocator.get());
+    }
+  }
+
+  port::StatusOr<OwningDeviceMemory> Allocate(int device_ordinal, uint64 size,
+                                              bool retry_on_failure) override {
+    CHECK_LT(device_ordinal, per_device_allocators_.size());
+    return per_device_allocators_[device_ordinal].Allocate(device_ordinal, size,
+                                                           retry_on_failure);
+  }
+
+  port::Status Deallocate(int device_ordinal, DeviceMemoryBase mem) override {
+    CHECK_LT(device_ordinal, per_device_allocators_.size());
+    return per_device_allocators_[device_ordinal].Deallocate(device_ordinal,
+                                                             mem);
+  }
+
+  // The Tensorflow BFC allocator used on GPU allows host-side deallocation
+  // before GPU execution takes place. Tensorflow uses the ordering of the main
+  // compute stream to enforce a happens-before relationship between a memory
+  // allocation and code that reuses the same memory. If Tensorflow adds
+  // support for multiple GPU streams or allocators with different ordering
+  // requirements, this code may need to change.
+  // (This attribute has no effect on CPU.)
+  bool AllowsAsynchronousDeallocation() const override { return true; }
+
+ private:
+  std::vector<TfAllocatorAdapter> per_device_allocators_;
+  // The wrapped TF allocators backing per_device_allocators_
+  // (TfAllocatorAdapter does not take ownership of its underlying Allocator).
+  std::vector<std::unique_ptr<tensorflow::Allocator>> tf_allocators_;
+};
+
+}  // namespace stream_executor
+
+#endif  // TENSORFLOW_STREAM_EXECUTOR_TF_ALLOCATOR_ADAPTER_H_
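
Reviewer note (not part of the patch). MultiDeviceAdapter, defined in the header above, keeps one TfAllocatorAdapter per device ordinal and routes Allocate/Deallocate by that ordinal; CreateBFCAllocator in local_client.cc builds it with one BFC allocator per GPU. A minimal sketch of that pattern follows, where MakeDeviceAllocator is a hypothetical factory standing in for the per-device BFC allocator construction, not an API from this change.

// Illustrative sketch only.
#include <memory>
#include <utility>
#include <vector>

#include "absl/memory/memory.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"

namespace se = stream_executor;

// Hypothetical factory: returns an owning TF allocator for one device
// (in local_client.cc this role is played by a per-GPU BFCAllocator).
std::unique_ptr<tensorflow::Allocator> MakeDeviceAllocator(int device_ordinal);

std::unique_ptr<se::MultiDeviceAdapter> MakeMultiDeviceAdapter(
    se::Platform* platform, int device_count) {
  std::vector<std::unique_ptr<tensorflow::Allocator>> allocators;
  for (int i = 0; i < device_count; ++i) {
    allocators.push_back(MakeDeviceAllocator(i));
  }
  // The adapter takes ownership of the vector; an allocation for device
  // ordinal i is served by allocators[i].
  return absl::make_unique<se::MultiDeviceAdapter>(platform,
                                                   std::move(allocators));
}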