Move keras related convert_to_constants_test to keras/tests.

PiperOrigin-RevId: 306056993
Change-Id: I220960a74da3563de424bf50fbac6739b3c5df6a
This commit is contained in:
Scott Zhu 2020-04-11 14:03:45 -07:00 committed by TensorFlower Gardener
parent d2ad4a5c40
commit b737d47b53
3 changed files with 203 additions and 77 deletions

View File

@ -22,7 +22,6 @@ import os
import numpy as np
from tensorflow.python import keras
from tensorflow.python.client import session as session_lib
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
@ -34,7 +33,6 @@ from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import cond_v2
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import rnn
from tensorflow.python.ops import rnn_cell_impl
@ -221,29 +219,6 @@ class VariablesToConstantsTest(test.TestCase):
input_func)
self._testConvertedFunction(root, root.add, output_func, input_data)
@test_util.run_v2_only
def testKerasModel(self):
"""Test a basic Keras model with Variables."""
input_data = {"x": constant_op.constant(1., shape=[1, 1])}
# Create a simple Keras model.
x = [-1, 0, 1, 2, 3, 4]
y = [-3, -1, 1, 3, 5, 7]
model = keras.models.Sequential(
[keras.layers.Dense(units=1, input_shape=[1])])
model.compile(optimizer="sgd", loss="mean_squared_error")
model.fit(x, y, epochs=1)
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[1, 1], dtype=dtypes.float32)
])
def to_save(x):
return model(x)
root, output_func = self._freezeModel(to_save)
self._testConvertedFunction(root, root.f, output_func, input_data)
def _singleMetaGraphSavedModel(self):
export_graph = ops.Graph()
with export_graph.as_default():
@ -413,58 +388,6 @@ class VariablesToConstantsTest(test.TestCase):
root, output_func = self._freezeModel(model)
self._testConvertedFunction(root, root.f, output_func, input_data)
@test_util.run_v2_only
def testKerasLSTM(self):
"""Test a Keras LSTM containing dynamic_rnn ops."""
input_data = {
"x":
constant_op.constant(
np.array(
np.random.random_sample((10, 10, 10)), dtype=np.float32))
}
model = keras.models.Sequential(
[keras.layers.LSTM(units=10, input_shape=(10, 10))])
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[10, 10, 10], dtype=dtypes.float32)
])
def to_save(x):
return model(x)
root, output_func = self._freezeModel(to_save)
self._testConvertedFunction(root, root.f, output_func, input_data)
@test_util.run_v2_only
def testEmbeddings(self):
"""Test model with embeddings."""
input_data = {
"x":
constant_op.constant(
np.array(np.random.random_sample((20)), dtype=np.int32))
}
class EmbeddingModel(keras.Model):
def __init__(self):
super(EmbeddingModel, self).__init__()
self.shared_weights = self.add_weight(
"weights",
shape=(2000, 300),
dtype=dtypes.float32,
initializer=init_ops.random_normal_initializer(
mean=0.0, stddev=300**(-0.5)))
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=(20), dtype=dtypes.int32)
])
def func(self, x):
return array_ops.gather(self.shared_weights, x)
model = EmbeddingModel()
root, output_func = self._freezeModel(model.func)
self._testConvertedFunction(root, root.f, output_func, input_data)
if __name__ == "__main__":
test.main()

View File

@ -26,6 +26,28 @@ tf_py_test(
],
)
tf_py_test(
name = "convert_to_constants_test",
srcs = ["convert_to_constants_test.py"],
python_version = "PY3",
shard_count = 4,
deps = [
"//tensorflow/python:array_ops",
"//tensorflow/python:client_testlib",
"//tensorflow/python:constant_op",
"//tensorflow/python:convert_to_constants",
"//tensorflow/python:dtypes",
"//tensorflow/python:framework_test_lib",
"//tensorflow/python:init_ops",
"//tensorflow/python:tensor_spec",
"//tensorflow/python:util",
"//tensorflow/python/eager:def_function",
"//tensorflow/python/saved_model:load",
"//tensorflow/python/saved_model:save",
"//tensorflow/python/training/tracking",
],
)
tf_py_test(
name = "custom_training_loop_test",
srcs = ["custom_training_loop_test.py"],

View File

@ -0,0 +1,181 @@
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for convert_to_constants.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import numpy as np
from tensorflow.python import keras
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import convert_to_constants
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.platform import test
from tensorflow.python.saved_model.load import load
from tensorflow.python.saved_model.save import save
from tensorflow.python.training.tracking import tracking
from tensorflow.python.util import nest
class VariablesToConstantsTest(test.TestCase):
def _freezeModel(self, model):
"""Freezes the model.
Args:
model: Function.
Returns:
root: AutoTrackable object with original ConcreteFunction.
output_func: frozen ConcreteFunction.
"""
root = tracking.AutoTrackable()
root.f = model
input_func = root.f.get_concrete_function()
output_func = convert_to_constants.convert_variables_to_constants_v2(
input_func, lower_control_flow=False)
return root, output_func
def _hasStatefulPartitionedCallOp(self, graph_def):
"""Determines if a StatefulPartitionedCall op exists in the graph."""
for node in graph_def.node:
if node.op == "StatefulPartitionedCall":
return True
return False
def _getNumVariables(self, graph_def):
"""Returns the number of ReadVariableOp in the graph."""
return sum(node.op == "ReadVariableOp" for node in graph_def.node)
def _testConvertedFunction(self, obj, func, converted_concrete_func,
input_data):
# Ensure the converted graph has no variables and no function calls.
constant_graph_def = converted_concrete_func.graph.as_graph_def()
self.assertEqual(0, self._getNumVariables(constant_graph_def))
self.assertFalse(self._hasStatefulPartitionedCallOp(constant_graph_def))
# Check that the converted ConcreteFunction produces the same result as the
# original Function.
expected_value = nest.flatten(func(**input_data))
actual_value = nest.flatten(converted_concrete_func(**input_data))
for expected, actual in zip(expected_value, actual_value):
np.testing.assert_almost_equal(expected.numpy(), actual.numpy())
# Ensure the shape is retained.
for tensor in converted_concrete_func.inputs:
actual_shape = input_data[tensor.name.split(":")[0]].shape
self.assertEqual(tensor.shape, actual_shape)
# Save the converted ConcreteFunction as a signature.
save_dir = os.path.join(self.get_temp_dir(), "frozen_saved_model")
root = tracking.AutoTrackable()
root.f = converted_concrete_func
save(root, save_dir, {"mykey": converted_concrete_func})
# Load it back and make sure it works.
loaded_obj = load(save_dir)
actual_value = nest.flatten(loaded_obj.signatures["mykey"](**input_data))
for expected, actual in zip(expected_value, actual_value):
np.testing.assert_almost_equal(expected.numpy(), actual.numpy())
@test_util.run_v2_only
def testKerasModel(self):
"""Test a basic Keras model with Variables."""
input_data = {"x": constant_op.constant(1., shape=[1, 1])}
# Create a simple Keras model.
x = [-1, 0, 1, 2, 3, 4]
y = [-3, -1, 1, 3, 5, 7]
model = keras.models.Sequential(
[keras.layers.Dense(units=1, input_shape=[1])])
model.compile(optimizer="sgd", loss="mean_squared_error")
model.fit(x, y, epochs=1)
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[1, 1], dtype=dtypes.float32)
])
def to_save(x):
return model(x)
root, output_func = self._freezeModel(to_save)
self._testConvertedFunction(root, root.f, output_func, input_data)
@test_util.run_v2_only
def testKerasLSTM(self):
"""Test a Keras LSTM containing dynamic_rnn ops."""
input_data = {
"x":
constant_op.constant(
np.array(
np.random.random_sample((10, 10, 10)), dtype=np.float32))
}
model = keras.models.Sequential(
[keras.layers.LSTM(units=10, input_shape=(10, 10))])
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[10, 10, 10], dtype=dtypes.float32)
])
def to_save(x):
return model(x)
root, output_func = self._freezeModel(to_save)
self._testConvertedFunction(root, root.f, output_func, input_data)
@test_util.run_v2_only
def testEmbeddings(self):
"""Test model with embeddings."""
input_data = {
"x":
constant_op.constant(
np.array(np.random.random_sample((20)), dtype=np.int32))
}
class EmbeddingModel(keras.Model):
def __init__(self):
super(EmbeddingModel, self).__init__()
self.shared_weights = self.add_weight(
"weights",
shape=(2000, 300),
dtype=dtypes.float32,
initializer=init_ops.random_normal_initializer(
mean=0.0, stddev=300**(-0.5)))
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=(20), dtype=dtypes.int32)
])
def func(self, x):
return array_ops.gather(self.shared_weights, x)
model = EmbeddingModel()
root, output_func = self._freezeModel(model.func)
self._testConvertedFunction(root, root.f, output_func, input_data)
if __name__ == "__main__":
test.main()