Add way of periodically tracking database metrics
This commit is contained in:
parent
e651a953f6
commit
c3ccd64d5f
|
@ -14,7 +14,8 @@ use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::sync::{mpsc, Semaphore};
|
use tokio::sync::{mpsc, oneshot, Semaphore};
|
||||||
|
use tokio::time::MissedTickBehavior;
|
||||||
|
|
||||||
use quickpeep_raker::config;
|
use quickpeep_raker::config;
|
||||||
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
|
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
|
||||||
|
@ -153,11 +154,36 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let (dsmu_cancel_tx, mut dsmu_cancel_rx) = oneshot::channel();
|
||||||
|
let datastore_metrics_updater = {
|
||||||
|
let store = task_context.store.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(60));
|
||||||
|
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = interval.tick() => {
|
||||||
|
let txn = store.ro_txn()?;
|
||||||
|
txn.emit_datastore_metrics()?;
|
||||||
|
}
|
||||||
|
_ = &mut dsmu_cancel_rx => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let r: anyhow::Result<()> = Ok(());
|
||||||
|
r
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
drop(task_context);
|
drop(task_context);
|
||||||
|
|
||||||
for task in tasks {
|
for task in tasks {
|
||||||
task.await?;
|
task.await?;
|
||||||
}
|
}
|
||||||
|
let _ = dsmu_cancel_tx.send(());
|
||||||
|
datastore_metrics_updater.await??;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ use libmdbx::{
|
||||||
WriteFlags, WriteMap, RO, RW,
|
WriteFlags, WriteMap, RO, RW,
|
||||||
};
|
};
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use metrics::{describe_gauge, gauge, Unit};
|
||||||
use ouroboros::self_referencing;
|
use ouroboros::self_referencing;
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
@ -39,6 +40,23 @@ pub struct Databases<'env> {
|
||||||
pub visited_urls: Database<'env>,
|
pub visited_urls: Database<'env>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'env> Databases<'env> {
|
||||||
|
pub fn iter_all_databases(&self) -> impl Iterator<Item = (&'static str, &Database<'env>)> {
|
||||||
|
[
|
||||||
|
("queue_urls", &self.queue_urls),
|
||||||
|
("active_domains", &self.active_domains),
|
||||||
|
("active_domain_raffle", &self.active_domain_raffle),
|
||||||
|
(
|
||||||
|
"backing_off_reinstatements",
|
||||||
|
&self.backing_off_reinstatements,
|
||||||
|
),
|
||||||
|
("backing_off_domains", &self.backing_off_domains),
|
||||||
|
("visited_urls", &self.visited_urls),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Must match the order of the Databases struct fields.
|
// Must match the order of the Databases struct fields.
|
||||||
pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
|
pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
|
||||||
("urls_queue", DatabaseFlags::empty()),
|
("urls_queue", DatabaseFlags::empty()),
|
||||||
|
@ -142,6 +160,7 @@ impl RakerStore {
|
||||||
|
|
||||||
Ok(RakerStore {
|
Ok(RakerStore {
|
||||||
mdbx: Arc::new(mdbx),
|
mdbx: Arc::new(mdbx),
|
||||||
|
metrics: Some(Arc::new(Default::default())),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,6 +198,7 @@ impl RakerStore {
|
||||||
|
|
||||||
/// Optional struct to store metrics. Intended for exporting over Prometheus so we can see at a
|
/// Optional struct to store metrics. Intended for exporting over Prometheus so we can see at a
|
||||||
/// glance what the shape of the datastore looks like.
|
/// glance what the shape of the datastore looks like.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct RakerStoreMetrics {
|
pub struct RakerStoreMetrics {
|
||||||
pub active_domains_count: AtomicU64,
|
pub active_domains_count: AtomicU64,
|
||||||
pub backoff_domains_count: AtomicU64,
|
pub backoff_domains_count: AtomicU64,
|
||||||
|
@ -403,10 +423,24 @@ impl<'a> RakerTxn<'a, RW> {
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn calculate_initial_metrics(&self) -> anyhow::Result<()> {
|
|
||||||
todo!()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Registers metrics for the datastore. Call this once at startup.
|
||||||
|
pub fn register_datastore_metrics() -> anyhow::Result<()> {
|
||||||
|
// Registration is if you want a handle to the Gauge. We don't care for now so leave it be...
|
||||||
|
// register_gauge!("db_entries");
|
||||||
|
describe_gauge!(
|
||||||
|
"db_entries",
|
||||||
|
Unit::Count,
|
||||||
|
"Number of entries in a specific database."
|
||||||
|
);
|
||||||
|
// register_gauge!("db_size_bytes");
|
||||||
|
describe_gauge!(
|
||||||
|
"db_size_bytes",
|
||||||
|
Unit::Bytes,
|
||||||
|
"Size in bytes of all the pages used by a specific database."
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read-only implementations (but can also be used on RW transactions)
|
/// Read-only implementations (but can also be used on RW transactions)
|
||||||
|
@ -488,4 +522,17 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Emits metrics for the datastore. Call this occasionally.
|
||||||
|
pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
|
||||||
|
for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {
|
||||||
|
let stat = self.mdbx_txn.db_stat(db)?;
|
||||||
|
let entries = stat.entries() as f64;
|
||||||
|
let size_in_pages = stat.branch_pages() + stat.leaf_pages() + stat.overflow_pages();
|
||||||
|
let size_in_bytes = stat.page_size() as f64 * size_in_pages as f64;
|
||||||
|
gauge!("db_entries", entries, "db" => db_name);
|
||||||
|
gauge!("db_size_bytes", size_in_bytes, "db" => db_name);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue