diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 0b47f39..0965359 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -11,7 +11,9 @@ use log::{error, info}; use crate::definitions::ChunkId; use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; +use metrics::{histogram, register_histogram, Unit}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::time::Instant; /// A kind of RawPile which can make requests to a RawPile over a pipe (e.g. TCP socket or an /// SSH connection). @@ -26,7 +28,7 @@ impl Requester { read: R, write: W, ) -> (Self, Vec>) { - let in_flight: Arc>>> = + let in_flight: Arc, Instant)>>> = Arc::new(Mutex::new(HashMap::new())); let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let mut handles = Vec::new(); @@ -64,7 +66,7 @@ impl Requester { } pub fn new_from_stdio() -> (Self, Vec>) { - let in_flight: Arc>>> = + let in_flight: Arc, Instant)>>> = Arc::new(Mutex::new(HashMap::new())); let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let mut handles = Vec::new(); @@ -112,7 +114,7 @@ impl Requester { /// Thread that reads messages and sends them along. fn reader( mut read: R, - in_flight: Arc>>>, + in_flight: Arc, Instant)>>>, shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, ) -> anyhow::Result<()> { loop { @@ -125,8 +127,15 @@ impl Requester { } let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; - map.get(&response.response_to) - .ok_or(anyhow!("Didn't find response channel..."))? + let (resp_sender, req_instant) = map + .get(&response.response_to) + .ok_or(anyhow!("Didn't find response channel..."))?; + + let req_resp_time_in_millis = + Instant::now().duration_since(*req_instant).as_millis() as f64; + histogram!("requester_cmd_response_time_ms", req_resp_time_in_millis); + + resp_sender .send(response.body) .or(Err(anyhow!("Failed to send response to channel")))?; } @@ -135,7 +144,7 @@ impl Requester { /// Thread that writes messages. fn writer( mut write: W, - in_flight: Arc>>>, + in_flight: Arc, Instant)>>>, command_receiver: Receiver<(RequestBody, Option>)>, shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, ) -> anyhow::Result<()> { @@ -146,7 +155,8 @@ impl Requester { .into_iter() .find(|id| !map.contains_key(&id)) .expect("No ID found"); - map.insert(request_id, response_channel); + let now = Instant::now(); + map.insert(request_id, (response_channel, now)); request_id } else { 0 @@ -315,6 +325,7 @@ impl RawPile for Requester { // need to be able to apply backpressure. let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec)>(128); let command_sender = self.commands.clone(); + register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds); std::thread::spawn(move || { let (response_tx, response_rx) = crossbeam_channel::bounded::(32);