# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for Keras backend."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gc
from absl.testing import parameterized
import numpy as np
import scipy.sparse
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import config
from tensorflow.python.framework import errors_impl
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import test_util
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.layers import advanced_activations
from tensorflow.python.keras.layers import normalization
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.util import tf_inspect
def compare_single_input_op_to_numpy(keras_op,
np_op,
input_shape,
dtype='float32',
negative_values=True,
keras_args=None,
keras_kwargs=None,
np_args=None,
np_kwargs=None):
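  """Checks that a Keras backend op matches a NumPy reference op.

  Evaluates `keras_op` and `np_op` on the same random inputs (drawn from
  [-1, 1) when `negative_values` is set, from [0, 2) otherwise) and asserts
  that the results agree to within `atol=1e-4`.
  """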
keras_args = keras_args or []
keras_kwargs = keras_kwargs or {}
np_args = np_args or []
np_kwargs = np_kwargs or {}
inputs = 2. * np.random.random(input_shape)
if negative_values:
inputs -= 1.
keras_output = keras_op(
backend.variable(inputs, dtype=dtype), *keras_args, **keras_kwargs)
keras_output = backend.eval(keras_output)
np_output = np_op(inputs.astype(dtype), *np_args, **np_kwargs)
try:
np.testing.assert_allclose(keras_output, np_output, atol=1e-4)
except AssertionError:
raise AssertionError('Test for op `' + str(keras_op.__name__) + '` failed; '
'Expected ' + str(np_output) + ' but got ' +
str(keras_output))
def compare_two_inputs_op_to_numpy(keras_op,
np_op,
input_shape_a,
input_shape_b,
dtype='float32',
keras_args=None,
keras_kwargs=None,
np_args=None,
np_kwargs=None):
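  """Checks that a two-argument Keras backend op matches a NumPy reference.

  Evaluates `keras_op` and `np_op` on the same pair of random inputs and
  asserts that the results agree to within `atol=1e-4`.
  """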
keras_args = keras_args or []
keras_kwargs = keras_kwargs or {}
np_args = np_args or []
np_kwargs = np_kwargs or {}
input_a = np.random.random(input_shape_a)
input_b = np.random.random(input_shape_b)
keras_output = keras_op(
backend.variable(input_a, dtype=dtype),
backend.variable(input_b, dtype=dtype), *keras_args, **keras_kwargs)
keras_output = backend.eval(keras_output)
np_output = np_op(input_a.astype(dtype), input_b.astype(dtype),
*np_args, **np_kwargs)
try:
np.testing.assert_allclose(keras_output, np_output, atol=1e-4)
except AssertionError:
raise AssertionError('Test for op `' + str(keras_op.__name__) + '` failed; '
'Expected ' + str(np_output) + ' but got ' +
str(keras_output))
class BackendResetTest(test.TestCase, parameterized.TestCase):
def test_new_config(self):
# User defined jit setting
config.set_optimizer_jit(False)
sess = backend.get_session()
default_config = context.context().config
self.assertEqual(
sess._config.graph_options.optimizer_options.global_jit_level,
default_config.graph_options.optimizer_options.global_jit_level)
backend.clear_session()
# New session has the same jit setting
sess = backend.get_session()
default_config = context.context().config
self.assertEqual(
sess._config.graph_options.optimizer_options.global_jit_level,
default_config.graph_options.optimizer_options.global_jit_level)
backend.clear_session()
# Change respected
config.set_optimizer_jit(True)
sess = backend.get_session()
default_config = context.context().config
self.assertEqual(
sess._config.graph_options.optimizer_options.global_jit_level,
default_config.graph_options.optimizer_options.global_jit_level)
backend.clear_session()
# We can't use the normal parameterized decorator because the test session
# will block graph clearing.
@parameterized.named_parameters(('_v1', context.graph_mode),
('_v2', context.eager_mode))
def test_new_graph(self, test_context):
with test_context():
g_old = backend.get_graph()
backend.clear_session()
g = backend.get_graph()
      self.assertIsNot(g_old, g)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class BackendUtilsTest(test.TestCase):
def test_backend(self):
self.assertEqual(backend.backend(), 'tensorflow')
def test_get_reset_uids(self):
self.assertEqual(backend.get_uid('foo'), 1)
self.assertEqual(backend.get_uid('foo'), 2)
backend.reset_uids()
self.assertEqual(backend.get_uid('foo'), 1)
def test_learning_phase(self):
with self.cached_session() as sess:
with self.assertRaises(ValueError):
backend.set_learning_phase(2)
# Test running with a learning-phase-consuming layer
with backend.learning_phase_scope(0):
x = input_layer.Input((3,))
y = normalization.BatchNormalization()(x)
if not context.executing_eagerly():
self.evaluate(variables.global_variables_initializer())
sess.run(y, feed_dict={x: np.random.random((2, 3))})
def test_learning_phase_name(self):
with ops.name_scope('test_scope'):
# Test that outer name scopes do not affect the learning phase's name.
lp = backend.symbolic_learning_phase()
self.assertEqual(lp.name, 'keras_learning_phase:0')
def test_learning_phase_scope(self):
initial_learning_phase = backend.learning_phase()
with backend.learning_phase_scope(1):
self.assertEqual(backend.learning_phase(), 1)
self.assertEqual(backend.learning_phase(), initial_learning_phase)
with backend.learning_phase_scope(0):
self.assertEqual(backend.learning_phase(), 0)
self.assertEqual(backend.learning_phase(), initial_learning_phase)
with self.assertRaises(ValueError):
with backend.learning_phase_scope(None):
pass
self.assertEqual(backend.learning_phase(), initial_learning_phase)
new_learning_phase = 0
backend.set_learning_phase(new_learning_phase)
self.assertEqual(backend.learning_phase(), new_learning_phase)
with backend.learning_phase_scope(1):
self.assertEqual(backend.learning_phase(), 1)
self.assertEqual(backend.learning_phase(), new_learning_phase)
def test_learning_phase_scope_in_graph(self):
initial_learning_phase_outside_graph = backend.learning_phase()
with backend.get_graph().as_default():
initial_learning_phase_in_graph = backend.learning_phase()
self.assertEqual(backend.learning_phase(),
initial_learning_phase_outside_graph)
with backend.learning_phase_scope(1):
self.assertEqual(backend.learning_phase(), 1)
self.assertEqual(backend.learning_phase(),
initial_learning_phase_outside_graph)
with backend.get_graph().as_default():
self.assertIs(backend.learning_phase(), initial_learning_phase_in_graph)
self.assertEqual(backend.learning_phase(),
initial_learning_phase_outside_graph)
def test_int_shape(self):
x = backend.ones(shape=(3, 4))
self.assertEqual(backend.int_shape(x), (3, 4))
if not context.executing_eagerly():
x = backend.placeholder(shape=(None, 4))
self.assertEqual(backend.int_shape(x), (None, 4))
def test_in_train_phase(self):
y1 = backend.variable(1)
y2 = backend.variable(2)
if context.executing_eagerly():
with backend.learning_phase_scope(0):
y_val_test = backend.in_train_phase(y1, y2).numpy()
with backend.learning_phase_scope(1):
y_val_train = backend.in_train_phase(y1, y2).numpy()
else:
y = backend.in_train_phase(y1, y2)
f = backend.function([backend.learning_phase()], [y])
y_val_test = f([0])[0]
y_val_train = f([1])[0]
self.assertAllClose(y_val_test, 2)
self.assertAllClose(y_val_train, 1)
def test_is_keras_tensor(self):
x = backend.variable(1)
    self.assertFalse(backend.is_keras_tensor(x))
    x = input_layer.Input(shape=(1,))
    self.assertTrue(backend.is_keras_tensor(x))
    x = input_layer.Input(shape=(None,), ragged=True)
    self.assertTrue(backend.is_keras_tensor(x))
    x = input_layer.Input(shape=(None, None), sparse=True)
    self.assertTrue(backend.is_keras_tensor(x))
with self.assertRaises(ValueError):
backend.is_keras_tensor(0)
def test_stop_gradient(self):
x = backend.variable(1)
y = backend.stop_gradient(x)
if not context.executing_eagerly():
self.assertEqual(y.op.name[:12], 'StopGradient')
xs = [backend.variable(1) for _ in range(3)]
ys = backend.stop_gradient(xs)
if not context.executing_eagerly():
for y in ys:
self.assertEqual(y.op.name[:12], 'StopGradient')
def test_placeholder(self):
x = backend.placeholder(shape=(3, 4))
self.assertEqual(x.shape.as_list(), [3, 4])
x = backend.placeholder(shape=(3, 4), sparse=True)
self.assertEqual(x.shape.as_list(), [3, 4])
def test_is_placeholder(self):
x = backend.placeholder(shape=(1,))
    self.assertTrue(backend.is_placeholder(x))
    x = backend.variable(1)
    self.assertFalse(backend.is_placeholder(x))
def test_print_tensor(self):
# Unfortunately it seems impossible to use `mock` (or any other method)
# to capture stdout when used inside a graph or graph function, thus
# we cannot test correctness.
# The message gets correctly printed in practice.
x = backend.placeholder(shape=())
y = backend.print_tensor(x, 'eager=%s' % context.executing_eagerly())
f = backend.function(x, y)
f(0)
def test_cast_to_floatx(self):
x = backend.variable(1, dtype='float64')
x = backend.cast_to_floatx(x)
self.assertEqual(x.dtype.name, 'float32')
x = backend.cast_to_floatx(2)
self.assertEqual(x.dtype.name, 'float32')
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class BackendVariableTest(test.TestCase):
def test_zeros(self):
x = backend.zeros((3, 4))
val = backend.eval(x)
self.assertAllClose(val, np.zeros((3, 4)))
def test_ones(self):
x = backend.ones((3, 4))
val = backend.eval(x)
self.assertAllClose(val, np.ones((3, 4)))
def test_eye(self):
x = backend.eye(4)
val = backend.eval(x)
self.assertAllClose(val, np.eye(4))
def test_zeros_like(self):
x = backend.zeros((3, 4))
y = backend.zeros_like(x)
val = backend.eval(y)
self.assertAllClose(val, np.zeros((3, 4)))
def test_ones_like(self):
x = backend.zeros((3, 4))
y = backend.ones_like(x)
val = backend.eval(y)
self.assertAllClose(val, np.ones((3, 4)))
def test_random_uniform_variable(self):
x = backend.random_uniform_variable((30, 20), low=1, high=2, seed=0)
val = backend.eval(x)
self.assertAllClose(val.mean(), 1.5, atol=1e-1)
self.assertAllClose(val.max(), 2., atol=1e-1)
self.assertAllClose(val.min(), 1., atol=1e-1)
def test_random_normal_variable(self):
x = backend.random_normal_variable((30, 20), 1., 0.5, seed=0)
val = backend.eval(x)
self.assertAllClose(val.mean(), 1., atol=1e-1)
self.assertAllClose(val.std(), 0.5, atol=1e-1)
def test_count_params(self):
x = backend.zeros((4, 5))
val = backend.count_params(x)
self.assertAllClose(val, 20)
def test_constant(self):
ref_val = np.random.random((3, 4)).astype('float32')
x = backend.constant(ref_val)
val = backend.eval(x)
self.assertAllClose(val, ref_val)
def test_sparse_variable(self):
val = scipy.sparse.eye(10)
x = backend.variable(val)
    self.assertIsInstance(x, sparse_tensor.SparseTensor)
y = backend.to_dense(x)
self.assertFalse(backend.is_sparse(y))
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class BackendLinearAlgebraTest(test.TestCase, parameterized.TestCase):
def test_dot(self):
x = backend.ones(shape=(2, 3))
y = backend.ones(shape=(3, 4))
xy = backend.dot(x, y)
self.assertEqual(xy.shape.as_list(), [2, 4])
x = backend.ones(shape=(32, 28, 3))
y = backend.ones(shape=(3, 4))
xy = backend.dot(x, y)
self.assertEqual(xy.shape.as_list(), [32, 28, 4])
@parameterized.parameters(
[(2, 3, 4, 5), (2, 5, 6, 7), (2, 3, 4, 6, 7), (3, 1)],
[(2, 20, 1), (2, 30, 20), (2, 1, 30), (1, 2)],
[(4, 2, 3), (4, 5, 3), (4, 2, 5), (2, 2)],
[(4, 2), (4, 2, 3), (4, 3), (1, 1)],
[(4, 2), (4, 2, 3), (4, 3), 1],
[(4, 2, 3), (4, 3), (4, 2), (2, 1)],
)
def test_batch_dot(self, x_shape, y_shape, output_shape, axes):
x_val = np.random.random(x_shape)
y_val = np.random.random(y_shape)
x = backend.variable(x_val)
y = backend.variable(y_val)
xy = backend.batch_dot(x, y, axes=axes)
self.assertEqual(tuple(xy.shape.as_list()), output_shape)
xy_val = backend.eval(xy)
ref_val = self._reference_batch_dot(x_val, y_val, axes)
self.assertAllClose(xy_val, ref_val, atol=1e-5)
def _reference_batch_dot(self, x, y, axes):
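    """NumPy reference for `backend.batch_dot` via per-sample `np.tensordot`."""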
if isinstance(axes, int):
axes = [axes, axes]
elif isinstance(axes, tuple):
axes = list(axes)
if axes is None:
if y.ndim == 2:
axes = [x.ndim - 1, y.ndim - 1]
else:
axes = [x.ndim - 1, y.ndim - 2]
if axes[0] < 0:
axes[0] += x.ndim
if axes[1] < 0:
axes[1] += y.ndim
result = []
axes = [axes[0] - 1, axes[1] - 1]
for xi, yi in zip(x, y):
result.append(np.tensordot(xi, yi, axes))
result = np.array(result)
if result.ndim == 1:
result = np.expand_dims(result, -1)
return result
def test_reduction_ops(self):
ops_to_test = [
(backend.max, np.max),
(backend.min, np.min),
(backend.sum, np.sum),
(backend.prod, np.prod),
(backend.var, np.var),
(backend.std, np.std),
(backend.mean, np.mean),
(backend.argmin, np.argmin),
(backend.argmax, np.argmax),
]
for keras_op, np_op in ops_to_test:
compare_single_input_op_to_numpy(keras_op, np_op, input_shape=(4, 7, 5),
keras_kwargs={'axis': 1},
np_kwargs={'axis': 1})
compare_single_input_op_to_numpy(keras_op, np_op, input_shape=(4, 7, 5),
keras_kwargs={'axis': -1},
np_kwargs={'axis': -1})
if 'keepdims' in tf_inspect.getargspec(keras_op).args:
compare_single_input_op_to_numpy(keras_op, np_op,
input_shape=(4, 7, 5),
keras_kwargs={'axis': 1,
'keepdims': True},
np_kwargs={'axis': 1,
'keepdims': True})
def test_elementwise_ops(self):
ops_to_test = [
(backend.square, np.square),
(backend.abs, np.abs),
(backend.round, np.round),
(backend.sign, np.sign),
(backend.sin, np.sin),
(backend.cos, np.cos),
(backend.exp, np.exp),
]
for keras_op, np_op in ops_to_test:
compare_single_input_op_to_numpy(keras_op, np_op, input_shape=(4, 7))
ops_to_test = [
(backend.sqrt, np.sqrt),
(backend.log, np.log),
]
for keras_op, np_op in ops_to_test:
compare_single_input_op_to_numpy(keras_op, np_op,
input_shape=(4, 7),
negative_values=False)
compare_single_input_op_to_numpy(
backend.clip,
np.clip,
input_shape=(6, 4),
        keras_kwargs={
            'min_value': 0.1,
            'max_value': 1.4
        },
        np_kwargs={
            'a_min': 0.1,
            'a_max': 1.4
        })
compare_single_input_op_to_numpy(
backend.pow, np.power, input_shape=(6, 4), keras_args=[3], np_args=[3])
def test_two_tensor_ops(self):
ops_to_test = [
(backend.equal, np.equal),
(backend.not_equal, np.not_equal),
(backend.greater, np.greater),
(backend.greater_equal, np.greater_equal),
(backend.less, np.less),
(backend.less_equal, np.less_equal),
(backend.maximum, np.maximum),
(backend.minimum, np.minimum),
]
for keras_op, np_op in ops_to_test:
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 7),
input_shape_b=(4, 7))
def test_relu(self):
x = ops.convert_to_tensor_v2([[-4, 0], [2, 7]], 'float32')
# standard relu
relu_op = backend.relu(x)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 7]])
# alpha (leaky relu used)
relu_op = backend.relu(x, alpha=0.5)
if not context.executing_eagerly():
      self.assertIn('LeakyRelu', relu_op.name)
self.assertAllClose(backend.eval(relu_op), [[-2, 0], [2, 7]])
# max_value < some elements
relu_op = backend.relu(x, max_value=5)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 5]])
# nn.relu6 used
relu_op = backend.relu(x, max_value=6)
if not context.executing_eagerly():
      self.assertIn('Relu6', relu_op.name)  # uses tf.nn.relu6
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 6]])
# max value > 6
relu_op = backend.relu(x, max_value=10)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 7]])
# max value is float
relu_op = backend.relu(x, max_value=4.3)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 4.3]])
# max value == 0
relu_op = backend.relu(x, max_value=0)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [0, 0]])
# alpha and max_value
relu_op = backend.relu(x, alpha=0.25, max_value=3)
self.assertAllClose(backend.eval(relu_op), [[-1, 0], [2, 3]])
# threshold
relu_op = backend.relu(x, threshold=3)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [0, 7]])
# threshold is float
relu_op = backend.relu(x, threshold=1.5)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [2, 7]])
# threshold is negative
relu_op = backend.relu(x, threshold=-5)
self.assertAllClose(backend.eval(relu_op), [[-4, 0], [2, 7]])
# threshold and max_value
relu_op = backend.relu(x, threshold=3, max_value=5)
self.assertAllClose(backend.eval(relu_op), [[0, 0], [0, 5]])
# threshold and alpha
relu_op = backend.relu(x, alpha=0.25, threshold=4)
self.assertAllClose(backend.eval(relu_op), [[-2, -1], [-0.5, 7]])
# threshold, alpha, and max_value
relu_op = backend.relu(x, alpha=0.25, threshold=4, max_value=5)
self.assertAllClose(backend.eval(relu_op), [[-2, -1], [-0.5, 5]])
# Test case for GitHub issue 35430, with integer dtype
x = input_layer.Input(shape=(), name='x', dtype='int64')
_ = advanced_activations.ReLU(max_value=100, dtype='int64')(x)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class BackendShapeOpsTest(test.TestCase):
def test_reshape(self):
compare_single_input_op_to_numpy(
backend.reshape,
np.reshape,
input_shape=(4, 7),
keras_args=[(2, 14)],
np_args=[(2, 14)])
def test_concatenate(self):
a = backend.variable(np.ones((1, 2, 3)))
b = backend.variable(np.ones((1, 2, 2)))
y = backend.concatenate([a, b], axis=-1)
self.assertEqual(y.shape.as_list(), [1, 2, 5])
def test_permute_dimensions(self):
compare_single_input_op_to_numpy(
backend.permute_dimensions,
np.transpose,
input_shape=(4, 7),
keras_args=[(1, 0)],
np_args=[(1, 0)])
def test_resize_images(self):
height_factor = 2
width_factor = 2
data_format = 'channels_last'
x = backend.variable(np.ones((1, 2, 2, 3)))
y = backend.resize_images(x, height_factor, width_factor, data_format)
self.assertEqual(y.shape.as_list(), [1, 4, 4, 3])
data_format = 'channels_first'
x = backend.variable(np.ones((1, 3, 2, 2)))
y = backend.resize_images(x, height_factor, width_factor, data_format)
self.assertEqual(y.shape.as_list(), [1, 3, 4, 4])
# Invalid use:
with self.assertRaises(ValueError):
backend.resize_images(
x, height_factor, width_factor, data_format='unknown')
def test_resize_volumes(self):
height_factor = 2
width_factor = 2
depth_factor = 2
data_format = 'channels_last'
x = backend.variable(np.ones((1, 2, 2, 2, 3)))
y = backend.resize_volumes(x, depth_factor, height_factor, width_factor,
data_format)
self.assertEqual(y.shape.as_list(), [1, 4, 4, 4, 3])
data_format = 'channels_first'
x = backend.variable(np.ones((1, 3, 2, 2, 2)))
y = backend.resize_volumes(x, depth_factor, height_factor, width_factor,
data_format)
self.assertEqual(y.shape.as_list(), [1, 3, 4, 4, 4])
# Invalid use:
with self.assertRaises(ValueError):
backend.resize_volumes(
x, depth_factor, height_factor, width_factor, data_format='unknown')
def test_repeat_elements(self):
x = backend.variable(np.ones((1, 3, 2)))
y = backend.repeat_elements(x, 3, axis=1)
self.assertEqual(y.shape.as_list(), [1, 9, 2])
# Use with a dynamic axis:
if not context.executing_eagerly():
x = backend.placeholder(shape=(2, None, 2))
y = backend.repeat_elements(x, 3, axis=1)
self.assertEqual(y.shape.as_list(), [2, None, 2])
def test_repeat(self):
x = backend.variable(np.ones((1, 3)))
y = backend.repeat(x, 2)
self.assertEqual(y.shape.as_list(), [1, 2, 3])
def test_flatten(self):
compare_single_input_op_to_numpy(
backend.flatten,
np.reshape,
input_shape=(4, 7, 6),
np_args=[(4 * 7 * 6,)])
def test_batch_flatten(self):
compare_single_input_op_to_numpy(
backend.batch_flatten,
np.reshape,
input_shape=(4, 7, 6),
np_args=[(4, 7 * 6)])
def test_temporal_padding(self):
def ref_op(x, padding):
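      """NumPy reference: zero-pads axis 1 of a 3D tensor by `padding`."""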
shape = list(x.shape)
shape[1] += padding[0] + padding[1]
y = np.zeros(tuple(shape))
y[:, padding[0]:-padding[1], :] = x
return y
compare_single_input_op_to_numpy(
backend.temporal_padding,
ref_op,
input_shape=(4, 7, 6),
keras_args=[(2, 3)],
np_args=[(2, 3)])
def test_spatial_2d_padding(self):
def ref_op(x, padding, data_format='channels_last'):
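      """NumPy reference: zero-pads the two spatial axes of a 4D tensor."""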
shape = list(x.shape)
if data_format == 'channels_last':
shape[1] += padding[0][0] + padding[0][1]
shape[2] += padding[1][0] + padding[1][1]
y = np.zeros(tuple(shape))
y[:, padding[0][0]:-padding[0][1], padding[1][0]:-padding[1][1], :] = x
else:
shape[2] += padding[0][0] + padding[0][1]
shape[3] += padding[1][0] + padding[1][1]
y = np.zeros(tuple(shape))
y[:, :, padding[0][0]:-padding[0][1], padding[1][0]:-padding[1][1]] = x
return y
compare_single_input_op_to_numpy(
backend.spatial_2d_padding,
ref_op,
input_shape=(2, 3, 2, 3),
keras_args=[((2, 3), (1, 2))],
keras_kwargs={'data_format': 'channels_last'},
np_args=[((2, 3), (1, 2))],
np_kwargs={'data_format': 'channels_last'})
compare_single_input_op_to_numpy(
backend.spatial_2d_padding,
ref_op,
input_shape=(2, 3, 2, 3),
keras_args=[((2, 3), (1, 2))],
keras_kwargs={'data_format': 'channels_first'},
np_args=[((2, 3), (1, 2))],
np_kwargs={'data_format': 'channels_first'})
def test_spatial_3d_padding(self):
def ref_op(x, padding, data_format='channels_last'):
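      """NumPy reference: zero-pads the three spatial axes of a 5D tensor."""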
shape = list(x.shape)
if data_format == 'channels_last':
shape[1] += padding[0][0] + padding[0][1]
shape[2] += padding[1][0] + padding[1][1]
shape[3] += padding[2][0] + padding[2][1]
y = np.zeros(tuple(shape))
y[:,
padding[0][0]:-padding[0][1],
padding[1][0]:-padding[1][1],
padding[2][0]:-padding[2][1],
:] = x
else:
shape[2] += padding[0][0] + padding[0][1]
shape[3] += padding[1][0] + padding[1][1]
shape[4] += padding[2][0] + padding[2][1]
y = np.zeros(tuple(shape))
y[:, :,
padding[0][0]:-padding[0][1],
padding[1][0]:-padding[1][1],
padding[2][0]:-padding[2][1]] = x
return y
compare_single_input_op_to_numpy(
backend.spatial_3d_padding,
ref_op,
input_shape=(2, 3, 2, 3, 2),
keras_args=[((2, 3), (1, 2), (2, 3))],
keras_kwargs={'data_format': 'channels_last'},
np_args=[((2, 3), (1, 2), (2, 3))],
np_kwargs={'data_format': 'channels_last'})
compare_single_input_op_to_numpy(
backend.spatial_3d_padding,
ref_op,
input_shape=(2, 3, 2, 3, 2),
keras_args=[((2, 3), (1, 2), (2, 3))],
keras_kwargs={'data_format': 'channels_first'},
np_args=[((2, 3), (1, 2), (2, 3))],
np_kwargs={'data_format': 'channels_first'})
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class BackendNNOpsTest(test.TestCase, parameterized.TestCase):
def test_bias_add(self):
keras_op = backend.bias_add
np_op = np.add
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 7),
input_shape_b=(7,))
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 3, 7),
input_shape_b=(7,))
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 3, 5, 7),
input_shape_b=(7,))
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 3, 5, 2, 7),
input_shape_b=(7,))
with self.assertRaises((ValueError, errors_impl.InvalidArgumentError)):
x = backend.variable((3, 4))
b = backend.variable((3, 4))
backend.bias_add(x, b)
with self.assertRaises(ValueError):
x = backend.variable((3, 4))
b = backend.variable((4,))
backend.bias_add(x, b, data_format='unknown')
def test_bias_add_channels_first(self):
def keras_op(x, b):
return backend.bias_add(x, b, data_format='channels_first')
def np_op(x, b):
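      """NumPy reference: reshapes the bias for NCHW-style broadcasting."""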
if x.ndim == 3:
b = b.reshape((1, b.shape[0], 1))
if x.ndim == 4:
b = b.reshape((1, b.shape[0], 1, 1))
return x + b
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 3, 7),
input_shape_b=(3,))
compare_two_inputs_op_to_numpy(keras_op, np_op,
input_shape_a=(4, 3, 5, 7),
input_shape_b=(3,))
def test_pool2d(self):
val = np.random.random((10, 3, 10, 10))
x = backend.variable(val)
y = backend.pool2d(
x, (2, 2),
strides=(1, 1),
padding='valid',
data_format='channels_first',
pool_mode='max')
self.assertEqual(y.shape.as_list(), [10, 3, 9, 9])
y = backend.pool2d(
x, (2, 2),
strides=(1, 1),
padding='valid',
data_format='channels_first',
pool_mode='avg')
self.assertEqual(y.shape.as_list(), [10, 3, 9, 9])
val = np.random.random((10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool2d(
x, (2, 2), strides=(1, 1), padding='valid', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 9, 9, 3])
val = np.random.random((10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool2d(
x, (2, 2), strides=(1, 1), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 3])
val = np.random.random((10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool2d(
x, (2, 2), strides=(2, 2), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5, 3])
with self.assertRaises(ValueError):
y = backend.pool2d(
x, (2, 2),
strides=(2, 2),
padding='other',
data_format='channels_last')
with self.assertRaises(ValueError):
y = backend.pool2d(x, (2, 2), strides=(2, 2), data_format='other')
with self.assertRaises(ValueError):
y = backend.pool2d(x, (2, 2, 2), strides=(2, 2))
with self.assertRaises(ValueError):
y = backend.pool2d(x, (2, 2), strides=(2, 2, 2))
with self.assertRaises(ValueError):
y = backend.pool2d(x, (2, 2), strides=(2, 2), pool_mode='other')
def test_pool3d(self):
if test.is_built_with_rocm():
self.skipTest('Pooling with 3D tensors is not supported in ROCm')
val = np.random.random((10, 3, 10, 10, 10))
x = backend.variable(val)
y = backend.pool3d(
x, (2, 2, 2),
strides=(1, 1, 1),
padding='valid',
data_format='channels_first',
pool_mode='max')
self.assertEqual(y.shape.as_list(), [10, 3, 9, 9, 9])
y = backend.pool3d(
x, (2, 2, 2),
strides=(1, 1, 1),
padding='valid',
data_format='channels_first',
pool_mode='avg')
self.assertEqual(y.shape.as_list(), [10, 3, 9, 9, 9])
val = np.random.random((10, 10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool3d(
x, (2, 2, 2),
strides=(1, 1, 1),
padding='valid',
data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 9, 9, 9, 3])
val = np.random.random((10, 10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool3d(
x, (2, 2, 2),
strides=(1, 1, 1),
padding='same',
data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 10, 3])
val = np.random.random((10, 10, 10, 10, 3))
x = backend.variable(val)
y = backend.pool3d(
x, (2, 2, 2),
strides=(2, 2, 2),
padding='same',
data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5, 5, 3])
def test_conv1d(self):
val = np.random.random((10, 4, 10))
x = backend.variable(val)
kernel_val = np.random.random((3, 4, 5))
k = backend.variable(kernel_val)
y = backend.conv1d(
x, k, strides=(1,), padding='valid', data_format='channels_first')
self.assertEqual(y.shape.as_list(), [10, 5, 8])
val = np.random.random((10, 10, 4))
x = backend.variable(val)
y = backend.conv1d(
x, k, strides=(1,), padding='valid', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 8, 5])
val = np.random.random((10, 10, 4))
x = backend.variable(val)
y = backend.conv1d(
x, k, strides=(1,), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 5])
val = np.random.random((10, 10, 4))
x = backend.variable(val)
y = backend.conv1d(
x, k, strides=(2,), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5])
def test_local_conv_channels_dim(self):
filters = 3
batch_size = 2
for input_shape in [(3, 5), (2, 3, 5), (2, 5, 3, 4)]:
channels_in = input_shape[0]
input_spatial_shape = input_shape[1:]
dim = len(input_spatial_shape)
inputs = np.random.normal(0, 1, (batch_size,) + input_shape)
inputs_cf = backend.variable(inputs)
for kernel_size in [1, 2]:
for stride in [1, 2]:
kernel_sizes = (kernel_size,) * dim
strides = (stride,) * dim
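          # 'valid'-padding output length per spatial dim: (i - k + s) // s.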
output_shape = tuple([(i - kernel_size + stride) // stride
for i in input_spatial_shape])
kernel_shape = (np.prod(output_shape),
np.prod(kernel_sizes) * channels_in,
filters)
kernel = np.random.normal(
0,
1,
output_shape + (channels_in, np.prod(kernel_sizes), filters)
)
kernel_cf = np.reshape(kernel, kernel_shape)
kernel_cf = backend.variable(kernel_cf)
conv_cf = backend.local_conv(inputs_cf, kernel_cf, kernel_sizes,
strides, output_shape, 'channels_first')
inputs_cl = np.transpose(inputs, [0, 2] + list(range(3, dim + 2)) +
[1])
inputs_cl = backend.variable(inputs_cl)
kernel_cl = np.reshape(
np.transpose(kernel, list(range(dim)) + [dim + 1, dim, dim + 2]),
kernel_shape
)
kernel_cl = backend.variable(kernel_cl)
conv_cl = backend.local_conv(inputs_cl, kernel_cl, kernel_sizes,
strides, output_shape, 'channels_last')
conv_cf = backend.eval(conv_cf)
conv_cl = backend.eval(conv_cl)
self.assertAllCloseAccordingToType(
conv_cf,
np.transpose(conv_cl,
[0, dim + 1] + list(range(1, dim + 1))),
atol=1e-5
)
@parameterized.named_parameters(
('local_conv1d', (5, 6), (3,), (1,), (3,)),
('local_conv2d', (4, 5, 6), (3, 3), (1, 1), (2, 3)))
def test_local_conv_1d_and_2d(self,
input_shape,
kernel_sizes,
strides,
output_shape):
filters = 3
batch_size = 2
inputs = np.random.normal(0, 1, (batch_size,) + input_shape)
inputs = backend.variable(inputs)
kernel = np.random.normal(0, 1, (np.prod(output_shape),
np.prod(kernel_sizes) * input_shape[-1],
filters))
kernel = backend.variable(kernel)
local_conv = backend.local_conv(inputs, kernel, kernel_sizes, strides,
output_shape, 'channels_last')
if len(output_shape) == 1:
local_conv_dim = backend.local_conv1d(inputs, kernel, kernel_sizes,
strides, 'channels_last')
else:
local_conv_dim = backend.local_conv2d(inputs, kernel, kernel_sizes,
strides, output_shape,
'channels_last')
local_conv = backend.eval(local_conv)
local_conv_dim = backend.eval(local_conv_dim)
self.assertAllCloseAccordingToType(local_conv, local_conv_dim)
def test_conv2d(self):
kernel_val = np.random.random((3, 3, 4, 5))
k = backend.variable(kernel_val)
# Test channels_first
val = np.random.random((10, 4, 10, 10))
x = backend.variable(val)
y = backend.conv2d(x, k, padding='valid', data_format='channels_first')
self.assertEqual(y.shape.as_list(), [10, 5, 8, 8])
# Test channels_last
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv2d(
x, k, strides=(1, 1), padding='valid', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 8, 8, 5])
# Test same padding
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv2d(x, k, padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 5])
# Test dilation_rate
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv2d(
x, k, dilation_rate=(2, 2), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 5])
# Test strides
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv2d(
x, k, strides=(2, 2), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5, 5])
# Test invalid arguments
with self.assertRaises(ValueError):
y = backend.conv2d(
x, k, (2, 2), padding='other', data_format='channels_last')
with self.assertRaises(ValueError):
y = backend.conv2d(x, k, (2, 2), data_format='other')
with self.assertRaises(ValueError):
y = backend.conv2d(x, k, (2, 2, 2))
def test_conv2d_transpose(self):
input_size = (7, 8)
kernel_size = (3, 3)
input_depth = 6
filters = 6
batch_size = 2
kernel_val = np.random.random(kernel_size + (input_depth, filters))
k = backend.variable(kernel_val)
# Test channels_first
input_val = np.random.random((batch_size, input_depth) + input_size)
x = backend.variable(input_val)
y = backend.conv2d_transpose(
x,
k, (batch_size, filters) + input_size,
padding='same',
data_format='channels_first')
self.assertEqual(
tuple(y.shape.as_list()), (batch_size, filters) + input_size)
# Test channels_last
input_val = np.random.random((batch_size,) + input_size + (input_depth,))
x = backend.variable(input_val)
y = backend.conv2d_transpose(
x,
k, (batch_size,) + input_size + (filters,),
padding='same',
data_format='channels_last')
self.assertEqual(
tuple(y.shape.as_list()), (batch_size,) + input_size + (filters,))
# Test dilation_rate
y = backend.conv2d_transpose(
x,
k, (batch_size,) + input_size + (filters,),
padding='same',
data_format='channels_last',
dilation_rate=(2, 2))
self.assertEqual(
tuple(y.shape.as_list()), (batch_size,) + input_size + (filters,))
# Test batch size of None in output_shape
y = backend.conv2d_transpose(
x,
k, (None,) + input_size + (filters,),
padding='same',
data_format='channels_last')
self.assertEqual(
tuple(y.shape.as_list()), (batch_size,) + input_size + (filters,))
# Test invalid values
with self.assertRaises(ValueError):
y = backend.conv2d_transpose(
x, k, (2, 2, 8, 9), padding='other', data_format='channels_last')
with self.assertRaises(ValueError):
y = backend.conv2d_transpose(x, k, (2, 2, 8, 9), data_format='other')
def test_separable_conv2d(self):
val = np.random.random((10, 4, 10, 10))
x = backend.variable(val)
depthwise_kernel_val = np.random.random((3, 3, 4, 1))
pointwise_kernel_val = np.random.random((1, 1, 4, 5))
dk = backend.variable(depthwise_kernel_val)
pk = backend.variable(pointwise_kernel_val)
y = backend.separable_conv2d(
x, dk, pk, padding='valid', data_format='channels_first')
self.assertEqual(y.shape.as_list(), [10, 5, 8, 8])
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.separable_conv2d(
x, dk, pk, strides=(1, 1), padding='valid', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 8, 8, 5])
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.separable_conv2d(
x, dk, pk, strides=(1, 1), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 5])
val = np.random.random((10, 10, 10, 4))
x = backend.variable(val)
y = backend.separable_conv2d(
x, dk, pk, strides=(2, 2), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5, 5])
with self.assertRaises(ValueError):
y = backend.separable_conv2d(
x, dk, pk, (2, 2), padding='other', data_format='channels_last')
with self.assertRaises(ValueError):
y = backend.separable_conv2d(x, dk, pk, (2, 2), data_format='other')
with self.assertRaises(ValueError):
y = backend.separable_conv2d(x, dk, pk, (2, 2, 2))
def test_conv3d(self):
val = np.random.random((10, 4, 10, 10, 10))
x = backend.variable(val)
kernel_val = np.random.random((3, 3, 3, 4, 5))
k = backend.variable(kernel_val)
y = backend.conv3d(x, k, padding='valid', data_format='channels_first')
self.assertEqual(y.shape.as_list(), [10, 5, 8, 8, 8])
val = np.random.random((10, 10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv3d(
x, k, strides=(1, 1, 1), padding='valid', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 8, 8, 8, 5])
val = np.random.random((10, 10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv3d(
x, k, strides=(1, 1, 1), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 10, 10, 10, 5])
val = np.random.random((10, 10, 10, 10, 4))
x = backend.variable(val)
y = backend.conv3d(
x, k, strides=(2, 2, 2), padding='same', data_format='channels_last')
self.assertEqual(y.shape.as_list(), [10, 5, 5, 5, 5])
with self.assertRaises(ValueError):
y = backend.conv3d(
x, k, (2, 2, 2), padding='other', data_format='channels_last')
with self.assertRaises(ValueError):
y = backend.conv3d(x, k, (2, 2, 2), data_format='other')
with self.assertRaises(ValueError):
y = backend.conv3d(x, k, (2, 2))
def test_rnn(self):
# implement a simple RNN
num_samples = 4
input_dim = 5
output_dim = 3
timesteps = 6
input_val = np.random.random(
(num_samples, timesteps, input_dim)).astype(np.float32)
init_state_val = np.random.random(
(num_samples, output_dim)).astype(np.float32)
w_i_val = np.random.random((input_dim, output_dim)).astype(np.float32)
w_o_val = np.random.random((output_dim, output_dim)).astype(np.float32)
np_mask = np.random.randint(2, size=(num_samples, timesteps))
def rnn_step_fn():
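      """Builds a step function computing output = x . w_i + prev_output . w_o."""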
w_i = backend.variable(w_i_val)
w_o = backend.variable(w_o_val)
def step_function(x, states):
assert len(states) == 1
prev_output = states[0]
output = backend.dot(x, w_i) + backend.dot(prev_output, w_o)
return output, [output]
return step_function
# test default setup
last_output_list = [[], [], [], [], [], []]
outputs_list = [[], [], [], [], [], []]
state_list = [[], [], [], [], [], []]
rnn_fn = rnn_step_fn()
inputs = backend.variable(input_val)
initial_states = [backend.variable(init_state_val)]
mask = backend.variable(np_mask)
kwargs_list = [
{'go_backwards': False, 'mask': None},
{'go_backwards': False, 'mask': None, 'unroll': True},
{'go_backwards': True, 'mask': None},
{'go_backwards': True, 'mask': None, 'unroll': True},
{'go_backwards': False, 'mask': mask},
{'go_backwards': False, 'mask': mask, 'unroll': True},
]
for i, kwargs in enumerate(kwargs_list):
last_output, outputs, new_states = backend.rnn(rnn_fn, inputs,
initial_states, **kwargs)
# check static shape inference
self.assertEqual(last_output.shape.as_list(), [num_samples, output_dim])
self.assertEqual(outputs.shape.as_list(),
[num_samples, timesteps, output_dim])
for state in new_states:
self.assertEqual(state.shape.as_list(), [num_samples, output_dim])
last_output_list[i].append(backend.eval(last_output))
outputs_list[i].append(backend.eval(outputs))
self.assertLen(new_states, 1)
state_list[i].append(backend.eval(new_states[0]))
def assert_list_pairwise(z_list, atol=1e-05):
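      """Asserts that all entries of `z_list` agree to within `atol`."""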
for (z1, z2) in zip(z_list[1:], z_list[:-1]):
self.assertAllClose(z1, z2, atol=atol)
assert_list_pairwise(last_output_list[0], atol=1e-04)
assert_list_pairwise(outputs_list[0], atol=1e-04)
assert_list_pairwise(state_list[0], atol=1e-04)
assert_list_pairwise(last_output_list[2], atol=1e-04)
assert_list_pairwise(outputs_list[2], atol=1e-04)
assert_list_pairwise(state_list[2], atol=1e-04)
for l, u_l in zip(last_output_list[0], last_output_list[1]):
self.assertAllClose(l, u_l, atol=1e-04)
for o, u_o in zip(outputs_list[0], outputs_list[1]):
self.assertAllClose(o, u_o, atol=1e-04)
for s, u_s in zip(state_list[0], state_list[1]):
self.assertAllClose(s, u_s, atol=1e-04)
for b_l, b_u_l in zip(last_output_list[2], last_output_list[3]):
self.assertAllClose(b_l, b_u_l, atol=1e-04)
for b_o, b_u_o in zip(outputs_list[2], outputs_list[3]):
self.assertAllClose(b_o, b_u_o, atol=1e-04)
for b_s, b_u_s in zip(state_list[2], state_list[3]):
self.assertAllClose(b_s, b_u_s, atol=1e-04)
def test_rnn_additional_states(self):
# implement a simple RNN
num_samples = 4
input_dim = 5
output_dim = 3
timesteps = 6
input_val = np.random.random(
(num_samples, timesteps, input_dim)).astype(np.float32)
init_state_val = np.random.random(
(num_samples, output_dim)).astype(np.float32)
w_i_val = np.random.random((input_dim, output_dim)).astype(np.float32)
w_o_val = np.random.random((output_dim, output_dim)).astype(np.float32)
np_mask = np.random.randint(2, size=(num_samples, timesteps))
def rnn_step_fn():
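      """Builds a step function returning two states; the second is double width."""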
w_i = backend.variable(w_i_val)
w_o = backend.variable(w_o_val)
def step_function(x, states):
assert len(states) == 2
prev_output = states[0]
output = backend.dot(x, w_i) + backend.dot(prev_output, w_o)
return output, [output, backend.concatenate([output, output], axis=-1)]
return step_function
# test default setup
last_output_list = [[], [], [], [], [], []]
outputs_list = [[], [], [], [], [], []]
state_list = [[], [], [], [], [], []]
additional_state_list = [[], [], [], [], [], []]
rnn_fn = rnn_step_fn()
inputs = backend.variable(input_val)
initial_states = [
backend.variable(init_state_val),
ops.convert_to_tensor_v2(
np.concatenate([init_state_val, init_state_val], axis=-1))
]
mask = backend.variable(np_mask)
kwargs_list = [
{'go_backwards': False, 'mask': None},
{'go_backwards': False, 'mask': None, 'unroll': True},
{'go_backwards': True, 'mask': None},
{'go_backwards': True, 'mask': None, 'unroll': True},
{'go_backwards': False, 'mask': mask},
{'go_backwards': False, 'mask': mask, 'unroll': True},
]
for i, kwargs in enumerate(kwargs_list):
last_output, outputs, new_states = backend.rnn(rnn_fn, inputs,
initial_states, **kwargs)
# check static shape inference
self.assertEqual(last_output.shape.as_list(), [num_samples, output_dim])
self.assertEqual(outputs.shape.as_list(),
[num_samples, timesteps, output_dim])
      # The two states have different widths, so check their shapes
      # individually rather than in a loop.
self.assertEqual(new_states[0].shape.as_list(), [num_samples, output_dim])
self.assertEqual(new_states[1].shape.as_list(),
[num_samples, 2 * output_dim])
last_output_list[i].append(backend.eval(last_output))
outputs_list[i].append(backend.eval(outputs))
self.assertLen(new_states, 2)
state_list[i].append(backend.eval(new_states[0]))
additional_state_list[i].append(backend.eval(new_states[1]))
def assert_list_pairwise(z_list, atol=1e-05):
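      """Asserts that all entries of `z_list` agree to within `atol`."""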
for (z1, z2) in zip(z_list[1:], z_list[:-1]):
self.assertAllClose(z1, z2, atol=atol)
assert_list_pairwise(last_output_list[0], atol=1e-04)
assert_list_pairwise(outputs_list[0], atol=1e-04)
assert_list_pairwise(state_list[0], atol=1e-04)
assert_list_pairwise(additional_state_list[0], atol=1e-04)
assert_list_pairwise(last_output_list[2], atol=1e-04)
assert_list_pairwise(outputs_list[2], atol=1e-04)
assert_list_pairwise(state_list[2], atol=1e-04)
assert_list_pairwise(additional_state_list[2], atol=1e-04)
for l, u_l in zip(last_output_list[0], last_output_list[1]):
self.assertAllClose(l, u_l, atol=1e-04)
for o, u_o in zip(outputs_list[0], outputs_list[1]):
self.assertAllClose(o, u_o, atol=1e-04)
for s, u_s in zip(state_list[0], state_list[1]):
self.assertAllClose(s, u_s, atol=1e-04)
for s, u_s in zip(additional_state_list[0], additional_state_list[1]):
self.assertAllClose(s, u_s, atol=1e-04)
for b_l, b_u_l in zip(last_output_list[2], last_output_list[3]):
self.assertAllClose(b_l, b_u_l, atol=1e-04)
for b_o, b_u_o in zip(outputs_list[2], outputs_list[3]):
self.assertAllClose(b_o, b_u_o, atol=1e-04)
for b_s, b_u_s in zip(state_list[2], state_list[3]):
self.assertAllClose(b_s, b_u_s, atol=1e-04)
for s, u_s in zip(additional_state_list[2], additional_state_list[3]):
self.assertAllClose(s, u_s, atol=1e-04)
def test_rnn_output_and_state_masking_independent(self):
num_samples = 2
num_timesteps = 4
state_and_io_size = 2
mask_last_num_timesteps = 2 # for second sample only
# a step function that just outputs inputs,
# but increments states +1 per timestep
def step_function(inputs, states):
return inputs, [s + 1 for s in states]
inputs_vals = np.random.random((num_samples, num_timesteps,
state_and_io_size))
initial_state_vals = np.random.random((num_samples, state_and_io_size))
# masking of two last timesteps for second sample only
mask_vals = np.ones((num_samples, num_timesteps))
mask_vals[1, -mask_last_num_timesteps:] = 0
# outputs expected to be same as inputs for the first sample
expected_outputs = inputs_vals.copy()
# but for the second sample all outputs in masked region should be the same
# as last output before masked region
    expected_outputs[1, -mask_last_num_timesteps:] = (
        expected_outputs[1, -(mask_last_num_timesteps + 1)])
expected_last_state = initial_state_vals.copy()
# first state should be incremented for every timestep (no masking)
expected_last_state[0] += num_timesteps
# second state should not be incremented for last two timesteps
expected_last_state[1] += (num_timesteps - mask_last_num_timesteps)
# verify same expected output for `unroll=true/false`
inputs = backend.variable(inputs_vals)
initial_states = [backend.variable(initial_state_vals)]
mask = backend.variable(mask_vals)
for unroll in [True, False]:
_, outputs, last_states = backend.rnn(
step_function,
inputs,
initial_states,
mask=mask,
unroll=unroll,
input_length=num_timesteps if unroll else None)
self.assertAllClose(backend.eval(outputs), expected_outputs)
self.assertAllClose(backend.eval(last_states[0]), expected_last_state)
def test_rnn_output_num_dim_larger_than_2_masking(self):
num_samples = 3
num_timesteps = 4
num_features = 5
def step_function(inputs, states):
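      """Returns inputs with an extra (tiled x2) trailing dim; states pass through identity."""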
outputs = backend.tile(backend.expand_dims(inputs), [1, 1, 2])
return outputs, [backend.identity(s) for s in states]
      # Note: the step function cannot simply return `states` unchanged, as
      # backend.rnn calls set_shape() on the returned states and
      # ResourceVariable raises NotImplementedError from set_shape()
      # (see tensorflow/python/ops/resource_variable_ops.py).
inputs_vals = np.random.random((num_samples, num_timesteps, num_features))
initial_state_vals = np.random.random((num_samples, 6))
mask_vals = np.ones((num_samples, num_timesteps))
mask_vals[-1, -1] = 0 # final timestep masked for last sample
expected_outputs = np.repeat(inputs_vals[..., None], repeats=2, axis=-1)
# for the last sample, the final timestep (in masked region) should be the
# same as the second to final output (before masked region)
expected_outputs[-1, -1] = expected_outputs[-1, -2]
inputs = backend.variable(inputs_vals)
initial_states = [backend.variable(initial_state_vals)]
mask = backend.variable(mask_vals)
for unroll in [True, False]:
_, outputs, _ = backend.rnn(
step_function,
inputs,
initial_states,
mask=mask,
unroll=unroll,
input_length=num_timesteps if unroll else None)
self.assertAllClose(backend.eval(outputs), expected_outputs)
def test_rnn_state_num_dim_larger_than_2_masking(self):
num_samples = 3
num_timesteps = 4
def step_function(inputs, states):
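      """Passes inputs through; increments every state by 1 per timestep."""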
return inputs, [s + 1 for s in states]
inputs_vals = np.random.random((num_samples, num_timesteps, 5))
initial_state_vals = np.random.random((num_samples, 6, 7))
mask_vals = np.ones((num_samples, num_timesteps))
mask_vals[0, -2:] = 0 # final two timesteps masked for first sample
expected_last_state = initial_state_vals.copy()
expected_last_state[0] += (num_timesteps - 2)
expected_last_state[1:] += num_timesteps
inputs = backend.variable(inputs_vals)
initial_states = [backend.variable(initial_state_vals)]
mask = backend.variable(mask_vals)
for unroll in [True, False]:
_, _, last_states = backend.rnn(
step_function,
inputs,
initial_states,
mask=mask,
unroll=unroll,
input_length=num_timesteps if unroll else None)
self.assertAllClose(backend.eval(last_states[0]), expected_last_state)
def test_batch_normalization(self):
g_val = np.random.random((3,))
b_val = np.random.random((3,))
gamma = backend.variable(g_val)
beta = backend.variable(b_val)
# 3D NHC case
val = np.random.random((10, 5, 3))
x = backend.variable(val)
mean, var = nn.moments(x, (0, 1), None, None, False)
normed = backend.batch_normalization(
x, mean, var, beta, gamma, axis=-1, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 5, 3])
# 4D NHWC case
val = np.random.random((10, 5, 5, 3))
x = backend.variable(val)
mean, var = nn.moments(x, (0, 1, 2), None, None, False)
normed = backend.batch_normalization(
x, mean, var, beta, gamma, axis=-1, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 5, 5, 3])
# 4D NCHW case
if not context.executing_eagerly():
# Eager CPU kernel for NCHW does not exist.
val = np.random.random((10, 3, 5, 5))
x = backend.variable(val)
mean, var = nn.moments(x, (0, 2, 3), None, None, False)
normed = backend.batch_normalization(
x, mean, var, beta, gamma, axis=1, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 3, 5, 5])
def test_normalize_batch_in_training(self):
val = np.random.random((10, 3, 10, 10))
x = backend.variable(val)
reduction_axes = (0, 2, 3)
g_val = np.random.random((3,))
b_val = np.random.random((3,))
gamma = backend.variable(g_val)
beta = backend.variable(b_val)
normed, mean, var = backend.normalize_batch_in_training(
x, gamma, beta, reduction_axes, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 3, 10, 10])
    self.assertEqual(mean.shape.as_list(), [3])
    self.assertEqual(var.shape.as_list(), [3])
# case: gamma=None
gamma = None
normed, mean, var = backend.normalize_batch_in_training(
x, gamma, beta, reduction_axes, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 3, 10, 10])
    self.assertEqual(mean.shape.as_list(), [3])
    self.assertEqual(var.shape.as_list(), [3])
# case: beta=None
beta = None
normed, mean, var = backend.normalize_batch_in_training(
x, gamma, beta, reduction_axes, epsilon=1e-3)
self.assertEqual(normed.shape.as_list(), [10, 3, 10, 10])
    self.assertEqual(mean.shape.as_list(), [3])
    self.assertEqual(var.shape.as_list(), [3])
def test_dropout(self):
inputs = array_ops.ones((200, 200))
outputs = backend.dropout(inputs, 0.2)
outputs_val = backend.eval(outputs)
self.assertEqual(np.min(outputs_val), 0)
self.assertAllClose(np.count_nonzero(outputs_val), 32000, atol=1000)
# Test noise shape
outputs = backend.dropout(inputs, 0.2, noise_shape=(200, 1))
outputs_val = backend.eval(outputs)
self.assertAllClose(outputs_val[2, :], outputs_val[3, :], atol=1e-5)
class BackendCrossEntropyLossesTest(test.TestCase, parameterized.TestCase):
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_binary_crossentropy_with_sigmoid(self):
t = backend.constant([[0, 1, 0]])
logits = backend.constant([[8., 1., 1.]])
p = backend.sigmoid(logits)
p = array_ops.identity(array_ops.identity(p))
result = self.evaluate(backend.binary_crossentropy(t, p))
self.assertArrayNear(result[0], [8., 0.313, 1.313], 1e-3)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_categorical_crossentropy_loss(self):
t = backend.constant([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
p = backend.constant([[.9, .05, .05], [.05, .89, .06], [.05, .01, .94]])
result = backend.categorical_crossentropy(t, p)
self.assertArrayNear(self.evaluate(result), [.105, .116, .062], 1e-3)
p = backend.constant([[.9, .05, .05], [.05, .89, .01], [.05, .06, .94]])
result = backend.categorical_crossentropy(t, p, axis=0)
self.assertArrayNear(self.evaluate(result), [.105, .116, .062], 1e-3)
p = backend.constant([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
    result = backend.categorical_crossentropy(t, p, from_logits=True)
    self.assertArrayNear(self.evaluate(result), [.002, 0, .17], 1e-3)
p = backend.constant([[8., 0., 2.], [1., 9., 3.], [1., 1., 5.]])
    result = backend.categorical_crossentropy(t, p, from_logits=True, axis=0)
    self.assertArrayNear(self.evaluate(result), [.002, 0, .17], 1e-3)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_categorical_crossentropy_loss_with_unknown_rank_tensor(self):
t = backend.placeholder()
p = backend.placeholder()
o = backend.categorical_crossentropy(t, p)
t_val = ops.convert_to_tensor_v2([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]])
p_val = ops.convert_to_tensor_v2([[.9, .05, .05], [.05, .89, .06],
[.05, .01, .94]])
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.105, .116, .062], 1e-3)
# With axis set
o = backend.categorical_crossentropy(t, p, axis=0)
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.105, .065, .111], 1e-3)
# from logits
p_val = ops.convert_to_tensor_v2([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
o = backend.categorical_crossentropy(t, p, from_logits=True)
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.002, 0, .17], 1e-3)
# from logits and axis set
o = backend.categorical_crossentropy(t, p, from_logits=True, axis=0)
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.002, .003, .036], 1e-3)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_categorical_crossentropy_with_softmax(self):
t = backend.constant([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
logits = backend.constant([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
p = backend.softmax(logits)
p = array_ops.identity(array_ops.identity(p))
result = self.evaluate(backend.categorical_crossentropy(t, p))
self.assertArrayNear(result, [0.002, 0.0005, 0.17], 1e-3)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sparse_categorical_crossentropy_loss(self):
t = backend.constant([0, 1, 2])
p = backend.constant([[.9, .05, .05], [.05, .89, .06], [.05, .01, .94]])
result = backend.sparse_categorical_crossentropy(t, p)
self.assertArrayNear(self.evaluate(result), [.105, .116, .062], 1e-3)
p = backend.constant([[.9, .05, .05], [.05, .89, .01], [.05, .06, .94]])
result = backend.sparse_categorical_crossentropy(t, p, axis=0)
self.assertArrayNear(self.evaluate(result), [.105, .116, .062], 1e-3)
p = backend.constant([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
    result = backend.sparse_categorical_crossentropy(t, p, from_logits=True)
    self.assertArrayNear(self.evaluate(result), [.002, 0, .17], 1e-3)
p = backend.constant([[8., 0., 2.], [1., 9., 3.], [1., 1., 5.]])
    result = backend.sparse_categorical_crossentropy(
        t, p, from_logits=True, axis=0)
    self.assertArrayNear(self.evaluate(result), [.002, 0, .17], 1e-3)
@combinations.generate(combinations.combine(mode=['graph']))
def test_sparse_categorical_crossentropy_loss_with_unknown_rank_tensor(self):
# This test only runs in graph because the TF op layer is not supported yet
# for sparse ops.
t = backend.placeholder()
p = backend.placeholder()
o = backend.sparse_categorical_crossentropy(t, p)
t_val = ops.convert_to_tensor_v2([0, 1, 2])
p_val = ops.convert_to_tensor_v2([[.9, .05, .05], [.05, .89, .06],
[.05, .01, .94]])
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.105, .116, .062], 1e-3)
# With axis set
with self.assertRaisesRegex(
ValueError,
'Cannot compute sparse categorical crossentropy with `axis=0`'):
o = backend.sparse_categorical_crossentropy(t, p, axis=0)
f = backend.function([t, p], o)
_ = f([t_val, p_val])
# from logits
p_val = ops.convert_to_tensor_v2([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
o = backend.sparse_categorical_crossentropy(t, p, from_logits=True)
f = backend.function([t, p], o)
result = f([t_val, p_val])
self.assertArrayNear(result, [.002, 0, .17], 1e-3)
# from logits and axis set
with self.assertRaisesRegex(
ValueError,
'Cannot compute sparse categorical crossentropy with `axis=0`'):
o = backend.sparse_categorical_crossentropy(
t, p, from_logits=True, axis=0)
f = backend.function([t, p], o)
_ = f([t_val, p_val])
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_sparse_categorical_crossentropy_with_softmax(self):
t = backend.constant([0, 1, 2])
logits = backend.constant([[8., 1., 1.], [0., 9., 1.], [2., 3., 5.]])
p = backend.softmax(logits)
p = array_ops.identity(array_ops.identity(p))
result = self.evaluate(backend.sparse_categorical_crossentropy(t, p))
self.assertArrayNear(result, [0.002, 0.0005, 0.17], 1e-3)
@test_util.with_control_flow_v2
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class TestCTC(test.TestCase):
def test_ctc_decode(self):
depth = 6
seq_len_0 = 5
input_prob_matrix_0 = np.asarray(
[[0.30999, 0.309938, 0.0679938, 0.0673362, 0.0708352, 0.173908],
[0.215136, 0.439699, 0.0370931, 0.0393967, 0.0381581, 0.230517],
[0.199959, 0.489485, 0.0233221, 0.0251417, 0.0233289, 0.238763],
[0.279611, 0.452966, 0.0204795, 0.0209126, 0.0194803, 0.20655],
[0.51286, 0.288951, 0.0243026, 0.0220788, 0.0219297, 0.129878],
# Random entry added in at time=5
[0.155251, 0.164444, 0.173517, 0.176138, 0.169979, 0.160671]],
dtype=np.float32)
# len max_time_steps array of batch_size x depth matrices
    inputs = ([input_prob_matrix_0[t, :][np.newaxis, :]
               for t in range(seq_len_0)] +  # Pad to max_time_steps = 7
              2 * [np.zeros((1, depth), dtype=np.float32)])
inputs = backend.variable(np.asarray(inputs).transpose((1, 0, 2)))
# batch_size length vector of sequence_lengths
input_length = backend.variable(np.array([seq_len_0], dtype=np.int32))
# batch_size length vector of negative log probabilities
log_prob_truth = np.array([
-3.5821197, # output beam 0
-3.777835 # output beam 1
], np.float32)[np.newaxis, :]
decode_truth = [
np.array([1, 0, -1, -1, -1, -1, -1]),
np.array([0, 1, 0, -1, -1, -1, -1])
]
beam_width = 2
top_paths = 2
decode_pred_tf, log_prob_pred_tf = backend.ctc_decode(
inputs,
input_length,
greedy=False,
beam_width=beam_width,
top_paths=top_paths)
self.assertEqual(len(decode_pred_tf), top_paths)
log_prob_pred = backend.eval(log_prob_pred_tf)
for i in range(top_paths):
      self.assertTrue(
          np.all(decode_truth[i] == backend.eval(decode_pred_tf[i])))
self.assertAllClose(log_prob_truth, log_prob_pred)
def test_ctc_batch_cost(self):
with self.cached_session():
label_lens = np.expand_dims(np.asarray([5, 4]), 1)
input_lens = np.expand_dims(np.asarray([5, 5]), 1) # number of timesteps
loss_log_probs = [3.34211, 5.42262]
# dimensions are batch x time x categories
labels = np.asarray([[0, 1, 2, 1, 0], [0, 1, 1, 0, -1]])
inputs = np.asarray(
[[[0.633766, 0.221185, 0.0917319, 0.0129757, 0.0142857, 0.0260553],
[0.111121, 0.588392, 0.278779, 0.0055756, 0.00569609, 0.010436],
[0.0357786, 0.633813, 0.321418, 0.00249248, 0.00272882, 0.0037688],
[0.0663296, 0.643849, 0.280111, 0.00283995, 0.0035545, 0.00331533],
[0.458235, 0.396634, 0.123377, 0.00648837, 0.00903441, 0.00623107]],
[[0.30176, 0.28562, 0.0831517, 0.0862751, 0.0816851, 0.161508],
[0.24082, 0.397533, 0.0557226, 0.0546814, 0.0557528, 0.19549],
[0.230246, 0.450868, 0.0389607, 0.038309, 0.0391602, 0.202456],
[0.280884, 0.429522, 0.0326593, 0.0339046, 0.0326856, 0.190345],
[0.423286, 0.315517, 0.0338439, 0.0393744, 0.0339315, 0.154046]]],
dtype=np.float32)
labels = backend.variable(labels, dtype='int32')
inputs = backend.variable(inputs, dtype='float32')
input_lens = backend.variable(input_lens, dtype='int32')
label_lens = backend.variable(label_lens, dtype='int32')
res = backend.eval(
backend.ctc_batch_cost(labels, inputs, input_lens, label_lens))
self.assertAllClose(res[:, 0], loss_log_probs, atol=1e-05)

      # Test when batch_size = 1, that is, one sample only.
ref = [3.34211]
input_lens = np.expand_dims(np.asarray([5]), 1)
label_lens = np.expand_dims(np.asarray([5]), 1)
labels = np.asarray([[0, 1, 2, 1, 0]])
      inputs = np.asarray(
          [[[0.633766, 0.221185, 0.0917319, 0.0129757, 0.0142857, 0.0260553],
            [0.111121, 0.588392, 0.278779, 0.0055756, 0.00569609, 0.010436],
            [0.0357786, 0.633813, 0.321418, 0.00249248, 0.00272882, 0.0037688],
            [0.0663296, 0.643849, 0.280111, 0.00283995, 0.0035545, 0.00331533],
            [0.458235, 0.396634, 0.123377, 0.00648837, 0.00903441, 0.00623107]]],
          dtype=np.float32)
k_labels = backend.variable(labels, dtype='int32')
k_inputs = backend.variable(inputs, dtype='float32')
k_input_lens = backend.variable(input_lens, dtype='int32')
k_label_lens = backend.variable(label_lens, dtype='int32')
res = backend.eval(
backend.ctc_batch_cost(k_labels, k_inputs, k_input_lens,
k_label_lens))
self.assertAllClose(res[:, 0], ref, atol=1e-05)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class TestRandomOps(test.TestCase):

  def test_random_normal(self):
np.random.seed(123)
x = backend.random_normal((500, 500))
val = backend.eval(x)
self.assertAllClose(np.mean(val), 0., atol=0.01)
self.assertAllClose(np.std(val), 1., atol=0.01)

  def test_random_uniform(self):
np.random.seed(123)
x = backend.random_uniform((500, 500))
val = backend.eval(x)
self.assertAllClose(np.mean(val), 0.5, atol=0.01)
self.assertAllClose(np.max(val), 1., atol=0.01)
self.assertAllClose(np.min(val), 0., atol=0.01)

  def test_random_binomial(self):
np.random.seed(123)
x = backend.random_binomial((500, 500), p=0.5)
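    # Samples are 0/1 draws, so the sample mean converges to p.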
self.assertAllClose(np.mean(backend.eval(x)), 0.5, atol=0.01)

  def test_truncated_normal(self):
    np.random.seed(123)
    x = backend.truncated_normal((1000, 1000), mean=0.0, stddev=1.0)
y = backend.eval(x)
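    # Truncation resamples values beyond two standard deviations, which
    # shrinks the standard deviation of a unit normal to ~0.88 and bounds
    # the extremes near +/-2.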
self.assertAllClose(np.mean(y), 0., atol=0.01)
self.assertAllClose(np.std(y), 0.88, atol=0.01)
self.assertAllClose(np.max(y), 2., atol=0.01)
self.assertAllClose(np.min(y), -2., atol=0.01)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class FunctionTest(test.TestCase):

  def test_function_basics(self):
if context.executing_eagerly():
self.skipTest('eager backend.function does not support updates')
x1 = backend.placeholder(shape=(), dtype='float32')
x2 = backend.placeholder(shape=(), dtype='int32')
v = backend.variable(10.)
y1 = x1 + backend.cast(x2, 'float32') + v
y2 = x1 * backend.cast(x2, 'float32')
with ops.control_dependencies([y1]):
u = backend.update(v, x1)
f = backend.function([x1, x2], [y1, y2], updates=[u])
output_values = f([2, 3])
self.assertEqual(output_values, [15., 6.])
self.assertEqual(backend.eval(v), 2.)

  def test_function_dict_outputs(self):
x_ph = backend.placeholder(shape=(), name='x')
y_ph = backend.placeholder(shape=(), name='y')
outputs = {'x*y': y_ph * x_ph, 'x*x': x_ph * x_ph}
f = backend.function(inputs=[x_ph, y_ph], outputs=outputs)
x, y = 2., 5.
results = f([x, y])
self.assertEqual(results['x*y'], 10.)
    self.assertEqual(results['x*x'], 4.)

  def test_function_dict_inputs(self):
placeholders = {
'x': backend.placeholder(shape=()),
'y': backend.placeholder(shape=())
}
outputs = [placeholders['x'] * placeholders['y']]
f = backend.function(inputs=placeholders, outputs=outputs)
results = f({'x': 2., 'y': 3.})
self.assertEqual(results[0], 6.)

  def test_function_single_input_output(self):
x_ph = backend.placeholder(shape=(), name='x')
output = x_ph * x_ph
f = backend.function(x_ph, output)
result = f(2.)
self.assertEqual(result, 4.)

  def test_tuple_updates(self):
if context.executing_eagerly():
self.skipTest('eager backend.function does not support updates')
x_ph = backend.placeholder(ndim=2)
v = backend.variable(np.ones((4, 2)))
output = x_ph ** 2 + v
new_v = v + x_ph
f = backend.function(x_ph, output, updates=[(v, new_v)])
input_val = np.random.random((4, 2))
result = f(input_val)
self.assertAllClose(result, input_val ** 2 + 1)
self.assertAllClose(backend.get_value(v), np.ones((4, 2)) + input_val)


class BackendGraphTests(test.TestCase, parameterized.TestCase):

  @combinations.generate(combinations.combine(mode=['graph']))
def test_function_placeholder_with_default(self):
with backend.get_graph().as_default():
x1 = array_ops.placeholder_with_default(
np.array(2., dtype='float32'), shape=())
x2 = array_ops.placeholder_with_default(
np.array(3, dtype='int32'), shape=())
y1 = x1 + backend.cast(x2, 'float32')
y2 = x1 * backend.cast(x2, 'float32')
f = backend.function([x1, x2], [y1, y2])
output_values = f([4, 5])
self.assertEqual(output_values, [9., 20.])
output_values = f([None, None])
self.assertEqual(output_values, [5., 6.])

  def test_function_tf_feed_symbols(self):
# Test Keras backend functions with TF tensor inputs.
with ops.Graph().as_default(), self.cached_session():
# Test feeding a resource variable to `function`.
x1 = backend.placeholder(shape=())
x2 = backend.placeholder(shape=())
lr = backend.learning_phase() # Include a placeholder_with_default.
y1 = backend.variable(10.)
y2 = 3
f = backend.function(
inputs=[x1, x2, lr],
outputs=[x1 + 1, backend.in_train_phase(x2 + 2, x2 - 1)])
outs = f([y1, y2, None]) # Use default learning_phase value.
self.assertEqual(outs, [11., 2.])
outs = f([y1, y2, 1]) # Set learning phase value.
self.assertEqual(outs, [11., 5.])
# Test triggering a callable refresh by changing the input.
y3 = backend.constant(20.) # Test with tensor
outs = f([y3, y2, None])
self.assertEqual(outs, [21., 2.])
y4 = 4 # Test with non-symbol
outs = f([y4, y2, None])
self.assertEqual(outs, [5., 2.])
# Test with a different dtype
y5 = backend.constant(10., dtype='float64')
outs = f([y5, y2, None])
self.assertEqual(outs, [11., 2.])

  def test_function_tf_fetches(self):
    # Additional operations can be passed to tf.compat.v1.Session().run() via
    # its `fetches` argument. In contrast to the `updates` argument of
    # backend.function(), these have no control dependency on `outputs`, so
    # they can run in parallel. They also do not contribute to the output of
    # backend.function().
with ops.Graph().as_default(), self.cached_session():
x = backend.variable(0.)
y = backend.variable(0.)
x_placeholder = backend.placeholder(shape=())
y_placeholder = backend.placeholder(shape=())
f = backend.function(
inputs=[x_placeholder, y_placeholder],
outputs=[x_placeholder + y_placeholder],
updates=[(x, x_placeholder + 1.)],
fetches=[backend.update(y, 5.)])
output = f([10., 20.])
self.assertEqual(output, [30.])
self.assertEqual(backend.get_session().run(fetches=[x, y]), [11., 5.])

  def test_function_tf_feed_dict(self):
    # Additional substitutions can be passed to `tf.compat.v1.Session().run()`
    # via its `feed_dict` argument. The feed_dict is passed once, when the
    # function is constructed, but the values inside the dictionary can be
    # modified afterwards. This provides a way to feed values beyond the
    # Keras inputs.
with ops.Graph().as_default(), self.cached_session():
x = backend.variable(0.)
y = backend.variable(0.)
x_placeholder = backend.placeholder(shape=())
y_placeholder = backend.placeholder(shape=())
feed_dict = {y_placeholder: 3.}
fetches = [backend.update(y, y_placeholder * 10.)]
f = backend.function(
inputs=[x_placeholder],
outputs=[x_placeholder + 1.],
updates=[(x, x_placeholder + 10.)],
feed_dict=feed_dict,
fetches=fetches)
output = f([10.])
self.assertEqual(output, [11.])
self.assertEqual(backend.get_session().run(fetches=[x, y]), [20., 30.])
      # A value updated in feed_dict is picked up on the next call.
feed_dict[y_placeholder] = 4.
output = f([20.])
self.assertEqual(output, [21.])
self.assertEqual(backend.get_session().run(fetches=[x, y]), [30., 40.])

  def test_function_tf_run_options_with_run_metadata(self):
with ops.Graph().as_default(), self.cached_session():
x_placeholder = backend.placeholder(shape=())
y_placeholder = backend.placeholder(shape=())
run_options = config_pb2.RunOptions(output_partition_graphs=True)
run_metadata = config_pb2.RunMetadata()
# enable run_options.
f = backend.function(
inputs=[x_placeholder, y_placeholder],
outputs=[x_placeholder + y_placeholder],
options=run_options,
run_metadata=run_metadata)
output = f([10., 20.])
self.assertEqual(output, [30.])
self.assertNotEmpty(run_metadata.partition_graphs)
# disable run_options.
f1 = backend.function(
inputs=[x_placeholder, y_placeholder],
outputs=[x_placeholder + y_placeholder],
run_metadata=run_metadata)
output1 = f1([10., 20.])
self.assertEqual(output1, [30.])
self.assertEmpty(run_metadata.partition_graphs)

  def test_function_fetch_callbacks(self):

    class CallbackStub(object):

      def __init__(self):
self.times_called = 0
self.callback_result = 0

      def _fetch_callback(self, result):
self.times_called += 1
self.callback_result = result

    with ops.Graph().as_default(), self.cached_session():
callback = CallbackStub()
x_placeholder = backend.placeholder(shape=())
y_placeholder = backend.placeholder(shape=())
callback_op = x_placeholder * y_placeholder
f = backend.function(
inputs=[x_placeholder, y_placeholder],
outputs=[x_placeholder + y_placeholder])
f.fetches.append(callback_op)
f.fetch_callbacks[callback_op] = callback._fetch_callback
_ = f([10., 20.])
self.assertEqual(callback.times_called, 1)
self.assertEqual(callback.callback_result, 200)

  def test_get_session_different_graphs(self):
with ops.Graph().as_default():
x = backend.constant(1)
session = backend.get_session()
self.assertIs(session, backend.get_session((x,)))
self.assertIs(session, backend.get_session())
with ops.Graph().as_default():
self.assertIs(session, backend.get_session((x,)))
self.assertIsNot(session, backend.get_session())


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class ControlOpsTests(test.TestCase):

  def test_function_switch_basics(self):
x = array_ops.constant(2.0)
y = array_ops.constant(3.0)

    def xpowy():
      return backend.pow(x, y)

    def ypowx():
      return backend.pow(y, x)
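
    # less(2., 3.) is true, so switch takes the first branch: 2. ** 3. == 8.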
tensor = backend.switch(backend.less(x, y), xpowy, ypowx)
self.assertEqual(backend.eval(tensor), [8.0])
tensor = backend.switch(backend.greater(x, y), xpowy, ypowx)
self.assertEqual(backend.eval(tensor), [9.0])

  def test_unequal_rank(self):
x = ops.convert_to_tensor_v2(
np.array([[1, 2, 3], [4, 5, 6]]), dtype='float32')
y = ops.convert_to_tensor_v2(np.array([1, 2, 3]), dtype='float32')

    def true_func():
      return x

    def false_func():
      return y

    with self.assertRaisesRegex(ValueError,
                                'Rank of `condition` should be less than'):
backend.switch(backend.equal(x, x), false_func, true_func)


class ContextValueCacheTest(test.TestCase):

  def test_cache(self):
cache = backend.ContextValueCache(list)
graph1 = ops.Graph()
graph2 = ops.Graph()
cache[graph1].append(1)
with graph1.as_default():
cache[None].append(2)
with graph2.as_default():
cache[None].append(3)
cache[graph2].append(4)
self.assertAllEqual(cache[graph1], [1, 2])
self.assertAllEqual(cache[graph2], [3, 4])
with context.eager_mode():
cache[None].append(5)
cache[None].append(6)
self.assertAllEqual(cache[None], [5, 6])
self.assertLen(cache, 3)
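
    # The cache must not keep graphs alive: once graph1 is deleted and
    # collected, its entry disappears.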
del graph1
gc.collect()
self.assertLen(cache, 2)

  def test_cache_in_parent_graph(self):
cache = backend.ContextValueCache(int)
cache.setdefault(None, backend.constant(5))
with ops.Graph().as_default() as g:
# g is not a child graph of the default test context, so the recursive
# lookup will create a new default value.
self.assertAllEqual(cache[g], 0)

    @def_function.function
def fn():
# The function graph is a child of the default test context, so
# __getitem__ will return the previously saved value.
return cache[ops.get_default_graph()]

    self.assertEqual(self.evaluate(fn()), 5)


if __name__ == '__main__':
test.main()