TF Internal API: tf_export a few distribute-related symbols:
tf.__internal__.distribute.multi_process_runner.get_barrier
tf.__internal__.distribute.multi_process_runner.create_cluster_spec
tf.__internal__.distribute.multi_process_runner.run
tf.__internal__.distribute.multi_process_runner.test_main
tf.__internal__.distribute.multi_process_runner.NotInitializedError
tf.__internal__.distribute.multi_process_runner.SubprocessTimeoutError
tf.__internal__.distribute.multi_process_runner.UnexpectedSubprocessExitError

Also build multi_process_runner as part of py/tensorflow.

PiperOrigin-RevId: 335951713
Change-Id: Icc4bc95140ff35ede4214752cb618644c2073176
parent 24d00f249d
commit 4c276ba9a2
@@ -263,6 +263,8 @@ py_library(
    ],
    deps = [
        ":no_contrib",
        "//tensorflow/python/distribute:multi_process_runner",
        "//tensorflow/python/distribute:multi_worker_test_base",
    ],
)
@@ -142,6 +142,8 @@ py_library(
        ":cross_device_ops",
        ":distribute_lib",
        ":mirrored_strategy",
        ":multi_process_runner",
        ":multi_worker_test_base",
        ":one_device_strategy",
        ":sharded_variable",
        "//tensorflow/python/distribute/client",
@@ -39,6 +39,7 @@ from tensorflow.python import tf2
from tensorflow.python.compat import v2_compat
from tensorflow.python.distribute import multi_process_lib
from tensorflow.python.eager import context
from tensorflow.python.util.tf_export import tf_export

multiprocessing = multi_process_lib.multiprocessing

@@ -136,36 +137,38 @@ class MultiProcessRunner(object):
               auto_restart=False,
               args=None,
               kwargs=None):
    """Creates a multi-process runner.
    """Instantiation of a `MultiProcessRunner`.

    Args:
      fn: Function to be run on child processes. This will be run on
        processes for all task types.
      cluster_spec: Dict for cluster spec. The following is an example of
        cluster with three workers and two ps's.
      fn: Function to be run on child processes. This will be run on processes
        for all task types.
      cluster_spec: Dict for cluster spec. The utility function
        `tf.__internal__.distribute.multi_process_runner.create_cluster_spec`
        can be conveniently used to create such dict. The following is an
        example of cluster with three workers and two ps's.
        {"worker": ["worker0.example.com:2222",
                    "worker1.example.com:2222",
                    "worker2.example.com:2222"],
         "ps": ["ps0.example.com:2222",
                "ps1.example.com:2222"]}
      rpc_layer: RPC layer to use. Default value is 'grpc'.
      max_run_time: If set, child processes is forced to exit at approximately
        this many seconds after `start` is called. We achieve this through
        `signal.alarm()` api. Note that this is best effort at Python level
        since Python signal handler does not get executed when it runs lower
        level C/C++ code. So it can be delayed for arbitrarily long time.
        If any of the child process is still running when `max_run_time` is up,
        they will be force-terminated and a `UnexpectedSubprocessExitError`
        may be raised at `join()`.
      max_run_time: `None` or integer. If not `None`, child processes are forced
        to exit at approximately this many seconds after this utility is called.
        We achieve this through `signal.alarm()` api. Note that this is best
        effort at Python level since Python signal handler does not get executed
        when it runs lower level C/C++ code. So it can be delayed for
        arbitrarily long time. If any of the child process is still running when
        `max_run_time` is up, they will be force-terminated and an
        `UnexpectedSubprocessExitError` may be raised. If `None`, child
        processes are not forced to exit.
      grpc_fail_fast: Whether GRPC connection between processes should fail
        without retrying. Defaults to None, in which case the environment
        variable is not explicitly set.
      stream_output: True if the output/error from the subprocesses should be
        streamed to be printed in parent process' log. Defaults to True.
      return_output: True if the output/error from the subprocesses should be
        collected to be attached to the resulting `MultiProcessRunnerResult`
        returned from `MultiProcessRunner.join()`. If True, the list of stdout
        can be retrieved via `MultiProcessRunnerResult.stdout` attribute.
      return_output: If True, the output/error from the subprocesses should be
        collected to be attached to the resulting namedtuple returned from
        `join()`. The list of output can be retrieved via `stdout` attribute.
        Defaults to False.
      use_dill_for_args: Whether to use dill to pickle `args` and `kwargs`. dill
        can pickle more objects, but doesn't work with types in
@@ -176,23 +179,24 @@ class MultiProcessRunner(object):
        exits with a zero exit code.
      auto_restart: Whether to automatically restart processes that exit with
        non-zero exit code.
      args: Positional arguments to be sent to functions run on processes.
      kwargs: Keyword arguments to be sent to functions run on processes.
      args: Positional arguments to be sent to `fn` run on subprocesses.
      kwargs: Keyword arguments to be sent to `fn` run on subprocesses.

    Raises:
      RuntimeError: if `multi_process_runner.test_main()` is not called.
      ValueError: if there are more than one chief in the `cluster_spec`.
    """

    assert cluster_spec is not None
    if 'chief' in cluster_spec and len(cluster_spec['chief']) > 1:
      raise ValueError('If chief exists in the cluster, there must be at most '
                       'one chief. Current `cluster_spec` has {} chiefs.'
                       .format(len(cluster_spec['chief'])))
    if not multi_process_lib.initialized():
      raise MultiProcessRunnerNotInitializedError(
      raise NotInitializedError(
          '`multi_process_runner` is not initialized. '
          'Please call `multi_process_runner.test_main()` '
          'within `if __name__ == \'__main__\':` block '
          'Please call `tf.__internal__.distribute.multi_process_runner.'
          'test_main()` within `if __name__ == \'__main__\':` block '
          'in your python module to properly initialize '
          '`multi_process_runner`.')
    if not callable(fn):
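For orientation, here is a minimal sketch of driving the runner with the arguments documented above. The `start()`/`join()` flow is only referenced by this docstring, so treat the exact call sequence as illustrative rather than definitive:

```python
# Sketch only: argument names follow the docstring above; the cluster spec
# helper is the one exercised by the tests later in this change.
from tensorflow.python.distribute import multi_process_runner
from tensorflow.python.distribute import multi_worker_test_base


def fn():
  # Runs once in every subprocess, i.e. once per task in the cluster spec.
  return 'done'


runner = multi_process_runner.MultiProcessRunner(
    fn,
    multi_worker_test_base.create_cluster_spec(has_chief=True, num_workers=2),
    return_output=True)  # collect subprocess output for the result
runner.start()
mpr_result = runner.join()
```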
@@ -569,11 +573,11 @@ class MultiProcessRunner(object):
        times out.

    Returns:
      A MultiProcessRunnerResult object, which has two attributes,
      `return_value` and `stdout`. `return_value` always contains the return
      values from the subprocesses. If `return_output` argument is True at
      `__init__`, `stdout` is available that contains a list of all messages
      from subprocesses' stdout and stderr.
      A `MultiProcessRunnerResult` object, which has two attributes,
      `return_value` and `stdout`. `return_value` always contains a list of
      return values from the subprocesses, although the order is not meaningful.
      If `return_output` argument is True at `__init__`, `stdout` is available
      that contains a list of all messages from subprocesses' stdout and stderr.

    Raises:
      SubprocessTimeoutError: if not all processes report status approximately
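Continuing the sketch above, the returned object would be consumed roughly like this (attribute names are the ones documented in this `Returns` section):

```python
# `return_value` holds one entry per subprocess `fn` return; the docstring
# notes the order is not meaningful.
print(mpr_result.return_value)

# `stdout` is only populated when the runner was constructed with
# `return_output=True`.
for line in mpr_result.stdout:
  print(line)
```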
@@ -1058,12 +1062,17 @@ def _run_contained(task_type, task_id, fn, args, kwargs):
        return_value=return_value)


@tf_export('__internal__.distribute.multi_process_runner'
           '.SubprocessTimeoutError',
           v1=[])
class SubprocessTimeoutError(RuntimeError):
  """An error that indicates there is at least one subprocess timing out.

  When this is raised, a `MultiProcessRunnerResult` object can be retrieved by
  `SubprocessTimeoutError`'s mpr_result attribute. See
  `MultiProcessRunner.join()` for more information.
  When this is raised, a namedtuple object representing the multi-process run
  result can be retrieved by
  `tf.__internal__.distribute.multi_process_runner.SubprocessTimeoutError`'s
  `mpr_result` attribute. See
  `tf.__internal__.distribute.multi_process_runner.run` for more information.
  """

  def __init__(self, msg, mpr_result):
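A hedged sketch of recovering the partial result on timeout, based only on the `mpr_result` attribute described above and the `timeout` argument documented on `run` below:

```python
from tensorflow.python.distribute import multi_process_runner
from tensorflow.python.distribute import multi_worker_test_base


def fn():
  return 'ok'


try:
  mpr_result = multi_process_runner.run(
      fn,
      multi_worker_test_base.create_cluster_spec(num_workers=2),
      timeout=60)
except multi_process_runner.SubprocessTimeoutError as e:
  # The partial run result is still attached to the exception.
  print('Timed out; partial return values:', e.mpr_result.return_value)
```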
@@ -1071,12 +1080,18 @@ class SubprocessTimeoutError(RuntimeError):
    self.mpr_result = mpr_result


@tf_export('__internal__.distribute.multi_process_runner'
           '.UnexpectedSubprocessExitError',
           v1=[])
class UnexpectedSubprocessExitError(RuntimeError):
  """An error indicating there is at least one subprocess with unexpected exit.

  When this is raised, a `MultiProcessRunnerResult` object can be retrieved by
  `UnexpectedSubprocessExitError`'s mpr_result attribute. See
  `MultiProcessRunner.join()` for more information.
  When this is raised, a namedtuple object representing the multi-process run
  result can be retrieved by
  `tf.__internal__.distribute.multi_process_runner
  .UnexpectedSubprocessExitError`'s
  `mpr_result` attribute. See
  `tf.__internal__.distribute.multi_process_runner.run` for more information.
  """

  def __init__(self, msg, mpr_result):
@@ -1084,12 +1099,15 @@ class UnexpectedSubprocessExitError(RuntimeError):
    self.mpr_result = mpr_result


class MultiProcessRunnerNotInitializedError(RuntimeError):
  """An error indicating `MultiProcessRunner` is used without initialization.
@tf_export(
    '__internal__.distribute.multi_process_runner.NotInitializedError', v1=[])
class NotInitializedError(RuntimeError):
  """An error indicating `multi_process_runner.run` is used without init.

  When this is raised, user is supposed to call
  `multi_process_runner.test_main()` within `if __name__ == '__main__':` block
  to properly initialize `multi_process_runner`.
  `tf.__internal__.distribute.multi_process_runner.test_main()` within
  `if __name__ == '__main__':` block to properly initialize
  `multi_process_runner.run`.
  """
  pass

@@ -1108,32 +1126,176 @@ def _set_tf_config(task_type, task_id, cluster_spec, rpc_layer=None):
  os.environ['TF_CONFIG'] = json.dumps(tf_config_dict)

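For reference, the `TF_CONFIG` written above for each task has roughly this shape. The exact dict assembled by `_set_tf_config` is not shown in this hunk, so the keys below are an assumption based on the conventional `TF_CONFIG` layout:

```python
# Illustrative only: one worker task out of the example cluster spec used in
# the docstrings of this file.
import json
import os

cluster_spec = {'worker': ['worker0.example.com:2222',
                           'worker1.example.com:2222']}
tf_config_dict = {
    'cluster': cluster_spec,
    'task': {'type': 'worker', 'index': 0},
    'rpc_layer': 'grpc',
}
os.environ['TF_CONFIG'] = json.dumps(tf_config_dict)
```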
@tf_export('__internal__.distribute.multi_process_runner.run', v1=[])
def run(fn,
        cluster_spec,
        rpc_layer=None,
        max_run_time=None,
        grpc_fail_fast=None,
        stream_output=True,
        return_output=False,
        timeout=_DEFAULT_TIMEOUT_SEC,
        args=None,
        kwargs=None):  # pylint: disable=g-doc-args
  """Runs functions in local child processes.
        kwargs=None):
  """Run `fn` in multiple processes according to `cluster_spec`.

  It is a convenience method that creates a `MultiProcessRunner` object and
  invokes `start` and `join` method. Please see these methods for detailed
  documentations.
  Given a callable `fn`, `tf.__internal__.distribute.multi_process_runner.run`
  launches multiple processes, each of which runs `fn`. These processes are
  referred to as "subprocesses" or "child processes". Each of those subprocesses
  will have their `TF_CONFIG` environment variable set, according to
  `cluster_spec` and their task types. The stdout of the subprocesses are
  streamed to the main process' and thus available in logs (if `stream_output`
  is True), with [type-id] prefix.

  `tf.__internal__.distribute.multi_process_runner.run` will block until all
  subprocesses have successfully exited, and return a namedtuple object that
  represents the run result. This object has a `return_value` attribute, which
  is a list that contains subprocesses `fn`'s return values, for those
  subprocesses that successfully returned from `fn`. The order of `return_value`
  list is not meaningful. If an optional arg `return_output` (default to False)
  is set to True, the namedtuple object will have an additional attribute
  `stdout`, which is a list containing the stdout of the subprocesses. If any
  subprocess' `fn` ends up raising an error, that error will be reraised from
  `tf.__internal__.distribute.multi_process_runner.run`, and the aforementioned
  namedtuple object will be available through the exception's
  `mpr_result` attribute.

  This utility is used for simulating running TensorFlow programs across
  multiple task types, and each of the task type may contain more than one task
  (except for "chief" where more than one task is prohibited). Test coverage of
  multi-worker training is the main application of this utility, where code
  written for multi-worker training can be realistically covered in unit tests.

  Any test module that uses
  `tf.__internal__.distribute.multi_process_runner.run()` must call
  `tf.__internal__.distribute.multi_process_runner.test_main()` instead of
  regular `test.main()` inside `if __name__ == '__main__':` block for proper
  initialization.

  Args:
    fn: Function to be run on child processes. This will be run on processes for
      all task types.
    cluster_spec: Dict for cluster spec. The utility function
      `tf.__internal__.distribute.multi_process_runner.create_cluster_spec` can
      be conveniently used to create such dict. The following is an example of
      cluster with three workers and two ps's.
      {"worker": ["worker0.example.com:2222",
                  "worker1.example.com:2222",
                  "worker2.example.com:2222"],
       "ps": ["ps0.example.com:2222",
              "ps1.example.com:2222"]}
    rpc_layer: RPC layer to use. Default value is 'grpc'.
    max_run_time: `None` or integer. If not `None`, child processes are forced
      to exit at approximately this many seconds after this utility is called.
      We achieve this through `signal.alarm()` api. Note that this is best
      effort at Python level since Python signal handler does not get executed
      when it runs lower level C/C++ code. So it can be delayed for arbitrarily
      long time. If any of the child process is still running when
      `max_run_time` is up, they will be force-terminated and an
      `tf.__internal__.distribute.multi_process_runner
      .UnexpectedSubprocessExitError`
      may be raised. If `None`, child processes are not forced to exit.
    return_output: If True, the output/error from the subprocesses should be
      collected to be attached to the resulting namedtuple returned from this
      utility. The list of output can be retrieved via `stdout` attribute.
      Defaults to False.
    timeout: optional integer or `None`. If provided as an integer, and not all
      processes report status within roughly `timeout` seconds, a
      `tf.__internal__.distribute.multi_process_runner.SubprocessTimeoutError`
      exception will be raised. If `None`,
      `tf.__internal__.distribute.multi_process_runner.run` never times out.
      Defaults to the constant `_DEFAULT_TIMEOUT_SEC` defined in
      `multi_process_runner` module.
    args: Positional arguments to be sent to `fn` run on subprocesses.
    kwargs: Keyword arguments to be sent to `fn` run on subprocesses.

  Returns:
    A MultiProcessRunnerResult object returned from `MultiProcessRunner.join()`.
    A namedtuple object, which has two attributes,
    `return_value` and `stdout`. `return_value` always contains a list of
    return values from the subprocesses, although the order is not meaningful.
    If `return_output` argument is True, `stdout` is available that contains a
    list of all messages from subprocesses' stdout and stderr, and the order
    is mostly chronological.

  Raises:
    RuntimeError: if
      `tf.__internal__.distribute.multi_process_runner.test_main()` is
      not called in test's `if __name__ == '__main__':` block.
    ValueError: if there are more than one chief in the `cluster_spec`.
    tf.__internal__.distribute.multi_process_runner.SubprocessTimeoutError: if
      not all processes report status approximately
      within `timeout` seconds. When this is raised, a
      namedtuple object can be retrieved by
      `tf.__internal__.distribute.multi_process_runner.SubprocessTimeoutError`'s
      `mpr_result` attribute, which has the same
      structure as above 'Returns' section describes.
    tf.__internal__.distribute.multi_process_runner
    .UnexpectedSubprocessExitError:
      If any of the subprocesses did not exit
      properly (for example, they exit on SIGTERM or SIGKILL signal). When
      this is raised, a namedtuple object can be retrieved by
      `tf.__internal__.distribute.multi_process_runner
      .UnexpectedSubprocessExitError`'s
      `mpr_result` attribute, which has the
      same structure as above 'Returns' section describes. If `max_run_time`
      is not `None`, it is expected that some subprocesses may be
      force-killed when `max_run_time` is up, and this is raised in those
      cases.
    Exception: if there is an Exception propagated from any subprocess. When
      this is raised, a namedtuple object can be retrieved by
      `tf.__internal__.distribute.multi_process_runner
      .UnexpectedSubprocessExitError`
      `mpr_result` attribute, which has the
      same structure as above 'Returns' section describes.

  Examples:

  ```python
  class SimpleMultiProcessTest(tf.test.TestCase):

    def test_simple_printing_and_return(self):

      def fn():
        resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

        # This will print "[chief-0]: Task type: chief , task id: 0"
        # for chief, for example.
        logging.info('Task type: %s, task id: %d',
                     resolver.task_type, resolver.task_id)

        return resolver.task_type

      result = tf.__internal__.distribute.multi_process_runner.run(
          fn=fn,
          cluster_spec=(
              tf.__internal__
              .distribute.multi_process_runner.create_cluster_spec(
                  has_chief=True, num_workers=2)))
      assert sorted(result.return_value) == ['chief', 'worker', 'worker']

    def test_error_from_fn(self):

      def fn():
        resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
        raise ValueError('Task type {}, task id {} is errors out'.format(
            resolver.task_type, resolver.task_id))

      with self.assertRaisesRegexp(ValueError,
                                   'Task type worker, task id 0 is errors out'):
        cluster_spec = (
            tf.__internal__.distribute.multi_process_runner.create_cluster_spec(
                num_workers=1))
        tf.__internal__.distribute.multi_process_runner.run(
            fn=fn, cluster_spec=cluster_spec)


  if __name__ == '__main__':
    tf.__internal__.distribute.multi_process_runner.test_main()
  ```
  """
  runner = MultiProcessRunner(
      fn,
      cluster_spec,
      rpc_layer,
      max_run_time=max_run_time,
      grpc_fail_fast=grpc_fail_fast,
      stream_output=stream_output,
      return_output=return_output,
      args=args,
      kwargs=kwargs)
@@ -1145,7 +1307,44 @@ def run(fn,
_barrier = None


@tf_export('__internal__.distribute.multi_process_runner.get_barrier', v1=[])
def get_barrier():
  """Returns a `multiprocessing.Barrier` for `multi_process_runner.run`.

  `tf.__internal__.distribute.multi_process_runner.get_barrier()` returns
  a `multiprocessing.Barrier` object which can be used within `fn` of
  `tf.__internal__.distribute.multi_process_runner` to wait with
  `barrier.wait()` call until all other tasks have also reached the
  `barrier.wait()` call, before they can proceed individually.

  Note that all tasks (subprocesses) have to reach `barrier.wait()` call to
  proceed. Currently it is not supported to block on only a subset of tasks
  in the cluster.

  Example:
  ```python

  def fn():
    some_work_to_be_done_by_all_tasks()

    tf.__internal__.distribute.multi_process_runner.get_barrier().wait()

    # The barrier guarantees that at this point, all tasks have finished
    # `some_work_to_be_done_by_all_tasks()`
    some_other_work_to_be_done_by_all_tasks()

  result = tf.__internal__.distribute.multi_process_runner.run(
      fn=fn,
      cluster_spec=(
          tf.__internal__
          .distribute.multi_process_runner.create_cluster_spec(
              num_workers=2)))
  ```


  Returns:
    A `multiprocessing.Barrier` for `multi_process_runner.run`.
  """
  if _barrier is None:
    raise ValueError(
        'barrier is not defined. It is likely because you are calling '
@@ -1190,6 +1389,27 @@ def manager():
  return _manager


@tf_export('__internal__.distribute.multi_process_runner.test_main', v1=[])
def test_main():
  """Main function to be called within `__main__` of a test file."""
  """Main function to be called within `__main__` of a test file.

  Any test module that uses
  `tf.__internal__.distribute.multi_process_runner.run()`
  must call this instead of regular `test.main()` inside
  `if __name__ == '__main__':` block, or an error will be raised when
  `tf.__internal__.distribute.multi_process_runner.run()` is used. This method
  takes
  care of needed initialization for launching multiple subprocesses.

  Example:
  ```python
  class MyTestClass(tf.test.TestCase):
    def testSomething(self):
      # Testing code making use of
      # `tf.__internal__.distribute.multi_process_runner.run()`.

  if __name__ == '__main__':
    tf.__internal__.distribute.multi_process_runner.test_main()
  ```
  """
  multi_process_lib.test_main()
@@ -30,9 +30,8 @@ class MultiProcessRunnerNoInitTest(test.TestCase):
    def simple_func():
      return 'foobar'

    with self.assertRaisesRegex(
        multi_process_runner.MultiProcessRunnerNotInitializedError,
        '`multi_process_runner` is not initialized.'):
    with self.assertRaisesRegex(multi_process_runner.NotInitializedError,
                                '`multi_process_runner` is not initialized.'):
      multi_process_runner.run(
          simple_func,
          multi_worker_test_base.create_cluster_spec(num_workers=1))
@@ -81,15 +81,15 @@ class MultiProcessRunnerTest(test.TestCase):
    mpr_result = multi_process_runner.run(
        fn_that_adds_task_type_in_return_data,
        multi_worker_test_base.create_cluster_spec(
            num_workers=2, num_ps=3, has_eval=1))
            num_workers=2, num_ps=3, has_chief=True))

    job_count_dict = {'worker': 2, 'ps': 3, 'evaluator': 1}
    job_count_dict = {'worker': 2, 'ps': 3, 'chief': 1}
    for data in mpr_result.return_value:
      job_count_dict[data] -= 1

    self.assertEqual(job_count_dict['worker'], 0)
    self.assertEqual(job_count_dict['ps'], 0)
    self.assertEqual(job_count_dict['evaluator'], 0)
    self.assertEqual(job_count_dict['chief'], 0)

  def test_multi_process_runner_error_propagates_from_subprocesses(self):
    runner = multi_process_runner.MultiProcessRunner(
@@ -210,7 +210,7 @@ class MultiProcessRunnerTest(test.TestCase):
    mpr = multi_process_runner.MultiProcessRunner(
        fn,
        multi_worker_test_base.create_cluster_spec(
            has_chief=True, num_workers=2, num_ps=2, has_eval=True),
            has_chief=True, num_workers=2, num_ps=2),
        return_output=True)
    mpr._dependence_on_chief = False

@@ -221,7 +221,7 @@ class MultiProcessRunnerTest(test.TestCase):

    list_to_assert = mpr_result.stdout

    for job in ['chief', 'evaluator']:
    for job in ['chief']:
      for iteration in range(5):
        self.assertTrue(
            any('(logging) {}-0, i: {}'.format(job, iteration) in line
@@ -32,7 +32,7 @@ import six
_portpicker_import_error = None
try:
  import portpicker  # pylint: disable=g-import-not-at-top
except ImportError as _error:  # pylint: disable=invalid-name
except (ImportError, ModuleNotFoundError) as _error:  # pylint: disable=invalid-name
  _portpicker_import_error = _error
  portpicker = None

@@ -56,6 +56,7 @@ from tensorflow.python.training import server_lib
from tensorflow.python.util import deprecation
from tensorflow.python.util import nest
from tensorflow.python.util.compat import collections_abc
from tensorflow.python.util.tf_export import tf_export


original_run_std_server = dc._run_std_server  # pylint: disable=protected-access
@@ -353,16 +354,53 @@ def create_multi_process_cluster(num_workers,
  return cluster


# TODO(rchao): Remove `test_obj` once estimator repo picks up the updated
# nightly TF.
@tf_export(
    '__internal__.distribute.multi_process_runner.create_cluster_spec', v1=[])
def create_cluster_spec(has_chief=False,
                        num_workers=1,
                        num_ps=0,
                        has_eval=False,
                        test_obj=None):
  """Create a cluster spec with tasks with unused local ports."""
  del test_obj
                        has_eval=False):
  """Create a cluster spec with tasks with unused local ports.

  This utility finds available ports at localhost, and returns a dict that
  represents the cluster spec that utilizes those ports, according to the
  arguments. The dict representing the cluster spec contains task types, and
  their instances' addresses. Note that this is usually only for testing purpose
  using multiple processes in the local machine, and should not be used for real
  multi-worker TensorFlow programs, where the addresses need to point to the
  processes at separate machines.

  This util is useful when creating the `cluster_spec` arg for
  `tf.__internal__.distribute.multi_process_runner.run`.

  Arguments:
    has_chief: Whether the generated cluster spec should contain "chief" task
      type.
    num_workers: Number of workers to use in the cluster spec.
    num_ps: Number of parameter servers to use in the cluster spec.
    has_eval: Whether this cluster spec has evaluator.

  Returns:
    A dict that represents the cluster spec using localhost ports for the tasks.

  Example:

  ```python
  cluster_spec =
  tf.__internal__.distribute.multi_process_runner.create_cluster_spec(
      has_chief=True, num_workers=2, num_ps=2)
  # An example of cluster_spec is
  # {'chief': ['localhost:23381'],
  #  'worker': ['localhost:19197', 'localhost:22903'],
  #  'ps': ['localhost:16912', 'localhost:21535']}

  cluster_spec =
  tf.__internal__.distribute.multi_process_runner.create_cluster_spec(
      has_chief=False, num_workers=0, num_ps=0, has_eval=True)
  # An example of cluster_spec is
  # {'evaluator': ['localhost:23381']}
  ```
  """
  if _portpicker_import_error:
    raise _portpicker_import_error  # pylint: disable=raising-bad-type
@@ -55,6 +55,8 @@ from tensorflow.python.util.tf_export import tf_export

# _internal APIs
from tensorflow.python.distribute.combinations import generate
from tensorflow.python.distribute.multi_process_runner import *
from tensorflow.python.distribute.multi_worker_test_base import *
from tensorflow.python.framework.combinations import *
from tensorflow.python.framework.composite_tensor import *
from tensorflow.python.framework.test_combinations import *
@@ -88,6 +88,8 @@ py_test(
        "//tensorflow/python:modules_with_exports",
        "//tensorflow/python:no_contrib",
        "//tensorflow/python/distribute:combinations",
        "//tensorflow/python/distribute:multi_process_runner",
        "//tensorflow/python/distribute:multi_worker_test_base",
        "//tensorflow/python/tools/api/generator:create_python_api",
    ],
)
@@ -8,6 +8,7 @@ TENSORFLOW_API_INIT_FILES = [
    "__internal__/decorator/__init__.py",
    "__internal__/distribute/__init__.py",
    "__internal__/distribute/combinations/__init__.py",
    "__internal__/distribute/multi_process_runner/__init__.py",
    "__internal__/test/__init__.py",
    "__internal__/test/combinations/__init__.py",
    "__internal__/tracking/__init__.py",
@@ -25,6 +25,8 @@ import sys
from tensorflow import python as _tf_for_api_traversal
from tensorflow.lite.python import lite as _tflite_for_api_traversal
from tensorflow.python import modules_with_exports
from tensorflow.python.distribute import multi_process_runner
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.framework import combinations
from tensorflow.python.framework import test_combinations
# pylint: enable=unused-import
@@ -0,0 +1,12 @@
path: "tensorflow.__internal__.distribute.multi_process_runner.NotInitializedError"
tf_class {
  is_instance: "<class \'tensorflow.python.distribute.multi_process_runner.NotInitializedError\'>"
  is_instance: "<type \'exceptions.RuntimeError\'>"
  member {
    name: "args"
    mtype: "<type \'getset_descriptor\'>"
  }
  member_method {
    name: "__init__"
  }
}
@@ -0,0 +1,13 @@
path: "tensorflow.__internal__.distribute.multi_process_runner.SubprocessTimeoutError"
tf_class {
  is_instance: "<class \'tensorflow.python.distribute.multi_process_runner.SubprocessTimeoutError\'>"
  is_instance: "<type \'exceptions.RuntimeError\'>"
  member {
    name: "args"
    mtype: "<type \'getset_descriptor\'>"
  }
  member_method {
    name: "__init__"
    argspec: "args=[\'self\', \'msg\', \'mpr_result\'], varargs=None, keywords=None, defaults=None"
  }
}
@@ -0,0 +1,13 @@
path: "tensorflow.__internal__.distribute.multi_process_runner.UnexpectedSubprocessExitError"
tf_class {
  is_instance: "<class \'tensorflow.python.distribute.multi_process_runner.UnexpectedSubprocessExitError\'>"
  is_instance: "<type \'exceptions.RuntimeError\'>"
  member {
    name: "args"
    mtype: "<type \'getset_descriptor\'>"
  }
  member_method {
    name: "__init__"
    argspec: "args=[\'self\', \'msg\', \'mpr_result\'], varargs=None, keywords=None, defaults=None"
  }
}
@@ -0,0 +1,31 @@
path: "tensorflow.__internal__.distribute.multi_process_runner"
tf_module {
  member {
    name: "NotInitializedError"
    mtype: "<type \'type\'>"
  }
  member {
    name: "SubprocessTimeoutError"
    mtype: "<type \'type\'>"
  }
  member {
    name: "UnexpectedSubprocessExitError"
    mtype: "<type \'type\'>"
  }
  member_method {
    name: "create_cluster_spec"
    argspec: "args=[\'has_chief\', \'num_workers\', \'num_ps\', \'has_eval\'], varargs=None, keywords=None, defaults=[\'False\', \'1\', \'0\', \'False\'], "
  }
  member_method {
    name: "get_barrier"
    argspec: "args=[], varargs=None, keywords=None, defaults=None"
  }
  member_method {
    name: "run"
    argspec: "args=[\'fn\', \'cluster_spec\', \'rpc_layer\', \'max_run_time\', \'return_output\', \'timeout\', \'args\', \'kwargs\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'False\', \'200\', \'None\', \'None\'], "
  }
  member_method {
    name: "test_main"
    argspec: "args=[], varargs=None, keywords=None, defaults=None"
  }
}
@@ -4,4 +4,8 @@ tf_module {
    name: "combinations"
    mtype: "<type \'module\'>"
  }
  member {
    name: "multi_process_runner"
    mtype: "<type \'module\'>"
  }
}