From 17fb3fcf7801df02c5d7696fb08e1dd55c9d0fe1 Mon Sep 17 00:00:00 2001 From: Rick Chao Date: Mon, 22 Jun 2020 14:21:25 -0700 Subject: [PATCH] MultiProcessRunner: Add UnexpectedSubprocessExitError to be raised if the exit code from a subprocess in unexpected. This results in subprocess having segfault failing the test, which would not have before this change. PiperOrigin-RevId: 317734369 Change-Id: I008f9bd4dd5b7a68e8e4a7f17b7e5d490a30c09a --- .../python/distribute/multi_process_runner.py | 38 +++++++++++++++++-- .../distribute/multi_process_runner_test.py | 35 +++++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/tensorflow/python/distribute/multi_process_runner.py b/tensorflow/python/distribute/multi_process_runner.py index 8699e59b410..af527b67b4b 100644 --- a/tensorflow/python/distribute/multi_process_runner.py +++ b/tensorflow/python/distribute/multi_process_runner.py @@ -463,14 +463,28 @@ class MultiProcessRunner(object): process_statuses = self._queue_to_list(self._process_status_queue) if not self._all_forced_terminated and len( process_statuses) != self._outstanding_subprocess_count: - raise RuntimeError( - 'missing statuses from %d subproceses.' % - (self._outstanding_subprocess_count - len(process_statuses))) + raise UnexpectedSubprocessExitError( + 'Missing status(es) from %d subprocess(es). See logs for details.' % + (self._outstanding_subprocess_count - len(process_statuses)), + self._get_mpr_result(process_statuses)) for process_status in process_statuses: assert isinstance(process_status, _ProcessStatusInfo) if not process_status.is_successful: six.reraise(*process_status.exc_info) + # Checking all the processes that are expected to exit properly. + for (task_type, task_id), p in self._processes.items(): + if self._dependence_on_chief and task_type != 'chief': + # If _dependence_on_chief, other processes may have been + # forced-terminated, which is expected. + continue + # Successfully exiting process has exit code 0. + if p.exitcode > 0: + raise UnexpectedSubprocessExitError( + 'Subprocess %s-%d exited with exit code %d. See logs for details.' % + (task_type, task_id, p.exitcode), + self._get_mpr_result(process_statuses)) + logging.info('Joining log reading threads.') for thread in self._reading_threads: thread.join() @@ -506,6 +520,8 @@ class MultiProcessRunner(object): for (task_type, task_id), p in self._processes.items(): try: os.kill(p.pid, sig) + logging.info('%s-%d terminated with signal %r.', task_type, task_id, + sig) except ProcessLookupError: logging.info('Attempting to kill %s-%d but it does not exist.', task_type, task_id) @@ -658,6 +674,9 @@ class _ProcFunc(object): self._resources.process_status_queue.put(info) self._close_streaming() + # Exit with code 0 as it's considered successful exit at this point. + sys.exit(0) + class SubprocessTimeoutError(RuntimeError): """An error that indicates there is at least one subprocess timing out. @@ -672,6 +691,19 @@ class SubprocessTimeoutError(RuntimeError): self.mpr_result = mpr_result +class UnexpectedSubprocessExitError(RuntimeError): + """An error indicating there is at least one subprocess with unexpected exit. + + When this is raised, a `MultiProcessRunnerResult` object can be retrieved by + `UnexpectedSubprocessExitError`'s mpr_result attribute. See + `MultiProcessRunner.join()` for more information. + """ + + def __init__(self, msg, mpr_result): + super(UnexpectedSubprocessExitError, self).__init__(msg) + self.mpr_result = mpr_result + + def _set_tf_config(task_type, task_id, cluster_spec, rpc_layer=None): """Set TF_CONFIG environment variable.""" tf_config_dict = { diff --git a/tensorflow/python/distribute/multi_process_runner_test.py b/tensorflow/python/distribute/multi_process_runner_test.py index aeba43b6b7c..6194ac527d5 100644 --- a/tensorflow/python/distribute/multi_process_runner_test.py +++ b/tensorflow/python/distribute/multi_process_runner_test.py @@ -18,6 +18,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import ctypes import json import os import threading @@ -324,6 +325,40 @@ class MultiProcessRunnerTest(test.TestCase): any('(logging) {}-0, i: {}'.format(job, iteration) in line for line in list_to_assert)) + def test_seg_fault_raises_error(self): + + def proc_func_expected_to_seg_fault(): + ctypes.string_at(0) # Intentionally made seg fault. + + with self.assertRaises( + multi_process_runner.UnexpectedSubprocessExitError) as cm: + multi_process_runner.run( + proc_func_expected_to_seg_fault, + multi_worker_test_base.create_cluster_spec(num_workers=1), + list_stdout=True) + self.assertIn('Missing status(es) from 1 subprocess(es).', + str(cm.exception)) + list_to_assert = cm.exception.mpr_result.stdout + self.assertTrue(any('SIGSEGV' in line for line in list_to_assert)) + + def test_seg_fault_in_chief_raises_error(self): + + def proc_func_expected_to_seg_fault(): + if multi_worker_test_base.get_task_type() == 'worker': + time.sleep(10000) + ctypes.string_at(0) # Intentionally made seg fault. + + with self.assertRaises( + multi_process_runner.UnexpectedSubprocessExitError) as cm: + multi_process_runner.run( + proc_func_expected_to_seg_fault, + multi_worker_test_base.create_cluster_spec( + has_chief=True, num_workers=1), + list_stdout=True) + self.assertIn('Subprocess chief-0 exited with exit code', + str(cm.exception)) + list_to_assert = cm.exception.mpr_result.stdout + self.assertTrue(any('SIGSEGV' in line for line in list_to_assert)) if __name__ == '__main__': multi_process_runner.test_main()