[tf.data] Adds a histogram for elapsed time between successive calls to tf.data's iterator.getNext().
PiperOrigin-RevId: 319143835
Change-Id: Id79b746f9f7c4fffe766fdbe68d6b99f2dcf733e
parent 696d3c5049
commit 70d1d81d08
@@ -89,6 +89,18 @@ auto* tf_data_getnext_duration_counter = monitoring::Sampler<0>::New(
     // Power of 2 with bucket count 10 (1024 ms)
     {monitoring::Buckets::Exponential(1, 2, 10)});

+auto* tf_data_getnext_time_between_ms_histogram = monitoring::Sampler<0>::New(
+    {"/tensorflow/data/getnext_time_between",
+     "Milliseconds spent in between calls to tf.data Dataset TF iterator."},
+    // A typical training step is in the 200ms to 1 second range.
+    // Elapsed times of less than 25ms are likely due to multiple devices
+    // calling the iterator's getNext() during the same step.
+    // Bucket density is highest for small time intervals to more accurately
+    // measure fast ingest rates. Step sizes are as follows:
+    {monitoring::Buckets::Explicit({25., 50., 75., 100., 125., 150., 175., 200.,
+                                    225., 250., 300., 350., 400., 450., 500.,
+                                    1000., 10000.})});
+
 auto* tf_data_optimization_counter = monitoring::Counter<1>::New(
     "/tensorflow/data/optimization", "tf.data optimization", "name");

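As a reading aid for the bucket layout defined above, here is a minimal standalone sketch (plain C++ with the standard library, not the TF monitoring API) of how a time-between sample in milliseconds would land among those explicit bounds. Treating each listed value as an inclusive upper bound is an assumption for illustration; the names below are not TF symbols.

// Hypothetical illustration of the explicit bucket bounds above; not the
// monitoring library's actual bucketing code.
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<double> bounds_ms = {25.,  50.,  75.,  100., 125., 150.,
                                         175., 200., 225., 250., 300., 350.,
                                         400., 450., 500., 1000., 10000.};
  for (double sample_ms : {5.0, 180.0, 640.0, 20000.0}) {
    // Index of the first bound >= sample; anything beyond the last bound
    // would fall into an overflow bucket.
    const auto it =
        std::lower_bound(bounds_ms.begin(), bounds_ms.end(), sample_ms);
    std::printf("sample %.0f ms -> bucket index %d\n", sample_ms,
                static_cast<int>(it - bounds_ms.begin()));
  }
  return 0;
}

The dense 25 ms spacing up to 500 ms matches the comment's intent: fast ingest rates get fine-grained buckets, while slow steps collapse into the 1000 ms and 10000 ms tail.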
@@ -170,6 +182,14 @@ void RecordTFDataGetNextDuration(uint64 duration_us) {
   tfdata_getnext_duration_cell->Add(duration_us);
 }

+void RecordTFDataGetNextTimeBetween(uint64 duration_us) {
+  static auto* tfdata_getnext_time_between_cell =
+      tf_data_getnext_time_between_ms_histogram->GetCell();
+  // Convert to milliseconds for histogram
+  const auto duration_ms = duration_us / 1000;
+  tfdata_getnext_time_between_cell->Add(duration_ms);
+}
+
 void RecordTFDataOptimization(const string& name, int64 num_changes) {
   tf_data_optimization_counter->GetCell(name)->IncrementBy(num_changes);
 }
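A note on units in the hunk above: the new function receives microseconds but the histogram buckets are in milliseconds, and the integer division truncates, so gaps shorter than 1 ms are recorded as 0 ms. A throwaway snippet (illustrative only, not part of the change) showing the same conversion:

#include <cstdint>
#include <cstdio>

int main() {
  // Same conversion as in RecordTFDataGetNextTimeBetween: us / 1000, truncating.
  const uint64_t samples_us[] = {250, 999, 1000, 250000, 1200500};
  for (uint64_t duration_us : samples_us) {
    const uint64_t duration_ms = duration_us / 1000;
    std::printf("%llu us -> %llu ms\n",
                static_cast<unsigned long long>(duration_us),
                static_cast<unsigned long long>(duration_ms));
  }
  return 0;
}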
@@ -59,6 +59,12 @@ void RecordTFDataBytesFetched(int64 num_bytes);
 // Records the time spent in ItertatorResource::GetNext() in microseconds.
 void RecordTFDataGetNextDuration(uint64 duration_us);

+// Records the time spent between IteratorResource::GetNext() calls
+// in microseconds. Time is measured from the point of returning data from
+// GetNext() to the point of new data being requested.
+// This elapsed time corresponds to time spent outside the GetNext() function.
+void RecordTFDataGetNextTimeBetween(uint64 duration_us);
+
 // Records the number of times each tf.data fingerprint is used
 // to measure duplicate pre-processing.
 //
@@ -91,11 +91,14 @@ Status IteratorResource::GetNext(OpKernelContext* ctx,
       [cm = params.cancellation_manager]() { cm->StartCancel(); },
       &deregister_fn));
   auto cleanup = gtl::MakeCleanup(std::move(deregister_fn));
-  uint64 start_time_us = ctx->env()->NowMicros();
+  RecordCtx record_ctx = CreateRecordCtx();  // Snapshot state prior to work
+  // TODO(mkuchnik): Replace wallclock time with steady clock
+  const uint64 start_time_us = ctx->env()->NowMicros();
+  RecordGetNextStart(record_ctx, start_time_us);
   auto val = captured_state->iterator->GetNext(
       IteratorContext(std::move(params)), out_tensors, end_of_sequence);
-  metrics::RecordTFDataGetNextDuration(ctx->env()->NowMicros() -
-                                       start_time_us);
+  const uint64 end_time_us = ctx->env()->NowMicros();
+  RecordGetNextEnd(record_ctx, end_time_us);
   metrics::RecordTFDataBytesFetched(GetTotalBytes(*out_tensors));
   return val;
 }
@@ -206,6 +209,71 @@ Status IteratorResource::SetIteratorFromDataset(OpKernelContext* ctx,
   return Status::OK();
 }

+IteratorResource::RecordCtx IteratorResource::CreateRecordCtx()
+    TF_LOCKS_EXCLUDED(mu_) {
+  IteratorResource::RecordCtx record_ctx;
+  {
+    tf_shared_lock l(mu_);
+    record_ctx.last_get_next_end_time_us =
+        iterator_state_->last_get_next_end_time_us;
+  }
+  return record_ctx;
+}
+
+void IteratorResource::RecordGetNextStart(
+    IteratorResource::RecordCtx& record_ctx, const uint64 start_time_us) {
+  record_ctx.get_next_start_time_us = start_time_us;
+  uint64 last_end_time_us = record_ctx.last_get_next_end_time_us;
+
+  // Records the total amount of time that has elapsed between GetNext()
+  // calls. The time between calls is measured from the point of returning
+  // data from GetNext() to the point of requesting data from GetNext().
+  // A steady clock is preferable. There are three parts to the algorithm
+  // under concurrency which maintain the thread local invariant
+  // last_end_time_us <= start_time_us <= end_time_us and the
+  // IteratorResource invariant that last_end_time_us is increasing:
+  // 1) CreateRecordCtx() is called, which copies the
+  //    last_get_next_end_time_us into a thread-local structure
+  // 2) RecordGetNextStart is called with a clock measured after 1),
+  //    thus ensuring that local start_time_us >= last_get_next_end_time_us
+  // 3) RecordGetNextEnd is called with a clock measured after 2),
+  //    thus ensuring that local end_time_us >= start_time_us. Additionally,
+  //    this function updates the IteratorResource last_get_next_end_time_us
+  //    with the most recent time. Thus, if two threads call this method,
+  //    only the most recent one is visible in the time.
+  // It's worth noting that a mutex over all three pieces may be needed for
+  // strict serialization correctness (i.e., local time may grow stale).
+  if (last_end_time_us) {  // last_end_time_us is initialized at 0
+    if (start_time_us >= last_end_time_us) {
+      const uint64 get_next_time_between = start_time_us - last_end_time_us;
+      metrics::RecordTFDataGetNextTimeBetween(get_next_time_between);
+    } else {
+      // Clock went backward (not steady).
+      metrics::RecordTFDataGetNextTimeBetween(0);
+    }
+  }
+}
+
+void IteratorResource::RecordGetNextEnd(
+    const IteratorResource::RecordCtx& record_ctx, const uint64 end_time_us)
+    TF_LOCKS_EXCLUDED(mu_) {
+  uint64 start_time_us = record_ctx.get_next_start_time_us;
+  {
+    mutex_lock l(mu_);
+    // Move last_end_time forward if more recent
+    iterator_state_->last_get_next_end_time_us =
+        std::max(end_time_us, iterator_state_->last_get_next_end_time_us);
+  }
+  DCHECK_NE(start_time_us, 0);
+  if (end_time_us >= start_time_us) {
+    const uint64 get_next_duration = end_time_us - start_time_us;
+    metrics::RecordTFDataGetNextDuration(get_next_duration);
+  } else {
+    // Clock went backward (not steady).
+    metrics::RecordTFDataGetNextDuration(0);
+  }
+}
+
 namespace {

 // Wrapper for encoding/decoding the iterator state stored in a Variant tensor.
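The hunk above is the core of the change. As a reading aid, here is a self-contained sketch of the same snapshot-then-record ordering using std::chrono::steady_clock (the steadier clock the in-code TODO points toward) and std::mutex in place of the TF environment clock and tf_shared_lock/mutex_lock. IteratorTimer and RecordSample are hypothetical stand-ins, not TF APIs; this illustrates the algorithm rather than reproducing the actual implementation.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <mutex>

namespace sketch {

uint64_t NowMicros() {
  return static_cast<uint64_t>(
      std::chrono::duration_cast<std::chrono::microseconds>(
          std::chrono::steady_clock::now().time_since_epoch())
          .count());
}

// Stand-in for the metrics::RecordTFData* calls; just prints the sample.
void RecordSample(const char* name, uint64_t value_us) {
  std::printf("%s: %llu us\n", name,
              static_cast<unsigned long long>(value_us));
}

class IteratorTimer {
 public:
  // Thread-local bookkeeping, mirroring IteratorResource::RecordCtx.
  struct RecordCtx {
    uint64_t get_next_start_time_us = 0;
    uint64_t last_get_next_end_time_us = 0;
  };

  // 1) Snapshot the shared last-end time before reading the clock.
  RecordCtx CreateRecordCtx() {
    RecordCtx ctx;
    std::lock_guard<std::mutex> l(mu_);
    ctx.last_get_next_end_time_us = last_get_next_end_time_us_;
    return ctx;
  }

  // 2) The clock is read after the snapshot, so start >= snapshotted end.
  void RecordGetNextStart(RecordCtx& ctx, uint64_t start_time_us) {
    ctx.get_next_start_time_us = start_time_us;
    const uint64_t last_end = ctx.last_get_next_end_time_us;
    if (last_end != 0) {  // 0 means no previous GetNext() has finished yet.
      // With a steady clock the backward-clock fallback to 0 is never taken;
      // it mirrors the wall-clock guard in the diff above.
      RecordSample("time_between_us",
                   start_time_us >= last_end ? start_time_us - last_end : 0);
    }
  }

  // 3) The clock is read after the work; only the most recent end time is
  //    kept in the shared state, as in the std::max update above.
  void RecordGetNextEnd(const RecordCtx& ctx, uint64_t end_time_us) {
    {
      std::lock_guard<std::mutex> l(mu_);
      last_get_next_end_time_us_ =
          std::max(end_time_us, last_get_next_end_time_us_);
    }
    const uint64_t start = ctx.get_next_start_time_us;
    RecordSample("duration_us",
                 end_time_us >= start ? end_time_us - start : 0);
  }

 private:
  std::mutex mu_;
  uint64_t last_get_next_end_time_us_ = 0;
};

}  // namespace sketch

int main() {
  sketch::IteratorTimer timer;
  for (int step = 0; step < 3; ++step) {
    auto ctx = timer.CreateRecordCtx();  // snapshot, then read the clock
    timer.RecordGetNextStart(ctx, sketch::NowMicros());
    // ... GetNext() work would happen here ...
    timer.RecordGetNextEnd(ctx, sketch::NowMicros());
  }
  return 0;
}

In the actual change, the snapshot and record calls wrap captured_state->iterator->GetNext() inside IteratorResource::GetNext(), as shown in the earlier hunk.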
@@ -92,7 +92,8 @@ class IteratorResource : public ResourceBase {
           flr(flr),
           pflr(std::move(pflr)),
           function_handle_cache(absl::make_unique<FunctionHandleCache>(flr)),
-          iterator(std::move(iterator)) {}
+          iterator(std::move(iterator)),
+          last_get_next_end_time_us(0) {}

     ~State() { cancellation_manager.StartCancel(); }

@@ -109,8 +110,29 @@ class IteratorResource : public ResourceBase {
     ResourceMgr resource_mgr;
     CancellationManager cancellation_manager;
     std::unique_ptr<DatasetBaseIterator> iterator;
+    uint64 last_get_next_end_time_us;
   };

+  // For thread-local record-keeping state
+  struct RecordCtx {
+    RecordCtx() : get_next_start_time_us(0), last_get_next_end_time_us(0) {}
+
+    uint64 get_next_start_time_us;
+    uint64 last_get_next_end_time_us;
+  };
+
+  // Copies relevant state to the RecordCtx
+  // Intended to be followed by RecordGetNextStart and RecordGetNextEnd.
+  // Recorded times must be measured after this call to enforce ordering.
+  RecordCtx CreateRecordCtx() TF_LOCKS_EXCLUDED(mu_);
+
+  // Records that GetNext() has started work.
+  void RecordGetNextStart(RecordCtx& record_ctx, const uint64 start_time_us);
+
+  // Records that GetNext() has ended work.
+  void RecordGetNextEnd(const RecordCtx& record_ctx, const uint64 end_time_us)
+      TF_LOCKS_EXCLUDED(mu_);
+
   UnboundedThreadPool unbounded_thread_pool_;
   mutex mu_;
   const std::unique_ptr<DeviceMgr> device_mgr_ TF_GUARDED_BY(mu_);