diff --git a/tensorflow/core/platform/env.cc b/tensorflow/core/platform/env.cc index eedfa2ee48f..17328bbc8eb 100644 --- a/tensorflow/core/platform/env.cc +++ b/tensorflow/core/platform/env.cc @@ -281,6 +281,12 @@ Status Env::IsDirectory(const string& fname) { return fs->IsDirectory(fname); } +Status Env::HasAtomicMove(const string& path, bool* has_atomic_move) { + FileSystem* fs; + TF_RETURN_IF_ERROR(GetFileSystemForFile(path, &fs)); + return fs->HasAtomicMove(path, has_atomic_move); +} + Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) { FileSystem* fs; diff --git a/tensorflow/core/platform/env.h b/tensorflow/core/platform/env.h index 829d8dba3b2..ab1598b22f1 100644 --- a/tensorflow/core/platform/env.h +++ b/tensorflow/core/platform/env.h @@ -240,6 +240,18 @@ class Env { /// * UNIMPLEMENTED - The file factory doesn't support directories. Status IsDirectory(const string& fname); + /// \brief Returns whether the given path is on a file system + /// that has atomic move capabilities. This can be used + /// to determine if there needs to be a temp location to safely write objects. + /// The second boolean argument has_atomic_move contains this information. + /// + /// Returns one of the following status codes (not guaranteed exhaustive): + /// * OK - The path is on a recognized file system, + /// so has_atomic_move holds the above information. + /// * UNIMPLEMENTED - The file system of the path hasn't been implemented in + /// TF + Status HasAtomicMove(const string& path, bool* has_atomic_move); + /// Stores the size of `fname` in `*file_size`. 
Status GetFileSize(const string& fname, uint64* file_size); diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc index 7f8a34383f8..7d8c6aefc33 100644 --- a/tensorflow/core/platform/file_system.cc +++ b/tensorflow/core/platform/file_system.cc @@ -77,6 +77,11 @@ Status FileSystem::IsDirectory(const string& name) { return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory"); } +Status FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) { + *has_atomic_move = true; + return Status::OK(); +} + void FileSystem::FlushCaches() {} bool FileSystem::FilesExist(const std::vector<string>& files, diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h index bc860865a53..3ab9618e371 100644 --- a/tensorflow/core/platform/file_system.h +++ b/tensorflow/core/platform/file_system.h @@ -237,6 +237,18 @@ class FileSystem { /// * UNIMPLEMENTED - The file factory doesn't support directories. virtual tensorflow::Status IsDirectory(const string& fname); + /// \brief Returns whether the given path is on a file system + /// that has atomic move capabilities. This can be used + /// to determine if there needs to be a temp location to safely write objects. + /// The second boolean argument has_atomic_move contains this information. + /// + /// Returns one of the following status codes (not guaranteed exhaustive): + /// * OK - The path is on a recognized file system, + /// so has_atomic_move holds the above information. + /// * UNIMPLEMENTED - The file system of the path hasn't been implemented in + /// TF + virtual Status HasAtomicMove(const string& path, bool* has_atomic_move); + /// \brief Flushes any cached filesystem objects from memory. 
virtual void FlushCaches(); diff --git a/tensorflow/core/platform/file_system_test.cc b/tensorflow/core/platform/file_system_test.cc index 72ef84a8e30..0af45185612 100644 --- a/tensorflow/core/platform/file_system_test.cc +++ b/tensorflow/core/platform/file_system_test.cc @@ -261,6 +261,14 @@ TEST(InterPlanetaryFileSystemTest, RecursivelyCreateAlreadyExistingDir) { // env_test.cc as well as in the modular filesystem tests. } +TEST(InterPlanetaryFileSystemTest, HasAtomicMove) { + InterPlanetaryFileSystem ipfs; + const string dirname = io::JoinPath(kPrefix, "match-00/abc/00"); + bool has_atomic_move; + TF_EXPECT_OK(ipfs.HasAtomicMove(dirname, &has_atomic_move)); + EXPECT_EQ(has_atomic_move, true); +} + // A simple file system with a root directory and a single file underneath it. class TestFileSystem : public NullFileSystem { public: diff --git a/tensorflow/core/platform/retrying_file_system.h b/tensorflow/core/platform/retrying_file_system.h index df8850ace93..7c7d7563d22 100644 --- a/tensorflow/core/platform/retrying_file_system.h +++ b/tensorflow/core/platform/retrying_file_system.h @@ -121,6 +121,11 @@ class RetryingFileSystem : public FileSystem { retry_config_); } + Status HasAtomicMove(const string& path, bool* has_atomic_move) override { + // this method does not need to be retried + return base_file_system_->HasAtomicMove(path, has_atomic_move); + } + Status DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) override { return RetryingUtils::DeleteWithRetries( diff --git a/tensorflow/core/platform/s3/s3_file_system.cc b/tensorflow/core/platform/s3/s3_file_system.cc index 3be563d2c94..9749fec8ee5 100644 --- a/tensorflow/core/platform/s3/s3_file_system.cc +++ b/tensorflow/core/platform/s3/s3_file_system.cc @@ -1020,6 +1020,11 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) { return Status::OK(); } +Status S3FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) { + *has_atomic_move = 
false; + return Status::OK(); +} + REGISTER_FILE_SYSTEM("s3", RetryingS3FileSystem); } // namespace tensorflow diff --git a/tensorflow/core/platform/s3/s3_file_system.h b/tensorflow/core/platform/s3/s3_file_system.h index 7ea01b24cf9..7b70c374a51 100644 --- a/tensorflow/core/platform/s3/s3_file_system.h +++ b/tensorflow/core/platform/s3/s3_file_system.h @@ -81,6 +81,8 @@ class S3FileSystem : public FileSystem { Status RenameFile(const string& src, const string& target) override; + Status HasAtomicMove(const string& path, bool* has_atomic_move) override; + private: // Returns the member S3 client, initializing as-needed. // When the client tries to access the object in S3, e.g., diff --git a/tensorflow/core/platform/s3/s3_file_system_test.cc b/tensorflow/core/platform/s3/s3_file_system_test.cc index 102c82d8ee8..95c7467fb74 100644 --- a/tensorflow/core/platform/s3/s3_file_system_test.cc +++ b/tensorflow/core/platform/s3/s3_file_system_test.cc @@ -228,5 +228,13 @@ TEST_F(S3FileSystemTest, StatFile) { EXPECT_FALSE(stat.is_directory); } +TEST_F(S3FileSystemTest, HasAtomicMove) { + const string fname = TmpDir("HasAtomicMove"); + TF_ASSERT_OK(WriteString(fname, "test")); + bool has_atomic_move = true; + TF_EXPECT_OK(s3fs.HasAtomicMove(fname, &has_atomic_move)); + EXPECT_EQ(has_atomic_move, false); +} + } // namespace } // namespace tensorflow diff --git a/tensorflow/core/util/tensor_bundle/tensor_bundle.cc b/tensorflow/core/util/tensor_bundle/tensor_bundle.cc index 339c28dfb66..bbfb8883227 100644 --- a/tensorflow/core/util/tensor_bundle/tensor_bundle.cc +++ b/tensorflow/core/util/tensor_bundle/tensor_bundle.cc @@ -39,6 +39,7 @@ limitations under the License. 
#include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/io/table_builder.h" #include "tensorflow/core/lib/random/random.h" +#include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/lib/strings/stringprintf.h" #include "tensorflow/core/util/env_var.h" #include "tensorflow/core/util/saved_tensor_slice_util.h" @@ -401,24 +402,31 @@ BundleWriter::BundleWriter(Env* env, StringPiece prefix, const Options& options) : env_(env), options_(options), prefix_(prefix), - tmp_metadata_path_(strings::StrCat(MetaFilename(prefix_), ".tempstate", - random::New64())), - tmp_data_path_(strings::StrCat(DataFilename(prefix_, 0, 1), ".tempstate", - random::New64())), out_(nullptr), size_(0) { + status_ = env_->HasAtomicMove(prefix_, &use_temp_file_); + if (!status_.ok()) return; + + data_path_ = DataFilename(prefix_, 0, 1); + metadata_path_ = MetaFilename(prefix_); + if (use_temp_file_) { + data_path_ = strings::StrCat(data_path_, ".tempstate", random::New64()); + metadata_path_ = + strings::StrCat(metadata_path_, ".tempstate", random::New64()); + } + status_ = env_->CreateDir(string(io::Dirname(prefix_))); if (!status_.ok() && !errors::IsAlreadyExists(status_)) { return; } - const string filename = DataFilename(prefix_, 0, 1); + std::unique_ptr<WritableFile> wrapper; - status_ = env_->NewWritableFile(tmp_data_path_, &wrapper); + status_ = env_->NewWritableFile(data_path_, &wrapper); if (!status_.ok()) return; out_ = std::unique_ptr<FileOutputBuffer>( new FileOutputBuffer(wrapper.release(), 8 << 20 /* 8MB write buffer */)); - VLOG(1) << "Writing to file " << tmp_data_path_; + VLOG(1) << "Writing to file " << data_path_; } Status BundleWriter::Add(StringPiece key, const Tensor& val) { @@ -506,16 +514,18 @@ Status BundleWriter::Finish() { status_.Update(out_->Close()); out_ = nullptr; if (status_.ok()) { - status_ = Env::Default()->RenameFile(tmp_data_path_, - DataFilename(prefix_, 0, 1)); + if (use_temp_file_) { + status_ = + Env::Default()->RenameFile(data_path_, DataFilename(prefix_, 
0, 1)); + } } else { - Env::Default()->DeleteFile(tmp_data_path_).IgnoreError(); + Env::Default()->DeleteFile(data_path_).IgnoreError(); } } if (!status_.ok()) return status_; // Build key -> BundleEntryProto table. std::unique_ptr<WritableFile> file; - status_ = env_->NewWritableFile(tmp_metadata_path_, &file); + status_ = env_->NewWritableFile(metadata_path_, &file); if (!status_.ok()) return status_; { // N.B.: the default use of Snappy compression may not be supported on all @@ -542,11 +552,10 @@ Status BundleWriter::Finish() { } status_.Update(file->Close()); if (!status_.ok()) { - Env::Default()->DeleteFile(tmp_metadata_path_).IgnoreError(); + Env::Default()->DeleteFile(metadata_path_).IgnoreError(); return status_; - } else { - status_ = - Env::Default()->RenameFile(tmp_metadata_path_, MetaFilename(prefix_)); + } else if (use_temp_file_) { + status_ = Env::Default()->RenameFile(metadata_path_, MetaFilename(prefix_)); if (!status_.ok()) return status_; } status_ = errors::Internal("BundleWriter is closed"); diff --git a/tensorflow/core/util/tensor_bundle/tensor_bundle.h b/tensorflow/core/util/tensor_bundle/tensor_bundle.h index c362dd41151..c441000e47d 100644 --- a/tensorflow/core/util/tensor_bundle/tensor_bundle.h +++ b/tensorflow/core/util/tensor_bundle/tensor_bundle.h @@ -149,8 +149,9 @@ class BundleWriter { Env* const env_; // Not owned. const Options options_; const string prefix_; - const string tmp_metadata_path_; - const string tmp_data_path_; + string metadata_path_; + string data_path_; + bool use_temp_file_; std::unique_ptr<FileOutputBuffer> out_; int64 size_; // Number of bytes written into out_. 
std::map<string, BundleEntryProto> entries_; diff --git a/tensorflow/python/lib/io/file_io.py b/tensorflow/python/lib/io/file_io.py index 55b4359d75b..d34afb0f1c0 100644 --- a/tensorflow/python/lib/io/file_io.py +++ b/tensorflow/python/lib/io/file_io.py @@ -523,13 +523,16 @@ def atomic_write_string_to_file(filename, contents, overwrite=True): overwrite: boolean, if false it's an error for `filename` to be occupied by an existing file. """ - temp_pathname = filename + ".tmp" + uuid.uuid4().hex - write_string_to_file(temp_pathname, contents) - try: - rename(temp_pathname, filename, overwrite) - except errors.OpError: - delete_file(temp_pathname) - raise + if not has_atomic_move(filename): + write_string_to_file(filename, contents) + else: + temp_pathname = filename + ".tmp" + uuid.uuid4().hex + write_string_to_file(temp_pathname, contents) + try: + rename(temp_pathname, filename, overwrite) + except errors.OpError: + delete_file(temp_pathname) + raise @tf_export(v1=["gfile.DeleteRecursively"]) @@ -587,6 +590,30 @@ def is_directory_v2(path): return False +def has_atomic_move(path): + """Checks whether the file system supports atomic moves. + + Returns whether or not the file system of the given path supports the atomic + move operation for a file or folder. If atomic move is supported, it is + recommended to use a temp location for writing and then move to the final + location. + + Args: + path: string, path to a file + + Returns: + True, if the path is on a file system that supports atomic move + False, if the file system does not support atomic move. In such cases + we need to be careful about using moves. In some cases it is safer + not to use temporary locations in this case. + """ + try: + return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path)) + except errors.OpError: + # defaults to True + return True + + @tf_export(v1=["gfile.ListDirectory"]) def list_directory(dirname): """Returns a list of entries contained within a directory. 
diff --git a/tensorflow/python/lib/io/file_io_test.py b/tensorflow/python/lib/io/file_io_test.py index 86e0c602d89..2e42eb8fbe8 100644 --- a/tensorflow/python/lib/io/file_io_test.py +++ b/tensorflow/python/lib/io/file_io_test.py @@ -617,6 +617,9 @@ class FileIoTest(test.TestCase): info = np.load(f, allow_pickle=True) _ = [i for i in info.items()] + def testHasAtomicMove(self): + self.assertTrue(file_io.has_atomic_move("/a/b/c")) + if __name__ == "__main__": test.main() diff --git a/tensorflow/python/lib/io/file_io_wrapper.cc b/tensorflow/python/lib/io/file_io_wrapper.cc index 6a5399c0db1..e104881a64d 100644 --- a/tensorflow/python/lib/io/file_io_wrapper.cc +++ b/tensorflow/python/lib/io/file_io_wrapper.cc @@ -127,6 +127,13 @@ PYBIND11_MODULE(_pywrap_file_io, m) { tensorflow::MaybeRaiseRegisteredFromStatus(status); return true; }); + m.def("HasAtomicMove", [](const std::string& path) { + bool has_atomic_move; + const auto status = + tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move); + tensorflow::MaybeRaiseRegisteredFromStatus(status); + return has_atomic_move; + }); py::class_<tensorflow::FileStatistics>(m, "FileStatistics") .def_readonly("length", &tensorflow::FileStatistics::length) diff --git a/tensorflow/python/training/saver.py b/tensorflow/python/training/saver.py index a4cf53f1505..9f28264f52d 100644 --- a/tensorflow/python/training/saver.py +++ b/tensorflow/python/training/saver.py @@ -242,14 +242,22 @@ class BaseSaverBuilder(object): # / # myckpt{.index, .data-?????-of-?????} # + # Filesystems with eventual consistency (such as S3), don't need a + # temporary location. Using a temporary directory in those cases might + # cause situations where files are not available during copy. + # # Users only need to interact with the user-specified prefix, which is # "/myckpt" in this case. Save() and Restore() work with the # prefix directly, instead of any physical pathname. 
(On failure and # subsequent restore, an outdated and orphaned temporary directory can be # safely removed.) - _SHARDED_SUFFIX = "_temp_%s/part" % uuid.uuid4().hex - tmp_checkpoint_prefix = string_ops.string_join( - [checkpoint_prefix, _SHARDED_SUFFIX]) + with ops.device("CPU"): + _SHARDED_SUFFIX = array_ops.where( + string_ops.regex_full_match(checkpoint_prefix, "^s3://.*"), + constant_op.constant(".part"), + constant_op.constant("_temp_%s/part" % uuid.uuid4().hex)) + tmp_checkpoint_prefix = string_ops.string_join( + [checkpoint_prefix, _SHARDED_SUFFIX]) num_shards = len(per_device) sharded_saves = [] diff --git a/tensorflow/python/training/saving/functional_saver.py b/tensorflow/python/training/saving/functional_saver.py index f5048271ea7..d85852dabe6 100644 --- a/tensorflow/python/training/saving/functional_saver.py +++ b/tensorflow/python/training/saving/functional_saver.py @@ -221,14 +221,20 @@ class MultiDeviceSaver(object): # / # myckpt{.index, .data-?????-of-?????} # + # Filesystems with eventual consistency (such as S3), don't need a + # temporary location. Using a temporary directory in those cases might + # cause situations where files are not available during copy. + # # Users only need to interact with the user-specified prefix, which is # "/myckpt" in this case. Save() and Restore() work with the # prefix directly, instead of any physical pathname. (On failure and # subsequent restore, an outdated and orphaned temporary directory can be # safely removed.) - sharded_suffix = "_temp_%s/part" % uuid.uuid4().hex - - with ops.device("cpu:0"): + with ops.device("CPU"): + sharded_suffix = array_ops.where( + string_ops.regex_full_match(file_prefix, "^s3://.*"), + constant_op.constant(".part"), + constant_op.constant("_temp_%s/part" % uuid.uuid4().hex)) tmp_checkpoint_prefix = string_ops.string_join( [file_prefix, sharded_suffix])