Clean up unused all reduce code

PiperOrigin-RevId: 329068527
Change-Id: I4aa13d38e03cf02c6c65ddab3d83b10f523053b4
Ran Chen 2020-08-28 23:53:33 -07:00 committed by TensorFlower Gardener
parent a61f55115c
commit d98960155a
3 changed files with 3 additions and 367 deletions

View File

@@ -13,6 +13,7 @@ exports_files(["LICENSE"])
py_library(
name = "distribute_test_lib_pip",
deps = [
":all_reduce",
":combinations",
":multi_worker_test_base",
":single_loss_example",
@@ -89,7 +90,6 @@ py_library(
srcs = ["cross_device_utils.py"],
srcs_version = "PY2AND3",
deps = [
":all_reduce",
":values",
"//tensorflow/python:array_ops",
"//tensorflow/python:collective_ops",
@@ -141,6 +141,7 @@ py_library(
],
srcs_version = "PY2AND3",
deps = [
":all_reduce",
":cross_device_ops",
":distribute_lib",
":mirrored_strategy",

View File

@@ -18,16 +18,13 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections as pycoll
import copy
import threading
from tensorflow.python.distribute import all_reduce
from tensorflow.python.distribute import values as value_lib
from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.framework import device as pydev
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import collective_ops
@@ -171,65 +168,6 @@ def aggregate_single_gradient_using_copy(grad_and_vars, use_mean,
return (grad, v), None
def group_device_names(devices, group_size):
"""Group device names into groups of group_size.
Args:
devices: a list of canonical device strings.
group_size: integer which is equal to or greater than 1.
Returns:
list of lists of devices, where each inner list is group_size long,
and each device appears at least once in an inner list. If
len(devices) % group_size == 0 then each device will appear exactly once.
Raises:
ValueError: if group_size > len(devices)
"""
num_devices = len(devices)
if group_size > num_devices:
raise ValueError(
'only %d devices, but group_size=%d' % (num_devices, group_size))
num_groups = (
num_devices // group_size + (1 if (num_devices % group_size != 0) else 0))
groups = [[] for i in range(num_groups)]
for i in range(num_groups * group_size):
groups[i % num_groups].append(devices[i % num_devices])
return groups
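Before this commit, the removed helper could be used as in the following sketch; the device strings are illustrative:

from tensorflow.python.distribute import cross_device_utils  # pre-removal import

devices = ['/gpu:0', '/gpu:1', '/gpu:2']  # illustrative canonical device names
groups = cross_device_utils.group_device_names(devices, 2)
# Three devices do not divide evenly into groups of two, so one device repeats:
# groups == [['/gpu:0', '/gpu:2'], ['/gpu:1', '/gpu:0']]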
def split_grads_by_size(threshold_size, device_grads):
"""Break gradients into two sets according to tensor size.
Args:
threshold_size: int size cutoff for small vs large tensor.
device_grads: List of lists of (gradient, variable) tuples. The outer
list is over devices. The inner list is over individual gradients.
Returns:
small_grads: Subset of device_grads where shape is <= threshold_size
elements.
large_grads: Subset of device_grads where shape is > threshold_size
elements.
"""
small_grads = []
large_grads = []
for dl in device_grads:
small_dl = []
large_dl = []
for (g, v) in dl:
tensor_size = g.get_shape().num_elements()
if tensor_size <= threshold_size:
small_dl.append([g, v])
else:
large_dl.append([g, v])
if small_dl:
small_grads.append(small_dl)
if large_dl:
large_grads.append(large_dl)
return small_grads, large_grads
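A minimal sketch of how the removed split behaved before this commit; the shapes and the 1024-element threshold are arbitrary:

import tensorflow as tf
from tensorflow.python.distribute import cross_device_utils  # pre-removal import

v_small = tf.Variable(tf.zeros([8]))       # 8 elements
v_large = tf.Variable(tf.zeros([64, 64]))  # 4096 elements
device_grads = [[(tf.ones([8]), v_small), (tf.ones([64, 64]), v_large)]]  # one device
small, large = cross_device_utils.split_grads_by_size(1024, device_grads)
# small == [[[<8-element grad>, v_small]]]    (at or below the threshold)
# large == [[[<4096-element grad>, v_large]]] (above the threshold)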
# TODO(yuefengz): use random key starts to avoid reusing keys?
class CollectiveKeys(object):
"""Class that manages collective keys.
@@ -580,272 +518,6 @@ def build_collective_gather_indexed_slices(input_slices_list,
return out_slices_list
def sum_grad_and_var_all_reduce(grad_and_vars,
num_workers,
alg,
gpu_indices,
aux_devices=None,
num_shards=1):
"""Apply all-reduce algorithm over specified gradient tensors."""
with ops.name_scope('allreduce'):
# Note that each grad_and_vars looks like the following:
# ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
scaled_grads = [g for g, _ in grad_and_vars]
if alg == 'nccl':
summed_grads = nccl_ops.all_sum(scaled_grads)
elif alg == 'xring':
summed_grads = all_reduce.build_ring_all_reduce(
scaled_grads, num_workers, num_shards, gpu_indices, math_ops.add)
elif alg == 'nccl/xring':
summed_grads = all_reduce.build_nccl_then_ring(scaled_grads, num_shards,
math_ops.add)
elif alg == 'nccl/rechd':
summed_grads = all_reduce.build_nccl_then_recursive_hd(
scaled_grads, math_ops.add)
elif alg == 'nccl/pscpu':
summed_grads = all_reduce.build_nccl_then_shuffle(
scaled_grads, aux_devices, math_ops.add, math_ops.add_n)
elif alg == 'pscpu/pscpu':
second_gather_devices = aux_devices[:num_shards]
summed_grads = all_reduce.build_shuffle_then_shuffle(
scaled_grads, aux_devices, second_gather_devices, math_ops.add_n)
elif alg in ['pscpu', 'psgpu']:
summed_grads = all_reduce.build_shuffle_all_reduce(
scaled_grads, aux_devices, math_ops.add_n)
else:
raise ValueError('unsupported all_reduce alg: ', alg)
result = []
for (_, v), g in zip(grad_and_vars, summed_grads):
result.append([g, v])
return result
def sum_gradients_all_reduce(dev_prefixes, replica_grads, num_workers, alg,
num_shards, gpu_indices):
"""Apply all-reduce algorithm over specified gradient tensors.
Args:
dev_prefixes: list of prefix strings to use to generate PS device names.
replica_grads: the gradients to reduce.
num_workers: number of worker processes across entire job.
alg: the all-reduce algorithm to apply.
num_shards: alg-specific sharding factor.
gpu_indices: indices of local GPUs in order usable for ring-reduce.
Returns:
list of reduced tensors
"""
alg_contains_shuffle = any(n in alg for n in ['pscpu', 'psgpu'])
is_hierarchical = '/' in alg
if 'pscpu' in alg:
aux_devices = [prefix + '/cpu:0' for prefix in dev_prefixes]
elif 'psgpu' in alg:
aux_devices = [
prefix + '/gpu:%d' % i
for i in range(len(gpu_indices))
for prefix in dev_prefixes
]
else:
aux_devices = ['/job:localhost/cpu:0']
# Auxiliary devices for hierarchical all-reduces.
aux_device_groups = group_device_names(
aux_devices, num_shards if alg_contains_shuffle else 1)
group_index = 0
reduced_gv_list = []
for grad_and_vars in zip(*replica_grads):
reduced_gv_list.append(
sum_grad_and_var_all_reduce(
grad_and_vars, num_workers, alg, gpu_indices, aux_devices
if is_hierarchical else aux_device_groups[group_index], num_shards))
group_index = (group_index + 1) % len(aux_device_groups)
new_replica_grads = [list(x) for x in zip(*reduced_gv_list)]
return new_replica_grads
def extract_ranges(index_list, range_size_limit=32):
"""Extract consecutive ranges and singles from index_list.
Args:
index_list: List of monotone increasing non-negative integers.
range_size_limit: Largest size range to return. If a larger
consecutive range exists, it will be returned as multiple
ranges.
Returns:
(ranges, singles) where ranges is a list of [first, last] pairs of
consecutive elements in index_list, and singles is all of the
other elements, in original order.
"""
if not index_list:
return [], []
first = index_list[0]
last = first
ranges = []
singles = []
for i in index_list[1:]:
if i == last + 1 and (last - first) <= range_size_limit:
last = i
else:
if last > first:
ranges.append([first, last])
else:
singles.append(first)
first = i
last = i
if last > first:
ranges.append([first, last])
else:
singles.append(first)
return ranges, singles
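A minimal, pure-Python sketch of the removed helper's behaviour (the index list is arbitrary):

from tensorflow.python.distribute import cross_device_utils  # pre-removal import

ranges, singles = cross_device_utils.extract_ranges([0, 1, 2, 5, 7, 8])
# ranges  == [[0, 2], [7, 8]]   consecutive runs collapsed to [first, last] pairs
# singles == [5]                indices with no consecutive neighbour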
GradPackTuple = pycoll.namedtuple('GradPackTuple', 'indices vars shapes')
def pack_range(key, packing, grad_vars, rng):
"""Form the concatenation of a specified range of gradient tensors.
Args:
key: Value under which to store meta-data in packing that will be used
later to restore the grad_var list structure.
packing: Dict holding data describing packed ranges of small tensors.
grad_vars: List of (grad, var) pairs for one replica.
rng: A pair of integers giving the first, last indices of a consecutive
range of tensors to be packed.
Returns:
A tensor that is the concatenation of all the specified small tensors.
"""
to_pack = grad_vars[rng[0]:rng[1] + 1]
members = []
variables = []
restore_shapes = []
with ops.name_scope('pack'):
for g, v in to_pack:
variables.append(v)
restore_shapes.append(g.shape)
with ops.device(g.device):
members.append(array_ops.reshape(g, [-1]))
packing[key] = GradPackTuple(
indices=range(rng[0], rng[1] + 1),
vars=variables,
shapes=restore_shapes)
with ops.device(members[0].device):
return array_ops.concat(members, 0)
def unpack_grad_tuple(gv, gpt):
"""Unpack a previously packed collection of gradient tensors.
Args:
gv: A (grad, var) pair to be unpacked.
gpt: A GradPackTuple describing the packing operation that produced gv.
Returns:
A list of (grad, var) pairs corresponding to the values that were
originally packed into gv, maybe following subsequent operations like
reduction.
"""
elt_widths = [x.num_elements() for x in gpt.shapes]
with ops.device(gv[0].device):
with ops.name_scope('unpack'):
splits = array_ops.split(gv[0], elt_widths)
unpacked_gv = []
for idx, s in enumerate(splits):
unpacked_gv.append((array_ops.reshape(s, gpt.shapes[idx]),
gpt.vars[idx]))
return unpacked_gv
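A hedged sketch of unpacking a hand-built concatenation with the removed helper; the GradPackTuple contents and the placeholder variable strings are illustrative stand-ins:

import tensorflow as tf
from tensorflow.python.distribute import cross_device_utils  # pre-removal import

packed = tf.constant([1., 2., 3., 4., 5., 6.])   # two gradients flattened together
gpt = cross_device_utils.GradPackTuple(
    indices=range(2),
    vars=['var0', 'var1'],                       # stand-ins for real variables
    shapes=[tf.TensorShape([2]), tf.TensorShape([2, 2])])
unpacked = cross_device_utils.unpack_grad_tuple(
    (packed, 'packing_var_placeholder'), gpt)
# unpacked pairs a [2]-shaped tensor [1., 2.] with 'var0' and a [2, 2]-shaped
# tensor [[3., 4.], [5., 6.]] with 'var1'.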
def pack_small_tensors(replica_grads, max_bytes=0, max_group=0):
"""Concatenate small gradient tensors together for reduction.
Args:
replica_grads: List of lists of (gradient, variable) tuples.
max_bytes: Int giving max number of bytes in a tensor that
may be considered small.
max_group: Int giving max number of small tensors that may be
concatenated into one new tensor.
Returns:
new_replica_grads, packing where new_replica_grads is identical to
replica_grads except that all feasible small_tensors have been removed
from their places and concatenated into larger tensors that are
now in the front of the list for each replica, and packing contains
the data necessary to restore the replica_grads structure.
Look through the first replica for gradients of the same type (float),
and small size, that are all sequential. For each such group,
replace by a new tensor that is a flattened concatenation. Note
that the corresponding variable will be absent, which doesn't matter
because it isn't used during all-reduce.
Requires:
Every gv_list in replicas must have isomorphic structure including identical
tensor sizes and types.
"""
small_indices = []
large_indices = []
for idx, (g, _) in enumerate(replica_grads[0]):
if g.dtype == dtypes.float32 and (4 * g.shape.num_elements()) <= max_bytes:
small_indices.append(idx)
else:
large_indices.append(idx)
small_ranges, small_singles = extract_ranges(
small_indices, range_size_limit=max_group)
large_indices = sorted(large_indices + small_singles)
num_gv = len(replica_grads[0])
packing = {}
if small_ranges:
new_replica_grads = []
for dev_idx, gv_list in enumerate(replica_grads):
assert len(gv_list) == num_gv
new_gv_list = []
for r in small_ranges:
key = '%d:%d' % (dev_idx, len(new_gv_list))
new_gv_list.append((pack_range(key, packing, gv_list, r),
'packing_var_placeholder'))
for i in large_indices:
new_gv_list.append(gv_list[i])
new_replica_grads.append(new_gv_list)
return new_replica_grads, packing
else:
return replica_grads, None
def unpack_small_tensors(replica_grads, packing):
"""Undo the structure alterations to replica_grads done by pack_small_tensors.
Args:
replica_grads: List of List of (grad, var) tuples.
packing: A dict generated by pack_small_tensors describing the changes
it made to replica_grads.
Returns:
new_replica_grads: identical to replica_grads except that concatenations
of small tensors have been split apart and returned to their original
positions, paired with their original variables.
"""
if not packing:
return replica_grads
new_replica_grads = []
num_devices = len(replica_grads)
num_packed = len(packing.keys()) // num_devices
for dev_idx, gv_list in enumerate(replica_grads):
gv_list = list(gv_list)
new_gv_list = gv_list[num_packed:]
for i in range(num_packed):
k = '%d:%d' % (dev_idx, i)
gpt = packing[k]
gv = unpack_grad_tuple(gv_list[i], gpt)
for gi, idx in enumerate(gpt.indices):
assert idx == gpt.indices[gi]
new_gv_list.insert(idx, gv[gi])
new_replica_grads.append(new_gv_list)
return new_replica_grads
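A round-trip sketch of the two removed packing helpers; the tiny gradients and the string stand-ins for variables are hypothetical, and the byte/group limits are arbitrary:

import tensorflow as tf
from tensorflow.python.distribute import cross_device_utils  # pre-removal import

def one_replica():  # two small float32 gradients per replica
  return [(tf.constant([1., 2.]), 'var0'), (tf.constant([3.]), 'var1')]

replica_grads = [one_replica(), one_replica()]   # two replicas
packed, packing = cross_device_utils.pack_small_tensors(
    replica_grads, max_bytes=256, max_group=16)
# Each replica now starts with one 3-element concatenation paired with the
# 'packing_var_placeholder' marker; `packing` records how to undo this.
restored = cross_device_utils.unpack_small_tensors(packed, packing)
# restored mirrors replica_grads: the concatenation is split and reshaped back
# into ([1., 2.], 'var0') and ([3.], 'var1') for every replica.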
def aggregate_tensors_or_indexed_slices(values, accumulation_fn=math_ops.add_n):
"""Aggregate tensors using `accumulation_fn` and IndexedSlices via concat."""
if any(isinstance(v, ops.IndexedSlices) for v in values):
@@ -875,18 +547,6 @@ def copy_tensor_or_indexed_slices_to_device(value, device):
return result
def contains_indexed_slices(value):
"""Check whether the value is `IndexedSlices` or contains `IndexedSlices`."""
if isinstance(value, ops.IndexedSlices):
return True
elif isinstance(value, (list, tuple)) and value:
return any(contains_indexed_slices(v) for v in value)
elif isinstance(value, value_lib.DistributedValues):
return contains_indexed_slices(value.values)
else:
return False
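Before this commit the predicate could be exercised directly; a minimal sketch (tensor values are arbitrary):

import tensorflow as tf
from tensorflow.python.distribute import cross_device_utils  # pre-removal import

dense = tf.constant([[1., 2.]])
sparse = tf.IndexedSlices(values=tf.constant([[3., 4.]]),
                          indices=tf.constant([0]),
                          dense_shape=tf.constant([2, 2]))
cross_device_utils.contains_indexed_slices(dense)            # False
cross_device_utils.contains_indexed_slices([dense, sparse])  # True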
def is_indexed_slices(value):
if isinstance(value, ops.IndexedSlices):
return True

View File

@@ -81,32 +81,7 @@ class IndexedSlicesUtilsTest(test.TestCase, parameterized.TestCase):
def testIsIndexedSlices(self):
t = math_ops._as_indexed_slices(
constant_op.constant([[1., 2.], [0, 0], [3., 4.]]))
self.assertTrue(cross_device_utils.contains_indexed_slices(t))
@test_util.run_in_graph_and_eager_modes
def testContainsIndexedSlices_List(self):
t0 = math_ops._as_indexed_slices(
constant_op.constant([[1., 2.], [0, 0], [3., 4.]]))
t1 = math_ops._as_indexed_slices(
constant_op.constant([[0., 0.], [5, 6], [7., 8.]]))
self.assertTrue(cross_device_utils.contains_indexed_slices([t0, t1]))
@test_util.run_in_graph_and_eager_modes
def testContainsIndexedSlices_Tuple(self):
t0 = math_ops._as_indexed_slices(
constant_op.constant([[1., 2.], [0, 0], [3., 4.]]))
t1 = math_ops._as_indexed_slices(
constant_op.constant([[0., 0.], [5, 6], [7., 8.]]))
self.assertTrue(cross_device_utils.contains_indexed_slices((t0, t1)))
@test_util.run_in_graph_and_eager_modes
def testContainsIndexedSlices_PerReplica(self):
t0 = math_ops._as_indexed_slices(
constant_op.constant([[1., 2.], [0, 0], [3., 4.]]))
t1 = math_ops._as_indexed_slices(
constant_op.constant([[0., 0.], [5, 6], [7., 8.]]))
per_replica = value_lib.PerReplica((t0, t1))
self.assertTrue(cross_device_utils.contains_indexed_slices(per_replica))
self.assertTrue(cross_device_utils.is_indexed_slices(t))
@combinations.generate(combinations.combine(
mode=["graph", "eager"],