Add a basic reader implementation

This commit is contained in:
Olivier 'reivilibre' 2021-11-25 09:08:53 +00:00
parent 4c65056856
commit 95a9006681
2 changed files with 102 additions and 0 deletions

View File

@ -0,0 +1,14 @@
[package]
name = "bare-metrics-reader"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.48"
thiserror = "1.0.30"
log = "0.4.14"
bare-metrics-core = { version = "0.1.0", path = "../bare-metrics-core" }
serde_bare = "0.5.0"
hdrhistogram = "7.4.0"

View File

@ -0,0 +1,88 @@
use anyhow::bail;
use bare_metrics_core::structures::{
get_supported_version, Frame, LogHeader, UnixTimestampMilliseconds,
};
use std::io::{Read, Seek, SeekFrom};
/// Token that is known to be usable for seeking to a frame in a metrics log.
/// TODO identify which reader is applicable? Or don't bother?
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct SeekToken(pub UnixTimestampMilliseconds, pub u64);
/// A streaming reader for metric logs.
/// If the underlying reader supports seeks, it is possible to get SeekTokens which may be used
/// to seek to previous frames in the stream.
pub struct MetricsLogReader<R: Read> {
reader: R,
pub header: LogHeader,
last_read_ts: UnixTimestampMilliseconds,
}
impl<R: Read> MetricsLogReader<R> {
/// Constructs a bare metrics log reader.
pub fn new(mut reader: R) -> anyhow::Result<Self> {
let header: LogHeader = serde_bare::from_reader(&mut reader)?;
if header.bare_metrics_version != get_supported_version() {
bail!("Wrong version. Expected {:?} got {:?}. Later versions of Bare Metrics may use a stable format.", get_supported_version(), header.bare_metrics_version);
}
let last_read_ts = header.start_time;
Ok(MetricsLogReader {
reader,
header,
last_read_ts,
})
}
/// Reads a frame.
/// Returns the start time of the frame and the frame itself.
pub fn read_frame(&mut self) -> anyhow::Result<Option<(UnixTimestampMilliseconds, Frame)>> {
match serde_bare::from_reader::<_, Frame>(&mut self.reader) {
Ok(frame) => Ok(Some((self.last_read_ts, frame))),
Err(err) if err.classify().is_eof() => Ok(None),
Err(other_err) => {
bail!("Failed to read frame: {:?}", other_err);
}
}
}
}
impl<R: Read + Seek> MetricsLogReader<R> {
/// Reads a new frame, and returns a seek token that can be used to rewind such that you can
/// 'undo' the read.
pub fn read_frame_rewindable(
&mut self,
) -> anyhow::Result<Option<(SeekToken, UnixTimestampMilliseconds, Frame)>> {
let current_pos_in_file = self.reader.stream_position()?;
// TODO should we rewind in case of error?
if let Some((timestamp, frame)) = self.read_frame()? {
Ok(Some((
SeekToken(timestamp, current_pos_in_file),
timestamp,
frame,
)))
} else {
// EOF (no more things to read here).
// Rewind to where we were before.
self.reader.seek(SeekFrom::Start(current_pos_in_file))?;
Ok(None)
}
}
/// Seeks to a position in the stream.
/// The given seek token MUST have come from this instance's `read_from_rewindable` function.
/// Otherwise, corrupt frames may be read.
/// The old position is returned as a seek token.
pub fn seek(
&mut self,
SeekToken(seek_timestamp, seek_pos): SeekToken,
) -> anyhow::Result<SeekToken> {
let old_pos_in_file = self.reader.stream_position()?;
let old_timestamp = self.last_read_ts;
// TODO should we rewind in case of error?
self.reader.seek(SeekFrom::Start(seek_pos))?;
self.last_read_ts = seek_timestamp;
Ok(SeekToken(old_timestamp, old_pos_in_file))
}
}