Track request-response time in milliseconds in the metrics

This commit is contained in:
Olivier 'reivilibre' 2022-01-10 21:25:46 +00:00
parent 660595046b
commit 9a74fa2cdc
1 changed files with 18 additions and 7 deletions

View File

@ -11,7 +11,9 @@ use log::{error, info};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
use metrics::{histogram, register_histogram, Unit};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::time::Instant;
/// A kind of RawPile which can make requests to a RawPile over a pipe (e.g. TCP socket or an
/// SSH connection).
@ -26,7 +28,7 @@ impl Requester {
read: R,
write: W,
) -> (Self, Vec<JoinHandle<()>>) {
let in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>> =
let in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>> =
Arc::new(Mutex::new(HashMap::new()));
let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
let mut handles = Vec::new();
@ -64,7 +66,7 @@ impl Requester {
}
pub fn new_from_stdio() -> (Self, Vec<JoinHandle<()>>) {
let in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>> =
let in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>> =
Arc::new(Mutex::new(HashMap::new()));
let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
let mut handles = Vec::new();
@ -112,7 +114,7 @@ impl Requester {
/// Thread that reads messages and sends them along.
fn reader<R: Read>(
mut read: R,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>>,
shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>,
) -> anyhow::Result<()> {
loop {
@ -125,8 +127,15 @@ impl Requester {
}
let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?;
map.get(&response.response_to)
.ok_or(anyhow!("Didn't find response channel..."))?
let (resp_sender, req_instant) = map
.get(&response.response_to)
.ok_or(anyhow!("Didn't find response channel..."))?;
let req_resp_time_in_millis =
Instant::now().duration_since(*req_instant).as_millis() as f64;
histogram!("requester_cmd_response_time_ms", req_resp_time_in_millis);
resp_sender
.send(response.body)
.or(Err(anyhow!("Failed to send response to channel")))?;
}
@ -135,7 +144,7 @@ impl Requester {
/// Thread that writes messages.
fn writer<W: Write>(
mut write: W,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>>,
command_receiver: Receiver<(RequestBody, Option<Sender<ResponseBody>>)>,
shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>,
) -> anyhow::Result<()> {
@ -146,7 +155,8 @@ impl Requester {
.into_iter()
.find(|id| !map.contains_key(&id))
.expect("No ID found");
map.insert(request_id, response_channel);
let now = Instant::now();
map.insert(request_id, (response_channel, now));
request_id
} else {
0
@ -315,6 +325,7 @@ impl RawPile for Requester {
// need to be able to apply backpressure.
let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(128);
let command_sender = self.commands.clone();
register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds);
std::thread::spawn(move || {
let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32);