/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xrt/xrt_util.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include "tensorflow/compiler/xla/debug_options_flags.h"
|
|
#include "tensorflow/compiler/xla/types.h"
|
|
#include "tensorflow/core/platform/logging.h"
|
|
#include "tensorflow/core/platform/mutex.h"
|
|
|
|
namespace tensorflow {
|
|
namespace {
|
|
|
|
mutex nccl_factory_mutex(LINKER_INITIALIZED);
|
|
std::shared_ptr<NcclUniqueIdFactory>* nccl_factory;
|
|
|
|
// The ScopedHandles data structure is used in the ExecuteChained() API and its
// task is to track tuple allocation registrations. It is used to track both
// the intermediate results of a chained computation and its final results.
// Anything which is marked to be released will be released using the
// XRTMemoryManager once the object is destroyed (unless an explicit call to
// Drop() or Release() is made).
class ScopedHandles {
 public:
  explicit ScopedHandles(RefPtr<XRTMemoryManager> memory_manager)
      : memory_manager_(std::move(memory_manager)) {}

  ~ScopedHandles() {
    for (size_t i = 0; i < handles_.size(); ++i) {
      if (handles_release_[i]) {
        memory_manager_->Release(handles_[i]).IgnoreError();
      }
    }
  }

  int64 operator[](size_t index) const { return handles_.at(index); }

  size_t size() const { return handles_.size(); }

  // Adds the given handle at the index position, marking it releasable
  // according to the release argument. If a to-be-released handle already
  // exists at the same index, it is released first.
  Status Add(size_t index, int64 handle, bool release) {
    if (index >= handles_.size()) {
      handles_.resize(index + 1, XRTMemoryManager::InvalidKey());
      handles_release_.resize(index + 1, false);
    }
    if (handles_release_[index]) {
      Status status = memory_manager_->Release(handles_[index]);
      if (!status.ok()) {
        if (release) {
          memory_manager_->Release(handle).IgnoreError();
        }
        return status;
      }
    }
    handles_[index] = handle;
    handles_release_[index] = release;
    return Status::OK();
  }

  // Adds a to-be-released tuple allocation at the given index.
  Status Add(size_t index, RefPtr<XRTTupleAllocation> tuple) {
    return Add(index, memory_manager_->Register(std::move(tuple)),
               /*release=*/true);
  }

  // Drops the handle at the given index, releasing it via the
  // XRTMemoryManager::Release() API if it is marked as to-be-released.
  Status Drop(size_t index) {
    if (handles_release_.at(index)) {
      TF_RETURN_IF_ERROR(memory_manager_->Release(handles_[index]));
    }
    Release(index);
    return Status::OK();
  }

  // Releases ownership of the handle at the given index. The destructor will
  // not call the XRTMemoryManager::Release() API on such a handle.
  int64 Release(size_t index) {
    int64 handle = handles_.at(index);
    handles_[index] = XRTMemoryManager::InvalidKey();
    handles_release_[index] = false;
    return handle;
  }

  // Looks up the handle stored at the given index, and returns the matching
  // tuple allocation.
  xla::StatusOr<RefPtr<XRTTupleAllocation>> Lookup(size_t index) const {
    return memory_manager_->Lookup(handles_.at(index));
  }

 private:
  RefPtr<XRTMemoryManager> memory_manager_;
  std::vector<int64> handles_;
  std::vector<bool> handles_release_;
};

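// Checks whether the TF_XLA_DEBUG_OPTIONS_PASSTHROUGH environment variable is
// set to "1" or "true", in which case the client-provided XLA debug options
// are passed through unchanged by BuildXlaDebugOptions() below.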
bool DebugOptionsPassThroughEnabled() {
  const char* env = getenv("TF_XLA_DEBUG_OPTIONS_PASSTHROUGH");
  bool enabled =
      env != nullptr && (strcmp(env, "1") == 0 || strcmp(env, "true") == 0);
  if (enabled) {
    LOG(WARNING) << "Passing through XLA debug options!";
  } else {
    LOG(WARNING) << "TF_XLA_DEBUG_OPTIONS_PASSTHROUGH not set, not all options "
                    "will be retained";
  }
  return enabled;
}

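// Returns the given dump path only if it is empty or points to a remote
// storage location (gs:// or bigstore://); any other path is dropped with a
// warning and replaced by an empty string.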
string SafeDebugPath(const string& path) {
  if (path.empty() || path.compare(0, 5, "gs://") == 0 ||
      path.compare(0, 11, "bigstore://") == 0) {
    return path;
  }
  LOG(WARNING) << "Invalid config path (will be dropped): " << path;
  return string();
}

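// Extracts the output at the given index from an execution result tuple.
// Index 0 refers to the whole allocation, while index N > 0 refers to the
// (N - 1)-th tuple element, returned as a sub-buffer which aliases the parent
// allocation.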
Status MakeOutput(const RefPtr<XRTTupleAllocation>& output, int64 index,
                  RefPtr<XRTTupleAllocation>* result) {
  if (index == 0) {
    *result = output;
  } else {
    XRTTupleAllocation* tuple;
    TF_RETURN_IF_ERROR(
        XRTTupleAllocation::MakeSubBuffer(output.get(), {index - 1}, &tuple,
                                          /*alias_parent_allocation=*/true));
    result->reset(tuple);
  }
  return Status::OK();
}

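// Pins to device memory the allocations feeding the op at position
// current_index of a chained-execute plan. Every input must refer to an op
// appearing earlier in the post-order (that is, at an index lower than
// current_index).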
Status PopulateOpWorkingSet(xla::Backend* backend,
                            const xrt::XRTChainedExecuteOp& op,
                            int current_index, const ScopedHandles& outputs,
                            XRTMemoryManager::WorkingSet* working_set) {
  for (int i = 0; i < op.inputs_size(); ++i) {
    auto& input = op.inputs(i);
    if (input.op_index() >= current_index) {
      return errors::InvalidArgument(
          "Input index ", input.op_index(),
          " is above the current position: ", current_index);
    }
    TF_RETURN_IF_ERROR(
        working_set->LookupAndPin(backend, outputs[input.op_index()]));
  }
  return Status::OK();
}

}  // namespace

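// Installs the process-wide NcclUniqueIdFactory. The factory is stored in a
// heap-allocated shared_ptr guarded by nccl_factory_mutex and is never
// deleted.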
void SetNcclUniqueIdFactory(std::shared_ptr<NcclUniqueIdFactory> factory) {
  mutex_lock lock(nccl_factory_mutex);
  if (nccl_factory == nullptr) {
    nccl_factory = new std::shared_ptr<NcclUniqueIdFactory>();
  }
  *nccl_factory = std::move(factory);
}

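// Returns the process-wide NcclUniqueIdFactory installed via
// SetNcclUniqueIdFactory(), or nullptr if none has been set.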
std::shared_ptr<NcclUniqueIdFactory> GetNcclUniqueIdFactory() {
  mutex_lock lock(nccl_factory_mutex);
  return nccl_factory != nullptr ? *nccl_factory : nullptr;
}

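// Builds the xla::DebugOptions to be used for a compilation. If the
// TF_XLA_DEBUG_OPTIONS_PASSTHROUGH environment variable is set (for example,
// "export TF_XLA_DEBUG_OPTIONS_PASSTHROUGH=1" in the server environment), the
// client-provided ref_options are returned as-is. Otherwise the options come
// from the local flags, and only the dump-related fields of ref_options (with
// the dump path sanitized via SafeDebugPath()) are copied over.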
xla::DebugOptions BuildXlaDebugOptions(const xla::DebugOptions& ref_options) {
  static const bool options_passthrough = DebugOptionsPassThroughEnabled();
  if (options_passthrough) {
    return ref_options;
  }
  xla::DebugOptions options = xla::GetDebugOptionsFromFlags();
  options.set_xla_dump_to(SafeDebugPath(ref_options.xla_dump_to()));
  options.set_xla_dump_hlo_as_proto(ref_options.xla_dump_hlo_as_proto());
  options.set_xla_dump_hlo_as_text(ref_options.xla_dump_hlo_as_text());
  options.set_xla_dump_hlo_snapshots(ref_options.xla_dump_hlo_snapshots());
  options.set_xla_dump_hlo_pass_re(ref_options.xla_dump_hlo_pass_re());
  options.set_xla_dump_include_timestamp(
      ref_options.xla_dump_include_timestamp());
  options.set_xla_dump_max_hlo_modules(ref_options.xla_dump_max_hlo_modules());
  for (auto& pass : ref_options.xla_disable_hlo_passes()) {
    options.add_xla_disable_hlo_passes(pass);
  }
  return options;
}

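// Reads the input allocation handles fed to an XRT op via the input_name op
// input list, accepting both scalar and 1D vector int64 tensors, and flattens
// them into a single vector of InputCoords.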
xla::StatusOr<std::vector<InputCoords>> GetComputationInputs(
    OpKernelContext* context, const char* input_name) {
  OpInputList arg_list;
  TF_RETURN_IF_ERROR(context->input_list(input_name, &arg_list));
  // Concatenate all input uids from the list of scalars or vectors carrying
  // them.
  std::vector<InputCoords> input_coords;
  for (int i = 0; i < arg_list.size(); ++i) {
    const Tensor& arg = arg_list[i];
    if (TensorShapeUtils::IsScalar(arg.shape())) {
      input_coords.emplace_back(arg.scalar<int64>()());
    } else {
      TF_RET_CHECK(TensorShapeUtils::IsVector(arg.shape()));
      auto arg_vec = arg.vec<int64>();
      const int64 num_elts = arg.shape().dim_size(0);
      for (int i = 0; i < num_elts; ++i) {
        input_coords.emplace_back(arg_vec(i));
      }
    }
  }
  return std::move(input_coords);
}

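// Returns true if input_shape is compatible with parameter_shape: every array
// subshape must match in rank and element type, static subshapes must also
// match in layout, and each dimension must either match exactly or, when the
// parameter dimension is dynamic, be no larger than the parameter bound.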
bool InputShapeMatches(const xla::Shape& parameter_shape,
                       const xla::Shape& input_shape) {
  auto shape_checker = [&](const xla::Shape& pshape,
                           const xla::ShapeIndex& index) {
    if (pshape.IsArray()) {
      TF_ASSIGN_OR_RETURN(const xla::Shape* ishape,
                          xla::ShapeUtil::TryGetSubshape(input_shape, index));
      if (pshape.rank() != ishape->rank() ||
          pshape.element_type() != ishape->element_type()) {
        return errors::InvalidArgument("Mismatching shapes");
      }
      if (pshape.is_static() && pshape.layout() != ishape->layout()) {
        return errors::InvalidArgument("Mismatching layouts");
      }
      for (int64 dim = 0; dim < pshape.rank(); ++dim) {
        if (pshape.is_dynamic_dimension(dim)) {
          if (pshape.dimensions(dim) < ishape->dimensions(dim)) {
            return errors::InvalidArgument("Mismatching shapes");
          }
        } else if (pshape.dimensions(dim) != ishape->dimensions(dim)) {
          return errors::InvalidArgument("Mismatching shapes");
        }
      }
    }
    return Status::OK();
  };
  return xla::ShapeUtil::ForEachSubshapeWithStatus(parameter_shape,
                                                   shape_checker)
      .ok();
}

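// Looks up and pins the input tuple allocations of an execution, verifying
// that each input shape matches the corresponding executable parameter shape
// obtained from shape_getter. If release_inputs is true, the input handles are
// released from the memory manager (the pinned references keep the buffers
// alive). When an InputCoords carries a non-empty tuple index, the matching
// sub-buffer is returned instead, aliasing the parent allocation.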
xla::StatusOr<std::vector<RefPtr<XRTTupleAllocation>>> GetInputTupleAllocations(
    const std::vector<InputCoords>& input_coords,
    XRTMemoryManager::WorkingSet* working_set, xla::Backend* backend,
    int64 num_input_shapes,
    const std::function<xla::Shape(int64)>& shape_getter, bool release_inputs) {
  if (input_coords.size() != num_input_shapes) {
    return errors::InvalidArgument(
        "Number of inputs does not match executable proto input shapes: ",
        input_coords.size(), " vs. ", num_input_shapes);
  }
  std::vector<RefPtr<XRTTupleAllocation>> input_tuples;
  input_tuples.reserve(input_coords.size());
  for (size_t i = 0; i < input_coords.size(); ++i) {
    TF_RETURN_IF_ERROR(
        working_set->LookupAndPin(backend, input_coords[i].handle));
    auto tuple = working_set->PinnedTuples().back();
    if (release_inputs) {
      // We are holding a reference to the tuple, so we can safely delete it
      // from the resource manager here.
      TF_RETURN_IF_ERROR(
          working_set->MemoryManager()->Release(input_coords[i].handle));
      VLOG(2) << "Released allocation handle " << input_coords[i].handle;
    }
    xla::Shape input_shape = shape_getter(i);
    if (!InputShapeMatches(input_shape, tuple->on_host_shape())) {
      return errors::InvalidArgument(
          "Run-time shape mismatch for XRTExecute argument[", i, "] (",
          input_coords[i].handle, "). Expected ", input_shape.DebugString(),
          "; got ", tuple->on_host_shape().DebugString());
    }
    if (input_coords[i].index.empty()) {
      input_tuples.emplace_back(std::move(tuple));
    } else {
      XRTTupleAllocation* sub_tuple;
      TF_RETURN_IF_ERROR(XRTTupleAllocation::MakeSubBuffer(
          tuple.get(), input_coords[i].index, &sub_tuple,
          /*alias_parent_allocation=*/true));
      input_tuples.emplace_back(sub_tuple);
    }
  }
  return std::move(input_tuples);
}

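// Re-establishes the user-specified (kUserAlias) input/output buffer aliases
// on the output tuple, making each aliased output buffer point at the buffers
// of the corresponding input parameter.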
Status RebuildOutputAliases(
    const RefPtr<XRTTupleAllocation>& output_tuple,
    absl::Span<const RefPtr<XRTTupleAllocation>> input_tuples,
    const xla::HloInputOutputAliasConfig& input_output_alias) {
  auto alias_function =
      [&](const xla::ShapeIndex& output_index,
          const xla::HloInputOutputAliasConfig::Alias& alias) -> Status {
    TF_RET_CHECK(alias.parameter_number < input_tuples.size());
    return alias.kind == xla::HloInputOutputAliasConfig::AliasKind::kUserAlias
               ? output_tuple->AliasBufferFrom(
                     *input_tuples[alias.parameter_number],
                     alias.parameter_index, output_index)
               : Status::OK();
  };
  return input_output_alias.ForEachAliasWithStatus(alias_function);
}

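// Converts the input tuple allocations into the xla::ExecutionInput arguments
// to be fed to the XLA execution, deciding on a per-buffer basis whether
// ownership can be donated (aliased) to the execution.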
xla::StatusOr<std::vector<xla::ExecutionInput>> GetArgumentsBuffers(
    const xla::HloInputOutputAliasConfig& input_output_alias,
    absl::Span<const RefPtr<XRTTupleAllocation>> input_tuples,
    const std::vector<bool>& input_is_dynamic, bool release_inputs) {
  auto is_dynamic = [&](size_t arg) {
    return arg < input_is_dynamic.size() && input_is_dynamic[arg];
  };
  std::vector<xla::ExecutionInput> arguments;
  // Don't alias dynamic inputs. Due to the underlying implementation, aliased
  // inputs have two owners: the XRTAllocation and the return value of this
  // function. If an argument is dynamic and its ownership is released to the
  // output of this function, TPUExecute will free it and reallocate a new one,
  // which creates a double-free issue where the XRTAllocation also attempts to
  // release the buffer.
  bool alias_outputs = release_inputs && input_tuples.size() == 1 &&
                       input_tuples[0]->IsExclusiveOwner() && !is_dynamic(0);
  arguments.reserve(input_tuples.size());
  for (int64 i = 0; i < input_tuples.size(); ++i) {
    auto alias_checker =
        [&](const xla::ShapeIndex& index) -> xla::StatusOr<bool> {
      // Only the buffers which the caller explicitly marked as aliased
      // (kUserAlias) should create aliases.
      // The XLA compiler might create opportunistic aliases (kSystemAlias)
      // which need different handling. With a system alias we know that XLA
      // is going to reuse a given input parameter buffer for a given output,
      // so unless it is known at the call site that the input buffer has no
      // more uses, a copy needs to be made at the call site. With a
      // user-specified alias the caller tells us that they expect a given
      // output to land over the buffers of a given parameter.
      if (input_output_alias.ParameterAliasKind(i, index) ==
          xla::HloInputOutputAliasConfig::AliasKind::kUserAlias) {
        TF_RET_CHECK(!is_dynamic(i));
        return true;
      }
      return alias_outputs;
    };
    TF_ASSIGN_OR_RETURN(xla::ExecutionInput exec_input,
                        input_tuples[i]->ToExecutionInput(alias_checker));
    arguments.emplace_back(std::move(exec_input));
  }
  return std::move(arguments);
}

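// Registers the execution output with the memory manager and emits the
// resulting handle(s) as the op's output tensor. If return_exploded_tuple is
// true and the result is a tuple, a vector of handles (one per top-level tuple
// element) is produced; otherwise a single scalar handle is produced.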
Status CreateExecuteOutput(OpKernelContext* context,
                           XRTMemoryManager* memory_manager,
                           RefPtr<XRTTupleAllocation> output_tuple,
                           bool return_exploded_tuple) {
  if (return_exploded_tuple && output_tuple->on_host_shape().IsTuple()) {
    int64 tuple_element_count =
        xla::ShapeUtil::TupleElementCount(output_tuple->on_device_shape());
    Tensor* output_tensor;
    TF_RETURN_IF_ERROR(context->allocate_output(
        0, TensorShape({tuple_element_count}), &output_tensor));

    for (int64 i = 0; i < tuple_element_count; ++i) {
      XRTTupleAllocation* suballocation;
      TF_RETURN_IF_ERROR(XRTTupleAllocation::MakeSubBuffer(
          output_tuple.get(), {i}, &suballocation,
          /*alias_parent_allocation=*/false));
      output_tensor->vec<int64>()(i) = memory_manager->Register(suballocation);
    }
  } else {
    Tensor* output_tensor;
    TF_RETURN_IF_ERROR(
        context->allocate_output(0, TensorShape({}), &output_tensor));
    output_tensor->scalar<int64>()() =
        memory_manager->Register(std::move(output_tuple));
  }
  return Status::OK();
}

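// Runs a chained-execute plan: the ops are processed in post-order, with the
// device-specific execute_op callback invoked for each computation, the
// intermediate outputs dropped as soon as their last user has run, and the
// plan outputs returned as a vector of allocation handles.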
Status ExecuteChained(OpKernelContext* context,
                      const RefPtr<XRTMemoryManager>& memory_manager,
                      xla::Backend* backend, int device_ordinal,
                      const xrt::XRTChainedExecutePlan& plan,
                      const xrt::XRTChainedExecuteConfig& config,
                      const ChainedExecuteFn& execute_op) {
  // Create the vector which tracks the uses of the intermediate chained
  // operation outputs.
  std::vector<int64> uses(plan.ops_size(), 0);
  for (auto& op : plan.ops()) {
    for (auto& input : op.inputs()) {
      uses[input.op_index()] += 1;
    }
  }

  ScopedHandles outputs(memory_manager);
  ScopedHandles results(memory_manager);
  for (int i = 0; i < plan.ops_size(); ++i) {
    auto& op = plan.ops(i);
    if (op.op_oneof_case() == xrt::XRTChainedExecuteOp::kDataHandle) {
      // This operation is a device data load. Set the handle as output and
      // leave the release flag off, since this is not an intermediate output.
      TF_RETURN_IF_ERROR(outputs.Add(i, op.data_handle(), /*release=*/false));
    } else if (op.op_oneof_case() ==
               xrt::XRTChainedExecuteOp::kComputationHandle) {
      // This is an XRT execute operation, forward it to the device specific
      // handler. Populating the working set makes sure the input allocations
      // for this execute operation are pinned to device memory.
      XRTMemoryManager::WorkingSet working_set(memory_manager);
      TF_RETURN_IF_ERROR(
          PopulateOpWorkingSet(backend, op, i, outputs, &working_set));
      TF_ASSIGN_OR_RETURN(auto tuple,
                          execute_op(op, working_set.PinnedTuples()));
      TF_RETURN_IF_ERROR(outputs.Add(i, std::move(tuple)));
    } else {
      return errors::InvalidArgument(
          "Undefined operation kind at post-order position ", i);
    }
    // If the result of this chained operation is a plan output, store it into
    // the results at the requested position.
    for (auto& output : op.outputs()) {
      TF_ASSIGN_OR_RETURN(auto tuple, outputs.Lookup(i));
      RefPtr<XRTTupleAllocation> result;
      TF_RETURN_IF_ERROR(MakeOutput(tuple, output.output_index(), &result));
      TF_RETURN_IF_ERROR(results.Add(output.result_index(), std::move(result)));
    }
    // Drop intermediate results which have no more users.
    for (auto& input : op.inputs()) {
      uses[input.op_index()] -= 1;
      if (uses[input.op_index()] == 0) {
        TF_RETURN_IF_ERROR(outputs.Drop(input.op_index()));
      }
    }
  }

  Tensor* output_tensor;
  TF_RETURN_IF_ERROR(context->allocate_output(
      0, TensorShape({static_cast<int64>(results.size())}), &output_tensor));
  for (size_t i = 0; i < results.size(); ++i) {
    output_tensor->vec<int64>()(i) = results.Release(i);
  }
  return Status::OK();
}

}  // namespace tensorflow