# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for layer graphs construction & handling."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import warnings
import numpy as np
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras import initializers
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers
from tensorflow.python.keras import losses
from tensorflow.python.keras import models
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import base_layer
from tensorflow.python.keras.engine import functional
from tensorflow.python.keras.engine import input_layer as input_layer_lib
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training as training_lib
from tensorflow.python.keras.utils import layer_utils
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import string_ops
from tensorflow.python.ops.ragged import ragged_factory_ops
from tensorflow.python.platform import test
from tensorflow.python.training.tracking.util import Checkpoint
try:
import yaml # pylint:disable=g-import-not-at-top
except ImportError:
yaml = None
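# PyYAML is an optional dependency; the `to_yaml`/`model_from_yaml`
# round-trips below are skipped when it is not installed.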
class NetworkConstructionTest(keras_parameterized.TestCase):
def test_default_model_name(self):
inputs = input_layer_lib.Input(shape=(1,))
outputs = layers.Dense(1, activation='relu')(inputs)
model = training_lib.Model(inputs=inputs, outputs=outputs)
self.assertEqual(model.name, 'model')
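    # Subsequent unnamed models of the same class are uniquified with a
    # numeric suffix ('model_1', 'model_2', ...).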
model_2 = training_lib.Model(inputs=inputs, outputs=outputs)
self.assertEqual(model_2.name, 'model_1')
model_3 = training_lib.Model(inputs=inputs, outputs=outputs)
self.assertEqual(model_3.name, 'model_2')
def test_get_updates(self):
class MyLayer(layers.Layer):
def build(self, input_shape):
self.a = self.add_variable('a',
(1, 1),
'float32',
trainable=False)
self.b = self.add_variable('b',
(1, 1),
'float32',
trainable=False)
self.add_update(state_ops.assign_add(self.a, [[1.]],
name='unconditional_update'))
self.built = True
def call(self, inputs):
self.add_update(state_ops.assign_add(self.b, inputs,
name='conditional_update'),
inputs=True)
return inputs + 1
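    # The unconditional update created in `build` is counted once, while the
    # conditional update created in `call` (passed with `inputs=True`) is
    # tracked per call, so each new call below adds one update.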
with ops.Graph().as_default():
x1 = input_layer_lib.Input(shape=(1,))
layer = MyLayer()
_ = layer(x1)
self.assertEqual(len(layer.updates), 2)
x2 = input_layer_lib.Input(shape=(1,))
y2 = layer(x2)
self.assertEqual(len(layer.updates), 3)
network = functional.Functional(x2, y2)
self.assertEqual(len(network.updates), 3)
x3 = input_layer_lib.Input(shape=(1,))
_ = layer(x3)
self.assertEqual(len(network.updates), 4)
x4 = input_layer_lib.Input(shape=(1,))
_ = network(x4)
self.assertEqual(len(network.updates), 5)
network.add_update(state_ops.assign_add(layer.a, [[1]]))
self.assertEqual(len(network.updates), 6)
network.add_update(state_ops.assign_add(layer.b, x4), inputs=True)
self.assertEqual(len(network.updates), 7)
@combinations.generate(combinations.combine(mode=['graph']))
def test_get_updates_bn(self):
x1 = input_layer_lib.Input(shape=(1,))
layer = layers.BatchNormalization()
_ = layer(x1)
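    # BatchNormalization contributes two updates: the moving-mean and
    # moving-variance assignments.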
self.assertEqual(len(layer.updates), 2)
def test_get_layer(self):
# create a simple network
x = input_layer_lib.Input(shape=(32,))
dense_a = layers.Dense(4, name='dense_a')
dense_b = layers.Dense(2, name='dense_b')
y = dense_b(dense_a(x))
network = functional.Functional(x, y, name='dense_network')
# test various get_layer by index
self.assertEqual(network.get_layer(index=1), dense_a)
# test invalid get_layer by index
with self.assertRaisesRegex(
ValueError, 'Was asked to retrieve layer at index ' + str(3) +
' but model only has ' + str(len(network.layers)) + ' layers.'):
network.get_layer(index=3)
    # test that providing both a layer name and an index raises
with self.assertRaisesRegex(ValueError,
'Provide only a layer name or a layer index'):
network.get_layer(index=1, name='dense_b')
# test that a name or an index must be provided
with self.assertRaisesRegex(ValueError,
'Provide either a layer name or layer index.'):
network.get_layer()
# test various get_layer by name
self.assertEqual(network.get_layer(name='dense_a'), dense_a)
# test invalid get_layer by name
with self.assertRaisesRegex(ValueError, 'No such layer: dense_c.'):
network.get_layer(name='dense_c')
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testTopologicalAttributes(self):
# test layer attributes / methods related to cross-layer connectivity.
a = input_layer_lib.Input(shape=(32,), name='input_a')
b = input_layer_lib.Input(shape=(32,), name='input_b')
# test input, output, input_shape, output_shape
test_layer = layers.Dense(16, name='test_layer')
a_test = test_layer(a)
self.assertIs(test_layer.input, a)
self.assertIs(test_layer.output, a_test)
self.assertEqual(test_layer.input_shape, (None, 32))
self.assertEqual(test_layer.output_shape, (None, 16))
# test `get_*_at` methods
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
self.assertIs(dense.get_input_at(0), a)
self.assertIs(dense.get_input_at(1), b)
self.assertIs(dense.get_output_at(0), a_2)
self.assertIs(dense.get_output_at(1), b_2)
self.assertEqual(dense.get_input_shape_at(0), (None, 32))
self.assertEqual(dense.get_input_shape_at(1), (None, 32))
self.assertEqual(dense.get_output_shape_at(0), (None, 16))
self.assertEqual(dense.get_output_shape_at(1), (None, 16))
# Test invalid value for attribute retrieval.
with self.assertRaises(ValueError):
dense.get_input_at(2)
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
_ = new_dense.input
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
_ = new_dense.output
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
_ = new_dense.output_shape
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
_ = new_dense.input_shape
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
a = input_layer_lib.Input(shape=(3, 32))
a = input_layer_lib.Input(shape=(5, 32))
a_2 = dense(a)
b_2 = dense(b)
_ = new_dense.input_shape
with self.assertRaises(AttributeError):
new_dense = layers.Dense(16)
a = input_layer_lib.Input(shape=(3, 32))
a = input_layer_lib.Input(shape=(5, 32))
a_2 = dense(a)
b_2 = dense(b)
_ = new_dense.output_shape
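  # Asserts pairwise object identity; `assertEqual` could invoke the
  # overloaded (elementwise) tensor equality operator instead.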
def _assertAllIs(self, a, b):
self.assertTrue(all(x is y for x, y in zip(a, b)))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testTopologicalAttributesMultiOutputLayer(self):
class PowersLayer(layers.Layer):
def call(self, inputs):
return [inputs**2, inputs**3]
x = input_layer_lib.Input(shape=(32,))
test_layer = PowersLayer()
p1, p2 = test_layer(x) # pylint: disable=not-callable
self.assertIs(test_layer.input, x)
self._assertAllIs(test_layer.output, [p1, p2])
self.assertEqual(test_layer.input_shape, (None, 32))
self.assertEqual(test_layer.output_shape, [(None, 32), (None, 32)])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testTopologicalAttributesMultiInputLayer(self):
class AddLayer(layers.Layer):
def call(self, inputs):
assert len(inputs) == 2
return inputs[0] + inputs[1]
a = input_layer_lib.Input(shape=(32,))
b = input_layer_lib.Input(shape=(32,))
test_layer = AddLayer()
y = test_layer([a, b]) # pylint: disable=not-callable
self._assertAllIs(test_layer.input, [a, b])
self.assertIs(test_layer.output, y)
self.assertEqual(test_layer.input_shape, [(None, 32), (None, 32)])
self.assertEqual(test_layer.output_shape, (None, 32))
def testBasicNetwork(self):
with ops.Graph().as_default():
# minimum viable network
x = input_layer_lib.Input(shape=(32,))
dense = layers.Dense(2)
y = dense(x)
network = functional.Functional(x, y, name='dense_network')
# test basic attributes
self.assertEqual(network.name, 'dense_network')
self.assertEqual(len(network.layers), 2) # InputLayer + Dense
self.assertEqual(network.layers[1], dense)
self._assertAllIs(network.weights, dense.weights)
self._assertAllIs(network.trainable_weights, dense.trainable_weights)
self._assertAllIs(network.non_trainable_weights,
dense.non_trainable_weights)
# test callability on Input
x_2 = input_layer_lib.Input(shape=(32,))
y_2 = network(x_2)
self.assertEqual(y_2.shape.as_list(), [None, 2])
# test callability on regular tensor
x_2 = array_ops.placeholder(dtype='float32', shape=(None, 32))
y_2 = network(x_2)
self.assertEqual(y_2.shape.as_list(), [None, 2])
# test network `trainable` attribute
network.trainable = False
self._assertAllIs(network.weights, dense.weights)
self.assertEqual(network.trainable_weights, [])
self._assertAllIs(network.non_trainable_weights,
dense.trainable_weights + dense.non_trainable_weights)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_trainable_weights(self):
a = layers.Input(shape=(2,))
b = layers.Dense(1)(a)
model = training_lib.Model(a, b)
weights = model.weights
self._assertAllIs(model.trainable_weights, weights)
self.assertListEqual(model.non_trainable_weights, [])
model.trainable = False
self.assertListEqual(model.trainable_weights, [])
self._assertAllIs(model.non_trainable_weights, weights)
model.trainable = True
self._assertAllIs(model.trainable_weights, weights)
self.assertListEqual(model.non_trainable_weights, [])
model.layers[1].trainable = False
self.assertListEqual(model.trainable_weights, [])
self._assertAllIs(model.non_trainable_weights, weights)
# sequential model
model = sequential.Sequential()
model.add(layers.Dense(1, input_dim=2))
weights = model.weights
self._assertAllIs(model.trainable_weights, weights)
self.assertListEqual(model.non_trainable_weights, [])
model.trainable = False
self.assertListEqual(model.trainable_weights, [])
self._assertAllIs(model.non_trainable_weights, weights)
model.trainable = True
self._assertAllIs(model.trainable_weights, weights)
self.assertListEqual(model.non_trainable_weights, [])
model.layers[0].trainable = False
self.assertListEqual(model.trainable_weights, [])
self._assertAllIs(model.non_trainable_weights, weights)
def test_layer_call_arguments(self):
with ops.Graph().as_default():
# Test the ability to pass and serialize arguments to `call`.
inp = layers.Input(shape=(2,))
x = layers.Dense(3)(inp)
x = layers.Dropout(0.5)(x, training=True)
model = training_lib.Model(inp, x)
# Would be `dropout/cond/Merge` by default
self.assertIn('dropout', model.output.op.name)
# Test that argument is kept when applying the model
inp2 = layers.Input(shape=(2,))
out2 = model(inp2)
self.assertIn('dropout', out2.op.name)
# Test that argument is kept after loading a model
config = model.get_config()
model = training_lib.Model.from_config(config)
self.assertIn('dropout', model.output.op.name)
def test_node_construction(self):
# test basics
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
with self.assertRaises(ValueError):
_ = layers.Input(shape=(32,), batch_shape=(10, 32))
with self.assertRaises(ValueError):
_ = layers.Input(shape=(32,), unknown_kwarg=None)
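    # Every functional-API tensor carries `_keras_history`, a
    # (layer, node_index, tensor_index) triple linking it back into the
    # layer graph.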
self.assertListEqual(a.shape.as_list(), [None, 32])
a_layer, a_node_index, a_tensor_index = a._keras_history
b_layer, _, _ = b._keras_history
self.assertEqual(len(a_layer._inbound_nodes), 1)
self.assertEqual(a_tensor_index, 0)
node = a_layer._inbound_nodes[a_node_index]
self.assertEqual(node.outbound_layer, a_layer)
self.assertListEqual(node.inbound_layers, [])
self.assertListEqual(node.input_tensors, [a])
self.assertListEqual(node.input_shapes, [(None, 32)])
self.assertListEqual(node.output_tensors, [a])
self.assertListEqual(node.output_shapes, [(None, 32)])
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
self.assertEqual(len(dense._inbound_nodes), 2)
self.assertEqual(len(dense._outbound_nodes), 0)
self.assertEqual(dense._inbound_nodes[0].inbound_layers, a_layer)
self.assertEqual(dense._inbound_nodes[0].outbound_layer, dense)
self.assertEqual(dense._inbound_nodes[1].inbound_layers, b_layer)
self.assertEqual(dense._inbound_nodes[1].outbound_layer, dense)
self.assertIs(dense._inbound_nodes[0].input_tensors, a)
self.assertIs(dense._inbound_nodes[1].input_tensors, b)
# test layer properties
test_layer = layers.Dense(16, name='test_layer')
a_test = test_layer(a)
self.assertListEqual(test_layer.kernel.shape.as_list(), [32, 16])
self.assertIs(test_layer.input, a)
self.assertIs(test_layer.output, a_test)
self.assertEqual(test_layer.input_shape, (None, 32))
self.assertEqual(test_layer.output_shape, (None, 16))
self.assertIs(dense.get_input_at(0), a)
self.assertIs(dense.get_input_at(1), b)
self.assertIs(dense.get_output_at(0), a_2)
self.assertIs(dense.get_output_at(1), b_2)
self.assertEqual(dense.get_input_shape_at(0), (None, 32))
self.assertEqual(dense.get_input_shape_at(1), (None, 32))
self.assertEqual(dense.get_output_shape_at(0), (None, 16))
self.assertEqual(dense.get_output_shape_at(1), (None, 16))
self.assertEqual(dense.get_input_mask_at(0), None)
self.assertEqual(dense.get_input_mask_at(1), None)
self.assertEqual(dense.get_output_mask_at(0), None)
self.assertEqual(dense.get_output_mask_at(1), None)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_multi_input_layer(self):
with self.cached_session():
# test multi-input layer
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
merged = layers.concatenate([a_2, b_2], name='merge')
self.assertListEqual(merged.shape.as_list(), [None, 16 * 2])
merge_layer, merge_node_index, merge_tensor_index = merged._keras_history
self.assertEqual(merge_node_index, 0)
self.assertEqual(merge_tensor_index, 0)
self.assertEqual(len(merge_layer._inbound_nodes), 1)
self.assertEqual(len(merge_layer._outbound_nodes), 0)
self.assertEqual(len(merge_layer._inbound_nodes[0].input_tensors), 2)
self.assertEqual(len(merge_layer._inbound_nodes[0].inbound_layers), 2)
c = layers.Dense(64, name='dense_2')(merged)
d = layers.Dense(5, name='dense_3')(c)
model = training_lib.Model(inputs=[a, b], outputs=[c, d], name='model')
self.assertEqual(len(model.layers), 6)
output_shapes = model.compute_output_shape([(None, 32), (None, 32)])
self.assertListEqual(output_shapes[0].as_list(), [None, 64])
self.assertListEqual(output_shapes[1].as_list(), [None, 5])
self.assertListEqual(
model.compute_mask([a, b], [None, None]), [None, None])
# we don't check names of first 2 layers (inputs) because
# ordering of same-level layers is not fixed
self.assertListEqual([l.name for l in model.layers][2:],
['dense_1', 'merge', 'dense_2', 'dense_3'])
self.assertListEqual([l.name for l in model._input_layers],
['input_a', 'input_b'])
self.assertListEqual([l.name for l in model._output_layers],
['dense_2', 'dense_3'])
# actually run model
fn = backend.function(model.inputs, model.outputs)
input_a_np = np.random.random((10, 32))
input_b_np = np.random.random((10, 32))
fn_outputs = fn([input_a_np, input_b_np])
self.assertListEqual([x.shape for x in fn_outputs], [(10, 64), (10, 5)])
# test get_source_inputs
self._assertAllIs(layer_utils.get_source_inputs(c), [a, b])
# serialization / deserialization
json_config = model.to_json()
recreated_model = models.model_from_json(json_config)
recreated_model.compile('rmsprop', 'mse')
self.assertListEqual([l.name for l in recreated_model.layers][2:],
['dense_1', 'merge', 'dense_2', 'dense_3'])
self.assertListEqual([l.name for l in recreated_model._input_layers],
['input_a', 'input_b'])
self.assertListEqual([l.name for l in recreated_model._output_layers],
['dense_2', 'dense_3'])
fn = backend.function(recreated_model.inputs, recreated_model.outputs)
input_a_np = np.random.random((10, 32))
input_b_np = np.random.random((10, 32))
fn_outputs = fn([input_a_np, input_b_np])
self.assertListEqual([x.shape for x in fn_outputs], [(10, 64), (10, 5)])
def test_multi_output_layer_output_names(self):
inp = layers.Input(name='inp', shape=(None,), dtype=dtypes.float32)
class _MultiOutput(layers.Layer):
def call(self, x):
return x + 1., x + 2.
out = _MultiOutput(name='out')(inp)
model = training_lib.Model(inp, out)
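    # Both outputs originate from the same layer name, so the second one is
    # uniquified with an index suffix.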
self.assertEqual(['out', 'out_1'], model.output_names)
self.assertAllClose([2., 3.], model(1.))
def test_recursion(self):
with ops.Graph().as_default(), self.cached_session():
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
merged = layers.concatenate([a_2, b_2], name='merge')
c = layers.Dense(64, name='dense_2')(merged)
d = layers.Dense(5, name='dense_3')(c)
model = training_lib.Model(inputs=[a, b], outputs=[c, d], name='model')
e = layers.Input(shape=(32,), name='input_e')
f = layers.Input(shape=(32,), name='input_f')
self.assertEqual(len(model.inputs), 2)
g, h = model([e, f])
self.assertEqual(len(model.inputs), 2)
self.assertEqual(g.name, 'model/dense_2/BiasAdd:0')
self.assertListEqual(g.shape.as_list(), c.shape.as_list())
self.assertListEqual(h.shape.as_list(), d.shape.as_list())
# test separate manipulation of different layer outputs
i = layers.Dense(7, name='dense_4')(h)
final_model = training_lib.Model(
inputs=[e, f], outputs=[i, g], name='final')
self.assertEqual(len(final_model.inputs), 2)
self.assertEqual(len(final_model.outputs), 2)
self.assertEqual(len(final_model.layers), 4)
# we don't check names of first 2 layers (inputs) because
# ordering of same-level layers is not fixed
self.assertListEqual([layer.name for layer in final_model.layers][2:],
['model', 'dense_4'])
self.assertListEqual(
model.compute_mask([e, f], [None, None]), [None, None])
self.assertListEqual(
final_model.compute_output_shape([(10, 32), (10, 32)]), [(10, 7),
(10, 64)])
# run recursive model
fn = backend.function(final_model.inputs, final_model.outputs)
input_a_np = np.random.random((10, 32))
input_b_np = np.random.random((10, 32))
fn_outputs = fn([input_a_np, input_b_np])
self.assertListEqual([x.shape for x in fn_outputs], [(10, 7), (10, 64)])
# test serialization
model_config = final_model.get_config()
recreated_model = models.Model.from_config(model_config)
fn = backend.function(recreated_model.inputs, recreated_model.outputs)
input_a_np = np.random.random((10, 32))
input_b_np = np.random.random((10, 32))
fn_outputs = fn([input_a_np, input_b_np])
self.assertListEqual([x.shape for x in fn_outputs], [(10, 7), (10, 64)])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_multi_input_multi_output_recursion(self):
with self.cached_session():
# test multi-input multi-output
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
merged = layers.concatenate([a_2, b_2], name='merge')
c = layers.Dense(64, name='dense_2')(merged)
d = layers.Dense(5, name='dense_3')(c)
model = training_lib.Model(inputs=[a, b], outputs=[c, d], name='model')
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
_, n = model([j, k])
o = layers.Input(shape=(32,), name='input_o')
p = layers.Input(shape=(32,), name='input_p')
q, _ = model([o, p])
self.assertListEqual(n.shape.as_list(), [None, 5])
self.assertListEqual(q.shape.as_list(), [None, 64])
s = layers.concatenate([n, q], name='merge_nq')
self.assertListEqual(s.shape.as_list(), [None, 64 + 5])
# test with single output as 1-elem list
multi_io_model = training_lib.Model([j, k, o, p], [s])
fn = backend.function(multi_io_model.inputs, multi_io_model.outputs)
fn_outputs = fn([
np.random.random((10, 32)), np.random.random((10, 32)),
np.random.random((10, 32)), np.random.random((10, 32))
])
self.assertListEqual([x.shape for x in fn_outputs], [(10, 69)])
# test with single output as tensor
multi_io_model = training_lib.Model([j, k, o, p], s)
fn = backend.function(multi_io_model.inputs, multi_io_model.outputs)
fn_outputs = fn([
np.random.random((10, 32)), np.random.random((10, 32)),
np.random.random((10, 32)), np.random.random((10, 32))
])
# note that the output of the function will still be a 1-elem list
self.assertListEqual([x.shape for x in fn_outputs], [(10, 69)])
# test serialization
model_config = multi_io_model.get_config()
recreated_model = models.Model.from_config(model_config)
fn = backend.function(recreated_model.inputs, recreated_model.outputs)
fn_outputs = fn([
np.random.random((10, 32)), np.random.random((10, 32)),
np.random.random((10, 32)), np.random.random((10, 32))
])
# note that the output of the function will still be a 1-elem list
self.assertListEqual([x.shape for x in fn_outputs], [(10, 69)])
config = model.get_config()
models.Model.from_config(config)
model.summary()
json_str = model.to_json()
models.model_from_json(json_str)
if yaml is not None:
yaml_str = model.to_yaml()
models.model_from_yaml(yaml_str)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_invalid_graphs(self):
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
merged = layers.concatenate([a_2, b_2], name='merge')
c = layers.Dense(64, name='dense_2')(merged)
d = layers.Dense(5, name='dense_3')(c)
model = training_lib.Model(inputs=[a, b], outputs=[c, d], name='model')
# input is not an Input tensor
j = layers.Input(shape=(32,), name='input_j')
j = layers.Dense(32)(j)
k = layers.Input(shape=(32,), name='input_k')
m, n = model([j, k])
with self.assertRaises(Exception):
training_lib.Model([j, k], [m, n])
# disconnected graph
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
m, n = model([j, k])
with self.assertRaises(Exception):
training_lib.Model([j], [m, n])
# redundant outputs
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
m, n = model([j, k])
training_lib.Model([j, k], [m, n, n])
# redundant inputs
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
m, n = model([j, k])
with self.assertRaises(Exception):
training_lib.Model([j, k, j], [m, n])
    # I have no idea what I'm doing: garbage as inputs/outputs
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
m, n = model([j, k])
with self.assertRaises(Exception):
training_lib.Model([j, k], [m, n, 0])
def test_raw_tf_compatibility(self):
with ops.Graph().as_default():
# test calling layers/models on TF tensors
a = layers.Input(shape=(32,), name='input_a')
b = layers.Input(shape=(32,), name='input_b')
dense = layers.Dense(16, name='dense_1')
a_2 = dense(a)
b_2 = dense(b)
merged = layers.concatenate([a_2, b_2], name='merge')
c = layers.Dense(64, name='dense_2')(merged)
d = layers.Dense(5, name='dense_3')(c)
model = training_lib.Model(inputs=[a, b], outputs=[c, d], name='model')
j = layers.Input(shape=(32,), name='input_j')
k = layers.Input(shape=(32,), name='input_k')
self.assertEqual(len(model.inputs), 2)
m, n = model([j, k])
self.assertEqual(len(model.inputs), 2)
tf_model = training_lib.Model([j, k], [m, n])
j_tf = array_ops.placeholder(dtype=dtypes.float32, shape=(None, 32))
k_tf = array_ops.placeholder(dtype=dtypes.float32, shape=(None, 32))
m_tf, n_tf = tf_model([j_tf, k_tf])
self.assertListEqual(m_tf.shape.as_list(), [None, 64])
self.assertListEqual(n_tf.shape.as_list(), [None, 5])
# test merge
layers.concatenate([j_tf, k_tf], axis=1)
layers.add([j_tf, k_tf])
# test tensor input
x = array_ops.placeholder(shape=(None, 2), dtype=dtypes.float32)
layers.InputLayer(input_tensor=x)
x = layers.Input(tensor=x)
layers.Dense(2)(x)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_basic_masking(self):
a = layers.Input(shape=(10, 32), name='input_a')
b = layers.Masking()(a)
model = training_lib.Model(a, b)
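    # The mask has one entry per timestep, dropping the feature axis.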
self.assertEqual(model.output_mask.shape.as_list(), [None, 10])
def testMaskingSingleInput(self):
class MaskedLayer(layers.Layer):
def call(self, inputs, mask=None):
if mask is not None:
return inputs * mask
return inputs
def compute_mask(self, inputs, mask=None):
return array_ops.ones_like(inputs)
if context.executing_eagerly():
a = constant_op.constant([2] * 32)
mask = constant_op.constant([0, 1] * 16)
a._keras_mask = mask
b = MaskedLayer().apply(a)
self.assertTrue(hasattr(b, '_keras_mask'))
self.assertAllEqual(
self.evaluate(array_ops.ones_like(mask)),
self.evaluate(getattr(b, '_keras_mask')))
self.assertAllEqual(self.evaluate(a * mask), self.evaluate(b))
else:
x = input_layer_lib.Input(shape=(32,))
y = MaskedLayer()(x) # pylint: disable=not-callable
network = functional.Functional(x, y)
# test callability on Input
x_2 = input_layer_lib.Input(shape=(32,))
y_2 = network(x_2)
self.assertEqual(y_2.shape.as_list(), [None, 32])
# test callability on regular tensor
x_2 = array_ops.placeholder(dtype='float32', shape=(None, 32))
y_2 = network(x_2)
self.assertEqual(y_2.shape.as_list(), [None, 32])
def test_activity_regularization_with_model_composition(self):
def reg(x):
return math_ops.reduce_sum(x)
net_a_input = input_layer_lib.Input((2,))
net_a = net_a_input
net_a = layers.Dense(
2, kernel_initializer='ones', use_bias=False, activity_regularizer=reg)(
net_a)
model_a = training_lib.Model([net_a_input], [net_a])
net_b_input = input_layer_lib.Input((2,))
net_b = model_a(net_b_input)
model_b = training_lib.Model([net_b_input], [net_b])
model_b.compile(optimizer='sgd', loss=None)
x = np.ones((1, 2))
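    # With an all-ones kernel and ones input, each of the two outputs is 2,
    # so the activity regularizer contributes reduce_sum = 4; since
    # `loss=None`, this is the entire evaluated loss.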
loss = model_b.evaluate(x)
self.assertEqual(loss, 4.)
@combinations.generate(combinations.keras_mode_combinations())
def test_layer_sharing_at_heterogenous_depth(self):
x_val = np.random.random((10, 5))
x = input_layer_lib.Input(shape=(5,))
a = layers.Dense(5, name='A')
b = layers.Dense(5, name='B')
output = a(b(a(b(x))))
m = training_lib.Model(x, output)
m.run_eagerly = testing_utils.should_run_eagerly()
output_val = m.predict(x_val)
config = m.get_config()
weights = m.get_weights()
m2 = models.Model.from_config(config)
m2.set_weights(weights)
output_val_2 = m2.predict(x_val)
self.assertAllClose(output_val, output_val_2, atol=1e-6)
@combinations.generate(combinations.keras_mode_combinations())
def test_layer_sharing_at_heterogenous_depth_with_concat(self):
input_shape = (16, 9, 3)
input_layer = input_layer_lib.Input(shape=input_shape)
a = layers.Dense(3, name='dense_A')
b = layers.Dense(3, name='dense_B')
c = layers.Dense(3, name='dense_C')
x1 = b(a(input_layer))
x2 = a(c(input_layer))
output = layers.concatenate([x1, x2])
m = training_lib.Model(inputs=input_layer, outputs=output)
m.run_eagerly = testing_utils.should_run_eagerly()
x_val = np.random.random((10, 16, 9, 3))
output_val = m.predict(x_val)
config = m.get_config()
weights = m.get_weights()
m2 = models.Model.from_config(config)
m2.set_weights(weights)
output_val_2 = m2.predict(x_val)
self.assertAllClose(output_val, output_val_2, atol=1e-6)
@combinations.generate(combinations.keras_mode_combinations())
def test_explicit_training_argument(self):
a = layers.Input(shape=(2,))
b = layers.Dropout(0.5)(a)
base_model = training_lib.Model(a, b)
a = layers.Input(shape=(2,))
b = base_model(a, training=False)
model = training_lib.Model(a, b)
x = np.ones((100, 2))
y = np.ones((100, 2))
model.compile(
optimizer='sgd',
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
loss = model.train_on_batch(x, y)
self.assertEqual(loss, 0) # In inference mode, output is equal to input.
a = layers.Input(shape=(2,))
b = base_model(a, training=True)
model = training_lib.Model(a, b)
preds = model.predict(x)
self.assertEqual(np.min(preds), 0.) # At least one unit was dropped.
@combinations.generate(combinations.keras_mode_combinations())
def test_mask_derived_from_keras_layer(self):
inputs = input_layer_lib.Input((5, 10))
mask = input_layer_lib.Input((5,))
outputs = layers.RNN(layers.LSTMCell(100))(inputs, mask=mask)
model = training_lib.Model([inputs, mask], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 5, 10)), np.zeros((10, 5))],
y=np.zeros((10, 100)),
batch_size=2)
# All data is masked, returned values are 0's.
self.assertEqual(history.history['loss'][0], 0.0)
history = model.fit(
x=[np.ones((10, 5, 10)), np.ones((10, 5))],
y=np.zeros((10, 100)),
batch_size=2)
# Data is not masked, returned values are random.
self.assertGreater(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(model.get_config())
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 5, 10)), np.zeros((10, 5))],
y=np.zeros((10, 100)),
batch_size=2)
# All data is masked, returned values are 0's.
self.assertEqual(history.history['loss'][0], 0.0)
history = model.fit(
x=[np.ones((10, 5, 10)), np.ones((10, 5))],
y=np.zeros((10, 100)),
batch_size=2)
# Data is not masked, returned values are random.
self.assertGreater(history.history['loss'][0], 0.0)
@combinations.generate(combinations.keras_mode_combinations())
def test_call_arg_derived_from_keras_layer(self):
class MyAdd(layers.Layer):
def call(self, x1, x2):
return x1 + x2
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
outputs = MyAdd()(input1, input2)
model = training_lib.Model([input1, input2], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
# Check serialization.
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'MyAdd': MyAdd})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.times(
combinations.keras_mode_combinations(mode='eager'),
combinations.combine(use_keras_tensors=False)))
def test_only_some_in_first_arg_derived_from_keras_layer(self):
class MyAddAll(layers.Layer):
def call(self, inputs):
x = inputs[0]
for inp in inputs[1:]:
if inp is not None:
x = x + inp
return x
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
layer = MyAddAll()
    with self.assertRaisesRegex(ValueError, 'construct a functional'):
layer([0.0, input1, None, input2, None])
@combinations.generate(combinations.times(
combinations.keras_mode_combinations(mode='eager'),
combinations.combine(use_keras_tensors=True)))
def test_only_some_in_first_arg_derived_from_keras_layer_keras_tensors(self):
# This functionality is unsupported in v1 graphs
class MyAddAll(layers.Layer):
def call(self, inputs):
x = inputs[0]
for inp in inputs[1:]:
if inp is not None:
x = x + inp
return x
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
layer = MyAddAll()
outputs = layer([0.0, input1, None, input2, None])
model = training_lib.Model([input1, input2], outputs)
self.assertIn(layer, model.layers)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
# Check serialization.
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'MyAddAll': MyAddAll})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(
combinations.times(
combinations.keras_mode_combinations(),
combinations.combine(share_already_used_layer=[True, False])))
def test_call_kwarg_derived_from_keras_layer(self, share_already_used_layer):
class MaybeAdd(layers.Layer):
def call(self, x1, x2=None):
if x2 is not None:
return x1 + x2
return x1
class IdentityLayer(layers.Layer):
def call(self, x):
return x
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
identity_layer = IdentityLayer()
if share_already_used_layer:
# We have had model serialization/deserialization break in the past:
# when a layer was previously used to construct other functional models
# and had a non-empty list of inbound nodes before being used to define
# the model being serialized/deserialized.
# (The serialization/deserialization was not correctly adjusting
# the node_index serialization/deserialization).
# So, we explicitly test this case.
training_lib.Model([input1], identity_layer(input1))
outputs = MaybeAdd()(input1, x2=identity_layer(input2))
model = training_lib.Model([input1, input2], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(
model.get_config(),
custom_objects={
'MaybeAdd': MaybeAdd,
'IdentityLayer': IdentityLayer
})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10)), 7 * np.ones((10, 10))],
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.keras_mode_combinations())
def test_call_kwarg_dtype_serialization(self):
class Double(layers.Layer):
def call(self, x1, dtype=None):
return math_ops.cast(x1 + x1, dtype=dtype)
input1 = input_layer_lib.Input(10)
outputs = Double()(input1, dtype=dtypes.float16)
model = training_lib.Model([input1], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10))],
y=6 * np.ones((10, 10)),
batch_size=2)
# Check that input was correctly doubled.
self.assertEqual(history.history['loss'][0], 0.0)
# Check the output dtype
self.assertEqual(model(array_ops.ones((3, 10))).dtype, dtypes.float16)
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'Double': Double})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10))],
y=6 * np.ones((10, 10)),
batch_size=2)
# Check that input was correctly doubled.
self.assertEqual(history.history['loss'][0], 0.0)
# Check the output dtype
self.assertEqual(model(array_ops.ones((3, 10))).dtype, dtypes.float16)
@combinations.generate(combinations.keras_mode_combinations())
def test_call_kwarg_nonserializable(self):
class Double(layers.Layer):
def call(self, x1, kwarg=None):
return x1 + x1
class NonSerializable(object):
def __init__(self, foo=None):
self.foo = foo
input1 = input_layer_lib.Input(10)
outputs = Double()(input1, kwarg=NonSerializable())
model = training_lib.Model([input1], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[3 * np.ones((10, 10))],
y=6 * np.ones((10, 10)),
batch_size=2)
# Check that input was correctly doubled.
self.assertEqual(history.history['loss'][0], 0.0)
with self.assertRaisesRegex(
TypeError, 'Layer double was passed non-JSON-serializable arguments.'):
model.get_config()
@combinations.generate(
combinations.times(
combinations.keras_mode_combinations(),
combinations.keras_tensor_combinations(),
combinations.combine(share_already_used_layer=[True, False])))
def test_call_kwarg_derived_from_keras_layer_and_first_arg_is_constant(
self, share_already_used_layer):
class IdentityLayer(layers.Layer):
def call(self, x):
return x
class MaybeAdd(layers.Layer):
def call(self, x1, x2=None):
if x2 is not None:
return x1 + x2
return x1
input2 = input_layer_lib.Input(10)
identity_layer = IdentityLayer()
if share_already_used_layer:
# We have had model serialization/deserialization break in the past:
# when a layer was previously used to construct other functional models
# and had a non-empty list of inbound nodes before being used to define
# the model being serialized/deserialized.
# (The serialization/deserialization was not correctly adjusting
# the node_index serialization/deserialization).
# So, we explicitly test this case.
training_lib.Model([input2], identity_layer(input2))
outputs = MaybeAdd()(3., x2=identity_layer(input2))
model = training_lib.Model([input2], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=7 * np.ones((10, 10)),
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(
model.get_config(),
custom_objects={
'MaybeAdd': MaybeAdd,
'IdentityLayer': IdentityLayer
})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=7 * np.ones((10, 10)),
y=10 * np.ones((10, 10)),
batch_size=2)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.keras_mode_combinations())
def test_composite_call_kwarg_derived_from_keras_layer(self):
# Create a test layer that accepts composite tensor inputs.
class MaybeAdd(layers.Layer):
def call(self, x1, x2=None):
# We need to convert this to a tensor for loss calculations -
# losses don't play nicely with ragged tensors yet.
if x2 is not None:
return (x1 + x2).to_tensor(default_value=0)
return x1.to_tensor(default_value=0)
input1 = input_layer_lib.Input((None,), ragged=True)
input2 = input_layer_lib.Input((None,), ragged=True)
outputs = MaybeAdd()(input1, x2=input2)
model = training_lib.Model([input1, input2], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
input_data = [
ragged_factory_ops.constant([[3.0, 3.0], [3.0, 3.0], [3.0]]),
ragged_factory_ops.constant([[7.0, 7.0], [7.0, 7.0], [7.0]])
]
expected_data = np.array([[10.0, 10.0], [10.0, 10.0], [10.0, 0.0]])
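    # Rows are added elementwise (3 + 7 = 10); `to_tensor` pads the short
    # final row with the default value, yielding the trailing 0.0.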
history = model.fit(x=input_data, y=expected_data)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'MaybeAdd': MaybeAdd})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(x=input_data, y=expected_data)
# Check that second input was correctly added to first.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.times(
combinations.keras_mode_combinations(mode='eager'),
combinations.keras_tensor_combinations()))
def test_call_some_not_all_nested_in_first_arg_derived_from_keras_layer(self):
# This functionality is unsupported in v1 graphs
class AddAll(layers.Layer):
def call(self, x1_x2, x3):
x1, x2 = x1_x2
out = x1 + x2
if x3 is not None:
for t in x3.values():
out += t
return out
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
input3 = input_layer_lib.Input(10)
layer = AddAll()
outputs = layer(
[input1, 4 * array_ops.ones((1, 10))],
x3={
'a': input2,
'b': input3,
'c': 5 * array_ops.ones((1, 10))
})
model = training_lib.Model([input1, input2, input3], outputs)
self.assertIn(layer, model.layers)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 10)), 2 * np.ones((10, 10)), 3 * np.ones((10, 10))],
y=15 * np.ones((10, 10)),
batch_size=2)
# Check that all inputs were correctly added.
self.assertEqual(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'AddAll': AddAll})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 10)), 2 * np.ones((10, 10)), 3 * np.ones((10, 10))],
y=15 * np.ones((10, 10)),
batch_size=2)
# Check that all inputs were correctly added.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.keras_mode_combinations())
def test_call_nested_arg_derived_from_keras_layer(self):
class AddAll(layers.Layer):
def call(self, x1, x2, x3=None):
out = x1 + x2
if x3 is not None:
for t in x3.values():
out += t
return out
input1 = input_layer_lib.Input(10)
input2 = input_layer_lib.Input(10)
input3 = input_layer_lib.Input(10)
outputs = AddAll()(
input1,
4 * array_ops.ones((1, 10)),
x3={
'a': input2,
'b': input3,
'c': 5 * array_ops.ones((1, 10))
})
model = training_lib.Model([input1, input2, input3], outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 10)), 2 * np.ones((10, 10)), 3 * np.ones((10, 10))],
y=15 * np.ones((10, 10)),
batch_size=2)
# Check that all inputs were correctly added.
self.assertEqual(history.history['loss'][0], 0.0)
model = training_lib.Model.from_config(
model.get_config(), custom_objects={'AddAll': AddAll})
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(
x=[np.ones((10, 10)), 2 * np.ones((10, 10)), 3 * np.ones((10, 10))],
y=15 * np.ones((10, 10)),
batch_size=2)
# Check that all inputs were correctly added.
self.assertEqual(history.history['loss'][0], 0.0)
@combinations.generate(combinations.keras_mode_combinations())
def test_multi_output_model_with_none_masking(self):
def func(x):
return [x * 0.2, x * 0.3]
def output_shape(input_shape):
return [input_shape, input_shape]
i = layers.Input(shape=(3, 2, 1))
o = layers.Lambda(function=func, output_shape=output_shape)(i)
self.assertEqual(backend.int_shape(o[0]), (None, 3, 2, 1))
self.assertEqual(backend.int_shape(o[1]), (None, 3, 2, 1))
o = layers.add(o)
model = training_lib.Model(i, o)
model.run_eagerly = testing_utils.should_run_eagerly()
i2 = layers.Input(shape=(3, 2, 1))
o2 = model(i2)
model2 = training_lib.Model(i2, o2)
model2.run_eagerly = testing_utils.should_run_eagerly()
x = np.random.random((4, 3, 2, 1))
out = model2.predict(x)
assert out.shape == (4, 3, 2, 1)
self.assertAllClose(out, x * 0.2 + x * 0.3, atol=1e-4)
@combinations.generate(combinations.keras_mode_combinations())
def test_constant_initializer_with_numpy(self):
initializer = initializers.Constant(np.ones((3, 2)))
model = sequential.Sequential()
model.add(layers.Dense(2, input_shape=(3,), kernel_initializer=initializer))
model.add(layers.Dense(3))
model.compile(
loss='mse',
optimizer='sgd',
metrics=['acc'],
run_eagerly=testing_utils.should_run_eagerly())
json_str = model.to_json()
models.model_from_json(json_str)
if yaml is not None:
yaml_str = model.to_yaml()
models.model_from_yaml(yaml_str)
def test_subclassed_error_if_init_not_called(self):
class MyNetwork(training_lib.Model):
def __init__(self):
self._foo = [layers.Dense(10), layers.Dense(10)]
with self.assertRaisesRegex(RuntimeError, 'forgot to call'):
MyNetwork()
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_int_input_shape(self):
inputs = input_layer_lib.Input(10)
self.assertEqual([None, 10], inputs.shape.as_list())
inputs_with_batch = input_layer_lib.Input(batch_size=20, shape=5)
self.assertEqual([20, 5], inputs_with_batch.shape.as_list())
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_model_initialization(self):
# Functional model
inputs = input_layer_lib.Input(shape=(32,))
outputs = layers.Dense(4)(inputs)
with self.assertRaisesRegex(TypeError,
'Keyword argument not understood'):
model = training_lib.Model(
inputs, outputs, name='m', trainable=False, dtype='int64')
with self.assertRaisesRegex(TypeError,
'Keyword argument not understood'):
model = training_lib.Model(
inputs, outputs, name='m', trainable=False, dynamic=False)
model = training_lib.Model(inputs, outputs, name='m', trainable=False)
self.assertEqual('m', model.name)
self.assertFalse(model.trainable)
self.assertFalse(model.dynamic)
class SubclassModel(training_lib.Model):
pass
# Subclassed model
model = SubclassModel(
name='subclassed', trainable=True, dtype='int64', dynamic=True)
self.assertEqual('subclassed', model.name)
self.assertTrue(model.dynamic)
self.assertTrue(model.trainable)
w = model.add_weight('w', [], initializer=initializers.Constant(1))
self.assertEqual(dtypes.int64, w.dtype)
def test_disconnected_inputs(self):
input_tensor1 = input_layer_lib.Input(shape=[200], name='a')
input_tensor2 = input_layer_lib.Input(shape=[10], name='b')
output_tensor1 = layers.Dense(units=10)(input_tensor1)
net = functional.Functional(
inputs=[input_tensor1, input_tensor2], outputs=[output_tensor1])
net2 = functional.Functional.from_config(net.get_config())
self.assertLen(net2.inputs, 2)
self.assertEqual('a', net2.layers[0].name)
self.assertEqual('b', net2.layers[1].name)
@combinations.generate(combinations.keras_model_type_combinations())
def test_dependency_tracking(self):
model = testing_utils.get_small_mlp(1, 4, input_dim=3)
model.trackable = Checkpoint()
self.assertIn('trackable', model._unconditional_dependency_names)
self.assertEqual(model.trackable, model._lookup_dependency('trackable'))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_model_construction_in_tf_function(self):
d = {'model': None}
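    # Caching the model in `d` ensures its variables are created only on the
    # first trace; creating them on every call would violate `tf.function`
    # variable-creation rules.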
@def_function.function
def fn(x):
if d['model'] is None:
# Check that Functional can be built in a `tf.function`.
inputs = input_layer_lib.Input(10)
outputs = layers.Dense(1)(inputs)
model = functional.Functional(inputs, outputs)
d['model'] = model
else:
model = d['model']
return model(x)
x = array_ops.ones((10, 10))
y = fn(x)
self.assertEqual(y.shape.as_list(), [10, 1])
class DeferredModeTest(keras_parameterized.TestCase):
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testSimpleNetworkBuilding(self):
inputs = input_layer_lib.Input(shape=(32,))
if context.executing_eagerly():
self.assertEqual(inputs.dtype.name, 'float32')
self.assertEqual(inputs.shape.as_list(), [None, 32])
x = layers.Dense(2)(inputs)
if context.executing_eagerly():
self.assertEqual(x.dtype.name, 'float32')
self.assertEqual(x.shape.as_list(), [None, 2])
outputs = layers.Dense(4)(x)
network = functional.Functional(inputs, outputs)
self.assertIsInstance(network, functional.Functional)
if context.executing_eagerly():
# It should be possible to call such a network on EagerTensors.
inputs = constant_op.constant(
np.random.random((10, 32)).astype('float32'))
outputs = network(inputs)
self.assertEqual(outputs.shape.as_list(), [10, 4])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testMultiIONetworkBuilding(self):
input_a = input_layer_lib.Input(shape=(32,))
input_b = input_layer_lib.Input(shape=(16,))
a = layers.Dense(16)(input_a)
class AddLayer(layers.Layer):
def call(self, inputs):
return inputs[0] + inputs[1]
c = AddLayer()([a, input_b]) # pylint: disable=not-callable
c = layers.Dense(2)(c)
network = functional.Functional([input_a, input_b], [a, c])
if context.executing_eagerly():
a_val = constant_op.constant(
np.random.random((10, 32)).astype('float32'))
b_val = constant_op.constant(
np.random.random((10, 16)).astype('float32'))
outputs = network([a_val, b_val])
self.assertEqual(len(outputs), 2)
self.assertEqual(outputs[0].shape.as_list(), [10, 16])
self.assertEqual(outputs[1].shape.as_list(), [10, 2])
class DefaultShapeInferenceBehaviorTest(keras_parameterized.TestCase):
def _testShapeInference(self, model, input_shape, expected_output_shape):
input_value = np.random.random(input_shape)
output_value = model.predict(input_value)
self.assertEqual(output_value.shape, expected_output_shape)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testSingleInputCase(self):
class LayerWithOneInput(layers.Layer):
def build(self, input_shape):
self.w = array_ops.ones(shape=(3, 4))
def call(self, inputs):
return backend.dot(inputs, self.w)
inputs = input_layer_lib.Input(shape=(3,))
layer = LayerWithOneInput()
if context.executing_eagerly():
self.assertEqual(
layer.compute_output_shape((None, 3)).as_list(), [None, 4])
# As a side-effect, compute_output_shape builds the layer.
self.assertTrue(layer.built)
# We can still query the layer's compute_output_shape with compatible
# input shapes.
self.assertEqual(
layer.compute_output_shape((6, 3)).as_list(), [6, 4])
outputs = layer(inputs)
model = training_lib.Model(inputs, outputs)
self._testShapeInference(model, (2, 3), (2, 4))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testMultiInputOutputCase(self):
class MultiInputOutputLayer(layers.Layer):
def build(self, input_shape):
self.w = array_ops.ones(shape=(3, 4))
def call(self, inputs):
a = backend.dot(inputs[0], self.w)
b = a + inputs[1]
return [a, b]
input_a = input_layer_lib.Input(shape=(3,))
input_b = input_layer_lib.Input(shape=(4,))
output_a, output_b = MultiInputOutputLayer()([input_a, input_b])
model = training_lib.Model([input_a, input_b], [output_a, output_b])
output_a_val, output_b_val = model.predict(
[np.random.random((2, 3)), np.random.random((2, 4))])
self.assertEqual(output_a_val.shape, (2, 4))
self.assertEqual(output_b_val.shape, (2, 4))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testTrainingArgument(self):
class LayerWithTrainingArg(layers.Layer):
def build(self, input_shape):
self.w = array_ops.ones(shape=(3, 4))
def call(self, inputs, training):
return backend.dot(inputs, self.w)
inputs = input_layer_lib.Input(shape=(3,))
outputs = LayerWithTrainingArg()(inputs, training=False)
model = training_lib.Model(inputs, outputs)
self._testShapeInference(model, (2, 3), (2, 4))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testNoneInShape(self):
class Model(training_lib.Model):
def __init__(self):
super(Model, self).__init__()
self.conv1 = layers.Conv2D(8, 3)
self.pool = layers.GlobalAveragePooling2D()
self.fc = layers.Dense(3)
def call(self, x):
x = self.conv1(x)
x = self.pool(x)
x = self.fc(x)
return x
model = Model()
model.build(tensor_shape.TensorShape((None, None, None, 1)))
self.assertTrue(model.built, 'Model should be built')
self.assertTrue(model.weights,
'Model should have its weights created as it '
'has been built')
sample_input = array_ops.ones((1, 10, 10, 1))
output = model(sample_input)
self.assertEqual(output.shape, (1, 3))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testNoneInShapeWithCompoundModel(self):
class BasicBlock(training_lib.Model):
def __init__(self):
super(BasicBlock, self).__init__()
self.conv1 = layers.Conv2D(8, 3)
self.pool = layers.GlobalAveragePooling2D()
self.dense = layers.Dense(3)
def call(self, x):
x = self.conv1(x)
x = self.pool(x)
x = self.dense(x)
return x
class CompoundModel(training_lib.Model):
def __init__(self):
super(CompoundModel, self).__init__()
self.block = BasicBlock()
def call(self, x):
x = self.block(x) # pylint: disable=not-callable
return x
model = CompoundModel()
model.build(tensor_shape.TensorShape((None, None, None, 1)))
self.assertTrue(model.built, 'Model should be built')
self.assertTrue(model.weights,
'Model should have its weights created as it '
'has been built')
sample_input = array_ops.ones((1, 10, 10, 1))
output = model(sample_input) # pylint: disable=not-callable
self.assertEqual(output.shape, (1, 3))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def testNoneInShapeWithFunctionalAPI(self):
class BasicBlock(training_lib.Model):
      # Subclasses training_lib.Model but is invoked like a layer
      # inside a model created using the functional API.
def __init__(self):
super(BasicBlock, self).__init__()
self.conv1 = layers.Conv2D(8, 3)
def call(self, x):
x = self.conv1(x)
return x
input_layer = layers.Input(shape=(None, None, 1))
x = BasicBlock()(input_layer)
x = layers.GlobalAveragePooling2D()(x)
output_layer = layers.Dense(3)(x)
model = training_lib.Model(inputs=input_layer, outputs=output_layer)
model.build(tensor_shape.TensorShape((None, None, None, 1)))
self.assertTrue(model.built, 'Model should be built')
self.assertTrue(model.weights,
'Model should have its weights created as it '
'has been built')
sample_input = array_ops.ones((1, 10, 10, 1))
output = model(sample_input)
self.assertEqual(output.shape, (1, 3))
@combinations.generate(combinations.keras_mode_combinations())
def test_sequential_as_downstream_of_masking_layer(self):
inputs = layers.Input(shape=(3, 4))
x = layers.Masking(mask_value=0., input_shape=(3, 4))(inputs)
s = sequential.Sequential()
s.add(layers.Dense(5, input_shape=(4,)))
x = layers.wrappers.TimeDistributed(s)(x)
model = training_lib.Model(inputs=inputs, outputs=x)
model.compile(
optimizer='rmsprop',
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
model_input = np.random.randint(
low=1, high=5, size=(10, 3, 4)).astype('float32')
for i in range(4):
model_input[i, i:, :] = 0.
model.fit(model_input,
np.random.random((10, 3, 5)), epochs=1, batch_size=6)
if not context.executing_eagerly():
# Note: this doesn't work in eager due to DeferredTensor/ops compatibility
# issue.
mask_outputs = [model.layers[1].compute_mask(model.layers[1].input)]
mask_outputs += [model.layers[2].compute_mask(
model.layers[2].input, mask_outputs[-1])]
func = backend.function([model.input], mask_outputs)
mask_outputs_val = func([model_input])
self.assertAllClose(mask_outputs_val[0], np.any(model_input, axis=-1))
self.assertAllClose(mask_outputs_val[1], np.any(model_input, axis=-1))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_external_keras_serialization_compat_input_layers(self):
inputs = input_layer_lib.Input(shape=(10,))
outputs = layers.Dense(1)(inputs)
model = training_lib.Model(inputs, outputs)
config = model.get_config()
# Checks that single inputs and outputs are still saved as 1-element lists.
# Saving as 1-element lists or not is equivalent in TF Keras, but only the
# 1-element list format is supported in TF.js and keras-team/Keras.
self.assertLen(config['input_layers'], 1)
self.assertLen(config['output_layers'], 1)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_external_keras_serialization_compat_inbound_nodes(self):
# Check single Tensor input.
inputs = input_layer_lib.Input(shape=(10,), name='in')
outputs = layers.Dense(1)(inputs)
model = training_lib.Model(inputs, outputs)
config = model.get_config()
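    # Each serialized inbound node has the form
    # [layer_name, node_index, tensor_index, call_kwargs].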
self.assertEqual(config['layers'][1]['inbound_nodes'], [[['in', 0, 0, {}]]])
# Check multiple Tensor input.
inputs1 = input_layer_lib.Input(shape=(10,), name='in1')
inputs2 = input_layer_lib.Input(shape=(10,), name='in2')
outputs = layers.Add()([inputs1, inputs2])
model = training_lib.Model([inputs1, inputs2], outputs)
config = model.get_config()
self.assertEqual(config['layers'][2]['inbound_nodes'],
[[['in1', 0, 0, {}], ['in2', 0, 0, {}]]])
@combinations.generate(combinations.combine(mode=['eager']))
def test_dict_inputs_tensors(self):
    # Note that this test runs with v2 eager only, since v1 behaves
    # differently with respect to dict inputs for training.
inputs = {
'sentence2': input_layer_lib.Input(
shape=(), name='a', dtype=dtypes.string),
'sentence1': input_layer_lib.Input(
shape=(), name='b', dtype=dtypes.string),
}
strlen = layers.Lambda(string_ops.string_length_v2)
diff = layers.Subtract()(
[strlen(inputs['sentence1']), strlen(inputs['sentence2'])])
diff = math_ops.cast(diff, dtypes.float32)
model = training_lib.Model(inputs, diff)
extra_keys = {
'sentence1': constant_op.constant(['brown fox', 'lazy dog']),
'sentence2': constant_op.constant(['owl', 'cheeky cat']),
'label': constant_op.constant([0, 1]),
}
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
model(extra_keys)
self.assertIn('ignored by the model', str(w[-1].message))
model.compile('sgd', 'mse')
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
model.fit(extra_keys, y=constant_op.constant([0, 1]), steps_per_epoch=1)
self.assertIn('ignored by the model', str(w[-1].message))
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
model.evaluate(extra_keys, constant_op.constant([0, 1]))
self.assertIn('ignored by the model', str(w[-1].message))
    # Make sure the model inputs are sorted by the dict keys.
self.assertEqual(model.inputs[0]._keras_history.layer.name, 'b')
self.assertEqual(model.inputs[1]._keras_history.layer.name, 'a')
class GraphUtilsTest(test.TestCase):
def testGetReachableFromInputs(self):
with ops.Graph().as_default(), self.cached_session():
pl_1 = array_ops.placeholder(shape=None, dtype='float32')
pl_2 = array_ops.placeholder(shape=None, dtype='float32')
pl_3 = array_ops.placeholder(shape=None, dtype='float32')
x_1 = pl_1 + pl_2
x_2 = pl_2 * 2
x_3 = pl_3 + 1
x_4 = x_1 + x_2
x_5 = x_3 * pl_1
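      # From pl_1 alone, x_1, x_4 and x_5 (and their ops) are reachable;
      # x_2 and x_3 depend only on pl_2 and pl_3 and so are not.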
self.assertEqual(
tf_utils.get_reachable_from_inputs([pl_1]),
{pl_1, x_1, x_4, x_5, x_1.op, x_4.op, x_5.op})
self.assertEqual(
tf_utils.get_reachable_from_inputs([pl_1, pl_2]),
{pl_1, pl_2, x_1, x_2, x_4, x_5, x_1.op, x_2.op, x_4.op, x_5.op})
self.assertEqual(
tf_utils.get_reachable_from_inputs([pl_3]),
{pl_3, x_3, x_5, x_3.op, x_5.op})
self.assertEqual(
tf_utils.get_reachable_from_inputs([x_3]), {x_3, x_5, x_5.op})
class NestedNetworkTest(keras_parameterized.TestCase):
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_nested_inputs_network(self):
inputs = {
'x1': input_layer_lib.Input(shape=(1,)),
'x2': input_layer_lib.Input(shape=(1,))
}
outputs = layers.Add()([inputs['x1'], inputs['x2']])
network = functional.Functional(inputs, outputs)
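    # Round-trip through the config to verify that dict-structured inputs
    # survive serialization.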
network = functional.Functional.from_config(network.get_config())
result_tensor = network({
'x1': array_ops.ones((1, 1), 'float32'),
'x2': array_ops.ones((1, 1), 'float32')
})
result = self.evaluate(result_tensor)
self.assertAllEqual(result, [[2.]])
# TODO(b/122726584): Investigate why concrete batch is flaky in some builds.
output_shape = network.compute_output_shape({
'x1': (None, 1),
'x2': (None, 1)
})
self.assertListEqual(output_shape.as_list(), [None, 1])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_nested_outputs_network(self):
inputs = input_layer_lib.Input(shape=(1,))
outputs = {
'x+x': layers.Add()([inputs, inputs]),
'x*x': layers.Multiply()([inputs, inputs])
}
network = functional.Functional(inputs, outputs)
network = functional.Functional.from_config(network.get_config())
result_tensor = network(array_ops.ones((1, 1), 'float32'))
result = self.evaluate(result_tensor)
self.assertAllEqual(result['x+x'], [[2.]])
self.assertAllEqual(result['x*x'], [[1.]])
output_shape = network.compute_output_shape((None, 1))
self.assertListEqual(output_shape['x+x'].as_list(), [None, 1])
self.assertListEqual(output_shape['x*x'].as_list(), [None, 1])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_nested_network_inside_network(self):
inner_inputs = {
'x1': input_layer_lib.Input(shape=(1,)),
'x2': input_layer_lib.Input(shape=(1,))
}
inner_outputs = {
'x1+x2': layers.Add()([inner_inputs['x1'], inner_inputs['x2']]),
'x1*x2': layers.Multiply()([inner_inputs['x1'], inner_inputs['x2']])
}
inner_network = functional.Functional(
inner_inputs, inner_outputs)
inputs = [
input_layer_lib.Input(shape=(1,)),
input_layer_lib.Input(shape=(1,))
]
middle = inner_network({'x1': inputs[0], 'x2': inputs[1]})
outputs = layers.Add()([middle['x1+x2'], middle['x1*x2']])
network = functional.Functional(inputs, outputs)
network = functional.Functional.from_config(network.get_config())
# Computes: `(x1+x2) + (x1*x2)`
result_tensor = network(
[array_ops.ones((1, 1), 'float32'),
array_ops.ones((1, 1), 'float32')])
result = self.evaluate(result_tensor)
self.assertAllEqual(result, [[3.]])
output_shape = network.compute_output_shape([(None, 1), (None, 1)])
self.assertListEqual(output_shape.as_list(), [None, 1])
@combinations.generate(combinations.combine(mode=['graph']))
def test_updates_with_direct_call(self):
inputs = input_layer_lib.Input(shape=(10,))
x = layers.BatchNormalization()(inputs)
x = layers.Dense(10)(x)
model = training_lib.Model(inputs, x)
ph = backend.placeholder(shape=(10, 10))
model(ph)
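    # BatchNormalization contributes two update ops (moving mean and moving
    # variance) per call node: one pair from construction, one from the
    # direct call above.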
self.assertLen(model.updates, 4)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_dict_mapping_input(self):
class ReturnFirst(layers.Layer):
def call(self, inputs):
b, _ = inputs
return b
    # Checks that dict inputs are reordered to match the order in which the
    # Model was constructed.
b = input_layer_lib.Input(shape=(10,), name='b')
a = input_layer_lib.Input(shape=(10,), name='a')
outputs = ReturnFirst()([b, a])
b_val = array_ops.ones((10, 10))
a_val = array_ops.zeros((10, 10))
model = training_lib.Model([b, a], outputs)
res = model({'a': a_val, 'b': b_val})
self.assertAllClose(self.evaluate(res), self.evaluate(b_val))
reversed_model = training_lib.Model([a, b], outputs)
res = reversed_model({'a': a_val, 'b': b_val})
self.assertAllClose(self.evaluate(res), self.evaluate(b_val))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_dict_mapping_single_input(self):
b = input_layer_lib.Input(shape=(1,), name='b')
outputs = b * 2
model = training_lib.Model(b, outputs)
b_val = array_ops.ones((1, 1))
extra_val = array_ops.ones((1, 10))
inputs = {'a': extra_val, 'b': b_val}
res = model(inputs)
# Check that 'b' was used and 'a' was ignored.
self.assertEqual(res.shape.as_list(), [1, 1])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_nested_dict_mapping(self):
a = input_layer_lib.Input(shape=(1,), dtype='int32', name='a')
b = input_layer_lib.Input(shape=(1,), dtype='int32', name='b')
c = input_layer_lib.Input(shape=(1,), dtype='int32', name='c')
d = input_layer_lib.Input(shape=(1,), dtype='int32', name='d')
inputs = {'a': (a, b), 'c': (c, d)}
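    # Weight each input by a distinct power of ten so the result exposes the
    # order in which the nested dict was flattened.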
outputs = 1000 * a + 100 * b + 10 * c + d
model = training_lib.Model(inputs, outputs)
a_val = array_ops.ones((1, 1), dtype='int32')
b_val = 2 * array_ops.ones((1, 1), dtype='int32')
c_val = 3 * array_ops.ones((1, 1), dtype='int32')
d_val = 4 * array_ops.ones((1, 1), dtype='int32')
inputs_val = {'a': (a_val, b_val), 'c': (c_val, d_val)}
res = model(inputs_val)
# Check that inputs were flattened in the correct order.
self.assertFalse(model._enable_dict_to_input_mapping)
self.assertEqual(self.evaluate(res), [1234])
@combinations.generate(combinations.keras_mode_combinations())
class AddLossTest(keras_parameterized.TestCase):
def test_add_loss_outside_call_only_loss(self):
inputs = input_layer_lib.Input((10,))
mid = layers.Dense(10)(inputs)
outputs = layers.Dense(1)(mid)
model = training_lib.Model(inputs, outputs)
model.add_loss(math_ops.reduce_mean(outputs))
self.assertLen(model.losses, 1)
initial_weights = model.get_weights()
x = np.ones((10, 10))
model.compile(
'sgd',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, batch_size=2, epochs=1)
model2 = model.from_config(model.get_config())
model2.compile(
'sgd',
run_eagerly=testing_utils.should_run_eagerly())
model2.set_weights(initial_weights)
model2.fit(x, batch_size=2, epochs=1)
# The TFOpLayer and the AddLoss layer are serialized.
self.assertLen(model2.layers, 5)
self.assertAllClose(model.get_weights(), model2.get_weights())
def test_add_loss_outside_call_multiple_losses(self):
inputs = input_layer_lib.Input((10,))
x1 = layers.Dense(10)(inputs)
x2 = layers.Dense(10)(x1)
outputs = layers.Dense(1)(x2)
model = training_lib.Model(inputs, outputs)
model.add_loss(math_ops.reduce_sum(x1 * x2))
model.add_loss(math_ops.reduce_mean(outputs))
self.assertLen(model.losses, 2)
initial_weights = model.get_weights()
x, y = np.ones((10, 10)), np.ones((10, 1))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, batch_size=2, epochs=1)
model2 = model.from_config(model.get_config())
model2.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model2.set_weights(initial_weights)
model2.fit(x, y, batch_size=2, epochs=1)
self.assertAllClose(model.get_weights(), model2.get_weights())
def test_add_loss_crossentropy_backtracking(self):
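    # The label tensor is itself a model input, so `fit` is called with the
    # labels inside `x` and no separate `y` argument.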
inputs = input_layer_lib.Input((2,))
labels = input_layer_lib.Input((1,))
outputs = layers.Dense(1, activation='sigmoid')(inputs)
model = functional.Functional([inputs, labels], outputs)
model.add_loss(losses.binary_crossentropy(labels, outputs))
model.compile('adam')
x = np.random.random((2, 2))
y = np.random.random((2, 1))
model.fit([x, y])
inputs = input_layer_lib.Input((2,))
labels = input_layer_lib.Input((2,))
outputs = layers.Dense(2, activation='softmax')(inputs)
model = functional.Functional([inputs, labels], outputs)
model.add_loss(losses.categorical_crossentropy(labels, outputs))
model.compile('adam')
x = np.random.random((2, 2))
y = np.random.random((2, 2))
model.fit([x, y])
inputs = input_layer_lib.Input((2,))
labels = input_layer_lib.Input((1,), dtype='int32')
outputs = layers.Dense(2, activation='softmax')(inputs)
model = functional.Functional([inputs, labels], outputs)
model.add_loss(losses.sparse_categorical_crossentropy(labels, outputs))
model.compile('adam')
x = np.random.random((2, 2))
y = np.random.randint(0, 2, size=(2, 1))
model.fit([x, y])
@combinations.generate(combinations.keras_mode_combinations())
class WeightAccessTest(keras_parameterized.TestCase):
def test_functional_model(self):
inputs = input_layer_lib.Input((10,))
x1 = layers.Dense(10)(inputs)
x2 = layers.Dense(10)(x1)
outputs = layers.Dense(1)(x2)
model = training_lib.Model(inputs, outputs)
self.assertEqual(len(model.weights), 6)
def test_sequential_model_with_input_shape(self):
x1 = layers.Dense(10, input_shape=(10,))
x2 = layers.Dense(10)
x3 = layers.Dense(1)
model = sequential.Sequential([x1, x2, x3])
self.assertEqual(len(model.weights), 6)
def test_sequential_model_without_input_shape(self):
x1 = layers.Dense(10)
x2 = layers.Dense(10)
x3 = layers.Dense(1)
model = sequential.Sequential([x1, x2, x3])
with self.assertRaisesRegex(
ValueError, 'Weights for model .* have not yet been created'):
_ = model.weights
def test_subclass_model_with_build_method(self):
class SubclassModel(models.Model):
def build(self, input_shape):
self.w = self.add_weight(shape=input_shape[-1], initializer='ones')
def call(self, inputs):
return inputs * self.w
model = SubclassModel()
with self.assertRaisesRegex(
ValueError, 'Weights for model .* have not yet been created'):
_ = model.weights
model(input_layer_lib.Input((10,)))
self.assertEqual(len(model.weights), 1)
def test_subclass_model_without_build_method(self):
class SubclassModel(models.Model):
def __init__(self):
super(SubclassModel, self).__init__()
self.w = self.add_weight(shape=(), initializer='ones')
def call(self, inputs):
return inputs * self.w
model = SubclassModel()
self.assertEqual(len(model.weights), 1)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class DTypeTest(keras_parameterized.TestCase):
@testing_utils.enable_v2_dtype_behavior
def test_graph_network_dtype(self):
inputs = input_layer_lib.Input((10,))
outputs = layers.Dense(10)(inputs)
network = functional.Functional(inputs, outputs)
self.assertEqual(network.dtype, 'float32')
@testing_utils.enable_v2_dtype_behavior
def test_subclassed_network_dtype(self):
class IdentityNetwork(training_lib.Model):
def call(self, inputs):
return inputs
network = IdentityNetwork()
self.assertEqual(network.dtype, 'float32')
self.assertEqual(network(array_ops.constant(1, 'float64')).dtype, 'float32')
network = IdentityNetwork(dtype='float16')
self.assertEqual(network.dtype, 'float16')
self.assertEqual(network(array_ops.constant(1, 'float64')).dtype, 'float16')
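    # With autocasting disabled, inputs keep their original dtype instead of
    # being cast to the network's float32 dtype.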
network = IdentityNetwork(autocast=False)
self.assertEqual(network.dtype, 'float32')
self.assertEqual(network(array_ops.constant(1, 'float64')).dtype, 'float64')
class AttrTrackingLayer(base_layer.Layer):
"""Count how many times `dynamic` and `stateful` are called.
These counts are used to test that the attribute cache behaves as expected.
"""
def __init__(self, *args, **kwargs):
self.stateful_count = 0
self.dynamic_count = 0
super(AttrTrackingLayer, self).__init__(*args, **kwargs)
@base_layer.Layer.stateful.getter
def stateful(self):
self.stateful_count += 1
return super(AttrTrackingLayer, self).stateful
@property
def dynamic(self):
self.dynamic_count += 1
return super(AttrTrackingLayer, self).dynamic
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class CacheCorrectnessTest(keras_parameterized.TestCase):
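  # NOTE: the name below does not start with `test_`, so the default test
  # runner does not pick this method up.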
def layer_and_network_test(self):
# Top level layer
network = functional.Functional()
layer_0 = AttrTrackingLayer()
sub_network = functional.Functional()
layer_1 = AttrTrackingLayer(dynamic=True)
layer_2 = AttrTrackingLayer()
sub_network.sub_layers = [layer_1, layer_2]
network.sub_layer = layer_0
for _ in range(2):
self.assertEqual(network.dynamic, False)
self.assertEqual(network.stateful, False)
# The second pass should be a cache hit.
self.assertEqual(layer_0.dynamic_count, 1)
self.assertEqual(layer_0.stateful_count, 1)
# Mutations of the sub-layer should force recalculation of the network's
    # stateful attribute. (Mutations bubble up.)
layer_0.stateful = True
self.assertEqual(network.stateful, True)
self.assertEqual(layer_0.stateful_count, 2)
layer_0.stateful = False
self.assertEqual(network.stateful, False)
self.assertEqual(layer_0.stateful_count, 3)
# But changing stateful should not affect dynamic.
self.assertEqual(network.dynamic, False)
self.assertEqual(layer_0.dynamic_count, 1)
network.sub_network = sub_network
    # Adding to the topology should invalidate the cache and be reflected in
    # the top-level network.
self.assertEqual(network.dynamic, True)
self.assertEqual(layer_0.dynamic_count, 2)
self.assertEqual(layer_1.dynamic_count, 1)
# Still dynamic, but we need to recompute.
sub_network.sub_layers.pop()
self.assertEqual(network.dynamic, True)
self.assertEqual(layer_0.dynamic_count, 3)
self.assertEqual(layer_1.dynamic_count, 2)
    # Now that we've removed the dynamic layer deep in the layer hierarchy,
    # we need to make sure the change bubbles up through all the levels.
sub_network.sub_layers.pop()
self.assertEqual(network.dynamic, False)
self.assertEqual(layer_0.dynamic_count, 4)
self.assertEqual(layer_1.dynamic_count, 2)
# Now check with a tracked dict.
sub_network.sub_layers = {
"layer_1": layer_1,
"layer_2": layer_2,
}
self.assertEqual(network.dynamic, True)
self.assertEqual(layer_0.dynamic_count, 5)
self.assertEqual(layer_1.dynamic_count, 3)
# In-place assignment should still invalidate the cache.
sub_network.sub_layers["layer_1"] = layer_1
self.assertEqual(network.dynamic, True)
self.assertEqual(layer_0.dynamic_count, 6)
self.assertEqual(layer_1.dynamic_count, 4)
sub_network.sub_layers["layer_1"] = None
for _ in range(2):
self.assertEqual(network.dynamic, False)
self.assertEqual(layer_0.dynamic_count, 7)
self.assertEqual(layer_1.dynamic_count, 4)
layer_3 = AttrTrackingLayer()
layer_3.stateful = True
sub_network.sub_layers = None
self.assertEqual(network.dynamic, False)
self.assertEqual(network.stateful, False)
# Test duplicate layers.
sub_network.sub_layers = [layer_1, layer_1, layer_1, layer_3]
self.assertEqual(network.dynamic, True)
self.assertEqual(network.stateful, True)
for _ in range(3):
sub_network.sub_layers.pop()
self.assertEqual(network.dynamic, True)
self.assertEqual(network.stateful, False)
sub_network.sub_layers.pop()
self.assertEqual(network.dynamic, False)
self.assertEqual(network.stateful, False)
def test_compute_output_shape_cache(self):
# See https://github.com/tensorflow/tensorflow/issues/32029.
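    # Repeated calls with distinct shapes should each return a freshly
    # computed output shape rather than a stale cached result.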
x = input_layer_lib.Input(shape=(None, 32))
dense = layers.Dense(2)
y = dense(x)
network = functional.Functional(x, y, name='dense_network')
for i in range(999, 1024):
self.assertEqual(network.compute_output_shape((1, i, 32)), (1, i, 2))
def test_2d_inputs_squeezed_to_1d(self):
input_1d = input_layer_lib.Input(shape=())
outputs = input_1d * 2.
net = functional.Functional(input_1d, outputs)
x = np.ones((10, 1))
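    # The trailing size-1 dimension is squeezed away to match the
    # scalar-per-example input spec.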
y = net(x)
self.assertEqual(y.shape.rank, 1)
def test_1d_inputs_expanded_to_2d(self):
input_1d = input_layer_lib.Input(shape=(1,))
outputs = input_1d * 2.
net = functional.Functional(input_1d, outputs)
x = np.ones((10,))
y = net(x)
self.assertEqual(y.shape.rank, 2)
def test_training_passed_during_construction(self):
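    # Reference implementation mirroring MyLayer.call below for each
    # possible `training` value.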
def _call(inputs, training):
if training is None:
return inputs * -1.0
elif training:
return inputs
else:
return inputs * 0.0
class MyLayer(base_layer.Layer):
def call(self, inputs, training=True):
return _call(inputs, training)
my_layer = MyLayer()
x = np.ones((1, 10))
    # Hard-coded `True` value passed during construction is respected.
inputs = input_layer_lib.Input(10)
outputs = my_layer(inputs, training=True)
network = functional.Functional(inputs, outputs)
self.assertAllEqual(network(x, training=True), _call(x, True))
self.assertAllEqual(network(x, training=False), _call(x, True))
self.assertAllEqual(network(x), _call(x, True))
    # Hard-coded `False` value passed during construction is respected.
inputs = input_layer_lib.Input(10)
outputs = my_layer(inputs, training=False)
network = functional.Functional(inputs, outputs)
self.assertAllEqual(network(x, training=True), _call(x, False))
self.assertAllEqual(network(x, training=False), _call(x, False))
self.assertAllEqual(network(x), _call(x, False))
if context.executing_eagerly():
      # In v2, construction still works when no `training` is specified.
      # When no value is passed during construction, the local default is
      # used at runtime.
inputs = input_layer_lib.Input(10)
outputs = my_layer(inputs)
network = functional.Functional(inputs, outputs)
self.assertAllEqual(network(x, training=True), _call(x, True))
self.assertAllEqual(network(x, training=False), _call(x, False))
self.assertAllEqual(network(x), _call(x, True)) # Use local default
    # A `None` value passed positionally during construction is ignored at
    # runtime.
inputs = input_layer_lib.Input(10)
outputs = my_layer(inputs, None)
network = functional.Functional(inputs, outputs)
self.assertAllEqual(network(x, training=True), _call(x, True))
self.assertAllEqual(network(x, training=False), _call(x, False))
if context.executing_eagerly():
self.assertAllEqual(network(x), _call(x, True)) # Use local default
else:
      # In v1, training would have defaulted to the `None` passed at
      # construction time if `training` is not supplied at runtime.
self.assertAllEqual(network(x), _call(x, None))
    # A `None` value passed as a kwarg during construction is also ignored
    # at runtime.
inputs = input_layer_lib.Input(10)
outputs = my_layer(inputs, training=None)
network = functional.Functional(inputs, outputs)
self.assertAllEqual(network(x, training=True), _call(x, True))
self.assertAllEqual(network(x, training=False), _call(x, False))
if context.executing_eagerly():
self.assertAllEqual(network(x), _call(x, True)) # Use local default
else:
      # In v1, training would have defaulted to the `None` passed at
      # construction time if `training` is not supplied at runtime.
self.assertAllEqual(network(x), _call(x, None))
class InputsOutputsErrorTest(keras_parameterized.TestCase):
@testing_utils.enable_v2_dtype_behavior
def test_input_error(self):
inputs = input_layer_lib.Input((10,))
outputs = layers.Dense(10)(inputs)
with self.assertRaisesRegex(
TypeError, "('Keyword argument not understood:', 'input')"):
models.Model(input=inputs, outputs=outputs)
@testing_utils.enable_v2_dtype_behavior
def test_output_error(self):
inputs = input_layer_lib.Input((10,))
outputs = layers.Dense(10)(inputs)
with self.assertRaisesRegex(
TypeError, "('Keyword argument not understood:', 'output')"):
models.Model(inputs=inputs, output=outputs)
def test_input_spec(self):
if not context.executing_eagerly():
return
inputs = input_layer_lib.Input((10,))
outputs = layers.Dense(10)(inputs)
model = models.Model(inputs, outputs)
with self.assertRaisesRegex(
ValueError, r'.*expected shape=.*'):
model(np.zeros((3, 11)))
def test_input_spec_list_of_inputs(self):
if not context.executing_eagerly():
return
input_1 = input_layer_lib.Input((10,), name='1')
input_2 = input_layer_lib.Input((5,), name='2')
x = layers.Concatenate()([input_1, input_2])
outputs = layers.Dense(10)(x)
model = models.Model([input_1, input_2], outputs)
with self.assertRaisesRegex(
ValueError, r'.*expects 2 input.*'):
model(np.zeros((3, 10)))
with self.assertRaisesRegex(
ValueError, r'.*expects 2 input.*'):
model([np.zeros((3, 10)), np.zeros((3, 5)), np.zeros((3, 10))])
with self.assertRaisesRegex(
ValueError, r'.*expected shape=.*'):
model([np.zeros((3, 10)), np.zeros((3, 6))])
    # Test passing data via a dict keyed by input name.
with self.assertRaisesRegex(
ValueError, r'Missing data for input.*'):
model({'1': np.zeros((3, 10))})
with self.assertRaisesRegex(
ValueError, r'.*expected shape=.*'):
model({'1': np.zeros((3, 10)), '2': np.zeros((3, 6))})
def test_input_spec_dict(self):
if not context.executing_eagerly():
return
input_1 = input_layer_lib.Input((10,))
input_2 = input_layer_lib.Input((5,))
x = layers.Concatenate()([input_1, input_2])
outputs = layers.Dense(10)(x)
model = models.Model({'1': input_1, '2': input_2}, outputs)
with self.assertRaisesRegex(
ValueError, r'Missing data for input.*'):
model({'1': np.zeros((3, 10))})
with self.assertRaisesRegex(
ValueError, r'.*expected shape=.*'):
model({'1': np.zeros((3, 10)), '2': np.zeros((3, 6))})
class FunctionalSubclassModel(training_lib.Model):
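  """Subclassed Model that wires up a functional graph in `__init__`."""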
def __init__(self, *args, **kwargs):
my_input = input_layer_lib.Input(shape=(16,))
dense = layers.Dense(32, activation='relu')
output = dense(my_input)
outputs = {'output': output}
super().__init__(inputs=[my_input], outputs=outputs, *args, **kwargs)
class MixinClass(object):
def __init__(self, foo, **kwargs):
self._foo = foo
super().__init__(**kwargs)
def get_foo(self):
return self._foo
class SubclassedModel(training_lib.Model):
def __init__(self, bar, **kwargs):
self._bar = bar
super().__init__(**kwargs)
def get_bar(self):
return self._bar
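# Both MRO orders should initialize the mixin and the functional Model, as
# long as every `__init__` cooperatively calls `super().__init__`.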
class MultipleInheritanceModelTest(keras_parameterized.TestCase):
def testFunctionalSubclass(self):
m = FunctionalSubclassModel()
    # Smoke-test the weights and output shape of the model.
self.assertLen(m.weights, 2)
self.assertEqual(m.outputs[0].shape.as_list(), [None, 32])
def testFunctionalSubclassPreMixin(self):
class MixedFunctionalSubclassModel(MixinClass, FunctionalSubclassModel):
pass
m = MixedFunctionalSubclassModel(foo='123')
self.assertTrue(m._is_graph_network)
self.assertLen(m.weights, 2)
self.assertEqual(m.outputs[0].shape.as_list(), [None, 32])
self.assertEqual(m.get_foo(), '123')
def testFunctionalSubclassPostMixin(self):
    # Make sure the mixin class is also initialized correctly when the
    # inheritance order is changed.
class MixedFunctionalSubclassModel(FunctionalSubclassModel, MixinClass):
pass
m = MixedFunctionalSubclassModel(foo='123')
self.assertTrue(m._is_graph_network)
self.assertLen(m.weights, 2)
self.assertEqual(m.outputs[0].shape.as_list(), [None, 32])
self.assertEqual(m.get_foo(), '123')
def testSubclassModelPreMixin(self):
class MixedSubclassModel(MixinClass, SubclassedModel):
pass
m = MixedSubclassModel(foo='123', bar='456')
self.assertFalse(m._is_graph_network)
self.assertEqual(m.get_foo(), '123')
self.assertEqual(m.get_bar(), '456')
if __name__ == '__main__':
test.main()