Allow computing derivations in the background

This commit is contained in:
Olivier 'reivilibre' 2022-01-08 16:24:52 +00:00
parent c69b4c9332
commit db5f2260bd
4 changed files with 99 additions and 172 deletions

View File

@ -1,10 +1,11 @@
use crate::config::{GraphRequest, MetricTransform};
use anyhow::{anyhow, bail};
use bare_metrics_core::structures::{
Frame, MetricDescriptor, MetricId, MetricKind, UnixTimestampMilliseconds,
};
use bare_metrics_reader::{MetricsLogReader, SeekToken};
use hdrhistogram::Histogram;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Seek};
use std::ops::RangeInclusive;
@ -60,9 +61,8 @@ pub struct MetricsWindow {
pub wanted_time_points: u32,
pub metrics: MetricDescriptorTable,
pub metric_descriptors: HashMap<MetricId, MetricDescriptor>,
pub histograms: HashMap<MetricId, HistogramWindow>,
pub counters: HashMap<MetricId, ScalarWindow>,
pub gauges: HashMap<MetricId, ScalarWindow>,
pub histograms: HashMap<GraphRequest, HistogramWindow>,
pub scalars: HashMap<GraphRequest, ScalarWindow>,
}
#[derive(Clone, Debug)]
@ -218,8 +218,7 @@ impl MetricsLogReadingRequester {
metrics: Default::default(),
metric_descriptors: Default::default(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
scalars: Default::default(),
}),
loading_new_window: AtomicBool::new(false),
});
@ -461,159 +460,88 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
start: UnixTimestampMilliseconds,
end: UnixTimestampMilliseconds,
time_points: u32,
requests: Vec<GraphRequest>,
) -> anyhow::Result<MetricsWindow> {
let raw_window = self.load_raw_window_of_data(start, end, time_points)?;
let mut metrics_window = MetricsWindow {
time_range: RangeInclusive::new(start.0 as f64 * 0.001, end.0 as f64 * 0.001),
wanted_time_points: time_points,
metrics: self.metric_descriptor_table.clone(),
metric_descriptors: self.metric_descriptors.clone(),
histograms: Default::default(),
counters: Default::default(),
gauges: Default::default(),
scalars: Default::default(),
};
let mut histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
let _histograms: HashMap<MetricId, UnsummarisedHistogramWindow> = Default::default();
for (_metric_name, metric_labels_to_ids) in self.metric_descriptor_table.iter() {
for (_, metric_id) in metric_labels_to_ids.iter() {
match metrics_window.metric_descriptors[metric_id].kind {
for request in requests {
let metric_id = self
.metric_descriptor_table
.get(&request.metric_name)
.map(|by_labels| by_labels.get(&request.metric_labels))
.flatten();
let metric_id = if let Some(metric_id) = metric_id {
metric_id
} else {
continue;
};
let metric_descriptor = &self.metric_descriptors[metric_id];
match metric_descriptor.kind {
MetricKind::Histogram => {
histograms.insert(*metric_id, Default::default());
if let Some(ref _derivation) = request.derivation {
error!("Derivations not supported for histograms: {:?}", request);
} else {
if let Some(histogram) = raw_window.histograms.get(metric_id) {
// TODO(memory): use COW / Arc for these
metrics_window.histograms.insert(request, histogram.clone());
} else {
warn!("No histogram for {:?}", request);
}
MetricKind::Gauge => {
metrics_window.gauges.insert(
*metric_id,
ScalarWindow {
}
}
MetricKind::Gauge | MetricKind::Counter => {
if let Some(data) = raw_window.scalars.get(metric_id) {
match request.derivation {
None => {
// TODO(memory): use COW / Arc for these
metrics_window.scalars.insert(request, data.clone());
}
Some(MetricTransform::Rate { time_unit }) => {
if data.points.is_empty() {
metrics_window.scalars.insert(request, data.clone());
} else {
let mut new_window = ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
MetricKind::Counter => {
metrics_window.counters.insert(
*metric_id,
ScalarWindow {
points: vec![],
y_axis: 0.0..=0.0,
},
);
}
}
}
}
};
let scale_factor = 1.0 / (time_unit as f64);
let checkpoint = self.find_checkpoint_just_before(start)?.clone();
let mut frame_state = checkpoint.state;
let next_token = checkpoint.seek;
let mut in_window = false;
fn write_to_window(
metric_window: &mut MetricsWindow,
histograms: &mut HashMap<MetricId, UnsummarisedHistogramWindow>,
current_state: &MetricsState,
frame: Option<(UnixTimestampMilliseconds, &Frame)>,
) {
if let Some((frame_start, frame)) = frame {
for (metric_id, histogram) in frame.histograms.iter() {
histograms.get_mut(metric_id).unwrap().map.push((
frame_start,
frame.end_time,
histogram.underlying.clone(),
));
let mut last_val = data.points[0];
for next_val in data.points[1..].iter() {
let delta_time = next_val.0.as_f64_seconds()
- last_val.0.as_f64_seconds();
let delta_value = next_val.1 - last_val.1;
let rate = delta_value / delta_time * scale_factor;
// TODO(question): should this instead be at the midpoint in time?
new_window.points.push((next_val.0, rate));
last_val = *next_val;
}
metrics_window.scalars.insert(request, new_window);
}
}
for (metric_id, count) in current_state.counters.iter() {
metric_window
.counters
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *count as f64));
}
for (metric_id, value) in current_state.gauges.iter() {
metric_window
.gauges
.get_mut(metric_id)
.unwrap()
.points
.push((current_state.at, *value));
}
}
// need to try and preserve extra points just before the window and just after, for
// continuity ...
let mut num_points = 0;
self.reader.seek(next_token)?;
loop {
if let Some((start_ts, frame)) = self.reader.read_frame()? {
Self::integrate_state(&mut frame_state, &frame);
if !in_window {
if frame_state.at >= start {
in_window = true;
} else {
// Clear the windows because we still haven't reached the start and only
// want one sample before the start.
num_points = 0;
for (_, window) in histograms.iter_mut() {
window.map.clear();
}
for (_, window) in metrics_window.counters.iter_mut() {
window.points.clear();
}
for (_, window) in metrics_window.gauges.iter_mut() {
window.points.clear();
}
}
}
num_points += 1;
debug!("Writing frame to window with start ts {:?}", start_ts);
write_to_window(
&mut metrics_window,
&mut histograms,
&frame_state,
Some((start_ts, &frame)),
);
if frame_state.at >= end {
// We've gone past the end. Stop here.
break;
}
} else {
break;
warn!("No scalar for {:?}", request);
}
}
}
while num_points >= time_points * 2 {
// Keep halving the points until we have less than double the number of requested points.
for (_, counter) in metrics_window.counters.iter_mut() {
Self::bicimate(&mut counter.points);
if let Some(_scalar) = raw_window.scalars.get(metric_id) {
} else if let Some(_histogram) = raw_window.histograms.get(metric_id) {
} else {
warn!("No metric for ID {:?}", metric_id);
}
for (_, gauge) in metrics_window.gauges.iter_mut() {
Self::bicimate(&mut gauge.points);
}
for (_, hists) in histograms.iter_mut() {
Self::bicimate_histograms(&mut hists.map);
}
num_points = (num_points - 1) / 2 + 1;
}
for (metric_id, unsummarised_histogram) in histograms.into_iter() {
metrics_window
.histograms
.insert(metric_id, unsummarised_histogram.summarise());
}
for (_metric_id, gauge) in metrics_window.gauges.iter_mut() {
gauge.summarise_in_place();
}
for (_metric_id, counter) in metrics_window.counters.iter_mut() {
counter.summarise_in_place();
}
Ok(metrics_window)
@ -827,6 +755,7 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
UnixTimestampMilliseconds(start),
UnixTimestampMilliseconds(end),
new_wanted_time_points,
Default::default(), // TODO
)?;
debug!("METRIC WINDOW {:#?}", metric_window);

View File

@ -7,7 +7,7 @@ pub struct DashboardConfig {
pub graphs: Vec<GraphConfig>,
}
#[derive(Deserialize, Clone, Debug, Hash)]
#[derive(Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
pub enum MetricTransform {
Rate {
// Divisor in seconds. Would usually expect to see 1 (/sec), 60 (/min) or 3600 (/hour).
@ -23,7 +23,7 @@ pub struct GraphConfig {
pub transform: Option<MetricTransform>,
}
#[derive(Clone, Debug, Hash)]
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct GraphRequest {
pub metric_name: String,
pub metric_labels: BTreeMap<String, String>,

View File

@ -1,5 +1,6 @@
use crate::background_loader::MetricsLogReadingRequester;
use bare_metrics_core::structures::{MetricId, UnixTimestampMilliseconds};
use crate::config::GraphRequest;
use bare_metrics_core::structures::{UnixTimestampMilliseconds};
use eframe::egui::{
Color32, Frame as EguiFrame, PointerButton, Pos2, Rect, Sense, Stroke, TextStyle, Ui, Vec2,
};
@ -63,7 +64,7 @@ impl Mul<(f64, f64)> for GraphTransform {
}
impl Graph {
pub fn draw(ui: &mut Ui, metric_id: MetricId, reader: &MetricsLogReadingRequester) {
pub fn draw(ui: &mut Ui, graph_request: &GraphRequest, reader: &MetricsLogReadingRequester) {
let context_menu_id = ui.id().with("context menu");
EguiFrame::dark_canvas(ui.style()).show(ui, |ui| {
@ -100,13 +101,9 @@ impl Graph {
// This range is reversed because screen coordinates go down, but we'd like them to go
// up since this is more of a mathematical graph.
let y_axis = if let Some(histogram) = current_window.histograms.get(&metric_id) {
let y_axis = if let Some(histogram) = current_window.histograms.get(graph_request) {
histogram.y_axis.clone().into_inner()
} else if let Some(scalar) = current_window
.counters
.get(&metric_id)
.or_else(|| current_window.gauges.get(&metric_id))
{
} else if let Some(scalar) = current_window.scalars.get(graph_request) {
scalar.y_axis.clone().into_inner()
} else {
(0.0, 10.0)
@ -153,11 +150,7 @@ impl Graph {
ui.painter().galley(text_rect.left_top(), text);
}
if let Some(scalar) = current_window
.counters
.get(&metric_id)
.or_else(|| current_window.gauges.get(&metric_id))
{
if let Some(scalar) = current_window.scalars.get(graph_request) {
for (x, y) in scalar.points.iter() {
let new_point = display_transform * (x.as_f64_seconds(), *y);
ui.painter().circle_filled(new_point, 5.0, Color32::RED);
@ -168,7 +161,7 @@ impl Graph {
}
}
if let Some(histogram) = current_window.histograms.get(&metric_id) {
if let Some(histogram) = current_window.histograms.get(graph_request) {
let min_density = histogram.min_density;
let max_density = histogram.max_density;
for heatrect in histogram.heatmap.iter() {

View File

@ -1,7 +1,7 @@
use crate::background_loader::{MetricsLogReaderMessage, MetricsLogReadingRequester};
use crate::config::DashboardConfig;
use crate::config::{DashboardConfig, GraphRequest};
use crate::graph::Graph;
use bare_metrics_core::structures::MetricId;
use bare_metrics_reader::MetricsLogReader;
use eframe::egui::{CentralPanel, CtxRef};
use eframe::epi::{App, Frame, Storage};
@ -20,6 +20,7 @@ pub struct MetricsGui {
requester: MetricsLogReadingRequester,
dashboard: Option<DashboardConfig>,
graph_requests: Vec<GraphRequest>,
}
impl App for MetricsGui {
@ -27,24 +28,27 @@ impl App for MetricsGui {
let Self {
requester,
dashboard,
graph_requests,
} = self;
CentralPanel::default().show(ctx, |ui| {
egui::ScrollArea::new([false, true]).show(ui, |ui| {
if let Some(dashboard) = dashboard {
if let Some(_dashboard) = dashboard {
let window = requester.shared.current_window.read().unwrap();
for graph in dashboard.graphs.iter() {
ui.label(&graph.name);
for graph in graph_requests.iter() {
ui.label(&graph.metric_name);
if let Some(metric) = window.metrics.get(&graph.name) {
// TODO support multiple labelled metrics
if let Some((_, first_one)) = metric.iter().next() {
Graph::draw(ui, *first_one, requester);
}
if window.scalars.contains_key(graph)
|| window.histograms.contains_key(graph)
{
Graph::draw(ui, graph, requester);
} else {
// TODO clarify
ui.label("(loading or missing ...)");
}
}
} else {
ui.label("(no dashboard)");
Graph::draw(ui, MetricId(0), &requester);
// TODO Graph::draw(ui, MetricId(0), &requester);
}
});
});
@ -110,6 +114,7 @@ fn main() -> anyhow::Result<()> {
let app = MetricsGui {
requester,
dashboard,
graph_requests: vec![],
};
let native_options = NativeOptions::default();