From 1705b099df0fd7c2ea4abf83cfee896febeb8309 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Sat, 20 May 2017 00:06:37 +0200
Subject: [PATCH] Map Staging Area (#9686)

* Map Staging Area Operator
* Add map staging operators to the BUILD
* Don't be too clever about notifying waiting thread
* Add Staging Area Size op
* Use predicate form of condition_variable.wait
* Add a Staging Area clear op
* Add Peek op and capacity to the Staging Area
* map_stage_op.cpp -> map_stage_op.cc
* size should be int32
* Small improvements to staging test case
* Create BaseStagingArea
  Containing common functionality to StagingArea and BaseStagingArea
* Add MapStagingArea test cases
* Add an ordering test
* Rename get() to peek()
* Combine pop(key) and popitem() into get(key=None)
* Block on peek methods
  Wait for the requested element to be inserted into the staging area.
* Use constructor initializer lists
* Arrange class typedefs, members and methods
  Also make previously public helper methods like HasBoundedCapacity()
  and IsFull() private.
* Better docs for StagingArea.clear()
* SYCL_DEVICE MapStage => OrderedMapStage
* Initializer list fix
* device('/gpu:0') => test.gpu_device_name()
* Copyright year to 2017
* Reverse keys before random shuffle
* Python docstring updates
* Fix stage op docs
* Order RecordInput correctly
* Updated MapStage op docs
* Capture index by value instead of reference
  It's just an int.
* Add support for Staging Area memory limit
  Mostly useful for constraining the amount of memory used for
  Staging Areas placed on the GPU.
* Use key Tensor as key rather than int64
* Pass dtypes to StagingMap
* CamelCase types
* Support barrier operation on the MapStagingArea
  - It is now possible to insert incomplete elements.
  - Key std::map and std::unordered_map on scalar Tensor int64
    (avoids unnecessary allocations)
* Remove shuffling to make test deterministic
* Use size_t for indices and positive quantities
* Fix lint errors
  - autopep8 data_flow_ops.py indentation to 2 spaces.
  - Alphabetically order data_flow lib dependencies in BUILD
* extra capacity() should be memory_limit()
* Run buildifier
* Fix OrderedMapIncompleteSize kernel name
* Fix device name in colocation test
  Device name is "" if no GPU is present.
* Fix subtle size_t to int conversion bug
* Add #include for windows builds
* Add #include in stage_op.cc too
---
 tensorflow/core/kernels/BUILD                 |  10 +
 tensorflow/core/kernels/map_stage_op.cc       | 816 ++++++++++++++++++
 tensorflow/core/kernels/stage_op.cc           | 279 +++++-
 tensorflow/core/ops/data_flow_ops.cc          | 280 ++++++
 tensorflow/python/kernel_tests/BUILD          |  14 +
 .../python/kernel_tests/map_stage_op_test.py  | 383 ++++++++
 .../python/kernel_tests/stage_op_test.py      | 166 +++-
 tensorflow/python/ops/data_flow_ops.py        | 676 ++++++++++++---
 8 files changed, 2491 insertions(+), 133 deletions(-)
 create mode 100644 tensorflow/core/kernels/map_stage_op.cc
 create mode 100644 tensorflow/python/kernel_tests/map_stage_op_test.py

diff --git a/tensorflow/core/kernels/BUILD b/tensorflow/core/kernels/BUILD
index 4e2df417cf9..1365634bbfe 100644
--- a/tensorflow/core/kernels/BUILD
+++ b/tensorflow/core/kernels/BUILD
@@ -288,6 +288,15 @@ tf_kernel_library(
     ],
 )
 
+tf_kernel_library(
+    name = "map_stage_op",
+    srcs = ["map_stage_op.cc"],
+    deps = [
+        "//tensorflow/core:framework",
+        "//tensorflow/core:lib",
+    ],
+)
+
 cc_library(
     name = "queue_base",
     srcs = ["queue_base.cc"],
@@ -1315,6 +1324,7 @@ cc_library(
         ":fifo_queue_op",
         ":lookup_table_init_op",
         ":lookup_table_op",
+        ":map_stage_op",
        ":padding_fifo_queue_op",
        ":priority_queue_op",
        ":queue_ops",
diff --git a/tensorflow/core/kernels/map_stage_op.cc b/tensorflow/core/kernels/map_stage_op.cc
new file mode 100644
index 00000000000..6ec7cce59c8
--- /dev/null
+++ b/tensorflow/core/kernels/map_stage_op.cc
@@ -0,0 +1,816 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/ + +#include +#include +#include +#include + +#include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/resource_mgr.h" +#include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/framework/tensor_shape.h" +#include "tensorflow/core/lib/gtl/optional.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/mutex.h" + +namespace tensorflow { + +namespace { + +// Partial Ordering Comparator for Tensor keys containing scalar int64's +struct KeyTensorLess { + bool operator()(const Tensor & lhs, const Tensor & rhs) const { + return std::less{}(lhs.scalar()(), + rhs.scalar()()); + } +}; + +// Key Equality operator for Tensor keys containing scalar int64's +struct KeyTensorEqual { + bool operator()(const Tensor & lhs, const Tensor & rhs) const { + return std::equal_to{}(lhs.scalar()(), + rhs.scalar()()); + } +}; + +// Hash for Tensor keys containing scalar int64's +struct KeyTensorHash { + std::size_t operator()(const Tensor & key) const { + return std::hash{}(key.scalar()()); + } +}; + + +// General Template Definition +template +struct MapTraits {}; + +// Partially specialise for ordered +template +struct MapTraits +{ + typedef Tensor KeyType; + typedef Data DataType; + typedef std::map MapType; +}; + +// Partially specialise for unordered +template +struct MapTraits +{ + typedef Tensor KeyType; + typedef Data DataType; + typedef std::unordered_map MapType; +}; + +// Wrapper around map/unordered_map +template +class StagingMap : public ResourceBase +{ +public: + // Public typedefs + typedef std::vector Tuple; + typedef gtl::optional OptionalTensor; + typedef std::vector IncompleteTuple; + + typedef MapTraits MapTraits_; + typedef typename MapTraits_::MapType MapType; + typedef typename MapTraits_::KeyType KeyType; + + typedef MapTraits IncompleteTraits; + typedef typename IncompleteTraits::MapType IncompleteType; + +private: + // Private variables + DataTypeVector dtypes_ GUARDED_BY(mu_); + std::size_t capacity_ GUARDED_BY(mu_); + std::size_t memory_limit_ GUARDED_BY(mu_); + std::size_t current_bytes_ GUARDED_BY(mu_); + mutex mu_; + condition_variable not_empty_; + condition_variable full_; + IncompleteType incomplete_ GUARDED_BY(mu_); + MapType map_ GUARDED_BY(mu_); + +private: + // private methods + + // If map is configured for bounded capacity, notify + // waiting inserters that space is now available + void notify_inserters_if_bounded(mutex_lock & l) + { + if(has_capacity() || has_memory_limit()) + { + l.unlock(); + full_.notify_one(); + } + } + + // Notify any removers waiting to extract values + // that data is now available + void notify_removers(mutex_lock & l) + { + l.unlock(); + not_empty_.notify_one(); + } + + inline bool has_capacity() + { return capacity_ > 0; } + + inline bool has_memory_limit() + { return memory_limit_ > 0; } + + inline bool would_exceed_memory_limit(std::size_t bytes) + { return bytes + current_bytes_ > memory_limit_; } + + inline bool is_capacity_full() + { return map_.size() >= capacity_; } + + // Get number of bytes in the tuple + inline std::size_t get_tuple_bytes(const Tuple & tuple) + { + return std::accumulate(tuple.begin(), tuple.end(), 0, + [](const std::size_t & lhs, const Tensor & rhs) { + return lhs + rhs.TotalBytes(); + }); + } + + // Check that the index is within bounds + inline Status check_index(const Tensor & key, std::size_t index) + { + if(index >= 
dtypes_.size()) + { + return Status(errors::InvalidArgument("Index '", + index, "' for key '", key.scalar()(), + "' was out of bounds '", dtypes_.size(), "'.")); + } + + return Status::OK(); + } + + + // Check that the optional value at the specified index + // is uninitialized + inline Status check_index_uninitialized(const Tensor & key, + std::size_t index, + const IncompleteTuple & tuple) + { + if(tuple[index].has_value()) + { + return Status(errors::InvalidArgument("The tensor for index '", + index, "' for key '", key.scalar()(), + "' was already initialized '", dtypes_.size(), "'.")); + } + + return Status::OK(); + } + + // Check that the indices are strictly ordered + inline Status check_index_ordering(const Tensor & indices) + { + auto findices = indices.flat(); + + for(std::size_t i = 0; i < findices.dimension(0)-1; ++i) + { + if(findices(i) < findices(i+1)) + { continue; } + + return Status(errors::InvalidArgument("Indices are not " + "strictly ordered")); + } + + return Status::OK(); + } + + // Check bytes are within memory limits memory limits + inline Status check_memory_limit(std::size_t bytes) + { + if(has_memory_limit() && bytes > memory_limit_) { + return Status(errors::ResourceExhausted("Attempted to insert " + "tensors with combined size of '", bytes, "' bytes into " + "Staging Area with a memory limit of '", memory_limit_, "'.")); + } + + return Status::OK(); + } + + // Insert incomplete data into the Barrier + Status put_incomplete(const KeyType & key, + const Tensor & indices, + Tuple * tuple, + mutex_lock &l) + { + auto findices = indices.flat(); + + // Search for the key in our incomplete set + auto it = incomplete_.find(key); + + // Check that the tuple fits within the memory limit + std::size_t tuple_bytes = get_tuple_bytes(*tuple); + TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes)); + + if(has_memory_limit()) + { + full_.wait(l, [tuple_bytes, this]() { + // Stop waiting if we don't exceed the memory limit + return !would_exceed_memory_limit(tuple_bytes); + }); + } + + // This key isn't present in the incomplete set + // Create IncompleteTuple and insert + if(it == incomplete_.end()) + { + IncompleteTuple empty(dtypes_.size()); + + // Initialise empty tuple with given dta + for(std::size_t i = 0; i < findices.dimension(0); ++i) + { + std::size_t index = findices(i); + TF_RETURN_IF_ERROR(check_index(key, index)); + + // Assign tuple at this index + empty[index] = std::move((*tuple)[i]); + } + + // Insert into incomplete map + incomplete_.insert({key, std::move(empty)}); + + // Increment size + current_bytes_ += tuple_bytes; + } + // Found an entry in the incomplete index + // Update with given data and insert complete entries + // into the main map + else + { + // Reference existing incomplete tuple + IncompleteTuple & present = it->second; + + // Assign given data + for(std::size_t i = 0; i < findices.dimension(0); ++i) + { + std::size_t index = findices(i); + TF_RETURN_IF_ERROR(check_index(key, index)); + TF_RETURN_IF_ERROR(check_index_uninitialized(key, + index, present)); + + // Assign tuple at this index + present[index] = std::move((*tuple)[i]); + } + + // Increment size + current_bytes_ += tuple_bytes; + + // Do we have values at all tuple elements? 
+ bool complete = std::all_of(present.begin(), present.end(), + [](const OptionalTensor & v) { return v.has_value(); }); + + // If so, put the tuple in the actual map + if(complete) + { + // Create a tuple for insertion + Tuple new_tuple; + + for(const auto & v: present) + { new_tuple.push_back(v.value()); } + + // Remove from incomplete + incomplete_.erase(it); + + TF_RETURN_IF_ERROR(put_complete(key, &new_tuple, l)); + } + } + + return Status::OK(); + } + + // Does the insertion into the actual staging area + Status put_complete(const KeyType & key, Tuple * tuple, + mutex_lock & l) + { + // Insert key and tuples into the map + map_.insert({key, std::move(*tuple)}); + + notify_removers(l); + + return Status::OK(); + } + +public: + // public methods + explicit StagingMap(const DataTypeVector & dtypes, + std::size_t capacity, std::size_t memory_limit) : + dtypes_(dtypes), + capacity_(capacity), + memory_limit_(memory_limit), + current_bytes_(0) {} + + Status put(KeyType* key, const Tensor * indices, + Tuple* tuple) + { + mutex_lock l(mu_); + + // Sanity check the indices + TF_RETURN_IF_ERROR(check_index_ordering(*indices)); + + // Handle incomplete inserts + if(indices->NumElements() != dtypes_.size()) + { + return put_incomplete(*key, *indices, tuple, l); + } + + std::size_t tuple_bytes = get_tuple_bytes(*tuple); + // Check that tuple_bytes fits within the memory limit + TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes)); + + // If map capacity is bounded wait until map is not full + if(has_capacity() || has_memory_limit()) { + full_.wait(l, [tuple_bytes, this]() { + // If there's a memory limit, check if there's space for insertion + bool memory_limit_valid = has_memory_limit() ? + !would_exceed_memory_limit(tuple_bytes) : true; + // If we're configured for capacity check if there's space for insertion + bool capacity_valid = has_capacity() ? 
!is_capacity_full() : true; + + // Stop waiting upon success for both conditions + return memory_limit_valid && capacity_valid; + }); + } + + // Do the put operation + TF_RETURN_IF_ERROR(put_complete(*key, tuple, l)); + + // Update the current size + current_bytes_ += tuple_bytes; + + return Status::OK(); + } + + Status get(const KeyType* key, Tuple* tuple) + { + mutex_lock l(mu_); + + typename MapType::const_iterator it; + + // Wait until the element with the requested key is present + not_empty_.wait(l, [&, this]() { + it = map_.find(*key); + return it != map_.end(); + }); + + // Copy tensors into the tuple + for(const auto & tensor : it->second) + { tuple->push_back(tensor); } + + // Update bytes in the Staging Area + current_bytes_ -= get_tuple_bytes(*tuple); + + return Status::OK(); + } + + Status pop(const KeyType* key, Tuple* tuple) + { + mutex_lock l(mu_); + + typename MapType::iterator it; + + // Wait until the element with the requested key is present + not_empty_.wait(l, [&, this]() { + it = map_.find(*key); + return it != this->map_.end(); + }); + + // Move from the entry as its erased anyway + *tuple = std::move(it->second); + + // Remove + map_.erase(it); + + // Update bytes in the Staging Area + current_bytes_ -= get_tuple_bytes(*tuple); + + notify_inserters_if_bounded(l); + + return Status::OK(); + } + + Status popitem(KeyType* key, Tuple* tuple) + { + mutex_lock l(mu_); + + // Wait until map is not empty + not_empty_.wait(l, [this]() { return !this->map_.empty(); }); + + // Move from the first element and erase it + *tuple = std::move(map_.begin()->second); + *key = map_.begin()->first; + map_.erase(map_.begin()); + + // Update bytes in the Staging Area + current_bytes_ -= get_tuple_bytes(*tuple); + + notify_inserters_if_bounded(l); + + return Status::OK(); + } + + Status clear() + { + mutex_lock l(mu_); + map_.clear(); + incomplete_.clear(); + current_bytes_ = 0; + + notify_inserters_if_bounded(l); + + return Status::OK(); + } + + size_t incomplete_size() + { + mutex_lock l(mu_); + return incomplete_.size(); + } + + size_t size() + { + // Lock the map and return the size + mutex_lock l(mu_); + return map_.size(); + } + + string DebugString() + { + return "StagingMap"; + } +}; + +template +Status GetStagingMap(OpKernelContext* ctx, + const NodeDef& ndef, + StagingMap** map) +{ + auto rm = ctx->resource_manager(); + ContainerInfo cinfo; + + // Lambda for creating the Staging Area + auto create_fn = [&ndef](StagingMap** ret) -> Status + { + DataTypeVector dtypes; + int64 capacity; + int64 memory_limit; + TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "dtypes", &dtypes)); + TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity)); + TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit)); + *ret = new StagingMap(dtypes, capacity, memory_limit); + return Status::OK(); + }; + + TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */)); + TF_RETURN_IF_ERROR(rm->LookupOrCreate>( + cinfo.container(), cinfo.name(), + map, create_fn)); + return Status::OK(); +} + +template +class MapStageOp : public OpKernel +{ + public: + explicit MapStageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + void Compute(OpKernelContext* ctx) override { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + typename StagingMap::Tuple tuple; + + const Tensor * key_tensor; + const Tensor * indices_tensor; + OpInputList values_tensor; + + OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor)); + OP_REQUIRES_OK(ctx, 
ctx->input("indices", &indices_tensor)); + OP_REQUIRES_OK(ctx, ctx->input_list("values", &values_tensor)); + + // Create copy for insertion into Staging Area + Tensor key(*key_tensor); + + // Create the tuple to store + for (std::size_t i = 0; i < values_tensor.size(); ++i) { + tuple.push_back(values_tensor[i]); + } + + // Store the tuple in the map + OP_REQUIRES_OK(ctx, map->put(&key, indices_tensor, &tuple)); + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapStage").Device(DEVICE_CPU), + MapStageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapStage").Device(DEVICE_CPU), + MapStageOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapStage") + .HostMemory("key") + .HostMemory("indices") + .Device(DEVICE_GPU), MapStageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapStage") + .HostMemory("key") + .HostMemory("indices") + .Device(DEVICE_GPU), MapStageOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapStage").HostMemory("key") + .Device(DEVICE_SYCL), MapStageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapStage").HostMemory("key") + .Device(DEVICE_SYCL), MapStageOp); + +#endif // TENSORFLOW_USE_SYCL + +template +class MapUnstageOp : public OpKernel +{ + public: + explicit MapUnstageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. + void Compute(OpKernelContext* ctx) override { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + typename StagingMap::Tuple tuple; + + const Tensor * key_tensor; + OpInputList values_tensor; + + OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor)); + OP_REQUIRES_OK(ctx, map->pop(key_tensor, &tuple)); + + OP_REQUIRES( + ctx, tuple.size() == (size_t)ctx->num_outputs(), + errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), + " vs. ", ctx->num_outputs())); + for (size_t i = 0; i < tuple.size(); ++i) { + ctx->set_output(i, tuple[i]); + } + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapUnstage").Device(DEVICE_CPU), + MapUnstageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").Device(DEVICE_CPU), + MapUnstageOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapUnstage").HostMemory("key") + .Device(DEVICE_GPU), MapUnstageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").HostMemory("key") + .Device(DEVICE_GPU), MapUnstageOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapUnstage").HostMemory("key") + .Device(DEVICE_SYCL), MapUnstageOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").HostMemory("key") + .Device(DEVICE_SYCL), MapUnstageOp); +#endif // TENSORFLOW_USE_SYCL + +template +class MapPeekOp : public OpKernel +{ + public: + explicit MapPeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. + void Compute(OpKernelContext* ctx) override { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + typename StagingMap::Tuple tuple; + + const Tensor * key_tensor; + OpInputList values_tensor; + + OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor)); + OP_REQUIRES_OK(ctx, map->get(key_tensor, &tuple)); + + OP_REQUIRES( + ctx, tuple.size() == (size_t)ctx->num_outputs(), + errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), + " vs. 
", ctx->num_outputs())); + for (size_t i = 0; i < tuple.size(); ++i) { + ctx->set_output(i, tuple[i]); + } + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapPeek").Device(DEVICE_CPU), + MapPeekOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").Device(DEVICE_CPU), + MapPeekOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapPeek").HostMemory("key") + .Device(DEVICE_GPU), MapPeekOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").HostMemory("key") + .Device(DEVICE_GPU), MapPeekOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapPeek").HostMemory("key") + .Device(DEVICE_SYCL), MapPeekOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").HostMemory("key") + .Device(DEVICE_SYCL), MapPeekOp); +#endif // TENSORFLOW_USE_SYCL + + + +template +class MapUnstageNoKeyOp : public OpKernel +{ + public: + explicit MapUnstageNoKeyOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. + void Compute(OpKernelContext* ctx) override { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + + // Pop a random (key, value) off the map + typename StagingMap::KeyType key; + typename StagingMap::Tuple tuple; + + OP_REQUIRES_OK(ctx, map->popitem(&key, &tuple)); + + // Allocate a key tensor and assign the key as the first output + ctx->set_output(0, key); + + // Set the rest of the outputs to the tuple Tensors + OP_REQUIRES(ctx, + tuple.size() == (size_t)ctx->num_outputs()-1, + errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), + " vs. ", ctx->num_outputs()-1)); + for (size_t i = 0; i < tuple.size(); ++i) + { + ctx->set_output(i+1, tuple[i]); + } + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").Device(DEVICE_CPU), + MapUnstageNoKeyOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").Device(DEVICE_CPU), + MapUnstageNoKeyOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").HostMemory("key") + .Device(DEVICE_GPU), MapUnstageNoKeyOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").HostMemory("key") + .Device(DEVICE_GPU), MapUnstageNoKeyOp); + +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").HostMemory("key") + .Device(DEVICE_SYCL), MapUnstageNoKeyOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").HostMemory("key") + .Device(DEVICE_SYCL), MapUnstageNoKeyOp); +#endif // TENSORFLOW_USE_SYCL + + +template +class MapSizeOp : public OpKernel +{ + public: + explicit MapSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + void Compute(OpKernelContext* ctx) override + { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + + // Allocate size output tensor + Tensor * size = nullptr; + OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), + &size)); + + // Set it to the actual size + size->scalar().setConstant(map->size()); + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_CPU), + MapSizeOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_CPU), + MapSizeOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_GPU) + .HostMemory("size"), MapSizeOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_GPU) + .HostMemory("size"), MapSizeOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_SYCL) + .HostMemory("size"), MapSizeOp); 
+REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_SYCL) + .HostMemory("size"), MapSizeOp); +#endif // TENSORFLOW_USE_SYCL + +template +class MapIncompleteSizeOp : public OpKernel +{ + public: + explicit MapIncompleteSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + void Compute(OpKernelContext* ctx) override + { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + + // Allocate size output tensor + Tensor * size = nullptr; + OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), + &size)); + + // Set it to the actual size + size->scalar().setConstant(map->incomplete_size()); + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_CPU), + MapIncompleteSizeOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_CPU), + MapIncompleteSizeOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_GPU) + .HostMemory("size"), MapIncompleteSizeOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_GPU) + .HostMemory("size"), MapIncompleteSizeOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_SYCL) + .HostMemory("size"), MapIncompleteSizeOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_SYCL) + .HostMemory("size"), MapIncompleteSizeOp); +#endif // TENSORFLOW_USE_SYCL + +template +class MapClearOp : public OpKernel +{ + public: + explicit MapClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + void Compute(OpKernelContext* ctx) override + { + StagingMap* map = nullptr; + OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map)); + core::ScopedUnref scope(map); + + OP_REQUIRES_OK(ctx, map->clear()); + } +}; + +REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_CPU), + MapClearOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_CPU), + MapClearOp); + +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_GPU), + MapClearOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_GPU), + MapClearOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_SYCL), + MapClearOp); +REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_SYCL), + MapClearOp); +#endif // TENSORFLOW_USE_SYCL + +} // namespace + +} // namespace tensorflow diff --git a/tensorflow/core/kernels/stage_op.cc b/tensorflow/core/kernels/stage_op.cc index 161ba892127..45168112cc3 100644 --- a/tensorflow/core/kernels/stage_op.cc +++ b/tensorflow/core/kernels/stage_op.cc @@ -1,4 +1,4 @@ -/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. +/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ limitations under the License. 
==============================================================================*/ #include +#include #include #include "tensorflow/core/framework/op_kernel.h" @@ -30,26 +31,153 @@ namespace { class Buffer : public ResourceBase { public: - explicit Buffer() {} - + // public types typedef std::vector Tuple; - // the Buffer takes ownership of the Tuple - void Put(Tuple* tuple) { - mutex_lock l(mu_); - buf_.push_back(std::move(*tuple)); - non_empty_cond_var_.notify_one(); // maybe possible to optimize by reducing - // how often this signal is sent + private: + // private variables + std::size_t capacity_; + std::size_t memory_limit_; + std::size_t current_bytes_; + mutex mu_; + condition_variable non_empty_cond_var_; + condition_variable full_cond_var_; + std::deque buf_ GUARDED_BY(mu_); + + + private: + // private methods + + // If the buffer is configured for bounded capacity, notify + // waiting inserters that space is now available + void notify_inserters_if_bounded(mutex_lock & l) + { + if(IsBounded()) + { + l.unlock(); + full_cond_var_.notify_one(); + } } - void Get(Tuple* tuple) { // TODO(zhifengc): Support cancellation. + // Are there a limit number of elements or a memory limit + // configued on this buffer? + bool IsBounded() { + return capacity_ > 0 || memory_limit_ > 0; + } + + bool IsCapacityFull() { + return buf_.size() >= capacity_; + } + + bool WouldExceedMemoryLimit(std::size_t bytes) { + return bytes + current_bytes_ > memory_limit_; + } + + std::size_t GetTupleBytes(const Tuple & tuple) + { + return std::accumulate(tuple.begin(), tuple.end(), 0, + [](const std::size_t & lhs, const Tensor & rhs) { + return lhs + rhs.TotalBytes(); + }); + } + + public: + // public methods + explicit Buffer(std::size_t capacity, std::size_t memory_limit) : + capacity_(capacity), + memory_limit_(memory_limit), + current_bytes_(0) {} + + // the Buffer takes ownership of the Tuple + Status Put(Tuple* tuple) { mutex_lock l(mu_); - while (buf_.empty()) { - non_empty_cond_var_.wait(l); + + std::size_t tuple_bytes = GetTupleBytes(*tuple); + + // Sanity check so that we don't block for ever below + if(memory_limit_ > 0 && tuple_bytes > memory_limit_) { + return Status(errors::ResourceExhausted("Attempted to insert " + "tensors with combined size of '", tuple_bytes, "' bytes into " + "Staging Area with a memory limit of '", memory_limit_, "'.")); } + + // If buffer capacity is bounded wait until elements have been removed + if(IsBounded()) { + full_cond_var_.wait(l, [tuple_bytes, this]() { + // If there's a memory limit, check if there's space for insertion + bool memory_limit_valid = memory_limit_ > 0 ? + !WouldExceedMemoryLimit(tuple_bytes) : true; + // If we're configured for capacity check if there's space for insertion + bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true; + + // Stop waiting upon success for both conditions + return capacity_valid && memory_limit_valid; + }); + } + + // Update bytes in the Staging Area + current_bytes_ += tuple_bytes; + + // Store tuple + buf_.push_back(std::move(*tuple)); + + l.unlock(); + // maybe possible to optimize by reducing + // how often this signal is sent + non_empty_cond_var_.notify_one(); + + return Status::OK(); + } + + // Get tuple at front of the buffer + void Get(Tuple* tuple) { // TODO(zhifengc): Support cancellation. 
+ mutex_lock l(mu_); + + // Wait for data if the buffer is empty + non_empty_cond_var_.wait(l, [this]() { + return !buf_.empty(); + }); + + // Move data into the output tuple *tuple = std::move(buf_.front()); buf_.pop_front(); + + // Update bytes in the Staging Area + current_bytes_ -= GetTupleBytes(*tuple); + + notify_inserters_if_bounded(l); + } + + // Return tuple at index + Status Peek(std::size_t index, Tuple* tuple) { + mutex_lock l(mu_); + + // Wait if the requested index is not available + non_empty_cond_var_.wait(l, [index, this]() { + return index < this->buf_.size(); + }); + + // Place tensors in the output tuple + for(const auto & tensor: buf_[index]) { + tuple->push_back(tensor); + } + + return Status::OK(); + } + + // Buffer size + size_t Size() { + mutex_lock l(mu_); + return buf_.size(); + } + + void Clear() { + mutex_lock l(mu_); + buf_.clear(); + current_bytes_ = 0; + + notify_inserters_if_bounded(l); } string DebugString() { @@ -57,23 +185,27 @@ class Buffer : public ResourceBase { return strings::StrCat("Staging size: ", buf_.size()); } - private: - mutex mu_; - condition_variable non_empty_cond_var_; - std::deque buf_ GUARDED_BY(mu_); }; -Status CreateBuffer(Buffer** ret) { - *ret = new Buffer; - return Status::OK(); -} - Status GetBuffer(OpKernelContext* ctx, const NodeDef& ndef, Buffer** buf) { auto rm = ctx->resource_manager(); ContainerInfo cinfo; + + // Lambda for creating the Staging Area + auto create_fn = [&ndef](Buffer** ret) -> Status + { + int64 capacity; + int64 memory_limit; + TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity)); + TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit)); + *ret = new Buffer(capacity, memory_limit); + return Status::OK(); + }; + + TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */)); TF_RETURN_IF_ERROR(rm->LookupOrCreate(cinfo.container(), cinfo.name(), - buf, CreateBuffer)); + buf, create_fn)); return Status::OK(); } @@ -88,10 +220,10 @@ class StageOp : public OpKernel { OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf)); core::ScopedUnref scope(buf); Buffer::Tuple tuple; - for (int i = 0; i < ctx->num_inputs(); ++i) { + for (std::size_t i = 0; i < ctx->num_inputs(); ++i) { tuple.push_back(ctx->input(i)); } - buf->Put(&tuple); + OP_REQUIRES_OK(ctx, buf->Put(&tuple)); } }; @@ -114,11 +246,13 @@ class UnstageOp : public OpKernel { OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf)); core::ScopedUnref scope(buf); Buffer::Tuple tuple; + buf->Get(&tuple); - OP_REQUIRES( - ctx, tuple.size() == (size_t)ctx->num_outputs(), + + OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(), errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), " vs. ", ctx->num_outputs())); + for (size_t i = 0; i < tuple.size(); ++i) { ctx->set_output(i, tuple[i]); } @@ -133,4 +267,97 @@ REGISTER_KERNEL_BUILDER(Name("Unstage").Device(DEVICE_GPU), UnstageOp); REGISTER_KERNEL_BUILDER(Name("Unstage").Device(DEVICE_SYCL), UnstageOp); #endif // TENSORFLOW_USE_SYCL +class StagePeekOp : public OpKernel { + public: + explicit StagePeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. 
+ void Compute(OpKernelContext* ctx) override { + Buffer* buf = nullptr; + OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf)); + core::ScopedUnref scope(buf); + Buffer::Tuple tuple; + + std::size_t index = ctx->input(0).scalar()(); + + OP_REQUIRES_OK(ctx, buf->Peek(index, &tuple)); + + OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(), + errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), + " vs. ", ctx->num_outputs())); + + for (size_t i = 0; i < tuple.size(); ++i) { + ctx->set_output(i, tuple[i]); + } + } +}; + +REGISTER_KERNEL_BUILDER(Name("StagePeek").Device(DEVICE_CPU), + StagePeekOp); +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("StagePeek").HostMemory("index"). + Device(DEVICE_GPU), StagePeekOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("StagePeek").HostMemory("index") + .Device(DEVICE_SYCL), StagePeekOp); +#endif // TENSORFLOW_USE_SYCL + + +class StageSizeOp : public OpKernel { + public: + explicit StageSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. + void Compute(OpKernelContext* ctx) override { + Buffer* buf = nullptr; + OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf)); + core::ScopedUnref scope(buf); + + // Allocate size output tensor + Tensor * size = nullptr; + OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), + &size)); + + // Set it to the actual size + size->scalar().setConstant(buf->Size()); + } +}; + +REGISTER_KERNEL_BUILDER(Name("StageSize").Device(DEVICE_CPU), StageSizeOp); +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("StageSize").HostMemory("size") + .Device(DEVICE_GPU), StageSizeOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("StageSize").HostMemory("size") + .Device(DEVICE_SYCL), StageSizeOp); +#endif // TENSORFLOW_USE_SYCL + +class StageClearOp : public OpKernel { + public: + explicit StageClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {} + + // Using this op in such a way that it blocks forever + // is an error. As such cancellation is not handled. + void Compute(OpKernelContext* ctx) override { + Buffer* buf = nullptr; + OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf)); + core::ScopedUnref scope(buf); + + buf->Clear(); + } +}; + +REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_CPU), StageClearOp); +#if GOOGLE_CUDA +REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_GPU), StageClearOp); +#endif +#ifdef TENSORFLOW_USE_SYCL +REGISTER_KERNEL_BUILDER(Name("StageClear").Device(DEVICE_SYCL), StageClearOp); +#endif // TENSORFLOW_USE_SYCL + + } // namespace tensorflow diff --git a/tensorflow/core/ops/data_flow_ops.cc b/tensorflow/core/ops/data_flow_ops.cc index c80ff983cfa..2be804acbf6 100644 --- a/tensorflow/core/ops/data_flow_ops.cc +++ b/tensorflow/core/ops/data_flow_ops.cc @@ -1967,6 +1967,8 @@ handle: The handle for a tensor stored in the session state. REGISTER_OP("Stage") .Input("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") .Attr("dtypes: list(type)") .Attr("container: string = ''") .Attr("shared_name: string = ''") @@ -1979,6 +1981,11 @@ The basic functionality of this Op is similar to a queue with many fewer capabilities and options. This Op is optimized for performance. values: a list of tensors +dtypes A list of data types that inserted values should adhere to. +capacity: Maximum number of elements in the Staging Area. If > 0, inserts + on the container will block when the capacity is reached. 
+memory_limit: The maximum number of bytes allowed for Tensors in the Staging Area. + If > 0, inserts will block until sufficient space is available. container: If non-empty, this queue is placed in the given container. Otherwise, a default container is used. shared_name: It is necessary to match this name to the matching Unstage Op. @@ -1986,6 +1993,8 @@ shared_name: It is necessary to match this name to the matching Unstage Op. REGISTER_OP("Unstage") .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") .Attr("dtypes: list(type)") .Attr("container: string = ''") .Attr("shared_name: string = ''") @@ -1998,6 +2007,277 @@ The basic funtionality is similar to dequeue with many fewer capabilities and options. This Op is optimized for performance. )doc"); +REGISTER_OP("StagePeek") + .Input("index: int32") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op peeks at the values at the specified index. If the +underlying container does not contain sufficient elements +this op will block until it does. This Op is optimized for +performance. + )doc"); + + +REGISTER_OP("StageSize") + .Output("size: int32") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(shape_inference::ScalarShape) + .SetIsStateful() + .Doc(R"doc( +Op returns the number of elements in the underlying container. + )doc"); + +REGISTER_OP("StageClear") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op removes all elements in the underlying container. + )doc"); + +// UnorderedMap +REGISTER_OP("MapStage") + .Input("key: int64") + .Input("indices: int32") + .Input("values: fake_dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("fake_dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::NoOutputs) + .SetIsStateful() + .Doc(R"doc( +Stage (key, values) in the underlying container which behaves like a hashtable. + +key: int64 +values: a list of tensors +dtypes A list of data types that inserted values should adhere to. +capacity: Maximum number of elements in the Staging Area. If > 0, inserts + on the container will block when the capacity is reached. +container: If non-empty, this queue is placed in the given container. Otherwise, + a default container is used. +shared_name: It is necessary to match this name to the matching Unstage Op. +)doc"); + +REGISTER_OP("MapPeek") + .Input("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op peeks at the values at the specified key. If the +underlying container does not contain this key +this op will block until it does. 
+ )doc"); + +REGISTER_OP("MapUnstage") + .Input("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op removes and returns the values associated with the key +from the underlying container. If the underlying container +does not contain this key, the op will block until it does. + )doc"); + +REGISTER_OP("MapUnstageNoKey") + .Output("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op removes and returns a random (key, value) +from the underlying container. If the underlying container +does not contain elements, the op will block until it does. + )doc"); + +REGISTER_OP("MapSize") + .Output("size: int32") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::ScalarShape) + .SetIsStateful() + .Doc(R"doc( +Op returns the number of elements in the underlying container. + )doc"); + +REGISTER_OP("MapIncompleteSize") + .Output("size: int32") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::ScalarShape) + .SetIsStateful() + .Doc(R"doc( +Op returns the number of incomplete elements in the underlying container. + )doc"); + + +REGISTER_OP("MapClear") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::NoOutputs) + .SetIsStateful() + .Doc(R"doc( +Op removes all elements in the underlying container. + )doc"); + + +// OrderedMap +REGISTER_OP("OrderedMapStage") + .Input("key: int64") + .Input("indices: int32") + .Input("values: fake_dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("fake_dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::NoOutputs) + .SetIsStateful() + .Doc(R"doc( +Stage (key, values) in the underlying container which behaves like a ordered +associative container. Elements are ordered by key. + +key: int64 +values: a list of tensors +dtypes A list of data types that inserted values should adhere to. +capacity: Maximum number of elements in the Staging Area. If > 0, inserts + on the container will block when the capacity is reached. +container: If non-empty, this queue is placed in the given container. Otherwise, + a default container is used. +shared_name: It is necessary to match this name to the matching Unstage Op. 
+)doc"); + +REGISTER_OP("OrderedMapPeek") + .Input("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op peeks at the values at the specified key. If the +underlying container does not contain this key +this op will block until it does. This Op is optimized for +performance. + )doc"); + +REGISTER_OP("OrderedMapUnstage") + .Input("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op removes and returns the values associated with the key +from the underlying container. If the underlying container +does not contain this key, the op will block until it does. + )doc"); + +REGISTER_OP("OrderedMapUnstageNoKey") + .Output("key: int64") + .Output("values: dtypes") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::UnknownShape) + .SetIsStateful() + .Doc(R"doc( +Op removes and returns the (key, value) element with the smallest +key from the underlying container. If the underlying container +does not contain elements, the op will block until it does. + )doc"); + +REGISTER_OP("OrderedMapSize") + .Output("size: int32") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::ScalarShape) + .SetIsStateful() + .Doc(R"doc( +Op returns the number of elements in the underlying container. + )doc"); + +REGISTER_OP("OrderedMapIncompleteSize") + .Output("size: int32") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::ScalarShape) + .SetIsStateful() + .Doc(R"doc( +Op returns the number of incomplete elements in the underlying container. + )doc"); + +REGISTER_OP("OrderedMapClear") + .Attr("capacity: int >= 0 = 0") + .Attr("memory_limit: int >= 0 = 0") + .Attr("dtypes: list(type)") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .SetShapeFn(tensorflow::shape_inference::NoOutputs) + .SetIsStateful() + .Doc(R"doc( +Op removes all elements in the underlying container. 
+ )doc"); + REGISTER_OP("RecordInput") .Output("records: string") .Attr("file_pattern: string") diff --git a/tensorflow/python/kernel_tests/BUILD b/tensorflow/python/kernel_tests/BUILD index 57325d124b8..88754015a67 100644 --- a/tensorflow/python/kernel_tests/BUILD +++ b/tensorflow/python/kernel_tests/BUILD @@ -2339,6 +2339,20 @@ cuda_py_test( ], ) +cuda_py_test( + name = "map_stage_op_test", + size = "small", + srcs = ["map_stage_op_test.py"], + additional_deps = [ + "//tensorflow/python:array_ops", + "//tensorflow/python:client_testlib", + "//tensorflow/python:framework_for_generated_wrappers", + "//tensorflow/python:math_ops", + "//tensorflow/python:util", + "//tensorflow/python:data_flow_ops", + ], +) + cuda_py_test( name = "concat_op_test", size = "medium", diff --git a/tensorflow/python/kernel_tests/map_stage_op_test.py b/tensorflow/python/kernel_tests/map_stage_op_test.py new file mode 100644 index 00000000000..718d8aebd87 --- /dev/null +++ b/tensorflow/python/kernel_tests/map_stage_op_test.py @@ -0,0 +1,383 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import data_flow_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.platform import test + + +class MapStageTest(test.TestCase): + + def testSimple(self): + with self.test_session(use_gpu=True) as sess: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.float32]) + stage = stager.put(pi, [v], [0]) + k, y = stager.get(gi) + y = math_ops.reduce_max(math_ops.matmul(y, y)) + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose(4 * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testMultiple(self): + with self.test_session(use_gpu=True) as sess: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. 
* (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.float32, dtypes.float32]) + stage = stager.put(pi, [x, v], [0, 1]) + k, (z, y) = stager.get(gi) + y = math_ops.reduce_max(z * math_ops.matmul(y, y)) + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose( + 4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testDictionary(self): + with self.test_session(use_gpu=True) as sess: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32], + shapes=[[], [128, 128]], + names=['x', 'v']) + stage = stager.put(pi,{'x': x, 'v': v}) + key, ret = stager.get(gi) + z = ret['x'] + y = ret['v'] + y = math_ops.reduce_max(z * math_ops.matmul(y, y)) + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose( + 4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testColocation(self): + gpu_dev = test.gpu_device_name() + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(gpu_dev): + stager = data_flow_ops.MapStagingArea([dtypes.float32]) + y = stager.put(1, [v], [0]) + self.assertEqual(y.device, '/device:GPU:0' if gpu_dev + else gpu_dev) + with ops.device('/cpu:0'): + _, x = stager.get(1) + y = stager.peek(1) + _, z = stager.get() + self.assertEqual(x.device, '/device:CPU:0') + self.assertEqual(y.device, '/device:CPU:0') + self.assertEqual(z.device, '/device:CPU:0') + + def testPeek(self): + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + p = array_ops.placeholder(dtypes.int32, name='p') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], shapes=[[]]) + stage = stager.put(pi,[x], [0]) + peek = stager.peek(gi) + size = stager.size() + + n = 10 + + with self.test_session(use_gpu=True) as sess: + for i in range(n): + sess.run(stage, feed_dict={x:i, pi:i}) + + for i in range(n): + self.assertTrue(sess.run(peek, feed_dict={gi: i}) == i) + + self.assertTrue(sess.run(size) == 10) + + def testSizeAndClear(self): + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32, name='x') + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. 
* (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32], + shapes=[[], [128, 128]], + names=['x', 'v']) + stage = stager.put(pi,{'x': x, 'v': v}) + size = stager.size() + clear = stager.clear() + + with self.test_session(use_gpu=True) as sess: + sess.run(stage, feed_dict={x: -1, pi: 3}) + self.assertEqual(sess.run(size), 1) + sess.run(stage, feed_dict={x: -1, pi: 1}) + self.assertEqual(sess.run(size), 2) + sess.run(clear) + self.assertEqual(sess.run(size), 0) + + + def testCapacity(self): + capacity = 3 + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], + capacity=capacity, shapes=[[]]) + + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + from six.moves import queue as Queue + import threading + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: i, pi: i}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + sess.run(get) + + # This should now succeed + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear out the staging area completely + for i in range(capacity): + sess.run(get) + + def testMemoryLimit(self): + memory_limit = 512*1024 # 512K + chunk = 200*1024 # 256K + capacity = memory_limit // chunk + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.uint8, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.uint8], + memory_limit=memory_limit, shapes=[[]]) + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + from six.moves import queue as Queue + import threading + import numpy as np + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: np.full(chunk, i, dtype=np.uint8), + pi: i}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + sess.run(get) + + # This should now succeed + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear out the staging area completely + for i in range(capacity): + 
sess.run(get) + + def testOrdering(self): + import six + import random + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], + shapes=[[]], ordered=True) + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + n = 10 + + with self.test_session(use_gpu=True) as sess: + # Keys n-1..0 + keys = list(reversed(six.moves.range(n))) + + for i in keys: + sess.run(stage, feed_dict={pi: i, x: i}) + + self.assertTrue(sess.run(size) == n) + + # Check that key, values come out in ascending order + for i, k in enumerate(reversed(keys)): + get_key, values = sess.run(get) + self.assertTrue(i == k == get_key == values) + + self.assertTrue(sess.run(size) == 0) + + def testBarrier(self): + with self.test_session(use_gpu=True) as sess: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + f = array_ops.placeholder(dtypes.float32) + v = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + with ops.device(test.gpu_device_name()): + # Test barrier with dictionary + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32], + names=['x', 'v', 'f']) + stage_xf = stager.put(pi,{'x': x, 'f': f}) + stage_v = stager.put(pi, {'v': v}) + key, ret = stager.get(gi) + size = stager.size() + isize = stager.incomplete_size() + + # 0 complete and incomplete entries + self.assertTrue(sess.run([size, isize]) == [0, 0]) + # Stage key 0, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Stage key 1, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 2]) + + # Now complete key 0 with tuple entry v + sess.run(stage_v, feed_dict={pi: 0, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + # We can now obtain tuple associated with key 0 + self.assertTrue(sess.run([key, ret], feed_dict={gi:0}) + == [0, { 'x':1, 'f':2, 'v':1}]) + + # 0 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Now complete key 1 with tuple entry v + sess.run(stage_v, feed_dict={pi: 1, v: 3}) + # We can now obtain tuple associated with key 1 + self.assertTrue(sess.run([key, ret], feed_dict={gi:1}) + == [1, { 'x':1, 'f':2, 'v':3}]) + + # Test again with index inserts + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32]) + stage_xf = stager.put(pi, [x, f], [0, 2]) + stage_v = stager.put(pi, [v], [1]) + key, ret = stager.get(gi) + size = stager.size() + isize = stager.incomplete_size() + + # 0 complete and incomplete entries + self.assertTrue(sess.run([size, isize]) == [0, 0]) + # Stage key 0, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Stage key 1, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 2]) + + # Now complete key 0 with tuple entry v + sess.run(stage_v, feed_dict={pi: 0, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + # We can now obtain tuple associated with key 0 + 
self.assertTrue(sess.run([key, ret], feed_dict={gi:0}) + == [0, [1, 1, 2]]) + + # 0 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Now complete key 1 with tuple entry v + sess.run(stage_v, feed_dict={pi: 1, v: 3}) + # We can now obtain tuple associated with key 1 + self.assertTrue(sess.run([key, ret], feed_dict={gi:1}) + == [1, [1,3, 2]]) + + +if __name__ == '__main__': + test.main() diff --git a/tensorflow/python/kernel_tests/stage_op_test.py b/tensorflow/python/kernel_tests/stage_op_test.py index 81eee48d2e8..645ac2f1302 100644 --- a/tensorflow/python/kernel_tests/stage_op_test.py +++ b/tensorflow/python/kernel_tests/stage_op_test.py @@ -1,4 +1,4 @@ -# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ class StageTest(test.TestCase): with ops.device('/cpu:0'): x = array_ops.placeholder(dtypes.float32) v = 2. * (array_ops.zeros([128, 128]) + x) - with ops.device('/gpu:0'): + with ops.device(test.gpu_device_name()): stager = data_flow_ops.StagingArea([dtypes.float32]) stage = stager.put([v]) y = stager.get() @@ -78,18 +78,174 @@ class StageTest(test.TestCase): self.assertAllClose( 4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) - def testColocation1(self): + def testColocation(self): + gpu_dev = test.gpu_device_name() + with ops.device('/cpu:0'): x = array_ops.placeholder(dtypes.float32) v = 2. * (array_ops.zeros([128, 128]) + x) - with ops.device('/gpu:0'): + with ops.device(gpu_dev): stager = data_flow_ops.StagingArea([dtypes.float32]) y = stager.put([v]) - self.assertEqual(y.device, '/device:GPU:0') + self.assertEqual(y.device, '/device:GPU:0' if gpu_dev + else gpu_dev) with ops.device('/cpu:0'): x = stager.get() self.assertEqual(x.device, '/device:CPU:0') + def testPeek(self): + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + p = array_ops.placeholder(dtypes.int32, name='p') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.StagingArea([dtypes.int32, ], shapes=[[]]) + stage = stager.put([x]) + peek = stager.peek(p) + ret = stager.get() + + with self.test_session(use_gpu=True) as sess: + for i in range(10): + sess.run(stage, feed_dict={x:i}) + + for i in range(10): + self.assertTrue(sess.run(peek, feed_dict={p:i}) == i) + + def testSizeAndClear(self): + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32, name='x') + v = 2. 
* (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.StagingArea( + [dtypes.float32, dtypes.float32], + shapes=[[], [128, 128]], + names=['x', 'v']) + stage = stager.put({'x': x, 'v': v}) + ret = stager.get() + size = stager.size() + clear = stager.clear() + + with self.test_session(use_gpu=True) as sess: + sess.run(stage, feed_dict={x: -1}) + self.assertEqual(sess.run(size), 1) + sess.run(stage, feed_dict={x: -1}) + self.assertEqual(sess.run(size), 2) + sess.run(clear) + self.assertEqual(sess.run(size), 0) + + def testCapacity(self): + capacity = 3 + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.StagingArea([dtypes.int32, ], + capacity=capacity, shapes=[[]]) + stage = stager.put([x]) + ret = stager.get() + size = stager.size() + + from six.moves import queue as Queue + import threading + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: i}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + self.assertTrue(sess.run(ret) == i) + + # Thread should be able to join now + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear the staging area completely + for i in range(capacity): + self.assertTrue(sess.run(ret) == i+(n-capacity)) + + self.assertTrue(sess.run(size) == 0) + + def testMemoryLimit(self): + memory_limit = 512*1024 # 512K + chunk = 200*1024 # 256K + capacity = memory_limit // chunk + + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.uint8, name='x') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.StagingArea([dtypes.uint8, ], + memory_limit=memory_limit, shapes=[[]]) + stage = stager.put([x]) + ret = stager.get() + size = stager.size() + + from six.moves import queue as Queue + import threading + import numpy as np + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: np.full(chunk, i, dtype=np.uint8)}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + self.assertTrue(sess.run(ret)[0] == i) + + # Thread should be able to join now + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear the staging area completely + for i in range(capacity): + self.assertTrue(sess.run(ret)[0] == i+(n-capacity)) + + self.assertTrue(sess.run(size) == 0) if __name__ == '__main__': 
test.main() diff --git a/tensorflow/python/ops/data_flow_ops.py b/tensorflow/python/ops/data_flow_ops.py index 9a208613add..eb9d0ba7e32 100644 --- a/tensorflow/python/ops/data_flow_ops.py +++ b/tensorflow/python/ops/data_flow_ops.py @@ -1,4 +1,4 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1344,72 +1344,30 @@ class SparseConditionalAccumulator(ConditionalAccumulatorBase): dense_shape=return_val.shape) -class StagingArea(object): - """Class for staging inputs. No ordering guarantees. - - A `StagingArea` is a TensorFlow data structure that stores tensors across - multiple steps, and exposes operations that can put and get tensors. - - Each `StagingArea` element is a tuple of one or more tensors, where each - tuple component has a static dtype, and may have a static shape. - - The capacity of a `StagingArea` is unbounded and supports multiple - concurrent producers and consumers; and provides exactly-once delivery. - - Each element of a `StagingArea` is a fixed-length tuple of tensors whose - dtypes are described by `dtypes`, and whose shapes are optionally described - by the `shapes` argument. - - If the `shapes` argument is specified, each component of a staging area - element must have the respective fixed shape. If it is - unspecified, different elements may have different shapes, - """ - +class BaseStagingArea(object): + """Base class for Staging Areas.""" _identifier = 0 _lock = threading.Lock() - def __init__(self, dtypes, shapes=None, names=None, shared_name=None): - """Constructs a staging area object. - - The two optional lists, `shapes` and `names`, must be of the same length - as `dtypes` if provided. The values at a given index `i` indicate the - shape and name to use for the corresponding queue component in `dtypes`. - - The device scope at the time of object creation determines where the - storage for the `StagingArea` will reside. Calls to `put` will incur a copy - to this memory space, if necessary. Tensors returned by `get` will be - placed according to the device scope when `get` is called. - - Args: - dtypes: A list of types. The length of dtypes must equal the number - of tensors in each element. - shapes: (Optional.) Constraints on the shapes of tensors in an element. - A list of shape tuples or None. This list is the same length - as dtypes. If the shape of any tensors in the element are constrained, - all must be; shapes can be None if the shapes should not be constrained. - names: (Optional.) If provided, the `get()` and - `put()` methods will use dictionaries with these names as keys. - Must be None or a list or tuple of the same length as `dtypes`. - shared_name: (Optional.) A name to be used for the shared object. By - passing the same name to two different python objects they will share - the underlying staging area. Must be a string. - - Raises: - ValueError: If one of the arguments is invalid. 
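An illustrative usage sketch of the `shared_name` behaviour described above (not part of the diff): two Python objects constructed with the same `shared_name` and matching dtypes are backed by one underlying staging area, so an element put through one handle can be fetched through the other. The standalone session setup is an assumption; the tests in this change use `self.test_session()` instead.

    from tensorflow.python.client import session
    from tensorflow.python.framework import dtypes
    from tensorflow.python.ops import array_ops
    from tensorflow.python.ops import data_flow_ops

    x = array_ops.placeholder(dtypes.float32, shape=[])

    # Both handles name the same underlying staging area.
    area_a = data_flow_ops.StagingArea([dtypes.float32], shapes=[[]],
                                       shared_name='shared_area')
    area_b = data_flow_ops.StagingArea([dtypes.float32], shapes=[[]],
                                       shared_name='shared_area')

    stage = area_a.put([x])
    get = area_b.get()

    with session.Session() as sess:
      sess.run(stage, feed_dict={x: 1.0})
      print(sess.run(get))  # prints 1.0: staged via area_a, fetched via area_b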
- """ + def __init__(self, dtypes, shapes=None, names=None, shared_name=None, + capacity=0, memory_limit=0): if shared_name is None: - self._name = ops.get_default_graph().unique_name("StagingArea") + self._name = (ops.get_default_graph() + .unique_name(self.__class__.__name__)) elif isinstance(shared_name, six.string_types): self._name = shared_name else: raise ValueError("shared_name must be a string") + self._dtypes = dtypes + if shapes is not None: if len(shapes) != len(dtypes): raise ValueError("StagingArea shapes must be the same length as dtypes") self._shapes = [tensor_shape.TensorShape(s) for s in shapes] else: self._shapes = [tensor_shape.unknown_shape() for _ in self._dtypes] + if names is not None: if len(names) != len(dtypes): raise ValueError("StagingArea names must be the same length as dtypes") @@ -1417,6 +1375,9 @@ class StagingArea(object): else: self._names = None + self._capacity = capacity + self._memory_limit = memory_limit + # all get and put ops must colocate with this op with ops.name_scope("%s_root" % self._name): self._coloc_op = control_flow_ops.no_op() @@ -1441,52 +1402,140 @@ class StagingArea(object): """The list of names for each component of a staging area element.""" return self._names - def _check_put_dtypes(self, vals): + @property + def capacity(self): + """The maximum number of elements of this staging area.""" + return self._capacity + + @property + def memory_limit(self): + """The maximum number of bytes of this staging area.""" + return self._memory_limit + + def _check_put_dtypes(self, vals, indices=None): """Validate and convert `vals` to a list of `Tensor`s. The `vals` argument can be a Tensor, a list or tuple of tensors, or a dictionary with tensor values. + If `vals` is a list, then the appropriate indices associated with the + values must be provided. + If it is a dictionary, the staging area must have been constructed with a `names` attribute and the dictionary keys must match the staging area names. + `indices` will be inferred from the dictionary keys. If the staging area was constructed with a `names` attribute, `vals` must be a dictionary. + Checks that the dtype and shape of each value matches that + of the staging area. + Args: vals: A tensor, a list or tuple of tensors, or a dictionary.. Returns: - A list of `Tensor` objects. + A (tensors, indices) tuple where `tensors` is a list of `Tensor` objects + and `indices` is a list of indices associed with the tensors. Raises: - ValueError: If `vals` is invalid. + ValueError: If `vals` or `indices` is invalid. """ if isinstance(vals, dict): if not self._names: raise ValueError( "Staging areas must have names to enqueue a dictionary") - if sorted(self._names) != sorted(vals.keys()): + if not set(vals.keys()).issubset(self._names): raise ValueError("Keys in dictionary to put do not match names " "of staging area. Dictionary: (%s), Queue: (%s)" % (sorted(vals.keys()), sorted(self._names))) # The order of values in `self._names` indicates the order in which the # tensors in the dictionary `vals` must be listed. 
- vals = [vals[k] for k in self._names] + vals, indices, n = zip(*[(vals[k], i, k) for i, k in enumerate(self._names) + if k in vals]) else: if self._names: raise ValueError("You must enqueue a dictionary in a staging area " "with names") + + if indices is None: + raise ValueError("Indices must be supplied when inserting a list " + "of tensors") + + if len(indices) != len(vals): + raise ValueError("Number of indices '%s' doesn't match " + "number of values '%s'") + if not isinstance(vals, (list, tuple)): vals = [vals] + indices = [0] + + # Sanity check number of values + if not len(vals) <= len(self._dtypes): + raise ValueError("Unexpected number of inputs '%s' vs '%s'" % ( + len(values), len(self._dtypes))) tensors = [] - for i, (val, dtype) in enumerate(zip(vals, self._dtypes)): - tensors.append( - ops.convert_to_tensor( - val, dtype=dtype, name="component_%d" % i)) + + for val, i in zip(vals, indices): + dtype, shape = self._dtypes[i], self._shapes[i] + # Check dtype + if not val.dtype == dtype: + raise ValueError("Datatypes do not match. '%s' != '%s'" %( + str(val.dtype), str(dtype))) + + # Check shape + val.get_shape().assert_is_compatible_with(shape) + + tensors.append(ops.convert_to_tensor(val, dtype=dtype, + name="component_%d" % i)) + + return tensors, indices + + def _create_device_transfers(self, tensors): + """Encode inter-device transfers if the current device + is not the same as the Staging Area's device + """ + + if not isinstance(tensors, (tuple, list)): + tensors = [tensors] + + curr_device_scope = control_flow_ops.no_op().device + + if curr_device_scope != self._coloc_op.device: + tensors = [array_ops.identity(t) for t in tensors] return tensors + def _get_return_value(self, tensors): + """Return the value to return from a get op. + + If the staging area has names, return a dictionary with the + names as keys. Otherwise return either a single tensor + or a list of tensors depending on the length of `tensors`. + + Args: + tensors: List of tensors from the get op. + + Returns: + A single tensor, a list of tensors, or a dictionary + of tensors. + """ + + tensors = self._create_device_transfers(tensors) + + # Sets shape + for output, shape in zip(tensors, self._shapes): + output.set_shape(shape) + + if self._names: + # The returned values in `tensors` are in the same order as + # the names in `self._names`. + return {n: tensors[i] for i, n in enumerate(self._names)} + elif len(tensors) == 1: + return tensors[0] + else: + return tensors + def _scope_vals(self, vals): """Return a list of values to pass to `name_scope()`. @@ -1503,9 +1552,86 @@ class StagingArea(object): else: return [vals] +class StagingArea(BaseStagingArea): + """Class for staging inputs. No ordering guarantees. + + A `StagingArea` is a TensorFlow data structure that stores tensors across + multiple steps, and exposes operations that can put and get tensors. + + Each `StagingArea` element is a tuple of one or more tensors, where each + tuple component has a static dtype, and may have a static shape. + + The capacity of a `StagingArea` may be bounded or unbounded. + It supports multiple concurrent producers and consumers; and + provides exactly-once delivery. + + Each element of a `StagingArea` is a fixed-length tuple of tensors whose + dtypes are described by `dtypes`, and whose shapes are optionally described + by the `shapes` argument. + + If the `shapes` argument is specified, each component of a staging area + element must have the respective fixed shape. 
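For the fixed-shape case, a small sketch (not part of the diff, and assuming the internal imports used elsewhere in this change): with `shapes` and `names` given, `put` consumes and `get` returns dictionaries, and an incompatible static shape is rejected when the `put` op is built.

    from tensorflow.python.framework import dtypes
    from tensorflow.python.ops import array_ops
    from tensorflow.python.ops import data_flow_ops

    area = data_flow_ops.StagingArea([dtypes.float32, dtypes.float32],
                                     shapes=[[], [128, 128]],
                                     names=['x', 'v'])

    x = array_ops.placeholder(dtypes.float32, shape=[])
    v = array_ops.zeros([128, 128]) + x

    stage = area.put({'x': x, 'v': v})  # shapes match the declaration
    ret = area.get()                    # a dict with keys 'x' and 'v'

    # area.put({'x': x, 'v': array_ops.zeros([64, 64])}) would raise ValueError,
    # since [64, 64] is not compatible with the declared shape [128, 128].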
If it is + unspecified, different elements may have different shapes, + + It can be configured with a capacity in which case + put(values) will block until space becomes available. + + Similarly, it can be configured with a memory limit which + will block put(values) until space is available. + This is mostly useful for limiting the number of tensors on + devices such as GPUs. + + All get() and peek() commands block if the the requested data + is not present in the Staging Area. + + """ + + def __init__(self, dtypes, shapes=None, names=None, shared_name=None, + capacity=0, memory_limit=0): + """Constructs a staging area object. + + The two optional lists, `shapes` and `names`, must be of the same length + as `dtypes` if provided. The values at a given index `i` indicate the + shape and name to use for the corresponding queue component in `dtypes`. + + The device scope at the time of object creation determines where the + storage for the `StagingArea` will reside. Calls to `put` will incur a copy + to this memory space, if necessary. Tensors returned by `get` will be + placed according to the device scope when `get` is called. + + Args: + dtypes: A list of types. The length of dtypes must equal the number + of tensors in each element. + capacity: (Optional.) Maximum number of elements. + An integer. If zero, the Staging Area is unbounded + memory_limit: (Optional.) Maximum number of bytes of all tensors + in the Staging Area. + An integer. If zero, the Staging Area is unbounded + shapes: (Optional.) Constraints on the shapes of tensors in an element. + A list of shape tuples or None. This list is the same length + as dtypes. If the shape of any tensors in the element are constrained, + all must be; shapes can be None if the shapes should not be constrained. + names: (Optional.) If provided, the `get()` and + `put()` methods will use dictionaries with these names as keys. + Must be None or a list or tuple of the same length as `dtypes`. + shared_name: (Optional.) A name to be used for the shared object. By + passing the same name to two different python objects they will share + the underlying staging area. Must be a string. + + Raises: + ValueError: If one of the arguments is invalid. + """ + + super(StagingArea, self).__init__(dtypes, shapes, + names, shared_name, + capacity, memory_limit) + def put(self, values, name=None): """Create an op that places a value into the staging area. + This operation will block if the `StagingArea` has reached + its capacity. + Args: values: Tensor (or a tuple of Tensors) to place into the staging area. name: A name for the operation (optional). @@ -1518,46 +1644,23 @@ class StagingArea(object): """ with ops.name_scope(name, "%s_put" % self._name, self._scope_vals(values)) as scope: - vals = self._check_put_dtypes(values) - if len(values) != len(self._dtypes): - raise ValueError("Unexpected number of inputs " + str(len(values)) + - "vs " + str(len(self._dtypes))) - for val, dtype in zip(vals, self._dtypes): - if val.dtype != dtype: - raise ValueError("Datatypes do not match. 
" + str(val.dtype) + " != " - + str(dtype)) - for val, shape in zip(vals, self._shapes): - val.get_shape().assert_is_compatible_with(shape) + # Hard-code indices for this staging area + indices = range(len(values)) if isinstance(values, (list, tuple)) else None + vals, _ = self._check_put_dtypes(values, indices) with ops.colocate_with(self._coloc_op): op = gen_data_flow_ops.stage(values=vals, shared_name=self._name, - name=scope) + name=scope, capacity=self._capacity, + memory_limit=self._memory_limit) return op - def _get_return_value(self, tensors): - """Return the value to return from a get op. + def __internal_get(self, get_fn, name): + with ops.colocate_with(self._coloc_op): + ret = get_fn() - If the staging area has names, return a dictionary with the - names as keys. Otherwise return either a single tensor - or a list of tensors depending on the length of `tensors`. - - Args: - tensors: List of tensors from the get op. - - Returns: - A single tensor, a list of tensors, or a dictionary - of tensors. - """ - if self._names: - # The returned values in `tensors` are in the same order as - # the names in `self._names`. - return {n: tensors[i] for i, n in enumerate(self._names)} - elif len(tensors) == 1: - return tensors[0] - else: - return tensors + return self._get_return_value(ret) def get(self, name=None): """Gets one element from this staging area. @@ -1584,19 +1687,388 @@ class StagingArea(object): if name is None: name = "%s_get" % self._name + fn = lambda: gen_data_flow_ops.unstage(dtypes=self._dtypes, + shared_name=self._name, name=name, + capacity=self._capacity, + memory_limit=self._memory_limit) + + return self.__internal_get(fn, name) + + def peek(self, index, name=None): + """Peeks at an element in the staging area. + + If the staging area is too small to contain the element at + the specified index, it will block until enough elements + are inserted to complete the operation. + + The placement of the returned tensor will be determined by + the current device scope when this function is called. + + Args: + index: The index of the tensor within the staging area + to look up. + name: A name for the operation (optional). + + Returns: + The tuple of tensors that was gotten. + """ + if name is None: + name = "%s_peek" % self._name + + fn = lambda: gen_data_flow_ops.stage_peek(index, + dtypes=self._dtypes, shared_name=self._name, + name=name, capacity=self._capacity, + memory_limit=self._memory_limit) + + return self.__internal_get(fn, name) + + def size(self, name=None): + """Returns the number of elements in the staging area. + + Args: + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_size" % self._name + + return gen_data_flow_ops.stage_size(name=name, shared_name=self._name, + dtypes=self._dtypes, capacity=self._capacity, + memory_limit=self._memory_limit) + + def clear(self, name=None): + """Clears the staging area. + + Args: + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_clear" % self._name + + return gen_data_flow_ops.stage_clear(name=name, shared_name=self._name, + dtypes=self._dtypes, capacity=self._capacity, + memory_limit=self._memory_limit) + +class MapStagingArea(BaseStagingArea): + """ + A `MapStagingArea` is a TensorFlow data structure that stores tensors across + multiple steps, and exposes operations that can put and get tensors. + + Each `MapStagingArea` element is a (key, value) pair. 
+ Only int64 keys are supported, other types should be + hashed to produce a key. + Values are a tuple of one or more tensors. + Each tuple component has a static dtype, + and may have a static shape. + + The capacity of a `MapStagingArea` may be bounded or unbounded. + It supports multiple concurrent producers and consumers; and + provides exactly-once delivery. + + Each value tuple of a `MapStagingArea` is a fixed-length tuple of tensors whose + dtypes are described by `dtypes`, and whose shapes are optionally described + by the `shapes` argument. + + If the `shapes` argument is specified, each component of a staging area + element must have the respective fixed shape. If it is + unspecified, different elements may have different shapes, + + It behaves like an associative container with support for: + + - put(key, values) + - peek(key) like dict.get(key) + - get(key) like dict.pop(key) + - get(key=None) like dict.popitem() + - size() + - clear() + + If ordered a tree structure ordered by key will be used and + get(key=None) will remove (key, value) pairs in increasing key order. + Otherwise a hashtable + + It can be configured with a capacity in which case + put(key, values) will block until space becomes available. + + Similarly, it can be configured with a memory limit which + will block put(key, values) until space is available. + This is mostly useful for limiting the number of tensors on + devices such as GPUs. + + All get() and peek() commands block if the requested + (key, value) pair is not present in the staging area. + + Incomplete puts are supported and will be placed in an incomplete + hash until such time as all values associated with the key have + been inserted. Once completed, this (key, value) pair will be + inserted into the main data structure. Data in the incomplete set + counts towards the memory limit, but not towards capacity limit. + """ + + def __init__(self, dtypes, shapes=None, names=None, shared_name=None, + ordered=False, capacity=0, memory_limit=0): + """ + Args: + dtypes: A list of types. The length of dtypes must equal the number + of tensors in each element. + capacity: (Optional.) Maximum number of elements. + An integer. If zero, the Staging Area is unbounded + memory_limit: (Optional.) Maximum number of bytes of all tensors + in the Staging Area (excluding keys). + An integer. If zero, the Staging Area is unbounded + ordered: (Optional.) If True the underlying data structure + is a tree ordered on key. Otherwise assume a hashtable. + shapes: (Optional.) Constraints on the shapes of tensors in an element. + A list of shape tuples or None. This list is the same length + as dtypes. If the shape of any tensors in the element are constrained, + all must be; shapes can be None if the shapes should not be constrained. + names: (Optional.) If provided, the `get()` and + `put()` methods will use dictionaries with these names as keys. + Must be None or a list or tuple of the same length as `dtypes`. + shared_name: (Optional.) A name to be used for the shared object. By + passing the same name to two different python objects they will share + the underlying staging area. Must be a string. + + Raises: + ValueError: If one of the arguments is invalid. 
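An illustrative sketch of the `ordered=True` behaviour (not part of the diff): keyless `get()` calls drain the area in increasing key order, which is what the ordering test above checks.

    from tensorflow.python.client import session
    from tensorflow.python.framework import dtypes
    from tensorflow.python.ops import array_ops
    from tensorflow.python.ops import data_flow_ops

    key = array_ops.placeholder(dtypes.int64)
    x = array_ops.placeholder(dtypes.int32)

    area = data_flow_ops.MapStagingArea([dtypes.int32], shapes=[[]],
                                        ordered=True)
    stage = area.put(key, [x], [0])
    get = area.get()  # no key: removes the pair with the smallest key

    with session.Session() as sess:
      for i in [3, 1, 2]:
        sess.run(stage, feed_dict={key: i, x: 10 * i})
      for _ in range(3):
        print(sess.run(get))  # keys come out as 1, 2, 3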
+ + """ + + super(MapStagingArea, self).__init__(dtypes, shapes, + names, shared_name, + capacity, memory_limit) + + # Defer to different methods depending if the map is ordered + self._ordered = ordered + + if ordered: + self._put_fn = gen_data_flow_ops.ordered_map_stage + self._pop_fn = gen_data_flow_ops.ordered_map_unstage + self._popitem_fn = gen_data_flow_ops.ordered_map_unstage_no_key + self._peek_fn = gen_data_flow_ops.ordered_map_peek + self._size_fn = gen_data_flow_ops.ordered_map_size + self._incomplete_size_fn = gen_data_flow_ops.ordered_map_incomplete_size + self._clear_fn = gen_data_flow_ops.ordered_map_clear + else: + self._put_fn = gen_data_flow_ops.map_stage + self._pop_fn = gen_data_flow_ops.map_unstage + self._popitem_fn = gen_data_flow_ops.map_unstage_no_key + self._peek_fn = gen_data_flow_ops.map_peek + self._size_fn = gen_data_flow_ops.map_size + self._incomplete_size_fn = gen_data_flow_ops.map_incomplete_size + self._clear_fn = gen_data_flow_ops.map_clear + + def put(self, key, vals, indices=None, name=None): + """ + Create an op that stores the (key, vals) pair in the staging area. + + Incomplete puts are possible, preferably using a dictionary for vals + as the appropriate dtypes and shapes can be inferred from the value names + dictionary key values. If vals is a list or tuple, indices must + also be specified so that the op knows at which element position + to perform the insert. + + This operation will block if the capacity or memory limit of this + container is reached. + + Args: + key: Key associated with the data + vals: Tensor (or a dict/tuple of Tensors) to place + into the staging area. + indices: (Optional) if vals is a tuple/list, this is required. + name: A name for the operation (optional) + + Returns: + The created op + + Raises: + ValueError: If the number or type of inputs don't match the staging area. + """ + + with ops.name_scope(name, "%s_put" % self._name, + self._scope_vals(vals)) as scope: + + vals, indices = self._check_put_dtypes(vals, indices) + + with ops.colocate_with(self._coloc_op): + op = self._put_fn(key, indices, vals, dtypes=self._dtypes, + shared_name=self._name, name=scope, + capacity=self._capacity, + memory_limit=self._memory_limit) + return op + + def peek(self, key, name=None): + """ + Peeks at staging area data associated with the key. + + If the key is not in the staging area, it will block + until the associated (key, value) is inserted. + + Args: + key: Key associated with the required data + name: A name for the operation (optional) + + Returns: + The created op + """ + + if name is None: + name = "%s_pop" % self._name + with ops.colocate_with(self._coloc_op): - ret = gen_data_flow_ops.unstage(dtypes=self._dtypes, - shared_name=self._name, name=name) + result = self._peek_fn(key, shared_name=self._name, + dtypes=self._dtypes, + name=name, + capacity=self._capacity, + memory_limit=self._memory_limit) - curr_device_scope = control_flow_ops.no_op().device - if curr_device_scope != self._coloc_op.device: - for i in range(len(ret)): - ret[i] = array_ops.identity(ret[i]) + return self._get_return_value(result) - for output, shape in zip(ret, self._shapes): - output.set_shape(shape) + def get(self, key=None, name=None): + """ + If the key is provided, the associated (key, value) + is returned from the staging area. If the key is not + in the staging area, this method will block until + the associated (key, value) is inserted. 
- return self._get_return_value(ret) + If no key is provided and the staging area is ordered, + the (key, value) with the smallest key will be returned. + Otherwise, a random (key, value) will be returned. + + If the staging area is empty when this operation executes, + it will block until there is an element to dequeue. + + Args: + key: Key associated with the required data (Optional) + name: A name for the operation (optional) + + Returns: + The created op + """ + if key is None: + return self._popitem(name) + else: + return self._pop(key, name) + + + def _pop(self, key, name=None): + """ + Remove and return the associated (key, value) + is returned from the staging area. If the key is not + in the staging area, this method will block until + the associated (key, value) is inserted. + + Args: + key: Key associated with the required data + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_get" % self._name + + with ops.colocate_with(self._coloc_op): + result = self._pop_fn(key, shared_name=self._name, + dtypes=self._dtypes, + name=name, + capacity=self._capacity, + memory_limit=self._memory_limit) + + return key, self._get_return_value(result) + + def _popitem(self, name=None): + """ + If the staging area is ordered, + the (key, value) with the smallest key will be returned. + Otherwise, a random (key, value) will be returned. + + If the staging area is empty when this operation executes, + it will block until there is an element to dequeue. + + Args: + key: Key associated with the required data + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_get_nokey" % self._name + + with ops.colocate_with(self._coloc_op): + key, result = self._popitem_fn(shared_name=self._name, + dtypes=self._dtypes, + name=name, + capacity=self._capacity, + memory_limit=self._memory_limit) + + # Separate keys and results out from + # underlying namedtuple + key = self._create_device_transfers(key)[0] + result = self._get_return_value(result) + + return key, result + + def size(self, name=None): + """ + Returns the number of elements in the staging area. + + Args: + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_size" % self._name + + return self._size_fn(shared_name=self._name, + name=name, dtypes=self._dtypes, + capacity=self._capacity, + memory_limit=self._memory_limit) + + def incomplete_size(self, name=None): + """ + Returns the number of incomplete elements in the staging area. + + Args: + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_incomplete_size" % self._name + + return self._incomplete_size_fn(shared_name=self._name, + name=name, dtypes=self._dtypes, + capacity=self._capacity, + memory_limit=self._memory_limit) + + + + def clear(self, name=None): + """ + Clears the staging area. + + Args: + name: A name for the operation (optional) + + Returns: + The created op + """ + if name is None: + name = "%s_clear" % self._name + + return self._clear_fn(shared_name=self._name, + name=name, dtypes=self._dtypes, + capacity=self._capacity, + memory_limit=self._memory_limit) class RecordInput(object):