Moving threadpool libraries to tensorflow/core/platform.
PiperOrigin-RevId: 269123124
This commit is contained in:
parent
bd4658c16a
commit
2179d24c5e
@ -3,7 +3,6 @@ tensorflow/core/framework/tensor_shape.cc
|
|||||||
tensorflow/core/lib/core/arena.cc
|
tensorflow/core/lib/core/arena.cc
|
||||||
tensorflow/core/lib/core/coding.cc
|
tensorflow/core/lib/core/coding.cc
|
||||||
tensorflow/core/lib/core/status.cc
|
tensorflow/core/lib/core/status.cc
|
||||||
tensorflow/core/lib/core/threadpool.cc
|
|
||||||
tensorflow/core/lib/hash/crc32c.cc
|
tensorflow/core/lib/hash/crc32c.cc
|
||||||
tensorflow/core/lib/hash/crc32c_accelerate.cc
|
tensorflow/core/lib/hash/crc32c_accelerate.cc
|
||||||
tensorflow/core/lib/hash/hash.cc
|
tensorflow/core/lib/hash/hash.cc
|
||||||
@ -56,6 +55,7 @@ tensorflow/core/platform/strcat.cc
|
|||||||
tensorflow/core/platform/stringprintf.cc
|
tensorflow/core/platform/stringprintf.cc
|
||||||
tensorflow/core/platform/str_util.cc
|
tensorflow/core/platform/str_util.cc
|
||||||
tensorflow/core/platform/tensor_coding.cc
|
tensorflow/core/platform/tensor_coding.cc
|
||||||
|
tensorflow/core/platform/threadpool.cc
|
||||||
tensorflow/core/platform/tracing.cc
|
tensorflow/core/platform/tracing.cc
|
||||||
tensorflow/tools/proto_text/gen_proto_text_functions.cc
|
tensorflow/tools/proto_text/gen_proto_text_functions.cc
|
||||||
tensorflow/tools/proto_text/gen_proto_text_functions_lib.cc
|
tensorflow/tools/proto_text/gen_proto_text_functions_lib.cc
|
||||||
|
@ -331,6 +331,9 @@ filegroup(
|
|||||||
"//tensorflow/core/platform:logging.h",
|
"//tensorflow/core/platform:logging.h",
|
||||||
"//tensorflow/core/platform:macros.h",
|
"//tensorflow/core/platform:macros.h",
|
||||||
"//tensorflow/core/platform:platform_strings.h",
|
"//tensorflow/core/platform:platform_strings.h",
|
||||||
|
"//tensorflow/core/platform:threadpool.h",
|
||||||
|
"//tensorflow/core/platform:threadpool_interface.h",
|
||||||
|
"//tensorflow/core/platform:threadpool_options.h",
|
||||||
"//tensorflow/core/platform:tstring.h",
|
"//tensorflow/core/platform:tstring.h",
|
||||||
"//tensorflow/core/platform:types.h",
|
"//tensorflow/core/platform:types.h",
|
||||||
],
|
],
|
||||||
|
@ -114,9 +114,7 @@ cc_library(
|
|||||||
name = "threadpool_interface",
|
name = "threadpool_interface",
|
||||||
hdrs = ["threadpool_interface.h"],
|
hdrs = ["threadpool_interface.h"],
|
||||||
deps = [
|
deps = [
|
||||||
"//tensorflow/core/platform:mutex",
|
"//tensorflow/core/platform:threadpool_interface",
|
||||||
"//tensorflow/core/platform:types",
|
|
||||||
"//third_party/eigen3",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -124,7 +122,7 @@ cc_library(
|
|||||||
name = "threadpool_options",
|
name = "threadpool_options",
|
||||||
hdrs = ["threadpool_options.h"],
|
hdrs = ["threadpool_options.h"],
|
||||||
deps = [
|
deps = [
|
||||||
":threadpool_interface",
|
"//tensorflow/core/platform:threadpool_options",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -165,7 +163,6 @@ filegroup(
|
|||||||
"bitmap.cc",
|
"bitmap.cc",
|
||||||
"coding.cc",
|
"coding.cc",
|
||||||
"status.cc",
|
"status.cc",
|
||||||
"threadpool.cc",
|
|
||||||
],
|
],
|
||||||
visibility = ["//tensorflow/core:__pkg__"],
|
visibility = ["//tensorflow/core:__pkg__"],
|
||||||
)
|
)
|
||||||
|
@ -16,228 +16,6 @@ limitations under the License.
|
|||||||
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
||||||
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
||||||
|
|
||||||
#include <functional>
|
#include "tensorflow/core/platform/threadpool.h"
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
#include "absl/types/optional.h"
|
|
||||||
#include "tensorflow/core/lib/core/threadpool_interface.h"
|
|
||||||
#include "tensorflow/core/platform/env.h"
|
|
||||||
#include "tensorflow/core/platform/macros.h"
|
|
||||||
#include "tensorflow/core/platform/types.h"
|
|
||||||
|
|
||||||
namespace Eigen {
|
|
||||||
class Allocator;
|
|
||||||
class ThreadPoolInterface;
|
|
||||||
struct ThreadPoolDevice;
|
|
||||||
|
|
||||||
template <typename Environment>
|
|
||||||
class ThreadPoolTempl;
|
|
||||||
} // namespace Eigen
|
|
||||||
|
|
||||||
namespace tensorflow {
|
|
||||||
namespace thread {
|
|
||||||
|
|
||||||
struct EigenEnvironment;
|
|
||||||
|
|
||||||
class ThreadPool {
|
|
||||||
public:
|
|
||||||
// Scheduling strategies for ParallelFor. The strategy governs how the given
|
|
||||||
// units of work are distributed among the available threads in the
|
|
||||||
// threadpool.
|
|
||||||
enum class SchedulingStrategy {
|
|
||||||
// The Adaptive scheduling strategy adaptively chooses the shard sizes based
|
|
||||||
// on the cost of each unit of work, and the cost model of the underlying
|
|
||||||
// threadpool device.
|
|
||||||
//
|
|
||||||
// The 'cost_per_unit' is an estimate of the number of CPU cycles (or
|
|
||||||
// nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
|
|
||||||
// creates too many shards and CPU time will be dominated by per-shard
|
|
||||||
// overhead, such as Context creation. Underestimating may not fully make
|
|
||||||
// use of the specified parallelism, and may also cause inefficiencies due
|
|
||||||
// to load balancing issues and stragglers.
|
|
||||||
kAdaptive,
|
|
||||||
// The Fixed Block Size scheduling strategy shards the given units of work
|
|
||||||
// into shards of fixed size. In case the total number of units is not
|
|
||||||
// evenly divisible by 'block_size', at most one of the shards may be of
|
|
||||||
// smaller size. The exact number of shards may be found by a call to
|
|
||||||
// NumShardsUsedByFixedBlockSizeScheduling.
|
|
||||||
//
|
|
||||||
// Each shard may be executed on a different thread in parallel, depending
|
|
||||||
// on the number of threads available in the pool. Note that when there
|
|
||||||
// aren't enough threads in the pool to achieve full parallelism, function
|
|
||||||
// calls will be automatically queued.
|
|
||||||
kFixedBlockSize
|
|
||||||
};
|
|
||||||
|
|
||||||
// Contains additional parameters for either the Adaptive or the Fixed Block
|
|
||||||
// Size scheduling strategy.
|
|
||||||
class SchedulingParams {
|
|
||||||
public:
|
|
||||||
explicit SchedulingParams(SchedulingStrategy strategy,
|
|
||||||
absl::optional<int64> cost_per_unit,
|
|
||||||
absl::optional<int64> block_size)
|
|
||||||
: strategy_(strategy),
|
|
||||||
cost_per_unit_(cost_per_unit),
|
|
||||||
block_size_(block_size) {}
|
|
||||||
|
|
||||||
SchedulingStrategy strategy() const { return strategy_; }
|
|
||||||
absl::optional<int64> cost_per_unit() const { return cost_per_unit_; }
|
|
||||||
absl::optional<int64> block_size() const { return block_size_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
// The underlying Scheduling Strategy for which this instance contains
|
|
||||||
// additional parameters.
|
|
||||||
SchedulingStrategy strategy_;
|
|
||||||
|
|
||||||
// The estimated cost per unit of work in number of CPU cycles (or
|
|
||||||
// nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling
|
|
||||||
// strategy.
|
|
||||||
absl::optional<int64> cost_per_unit_;
|
|
||||||
|
|
||||||
// The block size of each shard. Only applicable for Fixed Block Size
|
|
||||||
// scheduling strategy.
|
|
||||||
absl::optional<int64> block_size_;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Constructs a pool that contains "num_threads" threads with specified
|
|
||||||
// "name". env->StartThread() is used to create individual threads with the
|
|
||||||
// given ThreadOptions. If "low_latency_hint" is true the thread pool
|
|
||||||
// implementation may use it as a hint that lower latency is preferred at the
|
|
||||||
// cost of higher CPU usage, e.g. by letting one or more idle threads spin
|
|
||||||
// wait. Conversely, if the threadpool is used to schedule high-latency
|
|
||||||
// operations like I/O the hint should be set to false.
|
|
||||||
//
|
|
||||||
// REQUIRES: num_threads > 0
|
|
||||||
ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
|
|
||||||
int num_threads, bool low_latency_hint,
|
|
||||||
Eigen::Allocator* allocator = nullptr);
|
|
||||||
|
|
||||||
// Constructs a pool for low-latency ops that contains "num_threads" threads
|
|
||||||
// with specified "name". env->StartThread() is used to create individual
|
|
||||||
// threads.
|
|
||||||
// REQUIRES: num_threads > 0
|
|
||||||
ThreadPool(Env* env, const string& name, int num_threads);
|
|
||||||
|
|
||||||
// Constructs a pool for low-latency ops that contains "num_threads" threads
|
|
||||||
// with specified "name". env->StartThread() is used to create individual
|
|
||||||
// threads with the given ThreadOptions.
|
|
||||||
// REQUIRES: num_threads > 0
|
|
||||||
ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
|
|
||||||
int num_threads);
|
|
||||||
|
|
||||||
// Constructs a pool that wraps around the thread::ThreadPoolInterface
|
|
||||||
// instance provided by the caller. Caller retains ownership of
|
|
||||||
// `user_threadpool` and must ensure its lifetime is longer than the
|
|
||||||
// ThreadPool instance.
|
|
||||||
explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);
|
|
||||||
|
|
||||||
// Waits until all scheduled work has finished and then destroy the
|
|
||||||
// set of threads.
|
|
||||||
~ThreadPool();
|
|
||||||
|
|
||||||
// Schedules fn() for execution in the pool of threads.
|
|
||||||
void Schedule(std::function<void()> fn);
|
|
||||||
|
|
||||||
void SetStealPartitions(
|
|
||||||
const std::vector<std::pair<unsigned, unsigned>>& partitions);
|
|
||||||
|
|
||||||
void ScheduleWithHint(std::function<void()> fn, int start, int limit);
|
|
||||||
|
|
||||||
// Returns the number of shards used by ParallelForFixedBlockSizeScheduling
|
|
||||||
// with these parameters.
|
|
||||||
int NumShardsUsedByFixedBlockSizeScheduling(const int64 total,
|
|
||||||
const int64 block_size);
|
|
||||||
|
|
||||||
// Returns the number of threads spawned by calling TransformRangeConcurrently
|
|
||||||
// with these parameters.
|
|
||||||
// Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
|
|
||||||
int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
|
|
||||||
const int64 total);
|
|
||||||
|
|
||||||
// ParallelFor shards the "total" units of work assuming each unit of work
|
|
||||||
// having roughly "cost_per_unit" cost, in cycles. Each unit of work is
|
|
||||||
// indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
|
|
||||||
// and the total cost of each shard is roughly the same.
|
|
||||||
//
|
|
||||||
// "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
|
|
||||||
// if not CPU-bound) to complete a unit of work. Overestimating creates too
|
|
||||||
// many shards and CPU time will be dominated by per-shard overhead, such as
|
|
||||||
// Context creation. Underestimating may not fully make use of the specified
|
|
||||||
// parallelism, and may also cause inefficiencies due to load balancing
|
|
||||||
// issues and stragglers.
|
|
||||||
void ParallelFor(int64 total, int64 cost_per_unit,
|
|
||||||
const std::function<void(int64, int64)>& fn);
|
|
||||||
|
|
||||||
// Similar to ParallelFor above, but takes the specified scheduling strategy
|
|
||||||
// into account.
|
|
||||||
void ParallelFor(int64 total, const SchedulingParams& scheduling_params,
|
|
||||||
const std::function<void(int64, int64)>& fn);
|
|
||||||
|
|
||||||
// Same as ParallelFor with Fixed Block Size scheduling strategy.
|
|
||||||
// Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
|
|
||||||
void TransformRangeConcurrently(const int64 block_size, const int64 total,
|
|
||||||
const std::function<void(int64, int64)>& fn);
|
|
||||||
|
|
||||||
// Shards the "total" units of work. For more details, see "ParallelFor".
|
|
||||||
//
|
|
||||||
// The function is passed a thread_id between 0 and NumThreads() *inclusive*.
|
|
||||||
// This is because some work can happen on the caller thread while the threads
|
|
||||||
// in the pool are also being used.
|
|
||||||
//
|
|
||||||
// The caller can allocate NumThreads() + 1 separate buffers for each thread.
|
|
||||||
// Each thread can safely write to the buffer given by its id without
|
|
||||||
// synchronization. However, the worker fn may be called multiple times
|
|
||||||
// sequentially with the same id.
|
|
||||||
//
|
|
||||||
// At most NumThreads() unique ids will actually be used, and only a few may
|
|
||||||
// be used for small workloads. If each buffer is expensive, the buffers
|
|
||||||
// should be stored in an array initially filled with null, and a buffer
|
|
||||||
// should be allocated by fn the first time that the id is used.
|
|
||||||
void ParallelForWithWorkerId(
|
|
||||||
int64 total, int64 cost_per_unit,
|
|
||||||
const std::function<void(int64, int64, int)>& fn);
|
|
||||||
|
|
||||||
// Similar to ParallelForWithWorkerId above, but takes the specified
|
|
||||||
// scheduling strategy into account.
|
|
||||||
void ParallelForWithWorkerId(
|
|
||||||
int64 total, const SchedulingParams& scheduling_params,
|
|
||||||
const std::function<void(int64, int64, int)>& fn);
|
|
||||||
|
|
||||||
// Returns the number of threads in the pool.
|
|
||||||
int NumThreads() const;
|
|
||||||
|
|
||||||
// Returns current thread id between 0 and NumThreads() - 1, if called from a
|
|
||||||
// thread in the pool. Returns -1 otherwise.
|
|
||||||
int CurrentThreadId() const;
|
|
||||||
|
|
||||||
// If ThreadPool implementation is compatible with Eigen::ThreadPoolInterface,
|
|
||||||
// returns a non-null pointer. The caller does not own the object the returned
|
|
||||||
// pointer points to, and should not attempt to delete.
|
|
||||||
Eigen::ThreadPoolInterface* AsEigenThreadPool() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
// Divides the work represented by the range [0, total) into k shards.
|
|
||||||
// Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
|
|
||||||
// Each shard may be executed on a different thread in parallel, depending on
|
|
||||||
// the number of threads available in the pool.
|
|
||||||
// When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
|
|
||||||
// Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
|
|
||||||
// Requires 0 < block_size <= total.
|
|
||||||
void ParallelForFixedBlockSizeScheduling(
|
|
||||||
const int64 total, const int64 block_size,
|
|
||||||
const std::function<void(int64, int64)>& fn);
|
|
||||||
|
|
||||||
// underlying_threadpool_ is the user_threadpool if user_threadpool is
|
|
||||||
// provided in the constructor. Otherwise it is the eigen_threadpool_.
|
|
||||||
Eigen::ThreadPoolInterface* underlying_threadpool_;
|
|
||||||
// eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
|
|
||||||
// user_threadpool is not in the constructor.
|
|
||||||
std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
|
|
||||||
std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
|
|
||||||
TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace thread
|
|
||||||
} // namespace tensorflow
|
|
||||||
|
|
||||||
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
|
||||||
|
@ -16,14 +16,6 @@ limitations under the License.
|
|||||||
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
||||||
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
||||||
|
|
||||||
#include "third_party/eigen3/unsupported/Eigen/CXX11/ThreadPool"
|
#include "tensorflow/core/platform/threadpool_interface.h"
|
||||||
|
|
||||||
namespace tensorflow {
|
|
||||||
namespace thread {
|
|
||||||
|
|
||||||
class ThreadPoolInterface : public Eigen::ThreadPoolInterface {};
|
|
||||||
|
|
||||||
} // namespace thread
|
|
||||||
} // namespace tensorflow
|
|
||||||
|
|
||||||
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_INTERFACE_H_
|
||||||
|
@ -16,20 +16,6 @@ limitations under the License.
|
|||||||
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
||||||
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
||||||
|
|
||||||
#include "tensorflow/core/lib/core/threadpool_interface.h"
|
#include "tensorflow/core/platform/threadpool_options.h"
|
||||||
|
|
||||||
namespace tensorflow {
|
|
||||||
namespace thread {
|
|
||||||
|
|
||||||
struct ThreadPoolOptions {
|
|
||||||
// If not null, use this threadpool to schedule inter-op operation
|
|
||||||
thread::ThreadPoolInterface* inter_op_threadpool;
|
|
||||||
|
|
||||||
// If not null, use this threadpool to schedule intra-op operation
|
|
||||||
thread::ThreadPoolInterface* intra_op_threadpool;
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace thread
|
|
||||||
} // namespace tensorflow
|
|
||||||
|
|
||||||
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
#endif // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_OPTIONS_H_
|
||||||
|
@ -487,6 +487,24 @@ cc_library(
|
|||||||
hdrs = ["thread_annotations.h"],
|
hdrs = ["thread_annotations.h"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
cc_library(
|
||||||
|
name = "threadpool_interface",
|
||||||
|
hdrs = ["threadpool_interface.h"],
|
||||||
|
deps = [
|
||||||
|
":mutex",
|
||||||
|
":types",
|
||||||
|
"//third_party/eigen3",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
cc_library(
|
||||||
|
name = "threadpool_options",
|
||||||
|
hdrs = ["threadpool_options.h"],
|
||||||
|
deps = [
|
||||||
|
":threadpool_interface",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
cc_library(
|
cc_library(
|
||||||
name = "tstring",
|
name = "tstring",
|
||||||
hdrs = ["tstring.h"],
|
hdrs = ["tstring.h"],
|
||||||
|
@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
==============================================================================*/
|
==============================================================================*/
|
||||||
|
|
||||||
#include "tensorflow/core/lib/core/threadpool.h"
|
#include "tensorflow/core/platform/threadpool.h"
|
||||||
|
|
||||||
#define EIGEN_USE_THREADS
|
#define EIGEN_USE_THREADS
|
||||||
|
|
243
tensorflow/core/platform/threadpool.h
Normal file
243
tensorflow/core/platform/threadpool.h
Normal file
@ -0,0 +1,243 @@
|
|||||||
|
/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
==============================================================================*/
|
||||||
|
|
||||||
|
#ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
|
||||||
|
#define TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
#include "absl/types/optional.h"
|
||||||
|
#include "tensorflow/core/platform/env.h"
|
||||||
|
#include "tensorflow/core/platform/macros.h"
|
||||||
|
#include "tensorflow/core/platform/threadpool_interface.h"
|
||||||
|
#include "tensorflow/core/platform/types.h"
|
||||||
|
|
||||||
|
namespace Eigen {
|
||||||
|
class Allocator;
|
||||||
|
class ThreadPoolInterface;
|
||||||
|
struct ThreadPoolDevice;
|
||||||
|
|
||||||
|
template <typename Environment>
|
||||||
|
class ThreadPoolTempl;
|
||||||
|
} // namespace Eigen
|
||||||
|
|
||||||
|
namespace tensorflow {
|
||||||
|
namespace thread {
|
||||||
|
|
||||||
|
struct EigenEnvironment;
|
||||||
|
|
||||||
|
class ThreadPool {
|
||||||
|
public:
|
||||||
|
// Scheduling strategies for ParallelFor. The strategy governs how the given
|
||||||
|
// units of work are distributed among the available threads in the
|
||||||
|
// threadpool.
|
||||||
|
enum class SchedulingStrategy {
|
||||||
|
// The Adaptive scheduling strategy adaptively chooses the shard sizes based
|
||||||
|
// on the cost of each unit of work, and the cost model of the underlying
|
||||||
|
// threadpool device.
|
||||||
|
//
|
||||||
|
// The 'cost_per_unit' is an estimate of the number of CPU cycles (or
|
||||||
|
// nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
|
||||||
|
// creates too many shards and CPU time will be dominated by per-shard
|
||||||
|
// overhead, such as Context creation. Underestimating may not fully make
|
||||||
|
// use of the specified parallelism, and may also cause inefficiencies due
|
||||||
|
// to load balancing issues and stragglers.
|
||||||
|
kAdaptive,
|
||||||
|
// The Fixed Block Size scheduling strategy shards the given units of work
|
||||||
|
// into shards of fixed size. In case the total number of units is not
|
||||||
|
// evenly divisible by 'block_size', at most one of the shards may be of
|
||||||
|
// smaller size. The exact number of shards may be found by a call to
|
||||||
|
// NumShardsUsedByFixedBlockSizeScheduling.
|
||||||
|
//
|
||||||
|
// Each shard may be executed on a different thread in parallel, depending
|
||||||
|
// on the number of threads available in the pool. Note that when there
|
||||||
|
// aren't enough threads in the pool to achieve full parallelism, function
|
||||||
|
// calls will be automatically queued.
|
||||||
|
kFixedBlockSize
|
||||||
|
};
|
||||||
|
|
||||||
|
// Contains additional parameters for either the Adaptive or the Fixed Block
|
||||||
|
// Size scheduling strategy.
|
||||||
|
class SchedulingParams {
|
||||||
|
public:
|
||||||
|
explicit SchedulingParams(SchedulingStrategy strategy,
|
||||||
|
absl::optional<int64> cost_per_unit,
|
||||||
|
absl::optional<int64> block_size)
|
||||||
|
: strategy_(strategy),
|
||||||
|
cost_per_unit_(cost_per_unit),
|
||||||
|
block_size_(block_size) {}
|
||||||
|
|
||||||
|
SchedulingStrategy strategy() const { return strategy_; }
|
||||||
|
absl::optional<int64> cost_per_unit() const { return cost_per_unit_; }
|
||||||
|
absl::optional<int64> block_size() const { return block_size_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
// The underlying Scheduling Strategy for which this instance contains
|
||||||
|
// additional parameters.
|
||||||
|
SchedulingStrategy strategy_;
|
||||||
|
|
||||||
|
// The estimated cost per unit of work in number of CPU cycles (or
|
||||||
|
// nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling
|
||||||
|
// strategy.
|
||||||
|
absl::optional<int64> cost_per_unit_;
|
||||||
|
|
||||||
|
// The block size of each shard. Only applicable for Fixed Block Size
|
||||||
|
// scheduling strategy.
|
||||||
|
absl::optional<int64> block_size_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Constructs a pool that contains "num_threads" threads with specified
|
||||||
|
// "name". env->StartThread() is used to create individual threads with the
|
||||||
|
// given ThreadOptions. If "low_latency_hint" is true the thread pool
|
||||||
|
// implementation may use it as a hint that lower latency is preferred at the
|
||||||
|
// cost of higher CPU usage, e.g. by letting one or more idle threads spin
|
||||||
|
// wait. Conversely, if the threadpool is used to schedule high-latency
|
||||||
|
// operations like I/O the hint should be set to false.
|
||||||
|
//
|
||||||
|
// REQUIRES: num_threads > 0
|
||||||
|
ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
|
||||||
|
int num_threads, bool low_latency_hint,
|
||||||
|
Eigen::Allocator* allocator = nullptr);
|
||||||
|
|
||||||
|
// Constructs a pool for low-latency ops that contains "num_threads" threads
|
||||||
|
// with specified "name". env->StartThread() is used to create individual
|
||||||
|
// threads.
|
||||||
|
// REQUIRES: num_threads > 0
|
||||||
|
ThreadPool(Env* env, const string& name, int num_threads);
|
||||||
|
|
||||||
|
// Constructs a pool for low-latency ops that contains "num_threads" threads
|
||||||
|
// with specified "name". env->StartThread() is used to create individual
|
||||||
|
// threads with the given ThreadOptions.
|
||||||
|
// REQUIRES: num_threads > 0
|
||||||
|
ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
|
||||||
|
int num_threads);
|
||||||
|
|
||||||
|
// Constructs a pool that wraps around the thread::ThreadPoolInterface
|
||||||
|
// instance provided by the caller. Caller retains ownership of
|
||||||
|
// `user_threadpool` and must ensure its lifetime is longer than the
|
||||||
|
// ThreadPool instance.
|
||||||
|
explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);
|
||||||
|
|
||||||
|
// Waits until all scheduled work has finished and then destroy the
|
||||||
|
// set of threads.
|
||||||
|
~ThreadPool();
|
||||||
|
|
||||||
|
// Schedules fn() for execution in the pool of threads.
|
||||||
|
void Schedule(std::function<void()> fn);
|
||||||
|
|
||||||
|
void SetStealPartitions(
|
||||||
|
const std::vector<std::pair<unsigned, unsigned>>& partitions);
|
||||||
|
|
||||||
|
void ScheduleWithHint(std::function<void()> fn, int start, int limit);
|
||||||
|
|
||||||
|
// Returns the number of shards used by ParallelForFixedBlockSizeScheduling
|
||||||
|
// with these parameters.
|
||||||
|
int NumShardsUsedByFixedBlockSizeScheduling(const int64 total,
|
||||||
|
const int64 block_size);
|
||||||
|
|
||||||
|
// Returns the number of threads spawned by calling TransformRangeConcurrently
|
||||||
|
// with these parameters.
|
||||||
|
// Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
|
||||||
|
int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
|
||||||
|
const int64 total);
|
||||||
|
|
||||||
|
// ParallelFor shards the "total" units of work assuming each unit of work
|
||||||
|
// having roughly "cost_per_unit" cost, in cycles. Each unit of work is
|
||||||
|
// indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
|
||||||
|
// and the total cost of each shard is roughly the same.
|
||||||
|
//
|
||||||
|
// "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
|
||||||
|
// if not CPU-bound) to complete a unit of work. Overestimating creates too
|
||||||
|
// many shards and CPU time will be dominated by per-shard overhead, such as
|
||||||
|
// Context creation. Underestimating may not fully make use of the specified
|
||||||
|
// parallelism, and may also cause inefficiencies due to load balancing
|
||||||
|
// issues and stragglers.
|
||||||
|
void ParallelFor(int64 total, int64 cost_per_unit,
|
||||||
|
const std::function<void(int64, int64)>& fn);
|
||||||
|
|
||||||
|
// Similar to ParallelFor above, but takes the specified scheduling strategy
|
||||||
|
// into account.
|
||||||
|
void ParallelFor(int64 total, const SchedulingParams& scheduling_params,
|
||||||
|
const std::function<void(int64, int64)>& fn);
|
||||||
|
|
||||||
|
// Same as ParallelFor with Fixed Block Size scheduling strategy.
|
||||||
|
// Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
|
||||||
|
void TransformRangeConcurrently(const int64 block_size, const int64 total,
|
||||||
|
const std::function<void(int64, int64)>& fn);
|
||||||
|
|
||||||
|
// Shards the "total" units of work. For more details, see "ParallelFor".
|
||||||
|
//
|
||||||
|
// The function is passed a thread_id between 0 and NumThreads() *inclusive*.
|
||||||
|
// This is because some work can happen on the caller thread while the threads
|
||||||
|
// in the pool are also being used.
|
||||||
|
//
|
||||||
|
// The caller can allocate NumThreads() + 1 separate buffers for each thread.
|
||||||
|
// Each thread can safely write to the buffer given by its id without
|
||||||
|
// synchronization. However, the worker fn may be called multiple times
|
||||||
|
// sequentially with the same id.
|
||||||
|
//
|
||||||
|
// At most NumThreads() unique ids will actually be used, and only a few may
|
||||||
|
// be used for small workloads. If each buffer is expensive, the buffers
|
||||||
|
// should be stored in an array initially filled with null, and a buffer
|
||||||
|
// should be allocated by fn the first time that the id is used.
|
||||||
|
void ParallelForWithWorkerId(
|
||||||
|
int64 total, int64 cost_per_unit,
|
||||||
|
const std::function<void(int64, int64, int)>& fn);
|
||||||
|
|
||||||
|
// Similar to ParallelForWithWorkerId above, but takes the specified
|
||||||
|
// scheduling strategy into account.
|
||||||
|
void ParallelForWithWorkerId(
|
||||||
|
int64 total, const SchedulingParams& scheduling_params,
|
||||||
|
const std::function<void(int64, int64, int)>& fn);
|
||||||
|
|
||||||
|
// Returns the number of threads in the pool.
|
||||||
|
int NumThreads() const;
|
||||||
|
|
||||||
|
// Returns current thread id between 0 and NumThreads() - 1, if called from a
|
||||||
|
// thread in the pool. Returns -1 otherwise.
|
||||||
|
int CurrentThreadId() const;
|
||||||
|
|
||||||
|
// If ThreadPool implementation is compatible with Eigen::ThreadPoolInterface,
|
||||||
|
// returns a non-null pointer. The caller does not own the object the returned
|
||||||
|
// pointer points to, and should not attempt to delete.
|
||||||
|
Eigen::ThreadPoolInterface* AsEigenThreadPool() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Divides the work represented by the range [0, total) into k shards.
|
||||||
|
// Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
|
||||||
|
// Each shard may be executed on a different thread in parallel, depending on
|
||||||
|
// the number of threads available in the pool.
|
||||||
|
// When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
|
||||||
|
// Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
|
||||||
|
// Requires 0 < block_size <= total.
|
||||||
|
void ParallelForFixedBlockSizeScheduling(
|
||||||
|
const int64 total, const int64 block_size,
|
||||||
|
const std::function<void(int64, int64)>& fn);
|
||||||
|
|
||||||
|
// underlying_threadpool_ is the user_threadpool if user_threadpool is
|
||||||
|
// provided in the constructor. Otherwise it is the eigen_threadpool_.
|
||||||
|
Eigen::ThreadPoolInterface* underlying_threadpool_;
|
||||||
|
// eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
|
||||||
|
// user_threadpool is not in the constructor.
|
||||||
|
std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
|
||||||
|
std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
|
||||||
|
TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace thread
|
||||||
|
} // namespace tensorflow
|
||||||
|
|
||||||
|
#endif // TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
|
29
tensorflow/core/platform/threadpool_interface.h
Normal file
29
tensorflow/core/platform/threadpool_interface.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
==============================================================================*/
|
||||||
|
|
||||||
|
#ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_INTERFACE_H_
|
||||||
|
#define TENSORFLOW_CORE_PLATFORM_THREADPOOL_INTERFACE_H_
|
||||||
|
|
||||||
|
#include "third_party/eigen3/unsupported/Eigen/CXX11/ThreadPool"
|
||||||
|
|
||||||
|
namespace tensorflow {
|
||||||
|
namespace thread {
|
||||||
|
|
||||||
|
class ThreadPoolInterface : public Eigen::ThreadPoolInterface {};
|
||||||
|
|
||||||
|
} // namespace thread
|
||||||
|
} // namespace tensorflow
|
||||||
|
|
||||||
|
#endif // TENSORFLOW_CORE_PLATFORM_THREADPOOL_INTERFACE_H_
|
35
tensorflow/core/platform/threadpool_options.h
Normal file
35
tensorflow/core/platform/threadpool_options.h
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
==============================================================================*/
|
||||||
|
|
||||||
|
#ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_OPTIONS_H_
|
||||||
|
#define TENSORFLOW_CORE_PLATFORM_THREADPOOL_OPTIONS_H_
|
||||||
|
|
||||||
|
#include "tensorflow/core/platform/threadpool_interface.h"
|
||||||
|
|
||||||
|
namespace tensorflow {
|
||||||
|
namespace thread {
|
||||||
|
|
||||||
|
struct ThreadPoolOptions {
|
||||||
|
// If not null, use this threadpool to schedule inter-op operation
|
||||||
|
thread::ThreadPoolInterface* inter_op_threadpool;
|
||||||
|
|
||||||
|
// If not null, use this threadpool to schedule intra-op operation
|
||||||
|
thread::ThreadPoolInterface* intra_op_threadpool;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace thread
|
||||||
|
} // namespace tensorflow
|
||||||
|
|
||||||
|
#endif // TENSORFLOW_CORE_PLATFORM_THREADPOOL_OPTIONS_H_
|
Loading…
x
Reference in New Issue
Block a user