Internal change

PiperOrigin-RevId: 245658213
Jiri Simsa 2019-04-28 13:44:58 -07:00 committed by TensorFlower Gardener
parent 84c5a4551e
commit 932874df5f
7 changed files with 304 additions and 203 deletions

View File

@ -766,6 +766,22 @@ class DatasetBaseIterator : public IteratorBase {
return model::MakeUnknownNode(std::move(args));
}
// When modeling is enabled, this method disables autotuning for the given
// iterator (and the transitive closure of its inputs).
void DisableAutotune(IteratorContext* ctx, IteratorBase* iterator) {
if (iterator->node_) {
iterator->node_->set_autotune(false);
}
}
// When modeling is enabled, this method enables autotuning for the given
// iterator (and the transitive closure of its inputs).
void EnableAutotune(IteratorContext* ctx, IteratorBase* iterator) {
if (iterator->node_) {
iterator->node_->set_autotune(true);
}
}
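
Editor's note: these hooks are used later in this commit by the parallel interleave kernel, which disables autotuning for iterators that are merely prefetched and re-enables it once they join the active cycle. The following self-contained toy (not part of the commit; illustrative types, not the TensorFlow classes) shows the effect the flag has on model aggregation:

#include <iostream>
#include <memory>
#include <vector>

// Toy stand-in for the model's node hierarchy; a node whose autotune flag is
// false is skipped, together with its subtree, during aggregation.
struct ToyNode {
  double self_time = 0;
  bool autotune = true;
  std::vector<std::shared_ptr<ToyNode>> inputs;

  double TotalTime() const {
    double sum = self_time;
    for (const auto& input : inputs) {
      if (input->autotune) sum += input->TotalTime();
    }
    return sum;
  }
};

int main() {
  auto root = std::make_shared<ToyNode>();
  auto prefetched = std::make_shared<ToyNode>();
  root->self_time = 10;
  prefetched->self_time = 100;
  root->inputs.push_back(prefetched);
  std::cout << root->TotalTime() << "\n";  // 110: subtree included
  prefetched->autotune = false;            // e.g. an idle, prefetched iterator
  std::cout << root->TotalTime() << "\n";  // 10: subtree excluded
}
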
// When modeling is enabled, this method records the fact that this iterator
// has dequeued an element from an internal buffer.
void RecordBufferDequeue(IteratorContext* ctx,

View File

@ -41,7 +41,8 @@ namespace {
// The formula used for computing the probability is derived by modeling the
// problem as an M/M/1/K queue
// (https://en.wikipedia.org/wiki/Birth%E2%80%93death_process#M/M/1/K_queue).
int64 ComputeWaitTime(int64 output_time, int64 input_time, int64 buffer_size) {
double ComputeWaitTime(double output_time, double input_time,
int64 buffer_size) {
if (output_time == 0 || input_time == 0) {
return output_time;
}
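
Editor's note: the hunk above shows only the early-out. For orientation, here is a self-contained sketch of how the M/M/1/K result yields the wait time (not a verbatim copy of model.cc; names are illustrative). The consumer waits only when the buffer is empty, so the expected wait is the producer's output time weighted by the stationary probability of an empty buffer:

#include <cmath>
#include <cstdint>

double ComputeWaitTimeSketch(double output_time, double input_time,
                             int64_t buffer_size) {
  if (output_time == 0 || input_time == 0) return output_time;
  // rho is the producer/consumer rate ratio: (1/output_time) / (1/input_time).
  const double rho = input_time / output_time;
  double p_empty;
  if (rho == 1.0) {
    // With equal rates, the M/M/1/K stationary distribution is uniform.
    p_empty = 1.0 / static_cast<double>(buffer_size + 1);
  } else {
    // Stationary probability of an empty buffer: (1 - rho) / (1 - rho^(K+1)).
    p_empty = (1.0 - rho) /
              (1.0 - std::pow(rho, static_cast<double>(buffer_size + 1)));
  }
  return p_empty * output_time;
}
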
@ -75,34 +76,40 @@ class InterleaveMany : public Node {
Args{id_, name_, std::move(output)});
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is the sum of the self processing time and the average
// output time of inputs comprising the interleave "cycle".
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
if (inputs_.size() <= 1) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
int64 delta = NanosPerElementLocked() * (inputs_.size() - 1);
double delta = SelfProcessingTimeLocked() * (inputs_.size() - 1);
input_times->back() += delta;
auto cleanup = gtl::MakeCleanup(
[input_times, delta]() { input_times->back() -= delta; });
int64 output_time =
static_cast<double>(OutputTimeForInputs(input_times) -
double output_time = (OutputTimeForInputs(input_times) -
inputs_.front()->OutputTime(input_times)) /
static_cast<double>(inputs_.size() - 1);
return NanosPerElementLocked() + output_time;
return SelfProcessingTimeLocked() + output_time;
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
// The processing time is the sum of the self processing time and the average
// processing time of inputs comprising the interleave "cycle".
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
if (inputs_.size() <= 1) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
int64 processing_time =
static_cast<double>(ProcessingTimeForInputs() -
inputs_.front()->ProcessingTime()) /
double processing_time =
(ProcessingTimeForInputs() - inputs_.front()->TotalProcessingTime()) /
static_cast<double>(inputs_.size() - 1);
return NanosPerElementLocked() + processing_time;
return SelfProcessingTimeLocked() + processing_time;
}
};
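
Editor's note: in symbols, with self processing time $s$, inputs $i = 1, \dots, n$ (input 1 produces the datasets being interleaved and is excluded from the average), and per-input output times $T_i$, the estimate computed by `OutputTimeLocked` above is

$$\text{output\_time} = s + \frac{1}{n-1} \sum_{i=2}^{n} T_i$$

and `TotalProcessingTimeLocked` applies the same average to the inputs' total processing times.
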
// The first input of AsyncInterleaveMany corresponds to the input dataset whose
// elements are used to create the (derived) input datasets whose elements are
// interleaved as output.
//
// TODO(jsimsa): model the first input
class AsyncInterleaveMany : public Node {
public:
@ -127,14 +134,19 @@ class AsyncInterleaveMany : public Node {
Args{id_, name_, std::move(output)}, parameters);
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is estimated using `ComputeWaitTime(output_time,
// input_time, parallelism)`, where `output_time` is the sum of the
// self-processing time and the average output time of inputs comprising the
// interleave "cycle", `input_time` is specified through `input_times` and
// `buffer_size` is derived from parallelism.
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
if (inputs_.size() <= 1) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
int64 old_input_time = input_times->back();
int64 new_input_time = static_cast<double>(NanosPerElementLocked()) *
static_cast<double>(inputs_.size() - 1);
double old_input_time = input_times->back();
double new_input_time =
SelfProcessingTimeLocked() * static_cast<double>(inputs_.size() - 1);
input_times->push_back(new_input_time);
auto cleanup =
gtl::MakeCleanup([input_times]() { input_times->pop_back(); });
@ -143,23 +155,23 @@ class AsyncInterleaveMany : public Node {
parallelism = std::min(static_cast<int>(parallelism),
static_cast<int>((*parameter)->value));
}
int64 output_time =
static_cast<double>(OutputTimeForInputs(input_times) -
double output_time = (OutputTimeForInputs(input_times) -
inputs_.front()->OutputTime(input_times)) /
static_cast<double>(inputs_.size() - 1) / parallelism;
return ComputeWaitTime(NanosPerElementLocked() + output_time,
static_cast<double>(num_inputs() - 1) / parallelism;
return ComputeWaitTime(SelfProcessingTimeLocked() + output_time,
old_input_time, parallelism);
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
// The processing time is the sum of the self processing time and the average
// processing time of inputs comprising the interleave "cycle".
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
if (inputs_.size() <= 1) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
int64 processing_time =
ProcessingTimeForInputs() - inputs_.front()->ProcessingTime();
return NanosPerElementLocked() +
static_cast<double>(processing_time) /
static_cast<double>(inputs_.size() - 1);
double processing_time =
ProcessingTimeForInputs() - inputs_.front()->TotalProcessingTime();
return SelfProcessingTimeLocked() +
processing_time / static_cast<double>(num_inputs() - 1);
}
};
@ -176,22 +188,27 @@ class KnownRatio : public Node {
ratio_);
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is the sum of the self processing time and the product of
// `ratio_` and the sum of output times of inputs.
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
if (ratio_ == 0) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
int64 old_input_time = input_times->back();
input_times->back() += static_cast<int64>(
static_cast<double>(old_input_time + NanosPerElementLocked()) / ratio_);
double old_input_time = input_times->back();
input_times->back() +=
(old_input_time + SelfProcessingTimeLocked()) / ratio_;
auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() {
input_times->back() = old_input_time;
});
return NanosPerElementLocked() + ratio_ * OutputTimeForInputs(input_times);
return SelfProcessingTimeLocked() +
ratio_ * OutputTimeForInputs(input_times);
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return NanosPerElementLocked() + ratio_ * ProcessingTimeForInputs();
// The processing time is the sum of the self processing time and the product
// of `ratio_` and the sum of processing times of inputs.
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return SelfProcessingTimeLocked() + ratio_ * ProcessingTimeForInputs();
}
private:
@ -221,31 +238,35 @@ class AsyncKnownRatio : public Node {
Args{id_, name_, std::move(output)}, ratio_, parameters);
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is estimated using `ComputeWaitTime(output_time,
// input_time, parallelism)`, where `output_time` is the sum of the self
// processing time and the product of `ratio_` and the sum of output times of
// inputs, `input_time` is specified through `input_times` and `buffer_size`
// is derived from parallelism.
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
double parallelism = 1.0;
if (auto* parameter = gtl::FindOrNull(parameters_, "parallelism")) {
parallelism = (*parameter)->value;
}
if (ratio_ == 0.0) {
int64 output_time =
static_cast<double>(NanosPerElementLocked()) / parallelism;
double output_time = SelfProcessingTimeLocked() / parallelism;
return ComputeWaitTime(output_time, input_times->back(), parallelism);
}
int64 old_input_time = input_times->back();
int64 new_input_time = static_cast<int64>(
static_cast<double>(NanosPerElementLocked()) / ratio_ / parallelism);
double old_input_time = input_times->back();
double new_input_time = SelfProcessingTimeLocked() / ratio_ / parallelism;
input_times->push_back(new_input_time);
auto cleanup =
gtl::MakeCleanup([input_times]() { input_times->pop_back(); });
int64 output_time = static_cast<int64>(
static_cast<double>(NanosPerElementLocked()) / parallelism +
ratio_ * OutputTimeForInputs(input_times));
double output_time = SelfProcessingTimeLocked() / parallelism +
ratio_ * OutputTimeForInputs(input_times);
return ComputeWaitTime(output_time, old_input_time, parallelism);
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return NanosPerElementLocked() + ratio_ * ProcessingTimeForInputs();
// The processing time is the sum of the self processing time and the product
// of `ratio_` and the sum of processing times of inputs.
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return SelfProcessingTimeLocked() + ratio_ * ProcessingTimeForInputs();
}
private:
@ -264,40 +285,40 @@ class UnknownRatio : public Node {
return std::make_shared<UnknownRatio>(Args{id_, name_, std::move(output)});
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is the sum of the self processing time and the product of
// the ratio estimate and the sum of output times of inputs.
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
if (num_elements_ == 0 || inputs_.empty() ||
inputs_.front()->num_elements() == 0) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
// TODO(jsimsa): The current implementation assumes that the number of input
// elements consumed per output is the same across all inputs.
std::shared_ptr<Node> input = inputs_.front();
double ratio = static_cast<double>(input->num_elements()) /
static_cast<double>(num_elements_);
int64 old_input_time = input_times->back();
input_times->back() =
static_cast<double>(old_input_time + NanosPerElementLocked()) / ratio;
double old_input_time = input_times->back();
input_times->back() = (old_input_time + SelfProcessingTimeLocked()) / ratio;
auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() {
input_times->back() = old_input_time;
});
return NanosPerElementLocked() +
static_cast<int64>(
ratio * static_cast<double>(OutputTimeForInputs(input_times)));
return SelfProcessingTimeLocked() +
ratio * OutputTimeForInputs(input_times);
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
// The processing time is the sum of the self processing time and the product
// of the ratio estimate and the sum of processing times of inputs.
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
if (inputs_.empty() || num_elements_ == 0) {
return NanosPerElementLocked();
return SelfProcessingTimeLocked();
}
// TODO(jsimsa): The current implementation that the number of input
// TODO(jsimsa): The current implementation assumes that the number of input
// elements consumed per output is the same across all inputs.
std::shared_ptr<Node> input = inputs_.front();
double ratio = static_cast<double>(input->num_elements()) /
static_cast<double>(num_elements_);
return NanosPerElementLocked() +
static_cast<int64>(ratio *
static_cast<double>(ProcessingTimeForInputs()));
return SelfProcessingTimeLocked() + ratio * ProcessingTimeForInputs();
}
};
@ -313,12 +334,14 @@ class Unknown : public Node {
return std::make_shared<Unknown>(Args{id_, name_, std::move(output)});
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
// The output time is the sum of output times of inputs.
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
return OutputTimeForInputs(input_times);
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
// The processing time is the sum of processing times of inputs.
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return ProcessingTimeForInputs();
}
};
@ -415,7 +438,7 @@ void Model::Optimize(int64 cpu_budget) {
snapshot = output_->Snapshot(nullptr);
}
VLOG(2) << "Starting optimization of tunable parameters";
const int64 processing_time = ProcessingTime(snapshot);
const int64 processing_time = TotalProcessingTime(snapshot);
auto parameters = CollectTunableParameters(snapshot);
for (auto& pair : parameters) {
pair.second->value = 1;
@ -441,13 +464,6 @@ void Model::Optimize(int64 cpu_budget) {
pair.second->value++;
int64 new_output_time = OutputTime(snapshot);
int64 delta = output_time - new_output_time;
if (delta < 0) {
VLOG(3) << "Increasing the parallelism of tunable parameter "
<< pair.first << " resulted in slowdown (before=" << output_time
<< ", after=" << new_output_time
<< "). This should never happen because the latency "
"should be monotonic w.r.t. to parallelism.";
}
if (delta > best_delta) {
best_delta = delta;
best_parameter = pair.second.get();
@ -455,11 +471,10 @@ void Model::Optimize(int64 cpu_budget) {
pair.second->value--;
}
if (!best_parameter) {
// This should never happen because we are using a model snapshot and
// the output time is monotonically decreasing w.r.t. parallelism.
LOG(WARNING) << "Failed to find a tunable parameter that would "
"decrease the output time, aborting the current "
"optimization attempt.";
"decrease the output time. This means that the "
"autotuning optimization got stuck in a local maximum. "
"The optimization attempt will be aborted.";
return;
}
best_parameter->value++;
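
Editor's note: taken together, the hunks above implement a greedy hill climb: reset every tunable parameter to 1, then repeatedly bump whichever parameter most reduces the modeled output time until nothing helps or the CPU budget is exhausted. A compact, self-contained sketch under simplified budget accounting (not the commit's code):

#include <map>
#include <memory>
#include <string>

struct Parameter {
  double value = 1;
};

// `output_time` is any callable returning the modeled output time under the
// current parameter values; in the real code this is OutputTime(snapshot).
template <typename OutputTimeFn>
void HillClimb(std::map<std::string, std::shared_ptr<Parameter>>* parameters,
               OutputTimeFn output_time, double cpu_budget) {
  for (auto& pair : *parameters) pair.second->value = 1;
  double used = static_cast<double>(parameters->size());
  while (used < cpu_budget) {
    const double base_time = output_time();
    Parameter* best_parameter = nullptr;
    double best_delta = 0;
    for (auto& pair : *parameters) {
      pair.second->value++;  // trial increment
      const double delta = base_time - output_time();
      if (delta > best_delta) {
        best_delta = delta;
        best_parameter = pair.second.get();
      }
      pair.second->value--;  // undo the trial
    }
    if (!best_parameter) return;  // stuck in a local maximum; give up
    best_parameter->value++;
    used += 1;
  }
}
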
@ -537,12 +552,18 @@ std::map<string, std::shared_ptr<Parameter>> Model::CollectTunableParameters(
}
int64 Model::OutputTime(std::shared_ptr<Node> node) {
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
// TODO(jsimsa): Now that we are accounting for buffer size in the wait time
// computation, assuming that the input is infinitely fast will result in
// inaccurate estimates of the output latency.
//
// We should compute the output latency as a fixed point of the following
// equation: `output_time = node->OutputTime(input_times(1, output_time))`.
return node->OutputTime(&input_times);
}
int64 Model::ProcessingTime(std::shared_ptr<Node> node) {
return node->ProcessingTime();
int64 Model::TotalProcessingTime(std::shared_ptr<Node> node) {
return node->TotalProcessingTime();
}
} // namespace model
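
Editor's note: a hypothetical sketch of the fixed-point iteration that the TODO in `Model::OutputTime` suggests (the helper name and tolerances are inventions for illustration, not part of TensorFlow):

#include <cmath>
#include <functional>

// Feed the latency estimate back in as the input time until it stabilizes.
// `output_time_fn` stands in for seeding `input_times` with the current
// estimate and calling `node->OutputTime(&input_times)`.
double OutputTimeFixedPoint(
    const std::function<double(double)>& output_time_fn, int max_iters = 100,
    double tolerance = 1e-3) {
  double output_time = 0;
  for (int i = 0; i < max_iters; ++i) {
    const double next = output_time_fn(output_time);
    if (std::abs(next - output_time) < tolerance) return next;
    output_time = next;
  }
  return output_time;  // best available estimate if we did not converge
}
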

View File

@ -138,6 +138,12 @@ class Node {
processing_time_ += delta;
}
// Returns an indication whether autotuning is enabled for this node.
bool autotune() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return autotune_;
}
// Returns the number of bytes stored in this node's buffer.
int64 buffered_bytes() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
@ -215,11 +221,20 @@ class Node {
inputs_.remove(input);
}
// Sets the value that determines whether autotuning is enabled for this node.
void set_autotune(bool autotune) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
autotune_ = autotune;
}
// Collects tunable parameters in the subtree rooted in this node.
void CollectTunableParameters(
std::map<string, std::shared_ptr<Parameter>>* parameters) const
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
if (!autotune_) {
return;
}
for (auto& pair : parameters_) {
if (pair.second->state->tunable) {
parameters->insert(std::make_pair(long_name(), pair.second));
@ -230,17 +245,31 @@ class Node {
}
}
// Returns the per-element output time for this node.
int64 OutputTime(std::vector<int64>* input_times) const LOCKS_EXCLUDED(mu_) {
// Returns a human-readable representation of this node.
string DebugString() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return OutputTimeLocked(input_times);
string result;
strings::StrAppend(&result, long_name(), ":\n");
strings::StrAppend(&result, " autotune=", autotune_, "\n");
strings::StrAppend(&result, " buffered_bytes=", buffered_bytes_, "\n");
strings::StrAppend(&result, " processing_time=", processing_time_, "\n");
strings::StrAppend(&result, " num_elements=", num_elements_, "\n");
string inputs;
for (auto& input : inputs_) {
strings::StrAppend(&inputs, input->long_name(), ",");
}
strings::StrAppend(&result, " inputs={", inputs, "}\n");
for (auto& input : inputs_) {
strings::StrAppend(&result, input->DebugString());
}
return result;
}
// Returns the per-element processing time spent in the subtree rooted in
// this node.
int64 ProcessingTime() const LOCKS_EXCLUDED(mu_) {
// Returns the per-element output time for this node.
double OutputTime(std::vector<double>* input_times) const
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return ProcessingTimeLocked();
return OutputTimeLocked(input_times);
}
// Returns a copy of this node, making a deep copy of its inputs and a
@ -254,6 +283,7 @@ class Node {
std::shared_ptr<Node> result = Clone(output);
{
mutex_lock l2(result->mu_);
result->autotune_ = autotune_;
result->buffered_bytes_ = buffered_bytes_;
result->processing_time_ = processing_time_;
result->num_elements_ = num_elements_;
@ -265,57 +295,90 @@ class Node {
return result;
}
// Returns the per-element CPU time spent in the subtree rooted in this node.
double TotalProcessingTime() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return TotalProcessingTimeLocked();
}
protected:
// Returns the number of inputs.
int64 num_inputs() const SHARED_LOCKS_REQUIRED(mu_) {
int64 num_inputs = 0;
for (auto& input : inputs_) {
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
++num_inputs;
}
}
return num_inputs;
}
// Creates a clone of this node.
virtual std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const
SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the per-element processing time spent in this node.
int64 NanosPerElementLocked() const SHARED_LOCKS_REQUIRED(mu_) {
if (num_elements_ == 0) {
return 0;
}
return static_cast<int64>(static_cast<double>(processing_time_) /
static_cast<double>(num_elements_));
}
// Returns the sum of per-element output time for the inputs of this node.
int64 OutputTimeForInputs(std::vector<int64>* input_times) const
double OutputTimeForInputs(std::vector<double>* input_times) const
SHARED_LOCKS_REQUIRED(mu_) {
int64 sum = 0;
double sum = 0;
for (auto& input : inputs_) {
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
sum += input->OutputTime(input_times);
}
}
return sum;
}
// Returns the per-element output time for this node.
virtual int64 OutputTimeLocked(std::vector<int64>* input_times) const
virtual double OutputTimeLocked(std::vector<double>* input_times) const
SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the sum of per-element processing time for the inputs of this node.
//
// TODO(jsimsa): use processing time history as a prior for future inputs
int64 ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) {
double ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) {
double sum = 0;
for (auto& input : inputs_) {
sum += input->ProcessingTime();
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
sum += input->TotalProcessingTime();
}
}
return sum;
}
// Returns the per-element processing time spent in the subtree rooted in
// this node.
virtual int64 ProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the per-element processing time spent in this node.
double SelfProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) {
if (num_elements_ == 0) {
return 0;
}
return static_cast<double>(processing_time_) /
static_cast<double>(num_elements_);
}
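// For example, a node that has accumulated 100ns of processing time over two
// recorded elements reports a self processing time of 100 / 2 = 50ns, which
// is what the source-node test below asserts.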
// Returns the per-element CPU time spent in the subtree rooted in this node.
virtual double TotalProcessingTimeLocked() const
SHARED_LOCKS_REQUIRED(mu_) = 0;
mutable mutex mu_;
const int64 id_;
const string name_;
// Indicates whether the subtree rooted in this node should be included in
// autotuning. In particular, if this is `false`, then the subtree is excluded
// from computation of output time and processing time.
bool autotune_ GUARDED_BY(mu_) = true;
int64 buffered_bytes_ GUARDED_BY(mu_) = 0;
int64 processing_time_ GUARDED_BY(mu_) = 0;
int64 num_elements_ GUARDED_BY(mu_) = 0;
std::map<std::thread::id, int64> work_start_ GUARDED_BY(mu_);
std::map<string, std::shared_ptr<Parameter>> parameters_ GUARDED_BY(mu_);
// Inputs of this node. These can represent an iterator created from the input
// dataset but also other input iterators (e.g. created by the user-defined
// functions of `flat_map` or `interleave`).
std::list<std::shared_ptr<Node>> inputs_ GUARDED_BY(mu_);
// The reference to the output node is not owned so that deletion of a
@ -421,7 +484,7 @@ class Model {
int64 OutputTime(std::shared_ptr<Node> node);
// Collects the processing time for the given node.
int64 ProcessingTime(std::shared_ptr<Node> node);
int64 TotalProcessingTime(std::shared_ptr<Node> node);
// Used for coordination between different input pipeline threads. Exclusive
// access is required only when adding or removing nodes. Concurrent access to

View File

@ -25,11 +25,11 @@ namespace model {
namespace {
class AsyncInterleaveManyTest
: public ::testing::TestWithParam<std::tuple<int64, int64>> {};
: public ::testing::TestWithParam<std::tuple<int64, double>> {};
TEST_P(AsyncInterleaveManyTest, Model) {
const int64 parallelism = std::get<0>(GetParam());
const int64 input_time = std::get<1>(GetParam());
const double input_time = std::get<1>(GetParam());
std::shared_ptr<Node> async_interleave_many =
model::MakeAsyncInterleaveManyNode(
{0, "async_interleave_many", nullptr},
@ -55,29 +55,29 @@ TEST_P(AsyncInterleaveManyTest, Model) {
auto cleanup2 = gtl::MakeCleanup([async_interleave_many, source2]() {
async_interleave_many->remove_input(source2);
});
std::vector<int64> input_times(1, input_time);
std::vector<double> input_times(1, input_time);
async_interleave_many->add_processing_time(100);
EXPECT_EQ(async_interleave_many->processing_time(), 100);
EXPECT_EQ(async_interleave_many->ProcessingTime(), 0);
EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 0);
EXPECT_EQ(async_interleave_many->OutputTime(&input_times), 0);
async_interleave_many->record_element();
EXPECT_EQ(async_interleave_many->num_elements(), 1);
EXPECT_EQ(async_interleave_many->ProcessingTime(), 100);
EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100);
EXPECT_LE(async_interleave_many->OutputTime(&input_times), 100);
EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0);
source1->add_processing_time(200);
source2->add_processing_time(300);
EXPECT_EQ(async_interleave_many->ProcessingTime(), 100);
EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100);
EXPECT_LE(async_interleave_many->OutputTime(&input_times), 100);
EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0);
source1->record_element();
source2->record_element();
EXPECT_EQ(async_interleave_many->ProcessingTime(), 100 + 250);
EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 100 + 250);
EXPECT_LE(async_interleave_many->OutputTime(&input_times),
100 + 250 / parallelism);
EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0);
async_interleave_many->record_element();
EXPECT_EQ(async_interleave_many->ProcessingTime(), 50 + 250);
EXPECT_EQ(async_interleave_many->TotalProcessingTime(), 50 + 250);
EXPECT_LE(async_interleave_many->OutputTime(&input_times),
50 + 250 / parallelism);
EXPECT_GE(async_interleave_many->OutputTime(&input_times), 0);
@ -89,11 +89,11 @@ INSTANTIATE_TEST_SUITE_P(Test, AsyncInterleaveManyTest,
200)));
class AsyncKnownRatioTest
: public ::testing::TestWithParam<std::tuple<int64, int64, int64>> {};
: public ::testing::TestWithParam<std::tuple<int64, double, int64>> {};
TEST_P(AsyncKnownRatioTest, Model) {
const int64 parallelism = std::get<0>(GetParam());
const int64 input_time = std::get<1>(GetParam());
const double input_time = std::get<1>(GetParam());
const int64 num_inputs_per_output = std::get<2>(GetParam());
std::shared_ptr<Node> async_known_many = model::MakeAsyncKnownRatioNode(
{0, "async_known_many", nullptr}, num_inputs_per_output,
@ -107,50 +107,51 @@ TEST_P(AsyncKnownRatioTest, Model) {
std::shared_ptr<Node> source2 =
model::MakeSourceNode({2, "source2", async_known_many});
async_known_many->add_input(source2);
std::vector<int64> input_times(1, input_time);
std::vector<double> input_times(1, input_time);
source1->add_processing_time(100);
EXPECT_EQ(async_known_many->ProcessingTime(), 0);
EXPECT_EQ(async_known_many->TotalProcessingTime(), 0);
EXPECT_EQ(async_known_many->OutputTime(&input_times), 0);
source2->add_processing_time(200);
EXPECT_EQ(async_known_many->ProcessingTime(), 0);
EXPECT_EQ(async_known_many->TotalProcessingTime(), 0);
EXPECT_EQ(async_known_many->OutputTime(&input_times), 0);
source1->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(), num_inputs_per_output * 100);
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * 100);
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * 100);
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
source2->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (100 + 200));
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (100 + 200));
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
source1->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 200));
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 200));
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
source2->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100));
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100));
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
async_known_many->add_processing_time(128);
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100));
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100));
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
async_known_many->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100) + 128);
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100) + 128 / parallelism);
EXPECT_GE(async_known_many->OutputTime(&input_times), 0);
async_known_many->record_element();
EXPECT_EQ(async_known_many->ProcessingTime(),
EXPECT_EQ(async_known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100) + 64);
EXPECT_LE(async_known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100) + 64 / parallelism);
@ -174,25 +175,25 @@ TEST(InterleaveManyTest, Model) {
std::shared_ptr<Node> source2 =
model::MakeSourceNode({2, "source2", interleave_many});
interleave_many->add_input(source2);
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
interleave_many->add_processing_time(100);
EXPECT_EQ(interleave_many->processing_time(), 100);
EXPECT_EQ(interleave_many->ProcessingTime(), 0);
EXPECT_EQ(interleave_many->TotalProcessingTime(), 0);
EXPECT_EQ(interleave_many->OutputTime(&input_times), 0);
interleave_many->record_element();
EXPECT_EQ(interleave_many->num_elements(), 1);
EXPECT_EQ(interleave_many->ProcessingTime(), 100);
EXPECT_EQ(interleave_many->TotalProcessingTime(), 100);
EXPECT_EQ(interleave_many->OutputTime(&input_times), 100);
source1->add_processing_time(200);
source2->add_processing_time(300);
EXPECT_EQ(interleave_many->ProcessingTime(), 100);
EXPECT_EQ(interleave_many->TotalProcessingTime(), 100);
EXPECT_EQ(interleave_many->OutputTime(&input_times), 100);
source1->record_element();
source2->record_element();
EXPECT_EQ(interleave_many->ProcessingTime(), 350);
EXPECT_EQ(interleave_many->TotalProcessingTime(), 350);
EXPECT_EQ(interleave_many->OutputTime(&input_times), 350);
interleave_many->record_element();
EXPECT_EQ(interleave_many->ProcessingTime(), 300);
EXPECT_EQ(interleave_many->TotalProcessingTime(), 300);
EXPECT_EQ(interleave_many->OutputTime(&input_times), 300);
}
@ -208,39 +209,43 @@ TEST_P(KnownRatioTest, Model) {
std::shared_ptr<Node> source2 =
model::MakeSourceNode({2, "source2", known_many});
known_many->add_input(source2);
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
source1->add_processing_time(100);
EXPECT_EQ(known_many->ProcessingTime(), 0);
EXPECT_EQ(known_many->TotalProcessingTime(), 0);
EXPECT_EQ(known_many->OutputTime(&input_times), 0);
source2->add_processing_time(200);
EXPECT_EQ(known_many->ProcessingTime(), 0);
EXPECT_EQ(known_many->TotalProcessingTime(), 0);
EXPECT_EQ(known_many->OutputTime(&input_times), 0);
source1->record_element();
EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * 100);
EXPECT_EQ(known_many->TotalProcessingTime(), num_inputs_per_output * 100);
EXPECT_EQ(known_many->OutputTime(&input_times), num_inputs_per_output * 100);
source2->record_element();
EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (100 + 200));
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (100 + 200));
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (100 + 200));
source1->record_element();
EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 200));
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 200));
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 200));
source2->record_element();
EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 100));
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100));
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100));
known_many->add_processing_time(128);
EXPECT_EQ(known_many->ProcessingTime(), num_inputs_per_output * (50 + 100));
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100));
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100));
known_many->record_element();
EXPECT_EQ(known_many->ProcessingTime(),
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100) + 128);
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100) + 128);
known_many->record_element();
EXPECT_EQ(known_many->ProcessingTime(),
EXPECT_EQ(known_many->TotalProcessingTime(),
num_inputs_per_output * (50 + 100) + 64);
EXPECT_EQ(known_many->OutputTime(&input_times),
num_inputs_per_output * (50 + 100) + 64);
@ -250,18 +255,18 @@ INSTANTIATE_TEST_SUITE_P(Test, KnownRatioTest, ::testing::Values(0, 1, 2, 4));
TEST(SourceTest, Model) {
std::shared_ptr<Node> source = model::MakeSourceNode({0, "source", nullptr});
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
source->add_processing_time(100);
EXPECT_EQ(source->processing_time(), 100);
EXPECT_EQ(source->ProcessingTime(), 0);
EXPECT_EQ(source->TotalProcessingTime(), 0);
EXPECT_EQ(source->OutputTime(&input_times), 0);
source->record_element();
EXPECT_EQ(source->num_elements(), 1);
EXPECT_EQ(source->ProcessingTime(), 100);
EXPECT_EQ(source->TotalProcessingTime(), 100);
EXPECT_EQ(source->OutputTime(&input_times), 100);
source->record_element();
EXPECT_EQ(source->num_elements(), 2);
EXPECT_EQ(source->ProcessingTime(), 50);
EXPECT_EQ(source->TotalProcessingTime(), 50);
EXPECT_EQ(source->OutputTime(&input_times), 50);
}
@ -274,25 +279,25 @@ TEST(UnknownRatioTest, Model) {
std::shared_ptr<Node> source2 =
model::MakeSourceNode({2, "source2", unknown_many});
unknown_many->add_input(source2);
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
unknown_many->add_processing_time(100);
EXPECT_EQ(unknown_many->processing_time(), 100);
EXPECT_EQ(unknown_many->ProcessingTime(), 0);
EXPECT_EQ(unknown_many->TotalProcessingTime(), 0);
EXPECT_EQ(unknown_many->OutputTime(&input_times), 0);
unknown_many->record_element();
EXPECT_EQ(unknown_many->num_elements(), 1);
EXPECT_EQ(unknown_many->ProcessingTime(), 100);
EXPECT_EQ(unknown_many->TotalProcessingTime(), 100);
EXPECT_EQ(unknown_many->OutputTime(&input_times), 100);
source1->add_processing_time(100);
source2->add_processing_time(200);
EXPECT_EQ(unknown_many->ProcessingTime(), 100);
EXPECT_EQ(unknown_many->TotalProcessingTime(), 100);
EXPECT_EQ(unknown_many->OutputTime(&input_times), 100);
source1->record_element();
source2->record_element();
EXPECT_EQ(unknown_many->ProcessingTime(), 400);
EXPECT_EQ(unknown_many->TotalProcessingTime(), 400);
EXPECT_EQ(unknown_many->OutputTime(&input_times), 400);
unknown_many->record_element();
EXPECT_EQ(unknown_many->ProcessingTime(), 200);
EXPECT_EQ(unknown_many->TotalProcessingTime(), 200);
EXPECT_EQ(unknown_many->OutputTime(&input_times), 200);
}
@ -305,36 +310,36 @@ TEST(UnknownTest, Model) {
std::shared_ptr<Node> source2 =
model::MakeSourceNode({2, "source2", unknown});
unknown->add_input(source2);
std::vector<int64> input_times(1, 0);
std::vector<double> input_times(1, 0);
source1->add_processing_time(100);
EXPECT_EQ(unknown->ProcessingTime(), 0);
EXPECT_EQ(unknown->TotalProcessingTime(), 0);
EXPECT_EQ(unknown->OutputTime(&input_times), 0);
source2->add_processing_time(100);
EXPECT_EQ(unknown->ProcessingTime(), 0);
EXPECT_EQ(unknown->TotalProcessingTime(), 0);
EXPECT_EQ(unknown->OutputTime(&input_times), 0);
source1->record_element();
EXPECT_EQ(unknown->ProcessingTime(), 100);
EXPECT_EQ(unknown->TotalProcessingTime(), 100);
EXPECT_EQ(unknown->OutputTime(&input_times), 100);
source2->record_element();
EXPECT_EQ(unknown->ProcessingTime(), 200);
EXPECT_EQ(unknown->TotalProcessingTime(), 200);
EXPECT_EQ(unknown->OutputTime(&input_times), 200);
source1->record_element();
EXPECT_EQ(unknown->ProcessingTime(), 150);
EXPECT_EQ(unknown->TotalProcessingTime(), 150);
EXPECT_EQ(unknown->OutputTime(&input_times), 150);
source2->record_element();
EXPECT_EQ(unknown->ProcessingTime(), 100);
EXPECT_EQ(unknown->TotalProcessingTime(), 100);
EXPECT_EQ(unknown->OutputTime(&input_times), 100);
// Unknown node processing time should not affect its ProcessingTime() or
// Unknown node processing time should not affect its TotalProcessingTime() or
// OutputTime().
unknown->add_processing_time(100);
EXPECT_EQ(unknown->processing_time(), 100);
EXPECT_EQ(unknown->ProcessingTime(), 100);
EXPECT_EQ(unknown->TotalProcessingTime(), 100);
EXPECT_EQ(unknown->OutputTime(&input_times), 100);
// Unknown node number of elements should not affect its ProcessingTime() or
// OutputTime().
// Unknown node number of elements should not affect its TotalProcessingTime()
// or OutputTime().
unknown->record_element();
EXPECT_EQ(unknown->num_elements(), 1);
EXPECT_EQ(unknown->ProcessingTime(), 100);
EXPECT_EQ(unknown->TotalProcessingTime(), 100);
EXPECT_EQ(unknown->OutputTime(&input_times), 100);
}
@ -350,12 +355,12 @@ class TestNode : public model::Node {
return nullptr;
}
int64 OutputTimeLocked(std::vector<int64>* input_times) const override
double OutputTimeLocked(std::vector<double>* input_times) const override
SHARED_LOCKS_REQUIRED(mu_) {
return 0;
}
int64 ProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
double TotalProcessingTimeLocked() const override SHARED_LOCKS_REQUIRED(mu_) {
return 0;
}
};

View File

@ -252,7 +252,7 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
return model::MakeAsyncInterleaveManyNode(
std::move(args),
{model::MakeParameter("parallelism", num_parallel_calls_, /*min=*/1,
/*max=*/port::NumSchedulableCPUs())});
/*max=*/dataset()->cycle_length_)});
}
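Editor's note: the tighter bound makes sense because a parallel interleave cycles through at most `cycle_length_` input iterators at a time, so parallelism beyond the cycle length (previously capped only by the number of schedulable CPUs) could never be exercised by the autotuner.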
Status SaveInternal(IteratorStateWriter* writer) override {
@ -462,6 +462,10 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
if (!future_elements_.empty()) {
current_elements_[idx] = std::move(future_elements_.back());
future_elements_.pop_back();
if (current_elements_[idx]->iterator) {
EnableAutotune(ctx.get(),
current_elements_[idx]->iterator.get());
}
} else {
current_elements_[idx] = MakeElement(ctx);
if (!current_elements_[idx]) {
@ -480,9 +484,21 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
if (num_results > 0) {
num_calls_++;
element->in_use = true;
thread_pool_->Schedule(
std::bind(&ParallelInterleaveIterator::FetchResults, this,
ctx, std::move(element), num_results));
thread_pool_->Schedule(std::bind(
&ParallelInterleaveIterator::FetchResults, this, ctx,
std::move(element), num_results,
[this, ctx]() EXCLUSIVE_LOCKS_REQUIRED(*mu_) {
--num_calls_;
const auto& stats_aggregator = ctx->stats_aggregator();
if (stats_aggregator) {
stats_aggregator->AddScalar(
stats_utils::ThreadUtilizationScalarName(
dataset()->node_name()),
static_cast<float>(num_calls_) /
static_cast<float>(num_parallel_calls_->value),
num_elements());
}
}));
}
}
}
@ -518,7 +534,8 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
// Fetches up to `dataset()->block_length_` results from `element`.
void FetchResults(const std::shared_ptr<IteratorContext>& ctx,
const std::shared_ptr<Element>& element,
int64 num_results) LOCKS_EXCLUDED(*mu_) {
int64 num_results, std::function<void()> done)
LOCKS_EXCLUDED(*mu_) {
RecordStart(ctx.get());
auto cleanup = gtl::MakeCleanup([this, ctx] { RecordStop(ctx.get()); });
bool end_of_input = false;
@ -546,15 +563,7 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
element->inputs.clear();
--num_open_;
}
--num_calls_;
const auto& stats_aggregator = ctx->stats_aggregator();
if (stats_aggregator) {
stats_aggregator->AddScalar(
stats_utils::ThreadUtilizationScalarName(dataset()->node_name()),
static_cast<float>(num_calls_) /
static_cast<float>(num_parallel_calls_->value),
num_elements());
}
done();
cond_var_->notify_all();
}
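
Editor's note: the refactor threads completion bookkeeping through the new `done` callback: the demand path (in the hunk above) decrements `num_calls_` and records thread utilization, while the prefetch path (below) passes a no-op `[] {}`. A self-contained sketch of the pattern with illustrative names (not the commit's code):

#include <functional>
#include <iostream>

void FetchResults(int num_results, std::function<void()> done) {
  // ... fetch up to `num_results` elements ...
  done();  // caller-chosen bookkeeping runs exactly once, after the fetch
}

int main() {
  int num_calls = 1;
  // Demand path: decrement the in-flight counter and record utilization.
  FetchResults(4, [&num_calls] {
    --num_calls;
    std::cout << "num_calls=" << num_calls << "\n";
  });
  // Prefetch path: nothing to record.
  FetchResults(4, [] {});
}
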
@ -566,9 +575,8 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
RecordStart(ctx.get());
auto cleanup = gtl::MakeCleanup([this, ctx] { RecordStop(ctx.get()); });
auto busy = [this]() EXCLUSIVE_LOCKS_REQUIRED(*mu_) -> bool {
// TODO(jsimsa): Autotune the buffer size.
return num_calls_ >= num_parallel_calls_->value ||
future_elements_.size() >= 2 * dataset()->cycle_length_;
// TODO(jsimsa): Autotune the number of iterators to prefetch.
return future_elements_.size() >= 2 * dataset()->cycle_length_;
};
while (true) {
mutex_lock l(*mu_);
@ -595,20 +603,11 @@ class ParallelInterleaveDatasetOp : public UnaryDatasetOpKernel {
if (!element->iterator) {
continue;
}
++num_calls_;
DisableAutotune(ctx.get(), element->iterator.get());
element->in_use = true;
thread_pool_->Schedule(
std::bind(&ParallelInterleaveIterator::FetchResults, this, ctx,
std::move(element), dataset()->block_length_));
}
const auto& stats_aggregator = ctx->stats_aggregator();
if (stats_aggregator) {
stats_aggregator->AddScalar(
stats_utils::ThreadUtilizationScalarName(
dataset()->node_name()),
static_cast<float>(num_calls_) /
static_cast<float>(num_parallel_calls_->value),
num_elements());
std::move(element), dataset()->block_length_, [] {}));
}
cond_var_->notify_all();
}

View File

@ -15,7 +15,6 @@ py_test(
"//tensorflow/python:client_testlib",
"//tensorflow/python:math_ops",
"//tensorflow/python:session",
"//tensorflow/python/data/experimental/ops:batching",
"//tensorflow/python/data/experimental/ops:optimization",
"//tensorflow/python/data/ops:dataset_ops",
"//third_party/py/numpy",

View File

@ -22,7 +22,6 @@ import time
import numpy as np
from tensorflow.python.client import session
from tensorflow.python.data.experimental.ops import batching
from tensorflow.python.data.experimental.ops import optimization
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.ops import math_ops
@ -78,13 +77,12 @@ class AutotuneBenchmark(test.Benchmark):
dataset = dataset_ops.Dataset.from_tensors((np.random.rand(1, 4 * k),
np.random.rand(4 * k,
1))).repeat()
dataset = dataset.apply(
batching.map_and_batch(
math_ops.matmul,
num_parallel_calls=optimization.AUTOTUNE,
batch_size=batch_size))
dataset = dataset.map(
math_ops.matmul, num_parallel_calls=optimization.AUTOTUNE)
dataset = dataset.batch(batch_size=batch_size)
options = dataset_ops.Options()
options.experimental_optimization.apply_default_optimizations = False
options.experimental_optimization.map_and_batch_fusion = True
options.experimental_optimization.autotune = autotune
dataset = dataset.with_options(options)
iterator = dataset_ops.make_one_shot_iterator(dataset)
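
Editor's note: the benchmark change is behavior-preserving. Instead of applying the fused `map_and_batch` transformation directly, the pipeline is now written as separate `map` and `batch` stages and relies on the `map_and_batch_fusion` rewrite (enabled explicitly in the options above) to produce the fused op through the standard optimization path.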