When calling `strategy.reduce` in eager mode, wrap the `strategy.run` calls inside with `tf.function` so it is compatible with TPUStrategy.

PiperOrigin-RevId: 312847673
Change-Id: I6db92c34ba24e160689da3fca2fe0a3c26223d52
This commit is contained in:
Ruoxin Sang 2020-05-22 12:12:28 -07:00 committed by TensorFlower Gardener
parent 19f4ac7694
commit 4797b3b908
4 changed files with 139 additions and 3 deletions

View File

@ -1181,6 +1181,23 @@ distribute_py_test(
], ],
) )
distribute_py_test(
name = "strategy_reduce_test",
srcs = ["strategy_reduce_test.py"],
main = "strategy_reduce_test.py",
tags = [
"multi_and_single_gpu",
],
deps = [
":combinations",
":strategy_combinations",
"//tensorflow/python:errors",
"//tensorflow/python:variables",
"//tensorflow/python/eager:test",
"@absl_py//absl/testing:parameterized",
],
)
distribute_py_test( distribute_py_test(
name = "minimize_loss_test", name = "minimize_loss_test",
srcs = ["minimize_loss_test.py"], srcs = ["minimize_loss_test.py"],

View File

@ -26,6 +26,7 @@ import numpy as np
from tensorflow.python import keras from tensorflow.python import keras
from tensorflow.python.data.ops import dataset_ops from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import combinations from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import strategy_combinations from tensorflow.python.distribute import strategy_combinations
from tensorflow.python.eager import backprop from tensorflow.python.eager import backprop
from tensorflow.python.eager import def_function from tensorflow.python.eager import def_function
@ -448,6 +449,35 @@ class KerasModelsTest(test.TestCase, parameterized.TestCase):
train_step(input_iterator) train_step(input_iterator)
@combinations.generate(
combinations.combine(
distribution=strategy_combinations.all_strategies, mode=["eager"]))
def test_reduce_loss(self, distribution):
inputs = np.zeros((10, 4), dtype=np.float32)
targets = np.zeros((10, 1), dtype=np.float32)
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
dataset = dataset.batch(10, drop_remainder=False)
input_iterator = iter(distribution.experimental_distribute_dataset(dataset))
with distribution.scope():
x = keras.layers.Input(shape=(4), name="input")
y = keras.layers.Dense(3, name="dense")(x)
model = keras.Model(x, y)
@def_function.function
def train_step(iterator):
def step_fn(inputs):
images, targets = inputs
outputs = model(images)
loss = keras.losses.sparse_categorical_crossentropy(targets, outputs)
return loss
return distribution.run(step_fn, args=(next(iterator),))
loss = train_step(input_iterator)
loss = distribution.reduce(reduce_util.ReduceOp.MEAN, loss, axis=0)
@combinations.generate( @combinations.generate(
combinations.combine( combinations.combine(
distribution=strategy_combinations.tpu_strategies, mode=["eager"])) distribution=strategy_combinations.tpu_strategies, mode=["eager"]))

View File

@ -114,6 +114,7 @@ from tensorflow.python.distribute import distribution_strategy_context
from tensorflow.python.distribute import numpy_dataset from tensorflow.python.distribute import numpy_dataset
from tensorflow.python.distribute import reduce_util from tensorflow.python.distribute import reduce_util
from tensorflow.python.eager import context as eager_context from tensorflow.python.eager import context as eager_context
from tensorflow.python.eager import def_function
from tensorflow.python.eager import monitoring from tensorflow.python.eager import monitoring
from tensorflow.python.framework import constant_op from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes from tensorflow.python.framework import dtypes
@ -628,6 +629,10 @@ class StrategyBase(object):
# a sensible value. # a sensible value.
extended._retrace_functions_for_each_device = True extended._retrace_functions_for_each_device = True
# Below are the dicts of axis(int) -> `tf.function`.
self._mean_reduce_helper_fns = {}
self._reduce_sum_fns = {}
@property @property
def extended(self): def extended(self):
"""`tf.distribute.StrategyExtended` with additional methods.""" """`tf.distribute.StrategyExtended` with additional methods."""
@ -1014,8 +1019,25 @@ class StrategyBase(object):
if axis is None: if axis is None:
return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access
if reduce_op == reduce_util.ReduceOp.SUM: if reduce_op == reduce_util.ReduceOp.SUM:
value = self.run(
lambda v: math_ops.reduce_sum(v, axis=axis), args=(value,)) def reduce_sum(v):
return math_ops.reduce_sum(v, axis=axis)
if eager_context.executing_eagerly():
# As some strategies (e.g. TPUStrategy) doesn't support pure eager
# execution, wrap the `reduce_sum_fn` with a `tf.function` so it can be
# run from eager mode. Cache the tf.function by `axis` to avoid the
# same function to be traced again.
if axis not in self._reduce_sum_fns:
def reduce_sum_fn(v):
return self.run(reduce_sum, args=(v,))
self._reduce_sum_fns[axis] = def_function.function(reduce_sum_fn)
value = self._reduce_sum_fns[axis](value)
else:
value = self.run(reduce_sum, args=(value,))
return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access
if reduce_op != reduce_util.ReduceOp.MEAN: if reduce_op != reduce_util.ReduceOp.MEAN:
raise TypeError("Expected `reduce_op` to be a `tf.distribute.ReduceOp`, " raise TypeError("Expected `reduce_op` to be a `tf.distribute.ReduceOp`, "
@ -1062,7 +1084,22 @@ class StrategyBase(object):
# reduce is complete? # reduce is complete?
return numer, denom return numer, denom
numer, denom = self.run(mean_reduce_helper, args=(value,)) if eager_context.executing_eagerly():
# As some strategies (e.g. TPUStrategy) doesn't support pure eager
# execution, wrap the `mean_reduce_helper` with a `tf.function` so it can
# be run from eager mode. Cache the tf.function by `axis` to avoid the
# same function to be traced again.
if axis not in self._mean_reduce_helper_fns:
def mean_reduce_fn(v):
return self.run(mean_reduce_helper, args=(v,))
self._mean_reduce_helper_fns[axis] = def_function.function(
mean_reduce_fn)
numer, denom = self._mean_reduce_helper_fns[axis](value)
else:
numer, denom = self.run(mean_reduce_helper, args=(value,))
# TODO(josh11b): Should batch reduce here instead of doing two. # TODO(josh11b): Should batch reduce here instead of doing two.
numer = self._extended._reduce(reduce_util.ReduceOp.SUM, numer) # pylint: disable=protected-access numer = self._extended._reduce(reduce_util.ReduceOp.SUM, numer) # pylint: disable=protected-access
denom = self._extended._reduce(reduce_util.ReduceOp.SUM, denom) # pylint: disable=protected-access denom = self._extended._reduce(reduce_util.ReduceOp.SUM, denom) # pylint: disable=protected-access

View File

@ -0,0 +1,52 @@
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for `strategy.reduce`."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl.testing import parameterized
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import strategy_combinations
from tensorflow.python.eager import def_function
from tensorflow.python.eager import test
from tensorflow.python.framework import constant_op
class StrategyReduceTest(test.TestCase, parameterized.TestCase):
@combinations.generate(
combinations.combine(
distribution=strategy_combinations.all_strategies,
mode=["eager"]
))
def test_reduce_with_axis(self, distribution):
@def_function.function
def fn():
return constant_op.constant([1., 2.])
x = distribution.run(fn)
x_m = distribution.reduce(reduce_util.ReduceOp.MEAN, x, axis=0)
self.assertEqual(1.5, self.evaluate(x_m))
x_s = distribution.reduce(reduce_util.ReduceOp.SUM, x, axis=0)
self.assertEqual(3 * distribution.num_replicas_in_sync, self.evaluate(x_s))
if __name__ == "__main__":
test.main()