Initial implementation of a recorder

This commit is contained in:
Olivier 'reivilibre' 2021-11-23 23:08:16 +00:00
parent 696954c631
commit d570d35112
4 changed files with 344 additions and 0 deletions

2
bare-metrics-recorder/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
Cargo.lock

View File

@ -0,0 +1,18 @@
[package]
name = "bare-metrics-recorder"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
metrics = "0.17.0"
anyhow = "1.0.48"
thiserror = "1.0.30"
log = "0.4.14"
bare-metrics-core = { version = "0.1.0", path = "../bare-metrics-core" }
dashmap = "4.0.2"
fxhash = "0.2.1"
crossbeam-channel = "0.5.1"
serde_bare = "0.5.0"
hdrhistogram = "7.4.0"

View File

@ -0,0 +1 @@
pub mod recording;

View File

@ -0,0 +1,323 @@
use bare_metrics_core::structures::{
Frame, MetricDescriptor, MetricId, MetricKind, SerialisableHistogram,
};
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use dashmap::DashMap;
use hdrhistogram::Histogram;
use log::{error, warn};
use metrics::{GaugeValue, Key, Recorder, Unit};
use serde_bare::Uint;
use std::collections::HashMap;
use std::io::Write;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
// TODO add a reader
// the reader accepts a Read or a File (is there a 'SeekRead'?)
// and streams BARE messages out of it
// Each message is returned with a seek token, which can be used by the application later to return
// the reader to that point in time.
pub struct BareMetricsRecorderCore<W: Write + Send> {
writer: W,
emission_interval: Duration,
queue_size: usize,
known_metric_kinds: HashMap<MetricId, MetricKind>,
next_frame: Frame,
current_gauges: HashMap<MetricId, f64>,
current_counters: HashMap<MetricId, u64>,
}
impl<W: Write + Send + 'static> BareMetricsRecorderCore<W> {
pub fn new(writer: W) -> Self {
BareMetricsRecorderCore {
writer,
emission_interval: Duration::from_secs(60),
queue_size: 256,
known_metric_kinds: Default::default(),
next_frame: Frame::default(),
current_gauges: Default::default(),
current_counters: Default::default(),
}
}
pub fn with_emission_interval(mut self, interval: Duration) -> Self {
self.emission_interval = interval;
self
}
pub fn with_queue_size(mut self, queue_size: usize) -> Self {
self.queue_size = queue_size;
self
}
pub fn start(self) -> anyhow::Result<(BareMetricsRecorderShard, BareMetricsRecorderStopper)> {
let (tx, rx) = crossbeam_channel::bounded(self.queue_size);
let handle = std::thread::Builder::new()
.name("bare-metrics-recorder".to_string())
.spawn(move || {
if let Err(e) = self.run_forever(rx) {
error!("Recorder erred: {:?}", e);
}
})?;
let shard = BareMetricsRecorderShard {
metric_ids: Default::default(),
metric_id_generator: Default::default(),
recorder_tx: tx.clone(),
};
let stopper = BareMetricsRecorderStopper {
join_handle: handle,
sender: tx,
};
Ok((shard, stopper))
}
fn emit_and_clear_frame(&mut self) -> anyhow::Result<()> {
serde_bare::to_writer(&mut self.writer, &self.next_frame)?;
self.writer.flush()?;
// Clear the next frame so the diff starts from zero again.
self.next_frame.counter_updates.clear();
self.next_frame.gauge_updates.clear();
self.next_frame.histograms.clear();
self.next_frame.new_metrics.clear();
Ok(())
}
fn run_forever(mut self, rx: Receiver<RecorderMessage>) -> anyhow::Result<()> {
let mut next_emission = Instant::now() + self.emission_interval;
loop {
match rx.recv_deadline(next_emission) {
Ok(RecorderMessage::NewMetric {
metric_id,
metric_kind,
unit,
description,
}) => {
self.next_frame.new_metrics.insert(
metric_id,
MetricDescriptor {
kind: metric_kind,
unit: unit.map(|unit| unit.as_str().to_string()),
description: description.to_string(),
},
);
self.known_metric_kinds.insert(metric_id, metric_kind);
match metric_kind {
MetricKind::Histogram => {}
MetricKind::Gauge => {
self.current_gauges.insert(metric_id, 0.0);
}
MetricKind::Counter => {
self.current_counters.insert(metric_id, 0);
}
}
}
Ok(RecorderMessage::HistogramReading { metric_id, value }) => {
let histogram =
self.next_frame
.histograms
.entry(metric_id)
.or_insert_with(|| SerialisableHistogram {
underlying: Histogram::new(3).unwrap(),
});
if let Err(err) = histogram.underlying.record(value as u64) {
warn!("Couldn't record histogram entry because {:?}", err);
}
}
Ok(RecorderMessage::CounterCount { metric_id, value }) => {
let entry = self.current_counters.get_mut(&metric_id);
if let Some(counter_value) = entry {
*counter_value += value;
self.next_frame
.counter_updates
.insert(metric_id, Uint(*counter_value));
}
}
Ok(RecorderMessage::GaugeUpdate {
metric_id,
gauge_update,
}) => {
let entry = self.current_gauges.get_mut(&metric_id);
if let Some(gauge_value) = entry {
match gauge_update {
GaugeValue::Absolute(value) => {
*gauge_value = value;
}
GaugeValue::Increment(by) => {
*gauge_value += by;
}
GaugeValue::Decrement(by) => {
*gauge_value -= by;
}
}
self.next_frame
.gauge_updates
.insert(metric_id, *gauge_value);
}
}
Ok(RecorderMessage::Stop) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
// It's time to emit a new frame.
self.emit_and_clear_frame()?;
next_emission = Instant::now() + self.emission_interval;
}
Err(RecvTimeoutError::Disconnected) => {
break;
}
}
}
Ok(())
}
}
pub struct BareMetricsRecorderStopper {
join_handle: JoinHandle<()>,
sender: Sender<RecorderMessage>,
}
impl BareMetricsRecorderStopper {
pub fn stop(self) {
self.sender
.send(RecorderMessage::Stop)
.expect("Should have been able to stop bare metrics' recorder.");
self.join_handle
.join()
.expect("Should have been able to join on bare metrics' join handle.");
}
}
#[derive(Default, Debug)]
pub struct MetricIdGenerator {
next_id: AtomicU16,
}
impl MetricIdGenerator {
pub fn generate(&self) -> MetricId {
MetricId(self.next_id.fetch_add(1, Ordering::SeqCst))
}
}
pub enum RecorderMessage {
NewMetric {
metric_id: MetricId,
metric_kind: MetricKind,
unit: Option<Unit>,
description: &'static str,
},
HistogramReading {
metric_id: MetricId,
value: u64,
},
CounterCount {
metric_id: MetricId,
value: u64,
},
GaugeUpdate {
metric_id: MetricId,
gauge_update: GaugeValue,
},
Stop,
}
#[derive(Clone, Debug)]
pub struct BareMetricsRecorderShard {
metric_ids: Arc<DashMap<Key, MetricId>>,
metric_id_generator: Arc<MetricIdGenerator>,
recorder_tx: Sender<RecorderMessage>,
}
impl BareMetricsRecorderShard {
fn register_metric(
&self,
key: &Key,
metric_kind: MetricKind,
unit: Option<Unit>,
description: Option<&'static str>,
) {
let metric_id = self.metric_id_generator.generate();
if !self.metric_ids.insert(key.clone(), metric_id).is_none() {
error!(
"Double-registered metric key {:?} (id {:?} here)",
key, metric_id
);
}
let registration = RecorderMessage::NewMetric {
metric_id,
metric_kind,
unit,
description: description.unwrap_or(""),
};
if let Err(e) = self.recorder_tx.send(registration) {
error!(
"Couldn't register metric {:?}: sending to channel: {:?}",
key, e
)
}
}
pub fn install_as_metrics_recorder(&self) -> anyhow::Result<()> {
metrics::set_boxed_recorder(Box::new(self.clone()))?;
Ok(())
}
}
impl Recorder for BareMetricsRecorderShard {
fn register_counter(&self, key: &Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricKind::Counter, unit, description);
}
fn register_gauge(&self, key: &Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricKind::Gauge, unit, description);
}
fn register_histogram(&self, key: &Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricKind::Histogram, unit, description);
}
fn increment_counter(&self, key: &Key, value: u64) {
if let Some(counter_metric_id) = self.metric_ids.get(key) {
// we intentionally ignore errors (for now). Maybe logging occasionally is the right
// solution?
let _ = self.recorder_tx.send(RecorderMessage::CounterCount {
metric_id: *counter_metric_id,
value,
});
}
}
fn update_gauge(&self, key: &Key, value: GaugeValue) {
if let Some(gauge_metric_id) = self.metric_ids.get(key) {
// we intentionally ignore errors (for now). Maybe logging occasionally is the right
// solution?
let _ = self.recorder_tx.send(RecorderMessage::GaugeUpdate {
metric_id: *gauge_metric_id,
gauge_update: value,
});
}
}
fn record_histogram(&self, key: &Key, value: f64) {
if let Some(histogram_metric_id) = self.metric_ids.get(key) {
// we intentionally ignore errors (for now). Maybe logging occasionally is the right
// solution?
let _ = self.recorder_tx.send(RecorderMessage::HistogramReading {
metric_id: *histogram_metric_id,
// Our Histogram supports u64 only (maybe expand to i64 later?).
value: value as u64,
});
}
}
}