Merge pull request from rahul003:s3_skip_temp

PiperOrigin-RevId: 297455352
Change-Id: I41411282776981e9cf4e347b25d238557151f9e6
This commit is contained in:
TensorFlower Gardener 2020-02-26 14:49:31 -08:00
commit eacf534690
16 changed files with 154 additions and 30 deletions

View File

@ -281,6 +281,12 @@ Status Env::IsDirectory(const string& fname) {
return fs->IsDirectory(fname);
}
// Reports in *has_atomic_move whether the file system that owns `path`
// supports atomic moves; delegates to the FileSystem registered for the
// path's scheme.
Status Env::HasAtomicMove(const string& path, bool* has_atomic_move) {
  FileSystem* fs;
  // Propagates the error if no file system is registered for this path.
  TF_RETURN_IF_ERROR(GetFileSystemForFile(path, &fs));
  return fs->HasAtomicMove(path, has_atomic_move);
}
Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files,
int64* undeleted_dirs) {
FileSystem* fs;

View File

@ -240,6 +240,18 @@ class Env {
/// * UNIMPLEMENTED - The file factory doesn't support directories.
Status IsDirectory(const string& fname);
/// \brief Returns whether the given path is on a file system
/// that has atomic move capabilities. This can be used
/// to determine if there needs to be a temp location to safely write objects.
/// The second boolean argument has_atomic_move contains this information.
///
/// Returns one of the following status codes (not guaranteed exhaustive):
/// * OK - The path is on a recognized file system,
/// so has_atomic_move holds the above information.
/// * UNIMPLEMENTED - The file system of the path hasn't been implemented in
/// TF
Status HasAtomicMove(const string& path, bool* has_atomic_move);
/// Stores the size of `fname` in `*file_size`.
Status GetFileSize(const string& fname, uint64* file_size);

View File

@ -77,6 +77,11 @@ Status FileSystem::IsDirectory(const string& name) {
return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory");
}
// Default implementation: assume atomic moves are supported. File systems
// without atomic rename (e.g. the S3 implementation in this change) override
// this to report false so writers can skip the temp-file-then-rename step.
Status FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) {
  *has_atomic_move = true;
  return Status::OK();
}
void FileSystem::FlushCaches() {}
bool FileSystem::FilesExist(const std::vector<string>& files,

View File

@ -237,6 +237,18 @@ class FileSystem {
/// * UNIMPLEMENTED - The file factory doesn't support directories.
virtual tensorflow::Status IsDirectory(const string& fname);
/// \brief Returns whether the given path is on a file system
/// that has atomic move capabilities. This can be used
/// to determine if there needs to be a temp location to safely write objects.
/// The second boolean argument has_atomic_move contains this information.
///
/// Returns one of the following status codes (not guaranteed exhaustive):
/// * OK - The path is on a recognized file system,
/// so has_atomic_move holds the above information.
/// * UNIMPLEMENTED - The file system of the path hasn't been implemented in
/// TF
virtual Status HasAtomicMove(const string& path, bool* has_atomic_move);
/// \brief Flushes any cached filesystem objects from memory.
virtual void FlushCaches();

View File

@ -261,6 +261,14 @@ TEST(InterPlanetaryFileSystemTest, RecursivelyCreateAlreadyExistingDir) {
// env_test.cc as well as in the modular filesystem tests.
}
// Verifies that HasAtomicMove succeeds and reports true for this test file
// system (which presumably inherits the FileSystem default — confirm against
// the InterPlanetaryFileSystem definition).
TEST(InterPlanetaryFileSystemTest, HasAtomicMove) {
  InterPlanetaryFileSystem ipfs;
  const string dirname = io::JoinPath(kPrefix, "match-00/abc/00");
  bool has_atomic_move;
  TF_EXPECT_OK(ipfs.HasAtomicMove(dirname, &has_atomic_move));
  EXPECT_EQ(has_atomic_move, true);
}
// A simple file system with a root directory and a single file underneath it.
class TestFileSystem : public NullFileSystem {
public:

View File

@ -121,6 +121,11 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
// Capability query: unlike the I/O operations in this class it is forwarded
// to the wrapped file system directly, without the retry wrapper.
Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
  return base_file_system_->HasAtomicMove(path, has_atomic_move);
}
Status DeleteRecursively(const string& dirname, int64* undeleted_files,
int64* undeleted_dirs) override {
return RetryingUtils::DeleteWithRetries(

View File

@ -1020,6 +1020,11 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
return Status::OK();
}
// S3 does not provide an atomic move/rename, so always report false. This
// lets callers (e.g. checkpoint writers) write directly to the final
// location instead of staging through a temporary path.
Status S3FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) {
  *has_atomic_move = false;
  return Status::OK();
}
REGISTER_FILE_SYSTEM("s3", RetryingS3FileSystem);
} // namespace tensorflow

View File

@ -81,6 +81,8 @@ class S3FileSystem : public FileSystem {
Status RenameFile(const string& src, const string& target) override;
Status HasAtomicMove(const string& path, bool* has_atomic_move) override;
private:
// Returns the member S3 client, initializing as-needed.
// When the client tries to access the object in S3, e.g.,

View File

@ -228,5 +228,13 @@ TEST_F(S3FileSystemTest, StatFile) {
EXPECT_FALSE(stat.is_directory);
}
// Verifies that S3FileSystem reports that it lacks atomic-move support.
TEST_F(S3FileSystemTest, HasAtomicMove) {
  const string fname = TmpDir("HasAtomicMove");
  TF_ASSERT_OK(WriteString(fname, "test"));
  // Initialize to true so the test fails if the call never writes the flag.
  bool has_atomic_move = true;
  // Fix: the method is HasAtomicMove (declared in s3_file_system.h), not
  // NeedsTempLocation, and TF_EXPECT_OK takes the Status itself — passing
  // .code() (an error-code enum) would not compile.
  TF_EXPECT_OK(s3fs.HasAtomicMove(fname, &has_atomic_move));
  EXPECT_EQ(has_atomic_move, false);
}
} // namespace
} // namespace tensorflow

View File

@ -39,6 +39,7 @@ limitations under the License.
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/io/table_builder.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/util/env_var.h"
#include "tensorflow/core/util/saved_tensor_slice_util.h"
@ -401,24 +402,31 @@ BundleWriter::BundleWriter(Env* env, StringPiece prefix, const Options& options)
: env_(env),
options_(options),
prefix_(prefix),
tmp_metadata_path_(strings::StrCat(MetaFilename(prefix_), ".tempstate",
random::New64())),
tmp_data_path_(strings::StrCat(DataFilename(prefix_, 0, 1), ".tempstate",
random::New64())),
out_(nullptr),
size_(0) {
status_ = env_->HasAtomicMove(prefix_, &use_temp_file_);
if (!status_.ok()) return;
data_path_ = DataFilename(prefix_, 0, 1);
metadata_path_ = MetaFilename(prefix_);
if (use_temp_file_) {
data_path_ = strings::StrCat(data_path_, ".tempstate", random::New64());
metadata_path_ =
strings::StrCat(metadata_path_, ".tempstate", random::New64());
}
status_ = env_->CreateDir(string(io::Dirname(prefix_)));
if (!status_.ok() && !errors::IsAlreadyExists(status_)) {
return;
}
const string filename = DataFilename(prefix_, 0, 1);
std::unique_ptr<WritableFile> wrapper;
status_ = env_->NewWritableFile(tmp_data_path_, &wrapper);
status_ = env_->NewWritableFile(data_path_, &wrapper);
if (!status_.ok()) return;
out_ = std::unique_ptr<FileOutputBuffer>(
new FileOutputBuffer(wrapper.release(), 8 << 20 /* 8MB write buffer */));
VLOG(1) << "Writing to file " << tmp_data_path_;
VLOG(1) << "Writing to file " << data_path_;
}
Status BundleWriter::Add(StringPiece key, const Tensor& val) {
@ -506,16 +514,18 @@ Status BundleWriter::Finish() {
status_.Update(out_->Close());
out_ = nullptr;
if (status_.ok()) {
status_ = Env::Default()->RenameFile(tmp_data_path_,
DataFilename(prefix_, 0, 1));
if (use_temp_file_) {
status_ =
Env::Default()->RenameFile(data_path_, DataFilename(prefix_, 0, 1));
}
} else {
Env::Default()->DeleteFile(tmp_data_path_).IgnoreError();
Env::Default()->DeleteFile(data_path_).IgnoreError();
}
}
if (!status_.ok()) return status_;
// Build key -> BundleEntryProto table.
std::unique_ptr<WritableFile> file;
status_ = env_->NewWritableFile(tmp_metadata_path_, &file);
status_ = env_->NewWritableFile(metadata_path_, &file);
if (!status_.ok()) return status_;
{
// N.B.: the default use of Snappy compression may not be supported on all
@ -542,11 +552,10 @@ Status BundleWriter::Finish() {
}
status_.Update(file->Close());
if (!status_.ok()) {
Env::Default()->DeleteFile(tmp_metadata_path_).IgnoreError();
Env::Default()->DeleteFile(metadata_path_).IgnoreError();
return status_;
} else {
status_ =
Env::Default()->RenameFile(tmp_metadata_path_, MetaFilename(prefix_));
} else if (use_temp_file_) {
status_ = Env::Default()->RenameFile(metadata_path_, MetaFilename(prefix_));
if (!status_.ok()) return status_;
}
status_ = errors::Internal("BundleWriter is closed");

View File

@ -149,8 +149,9 @@ class BundleWriter {
Env* const env_; // Not owned.
const Options options_;
const string prefix_;
const string tmp_metadata_path_;
const string tmp_data_path_;
string metadata_path_;
string data_path_;
bool use_temp_file_;
std::unique_ptr<FileOutputBuffer> out_;
int64 size_; // Number of bytes written into out_.
std::map<string, BundleEntryProto> entries_;

View File

@ -523,13 +523,16 @@ def atomic_write_string_to_file(filename, contents, overwrite=True):
overwrite: boolean, if false it's an error for `filename` to be occupied by
an existing file.
"""
temp_pathname = filename + ".tmp" + uuid.uuid4().hex
write_string_to_file(temp_pathname, contents)
try:
rename(temp_pathname, filename, overwrite)
except errors.OpError:
delete_file(temp_pathname)
raise
if not has_atomic_move(filename):
write_string_to_file(filename, contents)
else:
temp_pathname = filename + ".tmp" + uuid.uuid4().hex
write_string_to_file(temp_pathname, contents)
try:
rename(temp_pathname, filename, overwrite)
except errors.OpError:
delete_file(temp_pathname)
raise
@tf_export(v1=["gfile.DeleteRecursively"])
@ -587,6 +590,30 @@ def is_directory_v2(path):
return False
def has_atomic_move(path):
  """Checks whether the file system supports atomic moves.

  Queries the file system that owns `path` for its atomic-move capability.
  When atomic moves are available, writing to a temporary location and then
  moving to the final location is the recommended pattern; when they are not,
  it can be safer to write the final file directly.

  Args:
    path: string, path to a file

  Returns:
    True, if the path is on a file system that supports atomic move
    False, if the file system does not support atomic move. In such cases
           we need to be careful about using moves. In some cases it is safer
           not to use temporary locations in this case.
  """
  try:
    supports_move = _pywrap_file_io.HasAtomicMove(compat.as_bytes(path))
  except errors.OpError:
    # Unrecognized file system: default to True so callers keep the safe
    # temp-then-rename behavior.
    supports_move = True
  return supports_move
@tf_export(v1=["gfile.ListDirectory"])
def list_directory(dirname):
"""Returns a list of entries contained within a directory.

View File

@ -617,6 +617,9 @@ class FileIoTest(test.TestCase):
info = np.load(f, allow_pickle=True)
_ = [i for i in info.items()]
  def testHasAtomicMove(self):
    # A plain local path resolves to a file system that reports atomic-move
    # support; the path itself presumably need not exist for this query —
    # confirm against the HasAtomicMove implementation.
    self.assertTrue(file_io.has_atomic_move("/a/b/c"))
if __name__ == "__main__":
test.main()

View File

@ -127,6 +127,13 @@ PYBIND11_MODULE(_pywrap_file_io, m) {
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return true;
});
// Exposes Env::HasAtomicMove to Python. Raises the registered exception for
// a non-OK status; otherwise returns the boolean capability flag.
m.def("HasAtomicMove", [](const std::string& path) {
  bool has_atomic_move;
  const auto status =
      tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
  tensorflow::MaybeRaiseRegisteredFromStatus(status);
  return has_atomic_move;
});
py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
.def_readonly("length", &tensorflow::FileStatistics::length)

View File

@ -242,14 +242,22 @@ class BaseSaverBuilder(object):
# <train dir>/
# myckpt{.index, .data-?????-of-?????}
#
# Filesystems with eventual consistency (such as S3), don't need a
# temporary location. Using a temporary directory in those cases might
# cause situations where files are not available during copy.
#
# Users only need to interact with the user-specified prefix, which is
# "<train dir>/myckpt" in this case. Save() and Restore() work with the
# prefix directly, instead of any physical pathname. (On failure and
# subsequent restore, an outdated and orphaned temporary directory can be
# safely removed.)
_SHARDED_SUFFIX = "_temp_%s/part" % uuid.uuid4().hex
tmp_checkpoint_prefix = string_ops.string_join(
[checkpoint_prefix, _SHARDED_SUFFIX])
with ops.device("CPU"):
_SHARDED_SUFFIX = array_ops.where(
string_ops.regex_full_match(checkpoint_prefix, "^s3://.*"),
constant_op.constant(".part"),
constant_op.constant("_temp_%s/part" % uuid.uuid4().hex))
tmp_checkpoint_prefix = string_ops.string_join(
[checkpoint_prefix, _SHARDED_SUFFIX])
num_shards = len(per_device)
sharded_saves = []

View File

@ -221,14 +221,20 @@ class MultiDeviceSaver(object):
# <train dir>/
# myckpt{.index, .data-?????-of-?????}
#
# Filesystems with eventual consistency (such as S3), don't need a
# temporary location. Using a temporary directory in those cases might
# cause situations where files are not available during copy.
#
# Users only need to interact with the user-specified prefix, which is
# "<train dir>/myckpt" in this case. Save() and Restore() work with the
# prefix directly, instead of any physical pathname. (On failure and
# subsequent restore, an outdated and orphaned temporary directory can be
# safely removed.)
sharded_suffix = "_temp_%s/part" % uuid.uuid4().hex
with ops.device("cpu:0"):
with ops.device("CPU"):
sharded_suffix = array_ops.where(
string_ops.regex_full_match(file_prefix, "^s3://.*"),
constant_op.constant(".part"),
constant_op.constant("_temp_%s/part" % uuid.uuid4().hex))
tmp_checkpoint_prefix = string_ops.string_join(
[file_prefix, sharded_suffix])