STT-tensorflow/tensorflow/python/distribute/cross_device_utils_test.py
Ran Chen fa08cfd489 Add an experimental_hints to batch all reduce
This contains all performance hints for the API. Currently there's only bytes_per_pack, which splits large batches into multiple packs, allowing communication and computation to overlap.

Currently we can only pack if all Tensors in the batch have known shapes.

PiperOrigin-RevId: 297269428
Change-Id: Iaf7d7d3adf7c6cad59aa6079fbcd36b31e92c4b5
2020-02-25 20:32:44 -08:00

223 lines
8.5 KiB
Python

# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for cross_device_utils."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl.testing import parameterized
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import cross_device_utils
from tensorflow.python.distribute import device_util
from tensorflow.python.distribute import values as value_lib
from tensorflow.python.eager import test
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
class IndexedSlicesUtilsTest(test.TestCase, parameterized.TestCase):
  """Exercises the Tensor / IndexedSlices helpers of cross_device_utils."""

  def _indexed_slices_from(self, rows):
    """Wraps a dense constant built from `rows` into an IndexedSlices."""
    dense = constant_op.constant(rows)
    return math_ops._as_indexed_slices(dense)

  def _assert_values_equal(self, left, right):
    """Asserts `left` and `right` evaluate equal after tensor conversion."""
    left_value = self.evaluate(ops.convert_to_tensor(left))
    right_value = self.evaluate(ops.convert_to_tensor(right))
    self.assertAllEqual(left_value, right_value)

  @test_util.run_in_graph_and_eager_modes
  def testAggregateTensors(self):
    """Aggregating two dense tensors gives their element-wise sum."""
    addends = [
        constant_op.constant([[1., 2.], [0, 0], [3., 4.]]),
        constant_op.constant([[0., 0.], [5, 6], [7., 8.]]),
    ]
    expected_sum = constant_op.constant([[1., 2.], [5, 6], [10., 12.]])
    aggregated = cross_device_utils.aggregate_tensors_or_indexed_slices(
        addends)
    self._assert_values_equal(expected_sum, aggregated)

  @test_util.run_in_graph_and_eager_modes
  def testAggregateIndexedSlices(self):
    """Aggregating IndexedSlices sums them and keeps the IndexedSlices type."""
    first = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    second = self._indexed_slices_from([[0., 0.], [5, 6], [7., 8.]])
    expected_sum = constant_op.constant([[1., 2.], [5, 6], [10., 12.]])
    aggregated = cross_device_utils.aggregate_tensors_or_indexed_slices(
        [first, second])
    self.assertIsInstance(aggregated, ops.IndexedSlices)
    self._assert_values_equal(expected_sum, aggregated)

  @test_util.run_in_graph_and_eager_modes
  def testDivideTensor(self):
    """Dividing a dense tensor by n scales every element by 1/n."""
    divisor = 2
    dividend = constant_op.constant([[1., 2.], [0, 0], [3., 4.]])
    expected_quotient = constant_op.constant([[0.5, 1.], [0, 0], [1.5, 2.]])
    quotient = cross_device_utils.divide_by_n_tensors_or_indexed_slices(
        dividend, divisor)
    self._assert_values_equal(expected_quotient, quotient)

  @test_util.run_in_graph_and_eager_modes
  def testDivideIndexedSlices(self):
    """Dividing IndexedSlices by n scales values and keeps the type."""
    divisor = 2
    dividend = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    expected_quotient = constant_op.constant([[0.5, 1.], [0, 0], [1.5, 2.]])
    quotient = cross_device_utils.divide_by_n_tensors_or_indexed_slices(
        dividend, divisor)
    self.assertIsInstance(quotient, ops.IndexedSlices)
    self._assert_values_equal(expected_quotient, quotient)

  @test_util.run_in_graph_and_eager_modes
  def testIsIndexedSlices(self):
    """A single IndexedSlices is reported as containing IndexedSlices."""
    slices = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    self.assertTrue(cross_device_utils.contains_indexed_slices(slices))

  @test_util.run_in_graph_and_eager_modes
  def testContainsIndexedSlices_List(self):
    """A list of IndexedSlices is reported as containing IndexedSlices."""
    first = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    second = self._indexed_slices_from([[0., 0.], [5, 6], [7., 8.]])
    as_list = [first, second]
    self.assertTrue(cross_device_utils.contains_indexed_slices(as_list))

  @test_util.run_in_graph_and_eager_modes
  def testContainsIndexedSlices_Tuple(self):
    """A tuple of IndexedSlices is reported as containing IndexedSlices."""
    first = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    second = self._indexed_slices_from([[0., 0.], [5, 6], [7., 8.]])
    as_tuple = (first, second)
    self.assertTrue(cross_device_utils.contains_indexed_slices(as_tuple))

  @test_util.run_in_graph_and_eager_modes
  def testContainsIndexedSlices_PerReplica(self):
    """A PerReplica of IndexedSlices is reported as containing them."""
    first = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    second = self._indexed_slices_from([[0., 0.], [5, 6], [7., 8.]])
    wrapped = value_lib.PerReplica((first, second))
    self.assertTrue(cross_device_utils.contains_indexed_slices(wrapped))

  @combinations.generate(combinations.combine(
      mode=["graph", "eager"],
      required_gpus=1))
  def testCopyTensor(self):
    """Copying a CPU tensor to the GPU keeps values and lands on the GPU."""
    target_device = "/gpu:0"
    # Only the source tensor is pinned to the CPU; the copy happens outside.
    with ops.device("/cpu:0"):
      source = constant_op.constant([[1., 2.], [0, 0], [3., 4.]])
    copied = cross_device_utils.copy_tensor_or_indexed_slices_to_device(
        source, target_device)

    self._assert_values_equal(source, copied)
    expected_device = device_util.resolve(target_device)
    actual_device = device_util.resolve(copied.device)
    self.assertEqual(expected_device, actual_device)

  @combinations.generate(combinations.combine(
      mode=["graph", "eager"],
      required_gpus=1))
  def testCopyIndexedSlices(self):
    """Copying CPU IndexedSlices to the GPU keeps type, values and device."""
    target_device = "/gpu:0"
    # Only the source slices are pinned to the CPU; the copy happens outside.
    with ops.device("/cpu:0"):
      source = self._indexed_slices_from([[1., 2.], [0, 0], [3., 4.]])
    copied = cross_device_utils.copy_tensor_or_indexed_slices_to_device(
        source, target_device)

    self.assertIsInstance(copied, ops.IndexedSlices)
    self._assert_values_equal(source, copied)
    expected_device = device_util.resolve(target_device)
    actual_device = device_util.resolve(copied.device)
    self.assertEqual(expected_device, actual_device)
class PackBySizeTest(test.TestCase):
  """Tests for cross_device_utils.pack_by_size."""

  def assertShape(self, per_replica, shape):
    """Asserts that every component value of `per_replica` has `shape`.

    Args:
      per_replica: a PerReplica whose component values are checked.
      shape: the expected shape, compared against each component's `.shape`.
    """
    for v in per_replica._values:  # pylint: disable=protected-access
      self.assertEqual(v.shape, shape)

  def _values_with_unknown_shape(self):
    """Builds PerReplica values where one component has an unknown shape.

    Returns:
      A list of two PerReplica values. The first holds two [10, 10] float32
      tensors; the second pairs a [10, 10] float32 tensor with a Keras input
      whose batch size is None.
    """
    return [
        value_lib.PerReplica([
            array_ops.ones([10, 10], dtype=dtypes.float32),
            array_ops.ones([10, 10], dtype=dtypes.float32),
        ]),
        value_lib.PerReplica([
            array_ops.ones([10, 10], dtype=dtypes.float32),
            # `shape` has to be a tuple: `(10)` is just the int 10, while
            # `(10,)` is the intended one-element shape. batch_size=None
            # keeps the batch dimension from being statically known.
            input_layer.Input(
                shape=(10,), batch_size=None, dtype=dtypes.float32),
        ]),
    ]

  def testPreferLargerPack(self):
    """Packs are closed only once they reach at least bytes_per_pack."""
    # Each pack except the last one should be equal to or larger than
    # bytes_per_pack.
    values = [
        # size = 2 * 4 * 4 * 4 = 128
        array_ops.ones([2, 4, 4], dtype=dtypes.float32),
        # size = 8 * 4 = 32
        array_ops.ones([8], dtype=dtypes.int32),
        # size = 10 * 10 * 8 = 800
        array_ops.ones([10, 10], dtype=dtypes.int64),
        # size = 1 * 4 = 4
        array_ops.ones([1], dtype=dtypes.int32),
    ]
    per_replica_values = [value_lib.PerReplica([v, v]) for v in values]
    packs = cross_device_utils.pack_by_size(
        per_replica_values, bytes_per_pack=200)
    self.assertLen(packs, 2)
    # 128 + 32 bytes is still below 200, so the 800-byte value joins the
    # first pack too; only the trailing 4-byte value is left for the second.
    self.assertLen(packs[0], 3)
    self.assertShape(packs[0][0], [2, 4, 4])
    self.assertShape(packs[0][1], [8])
    self.assertShape(packs[0][2], [10, 10])
    self.assertLen(packs[1], 1)
    self.assertShape(packs[1][0], [1])

  def testZeroBytesPerPack(self):
    """bytes_per_pack=0 puts every value into one single pack."""
    values = [
        array_ops.ones([1], dtype=dtypes.float32),
        array_ops.ones([2], dtype=dtypes.float32),
    ]
    per_replica_values = [value_lib.PerReplica([v, v]) for v in values]
    packs = cross_device_utils.pack_by_size(
        per_replica_values, bytes_per_pack=0)
    self.assertLen(packs, 1)
    self.assertLen(packs[0], 2)
    self.assertShape(packs[0][0], [1])
    self.assertShape(packs[0][1], [2])

  def testUnknownShape(self):
    """Values with an unknown shape are returned as one unsplit pack."""
    per_replica_values = self._values_with_unknown_shape()
    packs = cross_device_utils.pack_by_size(
        per_replica_values, bytes_per_pack=1)
    self.assertLen(packs, 1)
    self.assertEqual(packs[0], per_replica_values)

  def testInconsistentShape(self):
    """Components with differing shapes are returned as one unsplit pack."""
    # NOTE(review): this uses exactly the same input as testUnknownShape. It
    # was presumably meant to cover replicas whose *known* shapes differ --
    # confirm the intended input and expected packing before changing it.
    per_replica_values = self._values_with_unknown_shape()
    packs = cross_device_utils.pack_by_size(
        per_replica_values, bytes_per_pack=1)
    self.assertLen(packs, 1)
    self.assertEqual(packs[0], per_replica_values)
if __name__ == "__main__":
  # Run every test case in this module when executed as a script.
  test.main()