diff --git a/quickpeep_raker/src/bin/qp-seedrake.rs b/quickpeep_raker/src/bin/qp-seedrake.rs
index 1c312b5..4de742e 100644
--- a/quickpeep_raker/src/bin/qp-seedrake.rs
+++ b/quickpeep_raker/src/bin/qp-seedrake.rs
@@ -1,18 +1,23 @@
 use clap::Parser;
+use std::borrow::{Borrow, BorrowMut};
 use std::collections::BTreeSet;

 use env_logger::Env;

 use anyhow::{anyhow, bail, Context};

-use arc_interner::ArcIntern;
 use smartstring::alias::CompactString;

+use reqwest::{Client, Url};
 use std::path::PathBuf;
 use tokio::sync::mpsc::{Receiver, Sender};

 use quickpeep_raker::config;
+use quickpeep_raker::raking::analysis::get_reduced_domain;
+use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent};
+use quickpeep_raker::storage::records::AllowedDomainRecord;
 use quickpeep_raker::storage::RakerStore;
 use quickpeep_seed_parser::parse_seeds;
+use quickpeep_utils::dirty::DirtyTracker;

 /// Seeds a raker's queue with URLs
 #[derive(Clone, Debug, Parser)]
@@ -69,6 +74,16 @@ pub enum UrlOrUrlPattern {
     UrlPrefix(String),
 }

+impl UrlOrUrlPattern {
+    /// Returns the wrapped URL or URL-prefix text, whichever variant this is.
+    pub fn as_str(&self) -> &str {
+        match self {
+            UrlOrUrlPattern::Url(url) => url.as_str(),
+            UrlOrUrlPattern::UrlPrefix(url_prefix) => url_prefix.as_str(),
+        }
+    }
+}
+
 /// Task that loads seeds from the filesystem
 async fn seed_loader(seed_files: Vec<PathBuf>, send: &Sender<Seed>) -> anyhow::Result<()> {
     for seed_file in seed_files {
@@ -141,7 +156,103 @@ async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> {
     Ok(seedfiles)
 }

-/// Task that imports seeds into the store
-async fn importer(store: RakerStore, recv: Receiver<Seed>) -> anyhow::Result<()> {
-    todo!()
+/// Number of seeds buffered before they are imported in a single transaction.
+const BATCH_SIZE: usize = 256;
+
+/// Counters describing what a seed import added to the store.
+#[derive(Clone, Debug, Default)]
+pub struct SeedImportStats {
+    /// Domains that had no allowed-domain record before the import.
+    pub new_domains: u32,
+    /// Sitemap URLs (from robots.txt) that were newly enqueued.
+    pub new_sitemaps: u32,
+    /// Seed URLs that were newly enqueued.
+    pub new_urls: u32,
+    /// Seed URLs that the store reported as already present.
+    pub already_present_urls: u32,
+}
+
+/// Task that imports seeds into the store
+///
+/// Drains `recv` until the channel closes, importing seeds in batches of
+/// [`BATCH_SIZE`] plus one final (possibly partial) batch.
+async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result<SeedImportStats> {
+    let mut buf = Vec::with_capacity(BATCH_SIZE);
+    let mut stats = SeedImportStats::default();
+    let client = Client::new();
+    while let Some(seed) = recv.recv().await {
+        buf.push(seed);
+
+        if buf.len() == BATCH_SIZE {
+            import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
+        }
+    }
+    import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
+
+    Ok(stats)
+}
+
+/// Imports every seed in `buf` inside one read-write transaction, leaving `buf` empty.
+///
+/// Each seed's domain is registered, its URL is enqueued and, for domains without an
+/// allowed-domain record yet, robots.txt is consulted to enqueue any listed sitemaps.
+/// The transaction is committed only after every seed in the batch has been processed.
+async fn import_and_flush_batch(
+    store: &RakerStore,
+    buf: &mut Vec<Seed>,
+    stats: &mut SeedImportStats,
+    client: &Client,
+) -> anyhow::Result<()> {
+    let txn = store.rw_txn()?;
+    for seed in buf.drain(..) {
+        // Both seed kinds carry a URL string, so parse it once and reuse it below.
+        let seed_url = Url::parse(seed.url.as_str())
+            .with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
+        let domain = get_reduced_domain(&seed_url)?;
+
+        let existing_record = txn.get_allowed_domain_record(domain.borrow())?;
+        let is_domain_new = existing_record.is_none();
+        if is_domain_new {
+            stats.new_domains += 1;
+        }
+
+        let mut allowed_domain_record = DirtyTracker::new(existing_record.unwrap_or_default());
+
+        // Register the domain. This is a no-op if it's already active or backing off.
+        txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned())?;
+
+        if txn.enqueue_url(seed_url.as_str(), None, RakeIntent::Any)? {
+            stats.new_urls += 1;
+        } else {
+            stats.already_present_urls += 1;
+        }
+
+        if is_domain_new {
+            // Borrowing mutably marks the record dirty, so a new domain is always stored
+            // (with no prefixes for a plain URL seed) and is not treated as new again.
+            let record: &mut AllowedDomainRecord = allowed_domain_record.borrow_mut();
+            if matches!(seed.url, UrlOrUrlPattern::UrlPrefix(_)) {
+                record.restricted_prefixes.insert(seed_url.path().to_string());
+            }
+        }
+
+        if allowed_domain_record.is_dirty() {
+            txn.put_allowed_domain_record(domain.borrow(), allowed_domain_record.into_inner())?;
+        }
+
+        if is_domain_new {
+            // look at robots.txt and discover sitemaps!
+            if let Some(robots_txt) = get_robots_txt_for(&seed_url, client).await? {
+                for sitemap in robots_txt.sitemaps {
+                    // Only count sitemaps that were not already queued.
+                    if txn.enqueue_url(sitemap.url.as_str(), None, RakeIntent::SiteMap)? {
+                        stats.new_sitemaps += 1;
+                    }
+                }
+            }
+        }
+    }
+    // Without an explicit commit the batch's writes are dropped with the transaction.
+    txn.commit()?;
+    Ok(())
 }
diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs
index 8eb1059..0b0519d 100644
--- a/quickpeep_raker/src/storage.rs
+++ b/quickpeep_raker/src/storage.rs
@@ -3,7 +3,8 @@ use crate::raking::{RakeIntent, TemporaryFailure};
 use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
 use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
 use crate::storage::records::{
-    ActiveDomainRecord, BackingOffDomainRecord, QueueUrlRecord, UrlVisitedRecord,
+    ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord,
+    UrlVisitedRecord,
 };
 use anyhow::{bail, ensure, Context};
 use libmdbx::{
@@ -38,6 +39,10 @@ pub struct Databases<'env> {
     pub backing_off_domains: Database<'env>,
     /// URL → VisitedDomainRecord
     pub visited_urls: Database<'env>,
+    /// Domain → AllowedDomainRecord
+    pub allowed_domains: Database<'env>,
+    /// Domain \n URL → Number of refs (INT VALUE)
+    pub urls_on_hold: Database<'env>,
 }

 impl<'env> Databases<'env> {
@@ -58,7 +63,7 @@ impl<'env> Databases<'env> {
 }

 // Must match the order of the Databases struct fields.
-pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
+pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [
     ("urls_queue", DatabaseFlags::empty()),
     ("active_domains", DatabaseFlags::empty()),
     ("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
@@ -68,6 +73,8 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
     ),
     ("backing_off_domains", DatabaseFlags::empty()),
     ("urls_visited", DatabaseFlags::empty()),
+    ("allowed_domains", DatabaseFlags::empty()),
+    ("urls_on_hold", DatabaseFlags::empty()),
 ];

 #[self_referencing]
@@ -153,6 +160,8 @@ impl RakerStore {
                     backing_off_reinstatements: dbs.next().unwrap(),
                     backing_off_domains: dbs.next().unwrap(),
                     visited_urls: dbs.next().unwrap(),
+                    allowed_domains: dbs.next().unwrap(),
+                    urls_on_hold: dbs.next().unwrap(),
                 }
             },
         }
@@ -423,6 +432,23 @@ impl<'a> RakerTxn<'a, RW> {

         Ok(true)
     }
+
+    /// Writes `allowed_domain_record` to the allowed-domains database, keyed by `domain`.
+    pub fn put_allowed_domain_record(
+        &self,
+        domain: &str,
+        allowed_domain_record: AllowedDomainRecord,
+    ) -> anyhow::Result<()> {
+        let allowed_domains = &self.mdbx.borrow_dbs().allowed_domains;
+
+        self.mdbx_txn.put(
+            allowed_domains,
+            domain.as_bytes(),
+            MdbxBare(allowed_domain_record).as_bytes(),
+            WriteFlags::empty(),
+        )?;
+        Ok(())
+    }
 }

 /// Registers metrics for the datastore. Call this once at startup.
@@ -523,6 +549,20 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
         }
     }

+    /// Looks up the allowed-domain record stored for `domain`, if there is one.
+    pub fn get_allowed_domain_record(
+        &self,
+        domain: &str,
+    ) -> anyhow::Result<Option<AllowedDomainRecord>> {
+        let allowed_domains = &self.mdbx.borrow_dbs().allowed_domains;
+
+        let record = self
+            .mdbx_txn
+            .get::<MdbxBare<AllowedDomainRecord>>(allowed_domains, domain.as_bytes())?
+            .map(|MdbxBare(record)| record);
+        Ok(record)
+    }
+
     /// Emits metrics for the datastore. Call this occasionally.
     pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
         for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {
diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs
index 5102ae9..745e219 100644
--- a/quickpeep_raker/src/storage/records.rs
+++ b/quickpeep_raker/src/storage/records.rs
@@ -1,5 +1,6 @@
 use crate::raking::{RakeIntent, TemporaryFailure};
 use serde::{Deserialize, Serialize};
+use std::collections::BTreeSet;

 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct ActiveDomainRecord {
@@ -35,8 +36,10 @@ pub struct BackingOffDomainRecord {
     pub reinstate_at: u64,
 }

-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct DomainMaskRestriction {
-    /// TODO List of acceptable URL patterns...
-    pub patterns: Vec<String>,
+/// Per-domain record stored in the allowed-domains database.
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+pub struct AllowedDomainRecord {
+    /// Set of acceptable path prefixes.
+    /// Empty if ALL path prefixes are permitted.
+    pub restricted_prefixes: BTreeSet<String>,
 }
diff --git a/quickpeep_utils/src/dirty.rs b/quickpeep_utils/src/dirty.rs
new file mode 100644
index 0000000..03bec03
--- /dev/null
+++ b/quickpeep_utils/src/dirty.rs
@@ -0,0 +1,52 @@
+use std::borrow::{Borrow, BorrowMut};
+
+/// Wraps a value and remembers whether it has been mutably borrowed.
+///
+/// Shared access via [`Borrow`] leaves the flag untouched; any [`BorrowMut`] access sets it,
+/// whether or not the value is actually changed.
+#[derive(Debug)]
+pub struct DirtyTracker<T> {
+    inner: T,
+    dirty: bool,
+}
+
+impl<T> Borrow<T> for DirtyTracker<T> {
+    fn borrow(&self) -> &T {
+        &self.inner
+    }
+}
+
+impl<T> BorrowMut<T> for DirtyTracker<T> {
+    fn borrow_mut(&mut self) -> &mut T {
+        // Assume the caller will mutate: handing out `&mut T` is what marks the value dirty.
+        self.dirty = true;
+        &mut self.inner
+    }
+}
+
+impl<T> DirtyTracker<T> {
+    /// Wraps `inner`, initially marked clean.
+    pub fn new(inner: T) -> Self {
+        DirtyTracker {
+            inner,
+            dirty: false,
+        }
+    }
+
+    /// Returns `true` if the value has been mutably borrowed since creation or the last
+    /// [`make_clean`](Self::make_clean).
+    #[must_use]
+    pub fn is_dirty(&self) -> bool {
+        self.dirty
+    }
+
+    /// Resets the dirty flag, e.g. after the value has been written out.
+    pub fn make_clean(&mut self) {
+        self.dirty = false;
+    }
+
+    /// Consumes the tracker and returns the wrapped value, discarding the dirty flag.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+}
diff --git a/quickpeep_utils/src/lib.rs b/quickpeep_utils/src/lib.rs
index 6bcb8ed..ae342d9 100644
--- a/quickpeep_utils/src/lib.rs
+++ b/quickpeep_utils/src/lib.rs
@@ -1,2 +1,3 @@
 pub mod dates;
+pub mod dirty;
 pub mod lazy;