From 95a9006681c766639c6a224fa5a29fda98daeac9 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 25 Nov 2021 09:08:53 +0000 Subject: [PATCH] Add a basic reader implementation --- bare-metrics-reader/Cargo.toml | 14 ++++++ bare-metrics-reader/src/lib.rs | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 bare-metrics-reader/Cargo.toml create mode 100644 bare-metrics-reader/src/lib.rs diff --git a/bare-metrics-reader/Cargo.toml b/bare-metrics-reader/Cargo.toml new file mode 100644 index 0000000..5c504a0 --- /dev/null +++ b/bare-metrics-reader/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "bare-metrics-reader" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.48" +thiserror = "1.0.30" +log = "0.4.14" +bare-metrics-core = { version = "0.1.0", path = "../bare-metrics-core" } +serde_bare = "0.5.0" +hdrhistogram = "7.4.0" diff --git a/bare-metrics-reader/src/lib.rs b/bare-metrics-reader/src/lib.rs new file mode 100644 index 0000000..1158024 --- /dev/null +++ b/bare-metrics-reader/src/lib.rs @@ -0,0 +1,88 @@ +use anyhow::bail; +use bare_metrics_core::structures::{ + get_supported_version, Frame, LogHeader, UnixTimestampMilliseconds, +}; +use std::io::{Read, Seek, SeekFrom}; + +/// Token that is known to be usable for seeking to a frame in a metrics log. +/// TODO identify which reader is applicable? Or don't bother? +#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct SeekToken(pub UnixTimestampMilliseconds, pub u64); + +/// A streaming reader for metric logs. +/// If the underlying reader supports seeks, it is possible to get SeekTokens which may be used +/// to seek to previous frames in the stream. +pub struct MetricsLogReader { + reader: R, + pub header: LogHeader, + last_read_ts: UnixTimestampMilliseconds, +} + +impl MetricsLogReader { + /// Constructs a bare metrics log reader. + pub fn new(mut reader: R) -> anyhow::Result { + let header: LogHeader = serde_bare::from_reader(&mut reader)?; + if header.bare_metrics_version != get_supported_version() { + bail!("Wrong version. Expected {:?} got {:?}. Later versions of Bare Metrics may use a stable format.", get_supported_version(), header.bare_metrics_version); + } + let last_read_ts = header.start_time; + Ok(MetricsLogReader { + reader, + header, + last_read_ts, + }) + } + + /// Reads a frame. + /// Returns the start time of the frame and the frame itself. + pub fn read_frame(&mut self) -> anyhow::Result> { + match serde_bare::from_reader::<_, Frame>(&mut self.reader) { + Ok(frame) => Ok(Some((self.last_read_ts, frame))), + Err(err) if err.classify().is_eof() => Ok(None), + Err(other_err) => { + bail!("Failed to read frame: {:?}", other_err); + } + } + } +} + +impl MetricsLogReader { + /// Reads a new frame, and returns a seek token that can be used to rewind such that you can + /// 'undo' the read. + pub fn read_frame_rewindable( + &mut self, + ) -> anyhow::Result> { + let current_pos_in_file = self.reader.stream_position()?; + // TODO should we rewind in case of error? + if let Some((timestamp, frame)) = self.read_frame()? { + Ok(Some(( + SeekToken(timestamp, current_pos_in_file), + timestamp, + frame, + ))) + } else { + // EOF (no more things to read here). + // Rewind to where we were before. + self.reader.seek(SeekFrom::Start(current_pos_in_file))?; + Ok(None) + } + } + + /// Seeks to a position in the stream. + /// The given seek token MUST have come from this instance's `read_from_rewindable` function. + /// Otherwise, corrupt frames may be read. + /// The old position is returned as a seek token. + pub fn seek( + &mut self, + SeekToken(seek_timestamp, seek_pos): SeekToken, + ) -> anyhow::Result { + let old_pos_in_file = self.reader.stream_position()?; + let old_timestamp = self.last_read_ts; + + // TODO should we rewind in case of error? + self.reader.seek(SeekFrom::Start(seek_pos))?; + self.last_read_ts = seek_timestamp; + + Ok(SeekToken(old_timestamp, old_pos_in_file)) + } +}