diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 142d540..2039043 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -11,7 +11,9 @@ use log::{error, info}; use crate::definitions::ChunkId; use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; -use metrics::{histogram, register_histogram, Unit}; +use metrics::{ + gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram, Unit, +}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::time::Instant; @@ -28,6 +30,12 @@ impl Requester { read: R, write: W, ) -> (Self, Vec>) { + register_histogram!( + "requester_cmd_response_time_ms", + Unit::Milliseconds, + "Time between request being issued and a response being received" + ); + let in_flight: Arc, Instant)>>> = Arc::new(Mutex::new(HashMap::new())); let (command_sender, command_receiver) = crossbeam_channel::bounded(16); @@ -349,7 +357,17 @@ impl RawPile for Requester { // need to be able to apply backpressure. let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec)>(128); let command_sender = self.commands.clone(); - register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds); + + register_counter!( + "requester_pipeline_cmds_issued", + Unit::Count, + "Number of write commands issued by the Requester's storage pipeline" + ); + register_gauge!( + "requester_pipeline_writes_inflight", + Unit::Count, + "Number of write commands in-flight" + ); std::thread::Builder::new() .name("ReqStPpln".to_string()) @@ -360,6 +378,10 @@ impl RawPile for Requester { let mut pipeline_still_going = true; while pipeline_still_going || in_flight_writes > 0 { + gauge!( + "requester_pipeline_writes_inflight", + in_flight_writes as f64 + ); // TODO this won't handle channel closure properly. if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going { crossbeam_channel::select! { @@ -386,6 +408,7 @@ impl RawPile for Requester { recv(receiver) -> resp => { if let Ok((chunk_id, write)) = resp { in_flight_writes += 1; + increment_counter!("requester_pipeline_cmds_issued"); command_sender.send((RequestBody::Write { kind: Keyspace::Chunk, key: chunk_id.to_vec(),