Split hdf5_format_test to save_test and save_weights_test.

hdf5_format_test tests both h5 and SavedModel formats. Splitting it into 2 files helps the test organization.

(because save_test already exists, the whole model saving tests had to be copied over manually. No changes have been made to the copied tests)

PiperOrigin-RevId: 345493852
Change-Id: I5893c027ba85ffc5931dd0963a7068a71fc82a5c
This commit is contained in:
Katherine Wu 2020-12-03 11:19:30 -08:00 committed by TensorFlower Gardener
parent bb2e09ad72
commit 6c9d159c0a
3 changed files with 664 additions and 654 deletions

View File

@ -111,9 +111,9 @@ tf_py_test(
)
tf_py_test(
name = "hdf5_format_test",
name = "save_weights_test",
size = "medium",
srcs = ["hdf5_format_test.py"],
srcs = ["save_weights_test.py"],
python_version = "PY3",
shard_count = 4,
tags = [
@ -134,6 +134,7 @@ tf_py_test(
size = "medium",
srcs = ["save_test.py"],
python_version = "PY3",
shard_count = 4,
deps = [
"//tensorflow/python:client_testlib",
"//tensorflow/python/feature_column:feature_column_v2",

View File

@ -19,17 +19,25 @@ from __future__ import division
from __future__ import print_function
import os
import shutil
import sys
import tempfile
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python import tf2
from tensorflow.python.eager import context
from tensorflow.python.feature_column import feature_column_lib
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import losses
from tensorflow.python.keras import optimizer_v1
from tensorflow.python.keras import optimizers
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.feature_column import dense_features
@ -38,9 +46,13 @@ from tensorflow.python.keras.layers import core
from tensorflow.python.keras.saving import model_config
from tensorflow.python.keras.saving import save
from tensorflow.python.keras.utils import generic_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import lookup_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
from tensorflow.python.saved_model import loader_impl
from tensorflow.python.training import training as training_module
if sys.version_info >= (3, 6):
import pathlib # pylint:disable=g-import-not-at-top
@ -309,5 +321,652 @@ class TestSaveModel(test.TestCase, parameterized.TestCase):
_ = save.load_model(filepath, compile=True)
@keras_parameterized.run_with_all_saved_model_formats
class TestWholeModelSaving(keras_parameterized.TestCase):
def _save_model_dir(self, dirname='saved_model'):
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
return os.path.join(temp_dir, dirname)
def _assert_same_weights_and_metrics(self, model, loaded_model):
"""Checks that the loaded weights and metrics are the same as the original.
Args:
model: original model
loaded_model: loaded model
"""
self.assertAllClose(model.weights, loaded_model.weights)
if loaded_model.optimizer:
if testing_utils.get_save_format() == 'tf':
# TODO(b/153110928): Keras TF format doesn't restore optimizer weights
# currently.
return
self.assertAllClose(model.optimizer.weights,
loaded_model.optimizer.weights)
# In V1/Graph mode, the model isn't built, so the metrics are not loaded
# immediately (requires model to be called on some data before building
# metrics).
check_metrics = tf2.enabled() and context.executing_eagerly()
if check_metrics:
self.assertAllEqual([m.name for m in model.metrics],
[m.name for m in loaded_model.metrics])
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_save_and_load(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
save_kwargs = testing_utils.get_save_kwargs()
if ((save_format == 'h5' or not save_kwargs.get('save_traces', True)) and
testing_utils.get_model_type() == 'subclass'):
# HDF5 format currently does not allow saving subclassed models.
# When saving with `save_traces=False`, the subclassed model must have a
# get_config/from_config, which the autogenerated model does not have.
return
with self.cached_session():
model = testing_utils.get_model_from_layers(
[keras.layers.Dense(2),
keras.layers.RepeatVector(3),
keras.layers.TimeDistributed(keras.layers.Dense(3))],
input_shape=(3,))
model.compile(
loss=keras.losses.MSE,
optimizer=keras.optimizer_v2.rmsprop.RMSprop(lr=0.0001),
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalCrossentropy(
name='cce', label_smoothing=constant_op.constant(0.2)),
],
weighted_metrics=[
keras.metrics.categorical_crossentropy,
keras.metrics.CategoricalCrossentropy(
name='cce', label_smoothing=constant_op.constant(0.2)),
],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(
model, saved_model_dir, save_format=save_format,
**save_kwargs)
loaded_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, loaded_model)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
eval_out = model.evaluate(x, y)
eval_out2 = loaded_model.evaluate(x, y)
self.assertArrayNear(eval_out, eval_out2, 0.001)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sequential_model_saving_without_input_shape(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
model.compile(
loss=keras.losses.MSE,
optimizer='rmsprop',
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy(name='cat_acc')
],
weighted_metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy(name='cat_acc2')
],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
model.save(saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sequential_model_saving_without_compile(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
x = np.random.random((1, 3))
out = model.predict(x)
# Save the model without any compilation or training.
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_sequential_model_saving_2(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with ops.Graph().as_default(), self.cached_session():
# test with custom optimizer, loss
class CustomOp(optimizer_v1.RMSprop):
pass
def custom_loss(y_true, y_pred):
return keras.losses.mse(y_true, y_pred)
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss=custom_loss, optimizer=CustomOp(), metrics=['acc'])
x = np.random.random((1, 3))
y = np.random.random((1, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(
saved_model_dir,
custom_objects={'CustomOp': CustomOp,
'custom_loss': custom_loss})
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_saving_without_compilation(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_with_tf_optimizer(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse',
optimizer=training_module.AdadeltaOptimizer(0.1),
metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_right_after_compilation(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
if not ops.executing_eagerly_outside_functions():
model._make_train_function()
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_lambda_numpy_array_arguments(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
if h5py is None:
self.skipTest('h5py required to run this test')
mean = np.random.random((4, 2, 3))
std = np.abs(np.random.random((4, 2, 3))) + 1e-5
inputs = keras.layers.Input(shape=(4, 2, 3))
output = keras.layers.Lambda(lambda image, mu, std: (image - mu) / std,
arguments={'mu': mean, 'std': std})(inputs)
model = keras.models.Model(inputs, output)
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
self.assertAllClose(mean, model.layers[1].arguments['mu'])
self.assertAllClose(std, model.layers[1].arguments['std'])
def test_saving_model_with_long_layer_names(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
# This layer name will make the `layers_name` HDF5 attribute blow
# out of proportion. Note that it fits into the internal HDF5
# attribute memory limit on its own but because h5py converts
# the list of layer names into numpy array, which uses the same
# amount of memory for every item, it increases the memory
# requirements substantially.
x = keras.Input(shape=(2,), name='input_' + ('x' * (2**15)))
f = x
for i in range(4):
f = keras.layers.Dense(2, name='dense_%d' % (i,))(f)
model = keras.Model(inputs=[x], outputs=[f])
model.compile(
'adam', loss=keras.losses.MeanSquaredError(), metrics=['acc'])
x = np.random.random((1, 2))
y = np.random.random((1, 2))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
if save_format in ['tf', 'tensorflow']:
return
# Check that the HDF5 files contains chunked array
# of layer names.
with h5py.File(saved_model_dir, 'r') as h5file:
num_names_arrays = len([attr for attr in h5file['model_weights'].attrs
if attr.startswith('layer_names')])
# The chunking of layer names array should have happened.
self.assertGreater(num_names_arrays, 0)
out2 = model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_saving_model_with_long_weights_names(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
x = keras.Input(shape=(2,), name='nested_model_input')
f = x
for i in range(4):
f = keras.layers.Dense(2, name='nested_model_dense_%d' % (i,))(f)
# This layer name will make the `weights_name`
# HDF5 attribute blow out of proportion.
f = keras.layers.Dense(2, name='nested_model_output' + ('x' * (2**14)))(f)
nested_model = keras.Model(inputs=[x], outputs=[f], name='nested_model')
x = keras.Input(shape=(2,), name='outer_model_input')
f = nested_model(x)
f = keras.layers.Dense(2, name='outer_model_output')(f)
model = keras.Model(inputs=[x], outputs=[f])
model.compile(loss='mse', optimizer='adam', metrics=['acc'])
x = np.random.random((1, 2))
y = np.random.random((1, 2))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
if save_format in ['h5', 'hdf5', 'keras']:
# Check that the HDF5 files contains chunked array
# of weight names.
with h5py.File(saved_model_dir, 'r') as h5file:
num_weight_arrays = len(
[attr for attr in h5file['model_weights']['nested_model'].attrs
if attr.startswith('weight_names')])
# The chunking of layer names array should have happened.
self.assertGreater(num_weight_arrays, 0)
out2 = model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_model_saving_to_pre_created_h5py_file(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with ops.Graph().as_default(), self.cached_session():
inputs = keras.Input(shape=(3,))
x = keras.layers.Dense(2)(inputs)
outputs = keras.layers.Dense(3)(x)
model = keras.Model(inputs, outputs)
model.compile(
loss=keras.losses.MSE,
optimizer=optimizer_v1.Adam(),
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy()
])
x = np.random.random((1, 3))
y = np.random.random((1, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
loaded_model = keras.models.load_model(saved_model_dir)
out1 = loaded_model.predict(x)
self.assertAllClose(out, out1, atol=1e-05)
if save_format in ['tf', 'tensorflow']:
return
# Test h5 format specifically
fd, fname = tempfile.mkstemp('.h5')
with h5py.File(fname, mode='r+') as h5file:
keras.models.save_model(model, h5file)
loaded_model = keras.models.load_model(h5file)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
# Test non-default options in h5
with h5py.File('_', driver='core',
backing_store=False) as h5file:
keras.models.save_model(model, h5file)
loaded_model = keras.models.load_model(h5file)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
# Cleanup
os.close(fd)
os.remove(fname)
def test_model_saving_to_new_dir_path(self):
saved_model_dir = os.path.join(self._save_model_dir(), 'newdir',
'saved_model')
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
x = np.random.random((1, 3))
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_model_raise_exception_with_failed_saving(self):
if h5py is None:
self.skipTest('h5py required to run this test')
saved_model_dir = self._save_model_dir()
saved_model_path = os.path.join(saved_model_dir, 'saved_model.h5')
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
with self.assertRaisesRegex(OSError, 'Unable to create file'):
with h5py.File(saved_model_path, 'w'):
keras.models.save_model(model, saved_model_path)
def test_saving_constant_initializer_with_numpy(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(
keras.layers.Dense(
2,
input_shape=(3,),
kernel_initializer=keras.initializers.Constant(np.ones((3, 2)))))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_group_naming_h5py(self):
# Test saving model with layer which name is prefix to a previous layer
# name.
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir)
h5_path = os.path.join(temp_dir, 'test.h5')
input_layer = keras.layers.Input((None, None, 3), name='test_input')
x = keras.layers.Conv2D(1, 1, name='conv1/conv')(input_layer)
x = keras.layers.Activation('relu', name='conv1')(x)
model = keras.models.Model(inputs=input_layer, outputs=x)
model.save_weights(h5_path)
model.load_weights(h5_path)
def test_primitive_attrs_contain_no_extraneous_strings(self):
if h5py is None:
self.skipTest('h5py required to run this test')
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(1, input_shape=[2]))
model.save(saved_model_dir, save_format=save_format)
if save_format in ['tf', 'tensorflow']:
return
h5file = h5py.File(saved_model_dir, 'r')
self.assertRegex(h5file.attrs['keras_version'], r'^[\d]+\.[\d]+\.[\S]+$')
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_functional_model_with_custom_loss_and_metric(self):
def _make_model():
inputs = keras.Input(shape=(4,))
x = keras.layers.Dense(8, activation='relu')(inputs)
outputs = keras.layers.Dense(3, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
custom_loss = keras.layers.Lambda(lambda x: keras.backend.sum(x * x))(x)
model.add_loss(custom_loss)
model.add_metric(custom_loss, aggregation='mean', name='custom_loss')
return model
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = _make_model()
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=optimizers.gradient_descent_v2.SGD(),
metrics=[keras.metrics.SparseCategoricalCrossentropy()])
x = np.random.normal(size=(32, 4))
y = np.random.randint(0, 3, size=32)
model.train_on_batch(x, y)
evaluation_results = model.evaluate(x, y)
# Save and reload model.
model.save(saved_model_dir, save_format=save_format)
del model # Prevent misuse.
loaded_model = keras.models.load_model(saved_model_dir)
loaded_model_eval_results = loaded_model.evaluate(x, y)
# Assert all evaluation results are the same.
self.assertAllClose(evaluation_results, loaded_model_eval_results, 1e-9)
# Check correctness of the loss calculation.
self.assertAllGreater(evaluation_results, 0.)
evaluation_results = dict(
zip(loaded_model.metrics_names, evaluation_results))
self.assertNear(
evaluation_results['sparse_categorical_crossentropy'] +
evaluation_results['custom_loss'], evaluation_results['loss'], 1e-6)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_save_uncompiled_model_with_optimizer(self):
with self.cached_session() as session:
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential([keras.layers.Dense(1, input_shape=(3,))])
# Set the model's optimizer but don't compile. This can happen if the
# model is trained with a custom training loop.
model.optimizer = keras.optimizer_v2.rmsprop.RMSprop(lr=0.0001)
if not context.executing_eagerly():
session.run([v.initializer for v in model.variables])
model.save(saved_model_dir, save_format=save_format)
if save_format in ['tf', 'tensorflow']:
loaded = keras.models.load_model(saved_model_dir)
self.assertIsInstance(loaded.optimizer,
keras.optimizer_v2.optimizer_v2.OptimizerV2)
@combinations.generate(combinations.combine(mode=['eager']))
def test_functional_model_with_getitem_op_layer(self):
inp = keras.Input(shape=(8))
out = inp[:]
model = keras.Model(
inputs=[inp],
outputs=out)
batch_size = 7
x = array_ops.stack([
math_ops.range(8) for _ in range(batch_size)])
args = [x]
expected = x[:]
self.assertAllEqual(model(args), expected)
self.assertAllEqual(model.predict(args, batch_size=batch_size), expected)
# Make sure it can be successfully saved and loaded
save_format = testing_utils.get_save_format()
saved_model_dir = self._save_model_dir()
keras.models.save_model(model, saved_model_dir, save_format=save_format)
loaded_model = keras.models.load_model(saved_model_dir)
self.assertAllEqual(loaded_model(args), expected)
self.assertAllEqual(loaded_model.predict(args, batch_size=batch_size),
expected)
# Factory functions to create models that will be serialized inside a Network.
def _make_graph_network(input_size, output_size):
inputs = keras.Input(input_size)
x = keras.layers.Dense(8, activation='relu')(inputs)
y = keras.layers.Dense(output_size)(x)
return keras.Model(inputs=inputs, outputs=y)
def _make_sequential(input_size, output_size):
del input_size
return keras.Sequential([
keras.layers.Dense(8, activation='relu'),
keras.layers.Dense(output_size),
])
def _make_sequential_built(input_size, output_size):
model = _make_sequential(input_size, output_size)
model.build((None, input_size))
return model
def _make_sequential_graph_network(input_size, output_size):
return keras.Sequential([
keras.layers.InputLayer(input_size),
keras.layers.Dense(8, activation='relu'),
keras.layers.Dense(output_size),
])
def _make_sequential_input_shape(input_size, output_size):
return keras.Sequential([
keras.layers.Dense(8, activation='relu', input_shape=(input_size,)),
keras.layers.Dense(output_size),
])
class _make_subclassed(keras.Model): # pylint: disable=invalid-name
def __init__(self, input_size, output_size):
super(_make_subclassed, self).__init__()
self._config = {'input_size': input_size, 'output_size': output_size}
self._hidden_layer = keras.layers.Dense(8, activation='relu', name='hidden')
self._logits_layer = keras.layers.Dense(output_size, name='logits')
def call(self, inputs):
x = self._hidden_layer(inputs)
return self._logits_layer(x)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config):
return cls(**config)
class _make_subclassed_built(_make_subclassed): # pylint: disable=invalid-name
def __init__(self, input_size, output_size):
super(_make_subclassed_built, self).__init__(input_size, output_size)
self.build((None, input_size))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class TestWholeModelSavingWithNesting(test.TestCase, parameterized.TestCase):
"""Tests saving a whole model that contains other models."""
@parameterized.named_parameters([
('graph_network', _make_graph_network),
('sequential', _make_sequential),
('sequential_built', _make_sequential_built),
('sequential_graph_network', _make_sequential_graph_network),
('sequential_input_shape', _make_sequential_input_shape),
('subclassed', _make_subclassed),
('subclassed_built', _make_subclassed_built),
])
def test_functional(self, model_fn):
"""Tests serializing a model that uses a nested model to share weights."""
if h5py is None:
self.skipTest('h5py required to run this test')
def _make_model():
inputs = (keras.Input(shape=(4,), name='examples'),
keras.Input(shape=(4,), name='neighbors'))
base_model = model_fn(inputs[0].shape.as_list()[-1], 2)
outputs = keras.layers.add([base_model(inputs[0]), base_model(inputs[1])])
return keras.Model(inputs=inputs, outputs=outputs)
with self.cached_session():
x = (np.random.normal(size=(16, 4)).astype(np.float32),
np.random.normal(size=(16, 4)).astype(np.float32))
model = _make_model()
predictions = model(x)
# Save and reload.
model_path = os.path.join(self.get_temp_dir(), 'model.h5')
model.save(model_path)
del model
loaded_model = keras.models.load_model(
model_path,
custom_objects={
'_make_subclassed': _make_subclassed,
'_make_subclassed_built': _make_subclassed_built,
},
compile=False)
self.assertAllClose(loaded_model(x), predictions, 1e-9)
if __name__ == '__main__':
test.main()

View File

@ -20,14 +20,12 @@ from __future__ import print_function
import os
import shutil
import tempfile
import uuid
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python import tf2
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
@ -35,13 +33,11 @@ from tensorflow.python.framework import ops
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import optimizer_v1
from tensorflow.python.keras import optimizers
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import training
from tensorflow.python.keras.saving import hdf5_format
from tensorflow.python.lib.io import file_io
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
@ -244,7 +240,8 @@ class TestWeightSavingAndLoading(test.TestCase, parameterized.TestCase):
self.assertAllClose(y, ref_y)
@keras_parameterized.run_with_all_saved_model_formats
@keras_parameterized.run_with_all_saved_model_formats(
exclude_formats=['tf_no_traces'])
def test_nested_model_weight_loading(self):
save_format = testing_utils.get_save_format()
temp_dir = self.get_temp_dir()
@ -371,653 +368,6 @@ class TestWeightSavingAndLoading(test.TestCase, parameterized.TestCase):
keras.backend.get_value(model.layers[1].bias))
@keras_parameterized.run_with_all_saved_model_formats
class TestWholeModelSaving(keras_parameterized.TestCase):
def _save_model_dir(self, dirname='saved_model'):
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
return os.path.join(temp_dir, dirname)
def _assert_same_weights_and_metrics(self, model, loaded_model):
"""Checks that the loaded weights and metrics are the same as the original.
Args:
model: original model
loaded_model: loaded model
"""
self.assertAllClose(model.weights, loaded_model.weights)
if loaded_model.optimizer:
if testing_utils.get_save_format() == 'tf':
# TODO(b/153110928): Keras TF format doesn't restore optimizer weights
# currently.
return
self.assertAllClose(model.optimizer.weights,
loaded_model.optimizer.weights)
# In V1/Graph mode, the model isn't built, so the metrics are not loaded
# immediately (requires model to be called on some data before building
# metrics).
check_metrics = tf2.enabled() and context.executing_eagerly()
if check_metrics:
self.assertAllEqual([m.name for m in model.metrics],
[m.name for m in loaded_model.metrics])
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_save_and_load(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
save_kwargs = testing_utils.get_save_kwargs()
if ((save_format == 'h5' or not save_kwargs.get('save_traces', True)) and
testing_utils.get_model_type() == 'subclass'):
# HDF5 format currently does not allow saving subclassed models.
# When saving with `save_traces=False`, the subclassed model must have a
# get_config/from_config, which the autogenerated model does not have.
return
with self.cached_session():
model = testing_utils.get_model_from_layers(
[keras.layers.Dense(2),
keras.layers.RepeatVector(3),
keras.layers.TimeDistributed(keras.layers.Dense(3))],
input_shape=(3,))
model.compile(
loss=keras.losses.MSE,
optimizer=keras.optimizer_v2.rmsprop.RMSprop(lr=0.0001),
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalCrossentropy(
name='cce', label_smoothing=constant_op.constant(0.2)),
],
weighted_metrics=[
keras.metrics.categorical_crossentropy,
keras.metrics.CategoricalCrossentropy(
name='cce', label_smoothing=constant_op.constant(0.2)),
],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(
model, saved_model_dir, save_format=save_format,
**save_kwargs)
loaded_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, loaded_model)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
eval_out = model.evaluate(x, y)
eval_out2 = loaded_model.evaluate(x, y)
self.assertArrayNear(eval_out, eval_out2, 0.001)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sequential_model_saving_without_input_shape(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
model.compile(
loss=keras.losses.MSE,
optimizer='rmsprop',
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy(name='cat_acc')
],
weighted_metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy(name='cat_acc2')
],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
model.save(saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sequential_model_saving_without_compile(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
x = np.random.random((1, 3))
out = model.predict(x)
# Save the model without any compilation or training.
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_sequential_model_saving_2(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with ops.Graph().as_default(), self.cached_session():
# test with custom optimizer, loss
class CustomOp(optimizer_v1.RMSprop):
pass
def custom_loss(y_true, y_pred):
return keras.losses.mse(y_true, y_pred)
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss=custom_loss, optimizer=CustomOp(), metrics=['acc'])
x = np.random.random((1, 3))
y = np.random.random((1, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(
saved_model_dir,
custom_objects={'CustomOp': CustomOp,
'custom_loss': custom_loss})
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_saving_without_compilation(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_with_tf_optimizer(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse',
optimizer=training_module.AdadeltaOptimizer(0.1),
metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_right_after_compilation(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
if not ops.executing_eagerly_outside_functions():
model._make_train_function()
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_lambda_numpy_array_arguments(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
if h5py is None:
self.skipTest('h5py required to run this test')
mean = np.random.random((4, 2, 3))
std = np.abs(np.random.random((4, 2, 3))) + 1e-5
inputs = keras.layers.Input(shape=(4, 2, 3))
output = keras.layers.Lambda(lambda image, mu, std: (image - mu) / std,
arguments={'mu': mean, 'std': std})(inputs)
model = keras.models.Model(inputs, output)
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
self.assertAllClose(mean, model.layers[1].arguments['mu'])
self.assertAllClose(std, model.layers[1].arguments['std'])
def test_saving_model_with_long_layer_names(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
# This layer name will make the `layers_name` HDF5 attribute blow
# out of proportion. Note that it fits into the internal HDF5
# attribute memory limit on its own but because h5py converts
# the list of layer names into numpy array, which uses the same
# amount of memory for every item, it increases the memory
# requirements substantially.
x = keras.Input(shape=(2,), name='input_' + ('x' * (2**15)))
f = x
for i in range(4):
f = keras.layers.Dense(2, name='dense_%d' % (i,))(f)
model = keras.Model(inputs=[x], outputs=[f])
model.compile(
'adam', loss=keras.losses.MeanSquaredError(), metrics=['acc'])
x = np.random.random((1, 2))
y = np.random.random((1, 2))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
if save_format in ['tf', 'tensorflow']:
return
# Check that the HDF5 files contains chunked array
# of layer names.
with h5py.File(saved_model_dir, 'r') as h5file:
num_names_arrays = len([attr for attr in h5file['model_weights'].attrs
if attr.startswith('layer_names')])
# The chunking of layer names array should have happened.
self.assertGreater(num_names_arrays, 0)
out2 = model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_saving_model_with_long_weights_names(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
x = keras.Input(shape=(2,), name='nested_model_input')
f = x
for i in range(4):
f = keras.layers.Dense(2, name='nested_model_dense_%d' % (i,))(f)
# This layer name will make the `weights_name`
# HDF5 attribute blow out of proportion.
f = keras.layers.Dense(2, name='nested_model_output' + ('x' * (2**14)))(f)
nested_model = keras.Model(inputs=[x], outputs=[f], name='nested_model')
x = keras.Input(shape=(2,), name='outer_model_input')
f = nested_model(x)
f = keras.layers.Dense(2, name='outer_model_output')(f)
model = keras.Model(inputs=[x], outputs=[f])
model.compile(loss='mse', optimizer='adam', metrics=['acc'])
x = np.random.random((1, 2))
y = np.random.random((1, 2))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
if save_format in ['h5', 'hdf5', 'keras']:
# Check that the HDF5 files contains chunked array
# of weight names.
with h5py.File(saved_model_dir, 'r') as h5file:
num_weight_arrays = len(
[attr for attr in h5file['model_weights']['nested_model'].attrs
if attr.startswith('weight_names')])
# The chunking of layer names array should have happened.
self.assertGreater(num_weight_arrays, 0)
out2 = model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_model_saving_to_pre_created_h5py_file(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with ops.Graph().as_default(), self.cached_session():
inputs = keras.Input(shape=(3,))
x = keras.layers.Dense(2)(inputs)
outputs = keras.layers.Dense(3)(x)
model = keras.Model(inputs, outputs)
model.compile(
loss=keras.losses.MSE,
optimizer=optimizer_v1.Adam(),
metrics=[
keras.metrics.categorical_accuracy,
keras.metrics.CategoricalAccuracy()
])
x = np.random.random((1, 3))
y = np.random.random((1, 3))
model.train_on_batch(x, y)
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
loaded_model = keras.models.load_model(saved_model_dir)
out1 = loaded_model.predict(x)
self.assertAllClose(out, out1, atol=1e-05)
if save_format in ['tf', 'tensorflow']:
return
# Test h5 format specifically
fd, fname = tempfile.mkstemp('.h5')
with h5py.File(fname, mode='r+') as h5file:
keras.models.save_model(model, h5file)
loaded_model = keras.models.load_model(h5file)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
# Test non-default options in h5
with h5py.File('_', driver='core',
backing_store=False) as h5file:
keras.models.save_model(model, h5file)
loaded_model = keras.models.load_model(h5file)
out2 = loaded_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
# Cleanup
os.close(fd)
os.remove(fname)
def test_model_saving_to_new_dir_path(self):
saved_model_dir = os.path.join(self._save_model_dir(), 'newdir',
'saved_model')
save_format = testing_utils.get_save_format()
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
x = np.random.random((1, 3))
out = model.predict(x)
keras.models.save_model(model, saved_model_dir, save_format=save_format)
new_model = keras.models.load_model(saved_model_dir)
self._assert_same_weights_and_metrics(model, new_model)
out2 = new_model.predict(x)
self.assertAllClose(out, out2, atol=1e-05)
def test_model_raise_exception_with_failed_saving(self):
if h5py is None:
self.skipTest('h5py required to run this test')
saved_model_dir = self._save_model_dir()
saved_model_path = os.path.join(saved_model_dir, 'saved_model.h5')
with self.cached_session():
model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_shape=(3,)))
model.add(keras.layers.RepeatVector(3))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
with self.assertRaisesRegex(OSError, 'Unable to create file'):
with h5py.File(saved_model_path, 'w'):
keras.models.save_model(model, saved_model_path)
def test_saving_constant_initializer_with_numpy(self):
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(
keras.layers.Dense(
2,
input_shape=(3,),
kernel_initializer=keras.initializers.Constant(np.ones((3, 2)))))
model.add(keras.layers.Dense(3))
model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
keras.models.save_model(model, saved_model_dir, save_format=save_format)
model = keras.models.load_model(saved_model_dir)
def test_saving_group_naming_h5py(self):
# Test saving model with layer which name is prefix to a previous layer
# name.
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir)
h5_path = os.path.join(temp_dir, 'test.h5')
input_layer = keras.layers.Input((None, None, 3), name='test_input')
x = keras.layers.Conv2D(1, 1, name='conv1/conv')(input_layer)
x = keras.layers.Activation('relu', name='conv1')(x)
model = keras.models.Model(inputs=input_layer, outputs=x)
model.save_weights(h5_path)
model.load_weights(h5_path)
def test_primitive_attrs_contain_no_extraneous_strings(self):
if h5py is None:
self.skipTest('h5py required to run this test')
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential()
model.add(keras.layers.Dense(1, input_shape=[2]))
model.save(saved_model_dir, save_format=save_format)
if save_format in ['tf', 'tensorflow']:
return
h5file = h5py.File(saved_model_dir, 'r')
self.assertRegex(h5file.attrs['keras_version'], r'^[\d]+\.[\d]+\.[\S]+$')
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_functional_model_with_custom_loss_and_metric(self):
def _make_model():
inputs = keras.Input(shape=(4,))
x = keras.layers.Dense(8, activation='relu')(inputs)
outputs = keras.layers.Dense(3, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
custom_loss = keras.layers.Lambda(lambda x: keras.backend.sum(x * x))(x)
model.add_loss(custom_loss)
model.add_metric(custom_loss, aggregation='mean', name='custom_loss')
return model
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
with self.cached_session():
model = _make_model()
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=optimizers.gradient_descent_v2.SGD(),
metrics=[keras.metrics.SparseCategoricalCrossentropy()])
x = np.random.normal(size=(32, 4))
y = np.random.randint(0, 3, size=32)
model.train_on_batch(x, y)
evaluation_results = model.evaluate(x, y)
# Save and reload model.
model.save(saved_model_dir, save_format=save_format)
del model # Prevent misuse.
loaded_model = keras.models.load_model(saved_model_dir)
loaded_model_eval_results = loaded_model.evaluate(x, y)
# Assert all evaluation results are the same.
self.assertAllClose(evaluation_results, loaded_model_eval_results, 1e-9)
# Check correctness of the loss calculation.
self.assertAllGreater(evaluation_results, 0.)
evaluation_results = dict(
zip(loaded_model.metrics_names, evaluation_results))
self.assertNear(
evaluation_results['sparse_categorical_crossentropy'] +
evaluation_results['custom_loss'], evaluation_results['loss'], 1e-6)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_save_uncompiled_model_with_optimizer(self):
with self.cached_session() as session:
saved_model_dir = self._save_model_dir()
save_format = testing_utils.get_save_format()
model = keras.models.Sequential([keras.layers.Dense(1, input_shape=(3,))])
# Set the model's optimizer but don't compile. This can happen if the
# model is trained with a custom training loop.
model.optimizer = keras.optimizer_v2.rmsprop.RMSprop(lr=0.0001)
if not context.executing_eagerly():
session.run([v.initializer for v in model.variables])
model.save(saved_model_dir, save_format=save_format)
if save_format in ['tf', 'tensorflow']:
loaded = keras.models.load_model(saved_model_dir)
self.assertIsInstance(loaded.optimizer,
keras.optimizer_v2.optimizer_v2.OptimizerV2)
@combinations.generate(combinations.combine(mode=['eager']))
def test_functional_model_with_getitem_op_layer(self):
inp = keras.Input(shape=(8))
out = inp[:]
model = keras.Model(
inputs=[inp],
outputs=out)
batch_size = 7
x = array_ops.stack([
math_ops.range(8) for _ in range(batch_size)])
args = [x]
expected = x[:]
self.assertAllEqual(model(args), expected)
self.assertAllEqual(model.predict(args, batch_size=batch_size), expected)
# Make sure it can be successfully saved and loaded
save_format = testing_utils.get_save_format()
saved_model_dir = self._save_model_dir()
keras.models.save_model(model, saved_model_dir, save_format=save_format)
loaded_model = keras.models.load_model(saved_model_dir)
self.assertAllEqual(loaded_model(args), expected)
self.assertAllEqual(loaded_model.predict(args, batch_size=batch_size),
expected)
# Factory functions to create models that will be serialized inside a Network.
def _make_graph_network(input_size, output_size):
inputs = keras.Input(input_size)
x = keras.layers.Dense(8, activation='relu')(inputs)
y = keras.layers.Dense(output_size)(x)
return keras.Model(inputs=inputs, outputs=y)
def _make_sequential(input_size, output_size):
del input_size
return keras.Sequential([
keras.layers.Dense(8, activation='relu'),
keras.layers.Dense(output_size),
])
def _make_sequential_built(input_size, output_size):
model = _make_sequential(input_size, output_size)
model.build((None, input_size))
return model
def _make_sequential_graph_network(input_size, output_size):
return keras.Sequential([
keras.layers.InputLayer(input_size),
keras.layers.Dense(8, activation='relu'),
keras.layers.Dense(output_size),
])
def _make_sequential_input_shape(input_size, output_size):
return keras.Sequential([
keras.layers.Dense(8, activation='relu', input_shape=(input_size,)),
keras.layers.Dense(output_size),
])
class _make_subclassed(keras.Model): # pylint: disable=invalid-name
def __init__(self, input_size, output_size):
super(_make_subclassed, self).__init__()
self._config = {'input_size': input_size, 'output_size': output_size}
self._hidden_layer = keras.layers.Dense(8, activation='relu', name='hidden')
self._logits_layer = keras.layers.Dense(output_size, name='logits')
def call(self, inputs):
x = self._hidden_layer(inputs)
return self._logits_layer(x)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config):
return cls(**config)
class _make_subclassed_built(_make_subclassed): # pylint: disable=invalid-name
def __init__(self, input_size, output_size):
super(_make_subclassed_built, self).__init__(input_size, output_size)
self.build((None, input_size))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class TestWholeModelSavingWithNesting(test.TestCase, parameterized.TestCase):
"""Tests saving a whole model that contains other models."""
@parameterized.named_parameters([
('graph_network', _make_graph_network),
('sequential', _make_sequential),
('sequential_built', _make_sequential_built),
('sequential_graph_network', _make_sequential_graph_network),
('sequential_input_shape', _make_sequential_input_shape),
('subclassed', _make_subclassed),
('subclassed_built', _make_subclassed_built),
])
def test_functional(self, model_fn):
"""Tests serializing a model that uses a nested model to share weights."""
if h5py is None:
self.skipTest('h5py required to run this test')
def _make_model():
inputs = (keras.Input(shape=(4,), name='examples'),
keras.Input(shape=(4,), name='neighbors'))
base_model = model_fn(inputs[0].shape.as_list()[-1], 2)
outputs = keras.layers.add([base_model(inputs[0]), base_model(inputs[1])])
return keras.Model(inputs=inputs, outputs=outputs)
with self.cached_session():
x = (np.random.normal(size=(16, 4)).astype(np.float32),
np.random.normal(size=(16, 4)).astype(np.float32))
model = _make_model()
predictions = model(x)
# Save and reload.
model_path = os.path.join(self.get_temp_dir(), 'model.h5')
model.save(model_path)
del model
loaded_model = keras.models.load_model(
model_path,
custom_objects={
'_make_subclassed': _make_subclassed,
'_make_subclassed_built': _make_subclassed_built,
},
compile=False)
self.assertAllClose(loaded_model(x), predictions, 1e-9)
class SubclassedModel(training.Model):
def __init__(self):