diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index 31b0c790f50..f0d2138b379 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -389,7 +389,7 @@ class BufferedGcsRandomAccessFile : public RandomAccessFile { typedef std::function + UploadSessionHandle* session_handle)> SessionCreator; // Function object declaration with params needed to upload objects. @@ -542,7 +542,7 @@ class GcsWritableFile : public WritableFile { return errors::Internal( "Could not write to the internal temporary file."); } - string session_uri; + UploadSessionHandle session_handle; uint64 start_offset = 0; string object_to_upload = object_; bool should_compose = false; @@ -556,17 +556,21 @@ class GcsWritableFile : public WritableFile { io::Basename(object_), ".", start_offset_); } } - TF_RETURN_IF_ERROR( - CreateNewUploadSession(start_offset, object_to_upload, &session_uri)); + TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload, + &session_handle)); uint64 already_uploaded = 0; bool first_attempt = true; const Status upload_status = RetryingUtils::CallWithRetries( - [&first_attempt, &already_uploaded, &session_uri, &start_offset, + [&first_attempt, &already_uploaded, &session_handle, &start_offset, this]() { - if (!first_attempt) { + if (session_handle.resumable && !first_attempt) { bool completed; TF_RETURN_IF_ERROR(RequestUploadSessionStatus( - session_uri, &completed, &already_uploaded)); + session_handle.session_uri, &completed, &already_uploaded)); + LOG(INFO) << "### RequestUploadSessionStatus: completed = " + << completed + << ", already_uploaded = " << already_uploaded + << ", file = " << GetGcsPath(); if (completed) { // Erase the file from the file cache on every successful write. 
file_cache_erase_(); @@ -577,7 +581,8 @@ class GcsWritableFile : public WritableFile { } } first_attempt = false; - return UploadToSession(session_uri, start_offset, already_uploaded); + return UploadToSession(session_handle.session_uri, start_offset, + already_uploaded); }, retry_config_); if (upload_status.code() == errors::Code::NOT_FOUND) { @@ -617,11 +622,11 @@ class GcsWritableFile : public WritableFile { /// Initiates a new resumable upload session. Status CreateNewUploadSession(uint64 start_offset, std::string object_to_upload, - std::string* session_uri) { + UploadSessionHandle* session_handle) { uint64 file_size; TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size)); return session_creator_(start_offset, object_to_upload, bucket_, file_size, - GetGcsPath(), session_uri); + GetGcsPath(), session_handle); } /// Appends the data of append_object to the original object and deletes @@ -913,6 +918,7 @@ GcsFileSystem::GcsFileSystem( std::pair* additional_header, bool compose_append) : timeouts_(timeouts), + retry_config_(retry_config), auth_provider_(std::move(auth_provider)), http_request_factory_(std::move(http_request_factory)), zone_provider_(std::move(zone_provider)), @@ -926,7 +932,6 @@ GcsFileSystem::GcsFileSystem( kCacheNeverExpire, kBucketLocationCacheMaxEntries)), allowed_locations_(allowed_locations), compose_append_(compose_append), - retry_config_(retry_config), additional_header_(additional_header) {} Status GcsFileSystem::NewRandomAccessFile( @@ -1080,7 +1085,7 @@ Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset, Status GcsFileSystem::CreateNewUploadSession( uint64 start_offset, const std::string& object_to_upload, const std::string& bucket, uint64 file_size, const std::string& gcs_path, - std::string* session_uri) { + UploadSessionHandle* session_handle) { std::vector output_buffer; std::unique_ptr request; TF_RETURN_IF_ERROR(CreateHttpRequest(&request)); @@ -1096,9 +1101,10 @@ Status GcsFileSystem::CreateNewUploadSession( 
request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata); TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when initiating an upload to ", gcs_path); - if (session_uri != nullptr) { - *session_uri = request->GetResponseHeader("Location"); - if (session_uri->empty()) { + if (session_handle != nullptr) { + session_handle->resumable = true; + session_handle->session_uri = request->GetResponseHeader("Location"); + if (session_handle->session_uri.empty()) { return errors::Internal("Unexpected response from GCS when writing to ", gcs_path, ": 'Location' header not returned."); } @@ -1241,9 +1247,9 @@ Status GcsFileSystem::NewWritableFile(const string& fname, auto session_creator = [this](uint64 start_offset, const std::string& object_to_upload, const std::string& bucket, uint64 file_size, - const std::string& gcs_path, std::string* session_uri) { + const std::string& gcs_path, UploadSessionHandle* session_handle) { return CreateNewUploadSession(start_offset, object_to_upload, bucket, - file_size, gcs_path, session_uri); + file_size, gcs_path, session_handle); }; auto object_uploader = [this](const std::string& session_uri, uint64 start_offset, @@ -1301,9 +1307,9 @@ Status GcsFileSystem::NewAppendableFile(const string& fname, auto session_creator = [this](uint64 start_offset, const std::string& object_to_upload, const std::string& bucket, uint64 file_size, - const std::string& gcs_path, std::string* session_uri) { + const std::string& gcs_path, UploadSessionHandle* session_handle) { return CreateNewUploadSession(start_offset, object_to_upload, bucket, - file_size, gcs_path, session_uri); + file_size, gcs_path, session_handle); }; auto object_uploader = [this](const std::string& session_uri, uint64 start_offset, diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h index 203c501ff4c..eceb76970fb 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.h +++ 
b/tensorflow/core/platform/cloud/gcs_file_system.h @@ -101,6 +101,15 @@ class GcsStatsInterface { virtual ~GcsStatsInterface() = default; }; +/// Describes an upload session created by CreateNewUploadSession. +struct UploadSessionHandle { + /// URI that object data is uploaded to. + std::string session_uri; + /// True if the session's status can be queried to resume a failed upload. + /// Defaults to false so an unset handle is never read uninitialized. + bool resumable = false; +}; + /// Google Cloud Storage implementation of a file system. /// /// The clients should use RetryingGcsFileSystem defined below, @@ -281,7 +290,7 @@ class GcsFileSystem : public FileSystem { const std::string& bucket, uint64 file_size, const std::string& gcs_path, - std::string* session_uri); + UploadSessionHandle* session_handle); // Uploads object data to session. virtual Status UploadToSession(const std::string& session_uri, @@ -318,6 +327,9 @@ class GcsFileSystem : public FileSystem { // Used by a subclass. TimeoutConfig timeouts_; + /// The retry configuration used for retrying failed calls. + RetryConfig retry_config_; + private: // GCS file statistics. struct GcsFileStat { @@ -416,9 +428,6 @@ class GcsFileSystem : public FileSystem { GcsStatsInterface* stats_ = nullptr; // Not owned. - /// The initial delay for exponential backoffs when retrying failed calls. - RetryConfig retry_config_; - // Additional header material to be transmitted with all GCS requests std::unique_ptr> additional_header_;