From 355b4f2fc5b7e732cc982102cd6b0c715e60fa7c Mon Sep 17 00:00:00 2001 From: Rick Chao Date: Fri, 26 Jun 2020 16:37:33 -0700 Subject: [PATCH] MultiProcessRunner: Update run_contained to not return _ProcessStatusInfo in finally block; if sys.exit() is called in a subprocess, a SystemExit is raised, in which case we do not expect a _ProcessStatusInfo to be returned. Minor change of logic in join(). PiperOrigin-RevId: 318565799 Change-Id: I9c468bd801515cd53f08c06b28e97638a735cd8b --- .../python/distribute/multi_process_runner.py | 27 +++++----- .../distribute/multi_process_runner_test.py | 50 ++++++++++++++----- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/tensorflow/python/distribute/multi_process_runner.py b/tensorflow/python/distribute/multi_process_runner.py index 68a407d2b22..89162b50f4b 100644 --- a/tensorflow/python/distribute/multi_process_runner.py +++ b/tensorflow/python/distribute/multi_process_runner.py @@ -487,12 +487,6 @@ class MultiProcessRunner(object): logging.info('%s-%d exit code: %s', task_type, task_id, p.exitcode) process_statuses = self._queue_to_list(self._process_status_queue) - if not self._all_forced_terminated and len( - process_statuses) != self._outstanding_subprocess_count: - raise UnexpectedSubprocessExitError( - 'Missing status(es) from %d subprocess(es). See logs for details.' % - (self._outstanding_subprocess_count - len(process_statuses)), - self._get_mpr_result(process_statuses)) for process_status in process_statuses: assert isinstance(process_status, _ProcessStatusInfo) if not process_status.is_successful: @@ -500,12 +494,12 @@ class MultiProcessRunner(object): # Checking all the processes that are expected to exit properly. for (task_type, task_id), p in self._processes.items(): - if self._dependence_on_chief and task_type != 'chief': + if self._dependence_on_chief and chief and task_type != 'chief': # If _dependence_on_chief, other processes may have been # forced-terminated, which is expected. continue # Successfully exiting process has exit code 0. - if p.exitcode > 0: + if p.exitcode is None or p.exitcode > 0: raise UnexpectedSubprocessExitError( 'Subprocess %s-%d exited with exit code %d. See logs for details.' % (task_type, task_id, p.exitcode), @@ -844,17 +838,24 @@ def _run_contained(proc_func, args, kwargs): Returns: a _ProcessStatusInfo. + """ + is_successful = False + return_value = None + exc_info = None try: return_value = proc_func(*args, **kwargs) is_successful = True - exc_info = None + return _ProcessStatusInfo( + is_successful=is_successful, + exc_info=exc_info, + return_value=return_value) + + # If `proc_func` ends up exiting with `sys.exit()`, the `SystemExit` is not + # handled here. except Exception: # pylint: disable=broad-except - return_value = None - is_successful = False exc_info = sys.exc_info() - finally: - return _ProcessStatusInfo( # pylint: disable=lost-exception + return _ProcessStatusInfo( is_successful=is_successful, exc_info=exc_info, return_value=return_value) diff --git a/tensorflow/python/distribute/multi_process_runner_test.py b/tensorflow/python/distribute/multi_process_runner_test.py index acec6d0c999..a6219dc5322 100644 --- a/tensorflow/python/distribute/multi_process_runner_test.py +++ b/tensorflow/python/distribute/multi_process_runner_test.py @@ -156,7 +156,11 @@ class MultiProcessRunnerTest(test.TestCase): mpr.start() time.sleep(5) mpr.terminate('worker', 0) - std_stream_results = mpr.join().stdout + with self.assertRaises( + multi_process_runner.UnexpectedSubprocessExitError) as cm: + mpr.join() + + std_stream_results = cm.exception.mpr_result.stdout # Worker 0 is terminated in the middle, so it should not have iteration 9 # printed. @@ -327,7 +331,7 @@ class MultiProcessRunnerTest(test.TestCase): proc_func_expected_to_seg_fault, multi_worker_test_base.create_cluster_spec(num_workers=1), list_stdout=True) - self.assertIn('Missing status(es) from 1 subprocess(es).', + self.assertIn('Subprocess worker-0 exited with exit code', str(cm.exception)) list_to_assert = cm.exception.mpr_result.stdout self.assertTrue(any('SIGSEGV' in line for line in list_to_assert)) @@ -351,20 +355,40 @@ class MultiProcessRunnerTest(test.TestCase): list_to_assert = cm.exception.mpr_result.stdout self.assertTrue(any('SIGSEGV' in line for line in list_to_assert)) - def test_non_zero_exit_code_raises_error(self): + def test_exit_code_is_reported_by_chief_subprocess(self): - def proc_func_expected_to_exit_with_1(): - sys.exit(1) + def proc_func_expected_to_exit_with_20(): + if multi_worker_test_base.get_task_type() == 'worker': + time.sleep(10000) + sys.exit(20) + + mpr = multi_process_runner.MultiProcessRunner( + proc_func_expected_to_exit_with_20, + multi_worker_test_base.create_cluster_spec( + has_chief=True, num_workers=1)) + mpr.start() + + with self.assertRaisesRegex( + multi_process_runner.UnexpectedSubprocessExitError, + 'Subprocess chief-0 exited with exit code 20'): + mpr.join() + + def test_exit_code_is_reported_by_subprocess(self): + + def proc_func_expected_to_exit_with_10(): + sys.exit(10) + + mpr = multi_process_runner.MultiProcessRunner( + proc_func_expected_to_exit_with_10, + multi_worker_test_base.create_cluster_spec(num_workers=1)) + mpr.start() + + with self.assertRaisesRegex( + multi_process_runner.UnexpectedSubprocessExitError, + 'Subprocess worker-0 exited with exit code 10'): + mpr.join() - with self.assertRaises( - multi_process_runner.UnexpectedSubprocessExitError) as cm: - multi_process_runner.run( - proc_func_expected_to_exit_with_1, - multi_worker_test_base.create_cluster_spec(num_workers=1)) - self.assertIn('Missing status(es) from 1 subprocess(es).', - str(cm.exception)) - class MultiProcessPoolRunnerTest(test.TestCase): def test_same_process_across_runs(self):