diff --git a/tensorflow/contrib/batching/BUILD b/tensorflow/contrib/batching/BUILD
index ae3f48f1b27..1555a3427fd 100644
--- a/tensorflow/contrib/batching/BUILD
+++ b/tensorflow/contrib/batching/BUILD
@@ -69,28 +69,6 @@ tf_cc_test(
     ],
 )
 
-cc_library(
-    name = "adaptive_shared_batch_scheduler",
-    hdrs = ["adaptive_shared_batch_scheduler.h"],
-    deps = [
-        ":batch_scheduler",
-        "//tensorflow/contrib/batching/util:periodic_function_dynamic",
-        "//tensorflow/core:lib",
-    ],
-)
-
-tf_cc_test(
-    name = "adaptive_shared_batch_scheduler_test",
-    srcs = ["adaptive_shared_batch_scheduler_test.cc"],
-    deps = [
-        ":adaptive_shared_batch_scheduler",
-        "//tensorflow/contrib/batching/test_util:fake_clock_env",
-        "//tensorflow/core:lib",
-        "//tensorflow/core:test",
-        "//tensorflow/core:test_main",
-    ],
-)
-
 cc_library(
     name = "basic_batch_scheduler",
     hdrs = ["basic_batch_scheduler.h"],
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
deleted file mode 100644
index ac32f096395..00000000000
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
+++ /dev/null
@@ -1,463 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
-#define THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
-
-#include <functional>
-#include <memory>
-#include <queue>
-#include <unordered_map>
-#include <vector>
-
-#include "tensorflow/contrib/batching/batch_scheduler.h"
-#include "tensorflow/contrib/batching/util/periodic_function.h"
-#include "tensorflow/core/lib/core/errors.h"
-#include "tensorflow/core/lib/core/status.h"
-#include "tensorflow/core/lib/core/threadpool.h"
-#include "tensorflow/core/platform/cpu_info.h"
-#include "tensorflow/core/platform/env.h"
-#include "tensorflow/core/platform/thread_annotations.h"
-#include "tensorflow/core/platform/types.h"
-
-namespace tensorflow {
-namespace serving {
-namespace internal {
-template <typename TaskType>
-class ASBSBatch;
-
-template <typename TaskType>
-class ASBSQueue;
-}  // namespace internal
-
-// Shared batch scheduler designed to minimize latency. The scheduler keeps
-// track of a number of queues (one per model or model version) which are
-// continuously enqueuing requests. The scheduler groups the requests into
-// batches which it periodically sends off for processing (see
-// shared_batch_scheduler.h for more details). The AdaptiveSharedBatchScheduler
-// prioritizes batches by age (i.e. the batch's oldest request) irrespective of
-// queue. The scheduler will process the oldest batch at an adjustable rate,
-// regardless of batch size. The user can provide feedback to help set this
-// rate to achieve some goal (i.e. minimize overall latency, limit cpu usage,
-// etc).
-//
-// The rate (or rather, the corresponding period) is adjusted each time a batch
-// is processed, using an exponentially weighted moving average to smooth
-// potentially noisy feedback:
-//   ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
-//   period *= (1 + K * ewma_feedback)
-//
-// Some potential use cases:
-//   Hardware Accelerators (GPUs & TPUs) - If some phase of batch processing
-//   involves serial processing by a device, from a latency perspective it is
-//   desirable to keep the device evenly loaded, avoiding the need to wait for
-//   the device to process prior batches.
-//   feedback = num_pending_on_device() - desired_pending.
-//   CPU utilization - If the batch processing is cpu dominated, you can reap
-//   latency gains when underutilized by increasing the processing rate, but
-//   back the rate off when the load increases to avoid overload.
-//   feedback = cpu_rate() - desired_cpu_rate.
-
-template <typename TaskType>
-class AdaptiveSharedBatchScheduler
-    : public std::enable_shared_from_this<
-          AdaptiveSharedBatchScheduler<TaskType>> {
- public:
-  struct Options {
-    // The name to use for the pool of batch threads.
-    string thread_pool_name = {"batch_threads"};
-    // Number of batch processing threads; equivalently the maximum number of
-    // concurrently running batches.
-    int64 num_batch_threads = port::NumSchedulableCPUs();
-    // The environment to use (typically only overridden by test code).
-    Env* env = Env::Default();
-    // Initial batch scheduling period in microseconds. Will be altered for
-    // non-zero scheduling_period_feedback.
-    double initial_scheduling_period_micros = 500;
-    // Minimum batch scheduling period in microseconds. Recommend setting this
-    // value greater than 0, otherwise it may take a while to recover from a
-    // sustained time of negative scheduling_period_feedback (which may occur
-    // under low load).
-    double min_scheduling_period_micros = 100;
-    // Maximum batch scheduling period in microseconds.
-    double max_scheduling_period_micros = 10000;
-    // Feedback function used to modify the scheduling period each time a batch
-    // is scheduled. Should return values roughly O(1), with positive values
-    // resulting in an increased period.
-    std::function<double()> scheduling_period_feedback = [] { return 0.; };
-    // To handle potentially noisy scheduling_period_feedback, the period is
-    // adjusted using an exponentially weighted moving average over the
-    // previous feedback_smoothing_batches batches. Must be greater than 0.
-    int64 feedback_smoothing_batches = 10;
-  };
-
-  // Ownership is shared between the caller of Create() and any queues created
-  // via AddQueue().
-  static Status Create(
-      const Options& options,
-      std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>>* scheduler);
-
-  struct QueueOptions {
-    // Maximum size of each batch.
-    int max_batch_size = 1000;
-    // Maximum number of enqueued (i.e. non-scheduled) batches.
-    int max_enqueued_batches = 10;
-  };
-
-  using BatchProcessor = std::function<void(std::unique_ptr<Batch<TaskType>>)>;
-
-  // Adds queue (and its callback) to be managed by this scheduler.
-  Status AddQueue(const QueueOptions& options,
-                  BatchProcessor process_batch_callback,
-                  std::unique_ptr<BatchScheduler<TaskType>>* queue);
-
- private:
-  // access to AddBatch, RemoveQueue, GetEnv.
-  friend class internal::ASBSQueue<TaskType>;
-
-  explicit AdaptiveSharedBatchScheduler(const Options& options);
-
-  // Batch scheduling function which runs every scheduling_period_ microseconds.
-  void ProcessOneBatch();
-
-  // Notifies scheduler of non-empty batch which is eligible for processing.
-  void AddBatch(internal::ASBSBatch<TaskType>*);
-
-  // Removes queue from scheduler.
-  void RemoveQueue(const internal::ASBSQueue<TaskType>* queue);
-
-  Env* GetEnv() const { return options_.env; }
-
-  const Options options_;
-
-  struct BatchCompare {
-    bool operator()(const internal::ASBSBatch<TaskType>* a,
-                    const internal::ASBSBatch<TaskType>* b);
-  };
-
-  // Collection of batches added by AddBatch, ordered by age. Owned by scheduler
-  // until they are released for processing.
-  std::priority_queue<internal::ASBSBatch<TaskType>*,
-                      std::vector<internal::ASBSBatch<TaskType>*>, BatchCompare>
-      batches_ GUARDED_BY(mu_);
-
-  // Unowned queues and callbacks added by AddQueue.
-  std::unordered_map<const internal::ASBSQueue<TaskType>*, BatchProcessor>
-      queues_and_callbacks_ GUARDED_BY(mu_);
-
-  mutex mu_;
-
-  // Responsible for running ProcessOneBatch. PeriodicFunction was used in order
-  // to check for deletion so that the thread can be shut down.
-  std::unique_ptr<PeriodicFunction> scheduling_thread_;
-
-  // Responsible for running the batch processing callbacks.
-  std::unique_ptr<thread::ThreadPool> batch_thread_pool_;
-
-  // Time interval in microseconds between successive ProcessOneBatch calls.
-  double scheduling_period_;
-
-  // Exponentially weighted moving average of
-  // options_.scheduling_period_feedback() evaluated in each ProcessOneBatch
-  // call.
-  double ewma_feedback_ = 0;
-
-  TF_DISALLOW_COPY_AND_ASSIGN(AdaptiveSharedBatchScheduler);
-};
-
-//////////////////////////////////////////////////////////
-// Implementation details follow. API users need not read.
-
-namespace internal {
-// Consolidates tasks into batches, passing them off to the
-// AdaptiveSharedBatchScheduler for processing.
-template <typename TaskType>
-class ASBSQueue : public BatchScheduler<TaskType> {
- public:
-  using QueueOptions =
-      typename AdaptiveSharedBatchScheduler<TaskType>::QueueOptions;
-
-  ASBSQueue(std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler,
-            const QueueOptions& options);
-
-  ~ASBSQueue() override;
-
-  // Adds task to current batch. Fails if the task size is larger than the batch
-  // size or if the current batch is full and this queue's number of outstanding
-  // batches is at its maximum.
-  Status Schedule(std::unique_ptr<TaskType>* task) override;
-
-  // Number of tasks waiting to be scheduled.
-  size_t NumEnqueuedTasks() const override;
-
-  // Number of size 1 tasks which could currently be scheduled without failing.
-  size_t SchedulingCapacity() const override;
-
-  // Notifies queue that a batch is about to be scheduled; the queue should not
-  // place any more tasks in this batch.
-  void ReleaseBatch(const ASBSBatch<TaskType>* batch);
-
- private:
-  std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler_;
-  const QueueOptions options_;
-  // Owned by scheduler_.
-  ASBSBatch<TaskType>* current_batch_ GUARDED_BY(mu_) = nullptr;
-  int64 num_enqueued_batches_ GUARDED_BY(mu_) = 0;
-  int64 num_enqueued_tasks_ GUARDED_BY(mu_) = 0;
-  mutable mutex mu_;
-  TF_DISALLOW_COPY_AND_ASSIGN(ASBSQueue);
-};
-
-// Batch which remembers when and by whom it was created.
-template <typename TaskType>
-class ASBSBatch : public Batch<TaskType> {
- public:
-  ASBSBatch(ASBSQueue<TaskType>* queue, int64 creation_time_micros)
-      : queue_(queue), creation_time_micros_(creation_time_micros) {}
-
-  ~ASBSBatch() override {}
-
-  ASBSQueue<TaskType>* queue() const { return queue_; }
-
-  int64 creation_time_micros() const { return creation_time_micros_; }
-
- private:
-  ASBSQueue<TaskType>* queue_;
-  const int64 creation_time_micros_;
-  TF_DISALLOW_COPY_AND_ASSIGN(ASBSBatch);
-};
-}  // namespace internal
-
-// ---------------- AdaptiveSharedBatchScheduler ----------------
-
-template <typename TaskType>
-Status AdaptiveSharedBatchScheduler<TaskType>::Create(
-    const Options& options,
-    std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>>* scheduler) {
-  if (options.num_batch_threads < 1) {
-    return errors::InvalidArgument("num_batch_threads must be positive; was ",
-                                   options.num_batch_threads);
-  }
-  if (options.min_scheduling_period_micros < 0) {
-    return errors::InvalidArgument(
-        "min_scheduling_period_micros must be >= 0; was ",
-        options.min_scheduling_period_micros);
-  }
-  if (options.min_scheduling_period_micros >
-      options.initial_scheduling_period_micros) {
-    return errors::InvalidArgument(
-        "initial_scheduling_period_micros (",
-        options.initial_scheduling_period_micros,
-        ") must be >= min_scheduling_period_micros (",
-        options.min_scheduling_period_micros, ")");
-  }
-  if (options.initial_scheduling_period_micros >
-      options.max_scheduling_period_micros) {
-    return errors::InvalidArgument(
-        "initial_scheduling_period_micros (",
-        options.initial_scheduling_period_micros,
-        ") must be <= max_scheduling_period_micros (",
-        options.max_scheduling_period_micros, ")");
-  }
-  if (options.feedback_smoothing_batches < 1) {
-    return errors::InvalidArgument(
-        "feedback_smoothing_batches must be positive; was ",
-        options.feedback_smoothing_batches);
-  }
-  scheduler->reset(new AdaptiveSharedBatchScheduler<TaskType>(options));
-  return Status::OK();
-}
-
-template <typename TaskType>
-AdaptiveSharedBatchScheduler<TaskType>::AdaptiveSharedBatchScheduler(
-    const Options& options)
-    : options_(options),
-      scheduling_period_(options.initial_scheduling_period_micros) {
-  PeriodicFunction::Options opts;
-  opts.thread_name_prefix = "scheduling_thread";
-  opts.env = GetEnv();
-  scheduling_thread_.reset(
-      new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
-  batch_thread_pool_.reset(new thread::ThreadPool(
-      GetEnv(), options.thread_pool_name, options.num_batch_threads));
-}
-
-template <typename TaskType>
-Status AdaptiveSharedBatchScheduler<TaskType>::AddQueue(
-    const QueueOptions& options, BatchProcessor process_batch_callback,
-    std::unique_ptr<BatchScheduler<TaskType>>* queue) {
-  if (options.max_batch_size <= 0) {
-    return errors::InvalidArgument("max_batch_size must be positive; was ",
-                                   options.max_batch_size);
-  }
-  if (options.max_enqueued_batches <= 0) {
-    return errors::InvalidArgument(
-        "max_enqueued_batches must be positive; was ",
-        options.max_enqueued_batches);
-  }
-  internal::ASBSQueue<TaskType>* asbs_queue_raw;
-  queue->reset(asbs_queue_raw = new internal::ASBSQueue<TaskType>(
-                   this->shared_from_this(), options));
-  mutex_lock l(mu_);
-  queues_and_callbacks_[asbs_queue_raw] = process_batch_callback;
-  return Status::OK();
-}
-
-template <typename TaskType>
-void AdaptiveSharedBatchScheduler<TaskType>::AddBatch(
-    internal::ASBSBatch<TaskType>* batch) {
-  mutex_lock l(mu_);
-  batches_.push(batch);
-}
-
-template <typename TaskType>
-void AdaptiveSharedBatchScheduler<TaskType>::RemoveQueue(
-    const internal::ASBSQueue<TaskType>* queue) {
-  mutex_lock l(mu_);
-  queues_and_callbacks_.erase(queue);
-}
-
-template <typename TaskType>
-void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
-  static const double kFeedbackMultiplier = .001;
-  internal::ASBSBatch<TaskType>* batch = nullptr;
-  BatchProcessor callback;
-  const int64 start_time_micros = GetEnv()->NowMicros();
-  {
-    mutex_lock l(mu_);
-    if (!batches_.empty()) {
-      batch = batches_.top();
-      batches_.pop();
-      callback = queues_and_callbacks_[batch->queue()];
-    }
-  }
-  if (batch != nullptr) {
-    double feedback = options_.scheduling_period_feedback();
-    const int64 N = options_.feedback_smoothing_batches;
-    ewma_feedback_ = ((N - 1) * ewma_feedback_ + feedback) / N;
-    scheduling_period_ *= (1 + kFeedbackMultiplier * ewma_feedback_);
-    if (scheduling_period_ < options_.min_scheduling_period_micros) {
-      scheduling_period_ = options_.min_scheduling_period_micros;
-    } else if (scheduling_period_ > options_.max_scheduling_period_micros) {
-      scheduling_period_ = options_.max_scheduling_period_micros;
-    }
-    // Queue may destroy itself after ReleaseBatch is called.
-    batch->queue()->ReleaseBatch(batch);
-    batch_thread_pool_->Schedule([callback, batch] {
-      callback(std::unique_ptr<Batch<TaskType>>(batch));
-    });
-  }
-  const int64 sleep_time =
-      scheduling_period_ - (GetEnv()->NowMicros() - start_time_micros);
-  if (sleep_time > 0) {
-    GetEnv()->SleepForMicroseconds(sleep_time);
-  }
-}
-
-template <typename TaskType>
-bool AdaptiveSharedBatchScheduler<TaskType>::BatchCompare::operator()(
-    const internal::ASBSBatch<TaskType>* a,
-    const internal::ASBSBatch<TaskType>* b) {
-  return a->creation_time_micros() > b->creation_time_micros();
-}
-
-// ---------------- ASBSQueue ----------------
-
-namespace internal {
-template <typename TaskType>
-ASBSQueue<TaskType>::ASBSQueue(
-    std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>> scheduler,
-    const QueueOptions& options)
-    : scheduler_(scheduler), options_(options) {}
-
-template <typename TaskType>
-ASBSQueue<TaskType>::~ASBSQueue() {
-  // Wait until last batch has been scheduled.
-  const int kSleepMicros = 1000;
-  for (;;) {
-    {
-      mutex_lock l(mu_);
-      if (num_enqueued_batches_ == 0) {
-        break;
-      }
-    }
-    scheduler_->GetEnv()->SleepForMicroseconds(kSleepMicros);
-  }
-  scheduler_->RemoveQueue(this);
-}
-
-template <typename TaskType>
-Status ASBSQueue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
-  bool added_new_batch = false;
-  size_t size = (*task)->size();
-  if (size > options_.max_batch_size) {
-    return errors::InvalidArgument("Task size ", size,
-                                   " is larger than maximum batch size ",
-                                   options_.max_batch_size);
-  }
-  {
-    mutex_lock l(mu_);
-    // Current batch is full, create another if allowed.
-    if (current_batch_ &&
-        current_batch_->size() + size > options_.max_batch_size) {
-      if (num_enqueued_batches_ >= options_.max_enqueued_batches) {
-        return errors::Unavailable("The batch scheduling queue is full");
-      }
-      current_batch_->Close();
-      current_batch_ = nullptr;
-    }
-    if (!current_batch_) {
-      added_new_batch = true;
-      num_enqueued_batches_++;
-      current_batch_ =
-          new ASBSBatch<TaskType>(this, scheduler_->GetEnv()->NowMicros());
-    }
-    current_batch_->AddTask(std::move(*task));
-    num_enqueued_tasks_++;
-  }
-  if (added_new_batch) scheduler_->AddBatch(current_batch_);
-  return Status::OK();
-}
-
-template <typename TaskType>
-void ASBSQueue<TaskType>::ReleaseBatch(const ASBSBatch<TaskType>* batch) {
-  mutex_lock l(mu_);
-  num_enqueued_batches_--;
-  num_enqueued_tasks_ -= batch->num_tasks();
-  if (batch == current_batch_) {
-    current_batch_->Close();
-    current_batch_ = nullptr;
-  }
-}
-
-template <typename TaskType>
-size_t ASBSQueue<TaskType>::NumEnqueuedTasks() const {
-  mutex_lock l(mu_);
-  return num_enqueued_tasks_;
-}
-
-template <typename TaskType>
-size_t ASBSQueue<TaskType>::SchedulingCapacity() const {
-  mutex_lock l(mu_);
-  const int current_batch_capacity =
-      current_batch_ ? options_.max_batch_size - current_batch_->size() : 0;
-  const int spare_batches =
-      options_.max_enqueued_batches - num_enqueued_batches_;
-  return spare_batches * options_.max_batch_size + current_batch_capacity;
-}
-}  // namespace internal
-}  // namespace serving
-}  // namespace tensorflow
-
-#endif  // THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
deleted file mode 100644
index a07cd6d834f..00000000000
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
+++ /dev/null
@@ -1,438 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h"
-
-#include "tensorflow/contrib/batching/test_util/fake_clock_env.h"
-#include "tensorflow/core/lib/core/notification.h"
-#include "tensorflow/core/lib/core/status_test_util.h"
-#include "tensorflow/core/platform/macros.h"
-#include "tensorflow/core/platform/test.h"
-
-namespace tensorflow {
-namespace serving {
-namespace anonymous {
-
-class FakeTask : public BatchTask {
- public:
-  explicit FakeTask(size_t size) : size_(size) {}
-
-  ~FakeTask() override = default;
-
-  size_t size() const override { return size_; }
-
- private:
-  const size_t size_;
-
-  TF_DISALLOW_COPY_AND_ASSIGN(FakeTask);
-};
-
-// Creates a FakeTask of size 'task_size', and calls 'scheduler->Schedule()' on
-// that task. Returns the resulting status.
-Status ScheduleTask(size_t task_size, BatchScheduler<FakeTask>* scheduler) {
-  std::unique_ptr<FakeTask> task(new FakeTask(task_size));
-  Status status = scheduler->Schedule(&task);
-  // Schedule() should have consumed 'task' iff it returned Status::OK.
-  CHECK_EQ(status.ok(), task == nullptr);
-  return status;
-}
-
-// Creates a thread that waits on 'start' and then advances the fake clock in
-// 'env' in a loop until 'stop' is notified. Useful for allowing objects that
-// use the clock to be destroyed.
-std::unique_ptr<Thread> CreateFakeClockAdvancerThread(
-    test_util::FakeClockEnv* env, Notification* start, Notification* stop) {
-  return std::unique_ptr<Thread>(Env::Default()->StartThread(
-      {}, "FakeClockAdvancerThread", [env, start, stop] {
-        start->WaitForNotification();
-        while (!stop->HasBeenNotified()) {
-          env->AdvanceByMicroseconds(10);
-          Env::Default()->SleepForMicroseconds(10);
-        }
-      }));
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, Basic) {
-  for (const bool delete_scheduler_early : {false, true}) {
-    for (const bool delete_queue_1_early : {false, true}) {
-      int queue_0_tasks = 0;
-      auto queue_0_callback =
-          [&queue_0_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
-            ASSERT_TRUE(batch->IsClosed());
-            EXPECT_GT(batch->num_tasks(), 0);
-            for (int i = 0; i < batch->num_tasks(); i++) {
-              queue_0_tasks += batch->task(i).size();
-            }
-          };
-      int queue_1_tasks = 0;
-      auto queue_1_callback =
-          [&queue_1_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
-            ASSERT_TRUE(batch->IsClosed());
-            EXPECT_GT(batch->num_tasks(), 0);
-            for (int i = 0; i < batch->num_tasks(); i++) {
-              queue_1_tasks += batch->task(i).size();
-            }
-          };
-      {
-        std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-        TF_ASSERT_OK(
-            AdaptiveSharedBatchScheduler<FakeTask>::Create({}, &scheduler));
-
-        // Create two queues.
-        std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
-        TF_ASSERT_OK(scheduler->AddQueue({}, queue_0_callback, &queue_0));
-        std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
-        TF_ASSERT_OK(scheduler->AddQueue({}, queue_1_callback, &queue_1));
-
-        if (delete_scheduler_early) {
-          // Delete our copy of the scheduler. The queues should keep it alive
-          // under the covers.
-          scheduler = nullptr;
-        }
-        // Submit tasks to the two queues, and (optionally) remove the queues.
-        TF_ASSERT_OK(ScheduleTask(1, queue_0.get()));
-        TF_ASSERT_OK(ScheduleTask(2, queue_1.get()));
-        TF_ASSERT_OK(ScheduleTask(3, queue_0.get()));
-        TF_ASSERT_OK(ScheduleTask(4, queue_1.get()));
-        if (delete_queue_1_early) {
-          queue_1 = nullptr;
-        }
-        TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-      }
-      EXPECT_EQ(queue_0_tasks, 9);
-      EXPECT_EQ(queue_1_tasks, 6);
-    }
-  }
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
-  using Scheduler = AdaptiveSharedBatchScheduler<FakeTask>;
-  std::shared_ptr<Scheduler> scheduler;
-  Scheduler::Options options;
-  options.num_batch_threads = 0;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.min_scheduling_period_micros = 50;
-  options.max_scheduling_period_micros = 100;
-  options.initial_scheduling_period_micros = 1;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.min_scheduling_period_micros = 50;
-  options.max_scheduling_period_micros = 100;
-  options.initial_scheduling_period_micros = 1000;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.min_scheduling_period_micros = 100;
-  options.max_scheduling_period_micros = 50;
-  options.initial_scheduling_period_micros = 75;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.feedback_smoothing_batches = 0;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, ObeysQueueOptions) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
-    std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
-    int queue_0_tasks = 0;
-    int queue_1_tasks = 0;
-    auto queue_0_callback = [&queue_0_tasks,
-                             &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        queue_0_tasks += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    auto queue_1_callback = [&queue_1_tasks,
-                             &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        queue_1_tasks += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
-    queue_options.max_batch_size = 10;
-    queue_options.max_enqueued_batches = 0;
-    // Queue must have max_enqueued_batches > 0.
-    EXPECT_FALSE(
-        scheduler->AddQueue(queue_options, queue_0_callback, &queue_0).ok());
-    queue_options.max_enqueued_batches = 2;
-    TF_ASSERT_OK(
-        scheduler->AddQueue(queue_options, queue_0_callback, &queue_0));
-    queue_options.max_batch_size = 0;
-    // Queue must have max_batch_size > 0.
-    EXPECT_FALSE(
-        scheduler->AddQueue(queue_options, queue_1_callback, &queue_1).ok());
-    queue_options.max_batch_size = 2;
-    queue_options.max_enqueued_batches = 1;
-    TF_ASSERT_OK(
-        scheduler->AddQueue(queue_options, queue_1_callback, &queue_1));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Task larger than max_batch_size shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(15, queue_0.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    env.AdvanceByMicroseconds(1);
-
-    // Task larger than max_batch_size shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(3, queue_1.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
-    TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
-    env.AdvanceByMicroseconds(1);
-    // Exceeds max_enqueued_batches, shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(1, queue_1.get()).ok());
-
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    // Exceeds max_enqueued_batches, shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(6, queue_0.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(4, queue_0.get()));
-
-    // Batches should be processed in order from oldest to newest.
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 10);
-    EXPECT_EQ(queue_1_tasks, 0);
-
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 10);
-    EXPECT_EQ(queue_1_tasks, 2);
-
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 19);
-    EXPECT_EQ(queue_1_tasks, 2);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, RateFeedback) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    double feedback = 0;
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.min_scheduling_period_micros = 200;
-    options.max_scheduling_period_micros = 2000;
-    options.env = &env;
-    options.scheduling_period_feedback = [&feedback] { return feedback; };
-    options.feedback_smoothing_batches = 1;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-
-    TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 6 batches.
-    for (int i = 0; i < 6; i++) {
-      TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
-      env.AdvanceByMicroseconds(1);
-    }
-    feedback = -500;
-    env.AdvanceByMicroseconds(994);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 500 usec.
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(500);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 250 usec.
-    EXPECT_EQ(scheduled_items, 901);
-    feedback = 0;
-    env.AdvanceByMicroseconds(250);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 250 usec.
-    EXPECT_EQ(scheduled_items, 902);
-    feedback = 10000;  // large feedback should hit max_scheduling_period.
-    env.AdvanceByMicroseconds(250);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 2000 usec.
-    EXPECT_EQ(scheduled_items, 903);
-    feedback = -10000;  // large feedback should hit min_scheduling_period.
-    env.AdvanceByMicroseconds(1999);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 903);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 200 usec.
-    EXPECT_EQ(scheduled_items, 904);
-    env.AdvanceByMicroseconds(200);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 905);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, FeedbackSmoothing) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    double feedback = 0;
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    options.scheduling_period_feedback = [&feedback] { return feedback; };
-    options.feedback_smoothing_batches = 3;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-
-    TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 4 batches.
-    for (int i = 0; i < 4; i++) {
-      TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
-      env.AdvanceByMicroseconds(1);
-    }
-    feedback = -300;
-    env.AdvanceByMicroseconds(996);
-    env.BlockUntilThreadsAsleep(2);
-    // ewma_feedback = -100, scheduling_period = 900.
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(899);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    // ewma_feedback = -167, scheduling_period = 750.
-    EXPECT_EQ(scheduled_items, 901);
-    env.AdvanceByMicroseconds(749);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 901);
-    feedback = 1000 / 3.;
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    // ewma_feedback = 0, scheduling_period = 750.
-    EXPECT_EQ(scheduled_items, 902);
-    env.AdvanceByMicroseconds(749);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 902);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 903);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
-    queue_options.max_batch_size = 10;
-    queue_options.max_enqueued_batches = 10;
-    TF_ASSERT_OK(scheduler->AddQueue(queue_options, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 3 tasks.
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 0);
-    EXPECT_EQ(queue->SchedulingCapacity(), 100);
-    TF_ASSERT_OK(ScheduleTask(5, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
-    EXPECT_EQ(queue->SchedulingCapacity(), 95);
-    env.AdvanceByMicroseconds(1);
-    TF_ASSERT_OK(ScheduleTask(6, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 2);
-    EXPECT_EQ(queue->SchedulingCapacity(), 84);
-    env.AdvanceByMicroseconds(1);
-    TF_ASSERT_OK(ScheduleTask(1, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
-    EXPECT_EQ(queue->SchedulingCapacity(), 83);
-
-    env.AdvanceByMicroseconds(998);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 5);
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 7);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-}  // namespace anonymous
-}  // namespace serving
-}  // namespace tensorflow
diff --git a/tensorflow/contrib/batching/batch_scheduler.h b/tensorflow/contrib/batching/batch_scheduler.h
index a5072f439ab..7c41ad88180 100644
--- a/tensorflow/contrib/batching/batch_scheduler.h
+++ b/tensorflow/contrib/batching/batch_scheduler.h
@@ -78,7 +78,7 @@ template <typename TaskType>
 class Batch {
  public:
   Batch() = default;
-  virtual ~Batch();  // Blocks until the batch is closed.
+  ~Batch();  // Blocks until the batch is closed.
 
   // Appends 'task' to the batch. After calling AddTask(), the newly-added task
   // can be accessed via task(num_tasks()-1) or mutable_task(num_tasks()-1).
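For reference, the feedback-driven period adjustment that the removed scheduler documents and implements in ProcessOneBatch() can be read in isolation. The sketch below is not part of the patch: it restates the EWMA update and clamping from the deleted header in a free-standing function, using the default bounds from Options. The names UpdateSchedulingPeriod and the main() driver are illustrative, not TensorFlow APIs.

// Standalone illustration of the removed scheduler's period-update rule:
//   ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
//   period *= (1 + K * ewma_feedback), clamped to [min_period, max_period]
#include <algorithm>
#include <cstdint>
#include <cstdio>

// N mirrors Options::feedback_smoothing_batches; K mirrors the
// kFeedbackMultiplier constant (0.001) from ProcessOneBatch.
double UpdateSchedulingPeriod(double feedback, int64_t N, double* ewma_feedback,
                              double period, double min_period,
                              double max_period) {
  constexpr double kFeedbackMultiplier = 0.001;
  *ewma_feedback = ((N - 1) * *ewma_feedback + feedback) / N;
  period *= (1 + kFeedbackMultiplier * *ewma_feedback);
  return std::min(std::max(period, min_period), max_period);
}

int main() {
  // Reproduces the first step of the FeedbackSmoothing test: N = 3 and
  // feedback = -300 give ewma = -100, so the period drops from 1000 to 900.
  double ewma = 0;
  double period = UpdateSchedulingPeriod(-300, 3, &ewma, 1000, 100, 10000);
  std::printf("ewma=%.0f period=%.0f\n", ewma, period);
  return 0;
}

Positive smoothed feedback lengthens the period (slower batch processing), negative feedback shortens it, and the min/max bounds keep a sustained extreme signal from driving the period to a value it cannot recover from, which is exactly the behavior the RateFeedback test exercises.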