With this change, it is now possible to mix-and-match tf.keras.Layers and tf.Modules inside a tf.keras.Model, and everything will be tracked properly.

- Variables in tf.Modules that are set as attributes of custom Layers and Models now show up properly in properties such as Layer.trainable_variables and Model.trainable_variables.
- tf.Modules do not show up in Model.layers. Instead, a new method Layer._flatten_modules is added that iterates over tf.Modules and Layers in the order that Keras expects. The existing property Layer.submodules (inherited from tf.Module) can still be used to iterate over tf.Modules and Layers with the tf.Module ordering. Layer._flatten_layers is built on top of Layer._flatten_modules.
- Layer._layers is renamed to Layer._self_tracked_trackables to avoid naming conflicts with user-defined attributes (and to reflect that this attribute contains Layers, Modules, and TrackableDataStructures).
- A new property, tf.Module.non_trainable_variables, is added to tf.Module to enable this.

PiperOrigin-RevId: 339917644
Change-Id: I96a7302745280a6261de8c4295c5cbf5f4d7dd5c
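
For illustration, a minimal sketch of the tracking behavior described above (the `Scale`/`ScaledIdentity` names are hypothetical, not part of this change):

    import tensorflow as tf

    class Scale(tf.Module):

      def __init__(self):
        super().__init__()
        self.w = tf.Variable(2., trainable=True)
        self.b = tf.Variable(0.5, trainable=False)

    class ScaledIdentity(tf.keras.layers.Layer):

      def __init__(self):
        super().__init__()
        self.scale = Scale()  # A plain tf.Module set as a Layer attribute.

      def call(self, x):
        return x * self.scale.w + self.scale.b

    layer = ScaledIdentity()
    layer(tf.ones((2, 2)))
    assert len(layer.trainable_variables) == 1      # [w], tracked via the Module.
    assert len(layer.non_trainable_variables) == 1  # [b], via the new tf.Module property.
    # The Module itself still does not show up in Model.layers.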
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for TensorFlow 2.0 layer behavior."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import os
import sys
import traceback

import numpy as np

from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import composite_tensor
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors_impl
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import type_spec
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers
from tensorflow.python.keras import regularizers
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import base_layer
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training as training_lib
from tensorflow.python.keras.legacy_tf_layers import core as legacy_core
from tensorflow.python.keras.optimizer_v2 import rmsprop
from tensorflow.python.keras.utils import control_flow_util
from tensorflow.python.module import module
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import summary_ops_v2
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variables
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.summary import summary_iterator
from tensorflow.python.util import nest


class DynamicLayer(base_layer.Layer):

  def __init__(self, dynamic=False, **kwargs):
    super(DynamicLayer, self).__init__(dynamic=dynamic, **kwargs)

  def call(self, inputs):
    samples = tensor_array_ops.TensorArray(
        dtype=dtypes.float32, size=array_ops.shape(inputs)[0])
    for idx, sample in enumerate(inputs):
      samples = samples.write(idx, math_ops.square(sample))
    return samples.stack()

  def compute_output_shape(self, input_shape):
    return input_shape


class InvalidLayer(base_layer.Layer):

  def call(self, inputs):
    raise ValueError('You did something wrong!')


class BaseLayerTest(keras_parameterized.TestCase):

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_instrumentation(self):
    layer = layers.Add()
    self.assertTrue(layer._instrumented_keras_api)
    self.assertTrue(layer._instrumented_keras_layer_class)
    self.assertFalse(layer._instrumented_keras_model_class)

  @combinations.generate(combinations.times(
      combinations.keras_model_type_combinations(),
      combinations.keras_tensor_combinations()))
  def test_dynamic_layer(self):
    model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
                                                input_shape=(3,))
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  @combinations.generate(combinations.times(
      combinations.keras_model_type_combinations(),
      combinations.keras_tensor_combinations()))
  def test_dynamic_layer_error(self):
    # Functional Models hit the `dynamic=True` error during construction.
    # Subclass Models should just throw the original autograph error during
    # execution.
    raised_error = False
    try:
      model = testing_utils.get_model_from_layers([DynamicLayer()],
                                                  input_shape=(3,))
      model.compile(rmsprop.RMSprop(0.001), loss='mse')
      model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
    except errors_impl.OperatorNotAllowedInGraphError as e:
      if 'iterating over `tf.Tensor` is not allowed' in str(e):
        raised_error = True
    except TypeError as e:
      if 'attempting to use Python control flow' in str(e):
        raised_error = True
    self.assertTrue(raised_error)

  @combinations.generate(combinations.times(
      combinations.keras_model_type_combinations(),
      combinations.keras_tensor_combinations()))
  def test_dynamic_layer_error_running_in_graph_mode(self):
    with ops.get_default_graph().as_default():
      model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
                                                  input_shape=(3,))
      self.assertEqual(model.dynamic, True)
      # But then you cannot run the model since you're in a graph scope.
      with self.assertRaisesRegex(ValueError,
                                  'You must enable eager execution'):
        model.compile(rmsprop.RMSprop(0.001), loss='mse')

  def test_manual_compute_output_shape(self):

    class BuildCounter(base_layer.Layer):

      def __init__(self, *args, **kwargs):  # pylint: disable=redefined-outer-name
        super(BuildCounter, self).__init__(*args, **kwargs)
        self.build_counter = 0

      def build(self, input_shape):
        self.build_counter += 1
        self.build_shape = input_shape

      def call(self, inputs):
        return inputs

    layer = BuildCounter(dtype=dtypes.float64)
    output_shape = layer.compute_output_shape((None, 10))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])
    self.assertEqual(output_shape.as_list(), [None, 10])
    output_signature = layer.compute_output_signature(
        tensor_spec.TensorSpec(dtype=dtypes.float64, shape=[None, 10]))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])
    self.assertEqual(output_signature.dtype, dtypes.float64)
    self.assertEqual(output_signature.shape.as_list(), [None, 10])
    layer(np.ones((5, 10)))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])

  def test_dynamic_layer_with_deferred_sequential_model(self):
    model = sequential.Sequential([DynamicLayer(dynamic=True), layers.Dense(3)])
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  def test_nested_dynamic_layers_in_eager_mode(self):
    inputs = input_layer.Input((3,))
    outputs = DynamicLayer(dynamic=True)(inputs)
    inner_model = training_lib.Model(inputs, outputs)
    self.assertEqual(inner_model.dynamic, True)

    inputs = input_layer.Input((3,))
    x = DynamicLayer(dynamic=True)(inputs)
    outputs = inner_model(x)

    model = training_lib.Model(inputs, outputs)
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  def test_dynamic_subclassed_model_no_shape_inference(self):

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__(dynamic=True)
        self.layer1 = layers.Dense(3)
        self.layer2 = layers.Dense(3)

      def call(self, inputs):
        if math_ops.reduce_sum(inputs) > 0:
          return self.layer1(inputs)
        else:
          return self.layer2(inputs)

    model = MyModel()
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
    self.assertEqual(model.outputs, None)

  def test_dynamic_subclassed_model_with_shape_inference(self):

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__(dynamic=True)
        self.layer1 = layers.Dense(3)
        self.layer2 = layers.Dense(3)

      def call(self, inputs):
        if math_ops.reduce_sum(inputs) > 0:
          return self.layer1(inputs)
        else:
          return self.layer2(inputs)

      def compute_output_shape(self, input_shape):
        return tuple(input_shape[:-1].as_list()) + (3,)

    model = MyModel()
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    x, y = np.random.random((2, 3)), np.random.random((2, 3))
    model.train_on_batch(x, y)
    outputs = model(x)
    self.assertEqual(outputs.shape.as_list(), [2, 3])

  def test_deepcopy(self):
    bias_reg = lambda x: 1e-3 * math_ops.reduce_sum(x)
    layer = layers.Conv2D(32, (3, 3), bias_regularizer=bias_reg)
    # Call the Layer on data to generate regularization losses.
    layer(array_ops.ones((1, 10, 10, 3)))
    self.assertLen(layer.losses, 1)
    new_layer = copy.deepcopy(layer)
    self.assertEqual(new_layer.bias_regularizer, bias_reg)
    self.assertEqual(layer.get_config(), new_layer.get_config())

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_invalid_forward_pass(self):
    inputs = input_layer.Input((3,))
    with self.assertRaisesRegex(ValueError, 'You did something wrong!'):
      _ = InvalidLayer()(inputs)

  def test_no_legacy_model(self):
    inputs = input_layer.Input((1,))
    legacy_dense_0 = legacy_core.Dense(1, name='legacy_dense_0')
    legacy_dense_1 = legacy_core.Dense(1, name='legacy_dense_1')

    layer = legacy_dense_0(inputs)
    layer = layers.Dense(1)(layer)
    layer = legacy_dense_1(layer)

    expected_regex = (r'The following are legacy tf\.layers\.Layers:\n '
                      '{}\n {}'.format(legacy_dense_0, legacy_dense_1))

    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = training_lib.Model(inputs=[inputs], outputs=[layer])

    model = training_lib.Model(inputs=[inputs], outputs=[inputs])
    with self.assertRaisesRegex(TypeError, expected_regex):
      model._insert_layers([legacy_dense_0, legacy_dense_1])

  def test_no_legacy_sequential(self):
    layer = [layers.Dense(1), legacy_core.Dense(1, name='legacy_dense_0')]

    expected_regex = r'legacy tf\.layers\.Layers:\n {}'.format(layer[1])
    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = sequential.Sequential(layer)

    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = sequential.Sequential([input_layer.Input(shape=(4,))] + layer)

    model = sequential.Sequential()
    with self.assertRaisesRegex(TypeError, expected_regex):
      for l in layer:
        model.add(l)

  @combinations.generate(
      combinations.times(
          combinations.keras_model_type_combinations(),
          combinations.keras_tensor_combinations(),
          combinations.combine(mode=['graph', 'eager'])))
  def test_build_with_numpy_data(self):
    model_layers = [
        layers.Dense(3, activation='relu', kernel_initializer='ones'),
        layers.Dense(1, activation='sigmoid', kernel_initializer='ones')
    ]
    model = testing_utils.get_model_from_layers(model_layers, input_shape=(4,))
    model(np.zeros((2, 4), dtype='float32'))
    self.assertTrue(model.built)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_default_add_weight(self):

    class TestLayer(base_layer.Layer):

      def __init__(self):
        super(TestLayer, self).__init__()
        self.default_weight = self.add_weight()
        self.weight_without_name = self.add_weight(shape=(3, 4))
        self.regularized_weight_without_name = self.add_weight(
            shape=(3, 4), regularizer='l2')

    layer = TestLayer()
    self.assertEqual(layer.default_weight.shape.as_list(), [])
    self.assertEqual(layer.weight_without_name.shape.as_list(), [3, 4])
    self.assertEqual(layer.default_weight.dtype.name, 'float32')
    self.assertEqual(layer.weight_without_name.dtype.name, 'float32')
    self.assertEqual(len(layer.losses), 1)
    if not context.executing_eagerly():
      # Cannot access tensor.name in eager execution.
      self.assertIn('Variable_2/Regularizer', layer.losses[0].name)

  @combinations.generate(combinations.keras_mode_combinations(mode=['eager']))
  def test_learning_phase_freezing_for_layers(self):

    class LearningPhaseLayer(base_layer.Layer):

      def call(self, inputs):
        return backend.in_train_phase(lambda: array_ops.ones_like(inputs),
                                      lambda: array_ops.zeros_like(inputs))

    def get_learning_phase_value():
      model = sequential.Sequential([LearningPhaseLayer(input_shape=(1,))])
      model._run_eagerly = testing_utils.should_run_eagerly()
      return np.sum(model(np.ones((1, 1))))

    self.assertEqual(get_learning_phase_value(), 0)

    # Test scope.
    with backend.learning_phase_scope(1):
      self.assertEqual(get_learning_phase_value(), 1)

    # The effects of the scope end after exiting it.
    self.assertEqual(get_learning_phase_value(), 0)

    # Test setting.
    backend.set_learning_phase(1)
    self.assertEqual(get_learning_phase_value(), 1)
    backend.set_learning_phase(0)
    self.assertEqual(get_learning_phase_value(), 0)

  # Cannot be enabled with `run_eagerly=True`, see b/123904578
  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_can_return_variable(self):

    class ComputeSum(base_layer.Layer):

      def __init__(self):
        super(ComputeSum, self).__init__()
        self.total = variables.Variable(
            initial_value=array_ops.zeros((1, 1)), trainable=False)
        if not context.executing_eagerly():
          backend.get_session().run(self.total.initializer)

      def call(self, inputs):
        self.total.assign_add(inputs)
        return self.total

    inputs = input_layer.Input(shape=(1,))
    model = training_lib.Model(inputs, ComputeSum()(inputs))
    model.predict(np.ones((1, 1)))

  def _get_layer_with_training_arg(self):

    class TrainingLayer(base_layer.Layer):
      """A layer with a `training` argument in a defuned `call`."""

      @def_function.function
      def call(self, inputs, training=None):
        if training is None:
          training = backend.learning_phase()
        return control_flow_util.smart_cond(
            training, lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs))

    return TrainingLayer()

  # b/124459427: can't test with `run_eagerly=True` for now.
  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations(),
                         combinations.keras_tensor_combinations()))
  def test_training_arg_in_defun(self):
    layer = self._get_layer_with_training_arg()
    model = testing_utils.get_model_from_layers([layer], input_shape=(1,))
    model.compile(rmsprop.RMSprop(0.),
                  loss='mae')
    history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(history.history['loss'][0], 1.)
    loss = model.evaluate(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(loss, 0.)

    # Test that the argument injection performed in `call` is not active
    # when the argument is passed explicitly.
    layer = self._get_layer_with_training_arg()
    inputs = input_layer.Input(shape=(1,))
    # Pass `training` by name
    outputs = layer(inputs, training=False)
    model = training_lib.Model(inputs, outputs)
    model.compile(rmsprop.RMSprop(0.),
                  loss='mae')
    history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(history.history['loss'][0], 0.)

  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations(),
                         combinations.keras_tensor_combinations()))
  def test_raw_variable_assignment(self):

    class RawVariableLayer(base_layer.Layer):

      def __init__(self, **kwargs):
        super(RawVariableLayer, self).__init__(**kwargs)
        # Test variables in nested structure.
        self.var_list = [variables.Variable(1.), {'a': variables.Variable(2.)}]

      def call(self, inputs):
        return inputs * self.var_list[0] * self.var_list[1]['a']

    model = testing_utils.get_model_from_layers([RawVariableLayer()],
                                                input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    x, y = np.ones((10, 10)), np.ones((10, 10))
    # Checks that variables get initialized.
    model.fit(x, y, batch_size=2, epochs=2)

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_composite_variable_assignment(self):
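    # Variables reachable only through a `CompositeTensor` attribute should
    # still be discovered and tracked as the layer's weights.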

    class Spec(type_spec.TypeSpec):

      value_type = property(lambda self: CompositeVariable)

      def _component_specs(self):
        pass

      def _serialize(self):
        pass

      def _to_components(self, value):
        return value._variables

      def _from_components(self, variable_list):
        return CompositeVariable(variable_list)

    class CompositeVariable(composite_tensor.CompositeTensor):

      def __init__(self, variable_list):
        self._variables = variable_list

      @property
      def _type_spec(self):
        return Spec()

    class CompositeVariableLayer(base_layer.Layer):

      def __init__(self):
        super().__init__()
        self.composite_var = CompositeVariable(
            [variables.Variable(1.),
             variables.Variable(2.)])

    layer = CompositeVariableLayer()
    self.assertLen(layer.weights, 2)
    self.assertIsInstance(layer.weights[0], variables.Variable)
    self.assertIsInstance(layer.weights[1], variables.Variable)
    self.assertEqual(self.evaluate(layer.weights[0]), 1.)
    self.assertEqual(self.evaluate(layer.weights[1]), 2.)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_names(self):
    with testing_utils.use_keras_tensors_scope(False):
      inputs = input_layer.Input(shape=[2])
      add1 = inputs + inputs
      add2 = layers.Add()([inputs, inputs])
      add3 = inputs + inputs
      add4 = layers.Add()([inputs, inputs])
      model = training_lib.Model(
          inputs=[inputs], outputs=[add1, add2, add3, add4])
      actual_names = [l.name for l in model.layers]
      graph_names = [
          'input_1', 'tf_op_layer_AddV2', 'add', 'tf_op_layer_AddV2_1', 'add_1'
      ]
      eager_names = [
          'input_1', 'tf_op_layer_add', 'add', 'tf_op_layer_add_2', 'add_1'
      ]
      for actual, graph, eager in zip(actual_names, graph_names, eager_names):
        self.assertIn(actual, {eager, graph})
    if context.executing_eagerly():
      backend.clear_session()
      with testing_utils.use_keras_tensors_scope(True):
        inputs = input_layer.Input(shape=[2])
        add1 = inputs + inputs
        add2 = layers.Add()([inputs, inputs])
        add3 = inputs + inputs
        add4 = layers.Add()([inputs, inputs])
        model = training_lib.Model(
            inputs=[inputs], outputs=[add1, add2, add3, add4])
        actual_names = [l.name for l in model.layers]
        expected_names = [
            'input_1', 'tf.__operators__.add', 'add', 'tf.__operators__.add_1',
            'add_1'
        ]
        self.assertAllEqual(actual_names, expected_names)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_names_after_loading(self):
    if context.executing_eagerly():
      backend.clear_session()
      with testing_utils.use_keras_tensors_scope(True):
        # Mimic loading a model that already contained add layers with
        # name = 'add_1' and 'tf.__operators__.add'
        layers.Add(name='add_1')
        layers.Add(name='tf.__operators__.add')

        inputs = input_layer.Input(shape=[2])
        add1 = inputs + inputs
        add2 = layers.Add()([inputs, inputs])
        add3 = inputs + inputs
        add4 = layers.Add()([inputs, inputs])
        model = training_lib.Model(
            inputs=[inputs], outputs=[add1, add2, add3, add4])
        actual_names = [l.name for l in model.layers]
        # The generated op layer names should have avoided layer names seen in
        # the loaded model. (This avoidance should not apply to non-op layers.)
        expected_names = [
            'input_1', 'tf.__operators__.add_1',
            'add', 'tf.__operators__.add_2', 'add_1'
        ]
        self.assertAllEqual(actual_names, expected_names)

  def test_add_trainable_weight_on_frozen_layer(self):

    class TestLayer(base_layer.Layer):

      def build(self, input_shape):
        self.w = self.add_weight(shape=(), trainable=True)

      def call(self, inputs):
        return self.w * inputs

    layer = TestLayer()
    layer.trainable = False
    layer.build(None)
    layer.trainable = True
    self.assertListEqual(layer.trainable_weights, [layer.w])

  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations()))
  def test_passing_initial_weights_values(self):
    kernel_value = np.random.random((10, 2))
    layer_with_weights = layers.Dense(2, use_bias=False, weights=[kernel_value])

    model = testing_utils.get_model_from_layers([layer_with_weights],
                                                input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    inputs = np.random.random((3, 10))
    out = model.predict(inputs)
    self.assertAllClose(model.layers[-1].get_weights()[0], kernel_value)
    self.assertAllClose(out, np.dot(inputs, kernel_value))

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_set_weights_and_get_weights(self):
    layer = layers.Dense(2)
    layer.build((None, 10))
    kernel = np.random.random((10, 2))
    bias = np.random.random((2,))
    layer.set_weights([kernel, bias])
    weights = layer.get_weights()
    self.assertEqual(len(weights), 2)
    self.assertAllClose(weights[0], kernel)
    self.assertAllClose(weights[1], bias)
    with self.assertRaisesRegex(ValueError,
                                'but the layer was expecting 2 weights'):
      layer.set_weights([1, 2, 3])
    with self.assertRaisesRegex(ValueError,
                                'not compatible with provided weight shape'):
      layer.set_weights([kernel.T, bias])

  def test_get_config_error(self):

    class MyLayer(base_layer.Layer):

      def __init__(self, my_kwarg='default', **kwargs):
        super(MyLayer, self).__init__(**kwargs)
        self.my_kwarg = my_kwarg

    # `__init__` includes kwargs but `get_config` is not overridden, so
    # an error should be thrown:
    with self.assertRaisesRegex(NotImplementedError, 'Layer MyLayer has'):
      MyLayer('custom').get_config()

    class MyLayerNew(base_layer.Layer):

      def __init__(self, my_kwarg='default', **kwargs):
        super(MyLayerNew, self).__init__(**kwargs)
        self.my_kwarg = my_kwarg

      def get_config(self):
        config = super(MyLayerNew, self).get_config()
        config['my_kwarg'] = self.my_kwarg
        return config

    # Test to make sure that error is not raised if the method call is
    # from an overridden `get_config`:
    self.assertEqual(MyLayerNew('custom').get_config()['my_kwarg'], 'custom')

    class MyLayerNew2(base_layer.Layer):

      def __init__(self, name='MyLayerName', dtype=None, **kwargs):  # pylint:disable=redefined-outer-name
        super(MyLayerNew2, self).__init__(name=name, dtype=dtype, **kwargs)

    # Check that if the kwargs in `__init__` are base layer constructor
    # arguments, no error is thrown:
    self.assertEqual(MyLayerNew2(name='New').get_config()['name'], 'New')

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_count_params(self):
    dense = layers.Dense(16)
    dense.build((None, 4))
    self.assertEqual(dense.count_params(), 16 * 4 + 16)

    dense = layers.Dense(16)
    with self.assertRaisesRegex(ValueError, 'call `count_params`'):
      dense.count_params()

    model = sequential.Sequential(layers.Dense(16))
    with self.assertRaisesRegex(ValueError, 'call `count_params`'):
      model.count_params()

    dense = layers.Dense(16, input_dim=4)
    model = sequential.Sequential(dense)
    self.assertEqual(model.count_params(), 16 * 4 + 16)

  def test_super_not_called(self):

    class CustomLayerNotCallingSuper(base_layer.Layer):

      def __init__(self):
        pass

    layer = CustomLayerNotCallingSuper()
    with self.assertRaisesRegex(RuntimeError, 'You must call `super()'):
      layer(np.random.random((10, 2)))

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_first_arg_not_called_inputs(self):
    x, y = array_ops.ones((10, 1)), array_ops.ones((10, 1))

    class ArgLayer(base_layer.Layer):

      def call(self, x, y):
        return x + y

    layer = ArgLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

    class KwargLayer(base_layer.Layer):

      def call(self, x=None, y=None):
        return x + y

    layer = KwargLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

    with self.assertRaisesRegex(ValueError, 'must always be passed'):
      layer(y=y)

    class TFFunctionLayer(base_layer.Layer):

      @def_function.function
      def call(self, x, y=None):
        if y is None:
          return x
        return x + y

    layer = TFFunctionLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

  def test_build_input_shape(self):

    class CustomLayer(base_layer.Layer):

      def build(self, input_shape):
        self.add_weight('w', shape=input_shape[1:])
        super(CustomLayer, self).build(input_shape)

    layer = CustomLayer()
    self.assertFalse(layer.built)

    layer.build([None, 1, 2, 3])
    self.assertTrue(layer.built)
    self.assertEqual([None, 1, 2, 3], layer._build_input_shape)

    layer = CustomLayer()
    layer(input_layer.Input((3,)))
    self.assertTrue(layer.built)
    self.assertEqual([None, 3], layer._build_input_shape.as_list())

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_custom_layer_training_arg(self):
    class CustomLayerNoTrainingArg(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerNoTrainingArg, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs):
        return self._nested_layer(inputs)

    class CustomLayerDefaultTrainingMissing(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingMissing, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingNone(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingNone, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=None):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingFalse(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingFalse, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=False):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingTrue(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingTrue, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=True):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    x = array_ops.ones(shape=(1, 1))

    # If the layer signature doesn't specify a default training arg,
    # run it in inference mode when no training arg is passed
    # to __call__
    layer = CustomLayerDefaultTrainingMissing()
    self.assertAllEqual(layer(x), x * 0.5)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # If the layer signature specifies `False` as the default training arg,
    # run it in inference mode when no training arg is passed
    # to __call__
    layer = CustomLayerDefaultTrainingFalse()
    self.assertAllEqual(layer(x), x * 0.5)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # If the layer signature specifies `True` as the default training arg,
    # explicitly run it in training mode when no training arg is passed
    # to __call__
    layer = CustomLayerDefaultTrainingTrue()
    self.assertAllEqual(layer(x), x)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # Outer layers/models should set the training context implicitly for all
    # nested layers, respecting whatever mode the outer layer was run with.
    layer = CustomLayerDefaultTrainingTrue(CustomLayerDefaultTrainingFalse())
    # No outer value passed: use local defaults
    self.assertAllEqual(layer(x), x * 0.25)  # Use local default False
    # Outer value passed: override local defaults
    self.assertAllEqual(layer(x, training=False), x * 0.25)
    self.assertAllEqual(layer(x, training=True), x)

    layer = CustomLayerDefaultTrainingFalse(CustomLayerDefaultTrainingTrue())
    # No outer value passed: use local defaults
    self.assertAllEqual(layer(x), x)  # Use local default True
    # Outer value passed: override local defaults
    self.assertAllEqual(layer(x, training=False), x * 0.25)
    self.assertAllEqual(layer(x, training=True), x)

    # If the outer layer `call` doesn't take a training argument at all,
    # it'll set the nested scope as None when no training arg is passed in.
    # If a training arg is passed in it won't use it directly in `call`, but
    # it will set the nested training mode.
    layer = CustomLayerNoTrainingArg(CustomLayerDefaultTrainingTrue())
    self.assertAllEqual(layer(x), x)  # Use local default True
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    layer = CustomLayerDefaultTrainingNone(CustomLayerDefaultTrainingTrue())
    self.assertAllEqual(layer(x), x)  # Use local default True
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

  def test_activity_regularizer_string(self):

    class MyLayer(base_layer.Layer):
      pass

    layer = MyLayer(activity_regularizer='l2')
    self.assertIsInstance(layer.activity_regularizer, regularizers.L2)

  def test_tf_module_tracking(self):
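    # Exercises the tf.Module tracking described in the change above: variables
    # owned by a plain tf.Module attribute must surface through the Keras
    # `variables`/`trainable_variables`/`non_trainable_variables` properties.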

    class MyModule(module.Module):

      def __init__(self):
        super(MyModule, self).__init__()
        self.v1 = variables.Variable(1., trainable=True, name='v1')
        self.v2 = variables.Variable(2., trainable=False, name='v2')

      def __call__(self, x):
        return x * self.v1 * self.v2

    class MyLayer(base_layer.Layer):

      def __init__(self, **kwargs):
        super(MyLayer, self).__init__(**kwargs)
        self.my_modules = {}
        self.my_modules['a'] = MyModule()

      def call(self, x):
        return self.my_modules['a'](x)

    layer = MyLayer()
    self.assertLen(layer.variables, 2)
    self.assertLen(layer.trainable_variables, 1)
    self.assertLen(layer.non_trainable_variables, 1)

    layer.trainable = False
    self.assertLen(layer.variables, 2)
    self.assertLen(layer.trainable_variables, 0)
    self.assertLen(layer.non_trainable_variables, 2)

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.my_modules = []
        self.my_modules.append(MyModule())

      def call(self, x):
        return self.my_modules[0](x)

    model = MyModel()
    self.assertLen(model.variables, 2)
    self.assertLen(model.trainable_variables, 1)
    self.assertLen(model.non_trainable_variables, 1)

    model.trainable = False
    self.assertLen(model.variables, 2)
    self.assertLen(model.trainable_variables, 0)
    self.assertLen(model.non_trainable_variables, 2)


class SymbolicSupportTest(keras_parameterized.TestCase):

  def test_using_symbolic_tensors_with_tf_ops(self):
    # Single-input.
    x = input_layer.Input((3,))
    math_ops.square(x)

    # Multi-inputs.
    x1, x2 = input_layer.Input((3,)), input_layer.Input((3,))
    array_ops.concat([x1, x2], axis=1)

    # Mixing Keras symbolic tensors and graph tensors from the same graph works.
    with backend.get_graph().as_default():
      x1 = input_layer.Input((3,))
    x2 = input_layer.Input((3,))
    math_ops.matmul(x1, x2)

    # Creating same op type (matmul) multiple times in the Keras graph works.
    x1 = input_layer.Input((3,))
    x2 = input_layer.Input((3,))
    math_ops.matmul(x1, x2)

  def test_mixing_eager_and_graph_tensors(self):
    with ops.Graph().as_default():
      x1 = array_ops.ones((3, 3))
    x2 = array_ops.ones((3, 3))
    with self.assertRaisesRegex(TypeError, 'Graph tensors'):
      math_ops.matmul(x1, x2)

  def test_mixing_numpy_arrays_and_graph_tensors(self):
    with ops.Graph().as_default():
      x1 = array_ops.ones((3, 3))
    x2 = np.ones((3, 3), dtype='float32')
    with self.assertRaisesRegex(TypeError, 'Graph tensors'):
      math_ops.matmul(x1, x2)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_mixing_keras_symbolic_tensors_and_eager_tensors(self):
    x1 = input_layer.Input((3,))
    x2 = array_ops.ones((3, 3))
    y = math_ops.matmul(x1, x2)

    fn = backend.function(inputs=[x1], outputs=[y])
    x_val = np.random.random((3, 3))
    y_val = np.ones((3, 3))
    self.assertAllClose(fn([x_val])[0],
                        np.matmul(x_val, y_val),
                        atol=1e-5)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_mixing_keras_symbolic_tensors_and_numpy_arrays(self):
    x1 = input_layer.Input((3,))
    x2 = np.ones((3, 3), dtype='float32')
    y = math_ops.matmul(x1, x2)

    fn = backend.function(inputs=[x1], outputs=[y])
    x_val = np.random.random((3, 3))
    y_val = np.ones((3, 3))
    self.assertAllClose(fn([x_val])[0],
                        np.matmul(x_val, y_val),
                        atol=1e-5)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_reraising_exception(self):
    # When layer is not dynamic, we have some pattern matching during exception
    # handling to detect when the user is trying to use python control flow.
    # When an exception is thrown but the pattern doesn't match, we want to
    # preserve the originating stack trace. An early implementation of this
    # logic lost the stack trace. We test the correct behavior here.

    class TypeErrorLayer(base_layer.Layer):

      def call(self, inputs):
        def easily_identifiable_name():
          raise TypeError('Non-matching TypeError message.')
        easily_identifiable_name()

    inputs = input_layer.Input((3,))

    try:
      _ = TypeErrorLayer()(inputs)
    except TypeError as e:
      if hasattr(e, 'ag_error_metadata'):
        self.assertIn('easily_identifiable_name', str(e))
        # See ErrorMetadataBase in autograph/pyct/errors.py
        function_name = e.ag_error_metadata.translated_stack[-1].function_name
      else:
        tb = traceback.extract_tb(sys.exc_info()[2])
        last_entry = tb[-1]
        function_name = last_entry[2]
      self.assertEqual(function_name, 'easily_identifiable_name')

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_summaries_in_tf_function(self):
    if not context.executing_eagerly():
      return

    class MyLayer(base_layer.Layer):

      def call(self, inputs):
        summary_ops_v2.scalar('mean', math_ops.reduce_mean(inputs))
        return inputs

    tmp_dir = self.get_temp_dir()
    writer = summary_ops_v2.create_file_writer_v2(tmp_dir)
    with writer.as_default(), summary_ops_v2.record_if(True):
      my_layer = MyLayer()
      x = array_ops.ones((10, 10))

      def my_fn(x):
        return my_layer(x)

      _ = my_fn(x)

    event_file = gfile.Glob(os.path.join(tmp_dir, 'events*'))
    self.assertLen(event_file, 1)
    event_file = event_file[0]
    tags = set()
    for e in summary_iterator.summary_iterator(event_file):
      for val in e.summary.value:
        tags.add(val.tag)
    self.assertEqual(set(['my_layer/mean']), tags)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_error_when_passing_non_tensor(self):
    # Layers that have an `input_spec` will raise an error when called on
    # non-tensors. This covers all built-in layers.
    layer = layers.Dense(3)
    x = object()
    with self.assertRaisesRegex(TypeError, r'should be tensors'):
      layer(x)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class NestedTrackingTest(test.TestCase):

  def test_nested_layer_variable_tracking(self):
    # Test that variables from nested sublayers are
    # being tracked by subclassed layers.

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.dense1 = layers.Dense(1)
        self.dense2 = layers.BatchNormalization()

      def build(self, input_shape):
        self.v1 = self.add_weight('v1', shape=input_shape[1:].as_list())
        self.v2 = variables.Variable(
            name='v2',
            initial_value=np.zeros(input_shape[1:].as_list(), dtype='float32'),
            trainable=False)

      def call(self, inputs):
        x = self.dense1(inputs) + self.dense2(inputs)
        return x + self.v1 + self.v2

    layer = MyLayer()
    inputs = input_layer.Input((1,))
    _ = layer(inputs)

    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 5)
    self.assertEqual(len(layer.non_trainable_weights), 3)

    layer.dense1.trainable = False
    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 3)
    self.assertEqual(len(layer.non_trainable_weights), 5)

    layer.trainable = False
    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 0)
    self.assertEqual(len(layer.non_trainable_weights), 8)
    self.assertEqual(
        {id(v) for v in [layer.dense1, layer.dense2, layer.v1, layer.v2]},
        {id(v) for _, v in layer._checkpoint_dependencies})

  def test_nested_layer_updates_losses_tracking(self):
    # Test that updates and losses from nested sublayers are
    # being tracked by subclassed layers.

    class UpdateAndLossLayer(base_layer.Layer):

      def build(self, _):
        self.v1 = self.add_weight('v1', shape=())

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        self.add_update(state_ops.assign_add(self.v1, 1))
        return inputs + 1

    class MyLayer(base_layer.Layer):

      def build(self, _):
        self.v1 = self.add_weight('v1', shape=())

      def __init__(self):
        super(MyLayer, self).__init__()
        self.ul1 = UpdateAndLossLayer()
        self.ul2 = UpdateAndLossLayer()

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        self.add_update(state_ops.assign_add(self.v1, 1))
        x = self.ul1(inputs)
        return self.ul2(x)

    layer = MyLayer()

    if context.executing_eagerly():
      inputs = array_ops.ones((3, 1))
      _ = layer(inputs)
      self.assertEqual(len(layer.losses), 3)
      self.assertLen(layer.get_losses_for(None), 3)
    else:
      inputs = input_layer.Input((1,))
      _ = layer(inputs)
      self.assertEqual(len(layer.losses), 3)
      self.assertEqual(len(layer.updates), 3)
      self.assertLen(layer.get_losses_for(None), 3)

  def test_attribute_reassignment(self):
    l = base_layer.Layer()
    l.a = base_layer.Layer()
    l.a = []
    l.a = variables.Variable(1.)
    l.a = base_layer.Layer()
    last_assignment = base_layer.Layer()
    l.a = last_assignment
    l.b = variables.Variable(1.)
    del l.b
    l.c = base_layer.Layer()
    del l.c
    l.d = last_assignment
    del l.d
    sublayers = list(l._flatten_layers(include_self=False, recursive=False))
    self.assertEqual([last_assignment], sublayers)
    self.assertEqual([], l.trainable_weights)
    self.assertEqual([], l.non_trainable_weights)
    self.assertEqual([], l.weights)
    del l.a
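    # `_self_tracked_trackables` is the new name of the old `Layer._layers`
    # attribute (renamed in this change to avoid clashing with user-defined
    # attributes).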
    self.assertEqual([], l._self_tracked_trackables)

  def test_assign_op_not_tracked_as_variable(self):

    class LayerWithAssignAttr(base_layer.Layer):

      def build(self, input_shape):
        self.v = variables.Variable(1.)
        self.v_assign = self.v.assign_add(2.)

    layer = LayerWithAssignAttr()
    layer.build((10, 10))

    self.assertEqual([layer.v], layer.variables)

  def test_layer_class_not_tracked_as_sublayer(self):
    # See https://github.com/tensorflow/tensorflow/issues/27431 for details.

    class LayerWithClassAttribute(base_layer.Layer):

      def __init__(self):
        super(LayerWithClassAttribute, self).__init__()
        self.layer_fn = layers.Dense

    layer = LayerWithClassAttribute()
    self.assertEmpty(layer.variables)
    self.assertEmpty(layer.submodules)

  def test_layer_call_fn_args(self):

    class NonDefunLayer(base_layer.Layer):

      def call(self, inputs, a, mask, b=None, training=None):
        return inputs

    class DefunLayer(base_layer.Layer):

      @def_function.function
      def call(self, x, mask, a, training=None, b=None):
        return x

    nondefun_layer = NonDefunLayer()
    self.assertEqual(nondefun_layer._call_fn_args,
                     ['inputs', 'a', 'mask', 'b', 'training'])
    defun_layer = DefunLayer()
    self.assertEqual(defun_layer._call_fn_args,
                     ['x', 'mask', 'a', 'training', 'b'])

  def test_sequential_model(self):
    model = sequential.Sequential(
        [layers.Dense(10, input_shape=(10,)),
         layers.Dense(5)])
    self.assertLen(model.layers, 2)
    self.assertLen(model.weights, 4)

    # Make sure a subclass model also works when it is called 'Sequential'.
    class Sequential(training_lib.Model):

      def __init__(self):
        super(Sequential, self).__init__()
        self.dense_layers = [layers.Dense(10), layers.Dense(5)]

      def call(self, inputs):
        x = inputs
        for d in self.dense_layers:
          x = d(x)
        return x

    s = Sequential()
    self.assertLen(s.layers, 2)
    self.assertLen(s.weights, 0)

    s(input_layer.Input((10,)))
    self.assertLen(s.weights, 4)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class NameScopingTest(keras_parameterized.TestCase):

  def test_name_scope_layer(self):
    x = backend.placeholder(shape=(10, 10))
    layer = layers.Dense(10, name='MyName')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName/kernel:0')

  def test_name_scope_functional_api(self):
    inputs = input_layer.Input((3,))
    layer = layers.Dense(10, name='MyName')
    _ = layer(inputs)
    self.assertEqual(layer.bias.name, 'MyName/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName/kernel:0')

  def test_name_scope_functional_api_nested(self):

    class NestedLayer(base_layer.Layer):

      def __init__(self, name='OuterName'):
        super(NestedLayer, self).__init__(name=name)
        self.dense = layers.Dense(10, name='InnerName')

      def call(self, inputs):
        return self.dense(inputs)

    inputs = input_layer.Input((3,))
    layer = NestedLayer()
    _ = layer(inputs)
    self.assertEqual(layer.dense.bias.name, 'OuterName/InnerName/bias:0')
    self.assertEqual(layer.dense.kernel.name, 'OuterName/InnerName/kernel:0')

  def test_name_scope_sublayer(self):

    class NameScopeTracker(base_layer.Layer):

      def call(self, inputs):
        self.active_name_scope = ops.get_name_scope()
        return inputs

    x = backend.placeholder(shape=(10, 10))
    sublayer = NameScopeTracker(name='Sublayer')
    layer = layers.Dense(10, activation=sublayer, name='MyName2')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName2/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName2/kernel:0')
    self.assertEqual(sublayer.active_name_scope, 'MyName2/Sublayer')

  def test_name_scope_tf_tensor(self):
    x = ops.convert_to_tensor_v2_with_dispatch(np.ones((10, 10)))
    layer = layers.Dense(
        10, activation=layers.ReLU(name='MyAct'), name='MyName3')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName3/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName3/kernel:0')


@combinations.generate(combinations.keras_mode_combinations(mode=['eager']))
class AutographControlFlowTest(keras_parameterized.TestCase):

  def test_disabling_in_context_is_matched(self):

    test_obj = self

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        with test_obj.assertRaisesRegex(TypeError, 'Tensor.*as.*bool'):
          if constant_op.constant(False):
            return inputs * 1.
        return inputs * 0.

    @def_function.function(autograph=False)
    def test_fn():
      return MyLayer()(constant_op.constant([[1., 2., 3.]]))

    test_fn()

  def test_if_training_pattern_output(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          return inputs * 1.
        return inputs * 0.

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(train_loss, 0.)
    test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(test_loss, 1.)

  def test_if_training_pattern_loss(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          loss = math_ops.reduce_sum(inputs)
        else:
          loss = 0.
        self.add_loss(loss)
        return inputs

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(train_loss, 2 * 3)
    test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(test_loss, 0)

  def test_if_training_pattern_metric(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          metric = math_ops.reduce_sum(inputs)
        else:
          metric = 0.
        self.add_metric(metric, name='my_metric', aggregation='mean')
        return inputs

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    for _ in range(3):
      _, train_metric = model.train_on_batch(np.ones((2, 3)),
                                             np.ones((2, 3)))

      self.assertEqual(train_metric, 2 * 3)
      _, test_metric = model.test_on_batch(np.ones((2, 3)),
                                           np.ones((2, 3)))
      self.assertEqual(test_metric, 0)

  def test_if_training_pattern_update(self):

    class MyLayer(base_layer.Layer):

      def build(self, input_shape):
        self.counter = self.add_weight(
            shape=(), trainable=False, initializer='zeros')

      def call(self, inputs, training=None):
        if training:
          increment = 1.
        else:
          increment = 0.
        self.counter.assign_add(increment)
        return inputs

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(backend.get_value(layer.counter), 1.)

  def test_conditional_losses_in_call(self):

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer,
              self).__init__(dynamic=testing_utils.should_run_eagerly())

      def call(self, inputs, training=None):
        if training:
          self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  def test_conditional_callable_losses(self):
    model = sequential.Sequential([
        layers.Dense(
            1, kernel_regularizer=regularizers.l2(1e-4), input_shape=(1,))
    ])
    model._run_eagerly = testing_utils.should_run_eagerly()

    def assert_graph(t):
      if not context.executing_eagerly():
        self.assertEqual(t.graph, ops.get_default_graph())

    @def_function.function
    def get_losses(t):
      if t < 0:
        return math_ops.reduce_sum(model.losses) * t
      else:
        return math_ops.reduce_sum(model.losses)

    assert_graph(get_losses(constant_op.constant(2.)))
    assert_graph(get_losses(constant_op.constant(0.5)))

  def test_conditional_metrics_in_call(self):

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer,
              self).__init__(dynamic=testing_utils.should_run_eagerly())

      def call(self, inputs, training=None):
        if training:
          self.add_metric(math_ops.reduce_sum(inputs),
                          name='sum',
                          aggregation='mean')
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    history = model.fit(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(history.history['sum'][-1], 2 * 3)

  def test_conditional_activity_regularizer_in_call(self):

    class TestModel(training_lib.Model):

      def __init__(self):
        super(TestModel, self).__init__(
            name='test_model', dynamic=testing_utils.should_run_eagerly())
        self.layer = layers.Dense(2, activity_regularizer='l2')

      def call(self, x, training=None):
        if math_ops.greater(math_ops.reduce_sum(x), 0.0):
          return self.layer(x)
        else:
          return self.layer(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))

    if testing_utils.should_run_eagerly():
      model.fit(x, y, epochs=2, batch_size=5)
    else:
      with self.assertRaisesRegex(ValueError, 'ActivityRegularizer'):
        model.fit(x, y, epochs=2, batch_size=5)

  def test_conditional_activity_regularizer_with_wrappers_in_call(self):

    class TestModel(training_lib.Model):

      def __init__(self):
        super(TestModel, self).__init__(
            name='test_model', dynamic=testing_utils.should_run_eagerly())
        self.layer = layers.TimeDistributed(
            layers.Dense(2, activity_regularizer='l2'), input_shape=(3, 4))

      def call(self, x, training=None):
        if math_ops.greater(math_ops.reduce_sum(x), 0.0):
          return self.layer(x)
        else:
          return self.layer(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 3, 4))
    y = np.ones(shape=(10, 3, 2))

    if testing_utils.should_run_eagerly():
      model.fit(x, y, epochs=2, batch_size=5)
    else:
      with self.assertRaisesRegex(ValueError, 'ActivityRegularizer'):
        model.fit(x, y, epochs=2, batch_size=5)


class AddLayer(base_layer.Layer):
  """A layer which adds its input to a variable.

  Useful for testing a layer with a variable.
  """

  def build(self, _):
    self.v = self.add_weight('v', (), initializer='ones')
    self.built = True

  def call(self, inputs):
    return inputs + self.v


class IdentityLayer(base_layer.Layer):
  """A layer that returns its input.

  Useful for testing a layer without a variable.
  """

  def call(self, inputs):
    return inputs


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class DTypeTest(keras_parameterized.TestCase):

  # This class only has tests relating to layer.dtype. Tests for dtype policies
  # are in mixed_precision/keras_test.py

  # TODO(reedwm): Maybe have a separate test file for input casting tests.

  def _const(self, dtype):
    return array_ops.constant(1, dtype=dtype)

  @testing_utils.enable_v2_dtype_behavior
  def test_dtype_defaults_to_floatx(self):
    layer = AddLayer()
    self.assertEqual(layer.dtype, 'float32')
    layer(self._const('float64'))
    self.assertEqual(layer.dtype, 'float32')  # dtype should not change

    try:
      backend.set_floatx('float64')
      layer = AddLayer()
      self.assertEqual(layer.dtype, 'float64')
    finally:
      backend.set_floatx('float32')

  @testing_utils.enable_v2_dtype_behavior
  def test_passing_dtype_to_constructor(self):
    layer = IdentityLayer(dtype='float64')
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'float64')

    layer = IdentityLayer(dtype='int32')
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'int32')

    layer = IdentityLayer(dtype=dtypes.float64)
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def test_input_cast_to_dtype(self):
    layer = AddLayer()

    # Input should be cast to layer.dtype, so output should also be layer.dtype
    self.assertEqual(layer(self._const('float64')).dtype, 'float32')

    layer = AddLayer(dtype='float64')
    self.assertEqual(layer(self._const('float32')).dtype, 'float64')

    # Test inputs are not cast if layer.dtype is not floating-point
    layer = IdentityLayer(dtype='int32')
    self.assertEqual(layer(self._const('float64')).dtype, 'float64')

    # Test inputs are not cast if the inputs are not floating-point
    layer = IdentityLayer(dtype='float32')
    self.assertEqual(layer(self._const('int32')).dtype, 'int32')

    # Test Numpy arrays are cast
    layer = IdentityLayer(dtype='float64')
    self.assertEqual(layer(np.array(1, dtype='float32')).dtype, 'float64')

    # Test Python floats are cast
    layer = IdentityLayer(dtype='float64')
    self.assertEqual(layer(1.).dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def test_multiple_inputs_cast_to_dtype(self):

    class MultiIdentityLayer(base_layer.Layer):

      def call(self, inputs):
        return [array_ops.identity(x) for x in inputs]

    # Testing layer with default dtype of float32
    layer = MultiIdentityLayer()
    x, y = layer([self._const('float16'), self._const('float32')])
    self.assertEqual(x.dtype, 'float32')
    self.assertEqual(y.dtype, 'float32')

    # Test passing dtype to the constructor
    layer = MultiIdentityLayer(dtype='float64')
    x, y = layer([self._const('float16'), self._const('float32')])
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'float64')

    # Test several non-floating point types
    layer = MultiIdentityLayer(dtype='float64')
    x, y, z, w = layer([self._const('float16'), self._const('bool'),
                        self._const('float64'), self._const('complex64')])
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'bool')
    self.assertEqual(z.dtype, 'float64')
    self.assertEqual(w.dtype, 'complex64')

  @testing_utils.enable_v2_dtype_behavior
  def test_extra_args_and_kwargs_not_casted(self):

    class IdentityLayerWithArgs(base_layer.Layer):

      def call(self, inputs, *args, **kwargs):
        kwargs.pop('training', None)
        return nest.flatten([inputs, args, kwargs])

    layer = IdentityLayerWithArgs(dtype='float64')
    x, y, z = layer(self._const('float16'), self._const('float16'),
                    kwarg=self._const('float16'))
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'float16')
    self.assertEqual(z.dtype, 'float16')

  @testing_utils.enable_v2_dtype_behavior
  def test_layer_without_autocast(self):

    class IdentityLayerWithoutAutocast(IdentityLayer):

      def __init__(self, *args, **kwargs):
        kwargs['autocast'] = False
        super(IdentityLayerWithoutAutocast, self).__init__(*args, **kwargs)

    layer = IdentityLayerWithoutAutocast(dtype='float64')
    self.assertEqual(layer(self._const('float32')).dtype, 'float32')

  @testing_utils.enable_v2_dtype_behavior
  def test_compute_output_signature(self):

    class IdentityLayerWithOutputShape(IdentityLayer):

      def compute_output_shape(self, input_shape):
        return input_shape

    layer = IdentityLayerWithOutputShape(dtype='float64')
    output_signature = layer.compute_output_signature(
        tensor_spec.TensorSpec(shape=(), dtype='float32'))
    self.assertEqual(output_signature.shape, ())
    self.assertEqual(output_signature.dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def test_composite_tensors_input_casting(self):
    sparse = sparse_tensor.SparseTensor(
        indices=array_ops.constant([[0, 1], [2, 3]], dtype='int64'),
        values=array_ops.constant([0., 1.], dtype='float32'),
        dense_shape=array_ops.constant([4, 4], dtype='int64'))
    ragged = ragged_tensor.RaggedTensor.from_row_splits(
        values=array_ops.constant([1., 2., 3.], dtype='float32'),
        row_splits=array_ops.constant([0, 2, 2, 3], dtype='int64'))

    layer = IdentityLayer(dtype='float16')

    for x in sparse, ragged:
      self.assertEqual(x.dtype, 'float32')
      y = layer(x)
      self.assertEqual(y.dtype, 'float16')
      self.assertEqual(type(x), type(y))

  @testing_utils.enable_v2_dtype_behavior
  def test_passing_non_tensor(self):
    layer = IdentityLayer()
    x = object()
    y = layer(x)  # Layer should not cast 'x', as it's not a tensor
    self.assertIs(x, y)

  @testing_utils.disable_v2_dtype_behavior
  def test_v1_behavior(self):
    # Test dtype defaults to None and inferred from input
    layer = IdentityLayer()
    self.assertIsNone(layer.dtype)
    layer(self._const('float64'))
    self.assertEqual(layer.dtype, 'float64')

    # Test layer does not cast to dtype
    self.assertEqual(layer(self._const('float32')).dtype, 'float32')


if __name__ == '__main__':
  ops.enable_eager_execution()
  test.main()