Force an allocation, followed by a copy, in OpKernel::set_output when the

output is marked for allocation via Scoped Allocator.

There are many opkernels that do not allocate their output, rather they call
`set_output` on a tensor obtained from elsewhere, such as the input.  If such
opkernels' output is marked for allocation via ScopedAllocator, but never
allocated, the output would not reach its supposed place in the backing buffer
allocated by the scoped allocator.  This prevented the ScopedAllocator Grappler
optimization from functioning properly, and caused bound check failures in
ScopedAllocatorConcat.

After this change, if an opkernel calls `set_output` on an output that is
supposed to be allocated via ScopedAllocator, `set_output` will allocated the
output with appropriate AllocatorAttributes and then deep copy the provided
tensor to the newly allocated output.

PiperOrigin-RevId: 243317289
This commit is contained in:
Ayush Dubey 2019-04-12 13:01:06 -07:00 committed by TensorFlower Gardener
parent 8fa0c87bd2
commit a3d2ee6ada
2 changed files with 134 additions and 76 deletions
tensorflow/core

View File

@ -15,14 +15,13 @@ limitations under the License.
#include "tensorflow/core/framework/op_kernel.h"
#include <cstdlib>
#include <cstring>
#include <mutex> // NOLINT
#include <unordered_map>
#include <utility>
#include <vector>
#include <cstdlib>
#include <cstring>
#include "tensorflow/core/framework/attr_value_util.h"
#include "tensorflow/core/framework/device_attributes.pb.h"
#include "tensorflow/core/framework/graph.pb_text.h"
@ -796,24 +795,51 @@ Status OpKernelContext::set_output(StringPiece name, const Tensor& tensor) {
void OpKernelContext::set_output(int index, const Tensor& tensor) {
DCHECK_GE(index, 0);
DCHECK_LT(index, outputs_.size());
DCHECK(!IsRefType(params_->op_kernel->output_type(index)));
const DataType type = params_->op_kernel->output_type(index);
DCHECK(!IsRefType(type));
DCHECK_EQ(mutable_output(index), nullptr);
record_tensor_reference(tensor);
outputs_[index] = TensorValue(new Tensor(tensor));
if (track_allocations() && tensor.TotalBytes() > 0) {
mutex_lock l(stats_mu_);
if (!temp_tensor_buffer_and_size_) {
return;
}
auto it = std::find_if(temp_tensor_buffer_and_size_->begin(),
temp_tensor_buffer_and_size_->end(),
[&tensor](const std::pair<const void*, int64>& e) {
return e.first == static_cast<const void*>(
tensor.tensor_data().data());
});
if (it != temp_tensor_buffer_and_size_->end()) {
temp_memory_allocated_ -= it->second;
temp_tensor_buffer_and_size_->erase(it);
const bool never_forward =
(params_->forward_from_array != nullptr &&
params_->forward_from_array[index] == Params::kNeverForward);
if (never_forward) {
// This output was marked to not be forwarded either during graph
// construction or grappler passes. Force an allocation and copy input to
// output.
AllocatorAttributes allocator_attributes = output_alloc_attr(index);
VLOG(1) << "OpKernelContext set_output index " << index << " tensor "
<< tensor.DebugString() << " never_forward " << never_forward
<< " params_->forward_from_array[index] "
<< params_->forward_from_array[index] << " alloc_attr.scope_id "
<< allocator_attributes.scope_id;
auto new_tensor = MakeUnique<Tensor>();
Status s = allocate_tensor(type, tensor.shape(), new_tensor.get(),
allocator_attributes);
TF_DCHECK_OK(s);
device()->CopyTensorInSameDevice(&tensor, new_tensor.get(),
op_device_context(), [](const Status&) {});
outputs_[index] = TensorValue(new_tensor.release());
} else {
// Input can be forwarded to output; incref on `tensor` and set output at
// `index` to this tensor.
record_tensor_reference(tensor);
outputs_[index] = TensorValue(new Tensor(tensor));
if (track_allocations() && tensor.TotalBytes() > 0) {
mutex_lock l(stats_mu_);
if (!temp_tensor_buffer_and_size_) {
return;
}
const auto it = std::find_if(
temp_tensor_buffer_and_size_->begin(),
temp_tensor_buffer_and_size_->end(),
[&tensor](const std::pair<const void*, int64>& e) {
return e.first ==
static_cast<const void*>(tensor.tensor_data().data());
});
if (it != temp_tensor_buffer_and_size_->end()) {
temp_memory_allocated_ -= it->second;
temp_tensor_buffer_and_size_->erase(it);
}
}
}
}
@ -1007,10 +1033,8 @@ void LoadDynamicKernelsInternal() {
bool override_abi_check =
strcmp(getenv("TF_REALLY_LOAD_UNSAFE_PACKAGES"), "1") == 0;
string bazel_kernel_dir = io::JoinPath(env->GetRunfilesDir(),
"tensorflow",
"core",
"kernels");
string bazel_kernel_dir =
io::JoinPath(env->GetRunfilesDir(), "tensorflow", "core", "kernels");
std::vector<string> files;
Status s_kernel_dir = env->GetChildren(bazel_kernel_dir, &files);
if (s_kernel_dir.ok()) {

View File

@ -71,11 +71,13 @@ class ScopedAllocatorOptimizerTest : public ::testing::Test {
\ / \ /
s1 s2
| |
(i1) (i2) if forward is true
| |
a1 a2
| |
r1 r2
*/
void BuildAbsGraph(GraphDef* graph_def) {
void BuildAbsGraph(GraphDef* graph_def, bool forward) {
tensorflow::Scope s = tensorflow::Scope::NewRootScope();
s = s.WithDevice("/job:localhost/replica:0/task:0/device:CPU:0");
@ -87,8 +89,16 @@ class ScopedAllocatorOptimizerTest : public ::testing::Test {
ops::Const<float>(s.WithOpName("c"), {-5.0, -2.0, 0.0, -2.0}, {2, 2});
Output s1 = ops::Add(s.WithOpName("s1"), a, b);
Output s2 = ops::Add(s.WithOpName("s2"), b, c);
Output a1 = ops::Abs(s.WithOpName("a1"), s1);
Output a2 = ops::Abs(s.WithOpName("a2"), s2);
Output int1, int2;
if (forward) {
int1 = ops::Identity(s.WithOpName("i1"), s1);
int2 = ops::Identity(s.WithOpName("i2"), s2);
} else {
int1 = s1;
int2 = s2;
}
Output a1 = ops::Abs(s.WithOpName("a1"), int1);
Output a2 = ops::Abs(s.WithOpName("a2"), int2);
Output r1 = ops::Reshape(s.WithOpName("r1"), a1, {1, 4});
Output r2 = ops::Reshape(s.WithOpName("r2"), a2, {4, 1});
TF_CHECK_OK(s.ToGraphDef(graph_def));
@ -105,13 +115,67 @@ class ScopedAllocatorOptimizerTest : public ::testing::Test {
}
}
}
// Constructs a graph by calling BuildAbsGraph, then executes it and returns
// r1, r2, and scoped_allocator_1_2_Abs:0.
void BuildAndExecuteAbsGraph(bool forward, std::vector<Tensor>* outputs) {
GrapplerItem item;
BuildAbsGraph(&item.graph, forward);
// Turn off all optimization except the ScopedAllocatorOptimizer
// to avoid anything that would alter the expected graph input/output,
// e.g. by constant folding away all calculations.
ConfigProto config;
GraphOptions* gopt = config.mutable_graph_options();
OptimizerOptions* opts = gopt->mutable_optimizer_options();
opts->set_do_common_subexpression_elimination(false);
opts->set_do_constant_folding(false);
opts->set_do_function_inlining(false);
opts->set_opt_level(OptimizerOptions::L0);
RewriterConfig* rwcfg = gopt->mutable_rewrite_options();
rwcfg->clear_optimizers();
(*rwcfg->add_optimizers()) = "scoped_allocator";
rwcfg->mutable_scoped_allocator_opts()->add_enable_op("Abs");
std::unique_ptr<Session> session(CreateSession(item.graph, config));
// Request two targets: one fetch output and one non-fetched output.
std::vector<string> output_names = {"r1:0", "r2:0"};
if (!forward) {
output_names.push_back("scoped_allocator_1_2_Abs:0");
}
std::vector<std::pair<string, Tensor>> inputs;
std::vector<string> target_nodes = {};
Status s = session->Run(inputs, output_names, target_nodes, outputs);
TF_ASSERT_OK(s);
ASSERT_EQ(outputs->size(), forward ? 2 : 3);
}
// Validates that output[0] matches expected0 and outputs[1] matches
// expected1.
void ValidateValues(const std::vector<Tensor>& outputs,
const std::vector<float>& expected0,
const std::vector<float>& expected1) {
for (int oi = 0; oi < outputs.size(); ++oi) {
if (oi == 0) {
ASSERT_EQ(expected0.size(), outputs[oi].NumElements());
for (int i = 0; i < expected0.size(); ++i) {
EXPECT_EQ(expected0[i], outputs[oi].flat<float>()(i));
}
} else if (oi == 1) {
ASSERT_EQ(expected1.size(), outputs[oi].NumElements());
for (int i = 0; i < expected1.size(); ++i) {
EXPECT_EQ(expected1[i], outputs[oi].flat<float>()(i));
}
}
}
}
};
TEST_F(ScopedAllocatorOptimizerTest, UnaryRewriteOnly) {
// Tests that Rewrite of program with parallel unary Ops is done as
// anticipated.
GrapplerItem item;
BuildAbsGraph(&item.graph);
BuildAbsGraph(&item.graph, false);
SetShapes(&item.graph);
ScopedAllocatorOptions opts;
@ -164,57 +228,15 @@ TEST_F(ScopedAllocatorOptimizerTest, UnaryRewriteOnly) {
}
TEST_F(ScopedAllocatorOptimizerTest, UnaryExecute) {
// Constructs the same graph as UnaryRewriteOnly, but actually executes it.
GrapplerItem item;
BuildAbsGraph(&item.graph);
// Turn off all optimization except the ScopedAllocatorOptimizer
// to avoid anything that would alter the expected graph input/output,
// e.g. by constant folding away all calculations.
ConfigProto config;
GraphOptions* gopt = config.mutable_graph_options();
OptimizerOptions* opts = gopt->mutable_optimizer_options();
opts->set_do_common_subexpression_elimination(false);
opts->set_do_constant_folding(false);
opts->set_do_function_inlining(false);
opts->set_opt_level(OptimizerOptions::L0);
RewriterConfig* rwcfg = gopt->mutable_rewrite_options();
rwcfg->clear_optimizers();
(*rwcfg->add_optimizers()) = "scoped_allocator";
rwcfg->mutable_scoped_allocator_opts()->add_enable_op("Abs");
std::unique_ptr<Session> session(CreateSession(item.graph, config));
std::vector<std::pair<string, Tensor>> inputs;
// Request two targets: one fetch output and one non-fetched output.
std::vector<string> output_names = {"r1:0", "r2:0",
"scoped_allocator_1_2_Abs:0"};
std::vector<string> target_nodes = {};
// Builds the same graph as UnaryRewriteOnly but also executes it and
// validates the output.
std::vector<Tensor> outputs;
Status s = session->Run(inputs, output_names, target_nodes, &outputs);
TF_ASSERT_OK(s);
ASSERT_EQ(outputs.size(), 3);
std::vector<float> expected_r1({2, 2, 3, 3});
std::vector<float> expected_r2({4, 4, 3, 2});
BuildAndExecuteAbsGraph(false, &outputs);
// a + b == 2, -2, 3, 3
// b + c == -4, -4, 3, 2
for (int oi = 0; oi < outputs.size(); ++oi) {
for (int i = 0; i < outputs[oi].NumElements(); ++i) {
VLOG(1) << "output vec " << oi << " index " << i << " = "
<< outputs[oi].flat<float>()(i);
}
if (oi == 0) {
ASSERT_EQ(expected_r1.size(), outputs[oi].NumElements());
for (int i = 0; i < expected_r1.size(); ++i) {
EXPECT_EQ(expected_r1[i], outputs[oi].flat<float>()(i));
}
} else if (oi == 1) {
ASSERT_EQ(expected_r2.size(), outputs[oi].NumElements());
for (int i = 0; i < expected_r2.size(); ++i) {
EXPECT_EQ(expected_r2[i], outputs[oi].flat<float>()(i));
}
}
}
std::vector<float> expected_r1({2, 2, 3, 3});
std::vector<float> expected_r2({4, 4, 3, 2});
ValidateValues(outputs, expected_r1, expected_r2);
}
// Tests static ScopedAllocatorOptimizer::ExtendNodeAttr.
@ -239,6 +261,18 @@ TEST_F(ScopedAllocatorOptimizerTest, Extend) {
VLOG(0) << "nd2: " << nd2.DebugString();
}
TEST_F(ScopedAllocatorOptimizerTest, ForwardInputToOutput) {
// Test that kernels that forward the input to output using `set_output` work
// well with scoped allocator optimization.
std::vector<Tensor> outputs;
BuildAndExecuteAbsGraph(true, &outputs);
// a + b == 2, -2, 3, 3
// b + c == -4, -4, 3, 2
std::vector<float> expected_r1({2, 2, 3, 3});
std::vector<float> expected_r2({4, 4, 3, 2});
ValidateValues(outputs, expected_r1, expected_r2);
}
} // namespace
} // namespace grappler
} // namespace tensorflow