Map Staging Area (#9686)

* Map Staging Area Operator

* Add map staging operators to the BUILD

* Don't be too clever about notifying waiting thread

* Add Staging Area Size op

* Use predicate form of condition_variable.wait

* Add a Staging Area clear op

* Add Peek op and capacity to the Staging Area

* map_stage_op.cpp -> map_stage_op.cc

* size should be int32

* Small improvements to staging test case

* Create BaseStagingArea

Containing functionality common to StagingArea and MapStagingArea

* Add MapStagingArea test cases

* Add an ordering test

* Rename get() to peek()

* Combine pop(key) and popitem() into get(key=None)

* Block on peek methods

Wait for the requested element to be inserted into the staging area.

* Use constructor initializer lists

* Arrange class typedefs, members and methods

Also make previously public helper methods like HasBoundedCapacity() and
IsFull() private.

* Better docs for StagingArea.clear()

* SYCL_DEVICE MapStage => OrderedMapStage

* Initializer list fix

* device('/gpu:0') => test.gpu_device_name()

* Copyright year to 2017

* Reverse keys before random shuffle

* Python docstring updates

* Fix stage op docs

* Order RecordInput correctly

* Updated MapStage op docs

* Capture index by value instead of reference

It's just an int.

* Add support for Staging Area memory limit

Mostly useful for constraining the amount of memory used for Staging
Areas placed on the GPU.

* Use key Tensor as key rather than int64

* Pass dtypes to StagingMap

* CamelCase types

* Support barrier operation on the MapStagingArea

- It is now possible to insert incomplete elements.
- Key std::map and std::unordered_map on scalar int64 Tensors
  (avoids unnecessary allocations)

* Remove shuffling to make test deterministic

* Use size_t for indices and positive quantities

* Fix lint errors

- autopep8 data_flow_ops.py indentation to 2 spaces.
- Alphabetically order data_flow lib dependencies in BUILD

* extra capacity() should be memory_limit()

* Run buildifier

* Fix OrderedMapIncompleteSize kernel name

* Fix device name in colocation test

Device name is "" if no GPU is present.

* Fix subtle size_t to int conversion bug

* Add #include <numeric> for windows builds

* Add #include <numeric> in stage_op.cc too
Simon Perkins 2017-05-20 00:06:37 +02:00 committed by Rasmus Munk Larsen
parent 47bcf2919f
commit 1705b099df
8 changed files with 2491 additions and 133 deletions
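For orientation, here is a minimal usage sketch of the MapStagingArea Python wrapper that these kernels back (the wrapper itself is added to data_flow_ops.py in this commit). It mirrors the API exercised by the test cases further down; treat it as a sketch rather than reference documentation:

from tensorflow.python.client import session
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import data_flow_ops

x = array_ops.placeholder(dtypes.float32)
pi = array_ops.placeholder(dtypes.int64)  # key to stage under
gi = array_ops.placeholder(dtypes.int64)  # key to retrieve

stager = data_flow_ops.MapStagingArea([dtypes.float32])
stage = stager.put(pi, [x], [0])  # store x at tuple index 0 under key pi
key, y = stager.get(gi)           # blocks until key gi is present, then pops it
size = stager.size()

with session.Session() as sess:
  sess.run(stage, feed_dict={x: 1.0, pi: 42})
  print(sess.run([key, y], feed_dict={gi: 42}))  # [42, 1.0]
  print(sess.run(size))                          # back to 0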


@@ -288,6 +288,15 @@ tf_kernel_library(
],
)
tf_kernel_library(
name = "map_stage_op",
srcs = ["map_stage_op.cc"],
deps = [
"//tensorflow/core:framework",
"//tensorflow/core:lib",
],
)
cc_library(
name = "queue_base",
srcs = ["queue_base.cc"],
@@ -1315,6 +1324,7 @@ cc_library(
":fifo_queue_op",
":lookup_table_init_op",
":lookup_table_op",
":map_stage_op",
":padding_fifo_queue_op",
":priority_queue_op",
":queue_ops",


@@ -0,0 +1,816 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <map>
#include <numeric>
#include <unordered_map>
#include <vector>
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/lib/gtl/optional.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
namespace tensorflow {
namespace {
// Partial Ordering Comparator for Tensor keys containing scalar int64s
struct KeyTensorLess {
bool operator()(const Tensor & lhs, const Tensor & rhs) const {
return std::less<int64>{}(lhs.scalar<int64>()(),
rhs.scalar<int64>()());
}
};
// Key Equality operator for Tensor keys containing scalar int64s
struct KeyTensorEqual {
bool operator()(const Tensor & lhs, const Tensor & rhs) const {
return std::equal_to<int64>{}(lhs.scalar<int64>()(),
rhs.scalar<int64>()());
}
};
// Hash for Tensor keys containing scalar int64s
struct KeyTensorHash {
std::size_t operator()(const Tensor & key) const {
return std::hash<int64>{}(key.scalar<int64>()());
}
};
// General Template Definition
template <bool Ordered, typename Data>
struct MapTraits {};
// Partially specialise for ordered
template <typename Data>
struct MapTraits<true, Data>
{
typedef Tensor KeyType;
typedef Data DataType;
typedef std::map<KeyType, Data, KeyTensorLess> MapType;
};
// Partially specialise for unordered
template <typename Data>
struct MapTraits<false, Data>
{
typedef Tensor KeyType;
typedef Data DataType;
typedef std::unordered_map<KeyType, Data,
KeyTensorHash, KeyTensorEqual> MapType;
};
// Wrapper around map/unordered_map
template <bool Ordered>
class StagingMap : public ResourceBase
{
public:
// Public typedefs
typedef std::vector<Tensor> Tuple;
typedef gtl::optional<Tensor> OptionalTensor;
typedef std::vector<OptionalTensor> IncompleteTuple;
typedef MapTraits<Ordered, Tuple> MapTraits_;
typedef typename MapTraits_::MapType MapType;
typedef typename MapTraits_::KeyType KeyType;
typedef MapTraits<false, IncompleteTuple> IncompleteTraits;
typedef typename IncompleteTraits::MapType IncompleteType;
private:
// Private variables
DataTypeVector dtypes_ GUARDED_BY(mu_);
std::size_t capacity_ GUARDED_BY(mu_);
std::size_t memory_limit_ GUARDED_BY(mu_);
std::size_t current_bytes_ GUARDED_BY(mu_);
mutex mu_;
condition_variable not_empty_;
condition_variable full_;
IncompleteType incomplete_ GUARDED_BY(mu_);
MapType map_ GUARDED_BY(mu_);
private:
// private methods
// If map is configured for bounded capacity, notify
// waiting inserters that space is now available
void notify_inserters_if_bounded(mutex_lock & l)
{
if(has_capacity() || has_memory_limit())
{
l.unlock();
full_.notify_one();
}
}
// Notify any removers waiting to extract values
// that data is now available
void notify_removers(mutex_lock & l)
{
l.unlock();
not_empty_.notify_one();
}
inline bool has_capacity()
{ return capacity_ > 0; }
inline bool has_memory_limit()
{ return memory_limit_ > 0; }
inline bool would_exceed_memory_limit(std::size_t bytes)
{ return bytes + current_bytes_ > memory_limit_; }
inline bool is_capacity_full()
{ return map_.size() >= capacity_; }
// Get number of bytes in the tuple
inline std::size_t get_tuple_bytes(const Tuple & tuple)
{
return std::accumulate(tuple.begin(), tuple.end(), static_cast<std::size_t>(0),
[](const std::size_t & lhs, const Tensor & rhs) {
return lhs + rhs.TotalBytes();
});
}
// Check that the index is within bounds
inline Status check_index(const Tensor & key, std::size_t index)
{
if(index >= dtypes_.size())
{
return Status(errors::InvalidArgument("Index '",
index, "' for key '", key.scalar<int64>()(),
"' was out of bounds '", dtypes_.size(), "'."));
}
return Status::OK();
}
// Check that the optional value at the specified index
// is uninitialized
inline Status check_index_uninitialized(const Tensor & key,
std::size_t index,
const IncompleteTuple & tuple)
{
if(tuple[index].has_value())
{
return Status(errors::InvalidArgument("The tensor for index '",
index, "' for key '", key.scalar<int64>()(),
"' was already initialized '", dtypes_.size(), "'."));
}
return Status::OK();
}
// Check that the indices are strictly ordered
inline Status check_index_ordering(const Tensor & indices)
{
auto findices = indices.flat<int>();
for(std::size_t i = 0; i + 1 < findices.dimension(0); ++i)
{
if(findices(i) < findices(i+1))
{ continue; }
return Status(errors::InvalidArgument("Indices are not "
"strictly ordered"));
}
return Status::OK();
}
// Check that the given bytes are within the memory limit
inline Status check_memory_limit(std::size_t bytes)
{
if(has_memory_limit() && bytes > memory_limit_) {
return Status(errors::ResourceExhausted("Attempted to insert "
"tensors with combined size of '", bytes, "' bytes into "
"Staging Area with a memory limit of '", memory_limit_, "'."));
}
return Status::OK();
}
// Insert incomplete data into the Barrier
Status put_incomplete(const KeyType & key,
const Tensor & indices,
Tuple * tuple,
mutex_lock &l)
{
auto findices = indices.flat<int>();
// Search for the key in our incomplete set
auto it = incomplete_.find(key);
// Check that the tuple fits within the memory limit
std::size_t tuple_bytes = get_tuple_bytes(*tuple);
TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes));
if(has_memory_limit())
{
full_.wait(l, [tuple_bytes, this]() {
// Stop waiting if we don't exceed the memory limit
return !would_exceed_memory_limit(tuple_bytes);
});
}
// This key isn't present in the incomplete set
// Create IncompleteTuple and insert
if(it == incomplete_.end())
{
IncompleteTuple empty(dtypes_.size());
// Initialise empty tuple with given data
for(std::size_t i = 0; i < findices.dimension(0); ++i)
{
std::size_t index = findices(i);
TF_RETURN_IF_ERROR(check_index(key, index));
// Assign tuple at this index
empty[index] = std::move((*tuple)[i]);
}
// Insert into incomplete map
incomplete_.insert({key, std::move(empty)});
// Increment size
current_bytes_ += tuple_bytes;
}
// Found an entry in the incomplete index
// Update with given data and insert complete entries
// into the main map
else
{
// Reference existing incomplete tuple
IncompleteTuple & present = it->second;
// Assign given data
for(std::size_t i = 0; i < findices.dimension(0); ++i)
{
std::size_t index = findices(i);
TF_RETURN_IF_ERROR(check_index(key, index));
TF_RETURN_IF_ERROR(check_index_uninitialized(key,
index, present));
// Assign tuple at this index
present[index] = std::move((*tuple)[i]);
}
// Increment size
current_bytes_ += tuple_bytes;
// Do we have values at all tuple elements?
bool complete = std::all_of(present.begin(), present.end(),
[](const OptionalTensor & v) { return v.has_value(); });
// If so, put the tuple in the actual map
if(complete)
{
// Create a tuple for insertion
Tuple new_tuple;
for(const auto & v: present)
{ new_tuple.push_back(v.value()); }
// Remove from incomplete
incomplete_.erase(it);
TF_RETURN_IF_ERROR(put_complete(key, &new_tuple, l));
}
}
return Status::OK();
}
// Does the insertion into the actual staging area
Status put_complete(const KeyType & key, Tuple * tuple,
mutex_lock & l)
{
// Insert key and tuples into the map
map_.insert({key, std::move(*tuple)});
notify_removers(l);
return Status::OK();
}
public:
// public methods
explicit StagingMap(const DataTypeVector & dtypes,
std::size_t capacity, std::size_t memory_limit) :
dtypes_(dtypes),
capacity_(capacity),
memory_limit_(memory_limit),
current_bytes_(0) {}
Status put(KeyType* key, const Tensor * indices,
Tuple* tuple)
{
mutex_lock l(mu_);
// Sanity check the indices
TF_RETURN_IF_ERROR(check_index_ordering(*indices));
// Handle incomplete inserts
if(indices->NumElements() != dtypes_.size())
{
return put_incomplete(*key, *indices, tuple, l);
}
std::size_t tuple_bytes = get_tuple_bytes(*tuple);
// Check that tuple_bytes fits within the memory limit
TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes));
// If map capacity is bounded wait until map is not full
if(has_capacity() || has_memory_limit()) {
full_.wait(l, [tuple_bytes, this]() {
// If there's a memory limit, check if there's space for insertion
bool memory_limit_valid = has_memory_limit() ?
!would_exceed_memory_limit(tuple_bytes) : true;
// If we're configured for capacity check if there's space for insertion
bool capacity_valid = has_capacity() ? !is_capacity_full() : true;
// Stop waiting upon success for both conditions
return memory_limit_valid && capacity_valid;
});
}
// Do the put operation
TF_RETURN_IF_ERROR(put_complete(*key, tuple, l));
// Update the current size
current_bytes_ += tuple_bytes;
return Status::OK();
}
Status get(const KeyType* key, Tuple* tuple)
{
mutex_lock l(mu_);
typename MapType::const_iterator it;
// Wait until the element with the requested key is present
not_empty_.wait(l, [&, this]() {
it = map_.find(*key);
return it != map_.end();
});
// Copy tensors into the tuple
for(const auto & tensor : it->second)
{ tuple->push_back(tensor); }
// Update bytes in the Staging Area
current_bytes_ -= get_tuple_bytes(*tuple);
return Status::OK();
}
Status pop(const KeyType* key, Tuple* tuple)
{
mutex_lock l(mu_);
typename MapType::iterator it;
// Wait until the element with the requested key is present
not_empty_.wait(l, [&, this]() {
it = map_.find(*key);
return it != this->map_.end();
});
// Move from the entry as it's erased anyway
*tuple = std::move(it->second);
// Remove
map_.erase(it);
// Update bytes in the Staging Area
current_bytes_ -= get_tuple_bytes(*tuple);
notify_inserters_if_bounded(l);
return Status::OK();
}
Status popitem(KeyType* key, Tuple* tuple)
{
mutex_lock l(mu_);
// Wait until map is not empty
not_empty_.wait(l, [this]() { return !this->map_.empty(); });
// Move from the first element and erase it
*tuple = std::move(map_.begin()->second);
*key = map_.begin()->first;
map_.erase(map_.begin());
// Update bytes in the Staging Area
current_bytes_ -= get_tuple_bytes(*tuple);
notify_inserters_if_bounded(l);
return Status::OK();
}
Status clear()
{
mutex_lock l(mu_);
map_.clear();
incomplete_.clear();
current_bytes_ = 0;
notify_inserters_if_bounded(l);
return Status::OK();
}
size_t incomplete_size()
{
mutex_lock l(mu_);
return incomplete_.size();
}
size_t size()
{
// Lock the map and return the size
mutex_lock l(mu_);
return map_.size();
}
string DebugString()
{
return "StagingMap";
}
};
template <bool Ordered>
Status GetStagingMap(OpKernelContext* ctx,
const NodeDef& ndef,
StagingMap<Ordered>** map)
{
auto rm = ctx->resource_manager();
ContainerInfo cinfo;
// Lambda for creating the Staging Area
auto create_fn = [&ndef](StagingMap<Ordered>** ret) -> Status
{
DataTypeVector dtypes;
int64 capacity;
int64 memory_limit;
TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "dtypes", &dtypes));
TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity));
TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit));
*ret = new StagingMap<Ordered>(dtypes, capacity, memory_limit);
return Status::OK();
};
TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */));
TF_RETURN_IF_ERROR(rm->LookupOrCreate<StagingMap<Ordered>>(
cinfo.container(), cinfo.name(),
map, create_fn));
return Status::OK();
}
template <bool Ordered>
class MapStageOp : public OpKernel
{
public:
explicit MapStageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override {
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
typename StagingMap<Ordered>::Tuple tuple;
const Tensor * key_tensor;
const Tensor * indices_tensor;
OpInputList values_tensor;
OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
OP_REQUIRES_OK(ctx, ctx->input("indices", &indices_tensor));
OP_REQUIRES_OK(ctx, ctx->input_list("values", &values_tensor));
// Create copy for insertion into Staging Area
Tensor key(*key_tensor);
// Create the tuple to store
for (std::size_t i = 0; i < values_tensor.size(); ++i) {
tuple.push_back(values_tensor[i]);
}
// Store the tuple in the map
OP_REQUIRES_OK(ctx, map->put(&key, indices_tensor, &tuple));
}
};
REGISTER_KERNEL_BUILDER(Name("MapStage").Device(DEVICE_CPU),
MapStageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapStage").Device(DEVICE_CPU),
MapStageOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapStage")
.HostMemory("key")
.HostMemory("indices")
.Device(DEVICE_GPU), MapStageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapStage")
.HostMemory("key")
.HostMemory("indices")
.Device(DEVICE_GPU), MapStageOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapStage").HostMemory("key")
.Device(DEVICE_SYCL), MapStageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapStage").HostMemory("key")
.Device(DEVICE_SYCL), MapStageOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapUnstageOp : public OpKernel
{
public:
explicit MapUnstageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
typename StagingMap<Ordered>::Tuple tuple;
const Tensor * key_tensor;
OpInputList values_tensor;
OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
OP_REQUIRES_OK(ctx, map->pop(key_tensor, &tuple));
OP_REQUIRES(
ctx, tuple.size() == (size_t)ctx->num_outputs(),
errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),
" vs. ", ctx->num_outputs()));
for (size_t i = 0; i < tuple.size(); ++i) {
ctx->set_output(i, tuple[i]);
}
}
};
REGISTER_KERNEL_BUILDER(Name("MapUnstage").Device(DEVICE_CPU),
MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").Device(DEVICE_CPU),
MapUnstageOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapUnstage").HostMemory("key")
.Device(DEVICE_GPU), MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").HostMemory("key")
.Device(DEVICE_GPU), MapUnstageOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapUnstage").HostMemory("key")
.Device(DEVICE_SYCL), MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").HostMemory("key")
.Device(DEVICE_SYCL), MapUnstageOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapPeekOp : public OpKernel
{
public:
explicit MapPeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
typename StagingMap<Ordered>::Tuple tuple;
const Tensor * key_tensor;
OpInputList values_tensor;
OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
OP_REQUIRES_OK(ctx, map->get(key_tensor, &tuple));
OP_REQUIRES(
ctx, tuple.size() == (size_t)ctx->num_outputs(),
errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),
" vs. ", ctx->num_outputs()));
for (size_t i = 0; i < tuple.size(); ++i) {
ctx->set_output(i, tuple[i]);
}
}
};
REGISTER_KERNEL_BUILDER(Name("MapPeek").Device(DEVICE_CPU),
MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").Device(DEVICE_CPU),
MapPeekOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapPeek").HostMemory("key")
.Device(DEVICE_GPU), MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").HostMemory("key")
.Device(DEVICE_GPU), MapPeekOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapPeek").HostMemory("key")
.Device(DEVICE_SYCL), MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").HostMemory("key")
.Device(DEVICE_SYCL), MapPeekOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapUnstageNoKeyOp : public OpKernel
{
public:
explicit MapUnstageNoKeyOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
// Pop a random (key, value) off the map
typename StagingMap<Ordered>::KeyType key;
typename StagingMap<Ordered>::Tuple tuple;
OP_REQUIRES_OK(ctx, map->popitem(&key, &tuple));
// Assign the key as the first output
ctx->set_output(0, key);
// Set the rest of the outputs to the tuple Tensors
OP_REQUIRES(ctx,
tuple.size() == (size_t)ctx->num_outputs()-1,
errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),
" vs. ", ctx->num_outputs()-1));
for (size_t i = 0; i < tuple.size(); ++i)
{
ctx->set_output(i+1, tuple[i]);
}
}
};
REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").Device(DEVICE_CPU),
MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").Device(DEVICE_CPU),
MapUnstageNoKeyOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").HostMemory("key")
.Device(DEVICE_GPU), MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").HostMemory("key")
.Device(DEVICE_GPU), MapUnstageNoKeyOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").HostMemory("key")
.Device(DEVICE_SYCL), MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").HostMemory("key")
.Device(DEVICE_SYCL), MapUnstageNoKeyOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapSizeOp : public OpKernel
{
public:
explicit MapSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override
{
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
// Allocate size output tensor
Tensor * size = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}),
&size));
// Set it to the actual size
size->scalar<int32>().setConstant(map->size());
}
};
REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_CPU),
MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_CPU),
MapSizeOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_GPU)
.HostMemory("size"), MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_GPU)
.HostMemory("size"), MapSizeOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_SYCL)
.HostMemory("size"), MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_SYCL)
.HostMemory("size"), MapSizeOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapIncompleteSizeOp : public OpKernel
{
public:
explicit MapIncompleteSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override
{
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
// Allocate size output tensor
Tensor * size = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}),
&size));
// Set it to the actual size
size->scalar<int32>().setConstant(map->incomplete_size());
}
};
REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_CPU),
MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_CPU),
MapIncompleteSizeOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_GPU)
.HostMemory("size"), MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_GPU)
.HostMemory("size"), MapIncompleteSizeOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_SYCL)
.HostMemory("size"), MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_SYCL)
.HostMemory("size"), MapIncompleteSizeOp<true>);
#endif // TENSORFLOW_USE_SYCL
template <bool Ordered>
class MapClearOp : public OpKernel
{
public:
explicit MapClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override
{
StagingMap<Ordered>* map = nullptr;
OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
core::ScopedUnref scope(map);
OP_REQUIRES_OK(ctx, map->clear());
}
};
REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_CPU),
MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_CPU),
MapClearOp<true>);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_GPU),
MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_GPU),
MapClearOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_SYCL),
MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_SYCL),
MapClearOp<true>);
#endif // TENSORFLOW_USE_SYCL
} // namespace
} // namespace tensorflow
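
The put_incomplete()/put_complete() pair above is what gives the staging area its barrier behaviour: a tuple may be filled in piecewise over several puts, and only once every index holds a value does it migrate from the incomplete map into the main map, where get/pop can see it. A sketch of that flow through the Python wrapper, condensed from testBarrier further down (x, pi and gi are the placeholders from the earlier sketch; f and v are two further float32 placeholders):

f = array_ops.placeholder(dtypes.float32)
v = array_ops.placeholder(dtypes.float32)

stager = data_flow_ops.MapStagingArea(
    [dtypes.float32, dtypes.float32, dtypes.float32])
stage_xf = stager.put(pi, [x, f], [0, 2])  # fill tuple slots 0 and 2 of key pi
stage_v = stager.put(pi, [v], [1])         # later, fill slot 1
key, ret = stager.get(gi)                  # blocks until the tuple is complete
size = stager.size()
isize = stager.incomplete_size()

with session.Session() as sess:
  sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2})
  # Key 0 now sits in the incomplete map: size == 0, isize == 1.
  sess.run(stage_v, feed_dict={pi: 0, v: 3})
  # Slot 1 is filled, so the tuple is complete and moves to the main map:
  # size == 1, isize == 0.
  print(sess.run([key, ret], feed_dict={gi: 0}))  # [0, [1, 3, 2]]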


@@ -1,4 +1,4 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@ limitations under the License.
==============================================================================*/
#include <deque>
#include <numeric>
#include <vector>
#include "tensorflow/core/framework/op_kernel.h"
@@ -30,26 +31,153 @@ namespace {
class Buffer : public ResourceBase {
public:
// public types
typedef std::vector<Tensor> Tuple;
private:
// private variables
std::size_t capacity_;
std::size_t memory_limit_;
std::size_t current_bytes_;
mutex mu_;
condition_variable non_empty_cond_var_;
condition_variable full_cond_var_;
std::deque<Tuple> buf_ GUARDED_BY(mu_);
private:
// private methods
// If the buffer is configured for bounded capacity, notify
// waiting inserters that space is now available
void notify_inserters_if_bounded(mutex_lock & l)
{
if(IsBounded())
{
l.unlock();
full_cond_var_.notify_one();
}
}
// Is there a limit on the number of elements or a memory limit
// configured on this buffer?
bool IsBounded() {
return capacity_ > 0 || memory_limit_ > 0;
}
bool IsCapacityFull() {
return buf_.size() >= capacity_;
}
bool WouldExceedMemoryLimit(std::size_t bytes) {
return bytes + current_bytes_ > memory_limit_;
}
std::size_t GetTupleBytes(const Tuple & tuple)
{
return std::accumulate(tuple.begin(), tuple.end(), static_cast<std::size_t>(0),
[](const std::size_t & lhs, const Tensor & rhs) {
return lhs + rhs.TotalBytes();
});
}
public:
// public methods
explicit Buffer(std::size_t capacity, std::size_t memory_limit) :
capacity_(capacity),
memory_limit_(memory_limit),
current_bytes_(0) {}
// the Buffer takes ownership of the Tuple
Status Put(Tuple* tuple) {
mutex_lock l(mu_);
std::size_t tuple_bytes = GetTupleBytes(*tuple);
// Sanity check so that we don't block forever below
if(memory_limit_ > 0 && tuple_bytes > memory_limit_) {
return Status(errors::ResourceExhausted("Attempted to insert "
"tensors with combined size of '", tuple_bytes, "' bytes into "
"Staging Area with a memory limit of '", memory_limit_, "'."));
}
// If buffer capacity is bounded wait until elements have been removed
if(IsBounded()) {
full_cond_var_.wait(l, [tuple_bytes, this]() {
// If there's a memory limit, check if there's space for insertion
bool memory_limit_valid = memory_limit_ > 0 ?
!WouldExceedMemoryLimit(tuple_bytes) : true;
// If we're configured for capacity check if there's space for insertion
bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true;
// Stop waiting upon success for both conditions
return capacity_valid && memory_limit_valid;
});
}
// Update bytes in the Staging Area
current_bytes_ += tuple_bytes;
// Store tuple
buf_.push_back(std::move(*tuple));
l.unlock();
// maybe possible to optimize by reducing
// how often this signal is sent
non_empty_cond_var_.notify_one();
return Status::OK();
}
// Get tuple at front of the buffer
void Get(Tuple* tuple) { // TODO(zhifengc): Support cancellation.
mutex_lock l(mu_);
// Wait for data if the buffer is empty
non_empty_cond_var_.wait(l, [this]() {
return !buf_.empty();
});
// Move data into the output tuple
*tuple = std::move(buf_.front());
buf_.pop_front();
// Update bytes in the Staging Area
current_bytes_ -= GetTupleBytes(*tuple);
notify_inserters_if_bounded(l);
}
// Return tuple at index
Status Peek(std::size_t index, Tuple* tuple) {
mutex_lock l(mu_);
// Wait if the requested index is not available
non_empty_cond_var_.wait(l, [index, this]() {
return index < this->buf_.size();
});
// Place tensors in the output tuple
for(const auto & tensor: buf_[index]) {
tuple->push_back(tensor);
}
return Status::OK();
}
// Buffer size
size_t Size() {
mutex_lock l(mu_);
return buf_.size();
}
void Clear() {
mutex_lock l(mu_);
buf_.clear();
current_bytes_ = 0;
notify_inserters_if_bounded(l);
}
string DebugString() {
@@ -57,23 +185,27 @@ class Buffer : public ResourceBase {
return strings::StrCat("Staging size: ", buf_.size());
}
};
Status GetBuffer(OpKernelContext* ctx, const NodeDef& ndef, Buffer** buf) {
auto rm = ctx->resource_manager();
ContainerInfo cinfo;
// Lambda for creating the Staging Area
auto create_fn = [&ndef](Buffer** ret) -> Status
{
int64 capacity;
int64 memory_limit;
TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity));
TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit));
*ret = new Buffer(capacity, memory_limit);
return Status::OK();
};
TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */));
TF_RETURN_IF_ERROR(rm->LookupOrCreate<Buffer>(cinfo.container(), cinfo.name(),
buf, create_fn));
return Status::OK();
}
@@ -88,10 +220,10 @@ class StageOp : public OpKernel {
OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
core::ScopedUnref scope(buf);
Buffer::Tuple tuple;
for (std::size_t i = 0; i < ctx->num_inputs(); ++i) {
tuple.push_back(ctx->input(i));
}
OP_REQUIRES_OK(ctx, buf->Put(&tuple));
}
};
@@ -114,11 +246,13 @@ class UnstageOp : public OpKernel {
OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
core::ScopedUnref scope(buf);
Buffer::Tuple tuple;
buf->Get(&tuple);
OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(),
errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),
" vs. ", ctx->num_outputs()));
for (size_t i = 0; i < tuple.size(); ++i) {
ctx->set_output(i, tuple[i]);
}
@@ -133,4 +267,97 @@ REGISTER_KERNEL_BUILDER(Name("Unstage").Device(DEVICE_GPU), UnstageOp);
REGISTER_KERNEL_BUILDER(Name("Unstage").Device(DEVICE_SYCL), UnstageOp);
#endif // TENSORFLOW_USE_SYCL
class StagePeekOp : public OpKernel {
public:
explicit StagePeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
Buffer* buf = nullptr;
OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
core::ScopedUnref scope(buf);
Buffer::Tuple tuple;
std::size_t index = ctx->input(0).scalar<int>()();
OP_REQUIRES_OK(ctx, buf->Peek(index, &tuple));
OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(),
errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),
" vs. ", ctx->num_outputs()));
for (size_t i = 0; i < tuple.size(); ++i) {
ctx->set_output(i, tuple[i]);
}
}
};
REGISTER_KERNEL_BUILDER(Name("StagePeek").Device(DEVICE_CPU),
StagePeekOp);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("StagePeek").HostMemory("index").
Device(DEVICE_GPU), StagePeekOp);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("StagePeek").HostMemory("index")
.Device(DEVICE_SYCL), StagePeekOp);
#endif // TENSORFLOW_USE_SYCL
class StageSizeOp : public OpKernel {
public:
explicit StageSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
Buffer* buf = nullptr;
OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
core::ScopedUnref scope(buf);
// Allocate size output tensor
Tensor * size = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}),
&size));
// Set it to the actual size
size->scalar<int32>().setConstant(buf->Size());
}
};
REGISTER_KERNEL_BUILDER(Name("StageSize").Device(DEVICE_CPU), StageSizeOp);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("StageSize").HostMemory("size")
.Device(DEVICE_GPU), StageSizeOp);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("StageSize").HostMemory("size")
.Device(DEVICE_SYCL), StageSizeOp);
#endif // TENSORFLOW_USE_SYCL
class StageClearOp : public OpKernel {
public:
explicit StageClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
// Using this op in such a way that it blocks forever
// is an error. As such cancellation is not handled.
void Compute(OpKernelContext* ctx) override {
Buffer* buf = nullptr;
OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
core::ScopedUnref scope(buf);
buf->Clear();
}
};
REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_CPU), StageClearOp);
#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_GPU), StageClearOp);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_SYCL), StageClearOp);
#endif // TENSORFLOW_USE_SYCL
} // namespace tensorflow
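
Several of the commits above ("Use predicate form of condition_variable.wait", "Don't be too clever about notifying waiting thread") converge on the same concurrency pattern, now used by both Buffer and StagingMap: wait on a condition variable with a predicate so that spurious wakeups simply re-check the condition, and keep the notification logic straightforward. Python's threading.Condition.wait_for is the direct analogue; a toy bounded buffer illustrating the pattern (illustrative only, not part of this commit):

import threading
from collections import deque

class BoundedBuffer(object):
  """Toy analogue of the Buffer class above."""

  def __init__(self, capacity):
    self._capacity = capacity
    self._buf = deque()
    self._lock = threading.Lock()
    # Two conditions sharing one lock, mirroring non_empty_cond_var_
    # and full_cond_var_ in the C++ code.
    self._not_empty = threading.Condition(self._lock)
    self._not_full = threading.Condition(self._lock)

  def put(self, item):
    with self._lock:
      # wait_for loops internally: a spurious wakeup just re-evaluates
      # the predicate, which is the same guarantee the predicate form
      # of condition_variable::wait gives the C++ kernels.
      self._not_full.wait_for(lambda: len(self._buf) < self._capacity)
      self._buf.append(item)
      self._not_empty.notify()

  def get(self):
    with self._lock:
      self._not_empty.wait_for(lambda: len(self._buf) > 0)
      item = self._buf.popleft()
      self._not_full.notify()
      return item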


@@ -1967,6 +1967,8 @@ handle: The handle for a tensor stored in the session state.
REGISTER_OP("Stage")
.Input("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
@@ -1979,6 +1981,11 @@ The basic functionality of this Op is similar to a queue with many
fewer capabilities and options. This Op is optimized for performance.
values: a list of tensors
dtypes: A list of data types that inserted values should adhere to.
capacity: Maximum number of elements in the Staging Area. If > 0, inserts
on the container will block when the capacity is reached.
memory_limit: The maximum number of bytes allowed for Tensors in the Staging Area.
If > 0, inserts will block until sufficient space is available.
container: If non-empty, this queue is placed in the given container. Otherwise,
a default container is used.
shared_name: It is necessary to match this name to the matching Unstage Op.
@@ -1986,6 +1993,8 @@ shared_name: It is necessary to match this name to the matching Unstage Op.
REGISTER_OP("Unstage")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
@@ -1998,6 +2007,277 @@ The basic functionality is similar to dequeue with many fewer
capabilities and options. This Op is optimized for performance.
)doc");
REGISTER_OP("StagePeek")
.Input("index: int32")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op peeks at the values at the specified index. If the
underlying container does not contain sufficient elements
this op will block until it does. This Op is optimized for
performance.
)doc");
REGISTER_OP("StageSize")
.Output("size: int32")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(shape_inference::ScalarShape)
.SetIsStateful()
.Doc(R"doc(
Op returns the number of elements in the underlying container.
)doc");
REGISTER_OP("StageClear")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op removes all elements in the underlying container.
)doc");
// UnorderedMap
REGISTER_OP("MapStage")
.Input("key: int64")
.Input("indices: int32")
.Input("values: fake_dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("fake_dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::NoOutputs)
.SetIsStateful()
.Doc(R"doc(
Stage (key, values) in the underlying container which behaves like a hashtable.
key: int64
values: a list of tensors
dtypes: A list of data types that inserted values should adhere to.
capacity: Maximum number of elements in the Staging Area. If > 0, inserts
on the container will block when the capacity is reached.
container: If non-empty, this queue is placed in the given container. Otherwise,
a default container is used.
shared_name: It is necessary to match this name to the matching Unstage Op.
)doc");
REGISTER_OP("MapPeek")
.Input("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op peeks at the values at the specified key. If the
underlying container does not contain this key
this op will block until it does.
)doc");
REGISTER_OP("MapUnstage")
.Input("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op removes and returns the values associated with the key
from the underlying container. If the underlying container
does not contain this key, the op will block until it does.
)doc");
REGISTER_OP("MapUnstageNoKey")
.Output("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op removes and returns a random (key, value)
from the underlying container. If the underlying container
does not contain elements, the op will block until it does.
)doc");
REGISTER_OP("MapSize")
.Output("size: int32")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::ScalarShape)
.SetIsStateful()
.Doc(R"doc(
Op returns the number of elements in the underlying container.
)doc");
REGISTER_OP("MapIncompleteSize")
.Output("size: int32")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::ScalarShape)
.SetIsStateful()
.Doc(R"doc(
Op returns the number of incomplete elements in the underlying container.
)doc");
REGISTER_OP("MapClear")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::NoOutputs)
.SetIsStateful()
.Doc(R"doc(
Op removes all elements in the underlying container.
)doc");
// OrderedMap
REGISTER_OP("OrderedMapStage")
.Input("key: int64")
.Input("indices: int32")
.Input("values: fake_dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("fake_dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::NoOutputs)
.SetIsStateful()
.Doc(R"doc(
Stage (key, values) in the underlying container which behaves like an ordered
associative container. Elements are ordered by key.
key: int64
values: a list of tensors
dtypes: A list of data types that inserted values should adhere to.
capacity: Maximum number of elements in the Staging Area. If > 0, inserts
on the container will block when the capacity is reached.
container: If non-empty, this queue is placed in the given container. Otherwise,
a default container is used.
shared_name: It is necessary to match this name to the matching Unstage Op.
)doc");
REGISTER_OP("OrderedMapPeek")
.Input("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op peeks at the values at the specified key. If the
underlying container does not contain this key
this op will block until it does. This Op is optimized for
performance.
)doc");
REGISTER_OP("OrderedMapUnstage")
.Input("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op removes and returns the values associated with the key
from the underlying container. If the underlying container
does not contain this key, the op will block until it does.
)doc");
REGISTER_OP("OrderedMapUnstageNoKey")
.Output("key: int64")
.Output("values: dtypes")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::UnknownShape)
.SetIsStateful()
.Doc(R"doc(
Op removes and returns the (key, value) element with the smallest
key from the underlying container. If the underlying container
does not contain elements, the op will block until it does.
)doc");
REGISTER_OP("OrderedMapSize")
.Output("size: int32")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::ScalarShape)
.SetIsStateful()
.Doc(R"doc(
Op returns the number of elements in the underlying container.
)doc");
REGISTER_OP("OrderedMapIncompleteSize")
.Output("size: int32")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::ScalarShape)
.SetIsStateful()
.Doc(R"doc(
Op returns the number of incomplete elements in the underlying container.
)doc");
REGISTER_OP("OrderedMapClear")
.Attr("capacity: int >= 0 = 0")
.Attr("memory_limit: int >= 0 = 0")
.Attr("dtypes: list(type)")
.Attr("container: string = ''")
.Attr("shared_name: string = ''")
.SetShapeFn(tensorflow::shape_inference::NoOutputs)
.SetIsStateful()
.Doc(R"doc(
Op removes all elements in the underlying container.
)doc");
REGISTER_OP("RecordInput")
.Output("records: string")
.Attr("file_pattern: string")


@@ -2339,6 +2339,20 @@ cuda_py_test(
],
)
cuda_py_test(
name = "map_stage_op_test",
size = "small",
srcs = ["map_stage_op_test.py"],
additional_deps = [
"//tensorflow/python:array_ops",
"//tensorflow/python:client_testlib",
"//tensorflow/python:framework_for_generated_wrappers",
"//tensorflow/python:math_ops",
"//tensorflow/python:util",
"//tensorflow/python:data_flow_ops",
],
)
cuda_py_test(
name = "concat_op_test",
size = "medium",


@@ -0,0 +1,383 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
class MapStageTest(test.TestCase):
def testSimple(self):
with self.test_session(use_gpu=True) as sess:
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.float32])
stage = stager.put(pi, [v], [0])
k, y = stager.get(gi)
y = math_ops.reduce_max(math_ops.matmul(y, y))
sess.run(stage, feed_dict={x: -1, pi: 0})
for i in range(10):
_, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i})
self.assertAllClose(4 * (i - 1) * (i - 1) * 128, yval, rtol=1e-4)
def testMultiple(self):
with self.test_session(use_gpu=True) as sess:
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.float32, dtypes.float32])
stage = stager.put(pi, [x, v], [0, 1])
k, (z, y) = stager.get(gi)
y = math_ops.reduce_max(z * math_ops.matmul(y, y))
sess.run(stage, feed_dict={x: -1, pi: 0})
for i in range(10):
_, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i})
self.assertAllClose(
4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4)
def testDictionary(self):
with self.test_session(use_gpu=True) as sess:
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea(
[dtypes.float32, dtypes.float32],
shapes=[[], [128, 128]],
names=['x', 'v'])
stage = stager.put(pi,{'x': x, 'v': v})
key, ret = stager.get(gi)
z = ret['x']
y = ret['v']
y = math_ops.reduce_max(z * math_ops.matmul(y, y))
sess.run(stage, feed_dict={x: -1, pi: 0})
for i in range(10):
_, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i})
self.assertAllClose(
4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4)
def testColocation(self):
gpu_dev = test.gpu_device_name()
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(gpu_dev):
stager = data_flow_ops.MapStagingArea([dtypes.float32])
y = stager.put(1, [v], [0])
self.assertEqual(y.device, '/device:GPU:0' if gpu_dev
else gpu_dev)
with ops.device('/cpu:0'):
_, x = stager.get(1)
y = stager.peek(1)
_, z = stager.get()
self.assertEqual(x.device, '/device:CPU:0')
self.assertEqual(y.device, '/device:CPU:0')
self.assertEqual(z.device, '/device:CPU:0')
def testPeek(self):
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.int32, name='x')
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
p = array_ops.placeholder(dtypes.int32, name='p')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.int32, ], shapes=[[]])
stage = stager.put(pi,[x], [0])
peek = stager.peek(gi)
size = stager.size()
n = 10
with self.test_session(use_gpu=True) as sess:
for i in range(n):
sess.run(stage, feed_dict={x:i, pi:i})
for i in range(n):
self.assertTrue(sess.run(peek, feed_dict={gi: i}) == i)
self.assertTrue(sess.run(size) == 10)
def testSizeAndClear(self):
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32, name='x')
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea(
[dtypes.float32, dtypes.float32],
shapes=[[], [128, 128]],
names=['x', 'v'])
stage = stager.put(pi,{'x': x, 'v': v})
size = stager.size()
clear = stager.clear()
with self.test_session(use_gpu=True) as sess:
sess.run(stage, feed_dict={x: -1, pi: 3})
self.assertEqual(sess.run(size), 1)
sess.run(stage, feed_dict={x: -1, pi: 1})
self.assertEqual(sess.run(size), 2)
sess.run(clear)
self.assertEqual(sess.run(size), 0)
def testCapacity(self):
capacity = 3
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.int32, name='x')
pi = array_ops.placeholder(dtypes.int64, name='pi')
gi = array_ops.placeholder(dtypes.int64, name='gi')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.int32, ],
capacity=capacity, shapes=[[]])
stage = stager.put(pi, [x], [0])
get = stager.get()
size = stager.size()
from six.moves import queue as Queue
import threading
queue = Queue.Queue()
n = 5
missed = 0
with self.test_session(use_gpu=True) as sess:
# Stage data in a separate thread which will block
# when it hits the staging area's capacity and thus
# not fill the queue with n tokens
def thread_run():
for i in range(n):
sess.run(stage, feed_dict={x: i, pi: i})
queue.put(0)
t = threading.Thread(target=thread_run)
t.start()
# Get tokens from the queue, noting when we time out
for i in range(n):
try:
queue.get(timeout=0.05)
except Queue.Empty:
missed += 1
# We timed out n - capacity times waiting for queue puts
self.assertTrue(missed == n - capacity)
# Clear the staging area out a bit
for i in range(n - capacity):
sess.run(get)
# This should now succeed
t.join()
self.assertTrue(sess.run(size) == capacity)
# Clear out the staging area completely
for i in range(capacity):
sess.run(get)
def testMemoryLimit(self):
memory_limit = 512*1024 # 512K
chunk = 200*1024 # 200K
capacity = memory_limit // chunk
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.uint8, name='x')
pi = array_ops.placeholder(dtypes.int64, name='pi')
gi = array_ops.placeholder(dtypes.int64, name='gi')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.uint8],
memory_limit=memory_limit, shapes=[[]])
stage = stager.put(pi, [x], [0])
get = stager.get()
size = stager.size()
from six.moves import queue as Queue
import threading
import numpy as np
queue = Queue.Queue()
n = 5
missed = 0
with self.test_session(use_gpu=True) as sess:
# Stage data in a separate thread which will block
# when it hits the staging area's capacity and thus
# not fill the queue with n tokens
def thread_run():
for i in range(n):
sess.run(stage, feed_dict={x: np.full(chunk, i, dtype=np.uint8),
pi: i})
queue.put(0)
t = threading.Thread(target=thread_run)
t.start()
# Get tokens from the queue, noting when we time out
for i in range(n):
try:
queue.get(timeout=0.05)
except Queue.Empty:
missed += 1
# We timed out n - capacity times waiting for queue puts
self.assertTrue(missed == n - capacity)
# Clear the staging area out a bit
for i in range(n - capacity):
sess.run(get)
# This should now succeed
t.join()
self.assertTrue(sess.run(size) == capacity)
# Clear out the staging area completely
for i in range(capacity):
sess.run(get)
def testOrdering(self):
import six
import random
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.int32, name='x')
pi = array_ops.placeholder(dtypes.int64, name='pi')
gi = array_ops.placeholder(dtypes.int64, name='gi')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.MapStagingArea([dtypes.int32, ],
shapes=[[]], ordered=True)
stage = stager.put(pi, [x], [0])
get = stager.get()
size = stager.size()
n = 10
with self.test_session(use_gpu=True) as sess:
# Keys n-1..0
keys = list(reversed(six.moves.range(n)))
for i in keys:
sess.run(stage, feed_dict={pi: i, x: i})
self.assertTrue(sess.run(size) == n)
# Check that key, values come out in ascending order
for i, k in enumerate(reversed(keys)):
get_key, values = sess.run(get)
self.assertTrue(i == k == get_key == values)
self.assertTrue(sess.run(size) == 0)
def testBarrier(self):
with self.test_session(use_gpu=True) as sess:
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
f = array_ops.placeholder(dtypes.float32)
v = array_ops.placeholder(dtypes.float32)
pi = array_ops.placeholder(dtypes.int64)
gi = array_ops.placeholder(dtypes.int64)
with ops.device(test.gpu_device_name()):
# Test barrier with dictionary
stager = data_flow_ops.MapStagingArea(
[dtypes.float32, dtypes.float32, dtypes.float32],
names=['x', 'v', 'f'])
stage_xf = stager.put(pi,{'x': x, 'f': f})
stage_v = stager.put(pi, {'v': v})
key, ret = stager.get(gi)
size = stager.size()
isize = stager.incomplete_size()
# 0 complete and incomplete entries
self.assertTrue(sess.run([size, isize]) == [0, 0])
# Stage key 0, x and f tuple entries
sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2})
self.assertTrue(sess.run([size, isize]) == [0, 1])
# Stage key 1, x and f tuple entries
sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2})
self.assertTrue(sess.run([size, isize]) == [0, 2])
# Now complete key 0 with tuple entry v
sess.run(stage_v, feed_dict={pi: 0, v: 1})
# 1 complete and 1 incomplete entry
self.assertTrue(sess.run([size, isize]) == [1, 1])
# We can now obtain tuple associated with key 0
self.assertTrue(sess.run([key, ret], feed_dict={gi:0})
== [0, { 'x':1, 'f':2, 'v':1}])
# 0 complete and 1 incomplete entry
self.assertTrue(sess.run([size, isize]) == [0, 1])
# Now complete key 1 with tuple entry v
sess.run(stage_v, feed_dict={pi: 1, v: 3})
# We can now obtain tuple associated with key 1
self.assertTrue(sess.run([key, ret], feed_dict={gi:1})
== [1, { 'x':1, 'f':2, 'v':3}])
# Test again with index inserts
stager = data_flow_ops.MapStagingArea(
[dtypes.float32, dtypes.float32, dtypes.float32])
stage_xf = stager.put(pi, [x, f], [0, 2])
stage_v = stager.put(pi, [v], [1])
key, ret = stager.get(gi)
size = stager.size()
isize = stager.incomplete_size()
# 0 complete and incomplete entries
self.assertTrue(sess.run([size, isize]) == [0, 0])
# Stage key 0, x and f tuple entries
sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2})
self.assertTrue(sess.run([size, isize]) == [0, 1])
# Stage key 1, x and f tuple entries
sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2})
self.assertTrue(sess.run([size, isize]) == [0, 2])
# Now complete key 0 with tuple entry v
sess.run(stage_v, feed_dict={pi: 0, v: 1})
# 1 complete and 1 incomplete entry
self.assertTrue(sess.run([size, isize]) == [1, 1])
# We can now obtain tuple associated with key 0
self.assertTrue(sess.run([key, ret], feed_dict={gi:0})
== [0, [1, 1, 2]])
# 0 complete and 1 incomplete entry
self.assertTrue(sess.run([size, isize]) == [0, 1])
# Now complete key 1 with tuple entry v
sess.run(stage_v, feed_dict={pi: 1, v: 3})
# We can now obtain tuple associated with key 1
self.assertTrue(sess.run([key, ret], feed_dict={gi:1})
== [1, [1,3, 2]])
if __name__ == '__main__':
test.main()


@@ -1,4 +1,4 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ class StageTest(test.TestCase):
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.StagingArea([dtypes.float32])
stage = stager.put([v])
y = stager.get()
@@ -78,18 +78,174 @@ class StageTest(test.TestCase):
self.assertAllClose(
4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4)
def testColocation(self):
gpu_dev = test.gpu_device_name()
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32)
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(gpu_dev):
stager = data_flow_ops.StagingArea([dtypes.float32])
y = stager.put([v])
self.assertEqual(y.device, '/device:GPU:0' if gpu_dev
else gpu_dev)
with ops.device('/cpu:0'):
x = stager.get()
self.assertEqual(x.device, '/device:CPU:0')
def testPeek(self):
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.int32, name='x')
p = array_ops.placeholder(dtypes.int32, name='p')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.StagingArea([dtypes.int32, ], shapes=[[]])
stage = stager.put([x])
peek = stager.peek(p)
ret = stager.get()
with self.test_session(use_gpu=True) as sess:
for i in range(10):
sess.run(stage, feed_dict={x:i})
for i in range(10):
self.assertTrue(sess.run(peek, feed_dict={p:i}) == i)
def testSizeAndClear(self):
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.float32, name='x')
v = 2. * (array_ops.zeros([128, 128]) + x)
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.StagingArea(
[dtypes.float32, dtypes.float32],
shapes=[[], [128, 128]],
names=['x', 'v'])
stage = stager.put({'x': x, 'v': v})
ret = stager.get()
size = stager.size()
clear = stager.clear()
with self.test_session(use_gpu=True) as sess:
sess.run(stage, feed_dict={x: -1})
self.assertEqual(sess.run(size), 1)
sess.run(stage, feed_dict={x: -1})
self.assertEqual(sess.run(size), 2)
sess.run(clear)
self.assertEqual(sess.run(size), 0)
def testCapacity(self):
capacity = 3
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.int32, name='x')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.StagingArea([dtypes.int32, ],
capacity=capacity, shapes=[[]])
stage = stager.put([x])
ret = stager.get()
size = stager.size()
from six.moves import queue as Queue
import threading
queue = Queue.Queue()
n = 5
missed = 0
with self.test_session(use_gpu=True) as sess:
# Stage data in a separate thread which will block
# when it hits the staging area's capacity and thus
# not fill the queue with n tokens
def thread_run():
for i in range(n):
sess.run(stage, feed_dict={x: i})
queue.put(0)
t = threading.Thread(target=thread_run)
t.start()
# Get tokens from the queue, noting when we time out
for i in range(n):
try:
queue.get(timeout=0.05)
except Queue.Empty:
missed += 1
# We timed out n - capacity times waiting for queue puts
self.assertTrue(missed == n - capacity)
# Clear the staging area out a bit
for i in range(n - capacity):
self.assertTrue(sess.run(ret) == i)
# Thread should be able to join now
t.join()
self.assertTrue(sess.run(size) == capacity)
# Clear the staging area completely
for i in range(capacity):
self.assertTrue(sess.run(ret) == i+(n-capacity))
self.assertTrue(sess.run(size) == 0)
def testMemoryLimit(self):
memory_limit = 512*1024 # 512K
chunk = 200*1024 # 200K
capacity = memory_limit // chunk
with ops.device('/cpu:0'):
x = array_ops.placeholder(dtypes.uint8, name='x')
with ops.device(test.gpu_device_name()):
stager = data_flow_ops.StagingArea([dtypes.uint8, ],
memory_limit=memory_limit, shapes=[[]])
stage = stager.put([x])
ret = stager.get()
size = stager.size()
from six.moves import queue as Queue
import threading
import numpy as np
queue = Queue.Queue()
n = 5
missed = 0
with self.test_session(use_gpu=True) as sess:
# Stage data in a separate thread which will block
# when it hits the staging area's capacity and thus
# not fill the queue with n tokens
def thread_run():
for i in range(n):
sess.run(stage, feed_dict={x: np.full(chunk, i, dtype=np.uint8)})
queue.put(0)
t = threading.Thread(target=thread_run)
t.start()
# Get tokens from the queue, noting when we time out
for i in range(n):
try:
queue.get(timeout=0.05)
except Queue.Empty:
missed += 1
# We timed out n - capacity times waiting for queue puts
self.assertTrue(missed == n - capacity)
# Clear the staging area out a bit
for i in range(n - capacity):
self.assertTrue(sess.run(ret)[0] == i)
# Thread should be able to join now
t.join()
self.assertTrue(sess.run(size) == capacity)
# Clear the staging area completely
for i in range(capacity):
self.assertTrue(sess.run(ret)[0] == i+(n-capacity))
self.assertTrue(sess.run(size) == 0)
if __name__ == '__main__':
test.main()

View File

@@ -1,4 +1,4 @@
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -1344,72 +1344,30 @@ class SparseConditionalAccumulator(ConditionalAccumulatorBase):
dense_shape=return_val.shape)
class StagingArea(object):
"""Class for staging inputs. No ordering guarantees.
A `StagingArea` is a TensorFlow data structure that stores tensors across
multiple steps, and exposes operations that can put and get tensors.
Each `StagingArea` element is a tuple of one or more tensors, where each
tuple component has a static dtype, and may have a static shape.
The capacity of a `StagingArea` is unbounded and supports multiple
concurrent producers and consumers; and provides exactly-once delivery.
Each element of a `StagingArea` is a fixed-length tuple of tensors whose
dtypes are described by `dtypes`, and whose shapes are optionally described
by the `shapes` argument.
If the `shapes` argument is specified, each component of a staging area
element must have the respective fixed shape. If it is
unspecified, different elements may have different shapes.
"""
class BaseStagingArea(object):
"""Base class for Staging Areas."""
_identifier = 0
_lock = threading.Lock()
def __init__(self, dtypes, shapes=None, names=None, shared_name=None):
"""Constructs a staging area object.
The two optional lists, `shapes` and `names`, must be of the same length
as `dtypes` if provided. The values at a given index `i` indicate the
shape and name to use for the corresponding queue component in `dtypes`.
The device scope at the time of object creation determines where the
storage for the `StagingArea` will reside. Calls to `put` will incur a copy
to this memory space, if necessary. Tensors returned by `get` will be
placed according to the device scope when `get` is called.
Args:
dtypes: A list of types. The length of dtypes must equal the number
of tensors in each element.
shapes: (Optional.) Constraints on the shapes of tensors in an element.
A list of shape tuples or None. This list is the same length
as dtypes. If the shape of any tensors in the element are constrained,
all must be; shapes can be None if the shapes should not be constrained.
names: (Optional.) If provided, the `get()` and
`put()` methods will use dictionaries with these names as keys.
Must be None or a list or tuple of the same length as `dtypes`.
shared_name: (Optional.) A name to be used for the shared object. By
passing the same name to two different python objects they will share
the underlying staging area. Must be a string.
Raises:
ValueError: If one of the arguments is invalid.
"""
def __init__(self, dtypes, shapes=None, names=None, shared_name=None,
capacity=0, memory_limit=0):
if shared_name is None:
self._name = ops.get_default_graph().unique_name("StagingArea")
self._name = (ops.get_default_graph()
.unique_name(self.__class__.__name__))
elif isinstance(shared_name, six.string_types):
self._name = shared_name
else:
raise ValueError("shared_name must be a string")
self._dtypes = dtypes
if shapes is not None:
if len(shapes) != len(dtypes):
raise ValueError("StagingArea shapes must be the same length as dtypes")
self._shapes = [tensor_shape.TensorShape(s) for s in shapes]
else:
self._shapes = [tensor_shape.unknown_shape() for _ in self._dtypes]
if names is not None:
if len(names) != len(dtypes):
raise ValueError("StagingArea names must be the same length as dtypes")
@@ -1417,6 +1375,9 @@ class StagingArea(object):
else:
self._names = None
self._capacity = capacity
self._memory_limit = memory_limit
# all get and put ops must colocate with this op
with ops.name_scope("%s_root" % self._name):
self._coloc_op = control_flow_ops.no_op()
@@ -1441,52 +1402,140 @@ class StagingArea(object):
"""The list of names for each component of a staging area element."""
return self._names
def _check_put_dtypes(self, vals):
@property
def capacity(self):
"""The maximum number of elements of this staging area."""
return self._capacity
@property
def memory_limit(self):
"""The maximum number of bytes of this staging area."""
return self._memory_limit
def _check_put_dtypes(self, vals, indices=None):
"""Validate and convert `vals` to a list of `Tensor`s.
The `vals` argument can be a Tensor, a list or tuple of tensors, or a
dictionary with tensor values.
If `vals` is a list, then the appropriate indices associated with the
values must be provided.
If it is a dictionary, the staging area must have been constructed with a
`names` attribute and the dictionary keys must match the staging area names.
`indices` will be inferred from the dictionary keys.
If the staging area was constructed with a `names` attribute, `vals` must
be a dictionary.
Checks that the dtype and shape of each value matches that
of the staging area.
Args:
vals: A tensor, a list or tuple of tensors, or a dictionary.
Returns:
A list of `Tensor` objects.
A (tensors, indices) tuple where `tensors` is a list of `Tensor` objects
and `indices` is a list of indices associated with the tensors.
Raises:
ValueError: If `vals` is invalid.
ValueError: If `vals` or `indices` is invalid.
"""
if isinstance(vals, dict):
if not self._names:
raise ValueError(
"Staging areas must have names to enqueue a dictionary")
if sorted(self._names) != sorted(vals.keys()):
if not set(vals.keys()).issubset(self._names):
raise ValueError("Keys in dictionary to put do not match names "
"of staging area. Dictionary: (%s), Queue: (%s)" %
(sorted(vals.keys()), sorted(self._names)))
# The order of values in `self._names` indicates the order in which the
# tensors in the dictionary `vals` must be listed.
vals = [vals[k] for k in self._names]
vals, indices, n = zip(*[(vals[k], i, k) for i, k in enumerate(self._names)
if k in vals])
else:
if self._names:
raise ValueError("You must enqueue a dictionary in a staging area "
"with names")
if indices is None:
raise ValueError("Indices must be supplied when inserting a list "
"of tensors")
if len(indices) != len(vals):
raise ValueError("Number of indices '%s' doesn't match "
"number of values '%s'")
if not isinstance(vals, (list, tuple)):
vals = [vals]
indices = [0]
# Sanity check number of values
if len(vals) > len(self._dtypes):
raise ValueError("Unexpected number of inputs '%s' vs '%s'" % (
len(vals), len(self._dtypes)))
tensors = []
for i, (val, dtype) in enumerate(zip(vals, self._dtypes)):
tensors.append(
ops.convert_to_tensor(
val, dtype=dtype, name="component_%d" % i))
for val, i in zip(vals, indices):
dtype, shape = self._dtypes[i], self._shapes[i]
# Check dtype
if val.dtype != dtype:
raise ValueError("Datatypes do not match. '%s' != '%s'" % (
str(val.dtype), str(dtype)))
# Check shape
val.get_shape().assert_is_compatible_with(shape)
tensors.append(ops.convert_to_tensor(val, dtype=dtype,
name="component_%d" % i))
return tensors, indices
def _create_device_transfers(self, tensors):
"""Encode inter-device transfers if the current device
is not the same as the Staging Area's device
"""
if not isinstance(tensors, (tuple, list)):
tensors = [tensors]
curr_device_scope = control_flow_ops.no_op().device
if curr_device_scope != self._coloc_op.device:
tensors = [array_ops.identity(t) for t in tensors]
return tensors
def _get_return_value(self, tensors):
"""Return the value to return from a get op.
If the staging area has names, return a dictionary with the
names as keys. Otherwise return either a single tensor
or a list of tensors depending on the length of `tensors`.
Args:
tensors: List of tensors from the get op.
Returns:
A single tensor, a list of tensors, or a dictionary
of tensors.
"""
tensors = self._create_device_transfers(tensors)
# Sets shape
for output, shape in zip(tensors, self._shapes):
output.set_shape(shape)
if self._names:
# The returned values in `tensors` are in the same order as
# the names in `self._names`.
return {n: tensors[i] for i, n in enumerate(self._names)}
elif len(tensors) == 1:
return tensors[0]
else:
return tensors
def _scope_vals(self, vals):
"""Return a list of values to pass to `name_scope()`.
@@ -1503,9 +1552,86 @@ class StagingArea(object):
else:
return [vals]
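The dictionary branch of _check_put_dtypes above pairs each supplied
value with the position of its name in the staging area. A minimal
standalone sketch of that inference (plain Python with hypothetical
names and values; no TensorFlow required):

# Pair each supplied value with the index of its name, preserving
# the staging area's component order and skipping absent names.
names = ['x', 'v', 'f']        # hypothetical staging area names
vals = {'x': 1.0, 'f': 2.0}    # partial put; 'v' arrives later

tensors, indices, _ = zip(*[(vals[k], i, k)
                            for i, k in enumerate(names)
                            if k in vals])

assert tensors == (1.0, 2.0)
assert indices == (0, 2)       # 'x' -> component 0, 'f' -> component 2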
class StagingArea(BaseStagingArea):
"""Class for staging inputs. No ordering guarantees.
A `StagingArea` is a TensorFlow data structure that stores tensors across
multiple steps, and exposes operations that can put and get tensors.
Each `StagingArea` element is a tuple of one or more tensors, where each
tuple component has a static dtype, and may have a static shape.
The capacity of a `StagingArea` may be bounded or unbounded.
It supports multiple concurrent producers and consumers, and
provides exactly-once delivery.
Each element of a `StagingArea` is a fixed-length tuple of tensors whose
dtypes are described by `dtypes`, and whose shapes are optionally described
by the `shapes` argument.
If the `shapes` argument is specified, each component of a staging area
element must have the respective fixed shape. If it is
unspecified, different elements may have different shapes.
It can be configured with a capacity, in which case
put(values) will block until space becomes available.
Similarly, it can be configured with a memory limit, which
will block put(values) until space is available.
This is mostly useful for limiting the number of tensors on
devices such as GPUs.
All get() and peek() commands block if the requested data
is not present in the Staging Area.
"""
def __init__(self, dtypes, shapes=None, names=None, shared_name=None,
capacity=0, memory_limit=0):
"""Constructs a staging area object.
The two optional lists, `shapes` and `names`, must be of the same length
as `dtypes` if provided. The values at a given index `i` indicate the
shape and name to use for the corresponding queue component in `dtypes`.
The device scope at the time of object creation determines where the
storage for the `StagingArea` will reside. Calls to `put` will incur a copy
to this memory space, if necessary. Tensors returned by `get` will be
placed according to the device scope when `get` is called.
Args:
dtypes: A list of types. The length of dtypes must equal the number
of tensors in each element.
capacity: (Optional.) Maximum number of elements.
An integer. If zero, the Staging Area is unbounded.
memory_limit: (Optional.) Maximum number of bytes of all tensors
in the Staging Area.
An integer. If zero, the Staging Area is unbounded.
shapes: (Optional.) Constraints on the shapes of tensors in an element.
A list of shape tuples or None. This list is the same length
as dtypes. If the shape of any tensors in the element are constrained,
all must be; shapes can be None if the shapes should not be constrained.
names: (Optional.) If provided, the `get()` and
`put()` methods will use dictionaries with these names as keys.
Must be None or a list or tuple of the same length as `dtypes`.
shared_name: (Optional.) A name to be used for the shared object. By
passing the same name to two different python objects they will share
the underlying staging area. Must be a string.
Raises:
ValueError: If one of the arguments is invalid.
"""
super(StagingArea, self).__init__(dtypes, shapes,
names, shared_name,
capacity, memory_limit)
def put(self, values, name=None):
"""Create an op that places a value into the staging area.
This operation will block if the `StagingArea` has reached
its capacity.
Args:
values: Tensor (or a tuple of Tensors) to place into the staging area.
name: A name for the operation (optional).
@@ -1518,46 +1644,23 @@ class StagingArea(object):
"""
with ops.name_scope(name, "%s_put" % self._name,
self._scope_vals(values)) as scope:
vals = self._check_put_dtypes(values)
if len(values) != len(self._dtypes):
raise ValueError("Unexpected number of inputs " + str(len(values)) +
"vs " + str(len(self._dtypes)))
for val, dtype in zip(vals, self._dtypes):
if val.dtype != dtype:
raise ValueError("Datatypes do not match. " + str(val.dtype) + " != "
+ str(dtype))
for val, shape in zip(vals, self._shapes):
val.get_shape().assert_is_compatible_with(shape)
# Hard-code indices for this staging area
indices = range(len(values)) if isinstance(values, (list, tuple)) else None
vals, _ = self._check_put_dtypes(values, indices)
with ops.colocate_with(self._coloc_op):
op = gen_data_flow_ops.stage(values=vals, shared_name=self._name,
name=scope)
name=scope, capacity=self._capacity,
memory_limit=self._memory_limit)
return op
def _get_return_value(self, tensors):
"""Return the value to return from a get op.
If the staging area has names, return a dictionary with the
names as keys. Otherwise return either a single tensor
or a list of tensors depending on the length of `tensors`.
Args:
tensors: List of tensors from the get op.
Returns:
A single tensor, a list of tensors, or a dictionary
of tensors.
"""
if self._names:
# The returned values in `tensors` are in the same order as
# the names in `self._names`.
return {n: tensors[i] for i, n in enumerate(self._names)}
elif len(tensors) == 1:
return tensors[0]
else:
return tensors
def __internal_get(self, get_fn, name):
with ops.colocate_with(self._coloc_op):
ret = get_fn()
return self._get_return_value(ret)
def get(self, name=None):
"""Gets one element from this staging area.
@@ -1584,19 +1687,388 @@ class StagingArea(object):
if name is None:
name = "%s_get" % self._name
fn = lambda: gen_data_flow_ops.unstage(dtypes=self._dtypes,
shared_name=self._name, name=name,
capacity=self._capacity,
memory_limit=self._memory_limit)
return self.__internal_get(fn, name)
def peek(self, index, name=None):
"""Peeks at an element in the staging area.
If the staging area is too small to contain the element at
the specified index, it will block until enough elements
are inserted to complete the operation.
The placement of the returned tensor will be determined by
the current device scope when this function is called.
Args:
index: The index of the tensor within the staging area
to look up.
name: A name for the operation (optional).
Returns:
The tuple of tensors at the given index.
"""
if name is None:
name = "%s_peek" % self._name
fn = lambda: gen_data_flow_ops.stage_peek(index,
dtypes=self._dtypes, shared_name=self._name,
name=name, capacity=self._capacity,
memory_limit=self._memory_limit)
return self.__internal_get(fn, name)
def size(self, name=None):
"""Returns the number of elements in the staging area.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_size" % self._name
return gen_data_flow_ops.stage_size(name=name, shared_name=self._name,
dtypes=self._dtypes, capacity=self._capacity,
memory_limit=self._memory_limit)
def clear(self, name=None):
"""Clears the staging area.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_clear" % self._name
return gen_data_flow_ops.stage_clear(name=name, shared_name=self._name,
dtypes=self._dtypes, capacity=self._capacity,
memory_limit=self._memory_limit)
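Putting the StagingArea methods above together, a minimal usage sketch
of a capacity-bounded area (illustrative shapes and values; imports
mirror the test files in this change, and the FIFO behaviour shown in
the comments matches the test assertions rather than any documented
ordering guarantee):

from tensorflow.python.client import session
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import data_flow_ops

x = array_ops.placeholder(dtypes.float32, name='x')
area = data_flow_ops.StagingArea([dtypes.float32],
                                 shapes=[[]], capacity=2)
stage = area.put([x])    # blocks once 2 elements are buffered
get = area.get()
size = area.size()

with session.Session() as sess:
  sess.run(stage, feed_dict={x: 1.0})
  sess.run(stage, feed_dict={x: 2.0})
  print(sess.run(size))  # -> 2
  print(sess.run(get))   # -> 1.0 in practice (compare testCapacity)
  print(sess.run(size))  # -> 1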
class MapStagingArea(BaseStagingArea):
"""
A `MapStagingArea` is a TensorFlow data structure that stores tensors across
multiple steps, and exposes operations that can put and get tensors.
Each `MapStagingArea` element is a (key, value) pair.
Only int64 keys are supported; other types should be
hashed to produce a key.
Values are a tuple of one or more tensors.
Each tuple component has a static dtype,
and may have a static shape.
The capacity of a `MapStagingArea` may be bounded or unbounded.
It supports multiple concurrent producers and consumers, and
provides exactly-once delivery.
Each value of a `MapStagingArea` is a fixed-length tuple of tensors whose
dtypes are described by `dtypes`, and whose shapes are optionally described
by the `shapes` argument.
If the `shapes` argument is specified, each component of a staging area
element must have the respective fixed shape. If it is
unspecified, different elements may have different shapes.
It behaves like an associative container with support for:
- put(key, values)
- peek(key) like dict.get(key)
- get(key) like dict.pop(key)
- get(key=None) like dict.popitem()
- size()
- clear()
If ordered, a tree structure ordered by key will be used and
get(key=None) will remove (key, value) pairs in increasing key order.
Otherwise an unordered hashtable is used, and get(key=None) will
remove a random (key, value) pair.
It can be configured with a capacity, in which case
put(key, values) will block until space becomes available.
Similarly, it can be configured with a memory limit, which
will block put(key, values) until space is available.
This is mostly useful for limiting the number of tensors on
devices such as GPUs.
All get() and peek() commands block if the requested
(key, value) pair is not present in the staging area.
Incomplete puts are supported and will be placed in an incomplete
map until such time as all values associated with the key have
been inserted. Once completed, this (key, value) pair will be
inserted into the main data structure. Data in the incomplete map
counts towards the memory limit, but not towards the capacity limit.
"""
def __init__(self, dtypes, shapes=None, names=None, shared_name=None,
ordered=False, capacity=0, memory_limit=0):
"""
Args:
dtypes: A list of types. The length of dtypes must equal the number
of tensors in each element.
capacity: (Optional.) Maximum number of elements.
An integer. If zero, the Staging Area is unbounded.
memory_limit: (Optional.) Maximum number of bytes of all tensors
in the Staging Area (excluding keys).
An integer. If zero, the Staging Area is unbounded.
ordered: (Optional.) If True, the underlying data structure
is a tree ordered on key. Otherwise an unordered hashtable is used.
shapes: (Optional.) Constraints on the shapes of tensors in an element.
A list of shape tuples or None. This list is the same length
as dtypes. If the shape of any tensors in the element are constrained,
all must be; shapes can be None if the shapes should not be constrained.
names: (Optional.) If provided, the `get()` and
`put()` methods will use dictionaries with these names as keys.
Must be None or a list or tuple of the same length as `dtypes`.
shared_name: (Optional.) A name to be used for the shared object. By
passing the same name to two different python objects they will share
the underlying staging area. Must be a string.
Raises:
ValueError: If one of the arguments is invalid.
"""
super(MapStagingArea, self).__init__(dtypes, shapes,
names, shared_name,
capacity, memory_limit)
# Defer to different methods depending if the map is ordered
self._ordered = ordered
if ordered:
self._put_fn = gen_data_flow_ops.ordered_map_stage
self._pop_fn = gen_data_flow_ops.ordered_map_unstage
self._popitem_fn = gen_data_flow_ops.ordered_map_unstage_no_key
self._peek_fn = gen_data_flow_ops.ordered_map_peek
self._size_fn = gen_data_flow_ops.ordered_map_size
self._incomplete_size_fn = gen_data_flow_ops.ordered_map_incomplete_size
self._clear_fn = gen_data_flow_ops.ordered_map_clear
else:
self._put_fn = gen_data_flow_ops.map_stage
self._pop_fn = gen_data_flow_ops.map_unstage
self._popitem_fn = gen_data_flow_ops.map_unstage_no_key
self._peek_fn = gen_data_flow_ops.map_peek
self._size_fn = gen_data_flow_ops.map_size
self._incomplete_size_fn = gen_data_flow_ops.map_incomplete_size
self._clear_fn = gen_data_flow_ops.map_clear
def put(self, key, vals, indices=None, name=None):
"""
Create an op that stores the (key, vals) pair in the staging area.
Incomplete puts are possible, preferably using a dictionary for vals,
as the appropriate dtypes and shapes can be inferred from the
dictionary keys. If vals is a list or tuple, indices must
also be specified so that the op knows at which element position
to perform the insert.
This operation will block if the capacity or memory limit of this
container is reached.
Args:
key: Key associated with the data
vals: Tensor (or a dict/tuple of Tensors) to place
into the staging area.
indices: (Optional) The indices associated with the tensors in vals.
Required if vals is a list or tuple.
name: A name for the operation (optional)
Returns:
The created op
Raises:
ValueError: If the number or type of inputs don't match the staging area.
"""
with ops.name_scope(name, "%s_put" % self._name,
self._scope_vals(vals)) as scope:
vals, indices = self._check_put_dtypes(vals, indices)
with ops.colocate_with(self._coloc_op):
op = self._put_fn(key, indices, vals, dtypes=self._dtypes,
shared_name=self._name, name=scope,
capacity=self._capacity,
memory_limit=self._memory_limit)
return op
def peek(self, key, name=None):
"""
Peeks at staging area data associated with the key.
If the key is not in the staging area, it will block
until the associated (key, value) is inserted.
Args:
key: Key associated with the required data
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_pop" % self._name
with ops.colocate_with(self._coloc_op):
ret = gen_data_flow_ops.unstage(dtypes=self._dtypes,
shared_name=self._name, name=name)
curr_device_scope = control_flow_ops.no_op().device
if curr_device_scope != self._coloc_op.device:
for i in range(len(ret)):
ret[i] = array_ops.identity(ret[i])
for output, shape in zip(ret, self._shapes):
output.set_shape(shape)
return self._get_return_value(ret)
result = self._peek_fn(key, shared_name=self._name,
dtypes=self._dtypes,
name=name,
capacity=self._capacity,
memory_limit=self._memory_limit)
return self._get_return_value(result)
def get(self, key=None, name=None):
"""
If the key is provided, the associated (key, value)
is returned from the staging area. If the key is not
in the staging area, this method will block until
the associated (key, value) is inserted.
If no key is provided and the staging area is ordered,
the (key, value) with the smallest key will be returned.
Otherwise, a random (key, value) will be returned.
If the staging area is empty when this operation executes,
it will block until there is an element to dequeue.
Args:
key: Key associated with the required data (Optional)
name: A name for the operation (optional)
Returns:
The created op
"""
if key is None:
return self._popitem(name)
else:
return self._pop(key, name)
def _pop(self, key, name=None):
"""
Remove and return the (key, value) pair associated
with the given key from the staging area. If the key is not
in the staging area, this method will block until
the associated (key, value) is inserted.
Args:
key: Key associated with the required data
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_get" % self._name
with ops.colocate_with(self._coloc_op):
result = self._pop_fn(key, shared_name=self._name,
dtypes=self._dtypes,
name=name,
capacity=self._capacity,
memory_limit=self._memory_limit)
return key, self._get_return_value(result)
def _popitem(self, name=None):
"""
If the staging area is ordered,
the (key, value) with the smallest key will be returned.
Otherwise, a random (key, value) will be returned.
If the staging area is empty when this operation executes,
it will block until there is an element to dequeue.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_get_nokey" % self._name
with ops.colocate_with(self._coloc_op):
key, result = self._popitem_fn(shared_name=self._name,
dtypes=self._dtypes,
name=name,
capacity=self._capacity,
memory_limit=self._memory_limit)
# Separate keys and results out from
# underlying namedtuple
key = self._create_device_transfers(key)[0]
result = self._get_return_value(result)
return key, result
def size(self, name=None):
"""
Returns the number of elements in the staging area.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_size" % self._name
return self._size_fn(shared_name=self._name,
name=name, dtypes=self._dtypes,
capacity=self._capacity,
memory_limit=self._memory_limit)
def incomplete_size(self, name=None):
"""
Returns the number of incomplete elements in the staging area.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_incomplete_size" % self._name
return self._incomplete_size_fn(shared_name=self._name,
name=name, dtypes=self._dtypes,
capacity=self._capacity,
memory_limit=self._memory_limit)
def clear(self, name=None):
"""
Clears the staging area.
Args:
name: A name for the operation (optional)
Returns:
The created op
"""
if name is None:
name = "%s_clear" % self._name
return self._clear_fn(shared_name=self._name,
name=name, dtypes=self._dtypes,
capacity=self._capacity,
memory_limit=self._memory_limit)
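Pulling the MapStagingArea pieces together, a minimal end-to-end
sketch including an incomplete put (illustrative keys and values;
imports mirror the test files in this change):

from tensorflow.python.client import session
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import data_flow_ops

pi = array_ops.placeholder(dtypes.int64)   # put key
x = array_ops.placeholder(dtypes.float32)
v = array_ops.placeholder(dtypes.float32)

area = data_flow_ops.MapStagingArea(
    [dtypes.float32, dtypes.float32], names=['x', 'v'], ordered=True)
stage_x = area.put(pi, {'x': x})   # incomplete put: only 'x' supplied
stage_v = area.put(pi, {'v': v})   # completes the (key, value) pair
key, ret = area.get()              # pops the smallest key when ordered
size = area.size()
isize = area.incomplete_size()

with session.Session() as sess:
  sess.run(stage_x, feed_dict={pi: 1, x: 1.0})
  print(sess.run([size, isize]))   # -> [0, 1]: pair still incomplete
  sess.run(stage_v, feed_dict={pi: 1, v: 2.0})
  print(sess.run([size, isize]))   # -> [1, 0]: pair now complete
  print(sess.run([key, ret]))      # -> [1, {'x': 1.0, 'v': 2.0}]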
class RecordInput(object):