Disable collective_ops_gpu_test on single GPU and enable on multiple GPUs.

PiperOrigin-RevId: 289146430
Change-Id: If431b7a2a4e48b83b6e8027d98fcc1a85d9cd8a9
This commit is contained in:
Ayush Dubey 2020-01-10 12:47:37 -08:00 committed by TensorFlower Gardener
parent d65a5f1bdf
commit c460561bdc
2 changed files with 55 additions and 69 deletions

View File

@ -2968,9 +2968,12 @@ cuda_py_test(
srcs = ["ops/collective_ops_gpu_test.py"], srcs = ["ops/collective_ops_gpu_test.py"],
python_version = "PY3", python_version = "PY3",
tags = [ tags = [
"no_cuda_on_cpu_tap", "guitar",
"manual",
"multi_gpu",
"no_oss",
"no_rocm", "no_rocm",
"no_windows", "notap",
], ],
deps = [ deps = [
":client_testlib", ":client_testlib",

View File

@ -36,33 +36,28 @@ from tensorflow.python.platform import tf_logging as logging
class CollectiveOpGPUTest(test.TestCase): class CollectiveOpGPUTest(test.TestCase):
def _configure(self, group_size, set_config_proto_nccl=True): @classmethod
"""Set environment variables and return `ConfigProto` for NCCL execution.""" def setUpClass(cls):
# Configure virtual GPU devices """Set group_size = num_gpus = 2 for all tests in this class."""
virtual_devices = [config_pb2.GPUOptions.Experimental.VirtualDevices( super(CollectiveOpGPUTest, cls).setUpClass()
memory_limit_mb=([1 << 10] * group_size))] # 1 GB per virtual GPU # Group size is the number of devices in a group communicating collectively.
gpu_options = config_pb2.GPUOptions( # This will be passed into the collective ops in the tests below.
visible_device_list='0', cls._group_size = 2
experimental=config_pb2.GPUOptions.Experimental(
virtual_devices=virtual_devices))
# Configure NCCL
os.environ['NCCL_DEBUG'] = 'INFO' os.environ['NCCL_DEBUG'] = 'INFO'
os.environ['NCCL_LAUNCH_MODE'] = 'PARALLEL' os.environ['NCCL_LAUNCH_MODE'] = 'PARALLEL'
def _configure(self, set_config_proto_nccl=True):
"""Return `ConfigProto` for NCCL execution."""
experimental = config_pb2.ConfigProto.Experimental() experimental = config_pb2.ConfigProto.Experimental()
if set_config_proto_nccl: if set_config_proto_nccl:
experimental.collective_nccl = True experimental.collective_nccl = True
return config_pb2.ConfigProto(gpu_options=gpu_options, return config_pb2.ConfigProto(experimental=experimental)
experimental=experimental)
def _ensure_context_initialized(self): def _ensure_context_initialized(self):
gpus = config.list_physical_devices('GPU') gpus = config.list_physical_devices('GPU')
if len(gpus) < 1: if len(gpus) < 2:
self.skipTest('Expected at least 1 GPU but found {} GPUs'.format( self.skipTest('Expected at least 2 GPUs but found {} GPUs'.format(
len(gpus))) len(gpus)))
config.set_logical_device_configuration(gpus[0], [
context.LogicalDeviceConfiguration(1024),
context.LogicalDeviceConfiguration(1024)
])
context.ensure_initialized() context.ensure_initialized()
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
@ -70,20 +65,19 @@ class CollectiveOpGPUTest(test.TestCase):
inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1], inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
[0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]] [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2] expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
group_size = len(inputs)
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
for i in range(group_size): for i in range(self._group_size):
with ops.device(devices[i]): with ops.device(devices[i]):
t = constant_op.constant(inputs[i]) t = constant_op.constant(inputs[i])
collectives.append(collective_ops.all_reduce( collectives.append(collective_ops.all_reduce(
t, group_size, group_key, instance_key, 'Add', 'Div')) t, self._group_size, group_key, instance_key, 'Add', 'Div'))
results = sess.run(collectives) results = sess.run(collectives)
for result in results: for result in results:
self.assertAllClose(result, expected, rtol=1e-5, atol=1e-5) self.assertAllClose(result, expected, rtol=1e-5, atol=1e-5)
@ -91,20 +85,19 @@ class CollectiveOpGPUTest(test.TestCase):
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
def testInt32Error(self): def testInt32Error(self):
inputs = [[0, 1], [2, 3]] inputs = [[0, 1], [2, 3]]
group_size = len(inputs)
group_key = 1 group_key = 1
instance_key = 50 instance_key = 50
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
for i in range(group_size): for i in range(self._group_size):
with ops.device(devices[i]): with ops.device(devices[i]):
t = constant_op.constant(inputs[i], dtype=dtypes.int32) t = constant_op.constant(inputs[i], dtype=dtypes.int32)
collectives.append(collective_ops.all_reduce( collectives.append(collective_ops.all_reduce(
t, group_size, group_key, instance_key, 'Add', 'Div')) t, self._group_size, group_key, instance_key, 'Add', 'Div'))
with self.assertRaisesRegexp( with self.assertRaisesRegexp(
errors.InternalError, errors.InternalError,
'does not support datatype DT_INT32 on DEVICE_GPU'): 'does not support datatype DT_INT32 on DEVICE_GPU'):
@ -115,20 +108,19 @@ class CollectiveOpGPUTest(test.TestCase):
inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1], inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
[0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]] [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2] expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
group_size = len(inputs)
group_key = 1 group_key = 1
instance_key = 100 instance_key = 100
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
for i in range(group_size): for i in range(self._group_size):
with ops.device(devices[i]): with ops.device(devices[i]):
t = constant_op.constant(inputs[i], dtype=dtypes.float16) t = constant_op.constant(inputs[i], dtype=dtypes.float16)
collectives.append(collective_ops.all_reduce( collectives.append(collective_ops.all_reduce(
t, group_size, group_key, instance_key, 'Add', 'Div')) t, self._group_size, group_key, instance_key, 'Add', 'Div'))
results = sess.run(collectives) results = sess.run(collectives)
for result in results: for result in results:
logging.info('i {} result {} expected {}'.format(i, results[i], expected)) logging.info('i {} result {} expected {}'.format(i, results[i], expected))
@ -139,22 +131,20 @@ class CollectiveOpGPUTest(test.TestCase):
inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1], inputs = [[0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1],
[0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]] [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2] expected = [0.2, 1.2, 2.2, 3.2, 4.2, 5.2, 6.2, 7.2]
group_size = len(inputs)
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session( with self.session(
config=self._configure(group_size, config=self._configure(set_config_proto_nccl=False)) as sess:
set_config_proto_nccl=False)) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
for i in range(group_size): for i in range(self._group_size):
with ops.device(devices[i]): with ops.device(devices[i]):
t = constant_op.constant(inputs[i]) t = constant_op.constant(inputs[i])
collectives.append(collective_ops.all_reduce( collectives.append(collective_ops.all_reduce(
t, group_size, group_key, instance_key, 'Add', 'Div', t, self._group_size, group_key, instance_key, 'Add', 'Div',
communication_hint='nccl')) communication_hint='nccl'))
results = sess.run(collectives) results = sess.run(collectives)
for result in results: for result in results:
@ -163,23 +153,22 @@ class CollectiveOpGPUTest(test.TestCase):
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
def testBasicNcclBroadcast(self): def testBasicNcclBroadcast(self):
tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1] tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
group_size = 2
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
with ops.device(devices[0]): with ops.device(devices[0]):
t = constant_op.constant(tensor_value) t = constant_op.constant(tensor_value)
collectives.append(collective_ops.broadcast_send( collectives.append(collective_ops.broadcast_send(
t, t.shape, t.dtype, group_size, group_key, instance_key)) t, t.shape, t.dtype, self._group_size, group_key, instance_key))
with ops.device(devices[1]): with ops.device(devices[1]):
t = constant_op.constant(tensor_value) t = constant_op.constant(tensor_value)
collectives.append(collective_ops.broadcast_recv( collectives.append(collective_ops.broadcast_recv(
t.shape, t.dtype, group_size, group_key, instance_key)) t.shape, t.dtype, self._group_size, group_key, instance_key))
results = sess.run(collectives) results = sess.run(collectives)
for result in results: for result in results:
self.assertAllClose(result, tensor_value, rtol=1e-5, atol=1e-5) self.assertAllClose(result, tensor_value, rtol=1e-5, atol=1e-5)
@ -187,12 +176,11 @@ class CollectiveOpGPUTest(test.TestCase):
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
def testNcclBroadcastDoubleRecv(self): def testNcclBroadcastDoubleRecv(self):
tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1] tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
group_size = 2
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
@ -200,19 +188,18 @@ class CollectiveOpGPUTest(test.TestCase):
with ops.device(device): with ops.device(device):
t = constant_op.constant(tensor_value) t = constant_op.constant(tensor_value)
collectives.append(collective_ops.broadcast_recv( collectives.append(collective_ops.broadcast_recv(
t.shape, t.dtype, group_size, group_key, instance_key)) t.shape, t.dtype, self._group_size, group_key, instance_key))
with self.assertRaisesRegexp(errors.InternalError, 'found no source'): with self.assertRaisesRegexp(errors.InternalError, 'found no source'):
sess.run(collectives) sess.run(collectives)
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
def testNcclBroadcastDoubleSend(self): def testNcclBroadcastDoubleSend(self):
tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1] tensor_value = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1]
group_size = 2
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
@ -220,7 +207,7 @@ class CollectiveOpGPUTest(test.TestCase):
with ops.device(device): with ops.device(device):
t = constant_op.constant(tensor_value) t = constant_op.constant(tensor_value)
collectives.append(collective_ops.broadcast_send( collectives.append(collective_ops.broadcast_send(
t, t.shape, t.dtype, group_size, group_key, instance_key)) t, t.shape, t.dtype, self._group_size, group_key, instance_key))
with self.assertRaisesRegexp(errors.InternalError, 'already has source'): with self.assertRaisesRegexp(errors.InternalError, 'already has source'):
sess.run(collectives) sess.run(collectives)
@ -230,19 +217,18 @@ class CollectiveOpGPUTest(test.TestCase):
[0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]] [0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]]
expected = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, expected = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1,
0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3] 0.3, 1.3, 2.3, 3.3, 4.3, 5.3, 6.3, 7.3]
group_size = len(inputs)
group_key = 1 group_key = 1
instance_key = 1 instance_key = 1
devices = ['/GPU:{}'.format(i) for i in range(group_size)] devices = ['/GPU:{}'.format(i) for i in range(self._group_size)]
with self.session(config=self._configure(group_size)) as sess: with self.session(config=self._configure()) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
collectives = [] collectives = []
for i in range(group_size): for i in range(self._group_size):
with ops.device(devices[i]): with ops.device(devices[i]):
t = constant_op.constant(inputs[i]) t = constant_op.constant(inputs[i])
collectives.append(collective_ops.all_gather(t, group_size, collectives.append(collective_ops.all_gather(t, self._group_size,
group_key, instance_key)) group_key, instance_key))
results = sess.run(collectives) results = sess.run(collectives)
for result in results: for result in results:
@ -250,23 +236,21 @@ class CollectiveOpGPUTest(test.TestCase):
@test_util.run_deprecated_v1 @test_util.run_deprecated_v1
def testCollectiveDeviceMismatch(self): def testCollectiveDeviceMismatch(self):
group_size = 2
group_key = 10 group_key = 10
instance_key = 20 instance_key = 20
t0 = [1, 2, 3, 4] t0 = [1, 2, 3, 4]
t1 = [5, 6, 7, 8] t1 = [5, 6, 7, 8]
with self.session( with self.session(
config=self._configure(group_size, config=self._configure(set_config_proto_nccl=False)) as sess:
set_config_proto_nccl=False)) as sess:
if not test_util.is_gpu_available(cuda_only=True): if not test_util.is_gpu_available(cuda_only=True):
self.skipTest('No GPU available') self.skipTest('No GPU available')
with ops.device('/CPU:0'): with ops.device('/CPU:0'):
in0 = constant_op.constant(t0) in0 = constant_op.constant(t0)
c0 = collective_ops.all_reduce(in0, group_size, group_key, c0 = collective_ops.all_reduce(in0, self._group_size, group_key,
instance_key, 'Add', 'Id') instance_key, 'Add', 'Id')
with ops.device('/GPU:0'): with ops.device('/GPU:0'):
in1 = constant_op.constant(t1) in1 = constant_op.constant(t1)
c1 = collective_ops.all_reduce(in1, group_size, group_key, c1 = collective_ops.all_reduce(in1, self._group_size, group_key,
instance_key, 'Add', 'Id') instance_key, 'Add', 'Id')
run_options = config_pb2.RunOptions() run_options = config_pb2.RunOptions()
run_options.experimental.collective_graph_key = 100 run_options.experimental.collective_graph_key = 100
@ -280,7 +264,6 @@ class CollectiveOpGPUTest(test.TestCase):
@def_function.function @def_function.function
def run_all_reduce(group_key, instance_key, merge_op): def run_all_reduce(group_key, instance_key, merge_op):
group_size = 2
t0 = [1., 20., 3., 40., 5.] t0 = [1., 20., 3., 40., 5.]
t1 = [10., 2., 30., 4., 50.] t1 = [10., 2., 30., 4., 50.]
os.environ['NCCL_DEBUG'] = 'INFO' os.environ['NCCL_DEBUG'] = 'INFO'
@ -288,13 +271,13 @@ class CollectiveOpGPUTest(test.TestCase):
with ops.device('/GPU:0'): with ops.device('/GPU:0'):
in0 = constant_op.constant(t0) in0 = constant_op.constant(t0)
c0 = collective_ops.all_reduce( c0 = collective_ops.all_reduce(
in0, group_size, group_key, instance_key, merge_op, final_op='Id', in0, self._group_size, group_key, instance_key, merge_op,
communication_hint='nccl') final_op='Id', communication_hint='nccl')
with ops.device('/GPU:1'): with ops.device('/GPU:1'):
in1 = constant_op.constant(t1) in1 = constant_op.constant(t1)
c1 = collective_ops.all_reduce( c1 = collective_ops.all_reduce(
in1, group_size, group_key, instance_key, merge_op, final_op='Id', in1, self._group_size, group_key, instance_key, merge_op,
communication_hint='nccl') final_op='Id', communication_hint='nccl')
return c0, c1 return c0, c1
for combination in [('Max', [10., 20., 30., 40., 50.]), for combination in [('Max', [10., 20., 30., 40., 50.]),