STT-tensorflow/tensorflow/python/lib/io/file_io_test.py
A. Unique TensorFlower 0bf070c54d Reacquire the GIL when handing errors in the read method of file_io's BufferedInputStream.
PiperOrigin-RevId: 316120310
Change-Id: I905e2c8652b977c154618c2e4d870dd2925a7234
2020-06-12 09:49:13 -07:00

650 lines
25 KiB
Python

# This Python file uses the following encoding: utf-8
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Testing File IO operations in file_io.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os.path
import numpy as np
from tensorflow.python.framework import errors
from tensorflow.python.lib.io import file_io
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
class FileIoTest(test.TestCase):
def setUp(self):
self._base_dir = os.path.join(self.get_temp_dir(), "base_dir")
file_io.create_dir(self._base_dir)
def tearDown(self):
file_io.delete_recursively(self._base_dir)
def testEmptyFilename(self):
f = file_io.FileIO("", mode="r")
with self.assertRaises(errors.NotFoundError):
_ = f.read()
def testFileDoesntExist(self):
file_path = os.path.join(self._base_dir, "temp_file")
self.assertFalse(file_io.file_exists(file_path))
with self.assertRaises(errors.NotFoundError):
_ = file_io.read_file_to_string(file_path)
def testWriteToString(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.write_string_to_file(file_path, "testing")
self.assertTrue(file_io.file_exists(file_path))
file_contents = file_io.read_file_to_string(file_path)
self.assertEqual("testing", file_contents)
def testAtomicWriteStringToFile(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.atomic_write_string_to_file(file_path, "testing")
self.assertTrue(file_io.file_exists(file_path))
file_contents = file_io.read_file_to_string(file_path)
self.assertEqual("testing", file_contents)
def testAtomicWriteStringToFileOverwriteFalse(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.atomic_write_string_to_file(file_path, "old", overwrite=False)
with self.assertRaises(errors.AlreadyExistsError):
file_io.atomic_write_string_to_file(file_path, "new", overwrite=False)
file_contents = file_io.read_file_to_string(file_path)
self.assertEqual("old", file_contents)
file_io.delete_file(file_path)
file_io.atomic_write_string_to_file(file_path, "new", overwrite=False)
file_contents = file_io.read_file_to_string(file_path)
self.assertEqual("new", file_contents)
def testReadBinaryMode(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.write_string_to_file(file_path, "testing")
with file_io.FileIO(file_path, mode="rb") as f:
self.assertEqual(b"testing", f.read())
def testWriteBinaryMode(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, "wb").write("testing")
with file_io.FileIO(file_path, mode="r") as f:
self.assertEqual("testing", f.read())
def testAppend(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="w") as f:
f.write("begin\n")
with file_io.FileIO(file_path, mode="a") as f:
f.write("a1\n")
with file_io.FileIO(file_path, mode="a") as f:
f.write("a2\n")
with file_io.FileIO(file_path, mode="r") as f:
file_contents = f.read()
self.assertEqual("begin\na1\na2\n", file_contents)
def testMultipleFiles(self):
file_prefix = os.path.join(self._base_dir, "temp_file")
for i in range(5000):
f = file_io.FileIO(file_prefix + str(i), mode="w+")
f.write("testing")
f.flush()
self.assertEqual("testing", f.read())
f.close()
def testMultipleWrites(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="w") as f:
f.write("line1\n")
f.write("line2")
file_contents = file_io.read_file_to_string(file_path)
self.assertEqual("line1\nline2", file_contents)
def testFileWriteBadMode(self):
file_path = os.path.join(self._base_dir, "temp_file")
with self.assertRaises(errors.PermissionDeniedError):
file_io.FileIO(file_path, mode="r").write("testing")
def testFileReadBadMode(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
self.assertTrue(file_io.file_exists(file_path))
with self.assertRaises(errors.PermissionDeniedError):
file_io.FileIO(file_path, mode="w").read()
def testFileDelete(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
file_io.delete_file(file_path)
self.assertFalse(file_io.file_exists(file_path))
def testFileDeleteFail(self):
file_path = os.path.join(self._base_dir, "temp_file")
with self.assertRaises(errors.NotFoundError):
file_io.delete_file(file_path)
def testGetMatchingFiles(self):
dir_path = os.path.join(self._base_dir, "temp_dir")
file_io.create_dir(dir_path)
files = ["file1.txt", "file2.txt", "file3.txt", "file*.txt"]
for name in files:
file_path = os.path.join(dir_path, name)
file_io.FileIO(file_path, mode="w").write("testing")
expected_match = [os.path.join(dir_path, name) for name in files]
self.assertItemsEqual(
file_io.get_matching_files(os.path.join(dir_path, "file*.txt")),
expected_match)
self.assertItemsEqual(file_io.get_matching_files(tuple()), [])
files_subset = [
os.path.join(dir_path, files[0]), os.path.join(dir_path, files[2])
]
self.assertItemsEqual(
file_io.get_matching_files(files_subset), files_subset)
file_io.delete_recursively(dir_path)
self.assertFalse(file_io.file_exists(os.path.join(dir_path, "file3.txt")))
def testGetMatchingFilesWhenParentDirContainsParantheses(self):
dir_path = os.path.join(self._base_dir, "dir_(special)")
file_io.create_dir(dir_path)
files = ["file1.txt", "file(2).txt"]
for name in files:
file_path = os.path.join(dir_path, name)
file_io.FileIO(file_path, mode="w").write("testing")
expected_match = [os.path.join(dir_path, name) for name in files]
glob_pattern = os.path.join(dir_path, "*")
self.assertItemsEqual(
file_io.get_matching_files(glob_pattern), expected_match)
def testCreateRecursiveDir(self):
dir_path = os.path.join(self._base_dir, "temp_dir/temp_dir1/temp_dir2")
file_io.recursive_create_dir(dir_path)
file_io.recursive_create_dir(dir_path) # repeat creation
file_path = os.path.join(dir_path, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
self.assertTrue(file_io.file_exists(file_path))
file_io.delete_recursively(os.path.join(self._base_dir, "temp_dir"))
self.assertFalse(file_io.file_exists(file_path))
def testCopy(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
copy_path = os.path.join(self._base_dir, "copy_file")
file_io.copy(file_path, copy_path)
self.assertTrue(file_io.file_exists(copy_path))
f = file_io.FileIO(file_path, mode="r")
self.assertEqual("testing", f.read())
self.assertEqual(7, f.tell())
def testCopyOverwrite(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
copy_path = os.path.join(self._base_dir, "copy_file")
file_io.FileIO(copy_path, mode="w").write("copy")
file_io.copy(file_path, copy_path, overwrite=True)
self.assertTrue(file_io.file_exists(copy_path))
self.assertEqual("testing", file_io.FileIO(file_path, mode="r").read())
def testCopyOverwriteFalse(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
copy_path = os.path.join(self._base_dir, "copy_file")
file_io.FileIO(copy_path, mode="w").write("copy")
with self.assertRaises(errors.AlreadyExistsError):
file_io.copy(file_path, copy_path, overwrite=False)
def testRename(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
rename_path = os.path.join(self._base_dir, "rename_file")
file_io.rename(file_path, rename_path)
self.assertTrue(file_io.file_exists(rename_path))
self.assertFalse(file_io.file_exists(file_path))
def testRenameOverwrite(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
rename_path = os.path.join(self._base_dir, "rename_file")
file_io.FileIO(rename_path, mode="w").write("rename")
file_io.rename(file_path, rename_path, overwrite=True)
self.assertTrue(file_io.file_exists(rename_path))
self.assertFalse(file_io.file_exists(file_path))
def testRenameOverwriteFalse(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
rename_path = os.path.join(self._base_dir, "rename_file")
file_io.FileIO(rename_path, mode="w").write("rename")
with self.assertRaises(errors.AlreadyExistsError):
file_io.rename(file_path, rename_path, overwrite=False)
self.assertTrue(file_io.file_exists(rename_path))
self.assertTrue(file_io.file_exists(file_path))
def testDeleteRecursivelyFail(self):
fake_dir_path = os.path.join(self._base_dir, "temp_dir")
with self.assertRaises(errors.NotFoundError):
file_io.delete_recursively(fake_dir_path)
def testIsDirectory(self):
dir_path = os.path.join(self._base_dir, "test_dir")
# Failure for a non-existing dir.
self.assertFalse(file_io.is_directory(dir_path))
file_io.create_dir(dir_path)
self.assertTrue(file_io.is_directory(dir_path))
file_path = os.path.join(dir_path, "test_file")
file_io.FileIO(file_path, mode="w").write("test")
# False for a file.
self.assertFalse(file_io.is_directory(file_path))
# Test that the value returned from `stat()` has `is_directory` set.
file_statistics = file_io.stat(dir_path)
self.assertTrue(file_statistics.is_directory)
def testListDirectory(self):
dir_path = os.path.join(self._base_dir, "test_dir")
file_io.create_dir(dir_path)
files = ["file1.txt", "file2.txt", "file3.txt"]
for name in files:
file_path = os.path.join(dir_path, name)
file_io.FileIO(file_path, mode="w").write("testing")
subdir_path = os.path.join(dir_path, "sub_dir")
file_io.create_dir(subdir_path)
subdir_file_path = os.path.join(subdir_path, "file4.txt")
file_io.FileIO(subdir_file_path, mode="w").write("testing")
dir_list = file_io.list_directory(dir_path)
self.assertItemsEqual(files + ["sub_dir"], dir_list)
def testListDirectoryFailure(self):
dir_path = os.path.join(self._base_dir, "test_dir")
with self.assertRaises(errors.NotFoundError):
file_io.list_directory(dir_path)
def _setupWalkDirectories(self, dir_path):
# Creating a file structure as follows
# test_dir -> file: file1.txt; dirs: subdir1_1, subdir1_2, subdir1_3
# subdir1_1 -> file: file3.txt
# subdir1_2 -> dir: subdir2
file_io.create_dir(dir_path)
file_io.FileIO(
os.path.join(dir_path, "file1.txt"), mode="w").write("testing")
sub_dirs1 = ["subdir1_1", "subdir1_2", "subdir1_3"]
for name in sub_dirs1:
file_io.create_dir(os.path.join(dir_path, name))
file_io.FileIO(
os.path.join(dir_path, "subdir1_1/file2.txt"),
mode="w").write("testing")
file_io.create_dir(os.path.join(dir_path, "subdir1_2/subdir2"))
def testWalkInOrder(self):
dir_path = os.path.join(self._base_dir, "test_dir")
self._setupWalkDirectories(dir_path)
# Now test the walk (in_order = True)
all_dirs = []
all_subdirs = []
all_files = []
for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=True):
all_dirs.append(w_dir)
all_subdirs.append(w_subdirs)
all_files.append(w_files)
self.assertItemsEqual(all_dirs, [dir_path] + [
os.path.join(dir_path, item)
for item in
["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"]
])
self.assertEqual(dir_path, all_dirs[0])
self.assertLess(
all_dirs.index(os.path.join(dir_path, "subdir1_2")),
all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2")))
self.assertItemsEqual(all_subdirs[1:5], [[], ["subdir2"], [], []])
self.assertItemsEqual(all_subdirs[0],
["subdir1_1", "subdir1_2", "subdir1_3"])
self.assertItemsEqual(all_files, [["file1.txt"], ["file2.txt"], [], [], []])
self.assertLess(
all_files.index(["file1.txt"]), all_files.index(["file2.txt"]))
def testWalkPostOrder(self):
dir_path = os.path.join(self._base_dir, "test_dir")
self._setupWalkDirectories(dir_path)
# Now test the walk (in_order = False)
all_dirs = []
all_subdirs = []
all_files = []
for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False):
all_dirs.append(w_dir)
all_subdirs.append(w_subdirs)
all_files.append(w_files)
self.assertItemsEqual(all_dirs, [
os.path.join(dir_path, item)
for item in
["subdir1_1", "subdir1_2/subdir2", "subdir1_2", "subdir1_3"]
] + [dir_path])
self.assertEqual(dir_path, all_dirs[4])
self.assertLess(
all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2")),
all_dirs.index(os.path.join(dir_path, "subdir1_2")))
self.assertItemsEqual(all_subdirs[0:4], [[], [], ["subdir2"], []])
self.assertItemsEqual(all_subdirs[4],
["subdir1_1", "subdir1_2", "subdir1_3"])
self.assertItemsEqual(all_files, [["file2.txt"], [], [], [], ["file1.txt"]])
self.assertLess(
all_files.index(["file2.txt"]), all_files.index(["file1.txt"]))
def testWalkFailure(self):
dir_path = os.path.join(self._base_dir, "test_dir")
# Try walking a directory that wasn't created.
all_dirs = []
all_subdirs = []
all_files = []
for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False):
all_dirs.append(w_dir)
all_subdirs.append(w_subdirs)
all_files.append(w_files)
self.assertItemsEqual(all_dirs, [])
self.assertItemsEqual(all_subdirs, [])
self.assertItemsEqual(all_files, [])
def testStat(self):
file_path = os.path.join(self._base_dir, "temp_file")
file_io.FileIO(file_path, mode="w").write("testing")
file_statistics = file_io.stat(file_path)
os_statistics = os.stat(file_path)
self.assertEqual(7, file_statistics.length)
self.assertEqual(
int(os_statistics.st_mtime), int(file_statistics.mtime_nsec / 1e9))
self.assertFalse(file_statistics.is_directory)
def testReadLine(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
self.assertEqual(36, f.size())
self.assertEqual("testing1\n", f.readline())
self.assertEqual("testing2\n", f.readline())
self.assertEqual("testing3\n", f.readline())
self.assertEqual("\n", f.readline())
self.assertEqual("testing5", f.readline())
self.assertEqual("", f.readline())
def testRead(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
self.assertEqual(36, f.size())
self.assertEqual("testing1\n", f.read(9))
self.assertEqual("testing2\n", f.read(9))
self.assertEqual("t", f.read(1))
self.assertEqual("esting3\n\ntesting5", f.read())
def testReadErrorReacquiresGil(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
with self.assertRaises(errors.InvalidArgumentError):
# At present, this is sufficient to convince ourselves that the change
# fixes the problem. That is, this test will seg fault without the change,
# and pass with it. Unfortunately, this is brittle, as it relies on the
# Python layer to pass the argument along to the wrapped C++ without
# checking the argument itself.
f.read(-2)
def testTell(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
self.assertEqual(0, f.tell())
self.assertEqual("testing1\n", f.readline())
self.assertEqual(9, f.tell())
self.assertEqual("testing2\n", f.readline())
self.assertEqual(18, f.tell())
self.assertEqual("testing3\n", f.readline())
self.assertEqual(27, f.tell())
self.assertEqual("\n", f.readline())
self.assertEqual(28, f.tell())
self.assertEqual("testing5", f.readline())
self.assertEqual(36, f.tell())
self.assertEqual("", f.readline())
self.assertEqual(36, f.tell())
def testSeek(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
self.assertEqual("testing1\n", f.readline())
self.assertEqual(9, f.tell())
# Seek to 18
f.seek(18)
self.assertEqual(18, f.tell())
self.assertEqual("testing3\n", f.readline())
# Seek back to 9
f.seek(9)
self.assertEqual(9, f.tell())
self.assertEqual("testing2\n", f.readline())
f.seek(0)
self.assertEqual(0, f.tell())
self.assertEqual("testing1\n", f.readline())
with self.assertRaises(errors.InvalidArgumentError):
f.seek(-1)
with self.assertRaises(TypeError):
f.seek()
# TODO(jhseu): Delete after position deprecation.
with self.assertRaises(TypeError):
f.seek(offset=0, position=0)
f.seek(position=9)
self.assertEqual(9, f.tell())
self.assertEqual("testing2\n", f.readline())
def testSeekFromWhat(self):
file_path = os.path.join(self._base_dir, "temp_file")
with file_io.FileIO(file_path, mode="r+") as f:
f.write("testing1\ntesting2\ntesting3\n\ntesting5")
self.assertEqual("testing1\n", f.readline())
self.assertEqual(9, f.tell())
# Seek to 18
f.seek(9, 1)
self.assertEqual(18, f.tell())
self.assertEqual("testing3\n", f.readline())
# Seek back to 9
f.seek(9, 0)
self.assertEqual(9, f.tell())
self.assertEqual("testing2\n", f.readline())
f.seek(-f.size(), 2)
self.assertEqual(0, f.tell())
self.assertEqual("testing1\n", f.readline())
with self.assertRaises(errors.InvalidArgumentError):
f.seek(0, 3)
def testReadingIterator(self):
file_path = os.path.join(self._base_dir, "temp_file")
data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"]
with file_io.FileIO(file_path, mode="r+") as f:
f.write("".join(data))
actual_data = []
for line in f:
actual_data.append(line)
self.assertSequenceEqual(actual_data, data)
def testReadlines(self):
file_path = os.path.join(self._base_dir, "temp_file")
data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"]
f = file_io.FileIO(file_path, mode="r+")
f.write("".join(data))
f.flush()
lines = f.readlines()
self.assertSequenceEqual(lines, data)
def testUTF8StringPath(self):
file_path = os.path.join(self._base_dir, "UTF8测试_file")
file_io.write_string_to_file(file_path, "testing")
with file_io.FileIO(file_path, mode="rb") as f:
self.assertEqual(b"testing", f.read())
def testEof(self):
"""Test that reading past EOF does not raise an exception."""
file_path = os.path.join(self._base_dir, "temp_file")
f = file_io.FileIO(file_path, mode="r+")
content = "testing"
f.write(content)
f.flush()
self.assertEqual(content, f.read(len(content) + 1))
def testUTF8StringPathExists(self):
file_path = os.path.join(self._base_dir, "UTF8测试_file_exist")
file_io.write_string_to_file(file_path, "testing")
v = file_io.file_exists(file_path)
self.assertEqual(v, True)
def testFilecmp(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.write_string_to_file(file1, "This is a sentence\n" * 100)
file2 = os.path.join(self._base_dir, "file2")
file_io.write_string_to_file(file2, "This is another sentence\n" * 100)
file3 = os.path.join(self._base_dir, "file3")
file_io.write_string_to_file(file3, u"This is another sentence\n" * 100)
self.assertFalse(file_io.filecmp(file1, file2))
self.assertTrue(file_io.filecmp(file2, file3))
def testFilecmpSameSize(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.write_string_to_file(file1, "This is a sentence\n" * 100)
file2 = os.path.join(self._base_dir, "file2")
file_io.write_string_to_file(file2, "This is b sentence\n" * 100)
file3 = os.path.join(self._base_dir, "file3")
file_io.write_string_to_file(file3, u"This is b sentence\n" * 100)
self.assertFalse(file_io.filecmp(file1, file2))
self.assertTrue(file_io.filecmp(file2, file3))
def testFilecmpBinary(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.FileIO(file1, "wb").write("testing\n\na")
file2 = os.path.join(self._base_dir, "file2")
file_io.FileIO(file2, "wb").write("testing\n\nb")
file3 = os.path.join(self._base_dir, "file3")
file_io.FileIO(file3, "wb").write("testing\n\nb")
file4 = os.path.join(self._base_dir, "file4")
file_io.FileIO(file4, "wb").write("testing\n\ntesting")
self.assertFalse(file_io.filecmp(file1, file2))
self.assertFalse(file_io.filecmp(file1, file4))
self.assertTrue(file_io.filecmp(file2, file3))
def testFileCrc32(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.write_string_to_file(file1, "This is a sentence\n" * 100)
crc1 = file_io.file_crc32(file1)
file2 = os.path.join(self._base_dir, "file2")
file_io.write_string_to_file(file2, "This is another sentence\n" * 100)
crc2 = file_io.file_crc32(file2)
file3 = os.path.join(self._base_dir, "file3")
file_io.write_string_to_file(file3, "This is another sentence\n" * 100)
crc3 = file_io.file_crc32(file3)
self.assertTrue(crc1 != crc2)
self.assertEqual(crc2, crc3)
def testFileCrc32WithBytes(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.write_string_to_file(file1, "This is a sentence\n" * 100)
crc1 = file_io.file_crc32(file1, block_size=24)
file2 = os.path.join(self._base_dir, "file2")
file_io.write_string_to_file(file2, "This is another sentence\n" * 100)
crc2 = file_io.file_crc32(file2, block_size=24)
file3 = os.path.join(self._base_dir, "file3")
file_io.write_string_to_file(file3, "This is another sentence\n" * 100)
crc3 = file_io.file_crc32(file3, block_size=-1)
self.assertTrue(crc1 != crc2)
self.assertEqual(crc2, crc3)
def testFileCrc32Binary(self):
file1 = os.path.join(self._base_dir, "file1")
file_io.FileIO(file1, "wb").write("testing\n\n")
crc1 = file_io.file_crc32(file1)
file2 = os.path.join(self._base_dir, "file2")
file_io.FileIO(file2, "wb").write("testing\n\n\n")
crc2 = file_io.file_crc32(file2)
file3 = os.path.join(self._base_dir, "file3")
file_io.FileIO(file3, "wb").write("testing\n\n\n")
crc3 = file_io.file_crc32(file3)
self.assertTrue(crc1 != crc2)
self.assertEqual(crc2, crc3)
def testMatchingFilesPermission(self):
# Create top level directory test_dir.
dir_path = os.path.join(self._base_dir, "test_dir")
file_io.create_dir(dir_path)
# Create second level directories `noread` and `any`.
noread_path = os.path.join(dir_path, "noread")
file_io.create_dir(noread_path)
any_path = os.path.join(dir_path, "any")
file_io.create_dir(any_path)
files = ["file1.txt", "file2.txt", "file3.txt"]
for name in files:
file_path = os.path.join(any_path, name)
file_io.FileIO(file_path, mode="w").write("testing")
file_path = os.path.join(noread_path, "file4.txt")
file_io.FileIO(file_path, mode="w").write("testing")
# Change noread to noread access.
os.chmod(noread_path, 0)
expected_match = [os.path.join(any_path, name) for name in files]
self.assertItemsEqual(
file_io.get_matching_files(os.path.join(dir_path, "*", "file*.txt")),
expected_match)
# Change noread back so that it could be cleaned during tearDown.
os.chmod(noread_path, 0o777)
def testFileSeekableWithZip(self):
# Note: Test case for GitHub issue 27276, issue only exposed in python 3.7+.
filename = os.path.join(self._base_dir, "a.npz")
np.savez_compressed(filename, {"a": 1, "b": 2})
with gfile.GFile(filename, "rb") as f:
info = np.load(f, allow_pickle=True)
_ = [i for i in info.items()]
def testHasAtomicMove(self):
self.assertTrue(file_io.has_atomic_move("/a/b/c"))
if __name__ == "__main__":
test.main()