From f83799de00ccb7d74a81fb708293d065c192781d Mon Sep 17 00:00:00 2001 From: Olivier Date: Sat, 8 Jan 2022 15:35:22 +0000 Subject: [PATCH] Rework the startup hook and make it a proper response bus --- bare-metrics-gui/src/background_loader.rs | 99 +++++++++++++++++------ bare-metrics-gui/src/main.rs | 10 ++- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/bare-metrics-gui/src/background_loader.rs b/bare-metrics-gui/src/background_loader.rs index 61f26a1..f11b5b0 100644 --- a/bare-metrics-gui/src/background_loader.rs +++ b/bare-metrics-gui/src/background_loader.rs @@ -7,7 +7,7 @@ use hdrhistogram::Histogram; use log::{debug, error, info}; use std::collections::{BTreeMap, HashMap}; use std::io::{Read, Seek}; -use std::ops::{DerefMut, RangeInclusive}; +use std::ops::RangeInclusive; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -16,7 +16,43 @@ use std::sync::{Arc, Mutex, RwLock}; /// This should probably be tunable since it will likely vary per-source ... pub const CHECKPOINT_EVERY_MILLISECONDS: u64 = 600_000_000; -pub type StateCallbackHook = Arc>>>; +pub type MessageNotifyHook = Option>; +pub type MessageNotifyHookContainer = Arc>; + +/// Channel used to send notifications back to the rendering thread. +/// Includes a hook (which is intended to be used to store a callback to request a repaint after +/// a message is sent), called on new messages. +pub struct ResponseBus { + notify: MessageNotifyHookContainer, + channel: Sender, +} + +impl ResponseBus { + pub fn new(sender: Sender) -> ResponseBus { + ResponseBus { + notify: Default::default(), + channel: sender, + } + } + + pub fn set_notify_hook(&self, hook: MessageNotifyHook) { + *(self.notify.lock().expect("Notify hook mutex poisoned.")) = hook; + } + + /// Returns true if the send actually did anything, false if the receiver hung up. + pub fn send(&self, notification: MetricsLogReaderNotification) -> bool { + let result = self.channel.send(notification).is_ok(); + if let Some(callback) = self + .notify + .lock() + .expect("Notify hook mutex poisoned.") + .as_ref() + { + callback(); + } + result + } +} #[derive(Clone, Debug)] pub struct MetricsWindow { @@ -131,6 +167,8 @@ impl ScalarWindow { pub struct MetricsLogReadingRequester { pub shared: Arc, pub tx: Sender, + pub rx: Receiver, + pub hook: MessageNotifyHookContainer, } pub struct MetricsLogReadingShared { @@ -142,19 +180,28 @@ pub struct MetricsLogReadingShared { pub enum MetricsLogReaderMessage { LoadNewWindow { - /// Callback that gets called when there's a state change. - /// The bool represents whether the loading is finished or not. - on_state_change: Box () + 'static + Send>, - new_time_range: RangeInclusive, new_wanted_time_points: u32, }, } +#[derive(Clone, Debug)] +pub enum MetricsLogReaderNotification { + /// Signalled when metric descriptors are available (on first load). + /// (may re-use in future if further descriptors become available) + MetricDescriptorsAvailable {}, + + /// Signalled when there's a state change in the loading of a window. + WindowLoadStateChange { + /// Whether the window is ready or not. + done: bool, + }, +} + impl MetricsLogReadingRequester { pub fn new_manager( reader: MetricsLogReader, - on_initial_scan_done: StateCallbackHook, + hook: MessageNotifyHook, ) -> MetricsLogReadingRequester { let shared = Arc::new(MetricsLogReadingShared { current_window: RwLock::new(MetricsWindow { @@ -170,7 +217,15 @@ impl MetricsLogReadingRequester { }); let manager_shared_ref = shared.clone(); let (tx, rx) = std::sync::mpsc::channel(); - let requester = MetricsLogReadingRequester { shared, tx }; + let (notify_tx, notify_rx) = std::sync::mpsc::channel(); + let response_bus = ResponseBus::new(notify_tx); + response_bus.set_notify_hook(hook); + let requester = MetricsLogReadingRequester { + shared, + tx, + rx: notify_rx, + hook: response_bus.notify.clone(), + }; std::thread::Builder::new() .name("metricslogreader".to_string()) @@ -179,7 +234,7 @@ impl MetricsLogReadingRequester { manager_shared_ref, reader, rx, - on_initial_scan_done, + response_bus, ) { error!("Error in background log reader: {:?}", err); } @@ -215,6 +270,7 @@ pub struct MetricsLogReaderManager { metric_descriptors_dirty: bool, metric_descriptor_table: MetricDescriptorTable, checkpoints: Vec, + response_bus: ResponseBus, } /// We use B-Tree maps because they sort properly. That's useful for ensuring the UI is consistent @@ -555,24 +611,18 @@ impl MetricsLogReaderManager { Ok(metrics_window) } - fn run(&mut self, on_initial_scan_done: StateCallbackHook) -> anyhow::Result<()> { + fn run(&mut self) -> anyhow::Result<()> { info!("Starting manager"); self.initial_scan()?; self.update_metric_descriptors(); info!("Initial scan done."); - let mut the_func = None; - let mut callback_guard = on_initial_scan_done - .lock() - .map_err(|_| anyhow!("Can't lock"))?; - std::mem::swap(&mut the_func, callback_guard.deref_mut()); - if let Some(on_initial_scan_done) = the_func { - on_initial_scan_done(); - } + + self.response_bus + .send(MetricsLogReaderNotification::MetricDescriptorsAvailable {}); while let Ok(msg) = self.rx.recv() { match msg { MetricsLogReaderMessage::LoadNewWindow { - on_state_change, new_time_range, new_wanted_time_points, } => { @@ -583,7 +633,8 @@ impl MetricsLogReaderManager { self.shared_ref .loading_new_window .store(true, Ordering::SeqCst); - on_state_change(false); + self.response_bus + .send(MetricsLogReaderNotification::WindowLoadStateChange { done: false }); let start = if *new_time_range.start() == f64::NEG_INFINITY { // autoscale by getting the earliest time point of the metrics log @@ -634,7 +685,8 @@ impl MetricsLogReaderManager { debug!("Finished loading window."); - on_state_change(true); + self.response_bus + .send(MetricsLogReaderNotification::WindowLoadStateChange { done: true }); } } } @@ -646,7 +698,7 @@ impl MetricsLogReaderManager { shared_ref: Arc, reader: MetricsLogReader, rx: Receiver, - on_initial_scan_done: StateCallbackHook, + response_bus: ResponseBus, ) -> anyhow::Result<()> { let mut manager = MetricsLogReaderManager { shared_ref, @@ -656,8 +708,9 @@ impl MetricsLogReaderManager { metric_descriptors_dirty: false, metric_descriptor_table: Default::default(), checkpoints: Default::default(), + response_bus, }; - manager.run(on_initial_scan_done) + manager.run() } } diff --git a/bare-metrics-gui/src/main.rs b/bare-metrics-gui/src/main.rs index 2247f5b..73a28fd 100644 --- a/bare-metrics-gui/src/main.rs +++ b/bare-metrics-gui/src/main.rs @@ -52,10 +52,14 @@ impl App for MetricsGui { fn setup(&mut self, ctx: &CtxRef, _frame: &mut Frame<'_>, _storage: Option<&dyn Storage>) { let ctx = ctx.clone(); + + *(self.requester.hook.lock().unwrap()) = Some(Box::new(move || { + ctx.request_repaint(); + })); + self.requester .tx .send(MetricsLogReaderMessage::LoadNewWindow { - on_state_change: Box::new(move |_| ctx.request_repaint()), new_time_range: f64::NEG_INFINITY..=f64::INFINITY, new_wanted_time_points: 512, }) @@ -84,9 +88,7 @@ fn main() -> anyhow::Result<()> { let reader = MetricsLogReader::new(file).unwrap(); let log_app_name = reader.header.application_name.clone(); - // TODO we can attach a callback to this hook. - let init_hook_callback = Default::default(); - let requester = MetricsLogReadingRequester::new_manager(reader, init_hook_callback); + let requester = MetricsLogReadingRequester::new_manager(reader, None); let dashboard_dir = appdirs::user_data_dir(Some("baremetrics"), Some("rei"), true) .unwrap()