Do not use temp files when writing to S3

Address feedback

Add test for the Python method has_atomic_move

Remove old comment and fix indentation

Remove unnecessary imports

Remove the test which checks for reference cycles when saving. The file system check introduces a conditional op, which itself creates a reference cycle, so the check no longer applies

Fighting lint

Fix lint errors

Use the returned status of HasAtomicMove
Rahul Huilgol 2020-01-30 22:56:03 +00:00
parent 89d0729777
commit e65e99c433
17 changed files with 148 additions and 49 deletions

View File

@@ -120,6 +120,11 @@ class RetryingFileSystem : public FileSystem {
[this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
retry_config_);
}
Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
// This method does not need to be retried.
return base_file_system_->HasAtomicMove(path, has_atomic_move);
}
Status DeleteRecursively(const string& dirname, int64* undeleted_files,
int64* undeleted_dirs) override {

View File

@@ -281,6 +281,12 @@ Status Env::IsDirectory(const string& fname) {
return fs->IsDirectory(fname);
}
Status Env::HasAtomicMove(const string& path, bool* has_atomic_move) {
FileSystem* fs;
TF_RETURN_IF_ERROR(GetFileSystemForFile(path, &fs));
return fs->HasAtomicMove(path, has_atomic_move);
}
Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files,
int64* undeleted_dirs) {
FileSystem* fs;

View File

@@ -239,7 +239,18 @@ class Env {
/// * PERMISSION_DENIED - Insufficient permissions.
/// * UNIMPLEMENTED - The file factory doesn't support directories.
Status IsDirectory(const string& fname);
/// \brief Returns whether the given path is on a file system
/// that has atomic move capabilities. This can be used
/// to determine whether a temporary location is needed to safely write
/// objects. The result is returned via the output argument has_atomic_move.
///
/// Returns one of the following status codes (not guaranteed exhaustive):
/// * OK - The path is on a recognized file system,
///        so has_atomic_move holds the above information.
/// * UNIMPLEMENTED - The file system of the path hasn't been implemented in TF.
Status HasAtomicMove(const string& path, bool* has_atomic_move);
/// Stores the size of `fname` in `*file_size`.
Status GetFileSize(const string& fname, uint64* file_size);
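
The comment above describes the pattern this check enables: probe once, then stage through a temporary path only when an atomic rename is available. A minimal sketch of that pattern using the Python wrappers this commit adds elsewhere (the safe_write helper and its temp-file naming are illustrative assumptions, mirroring the atomic_write_string_to_file change later in this commit):

from tensorflow.python.lib.io import file_io

def safe_write(filename, contents):
  # Hypothetical helper built on the new check.
  if not file_io.has_atomic_move(filename):
    # No atomic rename (e.g. S3): write straight to the final location.
    file_io.write_string_to_file(filename, contents)
  else:
    # Atomic rename available: stage in a temp file, then move into place.
    temp_pathname = filename + ".tmp"
    file_io.write_string_to_file(temp_pathname, contents)
    file_io.rename(temp_pathname, filename, overwrite=True)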

View File

@@ -77,6 +77,11 @@ Status FileSystem::IsDirectory(const string& name) {
return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory");
}
Status FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) {
*has_atomic_move = true;
return Status::OK();
}
void FileSystem::FlushCaches() {}
bool FileSystem::FilesExist(const std::vector<string>& files,

View File

@@ -237,6 +237,17 @@ class FileSystem {
/// * UNIMPLEMENTED - The file factory doesn't support directories.
virtual tensorflow::Status IsDirectory(const string& fname);
/// \brief Returns whether the given path is on a file system
/// that has atomic move capabilities. This can be used
/// to determine whether a temporary location is needed to safely write
/// objects. The result is returned via the output argument has_atomic_move.
///
/// Returns one of the following status codes (not guaranteed exhaustive):
/// * OK - The path is on a recognized file system,
///        so has_atomic_move holds the above information.
/// * UNIMPLEMENTED - The file system of the path hasn't been implemented in TF.
virtual Status HasAtomicMove(const string& path, bool* has_atomic_move);
/// \brief Flushes any cached filesystem objects from memory.
virtual void FlushCaches();

View File

@@ -261,6 +261,14 @@ TEST(InterPlanetaryFileSystemTest, RecursivelyCreateAlreadyExistingDir) {
// env_test.cc as well as in the modular filesystem tests.
}
TEST(InterPlanetaryFileSystemTest, HasAtomicMove) {
InterPlanetaryFileSystem ipfs;
const string dirname = io::JoinPath(kPrefix, "match-00/abc/00");
bool has_atomic_move;
TF_EXPECT_OK(ipfs.HasAtomicMove(dirname, &has_atomic_move));
EXPECT_TRUE(has_atomic_move);
}
// A simple file system with a root directory and a single file underneath it.
class TestFileSystem : public NullFileSystem {
public:

View File

@@ -661,6 +661,11 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
return Status::OK();
}
Status S3FileSystem::HasAtomicMove(const string& path, bool* has_atomic_move) {
*has_atomic_move = false;
return Status::OK();
}
REGISTER_FILE_SYSTEM("s3", S3FileSystem);
} // namespace tensorflow

View File

@@ -59,6 +59,8 @@ class S3FileSystem : public FileSystem {
Status RenameFile(const string& src, const string& target) override;
Status HasAtomicMove(const string& path, bool* has_atomic_move) override;
private:
// Returns the member S3 client, initializing as-needed.
// When the client tries to access the object in S3, e.g.,

View File

@@ -228,5 +228,13 @@ TEST_F(S3FileSystemTest, StatFile) {
EXPECT_FALSE(stat.is_directory);
}
TEST_F(S3FileSystemTest, HasAtomicMove) {
const string fname = TmpDir("HasAtomicMove");
TF_ASSERT_OK(WriteString(fname, "test"));
bool has_atomic_move = true;
TF_EXPECT_OK(s3fs.HasAtomicMove(fname, &has_atomic_move));
EXPECT_FALSE(has_atomic_move);
}
} // namespace
} // namespace tensorflow

View File

@@ -39,6 +39,7 @@ limitations under the License.
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/io/table_builder.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/util/saved_tensor_slice_util.h"
#include "tensorflow/core/util/tensor_bundle/byte_swap.h"
@@ -400,24 +401,31 @@ BundleWriter::BundleWriter(Env* env, StringPiece prefix, const Options& options)
     : env_(env),
       options_(options),
       prefix_(prefix),
-      tmp_metadata_path_(strings::StrCat(MetaFilename(prefix_), ".tempstate",
-                                         random::New64())),
-      tmp_data_path_(strings::StrCat(DataFilename(prefix_, 0, 1), ".tempstate",
-                                     random::New64())),
      out_(nullptr),
      size_(0) {
+  status_ = env_->HasAtomicMove(prefix_, &use_temp_file_);
+  if (!status_.ok()) return;
+  data_path_ = DataFilename(prefix_, 0, 1);
+  metadata_path_ = MetaFilename(prefix_);
+  if (use_temp_file_) {
+    data_path_ = strings::StrCat(data_path_, ".tempstate", random::New64());
+    metadata_path_ =
+        strings::StrCat(metadata_path_, ".tempstate", random::New64());
+  }
  status_ = env_->CreateDir(string(io::Dirname(prefix_)));
  if (!status_.ok() && !errors::IsAlreadyExists(status_)) {
    return;
  }
-  const string filename = DataFilename(prefix_, 0, 1);
  std::unique_ptr<WritableFile> wrapper;
-  status_ = env_->NewWritableFile(tmp_data_path_, &wrapper);
+  status_ = env_->NewWritableFile(data_path_, &wrapper);
  if (!status_.ok()) return;
  out_ = std::unique_ptr<FileOutputBuffer>(
      new FileOutputBuffer(wrapper.release(), 8 << 20 /* 8MB write buffer */));
-  VLOG(1) << "Writing to file " << tmp_data_path_;
+  VLOG(1) << "Writing to file " << data_path_;
}
Status BundleWriter::Add(StringPiece key, const Tensor& val) {
@@ -505,16 +513,18 @@ Status BundleWriter::Finish() {
    status_.Update(out_->Close());
    out_ = nullptr;
    if (status_.ok()) {
-      status_ = Env::Default()->RenameFile(tmp_data_path_,
-                                           DataFilename(prefix_, 0, 1));
+      if (use_temp_file_) {
+        status_ =
+            Env::Default()->RenameFile(data_path_, DataFilename(prefix_, 0, 1));
+      }
    } else {
-      Env::Default()->DeleteFile(tmp_data_path_).IgnoreError();
+      Env::Default()->DeleteFile(data_path_).IgnoreError();
    }
  }
  if (!status_.ok()) return status_;
  // Build key -> BundleEntryProto table.
  std::unique_ptr<WritableFile> file;
-  status_ = env_->NewWritableFile(tmp_metadata_path_, &file);
+  status_ = env_->NewWritableFile(metadata_path_, &file);
  if (!status_.ok()) return status_;
  {
    // N.B.: the default use of Snappy compression may not be supported on all
@@ -541,11 +551,11 @@
  }
  status_.Update(file->Close());
  if (!status_.ok()) {
-    Env::Default()->DeleteFile(tmp_metadata_path_).IgnoreError();
+    Env::Default()->DeleteFile(metadata_path_).IgnoreError();
    return status_;
-  } else {
+  } else if (use_temp_file_) {
    status_ =
-        Env::Default()->RenameFile(tmp_metadata_path_, MetaFilename(prefix_));
+        Env::Default()->RenameFile(metadata_path_, MetaFilename(prefix_));
    if (!status_.ok()) return status_;
  }
  status_ = errors::Internal("BundleWriter is closed");

View File

@@ -149,8 +149,9 @@ class BundleWriter {
  Env* const env_;  // Not owned.
  const Options options_;
  const string prefix_;
-  const string tmp_metadata_path_;
-  const string tmp_data_path_;
+  string metadata_path_;
+  string data_path_;
+  bool use_temp_file_;
std::unique_ptr<FileOutputBuffer> out_;
int64 size_; // Number of bytes written into out_.
std::map<string, BundleEntryProto> entries_;

View File

@@ -523,13 +523,16 @@ def atomic_write_string_to_file(filename, contents, overwrite=True):
    overwrite: boolean, if false it's an error for `filename` to be occupied by
      an existing file.
  """
-  temp_pathname = filename + ".tmp" + uuid.uuid4().hex
-  write_string_to_file(temp_pathname, contents)
-  try:
-    rename(temp_pathname, filename, overwrite)
-  except errors.OpError:
-    delete_file(temp_pathname)
-    raise
+  if not has_atomic_move(filename):
+    write_string_to_file(filename, contents)
+  else:
+    temp_pathname = filename + ".tmp" + uuid.uuid4().hex
+    write_string_to_file(temp_pathname, contents)
+    try:
+      rename(temp_pathname, filename, overwrite)
+    except errors.OpError:
+      delete_file(temp_pathname)
+      raise
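
With this change the function picks a strategy per file system. A short usage sketch (the paths and bucket name are made up; the S3 branch assumes the S3 file system is registered):

from tensorflow.python.lib.io import file_io

# Local disk: staged through "<filename>.tmp<hex>" and then renamed.
file_io.atomic_write_string_to_file("/tmp/vocab.txt", "hello")

# S3: no atomic rename, so the contents are written directly in place.
file_io.atomic_write_string_to_file("s3://my-bucket/vocab.txt", "hello")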
@tf_export(v1=["gfile.DeleteRecursively"])
@@ -586,6 +589,26 @@ def is_directory_v2(path):
except errors.OpError:
return False
def has_atomic_move(path):
  """Checks whether the file system supports an atomic move for a given path.

  If atomic move is supported, it is recommended to write to a temporary
  location first and then move to the final location.

  Args:
    path: string, path to a file.

  Returns:
    True, if the path is on a file system that supports atomic move.
    False, if the file system does not support atomic move. In that case it
      can be safer to write directly to the final location rather than going
      through a temporary one.
  """
  try:
    return _pywrap_file_io.HasAtomicMove(compat.as_bytes(path))
  except errors.OpError:
    # No file system is registered for this path; default to True.
    return True
@tf_export(v1=["gfile.ListDirectory"])
def list_directory(dirname):
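
The fallback in the except branch means unknown schemes keep today's temp-file behavior. A few hedged examples of what callers should observe (the S3 result assumes the S3 file system plugin is registered):

from tensorflow.python.lib.io import file_io

file_io.has_atomic_move("/tmp/ckpt")         # True: local rename is atomic.
file_io.has_atomic_move("s3://bucket/ckpt")  # False: S3 rename is copy + delete.
file_io.has_atomic_move("bogus://x")         # True: no registered file system,
                                             # so the check falls back to True.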

View File

@@ -28,7 +28,6 @@ from tensorflow.python.lib.io import file_io
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
class FileIoTest(test.TestCase):
def setUp(self):
@@ -616,7 +615,11 @@ class FileIoTest(test.TestCase):
with gfile.GFile(filename, "rb") as f:
info = np.load(f, allow_pickle=True)
_ = [i for i in info.items()]
def testHasAtomicMove(self):
self.assertFalse(file_io.has_atomic_move("s3://x/y"))
self.assertTrue(file_io.has_atomic_move("/a/b/c"))
if __name__ == "__main__":
test.main()

View File

@@ -127,7 +127,13 @@ PYBIND11_MODULE(_pywrap_file_io, m) {
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return true;
});
m.def("HasAtomicMove", [](const std::string& path) {
bool has_atomic_move;
const auto status = tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return has_atomic_move;
});
py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
.def_readonly("length", &tensorflow::FileStatistics::length)
.def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)

View File

@@ -19,7 +19,6 @@ from __future__ import division
from __future__ import print_function
import os
import sys
from google.protobuf import text_format
@@ -667,23 +666,5 @@ class _ModelWithOptimizerUsingDefun(util.Checkpoint):
    return {"loss": loss}
-class MemoryTests(test.TestCase):
-  def setUp(self):
-    self._model = _ModelWithOptimizerUsingDefun()
-  @test_util.assert_no_garbage_created
-  def test_no_reference_cycles(self):
-    x = constant_op.constant([[3., 4.]])
-    y = constant_op.constant([2.])
-    self._model.call(x, y)
-    if sys.version_info[0] < 3:
-      # TODO(allenl): debug reference cycles in Python 2.x
-      self.skipTest("This test only works in Python 3+. Reference cycles are "
-                    "created in older Python versions.")
-    save_dir = os.path.join(self.get_temp_dir(), "saved_model")
-    save.save(self._model, save_dir, self._model.call)
if __name__ == "__main__":
test.main()

View File

@@ -242,12 +242,19 @@ class BaseSaverBuilder(object):
    #   <train dir>/
    #       myckpt{.index, .data-?????-of-?????}
    #
+   # Filesystems with eventual consistency (such as S3) don't need a
+   # temporary location. Using a temporary directory in those cases might
+   # cause situations where files are not available during the copy.
+   #
    # Users only need to interact with the user-specified prefix, which is
    # "<train dir>/myckpt" in this case. Save() and Restore() work with the
    # prefix directly, instead of any physical pathname. (On failure and
    # subsequent restore, an outdated and orphaned temporary directory can be
    # safely removed.)
-   _SHARDED_SUFFIX = "_temp_%s/part" % uuid.uuid4().hex
+   _SHARDED_SUFFIX = control_flow_ops.cond(
+       string_ops.regex_full_match(checkpoint_prefix, "^s3://.*"),
+       lambda: ".part",
+       lambda: "_temp_%s/part" % uuid.uuid4().hex)
    tmp_checkpoint_prefix = string_ops.string_join(
        [checkpoint_prefix, _SHARDED_SUFFIX])
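
Because checkpoint_prefix is a tensor here, the suffix has to be chosen inside the graph: regex_full_match yields a scalar boolean tensor, and cond selects a branch at execution time rather than at Python tracing time. A standalone sketch of the same selection (module paths follow this file's imports; the prefix value is made up):

import uuid

from tensorflow.python.framework import constant_op
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import string_ops

prefix = constant_op.constant("s3://bucket/train/myckpt")
suffix = control_flow_ops.cond(
    string_ops.regex_full_match(prefix, "^s3://.*"),
    lambda: constant_op.constant(".part"),
    lambda: constant_op.constant("_temp_%s/part" % uuid.uuid4().hex))
# Under eager execution this evaluates immediately to
# b"s3://bucket/train/myckpt.part".
tmp_prefix = string_ops.string_join([prefix, suffix])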

View File

@@ -30,6 +30,7 @@ from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_io_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import string_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.training.saving import saveable_hook
from tensorflow.python.training.saving import saveable_object
from tensorflow.python.training.saving import saveable_object_util
@@ -221,14 +222,20 @@ class MultiDeviceSaver(object):
    #   <train dir>/
    #       myckpt{.index, .data-?????-of-?????}
    #
+   # Filesystems with eventual consistency (such as S3) don't need a
+   # temporary location. Using a temporary directory in those cases might
+   # cause situations where files are not available during the copy.
+   #
    # Users only need to interact with the user-specified prefix, which is
    # "<train dir>/myckpt" in this case. Save() and Restore() work with the
    # prefix directly, instead of any physical pathname. (On failure and
    # subsequent restore, an outdated and orphaned temporary directory can be
    # safely removed.)
-   sharded_suffix = "_temp_%s/part" % uuid.uuid4().hex
+   with ops.device("cpu:0"):
+     sharded_suffix = control_flow_ops.cond(
+         string_ops.regex_full_match(file_prefix, "^s3://.*"),
+         lambda: ".part",
+         lambda: "_temp_%s/part" % uuid.uuid4().hex)
    tmp_checkpoint_prefix = string_ops.string_join(
        [file_prefix, sharded_suffix])