diff --git a/tensorflow/python/distribute/BUILD b/tensorflow/python/distribute/BUILD
index 9c4da2becb6..ab50ec8810e 100644
--- a/tensorflow/python/distribute/BUILD
+++ b/tensorflow/python/distribute/BUILD
@@ -908,6 +908,9 @@ tf_py_test(
     name = "multi_worker_test_base_test",
     srcs = ["multi_worker_test_base_test.py"],
     srcs_version = "PY2AND3",
+    tags = [
+        "no_oss",  # TODO(b/170834611)
+    ],
     deps = [
         ":multi_worker_test_base",
     ],
diff --git a/tensorflow/python/distribute/integration_test/BUILD b/tensorflow/python/distribute/integration_test/BUILD
index d3a71e0136d..c7c70776826 100644
--- a/tensorflow/python/distribute/integration_test/BUILD
+++ b/tensorflow/python/distribute/integration_test/BUILD
@@ -33,6 +33,7 @@ cuda_py_test(
     shard_count = 2,
     tags = [
         "multi_and_single_gpu",
+        "no_oss",  # TODO(b/170838851): UnavailableError: Connection reset by peer
     ],
     deps = [
         "//tensorflow:tensorflow_py",
diff --git a/tensorflow/python/distribute/multi_process_lib.py b/tensorflow/python/distribute/multi_process_lib.py
index 89021448eb2..7cbfc2c7248 100644
--- a/tensorflow/python/distribute/multi_process_lib.py
+++ b/tensorflow/python/distribute/multi_process_lib.py
@@ -12,41 +12,144 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ==============================================================================
-"""OSS multi-process library to be implemented."""
+"""Library for multi-process testing."""
 
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-
-import multiprocessing as _multiprocessing
+import multiprocessing
 import os
+import sys
 import unittest
 
+from absl import app
+
 from tensorflow.python.eager import test
 
-try:
-  multiprocessing = _multiprocessing.get_context('forkserver')
-except ValueError:
-  # forkserver is not available on Windows.
-  multiprocessing = _multiprocessing.get_context('spawn')
-
-
-class Process(object):
-  """A process simulating a worker for testing multi-worker training."""
+
+class _AbslProcess:
+  """A process that runs using absl.app.run."""
 
   def __init__(self, *args, **kwargs):
-    del args, kwargs
-    raise unittest.SkipTest(
-        'TODO(b/150264776): Implement OSS version of `multi_process_lib`')
+    super(_AbslProcess, self).__init__(*args, **kwargs)
+    # Monkey-patch that is carried over into the spawned process by pickle.
+    self._run_impl = getattr(self, 'run')
+    self.run = self._run_with_absl
+
+  def _run_with_absl(self):
+    app.run(lambda _: self._run_impl())
+
+
+if sys.platform != 'win32':
+
+  class AbslForkServerProcess(_AbslProcess,
+                              multiprocessing.context.ForkServerProcess):
+    """An absl-compatible Forkserver process.
+
+    Note: Forkserver is not available on Windows.
+    """
+
+  class AbslForkServerContext(multiprocessing.context.ForkServerContext):
+    _name = 'absl_forkserver'
+    Process = AbslForkServerProcess  # pylint: disable=invalid-name
+
+  multiprocessing = AbslForkServerContext()
+  Process = multiprocessing.Process
+
+else:
+
+  class Process(object):
+    """A process that skips tests (until Windows is supported)."""
+
+    def __init__(self, *args, **kwargs):
+      del args, kwargs
+      raise unittest.SkipTest(
+          'TODO(b/150264776): Windows is not supported in MultiProcessRunner.')
+
+
+_test_main_called = False
+
+
+def _set_spawn_exe_path():
+  """Sets the path to the executable for spawned processes.
+
+  This utility searches for the binary the parent process is using, and sets
+  the executable of multiprocessing's context accordingly.
+
+  Raises:
+    RuntimeError: If the binary path cannot be determined.
+  """
+  # TODO(b/150264776): This does not work with Windows. Find a solution.
+  if sys.argv[0].endswith('.py'):
+    # If all we have is a python module path, we'll need to make a guess for
+    # the actual executable path. Since the binary may live under one of the
+    # parent directories of the python module, we guess by removing
+    # directories one at a time. E.g.,
+    # tensorflow/python/some/path/my_test.py
+    # -> tensorflow/python/some/path/my_test
+    # -> tensorflow/python/some/my_test
+    # -> tensorflow/python/my_test
+    path_to_use = None
+    guess_path = sys.argv[0][:-3]
+    guess_path = guess_path.split(os.sep)
+    for path_reduction in range(-1, -len(guess_path), -1):
+      possible_path = os.sep.join(guess_path[:path_reduction] +
+                                  [guess_path[-1]])
+      if os.access(possible_path, os.X_OK):
+        path_to_use = possible_path
+        break
+      # The binary can possibly have a _gpu suffix.
+      possible_path += '_gpu'
+      if os.access(possible_path, os.X_OK):
+        path_to_use = possible_path
+        break
+    if path_to_use is None:
+      raise RuntimeError('Cannot determine binary path')
+    sys.argv[0] = path_to_use
+  # Note that this sets the executable for *all* contexts.
+  multiprocessing.get_context().set_executable(sys.argv[0])
+
+
+def _if_spawn_run_and_exit():
+  """If spawned process, run requested spawn task and exit. Else a no-op."""
+
+  # The `multiprocessing` module passes a script "from multiprocessing.x
+  # import y" to the subprocess, followed by a main function call. We use
+  # this to tell if the process is spawned. Examples of x are "forkserver"
+  # or "semaphore_tracker".
+  is_spawned = ('-c' in sys.argv[1:] and
+                sys.argv[sys.argv.index('-c') +
+                         1].startswith('from multiprocessing.'))
+
+  if not is_spawned:
+    return
+  cmd = sys.argv[sys.argv.index('-c') + 1]
+  # As a subprocess, we disregard all other interpreter command line
+  # arguments.
+  sys.argv = sys.argv[0:1]
+  # Run the specified command - this is expected to be one of:
+  # 1. Spawn the process for the semaphore tracker.
+  # 2. Spawn the initial process for the forkserver.
+  # 3. Spawn any process as requested by the "spawn" method.
+  exec(cmd)  # pylint: disable=exec-used
+  sys.exit(0)  # The semaphore tracker doesn't explicitly sys.exit.
 
 
 def test_main():
   """Main function to be called within `__main__` of a test file."""
+  global _test_main_called
+  _test_main_called = True
+
+  os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
+
+  if sys.platform != 'win32':
+    _set_spawn_exe_path()
+  _if_spawn_run_and_exit()
+
+  # Only run test.main() if this is not a spawned process.
  test.main()
 
 
 def initialized():
   """Returns whether the module is initialized."""
-  return True
+  return _test_main_called
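
The `_AbslProcess` change above hinges on one detail: assigning a bound method to `self.run` shadows the class-level `run()` with an instance attribute, and that attribute is pickled along with the rest of the `Process` object when the child is spawned, so the wrapper executes on the child side. Below is a minimal sketch of the same pattern against the stock `spawn` context; it is illustrative only (the names `_WrappedProcess` and `_WrappedContext` are hypothetical, and the patch itself targets `ForkServerContext`):

```python
import multiprocessing


class _WrappedProcess(multiprocessing.context.SpawnProcess):
  """A Process whose run() is wrapped with child-side setup."""

  def __init__(self, *args, **kwargs):
    super(_WrappedProcess, self).__init__(*args, **kwargs)
    # Shadow the class-level run() with an instance attribute. Bound methods
    # pickle by reference to their instance, so the wrapper survives spawning.
    self._run_impl = self.run
    self.run = self._run_with_setup

  def _run_with_setup(self):
    # multi_process_lib invokes absl.app.run() at this point; any per-child
    # setup could go here.
    self._run_impl()


class _WrappedContext(multiprocessing.context.SpawnContext):
  _name = 'wrapped_spawn'
  Process = _WrappedProcess  # pylint: disable=invalid-name


if __name__ == '__main__':
  ctx = _WrappedContext()
  p = ctx.Process(target=print, args=('hello from the child',))
  p.start()
  p.join()
```

The patch additionally rebinds the module-level name `multiprocessing` to the custom context instance, so existing call sites in the file (e.g. `multiprocessing.Process`) transparently pick up the absl-aware process class.
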
diff --git a/tensorflow/python/distribute/multi_process_runner.py b/tensorflow/python/distribute/multi_process_runner.py
index 107aa1a6a48..4c72969c995 100644
--- a/tensorflow/python/distribute/multi_process_runner.py
+++ b/tensorflow/python/distribute/multi_process_runner.py
@@ -699,6 +699,8 @@ class MultiProcessRunner(object):
     sig = sig or getattr(signal, 'SIGKILL', signal.SIGTERM)
     for (task_type, task_id), p in self._processes.items():
       if p.exitcode is not None:
+        logging.info('%s-%d has already exited. Not terminating.', task_type,
+                     task_id)
         continue
       try:
         os.kill(p.pid, sig)
@@ -866,6 +868,11 @@ def _shutdown_all_pool_runners():
     pool.shutdown()
 
 
+def is_oss():
+  """Returns whether the test is run under OSS."""
+  return len(sys.argv) >= 1 and 'bazel' in sys.argv[0]
+
+
 class MultiProcessPoolRunner(object):
   """A utility class to start a process pool to simulate a cluster.
 
@@ -919,6 +926,9 @@ class MultiProcessPoolRunner(object):
     if dill is None:
      raise unittest.SkipTest(
          'TODO(b/150264776): Resolve dependency issue in CI')
+    if is_oss():
+      raise unittest.SkipTest(
+          'TODO(b/170360740): MultiProcessPoolRunner timing out in OSS')
 
     self._runner = MultiProcessRunner(
         fn=lambda: None,
diff --git a/tensorflow/python/distribute/multi_process_runner_test.py b/tensorflow/python/distribute/multi_process_runner_test.py
index c164bd490e3..5f017b97379 100644
--- a/tensorflow/python/distribute/multi_process_runner_test.py
+++ b/tensorflow/python/distribute/multi_process_runner_test.py
@@ -331,7 +331,8 @@ class MultiProcessRunnerTest(test.TestCase):
     self.assertIn('Subprocess worker-0 exited with exit code',
                   str(cm.exception))
     list_to_assert = cm.exception.mpr_result.stdout
-    self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
+    self.assertTrue(
+        any('Segmentation fault' in line for line in list_to_assert))
 
   def test_seg_fault_in_chief_raises_error(self):
 
@@ -350,7 +351,8 @@ class MultiProcessRunnerTest(test.TestCase):
     self.assertIn('Subprocess chief-0 exited with exit code',
                   str(cm.exception))
     list_to_assert = cm.exception.mpr_result.stdout
-    self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
+    self.assertTrue(
+        any('Segmentation fault' in line for line in list_to_assert))
 
   def test_exit_code_is_reported_by_chief_subprocess(self):
 
@@ -579,9 +581,13 @@ class MultiProcessPoolRunnerTest(test.TestCase):
     self.assertAllEqual(result, [1, 1])
 
   def test_global_pool(self):
+    if multi_process_runner.is_oss():
+      self.skipTest('TODO(b/170360740): Failing in OSS')
     _global_pool.run(fn_that_does_nothing)
 
   def test_nested_pool(self):
+    if multi_process_runner.is_oss():
+      self.skipTest('TODO(b/170360740): Failing in OSS')
 
     def fn():
       # This runs in sub processes, so they are each using their own
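
For orientation, the pieces above are consumed from a test file's `__main__` block: the new `is_oss()` helper gates OSS-hostile cases, and `multi_process_runner.test_main()` (which defers to `multi_process_lib.test_main()` from this patch) must be the entry point so that spawned children are intercepted by `_if_spawn_run_and_exit()` before `test.main()` would otherwise run the full suite in the child. A hedged sketch of a typical consumer, assuming the existing `multi_process_runner.run()` and `multi_worker_test_base.create_cluster_spec()` APIs; the test body and bug id are illustrative:

```python
from tensorflow.python.distribute import multi_process_runner
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.eager import test


class ExampleMultiWorkerTest(test.TestCase):  # hypothetical test case

  def test_runs_in_two_workers(self):
    if multi_process_runner.is_oss():
      self.skipTest('TODO(b/000000000): Failing in OSS')  # illustrative bug id

    def fn():
      return 42

    mpr_result = multi_process_runner.run(
        fn, multi_worker_test_base.create_cluster_spec(num_workers=2))
    # One return value per task in the cluster spec.
    self.assertAllEqual(mpr_result.return_value, [42, 42])


if __name__ == '__main__':
  multi_process_runner.test_main()
```
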
diff --git a/tensorflow/python/keras/distribute/collective_all_reduce_strategy_test.py b/tensorflow/python/keras/distribute/collective_all_reduce_strategy_test.py
index f22a954d78d..313cb14c78b 100644
--- a/tensorflow/python/keras/distribute/collective_all_reduce_strategy_test.py
+++ b/tensorflow/python/keras/distribute/collective_all_reduce_strategy_test.py
@@ -309,51 +309,53 @@ class LocalCollectiveAllReduceStrategy(
     with policy.policy_scope('mixed_float16'):
       self._test_mixed_precision(None, None, required_gpus)
 
+# TODO(b/170360740): Timeout in OSS
+if not multi_process_runner.is_oss():
 
-@ds_combinations.generate(
-    combinations.combine(
-        strategy=[
-            strategy_combinations.multi_worker_mirrored_2x1_cpu,
-            strategy_combinations.multi_worker_mirrored_2x1_gpu,
-        ],
-        mode=['eager']))
-class DistributedCollectiveAllReduceStrategyEagerTest(test.TestCase,
-                                                      parameterized.TestCase):
+  @ds_combinations.generate(
+      combinations.combine(
+          strategy=[
+              strategy_combinations.multi_worker_mirrored_2x1_cpu,
+              strategy_combinations.multi_worker_mirrored_2x1_gpu,
+          ],
+          mode=['eager']))
+  class DistributedCollectiveAllReduceStrategyEagerTest(test.TestCase,
+                                                        parameterized.TestCase):
 
-  def testFitWithoutStepsPerEpochPartialBatch(self, strategy):
+    def testFitWithoutStepsPerEpochPartialBatch(self, strategy):
 
-    def _model_fn():
-      x = layers.Input(shape=(1,), name='input')
-      y = layers.Dense(1, name='dense')(x)
-      model = training.Model(x, y)
-      return model
+      def _model_fn():
+        x = layers.Input(shape=(1,), name='input')
+        y = layers.Dense(1, name='dense')(x)
+        model = training.Model(x, y)
+        return model
 
-    def _get_dataset():
-      inputs = array_ops.expand_dims_v2(constant_op.constant(range(10)), axis=1)
-      targets = array_ops.expand_dims_v2(
-          constant_op.constant(range(10)), axis=1)
-      # Make global batch size 12 for 2 replicas and a non-repeated dataset with
-      # 10 elements so that we have partial batch
-      dataset = dataset_ops.Dataset.from_tensor_slices(
-          (inputs, targets)).batch(12, drop_remainder=False)
-      return dataset
+      def _get_dataset():
+        inputs = array_ops.expand_dims_v2(
+            constant_op.constant(range(10)), axis=1)
+        targets = array_ops.expand_dims_v2(
+            constant_op.constant(range(10)), axis=1)
+        # Make global batch size 12 for 2 replicas and a non-repeated dataset
+        # with 10 elements so that we have a partial batch
+        dataset = dataset_ops.Dataset.from_tensor_slices(
+            (inputs, targets)).batch(
+                12, drop_remainder=False)
+        return dataset
+
+      with strategy.scope():
+        optimizer_fn = gradient_descent_keras.SGD
+        optimizer = optimizer_fn(0.001)
+        model = _model_fn()
+        loss = 'mse'
+        metrics = ['mae']
+        model.compile(optimizer, loss, metrics=metrics)
+        dataset = _get_dataset()
+        kernel_before = model.get_weights()[0][0]
+        model.fit(dataset, epochs=10)
+        kernel_after = model.get_weights()[0][0]
+        self.assertNotEqual(kernel_before, kernel_after)
+        self.assertGreater(abs(kernel_before - 1), abs(kernel_after - 1))
 
-    with strategy.scope():
-      optimizer_fn = gradient_descent_keras.SGD
-      optimizer = optimizer_fn(0.001)
-      model = _model_fn()
-      loss = 'mse'
-      metrics = ['mae']
-      model.compile(
-          optimizer,
-          loss,
-          metrics=metrics)
-      dataset = _get_dataset()
-      kernel_before = model.get_weights()[0][0]
-      model.fit(dataset, epochs=10)
-      kernel_after = model.get_weights()[0][0]
-      self.assertNotEqual(kernel_before, kernel_after)
-      self.assertGreater(abs(kernel_before-1), abs(kernel_after-1))
 
 
 if __name__ == '__main__':
   v2_compat.enable_v2_behavior()
diff --git a/tensorflow/python/keras/distribute/multi_worker_callback_tf2_test.py b/tensorflow/python/keras/distribute/multi_worker_callback_tf2_test.py
index d051eb5932a..fcef87faa9c 100644
--- a/tensorflow/python/keras/distribute/multi_worker_callback_tf2_test.py
+++ b/tensorflow/python/keras/distribute/multi_worker_callback_tf2_test.py
@@ -183,6 +183,8 @@ class KerasCallbackMultiProcessTest(parameterized.TestCase, test.TestCase):
 
   def proc_model_checkpoint_works_with_same_file_path(test_obj,
                                                       saving_filepath):
+    if multi_process_runner.is_oss():
+      test_obj.skipTest('TODO(b/170838633): Failing in OSS')
     model, _, train_ds, steps = _model_setup(test_obj, file_format='')
     num_epoch = 4
 
diff --git a/tensorflow/python/kernel_tests/collective_ops_multi_worker_test.py b/tensorflow/python/kernel_tests/collective_ops_multi_worker_test.py
index 95143cc233a..47e296a384b 100644
--- a/tensorflow/python/kernel_tests/collective_ops_multi_worker_test.py
+++ b/tensorflow/python/kernel_tests/collective_ops_multi_worker_test.py
@@ -73,6 +73,9 @@ class CollectiveOpTest(test.TestCase):
 
   def testCheckHealthPeerDown(self):
 
+    if multi_process_runner.is_oss():
+      self.skipTest("TODO(b/170838845): Failing in OSS")
+
     def worker_fn():
       enable_collective_ops(cluster_resolver_lib.TFConfigClusterResolver())
       context.context().check_collective_ops_peer_health(
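
One structural choice above is worth calling out: in `collective_all_reduce_strategy_test.py` the whole decorated class is indented under `if not multi_process_runner.is_oss():` rather than skipped via `skipTest` inside the test method. A plausible reason (hedged; the patch itself only cites b/170360740) is that `@ds_combinations.generate` provisions the multi-worker strategies outside the test body, so only a definition-time guard avoids the OSS timeout entirely. The bare shape of that pattern, with hypothetical names:

```python
import sys
import unittest


def _in_unsupported_env():
  # Hypothetical stand-in for multi_process_runner.is_oss().
  return len(sys.argv) >= 1 and 'bazel' in sys.argv[0]


if not _in_unsupported_env():

  class ExpensiveFixtureTest(unittest.TestCase):
    """Defined only when the environment can support its fixture."""

    def test_trivially_passes(self):
      self.assertTrue(True)


if __name__ == '__main__':
  unittest.main()
```
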