Add log to s3 filesystem

This commit is contained in:
Vo Van Nghia 2020-08-13 00:16:34 +07:00
parent 4e3283a891
commit 22609c8683
3 changed files with 33 additions and 0 deletions

View File

@ -217,6 +217,7 @@ cc_library(
name = "logging",
srcs = ["logging.cc"],
hdrs = ["logging.h"],
visibility = ["//visibility:public"],
deps = [
":c_api_macros",
"//tensorflow/core/platform:logging",

View File

@ -26,6 +26,7 @@ cc_library(
}),
deps = [
":aws_crypto",
"//tensorflow/c:logging",
"//tensorflow/c:tf_status",
"//tensorflow/c/experimental/filesystem:filesystem_interface",
"@aws",

View File

@ -38,6 +38,7 @@ limitations under the License.
#include "absl/strings/str_cat.h"
#include "tensorflow/c/experimental/filesystem/filesystem_interface.h"
#include "tensorflow/c/experimental/filesystem/plugins/s3/aws_crypto.h"
#include "tensorflow/c/logging.h"
#include "tensorflow/c/tf_status.h"
// Implementation of a filesystem for S3 environments.
@ -281,6 +282,7 @@ void Cleanup(TF_RandomAccessFile* file) {
static int64_t ReadS3Client(S3File* s3_file, uint64_t offset, size_t n,
char* buffer, TF_Status* status) {
TF_VLog(3, "ReadFile using S3Client\n");
Aws::S3::Model::GetObjectRequest get_object_request;
get_object_request.WithBucket(s3_file->bucket).WithKey(s3_file->object);
Aws::String bytes =
@ -306,12 +308,14 @@ static int64_t ReadS3Client(S3File* s3_file, uint64_t offset, size_t n,
static int64_t ReadS3TransferManager(S3File* s3_file, uint64_t offset, size_t n,
char* buffer, TF_Status* status) {
TF_VLog(3, "Using TransferManager\n");
auto create_download_stream = [&]() {
return Aws::New<TFS3UnderlyingStream>(
"S3ReadStream",
Aws::New<Aws::Utils::Stream::PreallocatedStreamBuf>(
"S3ReadStream", reinterpret_cast<unsigned char*>(buffer), n));
};
TF_VLog(3, "Created stream to read with transferManager\n");
auto handle = s3_file->transfer_manager->DownloadFile(
s3_file->bucket, s3_file->object, offset, n, create_download_stream);
handle->WaitUntilFinished();
@ -322,6 +326,10 @@ static int64_t ReadS3TransferManager(S3File* s3_file, uint64_t offset, size_t n,
Aws::Http::HttpResponseCode::REQUESTED_RANGE_NOT_SATISFIABLE &&
retries++ < kDownloadRetries) {
// Only failed parts will be downloaded again.
TF_VLog(
1,
"Retrying read of s3://%s/%s after failure. Current retry count: %u\n",
s3_file->bucket.c_str(), s3_file->object.c_str(), retries);
s3_file->transfer_manager->RetryDownload(handle);
handle->WaitUntilFinished();
}
@ -341,6 +349,8 @@ static int64_t ReadS3TransferManager(S3File* s3_file, uint64_t offset, size_t n,
int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
char* buffer, TF_Status* status) {
auto s3_file = static_cast<S3File*>(file->plugin_file);
TF_VLog(1, "ReadFilefromS3 s3://%s/%s from %u for n: %u\n",
s3_file->bucket.c_str(), s3_file->object.c_str(), offset, n);
if (s3_file->use_multi_part_download)
return ReadS3TransferManager(s3_file, offset, n, buffer, status);
else
@ -416,6 +426,8 @@ void Sync(const TF_WritableFile* file, TF_Status* status) {
TF_SetStatus(status, TF_OK, "");
return;
}
TF_VLog(1, "WriteFileToS3: s3://%s/%s\n", s3_file->bucket.c_str(),
s3_file->object.c_str());
auto position = static_cast<int64_t>(s3_file->outfile->tellp());
auto handle = s3_file->transfer_manager->UploadFile(
s3_file->outfile, s3_file->bucket, s3_file->object,
@ -426,6 +438,10 @@ void Sync(const TF_WritableFile* file, TF_Status* status) {
while (handle->GetStatus() == Aws::Transfer::TransferStatus::FAILED &&
retries++ < kUploadRetries) {
// if multipart upload was used, only the failed parts will be re-sent
TF_VLog(1,
"Retrying upload of s3://%s/%s after failure. Current retry count: "
"%u\n",
s3_file->bucket.c_str(), s3_file->object.c_str(), retries);
s3_file->transfer_manager->RetryUpload(s3_file->outfile, handle);
handle->WaitUntilFinished();
}
@ -613,6 +629,7 @@ void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
void Stat(const TF_Filesystem* filesystem, const char* path,
TF_FileStatistics* stats, TF_Status* status) {
TF_VLog(1, "Stat on path: %s\n", path);
Aws::String bucket, object;
ParseS3Path(path, true, &bucket, &object, status);
if (TF_GetCode(status) != TF_OK) return;
@ -737,6 +754,8 @@ static void SimpleCopyFile(const Aws::String& source,
const Aws::String& bucket_dst,
const Aws::String& object_dst, S3File* s3_file,
TF_Status* status) {
TF_VLog(1, "SimpleCopyFile from %s to %s/%s\n", bucket_dst.c_str(),
object_dst.c_str());
Aws::S3::Model::CopyObjectRequest copy_object_request;
copy_object_request.WithCopySource(source)
.WithBucket(bucket_dst)
@ -801,6 +820,8 @@ static void MultiPartCopy(const Aws::String& source,
const Aws::String& object_dst, const size_t num_parts,
const uint64_t file_size, S3File* s3_file,
TF_Status* status) {
TF_VLog(1, "MultiPartCopy from %s to %s/%s\n", bucket_dst.c_str(),
object_dst.c_str());
Aws::S3::Model::CreateMultipartUploadRequest create_multipart_upload_request;
create_multipart_upload_request.WithBucket(bucket_dst).WithKey(object_dst);
@ -827,6 +848,8 @@ static void MultiPartCopy(const Aws::String& source,
auto chunk_size =
s3_file->multi_part_chunk_sizes[Aws::Transfer::TransferDirection::UPLOAD];
TF_VLog(1, "Copying from %s in %u parts of size %u each\n", source.c_str(),
num_parts, chunk_size);
size_t retries = 0;
while (retries++ < 3) {
// Queue up parts.
@ -891,6 +914,9 @@ static void MultiPartCopy(const Aws::String& source,
status);
} else {
// Retry.
TF_Log(TF_ERROR,
"Retrying failed copy of part %u due to an error with S3\n",
part_number);
num_finished_parts--;
}
}
@ -967,6 +993,7 @@ void CopyFile(const TF_Filesystem* filesystem, const char* src, const char* dst,
void DeleteFile(const TF_Filesystem* filesystem, const char* path,
TF_Status* status) {
TF_VLog(1, "DeleteFile: %s\n", path);
Aws::String bucket, object;
ParseS3Path(path, false, &bucket, &object, status);
if (TF_GetCode(status) != TF_OK) return;
@ -985,6 +1012,7 @@ void DeleteFile(const TF_Filesystem* filesystem, const char* path,
void CreateDir(const TF_Filesystem* filesystem, const char* path,
TF_Status* status) {
TF_VLog(1, "CreateDir: %s\n", path);
Aws::String bucket, object;
ParseS3Path(path, true, &bucket, &object, status);
if (TF_GetCode(status) != TF_OK) return;
@ -1026,6 +1054,7 @@ void CreateDir(const TF_Filesystem* filesystem, const char* path,
void DeleteDir(const TF_Filesystem* filesystem, const char* path,
TF_Status* status) {
TF_VLog(1, "DeleteDir: %s\n", path);
Aws::String bucket, object;
ParseS3Path(path, false, &bucket, &object, status);
if (TF_GetCode(status) != TF_OK) return;
@ -1060,6 +1089,7 @@ void DeleteDir(const TF_Filesystem* filesystem, const char* path,
void RenameFile(const TF_Filesystem* filesystem, const char* src,
const char* dst, TF_Status* status) {
TF_VLog(1, "RenameFile from: %s to %s\n", src, dst);
Aws::String bucket_src, object_src;
ParseS3Path(src, false, &bucket_src, &object_src, status);
if (TF_GetCode(status) != TF_OK) return;
@ -1120,6 +1150,7 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
int GetChildren(const TF_Filesystem* filesystem, const char* path,
char*** entries, TF_Status* status) {
TF_VLog(1, "GetChildren for path: %s\n", path);
Aws::String bucket, prefix;
ParseS3Path(path, true, &bucket, &prefix, status);
if (TF_GetCode(status) != TF_OK) return -1;