diff --git a/tensorflow/python/distribute/multi_process_runner.py b/tensorflow/python/distribute/multi_process_runner.py index 7f653c0e2de..e5be4fa4a14 100644 --- a/tensorflow/python/distribute/multi_process_runner.py +++ b/tensorflow/python/distribute/multi_process_runner.py @@ -205,7 +205,7 @@ class MultiProcessRunner(object): self._outstanding_subprocess_count = 0 self._reading_threads = [] - self._manager = multiprocessing.Manager() + self._manager = manager() self._process_status_queue = self._manager.Queue() self._parent_to_sub_queue = self._manager.Queue() parties = sum(len(addresses) for addresses in self._cluster_spec.values()) @@ -568,29 +568,6 @@ class MultiProcessRunner(object): task_type, task_id) self._all_forced_terminated = True - def get_manager(self): - """Returns the multiprocessing manager object for concurrency tools. - - The manager object is useful as it controls a server process that holds - the python objects that can be shared across processes. This can be used - for parent-subprocess communication: - - ```python - mpr = multi_process_runner.MultiProcessRunner(...) - manager = mpr.get_manager() - some_event_happening_in_subprocess = manager.Event() - mpr.set_args(args=(some_event_happening_in_subprocess,)) - mpr.start() - some_event_happening_in_subprocess.wait() - # Do something that only should after some event happens in subprocess. - ``` - - Note that the user of multi_process_runner should not create additional - `multiprocessing.Manager()` objects; doing so can result in segfault in - some cases. - """ - return self._manager - class _Process(multi_process_lib.Process): """A modified `multiprocessing.Process` that can set up environment variables.""" @@ -991,6 +968,41 @@ def barrier(): return _barrier +_manager = None +_manager_lock = threading.Lock() + + +def manager(): + """Returns the multiprocessing manager object for concurrency tools. + + The manager object is useful as it controls a server process that holds + the python objects that can be shared across processes. This can be used + for parent-subprocess communication: + + ```python + manager = multi_process_runner.manager() + some_event_happening_in_subprocess = manager.Event() + mpr = multi_process_runner.MultiProcessRunner(proc_func, cluster_spec, + args=(some_event_happening_in_subprocess,)) + mpr.start() + some_event_happening_in_subprocess.wait() + # Do something that only should after some event happens in subprocess. + ``` + + Note that the user of multi_process_runner should not create additional + `multiprocessing.Manager()` objects; doing so can result in segfault in + some cases. + + This method should only be called after multi_process_runner.test_main() is + called. + """ + global _manager + with _manager_lock: + if _manager is None: + _manager = multiprocessing.Manager() + return _manager + + def test_main(): """Main function to be called within `__main__` of a test file.""" multi_process_lib.test_main()