Making the Eager iterator use the new copy_to_device.
This CL gets rid of the forced placement of all eager datasets / iterators on the CPU since now we can have some datasets on the GPU. PiperOrigin-RevId: 205264791
This commit is contained in:
parent
9dfa333cc8
commit
1044888430
@ -480,6 +480,11 @@ class _CopyToDeviceDataset(dataset_ops.Dataset):
|
|||||||
|
|
||||||
self._finalize_func = _remote_finalize_func
|
self._finalize_func = _remote_finalize_func
|
||||||
self._finalize_captured_args = _remote_finalize_func.captured_inputs
|
self._finalize_captured_args = _remote_finalize_func.captured_inputs
|
||||||
|
|
||||||
|
g = ops.get_default_graph()
|
||||||
|
_remote_init_func.add_to_graph(g)
|
||||||
|
_remote_next_func.add_to_graph(g)
|
||||||
|
_remote_finalize_func.add_to_graph(g)
|
||||||
# pylint: enable=protected-scope
|
# pylint: enable=protected-scope
|
||||||
|
|
||||||
# The one_shot_iterator implementation needs a 0 arg _make_dataset function
|
# The one_shot_iterator implementation needs a 0 arg _make_dataset function
|
||||||
|
@ -18,33 +18,14 @@ from __future__ import absolute_import
|
|||||||
from __future__ import division
|
from __future__ import division
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from tensorflow.contrib.data.python.ops import prefetching_ops
|
from tensorflow.contrib.data.python.ops import prefetching_ops
|
||||||
from tensorflow.python.data.ops import iterator_ops
|
from tensorflow.python.data.ops import iterator_ops
|
||||||
from tensorflow.python.data.util import nest
|
|
||||||
from tensorflow.python.data.util import sparse
|
|
||||||
from tensorflow.python.eager import context
|
from tensorflow.python.eager import context
|
||||||
from tensorflow.python.framework import constant_op
|
|
||||||
from tensorflow.python.framework import dtypes
|
|
||||||
from tensorflow.python.framework import function
|
|
||||||
from tensorflow.python.framework import ops
|
from tensorflow.python.framework import ops
|
||||||
from tensorflow.python.ops import gen_dataset_ops
|
from tensorflow.python.ops import gen_dataset_ops
|
||||||
from tensorflow.python.ops import resource_variable_ops
|
|
||||||
from tensorflow.python.training.checkpointable import base as checkpointable
|
from tensorflow.python.training.checkpointable import base as checkpointable
|
||||||
from tensorflow.python.training.saver import BaseSaverBuilder
|
from tensorflow.python.training.saver import BaseSaverBuilder
|
||||||
|
|
||||||
_uid_counter = 0
|
|
||||||
_uid_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def _generate_shared_name(prefix):
|
|
||||||
with _uid_lock:
|
|
||||||
global _uid_counter
|
|
||||||
uid = _uid_counter
|
|
||||||
_uid_counter += 1
|
|
||||||
return "{}{}".format(prefix, uid)
|
|
||||||
|
|
||||||
|
|
||||||
class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
|
class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
|
||||||
"""An iterator producing tf.Tensor objects from a tf.data.Dataset.
|
"""An iterator producing tf.Tensor objects from a tf.data.Dataset.
|
||||||
@ -80,38 +61,18 @@ class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
|
|||||||
"`tf.contrib.eager.Iterator`. Use `for ... in dataset:` to iterate "
|
"`tf.contrib.eager.Iterator`. Use `for ... in dataset:` to iterate "
|
||||||
"over the dataset instead.")
|
"over the dataset instead.")
|
||||||
|
|
||||||
super(Iterator, self).__init__(dataset)
|
|
||||||
if not context.context().device_spec.device_type:
|
if not context.context().device_spec.device_type:
|
||||||
is_remote_device = False
|
is_remote_device = False
|
||||||
else:
|
else:
|
||||||
is_remote_device = context.context().device_spec.device_type != "CPU"
|
is_remote_device = context.context().device_spec.device_type != "CPU"
|
||||||
self._buffer_resource_handle = None
|
|
||||||
if is_remote_device:
|
if is_remote_device:
|
||||||
with ops.device("/device:CPU:0"):
|
with ops.device(None):
|
||||||
iter_string_handle = gen_dataset_ops.iterator_to_string_handle(
|
# Let the placer figure out where to place the various functions etc.
|
||||||
self._resource)
|
# created by the CopyToDeviceDataset.
|
||||||
|
dataset = dataset.apply(prefetching_ops.copy_to_device(
|
||||||
@function.Defun(dtypes.string)
|
context.context().device_name))
|
||||||
def remote_fn(h):
|
dataset = dataset.prefetch(1)
|
||||||
remote_iterator = iterator_ops.Iterator.from_string_handle(
|
super(Iterator, self).__init__(dataset)
|
||||||
h, self.output_types, self.output_shapes, self.output_classes)
|
|
||||||
return remote_iterator.get_next()
|
|
||||||
|
|
||||||
remote_fn.add_to_graph(None)
|
|
||||||
target = constant_op.constant("/device:CPU:0")
|
|
||||||
with ops.device(self._device):
|
|
||||||
self._buffer_resource_handle = prefetching_ops.function_buffering_resource( # pylint: disable=line-too-long
|
|
||||||
string_arg=iter_string_handle,
|
|
||||||
output_types=self._flat_output_types,
|
|
||||||
f=remote_fn,
|
|
||||||
target_device=target,
|
|
||||||
buffer_size=10,
|
|
||||||
container="",
|
|
||||||
shared_name=_generate_shared_name(
|
|
||||||
"contrib_eager_iterator_function_buffer_resource"))
|
|
||||||
self._buffer_resource_deleter = resource_variable_ops.EagerResourceDeleter( # pylint: disable=line-too-long
|
|
||||||
handle=self._buffer_resource_handle,
|
|
||||||
handle_device=self._device)
|
|
||||||
|
|
||||||
def _next_internal(self):
|
def _next_internal(self):
|
||||||
"""Returns a nested structure of `tf.Tensor`s containing the next element.
|
"""Returns a nested structure of `tf.Tensor`s containing the next element.
|
||||||
@ -120,15 +81,6 @@ class Iterator(iterator_ops.EagerIterator, checkpointable.CheckpointableBase):
|
|||||||
# that there is no more data to iterate over.
|
# that there is no more data to iterate over.
|
||||||
# TODO(b/77291417): Fix
|
# TODO(b/77291417): Fix
|
||||||
with context.execution_mode(context.SYNC):
|
with context.execution_mode(context.SYNC):
|
||||||
if self._buffer_resource_handle is not None:
|
|
||||||
with ops.device(self._device):
|
|
||||||
ret = prefetching_ops.function_buffering_resource_get_next(
|
|
||||||
function_buffer_resource=self._buffer_resource_handle,
|
|
||||||
output_types=self._flat_output_types)
|
|
||||||
return sparse.deserialize_sparse_tensors(
|
|
||||||
nest.pack_sequence_as(self._output_types, ret), self._output_types,
|
|
||||||
self._output_shapes, self._output_classes)
|
|
||||||
else:
|
|
||||||
return super(Iterator, self)._next_internal()
|
return super(Iterator, self)._next_internal()
|
||||||
|
|
||||||
# TODO(shivaniagrawal): Expose checkpointable stateful objects from dataset
|
# TODO(shivaniagrawal): Expose checkpointable stateful objects from dataset
|
||||||
|
@ -499,7 +499,7 @@ class EagerIterator(object):
|
|||||||
"tf.data.Dataset.make_initializable_iterator or "
|
"tf.data.Dataset.make_initializable_iterator or "
|
||||||
"tf.data.Dataset.make_one_shot_iterator for graph construction".
|
"tf.data.Dataset.make_one_shot_iterator for graph construction".
|
||||||
format(type(self)))
|
format(type(self)))
|
||||||
with ops.device("/device:CPU:0"):
|
self._device = context.context().device_name
|
||||||
ds_variant = dataset._as_variant_tensor() # pylint: disable=protected-access
|
ds_variant = dataset._as_variant_tensor() # pylint: disable=protected-access
|
||||||
self._output_classes = dataset.output_classes
|
self._output_classes = dataset.output_classes
|
||||||
self._output_types = dataset.output_types
|
self._output_types = dataset.output_types
|
||||||
@ -508,14 +508,14 @@ class EagerIterator(object):
|
|||||||
sparse.as_dense_types(self._output_types, self._output_classes))
|
sparse.as_dense_types(self._output_types, self._output_classes))
|
||||||
self._flat_output_shapes = nest.flatten(
|
self._flat_output_shapes = nest.flatten(
|
||||||
sparse.as_dense_shapes(self._output_shapes, self._output_classes))
|
sparse.as_dense_shapes(self._output_shapes, self._output_classes))
|
||||||
|
with ops.colocate_with(ds_variant):
|
||||||
self._resource = gen_dataset_ops.anonymous_iterator(
|
self._resource = gen_dataset_ops.anonymous_iterator(
|
||||||
output_types=self._flat_output_types,
|
output_types=self._flat_output_types,
|
||||||
output_shapes=self._flat_output_shapes)
|
output_shapes=self._flat_output_shapes)
|
||||||
gen_dataset_ops.make_iterator(ds_variant, self._resource)
|
gen_dataset_ops.make_iterator(ds_variant, self._resource)
|
||||||
# Delete the resource when this object is deleted
|
# Delete the resource when this object is deleted
|
||||||
self._resource_deleter = resource_variable_ops.EagerResourceDeleter(
|
self._resource_deleter = resource_variable_ops.EagerResourceDeleter(
|
||||||
handle=self._resource, handle_device="/device:CPU:0")
|
handle=self._resource, handle_device=self._device)
|
||||||
self._device = context.context().device_name
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
Loading…
x
Reference in New Issue
Block a user