Rework the startup hook and make it a proper response bus

This commit is contained in:
Olivier 'reivilibre' 2022-01-08 15:35:22 +00:00
parent 451bf180b3
commit f83799de00
2 changed files with 82 additions and 27 deletions

View File

@ -7,7 +7,7 @@ use hdrhistogram::Histogram;
use log::{debug, error, info}; use log::{debug, error, info};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::ops::{DerefMut, RangeInclusive}; use std::ops::RangeInclusive;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -16,7 +16,43 @@ use std::sync::{Arc, Mutex, RwLock};
/// This should probably be tunable since it will likely vary per-source ... /// This should probably be tunable since it will likely vary per-source ...
pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000; pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000;
pub type StateCallbackHook = Arc<Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>>; pub type MessageNotifyHook = Option<Box<dyn Fn() + Send + 'static>>;
pub type MessageNotifyHookContainer = Arc<Mutex<MessageNotifyHook>>;
/// Channel used to send notifications back to the rendering thread.
/// Includes a hook (which is intended to be used to store a callback to request a repaint after
/// a message is sent), called on new messages.
pub struct ResponseBus {
notify: MessageNotifyHookContainer,
channel: Sender<MetricsLogReaderNotification>,
}
impl ResponseBus {
pub fn new(sender: Sender<MetricsLogReaderNotification>) -> ResponseBus {
ResponseBus {
notify: Default::default(),
channel: sender,
}
}
pub fn set_notify_hook(&self, hook: MessageNotifyHook) {
*(self.notify.lock().expect("Notify hook mutex poisoned.")) = hook;
}
/// Returns true if the send actually did anything, false if the receiver hung up.
pub fn send(&self, notification: MetricsLogReaderNotification) -> bool {
let result = self.channel.send(notification).is_ok();
if let Some(callback) = self
.notify
.lock()
.expect("Notify hook mutex poisoned.")
.as_ref()
{
callback();
}
result
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MetricsWindow { pub struct MetricsWindow {
@ -131,6 +167,8 @@ impl ScalarWindow {
pub struct MetricsLogReadingRequester { pub struct MetricsLogReadingRequester {
pub shared: Arc<MetricsLogReadingShared>, pub shared: Arc<MetricsLogReadingShared>,
pub tx: Sender<MetricsLogReaderMessage>, pub tx: Sender<MetricsLogReaderMessage>,
pub rx: Receiver<MetricsLogReaderNotification>,
pub hook: MessageNotifyHookContainer,
} }
pub struct MetricsLogReadingShared { pub struct MetricsLogReadingShared {
@ -142,19 +180,28 @@ pub struct MetricsLogReadingShared {
pub enum MetricsLogReaderMessage { pub enum MetricsLogReaderMessage {
LoadNewWindow { LoadNewWindow {
/// Callback that gets called when there's a state change.
/// The bool represents whether the loading is finished or not.
on_state_change: Box<dyn Fn(bool) -> () + 'static + Send>,
new_time_range: RangeInclusive<f64>, new_time_range: RangeInclusive<f64>,
new_wanted_time_points: u32, new_wanted_time_points: u32,
}, },
} }
#[derive(Clone, Debug)]
pub enum MetricsLogReaderNotification {
/// Signalled when metric descriptors are available (on first load).
/// (may re-use in future if further descriptors become available)
MetricDescriptorsAvailable {},
/// Signalled when there's a state change in the loading of a window.
WindowLoadStateChange {
/// Whether the window is ready or not.
done: bool,
},
}
impl MetricsLogReadingRequester { impl MetricsLogReadingRequester {
pub fn new_manager<R: Read + Seek + Send + 'static>( pub fn new_manager<R: Read + Seek + Send + 'static>(
reader: MetricsLogReader<R>, reader: MetricsLogReader<R>,
on_initial_scan_done: StateCallbackHook, hook: MessageNotifyHook,
) -> MetricsLogReadingRequester { ) -> MetricsLogReadingRequester {
let shared = Arc::new(MetricsLogReadingShared { let shared = Arc::new(MetricsLogReadingShared {
current_window: RwLock::new(MetricsWindow { current_window: RwLock::new(MetricsWindow {
@ -170,7 +217,15 @@ impl MetricsLogReadingRequester {
}); });
let manager_shared_ref = shared.clone(); let manager_shared_ref = shared.clone();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
let requester = MetricsLogReadingRequester { shared, tx }; let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let response_bus = ResponseBus::new(notify_tx);
response_bus.set_notify_hook(hook);
let requester = MetricsLogReadingRequester {
shared,
tx,
rx: notify_rx,
hook: response_bus.notify.clone(),
};
std::thread::Builder::new() std::thread::Builder::new()
.name("metricslogreader".to_string()) .name("metricslogreader".to_string())
@ -179,7 +234,7 @@ impl MetricsLogReadingRequester {
manager_shared_ref, manager_shared_ref,
reader, reader,
rx, rx,
on_initial_scan_done, response_bus,
) { ) {
error!("Error in background log reader: {:?}", err); error!("Error in background log reader: {:?}", err);
} }
@ -215,6 +270,7 @@ pub struct MetricsLogReaderManager<R: Read + Seek> {
metric_descriptors_dirty: bool, metric_descriptors_dirty: bool,
metric_descriptor_table: MetricDescriptorTable, metric_descriptor_table: MetricDescriptorTable,
checkpoints: Vec<Checkpoint>, checkpoints: Vec<Checkpoint>,
response_bus: ResponseBus,
} }
/// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent /// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent
@ -555,24 +611,18 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
Ok(metrics_window) Ok(metrics_window)
} }
fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> { fn run(&mut self) -> anyhow::Result<()> {
info!("Starting manager"); info!("Starting manager");
self.initial_scan()?; self.initial_scan()?;
self.update_metric_descriptors(); self.update_metric_descriptors();
info!("Initial scan done."); info!("Initial scan done.");
let mut the_func = None;
let mut callback_guard = on_initial_scan_done self.response_bus
.lock() .send(MetricsLogReaderNotification::MetricDescriptorsAvailable {});
.map_err(|_| anyhow!("Can't lock"))?;
std::mem::swap(&mut the_func, callback_guard.deref_mut());
if let Some(on_initial_scan_done) = the_func {
on_initial_scan_done();
}
while let Ok(msg) = self.rx.recv() { while let Ok(msg) = self.rx.recv() {
match msg { match msg {
MetricsLogReaderMessage::LoadNewWindow { MetricsLogReaderMessage::LoadNewWindow {
on_state_change,
new_time_range, new_time_range,
new_wanted_time_points, new_wanted_time_points,
} => { } => {
@ -583,7 +633,8 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
self.shared_ref self.shared_ref
.loading_new_window .loading_new_window
.store(true, Ordering::SeqCst); .store(true, Ordering::SeqCst);
on_state_change(false); self.response_bus
.send(MetricsLogReaderNotification::WindowLoadStateChange { done: false });
let start = if *new_time_range.start() == f64::NEG_INFINITY { let start = if *new_time_range.start() == f64::NEG_INFINITY {
// autoscale by getting the earliest time point of the metrics log // autoscale by getting the earliest time point of the metrics log
@ -634,7 +685,8 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
debug!("Finished loading window."); debug!("Finished loading window.");
on_state_change(true); self.response_bus
.send(MetricsLogReaderNotification::WindowLoadStateChange { done: true });
} }
} }
} }
@ -646,7 +698,7 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
shared_ref: Arc<MetricsLogReadingShared>, shared_ref: Arc<MetricsLogReadingShared>,
reader: MetricsLogReader<R>, reader: MetricsLogReader<R>,
rx: Receiver<MetricsLogReaderMessage>, rx: Receiver<MetricsLogReaderMessage>,
on_initial_scan_done: StateCallbackHook, response_bus: ResponseBus,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut manager = MetricsLogReaderManager { let mut manager = MetricsLogReaderManager {
shared_ref, shared_ref,
@ -656,8 +708,9 @@ impl<R: Read + Seek> MetricsLogReaderManager<R> {
metric_descriptors_dirty: false, metric_descriptors_dirty: false,
metric_descriptor_table: Default::default(), metric_descriptor_table: Default::default(),
checkpoints: Default::default(), checkpoints: Default::default(),
response_bus,
}; };
manager.run(on_initial_scan_done) manager.run()
} }
} }

View File

@ -52,10 +52,14 @@ impl App for MetricsGui {
fn setup(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>, _storage: Option<&dyn Storage>) { fn setup(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>, _storage: Option<&dyn Storage>) {
let ctx = ctx.clone(); let ctx = ctx.clone();
*(self.requester.hook.lock().unwrap()) = Some(Box::new(move || {
ctx.request_repaint();
}));
self.requester self.requester
.tx .tx
.send(MetricsLogReaderMessage::LoadNewWindow { .send(MetricsLogReaderMessage::LoadNewWindow {
on_state_change: Box::new(move |_| ctx.request_repaint()),
new_time_range: f64::NEG_INFINITY..=f64::INFINITY, new_time_range: f64::NEG_INFINITY..=f64::INFINITY,
new_wanted_time_points: 512, new_wanted_time_points: 512,
}) })
@ -84,9 +88,7 @@ fn main() -> anyhow::Result<()> {
let reader = MetricsLogReader::new(file).unwrap(); let reader = MetricsLogReader::new(file).unwrap();
let log_app_name = reader.header.application_name.clone(); let log_app_name = reader.header.application_name.clone();
// TODO we can attach a callback to this hook. let requester = MetricsLogReadingRequester::new_manager(reader, None);
let init_hook_callback = Default::default();
let requester = MetricsLogReadingRequester::new_manager(reader, init_hook_callback);
let dashboard_dir = appdirs::user_data_dir(Some("baremetrics"), Some("rei"), true) let dashboard_dir = appdirs::user_data_dir(Some("baremetrics"), Some("rei"), true)
.unwrap() .unwrap()