[tf.data] Update the gradient descent algorithm for autotuning so that the optimization will not stop when the CPU budget is reached; instead, it will only increase the buffer_size parameters.

PiperOrigin-RevId: 346374290
Change-Id: Id977bc6ab1329b59f1dfcf53ae03b0640bf56620
Jay Shi 2020-12-08 11:42:43 -08:00 committed by TensorFlower Gardener
parent a8d76616fd
commit 4e6fbd12cb
2 changed files with 145 additions and 74 deletions
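
For context, here is a minimal standalone sketch of the control flow this change introduces: gradient descent keeps iterating after the summed parallelism exceeds the CPU budget, but from that point on only the buffer_size parameters are updated, and the loop ends once every parameter that can still be tuned has reached its maximum. This is not the TensorFlow implementation; the Param struct, the parameter names, the budget value, and the toy gradients are invented for illustration, and the normalization by the largest derivative used in the real code is omitted.

// Illustrative sketch only; simplified from the logic in the diff below.
#include <algorithm>
#include <cmath>
#include <iostream>
#include <map>
#include <string>

struct Param {
  double value, min, max;
  bool is_parallelism;  // false means a buffer_size parameter.
};

int main() {
  // Two hypothetical tunables: one parallelism knob and one buffer size.
  std::map<std::string, Param> params = {
      {"map/parallelism", {1, 1, 16, true}},
      {"prefetch/buffer_size", {1, 1, 64, false}}};
  const double cpu_budget = 8;
  const double kDescentStep = 0.1;
  bool cpu_budget_reached = false;

  for (int i = 0; i < 100; ++i) {
    // Toy gradients: pretend raising either knob lowers the output latency.
    std::map<std::string, double> gradients;
    for (const auto& kv : params) gradients[kv.first] = -1.0;

    // Once the summed parallelism exceeds the CPU budget, remember that fact
    // and freeze the parallelism parameters instead of stopping the loop.
    if (!cpu_budget_reached) {
      double parallelism = 0;
      for (const auto& kv : params)
        if (kv.second.is_parallelism) parallelism += std::round(kv.second.value);
      cpu_budget_reached = parallelism > cpu_budget;
    }

    // Stop only when every parameter that is still tunable is at its maximum.
    bool all_max = true;
    for (const auto& kv : params) {
      if (cpu_budget_reached && kv.second.is_parallelism) continue;
      if (std::round(kv.second.value) < kv.second.max) {
        all_max = false;
        break;
      }
    }
    if (all_max) break;

    // Gradient step with projection onto [min, max]; parallelism parameters
    // are skipped after the CPU budget has been reached.
    for (auto& kv : params) {
      if (cpu_budget_reached && kv.second.is_parallelism) continue;
      double next = kv.second.value - kDescentStep * gradients[kv.first];
      kv.second.value = std::clamp(next, kv.second.min, kv.second.max);
    }
  }
  for (const auto& kv : params)
    std::cout << kv.first << " = " << std::round(kv.second.value) << "\n";
  return 0;
}

The actual change implements the same idea via the CollectParameters(), ShouldStop(), and UpdateParameterValues() helpers in the diff below.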


@@ -37,6 +37,88 @@ inline bool IsAutotuneNode(const std::shared_ptr<Node> node) {
// Wrapper for the square function to reduce verbosity.
inline double Square(double x) { return x * x; }
// Collects "essential" parallelism parameters and buffer size parameters in the
// tree rooted in the given node. Which parallelism parameters are essential is
// determined by the relative processing time spent in the corresponding
// transformation. The collected parameters are returned via maps that map node
// names to their respective parameters.
inline void CollectParameters(
std::shared_ptr<Node> node,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>& parameters,
absl::flat_hash_map<string, std::shared_ptr<Parameter>>*
parallelism_parameters,
absl::flat_hash_map<string, std::shared_ptr<Parameter>>*
buffer_size_parameters) {
// Parallelism parameter is considered to be essential if the corresponding
// transformation's processing time is greater than the essential rate times the
// average transformation self processing time.
constexpr double kEssentialRate = 0.3L;
absl::flat_hash_map<string, double> processing_times;
double processing_time = node->TotalProcessingTime(&processing_times);
double uniform_share =
processing_time / static_cast<double>(processing_times.size());
for (auto& pair : parameters) {
if (pair.second->name == kParallelism &&
processing_times[pair.first] > kEssentialRate * uniform_share) {
parallelism_parameters->insert(pair);
} else if (pair.second->name == kBufferSize) {
buffer_size_parameters->insert(pair);
}
}
}
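// Worked example (illustrative numbers, not from this change): if the tree has
// three transformations whose collected processing times are 10ms, 2ms, and
// 1ms with a total of 13ms, then uniform_share = 13 / 3 ≈ 4.3ms and the
// threshold is 0.3 * 4.3 ≈ 1.3ms, so the parallelism parameters of the first
// two transformations are collected as essential while the third is not.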
// Applies the gradient descent method once and updates the parameter values.
// If a new value falls outside the feasible range, it is clamped to the
// interval between the parameter's minimum and maximum values.
inline void UpdateParameterValues(
const absl::flat_hash_map<string, double>& gradients,
absl::flat_hash_map<string, std::shared_ptr<Parameter>>* parameters) {
// Gradient descent step size.
constexpr double kDescentStep = 0.1L;
double new_value;
double max_abs_derivative = 1.0;
for (auto& pair : *parameters) {
if (std::round(pair.second->value) != pair.second->max) {
auto* gradient = gtl::FindOrNull(gradients, pair.first);
if (gradient) {
max_abs_derivative = std::max(max_abs_derivative, std::abs(*gradient));
}
}
}
for (auto& pair : *parameters) {
auto* gradient = gtl::FindOrNull(gradients, pair.first);
if (gradient) {
new_value =
pair.second->value - kDescentStep * (*gradient) / max_abs_derivative;
// Projection on a feasible interval.
if (new_value > pair.second->max) {
pair.second->value = pair.second->max;
} else if (new_value < pair.second->min) {
pair.second->value = pair.second->min;
} else {
pair.second->value = new_value;
}
}
}
}
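// Worked example (illustrative numbers, not from this change): if a parameter
// has gradient -40 and the largest absolute derivative among parameters not
// yet at their maximum is 80, the update is
// value - 0.1 * (-40) / 80 = value + 0.05; any result outside [min, max] is
// projected back onto that interval.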
// Copies the parameter values (which are for optimization tuning) and updates
// the state values (which are for the input pipeline to follow).
inline void UpdateStateValues(
absl::flat_hash_map<string, std::shared_ptr<Parameter>>* parameters) {
VLOG(2) << "Number of tunable parameters: " << parameters->size();
for (auto& pair : *parameters) {
auto& parameter = pair.second;
VLOG(2) << "Setting tunable parameter " << pair.first << " to "
<< parameter->value;
mutex_lock l(*parameter->state->mu);
parameter->state->value = parameter->value;
parameter->state->cond_var->notify_all();
}
}
// The first input of InterleaveMany corresponds to the input dataset whose
// elements are used to create the (derived) input datasets whose elements are
// interleaved as output.
@@ -1406,27 +1488,37 @@ Model::CollectTunableParameters(std::shared_ptr<Node> node) {
return parameters;
}
absl::flat_hash_map<string, std::shared_ptr<Parameter>>
Model::CollectEssentialParallelism(
std::shared_ptr<Node> node,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>& parameters) {
// Parallelism parameter is considered to be essential if the corresponding
// transformation's processing time is greater than the essential rate times the
// average transformation self processing time.
constexpr double kEssentialRate = 0.3L;
bool Model::ShouldStop(
int64 cpu_budget, int64 ram_budget,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>& parameters,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>&
parallelism_parameters,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>&
buffer_size_parameters,
std::shared_ptr<Node> snapshot, bool* cpu_budget_reached) {
if (!(*cpu_budget_reached)) {
// If those essential transformations' parallelism reaches the CPU
// budget, we will only tune the buffer size parameters in future
// iterations.
int64 model_parallelism = 0;
for (auto& pair : parallelism_parameters) {
model_parallelism += std::round(pair.second->value);
}
*cpu_budget_reached = (model_parallelism > cpu_budget);
}
absl::flat_hash_map<string, double> processing_times;
double processing_time = node->TotalProcessingTime(&processing_times);
double uniform_share =
processing_time / static_cast<double>(processing_times.size());
absl::flat_hash_map<string, std::shared_ptr<Parameter>> essential_parameters;
for (auto& pair : parameters) {
if (pair.second->name == kParallelism &&
processing_times[pair.first] > kEssentialRate * uniform_share) {
essential_parameters.insert(pair);
bool all_max = true;
for (auto& pair :
(*cpu_budget_reached ? buffer_size_parameters : parameters)) {
if (std::round(pair.second->value) < pair.second->max) {
all_max = false;
break;
}
}
return essential_parameters;
// If all parameters have reached their maximum values or the RAM budget is
// reached, we stop the iterations.
return all_max || TotalMaximumBufferedBytes(snapshot) > ram_budget;
}
void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
@@ -1438,12 +1530,16 @@ void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
}
VLOG(2) << "Starting optimization of tunable parameters with GradientDescent";
auto parameters = CollectTunableParameters(snapshot);
auto essential_parameters = CollectEssentialParallelism(snapshot, parameters);
// The maps of "essential" parallelism parameters and buffer size parameters.
absl::flat_hash_map<string, std::shared_ptr<Parameter>>
parallelism_parameters, buffer_size_parameters;
CollectParameters(snapshot, parameters, &parallelism_parameters,
&buffer_size_parameters);
// Initialize the parameter values to minimal before tuning.
for (auto& pair : parameters) {
pair.second->value = pair.second->min;
}
// Gradient descent step size.
constexpr double kDescentStep = 0.1L;
// Optimization is stopped once the `OutputTime` improvement is smaller than
// this value.
@@ -1454,53 +1550,34 @@ void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
double output_time = 0;
double new_output_time;
double new_value;
for (int i = 0; i < kMaxIterations; ++i) {
// When the CPU budget is reached, the parallelism parameter values are fixed
// and we only increase the buffer size parameters.
bool cpu_budget_reached = false;
for (int i = 0;
i < kMaxIterations &&
!ShouldStop(cpu_budget, ram_budget, parameters, parallelism_parameters,
buffer_size_parameters, snapshot, &cpu_budget_reached);
++i) {
absl::flat_hash_map<string, double> gradients;
new_output_time = OutputTime(snapshot, model_input_time, &gradients);
int64 model_parallelism = 0;
for (auto& pair : essential_parameters) {
model_parallelism += std::round(pair.second->value);
}
// We terminate once the improvement of the output latency is too small or
// the essential transformations' parallelism reaches the CPU budget or the
// worst-case total buffer size exceeds the memory budget.
if (std::abs(output_time - new_output_time) < kOptimizationPrecision ||
model_parallelism > cpu_budget ||
TotalMaximumBufferedBytes(snapshot) > ram_budget) {
if (std::abs(output_time - new_output_time) < kOptimizationPrecision) {
break;
}
double max_abs_derivative = 1.0;
for (auto& pair : parameters) {
if (pair.second->value != pair.second->max) {
max_abs_derivative =
std::max(max_abs_derivative, std::abs(gradients[pair.first]));
}
}
for (auto& pair : parameters) {
new_value = pair.second->value -
kDescentStep * gradients[pair.first] / max_abs_derivative;
// Projection on a feasible interval.
if (new_value > pair.second->max) {
pair.second->value = pair.second->max;
} else if (new_value < pair.second->min) {
pair.second->value = pair.second->min;
} else {
pair.second->value = new_value;
}
}
UpdateParameterValues(
gradients, &(cpu_budget_reached ? buffer_size_parameters : parameters));
output_time = new_output_time;
}
VLOG(2) << "Number of tunable parameters: " << parameters.size();
for (auto& pair : parameters) {
pair.second->value = std::round(pair.second->value);
auto& parameter = pair.second;
VLOG(2) << "Setting tunable parameter " << pair.first << " to "
<< parameter->value;
mutex_lock l(*parameter->state->mu);
parameter->state->value = parameter->value;
parameter->state->cond_var->notify_all();
}
UpdateStateValues(&parameters);
}
void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
@@ -1517,6 +1594,7 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
// improvement is greater than this constant.
constexpr double kBufferSizeMinDelta = 1.0L;
// Initialize the parameter values to minimal before tuning.
for (auto& pair : parameters) {
pair.second->value = pair.second->min;
}
@@ -1560,15 +1638,7 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
}
best_parameter->value++;
}
VLOG(2) << "Number of tunable parameters: " << parameters.size();
for (auto& pair : parameters) {
auto& parameter = pair.second;
VLOG(2) << "Setting tunable parameter " << pair.first << " to "
<< parameter->value;
mutex_lock l(*parameter->state->mu);
parameter->state->value = parameter->value;
parameter->state->cond_var->notify_all();
}
UpdateStateValues(&parameters);
}
double Model::OutputTime(std::shared_ptr<Node> node, double model_input_time,


@@ -644,16 +644,17 @@ class Model {
absl::flat_hash_map<string, std::shared_ptr<Parameter>>
CollectTunableParameters(std::shared_ptr<Node> node);
// Collects "essential" parallelism parameters of transformations in the tree
// rooted in the given node. Which parameters are essential is determined by
// comparing the processing time spent in the corresponding transformation
// relative to other transformations. The collected parameters are returned
// as a mapping from a (unique) node name to a parallelism parameter.
absl::flat_hash_map<string, std::shared_ptr<Parameter>>
CollectEssentialParallelism(
std::shared_ptr<Node> node,
// Determines whether we should stop the gradient descent optimization
// iterations based on the number of increasable parameters, the CPU budget,
// the RAM budget, and current resource usage.
bool ShouldStop(
int64 cpu_budget, int64 ram_budget,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>& parameters,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>&
parameters);
parallelism_parameters,
const absl::flat_hash_map<string, std::shared_ptr<Parameter>>&
buffer_size_parameters,
std::shared_ptr<Node> snapshot, bool* cpu_budget_reached);
// This optimization algorithm starts by setting all tunable parallelism
// parameters to the minimum value. It then repeatedly identifies the