Enable the user to choose between all-reduce implementations in MultiWorkerMirroredStrategy.

Possible choices: AUTO, RING (which uses `common_runtime/ring_reducer.{cc,h}`), and NCCL (which uses NVIDIA NCCL for all-reduce).

PiperOrigin-RevId: 236000699
Ayush Dubey 2019-02-27 15:06:50 -08:00 committed by TensorFlower Gardener
parent 4c6563e4d8
commit 34024edf7f
11 changed files with 140 additions and 17 deletions
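
For reference, a minimal usage sketch of the resulting public API, assuming the `tf.distribute.experimental` exports listed in the API goldens below:

import tensorflow as tf

# Pick the collective implementation explicitly; AUTO (the default) leaves
# the choice to the runtime.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    communication=tf.distribute.experimental.CollectiveCommunication.NCCL)

Variables created and reductions issued under strategy.scope() then use NCCL for cross-worker all-reduce on workers that have GPUs.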

View File

@@ -19,6 +19,7 @@ from __future__ import division
from __future__ import print_function
from tensorflow.python.distribute import collective_all_reduce_strategy
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
from tensorflow.python.distribute.cluster_resolver import TFConfigClusterResolver
@@ -41,22 +42,34 @@ class CollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
distributed environment.
"""
def __init__(self, num_gpus_per_worker=0):
def __init__(self,
num_gpus_per_worker=0,
communication=cross_device_ops_lib.CollectiveCommunication.AUTO):
"""Initializes the object.
Args:
num_gpus_per_worker: number of local GPUs or GPUs per worker, the default
is 0 meaning CPU only.
communication: optional Enum of type
`distribute.experimental.CollectiveCommunication`. This provides a way
for the user to override the choice of collective op communication.
Possible values include `AUTO`, `RING`, and `NCCL`.
"""
super(CollectiveAllReduceStrategy, self).__init__(
CollectiveAllReduceExtended(self, num_gpus_per_worker))
CollectiveAllReduceExtended(
self,
num_gpus_per_worker=num_gpus_per_worker,
communication=communication))
class CollectiveAllReduceExtended(
collective_all_reduce_strategy.CollectiveAllReduceExtended):
"""Implementation of CollectiveAllReduceStrategy."""
def __init__(self, container_strategy, num_gpus_per_worker):
def __init__(self,
container_strategy,
num_gpus_per_worker,
communication):
# Use TFConfigClusterResolver to parse TF_CONFIG. We don't want to change
# the constructor's interface to allow customized cluster resolver. Use
# SimpleClusterResolver to override num_accelerators.
@@ -67,4 +80,6 @@ class CollectiveAllReduceExtended(
task_id=tfconfig.task_id,
num_accelerators={"GPU": num_gpus_per_worker})
super(CollectiveAllReduceExtended, self).__init__(
container_strategy, cluster_resolver=cluster_resolver)
container_strategy,
communication=communication,
cluster_resolver=cluster_resolver)
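
As a sketch of constructing this contrib wrapper directly (assuming it is the `tensorflow.contrib.distribute.python.collective_all_reduce_strategy` module, as the surrounding imports suggest):

from tensorflow.contrib.distribute.python import collective_all_reduce_strategy
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.eager import context

# TF_CONFIG, if set, is parsed by TFConfigClusterResolver inside the extended
# implementation; num_accelerators is overridden via SimpleClusterResolver.
strategy = collective_all_reduce_strategy.CollectiveAllReduceStrategy(
    num_gpus_per_worker=context.num_gpus(),
    communication=cross_device_ops_lib.CollectiveCommunication.RING)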

View File

@@ -30,6 +30,7 @@ from tensorflow.core.protobuf import rewriter_config_pb2
from tensorflow.python import keras
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import collective_all_reduce_strategy as core_collective_all_reduce_strategy
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import cross_device_utils
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import multi_worker_util
@@ -62,7 +63,9 @@ class MockCollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
def __init__(self, cluster_resolver):
super(MockCollectiveAllReduceStrategy, self).__init__(
core_collective_all_reduce_strategy.CollectiveAllReduceExtended(
self, cluster_resolver=cluster_resolver))
self,
communication=cross_device_ops_lib.CollectiveCommunication.AUTO,
cluster_resolver=cluster_resolver))
def create_test_objects(cluster_spec=None,

View File

@@ -34,6 +34,7 @@ from tensorflow.contrib.distribute.python import multi_worker_test_base
from tensorflow.contrib.distribute.python import parameter_server_strategy
from tensorflow.contrib.optimizer_v2 import adagrad
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import distribute_coordinator as dc
from tensorflow.python.distribute import estimator_training as dc_training
from tensorflow.python.distribute.distribute_config import DistributeConfig
@@ -287,6 +288,34 @@ class DistributeCoordinatorIntegrationTest(
cluster_spec)
self._inspect_train_and_eval_events(estimator)
@combinations.generate(
combinations.combine(
mode=["graph"],
eval_distribute_class=[
None,
mirrored_strategy.MirroredStrategy,
mirrored_strategy.CoreMirroredStrategy,
parameter_server_strategy.ParameterServerStrategy,
],
required_gpus=[0, 1]))
def test_complete_flow_standalone_client_collective_nccl(
self, eval_distribute_class):
train_distribute = (
collective_all_reduce_strategy.CollectiveAllReduceStrategy(
num_gpus_per_worker=context.num_gpus(),
communication=cross_device_ops_lib.CollectiveCommunication.NCCL))
if eval_distribute_class:
eval_distribute = self._get_strategy_object(eval_distribute_class)
else:
eval_distribute = None
cluster_spec = copy.deepcopy(self._cluster_spec)
cluster_spec.pop("ps", None)
estimator = self._complete_flow(train_distribute, eval_distribute,
cluster_spec)
self._inspect_train_and_eval_events(estimator)
@combinations.generate(
combinations.combine(
mode=["graph"],
@@ -347,7 +376,7 @@ class DistributeCoordinatorIntegrationTest(
parameter_server_strategy.ParameterServerStrategy,
],
required_gpus=[0, 1]))
def test_complete_flow_indepedent_worker_between_graph(
def test_complete_flow_independent_worker_between_graph(
self, train_distribute_cls, eval_distribute_cls):
if (context.num_gpus() < 2 and eval_distribute_cls ==
collective_all_reduce_strategy.CollectiveAllReduceStrategy):
@@ -399,8 +428,8 @@ class DistributeCoordinatorIntegrationTest(
mirrored_strategy.CoreMirroredStrategy
],
required_gpus=[0, 1]))
def test_complete_flow_indepedent_worker_in_graph(self, train_distribute_cls,
eval_distribute_cls):
def test_complete_flow_independent_worker_in_graph(self, train_distribute_cls,
eval_distribute_cls):
train_distribute = self._get_strategy_object(train_distribute_cls)
if eval_distribute_cls:

View File

@@ -56,12 +56,22 @@ class CollectiveAllReduceStrategy(distribute_lib.DistributionStrategy):
which mirrors models on GPUs of all machines in a cluster. In the current
implementation, it uses all GPUs in a cluster and it assumes all workers have
the same number of GPUs.
Args:
communication: optional Enum of type
`distribute.experimental.CollectiveCommunication`. This provides a way
for the user to override the choice of collective op communication.
Possible values include `AUTO`, `RING`, and `NCCL`.
"""
def __init__(self):
def __init__(
self,
communication=cross_device_ops_lib.CollectiveCommunication.AUTO):
"""Initializes the object."""
super(CollectiveAllReduceStrategy, self).__init__(
CollectiveAllReduceExtended(self))
CollectiveAllReduceExtended(
self,
communication=communication))
class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended):
@@ -69,10 +79,14 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended):
def __init__(self,
container_strategy,
communication,
cluster_resolver=TFConfigClusterResolver()):
distribute_lib.DistributionStrategyExtended.__init__(
self, container_strategy)
self._cross_device_ops = None
assert isinstance(
communication,
cross_device_ops_lib.CollectiveCommunication)
self._communication = communication
self._initialize_strategy(cluster_resolver)
assert isinstance(self._get_cross_device_ops(),
cross_device_ops_lib.CollectiveAllReduce)
@@ -166,10 +180,11 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended):
self._task_id = task_id
logging.info(
"Multi-worker CollectiveAllReduceStrategy with "
"cluster_spec = %r, task_type = %r, task_id = %r, "
"num_workers = %r, local_devices = %r", cluster_spec.as_dict(),
task_type, task_id, self._num_workers, local_devices)
"Multi-worker CollectiveAllReduceStrategy with cluster_spec = %r, "
"task_type = %r, task_id = %r, num_workers = %r, local_devices = %r, "
"communication = %s", cluster_spec.as_dict(), task_type,
task_id, self._num_workers, local_devices,
self._communication)
def _create_variable(self, next_creator, *args, **kwargs):
colocate_with = kwargs.pop("colocate_with", None)
@@ -333,6 +348,11 @@ class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended):
del rewrite_options.scoped_allocator_opts.enable_op[:]
rewrite_options.scoped_allocator_opts.enable_op.append("CollectiveReduce")
if ((self._communication ==
cross_device_ops_lib.CollectiveCommunication.NCCL) and
self._num_gpus_per_worker > 0):
updated_config.experimental.collective_nccl = True
if not self._cluster_spec:
return updated_config
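
A hedged sketch of the effect of the NCCL branch above, assuming the surrounding method is the strategy's `_update_config_proto` hook reachable through the v1 `update_config_proto` wrapper:

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.distribute import collective_all_reduce_strategy
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib

strategy = collective_all_reduce_strategy.CollectiveAllReduceStrategy(
    communication=cross_device_ops_lib.CollectiveCommunication.NCCL)
# On a worker with at least one GPU, the rewritten ConfigProto should carry
# experimental.collective_nccl = True alongside the scoped-allocator setting
# for CollectiveReduce.
config = strategy.update_config_proto(config_pb2.ConfigProto())
print(config.experimental.collective_nccl)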

View File

@@ -19,6 +19,7 @@ from __future__ import division
from __future__ import print_function
import collections
import enum
import six
from tensorflow.python.client import device_lib
@@ -924,6 +925,21 @@ class MultiWorkerAllReduce(AllReduceCrossDeviceOps):
reduce_op)
@tf_export("distribute.experimental.CollectiveCommunication")
class CollectiveCommunication(enum.Enum):
"""Communication choices for CollectiveOps.
* `AUTO`: Default to runtime's automatic choices.
* `RING`: TensorFlow's ring algorithms for all-reduce and
all-gather.
* `NCCL`: Use ncclAllReduce for all-reduce, and ring algorithms for
all-gather. TODO(ayushd): add ncclAllGather implementation.
"""
AUTO = "AUTO"
RING = "RING"
NCCL = "NCCL"
# TODO(yuefengz): support in-graph collective all-reduce.
class CollectiveAllReduce(CrossDeviceOps):
"""All-reduce cross device ops using collective ops.

View File

@@ -0,0 +1,16 @@
path: "tensorflow.distribute.experimental.CollectiveCommunication"
tf_class {
is_instance: "<enum \'CollectiveCommunication\'>"
member {
name: "AUTO"
mtype: "<enum \'CollectiveCommunication\'>"
}
member {
name: "NCCL"
mtype: "<enum \'CollectiveCommunication\'>"
}
member {
name: "RING"
mtype: "<enum \'CollectiveCommunication\'>"
}
}

View File

@@ -13,7 +13,7 @@ tf_class {
}
member_method {
name: "__init__"
argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None"
argspec: "args=[\'self\', \'communication\'], varargs=None, keywords=None, defaults=[\'CollectiveCommunication.AUTO\'], "
}
member_method {
name: "broadcast"

View File

@@ -1,5 +1,9 @@
path: "tensorflow.distribute.experimental"
tf_module {
member {
name: "CollectiveCommunication"
mtype: "<class \'enum.EnumMeta\'>"
}
member {
name: "MultiWorkerMirroredStrategy"
mtype: "<type \'type\'>"

View File

@@ -0,0 +1,16 @@
path: "tensorflow.distribute.experimental.CollectiveCommunication"
tf_class {
is_instance: "<enum \'CollectiveCommunication\'>"
member {
name: "AUTO"
mtype: "<enum \'CollectiveCommunication\'>"
}
member {
name: "NCCL"
mtype: "<enum \'CollectiveCommunication\'>"
}
member {
name: "RING"
mtype: "<enum \'CollectiveCommunication\'>"
}
}

View File

@@ -13,7 +13,7 @@ tf_class {
}
member_method {
name: "__init__"
argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None"
argspec: "args=[\'self\', \'communication\'], varargs=None, keywords=None, defaults=[\'CollectiveCommunication.AUTO\'], "
}
member_method {
name: "broadcast"

View File

@@ -1,5 +1,9 @@
path: "tensorflow.distribute.experimental"
tf_module {
member {
name: "CollectiveCommunication"
mtype: "<class \'enum.EnumMeta\'>"
}
member {
name: "MultiWorkerMirroredStrategy"
mtype: "<type \'type\'>"