Internal change

PiperOrigin-RevId: 309268229
Change-Id: I463270c338de17ed3104c2bbc9088a5d3bff0672
A. Unique TensorFlower 2020-04-30 11:42:51 -07:00 committed by TensorFlower Gardener
parent ed5a72cb2a
commit 9326ffb93b
4 changed files with 58 additions and 180 deletions

tensorflow/core/common_runtime/executor_test.cc

@@ -477,34 +477,6 @@ BENCHMARK(BM_executor)->ArgPair(8192, 32);
// Tall fat graph
BENCHMARK(BM_executor)->ArgPair(1024, 1024);
static void BM_const_identity(int iters, int width, int outputs_per_const) {
#ifdef PLATFORM_GOOGLE
BenchmarkUseRealTime();
#endif // PLATFORM_GOOGLE
Graph* g = new Graph(OpRegistry::Global());
for (int i = 0; i < width; ++i) {
Tensor i_t(i);
Node* const_node = test::graph::Constant(g, i_t);
for (int j = 0; j < outputs_per_const; ++j) {
test::graph::Identity(g, const_node);
}
}
FixupSourceAndSinkEdges(g);
#ifdef PLATFORM_GOOGLE
SetBenchmarkLabel(
strings::StrCat("Nodes = ", (1 + outputs_per_const) * width));
SetBenchmarkItemsProcessed((1 + outputs_per_const) * width *
static_cast<int64>(iters));
#endif // PLATFORM_GOOGLE
test::Benchmark("cpu", g).Run(iters);
}
// Graph with actual op execution.
BENCHMARK(BM_const_identity)->ArgPair(1, 1);
BENCHMARK(BM_const_identity)->ArgPair(1, 100);
BENCHMARK(BM_const_identity)->ArgPair(100, 1);
BENCHMARK(BM_const_identity)->ArgPair(100, 100);
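For scale, the benchmark removed above builds `width` Const nodes, each feeding `outputs_per_const` Identity nodes, which is where the label arithmetic "Nodes = (1 + outputs_per_const) * width" comes from. A minimal standalone sketch of that count (function name hypothetical):

#include <cstdint>
#include <cstdio>

// Hypothetical helper: node count for the deleted BM_const_identity graph,
// i.e. width Const nodes plus width * outputs_per_const Identity nodes.
int64_t BenchmarkNodeCount(int width, int outputs_per_const) {
  return static_cast<int64_t>(1 + outputs_per_const) * width;
}

int main() {
  // Largest configuration registered above: ArgPair(100, 100) -> 10100 nodes.
  std::printf("%lld\n", static_cast<long long>(BenchmarkNodeCount(100, 100)));
  return 0;
}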
static void BM_FeedInputFetchOutput(int iters) {
testing::StopTiming();
Graph* g = new Graph(OpRegistry::Global());

tensorflow/core/kernels/data/BUILD

@@ -167,9 +167,9 @@ cc_library(
srcs = ["single_threaded_executor.cc"],
hdrs = ["single_threaded_executor.h"],
deps = [
"//tensorflow/core:core_cpu",
"//tensorflow/core:core_cpu_internal",
"//tensorflow/core:lib",
"//tensorflow/core/common_runtime:core_cpu_internal",
"//tensorflow/core/common_runtime:entry",
"//tensorflow/core/common_runtime:local_executor_params",
],
alwayslink = 1,

tensorflow/core/kernels/data/single_threaded_executor.cc

@@ -15,7 +15,6 @@ limitations under the License.
#include "tensorflow/core/kernels/data/single_threaded_executor.h"
#include "tensorflow/core/common_runtime/entry.h"
#include "tensorflow/core/common_runtime/executor.h"
#include "tensorflow/core/common_runtime/executor_factory.h"
#include "tensorflow/core/graph/algorithm.h"
@@ -38,9 +37,6 @@ class SingleThreadedExecutorImpl : public Executor {
for (const KernelState& kernel_state : kernels_) {
params_.delete_kernel(kernel_state.kernel);
}
for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
params_.delete_kernel(kernel_state.kernel);
}
}
Status Initialize(const Graph& graph) {
@@ -57,7 +53,6 @@ class SingleThreadedExecutorImpl : public Executor {
kernels_.reserve(ordered_nodes.size());
std::vector<Node*> nodes_with_kernels;
std::vector<Node*> nodes_with_const_tensor_kernels;
nodes_with_kernels.reserve(ordered_nodes.size());
std::map<size_t, Node*> arg_index_to_node_map;
@@ -107,38 +102,24 @@ class SingleThreadedExecutorImpl : public Executor {
continue;
}
OpKernel* kernel;
TF_RETURN_IF_ERROR(params_.create_kernel(n->properties(), &kernel));
const size_t kernel_index = kernels_.size();
kernels_.push_back({});
nodes_with_kernels.push_back(n);
KernelState& kernel_state = kernels_[kernel_index];
node_to_index_map[n] = kernel_index;
const Tensor* const_tensor;
if (n->num_outputs() == 1 && (const_tensor = kernel->const_tensor())) {
// Nodes that produce a single constant tensor are handled specially:
// we evaluate the tensor once, and propagate it to its consumers as
// a `const Tensor*`, to avoid refcount manipulation.
const size_t kernel_index = const_tensor_kernels_.size();
const_tensor_kernels_.push_back({});
nodes_with_const_tensor_kernels.push_back(n);
ConstTensorKernelState& kernel_state =
const_tensor_kernels_[kernel_index];
kernel_state.kernel = kernel;
kernel_state.const_tensor = *const_tensor;
TF_RETURN_IF_ERROR(
params_.create_kernel(n->properties(), &kernel_state.kernel));
kernel_state.num_inputs = n->num_inputs();
kernel_state.num_outputs = n->num_outputs();
if (kernel_index == 0) {
kernel_state.input_start_index = 0;
} else {
const size_t kernel_index = kernels_.size();
kernels_.push_back({});
nodes_with_kernels.push_back(n);
KernelState& kernel_state = kernels_[kernel_index];
kernel_state.kernel = kernel;
kernel_state.num_inputs = n->num_inputs();
kernel_state.num_outputs = n->num_outputs();
node_to_index_map[n] = kernel_index;
if (kernel_index == 0) {
kernel_state.input_start_index = 0;
} else {
const KernelState& previous_kernel_state = kernels_[kernel_index - 1];
kernel_state.input_start_index =
previous_kernel_state.input_start_index +
previous_kernel_state.num_inputs;
}
const KernelState& previous_kernel_state = kernels_[kernel_index - 1];
kernel_state.input_start_index =
previous_kernel_state.input_start_index +
previous_kernel_state.num_inputs;
}
}
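The offset bookkeeping in the hunk above is a running prefix sum: each kernel's inputs occupy a contiguous slice of one flat `inputs` vector, starting where the previous kernel's slice ends. A minimal sketch of that layout computation, detached from TensorFlow (all names hypothetical):

#include <cstddef>
#include <vector>

// Hypothetical stand-in for the per-kernel bookkeeping above.
struct KernelSlot {
  size_t num_inputs = 0;
  size_t input_start_index = 0;  // Start of this kernel's slice in `inputs`.
};

// Assigns each kernel the contiguous range
// [input_start_index, input_start_index + num_inputs), returning the total
// number of slots the flat `inputs` vector needs.
size_t AssignInputSlices(std::vector<KernelSlot>* kernels) {
  size_t next = 0;
  for (KernelSlot& k : *kernels) {
    k.input_start_index = next;
    next += k.num_inputs;
  }
  return next;
}

// An edge into kernel `dst_kernel` at input slot `dst_input` resolves to one
// flat index -- the same arithmetic the `output_locations` mappings below use.
size_t FlatInputLocation(const std::vector<KernelSlot>& kernels,
                         size_t dst_kernel, size_t dst_input) {
  return kernels[dst_kernel].input_start_index + dst_input;
}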
@@ -165,22 +146,6 @@ class SingleThreadedExecutorImpl : public Executor {
}
}
// Build the mapping from each const tensor kernel to the input slot for the
// corresponding destination node.
for (size_t i = 0; i < const_tensor_kernels_.size(); ++i) {
Node* n = nodes_with_const_tensor_kernels[i];
ConstTensorKernelState& kernel_state = const_tensor_kernels_[i];
for (const Edge* e : n->out_edges()) {
kernel_state.output_locations.push_back(
kernels_[node_to_index_map[e->dst()]].input_start_index +
e->dst_input());
}
bool on_host =
kernel_state.kernel->output_memory_types()[0] == HOST_MEMORY;
kernel_state.output_alloc_attr.set_on_host(on_host);
}
// Build the mapping from each node output to the input slot for the
// corresponding destination node.
for (size_t i = 0; i < kernels_.size(); ++i) {
@@ -265,7 +230,7 @@ class SingleThreadedExecutorImpl : public Executor {
// * In an error case (see below), we use the connectivity information in
// `KernelState::output_locations` to determine which locations have been
// initialized, and manually destroy them.
std::vector<Entry> inputs(total_num_inputs_);
std::vector<ManualConstructor<Tensor>> inputs(total_num_inputs_);
// TODO(mrry): Can we avoid copying into these vectors? Consider modifying
// OpKernelContext to take the TensorValueVec as a pointer into `inputs`.
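The replacement declaration above stores inputs as `ManualConstructor<Tensor>` (TF's wrapper in tensorflow/core/lib/gtl/manual_constructor.h): aligned raw storage whose object is created with `Init()` and torn down with `Destroy()`, so no Tensor exists in a slot until a producer writes one. A simplified stand-in showing the mechanics (a sketch, not the real class):

#include <new>
#include <utility>

// Sketch of a ManualConstructor-style wrapper. The wrapper's own destructor
// deliberately does nothing; lifetime is managed entirely by Init()/Destroy().
template <typename T>
class ManualConstructorSketch {
 public:
  template <typename... Args>
  void Init(Args&&... args) {
    new (&space_) T(std::forward<Args>(args)...);
  }
  void Destroy() { get()->~T(); }
  T* get() { return reinterpret_cast<T*>(&space_); }

 private:
  alignas(T) unsigned char space_[sizeof(T)];
};

Because nothing is destroyed automatically, the error path added further down must locate and `Destroy()` every slot that was initialized but never consumed.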
@@ -322,25 +287,11 @@ class SingleThreadedExecutorImpl : public Executor {
const Tensor* arg;
TF_CHECK_OK(args.call_frame->GetArg(i, &arg));
for (size_t j = 0; j < num_destinations; ++j) {
Entry& input = inputs[arg_output_locations_[i][j]];
input.state = Entry::State::HAS_CONST_TENSOR;
input.const_tensor = arg;
inputs[arg_output_locations_[i][j]].Init(*arg);
}
}
}
// Kernels that return a constant value (e.g. ConstOp) are relatively
// expensive due to the Tensor allocations that they perform. Therefore we
// specialize their implementation and forward their constant value directly
// to the inputs of kernels that consume them.
for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
for (size_t i = 0; i < kernel_state.output_locations.size(); ++i) {
Entry& input = inputs[kernel_state.output_locations[i]];
input.state = Entry::State::HAS_CONST_TENSOR;
input.const_tensor = &kernel_state.const_tensor;
}
}
// Execute the kernels one-at-a-time in topological order.
for (size_t i = 0; i < kernels_.size(); ++i) {
const KernelState& kernel_state = kernels_[i];
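By contrast, the removed `Entry`-based path (TF's real `Entry` lives in common_runtime/entry.h) tags each slot as either owning a Tensor or aliasing a long-lived constant, so constant outputs travel as plain pointers with no allocation or refcount traffic. A stripped-down sketch of such a tagged slot, with `std::string` standing in for `Tensor` (all names hypothetical):

#include <cassert>
#include <new>
#include <string>
#include <utility>

using TensorLike = std::string;  // Stand-in for tensorflow::Tensor.

struct Slot {
  enum class State { kEmpty, kHasValue, kHasConstPtr };
  State state = State::kEmpty;
  union {
    TensorLike value;             // Owned; constructed in place.
    const TensorLike* const_ptr;  // Aliased; never owned, never refcounted.
  };
  Slot() {}
  ~Slot() { Clear(); }
  void SetValue(TensorLike v) {
    Clear();
    new (&value) TensorLike(std::move(v));
    state = State::kHasValue;
  }
  void SetConstPtr(const TensorLike* p) {
    Clear();
    const_ptr = p;  // A plain pointer store; no allocation, no refcount.
    state = State::kHasConstPtr;
  }
  void Clear() {
    if (state == State::kHasValue) value.~TensorLike();
    state = State::kEmpty;
  }
};

int main() {
  const TensorLike kConst = "constant tensor";
  Slot a, b;
  a.SetValue("owned copy");   // Like Entry::State::HAS_VALUE.
  b.SetConstPtr(&kConst);     // Like Entry::State::HAS_CONST_TENSOR.
  assert(b.const_ptr == &kConst);
  return 0;
}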
@@ -355,21 +306,8 @@ class SingleThreadedExecutorImpl : public Executor {
input_alloc_attrs.clear();
input_alloc_attrs.resize(num_inputs);
for (size_t j = 0; j < num_inputs; ++j) {
Entry& input = inputs[input_start_index + j];
switch (input.state) {
case Entry::State::HAS_CONST_TENSOR:
// NOTE(mrry): This `const_cast` is necessary because `TensorValue`
// stores a non-const `Tensor*`, and relies on the `OpKernelContext`
// accessors making dynamic checks that prevent using an immutable
// tensor as a mutable tensor.
node_inputs[j].tensor = const_cast<Tensor*>(input.const_tensor);
break;
case Entry::State::HAS_VALUE:
node_inputs[j].tensor = input.val.get();
break;
default:
DCHECK(false) << "Input did not have a valid value.";
}
auto t = inputs[input_start_index + j].get();
node_inputs[j].tensor = t;
input_alloc_attrs[j] = input_alloc_attrs_[input_start_index + j];
}
params.op_kernel = kernel_state.kernel;
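The `const_cast` note in the removed lines above exists because `TensorValue` holds a mutable `Tensor*`, so const-correctness is enforced by `OpKernelContext`'s runtime checks rather than by the type system. A generic illustration of that pattern (not TF's actual `TensorValue`):

#include <cassert>

// Hypothetical mirror of the TensorValue situation: the wrapper stores a
// mutable pointer, so wrapping an immutable input needs a const_cast, and
// safety depends on a runtime flag instead of the type system.
struct MutableView {
  int* tensor = nullptr;
  bool is_const = false;
};

MutableView WrapConstInput(const int& input) {
  MutableView v;
  v.tensor = const_cast<int*>(&input);  // Legal only if never written through.
  v.is_const = true;
  return v;
}

int main() {
  int x = 7;
  const int& cx = x;
  MutableView v = WrapConstInput(cx);
  assert(v.is_const);  // Callers must check this before mutating *v.tensor.
  return 0;
}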
@@ -378,11 +316,41 @@ class SingleThreadedExecutorImpl : public Executor {
// Actually execute the kernel.
device->Compute(kernel_state.kernel, &ctx);
TF_RETURN_IF_ERROR(ctx.status());
if (!ctx.status().ok()) {
// On failure, we must manually free all intermediate tensors. We have
// already freed all the inputs for kernels up to (but not including)
// the `i`th kernel. We scan through the previously executed kernels and
// destroy any tensors that were destined to be the input for a kernel
// that has not yet executed.
for (size_t j = 0; j < arg_output_locations_.size(); ++j) {
for (size_t output_location : arg_output_locations_[j]) {
if (output_location >= input_start_index) {
// Only destroy an output location if it is an input to an
// operation that has not yet executed.
inputs[output_location].Destroy();
}
}
}
for (size_t j = 0; j < i; ++j) {
const KernelState& executed_kernel_state = kernels_[j];
for (size_t k = 0; k < executed_kernel_state.num_outputs; ++k) {
for (size_t output_location :
executed_kernel_state.output_locations[k]) {
if (output_location >= input_start_index) {
// Only destroy an output location if it is an input to an
// operation that has not yet executed.
inputs[output_location].Destroy();
}
}
}
}
return ctx.status();
}
// Free the inputs to the current kernel.
for (size_t j = 0; j < num_inputs; ++j) {
inputs[input_start_index + j].ClearVal();
inputs[input_start_index + j].Destroy();
}
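The added error path works backwards from connectivity: a slot needs manual destruction exactly when an already-executed producer initialized it but its consumer sits at or beyond the failing kernel's `input_start_index` and therefore never ran. A condensed sketch of that scan (types hypothetical; `Slot` is any wrapper with a `Destroy()` method, such as the ManualConstructor sketch earlier):

#include <cstddef>
#include <vector>

// Hypothetical executed-kernel record: output_locations[k] lists the flat
// input slots written by output k.
struct ExecutedKernel {
  std::vector<std::vector<size_t>> output_locations;
};

// Destroys every slot that an already-executed kernel initialized but whose
// consuming kernel -- at or after `failing_input_start` -- never ran.
template <typename Slot>
void DestroyPendingInputs(const std::vector<ExecutedKernel>& executed,
                          size_t failing_input_start,
                          std::vector<Slot>* inputs) {
  for (const ExecutedKernel& k : executed) {
    for (const std::vector<size_t>& locations : k.output_locations) {
      for (size_t loc : locations) {
        if (loc >= failing_input_start) (*inputs)[loc].Destroy();
      }
    }
  }
}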
// Forward the outputs of the kernel to the inputs of subsequent kernels.
@@ -395,15 +363,11 @@ class SingleThreadedExecutorImpl : public Executor {
for (size_t k = 0; k < num_destinations - 1; ++k) {
// TODO(mrry): Validate that the types match the expected values or
// ensure that the necessary validation has already happened.
Entry& input = inputs[kernel_state.output_locations[j][k]];
input.state = Entry::State::HAS_VALUE;
input.val.Init(*val.tensor);
inputs[kernel_state.output_locations[j][k]].Init(*val.tensor);
}
// Move the value to the last consumer to avoid the cost of copying it.
Entry& input =
inputs[kernel_state.output_locations[j][num_destinations - 1]];
input.state = Entry::State::HAS_VALUE;
input.val.Init(std::move(*val.tensor));
inputs[kernel_state.output_locations[j][num_destinations - 1]].Init(
std::move(*val.tensor));
}
delete val.tensor;
}
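The forwarding loop above uses a standard fan-out trick: copy the produced value into all but the last destination, then move it into the last one, saving one copy per output. A generic sketch of the pattern:

#include <cstddef>
#include <utility>
#include <vector>

// Copies `value` into every destination slot except the last, then moves it
// into the last slot, saving one copy per produced output.
template <typename T>
void FanOut(T value, const std::vector<size_t>& destinations,
            std::vector<T>* slots) {
  if (destinations.empty()) return;
  for (size_t k = 0; k + 1 < destinations.size(); ++k) {
    (*slots)[destinations[k]] = value;
  }
  (*slots)[destinations.back()] = std::move(value);
}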
@@ -442,7 +406,7 @@ class SingleThreadedExecutorImpl : public Executor {
// For the `j`th output of `kernel`, `output_locations[j]` contains the
// locations in the flat `inputs` vector to which that output must be
// copied. See comment at the beginning of `Run()` for details.
// copied. See comment at the beginning of `RunAsync()` for details.
std::vector<std::vector<size_t>>
output_locations; // Length = `num_outputs`.
@@ -457,33 +421,6 @@ class SingleThreadedExecutorImpl : public Executor {
std::vector<std::vector<size_t>>
arg_output_locations_; // Length = `num_args`.
// Represents cached graph structure state for each kernel that produces
// a single constant-valued tensor.
struct ConstTensorKernelState {
// The kernel object. Not owned.
//
// This pointer is managed by `params_.create_kernel()` and
// `params_.delete_kernel()`.
OpKernel* kernel;
// The cached value of `kernel->const_tensor()`.
//
// NOTE: We keep a `Tensor` rather than a `const Tensor*` here in order to
// keep the reference count on the underlying buffer above 1. Otherwise, a
// kernel could interpret the input as a forwardable tensor, and mutate the
// underlying constant tensor.
Tensor const_tensor;
// For the single output of `kernel`, `output_locations` contains the
// locations in the flat `inputs` vector to which that output must be
// copied. See comment at the beginning of `Run()` for details.
std::vector<size_t> output_locations; // Length = `num_outputs`.
// Memory space information for the single output of `kernel`.
AllocatorAttributes output_alloc_attr;
};
std::vector<ConstTensorKernelState> const_tensor_kernels_;
// Memory space information for each input. This information is stored in the
// same order as the flat `inputs` vector. See comment at the beginning of
// `RunAsync()` for details.
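The deleted NOTE above describes defensive aliasing: by caching a `Tensor` by value, the executor held an extra reference to the underlying buffer, so a consumer that mutates only uniquely-owned (forwardable) buffers was forced to copy instead. A `std::shared_ptr` analogy (not the actual Tensor refcounting):

#include <cassert>
#include <memory>

int main() {
  auto buffer = std::make_shared<int>(42);      // Executor's cached copy.
  std::shared_ptr<int> consumer_view = buffer;  // Consumer's reference.
  // A forward-in-place optimization would mutate only a uniquely owned
  // buffer; the cached copy pins use_count() at 2, so it must copy instead.
  assert(consumer_view.use_count() == 2);
  return 0;
}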

tensorflow/core/kernels/data/single_threaded_executor_test.cc

@@ -27,7 +27,6 @@ limitations under the License.
#include "tensorflow/core/framework/rendezvous.h"
#include "tensorflow/core/framework/versions.pb.h"
#include "tensorflow/core/graph/algorithm.h"
#include "tensorflow/core/graph/testlib.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/random/simple_philox.h"
#include "tensorflow/core/lib/strings/strcat.h"
@@ -300,36 +299,6 @@ BENCHMARK(BM_executor)->ArgPair(8192, 32);
// Tall fat graph
BENCHMARK(BM_executor)->ArgPair(1024, 1024);
static void BM_const_identity(int iters, int width, int outputs_per_const) {
#ifdef PLATFORM_GOOGLE
BenchmarkUseRealTime();
#endif // PLATFORM_GOOGLE
Graph* g = new Graph(OpRegistry::Global());
for (int i = 0; i < width; ++i) {
Tensor i_t(i);
Node* const_node = test::graph::Constant(g, i_t);
for (int j = 0; j < outputs_per_const; ++j) {
test::graph::Identity(g, const_node);
}
}
FixupSourceAndSinkEdges(g);
#ifdef PLATFORM_GOOGLE
SetBenchmarkLabel(
strings::StrCat("Nodes = ", (1 + outputs_per_const) * width));
SetBenchmarkItemsProcessed((1 + outputs_per_const) * width *
static_cast<int64>(iters));
#endif // PLATFORM_GOOGLE
test::Benchmark("cpu", g, nullptr, nullptr, nullptr,
"SINGLE_THREADED_EXECUTOR")
.Run(iters);
}
// Graph with actual op execution.
BENCHMARK(BM_const_identity)->ArgPair(1, 1);
BENCHMARK(BM_const_identity)->ArgPair(1, 100);
BENCHMARK(BM_const_identity)->ArgPair(100, 1);
BENCHMARK(BM_const_identity)->ArgPair(100, 100);
// TODO(mrry): This benchmark currently crashes with a use-after free, because
// test::Benchmark::RunWithArgs() assumes that the executor will take ownership
// of the given graph, *and* keep its nodes (`x`, `y` and `z`) alive for the