STT-tensorflow/tensorflow/python/distribute/combinations_test.py
Ran Chen 64f4a59d5e Add multi worker mirrored strategy combinations to enable easier testing again
The previous was rolled back because it breaks windows builds.

The change adds has_chief and num_workers to NamedDistribution. If there're more than one workers (chief+workers), the distribute flavor of combinations library will run the test in multiple processes to simulate a multi worker setup. Users are required to call combinations.main() instead of test.main().

Note that it's the same as running the test concurrently in multiple processes. You're expected to program your test in the same as if you're writing a multi client program. There's no way to get the return value from all processes. If you need that, use multi_process_runner directly.

This is very slow at this moment. Do not use it for a large number of tests.

PiperOrigin-RevId: 305136390
Change-Id: I814323cccaee5b8b7eed3b4dfef2971ab9d09cb4
2020-04-06 15:59:19 -07:00

152 lines
6.2 KiB
Python

# Lint as: python3
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tensorflow.python.distribute.combinations."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
from absl.testing import parameterized
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute.cluster_resolver import tfconfig_cluster_resolver
from tensorflow.python.framework import combinations as framework_combinations
from tensorflow.python.platform import test
class ClusterParametersTest(test.TestCase, parameterized.TestCase):
# For this test we need to use `framework.test_combinations` because our
# `generate` eats the cluster parameters.
#
# Note that we don't have a standalone combination for ClusterParameters, so
# we should use GPUCombination which contains it.
@framework_combinations.generate(
framework_combinations.combine(distribution=[
combinations.NamedDistribution(
"HasClusterParams", lambda: None, has_chief=True, num_workers=2),
]),
test_combinations=(combinations.GPUCombination(),))
def testClusterParams(self, distribution, has_chief, num_workers):
self.assertTrue(has_chief)
self.assertEqual(num_workers, 2)
@framework_combinations.generate(
framework_combinations.combine(distribution=[
combinations.NamedDistribution("NoClusterParams", lambda: None),
]),
test_combinations=(combinations.GPUCombination(),))
def testClusterParamsHasDefault(self, distribution, has_chief, num_workers):
self.assertFalse(has_chief)
self.assertEqual(num_workers, 1)
@framework_combinations.generate(
framework_combinations.combine(v=1),
test_combinations=(combinations.GPUCombination(),))
def testClusterParamsNoStrategy(self, v, has_chief, num_workers):
self.assertFalse(has_chief)
self.assertEqual(num_workers, 1)
@framework_combinations.generate(
framework_combinations.combine(distribution=[
combinations.NamedDistribution(
"WithClusterParams", lambda: None, has_chief=True, num_workers=2),
combinations.NamedDistribution("WithoutClusterParams", lambda: None),
]),
test_combinations=(combinations.GPUCombination(),))
def testClusterParamsAreOptional(self, distribution):
# If combinations library doesn't raise an exception, the test is passed.
pass
@framework_combinations.generate(
framework_combinations.combine(
ds1=combinations.NamedDistribution(
"Strategy1", lambda: None, has_chief=True, num_workers=0),
ds2=combinations.NamedDistribution(
"Strategy2", lambda: None, has_chief=False, num_workers=1),
ds3=combinations.NamedDistribution(
"Strategy3", lambda: None, has_chief=True, num_workers=0),
),
test_combinations=(combinations.GPUCombination(),))
def testMultipleDistributionSingleWorker(self, ds1, ds2, ds3):
# If combinations library doesn't raise an exception, the test is passed.
pass
# unittest.expectedFailure doesn't work with parameterized test methods, so we
# have to decorate the class instead.
@unittest.expectedFailure
class ClusterParametersShouldFailTest(test.TestCase, parameterized.TestCase):
@framework_combinations.generate(
framework_combinations.combine(
ds1=combinations.NamedDistribution(
"Strategy1", lambda: None, has_chief=True, num_workers=2),
ds2=combinations.NamedDistribution(
"Strategy2", lambda: None, has_chief=True, num_workers=2),
),
test_combinations=(combinations.GPUCombination(),))
def testMultipleDistributionMultiWorker(self, ds1, ds2):
# combinations library should raise an exception.
pass
# Tests that we *actually* run the test method in multiple workers instead of
# just passing silently. More importantly, it verifies that the test can fail.
# Note that unittest.expectedFailure doesn't work with parameterized test
# methods, so we have to decorate the class instead.
@unittest.expectedFailure
class CombinationsExpectedFailureTest(test.TestCase, parameterized.TestCase):
@combinations.generate(
combinations.combine(distribution=[
combinations.NamedDistribution(
"OneChiefOneWorker", lambda: None, has_chief=True, num_workers=1),
combinations.NamedDistribution(
"TwoWorkers", lambda: None, has_chief=False, num_workers=2),
]))
def testMultiWorkerCanFail(self, distribution):
resolver = tfconfig_cluster_resolver.TFConfigClusterResolver()
# This should fail.
self.assertIsNone(resolver.task_id)
# Tests that we *actually* run the test method in multiple workers instead of
# just passing silently. More importantly, it verifies that the test can fail.
# Note that unittest.expectedFailure doesn't work with parameterized test
# methods, so we have to decorate the class instead.
@unittest.expectedFailure
@combinations.generate(
combinations.combine(distribution=[
combinations.NamedDistribution(
"OneChiefOneWorker", lambda: None, has_chief=True, num_workers=1),
combinations.NamedDistribution(
"TwoWorkers", lambda: None, has_chief=False, num_workers=2),
]))
class CombinationsOnClassMultiWorkerExpectedFailureTest(test.TestCase,
parameterized.TestCase):
def test(self, distribution):
resolver = tfconfig_cluster_resolver.TFConfigClusterResolver()
# This should fail.
self.assertIsNone(resolver.task_id)
if __name__ == "__main__":
combinations.main()