/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/hlo_runner.h"

#include <string>
#include <utility>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/layout_util.h"
#include "tensorflow/compiler/xla/service/executable.h"
#include "tensorflow/compiler/xla/service/hlo_module_group.h"
#include "tensorflow/compiler/xla/service/hlo_parser.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/shape.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace xla {

HloRunner::HloRunner(se::Platform* platform, int intra_op_parallelism_threads) {
BackendOptions backend_options;
backend_options.set_platform(platform);
backend_options.set_intra_op_parallelism_threads(
intra_op_parallelism_threads);
backend_ = Backend::CreateBackend(backend_options).ConsumeValueOrDie();
VLOG(1) << "Created HloRunner for platform: " << platform->Name();
}
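
// Example usage (a minimal sketch, not part of this file): constructing a
// runner for the default platform. PlatformUtil::GetDefaultPlatform() is
// declared in platform_util.h and is assumed to be visible to the caller;
// -1 asks the backend for its default intra-op thread count.
//
//   se::Platform* platform =
//       PlatformUtil::GetDefaultPlatform().ValueOrDie();
//   HloRunner runner(platform, /*intra_op_parallelism_threads=*/-1);
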
HloRunner::~HloRunner() {}

StatusOr<ScopedShapedBuffer> HloRunner::TransferLiteralToDevice(
const Literal& literal) {
TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
backend().transfer_manager()->AllocateScopedShapedBuffer(
literal.shape(), backend().memory_allocator(),
backend().default_device_ordinal()));
TF_ASSIGN_OR_RETURN(
auto stream, backend().BorrowStream(backend().default_stream_executor()));
TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
stream.get(), literal, buffer));
return std::move(buffer);
}

StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
absl::Span<const Literal* const> literals) {
std::vector<ScopedShapedBuffer> buffers;
for (const Literal* literal : literals) {
CHECK(literal != nullptr);
TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
TransferLiteralToDevice(*literal));
buffers.push_back(std::move(buffer));
}
return std::move(buffers);
}

StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
absl::Span<const Literal> literals) {
std::vector<const Literal*> literal_pointers;
literal_pointers.reserve(literals.size());
for (const auto& literal : literals) {
literal_pointers.push_back(&literal);
}
return TransferLiteralsToDevice(literal_pointers);
}

StatusOr<Literal> HloRunner::TransferLiteralFromDevice(
const ShapedBuffer& buffer) {
TF_ASSIGN_OR_RETURN(
auto stream, backend().BorrowStream(backend().default_stream_executor()));
return backend().transfer_manager()->TransferLiteralFromDevice(stream.get(),
buffer);
}
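
// Example (sketch): round-tripping a literal through device memory using the
// two transfer helpers above. The value is illustrative; `runner` is assumed
// to be an HloRunner instance and LiteralUtil comes from literal_util.h.
//
//   Literal literal = LiteralUtil::CreateR1<float>({1.0f, 2.0f, 3.0f});
//   TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
//                       runner.TransferLiteralToDevice(literal));
//   TF_ASSIGN_OR_RETURN(Literal round_trip,
//                       runner.TransferLiteralFromDevice(buffer));
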
StatusOr<Literal> HloRunner::Execute(std::unique_ptr<HloModule> module,
absl::Span<const Literal* const> arguments,
bool run_hlo_passes,
ExecutionProfile* profile) {
TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
TransferLiteralsToDevice(arguments));
TF_ASSIGN_OR_RETURN(ExecutionOutput result,
ExecuteWithDeviceBuffers(
/*module=*/std::move(module),
/*arguments=*/argument_buffers,
/*run_hlo_passes=*/run_hlo_passes,
/*profile=*/profile));
return TransferLiteralFromDevice(result.Result());
}
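
// Example (sketch): parsing and executing a module end to end. The HLO text
// and argument are illustrative; ParseAndReturnUnverifiedModule is declared in
// hlo_parser.h (included above) and LiteralUtil in literal_util.h.
//
//   constexpr char kHloText[] = R"(
//     HloModule add
//     ENTRY add {
//       p = f32[4] parameter(0)
//       ROOT r = f32[4] add(p, p)
//     })";
//   TF_ASSIGN_OR_RETURN(std::unique_ptr<HloModule> module,
//                       ParseAndReturnUnverifiedModule(kHloText));
//   Literal arg = LiteralUtil::CreateR1<float>({1, 2, 3, 4});
//   TF_ASSIGN_OR_RETURN(Literal result,
//                       runner.Execute(std::move(module), {&arg},
//                                      /*run_hlo_passes=*/true));
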
StatusOr<Literal> HloRunner::ExecuteWithExecutable(
std::unique_ptr<Executable> executable,
absl::Span<const Literal* const> arguments, ExecutionProfile* profile) {
TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
TransferLiteralsToDevice(arguments));
TF_ASSIGN_OR_RETURN(ExecutionOutput result,
ExecuteWithDeviceBuffers(
/*executable=*/executable.get(),
/*arguments=*/argument_buffers,
/*profile=*/profile));
return TransferLiteralFromDevice(result.Result());
}

// Converts the caller-owned input buffers into a vector of ExecutionInputs.
// Sub-buffers that the alias config marks as donated become owning
// `MaybeOwningDeviceMemory` entries; all others are stored as unowned views.
static std::vector<ExecutionInput> ExecutionInputsFromScopedShapedBuffers(
    absl::Span<ScopedShapedBuffer const> inputs,
    HloInputOutputAliasConfig alias_config, int device_ordinal,
    se::DeviceMemoryAllocator* allocator) {
  std::vector<ExecutionInput> execution_inputs;
for (int param_num = 0; param_num < inputs.size(); param_num++) {
const ScopedShapedBuffer& input_buffer = inputs[param_num];
ShapeTree<MaybeOwningDeviceMemory> buffer_tree(
input_buffer.on_device_shape());
input_buffer.buffers().ForEachElement(
[&](const ShapeIndex& index,
const se::DeviceMemoryBase& execution_input_buffer) {
if (alias_config.ParameterHasAlias(param_num, index)) {
// Store owned.
*buffer_tree.mutable_element(index) = se::OwningDeviceMemory{
execution_input_buffer, device_ordinal, allocator};
} else {
// Store unowned.
*buffer_tree.mutable_element(index) = execution_input_buffer;
}
});
execution_inputs.emplace_back(std::move(buffer_tree));
}
return execution_inputs;
}
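
// Note (informal): because donated sub-buffers are passed as owning memory,
// the executable is free to reuse them for its outputs, so callers should not
// rely on the contents of an aliased input buffer after execution. Non-donated
// sub-buffers remain owned by the caller's ScopedShapedBuffers.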
StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
std::unique_ptr<HloModule> module,
absl::Span<ScopedShapedBuffer const> arguments, bool run_hlo_passes,
ExecutionProfile* profile) {
TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
CreateExecutable(std::move(module), run_hlo_passes));
return ExecuteWithDeviceBuffers(executable.get(), arguments, profile);
}

StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
Executable* executable, absl::Span<ScopedShapedBuffer const> arguments,
ExecutionProfile* profile) {
// Get service run options.
se::Stream stream(backend().default_stream_executor());
stream.Init();
ServiceExecutableRunOptions service_run_options =
GetServiceRunOptionsForDevice(backend().default_device_ordinal(), &stream,
nullptr, RunId());
service_run_options.mutable_run_options()->set_execution_profile(profile);
std::vector<ExecutionInput> execution_arguments =
ExecutionInputsFromScopedShapedBuffers(
arguments, executable->module().input_output_alias_config(),
stream.parent()->device_ordinal(), stream.parent()->GetAllocator());
TF_ASSIGN_OR_RETURN(
ExecutionOutput retval,
executable->ExecuteOnStreamWrapper(&service_run_options,
std::move(execution_arguments)));
TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
return std::move(retval);
}
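
// Example (sketch): compiling once and reusing the executable to avoid
// recompilation on every run. `module`, `argument_buffers`, and kNumRuns are
// assumed to come from the caller and are illustrative.
//
//   TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
//                       runner.CreateExecutable(std::move(module),
//                                               /*run_hlo_passes=*/true));
//   for (int i = 0; i < kNumRuns; ++i) {
//     TF_ASSIGN_OR_RETURN(
//         ExecutionOutput output,
//         runner.ExecuteWithDeviceBuffers(executable.get(), argument_buffers,
//                                         /*profile=*/nullptr));
//   }
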
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
std::unique_ptr<HloModule> module, const ReplicatedExecuteOptions& options,
DeviceAssignment* device_assignment) {
TF_ASSIGN_OR_RETURN(
std::unique_ptr<Executable> executable,
CreateExecutable(std::move(module), options.run_hlo_passes));
return ExecuteReplicated(executable.get(), options, device_assignment);
}

StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicatedImpl(
std::function<StatusOr<std::vector<ScopedShapedBuffer>>(
const std::vector<ServiceExecutableRunOptions>&,
const std::vector<absl::Span<const ShapedBuffer* const>>&)>
execution_helper,
std::function<int64(int64)> argument_count_provider,
std::function<const Literal*(int64, int64)> argument_provider,
const ReplicatedExecuteOptions& options,
DeviceAssignment* device_assignment) {
std::vector<std::unique_ptr<se::Stream>> streams;
std::vector<ServiceExecutableRunOptions> service_run_options;
std::vector<ScopedShapedBuffer> argument_buffers;
// This reserve() call is necessary for correctness, because
// argument_buffer_ptrs contains pointers into the elements of
// argument_buffers.
const int64 total_argument_count = [&]() {
int64 total = 0;
for (int64 i = 0; i < options.num_replicas; ++i) {
total += argument_count_provider(i);
}
return total;
}();
argument_buffers.reserve(total_argument_count);
// Plus one so we can safely get &argument_buffer_ptrs[0] in case there are
// no arguments.
std::vector<const ShapedBuffer*> argument_buffer_ptrs(total_argument_count +
1);
std::vector<absl::Span<const ShapedBuffer* const>> argument_buffer_slices;
int64 index = 0;
RunId run_id;
for (int64 i = 0; i < options.num_replicas; ++i) {
int64 device = (*device_assignment)(i, 0);
TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor,
backend().stream_executor(device));
streams.push_back(absl::make_unique<se::Stream>(executor));
streams.back()->Init();
service_run_options.emplace_back(GetServiceRunOptionsForDevice(
device, streams.back().get(), device_assignment, run_id));
// Copy arguments to device.
const int64 argument_count = argument_count_provider(i);
for (int64 arg_index = 0; arg_index < argument_count; arg_index++) {
const Literal* const argument = argument_provider(i, arg_index);
TF_RET_CHECK(argument != nullptr);
TF_ASSIGN_OR_RETURN(
ScopedShapedBuffer argument_buffer,
backend().transfer_manager()->AllocateScopedShapedBuffer(
argument->shape(), backend().memory_allocator(), device));
TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
streams.back().get(), *argument, argument_buffer));
argument_buffers.push_back(std::move(argument_buffer));
argument_buffer_ptrs[index++] = &argument_buffers.back();
}
argument_buffer_slices.emplace_back(
&argument_buffer_ptrs[index - argument_count], argument_count);
}
std::unique_ptr<tensorflow::thread::ThreadPool> pool;
int64 num_threads = (options.infeed != nullptr) ? options.num_replicas : 0;
if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
num_threads += options.num_replicas;
}
if (num_threads > 0) {
pool = absl::make_unique<tensorflow::thread::ThreadPool>(
tensorflow::Env::Default(), "infeed_outfeed",
/*num_threads=*/num_threads);
}
if (options.infeed != nullptr) {
for (int64 i = 0; i < options.num_replicas; ++i) {
int64 device = (*device_assignment)(i, 0);
pool->Schedule([this, device, &options]() {
se::StreamExecutor* executor =
backend().stream_executor(device).ValueOrDie();
VLOG(1) << "Starting infeed on device " << device;
for (int64 step = 1;
options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
TF_CHECK_OK(backend().transfer_manager()->TransferLiteralToInfeed(
executor, *options.infeed));
if (step % 100 == 0) {
VLOG(1) << "Infeed step " << step;
}
}
});
}
}
if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
for (int64 i = 0; i < options.num_replicas; ++i) {
int64 device = (*device_assignment)(i, 0);
pool->Schedule([this, device, &options]() {
se::StreamExecutor* executor =
backend().stream_executor(device).ValueOrDie();
VLOG(1) << "Starting outfeed on device " << device;
for (int64 step = 1;
options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
Literal literal;
TF_CHECK_OK(backend().transfer_manager()->TransferLiteralFromOutfeed(
executor, options.outfeed_shape, &literal));
if (options.outfeed_values != nullptr) {
options.outfeed_values->push_back(std::move(literal));
}
if (step % 100 == 0) {
VLOG(1) << "Outfeed step " << step;
}
}
});
}
}
LOG(INFO) << "Replicated execution started";
TF_ASSIGN_OR_RETURN(
std::vector<ScopedShapedBuffer> results,
execution_helper(service_run_options, argument_buffer_slices));
LOG(INFO) << "Replicated execution terminated";
std::vector<Literal> exec_results;
for (int64 i = 0; i < options.num_replicas; ++i) {
TF_RETURN_IF_ERROR(streams[i]->BlockHostUntilDone());
TF_ASSIGN_OR_RETURN(Literal literal,
backend().transfer_manager()->TransferLiteralFromDevice(
streams[i].get(), results[i]));
exec_results.push_back(std::move(literal));
}
return std::move(exec_results);
}

StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
Executable* executable, const ReplicatedExecuteOptions& options,
DeviceAssignment* device_assignment, ExecutionProfile* profile) {
return ExecuteReplicatedImpl(
[&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
const std::vector<absl::Span<const ShapedBuffer* const>>&
argument_buffer_slices)
-> StatusOr<std::vector<ScopedShapedBuffer>> {
std::vector<ScopedShapedBuffer> results;
if (!options.use_threads) {
TF_ASSIGN_OR_RETURN(
results, executable->ExecuteOnStreams(service_run_options,
argument_buffer_slices));
} else {
tensorflow::mutex mutex;
std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
options.num_replicas);
{
LOG(INFO) << "Creating thread pool for " << options.num_replicas
<< " replicas";
tensorflow::thread::ThreadPool pool(
tensorflow::Env::Default(), "replicas", options.num_replicas);
for (int64 i = 0; i < options.num_replicas; ++i) {
pool.Schedule([&, i] {
auto result = executable->ExecuteOnStream(
&service_run_options[i], argument_buffer_slices[i],
nullptr);
tensorflow::mutex_lock lock(mutex);
thread_results[i] = std::move(result);
});
}
// Note: the thread pool destructor guarantees it completes all work
// before we leave this scope.
}
for (auto& thread_result : thread_results) {
if (!thread_result.ok()) {
return thread_result.status();
}
results.push_back(std::move(thread_result).ValueOrDie());
}
}
return results;
},
[&](int64 replica) { return options.arguments.size(); },
[&](int64 replica, int64 index) { return options.arguments[index]; },
options, device_assignment);
}
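
// Example (sketch): running the same executable on two replicas. The field
// names follow ReplicatedExecuteOptions as used throughout this file; the
// values, `arg`, and `executable` are illustrative.
//
//   HloRunner::ReplicatedExecuteOptions options;
//   options.num_replicas = 2;
//   options.arguments = {&arg};   // Every replica receives the same inputs.
//   options.use_threads = true;   // One host thread per replica.
//   TF_ASSIGN_OR_RETURN(DeviceAssignment assignment,
//                       runner.backend().computation_placer()->AssignDevices(
//                           options.num_replicas, /*computation_count=*/1));
//   TF_ASSIGN_OR_RETURN(
//       std::vector<Literal> results,
//       runner.ExecuteReplicated(executable.get(), options, &assignment));
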
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
std::function<Executable*(int64)> executable_provider,
std::function<int64(int64)> argument_count_provider,
std::function<const Literal*(int64, int64)> argument_provider,
const ReplicatedExecuteOptions& options) {
TF_ASSIGN_OR_RETURN(
DeviceAssignment device_assignment,
backend().computation_placer()->AssignDevices(options.num_replicas, 1));
return ExecuteReplicatedImpl(
[&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
const std::vector<absl::Span<const ShapedBuffer* const>>&
argument_buffer_slices)
-> StatusOr<std::vector<ScopedShapedBuffer>> {
TF_RET_CHECK(options.use_threads);
std::vector<ScopedShapedBuffer> results;
tensorflow::mutex mutex;
std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
options.num_replicas);
{
LOG(INFO) << "Creating thread pool for " << options.num_replicas
<< " replicas";
tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(),
"replicas", options.num_replicas);
for (int64 i = 0; i < options.num_replicas; ++i) {
for (const auto& arg : argument_buffer_slices[i]) {
TF_RET_CHECK(arg != nullptr);
}
pool.Schedule([&, i] {
auto result = executable_provider(i)->ExecuteOnStream(
&service_run_options[i], argument_buffer_slices[i], nullptr);
tensorflow::mutex_lock lock(mutex);
thread_results[i] = std::move(result);
});
}
// Note: the thread pool destructor guarantees it completes all work
// before we leave this scope.
}
for (auto& thread_result : thread_results) {
if (!thread_result.ok()) {
return thread_result.status();
}
results.push_back(std::move(thread_result).ValueOrDie());
}
return results;
},
argument_count_provider, argument_provider, options, &device_assignment);
}

StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
std::unique_ptr<HloModule> module,
const ReplicatedExecuteOptions& options) {
TF_ASSIGN_OR_RETURN(
DeviceAssignment device_assignment,
backend().computation_placer()->AssignDevices(options.num_replicas, 1));
return ExecuteReplicated(std::move(module), options, &device_assignment);
}

StatusOr<std::unique_ptr<Executable>> HloRunner::CreateExecutable(
std::unique_ptr<HloModule> module, bool run_hlo_passes) {
if (run_hlo_passes) {
auto module_group = absl::make_unique<HloModuleGroup>(std::move(module));
TF_ASSIGN_OR_RETURN(
auto executables,
backend().compiler()->Compile(std::move(module_group),
{{backend().default_stream_executor()}},
backend().memory_allocator()));
return std::move(executables[0]);
}
return backend().compiler()->RunBackend(std::move(module),
backend().default_stream_executor(),
backend().memory_allocator());
}
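
// Note (informal): run_hlo_passes=true routes the module through the full
// Compile() pipeline, so the backend's optimization passes may rewrite the
// HLO before code generation; run_hlo_passes=false calls RunBackend()
// directly, which is the right choice when a test supplies already-optimized
// HLO that must be executed as written.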
ServiceExecutableRunOptions HloRunner::GetServiceRunOptionsForDevice(
int64 device, se::Stream* stream, DeviceAssignment* device_assignment,
RunId run_id) {
ExecutableRunOptions run_options;
run_options.set_device_ordinal(device);
run_options.set_stream(stream);
run_options.set_allocator(backend().memory_allocator());
run_options.set_intra_op_thread_pool(
backend().eigen_intra_op_thread_pool_device());
if (device_assignment != nullptr) {
run_options.set_device_assignment(device_assignment);
}
run_options.set_run_id(run_id);
return ServiceExecutableRunOptions(run_options, backend().StreamBorrower());
}

Backend& HloRunner::backend() {
if (!backend_) {
backend_ = Backend::CreateDefaultBackend().ConsumeValueOrDie();
VLOG(1) << "Executing on platform " << backend().platform()->Name();
}
return *backend_;
}

const Backend& HloRunner::backend() const {
return const_cast<HloRunner*>(this)->backend();
}

}  // namespace xla