With this change, it is now possible to mix and match tf.keras.Layers and tf.Modules inside a tf.keras.Model, and everything will be tracked properly.

- Variables in tf.Modules that are set as attributes of custom Layers and Models now show up properly in properties such as Layer.trainable_variables and Model.trainable_variables.
- tf.Modules do not show up in Model.layers. Instead, a new method, Layer._flatten_modules, iterates over tf.Modules and Layers in the order that Keras expects. The existing Layer.submodules (inherited from tf.Module) can still be used to iterate over tf.Modules and Layers with the tf.Module ordering. Layer._flatten_layers is built on top of Layer._flatten_modules.
- Layer._layers is renamed to Layer._self_tracked_trackables to avoid naming conflicts with user-defined attributes (and to reflect that this attribute contains Layers, Modules, and TrackableDataStructures).
- A new property, tf.Module.non_trainable_variables, is added to tf.Module to enable this.

PiperOrigin-RevId: 339917644
Change-Id: I96a7302745280a6261de8c4295c5cbf5f4d7dd5c
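A minimal sketch of the tracking behavior described above, assuming TF 2.x with this change applied (MyModule, MyLayer, and the variable names are illustrative, not part of the change itself):

    import tensorflow as tf

    class MyModule(tf.Module):

      def __init__(self):
        super().__init__()
        self.v = tf.Variable(1., name='v')                    # trainable
        self.w = tf.Variable(2., trainable=False, name='w')   # non-trainable

    class MyLayer(tf.keras.layers.Layer):

      def __init__(self):
        super().__init__()
        self.mod = MyModule()  # a tf.Module attribute, now tracked by Keras

    layer = MyLayer()
    print(layer.trainable_variables)          # includes layer.mod.v
    print(layer.non_trainable_variables)      # includes layer.mod.w
    print(layer.mod.non_trainable_variables)  # the new tf.Module property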
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests specific to deferred-build `Sequential` models."""
|
|
|
|
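# Background: a deferred-build `Sequential` is created without an `Input`
# layer or `input_shape`, so its weights and graph network only exist after
# the first `__call__`, `build`, or `fit`. Illustrative sketch (the lines
# below are not part of this file):
#
#   model = keras.Sequential([keras.layers.Dense(2)])  # no weights yet
#   model.build((None, 6))  # weights and the graph network are created here
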
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import unittest

import numpy as np

from tensorflow.python import keras
from tensorflow.python.compat import v2_compat
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test

try:
  import h5py  # pylint:disable=g-import-not-at-top
except ImportError:
  h5py = None


class TestDeferredSequential(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_build_behavior(self):
    # Test graph network creation after __call__
    model = get_model()
    model(np.random.random((2, 6)))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [2, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [2, 2])

    # Test effect of new __call__ with a different shape
    model(np.random.random((3, 6)))
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])
    model(np.random.random((4, 6)))
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])

    # Test graph network creation after build
    model = get_model()
    model.build((None, 6))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])

    # Test graph network creation after compile/fit
    model = get_model()
    model.compile(
        loss='mse',
        optimizer='rmsprop',
        metrics=[keras.metrics.CategoricalAccuracy()],
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(np.zeros((2, 6)), np.zeros((2, 2)))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    # Inconsistency here: with eager `fit`, the model is built with shape
    # (2, 6), but with graph function `fit`, it is built with shape (None, 6).
    # This is likely due to our assumption "the batch size should be dynamic"
    # at the level of `Model`. TODO(fchollet): investigate and resolve.
    self.assertEqual(model.inputs[0].shape.as_list()[-1], 6)
    self.assertEqual(model.outputs[0].shape.as_list()[-1], 2)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_add_and_pop(self):
    model = get_model()
    model.build((None, 6))
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 3)
    self.assertLen(model.weights, 4)
    model.pop()
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 2)
    self.assertLen(model.weights, 2)
    model.add(keras.layers.Dense(2))
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 3)
    self.assertLen(model.weights, 4)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_feature_extraction(self):
    # This tests layer connectivity reset when rebuilding
    model = get_model()
    model(np.random.random((3, 6)))  # First build
    model(np.random.random((4, 6)))  # Triggers a rebuild
    # Classic feature extractor pattern
    extractor = keras.Model(inputs=model.inputs,
                            outputs=[layer.output for layer in model.layers])
    # Check that inputs and outputs are connected
    _ = extractor(np.random.random((4, 6)))

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_saving_savedmodel(self):
    model = get_model()
    model(np.random.random((3, 6)))  # Build model

    path = os.path.join(self.get_temp_dir(), 'model_path')
    model.save(path)
    new_model = keras.models.load_model(path)
    model_layers = model._flatten_layers(include_self=True, recursive=False)
    new_model_layers = new_model._flatten_layers(
        include_self=True, recursive=False)
    for layer1, layer2 in zip(model_layers, new_model_layers):
      self.assertEqual(layer1.name, layer2.name)
      for w1, w2 in zip(layer1.weights, layer2.weights):
        self.assertAllClose(w1, w2)

  @unittest.skipIf(h5py is None, 'Test requires h5py')
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_saving_h5(self):
    model = get_model()
    model(np.random.random((3, 6)))  # Build model

    path = os.path.join(self.get_temp_dir(), 'model_path.h5')
    model.save(path)
    new_model = keras.models.load_model(path)
    model_layers = model._flatten_layers(include_self=True, recursive=False)
    new_model_layers = new_model._flatten_layers(
        include_self=True, recursive=False)
    for layer1, layer2 in zip(model_layers, new_model_layers):
      self.assertEqual(layer1.name, layer2.name)
      for w1, w2 in zip(layer1.weights, layer2.weights):
        self.assertAllClose(w1, w2)

  @keras_parameterized.run_all_keras_modes
  def test_shared_layer(self):
    # This tests that preexisting layer connectivity is preserved
    # when auto-building graph networks
    shared_layer = keras.layers.Dense(2)
    m1 = keras.Sequential([shared_layer])
    m1(np.random.random((3, 6)))
    m2 = keras.Sequential([shared_layer])
    m2(np.random.random((3, 6)))
    # Nesting case
    shared_layer = keras.layers.Dense(2)
    m1 = keras.Sequential([shared_layer])
    m2 = keras.Sequential([shared_layer, m1])
    m2(np.random.random((3, 2)))

  @keras_parameterized.run_all_keras_modes
  def test_loss_layer(self):

    class LossLayer(keras.layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    # Test loss layer alone
    model = keras.Sequential([LossLayer()])
    model.compile('rmsprop', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)))
    self.assertAllClose(loss, 4.)
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)))
    self.assertAllClose(loss, 2.)

    # Test loss layer combined with another layer
    model = keras.Sequential([
        keras.layers.Dense(1, kernel_initializer='ones'),
        LossLayer()])
    model.compile('rmsprop', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)))
    self.assertAllClose(loss, 4.)
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)))
    self.assertLess(loss, 2.)

    # Test loss layer combined with external loss
    model = keras.Sequential([
        keras.layers.Dense(1, kernel_initializer='ones'),
        LossLayer()])
    model.compile('rmsprop', 'mse',
                  run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)), np.ones((2, 2)))
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)), np.ones((1, 2)))


def get_model():
  """Returns a small deferred-build Sequential model (no input shape given)."""
  model = keras.models.Sequential()
  model.add(keras.layers.Dense(2, name='first_layer'))
  model.add(keras.layers.Dropout(0.3, name='dp'))
  model.add(keras.layers.Dense(2, name='last_layer'))
  return model


if __name__ == '__main__':
  v2_compat.enable_v2_behavior()
  test.main()