Add support for retrying operations in the S3 file system

Add header as in master for building the test, and change the error type for DeleteDir
Register the retrying file system for S3
Compile the retrying S3 file system
Handle forbidden (403) errors, add mtime to the Stat function, and guard file creation in CreateDir with an existence check
Move the read-error bug fix to a separate PR for easier review

Lint fix with buildifier
This commit is contained in:
Rahul Huilgol 2019-12-10 18:31:43 -08:00
parent 21548e548d
commit d6a731b46a
14 changed files with 176 additions and 92 deletions
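The error-handling changes in s3_file_system.cc below all follow one pattern: a 403 (FORBIDDEN) response maps to errors::FailedPrecondition, which the retrying wrapper does not retry, while other failures stay errors::Unknown, which it does retry. A minimal sketch of that mapping, assuming the retriable codes match the GCS retrying utilities this patch relocates; StatusFromAwsError is a hypothetical helper name, not something the patch adds:

#include <aws/core/client/AWSError.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/s3/S3Errors.h>

#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/status.h"

namespace tensorflow {

// Hypothetical helper showing the mapping applied at each S3 call site below.
Status StatusFromAwsError(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
  if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
    // FailedPrecondition is not retried, so bad credentials fail fast.
    return errors::FailedPrecondition(
        "AWS Credentials have not been set properly. "
        "Unable to access the specified S3 location");
  }
  // Unknown is treated as retriable, so transient S3 errors are retried.
  return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}

}  // namespace tensorflow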

View File

@@ -745,6 +745,62 @@ cc_library(
] + tf_platform_deps("unbounded_work_queue"),
)
cc_library(
name = "retrying_utils",
srcs = [
"retrying_utils.cc",
],
hdrs = [
"retrying_utils.h",
],
copts = tf_copts(),
deps = [
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib_internal",
],
)
cc_library(
name = "retrying_file_system",
hdrs = [
"retrying_file_system.h",
],
copts = tf_copts(),
deps = [
":retrying_utils",
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib_internal",
],
)
cc_test(
name = "retrying_file_system_test",
size = "small",
srcs = ["retrying_file_system_test.cc"],
deps = [
":retrying_file_system",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core/platform:str_util",
],
)
cc_test(
name = "retrying_utils_test",
size = "small",
srcs = ["retrying_utils_test.cc"],
deps = [
":retrying_utils",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core/platform:str_util",
],
)
# This is a hacky, do-nothing, binary that makes it easy to verify ability to
# build, link, and in some cases run all of the libraries under platform.
# Realistically, most of this would be covered by tests but at this point
@@ -776,6 +832,8 @@ cc_binary(
":png",
":prefetch",
":protobuf",
":retrying_utils",
":retrying_file_system",
":scanner",
":setround",
":stacktrace",

View File

@@ -93,14 +93,14 @@ cc_library(
":google_auth_provider",
":http_request",
":ram_file_block_cache",
":retrying_file_system",
":retrying_utils",
":time_util",
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core/platform:numbers",
"//tensorflow/core/platform:path",
"//tensorflow/core/platform:retrying_file_system",
"//tensorflow/core/platform:retrying_utils",
"//tensorflow/core/platform:str_util",
"//tensorflow/core/platform:stringprintf",
"@jsoncpp_git//:jsoncpp",
@@ -129,14 +129,14 @@ cc_library(
":google_auth_provider",
":http_request",
":ram_file_block_cache",
":retrying_file_system",
":retrying_utils",
":time_util",
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core/platform:numbers",
"//tensorflow/core/platform:path",
"//tensorflow/core/platform:retrying_file_system",
"//tensorflow/core/platform:retrying_utils",
"//tensorflow/core/platform:str_util",
"//tensorflow/core/platform:stringprintf",
"@jsoncpp_git//:jsoncpp",
@@ -201,12 +201,12 @@ cc_library(
deps = [
":compute_engine_metadata_client",
":oauth_client",
":retrying_utils",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core/platform:base64",
"//tensorflow/core/platform:errors",
"//tensorflow/core/platform:path",
"//tensorflow/core/platform:retrying_utils",
"//tensorflow/core/platform:status",
"@com_google_absl//absl/strings",
"@jsoncpp_git//:jsoncpp",
@@ -225,9 +225,9 @@ cc_library(
deps = [
":curl_http_request",
":http_request",
":retrying_utils",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core/platform:retrying_utils",
],
)
@@ -284,34 +284,6 @@ cc_library(
],
)
cc_library(
name = "retrying_utils",
srcs = [
"retrying_utils.cc",
],
hdrs = [
"retrying_utils.h",
],
copts = tf_copts(),
deps = [
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib_internal",
],
)
cc_library(
name = "retrying_file_system",
hdrs = [
"retrying_file_system.h",
],
copts = tf_copts(),
deps = [
":retrying_utils",
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core:lib_internal",
],
)
cc_library(
name = "time_util",
srcs = [
@@ -481,20 +453,6 @@ tf_cc_test(
],
)
tf_cc_test(
name = "retrying_file_system_test",
size = "small",
srcs = ["retrying_file_system_test.cc"],
deps = [
":retrying_file_system",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core/platform:str_util",
],
)
tf_cc_test(
name = "time_util_test",
size = "small",
@@ -505,17 +463,3 @@ tf_cc_test(
"//tensorflow/core:test_main",
],
)
tf_cc_test(
name = "retrying_utils_test",
size = "small",
srcs = ["retrying_utils_test.cc"],
deps = [
":retrying_utils",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core/platform:str_util",
],
)

View File

@@ -17,7 +17,7 @@ limitations under the License.
#define TENSORFLOW_CORE_PLATFORM_CLOUD_COMPUTE_ENGINE_METADATA_CLIENT_H_
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/status.h"
namespace tensorflow {

View File

@@ -32,7 +32,7 @@ limitations under the License.
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/google_auth_provider.h"
#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/cloud/time_util.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"

View File

@@ -29,7 +29,7 @@ limitations under the License.
#include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
#include "tensorflow/core/platform/cloud/gcs_throttle.h"
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/cloud/retrying_file_system.h"
#include "tensorflow/core/platform/retrying_file_system.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/status.h"

View File

@@ -26,7 +26,7 @@ limitations under the License.
#include "absl/strings/match.h"
#include "include/json/json.h"
#include "tensorflow/core/platform/base64.h"
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/path.h"

View File

@@ -21,7 +21,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/file_system.h"

View File

@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/cloud/retrying_file_system.h"
#include "tensorflow/core/platform/retrying_file_system.h"
#include <fstream>

View File

@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/env.h"

View File

@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include <fstream>

View File

@@ -33,6 +33,8 @@ tf_cc_binary(
linkshared = 1,
deps = [
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core/platform:retrying_file_system",
"//tensorflow/core/platform:retrying_utils",
"@aws",
"@com_google_protobuf//:protobuf_headers",
"@curl",

View File

@@ -262,8 +262,14 @@ class S3WritableFile : public WritableFile {
outfile_->clear();
outfile_->seekp(offset);
if (!putObjectOutcome.IsSuccess()) {
return errors::Unknown(putObjectOutcome.GetError().GetExceptionName(),
": ", putObjectOutcome.GetError().GetMessage());
auto error = putObjectOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
sync_needed_ = false;
return Status::OK();
@@ -417,8 +423,13 @@ Status S3FileSystem::GetChildren(const string& dir,
auto listObjectsOutcome =
this->GetS3Client()->ListObjects(listObjectsRequest);
if (!listObjectsOutcome.IsSuccess()) {
return errors::Unknown(listObjectsOutcome.GetError().GetExceptionName(),
": ", listObjectsOutcome.GetError().GetMessage());
auto error = listObjectsOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
listObjectsResult = listObjectsOutcome.GetResult();
@@ -452,8 +463,13 @@ Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) {
headBucketRequest.WithBucket(bucket.c_str());
auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest);
if (!headBucketOutcome.IsSuccess()) {
return errors::Unknown(headBucketOutcome.GetError().GetExceptionName(),
": ", headBucketOutcome.GetError().GetMessage());
auto error = headBucketOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
stats->length = 0;
stats->is_directory = 1;
@@ -473,6 +489,12 @@ Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) {
stats->mtime_nsec =
headObjectOutcome.GetResult().GetLastModified().Millis() * 1e6;
found = true;
} else {
auto error = headObjectOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
}
}
string prefix = object;
if (prefix.back() != '/') {
@@ -487,11 +509,19 @@ Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) {
auto listObjectsOutcome =
this->GetS3Client()->ListObjects(listObjectsRequest);
if (listObjectsOutcome.IsSuccess()) {
if (listObjectsOutcome.GetResult().GetContents().size() > 0) {
auto listObjects = listObjectsOutcome.GetResult().GetContents();
if (listObjects.size() > 0) {
stats->length = 0;
stats->is_directory = 1;
stats->mtime_nsec = listObjects[0].GetLastModified().Millis() * 1e6;
found = true;
}
} else {
auto error = listObjectsOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
}
}
if (!found) {
return errors::NotFound("Object ", fname, " does not exist");
@@ -514,8 +544,13 @@ Status S3FileSystem::DeleteFile(const string& fname) {
auto deleteObjectOutcome =
this->GetS3Client()->DeleteObject(deleteObjectRequest);
if (!deleteObjectOutcome.IsSuccess()) {
return errors::Unknown(deleteObjectOutcome.GetError().GetExceptionName(),
": ", deleteObjectOutcome.GetError().GetMessage());
auto error = deleteObjectOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
return Status::OK();
}
@@ -529,7 +564,13 @@ Status S3FileSystem::CreateDir(const string& dirname) {
headBucketRequest.WithBucket(bucket.c_str());
auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest);
if (!headBucketOutcome.IsSuccess()) {
return errors::NotFound("The bucket ", bucket, " was not found.");
auto error = headBucketOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::NotFound("The bucket ", bucket, " was not found.");
}
}
return Status::OK();
}
@@ -537,9 +578,11 @@ Status S3FileSystem::CreateDir(const string& dirname) {
if (filename.back() != '/') {
filename.push_back('/');
}
std::unique_ptr<WritableFile> file;
TF_RETURN_IF_ERROR(NewWritableFile(filename, &file));
TF_RETURN_IF_ERROR(file->Close());
if (!this->FileExists(filename).ok()) {
std::unique_ptr<WritableFile> file;
TF_RETURN_IF_ERROR(NewWritableFile(filename, &file));
TF_RETURN_IF_ERROR(file->Close());
}
return Status::OK();
}
@@ -563,7 +606,9 @@ Status S3FileSystem::DeleteDir(const string& dirname) {
auto contents = listObjectsOutcome.GetResult().GetContents();
if (contents.size() > 1 ||
(contents.size() == 1 && contents[0].GetKey() != prefix.c_str())) {
return errors::FailedPrecondition("Cannot delete a non-empty directory.");
return errors::Unknown("Cannot delete a non-empty directory. "
"This operation will be retried in case this "
"is due to S3's eventual consistency.");
}
if (contents.size() == 1 && contents[0].GetKey() == prefix.c_str()) {
string filename = dirname;
@@ -572,6 +617,12 @@ Status S3FileSystem::DeleteDir(const string& dirname) {
}
return DeleteFile(filename);
}
} else {
auto error = listObjectsOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
}
}
return Status::OK();
}
@@ -613,8 +664,13 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
auto listObjectsOutcome =
this->GetS3Client()->ListObjects(listObjectsRequest);
if (!listObjectsOutcome.IsSuccess()) {
return errors::Unknown(listObjectsOutcome.GetError().GetExceptionName(),
": ", listObjectsOutcome.GetError().GetMessage());
auto error = listObjectsOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
listObjectsResult = listObjectsOutcome.GetResult();
@@ -632,8 +688,13 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
auto copyObjectOutcome =
this->GetS3Client()->CopyObject(copyObjectRequest);
if (!copyObjectOutcome.IsSuccess()) {
return errors::Unknown(copyObjectOutcome.GetError().GetExceptionName(),
": ", copyObjectOutcome.GetError().GetMessage());
auto error = copyObjectOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
deleteObjectRequest.SetBucket(src_bucket.c_str());
@@ -642,9 +703,13 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
auto deleteObjectOutcome =
this->GetS3Client()->DeleteObject(deleteObjectRequest);
if (!deleteObjectOutcome.IsSuccess()) {
return errors::Unknown(
deleteObjectOutcome.GetError().GetExceptionName(), ": ",
deleteObjectOutcome.GetError().GetMessage());
auto error = deleteObjectOutcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
return errors::FailedPrecondition("AWS Credentials have not been set properly. "
"Unable to access the specified S3 location");
} else {
return errors::Unknown(error.GetExceptionName(), ": ", error.GetMessage());
}
}
}
listObjectsRequest.SetMarker(listObjectsResult.GetNextMarker());
@@ -653,6 +718,6 @@ Status S3FileSystem::RenameFile(const string& src, const string& target) {
return Status::OK();
}
REGISTER_FILE_SYSTEM("s3", S3FileSystem);
REGISTER_FILE_SYSTEM("s3", RetryingS3FileSystem);
} // namespace tensorflow
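With the registration switched to RetryingS3FileSystem, callers that resolve the s3 scheme through Env get the retrying wrapper transparently; errors::Unknown results, such as the DeleteDir eventual-consistency case above, are retried with backoff, while errors::FailedPrecondition is returned immediately. A small caller-side sketch under that assumption (ReadDir is illustrative, not part of this patch):

#include <string>
#include <vector>

#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/status.h"

namespace tensorflow {

// Illustrative caller; the retry behavior lives entirely inside the
// registered RetryingS3FileSystem, so call sites do not change.
Status ReadDir(const std::string& dir, std::vector<std::string>* children) {
  FileSystem* fs = nullptr;
  Status s = Env::Default()->GetFileSystemForFile(dir, &fs);
  if (!s.ok()) return s;
  // Goes through RetryingFileSystem<S3FileSystem>: Unknown errors are
  // retried up to max_retries times; FailedPrecondition is not.
  return fs->GetChildren(dir, children);
}

}  // namespace tensorflow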

View File

@@ -19,6 +19,7 @@ limitations under the License.
#include <aws/s3/S3Client.h>
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/retrying_file_system.h"
namespace tensorflow {
@@ -80,6 +81,20 @@ class S3FileSystem : public FileSystem {
mutex client_lock_;
};
/// S3 implementation of a file system with retry on failures.
class RetryingS3FileSystem : public RetryingFileSystem<S3FileSystem> {
public:
RetryingS3FileSystem()
: RetryingFileSystem(std::unique_ptr<S3FileSystem>(new S3FileSystem),
RetryConfig(
100000 /* init_delay_time_us */,
32000000 /* max_delay_time_us */,
10 /* max_retries */
)) {}
};
} // namespace tensorflow
#endif // TENSORFLOW_CONTRIB_S3_S3_FILE_SYSTEM_H_
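The RetryConfig above corresponds to roughly the following schedule, assuming the relocated retry utilities double the delay between attempts and cap it at max_delay_time_us (the real implementation also randomizes the delay):

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  int64_t delay_us = 100000;              // init_delay_time_us: 0.1 s
  const int64_t max_delay_us = 32000000;  // max_delay_time_us: 32 s
  const int max_retries = 10;             // max_retries
  for (int retry = 1; retry <= max_retries; ++retry) {
    std::printf("retry %d after ~%.1f s\n", retry, delay_us / 1e6);
    delay_us = std::min<int64_t>(delay_us * 2, max_delay_us);
  }
  return 0;
}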