STT-tensorflow/tensorflow/python/kernel_tests/qr_op_test.py
2020-12-07 19:46:56 -05:00

313 lines
12 KiB
Python

# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tensorflow.ops.math_ops.matrix_inverse."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from tensorflow.python.eager import backprop
from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import errors_impl
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import gradient_checker_v2
from tensorflow.python.ops import linalg_ops
from tensorflow.python.ops import stateless_random_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import benchmark
from tensorflow.python.platform import test
def _AddTest(test_class, op_name, testcase_name, fn):
test_name = "_".join(["test", op_name, testcase_name])
if hasattr(test_class, test_name):
raise RuntimeError("Test %s defined more than once" % test_name)
setattr(test_class, test_name, fn)
class QrOpTest(test.TestCase):
@test_util.run_in_graph_and_eager_modes(use_gpu=True)
def testWrongDimensions(self):
# The input to qr should be a tensor of at least rank 2.
scalar = constant_op.constant(1.)
with self.assertRaisesRegex((ValueError, errors_impl.InvalidArgumentError),
"rank.* 2.*0"):
linalg_ops.qr(scalar)
vector = constant_op.constant([1., 2.])
with self.assertRaisesRegex((ValueError, errors_impl.InvalidArgumentError),
"rank.* 2.*1"):
linalg_ops.qr(vector)
@test_util.run_in_graph_and_eager_modes(use_gpu=True)
def testConcurrentExecutesWithoutError(self):
seed = [42, 24]
all_ops = []
for full_matrices_ in True, False:
for rows_ in 4, 5:
for cols_ in 4, 5:
matrix_shape = [rows_, cols_]
matrix1 = stateless_random_ops.stateless_random_normal(
matrix_shape, seed)
matrix2 = stateless_random_ops.stateless_random_normal(
matrix_shape, seed)
self.assertAllEqual(matrix1, matrix2)
q1, r1 = linalg_ops.qr(matrix1, full_matrices=full_matrices_)
q2, r2 = linalg_ops.qr(matrix2, full_matrices=full_matrices_)
all_ops += [q1, q2, r1, r2]
val = self.evaluate(all_ops)
for i in range(0, len(val), 2):
self.assertAllClose(val[i], val[i + 1])
def _GetQrOpTest(dtype_, shape_, full_matrices_, use_static_shape_):
is_complex = dtype_ in (np.complex64, np.complex128)
is_single = dtype_ in (np.float32, np.complex64)
def CompareOrthogonal(self, x, y, rank):
if is_single:
atol = 5e-4
else:
atol = 5e-14
# We only compare the first 'rank' orthogonal vectors since the
# remainder form an arbitrary orthonormal basis for the
# (row- or column-) null space, whose exact value depends on
# implementation details. Notice that since we check that the
# matrices of singular vectors are unitary elsewhere, we do
# implicitly test that the trailing vectors of x and y span the
# same space.
x = x[..., 0:rank]
y = y[..., 0:rank]
# Q is only unique up to sign (complex phase factor for complex matrices),
# so we normalize the sign first.
sum_of_ratios = np.sum(np.divide(y, x), -2, keepdims=True)
phases = np.divide(sum_of_ratios, np.abs(sum_of_ratios))
x *= phases
self.assertAllClose(x, y, atol=atol)
def CheckApproximation(self, a, q, r):
if is_single:
tol = 1e-5
else:
tol = 1e-14
# Tests that a ~= q*r.
a_recon = test_util.matmul_without_tf32(q, r)
self.assertAllClose(a_recon, a, rtol=tol, atol=tol)
def CheckUnitary(self, x):
# Tests that x[...,:,:]^H * x[...,:,:] is close to the identity.
xx = test_util.matmul_without_tf32(x, x, adjoint_a=True)
identity = array_ops.matrix_band_part(array_ops.ones_like(xx), 0, 0)
if is_single:
tol = 1e-5
else:
tol = 1e-14
self.assertAllClose(identity, xx, atol=tol)
@test_util.run_in_graph_and_eager_modes(use_gpu=True)
def Test(self):
if not use_static_shape_ and context.executing_eagerly():
return
np.random.seed(1)
x_np = np.random.uniform(
low=-1.0, high=1.0, size=np.prod(shape_)).reshape(shape_).astype(dtype_)
if is_complex:
x_np += 1j * np.random.uniform(
low=-1.0, high=1.0,
size=np.prod(shape_)).reshape(shape_).astype(dtype_)
if use_static_shape_:
x_tf = constant_op.constant(x_np)
else:
x_tf = array_ops.placeholder(dtype_)
q_tf, r_tf = linalg_ops.qr(x_tf, full_matrices=full_matrices_)
if use_static_shape_:
q_tf_val, r_tf_val = self.evaluate([q_tf, r_tf])
else:
with self.session(use_gpu=True) as sess:
q_tf_val, r_tf_val = sess.run([q_tf, r_tf], feed_dict={x_tf: x_np})
q_dims = q_tf_val.shape
np_q = np.ndarray(q_dims, dtype_)
np_q_reshape = np.reshape(np_q, (-1, q_dims[-2], q_dims[-1]))
new_first_dim = np_q_reshape.shape[0]
x_reshape = np.reshape(x_np, (-1, x_np.shape[-2], x_np.shape[-1]))
for i in range(new_first_dim):
if full_matrices_:
np_q_reshape[i, :, :], _ = np.linalg.qr(
x_reshape[i, :, :], mode="complete")
else:
np_q_reshape[i, :, :], _ = np.linalg.qr(
x_reshape[i, :, :], mode="reduced")
np_q = np.reshape(np_q_reshape, q_dims)
CompareOrthogonal(self, np_q, q_tf_val, min(shape_[-2:]))
CheckApproximation(self, x_np, q_tf_val, r_tf_val)
CheckUnitary(self, q_tf_val)
return Test
class QrGradOpTest(test.TestCase):
@test_util.run_in_graph_and_eager_modes(use_gpu=True)
def testNotImplementedCheck(self):
# Test that the correct message is issued
np.random.seed(42)
matrix = constant_op.constant(
np.random.uniform(low=-1.0, high=1.0, size=(5, 2)).astype(np.float32))
def _NoGrad(x):
with backprop.GradientTape() as tape:
tape.watch(x)
ret = linalg_ops.qr(x, full_matrices=True)
return tape.gradient(ret, x)
m = r"QrGrad not implemented when nrows > ncols and full_matrices is true."
with self.assertRaisesRegex(NotImplementedError, m):
_NoGrad(matrix)
def _GetQrGradOpTest(dtype_, shape_, full_matrices_):
def RandomInput():
a = np.random.uniform(low=-1.0, high=1.0, size=shape_).astype(dtype_)
if dtype_ in [np.complex64, np.complex128]:
a += 1j * np.random.uniform(
low=-1.0, high=1.0, size=shape_).astype(dtype_)
return a
@test_util.run_in_graph_and_eager_modes(use_gpu=True)
@test_util.run_without_tensor_float_32("Tests Qr gradient, which calls matmul"
)
def Test(self):
np.random.seed(42)
# Optimal stepsize for central difference is O(epsilon^{1/3}).
epsilon = np.finfo(dtype_).eps
delta = 0.1 * epsilon**(1.0 / 3.0)
if dtype_ in [np.float32, np.complex64]:
tol = 3e-2
else:
tol = 1e-6
# TODO(b/157171666): Sadly we have to double the computation because
# gradient_checker_v2.compute_gradient expects a list of functions.
funcs = [
lambda a: linalg_ops.qr(a, full_matrices=full_matrices_)[0],
lambda a: linalg_ops.qr(a, full_matrices=full_matrices_)[1]
]
for f in funcs:
theoretical, numerical = gradient_checker_v2.compute_gradient(
f, [RandomInput()], delta=delta)
self.assertAllClose(theoretical, numerical, atol=tol, rtol=tol)
return Test
class QRBenchmark(test.Benchmark):
shapes = [
(4, 4),
(8, 8),
(16, 16),
(101, 101),
(256, 256),
(1024, 1024),
(2048, 2048),
(1024, 2),
(1024, 32),
(1024, 128),
(1024, 512),
(1, 8, 8),
(10, 8, 8),
(100, 8, 8),
(1, 256, 256),
(10, 256, 256),
(100, 256, 256),
]
def benchmarkQROp(self):
for shape_ in self.shapes:
with ops.Graph().as_default(), \
session.Session(config=benchmark.benchmark_config()) as sess, \
ops.device("/cpu:0"):
matrix_value = np.random.uniform(
low=-1.0, high=1.0, size=shape_).astype(np.float32)
matrix = variables.Variable(matrix_value)
q, r = linalg_ops.qr(matrix)
self.evaluate(variables.global_variables_initializer())
self.run_op_benchmark(
sess,
control_flow_ops.group(q, r),
min_iters=25,
name="QR_cpu_{shape}".format(shape=shape_))
if test.is_gpu_available(True):
with ops.Graph().as_default(), \
session.Session(config=benchmark.benchmark_config()) as sess, \
ops.device("/device:GPU:0"):
matrix_value = np.random.uniform(
low=-1.0, high=1.0, size=shape_).astype(np.float32)
matrix = variables.Variable(matrix_value)
q, r = linalg_ops.qr(matrix)
self.evaluate(variables.global_variables_initializer())
self.run_op_benchmark(
sess,
control_flow_ops.group(q, r),
min_iters=25,
name="QR_gpu_{shape}".format(shape=shape_))
if __name__ == "__main__":
for dtype in np.float32, np.float64, np.complex64, np.complex128:
for rows in 1, 2, 5, 10, 32, 100:
for cols in 1, 2, 5, 10, 32, 100:
for full_matrices in False, True:
for batch_dims in [(), (3,)] + [(3, 2)] * (max(rows, cols) < 10):
# TF2 does not support placeholders under eager so we skip it
for use_static_shape in [True, False]:
shape = batch_dims + (rows, cols)
name = "%s_%s_full_%s_static_%s" % (dtype.__name__,
"_".join(map(str, shape)),
full_matrices,
use_static_shape)
_AddTest(QrOpTest, "Qr", name,
_GetQrOpTest(dtype, shape, full_matrices,
use_static_shape))
# TODO(pfau): Get working with full_matrices when rows > cols
# TODO(pfau): Get working with shapeholders (dynamic shapes)
for full_matrices in False, True:
for dtype in np.float32, np.float64, np.complex64, np.complex128:
for rows in 1, 2, 5, 10:
for cols in 1, 2, 5, 10:
if rows <= cols or (not full_matrices and rows > cols):
for batch_dims in [(), (3,)] + [(3, 2)] * (max(rows, cols) < 10):
shape = batch_dims + (rows, cols)
name = "%s_%s_full_%s" % (dtype.__name__,
"_".join(map(str, shape)),
full_matrices)
_AddTest(QrGradOpTest, "QrGrad", name,
_GetQrGradOpTest(dtype, shape, full_matrices))
test.main()