Let CreateNewUploadSession support non-resumable writes by returning whether a resumable session was created via the "resumable" field of the new UploadSessionHandle out parameter.

PiperOrigin-RevId: 325886630
Change-Id: I97d81cb636d75c8593370676e052f40ab10d9a0c
A. Unique TensorFlower authored 2020-08-10 14:22:32 -07:00, committed by TensorFlower Gardener
parent 45d4e5624b
commit 642360f24e
2 changed files with 34 additions and 23 deletions
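
The gist of the change, sketched below in a minimal, self-contained form: the session creator now fills an UploadSessionHandle instead of a bare session-URI string, and the writer's retry loop only polls resumable-session status when the handle says a resumable session exists. The helper names here (CreateNonResumableSession, the simulated upload loop) are illustrative stand-ins, not TensorFlow APIs.

#include <iostream>
#include <string>

// Mirrors the UploadSessionHandle added in the header below: a session URI
// plus a flag recording whether the backend actually created a resumable
// upload session.
struct UploadSessionHandle {
  std::string session_uri;
  bool resumable = true;
};

// Illustrative session creator for a backend without resumable uploads: it
// reports success but marks the handle as non-resumable, so the caller must
// not try to query or resume the session.
bool CreateNonResumableSession(UploadSessionHandle* handle) {
  handle->session_uri.clear();  // No resumable-session URI to hand back.
  handle->resumable = false;    // Tells the writer to skip status polling.
  return true;
}

// Simplified version of the writer-side retry loop from the diff: only a
// resumable session is asked for its status on retries; otherwise the data
// is simply re-uploaded from the requested offset.
void UploadWithRetries(const UploadSessionHandle& handle) {
  bool first_attempt = true;
  for (int attempt = 0; attempt < 3; ++attempt) {
    if (handle.resumable && !first_attempt) {
      std::cout << "would call RequestUploadSessionStatus("
                << handle.session_uri << ")\n";
    }
    first_attempt = false;
    std::cout << "would call UploadToSession(...)\n";
    break;  // Pretend the upload succeeded on the first attempt.
  }
}

int main() {
  UploadSessionHandle handle;
  CreateNonResumableSession(&handle);
  UploadWithRetries(handle);
  return 0;
}

In the real change, this flag is what lets a session creator opt out of resumable-upload bookkeeping while reusing the same writer code path.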

tensorflow/core/platform/cloud/gcs_file_system.cc

@@ -389,7 +389,7 @@ class BufferedGcsRandomAccessFile : public RandomAccessFile {
   typedef std::function<Status(
       uint64 start_offset, const std::string& object_to_upload,
       const std::string& bucket, uint64 file_size, const std::string& gcs_path,
-      std::string* session_uri)>
+      UploadSessionHandle* session_handle)>
       SessionCreator;

   // Function object declaration with params needed to upload objects.
@@ -542,7 +542,7 @@ class GcsWritableFile : public WritableFile {
       return errors::Internal(
           "Could not write to the internal temporary file.");
     }
-    string session_uri;
+    UploadSessionHandle session_handle;
     uint64 start_offset = 0;
     string object_to_upload = object_;
     bool should_compose = false;
@@ -556,17 +556,21 @@ class GcsWritableFile : public WritableFile {
                             io::Basename(object_), ".", start_offset_);
       }
     }
-    TF_RETURN_IF_ERROR(
-        CreateNewUploadSession(start_offset, object_to_upload, &session_uri));
+    TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
+                                              &session_handle));
     uint64 already_uploaded = 0;
     bool first_attempt = true;
     const Status upload_status = RetryingUtils::CallWithRetries(
-        [&first_attempt, &already_uploaded, &session_uri, &start_offset,
+        [&first_attempt, &already_uploaded, &session_handle, &start_offset,
          this]() {
-          if (!first_attempt) {
+          if (session_handle.resumable && !first_attempt) {
             bool completed;
             TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
-                session_uri, &completed, &already_uploaded));
+                session_handle.session_uri, &completed, &already_uploaded));
+            LOG(INFO) << "### RequestUploadSessionStatus: completed = "
+                      << completed
+                      << ", already_uploaded = " << already_uploaded
+                      << ", file = " << GetGcsPath();
             if (completed) {
               // Erase the file from the file cache on every successful write.
               file_cache_erase_();
@@ -577,7 +581,8 @@ class GcsWritableFile : public WritableFile {
             }
           }
           first_attempt = false;
-          return UploadToSession(session_uri, start_offset, already_uploaded);
+          return UploadToSession(session_handle.session_uri, start_offset,
+                                 already_uploaded);
         },
         retry_config_);
     if (upload_status.code() == errors::Code::NOT_FOUND) {
@@ -617,11 +622,11 @@ class GcsWritableFile : public WritableFile {
   /// Initiates a new resumable upload session.
   Status CreateNewUploadSession(uint64 start_offset,
                                 std::string object_to_upload,
-                                std::string* session_uri) {
+                                UploadSessionHandle* session_handle) {
     uint64 file_size;
     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
     return session_creator_(start_offset, object_to_upload, bucket_, file_size,
-                            GetGcsPath(), session_uri);
+                            GetGcsPath(), session_handle);
   }

   /// Appends the data of append_object to the original object and deletes
@@ -913,6 +918,7 @@ GcsFileSystem::GcsFileSystem(
     std::pair<const string, const string>* additional_header,
     bool compose_append)
     : timeouts_(timeouts),
+      retry_config_(retry_config),
       auth_provider_(std::move(auth_provider)),
       http_request_factory_(std::move(http_request_factory)),
       zone_provider_(std::move(zone_provider)),
@@ -926,7 +932,6 @@ GcsFileSystem::GcsFileSystem(
           kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
       allowed_locations_(allowed_locations),
       compose_append_(compose_append),
-      retry_config_(retry_config),
       additional_header_(additional_header) {}

 Status GcsFileSystem::NewRandomAccessFile(
@@ -1080,7 +1085,7 @@ Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
 Status GcsFileSystem::CreateNewUploadSession(
     uint64 start_offset, const std::string& object_to_upload,
     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
-    std::string* session_uri) {
+    UploadSessionHandle* session_handle) {
   std::vector<char> output_buffer;
   std::unique_ptr<HttpRequest> request;
   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
@@ -1096,9 +1101,10 @@ Status GcsFileSystem::CreateNewUploadSession(
   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                   " when initiating an upload to ", gcs_path);
-  if (session_uri != nullptr) {
-    *session_uri = request->GetResponseHeader("Location");
-    if (session_uri->empty()) {
+  if (session_handle != nullptr) {
+    session_handle->resumable = true;
+    session_handle->session_uri = request->GetResponseHeader("Location");
+    if (session_handle->session_uri.empty()) {
       return errors::Internal("Unexpected response from GCS when writing to ",
                               gcs_path, ": 'Location' header not returned.");
     }
@@ -1241,9 +1247,9 @@ Status GcsFileSystem::NewWritableFile(const string& fname,
   auto session_creator =
       [this](uint64 start_offset, const std::string& object_to_upload,
              const std::string& bucket, uint64 file_size,
-             const std::string& gcs_path, std::string* session_uri) {
+             const std::string& gcs_path, UploadSessionHandle* session_handle) {
         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
-                                      file_size, gcs_path, session_uri);
+                                      file_size, gcs_path, session_handle);
       };
   auto object_uploader =
       [this](const std::string& session_uri, uint64 start_offset,
@@ -1301,9 +1307,9 @@ Status GcsFileSystem::NewAppendableFile(const string& fname,
   auto session_creator =
       [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
-             const std::string& gcs_path, std::string* session_uri) {
+             const std::string& gcs_path, UploadSessionHandle* session_handle) {
         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
-                                      file_size, gcs_path, session_uri);
+                                      file_size, gcs_path, session_handle);
       };
   auto object_uploader =
       [this](const std::string& session_uri, uint64 start_offset,

tensorflow/core/platform/cloud/gcs_file_system.h

@@ -101,6 +101,11 @@ class GcsStatsInterface {
   virtual ~GcsStatsInterface() = default;
 };

+struct UploadSessionHandle {
+  std::string session_uri;
+  bool resumable;
+};
+
 /// Google Cloud Storage implementation of a file system.
 ///
 /// The clients should use RetryingGcsFileSystem defined below,
@@ -281,7 +286,7 @@ class GcsFileSystem : public FileSystem {
                                         const std::string& bucket,
                                         uint64 file_size,
                                         const std::string& gcs_path,
-                                        std::string* session_uri);
+                                        UploadSessionHandle* session_handle);

   // Uploads object data to session.
   virtual Status UploadToSession(const std::string& session_uri,
@@ -318,6 +323,9 @@ class GcsFileSystem : public FileSystem {
   // Used by a subclass.
   TimeoutConfig timeouts_;

+  /// The retry configuration used for retrying failed calls.
+  RetryConfig retry_config_;
+
  private:
   // GCS file statistics.
   struct GcsFileStat {
@@ -416,9 +424,6 @@ class GcsFileSystem : public FileSystem {
   GcsStatsInterface* stats_ = nullptr;  // Not owned.

-  /// The initial delay for exponential backoffs when retrying failed calls.
-  RetryConfig retry_config_;
-
   // Additional header material to be transmitted with all GCS requests
   std::unique_ptr<std::pair<const string, const string>> additional_header_;