Retire agg_small_grads_max_bytes, agg_small_grads_max_group

agg_small_grads_max_bytes and agg_small_grads_max_group have no effect for public users. They're not exposed in the public API, so they always keep their default values (agg_small_grads_max_bytes=0), which are then ignored.
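For context, public users reach these ops only through tf.distribute, where num_packs is the one packing knob that is exposed; a minimal sketch, assuming TF 2.x and two local GPUs:

import tensorflow as tf

# Public entry point: only num_packs is configurable here; the
# agg_small_grads_* knobs were never reachable, so they stayed at defaults.
strategy = tf.distribute.MirroredStrategy(
    devices=["/gpu:0", "/gpu:1"],
    cross_device_ops=tf.distribute.NcclAllReduce(num_packs=1))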

PiperOrigin-RevId: 296096179
Change-Id: Id7397539441a0e34af5c76d994eb08028e289b6d
Ran Chen 2020-02-19 17:31:02 -08:00 committed by TensorFlower Gardener
parent 1e4f7195a8
commit ccfc7fd531
2 changed files with 20 additions and 106 deletions


@@ -589,58 +589,11 @@ class _ConcatAndSplitPacker(object):
return aggregated_device_grads
class _AggregateSmallTensorPacker(object):
"""Concatenate small gradient tensors together for reduction."""
def __init__(self,
agg_small_grads_max_bytes=1048576,
agg_small_grads_max_group=16):
"""Initialize the _AggregateSmallTensorPacker object.
Args:
agg_small_grads_max_bytes: largest tensor eligible for aggregation,
in number of bytes.
agg_small_grads_max_group: largest permitted aggregation of small
tensors.
Raises:
ValueError: if `agg_small_grads_max_bytes` or `agg_small_grads_max_group`
is not greater than 0.
"""
if agg_small_grads_max_bytes <= 0 or agg_small_grads_max_group <= 0:
raise ValueError("agg_small_grads_max_bytes and agg_small_grads_max_group"
" should both be greater than zero.")
self.agg_small_grads_max_bytes = agg_small_grads_max_bytes
self.agg_small_grads_max_group = agg_small_grads_max_group
def pack(self, grouped_grads_and_vars):
"""Aggregate small tensors."""
if (self.agg_small_grads_max_bytes > 0 and
self.agg_small_grads_max_group > 0):
device_grads, self.packing = cross_device_utils.pack_small_tensors(
grouped_grads_and_vars,
max_bytes=self.agg_small_grads_max_bytes,
max_group=self.agg_small_grads_max_group)
return device_grads
def unpack(self, summed_device_grad_packs):
"""Reverse the aggregation process."""
return cross_device_utils.unpack_small_tensors(summed_device_grad_packs,
self.packing)
def _pack_tensors(device_grads,
num_packs=0,
agg_small_grads_max_bytes=0,
agg_small_grads_max_group=0):
def _pack_tensors(device_grads, num_packs=0):
"""Pack tensors if specified."""
if num_packs > 0:
tensor_packer = _ConcatAndSplitPacker(num_packs)
device_grad_packs = tensor_packer.pack(device_grads)
elif agg_small_grads_max_bytes > 0 and agg_small_grads_max_group > 0:
tensor_packer = _AggregateSmallTensorPacker(agg_small_grads_max_bytes,
agg_small_grads_max_group)
device_grad_packs = tensor_packer.pack(device_grads)
else:
tensor_packer = None
device_grad_packs = device_grads
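The num_packs path is what remains. A conceptual sketch of concat-and-split packing, with illustrative names rather than TF's implementation: flatten and concatenate a device's gradients, then split into num_packs chunks so each all-reduce moves fewer, larger tensors.

import tensorflow as tf

def pack_into_chunks(grads, num_packs):
  # Flatten and concatenate, pad so the result splits evenly, then split.
  flat = tf.concat([tf.reshape(g, [-1]) for g in grads], axis=0)
  pad = -int(flat.shape[0]) % num_packs
  flat = tf.pad(flat, [[0, pad]])
  return tf.split(flat, num_packs)

grads = [tf.ones([3, 3]), tf.ones([5])]
chunks = pack_into_chunks(grads, num_packs=2)  # two tensors of 7 elements each

Fewer, larger tensors amortize per-op launch and transport overhead, which is the same motivation the retired small-tensor aggregation served.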
@@ -657,34 +610,19 @@ def _unpack_tensors(reduced, tensor_packer=None):
class AllReduceCrossDeviceOps(CrossDeviceOps):
"""Reduction using all-reduce."""
def __init__(self,
all_reduce_alg="nccl",
num_packs=1,
agg_small_grads_max_bytes=0,
agg_small_grads_max_group=10):
def __init__(self, all_reduce_alg="nccl", num_packs=1):
"""All-reduce implementation of CrossDeviceOps.
Before performing all-reduce, tensors will be repacked or aggregated for
more efficient cross-device transportation:
1) If `num_packs` is non-zero, pack values into
`num_packs` splits.
2) Otherwise, if `agg_small_grads_max_bytes` > 0 and
`agg_small_grads_max_group` > 0, aggregate values smaller than
`agg_small_grads_max_bytes` into groups with at most
`agg_small_grads_max_group` values.
3) Otherwise, no repacking or grouping will happen.
Before performing all-reduce, tensors will be packed for more efficient
cross-device transportation.
Args:
all_reduce_alg: the all-reduce algorithm to use, currently only "nccl" or
"hierarchical_copy" are supported.
num_packs: see above.
agg_small_grads_max_bytes: see above.
agg_small_grads_max_group: see above.
num_packs: If non-zero, pack values into `num_packs` splits.
"""
self._all_reduce_alg = all_reduce_alg
self._num_packs = num_packs
self._agg_small_grads_max_bytes = agg_small_grads_max_bytes
self._agg_small_grads_max_group = agg_small_grads_max_group
self._simple_cross_replica_ops = ReductionToOneDevice()
super(AllReduceCrossDeviceOps, self).__init__()
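Construction now takes only the algorithm and num_packs, matching the updated test combinations further down; a sketch, assuming the private import path used by the tests:

from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib

# "nccl" with one pack per device versus no repacking at all.
packed = cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", num_packs=1)
unpacked = cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", num_packs=0)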
@@ -724,18 +662,14 @@ class AllReduceCrossDeviceOps(CrossDeviceOps):
def _do_batch_all_reduce(self, reduce_op, dense_values):
"""Run batch all-reduces."""
logging.log_first_n(
logging.INFO, "batch_all_reduce: %d all-reduces with algorithm = %s, "
"num_packs = %d, agg_small_grads_max_bytes = %d and "
"agg_small_grads_max_group = %d" %
(len(dense_values), self._all_reduce_alg, self._num_packs,
self._agg_small_grads_max_bytes, self._agg_small_grads_max_group), 10)
logging.INFO,
"batch_all_reduce: %d all-reduces with algorithm = %s, num_packs = %d" %
(len(dense_values), self._all_reduce_alg, self._num_packs), 10)
destinations = dense_values[0]._devices # pylint: disable=protected-access
grouped = _group_value_by_device(dense_values)
device_grad_packs, tensor_packer = _pack_tensors(
grouped, self._num_packs, self._agg_small_grads_max_bytes,
self._agg_small_grads_max_group)
device_grad_packs, tensor_packer = _pack_tensors(grouped, self._num_packs)
# The actual aggregation of the repacked gradients. Note that they are
# sharded among different aggregation trees. So it is important to strike
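Conceptually, _do_batch_all_reduce packs each device's gradients, sums the i-th pack across devices, and unpacks the result; a toy illustration of the reduction step, with plain tensors standing in for per-device packs:

import tensorflow as tf

per_device_chunks = [
    [tf.constant([1.0, 2.0]), tf.constant([3.0])],     # device 0's packs
    [tf.constant([10.0, 20.0]), tf.constant([30.0])],  # device 1's packs
]
# Sum chunk i across devices, mirroring what the batched all-reduce computes.
reduced = [tf.add_n(list(chunks)) for chunks in zip(*per_device_chunks)]
# reduced -> [[11., 22.], [33.]]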
@@ -839,9 +773,7 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps):
worker_devices,
num_gpus_per_worker,
all_reduce_spec=("pscpu/pscpu", 2, -1),
num_packs=0,
agg_small_grads_max_bytes=0,
agg_small_grads_max_group=10):
num_packs=0):
"""Initialize the all-reduce algorithm.
Args:
@@ -868,15 +800,10 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps):
"pscpu/pscpu" algorithm. The third elements should be in increasing
order across tuples and end with -1 which indicates infinity.
num_packs: see AllReduceCrossDeviceOps.
agg_small_grads_max_bytes: see AllReduceCrossDeviceOps.
agg_small_grads_max_group: see AllReduceCrossDeviceOps.
"""
self._worker_devices = worker_devices
self._num_gpus_per_worker = num_gpus_per_worker
super(MultiWorkerAllReduce, self).__init__(
num_packs=num_packs,
agg_small_grads_max_bytes=agg_small_grads_max_bytes,
agg_small_grads_max_group=agg_small_grads_max_group)
super(MultiWorkerAllReduce, self).__init__(num_packs=num_packs)
def validate_and_complete_spec(spec):
"""Validate and complete the all-reduce spec."""
@@ -907,12 +834,9 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps):
def _batch_all_reduce(self, reduce_op, per_replica_values):
"""All-reduce algorithm in a batch."""
logging.log_first_n(
logging.INFO,
"Distributed batch_all_reduce: %d all-reduces with "
"allreduce_spec = %r, num_packs = %d, agg_small_grads_max_bytes = %d, "
"and agg_small_grads_max_group = %d" %
(len(per_replica_values), self._all_reduce_spec, self._num_packs,
self._agg_small_grads_max_bytes, self._agg_small_grads_max_group), 10)
logging.INFO, "Distributed batch_all_reduce: %d all-reduces with "
"allreduce_spec = %r, num_packs = %d" %
(len(per_replica_values), self._all_reduce_spec, self._num_packs), 10)
device_grads = _group_value_by_device(per_replica_values)
@@ -935,8 +859,7 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps):
spec_tuple.limit, remaining_grads)
if this_grads:
device_grad_packs, tensor_packer = _pack_tensors(
this_grads, self._num_packs, self._agg_small_grads_max_bytes,
self._agg_small_grads_max_group)
this_grads, self._num_packs)
range_agg_grads = cross_device_utils.sum_gradients_all_reduce(
self._worker_devices, device_grad_packs, len(self._worker_devices),
spec_tuple.alg, spec_tuple.shards, range(self._num_gpus_per_worker))


@@ -284,19 +284,15 @@ class SingleWorkerCrossDeviceOpsTest(CrossDeviceOpsTestBase):
cross_device_ops=[
combinations.NamedObject(
"AllReduce",
cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 1, 0, 0)),
cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 1)),
combinations.NamedObject(
"AllReduceNoGradientRepacking",
cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 0, 0, 0)),
cross_device_ops_lib.AllReduceCrossDeviceOps("nccl", 0)),
combinations.NamedObject("NcclAllReduce",
cross_device_ops_lib.NcclAllReduce()),
combinations.NamedObject(
"HierarchicalCopy",
cross_device_ops_lib.HierarchicalCopyAllReduce(8)),
combinations.NamedObject(
"HierarchicalCopyAggregateSmallTensors",
cross_device_ops_lib.AllReduceCrossDeviceOps(
"hierarchical_copy", 0, 100, 10))
],
devices=[
["/gpu:0", "/gpu:1"],
@@ -397,22 +393,17 @@ class MultiWorkerCrossDeviceOpsTest(multi_worker_test_base.MultiWorkerTestBase,
"MultiWorkerAllReduce",
cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2,
("pscpu/pscpu", 2, -1),
0, 0, 0)),
0)),
combinations.NamedObject(
"MultiWorkerAllReducePack",
cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2,
("pscpu/pscpu", 2, -1),
1, 0, 0)),
combinations.NamedObject(
"MultiWorkerAllReduceAggregation",
cross_device_ops_lib.MultiWorkerAllReduce(worker_devices, 2,
("pscpu/pscpu", 2, -1),
0, 100, 10)),
1)),
combinations.NamedObject(
"MultiWorkerAllReduceMultipleSpecs",
cross_device_ops_lib.MultiWorkerAllReduce(
worker_devices, 2, [("pscpu/pscpu", 2, 100),
("xring", 2, -1)], 0, 0, 0)),
("xring", 2, -1)], 0)),
],
devices=[
[