Support uploading only new data when a file in GCS Filesystem is Flush()'d multiple times.

This uses the GCS compose API to avoid reuploading the entire file.

Also add some Vmodules for profiling GCS write paths.

PiperOrigin-RevId: 317377298
Change-Id: I3f36fd684c44070331ba1d9e6689efd0f74bfc0e
This commit is contained in:
Michael Banfield 2020-06-19 14:11:21 -07:00 committed by TensorFlower Gardener
parent 94bf57d06c
commit f60f6f0c1f
3 changed files with 1065 additions and 711 deletions

View File

@ -14,14 +14,18 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/cloud/gcs_file_system.h"
#include <stdio.h>
#include <unistd.h>
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <vector>
#include "tensorflow/core/platform/strcat.h"
#ifdef _WIN32
#include <io.h> // for _mktemp
#endif
@ -128,6 +132,15 @@ constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
// is running in and restricts to buckets in that region.
constexpr char kDetectZoneSentinelValue[] = "auto";
// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
Status GetTmpFilename(string* filename) {
*filename = io::GetTempFilename("");
return Status::OK();
@ -379,15 +392,18 @@ class GcsWritableFile : public WritableFile {
GcsFileSystem* filesystem,
GcsFileSystem::TimeoutConfig* timeouts,
std::function<void()> file_cache_erase,
RetryConfig retry_config)
RetryConfig retry_config, bool compose_append)
: bucket_(bucket),
object_(object),
filesystem_(filesystem),
timeouts_(timeouts),
file_cache_erase_(std::move(file_cache_erase)),
sync_needed_(true),
retry_config_(retry_config) {
retry_config_(retry_config),
compose_append_(compose_append),
start_offset_(0) {
// TODO: to make it safer, outfile_ should be constructed from an FD
VLOG(3) << "GcsWritableFile: " << GetGcsPath();
if (GetTmpFilename(&tmp_content_filename_).ok()) {
outfile_.open(tmp_content_filename_,
std::ofstream::binary | std::ofstream::app);
@ -403,14 +419,18 @@ class GcsWritableFile : public WritableFile {
GcsFileSystem* filesystem, const string& tmp_content_filename,
GcsFileSystem::TimeoutConfig* timeouts,
std::function<void()> file_cache_erase,
RetryConfig retry_config)
RetryConfig retry_config, bool compose_append)
: bucket_(bucket),
object_(object),
filesystem_(filesystem),
timeouts_(timeouts),
file_cache_erase_(std::move(file_cache_erase)),
sync_needed_(true),
retry_config_(retry_config) {
retry_config_(retry_config),
compose_append_(compose_append),
start_offset_(0) {
VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
<< tmp_content_filename;
tmp_content_filename_ = tmp_content_filename;
outfile_.open(tmp_content_filename_,
std::ofstream::binary | std::ofstream::app);
@ -423,6 +443,7 @@ class GcsWritableFile : public WritableFile {
Status Append(StringPiece data) override {
TF_RETURN_IF_ERROR(CheckWritable());
VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
sync_needed_ = true;
outfile_ << data;
if (!outfile_.good()) {
@ -433,6 +454,7 @@ class GcsWritableFile : public WritableFile {
}
Status Close() override {
VLOG(3) << "Close:" << GetGcsPath();
if (outfile_.is_open()) {
Status sync_status = Sync();
if (sync_status.ok()) {
@ -443,18 +465,23 @@ class GcsWritableFile : public WritableFile {
return Status::OK();
}
Status Flush() override { return Sync(); }
Status Flush() override {
VLOG(3) << "Flush:" << GetGcsPath();
return Sync();
}
Status Name(StringPiece* result) const override {
return errors::Unimplemented("GCSWritableFile does not support Name()");
}
Status Sync() override {
VLOG(3) << "Sync started:" << GetGcsPath();
TF_RETURN_IF_ERROR(CheckWritable());
if (!sync_needed_) {
return Status::OK();
}
Status status = SyncImpl();
VLOG(3) << "Sync finished " << GetGcsPath();
if (status.ok()) {
sync_needed_ = false;
}
@ -483,11 +510,26 @@ class GcsWritableFile : public WritableFile {
"Could not write to the internal temporary file.");
}
string session_uri;
TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
uint64 start_offset = 0;
string object_to_upload = object_;
bool should_compose = false;
if (compose_append_) {
start_offset = start_offset_;
// Only compose if the object has already been uploaded to GCS
should_compose = start_offset > 0;
if (should_compose) {
object_to_upload =
strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
io::Basename(object_), ".", start_offset_);
}
}
TF_RETURN_IF_ERROR(
CreateNewUploadSession(&session_uri, start_offset, object_to_upload));
uint64 already_uploaded = 0;
bool first_attempt = true;
const Status upload_status = RetryingUtils::CallWithRetries(
[&first_attempt, &already_uploaded, &session_uri, this]() {
[&first_attempt, &already_uploaded, &session_uri, &start_offset,
this]() {
if (!first_attempt) {
bool completed;
TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
@ -502,7 +544,7 @@ class GcsWritableFile : public WritableFile {
}
}
first_attempt = false;
return UploadToSession(session_uri, already_uploaded);
return UploadToSession(session_uri, start_offset, already_uploaded);
},
retry_config_);
if (upload_status.code() == errors::Code::NOT_FOUND) {
@ -512,6 +554,12 @@ class GcsWritableFile : public WritableFile {
strings::StrCat("Upload to gs://", bucket_, "/", object_,
" failed, caused by: ", upload_status.ToString()));
}
if (upload_status.ok()) {
if (should_compose) {
TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
}
TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
}
return upload_status;
}
@ -534,7 +582,8 @@ class GcsWritableFile : public WritableFile {
}
/// Initiates a new resumable upload session.
Status CreateNewUploadSession(string* session_uri) {
Status CreateNewUploadSession(string* session_uri, uint64 start_offset,
string object_to_upload) {
uint64 file_size;
TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
@ -542,10 +591,11 @@ class GcsWritableFile : public WritableFile {
std::unique_ptr<HttpRequest> request;
TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
request->SetUri(strings::StrCat(
kGcsUploadUriBase, "b/", bucket_,
"/o?uploadType=resumable&name=", request->EscapeString(object_)));
request->AddHeader("X-Upload-Content-Length", std::to_string(file_size));
request->SetUri(strings::StrCat(kGcsUploadUriBase, "b/", bucket_,
"/o?uploadType=resumable&name=",
request->EscapeString(object_to_upload)));
request->AddHeader("X-Upload-Content-Length",
std::to_string(file_size - start_offset));
request->SetPostEmptyBody();
request->SetResultBuffer(&output_buffer);
request->SetTimeouts(timeouts_->connect, timeouts_->idle,
@ -561,6 +611,37 @@ class GcsWritableFile : public WritableFile {
return Status::OK();
}
/// Appends the data of append_object to the original object and deletes
/// append_object.
Status AppendObject(string append_object) {
VLOG(3) << "AppendObject: " << GetGcsPathWithObject(append_object) << " to "
<< GetGcsPath();
std::unique_ptr<HttpRequest> request;
TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
request->EscapeString(object_),
"/compose"));
const string request_body =
strings::StrCat("{'sourceObjects': [{'name': '", object_,
"'},{'name': '", append_object, "'}]}");
request->SetTimeouts(timeouts_->connect, timeouts_->idle,
timeouts_->metadata);
request->AddHeader("content-type", "application/json");
request->SetPostFromBuffer(request_body.c_str(), request_body.size());
return RetryingUtils::CallWithRetries(
[&request, &append_object, this]() {
TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
" when composing to ", GetGcsPath());
TF_RETURN_WITH_CONTEXT_IF_ERROR(
filesystem_->DeleteFile(GetGcsPathWithObject(append_object)),
" when cleaning up.");
return Status::OK();
},
retry_config_);
}
/// \brief Requests status of a previously initiated upload session.
///
/// If the upload has already succeeded, sets 'completed' to true.
@ -628,7 +709,8 @@ class GcsWritableFile : public WritableFile {
return Status::OK();
}
Status UploadToSession(const string& session_uri, uint64 start_offset) {
Status UploadToSession(const string& session_uri, uint64 start_offset,
uint64 already_uploaded) {
uint64 file_size;
TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
@ -637,13 +719,14 @@ class GcsWritableFile : public WritableFile {
request->SetUri(session_uri);
if (file_size > 0) {
request->AddHeader("Content-Range",
strings::StrCat("bytes ", start_offset, "-",
file_size - 1, "/", file_size));
strings::StrCat("bytes ", already_uploaded, "-",
file_size - start_offset - 1, "/",
file_size - start_offset));
}
request->SetTimeouts(timeouts_->connect, timeouts_->idle, timeouts_->write);
TF_RETURN_IF_ERROR(
request->SetPutFromFile(tmp_content_filename_, start_offset));
TF_RETURN_IF_ERROR(request->SetPutFromFile(
tmp_content_filename_, start_offset + already_uploaded));
TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
GetGcsPath());
// Erase the file from the file cache on every successful write.
@ -651,9 +734,10 @@ class GcsWritableFile : public WritableFile {
return Status::OK();
}
string GetGcsPath() const {
return strings::StrCat("gs://", bucket_, "/", object_);
string GetGcsPathWithObject(string object) const {
return strings::StrCat("gs://", bucket_, "/", object);
}
string GetGcsPath() const { return GetGcsPathWithObject(object_); }
string bucket_;
string object_;
@ -664,6 +748,8 @@ class GcsWritableFile : public WritableFile {
std::function<void()> file_cache_erase_;
bool sync_needed_; // whether there is buffered data that needs to be synced
RetryConfig retry_config_;
bool compose_append_;
uint64 start_offset_;
};
class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
@ -849,6 +935,14 @@ GcsFileSystem::GcsFileSystem(bool make_default_cache) {
GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
&allowed_locations_);
StringPiece append_mode;
GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
if (append_mode == kComposeAppend) {
compose_append_ = true;
} else {
compose_append_ = false;
}
}
GcsFileSystem::GcsFileSystem(
@ -859,7 +953,8 @@ GcsFileSystem::GcsFileSystem(
size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
size_t matching_paths_cache_max_entries, RetryConfig retry_config,
TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
std::pair<const string, const string>* additional_header)
std::pair<const string, const string>* additional_header,
bool compose_append)
: auth_provider_(std::move(auth_provider)),
http_request_factory_(std::move(http_request_factory)),
zone_provider_(std::move(zone_provider)),
@ -872,6 +967,7 @@ GcsFileSystem::GcsFileSystem(
bucket_location_cache_(new BucketLocationCache(
kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
allowed_locations_(allowed_locations),
compose_append_(compose_append),
timeouts_(timeouts),
retry_config_(retry_config),
additional_header_(additional_header) {}
@ -1056,9 +1152,10 @@ Status GcsFileSystem::NewWritableFile(const string& fname,
std::unique_ptr<WritableFile>* result) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
result->reset(new GcsWritableFile(bucket, object, this, &timeouts_,
[this, fname]() { ClearFileCaches(fname); },
retry_config_));
result->reset(new GcsWritableFile(
bucket, object, this, &timeouts_,
[this, fname]() { ClearFileCaches(fname); }, retry_config_,
compose_append_));
return Status::OK();
}
@ -1098,7 +1195,8 @@ Status GcsFileSystem::NewAppendableFile(const string& fname,
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
result->reset(new GcsWritableFile(
bucket, object, this, old_content_filename, &timeouts_,
[this, fname]() { ClearFileCaches(fname); }, retry_config_));
[this, fname]() { ClearFileCaches(fname); }, retry_config_,
compose_append_));
return Status::OK();
}
@ -1629,6 +1727,7 @@ Status GcsFileSystem::RenameFile(const string& src, const string& target) {
// Uses a GCS API command to copy the object and then deletes the old one.
Status GcsFileSystem::RenameObject(const string& src, const string& target) {
VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
string src_bucket, src_object, target_bucket, target_object;
TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
TF_RETURN_IF_ERROR(
@ -1664,6 +1763,7 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) {
"locations or storage classes is not supported.");
}
VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
// In case the delete API call failed, but the deletion actually happened
// on the server side, we can't just retry the whole RenameFile operation
// because the source object is already gone.

View File

@ -122,7 +122,8 @@ class GcsFileSystem : public FileSystem {
size_t matching_paths_cache_max_entries,
RetryConfig retry_config, TimeoutConfig timeouts,
const std::unordered_set<string>& allowed_locations,
std::pair<const string, const string>* additional_header);
std::pair<const string, const string>* additional_header,
bool compose_append);
Status NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) override;
@ -187,6 +188,8 @@ class GcsFileSystem : public FileSystem {
std::unordered_set<string> allowed_locations() const {
return allowed_locations_;
}
bool compose_append() const { return compose_append_; }
string additional_header_name() const {
return additional_header_ ? additional_header_->first : "";
}
@ -373,6 +376,7 @@ class GcsFileSystem : public FileSystem {
using BucketLocationCache = ExpiringLRUCache<string>;
std::unique_ptr<BucketLocationCache> bucket_location_cache_;
std::unordered_set<string> allowed_locations_;
bool compose_append_;
TimeoutConfig timeouts_;

File diff suppressed because it is too large Load Diff