[tf.data] In tf.data autotuning, keep the initialized parameter values if processing times have not yet been correctly collected.

PiperOrigin-RevId: 349589081
Change-Id: I0478ddf02d4b9360898485744ee57f99bcb57aee
Authored by Jay Shi on 2020-12-30 12:32:02 -08:00; committed by TensorFlower Gardener
parent 73b304c9e4
commit 0c04b471fb
3 changed files with 94 additions and 6 deletions
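
The heart of the change is the guard added to Node::CollectTunableParametersHelper in the first hunk below: a node that has not yet recorded any elements is skipped during parameter collection, so its parameters keep the values they were initialized with instead of being tuned from processing times that have not been collected. The sketch below is a minimal, self-contained illustration of that idea; the Parameter and Node types are simplified stand-ins rather than the real tensorflow::data::model classes, and the traversal is collapsed into a single recursive call.

#include <cstdio>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-ins for the model's Parameter and Node (illustrative only).
struct Parameter {
  double value = 1.0;  // Keeps its initialized value until tuning is safe.
};

struct Node {
  std::string name;
  bool autotune = true;
  long long num_elements = 0;
  std::map<std::string, std::shared_ptr<Parameter>> parameters;
  std::vector<std::shared_ptr<Node>> inputs;

  void record_element() { ++num_elements; }

  // Mirrors the new guard: skip nodes with autotune turned off or with no
  // recorded elements, so their initialized parameter values stay untouched.
  void CollectTunableParameters(
      std::map<std::string, std::shared_ptr<Parameter>>* out) const {
    if (autotune && num_elements > 0) {
      for (const auto& pair : parameters) {
        (*out)[name + "::" + pair.first] = pair.second;
      }
    }
    for (const auto& input : inputs) {
      input->CollectTunableParameters(out);
    }
  }
};

int main() {
  auto source = std::make_shared<Node>();
  source->name = "source";
  source->parameters["parallelism"] = std::make_shared<Parameter>();

  std::map<std::string, std::shared_ptr<Parameter>> collected;
  source->CollectTunableParameters(&collected);
  std::printf("before record_element: %zu parameters\n", collected.size());  // 0

  source->record_element();
  source->CollectTunableParameters(&collected);
  std::printf("after record_element: %zu parameters\n", collected.size());  // 1
  return 0;
}

In the real code the traversal is done by Node::CollectNodes and the guard lives in the helper, but the effect is the same: a parameter never reaches the optimizers until its node has produced at least one element.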


@@ -1315,7 +1315,9 @@ Node::NodeVector Node::CollectNodes(
 void Node::CollectTunableParametersHelper(
     absl::flat_hash_map<string, std::shared_ptr<Parameter>>* parameters) const
     TF_SHARED_LOCKS_REQUIRED(mu_) {
-  if (!autotune_) {
+  // If autotune is turned off or there are no elements recorded, we don't
+  // collect the parameters on the node.
+  if (!autotune_ || num_elements_ <= 0) {
     return;
   }
   for (auto& pair : parameters_) {
@@ -1528,8 +1530,15 @@ void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
     tf_shared_lock lock(mu_);
     snapshot = output_->Snapshot();
   }
-  VLOG(2) << "Starting optimization of tunable parameters with GradientDescent";
+  VLOG(2) << "Starting optimization of tunable parameters with Gradient "
+             "Descent.";
   auto parameters = CollectTunableParameters(snapshot);
+  if (parameters.empty()) {
+    VLOG(2) << "The Gradient Descent optimization is terminated since no node "
+               "with tunable parameters has recorded elements.";
+    return;
+  }
   // The maps of "essential" parallelism parameters and buffer size parameters.
   absl::flat_hash_map<string, std::shared_ptr<Parameter>>
       parallelism_parameters, buffer_size_parameters;
@@ -1586,9 +1595,15 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
     tf_shared_lock lock(mu_);
     snapshot = output_->Snapshot();
   }
-  VLOG(2) << "Starting optimization of tunable parameters with HillClimb";
+  VLOG(2) << "Starting optimization of tunable parameters with Hill Climb.";
   const double processing_time = TotalProcessingTime(snapshot);
   auto parameters = CollectTunableParameters(snapshot);
+  if (parameters.empty()) {
+    VLOG(2) << "The Hill Climb optimization is terminated since no node with "
+               "tunable parameters has recorded elements.";
+    return;
+  }
   // Buffer size parameter will only be incremented if the output latency
   // improvement is greater than this constant.
   constexpr double kBufferSizeMinDelta = 1.0L;
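
Both optimizers now follow the same pattern: collect the tunable parameters and, if the map comes back empty because no node with tunable parameters has recorded elements, end the round immediately so every parameter keeps its current (initialized) value. The following is a hedged sketch of that round-level control flow, again with simplified, illustrative types rather than the real Model API.

#include <cstdio>
#include <map>
#include <string>

// Illustrative stand-in for the collected tunable parameters, keyed by node
// name (not the real Parameter/Model types).
using ParameterMap = std::map<std::string, double>;

// Mimics the early return added to OptimizeGradientDescent/OptimizeHillClimb:
// an empty map means no reliable processing times exist yet, so the round is
// a no-op and the initialized parameter values survive untouched.
void OptimizeRound(ParameterMap* parameters) {
  if (parameters->empty()) {
    std::printf("no node has recorded elements; keeping initialized values\n");
    return;
  }
  for (auto& pair : *parameters) {
    pair.second += 1.0;  // Placeholder for a real Hill Climb / Gradient step.
  }
}

int main() {
  ParameterMap collected;     // Empty: no node has recorded an element yet.
  OptimizeRound(&collected);  // Early return; nothing is modified.

  collected["source::parallelism"] = 1.0;  // After elements are recorded.
  OptimizeRound(&collected);               // The tuning step now runs.
  std::printf("source::parallelism = %g\n", collected["source::parallelism"]);
  return 0;
}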


@@ -503,7 +503,7 @@ class Node {
                           bool collect_node(const std::shared_ptr<Node>)) const
       TF_SHARED_LOCKS_REQUIRED(mu_);
-  // Collect tunable parameters for the node.
+  // Collect tunable parameters on the nodes which have recorded elements.
   void CollectTunableParametersHelper(
       absl::flat_hash_map<string, std::shared_ptr<Parameter>>* parameters) const
       TF_SHARED_LOCKS_REQUIRED(mu_);


@@ -507,6 +507,75 @@ TEST(TestManyElements, Model) {
             0);
 }
+TEST(CollectAutotuneParametersWithElementsTest, Model) {
+  std::shared_ptr<Node> unknown =
+      model::MakeUnknownNode({0, "unknown", nullptr});
+  std::shared_ptr<Node> async_known_ratio = model::MakeAsyncKnownRatioNode(
+      {1, "source", unknown}, 2,
+      {model::MakeParameter(
+          "parallelism",
+          std::make_shared<SharedState>(model::kAutotune, nullptr, nullptr), 1,
+          5)});
+  async_known_ratio->record_element();
+  unknown->add_input(async_known_ratio);
+  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
+  unknown->CollectTunableParameters(&parameters);
+  EXPECT_FALSE(parameters.contains(unknown->long_name()));
+  EXPECT_TRUE(parameters.contains(async_known_ratio->long_name()));
+  EXPECT_EQ(parameters.size(), 1);
+}
+
+TEST(DontCollectNonAutotuneParametersTest, Model) {
+  std::shared_ptr<Node> unknown =
+      model::MakeUnknownNode({0, "unknown", nullptr});
+  std::shared_ptr<Node> async_known_ratio = model::MakeAsyncKnownRatioNode(
+      {1, "source", unknown}, 2,
+      {model::MakeParameter("parallelism",
+                            std::make_shared<SharedState>(3, nullptr, nullptr),
+                            1, 5)});
+  async_known_ratio->record_element();
+  unknown->add_input(async_known_ratio);
+  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
+  unknown->CollectTunableParameters(&parameters);
+  EXPECT_EQ(parameters.size(), 0);
+}
+
+TEST(DontCollectAutotuneDisabledParametersTest, Model) {
+  std::shared_ptr<Node> unknown =
+      model::MakeUnknownNode({0, "unknown", nullptr});
+  std::shared_ptr<Node> async_known_ratio = model::MakeAsyncKnownRatioNode(
+      {1, "source", unknown}, 2,
+      {model::MakeParameter(
+          "parallelism",
+          std::make_shared<SharedState>(model::kAutotune, nullptr, nullptr), 1,
+          5)});
+  async_known_ratio->record_element();
+  async_known_ratio->set_autotune(false);
+  unknown->add_input(async_known_ratio);
+  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
+  unknown->CollectTunableParameters(&parameters);
+  EXPECT_EQ(parameters.size(), 0);
+}
+
+TEST(DontCollectParametersWithoutElementsTest, Model) {
+  std::shared_ptr<Node> unknown =
+      model::MakeUnknownNode({0, "unknown", nullptr});
+  std::shared_ptr<Node> async_known_ratio = model::MakeAsyncKnownRatioNode(
+      {1, "source", unknown}, 2,
+      {model::MakeParameter(
+          "parallelism",
+          std::make_shared<SharedState>(model::kAutotune, nullptr, nullptr), 1,
+          5)});
+  unknown->add_input(async_known_ratio);
+  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
+  unknown->CollectTunableParameters(&parameters);
+  EXPECT_EQ(parameters.size(), 0);
+}
 // Precision for comparison of the gradient and a relative output time change.
 constexpr double kComparisonPrecision = 1e-1;
@@ -547,14 +616,15 @@ TEST(AsyncInterleaveManyGradientTest, Model) {
       });
   absl::flat_hash_map<string, double> input_times;
   input_times[kModelInputTimeKey] = input_time;
-  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
-  async_interleave_many->CollectTunableParameters(&parameters);
   async_interleave_many->record_element();
   async_interleave_many->add_processing_time(100);
   source1->record_element();
   source1->add_processing_time(100);
   source2->record_element();
   source2->add_processing_time(300);
+  absl::flat_hash_map<string, std::shared_ptr<Parameter>> parameters;
+  async_interleave_many->CollectTunableParameters(&parameters);
   parameters[async_interleave_many->long_name()]->value = 1;
   parameters[source1->long_name()]->value = 1;
@@ -907,6 +977,7 @@ TEST_P(OptimizeZeroRamBudgetTest, Model) {
                             std::make_shared<SharedState>(-1, mutex1, cv1), 1,
                             5)});
   node1->record_buffer_event(1, 1);
+  node1->record_element();
   std::shared_ptr<mutex> mutex2 = std::make_shared<mutex>();
   std::shared_ptr<condition_variable> cv2 =
@@ -917,6 +988,7 @@ TEST_P(OptimizeZeroRamBudgetTest, Model) {
                             std::make_shared<SharedState>(-1, mutex2, cv2), 0,
                             6)});
   node2->record_buffer_event(1, 1);
+  node2->record_element();
   std::shared_ptr<mutex> mutex3 = std::make_shared<mutex>();
   std::shared_ptr<condition_variable> cv3 =
@@ -927,6 +999,7 @@ TEST_P(OptimizeZeroRamBudgetTest, Model) {
                             std::make_shared<SharedState>(-1, mutex3, cv3), 1,
                             7)});
   node3->record_buffer_event(1, 1);
+  node3->record_element();
   EXPECT_EQ(node1->parameter_value("parallelism"), -1);
   EXPECT_EQ(node2->parameter_value("buffer_size"), -1);