From 15b155e929f2eb3e30c1194fa9afc1ea40e330a4 Mon Sep 17 00:00:00 2001
From: Todd Wang
Date: Wed, 25 Jul 2018 20:58:13 -0700
Subject: [PATCH] Replace generic Pool with StreamPool, and discard failed
 streams.

We have a Pool in XLA that maintains a freelist of Streams, to avoid the
overhead of repeatedly allocating new Streams. Streams have a monotonic state
machine; if a stream encounters any error, it will remain in an error state
forever.

The functional change in this CL is to ensure that streams which have
encountered an error are deleted, rather than being put back on the pool.
Without this change, a previously failed stream will be put back on the pool,
only to cause the next usage of the stream to trivially fail.

I've chosen to replace the generic templatized Pool with a concrete
StreamPool, since this makes the logic more straightforward to reason about.
Also note that the only existing usage of Pool is to hold streams.

The functional change is in stream_pool.cc; everything else is mostly
mechanical updates.

PiperOrigin-RevId: 206100631
---
 tensorflow/compiler/jit/BUILD                 |   1 +
 tensorflow/compiler/jit/xla_device.h          |   7 +-
 tensorflow/compiler/xla/client/BUILD          |   1 +
 .../compiler/xla/client/local_client.cc       |   7 +-
 tensorflow/compiler/xla/service/BUILD         |  21 +--
 tensorflow/compiler/xla/service/backend.cc    |  17 +--
 tensorflow/compiler/xla/service/backend.h     |  14 +-
 .../compiler/xla/service/execution_tracker.cc |   9 +-
 .../compiler/xla/service/execution_tracker.h  |   8 +-
 tensorflow/compiler/xla/service/gpu/BUILD     |   3 +-
 .../xla/service/gpu/gpu_executable.cc         |   2 +-
 .../xla/service/gpu/hlo_execution_profiler.cc |  11 +-
 .../xla/service/gpu/hlo_execution_profiler.h  |  12 +-
 tensorflow/compiler/xla/service/pool.h        |  84 -----------
 tensorflow/compiler/xla/service/pool_test.cc  |  40 ------
 tensorflow/compiler/xla/service/service.cc    |  11 +-
 .../service/service_executable_run_options.h  |   7 +-
 .../compiler/xla/service/stream_pool.cc       |  56 ++++++++
 tensorflow/compiler/xla/service/stream_pool.h |  64 +++++++++
 .../compiler/xla/service/stream_pool_test.cc  | 136 ++++++++++++++++++
 tensorflow/compiler/xla/tests/BUILD           |   2 +
 .../xla/tests/transfer_manager_test.cc        |   3 +-
 .../xla/tests/xla_hlo_profile_test.cc         |   3 +-
 23 files changed, 329 insertions(+), 190 deletions(-)
 delete mode 100644 tensorflow/compiler/xla/service/pool.h
 delete mode 100644 tensorflow/compiler/xla/service/pool_test.cc
 create mode 100644 tensorflow/compiler/xla/service/stream_pool.cc
 create mode 100644 tensorflow/compiler/xla/service/stream_pool.h
 create mode 100644 tensorflow/compiler/xla/service/stream_pool_test.cc
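A minimal sketch of the new borrow/return contract (illustration only, not
part of the patch; assumes a valid `se::StreamExecutor* executor`):

  StreamPool pool;
  StreamPool::Ptr stream = pool.BorrowStream(executor);
  CHECK(stream->ok());
  // ... enqueue work; suppose the stream encounters an error ...
  stream = nullptr;    // Ptr's deleter calls ReturnStream(); an errored
                       // stream is deleted instead of rejoining the freelist.
  StreamPool::Ptr fresh = pool.BorrowStream(executor);
  CHECK(fresh->ok());  // A newly created stream, never the failed one.

Under the old Pool<se::Stream>, the failed stream would have been handed back
to the next Allocate() caller, whose operations would then trivially fail.
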
#include "tensorflow/compiler/tf2xla/xla_compiler.h" #include "tensorflow/compiler/tf2xla/xla_op_registry.h" #include "tensorflow/compiler/xla/client/local_client.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/core/common_runtime/device_factory.h" #include "tensorflow/core/common_runtime/local_device.h" #include "tensorflow/core/framework/allocator.h" @@ -153,17 +154,17 @@ class XlaDevice : public LocalDevice { // stream are executed on the device. Operations include data // copying back and forth between CPU and the device, and // computations enqueued by XLA. - xla::Backend::StreamPtr stream_; + xla::StreamPool::Ptr stream_; // If true, only stream_ is valid and all computation and transfers use // stream_. If false, computation is performed by stream_ and transfers are // performed by host_to_device/device_to_host_stream. bool use_multiple_streams_; // If use_multiple_streams_, host to device transfers are performed using this // stream. - xla::Backend::StreamPtr host_to_device_stream_; + xla::StreamPool::Ptr host_to_device_stream_; // If use_multiple_streams_, device to host transfers are performed using this // stream. - xla::Backend::StreamPtr device_to_host_stream_; + xla::StreamPool::Ptr device_to_host_stream_; // Must we use XLA's transfer manager for correct host<->device transfers? if // false, we can use ThenMemcpy() instead. bool transfer_as_literal_; diff --git a/tensorflow/compiler/xla/client/BUILD b/tensorflow/compiler/xla/client/BUILD index 289d3f552ae..ef166f86389 100644 --- a/tensorflow/compiler/xla/client/BUILD +++ b/tensorflow/compiler/xla/client/BUILD @@ -114,6 +114,7 @@ cc_library( "//tensorflow/compiler/xla/service:local_service", "//tensorflow/compiler/xla/service:shaped_buffer", "//tensorflow/compiler/xla/service:source_map_util", + "//tensorflow/compiler/xla/service:stream_pool", "//tensorflow/core:lib", "//tensorflow/core:stream_executor_no_cuda", "@llvm//:support", diff --git a/tensorflow/compiler/xla/client/local_client.cc b/tensorflow/compiler/xla/client/local_client.cc index 035ee9bf4cb..e7250e11d5e 100644 --- a/tensorflow/compiler/xla/client/local_client.cc +++ b/tensorflow/compiler/xla/client/local_client.cc @@ -23,6 +23,7 @@ limitations under the License. #include "tensorflow/compiler/xla/service/backend.h" #include "tensorflow/compiler/xla/service/service_executable_run_options.h" #include "tensorflow/compiler/xla/service/source_map_util.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/compiler/xla/status_macros.h" using xla::source_map_util::InvalidParameterArgument; @@ -30,8 +31,8 @@ using xla::source_map_util::InvalidParameterArgument; namespace xla { namespace { -StatusOr BorrowStreamForDevice(int device_ordinal, - Backend* backend) { +StatusOr BorrowStreamForDevice(int device_ordinal, + Backend* backend) { if (device_ordinal < 0) { device_ordinal = backend->default_device_ordinal(); } @@ -142,7 +143,7 @@ StatusOr LocalExecutable::Run( TF_RETURN_IF_ERROR( ValidateExecutionOptions(arguments, run_options, *backend_)); - Backend::StreamPtr stream; + StreamPool::Ptr stream; if (run_options.stream() == nullptr) { // NB! 
    // `actual_options` (otherwise we will end up using a returned stream in
diff --git a/tensorflow/compiler/xla/service/BUILD b/tensorflow/compiler/xla/service/BUILD
index 2305dd4318b..6c140caab78 100644
--- a/tensorflow/compiler/xla/service/BUILD
+++ b/tensorflow/compiler/xla/service/BUILD
@@ -564,7 +564,7 @@ cc_library(
         ":computation_placer",
         ":device_memory_allocator",
         ":platform_util",
-        ":pool",
+        ":stream_pool",
         ":transfer_manager",
         "//tensorflow/compiler/xla:status_macros",
         "//tensorflow/compiler/xla:statusor",
@@ -598,6 +598,7 @@ cc_library(
         ":hlo_proto_util",
         ":platform_util",
         ":source_map_util",
+        ":stream_pool",
         ":transfer_manager",
         "//tensorflow/compiler/xla:executable_run_options",
         "//tensorflow/compiler/xla:execution_options_util",
@@ -751,8 +752,8 @@ cc_library(
         ":hlo_execution_profile",
         ":hlo_graph_dumper",
         ":hlo_proto",
-        ":pool",
         ":shaped_buffer",
+        ":stream_pool",
         "//tensorflow/compiler/xla:executable_run_options",
         "//tensorflow/compiler/xla:status",
         "//tensorflow/compiler/xla:status_macros",
@@ -838,7 +839,7 @@ cc_library(
     hdrs = ["execution_tracker.h"],
     deps = [
         ":backend",
-        ":pool",
+        ":stream_pool",
         "//tensorflow/compiler/xla:executable_run_options",
         "//tensorflow/compiler/xla:statusor",
         "//tensorflow/compiler/xla:util",
@@ -2715,21 +2716,25 @@ tf_cc_test(
 )
 
 cc_library(
-    name = "pool",
-    hdrs = ["pool.h"],
+    name = "stream_pool",
+    srcs = ["stream_pool.cc"],
+    hdrs = ["stream_pool.h"],
     deps = [
+        "//tensorflow/compiler/xla:types",
         "//tensorflow/compiler/xla:util",
         "//tensorflow/core:lib",
+        "//tensorflow/core:stream_executor_no_cuda",
     ],
 )
 
 tf_cc_test(
-    name = "pool_test",
-    srcs = ["pool_test.cc"],
+    name = "stream_pool_test",
+    srcs = ["stream_pool_test.cc"],
     deps = [
-        ":pool",
+        ":stream_pool",
         "//tensorflow/compiler/xla:test_helpers",
         "//tensorflow/compiler/xla/tests:xla_internal_test_main",
+        "//tensorflow/core:stream_executor_no_cuda",
     ],
 )
diff --git a/tensorflow/compiler/xla/service/backend.cc b/tensorflow/compiler/xla/service/backend.cc
index 349b32451a6..d12be3e007f 100644
--- a/tensorflow/compiler/xla/service/backend.cc
+++ b/tensorflow/compiler/xla/service/backend.cc
@@ -96,24 +96,19 @@ Backend::CreateDefaultBackend() {
   return CreateBackend(backend_options);
 }
 
-StatusOr<Backend::StreamPtr> Backend::BorrowStream(int device_ordinal) {
-  TF_ASSIGN_OR_RETURN(auto exec, stream_executor(device_ordinal));
-  return BorrowStream(exec);
+StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
+  TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
+  return BorrowStream(executor);
 }
 
-StatusOr<Backend::StreamPtr> Backend::BorrowStream(
-    se::StreamExecutor* executor) {
+StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
   tensorflow::mutex_lock l(mu_);
   if (0 == stream_pools_.count(executor)) {
     stream_pools_.emplace(std::piecewise_construct,
                           std::forward_as_tuple(executor),
-                          std::forward_as_tuple([executor]() {
-                            auto stream = MakeUnique<se::Stream>(executor);
-                            stream->Init();
-                            return stream;
-                          }));
+                          std::forward_as_tuple());
   }
-  return stream_pools_.at(executor).Allocate();
+  return stream_pools_.at(executor).BorrowStream(executor);
 }
 
 Backend::Backend(
diff --git a/tensorflow/compiler/xla/service/backend.h b/tensorflow/compiler/xla/service/backend.h
index 6546602473e..1bc3796fa48 100644
--- a/tensorflow/compiler/xla/service/backend.h
+++ b/tensorflow/compiler/xla/service/backend.h
@@ -24,7 +24,7 @@ limitations under the License.
#include "tensorflow/compiler/xla/service/compiler.h" #include "tensorflow/compiler/xla/service/computation_placer.h" #include "tensorflow/compiler/xla/service/device_memory_allocator.h" -#include "tensorflow/compiler/xla/service/pool.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/compiler/xla/service/transfer_manager.h" #include "tensorflow/compiler/xla/statusor.h" #include "tensorflow/compiler/xla/types.h" @@ -63,11 +63,9 @@ class BackendOptions { // // It also offers a pooling API for creation/use of initialized streams: // -// StreamPtr stream = backend->BorrowStream().ConsumeValueOrDie(); +// StreamPool::Ptr stream = backend->BorrowStream().ConsumeValueOrDie(); class Backend { public: - using StreamPtr = Pool::SmartPtr; - // Creates a new backend. static StatusOr> CreateBackend( const BackendOptions& options); @@ -114,13 +112,13 @@ class Backend { // Borrows a stream for use by the caller, either by grabbing it from an // internal pool, or by constructing/initializating it, and returns the result // to the caller. - StatusOr BorrowStream(int device_ordinal); - StatusOr BorrowStream(se::StreamExecutor* executor); + StatusOr BorrowStream(int device_ordinal); + StatusOr BorrowStream(se::StreamExecutor* executor); // Returns a function to borrow a stream, as `BorrowStream` above does. // Purely for convenience, the caller could rather make this anonymous // function itself. - std::function(int)> StreamBorrower() { + std::function(int)> StreamBorrower() { return [this](int device_ordinal) { return BorrowStream(device_ordinal); }; } @@ -169,7 +167,7 @@ class Backend { tensorflow::mutex mu_; // Mapping from stream executor to stream pools, used by `BorrowStream` above. - std::map> stream_pools_ GUARDED_BY(mu_); + std::map stream_pools_ GUARDED_BY(mu_); // The default memory allocator to use. std::unique_ptr memory_allocator_; diff --git a/tensorflow/compiler/xla/service/execution_tracker.cc b/tensorflow/compiler/xla/service/execution_tracker.cc index 6794cfe297b..228c3fac95c 100644 --- a/tensorflow/compiler/xla/service/execution_tracker.cc +++ b/tensorflow/compiler/xla/service/execution_tracker.cc @@ -25,7 +25,7 @@ limitations under the License. namespace xla { AsyncExecution::AsyncExecution(Backend* backend, - std::vector streams, + std::vector streams, const ExecutionProfile& profile, GlobalDataHandle result) : backend_(CHECK_NOTNULL(backend)), @@ -46,9 +46,10 @@ Status AsyncExecution::BlockUntilDone() const { ExecutionTracker::ExecutionTracker() : next_handle_(1) {} -ExecutionHandle ExecutionTracker::Register( - Backend* backend, std::vector streams, - const ExecutionProfile& profile, GlobalDataHandle result) { +ExecutionHandle ExecutionTracker::Register(Backend* backend, + std::vector streams, + const ExecutionProfile& profile, + GlobalDataHandle result) { tensorflow::mutex_lock lock(execution_mutex_); int64 handle = next_handle_++; auto inserted = handle_to_execution_.emplace( diff --git a/tensorflow/compiler/xla/service/execution_tracker.h b/tensorflow/compiler/xla/service/execution_tracker.h index 4458152dd9a..4e9b9f883e2 100644 --- a/tensorflow/compiler/xla/service/execution_tracker.h +++ b/tensorflow/compiler/xla/service/execution_tracker.h @@ -22,7 +22,7 @@ limitations under the License. 
#include "tensorflow/compiler/xla/executable_run_options.h" #include "tensorflow/compiler/xla/service/backend.h" -#include "tensorflow/compiler/xla/service/pool.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/compiler/xla/statusor.h" #include "tensorflow/compiler/xla/util.h" #include "tensorflow/compiler/xla/xla_data.pb.h" @@ -40,7 +40,7 @@ namespace xla { // the stream when destructed. class AsyncExecution { public: - AsyncExecution(Backend* backend, std::vector streams, + AsyncExecution(Backend* backend, std::vector streams, const ExecutionProfile& profile, GlobalDataHandle result); Status BlockUntilDone() const; @@ -54,7 +54,7 @@ class AsyncExecution { Backend* backend_; // Stream on which the execution is launched. - std::vector streams_; + std::vector streams_; // Profile object of the execution to be returned to the user. ExecutionProfile profile_; @@ -72,7 +72,7 @@ class ExecutionTracker { // Registers an execution with its backend, streams, and data handle to the // execution result. Returns a handle for the registered execution. ExecutionHandle Register(Backend* backend, - std::vector stream, + std::vector stream, const ExecutionProfile& profile, GlobalDataHandle data); diff --git a/tensorflow/compiler/xla/service/gpu/BUILD b/tensorflow/compiler/xla/service/gpu/BUILD index 6f1e766d1c2..4c218116983 100644 --- a/tensorflow/compiler/xla/service/gpu/BUILD +++ b/tensorflow/compiler/xla/service/gpu/BUILD @@ -248,7 +248,7 @@ cc_library( deps = [ "//tensorflow/compiler/xla/service:hlo", "//tensorflow/compiler/xla/service:hlo_execution_profile", - "//tensorflow/compiler/xla/service:pool", + "//tensorflow/compiler/xla/service:stream_pool", "//tensorflow/core:lib", "//tensorflow/core:ptr_util", "//tensorflow/core:stream_executor_no_cuda", @@ -321,6 +321,7 @@ cc_library( "//tensorflow/compiler/xla/service:hlo_execution_profile", "//tensorflow/compiler/xla/service:logical_buffer", "//tensorflow/compiler/xla/service:shaped_buffer", + "//tensorflow/compiler/xla/service:stream_pool", "//tensorflow/compiler/xla/service:transfer_manager", "//tensorflow/compiler/xla/service:tuple_points_to_analysis", "//tensorflow/core:lib", diff --git a/tensorflow/compiler/xla/service/gpu/gpu_executable.cc b/tensorflow/compiler/xla/service/gpu/gpu_executable.cc index 0cad2958c72..9767836cd62 100644 --- a/tensorflow/compiler/xla/service/gpu/gpu_executable.cc +++ b/tensorflow/compiler/xla/service/gpu/gpu_executable.cc @@ -84,7 +84,7 @@ Status GpuExecutable::ExecuteThunks( } // Stream 0 indicates `main_stream` and substreams start from stream 1. - std::vector::SmartPtr> sub_streams; + std::vector sub_streams; sub_streams.reserve(thunk_schedule_->StreamCount() - 1); while (sub_streams.size() + 1 < thunk_schedule_->StreamCount()) { sub_streams.emplace_back(); diff --git a/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.cc b/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.cc index 19420e590d0..17226769302 100644 --- a/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.cc +++ b/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.cc @@ -23,7 +23,7 @@ limitations under the License. 
#include "tensorflow/compiler/xla/service/hlo_computation.h" #include "tensorflow/compiler/xla/service/hlo_execution_profile.h" #include "tensorflow/compiler/xla/service/hlo_instruction.h" -#include "tensorflow/compiler/xla/service/pool.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/stream_executor_no_cuda.h" #include "tensorflow/core/util/ptr_util.h" @@ -37,10 +37,9 @@ void InitAndStartTimer(std::stack>* timers, stream->InitTimer(timers->top().get()).ThenStartTimer(timers->top().get()); } -uint64 GetCyclesTaken( - std::stack>* timers, - const std::vector::SmartPtr>& sub_streams, - se::Stream* stream, double clock_rate_ghz) { +uint64 GetCyclesTaken(std::stack>* timers, + const std::vector& sub_streams, + se::Stream* stream, double clock_rate_ghz) { CHECK_GT(timers->size(), 0); stream->ThenWaitFor(&sub_streams); stream->ThenStopTimer(timers->top().get()); @@ -53,7 +52,7 @@ uint64 GetCyclesTaken( HloExecutionProfiler::HloExecutionProfiler( bool do_profile, HloExecutionProfile* profile, se::Stream* stream, - const std::vector::SmartPtr>& sub_streams, + const std::vector& sub_streams, const HloComputation* computation) : do_profile_(do_profile), profile_(profile), diff --git a/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.h b/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.h index 6654850bef3..80cde75f2bb 100644 --- a/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.h +++ b/tensorflow/compiler/xla/service/gpu/hlo_execution_profiler.h @@ -24,7 +24,7 @@ limitations under the License. #include "tensorflow/compiler/xla/service/hlo_computation.h" #include "tensorflow/compiler/xla/service/hlo_execution_profile.h" #include "tensorflow/compiler/xla/service/hlo_instruction.h" -#include "tensorflow/compiler/xla/service/pool.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/core/platform/stream_executor_no_cuda.h" namespace xla { @@ -38,10 +38,10 @@ class ScopedInstructionProfiler; class HloExecutionProfiler { public: // If profiling is enabled, start an execution timer running. - explicit HloExecutionProfiler( - bool do_profile, HloExecutionProfile* profile, se::Stream* stream, - const std::vector::SmartPtr>& sub_streams, - const HloComputation* computation); + explicit HloExecutionProfiler(bool do_profile, HloExecutionProfile* profile, + se::Stream* stream, + const std::vector& sub_streams, + const HloComputation* computation); // If profiling is enabled, sets the total cycle count on the profile from the // execution timer. @@ -72,7 +72,7 @@ class HloExecutionProfiler { double clock_rate_ghz_; HloExecutionProfile* profile_; se::Stream* stream_; - const std::vector::SmartPtr>& sub_streams_; + const std::vector& sub_streams_; const HloComputation* computation_; std::stack> timers_; // Contains the HLO instructions for which we are currently measuring the diff --git a/tensorflow/compiler/xla/service/pool.h b/tensorflow/compiler/xla/service/pool.h deleted file mode 100644 index 8e710ebb6dc..00000000000 --- a/tensorflow/compiler/xla/service/pool.h +++ /dev/null @@ -1,84 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef TENSORFLOW_COMPILER_XLA_POOL_H_
-#define TENSORFLOW_COMPILER_XLA_POOL_H_
-
-#include <functional>
-#include <vector>
-
-#include "tensorflow/compiler/xla/ptr_util.h"
-#include "tensorflow/core/platform/mutex.h"
-
-namespace xla {
-
-// Pool of values, which are created as needed and destroyed when the `Pool` is
-// destroyed
-template <typename T>
-class Pool {
- public:
-  struct Deleter {
-    void operator()(T* ptr) { pool->Deallocate(ptr); }
-
-    Pool* pool;
-  };
-
-  // A pointer to a taken element of a `Pool` which returns it to the pool on
-  // destruction
-  using SmartPtr = std::unique_ptr<T, Deleter>;
-
-  // Constructs a `Pool` with given factory function, which need not be
-  // thread-safe.
-  explicit Pool(std::function<std::unique_ptr<T>()> factory)
-      : factory_(factory) {}
-
-  explicit Pool() : Pool([]() { return MakeUnique<T>(); }) {}
-
-  // Returns a pointer to a value in the pool, creating a new value if none is
-  // free. The returned smart pointer returns the element to the pool on
-  // destruction.
-  //
-  // This method is thread-safe.
-  SmartPtr Allocate() {
-    tensorflow::mutex_lock lock(mu_);
-    T* ptr;
-    if (!xs_.empty()) {
-      ptr = std::move(xs_.back()).release();
-      xs_.pop_back();
-    } else {
-      ptr = factory_().release();
-    }
-    Deleter del = {this};
-    return std::unique_ptr<T, Deleter>(ptr, del);
-  }
-
- private:
-  // Puts a pointer to a value back into the pool, leaving it free for future
-  // use.
-  //
-  // This method is thread-safe.
-  void Deallocate(T* ptr) {
-    tensorflow::mutex_lock lock(mu_);
-    xs_.push_back(std::unique_ptr<T>(ptr));
-  }
-
-  const std::function<std::unique_ptr<T>()> factory_ GUARDED_BY(mu_);
-  std::vector<std::unique_ptr<T>> xs_ GUARDED_BY(mu_);
-  tensorflow::mutex mu_;
-};
-
-}  // namespace xla
-
-#endif  // TENSORFLOW_COMPILER_XLA_POOL_H_
diff --git a/tensorflow/compiler/xla/service/pool_test.cc b/tensorflow/compiler/xla/service/pool_test.cc
deleted file mode 100644
index 8c4fe258e38..00000000000
--- a/tensorflow/compiler/xla/service/pool_test.cc
+++ /dev/null
@@ -1,40 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/compiler/xla/service/pool.h"
-
-#include "tensorflow/compiler/xla/test_helpers.h"
-
-namespace xla {
-namespace {
-
-using PoolTest = ::testing::Test;
-
-TEST_F(PoolTest, Test) {
-  Pool<int> pool;
-
-  {
-    auto ptr = pool.Allocate();
-    EXPECT_NE(nullptr, ptr.get());
-    *ptr = 5;
-  }
-
-  auto ptr = pool.Allocate();
-  EXPECT_NE(nullptr, ptr.get());
-  EXPECT_EQ(5, *ptr);
-}
-
-}  // namespace
-}  // namespace xla
diff --git a/tensorflow/compiler/xla/service/service.cc b/tensorflow/compiler/xla/service/service.cc
index 636013cbb56..ce070bc5b6c 100644
--- a/tensorflow/compiler/xla/service/service.cc
+++ b/tensorflow/compiler/xla/service/service.cc
@@ -37,6 +37,7 @@ limitations under the License.
 #include "tensorflow/compiler/xla/service/hlo_proto_util.h"
 #include "tensorflow/compiler/xla/service/platform_util.h"
 #include "tensorflow/compiler/xla/service/source_map_util.h"
+#include "tensorflow/compiler/xla/service/stream_pool.h"
 #include "tensorflow/compiler/xla/service/transfer_manager.h"
 #include "tensorflow/compiler/xla/shape_layout.h"
 #include "tensorflow/compiler/xla/shape_util.h"
@@ -376,7 +377,7 @@ Service::ExecuteParallelAndRegisterResult(
     ExecutionProfile* profile) {
   // Streams where the computation are launched, so we can wait on the streams
   // to complete.
-  std::vector<Pool<se::Stream>::SmartPtr> streams;
+  std::vector<StreamPool::Ptr> streams;
   std::vector<std::unique_ptr<se::Timer>> timers;
 
   // Global data handles for the computation results, one for each computation.
@@ -403,7 +404,7 @@ Service::ExecuteParallelAndRegisterResult(
     CHECK_EQ(replicas.size(), arguments[i].size());
     std::vector<ScopedShapedBuffer> result_buffers;
     for (int64 replica = 0; replica < replicas.size(); ++replica) {
-      TF_ASSIGN_OR_RETURN(Pool<se::Stream>::SmartPtr stream,
+      TF_ASSIGN_OR_RETURN(StreamPool::Ptr stream,
                           backend->BorrowStream(replicas[replica]));
       streams.push_back(std::move(stream));
 
@@ -515,13 +516,13 @@ StatusOr<GlobalDataHandle> Service::ExecuteAndRegisterResult(
         arguments,
     Backend* backend, const string& result_tag, ExecutionProfile* profile) {
   // Set up streams.
-  std::vector<Pool<se::Stream>::SmartPtr> streams;
+  std::vector<StreamPool::Ptr> streams;
 
   TF_ASSIGN_OR_RETURN(auto replicas,
                       Replicas(*backend, SingleComputationDeviceHandle()));
   TF_RET_CHECK(!replicas.empty());
   for (se::StreamExecutor* executor : replicas) {
-    TF_ASSIGN_OR_RETURN(Pool<se::Stream>::SmartPtr stream,
+    TF_ASSIGN_OR_RETURN(StreamPool::Ptr stream,
                         backend->BorrowStream(executor));
     streams.push_back(std::move(stream));
   }
@@ -533,7 +534,7 @@ StatusOr<GlobalDataHandle> Service::ExecuteAndRegisterResult(
 
   // Set up run options.
   std::vector<ServiceExecutableRunOptions> run_options;
-  for (const Pool<se::Stream>::SmartPtr& stream : streams) {
+  for (const StreamPool::Ptr& stream : streams) {
     ExecutableRunOptions options;
     options.set_stream(stream.get());
     options.set_device_ordinal(stream->parent()->device_ordinal());
diff --git a/tensorflow/compiler/xla/service/service_executable_run_options.h b/tensorflow/compiler/xla/service/service_executable_run_options.h
index 7f3910cdb03..dbfed628bfc 100644
--- a/tensorflow/compiler/xla/service/service_executable_run_options.h
+++ b/tensorflow/compiler/xla/service/service_executable_run_options.h
@@ -17,7 +17,7 @@ limitations under the License.
 #define TENSORFLOW_COMPILER_XLA_SERVICE_SERVICE_EXECUTABLE_RUN_OPTIONS_H_
 
 #include "tensorflow/compiler/xla/executable_run_options.h"
-#include "tensorflow/compiler/xla/service/pool.h"
+#include "tensorflow/compiler/xla/service/stream_pool.h"
 #include "tensorflow/compiler/xla/statusor.h"
 #include "tensorflow/stream_executor/stream_executor.h"
 
@@ -27,8 +27,7 @@ namespace xla {
 // data, now only a stream cache for GPU backend.
 class ServiceExecutableRunOptions {
  public:
-  using StreamBorrower =
-      std::function<StatusOr<Pool<se::Stream>::SmartPtr>(int)>;
+  using StreamBorrower = std::function<StatusOr<StreamPool::Ptr>(int)>;
 
   ServiceExecutableRunOptions()
       : ServiceExecutableRunOptions(ExecutableRunOptions()) {}
@@ -51,7 +50,7 @@ class ServiceExecutableRunOptions {
 
   // Borrows a stream and returns a smart pointer which returns the stream on
   // destruction.
-  StatusOr<Pool<se::Stream>::SmartPtr> BorrowStream(int device_ordinal) const {
+  StatusOr<StreamPool::Ptr> BorrowStream(int device_ordinal) const {
     return borrow_stream_
                ? borrow_stream_(device_ordinal)
                : Status(tensorflow::error::UNIMPLEMENTED, "No stream cache");
diff --git a/tensorflow/compiler/xla/service/stream_pool.cc b/tensorflow/compiler/xla/service/stream_pool.cc
new file mode 100644
index 00000000000..92bb21b816c
--- /dev/null
+++ b/tensorflow/compiler/xla/service/stream_pool.cc
@@ -0,0 +1,56 @@
+/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/compiler/xla/service/stream_pool.h"
+
+#include "tensorflow/compiler/xla/ptr_util.h"
+
+namespace xla {
+
+StreamPool::Ptr StreamPool::BorrowStream(se::StreamExecutor* executor) {
+  std::unique_ptr<se::Stream> stream;
+  {
+    tensorflow::mutex_lock lock(mu_);
+    if (!streams_.empty()) {
+      // Re-use an existing stream from the pool.
+      stream = std::move(streams_.back());
+      streams_.pop_back();
+    }
+  }
+
+  if (!stream) {
+    // Create a new stream.
+    stream = MakeUnique<se::Stream>(executor);
+    stream->Init();
+  }
+
+  // Return the stream wrapped in Ptr, which has our special deleter semantics.
+  PtrDeleter deleter = {this};
+  return Ptr(stream.release(), deleter);
+}
+
+void StreamPool::ReturnStream(se::Stream* stream) {
+  if (stream->ok()) {
+    tensorflow::mutex_lock lock(mu_);
+    streams_.emplace_back(stream);
+  } else {
+    // If the stream has encountered any errors, all subsequent
+    // operations on it will fail. So just delete the stream, and rely
+    // on new streams to be created in the future.
+    delete stream;
+  }
+}
+
+}  // namespace xla
diff --git a/tensorflow/compiler/xla/service/stream_pool.h b/tensorflow/compiler/xla/service/stream_pool.h
new file mode 100644
index 00000000000..7221d323a61
--- /dev/null
+++ b/tensorflow/compiler/xla/service/stream_pool.h
@@ -0,0 +1,64 @@
+/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
+#define TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
+
+#include <memory>
+#include <vector>
+
+#include "tensorflow/compiler/xla/types.h"
+#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/stream_executor_no_cuda.h"
+
+namespace xla {
+
+// Pool of stream_executor::Streams, which are created as needed and
+// destroyed when the pool is destroyed.
+class StreamPool {
+ public:
+  struct PtrDeleter {
+    void operator()(se::Stream* stream) { pool->ReturnStream(stream); }
+    StreamPool* pool;
+  };
+
+  // Stream pointer type returned by BorrowStream, which returns the
+  // stream to the pool on destruction.
+  using Ptr = std::unique_ptr<se::Stream, PtrDeleter>;
+
+  StreamPool() {}
+
+  // Returns a pointer to a stream in the pool, creating a new stream
+  // if none are available in the pool. The returned smart pointer
+  // returns the stream to the pool on destruction.
+  //
+  // This method is thread-safe.
+  Ptr BorrowStream(se::StreamExecutor* executor);
+
+ private:
+  // Puts a pointer to a stream back into the pool, leaving it free
+  // for future use. Streams that have previously encountered errors
+  // are deleted, and not returned to the pool.
+  //
+  // This method is thread-safe.
+  void ReturnStream(se::Stream* stream);
+
+  tensorflow::mutex mu_;
+  std::vector<std::unique_ptr<se::Stream>> streams_ GUARDED_BY(mu_);
+};
+
+}  // namespace xla
+
+#endif  // TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
diff --git a/tensorflow/compiler/xla/service/stream_pool_test.cc b/tensorflow/compiler/xla/service/stream_pool_test.cc
new file mode 100644
index 00000000000..aaf5c37b0d2
--- /dev/null
+++ b/tensorflow/compiler/xla/service/stream_pool_test.cc
@@ -0,0 +1,136 @@
+/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/compiler/xla/service/stream_pool.h"
+
+#include <memory>
+
+#include "tensorflow/compiler/xla/test_helpers.h"
+#include "tensorflow/core/platform/stream_executor_no_cuda.h"
+
+namespace xla {
+namespace {
+
+class StreamPoolTest : public ::testing::Test {
+ protected:
+  std::unique_ptr<se::StreamExecutor> NewStreamExecutor() {
+    se::Platform* platform =
+        se::MultiPlatformManager::PlatformWithName("Host").ConsumeValueOrDie();
+    se::StreamExecutorConfig config(/*ordinal=*/0);
+    return platform->GetUncachedExecutor(config).ConsumeValueOrDie();
+  }
+};
+
+TEST_F(StreamPoolTest, EmptyPool) { StreamPool pool; }
+
+TEST_F(StreamPoolTest, OneStreamPool) {
+  std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
+  StreamPool pool;
+
+  // Borrow and return a stream.
+  StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
+  se::Stream* stream1_ptr = stream1.get();
+  EXPECT_TRUE(stream1->ok());
+  stream1 = nullptr;
+
+  // Borrow and return another stream.
+  StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
+  se::Stream* stream2_ptr = stream2.get();
+  EXPECT_TRUE(stream2->ok());
+  stream2 = nullptr;
+
+  // The underlying streams should be the same, since stream1 was the
+  // only stream available in the pool when stream2 was borrowed.
+  EXPECT_EQ(stream1_ptr, stream2_ptr);
+}
+
+TEST_F(StreamPoolTest, TwoStreamPool) {
+  std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
+  StreamPool pool;
+
+  // Borrow two streams.
+  StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
+  se::Stream* stream1_ptr = stream1.get();
+  EXPECT_TRUE(stream1->ok());
+  StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
+  se::Stream* stream2_ptr = stream2.get();
+  EXPECT_TRUE(stream2->ok());
+
+  // The underlying streams should be different, since we haven't
+  // returned either of them yet.
+  EXPECT_NE(stream1_ptr, stream2_ptr);
+
+  // Return stream1 and borrow stream3.
+  stream1 = nullptr;
+  StreamPool::Ptr stream3 = pool.BorrowStream(executor.get());
+  se::Stream* stream3_ptr = stream3.get();
+  EXPECT_TRUE(stream3->ok());
+
+  // stream1 and stream3 should be the same.
+  EXPECT_EQ(stream1_ptr, stream3_ptr);
+  EXPECT_NE(stream2_ptr, stream3_ptr);
+
+  // Return stream2, and borrow stream4.
+  stream2 = nullptr;
+  StreamPool::Ptr stream4 = pool.BorrowStream(executor.get());
+  se::Stream* stream4_ptr = stream4.get();
+  EXPECT_TRUE(stream4->ok());
+
+  // Stream2 and stream4 should be the same.
+  EXPECT_EQ(stream2_ptr, stream4_ptr);
+  EXPECT_NE(stream3_ptr, stream4_ptr);
+}
+
+TEST_F(StreamPoolTest, BadStreamDiscarded) {
+  std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
+  StreamPool pool;
+
+  // Borrow a stream.
+  StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
+  EXPECT_TRUE(stream1->ok());
+
+  // Force an error on the stream; here we call a method that requires
+  // DNN support, which we know the Host platform doesn't support.
+  stream1->ThenDepthConcatenate({}, {}, nullptr);
+  EXPECT_FALSE(stream1->ok());
+
+  // Return stream1 and borrow stream2.
+  stream1 = nullptr;
+  StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
+  se::Stream* stream2_ptr = stream2.get();
+  EXPECT_TRUE(stream2->ok());
+
+  // The underlying streams should be different. They would have been
+  // the same, but since we forced an error on stream1, it cannot be
+  // put back into the pool.  Sadly we can't just check:
+  //   EXPECT_NE(stream1_ptr, stream2_ptr);
+  //
+  // The above should hold logically, but it may fail if the new
+  // stream instance allocated for stream2 happens to reside in the
+  // same memory address as stream1, which has been deleted.
+  //
+  // The check that stream2->ok() serves as a good-enough check.
+
+  // Return stream2 and borrow stream3. The previous error on stream1
+  // has no effect on these streams, and they are the same.
+  stream2 = nullptr;
+  StreamPool::Ptr stream3 = pool.BorrowStream(executor.get());
+  se::Stream* stream3_ptr = stream3.get();
+  EXPECT_TRUE(stream3->ok());
+  EXPECT_EQ(stream2_ptr, stream3_ptr);
+}
+
+}  // namespace
+}  // namespace xla
diff --git a/tensorflow/compiler/xla/tests/BUILD b/tensorflow/compiler/xla/tests/BUILD
index 099431d949e..39dec5b2e0c 100644
--- a/tensorflow/compiler/xla/tests/BUILD
+++ b/tensorflow/compiler/xla/tests/BUILD
@@ -379,6 +379,7 @@ xla_test(
         "//tensorflow/compiler/xla/client:xla_computation",
         "//tensorflow/compiler/xla/client/xla_client:xla_builder",
         "//tensorflow/compiler/xla/service:platform_util",
+        "//tensorflow/compiler/xla/service:stream_pool",
         "//tensorflow/compiler/xla/tests:client_library_test_base",
         "//tensorflow/compiler/xla/tests:test_utils",
         "//tensorflow/core:lib",
@@ -2013,6 +2014,7 @@ xla_test(
         "//tensorflow/compiler/xla/service:device_memory_allocator",
         "//tensorflow/compiler/xla/service:generic_transfer_manager",
         "//tensorflow/compiler/xla/service:shaped_buffer",
+        "//tensorflow/compiler/xla/service:stream_pool",
         "//tensorflow/core:lib",
         "//tensorflow/core:stream_executor_no_cuda",
         "//tensorflow/core:test",
diff --git a/tensorflow/compiler/xla/tests/transfer_manager_test.cc b/tensorflow/compiler/xla/tests/transfer_manager_test.cc
index 0f86b7f20f9..125513ddfd1 100644
--- a/tensorflow/compiler/xla/tests/transfer_manager_test.cc
+++ b/tensorflow/compiler/xla/tests/transfer_manager_test.cc
@@ -22,6 +22,7 @@ limitations under the License.
 #include "tensorflow/compiler/xla/service/device_memory_allocator.h"
 #include "tensorflow/compiler/xla/service/generic_transfer_manager.h"
 #include "tensorflow/compiler/xla/service/shaped_buffer.h"
+#include "tensorflow/compiler/xla/service/stream_pool.h"
 #include "tensorflow/compiler/xla/shape_util.h"
 #include "tensorflow/compiler/xla/statusor.h"
 #include "tensorflow/compiler/xla/tests/literal_test_util.h"
@@ -60,7 +61,7 @@ class TransferManagerTest : public LocalClientTestBase {
   }
 
  protected:
-  Backend::StreamPtr stream_ptr_;
+  StreamPool::Ptr stream_ptr_;
   se::Stream* stream_;
 
  private:
diff --git a/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc b/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
index 7a75e5102c2..10c51a1c849 100644
--- a/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
+++ b/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
@@ -22,6 +22,7 @@ limitations under the License.
#include "tensorflow/compiler/xla/client/xla_computation.h" #include "tensorflow/compiler/xla/map_util.h" #include "tensorflow/compiler/xla/service/platform_util.h" +#include "tensorflow/compiler/xla/service/stream_pool.h" #include "tensorflow/compiler/xla/shape_util.h" #include "tensorflow/compiler/xla/tests/client_library_test_base.h" #include "tensorflow/compiler/xla/tests/test_macros.h" @@ -133,7 +134,7 @@ void ExecuteAndFetchProfile(string* profile_output, LocalClient* client, DeviceMemoryAllocator* allocator = backend->memory_allocator(); auto* transfer_manager = backend->transfer_manager(); TF_ASSERT_OK_AND_ASSIGN( - Backend::StreamPtr stream_ptr, + StreamPool::Ptr stream_ptr, backend->BorrowStream(backend->default_device_ordinal())); TF_ASSERT_OK_AND_ASSIGN(