STT-tensorflow/tensorflow/python/keras/engine/deferred_sequential_test.py
Thomas O'Malley d266494953 Support TF Modules inside Keras Layers and Models.
With this change, it is now possible to mix and match tf.keras.Layers and
tf.Modules inside a tf.keras.Model, and everything will be tracked properly.

- Variables in tf.Modules that are set as attributes of custom Layers and
  Models now show up properly in properties such as Layer.trainable_variables
  and Model.trainable_variables.
- tf.Modules do not show up in Model.layers. Instead, a new method
  Layer._flatten_modules is added that iterates over tf.Modules and Layers in
  the order that Keras expects. The existing method Layer.submodules (inherited
  from tf.Module) can still be used to iterate over tf.Modules and Layers with
  the tf.Module ordering. Layer._flatten_layers is built on top of
  Layer._flatten_modules.
- Layer._layers is renamed to Layer._self_tracked_trackables to avoid naming
  conflicts with user-defined attributes (and to reflect that this attribute
  contains Layers, Modules, and TrackableDataStructures).
- To enable this, a new property, tf.Module.non_trainable_variables, is added
  to tf.Module. A sketch of the resulting behavior follows.
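
Below is a minimal, illustrative sketch of the tracking behavior described
above (the Linear and MixedLayer names are hypothetical, not part of this
change):

    import tensorflow as tf

    class Linear(tf.Module):
      """A plain tf.Module holding one trainable variable."""

      def __init__(self, in_features, out_features):
        super().__init__()
        self.w = tf.Variable(tf.ones([in_features, out_features]), name='w')

      def __call__(self, x):
        return tf.matmul(x, self.w)

    class MixedLayer(tf.keras.layers.Layer):
      """A Keras Layer that holds a plain tf.Module as an attribute."""

      def __init__(self):
        super().__init__()
        self.linear = Linear(4, 2)  # tf.Module attribute, now tracked

      def call(self, inputs):
        return self.linear(inputs)

    layer = MixedLayer()
    layer(tf.ones((1, 4)))
    # The module's variable surfaces in the Layer's variable properties:
    assert any(v is layer.linear.w for v in layer.trainable_variables)
    # tf.Modules do not appear in Model.layers, but tf.Module.submodules
    # still reaches them with the tf.Module ordering:
    assert any(m is layer.linear for m in layer.submodules)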

PiperOrigin-RevId: 339917644
Change-Id: I96a7302745280a6261de8c4295c5cbf5f4d7dd5c
2020-10-30 12:27:12 -07:00


# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests specific to deferred-build `Sequential` models."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import unittest
import numpy as np
from tensorflow.python import keras
from tensorflow.python.compat import v2_compat
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
try:
  import h5py  # pylint:disable=g-import-not-at-top
except ImportError:
  h5py = None

class TestDeferredSequential(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_build_behavior(self):
    # Test graph network creation after __call__
    model = get_model()
    model(np.random.random((2, 6)))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [2, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [2, 2])

    # Test effect of new __call__ with a different shape
    model(np.random.random((3, 6)))
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])
    model(np.random.random((4, 6)))
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])

    # Test graph network creation after build
    model = get_model()
    model.build((None, 6))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    self.assertEqual(model.inputs[0].shape.as_list(), [None, 6])
    self.assertEqual(model.outputs[0].shape.as_list(), [None, 2])

    # Test graph network creation after compile/fit
    model = get_model()
    model.compile(
        loss='mse',
        optimizer='rmsprop',
        metrics=[keras.metrics.CategoricalAccuracy()],
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(np.zeros((2, 6)), np.zeros((2, 2)))
    self.assertLen(model.weights, 4)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.inputs, 1)
    self.assertLen(model.outputs, 1)
    # Inconsistency here: with eager `fit`, the model is built with shape
    # (2, 6), but with graph function `fit`, it is built with shape `(None, 6)`.
    # This is likely due to our assumption "the batch size should be dynamic"
    # at the level of `Model`. TODO(fchollet): investigate and resolve.
    self.assertEqual(model.inputs[0].shape.as_list()[-1], 6)
    self.assertEqual(model.outputs[0].shape.as_list()[-1], 2)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_add_and_pop(self):
    model = get_model()
    model.build((None, 6))
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 3)
    self.assertLen(model.weights, 4)
    model.pop()
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 2)
    self.assertLen(model.weights, 2)
    model.add(keras.layers.Dense(2))
    self.assertTrue(model.built)
    self.assertTrue(model._is_graph_network)
    self.assertLen(model.layers, 3)
    self.assertLen(model.weights, 4)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_feature_extraction(self):
    # This tests layer connectivity reset when rebuilding
    model = get_model()
    model(np.random.random((3, 6)))  # First build
    model(np.random.random((4, 6)))  # Triggers a rebuild
    # Classic feature extractor pattern
    extractor = keras.Model(inputs=model.inputs,
                            outputs=[layer.output for layer in model.layers])
    # Check that inputs and outputs are connected
    _ = extractor(np.random.random((4, 6)))

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_saving_savedmodel(self):
    model = get_model()
    model(np.random.random((3, 6)))  # Build model
    path = os.path.join(self.get_temp_dir(), 'model_path')
    model.save(path)
    new_model = keras.models.load_model(path)
    model_layers = model._flatten_layers(include_self=True, recursive=False)
    new_model_layers = new_model._flatten_layers(
        include_self=True, recursive=False)
    for layer1, layer2 in zip(model_layers, new_model_layers):
      self.assertEqual(layer1.name, layer2.name)
      for w1, w2 in zip(layer1.weights, layer2.weights):
        self.assertAllClose(w1, w2)

  @unittest.skipIf(h5py is None, 'Test requires h5py')
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_saving_h5(self):
    path = os.path.join(self.get_temp_dir(), 'model_path.h5')
    model = get_model()
    model(np.random.random((3, 6)))  # Build model
    model.save(path)
    new_model = keras.models.load_model(path)
    model_layers = model._flatten_layers(include_self=True, recursive=False)
    new_model_layers = new_model._flatten_layers(
        include_self=True, recursive=False)
    for layer1, layer2 in zip(model_layers, new_model_layers):
      self.assertEqual(layer1.name, layer2.name)
      for w1, w2 in zip(layer1.weights, layer2.weights):
        self.assertAllClose(w1, w2)

  @keras_parameterized.run_all_keras_modes
  def test_shared_layer(self):
    # This tests that preexisting layer connectivity is preserved
    # when auto-building graph networks
    shared_layer = keras.layers.Dense(2)
    m1 = keras.Sequential([shared_layer])
    m1(np.random.random((3, 6)))
    m2 = keras.Sequential([shared_layer])
    m2(np.random.random((3, 6)))
    # Nesting case
    shared_layer = keras.layers.Dense(2)
    m1 = keras.Sequential([shared_layer])
    m2 = keras.Sequential([shared_layer, m1])
    m2(np.random.random((3, 2)))

  @keras_parameterized.run_all_keras_modes
  def test_loss_layer(self):

    class LossLayer(keras.layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    # Test loss layer alone
    model = keras.Sequential([LossLayer()])
    model.compile('rmsprop', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)))
    self.assertAllClose(loss, 4.)
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)))
    self.assertAllClose(loss, 2.)

    # Test loss layer combined with another layer
    model = keras.Sequential([
        keras.layers.Dense(1, kernel_initializer='ones'),
        LossLayer()])
    model.compile('rmsprop', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)))
    self.assertAllClose(loss, 4.)
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)))
    self.assertLess(loss, 2.)

    # Test loss layer combined with external loss
    model = keras.Sequential([
        keras.layers.Dense(1, kernel_initializer='ones'),
        LossLayer()])
    model.compile('rmsprop', 'mse',
                  run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 2)), np.ones((2, 2)))
    model(np.random.random((4, 2)))  # Triggers a rebuild
    loss = model.train_on_batch(np.ones((1, 2)), np.ones((1, 2)))


def get_model():
  model = keras.models.Sequential()
  model.add(keras.layers.Dense(2, name='first_layer'))
  model.add(keras.layers.Dropout(0.3, name='dp'))
  model.add(keras.layers.Dense(2, name='last_layer'))
  return model


if __name__ == '__main__':
  v2_compat.enable_v2_behavior()
  test.main()