Add all retrying file systems to PR
This commit is contained in:
parent
e2cba3e0a2
commit
18e4ffd5be
@ -648,7 +648,7 @@ class GcsWritableFile : public WritableFile {
|
||||
TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
|
||||
" when composing to ", GetGcsPath());
|
||||
TF_RETURN_WITH_CONTEXT_IF_ERROR(
|
||||
filesystem_->DeleteFile(GetGcsPathWithObject(append_object)),
|
||||
filesystem_->DeleteFile(GetGcsPathWithObject(append_object),nullptr),
|
||||
" when cleaning up.");
|
||||
return Status::OK();
|
||||
},
|
||||
@ -929,8 +929,8 @@ GcsFileSystem::GcsFileSystem(
|
||||
additional_header_(additional_header) {}
|
||||
|
||||
Status GcsFileSystem::NewRandomAccessFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<RandomAccessFile>* result) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
|
||||
TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
|
||||
@ -1231,9 +1231,9 @@ void GcsFileSystem::ClearFileCaches(const string& fname) {
|
||||
// MatchingPathsCache as well.
|
||||
}
|
||||
|
||||
Status GcsFileSystem::NewWritableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::NewWritableFile(const string& fname,
|
||||
TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
|
||||
|
||||
@ -1267,11 +1267,11 @@ Status GcsFileSystem::NewWritableFile(
|
||||
|
||||
// Reads the file from GCS in chunks and stores it in a tmp file,
|
||||
// which is then passed to GcsWritableFile.
|
||||
Status GcsFileSystem::NewAppendableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::NewAppendableFile(const string& fname,
|
||||
TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
std::unique_ptr<RandomAccessFile> reader;
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
|
||||
std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
|
||||
Status status;
|
||||
uint64 offset = 0;
|
||||
@ -1330,14 +1330,14 @@ Status GcsFileSystem::NewAppendableFile(
|
||||
}
|
||||
|
||||
Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
|
||||
const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
|
||||
result /*, TransactionToken* token */) {
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
|
||||
uint64 size;
|
||||
TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
|
||||
TF_RETURN_IF_ERROR(GetFileSize(fname,token, &size));
|
||||
std::unique_ptr<char[]> data(new char[size]);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
|
||||
|
||||
StringPiece piece;
|
||||
TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
|
||||
@ -1346,8 +1346,7 @@ Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status GcsFileSystem::FileExists(
|
||||
const string& fname /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
|
||||
if (object.empty()) {
|
||||
@ -1561,17 +1560,17 @@ Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status GcsFileSystem::GetChildren(
|
||||
const string& dirname,
|
||||
std::vector<string>* result /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::GetChildren(const string& dirname,
|
||||
TransactionToken* token,
|
||||
std::vector<string>* result) {
|
||||
return GetChildrenBounded(dirname, UINT64_MAX, result,
|
||||
false /* recursively */,
|
||||
false /* include_self_directory_marker */);
|
||||
}
|
||||
|
||||
Status GcsFileSystem::GetMatchingPaths(
|
||||
const string& pattern,
|
||||
std::vector<string>* results /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::GetMatchingPaths(const string& pattern,
|
||||
TransactionToken* token,
|
||||
std::vector<string>* results) {
|
||||
MatchingPathsCache::ComputeFunc compute_func =
|
||||
[this](const string& pattern, std::vector<string>* results) {
|
||||
results->clear();
|
||||
@ -1731,8 +1730,8 @@ Status GcsFileSystem::GetChildrenBounded(const string& dirname,
|
||||
}
|
||||
}
|
||||
|
||||
Status GcsFileSystem::Stat(
|
||||
const string& fname, FileStatistics* stat /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
|
||||
FileStatistics* stat) {
|
||||
if (!stat) {
|
||||
return errors::Internal("'stat' cannot be nullptr.");
|
||||
}
|
||||
@ -1766,8 +1765,7 @@ Status GcsFileSystem::Stat(
|
||||
return errors::NotFound("The specified path ", fname, " was not found.");
|
||||
}
|
||||
|
||||
Status GcsFileSystem::DeleteFile(
|
||||
const string& fname /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
|
||||
|
||||
@ -1783,8 +1781,8 @@ Status GcsFileSystem::DeleteFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status GcsFileSystem::CreateDir(
|
||||
const string& dirname /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::CreateDir(const string& dirname,
|
||||
TransactionToken* token) {
|
||||
string dirname_with_slash = MaybeAppendSlash(dirname);
|
||||
VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
|
||||
<< " and dirname_with_slash: " << dirname_with_slash;
|
||||
@ -1799,7 +1797,7 @@ Status GcsFileSystem::CreateDir(
|
||||
dirname_with_slash, " was not found.");
|
||||
}
|
||||
|
||||
if (FileExists(dirname_with_slash).ok()) {
|
||||
if (FileExists(dirname_with_slash,token).ok()) {
|
||||
// Use the original name for a correct error here.
|
||||
VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
|
||||
return errors::AlreadyExists(dirname);
|
||||
@ -1833,8 +1831,8 @@ Status GcsFileSystem::CreateDir(
|
||||
|
||||
// Checks that the directory is empty (i.e no objects with this prefix exist).
|
||||
// Deletes the GCS directory marker if it exists.
|
||||
Status GcsFileSystem::DeleteDir(
|
||||
const string& dirname /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::DeleteDir(const string& dirname,
|
||||
TransactionToken* token) {
|
||||
std::vector<string> children;
|
||||
// A directory is considered empty either if there are no matching objects
|
||||
// with the corresponding name prefix or if there is exactly one matching
|
||||
@ -1849,13 +1847,13 @@ Status GcsFileSystem::DeleteDir(
|
||||
}
|
||||
if (children.size() == 1 && children[0].empty()) {
|
||||
// This is the directory marker object. Delete it.
|
||||
return DeleteFile(MaybeAppendSlash(dirname));
|
||||
return DeleteFile(MaybeAppendSlash(dirname),token);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status GcsFileSystem::GetFileSize(
|
||||
const string& fname, uint64* file_size /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
|
||||
uint64* file_size) {
|
||||
if (!file_size) {
|
||||
return errors::Internal("'file_size' cannot be nullptr.");
|
||||
}
|
||||
@ -1865,14 +1863,14 @@ Status GcsFileSystem::GetFileSize(
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
|
||||
|
||||
FileStatistics stat;
|
||||
TF_RETURN_IF_ERROR(Stat(fname, &stat));
|
||||
TF_RETURN_IF_ERROR(Stat(fname,token, &stat));
|
||||
*file_size = stat.length;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status GcsFileSystem::RenameFile(
|
||||
const string& src, const string& target /*, TransactionToken* token */) {
|
||||
if (!IsDirectory(src).ok()) {
|
||||
Status GcsFileSystem::RenameFile(const string& src, const string& target,
|
||||
TransactionToken* token) {
|
||||
if (!IsDirectory(src,token).ok()) {
|
||||
return RenameObject(src, target);
|
||||
}
|
||||
// Rename all individual objects in the directory one by one.
|
||||
@ -1930,11 +1928,11 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) {
|
||||
// on the server side, we can't just retry the whole RenameFile operation
|
||||
// because the source object is already gone.
|
||||
return RetryingUtils::DeleteWithRetries(
|
||||
[this, &src]() { return DeleteFile(src); }, retry_config_);
|
||||
[this, &src]() { return DeleteFile(src,nullptr); }, retry_config_);
|
||||
}
|
||||
|
||||
Status GcsFileSystem::IsDirectory(
|
||||
const string& fname /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::IsDirectory(const string& fname,
|
||||
TransactionToken* token) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
|
||||
if (object.empty()) {
|
||||
@ -1960,16 +1958,17 @@ Status GcsFileSystem::IsDirectory(
|
||||
return errors::NotFound("The specified path ", fname, " was not found.");
|
||||
}
|
||||
|
||||
Status GcsFileSystem::DeleteRecursively(
|
||||
const string& dirname, int64* undeleted_files,
|
||||
int64* undeleted_dirs /*, TransactionToken* token */) {
|
||||
Status GcsFileSystem::DeleteRecursively(const string& dirname,
|
||||
TransactionToken* token,
|
||||
int64* undeleted_files,
|
||||
int64* undeleted_dirs) {
|
||||
if (!undeleted_files || !undeleted_dirs) {
|
||||
return errors::Internal(
|
||||
"'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
|
||||
}
|
||||
*undeleted_files = 0;
|
||||
*undeleted_dirs = 0;
|
||||
if (!IsDirectory(dirname).ok()) {
|
||||
if (!IsDirectory(dirname,token).ok()) {
|
||||
*undeleted_dirs = 1;
|
||||
return Status(
|
||||
error::NOT_FOUND,
|
||||
@ -1987,9 +1986,9 @@ Status GcsFileSystem::DeleteRecursively(
|
||||
// and therefore RetryingFileSystem won't pay attention to the failures,
|
||||
// we need to make sure these failures are properly retried.
|
||||
const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
|
||||
[this, &full_path]() { return DeleteFile(full_path); }, retry_config_);
|
||||
[this, &full_path,token]() { return DeleteFile(full_path,token); }, retry_config_);
|
||||
if (!delete_file_status.ok()) {
|
||||
if (IsDirectory(full_path).ok()) {
|
||||
if (IsDirectory(full_path,token).ok()) {
|
||||
// The object is a directory marker.
|
||||
(*undeleted_dirs)++;
|
||||
} else {
|
||||
@ -2003,7 +2002,7 @@ Status GcsFileSystem::DeleteRecursively(
|
||||
// Flushes all caches for filesystem metadata and file contents. Useful for
|
||||
// reclaiming memory once filesystem operations are done (e.g. model is loaded),
|
||||
// or for resetting the filesystem to a consistent state.
|
||||
void GcsFileSystem::FlushCaches(/* TransactionToken* token */) {
|
||||
void GcsFileSystem::FlushCaches(TransactionToken* token) {
|
||||
tf_shared_lock l(block_cache_lock_);
|
||||
file_block_cache_->Flush();
|
||||
stat_cache_->Clear();
|
||||
|
||||
@ -126,67 +126,49 @@ class GcsFileSystem : public FileSystem {
|
||||
bool compose_append);
|
||||
|
||||
Status NewRandomAccessFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<RandomAccessFile>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<RandomAccessFile>* result) override;
|
||||
|
||||
Status NewWritableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>*
|
||||
result) /*, TransactionToken* token = nullptr */ override;
|
||||
Status NewWritableFile(const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) override;
|
||||
|
||||
Status NewAppendableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
Status NewAppendableFile(const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) override;
|
||||
|
||||
Status NewReadOnlyMemoryRegionFromFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
|
||||
|
||||
Status FileExists(
|
||||
const string& fname /*, TransactionToken* token = nullptr */) override;
|
||||
Status FileExists(const string& fname, TransactionToken* token) override;
|
||||
|
||||
Status Stat(
|
||||
const string& fname,
|
||||
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
|
||||
Status Stat(const string& fname, TransactionToken* token,
|
||||
FileStatistics* stat) override;
|
||||
|
||||
Status GetChildren(
|
||||
const string& dir,
|
||||
std::vector<string>* result /*, TransactionToken* token = nullptr */)
|
||||
override;
|
||||
Status GetChildren(const string& dir, TransactionToken* token,
|
||||
std::vector<string>* result) override;
|
||||
|
||||
Status GetMatchingPaths(
|
||||
const string& pattern,
|
||||
std::vector<string>* results /*, TransactionToken* token = nullptr */)
|
||||
override;
|
||||
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
|
||||
std::vector<string>* results) override;
|
||||
|
||||
Status DeleteFile(
|
||||
const string& fname /*, TransactionToken* token = nullptr */) override;
|
||||
Status DeleteFile(const string& fname, TransactionToken* token) override;
|
||||
|
||||
Status CreateDir(
|
||||
const string& dirname /*, TransactionToken* token = nullptr */) override;
|
||||
Status CreateDir(const string& dirname, TransactionToken* token) override;
|
||||
|
||||
Status DeleteDir(
|
||||
const string& dirname /*, TransactionToken* token = nullptr */) override;
|
||||
Status DeleteDir(const string& dirname, TransactionToken* token) override;
|
||||
|
||||
Status GetFileSize(
|
||||
const string& fname,
|
||||
uint64* file_size /*, TransactionToken* token = nullptr */) override;
|
||||
Status GetFileSize(const string& fname, TransactionToken* token,
|
||||
uint64* file_size) override;
|
||||
|
||||
Status RenameFile(
|
||||
const string& src,
|
||||
const string& target /*, TransactionToken* token = nullptr */) override;
|
||||
Status RenameFile(const string& src, const string& target,
|
||||
TransactionToken* token) override;
|
||||
|
||||
Status IsDirectory(
|
||||
const string& fname /*, TransactionToken* token = nullptr */) override;
|
||||
Status IsDirectory(const string& fname, TransactionToken* token) override;
|
||||
|
||||
Status DeleteRecursively(
|
||||
const string& dirname, int64* undeleted_files,
|
||||
int64* undeleted_dirs /*, TransactionToken* token = nullptr */) override;
|
||||
Status DeleteRecursively(const string& dirname, TransactionToken* token,
|
||||
int64* undeleted_files,
|
||||
int64* undeleted_dirs) override;
|
||||
|
||||
void FlushCaches(/* TransactionToken* token = nullptr */) override;
|
||||
void FlushCaches(TransactionToken* token) override;
|
||||
|
||||
/// Set an object to collect runtime statistics from the GcsFilesystem.
|
||||
void SetStats(GcsStatsInterface* stats);
|
||||
|
||||
@ -86,7 +86,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -133,7 +134,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -181,7 +183,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_Errors) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -228,7 +231,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_ReadAtEOF) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -269,7 +273,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_CachedOutOfRange) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -320,7 +325,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_CachedNotSequential) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -361,7 +367,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_Growing) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -408,7 +415,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_Buffered_ReadBackwards) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
StringPiece filename;
|
||||
TF_EXPECT_OK(file->Name(&filename));
|
||||
@ -450,7 +458,8 @@ TEST(GcsFileSystemTest,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, NewRandomAccessFile_WithLocationConstraintCaching) {
|
||||
@ -496,18 +505,18 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithLocationConstraintCaching) {
|
||||
string bucket = "gs://bucket/random_access.txt";
|
||||
string another_bucket = "gs://anotherbucket/random_access.txt";
|
||||
// Multiple calls should only cause one request to the location api.
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
|
||||
|
||||
// A new bucket should have one cache miss
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, nullptr, &file));
|
||||
// And then future calls to both should be cached
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(another_bucket, nullptr, &file));
|
||||
|
||||
// Trigger a flush, should then require one more call
|
||||
fs.FlushCaches();
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, &file));
|
||||
fs.FlushCaches(nullptr);
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile(bucket, nullptr, &file));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest,
|
||||
@ -533,10 +542,11 @@ TEST(GcsFileSystemTest,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
EXPECT_EQ(tensorflow::errors::FailedPrecondition(
|
||||
"Bucket 'bucket' is in 'barfoo' location, allowed locations "
|
||||
"are: (us-east1)."),
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
EXPECT_EQ(
|
||||
tensorflow::errors::FailedPrecondition(
|
||||
"Bucket 'bucket' is in 'barfoo' location, allowed locations "
|
||||
"are: (us-east1)."),
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_DifferentN) {
|
||||
@ -565,7 +575,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_DifferentN) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
char small_scratch[3];
|
||||
StringPiece result;
|
||||
@ -630,8 +641,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) {
|
||||
// We are instantiating this in an enclosed scope to make sure after the
|
||||
// unique ptr goes out of scope, we can still access result.
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt",
|
||||
nullptr, &file));
|
||||
|
||||
// Read the first chunk. The cache will be populated with the first block of
|
||||
// 9 bytes.
|
||||
@ -716,7 +727,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) {
|
||||
char scratch[100];
|
||||
StringPiece result;
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
// Read the first chunk. The cache will be populated with the first block of
|
||||
// 9 bytes.
|
||||
scratch[5] = 'x';
|
||||
@ -725,7 +737,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) {
|
||||
EXPECT_EQ(scratch[5], 'x'); // Make sure we only copied 4 bytes.
|
||||
// Flush caches and read the second chunk. This will be a cache miss, and
|
||||
// the same block will be fetched again.
|
||||
fs.FlushCaches();
|
||||
fs.FlushCaches(nullptr);
|
||||
TF_EXPECT_OK(file->Read(4, 4, &result, scratch));
|
||||
EXPECT_EQ("4567", result);
|
||||
}
|
||||
@ -772,8 +784,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) {
|
||||
// staleness of the filesystem is > 0, they will share the same blocks.
|
||||
std::unique_ptr<RandomAccessFile> file1;
|
||||
std::unique_ptr<RandomAccessFile> file2;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file1));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file2));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", nullptr, &file1));
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", nullptr, &file2));
|
||||
// Reading the first block from file1 should load it once.
|
||||
TF_EXPECT_OK(file1->Read(0, 8, &result, scratch));
|
||||
EXPECT_EQ("01234567", result);
|
||||
@ -834,7 +846,8 @@ TEST(GcsFileSystemTest,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
char scratch[5];
|
||||
StringPiece result;
|
||||
@ -864,7 +877,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) {
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.NewRandomAccessFile("gs://bucket/", &file).code());
|
||||
fs.NewRandomAccessFile("gs://bucket/", nullptr, &file).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, NewRandomAccessFile_InconsistentRead) {
|
||||
@ -897,10 +910,11 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_InconsistentRead) {
|
||||
|
||||
// Stat the file first so that the file stats are cached.
|
||||
FileStatistics stat;
|
||||
TF_ASSERT_OK(fs.Stat("gs://bucket/random_access.txt", &stat));
|
||||
TF_ASSERT_OK(fs.Stat("gs://bucket/random_access.txt", nullptr, &stat));
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_ASSERT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_ASSERT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
char scratch[6];
|
||||
StringPiece result;
|
||||
@ -964,14 +978,16 @@ TEST(GcsFileSystemTest, NewWritableFile) {
|
||||
|
||||
// Read from the file first, to fill the block cache.
|
||||
std::unique_ptr<RandomAccessFile> rfile;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/writeable", nullptr, &rfile));
|
||||
char scratch[100];
|
||||
StringPiece result;
|
||||
TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch));
|
||||
EXPECT_EQ("0123", result);
|
||||
// Open the writable file.
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewWritableFile("gs://bucket/path/writeable", nullptr, &wfile));
|
||||
TF_EXPECT_OK(wfile->Append("content1,"));
|
||||
int64 pos;
|
||||
TF_EXPECT_OK(wfile->Tell(&pos));
|
||||
@ -1055,7 +1071,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::unique_ptr<WritableFile> file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
|
||||
|
||||
TF_EXPECT_OK(file->Append("content1,"));
|
||||
TF_EXPECT_OK(file->Append("content2"));
|
||||
@ -1127,7 +1144,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) {
|
||||
// Pull the file's first block into the cache. This will trigger the first
|
||||
// HTTP request to GCS.
|
||||
std::unique_ptr<RandomAccessFile> rfile;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/writeable", nullptr, &rfile));
|
||||
char scratch[100];
|
||||
StringPiece result;
|
||||
TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch));
|
||||
@ -1135,7 +1153,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) {
|
||||
// Now write to the same file. Once the write succeeds, the cached block will
|
||||
// be flushed.
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewWritableFile("gs://bucket/path/writeable", nullptr, &wfile));
|
||||
TF_EXPECT_OK(wfile->Append("content1,"));
|
||||
TF_EXPECT_OK(wfile->Append("content2"));
|
||||
// Appending doesn't invalidate the read cache - only flushing does. This read
|
||||
@ -1213,7 +1232,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) {
|
||||
false /* compose append */);
|
||||
|
||||
std::unique_ptr<WritableFile> file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
|
||||
|
||||
TF_EXPECT_OK(file->Append("content1,"));
|
||||
TF_EXPECT_OK(file->Append("content2"));
|
||||
@ -1277,7 +1297,8 @@ TEST(GcsFileSystemTest, NewWritableFile_UploadReturns410) {
|
||||
|
||||
{
|
||||
std::unique_ptr<WritableFile> file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewWritableFile("gs://bucket/path/writeable.txt", nullptr, &file));
|
||||
|
||||
TF_EXPECT_OK(file->Append("content1,"));
|
||||
TF_EXPECT_OK(file->Append("content2"));
|
||||
@ -1317,7 +1338,7 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) {
|
||||
|
||||
std::unique_ptr<WritableFile> file;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.NewWritableFile("gs://bucket/", &file).code());
|
||||
fs.NewWritableFile("gs://bucket/", nullptr, &file).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, NewAppendableFile) {
|
||||
@ -1382,12 +1403,14 @@ TEST(GcsFileSystemTest, NewAppendableFile) {
|
||||
// Create an appendable file. This should read the file from GCS, and pull its
|
||||
// contents into the block cache.
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable", &wfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewAppendableFile("gs://bucket/path/appendable", nullptr, &wfile));
|
||||
TF_EXPECT_OK(wfile->Append("content2"));
|
||||
// Verify that the file contents are in the block cache. This read should not
|
||||
// trigger an HTTP request to GCS.
|
||||
std::unique_ptr<RandomAccessFile> rfile;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/appendable", &rfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/appendable", nullptr, &rfile));
|
||||
char scratch[100];
|
||||
StringPiece result;
|
||||
TF_EXPECT_OK(rfile->Read(0, 8, &result, scratch));
|
||||
@ -1416,7 +1439,7 @@ TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) {
|
||||
|
||||
std::unique_ptr<WritableFile> file;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.NewAppendableFile("gs://bucket/", &file).code());
|
||||
fs.NewAppendableFile("gs://bucket/", nullptr, &file).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) {
|
||||
@ -1450,7 +1473,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) {
|
||||
|
||||
std::unique_ptr<ReadOnlyMemoryRegion> region;
|
||||
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile(
|
||||
"gs://bucket/path/random_access.txt", ®ion));
|
||||
"gs://bucket/path/random_access.txt", nullptr, ®ion));
|
||||
|
||||
EXPECT_EQ(content, StringPiece(reinterpret_cast<const char*>(region->data()),
|
||||
region->length()));
|
||||
@ -1471,7 +1494,8 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile_NoObjectName) {
|
||||
|
||||
std::unique_ptr<ReadOnlyMemoryRegion> region;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.NewReadOnlyMemoryRegionFromFile("gs://bucket/", ®ion).code());
|
||||
fs.NewReadOnlyMemoryRegionFromFile("gs://bucket/", nullptr, ®ion)
|
||||
.code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_YesAsObject) {
|
||||
@ -1493,7 +1517,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsObject) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_YesAsFolder) {
|
||||
@ -1523,7 +1547,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsFolder) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_YesAsBucket) {
|
||||
@ -1549,8 +1573,8 @@ TEST(GcsFileSystemTest, FileExists_YesAsBucket) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket1"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket1/"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket1", nullptr));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket1/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) {
|
||||
@ -1580,7 +1604,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(errors::Code::NOT_FOUND,
|
||||
fs.FileExists("gs://bucket/path/file1.txt").code());
|
||||
fs.FileExists("gs://bucket/path/file1.txt", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_NotAsBucket) {
|
||||
@ -1606,9 +1630,9 @@ TEST(GcsFileSystemTest, FileExists_NotAsBucket) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.FileExists("gs://bucket2/").code());
|
||||
fs.FileExists("gs://bucket2/", nullptr).code());
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.FileExists("gs://bucket2").code());
|
||||
fs.FileExists("gs://bucket2", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, FileExists_StatCache) {
|
||||
@ -1648,8 +1672,8 @@ TEST(GcsFileSystemTest, FileExists_StatCache) {
|
||||
// The stat cache will ensure that repeated lookups don't trigger additional
|
||||
// HTTP requests.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder/"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt", nullptr));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder/", nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1672,8 +1696,8 @@ TEST(GcsFileSystemTest, FileExists_DirectoryMark) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/dir/"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/dir/"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://bucket/dir/", nullptr));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/dir/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, GetChildren_NoItems) {
|
||||
@ -1696,7 +1720,7 @@ TEST(GcsFileSystemTest, GetChildren_NoItems) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(std::vector<string>({"subpath/"}), children);
|
||||
}
|
||||
@ -1724,7 +1748,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/"}),
|
||||
children);
|
||||
@ -1753,7 +1777,7 @@ TEST(GcsFileSystemTest, GetChildren_SelfDirectoryMarker) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(std::vector<string>({"file3.txt", "subpath/"}), children);
|
||||
}
|
||||
@ -1781,7 +1805,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/"}),
|
||||
children);
|
||||
@ -1806,7 +1830,7 @@ TEST(GcsFileSystemTest, GetChildren_Root) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(0, children.size());
|
||||
}
|
||||
@ -1831,7 +1855,7 @@ TEST(GcsFileSystemTest, GetChildren_Empty) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(0, children.size());
|
||||
}
|
||||
@ -1872,7 +1896,7 @@ TEST(GcsFileSystemTest, GetChildren_Pagination) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> children;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", nullptr, &children));
|
||||
|
||||
EXPECT_EQ(std::vector<string>({"file1.txt", "file3.txt", "subpath/",
|
||||
"file4.txt", "file5.txt"}),
|
||||
@ -1899,8 +1923,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_NoWildcard) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
|
||||
nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
|
||||
result);
|
||||
}
|
||||
@ -1927,7 +1951,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_BucketAndWildcard) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file1.txt",
|
||||
"gs://bucket/path/file3.txt",
|
||||
"gs://bucket/path/subpath"}),
|
||||
@ -1956,7 +1980,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_Matches) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", &result));
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
|
||||
result);
|
||||
}
|
||||
@ -1982,7 +2007,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SelfDirectoryMarker) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file3.txt"}), result);
|
||||
}
|
||||
|
||||
@ -2007,7 +2032,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SlashInObjectName) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>(), result);
|
||||
}
|
||||
|
||||
@ -2032,7 +2057,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SlashInObjectNameEscaped) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/\\/*", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/\\/*", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path//foo.txt"}), result);
|
||||
}
|
||||
|
||||
@ -2058,7 +2083,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_NoMatches) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", &result));
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>(), result);
|
||||
}
|
||||
|
||||
@ -2077,7 +2103,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) {
|
||||
|
||||
std::vector<string> result;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.GetMatchingPaths("gs://*", &result).code());
|
||||
fs.GetMatchingPaths("gs://*", nullptr, &result).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, GetMatchingPaths_Cache) {
|
||||
@ -2113,11 +2139,11 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache) {
|
||||
// any additional HTTP requests to GCS.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
|
||||
nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
|
||||
result);
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/file1.txt",
|
||||
"gs://bucket/path/file3.txt",
|
||||
"gs://bucket/path/subpath"}),
|
||||
@ -2155,17 +2181,17 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache_Flush) {
|
||||
// This loop should trigger the first HTTP request to GCS.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
|
||||
nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
|
||||
result);
|
||||
}
|
||||
// After flushing caches, there should be another (identical) request to GCS.
|
||||
fs.FlushCaches();
|
||||
fs.FlushCaches(nullptr);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(
|
||||
fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt",
|
||||
nullptr, &result));
|
||||
EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
|
||||
result);
|
||||
}
|
||||
@ -2220,11 +2246,12 @@ TEST(GcsFileSystemTest, DeleteFile) {
|
||||
char scratch[100];
|
||||
StringPiece result;
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/file1.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/file1.txt", nullptr, &file));
|
||||
TF_EXPECT_OK(file->Read(0, 8, &result, scratch));
|
||||
EXPECT_EQ("01234567", result);
|
||||
// Deleting the file triggers the next HTTP request to GCS.
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt"));
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt", nullptr));
|
||||
// Re-reading the file causes its contents to be reloaded from GCS and not
|
||||
// from the block cache.
|
||||
TF_EXPECT_OK(file->Read(0, 8, &result, scratch));
|
||||
@ -2245,7 +2272,7 @@ TEST(GcsFileSystemTest, DeleteFile_NoObjectName) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.DeleteFile("gs://bucket/").code());
|
||||
fs.DeleteFile("gs://bucket/", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteFile_StatCacheRemoved) {
|
||||
@ -2289,14 +2316,15 @@ TEST(GcsFileSystemTest, DeleteFile_StatCacheRemoved) {
|
||||
|
||||
// Stats the file first so the stat is cached.
|
||||
FileStatistics stat_before_deletion;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat_before_deletion));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat_before_deletion));
|
||||
EXPECT_EQ(1010, stat_before_deletion.length);
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/file.txt"));
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/file.txt", nullptr));
|
||||
|
||||
FileStatistics stat_after_deletion;
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.Stat("gs://bucket/file.txt", &stat_after_deletion).code());
|
||||
EXPECT_EQ(
|
||||
error::Code::NOT_FOUND,
|
||||
fs.Stat("gs://bucket/file.txt", nullptr, &stat_after_deletion).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteDir_Empty) {
|
||||
@ -2317,7 +2345,7 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) {
|
||||
@ -2346,7 +2374,7 @@ TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteDir_BucketOnly) {
|
||||
@ -2366,7 +2394,7 @@ TEST(GcsFileSystemTest, DeleteDir_BucketOnly) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket"));
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://bucket", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteDir_NonEmpty) {
|
||||
@ -2389,7 +2417,7 @@ TEST(GcsFileSystemTest, DeleteDir_NonEmpty) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
|
||||
fs.DeleteDir("gs://bucket/path/").code());
|
||||
fs.DeleteDir("gs://bucket/path/", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, GetFileSize) {
|
||||
@ -2412,7 +2440,7 @@ TEST(GcsFileSystemTest, GetFileSize) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
uint64 size;
|
||||
TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", &size));
|
||||
TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", nullptr, &size));
|
||||
EXPECT_EQ(1010, size);
|
||||
}
|
||||
|
||||
@ -2431,7 +2459,7 @@ TEST(GcsFileSystemTest, GetFileSize_NoObjectName) {
|
||||
|
||||
uint64 size;
|
||||
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
|
||||
fs.GetFileSize("gs://bucket/", &size).code());
|
||||
fs.GetFileSize("gs://bucket/", nullptr, &size).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, RenameFile_Folder) {
|
||||
@ -2515,7 +2543,8 @@ TEST(GcsFileSystemTest, RenameFile_Folder) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/"));
|
||||
TF_EXPECT_OK(
|
||||
fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, RenameFile_Object) {
|
||||
@ -2612,15 +2641,17 @@ TEST(GcsFileSystemTest, RenameFile_Object) {
|
||||
StringPiece result;
|
||||
std::unique_ptr<RandomAccessFile> src;
|
||||
std::unique_ptr<RandomAccessFile> dst;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/src.txt", &src));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/src.txt", nullptr, &src));
|
||||
TF_EXPECT_OK(src->Read(0, 8, &result, scratch));
|
||||
EXPECT_EQ("01234567", result);
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/dst.txt", &dst));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/path/dst.txt", nullptr, &dst));
|
||||
TF_EXPECT_OK(dst->Read(0, 8, &result, scratch));
|
||||
EXPECT_EQ("76543210", result);
|
||||
// Now rename src to dst. This should flush the block cache for both files.
|
||||
TF_EXPECT_OK(
|
||||
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
|
||||
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
|
||||
"gs://bucket/path/dst.txt", nullptr));
|
||||
// Re-read both files. This should reload their contents from GCS.
|
||||
TF_EXPECT_OK(src->Read(0, 8, &result, scratch));
|
||||
EXPECT_EQ("89abcdef", result);
|
||||
@ -2690,14 +2721,16 @@ TEST(GcsFileSystemTest, RenameFile_Object_FlushTargetStatCache) {
|
||||
// Do an initial stat of the destination file to load their contents into the
|
||||
// stat cache.
|
||||
FileStatistics stat_before_renaming;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/path/dst.txt", &stat_before_renaming));
|
||||
TF_EXPECT_OK(
|
||||
fs.Stat("gs://bucket/path/dst.txt", nullptr, &stat_before_renaming));
|
||||
EXPECT_EQ(1000, stat_before_renaming.length);
|
||||
|
||||
TF_EXPECT_OK(
|
||||
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
|
||||
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
|
||||
"gs://bucket/path/dst.txt", nullptr));
|
||||
|
||||
FileStatistics stat_after_renaming;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/path/dst.txt", &stat_after_renaming));
|
||||
TF_EXPECT_OK(
|
||||
fs.Stat("gs://bucket/path/dst.txt", nullptr, &stat_after_renaming));
|
||||
EXPECT_EQ(1010, stat_after_renaming.length);
|
||||
}
|
||||
|
||||
@ -2755,8 +2788,8 @@ TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(
|
||||
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
|
||||
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path/src.txt",
|
||||
"gs://bucket/path/dst.txt", nullptr));
|
||||
}
|
||||
|
||||
/// Tests the case when rewrite couldn't complete in one RPC.
|
||||
@ -2797,10 +2830,10 @@ TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(
|
||||
errors::Code::UNIMPLEMENTED,
|
||||
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")
|
||||
.code());
|
||||
EXPECT_EQ(errors::Code::UNIMPLEMENTED,
|
||||
fs.RenameFile("gs://bucket/path/src.txt",
|
||||
"gs://bucket/path/dst.txt", nullptr)
|
||||
.code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, Stat_Object) {
|
||||
@ -2823,7 +2856,7 @@ TEST(GcsFileSystemTest, Stat_Object) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
|
||||
EXPECT_EQ(1010, stat.length);
|
||||
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
|
||||
EXPECT_FALSE(stat.is_directory);
|
||||
@ -2857,7 +2890,7 @@ TEST(GcsFileSystemTest, Stat_Folder) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", nullptr, &stat));
|
||||
EXPECT_EQ(0, stat.length);
|
||||
EXPECT_EQ(0, stat.mtime_nsec);
|
||||
EXPECT_TRUE(stat.is_directory);
|
||||
@ -2890,7 +2923,8 @@ TEST(GcsFileSystemTest, Stat_ObjectOrFolderNotFound) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/path", &stat).code());
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.Stat("gs://bucket/path", nullptr, &stat).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, Stat_Bucket) {
|
||||
@ -2911,7 +2945,7 @@ TEST(GcsFileSystemTest, Stat_Bucket) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/", nullptr, &stat));
|
||||
EXPECT_EQ(0, stat.length);
|
||||
EXPECT_EQ(0, stat.mtime_nsec);
|
||||
EXPECT_TRUE(stat.is_directory);
|
||||
@ -2935,7 +2969,8 @@ TEST(GcsFileSystemTest, Stat_BucketNotFound) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/", &stat).code());
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.Stat("gs://bucket/", nullptr, &stat).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, Stat_Cache) {
|
||||
@ -2976,11 +3011,11 @@ TEST(GcsFileSystemTest, Stat_Cache) {
|
||||
// HTTP requests to GCS.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
|
||||
EXPECT_EQ(1010, stat.length);
|
||||
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
|
||||
EXPECT_FALSE(stat.is_directory);
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder/", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder/", nullptr, &stat));
|
||||
EXPECT_EQ(0, stat.length);
|
||||
EXPECT_EQ(0, stat.mtime_nsec);
|
||||
EXPECT_TRUE(stat.is_directory);
|
||||
@ -3016,16 +3051,16 @@ TEST(GcsFileSystemTest, Stat_Cache_Flush) {
|
||||
// There should be a single HTTP request to GCS for fs.Stat in this loop.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
|
||||
EXPECT_EQ(1010, stat.length);
|
||||
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
|
||||
EXPECT_FALSE(stat.is_directory);
|
||||
}
|
||||
// After flushing caches, there should be a second request to GCS for fs.Stat.
|
||||
fs.FlushCaches();
|
||||
fs.FlushCaches(nullptr);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
|
||||
EXPECT_EQ(1010, stat.length);
|
||||
EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
|
||||
EXPECT_FALSE(stat.is_directory);
|
||||
@ -3052,7 +3087,7 @@ TEST(GcsFileSystemTest, Stat_FilenameEndingWithSlash) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/dir/", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/dir/", nullptr, &stat));
|
||||
EXPECT_EQ(5, stat.length);
|
||||
EXPECT_TRUE(stat.is_directory);
|
||||
}
|
||||
@ -3084,7 +3119,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotFound) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.IsDirectory("gs://bucket/file.txt").code());
|
||||
fs.IsDirectory("gs://bucket/file.txt", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) {
|
||||
@ -3115,7 +3150,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
|
||||
fs.IsDirectory("gs://bucket/file.txt").code());
|
||||
fs.IsDirectory("gs://bucket/file.txt", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, IsDirectory_Yes) {
|
||||
@ -3145,8 +3180,8 @@ TEST(GcsFileSystemTest, IsDirectory_Yes) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder", nullptr));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, IsDirectory_Bucket) {
|
||||
@ -3172,8 +3207,8 @@ TEST(GcsFileSystemTest, IsDirectory_Bucket) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket", nullptr));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) {
|
||||
@ -3193,7 +3228,8 @@ TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/").code());
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.IsDirectory("gs://bucket/", nullptr).code());
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, CreateDir_Folder) {
|
||||
@ -3250,15 +3286,15 @@ TEST(GcsFileSystemTest, CreateDir_Folder) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath"));
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath", nullptr));
|
||||
// Check that when GCS returns the object already exists return that the
|
||||
// directory already exists.
|
||||
EXPECT_EQ(errors::AlreadyExists("gs://bucket/subpath"),
|
||||
fs.CreateDir("gs://bucket/subpath"));
|
||||
fs.CreateDir("gs://bucket/subpath", nullptr));
|
||||
// Check that when GCS returns the object already has a version (failed
|
||||
// precondition) return directory already exists.
|
||||
EXPECT_EQ(errors::AlreadyExists("gs://bucket/subpath"),
|
||||
fs.CreateDir("gs://bucket/subpath"));
|
||||
fs.CreateDir("gs://bucket/subpath", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, CreateDir_Bucket) {
|
||||
@ -3284,8 +3320,8 @@ TEST(GcsFileSystemTest, CreateDir_Bucket) {
|
||||
kTestTimeoutConfig, *kAllowedLocationsDefault,
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket/"));
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket"));
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket/", nullptr));
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://bucket", nullptr));
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, DeleteRecursively_Ok) {
|
||||
@ -3357,8 +3393,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_Ok) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
int64 undeleted_files, undeleted_dirs;
|
||||
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
|
||||
&undeleted_dirs));
|
||||
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", nullptr,
|
||||
&undeleted_files, &undeleted_dirs));
|
||||
EXPECT_EQ(0, undeleted_files);
|
||||
EXPECT_EQ(0, undeleted_dirs);
|
||||
}
|
||||
@ -3450,8 +3486,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_DeletionErrors) {
|
||||
nullptr /* gcs additional header */, false /* compose append */);
|
||||
|
||||
int64 undeleted_files, undeleted_dirs;
|
||||
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
|
||||
&undeleted_dirs));
|
||||
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", nullptr,
|
||||
&undeleted_files, &undeleted_dirs));
|
||||
EXPECT_EQ(1, undeleted_files);
|
||||
EXPECT_EQ(1, undeleted_dirs);
|
||||
}
|
||||
@ -3486,7 +3522,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_NotAFolder) {
|
||||
|
||||
int64 undeleted_files, undeleted_dirs;
|
||||
EXPECT_EQ(error::Code::NOT_FOUND,
|
||||
fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
|
||||
fs.DeleteRecursively("gs://bucket/path", nullptr, &undeleted_files,
|
||||
&undeleted_dirs)
|
||||
.code());
|
||||
EXPECT_EQ(0, undeleted_files);
|
||||
@ -3501,7 +3537,7 @@ TEST(GcsFileSystemTest, NoConstraintsEnvironmentVariableTest) {
|
||||
|
||||
// Cover cache initialization code, any uninitialized cache will cause this to
|
||||
// fail
|
||||
fs1.FlushCaches();
|
||||
fs1.FlushCaches(nullptr);
|
||||
}
|
||||
|
||||
TEST(GcsFileSystemTest, BucketLocationConstraintEnvironmentVariableTest) {
|
||||
@ -3715,7 +3751,7 @@ TEST(GcsFileSystemTest, Stat_StatsRecording) {
|
||||
EXPECT_EQ(stats.fs_, &fs);
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", nullptr, &stat));
|
||||
EXPECT_EQ(1, stats.stat_object_request_count_);
|
||||
}
|
||||
|
||||
@ -3742,7 +3778,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_StatsRecording) {
|
||||
EXPECT_EQ(stats.fs_, &fs);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("gs://bucket/random_access.txt", nullptr, &file));
|
||||
|
||||
char scratch[6];
|
||||
StringPiece result;
|
||||
@ -3883,8 +3920,8 @@ TEST(GcsFileSystemTest, NewAppendableFile_MultipleFlushesWithCompose) {
|
||||
// Create an appendable file. This should read the file from GCS, and pull its
|
||||
// contents into the block cache.
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
TF_EXPECT_OK(
|
||||
fs.NewAppendableFile("gs://bucket/some/path/appendable", &wfile));
|
||||
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/some/path/appendable", nullptr,
|
||||
&wfile));
|
||||
TF_EXPECT_OK(wfile->Append(contents[1]));
|
||||
TF_EXPECT_OK(wfile->Flush());
|
||||
TF_EXPECT_OK(wfile->Append(contents[2]));
|
||||
@ -3981,7 +4018,8 @@ TEST(GcsFileSystemTest, NewAppendableFile_MultipleFlushesWithoutCompose) {
|
||||
// Create an appendable file. This should read the file from GCS, and pull its
|
||||
// contents into the block cache.
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable", &wfile));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewAppendableFile("gs://bucket/path/appendable", nullptr, &wfile));
|
||||
TF_EXPECT_OK(wfile->Append(contents[1]));
|
||||
TF_EXPECT_OK(wfile->Flush());
|
||||
TF_EXPECT_OK(wfile->Append(contents[2]));
|
||||
|
||||
@ -107,7 +107,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
|
||||
|
||||
Status GetChildren(const string& dir, TransactionToken* token,
|
||||
std::vector<string>* result) override {
|
||||
TF_RETURN_IF_ERROR(IsDirectory(dir));
|
||||
TF_RETURN_IF_ERROR(IsDirectory(dir, nullptr));
|
||||
string parsed_path;
|
||||
ParsePath(dir, &parsed_path);
|
||||
result->insert(result->begin(), celestial_bodies_[parsed_path].begin(),
|
||||
@ -153,7 +153,7 @@ class InterPlanetaryFileSystem : public NullFileSystem {
|
||||
string Match(InterPlanetaryFileSystem* ipfs, const string& suffix_pattern) {
|
||||
std::vector<string> results;
|
||||
Status s =
|
||||
ipfs->GetMatchingPaths(ipfs->JoinPath(kPrefix, suffix_pattern), &results);
|
||||
ipfs->GetMatchingPaths(ipfs->JoinPath(kPrefix, suffix_pattern), nullptr, &results);
|
||||
if (!s.ok()) {
|
||||
return s.ToString();
|
||||
} else {
|
||||
@ -180,18 +180,18 @@ TEST(InterPlanetaryFileSystemTest, IPFSMatch) {
|
||||
// Returns Jupiter's and Earth's moons.
|
||||
EXPECT_EQ(Match(&ipfs, "*/*"),
|
||||
"Earth/Moon,Jupiter/Europa,Jupiter/Ganymede,Jupiter/Io");
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet0")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet1")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet0"), nullptr));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "Planet1"), nullptr));
|
||||
EXPECT_EQ(Match(&ipfs, "Planet[0-1]"), "Planet0,Planet1");
|
||||
EXPECT_EQ(Match(&ipfs, "Planet?"), "Planet0,Planet1");
|
||||
}
|
||||
|
||||
TEST(InterPlanetaryFileSystemTest, MatchSimple) {
|
||||
InterPlanetaryFileSystem ipfs;
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-00")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-0a")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-01")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-aaa")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-00"), nullptr));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-0a"), nullptr));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-01"), nullptr));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "match-aaa"), nullptr));
|
||||
|
||||
EXPECT_EQ(Match(&ipfs, "match-*"), "match-00,match-01,match-0a,match-aaa");
|
||||
EXPECT_EQ(Match(&ipfs, "match-0[0-9]"), "match-00,match-01");
|
||||
@ -204,8 +204,8 @@ TEST(InterPlanetaryFileSystemTest, MatchSimple) {
|
||||
// that evil_directory isn't accessed.
|
||||
TEST(InterPlanetaryFileSystemTest, MatchOnlyNeeded) {
|
||||
InterPlanetaryFileSystem ipfs;
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "abcd")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "evil_directory")));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "abcd"), nullptr));
|
||||
TF_EXPECT_OK(ipfs.CreateDir(ipfs.JoinPath(kPrefix, "evil_directory"), nullptr));
|
||||
|
||||
EXPECT_EQ(Match(&ipfs, "abcd"), "abcd");
|
||||
}
|
||||
@ -213,13 +213,13 @@ TEST(InterPlanetaryFileSystemTest, MatchOnlyNeeded) {
|
||||
TEST(InterPlanetaryFileSystemTest, MatchDirectory) {
|
||||
InterPlanetaryFileSystem ipfs;
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/x")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/x"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-0a/abc/x")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-0a/abc/x"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/x")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/x"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-aaa/abc/x")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-aaa/abc/x"), nullptr));
|
||||
|
||||
EXPECT_EQ(Match(&ipfs, "match-*/abc/x"),
|
||||
"match-00/abc/x,match-01/abc/x,match-0a/abc/x,match-aaa/abc/x");
|
||||
@ -234,19 +234,19 @@ TEST(InterPlanetaryFileSystemTest, MatchDirectory) {
|
||||
TEST(InterPlanetaryFileSystemTest, MatchMultipleWildcards) {
|
||||
InterPlanetaryFileSystem ipfs;
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/00")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/00"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/01")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/01"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/09")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-00/abc/09"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/00")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/00"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/04")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/04"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/10")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-01/abc/10"), nullptr));
|
||||
TF_EXPECT_OK(
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-02/abc/00")));
|
||||
ipfs.RecursivelyCreateDir(ipfs.JoinPath(kPrefix, "match-02/abc/00"), nullptr));
|
||||
|
||||
EXPECT_EQ(Match(&ipfs, "match-0[0-1]/abc/0[0-8]"),
|
||||
"match-00/abc/00,match-00/abc/01,match-01/abc/00,match-01/abc/04");
|
||||
@ -295,10 +295,10 @@ class TestFileSystem : public NullFileSystem {
|
||||
TEST(TestFileSystemTest, RootDirectory) {
|
||||
TestFileSystem fs;
|
||||
std::vector<string> results;
|
||||
auto ret = fs.GetMatchingPaths("./te*", &results);
|
||||
auto ret = fs.GetMatchingPaths("./te*", nullptr, &results);
|
||||
EXPECT_EQ(1, results.size());
|
||||
EXPECT_EQ("./test", results[0]);
|
||||
ret = fs.GetMatchingPaths("te*", &results);
|
||||
ret = fs.GetMatchingPaths("te*", nullptr, &results);
|
||||
EXPECT_EQ(1, results.size());
|
||||
EXPECT_EQ("./test", results[0]);
|
||||
}
|
||||
|
||||
@ -54,15 +54,17 @@ class RetryingFileSystem : public FileSystem {
|
||||
|
||||
Status FileExists(const string& fname, TransactionToken* token) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &fname]() { return base_file_system_->FileExists(fname); },
|
||||
[this, &fname, token]() {
|
||||
return base_file_system_->FileExists(fname, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status GetChildren(const string& dir, TransactionToken* token,
|
||||
std::vector<string>* result) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &dir, result]() {
|
||||
return base_file_system_->GetChildren(dir, result);
|
||||
[this, &dir, result, token]() {
|
||||
return base_file_system_->GetChildren(dir, token, result);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
@ -70,8 +72,8 @@ class RetryingFileSystem : public FileSystem {
|
||||
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
|
||||
std::vector<string>* result) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &pattern, result]() {
|
||||
return base_file_system_->GetMatchingPaths(pattern, result);
|
||||
[this, &pattern, result, token]() {
|
||||
return base_file_system_->GetMatchingPaths(pattern, token, result);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
@ -79,33 +81,41 @@ class RetryingFileSystem : public FileSystem {
|
||||
Status Stat(const string& fname, TransactionToken* token,
|
||||
FileStatistics* stat) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &fname, stat]() { return base_file_system_->Stat(fname, stat); },
|
||||
[this, &fname, stat, token]() {
|
||||
return base_file_system_->Stat(fname, token, stat);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status DeleteFile(const string& fname, TransactionToken* token) override {
|
||||
return RetryingUtils::DeleteWithRetries(
|
||||
[this, &fname]() { return base_file_system_->DeleteFile(fname); },
|
||||
[this, &fname, token]() {
|
||||
return base_file_system_->DeleteFile(fname, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status CreateDir(const string& dirname, TransactionToken* token) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &dirname]() { return base_file_system_->CreateDir(dirname); },
|
||||
[this, &dirname, token]() {
|
||||
return base_file_system_->CreateDir(dirname, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status DeleteDir(const string& dirname, TransactionToken* token) override {
|
||||
return RetryingUtils::DeleteWithRetries(
|
||||
[this, &dirname]() { return base_file_system_->DeleteDir(dirname); },
|
||||
[this, &dirname, token]() {
|
||||
return base_file_system_->DeleteDir(dirname, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status GetFileSize(const string& fname, TransactionToken* token,
|
||||
uint64* file_size) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &fname, file_size]() {
|
||||
return base_file_system_->GetFileSize(fname, file_size);
|
||||
[this, &fname, file_size, token]() {
|
||||
return base_file_system_->GetFileSize(fname, token, file_size);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
@ -113,15 +123,17 @@ class RetryingFileSystem : public FileSystem {
|
||||
Status RenameFile(const string& src, const string& target,
|
||||
TransactionToken* token) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &src, &target]() {
|
||||
return base_file_system_->RenameFile(src, target);
|
||||
[this, &src, &target, token]() {
|
||||
return base_file_system_->RenameFile(src, target, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
Status IsDirectory(const string& dirname, TransactionToken* token) override {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
|
||||
[this, &dirname, token]() {
|
||||
return base_file_system_->IsDirectory(dirname, token);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
@ -134,15 +146,15 @@ class RetryingFileSystem : public FileSystem {
|
||||
int64* undeleted_files,
|
||||
int64* undeleted_dirs) override {
|
||||
return RetryingUtils::DeleteWithRetries(
|
||||
[this, &dirname, undeleted_files, undeleted_dirs]() {
|
||||
return base_file_system_->DeleteRecursively(dirname, undeleted_files,
|
||||
undeleted_dirs);
|
||||
[this, &dirname, token, undeleted_files, undeleted_dirs]() {
|
||||
return base_file_system_->DeleteRecursively(
|
||||
dirname, token, undeleted_files, undeleted_dirs);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
void FlushCaches(TransactionToken* token) override {
|
||||
base_file_system_->FlushCaches();
|
||||
base_file_system_->FlushCaches(token);
|
||||
}
|
||||
|
||||
Underlying* underlying() const { return base_file_system_.get(); }
|
||||
@ -229,8 +241,9 @@ Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
|
||||
std::unique_ptr<RandomAccessFile>* result) {
|
||||
std::unique_ptr<RandomAccessFile> base_file;
|
||||
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
|
||||
[this, &filename, &base_file]() {
|
||||
return base_file_system_->NewRandomAccessFile(filename, &base_file);
|
||||
[this, &filename, &base_file, token]() {
|
||||
return base_file_system_->NewRandomAccessFile(filename, token,
|
||||
&base_file);
|
||||
},
|
||||
retry_config_));
|
||||
result->reset(new retrying_internals::RetryingRandomAccessFile(
|
||||
@ -244,8 +257,8 @@ Status RetryingFileSystem<Underlying>::NewWritableFile(
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
std::unique_ptr<WritableFile> base_file;
|
||||
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
|
||||
[this, &filename, &base_file]() {
|
||||
return base_file_system_->NewWritableFile(filename, &base_file);
|
||||
[this, &filename, &base_file, token]() {
|
||||
return base_file_system_->NewWritableFile(filename, token, &base_file);
|
||||
},
|
||||
retry_config_));
|
||||
result->reset(new retrying_internals::RetryingWritableFile(
|
||||
@ -259,8 +272,9 @@ Status RetryingFileSystem<Underlying>::NewAppendableFile(
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
std::unique_ptr<WritableFile> base_file;
|
||||
TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
|
||||
[this, &filename, &base_file]() {
|
||||
return base_file_system_->NewAppendableFile(filename, &base_file);
|
||||
[this, &filename, &base_file, token]() {
|
||||
return base_file_system_->NewAppendableFile(filename, token,
|
||||
&base_file);
|
||||
},
|
||||
retry_config_));
|
||||
result->reset(new retrying_internals::RetryingWritableFile(
|
||||
@ -273,9 +287,9 @@ Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
|
||||
const string& filename, TransactionToken* token,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
|
||||
return RetryingUtils::CallWithRetries(
|
||||
[this, &filename, result]() {
|
||||
return base_file_system_->NewReadOnlyMemoryRegionFromFile(filename,
|
||||
result);
|
||||
[this, &filename, result, token]() {
|
||||
return base_file_system_->NewReadOnlyMemoryRegionFromFile(
|
||||
filename, token, result);
|
||||
},
|
||||
retry_config_);
|
||||
}
|
||||
|
||||
@ -175,8 +175,7 @@ class MockFileSystem : public FileSystem {
|
||||
return calls_.ConsumeNextCall("DeleteRecursively");
|
||||
}
|
||||
|
||||
void FlushCaches(
|
||||
TransactionToken* token) override {
|
||||
void FlushCaches(TransactionToken* token) override {
|
||||
if (flushed_) {
|
||||
*flushed_ = true;
|
||||
}
|
||||
@ -208,7 +207,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_ImmediateSuccess) {
|
||||
|
||||
// Retrieve the wrapped random access file.
|
||||
std::unique_ptr<RandomAccessFile> random_access_file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
|
||||
|
||||
// Use it and check the results.
|
||||
StringPiece result;
|
||||
@ -239,7 +239,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_SuccessWith3rdTry) {
|
||||
|
||||
// Retrieve the wrapped random access file.
|
||||
std::unique_ptr<RandomAccessFile> random_access_file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
|
||||
|
||||
// Use it and check the results.
|
||||
StringPiece result;
|
||||
@ -264,7 +265,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_AllRetriesFailed) {
|
||||
|
||||
// Retrieve the wrapped random access file.
|
||||
std::unique_ptr<RandomAccessFile> random_access_file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
|
||||
|
||||
// Use it and check the results.
|
||||
StringPiece result;
|
||||
@ -294,7 +296,8 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_NoRetriesForSomeErrors) {
|
||||
|
||||
// Retrieve the wrapped random access file.
|
||||
std::unique_ptr<RandomAccessFile> random_access_file;
|
||||
TF_EXPECT_OK(fs.NewRandomAccessFile("filename.txt", &random_access_file));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewRandomAccessFile("filename.txt", nullptr, &random_access_file));
|
||||
|
||||
// Use it and check the results.
|
||||
StringPiece result;
|
||||
@ -322,7 +325,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_ImmediateSuccess) {
|
||||
|
||||
// Retrieve the wrapped writable file.
|
||||
std::unique_ptr<WritableFile> writable_file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
|
||||
|
||||
StringPiece result;
|
||||
TF_EXPECT_OK(writable_file->Name(&result));
|
||||
@ -353,7 +356,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry) {
|
||||
|
||||
// Retrieve the wrapped writable file.
|
||||
std::unique_ptr<WritableFile> writable_file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
|
||||
|
||||
// Use it and check the results.
|
||||
TF_EXPECT_OK(writable_file->Sync());
|
||||
@ -380,7 +383,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry_ViaDestructor) {
|
||||
|
||||
// Retrieve the wrapped writable file.
|
||||
std::unique_ptr<WritableFile> writable_file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
|
||||
|
||||
writable_file.reset(); // Trigger Close() via destructor.
|
||||
}
|
||||
@ -406,7 +409,7 @@ TEST(RetryingFileSystemTest, NewAppendableFile_SuccessWith3rdTry) {
|
||||
|
||||
// Retrieve the wrapped appendable file.
|
||||
std::unique_ptr<WritableFile> writable_file;
|
||||
TF_EXPECT_OK(fs.NewAppendableFile("filename.txt", &writable_file));
|
||||
TF_EXPECT_OK(fs.NewAppendableFile("filename.txt", nullptr, &writable_file));
|
||||
|
||||
// Use it and check the results.
|
||||
TF_EXPECT_OK(writable_file->Sync());
|
||||
@ -430,7 +433,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_AllRetriesFailed) {
|
||||
|
||||
// Retrieve the wrapped writable file.
|
||||
std::unique_ptr<WritableFile> writable_file;
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", &writable_file));
|
||||
TF_EXPECT_OK(fs.NewWritableFile("filename.txt", nullptr, &writable_file));
|
||||
|
||||
// Use it and check the results.
|
||||
const auto& status = writable_file->Sync();
|
||||
@ -450,7 +453,8 @@ TEST(RetryingFileSystemTest,
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
std::unique_ptr<ReadOnlyMemoryRegion> result;
|
||||
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result));
|
||||
TF_EXPECT_OK(
|
||||
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", nullptr, &result));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) {
|
||||
@ -463,7 +467,7 @@ TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) {
|
||||
|
||||
std::unique_ptr<ReadOnlyMemoryRegion> result;
|
||||
const auto& status =
|
||||
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result);
|
||||
fs.NewReadOnlyMemoryRegionFromFile("filename.txt", nullptr, &result);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -479,7 +483,7 @@ TEST(RetryingFileSystemTest, GetChildren_SuccessWith2ndTry) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://path", &result));
|
||||
TF_EXPECT_OK(fs.GetChildren("gs://path", nullptr, &result));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) {
|
||||
@ -490,7 +494,7 @@ TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
std::vector<string> result;
|
||||
const auto& status = fs.GetChildren("gs://path", &result);
|
||||
const auto& status = fs.GetChildren("gs://path", nullptr, &result);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -506,7 +510,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_SuccessWith2ndTry) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
std::vector<string> result;
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", &result));
|
||||
TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", nullptr, &result));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) {
|
||||
@ -518,7 +522,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
std::vector<string> result;
|
||||
const auto& status = fs.GetMatchingPaths("gs://path/dir", &result);
|
||||
const auto& status = fs.GetMatchingPaths("gs://path/dir", nullptr, &result);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -532,7 +536,7 @@ TEST(RetryingFileSystemTest, DeleteFile_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt"));
|
||||
TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) {
|
||||
@ -542,7 +546,7 @@ TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.DeleteFile("gs://path/file.txt");
|
||||
const auto& status = fs.DeleteFile("gs://path/file.txt", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -556,7 +560,7 @@ TEST(RetryingFileSystemTest, CreateDir_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://path/newdir"));
|
||||
TF_EXPECT_OK(fs.CreateDir("gs://path/newdir", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) {
|
||||
@ -566,7 +570,7 @@ TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.CreateDir("gs://path/newdir");
|
||||
const auto& status = fs.CreateDir("gs://path/newdir", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -580,7 +584,7 @@ TEST(RetryingFileSystemTest, DeleteDir_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://path/dir"));
|
||||
TF_EXPECT_OK(fs.DeleteDir("gs://path/dir", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) {
|
||||
@ -590,7 +594,7 @@ TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.DeleteDir("gs://path/dir");
|
||||
const auto& status = fs.DeleteDir("gs://path/dir", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -606,7 +610,7 @@ TEST(RetryingFileSystemTest, GetFileSize_SuccessWith2ndTry) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
uint64 size;
|
||||
TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", &size));
|
||||
TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", nullptr, &size));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) {
|
||||
@ -617,7 +621,7 @@ TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
uint64 size;
|
||||
const auto& status = fs.GetFileSize("gs://path/file.txt", &size);
|
||||
const auto& status = fs.GetFileSize("gs://path/file.txt", nullptr, &size);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -631,7 +635,7 @@ TEST(RetryingFileSystemTest, RenameFile_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.RenameFile("old_name", "new_name"));
|
||||
TF_EXPECT_OK(fs.RenameFile("old_name", "new_name", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) {
|
||||
@ -641,7 +645,7 @@ TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.RenameFile("old_name", "new_name");
|
||||
const auto& status = fs.RenameFile("old_name", "new_name", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -656,7 +660,7 @@ TEST(RetryingFileSystemTest, Stat_SuccessWith2ndTry) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
FileStatistics stat;
|
||||
TF_EXPECT_OK(fs.Stat("file_name", &stat));
|
||||
TF_EXPECT_OK(fs.Stat("file_name", nullptr, &stat));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) {
|
||||
@ -667,7 +671,7 @@ TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
FileStatistics stat;
|
||||
const auto& status = fs.Stat("file_name", &stat);
|
||||
const auto& status = fs.Stat("file_name", nullptr, &stat);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -679,7 +683,7 @@ TEST(RetryingFileSystemTest, FileExists_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.FileExists("file_name");
|
||||
const auto& status = fs.FileExists("file_name", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -693,7 +697,7 @@ TEST(RetryingFileSystemTest, FileExists_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.FileExists("gs://path/dir"));
|
||||
TF_EXPECT_OK(fs.FileExists("gs://path/dir", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) {
|
||||
@ -706,7 +710,7 @@ TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://path/dir"));
|
||||
TF_EXPECT_OK(fs.IsDirectory("gs://path/dir", nullptr));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) {
|
||||
@ -716,7 +720,7 @@ TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) {
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
|
||||
const auto& status = fs.IsDirectory("gs://path/dir");
|
||||
const auto& status = fs.IsDirectory("gs://path/dir", nullptr);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -732,8 +736,8 @@ TEST(RetryingFileSystemTest, DeleteRecursively_SuccessWith2ndTry) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
int64 undeleted_files, undeleted_dirs;
|
||||
|
||||
TF_EXPECT_OK(
|
||||
fs.DeleteRecursively("gs://path/dir", &undeleted_files, &undeleted_dirs));
|
||||
TF_EXPECT_OK(fs.DeleteRecursively("gs://path/dir", nullptr, &undeleted_files,
|
||||
&undeleted_dirs));
|
||||
}
|
||||
|
||||
TEST(RetryingFileSystemTest, DeleteRecursively_AllRetriesFailed) {
|
||||
@ -745,8 +749,8 @@ TEST(RetryingFileSystemTest, DeleteRecursively_AllRetriesFailed) {
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
int64 undeleted_files, undeleted_dirs;
|
||||
|
||||
const auto& status =
|
||||
fs.DeleteRecursively("gs://path/dir", &undeleted_files, &undeleted_dirs);
|
||||
const auto& status = fs.DeleteRecursively("gs://path/dir", nullptr,
|
||||
&undeleted_files, &undeleted_dirs);
|
||||
EXPECT_TRUE(absl::StrContains(status.error_message(), "Retriable error #10"))
|
||||
<< status;
|
||||
}
|
||||
@ -757,7 +761,7 @@ TEST(RetryingFileSystemTest, FlushCaches) {
|
||||
std::unique_ptr<MockFileSystem> base_fs(new MockFileSystem(none, &flushed));
|
||||
RetryingFileSystem<MockFileSystem> fs(
|
||||
std::move(base_fs), RetryConfig(0 /* init_delay_time_us */));
|
||||
fs.FlushCaches();
|
||||
fs.FlushCaches(nullptr);
|
||||
EXPECT_TRUE(flushed);
|
||||
}
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ static const char* kS3TempFileTemplate = "/tmp/s3_filesystem_XXXXXX";
|
||||
#endif
|
||||
static const char* kS3FileSystemAllocationTag = "S3FileSystemAllocation";
|
||||
static const size_t kS3ReadAppendableFileBufferSize = 1024 * 1024;
|
||||
static const int64 kS3TimeoutMsec = 300000; // 5 min
|
||||
static const int64 kS3TimeoutMsec = 300000; // 5 min
|
||||
static const uint64 kS3MultiPartUploadChunkSize = 50 * 1024 * 1024; // 50 MB
|
||||
static const uint64 kS3MultiPartDownloadChunkSize = 2 * 1024 * 1024; // 50 MB
|
||||
static const int kS3GetChildrenMaxKeys = 100;
|
||||
@ -568,14 +568,14 @@ S3FileSystem::GetExecutor() {
|
||||
}
|
||||
|
||||
Status S3FileSystem::NewRandomAccessFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<RandomAccessFile>* result /*, TransactionToken* token */) {
|
||||
return NewRandomAccessFile(fname, result, true);
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<RandomAccessFile>* result) {
|
||||
return NewRandomAccessFile(fname, token, result, true);
|
||||
}
|
||||
|
||||
Status S3FileSystem::NewRandomAccessFile(
|
||||
const string& fname, std::unique_ptr<RandomAccessFile>* result,
|
||||
bool use_multi_part_download /*, TransactionToken* token */) {
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<RandomAccessFile>* result, bool use_multi_part_download) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
|
||||
|
||||
@ -588,9 +588,9 @@ Status S3FileSystem::NewRandomAccessFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::NewWritableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::NewWritableFile(const string& fname,
|
||||
TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
|
||||
result->reset(new S3WritableFile(
|
||||
@ -601,11 +601,11 @@ Status S3FileSystem::NewWritableFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::NewAppendableFile(
|
||||
const string& fname,
|
||||
std::unique_ptr<WritableFile>* result /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::NewAppendableFile(const string& fname,
|
||||
TransactionToken* token,
|
||||
std::unique_ptr<WritableFile>* result) {
|
||||
std::unique_ptr<RandomAccessFile> reader;
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
|
||||
std::unique_ptr<char[]> buffer(new char[kS3ReadAppendableFileBufferSize]);
|
||||
Status status;
|
||||
uint64 offset = 0;
|
||||
@ -637,14 +637,14 @@ Status S3FileSystem::NewAppendableFile(
|
||||
}
|
||||
|
||||
Status S3FileSystem::NewReadOnlyMemoryRegionFromFile(
|
||||
const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>*
|
||||
result /*, TransactionToken* token */) {
|
||||
const string& fname, TransactionToken* token,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>* result) {
|
||||
uint64 size;
|
||||
TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
|
||||
TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
|
||||
std::unique_ptr<char[]> data(new char[size]);
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
|
||||
TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
|
||||
|
||||
StringPiece piece;
|
||||
TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
|
||||
@ -653,16 +653,14 @@ Status S3FileSystem::NewReadOnlyMemoryRegionFromFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::FileExists(
|
||||
const string& fname /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::FileExists(const string& fname, TransactionToken* token) {
|
||||
FileStatistics stats;
|
||||
TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
|
||||
TF_RETURN_IF_ERROR(this->Stat(fname, token, &stats));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::GetChildren(
|
||||
const string& dir,
|
||||
std::vector<string>* result /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::GetChildren(const string& dir, TransactionToken* token,
|
||||
std::vector<string>* result) {
|
||||
VLOG(1) << "GetChildren for path: " << dir;
|
||||
string bucket, prefix;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(dir, true, &bucket, &prefix));
|
||||
@ -709,8 +707,8 @@ Status S3FileSystem::GetChildren(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::Stat(
|
||||
const string& fname, FileStatistics* stats /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::Stat(const string& fname, TransactionToken* token,
|
||||
FileStatistics* stats) {
|
||||
VLOG(1) << "Stat on path: " << fname;
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(fname, true, &bucket, &object));
|
||||
@ -772,14 +770,13 @@ Status S3FileSystem::Stat(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::GetMatchingPaths(
|
||||
const string& pattern,
|
||||
std::vector<string>* results /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::GetMatchingPaths(const string& pattern,
|
||||
TransactionToken* token,
|
||||
std::vector<string>* results) {
|
||||
return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
|
||||
}
|
||||
|
||||
Status S3FileSystem::DeleteFile(
|
||||
const string& fname /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::DeleteFile(const string& fname, TransactionToken* token) {
|
||||
VLOG(1) << "DeleteFile: " << fname;
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
|
||||
@ -795,8 +792,7 @@ Status S3FileSystem::DeleteFile(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::CreateDir(
|
||||
const string& dirname /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::CreateDir(const string& dirname, TransactionToken* token) {
|
||||
VLOG(1) << "CreateDir: " << dirname;
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(dirname, true, &bucket, &object));
|
||||
@ -815,16 +811,15 @@ Status S3FileSystem::CreateDir(
|
||||
if (filename.back() != '/') {
|
||||
filename.push_back('/');
|
||||
}
|
||||
if (!this->FileExists(filename).ok()) {
|
||||
if (!this->FileExists(filename,token).ok()) {
|
||||
std::unique_ptr<WritableFile> file;
|
||||
TF_RETURN_IF_ERROR(NewWritableFile(filename, &file));
|
||||
TF_RETURN_IF_ERROR(NewWritableFile(filename,token, &file));
|
||||
TF_RETURN_IF_ERROR(file->Close());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::DeleteDir(
|
||||
const string& dirname /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::DeleteDir(const string& dirname, TransactionToken* token) {
|
||||
VLOG(1) << "DeleteDir: " << dirname;
|
||||
string bucket, object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(dirname, false, &bucket, &object));
|
||||
@ -855,7 +850,7 @@ Status S3FileSystem::DeleteDir(
|
||||
if (filename.back() != '/') {
|
||||
filename.push_back('/');
|
||||
}
|
||||
return DeleteFile(filename);
|
||||
return DeleteFile(filename,token);
|
||||
}
|
||||
} else {
|
||||
TF_RETURN_IF_ERROR(CheckForbiddenError(listObjectsOutcome.GetError()));
|
||||
@ -863,10 +858,10 @@ Status S3FileSystem::DeleteDir(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::GetFileSize(
|
||||
const string& fname, uint64* file_size /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::GetFileSize(const string& fname, TransactionToken* token,
|
||||
uint64* file_size) {
|
||||
FileStatistics stats;
|
||||
TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
|
||||
TF_RETURN_IF_ERROR(this->Stat(fname, token, &stats));
|
||||
*file_size = stats.length;
|
||||
return Status::OK();
|
||||
}
|
||||
@ -917,7 +912,7 @@ Status S3FileSystem::CopyFile(const Aws::String& source_bucket,
|
||||
Aws::String source_full_path = Aws::String("s3://") + source;
|
||||
uint64 file_length;
|
||||
TF_RETURN_IF_ERROR(
|
||||
this->GetFileSize(string(source_full_path.c_str()), &file_length));
|
||||
this->GetFileSize(string(source_full_path.c_str()), nullptr, &file_length));
|
||||
int num_parts;
|
||||
if (file_length <=
|
||||
multi_part_chunk_size_[Aws::Transfer::TransferDirection::UPLOAD]) {
|
||||
@ -1135,8 +1130,8 @@ Status S3FileSystem::CompleteMultiPartCopy(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::RenameFile(
|
||||
const string& src, const string& target /*, TransactionToken* token */) {
|
||||
Status S3FileSystem::RenameFile(const string& src, const string& target,
|
||||
TransactionToken* token) {
|
||||
VLOG(1) << "RenameFile from: " << src << " to: " << target;
|
||||
string src_bucket, src_object, target_bucket, target_object;
|
||||
TF_RETURN_IF_ERROR(ParseS3Path(src, false, &src_bucket, &src_object));
|
||||
|
||||
@ -50,66 +50,66 @@ class S3FileSystem : public FileSystem {
|
||||
~S3FileSystem();
|
||||
|
||||
Status NewRandomAccessFile(
|
||||
const string& fname,
|
||||
const string& fname, TransactionToken * token,
|
||||
std::unique_ptr<RandomAccessFile>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
result ) override;
|
||||
|
||||
Status NewRandomAccessFile(
|
||||
const string& fname, std::unique_ptr<RandomAccessFile>* result,
|
||||
bool use_multi_part_download /*, TransactionToken* token = nullptr */);
|
||||
const string& fname, TransactionToken * token,std::unique_ptr<RandomAccessFile>* result,
|
||||
bool use_multi_part_download );
|
||||
|
||||
Status NewWritableFile(
|
||||
const string& fname,
|
||||
const string& fname,TransactionToken * token,
|
||||
std::unique_ptr<WritableFile>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
result ) override;
|
||||
|
||||
Status NewAppendableFile(
|
||||
const string& fname,
|
||||
const string& fname,TransactionToken * token,
|
||||
std::unique_ptr<WritableFile>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
result ) override;
|
||||
|
||||
Status NewReadOnlyMemoryRegionFromFile(
|
||||
const string& fname,
|
||||
const string& fname,TransactionToken * token,
|
||||
std::unique_ptr<ReadOnlyMemoryRegion>*
|
||||
result /*, TransactionToken* token = nullptr */) override;
|
||||
result ) override;
|
||||
|
||||
Status FileExists(
|
||||
const string& fname /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname,TransactionToken * token ) override;
|
||||
|
||||
Status GetChildren(
|
||||
const string& dir,
|
||||
std::vector<string>* result /*, TransactionToken* token = nullptr */)
|
||||
const string& dir,TransactionToken * token,
|
||||
std::vector<string>* result )
|
||||
override;
|
||||
|
||||
Status Stat(
|
||||
const string& fname,
|
||||
FileStatistics* stat /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname,TransactionToken * token,
|
||||
FileStatistics* stat ) override;
|
||||
|
||||
Status GetMatchingPaths(
|
||||
const string& pattern,
|
||||
std::vector<string>* results /*, TransactionToken* token = nullptr */)
|
||||
const string& pattern,TransactionToken * token,
|
||||
std::vector<string>* results )
|
||||
override;
|
||||
|
||||
Status DeleteFile(
|
||||
const string& fname /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname,TransactionToken * token ) override;
|
||||
|
||||
Status CreateDir(
|
||||
const string& name /*, TransactionToken* token = nullptr */) override;
|
||||
const string& name, TransactionToken * token) override;
|
||||
|
||||
Status DeleteDir(
|
||||
const string& name /*, TransactionToken* token = nullptr */) override;
|
||||
const string& name,TransactionToken * token ) override;
|
||||
|
||||
Status GetFileSize(
|
||||
const string& fname,
|
||||
uint64* size /*, TransactionToken* token = nullptr */) override;
|
||||
const string& fname,TransactionToken * token,
|
||||
uint64* size ) override;
|
||||
|
||||
Status RenameFile(
|
||||
const string& src,
|
||||
const string& target /*, TransactionToken* token = nullptr */) override;
|
||||
const string& target,TransactionToken * token ) override;
|
||||
|
||||
Status HasAtomicMove(
|
||||
const string& path,
|
||||
bool* has_atomic_move /*, TransactionToken* token = nullptr */) override;
|
||||
bool* has_atomic_move ) override;
|
||||
|
||||
private:
|
||||
// Returns the member S3 client, initializing as-needed.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user