diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs
index d88a56d..1c0f51e 100644
--- a/quickpeep_raker/src/bin/qp-raker.rs
+++ b/quickpeep_raker/src/bin/qp-raker.rs
@@ -2,21 +2,35 @@
 use clap::Parser;
 use env_logger::Env;

+use adblock::lists::RuleTypes;
 use anyhow::{bail, Context};
-use log::error;
+use log::{error, warn};
 use lru::LruCache;
 use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
 use reqwest::redirect::Policy;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex, RwLock};
 use std::time::Duration;
+use tokio::fs::File;
 use tokio::sync::{mpsc, Semaphore};

 use quickpeep_raker::config;
+use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
 use quickpeep_raker::raking::page_extraction::PageExtractionService;
 use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
 use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
 use quickpeep_raker::storage::RakerStore;
+use quickpeep_structs::rake_entries::AnalysisAntifeatures;
+
+/// The ordering is slightly important on these: more specific things should come first.
+/// This means they filter out the troublesome elements before the broader filters do.
+pub const ADBLOCK_FILTER_PATHS: [(AnalysisAntifeatures, &'static str); 5] = [
+    (AnalysisAntifeatures::COOKIE_NAG, "cookie_nag"),
+    (AnalysisAntifeatures::ANNOYANCE, "annoyance"),
+    (AnalysisAntifeatures::SOCIAL, "social"),
+    (AnalysisAntifeatures::PRIVACY, "privacy"),
+    (AnalysisAntifeatures::ADVERTS, "adverts"),
+];

 /// Seeds a raker's queue with URLs
 #[derive(Clone, Debug, Parser)]
@@ -66,9 +80,30 @@ pub async fn main() -> anyhow::Result<()> {

     let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;

+    let mut adblock_engines = Vec::new();
+    for (antifeature, name) in &ADBLOCK_FILTER_PATHS {
+        let path = PathBuf::from(config.data_dir.join(format!("{}.adblock", name)));
+        if !path.exists() {
+            warn!("Missing adblock rules: {:?}.", path);
+            continue;
+        }
+        let file = File::open(&path).await?;
+        adblock_engines.push((
+            *antifeature,
+            preload_adblock_engine(file, RuleTypes::All).await?,
+        ));
+    }
+
+    let mut antifeature_ip_set = IpSet::new();
+
+    let ips_file = File::open(config.data_dir.join("cf_ips.txt"))
+        .await
+        .context("Failed to open CF IPs file")?;
+    antifeature_ip_set.add_all_from_file(ips_file).await?;
+
     let raker = Raker {
-        antifeature_ip_set: todo!(),
-        page_extraction: PageExtractionService::new(vec![])?, // TODO
+        antifeature_ip_set,
+        page_extraction: PageExtractionService::new(adblock_engines)?,
     };

     let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
@@ -76,10 +111,12 @@ pub async fn main() -> anyhow::Result<()> {

     let (pages_tx, pages_rx) = mpsc::channel(32);
     let (refs_tx, refs_rx) = mpsc::channel(32);
+    let (rejections_tx, rejections_rx) = mpsc::channel(32);

     let submission = TaskResultSubmission {
         pages: pages_tx,
         references: refs_tx,
+        rejections: rejections_tx,
     };

     let task_context = TaskContext {
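For context on the loop above: AnalysisAntifeatures (see rake_entries.rs at the bottom of this diff) is a bitflags type, so each preloaded engine can stamp its own category onto a page and the verdicts combine by bitwise OR into a single flags value. A minimal sketch of those semantics; the `matched` values here are invented for illustration:

```rust
use quickpeep_structs::rake_entries::AnalysisAntifeatures;

fn main() {
    // Each engine that matches contributes its category flag; a page's
    // antifeature summary is the union of all matching categories.
    let mut flags = AnalysisAntifeatures::empty();
    for (antifeature, matched) in [
        (AnalysisAntifeatures::COOKIE_NAG, true),
        (AnalysisAntifeatures::SOCIAL, false),
        (AnalysisAntifeatures::ANNOYANCE, true),
    ] {
        if matched {
            flags |= antifeature;
        }
    }
    assert!(flags.contains(AnalysisAntifeatures::COOKIE_NAG | AnalysisAntifeatures::ANNOYANCE));
    assert!(!flags.contains(AnalysisAntifeatures::SOCIAL));
}
```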
diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs
index 3f2c954..0287384 100644
--- a/quickpeep_raker/src/raking.rs
+++ b/quickpeep_raker/src/raking.rs
@@ -8,7 +8,7 @@ use html5ever::tendril::fmt::Slice;
 use itertools::Itertools;
 use lazy_static::lazy_static;
 use log::debug;
-use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
+use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry, ReferenceKind};
 use reqwest::header::HeaderMap;
 use reqwest::{Client, Response, Url};
 use serde::{Deserialize, Serialize};
@@ -70,27 +70,29 @@ pub struct RobotsTxt {
     pub rules: Cylon,
 }

-#[derive(Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct TemporaryFailure {
     pub reason: TemporaryFailureReason,
     pub backoff_sec: u32,
 }

-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct PermanentFailure {
     pub reason: PermanentFailureReason,
 }

-#[derive(Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TemporaryFailureReason {
     MissingInformation(String),
     ServerError(u16),
     UnknownClientError(String),
+    ExcruciatingCrawlDelay(u64),
 }

-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub enum PermanentFailureReason {
     ResourceDenied(u16),
+    DeniedToRobots,
     WrongLanguage(String),
     UnknownContentType(String),
 }
@@ -103,6 +105,28 @@ pub enum RakeIntent {
     SiteMap,
 }

+impl From<ReferenceKind> for RakeIntent {
+    fn from(kind: ReferenceKind) -> Self {
+        match kind {
+            ReferenceKind::CanonicalUrl => {
+                // FIXME We don't know what this is a canonical URL for. Suppose it doesn't matter...
+                RakeIntent::Any
+            }
+            ReferenceKind::Redirect => {
+                // FIXME We don't know what this is a redirect for. Suppose it doesn't matter...
+                RakeIntent::Any
+            }
+            ReferenceKind::Link => {
+                // Links can go to pages but also to RSS feeds
+                RakeIntent::Any
+            }
+            ReferenceKind::HeaderLinkedFeed => RakeIntent::Feed,
+            ReferenceKind::FeedEntry => RakeIntent::Page,
+            ReferenceKind::SitemapEntry => RakeIntent::Page,
+        }
+    }
+}
+
 lazy_static! {
     static ref SITEMAP_MIME_TYPES: HashSet<&'static str> =
         HashSet::from_iter(vec!["text/xml", "application/xml",]);
diff --git a/quickpeep_raker/src/raking/references.rs b/quickpeep_raker/src/raking/references.rs
index 40b32ac..5e6d796 100644
--- a/quickpeep_raker/src/raking/references.rs
+++ b/quickpeep_raker/src/raking/references.rs
@@ -1,6 +1,7 @@
 use crate::raking::UrlRaked;
 use quickpeep_densedoc::{DenseDocument, DenseTree};
 use quickpeep_structs::rake_entries::{RakedReference, ReferenceKind};
+use quickpeep_utils::dates::date_to_quickpeep_days;
 use reqwest::Url;
 use std::collections::BTreeSet;

@@ -42,6 +43,7 @@ pub fn find_references(
                 refs.insert(RakedReference {
                     target: clean_url(&full_url).to_string(),
                     kind: ReferenceKind::Link,
+                    last_mod: None,
                 });
             }
         }
@@ -60,6 +62,7 @@
         refs.insert(RakedReference {
             target: clean_url(feed).as_str().to_owned(),
             kind: ReferenceKind::HeaderLinkedFeed,
+            last_mod: None,
         });
     }

@@ -75,6 +78,10 @@ pub fn references_from_urlrakes(
         .map(|url_raked| RakedReference {
             target: url_raked.url.to_string(),
             kind: ref_kind,
+            last_mod: url_raked
+                .last_changed
+                .map(|datetime| date_to_quickpeep_days(&datetime.date()).ok())
+                .flatten(),
         })
         .collect()
 }
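The new `From<ReferenceKind>` impl is what later lets the storage layer derive a crawl intent straight from the kind of reference that introduced a URL (see `reference.kind.into()` inside `enqueue_url` further down). A quick illustration of the mapping:

```rust
use quickpeep_raker::raking::RakeIntent;
use quickpeep_structs::rake_entries::ReferenceKind;

fn main() {
    // A feed advertised by a page is fetched as a feed...
    assert!(matches!(
        RakeIntent::from(ReferenceKind::HeaderLinkedFeed),
        RakeIntent::Feed
    ));
    // ...whereas an ordinary link could lead to a page or a feed, so it
    // stays unconstrained.
    assert!(matches!(
        RakeIntent::from(ReferenceKind::Link),
        RakeIntent::Any
    ));
}
```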
diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs
index 1622145..0d9e837 100644
--- a/quickpeep_raker/src/raking/task.rs
+++ b/quickpeep_raker/src/raking/task.rs
@@ -1,16 +1,22 @@
+use crate::raking::analysis::get_reduced_domain;
+use crate::raking::references::references_from_urlrakes;
 use crate::raking::{
-    get_robots_txt_for, robots_txt_url_for, RakeOutcome, Raker, RobotsTxt, TemporaryFailure,
-    TemporaryFailureReason, UrlRaked,
+    get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome,
+    Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
 };
-use crate::storage::records::UrlVisitedRecord;
+use crate::storage::records::{ActiveDomainRecord, UrlVisitedRecord};
 use crate::storage::RakerStore;
+use anyhow::{anyhow, Context};
 use chrono::Utc;
 use cylon::Cylon;
 use log::warn;
 use lru::LruCache;
-use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
+use quickpeep_structs::rake_entries::{
+    RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
+};
 use quickpeep_utils::dates::date_to_quickpeep_days;
 use reqwest::{Client, Url};
+use std::borrow::Cow;
 use std::collections::HashSet;
 use std::sync::{Arc, Mutex as StdMutex, RwLock};
 use std::time::Duration;
@@ -36,6 +42,7 @@ enum NextAction {
 pub struct TaskResultSubmission {
     pub pages: Sender<(Url, RakedPageEntry)>,
     pub references: Sender<(Url, RakedReferrerEntry)>,
+    pub rejections: Sender<(Url, PermanentFailure)>,
 }

 #[derive(Clone)]
@@ -62,9 +69,40 @@ pub struct TaskContext {
 }

 impl TaskContext {
-    pub async fn run(self) -> anyhow::Result<()> {
+    pub async fn run(mut self) -> anyhow::Result<()> {
         // Get a domain to process
-        todo!();
+        loop {
+            let domain = {
+                let txn = self.store.ro_txn()?;
+                txn.choose_random_active_domain()?
+            };
+
+            match domain {
+                Some((domain, active_record)) => {
+                    let is_ours = {
+                        let mut busy_domains = self
+                            .busy_domains
+                            .lock()
+                            .map_err(|_| anyhow!("Busy Domains set poisoned"))?;
+                        busy_domains.insert(domain.clone())
+                    };
+
+                    if is_ours {
+                        self.process_domain(domain).await?;
+                    } else {
+                        // Already in use; need to pick again
+                        // TODO be smarter about this.
+                        tokio::time::sleep(Duration::from_secs(60)).await;
+                    }
+                }
+                None => {
+                    // No domains left! Woot.
+                    // TODO come up with some stopping conditions
+                    tokio::time::sleep(Duration::from_secs(60)).await;
+                }
+            }
+        }

         Ok(())
     }
@@ -128,7 +166,14 @@ impl TaskContext {
             if let Some(robot_rules) = current_robot_rules.as_ref() {
                 if !robot_rules.allow(url.path().as_bytes()) {
                     // Permanently reject this.
-                    todo!();
+                    self.process_outcome(
+                        &url,
+                        RakeOutcome::PermanentFailure(PermanentFailure {
+                            reason: PermanentFailureReason::DeniedToRobots,
+                        }),
+                    )
+                    .await?;
+                    continue;
                 }
             }

@@ -137,11 +182,23 @@ impl TaskContext {
                 tokio::time::sleep_until(wait_until).await;
             }

+            let delay = if let Some(robot_rules) = current_robot_rules.as_ref() {
+                robot_rules
+                    .delay()
+                    .map(Duration::from_secs)
+                    .unwrap_or(DEFAULT_CRAWL_DELAY)
+            } else {
+                DEFAULT_CRAWL_DELAY
+            };
+
             // Now acquire a permit to go and fetch the desired URL
             let permit = self.semaphore.acquire().await?;
             let raked = self.raker.rake(&url, url_record.intent, &self.client).await;
             drop(permit);

+            // Next time, we need to wait before our request.
+            wait_until = Some(Instant::now() + delay);
+
             let rake_outcome = raked.unwrap_or_else(|err| {
                 warn!("Failed to rake {:?}: {:?}", url, err);
                 // Treat this as a temporary rejection (backoff).
@@ -166,15 +223,50 @@
                     let domain = domain.clone();
                     self.store
                         .async_rw_txn(move |txn| {
-                            txn.mark_url_as_visited(domain, url_str, record)?;
+                            txn.mark_url_as_visited(&domain, &url_str, record)?;
                             Ok(())
                         })
                         .await?;
                 }
                 NextAction::ChangeDomain => {
-                    todo!()
+                    let mut cache = self
+                        .robotstxt_cache
+                        .write()
+                        .map_err(|_| anyhow!("Robots.txt cache poisoned"))?;
+                    cache.put(domain, current_robot_rules);
+
+                    // Choose another domain
+                    return Ok(());
                 }
             }
+
+            if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
+                // This crawl-delay is painfully long!
+                // We'll respect it, but we treat this as a backoff and let the site lose its place
+                // in the queue.
+
+                let domain = domain.clone();
+                let url = url.clone();
+                let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX);
+
+                self.store
+                    .async_rw_txn(move |txn| {
+                        txn.start_backing_off(
+                            &domain,
+                            backoff,
+                            url.to_string(),
+                            TemporaryFailure {
+                                reason: TemporaryFailureReason::ExcruciatingCrawlDelay(
+                                    delay.as_secs(),
+                                ),
+                                // Don't stack this up with a backoff; it's not an actual failure!
+                                backoff_sec: 0,
+                            },
+                        )?;
+                        Ok(())
+                    })
+                    .await?;
+            }
         }

         Ok(())
     }
@@ -182,18 +274,200 @@ impl TaskContext {

     /// Processes the outcome of
     async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
+        let today = date_to_quickpeep_days(&Utc::today())?;
         match outcome {
-            RakeOutcome::RakedPage(page) => {}
-            RakeOutcome::RakedFeed(feed) => {}
-            RakeOutcome::RakedSitemap(sitemap) => {}
-            RakeOutcome::Redirect { reason, new_url } => {}
-            RakeOutcome::TemporaryFailure(failure) => {}
-            RakeOutcome::PermanentFailure(failure) => {}
+            RakeOutcome::RakedPage(page) => {
+                self.submission
+                    .pages
+                    .send((url.clone(), page.page_entry))
+                    .await
+                    .context("Page processor shut down; can't stream page!")?;
+
+                self.submission
+                    .references
+                    .send((url.clone(), page.referrer_entry.clone()))
+                    .await
+                    .context("Reference processor shut down; can't stream references!")?;
+
+                self.as_event_processor()
+                    .process_page(url.clone(), today)
+                    .await?;
+                self.as_event_processor()
+                    .process_refs(url.clone(), page.referrer_entry, today)
+                    .await?;
+
+                Ok(NextAction::Continue)
+            }
+            RakeOutcome::RakedFeed(feed) => {
+                let refs = RakedReferrerEntry {
+                    references: references_from_urlrakes(&feed, ReferenceKind::FeedEntry),
+                };
+
+                self.submission
+                    .references
+                    .send((url.clone(), refs.clone()))
+                    .await
+                    .context("Reference processor shut down; can't stream references!")?;
+
+                self.as_event_processor()
+                    .process_refs(url.clone(), refs, today)
+                    .await?;
+
+                Ok(NextAction::Continue)
+            }
+            RakeOutcome::RakedSitemap(sitemap) => {
+                let refs = RakedReferrerEntry {
+                    references: references_from_urlrakes(&sitemap, ReferenceKind::SitemapEntry),
+                };
+
+                self.submission
+                    .references
+                    .send((url.clone(), refs.clone()))
+                    .await
+                    .context("Reference processor shut down; can't stream references!")?;
+
+                self.as_event_processor()
+                    .process_refs(url.clone(), refs, today)
+                    .await?;
+
+                Ok(NextAction::Continue)
+            }
+            RakeOutcome::Redirect { reason, new_url } => {
+                let refs = RakedReferrerEntry {
+                    references: [RakedReference {
+                        target: new_url.to_string(),
+                        kind: match reason {
+                            RedirectReason::Redirected { .. } => ReferenceKind::Redirect,
+                            RedirectReason::NotCanonical { .. } => ReferenceKind::CanonicalUrl,
+                        },
+                        last_mod: None,
+                    }]
+                    .into_iter()
+                    .collect(),
+                };
+
+                self.submission
+                    .references
+                    .send((url.clone(), refs.clone()))
+                    .await
+                    .context("Reference processor shut down; can't stream references!")?;
+
+                self.as_event_processor()
+                    .process_refs(url.clone(), refs, today)
+                    .await?;
+
+                Ok(NextAction::Continue)
+            }
+            RakeOutcome::TemporaryFailure(failure) => {
+                // TODO do we want to log this somewhere?
+
+                let domain = get_reduced_domain(url)?;
+                let url = url.clone();
+
+                // TODO add 1.1× the previous backoff, if there was one.
+                let new_backoff = failure.backoff_sec;
+                let domain = domain.into_owned();
+
+                self.store
+                    .async_rw_txn(move |txn| {
+                        txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?;
+
+                        Ok(())
+                    })
+                    .await?;
+
+                // Change domain now
+                Ok(NextAction::ChangeDomain)
+            }
+            RakeOutcome::PermanentFailure(failure) => {
+                self.submission
+                    .rejections
+                    .send((url.clone(), failure.clone()))
+                    .await
+                    .context("Rejection processor shut down; can't stream rejection!!")?;
+                self.as_event_processor()
+                    .process_rejection(url.clone(), today)
+                    .await?;
+
+                // Reasons for permanent rejection aren't our fault or a site-wide fault;
+                // so don't worry about carrying on.
+                Ok(NextAction::Continue)
+            }
         }
-        todo!()
     }

-    async fn process_feed_or_sitemap(&self, feed: &Vec<UrlRaked>) -> anyhow::Result<()> {
-        todo!()
+    fn as_event_processor(&self) -> EventProcessor {
+        EventProcessor {
+            store: Cow::Borrowed(&self.store),
+        }
+    }
+}
+
+/// Processes events that are either emitted into or loaded from a RakePack.
+/// Processing events this way means we can bring a raker's store up to the same state as another,
+/// just by replaying the stream of RakePacks and importing seeds.
+pub struct EventProcessor<'a> {
+    store: Cow<'a, RakerStore>,
+}
+
+impl EventProcessor<'_> {
+    pub async fn process_page(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
+        self.store
+            .as_ref()
+            .async_rw_txn(move |txn| {
+                let domain = get_reduced_domain(&url)?;
+                txn.mark_url_as_visited(
+                    domain.as_ref(),
+                    url.as_ref(),
+                    UrlVisitedRecord {
+                        last_visited_days: datestamp,
+                    },
+                )
+            })
+            .await
+    }
+
+    pub async fn process_refs(
+        &self,
+        url: Url,
+        refs: RakedReferrerEntry,
+        datestamp: u16,
+    ) -> anyhow::Result<()> {
+        self.store
+            .as_ref()
+            .async_rw_txn(move |txn| {
+                let domain = get_reduced_domain(&url)?;
+                txn.mark_url_as_visited(
+                    domain.as_ref(),
+                    url.as_ref(),
+                    UrlVisitedRecord {
+                        last_visited_days: datestamp,
+                    },
+                )?;
+
+                // track all the referred-to URLs!
+                for reference in refs.references {
+                    txn.enqueue_url(&reference.target, reference.last_mod, reference.kind.into())?;
+                }
+
+                Ok(())
+            })
+            .await
+    }
+
+    pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
+        self.store
+            .as_ref()
+            .async_rw_txn(move |txn| {
+                let domain = get_reduced_domain(&url)?;
+                txn.mark_url_as_visited(
+                    domain.as_ref(),
+                    url.as_ref(),
+                    UrlVisitedRecord {
+                        last_visited_days: datestamp,
+                    },
+                )
+            })
+            .await
+    }
 }
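The crawl-delay handling above refers to `DEFAULT_CRAWL_DELAY` and `MAX_CRAWL_DELAY_BEFORE_BACKOFF`, which live elsewhere in task.rs and are not part of this diff. The decision restated with hypothetical values (the real constants will differ):

```rust
use std::time::Duration;

// Hypothetical stand-ins for the constants referenced above.
const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(5);
const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(60);

fn main() {
    // A robots.txt crawl-delay, when present, overrides the default...
    let robots_delay: Option<u64> = Some(300);
    let delay = robots_delay
        .map(Duration::from_secs)
        .unwrap_or(DEFAULT_CRAWL_DELAY);

    // ...and is respected either way, but an excruciating one additionally
    // parks the domain (start_backing_off with backoff_sec: 0) so the raker
    // moves on instead of camping on a slow site.
    if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
        println!("park the domain for {}s and pick another", delay.as_secs());
    }
}
```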
diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs
index 5457abd..4bf5854 100644
--- a/quickpeep_raker/src/storage.rs
+++ b/quickpeep_raker/src/storage.rs
@@ -1,6 +1,10 @@
-use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32};
+use crate::raking::analysis::get_reduced_domain;
+use crate::raking::{RakeIntent, TemporaryFailure};
+use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
 use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
-use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord, UrlVisitedRecord};
+use crate::storage::records::{
+    ActiveDomainRecord, BackingOffDomainRecord, QueueUrlRecord, UrlVisitedRecord,
+};
 use anyhow::{bail, ensure, Context};
 use libmdbx::{
     Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
@@ -8,9 +12,11 @@ use libmdbx::{
 };
 use log::info;
 use ouroboros::self_referencing;
+use reqwest::Url;
 use std::borrow::Cow;
 use std::path::Path;
 use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};

 mod mdbx_helper_types;
 mod migrations;
@@ -29,12 +35,12 @@ pub struct Databases<'env> {
     /// Domain → BackingOffDomainRecord
     pub backing_off_domains: Database<'env>,
     /// URL → VisitedDomainRecord
-    pub visited: Database<'env>,
+    pub visited_urls: Database<'env>,
 }

 // Must match the order of the Databases struct fields.
 pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
-    ("queue_urls", DatabaseFlags::empty()),
+    ("urls_queue", DatabaseFlags::empty()),
     ("active_domains", DatabaseFlags::empty()),
     ("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
     (
@@ -42,7 +48,7 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
         DatabaseFlags::INTEGER_KEY.union(DatabaseFlags::DUP_SORT),
     ),
     ("backing_off_domains", DatabaseFlags::empty()),
-    ("visited", DatabaseFlags::empty()),
+    ("urls_visited", DatabaseFlags::empty()),
 ];

 #[self_referencing]
@@ -126,7 +132,7 @@ impl RakerStore {
                     active_domain_raffle: dbs.next().unwrap(),
                     backing_off_reinstatements: dbs.next().unwrap(),
                     backing_off_domains: dbs.next().unwrap(),
-                    visited: dbs.next().unwrap(),
+                    visited_urls: dbs.next().unwrap(),
                 }
             },
         }
@@ -178,13 +184,14 @@ impl<'a> RakerTxn<'a, RW> {
     /// Inserts a domain into the active domain table,
     /// generating a raffle ticket (and inserting it too).
     ///
-    /// No-op if the domain is already an active domain.
+    /// No-op if the domain is already an active domain or being backed off from.
     pub fn insert_active_domain_with_new_raffle_ticket(
         &self,
         new_domain: String,
     ) -> anyhow::Result<()> {
         let active_domains = &self.mdbx.borrow_dbs().active_domains;
         let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
+        let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;

         let new_domain = MdbxString(Cow::Owned(new_domain));
@@ -196,6 +203,14 @@ impl<'a> RakerTxn<'a, RW> {
             return Ok(());
         }

+        if self
+            .mdbx_txn
+            .get::<()>(backing_off_domains, new_domain.as_bytes())?
+            .is_some()
+        {
+            return Ok(());
+        }
+
         let reserved_raffle_ticket = loop {
             let next_raffle_ticket = MdbxU32(rand::random());
             if self
@@ -229,11 +244,11 @@
     /// table.
     ///
     /// Returns true if a deletion took place, and false if it did not.
-    pub fn remove_active_domain(&self, domain: String) -> anyhow::Result<bool> {
+    pub fn remove_active_domain(&self, domain: &str) -> anyhow::Result<bool> {
         let active_domains = &self.mdbx.borrow_dbs().active_domains;
         let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;

-        let domain = MdbxString(Cow::Owned(domain));
+        let domain = MdbxString(Cow::Borrowed(domain));

         if let Some(MdbxBare(active_domain)) = self
             .mdbx_txn
@@ -253,12 +268,12 @@
     /// Marks a URL as visited and takes it out of the queue.
     pub fn mark_url_as_visited(
         &self,
-        domain: String,
-        url_str: String,
+        domain: &str,
+        url_str: &str,
         record: UrlVisitedRecord,
     ) -> anyhow::Result<()> {
         let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
-        let visited_urls = &self.mdbx.borrow_dbs().visited;
+        let visited_urls = &self.mdbx.borrow_dbs().visited_urls;

         let queue_key = format!("{}\n{}", domain, url_str);

@@ -276,6 +291,108 @@

         Ok(())
     }
+
+    pub fn start_backing_off(
+        &self,
+        domain: &str,
+        backoff_for: u32,
+        failed_url: String,
+        failure: TemporaryFailure,
+    ) -> anyhow::Result<()> {
+        ensure!(
+            self.remove_active_domain(domain)?,
+            "Can't back off from domain that's not active"
+        );
+        let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
+        let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements;
+
+        let reinstate_at =
+            SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() + backoff_for as u64;
+
+        let backoff_record = BackingOffDomainRecord {
+            failed_url,
+            failure,
+            backoff: backoff_for,
+            reinstate_at,
+        };
+
+        self.mdbx_txn.put(
+            backing_off_domains,
+            domain.as_bytes(),
+            &MdbxBare(backoff_record).as_bytes(),
+            WriteFlags::empty(),
+        )?;
+        self.mdbx_txn.put(
+            backing_off_reinstatements,
+            MdbxU64(reinstate_at).as_bytes(),
+            domain.as_bytes(),
+            WriteFlags::empty(),
+        )?;
+
+        Ok(())
+    }
+
+    /// Enqueues a URL.
+    /// If `last_modified` is specified, then this is a no-op if the page has already been
+    /// visited since then.
+    /// If `last_modified` is not specified, then this is a no-op if the page has already
+    /// been visited.
+    ///
+    /// Returns: true if it was enqueued, false if nothing changed.
+    pub fn enqueue_url(
+        &self,
+        url_str: &str,
+        last_modified: Option<u16>,
+        intent: RakeIntent,
+    ) -> anyhow::Result<bool> {
+        let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
+        let visited_urls = &self.mdbx.borrow_dbs().visited_urls;
+
+        let url = Url::parse(url_str)?;
+        let url_domain = get_reduced_domain(&url)?;
+
+        let queue_key = format!("{}\n{}", url_domain, url);
+
+        if self
+            .mdbx_txn
+            .get::<()>(queue_urls, queue_key.as_bytes())?
+            .is_some()
+        {
+            // Already in the queue. Nothing to do here.
+            return Ok(false);
+        }
+
+        if let Some(MdbxBare(visited_entry)) = self
+            .mdbx_txn
+            .get::<MdbxBare<UrlVisitedRecord>>(visited_urls, url_str.as_bytes())?
+        {
+            match last_modified {
+                None => {
+                    // Already visited. Nothing to do here.
+                    return Ok(false);
+                }
+                Some(last_modified) => {
+                    if last_modified <= visited_entry.last_visited_days {
+                        // Hasn't been modified since our last visit
+                        return Ok(false);
+                    }
+                }
+            }
+        }
+
+        // Add the entry to the queue
+        self.mdbx_txn.put(
+            queue_urls,
+            queue_key.as_bytes(),
+            &MdbxBare(QueueUrlRecord { intent }).as_bytes(),
+            WriteFlags::empty(),
+        )?;
+
+        // Activate the domain if needed...
+        self.insert_active_domain_with_new_raffle_ticket(url_domain.into_owned())?;
+
+        Ok(true)
+    }
 }

 /// Read-only implementations (but can also be used on RW transactions)
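One storage detail worth spelling out: queue keys take the form `{domain}\n{url}`, so all of a domain's queued URLs cluster together in the MDBX B-tree, and since neither a hostname nor a URL may contain a newline, the key also splits back apart unambiguously. A sketch, assuming `get_reduced_domain` yields something like the registrable domain:

```rust
fn main() {
    // Assumed reduction of "https://blog.example.org/..." for illustration.
    let domain = "example.org";
    let url = "https://blog.example.org/post/1";

    let queue_key = format!("{}\n{}", domain, url);

    // Every queued URL for example.org shares the "example.org\n" prefix,
    // and the key decomposes losslessly:
    let (d, u) = queue_key.split_once('\n').unwrap();
    assert_eq!((d, u), (domain, url));
}
```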
diff --git a/quickpeep_raker/src/storage/mdbx_helper_types.rs b/quickpeep_raker/src/storage/mdbx_helper_types.rs
index 6e5e650..5d8c2d1 100644
--- a/quickpeep_raker/src/storage/mdbx_helper_types.rs
+++ b/quickpeep_raker/src/storage/mdbx_helper_types.rs
@@ -30,6 +30,32 @@ impl TableObject<'_> for MdbxU32 {
     }
 }

+/// u64 in native byte endianness (as required by INTEGERKEY mode)
+#[derive(Copy, Clone, Debug)]
+pub struct MdbxU64(pub u64);
+
+impl MdbxU64 {
+    pub fn as_bytes(&self) -> Cow<'_, [u8]> {
+        Cow::Owned(self.0.to_ne_bytes().to_vec())
+    }
+}
+
+impl TableObject<'_> for MdbxU64 {
+    fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
+    where
+        Self: Sized,
+    {
+        if data_val.len() != 8 {
+            return Err(libmdbx::Error::DecodeError(
+                anyhow!("MDBX Key not 8 bytes; can't be decoded as u64").into(),
+            ));
+        }
+        let mut buf = [0u8; 8];
+        buf.copy_from_slice(&data_val);
+        Ok(MdbxU64(u64::from_ne_bytes(buf)))
+    }
+}
+
 /// UTF-8 String
 #[derive(Clone, Debug)]
 pub struct MdbxString<'txn>(pub Cow<'txn, str>);
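MdbxU64 mirrors the existing MdbxU32 helper: MDBX INTEGERKEY tables want fixed-width integer keys in the machine's native byte order, hence `to_ne_bytes`/`from_ne_bytes` rather than a portable big-endian encoding (the trade-off being that the database file is tied to the host's endianness). The round trip is easy to check:

```rust
fn main() {
    // A reinstatement timestamp (seconds since the UNIX epoch), as stored
    // in the backing_off_reinstatements table.
    let reinstate_at: u64 = 1_650_000_000;

    let bytes = reinstate_at.to_ne_bytes();
    assert_eq!(bytes.len(), 8); // fixed width, as INTEGERKEY requires
    assert_eq!(u64::from_ne_bytes(bytes), reinstate_at);
}
```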
diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs
index a406e6b..cc1bd69 100644
--- a/quickpeep_raker/src/storage/records.rs
+++ b/quickpeep_raker/src/storage/records.rs
@@ -1,4 +1,4 @@
-use crate::raking::RakeIntent;
+use crate::raking::{RakeIntent, TemporaryFailure};
 use serde::{Deserialize, Serialize};

 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -21,3 +21,16 @@ pub struct UrlVisitedRecord {
 pub struct QueueUrlRecord {
     pub intent: RakeIntent, // TODO
 }
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct BackingOffDomainRecord {
+    /// The URL that caused the backoff.
+    pub failed_url: String,
+    /// The reason that this backoff is in place
+    pub failure: TemporaryFailure,
+    /// Duration of the backoff. Used to provide increasing backoffs if the failures persist.
+    pub backoff: u32,
+    /// When the domain should be reinstated.
+    /// MUST match the timestamp present in the reinstatements table.
+    pub reinstate_at: u64,
+}
diff --git a/quickpeep_structs/src/rake_entries.rs b/quickpeep_structs/src/rake_entries.rs
index d605381..0e3d9a0 100644
--- a/quickpeep_structs/src/rake_entries.rs
+++ b/quickpeep_structs/src/rake_entries.rs
@@ -14,6 +14,8 @@ bitflags! {
         const COOKIE_NAG = 0x04;
         /// Unspecified annoyances are present on this page, according to a cosmetic filter.
         const ANNOYANCE = 0x08;
+        /// Social trackers are a subcategory of privacy...
+        const SOCIAL = 0x10;

         /// The web page was served over CloudFlare at the time of indexing, which is not in the
         /// spirit of decentralisation.
@@ -38,6 +40,8 @@ pub struct RakedReferrerEntry {
 pub struct RakedReference {
     pub target: String,
     pub kind: ReferenceKind,
+    /// Date of last modification (if known), as a QuickPeep datestamp.
+    pub last_mod: Option<u16>,
 }

 #[derive(Serialize, Deserialize, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
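Taken together with `enqueue_url` in storage.rs, the new `last_mod` field gives the raker a cheap re-crawl policy: a URL that was already visited is only re-queued when a reference claims it changed after that visit. The same decision distilled into a standalone helper (hypothetical, not part of the codebase):

```rust
/// `last_visited` and `last_mod` are QuickPeep datestamps (days, as u16).
fn should_enqueue(already_queued: bool, last_visited: Option<u16>, last_mod: Option<u16>) -> bool {
    if already_queued {
        return false; // already in the queue; nothing to do
    }
    match (last_visited, last_mod) {
        (None, _) => true,           // never visited: enqueue
        (Some(_), None) => false,    // visited, no freshness hint: skip
        (Some(v), Some(m)) => m > v, // re-visit only if modified since
    }
}

fn main() {
    assert!(should_enqueue(false, None, None));
    assert!(!should_enqueue(false, Some(19_000), None));
    assert!(should_enqueue(false, Some(19_000), Some(19_010)));
    assert!(!should_enqueue(true, None, Some(19_010)));
}
```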