diff --git a/quickpeep_raker/src/rakepack_emitter.rs b/quickpeep_raker/src/rakepack_emitter.rs index 80a88c1..7642445 100644 --- a/quickpeep_raker/src/rakepack_emitter.rs +++ b/quickpeep_raker/src/rakepack_emitter.rs @@ -1,5 +1,6 @@ use chrono::Utc; use log::warn; +use metrics::{describe_counter, register_counter, Counter, Unit}; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; @@ -32,6 +33,16 @@ pub fn pack_emitter( mut rx: Receiver<(Url, T)>, settings: &PackEmitterSettings, ) -> anyhow::Result<()> { + describe_counter!( + "emitted_pack_bytes", + Unit::Bytes, + "Bytes emitted into a pack file (uncompressed)" + ); + describe_counter!( + "emitted_pack_count", + Unit::Count, + "Records emitted into a pack file" + ); loop { let now = Utc::now(); // 2022-01-01 01:01:01 @@ -49,7 +60,7 @@ pub fn pack_emitter( } }; - if !pack_emitter_to_file(&new_pack_file_path, &mut rx, settings)? { + if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, settings)? { // File wasn't filled; the receiver was exhausted (we're shutting down). break; } @@ -67,6 +78,7 @@ struct PackRecord<'a, T> { fn pack_emitter_to_file( file: &Path, rx: &mut Receiver<(Url, T)>, + name: &str, settings: &PackEmitterSettings, ) -> anyhow::Result { let file = OpenOptions::new().create_new(true).write(true).open(file)?; @@ -76,6 +88,10 @@ fn pack_emitter_to_file( let mut length_so_far = 0usize; + let byte_counter: Counter = register_counter!("emitted_pack_bytes", "pack" => name.to_owned()); + let record_counter: Counter = + register_counter!("emitted_pack_count", "pack" => name.to_owned()); + while let Some((url, record)) = rx.blocking_recv() { serde_bare::to_writer( &mut ser_buf, @@ -87,6 +103,8 @@ fn pack_emitter_to_file( compressor.write_all(&ser_buf)?; length_so_far += ser_buf.len(); + byte_counter.increment(ser_buf.len() as u64); + record_counter.increment(1); if length_so_far > settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF) { // MUST CALL