diff --git a/quickpeep_raker/src/bin/qp-seedrake.rs b/quickpeep_raker/src/bin/qp-seedrake.rs
index 8566a4c..f89ae77 100644
--- a/quickpeep_raker/src/bin/qp-seedrake.rs
+++ b/quickpeep_raker/src/bin/qp-seedrake.rs
@@ -6,7 +6,7 @@ use env_logger::Env;
 
 use anyhow::{anyhow, bail, Context};
 
-use colour::{dark_green_ln, dark_yellow, green, yellow_ln};
+use colour::{dark_green_ln, dark_red_ln, dark_yellow, green, red, yellow_ln};
 use log::warn;
 use reqwest::{Client, Url};
 use std::path::PathBuf;
@@ -14,13 +14,19 @@ use tokio::sync::mpsc;
 use tokio::sync::mpsc::{Receiver, Sender};
 
 use quickpeep_raker::config;
+use quickpeep_raker::config::RakerConfig;
 use quickpeep_raker::raking::analysis::get_reduced_domain;
 use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent};
-use quickpeep_raker::storage::records::AllowedDomainRecord;
+use quickpeep_raker::storage::records::{AllowedDomainRecord, WeedDomainRecord};
 use quickpeep_raker::storage::RakerStore;
 use quickpeep_seed_parser::parse_seeds;
 use quickpeep_utils::dirty::DirtyTracker;
 
+/// File-name suffix of the seed lists that `import_seeds` looks for.
+pub const SEED_EXTENSION: &'static str = ".seed";
+/// File-name suffix of the weed lists that `import_weeds` looks for.
+pub const WEED_EXTENSION: &'static str = ".weed";
+
 /// Seeds a raker's queue with URLs
 #[derive(Clone, Debug, Parser)]
 pub struct Opts {
@@ -55,19 +61,29 @@ pub async fn main() -> anyhow::Result<()> {
 
     let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
 
+    import_seeds(store.clone(), &config).await?;
+
+    import_weeds(store.clone(), &config).await?;
+
+    Ok(())
+}
+
+/// Loads every `.seed` file located by `find_seed_files` in `config.seed_dir`, imports the
+/// seeds into `store` and prints a summary of the import statistics.
+pub async fn import_seeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> {
     let (seed_tx, seed_rx) = mpsc::channel(128);
 
-    let seed_files = find_seed_files(config.seed_dir.clone()).await?;
+    let seed_files = find_seed_files(config.seed_dir.clone(), SEED_EXTENSION).await?;
 
     eprintln!("{:?}", seed_files);
 
     tokio::spawn(async move {
         seed_loader(seed_files, &seed_tx).await?;
 
         Ok(()) as anyhow::Result<()>
     });
 
-    let stats = importer(store, seed_rx).await?;
+    let stats = importer(store, seed_rx, false).await?;
 
     dark_green_ln!("=== Seeds Imported! ===");
     green!("New URLs: ");
@@ -78,6 +94,32 @@ pub async fn main() -> anyhow::Result<()> {
     yellow_ln!("{:?}", stats.new_domains);
     dark_yellow!("Seen URLs: ");
     yellow_ln!("{:?}", stats.already_present_urls);
+    println!();
+
+    Ok(())
+}
+
+/// Loads every `.weed` file located by `find_seed_files` in `config.seed_dir`, imports the
+/// weeds into `store` and prints how many new weed domains were recorded.
+pub async fn import_weeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> {
+    let (weed_tx, weed_rx) = mpsc::channel(128);
+
+    let weed_files = find_seed_files(config.seed_dir.clone(), WEED_EXTENSION).await?;
+
+    eprintln!("{:?}", weed_files);
+
+    tokio::spawn(async move {
+        seed_loader(weed_files, &weed_tx).await?;
+
+        Ok(()) as anyhow::Result<()>
+    });
+
+    let stats = importer(store, weed_rx, true).await?;
+
+    dark_red_ln!("=== Weeds Imported! ===");
+    red!("New domains: ");
+    yellow_ln!("{:?}", stats.new_domains);
+    println!();
 
     Ok(())
 }
@@ -151,7 +193,7 @@ fn seed_url_parse_pattern(mut url: String) -> UrlOrUrlPattern {
     }
 }
 
-async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> {
+async fn find_seed_files(seed_dir: PathBuf, extension: &str) -> anyhow::Result<Vec<PathBuf>> {
     let mut dirs = vec![seed_dir];
     let mut seedfiles = Vec::new();
 
@@ -174,7 +216,7 @@ async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> {
             if file_name.starts_with(".") {
                 continue;
             }
-            if file_name.ends_with(".seed") {
+            if file_name.ends_with(extension) {
                 seedfiles.push(path);
                 continue;
             }
@@ -199,7 +241,11 @@ pub struct SeedImportStats {
 }
 
-/// Task that imports seeds into the store
-async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result<SeedImportStats> {
+/// Task that imports seeds (or weeds, when `are_weeds` is set) into the store
+async fn importer(
+    store: RakerStore,
+    mut recv: Receiver<Seed>,
+    are_weeds: bool,
+) -> anyhow::Result<SeedImportStats> {
     let mut buf = Vec::with_capacity(BATCH_SIZE);
     let mut stats = SeedImportStats::default();
     let client = Client::new();
@@ -207,15 +253,23 @@ async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result
     while let Some(seed) = recv.recv().await {
         buf.push(seed);
         if buf.len() == BATCH_SIZE {
-            import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
+            if are_weeds {
+                import_and_flush_batch_weeds(&store, &mut buf, &mut stats).await?;
+            } else {
+                import_and_flush_batch_seeds(&store, &mut buf, &mut stats, &client).await?;
+            }
         }
     }
-    import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
+    if are_weeds {
+        import_and_flush_batch_weeds(&store, &mut buf, &mut stats).await?;
+    } else {
+        import_and_flush_batch_seeds(&store, &mut buf, &mut stats, &client).await?;
+    }
 
     Ok(stats)
 }
 
-async fn import_and_flush_batch(
+async fn import_and_flush_batch_seeds(
     store: &RakerStore,
     buf: &mut Vec<Seed>,
     stats: &mut SeedImportStats,
@@ -290,3 +344,56 @@ async fn import_and_flush_batch(
     txn.commit()?;
     Ok(())
 }
+
+/// Records a batch of weeds in the store, draining `buf` in the process.
+///
+/// Each weed's reduced domain gets a `WeedDomainRecord` if it has none yet (counted in
+/// `stats.new_domains`); a URL-prefix weed on a new domain also adds its path to that record's
+/// `restricted_prefixes`. Weeds are not enqueued for raking.
+async fn import_and_flush_batch_weeds(
+    store: &RakerStore,
+    buf: &mut Vec<Seed>,
+    stats: &mut SeedImportStats,
+) -> anyhow::Result<()> {
+    let txn = store.rw_txn()?;
+    for seed in buf.drain(..) {
+        let as_url = Url::parse(seed.url.as_str())
+            .with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
+        let domain = get_reduced_domain(&as_url)?;
+
+        let weed_domain_record = txn.get_weed_domain_record(domain.borrow())?;
+
+        let is_domain_new = weed_domain_record.is_none();
+        if is_domain_new {
+            stats.new_domains += 1;
+        }
+
+        let mut weed_domain_record = DirtyTracker::new(weed_domain_record.unwrap_or_default());
+        if is_domain_new {
+            // Mark it as dirty
+            let _: &mut WeedDomainRecord = weed_domain_record.borrow_mut();
+        }
+
+        match &seed.url {
+            UrlOrUrlPattern::Url(url_str) => {
+                // Nothing to record beyond the domain itself; just validate the URL.
+                Url::parse(url_str.as_str())?;
+            }
+            UrlOrUrlPattern::UrlPrefix(prefix) => {
+                let prefix_as_url = Url::parse(prefix.as_str())?;
+                if is_domain_new {
+                    let weed_domain_record: &mut WeedDomainRecord = weed_domain_record.borrow_mut();
+                    weed_domain_record
+                        .restricted_prefixes
+                        .insert(prefix_as_url.path().to_string());
+                }
+            }
+        }
+
+        if weed_domain_record.is_dirty() {
+            txn.put_weed_domain_record(domain.borrow(), weed_domain_record.into_inner())?;
+        }
+    }
+    txn.commit()?;
+    Ok(())
+}
diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs
index 5f2e0cb..2eec3f4 100644
--- a/quickpeep_raker/src/storage.rs
+++ b/quickpeep_raker/src/storage.rs
@@ -4,7 +4,7 @@ use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
 use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
 use crate::storage::records::{
     ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord,
-    QueueUrlRecord, UrlVisitedRecord,
+    QueueUrlRecord, UrlVisitedRecord, WeedDomainRecord,
 };
 use anyhow::{anyhow, bail, ensure, Context};
 use libmdbx::{
@@ -44,6 +44,8 @@ pub struct Databases<'env> {
     pub allowed_domains: Database<'env>,
     /// Domain \n URL → OnHoldUrlRecord Number of refs (INT VALUE)
     pub urls_on_hold: Database<'env>,
+    /// Domain → WeedDomainRecord
+    pub weed_domains: Database<'env>,
 }
 
 impl<'env> Databases<'env> {
@@ -60,13 +62,14 @@ impl<'env> Databases<'env> {
             ("visited_urls", &self.visited_urls),
             ("allowed_domains", &self.allowed_domains),
             ("urls_on_hold", &self.urls_on_hold),
+            ("weed_domains", &self.weed_domains),
         ]
         .into_iter()
     }
 }
 
 // Must match the order of the Databases struct fields.
-pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [
+pub const DATABASES: [(&'static str, DatabaseFlags); 9] = [
     ("urls_queue", DatabaseFlags::empty()),
     ("active_domains", DatabaseFlags::empty()),
     ("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
@@ -78,6 +81,7 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [
     ("urls_visited", DatabaseFlags::empty()),
     ("allowed_domains", DatabaseFlags::empty()),
     ("urls_on_hold", DatabaseFlags::empty()),
+    ("weed_domains", DatabaseFlags::empty()),
 ];
 
 #[self_referencing]
@@ -165,6 +169,7 @@ impl RakerStore {
                         visited_urls: dbs.next().unwrap(),
                         allowed_domains: dbs.next().unwrap(),
                         urls_on_hold: dbs.next().unwrap(),
+                        weed_domains: dbs.next().unwrap(),
                     }
                 },
             }
@@ -499,6 +504,23 @@ impl<'a> RakerTxn<'a, RW> {
         )?;
         Ok(())
     }
+
+    /// Stores `weed_domain_record` in the `weed_domains` database under the key `domain`.
+    pub fn put_weed_domain_record(
+        &self,
+        domain: &str,
+        weed_domain_record: WeedDomainRecord,
+    ) -> anyhow::Result<()> {
+        let weed_domains = &self.mdbx.borrow_dbs().weed_domains;
+
+        self.mdbx_txn.put(
+            weed_domains,
+            domain.as_bytes(),
+            MdbxBare(weed_domain_record).as_bytes(),
+            WriteFlags::empty(),
+        )?;
+        Ok(())
+    }
 }
 
 /// Registers metrics for the datastore. Call this once at startup.
@@ -658,6 +680,20 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
         }
     }
 
+    /// Looks up the `WeedDomainRecord` stored for `domain` in the `weed_domains` database,
+    /// returning `None` if there is no entry.
+    pub fn get_weed_domain_record(&self, domain: &str) -> anyhow::Result<Option<WeedDomainRecord>> {
+        let weed_domains = &self.mdbx.borrow_dbs().weed_domains;
+
+        match self
+            .mdbx_txn
+            .get::<MdbxBare<WeedDomainRecord>>(weed_domains, domain.as_bytes())?
+        {
+            None => Ok(None),
+            Some(MdbxBare(record)) => Ok(Some(record)),
+        }
+    }
+
     /// Emits metrics for the datastore. Call this occasionally.
pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> { for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() { diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs index 91c9976..9861d40 100644 --- a/quickpeep_raker/src/storage/records.rs +++ b/quickpeep_raker/src/storage/records.rs @@ -51,3 +51,10 @@ pub struct AllowedDomainRecord { /// Empty if ALL path prefixes are permitted. pub restricted_prefixes: BTreeSet, } + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct WeedDomainRecord { + /// Set of weedy path prefixes. + /// Empty if ALL path prefixes are weedy. + pub restricted_prefixes: BTreeSet, +}