From d570d35112d943c19f03aadbdd09c3a6c93aef91 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 23 Nov 2021 23:08:16 +0000 Subject: [PATCH] Initial implementation of a recorder --- bare-metrics-recorder/.gitignore | 2 + bare-metrics-recorder/Cargo.toml | 18 ++ bare-metrics-recorder/src/lib.rs | 1 + bare-metrics-recorder/src/recording.rs | 323 +++++++++++++++++++++++++ 4 files changed, 344 insertions(+) create mode 100644 bare-metrics-recorder/.gitignore create mode 100644 bare-metrics-recorder/Cargo.toml create mode 100644 bare-metrics-recorder/src/lib.rs create mode 100644 bare-metrics-recorder/src/recording.rs diff --git a/bare-metrics-recorder/.gitignore b/bare-metrics-recorder/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/bare-metrics-recorder/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/bare-metrics-recorder/Cargo.toml b/bare-metrics-recorder/Cargo.toml new file mode 100644 index 0000000..320c141 --- /dev/null +++ b/bare-metrics-recorder/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "bare-metrics-recorder" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +metrics = "0.17.0" +anyhow = "1.0.48" +thiserror = "1.0.30" +log = "0.4.14" +bare-metrics-core = { version = "0.1.0", path = "../bare-metrics-core" } +dashmap = "4.0.2" +fxhash = "0.2.1" +crossbeam-channel = "0.5.1" +serde_bare = "0.5.0" +hdrhistogram = "7.4.0" diff --git a/bare-metrics-recorder/src/lib.rs b/bare-metrics-recorder/src/lib.rs new file mode 100644 index 0000000..9a606f4 --- /dev/null +++ b/bare-metrics-recorder/src/lib.rs @@ -0,0 +1 @@ +pub mod recording; diff --git a/bare-metrics-recorder/src/recording.rs b/bare-metrics-recorder/src/recording.rs new file mode 100644 index 0000000..efcd34d --- /dev/null +++ b/bare-metrics-recorder/src/recording.rs @@ -0,0 +1,323 @@ +use bare_metrics_core::structures::{ + Frame, MetricDescriptor, MetricId, MetricKind, SerialisableHistogram, +}; +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use dashmap::DashMap; +use hdrhistogram::Histogram; +use log::{error, warn}; +use metrics::{GaugeValue, Key, Recorder, Unit}; +use serde_bare::Uint; +use std::collections::HashMap; +use std::io::Write; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::Arc; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +// TODO add a reader +// the reader accepts a Read or a File (is there a 'SeekRead'?) +// and streams BARE messages out of it +// Each message is returned with a seek token, which can be used by the application later to return +// the reader to that point in time. + +pub struct BareMetricsRecorderCore { + writer: W, + emission_interval: Duration, + queue_size: usize, + known_metric_kinds: HashMap, + next_frame: Frame, + current_gauges: HashMap, + current_counters: HashMap, +} + +impl BareMetricsRecorderCore { + pub fn new(writer: W) -> Self { + BareMetricsRecorderCore { + writer, + emission_interval: Duration::from_secs(60), + queue_size: 256, + known_metric_kinds: Default::default(), + next_frame: Frame::default(), + current_gauges: Default::default(), + current_counters: Default::default(), + } + } + + pub fn with_emission_interval(mut self, interval: Duration) -> Self { + self.emission_interval = interval; + self + } + + pub fn with_queue_size(mut self, queue_size: usize) -> Self { + self.queue_size = queue_size; + self + } + + pub fn start(self) -> anyhow::Result<(BareMetricsRecorderShard, BareMetricsRecorderStopper)> { + let (tx, rx) = crossbeam_channel::bounded(self.queue_size); + + let handle = std::thread::Builder::new() + .name("bare-metrics-recorder".to_string()) + .spawn(move || { + if let Err(e) = self.run_forever(rx) { + error!("Recorder erred: {:?}", e); + } + })?; + + let shard = BareMetricsRecorderShard { + metric_ids: Default::default(), + metric_id_generator: Default::default(), + recorder_tx: tx.clone(), + }; + + let stopper = BareMetricsRecorderStopper { + join_handle: handle, + sender: tx, + }; + Ok((shard, stopper)) + } + + fn emit_and_clear_frame(&mut self) -> anyhow::Result<()> { + serde_bare::to_writer(&mut self.writer, &self.next_frame)?; + self.writer.flush()?; + + // Clear the next frame so the diff starts from zero again. + self.next_frame.counter_updates.clear(); + self.next_frame.gauge_updates.clear(); + self.next_frame.histograms.clear(); + self.next_frame.new_metrics.clear(); + + Ok(()) + } + + fn run_forever(mut self, rx: Receiver) -> anyhow::Result<()> { + let mut next_emission = Instant::now() + self.emission_interval; + loop { + match rx.recv_deadline(next_emission) { + Ok(RecorderMessage::NewMetric { + metric_id, + metric_kind, + unit, + description, + }) => { + self.next_frame.new_metrics.insert( + metric_id, + MetricDescriptor { + kind: metric_kind, + unit: unit.map(|unit| unit.as_str().to_string()), + description: description.to_string(), + }, + ); + self.known_metric_kinds.insert(metric_id, metric_kind); + match metric_kind { + MetricKind::Histogram => {} + MetricKind::Gauge => { + self.current_gauges.insert(metric_id, 0.0); + } + MetricKind::Counter => { + self.current_counters.insert(metric_id, 0); + } + } + } + Ok(RecorderMessage::HistogramReading { metric_id, value }) => { + let histogram = + self.next_frame + .histograms + .entry(metric_id) + .or_insert_with(|| SerialisableHistogram { + underlying: Histogram::new(3).unwrap(), + }); + if let Err(err) = histogram.underlying.record(value as u64) { + warn!("Couldn't record histogram entry because {:?}", err); + } + } + Ok(RecorderMessage::CounterCount { metric_id, value }) => { + let entry = self.current_counters.get_mut(&metric_id); + if let Some(counter_value) = entry { + *counter_value += value; + self.next_frame + .counter_updates + .insert(metric_id, Uint(*counter_value)); + } + } + Ok(RecorderMessage::GaugeUpdate { + metric_id, + gauge_update, + }) => { + let entry = self.current_gauges.get_mut(&metric_id); + if let Some(gauge_value) = entry { + match gauge_update { + GaugeValue::Absolute(value) => { + *gauge_value = value; + } + GaugeValue::Increment(by) => { + *gauge_value += by; + } + GaugeValue::Decrement(by) => { + *gauge_value -= by; + } + } + + self.next_frame + .gauge_updates + .insert(metric_id, *gauge_value); + } + } + Ok(RecorderMessage::Stop) => { + break; + } + Err(RecvTimeoutError::Timeout) => { + // It's time to emit a new frame. + self.emit_and_clear_frame()?; + next_emission = Instant::now() + self.emission_interval; + } + Err(RecvTimeoutError::Disconnected) => { + break; + } + } + } + + Ok(()) + } +} + +pub struct BareMetricsRecorderStopper { + join_handle: JoinHandle<()>, + sender: Sender, +} + +impl BareMetricsRecorderStopper { + pub fn stop(self) { + self.sender + .send(RecorderMessage::Stop) + .expect("Should have been able to stop bare metrics' recorder."); + self.join_handle + .join() + .expect("Should have been able to join on bare metrics' join handle."); + } +} + +#[derive(Default, Debug)] +pub struct MetricIdGenerator { + next_id: AtomicU16, +} + +impl MetricIdGenerator { + pub fn generate(&self) -> MetricId { + MetricId(self.next_id.fetch_add(1, Ordering::SeqCst)) + } +} + +pub enum RecorderMessage { + NewMetric { + metric_id: MetricId, + metric_kind: MetricKind, + unit: Option, + description: &'static str, + }, + HistogramReading { + metric_id: MetricId, + value: u64, + }, + CounterCount { + metric_id: MetricId, + value: u64, + }, + GaugeUpdate { + metric_id: MetricId, + gauge_update: GaugeValue, + }, + Stop, +} + +#[derive(Clone, Debug)] +pub struct BareMetricsRecorderShard { + metric_ids: Arc>, + metric_id_generator: Arc, + recorder_tx: Sender, +} + +impl BareMetricsRecorderShard { + fn register_metric( + &self, + key: &Key, + metric_kind: MetricKind, + unit: Option, + description: Option<&'static str>, + ) { + let metric_id = self.metric_id_generator.generate(); + if !self.metric_ids.insert(key.clone(), metric_id).is_none() { + error!( + "Double-registered metric key {:?} (id {:?} here)", + key, metric_id + ); + } + + let registration = RecorderMessage::NewMetric { + metric_id, + metric_kind, + unit, + description: description.unwrap_or(""), + }; + + if let Err(e) = self.recorder_tx.send(registration) { + error!( + "Couldn't register metric {:?}: sending to channel: {:?}", + key, e + ) + } + } + + pub fn install_as_metrics_recorder(&self) -> anyhow::Result<()> { + metrics::set_boxed_recorder(Box::new(self.clone()))?; + Ok(()) + } +} + +impl Recorder for BareMetricsRecorderShard { + fn register_counter(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.register_metric(key, MetricKind::Counter, unit, description); + } + + fn register_gauge(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.register_metric(key, MetricKind::Gauge, unit, description); + } + + fn register_histogram(&self, key: &Key, unit: Option, description: Option<&'static str>) { + self.register_metric(key, MetricKind::Histogram, unit, description); + } + + fn increment_counter(&self, key: &Key, value: u64) { + if let Some(counter_metric_id) = self.metric_ids.get(key) { + // we intentionally ignore errors (for now). Maybe logging occasionally is the right + // solution? + let _ = self.recorder_tx.send(RecorderMessage::CounterCount { + metric_id: *counter_metric_id, + value, + }); + } + } + + fn update_gauge(&self, key: &Key, value: GaugeValue) { + if let Some(gauge_metric_id) = self.metric_ids.get(key) { + // we intentionally ignore errors (for now). Maybe logging occasionally is the right + // solution? + let _ = self.recorder_tx.send(RecorderMessage::GaugeUpdate { + metric_id: *gauge_metric_id, + gauge_update: value, + }); + } + } + + fn record_histogram(&self, key: &Key, value: f64) { + if let Some(histogram_metric_id) = self.metric_ids.get(key) { + // we intentionally ignore errors (for now). Maybe logging occasionally is the right + // solution? + let _ = self.recorder_tx.send(RecorderMessage::HistogramReading { + metric_id: *histogram_metric_id, + // Our Histogram supports u64 only (maybe expand to i64 later?). + value: value as u64, + }); + } + } +}