From 3313b1d724670ed25e3ac3d360e5876fa2a50fb2 Mon Sep 17 00:00:00 2001 From: Vo Van Nghia Date: Thu, 16 Jul 2020 11:53:25 +0700 Subject: [PATCH 1/3] Add tf_new_writable_file --- .../filesystem/plugins/s3/s3_filesystem.cc | 100 +++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc index 7714b36c015..9bff8070427 100644 --- a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc +++ b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc @@ -15,6 +15,7 @@ limitations under the License. #include "tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.h" #include +#include #include #include #include @@ -39,6 +40,7 @@ constexpr int kExecutorPoolSize = 25; constexpr uint64_t kS3MultiPartUploadChunkSize = 50 * 1024 * 1024; // 50 MB constexpr uint64_t kS3MultiPartDownloadChunkSize = 50 * 1024 * 1024; // 50 MB constexpr size_t kDownloadRetries = 3; +constexpr size_t kUploadRetries = 3; static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } static void plugin_memory_free(void* ptr) { free(ptr); } @@ -331,8 +333,104 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, // SECTION 2. Implementation for `TF_WritableFile` // ---------------------------------------------------------------------------- namespace tf_writable_file { +typedef struct S3File { + Aws::String bucket; + Aws::String object; + std::shared_ptr s3_client; + std::shared_ptr transfer_manager; + bool sync_needed; + std::shared_ptr outfile; + S3File(Aws::String bucket, Aws::String object, + std::shared_ptr s3_client, + std::shared_ptr transfer_manager) + : bucket(bucket), + object(object), + s3_client(s3_client), + transfer_manager(transfer_manager), + outfile(Aws::MakeShared( + kS3FileSystemAllocationTag, nullptr, "_s3_filesystem_XXXXXX", + std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | + std::ios_base::out)) {} +} S3File; -// TODO(vnvo2409): Implement later +void Cleanup(TF_WritableFile* file) { + auto s3_file = static_cast(file->plugin_file); + delete s3_file; +} + +void Append(const TF_WritableFile* file, const char* buffer, size_t n, + TF_Status* status) { + auto s3_file = static_cast(file->plugin_file); + if (!s3_file->outfile) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable."); + return; + } + s3_file->sync_needed = true; + s3_file->outfile->write(buffer, n); + if (!s3_file->outfile->good()) + TF_SetStatus(status, TF_INTERNAL, + "Could not append to the internal temporary file."); + else + TF_SetStatus(status, TF_OK, ""); +} + +int64_t Tell(const TF_WritableFile* file, TF_Status* status) { + auto s3_file = static_cast(file->plugin_file); + auto position = static_cast(s3_file->outfile->tellp()); + if (position == -1) + TF_SetStatus(status, TF_INTERNAL, + "tellp on the internal temporary file failed"); + else + TF_SetStatus(status, TF_OK, ""); + return position; +} + +void Sync(const TF_WritableFile* file, TF_Status* status) { + auto s3_file = static_cast(file->plugin_file); + if (!s3_file->outfile) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable."); + return; + } + if (!s3_file->sync_needed) { + TF_SetStatus(status, TF_OK, ""); + return; + } + auto position = static_cast(s3_file->outfile->tellp()); + auto handle = s3_file->transfer_manager->UploadFile( + s3_file->outfile, s3_file->bucket, s3_file->object, + "application/octet-stream", Aws::Map()); + handle->WaitUntilFinished(); + + size_t retries = 0; + while (handle->GetStatus() == Aws::Transfer::TransferStatus::FAILED && + retries++ < kUploadRetries) { + // if multipart upload was used, only the failed parts will be re-sent + s3_file->transfer_manager->RetryUpload(s3_file->outfile, handle); + handle->WaitUntilFinished(); + } + if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) + return TF_SetStatusFromAWSError(handle->GetLastError(), status); + s3_file->outfile->clear(); + s3_file->outfile->seekp(position); + s3_file->sync_needed = false; + TF_SetStatus(status, TF_OK, ""); +} + +void Flush(const TF_WritableFile* file, TF_Status* status) { + Sync(file, status); +} + +void Close(const TF_WritableFile* file, TF_Status* status) { + auto s3_file = static_cast(file->plugin_file); + if (s3_file->outfile) { + Sync(file, status); + if (TF_GetCode(status) != TF_OK) return; + s3_file->outfile.reset(); + } + TF_SetStatus(status, TF_OK, ""); +} } // namespace tf_writable_file From e4aca25a2664d4a50b03bcb16880b0ba37b7987d Mon Sep 17 00:00:00 2001 From: Vo Van Nghia Date: Thu, 16 Jul 2020 12:23:01 +0700 Subject: [PATCH 2/3] Add New Writable File --- .../filesystem/plugins/s3/s3_filesystem.cc | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc index 9bff8070427..ff272cae4de 100644 --- a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc +++ b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc @@ -495,6 +495,21 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path, TF_SetStatus(status, TF_OK, ""); } +void NewWritableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + Aws::String bucket, object; + ParseS3Path(path, false, &bucket, &object, status); + if (TF_GetCode(status) != TF_OK) return; + + auto s3_file = static_cast(filesystem->plugin_filesystem); + GetS3Client(s3_file); + GetTransferManager(Aws::Transfer::TransferDirection::UPLOAD, s3_file); + file->plugin_file = new tf_writable_file::S3File( + bucket, object, s3_file->s3_client, + s3_file->transfer_managers[Aws::Transfer::TransferDirection::UPLOAD]); + TF_SetStatus(status, TF_OK, ""); +} + // TODO(vnvo2409): Implement later } // namespace tf_s3_filesystem From 126460334278781b5990d43eeb5448807cbdb150 Mon Sep 17 00:00:00 2001 From: Vo Van Nghia Date: Thu, 16 Jul 2020 18:05:41 +0700 Subject: [PATCH 3/3] add NewAppendableFile --- .../filesystem/plugins/s3/s3_filesystem.cc | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc index ff272cae4de..318a64b720a 100644 --- a/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc +++ b/tensorflow/c/experimental/filesystem/plugins/s3/s3_filesystem.cc @@ -42,6 +42,8 @@ constexpr uint64_t kS3MultiPartDownloadChunkSize = 50 * 1024 * 1024; // 50 MB constexpr size_t kDownloadRetries = 3; constexpr size_t kUploadRetries = 3; +constexpr size_t kS3ReadAppendableFileBufferSize = 1024 * 1024; // 1 MB + static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } static void plugin_memory_free(void* ptr) { free(ptr); } @@ -56,6 +58,9 @@ static inline void TF_SetStatusFromAWSError( case Aws::Http::HttpResponseCode::REQUESTED_RANGE_NOT_SATISFIABLE: TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested"); break; + case Aws::Http::HttpResponseCode::NOT_FOUND: + TF_SetStatus(status, TF_NOT_FOUND, error.GetMessage().c_str()); + break; default: TF_SetStatus( status, TF_UNKNOWN, @@ -510,6 +515,64 @@ void NewWritableFile(const TF_Filesystem* filesystem, const char* path, TF_SetStatus(status, TF_OK, ""); } +void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + Aws::String bucket, object; + ParseS3Path(path, false, &bucket, &object, status); + if (TF_GetCode(status) != TF_OK) return; + + auto s3_file = static_cast(filesystem->plugin_filesystem); + GetS3Client(s3_file); + GetTransferManager(Aws::Transfer::TransferDirection::UPLOAD, s3_file); + + // We need to delete `file->plugin_file` in case of errors. + std::unique_ptr writer( + file, [](TF_WritableFile* file) { + if (file != nullptr && file->plugin_file != nullptr) { + tf_writable_file::Cleanup(file); + } + }); + writer->plugin_file = new tf_writable_file::S3File( + bucket, object, s3_file->s3_client, + s3_file->transfer_managers[Aws::Transfer::TransferDirection::UPLOAD]); + TF_SetStatus(status, TF_OK, ""); + + // Wraping inside a `std::unique_ptr` to prevent memory-leaking. + std::unique_ptr reader( + new TF_RandomAccessFile, [](TF_RandomAccessFile* file) { + if (file != nullptr) { + tf_random_access_file::Cleanup(file); + delete file; + } + }); + NewRandomAccessFile(filesystem, path, reader.get(), status); + if (TF_GetCode(status) != TF_OK) return; + + uint64_t offset = 0; + std::string buffer(kS3ReadAppendableFileBufferSize, {}); + while (true) { + auto read = tf_random_access_file::Read(reader.get(), offset, + kS3ReadAppendableFileBufferSize, + &buffer[0], status); + if (TF_GetCode(status) == TF_NOT_FOUND) { + break; + } else if (TF_GetCode(status) == TF_OK) { + offset += read; + tf_writable_file::Append(file, buffer.c_str(), read, status); + if (TF_GetCode(status) != TF_OK) return; + } else if (TF_GetCode(status) == TF_OUT_OF_RANGE) { + offset += read; + tf_writable_file::Append(file, buffer.c_str(), read, status); + if (TF_GetCode(status) != TF_OK) return; + break; + } else { + return; + } + } + writer.release(); + TF_SetStatus(status, TF_OK, ""); +} + // TODO(vnvo2409): Implement later } // namespace tf_s3_filesystem