From d5ab30ca1450876c0de65e896279dc898c288d6a Mon Sep 17 00:00:00 2001 From: "A. Unique TensorFlower" Date: Tue, 20 Oct 2020 13:56:33 -0700 Subject: [PATCH] Graduate MultiWorkerMirroredStrategy out of experimental Over the past months we've made several improvements: - Test coverage is now on par with other strategies. - Peer failure will no longer cause the cluster to hang. - Major issues with saving are fixed. - gather() API is added. PiperOrigin-RevId: 338132035 Change-Id: I384c084717cd5f2b6167668ebe96af0f7b371530 --- RELEASE.md | 4 - tensorflow/python/distribute/BUILD | 8 - .../collective_all_reduce_strategy.py | 154 +++--------------- .../collective_all_reduce_strategy_test.py | 24 --- .../distribute/v1/cross_device_ops_test.py | 6 +- ...bute.-multi-worker-mirrored-strategy.pbtxt | 91 ----------- ...ntal.-multi-worker-mirrored-strategy.pbtxt | 1 - .../tensorflow.distribute.experimental.pbtxt | 2 +- .../api/golden/v2/tensorflow.distribute.pbtxt | 4 - 9 files changed, 22 insertions(+), 272 deletions(-) delete mode 100644 tensorflow/tools/api/golden/v2/tensorflow.distribute.-multi-worker-mirrored-strategy.pbtxt diff --git a/RELEASE.md b/RELEASE.md index 8fa8c01268f..9286c7a2236 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -91,10 +91,6 @@ `tf.config.experimental.enable_tensor_float_32_execution`. * `tf.distribute`: - * `MultiWorkerMirroredStrategy` is graduated out of experimental. - * Peer failure will no longer cause the cluster to hang. - * Major issues with saving are fixed. - * See [Multi-worker training with Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) for a tutorial. * Deprecated `experimental_distribute_datasets_from_function` method and renamed it to `distribute_datasets_from_function` as it is no longer experimental. 
## Bug Fixes and Other Changes diff --git a/tensorflow/python/distribute/BUILD b/tensorflow/python/distribute/BUILD index d32a4781b7c..9bde802d549 100644 --- a/tensorflow/python/distribute/BUILD +++ b/tensorflow/python/distribute/BUILD @@ -429,24 +429,16 @@ py_library( ":collective_util", ":cross_device_ops", ":cross_device_utils", - ":device_util", - ":distribute_lib", - ":distribute_utils", ":input_lib", ":mirrored_strategy", ":multi_worker_util", ":numpy_dataset", - ":reduce_util", ":values", "//tensorflow/core:protos_all_py", "//tensorflow/python:array_ops", "//tensorflow/python:collective_ops", - "//tensorflow/python:errors", "//tensorflow/python:framework_ops", - "//tensorflow/python:platform", - "//tensorflow/python:tf_export", "//tensorflow/python:training", - "//tensorflow/python:util", "//tensorflow/python/distribute/cluster_resolver:cluster_resolver_lib", "//tensorflow/python/eager:context", ], diff --git a/tensorflow/python/distribute/collective_all_reduce_strategy.py b/tensorflow/python/distribute/collective_all_reduce_strategy.py index 674d70b2bd7..b10e3d7350c 100644 --- a/tensorflow/python/distribute/collective_all_reduce_strategy.py +++ b/tensorflow/python/distribute/collective_all_reduce_strategy.py @@ -37,7 +37,6 @@ from tensorflow.python.distribute import multi_worker_util from tensorflow.python.distribute import numpy_dataset from tensorflow.python.distribute import reduce_util from tensorflow.python.distribute import values -from tensorflow.python.distribute.cluster_resolver import ClusterResolver from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver from tensorflow.python.distribute.cluster_resolver import TFConfigClusterResolver from tensorflow.python.eager import context @@ -47,12 +46,10 @@ from tensorflow.python.ops import array_ops from tensorflow.python.ops import collective_ops from tensorflow.python.platform import tf_logging as logging from tensorflow.python.training.tracking import base -from tensorflow.python.util import deprecation from tensorflow.python.util.tf_export import tf_export -# pylint: disable=line-too-long -@tf_export("distribute.MultiWorkerMirroredStrategy", v1=[]) +@tf_export("distribute.experimental.MultiWorkerMirroredStrategy", v1=[]) class CollectiveAllReduceStrategy(distribute_lib.Strategy): """A distribution strategy for synchronous training on multiple workers. @@ -66,12 +63,7 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): `cluster_resolver` correctly. For example, if you are using `tf.distribute.cluster_resolver.TFConfigClusterResolver`, each worker needs to have its corresponding `task_type` and `task_id` set in the `TF_CONFIG` - environment variable. An example TF_CONFIG on worker-0 of a two worker cluster - is: - - ``` - TF_CONFIG = '{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }' - ``` + environment variable. Your program runs on each worker as-is. Note that collectives require each worker to participate. All `tf.distribute` and non `tf.distribute` API may use @@ -84,57 +76,8 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): strategy uses. If it's zero, the strategy uses the CPU. All workers need to use the same number of devices, otherwise the behavior is undefined. - This strategy is not intended for TPU. Use `tf.distribute.TPUStrategy` - instead. - - After setting up TF_CONFIG, using this strategy is similar to using - `tf.distribute.MirroredStrategy` and `tf.distribute.TPUStrategy`. 
- - ``` - strategy = tf.distribute.MultiWorkerMirroredStrategy() - - with strategy.scope(): - model = tf.keras.Sequential([ - tf.keras.layers.Dense(2, input_shape=(5,)), - ]) - optimizer = tf.keras.optimizers.SGD(learning_rate=0.1) - - def dataset_fn(ctx): - x = np.random.random((2, 5)).astype(np.float32) - y = np.random.randint(2, size=(2, 1)) - dataset = tf.data.Dataset.from_tensor_slices((x, y)) - return dataset.repeat().batch(1, drop_remainder=True) - dist_dataset = strategy.distribute_datasets_from_function(dataset_fn) - - model.compile() - model.fit(dist_dataset) - ``` - - You can also write your own training loop: - - ``` - @tf.function - def train_step(iterator): - - def step_fn(inputs): - features, labels = inputs - with tf.GradientTape() as tape: - logits = model(features, training=True) - loss = tf.keras.losses.sparse_categorical_crossentropy( - labels, logits) - - grads = tape.gradient(loss, model.trainable_variables) - optimizer.apply_gradients(zip(grads, model.trainable_variables)) - - strategy.run(step_fn, args=(next(iterator),)) - - for _ in range(NUM_STEP): - train_step(iterator) - ``` - - See - [Multi-worker training with Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) - for a detailed tutorial. + This strategy is not intended for TPU. Use + `tf.distribute.experimental.TPUStrategy` instead. __Saving__ @@ -155,7 +98,6 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): Tensorflow API. """ - # pylint: enable=line-too-long # TODO(anjalisridhar): Update our guides with examples showing how we can use # the cluster_resolver argument. @@ -164,23 +106,21 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): _collective_key_base = 0 def __init__(self, - cluster_resolver=None, - communication_options=None): + communication=collective_util.CommunicationImplemenation.AUTO, + cluster_resolver=None): """Creates the strategy. Args: + communication: optional + `tf.distribute.experimental.CommunicationImplemenation`. This is a hint + on the preferred collective communication implementation. Possible + values include `AUTO`, `RING`, and `NCCL`. cluster_resolver: optional `tf.distribute.cluster_resolver.ClusterResolver`. If `None`, `tf.distribute.cluster_resolver.TFConfigClusterResolver` is used. - communication_options: optional - `tf.distribute.experimental.CommunicationOptions`. This configures the - default options for cross device communications. It can be overridden by - options provided to the communication APIs like - `tf.distribute.ReplicaContext.all_reduce`. See - `tf.distribute.experimental.CommunicationOptions` for details. 
""" - if communication_options is None: - communication_options = collective_util.Options() + communication_options = collective_util.Options( + implementation=communication) super(CollectiveAllReduceStrategy, self).__init__( CollectiveAllReduceExtended( self, @@ -196,9 +136,12 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): "num_replicas_per_worker").set(self.extended._num_gpus_per_worker) @classmethod - def _from_local_devices(cls, devices, communication_options=None): + def _from_local_devices( + cls, + devices, + communication=collective_util.CommunicationImplemenation.AUTO): """A convenience method to create an object with a list of devices.""" - obj = cls(communication_options=communication_options) + obj = cls(communication) obj.extended._initialize_local(TFConfigClusterResolver(), devices=devices) # pylint: disable=protected-access return obj @@ -215,58 +158,6 @@ class CollectiveAllReduceStrategy(distribute_lib.Strategy): return self.extended._cluster_resolver # pylint: disable=protected-access -class _CollectiveAllReduceStrategyExperimentalMeta(type): - - @classmethod - def __instancecheck__(cls, instance): - # This is to make isinstance(tf.distribute.MultiWorkerMirroredStrategy(), - # tf.distribute.experimental.MultiWorkerMirroredStrategy). Some libraries is - # performing such check. - return isinstance(instance, CollectiveAllReduceStrategy) - - -@tf_export("distribute.experimental.MultiWorkerMirroredStrategy", v1=[]) -class _CollectiveAllReduceStrategyExperimental( - CollectiveAllReduceStrategy, - metaclass=_CollectiveAllReduceStrategyExperimentalMeta): - - __doc__ = CollectiveAllReduceStrategy.__doc__ - - @deprecation.deprecated( - None, "use distribute.MultiWorkerMirroredStrategy instead") - def __init__(self, - communication=collective_util.CommunicationImplemenation.AUTO, - cluster_resolver=None): - """Creates the strategy. - - Args: - communication: optional - `tf.distribute.experimental.CommunicationImplementation`. This is a hint - on the preferred collective communication implementation. Possible - values include `AUTO`, `RING`, and `NCCL`. - cluster_resolver: optional - `tf.distribute.cluster_resolver.ClusterResolver`. If `None`, - `tf.distribute.cluster_resolver.TFConfigClusterResolver` is used. 
- """ - communication_options = collective_util.Options( - implementation=communication) - super(_CollectiveAllReduceStrategyExperimental, - self).__init__(cluster_resolver, communication_options) - - @classmethod - def _from_local_devices( - cls, - devices, - communication=collective_util.CommunicationImplemenation.AUTO): - """A convenience method to create an object with a list of devices.""" - obj = cls(communication) - obj.extended._initialize_local(TFConfigClusterResolver(), devices=devices) # pylint: disable=protected-access - return obj - - -_CollectiveAllReduceStrategyExperimental.__name__ = CollectiveAllReduceStrategy.__name__ - - @tf_export(v1=["distribute.experimental.MultiWorkerMirroredStrategy"]) # pylint: disable=missing-docstring class CollectiveAllReduceStrategyV1(distribute_lib.StrategyV1): @@ -309,16 +200,9 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): def __init__(self, container_strategy, cluster_resolver, communication_options): - if not isinstance(communication_options, collective_util.Options): - raise ValueError("communication_options must be an instance of " - "tf.distribute.experimental.CommunicationOptions") self._cluster_resolver = cluster_resolver or TFConfigClusterResolver() - if not isinstance(self._cluster_resolver, ClusterResolver): - raise ValueError("cluster_resolver must be an instance of " - "tf.distribute.cluster_resolver.ClusterResolver") distribute_lib.StrategyExtendedV1.__init__(self, container_strategy) self._communication_options = communication_options - self._collective_key_base = container_strategy._collective_key_base # pylint: disable=protected-access self._initialize_strategy(self._cluster_resolver) self._cfer_fn_cache = weakref.WeakKeyDictionary() self.experimental_enable_get_next_as_optional = True @@ -364,7 +248,7 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): self._host_input_device = numpy_dataset.SingleDevice(self._worker_device) self._collective_keys = cross_device_utils.CollectiveKeys( - group_key_start=1 + self._collective_key_base) + group_key_start=1 + CollectiveAllReduceStrategy._collective_key_base) # pylint: disable=protected-access self._cross_device_ops = cross_device_ops_lib.CollectiveAllReduce( devices=local_devices, group_size=len(local_devices), @@ -479,7 +363,7 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): local_devices = (self._worker_device,) self._collective_keys = cross_device_utils.CollectiveKeys( - group_key_start=1 + self._collective_key_base) + group_key_start=1 + CollectiveAllReduceStrategy._collective_key_base) # pylint: disable=protected-access self._cross_device_ops = cross_device_ops_lib.CollectiveAllReduce( devices=local_devices, group_size=len(local_devices) * self._num_workers, diff --git a/tensorflow/python/distribute/collective_all_reduce_strategy_test.py b/tensorflow/python/distribute/collective_all_reduce_strategy_test.py index 39d2b432a25..305008f6cb8 100644 --- a/tensorflow/python/distribute/collective_all_reduce_strategy_test.py +++ b/tensorflow/python/distribute/collective_all_reduce_strategy_test.py @@ -62,8 +62,6 @@ CollectiveAllReduceStrategy = ( collective_all_reduce_strategy.CollectiveAllReduceStrategy) CollectiveAllReduceExtended = ( collective_all_reduce_strategy.CollectiveAllReduceExtended) -_CollectiveAllReduceStrategyExperimental = ( - collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental) def create_test_objects(cluster_spec=None, @@ -612,27 +610,5 @@ class 
CollectiveAllReduceStrategyV2Test(test.TestCase, parameterized.TestCase): strategy.extended._num_workers, results[1].numpy()) -class ExperimentalCompatibilityTest(test.TestCase): - - def testIsInstance(self): - # It's not uncommon for people to special case MultiWorkerMirroredStrategy, - # so we need to make sure isinstance check works for combinations between - # the experimental and non-experimental endpoints. - strategy = CollectiveAllReduceStrategy() - experimental_strategy = _CollectiveAllReduceStrategyExperimental() - self.assertIsInstance(strategy, CollectiveAllReduceStrategy) - self.assertIsInstance(strategy, _CollectiveAllReduceStrategyExperimental) - self.assertIsInstance(experimental_strategy, CollectiveAllReduceStrategy) - self.assertIsInstance(experimental_strategy, - _CollectiveAllReduceStrategyExperimental) - - def testName(self): - # Estimator checks the __name__ to special case MultiWorkerMirroredStrategy. - self.assertEqual(CollectiveAllReduceStrategy.__name__, - 'CollectiveAllReduceStrategy') - self.assertEqual(_CollectiveAllReduceStrategyExperimental.__name__, - 'CollectiveAllReduceStrategy') - - if __name__ == '__main__': test_util.main() diff --git a/tensorflow/python/distribute/v1/cross_device_ops_test.py b/tensorflow/python/distribute/v1/cross_device_ops_test.py index a38c3c705ea..d552115bf4b 100644 --- a/tensorflow/python/distribute/v1/cross_device_ops_test.py +++ b/tensorflow/python/distribute/v1/cross_device_ops_test.py @@ -470,9 +470,8 @@ class CollectiveAllReduceTest(multi_worker_test_base.MultiWorkerTestBase, devices = ["/device:CPU:0"] if use_strategy_object: - comm_options = collective_util.Options(implementation=communication) strategy = (mwms_lib.CollectiveAllReduceStrategy - ._from_local_devices(devices, comm_options)) # pylint: disable=protected-access + ._from_local_devices(devices, communication=communication)) # pylint: disable=protected-access return strategy, devices, "" else: collective_all_reduce_ops = cross_device_ops_lib.CollectiveAllReduce( @@ -501,9 +500,8 @@ class CollectiveAllReduceTest(multi_worker_test_base.MultiWorkerTestBase, task_type=task_type, task_id=task_id, num_accelerators={"GPU": num_gpus}) - comm_options = collective_util.Options(implementation=communication) strategy = mwms_lib.CollectiveAllReduceStrategy( - communication_options=comm_options, cluster_resolver=resolver) + cluster_resolver=resolver, communication=communication) return (strategy, devices, "grpc://" + self._cluster_spec[task_type][task_id]) else: diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.-multi-worker-mirrored-strategy.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.-multi-worker-mirrored-strategy.pbtxt deleted file mode 100644 index 02816f1c5f6..00000000000 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.-multi-worker-mirrored-strategy.pbtxt +++ /dev/null @@ -1,91 +0,0 @@ -path: "tensorflow.distribute.MultiWorkerMirroredStrategy" -tf_class { - is_instance: "" - is_instance: "" - is_instance: "" - is_instance: "" - member { - name: "cluster_resolver" - mtype: "" - } - member { - name: "extended" - mtype: "" - } - member { - name: "num_replicas_in_sync" - mtype: "" - } - member_method { - name: "__init__" - argspec: "args=[\'self\', \'cluster_resolver\', \'communication_options\'], varargs=None, keywords=None, defaults=[\'None\', \'None\'], " - } - member_method { - name: "colocate_vars_with" - argspec: "args=[\'self\', \'colocate_with_variable\'], varargs=None, keywords=None, defaults=None" - } - member_method { - 
name: "configure" - argspec: "args=[\'self\', \'session_config\', \'cluster_spec\', \'task_type\', \'task_id\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'None\', \'None\'], " - } - member_method { - name: "distribute_datasets_from_function" - argspec: "args=[\'self\', \'dataset_fn\', \'options\'], varargs=None, keywords=None, defaults=[\'None\'], " - } - member_method { - name: "experimental_distribute_dataset" - argspec: "args=[\'self\', \'dataset\', \'options\'], varargs=None, keywords=None, defaults=[\'None\'], " - } - member_method { - name: "experimental_distribute_datasets_from_function" - argspec: "args=[\'self\', \'dataset_fn\', \'options\'], varargs=None, keywords=None, defaults=[\'None\'], " - } - member_method { - name: "experimental_distribute_values_from_function" - argspec: "args=[\'self\', \'value_fn\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "experimental_local_results" - argspec: "args=[\'self\', \'value\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "experimental_run" - argspec: "args=[\'self\', \'fn\', \'input_iterator\'], varargs=None, keywords=None, defaults=[\'None\'], " - } - member_method { - name: "gather" - argspec: "args=[\'self\', \'value\', \'axis\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "group" - argspec: "args=[\'self\', \'value\', \'name\'], varargs=None, keywords=None, defaults=[\'None\'], " - } - member_method { - name: "make_dataset_iterator" - argspec: "args=[\'self\', \'dataset\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "make_input_fn_iterator" - argspec: "args=[\'self\', \'input_fn\', \'replication_mode\'], varargs=None, keywords=None, defaults=[\'InputReplicationMode.PER_WORKER\'], " - } - member_method { - name: "reduce" - argspec: "args=[\'self\', \'reduce_op\', \'value\', \'axis\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "run" - argspec: "args=[\'self\', \'fn\', \'args\', \'kwargs\', \'options\'], varargs=None, keywords=None, defaults=[\'()\', \'None\', \'None\'], " - } - member_method { - name: "scope" - argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "unwrap" - argspec: "args=[\'self\', \'value\'], varargs=None, keywords=None, defaults=None" - } - member_method { - name: "update_config_proto" - argspec: "args=[\'self\', \'config_proto\'], varargs=None, keywords=None, defaults=None" - } -} diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt index 42289ca48dc..9669433cdd8 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt @@ -1,6 +1,5 @@ path: "tensorflow.distribute.experimental.MultiWorkerMirroredStrategy" tf_class { - is_instance: "" is_instance: "" is_instance: "" is_instance: "" diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt index b1bbc2df12f..fdc27f7acab 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt @@ -22,7 +22,7 @@ tf_module { } member 
{ name: "MultiWorkerMirroredStrategy" - mtype: "" + mtype: "" } member { name: "ParameterServerStrategy" diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.pbtxt index 9bd37181958..d3867889a4f 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.pbtxt @@ -36,10 +36,6 @@ tf_module { name: "MirroredStrategy" mtype: "" } - member { - name: "MultiWorkerMirroredStrategy" - mtype: "" - } member { name: "NcclAllReduce" mtype: ""
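
For readers following the API change in this patch: after it lands, the strategy is again constructed through the experimental endpoint and accepts a `communication` hint rather than a `communication_options` object. Below is a minimal sketch of that usage, not part of the patch itself; the port, the toy model, and the enum spelling (exposed as `CollectiveCommunication` in earlier releases and later also as `CommunicationImplementation`) are illustrative assumptions.

```
# Illustrative sketch, not part of the patch: constructing the strategy through
# the experimental endpoint restored by this change. The graduated endpoint
# removed here took a `communication_options` argument
# (a tf.distribute.experimental.CommunicationOptions) instead of the
# `communication` hint shown below.
import json
import os

import tensorflow as tf

# Each worker sets TF_CONFIG with the cluster spec and its own task index.
# A single-worker cluster is used here so the sketch can run standalone; a
# real job lists every worker address and gives each process its own index.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["localhost:12345"]},
    "task": {"type": "worker", "index": 0},
})

# AUTO lets the runtime choose between RING and NCCL. Depending on the TF
# release this enum is exposed as CollectiveCommunication or
# CommunicationImplementation; both name the same values.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    communication=tf.distribute.experimental.CollectiveCommunication.AUTO)

# Model variables created inside the scope are replicated on every worker.
with strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(2, input_shape=(5,))])
  model.compile(
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.1),
      loss="sparse_categorical_crossentropy")
```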
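
The custom-training-loop example removed from the class docstring by this patch omits the model, dataset, and iterator setup. A self-contained version is sketched below under stated assumptions: the random toy data, the SGD optimizer, and NUM_STEPS are illustrative, and with no TF_CONFIG set the strategy runs as a single local worker.

```
# Self-contained sketch of the custom training loop removed from the docstring
# by this patch. Toy data, model, and NUM_STEPS are illustrative only.
import numpy as np
import tensorflow as tf

# With no TF_CONFIG in the environment this behaves as a single local worker.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

with strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(2, input_shape=(5,))])
  optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

def dataset_fn(ctx):
  # Per-worker dataset; drop_remainder keeps batch shapes static, which the
  # collective ops require.
  x = np.random.random((8, 5)).astype(np.float32)
  y = np.random.randint(2, size=(8, 1))
  dataset = tf.data.Dataset.from_tensor_slices((x, y))
  return dataset.repeat().batch(1, drop_remainder=True)

dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)
iterator = iter(dist_dataset)

@tf.function
def train_step(iterator):
  def step_fn(inputs):
    features, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(features, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(labels, logits)
    grads = tape.gradient(loss, model.trainable_variables)
    # Under the strategy, apply_gradients aggregates gradients across replicas.
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

  strategy.run(step_fn, args=(next(iterator),))

NUM_STEPS = 10
for _ in range(NUM_STEPS):
  train_step(iterator)
```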