Add some more metrics to the Requester storage pipeline

This commit is contained in:
Olivier 'reivilibre' 2022-01-11 07:31:20 +00:00
parent b24a0771ed
commit bedb9785dc
1 changed files with 25 additions and 2 deletions

View File

@ -11,7 +11,9 @@ use log::{error, info};
use crate::definitions::ChunkId; use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
use metrics::{histogram, register_histogram, Unit}; use metrics::{
gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram, Unit,
};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::time::Instant; use std::time::Instant;
@ -28,6 +30,12 @@ impl Requester {
read: R, read: R,
write: W, write: W,
) -> (Self, Vec<JoinHandle<()>>) { ) -> (Self, Vec<JoinHandle<()>>) {
register_histogram!(
"requester_cmd_response_time_ms",
Unit::Milliseconds,
"Time between request being issued and a response being received"
);
let in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>> = let in_flight: Arc<Mutex<HashMap<u16, (Sender<ResponseBody>, Instant)>>> =
Arc::new(Mutex::new(HashMap::new())); Arc::new(Mutex::new(HashMap::new()));
let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
@ -349,7 +357,17 @@ impl RawPile for Requester {
// need to be able to apply backpressure. // need to be able to apply backpressure.
let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(128); let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(128);
let command_sender = self.commands.clone(); let command_sender = self.commands.clone();
register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds);
register_counter!(
"requester_pipeline_cmds_issued",
Unit::Count,
"Number of write commands issued by the Requester's storage pipeline"
);
register_gauge!(
"requester_pipeline_writes_inflight",
Unit::Count,
"Number of write commands in-flight"
);
std::thread::Builder::new() std::thread::Builder::new()
.name("ReqStPpln".to_string()) .name("ReqStPpln".to_string())
@ -360,6 +378,10 @@ impl RawPile for Requester {
let mut pipeline_still_going = true; let mut pipeline_still_going = true;
while pipeline_still_going || in_flight_writes > 0 { while pipeline_still_going || in_flight_writes > 0 {
gauge!(
"requester_pipeline_writes_inflight",
in_flight_writes as f64
);
// TODO this won't handle channel closure properly. // TODO this won't handle channel closure properly.
if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going { if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going {
crossbeam_channel::select! { crossbeam_channel::select! {
@ -386,6 +408,7 @@ impl RawPile for Requester {
recv(receiver) -> resp => { recv(receiver) -> resp => {
if let Ok((chunk_id, write)) = resp { if let Ok((chunk_id, write)) = resp {
in_flight_writes += 1; in_flight_writes += 1;
increment_counter!("requester_pipeline_cmds_issued");
command_sender.send((RequestBody::Write { command_sender.send((RequestBody::Write {
kind: Keyspace::Chunk, kind: Keyspace::Chunk,
key: chunk_id.to_vec(), key: chunk_id.to_vec(),