diff --git a/bare-metrics-core/src/structures.rs b/bare-metrics-core/src/structures.rs index 0ae07a8..933696b 100644 --- a/bare-metrics-core/src/structures.rs +++ b/bare-metrics-core/src/structures.rs @@ -16,6 +16,12 @@ pub fn get_supported_version() -> String { )] pub struct UnixTimestampMilliseconds(pub u64); +impl UnixTimestampMilliseconds { + pub fn as_f64_seconds(self) -> f64 { + self.0 as f64 * 0.001 + } +} + /// Header for a metric log file. #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub struct LogHeader { diff --git a/bare-metrics-gui/src/graph.rs b/bare-metrics-gui/src/graph.rs index 1dbc726..7f7f941 100644 --- a/bare-metrics-gui/src/graph.rs +++ b/bare-metrics-gui/src/graph.rs @@ -1,14 +1,18 @@ -use bare_metrics_core::structures::{Frame, MetricDescriptor, MetricId, UnixTimestampMilliseconds}; +use anyhow::{anyhow, bail}; +use bare_metrics_core::structures::{ + Frame, MetricDescriptor, MetricId, MetricKind, UnixTimestampMilliseconds, +}; use bare_metrics_reader::{MetricsLogReader, SeekToken}; use eframe::egui::{ Color32, Frame as EguiFrame, PointerButton, Pos2, Rect, Sense, Stroke, Ui, Vec2, }; +use hdrhistogram::Histogram; use log::{debug, error, info}; use std::collections::{BTreeMap, HashMap}; use std::io::{Read, Seek}; -use std::ops::RangeInclusive; +use std::ops::{Mul, RangeInclusive}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::Receiver; +use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::time::SystemTime; @@ -18,31 +22,95 @@ pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000; #[derive(Clone, Debug)] pub struct MetricsWindow { - time_range: RangeInclusive, - wanted_time_points: u32, - metrics: MetricDescriptorTable, - histograms: HashMap, - counters: HashMap, - gauges: HashMap, + pub time_range: RangeInclusive, + pub wanted_time_points: u32, + pub metrics: MetricDescriptorTable, + pub metric_descriptors: HashMap, + pub histograms: HashMap, + pub counters: HashMap, + pub gauges: HashMap, } -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug, Default)] +pub struct HeatmapRect { + min_ts: UnixTimestampMilliseconds, + max_ts: UnixTimestampMilliseconds, + min_val: f64, + max_val: f64, + density: f64, +} + +#[derive(Clone, Debug, Default)] pub struct HistogramWindow { - map: Vec<( + pub heatmap: Vec, + pub min_density: f64, + pub max_density: f64, +} + +#[derive(Clone, Debug, Default)] +pub struct UnsummarisedHistogramWindow { + pub map: Vec<( UnixTimestampMilliseconds, UnixTimestampMilliseconds, - HistogramWindow, + Histogram, )>, } -#[derive(Clone, Debug)] +impl UnsummarisedHistogramWindow { + pub fn summarise(&self) -> HistogramWindow { + let mut heatmap = Vec::new(); + + let mut min_density = f64::INFINITY; + let mut max_density = f64::NEG_INFINITY; + + for (xa, xb, hist) in self.map.iter() { + let min_val = hist.min(); + let max_val = hist.max(); + + let mut prev = min_val; + for q in (1..=10).map(|i| i as f64 * 0.1) { + // TODO improve this. It's going to be a bit off because of the way that + // numbers can be 'equivalent' etc ... + let value_at_q = hist.value_at_quantile(q); + let count_between = hist.count_between(prev, value_at_q); + let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64; + let density = count_between as f64 / area; + + min_density = min_density.min(density); + max_density = max_density.max(density); + + //eprintln!("den {}", density); + // TODO need to autoscale based on density... + + heatmap.push(HeatmapRect { + min_ts: *xa, + max_ts: *xb, + min_val: prev as f64, + max_val: value_at_q as f64, + density, + }); + prev = value_at_q; + } + + //debug!("hist min {:?} max {:?}", hist.min(), hist.max()); + } + + HistogramWindow { + heatmap, + min_density, + max_density, + } + } +} + +#[derive(Clone, Debug, Default)] pub struct ScalarWindow { points: Vec<(UnixTimestampMilliseconds, f64)>, } pub struct MetricsLogReadingRequester { - shared: Arc, - // todo sender: + pub shared: Arc, + pub tx: Sender, } pub struct MetricsLogReadingShared { @@ -72,6 +140,7 @@ impl MetricsLogReadingRequester { time_range: 0.0..=0.0, wanted_time_points: 0, metrics: Default::default(), + metric_descriptors: Default::default(), histograms: Default::default(), counters: Default::default(), gauges: Default::default(), @@ -79,9 +148,8 @@ impl MetricsLogReadingRequester { loading_new_window: AtomicBool::new(false), }); let manager_shared_ref = shared.clone(); - let requester = MetricsLogReadingRequester { shared }; - let (tx, rx) = std::sync::mpsc::channel(); + let requester = MetricsLogReadingRequester { shared, tx }; std::thread::Builder::new() .name("metricslogreader".to_string()) @@ -107,6 +175,14 @@ pub struct MetricsState { counters: HashMap, } +#[derive(Clone, Debug)] +pub struct Checkpoint { + /// State AT the checkpoint + pub state: MetricsState, + /// Seek token for the frame which ends at the checkpoint. + pub seek: SeekToken, +} + pub struct MetricsLogReaderManager { shared_ref: Arc, reader: MetricsLogReader, @@ -114,7 +190,7 @@ pub struct MetricsLogReaderManager { metric_descriptors: HashMap, metric_descriptors_dirty: bool, metric_descriptor_table: MetricDescriptorTable, - checkpoints: Vec<(MetricsState, SeekToken)>, + checkpoints: Vec, } /// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent @@ -122,6 +198,20 @@ pub struct MetricsLogReaderManager { pub type MetricDescriptorTable = BTreeMap, MetricId>>; impl MetricsLogReaderManager { + fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) { + current_state.at = incoming.end_time; + + // Integrate the state... + // We have no need for histogram state because they emit every time there are any + // samples (not just on changes). + for (metric_id, count) in incoming.counter_updates.iter() { + current_state.counters.insert(*metric_id, count.0); + } + for (metric_id, value) in incoming.gauge_updates.iter() { + current_state.gauges.insert(*metric_id, *value); + } + } + fn initial_scan(&mut self) -> anyhow::Result<()> { let mut state = MetricsState { at: UnixTimestampMilliseconds(0), @@ -131,22 +221,18 @@ impl MetricsLogReaderManager { let mut next_checkpoint_at = 0u64; while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? { - state.at = old_end_ts; - if state.at.0 > next_checkpoint_at { + if old_end_ts.0 > next_checkpoint_at { // Make a checkpoint here! - self.checkpoints.push((state.clone(), old_seek_token)); - next_checkpoint_at = state.at.0 + CHECKPOINT_EVERY_MILLISECONDS; + self.checkpoints.push(Checkpoint { + state: state.clone(), + seek: old_seek_token, + }); + next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS; } - // Integrate the state... - // We have no need for histogram state because they emit every time there are any - // samples (not just on changes). - for (metric_id, count) in frame.counter_updates { - state.counters.insert(metric_id, count.0); - } - for (metric_id, value) in frame.gauge_updates { - state.gauges.insert(metric_id, value); - } + debug!("FRAME {:?}\n{:#?}", old_end_ts, frame); + + Self::integrate_state(&mut state, &frame); // Also keep track of new metrics for (new_metric, metric_descriptor) in frame.new_metrics { @@ -156,6 +242,8 @@ impl MetricsLogReaderManager { } } + debug!("Initial scan complete."); + Ok(()) } @@ -176,18 +264,254 @@ impl MetricsLogReaderManager { self.metric_descriptor_table = metric_names_to_labels_to_ids; } + /// Returns the first checkpoint that precedes or contains the wanted instant. fn find_checkpoint_just_before( + &self, + instant: UnixTimestampMilliseconds, + ) -> anyhow::Result<&Checkpoint> { + let idx = match self + .checkpoints + .binary_search_by(|candidate| candidate.state.at.cmp(&instant)) + { + Ok(idx) => idx, + Err(idx) => { + if idx == 0 { + // This probably means that we haven't created a checkpoint at time zero. + // We should make sure to always do that ... + bail!( + "The checkpoint for entry (before {:?}) would be at idx -1.", + instant + ); + } else { + idx - 1 + } + } + }; + + if let Some(checkpoint) = self.checkpoints.get(idx) { + debug!( + "First checkpoint just before {:?} is idx{} at {:?}until{:?}", + instant, idx, checkpoint.seek.start_ts, checkpoint.state.at + ); + Ok(checkpoint) + } else { + bail!("No checkpoints..."); + } + } + + /// Returns the seek token for the first frame whose time interval includes the time point specified. + fn lookup_seek_token_for_frame_containing_time( &mut self, just_before_or_at: UnixTimestampMilliseconds, ) -> anyhow::Result { - todo!() + let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone(); + let return_to = self.reader.seek(checkpoint_just_before.seek)?; + + let mut result = checkpoint_just_before.seek; + let mut previous = checkpoint_just_before.seek; + + loop { + if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? { + previous = seek; + if frame.end_time > just_before_or_at { + if start_ts <= just_before_or_at { + // This frame contains the wanted time point. + // (If we don't hit this case, then we ran past it.) + result = seek; + } + break; + } + } else { + // Ran out of frames! We use the last one we saw. + result = previous; + break; + } + } + + self.reader.seek(return_to)?; + Ok(result) } - fn seek_to_first_before( + fn bicimate_histograms( + vec: &mut Vec<( + UnixTimestampMilliseconds, + UnixTimestampMilliseconds, + Histogram, + )>, + ) { + let mut iter = vec.chunks_exact_mut(2); + for chunks in &mut iter { + let (ch0, ch1) = chunks.split_at_mut(1); + let (_start0, end0, hist0) = &mut ch0[0]; + let (_start1, end1, hist1) = &mut ch1[0]; + + // Make chunk 0 encompass the whole thing. + *end0 = *end1; + // Add the histograms + *hist0 += &*hist1; + } + + // Bicimate to remove the unwanted parts. + Self::bicimate(vec); + } + + fn bicimate(vec: &mut Vec) { + // we always keep the last point! + let last = vec.len() - 1; + let mut the_vec = Vec::with_capacity(0); + std::mem::swap(&mut the_vec, vec); + *vec = the_vec + .into_iter() + .enumerate() + .filter(|(idx, _)| idx % 2 == 0 || *idx == last) + .map(|(_, v)| v) + .collect::>(); + } + + fn load_window_of_data( &mut self, - just_before_or_at: UnixTimestampMilliseconds, - ) -> anyhow::Result<(SeekToken, Option)> { - todo!() + start: UnixTimestampMilliseconds, + end: UnixTimestampMilliseconds, + time_points: u32, + ) -> anyhow::Result { + let mut metrics_window = MetricsWindow { + time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001), + wanted_time_points: time_points, + metrics: self.metric_descriptor_table.clone(), + metric_descriptors: self.metric_descriptors.clone(), + histograms: Default::default(), + counters: Default::default(), + gauges: Default::default(), + }; + let mut histograms: HashMap = Default::default(); + + for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() { + for (_, metric_id) in metric_labels_to_ids.iter() { + match metrics_window.metric_descriptors[metric_id].kind { + MetricKind::Histogram => { + histograms.insert(*metric_id, Default::default()); + } + MetricKind::Gauge => { + metrics_window.gauges.insert(*metric_id, Default::default()); + } + MetricKind::Counter => { + metrics_window + .counters + .insert(*metric_id, Default::default()); + } + } + } + } + + let checkpoint = self.find_checkpoint_just_before(start)?.clone(); + + let mut frame_state = checkpoint.state; + let next_token = checkpoint.seek; + + let mut in_window = false; + + fn write_to_window( + metric_window: &mut MetricsWindow, + histograms: &mut HashMap, + current_state: &MetricsState, + frame: Option<(UnixTimestampMilliseconds, &Frame)>, + ) { + if let Some((frame_start, frame)) = frame { + for (metric_id, histogram) in frame.histograms.iter() { + histograms.get_mut(metric_id).unwrap().map.push(( + frame_start, + frame.end_time, + histogram.underlying.clone(), + )); + } + } + for (metric_id, count) in current_state.counters.iter() { + metric_window + .counters + .get_mut(metric_id) + .unwrap() + .points + .push((current_state.at, *count as f64)); + } + for (metric_id, value) in current_state.gauges.iter() { + metric_window + .gauges + .get_mut(metric_id) + .unwrap() + .points + .push((current_state.at, *value)); + } + } + + // need to try and preserve extra points just before the window and just after, for + // continuity ... + + let mut num_points = 0; + self.reader.seek(next_token)?; + loop { + if let Some((start_ts, frame)) = self.reader.read_frame()? { + Self::integrate_state(&mut frame_state, &frame); + + if !in_window { + if frame_state.at >= start { + in_window = true; + } else { + // Clear the windows because we still haven't reached the start and only + // want one sample before the start. + num_points = 0; + for (_, window) in histograms.iter_mut() { + window.map.clear(); + } + for (_, window) in metrics_window.counters.iter_mut() { + window.points.clear(); + } + for (_, window) in metrics_window.gauges.iter_mut() { + window.points.clear(); + } + } + } + + num_points += 1; + debug!("Writing frame to window with start ts {:?}", start_ts); + write_to_window( + &mut metrics_window, + &mut histograms, + &frame_state, + Some((start_ts, &frame)), + ); + + if frame_state.at >= end { + // We've gone past the end. Stop here. + break; + } + } else { + break; + } + } + + while num_points >= time_points * 2 { + // Keep halving the points until we have less than double the number of requested points. + + for (_, counter) in metrics_window.counters.iter_mut() { + Self::bicimate(&mut counter.points); + } + for (_, gauge) in metrics_window.gauges.iter_mut() { + Self::bicimate(&mut gauge.points); + } + for (_, hists) in histograms.iter_mut() { + Self::bicimate_histograms(&mut hists.map); + } + + num_points = (num_points - 1) / 2 + 1; + } + + for (metric_id, unsummarised_histogram) in histograms.into_iter() { + metrics_window + .histograms + .insert(metric_id, unsummarised_histogram.summarise()); + } + + Ok(metrics_window) } fn run(&mut self) -> anyhow::Result<()> { @@ -203,35 +527,65 @@ impl MetricsLogReaderManager { new_time_range, new_wanted_time_points, } => { + debug!( + "Requested new window; time range {:?}, № points {}", + new_time_range, new_wanted_time_points + ); self.shared_ref .loading_new_window .store(true, Ordering::SeqCst); on_state_change(false); - todo!(); let start = if *new_time_range.start() == f64::NEG_INFINITY { // autoscale by getting the earliest time point of the metrics log - self.reader.header.start_time.0 + let new_start = self.reader.header.start_time.0; + debug!("Autoscaling start to: {:?}", new_start); + new_start } else { (*new_time_range.start() * 1000.0) as u64 }; let end = if *new_time_range.end() == f64::INFINITY { // get the first result before the end of time... // or in other words, the last result. - let (_, frame) = - self.seek_to_first_before(UnixTimestampMilliseconds(u64::MAX))?; - if let Some(frame) = frame { - frame.end_time.0 - } else { - // well, we have no choice. Just use the current time. - SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64 - } + let seek_token = self.lookup_seek_token_for_frame_containing_time( + UnixTimestampMilliseconds(u64::MAX), + )?; + + self.reader.seek(seek_token)?; + let (start_ts, frame) = self + .reader + .read_frame()? + .ok_or_else(|| anyhow!("Sought frame should exist"))?; + + // if let Some(frame) = frame { + // frame.end_time.0 + // } else { + // // well, we have no choice. Just use the current time. + // SystemTime::now() + // .duration_since(std::time::UNIX_EPOCH) + // .unwrap() + // .as_millis() as u64 + // } + let new_end = frame.end_time.0; + debug!("Autoscaled end to: {:?}", new_end); + new_end } else { (*new_time_range.end() * 1000.0) as u64 }; + + let metric_window = self.load_window_of_data( + UnixTimestampMilliseconds(start), + UnixTimestampMilliseconds(end), + new_wanted_time_points, + )?; + + debug!("METRIC WINDOW {:#?}", metric_window); + + *(self.shared_ref.current_window.write().unwrap()) = metric_window; + + debug!("Finished loading window."); + + on_state_change(true); } } } @@ -275,8 +629,48 @@ pub struct GraphState { pub x_last_time: UnixTimestampMilliseconds, } +#[derive(Copy, Clone, Debug)] +pub struct GraphTransform { + pub scale_x: f64, + pub scale_y: f64, + pub offs_x: f64, + pub offs_y: f64, +} + +/// Inspired by egui's RectTransform but uses f64 because f32 don't have enough precision +/// to work well with unix timestamps... +impl GraphTransform { + pub fn from_ranges_and_display_rect( + x_range: RangeInclusive, + y_range: RangeInclusive, + rect: Rect, + ) -> Self { + let scale_x = (rect.max.x - rect.min.x) as f64 / (x_range.end() - x_range.start()); + let scale_y = (rect.max.y - rect.min.y) as f64 / (y_range.end() - y_range.start()); + let offs_x = rect.min.x as f64 - scale_x * x_range.start(); + let offs_y = rect.min.y as f64 - scale_y * y_range.start(); + + GraphTransform { + scale_x, + scale_y, + offs_x, + offs_y, + } + } +} + +impl Mul<(f64, f64)> for GraphTransform { + type Output = Pos2; + + fn mul(self, (in_x, in_y): (f64, f64)) -> Self::Output { + let x = self.scale_x * in_x + self.offs_x; + let y = self.scale_y * in_y + self.offs_y; + Pos2::new(x as f32, y as f32) + } +} + impl Graph { - pub fn draw(ui: &mut Ui) { + pub fn draw(ui: &mut Ui, metric_id: MetricId, reader: &MetricsLogReadingRequester) { let context_menu_id = ui.id().with("context menu"); EguiFrame::dark_canvas(ui.style()).show(ui, |ui| { @@ -304,24 +698,111 @@ impl Graph { let display_rect = response.rect; - // Scale the graph to the appropriate axes. - let x_axis = 0.0..=1.0; + let current_window = reader.shared.current_window.read().unwrap(); + + let mut x_axis = *current_window.time_range.start()..=*current_window.time_range.end(); + if x_axis.end() == x_axis.start() { + x_axis = *x_axis.start()..=*x_axis.start() + 1.0; + } + // This range is reversed because screen coordinates go down, but we'd like them to go // up since this is more of a mathematical graph. - let y_axis = 1.0..=0.0; + let y_axis = 300.0..=0.0; + + // let display_transform = + // emath::RectTransform::from_to(Rect::from_x_y_ranges(x_axis, y_axis), display_rect); let display_transform = - emath::RectTransform::from_to(Rect::from_x_y_ranges(x_axis, y_axis), display_rect); + GraphTransform::from_ranges_and_display_rect(x_axis, y_axis, display_rect); let stroke = Stroke::new(2.0, Color32::GREEN); - ui.painter().line_segment( - [ - display_transform * Pos2::new(0.1, 0.3), - display_transform * Pos2::new(0.9, 0.6), - ], - stroke, - ); + let mut last_point = None; + + if let Some(scalar) = current_window + .counters + .get(&metric_id) + .or_else(|| current_window.gauges.get(&metric_id)) + { + for (x, y) in scalar.points.iter() { + let new_point = display_transform * (x.as_f64_seconds(), *y); + ui.painter().circle_filled(new_point, 5.0, Color32::RED); + if let Some(last_point) = last_point { + ui.painter().line_segment([last_point, new_point], stroke); + } + last_point = Some(new_point); + } + } + + if let Some(histogram) = current_window.histograms.get(&metric_id) { + let min_density = histogram.min_density; + let max_density = histogram.max_density; + for heatrect in histogram.heatmap.iter() { + let rect = Rect::from_min_max( + display_transform * (heatrect.min_ts.as_f64_seconds(), heatrect.max_val), + display_transform * (heatrect.max_ts.as_f64_seconds(), heatrect.min_val), + ); + eprintln!("R {:?} D {:?}", rect, display_rect); + + let colour_cold = Color32::LIGHT_YELLOW; + let colour_hot = Color32::RED; + + let lerp = + ((heatrect.density - min_density) / (max_density - min_density)) as f32; + eprintln!( + "Lerp {:?} D {:?} mD {:?} MD {:?}", + lerp, heatrect.density, min_density, max_density + ); + + // TODO refactor this to be a ColorF32 thing or something? + let (cr, cg, cb, ca) = colour_cold.to_tuple(); + let (hr, hg, hb, ha) = colour_hot.to_tuple(); + let mr = ((hr as f32 - cr as f32) * lerp + cr as f32) as u8; + let mg = ((hg as f32 - cg as f32) * lerp + cg as f32) as u8; + let mb = ((hb as f32 - cb as f32) * lerp + cb as f32) as u8; + let ma = ((ha as f32 - ca as f32) * lerp + ca as f32) as u8; + + let lerp_colour = Color32::from_rgba_premultiplied(mr, mg, mb, ma); + + eprintln!("LC {:?}", lerp_colour); + + ui.painter().rect_filled(rect, 5.0, lerp_colour); + } + /* + // for x in hist.iter_all() { + // debug!("H {:?}", x); + // } + + let min_val = hist.min(); + let max_val = hist.max(); + + let mut prev = min_val; + for q in (1..=10).map(|i| i as f64 * 0.1) { + // TODO improve this. It's going to be a bit off because of the way that + // numbers can be 'equivalent' etc ... + let value_at_q = hist.value_at_quantile(q); + let count_between = hist.count_between(prev, value_at_q); + let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64; + let density = count_between as f64 / area; + //eprintln!("den {}", density); + // TODO need to autoscale based on density... + + //debug!("xa{} xb{}", xa.as_f64_seconds(), xb.as_f64_seconds()); + let rect = Rect::from_min_max( + display_transform * (xa.as_f64_seconds(), value_at_q as f64), + display_transform * (xb.as_f64_seconds(), prev as f64) + ); + + ui.painter().rect_filled(rect, 5.0, Color32::DEBUG_COLOR); + } + + //debug!("hist min {:?} max {:?}", hist.min(), hist.max()); + + // let minimum = hist.value_at_quantile(0.0); + // hist.max + } + */ + } }); } } diff --git a/bare-metrics-gui/src/main.rs b/bare-metrics-gui/src/main.rs index f66a892..becb7da 100644 --- a/bare-metrics-gui/src/main.rs +++ b/bare-metrics-gui/src/main.rs @@ -1,17 +1,37 @@ -use crate::graph::Graph; +use crate::graph::{Graph, MetricsLogReaderMessage, MetricsLogReadingRequester}; +use bare_metrics_core::structures::MetricId; +use bare_metrics_reader::MetricsLogReader; use eframe::egui::{CentralPanel, CtxRef}; -use eframe::epi::{App, Frame}; +use eframe::epi::{App, Frame, Storage}; use eframe::NativeOptions; +use env_logger::Env; +use std::fs::File; +use std::path::PathBuf; +use std::str::FromStr; pub mod graph; -pub struct MetricsGui {} +pub struct MetricsGui { + requester: MetricsLogReadingRequester, +} impl App for MetricsGui { + fn setup(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>, _storage: Option<&dyn Storage>) { + let ctx = ctx.clone(); + self.requester + .tx + .send(MetricsLogReaderMessage::LoadNewWindow { + on_state_change: Box::new(move |_| ctx.request_repaint()), + new_time_range: f64::NEG_INFINITY..=f64::INFINITY, + new_wanted_time_points: 512, + }) + .unwrap(); + } + fn update(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>) { CentralPanel::default().show(ctx, |ui| { ui.label("Hah"); - Graph::draw(ui); + Graph::draw(ui, MetricId(0), &self.requester); }); } @@ -21,7 +41,15 @@ impl App for MetricsGui { } fn main() { - let app = MetricsGui {}; + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + + let path = PathBuf::from_str(&std::env::args().skip(1).next().unwrap()).unwrap(); + + let file = File::open(path).unwrap(); + + let requester = MetricsLogReadingRequester::new_manager(MetricsLogReader::new(file).unwrap()); + + let app = MetricsGui { requester }; let native_options = NativeOptions::default(); diff --git a/bare-metrics-reader/src/lib.rs b/bare-metrics-reader/src/lib.rs index 1158024..f0da73e 100644 --- a/bare-metrics-reader/src/lib.rs +++ b/bare-metrics-reader/src/lib.rs @@ -7,7 +7,10 @@ use std::io::{Read, Seek, SeekFrom}; /// Token that is known to be usable for seeking to a frame in a metrics log. /// TODO identify which reader is applicable? Or don't bother? #[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)] -pub struct SeekToken(pub UnixTimestampMilliseconds, pub u64); +pub struct SeekToken { + pub start_ts: UnixTimestampMilliseconds, + pub offset: u64, +} /// A streaming reader for metric logs. /// If the underlying reader supports seeks, it is possible to get SeekTokens which may be used @@ -36,16 +39,69 @@ impl MetricsLogReader { /// Reads a frame. /// Returns the start time of the frame and the frame itself. pub fn read_frame(&mut self) -> anyhow::Result> { - match serde_bare::from_reader::<_, Frame>(&mut self.reader) { - Ok(frame) => Ok(Some((self.last_read_ts, frame))), + let mut interceptor = EofTrackingReadInterceptor::new(&mut self.reader); + match serde_bare::from_reader::<_, Frame>(&mut interceptor) { + Ok(frame) => { + let start_ts = self.last_read_ts; + self.last_read_ts = frame.end_time; + Ok(Some((start_ts, frame))) + } + // This doesn't seem to work properly for some reason... Err(err) if err.classify().is_eof() => Ok(None), Err(other_err) => { - bail!("Failed to read frame: {:?}", other_err); + let eof_flag = interceptor.was_eof(); + if eof_flag == Some(true) { + Ok(None) + } else { + bail!( + "Failed to read frame: {:?} class {:?}, intercepted eof flag {:?}", + other_err, + other_err.classify(), + eof_flag + ); + } } } } } +struct EofTrackingReadInterceptor { + inner: R, + /// None if no reads have taken place yet. + /// Some(true) if the first read that took place and was EOF + /// Some(false) if the first read that took place and was not EOF + was_eof_flag: Option, +} + +impl EofTrackingReadInterceptor { + pub fn new(inner: R) -> EofTrackingReadInterceptor { + EofTrackingReadInterceptor { + inner, + was_eof_flag: None, + } + } + + pub fn was_eof(self) -> Option { + self.was_eof_flag + } +} + +impl Read for EofTrackingReadInterceptor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if self.was_eof_flag.is_none() { + let count = self.inner.read(buf)?; + if count == 0 { + self.was_eof_flag = Some(true); + } else { + self.was_eof_flag = Some(false); + } + Ok(count) + } else { + self.inner.read(buf) + } + } +} + impl MetricsLogReader { /// Reads a new frame, and returns a seek token that can be used to rewind such that you can /// 'undo' the read. @@ -56,7 +112,10 @@ impl MetricsLogReader { // TODO should we rewind in case of error? if let Some((timestamp, frame)) = self.read_frame()? { Ok(Some(( - SeekToken(timestamp, current_pos_in_file), + SeekToken { + start_ts: timestamp, + offset: current_pos_in_file, + }, timestamp, frame, ))) @@ -72,10 +131,11 @@ impl MetricsLogReader { /// The given seek token MUST have come from this instance's `read_from_rewindable` function. /// Otherwise, corrupt frames may be read. /// The old position is returned as a seek token. - pub fn seek( - &mut self, - SeekToken(seek_timestamp, seek_pos): SeekToken, - ) -> anyhow::Result { + pub fn seek(&mut self, seek_token: SeekToken) -> anyhow::Result { + let SeekToken { + start_ts: seek_timestamp, + offset: seek_pos, + } = seek_token; let old_pos_in_file = self.reader.stream_position()?; let old_timestamp = self.last_read_ts; @@ -83,6 +143,9 @@ impl MetricsLogReader { self.reader.seek(SeekFrom::Start(seek_pos))?; self.last_read_ts = seek_timestamp; - Ok(SeekToken(old_timestamp, old_pos_in_file)) + Ok(SeekToken { + start_ts: old_timestamp, + offset: old_pos_in_file, + }) } } diff --git a/bare-metrics-recorder/examples/histogram.rs b/bare-metrics-recorder/examples/histogram.rs new file mode 100644 index 0000000..91b99bc --- /dev/null +++ b/bare-metrics-recorder/examples/histogram.rs @@ -0,0 +1,21 @@ +use bare_metrics_recorder::recording::BareMetricsRecorderCore; +use metrics::{histogram, register_histogram, Unit}; +use std::fs::File; +use std::time::Duration; + +pub fn main() -> anyhow::Result<()> { + let (shard, stopper) = + BareMetricsRecorderCore::new(File::create("/tmp/histogram-example.baremetrics")?) + .start()?; + shard.install_as_metrics_recorder()?; + register_histogram!("my_hist", Unit::BitsPerSecond, "Some thing :)."); + + for x in 0..300 { + histogram!("my_hist", x as f64); + std::thread::sleep(Duration::from_secs(1)); + } + + stopper.stop(); + + Ok(()) +} diff --git a/bare-metrics-recorder/src/recording.rs b/bare-metrics-recorder/src/recording.rs index 4675245..ddd355f 100644 --- a/bare-metrics-recorder/src/recording.rs +++ b/bare-metrics-recorder/src/recording.rs @@ -1,5 +1,6 @@ use bare_metrics_core::structures::{ - Frame, MetricDescriptor, MetricId, MetricKind, SerialisableHistogram, + get_supported_version, Frame, LogHeader, MetricDescriptor, MetricId, MetricKind, + SerialisableHistogram, UnixTimestampMilliseconds, }; use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; use dashmap::DashMap; @@ -7,12 +8,12 @@ use hdrhistogram::Histogram; use log::{error, warn}; use metrics::{GaugeValue, Key, Recorder, Unit}; use serde_bare::Uint; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::io::Write; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use std::thread::JoinHandle; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; // TODO add a reader // the reader accepts a Read or a File (is there a 'SeekRead'?) @@ -78,6 +79,12 @@ impl BareMetricsRecorderCore { } fn emit_and_clear_frame(&mut self) -> anyhow::Result<()> { + self.next_frame.end_time = UnixTimestampMilliseconds( + SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + ); serde_bare::to_writer(&mut self.writer, &self.next_frame)?; self.writer.flush()?; @@ -91,6 +98,21 @@ impl BareMetricsRecorderCore { } fn run_forever(mut self, rx: Receiver) -> anyhow::Result<()> { + // Write out the header. + let start = SystemTime::now(); + serde_bare::to_writer( + &mut self.writer, + &LogHeader { + bare_metrics_version: get_supported_version().to_string(), + start_time: UnixTimestampMilliseconds( + start + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + ), + }, + )?; + let mut next_emission = Instant::now() + self.emission_interval; loop { match rx.recv_deadline(next_emission) { @@ -219,7 +241,7 @@ pub enum RecorderMessage { unit: Option, description: &'static str, name: String, - labels: HashMap, + labels: BTreeMap, }, HistogramReading { metric_id: MetricId, @@ -268,7 +290,7 @@ impl BareMetricsRecorderShard { labels: key .labels() .map(|l| (l.key().to_string(), l.value().to_string())) - .collect::>(), + .collect::>(), }; if let Err(e) = self.recorder_tx.send(registration) {