From d938e55c0297cf3084aaffdcdb013032cd7b02e1 Mon Sep 17 00:00:00 2001
From: Ran Chen
Date: Mon, 23 Nov 2020 17:19:16 -0800
Subject: [PATCH] Condition whether to use NCCL for all collectives on the
 launcher

We can only order NCCL operations for collective V2. This allows enabling
NCCL for all collectives just for V2 kernels, leaving TF1 users unaffected.

PiperOrigin-RevId: 343958688
Change-Id: Ib01309e220670d09f78dab7c8f1ae1e01a872f91
---
 tensorflow/python/distribute/cross_device_ops.py      | 11 ++++++-----
 tensorflow/python/distribute/cross_device_ops_test.py |  2 --
 tensorflow/python/distribute/cross_device_utils.py    |  4 ++++
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/tensorflow/python/distribute/cross_device_ops.py b/tensorflow/python/distribute/cross_device_ops.py
index 3c424b301a8..0ef4fd8a0c3 100644
--- a/tensorflow/python/distribute/cross_device_ops.py
+++ b/tensorflow/python/distribute/cross_device_ops.py
@@ -990,11 +990,6 @@ class CollectiveAllReduce(CrossDeviceOps):
   all workers and then put results on the right destinations.
   """
 
-  # Whether to only use NCCL for batched all-reduce when NCCL is requested. This
-  # is because of the lack of mechanism to order NCCL operations
-  # deterministically.
-  _limited_nccl = True
-
   def __init__(self, devices, group_size, collective_keys=None):
     """Initializes the object.
 
@@ -1034,12 +1029,18 @@ class CollectiveAllReduce(CrossDeviceOps):
     # function building, the executors are not used.
     self._executors = []
     self._launchers = []
+    # Whether to only use NCCL for batched all-reduce when NCCL is requested.
+    # This is because of the lack of mechanism to order NCCL operations
+    # deterministically.
+    self._limited_nccl = False
     for device in self._devices:
       executor = executor_lib.new_executor(enable_async=True)
       self._executors.append(executor)
       launcher = cross_device_utils.CollectiveReplicaLauncher(
           group_key, group_size, self._collective_keys, device, executor)
       self._launchers.append(launcher)
+      if not launcher.can_order_nccl():
+        self._limited_nccl = True
 
     super(CollectiveAllReduce, self).__init__()
 
diff --git a/tensorflow/python/distribute/cross_device_ops_test.py b/tensorflow/python/distribute/cross_device_ops_test.py
index ca17550626a..7edbf71f994 100644
--- a/tensorflow/python/distribute/cross_device_ops_test.py
+++ b/tensorflow/python/distribute/cross_device_ops_test.py
@@ -107,7 +107,6 @@ def enable_collective_ops():
       protocol=cluster_resolver.rpc_layer)
   context.context().enable_collective_ops(server_def)
   # Recover default flag values.
-  cross_device_ops_lib.CollectiveAllReduce._limited_nccl = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_scoped_allocator = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_collective_v2 = True
   cross_device_utils.CollectiveReplicaLauncher._prefer_ordering_token = False
@@ -875,7 +874,6 @@ class CollectiveOpsTest(test.TestCase, parameterized.TestCase):
   def testNcclOrdering(self, num_processes, required_gpus):
 
     def replica_fn():
-      cross_device_ops_lib.CollectiveAllReduce._limited_nccl = False
       cross_device_utils.CollectiveReplicaLauncher._prefer_collective_v2 = True
       cross_device_utils.CollectiveReplicaLauncher._prefer_ordering_token = True
       collective, devices, _ = self.make_collective(num_processes,
diff --git a/tensorflow/python/distribute/cross_device_utils.py b/tensorflow/python/distribute/cross_device_utils.py
index 15bc9d0da7d..a58120752aa 100644
--- a/tensorflow/python/distribute/cross_device_utils.py
+++ b/tensorflow/python/distribute/cross_device_utils.py
@@ -345,6 +345,10 @@ class CollectiveReplicaLauncher(object):
      return self._ordering_token.handle
    return None
 
+  def can_order_nccl(self):
+    """Whether this launcher can order NCCL operations."""
+    return self._use_ordering_token()
+
   def all_reduce(self,
                  input_tensor,
                  control_input=None,
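
The heart of the change is replacing a class-level flag with a per-instance
capability probe: each launcher reports whether it can order NCCL operations
(only V2 collectives carry an ordering token), and the all-reduce falls back
to limited NCCL as soon as any launcher in the group cannot. Below is a
minimal, runnable sketch of this pattern only; Launcher and AllReduce are
hypothetical stand-ins for CollectiveReplicaLauncher and CollectiveAllReduce,
not the TensorFlow API.

class Launcher(object):
  """Hypothetical stand-in for CollectiveReplicaLauncher."""

  def __init__(self, prefer_ordering_token):
    # Only V2 collectives can carry an ordering token; the token is what
    # lets NCCL operations be ordered deterministically.
    self._prefer_ordering_token = prefer_ordering_token

  def can_order_nccl(self):
    """Whether this launcher can order NCCL operations."""
    return self._prefer_ordering_token


class AllReduce(object):
  """Hypothetical stand-in for CollectiveAllReduce."""

  def __init__(self, launchers):
    # Start unrestricted; restrict NCCL to batched all-reduce as soon as
    # any launcher in the group cannot order NCCL deterministically.
    self._limited_nccl = False
    for launcher in launchers:
      if not launcher.can_order_nccl():
        self._limited_nccl = True


# A group of V2 launchers stays unrestricted; mixing in a launcher without
# an ordering token forces the limited-NCCL fallback.
assert AllReduce([Launcher(True), Launcher(True)])._limited_nccl is False
assert AllReduce([Launcher(True), Launcher(False)])._limited_nccl is True

Because the flag is now derived per instance from the launchers themselves,
tests no longer need to monkey-patch CollectiveAllReduce._limited_nccl, which
is why the patch deletes both assignments in cross_device_ops_test.py.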