diff --git a/tensorflow/core/platform/cloud/retrying_file_system.cc b/tensorflow/core/platform/cloud/retrying_file_system.cc index 6dd6383faf5..2b6e0a30836 100644 --- a/tensorflow/core/platform/cloud/retrying_file_system.cc +++ b/tensorflow/core/platform/cloud/retrying_file_system.cc @@ -16,14 +16,18 @@ limitations under the License. #include "tensorflow/core/platform/cloud/retrying_file_system.h" #include #include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/random/random.h" +#include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/file_system.h" namespace tensorflow { namespace { -// In case of failure, every call will be retried kMaxAttempts-1 times. -constexpr int kMaxAttempts = 4; +// In case of failure, every call will be retried kMaxRetries times. +constexpr int kMaxRetries = 3; +// Maximum backoff time in microseconds. +constexpr int64 kMaximumBackoffMicroseconds = 32000000; bool IsRetriable(Status status) { switch (status.code()) { @@ -37,55 +41,76 @@ bool IsRetriable(Status status) { } } -Status CallWithRetries(const std::function& f) { - int attempts = 0; +void WaitBeforeRetry(const int64 delay_micros) { + const int64 random_micros = random::New64() % 1000000; + + Env::Default()->SleepForMicroseconds(std::min(delay_micros + random_micros, + kMaximumBackoffMicroseconds)); +} + +Status CallWithRetries(const std::function& f, + const int64 initial_delay_microseconds) { + int retries = 0; while (true) { - attempts++; auto status = f(); - if (!IsRetriable(status) || attempts >= kMaxAttempts) { + if (!IsRetriable(status) || retries >= kMaxRetries) { return status; } - LOG(ERROR) << "The operation resulted in an error and will be retried: " - << status.ToString(); + const int64 delay_micros = initial_delay_microseconds << retries; + LOG(ERROR) << "The operation resulted in an error: " << status.ToString() + << " will be retried after " << delay_micros << " microseconds"; + WaitBeforeRetry(delay_micros); + retries++; } } class RetryingRandomAccessFile : public RandomAccessFile { public: - RetryingRandomAccessFile(std::unique_ptr base_file) - : base_file_(std::move(base_file)) {} + RetryingRandomAccessFile(std::unique_ptr base_file, + int64 delay_microseconds = 1000000) + : base_file_(std::move(base_file)), + initial_delay_microseconds_(delay_microseconds) {} Status Read(uint64 offset, size_t n, StringPiece* result, char* scratch) const override { return CallWithRetries(std::bind(&RandomAccessFile::Read, base_file_.get(), - offset, n, result, scratch)); + offset, n, result, scratch), + initial_delay_microseconds_); } private: std::unique_ptr base_file_; + const int64 initial_delay_microseconds_; }; class RetryingWritableFile : public WritableFile { public: - RetryingWritableFile(std::unique_ptr base_file) - : base_file_(std::move(base_file)) {} + RetryingWritableFile(std::unique_ptr base_file, + int64 delay_microseconds = 1000000) + : base_file_(std::move(base_file)), + initial_delay_microseconds_(delay_microseconds) {} Status Append(const StringPiece& data) override { return CallWithRetries( - std::bind(&WritableFile::Append, base_file_.get(), data)); + std::bind(&WritableFile::Append, base_file_.get(), data), + initial_delay_microseconds_); } Status Close() override { - return CallWithRetries(std::bind(&WritableFile::Close, base_file_.get())); + return CallWithRetries(std::bind(&WritableFile::Close, base_file_.get()), + initial_delay_microseconds_); } Status Flush() override { - return CallWithRetries(std::bind(&WritableFile::Flush, base_file_.get())); + return CallWithRetries(std::bind(&WritableFile::Flush, base_file_.get()), + initial_delay_microseconds_); } Status Sync() override { - return CallWithRetries(std::bind(&WritableFile::Sync, base_file_.get())); + return CallWithRetries(std::bind(&WritableFile::Sync, base_file_.get()), + initial_delay_microseconds_); } private: std::unique_ptr base_file_; + const int64 initial_delay_microseconds_; }; } // namespace @@ -95,7 +120,8 @@ Status RetryingFileSystem::NewRandomAccessFile( std::unique_ptr base_file; TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewRandomAccessFile, base_file_system_.get(), - filename, &base_file))); + filename, &base_file), + initial_delay_microseconds_)); result->reset(new RetryingRandomAccessFile(std::move(base_file))); return Status::OK(); } @@ -105,7 +131,8 @@ Status RetryingFileSystem::NewWritableFile( std::unique_ptr base_file; TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewWritableFile, base_file_system_.get(), - filename, &base_file))); + filename, &base_file), + initial_delay_microseconds_)); result->reset(new RetryingWritableFile(std::move(base_file))); return Status::OK(); } @@ -115,7 +142,8 @@ Status RetryingFileSystem::NewAppendableFile( std::unique_ptr base_file; TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewAppendableFile, base_file_system_.get(), - filename, &base_file))); + filename, &base_file), + initial_delay_microseconds_)); result->reset(new RetryingWritableFile(std::move(base_file))); return Status::OK(); } @@ -123,7 +151,8 @@ Status RetryingFileSystem::NewAppendableFile( Status RetryingFileSystem::NewReadOnlyMemoryRegionFromFile( const string& filename, std::unique_ptr* result) { return CallWithRetries(std::bind(&FileSystem::NewReadOnlyMemoryRegionFromFile, - base_file_system_.get(), filename, result)); + base_file_system_.get(), filename, result), + initial_delay_microseconds_); } bool RetryingFileSystem::FileExists(const string& fname) { @@ -133,49 +162,58 @@ bool RetryingFileSystem::FileExists(const string& fname) { Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) { return CallWithRetries( - std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat)); + std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat), + initial_delay_microseconds_); } Status RetryingFileSystem::GetChildren(const string& dir, std::vector* result) { return CallWithRetries(std::bind(&FileSystem::GetChildren, - base_file_system_.get(), dir, result)); + base_file_system_.get(), dir, result), + initial_delay_microseconds_); } Status RetryingFileSystem::GetMatchingPaths(const string& pattern, std::vector* result) { return CallWithRetries(std::bind(&FileSystem::GetMatchingPaths, - base_file_system_.get(), pattern, result)); + base_file_system_.get(), pattern, result), + initial_delay_microseconds_); } Status RetryingFileSystem::DeleteFile(const string& fname) { return CallWithRetries( - std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname)); + std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname), + initial_delay_microseconds_); } Status RetryingFileSystem::CreateDir(const string& dirname) { return CallWithRetries( - std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname)); + std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname), + initial_delay_microseconds_); } Status RetryingFileSystem::DeleteDir(const string& dirname) { return CallWithRetries( - std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname)); + std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname), + initial_delay_microseconds_); } Status RetryingFileSystem::GetFileSize(const string& fname, uint64* file_size) { return CallWithRetries(std::bind(&FileSystem::GetFileSize, - base_file_system_.get(), fname, file_size)); + base_file_system_.get(), fname, file_size), + initial_delay_microseconds_); } Status RetryingFileSystem::RenameFile(const string& src, const string& target) { return CallWithRetries( - std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target)); + std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target), + initial_delay_microseconds_); } Status RetryingFileSystem::IsDirectory(const string& dirname) { return CallWithRetries( - std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname)); + std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname), + initial_delay_microseconds_); } } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/retrying_file_system.h b/tensorflow/core/platform/cloud/retrying_file_system.h index d021ed2821b..790ea61bd6e 100644 --- a/tensorflow/core/platform/cloud/retrying_file_system.h +++ b/tensorflow/core/platform/cloud/retrying_file_system.h @@ -26,8 +26,10 @@ namespace tensorflow { /// A wrapper to add retry logic to another file system. class RetryingFileSystem : public FileSystem { public: - RetryingFileSystem(std::unique_ptr base_file_system) - : base_file_system_(std::move(base_file_system)) {} + RetryingFileSystem(std::unique_ptr base_file_system, + int64 delay_microseconds = 1000000) + : base_file_system_(std::move(base_file_system)), + initial_delay_microseconds_(delay_microseconds) {} Status NewRandomAccessFile( const string& filename, @@ -66,6 +68,7 @@ class RetryingFileSystem : public FileSystem { private: std::unique_ptr base_file_system_; + const int64 initial_delay_microseconds_; TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem); }; diff --git a/tensorflow/core/platform/cloud/retrying_file_system_test.cc b/tensorflow/core/platform/cloud/retrying_file_system_test.cc index cc50fd72a07..9ec1105aa81 100644 --- a/tensorflow/core/platform/cloud/retrying_file_system_test.cc +++ b/tensorflow/core/platform/cloud/retrying_file_system_test.cc @@ -158,7 +158,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_ImmediateSuccess) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->random_access_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped random access file. std::unique_ptr random_access_file; @@ -185,7 +185,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_SuccessWith3rdTry) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->random_access_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped random access file. std::unique_ptr random_access_file; @@ -213,7 +213,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_AllRetriesFailed) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->random_access_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped random access file. std::unique_ptr random_access_file; @@ -241,7 +241,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_NoRetriesForSomeErrors) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->random_access_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped random access file. std::unique_ptr random_access_file; @@ -266,7 +266,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_ImmediateSuccess) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->writable_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped writable file. std::unique_ptr writable_file; @@ -291,7 +291,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->writable_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped writable file. std::unique_ptr writable_file; @@ -316,7 +316,7 @@ TEST(RetryingFileSystemTest, NewAppendableFile_SuccessWith3rdTry) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->writable_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped appendable file. std::unique_ptr writable_file; @@ -342,7 +342,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_AllRetriesFailed) { std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); base_fs->writable_file_to_return = std::move(base_file); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); // Retrieve the wrapped writable file. std::unique_ptr writable_file; @@ -360,7 +360,7 @@ TEST(RetryingFileSystemTest, std::make_tuple("NewReadOnlyMemoryRegionFromFile", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::unique_ptr result; TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result)); @@ -378,7 +378,7 @@ TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) { errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::unique_ptr result; EXPECT_EQ("Last error", @@ -393,7 +393,7 @@ TEST(RetryingFileSystemTest, GetChildren_SuccessWith2ndTry) { std::make_tuple("GetChildren", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; TF_EXPECT_OK(fs.GetChildren("gs://path", &result)); @@ -409,7 +409,7 @@ TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) { std::make_tuple("GetChildren", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; EXPECT_EQ("Last error", fs.GetChildren("gs://path", &result).error_message()); @@ -422,7 +422,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_SuccessWith2ndTry) { std::make_tuple("GetMatchingPaths", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", &result)); @@ -438,7 +438,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) { std::make_tuple("GetMatchingPaths", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; EXPECT_EQ("Last error", @@ -451,7 +451,7 @@ TEST(RetryingFileSystemTest, DeleteFile_SuccessWith2ndTry) { std::make_tuple("DeleteFile", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt")); @@ -466,7 +466,7 @@ TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) { std::make_tuple("DeleteFile", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; EXPECT_EQ("Last error", fs.DeleteFile("gs://path/file.txt").error_message()); @@ -478,7 +478,7 @@ TEST(RetryingFileSystemTest, CreateDir_SuccessWith2ndTry) { std::make_tuple("CreateDir", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; TF_EXPECT_OK(fs.CreateDir("gs://path/newdir")); @@ -493,7 +493,7 @@ TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) { std::make_tuple("CreateDir", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; EXPECT_EQ("Last error", fs.CreateDir("gs://path/newdir").error_message()); @@ -505,7 +505,7 @@ TEST(RetryingFileSystemTest, DeleteDir_SuccessWith2ndTry) { std::make_tuple("DeleteDir", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; TF_EXPECT_OK(fs.DeleteDir("gs://path/dir")); @@ -520,7 +520,7 @@ TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) { std::make_tuple("DeleteDir", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); std::vector result; EXPECT_EQ("Last error", fs.DeleteDir("gs://path/dir").error_message()); @@ -533,7 +533,7 @@ TEST(RetryingFileSystemTest, GetFileSize_SuccessWith2ndTry) { std::make_tuple("GetFileSize", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); uint64 size; TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", &size)); @@ -549,7 +549,7 @@ TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) { std::make_tuple("GetFileSize", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); uint64 size; EXPECT_EQ("Last error", @@ -562,7 +562,7 @@ TEST(RetryingFileSystemTest, RenameFile_SuccessWith2ndTry) { std::make_tuple("RenameFile", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); TF_EXPECT_OK(fs.RenameFile("old_name", "new_name")); } @@ -577,7 +577,7 @@ TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) { }); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); EXPECT_EQ("Last error", fs.RenameFile("old_name", "new_name").error_message()); @@ -589,7 +589,7 @@ TEST(RetryingFileSystemTest, Stat_SuccessWith2ndTry) { std::make_tuple("Stat", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); FileStatistics stat; TF_EXPECT_OK(fs.Stat("file_name", &stat)); @@ -604,7 +604,7 @@ TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) { }); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); FileStatistics stat; EXPECT_EQ("Last error", fs.Stat("file_name", &stat).error_message()); @@ -617,7 +617,7 @@ TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) { std::make_tuple("IsDirectory", Status::OK())}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); TF_EXPECT_OK(fs.IsDirectory("gs://path/dir")); } @@ -632,7 +632,7 @@ TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) { std::make_tuple("IsDirectory", errors::Unavailable("Last error"))}); std::unique_ptr base_fs( new MockFileSystem(expected_fs_calls)); - RetryingFileSystem fs(std::move(base_fs)); + RetryingFileSystem fs(std::move(base_fs), 0); EXPECT_EQ("Last error", fs.IsDirectory("gs://path/dir").error_message()); }