Automated g4 rollback of changelist 172924803

PiperOrigin-RevId: 172936802
A. Unique TensorFlower 2017-10-20 14:48:27 -07:00 committed by TensorFlower Gardener
parent b2dcbca94c
commit d1e7382af7
4 changed files with 1 addition and 924 deletions

tensorflow/contrib/batching/BUILD

@@ -69,28 +69,6 @@ tf_cc_test(
],
)
cc_library(
name = "adaptive_shared_batch_scheduler",
hdrs = ["adaptive_shared_batch_scheduler.h"],
deps = [
":batch_scheduler",
"//tensorflow/contrib/batching/util:periodic_function_dynamic",
"//tensorflow/core:lib",
],
)
tf_cc_test(
name = "adaptive_shared_batch_scheduler_test",
srcs = ["adaptive_shared_batch_scheduler_test.cc"],
deps = [
":adaptive_shared_batch_scheduler",
"//tensorflow/contrib/batching/test_util:fake_clock_env",
"//tensorflow/core:lib",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
],
)
cc_library(
name = "basic_batch_scheduler",
hdrs = ["basic_batch_scheduler.h"],

tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h

@@ -1,463 +0,0 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
#define THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
#include <functional>
#include <memory>
#include <queue>
#include <unordered_map>
#include <vector>
#include "tensorflow/contrib/batching/batch_scheduler.h"
#include "tensorflow/contrib/batching/util/periodic_function.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
namespace tensorflow {
namespace serving {
namespace internal {
template <typename TaskType>
class ASBSBatch;
template <typename TaskType>
class ASBSQueue;
} // namespace internal
// Shared batch scheduler designed to minimize latency. The scheduler keeps
// track of a number of queues (one per model or model version) which are
// continuously enqueuing requests. The scheduler groups the requests into
// batches which it periodically sends off for processing (see
// shared_batch_scheduler.h for more details). The AdaptiveSharedBatchScheduler
// prioritizes batches by age (i.e. the batch's oldest request) irrespective of
// queue. The scheduler will process the oldest batch at an adjustable rate,
// regardless of batch size. The user can provide feedback to help set this rate
// to achieve some goal (e.g. minimize overall latency, limit CPU usage, etc.).
//
// The rate (or rather, the corresponding period) is adjusted each time a batch
// is processed, using an exponentially weighted moving average to smooth
// potentially noisy feedback:
// ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
// period *= (1 + K * ewma_feedback)
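// For example (illustrative numbers; the implementation below uses K = .001):
// with N = 10, ewma_feedback = 0 and a one-off feedback() reading of +5, the
// new ewma_feedback is (9 * 0 + 5) / 10 = 0.5, so the period grows by a
// factor of 1.0005.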
//
// Some potential use cases:
// Hardware Accelerators (GPUs & TPUs) - If some phase of batch processing
// involves serial processing by a device, from a latency perspective it is
// desirable to keep the device evenly loaded, avoiding the need to wait for
// the device to process prior batches.
// feedback = num_pending_on_device() - desired_pending.
// CPU utilization - If the batch processing is cpu dominated, you can reap
// latency gains when underutilized by increasing the processing rate, but
// back the rate off when the load increases to avoid overload.
// feedback = cpu_rate() - desired_cpu_rate.
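//
// Example usage (an illustrative sketch, not part of this API; 'MyTask' is a
// hypothetical BatchTask subclass and the callback body is elided):
//
//   std::shared_ptr<AdaptiveSharedBatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(AdaptiveSharedBatchScheduler<MyTask>::Create({}, &scheduler));
//   std::unique_ptr<BatchScheduler<MyTask>> queue;
//   TF_CHECK_OK(scheduler->AddQueue(
//       {}, [](std::unique_ptr<Batch<MyTask>> batch) { /* process batch */ },
//       &queue));
//   std::unique_ptr<MyTask> task(new MyTask(...));
//   TF_CHECK_OK(queue->Schedule(&task));  // 'task' is consumed on success.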
template <typename TaskType>
class AdaptiveSharedBatchScheduler
: public std::enable_shared_from_this<
AdaptiveSharedBatchScheduler<TaskType>> {
public:
struct Options {
// The name to use for the pool of batch threads.
string thread_pool_name = {"batch_threads"};
// Number of batch processing threads; equivalently the maximum number of
// concurrently running batches.
int64 num_batch_threads = port::NumSchedulableCPUs();
// The environment to use (typically only overridden by test code).
Env* env = Env::Default();
// Initial batch scheduling period in microseconds. Will be altered for
// non-zero rate_feedback.
double initial_scheduling_period_micros = 500;
// Minimum batch scheduling period in microseconds. Recommend setting this
// value greater than 0, otherwise it may take a while to recover from a
// sustained time of negative scheduling_period_feedback (which may occur
// under low load).
double min_scheduling_period_micros = 100;
// Maximum batch scheduling period in microseconds.
double max_scheduling_period_micros = 10000;
// Feedback function used to modify the scheduling period each time a batch
// is scheduled. Should return values roughly O(1), with positive values
// resulting in an increased period.
std::function<double()> scheduling_period_feedback = [] { return 0.; };
// To handle potentially noisy scheduling_period_feedback, the period is
// adjusted using an exponentially weighted moving average over the previous
// feedback_smoothing_batches batches. Must be greater than 0.
int64 feedback_smoothing_batches = 10;
};
// Ownership is shared between the caller of Create() and any queues created
// via AddQueue().
static Status Create(
const Options& options,
std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>>* scheduler);
struct QueueOptions {
// Maximum size of each batch.
int max_batch_size = 1000;
// Maximum number of enqueued (i.e. non-scheduled) batches.
int max_enqueued_batches = 10;
};
using BatchProcessor = std::function<void(std::unique_ptr<Batch<TaskType>>)>;
// Adds queue (and its callback) to be managed by this scheduler.
Status AddQueue(const QueueOptions& options,
BatchProcessor process_batch_callback,
std::unique_ptr<BatchScheduler<TaskType>>* queue);
private:
// access to AddBatch, RemoveQueue, GetEnv.
friend class internal::ASBSQueue<TaskType>;
explicit AdaptiveSharedBatchScheduler(const Options& options);
// Batch scheduling function which runs every scheduling_period_ microseconds.
void ProcessOneBatch();
// Notifies scheduler of non-empty batch which is eligible for processing.
void AddBatch(internal::ASBSBatch<TaskType>*);
// Removes queue from scheduler.
void RemoveQueue(const internal::ASBSQueue<TaskType>* queue);
Env* GetEnv() const { return options_.env; }
const Options options_;
struct BatchCompare {
bool operator()(const internal::ASBSBatch<TaskType>* a,
const internal::ASBSBatch<TaskType>* b);
};
// Collection of batches added by AddBatch, ordered by age. Owned by scheduler
// until they are released for processing.
std::priority_queue<const internal::ASBSBatch<TaskType>*,
std::vector<internal::ASBSBatch<TaskType>*>, BatchCompare>
batches_ GUARDED_BY(mu_);
// Unowned queues and callbacks added by AddQueue.
std::unordered_map<const internal::ASBSQueue<TaskType>*, BatchProcessor>
queues_and_callbacks_ GUARDED_BY(mu_);
mutex mu_;
// Responsible for running ProcessOneBatch. A PeriodicFunction is used so that
// the scheduling thread is cleanly shut down when the scheduler is destroyed.
std::unique_ptr<PeriodicFunction> scheduling_thread_;
// Responsible for running the batch processing callbacks.
std::unique_ptr<thread::ThreadPool> batch_thread_pool_;
// Time interval in microseconds between successive ProcessOneBatch calls.
double scheduling_period_;
// Exponentially weighted moving average of
// options_.scheduling_period_feedback() evaluated in each ProcessOneBatch
// call.
double ewma_feedback_ = 0;
TF_DISALLOW_COPY_AND_ASSIGN(AdaptiveSharedBatchScheduler);
};
//////////////////////////////////////////////////////////
// Implementation details follow. API users need not read.
namespace internal {
// Consolidates tasks into batches, passing them off to the
// AdaptiveSharedBatchScheduler for processing.
template <typename TaskType>
class ASBSQueue : public BatchScheduler<TaskType> {
public:
using QueueOptions =
typename AdaptiveSharedBatchScheduler<TaskType>::QueueOptions;
ASBSQueue(std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler,
const QueueOptions& options);
~ASBSQueue() override;
// Adds task to current batch. Fails if the task size is larger than the batch
// size or if the current batch is full and this queue's number of outstanding
// batches is at its maximum.
Status Schedule(std::unique_ptr<TaskType>* task) override;
// Number of tasks waiting to be scheduled.
size_t NumEnqueuedTasks() const override;
// Number of size 1 tasks which could currently be scheduled without failing.
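// For example (illustrative numbers): with max_batch_size = 10 and
// max_enqueued_batches = 10, a queue whose single enqueued batch holds 5 task
// units has capacity (10 - 1) * 10 + (10 - 5) = 95.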
size_t SchedulingCapacity() const override;
// Notifies queue that a batch is about to be scheduled; the queue should not
// place any more tasks in this batch.
void ReleaseBatch(const ASBSBatch<TaskType>* batch);
private:
std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler_;
const QueueOptions options_;
// Owned by scheduler_.
ASBSBatch<TaskType>* current_batch_ GUARDED_BY(mu_) = nullptr;
int64 num_enqueued_batches_ GUARDED_BY(mu_) = 0;
int64 num_enqueued_tasks_ GUARDED_BY(mu_) = 0;
mutable mutex mu_;
TF_DISALLOW_COPY_AND_ASSIGN(ASBSQueue);
};
// Batch which remembers when and by whom it was created.
template <typename TaskType>
class ASBSBatch : public Batch<TaskType> {
public:
ASBSBatch(ASBSQueue<TaskType>* queue, int64 creation_time_micros)
: queue_(queue), creation_time_micros_(creation_time_micros) {}
~ASBSBatch() override {}
ASBSQueue<TaskType>* queue() const { return queue_; }
int64 creation_time_micros() const { return creation_time_micros_; }
private:
ASBSQueue<TaskType>* queue_;
const int64 creation_time_micros_;
TF_DISALLOW_COPY_AND_ASSIGN(ASBSBatch);
};
} // namespace internal
// ---------------- AdaptiveSharedBatchScheduler ----------------
template <typename TaskType>
Status AdaptiveSharedBatchScheduler<TaskType>::Create(
const Options& options,
std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>>* scheduler) {
if (options.num_batch_threads < 1) {
return errors::InvalidArgument("num_batch_threads must be positive; was ",
options.num_batch_threads);
}
if (options.min_scheduling_period_micros < 0) {
return errors::InvalidArgument(
"min_scheduling_period_micros must be >= 0; was ",
options.min_scheduling_period_micros);
}
if (options.min_scheduling_period_micros >
options.initial_scheduling_period_micros) {
return errors::InvalidArgument(
"initial_scheduling_period_micros (",
options.initial_scheduling_period_micros,
") must be >= min_scheduling_period_micros (",
options.min_scheduling_period_micros, ")");
}
if (options.initial_scheduling_period_micros >
options.max_scheduling_period_micros) {
return errors::InvalidArgument(
"initial_scheduling_period_micros (",
options.initial_scheduling_period_micros,
") must be <= max_scheduling_period_micros (",
options.max_scheduling_period_micros, ")");
}
if (options.feedback_smoothing_batches < 1) {
return errors::InvalidArgument(
"feedback_smoothing_batches must be positive; was ",
options.feedback_smoothing_batches);
}
scheduler->reset(new AdaptiveSharedBatchScheduler<TaskType>(options));
return Status::OK();
}
template <typename TaskType>
AdaptiveSharedBatchScheduler<TaskType>::AdaptiveSharedBatchScheduler(
const Options& options)
: options_(options),
scheduling_period_(options.initial_scheduling_period_micros) {
PeriodicFunction::Options opts;
opts.thread_name_prefix = "scheduling_thread";
opts.env = GetEnv();
scheduling_thread_.reset(
new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
batch_thread_pool_.reset(new thread::ThreadPool(
GetEnv(), options.thread_pool_name, options.num_batch_threads));
}
template <typename TaskType>
Status AdaptiveSharedBatchScheduler<TaskType>::AddQueue(
const QueueOptions& options, BatchProcessor process_batch_callback,
std::unique_ptr<BatchScheduler<TaskType>>* queue) {
if (options.max_batch_size <= 0) {
return errors::InvalidArgument("max_batch_size must be positive; was ",
options.max_batch_size);
}
if (options.max_enqueued_batches <= 0) {
return errors::InvalidArgument(
"max_enqueued_batches must be positive; was ",
options.max_enqueued_batches);
}
internal::ASBSQueue<TaskType>* asbs_queue_raw;
queue->reset(asbs_queue_raw = new internal::ASBSQueue<TaskType>(
this->shared_from_this(), options));
mutex_lock l(mu_);
queues_and_callbacks_[asbs_queue_raw] = process_batch_callback;
return Status::OK();
}
template <typename TaskType>
void AdaptiveSharedBatchScheduler<TaskType>::AddBatch(
internal::ASBSBatch<TaskType>* batch) {
mutex_lock l(mu_);
batches_.push(batch);
}
template <typename TaskType>
void AdaptiveSharedBatchScheduler<TaskType>::RemoveQueue(
const internal::ASBSQueue<TaskType>* queue) {
mutex_lock l(mu_);
queues_and_callbacks_.erase(queue);
}
template <typename TaskType>
void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
static const double kFeedbackMultiplier = .001;
internal::ASBSBatch<TaskType>* batch = nullptr;
BatchProcessor callback;
const int64 start_time_micros = GetEnv()->NowMicros();
{
mutex_lock l(mu_);
if (!batches_.empty()) {
batch = batches_.top();
batches_.pop();
callback = queues_and_callbacks_[batch->queue()];
}
}
if (batch != nullptr) {
double feedback = options_.scheduling_period_feedback();
const int64 N = options_.feedback_smoothing_batches;
ewma_feedback_ = ((N - 1) * ewma_feedback_ + feedback) / N;
scheduling_period_ *= (1 + kFeedbackMultiplier * ewma_feedback_);
if (scheduling_period_ < options_.min_scheduling_period_micros) {
scheduling_period_ = options_.min_scheduling_period_micros;
} else if (scheduling_period_ > options_.max_scheduling_period_micros) {
scheduling_period_ = options_.max_scheduling_period_micros;
}
// Queue may destroy itself after ReleaseBatch is called.
batch->queue()->ReleaseBatch(batch);
batch_thread_pool_->Schedule([callback, batch] {
callback(std::unique_ptr<Batch<TaskType>>(batch));
});
}
const int64 sleep_time =
scheduling_period_ - (GetEnv()->NowMicros() - start_time_micros);
if (sleep_time > 0) {
GetEnv()->SleepForMicroseconds(sleep_time);
}
}
template <typename TaskType>
bool AdaptiveSharedBatchScheduler<TaskType>::BatchCompare::operator()(
const internal::ASBSBatch<TaskType>* a,
const internal::ASBSBatch<TaskType>* b) {
return a->creation_time_micros() > b->creation_time_micros();
}
// ---------------- ASBSQueue ----------------
namespace internal {
template <typename TaskType>
ASBSQueue<TaskType>::ASBSQueue(
std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler,
const QueueOptions& options)
: scheduler_(scheduler), options_(options) {}
template <typename TaskType>
ASBSQueue<TaskType>::~ASBSQueue() {
// Wait until last batch has been scheduled.
const int kSleepMicros = 1000;
for (;;) {
{
mutex_lock l(mu_);
if (num_enqueued_batches_ == 0) {
break;
}
}
scheduler_->GetEnv()->SleepForMicroseconds(kSleepMicros);
}
scheduler_->RemoveQueue(this);
}
template <typename TaskType>
Status ASBSQueue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
bool added_new_batch = false;
size_t size = (*task)->size();
if (size > options_.max_batch_size) {
return errors::InvalidArgument("Task size ", size,
" is larger than maximum batch size ",
options_.max_batch_size);
}
{
mutex_lock l(mu_);
// If the task doesn't fit in the current batch, close it and start a new
// batch if allowed.
if (current_batch_ &&
current_batch_->size() + size > options_.max_batch_size) {
if (num_enqueued_batches_ >= options_.max_enqueued_batches) {
return errors::Unavailable("The batch scheduling queue is full");
}
current_batch_->Close();
current_batch_ = nullptr;
}
if (!current_batch_) {
added_new_batch = true;
num_enqueued_batches_++;
current_batch_ =
new ASBSBatch<TaskType>(this, scheduler_->GetEnv()->NowMicros());
}
current_batch_->AddTask(std::move(*task));
num_enqueued_tasks_++;
}
if (added_new_batch) scheduler_->AddBatch(current_batch_);
return Status::OK();
}
template <typename TaskType>
void ASBSQueue<TaskType>::ReleaseBatch(const ASBSBatch<TaskType>* batch) {
mutex_lock l(mu_);
num_enqueued_batches_--;
num_enqueued_tasks_ -= batch->num_tasks();
if (batch == current_batch_) {
current_batch_->Close();
current_batch_ = nullptr;
}
}
template <typename TaskType>
size_t ASBSQueue<TaskType>::NumEnqueuedTasks() const {
mutex_lock l(mu_);
return num_enqueued_tasks_;
}
template <typename TaskType>
size_t ASBSQueue<TaskType>::SchedulingCapacity() const {
mutex_lock l(mu_);
const int current_batch_capacity =
current_batch_ ? options_.max_batch_size - current_batch_->size() : 0;
const int spare_batches =
options_.max_enqueued_batches - num_enqueued_batches_;
return spare_batches * options_.max_batch_size + current_batch_capacity;
}
} // namespace internal
} // namespace serving
} // namespace tensorflow
#endif // THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_

tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc

@@ -1,438 +0,0 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h"
#include "tensorflow/contrib/batching/test_util/fake_clock_env.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
namespace serving {
namespace anonymous {
class FakeTask : public BatchTask {
public:
explicit FakeTask(size_t size) : size_(size) {}
~FakeTask() override = default;
size_t size() const override { return size_; }
private:
const size_t size_;
TF_DISALLOW_COPY_AND_ASSIGN(FakeTask);
};
// Creates a FakeTask of size 'task_size', and calls 'scheduler->Schedule()' on
// that task. Returns the resulting status.
Status ScheduleTask(size_t task_size, BatchScheduler<FakeTask>* scheduler) {
std::unique_ptr<FakeTask> task(new FakeTask(task_size));
Status status = scheduler->Schedule(&task);
// Schedule() should have consumed 'task' iff it returned Status::OK.
CHECK_EQ(status.ok(), task == nullptr);
return status;
}
// Creates a thread that waits on 'start' and then advances the fake clock in
// 'env' in a loop until 'stop' is notified. Useful for allowing objects that
// use the clock to be destroyed.
std::unique_ptr<Thread> CreateFakeClockAdvancerThread(
test_util::FakeClockEnv* env, Notification* start, Notification* stop) {
return std::unique_ptr<Thread>(Env::Default()->StartThread(
{}, "FakeClockAdvancerThread", [env, start, stop] {
start->WaitForNotification();
while (!stop->HasBeenNotified()) {
env->AdvanceByMicroseconds(10);
Env::Default()->SleepForMicroseconds(10);
}
}));
}
TEST(AdaptiveSharedBatchSchedulerTest, Basic) {
for (const bool delete_scheduler_early : {false, true}) {
for (const bool delete_queue_1_early : {false, true}) {
int queue_0_tasks = 0;
auto queue_0_callback =
[&queue_0_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
for (int i = 0; i < batch->num_tasks(); i++) {
queue_0_tasks += batch->task(i).size();
}
};
int queue_1_tasks = 0;
auto queue_1_callback =
[&queue_1_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
for (int i = 0; i < batch->num_tasks(); i++) {
queue_1_tasks += batch->task(i).size();
}
};
{
std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(
AdaptiveSharedBatchScheduler<FakeTask>::Create({}, &scheduler));
// Create two queues.
std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
TF_ASSERT_OK(scheduler->AddQueue({}, queue_0_callback, &queue_0));
std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
TF_ASSERT_OK(scheduler->AddQueue({}, queue_1_callback, &queue_1));
if (delete_scheduler_early) {
// Delete our copy of the scheduler. The queues should keep it alive
// under the covers.
scheduler = nullptr;
}
// Submit tasks to the two queues, and (optionally) remove the queues.
TF_ASSERT_OK(ScheduleTask(1, queue_0.get()));
TF_ASSERT_OK(ScheduleTask(2, queue_1.get()));
TF_ASSERT_OK(ScheduleTask(3, queue_0.get()));
TF_ASSERT_OK(ScheduleTask(4, queue_1.get()));
if (delete_queue_1_early) {
queue_1 = nullptr;
}
TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
}
EXPECT_EQ(queue_0_tasks, 9);
EXPECT_EQ(queue_1_tasks, 6);
}
}
}
TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
using Scheduler = AdaptiveSharedBatchScheduler<FakeTask>;
std::shared_ptr<Scheduler> scheduler;
Scheduler::Options options;
options.num_batch_threads = 0;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
options.min_scheduling_period_micros = 50;
options.max_scheduling_period_micros = 100;
options.initial_scheduling_period_micros = 1;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
options.min_scheduling_period_micros = 50;
options.max_scheduling_period_micros = 100;
options.initial_scheduling_period_micros = 1000;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
options.min_scheduling_period_micros = 100;
options.max_scheduling_period_micros = 50;
options.initial_scheduling_period_micros = 75;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
options.feedback_smoothing_batches = 0;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
}
TEST(AdaptiveSharedBatchSchedulerTest, ObeysQueueOptions) {
test_util::FakeClockEnv env(Env::Default());
Notification start_teardown, stop_teardown;
std::unique_ptr<Thread> teardown_thread =
CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
{
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
options.initial_scheduling_period_micros = 1000;
options.env = &env;
std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(
AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
int queue_0_tasks = 0;
int queue_1_tasks = 0;
auto queue_0_callback = [&queue_0_tasks,
&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
for (int i = 0; i < batch->num_tasks(); i++) {
queue_0_tasks += batch->task(i).size();
}
env.SleepForMicroseconds(1);
};
auto queue_1_callback = [&queue_1_tasks,
&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
for (int i = 0; i < batch->num_tasks(); i++) {
queue_1_tasks += batch->task(i).size();
}
env.SleepForMicroseconds(1);
};
AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.max_batch_size = 10;
queue_options.max_enqueued_batches = 0;
// Queue must have max_enqueued_batches > 0.
EXPECT_FALSE(
scheduler->AddQueue(queue_options, queue_0_callback, &queue_0).ok());
queue_options.max_enqueued_batches = 2;
TF_ASSERT_OK(
scheduler->AddQueue(queue_options, queue_0_callback, &queue_0));
queue_options.max_batch_size = 0;
// Queue must have max_batch_size > 0.
EXPECT_FALSE(
scheduler->AddQueue(queue_options, queue_1_callback, &queue_1).ok());
queue_options.max_batch_size = 2;
queue_options.max_enqueued_batches = 1;
TF_ASSERT_OK(
scheduler->AddQueue(queue_options, queue_1_callback, &queue_1));
// Wait for scheduling_thread to sleep.
env.BlockUntilThreadsAsleep(1);
// Task larger than max_batch_size shouldn't schedule.
EXPECT_FALSE(ScheduleTask(15, queue_0.get()).ok());
TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
env.AdvanceByMicroseconds(1);
// Task larger than max_batch_size shouldn't schedule.
EXPECT_FALSE(ScheduleTask(3, queue_1.get()).ok());
TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
env.AdvanceByMicroseconds(1);
// Exceeds max_enqueued_batches, shouldn't schedule.
EXPECT_FALSE(ScheduleTask(1, queue_1.get()).ok());
TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
// Exceeds max_enqueued_batches, shouldn't schedule.
EXPECT_FALSE(ScheduleTask(6, queue_0.get()).ok());
TF_ASSERT_OK(ScheduleTask(4, queue_0.get()));
// Batches should be processed in order from oldest to newest.
env.AdvanceByMicroseconds(1000);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(queue_0_tasks, 10);
EXPECT_EQ(queue_1_tasks, 0);
env.AdvanceByMicroseconds(1000);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(queue_0_tasks, 10);
EXPECT_EQ(queue_1_tasks, 2);
env.AdvanceByMicroseconds(1000);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(queue_0_tasks, 19);
EXPECT_EQ(queue_1_tasks, 2);
start_teardown.Notify();
}
stop_teardown.Notify();
}
TEST(AdaptiveSharedBatchSchedulerTest, RateFeedback) {
test_util::FakeClockEnv env(Env::Default());
Notification start_teardown, stop_teardown;
std::unique_ptr<Thread> teardown_thread =
CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
{
double feedback = 0;
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
options.initial_scheduling_period_micros = 1000;
options.min_scheduling_period_micros = 200;
options.max_scheduling_period_micros = 2000;
options.env = &env;
options.scheduling_period_feedback = [&feedback] { return feedback; };
options.feedback_smoothing_batches = 1;
std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(
AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
std::unique_ptr<BatchScheduler<FakeTask>> queue;
int scheduled_items = 0;
auto queue_callback = [&scheduled_items,
&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
scheduled_items = 0;
for (int i = 0; i < batch->num_tasks(); i++) {
scheduled_items += batch->task(i).size();
}
env.SleepForMicroseconds(1);
};
TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
// Wait for scheduling_thread to sleep.
env.BlockUntilThreadsAsleep(1);
// Enqueue 6 batches.
for (int i = 0; i < 6; i++) {
TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
env.AdvanceByMicroseconds(1);
}
feedback = -500;
env.AdvanceByMicroseconds(994);
env.BlockUntilThreadsAsleep(2); // scheduling period = 500 usec.
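// (With feedback_smoothing_batches = 1, ewma_feedback becomes -500, so the
// period is updated to 1000 * (1 + .001 * -500) = 500.)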
EXPECT_EQ(scheduled_items, 900);
env.AdvanceByMicroseconds(500);
env.BlockUntilThreadsAsleep(2); // scheduling period = 250 usec.
EXPECT_EQ(scheduled_items, 901);
feedback = 0;
env.AdvanceByMicroseconds(250);
env.BlockUntilThreadsAsleep(2); // scheduling period = 250 usec.
EXPECT_EQ(scheduled_items, 902);
feedback = 10000; // large feedback should hit max_scheduling_period.
env.AdvanceByMicroseconds(250);
env.BlockUntilThreadsAsleep(2); // scheduling period = 2000 usec.
EXPECT_EQ(scheduled_items, 903);
feedback = -10000; // large feedback should hit min_scheduling_period.
env.AdvanceByMicroseconds(1999);
// No callback scheduled, only scheduling thread sleeping.
env.BlockUntilThreadsAsleep(1);
EXPECT_EQ(scheduled_items, 903);
env.AdvanceByMicroseconds(1);
env.BlockUntilThreadsAsleep(2); // scheduling period = 200 usec.
EXPECT_EQ(scheduled_items, 904);
env.AdvanceByMicroseconds(200);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(scheduled_items, 905);
start_teardown.Notify();
}
stop_teardown.Notify();
}
TEST(AdaptiveSharedBatchSchedulerTest, FeedbackSmoothing) {
test_util::FakeClockEnv env(Env::Default());
Notification start_teardown, stop_teardown;
std::unique_ptr<Thread> teardown_thread =
CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
{
double feedback = 0;
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
options.initial_scheduling_period_micros = 1000;
options.env = &env;
options.scheduling_period_feedback = [&feedback] { return feedback; };
options.feedback_smoothing_batches = 3;
std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(
AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
std::unique_ptr<BatchScheduler<FakeTask>> queue;
int scheduled_items = 0;
auto queue_callback = [&scheduled_items,
&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
scheduled_items = 0;
for (int i = 0; i < batch->num_tasks(); i++) {
scheduled_items += batch->task(i).size();
}
env.SleepForMicroseconds(1);
};
TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
// Wait for scheduling_thread to sleep.
env.BlockUntilThreadsAsleep(1);
// Enqueue 4 batches.
for (int i = 0; i < 4; i++) {
TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
env.AdvanceByMicroseconds(1);
}
feedback = -300;
env.AdvanceByMicroseconds(996);
env.BlockUntilThreadsAsleep(2);
// ewma_feedback = (2 * 0 - 300) / 3 = -100, scheduling_period = 900.
EXPECT_EQ(scheduled_items, 900);
env.AdvanceByMicroseconds(899);
// No callback scheduled, only scheduling thread sleeping.
env.BlockUntilThreadsAsleep(1);
EXPECT_EQ(scheduled_items, 900);
env.AdvanceByMicroseconds(1);
env.BlockUntilThreadsAsleep(2);
// ewma_feedback = (2 * -100 - 300) / 3 = -167, scheduling_period = 750.
EXPECT_EQ(scheduled_items, 901);
env.AdvanceByMicroseconds(749);
// No callback scheduled, only scheduling thread sleeping.
env.BlockUntilThreadsAsleep(1);
EXPECT_EQ(scheduled_items, 901);
feedback = 1000 / 3.;
env.AdvanceByMicroseconds(1);
env.BlockUntilThreadsAsleep(2);
// ewma_feedback = 0, scheduling_period = 750.
EXPECT_EQ(scheduled_items, 902);
env.AdvanceByMicroseconds(749);
// No callback scheduled, only scheduling thread sleeping.
env.BlockUntilThreadsAsleep(1);
EXPECT_EQ(scheduled_items, 902);
env.AdvanceByMicroseconds(1);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(scheduled_items, 903);
start_teardown.Notify();
}
stop_teardown.Notify();
}
TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
test_util::FakeClockEnv env(Env::Default());
Notification start_teardown, stop_teardown;
std::unique_ptr<Thread> teardown_thread =
CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
{
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
options.initial_scheduling_period_micros = 1000;
options.env = &env;
std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
TF_ASSERT_OK(
AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
std::unique_ptr<BatchScheduler<FakeTask>> queue;
int scheduled_items = 0;
auto queue_callback = [&scheduled_items,
&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
EXPECT_GT(batch->num_tasks(), 0);
scheduled_items = 0;
for (int i = 0; i < batch->num_tasks(); i++) {
scheduled_items += batch->task(i).size();
}
env.SleepForMicroseconds(1);
};
AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
queue_options.max_batch_size = 10;
queue_options.max_enqueued_batches = 10;
TF_ASSERT_OK(scheduler->AddQueue(queue_options, queue_callback, &queue));
// Wait for scheduling_thread to sleep.
env.BlockUntilThreadsAsleep(1);
// Enqueue 3 tasks.
EXPECT_EQ(queue->NumEnqueuedTasks(), 0);
EXPECT_EQ(queue->SchedulingCapacity(), 100);
TF_ASSERT_OK(ScheduleTask(5, queue.get()));
EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
EXPECT_EQ(queue->SchedulingCapacity(), 95);
env.AdvanceByMicroseconds(1);
TF_ASSERT_OK(ScheduleTask(6, queue.get()));
EXPECT_EQ(queue->NumEnqueuedTasks(), 2);
EXPECT_EQ(queue->SchedulingCapacity(), 84);
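// (The size-6 task didn't fit in the half-full batch (5 + 6 > 10), so a
// second batch was opened: capacity = (10 - 2) * 10 + (10 - 6) = 84.)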
env.AdvanceByMicroseconds(1);
TF_ASSERT_OK(ScheduleTask(1, queue.get()));
EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
EXPECT_EQ(queue->SchedulingCapacity(), 83);
env.AdvanceByMicroseconds(998);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(scheduled_items, 5);
env.AdvanceByMicroseconds(1000);
env.BlockUntilThreadsAsleep(2);
EXPECT_EQ(scheduled_items, 7);
start_teardown.Notify();
}
stop_teardown.Notify();
}
} // namespace anonymous
} // namespace serving
} // namespace tensorflow

tensorflow/contrib/batching/batch_scheduler.h

@@ -78,7 +78,7 @@ template <typename TaskType>
class Batch {
public:
Batch() = default;
-virtual ~Batch(); // Blocks until the batch is closed.
+~Batch(); // Blocks until the batch is closed.
// Appends 'task' to the batch. After calling AddTask(), the newly-added task
// can be accessed via task(num_tasks()-1) or mutable_task(num_tasks()-1).