From 7fe4876d7019db727459d8e303906cb4ac56643a Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Fri, 26 Oct 2018 09:36:29 -0700
Subject: [PATCH 1/6] Add runner_threadpool_size into IteratorContext

---
 tensorflow/core/framework/dataset.h           | 10 ++++++-
 .../experimental/threadpool_dataset_op.cc     |  3 +++
 tensorflow/core/kernels/data/iterator_ops.cc  | 27 +++++++++++++++++++
 .../kernels/data/parallel_map_iterator.cc     |  4 +--
 4 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/tensorflow/core/framework/dataset.h b/tensorflow/core/framework/dataset.h
index b4cd2751319..ffd6b620258 100644
--- a/tensorflow/core/framework/dataset.h
+++ b/tensorflow/core/framework/dataset.h
@@ -279,12 +279,15 @@ class IteratorContext {
           lib(ctx->lib()),
           model(ctx->model()),
           runner(*(ctx->runner())),
+          runner_threadpool_size(ctx->runner_threadpool_size()),
           stats_aggregator(ctx->stats_aggregator()) {}
 
     explicit Params(OpKernelContext* ctx)
         : env(ctx->env()),
          lib(ctx->function_library()),
-          runner(*(ctx->runner())) {
+          runner(*(ctx->runner())),
+          runner_threadpool_size(
+              ctx->device()->tensorflow_cpu_worker_threads()->num_threads) {
       // NOTE: need reinterpret_cast because function.h forward-declares Device.
       DeviceBase* device =
           reinterpret_cast<DeviceBase*>(ctx->function_library()->device());
@@ -311,6 +314,9 @@ class IteratorContext {
     // Function call support.
     std::function<void(std::function<void()>)> runner = nullptr;
 
+    // Number of threads used for executing user-defined functions.
+    int32 runner_threadpool_size = 0;
+
     // The `StatsAggregator` object to record statistics about the iterator.
     std::shared_ptr<StatsAggregator> stats_aggregator = nullptr;
   };
@@ -343,6 +349,8 @@ class IteratorContext {
     return &params_.runner;
   }
 
+  int32 runner_threadpool_size() { return params_.runner_threadpool_size; }
+
   std::shared_ptr<StatsAggregator> stats_aggregator() {
     return params_.stats_aggregator;
   }
diff --git a/tensorflow/core/kernels/data/experimental/threadpool_dataset_op.cc b/tensorflow/core/kernels/data/experimental/threadpool_dataset_op.cc
index 56fbbde1a3a..ab21dfc6bc5 100644
--- a/tensorflow/core/kernels/data/experimental/threadpool_dataset_op.cc
+++ b/tensorflow/core/kernels/data/experimental/threadpool_dataset_op.cc
@@ -47,6 +47,8 @@ class ThreadPoolResource : public ResourceBase {
     }
   }
 
+  int32 NumThreads() { return thread_pool_.NumThreads(); }
+
   string DebugString() override { return "ThreadPoolResource"; }
 
  private:
@@ -196,6 +198,7 @@ class ThreadPoolDatasetOp : public UnaryDatasetOpKernel {
       params.runner = [pool](std::function<void()> c) {
         pool->Schedule(std::move(c));
       };
+      params.runner_threadpool_size = pool->NumThreads();
       IteratorContext iter_ctx(params);
       return input_impl_->GetNext(&iter_ctx, out_tensors, end_of_sequence);
     }
diff --git a/tensorflow/core/kernels/data/iterator_ops.cc b/tensorflow/core/kernels/data/iterator_ops.cc
index 445718ba1e5..bb6b5fba06a 100644
--- a/tensorflow/core/kernels/data/iterator_ops.cc
+++ b/tensorflow/core/kernels/data/iterator_ops.cc
@@ -145,6 +145,8 @@ class IteratorResource : public ResourceBase {
       params.allocator_getter = [device](AllocatorAttributes attrs) {
         return device->GetAllocator(attrs);
       };
+      params.runner_threadpool_size =
+          ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
       IteratorContext iter_ctx(std::move(params));
       TF_RETURN_IF_ERROR(captured_iterator->Restore(&iter_ctx, reader));
       mutex_lock l(mu_);
@@ -978,8 +980,20 @@ void IteratorGetNextOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
 
         IteratorContext::Params params(ctx);
         params.function_library = iterator->function_library();
+<<<<<<< HEAD
         Status s = iterator->GetNext(IteratorContext(std::move(params)),
                                      &components, &end_of_sequence);
+=======
+        DeviceBase* device = ctx->function_library()->device();
+        params.allocator_getter = [device](AllocatorAttributes attrs) {
+          return device->GetAllocator(attrs);
+        };
+        params.runner_threadpool_size =
+            ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
+        IteratorContext iter_ctx(std::move(params));
+
+        Status s = iterator->GetNext(&iter_ctx, &components, &end_of_sequence);
+>>>>>>> Add runner_threadpool_size into IteratorContext
         // NOTE(mrry): We must unref the iterator before calling `done()`, to
         // avoid destruction races.
         iterator->Unref();
@@ -1007,8 +1021,21 @@ void IteratorGetNextSyncOp::Compute(OpKernelContext* ctx) {
   bool end_of_sequence = false;
   IteratorContext::Params params(ctx);
   params.function_library = iterator->function_library();
+<<<<<<< HEAD
   OP_REQUIRES_OK(ctx, iterator->GetNext(IteratorContext(std::move(params)),
                                         &components, &end_of_sequence));
+=======
+  DeviceBase* device = ctx->function_library()->device();
+  params.allocator_getter = [device](AllocatorAttributes attrs) {
+    return device->GetAllocator(attrs);
+  };
+  params.runner_threadpool_size =
+      ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
+  IteratorContext iter_ctx(std::move(params));
+
+  OP_REQUIRES_OK(ctx,
+                 iterator->GetNext(&iter_ctx, &components, &end_of_sequence));
+>>>>>>> Add runner_threadpool_size into IteratorContext
   OP_REQUIRES(ctx, !end_of_sequence, errors::OutOfRange("End of sequence"));
 
   for (int i = 0; i < components.size(); ++i) {
diff --git a/tensorflow/core/kernels/data/parallel_map_iterator.cc b/tensorflow/core/kernels/data/parallel_map_iterator.cc
index 1fa9a1fdc50..10103230950 100644
--- a/tensorflow/core/kernels/data/parallel_map_iterator.cc
+++ b/tensorflow/core/kernels/data/parallel_map_iterator.cc
@@ -65,9 +65,7 @@ class ParallelMapIterator : public DatasetBaseIterator {
   Status Initialize(IteratorContext* ctx) override {
     mutex_lock l(*mu_);
     if (num_parallel_calls_->value == kAutoTune) {
-      // TODO(jsimsa): Surface the number of threads used by `ctx->runner()`
-      // and use it here for the default.
-      num_parallel_calls_->value = port::NumSchedulableCPUs();
+      num_parallel_calls_->value = ctx->runner_threadpool_size();
       num_parallel_calls_->tunable = true;
     }
     TF_RETURN_IF_ERROR(

From 8f9a977a60baa2aeb101d71448f9b87f0f3f5a37 Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Mon, 5 Nov 2018 13:13:56 -0800
Subject: [PATCH 2/6] Resolve the code conflicts

---
 tensorflow/core/kernels/data/iterator_ops.cc | 27 +------------------
 1 file changed, 1 insertion(+), 26 deletions(-)

diff --git a/tensorflow/core/kernels/data/iterator_ops.cc b/tensorflow/core/kernels/data/iterator_ops.cc
index bb6b5fba06a..92bbfdbea5c 100644
--- a/tensorflow/core/kernels/data/iterator_ops.cc
+++ b/tensorflow/core/kernels/data/iterator_ops.cc
@@ -980,20 +980,8 @@ void IteratorGetNextOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
 
         IteratorContext::Params params(ctx);
         params.function_library = iterator->function_library();
-<<<<<<< HEAD
         Status s = iterator->GetNext(IteratorContext(std::move(params)),
                                      &components, &end_of_sequence);
-=======
-        DeviceBase* device = ctx->function_library()->device();
-        params.allocator_getter = [device](AllocatorAttributes attrs) {
-          return device->GetAllocator(attrs);
-        };
-        params.runner_threadpool_size =
-            ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
-        IteratorContext iter_ctx(std::move(params));
-
-        Status s = iterator->GetNext(&iter_ctx, &components, &end_of_sequence);
->>>>>>> Add runner_threadpool_size into IteratorContext
         // NOTE(mrry): We must unref the iterator before calling `done()`, to
         // avoid destruction races.
         iterator->Unref();
@@ -1021,21 +1009,9 @@ void IteratorGetNextSyncOp::Compute(OpKernelContext* ctx) {
   bool end_of_sequence = false;
   IteratorContext::Params params(ctx);
   params.function_library = iterator->function_library();
-<<<<<<< HEAD
+
   OP_REQUIRES_OK(ctx, iterator->GetNext(IteratorContext(std::move(params)),
                                         &components, &end_of_sequence));
-=======
-  DeviceBase* device = ctx->function_library()->device();
-  params.allocator_getter = [device](AllocatorAttributes attrs) {
-    return device->GetAllocator(attrs);
-  };
-  params.runner_threadpool_size =
-      ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
-  IteratorContext iter_ctx(std::move(params));
-
-  OP_REQUIRES_OK(ctx,
-                 iterator->GetNext(&iter_ctx, &components, &end_of_sequence));
->>>>>>> Add runner_threadpool_size into IteratorContext
   OP_REQUIRES(ctx, !end_of_sequence, errors::OutOfRange("End of sequence"));
 
   for (int i = 0; i < components.size(); ++i) {
@@ -1236,7 +1212,6 @@ class DeserializeIteratorOp : public OpKernel {
   }
 };
 
-
 REGISTER_KERNEL_BUILDER(Name("Iterator").Device(DEVICE_CPU), IteratorHandleOp);
 REGISTER_KERNEL_BUILDER(Name("IteratorV2").Device(DEVICE_CPU),
                         IteratorHandleOp);

From 361093aea98d5fb0fe93711324580a8d55a6ca13 Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Mon, 5 Nov 2018 14:34:48 -0800
Subject: [PATCH 3/6] Clean code and resolve the style issue

---
 tensorflow/core/kernels/data/iterator_ops.cc | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tensorflow/core/kernels/data/iterator_ops.cc b/tensorflow/core/kernels/data/iterator_ops.cc
index 92bbfdbea5c..445718ba1e5 100644
--- a/tensorflow/core/kernels/data/iterator_ops.cc
+++ b/tensorflow/core/kernels/data/iterator_ops.cc
@@ -145,8 +145,6 @@ class IteratorResource : public ResourceBase {
       params.allocator_getter = [device](AllocatorAttributes attrs) {
         return device->GetAllocator(attrs);
       };
-      params.runner_threadpool_size =
-          ctx->device()->tensorflow_cpu_worker_threads()->num_threads;
       IteratorContext iter_ctx(std::move(params));
       TF_RETURN_IF_ERROR(captured_iterator->Restore(&iter_ctx, reader));
       mutex_lock l(mu_);
@@ -1007,7 +1007,6 @@ void IteratorGetNextSyncOp::Compute(OpKernelContext* ctx) {
   bool end_of_sequence = false;
   IteratorContext::Params params(ctx);
   params.function_library = iterator->function_library();
-
   OP_REQUIRES_OK(ctx, iterator->GetNext(IteratorContext(std::move(params)),
                                         &components, &end_of_sequence));
   OP_REQUIRES(ctx, !end_of_sequence, errors::OutOfRange("End of sequence"));
@@ -1212,6 +1209,7 @@ class DeserializeIteratorOp : public OpKernel {
   }
 };
 
+
 REGISTER_KERNEL_BUILDER(Name("Iterator").Device(DEVICE_CPU), IteratorHandleOp);
 REGISTER_KERNEL_BUILDER(Name("IteratorV2").Device(DEVICE_CPU),
                         IteratorHandleOp);

From 634005cb53d2a136c04885ffa6cbf5323649fe32 Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Mon, 5 Nov 2018 15:42:34 -0800
Subject: [PATCH 4/6] Surface the number of threads used by ctx->runner()

---
 tensorflow/core/kernels/data/map_and_batch_dataset_op.cc | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
index 7790d133203..d7db3c4d183 100644
--- a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
@@ -262,9 +262,7 @@ class MapAndBatchDatasetOp : public UnaryDatasetOpKernel {
     Status Initialize(IteratorContext* ctx) override {
       mutex_lock l(*mu_);
       if (num_parallel_calls_->value == kAutoTune) {
-        // TODO(jsimsa): Surface the number of threads used by `ctx->runner()`
-        // and use it here for the default.
-        num_parallel_calls_->value = port::NumSchedulableCPUs();
+        num_parallel_calls_->value = ctx->runner_threadpool_size();
         num_parallel_calls_->tunable = true;
       }
       TF_RETURN_IF_ERROR(

From 0ff931a5965241ffdb1551493617b6d9592976e8 Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Mon, 5 Nov 2018 15:50:22 -0800
Subject: [PATCH 5/6] Surface the number of threads used by ctx->runner() for
 numa_map_and_batch_dataset_op.cc

---
 .../kernels/data/experimental/numa_map_and_batch_dataset_op.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
index 677141a89d6..1286795a0b2 100644
--- a/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
@@ -201,7 +201,7 @@ class NumaMapAndBatchDatasetOp : public UnaryDatasetOpKernel {
     Status Initialize(IteratorContext* ctx) override {
       mutex_lock l(*mu_);
       if (num_parallel_calls_->value == kAutoTune) {
-        num_parallel_calls_->value = port::NumSchedulableCPUs();
+        num_parallel_calls_->value = ctx->runner_threadpool_size();
         num_parallel_calls_->tunable = true;
       }
       TF_RETURN_IF_ERROR(

From 689d9974b95a764759be2028cff36735fde3001f Mon Sep 17 00:00:00 2001
From: Fei Hu
Date: Mon, 5 Nov 2018 16:05:42 -0800
Subject: [PATCH 6/6] Change the maximum input for
 model::MakeParameter::parallelism

---
 .../kernels/data/experimental/numa_map_and_batch_dataset_op.cc | 2 +-
 tensorflow/core/kernels/data/map_and_batch_dataset_op.cc       | 2 +-
 tensorflow/core/kernels/data/parallel_map_iterator.cc          | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
index 1286795a0b2..068f8540230 100644
--- a/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/experimental/numa_map_and_batch_dataset_op.cc
@@ -244,7 +244,7 @@ class NumaMapAndBatchDatasetOp : public UnaryDatasetOpKernel {
       return model::MakeAsyncKnownRatioNode(
           std::move(args), dataset()->batch_size_,
           {model::MakeParameter("parallelism", num_parallel_calls_, /*min=*/1,
-                                /*max=*/port::NumSchedulableCPUs())});
+                                /*max=*/ctx->runner_threadpool_size())});
     }
 
     Status SaveInternal(IteratorStateWriter* writer) override {
diff --git a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
index d7db3c4d183..31851925124 100644
--- a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
@@ -296,7 +296,7 @@ class MapAndBatchDatasetOp : public UnaryDatasetOpKernel {
      return model::MakeAsyncKnownRatioNode(
           std::move(args), dataset()->batch_size_,
           {model::MakeParameter("parallelism", num_parallel_calls_, /*min=*/1,
-                                /*max=*/port::NumSchedulableCPUs())});
+                                /*max=*/ctx->runner_threadpool_size())});
     }
 
     Status SaveInternal(IteratorStateWriter* writer) override {
diff --git a/tensorflow/core/kernels/data/parallel_map_iterator.cc b/tensorflow/core/kernels/data/parallel_map_iterator.cc
index 10103230950..ec1c9238430 100644
--- a/tensorflow/core/kernels/data/parallel_map_iterator.cc
+++ b/tensorflow/core/kernels/data/parallel_map_iterator.cc
@@ -101,7 +101,7 @@ class ParallelMapIterator : public DatasetBaseIterator {
     return model::MakeAsyncKnownRatioNode(
         std::move(args), /*ratio=*/1,
         {model::MakeParameter("parallelism", num_parallel_calls_, /*min=*/1,
-                              /*max=*/port::NumSchedulableCPUs())});
+                              /*max=*/ctx->runner_threadpool_size())});
   }
 
   Status SaveInternal(IteratorStateWriter* writer) override {