Remove "dummy queue" which is never closed. This is causing workers to hang when a training job stops early (e.g., using tuner along with tf.estimator.parameterized_train_and_evaluate or with TFX).

PiperOrigin-RevId: 356855978
Change-Id: Id28441865b4a0bd598a515e2550ef4a23a564d24
This commit is contained in:
A. Unique TensorFlower 2021-02-10 16:37:15 -08:00 committed by TensorFlower Gardener
parent 84f074f68d
commit f987ff6bcd

View File

@ -18,7 +18,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.core.framework import types_pb2
from tensorflow.python.distribute import distribution_strategy_context
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
@ -319,16 +318,6 @@ class SyncReplicasOptimizer(optimizer.Optimizer):
shared_name="sync_token_q"))
self._sync_token_queue = sync_token_queue
# dummy_queue is passed to the queue runner. Don't use the real queues
# because the queue runner doesn't automatically reopen it once it
# closed queues in PS devices.
dummy_queue = (
data_flow_ops.FIFOQueue(1,
types_pb2.DT_INT32,
shapes=(),
name="dummy_queue",
shared_name="dummy_queue"))
with ops.device(global_step.device), ops.name_scope(""):
# Replicas have to wait until they can get a token from the token queue.
with ops.control_dependencies(train_ops):
@ -346,8 +335,8 @@ class SyncReplicasOptimizer(optimizer.Optimizer):
sync_op = self._variable_averages.apply(
self._variables_to_average)
self._chief_queue_runner = queue_runner.QueueRunner(dummy_queue,
[sync_op])
self._chief_queue_runner = queue_runner.QueueRunner(
sync_token_queue, [sync_op])
for accum, dev in self._accumulator_list:
with ops.device(dev):
chief_init_ops.append(