# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for common methods in strategy classes."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from absl.testing import parameterized
import numpy as np

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import strategy_combinations
from tensorflow.python.distribute import strategy_test_lib
from tensorflow.python.distribute.collective_all_reduce_strategy import CollectiveAllReduceStrategy
from tensorflow.python.distribute.tpu_strategy import TPUStrategy
from tensorflow.python.eager import def_function
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test


class StrategyReduceTest(test.TestCase, parameterized.TestCase):
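
  # `strategy.reduce` should sum the one value produced per replica into a
  # single result, both eagerly and inside a tf.function, and both inside and
  # outside the strategy scope.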
  @combinations.generate(
      combinations.combine(
          strategy=[strategy_combinations.multi_worker_mirrored_two_workers] +
          strategy_combinations.strategies_minus_tpu,
          mode=['eager']))
  def testSimpleReduce(self, strategy):

    def fn_eager():

      def replica_fn():
        return array_ops.ones((), dtypes.float32)

      per_replica_value = strategy.run(replica_fn)
      return strategy.reduce(
          reduce_util.ReduceOp.SUM, value=per_replica_value, axis=None)

    fn_graph = def_function.function(fn_eager)

    # Run reduce under the strategy scope to explicitly enter
    # strategy default_device scope.
    with strategy.scope():
      self.assertEqual(fn_eager().numpy(), 1.0 * strategy.num_replicas_in_sync)
      self.assertEqual(fn_graph().numpy(), 1.0 * strategy.num_replicas_in_sync)

    # Run reduce without a strategy scope to implicitly enter
    # strategy default_device scope.
    self.assertEqual(fn_eager().numpy(), 1.0 * strategy.num_replicas_in_sync)
    self.assertEqual(fn_graph().numpy(), 1.0 * strategy.num_replicas_in_sync)


@combinations.generate(
    combinations.combine(
        strategy=[strategy_combinations.multi_worker_mirrored_two_workers],
        mode=['eager']))
class DistributedCollectiveAllReduceStrategyTest(
    strategy_test_lib.DistributionTestBase,
    parameterized.TestCase):
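
  # The class-level decorator above parameterizes every test in this class to
  # run eagerly with MultiWorkerMirroredStrategy across two workers. Each
  # worker should receive its own shard of the dataset built by `dataset_fn`.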
  def testDatasetFromFunction(self, strategy):
    def dataset_fn(input_context):
      global_batch_size = 10
      batch_size = input_context.get_per_replica_batch_size(global_batch_size)
      d = dataset_ops.DatasetV2.range(100).repeat().batch(batch_size)
      return d.shard(input_context.num_input_pipelines,
                     input_context.input_pipeline_id)

    expected_data_on_worker = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
    input_iterator = iter(
        strategy.experimental_distribute_datasets_from_function(dataset_fn))

    @def_function.function
    def run(iterator):
      return strategy.experimental_local_results(iterator.get_next())

    result = run(input_iterator)
    self.assertTrue(
        np.array_equal(
            result[0].numpy(),
            expected_data_on_worker[multi_worker_test_base.get_task_index()]))
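
  # With `drop_remainder=False` the last global batch (elements 8..13) is
  # shorter than `global_batch_size`; each worker should get its half of it.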
  def testSimpleInputFromDatasetLastPartialBatch(self, strategy):
    global_batch_size = 8
    dataset = dataset_ops.DatasetV2.range(14).batch(
        global_batch_size, drop_remainder=False)
    input_iterator = iter(strategy.experimental_distribute_dataset(dataset))

    @def_function.function
    def run(input_iterator):
      return strategy.run(lambda x: x, args=(next(input_iterator),))

    # Let the complete batch go.
    run(input_iterator)

    # `result` is an incomplete batch
    result = run(input_iterator)
    expected_data_on_worker = [[8, 9, 10], [11, 12, 13]]
    self.assertTrue(
        np.array_equal(
            result.numpy(),
            expected_data_on_worker[multi_worker_test_base.get_task_index()]))
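
  # Same partial-batch scenario, but with per-worker sharding done inside
  # `dataset_fn`; sharding by input pipeline makes the final split uneven:
  # [8..11] on worker 0 and [12, 13] on worker 1.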
  def testSimpleInputFromFnLastPartialBatch(self, strategy):

    def dataset_fn(input_context):
      global_batch_size = 8
      batch_size = input_context.get_per_replica_batch_size(global_batch_size)
      dataset = dataset_ops.DatasetV2.range(14).batch(
          batch_size, drop_remainder=False)
      return dataset.shard(input_context.num_input_pipelines,
                           input_context.input_pipeline_id)

    input_iterator = iter(
        strategy.experimental_distribute_datasets_from_function(dataset_fn))

    @def_function.function
    def run(input_iterator):
      return strategy.run(lambda x: x, args=(next(input_iterator),))

    # Let the complete batch go.
    run(input_iterator)

    # `result` is an incomplete batch
    result = run(input_iterator)
    expected_data_on_worker = [[8, 9, 10, 11], [12, 13]]
    self.assertTrue(
        np.array_equal(
            result.numpy(), expected_data_on_worker[
                multi_worker_test_base.get_task_index()]))
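
  # Reducing a host (CPU) tensor across the two workers sums one value per
  # worker: 1. + 1. == 2.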
  def testReduceHostTensor(self, strategy):
    reduced = strategy.reduce(
        reduce_util.ReduceOp.SUM, array_ops.identity(1.), axis=None)
    self.assertEqual(reduced.numpy(), 2.)
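
  # `reduce_to` takes explicit destinations; passing the value itself as the
  # destination keeps the result on the host.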
  def testReduceToHostTensor(self, strategy):
    value = array_ops.identity(1.)
    reduced = strategy.extended.reduce_to(reduce_util.ReduceOp.SUM, value,
                                          value)
    self.assertEqual(reduced.numpy(), 2.)
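
  # `batch_reduce_to` performs several reductions in one call, taking a list
  # of (value, destinations) pairs.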
  def testBatchReduceToHostTensor(self, strategy):
    value = array_ops.identity(1.)
    reduced = strategy.extended.batch_reduce_to(reduce_util.ReduceOp.SUM,
                                                [(value, value),
                                                 (value, value)])
    self.assertAllEqual(reduced, [2., 2.])
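
  # The same three reductions, repeated with tensors produced by
  # `strategy.run`, i.e. values living on the replica devices rather than
  # on the host.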
  def testReduceDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.reduce(reduce_util.ReduceOp.SUM, value, axis=None)
    self.assertEqual(reduced.numpy(), 2.)

  def testReduceToDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.extended.reduce_to(reduce_util.ReduceOp.SUM, value,
                                          value)
    self.assertEqual(reduced.numpy(), 2.)

  def testBatchReduceToDeviceTensors(self, strategy):
    value = strategy.run(lambda: array_ops.identity(1.))
    reduced = strategy.extended.batch_reduce_to(reduce_util.ReduceOp.SUM,
                                                [(value, value),
                                                 (value, value)])
    self.assertAllEqual(reduced, [2., 2.])

  # TODO(crccw): add a test that mixes device and host tensors after multi
  # worker strategy combinations can run on a fixed number of GPUs.


class StrategyClusterResolverTest(test.TestCase, parameterized.TestCase):

  @combinations.generate(
      combinations.combine(
          strategy=[strategy_combinations.multi_worker_mirrored_two_workers] +
          strategy_combinations.all_strategies,
          mode=['eager']))
  def testClusterResolverProperty(self, strategy):
    # CollectiveAllReduceStrategy and TPUStrategy must have a cluster
    # resolver; it is `None` for all other strategies.
    resolver = strategy.cluster_resolver
    if not isinstance(strategy, CollectiveAllReduceStrategy) and not isinstance(
        strategy, TPUStrategy):
      self.assertIsNone(resolver)
      return

    with strategy.scope():
      self.assertIs(strategy.cluster_resolver, resolver)

    self.assertTrue(hasattr(resolver, 'cluster_spec'))
    self.assertTrue(hasattr(resolver, 'environment'))
    self.assertTrue(hasattr(resolver, 'master'))
    self.assertTrue(hasattr(resolver, 'num_accelerators'))
    self.assertIsNone(resolver.rpc_layer)
    if isinstance(strategy, CollectiveAllReduceStrategy):
      self.assertGreaterEqual(resolver.task_id, 0)
      self.assertLessEqual(resolver.task_id, 1)
      self.assertEqual(resolver.task_type, 'worker')
    elif isinstance(strategy, TPUStrategy):
      # `task_id` and `task_type` are not applicable to TPUStrategy.
      self.assertIsNone(resolver.task_id)
      self.assertIsNone(resolver.task_type)


if __name__ == '__main__':
  combinations.main()