Add an internal GPU-compatible version of FIFOQueue.

Just registers a FIFOQueueV2 GPU kernel with the queue resource in host memory, then makes sure the public FIFOQueue requests a CPU placement.

Internal for now because adding EnqueueMany and DequeueMany GPU kernels is somewhat complicated, and without those placing the FIFOQueue resource on a GPU device is a nasty surprise. Eventually we may want those kernels.

PiperOrigin-RevId: 288322387
Change-Id: I294b1c878b47d2374468ec8f86ce0027b4f09112
This commit is contained in:
Allen Lavoie 2020-01-06 10:08:02 -08:00 committed by TensorFlower Gardener
parent feffae326d
commit f98b3bc701
3 changed files with 114 additions and 7 deletions

View File

@ -20,5 +20,7 @@ namespace tensorflow {
REGISTER_KERNEL_BUILDER(Name("FIFOQueue").Device(DEVICE_CPU), FIFOQueueOp);
REGISTER_KERNEL_BUILDER(Name("FIFOQueueV2").Device(DEVICE_CPU), FIFOQueueOp);
REGISTER_KERNEL_BUILDER(
Name("FIFOQueueV2").Device(DEVICE_DEFAULT).HostMemory("handle"),
FIFOQueueOp);
} // namespace tensorflow

View File

@ -52,7 +52,7 @@ class FIFOQueueTest(test.TestCase):
q = data_flow_ops.FIFOQueue(10, dtypes_lib.float32, name="Q")
self.assertTrue(isinstance(q.queue_ref, ops.Tensor))
self.assertProtoEquals("""
name:'Q' op:'FIFOQueueV2'
name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2'
attr { key: 'component_types' value { list { type: DT_FLOAT } } }
attr { key: 'shapes' value { list {} } }
attr { key: 'capacity' value { i: 10 } }
@ -68,7 +68,7 @@ class FIFOQueueTest(test.TestCase):
name="Q")
self.assertTrue(isinstance(q.queue_ref, ops.Tensor))
self.assertProtoEquals("""
name:'Q' op:'FIFOQueueV2'
name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2'
attr { key: 'component_types' value { list {
type: DT_INT32 type : DT_FLOAT
} } }
@ -87,7 +87,7 @@ class FIFOQueueTest(test.TestCase):
name="Q")
self.assertTrue(isinstance(q.queue_ref, ops.Tensor))
self.assertProtoEquals("""
name:'Q' op:'FIFOQueueV2'
name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2'
attr { key: 'component_types' value { list {
type: DT_INT32 type : DT_FLOAT
} } }
@ -399,6 +399,34 @@ class FIFOQueueTest(test.TestCase):
_f()
@test_util.run_all_in_graph_and_eager_modes
class GPUCompatibleFIFOQueueTests(test.TestCase):
def testEnqueueWithShape(self):
with test_util.use_gpu():
q = data_flow_ops.GPUCompatibleFIFOQueue(
10, dtypes_lib.float32, shapes=(3, 2))
self.evaluate(q.enqueue(([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],)))
with self.assertRaises(ValueError):
q.enqueue(([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],))
self.assertEqual(1, self.evaluate(q.size()))
def testEnqueueDequeue(self):
with test_util.use_gpu():
q = data_flow_ops.GPUCompatibleFIFOQueue(10, dtypes_lib.float32)
elems_numpy = [10.0, 20.0, 30.0]
elems = [constant_op.constant(x) for x in elems_numpy]
for x in elems:
self.evaluate(q.enqueue((x,)))
for i in xrange(len(elems)):
dequeued_tensor = q.dequeue()
self.assertEqual(elems[0].device, dequeued_tensor.device)
vals = self.evaluate(dequeued_tensor)
self.assertEqual([elems_numpy[i]], vals)
@test_util.run_v1_only(
"These tests can likely run in 2.x with some fixes, but have not been "
"converted yet. Currently they hold on to operations and rely on "
@ -1619,7 +1647,7 @@ class FIFOQueueDictTest(test.TestCase):
name="Q")
self.assertTrue(isinstance(q.queue_ref, ops.Tensor))
self.assertProtoEquals("""
name:'Q' op:'FIFOQueueV2'
name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2'
attr { key: 'component_types' value { list {
type: DT_INT32 type : DT_FLOAT
} } }
@ -1640,7 +1668,7 @@ class FIFOQueueDictTest(test.TestCase):
name="Q")
self.assertTrue(isinstance(q.queue_ref, ops.Tensor))
self.assertProtoEquals("""
name:'Q' op:'FIFOQueueV2'
name:'Q' device: "/device:CPU:*" op:'FIFOQueueV2'
attr { key: 'component_types' value { list {
type: DT_INT32 type : DT_FLOAT
} } }

View File

@ -754,7 +754,7 @@ class FIFOQueue(QueueBase):
dtypes = _as_type_list(dtypes)
shapes = _as_shape_list(shapes, dtypes)
names = _as_name_list(names, dtypes)
with ops.init_scope():
with ops.init_scope(), ops.device("CPU"):
queue_ref = gen_data_flow_ops.fifo_queue_v2(
component_types=dtypes,
shapes=shapes,
@ -765,6 +765,83 @@ class FIFOQueue(QueueBase):
super(FIFOQueue, self).__init__(dtypes, shapes, names, queue_ref)
# TODO(allenl): If GPU-compatible queues turn out to be useful, we should
# implement GPU kernels for EnqueueMany and DequeueMany so we can make the
# public FIFOQueue GPU-compatible and remove this internal version.
class GPUCompatibleFIFOQueue(QueueBase):
"""A queue implementation that dequeues elements in first-in first-out order.
GPUCompatibleFIFOQueue is like FIFOQueue, but the queue resource may be placed
either on a CPU or on a GPU. It is not cross-device: enqueues and dequeues
will be colocated with the queue resource. GPUCompatibleFIFOQueue only
supports enqueue and dequeue at the moment, not enqueue_many or dequeue_many.
See `tf.queue.QueueBase` for a description of the methods on this class.
"""
def __init__(self,
capacity,
dtypes,
shapes=None,
names=None,
shared_name=None,
name="fifo_queue"):
"""Creates a queue that dequeues elements in a first-in first-out order.
A `FIFOQueue` has bounded capacity; supports multiple concurrent
producers and consumers; and provides exactly-once delivery.
A `FIFOQueue` holds a list of up to `capacity` elements. Each
element is a fixed-length tuple of tensors whose dtypes are
described by `dtypes`, and whose shapes are optionally described
by the `shapes` argument.
If the `shapes` argument is specified, each component of a queue
element must have the respective fixed shape. If it is
unspecified, different queue elements may have different shapes,
but the use of `dequeue_many` is disallowed.
Args:
capacity: An integer. The upper bound on the number of elements
that may be stored in this queue.
dtypes: A list of `DType` objects. The length of `dtypes` must equal
the number of tensors in each queue element.
shapes: (Optional.) A list of fully-defined `TensorShape` objects
with the same length as `dtypes`, or `None`.
names: (Optional.) A list of string naming the components in the queue
with the same length as `dtypes`, or `None`. If specified the dequeue
methods return a dictionary with the names as keys.
shared_name: (Optional.) If non-empty, this queue will be shared under
the given name across multiple sessions.
name: Optional name for the queue operation.
"""
dtypes = _as_type_list(dtypes)
shapes = _as_shape_list(shapes, dtypes)
names = _as_name_list(names, dtypes)
with ops.init_scope():
queue_ref = gen_data_flow_ops.fifo_queue_v2(
component_types=dtypes,
shapes=shapes,
capacity=capacity,
shared_name=_shared_name(shared_name),
name=name)
super(GPUCompatibleFIFOQueue, self).__init__(
dtypes, shapes, names, queue_ref)
def enqueue_many(self, vals, name=None):
"""enqueue_many is not supported on GPUCompatibleFIFOQueue."""
raise NotImplementedError(
"GPUCompatibleFIFOQueue does not support enqueue_many or dequeue_many, "
"only enqueue and dequeue.")
def dequeue_many(self, n, name=None):
"""dequeue_many is not supported on GPUCompatibleFIFOQueue."""
raise NotImplementedError(
"GPUCompatibleFIFOQueue does not support enqueue_many or dequeue_many, "
"only enqueue and dequeue.")
@tf_export(
"queue.PaddingFIFOQueue",
v1=["queue.PaddingFIFOQueue", "io.PaddingFIFOQueue", "PaddingFIFOQueue"])