With this change, it is now possible to mix and match tf.keras.Layers and
tf.Modules inside a tf.keras.Model, and everything will be tracked properly.

- Variables in tf.Modules that are set as attributes of custom Layers and
  Models now show up in properties such as Layer.trainable_variables and
  Model.trainable_variables.
- tf.Modules do not show up in Model.layers. Instead, a new method,
  Layer._flatten_modules, iterates over tf.Modules and Layers in the order
  that Keras expects. The existing Layer.submodules property (inherited from
  tf.Module) can still be used to iterate over tf.Modules and Layers with the
  tf.Module ordering. Layer._flatten_layers is built on top of
  Layer._flatten_modules.
- Layer._layers is renamed to Layer._self_tracked_trackables to avoid naming
  conflicts with user-defined attributes (and to reflect that the attribute
  contains Layers, Modules, and TrackableDataStructures).
- A new property, tf.Module.non_trainable_variables, is added to tf.Module to
  enable this.

PiperOrigin-RevId: 339917644
Change-Id: I96a7302745280a6261de8c4295c5cbf5f4d7dd5c
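
As a quick illustration (a minimal sketch, not taken from this change's tests;
the MyModule/MyModel names are hypothetical and assume a TF build with this
change applied), a tf.Module attached to a Model is now tracked like any
sublayer:

    import tensorflow as tf

    class MyModule(tf.Module):

      def __init__(self):
        super(MyModule, self).__init__()
        self.w = tf.Variable(1., name='w')                   # trainable
        self.b = tf.Variable(0., trainable=False, name='b')  # non-trainable

    class MyModel(tf.keras.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.mod = MyModule()                  # a tf.Module, not a Layer
        self.dense = tf.keras.layers.Dense(1)

      def call(self, x):
        return self.dense(x) * self.mod.w + self.mod.b

    model = MyModel()
    model(tf.ones([1, 2]))  # Build the model.

    # Module variables show up in the Keras variable properties, including
    # via the new tf.Module.non_trainable_variables plumbing.
    assert any(v is model.mod.w for v in model.trainable_variables)
    assert any(v is model.mod.b for v in model.non_trainable_variables)
    # The tf.Module itself still does not appear in Model.layers.
    assert model.mod not in model.layers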
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for training routines."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import io
import sys

from absl.testing import parameterized
import numpy as np
import six

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import test_util as tf_test_util
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers as layers_module
from tensorflow.python.keras import losses
from tensorflow.python.keras import metrics as metrics_module
from tensorflow.python.keras import optimizer_v2
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.callbacks import Callback
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training as training_module
from tensorflow.python.keras.engine import training_utils_v1
from tensorflow.python.keras.utils import data_utils
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import template
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops import variables as variables_lib
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training.rmsprop import RMSPropOptimizer

try:
  import scipy.sparse as scipy_sparse  # pylint: disable=g-import-not-at-top
except ImportError:
  scipy_sparse = None


class TrainingTest(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_model_instrumentation(self):
    layers = [
        layers_module.Dense(10, dtype=np.float64),
        layers_module.Dense(10, dtype=np.float64)
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape=(1,))

    self.assertTrue(model._instrumented_keras_api)
    self.assertTrue(model._instrumented_keras_model_class)
    self.assertFalse(model._instrumented_keras_layer_class)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_fit_training_arg(self):

    class ReturnTraining(layers_module.Layer):

      def call(self, inputs, training):
        if training:
          return inputs + array_ops.constant([100], 'float32')
        else:
          return inputs + array_ops.constant([0], 'float32')

    model = sequential.Sequential([ReturnTraining()])
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    hist = model.fit(x=np.array([0.]), y=np.array([0.]))
    self.assertAllClose(hist.history['loss'][0], 10000)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_fit_on_empty(self):
    model = sequential.Sequential([layers_module.Dense(1)])
    model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    with self.assertRaisesRegex(ValueError,
                                'Expect x to be a non-empty array or dataset.'):
      model.fit(x=np.array([]), y=np.array([]))

  @keras_parameterized.run_all_keras_modes
  def test_run_eagerly_setting(self):
    model = sequential.Sequential([layers_module.Dense(1)])
    run_eagerly = testing_utils.should_run_eagerly()
    model.compile('sgd', 'mse', run_eagerly=run_eagerly)
    self.assertEqual(model.run_eagerly, run_eagerly)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  @parameterized.named_parameters(
      ('train_on_batch', 'train_on_batch'),
      ('test_on_batch', 'test_on_batch'),
      ('predict_on_batch', 'predict_on_batch'),
      ('fit', 'fit'),
      ('evaluate', 'evaluate'),
      ('predict', 'predict'),
  )
  def test_disallow_methods_inside_tf_function(self, method_name):
    model = sequential.Sequential([layers_module.Dense(1)])
    run_eagerly = testing_utils.should_run_eagerly()
    model.compile('sgd', 'mse', run_eagerly=run_eagerly)

    @def_function.function
    def my_fn():
      getattr(model, method_name)(1)

    error_msg = 'inside a `tf.function`'
    with self.assertRaisesRegex(RuntimeError, error_msg):
      my_fn()

  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_learning_phase(self):

    class ReturnTraining(layers_module.Layer):

      def call(self, inputs):
        return backend.in_train_phase(lambda: array_ops.ones_like(inputs),
                                      lambda: array_ops.zeros_like(inputs))

    model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with `steps_per_epoch`.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)

  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_training_arg(self):

    class ReturnTraining(layers_module.Layer):

      def call(self, inputs, training=None):
        return backend.in_train_phase(
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs),
            training=training)

    model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with `steps_per_epoch`.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_target_dtype_matches_output(self):

    def loss_fn(labels, preds):
      self.assertEqual(labels.dtype, preds.dtype)
      return labels - preds

    layers = [
        layers_module.Dense(10, dtype=np.float64),
        layers_module.Dense(10, dtype=np.float64)
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
    inputs = np.ones(10, dtype=np.float64)
    targets = np.ones(10, dtype=np.float64)
    model.compile(
        'sgd',
        loss=loss_fn,
        run_eagerly=testing_utils.should_run_eagerly())
    model.train_on_batch(inputs, targets)
    model.test_on_batch(inputs, targets)
    self.assertEqual(model.predict(inputs).dtype, np.float64)

  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_nested_training_arg(self):

    class NestedReturnTraining(layers_module.Layer):

      def call(self, inputs, training=None):
        return backend.in_train_phase(
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs),
            training=training)

    class ReturnTraining(layers_module.Layer):

      def __init__(self, input_shape=None, **kwargs):
        super(ReturnTraining, self).__init__(input_shape=input_shape, **kwargs)
        self._nested_layer = None

      def build(self, input_shape):
        self._nested_layer = NestedReturnTraining()
        self.built = True

      def call(self, inputs):
        return self._nested_layer(inputs)

    model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with `steps_per_epoch`.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)

  @keras_parameterized.run_with_all_model_types(exclude_models='sequential')
  @keras_parameterized.run_all_keras_modes
  def test_fit_on_arrays(self):
    input_a = layers_module.Input(shape=(3,), name='input_a')
    input_b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(4, name='dense')
    dropout = layers_module.Dropout(0.5, name='dropout')
    branch_a = [input_a, dense]
    branch_b = [input_b, dense, dropout]

    model = testing_utils.get_multi_io_model(branch_a, branch_b)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        loss_weights=loss_weights,
        run_eagerly=testing_utils.should_run_eagerly())

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    # Test fit at different verbosity
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=0)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=1)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=2,
        batch_size=5,
        verbose=2)
    model.train_on_batch([input_a_np, input_b_np], [output_d_np, output_e_np])

    # Test with validation data
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=1,
        batch_size=5,
        verbose=0)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=2,
        batch_size=5,
        verbose=1)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=2,
        batch_size=5,
        verbose=2)
    # Test with validation split
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=2,
        batch_size=5,
        verbose=0,
        validation_split=0.2)

    if testing_utils.get_model_type() == 'functional':
      # Test with dictionary inputs
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          epochs=1,
          batch_size=5,
          verbose=0)
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          epochs=1,
          batch_size=5,
          verbose=1)
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          validation_data=({
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          }),
          epochs=1,
          batch_size=5,
          verbose=0)
      model.train_on_batch({
          'input_a': input_a_np,
          'input_b': input_b_np
      }, {
          'dense': output_d_np,
          'dropout': output_e_np
      })

    # Test with lists for loss, metrics
    loss = ['mae', 'mse']
    model.compile(
        optimizer,
        loss,
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=0)

    # Test with dictionaries for loss, metrics, loss weights
    if testing_utils.get_model_type() == 'functional':
      loss = {'dense': 'mse', 'dropout': 'mae'}
      loss_weights = {'dense': 1., 'dropout': 0.5}
      metrics = {
          'dense': 'mse',
          'dropout': metrics_module.CategoricalAccuracy()
      }
      model.compile(
          optimizer,
          loss,
          metrics=metrics,
          loss_weights=loss_weights,
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(
          [input_a_np, input_b_np], [output_d_np, output_e_np],
          epochs=1,
          batch_size=5,
          verbose=0)

    # Build single-input model
    x = layers_module.Input(shape=(3,), name='input_a')
    y = layers_module.Dense(4)(x)
    model = training_module.Model(x, y)
    model.compile(
        optimizer,
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly())
    # This will work
    model.fit([input_a_np], output_d_np, epochs=1)

    # Test model on a list of floats
    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 4))

    # Test execution on inputs that are lists of scalars.
    # TF2 and TF1 have slightly different semantics:
    if context.executing_eagerly():
      # In TF2 to avoid any ambiguity when there are nested lists
      # the entire input gets converted to a
      # single numpy array (& it only works in the case of a single io model)
      model.fit(np.ndarray.tolist(input_a_np),
                np.ndarray.tolist(input_b_np),
                epochs=2,
                batch_size=5,
                verbose=2)
    else:
      # In TF1 there was logic to try disambiguating between the individual
      # inputs when lists are nested. This allowed multi-io functional models
      # to support lists of scalars as input, but it caused ambiguity issues
      # for subclass models & made it trickier to pass multi-dimensional inputs
      # as lists of scalars to single io models. This was an excessive amount
      # of complexity for what boiled down to a convenience method we were
      # mainly just using for writing tests.
      model.fit([np.ndarray.tolist(input_a_np)],
                [np.ndarray.tolist(input_b_np)],
                epochs=2,
                batch_size=5,
                verbose=2)

  @keras_parameterized.run_all_keras_modes
  def test_evaluate_predict_on_arrays(self):
    a = layers_module.Input(shape=(3,), name='input_a')
    b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = layers_module.Dropout(0.5, name='dropout')(c)

    model = training_module.Model([a, b], [d, e])

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=['mae', metrics_module.CategoricalAccuracy()],
        loss_weights=loss_weights,
        sample_weight_mode=None,
        run_eagerly=testing_utils.should_run_eagerly())

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    # Test evaluate at different verbosity
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=0)
    self.assertEqual(len(out), 7)
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=1)
    self.assertEqual(len(out), 7)
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=2)
    self.assertEqual(len(out), 7)
    out = model.test_on_batch([input_a_np, input_b_np],
                              [output_d_np, output_e_np])
    self.assertEqual(len(out), 7)

    # Test evaluate with dictionary inputs
    model.evaluate(
        {
            'input_a': input_a_np,
            'input_b': input_b_np
        }, {
            'dense': output_d_np,
            'dropout': output_e_np
        },
        batch_size=5,
        verbose=0)
    model.evaluate(
        {
            'input_a': input_a_np,
            'input_b': input_b_np
        }, {
            'dense': output_d_np,
            'dropout': output_e_np
        },
        batch_size=5,
        verbose=1)

    # Test predict
    out = model.predict([input_a_np, input_b_np], batch_size=5)
    self.assertEqual(len(out), 2)
    out = model.predict({'input_a': input_a_np, 'input_b': input_b_np})
    self.assertEqual(len(out), 2)
    out = model.predict_on_batch({
        'input_a': input_a_np,
        'input_b': input_b_np
    })
    self.assertEqual(len(out), 2)

  def _make_sequence_input_functions(self, input_type):
    # train and test
    xy_namedtuple = collections.namedtuple('xy_namedtuple', ['x', 'y'])

    # predict
    x_namedtuple = collections.namedtuple('x_namedtuple', ['x'])

    if input_type == 'dataset':
      dataset = dataset_ops.Dataset.range(16).map(
          lambda _: array_ops.ones(shape=(1,)))

      xy_dataset = dataset_ops.Dataset.zip((dataset, dataset)).batch(4)
      x_dataset = dataset.batch(4)
      def xy_function(use_namedtuple):
        return xy_dataset.map(xy_namedtuple) if use_namedtuple else xy_dataset

      def x_function(use_namedtuple):
        return x_dataset.map(x_namedtuple) if use_namedtuple else x_dataset

      return xy_function, x_function

    elif input_type == 'generator':
      def xy_generator(use_namedtuple):
        x, y = np.ones((4, 1)), np.ones((4, 1))
        for _ in range(4):
          if use_namedtuple:
            yield xy_namedtuple(x, y)
          else:
            yield x, y

      def x_generator(use_namedtuple):
        x = np.ones((4, 1))
        for _ in range(4):
          if use_namedtuple:
            yield x_namedtuple(x)
          else:
            yield x

      return xy_generator, x_generator

    elif input_type == 'sequence':
      class XYSequence(data_utils.Sequence):

        def __init__(self, use_namedtuple):
          self._use_namedtuple = use_namedtuple
          super(XYSequence, self).__init__()

        def __getitem__(self, idx):
          x, y = np.ones((4, 1)), np.ones((4, 1))
          if self._use_namedtuple:
            return xy_namedtuple(x, y)
          return x, y

        def __len__(self):
          return 4

      class XSequence(data_utils.Sequence):

        def __init__(self, use_namedtuple):
          self._use_namedtuple = use_namedtuple
          super(XSequence, self).__init__()

        def __getitem__(self, idx):
          x = np.ones((4, 1))
          if self._use_namedtuple:
            return x_namedtuple(x)
          return x

        def __len__(self):
          return 4

      return XYSequence, XSequence

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  @keras_parameterized.run_with_all_model_types
  @parameterized.named_parameters(
      ('dataset', 'dataset'),
      ('generator', 'generator'),
      ('sequence', 'sequence'),
  )
  def test_sequence_input_types(self, input_type):
    """Ensure that namedtuples and tuples are plumbed identically."""
    if not context.executing_eagerly():
      self.skipTest('Improved checking is only present in data_adapter.')

    xy_function, x_function = self._make_sequence_input_functions(input_type)
    fit_kwargs, evaluate_kwargs, predict_kwargs = {}, {}, {}
    if input_type == 'generator':
      fit_kwargs['steps_per_epoch'] = 4
      evaluate_kwargs['steps'] = 4
      predict_kwargs['steps'] = 4

    model = testing_utils.get_small_mlp(1, 1, 1)
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly())

    model.fit(xy_function(use_namedtuple=False), **fit_kwargs)
    model.evaluate(xy_function(use_namedtuple=False), **evaluate_kwargs)
    model.predict(x_function(use_namedtuple=False), **predict_kwargs)

  @keras_parameterized.run_all_keras_modes
  def test_custom_mapping_in_config(self):

    class MyModel(training_module.Model):

      def call(self, inputs):
        return inputs

      def get_config(self):
        self.a = {}
        return {'a': self.a}

    model = MyModel()
    self.assertIn('{"a": {}}', model.to_json())

  def test_training_on_sparse_data_with_dense_placeholders_v1(self):
    with ops.Graph().as_default():
      if scipy_sparse is None:
        return

      test_inputs = [
          scipy_sparse.random(6, 3, density=0.25).tocsr() for _ in range(2)
      ]
      test_outputs = [
          scipy_sparse.random(6, i, density=0.25).tocsr() for i in range(3, 5)
      ]
      in1 = layers_module.Input(shape=(3,))
      in2 = layers_module.Input(shape=(3,))
      out1 = layers_module.Dropout(0.5, name='dropout')(in1)
      out2 = layers_module.Dense(4, name='dense_1')(in2)
      model = training_module.Model([in1, in2], [out1, out2])
      model.predict(test_inputs, batch_size=2)
      optimizer = 'rmsprop'
      model.compile(
          optimizer,
          'mse',
          metrics=['mae', metrics_module.CategoricalAccuracy()])
      model.fit(test_inputs, test_outputs,
                epochs=1, batch_size=2, validation_split=0.5)
      model.evaluate(test_inputs, test_outputs, batch_size=2)

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_sparse_placeholders(self):
    inputs = layers_module.Input(shape=(10,), sparse=True)
    weights = variables_lib.Variable(
        np.ones((10, 1)).astype(np.float32), name='weights')
    weights_mult = lambda x: sparse_ops.sparse_tensor_dense_matmul(x, weights)
    output_layer = layers_module.Lambda(weights_mult)(inputs)
    model = training_module.Model([inputs], output_layer)
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
        run_eagerly=testing_utils.should_run_eagerly())

  @keras_parameterized.run_all_keras_modes
  def test_that_trainable_disables_updates(self):
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    a = layers_module.Input(shape=(4,))
    layer = layers_module.BatchNormalization(input_shape=(4,))
    b = layer(a)
    model = training_module.Model(a, b)

    model.trainable = False
    if not ops.executing_eagerly_outside_functions():
      self.assertEmpty(model.updates)

    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    if not ops.executing_eagerly_outside_functions():
      self.assertEmpty(model.updates)

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    self.assertAllClose(x1, x2, atol=1e-7)

    model.trainable = True
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    if not ops.executing_eagerly_outside_functions():
      self.assertAllGreater(len(model.updates), 0)

    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert np.abs(np.sum(x1 - x2)) > 1e-5

    layer.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    if not ops.executing_eagerly_outside_functions():
      self.assertEmpty(model.updates)

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    self.assertAllClose(x1, x2, atol=1e-7)

  def test_weight_deduplication_in_methods(self):
    inp = layers_module.Input(shape=(1,))
    bn = layers_module.BatchNormalization()
    d = layers_module.Dense(1)

    m0 = training_module.Model(inp, d(bn(inp)))
    m1 = training_module.Model(inp, d(bn(inp)))

    x0 = m0(inp)
    x1 = m1(inp)
    x = layers_module.Add()([x0, x1])

    model = training_module.Model(inp, x)
    self.assertLen(model.trainable_weights, 4)
    self.assertLen(model.non_trainable_weights, 2)
    self.assertLen(model.weights, 6)

  @keras_parameterized.run_all_keras_modes
  def test_weight_deduplication(self):

    class WatchingLayer(layers_module.Layer):

      def __init__(self, dense_to_track):
        # This will cause the kernel and bias to be double counted, effectively
        # doubling the learning rate if weights are not deduped.
        self._kernel = dense_to_track.kernel
        self._bias = dense_to_track.bias
        super(WatchingLayer, self).__init__()

    inp = layers_module.Input(shape=(1,))
    dense_layer = layers_module.Dense(1)
    dense_output = dense_layer(inp)  # This will build the dense kernel

    # Deterministically set weights to make the test repeatable.
    dense_layer.set_weights([np.ones((1, 1)), np.zeros((1,))])
    output = WatchingLayer(dense_layer)(dense_output)

    model = training_module.Model(inp, output)

    # 0.25 is the edge of the radius of convergence for the double apply case.
    # At lr=0.24, the double apply case will very slowly descend while the
    # correct case will drop very quickly.
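    # (Why 0.25 -- a sketch, assuming only the setup above: with x == 1 the
    # model predicts s = kernel + bias against the target 1.5, and the
    # gradient w.r.t. each variable is 2 * (s - 1.5), so one SGD step maps
    # (s - 1.5) to (1 - 4 * lr) * (s - 1.5). Applying each gradient twice
    # makes the factor (1 - 8 * lr), which stops contracting once
    # |1 - 8 * lr| >= 1, i.e. at lr = 0.25. At lr=0.24 the factors are 0.04
    # for the correct case vs. -0.92 for the double apply case.)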
    model.compile(
        loss='mse',
        optimizer=optimizer_v2.gradient_descent.SGD(0.24),
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones((64 * 2,))
    y = 4.5 * x - 3.

    history = model.fit(x, y, batch_size=64, epochs=2, verbose=2)

    # If the gradient apply is duplicated then the loss after 2 epochs will
    # be ~0.15, compared to the correct answer of O(1e-7).
    self.assertLess(history.history['loss'][-1], 1e-6)

  @keras_parameterized.run_all_keras_modes
  def test_weight_shared_across_layers(self):

    class AddWeightLayer(layers_module.Layer):

      def __init__(self, trainable_var, non_trainable_var):
        self.trainable_var = trainable_var
        self.non_trainable_var = non_trainable_var
        super(AddWeightLayer, self).__init__()

      def call(self, inputs):
        return inputs + self.trainable_var

    class LayerWithWeightSharedLayers(layers_module.Layer):

      def __init__(self):
        super(LayerWithWeightSharedLayers, self).__init__()
        shared_trainable_var = variables_lib.Variable(1.)
        shared_non_trainable_var = variables_lib.Variable(
            1., trainable=False)
        self.layer1 = AddWeightLayer(shared_trainable_var,
                                     shared_non_trainable_var)
        self.layer2 = AddWeightLayer(shared_trainable_var,
                                     shared_non_trainable_var)

      def call(self, inputs):
        return self.layer2(self.layer1(inputs))

    l = LayerWithWeightSharedLayers()
    layers = list(l._flatten_layers(include_self=False, recursive=False))
    self.assertEqual(layers, [l.layer1, l.layer2])
    self.assertEqual(l.variables,
                     [l.layer1.trainable_var, l.layer1.non_trainable_var])
    self.assertEqual(l.trainable_variables, [l.layer1.trainable_var])
    self.assertEqual(l.non_trainable_variables, [l.layer1.non_trainable_var])
    self.assertLen(l.get_weights(), 2)

  @keras_parameterized.run_all_keras_modes
  def test_weight_tracking_for_template(self):
    def variable_scoped_function(trainable=True):
      return variable_scope.get_variable(
          'dummy', shape=[1], trainable=trainable,
          initializer=init_ops.zeros_initializer())
    def nested_template():
      nested1 = template.make_template('nested', variable_scoped_function)
      nested2 = template.make_template('nested', variable_scoped_function)
      v1 = nested1()
      v2 = nested2()

      # nested1 and nested2 should not share variables
      self.assertIsNot(v1, v2)

      # Variables created by nested1 should be isolated from variables
      # created by nested2.
      self.assertEqual(1, len(nested1.variables))
      self.assertEqual(1, len(nested2.variables))
      self.assertIs(nested1.variables[0], v1)
      self.assertIs(nested2.variables[0], v2)
      self.assertEqual(1, len(nested1.trainable_variables))
      self.assertEqual(1, len(nested2.trainable_variables))
      self.assertIs(nested1.trainable_variables[0], v1)
      self.assertIs(nested2.trainable_variables[0], v2)
      self.assertEqual(len(nested1.non_trainable_variables), 0)
      self.assertEqual(len(nested2.non_trainable_variables), 0)
      return v1, v2

    tmpl1 = template.make_template('s1', nested_template)
    tmpl2 = template.make_template('s1', nested_template)

    v1, v2 = tmpl1()
    v5, v6 = tmpl2()

    model = training_module.Model()
    model.template = tmpl1
    self.assertEqual(2, len(model.variables))
    self.assertIs(model.variables[0], v1)
    self.assertIs(model.variables[1], v2)
    self.assertEqual(2, len(model.variables))
    self.assertIs(model.trainable_variables[0], v1)
    self.assertIs(model.trainable_variables[1], v2)
    self.assertEqual(len(model.non_trainable_variables), 0)
    model.templates = [tmpl2]
    for v, w in zip(model.variables, [v1, v2, v5, v6]):
      self.assertIs(v, w)
    for v, w in zip(model.trainable_variables, [v1, v2, v5, v6]):
      self.assertIs(v, w)
    self.assertEqual(len(model.non_trainable_variables), 0)
    # Make sure losses, layers, and updates aren't broken by having a Template
    # in the mix, which does not expose any updates or losses.
    self.assertEqual([], model.layers)
    self.assertEqual([], model.updates)
    self.assertEqual([], model.losses)
    self.assertEqual([], model.templates.layers)
    self.assertEqual([], model.templates.updates)
    self.assertEqual([], model.templates.losses)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_logs_passed_to_callbacks(self):
    input_dim = 5
    num_classes = 1

    class TestCallback(Callback):

      def __init__(self):
        super(TestCallback, self).__init__()
        self.epoch_end_logs = None
        self.batch_end_logs = None
        self.epoch_end_call_count = 0
        self.batch_end_call_count = 0

      def on_epoch_end(self, epoch, logs=None):
        self.epoch_end_logs = logs
        self.epoch_end_call_count += 1

      def on_batch_end(self, batch, logs=None):
        self.batch_end_logs = logs
        self.batch_end_call_count += 1

    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=num_classes, input_dim=input_dim)
    model.compile(
        loss='binary_crossentropy',
        metrics=['acc'],
        weighted_metrics=['mae'],
        optimizer=RMSPropOptimizer(learning_rate=0.01),
        run_eagerly=testing_utils.should_run_eagerly())

    np.random.seed(1337)
    (x_train, y_train), (_, _) = testing_utils.get_test_data(
        train_samples=10,
        test_samples=10,
        input_shape=(input_dim,),
        num_classes=num_classes)

    test_callback = TestCallback()
    model.fit(
        x_train,
        y_train,
        batch_size=2,
        epochs=2,
        verbose=0,
        callbacks=[test_callback],
        validation_data=(x_train, y_train))
    self.assertEqual(test_callback.batch_end_call_count, 10)
    self.assertEqual(test_callback.epoch_end_call_count, 2)

    self.assertSetEqual(
        set(test_callback.batch_end_logs.keys()), set(['acc', 'loss', 'mae']))
    self.assertSetEqual(
        set(test_callback.epoch_end_logs.keys()),
        set(['acc', 'loss', 'mae', 'val_acc', 'val_loss', 'val_mae']))

  @keras_parameterized.run_all_keras_modes
  def test_mismatched_output_shape_and_target_shape(self):
    model = sequential.Sequential([
        layers_module.Dense(2, input_shape=(3, 4)),
        layers_module.Dense(5),
    ])
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly())
    # Test with Numpy data
    x_train = np.random.random((10, 3, 4)).astype(np.float32)
    y_train = np.random.randint(0, 5, size=(10, 3)).astype(np.float32)
    model.fit(x_train, y_train, batch_size=5, epochs=1)

    # Test with iterator
    dataset = dataset_ops.Dataset.from_tensor_slices((x_train, y_train))
    dataset = dataset.repeat(10)
    dataset = dataset.batch(10)
    model.fit(dataset, epochs=1, steps_per_epoch=2)

    if context.executing_eagerly():
      # Test with eager execution
      model.compile(RMSPropOptimizer(learning_rate=0.001),
                    loss='sparse_categorical_crossentropy',
                    run_eagerly=True)
      model.fit(x_train, y_train, batch_size=5, epochs=1)

      # Test with eager execution and iterator
      model.fit(dataset, epochs=1, steps_per_epoch=2)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_losses_in_defun(self):
    layer = layers_module.Dense(1, kernel_regularizer='l1')
    layer(array_ops.ones([1, 10]))

    @def_function.function
    def get_losses():
      return layer.losses

    self.assertAllEqual(
        self.evaluate(layer.losses), self.evaluate(get_losses()))

  @keras_parameterized.run_all_keras_modes
  def test_logging(self):
    mock_stdout = io.BytesIO() if six.PY2 else io.StringIO()
    model = sequential.Sequential()
    model.add(layers_module.Dense(10, activation='relu'))
    model.add(layers_module.Dense(1, activation='sigmoid'))
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss='binary_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly())
    with test.mock.patch.object(sys, 'stdout', mock_stdout):
      model.fit(
          np.ones((10, 10), 'float32'), np.ones((10, 1), 'float32'), epochs=10)
    self.assertTrue('Epoch 5/10' in mock_stdout.getvalue())

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_training_with_loss_instance(self):
    a = layers_module.Input(shape=(3,), name='input_a')
    b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = layers_module.Dropout(0.5, name='dropout')(c)

    model = training_module.Model([a, b], [d, e])
    loss_weights = [1., 0.5]
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss=losses.MeanSquaredError(),
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        loss_weights=loss_weights)

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
              epochs=1,
              batch_size=5)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_static_batch_in_input_layer(self):
    if context.executing_eagerly():
      self.skipTest('Not inferred in eager.')

    class Counter(Callback):

      def __init__(self):
        self.batches = 0

      def on_batch_end(self, batch, logs=None):
        self.batches += 1

    x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')

    for batch_size, expected_batches in [(None, 2), (4, 16)]:
      inputs = input_layer.Input(batch_size=batch_size, shape=(10,))
      outputs = layers_module.Dense(1, activation='sigmoid')(inputs)
      model = training_module.Model(inputs, outputs)

      model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
      counter = Counter()
      model.fit(x, y, callbacks=[counter])
      self.assertEqual(counter.batches, expected_batches)

      model = sequential.Sequential(
          [layers_module.Dense(1, batch_input_shape=(batch_size, 10))])
      model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
      counter = Counter()
      model.fit(x, y, callbacks=[counter])
      self.assertEqual(counter.batches, expected_batches)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_static_batch_in_input_layer_consistency_checks(self):
    if context.executing_eagerly():
      self.skipTest('Not inferred in eager.')
    x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')

    inputs = input_layer.Input(batch_size=2, shape=(10,))
    outputs = layers_module.Dense(1, activation='sigmoid')(inputs)
    model = training_module.Model(inputs, outputs)
    model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
    with self.assertRaisesRegex(ValueError,
                                'incompatible with the specified batch size'):
      model.fit(x, y, batch_size=4)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_compatible_batch_size_functional_model(self):

    class MyLayer(layers_module.Layer):

      def call(self, inputs):
        return array_ops.concat(inputs, axis=0)

    input1 = input_layer.Input(batch_size=2, shape=(10,))
    input2 = input_layer.Input(batch_size=3, shape=(10,))
    outputs = MyLayer()([input1, input2])
    with self.assertRaisesRegex(ValueError,
                                'specified batch sizes of the Input Layers'):
      training_module.Model([input1, input2], outputs)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_calling_subclass_model_on_different_datasets(self):

    class SubclassedModel(training_module.Model):

      def call(self, inputs):
        return inputs * 2

    model = SubclassedModel()
    dataset_one = dataset_ops.Dataset.range(2).batch(2)
    dataset_two = dataset_ops.Dataset.range(3, 10).batch(2)
    self.assertAllEqual([[0], [2]], model.predict(dataset_one, steps=1))
    self.assertAllEqual([[6], [8], [10], [12]],
                        model.predict(dataset_two, steps=2))

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_training_on_sparse_categorical_crossentropy_loss_with_softmax(self):
    np.random.seed(1337)
    train_x = np.ones((100, 4))
    train_y = np.random.randint(0, 1, size=(100, 1))

    reference_model = testing_utils.get_small_sequential_mlp(16, 2,
                                                             input_dim=4)
    reference_model.compile(loss='sparse_categorical_crossentropy',
                            optimizer=RMSPropOptimizer(learning_rate=0.001),
                            run_eagerly=True)
    fixed_weights = reference_model.get_weights()
    reference_model_loss = reference_model.train_on_batch(train_x, train_y)

    test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
    test_model.compile(loss='sparse_categorical_crossentropy',
                       optimizer=RMSPropOptimizer(learning_rate=0.001),
                       run_eagerly=False)
    test_model.set_weights(fixed_weights)
    test_model_loss = test_model.train_on_batch(train_x, train_y)
    self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_training_on_categorical_crossentropy_loss_with_softmax(self):
    np.random.seed(1337)
    train_x = np.ones((100, 4))
    train_y = np_utils.to_categorical(
        np.random.randint(0, 1, size=(100, 1)), 2)

    reference_model = testing_utils.get_small_sequential_mlp(16, 2,
                                                             input_dim=4)
    reference_model.compile(loss='categorical_crossentropy',
                            optimizer=RMSPropOptimizer(learning_rate=0.001),
                            run_eagerly=True)
    fixed_weights = reference_model.get_weights()
    reference_model_loss = reference_model.train_on_batch(train_x, train_y)

    test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
    test_model.compile(loss='categorical_crossentropy',
                       optimizer=RMSPropOptimizer(learning_rate=0.001),
                       run_eagerly=False)
    test_model.set_weights(fixed_weights)
    test_model_loss = test_model.train_on_batch(train_x, train_y)
    self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_training_on_binary_crossentropy_loss(self):
    train_x = np.ones((100, 4), dtype=np.float32)
    train_y = np.ones((100, 1), dtype=np.float32)
    reference_model = testing_utils.get_small_sequential_mlp(16, 1,
                                                             input_dim=4)
    reference_model.compile(loss='binary_crossentropy',
                            optimizer=RMSPropOptimizer(learning_rate=0.001),
                            run_eagerly=True)
    fixed_weights = reference_model.get_weights()
    reference_model_loss = reference_model.train_on_batch(train_x, train_y)

    test_model = testing_utils.get_small_sequential_mlp(16, 1, input_dim=4)
    test_model.compile(loss='binary_crossentropy',
                       optimizer=RMSPropOptimizer(learning_rate=0.001),
                       run_eagerly=False)
    test_model.set_weights(fixed_weights)
    test_model_loss = test_model.train_on_batch(train_x, train_y)
    self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters(
      ('default', 1, 4), ('integer_two', 2, 2), ('integer_four', 4, 1),
      ('simple_list', [1, 3, 4], 3), ('duplicated_list', [4, 2, 2], 2))
  def test_validation_freq(self, validation_freq, expected_runs):
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_small_mlp(2, 1, 10)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())

    class ValCounter(Callback):

      def __init__(self):
        self.val_runs = 0

      def on_test_begin(self, logs=None):
        self.val_runs += 1

    val_counter = ValCounter()
    model.fit(
        x,
        y,
        epochs=4,
        validation_data=(x, y),
        validation_freq=validation_freq,
        callbacks=[val_counter])
    self.assertEqual(val_counter.val_runs, expected_runs)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_validation_steps_without_data(self):
    if context.executing_eagerly():
      self.skipTest('Check removed in new `fit`')
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_small_mlp(2, 1, 10)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())

    with self.assertRaisesRegex(
        ValueError, '`validation_steps` should not be specified if '
        '`validation_data` is None.'):
      model.fit(x, y, epochs=4, validation_data=None, validation_steps=3)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_layer_with_variable_output(self):

    class VariableOutputLayer(layers_module.Layer):

      def build(self, input_shape):
        self.v = self.add_weight('output_var', shape=(2, 5), initializer='ones')

      def call(self, inputs):
        return self.v

    model = testing_utils.get_model_from_layers(
        [VariableOutputLayer(), layers_module.Dense(1)], input_shape=(10,))
    # TODO(omalleyt): Make this work with `run_eagerly=True`.
    model.compile('sgd', 'mse', run_eagerly=False)
    model.fit(np.ones((10, 10)), np.ones((10, 1)), batch_size=2, epochs=5)

    self.assertLen(model.trainable_variables, 3)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  @testing_utils.enable_v2_dtype_behavior
  def test_model_dtype(self):

    class AssertTypeLayer(layers_module.Layer):

      def call(self, inputs):
        assert inputs.dtype.name == self.dtype, (
            'Input tensor has type %s which does not match assert type %s' %
            (inputs.dtype.name, self.dtype))
return inputs + 1.
|
|
|
|
for dtype in ('float16', 'float32', 'float64'):
|
|
model = testing_utils.get_model_from_layers(
|
|
[AssertTypeLayer(dtype=dtype)], input_shape=(10,))
|
|
model.compile(
|
|
'sgd',
|
|
'mse',
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
|
|
x = np.ones((10, 10))
|
|
y = np.ones((10, 10))
|
|
model.fit(x, y)
|
|
model.test_on_batch(x, y)
|
|
model(x)
|
|
|
|
@keras_parameterized.run_with_all_model_types
|
|
@keras_parameterized.run_all_keras_modes
|
|
@testing_utils.enable_v2_dtype_behavior
|
|
def test_model_input_dtype(self):
|
|
model = testing_utils.get_small_mlp(1, 10, 10)
|
|
model.compile(
|
|
'sgd',
|
|
'mse',
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
x = np.ones((10, 10)).astype(np.float64)
|
|
y = np.ones((10, 10)).astype(np.float64)
|
|
dataset = dataset_ops.Dataset.from_tensor_slices((x, y)).batch(2)
|
|
model.fit(dataset)
|
|
self.assertEqual(model._compute_dtype, 'float32')
|
|
|
|
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
|
|
def test_subclassed_model_with_training_arg(self):
|
|
|
|
class LayerWithTrainingArg(layers_module.Layer):
|
|
|
|
def call(self, inputs, training=None):
|
|
self.training = training
|
|
return inputs
|
|
|
|
class ModelWithTrainingArg(training_module.Model):
|
|
|
|
def __init__(self):
|
|
super(ModelWithTrainingArg, self).__init__()
|
|
self.l1 = LayerWithTrainingArg()
|
|
|
|
def call(self, inputs, training=None):
|
|
self.training = training
|
|
inputs = self.l1(inputs, training=training)
|
|
return inputs
|
|
|
|
x = np.zeros((1, 2))
|
|
model = ModelWithTrainingArg()
|
|
model.compile(
|
|
loss='mse',
|
|
optimizer='sgd',
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
model.fit(x, x, epochs=1)
|
|
|
|
if context.executing_eagerly():
|
|
expected_training_arg = True
|
|
else:
|
|
expected_training_arg = backend.symbolic_learning_phase()
|
|
|
|
self.assertIs(model.training, expected_training_arg)
|
|
self.assertIs(model.l1.training, expected_training_arg)
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
def test_error_when_model_is_not_compiled(self):
|
|
inputs = input_layer.Input(shape=(1,))
|
|
outputs = layers_module.Dense(1)(inputs)
|
|
model = training_module.Model(inputs, outputs)
|
|
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
|
|
model.fit(np.ones((1, 1)), np.ones((1, 1)))
|
|
|
|
class MyModel(training_module.Model):
|
|
|
|
def call(self, x):
|
|
self.add_loss(math_ops.reduce_sum(x))
|
|
return x
|
|
|
|
model = MyModel()
|
|
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
|
|
model.fit(np.random.random((32, 1)), epochs=2)
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
@testing_utils.enable_v2_dtype_behavior
|
|
def test_losses_of_different_dtypes(self):
|
|
inp = input_layer.Input(shape=(2,))
|
|
out_1 = layers_module.Dense(
|
|
2, dtype='float32', kernel_regularizer='l2')(
|
|
inp)
|
|
out_2 = layers_module.Dense(
|
|
2, dtype='float16', kernel_regularizer='l2')(
|
|
inp)
|
|
model = training_module.Model(inp, [out_1, out_2])
|
|
extra_loss = math_ops.reduce_sum(math_ops.cast(out_2, 'float64'))
|
|
model.add_loss(extra_loss)
|
|
model.compile('sgd', ['mse', 'mse'],
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
x, y = np.ones((10, 2)), np.ones((10, 2))
|
|
model.fit(x, [y, y])
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
@testing_utils.enable_v2_dtype_behavior
|
|
def test_losses_of_different_dtypes_with_subclassed_model(self):
|
|
|
|
class MyModel(training_module.Model):
|
|
|
|
def build(self, _):
|
|
self.dense = layers_module.Dense(2)
|
|
|
|
def call(self, inputs):
|
|
self.add_loss(math_ops.cast(nn_ops.l2_loss(inputs), 'float64'))
|
|
return self.dense(inputs)
|
|
|
|
model = MyModel(dtype='float32')
|
|
model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
|
|
x, y = np.ones((10, 2)), np.ones((10, 2))
|
|
model.fit(x, y)
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
@testing_utils.enable_v2_dtype_behavior
|
|
def test_regularizer_of_different_dtype(self):
|
|
inp = input_layer.Input(shape=(2,))
|
|
|
|
def regularizer(weight):
|
|
return math_ops.cast(nn_ops.l2_loss(weight), 'float64')
|
|
|
|
out = layers_module.Dense(
|
|
2, dtype='float32', kernel_regularizer=regularizer)(
|
|
inp)
|
|
model = training_module.Model(inp, out)
|
|
model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
|
|
x, y = np.ones((10, 2)), np.ones((10, 2))
|
|
model.fit(x, y)
|
|
|
|
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
|
|
def test_outputs_are_floats(self):
|
|
x, y = np.ones((10, 1)), np.ones((10, 1))
|
|
model = sequential.Sequential([layers_module.Dense(1)])
|
|
model.compile('sgd', 'mse', metrics=['accuracy'],
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
|
|
history = model.fit(x, y, epochs=2)
|
|
self.assertIsInstance(history.history['loss'][0], float)
|
|
self.assertIsInstance(history.history['accuracy'][0], float)
|
|
|
|
loss, accuracy = model.train_on_batch(x, y)
|
|
self.assertIsInstance(loss, float)
|
|
self.assertIsInstance(accuracy, float)
|
|
|
|
loss, accuracy = model.evaluate(x, y)
|
|
self.assertIsInstance(loss, float)
|
|
self.assertIsInstance(accuracy, float)
|
|
|
|
loss, accuracy = model.test_on_batch(x, y)
|
|
self.assertIsInstance(loss, float)
|
|
self.assertIsInstance(accuracy, float)
|
|
|
|
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
|
|
def test_int_output(self):
|
|
x, y = np.ones((10, 1)), np.ones((10, 1))
|
|
model = sequential.Sequential([layers_module.Dense(1)])
|
|
|
|
class MyMetric(metrics_module.Metric):
|
|
|
|
def update_state(self, y_true, y_pred, sample_weight=None):
|
|
del y_true, y_pred, sample_weight
|
|
|
|
def result(self):
|
|
return array_ops.constant(1, dtype='int64')
|
|
|
|
model.compile('sgd', 'mse', metrics=[MyMetric()],
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
history = model.fit(x, y, epochs=2)
|
|
self.assertIsInstance(history.history['my_metric'][0], int)
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
def test_calling_aggregate_gradient(self):
|
|
|
|
class _Optimizer(optimizer_v2.gradient_descent.SGD):
|
|
"""Mock optimizer to check if _aggregate_gradient is called."""
|
|
|
|
_HAS_AGGREGATE_GRAD = True
|
|
|
|
def __init__(self):
|
|
self.aggregate_gradients_called = False
|
|
super(_Optimizer, self).__init__(name='MyOptimizer')
|
|
|
|
def _aggregate_gradients(self, grads):
|
|
self.aggregate_gradients_called = True
|
|
return super(_Optimizer, self)._aggregate_gradients(grads)
|
|
|
|
mock_optimizer = _Optimizer()
|
|
|
|
model = sequential.Sequential()
|
|
model.add(layers_module.Dense(10, activation='relu'))
|
|
|
|
model.compile(mock_optimizer, 'mse',
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
x, y = np.ones((10, 10)), np.ones((10, 10))
|
|
model.fit(x, y)
|
|
self.assertEqual(model.optimizer.aggregate_gradients_called, True)
|
|
|
|
class _OptimizerOverrideApplyGradients(_Optimizer):
|
|
"""Override apply_gradients.
|
|
|
|
To test the case where the optimizer does not define the
|
|
experimental_aggregate_gradients parameter.
|
|
"""
|
|
|
|
_HAS_AGGREGATE_GRAD = False
|
|
|
|
def apply_gradients(self, grads_and_vars, name=None): # pylint: disable=useless-super-delegation
|
|
return super(_OptimizerOverrideApplyGradients,
|
|
self).apply_gradients(grads_and_vars, name)
|
|
|
|
mock_optimizer = _OptimizerOverrideApplyGradients()
|
|
model.compile(mock_optimizer, 'mse',
|
|
run_eagerly=testing_utils.should_run_eagerly())
|
|
x, y = np.ones((10, 10)), np.ones((10, 10))
|
|
model.fit(x, y)
|
|
self.assertEqual(model.optimizer.aggregate_gradients_called, True)
|
|
|
|
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
|
|
def test_gradients_are_none(self):
|
|
|
|
class DenseWithExtraWeight(layers_module.Dense):
|
|
|
|
def build(self, input_shape):
|
|
# Gradients w.r.t. extra_weights are None
|
|
self.extra_weight_1 = self.add_weight('extra_weight_1', shape=(),
|
|
initializer='ones')
|
|
super(DenseWithExtraWeight, self).build(input_shape)
|
|
self.extra_weight_2 = self.add_weight('extra_weight_2', shape=(),
|
|
initializer='ones')
|
|
|
|
model = sequential.Sequential([DenseWithExtraWeight(4, input_shape=(4,))])
|
|
# Test clipping can handle None gradients
|
|
opt = optimizer_v2.adam.Adam(clipnorm=1.0, clipvalue=1.0)
|
|
model.compile(opt, 'mse', run_eagerly=testing_utils.should_run_eagerly())
|
|
inputs = np.random.normal(size=(64, 4))
|
|
targets = np.random.normal(size=(64, 4))
|
|
old_kernel = model.get_weights()[1]
|
|
model.fit(inputs, targets)
|
|
new_kernel = model.get_weights()[1]
|
|
self.assertNotAllEqual(old_kernel, new_kernel)
|
|
|
|
@keras_parameterized.run_all_keras_modes
|
|
def test_layer_ordering(self):
|
|
|
|
class MyLayer(layers_module.Layer):
|
|
pass
|
|
|
|
class MyModel(training_module.Model):
|
|
|
|
def __init__(self, name):
|
|
super(MyModel, self).__init__(name=name)
|
|
|
|
self.weight = variables_lib.Variable(0, name=name)
|
|
|
|
self.direct_sublayer = MyLayer(name='direct')
|
|
self.direct_sublayer.d = {'d': MyLayer(name='direct/dict')}
|
|
|
|
self.dict_sublayer = {'d': MyLayer(name='dict')}
|
|
self.dict_sublayer['d'].direct = MyLayer(name='dict/direct')
|
|
|
|
model = MyModel('model')
|
|
# All sublayers, including self and recursive sublayers.
|
|
self.assertEqual(['model', 'direct', 'direct/dict', 'dict', 'dict/direct'],
|
|
[l.name for l in model._flatten_layers()])
|
|
# Only direct sublayers, including those in data structures.
|
|
self.assertEqual(['direct', 'dict'], [l.name for l in model.layers])
|
|
|
|
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
|
|
def test_trainable_state_setting(self):
|
|
|
|
class UpdateLayer(layers_module.Layer):
|
|
|
|
def __init__(self):
|
|
super(UpdateLayer, self).__init__()
|
|
self.v = variables_lib.Variable(0., trainable=False)
|
|
|
|
def call(self, x):
|
|
self.add_update(lambda: self.v.assign_add(1.))
|
|
return x * self.v
|
|
|
|
layer = UpdateLayer()
|
|
model_with_updates = sequential.Sequential([layer])
|
|
model_with_updates.compile(
|
|
'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
|
|
|
|
layer.trainable = False
|
|
model_without_updates = sequential.Sequential([layer])
|
|
model_without_updates.compile(
|
|
'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
|
|
|
|
x, y = np.ones((10, 1)), np.ones((10, 1))
|
|
|
|
self.assertEqual(self.evaluate(layer.v), 0.)
|
|
model_with_updates.fit(x, y, batch_size=10)
|
|
# assign_add called.
|
|
self.assertEqual(self.evaluate(layer.v), 1.)
|
|
model_without_updates.fit(x, y, batch_size=10)
|
|
# assign_add not called.
|
|
self.assertEqual(self.evaluate(layer.v), 1.)
|
|
|
|
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  @parameterized.named_parameters(
      ('numpy_array', 'numpy_array'),
      ('dataset_array', 'dataset_array'),
      ('dataset_dict', 'dataset_dict'))
  def test_single_input_no_tuple_wrapping(self, input_type):
    x = np.ones((10, 1))

    if input_type == 'numpy_array':
      batch_size = 3
      expected_data_type = ops.Tensor
    elif input_type == 'dataset_array':
      x = dataset_ops.Dataset.from_tensor_slices(x).batch(3)
      batch_size = None
      expected_data_type = ops.Tensor
    else:
      x = {'my_input': x}
      x = dataset_ops.Dataset.from_tensor_slices(x).batch(3)
      batch_size = None
      expected_data_type = dict

    test_case = self

    class MyModel(training_module.Model):

      def train_step(self, data):
        # No tuple wrapping for single x input and no targets.
        test_case.assertIsInstance(data, expected_data_type)
        return super(MyModel, self).train_step(data)

      def test_step(self, data):
        test_case.assertIsInstance(data, expected_data_type)
        return super(MyModel, self).test_step(data)

      def predict_step(self, data):
        test_case.assertIsInstance(data, expected_data_type)
        return super(MyModel, self).predict_step(data)

    inputs = layers_module.Input(shape=(1,), name='my_input')
    outputs = layers_module.Dense(1)(inputs)
    model = MyModel(inputs, outputs)
    model.add_loss(math_ops.reduce_sum(outputs))
    model.compile('sgd', 'mse')
    model.fit(x, batch_size=batch_size)
    model.evaluate(x, batch_size=batch_size)
    model.predict(x, batch_size=batch_size)

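  # `evaluate` returns its list results in the same order as the dict built
  # by `test_step`: compiled loss and metrics first, then custom entries.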
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  @parameterized.named_parameters(
      ('custom_metrics', False, True),
      ('compiled_metrics', True, False),
      ('both_compiled_and_custom_metrics', True, True))
  def test_evaluate_with_custom_test_step(
      self, use_compiled_metrics, use_custom_metrics):

    class MyModel(training_module.Model):

      def test_step(self, data):
        x, y = data
        pred = self(x)
        metrics = {}
        if use_compiled_metrics:
          self.compiled_metrics.update_state(y, pred)
          self.compiled_loss(y, pred)
          for metric in self.metrics:
            metrics[metric.name] = metric.result()
        if use_custom_metrics:
          custom_metrics = {
              'mean': math_ops.reduce_mean(pred),
              'sum': math_ops.reduce_sum(pred)
          }
          metrics.update(custom_metrics)
        return metrics

    inputs = layers_module.Input((2,))
    outputs = layers_module.Dense(3)(inputs)
    model = MyModel(inputs, outputs)
    if use_compiled_metrics:
      model.compile('adam', 'mse', metrics=['mae', 'mape'],
                    run_eagerly=testing_utils.should_run_eagerly())
    else:
      model.compile('adam', 'mse',
                    run_eagerly=testing_utils.should_run_eagerly())
    x = np.random.random((4, 2))
    y = np.random.random((4, 3))
    results_list = model.evaluate(x, y)
    results_dict = model.evaluate(x, y, return_dict=True)
    self.assertLen(results_list, len(results_dict))
    if use_compiled_metrics and use_custom_metrics:
      self.assertLen(results_list, 5)
      self.assertEqual(results_list,
                       [results_dict['loss'],
                        results_dict['mae'], results_dict['mape'],
                        results_dict['mean'], results_dict['sum']])
    if use_compiled_metrics and not use_custom_metrics:
      self.assertLen(results_list, 3)
      self.assertEqual(results_list,
                       [results_dict['loss'],
                        results_dict['mae'], results_dict['mape']])
    if not use_compiled_metrics and use_custom_metrics:
      self.assertLen(results_list, 2)
      self.assertEqual(results_list,
                       [results_dict['mean'], results_dict['sum']])


class TestExceptionsAndWarnings(keras_parameterized.TestCase):
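  """Tests for the errors and warnings raised by the training APIs."""
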
  @keras_parameterized.run_all_keras_modes
  def test_compile_warning_for_loss_missing_output(self):
    with self.cached_session():
      inp = layers_module.Input(shape=(16,), name='input_a')
      out_1 = layers_module.Dense(8, name='dense_1')(inp)
      out_2 = layers_module.Dense(
          3, activation='softmax', name='dense_2')(
              out_1)
      model = training_module.Model(inputs=[inp], outputs=[out_1, out_2])
      optimizer = RMSPropOptimizer(learning_rate=0.001)

      model.compile(
          optimizer,
          loss={
              'dense_2': 'categorical_crossentropy',
          },
          metrics={
              'dense_2': 'categorical_accuracy',
              'dense_1': metrics_module.CategoricalAccuracy(),
          },
          run_eagerly=testing_utils.should_run_eagerly())

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_sparse_op_with_op_layer(self):
    with testing_utils.use_keras_tensors_scope(False):
      # The meaningful error is only raised w/o KerasTensors.
      # It's tricky to raise the exact same error w/ KerasTensors enabled.
      # We may want to add dispatching to the sparse_ops and have dispatch
      # trigger on attributeerror so that these ops fully work w/ KerasTensors.
      # This may need to wait until dispatch v2.
      inputs = layers_module.Input(
          shape=(2,), sparse=True, name='sparse_tensor')
      output = sparse_ops.sparse_minimum(inputs, inputs)
      with self.assertRaisesRegex(
          ValueError, 'not supported by Keras automatic '
          'op wrapping'):
        training_module.Model([inputs], output)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_predict_error_with_empty_x(self):
    inputs = layers_module.Input(shape=(2,))
    outputs = layers_module.Dense(4)(inputs)
    model = training_module.Model(inputs=inputs, outputs=outputs)
    model.compile(loss='mse')

    with self.assertRaisesRegex(
        ValueError, 'Expect x to be a non-empty array or dataset.'):
      model.predict(np.array([]))

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_on_batch_error_inconsistent_batch_size(self):
    input_node1 = layers_module.Input(shape=(5,))
    input_node2 = layers_module.Input(shape=(5,))
    output_node = layers_module.Concatenate()([input_node1, input_node2])
    output_node = layers_module.Dense(4)(output_node)
    model = training_module.Model([input_node1, input_node2], output_node)
    model.compile(loss='mse')

    with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
      model.train_on_batch([np.ones((10, 5)), np.ones((10, 5))],
                           np.ones((11, 4)))

    with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
      model.test_on_batch([np.ones((10, 5)), np.ones((10, 5))],
                          np.ones((11, 4)))

    with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
      model.predict_on_batch([np.ones((10, 5)), np.ones((11, 5))])


class LossWeightingTest(keras_parameterized.TestCase):
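  """Tests for training with `class_weight` and `sample_weight`."""
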
  @keras_parameterized.run_all_keras_modes
  def test_class_weights(self):
    num_classes = 5
    batch_size = 5
    epochs = 10
    weighted_class = 3
    weight = .5
    train_samples = 1000
    test_samples = 1000
    input_dim = 5
    learning_rate = 0.001

    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=num_classes, input_dim=input_dim)
    model.compile(
        loss='categorical_crossentropy',
        metrics=['acc', metrics_module.CategoricalAccuracy()],
        weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
        optimizer=RMSPropOptimizer(learning_rate=learning_rate),
        run_eagerly=testing_utils.should_run_eagerly())

    np.random.seed(1337)
    (x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
        train_samples=train_samples,
        test_samples=test_samples,
        input_shape=(input_dim,),
        num_classes=num_classes)
    int_y_test = y_test.copy()
    int_y_train = y_train.copy()
    # convert class vectors to binary class matrices
    y_train = np_utils.to_categorical(y_train, num_classes)
    y_test = np_utils.to_categorical(y_test, num_classes)
    test_ids = np.where(int_y_test == np.array(weighted_class))[0]

    class_weight = dict([(i, 1.) for i in range(num_classes)])
    class_weight[weighted_class] = weight

    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs // 3,
        verbose=0,
        class_weight=class_weight,
        validation_data=(x_train, y_train))
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs // 2,
        verbose=0,
        class_weight=class_weight)
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs // 2,
        verbose=0,
        class_weight=class_weight,
        validation_split=0.1)

    model.train_on_batch(
        x_train[:batch_size], y_train[:batch_size], class_weight=class_weight)
    ref_score = model.evaluate(x_test, y_test, verbose=0)  # pylint: disable=unused-variable
    score = model.evaluate(  # pylint: disable=unused-variable
        x_test[test_ids, :], y_test[test_ids, :], verbose=0)
    # TODO(b/152990697): Fix the class weights test here.
    # self.assertLess(score[0], ref_score[0])

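  # With `sample_weight_mode='temporal'`, weights are specified per sample
  # and per timestep, so the weight array below has shape (samples,
  # timesteps).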
  @keras_parameterized.run_all_keras_modes
  def test_temporal_sample_weights(self):
    num_classes = 5
    batch_size = 5
    epochs = 10
    weighted_class = 3
    weight = 10.
    train_samples = 1000
    test_samples = 1000
    input_dim = 5
    timesteps = 3
    learning_rate = 0.001

    with self.cached_session():
      model = sequential.Sequential()
      model.add(
          layers_module.TimeDistributed(
              layers_module.Dense(num_classes),
              input_shape=(timesteps, input_dim)))
      model.add(layers_module.Activation('softmax'))

      np.random.seed(1337)
      (x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
          train_samples=train_samples,
          test_samples=test_samples,
          input_shape=(input_dim,),
          num_classes=num_classes)
      int_y_test = y_test.copy()
      int_y_train = y_train.copy()
      # convert class vectors to binary class matrices
      y_train = np_utils.to_categorical(y_train, num_classes)
      y_test = np_utils.to_categorical(y_test, num_classes)
      test_ids = np.where(int_y_test == np.array(weighted_class))[0]

      sample_weight = np.ones((y_train.shape[0]))
      sample_weight[int_y_train == weighted_class] = weight

      temporal_x_train = np.reshape(x_train, (len(x_train), 1,
                                              x_train.shape[1]))
      temporal_x_train = np.repeat(temporal_x_train, timesteps, axis=1)
      temporal_x_test = np.reshape(x_test, (len(x_test), 1, x_test.shape[1]))
      temporal_x_test = np.repeat(temporal_x_test, timesteps, axis=1)

      temporal_y_train = np.reshape(y_train, (len(y_train), 1,
                                              y_train.shape[1]))
      temporal_y_train = np.repeat(temporal_y_train, timesteps, axis=1)
      temporal_y_test = np.reshape(y_test, (len(y_test), 1, y_test.shape[1]))
      temporal_y_test = np.repeat(temporal_y_test, timesteps, axis=1)

      temporal_sample_weight = np.reshape(sample_weight, (len(sample_weight),
                                                          1))
      temporal_sample_weight = np.repeat(
          temporal_sample_weight, timesteps, axis=1)

      model.compile(
          RMSPropOptimizer(learning_rate=learning_rate),
          loss='categorical_crossentropy',
          metrics=['acc', metrics_module.CategoricalAccuracy()],
          weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
          sample_weight_mode='temporal',
          run_eagerly=testing_utils.should_run_eagerly())

      model.fit(
          temporal_x_train,
          temporal_y_train,
          batch_size=batch_size,
          epochs=epochs // 3,
          verbose=0,
          sample_weight=temporal_sample_weight)
      model.fit(
          temporal_x_train,
          temporal_y_train,
          batch_size=batch_size,
          epochs=epochs // 3,
          verbose=0,
          sample_weight=temporal_sample_weight,
          validation_split=0.1)

      model.train_on_batch(
          temporal_x_train[:batch_size],
          temporal_y_train[:batch_size],
          sample_weight=temporal_sample_weight[:batch_size])
      model.test_on_batch(
          temporal_x_train[:batch_size],
          temporal_y_train[:batch_size],
          sample_weight=temporal_sample_weight[:batch_size])
      ref_score = model.evaluate(temporal_x_test, temporal_y_test, verbose=0)
      if not context.executing_eagerly():
        score = model.evaluate(
            temporal_x_test[test_ids], temporal_y_test[test_ids], verbose=0)
        self.assertLess(score[0], ref_score[0])

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types(exclude_models='sequential')
  def test_fit_with_incorrect_weights(self):
    input_a = layers_module.Input(shape=(3,), name='input_a')
    input_b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(2, name='output_1')
    dropout = layers_module.Dropout(0.5, name='output_2')
    branch_a = [input_a, dense]
    branch_b = [input_b, dense, dropout]

    model = testing_utils.get_multi_io_model(branch_a, branch_b)
    model.compile(
        optimizer='adam',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly())
    x = np.random.random((10, 3))
    y = np.random.random((10, 2))

    with self.assertRaises(ValueError):
      model.fit([x, x], [y, y], epochs=1, sample_weight={'unknown': x})

    with self.assertRaises(ValueError):
      model.fit([x, x], [y, y], epochs=1, class_weight={'unknown': 1})

  @keras_parameterized.run_all_keras_modes
  def test_default_sample_weight(self):
    """Verifies that fit works without having to set sample_weight."""
    num_classes = 5
    input_dim = 5
    timesteps = 3
    learning_rate = 0.001

    with self.cached_session():
      model = sequential.Sequential()
      model.add(
          layers_module.TimeDistributed(
              layers_module.Dense(num_classes),
              input_shape=(timesteps, input_dim)))

      x = np.random.random((10, timesteps, input_dim))
      y = np.random.random((10, timesteps, num_classes))
      optimizer = RMSPropOptimizer(learning_rate=learning_rate)

      # sample_weight_mode is a list and mode value is None
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode=[None],
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

      # sample_weight_mode is a list and mode value is `temporal`
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode=['temporal'],
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

      # sample_weight_mode is a dict and mode value is None
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode={'time_distributed': None},
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

      # sample_weight_mode is a dict and mode value is `temporal`
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode={'time_distributed': 'temporal'},
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

      # sample_weight_mode is not a list/dict and mode value is None
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode=None,
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

      # sample_weight_mode is not a list/dict and mode value is `temporal`
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode='temporal',
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, epochs=1, batch_size=10)

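  # Note: this runs in graph mode only (no run_all_keras_modes decorator).
  # The model is an identity mapping, so with inputs 0 and targets 1 every
  # per-sample absolute error is 1, and the total loss is just the mean of
  # the current sample-weight batch: (0 + .4 + 1 + 1) / 4, then
  # (2 + .4 + .3 + 1) / 4.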
  def test_sample_weight_tensor(self):
    """Tests that sample weight may be defined as a tensor in the graph."""
    with ops.get_default_graph().as_default():
      # Create a simple pass-through model
      inputs = layers_module.Input(shape=1, name='input_layer')
      model = training_module.Model(inputs=inputs, outputs=inputs)
      model.compile(
          loss='mean_absolute_error',
          optimizer='adam')

      # Prepare sample weights iterator tensor
      sample_weights = array_ops.constant(
          [[0, .4, 1, 1], [2, .4, .3, 1]])
      dataset = dataset_ops.Dataset.from_tensor_slices(sample_weights)
      sample_weights = dataset_ops.make_one_shot_iterator(dataset).get_next()
      sample_weights = training_utils_v1.standardize_sample_weights(
          sample_weights, model.output_names)

      # Update model loss with sample weight tensor.
      model._compile_weights_loss_and_weighted_metrics(sample_weights)

      feeds = {'input_layer:0': [[0], [0], [0], [0]],
               'input_layer_target:0': [[1], [1], [1], [1]]}
      with self.cached_session() as sess:
        self.assertAllClose(
            (.4 + 1 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))
        self.assertAllClose(
            (2 + .4 + .3 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))


@keras_parameterized.run_all_keras_modes
class MaskingTest(keras_parameterized.TestCase):
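  """Tests for masking during training and mask propagation to layers."""
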
  def _get_model(self, input_shape=None):
    layers = [
        layers_module.Masking(mask_value=0),
        layers_module.TimeDistributed(
            layers_module.Dense(1, kernel_initializer='one'))
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape)
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly())
    return model

  @keras_parameterized.run_with_all_model_types
  def test_masking(self):
    model = self._get_model(input_shape=(2, 1))
    x = np.array([[[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[1], [1]]])
    loss = model.train_on_batch(x, y)
    self.assertEqual(loss, 0)

  @keras_parameterized.run_with_all_model_types(exclude_models='functional')
  def test_masking_deferred(self):
    model = self._get_model()
    x = np.array([[[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[1], [1]]])
    loss = model.train_on_batch(x, y)
    self.assertEqual(loss, 0)

  def test_mask_argument_in_layer(self):
    # Test that the mask argument gets correctly passed to a layer in the
    # functional API.

    class CustomMaskedLayer(layers_module.Layer):

      def __init__(self):
        super(CustomMaskedLayer, self).__init__()
        self.supports_masking = True

      def call(self, inputs, mask=None):
        assert mask is not None
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    x = np.random.random((5, 3))
    inputs = layers_module.Input((3,))
    masked = layers_module.Masking(mask_value=0)(inputs)
    outputs = CustomMaskedLayer()(masked)

    model = training_module.Model(inputs, outputs)
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly())
    y = np.random.random((5, 3))
    model.train_on_batch(x, y)


@keras_parameterized.run_all_keras_modes
class TestDynamicTrainability(keras_parameterized.TestCase):
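  """Tests for toggling `trainable` on layers and models around compile."""
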
  def test_trainable_warning(self):
    x = np.random.random((5, 3))
    y = np.random.random((5, 2))

    model = sequential.Sequential()
    model.add(layers_module.Dense(2, input_dim=3))
    model.trainable = False
    model.compile(
        'rmsprop',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.trainable = True
    model.train_on_batch(x, y)
    self.assertRaises(Warning)

  def test_trainable_argument(self):
    with self.cached_session():
      x = np.random.random((5, 3))
      y = np.random.random((5, 2))

      model = sequential.Sequential()
      model.add(layers_module.Dense(2, input_dim=3, trainable=False))
      model.compile(
          'rmsprop',
          'mse',
          run_eagerly=testing_utils.should_run_eagerly())
      out = model.predict(x)
      model.train_on_batch(x, y)
      out_2 = model.predict(x)
      self.assertAllClose(out, out_2)

      # test with nesting
      inputs = layers_module.Input(shape=(3,))
      output = model(inputs)
      model = training_module.Model(inputs, output)
      model.compile(
          'rmsprop',
          'mse',
          run_eagerly=testing_utils.should_run_eagerly())
      out = model.predict(x)
      model.train_on_batch(x, y)
      out_2 = model.predict(x)
      self.assertAllClose(out, out_2)

  def test_layer_trainability_switch(self):
    # with constructor argument, in Sequential
    model = sequential.Sequential()
    model.add(layers_module.Dense(2, trainable=False, input_dim=1))
    self.assertListEqual(model.trainable_weights, [])

    # by setting the `trainable` argument, in Sequential
    model = sequential.Sequential()
    layer = layers_module.Dense(2, input_dim=1)
    model.add(layer)
    self.assertListEqual(model.trainable_weights, layer.trainable_weights)
    layer.trainable = False
    self.assertListEqual(model.trainable_weights, [])

    # with constructor argument, in Model
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(2, trainable=False)(x)
    model = training_module.Model(x, y)
    self.assertListEqual(model.trainable_weights, [])

    # by setting the `trainable` argument, in Model
    x = layers_module.Input(shape=(1,))
    layer = layers_module.Dense(2)
    y = layer(x)
    model = training_module.Model(x, y)
    self.assertListEqual(model.trainable_weights, layer.trainable_weights)
    layer.trainable = False
    self.assertListEqual(model.trainable_weights, [])

  def test_model_trainability_switch(self):
    # a non-trainable model has no trainable weights
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(2)(x)
    model = training_module.Model(x, y)
    model.trainable = False
    self.assertListEqual(model.trainable_weights, [])

    # same for Sequential
    model = sequential.Sequential()
    model.add(layers_module.Dense(2, input_dim=1))
    model.trainable = False
    self.assertListEqual(model.trainable_weights, [])

  def test_nested_model_trainability(self):
    # a Sequential inside a Model
    inner_model = sequential.Sequential()
    inner_model.add(layers_module.Dense(2, input_dim=1))

    x = layers_module.Input(shape=(1,))
    y = inner_model(x)
    outer_model = training_module.Model(x, y)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Sequential inside a Sequential
    inner_model = sequential.Sequential()
    inner_model.add(layers_module.Dense(2, input_dim=1))
    outer_model = sequential.Sequential()
    outer_model.add(inner_model)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Model inside a Model
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(2)(x)
    inner_model = training_module.Model(x, y)
    x = layers_module.Input(shape=(1,))
    y = inner_model(x)
    outer_model = training_module.Model(x, y)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Model inside a Sequential
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(2)(x)
    inner_model = training_module.Model(x, y)
    outer_model = sequential.Sequential()
    outer_model.add(inner_model)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

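  # A GAN-style workflow relies on `trainable` being captured per compile:
  # the same shared layer can be frozen in one compiled model and trainable
  # in another, as the two train_on_batch calls below demonstrate.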
  def test_gan_workflow(self):
    shared_layer = layers_module.BatchNormalization()

    inputs1 = input_layer.Input(10)
    outputs1 = shared_layer(inputs1)
    model1 = training_module.Model(inputs1, outputs1)
    shared_layer.trainable = False
    model1.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs2 = input_layer.Input(10)
    outputs2 = shared_layer(inputs2)
    model2 = training_module.Model(inputs2, outputs2)
    shared_layer.trainable = True
    model2.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())

    x, y = np.ones((10, 10)), np.ones((10, 10))

    out1_0 = model1.predict_on_batch(x)
    model1.train_on_batch(x, y)
    out1_1 = model1.predict_on_batch(x)
    self.assertAllClose(out1_0, out1_1)

    out2_0 = model2.predict_on_batch(x)
    model2.train_on_batch(x, y)
    out2_1 = model2.predict_on_batch(x)
    self.assertNotAllClose(out2_0, out2_1)

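  # Toggling `trainable` after `compile` has no effect until the model is
  # recompiled: dense_0 stays frozen at [1., 1.] even though it is flipped
  # back to trainable before the second train step. With SGD(0.01), x = 1
  # and y = 7, dense_1's weights move 1.0 -> 1.06 -> 1.1176 over two steps.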
  def test_toggle_value(self):
    input_0 = layers_module.Input(shape=(1,))
    dense_0 = layers_module.Dense(
        1, kernel_initializer='ones', bias_initializer='ones')
    dense_1 = layers_module.Dense(
        1, kernel_initializer='ones', bias_initializer='ones')
    result = layers_module.Add()([dense_0(input_0), dense_1(input_0)])
    model = training_module.Model(input_0, result)
    dense_0.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones((10, 1))
    y = 5 * x + 2
    model.train_on_batch(x, y)
    dense_0.trainable = True
    model.train_on_batch(x, y)
    kernel, bias = dense_0.get_weights()
    self.assertAllEqual([kernel[0, 0], bias[0]], [1., 1.])

    kernel, bias = dense_1.get_weights()
    self.assertAllClose([kernel[0, 0], bias[0]], [1.1176, 1.1176])


class TestTrainingWithDataTensors(keras_parameterized.TestCase):

  def test_training_and_eval_methods_on_symbolic_tensors_single_io(self):
    with ops.Graph().as_default():
      x = layers_module.Input(shape=(3,), name='input')
      y = layers_module.Dense(4, name='dense')(x)
      model = training_module.Model(x, y)

      optimizer = RMSPropOptimizer(learning_rate=0.001)
      loss = 'mse'
      model.compile(
          optimizer,
          loss,
          metrics=['mae', metrics_module.CategoricalAccuracy()])

      inputs = backend.zeros(shape=(10, 3))
      targets = backend.zeros(shape=(10, 4))

      model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
      model.evaluate(inputs, targets, steps=2, verbose=0)
      model.predict(inputs, steps=2)
      model.train_on_batch(inputs, targets)
      model.test_on_batch(inputs, targets)
      model.fit(inputs, targets,
                epochs=1, steps_per_epoch=2, verbose=0,
                validation_data=(inputs, targets), validation_steps=2)

      # Test with dynamic shape
      inputs = array_ops.placeholder_with_default(
          np.zeros((2, 3)), shape=tensor_shape.TensorShape([None, 3]))
      targets = array_ops.placeholder_with_default(
          np.zeros((2, 4)), shape=tensor_shape.TensorShape([None, 4]))
      self.assertEqual(inputs.shape.dims[0].value, None)
      model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
      model.evaluate(inputs, targets, steps=2, verbose=0)
      model.predict(inputs, steps=2)
      model.train_on_batch(inputs, targets)
      model.test_on_batch(inputs, targets)
      model.fit(inputs, targets,
                epochs=1, steps_per_epoch=2, verbose=0,
                validation_data=(inputs, targets), validation_steps=2)

  def test_training_and_eval_methods_on_symbolic_tensors_multi_io(self):
    a = layers_module.Input(shape=(3,), name='input_a')
    b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = layers_module.Dropout(0.5, name='dropout')(c)

    model = training_module.Model([a, b], [d, e])

    optimizer = 'rmsprop'
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=['mae', metrics_module.CategoricalAccuracy()],
        loss_weights=loss_weights)

    input_a_tf = array_ops.zeros(shape=(10, 3))
    input_b_tf = array_ops.zeros(shape=(10, 3))

    output_d_tf = array_ops.zeros(shape=(10, 4))
    output_e_tf = array_ops.zeros(shape=(10, 4))

    model.fit([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
              epochs=1,
              steps_per_epoch=2,
              verbose=0)
    model.train_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])

    # Test with dictionary inputs
    model.fit({
        'input_a': input_a_tf,
        'input_b': input_b_tf
    }, {
        'dense': output_d_tf,
        'dropout': output_e_tf
    },
              epochs=1,
              steps_per_epoch=2,
              verbose=0)
    model.fit({
        'input_a': input_a_tf,
        'input_b': input_b_tf
    }, {
        'dense': output_d_tf,
        'dropout': output_e_tf
    },
              validation_data=({
                  'input_a': input_a_tf,
                  'input_b': input_b_tf
              }, {
                  'dense': output_d_tf,
                  'dropout': output_e_tf
              }),
              epochs=1,
              steps_per_epoch=2,
              validation_steps=2,
              verbose=0)
    model.train_on_batch({
        'input_a': input_a_tf,
        'input_b': input_b_tf
    }, {
        'dense': output_d_tf,
        'dropout': output_e_tf
    })

    # Test with validation data
    model.fit([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
              validation_data=([input_a_tf,
                                input_b_tf], [output_d_tf, output_e_tf]),
              epochs=1,
              steps_per_epoch=2,
              validation_steps=2,
              verbose=0)
    # Test evaluation / prediction methods
    model.evaluate([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
                   steps=2,
                   verbose=0)
    model.predict([input_a_tf, input_b_tf], steps=2)
    model.test_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])

  @tf_test_util.run_deprecated_v1
  def test_model_with_input_feed_tensor(self):
    """We test building a model with a TF variable as input.

    We should be able to call fit, evaluate, predict,
    by only passing them data for the placeholder inputs
    in the model.
    """
    with ops.Graph().as_default(), self.cached_session():
      input_a_np = np.random.random((10, 3))
      input_b_np = np.random.random((10, 3))

      output_a_np = np.random.random((10, 4))
      output_b_np = np.random.random((10, 3))

      input_v = variables_lib.Variable(input_a_np, dtype='float32')
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = input_layer.Input(tensor=input_v)
      b = input_layer.Input(shape=(3,), name='input_b')

      a_2 = layers_module.Dense(4, name='dense_1')(a)
      dp = layers_module.Dropout(0.5, name='dropout')
      b_2 = dp(b)

      model = training_module.Model([a, b], [a_2, b_2])
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      loss_weights = [1., 0.5]
      model.compile(optimizer, loss, metrics=['mean_squared_error'],
                    loss_weights=loss_weights,
                    sample_weight_mode=None)

      # test train_on_batch
      out = model.train_on_batch(input_b_np,
                                 [output_a_np, output_b_np])
      out = model.train_on_batch({'input_b': input_b_np},
                                 [output_a_np, output_b_np])
      out = model.test_on_batch({'input_b': input_b_np},
                                [output_a_np, output_b_np])
      out = model.predict_on_batch({'input_b': input_b_np})

      # test fit
      out = model.fit({'input_b': input_b_np},
                      [output_a_np, output_b_np], epochs=1, batch_size=10)
      out = model.fit(input_b_np,
                      [output_a_np, output_b_np], epochs=1, batch_size=10)

      # test evaluate
      out = model.evaluate({'input_b': input_b_np},
                           [output_a_np, output_b_np], batch_size=10)
      out = model.evaluate(input_b_np,
                           [output_a_np, output_b_np], batch_size=10)

      # test predict
      out = model.predict({'input_b': input_b_np}, batch_size=10)
      out = model.predict(input_b_np, batch_size=10)
      self.assertEqual(len(out), 2)

      # Now test a model with a single input
      # i.e. we don't pass any data to fit the model.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = input_layer.Input(tensor=input_v)
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      a_2 = layers_module.Dropout(0.5, name='dropout')(a_2)
      model = training_module.Model(a, a_2)
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      model.compile(optimizer, loss, metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.test_on_batch(None,
                                output_a_np)
      out = model.predict_on_batch(None)
      out = model.train_on_batch([],
                                 output_a_np)
      out = model.train_on_batch({},
                                 output_a_np)

      # test fit
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)

      # test evaluate
      _ = model.evaluate(None, output_a_np, steps=3)
      _ = model.evaluate(None, output_a_np, steps=3)

      # test predict
      out = model.predict(None, steps=3)
      out = model.predict(None, steps=3)
      self.assertEqual(out.shape, (10 * 3, 4))

      # Same, without learning phase
      # i.e. we don't pass any data to fit the model.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = input_layer.Input(tensor=input_v)
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      model = training_module.Model(a, a_2)
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      model.compile(optimizer, loss, metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.test_on_batch(None,
                                output_a_np)
      out = model.predict_on_batch(None)
      out = model.train_on_batch([],
                                 output_a_np)
      out = model.train_on_batch({},
                                 output_a_np)

      # test fit
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)

      # test evaluate
      _ = model.evaluate(None, output_a_np, steps=10)
      _ = model.evaluate(None, output_a_np, steps=10)

      # test predict
      out = model.predict(None, steps=3)
      out = model.predict(None, steps=3)
      self.assertEqual(out.shape, (10 * 3, 4))

  @keras_parameterized.run_all_keras_modes
  def test_model_with_partial_loss(self):
    with self.cached_session():
      a = input_layer.Input(shape=(3,), name='input_a')
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      dp = layers_module.Dropout(0.5, name='dropout')
      a_3 = dp(a_2)
      model = training_module.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      loss = {'dropout': 'mse'}
      model.compile(optimizer, loss, metrics=['mae'])

      input_a_np = np.random.random((10, 3))
      output_a_np = np.random.random((10, 4))

      # test train_on_batch
      _ = model.train_on_batch(input_a_np, output_a_np)
      _ = model.test_on_batch(input_a_np, output_a_np)
      # fit
      _ = model.fit(input_a_np, output_a_np)
      # evaluate
      _ = model.evaluate(input_a_np, output_a_np)

      # Same without dropout.
      a = input_layer.Input(shape=(3,), name='input_a')
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      a_3 = layers_module.Dense(4, name='dense_2')(a_2)
      model = training_module.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      loss = {'dense_2': 'mse'}
      model.compile(optimizer, loss, metrics={'dense_1': 'mae'})

      # test train_on_batch
      _ = model.train_on_batch(input_a_np, output_a_np)
      _ = model.test_on_batch(input_a_np, output_a_np)
      # fit
      _ = model.fit(input_a_np, output_a_np)
      # evaluate
      _ = model.evaluate(input_a_np, output_a_np)

  def test_model_with_external_loss(self):
    with ops.Graph().as_default(), self.cached_session():
      # None loss, only regularization loss.
      a = input_layer.Input(shape=(3,), name='input_a')
      a_2 = layers_module.Dense(
          4, name='dense_1', kernel_regularizer='l1', bias_regularizer='l2')(
              a)
      dp = layers_module.Dropout(0.5, name='dropout')
      a_3 = dp(a_2)

      model = training_module.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      loss = None
      model.compile(optimizer, loss, metrics=['mae'])

      input_a_np = np.random.random((10, 3))

      # test train_on_batch
      out = model.train_on_batch(input_a_np, None)
      out = model.test_on_batch(input_a_np, None)
      # fit
      out = model.fit(input_a_np, None)
      # evaluate
      out = model.evaluate(input_a_np, None)

      # No dropout, external loss.
      a = input_layer.Input(shape=(3,), name='input_a')
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      a_3 = layers_module.Dense(4, name='dense_2')(a)

      model = training_module.Model(a, [a_2, a_3])
      model.add_loss(backend.mean(a_3 + a_2))

      optimizer = 'rmsprop'
      loss = None
      model.compile(optimizer, loss, metrics=['mae'])

      # test train_on_batch
      out = model.train_on_batch(input_a_np, None)
      out = model.test_on_batch(input_a_np, None)
      # fit
      out = model.fit(input_a_np, None)
      # evaluate
      out = model.evaluate(input_a_np, None)

      # Test model with no external data at all.
      input_v = variables_lib.Variable(input_a_np, dtype='float32')
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = input_layer.Input(tensor=input_v)
      a_2 = layers_module.Dense(4, name='dense_1')(a)
      a_2 = layers_module.Dropout(0.5, name='dropout')(a_2)
      model = training_module.Model(a, a_2)
      model.add_loss(backend.mean(a_2))

      model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None, None)
      out = model.test_on_batch(None, None)
      out = model.predict_on_batch(None)

      # Test multi-output model with no external data at all.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = input_layer.Input(tensor=input_v)
      a_1 = layers_module.Dense(4, name='dense_1')(a)
      a_2 = layers_module.Dropout(0.5, name='dropout')(a_1)
      model = training_module.Model(a, [a_1, a_2])
      model.add_loss(backend.mean(a_2))

      model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None, None)
      out = model.test_on_batch(None, None)
      out = model.predict_on_batch(None)

      out = model.predict(None, steps=3)
      self.assertEqual(len(out), 2)
      self.assertEqual(out[0].shape, (10 * 3, 4))
      self.assertEqual(out[1].shape, (10 * 3, 4))

  def test_target_tensors(self):
    with ops.Graph().as_default(), self.cached_session():
      # single-output, as list
      model = sequential.Sequential()
      model.add(layers_module.Dense(4, input_shape=(4,), name='dense'))
      input_val = np.random.random((10, 4))
      target_val = np.random.random((10, 4))
      target = backend.variable(target_val)
      model.compile(optimizer='rmsprop', loss='mse', target_tensors=[target])
      model.train_on_batch(input_val, None)

      # single-output, as single tensor
      model.compile(optimizer='rmsprop', loss='mse', target_tensors=target)
      model.train_on_batch(input_val, None)

      # single-output, as dict
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense': target})
      model.train_on_batch(input_val, None)

      # test invalid arguments
      with self.assertRaises(TypeError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=set())
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=[target, target])
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors={'dense2': None})
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=[target])
        model.train_on_batch(input_val, target_val)

      # multi-output, as list
      input_val = np.random.random((10, 4))
      target_val_a = np.random.random((10, 4))
      target_val_b = np.random.random((10, 4))
      target_a = backend.variable(target_val_a)
      target_b = backend.variable(target_val_b)

      inputs = layers_module.Input(shape=(4,))
      output_a = layers_module.Dense(4, name='dense_a')(inputs)
      output_b = layers_module.Dense(4, name='dense_b')(inputs)
      model = training_module.Model(inputs, [output_a, output_b])
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors=[target_a, target_b])
      model.train_on_batch(input_val, None)

      # multi-output, as dict
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense_a': target_a,
                                    'dense_b': target_b})
      model.train_on_batch(input_val, None)

      # test with sample weights
      model.compile(
          optimizer='rmsprop',
          loss='mse',
          metrics=['mae', metrics_module.CategoricalAccuracy()],
          target_tensors=[target_a, target_b])
      model.train_on_batch(input_val, None,
                           sample_weight={'dense_a': np.random.random((10,))})

  def test_model_custom_target_tensors(self):
    with ops.Graph().as_default(), self.cached_session():
      a = input_layer.Input(shape=(3,), name='input_a')
      b = input_layer.Input(shape=(3,), name='input_b')

      a_2 = layers_module.Dense(4, name='dense_1')(a)
      dp = layers_module.Dropout(0.5, name='dropout')
      b_2 = dp(b)

      y = backend.placeholder([10, 4], name='y')
      y1 = backend.placeholder([10, 3], name='y1')
      y2 = backend.placeholder([7, 5], name='y2')
      model = training_module.Model([a, b], [a_2, b_2])

      optimizer = 'rmsprop'
      loss = 'mse'
      loss_weights = [1., 0.5]

      # test list of target tensors
      with self.assertRaises(ValueError):
        model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
                      sample_weight_mode=None, target_tensors=[y, y1, y2])
      model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
                    sample_weight_mode=None, target_tensors=[y, y1])
      input_a_np = np.random.random((10, 3))
      input_b_np = np.random.random((10, 3))

      output_a_np = np.random.random((10, 4))
      output_b_np = np.random.random((10, 3))

      _ = model.train_on_batch([input_a_np, input_b_np],
                               [output_a_np, output_b_np], {
                                   'dense_1': np.random.random((10,)),
                                   'dropout': np.random.random((10,))
                               })
      # test dictionary of target_tensors
      with self.assertRaises(ValueError):
        model.compile(optimizer, loss,
                      metrics=[],
                      loss_weights=loss_weights,
                      sample_weight_mode=None,
                      target_tensors={'does_not_exist': y2})
      # test dictionary of target_tensors
      model.compile(optimizer, loss,
                    metrics=[],
                    loss_weights=loss_weights,
                    sample_weight_mode=None,
                    target_tensors={'dense_1': y, 'dropout': y1})
      _ = model.train_on_batch([input_a_np, input_b_np],
                               [output_a_np, output_b_np], {
                                   'dense_1': np.random.random((10,)),
                                   'dropout': np.random.random((10,))
                               })

      # test with custom TF placeholder as target
      pl_target_a = array_ops.placeholder('float32', shape=(None, 4))
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense_1': pl_target_a})
      model.train_on_batch([input_a_np, input_b_np],
                           [output_a_np, output_b_np])


class TestTrainingWithMetrics(keras_parameterized.TestCase):
  """Training tests related to metrics."""

  @keras_parameterized.run_all_keras_modes
  def test_metrics_names(self):
    a = layers_module.Input(shape=(3,), name='input_a')
    b = layers_module.Input(shape=(3,), name='input_b')

    dense = layers_module.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = layers_module.Dropout(0.5, name='dropout')(c)

    model = training_module.Model([a, b], [d, e])

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    metrics = ['mse', metrics_module.BinaryAccuracy()]
    model.compile(
        optimizer,
        loss='mae',
        metrics=metrics,
        run_eagerly=testing_utils.should_run_eagerly())

    mse_metric = 'mse' if context.executing_eagerly() else 'mean_squared_error'
    reference_metric_names = [
        'loss', 'dense_loss', 'dropout_loss', 'dense_' + mse_metric,
        'dense_binary_accuracy', 'dropout_' + mse_metric,
        'dropout_binary_accuracy'
    ]

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
              epochs=1,
              batch_size=5)
    self.assertEqual(reference_metric_names, model.metrics_names)

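  # Metric objects are reset between `fit` and `evaluate`, so
  # `BinaryAccuracy.count` only reflects the most recent call: 100 samples
  # from the last training epoch, then 10 samples from `evaluate`.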
  @keras_parameterized.run_all_keras_modes
  def test_metric_state_reset_between_fit_and_evaluate(self):
    model = sequential.Sequential()
    model.add(layers_module.Dense(3, activation='relu', input_dim=4))
    model.add(layers_module.Dense(1, activation='sigmoid'))
    acc_obj = metrics_module.BinaryAccuracy()
    model.compile(
        loss='mae',
        metrics=[acc_obj],
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly())

    x_train = np.random.random((100, 4))
    y_train = np.random.random((100, 1))
    model.fit(x_train, y_train, batch_size=5, epochs=2)
    self.assertEqual(self.evaluate(acc_obj.count), 100)

    x_test = np.random.random((10, 4))
    y_test = np.random.random((10, 1))
    model.evaluate(x_test, y_test, batch_size=5)
    self.assertEqual(self.evaluate(acc_obj.count), 10)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes
  def test_metrics_valid_compile_input_formats(self):
    inp_1 = layers_module.Input(shape=(1,), name='input_1')
    inp_2 = layers_module.Input(shape=(1,), name='input_2')
    x = layers_module.Dense(3, kernel_initializer='ones', trainable=False)
    out_1 = layers_module.Dense(
        1, kernel_initializer='ones', name='output_1', trainable=False)
    out_2 = layers_module.Dense(
        1, kernel_initializer='ones', name='output_2', trainable=False)

    branch_a = [inp_1, x, out_1]
    branch_b = [inp_2, x, out_2]
    model = testing_utils.get_multi_io_model(branch_a, branch_b)

    # list of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics=[metrics_module.MeanSquaredError()],
        weighted_metrics=[metrics_module.MeanSquaredError()],
        run_eagerly=testing_utils.should_run_eagerly())

    # list of list of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics=[
            metrics_module.MeanSquaredError(),
            [metrics_module.MeanSquaredError(),
             metrics_module.Accuracy()]
        ],
        weighted_metrics=[
            metrics_module.MeanSquaredError(),
            [metrics_module.MeanSquaredError(),
             metrics_module.Accuracy()]
        ],
        run_eagerly=testing_utils.should_run_eagerly())

    # dict of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics={
            'output_1':
                metrics_module.MeanSquaredError(),
            'output_2': [
                metrics_module.MeanSquaredError(),
                metrics_module.Accuracy()
            ],
        },
        weighted_metrics={
            'output_1':
                metrics_module.MeanSquaredError(),
            'output_2': [
                metrics_module.MeanSquaredError(),
                metrics_module.Accuracy()
            ],
        },
        run_eagerly=testing_utils.should_run_eagerly())

  @keras_parameterized.run_all_keras_modes
  def test_metrics_masking(self):
    np.random.seed(1337)
    model = sequential.Sequential()
    model.add(layers_module.Masking(mask_value=0, input_shape=(2, 1)))
    model.add(
        layers_module.TimeDistributed(
            layers_module.Dense(1, kernel_initializer='ones')))
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss='mse',
        weighted_metrics=['accuracy'],
        run_eagerly=testing_utils.should_run_eagerly())

    # verify that masking is applied.
    x = np.array([[[1], [1]], [[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[0], [1]], [[1], [1]]])
    scores = model.train_on_batch(x, y)
    self.assertArrayNear(scores, [0.25, 0.75], 0.1)

    # verify that masking is combined with sample weights.
    w = np.array([3, 2, 4])
    scores = model.train_on_batch(x, y, sample_weight=w)
    self.assertArrayNear(scores, [0.3328, 0.8], 0.001)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_with_tensor_on_model(self):
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(1, kernel_initializer='ones')(x)
    model = training_module.Model(x, y)
    model.add_metric(
        math_ops.reduce_sum(y), name='metric_1', aggregation='mean')

    if context.executing_eagerly():
      # This is not a use case in v1 graph mode.
      mean_result = metrics_module.Mean()(y)
      with self.assertRaisesRegex(
          ValueError, 'Expected a symbolic Tensor for the metric value'):
        model.add_metric(mean_result, name='metric_2')
    else:
      with self.assertRaisesRegex(
          ValueError, 'Using the result of calling a `Metric` object '):
        with backend.get_graph().as_default():
          model.add_metric(metrics_module.Mean(name='metric_2')(y))

    model.compile(
        'sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.ones(shape=(10, 1))
    targets = np.ones(shape=(10, 1))
    history = model.fit(
        inputs,
        targets,
        epochs=2,
        batch_size=5,
        validation_data=(inputs, targets))
    self.assertEqual(history.history['metric_1'][-1], 5)
    self.assertEqual(history.history['val_metric_1'][-1], 5)

    eval_results = model.evaluate(inputs, targets, batch_size=5)
    self.assertEqual(eval_results[-1], 5)

    model.predict(inputs, batch_size=5)
    model.train_on_batch(inputs, targets)
    model.test_on_batch(inputs, targets)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_in_model_call(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
        self.mean = metrics_module.Mean(name='metric_1')

      def call(self, x):
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_2', aggregation='mean')
        # Provide same name as in the instance created in __init__
        # for eager mode
        self.add_metric(self.mean(x), name='metric_1')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['val_metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_2'][-1], 5, 0)
    self.assertAlmostEqual(history.history['val_metric_2'][-1], 5, 0)

    eval_results = model.evaluate(x, y, batch_size=5)
    self.assertAlmostEqual(eval_results[1], 1, 0)
    self.assertAlmostEqual(eval_results[2], 5, 0)

    model.predict(x, batch_size=5)
    model.train_on_batch(x, y)
    model.test_on_batch(x, y)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_add_metric_in_layer_call(self):

    class TestLayer(layers_module.Layer):

      def build(self, input_shape):
        self.a = self.add_variable(
            'a', (1, 1), initializer='ones', trainable=False)
        self.built = True

      def call(self, inputs):
        self.add_metric(
            math_ops.reduce_sum(inputs), name='metric_1', aggregation='mean')
        return inputs + 1

    layers = [
        TestLayer(input_shape=(1,)),
        layers_module.Dense(2, kernel_initializer='ones')
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertEqual(history.history['metric_1'][-1], 5)
    self.assertAlmostEqual(history.history['val_metric_1'][-1], 5, 0)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_model_metrics_list(self):

    class LayerWithAddMetric(layers_module.Layer):

      def __init__(self):
        super(LayerWithAddMetric, self).__init__()
        self.dense = layers_module.Dense(1, kernel_initializer='ones')

      def __call__(self, inputs):
        outputs = self.dense(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='metric_1', aggregation='mean')
        return outputs

    class LayerWithNestedAddMetricLayer(layers_module.Layer):

      def __init__(self):
        super(LayerWithNestedAddMetricLayer, self).__init__()
        self.layer = LayerWithAddMetric()

      def call(self, inputs):
        outputs = self.layer(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='metric_2', aggregation='mean')
        return outputs

    x = layers_module.Input(shape=(1,))
    y = LayerWithNestedAddMetricLayer()(x)

    model = training_module.Model(x, y)
    model.add_metric(
        math_ops.reduce_sum(y), name='metric_3', aggregation='mean')

    if context.executing_eagerly():
      # This is not a use case in v1 graph mode.
      mean_result = metrics_module.Mean()(y)
      with self.assertRaisesRegex(
          ValueError, 'Expected a symbolic Tensor for the metric value'):
        model.add_metric(mean_result, name='metric_4')

    else:
      with self.assertRaisesRegex(
          ValueError, 'Using the result of calling a `Metric` object '):
        with backend.get_graph().as_default():
          model.add_metric(metrics_module.Mean(name='metric_4')(y))

    model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('metric_4')],
        run_eagerly=testing_utils.should_run_eagerly())

    model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)

    # Verify that the metrics added using `compile` and `add_metric` API are
    # included
    self.assertEqual([m.name for m in model.metrics],
                     ['loss', 'metric_4', 'metric_2', 'metric_1', 'metric_3'])

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_model_metrics_list_in_call(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')

      def call(self, x):
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_1', aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        metrics=[metrics_module.Accuracy('acc')],
        run_eagerly=testing_utils.should_run_eagerly())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

    self.assertEqual([m.name for m in model.metrics],
                     ['loss', 'acc', 'metric_1'])

  @keras_parameterized.run_all_keras_modes
  def test_multiple_add_metric_calls(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
        self.mean1 = metrics_module.Mean(name='metric_1')
        self.mean2 = metrics_module.Mean(name='metric_2')

      def call(self, x):
        self.add_metric(self.mean2(x), name='metric_2')
        self.add_metric(self.mean1(x), name='metric_1')
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_3', aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    self.assertListEqual([m.name for m in model.metrics],
                         ['metric_1', 'metric_2'])
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_2'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_3'][-1], 5, 0)

    eval_results = model.evaluate(x, y, batch_size=5)
    self.assertArrayNear(eval_results[1:4], [1, 1, 5], 0.1)

    model.predict(x, batch_size=5)
    model.train_on_batch(x, y)
    model.test_on_batch(x, y)

  @keras_parameterized.run_all_keras_modes
  def test_multiple_add_metric_calls_layer(self):

    class TestLayer(layers_module.Layer):

      def __init__(self):
        super(TestLayer, self).__init__(name='test_layer')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
        self.m1 = metrics_module.Mean(name='m_1')
        self.m2 = [
            metrics_module.Mean(name='m_2'),
            metrics_module.Mean(name='m_3')
        ]
        self.m3 = {
            'mean4': metrics_module.Mean(name='m_4'),
            'mean5': metrics_module.Mean(name='m_5')
        }

      def call(self, x):
        self.add_metric(self.m2[0](x))
        self.add_metric(self.m2[1](x))
        self.add_metric(self.m1(x))
        self.add_metric(self.m3['mean4'](x))
        self.add_metric(self.m3['mean5'](x))
        self.add_metric(math_ops.reduce_sum(x), name='m_6', aggregation='mean')
        return self.dense1(x)

    layer = TestLayer()
    self.assertListEqual([m.name for m in layer.metrics],
                         ['m_1', 'm_2', 'm_3', 'm_4', 'm_5'])

    layer(np.ones((10, 10)))
    self.assertListEqual([m.name for m in layer.metrics],
                         ['m_1', 'm_2', 'm_3', 'm_4', 'm_5', 'm_6'])

  @keras_parameterized.run_all_keras_modes
  def test_duplicate_metric_name_in_add_metric(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
        self.mean = metrics_module.Mean(name='metric_1')
        self.mean2 = metrics_module.Mean(name='metric_1')

      def call(self, x):
        self.add_metric(self.mean(x), name='metric_1')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    with self.assertRaisesRegex(
        ValueError,
        'Please provide different names for the metrics you have added. '
        'We found 2 metrics with the name: "metric_1"'):
      model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_without_name(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')

      def call(self, x):
        self.add_metric(math_ops.reduce_sum(x), aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))

    with self.assertRaisesRegex(ValueError,
                                'Please provide a name for your metric like'):
      model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_correctness(self):
    inputs = input_layer.Input(shape=(1,))
    targets = input_layer.Input(shape=(1,))

    class Bias(layers_module.Layer):

      def build(self, input_shape):
        self.bias = self.add_variable('bias', (1,), initializer='zeros')
        self.mae = metrics_module.MeanAbsoluteError(name='mae_1')

      def call(self, inputs):
        inputs, targets = inputs
        outputs = inputs + self.bias
        self.add_metric(self.mae(targets, outputs), name='mae_1')
        return outputs

    outputs = Bias()([inputs, targets])
    model = training_module.Model([inputs, targets], outputs)

    model.add_metric(
        metrics_module.mean_absolute_error(targets, outputs),
        name='mae_2',
        aggregation='mean')

    model.compile(
        loss='mae',
        optimizer=optimizer_v2.gradient_descent.SGD(0.1),
        metrics=[metrics_module.MeanAbsoluteError(name='mae_3')],
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.array([[0.], [1.], [2.]])
    y = np.array([[0.5], [2.], [3.5]])
    history = model.fit([x, y], y, batch_size=3, epochs=5)

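    # Residuals y - x = [0.5, 1.0, 1.5] are all positive, so dMAE/dbias = -1
    # until the bias reaches 0.5. Full-batch SGD with lr=0.1 therefore raises
    # the bias by 0.1 per epoch, lowering the MAE (which the loss and all
    # three metrics track) by 0.1 per epoch from its initial value of 1.0.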
    expected_val = [1., 0.9, 0.8, 0.7, 0.6]
    for key in ['loss', 'mae_1', 'mae_2', 'mae_3']:
      self.assertAllClose(history.history[key], expected_val, 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_order(self):

    class MyLayer(layers_module.Layer):

      def call(self, inputs, training=None, mask=None):
        self.add_metric(
            array_ops.ones([32]) * 2.0, name='two', aggregation='mean')
        return inputs

    class MyModel(training_module.Model):

      def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        self._sampler = MyLayer(name='sampler')

      def call(self, inputs, training=None, mask=None):
        z = self._sampler(inputs)
        self.add_metric(
            array_ops.ones([32]) * 1.0, name='one', aggregation='mean')
        self.add_metric(
            array_ops.ones([32]) * 3.0, name='three', aggregation='mean')
        return z

    xdata = np.random.uniform(size=[32, 16]).astype(np.float32)
    dataset_train = dataset_ops.Dataset.from_tensor_slices((xdata, xdata))
    dataset_train = dataset_train.batch(32, drop_remainder=True)

    model = MyModel()
    model.compile(
        optimizer='sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly())
    history = model.fit(dataset_train, epochs=3)
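    # The model is the identity and the dataset reuses `xdata` as its labels,
    # so the MSE loss is exactly 0 while each metric reports its constant.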
    self.assertDictEqual(
        history.history, {
            'loss': [0.0, 0.0, 0.0],
            'three': [3.0, 3.0, 3.0],
            'two': [2.0, 2.0, 2.0],
            'one': [1.0, 1.0, 1.0]
        })

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_aggregation_mean(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')

      def call(self, x):
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_1', aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        'rmsprop', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    model.fit(np.ones(shape=(10, 1)), np.ones(shape=(10, 2)), batch_size=5)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_aggregation_none(self):

    class TestModel(training_module.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
        self.mean = metrics_module.Mean(name='metric_1')

      def call(self, x):
        self.add_metric(self.mean(x), name='metric_1', aggregation=None)
        return self.dense1(x)

    model = TestModel()
    model.compile(
        'rmsprop', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    model.fit(np.ones(shape=(10, 1)), np.ones(shape=(10, 2)), batch_size=5)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def DISABLED_test_add_metric_invalid_aggregation(self):
    # TODO(psv): Reenable test once it is fixed.
    x = layers_module.Input(shape=(1,))
    y = layers_module.Dense(1, kernel_initializer='ones')(x)
    model = training_module.Model(x, y)
    with self.assertRaisesRegex(ValueError,
                                'only `mean` sample-wise metric aggregation'):
      model.add_metric(
          math_ops.reduce_sum(y), name='metric_1', aggregation='sum')

    with self.assertRaisesRegex(ValueError,
                                'only `mean` sample-wise metric aggregation'):
      model.add_metric(
          math_ops.reduce_sum(y), name='metric_1', aggregation=None)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_model_with_nested_compiled_model(self):

    class LayerWithAddMetric(layers_module.Layer):

      def __init__(self):
        super(LayerWithAddMetric, self).__init__()
        self.dense = layers_module.Dense(1, kernel_initializer='ones')

      def call(self, inputs):
        outputs = self.dense(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='mean', aggregation='mean')
        return outputs

    x = layers_module.Input(shape=(1,))
    y = LayerWithAddMetric()(x)

    inner_model = training_module.Model(x, y)
    inner_model.add_metric(
        math_ops.reduce_sum(y), name='mean1', aggregation='mean')

    inner_model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('acc')],
        run_eagerly=testing_utils.should_run_eagerly())
    inner_model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)

    self.assertEqual([m.name for m in inner_model.metrics],
                     ['loss', 'acc', 'mean', 'mean1'])

    x = layers_module.Input(shape=[1])
    y = inner_model(x)
    outer_model = training_module.Model(x, y)
    outer_model.add_metric(
        math_ops.reduce_sum(y), name='mean2', aggregation='mean')

    outer_model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('acc2')],
        run_eagerly=testing_utils.should_run_eagerly())
    outer_model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)
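    # The outer model picks up the nested model's `add_metric` metrics
    # ('mean', 'mean1') alongside its own loss, compiled metric, and 'mean2';
    # the inner model's compiled 'acc' does not propagate outward.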
    self.assertEqual([m.name for m in outer_model.metrics],
                     ['loss', 'acc2', 'mean', 'mean1', 'mean2'])


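# Helper layers for `TestAutoUpdates` below: each increments a non-trainable
# `counter` variable once per call, via a bare `assign_add`, an `add_update`
# lambda, a nested layer, or a training-conditional branch.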
class BareUpdateLayer(layers_module.Layer):

  def build(self, input_shape):
    self.counter = self.add_weight(
        'counter',
        dtype='int32',
        shape=(),
        initializer='zeros',
        trainable=False)

  def call(self, inputs):
    state_ops.assign_add(self.counter, 1)
    return math_ops.cast(self.counter, inputs.dtype) * inputs


class LambdaUpdateLayer(layers_module.Layer):

  def build(self, input_shape):
    self.counter = self.add_weight(
        'counter',
        dtype='int32',
        shape=(),
        initializer='zeros',
        trainable=False)

  def call(self, inputs):
    # Make sure update isn't run twice.
    self.add_update(lambda: state_ops.assign_add(self.counter, 1))
    return math_ops.cast(self.counter, inputs.dtype) * inputs


class NestedUpdateLayer(layers_module.Layer):

  def build(self, input_shape):
    self.layer = BareUpdateLayer()
    self.layer.build(input_shape)

  @property
  def counter(self):
    return self.layer.counter

  def call(self, inputs):
    return self.layer(inputs)


class SubgraphUpdateLayer(layers_module.Layer):

  def build(self, input_shape):
    self.counter = self.add_weight(
        'counter',
        dtype='int32',
        shape=(),
        initializer='zeros',
        trainable=False)

  def call(self, inputs, training=None):
    if training is None:
      training = backend.learning_phase()

    if training:
      self.counter.assign(self.counter + 1)
    return inputs


@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class TestAutoUpdates(keras_parameterized.TestCase):

  @keras_parameterized.run_with_all_model_types
  @parameterized.named_parameters(
      ('bare_update', BareUpdateLayer),
      ('lambda_update', LambdaUpdateLayer),
      ('nested_update', NestedUpdateLayer))
  def test_updates_in_model(self, layer_builder):
    layer = layer_builder()
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_model_from_layers(
        [layer, layers_module.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(x, y, batch_size=2, epochs=1)
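    # 10 samples / batch_size 2 = 5 train steps, so the counter ends at 5.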
    self.assertEqual(self.evaluate(layer.counter), 5)

  @keras_parameterized.run_with_all_model_types
  def test_lambda_updates_trainable_false(self):
    x, y = np.ones((10, 10)), np.ones((10, 1))
    layer = LambdaUpdateLayer()
    model = testing_utils.get_model_from_layers(
        [layer, layers_module.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)
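    # Freezing the layer discards its `add_update` op, so another epoch of
    # training leaves the counter unchanged.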
    layer.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)

  @keras_parameterized.run_with_all_model_types
  def test_subgraph_updates_in_model(self):
    layer = SubgraphUpdateLayer()
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_model_from_layers(
        [layer, layers_module.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)

  @parameterized.named_parameters(
      ('bare_update', BareUpdateLayer),
      ('lambda_update', LambdaUpdateLayer),
      ('nested_update', NestedUpdateLayer))
  def test_updates_standalone_layer(self, layer_builder):
    layer = layer_builder()
    y = layer(np.ones((10, 10)))
    self.evaluate(layer.counter.initializer)
    self.evaluate(y)
    self.assertEqual(self.evaluate(layer.counter), 1)

  def test_trainable_false_standalone_layer(self):
    layer = LambdaUpdateLayer()
    y = layer(np.ones((10, 10)))
    self.evaluate(layer.counter.initializer)
    self.evaluate(y)
    self.assertEqual(self.evaluate(layer.counter), 1)
    layer.trainable = False
    y = layer(np.ones((10, 10)))
    self.evaluate(y)
    self.assertEqual(self.evaluate(layer.counter), 1)

  @keras_parameterized.run_with_all_model_types
  def test_batchnorm_trainable_false(self):
    bn = layers_module.BatchNormalization()
    model = testing_utils.get_model_from_layers([bn, layers_module.Dense(1)],
                                                input_shape=(10,))
    bn.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model.fit(x, y, batch_size=2, epochs=1)
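    # A frozen BatchNormalization layer must not update its moving statistics
    # during training, so they keep their initial zeros/ones values.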
    self.assertAllEqual(self.evaluate(bn.moving_mean), np.zeros((10,)))
    self.assertAllEqual(self.evaluate(bn.moving_variance), np.ones((10,)))


class TestFunctionTracing(keras_parameterized.TestCase):

  def _seq_model_and_data(self):
    model = sequential.Sequential([layers_module.Dense(4, activation='relu')])
    model.compile(loss='mse', optimizer='rmsprop')
    x = np.random.random((10, 6))
    y = np.random.random((10, 4))
    return model, x, y

  @keras_parameterized.run_all_keras_modes(
      always_skip_v1=True, always_skip_eager=True)
  def test_no_tracing_between_epoch(self):
    if sys.version_info[0] < 3:
      self.skipTest('self.assertLogs() call is not available in Python 2.')

    model, x, y = self._seq_model_and_data()

    logging.set_verbosity(1)
    with self.assertLogs(level=1) as logs:
      model.fit(x, y, epochs=10, batch_size=5, validation_data=(x, y))

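    # The count of FuncGraph creations does not grow with the number of
    # epochs: train/eval functions are traced up front, not retraced per
    # epoch.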
    new_func_graph = 'INFO:absl:Creating new FuncGraph for Python function'
    self.assertEqual(sum(new_func_graph in log for log in logs.output), 9)

  @keras_parameterized.run_all_keras_modes(
      always_skip_v1=True, always_skip_eager=True)
  def test_evaluate_no_cached_data(self):
    if sys.version_info[0] < 3:
      self.skipTest('self.assertLogs() call is not available in Python 2.')

    model, x, y = self._seq_model_and_data()

    new_func_graph = 'INFO:absl:Creating new FuncGraph for Python function'
    logging.set_verbosity(1)
    with self.assertLogs(level=1) as eval_logs:
      for _ in range(6):
        model.evaluate(x, y, batch_size=5)
    self.assertEqual(sum(new_func_graph in log for log in eval_logs.output), 20)


class TestBuildCustomModel(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes
  def test_build_list_of_inputs(self):

    class MyModel(training_module.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = layers_module.Dense(1)
        self.l2 = layers_module.Dense(2)

      def call(self, x):
        a, b = x
        return self.l1(a) + self.l2(b)

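    # `Model.build` accepts a nested structure of shapes mirroring what `call`
    # expects, so per-input shapes may be given as tuples or as lists.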
    # List of tuples
    model = MyModel()
    model.build([(None, 1), (None, 2)])
    self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
    self.assertEqual(model.l2.kernel.shape.as_list(), [2, 2])
    # List of lists
    model = MyModel()
    model.build([[None, 1], [None, 2]])
    self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
    self.assertEqual(model.l2.kernel.shape.as_list(), [2, 2])

  @keras_parameterized.run_all_keras_modes
  def test_build_single_inputs(self):

    class MyModel(training_module.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = layers_module.Dense(1)

      def call(self, x):
        return self.l1(x)

    model = MyModel()
    model.build((None, 1))
    self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
    model = MyModel()
    model.build([None, 1])
    self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])

  @keras_parameterized.run_all_keras_modes
  def test_build_dict_inputs(self):

    class MyModel(training_module.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = layers_module.Dense(1)

      def call(self, inputs):
        return self.l1(inputs['x'])

    model = MyModel()
    model.build({'x': [None, 16]})
    self.assertEqual(model.l1.kernel.shape.as_list(), [16, 1])


if __name__ == '__main__':
  test.main()