Moving some filesystems to Transactional API

This commit is contained in:
Sami Kama 2020-07-31 14:22:53 -07:00
parent 822eeba7ed
commit e2cba3e0a2
12 changed files with 215 additions and 306 deletions

View File

@ -35,8 +35,8 @@ using UniquePtrTo_TF_Status =
::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
Status ModularFileSystem::NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
if (ops_->new_random_access_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewRandomAccessFile()"));
@ -55,8 +55,8 @@ Status ModularFileSystem::NewRandomAccessFile(
}
Status ModularFileSystem::NewWritableFile(
const std::string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
if (ops_->new_writable_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewWritableFile()"));
@ -75,8 +75,8 @@ Status ModularFileSystem::NewWritableFile(
}
Status ModularFileSystem::NewAppendableFile(
const std::string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
if (ops_->new_appendable_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewAppendableFile()"));
@ -95,8 +95,8 @@ Status ModularFileSystem::NewAppendableFile(
}
Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
const std::string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
if (ops_->new_read_only_memory_region_from_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname,
@ -116,8 +116,8 @@ Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::FileExists(
const std::string& fname /*, TransactionToken* token */) {
Status ModularFileSystem::FileExists(const std::string& fname,
TransactionToken* token) {
if (ops_->path_exists == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support FileExists()"));
@ -129,9 +129,9 @@ Status ModularFileSystem::FileExists(
return StatusFromTF_Status(plugin_status.get());
}
bool ModularFileSystem::FilesExist(
const std::vector<std::string>& files,
std::vector<Status>* status /*, TransactionToken* token */) {
bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
TransactionToken* token,
std::vector<Status>* status) {
if (ops_->paths_exist == nullptr)
return FileSystem::FilesExist(files, status);
@ -162,9 +162,9 @@ bool ModularFileSystem::FilesExist(
return result;
}
Status ModularFileSystem::GetChildren(
const std::string& dir,
std::vector<std::string>* result /*, TransactionToken* token */) {
Status ModularFileSystem::GetChildren(const std::string& dir,
TransactionToken* token,
std::vector<std::string>* result) {
if (ops_->get_children == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dir, " does not support GetChildren()"));
@ -188,9 +188,9 @@ Status ModularFileSystem::GetChildren(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::GetMatchingPaths(
const std::string& pattern,
std::vector<std::string>* result /*, TransactionToken* token */) {
Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
TransactionToken* token,
std::vector<std::string>* result) {
if (ops_->get_matching_paths == nullptr)
return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
@ -211,8 +211,8 @@ Status ModularFileSystem::GetMatchingPaths(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteFile(
const std::string& fname /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteFile(const std::string& fname,
TransactionToken* token) {
if (ops_->delete_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support DeleteFile()"));
@ -224,9 +224,10 @@ Status ModularFileSystem::DeleteFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteRecursively(
const std::string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) {
if (undeleted_files == nullptr || undeleted_dirs == nullptr)
return errors::FailedPrecondition(
"DeleteRecursively must not be called with `undeleted_files` or "
@ -247,8 +248,8 @@ Status ModularFileSystem::DeleteRecursively(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->delete_dir == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dirname, " does not support DeleteDir()"));
@ -260,8 +261,8 @@ Status ModularFileSystem::DeleteDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::RecursivelyCreateDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->recursively_create_dir == nullptr)
return FileSystem::RecursivelyCreateDir(dirname);
@ -272,8 +273,8 @@ Status ModularFileSystem::RecursivelyCreateDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::CreateDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::CreateDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->create_dir == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dirname, " does not support CreateDir()"));
@ -285,9 +286,8 @@ Status ModularFileSystem::CreateDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::Stat(
const std::string& fname,
FileStatistics* stat /*, TransactionToken* token */) {
Status ModularFileSystem::Stat(const std::string& fname,
TransactionToken* token, FileStatistics* stat) {
if (ops_->stat == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support Stat()"));
@ -310,8 +310,8 @@ Status ModularFileSystem::Stat(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::IsDirectory(
const std::string& name /*, TransactionToken* token */) {
Status ModularFileSystem::IsDirectory(const std::string& name,
TransactionToken* token) {
if (ops_->is_directory == nullptr) return FileSystem::IsDirectory(name);
UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
@ -321,9 +321,9 @@ Status ModularFileSystem::IsDirectory(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::GetFileSize(
const std::string& fname,
uint64* file_size /*, TransactionToken* token */) {
Status ModularFileSystem::GetFileSize(const std::string& fname,
TransactionToken* token,
uint64* file_size) {
if (ops_->get_file_size == nullptr) {
FileStatistics stat;
Status status = Stat(fname, &stat);
@ -342,9 +342,9 @@ Status ModularFileSystem::GetFileSize(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::RenameFile(
const std::string& src,
const std::string& target /*, TransactionToken* token */) {
Status ModularFileSystem::RenameFile(const std::string& src,
const std::string& target,
TransactionToken* token) {
if (ops_->rename_file == nullptr) {
Status status = CopyFile(src, target);
if (status.ok()) status = DeleteFile(src);
@ -359,9 +359,9 @@ Status ModularFileSystem::RenameFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::CopyFile(
const std::string& src,
const std::string& target /*, TransactionToken* token */) {
Status ModularFileSystem::CopyFile(const std::string& src,
const std::string& target,
TransactionToken* token) {
if (ops_->copy_file == nullptr) return FileSystem::CopyFile(src, target);
UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
@ -372,8 +372,7 @@ Status ModularFileSystem::CopyFile(
return StatusFromTF_Status(plugin_status.get());
}
std::string ModularFileSystem::TranslateName(
const std::string& name /*, TransactionToken* token */) const {
std::string ModularFileSystem::TranslateName(const std::string& name) const {
if (ops_->translate_name == nullptr) return FileSystem::TranslateName(name);
char* p = ops_->translate_name(filesystem_.get(), name.c_str());
@ -385,7 +384,7 @@ std::string ModularFileSystem::TranslateName(
return ret;
}
void ModularFileSystem::FlushCaches(/*TransactionToken* token*/) {
void ModularFileSystem::FlushCaches(TransactionToken* token) {
if (ops_->flush_caches != nullptr) ops_->flush_caches(filesystem_.get());
}

View File

@ -60,70 +60,45 @@ class ModularFileSystem final : public FileSystem {
~ModularFileSystem() override { ops_->cleanup(filesystem_.get()); }
Status NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewWritableFile(
const std::string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewAppendableFile(
const std::string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
const std::string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const std::string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
Status FileExists(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
const std::string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(const std::string& fname, TransactionToken* token) override;
bool FilesExist(const std::vector<std::string>& files,
std::vector<Status>*
status /*, TransactionToken* token = nullptr */) override;
Status GetChildren(
const std::string& dir,
std::vector<std::string>* result /*, TransactionToken* token = nullptr */)
override;
Status GetMatchingPaths(
const std::string& pattern,
std::vector<std::string>*
results /*, TransactionToken* token = nullptr */) override;
Status DeleteFile(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
Status DeleteRecursively(
const std::string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override;
Status DeleteDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status RecursivelyCreateDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status CreateDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status Stat(
const std::string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
Status IsDirectory(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
Status GetFileSize(
const std::string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override;
Status RenameFile(
const std::string& src,
const std::string& target /*, TransactionToken* token = nullptr */)
override;
Status CopyFile(const std::string& src,
const std::string&
target /*, TransactionToken* token = nullptr */) override;
std::string TranslateName(
const std::string& name /*, TransactionToken* token = nullptr */)
const override;
void FlushCaches(/* TransactionToken* token=nullptr */) override;
TransactionToken* token,
std::vector<Status>* status) override;
Status GetChildren(const std::string& dir, TransactionToken* token,
std::vector<std::string>* result) override;
Status GetMatchingPaths(const std::string& pattern, TransactionToken* token,
std::vector<std::string>* results) override;
Status DeleteFile(const std::string& fname, TransactionToken* token) override;
Status DeleteRecursively(const std::string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override;
Status DeleteDir(const std::string& dirname,
TransactionToken* token) override;
Status RecursivelyCreateDir(const std::string& dirname,
TransactionToken* token) override;
Status CreateDir(const std::string& dirname,
TransactionToken* token) override;
Status Stat(const std::string& fname, TransactionToken* token,
FileStatistics* stat) override;
Status IsDirectory(const std::string& fname,
TransactionToken* token) override;
Status GetFileSize(const std::string& fname, TransactionToken* token,
uint64* file_size) override;
Status RenameFile(const std::string& src, const std::string& target,
TransactionToken* token) override;
Status CopyFile(const std::string& src, const std::string& target,
TransactionToken* token) override;
std::string TranslateName(const std::string& name) const override;
void FlushCaches(TransactionToken* token) override;
private:
std::unique_ptr<TF_Filesystem> filesystem_;

View File

@ -688,7 +688,7 @@ class TestTFFileSystem : public ::tensorflow::NullFileSystem {
data_tensor_(test::AsTensor<double>({1., 2., 3., 4.}, {2, 2})) {}
::tensorflow::Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
const string& fname, ::tensorflow::TransactionToken* token,
std::unique_ptr<::tensorflow::ReadOnlyMemoryRegion>* result) override {
if (fname != kTestMemRegionName) {
return ::tensorflow::errors::Unimplemented(

View File

@ -61,7 +61,7 @@ class TestFileSystem : public NullFileSystem {
public:
~TestFileSystem() override = default;
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
float val = 0;
StringPiece scheme, host, path;

View File

@ -295,7 +295,7 @@ TEST_F(DefaultEnvTest, SleepForMicroseconds) {
class TmpDirFileSystem : public NullFileSystem {
public:
Status FileExists(const string& dir) override {
Status FileExists(const string& dir,TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
if (path.empty()) return errors::NotFound(dir, " not found");
@ -311,7 +311,7 @@ class TmpDirFileSystem : public NullFileSystem {
return Env::Default()->FileExists(io::JoinPath(BaseDir(), path));
}
Status CreateDir(const string& dir) override {
Status CreateDir(const string& dir,TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
if (scheme != "tmpdirfs") {
@ -328,7 +328,7 @@ class TmpDirFileSystem : public NullFileSystem {
return status;
}
Status IsDirectory(const string& dir) override {
Status IsDirectory(const string& dir,TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
for (const auto& existing_dir : created_directories_)
@ -336,7 +336,7 @@ class TmpDirFileSystem : public NullFileSystem {
return errors::NotFound(dir, " not found");
}
void FlushCaches() override { flushed_ = true; }
void FlushCaches(TransactionToken* token) override { flushed_ = true; }
private:
bool flushed_ = false;

View File

@ -481,7 +481,7 @@ class FileSystem {
/// \brief Starts a new transaction
virtual tensorflow::Status StartTransaction(TransactionToken** token) {
token = nullptr;
*token = nullptr;
return Status::OK();
}
@ -499,15 +499,15 @@ class FileSystem {
/// \brief Get token for `path` or start a new transaction and add `path` to
/// it.
virtual tensorflow::Status GetTokenOrStartTransaction(
const std::string& path, TransactionToken** token) {
token = nullptr;
const string& path, TransactionToken** token) {
*token = nullptr;
return Status::OK();
}
/// \brief Return transaction for `path` or nullptr in `token`
virtual tensorflow::Status GetTransactionForPath(const std::string& path,
TransactionToken** token) {
token = nullptr;
*token = nullptr;
return Status::OK();
}

View File

@ -32,7 +32,7 @@ static const char* const kPrefix = "ipfs://solarsystem";
// cannot have children further.
class InterPlanetaryFileSystem : public NullFileSystem {
public:
Status FileExists(const string& fname) override {
Status FileExists(const string& fname, TransactionToken* token) override {
string parsed_path;
ParsePath(fname, &parsed_path);
if (BodyExists(parsed_path)) {
@ -42,7 +42,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
}
// Adds the dir to the parent's children list and creates an entry for itself.
Status CreateDir(const string& dirname) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
string parsed_path;
ParsePath(dirname, &parsed_path);
// If the directory already exists, throw an error.
@ -88,7 +88,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
return Status(tensorflow::error::FAILED_PRECONDITION, "Failed to create");
}
Status IsDirectory(const string& dirname) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
string parsed_path;
ParsePath(dirname, &parsed_path);
// Simulate evil_directory has bad permissions by throwing a LOG(FATAL)
@ -105,7 +105,8 @@ class InterPlanetaryFileSystem : public NullFileSystem {
return Status(tensorflow::error::FAILED_PRECONDITION, "Not a dir");
}
Status GetChildren(const string& dir, std::vector<string>* result) override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
TF_RETURN_IF_ERROR(IsDirectory(dir));
string parsed_path;
ParsePath(dir, &parsed_path);
@ -273,7 +274,7 @@ TEST(InterPlanetaryFileSystemTest, HasAtomicMove) {
class TestFileSystem : public NullFileSystem {
public:
// Only allow for a single root directory.
Status IsDirectory(const string& dirname) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
if (dirname == "." || dirname.empty()) {
return Status::OK();
}
@ -281,7 +282,8 @@ class TestFileSystem : public NullFileSystem {
}
// Simulating a FS with a root dir and a single file underneath it.
Status GetChildren(const string& dir, std::vector<string>* result) override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
if (dir == "." || dir.empty()) {
result->push_back("test");
}

View File

@ -37,83 +37,66 @@ class NullFileSystem : public FileSystem {
~NullFileSystem() override = default;
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
return errors::Unimplemented("NewRandomAccessFile unimplemented");
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
return errors::Unimplemented("NewWritableFile unimplemented");
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
return errors::Unimplemented("NewAppendableFile unimplemented");
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return errors::Unimplemented(
"NewReadOnlyMemoryRegionFromFile unimplemented");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return errors::Unimplemented("FileExists unimplemented");
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return errors::Unimplemented("GetChildren unimplemented");
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override {
return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
return errors::Unimplemented("DeleteFile unimplemented");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return errors::Unimplemented("CreateDir unimplemented");
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return errors::Unimplemented("DeleteDir unimplemented");
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return errors::Unimplemented("GetFileSize unimplemented");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return errors::Unimplemented("RenameFile unimplemented");
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return errors::Unimplemented("Stat unimplemented");
}
};

View File

@ -104,9 +104,8 @@ class RamRandomAccessFile : public RandomAccessFile, public WritableFile {
class RamFileSystem : public FileSystem {
public:
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
return errors::NotFound("");
@ -116,10 +115,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
fs_[fname] = std::make_shared<std::string>();
@ -128,10 +125,8 @@ class RamFileSystem : public FileSystem {
new RamRandomAccessFile(fname, fs_[fname]));
return Status::OK();
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
fs_[fname] = std::make_shared<std::string>();
@ -142,22 +137,18 @@ class RamFileSystem : public FileSystem {
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return errors::Unimplemented("");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
FileStatistics stat;
return Stat(fname, &stat);
return Stat(fname, token, &stat);
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
mutex_lock m(mu_);
auto it = fs_.lower_bound(dir);
while (it != fs_.end() && absl::StartsWith(it->first, dir)) {
@ -168,10 +159,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override {
mutex_lock m(mu_);
Env* env = Env::Default();
for (auto it = fs_.begin(); it != fs_.end(); ++it) {
@ -182,9 +171,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
mutex_lock m(mu_);
auto it = fs_.lower_bound(fname);
if (it == fs_.end()) {
@ -204,8 +192,7 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
mutex_lock m(mu_);
if (fs_.find(fname) != fs_.end()) {
fs_.erase(fname);
@ -215,24 +202,21 @@ class RamFileSystem : public FileSystem {
return errors::NotFound("");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return Status::OK();
}
Status RecursivelyCreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status RecursivelyCreateDir(const string& dirname,
TransactionToken* token) override {
return Status::OK();
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return Status::OK();
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
mutex_lock m(mu_);
if (fs_.find(fname) != fs_.end()) {
*file_size = fs_[fname]->size();
@ -241,9 +225,8 @@ class RamFileSystem : public FileSystem {
return errors::NotFound("");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
mutex_lock m(mu_);
if (fs_.find(src) != fs_.end()) {
fs_[target] = fs_[src];

View File

@ -39,36 +39,27 @@ class RetryingFileSystem : public FileSystem {
retry_config_(retry_config) {}
Status NewRandomAccessFile(
const string& filename,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
const string& filename, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(
const string& filename,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewWritableFile(const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(
const string& filename,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewAppendableFile(const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& filename,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
const string& filename, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &fname]() { return base_file_system_->FileExists(fname); },
retry_config_);
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return RetryingUtils::CallWithRetries(
[this, &dir, result]() {
return base_file_system_->GetChildren(dir, result);
@ -76,10 +67,8 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* result) override {
return RetryingUtils::CallWithRetries(
[this, &pattern, result]() {
return base_file_system_->GetMatchingPaths(pattern, result);
@ -87,38 +76,33 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return RetryingUtils::CallWithRetries(
[this, &fname, stat]() { return base_file_system_->Stat(fname, stat); },
retry_config_);
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
return RetryingUtils::DeleteWithRetries(
[this, &fname]() { return base_file_system_->DeleteFile(fname); },
retry_config_);
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &dirname]() { return base_file_system_->CreateDir(dirname); },
retry_config_);
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return RetryingUtils::DeleteWithRetries(
[this, &dirname]() { return base_file_system_->DeleteDir(dirname); },
retry_config_);
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return RetryingUtils::CallWithRetries(
[this, &fname, file_size]() {
return base_file_system_->GetFileSize(fname, file_size);
@ -126,9 +110,8 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &src, &target]() {
return base_file_system_->RenameFile(src, target);
@ -136,8 +119,7 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
Status IsDirectory(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
retry_config_);
@ -148,9 +130,9 @@ class RetryingFileSystem : public FileSystem {
return base_file_system_->HasAtomicMove(path, has_atomic_move);
}
Status DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override {
Status DeleteRecursively(const string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override {
return RetryingUtils::DeleteWithRetries(
[this, &dirname, undeleted_files, undeleted_dirs]() {
return base_file_system_->DeleteRecursively(dirname, undeleted_files,
@ -159,7 +141,7 @@ class RetryingFileSystem : public FileSystem {
retry_config_);
}
void FlushCaches(/* TransactionToken* token=nullptr */) override {
void FlushCaches(TransactionToken* token) override {
base_file_system_->FlushCaches();
}
@ -243,8 +225,8 @@ class RetryingWritableFile : public WritableFile {
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
const string& filename,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
std::unique_ptr<RandomAccessFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
@ -258,8 +240,8 @@ Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewWritableFile(
const string& filename,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
@ -273,8 +255,8 @@ Status RetryingFileSystem<Underlying>::NewWritableFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewAppendableFile(
const string& filename,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
@ -288,8 +270,8 @@ Status RetryingFileSystem<Underlying>::NewAppendableFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
return RetryingUtils::CallWithRetries(
[this, &filename, result]() {
return base_file_system_->NewReadOnlyMemoryRegionFromFile(filename,

View File

@ -100,100 +100,83 @@ class MockFileSystem : public FileSystem {
: calls_(calls), flushed_(flushed) {}
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
*result = std::move(random_access_file_to_return);
return calls_.ConsumeNextCall("NewRandomAccessFile");
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
*result = std::move(writable_file_to_return);
return calls_.ConsumeNextCall("NewWritableFile");
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
*result = std::move(writable_file_to_return);
return calls_.ConsumeNextCall("NewAppendableFile");
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return calls_.ConsumeNextCall("NewReadOnlyMemoryRegionFromFile");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return calls_.ConsumeNextCall("FileExists");
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return calls_.ConsumeNextCall("GetChildren");
}
Status GetMatchingPaths(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return calls_.ConsumeNextCall("GetMatchingPaths");
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return calls_.ConsumeNextCall("Stat");
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
return calls_.ConsumeNextCall("DeleteFile");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("CreateDir");
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("DeleteDir");
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return calls_.ConsumeNextCall("GetFileSize");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return calls_.ConsumeNextCall("RenameFile");
}
Status IsDirectory(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("IsDirectory");
}
Status DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override {
Status DeleteRecursively(const string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override {
return calls_.ConsumeNextCall("DeleteRecursively");
}
void FlushCaches(/* TransactionToken* token=nullptr */) override {
void FlushCaches(
TransactionToken* token) override {
if (flushed_) {
*flushed_ = true;
}

View File

@ -39,12 +39,14 @@ class TestRandomAccessFile : public RandomAccessFile {
class TestFileSystem : public NullFileSystem {
public:
Status NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
result->reset(new TestRandomAccessFile);
return Status::OK();
}
// Always return size of 10
Status GetFileSize(const string& fname, uint64* file_size) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
*file_size = 10;
return Status::OK();
}