diff --git a/tensorflow/python/distribute/BUILD b/tensorflow/python/distribute/BUILD index acbffb84089..01ae1b61f6a 100644 --- a/tensorflow/python/distribute/BUILD +++ b/tensorflow/python/distribute/BUILD @@ -1181,6 +1181,23 @@ distribute_py_test( ], ) +distribute_py_test( + name = "strategy_reduce_test", + srcs = ["strategy_reduce_test.py"], + main = "strategy_reduce_test.py", + tags = [ + "multi_and_single_gpu", + ], + deps = [ + ":combinations", + ":strategy_combinations", + "//tensorflow/python:errors", + "//tensorflow/python:variables", + "//tensorflow/python/eager:test", + "@absl_py//absl/testing:parameterized", + ], +) + distribute_py_test( name = "minimize_loss_test", srcs = ["minimize_loss_test.py"], diff --git a/tensorflow/python/distribute/custom_training_loop_models_test.py b/tensorflow/python/distribute/custom_training_loop_models_test.py index 48f2af0349a..5a9384bb7e0 100644 --- a/tensorflow/python/distribute/custom_training_loop_models_test.py +++ b/tensorflow/python/distribute/custom_training_loop_models_test.py @@ -26,6 +26,7 @@ import numpy as np from tensorflow.python import keras from tensorflow.python.data.ops import dataset_ops from tensorflow.python.distribute import combinations +from tensorflow.python.distribute import reduce_util from tensorflow.python.distribute import strategy_combinations from tensorflow.python.eager import backprop from tensorflow.python.eager import def_function @@ -448,6 +449,35 @@ class KerasModelsTest(test.TestCase, parameterized.TestCase): train_step(input_iterator) + @combinations.generate( + combinations.combine( + distribution=strategy_combinations.all_strategies, mode=["eager"])) + def test_reduce_loss(self, distribution): + inputs = np.zeros((10, 4), dtype=np.float32) + targets = np.zeros((10, 1), dtype=np.float32) + dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets)) + dataset = dataset.batch(10, drop_remainder=False) + input_iterator = iter(distribution.experimental_distribute_dataset(dataset)) + + with distribution.scope(): + x = keras.layers.Input(shape=(4), name="input") + y = keras.layers.Dense(3, name="dense")(x) + model = keras.Model(x, y) + + @def_function.function + def train_step(iterator): + + def step_fn(inputs): + images, targets = inputs + outputs = model(images) + loss = keras.losses.sparse_categorical_crossentropy(targets, outputs) + return loss + + return distribution.run(step_fn, args=(next(iterator),)) + + loss = train_step(input_iterator) + loss = distribution.reduce(reduce_util.ReduceOp.MEAN, loss, axis=0) + @combinations.generate( combinations.combine( distribution=strategy_combinations.tpu_strategies, mode=["eager"])) diff --git a/tensorflow/python/distribute/distribute_lib.py b/tensorflow/python/distribute/distribute_lib.py index 4531e922840..ecdc4fad159 100644 --- a/tensorflow/python/distribute/distribute_lib.py +++ b/tensorflow/python/distribute/distribute_lib.py @@ -114,6 +114,7 @@ from tensorflow.python.distribute import distribution_strategy_context from tensorflow.python.distribute import numpy_dataset from tensorflow.python.distribute import reduce_util from tensorflow.python.eager import context as eager_context +from tensorflow.python.eager import def_function from tensorflow.python.eager import monitoring from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes @@ -628,6 +629,10 @@ class StrategyBase(object): # a sensible value. extended._retrace_functions_for_each_device = True + # Below are the dicts of axis(int) -> `tf.function`. + self._mean_reduce_helper_fns = {} + self._reduce_sum_fns = {} + @property def extended(self): """`tf.distribute.StrategyExtended` with additional methods.""" @@ -1014,8 +1019,25 @@ class StrategyBase(object): if axis is None: return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access if reduce_op == reduce_util.ReduceOp.SUM: - value = self.run( - lambda v: math_ops.reduce_sum(v, axis=axis), args=(value,)) + + def reduce_sum(v): + return math_ops.reduce_sum(v, axis=axis) + + if eager_context.executing_eagerly(): + # As some strategies (e.g. TPUStrategy) doesn't support pure eager + # execution, wrap the `reduce_sum_fn` with a `tf.function` so it can be + # run from eager mode. Cache the tf.function by `axis` to avoid the + # same function to be traced again. + if axis not in self._reduce_sum_fns: + + def reduce_sum_fn(v): + return self.run(reduce_sum, args=(v,)) + + self._reduce_sum_fns[axis] = def_function.function(reduce_sum_fn) + value = self._reduce_sum_fns[axis](value) + else: + value = self.run(reduce_sum, args=(value,)) + return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access if reduce_op != reduce_util.ReduceOp.MEAN: raise TypeError("Expected `reduce_op` to be a `tf.distribute.ReduceOp`, " @@ -1062,7 +1084,22 @@ class StrategyBase(object): # reduce is complete? return numer, denom - numer, denom = self.run(mean_reduce_helper, args=(value,)) + if eager_context.executing_eagerly(): + # As some strategies (e.g. TPUStrategy) doesn't support pure eager + # execution, wrap the `mean_reduce_helper` with a `tf.function` so it can + # be run from eager mode. Cache the tf.function by `axis` to avoid the + # same function to be traced again. + if axis not in self._mean_reduce_helper_fns: + + def mean_reduce_fn(v): + return self.run(mean_reduce_helper, args=(v,)) + + self._mean_reduce_helper_fns[axis] = def_function.function( + mean_reduce_fn) + numer, denom = self._mean_reduce_helper_fns[axis](value) + else: + numer, denom = self.run(mean_reduce_helper, args=(value,)) + # TODO(josh11b): Should batch reduce here instead of doing two. numer = self._extended._reduce(reduce_util.ReduceOp.SUM, numer) # pylint: disable=protected-access denom = self._extended._reduce(reduce_util.ReduceOp.SUM, denom) # pylint: disable=protected-access diff --git a/tensorflow/python/distribute/strategy_reduce_test.py b/tensorflow/python/distribute/strategy_reduce_test.py new file mode 100644 index 00000000000..a87cce2f0b8 --- /dev/null +++ b/tensorflow/python/distribute/strategy_reduce_test.py @@ -0,0 +1,52 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for `strategy.reduce`.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from absl.testing import parameterized + +from tensorflow.python.distribute import combinations +from tensorflow.python.distribute import reduce_util +from tensorflow.python.distribute import strategy_combinations +from tensorflow.python.eager import def_function +from tensorflow.python.eager import test +from tensorflow.python.framework import constant_op + + +class StrategyReduceTest(test.TestCase, parameterized.TestCase): + + @combinations.generate( + combinations.combine( + distribution=strategy_combinations.all_strategies, + mode=["eager"] + )) + def test_reduce_with_axis(self, distribution): + + @def_function.function + def fn(): + return constant_op.constant([1., 2.]) + x = distribution.run(fn) + + x_m = distribution.reduce(reduce_util.ReduceOp.MEAN, x, axis=0) + self.assertEqual(1.5, self.evaluate(x_m)) + x_s = distribution.reduce(reduce_util.ReduceOp.SUM, x, axis=0) + self.assertEqual(3 * distribution.num_replicas_in_sync, self.evaluate(x_s)) + + +if __name__ == "__main__": + test.main()