Do not invoke ScopedAllocator when an OpKernel calls allocate_temp with a

scope_id.

This change only has effect when running a TF graph/function with collective
ops and with ScopedAllocatorOptimizer enabled.

Before this change, if an OpKernel called allocate_temp with
AllocatorAttributes containing a meaningful scope id, and later called
set_output with this tensor, it would cause a double allocation from a single
use ScopedAllocatorInstance.

After this change, there are only 2 ways to cause an allocation from
ScopedAllocator:
1. allocate_output with AllocatorAttributes having a scope id.  This is the
most performant way and will not lead to extra copies.
2. set_output on an index which should have been allocated with scoped
allocator.  In this case, we check if the output has not already been allocated
and if not, we force an allocation and a copy.
All other calls to allocate_* in OpKernel will set scope_id to -1 to ensure
that we are not calling into ScopedAllocator.

The performance implication of this change is that kernels should call
allocate_output whenever possible to ensure no extra copies.  Any other call,
such as allocate_temp followed by set_output, will lead to an intermediate
memory allocation, and then a copy into the scope allocated buffer.

This change also removes calls to GetScopedAllocator inside memory tracking
branches.

PiperOrigin-RevId: 249888173
This commit is contained in:
Ayush Dubey 2019-05-24 13:22:39 -07:00 committed by TensorFlower Gardener
parent 30799cde19
commit ae96bb6c65
3 changed files with 152 additions and 3 deletions

View File

@ -730,6 +730,15 @@ Status OpKernelContext::allocate_output(int index, const TensorShape& shape,
const DataType type = params_->op_kernel->output_type(index);
DCHECK(!IsRefType(type));
DCHECK(mutable_output(index) == nullptr);
if (attr.scope_id > 0) {
if (!allocated_scope_ids_.insert(attr.scope_id).second) {
return errors::Internal(
"OpKernel ", params_->op_kernel->name(),
" called allocate_output at index ", index, " with scope_id ",
attr.scope_id,
" more than once. Try turning off the ScopedAllocator optimizer.");
}
}
auto output_tensor = MakeUnique<Tensor>();
Status s = allocate_tensor(type, shape, output_tensor.get(), attr);
if (s.ok()) {
@ -743,6 +752,22 @@ Status OpKernelContext::allocate_temp(
DataType type, const TensorShape& shape, Tensor* out_temp,
AllocatorAttributes allocator_attr,
const AllocationAttributes& allocation_attr) {
if (allocator_attr.scope_id > 0) {
// We do not allow ScopedAllocator calls from allocate_temp. Unlike
// allocate_persistent where we return an error if a kernel provides a
// meaningful scope_id, here we clear the scope_id and return a temporary
// buffer. This is because it is legal for a kernel to call allocate_temp
// and then set_output with the temp tensor.
//
// We achieve memory correctness by forcing an allocation in set_output and
// copying over the tensor from the temp buffer. Kernels which would like
// to avoid this performance penalty should switch to calling
// allocate_output.
VLOG(2) << "Warning: OpKernel " << params_->op_kernel->name()
<< " called allocate_temp with scope_id " << allocator_attr.scope_id
<< ". Switch to allocate_output to avoid performance penalty.";
allocator_attr.scope_id = -1;
}
Status s =
allocate_tensor(type, shape, out_temp, allocator_attr, allocation_attr);
if (track_allocations() && s.ok() && out_temp->TotalBytes() > 0) {
@ -763,6 +788,13 @@ Status OpKernelContext::allocate_persistent(DataType type,
PersistentTensor* out_persistent,
Tensor** out_tensor,
AllocatorAttributes attr) {
if (attr.scope_id > 0) {
// ScopedAllocator cannot be used for persistent tensors, because these
// tensors may persist across kernel invocations/steps, whereas the backing
// tensor for the scoped allocator will be reallocated every step.
return errors::Internal(
"Unexpected call to allocate_persistent with scope_id ", attr.scope_id);
}
Tensor persistent;
Status s = allocate_tensor(type, shape, &persistent, attr);
if (s.ok()) {
@ -807,22 +839,38 @@ void OpKernelContext::set_output(int index, const Tensor& tensor) {
DCHECK(!IsRefType(type));
DCHECK_EQ(mutable_output(index), nullptr);
bool allocate_and_copy = false;
const bool never_forward =
(params_->forward_from_array != nullptr &&
params_->forward_from_array[index] == Params::kNeverForward);
if (never_forward) {
if (allocated_scope_ids_.find(output_alloc_attr(index).scope_id) ==
allocated_scope_ids_.end()) {
allocate_and_copy = true;
} else {
// The output at `index` must have been previously allocated via a call to
// `allocate_output(index, ...)`. That call would ensure that we return
// the correct slice of the ScopedAllocated buffer, so we do not
// re-allocate and copy here.
LOG(WARNING)
<< "OpKernel " << params_->op_kernel->name()
<< " called both allocate_output and set_output with scope_id "
<< output_alloc_attr(index).scope_id;
}
}
if (allocate_and_copy) {
// This output was marked to not be forwarded either during graph
// construction or grappler passes. Force an allocation and copy input to
// output.
AllocatorAttributes allocator_attributes = output_alloc_attr(index);
VLOG(1) << "OpKernelContext set_output index " << index << " tensor "
<< tensor.DebugString() << " never_forward " << never_forward
<< " params_->forward_from_array[index] "
<< params_->forward_from_array[index] << " alloc_attr.scope_id "
<< allocator_attributes.scope_id;
<< output_alloc_attr(index).scope_id;
auto new_tensor = MakeUnique<Tensor>();
Status s = allocate_tensor(type, tensor.shape(), new_tensor.get(),
allocator_attributes);
output_alloc_attr(index));
TF_DCHECK_OK(s);
device()->CopyTensorInSameDevice(&tensor, new_tensor.get(),
op_device_context(), [](const Status&) {});

View File

@ -18,6 +18,7 @@ limitations under the License.
#include <atomic>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>
@ -1291,6 +1292,9 @@ class OpKernelContext {
gtl::InlinedVector<WrappedAllocator, 4> wrapped_allocators_ GUARDED_BY(mu_);
gtl::InlinedVector<TensorValue, 4> outputs_;
// Keep track of calls to ScopeAllocator.
std::unordered_set<int32> allocated_scope_ids_;
// Constructed only if <params->record_tensor_accesses>.
ManualConstructor<UniqueTensorReferences> referenced_tensors_ GUARDED_BY(mu_);

View File

@ -18,6 +18,7 @@ limitations under the License.
#include <memory>
#include <utility>
#include <vector>
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/attr_value.pb.h"
#include "tensorflow/core/framework/attr_value_util.h"
@ -25,6 +26,7 @@ limitations under the License.
#include "tensorflow/core/framework/node_def_builder.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/tensor_shape.pb.h"
#include "tensorflow/core/framework/tensor_util.h"
#include "tensorflow/core/framework/types.pb.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status_test_util.h"
@ -401,6 +403,101 @@ TEST_F(OpKernelTest, InputDtype) {
delete params.device;
}
// A mock device that mimics the behavior of scoped allocator upon calling
// GetAllocator with a positive scope_id.
class ScopedAllocatorDevice : public DeviceBase {
public:
explicit ScopedAllocatorDevice(Env* env)
: DeviceBase(env),
scope_allocated_(false),
num_allocations_(0),
num_scoped_allocations_(0) {}
Allocator* GetAllocator(AllocatorAttributes attrs) override {
CHECK_LE(attrs.scope_id, 0);
num_allocations_++;
return cpu_allocator();
}
Allocator* GetScopedAllocator(AllocatorAttributes attrs,
int64 /*step_id*/) override {
CHECK_GT(attrs.scope_id, 0);
num_scoped_allocations_++;
if (scope_allocated_) {
return nullptr;
} else {
scope_allocated_ = true;
return cpu_allocator();
}
}
void CopyTensorInSameDevice(const Tensor* input_tensor, Tensor* output_tensor,
const DeviceContext* device_context,
StatusCallback done) override {
CHECK(input_tensor->NumElements() == output_tensor->NumElements());
tensor::DeepCopy(*input_tensor, output_tensor);
done(Status::OK());
}
// Return the count of calls to GetAllocator or GetScopedAllocator, depending
// on when scoped is false or true respectively. For testing purposes.
int num_allocations(bool scoped) {
if (scoped) {
return num_scoped_allocations_;
} else {
return num_allocations_;
}
}
private:
bool scope_allocated_;
int num_allocations_;
int num_scoped_allocations_;
};
// Test that a kernel which has an output marked for allocation via
// ScopedAllocator, which calls allocate_temp and set_output, does the right
// thing. In this case, the expected behavior is for allocate_temp to return
// a temporary buffer, and set_output to copy the contents of this temp buffer
// into the ScopedAllocator slice.
TEST_F(OpKernelTest, ScopedAllocationTest) {
Env* env = Env::Default();
OpKernelContext::Params params;
params.record_tensor_accesses = false;
auto sa_device = absl::make_unique<ScopedAllocatorDevice>(env);
params.device = sa_device.get();
Status status;
std::unique_ptr<OpKernel> op(CreateOpKernel(
DEVICE_CPU, params.device, cpu_allocator(),
CreateNodeDef("Test4", {DT_FLOAT}), TF_GRAPH_DEF_VERSION, &status));
EXPECT_TRUE(status.ok());
params.op_kernel = op.get();
AllocatorAttributes alloc_attrs;
alloc_attrs.scope_id = 1;
std::vector<AllocatorAttributes> output_alloc_attrs({alloc_attrs});
params.output_attr_array = output_alloc_attrs.data();
std::vector<int> forward_from({OpKernelContext::Params::kNeverForward});
params.forward_from_array = forward_from.data();
auto ctx = absl::make_unique<OpKernelContext>(&params);
EXPECT_EQ(sa_device->num_allocations(false), 0);
EXPECT_EQ(sa_device->num_allocations(true), 0);
Tensor temp1;
TF_EXPECT_OK(
ctx->allocate_temp(DT_FLOAT, TensorShape({8}), &temp1, alloc_attrs));
EXPECT_EQ(sa_device->num_allocations(false), 1);
EXPECT_EQ(sa_device->num_allocations(true), 0);
Tensor temp2;
alloc_attrs.scope_id = -1;
TF_EXPECT_OK(
ctx->allocate_temp(DT_FLOAT, TensorShape({4}), &temp2, alloc_attrs));
EXPECT_EQ(sa_device->num_allocations(false), 2);
EXPECT_EQ(sa_device->num_allocations(true), 0);
ctx->set_output(0, temp1);
EXPECT_EQ(sa_device->num_allocations(false), 2);
EXPECT_EQ(sa_device->num_allocations(true), 1);
}
class OpKernelBuilderTest : public ::testing::Test {
protected:
// Each attr is described by a "name|type|value".