diff --git a/tensorflow/contrib/distribute/python/collective_all_reduce_strategy.py b/tensorflow/contrib/distribute/python/collective_all_reduce_strategy.py
index cc09a1a7238..08b472a73b8 100644
--- a/tensorflow/contrib/distribute/python/collective_all_reduce_strategy.py
+++ b/tensorflow/contrib/distribute/python/collective_all_reduce_strategy.py
@@ -19,6 +19,7 @@ from __future__ import division
 from __future__ import print_function
 
 from tensorflow.python.distribute import collective_all_reduce_strategy
+from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
 from tensorflow.python.distribute import distribute_lib
 from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
 from tensorflow.python.distribute.cluster_resolver import TFConfigClusterResolver
@@ -41,22 +42,34 @@ class CollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
     distributed environment.
   """
 
-  def __init__(self, num_gpus_per_worker=0):
+  def __init__(self,
+               num_gpus_per_worker=0,
+               communication=cross_device_ops_lib.CollectiveCommunication.AUTO):
     """Initializes the object.
 
     Args:
       num_gpus_per_worker: number of local GPUs or GPUs per worker, the default
         is 0 meaning CPU only.
+      communication: optional Enum of type
+        `distribute.experimental.CollectiveCommunication`. This provides a way
+        for the user to override the choice of collective op communication.
+        Possible values include `AUTO`, `RING`, and `NCCL`.
     """
     super(CollectiveAllReduceStrategy, self).__init__(
-        CollectiveAllReduceExtended(self, num_gpus_per_worker))
+        CollectiveAllReduceExtended(
+            self,
+            num_gpus_per_worker=num_gpus_per_worker,
+            communication=communication))
 
 
 class CollectiveAllReduceExtended(
     collective_all_reduce_strategy.CollectiveAllReduceExtended):
   """Implementation of CollectiveAllReduceStrategy."""
 
-  def __init__(self, container_strategy, num_gpus_per_worker):
+  def __init__(self,
+               container_strategy,
+               num_gpus_per_worker,
+               communication):
     # Use TFConfigClusterResolver to parse TF_CONFIG. We don't want to change
     # the constructor's interface to allow customized cluster resolver. Use
     # SimpleClusterResolver to override num_accelerators.
@@ -67,4 +80,6 @@ class CollectiveAllReduceExtended(
         task_id=tfconfig.task_id,
         num_accelerators={"GPU": num_gpus_per_worker})
     super(CollectiveAllReduceExtended, self).__init__(
-        container_strategy, cluster_resolver=cluster_resolver)
+        container_strategy,
+        communication=communication,
+        cluster_resolver=cluster_resolver)
diff --git a/tensorflow/contrib/distribute/python/collective_all_reduce_strategy_test.py b/tensorflow/contrib/distribute/python/collective_all_reduce_strategy_test.py
index 50f3b83eb45..8c8e742f329 100644
--- a/tensorflow/contrib/distribute/python/collective_all_reduce_strategy_test.py
+++ b/tensorflow/contrib/distribute/python/collective_all_reduce_strategy_test.py
@@ -30,6 +30,7 @@ from tensorflow.core.protobuf import rewriter_config_pb2
 from tensorflow.python import keras
 from tensorflow.python.data.ops import dataset_ops
 from tensorflow.python.distribute import collective_all_reduce_strategy as core_collective_all_reduce_strategy
+from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
 from tensorflow.python.distribute import cross_device_utils
 from tensorflow.python.distribute import distribute_lib
 from tensorflow.python.distribute import multi_worker_util
@@ -62,7 +63,9 @@ class MockCollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
   def __init__(self, cluster_resolver):
     super(MockCollectiveAllReduceStrategy, self).__init__(
         core_collective_all_reduce_strategy.CollectiveAllReduceExtended(
-            self, cluster_resolver=cluster_resolver))
+            self,
+            communication=cross_device_ops_lib.CollectiveCommunication.AUTO,
+            cluster_resolver=cluster_resolver))
 
 
 def create_test_objects(cluster_spec=None,
diff --git a/tensorflow/contrib/distribute/python/estimator_training_test.py b/tensorflow/contrib/distribute/python/estimator_training_test.py
index 3f55a8a1c8b..e8513943e8d 100644
--- a/tensorflow/contrib/distribute/python/estimator_training_test.py
+++ b/tensorflow/contrib/distribute/python/estimator_training_test.py
@@ -34,6 +34,7 @@ from tensorflow.contrib.distribute.python import multi_worker_test_base
 from tensorflow.contrib.distribute.python import parameter_server_strategy
 from tensorflow.contrib.optimizer_v2 import adagrad
 from tensorflow.python.data.ops import dataset_ops
+from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
 from tensorflow.python.distribute import distribute_coordinator as dc
 from tensorflow.python.distribute import estimator_training as dc_training
 from tensorflow.python.distribute.distribute_config import DistributeConfig
@@ -287,6 +288,34 @@ class DistributeCoordinatorIntegrationTest(
                                      cluster_spec)
     self._inspect_train_and_eval_events(estimator)
 
+  @combinations.generate(
+      combinations.combine(
+          mode=["graph"],
+          eval_distribute_class=[
+              None,
+              mirrored_strategy.MirroredStrategy,
+              mirrored_strategy.CoreMirroredStrategy,
+              parameter_server_strategy.ParameterServerStrategy,
+          ],
+          required_gpus=[0, 1]))
+  def test_complete_flow_standalone_client_collective_nccl(
+      self, eval_distribute_class):
+    train_distribute = (
+        collective_all_reduce_strategy.CollectiveAllReduceStrategy(
+            num_gpus_per_worker=context.num_gpus(),
+            communication=cross_device_ops_lib.CollectiveCommunication.NCCL))
+
+    if eval_distribute_class:
+      eval_distribute = self._get_strategy_object(eval_distribute_class)
+    else:
+      eval_distribute = None
+
+    cluster_spec = copy.deepcopy(self._cluster_spec)
+    cluster_spec.pop("ps", None)
+    estimator = self._complete_flow(train_distribute, eval_distribute,
+                                    cluster_spec)
+    self._inspect_train_and_eval_events(estimator)
+
   @combinations.generate(
       combinations.combine(
           mode=["graph"],
@@ -347,7 +376,7 @@ class DistributeCoordinatorIntegrationTest(
               parameter_server_strategy.ParameterServerStrategy,
           ],
           required_gpus=[0, 1]))
-  def test_complete_flow_indepedent_worker_between_graph(
+  def test_complete_flow_independent_worker_between_graph(
       self, train_distribute_cls, eval_distribute_cls):
     if (context.num_gpus() < 2 and eval_distribute_cls ==
         collective_all_reduce_strategy.CollectiveAllReduceStrategy):
@@ -399,8 +428,8 @@ class DistributeCoordinatorIntegrationTest(
           mirrored_strategy.CoreMirroredStrategy
          ],
          required_gpus=[0, 1]))
-  def test_complete_flow_indepedent_worker_in_graph(self, train_distribute_cls,
-                                                    eval_distribute_cls):
+  def test_complete_flow_independent_worker_in_graph(self, train_distribute_cls,
+                                                     eval_distribute_cls):
     train_distribute = self._get_strategy_object(train_distribute_cls)
 
     if eval_distribute_cls:
diff --git a/tensorflow/python/distribute/collective_all_reduce_strategy.py b/tensorflow/python/distribute/collective_all_reduce_strategy.py
index 0fc67efc172..7aa67790c33 100644
--- a/tensorflow/python/distribute/collective_all_reduce_strategy.py
+++ b/tensorflow/python/distribute/collective_all_reduce_strategy.py
@@ -56,12 +56,22 @@ class CollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
   which mirrores models on GPUs of all machines in a cluster. In the current
   implementation, it uses all GPUs in a cluster and it assumes all workers have
   the same number of GPUs.
+
+  Args:
+    communication: optional Enum of type
+      `distribute.experimental.CollectiveCommunication`. This provides a way
+      for the user to override the choice of collective op communication.
+      Possible values include `AUTO`, `RING`, and `NCCL`.
""" - def __init__(self): + def __init__( + self, + communication=cross_device_ops_lib.CollectiveCommunication.AUTO): """Initializes the object.""" super(CollectiveAllReduceStrategy, self).__init__( - CollectiveAllReduceExtended(self)) + CollectiveAllReduceExtended( + self, + communication=communication)) class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): @@ -69,10 +79,14 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): def __init__(self, container_strategy, + communication, cluster_resolver=TFConfigClusterResolver()): distribute_lib.DistributionStrategyExtended.__init__( self, container_strategy) - self._cross_device_ops = None + assert isinstance( + communication, + cross_device_ops_lib.CollectiveCommunication) + self._communication = communication self._initialize_strategy(cluster_resolver) assert isinstance(self._get_cross_device_ops(), cross_device_ops_lib.CollectiveAllReduce) @@ -166,10 +180,11 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): self._task_id = task_id logging.info( - "Multi-worker CollectiveAllReduceStrategy with " - "cluster_spec = %r, task_type = %r, task_id = %r, " - "num_workers = %r, local_devices = %r", cluster_spec.as_dict(), - task_type, task_id, self._num_workers, local_devices) + "Multi-worker CollectiveAllReduceStrategy with cluster_spec = %r, " + "task_type = %r, task_id = %r, num_workers = %r, local_devices = %r, " + "communication = %s", cluster_spec.as_dict(), task_type, + task_id, self._num_workers, local_devices, + self._communication) def _create_variable(self, next_creator, *args, **kwargs): colocate_with = kwargs.pop("colocate_with", None) @@ -333,6 +348,11 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended): del rewrite_options.scoped_allocator_opts.enable_op[:] rewrite_options.scoped_allocator_opts.enable_op.append("CollectiveReduce") + if ((self._communication == + cross_device_ops_lib.CollectiveCommunication.NCCL) and + self._num_gpus_per_worker > 0): + updated_config.experimental.collective_nccl = True + if not self._cluster_spec: return updated_config diff --git a/tensorflow/python/distribute/cross_device_ops.py b/tensorflow/python/distribute/cross_device_ops.py index 6f16b4d2900..ef124baf378 100644 --- a/tensorflow/python/distribute/cross_device_ops.py +++ b/tensorflow/python/distribute/cross_device_ops.py @@ -19,6 +19,7 @@ from __future__ import division from __future__ import print_function import collections +import enum import six from tensorflow.python.client import device_lib @@ -924,6 +925,21 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps): reduce_op) +@tf_export("distribute.experimental.CollectiveCommunication") +class CollectiveCommunication(enum.Enum): + """Communication choices for CollectiveOps. + + * `AUTO`: Default to runtime's automatic choices. + * `RING`: TensorFlow's ring algorithms for all-reduce and + all-gather. + * `NCCL`: Use ncclAllReduce for all-reduce, and ring algorithms for + all-gather. TODO(ayushd): add ncclAllGather implementation. + """ + AUTO = "AUTO" + RING = "RING" + NCCL = "NCCL" + + # TODO(yuefengz): support in-graph collective all-reduce. class CollectiveAllReduce(CrossDeviceOps): """All-reduce cross device ops using collective ops. 
diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-collective-communication.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-collective-communication.pbtxt
new file mode 100644
index 00000000000..7eca1c80d8b
--- /dev/null
+++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-collective-communication.pbtxt
@@ -0,0 +1,16 @@
+path: "tensorflow.distribute.experimental.CollectiveCommunication"
+tf_class {
+  is_instance: "<enum \'CollectiveCommunication\'>"
+  member {
+    name: "AUTO"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+  member {
+    name: "NCCL"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+  member {
+    name: "RING"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+}
diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
index 619c3744d6c..72e26114a23 100644
--- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
+++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
@@ -13,7 +13,7 @@ tf_class {
   }
   member_method {
     name: "__init__"
-    argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None"
+    argspec: "args=[\'self\', \'communication\'], varargs=None, keywords=None, defaults=[\'CollectiveCommunication.AUTO\'], "
   }
   member_method {
     name: "broadcast"
diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.pbtxt
index b0d55ade992..cf8cedb50cb 100644
--- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.pbtxt
+++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.experimental.pbtxt
@@ -1,5 +1,9 @@
 path: "tensorflow.distribute.experimental"
 tf_module {
+  member {
+    name: "CollectiveCommunication"
+    mtype: "<class \'enum.EnumMeta\'>"
+  }
   member {
     name: "MultiWorkerMirroredStrategy"
     mtype: "<type \'type\'>"
diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-collective-communication.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-collective-communication.pbtxt
new file mode 100644
index 00000000000..7eca1c80d8b
--- /dev/null
+++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-collective-communication.pbtxt
@@ -0,0 +1,16 @@
+path: "tensorflow.distribute.experimental.CollectiveCommunication"
+tf_class {
+  is_instance: "<enum \'CollectiveCommunication\'>"
+  member {
+    name: "AUTO"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+  member {
+    name: "NCCL"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+  member {
+    name: "RING"
+    mtype: "<enum \'CollectiveCommunication\'>"
+  }
+}
diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
index 619c3744d6c..72e26114a23 100644
--- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
+++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.-multi-worker-mirrored-strategy.pbtxt
@@ -13,7 +13,7 @@ tf_class {
   }
   member_method {
     name: "__init__"
-    argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None"
+    argspec: "args=[\'self\', \'communication\'], varargs=None, keywords=None, defaults=[\'CollectiveCommunication.AUTO\'], "
  }
  member_method {
    name: "broadcast"
diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt
index b0d55ade992..cf8cedb50cb 100644
--- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt
+++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.experimental.pbtxt
@@ -1,5 +1,9 @@
 path: "tensorflow.distribute.experimental"
 tf_module {
+  member {
+    name: "CollectiveCommunication"
+    mtype: "<class \'enum.EnumMeta\'>"
+  }
   member {
     name: "MultiWorkerMirroredStrategy"
     mtype: "<type \'type\'>"
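The golden-file updates above pin down the new public API surface. A hedged sketch of what they permit, assuming a build in which tf.distribute.experimental exports both symbols (per the tf_export decorator and the recorded argspec):

    import tensorflow as tf

    # MultiWorkerMirroredStrategy.__init__ now accepts a `communication`
    # argument, defaulting to CollectiveCommunication.AUTO per the argspec.
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        communication=tf.distribute.experimental.CollectiveCommunication.RING)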