Clean up duplicated tests since they are covered in keras_test.py

PiperOrigin-RevId: 226877947
This commit is contained in:
Sourabh Bajaj 2018-12-25 21:06:17 -08:00 committed by TensorFlower Gardener
parent 432f4e9068
commit c343196842


@@ -17,7 +17,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from absl.testing import parameterized
import numpy as np
@@ -27,20 +26,12 @@ from tensorflow.contrib.distribute.python import tpu_strategy
from tensorflow.python import keras
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import values
from tensorflow.python.eager import test
from tensorflow.python.estimator import keras as keras_lib
from tensorflow.python.estimator import run_config as run_config_lib
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import random_seed
from tensorflow.python.framework import test_util
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import distributed_training_utils
from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_keras
from tensorflow.python.ops.parsing_ops import gen_parsing_ops
from tensorflow.python.platform import gfile
from tensorflow.python.summary.writer import writer_cache
from tensorflow.python.training import gradient_descent
from tensorflow.python.training import rmsprop
@@ -359,298 +350,9 @@ def strategy_for_numpy_input_combinations():
      mode=['graph'])


class TestEstimatorDistributionStrategy(test_util.TensorFlowTestCase,
                                        parameterized.TestCase):

  def setUp(self):
    self._base_dir = os.path.join(self.get_temp_dir(),
                                  'keras_mirrored_strategy_test')
    gfile.MakeDirs(self._base_dir)
    self._config = run_config_lib.RunConfig(
        tf_random_seed=_RANDOM_SEED, model_dir=self._base_dir)

  def tearDown(self):
    writer_cache.FileWriterCache.clear()
    if os.path.isdir(self._base_dir):
      gfile.DeleteRecursively(self._base_dir)

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.mirrored_strategy_with_two_gpus,
          combinations.core_mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_two_gpus],
      mode=['graph']))
  def test_train_functional_with_distribution_strategy(self, distribution):
    keras_model = simple_functional_model()
    keras_model.compile(
        loss='categorical_crossentropy',
        metrics=[keras.metrics.CategoricalAccuracy()],
        optimizer=rmsprop.RMSPropOptimizer(learning_rate=0.01))
    config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
                                      model_dir=self._base_dir,
                                      train_distribute=distribution,
                                      eval_distribute=distribution)
    with self.cached_session():
      est_keras = keras_lib.model_to_estimator(
          keras_model=keras_model, config=config)
      before_eval_results = est_keras.evaluate(
          input_fn=get_ds_test_input_fn, steps=1)
      est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)
      after_eval_results = est_keras.evaluate(input_fn=get_ds_test_input_fn,
                                              steps=1)
      self.assertLess(after_eval_results['loss'], before_eval_results['loss'])

    writer_cache.FileWriterCache.clear()
    gfile.DeleteRecursively(self._config.model_dir)

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.mirrored_strategy_with_two_gpus,
          combinations.core_mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_two_gpus],
      mode=['graph']))
  def test_train_sequential_with_distribution_strategy(self, distribution):
    keras_model = simple_sequential_model()
    keras_model.compile(
        loss='categorical_crossentropy',
        metrics=[keras.metrics.CategoricalAccuracy()],
        optimizer=rmsprop.RMSPropOptimizer(learning_rate=0.01))
    config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
                                      model_dir=self._base_dir,
                                      train_distribute=distribution)
    with self.cached_session():
      est_keras = keras_lib.model_to_estimator(
          keras_model=keras_model, config=config)
      before_eval_results = est_keras.evaluate(
          input_fn=get_ds_test_input_fn, steps=1)
      est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)
      after_eval_results = est_keras.evaluate(input_fn=get_ds_test_input_fn,
                                              steps=1)
      self.assertLess(after_eval_results['loss'], before_eval_results['loss'])

    writer_cache.FileWriterCache.clear()
    gfile.DeleteRecursively(self._config.model_dir)

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_gpu_and_cpu],
      mode=['graph']))
  def test_multi_inputs_multi_outputs_with_input_fn_as_dict(self, distribution):
    train_data, test_data = get_multi_inputs_multi_outputs_data()

    def train_input_fn():
      input_dict = {
          'input_a': train_data['input_a'],
          'input_b': train_data['input_b'],
          'input_m': train_data['input_m'].astype(np.str)
      }
      output_dict = {
          'dense_2': train_data['output_c'],
          'dense_3': train_data['output_d']
      }
      return dataset_ops.Dataset.from_tensor_slices((input_dict,
                                                     output_dict)).batch(16)

    def eval_input_fn():
      input_dict = {
          'input_a': test_data['input_a'],
          'input_b': test_data['input_b'],
          'input_m': test_data['input_m'].astype(np.str)
      }
      output_dict = {
          'dense_2': test_data['output_c'],
          'dense_3': test_data['output_d']
      }
      return dataset_ops.Dataset.from_tensor_slices((input_dict,
                                                     output_dict)).batch(16)

    self.do_test_multi_inputs_multi_outputs_with_input_fn(
        distribution, train_input_fn, eval_input_fn)

  def do_test_multi_inputs_multi_outputs_with_input_fn(
      self, distribution, train_input_fn, eval_input_fn):
    config = run_config_lib.RunConfig(
        tf_random_seed=_RANDOM_SEED,
        model_dir=self._base_dir,
        train_distribute=distribution)
    with self.cached_session():
      model = multi_inputs_multi_outputs_model()
      est_keras = keras_lib.model_to_estimator(keras_model=model, config=config)
      baseline_eval_results = est_keras.evaluate(
          input_fn=eval_input_fn, steps=1)
      est_keras.train(input_fn=train_input_fn, steps=_TRAIN_SIZE / 16)
      eval_results = est_keras.evaluate(input_fn=eval_input_fn, steps=1)
      self.assertLess(eval_results['loss'], baseline_eval_results['loss'])

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_gpu_and_cpu],
      mode=['graph']))
  def test_keras_optimizer_with_distribution_strategy(self, distribution):
    keras_model = simple_sequential_model()
    keras_model.compile(
        loss='categorical_crossentropy',
        optimizer=keras.optimizers.rmsprop(lr=0.01))
    config = run_config_lib.RunConfig(tf_random_seed=_RANDOM_SEED,
                                      model_dir=self._base_dir,
                                      train_distribute=distribution)
    with self.cached_session():
      est_keras = keras_lib.model_to_estimator(keras_model=keras_model,
                                               config=config)
      with self.assertRaisesRegexp(ValueError,
                                   'Only TensorFlow native optimizers are '
                                   'supported with DistributionStrategy.'):
        est_keras.train(input_fn=get_ds_train_input_fn, steps=_TRAIN_SIZE / 16)

    writer_cache.FileWriterCache.clear()
    gfile.DeleteRecursively(self._config.model_dir)

class TestDistributionStrategyWithNumpyArrays(test.TestCase,
                                              parameterized.TestCase):

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_creating_var_with_numpy_arrays(self, distribution):
    with self.cached_session():
      x = np.asarray(np.random.random((64, 3)), dtype=np.float32)
      var_x = distributed_training_utils.get_var_for_numpy(distribution, x)
      val = self.evaluate(var_x.value())
      # Verify that the numpy value is copied to the variable.
      self.assertAllEqual(x, val)

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_calculating_input_params_no_steps_no_batch_size(self, distribution):
    # Calculate the per_replica_batch_size scaling factor for strategies
    # that use per_core_batch_size
    replica_scale_factor = 1.0
    if not distributed_training_utils.global_batch_size_supported(distribution):
      replica_scale_factor = distribution.num_replicas_in_sync
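    # The factor is applied in the assertions below: strategies that take a
    # per-replica batch size report the expected global values divided by the
    # number of replicas in sync.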
    with self.cached_session():
      # Input samples of different sizes
      input_20_samples = np.zeros((20, 3), dtype=np.float32)
      input_63_samples = np.zeros((63, 3), dtype=np.float32)
      input_64_samples = np.zeros((64, 3), dtype=np.float32)

      # Default global batch size 32 for input with 64 samples run in 2 steps
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=None, batch_size=None)
      self.assertEqual(batch_size, 32 // replica_scale_factor)
      self.assertEqual(steps, 2)
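      # That is, 64 samples at the default global batch size of 32 give
      # 64 / 32 = 2 steps.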

      # Computed global batch size 20 is lower than 32 if we pass fewer samples.
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_20_samples, steps=None, batch_size=None)
      self.assertEqual(batch_size, 20 // replica_scale_factor)
      self.assertEqual(steps, 1)

      # Default global batch size 32 cannot be used with 63 samples.
      with self.assertRaisesRegexp(ValueError, 'not divisible by batch size'):
        distributed_training_utils.get_input_params(
            distribution, input_63_samples, steps=None, batch_size=None)

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_calculating_input_params_with_steps_no_batch_size(self,
                                                             distribution):
    # Calculate the per_replica_batch_size scaling factor for strategies
    # that use per_core_batch_size
    replica_scale_factor = 1.0
    if not distributed_training_utils.global_batch_size_supported(distribution):
      replica_scale_factor = distribution.num_replicas_in_sync

    with self.cached_session():
      # Input samples of different sizes
      input_63_samples = np.zeros((63, 3), dtype=np.float32)
      input_64_samples = np.zeros((64, 3), dtype=np.float32)

      # Computed global batch size is correct for the specified single step
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=1, batch_size=None)
      self.assertEqual(batch_size, 64 // replica_scale_factor)
      self.assertEqual(steps, 1)

      # Computed global batch size is correct for the specified 2 steps
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=2, batch_size=None)
      self.assertEqual(batch_size, 32 // replica_scale_factor)
      self.assertEqual(steps, 2)

      # Not all samples can be consumed in the specified number of steps
      with self.assertRaisesRegexp(ValueError, 'not divisible by steps'):
        distributed_training_utils.get_input_params(
            distribution, input_63_samples, steps=2, batch_size=None)

      # These cases differ between strategies because the supported batch
      # size is global for some strategies and per-replica for others.
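      # With 63 samples over 3 steps the computed global batch size is 21,
      # which only works when the batch size is global; 21 cannot be split
      # evenly across multiple replicas.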
      if replica_scale_factor == 1:
        # Computed global batch size is correct even if not shardable
        steps, batch_size = distributed_training_utils.get_input_params(
            distribution, input_63_samples, steps=3, batch_size=None)
        self.assertEqual(batch_size, 21)
        self.assertEqual(steps, 3)
      else:
        # Computed global batch size cannot be sharded across replicas
        with self.assertRaisesRegexp(ValueError, 'could not be sharded evenly '
                                     'across the sync replicas'):
          distributed_training_utils.get_input_params(
              distribution, input_63_samples, steps=1, batch_size=None)

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_calculating_input_params_no_steps_with_batch_size(self,
                                                             distribution):
    # Calculate the per_replica_batch_size scaling factor for strategies
    # that use per_core_batch_size
    replica_scale_factor = 1.0
    if not distributed_training_utils.global_batch_size_supported(distribution):
      replica_scale_factor = distribution.num_replicas_in_sync

    with self.cached_session():
      input_64_samples = np.zeros((64, 3), dtype=np.float32)

      # Computed steps is correct for the specified batch size
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=None, batch_size=16)
      self.assertEqual(batch_size, 16)
      self.assertEqual(steps, 4 // replica_scale_factor)
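      # batch_size here is treated as per replica for strategies without
      # global batch size support, so 64 samples cover
      # 64 / (16 * replicas) steps.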

      # Computed steps is correct for the specified batch size
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=None, batch_size=32)
      self.assertEqual(batch_size, 32)
      self.assertEqual(steps, 2 // replica_scale_factor)

      # Number of samples is not divisible by the global batch size
      with self.assertRaisesRegexp(ValueError, 'not divisible by batch size'):
        distributed_training_utils.get_input_params(
            distribution, input_64_samples, steps=None, batch_size=20)

      # Number of samples is not divisible by the global batch size
      with self.assertRaisesRegexp(ValueError, 'not divisible by batch size'):
        distributed_training_utils.get_input_params(
            distribution, input_64_samples, steps=None, batch_size=3)

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_calculating_input_params_with_steps_with_batch_size(self,
                                                               distribution):
    with self.cached_session():
      input_64_samples = np.zeros((64, 3), dtype=np.float32)

      # No change to steps and batch size if both are specified and feasible
      steps, batch_size = distributed_training_utils.get_input_params(
          distribution, input_64_samples, steps=5, batch_size=3)
      self.assertEqual(batch_size, 3)
      self.assertEqual(steps, 5)
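      # Five steps of batch size 3 need only 15 of the 64 samples, so both
      # values are accepted unchanged.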

      # Number of samples is less than global batch size * steps
      with self.assertRaisesRegexp(ValueError, 'less than samples required'):
        distributed_training_utils.get_input_params(
            distribution, input_64_samples, steps=10, batch_size=13)

  @combinations.generate(strategy_for_numpy_input_combinations())
  def test_calling_model_with_numpy_arrays(self, distribution):
    with self.cached_session():
@@ -1048,54 +750,6 @@ class TestDistributionStrategyWithDatasets(test.TestCase,
class TestDistributionStrategyErrorCases(test.TestCase, parameterized.TestCase):

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_gpu_and_cpu],
      mode=['graph', 'eager']))
  def test_validating_dataset_input_tensors_with_shape_mismatch(self,
                                                                 distribution):
    with self.cached_session():
      a = constant_op.constant([1, 2], shape=(1, 2))
      b = constant_op.constant([[1, 2], [1, 2]], shape=(2, 2))
      device_map = values.ReplicaDeviceMap(('/device:CPU:0', '/device:GPU:0'))
      x = values.DistributedValues(device_map, (a, b))
      y = values.DistributedValues(device_map, (a, a))
      with distribution.scope():
        # Removed device and input tensor shape details from the error message
        # since the order of the device and the corresponding input tensor shape
        # is not deterministic over different runs.
        with self.assertRaisesRegexp(ValueError,
                                     'Input tensor shapes do not match for '
                                     'distributed tensor inputs '
                                     'DistributedValues:.+'):
          distributed_training_utils.validate_distributed_dataset_inputs(
              distribution, x, y)

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,
          combinations.core_mirrored_strategy_with_gpu_and_cpu],
      mode=['graph', 'eager']))
  def test_validating_dataset_input_tensors_with_dtype_mismatch(self,
                                                                 distribution):
    with self.cached_session():
      a = constant_op.constant([1, 2], shape=(1, 2), dtype=dtypes.int32)
      b = constant_op.constant([1, 2], shape=(1, 2), dtype=dtypes.float64)
      device_map = values.ReplicaDeviceMap(('/device:CPU:0', '/device:GPU:0'))
      x = values.DistributedValues(device_map, (a, b))
      y = values.DistributedValues(device_map, (a, a))
      with distribution.scope():
        # Removed device and input tensor dtype details from the error message
        # since the order of the device and the corresponding input tensor dtype
        # is not deterministic over different runs.
        with self.assertRaisesRegexp(ValueError,
                                     'Input tensor dtypes do not match for '
                                     'distributed tensor inputs '
                                     'DistributedValues:.+'):
          distributed_training_utils.validate_distributed_dataset_inputs(
              distribution, x, y)

  @combinations.generate(combinations.combine(
      distribution=[
          combinations.mirrored_strategy_with_gpu_and_cpu,