Add a priority field to the Run Handler thread pool options. The run handler thread pool now takes the priorities of requests into consideration when scheduling ops.

PiperOrigin-RevId: 299199567
Change-Id: I22b99f3bc9e708573d09c56194baaaba7948291e
Author: Chao Xie (2020-03-05 14:49:02 -08:00); committed by TensorFlower Gardener
Parent: 94c04fc77e
Commit: e7766eba87
8 files changed, 223 insertions(+), 22 deletions(-)
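
For context, a client opts into priority scheduling through RunOptions. A minimal sketch based on the test code in this commit (the session, inputs, and output names are assumed to already exist; error handling omitted):

    RunOptions run_options;
    run_options.mutable_experimental()->set_use_run_handler_pool(true);
    // A larger priority number means a higher scheduling priority.
    run_options.mutable_experimental()
        ->mutable_run_handler_pool_options()
        ->set_priority(2);
    Status s = session->Run(run_options, inputs, output_names, target_nodes,
                            &outputs, nullptr);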


@@ -593,7 +593,9 @@ Status DirectSession::RunInternal(
   if (ShouldUseRunHandlerPool(run_options) &&
       run_options.experimental().use_run_handler_pool()) {
     VLOG(1) << "Using RunHandler to scheduler inter-op closures.";
-    handler = GetOrCreateRunHandlerPool(options_)->Get(step_id, call_timeout);
+    handler = GetOrCreateRunHandlerPool(options_)->Get(
+        step_id, call_timeout,
+        run_options.experimental().run_handler_pool_options());
     if (!handler) {
       return errors::DeadlineExceeded(
           "Could not obtain RunHandler for request after waiting for ",


@@ -19,6 +19,7 @@ limitations under the License.
 #include <algorithm>
 #include <cmath>
+#include <list>
 #include <memory>
 
 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
@@ -841,12 +842,15 @@ class RunHandler::Impl {
   void ScheduleInterOpClosure(std::function<void()> fn);
   void ScheduleIntraOpClosure(std::function<void()> fn);
-  void Reset(int64 step_id);
+  void Reset(int64 step_id,
+             const RunOptions::Experimental::RunHandlerPoolOptions& options);
   RunHandlerPool::Impl* pool_impl() { return pool_impl_; }
   ThreadWorkSource* tws() { return &tws_; }
+  int64 priority() { return options_.priority(); }
+
  private:
   class ThreadPoolInterfaceWrapper : public thread::ThreadPoolInterface {
    public:
@@ -866,6 +870,7 @@ class RunHandler::Impl {
   int64 step_id_;
   std::unique_ptr<thread::ThreadPoolInterface> thread_pool_interface_;
   ThreadWorkSource tws_;
+  RunOptions::Experimental::RunHandlerPoolOptions options_;
 };
 
 // Contains shared state across all run handlers present in the pool. Also
@@ -891,7 +896,6 @@ class RunHandlerPool::Impl {
             std::vector<double>({1}))) {
     VLOG(1) << "Creating a RunHandlerPool with max handlers: " << max_handlers_;
     free_handlers_.reserve(max_handlers_);
-    sorted_active_handlers_.reserve(max_handlers_);
     handlers_.reserve(max_handlers_);
     for (int i = 0; i < max_handlers_; ++i) {
       handlers_.emplace_back(new RunHandler::Impl(this));
@@ -928,7 +932,9 @@ class RunHandlerPool::Impl {
     return !free_handlers_.empty();
   }
 
-  std::unique_ptr<RunHandler> Get(int64 step_id, int64 timeout_in_ms)
+  std::unique_ptr<RunHandler> Get(
+      int64 step_id, int64 timeout_in_ms,
+      const RunOptions::Experimental::RunHandlerPoolOptions& options)
       TF_LOCKS_EXCLUDED(mu_) {
     thread_local std::unique_ptr<Eigen::MaxSizeVector<ThreadWorkSource*>>
         thread_work_sources =
@@ -960,17 +966,24 @@ class RunHandlerPool::Impl {
       // Remove the last entry from free_handlers_ and add to the end of
       // sorted_active_handlers_.
       handler_impl = free_handlers_.back();
-      handler_impl->Reset(step_id);
-      // Sortedness isn't violated if we simply add at the end of the list,
-      // since handlers are expected to be obtained in increasing order of time.
-      sorted_active_handlers_.push_back(handler_impl);
-      DCHECK_LE(sorted_active_handlers_.size(), max_handlers_);
+      handler_impl->Reset(step_id, options);
       free_handlers_.pop_back();
 
-      num_active_requests = sorted_active_handlers_.size();
+      num_active_requests = sorted_active_handlers_.size() + 1;
       thread_work_sources->resize(num_active_requests);
+      int priority = options.priority();
+      auto it = sorted_active_handlers_.cbegin();
+      bool new_handler_inserted = false;
       for (int i = 0; i < num_active_requests; ++i) {
-        (*thread_work_sources)[i] = sorted_active_handlers_[i]->tws();
+        if (!new_handler_inserted && (it == sorted_active_handlers_.cend() ||
+                                      priority > (*it)->priority())) {
+          sorted_active_handlers_.insert(it, handler_impl);
+          new_handler_inserted = true;
+          // Point to the newly added handler.
+          --it;
+        }
+        (*thread_work_sources)[i] = (*it)->tws();
+        ++it;
       }
       version = ++version_;
     }
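
The loop above does two things at once: it splices the new handler into sorted_active_handlers_, which is kept in descending priority order (the handler goes before the first entry with a strictly lower priority, so equal priorities stay in FIFO order), and it rebuilds thread_work_sources in that same order. A standalone sketch of the insertion idea, illustrative only and not the TensorFlow code:

    #include <list>

    // Keeps 'sorted' in descending order; an equal priority is placed
    // after existing entries, preserving FIFO order among peers.
    void InsertByPriority(std::list<int>& sorted, int priority) {
      auto it = sorted.cbegin();
      while (it != sorted.cend() && *it >= priority) ++it;
      sorted.insert(it, priority);
    }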
@@ -1009,6 +1022,16 @@ class RunHandlerPool::Impl {
     // requests will trigger recomputation.
   }
 
+  std::vector<int64> GetActiveHandlerPrioritiesForTesting()
+      TF_LOCKS_EXCLUDED(mu_) {
+    mutex_lock l(mu_);
+    std::vector<int64> ret;
+    for (const auto& handler_impl : sorted_active_handlers_) {
+      ret.push_back(handler_impl->priority());
+    }
+    return ret;
+  }
+
  private:
   void RecomputePoolStats(
       int num_active_requests, uint64 version,
@@ -1029,9 +1052,13 @@ class RunHandlerPool::Impl {
   // Thread compatible part used only by lock under RunHandlerPool.
   // Handlers are sorted by start time.
   // TODO(azaks): sort by the remaining latency budget.
-  std::vector<RunHandler::Impl*> sorted_active_handlers_ TF_GUARDED_BY(mu_);
+  // TODO(chaox): Consider other data structures for maintaining the sorted
+  // active handlers if the searching overhead (currently O(n)) becomes the
+  // bottleneck.
+  std::list<RunHandler::Impl*> sorted_active_handlers_ TF_GUARDED_BY(mu_);
   std::vector<RunHandler::Impl*> free_handlers_ TF_GUARDED_BY(mu_);
   std::vector<std::unique_ptr<RunHandler::Impl>> handlers_ TF_GUARDED_BY(mu_);
+
   // Histogram of elapsed runtime of every handler (in ms).
   histogram::Histogram time_hist_ TF_GUARDED_BY(mu_);
@@ -1092,16 +1119,17 @@ void RunHandlerPool::Impl::LogInfo() {
     uint64 now = tensorflow::Env::Default()->NowMicros();
     string times_str = "";
     string ids_str = "";
+    auto it = sorted_active_handlers_.cbegin();
     for (int i = 0; i < num_active_requests; ++i) {
       if (i > 0) {
         times_str += " ";
         ids_str += " ";
       }
-      times_str += strings::StrCat(
-          (now - sorted_active_handlers_[i]->start_time_us()) / 1000.0, " ms.");
-      ids_str +=
-          strings::StrCat(sorted_active_handlers_[i]->tws()->GetTracemeId());
+      times_str +=
+          strings::StrCat((now - (*it)->start_time_us()) / 1000.0, " ms.");
+      ids_str += strings::StrCat((*it)->tws()->GetTracemeId());
+      ++it;
     }
     VLOG(1) << "Elapsed times are: " << times_str;
     VLOG(1) << "Step ids are: " << ids_str;
@@ -1127,7 +1155,7 @@ void RunHandler::Impl::ThreadPoolInterfaceWrapper::Schedule(
 RunHandler::Impl::Impl(RunHandlerPool::Impl* pool_impl)
     : pool_impl_(pool_impl) {
   thread_pool_interface_.reset(new ThreadPoolInterfaceWrapper(this));
-  Reset(0);
+  Reset(0, RunOptions::Experimental::RunHandlerPoolOptions());
 }
 
 void RunHandler::Impl::ScheduleInterOpClosure(std::function<void()> fn) {
@@ -1142,9 +1170,12 @@ void RunHandler::Impl::ScheduleIntraOpClosure(std::function<void()> fn) {
                                         std::move(fn));
 }
 
-void RunHandler::Impl::Reset(int64 step_id) {
+void RunHandler::Impl::Reset(
+    int64 step_id,
+    const RunOptions::Experimental::RunHandlerPoolOptions& options) {
   start_time_us_ = tensorflow::Env::Default()->NowMicros();
   step_id_ = step_id;
+  options_ = options;
   tws_.SetTracemeId(step_id);
 }
@@ -1157,9 +1188,15 @@ RunHandlerPool::RunHandlerPool(int num_inter_op_threads,
 
 RunHandlerPool::~RunHandlerPool() {}
 
-std::unique_ptr<RunHandler> RunHandlerPool::Get(int64 step_id,
-                                                int64 timeout_in_ms) {
-  return impl_->Get(step_id, timeout_in_ms);
+std::unique_ptr<RunHandler> RunHandlerPool::Get(
+    int64 step_id, int64 timeout_in_ms,
+    const RunOptions::Experimental::RunHandlerPoolOptions& options) {
+  return impl_->Get(step_id, timeout_in_ms, options);
+}
+
+std::vector<int64> RunHandlerPool::GetActiveHandlerPrioritiesForTesting()
+    const {
+  return impl_->GetActiveHandlerPrioritiesForTesting();
 }
 
 RunHandler::RunHandler(Impl* impl) : impl_(impl) {}


@@ -62,7 +62,14 @@ class RunHandlerPool {
   // unique_ptr is destroyed.
   //
   // Will block unless there is an inactive handler.
-  std::unique_ptr<RunHandler> Get(int64 step_id = 0, int64 timeout_in_ms = 0);
+  std::unique_ptr<RunHandler> Get(
+      int64 step_id = 0, int64 timeout_in_ms = 0,
+      const RunOptions::Experimental::RunHandlerPoolOptions& options =
+          RunOptions::Experimental::RunHandlerPoolOptions());
+
+  // Gets the priorities of the active handlers, in the same order as the
+  // active handler list.
+  std::vector<int64> GetActiveHandlerPrioritiesForTesting() const;
 
  private:
   class Impl;
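
The new overload can also be called directly on a pool, as the tests below do. A minimal sketch (the pool and the step id are assumed):

    RunOptions::Experimental::RunHandlerPoolOptions options;
    options.set_priority(3);
    // With a zero timeout this blocks until a handler becomes free;
    // a nonzero timeout_in_ms bounds the wait instead.
    auto handler = pool->Get(/*step_id=*/42, /*timeout_in_ms=*/0, options);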


@@ -78,6 +78,41 @@ TEST(RunHandlerUtilTest, TestBasicScheduling) {
   counter.Wait();
 }
 
+TEST(RunHandlerUtilTest, PrioritySchedulingTest) {
+  int num_threads = 2;
+  std::unique_ptr<RunHandlerPool> pool(
+      new RunHandlerPool(num_threads, num_threads));
+
+  RunOptions::Experimental::RunHandlerPoolOptions options =
+      RunOptions::Experimental::RunHandlerPoolOptions();
+  options.set_priority(2);
+  auto handler1 = pool->Get(/*step_id=*/1, /*timeout_in_ms=*/0, options);
+  options.set_priority(1);
+  auto handler2 = pool->Get(/*step_id=*/2, /*timeout_in_ms=*/0, options);
+  options.set_priority(3);
+  auto handler3 = pool->Get(/*step_id=*/3, /*timeout_in_ms=*/0, options);
+
+  // The active requests should be ordered by priorities.
+  std::vector<int64> sorted_active_list =
+      pool->GetActiveHandlerPrioritiesForTesting();
+  EXPECT_EQ(sorted_active_list.size(), 3);
+  EXPECT_EQ(sorted_active_list[0], 3);
+  EXPECT_EQ(sorted_active_list[1], 2);
+  EXPECT_EQ(sorted_active_list[2], 1);
+
+  handler1.reset();
+  options.set_priority(5);
+  auto handler4 = pool->Get(/*step_id=*/4, /*timeout_in_ms=*/0, options);
+  options.set_priority(4);
+  auto handler5 = pool->Get(/*step_id=*/5, /*timeout_in_ms=*/0, options);
+  sorted_active_list = pool->GetActiveHandlerPrioritiesForTesting();
+  EXPECT_EQ(sorted_active_list.size(), 4);
+  EXPECT_EQ(sorted_active_list[0], 5);
+  EXPECT_EQ(sorted_active_list[1], 4);
+  EXPECT_EQ(sorted_active_list[2], 3);
+  EXPECT_EQ(sorted_active_list[3], 1);
+}
+
 SessionOptions DefaultSessionOptions() {
   SessionOptions options;
   (*options.config.mutable_device_count())["CPU"] = 2;
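
To trace the second set of expectations in the test above: releasing handler1 removes priority 2 from the active list, leaving [3, 1]; the subsequent requests with priorities 5 and 4 are then inserted ahead of 3, giving [5, 4, 3, 1].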
@@ -205,6 +240,75 @@ TEST_F(RunHandlerTest, TestConcurrencyUseRunHandlerPool) {
   delete tp;
 }
 
+TEST_F(RunHandlerTest, UseRunHandlerPoolEnableSubPoolWithPriority) {
+  Initialize({3, 2, -1, 0});
+  auto session = CreateSession();
+  ASSERT_TRUE(session != nullptr);
+  EXPECT_EQ(::tensorflow::Status::OK(), session->Create(def_));
+  std::vector<std::pair<string, Tensor>> inputs;
+
+  // Request two targets: one fetch output and one non-fetched output.
+  std::vector<string> output_names = {y_ + ":0"};
+  std::vector<string> target_nodes = {y_neg_};
+  std::vector<Tensor> outputs;
+
+  // Prepare RunOptions and RunMetadata.
+  RunOptions run_options;
+  run_options.mutable_experimental()->set_use_run_handler_pool(true);
+  run_options.mutable_experimental()
+      ->mutable_run_handler_pool_options()
+      ->set_priority(1);
+
+  Status s = session->Run(run_options, inputs, output_names, target_nodes,
+                          &outputs, nullptr);
+  EXPECT_EQ(::tensorflow::Status::OK(), s);
+
+  ASSERT_EQ(1, outputs.size());
+  // The first output should be initialized and have the correct output.
+  auto mat = outputs[0].matrix<float>();
+  ASSERT_TRUE(outputs[0].IsInitialized());
+  EXPECT_FLOAT_EQ(5.0, mat(0, 0));
+}
+
+TEST_F(RunHandlerTest, TestConcurrencyUseRunHandlerPoolWithPriority) {
+  Initialize({1, 2, 3, 4});
+  auto session = CreateSession();
+  ASSERT_TRUE(session != nullptr);
+  EXPECT_EQ(::tensorflow::Status::OK(), session->Create(def_));
+
+  // Fill in the input and ask for the output.
+  thread::ThreadPool* tp = new thread::ThreadPool(Env::Default(), "test", 4);
+
+  // Run the graph 1000 times in 4 different threads concurrently.
+  std::vector<string> output_names = {y_ + ":0"};
+  auto fn = [&session, output_names]() {
+    for (int i = 0; i < 1000; ++i) {
+      RunOptions run_options;
+      run_options.mutable_experimental()->set_use_run_handler_pool(true);
+      run_options.mutable_experimental()
+          ->mutable_run_handler_pool_options()
+          ->set_priority(i % 4);
+      std::vector<std::pair<string, Tensor>> inputs;
+      std::vector<Tensor> outputs;
+      // Run the graph.
+      Status s = session->Run(run_options, inputs, output_names, {}, &outputs,
+                              nullptr);
+      EXPECT_EQ(::tensorflow::Status::OK(), s);
+      ASSERT_EQ(1, outputs.size());
+      auto mat = outputs[0].matrix<float>();
+      EXPECT_FLOAT_EQ(3.0, mat(0, 0));
+    }
+  };
+
+  for (int i = 0; i < 4; ++i) {
+    tp->Schedule(fn);
+  }
+
+  // Wait for the functions to finish.
+  delete tp;
+}
+
 TEST_F(RunHandlerTest, TestWaitTimeout) {
   std::unique_ptr<RunHandlerPool> pool(new RunHandlerPool(1, 1));


@@ -640,6 +640,13 @@ message RunOptions {
     // and tail) latency.
     // Consider using this option for CPU-bound workloads like inference.
     bool use_run_handler_pool = 2;
+    // Options for the run handler thread pool.
+    message RunHandlerPoolOptions {
+      // Priority of the request. The run handler thread pool will schedule ops
+      // based on the priority number; a larger number means a higher priority.
+      int64 priority = 1;
+    }
+    RunHandlerPoolOptions run_handler_pool_options = 3;
   }
 
   Experimental experimental = 8;
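
In text-proto form, a RunOptions message that opts into the pool with a priority would look roughly like this:

    experimental {
      use_run_handler_pool: true
      run_handler_pool_options {
        priority: 2
      }
    }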


@@ -0,0 +1,12 @@
+path: "tensorflow.RunOptions.Experimental.RunHandlerPoolOptions"
+tf_proto {
+  descriptor {
+    name: "RunHandlerPoolOptions"
+    field {
+      name: "priority"
+      number: 1
+      label: LABEL_OPTIONAL
+      type: TYPE_INT64
+    }
+  }
+}


@@ -14,5 +14,21 @@ tf_proto {
       label: LABEL_OPTIONAL
       type: TYPE_BOOL
     }
+    field {
+      name: "run_handler_pool_options"
+      number: 3
+      label: LABEL_OPTIONAL
+      type: TYPE_MESSAGE
+      type_name: ".tensorflow.RunOptions.Experimental.RunHandlerPoolOptions"
+    }
+    nested_type {
+      name: "RunHandlerPoolOptions"
+      field {
+        name: "priority"
+        number: 1
+        label: LABEL_OPTIONAL
+        type: TYPE_INT64
+      }
+    }
   }
 }


@@ -61,6 +61,22 @@ tf_proto {
         label: LABEL_OPTIONAL
         type: TYPE_BOOL
       }
+      field {
+        name: "run_handler_pool_options"
+        number: 3
+        label: LABEL_OPTIONAL
+        type: TYPE_MESSAGE
+        type_name: ".tensorflow.RunOptions.Experimental.RunHandlerPoolOptions"
+      }
+      nested_type {
+        name: "RunHandlerPoolOptions"
+        field {
+          name: "priority"
+          number: 1
+          label: LABEL_OPTIONAL
+          type: TYPE_INT64
+        }
+      }
     }
     enum_type {
       name: "TraceLevel"