Updating CallWithRetries to do exponential backoff

This commit is contained in:
Balachander Ramachandran 2016-09-30 09:16:57 -07:00 committed by Jonathan Hseu
parent 7f282a03ea
commit d246219976
3 changed files with 101 additions and 60 deletions

View File

@ -16,14 +16,18 @@ limitations under the License.
#include "tensorflow/core/platform/cloud/retrying_file_system.h"
#include <algorithm>
#include <functional>
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
namespace tensorflow {
namespace {
// In case of failure, every call will be retried kMaxAttempts-1 times
// (4 attempts in total).
constexpr int kMaxAttempts = 4;
// In case of failure, every call will be retried kMaxRetries times, so a call
// is attempted at most kMaxRetries + 1 times.
constexpr int kMaxRetries = 3;
// Upper bound on a single backoff sleep, in microseconds
// (32000000 us = 32 seconds).
constexpr int64 kMaximumBackoffMicroseconds = 32000000;
bool IsRetriable(Status status) {
switch (status.code()) {
@ -37,55 +41,76 @@ bool IsRetriable(Status status) {
}
}
Status CallWithRetries(const std::function<Status()>& f) {
int attempts = 0;
// Sleeps before the next retry attempt.
//
// The sleep lasts `delay_micros` plus a random jitter of less than one second
// (1000000 microseconds), and never longer than kMaximumBackoffMicroseconds.
//
// A non-positive `delay_micros` means "retry immediately": no jitter is added
// and no sleep is performed. Without this guard a caller that asks for a zero
// delay (as the unit tests do) would still block for a random interval of up
// to one second on every retry.
void WaitBeforeRetry(const int64 delay_micros) {
  if (delay_micros <= 0) {
    return;
  }
  // Jitter spreads out retries from clients that failed at the same moment.
  const int64 random_micros = random::New64() % 1000000;
  Env::Default()->SleepForMicroseconds(std::min(delay_micros + random_micros,
                                                kMaximumBackoffMicroseconds));
}
// Calls `f` and, while it returns a retriable status (per IsRetriable), calls
// it again after a backoff wait, for at most kMaxRetries retries.
//
// The delay handed to WaitBeforeRetry doubles with each retry already made:
// initial_delay_microseconds << retries, i.e. 1x, 2x and 4x the initial delay.
// Returns the first non-retriable status, or the last status once the retry
// budget is used up.
//
// NOTE(review): this listing appears to carry pre-change lines of the commit
// next to their replacements (the `attempts` counter and its check, and the
// older two-line LOG statement) - confirm which lines are live.
Status CallWithRetries(const std::function<Status()>& f,
const int64 initial_delay_microseconds) {
int retries = 0;
while (true) {
attempts++;
auto status = f();
if (!IsRetriable(status) || attempts >= kMaxAttempts) {
if (!IsRetriable(status) || retries >= kMaxRetries) {
return status;
}
LOG(ERROR) << "The operation resulted in an error and will be retried: "
<< status.ToString();
// Exponential backoff: the base delay doubles for every retry already made.
const int64 delay_micros = initial_delay_microseconds << retries;
// The logged value is the base delay only; WaitBeforeRetry adds random jitter
// and applies the kMaximumBackoffMicroseconds cap on top of it.
LOG(ERROR) << "The operation resulted in an error: " << status.ToString()
<< " will be retried after " << delay_micros << " microseconds";
WaitBeforeRetry(delay_micros);
retries++;
}
}
// A RandomAccessFile wrapper that forwards Read to the wrapped file through
// CallWithRetries, using initial_delay_microseconds_ as the initial backoff
// delay.
//
// NOTE(review): RetryingFileSystem::NewRandomAccessFile in this file
// constructs this class with only the base file, so the 1000000 default delay
// applies there regardless of the delay the file system itself was given -
// confirm that is intended.
// NOTE(review): two constructor forms and two argument tails for the
// CallWithRetries call are shown; presumably the pre- and post-change lines
// of the commit - confirm which are live.
class RetryingRandomAccessFile : public RandomAccessFile {
public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file)
: base_file_(std::move(base_file)) {}
// `delay_microseconds` is stored as the initial delay passed to
// CallWithRetries; it defaults to 1000000.
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
int64 delay_microseconds = 1000000)
: base_file_(std::move(base_file)),
initial_delay_microseconds_(delay_microseconds) {}
// Reads via the wrapped file, retrying through CallWithRetries.
Status Read(uint64 offset, size_t n, StringPiece* result,
char* scratch) const override {
return CallWithRetries(std::bind(&RandomAccessFile::Read, base_file_.get(),
offset, n, result, scratch));
offset, n, result, scratch),
initial_delay_microseconds_);
}
private:
// The wrapped file that performs the actual reads.
std::unique_ptr<RandomAccessFile> base_file_;
// Initial delay handed to CallWithRetries on every call.
const int64 initial_delay_microseconds_;
};
// A WritableFile wrapper that forwards Append, Close, Flush and Sync to the
// wrapped file through CallWithRetries, using initial_delay_microseconds_ as
// the initial backoff delay.
//
// NOTE(review): RetryingFileSystem::NewWritableFile and NewAppendableFile in
// this file construct this class with only the base file, so the 1000000
// default delay applies there regardless of the delay the file system itself
// was given - confirm that is intended.
// NOTE(review): each member below shows two variants (with and without the
// delay argument); presumably the pre- and post-change lines of the commit -
// confirm which are live.
class RetryingWritableFile : public WritableFile {
public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file)
: base_file_(std::move(base_file)) {}
// `delay_microseconds` is stored as the initial delay passed to
// CallWithRetries; it defaults to 1000000.
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
int64 delay_microseconds = 1000000)
: base_file_(std::move(base_file)),
initial_delay_microseconds_(delay_microseconds) {}
// Appends via the wrapped file, retrying through CallWithRetries.
Status Append(const StringPiece& data) override {
return CallWithRetries(
std::bind(&WritableFile::Append, base_file_.get(), data));
std::bind(&WritableFile::Append, base_file_.get(), data),
initial_delay_microseconds_);
}
// Closes the wrapped file, retrying through CallWithRetries.
Status Close() override {
return CallWithRetries(std::bind(&WritableFile::Close, base_file_.get()));
return CallWithRetries(std::bind(&WritableFile::Close, base_file_.get()),
initial_delay_microseconds_);
}
// Flushes the wrapped file, retrying through CallWithRetries.
Status Flush() override {
return CallWithRetries(std::bind(&WritableFile::Flush, base_file_.get()));
return CallWithRetries(std::bind(&WritableFile::Flush, base_file_.get()),
initial_delay_microseconds_);
}
// Syncs the wrapped file, retrying through CallWithRetries.
Status Sync() override {
return CallWithRetries(std::bind(&WritableFile::Sync, base_file_.get()));
return CallWithRetries(std::bind(&WritableFile::Sync, base_file_.get()),
initial_delay_microseconds_);
}
private:
// The wrapped file that performs the actual writes.
std::unique_ptr<WritableFile> base_file_;
// Initial delay handed to CallWithRetries on every call.
const int64 initial_delay_microseconds_;
};
} // namespace
@ -95,7 +120,8 @@ Status RetryingFileSystem::NewRandomAccessFile(
std::unique_ptr<RandomAccessFile> base_file;
TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewRandomAccessFile,
base_file_system_.get(),
filename, &base_file)));
filename, &base_file),
initial_delay_microseconds_));
result->reset(new RetryingRandomAccessFile(std::move(base_file)));
return Status::OK();
}
@ -105,7 +131,8 @@ Status RetryingFileSystem::NewWritableFile(
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewWritableFile,
base_file_system_.get(),
filename, &base_file)));
filename, &base_file),
initial_delay_microseconds_));
result->reset(new RetryingWritableFile(std::move(base_file)));
return Status::OK();
}
@ -115,7 +142,8 @@ Status RetryingFileSystem::NewAppendableFile(
std::unique_ptr<WritableFile> base_file;
TF_RETURN_IF_ERROR(CallWithRetries(std::bind(&FileSystem::NewAppendableFile,
base_file_system_.get(),
filename, &base_file)));
filename, &base_file),
initial_delay_microseconds_));
result->reset(new RetryingWritableFile(std::move(base_file)));
return Status::OK();
}
@ -123,7 +151,8 @@ Status RetryingFileSystem::NewAppendableFile(
// Forwards NewReadOnlyMemoryRegionFromFile to the wrapped file system through
// CallWithRetries, with initial_delay_microseconds_ as the initial delay.
// `result` is passed straight to the wrapped file system.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::NewReadOnlyMemoryRegionFromFile(
const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
return CallWithRetries(std::bind(&FileSystem::NewReadOnlyMemoryRegionFromFile,
base_file_system_.get(), filename, result));
base_file_system_.get(), filename, result),
initial_delay_microseconds_);
}
bool RetryingFileSystem::FileExists(const string& fname) {
@ -133,49 +162,58 @@ bool RetryingFileSystem::FileExists(const string& fname) {
// Forwards Stat to the wrapped file system through CallWithRetries, with
// initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) {
return CallWithRetries(
std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat));
std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat),
initial_delay_microseconds_);
}
// Forwards GetChildren to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::GetChildren(const string& dir,
std::vector<string>* result) {
return CallWithRetries(std::bind(&FileSystem::GetChildren,
base_file_system_.get(), dir, result));
base_file_system_.get(), dir, result),
initial_delay_microseconds_);
}
// Forwards GetMatchingPaths to the wrapped file system through
// CallWithRetries, with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::GetMatchingPaths(const string& pattern,
std::vector<string>* result) {
return CallWithRetries(std::bind(&FileSystem::GetMatchingPaths,
base_file_system_.get(), pattern, result));
base_file_system_.get(), pattern, result),
initial_delay_microseconds_);
}
// Forwards DeleteFile to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::DeleteFile(const string& fname) {
return CallWithRetries(
std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname));
std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname),
initial_delay_microseconds_);
}
// Forwards CreateDir to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::CreateDir(const string& dirname) {
return CallWithRetries(
std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname));
std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname),
initial_delay_microseconds_);
}
// Forwards DeleteDir to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::DeleteDir(const string& dirname) {
return CallWithRetries(
std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname));
std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname),
initial_delay_microseconds_);
}
// Forwards GetFileSize to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay. `file_size` is
// passed straight to the wrapped file system.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::GetFileSize(const string& fname, uint64* file_size) {
return CallWithRetries(std::bind(&FileSystem::GetFileSize,
base_file_system_.get(), fname, file_size));
base_file_system_.get(), fname, file_size),
initial_delay_microseconds_);
}
// Forwards RenameFile to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::RenameFile(const string& src, const string& target) {
return CallWithRetries(
std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target));
std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target),
initial_delay_microseconds_);
}
// Forwards IsDirectory to the wrapped file system through CallWithRetries,
// with initial_delay_microseconds_ as the initial delay.
// NOTE(review): two call tails are shown (with and without the delay
// argument); presumably old and new diff lines - confirm.
Status RetryingFileSystem::IsDirectory(const string& dirname) {
return CallWithRetries(
std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname));
std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname),
initial_delay_microseconds_);
}
} // namespace tensorflow

View File

@ -26,8 +26,10 @@ namespace tensorflow {
/// A wrapper to add retry logic to another file system.
class RetryingFileSystem : public FileSystem {
public:
RetryingFileSystem(std::unique_ptr<FileSystem> base_file_system)
: base_file_system_(std::move(base_file_system)) {}
RetryingFileSystem(std::unique_ptr<FileSystem> base_file_system,
int64 delay_microseconds = 1000000)
: base_file_system_(std::move(base_file_system)),
initial_delay_microseconds_(delay_microseconds) {}
Status NewRandomAccessFile(
const string& filename,
@ -66,6 +68,7 @@ class RetryingFileSystem : public FileSystem {
private:
std::unique_ptr<FileSystem> base_file_system_;
const int64 initial_delay_microseconds_;
TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
};

View File

@ -158,7 +158,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_ImmediateSuccess) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->random_access_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
@ -185,7 +185,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_SuccessWith3rdTry) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->random_access_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
@ -213,7 +213,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_AllRetriesFailed) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->random_access_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
@ -241,7 +241,7 @@ TEST(RetryingFileSystemTest, NewRandomAccessFile_NoRetriesForSomeErrors) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->random_access_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped random access file.
std::unique_ptr<RandomAccessFile> random_access_file;
@ -266,7 +266,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_ImmediateSuccess) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->writable_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
@ -291,7 +291,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_SuccessWith3rdTry) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->writable_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
@ -316,7 +316,7 @@ TEST(RetryingFileSystemTest, NewAppendableFile_SuccessWith3rdTry) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->writable_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped appendable file.
std::unique_ptr<WritableFile> writable_file;
@ -342,7 +342,7 @@ TEST(RetryingFileSystemTest, NewWritableFile_AllRetriesFailed) {
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
base_fs->writable_file_to_return = std::move(base_file);
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
// Retrieve the wrapped writable file.
std::unique_ptr<WritableFile> writable_file;
@ -360,7 +360,7 @@ TEST(RetryingFileSystemTest,
std::make_tuple("NewReadOnlyMemoryRegionFromFile", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::unique_ptr<ReadOnlyMemoryRegion> result;
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile("filename.txt", &result));
@ -378,7 +378,7 @@ TEST(RetryingFileSystemTest, NewReadOnlyMemoryRegionFromFile_AllRetriesFailed) {
errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::unique_ptr<ReadOnlyMemoryRegion> result;
EXPECT_EQ("Last error",
@ -393,7 +393,7 @@ TEST(RetryingFileSystemTest, GetChildren_SuccessWith2ndTry) {
std::make_tuple("GetChildren", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
TF_EXPECT_OK(fs.GetChildren("gs://path", &result));
@ -409,7 +409,7 @@ TEST(RetryingFileSystemTest, GetChildren_AllRetriesFailed) {
std::make_tuple("GetChildren", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
EXPECT_EQ("Last error", fs.GetChildren("gs://path", &result).error_message());
@ -422,7 +422,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_SuccessWith2ndTry) {
std::make_tuple("GetMatchingPaths", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://path/dir", &result));
@ -438,7 +438,7 @@ TEST(RetryingFileSystemTest, GetMatchingPaths_AllRetriesFailed) {
std::make_tuple("GetMatchingPaths", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
EXPECT_EQ("Last error",
@ -451,7 +451,7 @@ TEST(RetryingFileSystemTest, DeleteFile_SuccessWith2ndTry) {
std::make_tuple("DeleteFile", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
TF_EXPECT_OK(fs.DeleteFile("gs://path/file.txt"));
@ -466,7 +466,7 @@ TEST(RetryingFileSystemTest, DeleteFile_AllRetriesFailed) {
std::make_tuple("DeleteFile", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
EXPECT_EQ("Last error", fs.DeleteFile("gs://path/file.txt").error_message());
@ -478,7 +478,7 @@ TEST(RetryingFileSystemTest, CreateDir_SuccessWith2ndTry) {
std::make_tuple("CreateDir", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
TF_EXPECT_OK(fs.CreateDir("gs://path/newdir"));
@ -493,7 +493,7 @@ TEST(RetryingFileSystemTest, CreateDir_AllRetriesFailed) {
std::make_tuple("CreateDir", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
EXPECT_EQ("Last error", fs.CreateDir("gs://path/newdir").error_message());
@ -505,7 +505,7 @@ TEST(RetryingFileSystemTest, DeleteDir_SuccessWith2ndTry) {
std::make_tuple("DeleteDir", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
TF_EXPECT_OK(fs.DeleteDir("gs://path/dir"));
@ -520,7 +520,7 @@ TEST(RetryingFileSystemTest, DeleteDir_AllRetriesFailed) {
std::make_tuple("DeleteDir", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
std::vector<string> result;
EXPECT_EQ("Last error", fs.DeleteDir("gs://path/dir").error_message());
@ -533,7 +533,7 @@ TEST(RetryingFileSystemTest, GetFileSize_SuccessWith2ndTry) {
std::make_tuple("GetFileSize", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
uint64 size;
TF_EXPECT_OK(fs.GetFileSize("gs://path/file.txt", &size));
@ -549,7 +549,7 @@ TEST(RetryingFileSystemTest, GetFileSize_AllRetriesFailed) {
std::make_tuple("GetFileSize", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
uint64 size;
EXPECT_EQ("Last error",
@ -562,7 +562,7 @@ TEST(RetryingFileSystemTest, RenameFile_SuccessWith2ndTry) {
std::make_tuple("RenameFile", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
TF_EXPECT_OK(fs.RenameFile("old_name", "new_name"));
}
@ -577,7 +577,7 @@ TEST(RetryingFileSystemTest, RenameFile_AllRetriesFailed) {
});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
EXPECT_EQ("Last error",
fs.RenameFile("old_name", "new_name").error_message());
@ -589,7 +589,7 @@ TEST(RetryingFileSystemTest, Stat_SuccessWith2ndTry) {
std::make_tuple("Stat", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("file_name", &stat));
@ -604,7 +604,7 @@ TEST(RetryingFileSystemTest, Stat_AllRetriesFailed) {
});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
FileStatistics stat;
EXPECT_EQ("Last error", fs.Stat("file_name", &stat).error_message());
@ -617,7 +617,7 @@ TEST(RetryingFileSystemTest, IsDirectory_SuccessWith2ndTry) {
std::make_tuple("IsDirectory", Status::OK())});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
TF_EXPECT_OK(fs.IsDirectory("gs://path/dir"));
}
@ -632,7 +632,7 @@ TEST(RetryingFileSystemTest, IsDirectory_AllRetriesFailed) {
std::make_tuple("IsDirectory", errors::Unavailable("Last error"))});
std::unique_ptr<MockFileSystem> base_fs(
new MockFileSystem(expected_fs_calls));
RetryingFileSystem fs(std::move(base_fs));
RetryingFileSystem fs(std::move(base_fs), 0);
EXPECT_EQ("Last error", fs.IsDirectory("gs://path/dir").error_message());
}