Extract background loader to separate file

This commit is contained in:
Olivier 'reivilibre' 2022-01-08 15:19:08 +00:00
parent 5e96828707
commit f8ec5f22b5
3 changed files with 671 additions and 651 deletions

View File

@ -0,0 +1,667 @@
use crate::config::MetricTransform;
use anyhow::{anyhow, bail};
use bare_metrics_core::structures::{
Frame, MetricDescriptor, MetricId, MetricKind, UnixTimestampMilliseconds,
};
use bare_metrics_reader::{MetricsLogReader, SeekToken};
use eframe::egui::{
Color32, Frame as EguiFrame, PointerButton, Pos2, Rect, Sense, Stroke, TextStyle, Ui, Vec2,
};
use hdrhistogram::Histogram;
use log::{debug, error, info};
use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Seek};
use std::ops::{DerefMut, Mul, RangeInclusive};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
/// Make a checkpoint every 10 minutes?
/// This should probably be tunable since it will likely vary per-source ...
pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000;
pub type StateCallbackHook = Arc<Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>>;
#[derive(Clone, Debug)]
pub struct MetricsWindow {
pub time_range: RangeInclusive<f64>,
pub wanted_time_points: u32,
pub metrics: MetricDescriptorTable,
pub metric_descriptors: HashMap<MetricId, MetricDescriptor>,
pub histograms: HashMap<MetricId, HistogramWindow>,
pub counters: HashMap<MetricId, ScalarWindow>,
pub gauges: HashMap<MetricId, ScalarWindow>,
}
#[derive(Copy, Clone, Debug, Default)]
pub struct HeatmapRect {
pub min_ts: UnixTimestampMilliseconds,
pub max_ts: UnixTimestampMilliseconds,
pub min_val: f64,
pub max_val: f64,
pub density: f64,
}
#[derive(Clone, Debug)]
pub struct HistogramWindow {
pub heatmap: Vec<HeatmapRect>,
pub min_density: f64,
pub max_density: f64,
pub y_axis: RangeInclusive<f64>,
}
#[derive(Clone, Debug, Default)]
pub struct UnsummarisedHistogramWindow {
pub map: Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
Histogram<u64>,
)>,
}
impl UnsummarisedHistogramWindow {
pub fn summarise(&self) -> HistogramWindow {
let mut heatmap = Vec::new();
let mut min_density = f64::INFINITY;
let mut max_density = f64::NEG_INFINITY;
let mut min_val = f64::INFINITY;
let mut max_val = f64::NEG_INFINITY;
for (xa, xb, hist) in self.map.iter() {
let min_this_slice = hist.min();
min_val = min_val.min(min_this_slice as f64);
max_val = max_val.max(hist.max() as f64);
let mut prev = min_this_slice;
for q in (1..=10).map(|i| i as f64 * 0.1) {
// TODO improve this. It's going to be a bit off because of the way that
// numbers can be 'equivalent' etc ...
let value_at_q = hist.value_at_quantile(q);
let count_between = hist.count_between(prev, value_at_q);
let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64;
let density = count_between as f64 / area;
min_density = min_density.min(density);
max_density = max_density.max(density);
//eprintln!("den {}", density);
// TODO need to autoscale based on density...
heatmap.push(HeatmapRect {
min_ts: *xa,
max_ts: *xb,
min_val: prev as f64,
max_val: value_at_q as f64,
density,
});
prev = value_at_q;
}
//debug!("hist min {:?} max {:?}", hist.min(), hist.max());
}
let y_axis = min_val..=max_val;
debug!("yaxis {:?}", y_axis);
HistogramWindow {
heatmap,
min_density,
max_density,
y_axis,
}
}
}
#[derive(Clone, Debug)]
pub struct ScalarWindow {
pub points: Vec<(UnixTimestampMilliseconds, f64)>,
pub y_axis: RangeInclusive<f64>,
}
impl ScalarWindow {
pub fn summarise_in_place(&mut self) {
let mut min_point = f64::INFINITY;
let mut max_point = f64::NEG_INFINITY;
for (_, point) in self.points.iter() {
min_point = min_point.min(*point);
max_point = max_point.max(*point);
}
self.y_axis = min_point..=max_point;
}
}
pub struct MetricsLogReadingRequester {
pub shared: Arc<MetricsLogReadingShared>,
pub tx: Sender<MetricsLogReaderMessage>,
}
pub struct MetricsLogReadingShared {
/// The current window of data.
pub current_window: RwLock<MetricsWindow>,
/// True if a new window of data is being loaded.
pub loading_new_window: AtomicBool,
}
pub enum MetricsLogReaderMessage {
LoadNewWindow {
/// Callback that gets called when there's a state change.
/// The bool represents whether the loading is finished or not.
on_state_change: Box<dyn Fn(bool) -> () + 'static + Send>,
new_time_range: RangeInclusive<f64>,
new_wanted_time_points: u32,
},
}
impl MetricsLogReadingRequester {
pub fn new_manager<R: Read + Seek + Send + 'static>(
reader: MetricsLogReader<R>,
on_initial_scan_done: StateCallbackHook,
) -> MetricsLogReadingRequester {
let shared = Arc::new(MetricsLogReadingShared {
current_window: RwLock::new(MetricsWindow {
time_range: 0.0..=0.0,
wanted_time_points: 0,
metrics: Default::default(),
metric_descriptors: Default::default(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
}),
loading_new_window: AtomicBool::new(false),
});
let manager_shared_ref = shared.clone();
let (tx, rx) = std::sync::mpsc::channel();
let requester = MetricsLogReadingRequester { shared, tx };
std::thread::Builder::new()
.name("metricslogreader".to_string())
.spawn(move || {
if let Err(err) = MetricsLogReaderManager::new_and_run(
manager_shared_ref,
reader,
rx,
on_initial_scan_done,
) {
error!("Error in background log reader: {:?}", err);
}
})
.unwrap();
requester
}
}
/// We don't track histograms because they don't 'accumulate'; the histograms are emitted every
/// frame when there are any samples.
#[derive(Clone, Debug)]
pub struct MetricsState {
at: UnixTimestampMilliseconds,
gauges: HashMap<MetricId, f64>,
counters: HashMap<MetricId, u64>,
}
#[derive(Clone, Debug)]
pub struct Checkpoint {
/// State AT the checkpoint
pub state: MetricsState,
/// Seek token for the frame which ends at the checkpoint.
pub seek: SeekToken,
}
pub struct MetricsLogReaderManager<R: Read + Seek> {
shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>,
rx: Receiver<MetricsLogReaderMessage>,
metric_descriptors: HashMap<MetricId, MetricDescriptor>,
metric_descriptors_dirty: bool,
metric_descriptor_table: MetricDescriptorTable,
checkpoints: Vec<Checkpoint>,
}
/// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent
/// between re-runs and makes it easier to confidently order the UI and assign colours etc...
pub type MetricDescriptorTable = BTreeMap<String, BTreeMap<BTreeMap<String, String>, MetricId>>;
impl<R: Read + Seek> MetricsLogReaderManager<R> {
fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) {
current_state.at = incoming.end_time;
// Integrate the state...
// We have no need for histogram state because they emit every time there are any
// samples (not just on changes).
for (metric_id, count) in incoming.counter_updates.iter() {
current_state.counters.insert(*metric_id, count.0);
}
for (metric_id, value) in incoming.gauge_updates.iter() {
current_state.gauges.insert(*metric_id, *value);
}
}
fn initial_scan(&mut self) -> anyhow::Result<()> {
let mut state = MetricsState {
at: UnixTimestampMilliseconds(0),
gauges: HashMap::with_capacity(0),
counters: HashMap::with_capacity(0),
};
let mut next_checkpoint_at = 0u64;
while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? {
if old_end_ts.0 > next_checkpoint_at {
// Make a checkpoint here!
self.checkpoints.push(Checkpoint {
state: state.clone(),
seek: old_seek_token,
});
next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS;
}
debug!("FRAME {:?}\n{:#?}", old_end_ts, frame);
Self::integrate_state(&mut state, &frame);
// Also keep track of new metrics
for (new_metric, metric_descriptor) in frame.new_metrics {
self.metric_descriptors
.insert(new_metric, metric_descriptor);
self.metric_descriptors_dirty = true;
}
}
debug!("Initial scan complete.");
Ok(())
}
fn update_metric_descriptors(&mut self) {
if !self.metric_descriptors_dirty {
return;
}
self.metric_descriptors_dirty = false;
let mut metric_names_to_labels_to_ids: MetricDescriptorTable = BTreeMap::new();
for (metric_id, metric_descriptor) in self.metric_descriptors.iter() {
let labels_to_ids = metric_names_to_labels_to_ids
.entry(metric_descriptor.name.to_string())
.or_insert_with(BTreeMap::new);
labels_to_ids.insert(metric_descriptor.labels.clone(), *metric_id);
}
self.metric_descriptor_table = metric_names_to_labels_to_ids;
}
/// Returns the first checkpoint that precedes or contains the wanted instant.
fn find_checkpoint_just_before(
&self,
instant: UnixTimestampMilliseconds,
) -> anyhow::Result<&Checkpoint> {
let idx = match self
.checkpoints
.binary_search_by(|candidate| candidate.state.at.cmp(&instant))
{
Ok(idx) => idx,
Err(idx) => {
if idx == 0 {
// This probably means that we haven't created a checkpoint at time zero.
// We should make sure to always do that ...
bail!(
"The checkpoint for entry (before {:?}) would be at idx -1.",
instant
);
} else {
idx - 1
}
}
};
if let Some(checkpoint) = self.checkpoints.get(idx) {
debug!(
"First checkpoint just before {:?} is idx{} at {:?}until{:?}",
instant, idx, checkpoint.seek.start_ts, checkpoint.state.at
);
Ok(checkpoint)
} else {
bail!("No checkpoints...");
}
}
/// Returns the seek token for the first frame whose time interval includes the time point specified.
fn lookup_seek_token_for_frame_containing_time(
&mut self,
just_before_or_at: UnixTimestampMilliseconds,
) -> anyhow::Result<SeekToken> {
let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone();
let return_to = self.reader.seek(checkpoint_just_before.seek)?;
let mut result = checkpoint_just_before.seek;
let mut previous = checkpoint_just_before.seek;
loop {
if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? {
previous = seek;
if frame.end_time > just_before_or_at {
if start_ts <= just_before_or_at {
// This frame contains the wanted time point.
// (If we don't hit this case, then we ran past it.)
result = seek;
}
break;
}
} else {
// Ran out of frames! We use the last one we saw.
result = previous;
break;
}
}
self.reader.seek(return_to)?;
Ok(result)
}
fn bicimate_histograms(
vec: &mut Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
Histogram<u64>,
)>,
) {
let mut iter = vec.chunks_exact_mut(2);
for chunks in &mut iter {
let (ch0, ch1) = chunks.split_at_mut(1);
let (_start0, end0, hist0) = &mut ch0[0];
let (_start1, end1, hist1) = &mut ch1[0];
// Make chunk 0 encompass the whole thing.
*end0 = *end1;
// Add the histograms
*hist0 += &*hist1;
}
// Bicimate to remove the unwanted parts.
Self::bicimate(vec);
}
fn bicimate<T>(vec: &mut Vec<T>) {
// we always keep the last point!
let last = vec.len() - 1;
let mut the_vec = Vec::with_capacity(0);
std::mem::swap(&mut the_vec, vec);
*vec = the_vec
.into_iter()
.enumerate()
.filter(|(idx, _)| idx % 2 == 0 || *idx == last)
.map(|(_, v)| v)
.collect::<Vec<T>>();
}
fn load_window_of_data(
&mut self,
start: UnixTimestampMilliseconds,
end: UnixTimestampMilliseconds,
time_points: u32,
) -> anyhow::Result<MetricsWindow> {
let mut metrics_window = MetricsWindow {
time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001),
wanted_time_points: time_points,
metrics: self.metric_descriptor_table.clone(),
metric_descriptors: self.metric_descriptors.clone(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
};
let mut histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() {
for (_, metric_id) in metric_labels_to_ids.iter() {
match metrics_window.metric_descriptors[metric_id].kind {
MetricKind::Histogram => {
histograms.insert(*metric_id, Default::default());
}
MetricKind::Gauge => {
metrics_window.gauges.insert(
*metric_id,
ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
MetricKind::Counter => {
metrics_window.counters.insert(
*metric_id,
ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
}
}
}
let checkpoint = self.find_checkpoint_just_before(start)?.clone();
let mut frame_state = checkpoint.state;
let next_token = checkpoint.seek;
let mut in_window = false;
fn write_to_window(
metric_window: &mut MetricsWindow,
histograms: &mut HashMap<MetricId, UnsummarisedHistogramWindow>,
current_state: &MetricsState,
frame: Option<(UnixTimestampMilliseconds, &Frame)>,
) {
if let Some((frame_start, frame)) = frame {
for (metric_id, histogram) in frame.histograms.iter() {
histograms.get_mut(metric_id).unwrap().map.push((
frame_start,
frame.end_time,
histogram.underlying.clone(),
));
}
}
for (metric_id, count) in current_state.counters.iter() {
metric_window
.counters
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *count as f64));
}
for (metric_id, value) in current_state.gauges.iter() {
metric_window
.gauges
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *value));
}
}
// need to try and preserve extra points just before the window and just after, for
// continuity ...
let mut num_points = 0;
self.reader.seek(next_token)?;
loop {
if let Some((start_ts, frame)) = self.reader.read_frame()? {
Self::integrate_state(&mut frame_state, &frame);
if !in_window {
if frame_state.at >= start {
in_window = true;
} else {
// Clear the windows because we still haven't reached the start and only
// want one sample before the start.
num_points = 0;
for (_, window) in histograms.iter_mut() {
window.map.clear();
}
for (_, window) in metrics_window.counters.iter_mut() {
window.points.clear();
}
for (_, window) in metrics_window.gauges.iter_mut() {
window.points.clear();
}
}
}
num_points += 1;
debug!("Writing frame to window with start ts {:?}", start_ts);
write_to_window(
&mut metrics_window,
&mut histograms,
&frame_state,
Some((start_ts, &frame)),
);
if frame_state.at >= end {
// We've gone past the end. Stop here.
break;
}
} else {
break;
}
}
while num_points >= time_points * 2 {
// Keep halving the points until we have less than double the number of requested points.
for (_, counter) in metrics_window.counters.iter_mut() {
Self::bicimate(&mut counter.points);
}
for (_, gauge) in metrics_window.gauges.iter_mut() {
Self::bicimate(&mut gauge.points);
}
for (_, hists) in histograms.iter_mut() {
Self::bicimate_histograms(&mut hists.map);
}
num_points = (num_points - 1) / 2 + 1;
}
for (metric_id, unsummarised_histogram) in histograms.into_iter() {
metrics_window
.histograms
.insert(metric_id, unsummarised_histogram.summarise());
}
for (metric_id, gauge) in metrics_window.gauges.iter_mut() {
gauge.summarise_in_place();
}
for (metric_id, counter) in metrics_window.counters.iter_mut() {
counter.summarise_in_place();
}
Ok(metrics_window)
}
fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> {
info!("Starting manager");
self.initial_scan()?;
self.update_metric_descriptors();
info!("Initial scan done.");
let mut the_func = None;
let mut callback_guard = on_initial_scan_done
.lock()
.map_err(|_| anyhow!("Can't lock"))?;
std::mem::swap(&mut the_func, callback_guard.deref_mut());
if let Some(on_initial_scan_done) = the_func {
on_initial_scan_done();
}
while let Ok(msg) = self.rx.recv() {
match msg {
MetricsLogReaderMessage::LoadNewWindow {
on_state_change,
new_time_range,
new_wanted_time_points,
} => {
debug!(
"Requested new window; time range {:?}, № points {}",
new_time_range, new_wanted_time_points
);
self.shared_ref
.loading_new_window
.store(true, Ordering::SeqCst);
on_state_change(false);
let start = if *new_time_range.start() == f64::NEG_INFINITY {
// autoscale by getting the earliest time point of the metrics log
let new_start = self.reader.header.start_time.0;
debug!("Autoscaling start to: {:?}", new_start);
new_start
} else {
(*new_time_range.start() * 1000.0) as u64
};
let end = if *new_time_range.end() == f64::INFINITY {
// get the first result before the end of time...
// or in other words, the last result.
let seek_token = self.lookup_seek_token_for_frame_containing_time(
UnixTimestampMilliseconds(u64::MAX),
)?;
self.reader.seek(seek_token)?;
let (start_ts, frame) = self
.reader
.read_frame()?
.ok_or_else(|| anyhow!("Sought frame should exist"))?;
// if let Some(frame) = frame {
// frame.end_time.0
// } else {
// // well, we have no choice. Just use the current time.
// SystemTime::now()
// .duration_since(std::time::UNIX_EPOCH)
// .unwrap()
// .as_millis() as u64
// }
let new_end = frame.end_time.0;
debug!("Autoscaled end to: {:?}", new_end);
new_end
} else {
(*new_time_range.end() * 1000.0) as u64
};
let metric_window = self.load_window_of_data(
UnixTimestampMilliseconds(start),
UnixTimestampMilliseconds(end),
new_wanted_time_points,
)?;
debug!("METRIC WINDOW {:#?}", metric_window);
*(self.shared_ref.current_window.write().unwrap()) = metric_window;
debug!("Finished loading window.");
on_state_change(true);
}
}
}
Ok(())
}
fn new_and_run(
shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>,
rx: Receiver<MetricsLogReaderMessage>,
on_initial_scan_done: StateCallbackHook,
) -> anyhow::Result<()> {
let mut manager = MetricsLogReaderManager {
shared_ref,
reader,
rx,
metric_descriptors: Default::default(),
metric_descriptors_dirty: false,
metric_descriptor_table: Default::default(),
checkpoints: Default::default(),
};
manager.run(on_initial_scan_done)
}
}

View File

@ -1,3 +1,4 @@
use crate::background_loader::MetricsLogReadingRequester;
use crate::config::MetricTransform;
use anyhow::{anyhow, bail};
use bare_metrics_core::structures::{
@ -16,656 +17,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
/// Make a checkpoint every 10 minutes?
/// This should probably be tunable since it will likely vary per-source ...
pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000;
pub type StateCallbackHook = Arc<Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>>;
#[derive(Clone, Debug)]
pub struct MetricsWindow {
pub time_range: RangeInclusive<f64>,
pub wanted_time_points: u32,
pub metrics: MetricDescriptorTable,
pub metric_descriptors: HashMap<MetricId, MetricDescriptor>,
pub histograms: HashMap<MetricId, HistogramWindow>,
pub counters: HashMap<MetricId, ScalarWindow>,
pub gauges: HashMap<MetricId, ScalarWindow>,
}
#[derive(Copy, Clone, Debug, Default)]
pub struct HeatmapRect {
min_ts: UnixTimestampMilliseconds,
max_ts: UnixTimestampMilliseconds,
min_val: f64,
max_val: f64,
density: f64,
}
#[derive(Clone, Debug)]
pub struct HistogramWindow {
pub heatmap: Vec<HeatmapRect>,
pub min_density: f64,
pub max_density: f64,
pub y_axis: RangeInclusive<f64>,
}
#[derive(Clone, Debug, Default)]
pub struct UnsummarisedHistogramWindow {
pub map: Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
Histogram<u64>,
)>,
}
impl UnsummarisedHistogramWindow {
pub fn summarise(&self) -> HistogramWindow {
let mut heatmap = Vec::new();
let mut min_density = f64::INFINITY;
let mut max_density = f64::NEG_INFINITY;
let mut min_val = f64::INFINITY;
let mut max_val = f64::NEG_INFINITY;
for (xa, xb, hist) in self.map.iter() {
let min_this_slice = hist.min();
min_val = min_val.min(min_this_slice as f64);
max_val = max_val.max(hist.max() as f64);
let mut prev = min_this_slice;
for q in (1..=10).map(|i| i as f64 * 0.1) {
// TODO improve this. It's going to be a bit off because of the way that
// numbers can be 'equivalent' etc ...
let value_at_q = hist.value_at_quantile(q);
let count_between = hist.count_between(prev, value_at_q);
let area = ((value_at_q - prev) * (xb.0 - xa.0)) as f64;
let density = count_between as f64 / area;
min_density = min_density.min(density);
max_density = max_density.max(density);
//eprintln!("den {}", density);
// TODO need to autoscale based on density...
heatmap.push(HeatmapRect {
min_ts: *xa,
max_ts: *xb,
min_val: prev as f64,
max_val: value_at_q as f64,
density,
});
prev = value_at_q;
}
//debug!("hist min {:?} max {:?}", hist.min(), hist.max());
}
let y_axis = min_val..=max_val;
debug!("yaxis {:?}", y_axis);
HistogramWindow {
heatmap,
min_density,
max_density,
y_axis,
}
}
}
#[derive(Clone, Debug)]
pub struct ScalarWindow {
pub points: Vec<(UnixTimestampMilliseconds, f64)>,
pub y_axis: RangeInclusive<f64>,
}
impl ScalarWindow {
pub fn summarise_in_place(&mut self) {
let mut min_point = f64::INFINITY;
let mut max_point = f64::NEG_INFINITY;
for (_, point) in self.points.iter() {
min_point = min_point.min(*point);
max_point = max_point.max(*point);
}
self.y_axis = min_point..=max_point;
}
}
pub struct MetricsLogReadingRequester {
pub shared: Arc<MetricsLogReadingShared>,
pub tx: Sender<MetricsLogReaderMessage>,
}
pub struct MetricsLogReadingShared {
/// The current window of data.
pub current_window: RwLock<MetricsWindow>,
/// True if a new window of data is being loaded.
pub loading_new_window: AtomicBool,
}
pub enum MetricsLogReaderMessage {
LoadNewWindow {
/// Callback that gets called when there's a state change.
/// The bool represents whether the loading is finished or not.
on_state_change: Box<dyn Fn(bool) -> () + 'static + Send>,
new_time_range: RangeInclusive<f64>,
new_wanted_time_points: u32,
},
}
impl MetricsLogReadingRequester {
pub fn new_manager<R: Read + Seek + Send + 'static>(
reader: MetricsLogReader<R>,
on_initial_scan_done: StateCallbackHook,
) -> MetricsLogReadingRequester {
let shared = Arc::new(MetricsLogReadingShared {
current_window: RwLock::new(MetricsWindow {
time_range: 0.0..=0.0,
wanted_time_points: 0,
metrics: Default::default(),
metric_descriptors: Default::default(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
}),
loading_new_window: AtomicBool::new(false),
});
let manager_shared_ref = shared.clone();
let (tx, rx) = std::sync::mpsc::channel();
let requester = MetricsLogReadingRequester { shared, tx };
std::thread::Builder::new()
.name("metricslogreader".to_string())
.spawn(move || {
if let Err(err) = MetricsLogReaderManager::new_and_run(
manager_shared_ref,
reader,
rx,
on_initial_scan_done,
) {
error!("Error in background log reader: {:?}", err);
}
})
.unwrap();
requester
}
}
/// We don't track histograms because they don't 'accumulate'; the histograms are emitted every
/// frame when there are any samples.
#[derive(Clone, Debug)]
pub struct MetricsState {
at: UnixTimestampMilliseconds,
gauges: HashMap<MetricId, f64>,
counters: HashMap<MetricId, u64>,
}
#[derive(Clone, Debug)]
pub struct Checkpoint {
/// State AT the checkpoint
pub state: MetricsState,
/// Seek token for the frame which ends at the checkpoint.
pub seek: SeekToken,
}
pub struct MetricsLogReaderManager<R: Read + Seek> {
shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>,
rx: Receiver<MetricsLogReaderMessage>,
metric_descriptors: HashMap<MetricId, MetricDescriptor>,
metric_descriptors_dirty: bool,
metric_descriptor_table: MetricDescriptorTable,
checkpoints: Vec<Checkpoint>,
}
/// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent
/// between re-runs and makes it easier to confidently order the UI and assign colours etc...
pub type MetricDescriptorTable = BTreeMap<String, BTreeMap<BTreeMap<String, String>, MetricId>>;
impl<R: Read + Seek> MetricsLogReaderManager<R> {
fn integrate_state(current_state: &mut MetricsState, incoming: &Frame) {
current_state.at = incoming.end_time;
// Integrate the state...
// We have no need for histogram state because they emit every time there are any
// samples (not just on changes).
for (metric_id, count) in incoming.counter_updates.iter() {
current_state.counters.insert(*metric_id, count.0);
}
for (metric_id, value) in incoming.gauge_updates.iter() {
current_state.gauges.insert(*metric_id, *value);
}
}
fn initial_scan(&mut self) -> anyhow::Result<()> {
let mut state = MetricsState {
at: UnixTimestampMilliseconds(0),
gauges: HashMap::with_capacity(0),
counters: HashMap::with_capacity(0),
};
let mut next_checkpoint_at = 0u64;
while let Some((old_seek_token, old_end_ts, frame)) = self.reader.read_frame_rewindable()? {
if old_end_ts.0 > next_checkpoint_at {
// Make a checkpoint here!
self.checkpoints.push(Checkpoint {
state: state.clone(),
seek: old_seek_token,
});
next_checkpoint_at = old_end_ts.0 + CHECKPOINT_EVERY_MILLISECONDS;
}
debug!("FRAME {:?}\n{:#?}", old_end_ts, frame);
Self::integrate_state(&mut state, &frame);
// Also keep track of new metrics
for (new_metric, metric_descriptor) in frame.new_metrics {
self.metric_descriptors
.insert(new_metric, metric_descriptor);
self.metric_descriptors_dirty = true;
}
}
debug!("Initial scan complete.");
Ok(())
}
fn update_metric_descriptors(&mut self) {
if !self.metric_descriptors_dirty {
return;
}
self.metric_descriptors_dirty = false;
let mut metric_names_to_labels_to_ids: MetricDescriptorTable = BTreeMap::new();
for (metric_id, metric_descriptor) in self.metric_descriptors.iter() {
let labels_to_ids = metric_names_to_labels_to_ids
.entry(metric_descriptor.name.to_string())
.or_insert_with(BTreeMap::new);
labels_to_ids.insert(metric_descriptor.labels.clone(), *metric_id);
}
self.metric_descriptor_table = metric_names_to_labels_to_ids;
}
/// Returns the first checkpoint that precedes or contains the wanted instant.
fn find_checkpoint_just_before(
&self,
instant: UnixTimestampMilliseconds,
) -> anyhow::Result<&Checkpoint> {
let idx = match self
.checkpoints
.binary_search_by(|candidate| candidate.state.at.cmp(&instant))
{
Ok(idx) => idx,
Err(idx) => {
if idx == 0 {
// This probably means that we haven't created a checkpoint at time zero.
// We should make sure to always do that ...
bail!(
"The checkpoint for entry (before {:?}) would be at idx -1.",
instant
);
} else {
idx - 1
}
}
};
if let Some(checkpoint) = self.checkpoints.get(idx) {
debug!(
"First checkpoint just before {:?} is idx{} at {:?}until{:?}",
instant, idx, checkpoint.seek.start_ts, checkpoint.state.at
);
Ok(checkpoint)
} else {
bail!("No checkpoints...");
}
}
/// Returns the seek token for the first frame whose time interval includes the time point specified.
fn lookup_seek_token_for_frame_containing_time(
&mut self,
just_before_or_at: UnixTimestampMilliseconds,
) -> anyhow::Result<SeekToken> {
let checkpoint_just_before = self.find_checkpoint_just_before(just_before_or_at)?.clone();
let return_to = self.reader.seek(checkpoint_just_before.seek)?;
let mut result = checkpoint_just_before.seek;
let mut previous = checkpoint_just_before.seek;
loop {
if let Some((seek, start_ts, frame)) = self.reader.read_frame_rewindable()? {
previous = seek;
if frame.end_time > just_before_or_at {
if start_ts <= just_before_or_at {
// This frame contains the wanted time point.
// (If we don't hit this case, then we ran past it.)
result = seek;
}
break;
}
} else {
// Ran out of frames! We use the last one we saw.
result = previous;
break;
}
}
self.reader.seek(return_to)?;
Ok(result)
}
fn bicimate_histograms(
vec: &mut Vec<(
UnixTimestampMilliseconds,
UnixTimestampMilliseconds,
Histogram<u64>,
)>,
) {
let mut iter = vec.chunks_exact_mut(2);
for chunks in &mut iter {
let (ch0, ch1) = chunks.split_at_mut(1);
let (_start0, end0, hist0) = &mut ch0[0];
let (_start1, end1, hist1) = &mut ch1[0];
// Make chunk 0 encompass the whole thing.
*end0 = *end1;
// Add the histograms
*hist0 += &*hist1;
}
// Bicimate to remove the unwanted parts.
Self::bicimate(vec);
}
fn bicimate<T>(vec: &mut Vec<T>) {
// we always keep the last point!
let last = vec.len() - 1;
let mut the_vec = Vec::with_capacity(0);
std::mem::swap(&mut the_vec, vec);
*vec = the_vec
.into_iter()
.enumerate()
.filter(|(idx, _)| idx % 2 == 0 || *idx == last)
.map(|(_, v)| v)
.collect::<Vec<T>>();
}
fn load_window_of_data(
&mut self,
start: UnixTimestampMilliseconds,
end: UnixTimestampMilliseconds,
time_points: u32,
) -> anyhow::Result<MetricsWindow> {
let mut metrics_window = MetricsWindow {
time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001),
wanted_time_points: time_points,
metrics: self.metric_descriptor_table.clone(),
metric_descriptors: self.metric_descriptors.clone(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
};
let mut histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() {
for (_, metric_id) in metric_labels_to_ids.iter() {
match metrics_window.metric_descriptors[metric_id].kind {
MetricKind::Histogram => {
histograms.insert(*metric_id, Default::default());
}
MetricKind::Gauge => {
metrics_window.gauges.insert(
*metric_id,
ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
MetricKind::Counter => {
metrics_window.counters.insert(
*metric_id,
ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
}
}
}
let checkpoint = self.find_checkpoint_just_before(start)?.clone();
let mut frame_state = checkpoint.state;
let next_token = checkpoint.seek;
let mut in_window = false;
fn write_to_window(
metric_window: &mut MetricsWindow,
histograms: &mut HashMap<MetricId, UnsummarisedHistogramWindow>,
current_state: &MetricsState,
frame: Option<(UnixTimestampMilliseconds, &Frame)>,
) {
if let Some((frame_start, frame)) = frame {
for (metric_id, histogram) in frame.histograms.iter() {
histograms.get_mut(metric_id).unwrap().map.push((
frame_start,
frame.end_time,
histogram.underlying.clone(),
));
}
}
for (metric_id, count) in current_state.counters.iter() {
metric_window
.counters
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *count as f64));
}
for (metric_id, value) in current_state.gauges.iter() {
metric_window
.gauges
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *value));
}
}
// need to try and preserve extra points just before the window and just after, for
// continuity ...
let mut num_points = 0;
self.reader.seek(next_token)?;
loop {
if let Some((start_ts, frame)) = self.reader.read_frame()? {
Self::integrate_state(&mut frame_state, &frame);
if !in_window {
if frame_state.at >= start {
in_window = true;
} else {
// Clear the windows because we still haven't reached the start and only
// want one sample before the start.
num_points = 0;
for (_, window) in histograms.iter_mut() {
window.map.clear();
}
for (_, window) in metrics_window.counters.iter_mut() {
window.points.clear();
}
for (_, window) in metrics_window.gauges.iter_mut() {
window.points.clear();
}
}
}
num_points += 1;
debug!("Writing frame to window with start ts {:?}", start_ts);
write_to_window(
&mut metrics_window,
&mut histograms,
&frame_state,
Some((start_ts, &frame)),
);
if frame_state.at >= end {
// We've gone past the end. Stop here.
break;
}
} else {
break;
}
}
while num_points >= time_points * 2 {
// Keep halving the points until we have less than double the number of requested points.
for (_, counter) in metrics_window.counters.iter_mut() {
Self::bicimate(&mut counter.points);
}
for (_, gauge) in metrics_window.gauges.iter_mut() {
Self::bicimate(&mut gauge.points);
}
for (_, hists) in histograms.iter_mut() {
Self::bicimate_histograms(&mut hists.map);
}
num_points = (num_points - 1) / 2 + 1;
}
for (metric_id, unsummarised_histogram) in histograms.into_iter() {
metrics_window
.histograms
.insert(metric_id, unsummarised_histogram.summarise());
}
for (metric_id, gauge) in metrics_window.gauges.iter_mut() {
gauge.summarise_in_place();
}
for (metric_id, counter) in metrics_window.counters.iter_mut() {
counter.summarise_in_place();
}
Ok(metrics_window)
}
fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> {
info!("Starting manager");
self.initial_scan()?;
self.update_metric_descriptors();
info!("Initial scan done.");
let mut the_func = None;
let mut callback_guard = on_initial_scan_done
.lock()
.map_err(|_| anyhow!("Can't lock"))?;
std::mem::swap(&mut the_func, callback_guard.deref_mut());
if let Some(on_initial_scan_done) = the_func {
on_initial_scan_done();
}
while let Ok(msg) = self.rx.recv() {
match msg {
MetricsLogReaderMessage::LoadNewWindow {
on_state_change,
new_time_range,
new_wanted_time_points,
} => {
debug!(
"Requested new window; time range {:?}, № points {}",
new_time_range, new_wanted_time_points
);
self.shared_ref
.loading_new_window
.store(true, Ordering::SeqCst);
on_state_change(false);
let start = if *new_time_range.start() == f64::NEG_INFINITY {
// autoscale by getting the earliest time point of the metrics log
let new_start = self.reader.header.start_time.0;
debug!("Autoscaling start to: {:?}", new_start);
new_start
} else {
(*new_time_range.start() * 1000.0) as u64
};
let end = if *new_time_range.end() == f64::INFINITY {
// get the first result before the end of time...
// or in other words, the last result.
let seek_token = self.lookup_seek_token_for_frame_containing_time(
UnixTimestampMilliseconds(u64::MAX),
)?;
self.reader.seek(seek_token)?;
let (start_ts, frame) = self
.reader
.read_frame()?
.ok_or_else(|| anyhow!("Sought frame should exist"))?;
// if let Some(frame) = frame {
// frame.end_time.0
// } else {
// // well, we have no choice. Just use the current time.
// SystemTime::now()
// .duration_since(std::time::UNIX_EPOCH)
// .unwrap()
// .as_millis() as u64
// }
let new_end = frame.end_time.0;
debug!("Autoscaled end to: {:?}", new_end);
new_end
} else {
(*new_time_range.end() * 1000.0) as u64
};
let metric_window = self.load_window_of_data(
UnixTimestampMilliseconds(start),
UnixTimestampMilliseconds(end),
new_wanted_time_points,
)?;
debug!("METRIC WINDOW {:#?}", metric_window);
*(self.shared_ref.current_window.write().unwrap()) = metric_window;
debug!("Finished loading window.");
on_state_change(true);
}
}
}
Ok(())
}
fn new_and_run(
shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>,
rx: Receiver<MetricsLogReaderMessage>,
on_initial_scan_done: StateCallbackHook,
) -> anyhow::Result<()> {
let mut manager = MetricsLogReaderManager {
shared_ref,
reader,
rx,
metric_descriptors: Default::default(),
metric_descriptors_dirty: false,
metric_descriptor_table: Default::default(),
checkpoints: Default::default(),
};
manager.run(on_initial_scan_done)
}
}
/// Renderer for a bare metrics graph.
/// Should support rendering both line graphs and histograms.
/// Input interactions:

View File

@ -1,5 +1,6 @@
use crate::background_loader::{MetricsLogReaderMessage, MetricsLogReadingRequester};
use crate::config::DashboardConfig;
use crate::graph::{Graph, MetricsLogReaderMessage, MetricsLogReadingRequester};
use crate::graph::Graph;
use bare_metrics_core::structures::MetricId;
use bare_metrics_reader::MetricsLogReader;
use eframe::egui::{CentralPanel, CtxRef};
@ -11,6 +12,7 @@ use std::fs::File;
use std::path::PathBuf;
use std::str::FromStr;
pub mod background_loader;
pub mod config;
pub mod graph;