Introducing functionality to save and load an autotuning model and its optimization parameters to/from a file. Model saving happens in a background thread (separate from the optimization thread) after each optimization run. Saving is enabled by setting the TF_DATA_AUTOTUNE_DEBUG_DIR environment variable to the directory where the models should be written.

PiperOrigin-RevId: 358019843
Change-Id: I219e6d960b7177759d5d54ecf0a331ae4c0d08b2
Ihor Indyk 2021-02-17 13:27:23 -08:00 committed by TensorFlower Gardener
parent 1178262a2a
commit 5cdb9a43aa
4 changed files with 225 additions and 46 deletions
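A rough sketch of how the new hooks fit together (the helper name InspectSavedAutotuneModel and the example path are made up for illustration; Model::Load, the TF_DATA_AUTOTUNE_DEBUG_DIR variable, and the autotune_model_<hash> file naming come from the diff below): exporting TF_DATA_AUTOTUNE_DEBUG_DIR=/some/dir before running the input pipeline makes each optimization run enqueue a snapshot that the background save thread writes to that directory, and one of the written files can then be restored like this:

#include "tensorflow/core/framework/model.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/logging.h"

namespace tensorflow {
namespace data {
namespace model {

// Hypothetical helper: restores a model snapshot together with the
// optimization parameters that were in effect when SaveLoop() wrote it.
Status InspectSavedAutotuneModel(const string& fname) {
  std::unique_ptr<Model> restored_model;
  Model::OptimizationParams params;
  TF_RETURN_IF_ERROR(Model::Load(fname, &restored_model, &params));
  LOG(INFO) << "algorithm=" << params.algorithm()
            << " cpu_budget=" << params.cpu_budget()
            << " ram_budget=" << params.ram_budget()
            << " model_input_time=" << params.model_input_time();
  return Status::OK();
}

}  // namespace model
}  // namespace data
}  // namespace tensorflow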

tensorflow/core/framework/model.cc

@ -1638,13 +1638,38 @@ void Model::FlushMetrics() {
void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget,
int64 ram_budget, double model_input_time) {
std::shared_ptr<Node> snapshot;
{
tf_shared_lock lock(mu_);
snapshot = output_->Snapshot();
}
OptimizationParams optimization_params;
optimization_params.set_algorithm(algorithm);
optimization_params.set_cpu_budget(cpu_budget);
optimization_params.set_ram_budget(ram_budget);
optimization_params.set_model_input_time(model_input_time);
switch (algorithm) {
case AutotuneAlgorithm::HILL_CLIMB:
OptimizeHillClimb(cpu_budget, ram_budget, model_input_time);
OptimizeHillClimb(snapshot, optimization_params);
break;
case AutotuneAlgorithm::GRADIENT_DESCENT:
OptimizeGradientDescent(cpu_budget, ram_budget, model_input_time);
OptimizeGradientDescent(snapshot, optimization_params);
break;
default:
VLOG(2) << "Autotuning algorithm was not recognized. Aborting "
"optimization.";
return;
}
if (!save_dir_.empty()) {
mutex_lock lock(mu_);
Status status = EnsureSaveLoopThreadStarted();
if (status.ok() && save_buffer_.size() < kMaxNumBufferedOptimizeArgs) {
save_buffer_.push_back(std::make_pair(snapshot, optimization_params));
save_cond_var_.notify_all();
} else if (save_buffer_.size() >= kMaxNumBufferedOptimizeArgs) {
VLOG(3) << "Saved snapshots buffer is full. Current snapshot and "
"optimization parameters will not be saved.";
}
}
}
@ -1707,7 +1732,7 @@ Status Model::OptimizeLoop(AutotuneAlgorithm algorithm, int64 cpu_budget,
cancellation_manager,
[this]() {
mutex_lock l(mu_);
cond_var_.notify_all();
optimize_cond_var_.notify_all();
},
/*deregister_fn=*/&unused));
@ -1721,7 +1746,7 @@ Status Model::OptimizeLoop(AutotuneAlgorithm algorithm, int64 cpu_budget,
auto wait_ms =
last_optimization_ms + optimization_period_ms_ - current_time_ms;
VLOG(2) << "Waiting for " << wait_ms << " ms.";
cond_var_.wait_for(l, std::chrono::milliseconds(wait_ms));
optimize_cond_var_.wait_for(l, std::chrono::milliseconds(wait_ms));
current_time_ms = EnvTime::NowMicros() / EnvTime::kMillisToMicros;
}
if (cancellation_manager->IsCancelled()) {
@ -1747,13 +1772,9 @@ Status Model::OptimizeLoop(AutotuneAlgorithm algorithm, int64 cpu_budget,
}
}
void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
double model_input_time) {
std::shared_ptr<Node> snapshot;
{
tf_shared_lock lock(mu_);
snapshot = output_->Snapshot();
}
void Model::OptimizeGradientDescent(
std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params) {
VLOG(2) << "Starting optimization of tunable parameters with Gradient "
"Descent.";
auto parameters = CollectTunableParameters(snapshot);
@ -1788,13 +1809,15 @@ void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
// and we only increase the buffer size parameters.
bool cpu_budget_reached = false;
for (int i = 0;
i < kMaxIterations &&
!ShouldStop(cpu_budget, ram_budget, parameters, parallelism_parameters,
buffer_size_parameters, snapshot, &cpu_budget_reached);
for (int i = 0; i < kMaxIterations &&
!ShouldStop(optimization_params.cpu_budget(),
optimization_params.ram_budget(), parameters,
parallelism_parameters, buffer_size_parameters,
snapshot, &cpu_budget_reached);
++i) {
absl::flat_hash_map<string, double> gradients;
new_output_time = OutputTime(snapshot, model_input_time, &gradients);
new_output_time = OutputTime(
snapshot, optimization_params.model_input_time(), &gradients);
// We also terminate once the improvement of the output latency is too
// small.
if (std::abs(output_time - new_output_time) < kOptimizationPrecision) {
@ -1812,13 +1835,8 @@ void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
UpdateStateValues(&parameters);
}
void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
double model_input_time) {
std::shared_ptr<Node> snapshot;
{
tf_shared_lock lock(mu_);
snapshot = output_->Snapshot();
}
void Model::OptimizeHillClimb(std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params) {
VLOG(2) << "Starting optimization of tunable parameters with Hill Climb.";
const double processing_time = TotalProcessingTime(snapshot);
auto parameters = CollectTunableParameters(snapshot);
@ -1838,7 +1856,8 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
}
while (true) {
const double output_time =
OutputTime(snapshot, model_input_time, /*gradients=*/nullptr);
OutputTime(snapshot, optimization_params.model_input_time(),
/*gradients=*/nullptr);
bool all_max = true;
for (auto& pair : parameters) {
if (pair.second->value < pair.second->max) {
@ -1846,8 +1865,10 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
break;
}
}
if (output_time < processing_time / cpu_budget || all_max ||
TotalMaximumBufferedBytes(snapshot) > ram_budget) {
if (output_time < processing_time / optimization_params.cpu_budget() ||
all_max ||
TotalMaximumBufferedBytes(snapshot) >
optimization_params.ram_budget()) {
break;
}
double best_delta = -1.0L;
@ -1858,7 +1879,8 @@ void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
}
pair.second->value++;
double new_output_time =
OutputTime(snapshot, model_input_time, /*gradients=*/nullptr);
OutputTime(snapshot, optimization_params.model_input_time(),
/*gradients=*/nullptr);
double delta = output_time - new_output_time;
if (delta > best_delta &&
(delta > kBufferSizeMinDelta || pair.second->name != kBufferSize)) {
@ -1930,6 +1952,72 @@ Status Model::FromProto(ModelProto model_proto, std::unique_ptr<Model>* model) {
return Status::OK();
}
Status Model::Save(const string& fname, std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params) {
ModelProto model_proto;
std::unique_ptr<Model> model_snapshot = std::make_unique<Model>();
{
mutex_lock lock(model_snapshot->mu_);
model_snapshot->output_ = std::move(snapshot);
model_snapshot->id_counter_ = id_counter_;
model_snapshot->collect_resource_usage_.store(collect_resource_usage_);
}
TF_RETURN_IF_ERROR(model_snapshot->ToProto(&model_proto));
OptimizationParams* saved_optimization_params =
model_proto.mutable_optimization_params();
*saved_optimization_params = optimization_params;
return WriteBinaryProto(Env::Default(), fname, model_proto);
}
Status Model::Load(const string& fname, std::unique_ptr<Model>* model,
OptimizationParams* optimization_params) {
ModelProto model_proto;
TF_RETURN_IF_ERROR(ReadBinaryProto(Env::Default(), fname, &model_proto));
TF_RETURN_IF_ERROR(FromProto(model_proto, model));
const OptimizationParams restored_optimization_params =
model_proto.optimization_params();
*optimization_params = restored_optimization_params;
return Status::OK();
}
Status Model::EnsureSaveLoopThreadStarted() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (!save_thread_) {
save_thread_ = absl::WrapUnique(
Env::Default()->StartThread({}, "tf_data_model_save", [this]() {
Status status = SaveLoop();
if (!status.ok()) {
VLOG(2) << "Model save loop failed: " << status.ToString();
}
}));
}
return Status::OK();
}
Status Model::SaveLoop() {
TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(save_dir_));
while (true) {
std::pair<std::shared_ptr<Node>, OptimizationParams> to_save;
{
mutex_lock l(mu_);
while (!save_thread_cancelled_ && save_buffer_.empty()) {
save_cond_var_.wait(l);
}
if (save_thread_cancelled_) {
return Status::OK();
}
to_save = save_buffer_.front();
save_buffer_.pop_front();
}
string model_name =
absl::StrCat("autotune_model_",
Hash64Combine(static_cast<uint64>(EnvTime::NowMicros()),
reinterpret_cast<uint64>(this)));
string fname = io::JoinPath(save_dir_, model_name);
TF_RETURN_IF_ERROR(Save(fname, to_save.first, to_save.second));
VLOG(2) << "Model was saved as " << fname;
}
}
} // namespace model
} // namespace data
} // namespace tensorflow

tensorflow/core/framework/model.h

@ -35,6 +35,7 @@ limitations under the License.
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/path.h"
namespace tensorflow {
namespace data {
@ -48,11 +49,6 @@ constexpr char kBufferSize[] = "buffer_size";
// A key used to identify the input time of the model.
constexpr char kModelInputTimeKey[] = "model_input_time";
enum class AutotuneAlgorithm {
HILL_CLIMB = 0,
GRADIENT_DESCENT = 1,
};
enum class TraversalOrder {
BFS = 0,
REVERSE_BFS = 1,
@ -641,10 +637,24 @@ std::shared_ptr<Node> MakeUnknownNode(Node::Args args);
// implementation of `DatasetBase` and `DatasetBaseIterator` respectively.
class Model {
public:
using OptimizationParams = ModelProto::OptimizationParams;
// Creates a new model.
Model()
: collect_resource_usage_(false),
optimization_period_ms_(kOptimizationPeriodMinMs) {}
optimization_period_ms_(kOptimizationPeriodMinMs) {
const char* save_dir = std::getenv("TF_DATA_AUTOTUNE_DEBUG_DIR");
if (save_dir) {
save_dir_ = string(save_dir);
}
}
~Model() {
if (!save_dir_.empty()) {
save_thread_cancelled_ = true;
save_cond_var_.notify_all();
}
}
// Indicates whether to collect resource usage.
bool collect_resource_usage() const { return collect_resource_usage_; }
@ -664,7 +674,7 @@ class Model {
// autotuning optimization.
//
// To terminate the execution of the optimization loop, the caller needs to
// to invoke `cancellation_mgr->StartCancel()`.
// invoke `cancellation_mgr->StartCancel()`.
Status OptimizeLoop(AutotuneAlgorithm algorithm, int64 cpu_budget,
int64 ram_budget, CancellationManager* cancellation_mgr);
@ -683,11 +693,24 @@ class Model {
static Status FromProto(ModelProto model_proto,
std::unique_ptr<Model>* model);
// Saves this model with a given snapshot and its optimization parameters to a
// file. Note that the file directory must already exist.
Status Save(const string& fname, std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params);
// Loads a model and its optimization parameters from a file with the given
// name.
static Status Load(const string& fname, std::unique_ptr<Model>* model,
OptimizationParams* optimization_params);
private:
static constexpr int64 kOptimizationPeriodMinMs = 10;
static constexpr int64 kOptimizationPeriodMaxMs =
60 * EnvTime::kSecondsToMillis;
// Maximum number of optimization snapshots kept in a buffer for saving.
static constexpr int64 kMaxNumBufferedOptimizeArgs = 100;
// Collects tunable parameters in the tree rooted in the given node, returning
// a mapping from a (unique) node name to a tunable parameter.
absl::flat_hash_map<string, std::shared_ptr<Parameter>>
@ -702,8 +725,8 @@ class Model {
// This process is repeated until all parameters reach their maximum values or
// the projected output time is less than or equal to the processing time
// needed to produce an element divided by CPU budget.
void OptimizeHillClimb(int64 cpu_budget, int64 ram_budget,
double model_input_time);
void OptimizeHillClimb(std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params);
// This optimization algorithm starts by setting all tunable parallelism
// parameters to the minimum value. It then improves current parameters by
@ -712,8 +735,8 @@ class Model {
// repeated until either the output time improvement is smaller than threshold
// value or the output time is less than the processing time needed to produce
// an element divided by CPU budget.
void OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget,
double model_input_time);
void OptimizeGradientDescent(std::shared_ptr<Node> snapshot,
const OptimizationParams& optimization_params);
// Collects the output time and if `gradients` is not `nullptr`, the output
// time gradient w.r.t. tunable parameters of the subtree rooted in the given
@ -746,12 +769,21 @@ class Model {
// buffers were full.
double TotalMaximumBufferedBytes(std::shared_ptr<Node> node);
// Starts a model saving thread if it hasn't started yet.
Status EnsureSaveLoopThreadStarted();
// Periodically saves the state of optimization that is kept in
// `save_buffer_`.
//
// The saving loop is terminated when the model is destroyed.
Status SaveLoop();
// Used for coordination between different input pipeline threads. Exclusive
// access is required only when adding or removing nodes. Concurrent access to
// existing nodes is protected by a node mutex.
mutex mu_;
// Used for coordinating the optimization loop and model modifications.
condition_variable cond_var_;
condition_variable optimize_cond_var_;
int64 id_counter_ TF_GUARDED_BY(mu_) = 1;
std::shared_ptr<Node> output_ TF_GUARDED_BY(mu_);
@ -766,6 +798,25 @@ class Model {
// Determines the time the optimization loop should wait between
// running optimizations.
int64 optimization_period_ms_ TF_GUARDED_BY(mu_);
// Thread that runs the model saving loop.
std::unique_ptr<Thread> save_thread_ TF_GUARDED_BY(mu_);
// Used for coordinating the saving loop and model optimization.
condition_variable save_cond_var_;
// Indicates whether the save thread is cancelled.
bool save_thread_cancelled_ = false;
// Contains path to the model saving directory if saving is enabled, empty
// otherwise.
string save_dir_;
// Contains pairs of model snapshots and optimization parameters to be saved
// if model saving is enabled, empty otherwise. Buffer elements are pushed by
// `OptimizeLoop` and popped by `SaveLoop`.
std::deque<std::pair<std::shared_ptr<Node>, OptimizationParams>> save_buffer_
TF_GUARDED_BY(mu_);
};
} // namespace model

tensorflow/core/framework/model.proto

@ -14,6 +14,12 @@ enum NodeClass {
UNKNOWN_RATIO = 5;
}
// Algorithm used for model autotuning optimization.
enum AutotuneAlgorithm {
HILL_CLIMB = 0;
GRADIENT_DESCENT = 1;
}
// Protocol buffer representing the data used by the autotuning modeling
// framework.
message ModelProto {
@ -103,4 +109,22 @@ message ModelProto {
// Indicates whether the modeling framework should collect resource usage,
// e.g. CPU, memory.
bool collect_resource_usage = 3;
// Contains parameters of the model autotuning optimization.
message OptimizationParams {
// Algorithm used for autotuning optimization.
AutotuneAlgorithm algorithm = 1;
// Number of available logical threads.
int64 cpu_budget = 2;
// Amount of available memory in bytes.
int64 ram_budget = 3;
// Time between two consecutive `GetNext` calls to the iterator represented
// by the output node.
double model_input_time = 4;
}
OptimizationParams optimization_params = 4;
}

tensorflow/core/framework/model_test.cc

@ -885,7 +885,7 @@ TEST(SnapshotTest, Model) {
}
}
TEST(SerializeModelTest, Model) {
TEST(SaveModelTest, Model) {
model::Model model;
std::shared_ptr<Node> root = model::MakeUnknownNode({0, "unknown0", nullptr});
model.AddNode([&root](model::Node::Args args) { return root; }, root->name(),
@ -941,13 +941,29 @@ TEST(SerializeModelTest, Model) {
current = input;
}
// Make ToProto->FromProto roundtrip.
ModelProto model_proto;
Status status = model.ToProto(&model_proto);
TF_ASSERT_OK(status);
// Make Save->Load roundtrip.
ModelProto::OptimizationParams optimization_params;
optimization_params.set_algorithm(AutotuneAlgorithm::GRADIENT_DESCENT);
optimization_params.set_cpu_budget(64);
optimization_params.set_ram_budget(1024);
optimization_params.set_model_input_time(43653.34534);
TF_ASSERT_OK(model.Save("/tmp/autotune_model_test",
model.output()->Snapshot(), optimization_params));
std::unique_ptr<model::Model> restored_model;
status = model::Model::FromProto(model_proto, &restored_model);
TF_ASSERT_OK(status);
ModelProto::OptimizationParams restored_optimization_params;
TF_ASSERT_OK(model.Load("/tmp/autotune_model_test", &restored_model,
&restored_optimization_params));
// Check optimization parameters.
EXPECT_EQ(optimization_params.algorithm(),
restored_optimization_params.algorithm());
EXPECT_EQ(optimization_params.cpu_budget(),
restored_optimization_params.cpu_budget());
EXPECT_EQ(optimization_params.ram_budget(),
restored_optimization_params.ram_budget());
EXPECT_EQ(optimization_params.model_input_time(),
restored_optimization_params.model_input_time());
// Check that original and restored models hold the same data.
EXPECT_EQ(model.collect_resource_usage(),