From db9fe77c1617392dcc1bfb432ae02306c29c1b18 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 22 Mar 2022 20:01:26 +0000 Subject: [PATCH] Re-apply seeds and weeds at import time, to on-hold URLs --- quickpeep_raker/src/bin/qp-seedrake.rs | 16 ++-- quickpeep_raker/src/storage.rs | 1 + quickpeep_raker/src/storage/maintenance.rs | 93 ++++++++++++++++++++++ 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 quickpeep_raker/src/storage/maintenance.rs diff --git a/quickpeep_raker/src/bin/qp-seedrake.rs b/quickpeep_raker/src/bin/qp-seedrake.rs index f89ae77..065c1da 100644 --- a/quickpeep_raker/src/bin/qp-seedrake.rs +++ b/quickpeep_raker/src/bin/qp-seedrake.rs @@ -18,7 +18,7 @@ use quickpeep_raker::config::RakerConfig; use quickpeep_raker::raking::analysis::get_reduced_domain; use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent}; use quickpeep_raker::storage::records::{AllowedDomainRecord, WeedDomainRecord}; -use quickpeep_raker::storage::RakerStore; +use quickpeep_raker::storage::{maintenance, RakerStore}; use quickpeep_seed_parser::parse_seeds; use quickpeep_utils::dirty::DirtyTracker; @@ -63,6 +63,12 @@ pub async fn main() -> anyhow::Result<()> { import_weeds(store.clone(), &config).await?; + eprintln!("... re-applying seeds and weeds to on-hold URLs ..."); + store + .async_rw_txn(|txn| maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn)) + .await?; + eprintln!("... done!"); + Ok(()) } @@ -96,19 +102,19 @@ pub async fn import_seeds(store: RakerStore, config: &RakerConfig) -> anyhow::Re } pub async fn import_weeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> { - let (seed_tx, seed_rx) = mpsc::channel(128); + let (weed_tx, weed_rx) = mpsc::channel(128); - let seed_files = find_seed_files(config.seed_dir.clone(), SEED_EXTENSION).await?; + let seed_files = find_seed_files(config.seed_dir.clone(), WEED_EXTENSION).await?; eprintln!("{:?}", seed_files); tokio::spawn(async move { - seed_loader(seed_files, &seed_tx).await?; + seed_loader(seed_files, &weed_tx).await?; Ok(()) as anyhow::Result<()> }); - let stats = importer(store, seed_rx, true).await?; + let stats = importer(store, weed_rx, true).await?; dark_red_ln!("=== Weeds Imported! ==="); red!("New domains: "); diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index 2eec3f4..1a92c43 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -22,6 +22,7 @@ use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; +pub mod maintenance; pub mod mdbx_helper_types; mod migrations; pub mod records; diff --git a/quickpeep_raker/src/storage/maintenance.rs b/quickpeep_raker/src/storage/maintenance.rs new file mode 100644 index 0000000..f48f4d6 --- /dev/null +++ b/quickpeep_raker/src/storage/maintenance.rs @@ -0,0 +1,93 @@ +use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString}; +use crate::storage::records::{AllowedDomainRecord, OnHoldUrlRecord, WeedDomainRecord}; +use crate::storage::RakerTxn; +use anyhow::Context; +use libmdbx::{Database, WriteFlags, RW}; +use log::warn; +use reqwest::Url; + +/// Runs one big transaction that: +/// - scans on-hold URLs +/// - moves 'allowed' ones to the queue +/// - deletes 'weeds' +/// - leaves unknown ones alone +/// +/// Ideally should be applied after importing seeds and weeds on an existing database. +pub fn reapply_seeds_and_weeds_to_on_hold_urls(txn: RakerTxn) -> anyhow::Result<()> { + struct DomainState { + pub domain: String, + pub allowed_domain_record: Option, + pub weed_domain_record: Option, + } + + let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold; + + let mut domain_state = None; + + // Scan through the on-hold URLs + let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?; + let mut first_iteration = true; + + while let Some((MdbxString(domain_then_url), MdbxBare(record))) = if first_iteration { + first_iteration = false; + cur.first::>() + } else { + cur.next::>() + }? { + let mut split = domain_then_url.as_ref().split("\n"); + let domain = split.next().context("No first split..?")?; + let url_str = split.next().context("No URL")?; + + // Is the domain new? + if domain_state + .as_ref() + .map(|ds: &DomainState| &ds.domain != domain) + .unwrap_or(true) + { + // Then load the relevant records for it. + domain_state = Some(DomainState { + domain: domain.to_owned(), + allowed_domain_record: txn.get_allowed_domain_record(domain)?, + weed_domain_record: txn.get_weed_domain_record(domain)?, + }); + } + + let url = Url::parse(url_str)?; + + let domain_state = domain_state.as_ref().unwrap(); + let is_allowed = domain_state + .allowed_domain_record + .as_ref() + .map(|adr: &AllowedDomainRecord| adr.applies_to_url(&url)) + .unwrap_or(false); + let is_weed = domain_state + .weed_domain_record + .as_ref() + .map(|wdr: &WeedDomainRecord| wdr.applies_to_url(&url)) + .unwrap_or(false); + + match (is_allowed, is_weed) { + (false, false) => { /* nop */ } + (true, true) => { + warn!( + "Ambiguous: {:?} is both mentioned by a seed and a weed. Ignoring.", + url + ); + } + (true, false) => { + // ALLOWED + // Make it a queued URL + txn.enqueue_url(url_str, None, record.queue_record.intent)?; + cur.del(WriteFlags::empty())?; + } + (false, true) => { + // WEED + // Just delete + cur.del(WriteFlags::empty())?; + } + } + } + + txn.commit()?; + Ok(()) +}