# Lint as: python2, python3
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for lite.py."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import io
import logging
import os
import tempfile

from absl.testing import parameterized
import numpy as np
import six
from six.moves import range

from tensorflow.lite.python import lite
from tensorflow.lite.python import lite_constants
from tensorflow.lite.python.convert import ConverterError
from tensorflow.lite.python.convert import mlir_quantize
from tensorflow.lite.python.interpreter import Interpreter
from tensorflow.python import keras
from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops import variables
from tensorflow.python.ops.variables import global_variables_initializer as _global_variables_initializer
from tensorflow.python.platform import gfile
from tensorflow.python.platform import resource_loader
from tensorflow.python.platform import test
from tensorflow.python.saved_model import saved_model
from tensorflow.python.training.training_util import write_graph


class LiteTest(test_util.TensorFlowTestCase):
  """Base class of all the tests in this module."""

  def setUp(self):
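    # Save the global converter flag so that individual tests can toggle it
    # freely; tearDown restores the original value.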
    self._original_use_experimental_new_converter = (
        lite._USE_EXPERIMENTAL_NEW_CONVERTER)
    super(LiteTest, self).setUp()

  def tearDown(self):
    super(LiteTest, self).tearDown()
    lite._USE_EXPERIMENTAL_NEW_CONVERTER = (
        self._original_use_experimental_new_converter)


class TestModels(LiteTest):

  def assertValidDebugInfo(self, debug_info):
    """Verify the DebugInfo is valid."""
    file_names = set()
    for file_path in debug_info.files:
      file_names.add(os.path.basename(file_path))
    # To make the test independent of how the nodes are created, we only
    # assert the name of this test file.
    self.assertIn('lite_test.py', file_names)
    self.assertNotIn('lite_v2_test.py', file_names)


class FromConstructor(TestModels):

  # Tests invalid constructors using a dummy value for the GraphDef.
  def testInvalidConstructor(self):
    message = ('If input_tensors and output_tensors are None, both '
               'input_arrays_with_shape and output_arrays must be defined.')

    # `output_arrays` is not defined.
    with self.assertRaises(ValueError) as error:
      lite.TFLiteConverter(
          None, None, [], input_arrays_with_shape=[('input', [3, 9])])
    self.assertEqual(message, str(error.exception))

    # `input_arrays_with_shape` is not defined.
    with self.assertRaises(ValueError) as error:
      lite.TFLiteConverter(None, [], None, output_arrays=['output'])
    self.assertEqual(message, str(error.exception))

  # Tests valid constructors using a dummy value for the GraphDef.
  def testValidConstructor(self):
    converter = lite.TFLiteConverter(
        None,
        None,
        None,
        input_arrays_with_shape=[('input', [3, 9])],
        output_arrays=['output'])
    self.assertFalse(converter._has_valid_tensors())
    self.assertEqual(converter.get_input_arrays(), ['input'])

    with self.assertRaises(ValueError) as error:
      converter._set_batch_size(1)
    self.assertEqual(
        'The batch size cannot be set for this model. Please use '
        'input_shapes parameter.', str(error.exception))

    converter = lite.TFLiteConverter(None, ['input_tensor'], ['output_tensor'])
    self.assertTrue(converter._has_valid_tensors())


class FromSessionTest(TestModels, parameterized.TestCase):

  def testFloat(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testForgottenCallToAllocateTensors(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()
    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    input_index = interpreter.get_input_details()[0]['index']
    dummy_tensor = np.ones(shape=[1, 16, 16, 3], dtype=np.float32)
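    # set_tensor() is only valid after allocate_tensors(); calling it first
    # must raise.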
    with self.assertRaises(ValueError):
      interpreter.set_tensor(input_index, dummy_tensor)

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testString(self, enable_mlir):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(shape=[4], dtype=dtypes.string)
      out_tensor = array_ops.reshape(in_tensor, shape=[2, 2])
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = enable_mlir
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.string_, input_details[0]['dtype'])
    self.assertTrue(([4] == input_details[0]['shape']).all())

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('Reshape', output_details[0]['name'])
    self.assertEqual(np.string_, output_details[0]['dtype'])
    self.assertTrue(([2, 2] == output_details[0]['shape']).all())
    # TODO(b/122659643): Test setting/getting string data via the python
    # interpreter API after support has been added.

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testQuantization(self, enable_mlir):
    with ops.Graph().as_default():
      in_tensor_1 = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputA')
      in_tensor_2 = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputB')
      out_tensor = array_ops.fake_quant_with_min_max_args(
          in_tensor_1 + in_tensor_2, min=0., max=1., name='output')
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess,
                                                  [in_tensor_1, in_tensor_2],
                                                  [out_tensor])
    converter.inference_type = lite_constants.QUANTIZED_UINT8
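    # Each entry maps an input array name to (mean, std_dev). The converter
    # derives real_value = (quantized_value - mean) / std_dev, so mean=0 and
    # std_dev=1 yield the (scale=1., zero_point=0) checked below.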
    converter.quantized_input_stats = {
        'inputA': (0., 1.),
        'inputB': (0., 1.)
    }  # mean, std_dev
    converter.experimental_new_converter = enable_mlir
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(2, len(input_details))
    self.assertEqual('inputA', input_details[0]['name'])
    self.assertEqual(np.uint8, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((1., 0.),
                     input_details[0]['quantization'])  # scale, zero_point

    self.assertEqual('inputB', input_details[1]['name'])
    self.assertEqual(np.uint8, input_details[1]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all())
    self.assertEqual((1., 0.),
                     input_details[1]['quantization'])  # scale, zero_point

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.uint8, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertTrue(output_details[0]['quantization'][0] > 0)  # scale

  def testQuantizedInput(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.inference_input_type = lite_constants.QUANTIZED_UINT8
    converter.inference_type = lite_constants.FLOAT
    converter.quantized_input_stats = {'Placeholder': (0., 1.)}  # mean, std_dev
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.uint8, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((1., 0.),
                     input_details[0]['quantization'])  # scale, zero_point

    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 1)
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])  # float

  def testQuantizationInvalid(self):
    with ops.Graph().as_default():
      in_tensor_1 = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputA')
      in_tensor_2 = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputB')
      out_tensor = array_ops.fake_quant_with_min_max_args(
          in_tensor_1 + in_tensor_2, min=0., max=1., name='output')
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess,
                                                  [in_tensor_1, in_tensor_2],
                                                  [out_tensor])
    converter.inference_type = lite_constants.QUANTIZED_UINT8
    converter.quantized_input_stats = {'inputA': (0., 1.)}  # mean, std_dev
    with self.assertRaises(ValueError) as error:
      converter.convert()
    self.assertEqual(
        'Quantization input stats are not available for input tensors '
        '\'inputB\'.', str(error.exception))

  def testIntermediateInputArray(self):
    """Convert a model from an intermediate input array."""
    with ops.Graph().as_default():
      in_tensor_init = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      in_tensor_final = in_tensor_init + in_tensor_init
      out_tensor = in_tensor_final + in_tensor_final
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor_final],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('add', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add_1', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testSizeNoneInvalid(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Test None as shape when dynamic shapes are disabled. Run with TOCO in
    # order to invoke shape checking code.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = False
    with self.assertRaises(ValueError) as error:
      converter.convert()
    self.assertEqual('Provide an input shape for input array \'Placeholder\'.',
                     str(error.exception))

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testScalarValid(self, enable_mlir):
    # Construct a graph using a scalar (empty shape) input.
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(dtype=dtypes.float32, shape=[])
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Test conversion with the scalar input shape.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = enable_mlir
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([] == input_details[0]['shape']).all())

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([] == output_details[0]['shape']).all())

    # Validate inference using the scalar inputs/outputs.
    test_input = np.array(4.0, dtype=np.float32)
    expected_output = np.array(8.0, dtype=np.float32)
    interpreter.set_tensor(input_details[0]['index'], test_input)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_details[0]['index'])
    self.assertTrue((expected_output == output_data).all())

  def testSizeInvalid(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, None, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Test invalid shape. None after 1st dimension. Run with TOCO in order to
    # invoke shape checking code.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = False
    with self.assertRaises(ValueError) as error:
      converter.convert()
    self.assertEqual(
        'None is only supported in the 1st dimension. Tensor '
        '\'Placeholder\' has invalid shape \'[1, None, 16, 3]\'.',
        str(error.exception))

  def testSizeNone(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, None, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Test None after 1st dimension.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = True
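    # Unlike TOCO in testSizeInvalid above, the new converter accepts unknown
    # non-batch dimensions; they surface as -1 in 'shape_signature' below.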
    tflite_model = converter.convert()

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 1, 16, 3] == input_details[0]['shape']).all())
    self.assertTrue(([1, -1, 16,
                      3] == input_details[0]['shape_signature']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    # Resize tensor with strict checking.
    with self.assertRaises(RuntimeError) as error:
      interpreter.resize_tensor_input(0, [3, 16, 16, 3], strict=True)
    self.assertIn(
        'ResizeInputTensorStrict only allows mutating unknown dimensions '
        'identified by -1.', str(error.exception))

    # Resize tensor and invoke.
    interpreter.resize_tensor_input(0, [1, 16, 16, 3], strict=True)
    interpreter.allocate_tensors()
    interpreter.invoke()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertTrue(([1, -1, 16,
                      3] == input_details[0]['shape_signature']).all())

    output_details = interpreter.get_output_details()
    self.assertTrue(([1, -1, 16,
                      3] == output_details[0]['shape_signature']).all())

  def testResizeTensorInputStrict(self):
    # Ensures that resize_tensor_input(strict=True) works as expected.
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)

    # Resize to an invalid value: every dimension here is known, so strict
    # resizing must fail.
    with self.assertRaises(RuntimeError) as error:
      interpreter.resize_tensor_input(0, [3, 16, 16, 3], strict=True)
    self.assertIn(
        'ResizeInputTensorStrict only allows mutating unknown dimensions '
        'identified by -1.', str(error.exception))

    # Resize to a valid value.
    interpreter.resize_tensor_input(0, [1, 16, 16, 3], strict=True)
    interpreter.allocate_tensors()

  def testBatchSizeValid(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[None, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testBatchSizeNonZero(self):
    with ops.Graph().as_default():
      in_tensor_1 = array_ops.placeholder(
          shape=[None, 4], dtype=dtypes.float32, name='input1')
      in_tensor_2 = array_ops.placeholder(
          shape=[4, 10], dtype=dtypes.float32, name='input2')
      out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2)
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess,
                                                  [in_tensor_1, in_tensor_2],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 2)
    self.assertEqual('input1', input_details[0]['name'])
    self.assertTrue(([1, 4] == input_details[0]['shape']).all())
    self.assertEqual('input2', input_details[1]['name'])
    self.assertTrue(([4, 10] == input_details[1]['shape']).all())

  def testFreezeGraph(self):
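    # Variables are frozen into constants as part of from_session conversion,
    # so the converted model below needs no separate initialization step.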
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      var = variable_scope.get_variable(
          'weights', shape=[1, 16, 16, 3], dtype=dtypes.float32)
      # Get the second output to ensure freezing properly processes tensor
      # names like 'X:1'.
      out_tensor = nn_ops.top_k(in_tensor + var, name='top_k')[1]
      sess = session.Session()
      sess.run(_global_variables_initializer())

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('top_k:1', output_details[0]['name'])
    self.assertEqual(np.int32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 1] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testGraphviz(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.output_format = lite_constants.GRAPHVIZ_DOT
    graphviz_output = converter.convert()
    self.assertTrue(graphviz_output)

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testDumpGraphviz(self, enable_mlir):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = enable_mlir
    graphviz_dir = self.get_temp_dir()
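    # dump_graphviz_dir writes .dot snapshots of the graph as imported and
    # after transformations, which the assertions below check for.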
    converter.dump_graphviz_dir = graphviz_dir
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Ensure interpreter is able to allocate and check graphviz data.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    num_items_graphviz = len(os.listdir(graphviz_dir))
    self.assertTrue(num_items_graphviz)
    self.assertTrue(
        os.path.exists(os.path.join(graphviz_dir, 'toco_AT_IMPORT.dot')))
    self.assertTrue(
        os.path.exists(
            os.path.join(graphviz_dir, 'toco_AFTER_TRANSFORMATIONS.dot')))

    # The new converter doesn't support the `dump_graphviz_video` flag.
    if not enable_mlir:
      # Convert model and ensure model is not None.
      converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                    [out_tensor])
      converter.experimental_new_converter = enable_mlir
      graphviz_dir = self.get_temp_dir()
      converter.dump_graphviz_dir = graphviz_dir
      converter.dump_graphviz_video = True
      tflite_model = converter.convert()
      self.assertTrue(tflite_model)

      # Ensure graphviz folder has more data after using video flag.
      num_items_graphviz_video = len(os.listdir(graphviz_dir))
      self.assertGreater(num_items_graphviz_video, num_items_graphviz)

  def testDumpConversionSummary(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    log_dir = self.get_temp_dir()
    converter.conversion_summary_dir = log_dir
    # Conversion logs are only generated when the MLIR converter is enabled.
    converter.experimental_new_converter = True
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    num_items_conversion_summary = len(os.listdir(log_dir))
    self.assertTrue(num_items_conversion_summary)

  def testDumpConversionSummaryWithOldConverter(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = False
    log_dir = self.get_temp_dir()
    converter.conversion_summary_dir = log_dir
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)
    # Check nothing is generated under the conversion summary path.
    num_items_conversion_summary = len(os.listdir(log_dir))
    self.assertEqual(num_items_conversion_summary, 0)

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testInferenceInputType(self, enable_mlir):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = enable_mlir
    converter.inference_input_type = lite_constants.QUANTIZED_UINT8
    converter.quantized_input_stats = {'Placeholder': (0., 1.)}  # mean, std_dev
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.uint8, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((1., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())

  def testDefaultRangesStats(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.inference_type = lite_constants.QUANTIZED_UINT8
    converter.quantized_input_stats = {'Placeholder': (0., 1.)}  # mean, std_dev
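    # default_ranges_stats supplies a fallback (min, max) range for tensors
    # that carry no fake-quant statistics ("dummy quantization"), which lets
    # this float-only graph convert in QUANTIZED_UINT8 mode.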
    converter.default_ranges_stats = (0, 6)  # min, max
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.uint8, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((1., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.uint8, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertTrue(output_details[0]['quantization'][0] > 0)  # scale

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testPostTrainingQuantizeDeprecatedAttribute(self, enable_mlir):
    with ops.Graph().as_default():
      in_tensor_1 = array_ops.placeholder(
          shape=[33, 33], dtype=dtypes.float32, name='inputA')
      in_tensor_2 = constant_op.constant(
          np.random.uniform(low=-10., high=10., size=(33, 33)),
          shape=[33, 33],
          dtype=dtypes.float32,
          name='inputB')
      out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output')
      sess = session.Session()

    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [in_tensor_1], [out_tensor])
    self.assertFalse(quantized_converter.post_training_quantize)
    quantized_converter.experimental_new_converter = enable_mlir

    quantized_converter.post_training_quantize = True
    self.assertTrue(quantized_converter.post_training_quantize)
    self.assertEqual(quantized_converter.optimizations, [lite.Optimize.DEFAULT])

    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testPostTrainingQuantize(self, enable_mlir):
    np.random.seed(0)
    with ops.Graph().as_default():
      # We need the tensor to have more than 1024 elements for
      # quantize_weights to kick in. Thus, the [33, 33] shape.
      in_tensor_1 = array_ops.placeholder(
          shape=[33, 33], dtype=dtypes.float32, name='inputA')
      in_tensor_2 = constant_op.constant(
          np.random.uniform(low=-10., high=10., size=(33, 33)),
          shape=[33, 33],
          dtype=dtypes.float32,
          name='inputB')
      out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output')
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [in_tensor_1],
                                                        [out_tensor])
    float_converter.experimental_new_converter = enable_mlir
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized weights model.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [in_tensor_1], [out_tensor])

    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # Ensure that the quantized weights tflite model is smaller.
    self.assertTrue(len(quantized_tflite) < len(float_tflite))

  def _getCalibrationQuantizeModel(self):
    np.random.seed(0)
    inp = array_ops.placeholder(
        dtype=dtypes.float32, shape=(1, 5, 5, 3), name='input')
    conv = nn_ops.conv2d(
        inp,
        filter=array_ops.ones([3, 3, 3, 16]),
        strides=[1, 1, 1, 1],
        padding='SAME')
    output = nn_ops.relu(conv, name='output')

    def calibration_gen():
      for _ in range(5):
        yield [np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)]

    return (inp, output, calibration_gen)

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testPostTrainingCalibrateAndQuantize(self, enable_mlir):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized model.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # The default input and output types should be float.
    interpreter = Interpreter(model_content=quantized_tflite)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual(np.float32, input_details[0]['dtype'])
    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.float32, output_details[0]['dtype'])

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  @parameterized.named_parameters(
      # Quantize model to Int8: with enable mlir
      ('UseTfliteBuiltinsIntEnableMLIR',
       [lite.OpsSet.TFLITE_BUILTINS_INT8], True),
      # Quantize model to Int8: with disable mlir
      ('UseTfliteBuiltinsIntDisableMLIR',
       [lite.OpsSet.TFLITE_BUILTINS_INT8], False),
      # Quantize model to Int16: with disable mlir
      ('UseTfliteBuiltinsInt16DisableMLIR',
       [lite.OpsSet.\
        EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8],
       False),
      ('UseTfliteBuiltinsInt16EnableMLIR',
       [lite.OpsSet.\
        EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8],
       True))
  def testCalibrateAndQuantizeBuiltinInt(self, supported_ops, enable_mlir):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_converter.experimental_new_converter = enable_mlir
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert model by specifying target spec (instead of optimizations), since
    # when targeting an integer only backend, quantization is mandatory.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.target_spec.supported_ops = supported_ops
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # The default input and output types should be float.
    interpreter = Interpreter(model_content=quantized_tflite)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual(np.float32, input_details[0]['dtype'])
    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.float32, output_details[0]['dtype'])

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  @parameterized.named_parameters(
      # Quantize to Float16 even if rep data provided.
      ('UseRepresentativeData', True, False, True, False, False, False),
      # Quantize to Float16 if no rep data provided.
      ('NoRepresentativeData', False, False, True, False, False, False),
      # Post training quantization if both rep data and int8 included.
      ('UseSampleDataIncludeInt8', True, True, False, False, True, False),

      # Quantize to Float16 even if rep data provided with mlir.
      ('UseRepresentativeDataMlir', True, False, True, False, False, True),
      # Quantize to Float16 if no rep data provided with mlir.
      ('NoRepresentativeDataMlir', False, False, True, False, False, True),
      # Post training quantization if both rep data and int8 included with
      # mlir.
      ('SampleDataIncludeInt8Mlir', True, True, False, False, True, True))
  def testQuantizeFloat16(self, use_rep_data, include_int8,
                          is_float16_quantized, is_error,
                          is_post_training_quantized, enable_mlir):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    idx = 1 if enable_mlir else 0
    node_name = 'Conv2D' if enable_mlir else 'Conv2D_bias'
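    # The two converters emit different tensor orderings and names, hence the
    # per-converter tensor index and bias-constant name above.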
    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_converter.experimental_new_converter = enable_mlir
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)
    interpreter = Interpreter(model_content=float_tflite)
    interpreter.allocate_tensors()
    self.assertEqual(interpreter.get_tensor_details()[idx]['name'], node_name)
    self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'],
                     lite.constants.FLOAT)
    # Convert model to quantized version.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.target_spec.supported_types = [lite.constants.FLOAT16]
    if include_int8:
      quantized_converter.target_spec.supported_types.append(
          lite.constants.INT8)
    if use_rep_data:
      quantized_converter.representative_dataset = calibration_gen

    if is_error:
      with self.assertRaises(ValueError) as error:
        quantized_converter.convert()
      self.assertEqual(
          'representative_dataset is required when specifying '
          'TFLITE_BUILTINS_INT8 or INT8 supported types.', str(error.exception))

    else:
      quantized_tflite = quantized_converter.convert()
      self.assertTrue(quantized_tflite)
      interpreter = Interpreter(model_content=quantized_tflite)
      interpreter.allocate_tensors()
      self.assertEqual(interpreter.get_tensor_details()[idx]['name'], node_name)

      if is_float16_quantized:
        # Verify that the bias constant is float16 type.
        self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'],
                         lite.constants.FLOAT16)
      elif is_post_training_quantized:
        # Verify that the bias constant is int32 type.
        self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'],
                         lite.constants.INT32)
      else:
        raise ValueError('Invalid test options.')

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testInvalidQuantizeFloat16(self, enable_mlir):
    with ops.Graph().as_default():
      inp, output, _ = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Specify float16 quantization.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.target_spec.supported_types = [lite.constants.FLOAT16]
    # Specify only int8 builtin ops.
    quantized_converter.target_spec.supported_ops = [
        lite.OpsSet.TFLITE_BUILTINS_INT8
    ]
    with self.assertRaises(ValueError) as error:
      quantized_converter.convert()
    self.assertEqual(
        'TFLITE_BUILTINS_INT8 requires smallest supported type to be INT8.',
        str(error.exception))

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testInvalidPostTrainingQuantize(self, enable_mlir):
    np.random.seed(0)
    with ops.Graph().as_default():
      # We need the tensor to have more than 1024 elements for
      # quantize_weights to kick in. Thus, the [33, 33] shape.
      in_tensor_1 = array_ops.placeholder(
          shape=[33, 33], dtype=dtypes.float32, name='inputA')
      in_tensor_2 = constant_op.constant(
          np.random.uniform(low=-10., high=10., size=(33, 33)),
          shape=[33, 33],
          dtype=dtypes.float32,
          name='inputB')
      out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output')
      sess = session.Session()

    # Attempt to convert to quantized weights model.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [in_tensor_1], [out_tensor])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    # Restricting to int8 type only.
    quantized_converter.target_spec.supported_types = [lite.constants.INT8]
    # A representative dataset is required for full fixed point quantization.
    with self.assertRaises(ValueError) as error:
      quantized_converter.convert()
    self.assertEqual(
        'representative_dataset is required when specifying '
        'TFLITE_BUILTINS_INT8 or INT8 supported types.', str(error.exception))

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testPostTrainingCalibrateAndQuantizeFloatNotAllowed(self, enable_mlir):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_converter.experimental_new_converter = enable_mlir
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized model.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.representative_dataset = calibration_gen
    quantized_converter.target_spec.supported_types = [lite.constants.INT8]
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testPostTrainingCalibrateAndQuantizeInt8Inputs(self, enable_mlir):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_converter.experimental_new_converter = enable_mlir
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized weights model.
    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [inp], [output])
    quantized_converter.experimental_new_converter = enable_mlir
    quantized_converter.inference_input_type = lite_constants.INT8
    quantized_converter.inference_output_type = lite_constants.INT8
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # The input and output types should be int8.
    interpreter = Interpreter(model_content=quantized_tflite)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual(np.int8, input_details[0]['dtype'])
    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.int8, output_details[0]['dtype'])

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  @parameterized.named_parameters(
      ('InferenceType_INT8', lite_constants.INT8),
      ('InferenceType_QUANTIZED_UINT8', lite_constants.QUANTIZED_UINT8))
  def testRequiresInputStatsForTrainingTimeQuantization(self, quantized_type):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = array_ops.fake_quant_with_min_max_args(
          in_tensor + in_tensor, min=0., max=1.)
      sess = session.Session()

    quantized_converter = lite.TFLiteConverter.from_session(
        sess, [in_tensor], [out_tensor])

    with self.assertRaises(ValueError) as error:
      quantized_converter.inference_type = quantized_type
      quantized_converter.convert()
    self.assertEqual(
        'std_dev and mean must be defined when inference_type or '
        'inference_input_type is QUANTIZED_UINT8 or INT8.',
        str(error.exception))

    with self.assertRaises(ValueError) as error:
      quantized_converter.inference_type = lite_constants.FLOAT
      quantized_converter.inference_input_type = quantized_type
      quantized_converter.convert()
    self.assertEqual(
        'std_dev and mean must be defined when inference_type or '
        'inference_input_type is QUANTIZED_UINT8 or INT8.',
        str(error.exception))

    quantized_converter.inference_type = quantized_type
    quantized_converter.inference_input_type = quantized_type

    input_arrays = quantized_converter.get_input_arrays()
    quantized_converter.quantized_input_stats = {
        input_arrays[0]: (0., 1.)
    }
    quantized_converter.convert()

  def testTrainingTimeAndPostTrainingCalibrateAndQuantize(self):
    with ops.Graph().as_default():
      inp, output, calibration_gen = self._getCalibrationQuantizeModel()
      sess = session.Session()

    # Convert float model.
    float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    converter = lite.TFLiteConverter.from_session(sess, [inp], [output])

    # Extra flags to trigger training time quantization conversion.
    converter.inference_type = lite_constants.INT8
    converter.inference_input_type = lite_constants.FLOAT
    converter.inference_output_type = lite_constants.FLOAT
    input_arrays = converter.get_input_arrays()
    converter.quantized_input_stats = {
        input_arrays[0]: (0., 1.)
    }
    # Trigger post-training quantization as well.
    converter.optimizations = [lite.Optimize.DEFAULT]
    converter.representative_dataset = calibration_gen
    converter._experimental_new_quantizer = True
    quantized_tflite = converter.convert()
    self.assertTrue(quantized_tflite)
    self.assertLess(len(quantized_tflite), len(float_tflite))

    # Calibration-only API:
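    # _experimental_calibrate_only makes convert() return a model with
    # embedded calibration statistics but no quantization applied;
    # mlir_quantize() then quantizes it in a separate step.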
    converter._experimental_calibrate_only = True
    calibrated_tflite = converter.convert()
    quantized_tflite = mlir_quantize(calibrated_tflite, fully_quantize=True)
    interpreter = Interpreter(model_content=quantized_tflite)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(np.int8, input_details[0]['dtype'])
    self.assertEqual((1., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(np.int8, output_details[0]['dtype'])

  def testFloatTocoConverter(self):
    """Tests the deprecated TocoConverter interface."""
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TocoConverter.from_session(sess, [in_tensor], [out_tensor])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Ensure the interpreter is able to load.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

  def testMultipleOutputNodeNames(self):
    """Tests converting a graph with an op that has multiple outputs."""
    with ops.Graph().as_default():
      input_tensor = array_ops.placeholder(shape=[4], dtype=dtypes.float32)
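      # tf.split creates a single op whose four output tensors are named
      # 'split', 'split:1', 'split:2', and 'split:3'.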
|
|
out0, out1, out2, out3 = array_ops.split(
|
|
input_tensor, [1, 1, 1, 1], axis=0)
|
|
sess = session.Session()
|
|
|
|
# Convert model and ensure model is not None.
|
|
converter = lite.TFLiteConverter.from_session(sess, [input_tensor],
|
|
[out0, out1, out2, out3])
|
|
tflite_model = converter.convert()
|
|
self.assertTrue(tflite_model)
|
|
|
|
# Check values from converted model.
|
|
interpreter = Interpreter(model_content=tflite_model)
|
|
interpreter.allocate_tensors()
|
|
|
|
input_details = interpreter.get_input_details()
|
|
self.assertEqual(1, len(input_details))
|
|
interpreter.set_tensor(input_details[0]['index'],
|
|
np.asarray([1.0, 2.0, 3.0, 4.0], dtype=np.float32))
|
|
interpreter.invoke()
|
|
|
|
output_details = interpreter.get_output_details()
|
|
self.assertEqual(4, len(output_details))
|
|
self.assertEqual(1.0, interpreter.get_tensor(output_details[0]['index']))
|
|
self.assertEqual(2.0, interpreter.get_tensor(output_details[1]['index']))
|
|
self.assertEqual(3.0, interpreter.get_tensor(output_details[2]['index']))
|
|
self.assertEqual(4.0, interpreter.get_tensor(output_details[3]['index']))
|
|
|
|
@parameterized.named_parameters(
|
|
('EnableMlirConverter', True), # enable mlir
|
|
('DisableMlirConverter', False)) # disable mlir
|
|
@test_util.run_in_graph_and_eager_modes
|
|
def testFunctions(self, enable_mlir):
|
|
"""Tests tf.function in 1.X."""
|
|
|
|
@def_function.function
|
|
def plus_placeholder(x, placeholder):
|
|
return x + placeholder
|
|
|
|
with ops.Graph().as_default():
|
|
placeholder = array_ops.placeholder(
|
|
dtype=dtypes.float32, shape=[1], name='input')
|
|
variable_node = variables.Variable(1.0, name='variable_node')
|
|
defun_node = plus_placeholder(variable_node, placeholder)
|
|
output_node = math_ops.multiply(defun_node, 2.0, name='output_node')
|
|
|
|
# Initialize variables in the model.
|
|
sess = session.Session()
|
|
sess.run(variables.variables_initializer([variable_node]))
|
|
|
|
# Convert model and ensure model is not None.
|
|
converter = lite.TFLiteConverter.from_session(sess, [placeholder],
|
|
[output_node])
|
|
converter.experimental_new_converter = enable_mlir
|
|
tflite_model = converter.convert()
|
|
self.assertTrue(tflite_model)
|
|
|
|
# Check values from converted model.
|
|
interpreter = Interpreter(model_content=tflite_model)
|
|
interpreter.allocate_tensors()
|
|
|
|
input_details = interpreter.get_input_details()
|
|
self.assertEqual(1, len(input_details))
|
|
self.assertEqual('input', input_details[0]['name'])
|
|
self.assertEqual(np.float32, input_details[0]['dtype'])
|
|
self.assertTrue(([1] == input_details[0]['shape']).all())
|
|
self.assertEqual((0., 0.), input_details[0]['quantization'])
|
|
|
|
output_details = interpreter.get_output_details()
|
|
self.assertEqual(1, len(output_details))
|
|
self.assertEqual('output_node', output_details[0]['name'])
|
|
self.assertEqual(np.float32, output_details[0]['dtype'])
|
|
self.assertTrue(([1] == output_details[0]['shape']).all())
|
|
self.assertEqual((0., 0.), output_details[0]['quantization'])
|
|
|
|
def testInferenceInputOutputTypeFloatDefault(self):
|
|
with ops.Graph().as_default():
|
|
in_tensor = array_ops.placeholder(
|
|
shape=[1, 16, 16, 3], dtype=dtypes.float32)
|
|
out_tensor = in_tensor + in_tensor
|
|
sess = session.Session()
|
|
|
|
# Convert model and ensure model is not None.
|
|
converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
|
|
[out_tensor])
|
|
tflite_model = converter.convert()
|
|
self.assertTrue(tflite_model)
|
|
|
|
# Check values from converted model.
|
|
interpreter = Interpreter(model_content=tflite_model)
|
|
interpreter.allocate_tensors()
|
|
|
|
input_details = interpreter.get_input_details()
|
|
self.assertEqual(1, len(input_details))
|
|
self.assertEqual('Placeholder', input_details[0]['name'])
|
|
self.assertEqual(np.float32, input_details[0]['dtype'])
|
|
self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
|
|
|
|
output_details = interpreter.get_output_details()
|
|
self.assertEqual(1, len(output_details))
|
|
self.assertEqual('add', output_details[0]['name'])
|
|
self.assertEqual(np.float32, output_details[0]['dtype'])
|
|
self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
|
|
|
|
def testInferenceInputOutputTypeQuantizedUint8Default(self):
|
|
with ops.Graph().as_default():
|
|
in_tensor = array_ops.placeholder(
|
|
shape=[1, 16, 16, 3], dtype=dtypes.float32)
|
|
out_tensor = array_ops.fake_quant_with_min_max_args(
|
|
in_tensor + in_tensor, min=0., max=1., name='output')
|
|
sess = session.Session()
|
|
|
|
# Convert model and ensure model is not None.
|
|
converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
|
|
[out_tensor])
|
|
    converter.inference_type = lite_constants.QUANTIZED_UINT8
    converter.quantized_input_stats = {'Placeholder': (0., 1.)}  # mean, std_dev
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.uint8, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('output', output_details[0]['name'])
    self.assertEqual(np.uint8, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())

  def testReusingConverterWithDifferentPostTrainingQuantization(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      out_tensor = array_ops.fake_quant_with_min_max_args(
          in_tensor + in_tensor, min=0., max=1., name='output')
      sess = session.Session()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])

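    # Note: post_training_quantize is the legacy flag; newer code typically
    # sets converter.optimizations = [lite.Optimize.DEFAULT] instead. Flipping
    # the flag between convert() calls exercises converter reuse.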
    converter.post_training_quantize = True
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    converter.post_training_quantize = False
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

  def testResizeWithShape(self):
    with ops.Graph().as_default():
      # Construct a graph with a dynamically shaped input and an internal node
      # that relies on the output of that input's shape.
      in_tensor = array_ops.placeholder(
          shape=[None, None], dtype=dtypes.float32)
      in_tensor2 = [[1, 2], [3, 4]]
      out_tensor = array_ops.reshape(in_tensor2, array_ops.shape(in_tensor))
      sess = session.Session()

    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    converter.experimental_new_converter = True
    tflite_model = converter.convert()

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
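    # Unknown dimensions are concretized to 1 in 'shape', while
    # 'shape_signature' preserves them as -1.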
    self.assertTrue(([1, 1] == input_details[0]['shape']).all())
    self.assertTrue(([-1, -1] == input_details[0]['shape_signature']).all())

    # Resize tensor and invoke.
    interpreter.resize_tensor_input(0, [4])
    interpreter.allocate_tensors()
    interpreter.invoke()

    # The output should be reshaped properly according to the resized input.
    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 1)
    self.assertEqual(np.int32, output_details[0]['dtype'])
    self.assertTrue(([4] == output_details[0]['shape']).all())
    output_data = interpreter.get_tensor(output_details[0]['index'])
    self.assertTrue(([1, 2, 3, 4] == output_data).all())

  def testResizingIntermediateDynamicTensor(self):
    # This is a regression test for the case where the shape of dynamic output
    # tensors changes between invocations.
    # See also https://github.com/tensorflow/tensorflow/issues/26549
    with ops.Graph().as_default():
      input_tensor = array_ops.placeholder(shape=[1, 1], dtype=dtypes.float32)
      input2_tensor = array_ops.placeholder(shape=[1], dtype=dtypes.float32)

      # The bug is triggered only when the dynamic tensor is intermediate, so
      # put some other ops around it.
      neg = math_ops.negative(input2_tensor)
      padding = array_ops.placeholder(shape=[2, 2], dtype=dtypes.int32)
      output_tensor = array_ops.pad(input_tensor, padding) + neg

      sess = session.Session()

    converter = lite.TFLiteConverter.from_session(
        sess, [input_tensor, padding, input2_tensor], [output_tensor])
    tflite_model = converter.convert()

    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    interpreter.set_tensor(input_details[1]['index'],
                           np.array([[1, 1], [1, 1]], dtype=np.int32))
    interpreter.invoke()

    # Without the fix, invocation will fail when changing the shape of
    # intermediate dynamic tensors.
    interpreter.set_tensor(input_details[1]['index'],
                           np.array([[2, 2], [2, 2]], dtype=np.int32))
    interpreter.invoke()

  def testGraphDebugInfo(self):
    """Test a session has debug info captured."""

    @def_function.function
    def plus_placeholder(x, placeholder):
      return x + placeholder

    with ops.Graph().as_default():
      placeholder = array_ops.placeholder(
          dtype=dtypes.float32, shape=[1], name='input')
      variable_node = variables.Variable(1.0, name='variable_node')
      defun_node = plus_placeholder(variable_node, placeholder)
      output_node = math_ops.multiply(defun_node, 2.0, name='output_node')

      # Initialize variables in the model.
      sess = session.Session()
      sess.run(variables.variables_initializer([variable_node]))

    converter = lite.TFLiteConverter.from_session(sess, [placeholder],
                                                  [output_node])
    converter.convert()
    self.assertValidDebugInfo(converter._debug_info)

    # Check that the add node in the inlined function is included.
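    # Debug traces are keyed as '<node name>@<function name>'.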
    func = sess.graph.as_graph_def().library.function[0].signature.name
    self.assertIn(('add@' + six.ensure_str(func)), converter._debug_info.traces)


class FromFrozenGraphFile(LiteTest):

  def testFloat(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + in_tensor
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb')
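    # The final argument is write_graph's as_text flag; False writes the
    # GraphDef as a binary protobuf (.pb).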
    write_graph(sess.graph_def, '', graph_def_file, False)
    sess.close()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_frozen_graph(graph_def_file,
                                                       ['Placeholder'], ['add'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testFloatWithShapesArray(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + in_tensor
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb')
    write_graph(sess.graph_def, '', graph_def_file, False)
    sess.close()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_frozen_graph(
        graph_def_file, ['Placeholder'], ['add'],
        input_shapes={'Placeholder': [1, 16, 16, 3]})
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())

  def testFreezeGraph(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      var = variable_scope.get_variable(
          'weights', shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + var
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb')
    write_graph(sess.graph_def, '', graph_def_file, False)
    sess.close()

    # Ensure the graph with variables cannot be converted.
    with self.assertRaises(ValueError) as error:
      lite.TFLiteConverter.from_frozen_graph(graph_def_file, ['Placeholder'],
                                             ['add'])
    self.assertEqual('Please freeze the graph using freeze_graph.py.',
                     str(error.exception))

  def testPbtxt(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + in_tensor
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pbtxt')
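    # as_text=True writes the GraphDef in the text (.pbtxt) format.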
    write_graph(sess.graph_def, '', graph_def_file, True)
    sess.close()

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_frozen_graph(graph_def_file,
                                                       ['Placeholder'], ['add'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('add', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testInvalidFileNotFound(self):
    with self.assertRaises(IOError) as error:
      lite.TFLiteConverter.from_frozen_graph('invalid_file', ['Placeholder'],
                                             ['add'])
    self.assertEqual('File \'invalid_file\' does not exist.',
                     str(error.exception))

  def testInvalidFileBadData(self):
    graph_def_file = os.path.join(self.get_temp_dir(), 'invalid_file')
    with gfile.Open(graph_def_file, 'wb') as temp_file:
      temp_file.write('bad data')
      temp_file.flush()

    # Attempts to convert the invalid model.
    with self.assertRaises(IOError) as error:
      lite.TFLiteConverter.from_frozen_graph(graph_def_file, ['Placeholder'],
                                             ['add'])
    self.assertEqual(
        'Unable to parse input file \'{}\'.'.format(graph_def_file),
        str(error.exception))

  def testFloatTocoConverter(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + in_tensor
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb')
    write_graph(sess.graph_def, '', graph_def_file, False)
    sess.close()

    # Convert model and ensure model is not None.
    converter = lite.TocoConverter.from_frozen_graph(graph_def_file,
                                                     ['Placeholder'], ['add'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Ensure the model is able to load.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

  def testGraphDebugInfo(self):
    """Test a frozen graph doesn't have debug info captured."""
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(
          shape=[1, 16, 16, 3], dtype=dtypes.float32)
      _ = in_tensor + in_tensor
      sess = session.Session()

    # Write graph to file.
    graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb')
    write_graph(sess.graph_def, '', graph_def_file, False)
    sess.close()

    # Convert model and ensure model is not None.
    converter = lite.TocoConverter.from_frozen_graph(graph_def_file,
                                                     ['Placeholder'], ['add'])
    converter.convert()
    # GraphDebugInfo should be None for a frozen graph.
    self.assertFalse(converter._debug_info)


class FromFrozenGraphObjectDetection(LiteTest):

  def _initObjectDetectionArgs(self):
    # Initializes the arguments required for the object detection model.
    # Looks for the model file which is saved in a different location
    # internally and externally.
    filename = resource_loader.get_path_to_datafile('testdata/tflite_graph.pb')
    if not os.path.exists(filename):
      filename = os.path.join(
          resource_loader.get_root_dir_with_all_resources(),
          '../tflite_mobilenet_ssd_quant_protobuf/tflite_graph.pb')
      if not os.path.exists(filename):
        raise IOError("File '{0}' does not exist.".format(filename))

    self._graph_def_file = filename
    self._input_arrays = ['normalized_input_image_tensor']
    self._output_arrays = [
        'TFLite_Detection_PostProcess', 'TFLite_Detection_PostProcess:1',
        'TFLite_Detection_PostProcess:2', 'TFLite_Detection_PostProcess:3'
    ]
    self._input_shapes = {'normalized_input_image_tensor': [1, 300, 300, 3]}

  def testTFLiteGraphDef(self):
    # Tests the object detection model that cannot be loaded in TensorFlow.
    self._initObjectDetectionArgs()

    converter = lite.TFLiteConverter.from_frozen_graph(self._graph_def_file,
                                                       self._input_arrays,
                                                       self._output_arrays,
                                                       self._input_shapes)
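    # TFLite_Detection_PostProcess is a TFLite custom op, so conversion only
    # succeeds with allow_custom_ops enabled.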
    converter.allow_custom_ops = True
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('normalized_input_image_tensor', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 300, 300, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(4, len(output_details))
    self.assertEqual('TFLite_Detection_PostProcess', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 10, 4] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

    self.assertEqual('TFLite_Detection_PostProcess:1',
                     output_details[1]['name'])
    self.assertTrue(([1, 10] == output_details[1]['shape']).all())
    self.assertEqual('TFLite_Detection_PostProcess:2',
                     output_details[2]['name'])
    self.assertTrue(([1, 10] == output_details[2]['shape']).all())
    self.assertEqual('TFLite_Detection_PostProcess:3',
                     output_details[3]['name'])
    self.assertTrue(([1] == output_details[3]['shape']).all())


class FromSavedModelTest(TestModels):

  def _createSavedModel(self, shape):
    """Create a simple SavedModel."""
    saved_model_dir = os.path.join(self.get_temp_dir(), 'simple_savedmodel')
    with ops.Graph().as_default():
      with session.Session() as sess:
        in_tensor_1 = array_ops.placeholder(
            shape=shape, dtype=dtypes.float32, name='inputB')
        in_tensor_2 = array_ops.placeholder(
            shape=shape, dtype=dtypes.float32, name='inputA')
        out_tensor = in_tensor_1 + in_tensor_2
        inputs = {'x': in_tensor_1, 'y': in_tensor_2}
        outputs = {'z': out_tensor}
        saved_model.simple_save(sess, saved_model_dir, inputs, outputs)
    return saved_model_dir

  def testSimpleModel(self):
    """Test a SavedModel."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_saved_model(saved_model_dir)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(2, len(input_details))
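    # Note: the inputs come back ordered by tensor name, so 'inputA' (defined
    # second in _createSavedModel) appears first.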
    self.assertStartsWith(input_details[0]['name'], 'inputA')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    self.assertStartsWith(input_details[1]['name'], 'inputB')
    self.assertEqual(np.float32, input_details[1]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all())
    self.assertEqual((0., 0.), input_details[1]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertStartsWith(output_details[0]['name'], 'add')
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testOldConverterWarning(self):
    """Test that the warning message is logged when using TOCO."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])
    log = io.BytesIO() if six.PY2 else io.StringIO()
    handler = logging.StreamHandler(log)
    logging.root.addHandler(handler)
    warning_message = 'Please consider switching to the new converter'
    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_saved_model(saved_model_dir)
    converter.experimental_new_converter = False
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)
    self.assertIn(warning_message, log.getvalue())
    logging.root.removeHandler(handler)

  def testNewConverterOptOut(self):
    """Test that the opt-out message is logged when using the new converter."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])
    log = io.BytesIO() if six.PY2 else io.StringIO()
    handler = logging.StreamHandler(log)
    logging.root.addHandler(handler)
    optout_message = ('Using experimental converter: '
                      'If you encountered a problem')
    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverter.from_saved_model(saved_model_dir)
    converter.experimental_new_converter = True
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)
    self.assertIn(optout_message, log.getvalue())
    logging.root.removeHandler(handler)

  def testNoneBatchSize(self):
    """Test a SavedModel, with None in input tensor's shape."""
    saved_model_dir = self._createSavedModel(shape=[None, 16, 16, 3])

    converter = lite.TFLiteConverter.from_saved_model(saved_model_dir)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(2, len(input_details))
    self.assertStartsWith(input_details[0]['name'], 'inputA')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    self.assertStartsWith(input_details[1]['name'], 'inputB')
    self.assertEqual(np.float32, input_details[1]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all())
    self.assertEqual((0., 0.), input_details[1]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertStartsWith(output_details[0]['name'], 'add')
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testOrderInputArrays(self):
    """Test a SavedModel ordering of input arrays."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])

    converter = lite.TFLiteConverter.from_saved_model(
        saved_model_dir, input_arrays=['inputB', 'inputA'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(2, len(input_details))
    self.assertStartsWith(input_details[0]['name'], 'inputA')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    self.assertStartsWith(input_details[1]['name'], 'inputB')
    self.assertEqual(np.float32, input_details[1]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all())
    self.assertEqual((0., 0.), input_details[1]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertStartsWith(output_details[0]['name'], 'add')
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

  def testSubsetInputArrays(self):
    """Test a SavedModel with a subset of the input array names of the model."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])

    # Check case where input shape is given.
    converter = lite.TFLiteConverter.from_saved_model(
        saved_model_dir,
        input_arrays=['inputA'],
        input_shapes={'inputA': [1, 16, 16, 3]})

    # Since we only partially specify the input, this is not allowed.
    with self.assertRaises(ConverterError):
      _ = converter.convert()

    # Check case where input shape is None.
    converter = lite.TFLiteConverter.from_saved_model(
        saved_model_dir, input_arrays=['inputA'], input_shapes={'inputA': None})

    # Since we only partially specify the input, this is not allowed.
    with self.assertRaises(ConverterError):
      _ = converter.convert()

  def testSimpleModelTocoConverter(self):
    """Test a SavedModel with deprecated TocoConverter."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])

    # Convert model and ensure model is not None.
    converter = lite.TocoConverter.from_saved_model(saved_model_dir)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Ensure the model is able to load.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

  def testGraphDebugInfo(self):
    """Test a SavedModel has debug info captured."""
    saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3])
    converter = lite.TFLiteConverter.from_saved_model(saved_model_dir)
    converter.convert()
    self.assertValidDebugInfo(converter._debug_info)


class MyAddLayer(keras.layers.Layer):

  def __init__(self, increment, **kwargs):
    super(MyAddLayer, self).__init__(**kwargs)
    self._increment = increment

  def call(self, inputs):
    return inputs + self._increment

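  # Serializing the increment lets Keras re-create the layer when the model
  # is reloaded with custom_objects.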
  def get_config(self):
    config = super(MyAddLayer, self).get_config()
    config['increment'] = self._increment
    return config


class FromKerasFile(TestModels, parameterized.TestCase):

  def setUp(self):
    super(FromKerasFile, self).setUp()
    self._keras_file = None
    self._custom_objects = None
    if not context.executing_eagerly():
      keras.backend.clear_session()

  def tearDown(self):
    if self._keras_file:
      os.remove(self._keras_file)
    super(FromKerasFile, self).tearDown()

  def _getSequentialModel(self, include_custom_layer=False):
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(2, input_shape=(3,)))
    if include_custom_layer:
      model.add(MyAddLayer(1.0))
    model.add(keras.layers.RepeatVector(3))
    model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
    model.compile(
        loss=keras.losses.MSE,
        optimizer='sgd',
        metrics=[keras.metrics.categorical_accuracy],
        sample_weight_mode='temporal')
    x = np.random.random((1, 3))
    y = np.random.random((1, 3, 3))
    model.train_on_batch(x, y)
    model.predict(x)

    # Create the temp file outside the try block so `fd` is always bound when
    # the finally clause runs.
    fd, self._keras_file = tempfile.mkstemp('.h5')
    try:
      keras.models.save_model(model, self._keras_file)
    finally:
      os.close(fd)

    if include_custom_layer:
      self._custom_objects = {'MyAddLayer': MyAddLayer}

  @parameterized.named_parameters(('_graph', context.graph_mode),
                                  ('_eager', context.eager_mode))
  def testSequentialModel(self, test_context):
    """Test a Sequential tf.keras model with default inputs."""
    with test_context():
      self._getSequentialModel()

      converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file)
      tflite_model = converter.convert()
      self.assertTrue(tflite_model)

    # Check tensor details of converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEndsWith(input_details[0]['name'], 'dense_input')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 1)
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 3, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

    # Check inference of converted model.
    input_data = np.array([[1, 2, 3]], dtype=np.float32)
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    tflite_result = interpreter.get_tensor(output_details[0]['index'])

    keras_model = keras.models.load_model(self._keras_file)
    keras_result = keras_model.predict(input_data)

    np.testing.assert_almost_equal(tflite_result, keras_result, 5)

  @parameterized.named_parameters(('_graph', context.graph_mode),
                                  ('_eager', context.eager_mode))
  def testCustomLayer(self, test_context):
    """Test a Sequential tf.keras model with a custom layer."""
    with test_context():
      self._getSequentialModel(include_custom_layer=True)

      converter = lite.TFLiteConverter.from_keras_model_file(
          self._keras_file, custom_objects=self._custom_objects)
      tflite_model = converter.convert()
      self.assertTrue(tflite_model)

    # Check tensor details of converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Check inference of converted model.
    input_data = np.array([[1, 2, 3]], dtype=np.float32)
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    tflite_result = interpreter.get_tensor(output_details[0]['index'])

    keras_model = keras.models.load_model(
        self._keras_file, custom_objects=self._custom_objects)
    keras_result = keras_model.predict(input_data)

    np.testing.assert_almost_equal(tflite_result, keras_result, 5)

  def testSequentialModelInputArray(self):
    """Test a Sequential tf.keras model with the input_arrays argument."""
    ops.disable_eager_execution()
    self._getSequentialModel()

    # Invalid input array raises error.
    with self.assertRaises(ValueError) as error:
      lite.TFLiteConverter.from_keras_model_file(
          self._keras_file, input_arrays=['invalid-input'])
    self.assertEqual("Invalid tensors 'invalid-input' were found.",
                     str(error.exception))

    # Valid input array.
    converter = lite.TFLiteConverter.from_keras_model_file(
        self._keras_file, input_arrays=['dense_input'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

  def testSequentialModelInputShape(self):
    """Test a Sequential tf.keras model with the input_shapes argument."""
    self._getSequentialModel()

    # Passing in the shape of an invalid input array raises an error.
    with self.assertRaises(ValueError) as error:
      converter = lite.TFLiteConverter.from_keras_model_file(
          self._keras_file, input_shapes={'invalid-input': [2, 3]})
    self.assertEqual(
        "Invalid tensor 'invalid-input' found in tensor shapes map.",
        str(error.exception))

    # Passing in the shape of a valid input array.
    converter = lite.TFLiteConverter.from_keras_model_file(
        self._keras_file, input_shapes={'dense_input': [2, 3]})
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check input shape from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEndsWith(input_details[0]['name'], 'dense_input')
    self.assertTrue(([2, 3] == input_details[0]['shape']).all())

  def testSequentialModelOutputArray(self):
    """Test a Sequential tf.keras model with the output_arrays argument."""
    ops.disable_eager_execution()
    self._getSequentialModel()

    # Invalid output array raises error.
    with self.assertRaises(ValueError) as error:
      lite.TFLiteConverter.from_keras_model_file(
          self._keras_file, output_arrays=['invalid-output'])
    self.assertEqual("Invalid tensors 'invalid-output' were found.",
                     str(error.exception))

    # Valid output array.
    converter = lite.TFLiteConverter.from_keras_model_file(
        self._keras_file, output_arrays=['time_distributed/Reshape_1'])
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

  @parameterized.named_parameters(('_graph', context.graph_mode),
                                  ('_eager', context.eager_mode))
  def testFunctionalModel(self, test_context):
    """Test a Functional tf.keras model with default inputs."""
    with test_context():
      inputs = keras.layers.Input(shape=(3,), name='input')
      x = keras.layers.Dense(2)(inputs)
      output = keras.layers.Dense(3)(x)

      model = keras.models.Model(inputs, output)
      model.compile(
          loss=keras.losses.MSE,
          optimizer='sgd',
          metrics=[keras.metrics.categorical_accuracy])
      x = np.random.random((1, 3))
      y = np.random.random((1, 3))
      model.train_on_batch(x, y)

      model.predict(x)
      fd, self._keras_file = tempfile.mkstemp('.h5')
      try:
        keras.models.save_model(model, self._keras_file)
      finally:
        os.close(fd)

      # Convert to TFLite model.
      converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file)
      tflite_model = converter.convert()
      self.assertTrue(tflite_model)

    # Check tensor details of converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEqual('input', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 1)
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

    # Check inference of converted model.
    input_data = np.array([[1, 2, 3]], dtype=np.float32)
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    tflite_result = interpreter.get_tensor(output_details[0]['index'])

    keras_model = keras.models.load_model(self._keras_file)
    keras_result = keras_model.predict(input_data)

    np.testing.assert_almost_equal(tflite_result, keras_result, 5)

  def testFunctionalModelMultipleInputs(self):
    """Test a Functional tf.keras model with multiple inputs and outputs."""
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')
    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)

    model = keras.models.Model([a, b], [d, e])
    model.compile(
        loss=keras.losses.MSE,
        optimizer='sgd',
        metrics=[keras.metrics.mae],
        loss_weights=[1., 0.5])

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))
    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))
    model.train_on_batch([input_a_np, input_b_np], [output_d_np, output_e_np])

    model.predict([input_a_np, input_b_np], batch_size=5)
    fd, self._keras_file = tempfile.mkstemp('.h5')
    try:
      keras.models.save_model(model, self._keras_file)
    finally:
      os.close(fd)

    # Convert to TFLite model.
    converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 2)
    self.assertEndsWith(input_details[0]['name'], 'input_a')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    self.assertEndsWith(input_details[1]['name'], 'input_b')
    self.assertEqual(np.float32, input_details[1]['dtype'])
    self.assertTrue(([1, 3] == input_details[1]['shape']).all())
    self.assertEqual((0., 0.), input_details[1]['quantization'])

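    # Dropout is a pass-through at inference time, so both outputs are dense
    # outputs of shape [1, 4].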
    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 2)
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 4] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

    self.assertEqual(np.float32, output_details[1]['dtype'])
    self.assertTrue(([1, 4] == output_details[1]['shape']).all())
    self.assertEqual((0., 0.), output_details[1]['quantization'])

  def testFunctionalSequentialModel(self):
    """Test a Functional tf.keras model containing a Sequential model."""
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(2, input_shape=(3,)))
    model.add(keras.layers.RepeatVector(3))
    model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
    model = keras.models.Model(model.input, model.output)

    model.compile(
        loss=keras.losses.MSE,
        optimizer='sgd',
        metrics=[keras.metrics.categorical_accuracy],
        sample_weight_mode='temporal')
    x = np.random.random((1, 3))
    y = np.random.random((1, 3, 3))
    model.train_on_batch(x, y)
    model.predict(x)

    fd, self._keras_file = tempfile.mkstemp('.h5')
    try:
      keras.models.save_model(model, self._keras_file)
    finally:
      os.close(fd)

    # Convert to TFLite model.
    converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Check tensor details of converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEndsWith(input_details[0]['name'], 'dense_input')
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([1, 3] == input_details[0]['shape']).all())
    self.assertEqual((0., 0.), input_details[0]['quantization'])

    output_details = interpreter.get_output_details()
    self.assertLen(output_details, 1)
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([1, 3, 3] == output_details[0]['shape']).all())
    self.assertEqual((0., 0.), output_details[0]['quantization'])

    # Check inference of converted model.
    input_data = np.array([[1, 2, 3]], dtype=np.float32)
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    tflite_result = interpreter.get_tensor(output_details[0]['index'])

    keras_model = keras.models.load_model(self._keras_file)
    keras_result = keras_model.predict(input_data)

    np.testing.assert_almost_equal(tflite_result, keras_result, 5)

  def testSequentialModelTocoConverter(self):
    """Test a Sequential tf.keras model with deprecated TocoConverter."""
    self._getSequentialModel()

    converter = lite.TocoConverter.from_keras_model_file(self._keras_file)
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)

    # Ensure the model is able to load.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

  @parameterized.named_parameters(('_graph', context.graph_mode),
                                  ('_eager', context.eager_mode))
  def testGraphDebugInfo(self, test_context):
    """Test a Sequential tf.keras model has debug info captured."""
    with test_context():
      self._getSequentialModel()
      converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file)
      converter.convert()
      self.assertValidDebugInfo(converter._debug_info)

  def testExperimentalSparsifyModel(self):
    self._getSequentialModel()

    converter = lite.TocoConverter.from_keras_model_file(self._keras_file)
    converter._experimental_sparsify_model = True
    tflite_model = converter.convert()
    self.assertTrue(tflite_model)


class GrapplerTest(TestModels, parameterized.TestCase):

  def testConstantFolding(self):
    ops.disable_eager_execution()
    # Constant folding handles the tf.broadcast_to operation, which was not
    # supported by TFLite at the time this test was added.
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(shape=[3, 3], dtype=dtypes.float32)
      y_const = constant_op.constant([1., 2., 3.])
      y_broadcast = gen_array_ops.broadcast_to(y_const, [3, 3])
      out_tensor = math_ops.matmul(in_tensor, y_broadcast, name='output')
      sess = session.Session()

    # Convert model.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    tflite_model = converter.convert()

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual(np.float32, input_details[0]['dtype'])
    self.assertTrue(([3, 3] == input_details[0]['shape']).all())

    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual('output', output_details[0]['name'])
    self.assertEqual(np.float32, output_details[0]['dtype'])
    self.assertTrue(([3, 3] == output_details[0]['shape']).all())

  @parameterized.named_parameters(
      ('EnableMlirConverter', True),  # enable mlir
      ('DisableMlirConverter', False))  # disable mlir
  def testInputNodeIsNotFolded(self, enable_mlir):
    ops.disable_eager_execution()
    # Constant folding handles the tf.broadcast_to operation, which was not
    # supported by TFLite at the time this test was added.
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(shape=[3], dtype=dtypes.float32)
      y_const = constant_op.constant([1., 2., 3.])
      y_add = y_const + y_const
      out_tensor = in_tensor * y_add
      sess = session.Session()

    # Convert model.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor, y_const],
                                                  [out_tensor])
    converter.experimental_new_converter = enable_mlir
    tflite_model = converter.convert()

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 2)
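    # Because y_const is requested as a model input, constant folding must
    # leave it in the graph rather than folding it into y_add.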
    self.assertEqual('Placeholder', input_details[0]['name'])
    self.assertEqual('Const', input_details[1]['name'])

  def testGrapplerConstFolding(self):
    # Constant folding converts the following add operation into a
    # tf.broadcast_to operation, which was not supported by TFLite at the time
    # this test was added.
    @def_function.function
    def plus_placeholder(x, placeholder):
      return x + placeholder

    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(shape=[2, 2], dtype=dtypes.float32)
      out_tensor = plus_placeholder(
          array_ops.zeros([2, 2, 2]),
          array_ops.reshape(in_tensor, shape=[2, 2]))
      sess = session.Session()

    # Convert model.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])
    # Only disable this path in MLIR conversion for toco compatibility.
    converter.experimental_new_converter = True
    tflite_model = converter.convert()

    # Check values from converted model.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    self.assertLen(input_details, 1)
    self.assertEqual('Placeholder', input_details[0]['name'])


class ImportOpsUtilTest(LiteTest):

  def testGetPotentiallySupportedOps(self):
    self.assertIsNotNone(lite.get_potentially_supported_ops())


class DefaultConverterAttrsTest(LiteTest):

  def testAttrs(self):
    with ops.Graph().as_default():
      in_tensor = array_ops.placeholder(shape=[2, 2], dtype=dtypes.float32)
      out_tensor = in_tensor + in_tensor
      sess = session.Session()

    # Convert model.
    converter = lite.TFLiteConverter.from_session(sess, [in_tensor],
                                                  [out_tensor])

    # Assert output format.
    self.assertEqual(converter.output_format, lite_constants.TFLITE)

    # Assert the default inference type is float.
    self.assertEqual(converter.inference_type, lite_constants.FLOAT)

    # Assert the default inference type overrides are None.
    self.assertIsNone(converter.inference_input_type)
    self.assertIsNone(converter.inference_output_type)

    # Assert the default quantization options are not set.
    self.assertEqual(converter.quantized_input_stats, {})
    self.assertIsNone(converter.default_ranges_stats)
    self.assertFalse(converter.reorder_across_fake_quant)
    self.assertFalse(converter.change_concat_input_ranges)

    # Assert dropping control dependencies is enabled by default.
    self.assertTrue(converter.drop_control_dependency)

    # Assert dumping extra information is disabled by default.
    self.assertIsNone(converter.dump_graphviz_dir)
    self.assertFalse(converter.dump_graphviz_video)
    self.assertIsNone(converter.conversion_summary_dir)


if __name__ == '__main__':
  test.main()