Condition whether to use NCCL for all collectives on the launcher

We can only order NCCL operations for collective V2. This allows enabling NCCL for all
collectives just for V2 kernels, leaving TF1 users unaffected.

PiperOrigin-RevId: 343958688
Change-Id: Ib01309e220670d09f78dab7c8f1ae1e01a872f91
Ran Chen 2020-11-23 17:19:16 -08:00 committed by TensorFlower Gardener
parent a55428dc78
commit d938e55c02
3 changed files with 10 additions and 7 deletions
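
Background for the diffs below: NCCL requires every device to launch the same collectives in the same order, otherwise concurrent kernels can deadlock. V2 collective ops can be chained through an ordering token (a resource handle passed from one op to the next), which gives the runtime a data dependency that enforces a single launch order; V1 collectives have no such mechanism, which is why NCCL previously had to be limited to the batched all-reduce. The standalone sketch below illustrates only the token-chaining idea; OrderingToken and ordered_all_reduce are made-up names, not TensorFlow APIs.

# Minimal, framework-free sketch of the "ordering token" idea: each collective
# consumes the token produced by the previous one, so even if a scheduler is
# free to reorder independent work, the data dependency forces one fixed
# launch order per device. Names here are illustrative, not TensorFlow APIs.

class OrderingToken:
  """Stands in for the resource handle threaded through V2 collectives."""

  def __init__(self):
    self.sequence = []           # records launch order for demonstration

def ordered_all_reduce(token, name):
  token.sequence.append(name)    # the "launch" can only happen once the
  return token                   # previous collective has released the token

if __name__ == "__main__":
  token = OrderingToken()
  # Because each call takes the token returned by the previous call, the launch
  # order is fixed regardless of how the runtime might otherwise interleave them.
  token = ordered_all_reduce(token, "all_reduce/gradients_0")
  token = ordered_all_reduce(token, "all_reduce/gradients_1")
  print(token.sequence)  # ['all_reduce/gradients_0', 'all_reduce/gradients_1']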


@@ -990,11 +990,6 @@ class CollectiveAllReduce(CrossDeviceOps):
   all workers and then put results on the right destinations.
   """

-  # Whether to only use NCCL for batched all-reduce when NCCL is requested. This
-  # is because of the lack of mechanism to order NCCL operations
-  # deterministically.
-  _limited_nccl = True
-
   def __init__(self, devices, group_size, collective_keys=None):
     """Initializes the object.
@@ -1034,12 +1029,18 @@ class CollectiveAllReduce(CrossDeviceOps):
     # function building, the executors are not used.
     self._executors = []
     self._launchers = []
+    # Whether to only use NCCL for batched all-reduce when NCCL is requested.
+    # This is because of the lack of mechanism to order NCCL operations
+    # deterministically.
+    self._limited_nccl = False
     for device in self._devices:
       executor = executor_lib.new_executor(enable_async=True)
       self._executors.append(executor)
       launcher = cross_device_utils.CollectiveReplicaLauncher(
           group_key, group_size, self._collective_keys, device, executor)
       self._launchers.append(launcher)
+      if not launcher.can_order_nccl():
+        self._limited_nccl = True

     super(CollectiveAllReduce, self).__init__()

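With this change the flag is no longer a class attribute that tests patch globally; each CollectiveAllReduce instance derives _limited_nccl from its launchers, so NCCL stays limited only when some launcher cannot order NCCL. The diff does not show where the flag is consumed; the sketch below is an assumption about how such a flag could gate the choice of implementation (choose_implementation and its arguments are hypothetical, not the class's real method):

# Hedged sketch of how a per-instance _limited_nccl flag might gate the
# communication choice elsewhere in CollectiveAllReduce. The function name and
# signature are hypothetical; the real class makes this decision inside its
# all-reduce paths.

def choose_implementation(requested, is_batched_all_reduce, limited_nccl):
  """Returns the collective implementation to request from the runtime.

  Args:
    requested: the user-requested communication, e.g. "NCCL" or "RING".
    is_batched_all_reduce: True if this launch is the batched all-reduce.
    limited_nccl: True if some launcher cannot order NCCL deterministically.
  """
  if requested != "NCCL":
    return requested
  # When ordering can't be guaranteed, restrict NCCL to the batched
  # all-reduce, where a single launch avoids cross-op ordering hazards.
  if limited_nccl and not is_batched_all_reduce:
    return "RING"
  return "NCCL"

# Example: with a launcher that cannot order NCCL (limited_nccl=True), a
# non-batched collective falls back to RING; a V2 launcher keeps NCCL everywhere.
assert choose_implementation("NCCL", False, True) == "RING"
assert choose_implementation("NCCL", False, False) == "NCCL"
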

@@ -107,7 +107,6 @@ def enable_collective_ops():
       protocol=cluster_resolver.rpc_layer)
   context.context().enable_collective_ops(server_def)
   # Recover default flag values.
-  cross_device_ops_lib.CollectiveAllReduce._limited_nccl = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_scoped_allocator = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_collective_v2 = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_ordering_token = False
@@ -875,7 +874,6 @@ class CollectiveOpsTest(test.TestCase, parameterized.TestCase):
   def testNcclOrdering(self, num_processes, required_gpus):
     def replica_fn():
-      cross_device_ops_lib.CollectiveAllReduce._limited_nccl = False
       cross_device_utils.CollectiveReplicaLauncher._prefer_collective_v2 = True
       cross_device_utils.CollectiveReplicaLauncher._prefer_ordering_token = True

       collective, devices, _ = self.make_collective(num_processes,

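Both test hunks poke class-level flags on CollectiveReplicaLauncher (and previously on CollectiveAllReduce) and then rely on enable_collective_ops() to put the defaults back, per the "Recover default flag values" comment. A generic save-and-restore helper for that pattern is sketched below; it is illustrative only and not the mechanism the TensorFlow tests actually use.

# Illustrative helper for temporarily overriding class-level flags and
# restoring them afterwards, the pattern the test hunks above rely on.
import contextlib

@contextlib.contextmanager
def override_class_flags(cls, **flags):
  """Temporarily sets class attributes, restoring the originals on exit."""
  saved = {name: getattr(cls, name) for name in flags}
  try:
    for name, value in flags.items():
      setattr(cls, name, value)
    yield cls
  finally:
    for name, value in saved.items():
      setattr(cls, name, value)

# Usage with a stand-in class; in the real test the target would be
# cross_device_utils.CollectiveReplicaLauncher.
class FakeLauncher:
  _prefer_collective_v2 = True
  _prefer_ordering_token = False

with override_class_flags(FakeLauncher, _prefer_ordering_token=True):
  assert FakeLauncher._prefer_ordering_token
assert not FakeLauncher._prefer_ordering_token
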

@@ -345,6 +345,10 @@ class CollectiveReplicaLauncher(object):
       return self._ordering_token.handle
     return None

+  def can_order_nccl(self):
+    """Whether this launcher can order NCCL operations."""
+    return self._use_ordering_token()
+
   def all_reduce(self,
                  input_tensor,
                  control_input=None,
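
can_order_nccl() simply forwards to _use_ordering_token(), which is outside this diff. The sketch below shows one plausible shape for that check as a standalone class; the condition combining _prefer_collective_v2, _prefer_ordering_token, and the presence of an ordering token is an assumption, not the actual implementation.

# Standalone sketch of the ordering-related surface of a launcher. The flag
# names _prefer_collective_v2 and _prefer_ordering_token appear in the test
# hunk above; how _use_ordering_token() combines them here is an assumption.

class LauncherSketch:
  """Stand-in for CollectiveReplicaLauncher's ordering-related behavior."""

  def __init__(self, prefer_collective_v2, prefer_ordering_token, ordering_token):
    self._prefer_collective_v2 = prefer_collective_v2
    self._prefer_ordering_token = prefer_ordering_token
    self._ordering_token = ordering_token  # resource handle, or None

  def _use_ordering_token(self):
    # Ordering tokens only exist for V2 collectives, so both preferences and
    # an actual token must be present.
    return (self._prefer_collective_v2 and self._prefer_ordering_token
            and self._ordering_token is not None)

  def can_order_nccl(self):
    """Whether this launcher can order NCCL operations."""
    return self._use_ordering_token()

# A V1-style launcher reports False, so CollectiveAllReduce keeps NCCL limited;
# a V2 launcher with an ordering token reports True.
assert not LauncherSketch(False, False, None).can_order_nccl()
assert LauncherSketch(True, True, object()).can_order_nccl()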