diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index ddc4491..065055c 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -14,7 +14,8 @@ use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use tokio::fs::File; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::{mpsc, oneshot, Semaphore}; +use tokio::time::MissedTickBehavior; use quickpeep_raker::config; use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet}; @@ -153,11 +154,36 @@ pub async fn main() -> anyhow::Result<()> { })); } + let (dsmu_cancel_tx, mut dsmu_cancel_rx) = oneshot::channel(); + let datastore_metrics_updater = { + let store = task_context.store.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = interval.tick() => { + let txn = store.ro_txn()?; + txn.emit_datastore_metrics()?; + } + _ = &mut dsmu_cancel_rx => { + break; + } + } + } + + let r: anyhow::Result<()> = Ok(()); + r + }) + }; + drop(task_context); for task in tasks { task.await?; } + let _ = dsmu_cancel_tx.send(()); + datastore_metrics_updater.await??; Ok(()) } diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index 5a05135..8eb1059 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -11,6 +11,7 @@ use libmdbx::{ WriteFlags, WriteMap, RO, RW, }; use log::info; +use metrics::{describe_gauge, gauge, Unit}; use ouroboros::self_referencing; use reqwest::Url; use std::borrow::Cow; @@ -39,6 +40,23 @@ pub struct Databases<'env> { pub visited_urls: Database<'env>, } +impl<'env> Databases<'env> { + pub fn iter_all_databases(&self) -> impl Iterator)> { + [ + ("queue_urls", &self.queue_urls), + ("active_domains", &self.active_domains), + ("active_domain_raffle", &self.active_domain_raffle), + ( + "backing_off_reinstatements", + &self.backing_off_reinstatements, + ), + ("backing_off_domains", &self.backing_off_domains), + ("visited_urls", &self.visited_urls), + ] + .into_iter() + } +} + // Must match the order of the Databases struct fields. pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [ ("urls_queue", DatabaseFlags::empty()), @@ -142,6 +160,7 @@ impl RakerStore { Ok(RakerStore { mdbx: Arc::new(mdbx), + metrics: Some(Arc::new(Default::default())), }) } @@ -179,6 +198,7 @@ impl RakerStore { /// Optional struct to store metrics. Intended for exporting over Prometheus so we can see at a /// glance what the shape of the datastore looks like. +#[derive(Default)] pub struct RakerStoreMetrics { pub active_domains_count: AtomicU64, pub backoff_domains_count: AtomicU64, @@ -403,10 +423,24 @@ impl<'a> RakerTxn<'a, RW> { Ok(true) } +} - pub fn calculate_initial_metrics(&self) -> anyhow::Result<()> { - todo!() - } +/// Registers metrics for the datastore. Call this once at startup. +pub fn register_datastore_metrics() -> anyhow::Result<()> { + // Registration is if you want a handle to the Gauge. We don't care for now so leave it be... + // register_gauge!("db_entries"); + describe_gauge!( + "db_entries", + Unit::Count, + "Number of entries in a specific database." + ); + // register_gauge!("db_size_bytes"); + describe_gauge!( + "db_size_bytes", + Unit::Bytes, + "Size in bytes of all the pages used by a specific database." + ); + Ok(()) } /// Read-only implementations (but can also be used on RW transactions) @@ -488,4 +522,17 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> { None => Ok(None), } } + + /// Emits metrics for the datastore. Call this occasionally. + pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> { + for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() { + let stat = self.mdbx_txn.db_stat(db)?; + let entries = stat.entries() as f64; + let size_in_pages = stat.branch_pages() + stat.leaf_pages() + stat.overflow_pages(); + let size_in_bytes = stat.page_size() as f64 * size_in_pages as f64; + gauge!("db_entries", entries, "db" => db_name); + gauge!("db_size_bytes", size_in_bytes, "db" => db_name); + } + Ok(()) + } }