Stop sharding the memory tests; use separate files instead.

Keeping all tests in a single file led to timeouts.

PiperOrigin-RevId: 255234617
Akshay Modi 2019-06-26 11:57:11 -07:00 committed by TensorFlower Gardener
parent fc870cd334
commit bdadd0a4fe
4 changed files with 118 additions and 63 deletions
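For context on the tests being reorganized: these leak checks are built on memory_profiler, which samples the resident memory of the test process. A minimal sketch of that style of check, assuming the memory_profiler package is installed (the helper name and thresholds below are illustrative, not this commit's code):

import memory_profiler

def assert_no_leak_sketch(f, num_iters=1000, threshold_mb=10):
  # Warm up so one-time allocations (caches, traced functions) are
  # not mistaken for a leak.
  f()
  # memory_usage(-1) takes a single sample of this process, in MiB.
  before = memory_profiler.memory_usage(-1)[0]
  for _ in range(num_iters):
    f()
  after = memory_profiler.memory_usage(-1)[0]
  assert after - before < threshold_mb, (
      "memory grew by %.1f MiB over %d iterations" %
      (after - before, num_iters))

Because the measurement is a coarse, whole-process number, it is sensitive to anything else running at the same time, which is consistent with the flakiness the new BUILD file's comment describes for concurrently running shards.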
tensorflow/python/eager/BUILD

@@ -564,27 +564,6 @@ py_library(
],
)
cuda_py_test(
name = "memory_test",
size = "medium",
srcs = ["memory_test.py"],
additional_deps = [
":remote",
"//tensorflow/python/eager:backprop",
"//tensorflow/python/keras",
"//tensorflow/python/eager:test",
"//tensorflow/python:array_ops",
"//tensorflow/python:client_testlib",
"//tensorflow/python:framework_test_lib",
"@six_archive//:six",
],
shard_count = 4,
tags = [
"optonly", # The test is too slow in non-opt mode
],
xla_enable_strict_auto_jit = True,
)
py_library(
name = "def_function",
srcs = ["def_function.py"],

tensorflow/python/eager/memory_tests/BUILD

@@ -0,0 +1,46 @@
load("//tensorflow:tensorflow.bzl", "cuda_py_test")
package(
licenses = ["notice"], # Apache 2.0
)
# NOTE: Do not add sharding to these tests. If tests run concurrently, they
# seem to confuse the memory_profiler, and the tests begin to flake. Add new
# test files as needed.
cuda_py_test(
name = "memory_test",
size = "medium",
srcs = ["memory_test.py"],
additional_deps = [
"//tensorflow/python/keras",
"//tensorflow/python/eager:backprop",
"//tensorflow/python/eager:test",
"//tensorflow/python:array_ops",
"//tensorflow/python:client_testlib",
"//tensorflow/python:framework_test_lib",
"@six_archive//:six",
],
tags = [
"optonly", # The test is too slow in non-opt mode
],
xla_enable_strict_auto_jit = True,
)
cuda_py_test(
name = "remote_memory_test",
size = "medium",
srcs = ["remote_memory_test.py"],
additional_deps = [
":memory_test",
"//tensorflow/python/eager:backprop",
"//tensorflow/python/eager:remote",
"//tensorflow/python/eager:test",
"//tensorflow/python:array_ops",
"//tensorflow/python:client_testlib",
],
tags = [
"optonly", # The test is too slow in non-opt mode
],
xla_enable_strict_auto_jit = True,
)
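Per the NOTE above, future growth goes into new unsharded test files rather than a shard_count. A hypothetical new file would reuse the helpers that memory_test exports, mirroring remote_memory_test.py below (the class name and test body here are illustrative only, not part of this commit):

from tensorflow.python.eager import test
from tensorflow.python.eager.memory_tests import memory_test
from tensorflow.python.framework import constant_op


class NewMemoryTest(test.TestCase):

  def testSomethingDoesNotLeak(self):
    if not memory_test.memory_profiler_is_available():
      self.skipTest("memory_profiler required to run this test")

    def f():
      # Work whose references must not accumulate across iterations.
      constant_op.constant(42)

    memory_test.assert_no_leak(f, num_iters=10000)


if __name__ == "__main__":
  test.main()

Each such file would get its own cuda_py_test target in this BUILD file, again without a shard_count.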

tensorflow/python/eager/memory_tests/memory_test.py

@@ -24,7 +24,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import six
@@ -32,14 +31,11 @@ from tensorflow.python import keras
from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.eager import remote
from tensorflow.python.eager import test
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops.variables import Variable
from tensorflow.python.training import server_lib
# memory_profiler might not be available in the OSS version of TensorFlow.
try:
@@ -84,6 +80,10 @@ def assert_no_leak(f, num_iters=100000, increase_threshold_absolute_mb=10):
increase_threshold_absolute_mb)
def memory_profiler_is_available():
return memory_profiler is not None
class MemoryTest(test.TestCase):
def testMemoryLeakAnonymousVariable(self):
@@ -96,8 +96,7 @@ class MemoryTest(test.TestCase):
assert_no_leak(f, num_iters=10000)
# TODO(b/136031434): De-flake
def DISABLE_testMemoryLeakInSimpleModelForwardOnly(self):
def testMemoryLeakInSimpleModelForwardOnly(self):
if memory_profiler is None:
self.skipTest("memory_profiler required to run this test")
@@ -140,39 +139,3 @@ class MemoryTest(test.TestCase):
graph(constant_op.constant(42))
assert_no_leak(f, num_iters=1000, increase_threshold_absolute_mb=30)
class RemoteWorkerMemoryTest(test.TestCase):
def __init__(self, method):
super(RemoteWorkerMemoryTest, self).__init__(method)
# used for remote worker tests
os.environ["TF_EAGER_REMOTE_USE_SEND_TENSOR_RPC"] = "1"
self._cached_server = server_lib.Server.create_local_server()
self._cached_server_target = self._cached_server.target[len("grpc://"):]
def testMemoryLeakInLocalCopy(self):
if memory_profiler is None:
self.skipTest("memory_profiler required to run this test")
remote.connect_to_remote_host(self._cached_server_target)
# Run a function locally with the input on a remote worker and ensure we
# do not leak a reference to the remote tensor.
@def_function.function
def local_func(i):
return i
def func():
with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
x = array_ops.zeros([1000, 1000], dtypes.int32)
local_func(x)
assert_no_leak(func, num_iters=100, increase_threshold_absolute_mb=50)
if __name__ == "__main__":
test.main()

tensorflow/python/eager/memory_tests/remote_memory_test.py

@@ -0,0 +1,67 @@
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for memory leaks in remote eager execution."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from tensorflow.python.eager import def_function
from tensorflow.python.eager import remote
from tensorflow.python.eager import test
from tensorflow.python.eager.memory_tests import memory_test
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.training import server_lib
class RemoteWorkerMemoryTest(test.TestCase):
def __init__(self, method):
super(RemoteWorkerMemoryTest, self).__init__(method)
# used for remote worker tests
os.environ["TF_EAGER_REMOTE_USE_SEND_TENSOR_RPC"] = "1"
self._cached_server = server_lib.Server.create_local_server()
self._cached_server_target = self._cached_server.target[len("grpc://"):]
def testMemoryLeakInLocalCopy(self):
if not memory_test.memory_profiler_is_available():
self.skipTest("memory_profiler required to run this test")
remote.connect_to_remote_host(self._cached_server_target)
# Run a function locally with the input on a remote worker and ensure we
# do not leak a reference to the remote tensor.
@def_function.function
def local_func(i):
return i
def func():
with ops.device("job:worker/replica:0/task:0/device:CPU:0"):
x = array_ops.zeros([1000, 1000], dtypes.int32)
local_func(x)
memory_test.assert_no_leak(
func, num_iters=100, increase_threshold_absolute_mb=50)
if __name__ == "__main__":
test.main()