Merge pull request #41946 from samikama:Transactions_part11

PiperOrigin-RevId: 325260530
Change-Id: I03430704d600acee6d70d07f7cc3802eefa242ae
This commit is contained in:
TensorFlower Gardener 2020-08-06 10:57:03 -07:00
commit eaebc51dd6
17 changed files with 713 additions and 763 deletions

View File

@ -35,8 +35,8 @@ using UniquePtrTo_TF_Status =
::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
Status ModularFileSystem::NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
if (ops_->new_random_access_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewRandomAccessFile()"));
@ -55,8 +55,8 @@ Status ModularFileSystem::NewRandomAccessFile(
}
Status ModularFileSystem::NewWritableFile(
const std::string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
if (ops_->new_writable_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewWritableFile()"));
@ -75,8 +75,8 @@ Status ModularFileSystem::NewWritableFile(
}
Status ModularFileSystem::NewAppendableFile(
const std::string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
if (ops_->new_appendable_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support NewAppendableFile()"));
@ -95,8 +95,8 @@ Status ModularFileSystem::NewAppendableFile(
}
Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
const std::string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const std::string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
if (ops_->new_read_only_memory_region_from_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname,
@ -116,8 +116,8 @@ Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::FileExists(
const std::string& fname /*, TransactionToken* token */) {
Status ModularFileSystem::FileExists(const std::string& fname,
TransactionToken* token) {
if (ops_->path_exists == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support FileExists()"));
@ -129,9 +129,9 @@ Status ModularFileSystem::FileExists(
return StatusFromTF_Status(plugin_status.get());
}
bool ModularFileSystem::FilesExist(
const std::vector<std::string>& files,
std::vector<Status>* status /*, TransactionToken* token */) {
bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
TransactionToken* token,
std::vector<Status>* status) {
if (ops_->paths_exist == nullptr)
return FileSystem::FilesExist(files, status);
@ -162,9 +162,9 @@ bool ModularFileSystem::FilesExist(
return result;
}
Status ModularFileSystem::GetChildren(
const std::string& dir,
std::vector<std::string>* result /*, TransactionToken* token */) {
Status ModularFileSystem::GetChildren(const std::string& dir,
TransactionToken* token,
std::vector<std::string>* result) {
if (ops_->get_children == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dir, " does not support GetChildren()"));
@ -188,9 +188,9 @@ Status ModularFileSystem::GetChildren(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::GetMatchingPaths(
const std::string& pattern,
std::vector<std::string>* result /*, TransactionToken* token */) {
Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
TransactionToken* token,
std::vector<std::string>* result) {
if (ops_->get_matching_paths == nullptr)
return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
@ -211,8 +211,8 @@ Status ModularFileSystem::GetMatchingPaths(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteFile(
const std::string& fname /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteFile(const std::string& fname,
TransactionToken* token) {
if (ops_->delete_file == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support DeleteFile()"));
@ -224,9 +224,10 @@ Status ModularFileSystem::DeleteFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteRecursively(
const std::string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) {
if (undeleted_files == nullptr || undeleted_dirs == nullptr)
return errors::FailedPrecondition(
"DeleteRecursively must not be called with `undeleted_files` or "
@ -247,8 +248,8 @@ Status ModularFileSystem::DeleteRecursively(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::DeleteDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::DeleteDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->delete_dir == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dirname, " does not support DeleteDir()"));
@ -260,8 +261,8 @@ Status ModularFileSystem::DeleteDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::RecursivelyCreateDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->recursively_create_dir == nullptr)
return FileSystem::RecursivelyCreateDir(dirname);
@ -272,8 +273,8 @@ Status ModularFileSystem::RecursivelyCreateDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::CreateDir(
const std::string& dirname /*, TransactionToken* token */) {
Status ModularFileSystem::CreateDir(const std::string& dirname,
TransactionToken* token) {
if (ops_->create_dir == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", dirname, " does not support CreateDir()"));
@ -285,9 +286,8 @@ Status ModularFileSystem::CreateDir(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::Stat(
const std::string& fname,
FileStatistics* stat /*, TransactionToken* token */) {
Status ModularFileSystem::Stat(const std::string& fname,
TransactionToken* token, FileStatistics* stat) {
if (ops_->stat == nullptr)
return errors::Unimplemented(tensorflow::strings::StrCat(
"Filesystem for ", fname, " does not support Stat()"));
@ -310,8 +310,8 @@ Status ModularFileSystem::Stat(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::IsDirectory(
const std::string& name /*, TransactionToken* token */) {
Status ModularFileSystem::IsDirectory(const std::string& name,
TransactionToken* token) {
if (ops_->is_directory == nullptr) return FileSystem::IsDirectory(name);
UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
@ -321,9 +321,9 @@ Status ModularFileSystem::IsDirectory(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::GetFileSize(
const std::string& fname,
uint64* file_size /*, TransactionToken* token */) {
Status ModularFileSystem::GetFileSize(const std::string& fname,
TransactionToken* token,
uint64* file_size) {
if (ops_->get_file_size == nullptr) {
FileStatistics stat;
Status status = Stat(fname, &stat);
@ -342,9 +342,9 @@ Status ModularFileSystem::GetFileSize(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::RenameFile(
const std::string& src,
const std::string& target /*, TransactionToken* token */) {
Status ModularFileSystem::RenameFile(const std::string& src,
const std::string& target,
TransactionToken* token) {
if (ops_->rename_file == nullptr) {
Status status = CopyFile(src, target);
if (status.ok()) status = DeleteFile(src);
@ -359,9 +359,9 @@ Status ModularFileSystem::RenameFile(
return StatusFromTF_Status(plugin_status.get());
}
Status ModularFileSystem::CopyFile(
const std::string& src,
const std::string& target /*, TransactionToken* token */) {
Status ModularFileSystem::CopyFile(const std::string& src,
const std::string& target,
TransactionToken* token) {
if (ops_->copy_file == nullptr) return FileSystem::CopyFile(src, target);
UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
@ -372,8 +372,7 @@ Status ModularFileSystem::CopyFile(
return StatusFromTF_Status(plugin_status.get());
}
std::string ModularFileSystem::TranslateName(
const std::string& name /*, TransactionToken* token */) const {
std::string ModularFileSystem::TranslateName(const std::string& name) const {
if (ops_->translate_name == nullptr) return FileSystem::TranslateName(name);
char* p = ops_->translate_name(filesystem_.get(), name.c_str());
@ -385,7 +384,7 @@ std::string ModularFileSystem::TranslateName(
return ret;
}
void ModularFileSystem::FlushCaches(/*TransactionToken* token*/) {
void ModularFileSystem::FlushCaches(TransactionToken* token) {
if (ops_->flush_caches != nullptr) ops_->flush_caches(filesystem_.get());
}

View File

@ -59,71 +59,48 @@ class ModularFileSystem final : public FileSystem {
~ModularFileSystem() override { ops_->cleanup(filesystem_.get()); }
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewWritableFile(
const std::string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewAppendableFile(
const std::string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
const std::string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const std::string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const std::string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
Status FileExists(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
const std::string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(const std::string& fname, TransactionToken* token) override;
bool FilesExist(const std::vector<std::string>& files,
std::vector<Status>*
status /*, TransactionToken* token = nullptr */) override;
Status GetChildren(
const std::string& dir,
std::vector<std::string>* result /*, TransactionToken* token = nullptr */)
override;
Status GetMatchingPaths(
const std::string& pattern,
std::vector<std::string>*
results /*, TransactionToken* token = nullptr */) override;
Status DeleteFile(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
Status DeleteRecursively(
const std::string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override;
Status DeleteDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status RecursivelyCreateDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status CreateDir(
const std::string& dirname /*, TransactionToken* token = nullptr */)
override;
Status Stat(
const std::string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
Status IsDirectory(
const std::string& fname /*, TransactionToken* token = nullptr */)
override;
Status GetFileSize(
const std::string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override;
Status RenameFile(
const std::string& src,
const std::string& target /*, TransactionToken* token = nullptr */)
override;
Status CopyFile(const std::string& src,
const std::string&
target /*, TransactionToken* token = nullptr */) override;
std::string TranslateName(
const std::string& name /*, TransactionToken* token = nullptr */)
const override;
void FlushCaches(/* TransactionToken* token=nullptr */) override;
TransactionToken* token,
std::vector<Status>* status) override;
Status GetChildren(const std::string& dir, TransactionToken* token,
std::vector<std::string>* result) override;
Status GetMatchingPaths(const std::string& pattern, TransactionToken* token,
std::vector<std::string>* results) override;
Status DeleteFile(const std::string& fname, TransactionToken* token) override;
Status DeleteRecursively(const std::string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override;
Status DeleteDir(const std::string& dirname,
TransactionToken* token) override;
Status RecursivelyCreateDir(const std::string& dirname,
TransactionToken* token) override;
Status CreateDir(const std::string& dirname,
TransactionToken* token) override;
Status Stat(const std::string& fname, TransactionToken* token,
FileStatistics* stat) override;
Status IsDirectory(const std::string& fname,
TransactionToken* token) override;
Status GetFileSize(const std::string& fname, TransactionToken* token,
uint64* file_size) override;
Status RenameFile(const std::string& src, const std::string& target,
TransactionToken* token) override;
Status CopyFile(const std::string& src, const std::string& target,
TransactionToken* token) override;
std::string TranslateName(const std::string& name) const override;
void FlushCaches(TransactionToken* token) override;
private:
std::unique_ptr<TF_Filesystem> filesystem_;

View File

@ -13,15 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/common_runtime/constant_folding.h"
#include <map>
#include <string>
#include <unordered_map>
#include <vector>
#include "tensorflow/cc/ops/nn_ops.h"
#include "tensorflow/core/common_runtime/constant_folding.h"
#include "tensorflow/cc/ops/array_ops_internal.h"
#include "tensorflow/cc/ops/nn_ops.h"
#include "tensorflow/cc/ops/sendrecv_ops.h"
#include "tensorflow/cc/ops/standard_ops.h"
#include "tensorflow/core/common_runtime/device.h"
@ -687,8 +687,10 @@ class TestTFFileSystem : public ::tensorflow::NullFileSystem {
: ::tensorflow::NullFileSystem(),
data_tensor_(test::AsTensor<double>({1., 2., 3., 4.}, {2, 2})) {}
using ::tensorflow::NullFileSystem::NewReadOnlyMemoryRegionFromFile;
::tensorflow::Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
const string& fname, ::tensorflow::TransactionToken* token,
std::unique_ptr<::tensorflow::ReadOnlyMemoryRegion>* result) override {
if (fname != kTestMemRegionName) {
return ::tensorflow::errors::Unimplemented(

View File

@ -60,8 +60,12 @@ class TestReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
class TestFileSystem : public NullFileSystem {
public:
~TestFileSystem() override = default;
// import non-transactional method from the base class
using NullFileSystem::NewReadOnlyMemoryRegionFromFile;
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
float val = 0;
StringPiece scheme, host, path;

View File

@ -648,7 +648,8 @@ class GcsWritableFile : public WritableFile {
TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
" when composing to ", GetGcsPath());
TF_RETURN_WITH_CONTEXT_IF_ERROR(
filesystem_->DeleteFile(GetGcsPathWithObject(append_object)),
filesystem_->DeleteFile(GetGcsPathWithObject(append_object),
nullptr),
" when cleaning up.");
return Status::OK();
},
@ -929,8 +930,8 @@ GcsFileSystem::GcsFileSystem(
additional_header_(additional_header) {}
Status GcsFileSystem::NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
@ -1231,9 +1232,9 @@ void GcsFileSystem::ClearFileCaches(const string& fname) {
// MatchingPathsCache as well.
}
Status GcsFileSystem::NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
Status GcsFileSystem::NewWritableFile(const string& fname,
TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
@ -1267,11 +1268,11 @@ Status GcsFileSystem::NewWritableFile(
// Reads the file from GCS in chunks and stores it in a tmp file,
// which is then passed to GcsWritableFile.
Status GcsFileSystem::NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
Status GcsFileSystem::NewAppendableFile(const string& fname,
TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<RandomAccessFile> reader;
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
Status status;
uint64 offset = 0;
@ -1330,14 +1331,14 @@ Status GcsFileSystem::NewAppendableFile(
}
Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
uint64 size;
TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
std::unique_ptr<char[]> data(new char[size]);
std::unique_ptr<RandomAccessFile> file;
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
StringPiece piece;
TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
@ -1346,8 +1347,7 @@ Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
return Status::OK();
}
Status GcsFileSystem::FileExists(
const string& fname /*, TransactionToken* token */) {
Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
if (object.empty()) {
@ -1561,17 +1561,17 @@ Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
return s;
}
Status GcsFileSystem::GetChildren(
const string& dirname,
std::vector<string>* result /*, TransactionToken* token */) {
Status GcsFileSystem::GetChildren(const string& dirname,
TransactionToken* token,
std::vector<string>* result) {
return GetChildrenBounded(dirname, UINT64_MAX, result,
false /* recursively */,
false /* include_self_directory_marker */);
}
Status GcsFileSystem::GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token */) {
Status GcsFileSystem::GetMatchingPaths(const string& pattern,
TransactionToken* token,
std::vector<string>* results) {
MatchingPathsCache::ComputeFunc compute_func =
[this](const string& pattern, std::vector<string>* results) {
results->clear();
@ -1731,8 +1731,8 @@ Status GcsFileSystem::GetChildrenBounded(const string& dirname,
}
}
Status GcsFileSystem::Stat(
const string& fname, FileStatistics* stat /*, TransactionToken* token */) {
Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) {
if (!stat) {
return errors::Internal("'stat' cannot be nullptr.");
}
@ -1766,8 +1766,7 @@ Status GcsFileSystem::Stat(
return errors::NotFound("The specified path ", fname, " was not found.");
}
Status GcsFileSystem::DeleteFile(
const string& fname /*, TransactionToken* token */) {
Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
@ -1783,8 +1782,8 @@ Status GcsFileSystem::DeleteFile(
return Status::OK();
}
Status GcsFileSystem::CreateDir(
const string& dirname /*, TransactionToken* token */) {
Status GcsFileSystem::CreateDir(const string& dirname,
TransactionToken* token) {
string dirname_with_slash = MaybeAppendSlash(dirname);
VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
<< " and dirname_with_slash: " << dirname_with_slash;
@ -1799,7 +1798,7 @@ Status GcsFileSystem::CreateDir(
dirname_with_slash, " was not found.");
}
if (FileExists(dirname_with_slash).ok()) {
if (FileExists(dirname_with_slash, token).ok()) {
// Use the original name for a correct error here.
VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
return errors::AlreadyExists(dirname);
@ -1833,8 +1832,8 @@ Status GcsFileSystem::CreateDir(
// Checks that the directory is empty (i.e no objects with this prefix exist).
// Deletes the GCS directory marker if it exists.
Status GcsFileSystem::DeleteDir(
const string& dirname /*, TransactionToken* token */) {
Status GcsFileSystem::DeleteDir(const string& dirname,
TransactionToken* token) {
std::vector<string> children;
// A directory is considered empty either if there are no matching objects
// with the corresponding name prefix or if there is exactly one matching
@ -1849,13 +1848,13 @@ Status GcsFileSystem::DeleteDir(
}
if (children.size() == 1 && children[0].empty()) {
// This is the directory marker object. Delete it.
return DeleteFile(MaybeAppendSlash(dirname));
return DeleteFile(MaybeAppendSlash(dirname), token);
}
return Status::OK();
}
Status GcsFileSystem::GetFileSize(
const string& fname, uint64* file_size /*, TransactionToken* token */) {
Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) {
if (!file_size) {
return errors::Internal("'file_size' cannot be nullptr.");
}
@ -1865,14 +1864,14 @@ Status GcsFileSystem::GetFileSize(
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
FileStatistics stat;
TF_RETURN_IF_ERROR(Stat(fname, &stat));
TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
*file_size = stat.length;
return Status::OK();
}
Status GcsFileSystem::RenameFile(
const string& src, const string& target /*, TransactionToken* token */) {
if (!IsDirectory(src).ok()) {
Status GcsFileSystem::RenameFile(const string& src, const string& target,
TransactionToken* token) {
if (!IsDirectory(src, token).ok()) {
return RenameObject(src, target);
}
// Rename all individual objects in the directory one by one.
@ -1930,11 +1929,11 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) {
// on the server side, we can't just retry the whole RenameFile operation
// because the source object is already gone.
return RetryingUtils::DeleteWithRetries(
[this, &src]() { return DeleteFile(src); }, retry_config_);
[this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
}
Status GcsFileSystem::IsDirectory(
const string& fname /*, TransactionToken* token */) {
Status GcsFileSystem::IsDirectory(const string& fname,
TransactionToken* token) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
if (object.empty()) {
@ -1960,16 +1959,17 @@ Status GcsFileSystem::IsDirectory(
return errors::NotFound("The specified path ", fname, " was not found.");
}
Status GcsFileSystem::DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token */) {
Status GcsFileSystem::DeleteRecursively(const string& dirname,
TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) {
if (!undeleted_files || !undeleted_dirs) {
return errors::Internal(
"'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
}
*undeleted_files = 0;
*undeleted_dirs = 0;
if (!IsDirectory(dirname).ok()) {
if (!IsDirectory(dirname, token).ok()) {
*undeleted_dirs = 1;
return Status(
error::NOT_FOUND,
@ -1987,9 +1987,10 @@ Status GcsFileSystem::DeleteRecursively(
// and therefore RetryingFileSystem won't pay attention to the failures,
// we need to make sure these failures are properly retried.
const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
[this, &full_path]() { return DeleteFile(full_path); }, retry_config_);
[this, &full_path, token]() { return DeleteFile(full_path, token); },
retry_config_);
if (!delete_file_status.ok()) {
if (IsDirectory(full_path).ok()) {
if (IsDirectory(full_path, token).ok()) {
// The object is a directory marker.
(*undeleted_dirs)++;
} else {
@ -2003,7 +2004,7 @@ Status GcsFileSystem::DeleteRecursively(
// Flushes all caches for filesystem metadata and file contents. Useful for
// reclaiming memory once filesystem operations are done (e.g. model is loaded),
// or for resetting the filesystem to a consistent state.
void GcsFileSystem::FlushCaches(/* TransactionToken* token */) {
void GcsFileSystem::FlushCaches(TransactionToken* token) {
tf_shared_lock l(block_cache_lock_);
file_block_cache_->Flush();
stat_cache_->Clear();

View File

@ -125,68 +125,52 @@ class GcsFileSystem : public FileSystem {
std::pair<const string, const string>* additional_header,
bool compose_append);
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result) /*, TransactionToken* token = nullptr */ override;
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override;
Status FileExists(const string& fname, TransactionToken* token) override;
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override;
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override;
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override;
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override;
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override;
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override;
Status DeleteFile(const string& fname, TransactionToken* token) override;
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override;
Status CreateDir(const string& dirname, TransactionToken* token) override;
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override;
Status DeleteDir(const string& dirname, TransactionToken* token) override;
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override;
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override;
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override;
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override;
Status IsDirectory(
const string& fname /*, TransactionToken* token = nullptr */) override;
Status IsDirectory(const string& fname, TransactionToken* token) override;
Status DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override;
Status DeleteRecursively(const string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override;
void FlushCaches(/* TransactionToken* token = nullptr */) override;
void FlushCaches(TransactionToken* token) override;
/// Set an object to collect runtime statistics from the GcsFilesystem.
void SetStats(GcsStatsInterface* stats);

View File

@ -86,7 +86,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -133,7 +134,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -181,7 +183,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_Errors) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -228,7 +231,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_ReadAtEOF) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -269,7 +273,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_CachedOutOfRange) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -320,7 +325,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_CachedNotSequential) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -361,7 +367,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_Growing) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -408,7 +415,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_ReadBackwards) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
StringPiece filename;
TF_EXPECT_OK(file->Name(&filename));
@ -450,7 +458,8 @@ TEST(GcsFileSystemTest,
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
}
TEST(GcsFileSystemTest, NewRandomAccessFile_WithLocationConstraintCaching) {
@ -496,18 +505,18 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithLocationConstraintCaching) {
string bucket = "gs://bucket/random_access.txt";
string another_bucket = "gs://anotherbucket/random_access.txt";
// Multiple calls should only cause one request to the location api.
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
// A new bucket should have one cache miss
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, nullptr, &file));
// And then future calls to both should be cached
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, nullptr, &file));
// Trigger a flush, should then require one more call
fs.FlushCaches();
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
fs.FlushCaches(nullptr);
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
}
TEST(GcsFileSystemTest,
@ -533,10 +542,11 @@ TEST(GcsFileSystemTest,
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
EXPECT_EQ(tensorflow::errors::FailedPrecondition(
"Bucket 'bucket' is in 'barfoo' location, allowed locations "
"are: (us-east1)."),
fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
EXPECT_EQ(
tensorflow::errors::FailedPrecondition(
"Bucket 'bucket' is in 'barfoo' location, allowed locations "
"are: (us-east1)."),
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
}
TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_DifferentN) {
@ -565,7 +575,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_DifferentN) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
char small_scratch[3];
StringPiece result;
@ -630,8 +641,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) {
// We are instantiating this in an enclosed scope to make sure after the
// unique ptr goes out of scope, we can still access result.
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt",
nullptr, &file));
// Read the first chunk. The cache will be populated with the first block of
// 9 bytes.
@ -716,7 +727,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) {
char scratch[100];
StringPiece result;
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
// Read the first chunk. The cache will be populated with the first block of
// 9 bytes.
scratch[5] = 'x';
@ -725,7 +737,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) {
EXPECT_EQ(scratch[5], 'x'); // Make sure we only copied 4 bytes.
// Flush caches and read the second chunk. This will be a cache miss, and
// the same block will be fetched again.
fs.FlushCaches();
fs.FlushCaches(nullptr);
TF_EXPECT_OK(file->Read(4, 4, &result, scratch));
EXPECT_EQ("4567", result);
}
@ -772,8 +784,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) {
// staleness of the filesystem is > 0, they will share the same blocks.
std::unique_ptr<RandomAccessFile> file1;
std::unique_ptr<RandomAccessFile> file2;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file1));
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file2));
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", nullptr, &file1));
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", nullptr, &file2));
// Reading the first block from file1 should load it once.
TF_EXPECT_OK(file1->Read(0, 8, &result, scratch));
EXPECT_EQ("01234567", result);
@ -834,7 +846,8 @@ TEST(GcsFileSystemTest,
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
char scratch[5];
StringPiece result;
@ -864,7 +877,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) {
std::unique_ptr<RandomAccessFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.NewRandomAccessFile("gs://bucket/", &file).code());
fs.NewRandomAccessFile("gs://bucket/", nullptr, &file).code());
}
TEST(GcsFileSystemTest, NewRandomAccessFile_InconsistentRead) {
@ -897,10 +910,11 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_InconsistentRead) {
// Stat the file first so that the file stats are cached.
FileStatistics stat;
TF_ASSERT_OK(fs.Stat("gs://bucket/random_access.txt", &stat));
TF_ASSERT_OK(fs.Stat("gs://bucket/random_access.txt", nullptr, &stat));
std::unique_ptr<RandomAccessFile> file;
TF_ASSERT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_ASSERT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
char scratch[6];
StringPiece result;
@ -964,14 +978,16 @@ TEST(GcsFileSystemTest, NewWritableFile) {
// Read from the file first, to fill the block cache.
std::unique_ptr<RandomAccessFile> rfile;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/writeable", nullptr, &rfile));
char scratch[100];
StringPiece result;
TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch));
EXPECT_EQ("0123", result);
// Open the writable file.
std::unique_ptr<WritableFile> wfile;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile));
TF_EXPECT_OK(
fs.NewWritableFile("gs://bucket/path/writeable", nullptr, &wfile));
TF_EXPECT_OK(wfile->Append("content1,"));
int64 pos;
TF_EXPECT_OK(wfile->Tell(&pos));
@ -1055,7 +1071,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) {
nullptr /* gcs additional header */, false /* compose append */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
TF_EXPECT_OK(
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
TF_EXPECT_OK(file->Append("content1,"));
TF_EXPECT_OK(file->Append("content2"));
@ -1127,7 +1144,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) {
// Pull the file's first block into the cache. This will trigger the first
// HTTP request to GCS.
std::unique_ptr<RandomAccessFile> rfile;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/writeable", nullptr, &rfile));
char scratch[100];
StringPiece result;
TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch));
@ -1135,7 +1153,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) {
// Now write to the same file. Once the write succeeds, the cached block will
// be flushed.
std::unique_ptr<WritableFile> wfile;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile));
TF_EXPECT_OK(
fs.NewWritableFile("gs://bucket/path/writeable", nullptr, &wfile));
TF_EXPECT_OK(wfile->Append("content1,"));
TF_EXPECT_OK(wfile->Append("content2"));
// Appending doesn't invalidate the read cache - only flushing does. This read
@ -1213,7 +1232,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) {
false /* compose append */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
TF_EXPECT_OK(
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
TF_EXPECT_OK(file->Append("content1,"));
TF_EXPECT_OK(file->Append("content2"));
@ -1277,7 +1297,8 @@ TEST(GcsFileSystemTest, NewWritableFile_UploadReturns410) {
{
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
TF_EXPECT_OK(
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
TF_EXPECT_OK(file->Append("content1,"));
TF_EXPECT_OK(file->Append("content2"));
@ -1317,7 +1338,7 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) {
std::unique_ptr<WritableFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.NewWritableFile("gs://bucket/", &file).code());
fs.NewWritableFile("gs://bucket/", nullptr, &file).code());
}
TEST(GcsFileSystemTest, NewAppendableFile) {
@ -1382,12 +1403,14 @@ TEST(GcsFileSystemTest, NewAppendableFile) {
// Create an appendable file. This should read the file from GCS, and pull its
// contents into the block cache.
std::unique_ptr<WritableFile> wfile;
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable", &wfile));
TF_EXPECT_OK(
fs.NewAppendableFile("gs://bucket/path/appendable", nullptr, &wfile));
TF_EXPECT_OK(wfile->Append("content2"));
// Verify that the file contents are in the block cache. This read should not
// trigger an HTTP request to GCS.
std::unique_ptr<RandomAccessFile> rfile;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/appendable", &rfile));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/appendable", nullptr, &rfile));
char scratch[100];
StringPiece result;
TF_EXPECT_OK(rfile->Read(0, 8, &result, scratch));
@ -1416,7 +1439,7 @@ TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) {
std::unique_ptr<WritableFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.NewAppendableFile("gs://bucket/", &file).code());
fs.NewAppendableFile("gs://bucket/", nullptr, &file).code());
}
TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) {
@ -1450,7 +1473,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) {
std::unique_ptr<ReadOnlyMemoryRegion> region;
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile(
"gs://bucket/path/random_access.txt", &region));
"gs://bucket/path/random_access.txt", nullptr, &region));
EXPECT_EQ(content, StringPiece(reinterpret_cast<const char*>(region->data()),
region->length()));
@ -1471,7 +1494,8 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile_NoObjectName) {
std::unique_ptr<ReadOnlyMemoryRegion> region;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.NewReadOnlyMemoryRegionFromFile("gs://bucket/", &region).code());
fs.NewReadOnlyMemoryRegionFromFile("gs://bucket/", nullptr, &region)
.code());
}
TEST(GcsFileSystemTest, FileExists_YesAsObject) {
@ -1493,7 +1517,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsObject) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt"));
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt", nullptr));
}
TEST(GcsFileSystemTest, FileExists_YesAsFolder) {
@ -1523,7 +1547,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsFolder) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder"));
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder", nullptr));
}
TEST(GcsFileSystemTest, FileExists_YesAsBucket) {
@ -1549,8 +1573,8 @@ TEST(GcsFileSystemTest, FileExists_YesAsBucket) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.FileExists("gs://bucket1"));
TF_EXPECT_OK(fs.FileExists("gs://bucket1/"));
TF_EXPECT_OK(fs.FileExists("gs://bucket1", nullptr));
TF_EXPECT_OK(fs.FileExists("gs://bucket1/", nullptr));
}
TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) {
@ -1580,7 +1604,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) {
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(errors::Code::NOT_FOUND,
fs.FileExists("gs://bucket/path/file1.txt").code());
fs.FileExists("gs://bucket/path/file1.txt", nullptr).code());
}
TEST(GcsFileSystemTest, FileExists_NotAsBucket) {
@ -1606,9 +1630,9 @@ TEST(GcsFileSystemTest, FileExists_NotAsBucket) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.FileExists("gs://bucket2/").code());
fs.FileExists("gs://bucket2/", nullptr).code());
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.FileExists("gs://bucket2").code());
fs.FileExists("gs://bucket2", nullptr).code());
}
TEST(GcsFileSystemTest, FileExists_StatCache) {
@ -1648,8 +1672,8 @@ TEST(GcsFileSystemTest, FileExists_StatCache) {
// The stat cache will ensure that repeated lookups don't trigger additional
// HTTP requests.
for (int i = 0; i < 10; i++) {
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt"));
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder/"));
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt", nullptr));
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder/", nullptr));
}
}
@ -1672,8 +1696,8 @@ TEST(GcsFileSystemTest, FileExists_DirectoryMark) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.FileExists("gs://bucket/dir/"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/dir/"));
TF_EXPECT_OK(fs.FileExists("gs://bucket/dir/", nullptr));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/dir/", nullptr));
}
TEST(GcsFileSystemTest, GetChildren_NoItems) {
@ -1696,7 +1720,7 @@ TEST(GcsFileSystemTest, GetChildren_NoItems) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
EXPECT_EQ(std::vector<string>({"subpath/"}), children);
}
@ -1724,7 +1748,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/"}),
children);
@ -1753,7 +1777,7 @@ TEST(GcsFileSystemTest, GetChildren_SelfDirectoryMarker) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
EXPECT_EQ(std::vector<string>({"file3.txt", "subpath/"}), children);
}
@ -1781,7 +1805,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", nullptr, &children));
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/"}),
children);
@ -1806,7 +1830,7 @@ TEST(GcsFileSystemTest, GetChildren_Root) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", nullptr, &children));
EXPECT_EQ(0, children.size());
}
@ -1831,7 +1855,7 @@ TEST(GcsFileSystemTest, GetChildren_Empty) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
EXPECT_EQ(0, children.size());
}
@ -1872,7 +1896,7 @@ TEST(GcsFileSystemTest, GetChildren_Pagination) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", nullptr, &children));
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/",
"file4.txt", "file5.txt"}),
@ -1899,8 +1923,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_NoWildcard) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
result);
}
@ -1927,7 +1951,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_BucketAndWildcard) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file1.txt",
"gs://bucket/path/file3.txt",
"gs://bucket/path/subpath"}),
@ -1956,7 +1980,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_Matches) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", &result));
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
result);
}
@ -1982,7 +2007,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SelfDirectoryMarker) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file3.txt"}), result);
}
@ -2007,7 +2032,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SlashInObjectName) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", nullptr, &result));
EXPECT_EQ(std::vector<string>(), result);
}
@ -2032,7 +2057,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SlashInObjectNameEscaped) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/\\/*", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/\\/*", nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path//foo.txt"}), result);
}
@ -2058,7 +2083,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_NoMatches) {
nullptr /* gcs additional header */, false /* compose append */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", &result));
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", nullptr, &result));
EXPECT_EQ(std::vector<string>(), result);
}
@ -2077,7 +2103,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) {
std::vector<string> result;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.GetMatchingPaths("gs://*", &result).code());
fs.GetMatchingPaths("gs://*", nullptr, &result).code());
}
TEST(GcsFileSystemTest, GetMatchingPaths_Cache) {
@ -2113,11 +2139,11 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache) {
// any additional HTTP requests to GCS.
for (int i = 0; i < 10; i++) {
std::vector<string> result;
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
result);
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file1.txt",
"gs://bucket/path/file3.txt",
"gs://bucket/path/subpath"}),
@ -2155,17 +2181,17 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache_Flush) {
// This loop should trigger the first HTTP request to GCS.
for (int i = 0; i < 10; i++) {
std::vector<string> result;
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
result);
}
// After flushing caches, there should be another (identical) request to GCS.
fs.FlushCaches();
fs.FlushCaches(nullptr);
for (int i = 0; i < 10; i++) {
std::vector<string> result;
TF_EXPECT_OK(
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
nullptr, &result));
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
result);
}
@ -2220,11 +2246,12 @@ TEST(GcsFileSystemTest, DeleteFile) {
char scratch[100];
StringPiece result;
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/file1.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/file1.txt", nullptr, &file));
TF_EXPECT_OK(file->Read(0, 8, &result, scratch));
EXPECT_EQ("01234567", result);
// Deleting the file triggers the next HTTP request to GCS.
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt"));
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt", nullptr));
// Re-reading the file causes its contents to be reloaded from GCS and not
// from the block cache.
TF_EXPECT_OK(file->Read(0, 8, &result, scratch));
@ -2245,7 +2272,7 @@ TEST(GcsFileSystemTest, DeleteFile_NoObjectName) {
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.DeleteFile("gs://bucket/").code());
fs.DeleteFile("gs://bucket/", nullptr).code());
}
TEST(GcsFileSystemTest, DeleteFile_StatCacheRemoved) {
@ -2289,14 +2316,15 @@ TEST(GcsFileSystemTest, DeleteFile_StatCacheRemoved) {
// Stats the file first so the stat is cached.
FileStatistics stat_before_deletion;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat_before_deletion));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat_before_deletion));
EXPECT_EQ(1010, stat_before_deletion.length);
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/file.txt"));
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/file.txt", nullptr));
FileStatistics stat_after_deletion;
EXPECT_EQ(error::Code::NOT_FOUND,
fs.Stat("gs://bucket/file.txt", &stat_after_deletion).code());
EXPECT_EQ(
error::Code::NOT_FOUND,
fs.Stat("gs://bucket/file.txt", nullptr, &stat_after_deletion).code());
}
TEST(GcsFileSystemTest, DeleteDir_Empty) {
@ -2317,7 +2345,7 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/", nullptr));
}
TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) {
@ -2346,7 +2374,7 @@ TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/", nullptr));
}
TEST(GcsFileSystemTest, DeleteDir_BucketOnly) {
@ -2366,7 +2394,7 @@ TEST(GcsFileSystemTest, DeleteDir_BucketOnly) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket"));
TF_EXPECT_OK(fs.DeleteDir("gs://bucket", nullptr));
}
TEST(GcsFileSystemTest, DeleteDir_NonEmpty) {
@ -2389,7 +2417,7 @@ TEST(GcsFileSystemTest, DeleteDir_NonEmpty) {
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
fs.DeleteDir("gs://bucket/path/").code());
fs.DeleteDir("gs://bucket/path/", nullptr).code());
}
TEST(GcsFileSystemTest, GetFileSize) {
@ -2412,7 +2440,7 @@ TEST(GcsFileSystemTest, GetFileSize) {
nullptr /* gcs additional header */, false /* compose append */);
uint64 size;
TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", &size));
TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", nullptr, &size));
EXPECT_EQ(1010, size);
}
@ -2431,7 +2459,7 @@ TEST(GcsFileSystemTest, GetFileSize_NoObjectName) {
uint64 size;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.GetFileSize("gs://bucket/", &size).code());
fs.GetFileSize("gs://bucket/", nullptr, &size).code());
}
TEST(GcsFileSystemTest, RenameFile_Folder) {
@ -2515,7 +2543,8 @@ TEST(GcsFileSystemTest, RenameFile_Folder) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/"));
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/", nullptr));
}
TEST(GcsFileSystemTest, RenameFile_Object) {
@ -2612,15 +2641,17 @@ TEST(GcsFileSystemTest, RenameFile_Object) {
StringPiece result;
std::unique_ptr<RandomAccessFile> src;
std::unique_ptr<RandomAccessFile> dst;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/src.txt", &src));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/src.txt", nullptr, &src));
TF_EXPECT_OK(src->Read(0, 8, &result, scratch));
EXPECT_EQ("01234567", result);
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/dst.txt", &dst));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/path/dst.txt", nullptr, &dst));
TF_EXPECT_OK(dst->Read(0, 8, &result, scratch));
EXPECT_EQ("76543210", result);
// Now rename src to dst. This should flush the block cache for both files.
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
"gs://bucket/path/dst.txt", nullptr));
// Re-read both files. This should reload their contents from GCS.
TF_EXPECT_OK(src->Read(0, 8, &result, scratch));
EXPECT_EQ("89abcdef", result);
@ -2690,14 +2721,16 @@ TEST(GcsFileSystemTest, RenameFile_Object_FlushTargetStatCache) {
// Do an initial stat of the destination file to load their contents into the
// stat cache.
FileStatistics stat_before_renaming;
TF_EXPECT_OK(fs.Stat("gs://bucket/path/dst.txt", &stat_before_renaming));
TF_EXPECT_OK(
fs.Stat("gs://bucket/path/dst.txt", nullptr, &stat_before_renaming));
EXPECT_EQ(1000, stat_before_renaming.length);
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
"gs://bucket/path/dst.txt", nullptr));
FileStatistics stat_after_renaming;
TF_EXPECT_OK(fs.Stat("gs://bucket/path/dst.txt", &stat_after_renaming));
TF_EXPECT_OK(
fs.Stat("gs://bucket/path/dst.txt", nullptr, &stat_after_renaming));
EXPECT_EQ(1010, stat_after_renaming.length);
}
@ -2755,8 +2788,8 @@ TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
"gs://bucket/path/dst.txt", nullptr));
}
/// Tests the case when rewrite couldn't complete in one RPC.
@ -2797,10 +2830,10 @@ TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(
errors::Code::UNIMPLEMENTED,
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")
.code());
EXPECT_EQ(errors::Code::UNIMPLEMENTED,
fs.RenameFile("gs://bucket/path/src.txt",
"gs://bucket/path/dst.txt", nullptr)
.code());
}
TEST(GcsFileSystemTest, Stat_Object) {
@ -2823,7 +2856,7 @@ TEST(GcsFileSystemTest, Stat_Object) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
EXPECT_EQ(1010, stat.length);
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
EXPECT_FALSE(stat.is_directory);
@ -2857,7 +2890,7 @@ TEST(GcsFileSystemTest, Stat_Folder) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", nullptr, &stat));
EXPECT_EQ(0, stat.length);
EXPECT_EQ(0, stat.mtime_nsec);
EXPECT_TRUE(stat.is_directory);
@ -2890,7 +2923,8 @@ TEST(GcsFileSystemTest, Stat_ObjectOrFolderNotFound) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/path", &stat).code());
EXPECT_EQ(error::Code::NOT_FOUND,
fs.Stat("gs://bucket/path", nullptr, &stat).code());
}
TEST(GcsFileSystemTest, Stat_Bucket) {
@ -2911,7 +2945,7 @@ TEST(GcsFileSystemTest, Stat_Bucket) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/", nullptr, &stat));
EXPECT_EQ(0, stat.length);
EXPECT_EQ(0, stat.mtime_nsec);
EXPECT_TRUE(stat.is_directory);
@ -2935,7 +2969,8 @@ TEST(GcsFileSystemTest, Stat_BucketNotFound) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/", &stat).code());
EXPECT_EQ(error::Code::NOT_FOUND,
fs.Stat("gs://bucket/", nullptr, &stat).code());
}
TEST(GcsFileSystemTest, Stat_Cache) {
@ -2976,11 +3011,11 @@ TEST(GcsFileSystemTest, Stat_Cache) {
// HTTP requests to GCS.
for (int i = 0; i < 10; i++) {
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
EXPECT_EQ(1010, stat.length);
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
EXPECT_FALSE(stat.is_directory);
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder/", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder/", nullptr, &stat));
EXPECT_EQ(0, stat.length);
EXPECT_EQ(0, stat.mtime_nsec);
EXPECT_TRUE(stat.is_directory);
@ -3016,16 +3051,16 @@ TEST(GcsFileSystemTest, Stat_Cache_Flush) {
// There should be a single HTTP request to GCS for fs.Stat in this loop.
for (int i = 0; i < 10; i++) {
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
EXPECT_EQ(1010, stat.length);
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
EXPECT_FALSE(stat.is_directory);
}
// After flushing caches, there should be a second request to GCS for fs.Stat.
fs.FlushCaches();
fs.FlushCaches(nullptr);
for (int i = 0; i < 10; i++) {
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
EXPECT_EQ(1010, stat.length);
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
EXPECT_FALSE(stat.is_directory);
@ -3052,7 +3087,7 @@ TEST(GcsFileSystemTest, Stat_FilenameEndingWithSlash) {
nullptr /* gcs additional header */, false /* compose append */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/dir/", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/dir/", nullptr, &stat));
EXPECT_EQ(5, stat.length);
EXPECT_TRUE(stat.is_directory);
}
@ -3084,7 +3119,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotFound) {
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(error::Code::NOT_FOUND,
fs.IsDirectory("gs://bucket/file.txt").code());
fs.IsDirectory("gs://bucket/file.txt", nullptr).code());
}
TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) {
@ -3115,7 +3150,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) {
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
fs.IsDirectory("gs://bucket/file.txt").code());
fs.IsDirectory("gs://bucket/file.txt", nullptr).code());
}
TEST(GcsFileSystemTest, IsDirectory_Yes) {
@ -3145,8 +3180,8 @@ TEST(GcsFileSystemTest, IsDirectory_Yes) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder", nullptr));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/", nullptr));
}
TEST(GcsFileSystemTest, IsDirectory_Bucket) {
@ -3172,8 +3207,8 @@ TEST(GcsFileSystemTest, IsDirectory_Bucket) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.IsDirectory("gs://bucket"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket", nullptr));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/", nullptr));
}
TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) {
@ -3193,7 +3228,8 @@ TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/").code());
EXPECT_EQ(error::Code::NOT_FOUND,
fs.IsDirectory("gs://bucket/", nullptr).code());
}
TEST(GcsFileSystemTest, CreateDir_Folder) {
@ -3250,15 +3286,15 @@ TEST(GcsFileSystemTest, CreateDir_Folder) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath"));
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath", nullptr));
// Check that when GCS returns the object already exists return that the
// directory already exists.
EXPECT_EQ(errors::AlreadyExists("gs://bucket/subpath"),
fs.CreateDir("gs://bucket/subpath"));
fs.CreateDir("gs://bucket/subpath", nullptr));
// Check that when GCS returns the object already has a version (failed
// precondition) return directory already exists.
EXPECT_EQ(errors::AlreadyExists("gs://bucket/subpath"),
fs.CreateDir("gs://bucket/subpath"));
fs.CreateDir("gs://bucket/subpath", nullptr));
}
TEST(GcsFileSystemTest, CreateDir_Bucket) {
@ -3284,8 +3320,8 @@ TEST(GcsFileSystemTest, CreateDir_Bucket) {
kTestTimeoutConfig, *kAllowedLocationsDefault,
nullptr /* gcs additional header */, false /* compose append */);
TF_EXPECT_OK(fs.CreateDir("gs://bucket/"));
TF_EXPECT_OK(fs.CreateDir("gs://bucket"));
TF_EXPECT_OK(fs.CreateDir("gs://bucket/", nullptr));
TF_EXPECT_OK(fs.CreateDir("gs://bucket", nullptr));
}
TEST(GcsFileSystemTest, DeleteRecursively_Ok) {
@ -3357,8 +3393,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_Ok) {
nullptr /* gcs additional header */, false /* compose append */);
int64 undeleted_files, undeleted_dirs;
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
&undeleted_dirs));
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", nullptr,
&undeleted_files, &undeleted_dirs));
EXPECT_EQ(0, undeleted_files);
EXPECT_EQ(0, undeleted_dirs);
}
@ -3450,8 +3486,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_DeletionErrors) {
nullptr /* gcs additional header */, false /* compose append */);
int64 undeleted_files, undeleted_dirs;
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
&undeleted_dirs));
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", nullptr,
&undeleted_files, &undeleted_dirs));
EXPECT_EQ(1, undeleted_files);
EXPECT_EQ(1, undeleted_dirs);
}
@ -3486,7 +3522,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_NotAFolder) {
int64 undeleted_files, undeleted_dirs;
EXPECT_EQ(error::Code::NOT_FOUND,
fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
fs.DeleteRecursively("gs://bucket/path", nullptr, &undeleted_files,
&undeleted_dirs)
.code());
EXPECT_EQ(0, undeleted_files);
@ -3501,7 +3537,7 @@ TEST(GcsFileSystemTest, NoConstraintsEnvironmentVariableTest) {
// Cover cache initialization code, any uninitialized cache will cause this to
// fail
fs1.FlushCaches();
fs1.FlushCaches(nullptr);
}
TEST(GcsFileSystemTest, BucketLocationConstraintEnvironmentVariableTest) {
@ -3715,7 +3751,7 @@ TEST(GcsFileSystemTest, Stat_StatsRecording) {
EXPECT_EQ(stats.fs_, &fs);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
EXPECT_EQ(1, stats.stat_object_request_count_);
}
@ -3742,7 +3778,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_StatsRecording) {
EXPECT_EQ(stats.fs_, &fs);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
char scratch[6];
StringPiece result;
@ -3883,8 +3920,8 @@ TEST(GcsFileSystemTest, NewAppendableFile_MultipleFlushesWithCompose) {
// Create an appendable file. This should read the file from GCS, and pull its
// contents into the block cache.
std::unique_ptr<WritableFile> wfile;
TF_EXPECT_OK(
fs.NewAppendableFile("gs://bucket/some/path/appendable", &wfile));
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/some/path/appendable", nullptr,
&wfile));
TF_EXPECT_OK(wfile->Append(contents[1]));
TF_EXPECT_OK(wfile->Flush());
TF_EXPECT_OK(wfile->Append(contents[2]));
@ -3981,7 +4018,8 @@ TEST(GcsFileSystemTest, NewAppendableFile_MultipleFlushesWithoutCompose) {
// Create an appendable file. This should read the file from GCS, and pull its
// contents into the block cache.
std::unique_ptr<WritableFile> wfile;
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable", &wfile));
TF_EXPECT_OK(
fs.NewAppendableFile("gs://bucket/path/appendable", nullptr, &wfile));
TF_EXPECT_OK(wfile->Append(contents[1]));
TF_EXPECT_OK(wfile->Flush());
TF_EXPECT_OK(wfile->Append(contents[2]));

View File

@ -295,7 +295,9 @@ TEST_F(DefaultEnvTest, SleepForMicroseconds) {
class TmpDirFileSystem : public NullFileSystem {
public:
Status FileExists(const string& dir) override {
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status FileExists(const string& dir, TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
if (path.empty()) return errors::NotFound(dir, " not found");
@ -311,7 +313,7 @@ class TmpDirFileSystem : public NullFileSystem {
return Env::Default()->FileExists(io::JoinPath(BaseDir(), path));
}
Status CreateDir(const string& dir) override {
Status CreateDir(const string& dir, TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
if (scheme != "tmpdirfs") {
@ -328,7 +330,7 @@ class TmpDirFileSystem : public NullFileSystem {
return status;
}
Status IsDirectory(const string& dir) override {
Status IsDirectory(const string& dir, TransactionToken* token) override {
StringPiece scheme, host, path;
io::ParseURI(dir, &scheme, &host, &path);
for (const auto& existing_dir : created_directories_)
@ -336,7 +338,7 @@ class TmpDirFileSystem : public NullFileSystem {
return errors::NotFound(dir, " not found");
}
void FlushCaches() override { flushed_ = true; }
void FlushCaches(TransactionToken* token) override { flushed_ = true; }
private:
bool flushed_ = false;

View File

@ -481,7 +481,7 @@ class FileSystem {
/// \brief Starts a new transaction
virtual tensorflow::Status StartTransaction(TransactionToken** token) {
token = nullptr;
*token = nullptr;
return Status::OK();
}
@ -499,15 +499,15 @@ class FileSystem {
/// \brief Get token for `path` or start a new transaction and add `path` to
/// it.
virtual tensorflow::Status GetTokenOrStartTransaction(
const std::string& path, TransactionToken** token) {
token = nullptr;
const string& path, TransactionToken** token) {
*token = nullptr;
return Status::OK();
}
/// \brief Return transaction for `path` or nullptr in `token`
virtual tensorflow::Status GetTransactionForPath(const std::string& path,
TransactionToken** token) {
token = nullptr;
*token = nullptr;
return Status::OK();
}
@ -518,6 +518,30 @@ class FileSystem {
virtual ~FileSystem() = default;
};
/// This macro adds forwarding methods from FileSystem class to
/// used class since name hiding will prevent these to be accessed from
/// derived classes and would require all use locations to migrate to
/// Transactional API. This is an interim solution until ModularFileSystem class
/// becomes a singleton.
// TODO(sami): Remove this macro when filesystem plugins migration is complete.
#define TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT \
using FileSystem::NewRandomAccessFile; \
using FileSystem::NewWritableFile; \
using FileSystem::NewAppendableFile; \
using FileSystem::NewReadOnlyMemoryRegionFromFile; \
using FileSystem::FileExists; \
using FileSystem::GetChildren; \
using FileSystem::GetMatchingPaths; \
using FileSystem::Stat; \
using FileSystem::DeleteFile; \
using FileSystem::RecursivelyCreateDir; \
using FileSystem::DeleteDir; \
using FileSystem::DeleteRecursively; \
using FileSystem::GetFileSize; \
using FileSystem::RenameFile; \
using FileSystem::CopyFile; \
using FileSystem::IsDirectory; \
using FileSystem::FlushCaches
/// A Wrapper class for Transactional FileSystem support.
/// This provides means to make use of the transactions with minimal code change
@ -529,6 +553,8 @@ class FileSystem {
/// transactional filesystem access with minimal code change.
class WrappedFileSystem : public FileSystem {
public:
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
tensorflow::Status NewRandomAccessFile(
const std::string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
@ -691,31 +717,6 @@ class WrappedFileSystem : public FileSystem {
TransactionToken* token_;
};
/// This macro adds forwarding methods from FileSystem class to
/// used class since name hiding will prevent these to be accessed from
/// derived classes and would require all use locations to migrate to
/// Transactional API. This is an interim solution until ModularFileSystem class
/// becomes a singleton.
// TODO(sami): Remove this macro when filesystem plugins migration is complete.
#define TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT \
using FileSystem::NewRandomAccessFile; \
using FileSystem::NewWritableFile; \
using FileSystem::NewAppendableFile; \
using FileSystem::NewReadOnlyMemoryRegionFromFile; \
using FileSystem::FileExists; \
using FileSystem::GetChildren; \
using FileSystem::GetMatchingPaths; \
using FileSystem::Stat; \
using FileSystem::DeleteFile; \
using FileSystem::RecursivelyCreateDir; \
using FileSystem::DeleteDir; \
using FileSystem::DeleteRecursively; \
using FileSystem::GetFileSize; \
using FileSystem::RenameFile; \
using FileSystem::CopyFile; \
using FileSystem::IsDirectory; \
using FileSystem::FlushCaches
/// A file abstraction for randomly reading the contents of a file.
class RandomAccessFile {
public:

View File

@ -32,7 +32,9 @@ static const char* const kPrefix = "ipfs://solarsystem";
// cannot have children further.
class InterPlanetaryFileSystem : public NullFileSystem {
public:
Status FileExists(const string& fname) override {
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status FileExists(const string& fname, TransactionToken* token) override {
string parsed_path;
ParsePath(fname, &parsed_path);
if (BodyExists(parsed_path)) {
@ -42,7 +44,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
}
// Adds the dir to the parent's children list and creates an entry for itself.
Status CreateDir(const string& dirname) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
string parsed_path;
ParsePath(dirname, &parsed_path);
// If the directory already exists, throw an error.
@ -88,7 +90,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
return Status(tensorflow::error::FAILED_PRECONDITION, "Failed to create");
}
Status IsDirectory(const string& dirname) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
string parsed_path;
ParsePath(dirname, &parsed_path);
// Simulate evil_directory has bad permissions by throwing a LOG(FATAL)
@ -105,8 +107,9 @@ class InterPlanetaryFileSystem : public NullFileSystem {
return Status(tensorflow::error::FAILED_PRECONDITION, "Not a dir");
}
Status GetChildren(const string& dir, std::vector<string>* result) override {
TF_RETURN_IF_ERROR(IsDirectory(dir));
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
TF_RETURN_IF_ERROR(IsDirectory(dir, nullptr));
string parsed_path;
ParsePath(dir, &parsed_path);
result->insert(result->begin(), celestial_bodies_[parsed_path].begin(),
@ -151,8 +154,8 @@ class InterPlanetaryFileSystem : public NullFileSystem {
// common prefix of BaseDir().
string Match(InterPlanetaryFileSystem* ipfs, const string& suffix_pattern) {
std::vector<string> results;
Status s =
ipfs->GetMatchingPaths(ipfs->JoinPath(kPrefix, suffix_pattern), &results);
Status s = ipfs->GetMatchingPaths(ipfs->JoinPath(kPrefix, suffix_pattern),
nullptr, &results);
if (!s.ok()) {
return s.ToString();
} else {
@ -179,18 +182,18 @@ TEST(InterPlanetaryFileSystemTest, IPFSMatch) {
// Returns Jupiter's and Earth's moons.
EXPECT_EQ(Match(&ipfs, "*/*"),
"Earth/Moon,Jupiter/Europa,Jupiter/Ganymede,Jupiter/Io");
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet0")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet1")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet0"), nullptr));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet1"), nullptr));
EXPECT_EQ(Match(&ipfs, "Planet[0-1]"), "Planet0,Planet1");
EXPECT_EQ(Match(&ipfs, "Planet?"), "Planet0,Planet1");
}
TEST(InterPlanetaryFileSystemTest, MatchSimple) {
InterPlanetaryFileSystem ipfs;
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-00")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-0a")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-01")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-aaa")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-00"), nullptr));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-0a"), nullptr));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-01"), nullptr));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-aaa"), nullptr));
EXPECT_EQ(Match(&ipfs, "match-*"), "match-00,match-01,match-0a,match-aaa");
EXPECT_EQ(Match(&ipfs, "match-0[0-9]"), "match-00,match-01");
@ -203,22 +206,23 @@ TEST(InterPlanetaryFileSystemTest, MatchSimple) {
// that evil_directory isn't accessed.
TEST(InterPlanetaryFileSystemTest, MatchOnlyNeeded) {
InterPlanetaryFileSystem ipfs;
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "abcd")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "evil_directory")));
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "abcd"), nullptr));
TF_EXPECT_OK(
ipfs.CreateDir(ipfs.JoinPath(kPrefix, "evil_directory"), nullptr));
EXPECT_EQ(Match(&ipfs, "abcd"), "abcd");
}
TEST(InterPlanetaryFileSystemTest, MatchDirectory) {
InterPlanetaryFileSystem ipfs;
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/x")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-0a/abc/x")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/x")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-aaa/abc/x")));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-00/abc/x"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-0a/abc/x"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-01/abc/x"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-aaa/abc/x"), nullptr));
EXPECT_EQ(Match(&ipfs, "match-*/abc/x"),
"match-00/abc/x,match-01/abc/x,match-0a/abc/x,match-aaa/abc/x");
@ -232,20 +236,20 @@ TEST(InterPlanetaryFileSystemTest, MatchDirectory) {
TEST(InterPlanetaryFileSystemTest, MatchMultipleWildcards) {
InterPlanetaryFileSystem ipfs;
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/00")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/01")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/09")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/00")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/04")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/10")));
TF_EXPECT_OK(
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-02/abc/00")));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-00/abc/00"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-00/abc/01"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-00/abc/09"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-01/abc/00"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-01/abc/04"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-01/abc/10"), nullptr));
TF_EXPECT_OK(ipfs.RecursivelyCreateDir(
ipfs.JoinPath(kPrefix, "match-02/abc/00"), nullptr));
EXPECT_EQ(Match(&ipfs, "match-0[0-1]/abc/0[0-8]"),
"match-00/abc/00,match-00/abc/01,match-01/abc/00,match-01/abc/04");
@ -273,7 +277,7 @@ TEST(InterPlanetaryFileSystemTest, HasAtomicMove) {
class TestFileSystem : public NullFileSystem {
public:
// Only allow for a single root directory.
Status IsDirectory(const string& dirname) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
if (dirname == "." || dirname.empty()) {
return Status::OK();
}
@ -281,7 +285,8 @@ class TestFileSystem : public NullFileSystem {
}
// Simulating a FS with a root dir and a single file underneath it.
Status GetChildren(const string& dir, std::vector<string>* result) override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
if (dir == "." || dir.empty()) {
result->push_back("test");
}
@ -293,10 +298,10 @@ class TestFileSystem : public NullFileSystem {
TEST(TestFileSystemTest, RootDirectory) {
TestFileSystem fs;
std::vector<string> results;
auto ret = fs.GetMatchingPaths("./te*", &results);
auto ret = fs.GetMatchingPaths("./te*", nullptr, &results);
EXPECT_EQ(1, results.size());
EXPECT_EQ("./test", results[0]);
ret = fs.GetMatchingPaths("te*", &results);
ret = fs.GetMatchingPaths("te*", nullptr, &results);
EXPECT_EQ(1, results.size());
EXPECT_EQ("./test", results[0]);
}

View File

@ -36,84 +36,69 @@ class NullFileSystem : public FileSystem {
~NullFileSystem() override = default;
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
return errors::Unimplemented("NewRandomAccessFile unimplemented");
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
return errors::Unimplemented("NewWritableFile unimplemented");
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
return errors::Unimplemented("NewAppendableFile unimplemented");
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return errors::Unimplemented(
"NewReadOnlyMemoryRegionFromFile unimplemented");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return errors::Unimplemented("FileExists unimplemented");
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return errors::Unimplemented("GetChildren unimplemented");
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override {
return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
return errors::Unimplemented("DeleteFile unimplemented");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return errors::Unimplemented("CreateDir unimplemented");
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return errors::Unimplemented("DeleteDir unimplemented");
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return errors::Unimplemented("GetFileSize unimplemented");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return errors::Unimplemented("RenameFile unimplemented");
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return errors::Unimplemented("Stat unimplemented");
}
};

View File

@ -103,10 +103,11 @@ class RamRandomAccessFile : public RandomAccessFile, public WritableFile {
class RamFileSystem : public FileSystem {
public:
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
return errors::NotFound("");
@ -116,10 +117,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
fs_[fname] = std::make_shared<std::string>();
@ -128,10 +127,8 @@ class RamFileSystem : public FileSystem {
new RamRandomAccessFile(fname, fs_[fname]));
return Status::OK();
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
mutex_lock m(mu_);
if (fs_.find(fname) == fs_.end()) {
fs_[fname] = std::make_shared<std::string>();
@ -142,22 +139,18 @@ class RamFileSystem : public FileSystem {
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return errors::Unimplemented("");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
FileStatistics stat;
return Stat(fname, &stat);
return Stat(fname, token, &stat);
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
mutex_lock m(mu_);
auto it = fs_.lower_bound(dir);
while (it != fs_.end() && absl::StartsWith(it->first, dir)) {
@ -168,10 +161,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override {
mutex_lock m(mu_);
Env* env = Env::Default();
for (auto it = fs_.begin(); it != fs_.end(); ++it) {
@ -182,9 +173,8 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
mutex_lock m(mu_);
auto it = fs_.lower_bound(fname);
if (it == fs_.end()) {
@ -204,8 +194,7 @@ class RamFileSystem : public FileSystem {
return Status::OK();
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
mutex_lock m(mu_);
if (fs_.find(fname) != fs_.end()) {
fs_.erase(fname);
@ -215,24 +204,21 @@ class RamFileSystem : public FileSystem {
return errors::NotFound("");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return Status::OK();
}
Status RecursivelyCreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status RecursivelyCreateDir(const string& dirname,
TransactionToken* token) override {
return Status::OK();
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return Status::OK();
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
mutex_lock m(mu_);
if (fs_.find(fname) != fs_.end()) {
*file_size = fs_[fname]->size();
@ -241,9 +227,8 @@ class RamFileSystem : public FileSystem {
return errors::NotFound("");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
mutex_lock m(mu_);
if (fs_.find(src) != fs_.end()) {
fs_[target] = fs_[src];

View File

@ -38,108 +38,104 @@ class RetryingFileSystem : public FileSystem {
: base_file_system_(std::move(base_file_system)),
retry_config_(retry_config) {}
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& filename,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
const string& filename, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(
const string& filename,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewWritableFile(const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(
const string& filename,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewAppendableFile(const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& filename,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
const string& filename, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &fname]() { return base_file_system_->FileExists(fname); },
retry_config_);
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
return RetryingUtils::CallWithRetries(
[this, &dir, result]() {
return base_file_system_->GetChildren(dir, result);
[this, &fname, token]() {
return base_file_system_->FileExists(fname, token);
},
retry_config_);
}
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return RetryingUtils::CallWithRetries(
[this, &pattern, result]() {
return base_file_system_->GetMatchingPaths(pattern, result);
[this, &dir, result, token]() {
return base_file_system_->GetChildren(dir, token, result);
},
retry_config_);
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* result) override {
return RetryingUtils::CallWithRetries(
[this, &fname, stat]() { return base_file_system_->Stat(fname, stat); },
[this, &pattern, result, token]() {
return base_file_system_->GetMatchingPaths(pattern, token, result);
},
retry_config_);
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return RetryingUtils::CallWithRetries(
[this, &fname, stat, token]() {
return base_file_system_->Stat(fname, token, stat);
},
retry_config_);
}
Status DeleteFile(const string& fname, TransactionToken* token) override {
return RetryingUtils::DeleteWithRetries(
[this, &fname]() { return base_file_system_->DeleteFile(fname); },
[this, &fname, token]() {
return base_file_system_->DeleteFile(fname, token);
},
retry_config_);
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &dirname]() { return base_file_system_->CreateDir(dirname); },
[this, &dirname, token]() {
return base_file_system_->CreateDir(dirname, token);
},
retry_config_);
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return RetryingUtils::DeleteWithRetries(
[this, &dirname]() { return base_file_system_->DeleteDir(dirname); },
retry_config_);
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
return RetryingUtils::CallWithRetries(
[this, &fname, file_size]() {
return base_file_system_->GetFileSize(fname, file_size);
[this, &dirname, token]() {
return base_file_system_->DeleteDir(dirname, token);
},
retry_config_);
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return RetryingUtils::CallWithRetries(
[this, &src, &target]() {
return base_file_system_->RenameFile(src, target);
[this, &fname, file_size, token]() {
return base_file_system_->GetFileSize(fname, token, file_size);
},
retry_config_);
}
Status IsDirectory(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
[this, &src, &target, token]() {
return base_file_system_->RenameFile(src, target, token);
},
retry_config_);
}
Status IsDirectory(const string& dirname, TransactionToken* token) override {
return RetryingUtils::CallWithRetries(
[this, &dirname, token]() {
return base_file_system_->IsDirectory(dirname, token);
},
retry_config_);
}
@ -148,19 +144,19 @@ class RetryingFileSystem : public FileSystem {
return base_file_system_->HasAtomicMove(path, has_atomic_move);
}
Status DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override {
Status DeleteRecursively(const string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override {
return RetryingUtils::DeleteWithRetries(
[this, &dirname, undeleted_files, undeleted_dirs]() {
return base_file_system_->DeleteRecursively(dirname, undeleted_files,
undeleted_dirs);
[this, &dirname, token, undeleted_files, undeleted_dirs]() {
return base_file_system_->DeleteRecursively(
dirname, token, undeleted_files, undeleted_dirs);
},
retry_config_);
}
void FlushCaches(/* TransactionToken* token=nullptr */) override {
base_file_system_->FlushCaches();
void FlushCaches(TransactionToken* token) override {
base_file_system_->FlushCaches(token);
}
Underlying* underlying() const { return base_file_system_.get(); }
@ -243,12 +239,13 @@ class RetryingWritableFile : public WritableFile {
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
const string& filename,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
std::unique_ptr<RandomAccessFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
return base_file_system_->NewRandomAccessFile(filename, &base_file);
[this, &filename, &base_file, token]() {
return base_file_system_->NewRandomAccessFile(filename, token,
&base_file);
},
retry_config_));
result->reset(new retrying_internals::RetryingRandomAccessFile(
@ -258,12 +255,12 @@ Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewWritableFile(
const string& filename,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
return base_file_system_->NewWritableFile(filename, &base_file);
[this, &filename, &base_file, token]() {
return base_file_system_->NewWritableFile(filename, token, &base_file);
},
retry_config_));
result->reset(new retrying_internals::RetryingWritableFile(
@ -273,12 +270,13 @@ Status RetryingFileSystem<Underlying>::NewWritableFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewAppendableFile(
const string& filename,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
[this, &filename, &base_file]() {
return base_file_system_->NewAppendableFile(filename, &base_file);
[this, &filename, &base_file, token]() {
return base_file_system_->NewAppendableFile(filename, token,
&base_file);
},
retry_config_));
result->reset(new retrying_internals::RetryingWritableFile(
@ -288,12 +286,12 @@ Status RetryingFileSystem<Underlying>::NewAppendableFile(
template <typename Underlying>
Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const string& filename, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
return RetryingUtils::CallWithRetries(
[this, &filename, result]() {
return base_file_system_->NewReadOnlyMemoryRegionFromFile(filename,
result);
[this, &filename, result, token]() {
return base_file_system_->NewReadOnlyMemoryRegionFromFile(
filename, token, result);
},
retry_config_);
}

View File

@ -99,101 +99,85 @@ class MockFileSystem : public FileSystem {
explicit MockFileSystem(const ExpectedCalls& calls, bool* flushed = nullptr)
: calls_(calls), flushed_(flushed) {}
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
*result = std::move(random_access_file_to_return);
return calls_.ConsumeNextCall("NewRandomAccessFile");
}
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
*result = std::move(writable_file_to_return);
return calls_.ConsumeNextCall("NewWritableFile");
}
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override {
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override {
*result = std::move(writable_file_to_return);
return calls_.ConsumeNextCall("NewAppendableFile");
}
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
return calls_.ConsumeNextCall("NewReadOnlyMemoryRegionFromFile");
}
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status FileExists(const string& fname, TransactionToken* token) override {
return calls_.ConsumeNextCall("FileExists");
}
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return calls_.ConsumeNextCall("GetChildren");
}
Status GetMatchingPaths(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override {
Status GetMatchingPaths(const string& dir, TransactionToken* token,
std::vector<string>* result) override {
return calls_.ConsumeNextCall("GetMatchingPaths");
}
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override {
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override {
return calls_.ConsumeNextCall("Stat");
}
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override {
Status DeleteFile(const string& fname, TransactionToken* token) override {
return calls_.ConsumeNextCall("DeleteFile");
}
Status CreateDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status CreateDir(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("CreateDir");
}
Status DeleteDir(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status DeleteDir(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("DeleteDir");
}
Status GetFileSize(
const string& fname,
uint64* file_size /*, TransactionToken* token = nullptr */) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
return calls_.ConsumeNextCall("GetFileSize");
}
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override {
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override {
return calls_.ConsumeNextCall("RenameFile");
}
Status IsDirectory(
const string& dirname /*, TransactionToken* token = nullptr */) override {
Status IsDirectory(const string& dirname, TransactionToken* token) override {
return calls_.ConsumeNextCall("IsDirectory");
}
Status DeleteRecursively(
const string& dirname, int64* undeleted_files,
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override {
Status DeleteRecursively(const string& dirname, TransactionToken* token,
int64* undeleted_files,
int64* undeleted_dirs) override {
return calls_.ConsumeNextCall("DeleteRecursively");
}
void FlushCaches(/* TransactionToken* token=nullptr */) override {
void FlushCaches(TransactionToken* token) override {
if (flushed_) {
*flushed_ = true;
}
@ -225,7 +209,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_ImmediateSuccess) {
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
// Use it and check the results.
StringPiece result;
@ -256,7 +241,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_SuccessWith3rdTry) {
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
// Use it and check the results.
StringPiece result;
@ -281,7 +267,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_AllRetriesFailed) {
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
// Use it and check the results.
StringPiece result;
@ -311,7 +298,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_NoRetriesForSomeErrors) {
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
TF_EXPECT_OK(
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
// Use it and check the results.
StringPiece result;
@ -339,7 +327,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_ImmediateSuccess) {
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
StringPiece result;
TF_EXPECT_OK(writable_file->Name(&result));
@ -370,7 +358,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry) {
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
// Use it and check the results.
TF_EXPECT_OK(writable_file->Sync());
@ -397,7 +385,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry_ViaDestructor) {
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
writable_file.reset(); // Trigger Close() via destructor.
}
@ -423,7 +411,7 @@ TEST(RetryingFileSystemTest, NewAppendableFile_SuccessWith3rdTry) {
// Retrieve the wrapped appendable file.
std::unique_ptr<WritableFile> writable_file;
TF_EXPECT_OK(fs.NewAppendableFile("filename.txt", &writable_file));
TF_EXPECT_OK(fs.NewAppendableFile("filename.txt", nullptr, &writable_file));
// Use it and check the results.
TF_EXPECT_OK(writable_file->Sync());
@ -447,7 +435,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_AllRetriesFailed) {
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
// Use it and check the results.
const auto& status = writable_file->Sync();
@ -467,7 +455,8 @@ TEST(RetryingFileSystemTest,
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
std::unique_ptr<ReadOnlyMemoryRegion> result;
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result));
TF_EXPECT_OK(
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", nullptr, &result));
}
TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) {
@ -480,7 +469,7 @@ TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) {
std::unique_ptr<ReadOnlyMemoryRegion> result;
const auto& status =
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result);
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", nullptr, &result);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -496,7 +485,7 @@ TEST(RetryingFileSystemTest, GetChildren_SuccessWith2ndTry) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
std::vector<string> result;
TF_EXPECT_OK(fs.GetChildren("gs://path", &result));
TF_EXPECT_OK(fs.GetChildren("gs://path", nullptr, &result));
}
TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) {
@ -507,7 +496,7 @@ TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
std::vector<string> result;
const auto& status = fs.GetChildren("gs://path", &result);
const auto& status = fs.GetChildren("gs://path", nullptr, &result);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -523,7 +512,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_SuccessWith2ndTry) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", &result));
TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", nullptr, &result));
}
TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) {
@ -535,7 +524,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
std::vector<string> result;
const auto& status = fs.GetMatchingPaths("gs://path/dir", &result);
const auto& status = fs.GetMatchingPaths("gs://path/dir", nullptr, &result);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -549,7 +538,7 @@ TEST(RetryingFileSystemTest, DeleteFile_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt"));
TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt", nullptr));
}
TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) {
@ -559,7 +548,7 @@ TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.DeleteFile("gs://path/file.txt");
const auto& status = fs.DeleteFile("gs://path/file.txt", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -573,7 +562,7 @@ TEST(RetryingFileSystemTest, CreateDir_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.CreateDir("gs://path/newdir"));
TF_EXPECT_OK(fs.CreateDir("gs://path/newdir", nullptr));
}
TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) {
@ -583,7 +572,7 @@ TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.CreateDir("gs://path/newdir");
const auto& status = fs.CreateDir("gs://path/newdir", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -597,7 +586,7 @@ TEST(RetryingFileSystemTest, DeleteDir_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.DeleteDir("gs://path/dir"));
TF_EXPECT_OK(fs.DeleteDir("gs://path/dir", nullptr));
}
TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) {
@ -607,7 +596,7 @@ TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.DeleteDir("gs://path/dir");
const auto& status = fs.DeleteDir("gs://path/dir", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -623,7 +612,7 @@ TEST(RetryingFileSystemTest, GetFileSize_SuccessWith2ndTry) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
uint64 size;
TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", &size));
TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", nullptr, &size));
}
TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) {
@ -634,7 +623,7 @@ TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
uint64 size;
const auto& status = fs.GetFileSize("gs://path/file.txt", &size);
const auto& status = fs.GetFileSize("gs://path/file.txt", nullptr, &size);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -648,7 +637,7 @@ TEST(RetryingFileSystemTest, RenameFile_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.RenameFile("old_name", "new_name"));
TF_EXPECT_OK(fs.RenameFile("old_name", "new_name", nullptr));
}
TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) {
@ -658,7 +647,7 @@ TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.RenameFile("old_name", "new_name");
const auto& status = fs.RenameFile("old_name", "new_name", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -673,7 +662,7 @@ TEST(RetryingFileSystemTest, Stat_SuccessWith2ndTry) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("file_name", &stat));
TF_EXPECT_OK(fs.Stat("file_name", nullptr, &stat));
}
TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) {
@ -684,7 +673,7 @@ TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
FileStatistics stat;
const auto& status = fs.Stat("file_name", &stat);
const auto& status = fs.Stat("file_name", nullptr, &stat);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -696,7 +685,7 @@ TEST(RetryingFileSystemTest, FileExists_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.FileExists("file_name");
const auto& status = fs.FileExists("file_name", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -710,7 +699,7 @@ TEST(RetryingFileSystemTest, FileExists_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.FileExists("gs://path/dir"));
TF_EXPECT_OK(fs.FileExists("gs://path/dir", nullptr));
}
TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) {
@ -723,7 +712,7 @@ TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
TF_EXPECT_OK(fs.IsDirectory("gs://path/dir"));
TF_EXPECT_OK(fs.IsDirectory("gs://path/dir", nullptr));
}
TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) {
@ -733,7 +722,7 @@ TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) {
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
const auto& status = fs.IsDirectory("gs://path/dir");
const auto& status = fs.IsDirectory("gs://path/dir", nullptr);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -749,8 +738,8 @@ TEST(RetryingFileSystemTest, DeleteRecursively_SuccessWith2ndTry) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
int64 undeleted_files, undeleted_dirs;
TF_EXPECT_OK(
fs.DeleteRecursively("gs://path/dir", &undeleted_files, &undeleted_dirs));
TF_EXPECT_OK(fs.DeleteRecursively("gs://path/dir", nullptr, &undeleted_files,
&undeleted_dirs));
}
TEST(RetryingFileSystemTest, DeleteRecursively_AllRetriesFailed) {
@ -762,8 +751,8 @@ TEST(RetryingFileSystemTest, DeleteRecursively_AllRetriesFailed) {
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
int64 undeleted_files, undeleted_dirs;
const auto& status =
fs.DeleteRecursively("gs://path/dir", &undeleted_files, &undeleted_dirs);
const auto& status = fs.DeleteRecursively("gs://path/dir", nullptr,
&undeleted_files, &undeleted_dirs);
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
<< status;
}
@ -774,7 +763,7 @@ TEST(RetryingFileSystemTest, FlushCaches) {
std::unique_ptr<MockFileSystem> base_fs(new MockFileSystem(none, &flushed));
RetryingFileSystem<MockFileSystem> fs(
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
fs.FlushCaches();
fs.FlushCaches(nullptr);
EXPECT_TRUE(flushed);
}

View File

@ -58,7 +58,7 @@ static const char* kS3TempFileTemplate = "/tmp/s3_filesystem_XXXXXX";
#endif
static const char* kS3FileSystemAllocationTag = "S3FileSystemAllocation";
static const size_t kS3ReadAppendableFileBufferSize = 1024 * 1024;
static const int64 kS3TimeoutMsec = 300000; // 5 min
static const int64 kS3TimeoutMsec = 300000; // 5 min
static const uint64 kS3MultiPartUploadChunkSize = 50 * 1024 * 1024; // 50 MB
static const uint64 kS3MultiPartDownloadChunkSize = 2 * 1024 * 1024; // 50 MB
static const int kS3GetChildrenMaxKeys = 100;
@ -568,14 +568,14 @@ S3FileSystem::GetExecutor() {
}
Status S3FileSystem::NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
return NewRandomAccessFile(fname, result, true);
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) {
return NewRandomAccessFile(fname, token, result, true);
}
Status S3FileSystem::NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result,
bool use_multi_part_download /*, TransactionToken* token */) {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result, bool use_multi_part_download) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
@ -588,9 +588,9 @@ Status S3FileSystem::NewRandomAccessFile(
return Status::OK();
}
Status S3FileSystem::NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
Status S3FileSystem::NewWritableFile(const string& fname,
TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
result->reset(new S3WritableFile(
@ -601,11 +601,11 @@ Status S3FileSystem::NewWritableFile(
return Status::OK();
}
Status S3FileSystem::NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
Status S3FileSystem::NewAppendableFile(const string& fname,
TransactionToken* token,
std::unique_ptr<WritableFile>* result) {
std::unique_ptr<RandomAccessFile> reader;
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
std::unique_ptr<char[]> buffer(new char[kS3ReadAppendableFileBufferSize]);
Status status;
uint64 offset = 0;
@ -637,14 +637,14 @@ Status S3FileSystem::NewAppendableFile(
}
Status S3FileSystem::NewReadOnlyMemoryRegionFromFile(
const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token */) {
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
uint64 size;
TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
std::unique_ptr<char[]> data(new char[size]);
std::unique_ptr<RandomAccessFile> file;
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
StringPiece piece;
TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
@ -653,16 +653,14 @@ Status S3FileSystem::NewReadOnlyMemoryRegionFromFile(
return Status::OK();
}
Status S3FileSystem::FileExists(
const string& fname /*, TransactionToken* token */) {
Status S3FileSystem::FileExists(const string& fname, TransactionToken* token) {
FileStatistics stats;
TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
TF_RETURN_IF_ERROR(this->Stat(fname, token, &stats));
return Status::OK();
}
Status S3FileSystem::GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token */) {
Status S3FileSystem::GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) {
VLOG(1) << "GetChildren for path: " << dir;
string bucket, prefix;
TF_RETURN_IF_ERROR(ParseS3Path(dir, true, &bucket, &prefix));
@ -709,8 +707,8 @@ Status S3FileSystem::GetChildren(
return Status::OK();
}
Status S3FileSystem::Stat(
const string& fname, FileStatistics* stats /*, TransactionToken* token */) {
Status S3FileSystem::Stat(const string& fname, TransactionToken* token,
FileStatistics* stats) {
VLOG(1) << "Stat on path: " << fname;
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(fname, true, &bucket, &object));
@ -772,14 +770,13 @@ Status S3FileSystem::Stat(
return Status::OK();
}
Status S3FileSystem::GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token */) {
Status S3FileSystem::GetMatchingPaths(const string& pattern,
TransactionToken* token,
std::vector<string>* results) {
return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
}
Status S3FileSystem::DeleteFile(
const string& fname /*, TransactionToken* token */) {
Status S3FileSystem::DeleteFile(const string& fname, TransactionToken* token) {
VLOG(1) << "DeleteFile: " << fname;
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
@ -795,8 +792,7 @@ Status S3FileSystem::DeleteFile(
return Status::OK();
}
Status S3FileSystem::CreateDir(
const string& dirname /*, TransactionToken* token */) {
Status S3FileSystem::CreateDir(const string& dirname, TransactionToken* token) {
VLOG(1) << "CreateDir: " << dirname;
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(dirname, true, &bucket, &object));
@ -815,16 +811,15 @@ Status S3FileSystem::CreateDir(
if (filename.back() != '/') {
filename.push_back('/');
}
if (!this->FileExists(filename).ok()) {
if (!this->FileExists(filename, token).ok()) {
std::unique_ptr<WritableFile> file;
TF_RETURN_IF_ERROR(NewWritableFile(filename, &file));
TF_RETURN_IF_ERROR(NewWritableFile(filename, token, &file));
TF_RETURN_IF_ERROR(file->Close());
}
return Status::OK();
}
Status S3FileSystem::DeleteDir(
const string& dirname /*, TransactionToken* token */) {
Status S3FileSystem::DeleteDir(const string& dirname, TransactionToken* token) {
VLOG(1) << "DeleteDir: " << dirname;
string bucket, object;
TF_RETURN_IF_ERROR(ParseS3Path(dirname, false, &bucket, &object));
@ -855,7 +850,7 @@ Status S3FileSystem::DeleteDir(
if (filename.back() != '/') {
filename.push_back('/');
}
return DeleteFile(filename);
return DeleteFile(filename, token);
}
} else {
TF_RETURN_IF_ERROR(CheckForbiddenError(listObjectsOutcome.GetError()));
@ -863,10 +858,10 @@ Status S3FileSystem::DeleteDir(
return Status::OK();
}
Status S3FileSystem::GetFileSize(
const string& fname, uint64* file_size /*, TransactionToken* token */) {
Status S3FileSystem::GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) {
FileStatistics stats;
TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
TF_RETURN_IF_ERROR(this->Stat(fname, token, &stats));
*file_size = stats.length;
return Status::OK();
}
@ -916,8 +911,8 @@ Status S3FileSystem::CopyFile(const Aws::String& source_bucket,
Aws::String source = Aws::String((source_bucket + "/" + source_key).c_str());
Aws::String source_full_path = Aws::String("s3://") + source;
uint64 file_length;
TF_RETURN_IF_ERROR(
this->GetFileSize(string(source_full_path.c_str()), &file_length));
TF_RETURN_IF_ERROR(this->GetFileSize(string(source_full_path.c_str()),
nullptr, &file_length));
int num_parts;
if (file_length <=
multi_part_chunk_size_[Aws::Transfer::TransferDirection::UPLOAD]) {
@ -1135,8 +1130,8 @@ Status S3FileSystem::CompleteMultiPartCopy(
return Status::OK();
}
Status S3FileSystem::RenameFile(
const string& src, const string& target /*, TransactionToken* token */) {
Status S3FileSystem::RenameFile(const string& src, const string& target,
TransactionToken* token) {
VLOG(1) << "RenameFile from: " << src << " to: " << target;
string src_bucket, src_object, target_bucket, target_object;
TF_RETURN_IF_ERROR(ParseS3Path(src, false, &src_bucket, &src_object));

View File

@ -49,67 +49,50 @@ class S3FileSystem : public FileSystem {
S3FileSystem();
~S3FileSystem();
Status NewRandomAccessFile(
const string& fname,
std::unique_ptr<RandomAccessFile>*
result /*, TransactionToken* token = nullptr */) override;
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
Status NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result,
bool use_multi_part_download /*, TransactionToken* token = nullptr */);
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewRandomAccessFile(const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result,
bool use_multi_part_download);
Status NewAppendableFile(
const string& fname,
std::unique_ptr<WritableFile>*
result /*, TransactionToken* token = nullptr */) override;
Status NewWritableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const string& fname, TransactionToken* token,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>*
result /*, TransactionToken* token = nullptr */) override;
const string& fname, TransactionToken* token,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
Status FileExists(
const string& fname /*, TransactionToken* token = nullptr */) override;
Status FileExists(const string& fname, TransactionToken* token) override;
Status GetChildren(
const string& dir,
std::vector<string>* result /*, TransactionToken* token = nullptr */)
override;
Status GetChildren(const string& dir, TransactionToken* token,
std::vector<string>* result) override;
Status Stat(
const string& fname,
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
Status Stat(const string& fname, TransactionToken* token,
FileStatistics* stat) override;
Status GetMatchingPaths(
const string& pattern,
std::vector<string>* results /*, TransactionToken* token = nullptr */)
override;
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
std::vector<string>* results) override;
Status DeleteFile(
const string& fname /*, TransactionToken* token = nullptr */) override;
Status DeleteFile(const string& fname, TransactionToken* token) override;
Status CreateDir(
const string& name /*, TransactionToken* token = nullptr */) override;
Status CreateDir(const string& name, TransactionToken* token) override;
Status DeleteDir(
const string& name /*, TransactionToken* token = nullptr */) override;
Status DeleteDir(const string& name, TransactionToken* token) override;
Status GetFileSize(
const string& fname,
uint64* size /*, TransactionToken* token = nullptr */) override;
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* size) override;
Status RenameFile(
const string& src,
const string& target /*, TransactionToken* token = nullptr */) override;
Status RenameFile(const string& src, const string& target,
TransactionToken* token) override;
Status HasAtomicMove(
const string& path,
bool* has_atomic_move /*, TransactionToken* token = nullptr */) override;
Status HasAtomicMove(const string& path, bool* has_atomic_move) override;
private:
// Returns the member S3 client, initializing as-needed.

View File

@ -39,12 +39,14 @@ class TestRandomAccessFile : public RandomAccessFile {
class TestFileSystem : public NullFileSystem {
public:
Status NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) override {
const string& fname, TransactionToken* token,
std::unique_ptr<RandomAccessFile>* result) override {
result->reset(new TestRandomAccessFile);
return Status::OK();
}
// Always return size of 10
Status GetFileSize(const string& fname, uint64* file_size) override {
Status GetFileSize(const string& fname, TransactionToken* token,
uint64* file_size) override {
*file_size = 10;
return Status::OK();
}