# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for common methods in strategy classes."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from absl.testing import parameterized

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import distribution_strategy_context as ds_context
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import strategy_combinations
from tensorflow.python.distribute import strategy_test_lib
from tensorflow.python.distribute import test_util
from tensorflow.python.distribute import tpu_strategy
from tensorflow.python.distribute.collective_all_reduce_strategy import CollectiveAllReduceStrategy
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.util import nest


@combinations.generate(
    combinations.combine(
        strategy=[
            strategy_combinations.multi_worker_mirrored_2x1_cpu,
            strategy_combinations.multi_worker_mirrored_2x1_gpu,
        ] + strategy_combinations.all_strategies,
        mode=['eager']))
class StrategyTest(test.TestCase, parameterized.TestCase):

  def testCaptureReplicaId(self, strategy):
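    # This test only checks that `strategy.run` works from inside a
    # `tf.function` that is retraced; it passes as long as `g()` runs
    # without raising.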
    m = {}

    @def_function.function
    def f():
      return ds_context.get_replica_context().replica_id_in_sync_group

    @def_function.function
    def g():
      # Make g() a stateful function so it's traced twice.
      if m.get('v', None) is None:
        m['v'] = variables.Variable(0.)
      return strategy.run(f)

    g()


@combinations.generate(
    combinations.combine(
        strategy=[
            strategy_combinations.multi_worker_mirrored_2x1_cpu,
            strategy_combinations.multi_worker_mirrored_2x1_gpu,
        ] + strategy_combinations.all_strategies,
        mode=['eager']))
class ReduceTest(test.TestCase, parameterized.TestCase):

  def testBasic(self, strategy):
    per_replica_value = strategy.experimental_distribute_values_from_function(
        lambda _: array_ops.ones((), dtypes.float32))
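    # Each replica contributes a scalar 1.0, so a SUM reduce is expected to
    # equal `strategy.num_replicas_in_sync`.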

    def fn_eager():
      return strategy.reduce(
          reduce_util.ReduceOp.SUM, value=per_replica_value, axis=None)

    fn_graph = def_function.function(fn_eager)
    # Run reduce under the strategy scope to explicitly enter
    # strategy default_device scope.
    with strategy.scope():
      self.assertEqual(fn_eager().numpy(), 1.0 * strategy.num_replicas_in_sync)
      self.assertEqual(fn_graph().numpy(), 1.0 * strategy.num_replicas_in_sync)

    # Run reduce without a strategy scope to implicitly enter
    # strategy default_device scope.
    self.assertEqual(fn_eager().numpy(), 1.0 * strategy.num_replicas_in_sync)
    self.assertEqual(fn_graph().numpy(), 1.0 * strategy.num_replicas_in_sync)

  def testAxis(self, strategy):

    @def_function.function
    def fn():
      return constant_op.constant([1., 2.])

    x = strategy.run(fn)
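
    # Each replica returns [1., 2.]: a MEAN reduce over replicas and axis 0
    # gives 1.5, while a SUM reduce gives 3 * num_replicas_in_sync.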
    x_m = strategy.reduce(reduce_util.ReduceOp.MEAN, x, axis=0)
    self.assertEqual(1.5, x_m)
    x_s = strategy.reduce(reduce_util.ReduceOp.SUM, x, axis=0)
    self.assertEqual(3 * strategy.num_replicas_in_sync, x_s)


def _make_indexed_slices(values, indices, dense_shape):
  tensor = ops.IndexedSlices(
      values=constant_op.constant(values),
      indices=constant_op.constant(indices),
      dense_shape=constant_op.constant(dense_shape))
  return tensor


def _get_num_replicas_per_client(strategy):
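  # For CollectiveAllReduceStrategy, `num_replicas_in_sync` counts replicas
  # across all workers, so the per-client count comes from the resolver's
  # local accelerator count (at least 1 when running on CPU).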
  if isinstance(strategy, CollectiveAllReduceStrategy):
    resolver = strategy.cluster_resolver
    return max(nest.flatten(resolver.num_accelerators())[0], 1)
  else:
    return strategy.num_replicas_in_sync


def _is_tpu_strategy(strategy):
  return isinstance(strategy,
                    (tpu_strategy.TPUStrategy, tpu_strategy.TPUStrategyV1,
                     tpu_strategy.TPUStrategyV2))


@combinations.generate(
    combinations.combine(
        strategy=[
            strategy_combinations.multi_worker_mirrored_2x1_cpu,
            strategy_combinations.multi_worker_mirrored_2x1_gpu,
        ],
        mode=['eager']))
class DistributedCollectiveAllReduceStrategyTest(
    strategy_test_lib.DistributionTestBase,
    parameterized.TestCase):

  def testDatasetFromFunction(self, strategy):
    def dataset_fn(input_context):
      global_batch_size = 10
      batch_size = input_context.get_per_replica_batch_size(global_batch_size)
      d = dataset_ops.DatasetV2.range(100).repeat().batch(batch_size)
      return d.shard(input_context.num_input_pipelines,
                     input_context.input_pipeline_id)
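
    # With two workers the per-replica batch size is 5, and sharding happens
    # after batching, so the chief's first batch is [0..4] (sum 10) and the
    # worker's is [5..9] (sum 35).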
    expected_sum_on_workers = {'chief': 10, 'worker': 35}
    input_iterator = iter(
        strategy.distribute_datasets_from_function(dataset_fn))

    @def_function.function
    def run(iterator):
      return strategy.experimental_local_results(iterator.get_next())

    result = run(input_iterator)
    sum_value = math_ops.reduce_sum(result)
    self.assertEqual(
        sum_value.numpy(),
        expected_sum_on_workers[multi_worker_test_base.get_task_type()])

  def testSimpleInputFromDatasetLastPartialBatch(self, strategy):
    global_batch_size = 8
    dataset = dataset_ops.DatasetV2.range(14).batch(
        global_batch_size, drop_remainder=False)
    input_iterator = iter(strategy.experimental_distribute_dataset(dataset))

    @def_function.function
    def run(input_iterator):
      return strategy.run(lambda x: x, args=(next(input_iterator),))

    # Let the complete batch go.
    run(input_iterator)

    # `result` is an incomplete batch
    result = run(input_iterator)
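    # range(14) batched by 8 leaves a final global batch of six elements,
    # [8..13], which is split evenly between the two workers.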
    expected_data_on_workers = {'chief': [8, 9, 10], 'worker': [11, 12, 13]}
    self.assertAllEqual(
        expected_data_on_workers[multi_worker_test_base.get_task_type()],
        result.numpy(),
    )

  def testSimpleInputFromFnLastPartialBatch(self, strategy):

    def dataset_fn(input_context):
      global_batch_size = 8
      batch_size = input_context.get_per_replica_batch_size(global_batch_size)
      dataset = dataset_ops.DatasetV2.range(14).batch(
          batch_size, drop_remainder=False)
      return dataset.shard(input_context.num_input_pipelines,
                           input_context.input_pipeline_id)

    input_iterator = iter(
        strategy.distribute_datasets_from_function(dataset_fn))

    @def_function.function
    def run(input_iterator):
      return strategy.run(lambda x: x, args=(next(input_iterator),))

    # Let the complete batch go.
    run(input_iterator)
    # `result` is an incomplete batch
    result = run(input_iterator)
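
    # Each input pipeline batches range(14) by 4 and takes every other batch:
    # the chief's second batch is [8, 9, 10, 11] and the worker's is the
    # two-element remainder [12, 13].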
    expected_data_on_worker = {'chief': [8, 9, 10, 11], 'worker': [12, 13]}
    self.assertAllEqual(
        expected_data_on_worker[multi_worker_test_base.get_task_type()],
        result.numpy())
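
  # The reduce tests below run only under the multi_worker_mirrored_2x1
  # combinations declared on this class (two workers, one replica each), so a
  # SUM reduce of a per-worker 1. is expected to equal 2.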
  def testReduceHostTensor(self, strategy):
    reduced = strategy.reduce(
        reduce_util.ReduceOp.SUM, array_ops.identity(1.), axis=None)
    self.assertEqual(reduced.numpy(), 2.)

  def testReduceToHostTensor(self, strategy):
    value = array_ops.identity(1.)
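    # `reduce_to` takes explicit destinations; passing `value` again reduces
    # onto the device holding `value`.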
    reduced = strategy.extended.reduce_to(reduce_util.ReduceOp.SUM, value,
                                          value)
    self.assertEqual(reduced.numpy(), 2.)

  def testBatchReduceToHostTensor(self, strategy):
    value = array_ops.identity(1.)
    reduced = strategy.extended.batch_reduce_to(reduce_util.ReduceOp.SUM,
                                                [(value, value),
                                                 (value, value)])
    self.assertAllEqual([2., 2.], reduced)

  def testReduceDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.reduce(reduce_util.ReduceOp.SUM, value, axis=None)
    self.assertEqual(reduced.numpy(), 2.)

  def testReduceToDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.extended.reduce_to(reduce_util.ReduceOp.SUM, value,
                                          value)
    self.assertEqual(reduced.numpy(), 2.)

  def testBatchReduceToDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.extended.batch_reduce_to(reduce_util.ReduceOp.SUM,
                                                [(value, value),
                                                 (value, value)])
    self.assertAllEqual([2., 2.], reduced)

  # TODO(crccw): add a test that mixes device and host tensors after multi
  # worker strategy combinations can run on a fixed number of GPUs.


class StrategyClusterResolverTest(test.TestCase, parameterized.TestCase):

  @combinations.generate(
      combinations.combine(
          strategy=[strategy_combinations.multi_worker_mirrored_2x1_cpu] +
          strategy_combinations.all_strategies,
          mode=['eager']))
  def testClusterResolverProperty(self, strategy):
    # CollectiveAllReduceStrategy and TPUStrategy must have a cluster
    # resolver; other strategies are expected to return `None`.
    resolver = strategy.cluster_resolver
    if not isinstance(strategy, CollectiveAllReduceStrategy) and not isinstance(
        strategy, tpu_strategy.TPUStrategy):
      self.assertIsNone(resolver)
      return

    with strategy.scope():
      self.assertIs(strategy.cluster_resolver, resolver)

    self.assertTrue(hasattr(resolver, 'cluster_spec'))
    self.assertTrue(hasattr(resolver, 'master'))
    self.assertTrue(hasattr(resolver, 'num_accelerators'))
    self.assertTrue(hasattr(resolver, 'task_id'))
    self.assertTrue(hasattr(resolver, 'task_type'))
    if isinstance(strategy, CollectiveAllReduceStrategy):
      self.assertEqual(resolver.task_id, 0)
      self.assertAllInSet(resolver.task_type, ['chief', 'worker'])


if __name__ == '__main__':
  test_util.main()