STT-tensorflow/tensorflow/python/keras/engine/compile_utils_test.py
Thomas O'Malley cb043911fe Add MetricsContainer.weighted_metrics and MetricsContainer.unweighted_metrics
properties to help distinguish between metrics that should and shouldn't be
passed sample_weight argument.
Note these properties are set to None before Model.fit is called, since metrics
are potentially broadcast to match the structure of data seen in Model.fit.

PiperOrigin-RevId: 339892649
Change-Id: I0abffae08efde2b8adc58014ef205d318d66a9ab
2020-10-30 10:29:23 -07:00

740 lines
29 KiB
Python

# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for compile utitilies."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.python.distribute import one_device_strategy
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.keras import backend as K
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import losses as losses_mod
from tensorflow.python.keras import metrics as metrics_mod
from tensorflow.python.keras.engine import compile_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
class LossesContainerTest(keras_parameterized.TestCase):
def test_single_loss(self):
loss_container = compile_utils.LossesContainer('mse')
y_t, y_p = array_ops.ones((10, 5)), array_ops.zeros((10, 5))
total_loss = loss_container(y_t, y_p)
self.assertTrue(loss_container._built)
self.assertLen(loss_container._losses, 1)
self.assertEqual(total_loss.numpy(), 1.)
self.assertLen(loss_container.metrics, 1)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 1.)
def test_loss_list(self):
loss_container = compile_utils.LossesContainer(['mse', 'mae'], [1, 0.5])
y_t = [array_ops.ones((10, 1)), array_ops.zeros((10, 1))]
y_p = [array_ops.ones((10, 1)), array_ops.ones((10, 1))]
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertEqual(loss_container._output_names, ['output_1', 'output_2'])
self.assertLen(loss_container._losses, 2)
self.assertEqual(total_loss.numpy(), 0.25)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.25)
output_1_metric = loss_container.metrics[1]
self.assertEqual(output_1_metric.name, 'output_1_loss')
self.assertEqual(output_1_metric.result().numpy(), 0)
output_2_metric = loss_container.metrics[2]
self.assertEqual(output_2_metric.name, 'output_2_loss')
self.assertEqual(output_2_metric.result().numpy(), 0.5)
def test_loss_dict(self):
loss_container = compile_utils.LossesContainer(
{
'out1': 'mse',
'out2': 'mae'
}, {
'out1': 1,
'out2': 0.5
})
y_t = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.zeros((10, 1))}
y_p = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.ones((10, 1))}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertLen(loss_container._losses, 2)
self.assertEqual(total_loss.numpy(), 0.25)
self.assertLen(loss_container.metrics, 3)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.25)
out1_metric = loss_container.metrics[1]
self.assertEqual(out1_metric.name, 'out1_loss')
self.assertEqual(out1_metric.result().numpy(), 0)
out2_metric = loss_container.metrics[2]
self.assertEqual(out2_metric.name, 'out2_loss')
self.assertEqual(out2_metric.result().numpy(), 0.5)
def test_loss_partial_dict_with_output_names(self):
loss_container = compile_utils.LossesContainer(
{'out2': 'mae'}, {'out2': 1.}, output_names=['out1', 'out2'])
y_t = [array_ops.ones((10, 1)), array_ops.zeros((10, 1))]
y_p = [array_ops.ones((10, 1)), array_ops.ones((10, 1))]
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertEqual(total_loss.numpy(), 0.5)
self.assertLen(loss_container.metrics, 2)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.5)
out2_metric = loss_container.metrics[1]
self.assertEqual(out2_metric.name, 'out2_loss')
self.assertEqual(out2_metric.result().numpy(), 0.5)
def test_loss_dict_with_nones(self):
loss_container = compile_utils.LossesContainer({
'out1': None,
'out2': 'mae'
})
y_t = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.zeros((10, 1))}
y_p = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.ones((10, 1))}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertEqual(total_loss.numpy(), 0.5)
self.assertLen(loss_container.metrics, 2)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.5)
out2_metric = loss_container.metrics[1]
self.assertEqual(out2_metric.name, 'out2_loss')
self.assertEqual(out2_metric.result().numpy(), 0.5)
def test_nested_structure(self):
loss_container = compile_utils.LossesContainer(
{
'b': ['mse', None],
'a': 'mae'
}, loss_weights={
'b': [0.5, 0],
'a': 1
})
y_t = {
'b': [array_ops.ones((10, 1)),
array_ops.zeros((10, 1))],
'a': array_ops.zeros((10, 1))
}
y_p = {
'b': [array_ops.zeros((10, 1)),
array_ops.zeros((10, 1))],
'a': array_ops.ones((10, 1))
}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertEqual(total_loss.numpy(), 0.75)
self.assertLen(loss_container.metrics, 3)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.75)
a_metric = loss_container.metrics[1]
self.assertEqual(a_metric.name, 'a_loss')
self.assertEqual(a_metric.result().numpy(), 0.5)
b_1_metric = loss_container.metrics[2]
self.assertEqual(b_1_metric.name, 'b_1_loss')
self.assertEqual(b_1_metric.result().numpy(), 0.5)
def test_broadcast_single_loss(self):
loss_container = compile_utils.LossesContainer('mse')
y_t = [array_ops.ones((10, 1)), array_ops.zeros((10, 1))]
y_p = [array_ops.ones((10, 1)), array_ops.ones((10, 1))]
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
total_loss = loss_container(y_t, y_p, sample_weight=sw)
self.assertEqual(total_loss.numpy(), 0.5)
self.assertLen(loss_container.metrics, 3)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 0.5)
output_1_metric = loss_container.metrics[1]
self.assertEqual(output_1_metric.name, 'output_1_loss')
self.assertEqual(output_1_metric.result().numpy(), 0.)
output_2_metric = loss_container.metrics[2]
self.assertEqual(output_2_metric.name, 'output_2_loss')
self.assertEqual(output_2_metric.result().numpy(), 0.5)
def test_missing_label_with_no_loss(self):
# It's ok to exclude a label if that label has no
# losses or metrics associated with it.
loss_container = compile_utils.LossesContainer({
'output1': 'mse',
'output3': 'mae'
})
y_p = {
'output1': ops.convert_to_tensor_v2_with_dispatch([[0], [1], [2]]),
'output2': ops.convert_to_tensor_v2_with_dispatch([[3], [4], [5]]),
'output3': ops.convert_to_tensor_v2_with_dispatch([[6], [7], [8]])
}
y_t = {
'output1': ops.convert_to_tensor_v2_with_dispatch([[1], [2], [3]]),
'output3': ops.convert_to_tensor_v2_with_dispatch([[4], [5], [6]])
}
total_loss = loss_container(y_t, y_p)
self.assertEqual(total_loss.numpy(), 3.)
self.assertLen(loss_container.metrics, 3)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertEqual(loss_metric.result().numpy(), 3.)
output_1_metric = loss_container.metrics[1]
self.assertEqual(output_1_metric.name, 'output1_loss')
self.assertEqual(output_1_metric.result().numpy(), 1.)
output_3_metric = loss_container.metrics[2]
self.assertEqual(output_3_metric.name, 'output3_loss')
self.assertEqual(output_3_metric.result().numpy(), 2.)
def test_mismatched_dtypes(self):
y_t = constant_op.constant([1, 9, 2, -5], shape=(2, 2))
y_p = constant_op.constant([4, 8, 12, 8],
shape=(2, 2),
dtype=dtypes.float32)
def my_mae(labels, preds):
self.assertEqual(labels.dtype, dtypes.int32)
self.assertEqual(preds.dtype, dtypes.float32)
labels = math_ops.cast(labels, preds.dtype)
return K.mean(math_ops.abs(preds - labels), axis=-1)
loss_container = compile_utils.LossesContainer(my_mae)
total_loss = loss_container(y_t, y_p)
self.assertEqual(total_loss.dtype, dtypes.float32)
def test_integer_dtypes(self):
y_t = constant_op.constant([1, 9, 2, -5], shape=(2, 2))
y_p = constant_op.constant([4, 8, 12, 8], shape=(2, 2), dtype=dtypes.int64)
def my_mae(labels, preds):
self.assertEqual(labels.dtype, dtypes.int64)
self.assertEqual(preds.dtype, dtypes.int64)
return K.mean(math_ops.abs(preds - labels), axis=-1)
loss_container = compile_utils.LossesContainer(my_mae)
total_loss = loss_container(y_t, y_p)
self.assertEqual(total_loss.dtype, dtypes.int64)
def test_float_dtypes(self):
y_t = constant_op.constant([1, 9, 2, -5],
shape=(2, 2),
dtype=dtypes.float32)
y_p = constant_op.constant([4, 8, 12, 8],
shape=(2, 2),
dtype=dtypes.float64)
def my_mae(labels, preds):
self.assertEqual(labels.dtype, dtypes.float64)
self.assertEqual(preds.dtype, dtypes.float64)
return K.mean(math_ops.abs(preds - labels), axis=-1)
loss_container = compile_utils.LossesContainer(my_mae)
total_loss = loss_container(y_t, y_p)
self.assertEqual(total_loss.dtype, dtypes.float64)
def test_loss_masking(self):
loss_container = compile_utils.LossesContainer('mae')
y_p = constant_op.constant([[[1], [1]], [[0], [0]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
y_p._keras_mask = constant_op.constant([[1, 0], [1, 0]],
dtype=dtypes.float32)
total_loss = loss_container(y_t, y_p)
self.assertAlmostEqual(total_loss.numpy(), .25) # sum over batch size
self.assertLen(loss_container.metrics, 1)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertAlmostEqual(loss_metric.result().numpy(), .25)
def test_loss_sample_weight(self):
loss_container = compile_utils.LossesContainer('mae')
y_p = constant_op.constant([[[1], [1]], [[0], [0]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
sw = constant_op.constant([[.2, .3], [.5, 0]], dtype=dtypes.float32)
total_loss = loss_container(y_t, y_p, sample_weight=sw)
# (0 * .2 + 0 * .3 + 1 * .5 + 1 * 0) / 4
self.assertAlmostEqual(total_loss.numpy(), .125)
self.assertLen(loss_container.metrics, 1)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertAlmostEqual(loss_metric.result().numpy(), .125)
def test_loss_masking_sample_weight(self):
loss_container = compile_utils.LossesContainer('mae')
y_p = constant_op.constant([[[1], [1]], [[0], [0]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
sw = constant_op.constant([[.2, .3], [.5, 0]], dtype=dtypes.float32)
y_p._keras_mask = constant_op.constant([[1, 0], [1, 0]],
dtype=dtypes.float32)
total_loss = loss_container(y_t, y_p, sample_weight=sw)
# (0 * .2 + 1 * .5) / 4
self.assertAlmostEqual(total_loss.numpy(), .125) # sum over batch size
self.assertLen(loss_container.metrics, 1)
loss_metric = loss_container.metrics[0]
self.assertEqual(loss_metric.name, 'loss')
self.assertAlmostEqual(loss_metric.result().numpy(), .125)
def test_custom_loss_callables(self):
def custom_loss_fn(y_true, y_pred):
return math_ops.reduce_sum(y_true - y_pred)
class CustomLossClass(object):
def __call__(self, y_true, y_pred):
return math_ops.reduce_sum(y_true - y_pred)
loss_container = compile_utils.LossesContainer(
[custom_loss_fn, CustomLossClass()])
y_t, y_p = array_ops.ones((10, 5)), array_ops.zeros((10, 5))
loss_container(y_t, y_p)
self.assertEqual(loss_container._losses[0].name, 'custom_loss_fn')
self.assertEqual(loss_container._losses[1].name, 'custom_loss_class')
class MetricsContainerTest(keras_parameterized.TestCase):
def test_single_metric(self):
metric_container = compile_utils.MetricsContainer('mse')
y_t, y_p = array_ops.ones((10, 5)), array_ops.zeros((10, 5))
metric_container.update_state(y_t, y_p)
self.assertLen(metric_container.metrics, 1)
metric = metric_container.metrics[0]
self.assertEqual(metric.name, 'mse')
self.assertEqual(metric.result().numpy(), 1.)
def test_list_of_metrics_one_output(self):
metric_container = compile_utils.MetricsContainer(['mse', 'mae'])
y_t, y_p = 2 * array_ops.ones((10, 5)), array_ops.zeros((10, 5))
metric_container.update_state(y_t, y_p)
self.assertLen(metric_container.metrics, 2)
mse_metric = metric_container.metrics[0]
self.assertEqual(mse_metric.name, 'mse')
self.assertEqual(mse_metric.result().numpy(), 4.)
mae_metric = metric_container.metrics[1]
self.assertEqual(mae_metric.name, 'mae')
self.assertEqual(mae_metric.result().numpy(), 2.)
def test_list_of_metrics_list_of_outputs(self):
metric_container = compile_utils.MetricsContainer(
metrics=['mse', 'mae'], # Should broadcast to both outputs.
weighted_metrics=['accuracy']) # Should broadcast to both outputs.
y_t = [array_ops.ones((10, 1)), array_ops.zeros((10, 1))]
y_p = [array_ops.ones((10, 1)), 2 * array_ops.ones((10, 1))]
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
metric_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metric_container.metrics, 6)
mse_metric = metric_container.metrics[0]
self.assertEqual(mse_metric.name, 'output_1_mse')
self.assertEqual(mse_metric.result().numpy(), 0.)
mse_metric = metric_container.metrics[1]
self.assertEqual(mse_metric.name, 'output_1_mae')
self.assertEqual(mse_metric.result().numpy(), 0.)
acc_metric_1 = metric_container.metrics[2]
self.assertEqual(acc_metric_1.name, 'output_1_accuracy')
self.assertEqual(acc_metric_1.result().numpy(), 1.)
self.assertEqual(acc_metric_1._fn, metrics_mod.binary_accuracy)
mae_metric = metric_container.metrics[3]
self.assertEqual(mae_metric.name, 'output_2_mse')
self.assertEqual(mae_metric.result().numpy(), 4.)
mae_metric = metric_container.metrics[4]
self.assertEqual(mae_metric.name, 'output_2_mae')
self.assertEqual(mae_metric.result().numpy(), 2.)
acc_metric_2 = metric_container.metrics[5]
self.assertEqual(acc_metric_2.name, 'output_2_accuracy')
self.assertEqual(acc_metric_2.result().numpy(), 0.)
self.assertEqual(acc_metric_2._fn, metrics_mod.binary_accuracy)
weighted_metrics = metric_container.weighted_metrics
self.assertLen(weighted_metrics, 2)
self.assertEqual(weighted_metrics[0].name, 'output_1_accuracy')
self.assertEqual(weighted_metrics[1].name, 'output_2_accuracy')
unweighted_metrics = metric_container.unweighted_metrics
self.assertLen(unweighted_metrics, 4)
self.assertEqual(unweighted_metrics[0].name, 'output_1_mse')
self.assertEqual(unweighted_metrics[1].name, 'output_1_mae')
self.assertEqual(unweighted_metrics[2].name, 'output_2_mse')
self.assertEqual(unweighted_metrics[3].name, 'output_2_mae')
def test_metric_dict(self):
metric_container = compile_utils.MetricsContainer(
metrics={
'out1': 'mse',
'out2': 'mae'
},
weighted_metrics={
'out1': 'mse',
'out2': 'mae'
})
y_t = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.zeros((10, 1))}
y_p = {'out1': array_ops.ones((10, 1)), 'out2': 2 * array_ops.ones((10, 1))}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
metric_container.update_state(y_t, y_p, sample_weight=sw)
mse_metric = metric_container.metrics[0]
self.assertEqual(mse_metric.name, 'out1_mse')
self.assertEqual(mse_metric.result().numpy(), 0.)
weighted_mse_metric = metric_container.metrics[1]
self.assertEqual(weighted_mse_metric.name, 'out1_weighted_mse')
self.assertEqual(weighted_mse_metric.result().numpy(), 0.)
mae_metric = metric_container.metrics[2]
self.assertEqual(mae_metric.name, 'out2_mae')
self.assertEqual(mae_metric.result().numpy(), 2.)
weighted_mae_metric = metric_container.metrics[3]
self.assertEqual(weighted_mae_metric.name, 'out2_weighted_mae')
self.assertEqual(weighted_mae_metric.result().numpy(), 2.)
def test_metric_partial_dict_with_output_names(self):
metric_container = compile_utils.MetricsContainer(
{'out2': 'mae'}, output_names=['out1', 'out2'])
y_t = [array_ops.ones((10, 1)), array_ops.zeros((10, 1))]
y_p = [array_ops.ones((10, 1)), array_ops.ones((10, 1))]
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
metric_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metric_container.metrics, 1)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'out2_mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
def test_metric_partial_dict_with_nones(self):
metric_container = compile_utils.MetricsContainer({
'out1': None,
'out2': 'mae'
})
y_t = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.zeros((10, 1))}
y_p = {'out1': array_ops.ones((10, 1)), 'out2': array_ops.ones((10, 1))}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
metric_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metric_container.metrics, 1)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'out2_mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
def test_nested_structure(self):
metric_container = compile_utils.MetricsContainer(
metrics={
'b': ['mse', None],
'a': 'mae'
},
weighted_metrics={
'b': [None, None],
'a': 'mse'
})
y_t = {
'b': [2 * array_ops.ones((10, 1)),
array_ops.zeros((10, 1))],
'a': array_ops.zeros((10, 1))
}
y_p = {
'b': [array_ops.zeros((10, 1)),
array_ops.zeros((10, 1))],
'a': array_ops.ones((10, 1))
}
sw = ops.convert_to_tensor_v2_with_dispatch([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
metric_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metric_container.metrics, 3)
a_mae_metric = metric_container.metrics[0]
self.assertEqual(a_mae_metric.name, 'a_mae')
self.assertEqual(a_mae_metric.result().numpy(), 1.)
weighted_a_mae_metric = metric_container.metrics[1]
self.assertEqual(weighted_a_mae_metric.name, 'a_mse')
self.assertEqual(weighted_a_mae_metric.result().numpy(), 1.)
b_1_mse_metric = metric_container.metrics[2]
self.assertEqual(b_1_mse_metric.name, 'b_1_mse')
self.assertEqual(b_1_mse_metric.result().numpy(), 4.)
def test_crossentropy(self):
metric_container = compile_utils.MetricsContainer('crossentropy')
y_t, y_p = array_ops.ones((10, 1)), array_ops.ones((10, 1))
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.binary_crossentropy)
metric_container = compile_utils.MetricsContainer('crossentropy')
y_t, y_p = array_ops.ones((10, 1)), array_ops.ones((10, 20))
self.assertEqual(y_p.shape.as_list()[-1], 20)
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.sparse_categorical_crossentropy)
metric_container = compile_utils.MetricsContainer('crossentropy')
y_t, y_p = array_ops.ones((10, 20)), array_ops.ones((10, 20))
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.categorical_crossentropy)
def test_accuracy(self):
metric_container = compile_utils.MetricsContainer('accuracy')
y_t, y_p = array_ops.ones((10, 1)), array_ops.ones((10, 1))
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.binary_accuracy)
metric_container = compile_utils.MetricsContainer('accuracy')
y_t, y_p = array_ops.ones((10, 1)), array_ops.ones((10, 20))
self.assertEqual(y_p.shape.as_list()[-1], 20)
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.sparse_categorical_accuracy)
metric_container = compile_utils.MetricsContainer('accuracy')
y_t, y_p = array_ops.ones((10, 20)), array_ops.ones((10, 20))
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0]._fn,
metrics_mod.categorical_accuracy)
def test_metric_weighting(self):
metric_container = compile_utils.MetricsContainer(
metrics=['mae'], weighted_metrics=['mae'])
y_t = ops.convert_to_tensor_v2_with_dispatch([[0], [3], [0]])
y_p = ops.convert_to_tensor_v2_with_dispatch([[0], [0], [0]])
sw = ops.convert_to_tensor_v2_with_dispatch([[1], [0], [1]])
metric_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metric_container.metrics, 2)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
weighted_mae_metric = metric_container.metrics[1]
self.assertEqual(weighted_mae_metric.name, 'weighted_mae')
self.assertEqual(weighted_mae_metric.result().numpy(), 0.)
def test_broadcast_metrics_to_dict(self):
metric_container = compile_utils.MetricsContainer(metrics=['mae'])
y_p = {'output': ops.convert_to_tensor_v2_with_dispatch([[0], [1], [2]])}
y_t = {'output': ops.convert_to_tensor_v2_with_dispatch([[1], [2], [3]])}
metric_container.update_state(y_t, y_p)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
def test_broadcast_metrics_to_dict_with_output_names(self):
metric_container = compile_utils.MetricsContainer(
metrics=['mae'], output_names=['output'])
y_p = ops.convert_to_tensor_v2_with_dispatch([[0], [1], [2]])
y_t = {'output': ops.convert_to_tensor_v2_with_dispatch([[1], [2], [3]])}
metric_container.update_state(y_t, y_p)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
def test_missing_label_with_no_metrics(self):
# It's ok to exclude a label if that label has no
# losses or metrics associated with it.
metric_container = compile_utils.MetricsContainer(metrics={
'output1': 'mae',
'output3': 'mse'
})
y_p = {
'output1': ops.convert_to_tensor_v2_with_dispatch([[0], [1], [2]]),
'output2': ops.convert_to_tensor_v2_with_dispatch([[3], [4], [5]]),
'output3': ops.convert_to_tensor_v2_with_dispatch([[6], [7], [8]])
}
y_t = {
'output1': ops.convert_to_tensor_v2_with_dispatch([[1], [2], [3]]),
'output3': ops.convert_to_tensor_v2_with_dispatch([[4], [5], [6]])
}
metric_container.update_state(y_t, y_p)
self.assertLen(metric_container.metrics, 2)
mae_metric = metric_container.metrics[0]
self.assertEqual(mae_metric.name, 'output1_mae')
self.assertEqual(mae_metric.result().numpy(), 1.)
mse_metric = metric_container.metrics[1]
self.assertEqual(mse_metric.name, 'output3_mse')
self.assertEqual(mse_metric.result().numpy(), 4.)
def test_metrics_masking(self):
metrics_container = compile_utils.MetricsContainer(
metrics=['mae'], weighted_metrics=['mse'])
y_p = constant_op.constant([[[1], [1]], [[0], [0]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
y_p._keras_mask = constant_op.constant([[1, 1], [0, 0]],
dtype=dtypes.float32)
metrics_container.update_state(y_t, y_p)
self.assertLen(metrics_container.metrics, 2)
mae_metric = metrics_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertAlmostEqual(mae_metric.result().numpy(), 0)
weighted_mae_metric = metrics_container.metrics[1]
self.assertEqual(weighted_mae_metric.name, 'mse')
self.assertAlmostEqual(weighted_mae_metric.result().numpy(), 0)
def test_metrics_sample_weight(self):
metrics_container = compile_utils.MetricsContainer(
metrics=['mae'], weighted_metrics=['mse'])
y_p = constant_op.constant([[[1], [1]], [[0], [1]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
sw = constant_op.constant([[.2, .3], [.5, 0]], dtype=dtypes.float32)
metrics_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metrics_container.metrics, 2)
mae_metric = metrics_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertAlmostEqual(mae_metric.result().numpy(), .25) # 1 / 4
weighted_mae_metric = metrics_container.metrics[1]
self.assertEqual(weighted_mae_metric.name, 'mse')
self.assertAlmostEqual(weighted_mae_metric.result().numpy(), .5) # .5 / 1
def test_metrics_masking_sample_weight(self):
metrics_container = compile_utils.MetricsContainer(
metrics=['mae'], weighted_metrics=['mse'])
y_p = constant_op.constant([[[1], [1]], [[0], [1]]], dtype=dtypes.float32)
y_t = constant_op.constant([[[1], [1]], [[1], [1]]], dtype=dtypes.float32)
sw = constant_op.constant([[.3, .2], [.2, .3]], dtype=dtypes.float32)
y_p._keras_mask = constant_op.constant([[1, 0], [1, 0]],
dtype=dtypes.float32)
metrics_container.update_state(y_t, y_p, sample_weight=sw)
self.assertLen(metrics_container.metrics, 2)
mae_metric = metrics_container.metrics[0]
self.assertEqual(mae_metric.name, 'mae')
self.assertAlmostEqual(mae_metric.result().numpy(), .5) # 1 / .5
weighted_mae_metric = metrics_container.metrics[1]
self.assertEqual(weighted_mae_metric.name, 'mse')
self.assertAlmostEqual(weighted_mae_metric.result().numpy(), .2 / .5)
def test_loss_class_as_metric_with_distribution(self):
distribution = one_device_strategy.OneDeviceStrategy('/device:CPU:0')
with distribution.scope():
metric_container = compile_utils.MetricsContainer(
losses_mod.MeanSquaredError())
y_t, y_p = array_ops.ones((10, 5)), array_ops.zeros((10, 5))
metric_container.update_state(y_t, y_p)
self.assertLen(metric_container.metrics, 1)
metric = metric_container.metrics[0]
self.assertEqual(metric.name, 'mean_squared_error')
self.assertEqual(metric.result().numpy(), 1.)
def test_custom_metric_callables(self):
def custom_metric_fn(y_true, y_pred):
return math_ops.reduce_sum(y_true - y_pred)
class CustomMetricClass(object):
def __call__(self, y_true, y_pred):
return math_ops.reduce_sum(y_true - y_pred)
metric_container = compile_utils.MetricsContainer(
[custom_metric_fn, CustomMetricClass()])
y_t, y_p = array_ops.ones((10, 5)), array_ops.zeros((10, 5))
metric_container.update_state(y_t, y_p)
self.assertEqual(metric_container.metrics[0].name, 'custom_metric_fn')
self.assertEqual(metric_container.metrics[1].name, 'custom_metric_class')
if __name__ == '__main__':
ops.enable_eager_execution()
test.main()