MultiProcessRunner: Update run_contained to not return _ProcessStatusInfo in finally block; if sys.exit() is called in a subprocess, a SystemExit is raised, in which case we do not expect a _ProcessStatusInfo to be returned.

Minor change of logic in join().

PiperOrigin-RevId: 318565799
Change-Id: I9c468bd801515cd53f08c06b28e97638a735cd8b
This commit is contained in:
Rick Chao 2020-06-26 16:37:33 -07:00 committed by TensorFlower Gardener
parent 61a6e22f5f
commit 355b4f2fc5
2 changed files with 51 additions and 26 deletions

View File

@ -487,12 +487,6 @@ class MultiProcessRunner(object):
logging.info('%s-%d exit code: %s', task_type, task_id, p.exitcode)
process_statuses = self._queue_to_list(self._process_status_queue)
if not self._all_forced_terminated and len(
process_statuses) != self._outstanding_subprocess_count:
raise UnexpectedSubprocessExitError(
'Missing status(es) from %d subprocess(es). See logs for details.' %
(self._outstanding_subprocess_count - len(process_statuses)),
self._get_mpr_result(process_statuses))
for process_status in process_statuses:
assert isinstance(process_status, _ProcessStatusInfo)
if not process_status.is_successful:
@ -500,12 +494,12 @@ class MultiProcessRunner(object):
# Checking all the processes that are expected to exit properly.
for (task_type, task_id), p in self._processes.items():
if self._dependence_on_chief and task_type != 'chief':
if self._dependence_on_chief and chief and task_type != 'chief':
# If _dependence_on_chief, other processes may have been
# forced-terminated, which is expected.
continue
# Successfully exiting process has exit code 0.
if p.exitcode > 0:
if p.exitcode is None or p.exitcode > 0:
raise UnexpectedSubprocessExitError(
'Subprocess %s-%d exited with exit code %d. See logs for details.' %
(task_type, task_id, p.exitcode),
@ -844,17 +838,24 @@ def _run_contained(proc_func, args, kwargs):
Returns:
a _ProcessStatusInfo.
"""
is_successful = False
return_value = None
exc_info = None
try:
return_value = proc_func(*args, **kwargs)
is_successful = True
exc_info = None
return _ProcessStatusInfo(
is_successful=is_successful,
exc_info=exc_info,
return_value=return_value)
# If `proc_func` ends up exiting with `sys.exit()`, the `SystemExit` is not
# handled here.
except Exception: # pylint: disable=broad-except
return_value = None
is_successful = False
exc_info = sys.exc_info()
finally:
return _ProcessStatusInfo( # pylint: disable=lost-exception
return _ProcessStatusInfo(
is_successful=is_successful,
exc_info=exc_info,
return_value=return_value)

View File

@ -156,7 +156,11 @@ class MultiProcessRunnerTest(test.TestCase):
mpr.start()
time.sleep(5)
mpr.terminate('worker', 0)
std_stream_results = mpr.join().stdout
with self.assertRaises(
multi_process_runner.UnexpectedSubprocessExitError) as cm:
mpr.join()
std_stream_results = cm.exception.mpr_result.stdout
# Worker 0 is terminated in the middle, so it should not have iteration 9
# printed.
@ -327,7 +331,7 @@ class MultiProcessRunnerTest(test.TestCase):
proc_func_expected_to_seg_fault,
multi_worker_test_base.create_cluster_spec(num_workers=1),
list_stdout=True)
self.assertIn('Missing status(es) from 1 subprocess(es).',
self.assertIn('Subprocess worker-0 exited with exit code',
str(cm.exception))
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
@ -351,20 +355,40 @@ class MultiProcessRunnerTest(test.TestCase):
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
def test_non_zero_exit_code_raises_error(self):
def test_exit_code_is_reported_by_chief_subprocess(self):
def proc_func_expected_to_exit_with_1():
sys.exit(1)
def proc_func_expected_to_exit_with_20():
if multi_worker_test_base.get_task_type() == 'worker':
time.sleep(10000)
sys.exit(20)
mpr = multi_process_runner.MultiProcessRunner(
proc_func_expected_to_exit_with_20,
multi_worker_test_base.create_cluster_spec(
has_chief=True, num_workers=1))
mpr.start()
with self.assertRaisesRegex(
multi_process_runner.UnexpectedSubprocessExitError,
'Subprocess chief-0 exited with exit code 20'):
mpr.join()
def test_exit_code_is_reported_by_subprocess(self):
def proc_func_expected_to_exit_with_10():
sys.exit(10)
mpr = multi_process_runner.MultiProcessRunner(
proc_func_expected_to_exit_with_10,
multi_worker_test_base.create_cluster_spec(num_workers=1))
mpr.start()
with self.assertRaisesRegex(
multi_process_runner.UnexpectedSubprocessExitError,
'Subprocess worker-0 exited with exit code 10'):
mpr.join()
with self.assertRaises(
multi_process_runner.UnexpectedSubprocessExitError) as cm:
multi_process_runner.run(
proc_func_expected_to_exit_with_1,
multi_worker_test_base.create_cluster_spec(num_workers=1))
self.assertIn('Missing status(es) from 1 subprocess(es).',
str(cm.exception))
class MultiProcessPoolRunnerTest(test.TestCase):
def test_same_process_across_runs(self):