From f98b3bc7012085096d8171fe56f6004677461567 Mon Sep 17 00:00:00 2001 From: Allen Lavoie Date: Mon, 6 Jan 2020 10:08:02 -0800 Subject: [PATCH] Add an internal GPU-compatible version of FIFOQueue. Just registers a FIFOQueueV2 GPU kernel with the queue resource in host memory, then makes sure the public FIFOQueue requests a CPU placement. Internal for now because adding EnqueueMany and DequeueMany GPU kernels is somewhat complicated, and without those placing the FIFOQueue resource on a GPU device is a nasty surprise. Eventually we may want those kernels. PiperOrigin-RevId: 288322387 Change-Id: I294b1c878b47d2374468ec8f86ce0027b4f09112 --- tensorflow/core/kernels/fifo_queue_op.cc | 4 +- .../python/kernel_tests/fifo_queue_test.py | 38 +++++++-- tensorflow/python/ops/data_flow_ops.py | 79 ++++++++++++++++++- 3 files changed, 114 insertions(+), 7 deletions(-) diff --git a/tensorflow/core/kernels/fifo_queue_op.cc b/tensorflow/core/kernels/fifo_queue_op.cc index 80869768f18..96d6b63165c 100644 --- a/tensorflow/core/kernels/fifo_queue_op.cc +++ b/tensorflow/core/kernels/fifo_queue_op.cc @@ -20,5 +20,7 @@ namespace tensorflow { REGISTER_KERNEL_BUILDER(Name("FIFOQueue").Device(DEVICE_CPU), FIFOQueueOp); REGISTER_KERNEL_BUILDER(Name("FIFOQueueV2").Device(DEVICE_CPU), FIFOQueueOp); - +REGISTER_KERNEL_BUILDER( + Name("FIFOQueueV2").Device(DEVICE_DEFAULT).HostMemory("handle"), + FIFOQueueOp); } // namespace tensorflow diff --git a/tensorflow/python/kernel_tests/fifo_queue_test.py b/tensorflow/python/kernel_tests/fifo_queue_test.py index 6c8bdbd5fec..880e949dd70 100644 --- a/tensorflow/python/kernel_tests/fifo_queue_test.py +++ b/tensorflow/python/kernel_tests/fifo_queue_test.py @@ -52,7 +52,7 @@ class FIFOQueueTest(test.TestCase): q = data_flow_ops.FIFOQueue(10, dtypes_lib.float32, name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" - name:'Q' op:'FIFOQueueV2' + name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2' attr { key: 'component_types' 
value { list { type: DT_FLOAT } } } attr { key: 'shapes' value { list {} } } attr { key: 'capacity' value { i: 10 } } @@ -68,7 +68,7 @@ class FIFOQueueTest(test.TestCase): name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" - name:'Q' op:'FIFOQueueV2' + name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } @@ -87,7 +87,7 @@ class FIFOQueueTest(test.TestCase): name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" - name:'Q' op:'FIFOQueueV2' + name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } @@ -399,6 +399,34 @@ class FIFOQueueTest(test.TestCase): _f() +@test_util.run_all_in_graph_and_eager_modes +class GPUCompatibleFIFOQueueTests(test.TestCase): + + def testEnqueueWithShape(self): + with test_util.use_gpu(): + q = data_flow_ops.GPUCompatibleFIFOQueue( + 10, dtypes_lib.float32, shapes=(3, 2)) + self.evaluate(q.enqueue(([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],))) + with self.assertRaises(ValueError): + q.enqueue(([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],)) + self.assertEqual(1, self.evaluate(q.size())) + + def testEnqueueDequeue(self): + with test_util.use_gpu(): + q = data_flow_ops.GPUCompatibleFIFOQueue(10, dtypes_lib.float32) + elems_numpy = [10.0, 20.0, 30.0] + elems = [constant_op.constant(x) for x in elems_numpy] + + for x in elems: + self.evaluate(q.enqueue((x,))) + + for i in xrange(len(elems)): + dequeued_tensor = q.dequeue() + self.assertEqual(elems[0].device, dequeued_tensor.device) + vals = self.evaluate(dequeued_tensor) + self.assertEqual([elems_numpy[i]], vals) + + @test_util.run_v1_only( "These tests can likely run in 2.x with some fixes, but have not been " "converted yet. 
Currently they hold on to operations and rely on " @@ -1619,7 +1647,7 @@ class FIFOQueueDictTest(test.TestCase): name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" - name:'Q' op:'FIFOQueueV2' + name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } @@ -1640,7 +1668,7 @@ class FIFOQueueDictTest(test.TestCase): name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" - name:'Q' op:'FIFOQueueV2' + name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } diff --git a/tensorflow/python/ops/data_flow_ops.py b/tensorflow/python/ops/data_flow_ops.py index 05020b8d64c..303e02603df 100644 --- a/tensorflow/python/ops/data_flow_ops.py +++ b/tensorflow/python/ops/data_flow_ops.py @@ -754,7 +754,7 @@ class FIFOQueue(QueueBase): dtypes = _as_type_list(dtypes) shapes = _as_shape_list(shapes, dtypes) names = _as_name_list(names, dtypes) - with ops.init_scope(): + with ops.init_scope(), ops.device("CPU"): queue_ref = gen_data_flow_ops.fifo_queue_v2( component_types=dtypes, shapes=shapes, @@ -765,6 +765,83 @@ class FIFOQueue(QueueBase): super(FIFOQueue, self).__init__(dtypes, shapes, names, queue_ref) +# TODO(allenl): If GPU-compatible queues turn out to be useful, we should +# implement GPU kernels for EnqueueMany and DequeueMany so we can make the +# public FIFOQueue GPU-compatible and remove this internal version. +class GPUCompatibleFIFOQueue(QueueBase): + """A queue implementation that dequeues elements in first-in first-out order. + + GPUCompatibleFIFOQueue is like FIFOQueue, but the queue resource may be placed + either on a CPU or on a GPU. It is not cross-device: enqueues and dequeues + will be colocated with the queue resource. GPUCompatibleFIFOQueue only + supports enqueue and dequeue at the moment, not enqueue_many or dequeue_many. 
+ + See `tf.queue.QueueBase` for a description of the methods on this class. + """ + + def __init__(self, + capacity, + dtypes, + shapes=None, + names=None, + shared_name=None, + name="fifo_queue"): + """Creates a queue that dequeues elements in a first-in first-out order. + + A `FIFOQueue` has bounded capacity; supports multiple concurrent + producers and consumers; and provides exactly-once delivery. + + A `FIFOQueue` holds a list of up to `capacity` elements. Each + element is a fixed-length tuple of tensors whose dtypes are + described by `dtypes`, and whose shapes are optionally described + by the `shapes` argument. + + If the `shapes` argument is specified, each component of a queue + element must have the respective fixed shape. If it is + unspecified, different queue elements may have different shapes, + but the use of `dequeue_many` is disallowed. + + Args: + capacity: An integer. The upper bound on the number of elements + that may be stored in this queue. + dtypes: A list of `DType` objects. The length of `dtypes` must equal + the number of tensors in each queue element. + shapes: (Optional.) A list of fully-defined `TensorShape` objects + with the same length as `dtypes`, or `None`. + names: (Optional.) A list of strings naming the components in the queue + with the same length as `dtypes`, or `None`. If specified the dequeue + methods return a dictionary with the names as keys. + shared_name: (Optional.) If non-empty, this queue will be shared under + the given name across multiple sessions. + name: Optional name for the queue operation. 
+ """ + dtypes = _as_type_list(dtypes) + shapes = _as_shape_list(shapes, dtypes) + names = _as_name_list(names, dtypes) + with ops.init_scope(): + queue_ref = gen_data_flow_ops.fifo_queue_v2( + component_types=dtypes, + shapes=shapes, + capacity=capacity, + shared_name=_shared_name(shared_name), + name=name) + + super(GPUCompatibleFIFOQueue, self).__init__( + dtypes, shapes, names, queue_ref) + + def enqueue_many(self, vals, name=None): + """enqueue_many is not supported on GPUCompatibleFIFOQueue.""" + raise NotImplementedError( + "GPUCompatibleFIFOQueue does not support enqueue_many or dequeue_many, " + "only enqueue and dequeue.") + + def dequeue_many(self, n, name=None): + """dequeue_many is not supported on GPUCompatibleFIFOQueue.""" + raise NotImplementedError( + "GPUCompatibleFIFOQueue does not support enqueue_many or dequeue_many, " + "only enqueue and dequeue.") + + @tf_export( "queue.PaddingFIFOQueue", v1=["queue.PaddingFIFOQueue", "io.PaddingFIFOQueue", "PaddingFIFOQueue"])