[XLA:CPU] Atomically enqueue tuple buffers for outfeed.

Previously, another thread could interleave its own enqueue between the
per-buffer enqueues performed by a tuple-outfeeding thread. This change
enqueues all of a tuple's buffers as a single atomic unit.

PiperOrigin-RevId: 163781804
This commit is contained in:
Chris Leary 2017-07-31 19:10:37 -07:00 committed by TensorFlower Gardener
parent b882d686ff
commit 9d56130881
8 changed files with 115 additions and 48 deletions

View File

@ -130,5 +130,6 @@ void __xla_cpu_runtime_ReleaseOutfeedBufferAfterPopulation(
xla::cpu::runtime::XfeedManager* xfeed = xla::cpu::runtime::GetXfeedManager();
xla::StatusOr<xla::Shape> shape =
xla::llvm_ir::DecodeSelfDescribingShapeConstant(shape_ptr, shape_length);
xfeed->outfeed()->ReleaseCurrentBuffer(buffer_length, buffer_ptr, shape);
xfeed->outfeed()->ReleaseCurrentBuffer(buffer_length, buffer_ptr,
std::move(shape));
}

View File

@ -36,7 +36,7 @@ void XfeedQueueManager::Reset() {
enqueued_buffers_.clear();
}
void XfeedQueueManager::EnqueueBuffers(
void XfeedQueueManager::EnqueueBuffersAtomically(
tensorflow::gtl::ArraySlice<XfeedBuffer*> buffers) {
tensorflow::mutex_lock l(mu_);
bool was_empty = enqueued_buffers_.empty();

View File

@ -63,7 +63,8 @@ class XfeedQueueManager {
// called when the buffer will no longer be accessed by the XfeedManager,
// either as a result of a call to Reset or because the runtime has dequeued
// and used the buffer.
void EnqueueBuffers(tensorflow::gtl::ArraySlice<XfeedBuffer*> buffers);
void EnqueueBuffersAtomically(
tensorflow::gtl::ArraySlice<XfeedBuffer*> buffers);
// Blocks until the queue is non-empty, then returns the buffer at the head of
// the queue. Sets the current buffer to be the returned buffer. It is an

View File

@ -87,8 +87,8 @@ TEST_F(InfeedManagerTest, SingleThreadedSequential) {
cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager();
xfeed->infeed()->EnqueueBuffers({a});
xfeed->infeed()->EnqueueBuffers({b});
xfeed->infeed()->EnqueueBuffersAtomically({a});
xfeed->infeed()->EnqueueBuffersAtomically({b});
ProcessNextBuffer(a->length());
ProcessNextBuffer(b->length());
}
@ -99,9 +99,9 @@ TEST_F(InfeedManagerTest, SingleThreadedInterleaved) {
cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager();
xfeed->infeed()->EnqueueBuffers({a});
xfeed->infeed()->EnqueueBuffersAtomically({a});
ProcessNextBuffer(a->length());
xfeed->infeed()->EnqueueBuffers({b});
xfeed->infeed()->EnqueueBuffersAtomically({b});
ProcessNextBuffer(b->length());
}
@ -122,7 +122,7 @@ TEST_F(InfeedManagerTest, MultiThreaded) {
}
}
TestInfeedBuffer* a = new TestInfeedBuffer(length);
xfeed->infeed()->EnqueueBuffers({a});
xfeed->infeed()->EnqueueBuffersAtomically({a});
});
ProcessNextBuffer(length);
@ -131,7 +131,7 @@ TEST_F(InfeedManagerTest, MultiThreaded) {
TEST_F(InfeedManagerTest, OutfeedWrongShape) {
TestInfeedBuffer* b = new TestInfeedBuffer(32, /*expect_shape_match=*/false);
cpu::runtime::XfeedManager* xfeed = cpu::runtime::GetXfeedManager();
xfeed->outfeed()->EnqueueBuffers({b});
xfeed->outfeed()->EnqueueBuffersAtomically({b});
ProcessNextOutfeedBuffer(32, ShapeUtil::MakeShape(U8, {33}));
}

View File

@ -111,9 +111,9 @@ Status CpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
// infeed manager.
std::vector<cpu::runtime::XfeedBuffer*> buffers;
buffers.reserve(literal.tuple_literals_size());
auto cleanup = tensorflow::gtl::MakeCleanup([buffers]() {
auto cleanup = tensorflow::gtl::MakeCleanup([&buffers]() {
for (cpu::runtime::XfeedBuffer* b : buffers) {
b->Done(ShapeUtil::MakeNil());
b->Done(Cancelled("Failed to infeed buffer to device."));
}
});
@ -128,7 +128,7 @@ Status CpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
}
cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager();
xfeed_manager->infeed()->EnqueueBuffers(buffers);
xfeed_manager->infeed()->EnqueueBuffersAtomically(buffers);
cleanup.release();
return Status::OK();
@ -141,7 +141,7 @@ Status CpuTransferManager::TransferBufferToInfeed(se::StreamExecutor* executor,
TransferBufferToInfeedInternal(executor, size, source));
cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager();
xfeed_manager->infeed()->EnqueueBuffers({buffer});
xfeed_manager->infeed()->EnqueueBuffersAtomically({buffer});
return Status::OK();
}
@ -166,7 +166,7 @@ CpuTransferManager::TransferBufferToInfeedInternal(se::StreamExecutor* executor,
/*source=*/source, queued_buffer->device_memory());
if (!s.ok()) {
queued_buffer->Done(ShapeUtil::MakeNil());
queued_buffer->Done(s);
return s;
}
return queued_buffer;
@ -186,8 +186,8 @@ Status CpuTransferManager::TransferLiteralFromOutfeed(
Literal::CreateFromDimensions(literal_shape.element_type(), dimensions);
literal->Swap(empty.get());
TF_ASSIGN_OR_RETURN(Shape received_shape,
TransferBufferFromOutfeed(
executor, size, literal->MutableInternalData()));
TransferArrayBufferFromOutfeed(
executor, literal->MutableInternalData(), size));
TF_RET_CHECK(ShapeUtil::Compatible(received_shape, literal->shape()))
<< "Shape received from outfeed "
<< ShapeUtil::HumanString(received_shape)
@ -204,6 +204,7 @@ Status CpuTransferManager::TransferLiteralFromOutfeed(
}
std::vector<std::unique_ptr<Literal>> elements;
std::vector<std::pair<void*, int64>> buffer_data;
for (int64 i = 0; i < literal_shape.tuple_shapes_size(); ++i) {
const Shape& tuple_element_shape =
ShapeUtil::GetTupleElementShape(literal_shape, i);
@ -215,48 +216,88 @@ Status CpuTransferManager::TransferLiteralFromOutfeed(
tuple_element_shape.dimensions().size());
auto empty = Literal::CreateFromDimensions(
tuple_element_shape.element_type(), dimensions);
TF_ASSIGN_OR_RETURN(
Shape received_shape,
TransferBufferFromOutfeed(executor,
GetByteSizeRequirement(tuple_element_shape),
empty->MutableInternalData()));
TF_RET_CHECK(ShapeUtil::Compatible(received_shape, tuple_element_shape))
<< "Shape received from outfeed "
<< ShapeUtil::HumanString(received_shape)
<< " did not match the shape that was requested for outfeed: "
<< ShapeUtil::HumanString(tuple_element_shape);
TF_RET_CHECK(GetByteSizeRequirement(tuple_element_shape) ==
GetByteSizeRequirement(received_shape));
*empty->mutable_shape() = received_shape;
int64 size = GetByteSizeRequirement(tuple_element_shape);
buffer_data.push_back({empty->MutableInternalData(), size});
elements.push_back(std::move(empty));
}
TF_ASSIGN_OR_RETURN(Shape received_shape,
TransferTupleBuffersFromOutfeed(executor, buffer_data));
TF_RET_CHECK(ShapeUtil::Compatible(received_shape, literal_shape))
<< "Shape received from outfeed "
<< ShapeUtil::HumanString(received_shape)
<< " did not match the shape that was requested for outfeed: "
<< ShapeUtil::HumanString(literal_shape);
TF_RET_CHECK(GetByteSizeRequirement(literal_shape) ==
GetByteSizeRequirement(received_shape));
for (int64 i = 0; i < literal_shape.tuple_shapes_size(); ++i) {
*elements[i]->mutable_shape() = received_shape.tuple_shapes(i);
}
auto result = Literal::MakeTupleOwned(std::move(elements));
literal->Swap(result.get());
TF_RET_CHECK(ShapeUtil::Equal(literal->shape(), literal_shape));
return Status::OK();
}
StatusOr<Shape> CpuTransferManager::TransferBufferFromOutfeed(
perftools::gputools::StreamExecutor* executor, int64 size,
void* destination) {
if (size > std::numeric_limits<int32>::max()) {
return InvalidArgument("Outfeed shape is too large: needs %lld bytes",
size);
// Transfers a tuple's element buffers from the device's outfeed in one
// atomic batch. Each (destination, size-in-bytes) pair in buffer_data
// receives one tuple element. On success, returns the tuple shape that was
// transferred. Thin wrapper over TransferBuffersFromOutfeedInternal with
// is_tuple=true.
StatusOr<Shape> CpuTransferManager::TransferTupleBuffersFromOutfeed(
perftools::gputools::StreamExecutor* executor,
tensorflow::gtl::ArraySlice<std::pair<void*, int64>> buffer_data) {
return TransferBuffersFromOutfeedInternal(executor, buffer_data,
/*is_tuple=*/true);
}
// Transfers a single (non-tuple) array buffer of size_bytes from the
// device's outfeed into destination. On success, returns the array shape
// that was transferred. Thin wrapper over
// TransferBuffersFromOutfeedInternal with is_tuple=false.
StatusOr<Shape> CpuTransferManager::TransferArrayBufferFromOutfeed(
perftools::gputools::StreamExecutor* executor, void* destination,
int64 size_bytes) {
return TransferBuffersFromOutfeedInternal(
executor, {{destination, size_bytes}}, /*is_tuple=*/false);
}
StatusOr<Shape> CpuTransferManager::TransferBuffersFromOutfeedInternal(
perftools::gputools::StreamExecutor* executor,
tensorflow::gtl::ArraySlice<std::pair<void*, int64>> buffer_data,
bool is_tuple) {
std::vector<std::unique_ptr<CpuOutfeedBuffer>> buffers;
for (auto b : buffer_data) {
int64 size = b.second;
if (size > std::numeric_limits<int32>::max()) {
return InvalidArgument("Outfeed shape is too large: needs %lld bytes",
size);
}
if (size <= 0) {
return InvalidArgument("Outfeed shape must have positive size; got %lld",
size);
}
int32 size_32 = static_cast<int32>(size);
VLOG(2)
<< "Enqueueing outfeed buffer (for the device to populate) of length "
<< size_32 << "B";
buffers.emplace_back(MakeUnique<CpuOutfeedBuffer>(b.first, size_32));
}
if (size <= 0) {
return InvalidArgument("Outfeed shape must have positive size; got %lld",
size);
std::vector<cpu::runtime::XfeedBuffer*> buffer_pointers;
buffer_pointers.reserve(buffers.size());
for (auto& b : buffers) {
buffer_pointers.push_back(b.get());
}
int32 size_32 = static_cast<int32>(size);
cpu::runtime::XfeedManager* xfeed_manager = cpu::runtime::GetXfeedManager();
CpuOutfeedBuffer buffer(destination, size_32);
VLOG(2) << "Enqueueing outfeed buffer (for the device to populate) of length "
<< size_32 << "B";
xfeed_manager->outfeed()->EnqueueBuffers({&buffer});
xfeed_manager->outfeed()->EnqueueBuffersAtomically(buffer_pointers);
VLOG(2) << "Waiting for buffer to be notified as populated.";
return buffer.WaitForNotification();
std::vector<Shape> outfed_shapes;
for (auto& buffer : buffers) {
TF_ASSIGN_OR_RETURN(Shape outfed_shape, buffer->WaitForNotification());
outfed_shapes.push_back(std::move(outfed_shape));
}
if (is_tuple) {
return ShapeUtil::MakeTupleShape(outfed_shapes);
}
TF_RET_CHECK(outfed_shapes.size() == 1);
return std::move(outfed_shapes[0]);
}
} // namespace xla

View File

@ -23,6 +23,7 @@ limitations under the License.
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/statusor.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/gtl/array_slice.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
#include "tensorflow/core/platform/types.h"
@ -51,10 +52,23 @@ class CpuTransferManager : public GenericTransferManager {
perftools::gputools::StreamExecutor* executor, int64 size,
const void* source);
// On success, returns the shape that was transferred from the outfeed.
StatusOr<Shape> TransferBufferFromOutfeed(
perftools::gputools::StreamExecutor* executor, int64 size,
void* destination);
// Helper that transfers a tuple of element buffers from the device's outfeed.
StatusOr<Shape> TransferTupleBuffersFromOutfeed(
perftools::gputools::StreamExecutor* executor,
tensorflow::gtl::ArraySlice<std::pair<void*, int64>> buffer_data);
// Helper that transfers an array buffer from the device's outfeed.
StatusOr<Shape> TransferArrayBufferFromOutfeed(
perftools::gputools::StreamExecutor* executor, void* destination,
int64 size_bytes);
// On success, returns the shape that was transferred from the outfeed -- if
// is_tuple is true, the returned shape will be a tuple of the returned shapes
// for the given buffers.
StatusOr<Shape> TransferBuffersFromOutfeedInternal(
perftools::gputools::StreamExecutor* executor,
tensorflow::gtl::ArraySlice<std::pair<void*, int64>> buffer_data,
bool is_tuple);
TF_DISALLOW_COPY_AND_ASSIGN(CpuTransferManager);
};

View File

@ -109,6 +109,15 @@ Status FailedPrecondition(const char* format, ...) {
return WithLogBacktrace(tensorflow::errors::FailedPrecondition(message));
}
// Returns a CANCELLED status whose message is built printf-style from
// `format` and the trailing varargs, with a log backtrace attached via
// WithLogBacktrace. Mirrors the other status-helper factories in this file
// (e.g. FailedPrecondition, ResourceExhausted).
Status Cancelled(const char* format, ...) {
string message;
// Standard va_list dance: va_start must precede Appendv, and va_end must
// run before returning to avoid undefined behavior.
va_list args;
va_start(args, format);
tensorflow::strings::Appendv(&message, format, args);
va_end(args);
return WithLogBacktrace(tensorflow::errors::Cancelled(message));
}
Status ResourceExhausted(const char* format, ...) {
string message;
va_list args;

View File

@ -171,6 +171,7 @@ Status InvalidArgument(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status Unimplemented(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status InternalError(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status FailedPrecondition(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status Cancelled(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status ResourceExhausted(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status NotFound(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);
Status Unavailable(const char* format, ...) TF_PRINTF_ATTRIBUTE(1, 2);