From c460561bdcf25d049d3af71c44423c7901007d7e Mon Sep 17 00:00:00 2001
From: Ayush Dubey <ayushd@google.com>
Date: Fri, 10 Jan 2020 12:47:37 -0800
Subject: [PATCH] Disable `collective_ops_gpu_test` on a single GPU and enable
 it on multiple GPUs.
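
Previously the test faked a two-device group by slicing one physical GPU into
two 1 GB virtual devices. It now requires at least two physical GPUs, hoists
the shared group size and NCCL environment setup into `setUpClass`, and is
retagged to run on the multi-GPU test cluster instead of the default
single-GPU suites.

As a rough illustration (not part of this change), the guard the test now
relies on looks like the sketch below; the class name is hypothetical and the
modules are the ones the test already imports:

    from tensorflow.python.framework import config
    from tensorflow.python.platform import test

    class RequiresTwoGpusTest(test.TestCase):  # hypothetical example

      def setUp(self):
        super(RequiresTwoGpusTest, self).setUp()
        # Skip rather than fail on hosts with fewer than two GPUs.
        gpus = config.list_physical_devices('GPU')
        if len(gpus) < 2:
          self.skipTest('Expected at least 2 GPUs but found {} GPUs'.format(
              len(gpus)))

    if __name__ == '__main__':
      test.main()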

PiperOrigin-RevId: 289146430
Change-Id: If431b7a2a4e48b83b6e8027d98fcc1a85d9cd8a9
---
 tensorflow/python/BUILD                       |   8 +-
 .../python/ops/collective_ops_gpu_test.py     | 126 ++++++++------
 2 files changed, 63 insertions(+), 71 deletions(-)

diff --git a/tensorflow/python/BUILD b/tensorflow/python/BUILD
index fe2f98afd00..f08d3e2fde1 100644
--- a/tensorflow/python/BUILD
+++ b/tensorflow/python/BUILD
@@ -2968,9 +2968,13 @@ cuda_py_test(
     srcs = ["ops/collective_ops_gpu_test.py"],
     python_version = "PY3",
     tags = [
-        "no_cuda_on_cpu_tap",
+        "guitar",
+        "manual",
+        "multi_gpu",
+        "no_oss",
         "no_rocm",
-        "no_windows",
+        "notap",
     ],
     deps = [
         ":client_testlib",
diff --git a/tensorflow/python/ops/collective_ops_gpu_test.py b/tensorflow/python/ops/collective_ops_gpu_test.py
index fb769752575..dfa4d445b0d 100644
--- a/tensorflow/python/ops/collective_ops_gpu_test.py
+++ b/tensorflow/python/ops/collective_ops_gpu_test.py
@@ -36,33 +36,33 @@ from tensorflow.python.platform import tf_logging as logging
 
 class CollectiveOpGPUTest(test.TestCase):
 
-  def _configure(self, group_size, set_config_proto_nccl=True):
-    """Set environment variables and return `ConfigProto` for NCCL execution."""
-    # Configure virtual GPU devices
-    virtual_devices = [config_pb2.GPUOptions.Experimental.VirtualDevices(
-        memory_limit_mb=([1 << 10] * group_size))]  # 1 GB per virtual GPU
-    gpu_options = config_pb2.GPUOptions(
-        visible_device_list='0',
-        experimental=config_pb2.GPUOptions.Experimental(
-            virtual_devices=virtual_devices))
-    # Configure NCCL
+  @classmethod
+  def setUpClass(cls):
+    """Set group_size = num_gpus = 2 for all tests in this class."""
+    super(CollectiveOpGPUTest, cls).setUpClass()
+    # Group size is the number of devices in a group communicating collectively.
+    # This will be passed into the collective ops in the tests below.
+    cls._group_size = 2
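+    # NCCL_DEBUG=INFO surfaces NCCL initialization logs in the test output;
+    # PARALLEL launch mode avoids deadlock with concurrent collectives.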
     os.environ['NCCL_DEBUG'] = 'INFO'
     os.environ['NCCL_LAUNCH_MODE'] = 'PARALLEL'
+
+  def _configure(self, set_config_proto_nccl=True):
+    """Return `ConfigProto` for NCCL execution."""
     experimental = config_pb2.ConfigProto.Experimental()
     if set_config_proto_nccl:
       experimental.collective_nccl = True
-    return config_pb2.ConfigProto(gpu_options=gpu_options,
-                                  experimental=experimental)
+    return config_pb2.ConfigProto(experimental=experimental)
 
   def _ensure_context_initialized(self):
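+    """Skips unless there are at least 2 physical GPUs, then inits context."""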
     gpus = config.list_physical_devices('GPU')
-    if len(gpus) < 1:
-      self.skipTest('Expected at least 1 GPU but found {} GPUs'.format(
+    if len(gpus) < 2:
+      self.skipTest('Expected at least 2 GPUs but found {} GPUs'.format(
           len(gpus)))
-    config.set_logical_device_configuration(gpus[0], [
-        context.LogicalDeviceConfiguration(1024),
-        context.LogicalDeviceConfiguration(1024)
-    ])
     context.ensure_initialized()
 
   @test_util.run_deprecated_v1
@@ -70,20 +65,19 @@ class CollectiveOpGPUTest(test.TestCase):
     inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
               [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
     expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
-    group_size = len(inputs)
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
-      for i in range(group_size):
+      for i in range(self._group_size):
         with ops.device(devices[i]):
           t = constant_op.constant(inputs[i])
           collectives.append(collective_ops.all_reduce(
-              t, group_size, group_key, instance_key, 'Add', 'Div'))
+              t, self._group_size, group_key, instance_key, 'Add', 'Div'))
       results = sess.run(collectives)
     for result in results:
       self.assertAllClose(result, expected, rtol=1e-5, atol=1e-5)
@@ -91,20 +85,19 @@ class CollectiveOpGPUTest(test.TestCase):
   @test_util.run_deprecated_v1
   def testInt32Error(self):
     inputs = [[0, 1], [2, 3]]
-    group_size = len(inputs)
     group_key = 1
     instance_key = 50
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
-      for i in range(group_size):
+      for i in range(self._group_size):
         with ops.device(devices[i]):
           t = constant_op.constant(inputs[i], dtype=dtypes.int32)
           collectives.append(collective_ops.all_reduce(
-              t, group_size, group_key, instance_key, 'Add', 'Div'))
+              t, self._group_size, group_key, instance_key, 'Add', 'Div'))
       with self.assertRaisesRegexp(
           errors.InternalError,
           'does not support datatype DT_INT32 on DEVICE_GPU'):
@@ -115,20 +108,19 @@ class CollectiveOpGPUTest(test.TestCase):
     inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
               [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
     expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
-    group_size = len(inputs)
     group_key = 1
     instance_key = 100
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
-      for i in range(group_size):
+      for i in range(self._group_size):
         with ops.device(devices[i]):
           t = constant_op.constant(inputs[i], dtype=dtypes.float16)
           collectives.append(collective_ops.all_reduce(
-              t, group_size, group_key, instance_key, 'Add', 'Div'))
+              t, self._group_size, group_key, instance_key, 'Add', 'Div'))
       results = sess.run(collectives)
-    for result in results:
-      logging.info('i {} result {} expected {}'.format(i, results[i], expected))
+    for i, result in enumerate(results):
+      logging.info('i {} result {} expected {}'.format(i, result, expected))
@@ -139,22 +131,20 @@ class CollectiveOpGPUTest(test.TestCase):
     inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
               [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
     expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
-    group_size = len(inputs)
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
     with self.session(
-        config=self._configure(group_size,
-                               set_config_proto_nccl=False)) as sess:
+        config=self._configure(set_config_proto_nccl=False)) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
-      for i in range(group_size):
+      for i in range(self._group_size):
         with ops.device(devices[i]):
           t = constant_op.constant(inputs[i])
           collectives.append(collective_ops.all_reduce(
-              t, group_size, group_key, instance_key, 'Add', 'Div',
+              t, self._group_size, group_key, instance_key, 'Add', 'Div',
               communication_hint='nccl'))
       results = sess.run(collectives)
     for result in results:
@@ -163,23 +153,22 @@ class CollectiveOpGPUTest(test.TestCase):
   @test_util.run_deprecated_v1
   def testBasicNcclBroadcast(self):
     tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
-    group_size = 2
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
       with ops.device(devices[0]):
         t = constant_op.constant(tensor_value)
         collectives.append(collective_ops.broadcast_send(
-            t, t.shape, t.dtype, group_size, group_key, instance_key))
+            t, t.shape, t.dtype, self._group_size, group_key, instance_key))
       with ops.device(devices[1]):
         t = constant_op.constant(tensor_value)
         collectives.append(collective_ops.broadcast_recv(
-            t.shape, t.dtype, group_size, group_key, instance_key))
+            t.shape, t.dtype, self._group_size, group_key, instance_key))
       results = sess.run(collectives)
     for result in results:
       self.assertAllClose(result, tensor_value, rtol=1e-5, atol=1e-5)
@@ -187,12 +176,11 @@ class CollectiveOpGPUTest(test.TestCase):
   @test_util.run_deprecated_v1
   def testNcclBroadcastDoubleRecv(self):
     tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
-    group_size = 2
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
@@ -200,19 +188,18 @@ class CollectiveOpGPUTest(test.TestCase):
         with ops.device(device):
           t = constant_op.constant(tensor_value)
           collectives.append(collective_ops.broadcast_recv(
-              t.shape, t.dtype, group_size, group_key, instance_key))
+              t.shape, t.dtype, self._group_size, group_key, instance_key))
       with self.assertRaisesRegexp(errors.InternalError, 'found no source'):
         sess.run(collectives)
 
   @test_util.run_deprecated_v1
   def testNcclBroadcastDoubleSend(self):
     tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
-    group_size = 2
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
@@ -220,7 +207,7 @@ class CollectiveOpGPUTest(test.TestCase):
         with ops.device(device):
           t = constant_op.constant(tensor_value)
           collectives.append(collective_ops.broadcast_send(
-              t, t.shape, t.dtype, group_size, group_key, instance_key))
+              t, t.shape, t.dtype, self._group_size, group_key, instance_key))
       with self.assertRaisesRegexp(errors.InternalError, 'already has source'):
         sess.run(collectives)
 
@@ -230,19 +217,18 @@ class CollectiveOpGPUTest(test.TestCase):
               [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
     expected = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1,
                 0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]
-    group_size = len(inputs)
     group_key = 1
     instance_key = 1
-    devices = ['/GPU:{}'.format(i) for i in range(group_size)]
+    devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
 
-    with self.session(config=self._configure(group_size)) as sess:
+    with self.session(config=self._configure()) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       collectives = []
-      for i in range(group_size):
+      for i in range(self._group_size):
         with ops.device(devices[i]):
           t = constant_op.constant(inputs[i])
-          collectives.append(collective_ops.all_gather(t, group_size,
+          collectives.append(collective_ops.all_gather(t, self._group_size,
                                                        group_key, instance_key))
       results = sess.run(collectives)
     for result in results:
@@ -250,23 +236,21 @@ class CollectiveOpGPUTest(test.TestCase):
 
   @test_util.run_deprecated_v1
   def testCollectiveDeviceMismatch(self):
-    group_size = 2
     group_key = 10
     instance_key = 20
     t0 = [1, 2, 3, 4]
     t1 = [5, 6, 7, 8]
     with self.session(
-        config=self._configure(group_size,
-                               set_config_proto_nccl=False)) as sess:
+        config=self._configure(set_config_proto_nccl=False)) as sess:
       if not test_util.is_gpu_available(cuda_only=True):
         self.skipTest('No GPU available')
       with ops.device('/CPU:0'):
         in0 = constant_op.constant(t0)
-        c0 = collective_ops.all_reduce(in0, group_size, group_key,
+        c0 = collective_ops.all_reduce(in0, self._group_size, group_key,
                                        instance_key, 'Add', 'Id')
       with ops.device('/GPU:0'):
         in1 = constant_op.constant(t1)
-        c1 = collective_ops.all_reduce(in1, group_size, group_key,
+        c1 = collective_ops.all_reduce(in1, self._group_size, group_key,
                                        instance_key, 'Add', 'Id')
       run_options = config_pb2.RunOptions()
       run_options.experimental.collective_graph_key = 100
@@ -280,7 +264,6 @@ class CollectiveOpGPUTest(test.TestCase):
 
     @def_function.function
     def run_all_reduce(group_key, instance_key, merge_op):
-      group_size = 2
       t0 = [1., 20., 3., 40., 5.]
       t1 = [10., 2., 30., 4., 50.]
       os.environ['NCCL_DEBUG'] = 'INFO'
@@ -288,13 +271,13 @@ class CollectiveOpGPUTest(test.TestCase):
       with ops.device('/GPU:0'):
         in0 = constant_op.constant(t0)
         c0 = collective_ops.all_reduce(
-            in0, group_size, group_key, instance_key, merge_op, final_op='Id',
-            communication_hint='nccl')
+            in0, self._group_size, group_key, instance_key, merge_op,
+            final_op='Id', communication_hint='nccl')
       with ops.device('/GPU:1'):
         in1 = constant_op.constant(t1)
         c1 = collective_ops.all_reduce(
-            in1, group_size, group_key, instance_key, merge_op, final_op='Id',
-            communication_hint='nccl')
+            in1, self._group_size, group_key, instance_key, merge_op,
+            final_op='Id', communication_hint='nccl')
       return c0, c1
 
     for combination in [('Max', [10., 20., 30., 40., 50.]),