From 932874df5ff31e38fa6d38fc87d100525dd46e50 Mon Sep 17 00:00:00 2001 From: Jiri Simsa Date: Sun, 28 Apr 2019 13:44:58 -0700 Subject: [PATCH] Internal change PiperOrigin-RevId: 245658213 --- tensorflow/core/framework/dataset.h | 16 ++ tensorflow/core/framework/model.cc | 185 ++++++++++-------- tensorflow/core/framework/model.h | 115 ++++++++--- tensorflow/core/framework/model_test.cc | 125 ++++++------ .../data/parallel_interleave_dataset_op.cc | 55 +++--- .../python/data/experimental/benchmarks/BUILD | 1 - .../benchmarks/autotune_benchmark.py | 10 +- 7 files changed, 304 insertions(+), 203 deletions(-) diff --git a/tensorflow/core/framework/dataset.h b/tensorflow/core/framework/dataset.h index 6de3e48b950..67447194c8c 100644 --- a/tensorflow/core/framework/dataset.h +++ b/tensorflow/core/framework/dataset.h @@ -766,6 +766,22 @@ class DatasetBaseIterator : public IteratorBase { return model::MakeUnknownNode(std::move(args)); } + // When modeling is enabled, this method disables autotuning for the given + // iterator (and the transitive closure of its inputs). + void DisableAutotune(IteratorContext* ctx, IteratorBase* iterator) { + if (iterator->node_) { + iterator->node_->set_autotune(false); + } + } + + // When modeling is enabled, this method enables autotuning for the given + // iterator (and the transitive closure of its inputs). + void EnableAutotune(IteratorContext* ctx, IteratorBase* iterator) { + if (iterator->node_) { + iterator->node_->set_autotune(true); + } + } + // When modeling is enabled, this method records the fact that this iterator // has dequeued an element from an internal buffer. void RecordBufferDequeue(IteratorContext* ctx, diff --git a/tensorflow/core/framework/model.cc b/tensorflow/core/framework/model.cc index 6cff7e59fe5..dd769cfb85c 100644 --- a/tensorflow/core/framework/model.cc +++ b/tensorflow/core/framework/model.cc @@ -41,7 +41,8 @@ namespace { // The formula used for computing the probability is derived by modeling the // problem as an M/M/1/K queue // (https://en.wikipedia.org/wiki/Birth%E2%80%93death_process#M/M/1/K_queue). -int64 ComputeWaitTime(int64 output_time, int64 input_time, int64 buffer_size) { +double ComputeWaitTime(double output_time, double input_time, + int64 buffer_size) { if (output_time == 0 || input_time == 0) { return output_time; } @@ -75,34 +76,40 @@ class InterleaveMany : public Node { Args{id_, name_, std::move(output)}); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is the sum of the self processing time and the average + // output time of inputs comprising the interleave "cycle". 
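// A minimal standalone sketch of the InterleaveMany estimate described above
// (the helper name is illustrative, not part of this class), assuming, as
// noted for AsyncInterleaveMany further below, that the first input is the
// one whose elements create the interleaved inputs, so only the remaining
// "cycle" inputs contribute and their output times are averaged. With the
// numbers used in model_test.cc -- a self processing time of 100ns and cycle
// inputs of 200ns and 300ns -- it yields 100 + (200 + 300) / 2 = 350ns.
#include <numeric>
#include <vector>

double EstimateInterleaveManyOutputTime(
    double self_processing_time,
    const std::vector<double>& cycle_input_output_times) {
  if (cycle_input_output_times.empty()) {
    return self_processing_time;
  }
  // Average the cycle inputs' output times, mirroring the division by
  // (inputs_.size() - 1) in OutputTimeLocked() below.
  const double sum = std::accumulate(cycle_input_output_times.begin(),
                                     cycle_input_output_times.end(), 0.0);
  return self_processing_time +
         sum / static_cast<double>(cycle_input_output_times.size());
}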
+ double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { if (inputs_.size() <= 1) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - int64 delta = NanosPerElementLocked() * (inputs_.size() - 1); + double delta = SelfProcessingTimeLocked() * (inputs_.size() - 1); input_times->back() += delta; auto cleanup = gtl::MakeCleanup( [input_times, delta]() { input_times->back() -= delta; }); - int64 output_time = - static_cast(OutputTimeForInputs(input_times) - - inputs_.front()->OutputTime(input_times)) / - static_cast(inputs_.size() - 1); - return NanosPerElementLocked() + output_time; + double output_time = (OutputTimeForInputs(input_times) - + inputs_.front()->OutputTime(input_times)) / + static_cast(inputs_.size() - 1); + return SelfProcessingTimeLocked() + output_time; } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + // The processing time is the sum of the self processing time and the average + // processing time of inputs comprising the interleave "cycle". + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { if (inputs_.size() <= 1) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - int64 processing_time = - static_cast(ProcessingTimeForInputs() - - inputs_.front()->ProcessingTime()) / + double processing_time = + (ProcessingTimeForInputs() - inputs_.front()->TotalProcessingTime()) / static_cast(inputs_.size() - 1); - return NanosPerElementLocked() + processing_time; + return SelfProcessingTimeLocked() + processing_time; } }; +// The first input of AsyncInterleaveMany corresponds to the input dataset whose +// elements are used to create the (derived) input datasets whose elements are +// interleaved as output. +// // TODO(jsimsa): model the first input class AsyncInterleaveMany : public Node { public: @@ -127,14 +134,19 @@ class AsyncInterleaveMany : public Node { Args{id_, name_, std::move(output)}, parameters); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is estimated using `ComputeWaitTime(output_time, + // input_time, parallelism)`, where `output_time` is the sum of the + // self-processing time and the average output time of inputs comprising the + // interleave "cycle", `input_time` is specified through `input_times` and + // `buffer_size` is derived from parallelism. 
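// A minimal sketch of the kind of wait-time estimate the M/M/1/K queue model
// referenced at the top of this file suggests. The actual ComputeWaitTime body
// is not shown in this hunk, so the helper below is an illustration under that
// assumption: treating the node as a producer with rate 1/output_time, the
// consumer as a server with rate 1/input_time, and the buffer capacity as K,
// the stationary probability that the buffer is empty is
// p0 = (1 - rho) / (1 - rho^(K + 1)) with rho = input_time / output_time, and
// the consumer's expected extra wait per element is roughly p0 * output_time.
#include <cmath>
#include <cstdint>

double SketchWaitTime(double output_time, double input_time,
                      int64_t buffer_size) {
  // Mirrors the zero guard in ComputeWaitTime() above.
  if (output_time == 0 || input_time == 0) {
    return output_time;
  }
  const double rho = input_time / output_time;
  if (rho == 1.0) {
    // The geometric distribution over buffer occupancy degenerates to uniform.
    return output_time / static_cast<double>(buffer_size + 1);
  }
  const double p_empty =
      (1.0 - rho) /
      (1.0 - std::pow(rho, static_cast<double>(buffer_size + 1)));
  return p_empty * output_time;
}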
+ double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { if (inputs_.size() <= 1) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - int64 old_input_time = input_times->back(); - int64 new_input_time = static_cast(NanosPerElementLocked()) * - static_cast(inputs_.size() - 1); + double old_input_time = input_times->back(); + double new_input_time = + SelfProcessingTimeLocked() * static_cast(inputs_.size() - 1); input_times->push_back(new_input_time); auto cleanup = gtl::MakeCleanup([input_times]() { input_times->pop_back(); }); @@ -143,23 +155,23 @@ class AsyncInterleaveMany : public Node { parallelism = std::min(static_cast(parallelism), static_cast((*parameter)->value)); } - int64 output_time = - static_cast(OutputTimeForInputs(input_times) - - inputs_.front()->OutputTime(input_times)) / - static_cast(inputs_.size() - 1) / parallelism; - return ComputeWaitTime(NanosPerElementLocked() + output_time, + double output_time = (OutputTimeForInputs(input_times) - + inputs_.front()->OutputTime(input_times)) / + static_cast(num_inputs() - 1) / parallelism; + return ComputeWaitTime(SelfProcessingTimeLocked() + output_time, old_input_time, parallelism); } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + // The processing time is the sum of the self processing time and the average + // processing time of inputs comprising the interleave "cycle". + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { if (inputs_.size() <= 1) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - int64 processing_time = - ProcessingTimeForInputs() - inputs_.front()->ProcessingTime(); - return NanosPerElementLocked() + - static_cast(processing_time) / - static_cast(inputs_.size() - 1); + double processing_time = + ProcessingTimeForInputs() - inputs_.front()->TotalProcessingTime(); + return SelfProcessingTimeLocked() + + processing_time / static_cast(num_inputs() - 1); } }; @@ -176,22 +188,27 @@ class KnownRatio : public Node { ratio_); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is the sum of the self processing time and the product of + // `ratio_` and the sum of output times of inputs. + double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { if (ratio_ == 0) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - int64 old_input_time = input_times->back(); - input_times->back() += static_cast( - static_cast(old_input_time + NanosPerElementLocked()) / ratio_); + double old_input_time = input_times->back(); + input_times->back() += + (old_input_time + SelfProcessingTimeLocked()) / ratio_; auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() { input_times->back() = old_input_time; }); - return NanosPerElementLocked() + ratio_ * OutputTimeForInputs(input_times); + return SelfProcessingTimeLocked() + + ratio_ * OutputTimeForInputs(input_times); } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { - return NanosPerElementLocked() + ratio_ * ProcessingTimeForInputs(); + // The processing time is the sum of the self processing time and the product + // of `ratio_` and the sum of processing times of inputs. 
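// A minimal standalone sketch of the KnownRatio estimate described above (the
// helper name is illustrative, not part of this class). With the numbers used
// in model_test.cc -- ratio N, per-element input times of 50ns and 100ns, and
// a self processing time of 64ns -- it yields N * (50 + 100) + 64.
#include <vector>

double EstimateKnownRatioTime(double self_processing_time, double ratio,
                              const std::vector<double>& per_element_input_times) {
  double inputs_total = 0.0;
  for (double t : per_element_input_times) {
    inputs_total += t;
  }
  // Each output element consumes `ratio` elements from every input, so the
  // input cost is scaled by the ratio before adding the node's own cost.
  return self_processing_time + ratio * inputs_total;
}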
+ double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + return SelfProcessingTimeLocked() + ratio_ * ProcessingTimeForInputs(); } private: @@ -221,31 +238,35 @@ class AsyncKnownRatio : public Node { Args{id_, name_, std::move(output)}, ratio_, parameters); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is estimated using `ComputeWaitTime(output_time, + // input_time, parallelism)`, where `output_time` is the sum of the self + // processing time and the product of `ratio_` and the sum of output times of + // inputs, `input_time` is specified through `input_times` and `buffer_size` + // is derived from parallelism. + double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { double parallelism = 1.0; if (auto* parameter = gtl::FindOrNull(parameters_, "parallelism")) { parallelism = (*parameter)->value; } if (ratio_ == 0.0) { - int64 output_time = - static_cast(NanosPerElementLocked()) / parallelism; + double output_time = SelfProcessingTimeLocked() / parallelism; return ComputeWaitTime(output_time, input_times->back(), parallelism); } - int64 old_input_time = input_times->back(); - int64 new_input_time = static_cast( - static_cast(NanosPerElementLocked()) / ratio_ / parallelism); + double old_input_time = input_times->back(); + double new_input_time = SelfProcessingTimeLocked() / ratio_ / parallelism; input_times->push_back(new_input_time); auto cleanup = gtl::MakeCleanup([input_times]() { input_times->pop_back(); }); - int64 output_time = static_cast( - static_cast(NanosPerElementLocked()) / parallelism + - ratio_ * OutputTimeForInputs(input_times)); + double output_time = SelfProcessingTimeLocked() / parallelism + + ratio_ * OutputTimeForInputs(input_times); return ComputeWaitTime(output_time, old_input_time, parallelism); } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { - return NanosPerElementLocked() + ratio_ * ProcessingTimeForInputs(); + // The processing time is the sum of the self processing time and the product + // of `ratio_` and the sum of processing times of inputs. + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + return SelfProcessingTimeLocked() + ratio_ * ProcessingTimeForInputs(); } private: @@ -264,40 +285,40 @@ class UnknownRatio : public Node { return std::make_shared(Args{id_, name_, std::move(output)}); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is the sum of the self processing time and the product of + // the ratio estimate and the sum of output times of inputs. + double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { if (num_elements_ == 0 || inputs_.empty() || inputs_.front()->num_elements() == 0) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } // TODO(jsimsa): The current implementation assumes that the number of input // elements consumed per output is the same across all inputs. 
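// For example, with the numbers used by the UnknownRatio test in
// model_test.cc: after this node has produced one element and each of its
// inputs has produced one element, the estimated ratio is 1 and the estimate
// is 100 + 1 * (100 + 200) = 400ns; once the node has produced a second
// element, the self time halves to 50ns, the ratio drops to 0.5, and the
// estimate becomes 50 + 0.5 * (100 + 200) = 200ns.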
std::shared_ptr input = inputs_.front(); double ratio = static_cast(input->num_elements()) / static_cast(num_elements_); - int64 old_input_time = input_times->back(); - input_times->back() = - static_cast(old_input_time + NanosPerElementLocked()) / ratio; + double old_input_time = input_times->back(); + input_times->back() = (old_input_time + SelfProcessingTimeLocked()) / ratio; auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() { input_times->back() = old_input_time; }); - return NanosPerElementLocked() + - static_cast( - ratio * static_cast(OutputTimeForInputs(input_times))); + return SelfProcessingTimeLocked() + + ratio * OutputTimeForInputs(input_times); } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + // The processing time is the sum of the self processing time and the product + // of the ratio estimate and the sum of processing times of inputs. + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { if (inputs_.empty() || num_elements_ == 0) { - return NanosPerElementLocked(); + return SelfProcessingTimeLocked(); } - // TODO(jsimsa): The current implementation that the number of input + // TODO(jsimsa): The current implementation assumes that the number of input // elements consumed per output is the same across all inputs. std::shared_ptr input = inputs_.front(); double ratio = static_cast(input->num_elements()) / static_cast(num_elements_); - return NanosPerElementLocked() + - static_cast(ratio * - static_cast(ProcessingTimeForInputs())); + return SelfProcessingTimeLocked() + ratio * ProcessingTimeForInputs(); } }; @@ -313,12 +334,14 @@ class Unknown : public Node { return std::make_shared(Args{id_, name_, std::move(output)}); } - int64 OutputTimeLocked(std::vector* input_times) const override + // The output time is the sum of output times of inputs. + double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { return OutputTimeForInputs(input_times); } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + // The processing time is the sum of processing times of inputs. + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { return ProcessingTimeForInputs(); } }; @@ -415,7 +438,7 @@ void Model::Optimize(int64 cpu_budget) { snapshot = output_->Snapshot(nullptr); } VLOG(2) << "Starting optimization of tunable parameters"; - const int64 processing_time = ProcessingTime(snapshot); + const int64 processing_time = TotalProcessingTime(snapshot); auto parameters = CollectTunableParameters(snapshot); for (auto& pair : parameters) { pair.second->value = 1; @@ -441,13 +464,6 @@ void Model::Optimize(int64 cpu_budget) { pair.second->value++; int64 new_output_time = OutputTime(snapshot); int64 delta = output_time - new_output_time; - if (delta < 0) { - VLOG(3) << "Increasing the parallelism of tunable parameter " - << pair.first << " resulted in slowdown (before=" << output_time - << ", after=" << new_output_time - << "). This should never happen because the latency " - "should be monotonic w.r.t. to parallelism."; - } if (delta > best_delta) { best_delta = delta; best_parameter = pair.second.get(); @@ -455,11 +471,10 @@ void Model::Optimize(int64 cpu_budget) { pair.second->value--; } if (!best_parameter) { - // This should never happen because we are using a model snapshot and - // the output time is monotonically decreasing w.r.t. parallelism. 
LOG(WARNING) << "Failed to find a tunable parameter that would " - "decrease the output time, aborting the current " - "optimization attempt."; + "decrease the output time. This means that the " + "autotuning optimization got stuck in a local maximum. " + "The optimization attempt will be aborted."; return; } best_parameter->value++; @@ -537,12 +552,18 @@ std::map> Model::CollectTunableParameters( } int64 Model::OutputTime(std::shared_ptr node) { - std::vector input_times(1, 0); + std::vector input_times(1, 0); + // TODO(jsimsa): Now that we are accounting for buffer size in wait time + // computation, assuming that the input is infinitely fast will result in + // inaccurate estimates of the output latency. + // + // We should compute the output latency as a fix-point of the following + // equation: `output_time = node(OutputTime(input_times(1, output_time))`. return node->OutputTime(&input_times); } -int64 Model::ProcessingTime(std::shared_ptr node) { - return node->ProcessingTime(); +int64 Model::TotalProcessingTime(std::shared_ptr node) { + return node->TotalProcessingTime(); } } // namespace model diff --git a/tensorflow/core/framework/model.h b/tensorflow/core/framework/model.h index 2fba51c8393..c8a9a499eaf 100644 --- a/tensorflow/core/framework/model.h +++ b/tensorflow/core/framework/model.h @@ -138,6 +138,12 @@ class Node { processing_time_ += delta; } + // Returns an indication whether autotuning is enabled for this node. + bool autotune() const LOCKS_EXCLUDED(mu_) { + tf_shared_lock l(mu_); + return autotune_; + } + // Returns the number of bytes stored in this node's buffer. int64 buffered_bytes() const LOCKS_EXCLUDED(mu_) { tf_shared_lock l(mu_); @@ -215,11 +221,20 @@ class Node { inputs_.remove(input); } + // Sets the value that determines whether autotuning is enabled for this node. + void set_autotune(bool autotune) LOCKS_EXCLUDED(mu_) { + mutex_lock l(mu_); + autotune_ = autotune; + } + // Collects tunable parameters in the subtree rooted in this node. void CollectTunableParameters( std::map>* parameters) const LOCKS_EXCLUDED(mu_) { tf_shared_lock l(mu_); + if (!autotune_) { + return; + } for (auto& pair : parameters_) { if (pair.second->state->tunable) { parameters->insert(std::make_pair(long_name(), pair.second)); @@ -230,17 +245,31 @@ class Node { } } - // Returns the per-element output time for this node. - int64 OutputTime(std::vector* input_times) const LOCKS_EXCLUDED(mu_) { + // Returns a human-readable representation of this node. + string DebugString() const LOCKS_EXCLUDED(mu_) { tf_shared_lock l(mu_); - return OutputTimeLocked(input_times); + string result; + strings::StrAppend(&result, long_name(), ":\n"); + strings::StrAppend(&result, " autotune=", autotune_, "\n"); + strings::StrAppend(&result, " buffered_bytes=", buffered_bytes_, "\n"); + strings::StrAppend(&result, " processing_time=", processing_time_, "\n"); + strings::StrAppend(&result, " num_elements=", num_elements_, "\n"); + string inputs; + for (auto& input : inputs_) { + strings::StrAppend(&inputs, input->long_name(), ","); + } + strings::StrAppend(&result, " inputs={", inputs, "}\n"); + for (auto& input : inputs_) { + strings::StrAppend(&result, input->DebugString()); + } + return result; } - // Returns the per-element processing time spent in the subtree rooted in - // this node. - int64 ProcessingTime() const LOCKS_EXCLUDED(mu_) { + // Returns the per-element output time for this node. 
+ double OutputTime(std::vector* input_times) const + LOCKS_EXCLUDED(mu_) { tf_shared_lock l(mu_); - return ProcessingTimeLocked(); + return OutputTimeLocked(input_times); } // Returns a copy of this node, making a deep copy of its inputs and a @@ -254,6 +283,7 @@ class Node { std::shared_ptr result = Clone(output); { mutex_lock l2(result->mu_); + result->autotune_ = autotune_; result->buffered_bytes_ = buffered_bytes_; result->processing_time_ = processing_time_; result->num_elements_ = num_elements_; @@ -265,57 +295,90 @@ class Node { return result; } + // Returns the per-element CPU time spent in the subtree rooted in this node. + double TotalProcessingTime() const LOCKS_EXCLUDED(mu_) { + tf_shared_lock l(mu_); + return TotalProcessingTimeLocked(); + } + protected: + // Returns the number of inputs. + int64 num_inputs() const SHARED_LOCKS_REQUIRED(mu_) { + int64 num_inputs = 0; + for (auto& input : inputs_) { + // Inputs for which autotuning is disabled are excluded. + if (input->autotune()) { + ++num_inputs; + } + } + return num_inputs; + } + // Creates a clone of this node. virtual std::shared_ptr Clone(std::shared_ptr output) const SHARED_LOCKS_REQUIRED(mu_) = 0; - // Returns the per-element processing time spent in this node. - int64 NanosPerElementLocked() const SHARED_LOCKS_REQUIRED(mu_) { - if (num_elements_ == 0) { - return 0; - } - return static_cast(static_cast(processing_time_) / - static_cast(num_elements_)); - } - // Returns the sum of per-element output time for the inputs of this node. - int64 OutputTimeForInputs(std::vector* input_times) const + double OutputTimeForInputs(std::vector* input_times) const SHARED_LOCKS_REQUIRED(mu_) { - int64 sum = 0; + double sum = 0; for (auto& input : inputs_) { - sum += input->OutputTime(input_times); + // Inputs for which autotuning is disabled are excluded. + if (input->autotune()) { + sum += input->OutputTime(input_times); + } } return sum; } // Returns the per-element output time for this node. - virtual int64 OutputTimeLocked(std::vector* input_times) const + virtual double OutputTimeLocked(std::vector* input_times) const SHARED_LOCKS_REQUIRED(mu_) = 0; // Returns the sum of per-element processing time for the inputs of this node. // // TODO(jsimsa): use processing time history as a prior for future inputs - int64 ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) { + double ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) { int64 sum = 0; for (auto& input : inputs_) { - sum += input->ProcessingTime(); + // Inputs for which autotuning is disabled are excluded. + if (input->autotune()) { + sum += input->SelfProcessingTimeLocked(); + } } return sum; } - // Returns the per-element processing time spent in the subtree rooted in - // this node. - virtual int64 ProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) = 0; + // Returns the per-element processing time spent in this node. + double SelfProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) { + if (num_elements_ == 0) { + return 0; + } + return static_cast(processing_time_) / + static_cast(num_elements_); + } + + // Returns the per-element CPU time spent in the subtree rooted in this node. + virtual double TotalProcessingTimeLocked() const + SHARED_LOCKS_REQUIRED(mu_) = 0; mutable mutex mu_; const int64 id_; const string name_; + + // Indicates whether the subtree rooted in this node should be included in + // autotuning. In particular, if this is `false`, then the subtree is excluded + // from computation of output time and processing time. 
+ bool autotune_ GUARDED_BY(mu_) = true; int64 buffered_bytes_ GUARDED_BY(mu_) = 0; int64 processing_time_ GUARDED_BY(mu_) = 0; int64 num_elements_ GUARDED_BY(mu_) = 0; std::map work_start_ GUARDED_BY(mu_); std::map> parameters_ GUARDED_BY(mu_); + + // Inputs of this node. These can represent an iterator created from the input + // dataset but also other input iterators (e.g. created by the user-defined + // functions of `flat_map` or `interleave`). std::list> inputs_ GUARDED_BY(mu_); // The reference to the output node is not owned so that deletion of a @@ -421,7 +484,7 @@ class Model { int64 OutputTime(std::shared_ptr node); // Collects the processing time for the given node. - int64 ProcessingTime(std::shared_ptr node); + int64 TotalProcessingTime(std::shared_ptr node); // Used for coordination between different input pipeline threads. Exclusive // access is required only when adding or removing nodes. Concurrent access to diff --git a/tensorflow/core/framework/model_test.cc b/tensorflow/core/framework/model_test.cc index 1d7f407e180..52e6339ff9e 100644 --- a/tensorflow/core/framework/model_test.cc +++ b/tensorflow/core/framework/model_test.cc @@ -25,11 +25,11 @@ namespace model { namespace { class AsyncInterleaveManyTest - : public ::testing::TestWithParam> {}; + : public ::testing::TestWithParam> {}; TEST_P(AsyncInterleaveManyTest, Model) { const int64 parallelism = std::get<0>(GetParam()); - const int64 input_time = std::get<1>(GetParam()); + const double input_time = std::get<1>(GetParam()); std::shared_ptr async_interleave_many = model::MakeAsyncInterleaveManyNode( {0, "async_interleave_many", nullptr}, @@ -55,29 +55,29 @@ TEST_P(AsyncInterleaveManyTest, Model) { auto cleanup2 = gtl::MakeCleanup([async_interleave_many, source2]() { async_interleave_many->remove_input(source2); }); - std::vector input_times(1, input_time); + std::vector input_times(1, input_time); async_interleave_many->add_processing_time(100); EXPECT_EQ(async_interleave_many->processing_time(), 100); - EXPECT_EQ(async_interleave_many->ProcessingTime(), 0); + EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 0); EXPECT_EQ(async_interleave_many->OutputTime(&input_times), 0); async_interleave_many->record_element(); EXPECT_EQ(async_interleave_many->num_elements(), 1); - EXPECT_EQ(async_interleave_many->ProcessingTime(), 100); + EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100); EXPECT_LE(async_interleave_many->OutputTime(&input_times), 100); EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0); source1->add_processing_time(200); source2->add_processing_time(300); - EXPECT_EQ(async_interleave_many->ProcessingTime(), 100); + EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100); EXPECT_LE(async_interleave_many->OutputTime(&input_times), 100); EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0); source1->record_element(); source2->record_element(); - EXPECT_EQ(async_interleave_many->ProcessingTime(), 100 + 250); + EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100 + 250); EXPECT_LE(async_interleave_many->OutputTime(&input_times), 100 + 250 / parallelism); EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0); async_interleave_many->record_element(); - EXPECT_EQ(async_interleave_many->ProcessingTime(), 50 + 250); + EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 50 + 250); EXPECT_LE(async_interleave_many->OutputTime(&input_times), 50 + 250 / parallelism); EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0); @@ -89,11 +89,11 @@ 
INSTANTIATE_TEST_SUITE_P(Test, AsyncInterleaveManyTest, 200))); class AsyncKnownRatioTest - : public ::testing::TestWithParam> {}; + : public ::testing::TestWithParam> {}; TEST_P(AsyncKnownRatioTest, Model) { const int64 parallelism = std::get<0>(GetParam()); - const int64 input_time = std::get<1>(GetParam()); + const double input_time = std::get<1>(GetParam()); const int64 num_inputs_per_output = std::get<2>(GetParam()); std::shared_ptr async_known_many = model::MakeAsyncKnownRatioNode( {0, "async_known_many", nullptr}, num_inputs_per_output, @@ -107,50 +107,51 @@ TEST_P(AsyncKnownRatioTest, Model) { std::shared_ptr source2 = model::MakeSourceNode({2, "source2", async_known_many}); async_known_many->add_input(source2); - std::vector input_times(1, input_time); + std::vector input_times(1, input_time); source1->add_processing_time(100); - EXPECT_EQ(async_known_many->ProcessingTime(), 0); + EXPECT_EQ(async_known_many->TotalProcessingTime(), 0); EXPECT_EQ(async_known_many->OutputTime(&input_times), 0); source2->add_processing_time(200); - EXPECT_EQ(async_known_many->ProcessingTime(), 0); + EXPECT_EQ(async_known_many->TotalProcessingTime(), 0); EXPECT_EQ(async_known_many->OutputTime(&input_times), 0); source1->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), num_inputs_per_output * 100); + EXPECT_EQ(async_known_many->TotalProcessingTime(), + num_inputs_per_output * 100); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * 100); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); source2->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (100 + 200)); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (100 + 200)); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); source1->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 200)); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 200)); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); source2->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100)); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100)); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); async_known_many->add_processing_time(128); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100)); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100)); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); async_known_many->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100) + 128); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100) + 128 / parallelism); EXPECT_GE(async_known_many->OutputTime(&input_times), 0); async_known_many->record_element(); - EXPECT_EQ(async_known_many->ProcessingTime(), + EXPECT_EQ(async_known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100) + 64); EXPECT_LE(async_known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100) + 64 / parallelism); @@ -174,25 +175,25 @@ TEST(InterleaveManyTest, Model) { std::shared_ptr source2 = 
model::MakeSourceNode({2, "source2", interleave_many}); interleave_many->add_input(source2); - std::vector input_times(1, 0); + std::vector input_times(1, 0); interleave_many->add_processing_time(100); EXPECT_EQ(interleave_many->processing_time(), 100); - EXPECT_EQ(interleave_many->ProcessingTime(), 0); + EXPECT_EQ(interleave_many->TotalProcessingTime(), 0); EXPECT_EQ(interleave_many->OutputTime(&input_times), 0); interleave_many->record_element(); EXPECT_EQ(interleave_many->num_elements(), 1); - EXPECT_EQ(interleave_many->ProcessingTime(), 100); + EXPECT_EQ(interleave_many->TotalProcessingTime(), 100); EXPECT_EQ(interleave_many->OutputTime(&input_times), 100); source1->add_processing_time(200); source2->add_processing_time(300); - EXPECT_EQ(interleave_many->ProcessingTime(), 100); + EXPECT_EQ(interleave_many->TotalProcessingTime(), 100); EXPECT_EQ(interleave_many->OutputTime(&input_times), 100); source1->record_element(); source2->record_element(); - EXPECT_EQ(interleave_many->ProcessingTime(), 350); + EXPECT_EQ(interleave_many->TotalProcessingTime(), 350); EXPECT_EQ(interleave_many->OutputTime(&input_times), 350); interleave_many->record_element(); - EXPECT_EQ(interleave_many->ProcessingTime(), 300); + EXPECT_EQ(interleave_many->TotalProcessingTime(), 300); EXPECT_EQ(interleave_many->OutputTime(&input_times), 300); } @@ -208,39 +209,43 @@ TEST_P(KnownRatioTest, Model) { std::shared_ptr source2 = model::MakeSourceNode({2, "source2", known_many}); known_many->add_input(source2); - std::vector input_times(1, 0); + std::vector input_times(1, 0); source1->add_processing_time(100); - EXPECT_EQ(known_many->ProcessingTime(), 0); + EXPECT_EQ(known_many->TotalProcessingTime(), 0); EXPECT_EQ(known_many->OutputTime(&input_times), 0); source2->add_processing_time(200); - EXPECT_EQ(known_many->ProcessingTime(), 0); + EXPECT_EQ(known_many->TotalProcessingTime(), 0); EXPECT_EQ(known_many->OutputTime(&input_times), 0); source1->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * 100); + EXPECT_EQ(known_many->TotalProcessingTime(), num_inputs_per_output * 100); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * 100); source2->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (100 + 200)); + EXPECT_EQ(known_many->TotalProcessingTime(), + num_inputs_per_output * (100 + 200)); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (100 + 200)); source1->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 200)); + EXPECT_EQ(known_many->TotalProcessingTime(), + num_inputs_per_output * (50 + 200)); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 200)); source2->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 100)); + EXPECT_EQ(known_many->TotalProcessingTime(), + num_inputs_per_output * (50 + 100)); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100)); known_many->add_processing_time(128); - EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 100)); + EXPECT_EQ(known_many->TotalProcessingTime(), + num_inputs_per_output * (50 + 100)); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100)); known_many->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), + EXPECT_EQ(known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100) + 128); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 
100) + 128); known_many->record_element(); - EXPECT_EQ(known_many->ProcessingTime(), + EXPECT_EQ(known_many->TotalProcessingTime(), num_inputs_per_output * (50 + 100) + 64); EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * (50 + 100) + 64); @@ -250,18 +255,18 @@ INSTANTIATE_TEST_SUITE_P(Test, KnownRatioTest, ::testing::Values(0, 1, 2, 4)); TEST(SourceTest, Model) { std::shared_ptr source = model::MakeSourceNode({0, "source", nullptr}); - std::vector input_times(1, 0); + std::vector input_times(1, 0); source->add_processing_time(100); EXPECT_EQ(source->processing_time(), 100); - EXPECT_EQ(source->ProcessingTime(), 0); + EXPECT_EQ(source->TotalProcessingTime(), 0); EXPECT_EQ(source->OutputTime(&input_times), 0); source->record_element(); EXPECT_EQ(source->num_elements(), 1); - EXPECT_EQ(source->ProcessingTime(), 100); + EXPECT_EQ(source->TotalProcessingTime(), 100); EXPECT_EQ(source->OutputTime(&input_times), 100); source->record_element(); EXPECT_EQ(source->num_elements(), 2); - EXPECT_EQ(source->ProcessingTime(), 50); + EXPECT_EQ(source->TotalProcessingTime(), 50); EXPECT_EQ(source->OutputTime(&input_times), 50); } @@ -274,25 +279,25 @@ TEST(UnknownRatioTest, Model) { std::shared_ptr source2 = model::MakeSourceNode({2, "source2", unknown_many}); unknown_many->add_input(source2); - std::vector input_times(1, 0); + std::vector input_times(1, 0); unknown_many->add_processing_time(100); EXPECT_EQ(unknown_many->processing_time(), 100); - EXPECT_EQ(unknown_many->ProcessingTime(), 0); + EXPECT_EQ(unknown_many->TotalProcessingTime(), 0); EXPECT_EQ(unknown_many->OutputTime(&input_times), 0); unknown_many->record_element(); EXPECT_EQ(unknown_many->num_elements(), 1); - EXPECT_EQ(unknown_many->ProcessingTime(), 100); + EXPECT_EQ(unknown_many->TotalProcessingTime(), 100); EXPECT_EQ(unknown_many->OutputTime(&input_times), 100); source1->add_processing_time(100); source2->add_processing_time(200); - EXPECT_EQ(unknown_many->ProcessingTime(), 100); + EXPECT_EQ(unknown_many->TotalProcessingTime(), 100); EXPECT_EQ(unknown_many->OutputTime(&input_times), 100); source1->record_element(); source2->record_element(); - EXPECT_EQ(unknown_many->ProcessingTime(), 400); + EXPECT_EQ(unknown_many->TotalProcessingTime(), 400); EXPECT_EQ(unknown_many->OutputTime(&input_times), 400); unknown_many->record_element(); - EXPECT_EQ(unknown_many->ProcessingTime(), 200); + EXPECT_EQ(unknown_many->TotalProcessingTime(), 200); EXPECT_EQ(unknown_many->OutputTime(&input_times), 200); } @@ -305,36 +310,36 @@ TEST(UnknownTest, Model) { std::shared_ptr source2 = model::MakeSourceNode({2, "source2", unknown}); unknown->add_input(source2); - std::vector input_times(1, 0); + std::vector input_times(1, 0); source1->add_processing_time(100); - EXPECT_EQ(unknown->ProcessingTime(), 0); + EXPECT_EQ(unknown->TotalProcessingTime(), 0); EXPECT_EQ(unknown->OutputTime(&input_times), 0); source2->add_processing_time(100); - EXPECT_EQ(unknown->ProcessingTime(), 0); + EXPECT_EQ(unknown->TotalProcessingTime(), 0); EXPECT_EQ(unknown->OutputTime(&input_times), 0); source1->record_element(); - EXPECT_EQ(unknown->ProcessingTime(), 100); + EXPECT_EQ(unknown->TotalProcessingTime(), 100); EXPECT_EQ(unknown->OutputTime(&input_times), 100); source2->record_element(); - EXPECT_EQ(unknown->ProcessingTime(), 200); + EXPECT_EQ(unknown->TotalProcessingTime(), 200); EXPECT_EQ(unknown->OutputTime(&input_times), 200); source1->record_element(); - EXPECT_EQ(unknown->ProcessingTime(), 150); + EXPECT_EQ(unknown->TotalProcessingTime(), 150); 
EXPECT_EQ(unknown->OutputTime(&input_times), 150); source2->record_element(); - EXPECT_EQ(unknown->ProcessingTime(), 100); + EXPECT_EQ(unknown->TotalProcessingTime(), 100); EXPECT_EQ(unknown->OutputTime(&input_times), 100); - // Unknown node processing time should not affect its ProcessingTime() or + // Unknown node processing time should not affect its TotalProcessingTime() or // OutputTime(). unknown->add_processing_time(100); EXPECT_EQ(unknown->processing_time(), 100); - EXPECT_EQ(unknown->ProcessingTime(), 100); + EXPECT_EQ(unknown->TotalProcessingTime(), 100); EXPECT_EQ(unknown->OutputTime(&input_times), 100); - // Unknown node number of elements should not affect its ProcessingTime() or - // OutputTime(). + // Unknown node number of elements should not affect its TotalProcessingTime() + // or OutputTime(). unknown->record_element(); EXPECT_EQ(unknown->num_elements(), 1); - EXPECT_EQ(unknown->ProcessingTime(), 100); + EXPECT_EQ(unknown->TotalProcessingTime(), 100); EXPECT_EQ(unknown->OutputTime(&input_times), 100); } @@ -350,12 +355,12 @@ class TestNode : public model::Node { return nullptr; } - int64 OutputTimeLocked(std::vector* input_times) const override + double OutputTimeLocked(std::vector* input_times) const override SHARED_LOCKS_REQUIRED(mu_) { return 0; } - int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { + double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) { return 0; } }; diff --git a/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc b/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc index b702353439b..515c701e42c 100644 --- a/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc +++ b/tensorflow/core/kernels/data/parallel_interleave_dataset_op.cc @@ -252,7 +252,7 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { return model::MakeAsyncInterleaveManyNode( std::move(args), {model::MakeParameter("parallelism", num_parallel_calls_, /*min=*/1, - /*max=*/port::NumSchedulableCPUs())}); + /*max=*/dataset()->cycle_length_)}); } Status SaveInternal(IteratorStateWriter* writer) override { @@ -462,6 +462,10 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { if (!future_elements_.empty()) { current_elements_[idx] = std::move(future_elements_.back()); future_elements_.pop_back(); + if (current_elements_[idx]->iterator) { + EnableAutotune(ctx.get(), + current_elements_[idx]->iterator.get()); + } } else { current_elements_[idx] = MakeElement(ctx); if (!current_elements_[idx]) { @@ -480,9 +484,21 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { if (num_results > 0) { num_calls_++; element->in_use = true; - thread_pool_->Schedule( - std::bind(&ParallelInterleaveIterator::FetchResults, this, - ctx, std::move(element), num_results)); + thread_pool_->Schedule(std::bind( + &ParallelInterleaveIterator::FetchResults, this, ctx, + std::move(element), num_results, + [this, ctx]() EXCLUSIVE_LOCKS_REQUIRED(*mu_) { + --num_calls_; + const auto& stats_aggregator = ctx->stats_aggregator(); + if (stats_aggregator) { + stats_aggregator->AddScalar( + stats_utils::ThreadUtilizationScalarName( + dataset()->node_name()), + static_cast(num_calls_) / + static_cast(num_parallel_calls_->value), + num_elements()); + } + })); } } } @@ -518,7 +534,8 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { // Fetches up to `dataset()->block_length_` results from `element`. 
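// The `done` callback runs when the fetch finishes; it is declared
// EXCLUSIVE_LOCKS_REQUIRED(*mu_), so it executes with the lock held. Fetches
// for `current_elements_` pass a callback that decrements `num_calls_` and
// records the thread-utilization statistic, while prefetches of
// `future_elements_` pass a no-op callback and are instead bounded by the
// `2 * cycle_length_` limit in the prefetch loop below. Iterators prefetched
// as future elements also run with autotuning disabled (`DisableAutotune`)
// until they are promoted to `current_elements_` (`EnableAutotune`).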
void FetchResults(const std::shared_ptr& ctx, const std::shared_ptr& element, - int64 num_results) LOCKS_EXCLUDED(*mu_) { + int64 num_results, std::function done) + LOCKS_EXCLUDED(*mu_) { RecordStart(ctx.get()); auto cleanup = gtl::MakeCleanup([this, ctx] { RecordStop(ctx.get()); }); bool end_of_input = false; @@ -546,15 +563,7 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { element->inputs.clear(); --num_open_; } - --num_calls_; - const auto& stats_aggregator = ctx->stats_aggregator(); - if (stats_aggregator) { - stats_aggregator->AddScalar( - stats_utils::ThreadUtilizationScalarName(dataset()->node_name()), - static_cast(num_calls_) / - static_cast(num_parallel_calls_->value), - num_elements()); - } + done(); cond_var_->notify_all(); } @@ -566,9 +575,8 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { RecordStart(ctx.get()); auto cleanup = gtl::MakeCleanup([this, ctx] { RecordStop(ctx.get()); }); auto busy = [this]() EXCLUSIVE_LOCKS_REQUIRED(*mu_) -> bool { - // TODO(jsimsa): Autotune the buffer size. - return num_calls_ >= num_parallel_calls_->value || - future_elements_.size() >= 2 * dataset()->cycle_length_; + // TODO(jsimsa): Autotune the number of iterators to prefetch. + return future_elements_.size() >= 2 * dataset()->cycle_length_; }; while (true) { mutex_lock l(*mu_); @@ -595,20 +603,11 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel { if (!element->iterator) { continue; } - ++num_calls_; + DisableAutotune(ctx.get(), element->iterator.get()); element->in_use = true; thread_pool_->Schedule( std::bind(&ParallelInterleaveIterator::FetchResults, this, ctx, - std::move(element), dataset()->block_length_)); - } - const auto& stats_aggregator = ctx->stats_aggregator(); - if (stats_aggregator) { - stats_aggregator->AddScalar( - stats_utils::ThreadUtilizationScalarName( - dataset()->node_name()), - static_cast(num_calls_) / - static_cast(num_parallel_calls_->value), - num_elements()); + std::move(element), dataset()->block_length_, [] {})); } cond_var_->notify_all(); } diff --git a/tensorflow/python/data/experimental/benchmarks/BUILD b/tensorflow/python/data/experimental/benchmarks/BUILD index d6b7d216228..42381a3468a 100644 --- a/tensorflow/python/data/experimental/benchmarks/BUILD +++ b/tensorflow/python/data/experimental/benchmarks/BUILD @@ -15,7 +15,6 @@ py_test( "//tensorflow/python:client_testlib", "//tensorflow/python:math_ops", "//tensorflow/python:session", - "//tensorflow/python/data/experimental/ops:batching", "//tensorflow/python/data/experimental/ops:optimization", "//tensorflow/python/data/ops:dataset_ops", "//third_party/py/numpy", diff --git a/tensorflow/python/data/experimental/benchmarks/autotune_benchmark.py b/tensorflow/python/data/experimental/benchmarks/autotune_benchmark.py index 4d9e6258182..e0e6f88cd04 100644 --- a/tensorflow/python/data/experimental/benchmarks/autotune_benchmark.py +++ b/tensorflow/python/data/experimental/benchmarks/autotune_benchmark.py @@ -22,7 +22,6 @@ import time import numpy as np from tensorflow.python.client import session -from tensorflow.python.data.experimental.ops import batching from tensorflow.python.data.experimental.ops import optimization from tensorflow.python.data.ops import dataset_ops from tensorflow.python.ops import math_ops @@ -78,13 +77,12 @@ class AutotuneBenchmark(test.Benchmark): dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k), np.random.rand(4 * k, 1))).repeat() - dataset = dataset.apply( - batching.map_and_batch( - math_ops.matmul, - 
num_parallel_calls=optimization.AUTOTUNE, - batch_size=batch_size)) + dataset = dataset.map( + math_ops.matmul, num_parallel_calls=optimization.AUTOTUNE) + dataset = dataset.batch(batch_size=batch_size) options = dataset_ops.Options() options.experimental_optimization.apply_default_optimizations = False + options.experimental_optimization.map_and_batch_fusion = True options.experimental_optimization.autotune = autotune dataset = dataset.with_options(options) iterator = dataset_ops.make_one_shot_iterator(dataset)
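    # With `apply_default_optimizations` off, enabling `map_and_batch_fusion`
    # rewrites the separate map and batch calls above into the fused
    # map-and-batch kernel, so the benchmarked pipeline stays effectively
    # equivalent to the previous `batching.map_and_batch` version while the
    # benchmark no longer depends on the experimental batching module (note
    # the dependency removed from the BUILD file above).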