From 9326ffb93bee46fcd0d0b1c80169bb49a651a1a1 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower" <gardener@tensorflow.org>
Date: Thu, 30 Apr 2020 11:42:51 -0700
Subject: [PATCH] Internal change

PiperOrigin-RevId: 309268229
Change-Id: I463270c338de17ed3104c2bbc9088a5d3bff0672
---
 .../core/common_runtime/executor_test.cc      |  28 ---
 tensorflow/core/kernels/data/BUILD            |   4 +-
 .../kernels/data/single_threaded_executor.cc  | 175 ++++++------------
 .../data/single_threaded_executor_test.cc     |  31 ----
 4 files changed, 58 insertions(+), 180 deletions(-)

diff --git a/tensorflow/core/common_runtime/executor_test.cc b/tensorflow/core/common_runtime/executor_test.cc
index c5231449a00..79dbdd3bf44 100644
--- a/tensorflow/core/common_runtime/executor_test.cc
+++ b/tensorflow/core/common_runtime/executor_test.cc
@@ -477,34 +477,6 @@ BENCHMARK(BM_executor)->ArgPair(8192, 32);
 // Tall fat graph
 BENCHMARK(BM_executor)->ArgPair(1024, 1024);
 
-static void BM_const_identity(int iters, int width, int outputs_per_const) {
-#ifdef PLATFORM_GOOGLE
-  BenchmarkUseRealTime();
-#endif  // PLATFORM_GOOGLE
-  Graph* g = new Graph(OpRegistry::Global());
-  for (int i = 0; i < width; ++i) {
-    Tensor i_t(i);
-    Node* const_node = test::graph::Constant(g, i_t);
-    for (int j = 0; j < outputs_per_const; ++j) {
-      test::graph::Identity(g, const_node);
-    }
-  }
-  FixupSourceAndSinkEdges(g);
-#ifdef PLATFORM_GOOGLE
-  SetBenchmarkLabel(
-      strings::StrCat("Nodes = ", (1 + outputs_per_const) * width));
-  SetBenchmarkItemsProcessed((1 + outputs_per_const) * width *
-                             static_cast<int64>(iters));
-#endif  // PLATFORM_GOOGLE
-  test::Benchmark("cpu", g).Run(iters);
-}
-
-// Graph with actual op execution.
-BENCHMARK(BM_const_identity)->ArgPair(1, 1);
-BENCHMARK(BM_const_identity)->ArgPair(1, 100);
-BENCHMARK(BM_const_identity)->ArgPair(100, 1);
-BENCHMARK(BM_const_identity)->ArgPair(100, 100);
-
 static void BM_FeedInputFetchOutput(int iters) {
   testing::StopTiming();
   Graph* g = new Graph(OpRegistry::Global());
diff --git a/tensorflow/core/kernels/data/BUILD b/tensorflow/core/kernels/data/BUILD
index d088abc00e6..65881da377f 100644
--- a/tensorflow/core/kernels/data/BUILD
+++ b/tensorflow/core/kernels/data/BUILD
@@ -167,9 +167,9 @@ cc_library(
     srcs = ["single_threaded_executor.cc"],
     hdrs = ["single_threaded_executor.h"],
     deps = [
+        "//tensorflow/core:core_cpu",
+        "//tensorflow/core:core_cpu_internal",
         "//tensorflow/core:lib",
-        "//tensorflow/core/common_runtime:core_cpu_internal",
-        "//tensorflow/core/common_runtime:entry",
         "//tensorflow/core/common_runtime:local_executor_params",
     ],
     alwayslink = 1,
diff --git a/tensorflow/core/kernels/data/single_threaded_executor.cc b/tensorflow/core/kernels/data/single_threaded_executor.cc
index 89a92b0f0c1..45413e8d312 100644
--- a/tensorflow/core/kernels/data/single_threaded_executor.cc
+++ b/tensorflow/core/kernels/data/single_threaded_executor.cc
@@ -15,7 +15,6 @@ limitations under the License.
 #include "tensorflow/core/kernels/data/single_threaded_executor.h"
 
-#include "tensorflow/core/common_runtime/entry.h"
 #include "tensorflow/core/common_runtime/executor.h"
 #include "tensorflow/core/common_runtime/executor_factory.h"
 #include "tensorflow/core/graph/algorithm.h"
@@ -38,9 +37,6 @@ class SingleThreadedExecutorImpl : public Executor {
     for (const KernelState& kernel_state : kernels_) {
       params_.delete_kernel(kernel_state.kernel);
     }
-    for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
-      params_.delete_kernel(kernel_state.kernel);
-    }
   }
 
   Status Initialize(const Graph& graph) {
@@ -57,7 +53,6 @@ class SingleThreadedExecutorImpl : public Executor {
     kernels_.reserve(ordered_nodes.size());
     std::vector<Node*> nodes_with_kernels;
-    std::vector<Node*> nodes_with_const_tensor_kernels;
     nodes_with_kernels.reserve(ordered_nodes.size());
 
     std::map<size_t, Node*> arg_index_to_node_map;
@@ -107,38 +102,24 @@ class SingleThreadedExecutorImpl : public Executor {
         continue;
       }
 
-      OpKernel* kernel;
-      TF_RETURN_IF_ERROR(params_.create_kernel(n->properties(), &kernel));
+      const size_t kernel_index = kernels_.size();
+      kernels_.push_back({});
+      nodes_with_kernels.push_back(n);
+      KernelState& kernel_state = kernels_[kernel_index];
+      node_to_index_map[n] = kernel_index;
 
-      const Tensor* const_tensor;
-      if (n->num_outputs() == 1 && (const_tensor = kernel->const_tensor())) {
-        // Nodes that produce a single constant tensor are handled specially:
-        // we evaluate the tensor once, and propagate it to its consumers as
-        // a `const Tensor*`, to avoid refcount manipulation.
-        const size_t kernel_index = const_tensor_kernels_.size();
-        const_tensor_kernels_.push_back({});
-        nodes_with_const_tensor_kernels.push_back(n);
-        ConstTensorKernelState& kernel_state =
-            const_tensor_kernels_[kernel_index];
-        kernel_state.kernel = kernel;
-        kernel_state.const_tensor = *const_tensor;
+      TF_RETURN_IF_ERROR(
+          params_.create_kernel(n->properties(), &kernel_state.kernel));
+      kernel_state.num_inputs = n->num_inputs();
+      kernel_state.num_outputs = n->num_outputs();
+
+      if (kernel_index == 0) {
+        kernel_state.input_start_index = 0;
       } else {
-        const size_t kernel_index = kernels_.size();
-        kernels_.push_back({});
-        nodes_with_kernels.push_back(n);
-        KernelState& kernel_state = kernels_[kernel_index];
-        kernel_state.kernel = kernel;
-        kernel_state.num_inputs = n->num_inputs();
-        kernel_state.num_outputs = n->num_outputs();
-        node_to_index_map[n] = kernel_index;
-        if (kernel_index == 0) {
-          kernel_state.input_start_index = 0;
-        } else {
-          const KernelState& previous_kernel_state = kernels_[kernel_index - 1];
-          kernel_state.input_start_index =
-              previous_kernel_state.input_start_index +
-              previous_kernel_state.num_inputs;
-        }
+        const KernelState& previous_kernel_state = kernels_[kernel_index - 1];
+        kernel_state.input_start_index =
+            previous_kernel_state.input_start_index +
+            previous_kernel_state.num_inputs;
       }
     }
 
@@ -165,22 +146,6 @@ class SingleThreadedExecutorImpl : public Executor {
       }
     }
 
-    // Build the mapping from each const tensor kernel to the input slot for
-    // the corresponding destination node.
-    for (size_t i = 0; i < const_tensor_kernels_.size(); ++i) {
-      Node* n = nodes_with_const_tensor_kernels[i];
-      ConstTensorKernelState& kernel_state = const_tensor_kernels_[i];
-      for (const Edge* e : n->out_edges()) {
-        kernel_state.output_locations.push_back(
-            kernels_[node_to_index_map[e->dst()]].input_start_index +
-            e->dst_input());
-      }
-
-      bool on_host =
-          kernel_state.kernel->output_memory_types()[0] == HOST_MEMORY;
-      kernel_state.output_alloc_attr.set_on_host(on_host);
-    }
-
     // Build the mapping from each node output to the input slot for the
     // corresponding destination node.
     for (size_t i = 0; i < kernels_.size(); ++i) {
@@ -265,7 +230,7 @@ class SingleThreadedExecutorImpl : public Executor {
   //   * In an error case (see below), we use the connectivity information in
   //     `KernelState::output_locations` to determine which locations have been
   //     initialized, and manually destroy them.
-  std::vector<Entry> inputs(total_num_inputs_);
+  std::vector<ManualConstructor<Tensor>> inputs(total_num_inputs_);
 
   // TODO(mrry): Can we avoid copying into these vectors? Consider modifying
   // OpKernelContext to take the TensorValueVec as a pointer into `inputs`.
@@ -322,25 +287,11 @@ class SingleThreadedExecutorImpl : public Executor {
       const Tensor* arg;
       TF_CHECK_OK(args.call_frame->GetArg(i, &arg));
       for (size_t j = 0; j < num_destinations; ++j) {
-        Entry& input = inputs[arg_output_locations_[i][j]];
-        input.state = Entry::State::HAS_CONST_TENSOR;
-        input.const_tensor = arg;
+        inputs[arg_output_locations_[i][j]].Init(*arg);
       }
     }
   }
 
-  // Kernels that return a constant value (e.g. ConstOp) are relatively
-  // expensive due to the Tensor allocations that they perform. Therefore we
-  // specialize their implementation and forward their constant value directly
-  // to the inputs of kernels that consume them.
-  for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
-    for (size_t i = 0; i < kernel_state.output_locations.size(); ++i) {
-      Entry& input = inputs[kernel_state.output_locations[i]];
-      input.state = Entry::State::HAS_CONST_TENSOR;
-      input.const_tensor = &kernel_state.const_tensor;
-    }
-  }
-
   // Execute the kernels one-at-a-time in topological order.
   for (size_t i = 0; i < kernels_.size(); ++i) {
     const KernelState& kernel_state = kernels_[i];
@@ -355,21 +306,8 @@ class SingleThreadedExecutorImpl : public Executor {
     input_alloc_attrs.clear();
     input_alloc_attrs.resize(num_inputs);
     for (size_t j = 0; j < num_inputs; ++j) {
-      Entry& input = inputs[input_start_index + j];
-      switch (input.state) {
-        case Entry::State::HAS_CONST_TENSOR:
-          // NOTE(mrry): This `const_cast` is necessary because `TensorValue`
-          // stores a non-const `Tensor*`, and relies on the `OpKernelContext`
-          // accessors making dynamic checks that prevent using an immutable
-          // tensor as a mutable tensor.
-          node_inputs[j].tensor = const_cast<Tensor*>(input.const_tensor);
-          break;
-        case Entry::State::HAS_VALUE:
-          node_inputs[j].tensor = input.val.get();
-          break;
-        default:
-          DCHECK(false) << "Input did not have a valid value.";
-      }
+      auto t = inputs[input_start_index + j].get();
+      node_inputs[j].tensor = t;
       input_alloc_attrs[j] = input_alloc_attrs_[input_start_index + j];
     }
     params.op_kernel = kernel_state.kernel;
@@ -378,11 +316,41 @@ class SingleThreadedExecutorImpl : public Executor {
 
     // Actually execute the kernel.
     device->Compute(kernel_state.kernel, &ctx);
-    TF_RETURN_IF_ERROR(ctx.status());
+
+    if (!ctx.status().ok()) {
+      // On failure, we must manually free all intermediate tensors. We have
+      // already freed all the inputs for kernels up to (but not including)
+      // the `i`th kernel. We scan through the previously executed kernels and
+      // destroy any tensors that were destined to be the input for a kernel
+      // that has not yet executed.
+      for (size_t j = 0; j < arg_output_locations_.size(); ++j) {
+        for (size_t output_location : arg_output_locations_[j]) {
+          if (output_location >= input_start_index) {
+            // Only destroy an output location if it is an input to an
+            // operation that has not yet executed.
+            inputs[output_location].Destroy();
+          }
+        }
+      }
+      for (size_t j = 0; j < i; ++j) {
+        const KernelState& executed_kernel_state = kernels_[j];
+        for (size_t k = 0; k < executed_kernel_state.num_outputs; ++k) {
+          for (size_t output_location :
+               executed_kernel_state.output_locations[k]) {
+            if (output_location >= input_start_index) {
+              // Only destroy an output location if it is an input to an
+              // operation that has not yet executed.
+              inputs[output_location].Destroy();
+            }
+          }
+        }
+      }
+      return ctx.status();
+    }
 
     // Free the inputs to the current kernel.
     for (size_t j = 0; j < num_inputs; ++j) {
-      inputs[input_start_index + j].ClearVal();
+      inputs[input_start_index + j].Destroy();
    }
 
     // Forward the outputs of the kernel to the inputs of subsequent kernels.
@@ -395,15 +363,11 @@ class SingleThreadedExecutorImpl : public Executor {
         for (size_t k = 0; k < num_destinations - 1; ++k) {
           // TODO(mrry): Validate that the types match the expected values or
           // ensure that the necessary validation has already happened.
-          Entry& input = inputs[kernel_state.output_locations[j][k]];
-          input.state = Entry::State::HAS_VALUE;
-          input.val.Init(*val.tensor);
+          inputs[kernel_state.output_locations[j][k]].Init(*val.tensor);
         }
         // Move `arg` to the last consumer to avoid the cost of copying it.
-        Entry& input =
-            inputs[kernel_state.output_locations[j][num_destinations - 1]];
-        input.state = Entry::State::HAS_VALUE;
-        input.val.Init(std::move(*val.tensor));
+        inputs[kernel_state.output_locations[j][num_destinations - 1]].Init(
+            std::move(*val.tensor));
       }
       delete val.tensor;
     }
@@ -442,7 +406,7 @@ class SingleThreadedExecutorImpl : public Executor {
 
     // For the `j`th output of `kernel`, `output_locations[j]` contains the
     // locations in the flat `inputs` vector to which that output must be
-    // copied. See comment at the beginning of `Run()` for details.
+    // copied. See comment at the beginning of `RunAsync()` for details.
     std::vector<std::vector<size_t>> output_locations;  // Length = `num_outputs`.
 
@@ -457,33 +421,6 @@ class SingleThreadedExecutorImpl : public Executor {
   std::vector<std::vector<size_t>> arg_output_locations_;  // Length = `num_args`.
 
-  // Represents cached graph structure state for each kernel that produces
-  // a single constant-valued tensor.
-  struct ConstTensorKernelState {
-    // The kernel object. Not owned.
-    //
-    // This pointer is managed by `params_.create_kernel()` and
-    // `params_.delete_kernel()`.
-    OpKernel* kernel;
-
-    // The cached value of `kernel->const_tensor()`.
-    //
-    // NOTE: We keep a `Tensor` rather than a `const Tensor*` here in order to
-    // keep the reference count on the underlying buffer above 1. Otherwise, a
-    // kernel could interpret the input as a forwardable tensor, and mutate the
-    // underlying constant tensor.
-    Tensor const_tensor;
-
-    // For the single output of `kernel`, `output_locations` contains the
-    // locations in the flat `inputs` vector to which that output must be
-    // copied. See comment at the beginning of `Run()` for details.
-    std::vector<size_t> output_locations;  // Length = `num_outputs`.
-
-    // Memory space information for the single output of `kernel`.
-    AllocatorAttributes output_alloc_attr;
-  };
-  std::vector<ConstTensorKernelState> const_tensor_kernels_;
-
   // Memory space information for each input. This information is stored in the
   // same order as the flat `inputs` vector. See comment at the beginning of
   // `RunAsync()` for details.
diff --git a/tensorflow/core/kernels/data/single_threaded_executor_test.cc b/tensorflow/core/kernels/data/single_threaded_executor_test.cc
index 7342ac690ab..e1f8a399c6f 100644
--- a/tensorflow/core/kernels/data/single_threaded_executor_test.cc
+++ b/tensorflow/core/kernels/data/single_threaded_executor_test.cc
@@ -27,7 +27,6 @@ limitations under the License.
 #include "tensorflow/core/framework/rendezvous.h"
 #include "tensorflow/core/framework/versions.pb.h"
 #include "tensorflow/core/graph/algorithm.h"
-#include "tensorflow/core/graph/testlib.h"
 #include "tensorflow/core/lib/core/status_test_util.h"
 #include "tensorflow/core/lib/random/simple_philox.h"
 #include "tensorflow/core/lib/strings/strcat.h"
@@ -300,36 +299,6 @@ BENCHMARK(BM_executor)->ArgPair(8192, 32);
 // Tall fat graph
 BENCHMARK(BM_executor)->ArgPair(1024, 1024);
 
-static void BM_const_identity(int iters, int width, int outputs_per_const) {
-#ifdef PLATFORM_GOOGLE
-  BenchmarkUseRealTime();
-#endif  // PLATFORM_GOOGLE
-  Graph* g = new Graph(OpRegistry::Global());
-  for (int i = 0; i < width; ++i) {
-    Tensor i_t(i);
-    Node* const_node = test::graph::Constant(g, i_t);
-    for (int j = 0; j < outputs_per_const; ++j) {
-      test::graph::Identity(g, const_node);
-    }
-  }
-  FixupSourceAndSinkEdges(g);
-#ifdef PLATFORM_GOOGLE
-  SetBenchmarkLabel(
-      strings::StrCat("Nodes = ", (1 + outputs_per_const) * width));
-  SetBenchmarkItemsProcessed((1 + outputs_per_const) * width *
-                             static_cast<int64>(iters));
-#endif  // PLATFORM_GOOGLE
-  test::Benchmark("cpu", g, nullptr, nullptr, nullptr,
-                  "SINGLE_THREADED_EXECUTOR")
-      .Run(iters);
-}
-
-// Graph with actual op execution.
-BENCHMARK(BM_const_identity)->ArgPair(1, 1);
-BENCHMARK(BM_const_identity)->ArgPair(1, 100);
-BENCHMARK(BM_const_identity)->ArgPair(100, 1);
-BENCHMARK(BM_const_identity)->ArgPair(100, 100);
-
 // TODO(mrry): This benchmark currently crashes with a use-after free, because
 // test::Benchmark::RunWithArgs() assumes that the executor will take ownership
 // of the given graph, *and* keep its nodes (`x`, `y` and `z`) alive for the
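
The heart of this change is in `RunAsync()`: intermediate values live in one
flat `inputs` vector of manually constructed slots, each kernel output is
copied into the precomputed locations of its consumers, and the value is moved
into its last consumer to save one copy. The stand-alone sketch below
illustrates that slot lifecycle. It is a minimal rendering, not the TensorFlow
implementation: `Slot` is a hand-rolled substitute for the
`Init()`/`get()`/`Destroy()` interface the patch uses (the interface of
`gtl::ManualConstructor`), and `std::string` stands in for `Tensor`.

#include <cstddef>
#include <iostream>
#include <new>
#include <string>
#include <utility>
#include <vector>

// Raw storage whose contained value is constructed and destroyed by hand,
// so a flat std::vector<Slot<T>> can hold not-yet-produced input slots.
template <typename T>
class Slot {
 public:
  template <typename... Args>
  void Init(Args&&... args) {
    // Placement-new a T into the raw buffer.
    new (&storage_) T(std::forward<Args>(args)...);
  }
  T* get() { return reinterpret_cast<T*>(&storage_); }
  void Destroy() { get()->~T(); }  // Must be paired with a prior Init().

 private:
  alignas(T) unsigned char storage_[sizeof(T)];
};

int main() {
  // One flat vector of slots indexed by precomputed output locations,
  // mirroring `std::vector<ManualConstructor<Tensor>> inputs` in the patch.
  std::vector<Slot<std::string>> inputs(3);
  const std::vector<std::size_t> output_locations = {0, 1, 2};

  std::string produced = "kernel output";
  // Copy the produced value to all but the last consumer...
  for (std::size_t k = 0; k + 1 < output_locations.size(); ++k) {
    inputs[output_locations[k]].Init(produced);
  }
  // ...and move it into the last consumer, as the executor does with
  // `Init(std::move(*val.tensor))`.
  inputs[output_locations.back()].Init(std::move(produced));

  for (std::size_t loc : output_locations) {
    std::cout << loc << ": " << *inputs[loc].get() << "\n";
    inputs[loc].Destroy();  // Free each input once its consumer has run.
  }
  return 0;
}

Because a slot is raw storage rather than an owning object, no destructor runs
automatically, which is what forces the explicit cleanup pass on the error
path.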
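
That manual lifetime management also explains the error-handling block the
patch adds after `device->Compute()`: when kernel `i` fails, any slot at or
beyond its `input_start_index`, including kernel `i`'s own inputs, may still
hold a live value that no later kernel will ever consume. Below is a hedged
sketch of that scan; `KernelState` here is a pared-down stand-in for the
executor's bookkeeping, and `CleanupAfterFailure` and `slot_live` are
illustrative names (the real code calls `inputs[loc].Destroy()` directly and
walks `arg_output_locations_` the same way).

#include <cstddef>
#include <vector>

struct KernelState {
  std::size_t input_start_index = 0;  // First slot of this kernel's inputs.
  std::size_t num_outputs = 0;
  // output_locations[k] lists the input slots fed by the k-th output.
  std::vector<std::vector<std::size_t>> output_locations;
};

// Destroy every value forwarded to a kernel that has not yet run. Slots below
// kernels[failed].input_start_index were already consumed and freed by kernels
// 0..failed-1; everything at or above it is still live.
void CleanupAfterFailure(std::size_t failed,
                         const std::vector<KernelState>& kernels,
                         std::vector<bool>& slot_live) {
  const std::size_t input_start_index = kernels[failed].input_start_index;
  for (std::size_t j = 0; j < failed; ++j) {
    for (std::size_t k = 0; k < kernels[j].num_outputs; ++k) {
      for (std::size_t loc : kernels[j].output_locations[k]) {
        if (loc >= input_start_index && slot_live[loc]) {
          slot_live[loc] = false;  // Stand-in for inputs[loc].Destroy().
        }
      }
    }
  }
}

int main() {
  // Kernel 0 forwarded its single output to slots 0 and 2; kernel 1 (whose
  // inputs start at slot 0) then failed, so both slots must be destroyed.
  std::vector<KernelState> kernels(2);
  kernels[0].num_outputs = 1;
  kernels[0].output_locations = {{0, 2}};
  kernels[1].input_start_index = 0;
  std::vector<bool> slot_live = {true, false, true};
  CleanupAfterFailure(/*failed=*/1, kernels, slot_live);
  return 0;
}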