From f8ec5f22b5bf54482d7118c0c49249689b41be3d Mon Sep 17 00:00:00 2001 From: Olivier Date: Sat, 8 Jan 2022 15:19:08 +0000 Subject: [PATCH] Extract background loader to separate file --- bare-metrics-gui/src/background_loader.rs | 667 ++++++++++++++++++++++ bare-metrics-gui/src/graph.rs | 651 +-------------------- bare-metrics-gui/src/main.rs | 4 +- 3 files changed, 671 insertions(+), 651 deletions(-) create mode 100644 bare-metrics-gui/src/background_loader.rs diff --git a/bare-metrics-gui/src/background_loader.rs b/bare-metrics-gui/src/background_loader.rs new file mode 100644 index 0000000..3f2b144 --- /dev/null +++ b/bare-metrics-gui/src/background_loader.rs @@ -0,0 +1,667 @@ +use crate::config::MetricTransform; +use anyhow::{anyhow, bail}; +use bare_metrics_core::structures::{ + Frame, MetricDescriptor, MetricId, MetricKind, UnixTimestampMilliseconds, +}; +use bare_metrics_reader::{MetricsLogReader, SeekToken}; +use eframe::egui::{ + Color32, Frame as EguiFrame, PointerButton, Pos2, Rect, Sense, Stroke, TextStyle, Ui, Vec2, +}; +use hdrhistogram::Histogram; +use log::{debug, error, info}; +use std::collections::{BTreeMap, HashMap}; +use std::io::{Read, Seek}; +use std::ops::{DerefMut, Mul, RangeInclusive}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, Mutex, RwLock}; + +/// Make a checkpoint every 10 minutes? +/// This should probably be tunable since it will likely vary per-source ... +pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000; + +pub type StateCallbackHook = Arc>>>; + +#[derive(Clone, Debug)] +pub struct MetricsWindow { + pub time_range: RangeInclusive, + pub wanted_time_points: u32, + pub metrics: MetricDescriptorTable, + pub metric_descriptors: HashMap, + pub histograms: HashMap, + pub counters: HashMap, + pub gauges: HashMap, +} + +#[derive(Copy, Clone, Debug, Default)] +pub struct HeatmapRect { + pub min_ts: UnixTimestampMilliseconds, + pub max_ts: UnixTimestampMilliseconds, + pub min_val: f64, + pub max_val: f64, + pub density: f64, +} + +#[derive(Clone, Debug)] +pub struct HistogramWindow { + pub heatmap: Vec, + pub min_density: f64, + pub max_density: f64, + pub y_axis: RangeInclusive, +} + +#[derive(Clone, Debug, Default)] +pub struct UnsummarisedHistogramWindow { + pub map: Vec<( + UnixTimestampMilliseconds, + UnixTimestampMilliseconds, + Histogram, + )>, +} + +impl UnsummarisedHistogramWindow { + pub fn summarise(&self) -> HistogramWindow { + let mut heatmap = Vec::new(); + + let mut min_density = f64::INFINITY; + let mut max_density = f64::NEG_INFINITY; + let mut min_val = f64::INFINITY; + let mut max_val = f64::NEG_INFINITY; + + for (xa, xb, hist) in self.map.iter() { + let min_this_slice = hist.min(); + min_val = min_val.min(min_this_slice as f64); + max_val = max_val.max(hist.max() as f64); + + let mut prev = min_this_slice; + for q in (1..=10).map(|i| i as f64 * 0.1) { + // TODO improve this. It's going to be a bit off because of the way that + // numbers can be 'equivalent' etc ... + let value_at_q = hist.value_at_quantile(q); + let count_between = hist.count_between(prev, value_at_q); + let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64; + let density = count_between as f64 / area; + + min_density = min_density.min(density); + max_density = max_density.max(density); + + //eprintln!("den {}", density); + // TODO need to autoscale based on density... 
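+                // A worked example of the density calculation above (illustrative
+                // numbers, not taken from any real log): if 100 samples fall
+                // between prev = 10 and value_at_q = 20 within a slice spanning
+                // 1000 ms, then area = (20 - 10) * 1000 = 10_000 and
+                // density = 100 / 10_000 = 0.01 samples per (value x ms) unit.
+                // min_density/max_density track the observed range so a renderer
+                // can normalise heatmap colours against it.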
+ + heatmap.push(HeatmapRect { + min_ts: *xa, + max_ts: *xb, + min_val: prev as f64, + max_val: value_at_q as f64, + density, + }); + prev = value_at_q; + } + + //debug!("hist min {:?} max {:?}", hist.min(), hist.max()); + } + + let y_axis = min_val..=max_val; + debug!("yaxis {:?}", y_axis); + HistogramWindow { + heatmap, + min_density, + max_density, + y_axis, + } + } +} + +#[derive(Clone, Debug)] +pub struct ScalarWindow { + pub points: Vec<(UnixTimestampMilliseconds, f64)>, + pub y_axis: RangeInclusive, +} + +impl ScalarWindow { + pub fn summarise_in_place(&mut self) { + let mut min_point = f64::INFINITY; + let mut max_point = f64::NEG_INFINITY; + + for (_, point) in self.points.iter() { + min_point = min_point.min(*point); + max_point = max_point.max(*point); + } + + self.y_axis = min_point..=max_point; + } +} + +pub struct MetricsLogReadingRequester { + pub shared: Arc, + pub tx: Sender, +} + +pub struct MetricsLogReadingShared { + /// The current window of data. + pub current_window: RwLock, + /// True if a new window of data is being loaded. + pub loading_new_window: AtomicBool, +} + +pub enum MetricsLogReaderMessage { + LoadNewWindow { + /// Callback that gets called when there's a state change. + /// The bool represents whether the loading is finished or not. + on_state_change: Box () + 'static + Send>, + + new_time_range: RangeInclusive, + new_wanted_time_points: u32, + }, +} + +impl MetricsLogReadingRequester { + pub fn new_manager( + reader: MetricsLogReader, + on_initial_scan_done: StateCallbackHook, + ) -> MetricsLogReadingRequester { + let shared = Arc::new(MetricsLogReadingShared { + current_window: RwLock::new(MetricsWindow { + time_range: 0.0..=0.0, + wanted_time_points: 0, + metrics: Default::default(), + metric_descriptors: Default::default(), + histograms: Default::default(), + counters: Default::default(), + gauges: Default::default(), + }), + loading_new_window: AtomicBool::new(false), + }); + let manager_shared_ref = shared.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + let requester = MetricsLogReadingRequester { shared, tx }; + + std::thread::Builder::new() + .name("metricslogreader".to_string()) + .spawn(move || { + if let Err(err) = MetricsLogReaderManager::new_and_run( + manager_shared_ref, + reader, + rx, + on_initial_scan_done, + ) { + error!("Error in background log reader: {:?}", err); + } + }) + .unwrap(); + + requester + } +} + +/// We don't track histograms because they don't 'accumulate'; the histograms are emitted every +/// frame when there are any samples. +#[derive(Clone, Debug)] +pub struct MetricsState { + at: UnixTimestampMilliseconds, + gauges: HashMap, + counters: HashMap, +} + +#[derive(Clone, Debug)] +pub struct Checkpoint { + /// State AT the checkpoint + pub state: MetricsState, + /// Seek token for the frame which ends at the checkpoint. + pub seek: SeekToken, +} + +pub struct MetricsLogReaderManager { + shared_ref: Arc, + reader: MetricsLogReader, + rx: Receiver, + metric_descriptors: HashMap, + metric_descriptors_dirty: bool, + metric_descriptor_table: MetricDescriptorTable, + checkpoints: Vec, +} + +/// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent +/// between re-runs and makes it easier to confidently order the UI and assign colours etc... +pub type MetricDescriptorTable = BTreeMap, MetricId>>; + +impl MetricsLogReaderManager { + fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) { + current_state.at = incoming.end_time; + + // Integrate the state... 
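+        // Counter and gauge updates carry the metric's new value, so integrating
+        // a frame is a plain overwrite of the affected entries (last write wins);
+        // metrics absent from the frame keep their previous value.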
+ // We have no need for histogram state because they emit every time there are any + // samples (not just on changes). + for (metric_id, count) in incoming.counter_updates.iter() { + current_state.counters.insert(*metric_id, count.0); + } + for (metric_id, value) in incoming.gauge_updates.iter() { + current_state.gauges.insert(*metric_id, *value); + } + } + + fn initial_scan(&mut self) -> anyhow::Result<()> { + let mut state = MetricsState { + at: UnixTimestampMilliseconds(0), + gauges: HashMap::with_capacity(0), + counters: HashMap::with_capacity(0), + }; + let mut next_checkpoint_at = 0u64; + + while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? { + if old_end_ts.0 > next_checkpoint_at { + // Make a checkpoint here! + self.checkpoints.push(Checkpoint { + state: state.clone(), + seek: old_seek_token, + }); + next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS; + } + + debug!("FRAME {:?}\n{:#?}", old_end_ts, frame); + + Self::integrate_state(&mut state, &frame); + + // Also keep track of new metrics + for (new_metric, metric_descriptor) in frame.new_metrics { + self.metric_descriptors + .insert(new_metric, metric_descriptor); + self.metric_descriptors_dirty = true; + } + } + + debug!("Initial scan complete."); + + Ok(()) + } + + fn update_metric_descriptors(&mut self) { + if !self.metric_descriptors_dirty { + return; + } + self.metric_descriptors_dirty = false; + + let mut metric_names_to_labels_to_ids: MetricDescriptorTable = BTreeMap::new(); + for (metric_id, metric_descriptor) in self.metric_descriptors.iter() { + let labels_to_ids = metric_names_to_labels_to_ids + .entry(metric_descriptor.name.to_string()) + .or_insert_with(BTreeMap::new); + labels_to_ids.insert(metric_descriptor.labels.clone(), *metric_id); + } + + self.metric_descriptor_table = metric_names_to_labels_to_ids; + } + + /// Returns the first checkpoint that precedes or contains the wanted instant. + fn find_checkpoint_just_before( + &self, + instant: UnixTimestampMilliseconds, + ) -> anyhow::Result<&Checkpoint> { + let idx = match self + .checkpoints + .binary_search_by(|candidate| candidate.state.at.cmp(&instant)) + { + Ok(idx) => idx, + Err(idx) => { + if idx == 0 { + // This probably means that we haven't created a checkpoint at time zero. + // We should make sure to always do that ... + bail!( + "The checkpoint for entry (before {:?}) would be at idx -1.", + instant + ); + } else { + idx - 1 + } + } + }; + + if let Some(checkpoint) = self.checkpoints.get(idx) { + debug!( + "First checkpoint just before {:?} is idx{} at {:?}until{:?}", + instant, idx, checkpoint.seek.start_ts, checkpoint.state.at + ); + Ok(checkpoint) + } else { + bail!("No checkpoints..."); + } + } + + /// Returns the seek token for the first frame whose time interval includes the time point specified. + fn lookup_seek_token_for_frame_containing_time( + &mut self, + just_before_or_at: UnixTimestampMilliseconds, + ) -> anyhow::Result { + let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone(); + let return_to = self.reader.seek(checkpoint_just_before.seek)?; + + let mut result = checkpoint_just_before.seek; + let mut previous = checkpoint_just_before.seek; + + loop { + if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? { + previous = seek; + if frame.end_time > just_before_or_at { + if start_ts <= just_before_or_at { + // This frame contains the wanted time point. + // (If we don't hit this case, then we ran past it.) 
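+                        // Illustrative walk-through (hypothetical timestamps): with
+                        // frames covering [0, 10), [10, 20), [20, 30) and a query at
+                        // t = 15, the scan stops at the [10, 20) frame and returns
+                        // its token; if every frame started after t (e.g. a query at
+                        // t = 5 with the first frame at [10, 20)), we break without
+                        // updating `result`, leaving the checkpoint's own token.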
+ result = seek; + } + break; + } + } else { + // Ran out of frames! We use the last one we saw. + result = previous; + break; + } + } + + self.reader.seek(return_to)?; + Ok(result) + } + + fn bicimate_histograms( + vec: &mut Vec<( + UnixTimestampMilliseconds, + UnixTimestampMilliseconds, + Histogram, + )>, + ) { + let mut iter = vec.chunks_exact_mut(2); + for chunks in &mut iter { + let (ch0, ch1) = chunks.split_at_mut(1); + let (_start0, end0, hist0) = &mut ch0[0]; + let (_start1, end1, hist1) = &mut ch1[0]; + + // Make chunk 0 encompass the whole thing. + *end0 = *end1; + // Add the histograms + *hist0 += &*hist1; + } + + // Bicimate to remove the unwanted parts. + Self::bicimate(vec); + } + + fn bicimate(vec: &mut Vec) { + // we always keep the last point! + let last = vec.len() - 1; + let mut the_vec = Vec::with_capacity(0); + std::mem::swap(&mut the_vec, vec); + *vec = the_vec + .into_iter() + .enumerate() + .filter(|(idx, _)| idx % 2 == 0 || *idx == last) + .map(|(_, v)| v) + .collect::>(); + } + + fn load_window_of_data( + &mut self, + start: UnixTimestampMilliseconds, + end: UnixTimestampMilliseconds, + time_points: u32, + ) -> anyhow::Result { + let mut metrics_window = MetricsWindow { + time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001), + wanted_time_points: time_points, + metrics: self.metric_descriptor_table.clone(), + metric_descriptors: self.metric_descriptors.clone(), + histograms: Default::default(), + counters: Default::default(), + gauges: Default::default(), + }; + let mut histograms: HashMap = Default::default(); + + for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() { + for (_, metric_id) in metric_labels_to_ids.iter() { + match metrics_window.metric_descriptors[metric_id].kind { + MetricKind::Histogram => { + histograms.insert(*metric_id, Default::default()); + } + MetricKind::Gauge => { + metrics_window.gauges.insert( + *metric_id, + ScalarWindow { + points: vec![], + y_axis: 0.0..=0.0, + }, + ); + } + MetricKind::Counter => { + metrics_window.counters.insert( + *metric_id, + ScalarWindow { + points: vec![], + y_axis: 0.0..=0.0, + }, + ); + } + } + } + } + + let checkpoint = self.find_checkpoint_just_before(start)?.clone(); + + let mut frame_state = checkpoint.state; + let next_token = checkpoint.seek; + + let mut in_window = false; + + fn write_to_window( + metric_window: &mut MetricsWindow, + histograms: &mut HashMap, + current_state: &MetricsState, + frame: Option<(UnixTimestampMilliseconds, &Frame)>, + ) { + if let Some((frame_start, frame)) = frame { + for (metric_id, histogram) in frame.histograms.iter() { + histograms.get_mut(metric_id).unwrap().map.push(( + frame_start, + frame.end_time, + histogram.underlying.clone(), + )); + } + } + for (metric_id, count) in current_state.counters.iter() { + metric_window + .counters + .get_mut(metric_id) + .unwrap() + .points + .push((current_state.at, *count as f64)); + } + for (metric_id, value) in current_state.gauges.iter() { + metric_window + .gauges + .get_mut(metric_id) + .unwrap() + .points + .push((current_state.at, *value)); + } + } + + // need to try and preserve extra points just before the window and just after, for + // continuity ... + + let mut num_points = 0; + self.reader.seek(next_token)?; + loop { + if let Some((start_ts, frame)) = self.reader.read_frame()? 
{ + Self::integrate_state(&mut frame_state, &frame); + + if !in_window { + if frame_state.at >= start { + in_window = true; + } else { + // Clear the windows because we still haven't reached the start and only + // want one sample before the start. + num_points = 0; + for (_, window) in histograms.iter_mut() { + window.map.clear(); + } + for (_, window) in metrics_window.counters.iter_mut() { + window.points.clear(); + } + for (_, window) in metrics_window.gauges.iter_mut() { + window.points.clear(); + } + } + } + + num_points += 1; + debug!("Writing frame to window with start ts {:?}", start_ts); + write_to_window( + &mut metrics_window, + &mut histograms, + &frame_state, + Some((start_ts, &frame)), + ); + + if frame_state.at >= end { + // We've gone past the end. Stop here. + break; + } + } else { + break; + } + } + + while num_points >= time_points * 2 { + // Keep halving the points until we have less than double the number of requested points. + + for (_, counter) in metrics_window.counters.iter_mut() { + Self::bicimate(&mut counter.points); + } + for (_, gauge) in metrics_window.gauges.iter_mut() { + Self::bicimate(&mut gauge.points); + } + for (_, hists) in histograms.iter_mut() { + Self::bicimate_histograms(&mut hists.map); + } + + num_points = (num_points - 1) / 2 + 1; + } + + for (metric_id, unsummarised_histogram) in histograms.into_iter() { + metrics_window + .histograms + .insert(metric_id, unsummarised_histogram.summarise()); + } + + for (metric_id, gauge) in metrics_window.gauges.iter_mut() { + gauge.summarise_in_place(); + } + for (metric_id, counter) in metrics_window.counters.iter_mut() { + counter.summarise_in_place(); + } + + Ok(metrics_window) + } + + fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> { + info!("Starting manager"); + self.initial_scan()?; + self.update_metric_descriptors(); + info!("Initial scan done."); + let mut the_func = None; + let mut callback_guard = on_initial_scan_done + .lock() + .map_err(|_| anyhow!("Can't lock"))?; + std::mem::swap(&mut the_func, callback_guard.deref_mut()); + if let Some(on_initial_scan_done) = the_func { + on_initial_scan_done(); + } + + while let Ok(msg) = self.rx.recv() { + match msg { + MetricsLogReaderMessage::LoadNewWindow { + on_state_change, + new_time_range, + new_wanted_time_points, + } => { + debug!( + "Requested new window; time range {:?}, № points {}", + new_time_range, new_wanted_time_points + ); + self.shared_ref + .loading_new_window + .store(true, Ordering::SeqCst); + on_state_change(false); + + let start = if *new_time_range.start() == f64::NEG_INFINITY { + // autoscale by getting the earliest time point of the metrics log + let new_start = self.reader.header.start_time.0; + debug!("Autoscaling start to: {:?}", new_start); + new_start + } else { + (*new_time_range.start() * 1000.0) as u64 + }; + let end = if *new_time_range.end() == f64::INFINITY { + // get the first result before the end of time... + // or in other words, the last result. + let seek_token = self.lookup_seek_token_for_frame_containing_time( + UnixTimestampMilliseconds(u64::MAX), + )?; + + self.reader.seek(seek_token)?; + let (start_ts, frame) = self + .reader + .read_frame()? + .ok_or_else(|| anyhow!("Sought frame should exist"))?; + + // if let Some(frame) = frame { + // frame.end_time.0 + // } else { + // // well, we have no choice. Just use the current time. 
+ // SystemTime::now() + // .duration_since(std::time::UNIX_EPOCH) + // .unwrap() + // .as_millis() as u64 + // } + let new_end = frame.end_time.0; + debug!("Autoscaled end to: {:?}", new_end); + new_end + } else { + (*new_time_range.end() * 1000.0) as u64 + }; + + let metric_window = self.load_window_of_data( + UnixTimestampMilliseconds(start), + UnixTimestampMilliseconds(end), + new_wanted_time_points, + )?; + + debug!("METRIC WINDOW {:#?}", metric_window); + + *(self.shared_ref.current_window.write().unwrap()) = metric_window; + + debug!("Finished loading window."); + + on_state_change(true); + } + } + } + + Ok(()) + } + + fn new_and_run( + shared_ref: Arc, + reader: MetricsLogReader, + rx: Receiver, + on_initial_scan_done: StateCallbackHook, + ) -> anyhow::Result<()> { + let mut manager = MetricsLogReaderManager { + shared_ref, + reader, + rx, + metric_descriptors: Default::default(), + metric_descriptors_dirty: false, + metric_descriptor_table: Default::default(), + checkpoints: Default::default(), + }; + + manager.run(on_initial_scan_done) + } +} diff --git a/bare-metrics-gui/src/graph.rs b/bare-metrics-gui/src/graph.rs index ac84894..fb346de 100644 --- a/bare-metrics-gui/src/graph.rs +++ b/bare-metrics-gui/src/graph.rs @@ -1,3 +1,4 @@ +use crate::background_loader::MetricsLogReadingRequester; use crate::config::MetricTransform; use anyhow::{anyhow, bail}; use bare_metrics_core::structures::{ @@ -16,656 +17,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; -/// Make a checkpoint every 10 minutes? -/// This should probably be tunable since it will likely vary per-source ... -pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000; - -pub type StateCallbackHook = Arc>>>; - -#[derive(Clone, Debug)] -pub struct MetricsWindow { - pub time_range: RangeInclusive, - pub wanted_time_points: u32, - pub metrics: MetricDescriptorTable, - pub metric_descriptors: HashMap, - pub histograms: HashMap, - pub counters: HashMap, - pub gauges: HashMap, -} - -#[derive(Copy, Clone, Debug, Default)] -pub struct HeatmapRect { - min_ts: UnixTimestampMilliseconds, - max_ts: UnixTimestampMilliseconds, - min_val: f64, - max_val: f64, - density: f64, -} - -#[derive(Clone, Debug)] -pub struct HistogramWindow { - pub heatmap: Vec, - pub min_density: f64, - pub max_density: f64, - pub y_axis: RangeInclusive, -} - -#[derive(Clone, Debug, Default)] -pub struct UnsummarisedHistogramWindow { - pub map: Vec<( - UnixTimestampMilliseconds, - UnixTimestampMilliseconds, - Histogram, - )>, -} - -impl UnsummarisedHistogramWindow { - pub fn summarise(&self) -> HistogramWindow { - let mut heatmap = Vec::new(); - - let mut min_density = f64::INFINITY; - let mut max_density = f64::NEG_INFINITY; - let mut min_val = f64::INFINITY; - let mut max_val = f64::NEG_INFINITY; - - for (xa, xb, hist) in self.map.iter() { - let min_this_slice = hist.min(); - min_val = min_val.min(min_this_slice as f64); - max_val = max_val.max(hist.max() as f64); - - let mut prev = min_this_slice; - for q in (1..=10).map(|i| i as f64 * 0.1) { - // TODO improve this. It's going to be a bit off because of the way that - // numbers can be 'equivalent' etc ... 
- let value_at_q = hist.value_at_quantile(q); - let count_between = hist.count_between(prev, value_at_q); - let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64; - let density = count_between as f64 / area; - - min_density = min_density.min(density); - max_density = max_density.max(density); - - //eprintln!("den {}", density); - // TODO need to autoscale based on density... - - heatmap.push(HeatmapRect { - min_ts: *xa, - max_ts: *xb, - min_val: prev as f64, - max_val: value_at_q as f64, - density, - }); - prev = value_at_q; - } - - //debug!("hist min {:?} max {:?}", hist.min(), hist.max()); - } - - let y_axis = min_val..=max_val; - debug!("yaxis {:?}", y_axis); - HistogramWindow { - heatmap, - min_density, - max_density, - y_axis, - } - } -} - -#[derive(Clone, Debug)] -pub struct ScalarWindow { - pub points: Vec<(UnixTimestampMilliseconds, f64)>, - pub y_axis: RangeInclusive, -} - -impl ScalarWindow { - pub fn summarise_in_place(&mut self) { - let mut min_point = f64::INFINITY; - let mut max_point = f64::NEG_INFINITY; - - for (_, point) in self.points.iter() { - min_point = min_point.min(*point); - max_point = max_point.max(*point); - } - - self.y_axis = min_point..=max_point; - } -} - -pub struct MetricsLogReadingRequester { - pub shared: Arc, - pub tx: Sender, -} - -pub struct MetricsLogReadingShared { - /// The current window of data. - pub current_window: RwLock, - /// True if a new window of data is being loaded. - pub loading_new_window: AtomicBool, -} - -pub enum MetricsLogReaderMessage { - LoadNewWindow { - /// Callback that gets called when there's a state change. - /// The bool represents whether the loading is finished or not. - on_state_change: Box () + 'static + Send>, - - new_time_range: RangeInclusive, - new_wanted_time_points: u32, - }, -} - -impl MetricsLogReadingRequester { - pub fn new_manager( - reader: MetricsLogReader, - on_initial_scan_done: StateCallbackHook, - ) -> MetricsLogReadingRequester { - let shared = Arc::new(MetricsLogReadingShared { - current_window: RwLock::new(MetricsWindow { - time_range: 0.0..=0.0, - wanted_time_points: 0, - metrics: Default::default(), - metric_descriptors: Default::default(), - histograms: Default::default(), - counters: Default::default(), - gauges: Default::default(), - }), - loading_new_window: AtomicBool::new(false), - }); - let manager_shared_ref = shared.clone(); - let (tx, rx) = std::sync::mpsc::channel(); - let requester = MetricsLogReadingRequester { shared, tx }; - - std::thread::Builder::new() - .name("metricslogreader".to_string()) - .spawn(move || { - if let Err(err) = MetricsLogReaderManager::new_and_run( - manager_shared_ref, - reader, - rx, - on_initial_scan_done, - ) { - error!("Error in background log reader: {:?}", err); - } - }) - .unwrap(); - - requester - } -} - -/// We don't track histograms because they don't 'accumulate'; the histograms are emitted every -/// frame when there are any samples. -#[derive(Clone, Debug)] -pub struct MetricsState { - at: UnixTimestampMilliseconds, - gauges: HashMap, - counters: HashMap, -} - -#[derive(Clone, Debug)] -pub struct Checkpoint { - /// State AT the checkpoint - pub state: MetricsState, - /// Seek token for the frame which ends at the checkpoint. - pub seek: SeekToken, -} - -pub struct MetricsLogReaderManager { - shared_ref: Arc, - reader: MetricsLogReader, - rx: Receiver, - metric_descriptors: HashMap, - metric_descriptors_dirty: bool, - metric_descriptor_table: MetricDescriptorTable, - checkpoints: Vec, -} - -/// We use B-Tree maps because they sort properly. 
That's useful for ensuring the UI is consistent -/// between re-runs and makes it easier to confidently order the UI and assign colours etc... -pub type MetricDescriptorTable = BTreeMap, MetricId>>; - -impl MetricsLogReaderManager { - fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) { - current_state.at = incoming.end_time; - - // Integrate the state... - // We have no need for histogram state because they emit every time there are any - // samples (not just on changes). - for (metric_id, count) in incoming.counter_updates.iter() { - current_state.counters.insert(*metric_id, count.0); - } - for (metric_id, value) in incoming.gauge_updates.iter() { - current_state.gauges.insert(*metric_id, *value); - } - } - - fn initial_scan(&mut self) -> anyhow::Result<()> { - let mut state = MetricsState { - at: UnixTimestampMilliseconds(0), - gauges: HashMap::with_capacity(0), - counters: HashMap::with_capacity(0), - }; - let mut next_checkpoint_at = 0u64; - - while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? { - if old_end_ts.0 > next_checkpoint_at { - // Make a checkpoint here! - self.checkpoints.push(Checkpoint { - state: state.clone(), - seek: old_seek_token, - }); - next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS; - } - - debug!("FRAME {:?}\n{:#?}", old_end_ts, frame); - - Self::integrate_state(&mut state, &frame); - - // Also keep track of new metrics - for (new_metric, metric_descriptor) in frame.new_metrics { - self.metric_descriptors - .insert(new_metric, metric_descriptor); - self.metric_descriptors_dirty = true; - } - } - - debug!("Initial scan complete."); - - Ok(()) - } - - fn update_metric_descriptors(&mut self) { - if !self.metric_descriptors_dirty { - return; - } - self.metric_descriptors_dirty = false; - - let mut metric_names_to_labels_to_ids: MetricDescriptorTable = BTreeMap::new(); - for (metric_id, metric_descriptor) in self.metric_descriptors.iter() { - let labels_to_ids = metric_names_to_labels_to_ids - .entry(metric_descriptor.name.to_string()) - .or_insert_with(BTreeMap::new); - labels_to_ids.insert(metric_descriptor.labels.clone(), *metric_id); - } - - self.metric_descriptor_table = metric_names_to_labels_to_ids; - } - - /// Returns the first checkpoint that precedes or contains the wanted instant. - fn find_checkpoint_just_before( - &self, - instant: UnixTimestampMilliseconds, - ) -> anyhow::Result<&Checkpoint> { - let idx = match self - .checkpoints - .binary_search_by(|candidate| candidate.state.at.cmp(&instant)) - { - Ok(idx) => idx, - Err(idx) => { - if idx == 0 { - // This probably means that we haven't created a checkpoint at time zero. - // We should make sure to always do that ... - bail!( - "The checkpoint for entry (before {:?}) would be at idx -1.", - instant - ); - } else { - idx - 1 - } - } - }; - - if let Some(checkpoint) = self.checkpoints.get(idx) { - debug!( - "First checkpoint just before {:?} is idx{} at {:?}until{:?}", - instant, idx, checkpoint.seek.start_ts, checkpoint.state.at - ); - Ok(checkpoint) - } else { - bail!("No checkpoints..."); - } - } - - /// Returns the seek token for the first frame whose time interval includes the time point specified. 
- fn lookup_seek_token_for_frame_containing_time( - &mut self, - just_before_or_at: UnixTimestampMilliseconds, - ) -> anyhow::Result { - let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone(); - let return_to = self.reader.seek(checkpoint_just_before.seek)?; - - let mut result = checkpoint_just_before.seek; - let mut previous = checkpoint_just_before.seek; - - loop { - if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? { - previous = seek; - if frame.end_time > just_before_or_at { - if start_ts <= just_before_or_at { - // This frame contains the wanted time point. - // (If we don't hit this case, then we ran past it.) - result = seek; - } - break; - } - } else { - // Ran out of frames! We use the last one we saw. - result = previous; - break; - } - } - - self.reader.seek(return_to)?; - Ok(result) - } - - fn bicimate_histograms( - vec: &mut Vec<( - UnixTimestampMilliseconds, - UnixTimestampMilliseconds, - Histogram, - )>, - ) { - let mut iter = vec.chunks_exact_mut(2); - for chunks in &mut iter { - let (ch0, ch1) = chunks.split_at_mut(1); - let (_start0, end0, hist0) = &mut ch0[0]; - let (_start1, end1, hist1) = &mut ch1[0]; - - // Make chunk 0 encompass the whole thing. - *end0 = *end1; - // Add the histograms - *hist0 += &*hist1; - } - - // Bicimate to remove the unwanted parts. - Self::bicimate(vec); - } - - fn bicimate(vec: &mut Vec) { - // we always keep the last point! - let last = vec.len() - 1; - let mut the_vec = Vec::with_capacity(0); - std::mem::swap(&mut the_vec, vec); - *vec = the_vec - .into_iter() - .enumerate() - .filter(|(idx, _)| idx % 2 == 0 || *idx == last) - .map(|(_, v)| v) - .collect::>(); - } - - fn load_window_of_data( - &mut self, - start: UnixTimestampMilliseconds, - end: UnixTimestampMilliseconds, - time_points: u32, - ) -> anyhow::Result { - let mut metrics_window = MetricsWindow { - time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001), - wanted_time_points: time_points, - metrics: self.metric_descriptor_table.clone(), - metric_descriptors: self.metric_descriptors.clone(), - histograms: Default::default(), - counters: Default::default(), - gauges: Default::default(), - }; - let mut histograms: HashMap = Default::default(); - - for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() { - for (_, metric_id) in metric_labels_to_ids.iter() { - match metrics_window.metric_descriptors[metric_id].kind { - MetricKind::Histogram => { - histograms.insert(*metric_id, Default::default()); - } - MetricKind::Gauge => { - metrics_window.gauges.insert( - *metric_id, - ScalarWindow { - points: vec![], - y_axis: 0.0..=0.0, - }, - ); - } - MetricKind::Counter => { - metrics_window.counters.insert( - *metric_id, - ScalarWindow { - points: vec![], - y_axis: 0.0..=0.0, - }, - ); - } - } - } - } - - let checkpoint = self.find_checkpoint_just_before(start)?.clone(); - - let mut frame_state = checkpoint.state; - let next_token = checkpoint.seek; - - let mut in_window = false; - - fn write_to_window( - metric_window: &mut MetricsWindow, - histograms: &mut HashMap, - current_state: &MetricsState, - frame: Option<(UnixTimestampMilliseconds, &Frame)>, - ) { - if let Some((frame_start, frame)) = frame { - for (metric_id, histogram) in frame.histograms.iter() { - histograms.get_mut(metric_id).unwrap().map.push(( - frame_start, - frame.end_time, - histogram.underlying.clone(), - )); - } - } - for (metric_id, count) in current_state.counters.iter() { - metric_window - .counters - 
.get_mut(metric_id) - .unwrap() - .points - .push((current_state.at, *count as f64)); - } - for (metric_id, value) in current_state.gauges.iter() { - metric_window - .gauges - .get_mut(metric_id) - .unwrap() - .points - .push((current_state.at, *value)); - } - } - - // need to try and preserve extra points just before the window and just after, for - // continuity ... - - let mut num_points = 0; - self.reader.seek(next_token)?; - loop { - if let Some((start_ts, frame)) = self.reader.read_frame()? { - Self::integrate_state(&mut frame_state, &frame); - - if !in_window { - if frame_state.at >= start { - in_window = true; - } else { - // Clear the windows because we still haven't reached the start and only - // want one sample before the start. - num_points = 0; - for (_, window) in histograms.iter_mut() { - window.map.clear(); - } - for (_, window) in metrics_window.counters.iter_mut() { - window.points.clear(); - } - for (_, window) in metrics_window.gauges.iter_mut() { - window.points.clear(); - } - } - } - - num_points += 1; - debug!("Writing frame to window with start ts {:?}", start_ts); - write_to_window( - &mut metrics_window, - &mut histograms, - &frame_state, - Some((start_ts, &frame)), - ); - - if frame_state.at >= end { - // We've gone past the end. Stop here. - break; - } - } else { - break; - } - } - - while num_points >= time_points * 2 { - // Keep halving the points until we have less than double the number of requested points. - - for (_, counter) in metrics_window.counters.iter_mut() { - Self::bicimate(&mut counter.points); - } - for (_, gauge) in metrics_window.gauges.iter_mut() { - Self::bicimate(&mut gauge.points); - } - for (_, hists) in histograms.iter_mut() { - Self::bicimate_histograms(&mut hists.map); - } - - num_points = (num_points - 1) / 2 + 1; - } - - for (metric_id, unsummarised_histogram) in histograms.into_iter() { - metrics_window - .histograms - .insert(metric_id, unsummarised_histogram.summarise()); - } - - for (metric_id, gauge) in metrics_window.gauges.iter_mut() { - gauge.summarise_in_place(); - } - for (metric_id, counter) in metrics_window.counters.iter_mut() { - counter.summarise_in_place(); - } - - Ok(metrics_window) - } - - fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> { - info!("Starting manager"); - self.initial_scan()?; - self.update_metric_descriptors(); - info!("Initial scan done."); - let mut the_func = None; - let mut callback_guard = on_initial_scan_done - .lock() - .map_err(|_| anyhow!("Can't lock"))?; - std::mem::swap(&mut the_func, callback_guard.deref_mut()); - if let Some(on_initial_scan_done) = the_func { - on_initial_scan_done(); - } - - while let Ok(msg) = self.rx.recv() { - match msg { - MetricsLogReaderMessage::LoadNewWindow { - on_state_change, - new_time_range, - new_wanted_time_points, - } => { - debug!( - "Requested new window; time range {:?}, № points {}", - new_time_range, new_wanted_time_points - ); - self.shared_ref - .loading_new_window - .store(true, Ordering::SeqCst); - on_state_change(false); - - let start = if *new_time_range.start() == f64::NEG_INFINITY { - // autoscale by getting the earliest time point of the metrics log - let new_start = self.reader.header.start_time.0; - debug!("Autoscaling start to: {:?}", new_start); - new_start - } else { - (*new_time_range.start() * 1000.0) as u64 - }; - let end = if *new_time_range.end() == f64::INFINITY { - // get the first result before the end of time... - // or in other words, the last result. 
- let seek_token = self.lookup_seek_token_for_frame_containing_time( - UnixTimestampMilliseconds(u64::MAX), - )?; - - self.reader.seek(seek_token)?; - let (start_ts, frame) = self - .reader - .read_frame()? - .ok_or_else(|| anyhow!("Sought frame should exist"))?; - - // if let Some(frame) = frame { - // frame.end_time.0 - // } else { - // // well, we have no choice. Just use the current time. - // SystemTime::now() - // .duration_since(std::time::UNIX_EPOCH) - // .unwrap() - // .as_millis() as u64 - // } - let new_end = frame.end_time.0; - debug!("Autoscaled end to: {:?}", new_end); - new_end - } else { - (*new_time_range.end() * 1000.0) as u64 - }; - - let metric_window = self.load_window_of_data( - UnixTimestampMilliseconds(start), - UnixTimestampMilliseconds(end), - new_wanted_time_points, - )?; - - debug!("METRIC WINDOW {:#?}", metric_window); - - *(self.shared_ref.current_window.write().unwrap()) = metric_window; - - debug!("Finished loading window."); - - on_state_change(true); - } - } - } - - Ok(()) - } - - fn new_and_run( - shared_ref: Arc, - reader: MetricsLogReader, - rx: Receiver, - on_initial_scan_done: StateCallbackHook, - ) -> anyhow::Result<()> { - let mut manager = MetricsLogReaderManager { - shared_ref, - reader, - rx, - metric_descriptors: Default::default(), - metric_descriptors_dirty: false, - metric_descriptor_table: Default::default(), - checkpoints: Default::default(), - }; - - manager.run(on_initial_scan_done) - } -} - /// Renderer for a bare metrics graph. /// Should support rendering both line graphs and histograms. /// Input interactions: diff --git a/bare-metrics-gui/src/main.rs b/bare-metrics-gui/src/main.rs index 93da5d0..2247f5b 100644 --- a/bare-metrics-gui/src/main.rs +++ b/bare-metrics-gui/src/main.rs @@ -1,5 +1,6 @@ +use crate::background_loader::{MetricsLogReaderMessage, MetricsLogReadingRequester}; use crate::config::DashboardConfig; -use crate::graph::{Graph, MetricsLogReaderMessage, MetricsLogReadingRequester}; +use crate::graph::Graph; use bare_metrics_core::structures::MetricId; use bare_metrics_reader::MetricsLogReader; use eframe::egui::{CentralPanel, CtxRef}; @@ -11,6 +12,7 @@ use std::fs::File; use std::path::PathBuf; use std::str::FromStr; +pub mod background_loader; pub mod config; pub mod graph;