diff --git a/.gitignore b/.gitignore index ecae9b5..db0c65f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ quickpeep/testdb.sqlite qp_web.ron dist book -workbench \ No newline at end of file +workbench +rakepacks \ No newline at end of file diff --git a/qp_raker.sample.toml b/qp_raker.sample.toml index ee37a62..069a6d7 100644 --- a/qp_raker.sample.toml +++ b/qp_raker.sample.toml @@ -5,4 +5,6 @@ emit_dir = "./rakepacks" [metrics] prometheus = "127.0.0.1:9774" -# bare_metrics = true \ No newline at end of file +# bare_metrics = true + +[pack_emitter] diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index f184dba..842a5bb 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -4,7 +4,7 @@ use env_logger::Env; use adblock::lists::RuleTypes; use anyhow::{bail, Context}; -use log::{error, warn}; +use log::{debug, error, warn}; use lru::LruCache; use metrics_exporter_prometheus::PrometheusBuilder; use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; @@ -56,7 +56,11 @@ pub struct Opts { #[tokio::main] pub async fn main() -> anyhow::Result<()> { - env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init(); + env_logger::Builder::from_env( + Env::default().default_filter_or("info,quickpeep_raker=debug,qp_raker=debug"), + ) + .init(); + debug!("Starting up..."); let opts: Opts = Opts::parse(); @@ -233,6 +237,12 @@ pub async fn main() -> anyhow::Result<()> { for task in tasks { task.await?; } + + for task in emitters { + let result = task.join().expect("Can't join thread"); + result?; + } + let _ = dsmu_cancel_tx.send(()); datastore_metrics_updater.await??; diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index c447864..82d55c8 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -231,20 +231,8 @@ impl TaskContext { match self.process_outcome(&url, rake_outcome).await? { NextAction::Continue => { - // Take that URL off the queue - let now = Utc::today(); - let record = UrlVisitedRecord { - last_visited_days: date_to_quickpeep_days(&now)?, - }; - - let domain = domain.clone(); - self.store - .async_rw_txn(move |txn| { - txn.mark_url_as_visited(&domain, &url_str, record)?; - txn.commit()?; - Ok(()) - }) - .await?; + // The URL has already been taken off the queue. + // We just need to continue! } NextAction::ChangeDomain => { let mut cache = self diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index 1b85671..4352cee 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -324,10 +324,13 @@ impl<'a> RakerTxn<'a, RW> { let queue_key = format!("{}\n{}", domain, url_str); - ensure!( - self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?, - "No queued URL to delete" - ); + // We legitimately want this to NOP when already dequeued; so don't ensure the opposite. + // ensure!( + // self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?, + // "No queued URL to delete ({})", queue_key + // ); + + self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?; self.mdbx_txn.put( visited_urls,