diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD
index 5ea1d7a863d..6673f19e814 100644
--- a/tensorflow/core/BUILD
+++ b/tensorflow/core/BUILD
@@ -751,7 +751,6 @@ cc_library(
         "@com_google_absl//absl/container:inlined_vector",
         "@com_google_absl//absl/strings",
         "@com_google_absl//absl/types:optional",
-        "@com_google_absl//absl/types:variant",
     ],
 )
 
@@ -2487,7 +2486,6 @@ cc_library(
         ":lib_hash_crc32c_accelerate_internal",
         ":lib_proto_parsing",
         "@com_google_absl//absl/memory",
-        "@com_google_absl//absl/types:variant",
         "@com_google_absl//absl/strings",
         "//third_party/eigen3",
         "//tensorflow/core/lib/bfloat16",
diff --git a/tensorflow/core/lib/core/threadpool.cc b/tensorflow/core/lib/core/threadpool.cc
index 660819f55d4..e1a27d4f5a6 100644
--- a/tensorflow/core/lib/core/threadpool.cc
+++ b/tensorflow/core/lib/core/threadpool.cc
@@ -17,7 +17,6 @@ limitations under the License.
 
 #define EIGEN_USE_THREADS
 
-#include "absl/types/variant.h"
 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
 #include "tensorflow/core/lib/core/blocking_counter.h"
 #include "tensorflow/core/platform/context.h"
@@ -118,8 +117,8 @@ void ThreadPool::Schedule(std::function<void()> fn) {
   underlying_threadpool_->Schedule(std::move(fn));
 }
 
-int ThreadPool::NumShardsUsedByFixedBlockSizeScheduling(
-    const int64 total, const int64 block_size) {
+int ThreadPool::NumShardsUsedByTransformRangeConcurrently(
+    const int64 block_size, const int64 total) {
   if (block_size <= 0 || total <= 1 || total <= block_size ||
       NumThreads() == 1) {
     return 1;
@@ -127,54 +126,13 @@ int ThreadPool::NumShardsUsedByFixedBlockSizeScheduling(
   return (total + block_size - 1) / block_size;
 }
 
-int ThreadPool::NumShardsUsedByTransformRangeConcurrently(
-    const int64 block_size, const int64 total) {
-  return NumShardsUsedByFixedBlockSizeScheduling(total, block_size);
-}
-
-void ThreadPool::ParallelFor(
-    int64 total, SchedulingStrategy strategy,
-    absl::variant<AdaptiveSchedulingParams, FixedBlockSizeSchedulingParams>
-        scheduling_params,
-    const std::function<void(int64, int64)>& fn) {
-  switch (strategy) {
-    case SchedulingStrategy::kUnknown: {
-      // Invalid scheduling strategy. Do nothing.
-      break;
-    }
-    case SchedulingStrategy::kAdaptive: {
-      const auto* params =
-          absl::get_if<AdaptiveSchedulingParams>(&scheduling_params);
-      if (params) {
-        ParallelFor(total, params->cost_per_unit, fn);
-      }
-      break;
-    }
-    case SchedulingStrategy::kFixedBlockSize: {
-      const auto* params =
-          absl::get_if<FixedBlockSizeSchedulingParams>(&scheduling_params);
-      if (params) {
-        ParallelForFixedBlockSizeScheduling(params->block_size, total, fn);
-      }
-      break;
-    }
-  }
-}
-
+// This functionality is similar to parallelFor, except that reasoning about
+// the number of shards used is significantly easier.
 void ThreadPool::TransformRangeConcurrently(
     const int64 block_size, const int64 total,
     const std::function<void(int64, int64)>& fn) {
-  ParallelFor(total, SchedulingStrategy::kFixedBlockSize,
-              FixedBlockSizeSchedulingParams(block_size), fn);
-}
-
-// This functionality is similar to parallelFor, except that reasoning about
-// the number of shards used is significantly easier.
-void ThreadPool::ParallelForFixedBlockSizeScheduling(
-    const int64 block_size, const int64 total,
-    const std::function<void(int64, int64)>& fn) {
   const int num_shards_used =
-      NumShardsUsedByFixedBlockSizeScheduling(total, block_size);
+      NumShardsUsedByTransformRangeConcurrently(block_size, total);
   if (num_shards_used == 1) {
     fn(0, total);
     return;
@@ -208,7 +166,7 @@ void ThreadPool::ParallelForFixedBlockSizeScheduling(
 }
 
 void ThreadPool::ParallelFor(int64 total, int64 cost_per_unit,
-                             const std::function<void(int64, int64)>& fn) {
+                             std::function<void(int64, int64)> fn) {
   CHECK_GE(total, 0);
   CHECK_EQ(total, (int64)(Eigen::Index)total);
   threadpool_device_->parallelFor(
@@ -235,21 +193,6 @@ void ThreadPool::ParallelForWithWorkerId(
       });
 }
 
-void ThreadPool::ParallelForWithWorkerId(
-    int64 total, SchedulingStrategy strategy,
-    absl::variant<AdaptiveSchedulingParams, FixedBlockSizeSchedulingParams>
-        scheduling_params,
-    const std::function<void(int64, int64, int)>& fn) {
-  ParallelFor(total, strategy, scheduling_params,
-              [this, &fn](int64 start, int64 limit) {
-                // We may use the current thread to do some work synchronously.
-                // When calling CurrentThreadId() from outside of the thread
-                // pool, we get -1, so we can shift every id up by 1.
-                int id = CurrentThreadId() + 1;
-                fn(start, limit, id);
-              });
-}
-
 int ThreadPool::NumThreads() const {
   return underlying_threadpool_->NumThreads();
 }
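The shard arithmetic that `TransformRangeConcurrently` restores above is deliberately easy to reason about. Here is a minimal standalone sketch (plain C++, no TensorFlow dependencies; `NumShards` is a hypothetical mirror of the member function, not part of this change):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical mirror of NumShardsUsedByTransformRangeConcurrently:
// degenerate inputs collapse to one shard, otherwise ceil(total / block_size).
int NumShards(int64_t block_size, int64_t total, int num_threads) {
  if (block_size <= 0 || total <= 1 || total <= block_size ||
      num_threads == 1) {
    return 1;
  }
  return (total + block_size - 1) / block_size;
}

int main() {
  const int64_t block_size = 3, total = 7;
  const int k = NumShards(block_size, total, /*num_threads=*/16);  // k == 3
  for (int i = 0; i < k; ++i) {
    // Shard i covers the half-open range [i*block_size,
    // min((i+1)*block_size, total)).
    const int64_t start = i * block_size;
    const int64_t end = std::min(start + block_size, total);
    std::cout << "shard " << i << ": [" << start << ", " << end << ")\n";
  }
}
```

For block_size = 3 and total = 7 this yields [0, 3), [3, 6), and [6, 7); only the last shard may be short.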
diff --git a/tensorflow/core/lib/core/threadpool.h b/tensorflow/core/lib/core/threadpool.h
index 54d120465fc..51aa83cc625 100644
--- a/tensorflow/core/lib/core/threadpool.h
+++ b/tensorflow/core/lib/core/threadpool.h
@@ -19,7 +19,6 @@ limitations under the License.
 #include <functional>
 #include <memory>
 
-#include "absl/types/variant.h"
 #include "tensorflow/core/lib/core/threadpool_interface.h"
 #include "tensorflow/core/platform/env.h"
 #include "tensorflow/core/platform/macros.h"
@@ -41,56 +40,6 @@ struct EigenEnvironment;
 
 class ThreadPool {
  public:
-  // Scheduling strategies for ParallelFor. The strategy governs how the given
-  // units of work are distributed among the available threads in the
-  // threadpool.
-  enum class SchedulingStrategy {
-    // Unknown and invalid scheduling strategy. This will result in a no-op.
-    kUnknown,
-    // The Adaptive scheduling strategy adaptively chooses the shard sizes based
-    // on the cost of each unit of work, and the cost model of the underlying
-    // threadpool device. Requires an instance of AdaptiveSchedulingParams for
-    // the associated parameters.
-    kAdaptive,
-    // The Fixed Block Size scheduling strategy shards the given units of work
-    // into shards of fixed size. The shard size or block size is given by an
-    // instance of FixedBlockSizeSchedulingParams.
-    kFixedBlockSize
-  };
-
-  // Parameters for the 'Adaptive' scheduling strategy, which shards the given
-  // units of work based on the cost of each unit. The shard sizes are
-  // adaptively computed depending on the cost model of the underlying
-  // threadpool device.
-  //
-  // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
-  // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
-  // creates too many shards and CPU time will be dominated by per-shard
-  // overhead, such as Context creation. Underestimating may not fully make use
-  // of the specified parallelism, and may also cause inefficiencies due to
-  // load balancing issues and stragglers.
-  struct AdaptiveSchedulingParams {
-    explicit AdaptiveSchedulingParams(int64 cost_per_unit)
-        : cost_per_unit(cost_per_unit) {}
-    const int64 cost_per_unit;
-  };
-
-  // Parameters for the 'FixedBlockSize' scheduling strategy. This strategy
-  // shards the given units of work into fixed block sizes. In case the total
-  // number of units is not evenly divisible by 'block_size', at most one of the
-  // shards may be of smaller size. The exact number of shards may be found
-  // by a call to NumShardsUsedByFixedBlockSizeScheduling.
-  //
-  // Each shard may be executed on a different thread in parallel, depending on
-  // the number of threads available in the pool. Note that when there aren't
-  // enough threads in the pool to achieve full parallelism, function calls will
-  // be automatically queued.
-  struct FixedBlockSizeSchedulingParams {
-    explicit FixedBlockSizeSchedulingParams(int64 block_size)
-        : block_size(block_size) {}
-    const int64 block_size;
-  };
-
   // Constructs a pool that contains "num_threads" threads with specified
   // "name". env->StartThread() is used to create individual threads with the
   // given ThreadOptions. If "low_latency_hint" is true the thread pool
@@ -134,15 +83,17 @@ class ThreadPool {
       const std::vector<std::pair<int, int>>& partitions);
 
   void ScheduleWithHint(std::function<void()> fn, int start, int limit);
-
-  // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
-  // with these parameters.
-  int NumShardsUsedByFixedBlockSizeScheduling(const int64 total,
-                                              const int64 block_size);
+  // Requires 0 < block_size <= total.
+  // Spawns k threads and calls fn(i*block_size, (i+1)*block_size) from the
+  // ith thread (i>=0). When (i+1)*block_size > total, fn(i*block_size, total)
+  // is called instead. k = NumShardsUsedByTransformRangeConcurrently(...).
+  // Note that when there aren't enough threads in the pool to achieve full
+  // parallelism, function calls will be automatically queued.
+  void TransformRangeConcurrently(const int64 block_size, const int64 total,
+                                  const std::function<void(int64, int64)>& fn);
 
   // Returns the number of threads spawned by calling TransformRangeConcurrently
   // with these parameters.
-  // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
   int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
                                                 const int64 total);
 
@@ -155,23 +106,9 @@ class ThreadPool {
   // if not CPU-bound) to complete a unit of work. Overestimating creates too
   // many shards and CPU time will be dominated by per-shard overhead, such as
   // Context creation. Underestimating may not fully make use of the specified
-  // parallelism, and may also cause inefficiencies due to load balancing
-  // issues and stragglers.
+  // parallelism.
   void ParallelFor(int64 total, int64 cost_per_unit,
-                   const std::function<void(int64, int64)>& fn);
-
-  // Similar to ParallelFor above, but takes the specified scheduling strategy
-  // into account.
-  void ParallelFor(
-      int64 total, SchedulingStrategy strategy,
-      absl::variant<AdaptiveSchedulingParams, FixedBlockSizeSchedulingParams>
-          scheduling_params,
-      const std::function<void(int64, int64)>& fn);
-
-  // Same as ParallelFor with Fixed Block Size scheduling strategy.
-  // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
-  void TransformRangeConcurrently(const int64 block_size, const int64 total,
-                                  const std::function<void(int64, int64)>& fn);
+                   std::function<void(int64, int64)> fn);
 
   // Shards the "total" units of work. For more details, see "ParallelFor".
   //
@@ -192,14 +129,6 @@ class ThreadPool {
       int64 total, int64 cost_per_unit,
       const std::function<void(int64, int64, int)>& fn);
 
-  // Similar to ParallelForWithWorkerId above, but takes the specified
-  // scheduling strategy into account.
-  void ParallelForWithWorkerId(
-      int64 total, SchedulingStrategy strategy,
-      absl::variant<AdaptiveSchedulingParams, FixedBlockSizeSchedulingParams>
-          scheduling_params,
-      const std::function<void(int64, int64, int)>& fn);
-
   // Returns the number of threads in the pool.
   int NumThreads() const;
 
@@ -213,17 +142,6 @@ class ThreadPool {
   Eigen::ThreadPoolInterface* AsEigenThreadPool() const;
 
  private:
-  // Divides the work represented by the range [0, total) into k shards.
-  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
-  // Each shard may be executed on a different thread in parallel, depending on
-  // the number of threads available in the pool.
-  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
-  // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
-  // Requires 0 < block_size <= total.
-  void ParallelForFixedBlockSizeScheduling(
-      const int64 block_size, const int64 total,
-      const std::function<void(int64, int64)>& fn);
-
   // underlying_threadpool_ is the user_threadpool if user_threadpool is
   // provided in the constructor. Otherwise it is the eigen_threadpool_.
   Eigen::ThreadPoolInterface* underlying_threadpool_;
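To make the restored header interface concrete, here is a usage sketch, assuming a TensorFlow build at this revision; `FillParallel` and its parameter choices are illustrative, not part of the diff:

```cpp
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"

namespace tensorflow {

// Illustrative only: fills a buffer in fixed-size blocks. Each shard
// receives a half-open range [start, end) covering at most block_size
// units; the last shard may be shorter.
void FillParallel(float* data, int64 total) {
  thread::ThreadPool pool(Env::Default(), "fill_pool", /*num_threads=*/8);
  pool.TransformRangeConcurrently(
      /*block_size=*/256, /*total=*/total,
      [data](int64 start, int64 end) {
        for (int64 i = start; i < end; ++i) data[i] = 1.0f;
      });
}

}  // namespace tensorflow
```

The shard count here is `NumShardsUsedByTransformRangeConcurrently(256, total)`, i.e. ceil(total / 256) once total exceeds the block size.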
diff --git a/tensorflow/core/lib/core/threadpool_test.cc b/tensorflow/core/lib/core/threadpool_test.cc
index ebc6dae2932..f972fb4fb47 100644
--- a/tensorflow/core/lib/core/threadpool_test.cc
+++ b/tensorflow/core/lib/core/threadpool_test.cc
@@ -62,57 +62,7 @@ TEST(ThreadPool, DoWork) {
   }
 }
 
-void RunWithFixedBlockSize(int64 block_size, int64 total, ThreadPool* threads) {
-  mutex mu;
-  int64 num_shards = 0;
-  int64 num_done_work = 0;
-  std::vector<std::atomic<bool>> work(total);
-  for (int i = 0; i < total; i++) {
-    work[i] = false;
-  }
-  threads->ParallelFor(
-      total, ThreadPool::SchedulingStrategy::kFixedBlockSize,
-      ThreadPool::FixedBlockSizeSchedulingParams(block_size),
-      [=, &mu, &num_shards, &num_done_work, &work](int64 start, int64 end) {
-        VLOG(1) << "Shard [" << start << "," << end << ")";
-        EXPECT_GE(start, 0);
-        EXPECT_LE(end, total);
-        mutex_lock l(mu);
-        ++num_shards;
-        for (; start < end; ++start) {
-          EXPECT_FALSE(work[start].exchange(true));  // No duplicate
-          ++num_done_work;
-        }
-      });
-  EXPECT_EQ(num_done_work, total);
-  for (int i = 0; i < total; i++) {
-    ASSERT_TRUE(work[i]);
-  }
-  const int64 num_workers = (total + block_size - 1) / block_size;
-  if (num_workers < threads->NumThreads()) {
-    // If the intention is to limit the parallelism explicitly, we'd
-    // better honor it. Ideally, even if per_thread_max_parallelism >
-    // num_workers, we should expect that Shard() implementation do
-    // not over-shard. Unfortunately, ThreadPoolDevice::parallelFor
-    // tends to over-shard.
-    EXPECT_LE(num_shards, 1 + num_workers);
-  }
-}
-
-// Adapted from work_sharder_test.cc
-TEST(ThreadPoolTest, ParallelForFixedBlockSizeScheduling) {
-  ThreadPool threads(Env::Default(), "test", 16);
-  for (auto block_size : {1, 7, 10, 64, 100, 256, 1000, 9999}) {
-    for (auto diff : {0, 1, 11, 102, 1003, 10005, 1000007}) {
-      const int64 total = block_size + diff;
-      RunWithFixedBlockSize(block_size, total, &threads);
-    }
-  }
-}
-
-void RunWithFixedBlockSizeTransformRangeConcurrently(int64 block_size,
-                                                     int64 total,
-                                                     ThreadPool* threads) {
+void RunSharding(int64 block_size, int64 total, ThreadPool* threads) {
   mutex mu;
   int64 num_shards = 0;
   int64 num_done_work = 0;
@@ -133,6 +83,7 @@ void RunWithFixedBlockSizeTransformRangeConcurrently(int64 block_size,
           ++num_done_work;
         }
       });
+  LOG(INFO) << block_size << " " << total;
   EXPECT_EQ(num_done_work, total);
   for (int i = 0; i < total; i++) {
     ASSERT_TRUE(work[i]);
@@ -149,39 +100,18 @@ void RunWithFixedBlockSizeTransformRangeConcurrently(int64 block_size,
 }
 
 // Adapted from work_sharder_test.cc
-TEST(ThreadPoolTest, TransformRangeConcurrently) {
+TEST(SparseUtilsTest, TransformRangeConcurrently) {
   ThreadPool threads(Env::Default(), "test", 16);
   for (auto block_size : {1, 7, 10, 64, 100, 256, 1000, 9999}) {
     for (auto diff : {0, 1, 11, 102, 1003, 10005, 1000007}) {
       const int64 total = block_size + diff;
-      RunWithFixedBlockSizeTransformRangeConcurrently(block_size, total,
-                                                      &threads);
+      RunSharding(block_size, total, &threads);
     }
   }
 }
 
-TEST(ThreadPoolTest, NumShardsUsedByFixedBlockSizeScheduling) {
+TEST(SparseUtilsTest, NumShardsUsedByTransformRangeConcurrently) {
   ThreadPool threads(Env::Default(), "test", 16);
-
-  EXPECT_EQ(1, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   3 /* total */, 3 /* block_size */));
-  EXPECT_EQ(2, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   4 /* total */, 3 /* block_size */));
-  EXPECT_EQ(2, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   5 /* total */, 3 /* block_size */));
-  EXPECT_EQ(2, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   6 /* total */, 3 /* block_size */));
-  EXPECT_EQ(3, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   7 /* total */, 3 /* block_size */));
-  EXPECT_EQ(7, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   7 /* total */, 1 /* block_size */));
-  EXPECT_EQ(1, threads.NumShardsUsedByFixedBlockSizeScheduling(
-                   7 /* total */, 0 /* block_size */));
-}
-
-TEST(ThreadPoolTest, NumShardsUsedByTransformRangeConcurrently) {
-  ThreadPool threads(Env::Default(), "test", 16);
-
   EXPECT_EQ(1, threads.NumShardsUsedByTransformRangeConcurrently(
                    3 /* block_size */, 3 /* total */));
   EXPECT_EQ(2, threads.NumShardsUsedByTransformRangeConcurrently(
@@ -198,61 +128,6 @@ TEST(ThreadPoolTest, NumShardsUsedByTransformRangeConcurrently) {
                    0 /* block_size */, 7 /* total */));
 }
 
-void RunShardingWithWorkerId(int64 block_size, int64 total,
-                             ThreadPool* threads) {
-  mutex mu;
-  int64 num_done_work = 0;
-  std::vector<std::atomic<bool>> work(total);
-  for (int i = 0; i < total; i++) {
-    work[i] = false;
-  }
-  const int64 num_threads = threads->NumThreads();
-  std::vector<std::atomic<bool>> threads_running(num_threads + 1);
-  for (int i = 0; i < num_threads + 1; i++) {
-    threads_running[i] = false;
-  }
-
-  threads->ParallelForWithWorkerId(
-      total, ThreadPool::SchedulingStrategy::kAdaptive,
-      ThreadPool::AdaptiveSchedulingParams(block_size),
-      [=, &mu, &num_done_work, &work, &threads_running](int64 start, int64 end,
-                                                        int id) {
-        VLOG(1) << "Shard [" << start << "," << end << ")";
-        EXPECT_GE(start, 0);
-        EXPECT_LE(end, total);
-
-        // Store true for the current thread, and assert that another thread
-        // is not running with the same id.
-        EXPECT_GE(id, 0);
-        EXPECT_LE(id, num_threads);
-        EXPECT_FALSE(threads_running[id].exchange(true));
-
-        mutex_lock l(mu);
-        for (; start < end; ++start) {
-          EXPECT_FALSE(work[start].exchange(true));  // No duplicate
-          ++num_done_work;
-        }
-        EXPECT_TRUE(threads_running[id].exchange(false));
-      });
-
-  EXPECT_EQ(num_done_work, total);
-  for (int i = 0; i < total; i++) {
-    EXPECT_TRUE(work[i]);
-  }
-}
-
-TEST(ThreadPoolTest, ParallelForFixedBlockSizeSchedulingWithWorkerId) {
-  for (int32 num_threads : {1, 2, 3, 9, 16, 31}) {
-    ThreadPool threads(Env::Default(), "test", num_threads);
-    for (int64 block_size : {1, 7, 10, 64, 100, 256, 1000}) {
-      for (int64 diff : {0, 1, 11, 102, 1003}) {
-        const int64 total = block_size + diff;
-        RunShardingWithWorkerId(block_size, total, &threads);
-      }
-    }
-  }
-}
-
 TEST(ThreadPool, ParallelFor) {
   Context outer_context(ContextKind::kThread);
   // Make ParallelFor use as many threads as possible.
@@ -279,33 +154,6 @@ TEST(ThreadPool, ParallelFor) {
   }
 }
 
-TEST(ThreadPool, ParallelForWithAdaptiveSchedulingStrategy) {
-  Context outer_context(ContextKind::kThread);
-  // Make ParallelFor use as many threads as possible.
-  int64 kHugeCost = 1 << 30;
-  for (int num_threads = 1; num_threads < kNumThreads; num_threads++) {
-    fprintf(stderr, "Testing with %d threads\n", num_threads);
-    const int kWorkItems = 15;
-    std::atomic<bool> work[kWorkItems];
-    ThreadPool pool(Env::Default(), "test", num_threads);
-    for (int i = 0; i < kWorkItems; i++) {
-      work[i] = false;
-    }
-    pool.ParallelFor(kWorkItems, ThreadPool::SchedulingStrategy::kAdaptive,
-                     ThreadPool::AdaptiveSchedulingParams(kHugeCost),
-                     [&outer_context, &work](int64 begin, int64 end) {
-                       Context inner_context(ContextKind::kThread);
-                       ASSERT_EQ(outer_context, inner_context);
-                       for (int64 i = begin; i < end; ++i) {
-                         ASSERT_FALSE(work[i].exchange(true));
-                       }
-                     });
-    for (int i = 0; i < kWorkItems; i++) {
-      ASSERT_TRUE(work[i]);
-    }
-  }
-}
-
 TEST(ThreadPool, ParallelForWithWorkerId) {
   // Make ParallelForWithWorkerId use as many threads as possible.
   int64 kHugeCost = 1 << 30;
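One idiom in the tests above deserves a note: `std::atomic<bool>::exchange(true)` returns the previous value, so expecting `false` proves each index is visited exactly once even when shards run concurrently. A minimal sketch of just that check (plain C++, independent of the test fixtures):

```cpp
#include <atomic>
#include <cassert>
#include <vector>

int main() {
  std::vector<std::atomic<bool>> work(8);
  for (auto& w : work) w = false;  // atomics start uninitialized pre-C++20
  for (int i = 0; i < 8; ++i) {
    // First visit: exchange returns the old value (false) and sets true.
    assert(!work[i].exchange(true));
  }
  // A second exchange on any index would now return true and trip the assert,
  // which is exactly how RunSharding detects duplicated work.
}
```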
diff --git a/tensorflow/core/util/work_sharder.cc b/tensorflow/core/util/work_sharder.cc
index 58808e3a636..74f0713a618 100644
--- a/tensorflow/core/util/work_sharder.cc
+++ b/tensorflow/core/util/work_sharder.cc
@@ -45,15 +45,13 @@ void Shard(int max_parallelism, thread::ThreadPool* workers, int64 total,
     workers->ParallelFor(total, cost_per_unit, work);
     return;
   }
-  Sharder::Do(
-      total, cost_per_unit, work,
-      [&workers](Sharder::Closure c) { workers->Schedule(c); },
-      max_parallelism);
+  Sharder::Do(total, cost_per_unit, work,
+              [&workers](Sharder::Closure c) { workers->Schedule(c); },
+              max_parallelism);
 }
 
-// DEPRECATED: Prefer threadpool->ParallelFor with SchedulingStrategy, which
-// allows you to specify the strategy for choosing shard sizes, including using
-// a fixed shard size.
+// DEPRECATED: Prefer threadpool->TransformRangeConcurrently, which allows you
+// to directly specify the shard size.
 void Sharder::Do(int64 total, int64 cost_per_unit, const Work& work,
                  const Runner& runner, int max_parallelism) {
   cost_per_unit = std::max(int64{1}, cost_per_unit);
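Since the deprecation comments now steer callers toward `TransformRangeConcurrently`, a usage sketch of the `Shard()` entry point kept by this file may help for comparison (assuming a TensorFlow build; `ScaleInPlace` and the parameter values are illustrative):

```cpp
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/util/work_sharder.h"

namespace tensorflow {

// Illustrative only. When max_parallelism >= the pool size, Shard()
// forwards to ThreadPool::ParallelFor; otherwise Sharder::Do caps the
// number of scheduled closures at max_parallelism.
void ScaleInPlace(float* data, int64 n) {
  thread::ThreadPool pool(Env::Default(), "scale", /*num_threads=*/8);
  Shard(/*max_parallelism=*/4, &pool, n,
        /*cost_per_unit=*/10,  // rough estimate of cycles per element
        [data](int64 start, int64 end) {
          for (int64 i = start; i < end; ++i) data[i] *= 2.0f;
        });
}

}  // namespace tensorflow
```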
diff --git a/tensorflow/core/util/work_sharder.h b/tensorflow/core/util/work_sharder.h
index 3ec4f98351a..9db85a54c6c 100644
--- a/tensorflow/core/util/work_sharder.h
+++ b/tensorflow/core/util/work_sharder.h
@@ -23,9 +23,8 @@ limitations under the License.
 
 namespace tensorflow {
 
-// DEPRECATED: Prefer threadpool->ParallelFor with SchedulingStrategy, which
-// allows you to specify the strategy for choosing shard sizes, including using
-// a fixed shard size. Use this function only if you want to
+// DEPRECATED: Prefer threadpool->TransformRangeConcurrently, which allows you
+// to directly specify the shard size. Use this function only if you want to
 // manually cap parallelism.
 // Shards the "total" unit of work assuming each unit of work having
 // roughly "cost_per_unit". Each unit of work is indexed 0, 1, ...,