From 0cb65c025fbc6c8bb0715dc8c0ead9ba99111a0d Mon Sep 17 00:00:00 2001 From: Mingming Liu Date: Sat, 18 Jul 2020 16:29:23 -0700 Subject: [PATCH] Resolved merge conflict in core/kernels/BUILD --- tensorflow/core/kernels/BUILD | 4 ++ tensorflow/core/kernels/batch_kernels.cc | 1 + tensorflow/core/kernels/batching_util/BUILD | 24 ++++++++ .../batching_util/threadsafe_status.cc | 51 +++++++++++++++++ .../kernels/batching_util/threadsafe_status.h | 57 +++++++++++++++++++ .../batching_util/threadsafe_status_test.cc | 51 +++++++++++++++++ 6 files changed, 188 insertions(+) create mode 100644 tensorflow/core/kernels/batching_util/threadsafe_status.cc create mode 100644 tensorflow/core/kernels/batching_util/threadsafe_status.h create mode 100644 tensorflow/core/kernels/batching_util/threadsafe_status_test.cc diff --git a/tensorflow/core/kernels/BUILD b/tensorflow/core/kernels/BUILD index 7da864a6027..cbe0276003e 100644 --- a/tensorflow/core/kernels/BUILD +++ b/tensorflow/core/kernels/BUILD @@ -672,6 +672,10 @@ cc_library( "//tensorflow/core:protos_all_cc", "//tensorflow/core/kernels/batching_util:periodic_function_dynamic", "//tensorflow/core/kernels/batching_util:shared_batch_scheduler_hdrs", + "//tensorflow/core/kernels/batching_util:threadsafe_status", + "//tensorflow/core/util:incremental_barrier", + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/strings", ], alwayslink = 1, ) diff --git a/tensorflow/core/kernels/batch_kernels.cc b/tensorflow/core/kernels/batch_kernels.cc index 6449a399573..269b4d412cc 100644 --- a/tensorflow/core/kernels/batch_kernels.cc +++ b/tensorflow/core/kernels/batch_kernels.cc @@ -23,6 +23,7 @@ limitations under the License. #include "tensorflow/core/framework/types.h" #include "tensorflow/core/kernels/batching_util/periodic_function.h" #include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h" +#include "tensorflow/core/kernels/batching_util/threadsafe_status.h" #include "tensorflow/core/kernels/concat_lib.h" #include "tensorflow/core/kernels/ops_util.h" #include "tensorflow/core/kernels/split_lib.h" diff --git a/tensorflow/core/kernels/batching_util/BUILD b/tensorflow/core/kernels/batching_util/BUILD index 803eb2e9048..a23857cc8ce 100644 --- a/tensorflow/core/kernels/batching_util/BUILD +++ b/tensorflow/core/kernels/batching_util/BUILD @@ -52,6 +52,18 @@ cc_library( ], ) +cc_library( + name = "threadsafe_status", + srcs = ["threadsafe_status.cc"], + hdrs = ["threadsafe_status.h"], + deps = [ + "//tensorflow/core:lib", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/status", + "@com_google_absl//absl/synchronization", + ], +) + tf_cc_test( name = "batch_scheduler_test", srcs = ["batch_scheduler_test.cc"], @@ -186,6 +198,18 @@ tf_cc_test( ], ) +tf_cc_test( + name = "threadsafe_status_test", + srcs = ["threadsafe_status_test.cc"], + deps = [ + ":threadsafe_status", + "//tensorflow/core:lib", + "//tensorflow/core:protos_all_cc", + "//tensorflow/core:test", + "//tensorflow/core:test_main", + ], +) + cc_library( name = "fake_clock_env", testonly = 1, diff --git a/tensorflow/core/kernels/batching_util/threadsafe_status.cc b/tensorflow/core/kernels/batching_util/threadsafe_status.cc new file mode 100644 index 00000000000..fa5cda7161b --- /dev/null +++ b/tensorflow/core/kernels/batching_util/threadsafe_status.cc @@ -0,0 +1,51 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/kernels/batching_util/threadsafe_status.h" + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/synchronization/mutex.h" +#include "tensorflow/core/platform/mutex.h" + +namespace tensorflow { +const Status& ThreadSafeStatus::status() const& { + tf_shared_lock lock(mutex_); + return status_; +} + +Status ThreadSafeStatus::status() && { + tf_shared_lock lock(mutex_); + return std::move(status_); +} + +void ThreadSafeStatus::Update(const Status& new_status) { + if (new_status.ok()) { + return; + } + + mutex_lock lock(mutex_); + status_.Update(new_status); +} + +void ThreadSafeStatus::Update(Status&& new_status) { + if (new_status.ok()) { + return; + } + + mutex_lock lock(mutex_); + status_.Update(std::forward(new_status)); +} +} // namespace tensorflow diff --git a/tensorflow/core/kernels/batching_util/threadsafe_status.h b/tensorflow/core/kernels/batching_util/threadsafe_status.h new file mode 100644 index 00000000000..c14a8a90714 --- /dev/null +++ b/tensorflow/core/kernels/batching_util/threadsafe_status.h @@ -0,0 +1,57 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_THREADSAFE_STATUS_H_ +#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_THREADSAFE_STATUS_H_ + +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/status.h" +#include "tensorflow/core/platform/thread_annotations.h" + +namespace tensorflow { +// Wrapper class to allow both lock-free construction and concurrent updates on +// a 'status'. +// +// Example Usage: +// std::thread threads[2]; +// ThreadSafeStatus thread_safe_status; +// threads[0] = std::thread([&]() { +// status.Update(errors::Internal("internal error")); +// }); +// threads[1] = std::thread([&]() { +// status.Update(errors::InvalidArgument("invalid argument")); +// }); +// threads[0].Join(); +// threads[1].Join(); +// +// NOTE: +// When updated in a multi-threading setup, only the first error is retained. +class ThreadSafeStatus { + public: + const Status& status() const& TF_LOCKS_EXCLUDED(mutex_); + Status status() && TF_LOCKS_EXCLUDED(mutex_); + + // Retains the first error status: replaces the current status with + // `new_status` if `new_status` is not OK and the previous status is OK. + void Update(const Status& new_status) TF_LOCKS_EXCLUDED(mutex_); + void Update(Status&& new_status) TF_LOCKS_EXCLUDED(mutex_); + + private: + mutable mutex mutex_; + Status status_ TF_GUARDED_BY(mutex_); +}; +} // namespace tensorflow + +#endif // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_THREADSAFE_STATUS_H_ diff --git a/tensorflow/core/kernels/batching_util/threadsafe_status_test.cc b/tensorflow/core/kernels/batching_util/threadsafe_status_test.cc new file mode 100644 index 00000000000..e0c5d03c8a4 --- /dev/null +++ b/tensorflow/core/kernels/batching_util/threadsafe_status_test.cc @@ -0,0 +1,51 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/kernels/batching_util/threadsafe_status.h" + +#include "tensorflow/core/lib/core/status_test_util.h" +#include "tensorflow/core/platform/errors.h" +#include "tensorflow/core/platform/test.h" +#include "tensorflow/core/protobuf/error_codes.pb.h" + +namespace tensorflow { +namespace { + +TEST(ThreadSafeStatus, DefaultOk) { + ThreadSafeStatus status; + TF_EXPECT_OK(status.status()); +} + +TEST(ThreadSafeStatus, Update) { + ThreadSafeStatus status; + TF_EXPECT_OK(status.status()); + + status.Update(errors::FailedPrecondition("original error")); + EXPECT_EQ(status.status().code(), error::FAILED_PRECONDITION); + + status.Update(Status::OK()); + EXPECT_EQ(status.status().code(), error::FAILED_PRECONDITION); + + status.Update(errors::Internal("new error")); + EXPECT_EQ(status.status().code(), error::FAILED_PRECONDITION); +} + +TEST(ThreadSafeStatus, Move) { + ThreadSafeStatus status; + TF_EXPECT_OK(std::move(status).status()); +} + +} // namespace +} // namespace tensorflow