MultiProcessRunner: Open source multi_process_runner with a OSS backend.

Some tests are timing out and being disabled on tap. Cause TBD.

PiperOrigin-RevId: 337376027
Change-Id: Ia0e58be434ce59469498db3a24c3bd32cc17c023
This commit is contained in:
Rick Chao 2020-10-15 13:54:10 -07:00 committed by TensorFlower Gardener
parent 63c0cfbb3c
commit e70e880bd5
8 changed files with 188 additions and 58 deletions

View File

@ -908,6 +908,9 @@ tf_py_test(
name = "multi_worker_test_base_test",
srcs = ["multi_worker_test_base_test.py"],
srcs_version = "PY2AND3",
tags = [
"no_oss", # TODO(b/170834611)
],
deps = [
":multi_worker_test_base",
],

View File

@ -33,6 +33,7 @@ cuda_py_test(
shard_count = 2,
tags = [
"multi_and_single_gpu",
"no_oss", # TODO(b/170838851): UnavailableError: Connection reset by peer
],
deps = [
"//tensorflow:tensorflow_py",

View File

@ -12,41 +12,144 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""OSS multi-process library to be implemented."""
"""Library for multi-process testing."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing as _multiprocessing
import multiprocessing
import os
import sys
import unittest
from absl import app
from tensorflow.python.eager import test
try:
multiprocessing = _multiprocessing.get_context('forkserver')
except ValueError:
# forkserver is not available on Windows.
multiprocessing = _multiprocessing.get_context('spawn')
class Process(object):
"""A process simulating a worker for testing multi-worker training."""
class _AbslProcess:
"""A process that runs using absl.app.run."""
def __init__(self, *args, **kwargs):
del args, kwargs
raise unittest.SkipTest(
'TODO(b/150264776): Implement OSS version of `multi_process_lib`')
super(_AbslProcess, self).__init__(*args, **kwargs)
# Monkey-patch that is carried over into the spawned process by pickle.
self._run_impl = getattr(self, 'run')
self.run = self._run_with_absl
def _run_with_absl(self):
app.run(lambda _: self._run_impl())
if sys.platform != 'win32':
class AbslForkServerProcess(_AbslProcess,
multiprocessing.context.ForkServerProcess):
"""An absl-compatible Forkserver process.
Note: Forkserver is not available in windows.
"""
class AbslForkServerContext(multiprocessing.context.ForkServerContext):
_name = 'absl_forkserver'
Process = AbslForkServerProcess # pylint: disable=invalid-name
multiprocessing = AbslForkServerContext()
Process = multiprocessing.Process
else:
class Process(object):
"""A process that skips test (until windows is supported)."""
def __init__(self, *args, **kwargs):
del args, kwargs
raise unittest.SkipTest(
'TODO(b/150264776): Windows is not supported in MultiProcessRunner.')
_test_main_called = False
def _set_spawn_exe_path():
"""Set the path to the executable for spawned processes.
This utility searches for the binary the parent process is using, and sets
the executable of multiprocessing's context accordingly.
Raises:
RuntimeError: If the binary path cannot be determined.
"""
# TODO(b/150264776): This does not work with Windows. Find a solution.
if sys.argv[0].endswith('.py'):
# If all we have is a python module path, we'll need to make a guess for
# the actual executable path. Since the binary path may correspond to the
# parent's path of the python module, we are making guesses by reducing
# directories one at a time. E.g.,
# tensorflow/python/some/path/my_test.py
# -> tensorflow/python/some/path/my_test
# -> tensorflow/python/some/my_test
# -> tensorflow/python/my_test
path_to_use = None
guess_path = sys.argv[0][:-3]
guess_path = guess_path.split(os.sep)
for path_reduction in range(-1, -len(guess_path), -1):
possible_path = os.sep.join(guess_path[:path_reduction] +
[guess_path[-1]])
if os.access(possible_path, os.X_OK):
path_to_use = possible_path
break
# The binary can possibly have _gpu suffix.
possible_path += '_gpu'
if os.access(possible_path, os.X_OK):
path_to_use = possible_path
break
if path_to_use is None:
raise RuntimeError('Cannot determine binary path')
sys.argv[0] = path_to_use
# Note that this sets the executable for *all* contexts.
multiprocessing.get_context().set_executable(sys.argv[0])
def _if_spawn_run_and_exit():
"""If spawned process, run requested spawn task and exit. Else a no-op."""
# `multiprocessing` module passes a script "from multiprocessing.x import y"
# to subprocess, followed by a main function call. We use this to tell if
# the process is spawned. Examples of x are "forkserver" or
# "semaphore_tracker".
is_spawned = ('-c' in sys.argv[1:] and
sys.argv[sys.argv.index('-c') +
1].startswith('from multiprocessing.'))
if not is_spawned:
return
cmd = sys.argv[sys.argv.index('-c') + 1]
# As a subprocess, we disregarding all other interpreter command line
# arguments.
sys.argv = sys.argv[0:1]
# Run the specified command - this is expected to be one of:
# 1. Spawn the process for semaphore tracker.
# 2. Spawn the initial process for forkserver.
# 3. Spawn any process as requested by the "spawn" method.
exec(cmd) # pylint: disable=exec-used
sys.exit(0) # Semaphore tracker doesn't explicitly sys.exit.
def test_main():
"""Main function to be called within `__main__` of a test file."""
global _test_main_called
_test_main_called = True
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
if sys.platform != 'win32':
_set_spawn_exe_path()
_if_spawn_run_and_exit()
# Only runs test.main() if not spawned process.
test.main()
def initialized():
"""Returns whether the module is initialized."""
return True
return _test_main_called

View File

@ -699,6 +699,8 @@ class MultiProcessRunner(object):
sig = sig or getattr(signal, 'SIGKILL', signal.SIGTERM)
for (task_type, task_id), p in self._processes.items():
if p.exitcode is not None:
logging.info('%s-%d has already exited. Not terminating.', task_type,
task_id)
continue
try:
os.kill(p.pid, sig)
@ -866,6 +868,11 @@ def _shutdown_all_pool_runners():
pool.shutdown()
def is_oss():
"""Returns whether the test is run under OSS."""
return len(sys.argv) >= 1 and 'bazel' in sys.argv[0]
class MultiProcessPoolRunner(object):
"""A utility class to start a process pool to simulate a cluster.
@ -919,6 +926,9 @@ class MultiProcessPoolRunner(object):
if dill is None:
raise unittest.SkipTest(
'TODO(b/150264776): Resolve dependency issue in CI')
if is_oss():
raise unittest.SkipTest(
'TODO(b/170360740): MultiProcessPoolRunner timing out in OSS')
self._runner = MultiProcessRunner(
fn=lambda: None,

View File

@ -331,7 +331,8 @@ class MultiProcessRunnerTest(test.TestCase):
self.assertIn('Subprocess worker-0 exited with exit code',
str(cm.exception))
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
self.assertTrue(
any('Segmentation fault' in line for line in list_to_assert))
def test_seg_fault_in_chief_raises_error(self):
@ -350,7 +351,8 @@ class MultiProcessRunnerTest(test.TestCase):
self.assertIn('Subprocess chief-0 exited with exit code',
str(cm.exception))
list_to_assert = cm.exception.mpr_result.stdout
self.assertTrue(any('SIGSEGV' in line for line in list_to_assert))
self.assertTrue(
any('Segmentation fault' in line for line in list_to_assert))
def test_exit_code_is_reported_by_chief_subprocess(self):
@ -579,9 +581,13 @@ class MultiProcessPoolRunnerTest(test.TestCase):
self.assertAllEqual(result, [1, 1])
def test_global_pool(self):
if multi_process_runner.is_oss():
self.skipTest('TODO(b/170360740): Failing in OSS')
_global_pool.run(fn_that_does_nothing)
def test_nested_pool(self):
if multi_process_runner.is_oss():
self.skipTest('TODO(b/170360740): Failing in OSS')
def fn():
# This runs in sub processes, so they are each using their own

View File

@ -309,51 +309,53 @@ class LocalCollectiveAllReduceStrategy(
with policy.policy_scope('mixed_float16'):
self._test_mixed_precision(None, None, required_gpus)
# TODO(b/170360740): Timeout in OSS
if not multi_process_runner.is_oss():
@ds_combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.multi_worker_mirrored_2x1_cpu,
strategy_combinations.multi_worker_mirrored_2x1_gpu,
],
mode=['eager']))
class DistributedCollectiveAllReduceStrategyEagerTest(test.TestCase,
parameterized.TestCase):
@ds_combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.multi_worker_mirrored_2x1_cpu,
strategy_combinations.multi_worker_mirrored_2x1_gpu,
],
mode=['eager']))
class DistributedCollectiveAllReduceStrategyEagerTest(test.TestCase,
parameterized.TestCase):
def testFitWithoutStepsPerEpochPartialBatch(self, strategy):
def testFitWithoutStepsPerEpochPartialBatch(self, strategy):
def _model_fn():
x = layers.Input(shape=(1,), name='input')
y = layers.Dense(1, name='dense')(x)
model = training.Model(x, y)
return model
def _model_fn():
x = layers.Input(shape=(1,), name='input')
y = layers.Dense(1, name='dense')(x)
model = training.Model(x, y)
return model
def _get_dataset():
inputs = array_ops.expand_dims_v2(constant_op.constant(range(10)), axis=1)
targets = array_ops.expand_dims_v2(
constant_op.constant(range(10)), axis=1)
# Make global batch size 12 for 2 replicas and a non-repeated dataset with
# 10 elements so that we have partial batch
dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(12, drop_remainder=False)
return dataset
def _get_dataset():
inputs = array_ops.expand_dims_v2(
constant_op.constant(range(10)), axis=1)
targets = array_ops.expand_dims_v2(
constant_op.constant(range(10)), axis=1)
# Make global batch size 12 for 2 replicas and a non-repeated dataset
# with 10 elements so that we have partial batch
dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(
12, drop_remainder=False)
return dataset
with strategy.scope():
optimizer_fn = gradient_descent_keras.SGD
optimizer = optimizer_fn(0.001)
model = _model_fn()
loss = 'mse'
metrics = ['mae']
model.compile(optimizer, loss, metrics=metrics)
dataset = _get_dataset()
kernel_before = model.get_weights()[0][0]
model.fit(dataset, epochs=10)
kernel_after = model.get_weights()[0][0]
self.assertNotEqual(kernel_before, kernel_after)
self.assertGreater(abs(kernel_before - 1), abs(kernel_after - 1))
with strategy.scope():
optimizer_fn = gradient_descent_keras.SGD
optimizer = optimizer_fn(0.001)
model = _model_fn()
loss = 'mse'
metrics = ['mae']
model.compile(
optimizer,
loss,
metrics=metrics)
dataset = _get_dataset()
kernel_before = model.get_weights()[0][0]
model.fit(dataset, epochs=10)
kernel_after = model.get_weights()[0][0]
self.assertNotEqual(kernel_before, kernel_after)
self.assertGreater(abs(kernel_before-1), abs(kernel_after-1))
if __name__ == '__main__':
v2_compat.enable_v2_behavior()

View File

@ -183,6 +183,8 @@ class KerasCallbackMultiProcessTest(parameterized.TestCase, test.TestCase):
def proc_model_checkpoint_works_with_same_file_path(test_obj,
saving_filepath):
if multi_process_runner.is_oss():
test_obj.skipTest('TODO(b/170838633): Failing in OSS')
model, _, train_ds, steps = _model_setup(test_obj, file_format='')
num_epoch = 4

View File

@ -73,6 +73,9 @@ class CollectiveOpTest(test.TestCase):
def testCheckHealthPeerDown(self):
if multi_process_runner.is_oss():
self.skipTest("TODO(b/170838845): Failing in OSS")
def worker_fn():
enable_collective_ops(cluster_resolver_lib.TFConfigClusterResolver())
context.context().check_collective_ops_peer_health(