Bugfixes and very primitive GUI display

This commit is contained in:
Olivier 'reivilibre' 2021-11-27 22:44:43 +00:00
parent a78d384252
commit 912e826bb1
6 changed files with 702 additions and 81 deletions

View File

@ -16,6 +16,12 @@ pub fn get_supported_version() -> String {
)]
pub struct UnixTimestampMilliseconds(pub u64);
impl UnixTimestampMilliseconds {
pub fn as_f64_seconds(self) -> f64 {
self.0 as f64 * 0.001
}
}
/// Header for a metric log file.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct LogHeader {

View File

@ -1,14 +1,18 @@
use bare_metrics_core::structures::{Frame, MetricDescriptor, MetricId, UnixTimestampMilliseconds};
use anyhow::{anyhow, bail};
use bare_metrics_core::structures::{
Frame, MetricDescriptor, MetricId, MetricKind, UnixTimestampMilliseconds,
};
use bare_metrics_reader::{MetricsLogReader, SeekToken};
use eframe::egui::{
Color32, Frame as EguiFrame, PointerButton, Pos2, Rect, Sense, Stroke, Ui, Vec2,
};
use hdrhistogram::Histogram;
use log::{debug, error, info};
use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Seek};
use std::ops::RangeInclusive;
use std::ops::{Mul, RangeInclusive};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
@ -18,31 +22,95 @@ pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000;
#[derive(Clone, Debug)]
pub struct MetricsWindow {
time_range: RangeInclusive<f64>,
wanted_time_points: u32,
metrics: MetricDescriptorTable,
histograms: HashMap<MetricId, HistogramWindow>,
counters: HashMap<MetricId, ScalarWindow>,
gauges: HashMap<MetricId, ScalarWindow>,
pub time_range: RangeInclusive<f64>,
pub wanted_time_points: u32,
pub metrics: MetricDescriptorTable,
pub metric_descriptors: HashMap<MetricId, MetricDescriptor>,
pub histograms: HashMap<MetricId, HistogramWindow>,
pub counters: HashMap<MetricId, ScalarWindow>,
pub gauges: HashMap<MetricId, ScalarWindow>,
}
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug, Default)]
pub struct HeatmapRect {
min_ts: UnixTimestampMilliseconds,
max_ts: UnixTimestampMilliseconds,
min_val: f64,
max_val: f64,
density: f64,
}
#[derive(Clone, Debug, Default)]
pub struct HistogramWindow {
map: Vec<(
pub heatmap: Vec<HeatmapRect>,
pub min_density: f64,
pub max_density: f64,
}
#[derive(Clone, Debug, Default)]
pub struct UnsummarisedHistogramWindow {
pub map: Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
HistogramWindow,
Histogram<u64>,
)>,
}
#[derive(Clone, Debug)]
impl UnsummarisedHistogramWindow {
pub fn summarise(&self) -> HistogramWindow {
let mut heatmap = Vec::new();
let mut min_density = f64::INFINITY;
let mut max_density = f64::NEG_INFINITY;
for (xa, xb, hist) in self.map.iter() {
let min_val = hist.min();
let max_val = hist.max();
let mut prev = min_val;
for q in (1..=10).map(|i| i as f64 * 0.1) {
// TODO improve this. It's going to be a bit off because of the way that
// numbers can be 'equivalent' etc ...
let value_at_q = hist.value_at_quantile(q);
let count_between = hist.count_between(prev, value_at_q);
let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64;
let density = count_between as f64 / area;
min_density = min_density.min(density);
max_density = max_density.max(density);
//eprintln!("den {}", density);
// TODO need to autoscale based on density...
heatmap.push(HeatmapRect {
min_ts: *xa,
max_ts: *xb,
min_val: prev as f64,
max_val: value_at_q as f64,
density,
});
prev = value_at_q;
}
//debug!("hist min {:?} max {:?}", hist.min(), hist.max());
}
HistogramWindow {
heatmap,
min_density,
max_density,
}
}
}
#[derive(Clone, Debug, Default)]
pub struct ScalarWindow {
points: Vec<(UnixTimestampMilliseconds, f64)>,
}
pub struct MetricsLogReadingRequester {
shared: Arc<MetricsLogReadingShared>,
// todo sender:
pub shared: Arc<MetricsLogReadingShared>,
pub tx: Sender<MetricsLogReaderMessage>,
}
pub struct MetricsLogReadingShared {
@ -72,6 +140,7 @@ impl MetricsLogReadingRequester {
time_range: 0.0..=0.0,
wanted_time_points: 0,
metrics: Default::default(),
metric_descriptors: Default::default(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
@ -79,9 +148,8 @@ impl MetricsLogReadingRequester {
loading_new_window: AtomicBool::new(false),
});
let manager_shared_ref = shared.clone();
let requester = MetricsLogReadingRequester { shared };
let (tx, rx) = std::sync::mpsc::channel();
let requester = MetricsLogReadingRequester { shared, tx };
std::thread::Builder::new()
.name("metricslogreader".to_string())
@ -107,6 +175,14 @@ pub struct MetricsState {
counters: HashMap<MetricId, u64>,
}
#[derive(Clone, Debug)]
pub struct Checkpoint {
/// State AT the checkpoint
pub state: MetricsState,
/// Seek token for the frame which ends at the checkpoint.
pub seek: SeekToken,
}
pub struct MetricsLogReaderManager<R: Read + Seek> {
shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>,
@ -114,7 +190,7 @@ pub struct MetricsLogReaderManager<R: Read + Seek> {
metric_descriptors: HashMap<MetricId, MetricDescriptor>,
metric_descriptors_dirty: bool,
metric_descriptor_table: MetricDescriptorTable,
checkpoints: Vec<(MetricsState, SeekToken)>,
checkpoints: Vec<Checkpoint>,
}
/// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent
@ -122,6 +198,20 @@ pub struct MetricsLogReaderManager<R: Read + Seek> {
pub type MetricDescriptorTable = BTreeMap<String, BTreeMap<BTreeMap<String, String>, MetricId>>;
impl<R: Read + Seek> MetricsLogReaderManager<R> {
fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) {
current_state.at = incoming.end_time;
// Integrate the state...
// We have no need for histogram state because they emit every time there are any
// samples (not just on changes).
for (metric_id, count) in incoming.counter_updates.iter() {
current_state.counters.insert(*metric_id, count.0);
}
for (metric_id, value) in incoming.gauge_updates.iter() {
current_state.gauges.insert(*metric_id, *value);
}
}
fn initial_scan(&mut self) -> anyhow::Result<()> {
let mut state = MetricsState {
at: UnixTimestampMilliseconds(0),
@ -131,22 +221,18 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
let mut next_checkpoint_at = 0u64;
while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? {
state.at = old_end_ts;
if state.at.0 > next_checkpoint_at {
if old_end_ts.0 > next_checkpoint_at {
// Make a checkpoint here!
self.checkpoints.push((state.clone(), old_seek_token));
next_checkpoint_at = state.at.0 + CHECKPOINT_EVERY_MILLISECONDS;
self.checkpoints.push(Checkpoint {
state: state.clone(),
seek: old_seek_token,
});
next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS;
}
// Integrate the state...
// We have no need for histogram state because they emit every time there are any
// samples (not just on changes).
for (metric_id, count) in frame.counter_updates {
state.counters.insert(metric_id, count.0);
}
for (metric_id, value) in frame.gauge_updates {
state.gauges.insert(metric_id, value);
}
debug!("FRAME {:?}\n{:#?}", old_end_ts, frame);
Self::integrate_state(&mut state, &frame);
// Also keep track of new metrics
for (new_metric, metric_descriptor) in frame.new_metrics {
@ -156,6 +242,8 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
}
}
debug!("Initial scan complete.");
Ok(())
}
@ -176,18 +264,254 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
self.metric_descriptor_table = metric_names_to_labels_to_ids;
}
/// Returns the first checkpoint that precedes or contains the wanted instant.
fn find_checkpoint_just_before(
&self,
instant: UnixTimestampMilliseconds,
) -> anyhow::Result<&Checkpoint> {
let idx = match self
.checkpoints
.binary_search_by(|candidate| candidate.state.at.cmp(&instant))
{
Ok(idx) => idx,
Err(idx) => {
if idx == 0 {
// This probably means that we haven't created a checkpoint at time zero.
// We should make sure to always do that ...
bail!(
"The checkpoint for entry (before {:?}) would be at idx -1.",
instant
);
} else {
idx - 1
}
}
};
if let Some(checkpoint) = self.checkpoints.get(idx) {
debug!(
"First checkpoint just before {:?} is idx{} at {:?}until{:?}",
instant, idx, checkpoint.seek.start_ts, checkpoint.state.at
);
Ok(checkpoint)
} else {
bail!("No checkpoints...");
}
}
/// Returns the seek token for the first frame whose time interval includes the time point specified.
fn lookup_seek_token_for_frame_containing_time(
&mut self,
just_before_or_at: UnixTimestampMilliseconds,
) -> anyhow::Result<SeekToken> {
todo!()
let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone();
let return_to = self.reader.seek(checkpoint_just_before.seek)?;
let mut result = checkpoint_just_before.seek;
let mut previous = checkpoint_just_before.seek;
loop {
if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? {
previous = seek;
if frame.end_time > just_before_or_at {
if start_ts <= just_before_or_at {
// This frame contains the wanted time point.
// (If we don't hit this case, then we ran past it.)
result = seek;
}
break;
}
} else {
// Ran out of frames! We use the last one we saw.
result = previous;
break;
}
}
self.reader.seek(return_to)?;
Ok(result)
}
fn seek_to_first_before(
fn bicimate_histograms(
vec: &mut Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
Histogram<u64>,
)>,
) {
let mut iter = vec.chunks_exact_mut(2);
for chunks in &mut iter {
let (ch0, ch1) = chunks.split_at_mut(1);
let (_start0, end0, hist0) = &mut ch0[0];
let (_start1, end1, hist1) = &mut ch1[0];
// Make chunk 0 encompass the whole thing.
*end0 = *end1;
// Add the histograms
*hist0 += &*hist1;
}
// Bicimate to remove the unwanted parts.
Self::bicimate(vec);
}
fn bicimate<T>(vec: &mut Vec<T>) {
// we always keep the last point!
let last = vec.len() - 1;
let mut the_vec = Vec::with_capacity(0);
std::mem::swap(&mut the_vec, vec);
*vec = the_vec
.into_iter()
.enumerate()
.filter(|(idx, _)| idx % 2 == 0 || *idx == last)
.map(|(_, v)| v)
.collect::<Vec<T>>();
}
fn load_window_of_data(
&mut self,
just_before_or_at: UnixTimestampMilliseconds,
) -> anyhow::Result<(SeekToken, Option<Frame>)> {
todo!()
start: UnixTimestampMilliseconds,
end: UnixTimestampMilliseconds,
time_points: u32,
) -> anyhow::Result<MetricsWindow> {
let mut metrics_window = MetricsWindow {
time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001),
wanted_time_points: time_points,
metrics: self.metric_descriptor_table.clone(),
metric_descriptors: self.metric_descriptors.clone(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
};
let mut histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() {
for (_, metric_id) in metric_labels_to_ids.iter() {
match metrics_window.metric_descriptors[metric_id].kind {
MetricKind::Histogram => {
histograms.insert(*metric_id, Default::default());
}
MetricKind::Gauge => {
metrics_window.gauges.insert(*metric_id, Default::default());
}
MetricKind::Counter => {
metrics_window
.counters
.insert(*metric_id, Default::default());
}
}
}
}
let checkpoint = self.find_checkpoint_just_before(start)?.clone();
let mut frame_state = checkpoint.state;
let next_token = checkpoint.seek;
let mut in_window = false;
fn write_to_window(
metric_window: &mut MetricsWindow,
histograms: &mut HashMap<MetricId, UnsummarisedHistogramWindow>,
current_state: &MetricsState,
frame: Option<(UnixTimestampMilliseconds, &Frame)>,
) {
if let Some((frame_start, frame)) = frame {
for (metric_id, histogram) in frame.histograms.iter() {
histograms.get_mut(metric_id).unwrap().map.push((
frame_start,
frame.end_time,
histogram.underlying.clone(),
));
}
}
for (metric_id, count) in current_state.counters.iter() {
metric_window
.counters
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *count as f64));
}
for (metric_id, value) in current_state.gauges.iter() {
metric_window
.gauges
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *value));
}
}
// need to try and preserve extra points just before the window and just after, for
// continuity ...
let mut num_points = 0;
self.reader.seek(next_token)?;
loop {
if let Some((start_ts, frame)) = self.reader.read_frame()? {
Self::integrate_state(&mut frame_state, &frame);
if !in_window {
if frame_state.at >= start {
in_window = true;
} else {
// Clear the windows because we still haven't reached the start and only
// want one sample before the start.
num_points = 0;
for (_, window) in histograms.iter_mut() {
window.map.clear();
}
for (_, window) in metrics_window.counters.iter_mut() {
window.points.clear();
}
for (_, window) in metrics_window.gauges.iter_mut() {
window.points.clear();
}
}
}
num_points += 1;
debug!("Writing frame to window with start ts {:?}", start_ts);
write_to_window(
&mut metrics_window,
&mut histograms,
&frame_state,
Some((start_ts, &frame)),
);
if frame_state.at >= end {
// We've gone past the end. Stop here.
break;
}
} else {
break;
}
}
while num_points >= time_points * 2 {
// Keep halving the points until we have less than double the number of requested points.
for (_, counter) in metrics_window.counters.iter_mut() {
Self::bicimate(&mut counter.points);
}
for (_, gauge) in metrics_window.gauges.iter_mut() {
Self::bicimate(&mut gauge.points);
}
for (_, hists) in histograms.iter_mut() {
Self::bicimate_histograms(&mut hists.map);
}
num_points = (num_points - 1) / 2 + 1;
}
for (metric_id, unsummarised_histogram) in histograms.into_iter() {
metrics_window
.histograms
.insert(metric_id, unsummarised_histogram.summarise());
}
Ok(metrics_window)
}
fn run(&mut self) -> anyhow::Result<()> {
@ -203,35 +527,65 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
new_time_range,
new_wanted_time_points,
} => {
debug!(
"Requested new window; time range {:?}, № points {}",
new_time_range, new_wanted_time_points
);
self.shared_ref
.loading_new_window
.store(true, Ordering::SeqCst);
on_state_change(false);
todo!();
let start = if *new_time_range.start() == f64::NEG_INFINITY {
// autoscale by getting the earliest time point of the metrics log
self.reader.header.start_time.0
let new_start = self.reader.header.start_time.0;
debug!("Autoscaling start to: {:?}", new_start);
new_start
} else {
(*new_time_range.start() * 1000.0) as u64
};
let end = if *new_time_range.end() == f64::INFINITY {
// get the first result before the end of time...
// or in other words, the last result.
let (_, frame) =
self.seek_to_first_before(UnixTimestampMilliseconds(u64::MAX))?;
if let Some(frame) = frame {
frame.end_time.0
} else {
// well, we have no choice. Just use the current time.
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
let seek_token = self.lookup_seek_token_for_frame_containing_time(
UnixTimestampMilliseconds(u64::MAX),
)?;
self.reader.seek(seek_token)?;
let (start_ts, frame) = self
.reader
.read_frame()?
.ok_or_else(|| anyhow!("Sought frame should exist"))?;
// if let Some(frame) = frame {
// frame.end_time.0
// } else {
// // well, we have no choice. Just use the current time.
// SystemTime::now()
// .duration_since(std::time::UNIX_EPOCH)
// .unwrap()
// .as_millis() as u64
// }
let new_end = frame.end_time.0;
debug!("Autoscaled end to: {:?}", new_end);
new_end
} else {
(*new_time_range.end() * 1000.0) as u64
};
let metric_window = self.load_window_of_data(
UnixTimestampMilliseconds(start),
UnixTimestampMilliseconds(end),
new_wanted_time_points,
)?;
debug!("METRIC WINDOW {:#?}", metric_window);
*(self.shared_ref.current_window.write().unwrap()) = metric_window;
debug!("Finished loading window.");
on_state_change(true);
}
}
}
@ -275,8 +629,48 @@ pub struct GraphState {
pub x_last_time: UnixTimestampMilliseconds,
}
#[derive(Copy, Clone, Debug)]
pub struct GraphTransform {
pub scale_x: f64,
pub scale_y: f64,
pub offs_x: f64,
pub offs_y: f64,
}
/// Inspired by egui's RectTransform but uses f64 because f32 don't have enough precision
/// to work well with unix timestamps...
impl GraphTransform {
pub fn from_ranges_and_display_rect(
x_range: RangeInclusive<f64>,
y_range: RangeInclusive<f64>,
rect: Rect,
) -> Self {
let scale_x = (rect.max.x - rect.min.x) as f64 / (x_range.end() - x_range.start());
let scale_y = (rect.max.y - rect.min.y) as f64 / (y_range.end() - y_range.start());
let offs_x = rect.min.x as f64 - scale_x * x_range.start();
let offs_y = rect.min.y as f64 - scale_y * y_range.start();
GraphTransform {
scale_x,
scale_y,
offs_x,
offs_y,
}
}
}
impl Mul<(f64, f64)> for GraphTransform {
type Output = Pos2;
fn mul(self, (in_x, in_y): (f64, f64)) -> Self::Output {
let x = self.scale_x * in_x + self.offs_x;
let y = self.scale_y * in_y + self.offs_y;
Pos2::new(x as f32, y as f32)
}
}
impl Graph {
pub fn draw(ui: &mut Ui) {
pub fn draw(ui: &mut Ui, metric_id: MetricId, reader: &MetricsLogReadingRequester) {
let context_menu_id = ui.id().with("context menu");
EguiFrame::dark_canvas(ui.style()).show(ui, |ui| {
@ -304,24 +698,111 @@ impl Graph {
let display_rect = response.rect;
// Scale the graph to the appropriate axes.
let x_axis = 0.0..=1.0;
let current_window = reader.shared.current_window.read().unwrap();
let mut x_axis = *current_window.time_range.start()..=*current_window.time_range.end();
if x_axis.end() == x_axis.start() {
x_axis = *x_axis.start()..=*x_axis.start() + 1.0;
}
// This range is reversed because screen coordinates go down, but we'd like them to go
// up since this is more of a mathematical graph.
let y_axis = 1.0..=0.0;
let y_axis = 300.0..=0.0;
// let display_transform =
// emath::RectTransform::from_to(Rect::from_x_y_ranges(x_axis, y_axis), display_rect);
let display_transform =
emath::RectTransform::from_to(Rect::from_x_y_ranges(x_axis, y_axis), display_rect);
GraphTransform::from_ranges_and_display_rect(x_axis, y_axis, display_rect);
let stroke = Stroke::new(2.0, Color32::GREEN);
ui.painter().line_segment(
[
display_transform * Pos2::new(0.1, 0.3),
display_transform * Pos2::new(0.9, 0.6),
],
stroke,
);
let mut last_point = None;
if let Some(scalar) = current_window
.counters
.get(&metric_id)
.or_else(|| current_window.gauges.get(&metric_id))
{
for (x, y) in scalar.points.iter() {
let new_point = display_transform * (x.as_f64_seconds(), *y);
ui.painter().circle_filled(new_point, 5.0, Color32::RED);
if let Some(last_point) = last_point {
ui.painter().line_segment([last_point, new_point], stroke);
}
last_point = Some(new_point);
}
}
if let Some(histogram) = current_window.histograms.get(&metric_id) {
let min_density = histogram.min_density;
let max_density = histogram.max_density;
for heatrect in histogram.heatmap.iter() {
let rect = Rect::from_min_max(
display_transform * (heatrect.min_ts.as_f64_seconds(), heatrect.max_val),
display_transform * (heatrect.max_ts.as_f64_seconds(), heatrect.min_val),
);
eprintln!("R {:?} D {:?}", rect, display_rect);
let colour_cold = Color32::LIGHT_YELLOW;
let colour_hot = Color32::RED;
let lerp =
((heatrect.density - min_density) / (max_density - min_density)) as f32;
eprintln!(
"Lerp {:?} D {:?} mD {:?} MD {:?}",
lerp, heatrect.density, min_density, max_density
);
// TODO refactor this to be a ColorF32 thing or something?
let (cr, cg, cb, ca) = colour_cold.to_tuple();
let (hr, hg, hb, ha) = colour_hot.to_tuple();
let mr = ((hr as f32 - cr as f32) * lerp + cr as f32) as u8;
let mg = ((hg as f32 - cg as f32) * lerp + cg as f32) as u8;
let mb = ((hb as f32 - cb as f32) * lerp + cb as f32) as u8;
let ma = ((ha as f32 - ca as f32) * lerp + ca as f32) as u8;
let lerp_colour = Color32::from_rgba_premultiplied(mr, mg, mb, ma);
eprintln!("LC {:?}", lerp_colour);
ui.painter().rect_filled(rect, 5.0, lerp_colour);
}
/*
// for x in hist.iter_all() {
// debug!("H {:?}", x);
// }
let min_val = hist.min();
let max_val = hist.max();
let mut prev = min_val;
for q in (1..=10).map(|i| i as f64 * 0.1) {
// TODO improve this. It's going to be a bit off because of the way that
// numbers can be 'equivalent' etc ...
let value_at_q = hist.value_at_quantile(q);
let count_between = hist.count_between(prev, value_at_q);
let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64;
let density = count_between as f64 / area;
//eprintln!("den {}", density);
// TODO need to autoscale based on density...
//debug!("xa{} xb{}", xa.as_f64_seconds(), xb.as_f64_seconds());
let rect = Rect::from_min_max(
display_transform * (xa.as_f64_seconds(), value_at_q as f64),
display_transform * (xb.as_f64_seconds(), prev as f64)
);
ui.painter().rect_filled(rect, 5.0, Color32::DEBUG_COLOR);
}
//debug!("hist min {:?} max {:?}", hist.min(), hist.max());
// let minimum = hist.value_at_quantile(0.0);
// hist.max
}
*/
}
});
}
}

View File

@ -1,17 +1,37 @@
use crate::graph::Graph;
use crate::graph::{Graph, MetricsLogReaderMessage, MetricsLogReadingRequester};
use bare_metrics_core::structures::MetricId;
use bare_metrics_reader::MetricsLogReader;
use eframe::egui::{CentralPanel, CtxRef};
use eframe::epi::{App, Frame};
use eframe::epi::{App, Frame, Storage};
use eframe::NativeOptions;
use env_logger::Env;
use std::fs::File;
use std::path::PathBuf;
use std::str::FromStr;
pub mod graph;
pub struct MetricsGui {}
pub struct MetricsGui {
requester: MetricsLogReadingRequester,
}
impl App for MetricsGui {
fn setup(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>, _storage: Option<&dyn Storage>) {
let ctx = ctx.clone();
self.requester
.tx
.send(MetricsLogReaderMessage::LoadNewWindow {
on_state_change: Box::new(move |_| ctx.request_repaint()),
new_time_range: f64::NEG_INFINITY..=f64::INFINITY,
new_wanted_time_points: 512,
})
.unwrap();
}
fn update(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>) {
CentralPanel::default().show(ctx, |ui| {
ui.label("Hah");
Graph::draw(ui);
Graph::draw(ui, MetricId(0), &self.requester);
});
}
@ -21,7 +41,15 @@ impl App for MetricsGui {
}
fn main() {
let app = MetricsGui {};
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let path = PathBuf::from_str(&std::env::args().skip(1).next().unwrap()).unwrap();
let file = File::open(path).unwrap();
let requester = MetricsLogReadingRequester::new_manager(MetricsLogReader::new(file).unwrap());
let app = MetricsGui { requester };
let native_options = NativeOptions::default();

View File

@ -7,7 +7,10 @@ use std::io::{Read, Seek, SeekFrom};
/// Token that is known to be usable for seeking to a frame in a metrics log.
/// TODO identify which reader is applicable? Or don't bother?
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct SeekToken(pub UnixTimestampMilliseconds, pub u64);
pub struct SeekToken {
pub start_ts: UnixTimestampMilliseconds,
pub offset: u64,
}
/// A streaming reader for metric logs.
/// If the underlying reader supports seeks, it is possible to get SeekTokens which may be used
@ -36,16 +39,69 @@ impl<R: Read> MetricsLogReader<R> {
/// Reads a frame.
/// Returns the start time of the frame and the frame itself.
pub fn read_frame(&mut self) -> anyhow::Result<Option<(UnixTimestampMilliseconds, Frame)>> {
match serde_bare::from_reader::<_, Frame>(&mut self.reader) {
Ok(frame) => Ok(Some((self.last_read_ts, frame))),
let mut interceptor = EofTrackingReadInterceptor::new(&mut self.reader);
match serde_bare::from_reader::<_, Frame>(&mut interceptor) {
Ok(frame) => {
let start_ts = self.last_read_ts;
self.last_read_ts = frame.end_time;
Ok(Some((start_ts, frame)))
}
// This doesn't seem to work properly for some reason...
Err(err) if err.classify().is_eof() => Ok(None),
Err(other_err) => {
bail!("Failed to read frame: {:?}", other_err);
let eof_flag = interceptor.was_eof();
if eof_flag == Some(true) {
Ok(None)
} else {
bail!(
"Failed to read frame: {:?} class {:?}, intercepted eof flag {:?}",
other_err,
other_err.classify(),
eof_flag
);
}
}
}
}
}
struct EofTrackingReadInterceptor<R: Read> {
inner: R,
/// None if no reads have taken place yet.
/// Some(true) if the first read that took place and was EOF
/// Some(false) if the first read that took place and was not EOF
was_eof_flag: Option<bool>,
}
impl<R: Read> EofTrackingReadInterceptor<R> {
pub fn new(inner: R) -> EofTrackingReadInterceptor<R> {
EofTrackingReadInterceptor {
inner,
was_eof_flag: None,
}
}
pub fn was_eof(self) -> Option<bool> {
self.was_eof_flag
}
}
impl<R: Read> Read for EofTrackingReadInterceptor<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.was_eof_flag.is_none() {
let count = self.inner.read(buf)?;
if count == 0 {
self.was_eof_flag = Some(true);
} else {
self.was_eof_flag = Some(false);
}
Ok(count)
} else {
self.inner.read(buf)
}
}
}
impl<R: Read + Seek> MetricsLogReader<R> {
/// Reads a new frame, and returns a seek token that can be used to rewind such that you can
/// 'undo' the read.
@ -56,7 +112,10 @@ impl<R: Read + Seek> MetricsLogReader<R> {
// TODO should we rewind in case of error?
if let Some((timestamp, frame)) = self.read_frame()? {
Ok(Some((
SeekToken(timestamp, current_pos_in_file),
SeekToken {
start_ts: timestamp,
offset: current_pos_in_file,
},
timestamp,
frame,
)))
@ -72,10 +131,11 @@ impl<R: Read + Seek> MetricsLogReader<R> {
/// The given seek token MUST have come from this instance's `read_from_rewindable` function.
/// Otherwise, corrupt frames may be read.
/// The old position is returned as a seek token.
pub fn seek(
&mut self,
SeekToken(seek_timestamp, seek_pos): SeekToken,
) -> anyhow::Result<SeekToken> {
pub fn seek(&mut self, seek_token: SeekToken) -> anyhow::Result<SeekToken> {
let SeekToken {
start_ts: seek_timestamp,
offset: seek_pos,
} = seek_token;
let old_pos_in_file = self.reader.stream_position()?;
let old_timestamp = self.last_read_ts;
@ -83,6 +143,9 @@ impl<R: Read + Seek> MetricsLogReader<R> {
self.reader.seek(SeekFrom::Start(seek_pos))?;
self.last_read_ts = seek_timestamp;
Ok(SeekToken(old_timestamp, old_pos_in_file))
Ok(SeekToken {
start_ts: old_timestamp,
offset: old_pos_in_file,
})
}
}

View File

@ -0,0 +1,21 @@
use bare_metrics_recorder::recording::BareMetricsRecorderCore;
use metrics::{histogram, register_histogram, Unit};
use std::fs::File;
use std::time::Duration;
pub fn main() -> anyhow::Result<()> {
let (shard, stopper) =
BareMetricsRecorderCore::new(File::create("/tmp/histogram-example.baremetrics")?)
.start()?;
shard.install_as_metrics_recorder()?;
register_histogram!("my_hist", Unit::BitsPerSecond, "Some thing :).");
for x in 0..300 {
histogram!("my_hist", x as f64);
std::thread::sleep(Duration::from_secs(1));
}
stopper.stop();
Ok(())
}

View File

@ -1,5 +1,6 @@
use bare_metrics_core::structures::{
Frame, MetricDescriptor, MetricId, MetricKind, SerialisableHistogram,
get_supported_version, Frame, LogHeader, MetricDescriptor, MetricId, MetricKind,
SerialisableHistogram, UnixTimestampMilliseconds,
};
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use dashmap::DashMap;
@ -7,12 +8,12 @@ use hdrhistogram::Histogram;
use log::{error, warn};
use metrics::{GaugeValue, Key, Recorder, Unit};
use serde_bare::Uint;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::io::Write;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
// TODO add a reader
// the reader accepts a Read or a File (is there a 'SeekRead'?)
@ -78,6 +79,12 @@ impl<W: Write + Send + 'static> BareMetricsRecorderCore<W> {
}
fn emit_and_clear_frame(&mut self) -> anyhow::Result<()> {
self.next_frame.end_time = UnixTimestampMilliseconds(
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
);
serde_bare::to_writer(&mut self.writer, &self.next_frame)?;
self.writer.flush()?;
@ -91,6 +98,21 @@ impl<W: Write + Send + 'static> BareMetricsRecorderCore<W> {
}
fn run_forever(mut self, rx: Receiver<RecorderMessage>) -> anyhow::Result<()> {
// Write out the header.
let start = SystemTime::now();
serde_bare::to_writer(
&mut self.writer,
&LogHeader {
bare_metrics_version: get_supported_version().to_string(),
start_time: UnixTimestampMilliseconds(
start
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
),
},
)?;
let mut next_emission = Instant::now() + self.emission_interval;
loop {
match rx.recv_deadline(next_emission) {
@ -219,7 +241,7 @@ pub enum RecorderMessage {
unit: Option<Unit>,
description: &'static str,
name: String,
labels: HashMap<String, String>,
labels: BTreeMap<String, String>,
},
HistogramReading {
metric_id: MetricId,
@ -268,7 +290,7 @@ impl BareMetricsRecorderShard {
labels: key
.labels()
.map(|l| (l.key().to_string(), l.value().to_string()))
.collect::<HashMap<String, String>>(),
.collect::<BTreeMap<String, String>>(),
};
if let Err(e) = self.recorder_tx.send(registration) {