From e65e99c4331abc64fda7bad9f0ddec6ac357e479 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Thu, 30 Jan 2020 22:56:03 +0000 Subject: [PATCH 1/4] Not use temp files when writing to S3 Address feedback Add test for the python method has_atomic_move Removed old comment, and fixed indentation Remove unnecessary imports Remove the test which checks for reference cycles when saving. Since the check for file system introduces a conditional op, it introduces a reference cycle, and this check does not apply anymore Fighting lint Fix lint errors Use returned status of hasAtomicMove --- .../platform/cloud/retrying_file_system.h | 5 +++ tensorflow/core/platform/env.cc | 6 +++ tensorflow/core/platform/env.h | 13 ++++++- tensorflow/core/platform/file_system.cc | 5 +++ tensorflow/core/platform/file_system.h | 11 ++++++ tensorflow/core/platform/file_system_test.cc | 8 ++++ tensorflow/core/platform/s3/s3_file_system.cc | 5 +++ tensorflow/core/platform/s3/s3_file_system.h | 2 + .../core/platform/s3/s3_file_system_test.cc | 8 ++++ .../core/util/tensor_bundle/tensor_bundle.cc | 38 ++++++++++++------- .../core/util/tensor_bundle/tensor_bundle.h | 5 ++- tensorflow/python/lib/io/file_io.py | 37 ++++++++++++++---- tensorflow/python/lib/io/file_io_test.py | 7 +++- tensorflow/python/lib/io/file_io_wrapper.cc | 8 +++- tensorflow/python/saved_model/save_test.py | 19 ---------- tensorflow/python/training/saver.py | 9 ++++- .../training/saving/functional_saver.py | 11 +++++- 17 files changed, 148 insertions(+), 49 deletions(-) diff --git a/tensorflow/core/platform/cloud/retrying_file_system.h b/tensorflow/core/platform/cloud/retrying_file_system.h index 12bbc7d6abb..912778f4dd1 100644 --- a/tensorflow/core/platform/cloud/retrying_file_system.h +++ b/tensorflow/core/platform/cloud/retrying_file_system.h @@ -120,6 +120,11 @@ class RetryingFileSystem : public FileSystem { [this, &dirname]() { return base_file_system_->IsDirectory(dirname); }, retry_config_); } + + Status HasAtomicMove(const 
string& path, bool* has_atomic_move) override { + // this method does not need to be retried + return base_file_system_->HasAtomicMove(path, has_atomic_move); + } Status DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) override { diff --git a/tensorflow/core/platform/env.cc b/tensorflow/core/platform/env.cc index eedfa2ee48f..17328bbc8eb 100644 --- a/tensorflow/core/platform/env.cc +++ b/tensorflow/core/platform/env.cc @@ -281,6 +281,12 @@ Status Env::IsDirectory(const string& fname) { return fs->IsDirectory(fname); } +Status Env::HasAtomicMove(const string& path, bool* has_atomic_move) { + FileSystem* fs; + TF_RETURN_IF_ERROR(GetFileSystemForFile(path, &fs)); + return fs->HasAtomicMove(path, has_atomic_move); +} + Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) { FileSystem* fs; diff --git a/tensorflow/core/platform/env.h b/tensorflow/core/platform/env.h index 829d8dba3b2..89e6a2b1ba5 100644 --- a/tensorflow/core/platform/env.h +++ b/tensorflow/core/platform/env.h @@ -239,7 +239,18 @@ class Env { /// * PERMISSION_DENIED - Insufficient permissions. /// * UNIMPLEMENTED - The file factory doesn't support directories. Status IsDirectory(const string& fname); - + + /// \brief Returns whether the given path is on a file system + /// that has atomic move capabilities. This can be used + /// to determine if there needs to be a temp location to safely write objects. + /// The second boolean argument has_atomic_move contains this information. + /// + /// Returns one of the following status codes (not guaranteed exhaustive): + /// * OK - The path is on a recognized file system, + /// so has_atomic_move holds the above information. + /// * UNIMPLEMENTED - The file system of the path hasn't been implemented in TF + Status HasAtomicMove(const string& path, bool* has_atomic_move); + /// Stores the size of `fname` in `*file_size`. 
Status GetFileSize(const string& fname, uint64* file_size); diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc index 7f8a34383f8..83f96b71b42 100644 --- a/tensorflow/core/platform/file_system.cc +++ b/tensorflow/core/platform/file_system.cc @@ -77,6 +77,11 @@ Status FileSystem::IsDirectory(const string& name) { return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory"); } +Status FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) { + *has_atomic_move = true; + return Status::OK(); +} + void FileSystem::FlushCaches() {} bool FileSystem::FilesExist(const std::vector& files, diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h index bc860865a53..847de2222a4 100644 --- a/tensorflow/core/platform/file_system.h +++ b/tensorflow/core/platform/file_system.h @@ -237,6 +237,17 @@ class FileSystem { /// * UNIMPLEMENTED - The file factory doesn't support directories. virtual tensorflow::Status IsDirectory(const string& fname); + /// \brief Returns whether the given path is on a file system + /// that has atomic move capabilities. This can be used + /// to determine if there needs to be a temp location to safely write objects. + /// The second boolean argument has_atomic_move contains this information. + /// + /// Returns one of the following status codes (not guaranteed exhaustive): + /// * OK - The path is on a recognized file system, + /// so has_atomic_move holds the above information. + /// * UNIMPLEMENTED - The file system of the path hasn't been implemented in TF + virtual Status HasAtomicMove(const string& path, bool* has_atomic_move); + /// \brief Flushes any cached filesystem objects from memory. 
virtual void FlushCaches(); diff --git a/tensorflow/core/platform/file_system_test.cc b/tensorflow/core/platform/file_system_test.cc index 72ef84a8e30..0af45185612 100644 --- a/tensorflow/core/platform/file_system_test.cc +++ b/tensorflow/core/platform/file_system_test.cc @@ -261,6 +261,14 @@ TEST(InterPlanetaryFileSystemTest, RecursivelyCreateAlreadyExistingDir) { // env_test.cc as well as in the modular filesystem tests. } +TEST(InterPlanetaryFileSystemTest, HasAtomicMove) { + InterPlanetaryFileSystem ipfs; + const string dirname = io::JoinPath(kPrefix, "match-00/abc/00"); + bool has_atomic_move; + TF_EXPECT_OK(ipfs.HasAtomicMove(dirname, &has_atomic_move)); + EXPECT_EQ(has_atomic_move, true); +} + // A simple file system with a root directory and a single file underneath it. class TestFileSystem : public NullFileSystem { public: diff --git a/tensorflow/core/platform/s3/s3_file_system.cc b/tensorflow/core/platform/s3/s3_file_system.cc index 2c4c1f695b6..27e7a4e01fb 100644 --- a/tensorflow/core/platform/s3/s3_file_system.cc +++ b/tensorflow/core/platform/s3/s3_file_system.cc @@ -661,6 +661,11 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) { return Status::OK(); } +Status S3FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) { + *has_atomic_move = false; + return Status::OK(); +} + REGISTER_FILE_SYSTEM("s3", S3FileSystem); } // namespace tensorflow diff --git a/tensorflow/core/platform/s3/s3_file_system.h b/tensorflow/core/platform/s3/s3_file_system.h index 5d0565b3781..8b6fa81f164 100644 --- a/tensorflow/core/platform/s3/s3_file_system.h +++ b/tensorflow/core/platform/s3/s3_file_system.h @@ -59,6 +59,8 @@ class S3FileSystem : public FileSystem { Status RenameFile(const string& src, const string& target) override; + virtual Status HasAtomicMove(const string& path, bool* has_atomic_move) override; + private: // Returns the member S3 client, initializing as-needed. 
// When the client tries to access the object in S3, e.g., diff --git a/tensorflow/core/platform/s3/s3_file_system_test.cc b/tensorflow/core/platform/s3/s3_file_system_test.cc index 102c82d8ee8..95c7467fb74 100644 --- a/tensorflow/core/platform/s3/s3_file_system_test.cc +++ b/tensorflow/core/platform/s3/s3_file_system_test.cc @@ -228,5 +228,13 @@ TEST_F(S3FileSystemTest, StatFile) { EXPECT_FALSE(stat.is_directory); } +TEST_F(S3FileSystemTest, HasAtomicMove) { + const string fname = TmpDir("HasAtomicMove"); + TF_ASSERT_OK(WriteString(fname, "test")); + bool has_atomic_move = true; + TF_EXPECT_OK(s3fs.HasAtomicMove(fname, &has_atomic_move)); + EXPECT_EQ(has_atomic_move, false); +} + } // namespace } // namespace tensorflow diff --git a/tensorflow/core/util/tensor_bundle/tensor_bundle.cc b/tensorflow/core/util/tensor_bundle/tensor_bundle.cc index 3ebb2ea5dc8..76fba798f02 100644 --- a/tensorflow/core/util/tensor_bundle/tensor_bundle.cc +++ b/tensorflow/core/util/tensor_bundle/tensor_bundle.cc @@ -39,6 +39,7 @@ limitations under the License. 
#include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/io/table_builder.h" #include "tensorflow/core/lib/random/random.h" +#include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/lib/strings/stringprintf.h" #include "tensorflow/core/util/saved_tensor_slice_util.h" #include "tensorflow/core/util/tensor_bundle/byte_swap.h" @@ -400,24 +401,31 @@ BundleWriter::BundleWriter(Env* env, StringPiece prefix, const Options& options) : env_(env), options_(options), prefix_(prefix), - tmp_metadata_path_(strings::StrCat(MetaFilename(prefix_), ".tempstate", - random::New64())), - tmp_data_path_(strings::StrCat(DataFilename(prefix_, 0, 1), ".tempstate", - random::New64())), out_(nullptr), size_(0) { + status_ = env_->HasAtomicMove(prefix_, &use_temp_file_); + if (!status_.ok()) return; + + data_path_ = DataFilename(prefix_, 0, 1); + metadata_path_ = MetaFilename(prefix_); + if (use_temp_file_) { + data_path_ = strings::StrCat(data_path_, ".tempstate", random::New64()); + metadata_path_ = + strings::StrCat(metadata_path_, ".tempstate", random::New64()); + } + status_ = env_->CreateDir(string(io::Dirname(prefix_))); if (!status_.ok() && !errors::IsAlreadyExists(status_)) { return; } - const string filename = DataFilename(prefix_, 0, 1); + std::unique_ptr wrapper; - status_ = env_->NewWritableFile(tmp_data_path_, &wrapper); + status_ = env_->NewWritableFile(data_path_, &wrapper); if (!status_.ok()) return; out_ = std::unique_ptr( new FileOutputBuffer(wrapper.release(), 8 << 20 /* 8MB write buffer */)); - VLOG(1) << "Writing to file " << tmp_data_path_; + VLOG(1) << "Writing to file " << data_path_; } Status BundleWriter::Add(StringPiece key, const Tensor& val) { @@ -505,16 +513,18 @@ Status BundleWriter::Finish() { status_.Update(out_->Close()); out_ = nullptr; if (status_.ok()) { - status_ = Env::Default()->RenameFile(tmp_data_path_, - DataFilename(prefix_, 0, 1)); + if (use_temp_file_) { + status_ = + Env::Default()->RenameFile(data_path_, 
DataFilename(prefix_, 0, 1)); + } } else { - Env::Default()->DeleteFile(tmp_data_path_).IgnoreError(); + Env::Default()->DeleteFile(data_path_).IgnoreError(); } } if (!status_.ok()) return status_; // Build key -> BundleEntryProto table. std::unique_ptr file; - status_ = env_->NewWritableFile(tmp_metadata_path_, &file); + status_ = env_->NewWritableFile(metadata_path_, &file); if (!status_.ok()) return status_; { // N.B.: the default use of Snappy compression may not be supported on all @@ -541,11 +551,11 @@ Status BundleWriter::Finish() { } status_.Update(file->Close()); if (!status_.ok()) { - Env::Default()->DeleteFile(tmp_metadata_path_).IgnoreError(); + Env::Default()->DeleteFile(metadata_path_).IgnoreError(); return status_; - } else { + } else if (use_temp_file_) { status_ = - Env::Default()->RenameFile(tmp_metadata_path_, MetaFilename(prefix_)); + Env::Default()->RenameFile(metadata_path_, MetaFilename(prefix_)); if (!status_.ok()) return status_; } status_ = errors::Internal("BundleWriter is closed"); diff --git a/tensorflow/core/util/tensor_bundle/tensor_bundle.h b/tensorflow/core/util/tensor_bundle/tensor_bundle.h index e1f39eccd17..882a6a4e007 100644 --- a/tensorflow/core/util/tensor_bundle/tensor_bundle.h +++ b/tensorflow/core/util/tensor_bundle/tensor_bundle.h @@ -149,8 +149,9 @@ class BundleWriter { Env* const env_; // Not owned. const Options options_; const string prefix_; - const string tmp_metadata_path_; - const string tmp_data_path_; + string metadata_path_; + string data_path_; + bool use_temp_file_; std::unique_ptr out_; int64 size_; // Number of bytes written into out_. 
std::map entries_; diff --git a/tensorflow/python/lib/io/file_io.py b/tensorflow/python/lib/io/file_io.py index 55b4359d75b..fa303b18c23 100644 --- a/tensorflow/python/lib/io/file_io.py +++ b/tensorflow/python/lib/io/file_io.py @@ -523,13 +523,16 @@ def atomic_write_string_to_file(filename, contents, overwrite=True): overwrite: boolean, if false it's an error for `filename` to be occupied by an existing file. """ - temp_pathname = filename + ".tmp" + uuid.uuid4().hex - write_string_to_file(temp_pathname, contents) - try: - rename(temp_pathname, filename, overwrite) - except errors.OpError: - delete_file(temp_pathname) - raise + if not has_atomic_move(filename): + write_string_to_file(filename, contents) + else: + temp_pathname = filename + ".tmp" + uuid.uuid4().hex + write_string_to_file(temp_pathname, contents) + try: + rename(temp_pathname, filename, overwrite) + except errors.OpError: + delete_file(temp_pathname) + raise @tf_export(v1=["gfile.DeleteRecursively"]) @@ -586,6 +589,26 @@ def is_directory_v2(path): except errors.OpError: return False +def has_atomic_move(path): + """ Returns whether or not the file system of the given path + supports the atomic move operation for file or folder. + If atomic move is supported, it is recommended to use a temp location + for writing and then move to the final location. + Args: + path: string, path to a file + + Returns: + True, if the path is on a file system that supports atomic move + False, if the file system does not support atomic move. In such cases + we need to be careful about using moves. In some cases it is safer + not to use temporary locations in this case. 
+ """ + try: + return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) + except errors.OpError: + # defaults to True + return True + @tf_export(v1=["gfile.ListDirectory"]) def list_directory(dirname): diff --git a/tensorflow/python/lib/io/file_io_test.py b/tensorflow/python/lib/io/file_io_test.py index 86e0c602d89..3ba1fc58aba 100644 --- a/tensorflow/python/lib/io/file_io_test.py +++ b/tensorflow/python/lib/io/file_io_test.py @@ -28,7 +28,6 @@ from tensorflow.python.lib.io import file_io from tensorflow.python.platform import gfile from tensorflow.python.platform import test - class FileIoTest(test.TestCase): def setUp(self): @@ -616,7 +615,11 @@ class FileIoTest(test.TestCase): with gfile.GFile(filename, "rb") as f: info = np.load(f, allow_pickle=True) _ = [i for i in info.items()] - + + def testHasAtomicMove(self): + self.assertFalse(file_io.has_atomic_move('s3://x/y')) + self.assertTrue(file_io.has_atomic_move('/a/b/c')) + if __name__ == "__main__": test.main() diff --git a/tensorflow/python/lib/io/file_io_wrapper.cc b/tensorflow/python/lib/io/file_io_wrapper.cc index 6a5399c0db1..8c5d40603cb 100644 --- a/tensorflow/python/lib/io/file_io_wrapper.cc +++ b/tensorflow/python/lib/io/file_io_wrapper.cc @@ -127,7 +127,13 @@ PYBIND11_MODULE(_pywrap_file_io, m) { tensorflow::MaybeRaiseRegisteredFromStatus(status); return true; }); - + m.def("HasAtomicMove", [](const std::string& path) { + bool has_atomic_move; + const auto status = tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move); + tensorflow::MaybeRaiseRegisteredFromStatus(status); + return has_atomic_move; + }); + py::class_(m, "FileStatistics") .def_readonly("length", &tensorflow::FileStatistics::length) .def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec) diff --git a/tensorflow/python/saved_model/save_test.py b/tensorflow/python/saved_model/save_test.py index 05187c92b81..c14be2579d6 100644 --- a/tensorflow/python/saved_model/save_test.py +++ 
b/tensorflow/python/saved_model/save_test.py @@ -19,7 +19,6 @@ from __future__ import division from __future__ import print_function import os -import sys from google.protobuf import text_format @@ -667,23 +666,5 @@ class _ModelWithOptimizerUsingDefun(util.Checkpoint): return {"loss": loss} -class MemoryTests(test.TestCase): - - def setUp(self): - self._model = _ModelWithOptimizerUsingDefun() - - @test_util.assert_no_garbage_created - def test_no_reference_cycles(self): - x = constant_op.constant([[3., 4.]]) - y = constant_op.constant([2.]) - self._model.call(x, y) - if sys.version_info[0] < 3: - # TODO(allenl): debug reference cycles in Python 2.x - self.skipTest("This test only works in Python 3+. Reference cycles are " - "created in older Python versions.") - save_dir = os.path.join(self.get_temp_dir(), "saved_model") - save.save(self._model, save_dir, self._model.call) - - if __name__ == "__main__": test.main() diff --git a/tensorflow/python/training/saver.py b/tensorflow/python/training/saver.py index a4cf53f1505..4231d0322db 100644 --- a/tensorflow/python/training/saver.py +++ b/tensorflow/python/training/saver.py @@ -242,12 +242,19 @@ class BaseSaverBuilder(object): # / # myckpt{.index, .data-?????-of-?????} # + # Filesystems with eventual consistency (such as S3), don't need a temporary + # location. Using a temporary directory in those cases + # might cause situations where files are not available during copy. + # # Users only need to interact with the user-specified prefix, which is # "/myckpt" in this case. Save() and Restore() work with the # prefix directly, instead of any physical pathname. (On failure and # subsequent restore, an outdated and orphaned temporary directory can be # safely removed.) 
- _SHARDED_SUFFIX = "_temp_%s/part" % uuid.uuid4().hex + _SHARDED_SUFFIX = control_flow_ops.cond( + string_ops.regex_full_match(checkpoint_prefix, '^s3://.*'), + lambda: ".part", + lambda: "_temp_%s/part" % uuid.uuid4().hex) tmp_checkpoint_prefix = string_ops.string_join( [checkpoint_prefix, _SHARDED_SUFFIX]) diff --git a/tensorflow/python/training/saving/functional_saver.py b/tensorflow/python/training/saving/functional_saver.py index f5048271ea7..81c51931cad 100644 --- a/tensorflow/python/training/saving/functional_saver.py +++ b/tensorflow/python/training/saving/functional_saver.py @@ -30,6 +30,7 @@ from tensorflow.python.ops import array_ops from tensorflow.python.ops import gen_io_ops from tensorflow.python.ops import io_ops from tensorflow.python.ops import string_ops +from tensorflow.python.ops import control_flow_ops from tensorflow.python.training.saving import saveable_hook from tensorflow.python.training.saving import saveable_object from tensorflow.python.training.saving import saveable_object_util @@ -221,14 +222,20 @@ class MultiDeviceSaver(object): # / # myckpt{.index, .data-?????-of-?????} # + # Filesystems with eventual consistency (such as S3), don't need a temporary + # location. Using a temporary directory in those cases + # might cause situations where files are not available during copy. + # # Users only need to interact with the user-specified prefix, which is # "/myckpt" in this case. Save() and Restore() work with the # prefix directly, instead of any physical pathname. (On failure and # subsequent restore, an outdated and orphaned temporary directory can be # safely removed.) 
- sharded_suffix = "_temp_%s/part" % uuid.uuid4().hex - with ops.device("cpu:0"): + sharded_suffix = control_flow_ops.cond( + string_ops.regex_full_match(file_prefix, '^s3://.*'), + lambda: ".part", + lambda: "_temp_%s/part" % uuid.uuid4().hex) tmp_checkpoint_prefix = string_ops.string_join( [file_prefix, sharded_suffix]) From 411f70eb0ef6f7f26e91c3de3d8e24d5b0e618e7 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Thu, 20 Feb 2020 23:05:26 +0000 Subject: [PATCH 2/4] remove the try except block around hasAtomicMove --- tensorflow/python/lib/io/file_io.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tensorflow/python/lib/io/file_io.py b/tensorflow/python/lib/io/file_io.py index fa303b18c23..c50b702a5be 100644 --- a/tensorflow/python/lib/io/file_io.py +++ b/tensorflow/python/lib/io/file_io.py @@ -603,11 +603,7 @@ def has_atomic_move(path): we need to be careful about using moves. In some cases it is safer not to use temporary locations in this case. """ - try: - return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) - except errors.OpError: - # defaults to True - return True + return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) @tf_export(v1=["gfile.ListDirectory"]) From cb5b00852c4305e17aa0322091e8310c63448779 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Thu, 20 Feb 2020 23:44:04 +0000 Subject: [PATCH 3/4] Remove s3 test in file_io --- tensorflow/python/lib/io/file_io_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tensorflow/python/lib/io/file_io_test.py b/tensorflow/python/lib/io/file_io_test.py index 3ba1fc58aba..8f4197f52bd 100644 --- a/tensorflow/python/lib/io/file_io_test.py +++ b/tensorflow/python/lib/io/file_io_test.py @@ -617,7 +617,6 @@ class FileIoTest(test.TestCase): _ = [i for i in info.items()] def testHasAtomicMove(self): - self.assertFalse(file_io.has_atomic_move('s3://x/y')) self.assertTrue(file_io.has_atomic_move('/a/b/c')) From 52f4a69a11e3354ce54004977222418fb38162b8 Mon Sep 17 00:00:00 2001 
From: Rahul Huilgol Date: Thu, 20 Feb 2020 23:45:13 +0000 Subject: [PATCH 4/4] Revert "remove the try except block around hasAtomicMove" This reverts commit 411f70eb0ef6f7f26e91c3de3d8e24d5b0e618e7. --- tensorflow/python/lib/io/file_io.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tensorflow/python/lib/io/file_io.py b/tensorflow/python/lib/io/file_io.py index c50b702a5be..fa303b18c23 100644 --- a/tensorflow/python/lib/io/file_io.py +++ b/tensorflow/python/lib/io/file_io.py @@ -603,7 +603,11 @@ def has_atomic_move(path): we need to be careful about using moves. In some cases it is safer not to use temporary locations in this case. """ - return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) + try: + return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) + except errors.OpError: + # defaults to True + return True @tf_export(v1=["gfile.ListDirectory"])