Cleanup: Get rid of unused copy of simple threadpool implementation. The implementation is identical to the simple threadpool in Eigen (can be selected by not defining EIGEN_USE_NONBLOCKING_THREAD_POOL), which also has had all the same tracing calls added via the hooks in EigenEnvironment.
Change: 124006070
This commit is contained in:
parent
f4e89d4508
commit
89fb09e790
@ -15,15 +15,8 @@ limitations under the License.
|
||||
|
||||
#include "tensorflow/core/lib/core/threadpool.h"
|
||||
|
||||
#ifdef TENSORFLOW_USE_EIGEN_THREADPOOL
|
||||
#define EIGEN_USE_THREADS
|
||||
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
|
||||
#else
|
||||
#include <deque>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#endif
|
||||
|
||||
#include "tensorflow/core/platform/denormal.h"
|
||||
#include "tensorflow/core/platform/logging.h"
|
||||
#include "tensorflow/core/platform/mutex.h"
|
||||
@ -33,8 +26,6 @@ limitations under the License.
|
||||
namespace tensorflow {
|
||||
namespace thread {
|
||||
|
||||
#ifdef TENSORFLOW_USE_EIGEN_THREADPOOL
|
||||
|
||||
struct EigenEnvironment {
|
||||
typedef Thread EnvThread;
|
||||
struct Task {
|
||||
@ -105,125 +96,6 @@ struct ThreadPool::Impl : Eigen::ThreadPoolTempl<EigenEnvironment> {
|
||||
const int num_threads_;
|
||||
};
|
||||
|
||||
#else
|
||||
|
||||
struct ThreadPool::Impl {
|
||||
Impl(Env* env, const ThreadOptions& thread_options, const string& name,
|
||||
int num_threads);
|
||||
~Impl();
|
||||
void Schedule(std::function<void()> fn);
|
||||
void ParallelFor(int64 total, int64 cost_per_unit,
|
||||
std::function<void(int64, int64)> fn) {
|
||||
CHECK(0); // should not be used with the old thread pool
|
||||
}
|
||||
|
||||
int NumThreads() const { return threads_.size(); };
|
||||
|
||||
private:
|
||||
struct Waiter {
|
||||
condition_variable cv;
|
||||
bool ready;
|
||||
};
|
||||
|
||||
struct Task {
|
||||
std::function<void()> fn;
|
||||
uint64 id;
|
||||
};
|
||||
|
||||
void WorkerLoop();
|
||||
|
||||
const string name_;
|
||||
mutex mu_;
|
||||
std::vector<Thread*> threads_; // All threads
|
||||
std::vector<Waiter*> waiters_; // Stack of waiting threads.
|
||||
std::deque<Task> pending_; // Queue of pending work
|
||||
};
|
||||
|
||||
ThreadPool::Impl::Impl(Env* env, const ThreadOptions& thread_options,
|
||||
const string& name, int num_threads)
|
||||
: name_(name) {
|
||||
for (int i = 0; i < num_threads; i++) {
|
||||
threads_.push_back(
|
||||
env->StartThread(thread_options, name, [this]() { WorkerLoop(); }));
|
||||
}
|
||||
}
|
||||
|
||||
ThreadPool::Impl::~Impl() {
|
||||
{
|
||||
// Wait for all work to get done.
|
||||
mutex_lock l(mu_);
|
||||
|
||||
// Inform every thread to exit.
|
||||
for (size_t i = 0; i < threads_.size(); ++i) {
|
||||
pending_.push_back({nullptr, 0});
|
||||
}
|
||||
|
||||
// Wakeup all waiters.
|
||||
for (auto w : waiters_) {
|
||||
w->ready = true;
|
||||
w->cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for threads to finish.
|
||||
for (auto t : threads_) {
|
||||
delete t;
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadPool::Impl::Schedule(std::function<void()> fn) {
|
||||
uint64 id = 0;
|
||||
if (port::Tracing::IsActive()) {
|
||||
id = port::Tracing::UniqueId();
|
||||
port::Tracing::RecordEvent(port::Tracing::EventCategory::kScheduleClosure,
|
||||
id);
|
||||
}
|
||||
|
||||
mutex_lock l(mu_);
|
||||
pending_.push_back({fn, id});
|
||||
if (!waiters_.empty()) {
|
||||
Waiter* w = waiters_.back();
|
||||
waiters_.pop_back();
|
||||
w->ready = true;
|
||||
w->cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadPool::Impl::WorkerLoop() {
|
||||
// Set the processor flag to flush denormals to zero
|
||||
port::ScopedFlushDenormal flush;
|
||||
|
||||
port::Tracing::RegisterCurrentThread(name_.c_str());
|
||||
mutex_lock l(mu_);
|
||||
Waiter w;
|
||||
while (true) {
|
||||
while (pending_.empty()) {
|
||||
// Wait for work to be assigned to me
|
||||
w.ready = false;
|
||||
waiters_.push_back(&w);
|
||||
while (!w.ready) {
|
||||
w.cv.wait(l);
|
||||
}
|
||||
}
|
||||
// Pick up pending work
|
||||
Task t = pending_.front();
|
||||
pending_.pop_front();
|
||||
if (t.fn == nullptr) {
|
||||
break;
|
||||
}
|
||||
mu_.unlock();
|
||||
if (t.id != 0) {
|
||||
port::Tracing::ScopedActivity region(
|
||||
port::Tracing::EventCategory::kRunClosure, t.id);
|
||||
t.fn();
|
||||
} else {
|
||||
t.fn();
|
||||
}
|
||||
mu_.lock();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
ThreadPool::ThreadPool(Env* env, const string& name, int num_threads)
|
||||
: ThreadPool(env, ThreadOptions(), name, num_threads) {}
|
||||
|
||||
|
@ -100,8 +100,7 @@ def if_android(a):
|
||||
})
|
||||
|
||||
def tf_copts():
|
||||
return (["-fno-exceptions", "-DEIGEN_AVOID_STL_ARRAY",
|
||||
"-DTENSORFLOW_USE_EIGEN_THREADPOOL"] +
|
||||
return (["-fno-exceptions", "-DEIGEN_AVOID_STL_ARRAY"] +
|
||||
if_cuda(["-DGOOGLE_CUDA=1"]) +
|
||||
if_android_arm(["-mfpu=neon"]) +
|
||||
select({"//tensorflow:android": [
|
||||
|
Loading…
Reference in New Issue
Block a user