diff --git a/tensorflow/python/distribute/cross_device_ops.py b/tensorflow/python/distribute/cross_device_ops.py index 7f6230e9404..ba8f7542712 100644 --- a/tensorflow/python/distribute/cross_device_ops.py +++ b/tensorflow/python/distribute/cross_device_ops.py @@ -589,58 +589,11 @@ class _ConcatAndSplitPacker(object): return aggregated_device_grads -class _AggregateSmallTensorPacker(object): - """Concatenate small gradient tensors together for reduction.""" - - def __init__(self, - agg_small_grads_max_bytes=1048576, - agg_small_grads_max_group=16): - """Initialize the _AggregateSmallTensorPacker object. - - Args: - agg_small_grads_max_bytes: largest tensor eligible for aggregation, - in number of bytes. - agg_small_grads_max_group: largest permitted aggregation of small - tensors. - - Raises: - ValueError: if `agg_small_grads_max_bytes` or `agg_small_grads_max_group` - is not greater than 0. - """ - if agg_small_grads_max_bytes <= 0 or agg_small_grads_max_group <= 0: - raise ValueError("agg_small_grads_max_bytes and agg_small_grads_max_group" - " should both be greater than zero.") - self.agg_small_grads_max_bytes = agg_small_grads_max_bytes - self.agg_small_grads_max_group = agg_small_grads_max_group - - def pack(self, grouped_grads_and_vars): - """Aggregate small tensors.""" - if (self.agg_small_grads_max_bytes > 0 and - self.agg_small_grads_max_group > 0): - device_grads, self.packing = cross_device_utils.pack_small_tensors( - grouped_grads_and_vars, - max_bytes=self.agg_small_grads_max_bytes, - max_group=self.agg_small_grads_max_group) - return device_grads - - def unpack(self, summed_device_grad_packs): - """Reverse the aggregation process.""" - return cross_device_utils.unpack_small_tensors(summed_device_grad_packs, - self.packing) - - -def _pack_tensors(device_grads, - num_packs=0, - agg_small_grads_max_bytes=0, - agg_small_grads_max_group=0): +def _pack_tensors(device_grads, num_packs=0): """Pack tensors if specified.""" if num_packs > 0: tensor_packer = _ConcatAndSplitPacker(num_packs) device_grad_packs = tensor_packer.pack(device_grads) - elif agg_small_grads_max_bytes > 0 and agg_small_grads_max_group > 0: - tensor_packer = _AggregateSmallTensorPacker(agg_small_grads_max_bytes, - agg_small_grads_max_group) - device_grad_packs = tensor_packer.pack(device_grads) else: tensor_packer = None device_grad_packs = device_grads @@ -657,34 +610,19 @@ def _unpack_tensors(reduced, tensor_packer=None): class AllReduceCrossDeviceOps(CrossDeviceOps): """Reduction using all-reduce.""" - def __init__(self, - all_reduce_alg="nccl", - num_packs=1, - agg_small_grads_max_bytes=0, - agg_small_grads_max_group=10): + def __init__(self, all_reduce_alg="nccl", num_packs=1): """All-reduce implementation of CrossDeviceOps. - Before performing all-reduce, tensors will be repacked or aggregated for - more efficient cross-device transportation: - 1) If `num_packs` is non-zero, pack values into - `num_packs` splits. - 2) Otherwise, if `agg_small_grads_max_bytes` > 0 and - `agg_small_grads_max_group` > 0, aggregate values smaller than - `agg_small_grads_max_bytes` into groups with at most - `agg_small_grads_max_group` values. - 3) Otherwise, no repacking or grouping will happen. + Before performing all-reduce, tensors will be packed for more efficient + cross-device transportation. Args: all_reduce_alg: the all-reduce algorithm to use, currently only "nccl" or "hierarchical_copy" are supported. - num_packs: see above. - agg_small_grads_max_bytes: see above. - agg_small_grads_max_group: see above. + num_packs: If non-zero, pack values into `num_packs` splits. """ self._all_reduce_alg = all_reduce_alg self._num_packs = num_packs - self._agg_small_grads_max_bytes = agg_small_grads_max_bytes - self._agg_small_grads_max_group = agg_small_grads_max_group self._simple_cross_replica_ops = ReductionToOneDevice() super(AllReduceCrossDeviceOps, self).__init__() @@ -724,18 +662,14 @@ class AllReduceCrossDeviceOps(CrossDeviceOps): def _do_batch_all_reduce(self, reduce_op, dense_values): """Run batch all-reduces.""" logging.log_first_n( - logging.INFO, "batch_all_reduce: %d all-reduces with algorithm = %s, " - "num_packs = %d, agg_small_grads_max_bytes = %d and " - "agg_small_grads_max_group = %d" % - (len(dense_values), self._all_reduce_alg, self._num_packs, - self._agg_small_grads_max_bytes, self._agg_small_grads_max_group), 10) + logging.INFO, + "batch_all_reduce: %d all-reduces with algorithm = %s, num_packs = %d" % + (len(dense_values), self._all_reduce_alg, self._num_packs), 10) destinations = dense_values[0]._devices # pylint: disable=protected-access grouped = _group_value_by_device(dense_values) - device_grad_packs, tensor_packer = _pack_tensors( - grouped, self._num_packs, self._agg_small_grads_max_bytes, - self._agg_small_grads_max_group) + device_grad_packs, tensor_packer = _pack_tensors(grouped, self._num_packs) # The actual aggregation of the repacked gradients. Note that they are # sharded among different aggregation trees. So it is important to strike @@ -839,9 +773,7 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps): worker_devices, num_gpus_per_worker, all_reduce_spec=("pscpu/pscpu", 2, -1), - num_packs=0, - agg_small_grads_max_bytes=0, - agg_small_grads_max_group=10): + num_packs=0): """Initialize the all-reduce algorithm. Args: @@ -868,15 +800,10 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps): "pscpu/pscpu" algorithm. The third elements should be in increasing order across tuples and end with -1 which indicates infinity. num_packs: see AllReduceCrossDeviceOps. - agg_small_grads_max_bytes: see AllReduceCrossDeviceOps. - agg_small_grads_max_group: see AllReduceCrossDeviceOps. """ self._worker_devices = worker_devices self._num_gpus_per_worker = num_gpus_per_worker - super(MultiWorkerAllReduce, self).__init__( - num_packs=num_packs, - agg_small_grads_max_bytes=agg_small_grads_max_bytes, - agg_small_grads_max_group=agg_small_grads_max_group) + super(MultiWorkerAllReduce, self).__init__(num_packs=num_packs) def validate_and_complete_spec(spec): """Validate and complete the all-reduce spec.""" @@ -907,12 +834,9 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps): def _batch_all_reduce(self, reduce_op, per_replica_values): """All-reduce algorithm in a batch.""" logging.log_first_n( - logging.INFO, - "Distributed batch_all_reduce: %d all-reduces with " - "allreduce_spec = %r, num_packs = %d, agg_small_grads_max_bytes = %d, " - "and agg_small_grads_max_group = %d" % - (len(per_replica_values), self._all_reduce_spec, self._num_packs, - self._agg_small_grads_max_bytes, self._agg_small_grads_max_group), 10) + logging.INFO, "Distributed batch_all_reduce: %d all-reduces with " + "allreduce_spec = %r, num_packs = %d" % + (len(per_replica_values), self._all_reduce_spec, self._num_packs), 10) device_grads = _group_value_by_device(per_replica_values) @@ -935,8 +859,7 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps): spec_tuple.limit, remaining_grads) if this_grads: device_grad_packs, tensor_packer = _pack_tensors( - this_grads, self._num_packs, self._agg_small_grads_max_bytes, - self._agg_small_grads_max_group) + this_grads, self._num_packs) range_agg_grads = cross_device_utils.sum_gradients_all_reduce( self._worker_devices, device_grad_packs, len(self._worker_devices), spec_tuple.alg, spec_tuple.shards, range(self._num_gpus_per_worker)) diff --git a/tensorflow/python/distribute/cross_device_ops_test.py b/tensorflow/python/distribute/cross_device_ops_test.py index b60809fd3b5..c91ec38bfd1 100644 --- a/tensorflow/python/distribute/cross_device_ops_test.py +++ b/tensorflow/python/distribute/cross_device_ops_test.py @@ -284,19 +284,15 @@ class SingleWorkerCrossDeviceOpsTest(CrossDeviceOpsTestBase): cross_device_ops=[ combinations.NamedObject( "AllReduce", - cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 1, 0, 0)), + cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 1)), combinations.NamedObject( "AllReduceNoGradientRepacking", - cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 0, 0, 0)), + cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 0)), combinations.NamedObject("NcclAllReduce", cross_device_ops_lib.NcclAllReduce()), combinations.NamedObject( "HierarchicalCopy", cross_device_ops_lib.HierarchicalCopyAllReduce(8)), - combinations.NamedObject( - "HierarchicalCopyAggregateSmallTensors", - cross_device_ops_lib.AllReduceCrossDeviceOps( - "hierarchical_copy", 0, 100, 10)) ], devices=[ ["/gpu:0", "/gpu:1"], @@ -397,22 +393,17 @@ class MultiWorkerCrossDeviceOpsTest(multi_worker_test_base.MultiWorkerTestBase, "MultiWorkerAllReduce", cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2, ("pscpu/pscpu", 2, -1), - 0, 0, 0)), + 0)), combinations.NamedObject( "MultiWorkerAllReducePack", cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2, ("pscpu/pscpu", 2, -1), - 1, 0, 0)), - combinations.NamedObject( - "MultiWorkerAllReduceAggregation", - cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2, - ("pscpu/pscpu", 2, -1), - 0, 100, 10)), + 1)), combinations.NamedObject( "MultiWorkerAllReduceMultipleSpecs", cross_device_ops_lib.MultiWorkerAllReduce( worker_devices, 2, [("pscpu/pscpu", 2, 100), - ("xring", 2, -1)], 0, 0, 0)), + ("xring", 2, -1)], 0)), ], devices=[ [