Replace generic Pool with StreamPool, and discard failed streams.
We have a Pool in XLA that maintains a freelist of Streams, to avoid the overhead of repeatedly allocating new Streams. Streams have a monotonic state machine; if a stream encounters any error, it will remain in an error state forever. The functional change in this CL is to ensure that streams which have encountered an error are deleted, rather than being put back on the pool. Without this change, a previously failed stream will be put back on the pool, only to cause the next usage of the stream to trivially fail. I've chosen to replace the generic templatized Pool with a concrete StreamPool, since this makes the logic more straightforward to reason about. Also note that the only existing usage of Pool is to hold streams. The functional change is in stream_pool.cc; most of everything else is mechanical updates. PiperOrigin-RevId: 206100631
This commit is contained in:
parent
5d92abe1e4
commit
15b155e929
@ -166,6 +166,7 @@ cc_library(
|
||||
"//tensorflow/compiler/xla/client:client_library",
|
||||
"//tensorflow/compiler/xla/client:global_data",
|
||||
"//tensorflow/compiler/xla/client:local_client",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/core:core_cpu",
|
||||
"//tensorflow/core:core_cpu_internal",
|
||||
"//tensorflow/core:framework",
|
||||
|
@ -29,6 +29,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/tf2xla/xla_compiler.h"
|
||||
#include "tensorflow/compiler/tf2xla/xla_op_registry.h"
|
||||
#include "tensorflow/compiler/xla/client/local_client.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/core/common_runtime/device_factory.h"
|
||||
#include "tensorflow/core/common_runtime/local_device.h"
|
||||
#include "tensorflow/core/framework/allocator.h"
|
||||
@ -153,17 +154,17 @@ class XlaDevice : public LocalDevice {
|
||||
// stream are executed on the device. Operations include data
|
||||
// copying back and forth between CPU and the device, and
|
||||
// computations enqueued by XLA.
|
||||
xla::Backend::StreamPtr stream_;
|
||||
xla::StreamPool::Ptr stream_;
|
||||
// If true, only stream_ is valid and all computation and transfers use
|
||||
// stream_. If false, computation is performed by stream_ and transfers are
|
||||
// performed by host_to_device/device_to_host_stream.
|
||||
bool use_multiple_streams_;
|
||||
// If use_multiple_streams_, host to device transfers are performed using this
|
||||
// stream.
|
||||
xla::Backend::StreamPtr host_to_device_stream_;
|
||||
xla::StreamPool::Ptr host_to_device_stream_;
|
||||
// If use_multiple_streams_, device to host transfers are performed using this
|
||||
// stream.
|
||||
xla::Backend::StreamPtr device_to_host_stream_;
|
||||
xla::StreamPool::Ptr device_to_host_stream_;
|
||||
// Must we use XLA's transfer manager for correct host<->device transfers? if
|
||||
// false, we can use ThenMemcpy() instead.
|
||||
bool transfer_as_literal_;
|
||||
|
@ -114,6 +114,7 @@ cc_library(
|
||||
"//tensorflow/compiler/xla/service:local_service",
|
||||
"//tensorflow/compiler/xla/service:shaped_buffer",
|
||||
"//tensorflow/compiler/xla/service:source_map_util",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/core:lib",
|
||||
"//tensorflow/core:stream_executor_no_cuda",
|
||||
"@llvm//:support",
|
||||
|
@ -23,6 +23,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/backend.h"
|
||||
#include "tensorflow/compiler/xla/service/service_executable_run_options.h"
|
||||
#include "tensorflow/compiler/xla/service/source_map_util.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/status_macros.h"
|
||||
|
||||
using xla::source_map_util::InvalidParameterArgument;
|
||||
@ -30,8 +31,8 @@ using xla::source_map_util::InvalidParameterArgument;
|
||||
namespace xla {
|
||||
|
||||
namespace {
|
||||
StatusOr<Backend::StreamPtr> BorrowStreamForDevice(int device_ordinal,
|
||||
Backend* backend) {
|
||||
StatusOr<StreamPool::Ptr> BorrowStreamForDevice(int device_ordinal,
|
||||
Backend* backend) {
|
||||
if (device_ordinal < 0) {
|
||||
device_ordinal = backend->default_device_ordinal();
|
||||
}
|
||||
@ -142,7 +143,7 @@ StatusOr<ScopedShapedBuffer> LocalExecutable::Run(
|
||||
TF_RETURN_IF_ERROR(
|
||||
ValidateExecutionOptions(arguments, run_options, *backend_));
|
||||
|
||||
Backend::StreamPtr stream;
|
||||
StreamPool::Ptr stream;
|
||||
if (run_options.stream() == nullptr) {
|
||||
// NB! The lifetime of `stream` needs to match the lifetime of
|
||||
// `actual_options` (otherwise we will end up using a returned stream in
|
||||
|
@ -564,7 +564,7 @@ cc_library(
|
||||
":computation_placer",
|
||||
":device_memory_allocator",
|
||||
":platform_util",
|
||||
":pool",
|
||||
":stream_pool",
|
||||
":transfer_manager",
|
||||
"//tensorflow/compiler/xla:status_macros",
|
||||
"//tensorflow/compiler/xla:statusor",
|
||||
@ -598,6 +598,7 @@ cc_library(
|
||||
":hlo_proto_util",
|
||||
":platform_util",
|
||||
":source_map_util",
|
||||
":stream_pool",
|
||||
":transfer_manager",
|
||||
"//tensorflow/compiler/xla:executable_run_options",
|
||||
"//tensorflow/compiler/xla:execution_options_util",
|
||||
@ -751,8 +752,8 @@ cc_library(
|
||||
":hlo_execution_profile",
|
||||
":hlo_graph_dumper",
|
||||
":hlo_proto",
|
||||
":pool",
|
||||
":shaped_buffer",
|
||||
":stream_pool",
|
||||
"//tensorflow/compiler/xla:executable_run_options",
|
||||
"//tensorflow/compiler/xla:status",
|
||||
"//tensorflow/compiler/xla:status_macros",
|
||||
@ -838,7 +839,7 @@ cc_library(
|
||||
hdrs = ["execution_tracker.h"],
|
||||
deps = [
|
||||
":backend",
|
||||
":pool",
|
||||
":stream_pool",
|
||||
"//tensorflow/compiler/xla:executable_run_options",
|
||||
"//tensorflow/compiler/xla:statusor",
|
||||
"//tensorflow/compiler/xla:util",
|
||||
@ -2715,21 +2716,25 @@ tf_cc_test(
|
||||
)
|
||||
|
||||
cc_library(
|
||||
name = "pool",
|
||||
hdrs = ["pool.h"],
|
||||
name = "stream_pool",
|
||||
srcs = ["stream_pool.cc"],
|
||||
hdrs = ["stream_pool.h"],
|
||||
deps = [
|
||||
"//tensorflow/compiler/xla:types",
|
||||
"//tensorflow/compiler/xla:util",
|
||||
"//tensorflow/core:lib",
|
||||
"//tensorflow/core:stream_executor_no_cuda",
|
||||
],
|
||||
)
|
||||
|
||||
tf_cc_test(
|
||||
name = "pool_test",
|
||||
srcs = ["pool_test.cc"],
|
||||
name = "stream_pool_test",
|
||||
srcs = ["stream_pool_test.cc"],
|
||||
deps = [
|
||||
":pool",
|
||||
":stream_pool",
|
||||
"//tensorflow/compiler/xla:test_helpers",
|
||||
"//tensorflow/compiler/xla/tests:xla_internal_test_main",
|
||||
"//tensorflow/core:stream_executor_no_cuda",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -96,24 +96,19 @@ Backend::CreateDefaultBackend() {
|
||||
return CreateBackend(backend_options);
|
||||
}
|
||||
|
||||
StatusOr<Backend::StreamPtr> Backend::BorrowStream(int device_ordinal) {
|
||||
TF_ASSIGN_OR_RETURN(auto exec, stream_executor(device_ordinal));
|
||||
return BorrowStream(exec);
|
||||
StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
|
||||
TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
|
||||
return BorrowStream(executor);
|
||||
}
|
||||
|
||||
StatusOr<Backend::StreamPtr> Backend::BorrowStream(
|
||||
se::StreamExecutor* executor) {
|
||||
StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
|
||||
tensorflow::mutex_lock l(mu_);
|
||||
if (0 == stream_pools_.count(executor)) {
|
||||
stream_pools_.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(executor),
|
||||
std::forward_as_tuple([executor]() {
|
||||
auto stream = MakeUnique<se::Stream>(executor);
|
||||
stream->Init();
|
||||
return stream;
|
||||
}));
|
||||
std::forward_as_tuple());
|
||||
}
|
||||
return stream_pools_.at(executor).Allocate();
|
||||
return stream_pools_.at(executor).BorrowStream(executor);
|
||||
}
|
||||
|
||||
Backend::Backend(
|
||||
|
@ -24,7 +24,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/compiler.h"
|
||||
#include "tensorflow/compiler/xla/service/computation_placer.h"
|
||||
#include "tensorflow/compiler/xla/service/device_memory_allocator.h"
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/service/transfer_manager.h"
|
||||
#include "tensorflow/compiler/xla/statusor.h"
|
||||
#include "tensorflow/compiler/xla/types.h"
|
||||
@ -63,11 +63,9 @@ class BackendOptions {
|
||||
//
|
||||
// It also offers a pooling API for creation/use of initialized streams:
|
||||
//
|
||||
// StreamPtr stream = backend->BorrowStream().ConsumeValueOrDie();
|
||||
// StreamPool::Ptr stream = backend->BorrowStream().ConsumeValueOrDie();
|
||||
class Backend {
|
||||
public:
|
||||
using StreamPtr = Pool<se::Stream>::SmartPtr;
|
||||
|
||||
// Creates a new backend.
|
||||
static StatusOr<std::unique_ptr<Backend>> CreateBackend(
|
||||
const BackendOptions& options);
|
||||
@ -114,13 +112,13 @@ class Backend {
|
||||
// Borrows a stream for use by the caller, either by grabbing it from an
|
||||
// internal pool, or by constructing/initializating it, and returns the result
|
||||
// to the caller.
|
||||
StatusOr<StreamPtr> BorrowStream(int device_ordinal);
|
||||
StatusOr<StreamPtr> BorrowStream(se::StreamExecutor* executor);
|
||||
StatusOr<StreamPool::Ptr> BorrowStream(int device_ordinal);
|
||||
StatusOr<StreamPool::Ptr> BorrowStream(se::StreamExecutor* executor);
|
||||
|
||||
// Returns a function to borrow a stream, as `BorrowStream` above does.
|
||||
// Purely for convenience, the caller could rather make this anonymous
|
||||
// function itself.
|
||||
std::function<StatusOr<StreamPtr>(int)> StreamBorrower() {
|
||||
std::function<StatusOr<StreamPool::Ptr>(int)> StreamBorrower() {
|
||||
return [this](int device_ordinal) { return BorrowStream(device_ordinal); };
|
||||
}
|
||||
|
||||
@ -169,7 +167,7 @@ class Backend {
|
||||
tensorflow::mutex mu_;
|
||||
|
||||
// Mapping from stream executor to stream pools, used by `BorrowStream` above.
|
||||
std::map<se::StreamExecutor*, Pool<se::Stream>> stream_pools_ GUARDED_BY(mu_);
|
||||
std::map<se::StreamExecutor*, StreamPool> stream_pools_ GUARDED_BY(mu_);
|
||||
|
||||
// The default memory allocator to use.
|
||||
std::unique_ptr<StreamExecutorMemoryAllocator> memory_allocator_;
|
||||
|
@ -25,7 +25,7 @@ limitations under the License.
|
||||
namespace xla {
|
||||
|
||||
AsyncExecution::AsyncExecution(Backend* backend,
|
||||
std::vector<Backend::StreamPtr> streams,
|
||||
std::vector<StreamPool::Ptr> streams,
|
||||
const ExecutionProfile& profile,
|
||||
GlobalDataHandle result)
|
||||
: backend_(CHECK_NOTNULL(backend)),
|
||||
@ -46,9 +46,10 @@ Status AsyncExecution::BlockUntilDone() const {
|
||||
|
||||
ExecutionTracker::ExecutionTracker() : next_handle_(1) {}
|
||||
|
||||
ExecutionHandle ExecutionTracker::Register(
|
||||
Backend* backend, std::vector<Backend::StreamPtr> streams,
|
||||
const ExecutionProfile& profile, GlobalDataHandle result) {
|
||||
ExecutionHandle ExecutionTracker::Register(Backend* backend,
|
||||
std::vector<StreamPool::Ptr> streams,
|
||||
const ExecutionProfile& profile,
|
||||
GlobalDataHandle result) {
|
||||
tensorflow::mutex_lock lock(execution_mutex_);
|
||||
int64 handle = next_handle_++;
|
||||
auto inserted = handle_to_execution_.emplace(
|
||||
|
@ -22,7 +22,7 @@ limitations under the License.
|
||||
|
||||
#include "tensorflow/compiler/xla/executable_run_options.h"
|
||||
#include "tensorflow/compiler/xla/service/backend.h"
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/statusor.h"
|
||||
#include "tensorflow/compiler/xla/util.h"
|
||||
#include "tensorflow/compiler/xla/xla_data.pb.h"
|
||||
@ -40,7 +40,7 @@ namespace xla {
|
||||
// the stream when destructed.
|
||||
class AsyncExecution {
|
||||
public:
|
||||
AsyncExecution(Backend* backend, std::vector<Backend::StreamPtr> streams,
|
||||
AsyncExecution(Backend* backend, std::vector<StreamPool::Ptr> streams,
|
||||
const ExecutionProfile& profile, GlobalDataHandle result);
|
||||
|
||||
Status BlockUntilDone() const;
|
||||
@ -54,7 +54,7 @@ class AsyncExecution {
|
||||
Backend* backend_;
|
||||
|
||||
// Stream on which the execution is launched.
|
||||
std::vector<Backend::StreamPtr> streams_;
|
||||
std::vector<StreamPool::Ptr> streams_;
|
||||
|
||||
// Profile object of the execution to be returned to the user.
|
||||
ExecutionProfile profile_;
|
||||
@ -72,7 +72,7 @@ class ExecutionTracker {
|
||||
// Registers an execution with its backend, streams, and data handle to the
|
||||
// execution result. Returns a handle for the registered execution.
|
||||
ExecutionHandle Register(Backend* backend,
|
||||
std::vector<Backend::StreamPtr> stream,
|
||||
std::vector<StreamPool::Ptr> stream,
|
||||
const ExecutionProfile& profile,
|
||||
GlobalDataHandle data);
|
||||
|
||||
|
@ -248,7 +248,7 @@ cc_library(
|
||||
deps = [
|
||||
"//tensorflow/compiler/xla/service:hlo",
|
||||
"//tensorflow/compiler/xla/service:hlo_execution_profile",
|
||||
"//tensorflow/compiler/xla/service:pool",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/core:lib",
|
||||
"//tensorflow/core:ptr_util",
|
||||
"//tensorflow/core:stream_executor_no_cuda",
|
||||
@ -321,6 +321,7 @@ cc_library(
|
||||
"//tensorflow/compiler/xla/service:hlo_execution_profile",
|
||||
"//tensorflow/compiler/xla/service:logical_buffer",
|
||||
"//tensorflow/compiler/xla/service:shaped_buffer",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/compiler/xla/service:transfer_manager",
|
||||
"//tensorflow/compiler/xla/service:tuple_points_to_analysis",
|
||||
"//tensorflow/core:lib",
|
||||
|
@ -84,7 +84,7 @@ Status GpuExecutable::ExecuteThunks(
|
||||
}
|
||||
|
||||
// Stream 0 indicates `main_stream` and substreams start from stream 1.
|
||||
std::vector<Pool<se::Stream>::SmartPtr> sub_streams;
|
||||
std::vector<StreamPool::Ptr> sub_streams;
|
||||
sub_streams.reserve(thunk_schedule_->StreamCount() - 1);
|
||||
while (sub_streams.size() + 1 < thunk_schedule_->StreamCount()) {
|
||||
sub_streams.emplace_back();
|
||||
|
@ -23,7 +23,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/hlo_computation.h"
|
||||
#include "tensorflow/compiler/xla/service/hlo_execution_profile.h"
|
||||
#include "tensorflow/compiler/xla/service/hlo_instruction.h"
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/core/platform/logging.h"
|
||||
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
|
||||
#include "tensorflow/core/util/ptr_util.h"
|
||||
@ -37,10 +37,9 @@ void InitAndStartTimer(std::stack<std::unique_ptr<se::Timer>>* timers,
|
||||
stream->InitTimer(timers->top().get()).ThenStartTimer(timers->top().get());
|
||||
}
|
||||
|
||||
uint64 GetCyclesTaken(
|
||||
std::stack<std::unique_ptr<se::Timer>>* timers,
|
||||
const std::vector<Pool<se::Stream>::SmartPtr>& sub_streams,
|
||||
se::Stream* stream, double clock_rate_ghz) {
|
||||
uint64 GetCyclesTaken(std::stack<std::unique_ptr<se::Timer>>* timers,
|
||||
const std::vector<StreamPool::Ptr>& sub_streams,
|
||||
se::Stream* stream, double clock_rate_ghz) {
|
||||
CHECK_GT(timers->size(), 0);
|
||||
stream->ThenWaitFor(&sub_streams);
|
||||
stream->ThenStopTimer(timers->top().get());
|
||||
@ -53,7 +52,7 @@ uint64 GetCyclesTaken(
|
||||
|
||||
HloExecutionProfiler::HloExecutionProfiler(
|
||||
bool do_profile, HloExecutionProfile* profile, se::Stream* stream,
|
||||
const std::vector<Pool<se::Stream>::SmartPtr>& sub_streams,
|
||||
const std::vector<StreamPool::Ptr>& sub_streams,
|
||||
const HloComputation* computation)
|
||||
: do_profile_(do_profile),
|
||||
profile_(profile),
|
||||
|
@ -24,7 +24,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/hlo_computation.h"
|
||||
#include "tensorflow/compiler/xla/service/hlo_execution_profile.h"
|
||||
#include "tensorflow/compiler/xla/service/hlo_instruction.h"
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
|
||||
|
||||
namespace xla {
|
||||
@ -38,10 +38,10 @@ class ScopedInstructionProfiler;
|
||||
class HloExecutionProfiler {
|
||||
public:
|
||||
// If profiling is enabled, start an execution timer running.
|
||||
explicit HloExecutionProfiler(
|
||||
bool do_profile, HloExecutionProfile* profile, se::Stream* stream,
|
||||
const std::vector<Pool<se::Stream>::SmartPtr>& sub_streams,
|
||||
const HloComputation* computation);
|
||||
explicit HloExecutionProfiler(bool do_profile, HloExecutionProfile* profile,
|
||||
se::Stream* stream,
|
||||
const std::vector<StreamPool::Ptr>& sub_streams,
|
||||
const HloComputation* computation);
|
||||
|
||||
// If profiling is enabled, sets the total cycle count on the profile from the
|
||||
// execution timer.
|
||||
@ -72,7 +72,7 @@ class HloExecutionProfiler {
|
||||
double clock_rate_ghz_;
|
||||
HloExecutionProfile* profile_;
|
||||
se::Stream* stream_;
|
||||
const std::vector<Pool<se::Stream>::SmartPtr>& sub_streams_;
|
||||
const std::vector<StreamPool::Ptr>& sub_streams_;
|
||||
const HloComputation* computation_;
|
||||
std::stack<std::unique_ptr<se::Timer>> timers_;
|
||||
// Contains the HLO instructions for which we are currently measuring the
|
||||
|
@ -1,84 +0,0 @@
|
||||
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
==============================================================================*/
|
||||
|
||||
#ifndef TENSORFLOW_COMPILER_XLA_POOL_H_
|
||||
#define TENSORFLOW_COMPILER_XLA_POOL_H_
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
||||
#include "tensorflow/compiler/xla/ptr_util.h"
|
||||
#include "tensorflow/core/platform/mutex.h"
|
||||
|
||||
namespace xla {
|
||||
|
||||
// Pool of values, which are created as needed and destroyed when the `Pool` is
|
||||
// destroyed
|
||||
template <typename T>
|
||||
class Pool {
|
||||
public:
|
||||
struct Deleter {
|
||||
void operator()(T* ptr) { pool->Deallocate(ptr); }
|
||||
|
||||
Pool<T>* pool;
|
||||
};
|
||||
|
||||
// A pointer to a taken element of a `Pool` which returns it to the pool on
|
||||
// destruction
|
||||
using SmartPtr = std::unique_ptr<T, Deleter>;
|
||||
|
||||
// Constructs a `Pool` with given factory function, which need not be
|
||||
// thread-safe.
|
||||
explicit Pool(std::function<std::unique_ptr<T>()> factory)
|
||||
: factory_(factory) {}
|
||||
|
||||
explicit Pool() : Pool([]() { return MakeUnique<T>(); }) {}
|
||||
|
||||
// Returns a pointer to a value in the pool, creating a new value if none is
|
||||
// free. The returned smart pointer returns the element to the pool on
|
||||
// destruction.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
SmartPtr Allocate() {
|
||||
tensorflow::mutex_lock lock(mu_);
|
||||
T* ptr;
|
||||
if (!xs_.empty()) {
|
||||
ptr = std::move(xs_.back()).release();
|
||||
xs_.pop_back();
|
||||
} else {
|
||||
ptr = factory_().release();
|
||||
}
|
||||
Deleter del = {this};
|
||||
return std::unique_ptr<T, Deleter>(ptr, del);
|
||||
}
|
||||
|
||||
private:
|
||||
// Puts a pointer to a value back into the pool, leaving it free for future
|
||||
// use.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
void Deallocate(T* ptr) {
|
||||
tensorflow::mutex_lock lock(mu_);
|
||||
xs_.push_back(std::unique_ptr<T>(ptr));
|
||||
}
|
||||
|
||||
const std::function<std::unique_ptr<T>()> factory_ GUARDED_BY(mu_);
|
||||
std::vector<std::unique_ptr<T>> xs_ GUARDED_BY(mu_);
|
||||
tensorflow::mutex mu_;
|
||||
};
|
||||
|
||||
} // namespace xla
|
||||
|
||||
#endif // TENSORFLOW_COMPILER_XLA_POOL_H_
|
@ -1,40 +0,0 @@
|
||||
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
==============================================================================*/
|
||||
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
|
||||
#include "tensorflow/compiler/xla/test_helpers.h"
|
||||
|
||||
namespace xla {
|
||||
namespace {
|
||||
|
||||
using PoolTest = ::testing::Test;
|
||||
|
||||
TEST_F(PoolTest, Test) {
|
||||
Pool<int> pool;
|
||||
|
||||
{
|
||||
auto ptr = pool.Allocate();
|
||||
EXPECT_NE(nullptr, ptr.get());
|
||||
*ptr = 5;
|
||||
}
|
||||
|
||||
auto ptr = pool.Allocate();
|
||||
EXPECT_NE(nullptr, ptr.get());
|
||||
EXPECT_EQ(5, *ptr);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace xla
|
@ -37,6 +37,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/hlo_proto_util.h"
|
||||
#include "tensorflow/compiler/xla/service/platform_util.h"
|
||||
#include "tensorflow/compiler/xla/service/source_map_util.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/service/transfer_manager.h"
|
||||
#include "tensorflow/compiler/xla/shape_layout.h"
|
||||
#include "tensorflow/compiler/xla/shape_util.h"
|
||||
@ -376,7 +377,7 @@ Service::ExecuteParallelAndRegisterResult(
|
||||
ExecutionProfile* profile) {
|
||||
// Streams where the computation are launched, so we can wait on the streams
|
||||
// to complete.
|
||||
std::vector<Pool<se::Stream>::SmartPtr> streams;
|
||||
std::vector<StreamPool::Ptr> streams;
|
||||
std::vector<std::unique_ptr<se::Timer>> timers;
|
||||
|
||||
// Global data handles for the computation results, one for each computation.
|
||||
@ -403,7 +404,7 @@ Service::ExecuteParallelAndRegisterResult(
|
||||
CHECK_EQ(replicas.size(), arguments[i].size());
|
||||
std::vector<ScopedShapedBuffer> result_buffers;
|
||||
for (int64 replica = 0; replica < replicas.size(); ++replica) {
|
||||
TF_ASSIGN_OR_RETURN(Pool<se::Stream>::SmartPtr stream,
|
||||
TF_ASSIGN_OR_RETURN(StreamPool::Ptr stream,
|
||||
backend->BorrowStream(replicas[replica]));
|
||||
streams.push_back(std::move(stream));
|
||||
|
||||
@ -515,13 +516,13 @@ StatusOr<GlobalDataHandle> Service::ExecuteAndRegisterResult(
|
||||
arguments,
|
||||
Backend* backend, const string& result_tag, ExecutionProfile* profile) {
|
||||
// Set up streams.
|
||||
std::vector<Pool<se::Stream>::SmartPtr> streams;
|
||||
std::vector<StreamPool::Ptr> streams;
|
||||
|
||||
TF_ASSIGN_OR_RETURN(auto replicas,
|
||||
Replicas(*backend, SingleComputationDeviceHandle()));
|
||||
TF_RET_CHECK(!replicas.empty());
|
||||
for (se::StreamExecutor* executor : replicas) {
|
||||
TF_ASSIGN_OR_RETURN(Pool<se::Stream>::SmartPtr stream,
|
||||
TF_ASSIGN_OR_RETURN(StreamPool::Ptr stream,
|
||||
backend->BorrowStream(executor));
|
||||
streams.push_back(std::move(stream));
|
||||
}
|
||||
@ -533,7 +534,7 @@ StatusOr<GlobalDataHandle> Service::ExecuteAndRegisterResult(
|
||||
|
||||
// Set up run options.
|
||||
std::vector<ServiceExecutableRunOptions> run_options;
|
||||
for (const Pool<se::Stream>::SmartPtr& stream : streams) {
|
||||
for (const StreamPool::Ptr& stream : streams) {
|
||||
ExecutableRunOptions options;
|
||||
options.set_stream(stream.get());
|
||||
options.set_device_ordinal(stream->parent()->device_ordinal());
|
||||
|
@ -17,7 +17,7 @@ limitations under the License.
|
||||
#define TENSORFLOW_COMPILER_XLA_SERVICE_SERVICE_EXECUTABLE_RUN_OPTIONS_H_
|
||||
|
||||
#include "tensorflow/compiler/xla/executable_run_options.h"
|
||||
#include "tensorflow/compiler/xla/service/pool.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/statusor.h"
|
||||
#include "tensorflow/stream_executor/stream_executor.h"
|
||||
|
||||
@ -27,8 +27,7 @@ namespace xla {
|
||||
// data, now only a stream cache for GPU backend.
|
||||
class ServiceExecutableRunOptions {
|
||||
public:
|
||||
using StreamBorrower =
|
||||
std::function<StatusOr<Pool<se::Stream>::SmartPtr>(int)>;
|
||||
using StreamBorrower = std::function<StatusOr<StreamPool::Ptr>(int)>;
|
||||
|
||||
ServiceExecutableRunOptions()
|
||||
: ServiceExecutableRunOptions(ExecutableRunOptions()) {}
|
||||
@ -51,7 +50,7 @@ class ServiceExecutableRunOptions {
|
||||
|
||||
// Borrows a stream and returns a smart pointer which returns the stream on
|
||||
// destruction.
|
||||
StatusOr<Pool<se::Stream>::SmartPtr> BorrowStream(int device_ordinal) const {
|
||||
StatusOr<StreamPool::Ptr> BorrowStream(int device_ordinal) const {
|
||||
return borrow_stream_
|
||||
? borrow_stream_(device_ordinal)
|
||||
: Status(tensorflow::error::UNIMPLEMENTED, "No stream cache");
|
||||
|
56
tensorflow/compiler/xla/service/stream_pool.cc
Normal file
56
tensorflow/compiler/xla/service/stream_pool.cc
Normal file
@ -0,0 +1,56 @@
|
||||
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
==============================================================================*/
|
||||
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
|
||||
#include "tensorflow/compiler/xla/ptr_util.h"
|
||||
|
||||
namespace xla {
|
||||
|
||||
StreamPool::Ptr StreamPool::BorrowStream(se::StreamExecutor* executor) {
|
||||
std::unique_ptr<se::Stream> stream;
|
||||
{
|
||||
tensorflow::mutex_lock lock(mu_);
|
||||
if (!streams_.empty()) {
|
||||
// Re-use an existing stream from the pool.
|
||||
stream = std::move(streams_.back());
|
||||
streams_.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
if (!stream) {
|
||||
// Create a new stream.
|
||||
stream = MakeUnique<se::Stream>(executor);
|
||||
stream->Init();
|
||||
}
|
||||
|
||||
// Return the stream wrapped in Ptr, which has our special deleter semantics.
|
||||
PtrDeleter deleter = {this};
|
||||
return Ptr(stream.release(), deleter);
|
||||
}
|
||||
|
||||
void StreamPool::ReturnStream(se::Stream* stream) {
|
||||
if (stream->ok()) {
|
||||
tensorflow::mutex_lock lock(mu_);
|
||||
streams_.emplace_back(stream);
|
||||
} else {
|
||||
// If the stream has encountered any errors, all subsequent
|
||||
// operations on it will fail. So just delete the stream, and rely
|
||||
// on new streams to be created in the future.
|
||||
delete stream;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace xla
|
64
tensorflow/compiler/xla/service/stream_pool.h
Normal file
64
tensorflow/compiler/xla/service/stream_pool.h
Normal file
@ -0,0 +1,64 @@
|
||||
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
==============================================================================*/
|
||||
|
||||
#ifndef TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
|
||||
#define TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
|
||||
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "tensorflow/compiler/xla/types.h"
|
||||
#include "tensorflow/core/platform/mutex.h"
|
||||
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
|
||||
|
||||
namespace xla {
|
||||
|
||||
// Pool of stream_executor::Streams, which are created as needed and
|
||||
// destroyed when the pool is destroyed.
|
||||
class StreamPool {
|
||||
public:
|
||||
struct PtrDeleter {
|
||||
void operator()(se::Stream* stream) { pool->ReturnStream(stream); }
|
||||
StreamPool* pool;
|
||||
};
|
||||
|
||||
// Stream pointer type returned by BorrowStream, which returns the
|
||||
// stream to the pool on destruction.
|
||||
using Ptr = std::unique_ptr<se::Stream, PtrDeleter>;
|
||||
|
||||
StreamPool() {}
|
||||
|
||||
// Returns a pointer to a stream in the pool, creating a new stream
|
||||
// if none are available in the pool. The returned smart pointer
|
||||
// returns the stream to the pool on destruction.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
Ptr BorrowStream(se::StreamExecutor* executor);
|
||||
|
||||
private:
|
||||
// Puts a pointer to a stream back into the pool, leaving it free
|
||||
// for future use. Streams that have previously encountered errors
|
||||
// are deleted, and not returned to the pool.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
void ReturnStream(se::Stream* stream);
|
||||
|
||||
tensorflow::mutex mu_;
|
||||
std::vector<std::unique_ptr<se::Stream>> streams_ GUARDED_BY(mu_);
|
||||
};
|
||||
|
||||
} // namespace xla
|
||||
|
||||
#endif // TENSORFLOW_COMPILER_XLA_SERVICE_STREAM_POOL_H_
|
136
tensorflow/compiler/xla/service/stream_pool_test.cc
Normal file
136
tensorflow/compiler/xla/service/stream_pool_test.cc
Normal file
@ -0,0 +1,136 @@
|
||||
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
==============================================================================*/
|
||||
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "tensorflow/compiler/xla/test_helpers.h"
|
||||
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
|
||||
|
||||
namespace xla {
|
||||
namespace {
|
||||
|
||||
class StreamPoolTest : public ::testing::Test {
|
||||
protected:
|
||||
std::unique_ptr<se::StreamExecutor> NewStreamExecutor() {
|
||||
se::Platform* platform =
|
||||
se::MultiPlatformManager::PlatformWithName("Host").ConsumeValueOrDie();
|
||||
se::StreamExecutorConfig config(/*ordinal=*/0);
|
||||
return platform->GetUncachedExecutor(config).ConsumeValueOrDie();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(StreamPoolTest, EmptyPool) { StreamPool pool; }
|
||||
|
||||
TEST_F(StreamPoolTest, OneStreamPool) {
|
||||
std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
|
||||
StreamPool pool;
|
||||
|
||||
// Borrow and return a stream.
|
||||
StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream1_ptr = stream1.get();
|
||||
EXPECT_TRUE(stream1->ok());
|
||||
stream1 = nullptr;
|
||||
|
||||
// Borrow and return another stream.
|
||||
StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream2_ptr = stream2.get();
|
||||
EXPECT_TRUE(stream2->ok());
|
||||
stream2 = nullptr;
|
||||
|
||||
// The underlying streams should be the same, since stream1 was the
|
||||
// only stream available in the pool when stream2 was borrowed.
|
||||
EXPECT_EQ(stream1_ptr, stream2_ptr);
|
||||
}
|
||||
|
||||
TEST_F(StreamPoolTest, TwoStreamPool) {
|
||||
std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
|
||||
StreamPool pool;
|
||||
|
||||
// Borrow two streams.
|
||||
StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream1_ptr = stream1.get();
|
||||
EXPECT_TRUE(stream1->ok());
|
||||
StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream2_ptr = stream2.get();
|
||||
EXPECT_TRUE(stream2->ok());
|
||||
|
||||
// The underlying streams should be different, since we haven't
|
||||
// returned either of them yet.
|
||||
EXPECT_NE(stream1_ptr, stream2_ptr);
|
||||
|
||||
// Return stream1 and borrow stream3.
|
||||
stream1 = nullptr;
|
||||
StreamPool::Ptr stream3 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream3_ptr = stream3.get();
|
||||
EXPECT_TRUE(stream3->ok());
|
||||
|
||||
// stream1 and stream3 should be the same.
|
||||
EXPECT_EQ(stream1_ptr, stream3_ptr);
|
||||
EXPECT_NE(stream2_ptr, stream3_ptr);
|
||||
|
||||
// Return stream2, and borrow stream4.
|
||||
stream2 = nullptr;
|
||||
StreamPool::Ptr stream4 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream4_ptr = stream4.get();
|
||||
EXPECT_TRUE(stream4->ok());
|
||||
|
||||
// Stream2 and stream4 should be the same.
|
||||
EXPECT_EQ(stream2_ptr, stream4_ptr);
|
||||
EXPECT_NE(stream3_ptr, stream4_ptr);
|
||||
}
|
||||
|
||||
TEST_F(StreamPoolTest, BadStreamDiscarded) {
|
||||
std::unique_ptr<se::StreamExecutor> executor = NewStreamExecutor();
|
||||
StreamPool pool;
|
||||
|
||||
// Borrow a stream.
|
||||
StreamPool::Ptr stream1 = pool.BorrowStream(executor.get());
|
||||
EXPECT_TRUE(stream1->ok());
|
||||
|
||||
// Force an error on the stream; here we call a method that requires
|
||||
// DNN support, which we know the Host platform doesn't support.
|
||||
stream1->ThenDepthConcatenate({}, {}, nullptr);
|
||||
EXPECT_FALSE(stream1->ok());
|
||||
|
||||
// Return stream1 and borrow stream2.
|
||||
stream1 = nullptr;
|
||||
StreamPool::Ptr stream2 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream2_ptr = stream2.get();
|
||||
EXPECT_TRUE(stream2->ok());
|
||||
|
||||
// The underlying streams should be different. They would have been
|
||||
// the same, but since we forced an error on stream1, it cannot be
|
||||
// put back into the pool. Sadly we can't just check:
|
||||
// EXPECT_NE(stream1_ptr, stream2_ptr);
|
||||
//
|
||||
// The above should hold logically, but it may fail if the new
|
||||
// stream instance allocated for stream2 happens to reside in the
|
||||
// same memory address as stream1, which has been deleted.
|
||||
//
|
||||
// The check that stream2->ok() serves as a good-enough check.
|
||||
|
||||
// Return stream2 and borrow stream3. The previous error on stream1
|
||||
// has no effect on these streams, and they are the same.
|
||||
stream2 = nullptr;
|
||||
StreamPool::Ptr stream3 = pool.BorrowStream(executor.get());
|
||||
se::Stream* stream3_ptr = stream3.get();
|
||||
EXPECT_TRUE(stream3->ok());
|
||||
EXPECT_EQ(stream2_ptr, stream3_ptr);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace xla
|
@ -379,6 +379,7 @@ xla_test(
|
||||
"//tensorflow/compiler/xla/client:xla_computation",
|
||||
"//tensorflow/compiler/xla/client/xla_client:xla_builder",
|
||||
"//tensorflow/compiler/xla/service:platform_util",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/compiler/xla/tests:client_library_test_base",
|
||||
"//tensorflow/compiler/xla/tests:test_utils",
|
||||
"//tensorflow/core:lib",
|
||||
@ -2013,6 +2014,7 @@ xla_test(
|
||||
"//tensorflow/compiler/xla/service:device_memory_allocator",
|
||||
"//tensorflow/compiler/xla/service:generic_transfer_manager",
|
||||
"//tensorflow/compiler/xla/service:shaped_buffer",
|
||||
"//tensorflow/compiler/xla/service:stream_pool",
|
||||
"//tensorflow/core:lib",
|
||||
"//tensorflow/core:stream_executor_no_cuda",
|
||||
"//tensorflow/core:test",
|
||||
|
@ -22,6 +22,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/service/device_memory_allocator.h"
|
||||
#include "tensorflow/compiler/xla/service/generic_transfer_manager.h"
|
||||
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/shape_util.h"
|
||||
#include "tensorflow/compiler/xla/statusor.h"
|
||||
#include "tensorflow/compiler/xla/tests/literal_test_util.h"
|
||||
@ -60,7 +61,7 @@ class TransferManagerTest : public LocalClientTestBase {
|
||||
}
|
||||
|
||||
protected:
|
||||
Backend::StreamPtr stream_ptr_;
|
||||
StreamPool::Ptr stream_ptr_;
|
||||
se::Stream* stream_;
|
||||
|
||||
private:
|
||||
|
@ -22,6 +22,7 @@ limitations under the License.
|
||||
#include "tensorflow/compiler/xla/client/xla_computation.h"
|
||||
#include "tensorflow/compiler/xla/map_util.h"
|
||||
#include "tensorflow/compiler/xla/service/platform_util.h"
|
||||
#include "tensorflow/compiler/xla/service/stream_pool.h"
|
||||
#include "tensorflow/compiler/xla/shape_util.h"
|
||||
#include "tensorflow/compiler/xla/tests/client_library_test_base.h"
|
||||
#include "tensorflow/compiler/xla/tests/test_macros.h"
|
||||
@ -133,7 +134,7 @@ void ExecuteAndFetchProfile(string* profile_output, LocalClient* client,
|
||||
DeviceMemoryAllocator* allocator = backend->memory_allocator();
|
||||
auto* transfer_manager = backend->transfer_manager();
|
||||
TF_ASSERT_OK_AND_ASSIGN(
|
||||
Backend::StreamPtr stream_ptr,
|
||||
StreamPool::Ptr stream_ptr,
|
||||
backend->BorrowStream(backend->default_device_ordinal()));
|
||||
|
||||
TF_ASSERT_OK_AND_ASSIGN(
|
||||
|
Loading…
Reference in New Issue
Block a user