From 51d5b9208bf4c4c19498c2165c19c431f982058a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 21 Mar 2022 19:16:48 +0000 Subject: [PATCH] Put URLs on hold rather than the queue if they are not allowed --- quickpeep_raker/src/raking/task.rs | 41 ++++++++++++++++++++-- quickpeep_raker/src/storage.rs | 48 ++++++++++++++++++++++++-- quickpeep_raker/src/storage/records.rs | 9 +++++ 3 files changed, 92 insertions(+), 6 deletions(-) diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index ce4beb2..c447864 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -4,7 +4,7 @@ use crate::raking::{ get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, }; -use crate::storage::records::UrlVisitedRecord; +use crate::storage::records::{AllowedDomainRecord, UrlVisitedRecord}; use crate::storage::{RakerStore, RandomActiveDomainAcquisition}; use anyhow::{anyhow, ensure, Context}; use chrono::Utc; @@ -16,7 +16,7 @@ use quickpeep_structs::rake_entries::{ }; use quickpeep_utils::dates::date_to_quickpeep_days; use reqwest::{Client, Url}; -use std::borrow::Cow; +use std::borrow::{Borrow, Cow}; use std::collections::HashSet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex as StdMutex, RwLock}; @@ -469,7 +469,42 @@ impl EventProcessor<'_> { // track all the referred-to URLs! for reference in refs.references { - txn.enqueue_url(&reference.target, reference.last_mod, reference.kind.into())?; + let ref_url = Url::parse(&reference.target)?; + let domain = get_reduced_domain(&ref_url)?; + let allowed = match txn.get_allowed_domain_record(domain.borrow())? { + None => false, + Some(AllowedDomainRecord { + restricted_prefixes, + }) => { + if restricted_prefixes.is_empty() { + true + } else { + let mut allowed = false; + for prefix in restricted_prefixes.iter() { + if ref_url.path().starts_with(prefix) { + allowed = true; + break; + } + if prefix.as_str() > ref_url.path() { + // e.g. /dog > /cat/xyz + // This means we've missed all chances to see our prefix, + // so we break here (efficiency). + break; + } + } + allowed + } + } + }; + if allowed { + txn.enqueue_url( + &reference.target, + reference.last_mod, + reference.kind.into(), + )?; + } else { + txn.put_url_on_hold(&reference.target, reference.kind.into())?; + } } txn.commit()?; diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index e8d1fc7..1b85671 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -3,8 +3,8 @@ use crate::raking::{RakeIntent, TemporaryFailure}; use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64}; use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION}; use crate::storage::records::{ - ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord, - UrlVisitedRecord, + ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord, + QueueUrlRecord, UrlVisitedRecord, }; use anyhow::{anyhow, bail, ensure, Context}; use libmdbx::{ @@ -42,7 +42,7 @@ pub struct Databases<'env> { pub visited_urls: Database<'env>, /// Domain → AllowedDomainRecord pub allowed_domains: Database<'env>, - /// Domain \n URL → Number of refs (INT VALUE) + /// Domain \n URL → OnHoldUrlRecord Number of refs (INT VALUE) pub urls_on_hold: Database<'env>, } @@ -58,6 +58,8 @@ impl<'env> Databases<'env> { ), ("backing_off_domains", &self.backing_off_domains), ("visited_urls", &self.visited_urls), + ("allowed_domains", &self.allowed_domains), + ("urls_on_hold", &self.urls_on_hold), ] .into_iter() } @@ -439,6 +441,46 @@ impl<'a> RakerTxn<'a, RW> { Ok(true) } + /// Enqueues a URL to the 'on hold' queue. + /// + /// Returns: true if it was enqueued, false if nothing changed. + pub fn put_url_on_hold(&self, url_str: &str, intent: RakeIntent) -> anyhow::Result { + let urls_on_hold = &self.mdbx.borrow_dbs().urls_on_hold; + + let url = Url::parse(url_str)?; + let url_domain = get_reduced_domain(&url)?; + + let queue_key = format!("{}\n{}", url_domain, url); + + let (record, is_new) = if let Some(mut record) = self + .mdbx_txn + .get::>(urls_on_hold, queue_key.as_bytes())? + { + // Already in the queue. Nothing to do here, except bump up the refs count. + record.0.refs = record.0.refs.saturating_add(1); + + (record, false) + } else { + ( + MdbxBare(OnHoldUrlRecord { + refs: 1, + queue_record: QueueUrlRecord { intent }, + }), + true, + ) + }; + + // Add the entry to the queue + self.mdbx_txn.put( + urls_on_hold, + queue_key.as_bytes(), + &record.as_bytes(), + WriteFlags::empty(), + )?; + + Ok(is_new) + } + pub fn put_allowed_domain_record( &self, domain: &str, diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs index 2cafdbe..91c9976 100644 --- a/quickpeep_raker/src/storage/records.rs +++ b/quickpeep_raker/src/storage/records.rs @@ -23,6 +23,15 @@ pub struct QueueUrlRecord { pub intent: RakeIntent, // TODO CONSIDER } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct OnHoldUrlRecord { + /// Record that should be emitted once this is released. + pub queue_record: QueueUrlRecord, + + /// Number of times this URL has been 'enqueued'; capped at 255. + pub refs: u8, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct BackingOffDomainRecord { /// The URL that caused the backoff.