1. If a DataServiceDataset iterator is cancelled, it will now call TryCancel on its outstanding RPCs.
2. As a result, we can reduce the frequency of returning from blocked round-robin requests to check whether the iterator is cancelled. This may avoid delays in GetNext() that could happen if one consumer reads from a round earlier than others, and needs to perform multiple retries with exponential backoff.
3. Because of (2), server shutdown may take up to 1 minute if a round-robin request is blocked waiting for other consumers. To prevent slow unit tests, certain tests store their servers globally so that they are destroyed immediately at process exit without waiting for their outstanding RPCs to finish.
When running data_service_ops_test.py locally, this CL reduces the test time from 27 seconds to 20 seconds.
PiperOrigin-RevId: 351825888
Change-Id: Iba20a456bdabf251d03b94f090fe760616d3da4d
This enables a new mode of reading from the tf.data service, where consumers read from tasks in a coordinated fashion instead of the normal first-come, first-served order.
The main use case for this is coordinated bucketization for synchronous training, where we want to ensure that at each step consumers get batches with elements of similar sizes. This mitigates the inefficiency of some consumers training slowly on large examples while others train quickly on small examples and then block waiting for the slower examples to be processed.
When `consumer_index` and `num_consumers` are specified to `distribute`, each task will enforce a strict round-robin order, where its first element goes to consumer 0, second element to consumer 1, and so on. This requires that all consumers consume the same number of elements.
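A minimal usage sketch (the dispatcher address, job name, and processing mode below are placeholders; each of the two consumers would pass its own consumer_index):

import tensorflow as tf

dataset = tf.data.Dataset.range(100).repeat()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs",  # illustrative choice
    service="grpc://dispatcher_address:5000",  # placeholder dispatcher address
    job_name="shared_job",  # all coordinated consumers must share one job
    consumer_index=0,  # this consumer's slot in the round-robin order
    num_consumers=2))  # total number of coordinated consumers
for element in dataset.take(3):
  print(element.numpy())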
PiperOrigin-RevId: 351625063
Change-Id: I9b400f55ad61406cb125af8225096e7ff5dc4b0c
The dataset `Dataset.range(0).repeat()` could potentially loop forever, because
`repeat()` will keep trying to instantiate `range(0)` to get the next element.
We avoid this by detecting when the input to `repeat()` is empty, and returning
early from `repeat()`. However, we may return *too* early if the input to
`repeat` has a chance of being nonempty. For example, consider
`Dataset.range(2).shuffle(2).take(1).filter(lambda x: x == 0).repeat()`. It
should produce {0, 0, 0, ...}, but will actually produce a random number of
zeros before reporting end of sequence. This change increases the number of
empty sequences we need to see before giving up. We'll now only exit after
seeing 10000 consecutive empty sequences. Iterating over an empty sequence
takes on the order of microseconds, so we will still exit quickly in cases
where the input is truly empty.
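For reference, a runnable sketch of the example above (assuming eager execution):

import tensorflow as tf

ds = tf.data.Dataset.range(2).shuffle(2).take(1).filter(lambda x: x == 0).repeat()
# Each repetition reshuffles, so roughly half of the repetitions are empty.
# With the new limit of 10000 consecutive empty repetitions, this effectively
# produces an endless stream of zeros, as intended:
for x in ds.take(5):
  print(x.numpy())  # prints 0 five times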
PiperOrigin-RevId: 349596829
Change-Id: I588f8abf6cae3a4bec616cc43085e109687bc86c
Previously, using `tf.data.experimental.assert_cardinality(tf.data.experimental.INFINITE_CARDINALITY)` would cause the assertion to fail as soon as the first dataset element was produced, even if the dataset actually was infinite. After this CL, we only raise an error if the dataset runs out of elements.
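A small sketch of the case that previously failed (assuming eager execution):

import tensorflow as tf

ds = tf.data.Dataset.range(10).repeat()  # genuinely infinite
ds = ds.apply(tf.data.experimental.assert_cardinality(
    tf.data.experimental.INFINITE_CARDINALITY))
# Before this CL, producing the first element raised an assertion error even
# though the dataset is infinite; now an error is raised only if the dataset
# runs out of elements.
for x in ds.take(3):
  print(x.numpy())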
Fixes https://github.com/tensorflow/tensorflow/issues/45894
PiperOrigin-RevId: 349321521
Change-Id: I54804225da55f49cef4fa69e498a239854d16e22
This CL addresses a race condition where parallel map could segfault when intra-op parallelism is disabled.
The race can happen if an iterator is destroyed while there are outstanding map function calls. Normally the destructor blocks until all outstanding calls have finished. But before this CL, `CallFunction` could call `done()` too early, decrementing `num_calls_` and allowing the destructor to run before `CallFunction`'s final calls to `RecordStop`/`RecordStart`.
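A simplified Python sketch (not the actual C++ code) of the invariant the fix restores: the step that decrements the outstanding-call count and wakes the waiter must be the last thing a call does.

import threading

class OutstandingCalls:
  # Tracks in-flight calls so teardown can wait for them (illustrative only).

  def __init__(self):
    self._cv = threading.Condition()
    self._num_calls = 0

  def start_call(self, fn):
    with self._cv:
      self._num_calls += 1

    def run():
      try:
        fn()
        # Any bookkeeping that touches shared state (the analogue of
        # RecordStop/RecordStart on the iterator) must happen here, before
        # done(); doing it after done() races with teardown.
      finally:
        self._done()  # must be last: unblocks the waiter below

    threading.Thread(target=run).start()

  def _done(self):
    with self._cv:
      self._num_calls -= 1
      self._cv.notify_all()

  def wait_for_outstanding_calls(self):
    # The analogue of the iterator destructor blocking on outstanding calls.
    with self._cv:
      self._cv.wait_for(lambda: self._num_calls == 0)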
PiperOrigin-RevId: 347747695
Change-Id: Iacf790c082dca80ef5800593c9f885bc481d78b6
`ignore_errors` may cause some elements to be dropped, so we no longer know the exact cardinality of the resulting dataset.
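For example (a sketch, assuming eager execution):

import tensorflow as tf

ds = tf.data.Dataset.range(5).apply(tf.data.experimental.ignore_errors())
# The reported cardinality is now unknown
# (tf.data.experimental.UNKNOWN_CARDINALITY), since errors may drop elements.
print(tf.data.experimental.cardinality(ds))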
PiperOrigin-RevId: 344875178
Change-Id: I98beadcf8322d5eb23da14192cfeb83e1303e36a
Previously we would time out if we repeatedly failed to connect to the dispatcher for one hour, or if we failed to connect to any individual worker for one hour. This CL changes the logic to retry indefinitely. This prevents us from raising an error if just one worker can't be contacted.
PiperOrigin-RevId: 343363558
Change-Id: I3ade1d057ad86e5857b1bfca328bbcdc7511f906
If we try to restore into an iterator with a smaller cycle length than the original, it will produce a segmentation fault. This can happen either due to user error, or due to the cycle_length being autotuned.
This CL is a stopgap solution to give a better error message than a segmentation fault. In the long term we aim to support adjusting the cycle_length so that autotuned cycle_length + checkpointing just works.
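A sketch of the scenario (the use of tf.train.Checkpoint and the path below are illustrative):

import tensorflow as tf

def make_dataset(cycle_length):
  return tf.data.Dataset.range(10).interleave(
      lambda x: tf.data.Dataset.from_tensors(x).repeat(3),
      cycle_length=cycle_length)

iterator = iter(make_dataset(cycle_length=4))
next(iterator)
path = tf.train.Checkpoint(iterator=iterator).save("/tmp/iter_ckpt")  # placeholder path

# Restoring into an iterator with a smaller cycle_length used to segfault;
# with this CL it raises a clear error instead.
restored = iter(make_dataset(cycle_length=2))
tf.train.Checkpoint(iterator=restored).restore(path)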
PiperOrigin-RevId: 342733442
Change-Id: Ie9869224cc1598e74e6eb00397df35e6a1a46859
The issue was that some constants were declared but not defined. When optimizations were enabled, the constants would be inlined, which is why this wasn't causing issues in opt mode. @timshen91 helped me debug this issue.
This was reproducible by running the following command to build the pip package, then installing the pip package and running `import tensorflow as tf`:
bazel build --config=dbg //tensorflow/tools/pip_package:build_pip_package
The error was:
ImportError: /home/reedwm/venvs/tf_dbg/lib/python3.6/site-packages/tensorflow/python/_pywrap_tensorflow_internal.so: undefined symbol: _ZN10tensorflow4data12experimental19SnapshotDatasetV2Op5kHashE
PiperOrigin-RevId: 341209683
Change-Id: If71c6ca529bd87cd1a589422440454548165e813
This fixes two issues:
1) We would hash the AUTO compression type into the snapshot hash. We should instead resolve AUTO before including it in the snapshot hash, so that we can share snapshots based on their actual compression type (see the sketch below).
2) Snapshot hashes could previously be determined either before or after dataset graph optimization. This can be a problem if dataset graph optimization changes the graph, since we don't always optimize a dataset before iterating over it (e.g. if the dataset was produced by a `flat_map` or `interleave` function). To address this, we will now always use the hash of the pre-optimized input dataset. This has the additional benefit of avoiding issues with optimizations being potentially non-deterministic.
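A sketch of the transformation that (1) concerns (the path is a placeholder; AUTO is the default compression):

import tensorflow as tf

dataset = tf.data.Dataset.range(100)
# With compression="AUTO", the snapshot hash now reflects the resolved
# compression type rather than the literal "AUTO" setting, so equivalent
# pipelines can share the snapshot.
dataset = dataset.apply(
    tf.data.experimental.snapshot("/tmp/snapshot_dir", compression="AUTO"))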
This issue was originally raised by a user in https://github.com/tensorflow/tensorflow/issues/44278.
PiperOrigin-RevId: 340490555
Change-Id: Iab6fb39a9ff94b7857061adec551d6813ba9b8f9
This CL:
- adds support for collecting the aggregate time a tf.data iterator spent actively servicing `GetNext` requests (accounting for concurrent requests)
- adds support for collecting the "lifetime" of a tf.data iterator, that is, the time between receiving the first `GetNext` request and servicing the last `GetNext` request
- removes support for collecting the time between consecutive calls to `IteratorGetNextOp`
PiperOrigin-RevId: 340474712
Change-Id: Icdfd35c46623160e9faacf1af69f897af88049f6