Add a module level multi_process_runner.manager()

It's more convienent since we often need to create objects then passing them to
create MultiProcessRunner.

PiperOrigin-RevId: 323948350
Change-Id: Iaa22f03e7564e5e53b4a58e119f2e81e05f21851
This commit is contained in:
Ran Chen 2020-07-29 23:58:39 -07:00 committed by TensorFlower Gardener
parent f636ca42bc
commit 55c87a582d

View File

@ -205,7 +205,7 @@ class MultiProcessRunner(object):
self._outstanding_subprocess_count = 0
self._reading_threads = []
self._manager = multiprocessing.Manager()
self._manager = manager()
self._process_status_queue = self._manager.Queue()
self._parent_to_sub_queue = self._manager.Queue()
parties = sum(len(addresses) for addresses in self._cluster_spec.values())
@ -568,29 +568,6 @@ class MultiProcessRunner(object):
task_type, task_id)
self._all_forced_terminated = True
def get_manager(self):
"""Returns the multiprocessing manager object for concurrency tools.
The manager object is useful as it controls a server process that holds
the python objects that can be shared across processes. This can be used
for parent-subprocess communication:
```python
mpr = multi_process_runner.MultiProcessRunner(...)
manager = mpr.get_manager()
some_event_happening_in_subprocess = manager.Event()
mpr.set_args(args=(some_event_happening_in_subprocess,))
mpr.start()
some_event_happening_in_subprocess.wait()
# Do something that only should after some event happens in subprocess.
```
Note that the user of multi_process_runner should not create additional
`multiprocessing.Manager()` objects; doing so can result in segfault in
some cases.
"""
return self._manager
class _Process(multi_process_lib.Process):
"""A modified `multiprocessing.Process` that can set up environment variables."""
@ -991,6 +968,41 @@ def barrier():
return _barrier
_manager = None
_manager_lock = threading.Lock()
def manager():
"""Returns the multiprocessing manager object for concurrency tools.
The manager object is useful as it controls a server process that holds
the python objects that can be shared across processes. This can be used
for parent-subprocess communication:
```python
manager = multi_process_runner.manager()
some_event_happening_in_subprocess = manager.Event()
mpr = multi_process_runner.MultiProcessRunner(proc_func, cluster_spec,
args=(some_event_happening_in_subprocess,))
mpr.start()
some_event_happening_in_subprocess.wait()
# Do something that only should after some event happens in subprocess.
```
Note that the user of multi_process_runner should not create additional
`multiprocessing.Manager()` objects; doing so can result in segfault in
some cases.
This method should only be called after multi_process_runner.test_main() is
called.
"""
global _manager
with _manager_lock:
if _manager is None:
_manager = multiprocessing.Manager()
return _manager
def test_main():
"""Main function to be called within `__main__` of a test file."""
multi_process_lib.test_main()