diff --git a/tensorflow/core/framework/metrics.cc b/tensorflow/core/framework/metrics.cc
index 4af3d7cffcf..576f0430126 100644
--- a/tensorflow/core/framework/metrics.cc
+++ b/tensorflow/core/framework/metrics.cc
@@ -89,6 +89,18 @@ auto* tf_data_getnext_duration_counter = monitoring::Sampler<0>::New(
     // Power of 2 with bucket count 10 (1024 ms)
     {monitoring::Buckets::Exponential(1, 2, 10)});

+auto* tf_data_getnext_time_between_ms_histogram = monitoring::Sampler<0>::New(
+    {"/tensorflow/data/getnext_time_between",
+     "Milliseconds spent in between calls to tf.data Dataset TF iterator."},
+    // A typical training step is in the 200ms to 1 second range.
+    // Elapsed times of less than 25ms are likely due to multiple devices
+    // calling the iterator's getNext() during the same step.
+    // Bucket density is highest for small time intervals to more accurately
+    // measure fast ingest rates. The bucket boundaries are as follows:
+    {monitoring::Buckets::Explicit({25., 50., 75., 100., 125., 150., 175.,
+                                    200., 225., 250., 300., 350., 400., 450.,
+                                    500., 1000., 10000.})});
+
 auto* tf_data_optimization_counter = monitoring::Counter<1>::New(
     "/tensorflow/data/optimization", "tf.data optimization", "name");

@@ -170,6 +182,14 @@ void RecordTFDataGetNextDuration(uint64 duration_us) {
   tfdata_getnext_duration_cell->Add(duration_us);
 }

+void RecordTFDataGetNextTimeBetween(uint64 duration_us) {
+  static auto* tfdata_getnext_time_between_cell =
+      tf_data_getnext_time_between_ms_histogram->GetCell();
+  // Convert to milliseconds for the histogram.
+  const auto duration_ms = duration_us / 1000;
+  tfdata_getnext_time_between_cell->Add(duration_ms);
+}
+
 void RecordTFDataOptimization(const string& name, int64 num_changes) {
   tf_data_optimization_counter->GetCell(name)->IncrementBy(num_changes);
 }
diff --git a/tensorflow/core/framework/metrics.h b/tensorflow/core/framework/metrics.h
index 7d281f97c66..263fde272ab 100644
--- a/tensorflow/core/framework/metrics.h
+++ b/tensorflow/core/framework/metrics.h
@@ -59,6 +59,12 @@ void RecordTFDataBytesFetched(int64 num_bytes);
 // Records the time spent in ItertatorResource::GetNext() in microseconds.
 void RecordTFDataGetNextDuration(uint64 duration_us);

+// Records the time spent between IteratorResource::GetNext() calls
+// in microseconds. Time is measured from the point of returning data from
+// GetNext() to the point of new data being requested.
+// This elapsed time corresponds to time spent outside the GetNext() function.
+void RecordTFDataGetNextTimeBetween(uint64 duration_us);
+
 // Records the number of times each tf.data fingerprint is used
 // to measure duplicate pre-processing.
 //
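The explicit bucket boundaries above are easiest to sanity-check with a small standalone program. The sketch below is not part of the patch; it mirrors the boundary list and the truncating microsecond-to-millisecond division in RecordTFDataGetNextTimeBetween() to show which bucket a given gap lands in (std::upper_bound returns the first boundary greater than the sample).

// Standalone sketch (not part of the patch): shows which explicit bucket a
// recorded gap falls into, using the same boundaries and the same truncating
// us -> ms division as RecordTFDataGetNextTimeBetween() above.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<double> bounds = {25.,  50.,  75.,  100., 125.,  150.,
                                      175., 200., 225., 250., 300.,  350.,
                                      400., 450., 500., 1000., 10000.};
  const uint64_t duration_us = 180 * 1000;        // a 180ms gap between calls
  const double duration_ms = duration_us / 1000;  // integer division, as above
  // upper_bound yields the first boundary > sample, i.e. the bucket index.
  const auto it = std::upper_bound(bounds.begin(), bounds.end(), duration_ms);
  std::printf("%.0fms lands in bucket %ld (upper edge %.0fms)\n", duration_ms,
              (long)(it - bounds.begin()), *it);
  // Prints: 180ms lands in bucket 7 (upper edge 200ms)
  return 0;
}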
diff --git a/tensorflow/core/kernels/data/iterator_ops.cc b/tensorflow/core/kernels/data/iterator_ops.cc
index eebcd4a8248..d91f91bd684 100644
--- a/tensorflow/core/kernels/data/iterator_ops.cc
+++ b/tensorflow/core/kernels/data/iterator_ops.cc
@@ -91,11 +91,14 @@ Status IteratorResource::GetNext(OpKernelContext* ctx,
       [cm = params.cancellation_manager]() { cm->StartCancel(); },
       &deregister_fn));
   auto cleanup = gtl::MakeCleanup(std::move(deregister_fn));
-  uint64 start_time_us = ctx->env()->NowMicros();
+  RecordCtx record_ctx = CreateRecordCtx();  // Snapshot state prior to work.
+  // TODO(mkuchnik): Replace wallclock time with steady clock.
+  const uint64 start_time_us = ctx->env()->NowMicros();
+  RecordGetNextStart(record_ctx, start_time_us);
   auto val = captured_state->iterator->GetNext(
       IteratorContext(std::move(params)), out_tensors, end_of_sequence);
-  metrics::RecordTFDataGetNextDuration(ctx->env()->NowMicros() -
-                                       start_time_us);
+  const uint64 end_time_us = ctx->env()->NowMicros();
+  RecordGetNextEnd(record_ctx, end_time_us);
   metrics::RecordTFDataBytesFetched(GetTotalBytes(*out_tensors));
   return val;
 }
@@ -206,6 +209,71 @@ Status IteratorResource::SetIteratorFromDataset(OpKernelContext* ctx,
   return Status::OK();
 }

+IteratorResource::RecordCtx IteratorResource::CreateRecordCtx()
+    TF_LOCKS_EXCLUDED(mu_) {
+  IteratorResource::RecordCtx record_ctx;
+  {
+    tf_shared_lock l(mu_);
+    record_ctx.last_get_next_end_time_us =
+        iterator_state_->last_get_next_end_time_us;
+  }
+  return record_ctx;
+}
+
+void IteratorResource::RecordGetNextStart(
+    IteratorResource::RecordCtx& record_ctx, const uint64 start_time_us) {
+  record_ctx.get_next_start_time_us = start_time_us;
+  uint64 last_end_time_us = record_ctx.last_get_next_end_time_us;
+
+  // Records the total amount of time that has elapsed between GetNext()
+  // calls. The time between calls is measured from the point of returning
+  // data from GetNext() to the point of requesting data from GetNext().
+  // A steady clock is preferable. There are three parts to the algorithm
+  // under concurrency which maintain the thread-local invariant
+  // last_end_time_us <= start_time_us <= end_time_us and the
+  // IteratorResource invariant that last_get_next_end_time_us is increasing:
+  // 1) CreateRecordCtx() is called, which copies the
+  //    last_get_next_end_time_us into a thread-local structure.
+  // 2) RecordGetNextStart() is called with a clock measured after 1),
+  //    thus ensuring that the local start_time_us >=
+  //    last_get_next_end_time_us.
+  // 3) RecordGetNextEnd() is called with a clock measured after 2),
+  //    thus ensuring that the local end_time_us >= start_time_us.
+  //    Additionally, this function updates the IteratorResource
+  //    last_get_next_end_time_us with the most recent time. Thus, if two
+  //    threads call this method, only the most recent end time is visible.
+  // It's worth noting that a mutex over all three pieces may be needed for
+  // strict serialization correctness (i.e., local times may grow stale).
+  if (last_end_time_us) {  // last_end_time_us is initialized to 0.
+    if (start_time_us >= last_end_time_us) {
+      const uint64 get_next_time_between = start_time_us - last_end_time_us;
+      metrics::RecordTFDataGetNextTimeBetween(get_next_time_between);
+    } else {
+      // Clock went backward (not steady).
+      metrics::RecordTFDataGetNextTimeBetween(0);
+    }
+  }
+}
+
+void IteratorResource::RecordGetNextEnd(
+    const IteratorResource::RecordCtx& record_ctx, const uint64 end_time_us)
+    TF_LOCKS_EXCLUDED(mu_) {
+  uint64 start_time_us = record_ctx.get_next_start_time_us;
+  {
+    mutex_lock l(mu_);
+    // Move last_get_next_end_time_us forward if end_time_us is more recent.
+    iterator_state_->last_get_next_end_time_us =
+        std::max(end_time_us, iterator_state_->last_get_next_end_time_us);
+  }
+  DCHECK_NE(start_time_us, 0);
+  if (end_time_us >= start_time_us) {
+    const uint64 get_next_duration = end_time_us - start_time_us;
+    metrics::RecordTFDataGetNextDuration(get_next_duration);
+  } else {
+    // Clock went backward (not steady).
+    metrics::RecordTFDataGetNextDuration(0);
+  }
+}
+
 namespace {

 // Wrapper for encoding/decoding the iterator state stored in a Variant tensor.
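The three-step protocol described in the comment above can be exercised outside TensorFlow. The following sketch is not part of the patch: SharedState, NowUs(), and OneGetNext() are hypothetical stand-ins that mimic CreateRecordCtx() (snapshot under a lock), RecordGetNextStart() (clock reading taken strictly after the snapshot), and RecordGetNextEnd() (advancing the shared end time with std::max so it only moves forward). It also uses a steady clock, as the TODO in GetNext() suggests.

// Standalone sketch (not part of the patch) of the three-step protocol.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>

struct SharedState {
  std::mutex mu;
  uint64_t last_end_us = 0;  // analogous to State::last_get_next_end_time_us
};

static uint64_t NowUs() {
  // A steady clock, per the TODO; the patch itself still uses wallclock time.
  return std::chrono::duration_cast<std::chrono::microseconds>(
             std::chrono::steady_clock::now().time_since_epoch())
      .count();
}

static void OneGetNext(SharedState* s) {
  uint64_t snapshot;  // step 1: copy shared state (CreateRecordCtx)
  {
    std::lock_guard<std::mutex> l(s->mu);
    snapshot = s->last_end_us;
  }
  const uint64_t start_us = NowUs();  // step 2: start_us >= snapshot
  if (snapshot != 0 && start_us >= snapshot) {
    std::printf("gap between calls: %llu us\n",
                (unsigned long long)(start_us - snapshot));
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(5));  // "work"
  const uint64_t end_us = NowUs();  // step 3: end_us >= start_us
  {
    std::lock_guard<std::mutex> l(s->mu);
    s->last_end_us = std::max(end_us, s->last_end_us);  // only moves forward
  }
}

int main() {
  SharedState s;
  std::thread a([&] { OneGetNext(&s); });
  std::thread b([&] { OneGetNext(&s); });
  a.join();
  b.join();
  OneGetNext(&s);  // this call observes a nonzero gap
  return 0;
}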
diff --git a/tensorflow/core/kernels/data/iterator_ops.h b/tensorflow/core/kernels/data/iterator_ops.h
index 938b218bcb7..a6a04e502ad 100644
--- a/tensorflow/core/kernels/data/iterator_ops.h
+++ b/tensorflow/core/kernels/data/iterator_ops.h
@@ -92,7 +92,8 @@ class IteratorResource : public ResourceBase {
           flr(flr),
           pflr(std::move(pflr)),
           function_handle_cache(absl::make_unique<FunctionHandleCache>(flr)),
-          iterator(std::move(iterator)) {}
+          iterator(std::move(iterator)),
+          last_get_next_end_time_us(0) {}

     ~State() { cancellation_manager.StartCancel(); }

@@ -109,8 +110,29 @@ class IteratorResource : public ResourceBase {
     ResourceMgr resource_mgr;
     CancellationManager cancellation_manager;
     std::unique_ptr<DatasetBaseIterator> iterator;
+    uint64 last_get_next_end_time_us;
   };

+  // Thread-local record-keeping state for one GetNext() call.
+  struct RecordCtx {
+    RecordCtx() : get_next_start_time_us(0), last_get_next_end_time_us(0) {}
+
+    uint64 get_next_start_time_us;
+    uint64 last_get_next_end_time_us;
+  };
+
+  // Copies relevant state to the RecordCtx.
+  // Intended to be followed by RecordGetNextStart() and RecordGetNextEnd().
+  // Recorded times must be measured after this call to enforce ordering.
+  RecordCtx CreateRecordCtx() TF_LOCKS_EXCLUDED(mu_);
+
+  // Records that GetNext() has started work.
+  void RecordGetNextStart(RecordCtx& record_ctx, const uint64 start_time_us);
+
+  // Records that GetNext() has ended work.
+  void RecordGetNextEnd(const RecordCtx& record_ctx, const uint64 end_time_us)
+      TF_LOCKS_EXCLUDED(mu_);
+
   UnboundedThreadPool unbounded_thread_pool_;
   mutex mu_;
   const std::unique_ptr<DeviceMgr> device_mgr_ TF_GUARDED_BY(mu_);
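For reference, the per-call ordering that the RecordCtx declarations above are built around (last_get_next_end_time_us <= get_next_start_time_us <= end_time_us) can be stated as a tiny self-contained check. This sketch is not part of the patch; RecordCtxSketch and the hand-advanced fake clock are hypothetical stand-ins for IteratorResource::RecordCtx and Env::NowMicros().

// Standalone sketch (not part of the patch): asserts the ordering invariant
// that CreateRecordCtx() / RecordGetNextStart() / RecordGetNextEnd() maintain.
#include <cassert>
#include <cstdint>

struct RecordCtxSketch {  // mirrors IteratorResource::RecordCtx
  uint64_t get_next_start_time_us = 0;
  uint64_t last_get_next_end_time_us = 0;
};

int main() {
  uint64_t fake_now_us = 1000;  // fake clock; only ever moves forward
  RecordCtxSketch ctx;

  // Step 1 (CreateRecordCtx): snapshot the shared end time of the last call.
  ctx.last_get_next_end_time_us = fake_now_us;

  // Step 2 (RecordGetNextStart): the clock is read after the snapshot.
  fake_now_us += 50;  // 50us gap between GetNext() calls
  ctx.get_next_start_time_us = fake_now_us;

  // Step 3 (RecordGetNextEnd): the clock is read after GetNext() finishes.
  fake_now_us += 200;  // GetNext() takes 200us
  const uint64_t end_time_us = fake_now_us;

  assert(ctx.last_get_next_end_time_us <= ctx.get_next_start_time_us);
  assert(ctx.get_next_start_time_us <= end_time_us);
  return 0;
}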