This allows users to put code that creates slot variables outside of a strategy scope, which is nice. One common use case is checkpoint restore. For slot variables that are created in `apply_gradients`, this change won't affect them as `apply_gradients` is often called inside the `fn` passed to `strategy.run`, and we enter the scope automatically inside `strategy.run`. If optimizer captured scope is not the same as the one used to create variables, or if slot variable creation uses a different scope other than the one optimizer captured, an error will be raised. Meanwhile, also changes the behavior of slot variable restoration from delayed to immediate when a strategy is present, achieving more consistent behavior (see b/172323399) and avoiding double initialization. PiperOrigin-RevId: 343568263 Change-Id: I2c19a592638a718e11974fcfb17a5a2b98139bf8
2721 lines
103 KiB
Python
2721 lines
103 KiB
Python
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ==============================================================================
|
|
"""Tests for tf.keras models using tf.distribute.Strategy."""
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import os
|
|
|
|
from absl.testing import parameterized
|
|
import numpy as np
|
|
|
|
from tensorflow.python import keras
|
|
from tensorflow.python.data.experimental.ops import cardinality
|
|
from tensorflow.python.data.experimental.ops import distribute_options
|
|
from tensorflow.python.data.experimental.ops import writers
|
|
from tensorflow.python.data.ops import dataset_ops
|
|
from tensorflow.python.data.ops import readers
|
|
from tensorflow.python.distribute import central_storage_strategy
|
|
from tensorflow.python.distribute import collective_all_reduce_strategy
|
|
from tensorflow.python.distribute import combinations as ds_combinations
|
|
from tensorflow.python.distribute import distribution_strategy_context
|
|
from tensorflow.python.distribute import mirrored_strategy
|
|
from tensorflow.python.distribute import multi_process_runner
|
|
from tensorflow.python.distribute import multi_worker_test_base
|
|
from tensorflow.python.distribute import parameter_server_strategy
|
|
from tensorflow.python.distribute import parameter_server_strategy_v2
|
|
from tensorflow.python.distribute import reduce_util
|
|
from tensorflow.python.distribute import strategy_combinations
|
|
from tensorflow.python.distribute import tpu_strategy
|
|
from tensorflow.python.distribute import values as ds_values_lib
|
|
from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
|
|
from tensorflow.python.eager import backprop
|
|
from tensorflow.python.eager import context
|
|
from tensorflow.python.eager import def_function
|
|
from tensorflow.python.framework import dtypes
|
|
from tensorflow.python.framework import sparse_tensor
|
|
from tensorflow.python.framework import test_combinations as combinations
|
|
from tensorflow.python.keras import testing_utils
|
|
from tensorflow.python.keras.distribute import distributed_training_utils
|
|
from tensorflow.python.keras.distribute import distributed_training_utils_v1
|
|
from tensorflow.python.keras.distribute import optimizer_combinations
|
|
from tensorflow.python.keras.distribute.strategy_combinations import all_strategies
|
|
from tensorflow.python.keras.distribute.strategy_combinations import multi_worker_mirrored_strategies
|
|
from tensorflow.python.keras.distribute.strategy_combinations import strategies_minus_default_minus_tpu
|
|
from tensorflow.python.keras.distribute.strategy_combinations import strategies_minus_tpu
|
|
from tensorflow.python.keras.distribute.strategy_combinations import tpu_strategies
|
|
from tensorflow.python.keras.engine import base_layer_utils
|
|
from tensorflow.python.keras.mixed_precision import policy
|
|
from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_keras
|
|
from tensorflow.python.keras.utils import np_utils
|
|
from tensorflow.python.ops import array_ops
|
|
from tensorflow.python.ops import check_ops
|
|
from tensorflow.python.ops import math_ops
|
|
from tensorflow.python.ops import nn
|
|
from tensorflow.python.ops import parsing_ops
|
|
from tensorflow.python.ops import variables
|
|
from tensorflow.python.ops.losses import loss_reduction
|
|
from tensorflow.python.ops.ragged import ragged_tensor
|
|
from tensorflow.python.platform import test
|
|
from tensorflow.python.training import gradient_descent
|
|
from tensorflow.python.training import rmsprop
|
|
from tensorflow.python.training import server_lib
|
|
from tensorflow.python.util import nest
|
|
|
|
_RANDOM_SEED = 1337
|
|
_TRAIN_SIZE = 200
|
|
_INPUT_SIZE = (10,)
|
|
_NUM_CLASS = 2
|
|
|
|
# Note: Please make sure the tests in this file are also covered in
|
|
# keras_backward_compat_test for features that are supported with both APIs.
|
|
|
|
# TODO(anjalisridhar): Add a decorator that will allow us to run these tests as
|
|
# part of the tf.keras unit tests suite.
|
|
|
|
|
|
def simple_sequential_model():
|
|
model = keras.models.Sequential()
|
|
model.add(keras.layers.Dense(16, activation='relu', input_shape=_INPUT_SIZE))
|
|
model.add(keras.layers.Dropout(0.1))
|
|
model.add(keras.layers.Dense(_NUM_CLASS, activation='softmax'))
|
|
return model
|
|
|
|
|
|
def simple_subclassed_model(num_labels=_NUM_CLASS):
|
|
|
|
class _SimpleMLP(keras.Model):
|
|
|
|
def __init__(self, num_labels):
|
|
super(_SimpleMLP, self).__init__()
|
|
self.dense = keras.layers.Dense(num_labels)
|
|
|
|
def call(self, inputs):
|
|
return self.dense(inputs)
|
|
|
|
return _SimpleMLP(num_labels)
|
|
|
|
|
|
def simple_multi_inputs_multi_outputs_model():
|
|
input_a = keras.layers.Input(shape=(16,), name='input_a')
|
|
input_b = keras.layers.Input(shape=(16,), name='input_b')
|
|
|
|
merged = keras.layers.concatenate([input_a, input_b], name='merge')
|
|
output_c = keras.layers.Dense(3, activation='softmax', name='dense_2')(merged)
|
|
output_d = keras.layers.Dense(2, activation='softmax', name='dense_3')(merged)
|
|
model = keras.models.Model(
|
|
inputs=[input_a, input_b], outputs=[output_c, output_d])
|
|
return model
|
|
|
|
|
|
def get_multi_inputs_multi_outputs_data():
|
|
(a_train, c_train), (a_test, c_test) = testing_utils.get_test_data(
|
|
train_samples=_TRAIN_SIZE,
|
|
test_samples=50,
|
|
input_shape=(16,),
|
|
num_classes=3,
|
|
random_seed=_RANDOM_SEED)
|
|
(b_train, d_train), (b_test, d_test) = testing_utils.get_test_data(
|
|
train_samples=_TRAIN_SIZE,
|
|
test_samples=50,
|
|
input_shape=(16,),
|
|
num_classes=2,
|
|
random_seed=_RANDOM_SEED)
|
|
(m_train, _), (m_test, _) = testing_utils.get_test_data(
|
|
train_samples=_TRAIN_SIZE,
|
|
test_samples=50,
|
|
input_shape=(8,),
|
|
num_classes=2,
|
|
random_seed=_RANDOM_SEED)
|
|
|
|
c_train = np_utils.to_categorical(c_train)
|
|
c_test = np_utils.to_categorical(c_test)
|
|
d_train = np_utils.to_categorical(d_train)
|
|
d_test = np_utils.to_categorical(d_test)
|
|
|
|
train_data = {
|
|
'input_a': a_train,
|
|
'input_b': b_train,
|
|
'input_m': m_train,
|
|
'output_c': c_train,
|
|
'output_d': d_train
|
|
}
|
|
test_data = {
|
|
'input_a': a_test,
|
|
'input_b': b_test,
|
|
'input_m': m_test,
|
|
'output_c': c_test,
|
|
'output_d': d_test
|
|
}
|
|
|
|
return (train_data, test_data)
|
|
|
|
|
|
def batch_wrapper(dataset, batch_size, distribution, repeat=None):
|
|
if repeat:
|
|
dataset = dataset.repeat(repeat)
|
|
# TPUs currently require fully defined input shapes, drop_remainder ensures
|
|
# the input will have fully defined shapes.
|
|
if _is_tpu_strategy(distribution):
|
|
return dataset.batch(batch_size, drop_remainder=True)
|
|
else:
|
|
return dataset.batch(batch_size)
|
|
|
|
|
|
def get_model():
|
|
x = keras.layers.Input(shape=(3,), name='input')
|
|
y = keras.layers.Dense(4, name='dense')(x)
|
|
model = keras.Model(x, y)
|
|
return model
|
|
|
|
|
|
def get_sample_weights_model():
|
|
x = keras.layers.Input(shape=(1,), name='input')
|
|
y = keras.layers.Dense(
|
|
1, kernel_initializer='ones', bias_initializer='zeros', name='dense')(
|
|
x)
|
|
model = keras.Model(x, y)
|
|
return model
|
|
|
|
|
|
def get_dataset(distribution):
|
|
inputs = np.zeros((10, 3), dtype=np.float32)
|
|
targets = np.zeros((10, 4), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
|
|
dataset = dataset.repeat(100)
|
|
dataset = batch_wrapper(dataset, 10, distribution)
|
|
return dataset
|
|
|
|
|
|
def get_predict_dataset(distribution):
|
|
inputs = np.zeros((10, 3), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices(inputs)
|
|
dataset = dataset.repeat(100)
|
|
dataset = batch_wrapper(dataset, 10, distribution)
|
|
return dataset
|
|
|
|
|
|
def convert_numpy_to_dataset_with_unknown_cardinality(inputs, targets=None):
|
|
if targets is not None:
|
|
input_slices = (inputs, targets)
|
|
dummy_op = (lambda inp, target: True)
|
|
else:
|
|
input_slices = inputs
|
|
dummy_op = (lambda inp: True)
|
|
|
|
original_dataset = (dataset_ops.Dataset.from_tensor_slices(input_slices))
|
|
ds_with_unknown_cardinality = (
|
|
original_dataset.filter(dummy_op).batch(10, drop_remainder=True))
|
|
return ds_with_unknown_cardinality
|
|
|
|
|
|
def multi_input_output_model():
|
|
a = keras.layers.Input(shape=(3,), name='input_a')
|
|
b = keras.layers.Input(shape=(5,), name='input_b')
|
|
# TODO(anjalisridhar): Change the output dimension of the second Dense layer
|
|
# once the iterator output validation issue has been fixed.
|
|
dense_1 = keras.layers.Dense(7, name='dense_1')
|
|
dense_2 = keras.layers.Dense(7, name='dense_2')
|
|
c = dense_1(a)
|
|
d = dense_2(b)
|
|
e = keras.layers.Dropout(0.5, name='dropout')(c)
|
|
model = keras.models.Model([a, b], [d, e])
|
|
return model
|
|
|
|
|
|
def strategy_minus_tpu_combinations():
|
|
return combinations.combine(
|
|
distribution=strategies_minus_tpu, mode=['graph', 'eager'])
|
|
|
|
|
|
def tpu_strategy_combinations():
|
|
return combinations.combine(
|
|
distribution=tpu_strategies, mode=['graph', 'eager'])
|
|
|
|
|
|
def tpu_strategy_combinations_graph_only():
|
|
return combinations.combine(distribution=tpu_strategies, mode=['graph'])
|
|
|
|
|
|
def multi_worker_strategy_combinations_eager_only():
|
|
return combinations.combine(
|
|
distribution=multi_worker_mirrored_strategies, mode=['eager'])
|
|
|
|
|
|
def all_strategy_combinations():
|
|
return strategy_minus_tpu_combinations() + tpu_strategy_combinations(
|
|
) + multi_worker_strategy_combinations_eager_only()
|
|
|
|
|
|
def all_strategy_minus_default_and_tpu_combinations():
|
|
return combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.one_device_strategy,
|
|
strategy_combinations.one_device_strategy_gpu,
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
|
|
strategy_combinations.mirrored_strategy_with_two_gpus,
|
|
],
|
|
mode=['graph', 'eager'])
|
|
|
|
|
|
def all_strategy_combinations_minus_default():
|
|
return (all_strategy_minus_default_and_tpu_combinations() +
|
|
tpu_strategy_combinations() +
|
|
multi_worker_strategy_combinations_eager_only())
|
|
|
|
|
|
def strategy_and_optimizer_combinations():
|
|
non_tpu_strategies = combinations.times(
|
|
strategy_minus_tpu_combinations(),
|
|
combinations.combine(
|
|
optimizer=[
|
|
optimizer_combinations.adagrad_optimizer_v1_fn,
|
|
optimizer_combinations.adam_optimizer_v1_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_v1_fn,
|
|
optimizer_combinations.rmsprop_optimizer_v1_fn,
|
|
optimizer_combinations.adadelta_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adagrad_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adamax_optimizer_keras_v2_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_keras_v2_fn,
|
|
optimizer_combinations.nadam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.rmsprop_optimizer_keras_v2_fn,
|
|
optimizer_combinations.ftrl_optimizer_keras_v2_fn
|
|
]))
|
|
tpu_strategies_graph = combinations.combine(
|
|
distribution=tpu_strategies,
|
|
mode=['graph'],
|
|
optimizer=[
|
|
optimizer_combinations.adagrad_optimizer_v1_fn,
|
|
optimizer_combinations.adam_optimizer_v1_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_v1_fn,
|
|
optimizer_combinations.rmsprop_optimizer_v1_fn,
|
|
optimizer_combinations.adagrad_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_keras_v2_fn,
|
|
optimizer_combinations.rmsprop_optimizer_keras_v2_fn
|
|
])
|
|
tpu_strategies_eager = combinations.combine(
|
|
distribution=tpu_strategies,
|
|
mode=['eager'],
|
|
optimizer=[
|
|
optimizer_combinations.adagrad_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_keras_v2_fn,
|
|
optimizer_combinations.rmsprop_optimizer_keras_v2_fn
|
|
])
|
|
multi_worker_eager = combinations.combine(
|
|
distribution=multi_worker_mirrored_strategies,
|
|
mode=['eager'],
|
|
optimizer=[
|
|
optimizer_combinations.adadelta_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adagrad_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.adamax_optimizer_keras_v2_fn,
|
|
optimizer_combinations.gradient_descent_optimizer_keras_v2_fn,
|
|
optimizer_combinations.nadam_optimizer_keras_v2_fn,
|
|
optimizer_combinations.rmsprop_optimizer_keras_v2_fn,
|
|
optimizer_combinations.ftrl_optimizer_keras_v2_fn
|
|
])
|
|
return (non_tpu_strategies + tpu_strategies_eager + tpu_strategies_graph +
|
|
multi_worker_eager)
|
|
|
|
|
|
class BatchCountingCB(keras.callbacks.Callback):
|
|
|
|
def __init__(self):
|
|
super(BatchCountingCB, self).__init__()
|
|
self.train_begin_batches = []
|
|
self.train_end_batches = []
|
|
self.test_begin_batches = []
|
|
self.test_end_batches = []
|
|
self.predict_begin_batches = []
|
|
self.predict_end_batches = []
|
|
|
|
def on_train_batch_begin(self, batch, logs=None):
|
|
self.train_begin_batches.append(batch)
|
|
|
|
def on_train_batch_end(self, batch, logs=None):
|
|
self.train_end_batches.append(batch)
|
|
|
|
def on_test_batch_begin(self, batch, logs=None):
|
|
self.test_begin_batches.append(batch)
|
|
|
|
def on_test_batch_end(self, batch, logs=None):
|
|
self.test_end_batches.append(batch)
|
|
|
|
def on_predict_batch_begin(self, batch, logs=None):
|
|
self.predict_begin_batches.append(batch)
|
|
|
|
def on_predict_batch_end(self, batch, logs=None):
|
|
self.predict_end_batches.append(batch)
|
|
|
|
|
|
class TestDistributionStrategyWithNumpyArrays(test.TestCase,
|
|
parameterized.TestCase):
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calculating_input_params_no_steps_no_batch_size(self, distribution):
|
|
# Calculate the per_replica_batch_size scaling factor for strategies
|
|
# that use per_core_batch_size
|
|
replica_scale_factor = 1.0
|
|
if not distributed_training_utils.global_batch_size_supported(distribution):
|
|
replica_scale_factor = distribution.num_replicas_in_sync
|
|
|
|
with self.cached_session():
|
|
# Default global batch size 32 for input with 64 samples run in 2 steps
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=None, batch_size=None)
|
|
self.assertEqual(batch_size, 32 // replica_scale_factor)
|
|
self.assertEqual(steps, 2)
|
|
|
|
# Computed global batch size 20 is lower than 32 if we pass less samples.
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 20, steps=None, batch_size=None)
|
|
self.assertEqual(batch_size, 20 // replica_scale_factor)
|
|
self.assertEqual(steps, 1)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calculating_input_params_with_steps_no_batch_size(
|
|
self, distribution):
|
|
# Calculate the per_replica_batch_size scaling factor for strategies
|
|
# that use per_core_batch_size
|
|
replica_scale_factor = 1.0
|
|
if not distributed_training_utils.global_batch_size_supported(distribution):
|
|
replica_scale_factor = distribution.num_replicas_in_sync
|
|
|
|
with self.cached_session():
|
|
# Computed global batch size is correct for number of specified 1 step
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=1, batch_size=None)
|
|
self.assertEqual(batch_size, 64 // replica_scale_factor)
|
|
self.assertEqual(steps, 1)
|
|
|
|
# Computed global batch size is correct for number of specified 2 steps
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=2, batch_size=None)
|
|
self.assertEqual(batch_size, 32 // replica_scale_factor)
|
|
self.assertEqual(steps, 2)
|
|
|
|
# All samples can not be consumed in specified number of steps
|
|
with self.assertRaisesRegex(ValueError, 'not divisible by steps'):
|
|
distributed_training_utils_v1.get_input_params(
|
|
distribution, 63, steps=2, batch_size=None)
|
|
|
|
# This cases is different for different strategies due to the
|
|
# difference in supported batch size being global or per-replica.
|
|
if replica_scale_factor == 1:
|
|
# Computed global batch size is correct even if not sharadable
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 63, steps=3, batch_size=None)
|
|
self.assertEqual(batch_size, 21)
|
|
self.assertEqual(steps, 3)
|
|
else:
|
|
# Computed global batch size can not be sharded across replicas
|
|
with self.assertRaisesRegex(
|
|
ValueError, 'could not be sharded evenly '
|
|
'across the sync replicas'):
|
|
distributed_training_utils_v1.get_input_params(
|
|
distribution, 63, steps=1, batch_size=None)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calculating_input_params_no_steps_with_batch_size(
|
|
self, distribution):
|
|
# Calculate the per_replica_batch_size scaling factor for strategies
|
|
# that use per_core_batch_size
|
|
replica_scale_factor = 1.0
|
|
if not distributed_training_utils.global_batch_size_supported(distribution):
|
|
replica_scale_factor = distribution.num_replicas_in_sync
|
|
|
|
with self.cached_session():
|
|
# Computed steps is correct for specified batch size
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=None, batch_size=16)
|
|
self.assertEqual(batch_size, 16)
|
|
self.assertEqual(steps, 4 // replica_scale_factor)
|
|
|
|
# Computed steps is correct for specified batch size
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=None, batch_size=32)
|
|
self.assertEqual(batch_size, 32)
|
|
self.assertEqual(steps, 2 // replica_scale_factor)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calculating_input_params_with_steps_with_batch_size(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
# No change to steps and batch size if both specified and feasible
|
|
steps, batch_size = distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=5, batch_size=3)
|
|
self.assertEqual(batch_size, 3)
|
|
self.assertEqual(steps, 5)
|
|
|
|
# Number of samples is less than global batch size * steps
|
|
with self.assertRaisesRegex(ValueError, 'less than samples required'):
|
|
distributed_training_utils_v1.get_input_params(
|
|
distribution, 64, steps=10, batch_size=13)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calling_model_with_numpy_arrays(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae']
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
inputs = np.zeros((64, 3), dtype=np.float32)
|
|
targets = np.zeros((64, 4), dtype=np.float32)
|
|
|
|
# Call fit with validation data
|
|
model.fit(
|
|
inputs,
|
|
targets,
|
|
epochs=1,
|
|
batch_size=2,
|
|
verbose=0,
|
|
validation_data=(inputs, targets))
|
|
|
|
# TODO(anjalisridhar): We need tests for when the batch size and steps
|
|
# are smaller and results in a 0 batch_size and steps value.
|
|
model.evaluate(inputs, targets)
|
|
model.evaluate(inputs, targets, batch_size=8)
|
|
|
|
model.predict(inputs)
|
|
model.predict(inputs, batch_size=8)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calling_model_with_mixed_precision(self, distribution):
|
|
if isinstance(distribution,
|
|
(parameter_server_strategy.ParameterServerStrategyV1,
|
|
parameter_server_strategy_v2.ParameterServerStrategyV2,
|
|
central_storage_strategy.CentralStorageStrategy,
|
|
central_storage_strategy.CentralStorageStrategyV1)):
|
|
self.skipTest('b/152097775')
|
|
if _is_tpu_strategy(distribution):
|
|
policy_name = 'mixed_bfloat16'
|
|
else:
|
|
policy_name = 'mixed_float16'
|
|
with self.cached_session(), \
|
|
distribution.scope(), \
|
|
policy.policy_scope(policy_name):
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
x = keras.layers.Input(shape=(3,), name='input')
|
|
y = keras.layers.Dense(4, name='dense')(x)
|
|
y = keras.layers.Activation('softmax', dtype='float32')(y)
|
|
model = keras.Model(x, y)
|
|
loss = 'mse'
|
|
metrics = ['mae']
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
# We need to pass float32 since TPUs do not support float64, even though
|
|
# these arrays will immediately be casted to bfloat16 on TPUs. We also
|
|
# cannot pass bfloat16, as Numpy does not support it.
|
|
inputs = np.zeros((64, 3), dtype='float32')
|
|
targets = np.zeros((64, 4), dtype='float32')
|
|
|
|
model.fit(
|
|
inputs,
|
|
targets,
|
|
epochs=1,
|
|
batch_size=2,
|
|
verbose=0,
|
|
validation_data=(inputs, targets))
|
|
|
|
model.evaluate(inputs, targets)
|
|
model.evaluate(inputs, targets, batch_size=8)
|
|
|
|
model.predict(inputs)
|
|
model.predict(inputs, batch_size=8)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_operator_overload_mixed_precision(self, distribution):
|
|
# Regression test that tests a fixed bug does not reoccur. Adding an
|
|
# AutoCastVariable to a tensor on a TPU, where the variable was the LHS of
|
|
# the '+' operator, used to cause the gradient w.r.t. the variable to be
|
|
# None.
|
|
if isinstance(distribution,
|
|
(parameter_server_strategy.ParameterServerStrategyV1,
|
|
parameter_server_strategy_v2.ParameterServerStrategyV2,
|
|
central_storage_strategy.CentralStorageStrategy,
|
|
central_storage_strategy.CentralStorageStrategyV1)):
|
|
self.skipTest('b/152097775')
|
|
|
|
if _is_tpu_strategy(distribution):
|
|
policy_name = 'mixed_bfloat16'
|
|
else:
|
|
policy_name = 'mixed_float16'
|
|
|
|
class MyLayer(keras.layers.Layer):
|
|
|
|
def build(self, _):
|
|
self.v1 = self.add_weight('v', ())
|
|
self.v2 = self.add_weight('v', ())
|
|
|
|
def call(self, inp):
|
|
inp += self.v1
|
|
return self.v2 + inp
|
|
|
|
with self.cached_session(), distribution.scope():
|
|
layer = MyLayer(dtype=policy.Policy(policy_name))
|
|
def run_fn():
|
|
x = np.array([1.])
|
|
with backprop.GradientTape() as tape:
|
|
y = layer(x)
|
|
grad_v1, grad_v2 = tape.gradient(y, [layer.v1, layer.v2])
|
|
return grad_v1, grad_v2
|
|
if context.executing_eagerly():
|
|
run_fn = def_function.function(run_fn)
|
|
|
|
grad_v1, grad_v2 = distribution.run(run_fn)
|
|
self.assertIsNotNone(grad_v1)
|
|
self.assertIsNotNone(grad_v2)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[strategy_combinations.one_device_strategy],
|
|
mode=['graph', 'eager']))
|
|
def test_optimizer_in_cross_replica_context_raises_error(self, distribution):
|
|
|
|
with self.cached_session(), distribution.scope():
|
|
model = keras.models.Sequential([keras.layers.Dense(1)])
|
|
x = np.array([[1.]])
|
|
with backprop.GradientTape() as tape:
|
|
y = model(x)
|
|
gradients = tape.gradient(y, model.trainable_variables)
|
|
optimizer = gradient_descent_keras.SGD()
|
|
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
'cannot be called in cross-replica context'):
|
|
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calling_model_with_nested_numpy_arrays(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = multi_input_output_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
input_a_np = np.asarray(np.random.random((64, 3)), dtype=np.float32)
|
|
input_b_np = np.asarray(np.random.random((64, 5)), dtype=np.float32)
|
|
inputs = [input_a_np, input_b_np]
|
|
|
|
output_d_np = np.asarray(np.random.random((64, 7)), dtype=np.float32)
|
|
output_e_np = np.asarray(np.random.random((64, 7)), dtype=np.float32)
|
|
targets = [output_d_np, output_e_np]
|
|
|
|
# Call fit with validation data
|
|
model.fit(inputs, targets, epochs=1, batch_size=8, verbose=0)
|
|
|
|
# TODO(anjalisridhar): We need tests for when the batch size and steps are
|
|
# smaller and results in a 0 batch_size and steps value.
|
|
model.evaluate(inputs, targets)
|
|
model.evaluate(inputs, targets, batch_size=8)
|
|
|
|
model.predict(inputs)
|
|
model.predict(inputs, batch_size=8)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategies_minus_tpu, mode=['graph', 'eager']) +
|
|
combinations.combine(
|
|
distribution=multi_worker_mirrored_strategies, mode=['eager']))
|
|
def test_numpy_with_sample_weights(self, distribution):
|
|
with self.cached_session(), distribution.scope():
|
|
model = get_sample_weights_model()
|
|
optimizer = rmsprop.RMSPropOptimizer(learning_rate=0.001)
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
inputs = np.array([[0], [1], [2], [3]], np.float32)
|
|
targets = np.array([[2], [4], [6], [8]], np.float32)
|
|
sample_weights = np.array([0.25, 0.5, 0.75, 1], np.float32)
|
|
|
|
result = model.evaluate(
|
|
inputs,
|
|
targets,
|
|
batch_size=2,
|
|
sample_weight=sample_weights,
|
|
verbose=1)
|
|
# The per sample loss is multipled by the corresponding sample weight. The
|
|
# average of these weighted losses is the return value of the `evaluate`
|
|
# call. For example, in the test above the average weighted loss is
|
|
# calculated in the following manner:
|
|
# batch_1 = (((2-0)^2) * 0.25 + ((4-1)^2) * 0.5) / 2 = 5.5 / 2 = 2.75
|
|
# batch_2 = (((6-2)^2 * 0.75) + ((8-3)^2 * 1)) / 2 = 37 / 2 = 18.5
|
|
# final result = (batch_1 + batch_2) / 2 = 10.625.
|
|
# The first time we divide by number of input samples and the second time
|
|
# we divide by number of steps/batches that the loss is aggregated over.
|
|
self.assertAllClose(result, 10.625)
|
|
|
|
# We now test without passing sample_weights:
|
|
# batch_1 = ((2-0)^2) + ((4-1)^2) / 2 = 13 / 2 = 6.5
|
|
# batch_2 = ((6-2)^2) + ((8-3)^2) / 2 = 41 / 2 = 20.5
|
|
# final result = (batch_1 + batch_2) / 2 = 27 / 2 = 13.5
|
|
result = model.evaluate(inputs, targets, batch_size=2, verbose=1)
|
|
self.assertAllClose(result, 13.5)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_flatten_predict_outputs(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
model = multi_input_output_model()
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
# We take 6 input samples with each input having a dimension of 3 or 5.
|
|
input_a_np = np.asarray(np.random.random((6, 3)), dtype=np.float32)
|
|
input_b_np = np.asarray(np.random.random((6, 5)), dtype=np.float32)
|
|
inputs = [input_a_np, input_b_np]
|
|
|
|
outs = model.predict(inputs)
|
|
# `predict` a list that is equal in length to the number of model outputs.
|
|
# In this test our model has two outputs and each element of `outs`
|
|
# corresponds to all the samples of one of the model outputs.
|
|
self.assertLen(outs, 2)
|
|
# Each of the output samples have a dimension of 7. We should process all
|
|
# the available input samples(6).
|
|
self.assertAllEqual([6, 7], outs[0].shape)
|
|
self.assertAllEqual([6, 7], outs[1].shape)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(tpu_strategy_combinations_graph_only(),
|
|
combinations.combine(batch_size=[4, 6])))
|
|
def test_evaluate_with_partial_batch(self, distribution, batch_size):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = get_model()
|
|
model_with_ds_strategy.compile(optimizer, loss, metrics=metrics)
|
|
|
|
cpu_model = get_model()
|
|
cpu_model.compile(optimizer, loss, metrics=metrics)
|
|
|
|
x = np.random.random((10, 3)).astype('float32')
|
|
y = np.random.random((10, 4)).astype('float32')
|
|
|
|
# As sample size is 10, we batch by 4 so that the last batch is
|
|
# a partial batch. Also `evaluate()` using numpy array as inputs without
|
|
# distribution strategy uses entire sample as a single batch. As so,
|
|
# we remove parameters `batch_size` and `steps`.
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
evaluate_ground_truth = cpu_model.evaluate(x, y)
|
|
|
|
# We don't compare the loss as loss is currently not computed as metric
|
|
# in Keras, the loss value is inaccurate for last partial batch due to
|
|
# more weights for the last batch samples.
|
|
steps = np.ceil(10.0 / batch_size)
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.evaluate(
|
|
x, y, batch_size=batch_size, steps=steps)[1:],
|
|
evaluate_ground_truth[1:],
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
# Test that `steps` is inferred correctly when final partial batch exists.
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.evaluate(x, y, batch_size=batch_size)[1:],
|
|
evaluate_ground_truth[1:],
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
tpu_strategy_combinations_graph_only()))
|
|
def test_predict_with_partial_batch(self, distribution):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = get_model()
|
|
model_with_ds_strategy.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
cpu_model = get_model()
|
|
cpu_model.compile(optimizer, loss)
|
|
|
|
inputs = np.random.random((10, 3)).astype(np.float32)
|
|
|
|
# As sample size is 10, we batch by 4 so that the last batch is
|
|
# a partial batch. Also `predict()` using numpy array as inputs without
|
|
# distribution strategy uses entire sample as a single batch. As so,
|
|
# we remove parameters `batch_size` and `steps`.
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
predict_ground_truth = cpu_model.predict(inputs)
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.predict(inputs, batch_size=4, steps=3),
|
|
predict_ground_truth,
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
# Test that `steps` is inferred correctly when final partial batch exists.
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.predict(inputs, batch_size=4),
|
|
predict_ground_truth,
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
|
|
@ds_combinations.generate(tpu_strategy_combinations_graph_only())
|
|
def test_no_target_model(self, distribution):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
|
|
class MyLayer(keras.layers.Layer):
|
|
|
|
def call(self, inputs, training=None):
|
|
self.add_loss(math_ops.reduce_sum(inputs), inputs=True)
|
|
return inputs
|
|
|
|
with distribution.scope():
|
|
model = keras.models.Sequential()
|
|
model.add(
|
|
keras.layers.Dense(16, activation='relu', input_shape=_INPUT_SIZE))
|
|
model.add(MyLayer())
|
|
model.add(keras.layers.Dense(_NUM_CLASS, activation='softmax'))
|
|
|
|
model.compile(optimizer)
|
|
inputs = np.zeros((20, 10), np.float32)
|
|
|
|
model.fit(inputs, epochs=1, steps_per_epoch=2)
|
|
model.predict(inputs, steps=1)
|
|
model.evaluate(inputs, steps=1)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
tpu_strategy_combinations_graph_only()))
|
|
def test_predict_multi_output_model_with_partial_batch(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = simple_multi_inputs_multi_outputs_model()
|
|
model_with_ds_strategy.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
cpu_model = simple_multi_inputs_multi_outputs_model()
|
|
cpu_model.compile(optimizer, loss)
|
|
|
|
input_data, _ = get_multi_inputs_multi_outputs_data()
|
|
input_dict = {
|
|
'input_a': input_data['input_a'],
|
|
'input_b': input_data['input_b'],
|
|
}
|
|
|
|
# As sample size is 200, we batch by 18 so that the last batch is
|
|
# a partial batch. Also `fit()` using numpy array as inputs without
|
|
# distribution strategy uses entire sample as a single batch. As so,
|
|
# we remove parameters `batch_size` and `steps`.
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.predict(input_dict, batch_size=18, steps=12),
|
|
cpu_model.predict(input_dict),
|
|
atol=1e-4,
|
|
rtol=1e-4)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_gradients_are_none(self, distribution):
|
|
|
|
if not context.executing_eagerly():
|
|
self.skipTest('None gradients are not supported in graph mode')
|
|
|
|
class DenseWithExtraWeight(keras.layers.Dense):
|
|
|
|
def build(self, input_shape):
|
|
# Gradients w.r.t. extra_weights are None
|
|
self.extra_weight_1 = self.add_weight('extra_weight_1', shape=(),
|
|
initializer='ones')
|
|
super(DenseWithExtraWeight, self).build(input_shape)
|
|
self.extra_weight_2 = self.add_weight('extra_weight_2', shape=(),
|
|
initializer='ones')
|
|
|
|
with distribution.scope():
|
|
model = keras.Sequential([DenseWithExtraWeight(4, input_shape=(4,))])
|
|
model.compile('adam', 'mse')
|
|
|
|
inputs = np.random.normal(size=(64, 4))
|
|
targets = np.random.normal(size=(64, 4))
|
|
old_kernel = model.get_weights()[1]
|
|
model.fit(inputs, targets)
|
|
new_kernel = model.get_weights()[1]
|
|
self.assertNotAllEqual(old_kernel, new_kernel)
|
|
|
|
|
|
class TestDistributionStrategyWithDatasets(test.TestCase,
|
|
parameterized.TestCase):
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_calling_model_on_same_dataset(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
dataset = get_dataset(distribution)
|
|
|
|
# Call fit with validation data
|
|
model.fit(
|
|
dataset,
|
|
epochs=1,
|
|
steps_per_epoch=2,
|
|
verbose=0,
|
|
validation_data=dataset,
|
|
validation_steps=2)
|
|
model.fit(
|
|
dataset,
|
|
epochs=1,
|
|
steps_per_epoch=2,
|
|
verbose=0,
|
|
validation_data=dataset,
|
|
validation_steps=2)
|
|
model.predict(get_predict_dataset(distribution), steps=2)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_model_interleaved_eval_same_as_direct_eval(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
user_controlled_model = get_model()
|
|
user_controlled_model.compile(
|
|
optimizer_fn(0.001),
|
|
loss='mse',
|
|
metrics=['mae', keras.metrics.CategoricalAccuracy()])
|
|
|
|
interleaved_model = get_model()
|
|
interleaved_model.set_weights(user_controlled_model.get_weights())
|
|
interleaved_model.compile(
|
|
optimizer_fn(0.001),
|
|
loss='mse',
|
|
metrics=['mae', keras.metrics.CategoricalAccuracy()])
|
|
|
|
dataset = get_dataset(distribution)
|
|
|
|
# Call fit with validation interleaved
|
|
interleaved_output = interleaved_model.fit(
|
|
dataset,
|
|
epochs=2,
|
|
steps_per_epoch=2,
|
|
verbose=1,
|
|
validation_data=dataset,
|
|
validation_steps=2,
|
|
shuffle=False)
|
|
|
|
# Manually control the validation running after each epoch.
|
|
user_controlled_output = []
|
|
for _ in range(2):
|
|
user_controlled_model.fit(
|
|
dataset, epochs=1, steps_per_epoch=2, verbose=1, shuffle=False)
|
|
user_controlled_output.append(
|
|
user_controlled_model.evaluate(dataset, steps=2))
|
|
|
|
self.assertEqual(interleaved_output.history['val_loss'],
|
|
[x[0] for x in user_controlled_output])
|
|
val_mean_absolute_error = interleaved_output.history.get(
|
|
'val_mean_absolute_error')
|
|
if not val_mean_absolute_error:
|
|
# The name of the metric changed in TF2.0
|
|
val_mean_absolute_error = interleaved_output.history['val_mae']
|
|
self.assertEqual(val_mean_absolute_error,
|
|
[x[1] for x in user_controlled_output])
|
|
self.assertEqual(interleaved_output.history['val_categorical_accuracy'],
|
|
[x[2] for x in user_controlled_output])
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_fit_with_tuple_and_dict_dataset_inputs(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = multi_input_output_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
input_a_np = np.random.random((10, 3)).astype('float32')
|
|
input_b_np = np.random.random((10, 5)).astype('float32')
|
|
output_d_np = np.random.random((10, 7)).astype('float32')
|
|
output_e_np = np.random.random((10, 7)).astype('float32')
|
|
|
|
# Test with tuples
|
|
dataset_tuple = dataset_ops.Dataset.from_tensor_slices(
|
|
((input_a_np, input_b_np), (output_d_np, output_e_np)))
|
|
dataset_tuple = dataset_tuple.repeat(100)
|
|
dataset_tuple = dataset_tuple.batch(10)
|
|
|
|
model.fit(dataset_tuple, epochs=1, steps_per_epoch=2, verbose=1)
|
|
|
|
# Test with dict
|
|
dataset_dict = dataset_ops.Dataset.from_tensor_slices(({
|
|
'input_a': input_a_np,
|
|
'input_b': input_b_np
|
|
}, (output_d_np, output_e_np)))
|
|
dataset_dict = dataset_dict.repeat(100)
|
|
dataset_dict = dataset_dict.batch(10)
|
|
|
|
model.fit(dataset_dict, epochs=1, steps_per_epoch=2, verbose=1)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_fit_with_dictionary_in_the_dataset_b135161171(
|
|
self, distribution):
|
|
|
|
if _is_tpu_strategy(distribution):
|
|
self.skipTest('b/142805125')
|
|
|
|
def custom_loss(predict, label, weight):
|
|
bce = keras.losses.binary_crossentropy(label, predict)
|
|
return math_ops.reduce_mean(bce * weight)
|
|
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
input_img = keras.layers.Input([64, 64, 3], name='img')
|
|
input_lbl = keras.layers.Input([64, 64, 1], name='lbl')
|
|
input_weight = keras.layers.Input([64, 64], name='weight')
|
|
predict = keras.layers.Conv2D(2, [1, 1], padding='same')(input_img)
|
|
loss_lambda = keras.layers.Lambda(
|
|
lambda x: custom_loss(*x), name='my_loss')
|
|
my_loss = loss_lambda([predict, input_lbl, input_weight])
|
|
model = keras.models.Model(
|
|
inputs=[input_img, input_lbl, input_weight],
|
|
outputs=[predict, my_loss])
|
|
model.add_loss(model.get_layer('my_loss').output)
|
|
model.compile(
|
|
optimizer='adam')
|
|
|
|
if context.executing_eagerly():
|
|
|
|
def map_fn(img, lbl, weight):
|
|
inputs = {'img': img, 'lbl': lbl, 'weight': weight}
|
|
return (inputs,)
|
|
else:
|
|
|
|
def map_fn(img, lbl, weight):
|
|
inputs = {'img': img, 'lbl': lbl, 'weight': weight}
|
|
return inputs, {}
|
|
|
|
fake_imgs = np.ones([50, 64, 64, 3], dtype=np.float32)
|
|
fake_lbls = np.ones([50, 64, 64, 1], dtype=np.float32)
|
|
fake_weights = np.ones([50, 64, 64], dtype=np.float32)
|
|
|
|
data = dataset_ops.Dataset.from_tensor_slices(
|
|
(fake_imgs, fake_lbls, fake_weights)).map(map_fn).batch(10)
|
|
|
|
model.fit(data)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_fit_eval_and_predict_methods_on_dataset_without_steps(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
inputs = np.zeros((1000, 3), dtype=np.float32)
|
|
targets = np.zeros((1000, 4), dtype=np.float32)
|
|
# steps/steps_per_epoch are calculated when using numpy arrays as
|
|
# input data.
|
|
fit_with_numpy = model.fit(
|
|
inputs, targets, epochs=1, batch_size=10).history
|
|
eval_with_numpy = model.evaluate(inputs, targets, batch_size=10)
|
|
predict_with_numpy = model.predict(inputs, batch_size=10)
|
|
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
|
|
dataset = dataset.batch(10, drop_remainder=True)
|
|
fit_with_ds = model.fit(dataset, epochs=1).history
|
|
eval_with_ds = model.evaluate(dataset)
|
|
predict_dataset = dataset_ops.Dataset.from_tensor_slices(inputs)
|
|
predict_dataset = predict_dataset.batch(10, drop_remainder=True)
|
|
predict_with_ds = model.predict(predict_dataset)
|
|
self.assertAllClose(fit_with_numpy, fit_with_ds, atol=1e-4, rtol=1e-4)
|
|
self.assertAllClose(eval_with_numpy, eval_with_ds, atol=1e-4, rtol=1e-4)
|
|
self.assertAllClose(
|
|
predict_with_numpy, predict_with_ds, atol=1e-4, rtol=1e-4)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_predict_on_dataset_with_unknown_cardinality_without_steps(
|
|
self, distribution, mode):
|
|
|
|
if mode == 'graph' and _is_tpu_strategy(distribution):
|
|
self.skipTest('partial batch not supported with TPU in graph mode.')
|
|
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(optimizer, loss, metrics=metrics)
|
|
|
|
inputs = np.zeros((20, 3), dtype=np.float32)
|
|
# steps/steps_per_epoch are calculated when using numpy arrays as
|
|
# input data.
|
|
predict_with_numpy = model.predict(inputs, batch_size=10)
|
|
|
|
predict_dataset = convert_numpy_to_dataset_with_unknown_cardinality(
|
|
inputs)
|
|
|
|
self.assertEqual(
|
|
keras.backend.get_value(cardinality.cardinality(predict_dataset)),
|
|
cardinality.UNKNOWN)
|
|
|
|
predict_with_ds = model.predict(predict_dataset)
|
|
self.assertAllClose(
|
|
predict_with_numpy, predict_with_ds, atol=1e-4, rtol=1e-4)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_on_dataset_with_unknown_cardinality_without_steps(
|
|
self, distribution, mode):
|
|
# TODO(b/155867206): Investigate why this test occasionally segfaults on TPU
|
|
# in eager mode.
|
|
if mode == 'eager' and _is_tpu_strategy(distribution):
|
|
self.skipTest('caused segfault with TPU in eager mode.')
|
|
|
|
if mode == 'graph' and _is_tpu_strategy(distribution):
|
|
self.skipTest('partial batch not supported with TPU in graph mode.')
|
|
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
inputs = np.zeros((100, 3), dtype=np.float32)
|
|
targets = np.zeros((100, 4), dtype=np.float32)
|
|
# steps/steps_per_epoch are calculated when using numpy arrays as
|
|
# input data.
|
|
fit_with_numpy = model.fit(
|
|
inputs, targets, epochs=1, batch_size=10).history
|
|
fit_with_numpy_multiple_epochs = model.fit(
|
|
inputs, targets, epochs=2, batch_size=10).history
|
|
eval_with_numpy = model.evaluate(inputs, targets, batch_size=10)
|
|
predict_with_numpy = model.predict(inputs, batch_size=10)
|
|
|
|
dataset = convert_numpy_to_dataset_with_unknown_cardinality(
|
|
inputs, targets)
|
|
predict_dataset = convert_numpy_to_dataset_with_unknown_cardinality(
|
|
inputs)
|
|
|
|
self.assertEqual(
|
|
keras.backend.get_value(cardinality.cardinality(dataset)),
|
|
cardinality.UNKNOWN)
|
|
self.assertEqual(
|
|
keras.backend.get_value(cardinality.cardinality(predict_dataset)),
|
|
cardinality.UNKNOWN)
|
|
|
|
eval_with_ds = model.evaluate(dataset)
|
|
predict_with_ds = model.predict(predict_dataset)
|
|
self.assertAllClose(eval_with_numpy, eval_with_ds, atol=1e-4, rtol=1e-4)
|
|
self.assertAllClose(
|
|
predict_with_numpy, predict_with_ds, atol=1e-4, rtol=1e-4)
|
|
|
|
fit_with_ds = model.fit(dataset, epochs=1).history
|
|
fit_with_ds_multiple_epochs = model.fit(dataset, epochs=2).history
|
|
self.assertAllClose(fit_with_numpy, fit_with_ds, atol=1e-4, rtol=1e-4)
|
|
self.assertAllClose(
|
|
fit_with_numpy_multiple_epochs,
|
|
fit_with_ds_multiple_epochs,
|
|
atol=1e-4,
|
|
rtol=1e-4)
|
|
|
|
@ds_combinations.generate(tpu_strategy_combinations_graph_only())
|
|
def test_on_dataset_with_unknown_cardinality(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
gradient_descent.GradientDescentOptimizer(0.001),
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
inputs = np.zeros((1000, 3), dtype=np.float32)
|
|
targets = np.zeros((1000, 4), dtype=np.float32)
|
|
# steps/steps_per_epoch are calculated when using numpy arrays as
|
|
# input data.
|
|
eval_with_numpy = model.evaluate(inputs, targets, batch_size=10)
|
|
predict_with_numpy = model.predict(inputs, batch_size=10)
|
|
|
|
dataset = convert_numpy_to_dataset_with_unknown_cardinality(
|
|
inputs, targets)
|
|
predict_dataset = convert_numpy_to_dataset_with_unknown_cardinality(
|
|
inputs)
|
|
|
|
self.assertEqual(
|
|
keras.backend.get_value(cardinality.cardinality(dataset)),
|
|
cardinality.UNKNOWN)
|
|
self.assertEqual(
|
|
keras.backend.get_value(cardinality.cardinality(predict_dataset)),
|
|
cardinality.UNKNOWN)
|
|
|
|
eval_with_ds = model.evaluate(dataset, steps=100)
|
|
predict_with_ds = model.predict(predict_dataset, steps=100)
|
|
self.assertAllClose(eval_with_numpy, eval_with_ds, atol=1e-4, rtol=1e-4)
|
|
self.assertAllClose(
|
|
predict_with_numpy, predict_with_ds, atol=1e-4, rtol=1e-4)
|
|
|
|
with self.assertRaisesRegex(ValueError,
|
|
'Number of steps could not be inferred'):
|
|
model.fit(dataset, epochs=1)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_fit_eval_and_predict_methods_on_dataset(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
dataset = get_dataset(distribution)
|
|
|
|
model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=1)
|
|
model.evaluate(dataset, steps=2, verbose=1)
|
|
model.predict(get_predict_dataset(distribution), steps=2)
|
|
|
|
@ds_combinations.generate(strategy_and_optimizer_combinations())
|
|
def test_fit_eval_and_predict_with_optimizer(self, distribution, optimizer):
|
|
with self.cached_session():
|
|
|
|
with distribution.scope():
|
|
|
|
model = get_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer(),
|
|
loss)
|
|
|
|
dataset = get_dataset(distribution)
|
|
|
|
model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=1)
|
|
model.evaluate(dataset, steps=2, verbose=1)
|
|
model.predict(get_predict_dataset(distribution), steps=2)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
|
|
strategy_combinations.one_device_strategy
|
|
],
|
|
mode=['graph', 'eager']))
|
|
def test_dataset_wrong_input_shape(self, distribution, mode):
|
|
if mode == 'graph':
|
|
self.skipTest(
|
|
'TODO(b/120943676, b/120957836): Re-enable for graph once the '
|
|
'validation code is restored.')
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
# Wrong input shape
|
|
inputs = np.zeros((10, 5), dtype=np.float32)
|
|
targets = np.zeros((10, 4), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
|
|
dataset = dataset.repeat(100)
|
|
dataset = dataset.batch(10)
|
|
|
|
with self.assertRaisesRegex(ValueError, 'is incompatible with'):
|
|
model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=0)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu
|
|
],
|
|
mode=['graph', 'eager']))
|
|
def test_dataset_external_batch_input_validation(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
# Batching is done outside tf.data's `batch`
|
|
inputs = np.zeros((100, 10, 3), dtype=np.float32)
|
|
targets = np.zeros((100, 10, 4), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
|
|
dataset = dataset.repeat(100)
|
|
|
|
model.fit(dataset, epochs=1, steps_per_epoch=2, verbose=1)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
|
|
strategy_combinations.mirrored_strategy_with_two_gpus
|
|
],
|
|
mode=['graph', 'eager']))
|
|
def test_learning_phase_value(self, distribution):
|
|
# TODO(anjalisridhar): Modify this test to use Lambdas since we can compare
|
|
# meaningful values. Currently we don't pass the learning phase if the
|
|
# Lambda layer uses the learning phase.
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
x = keras.layers.Input(shape=(1,), name='input')
|
|
y = keras.layers.Dense(1, kernel_initializer='ones')(x)
|
|
z = keras.layers.Dropout(0.9999)(y)
|
|
model = keras.Model(x, z)
|
|
initial_weights = model.get_weights()
|
|
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.005)
|
|
loss = 'mse'
|
|
metrics = ['acc']
|
|
model.compile(
|
|
optimizer,
|
|
loss,
|
|
metrics=metrics)
|
|
|
|
batch_size = 8
|
|
if isinstance(distribution, (mirrored_strategy.MirroredStrategy,
|
|
mirrored_strategy.MirroredStrategyV1)):
|
|
# MirroredStrategy uses global batch size.
|
|
batch_size = 8 * distribution.num_replicas_in_sync
|
|
|
|
inputs = np.ones((10, 1), dtype=np.float32)
|
|
targets = np.ones((10, 1), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs, targets))
|
|
dataset = dataset.repeat().batch(batch_size)
|
|
hist = model.fit(dataset, epochs=1, steps_per_epoch=20, verbose=1)
|
|
self.assertAlmostEqual(hist.history['acc'][0], 0, 0)
|
|
|
|
with distribution.scope():
|
|
model.set_weights(initial_weights)
|
|
# TODO(psv/anjalisridhar): Enable these lines after we fix b/117431185.
|
|
# evaluate_output = model.evaluate(dataset, steps=20)
|
|
# self.assertAlmostEqual(evaluate_output[1], 1, 0)
|
|
|
|
inputs = np.ones((10, 1), dtype=np.float32)
|
|
predict_dataset = dataset_ops.Dataset.from_tensor_slices(inputs)
|
|
|
|
predict_dataset = predict_dataset.repeat().batch(batch_size)
|
|
output = model.predict(predict_dataset, steps=10)
|
|
# `predict` runs for 10 steps
|
|
ref_output = np.ones((160, 1), dtype=np.float32)
|
|
self.assertArrayNear(output, ref_output, 1e-1)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def testOptimizerWithCallbacks(self, distribution):
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
model = get_model()
|
|
optimizer = gradient_descent_keras.SGD(0.01)
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
dataset = get_dataset(distribution)
|
|
|
|
def schedule(_):
|
|
return 0.001
|
|
|
|
model.fit(
|
|
dataset,
|
|
epochs=1,
|
|
steps_per_epoch=2,
|
|
verbose=0,
|
|
callbacks=[keras.callbacks.LearningRateScheduler(schedule)])
|
|
self.assertAllClose(0.001, keras.backend.get_value(model.optimizer.lr))
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(tpu_strategy_combinations_graph_only(),
|
|
combinations.combine(batch_size=[4, 6])))
|
|
def test_evaluate_with_dataset_with_partial_batch(self, distribution,
|
|
batch_size):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
metrics = ['mae', keras.metrics.CategoricalAccuracy()]
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = get_model()
|
|
model_with_ds_strategy.compile(optimizer, loss, metrics=metrics)
|
|
|
|
cpu_model = get_model()
|
|
cpu_model.compile(optimizer, loss, metrics=metrics)
|
|
|
|
x = np.random.random((10, 3)).astype('float32')
|
|
y = np.random.random((10, 4)).astype('float32')
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
|
|
|
|
# As sample size is 10, we make the last batch a partial batch.
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
dataset_with_partial_batch = dataset.batch(batch_size)
|
|
|
|
# We don't compare the loss as loss is currently not computed as metric
|
|
# in Keras, the loss value is inaccurate for last partial batch due to
|
|
# more weights for the last batch samples.
|
|
steps = np.ceil(10.0 / batch_size)
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.evaluate(
|
|
dataset_with_partial_batch, steps=steps)[1:],
|
|
cpu_model.evaluate(dataset_with_partial_batch, steps=steps)[1:],
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.evaluate(dataset_with_partial_batch)[1:],
|
|
cpu_model.evaluate(dataset_with_partial_batch)[1:],
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
tpu_strategy_combinations_graph_only()))
|
|
def test_predict_with_dataset_with_partial_batch(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = get_model()
|
|
model_with_ds_strategy.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
cpu_model = get_model()
|
|
cpu_model.compile(optimizer, loss)
|
|
|
|
inputs = np.random.random((10, 3)).astype(np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((inputs))
|
|
|
|
# As sample size is 10, we batch by 4 so that the last batch is
|
|
# a partial batch.
|
|
dataset_with_partial_batch = dataset.batch(4)
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.predict(dataset_with_partial_batch, steps=3),
|
|
cpu_model.predict(dataset_with_partial_batch, steps=3),
|
|
atol=1e-5,
|
|
rtol=1e-5)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
tpu_strategy_combinations_graph_only()))
|
|
def test_predict_multi_output_model_with_dataset_with_partial_batch(
|
|
self, distribution):
|
|
with self.cached_session():
|
|
optimizer = gradient_descent.GradientDescentOptimizer(0.001)
|
|
loss = 'mse'
|
|
|
|
with distribution.scope():
|
|
model_with_ds_strategy = simple_multi_inputs_multi_outputs_model()
|
|
model_with_ds_strategy.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
cpu_model = simple_multi_inputs_multi_outputs_model()
|
|
cpu_model.compile(optimizer, loss)
|
|
|
|
input_data, _ = get_multi_inputs_multi_outputs_data()
|
|
input_dict = {
|
|
'input_a': input_data['input_a'],
|
|
'input_b': input_data['input_b'],
|
|
}
|
|
|
|
dataset = dataset_ops.Dataset.from_tensor_slices(input_dict)
|
|
|
|
# As sample size is 200, we batch by 18 using 12 steps per epoch so
|
|
# that the last batch is a partial batch.
|
|
dataset_with_partial_batch = dataset.batch(18)
|
|
cpu_model.set_weights(model_with_ds_strategy.get_weights())
|
|
|
|
self.assertAllClose(
|
|
model_with_ds_strategy.predict(dataset_with_partial_batch, steps=12),
|
|
cpu_model.predict(dataset_with_partial_batch, steps=12),
|
|
atol=1e-4,
|
|
rtol=1e-4)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations_minus_default())
|
|
def test_match_model_input_matches_with_dataset_tensors(self, distribution):
|
|
|
|
def _create_model_input_output_tensors():
|
|
input_a = keras.layers.Input(shape=(16,), name='z_input_sorted_last')
|
|
input_b = keras.layers.Input(shape=(32,), name='a_input_sorted_first')
|
|
intermediate_a = keras.layers.Dense(10)(input_a)
|
|
intermediate_b = keras.layers.Dense(10)(input_b)
|
|
merged = keras.layers.Add()([intermediate_a, intermediate_b])
|
|
output = keras.layers.Dense(2)(merged)
|
|
return input_a, input_b, output
|
|
|
|
input_dict = {
|
|
'z_input_sorted_last': np.random.rand(32, 16).astype(np.float32),
|
|
'a_input_sorted_first': np.random.rand(32, 32).astype(np.float32)
|
|
}
|
|
target = np.ones((32, 2), dtype=np.float32)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((input_dict, target))
|
|
dataset = dataset.batch(4, drop_remainder=True)
|
|
|
|
with self.cached_session():
|
|
with distribution.scope():
|
|
input_a, input_b, output = _create_model_input_output_tensors()
|
|
# `input_a`, which has input name that comes last in alphanumeric
|
|
# order, is the first input of the model input layers. If tensors
|
|
# from `input_dict` is blindly flattened and passed to model
|
|
# inputs incorrectly, this would result in `input_a` input layer
|
|
# matching with tensor `a_input_sorted_first` and would result in
|
|
# shape mismatch.
|
|
model_with_array_input = keras.models.Model(
|
|
inputs=[input_a, input_b], outputs=output)
|
|
model_with_array_input.compile('sgd', 'mse')
|
|
model_weights = model_with_array_input.get_weights()
|
|
model_with_array_input_fit = model_with_array_input.fit(
|
|
dataset, steps_per_epoch=1, epochs=1).history
|
|
|
|
input_a, input_b, output = _create_model_input_output_tensors()
|
|
model_with_dict_input = keras.models.Model(
|
|
inputs={
|
|
'z_input_sorted_last': input_a,
|
|
'a_input_sorted_first': input_b,
|
|
},
|
|
outputs=output)
|
|
model_with_dict_input.compile('sgd', 'mse')
|
|
model_with_dict_input.set_weights(model_weights)
|
|
model_with_dict_input_fit = model_with_dict_input.fit(
|
|
dataset, steps_per_epoch=1, epochs=1).history
|
|
self.assertAllClose(
|
|
model_with_dict_input_fit,
|
|
model_with_array_input_fit,
|
|
atol=1e-4,
|
|
rtol=1e-4)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategies_minus_tpu, mode=['graph', 'eager']) +
|
|
combinations.combine(
|
|
distribution=multi_worker_mirrored_strategies, mode=['eager']))
|
|
def test_dataset_with_sample_weights(self, distribution):
|
|
with self.cached_session(), distribution.scope():
|
|
model = get_sample_weights_model()
|
|
optimizer = rmsprop.RMSPropOptimizer(learning_rate=0.001)
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
inputs = np.array([[0], [1], [2], [3]], np.float32)
|
|
targets = np.array([[2], [4], [6], [8]], np.float32)
|
|
sample_weights = np.array([0.25, 0.5, 0.75, 1], np.float32)
|
|
ds = dataset_ops.Dataset.from_tensor_slices(
|
|
(inputs, targets, sample_weights)).batch(2)
|
|
result = model.evaluate(ds, verbose=1)
|
|
# The per sample loss is multipled by the corresponding sample weight. The
|
|
# average of these weighted losses is the return value of the `evaluate`
|
|
# call. For example, in the test above the average weighted loss is
|
|
# calculated in the following manner:
|
|
# batch_1 = (((2-0)^2) * 0.25 + ((4-1)^2) * 0.5) / 2 = 5.5 / 2 = 2.75
|
|
# batch_2 = (((6-2)^2 * 0.75) + ((8-3)^2 * 1)) / 2 = 37 / 2 = 18.5
|
|
# final result = (batch_1 + batch_2) / 2 = 10.625.
|
|
# The first time we divide by number of input samples and the second time
|
|
# we divide by number of steps/batches that the loss is aggregated over.
|
|
self.assertAllClose(result, 10.625)
|
|
|
|
# We now test without passing sample_weights:
|
|
# batch_1 = ((2-0)^2) + ((4-1)^2) / 2 = 13 / 2 = 6.5
|
|
# batch_2 = ((6-2)^2) + ((8-3)^2) / 2 = 41 / 2 = 20.5
|
|
# final result = (batch_1 + batch_2) / 2 = 27 / 2 = 13.5
|
|
ds = dataset_ops.Dataset.from_tensor_slices((inputs, targets)).batch(2)
|
|
result = model.evaluate(ds, verbose=1)
|
|
self.assertAllClose(result, 13.5)
|
|
|
|
|
|
def _is_tpu_strategy(strategy):
|
|
if isinstance(strategy,
|
|
(tpu_strategy.TPUStrategy, tpu_strategy.TPUStrategyV1)):
|
|
return True
|
|
return False
|
|
|
|
|
|
class TestDistributionStrategyWithDatasetsFile(test.TestCase,
|
|
parameterized.TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestDistributionStrategyWithDatasetsFile, self).setUp()
|
|
self.input_file_name = os.path.join(self.get_temp_dir(), 'input.tfrecord')
|
|
inputs = np.zeros((20, 3), dtype=np.float32)
|
|
input_dataset = dataset_ops.Dataset.from_tensor_slices(inputs)
|
|
input_dataset = input_dataset.map(parsing_ops.serialize_tensor)
|
|
writer = writers.TFRecordWriter(self.input_file_name)
|
|
writer.write(input_dataset)
|
|
|
|
# TODO(wxinyi): add a multi-worker test for TPU
|
|
@ds_combinations.generate(multi_worker_strategy_combinations_eager_only())
|
|
def test_predict_on_dataset_shard_options_file_multi_worker_mirrored(
|
|
self, distribution, mode):
|
|
# This test is to verify if we successfully switch auto_shard_policy of a
|
|
# input dataset inside model.predict with MultiWorkerMirroredStrategy to
|
|
# AutoShardPolicy.DATA. Since there is only one input file for multiple
|
|
# workers, AutoShardPolicy.AUTO or AutoShardPolicy.FILE will lead to an
|
|
# error. However, since we switch to AutoShardPolicy.DATA in model.predict,
|
|
# no error is raised.
|
|
del mode
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
model.compile(optimizer, loss)
|
|
|
|
dataset = readers.TFRecordDataset(self.input_file_name)
|
|
dataset = dataset.map(lambda x: parsing_ops.parse_tensor(x, dtypes.float32))
|
|
|
|
dummy_op = lambda inp: True
|
|
|
|
dataset = dataset.filter(dummy_op).batch(8, drop_remainder=True)
|
|
|
|
options = dataset_ops.Options()
|
|
options.experimental_distribute.auto_shard_policy = \
|
|
distribute_options.AutoShardPolicy.FILE
|
|
dataset = dataset.with_options(options)
|
|
|
|
model.predict(dataset, steps=1)
|
|
|
|
|
|
class TestRegularizerLoss(test.TestCase, parameterized.TestCase):
|
|
|
|
class IdentityRegularizer(keras.regularizers.Regularizer):
|
|
|
|
def __call__(self, x):
|
|
return array_ops.identity(x)
|
|
|
|
class AddLayer(keras.layers.Layer):
|
|
|
|
def build(self, _):
|
|
self.v = self.add_weight(
|
|
'v', (),
|
|
initializer='ones',
|
|
regularizer=TestRegularizerLoss.IdentityRegularizer())
|
|
|
|
def call(self, inputs):
|
|
return inputs + self.v
|
|
|
|
@staticmethod
|
|
def loss_fn(_, y_pred):
|
|
return math_ops.reduce_mean(y_pred)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(all_strategy_combinations_minus_default()))
|
|
def test_regularizer_loss(self, distribution):
|
|
batch_size = 2
|
|
if not distributed_training_utils.global_batch_size_supported(distribution):
|
|
batch_size //= distribution.num_replicas_in_sync
|
|
|
|
# Given an input x, which is always 1, and variable v, this model computes
|
|
# Loss=x+v+regularizer_loss, where regularizer_loss=v and the variable is
|
|
# initialized to 1. Therefore, this model computes Loss=1+2v, and so the
|
|
# gradient dLoss/dv = 2. This gradient of 2 is averaged over all examples
|
|
# in a batch and then multiplied by the learning rate of 1. As a result,
|
|
# the model update for one batch should subtract 2 from v, resulting in v
|
|
# being -1. If the regularizer loss is not scaled correctly by number of
|
|
# replicas, the variable value will be incorrect when number of replicas
|
|
# >1. For e.g. it will be -2 if num replicas = 2.
|
|
with distribution.scope():
|
|
x = keras.layers.Input(shape=(1,), batch_size=batch_size)
|
|
y = TestRegularizerLoss.AddLayer()(x)
|
|
model = keras.models.Model(inputs=x, outputs=y)
|
|
opt = gradient_descent_keras.SGD(1.)
|
|
model.compile(
|
|
opt,
|
|
loss=TestRegularizerLoss.loss_fn)
|
|
model.fit(
|
|
x=np.array([[1.], [1.]], dtype=np.float32),
|
|
y=np.array([[1.], [1.]], dtype=np.float32),
|
|
batch_size=batch_size)
|
|
v = model.get_weights()[0]
|
|
self.assertEqual(-1.0, v)
|
|
|
|
|
|
@testing_utils.run_all_without_tensor_float_32(
|
|
'Uses Dense layers, which call matmul')
|
|
class TestDistributionStrategyWithKerasModels(test.TestCase,
|
|
parameterized.TestCase):
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_distribution_strategy_on_sequential_model(
|
|
self, distribution):
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = simple_sequential_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
inputs = np.zeros((20, 10), np.float32)
|
|
targets = np.zeros((20, 2), np.float32)
|
|
|
|
model.fit(inputs, targets, epochs=1, batch_size=10)
|
|
model.predict(inputs, batch_size=10)
|
|
model.evaluate(inputs, targets, batch_size=10)
|
|
|
|
@ds_combinations.generate(all_strategy_combinations())
|
|
def test_distribution_strategy_on_functional_model(
|
|
self, distribution):
|
|
with distribution.scope():
|
|
optimizer_fn = gradient_descent_keras.SGD
|
|
optimizer = optimizer_fn(learning_rate=0.001)
|
|
model = get_model()
|
|
loss = 'mse'
|
|
model.compile(
|
|
optimizer,
|
|
loss)
|
|
|
|
inputs = np.zeros((64, 3), dtype=np.float32)
|
|
targets = np.zeros((64, 4), dtype=np.float32)
|
|
|
|
model.fit(inputs, targets, epochs=1)
|
|
model.predict(inputs)
|
|
model.evaluate(inputs, targets)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_distributed_dataset(self, distribution):
|
|
with distribution.scope():
|
|
|
|
class CBCounter(keras.callbacks.Callback):
|
|
|
|
def __init__(self):
|
|
self.epochs = 0
|
|
self.train_batches = 0
|
|
self.test_batches = 0
|
|
|
|
def on_epoch_end(self, batch, logs=None):
|
|
self.epochs += 1
|
|
|
|
def on_train_batch_end(self, batch, logs=None):
|
|
self.train_batches += 1
|
|
|
|
def on_test_batch_end(self, batch, logs=None):
|
|
self.test_batches += 1
|
|
|
|
model = keras.Sequential([keras.layers.Dense(1)])
|
|
model.compile('sgd', 'mse')
|
|
cb_counter = CBCounter()
|
|
|
|
x, y = np.ones((100, 10)), np.ones((100, 1))
|
|
ds = dataset_ops.DatasetV2.from_tensor_slices((x, y))
|
|
ds = ds.batch(10).repeat(2)
|
|
ds = distribution.experimental_distribute_dataset(ds)
|
|
|
|
val_ds = dataset_ops.DatasetV2.from_tensor_slices((x, y))
|
|
val_ds = val_ds.batch(20)
|
|
val_ds = distribution.experimental_distribute_dataset(val_ds)
|
|
|
|
model.fit(
|
|
ds,
|
|
steps_per_epoch=10,
|
|
validation_data=val_ds,
|
|
validation_steps=5,
|
|
epochs=2,
|
|
callbacks=[cb_counter])
|
|
|
|
self.assertEqual(cb_counter.train_batches, 20)
|
|
self.assertEqual(cb_counter.test_batches, 10)
|
|
self.assertEqual(cb_counter.epochs, 2)
|
|
|
|
# Check for `steps_per_epoch`.
|
|
if distribution.num_replicas_in_sync > 1:
|
|
with self.assertRaisesRegex(ValueError,
|
|
'distributed dataset, you must specify'):
|
|
model.fit(ds, epochs=2)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_distributed_datasets_from_function(self, distribution):
|
|
with distribution.scope():
|
|
|
|
class CBCounter(keras.callbacks.Callback):
|
|
|
|
def __init__(self):
|
|
self.epochs = 0
|
|
self.train_batches = 0
|
|
self.test_batches = 0
|
|
|
|
def on_epoch_end(self, batch, logs=None):
|
|
self.epochs += 1
|
|
|
|
def on_train_batch_end(self, batch, logs=None):
|
|
self.train_batches += 1
|
|
|
|
def on_test_batch_end(self, batch, logs=None):
|
|
self.test_batches += 1
|
|
|
|
model = keras.Sequential([keras.layers.Dense(1)])
|
|
model.compile('sgd', 'mse')
|
|
cb_counter = CBCounter()
|
|
|
|
def make_dataset(_):
|
|
x, y = np.ones((100, 10)), np.ones((100, 1))
|
|
ds = dataset_ops.DatasetV2.from_tensor_slices((x, y))
|
|
ds = ds.batch(5).repeat()
|
|
return ds
|
|
|
|
ds = distribution.distribute_datasets_from_function(make_dataset)
|
|
val_ds = distribution.distribute_datasets_from_function(make_dataset)
|
|
|
|
model.fit(
|
|
ds,
|
|
steps_per_epoch=10,
|
|
validation_data=val_ds,
|
|
validation_steps=5,
|
|
epochs=2,
|
|
callbacks=[cb_counter])
|
|
|
|
self.assertEqual(cb_counter.train_batches, 20)
|
|
self.assertEqual(cb_counter.test_batches, 10)
|
|
self.assertEqual(cb_counter.epochs, 2)
|
|
|
|
# Check for `steps_per_epoch`.
|
|
if distribution.num_replicas_in_sync > 1:
|
|
with self.assertRaisesRegex(ValueError,
|
|
'distributed dataset, you must specify'):
|
|
model.fit(ds, epochs=2)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_host_training_loop(self, distribution):
|
|
if isinstance(distribution,
|
|
collective_all_reduce_strategy.CollectiveAllReduceStrategy):
|
|
self.skipTest('b/172032817')
|
|
with distribution.scope():
|
|
inputs = keras.Input((10, 10, 3))
|
|
x = keras.layers.Conv2D(3, kernel_size=3)(inputs)
|
|
x = keras.layers.Flatten()(x)
|
|
outputs = keras.layers.Dense(1)(x)
|
|
model = keras.Model(inputs, outputs)
|
|
|
|
model.compile('sgd', 'mse', steps_per_execution=10)
|
|
|
|
bc = BatchCountingCB()
|
|
x, y = np.ones((100, 10, 10, 3)), np.ones((100, 1))
|
|
model.fit(x, y, batch_size=2, epochs=1, callbacks=[bc])
|
|
self.assertEqual(bc.train_begin_batches, [0, 10, 20, 30, 40])
|
|
self.assertEqual(bc.train_end_batches, [9, 19, 29, 39, 49])
|
|
|
|
model.evaluate(x, y, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.test_begin_batches, [0, 10, 20, 30, 40])
|
|
self.assertEqual(bc.test_end_batches, [9, 19, 29, 39, 49])
|
|
|
|
model.predict(x, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.predict_begin_batches, [0, 10, 20, 30, 40])
|
|
self.assertEqual(bc.predict_end_batches, [9, 19, 29, 39, 49])
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_host_training_loop_last_partial_execution(self, distribution):
|
|
if isinstance(distribution,
|
|
collective_all_reduce_strategy.CollectiveAllReduceStrategy):
|
|
self.skipTest('b/172032817')
|
|
with distribution.scope():
|
|
inputs = keras.Input(10)
|
|
outputs = keras.layers.Dense(1)(inputs)
|
|
model = keras.Model(inputs, outputs)
|
|
|
|
model.compile('sgd', 'mse', steps_per_execution=20)
|
|
|
|
bc = BatchCountingCB()
|
|
x, y = np.ones((100, 10)), np.ones((100, 1))
|
|
model.fit(x, y, batch_size=2, epochs=1, callbacks=[bc])
|
|
self.assertEqual(bc.train_begin_batches, [0, 20, 40])
|
|
self.assertEqual(bc.train_end_batches, [19, 39, 49])
|
|
|
|
model.evaluate(x, y, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.test_begin_batches, [0, 20, 40])
|
|
self.assertEqual(bc.test_end_batches, [19, 39, 49])
|
|
|
|
model.predict(x, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.predict_begin_batches, [0, 20, 40])
|
|
self.assertEqual(bc.predict_end_batches, [19, 39, 49])
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_host_training_loop_dataset_unknown_size(self, distribution):
|
|
if isinstance(distribution,
|
|
collective_all_reduce_strategy.CollectiveAllReduceStrategy):
|
|
self.skipTest('b/172032817')
|
|
with distribution.scope():
|
|
inputs = keras.Input(10)
|
|
outputs = keras.layers.Dense(1)(inputs)
|
|
model = keras.Model(inputs, outputs)
|
|
|
|
model.compile('sgd', 'mse', steps_per_execution=20)
|
|
|
|
x, y = np.ones((100, 10)), np.ones((100, 1))
|
|
ds = dataset_ops.DatasetV2.from_tensor_slices((x, y)).batch(2)
|
|
ds = ds.filter(lambda *args, **kwargs: True) # Makes the size UNKNOWN.
|
|
bc = BatchCountingCB()
|
|
|
|
with self.assertRaisesRegex(ValueError, 'steps_per_execution'):
|
|
model.fit(ds, epochs=2, callbacks=[bc])
|
|
|
|
train_ds = ds.repeat(2)
|
|
model.fit(train_ds, steps_per_epoch=50, epochs=2, callbacks=[bc])
|
|
self.assertEqual(bc.train_begin_batches, [0, 20, 40, 0, 20, 40])
|
|
self.assertEqual(bc.train_end_batches, [19, 39, 49, 19, 39, 49])
|
|
|
|
with self.assertRaisesRegex(ValueError, 'steps_per_execution'):
|
|
model.evaluate(ds, callbacks=[bc])
|
|
|
|
test_ds = ds.repeat(2)
|
|
model.evaluate(test_ds, steps=50, callbacks=[bc])
|
|
self.assertEqual(bc.test_begin_batches, [0, 20, 40])
|
|
self.assertEqual(bc.test_end_batches, [19, 39, 49])
|
|
|
|
predict_ds = ds.repeat(2)
|
|
model.predict(predict_ds, steps=50, callbacks=[bc])
|
|
self.assertEqual(bc.predict_begin_batches, [0, 20, 40])
|
|
self.assertEqual(bc.predict_end_batches, [19, 39, 49])
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_host_training_loop_truncate_to_epoch(self, distribution):
|
|
if isinstance(distribution,
|
|
collective_all_reduce_strategy.CollectiveAllReduceStrategy):
|
|
self.skipTest('b/172032817')
|
|
with distribution.scope():
|
|
inputs = keras.Input(10)
|
|
outputs = keras.layers.Dense(1)(inputs)
|
|
model = keras.Model(inputs, outputs)
|
|
|
|
model.compile('sgd', 'mse', steps_per_execution=500)
|
|
|
|
x, y = np.ones((100, 10)), np.ones((100, 1))
|
|
bc = BatchCountingCB()
|
|
model.fit(x, y, batch_size=2, epochs=2, callbacks=[bc])
|
|
self.assertEqual(bc.train_begin_batches, [0, 0])
|
|
self.assertEqual(bc.train_end_batches, [49, 49])
|
|
|
|
x, y = np.ones((50, 10)), np.ones((50, 1))
|
|
model.evaluate(x, y, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.test_begin_batches, [0])
|
|
self.assertEqual(bc.test_end_batches, [24])
|
|
|
|
x = np.ones((50, 10))
|
|
model.predict(x, batch_size=2, callbacks=[bc])
|
|
self.assertEqual(bc.predict_begin_batches, [0])
|
|
self.assertEqual(bc.predict_end_batches, [24])
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_gradient_clipping(self, distribution):
|
|
|
|
class MyLayer(keras.layers.Layer):
|
|
|
|
def build(self, _):
|
|
self.v1 = variables.Variable(1.)
|
|
self.v2 = variables.Variable(1.)
|
|
|
|
def call(self, x):
|
|
return 3 * self.v1 - 3 * self.v2
|
|
|
|
x, y = np.ones((10, 1)), np.ones((10, 1))
|
|
|
|
with distribution.scope():
|
|
layer = MyLayer()
|
|
model = keras.Sequential([layer])
|
|
optimizer = gradient_descent_keras.SGD(1., clipnorm=2., clipvalue=2.)
|
|
model.compile(optimizer, 'mae')
|
|
|
|
if isinstance(distribution,
|
|
(central_storage_strategy.CentralStorageStrategy,
|
|
central_storage_strategy.CentralStorageStrategyV1)):
|
|
with self.assertRaisesRegex(ValueError, 'not supported'):
|
|
model.fit(x, y, batch_size=10, epochs=1)
|
|
else:
|
|
model.fit(x, y, batch_size=10, epochs=1)
|
|
self.assertAllClose(self.evaluate(layer.v1), 3.)
|
|
self.assertAllClose(self.evaluate(layer.v2), -1.)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_custom_gradient_transformation(self, distribution):
|
|
if isinstance(distribution,
|
|
(central_storage_strategy.CentralStorageStrategy,
|
|
central_storage_strategy.CentralStorageStrategyV1)):
|
|
self.skipTest('Not supported with `CentralStorageStrategy`')
|
|
|
|
class MyLayer(keras.layers.Layer):
|
|
|
|
def build(self, _):
|
|
self.v1 = variables.Variable(1.)
|
|
self.v2 = variables.Variable(-1.)
|
|
|
|
def call(self, x):
|
|
return x + self.v1 + self.v2
|
|
|
|
def custom_transform(grads_and_vars):
|
|
# Always set gradients to 1.
|
|
return [(array_ops.ones_like(g), v) for g, v in grads_and_vars]
|
|
|
|
x, y = np.ones((10, 1)), np.ones((10, 1))
|
|
|
|
with distribution.scope():
|
|
layer = MyLayer()
|
|
model = keras.Sequential([layer])
|
|
optimizer = gradient_descent_keras.SGD(
|
|
1., gradient_transformers=[custom_transform])
|
|
model.compile(optimizer, 'mae')
|
|
|
|
model.fit(x, y, batch_size=10, epochs=1)
|
|
self.assertAllClose(self.evaluate(layer.v1), 0.)
|
|
self.assertAllClose(self.evaluate(layer.v2), -2.)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
all_strategy_combinations_minus_default()))
|
|
def test_distribution_strategy_one_dimensional(self, distribution):
|
|
with distribution.scope():
|
|
inp = keras.layers.Input(shape=(10,))
|
|
out = keras.layers.Dense(3, activation='softmax')(inp)
|
|
model = keras.Model(inputs=[inp], outputs=[out])
|
|
model.compile(
|
|
optimizer='rmsprop',
|
|
loss='sparse_categorical_crossentropy',
|
|
metrics=['sparse_categorical_accuracy'])
|
|
|
|
x = np.random.random((64, 10)).astype('float32')
|
|
y = np.random.randint(3, size=64)
|
|
|
|
model.fit(x, y, epochs=1, steps_per_epoch=2)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
|
|
strategy_combinations.mirrored_strategy_with_two_gpus
|
|
],
|
|
mode=['graph', 'eager'],
|
|
reduction=[
|
|
loss_reduction.ReductionV2.AUTO,
|
|
loss_reduction.ReductionV2.SUM_OVER_BATCH_SIZE,
|
|
loss_reduction.ReductionV2.SUM
|
|
]))
|
|
def test_distribution_strategy_with_loss_reduction_types(
|
|
self, distribution, reduction):
|
|
np.random.seed(_RANDOM_SEED)
|
|
|
|
def _get_model():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
x2 = keras.layers.Dense(10, kernel_initializer='zeros')(x1)
|
|
outputs = keras.layers.Dense(1, kernel_initializer='zeros')(x2)
|
|
model = keras.Model(inputs, outputs)
|
|
return model
|
|
|
|
x = np.random.random((64, 10))
|
|
y = np.random.random((64, 1))
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
|
|
dataset = dataset.batch(32)
|
|
|
|
model = _get_model()
|
|
model.compile(
|
|
'sgd', loss=keras.losses.MeanSquaredError(reduction=reduction))
|
|
history = model.fit(dataset, steps_per_epoch=2, epochs=1, shuffle=False)
|
|
|
|
with distribution.scope():
|
|
ds_model = _get_model()
|
|
ds_model.compile(
|
|
'sgd',
|
|
loss=keras.losses.MeanSquaredError(reduction=reduction))
|
|
ds_history = ds_model.fit(
|
|
dataset, steps_per_epoch=2, epochs=1, shuffle=False)
|
|
self.assertArrayNear(history.history['loss'], ds_history.history['loss'],
|
|
1e-5)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
all_strategy_combinations_minus_default()))
|
|
def test_distribution_strategy_with_symbolic_add_loss(
|
|
self, mode, distribution):
|
|
|
|
def _make_model_with_add_loss():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
x2 = keras.layers.Dense(10, kernel_initializer='zeros')(x1)
|
|
outputs = keras.layers.Dense(1, kernel_initializer='zeros')(x2)
|
|
model = keras.Model(inputs, outputs)
|
|
model.add_loss(math_ops.reduce_mean(x1))
|
|
model.add_loss(math_ops.reduce_mean(outputs))
|
|
return model
|
|
|
|
x = np.ones((64, 10)).astype('float32')
|
|
|
|
model = _make_model_with_add_loss()
|
|
model.compile('sgd')
|
|
history = model.fit(x, epochs=1)
|
|
|
|
with distribution.scope():
|
|
ds_model = _make_model_with_add_loss()
|
|
ds_model.compile(
|
|
'sgd')
|
|
ds_history = ds_model.fit(x, epochs=1)
|
|
|
|
self.assertAllClose(history.history, ds_history.history)
|
|
|
|
# TODO(omalleyt): Investigate flakiness and re-enable.
|
|
@ds_combinations.generate(all_strategy_minus_default_and_tpu_combinations())
|
|
def DISABLED_test_distribution_strategy_with_callable_add_loss(
|
|
self, distribution):
|
|
|
|
def _make_model():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
x2 = keras.layers.Dense(10, kernel_initializer='zeros')(x1)
|
|
d = keras.layers.Dense(1, kernel_initializer='zeros')
|
|
outputs = d(x2)
|
|
model = keras.Model(inputs, outputs)
|
|
model.add_loss(lambda: 100. * math_ops.reduce_mean(d.kernel))
|
|
return model
|
|
|
|
x = np.ones((64, 10)).astype('float32')
|
|
y = np.ones((64, 1)).astype('float32')
|
|
|
|
model = _make_model()
|
|
self.assertLen(model.losses, 1)
|
|
|
|
model.compile('sgd', 'mse')
|
|
history = model.fit(x, y, steps_per_epoch=2, epochs=1)
|
|
|
|
with distribution.scope():
|
|
ds_model = _make_model()
|
|
self.assertLen(ds_model.losses, 1)
|
|
ds_model.compile('sgd', 'mse')
|
|
ds_history = ds_model.fit(x, y, steps_per_epoch=2, epochs=1)
|
|
|
|
self.assertAllClose(history.history, ds_history.history)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
all_strategy_minus_default_and_tpu_combinations()))
|
|
def test_distribution_strategy_with_add_metric_in_call(
|
|
self, distribution):
|
|
|
|
class Bias(keras.layers.Layer):
|
|
|
|
def build(self, input_shape):
|
|
self.bias = self.add_weight(name='bias', initializer='zeros', shape=())
|
|
|
|
def call(self, inputs):
|
|
self.add_metric(
|
|
math_ops.reduce_mean(inputs), name='bias', aggregation='mean')
|
|
return inputs + self.bias
|
|
|
|
def _make_model_with_add_metric():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
x2 = Bias()(x1)
|
|
outputs = keras.layers.Dense(1, kernel_initializer='zeros')(x2)
|
|
model = keras.Model(inputs, outputs)
|
|
return model
|
|
|
|
x = np.ones((64, 10)).astype('float32')
|
|
y = np.ones((64, 1)).astype('float32')
|
|
|
|
model = _make_model_with_add_metric()
|
|
self.assertLen(model.metrics, 1)
|
|
|
|
model.compile('sgd', 'mse')
|
|
history = model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
|
|
with distribution.scope():
|
|
ds_model = _make_model_with_add_metric()
|
|
self.assertLen(ds_model.metrics, 1)
|
|
ds_model.compile(
|
|
'sgd',
|
|
'mse')
|
|
ds_history = ds_model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
# includes stateful loss metric in eager.
|
|
metrics_len = 2 if context.executing_eagerly() else 1
|
|
self.assertLen(ds_model.metrics, metrics_len)
|
|
|
|
self.assertAllClose(history.history, ds_history.history)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=[
|
|
strategy_combinations.one_device_strategy,
|
|
strategy_combinations.one_device_strategy_gpu,
|
|
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
|
|
strategy_combinations.mirrored_strategy_with_two_gpus,
|
|
],
|
|
mode=['eager']))
|
|
def test_distribution_strategy_with_add_metric_object(
|
|
self, distribution):
|
|
|
|
class Bias(keras.layers.Layer):
|
|
|
|
def build(self, input_shape):
|
|
self.bias = self.add_weight(name='bias', initializer='zeros', shape=())
|
|
self.mean = keras.metrics.Mean(name='mean')
|
|
|
|
def call(self, inputs):
|
|
self.add_metric(self.mean(inputs))
|
|
return inputs + self.bias
|
|
|
|
def _make_model_with_add_metric_object():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
x2 = Bias()(x1)
|
|
outputs = keras.layers.Dense(1, kernel_initializer='zeros')(x2)
|
|
model = keras.Model(inputs, outputs)
|
|
return model
|
|
|
|
x = np.ones((64, 10)).astype('float32')
|
|
y = np.ones((64, 1)).astype('float32')
|
|
|
|
model = _make_model_with_add_metric_object()
|
|
self.assertLen(model.metrics, 1)
|
|
|
|
model.compile('sgd', 'mse')
|
|
history = model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
|
|
with distribution.scope():
|
|
ds_model = _make_model_with_add_metric_object()
|
|
self.assertLen(ds_model.metrics, 1)
|
|
ds_model.compile(
|
|
'sgd',
|
|
'mse')
|
|
ds_history = ds_model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
# includes stateful loss metric in eager.
|
|
metrics_len = 2 if context.executing_eagerly() else 1
|
|
self.assertLen(ds_model.metrics, metrics_len)
|
|
|
|
self.assertAllClose(history.history, ds_history.history)
|
|
|
|
@ds_combinations.generate(
|
|
# TODO(phillypham): Why does validation_steps > 1 not work on TPUs?
|
|
combinations.times(
|
|
all_strategy_minus_default_and_tpu_combinations()))
|
|
def test_distribution_strategy_with_add_metric_outside_call(
|
|
self, distribution):
|
|
|
|
def _make_model_with_add_metric():
|
|
inputs = keras.Input((10,))
|
|
x1 = keras.layers.Dense(10, kernel_initializer='zeros')(inputs)
|
|
outputs = keras.layers.Dense(1, kernel_initializer='zeros')(x1)
|
|
model = keras.Model(inputs, outputs)
|
|
model.add_metric(
|
|
math_ops.reduce_mean(x1), name='mid_mean', aggregation='mean')
|
|
return model
|
|
|
|
x = np.ones((64, 10)).astype('float32')
|
|
y = np.ones((64, 1)).astype('float32')
|
|
|
|
model = _make_model_with_add_metric()
|
|
self.assertLen(model.metrics, 1)
|
|
|
|
model.compile('sgd', 'mse')
|
|
history = model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
|
|
with distribution.scope():
|
|
ds_model = _make_model_with_add_metric()
|
|
self.assertLen(ds_model.metrics, 1)
|
|
ds_model.compile(
|
|
'sgd',
|
|
'mse')
|
|
ds_history = ds_model.fit(
|
|
x, y, validation_data=(x, y), validation_steps=2, epochs=2)
|
|
# includes stateful loss metric in eager.
|
|
metrics_len = 2 if context.executing_eagerly() else 1
|
|
self.assertLen(ds_model.metrics, metrics_len)
|
|
|
|
self.assertAllClose(history.history, ds_history.history)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategies_minus_tpu + multi_worker_mirrored_strategies,
|
|
mode=['eager']))
|
|
def test_sparse_tensor_outputs(self, distribution):
|
|
|
|
class ToSparse(keras.layers.Layer):
|
|
"""Create a sparse tensor based on a given dense tensor."""
|
|
|
|
def call(self, inputs):
|
|
indices = array_ops.where_v2(math_ops.not_equal(inputs, 0))
|
|
values = array_ops.gather_nd(inputs, indices)
|
|
shape = array_ops.shape(inputs, out_type='int64')
|
|
return sparse_tensor.SparseTensor(indices, values, dense_shape=shape)
|
|
|
|
model = keras.Sequential([ToSparse()])
|
|
|
|
# Define some input data with additional padding.
|
|
input_data = np.array([[1, 0, 0], [2, 3, 0]])
|
|
output = model.predict(input_data, batch_size=2)
|
|
|
|
expected_indices = np.array([[0, 0], [1, 0], [1, 1]])
|
|
expected_values = np.array([1, 2, 3])
|
|
expected_dense_shape = np.array([2, 3])
|
|
|
|
self.assertAllEqual(output.indices, expected_indices)
|
|
self.assertAllEqual(output.values, expected_values)
|
|
self.assertAllEqual(output.dense_shape, expected_dense_shape)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategies_minus_tpu + multi_worker_mirrored_strategies,
|
|
mode=['eager']))
|
|
def test_ragged_tensor_outputs(self, distribution):
|
|
|
|
class ToRagged(keras.layers.Layer):
|
|
"""Create a ragged tensor based on a given dense tensor."""
|
|
|
|
def __init__(self, padding, ragged_rank=1, **kwargs):
|
|
super(ToRagged, self).__init__(**kwargs)
|
|
self._padding = padding
|
|
self._ragged_rank = ragged_rank
|
|
|
|
def call(self, inputs):
|
|
return ragged_tensor.RaggedTensor.from_tensor(
|
|
inputs, padding=self._padding, ragged_rank=self._ragged_rank)
|
|
|
|
model = keras.Sequential([ToRagged(padding=0)])
|
|
|
|
# Define some input data with additional padding.
|
|
input_data = np.array([[1, 0, 0], [2, 3, 0]])
|
|
output = model.predict(input_data, batch_size=2)
|
|
|
|
expected_values = [[1], [2, 3]]
|
|
self.assertAllEqual(expected_values, output)
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategies_minus_default_minus_tpu + tpu_strategies +
|
|
multi_worker_mirrored_strategies,
|
|
mode=['eager']))
|
|
def test_correctness_of_add_loss_with_merge_call(self, distribution):
|
|
batch_size = 32
|
|
|
|
def _get_model():
|
|
inputs = keras.layers.Input(shape=(1,))
|
|
labels = keras.layers.Input(shape=(1,))
|
|
x = keras.layers.Dense(10, activation='relu')(inputs)
|
|
y = keras.layers.Dense(1)(x)
|
|
model = keras.models.Model([inputs, labels], y)
|
|
model.add_loss(keras.losses.mean_squared_error(labels, y))
|
|
return model
|
|
|
|
def _get_data():
|
|
x_train = np.random.rand(64, 1)
|
|
y_train = 3 * x_train
|
|
x_train = x_train.astype('float32')
|
|
y_train = y_train.astype('float32')
|
|
dataset = dataset_ops.DatasetV2.from_tensor_slices((x_train, y_train))
|
|
dataset = dataset.batch(batch_size)
|
|
return dataset
|
|
|
|
with distribution.scope():
|
|
model = _get_model()
|
|
optimizer = gradient_descent_keras.SGD(0.2)
|
|
|
|
@def_function.function
|
|
def train_step(dist_inputs):
|
|
|
|
def step_fn(inputs):
|
|
with backprop.GradientTape() as tape:
|
|
logits = model(inputs)
|
|
|
|
# Invoke a merge_call()
|
|
distribution_strategy_context.get_replica_context().merge_call(
|
|
lambda d: None)
|
|
|
|
# Verify that there is only one loss on the model.
|
|
assert len(model.losses) == 1
|
|
loss_from_model = math_ops.reduce_sum(
|
|
model.losses) * 1.0 / batch_size
|
|
|
|
# Compute loss in this loop.
|
|
loss = keras.losses.mean_squared_error(inputs[1], logits)
|
|
loss = nn.compute_average_loss(loss, global_batch_size=batch_size)
|
|
|
|
# Verify that the loss computed in this loop is equivalent to the
|
|
# loss from the model that was added via add_loss.
|
|
check_ops.assert_equal(loss, loss_from_model)
|
|
|
|
grads = tape.gradient(loss, model.trainable_variables)
|
|
optimizer.apply_gradients(zip(grads, model.trainable_variables))
|
|
return loss
|
|
|
|
per_replica_losses = distribution.run(step_fn, args=(dist_inputs,))
|
|
return distribution.reduce(
|
|
reduce_util.ReduceOp.SUM, per_replica_losses, axis=None)
|
|
|
|
dataset = distribution.experimental_distribute_dataset(_get_data())
|
|
for _ in range(2):
|
|
for x in dataset:
|
|
train_step(x)
|
|
|
|
@ds_combinations.generate(combinations.combine(mode=['graph', 'eager']))
|
|
def test_unimplemented_parameter_server_strategy(self):
|
|
cluster_spec = multi_worker_test_base.create_in_process_cluster(
|
|
num_workers=3, num_ps=2)
|
|
cluster_resolver = SimpleClusterResolver(
|
|
cluster_spec=server_lib.ClusterSpec(cluster_spec),
|
|
task_type='worker',
|
|
task_id=1,
|
|
num_accelerators={'GPU': 0})
|
|
distribution = parameter_server_strategy.ParameterServerStrategyV1(
|
|
cluster_resolver)
|
|
|
|
self.assertIsInstance(distribution,
|
|
parameter_server_strategy.ParameterServerStrategyV1)
|
|
|
|
with self.assertRaisesRegex(NotImplementedError,
|
|
'ParameterServerStrategy*'):
|
|
with distribution.scope():
|
|
model = simple_sequential_model()
|
|
optimizer = rmsprop.RMSPropOptimizer(learning_rate=0.001)
|
|
loss = 'mse'
|
|
model.compile(optimizer, loss)
|
|
|
|
|
|
# Models to exercise inserting ancillary layers with add_loss and add_metric.
|
|
def _functional_with_add_loss_and_metric(input_shape, num_classes, l1, l2):
|
|
inputs = keras.Input(input_shape, name='images')
|
|
x = keras.layers.Conv2D(32, kernel_size=5, activation='relu')(inputs)
|
|
x = keras.layers.MaxPooling2D(pool_size=2)(x)
|
|
x = keras.layers.Conv2D(64, kernel_size=5, activation='relu')(x)
|
|
x = keras.layers.MaxPooling2D(pool_size=2)(x)
|
|
# Apply L2 regularization to embedding. Use a mix of TensorFlow ops and layers
|
|
# to exercise all code paths.
|
|
x = keras.layers.Flatten(name='embedding')(x)
|
|
l2_loss = math_ops.reduce_mean(math_ops.reduce_sum(math_ops.square(x), -1))
|
|
# Apply L1 regularization to next layer.
|
|
x = keras.layers.Dense(1024, activation='relu', name='sparse_embedding')(x)
|
|
l1_loss = keras.layers.Lambda(
|
|
lambda x: math_ops.reduce_mean(math_ops.reduce_sum(x, -1)),
|
|
name='l1_loss')(
|
|
x)
|
|
outputs = keras.layers.Dense(num_classes, name='logits')(x)
|
|
model = keras.Model(inputs=inputs, outputs=outputs)
|
|
# Weight regularization terms.
|
|
model.add_loss(keras.layers.Lambda(lambda x: x * l2)(l2_loss))
|
|
model.add_metric(l2_loss, aggregation='mean', name='l2_loss')
|
|
model.add_loss(l1_loss * l1)
|
|
model.add_metric(l1_loss, aggregation='mean', name='l1_loss')
|
|
return model
|
|
|
|
|
|
def _sequential_with_add_loss_and_metric(input_shape, num_classes, l1, l2):
|
|
model = keras.Sequential([
|
|
keras.layers.Conv2D(
|
|
32, kernel_size=5, activation='relu', input_shape=input_shape),
|
|
keras.layers.MaxPooling2D(pool_size=2),
|
|
keras.layers.Conv2D(64, kernel_size=5, activation='relu'),
|
|
keras.layers.MaxPooling2D(pool_size=2),
|
|
keras.layers.Flatten(name='embedding'),
|
|
keras.layers.Dense(1024, activation='relu', name='sparse_embedding'),
|
|
keras.layers.Dense(num_classes, name='logits'),
|
|
])
|
|
# Extract layer outputs, add regularization terms, and rescale the metric.
|
|
# Use a mix of TensorFlow ops and layers to exercise all code paths.
|
|
x = model.get_layer('sparse_embedding').get_output_at(-1)
|
|
l1_loss = l1 * math_ops.reduce_mean(math_ops.reduce_sum(x, -1))
|
|
model.add_loss(l1_loss)
|
|
model.add_metric(
|
|
keras.layers.Lambda(lambda x: math_ops.divide(x, l1))(l1_loss),
|
|
aggregation='mean',
|
|
name='l1_loss')
|
|
x = model.get_layer('embedding').get_output_at(-1)
|
|
l2_loss = keras.layers.Lambda(
|
|
lambda x: l2 * math_ops.reduce_mean(math_ops.reduce_sum(x * x, -1)),
|
|
name='l2_loss')(
|
|
x)
|
|
model.add_loss(l2_loss)
|
|
model.add_metric(l2_loss / l2, aggregation='mean', name='l2_loss')
|
|
return model
|
|
|
|
|
|
def _functional_with_layer_reuse(input_shape, num_classes, l1, l2):
|
|
base_model = keras.Sequential([
|
|
keras.layers.Conv2D(
|
|
32, kernel_size=5, activation='relu', input_shape=input_shape),
|
|
keras.layers.MaxPooling2D(pool_size=2),
|
|
keras.layers.Conv2D(64, kernel_size=5, activation='relu'),
|
|
keras.layers.MaxPooling2D(pool_size=2),
|
|
keras.layers.Flatten(),
|
|
keras.layers.Dense(1024, activation='relu'),
|
|
keras.layers.Dense(num_classes, name='logits'),
|
|
])
|
|
inputs = keras.Input(input_shape, name='images')
|
|
logits = base_model(inputs)
|
|
model = keras.Model(inputs=inputs, outputs=logits)
|
|
# Reuse sequential layer and create new nodes.
|
|
zero_logits = base_model(array_ops.zeros_like(inputs))
|
|
one_logits = base_model(array_ops.ones_like(inputs))
|
|
# L2 loss.
|
|
l2_loss = math_ops.reduce_mean(
|
|
math_ops.reduce_sum(math_ops.square(logits - zero_logits), -1))
|
|
model.add_loss(l2_loss * l2)
|
|
model.add_metric(l2_loss, aggregation='mean', name='l2_loss')
|
|
# L1 loss.
|
|
l1_loss = math_ops.reduce_mean(
|
|
math_ops.reduce_sum(math_ops.abs(logits - one_logits), -1))
|
|
model.add_loss(l1_loss * l1)
|
|
model.add_metric(l1_loss, aggregation='mean', name='l1_loss')
|
|
return model
|
|
|
|
|
|
class TestDistributionStrategyWithMultipleAddLossAndMetricCalls(
|
|
test.TestCase, parameterized.TestCase):
|
|
"""Tests complex models with multiple add loss and metric calls."""
|
|
|
|
@ds_combinations.generate(
|
|
combinations.times(
|
|
all_strategy_combinations_minus_default(),
|
|
combinations.combine(
|
|
model_fn=[
|
|
_functional_with_add_loss_and_metric,
|
|
_sequential_with_add_loss_and_metric,
|
|
_functional_with_layer_reuse,
|
|
],
|
|
l1=[0.01],
|
|
l2=[0.1])))
|
|
def test_fit_and_evaluate(self, distribution, model_fn, l1, l2):
|
|
# Make fake MNIST-like image data.
|
|
np.random.seed(_RANDOM_SEED)
|
|
dataset = dataset_ops.DatasetV2.from_tensor_slices(
|
|
(np.random.uniform(size=(64, 28, 28, 1)).astype(np.float32),
|
|
np.random.randint(0, 10, size=(64,))))
|
|
dataset = dataset.shuffle(64).batch(
|
|
8 * distribution.num_replicas_in_sync, drop_remainder=True)
|
|
# Make model with distribution strategy and initialize with dataset shape.
|
|
input_shape = dataset_ops.get_structure(dataset)[0].shape[1:]
|
|
with distribution.scope():
|
|
model = model_fn(input_shape, 10, l1, l2)
|
|
model.compile(
|
|
optimizer=keras.optimizers.adam_v2.Adam(1e-4),
|
|
loss=keras.losses.SparseCategoricalCrossentropy(
|
|
from_logits=True,
|
|
reduction=loss_reduction.ReductionV2.SUM_OVER_BATCH_SIZE),
|
|
metrics=[
|
|
keras.metrics.SparseCategoricalAccuracy(),
|
|
keras.metrics.SparseCategoricalCrossentropy(from_logits=True),
|
|
])
|
|
# Non-eager training doesn't support steps_per_epoch=None.
|
|
for unused_epoch in range(2):
|
|
model.fit(dataset)
|
|
results = dict(zip(model.metrics_names, model.evaluate(dataset)))
|
|
# Sanity checks.
|
|
self.assertBetween(results['sparse_categorical_accuracy'], 0.02, 1.)
|
|
self.assertGreater(results['l2_loss'], 0.)
|
|
self.assertGreater(results['l1_loss'], 0.)
|
|
# Assert correctness of the loss calculation and updating of metrics.
|
|
self.assertNear(
|
|
results['l1_loss'] * l1 + results['l2_loss'] * l2 +
|
|
results['sparse_categorical_crossentropy'], results['loss'], 1e-6)
|
|
|
|
|
|
class DeterministicModel(keras.Model):
|
|
"""Deterministic Model that always outputs the same initial result.
|
|
|
|
It verifies the `call` method is run inside the same distribution
|
|
strategy that the model was initially passed.
|
|
"""
|
|
|
|
def __init__(self, strategy):
|
|
super(DeterministicModel, self).__init__()
|
|
self.x = None
|
|
self.strategy = strategy
|
|
|
|
def build(self, input_shape):
|
|
self.x = variables.Variable(array_ops.ones(shape=()))
|
|
|
|
def call(self, inputs, training=None, mask=None):
|
|
active_strategy = distribution_strategy_context.get_strategy()
|
|
if active_strategy is not self.strategy:
|
|
raise ValueError('Model must execute call w/ the original strategy')
|
|
return self.x * inputs
|
|
|
|
|
|
class TestModelCapturesStrategy(test.TestCase, parameterized.TestCase):
|
|
"""Tests that model creation captures the strategy."""
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(distribution=all_strategies, mode=['eager']))
|
|
def test_fit_and_evaluate(self, distribution):
|
|
dataset = dataset_ops.DatasetV2.from_tensor_slices(
|
|
(array_ops.ones(shape=(64,)), array_ops.ones(shape=(64,))))
|
|
dataset = dataset.batch(8 * distribution.num_replicas_in_sync)
|
|
# Make model with distribution strategy
|
|
with distribution.scope():
|
|
model = DeterministicModel(distribution)
|
|
optimizer = keras.optimizers.adam_v2.Adam(1e-4)
|
|
|
|
# Compile & evaluate the model outside of the distribution strategy scope
|
|
model.compile(
|
|
optimizer=optimizer,
|
|
loss=keras.losses.MeanSquaredError(),
|
|
metrics=['binary_accuracy'])
|
|
|
|
# Call `optimizer.iterations` out of strategy scope.
|
|
self.assertEqual(model.optimizer.iterations.numpy(), 0)
|
|
|
|
# Non-eager training doesn't support steps_per_epoch=None.
|
|
for unused_epoch in range(2):
|
|
model.fit(dataset)
|
|
|
|
results = model.evaluate(dataset)
|
|
results = dict(zip(model.metrics_names, results))
|
|
|
|
# Check that the metrics have a result we expect
|
|
self.assertEqual(results['binary_accuracy'], 1.0)
|
|
self.assertAllClose(results['loss'], 0.0)
|
|
|
|
# Assert that all metric/optimizer/model variables were made in the
|
|
# distribution strategy (Test that compile uses the captured
|
|
# distribution strategy)
|
|
metric_vars = nest.flatten(
|
|
[metric.variables for metric in model.metrics])
|
|
for var in metric_vars:
|
|
self.assertTrue(distribution.extended.variable_created_in_scope(var))
|
|
for var in model.optimizer._weights:
|
|
self.assertTrue(distribution.extended.variable_created_in_scope(var))
|
|
for var in model.variables:
|
|
self.assertTrue(distribution.extended.variable_created_in_scope(var))
|
|
|
|
# Make sure the metric must be created in the same scope as the model:
|
|
# This shouldn't raise any validation errors
|
|
with distribution.scope():
|
|
metric = keras.metrics.BinaryAccuracy()
|
|
model.compile(
|
|
optimizer=optimizer,
|
|
loss=keras.losses.MeanSquaredError(),
|
|
metrics=[metric])
|
|
|
|
# This should raise an error because the metric is constructed
|
|
# outside of the scope, and not by compile
|
|
if distribution_strategy_context.has_strategy():
|
|
with self.assertRaisesRegex(ValueError, 'All metrics must be created in'):
|
|
model.compile(
|
|
optimizer=keras.optimizers.adam_v2.Adam(1e-4),
|
|
loss=keras.losses.MeanSquaredError(),
|
|
metrics=[keras.metrics.BinaryAccuracy()])
|
|
|
|
@ds_combinations.generate(
|
|
combinations.combine(
|
|
distribution=strategy_combinations.mirrored_strategy_with_one_cpu,
|
|
mode=['eager']))
|
|
def test_optimizer(self, distribution):
|
|
temp_dir = os.path.join(self.get_temp_dir(), 'ckpt')
|
|
|
|
def create_model():
|
|
model = keras.models.Sequential([
|
|
keras.layers.Dense(1),
|
|
])
|
|
model.compile(optimizer='adam', loss='mse')
|
|
model.build([None, 1]) # create weights.
|
|
self.assertEmpty(model.optimizer.weights)
|
|
return model
|
|
|
|
model = create_model()
|
|
x = y = array_ops.ones(shape=(1, 1))
|
|
model.fit(x=x, y=y, batch_size=1)
|
|
model.save_weights(temp_dir)
|
|
|
|
with distribution.scope():
|
|
model = create_model()
|
|
model.load_weights(temp_dir)
|
|
self.assertNotEmpty(model.optimizer.weights)
|
|
self.assertIsInstance(model.optimizer.weights[0],
|
|
ds_values_lib.DistributedVariable)
|
|
|
|
with distribution.scope():
|
|
model = create_model()
|
|
# create/restore slot variables outside of scope is fine.
|
|
model.load_weights(temp_dir)
|
|
self.assertNotEmpty(model.optimizer.weights)
|
|
self.assertIsInstance(model.optimizer.weights[0],
|
|
ds_values_lib.DistributedVariable)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
base_layer_utils.enable_v2_dtype_behavior()
|
|
multi_process_runner.test_main()
|