From feacc4de637054794ac0d4eacc7e43cc1d524aa0 Mon Sep 17 00:00:00 2001
From: Mihai Maruseac
Date: Wed, 15 Apr 2020 08:50:41 -0700
Subject: [PATCH] Rollback of c609fda75ca2113708e6f7a9a53ec57544fe1ca0 which causes TSAN failures

PiperOrigin-RevId: 306652032
Change-Id: Iabf3914e9dd8012bbc4814baaa7dc47ada64ca74
---
 tensorflow/core/framework/dataset.h           |  7 +--
 tensorflow/core/framework/model.cc            | 51 ++++++++++++++++---
 tensorflow/core/framework/model.h             | 22 ++++++--
 .../core/kernels/data/captured_function.cc    | 41 +++++++--------
 .../core/kernels/data/captured_function.h     |  3 +-
 .../experimental/choose_fastest_dataset_op.cc |  2 -
 .../experimental/map_and_batch_dataset_op.cc  |  2 +-
 .../experimental/parse_example_dataset_op.cc  |  8 ++-
 .../data/parallel_interleave_dataset_op.cc    | 35 +++++--------
 .../kernels/data/parallel_map_dataset_op.cc   | 14 ++---
 .../kernels/data/parallel_map_dataset_op.h    |  3 +-
 11 files changed, 109 insertions(+), 79 deletions(-)

diff --git a/tensorflow/core/framework/dataset.h b/tensorflow/core/framework/dataset.h
index 62594014dd1..ab159a92109 100644
--- a/tensorflow/core/framework/dataset.h
+++ b/tensorflow/core/framework/dataset.h
@@ -668,11 +668,6 @@ class IteratorBase {
   virtual Status RestoreInternal(IteratorContext* ctx,
                                  IteratorStateReader* reader) = 0;

-  // Returns a pointer to the node representing this iterator in the performance
-  // model. It may be null, if performance modeling is not enabled for this
-  // iterator.
-  std::shared_ptr<model::Node> model_node() const { return node_; }
-
   // Returns the number of elements produced by this iterator.
   int64 num_elements() const {
     if (node_) return node_->num_elements();
@@ -689,7 +684,7 @@ class IteratorBase {
                            const string& output_prefix);

   std::vector<std::function<void()>> cleanup_fns_;
-  std::shared_ptr<model::Node> node_ = nullptr;
+  model::Node* node_ = nullptr;  // Not owned.
   const IteratorBase* parent_ = nullptr;  // Not owned.
   int64 id_ = 0;
   int64 parent_id_ = 0;
diff --git a/tensorflow/core/framework/model.cc b/tensorflow/core/framework/model.cc
index fd322e238c3..ab2f8b17a28 100644
--- a/tensorflow/core/framework/model.cc
+++ b/tensorflow/core/framework/model.cc
@@ -696,8 +696,7 @@ string Node::DebugString() const {
                      "\n");
   strings::StrAppend(&result, " bytes_produced=", bytes_produced_.load(),
                      "\n");
-  strings::StrAppend(&result, " processing_time=", processing_time_.load(),
-                     "\n");
+  strings::StrAppend(&result, " processing_time=", processing_time_, "\n");
   strings::StrAppend(&result, " num_elements=", num_elements_.load(), "\n");
   string inputs;
   for (auto& input : inputs_) {
@@ -736,9 +735,9 @@ std::shared_ptr<Node> Node::Snapshot(std::shared_ptr<Node> output) {
     result->bytes_produced_.store(bytes_produced_);
     result->num_elements_.store(num_elements_);
     result->record_metrics_.store(false);
-    result->processing_time_.store(processing_time_);
     mutex_lock l2(result->mu_);
     result->parameters_ = parameters_;
+    result->processing_time_ = processing_time_;
   }
   for (auto& input : inputs_) {
     result->add_input(input->Snapshot(result));
   }
@@ -863,8 +862,7 @@ double Node::SelfProcessingTimeLocked() const {
 }

 void Model::AddNode(Node::Factory factory, const string& name,
-                    const string& output_name,
-                    std::shared_ptr<Node>* out_node) {
+                    const string& output_name, Node** out_node) {
   // The name captures the sequence of iterators joined by `::`. We use the full
   // sequence as the key in the lookup table, but only the last element of the
   // sequence as the name node.
@@ -896,7 +894,15 @@ void Model::AddNode(Node::Factory factory, const string& name,
   collect_resource_usage_ =
       collect_resource_usage_ || node->has_tunable_parameters();
   lookup_table_.insert(std::make_pair(name, node));
-  *out_node = node;
+  *out_node = node.get();
+}
+
+void Model::AddProcessingTime(const string& name, int64 delta) {
+  tf_shared_lock l(mu_);
+  auto node = gtl::FindOrNull(lookup_table_, name);
+  if (node) {
+    (*node)->add_processing_time(delta);
+  }
 }

 void Model::FlushMetrics() {
@@ -906,6 +912,15 @@ void Model::FlushMetrics() {
   }
 }

+int64 Model::NumElements(const string& name) {
+  tf_shared_lock l(mu_);
+  auto node = gtl::FindOrNull(lookup_table_, name);
+  if (node) {
+    return (*node)->num_elements();
+  }
+  return 0;
+}
+
 void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget,
                      int64 ram_budget) {
   switch (algorithm) {
@@ -918,6 +933,30 @@ void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget,
   }
 }

+void Model::RecordStart(const string& name, bool stop_output) {
+  tf_shared_lock l(mu_);
+  auto node = gtl::FindOrNull(lookup_table_, name);
+  if (collect_resource_usage_ && node) {
+    int64 now_nanos = absl::GetCurrentTimeNanos();
+    if (stop_output && (*node)->output()) {
+      (*node)->output()->record_stop(now_nanos);
+    }
+    (*node)->record_start(now_nanos);
+  }
+}
+
+void Model::RecordStop(const string& name, bool start_output) {
+  tf_shared_lock l(mu_);
+  auto node = gtl::FindOrNull(lookup_table_, name);
+  if (collect_resource_usage_ && node) {
+    int64 now_nanos = absl::GetCurrentTimeNanos();
+    (*node)->record_stop(now_nanos);
+    if (start_output && (*node)->output()) {
+      (*node)->output()->record_start(now_nanos);
+    }
+  }
+}
+
 void Model::RemoveNode(const string& name) {
   mutex_lock l(mu_);
   auto node = gtl::FindOrNull(lookup_table_, name);
diff --git a/tensorflow/core/framework/model.h b/tensorflow/core/framework/model.h
index f8e4cb50f9d..d947a9d117b 100644
--- a/tensorflow/core/framework/model.h
+++ b/tensorflow/core/framework/model.h
@@ -133,7 +133,6 @@ class Node {
         bytes_consumed_(0),
         bytes_produced_(0),
         num_elements_(0),
-        processing_time_(0),
         record_metrics_(true),
         metrics_(name_),
         output_(args.output.get()) {}
@@ -148,6 +147,7 @@ class Node {

   // Increments the aggregate processing time by the given delta.
   void add_processing_time(int64 delta) TF_LOCKS_EXCLUDED(mu_) {
+    mutex_lock l(mu_);
     processing_time_ += delta;
   }

@@ -210,6 +210,7 @@ class Node {

   // Returns the aggregate processing time.
   int64 processing_time() const TF_LOCKS_EXCLUDED(mu_) {
+    tf_shared_lock l(mu_);
     return processing_time_;
   }

@@ -417,10 +418,10 @@ class Node {
   std::atomic<int64> bytes_consumed_;
   std::atomic<int64> bytes_produced_;
   std::atomic<int64> num_elements_;
-  std::atomic<int64> processing_time_;
   std::atomic<bool> record_metrics_;
   Metrics metrics_;
   std::map<string, std::shared_ptr<Parameter>> parameters_ TF_GUARDED_BY(mu_);
+  int64 processing_time_ TF_GUARDED_BY(mu_) = 0;
   std::map<std::thread::id, int64> work_start_ TF_GUARDED_BY(mu_);

   // Statistic of inputs processing time history.
@@ -490,16 +491,31 @@ class Model {
   // Adds a node with the given name and given output. The method returns
   // a pointer to the node but does not transfer ownership.
   void AddNode(Node::Factory factory, const string& name,
-               const string& output_name, std::shared_ptr<Node>* out_node)
+               const string& output_name, Node** out_node)
+      TF_LOCKS_EXCLUDED(mu_);
+
+  // Increments the processing time for the given node.
+  void AddProcessingTime(const string& name, int64 delta)
       TF_LOCKS_EXCLUDED(mu_);

   // Flushes metrics record by the model.
   void FlushMetrics() TF_LOCKS_EXCLUDED(mu_);

+  // Returns the number of elements that the input pipeline has produced.
+  int64 NumElements(const string& name) TF_LOCKS_EXCLUDED(mu_);
+
   // Uses the given algorithm to perform the autotuning optimization.
   void Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget,
                 int64 ram_budget) TF_LOCKS_EXCLUDED(mu_);

+  // Records that the given node has started work. If `stop_output` is set, it
+  // also records that the output of the given node has stopped work.
+  void RecordStart(const string& name, bool stop_output) TF_LOCKS_EXCLUDED(mu_);
+
+  // Records that the given node has stopped work. If `start_output` is set, it
+  // also records that the output of the given node has started work.
+  void RecordStop(const string& name, bool start_output) TF_LOCKS_EXCLUDED(mu_);
+
   // Removes the given node.
   void RemoveNode(const string& name) TF_LOCKS_EXCLUDED(mu_);

diff --git a/tensorflow/core/kernels/data/captured_function.cc b/tensorflow/core/kernels/data/captured_function.cc
index c6b912b6736..97c8a2ac5e3 100644
--- a/tensorflow/core/kernels/data/captured_function.cc
+++ b/tensorflow/core/kernels/data/captured_function.cc
@@ -759,8 +759,7 @@ Status InstantiatedCapturedFunction::RunInstantiated(

 void InstantiatedCapturedFunction::RunAsync(
     IteratorContext* ctx, std::vector<Tensor>&& args, std::vector<Tensor>* rets,
-    FunctionLibraryRuntime::DoneCallback done,
-    const std::shared_ptr<model::Node>& node) const {
+    FunctionLibraryRuntime::DoneCallback done, const string& prefix) const {
   auto& info = captured_func_->short_circuit_info();
   if (!info.indices.empty()) {
     // Run the `done` callback on a threadpool thread, because it will
@@ -793,21 +792,18 @@ void InstantiatedCapturedFunction::RunAsync(
   f_opts.cancellation_manager = cancellation_manager.get();

   std::shared_ptr<SimpleStepStatsCollector> stats_collector;
-  if (node || ctx->stats_aggregator()) {
-    stats_collector = std::make_shared<SimpleStepStatsCollector>();
+  if (ctx->model() || ctx->stats_aggregator()) {
+    stats_collector = absl::make_unique<SimpleStepStatsCollector>();
   }
-  const bool collect_usage =
-      node && ctx->model() && ctx->model()->collect_resource_usage();
   f_opts.stats_collector = stats_collector.get();

   // Transfer ownership of the cancellation manager to `callback`.
   CancellationManager* raw_cancellation_manager =
       cancellation_manager.release();
   auto callback = std::bind(
-      [this, rets, step_container, raw_cancellation_manager, frame, node,
-       collect_usage](
+      [this, rets, step_container, raw_cancellation_manager, frame](
          const FunctionLibraryRuntime::DoneCallback& done,
-          IteratorContext* ctx,
+          IteratorContext* ctx, const string& prefix,
          const std::shared_ptr<SimpleStepStatsCollector>& stats_collector,
          // Begin unbound arguments.
          Status s) {
@@ -817,30 +813,32 @@ void InstantiatedCapturedFunction::RunAsync(
           s = frame->ConsumeRetvals(rets);
         }
         delete frame;
-        if (node) {
+        if (ctx->model()) {
           // TODO(b/129085499) Utilize the `node_name` which would be unique
           // than the prefix for the function execution time statistics.
           // prefix_with_func_name would then be node_name + func_name.
           if (ctx->stats_aggregator()) {
+            string prefix_end =
+                str_util::Split(prefix, "::", str_util::SkipEmpty()).back();
             string prefix_with_func_name =
-                strings::StrCat(node->name(), stats_utils::kDelimiter,
+                strings::StrCat(prefix_end, stats_utils::kDelimiter,
                                 captured_func_->func().name());
             ctx->stats_aggregator()->AddToHistogram(
                 stats_utils::ExecutionTimeHistogramName(prefix_with_func_name),
                 {static_cast<double>(stats_collector->processing_time())},
-                node->num_elements());
+                ctx->model()->NumElements(prefix));
           }
-          node->add_processing_time(stats_collector->processing_time());
-        }
-        if (collect_usage) {
-          node->record_start(EnvTime::NowNanos());
+          ctx->model()->AddProcessingTime(prefix,
+                                          stats_collector->processing_time());
+          ctx->model()->RecordStart(prefix, false /* stop_output */);
         }
         done(s);
-        if (collect_usage) {
-          node->record_stop(EnvTime::NowNanos());
+        if (ctx->model()) {
+          ctx->model()->RecordStop(prefix, false /* start_output */);
         }
       },
-      std::move(done), ctx, std::move(stats_collector), std::placeholders::_1);
+      std::move(done), ctx, prefix, std::move(stats_collector),
+      std::placeholders::_1);

   profiler::TraceMe activity(
       [&] {
         return absl::StrCat(
             "InstantiatedCapturedFunction::RunAsync#id=", f_opts.step_id, "#");
       },
       profiler::TraceMeLevel::kInfo);
-  // Stop the usage collection before calling `Run()` because `callback` may
-  // be executed synchronously, and so the `node->record_start()` call within
-  // `callback` would violate nesting.
-  if (collect_usage) node->record_stop(EnvTime::NowNanos());
   lib_->Run(f_opts, f_handle_, frame, std::move(callback));
-  if (collect_usage) node->record_start(EnvTime::NowNanos());
 }

 bool InstantiatedCapturedFunction::ShouldCreateRendezvous() const {
diff --git a/tensorflow/core/kernels/data/captured_function.h b/tensorflow/core/kernels/data/captured_function.h
index 284a02091dd..564ab9418ee 100644
--- a/tensorflow/core/kernels/data/captured_function.h
+++ b/tensorflow/core/kernels/data/captured_function.h
@@ -21,7 +21,6 @@ limitations under the License.
#include "tensorflow/core/framework/cancellation.h" #include "tensorflow/core/framework/dataset.h" #include "tensorflow/core/framework/function.h" -#include "tensorflow/core/framework/model.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/lib/core/status.h" @@ -96,7 +95,7 @@ class InstantiatedCapturedFunction { void RunAsync(IteratorContext* ctx, std::vector&& args, std::vector* rets, FunctionLibraryRuntime::DoneCallback done, - const std::shared_ptr& node) const; + const string& prefix) const; private: InstantiatedCapturedFunction( diff --git a/tensorflow/core/kernels/data/experimental/choose_fastest_dataset_op.cc b/tensorflow/core/kernels/data/experimental/choose_fastest_dataset_op.cc index 6ab72d85a99..f346dcc70c3 100644 --- a/tensorflow/core/kernels/data/experimental/choose_fastest_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/choose_fastest_dataset_op.cc @@ -319,8 +319,6 @@ class ChooseFastestDatasetOp : public DatasetOpKernel { } void RunnerThread(IteratorContext* ctx, InvocationResult* result, int i) { - RecordStart(ctx); - auto cleanup = gtl::MakeCleanup([this, ctx]() { RecordStop(ctx); }); int64 start = EnvTime::NowNanos(); Status s = input_impls_[i]->GetNext(ctx, &result->out_tensors, &result->end_of_sequence); diff --git a/tensorflow/core/kernels/data/experimental/map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/experimental/map_and_batch_dataset_op.cc index 38550730a0f..c016711bedc 100644 --- a/tensorflow/core/kernels/data/experimental/map_and_batch_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/map_and_batch_dataset_op.cc @@ -439,7 +439,7 @@ class MapAndBatchDatasetOp::Dataset : public DatasetBase { // `return_values`, and invoking `done` when finished. instantiated_captured_func_->RunAsync(ctx.get(), std::move(input_element), return_values.get(), - std::move(done), model_node()); + std::move(done), prefix()); } void CancelThreads(bool wait) TF_LOCKS_EXCLUDED(mu_) { diff --git a/tensorflow/core/kernels/data/experimental/parse_example_dataset_op.cc b/tensorflow/core/kernels/data/experimental/parse_example_dataset_op.cc index 943759fa4b5..c5972f11a38 100644 --- a/tensorflow/core/kernels/data/experimental/parse_example_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/parse_example_dataset_op.cc @@ -351,12 +351,10 @@ class ParseExampleDatasetOp : public UnaryDatasetOpKernel { Status CheckExternalState() override { return Status::OK(); } - void MapFunc(IteratorContext* ctx, - const std::shared_ptr& node, + void MapFunc(IteratorContext* ctx, const string& prefix, std::vector input, std::vector* output, StatusCallback callback) override { - (*ctx->runner())([this, ctx, node, input, output, - callback = std::move(callback)]() { + (*ctx->runner())([this, ctx, prefix, input, output, callback]() { thread::ThreadPool* device_threadpool = ctx->flr()->device()->tensorflow_cpu_worker_threads()->workers; std::vector slice_vec; @@ -425,7 +423,7 @@ class ParseExampleDatasetOp : public UnaryDatasetOpKernel { stats_aggregator->IncrementCounter( stats_utils::kFeatureValuesCount, "trainer", feature_stats.feature_values_count); - int64 steps = node ? 
+            int64 steps = ctx->model()->NumElements(prefix);
             stats_aggregator->AddToHistogram(
                 stats_utils::FeatureHistogramName(dataset_->node_name()),
                 {static_cast<double>(feature_stats.features_count)}, steps);
diff --git a/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc b/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc
index 7afaf249718..e1677b95959 100644
--- a/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc
+++ b/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc
@@ -714,12 +714,6 @@ class ParallelInterleaveDatasetOp::Dataset : public DatasetBase {
     // Thread responsible for launching all worker threads. The thread stays
     // around after startup in case autotuning increases num_parallel_calls.
     void WorkerManagerThread() TF_LOCKS_EXCLUDED(mu_) {
-      RecordStart(ctx_.get());
-      auto cleanup = gtl::MakeCleanup([this]() {
-        RecordStop(ctx_.get());
-        mutex_lock l(*mu_);
-        DecrementOutstandingThreads();
-      });
       int initial_current_workers;
       // When elements are moved from `future_elements_` to `current_elements_`,
       // the future worker which created the element may continue to process
@@ -754,6 +748,7 @@ class ParallelInterleaveDatasetOp::Dataset : public DatasetBase {
           RecordStart(ctx_.get());
         }
         if (cancelled_ || end_of_input_) {
+          DecrementOutstandingThreads();
           return;
         }
         IncrementOutstandingThreads();
@@ -1326,21 +1321,19 @@ class ParallelInterleaveDatasetOp::Dataset : public DatasetBase {
       Status s = Status::OK();
       BlockingCounter counter(size);
       for (int idx = 0; idx < size; ++idx) {
-        threadpool->Schedule([this, ctx, reader, idx, name, &s, &counter,
-                              elements] {
-          RecordStart(ctx);
-          auto cleanup = gtl::MakeCleanup([this, ctx]() { RecordStop(ctx); });
-          std::shared_ptr<Element> elem;
-          Status ret_status = ReadElement(ctx, reader, idx, name, &elem);
-          mutex_lock l(*mu_);
-          if (!ret_status.ok()) {
-            s.Update(ret_status);
-            counter.DecrementCount();
-            return;
-          }
-          (*elements)[idx] = elem;
-          counter.DecrementCount();
-        });
+        threadpool->Schedule(
+            [this, ctx, reader, idx, name, &s, &counter, elements] {
+              std::shared_ptr<Element> elem;
+              Status ret_status = ReadElement(ctx, reader, idx, name, &elem);
+              mutex_lock l(*mu_);
+              if (!ret_status.ok()) {
+                s.Update(ret_status);
+                counter.DecrementCount();
+                return;
+              }
+              (*elements)[idx] = elem;
+              counter.DecrementCount();
+            });
       }
       counter.Wait();
       return s;
diff --git a/tensorflow/core/kernels/data/parallel_map_dataset_op.cc b/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
index 5dae096d5b5..ca547fb6339 100644
--- a/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
+++ b/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
@@ -194,22 +194,22 @@ class ParallelMapDatasetOp::Dataset : public DatasetBase {
       return dataset_->captured_func_->CheckExternalState();
     }

-    void MapFunc(IteratorContext* ctx, const std::shared_ptr<model::Node>& node,
+    void MapFunc(IteratorContext* ctx, const string& prefix,
                  std::vector<Tensor> input_element, std::vector<Tensor>* result,
                  StatusCallback done) override {
-      auto map_func = [this](IteratorContext* ctx,
-                             const std::shared_ptr<model::Node>& node,
+      auto map_func = [this](IteratorContext* ctx, const string& prefix,
                              std::vector<Tensor> input_element,
                              std::vector<Tensor>* result, StatusCallback done) {
        instantiated_captured_func_->RunAsync(ctx, std::move(input_element),
-                                              result, std::move(done), node);
+                                              result, std::move(done), prefix);
      };
      if (!dataset_->captured_func_->use_inter_op_parallelism()) {
-        (*ctx->runner())(std::bind(map_func, ctx, node,
+        (*ctx->runner())(std::bind(map_func, ctx, prefix,
                                    std::move(input_element), result,
                                    std::move(done)));
       } else {
-        map_func(ctx, node, std::move(input_element), result, std::move(done));
+        map_func(ctx, prefix, std::move(input_element), result,
+                 std::move(done));
       }
     }

@@ -540,7 +540,7 @@ class ParallelMapIterator : public DatasetBaseIterator {

       // Apply the map function on `input_element`, storing the result in
       // `result->return_values`, and invoking `done` when finished.
-      parallel_map_functor_->MapFunc(ctx.get(), model_node(),
+      parallel_map_functor_->MapFunc(ctx.get(), prefix(),
                                      std::move(input_element),
                                      &result->return_values, std::move(done));
     }
diff --git a/tensorflow/core/kernels/data/parallel_map_dataset_op.h b/tensorflow/core/kernels/data/parallel_map_dataset_op.h
index ca04b0791bb..064ccb5f812 100644
--- a/tensorflow/core/kernels/data/parallel_map_dataset_op.h
+++ b/tensorflow/core/kernels/data/parallel_map_dataset_op.h
@@ -77,8 +77,7 @@ class ParallelMapFunctor {
   // 2. A `std::vector<Tensor>` containing the input element.
   // 3. A `std::vector<Tensor>*` to which the function will write the result.
   // 4. A `StatusCallback` that should be invoked when the function is complete.
-  virtual void MapFunc(IteratorContext* ctx,
-                       const std::shared_ptr<model::Node>& node,
+  virtual void MapFunc(IteratorContext* ctx, const string& prefix,
                        std::vector<Tensor> input, std::vector<Tensor>* output,
                        StatusCallback callback) = 0;
 };
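
For context, the sketch below (not part of the patch) is a minimal, self-contained illustration of the name-keyed accounting pattern this rollback restores: callers identify an iterator node by its prefix string, and the model looks the node up before updating processing time or element counts, instead of handing out node pointers. The MiniModel/MiniNode names and the RecordElement helper are hypothetical stand-ins, not TensorFlow APIs; the real model::Model also tracks output nodes, tunable parameters, and resource usage, which this sketch omits.

// Hypothetical, simplified sketch; not TensorFlow code.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Per-node counters, analogous in spirit to model::Node's bookkeeping.
class MiniNode {
 public:
  void add_processing_time(int64_t delta) {
    std::lock_guard<std::mutex> l(mu_);
    processing_time_ += delta;
  }
  void record_element() {
    std::lock_guard<std::mutex> l(mu_);
    ++num_elements_;
  }
  int64_t num_elements() const {
    std::lock_guard<std::mutex> l(mu_);
    return num_elements_;
  }

 private:
  mutable std::mutex mu_;
  int64_t processing_time_ = 0;  // total nanoseconds reported so far
  int64_t num_elements_ = 0;     // elements produced so far
};

// Name-keyed registry, analogous to Model's lookup_table_: callers pass the
// iterator prefix string instead of holding a pointer to the node.
class MiniModel {
 public:
  void AddNode(const std::string& name) {
    std::lock_guard<std::mutex> l(mu_);
    lookup_table_[name] = std::make_shared<MiniNode>();
  }
  void AddProcessingTime(const std::string& name, int64_t delta) {
    if (auto node = Find(name)) node->add_processing_time(delta);
  }
  void RecordElement(const std::string& name) {
    if (auto node = Find(name)) node->record_element();
  }
  int64_t NumElements(const std::string& name) {
    auto node = Find(name);
    return node ? node->num_elements() : 0;
  }

 private:
  std::shared_ptr<MiniNode> Find(const std::string& name) {
    std::lock_guard<std::mutex> l(mu_);
    auto it = lookup_table_.find(name);
    return it == lookup_table_.end() ? nullptr : it->second;
  }

  std::mutex mu_;
  std::map<std::string, std::shared_ptr<MiniNode>> lookup_table_;
};

int main() {
  MiniModel model;
  const std::string prefix = "Iterator::ParallelMapV2";  // hypothetical prefix
  model.AddNode(prefix);
  // A caller like RunAsync reports the user function's processing time and
  // element counts against the prefix rather than against a raw node pointer.
  model.AddProcessingTime(prefix, /*delta=*/1234);
  model.RecordElement(prefix);
  std::cout << "elements: " << model.NumElements(prefix) << "\n";
  return 0;
}

The name-keyed lookup costs a map lookup under a lock on every call, which is the overhead the rolled-back change had tried to avoid by passing node pointers directly; the rollback accepts that cost in exchange for avoiding the data races flagged by TSAN.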