From ab0b1e84ee25ee896b985e11057b25d8558f75b1 Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Sat, 19 Mar 2022 21:04:12 +0000
Subject: [PATCH] STASH work on Raking

---
 Cargo.lock                             |  28 ++++++
 quickpeep_raker/Cargo.toml             |   3 +
 quickpeep_raker/src/bin/qp-raker.rs    |  71 ++++++++++++++
 quickpeep_raker/src/bin/qp-seeds.rs    |   2 +
 quickpeep_raker/src/raking.rs          |  12 ++-
 quickpeep_raker/src/raking/analysis.rs |  11 +++
 quickpeep_raker/src/raking/task.rs     | 124 +++++++++++++++++++++++++
 quickpeep_raker/src/storage.rs         |  56 +++++++++--
 quickpeep_raker/src/storage/records.rs |  15 +++
 quickpeep_utils/Cargo.toml             |   4 +
 quickpeep_utils/src/dates.rs           |  23 +++++
 quickpeep_utils/src/lazy.rs            |  48 ++++++++++
 quickpeep_utils/src/lib.rs             |  50 +---------
 13 files changed, 388 insertions(+), 59 deletions(-)
 create mode 100644 quickpeep_raker/src/bin/qp-raker.rs
 create mode 100644 quickpeep_raker/src/raking/task.rs
 create mode 100644 quickpeep_utils/src/dates.rs
 create mode 100644 quickpeep_utils/src/lazy.rs

diff --git a/Cargo.lock b/Cargo.lock
index b121335..8f0f4b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2019,6 +2019,15 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "lru"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fcb87f3080f6d1d69e8c564c0fcfde1d7aa8cc451ce40cae89479111f03bc0eb"
+dependencies = [
+ "hashbrown",
+]
+
 [[package]]
 name = "mac"
 version = "0.1.1"
@@ -2599,6 +2608,18 @@ version = "2.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e8eda7c62d9ecaafdf8b62374c006de0adf61666ae96a96ba74a37134aa4e470"
 
+[[package]]
+name = "publicsuffix"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "292972edad6bbecc137ab84c5e36421a4a6c979ea31d3cc73540dd04315b33e1"
+dependencies = [
+ "byteorder",
+ "hashbrown",
+ "idna",
+ "psl-types",
+]
+
 [[package]]
 name = "quick-xml"
 version = "0.22.0"
@@ -2678,8 +2699,10 @@ dependencies = [
  "libmdbx",
  "lingua",
  "log",
+ "lru",
  "mdbx-sys",
  "ouroboros",
+ "publicsuffix",
  "quickpeep_densedoc",
  "quickpeep_moz_readability",
  "quickpeep_structs",
@@ -2706,6 +2729,11 @@
 [[package]]
 name = "quickpeep_utils"
 version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "chrono",
+ "lazy_static",
+]
 
 [[package]]
 name = "quote"
diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml
index c938406..d602227 100644
--- a/quickpeep_raker/Cargo.toml
+++ b/quickpeep_raker/Cargo.toml
@@ -44,6 +44,7 @@ log = "0.4.14"
 env_logger = "0.9.0"
 ouroboros = "0.14.2"
 rand = "0.8.5"
+lru = "0.7.3"
 
 ### Raking helpers
 # HTTP Requests
@@ -57,6 +58,8 @@ cylon = { version = "0.2.0", features = ["crawl-delay"] }
 feed-rs = "1.0.0"
 # Sitemaps
 sitemap = "0.4.1"
+# Public Suffix List handling
+publicsuffix = "2.1.1"
 
 ### Filtering helpers
 # AdBlock
diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs
new file mode 100644
index 0000000..4743d4e
--- /dev/null
+++ b/quickpeep_raker/src/bin/qp-raker.rs
@@ -0,0 +1,71 @@
+use clap::Parser;
+
+use env_logger::Env;
+
+use anyhow::{bail, Context};
+use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
+use reqwest::redirect::Policy;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::Semaphore;
+
+use quickpeep_raker::config;
+use quickpeep_raker::raking::{RAKER_USER_AGENT, TIME_LIMIT};
+use quickpeep_raker::storage::RakerStore;
+
+/// Rakes (crawls) URLs from the queue
+#[derive(Clone, Debug, Parser)]
+pub struct Opts {
+    #[clap(long = "config")]
+    config: Option<PathBuf>,
+
+    #[clap(long = "concurrency")]
+    /// How many concurrent requests to perform
+    concurrent_jobs: u32,
+
+    #[clap(long = "sleepers")]
+    /// Allow an additional number of sleeping tasks
+    /// (Waiting for a crawl delay before moving on to the next request)
+    concurrent_sleepers: u32,
+}
+
+#[tokio::main]
+pub async fn main() -> anyhow::Result<()> {
+    env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init();
+
+    let opts: Opts = Opts::parse();
+
+    let config_path = opts
+        .config
+        .unwrap_or_else(|| PathBuf::from("qp_raker.toml"));
+    let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
+
+    if !config.workbench_dir.exists() {
+        bail!(
+            "Workbench directory ({:?}) doesn't exist.",
+            config.workbench_dir
+        );
+    }
+
+    let mut header_map = HeaderMap::new();
+    header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT));
+
+    let _client = reqwest::ClientBuilder::new()
+        .pool_idle_timeout(Duration::from_secs(90))
+        .pool_max_idle_per_host(1)
+        .timeout(TIME_LIMIT)
+        .default_headers(header_map)
+        // We want to handle redirects ourselves so we can track them...
+        .redirect(Policy::none())
+        .build()?;
+
+    let _store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
+
+    let _num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
+    let _semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
+
+    eprintln!("{:#?}", config);
+
+    Ok(())
+}
diff --git a/quickpeep_raker/src/bin/qp-seeds.rs b/quickpeep_raker/src/bin/qp-seeds.rs
index 4d91051..01657e5 100644
--- a/quickpeep_raker/src/bin/qp-seeds.rs
+++ b/quickpeep_raker/src/bin/qp-seeds.rs
@@ -39,6 +39,8 @@ pub async fn main() -> anyhow::Result<()> {
         bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir);
     }
 
+    // TODO discover sitemaps at the same time as digging up robots.txt files
+
     eprintln!("{:#?}", config);
 
     Ok(())
diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs
index 9ea65c8..b08648f 100644
--- a/quickpeep_raker/src/raking.rs
+++ b/quickpeep_raker/src/raking.rs
@@ -14,7 +14,7 @@ use lazy_static::lazy_static;
 use log::debug;
 use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree};
 use quickpeep_structs::rake_entries::{AnalysisAntifeatures, RakedPageEntry, RakedReferrerEntry};
-use quickpeep_utils::Lazy;
+use quickpeep_utils::lazy::Lazy;
 use reqwest::header::HeaderMap;
 use reqwest::{Client, Response, Url};
 use serde::{Deserialize, Serialize};
@@ -25,6 +25,7 @@ use tokio::time::Instant;
 
 pub mod analysis;
 pub mod references;
+pub mod task;
 
 /// 4 MiB ought to be enough for anybody.
 pub const SIZE_LIMIT: usize = 4 * 1024 * 1024;
@@ -547,10 +548,13 @@ pub fn rake_sitemap(content: &[u8]) -> anyhow::Result<Vec<UrlRaked>> {
     Ok(urls)
 }
 
+pub fn robots_txt_url_for(url: &Url) -> anyhow::Result<Url> {
+    url.join("/robots.txt")
+        .context("Whilst resolving /robots.txt on URL")
+}
+
 pub async fn get_robots_txt_for(url: &Url, client: &Client) -> anyhow::Result<Option<RobotsTxt>> {
-    let robots_url = url
-        .join("/robots.txt")
-        .context("Whilst resolving /robots.txt on URL")?;
+    let robots_url = robots_txt_url_for(url)?;
     let resp = client.get(robots_url.clone()).send().await?;
 
     if !resp.status().is_success() {
diff --git a/quickpeep_raker/src/raking/analysis.rs b/quickpeep_raker/src/raking/analysis.rs
index 8204a3b..b564782 100644
--- a/quickpeep_raker/src/raking/analysis.rs
+++ b/quickpeep_raker/src/raking/analysis.rs
@@ -4,6 +4,8 @@ use anyhow::Context;
 use ipnetwork::IpNetwork;
 use kuchiki::NodeRef;
 use lingua::Language;
+use reqwest::Url;
+use std::borrow::Cow;
 use std::collections::{BTreeSet, HashSet};
 use std::net::IpAddr;
 use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
@@ -177,6 +179,15 @@ impl IpSet {
     }
 }
 
+pub fn get_reduced_domain(url: &Url) -> anyhow::Result<Cow<'_, str>> {
+    let domain = url.domain().context("URLs must have domains")?;
+
+    Ok(Cow::Borrowed(match domain.strip_prefix("www.") {
+        Some(stripped) => stripped,
+        None => domain,
+    }))
+}
+
 #[cfg(test)]
 mod test {
     use crate::raking::analysis::IpSet;
diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs
new file mode 100644
index 0000000..f81e45f
--- /dev/null
+++ b/quickpeep_raker/src/raking/task.rs
@@ -0,0 +1,124 @@
+use crate::raking::{get_robots_txt_for, RakeOutcome, Raker, RobotsTxt, UrlRaked};
+use crate::storage::RakerStore;
+use cylon::Cylon;
+use lru::LruCache;
+use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
+use reqwest::{Client, Url};
+use std::collections::HashSet;
+use std::sync::{Arc, Mutex as StdMutex, RwLock};
+use std::time::Duration;
+use tokio::sync::mpsc::Sender;
+use tokio::sync::Semaphore;
+
+/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
+/// queue and get turned into a backoff.
+/// (This is necessary because some crawl delays can easily be hours or days.)
+pub const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(61);
+
+/// Most sites request a crawl delay of 10 sec or less.
+/// If unspecified, let's go with a reasonable-sounding number of 15 secs.
+pub const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(15);
+
+enum NextAction {
+    Continue,
+    ChangeDomain,
+}
+
+#[derive(Clone)]
+pub struct TaskResultSubmission {
+    pages: Sender<(Url, RakedPageEntry)>,
+    references: Sender<(Url, RakedReferrerEntry)>,
+}
+
+#[derive(Clone)]
+pub struct TaskContext {
+    /// The backing database store
+    store: RakerStore,
+
+    /// HTTP client
+    client: Client,
+
+    /// The raker
+    raker: Arc<Raker>,
+
+    /// Busy domains (that are being processed by other tasks)
+    busy_domains: Arc<StdMutex<HashSet<String>>>,
+
+    /// Cache of robots.txt entries for recently-made dormant sites
+    robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
+
+    /// Semaphore that gives permits to make HTTP requests
+    semaphore: Arc<Semaphore>,
+
+    submission: TaskResultSubmission,
+}
+
+impl TaskContext {
+    pub async fn run(self) -> anyhow::Result<()> {
+        // Get a domain to process
+        todo!();
+
+        Ok(())
+    }
+
+    pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
+        let robots = get_robots_txt_for(url_of_site, &self.client).await?;
+        Ok(robots.map(|robots: RobotsTxt| robots.rules))
+    }
+
+    pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> {
+        let mut current_robot_rules_url: Option<Url> = None;
+        let mut current_robot_rules: Option<Cylon> = None;
+
+        loop {
+            // Get a URL to process
+            let url = {
+                let txn = self.store.ro_txn()?;
+                txn.choose_url_for_domain(&domain)?
+            };
+
+            let url = todo!();
+            let intent = todo!();
+
+            // Check our robot rules are valid for that URL
+            todo!();
+
+            let permit = self.semaphore.acquire().await?;
+
+            // TODO process errors
+            match self.raker.rake(url, intent, &self.client).await {
+                Ok(rake_outcome) => match self.process_outcome(&url, rake_outcome).await? {
+                    NextAction::Continue => {
+                        todo!()
+                    }
+                    NextAction::ChangeDomain => {
+                        todo!()
+                    }
+                },
+                Err(_) => {
+                    todo!();
+                }
+            }
+
+            drop(permit);
+        }
+
+        Ok(())
+    }
+
+    /// Processes the outcome of raking a URL.
+    async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
+        match outcome {
+            RakeOutcome::RakedPage(page) => {}
+            RakeOutcome::RakedFeed(feed) => {}
+            RakeOutcome::RakedSitemap(sitemap) => {}
+            RakeOutcome::Redirect { reason, new_url } => {}
+            RakeOutcome::TemporaryFailure(failure) => {}
+            RakeOutcome::PermanentFailure(failure) => {}
+        }
+        todo!()
+    }
+
+    async fn process_feed_or_sitemap(&self, feed: &Vec<UrlRaked>) -> anyhow::Result<()> {
+        todo!()
+    }
+}
diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs
index abe36f3..48c0a72 100644
--- a/quickpeep_raker/src/storage.rs
+++ b/quickpeep_raker/src/storage.rs
@@ -1,10 +1,10 @@
 use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32};
 use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
-use crate::storage::records::ActiveDomainRecord;
-use anyhow::{bail, ensure};
+use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord};
+use anyhow::{bail, ensure, Context};
 use libmdbx::{
-    Database, DatabaseFlags, Environment, Transaction, TransactionKind, WriteFlags, WriteMap, RO,
-    RW,
+    Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
+    WriteFlags, WriteMap, RO, RW,
 };
 use log::info;
 use ouroboros::self_referencing;
@@ -18,7 +18,7 @@ mod records;
 
 /// The databases available in an environment.
 pub struct Databases<'env> {
-    /// URL → QueueUrlRecord
+    /// Domain \n URL → QueueUrlRecord
     pub queue_urls: Database<'env>,
     /// Domain → ActiveDomainRecord
     pub active_domains: Database<'env>,
@@ -53,6 +53,8 @@ pub struct RakerDb {
     pub dbs: Databases<'this>,
 }
 
+/// Handle to the store.
+/// Doesn't need wrapping in an Arc, because it already contains one.
 #[derive(Clone)]
 pub struct RakerStore {
     pub mdbx: Arc<RakerDb>,
 }
@@ -60,7 +62,13 @@
 
 impl RakerStore {
     pub fn open(path: &Path) -> anyhow::Result<Self> {
-        let env = Environment::new().set_max_dbs(256).open(path)?;
+        let mut flags = EnvironmentFlags::default();
+        flags.no_sub_dir = true;
+
+        let env = Environment::new()
+            .set_max_dbs(256)
+            .set_flags(flags)
+            .open(path)?;
 
         let mdbx: RakerDb = RakerDbBuilder {
             env,
@@ -243,7 +251,8 @@ impl<'a> RakerTxn<'a, RW> {
     }
 }
 
-impl<'a> RakerTxn<'a, RO> {
+/// Read-only implementations (but can also be used on RW transactions)
+impl<'a, K: TransactionKind> RakerTxn<'a, K> {
     pub fn choose_random_active_domain(
         &self,
     ) -> anyhow::Result<Option<(String, ActiveDomainRecord)>> {
@@ -288,4 +297,37 @@
 
         Ok(Some((domain.into_string(), record)))
     }
+
+    pub fn choose_url_for_domain(
+        &self,
+        domain: &str,
+    ) -> anyhow::Result<Option<(String, QueueUrlRecord)>> {
+        let queue: &Database = &self.mdbx.borrow_dbs().queue_urls;
+
+        let mut cur = self.mdbx_txn.cursor(queue)?;
+        match cur
+            .iter_from::<MdbxString, MdbxBare<QueueUrlRecord>>(
+                MdbxString(Cow::Owned(format!("{}\n", domain))).as_bytes(),
+            )
+            .next()
+        {
+            Some(entry) => {
+                let (k, MdbxBare(record)) = entry?;
+                let domain_followed_by_url = k.0.as_ref();
+                let mut split = domain_followed_by_url.split("\n");
+                let actual_domain = split.next().context("No domain")?;
+                let url = split.next().context("No URL")?;
+
+                if domain != actual_domain {
+                    // This means we've run out of URLs for the domain in question.
+                    return Ok(None);
+                }
+
+                ensure!(split.next().is_none(), "Should be no more splits.");
+
+                Ok(Some((url.to_owned(), record)))
+            }
+            None => Ok(None),
+        }
+    }
 }
diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs
index 3d7980b..b748307 100644
--- a/quickpeep_raker/src/storage/records.rs
+++ b/quickpeep_raker/src/storage/records.rs
@@ -5,3 +5,18 @@ pub struct ActiveDomainRecord {
     /// The raffle ticket number owned by this domain.
     pub raffle_ticket: u32,
 }
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct VisitedDomainRecord {
+    /// Number of days since the QuickPeep Epoch that this page was last raked at.
+    /// We store whole days, rather than a timestamp, so that this fits in a u16.
+    /// A u16's worth of days gives us about 179 years of range, which is plenty:
+    /// sitemaps and feeds usually only tell you the date the page was last updated,
+    /// so we wouldn't gain much from a finer resolution anyway.
+    pub last_visited_days: u16,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct QueueUrlRecord {
+    // TODO
+}
diff --git a/quickpeep_utils/Cargo.toml b/quickpeep_utils/Cargo.toml
index b980a89..197992c 100644
--- a/quickpeep_utils/Cargo.toml
+++ b/quickpeep_utils/Cargo.toml
@@ -6,3 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+chrono = "0.4.19"
+lazy_static = "1.4.0"
+anyhow = "1.0.56"
+
diff --git a/quickpeep_utils/src/dates.rs b/quickpeep_utils/src/dates.rs
new file mode 100644
index 0000000..6c01bff
--- /dev/null
+++ b/quickpeep_utils/src/dates.rs
@@ -0,0 +1,23 @@
+use anyhow::Context;
+use chrono::{Date, Duration, TimeZone, Utc};
+use lazy_static::lazy_static;
+
+lazy_static! {
+    /// The QuickPeep Epoch is 2022-01-01, as this gives us 52 years of extra headroom compared to the
+    /// Unix one. QuickPeep didn't exist before 2022, so we needn't worry about negative dates!
+    pub static ref QUICKPEEP_EPOCH: Date<Utc> = Utc.ymd(2022, 1, 1);
+}
+
+pub fn date_from_quickpeep_days(days: u16) -> Date<Utc> {
+    let dt = QUICKPEEP_EPOCH.and_hms(0, 0, 0);
+    (dt + Duration::days(days as i64)).date()
+}
+
+pub fn date_to_quickpeep_days(date: &Date<Utc>) -> anyhow::Result<u16> {
+    let dt = date.and_hms(0, 0, 0);
+    let duration = dt - QUICKPEEP_EPOCH.and_hms(0, 0, 0);
+    duration
+        .num_days()
+        .try_into()
+        .context("Failed to convert date to QuickPeep datestamp")
+}
diff --git a/quickpeep_utils/src/lazy.rs b/quickpeep_utils/src/lazy.rs
new file mode 100644
index 0000000..750d8f4
--- /dev/null
+++ b/quickpeep_utils/src/lazy.rs
@@ -0,0 +1,48 @@
+use std::cell::RefCell;
+use std::ops::Deref;
+
+enum LazyInner<'a, T> {
+    Uncomputed(Option<Box<dyn FnOnce() -> T + 'a>>),
+    Computed(T),
+}
+
+pub struct Lazy<'a, T> {
+    inner: RefCell<LazyInner<'a, T>>,
+}
+impl<'a, T> Lazy<'a, T> {
+    pub fn new(func: Box<dyn FnOnce() -> T + 'a>) -> Lazy<'a, T> {
+        Lazy {
+            inner: RefCell::new(LazyInner::Uncomputed(Some(func))),
+        }
+    }
+}
+
+impl<'a, T: 'a> Deref for Lazy<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        unsafe fn extend_lifetime<'a, 'b, A>(a: &'a A) -> &'b A {
+            std::mem::transmute(a)
+        }
+
+        let mut inner_mut = self.inner.borrow_mut();
+        if let LazyInner::Uncomputed(func) = &mut *inner_mut {
+            if let Some(func) = func.take() {
+                *inner_mut = LazyInner::Computed(func());
+            } else {
+                panic!("Unreachable: uncomputed but no function to compute with")
+            }
+        }
+
+        match &*inner_mut {
+            LazyInner::Computed(computed) => unsafe {
+                // Extending the lifetime *should* be safe because we don't ever overwrite
+                // a computed value...
+                extend_lifetime(computed)
+            },
+            LazyInner::Uncomputed(_) => {
+                panic!("Unreachable: Should have been computed");
+            }
+        }
+    }
+}
diff --git a/quickpeep_utils/src/lib.rs b/quickpeep_utils/src/lib.rs
index 750d8f4..6bcb8ed 100644
--- a/quickpeep_utils/src/lib.rs
+++ b/quickpeep_utils/src/lib.rs
@@ -1,48 +1,2 @@
-use std::cell::RefCell;
-use std::ops::Deref;
-
-enum LazyInner<'a, T> {
-    Uncomputed(Option<Box<dyn FnOnce() -> T + 'a>>),
-    Computed(T),
-}
-
-pub struct Lazy<'a, T> {
-    inner: RefCell<LazyInner<'a, T>>,
-}
-impl<'a, T> Lazy<'a, T> {
-    pub fn new(func: Box<dyn FnOnce() -> T + 'a>) -> Lazy<'a, T> {
-        Lazy {
-            inner: RefCell::new(LazyInner::Uncomputed(Some(func))),
-        }
-    }
-}
-
-impl<'a, T: 'a> Deref for Lazy<'a, T> {
-    type Target = T;
-
-    fn deref(&self) -> &Self::Target {
-        unsafe fn extend_lifetime<'a, 'b, A>(a: &'a A) -> &'b A {
-            std::mem::transmute(a)
-        }
-
-        let mut inner_mut = self.inner.borrow_mut();
-        if let LazyInner::Uncomputed(func) = &mut *inner_mut {
-            if let Some(func) = func.take() {
-                *inner_mut = LazyInner::Computed(func());
-            } else {
-                panic!("Unreachable: uncomputed but no function to compute with")
-            }
-        }
-
-        match &*inner_mut {
-            LazyInner::Computed(computed) => unsafe {
-                // Extending the lifetime *should* be safe because we don't ever overwrite
-                // a computed value...
-                extend_lifetime(computed)
-            },
-            LazyInner::Uncomputed(_) => {
-                panic!("Unreachable: Should have been computed");
-            }
-        }
-    }
-}
+pub mod dates;
+pub mod lazy;
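
Some usage sketches of the utility pieces introduced above follow. These are
illustrative only, not part of the patch: they assume the module paths shown in
the diff, chrono 0.4's Date<Utc> API, and anyhow for error handling.

The day-stamp helpers in quickpeep_utils/src/dates.rs round-trip a date through
the u16 day count:

    use chrono::{TimeZone, Utc};
    use quickpeep_utils::dates::{date_from_quickpeep_days, date_to_quickpeep_days};

    fn main() -> anyhow::Result<()> {
        // 2022-03-19 is 77 days after the QuickPeep Epoch (2022-01-01):
        // the remaining 30 days of January, 28 of February, then 19 of March.
        let date = Utc.ymd(2022, 3, 19);
        let days = date_to_quickpeep_days(&date)?;
        assert_eq!(days, 77);
        assert_eq!(date_from_quickpeep_days(days), date);
        Ok(())
    }

Lazy (now in quickpeep_utils/src/lazy.rs) defers a closure until the first
deref and caches the result; the 'a lifetime is what lets the closure borrow
locals:

    use quickpeep_utils::lazy::Lazy;

    fn main() {
        let text = String::from("hello world");
        // Nothing runs yet; the closure fires on the first deref below.
        let lazy = Lazy::new(Box::new(|| text.to_uppercase()));
        assert_eq!(&*lazy, "HELLO WORLD");
        // Later derefs reuse the cached value rather than recomputing it.
        assert_eq!(lazy.len(), 11);
    }

get_reduced_domain (raking/analysis.rs) folds "www." hosts into the bare
domain, so both spellings of a site collapse to one value wherever the reduced
domain is used as a key:

    use quickpeep_raker::raking::analysis::get_reduced_domain;
    use reqwest::Url;

    fn main() -> anyhow::Result<()> {
        let url = Url::parse("https://www.example.org/blog/post")?;
        assert_eq!(get_reduced_domain(&url)?.as_ref(), "example.org");
        Ok(())
    }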