Create slightly simplified `RawMetricsWindow` with equivalent simplified loading function
This commit is contained in:
parent
f83799de00
commit
c69b4c9332
|
@ -65,6 +65,14 @@ pub struct MetricsWindow {
|
|||
pub gauges: HashMap<MetricId, ScalarWindow>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RawMetricsWindow {
|
||||
pub histograms: HashMap<MetricId, HistogramWindow>,
|
||||
pub scalars: HashMap<MetricId, ScalarWindow>,
|
||||
pub time_range: RangeInclusive<f64>,
|
||||
pub wanted_time_points: u32,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
pub struct HeatmapRect {
|
||||
pub min_ts: UnixTimestampMilliseconds,
|
||||
|
@ -611,6 +619,148 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
|
|||
Ok(metrics_window)
|
||||
}
|
||||
|
||||
fn load_raw_window_of_data(
|
||||
&mut self,
|
||||
start: UnixTimestampMilliseconds,
|
||||
end: UnixTimestampMilliseconds,
|
||||
time_points: u32,
|
||||
) -> anyhow::Result<RawMetricsWindow> {
|
||||
let mut metrics_window = RawMetricsWindow {
|
||||
time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001),
|
||||
wanted_time_points: time_points,
|
||||
histograms: Default::default(),
|
||||
scalars: Default::default(),
|
||||
};
|
||||
let mut histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
|
||||
|
||||
for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() {
|
||||
for (_, metric_id) in metric_labels_to_ids.iter() {
|
||||
match self.metric_descriptors[metric_id].kind {
|
||||
MetricKind::Histogram => {
|
||||
histograms.insert(*metric_id, Default::default());
|
||||
}
|
||||
MetricKind::Gauge | MetricKind::Counter => {
|
||||
metrics_window.scalars.insert(
|
||||
*metric_id,
|
||||
ScalarWindow {
|
||||
points: vec![],
|
||||
y_axis: 0.0..=0.0,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let checkpoint = self.find_checkpoint_just_before(start)?.clone();
|
||||
|
||||
let mut frame_state = checkpoint.state;
|
||||
let next_token = checkpoint.seek;
|
||||
|
||||
let mut in_window = false;
|
||||
|
||||
fn write_to_window(
|
||||
metric_window: &mut RawMetricsWindow,
|
||||
histograms: &mut HashMap<MetricId, UnsummarisedHistogramWindow>,
|
||||
current_state: &MetricsState,
|
||||
frame: Option<(UnixTimestampMilliseconds, &Frame)>,
|
||||
) {
|
||||
if let Some((frame_start, frame)) = frame {
|
||||
for (metric_id, histogram) in frame.histograms.iter() {
|
||||
histograms.get_mut(metric_id).unwrap().map.push((
|
||||
frame_start,
|
||||
frame.end_time,
|
||||
histogram.underlying.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
for (metric_id, count) in current_state.counters.iter() {
|
||||
metric_window
|
||||
.scalars
|
||||
.get_mut(metric_id)
|
||||
.unwrap()
|
||||
.points
|
||||
.push((current_state.at, *count as f64));
|
||||
}
|
||||
for (metric_id, value) in current_state.gauges.iter() {
|
||||
metric_window
|
||||
.scalars
|
||||
.get_mut(metric_id)
|
||||
.unwrap()
|
||||
.points
|
||||
.push((current_state.at, *value));
|
||||
}
|
||||
}
|
||||
|
||||
// need to try and preserve extra points just before the window and just after, for
|
||||
// continuity ...
|
||||
|
||||
let mut num_points = 0;
|
||||
self.reader.seek(next_token)?;
|
||||
loop {
|
||||
if let Some((start_ts, frame)) = self.reader.read_frame()? {
|
||||
Self::integrate_state(&mut frame_state, &frame);
|
||||
|
||||
if !in_window {
|
||||
if frame_state.at >= start {
|
||||
in_window = true;
|
||||
} else {
|
||||
// Clear the windows because we still haven't reached the start and only
|
||||
// want one sample before the start.
|
||||
num_points = 0;
|
||||
for (_, window) in histograms.iter_mut() {
|
||||
window.map.clear();
|
||||
}
|
||||
for (_, window) in metrics_window.scalars.iter_mut() {
|
||||
window.points.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
num_points += 1;
|
||||
debug!("Writing frame to window with start ts {:?}", start_ts);
|
||||
write_to_window(
|
||||
&mut metrics_window,
|
||||
&mut histograms,
|
||||
&frame_state,
|
||||
Some((start_ts, &frame)),
|
||||
);
|
||||
|
||||
if frame_state.at >= end {
|
||||
// We've gone past the end. Stop here.
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while num_points >= time_points * 2 {
|
||||
// Keep halving the points until we have less than double the number of requested points.
|
||||
|
||||
for (_, scalar) in metrics_window.scalars.iter_mut() {
|
||||
Self::bicimate(&mut scalar.points);
|
||||
}
|
||||
for (_, hists) in histograms.iter_mut() {
|
||||
Self::bicimate_histograms(&mut hists.map);
|
||||
}
|
||||
|
||||
num_points = (num_points - 1) / 2 + 1;
|
||||
}
|
||||
|
||||
for (metric_id, unsummarised_histogram) in histograms.into_iter() {
|
||||
metrics_window
|
||||
.histograms
|
||||
.insert(metric_id, unsummarised_histogram.summarise());
|
||||
}
|
||||
|
||||
for (_metric_id, scalar) in metrics_window.scalars.iter_mut() {
|
||||
scalar.summarise_in_place();
|
||||
}
|
||||
|
||||
Ok(metrics_window)
|
||||
}
|
||||
|
||||
fn run(&mut self) -> anyhow::Result<()> {
|
||||
info!("Starting manager");
|
||||
self.initial_scan()?;
|
||||
|
|
Loading…
Reference in New Issue