Add some metrics for emitted packs
This commit is contained in:
parent
806192fab5
commit
2d35298a2e
|
@ -1,5 +1,6 @@
|
|||
use chrono::Utc;
|
||||
use log::warn;
|
||||
use metrics::{describe_counter, register_counter, Counter, Unit};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::OpenOptions;
|
||||
|
@ -32,6 +33,16 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
|
|||
mut rx: Receiver<(Url, T)>,
|
||||
settings: &PackEmitterSettings,
|
||||
) -> anyhow::Result<()> {
|
||||
describe_counter!(
|
||||
"emitted_pack_bytes",
|
||||
Unit::Bytes,
|
||||
"Bytes emitted into a pack file (uncompressed)"
|
||||
);
|
||||
describe_counter!(
|
||||
"emitted_pack_count",
|
||||
Unit::Count,
|
||||
"Records emitted into a pack file"
|
||||
);
|
||||
loop {
|
||||
let now = Utc::now();
|
||||
// 2022-01-01 01:01:01
|
||||
|
@ -49,7 +60,7 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
|
|||
}
|
||||
};
|
||||
|
||||
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, settings)? {
|
||||
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, settings)? {
|
||||
// File wasn't filled; the receiver was exhausted (we're shutting down).
|
||||
break;
|
||||
}
|
||||
|
@ -67,6 +78,7 @@ struct PackRecord<'a, T> {
|
|||
fn pack_emitter_to_file<T: Serialize>(
|
||||
file: &Path,
|
||||
rx: &mut Receiver<(Url, T)>,
|
||||
name: &str,
|
||||
settings: &PackEmitterSettings,
|
||||
) -> anyhow::Result<bool> {
|
||||
let file = OpenOptions::new().create_new(true).write(true).open(file)?;
|
||||
|
@ -76,6 +88,10 @@ fn pack_emitter_to_file<T: Serialize>(
|
|||
|
||||
let mut length_so_far = 0usize;
|
||||
|
||||
let byte_counter: Counter = register_counter!("emitted_pack_bytes", "pack" => name.to_owned());
|
||||
let record_counter: Counter =
|
||||
register_counter!("emitted_pack_count", "pack" => name.to_owned());
|
||||
|
||||
while let Some((url, record)) = rx.blocking_recv() {
|
||||
serde_bare::to_writer(
|
||||
&mut ser_buf,
|
||||
|
@ -87,6 +103,8 @@ fn pack_emitter_to_file<T: Serialize>(
|
|||
|
||||
compressor.write_all(&ser_buf)?;
|
||||
length_so_far += ser_buf.len();
|
||||
byte_counter.increment(ser_buf.len() as u64);
|
||||
record_counter.increment(1);
|
||||
|
||||
if length_so_far > settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF) {
|
||||
// MUST CALL
|
||||
|
|
Loading…
Reference in New Issue