diff --git a/Cargo.lock b/Cargo.lock index 5ea8e37..ee06922 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -594,6 +594,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "diplomatic-bag" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6448adcf5616f6dd9627e545f447cf13dc5fda312f938364240d8feb84b0d12b" +dependencies = [ + "crossbeam-channel", + "once_cell", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -2687,6 +2697,7 @@ dependencies = [ "clap", "colour", "cylon", + "diplomatic-bag", "env_logger", "feed-rs", "futures-util", diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml index d602227..c3f9a5d 100644 --- a/quickpeep_raker/Cargo.toml +++ b/quickpeep_raker/Cargo.toml @@ -45,6 +45,7 @@ env_logger = "0.9.0" ouroboros = "0.14.2" rand = "0.8.5" lru = "0.7.3" +diplomatic-bag = "0.2.0" ### Raking helpers # HTTP Requests diff --git a/quickpeep_raker/src/bin/qp-rake1.rs b/quickpeep_raker/src/bin/qp-rake1.rs index 8e26d9c..2531dc4 100644 --- a/quickpeep_raker/src/bin/qp-rake1.rs +++ b/quickpeep_raker/src/bin/qp-rake1.rs @@ -4,7 +4,8 @@ use clap::Parser; use colour::{blue_ln, green_ln, red_ln, yellow_ln}; use env_logger::Env; use log::warn; -use quickpeep_raker::raking::analysis::{load_adblock_engine, IpSet}; +use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet}; +use quickpeep_raker::raking::page_extraction::PageExtractionService; use quickpeep_raker::raking::references::references_from_urlrakes; use quickpeep_raker::raking::{RakeIntent, RakeOutcome}; use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT}; @@ -57,7 +58,7 @@ pub async fn main() -> anyhow::Result<()> { let file = File::open(&path).await?; adblock_engines.push(( *antifeature, - load_adblock_engine(file, RuleTypes::All).await?, + preload_adblock_engine(file, RuleTypes::All).await?, )); } @@ -69,8 +70,8 @@ pub async fn main() -> anyhow::Result<()> { antifeature_ip_set.add_all_from_file(ips_file).await?; let raker = Raker { - adblock_engines, antifeature_ip_set, + page_extraction: PageExtractionService::new(adblock_engines)?, }; let outcome = raker.rake(&opts.url, RakeIntent::Any, &client).await?; diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index f059378..d88a56d 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -3,18 +3,19 @@ use clap::Parser; use env_logger::Env; use anyhow::{bail, Context}; +use log::error; +use lru::LruCache; use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; use reqwest::redirect::Policy; use std::path::PathBuf; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; -use log::error; -use lru::LruCache; use tokio::sync::{mpsc, Semaphore}; use quickpeep_raker::config; -use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT}; +use quickpeep_raker::raking::page_extraction::PageExtractionService; use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission}; +use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT}; use quickpeep_raker::storage::RakerStore; /// Seeds a raker's queue with URLs @@ -66,8 +67,8 @@ pub async fn main() -> anyhow::Result<()> { let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?; let raker = Raker { - adblock_engines: vec![], - antifeature_ip_set: todo!() + antifeature_ip_set: todo!(), + page_extraction: PageExtractionService::new(vec![])?, // TODO }; let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers; @@ -78,7 +79,7 @@ pub async fn main() -> anyhow::Result<()> { let submission = TaskResultSubmission { pages: pages_tx, - references: refs_tx + references: refs_tx, }; let task_context = TaskContext { @@ -88,7 +89,7 @@ pub async fn main() -> anyhow::Result<()> { busy_domains: Arc::new(Mutex::new(Default::default())), robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))), semaphore, - submission + submission, }; let mut tasks = Vec::with_capacity(num_tasks as usize); @@ -101,8 +102,6 @@ pub async fn main() -> anyhow::Result<()> { error!("Raker task {:?} encountered an error: {:?}", task_num, err); } })); - - // TODO spawn task } drop(task_context); diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs index d94773f..3f2c954 100644 --- a/quickpeep_raker/src/raking.rs +++ b/quickpeep_raker/src/raking.rs @@ -1,31 +1,24 @@ -use crate::raking::analysis::{ - analyse_with_ad_block_cosmetic_filter, guess_document_language, IpSet, -}; -use adblock::engine::Engine; +use crate::raking::analysis::IpSet; +use crate::raking::page_extraction::{ExtractedPage, PageExtractionService}; use anyhow::{bail, Context}; use chrono::{DateTime, FixedOffset, Utc}; use cylon::Cylon; use futures_util::stream::StreamExt; use html5ever::tendril::fmt::Slice; use itertools::Itertools; -use kuchiki::traits::TendrilSink; -use kuchiki::NodeRef; use lazy_static::lazy_static; use log::debug; -use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree}; -use quickpeep_structs::rake_entries::{AnalysisAntifeatures, RakedPageEntry, RakedReferrerEntry}; -use quickpeep_utils::lazy::Lazy; +use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry}; use reqwest::header::HeaderMap; use reqwest::{Client, Response, Url}; use serde::{Deserialize, Serialize}; use sitemap::reader::SiteMapEntity; use std::collections::HashSet; use std::time::Duration; -use tokio::sync::Mutex; use tokio::time::Instant; -use quickpeep_utils::threadguard::ThreadGuard; pub mod analysis; +pub mod page_extraction; pub mod references; pub mod task; @@ -92,6 +85,7 @@ pub struct PermanentFailure { pub enum TemporaryFailureReason { MissingInformation(String), ServerError(u16), + UnknownClientError(String), } #[derive(Debug)] @@ -168,8 +162,8 @@ async fn response_to_bytes_limited( } pub struct Raker { - pub adblock_engines: Vec<(AnalysisAntifeatures, ThreadGuard)>, pub antifeature_ip_set: IpSet, + pub page_extraction: PageExtractionService, } impl Raker { @@ -249,7 +243,8 @@ impl Raker { { // We don't try any fallbacks for an HTML page return Ok(self - .rake_html_page(&content, url, is_cf, &headers).await + .rake_html_page(&content, url, is_cf, &headers) + .await .context("Raking HTML page")?); } @@ -291,172 +286,31 @@ impl Raker { is_cf: bool, headers: &HeaderMap, ) -> anyhow::Result { - let content_str = std::str::from_utf8(content)?; + let content_str = std::str::from_utf8(content)?.to_owned(); - let root_node: NodeRef = kuchiki::parse_html().one(content_str); - - // See whether this page is at the canonical URL for the page. - // If it's not, then we redirect the raker to the canonical URL. - if let Ok(canonical_link_node) = root_node.select_first("head link[rel=canonical]") { - if let Some(canonical_href) = canonical_link_node.attributes.borrow().get("href") { - let canonical_url = url - .join(canonical_href) - .context("Failed to resolve or parse canonical URL")?; - - if &canonical_url != url { - return Ok(RakeOutcome::Redirect { - reason: RedirectReason::NotCanonical, - new_url: canonical_url, - }); - } - } - } - - // Try and dig up the page's language. - // First try since this is the modern way, and potentially the most trustworthy... - let mut language = None; - - if let Ok(html_node) = root_node.select_first("html") { - if let Some(lang) = html_node.attributes.borrow().get("lang") { - language = Some(lang.trim().to_string()); - } - } - - if language.is_none() { - // Next fallback: prefer the content-language header baked into the page itself - if let Ok(meta_node) = root_node.select_first("meta[http-equiv=content-language]") { - if let Some(lang) = meta_node.attributes.borrow().get("content") { - language = Some(lang.trim().to_string()); - } - } - } - - if language.is_none() { - // Next fallback: prefer the content-language received as a header - if let Some(lang) = headers.get("content-language") { - language = Some(lang.to_str()?.to_owned()); - } - } - - let mut antifeature_flags = AnalysisAntifeatures::empty(); - - if is_cf { - antifeature_flags |= AnalysisAntifeatures::CLOUDFLARE; - } - - for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines { - adblock_engine.apply(Box::new(|adblock_engine| { - match analyse_with_ad_block_cosmetic_filter( - &root_node, - &adblock_engine, - url.as_str(), - true, - ) { - Ok(cosmetic_filters_tripped) => { - eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped); - antifeature_flags |= *engine_antifeature_flag; - } - Err(err) => { - eprintln!("Cosmetic Filter Err {:?}", err); - } - }; - Ok(()) - })); - } - - let dense_doc = DenseTree::from_body(root_node.clone()); - let dense_doc_text = Lazy::new(Box::new(|| DenseTree::generate_textual_format(&dense_doc))); - //eprintln!("^^^^^\n{}\n^^^^^", *dense_doc_text); - - if language.is_none() { - // Final fallback: guess the language - language = guess_document_language(&*dense_doc_text); - } - - // Try and enforce some consistency in the language code; - // we want codes like en_US rather than en-us. - if let Some(language) = language.as_mut() { - normalise_language(language); - } - - let mut title = "".to_owned(); - - if let Ok(title_node) = root_node.select_first("head title") { - title = title_node.text_contents(); - } - - let mut feeds = Vec::new(); - let mut icon = None; - - for link_node in root_node.select("head link").into_iter().flatten() { - if let Some(rel) = link_node.attributes.borrow().get("rel") { - let rels = rel.split_whitespace().collect_vec(); - if rels.contains(&"icon") { - // This is an icon - if let Some(href) = link_node.attributes.borrow().get("href") { - let icon_url = url - .join(href) - .context("Failed to resolve or parse canonical URL to icon")?; - - icon = Some(icon_url); - } - } else if rels.contains(&"alternate") { - if let Some(rel_type) = link_node.attributes.borrow().get("type") { - if FEED_LINK_MIME_TYPES.contains(rel_type) { - if let Some(href) = link_node.attributes.borrow().get("href") { - let feed_url = url - .join(href) - .context("Failed to resolve or parse canonical URL to feed")?; - - feeds.push(feed_url); - } - } - } - } - } - } - - let mut readability = - quickpeep_moz_readability::Readability::new_from_node(root_node.clone()); - if let Err(err) = readability.parse(url.as_str()) { - debug!("Failed to analyse readability: {:?}", err); - } - - eprintln!("{:#?}", readability.metadata); - - if title.is_empty() && !readability.metadata.title().is_empty() { - // Fall back to the readability-derived page title - title = readability.metadata.title().to_owned(); - } - - let mut document = DenseDocument { - head: DenseHead { - title, - language: language.unwrap_or(String::with_capacity(0)), - icon: icon - .map(|url| url.as_str().to_owned()) - .unwrap_or(String::with_capacity(0)), - }, - body_content: Vec::with_capacity(0), - body_remainder: Vec::with_capacity(0), - }; - - if let Some(article_node) = readability.article_node { - document.body_remainder = DenseTree::from_body(root_node.clone()); - document.body_content = DenseTree::from_body(article_node); - } - - let bare_size = serde_bare::to_vec(&dense_doc)?.len(); - eprintln!("CS {:?} → {:?}", content.len(), bare_size); - - let references = references::find_references(&document, &feeds, url); - Ok(RakeOutcome::RakedPage(RakedPage { - page_entry: RakedPageEntry { - analysed_antifeatures: antifeature_flags, + match self + .page_extraction + .extract(content_str, url.clone(), headers.clone(), is_cf) + .await? + { + ExtractedPage::Success { document, - }, - referrer_entry: RakedReferrerEntry { references }, - })) + feeds, + antifeature_flags, + } => { + let references = references::find_references(&document, &feeds, url); + Ok(RakeOutcome::RakedPage(RakedPage { + page_entry: RakedPageEntry { + analysed_antifeatures: antifeature_flags, + document, + }, + referrer_entry: RakedReferrerEntry { references }, + })) + } + ExtractedPage::Redirect { reason, new_url } => { + Ok(RakeOutcome::Redirect { reason, new_url }) + } + } } } diff --git a/quickpeep_raker/src/raking/analysis.rs b/quickpeep_raker/src/raking/analysis.rs index b564782..eb4a61e 100644 --- a/quickpeep_raker/src/raking/analysis.rs +++ b/quickpeep_raker/src/raking/analysis.rs @@ -10,10 +10,30 @@ use std::collections::{BTreeSet, HashSet}; use std::net::IpAddr; use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; -pub async fn load_adblock_engine( +pub struct PreloadedEngine { + rules: Vec, + rule_types: RuleTypes, +} + +impl PreloadedEngine { + pub fn build(&self) -> Engine { + Engine::from_rules( + &self.rules, + ParseOptions { + format: Default::default(), + include_redirect_urls: false, + rule_types: self.rule_types, + }, + ) + } +} + +/// 'Pre-load' the adblock engine. +/// This doesn't construct the adblock engine yet because the adblock engine is not `Send`. +pub async fn preload_adblock_engine( reader: R, rule_types: RuleTypes, -) -> anyhow::Result { +) -> anyhow::Result { let mut br = BufReader::new(reader); let mut rules = Vec::new(); let mut buf = String::new(); @@ -24,14 +44,7 @@ pub async fn load_adblock_engine( } rules.push(buf.trim().to_owned()); } - Ok(Engine::from_rules( - &rules, - ParseOptions { - format: Default::default(), - include_redirect_urls: false, - rule_types, - }, - )) + Ok(PreloadedEngine { rules, rule_types }) } // Relevant: diff --git a/quickpeep_raker/src/raking/page_extraction.rs b/quickpeep_raker/src/raking/page_extraction.rs new file mode 100644 index 0000000..71ce17b --- /dev/null +++ b/quickpeep_raker/src/raking/page_extraction.rs @@ -0,0 +1,285 @@ +use crate::raking::analysis::{ + analyse_with_ad_block_cosmetic_filter, guess_document_language, PreloadedEngine, +}; +use crate::raking::{normalise_language, RedirectReason, FEED_LINK_MIME_TYPES}; +use adblock::engine::Engine; +use anyhow::{bail, Context}; +use html5ever::tendril::TendrilSink; +use itertools::Itertools; +use kuchiki::NodeRef; +use log::{debug, warn}; +use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree}; +use quickpeep_structs::rake_entries::AnalysisAntifeatures; +use quickpeep_utils::lazy::Lazy; +use reqwest::header::HeaderMap; +use reqwest::Url; +use tokio::runtime; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::LocalSet; + +/// As page extraction requires dealing with things are are not `Send`, we send those tasks off to +/// another thread. +pub struct PageExtractionService { + submission: Sender, +} + +pub struct ExtractionTask { + content: String, + url: Url, + headers: HeaderMap, + is_cf: bool, + response: oneshot::Sender>, +} + +impl PageExtractionService { + pub async fn extract( + &self, + content: String, + url: Url, + headers: HeaderMap, + is_cf: bool, + ) -> anyhow::Result { + let (resp_tx, resp_rx) = oneshot::channel(); + if let Err(_) = self + .submission + .send(ExtractionTask { + content, + url, + headers, + is_cf, + response: resp_tx, + }) + .await + { + bail!("Page extraction service has shut down"); + } + resp_rx.await? + } +} + +impl PageExtractionService { + pub fn new( + adblock_engines: Vec<(AnalysisAntifeatures, PreloadedEngine)>, + ) -> anyhow::Result { + let (submission, mut recv) = mpsc::channel::(16); + + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + std::thread::spawn(move || { + let internal = PageExtractionServiceInternal { + adblock_engines: adblock_engines + .into_iter() + .map(|(aa, engine)| (aa, engine.build())) + .collect(), + }; + + let local = LocalSet::new(); + + local.spawn_local(async move { + while let Some(new_task) = recv.recv().await { + let result = internal.extract_page( + new_task.content, + new_task.url, + new_task.headers, + new_task.is_cf, + ); + if let Err(_) = new_task.response.send(result) { + warn!("Requester went away before could send extracted page"); + } + } + }); + + // This will return once all senders are dropped and all spawned tasks have returned. + rt.block_on(local); + }); + + Ok(PageExtractionService { submission }) + } +} + +struct PageExtractionServiceInternal { + adblock_engines: Vec<(AnalysisAntifeatures, Engine)>, +} + +impl PageExtractionServiceInternal { + fn extract_page( + &self, + content_str: String, + url: Url, + headers: HeaderMap, + is_cf: bool, + ) -> anyhow::Result { + let root_node: NodeRef = kuchiki::parse_html().one(content_str.as_ref()); + + // See whether this page is at the canonical URL for the page. + // If it's not, then we redirect the raker to the canonical URL. + if let Ok(canonical_link_node) = root_node.select_first("head link[rel=canonical]") { + if let Some(canonical_href) = canonical_link_node.attributes.borrow().get("href") { + let canonical_url = url + .join(canonical_href) + .context("Failed to resolve or parse canonical URL")?; + + if &canonical_url != &url { + return Ok(ExtractedPage::Redirect { + reason: RedirectReason::NotCanonical, + new_url: canonical_url, + }); + } + } + } + + // Try and dig up the page's language. + // First try since this is the modern way, and potentially the most trustworthy... + let mut language = None; + + if let Ok(html_node) = root_node.select_first("html") { + if let Some(lang) = html_node.attributes.borrow().get("lang") { + language = Some(lang.trim().to_string()); + } + } + + if language.is_none() { + // Next fallback: prefer the content-language header baked into the page itself + if let Ok(meta_node) = root_node.select_first("meta[http-equiv=content-language]") { + if let Some(lang) = meta_node.attributes.borrow().get("content") { + language = Some(lang.trim().to_string()); + } + } + } + + if language.is_none() { + // Next fallback: prefer the content-language received as a header + if let Some(lang) = headers.get("content-language") { + language = Some(lang.to_str()?.to_owned()); + } + } + + let mut antifeature_flags = AnalysisAntifeatures::empty(); + + if is_cf { + antifeature_flags |= AnalysisAntifeatures::CLOUDFLARE; + } + + for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines { + match analyse_with_ad_block_cosmetic_filter( + &root_node, + &adblock_engine, + url.as_str(), + true, + ) { + Ok(cosmetic_filters_tripped) => { + eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped); + antifeature_flags |= *engine_antifeature_flag; + } + Err(err) => { + eprintln!("Cosmetic Filter Err {:?}", err); + } + }; + } + + let dense_doc = DenseTree::from_body(root_node.clone()); + let dense_doc_text = Lazy::new(Box::new(|| DenseTree::generate_textual_format(&dense_doc))); + //eprintln!("^^^^^\n{}\n^^^^^", *dense_doc_text); + + if language.is_none() { + // Final fallback: guess the language + language = guess_document_language(&*dense_doc_text); + } + + // Try and enforce some consistency in the language code; + // we want codes like en_US rather than en-us. + if let Some(language) = language.as_mut() { + normalise_language(language); + } + + let mut title = "".to_owned(); + + if let Ok(title_node) = root_node.select_first("head title") { + title = title_node.text_contents(); + } + + let mut feeds = Vec::new(); + let mut icon = None; + + for link_node in root_node.select("head link").into_iter().flatten() { + if let Some(rel) = link_node.attributes.borrow().get("rel") { + let rels = rel.split_whitespace().collect_vec(); + if rels.contains(&"icon") { + // This is an icon + if let Some(href) = link_node.attributes.borrow().get("href") { + let icon_url = url + .join(href) + .context("Failed to resolve or parse canonical URL to icon")?; + + icon = Some(icon_url); + } + } else if rels.contains(&"alternate") { + if let Some(rel_type) = link_node.attributes.borrow().get("type") { + if FEED_LINK_MIME_TYPES.contains(rel_type) { + if let Some(href) = link_node.attributes.borrow().get("href") { + let feed_url = url + .join(href) + .context("Failed to resolve or parse canonical URL to feed")?; + + feeds.push(feed_url); + } + } + } + } + } + } + + let mut readability = + quickpeep_moz_readability::Readability::new_from_node(root_node.clone()); + if let Err(err) = readability.parse(url.as_str()) { + debug!("Failed to analyse readability: {:?}", err); + } + + eprintln!("{:#?}", readability.metadata); + + if title.is_empty() && !readability.metadata.title().is_empty() { + // Fall back to the readability-derived page title + title = readability.metadata.title().to_owned(); + } + + let mut document = DenseDocument { + head: DenseHead { + title, + language: language.unwrap_or(String::with_capacity(0)), + icon: icon + .map(|url| url.as_str().to_owned()) + .unwrap_or(String::with_capacity(0)), + }, + body_content: Vec::with_capacity(0), + body_remainder: Vec::with_capacity(0), + }; + + if let Some(article_node) = readability.article_node { + document.body_remainder = DenseTree::from_body(root_node.clone()); + document.body_content = DenseTree::from_body(article_node); + } else { + todo!() + } + + Ok(ExtractedPage::Success { + document, + feeds, + antifeature_flags, + }) + } +} + +pub enum ExtractedPage { + Success { + document: DenseDocument, + feeds: Vec, + antifeature_flags: AnalysisAntifeatures, + }, + Redirect { + reason: RedirectReason, + new_url: Url, + }, +} diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index 25b3a2f..1622145 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -1,14 +1,22 @@ -use crate::raking::{get_robots_txt_for, RakeOutcome, Raker, RobotsTxt, UrlRaked}; +use crate::raking::{ + get_robots_txt_for, robots_txt_url_for, RakeOutcome, Raker, RobotsTxt, TemporaryFailure, + TemporaryFailureReason, UrlRaked, +}; +use crate::storage::records::UrlVisitedRecord; use crate::storage::RakerStore; +use chrono::Utc; use cylon::Cylon; +use log::warn; use lru::LruCache; use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry}; +use quickpeep_utils::dates::date_to_quickpeep_days; use reqwest::{Client, Url}; use std::collections::HashSet; use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::time::Duration; use tokio::sync::mpsc::Sender; use tokio::sync::Semaphore; +use tokio::time::Instant; /// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the /// queue and get turned into a backoff. @@ -68,6 +76,7 @@ impl TaskContext { pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> { let mut current_robot_rules_url: Option = None; let mut current_robot_rules: Option = None; + let mut wait_until: Option = None; loop { // Get a URL to process @@ -76,30 +85,96 @@ impl TaskContext { txn.choose_url_for_domain(&domain)? }; - let url = todo!(); - let intent = todo!(); + let (url_str, url_record) = if let Some(url) = url { + url + } else { + // Exhausted all the URLs for this domain. Let's remove it from the active domains + // queue. + let domain = domain.to_owned(); + let out_of_urls = self + .store + .async_rw_txn(move |txn| { + // Double-check we're still out of URLs (another could have been added since + // we last checked!) + let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none(); - // Check our robot rules are valid for that URL - todo!(); + if !out_of_urls { + return Ok(false); + } - let permit = self.semaphore.acquire().await?; + // TODO delete the domain from the store - // TODO process errors - match self.raker.rake(url, intent, &self.client).await { - Ok(rake_outcome) => match self.process_outcome(&url, rake_outcome).await? { - NextAction::Continue => { - todo!() - } - NextAction::ChangeDomain => { - todo!() - } - }, - Err(_) => { + Ok(true) + }) + .await?; + if out_of_urls { + break; + } else { + continue; + } + }; + + let url = Url::parse(&url_str)?; + + // Check our robot rules are valid for that URL. + let robot_url = robots_txt_url_for(&url)?; + if Some(&robot_url) != current_robot_rules_url.as_ref() { + // We need to update our robot rules! + current_robot_rules = self.get_robot_rules(&url).await?; + current_robot_rules_url = Some(robot_url); + } + + // Check our robot rules allow us to visit the desired URL + if let Some(robot_rules) = current_robot_rules.as_ref() { + if !robot_rules.allow(url.path().as_bytes()) { + // Permanently reject this. todo!(); } } + if let Some(wait_until) = wait_until.take() { + // Sleep to respect a crawl-delay + tokio::time::sleep_until(wait_until).await; + } + + // Now acquire a permit to go and fetch the desired URL + let permit = self.semaphore.acquire().await?; + let raked = self.raker.rake(&url, url_record.intent, &self.client).await; drop(permit); + + let rake_outcome = raked.unwrap_or_else(|err| { + warn!("Failed to rake {:?}: {:?}", url, err); + // Treat this as a temporary rejection (backoff). + RakeOutcome::TemporaryFailure(TemporaryFailure { + reason: TemporaryFailureReason::UnknownClientError(format!( + "Failed to rake {:?}: {:?}", + url, err + )), + // Back off for a day: this ought to be enough time for the operator to fix the problem... maybe? + backoff_sec: 86400, + }) + }); + + match self.process_outcome(&url, rake_outcome).await? { + NextAction::Continue => { + // Take that URL off the queue + let now = Utc::today(); + let record = UrlVisitedRecord { + last_visited_days: date_to_quickpeep_days(&now)?, + }; + + let domain = domain.clone(); + self.store + .async_rw_txn(move |txn| { + txn.mark_url_as_visited(domain, url_str, record)?; + Ok(()) + }) + .await?; + } + NextAction::ChangeDomain => { + todo!() + } + } } Ok(()) diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index 48c0a72..5457abd 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -1,6 +1,6 @@ use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32}; use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION}; -use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord}; +use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord, UrlVisitedRecord}; use anyhow::{bail, ensure, Context}; use libmdbx::{ Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind, @@ -14,7 +14,7 @@ use std::sync::Arc; mod mdbx_helper_types; mod migrations; -mod records; +pub mod records; /// The databases available in an environment. pub struct Databases<'env> { @@ -249,6 +249,33 @@ impl<'a> RakerTxn<'a, RW> { Ok(false) } } + + /// Marks a URL as visited and takes it out of the queue. + pub fn mark_url_as_visited( + &self, + domain: String, + url_str: String, + record: UrlVisitedRecord, + ) -> anyhow::Result<()> { + let queue_urls = &self.mdbx.borrow_dbs().queue_urls; + let visited_urls = &self.mdbx.borrow_dbs().visited; + + let queue_key = format!("{}\n{}", domain, url_str); + + ensure!( + self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?, + "No queued URL to delete" + ); + + self.mdbx_txn.put( + visited_urls, + url_str.as_bytes(), + &MdbxBare(record).as_bytes(), + WriteFlags::empty(), + )?; + + Ok(()) + } } /// Read-only implementations (but can also be used on RW transactions) diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs index b748307..a406e6b 100644 --- a/quickpeep_raker/src/storage/records.rs +++ b/quickpeep_raker/src/storage/records.rs @@ -1,3 +1,4 @@ +use crate::raking::RakeIntent; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -7,7 +8,7 @@ pub struct ActiveDomainRecord { } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct VisitedDomainRecord { +pub struct UrlVisitedRecord { /// Number of minutes since the QuickPeep Epoch that this page was last raked at. /// We store minutes to give us 60× the range of times. /// We'd really rather stick with 32-bit ints to reduce the space storage requirements. @@ -18,5 +19,5 @@ pub struct VisitedDomainRecord { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct QueueUrlRecord { - // TODO + pub intent: RakeIntent, // TODO } diff --git a/quickpeep_utils/src/lib.rs b/quickpeep_utils/src/lib.rs index 39634ce..6bcb8ed 100644 --- a/quickpeep_utils/src/lib.rs +++ b/quickpeep_utils/src/lib.rs @@ -1,3 +1,2 @@ pub mod dates; pub mod lazy; -pub mod threadguard; \ No newline at end of file diff --git a/quickpeep_utils/src/threadguard.rs b/quickpeep_utils/src/threadguard.rs deleted file mode 100644 index fd29ed1..0000000 --- a/quickpeep_utils/src/threadguard.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::sync::mpsc; -use std::sync::mpsc::Sender; -use std::thread::JoinHandle; -use anyhow::anyhow; -use tokio::sync::oneshot::Sender as OneshotSender; -use log::warn; - -pub type ThreadGuardJob = Box anyhow::Result + Send + Sync>; - -/// Helper to use structs that aren't Sync or Send. -/// Spawns a thread upon which operations can be sent to. -pub struct ThreadGuard { - jobs: Sender<(ThreadGuardJob, OneshotSender>)>, - thread: Option>> -} - -impl Drop for ThreadGuard { - fn drop(&mut self) { - if let Some(thread) = self.thread.take() { - if let Err(err) = thread.join() - .unwrap_or_else(|e| Err(anyhow!("Can't join: {:?}", e))) { - warn!("Failed to join threadguard, or it returned an error."); - } - } - - } -} - -impl ThreadGuard { - pub fn new(initialiser: Box T + Send>) -> ThreadGuard { - let (jobs_tx, jobs_rx) = mpsc::channel::<(ThreadGuardJob, OneshotSender>)>(); - - let thread = Some(std::thread::spawn(move || -> anyhow::Result<()> { - let object = initialiser(); - - while let Ok((job, job_return_tx)) = jobs_rx.recv() { - let result = job(&object); - job_return_tx.send(result); - } - - Ok(()) - })); - - ThreadGuard { - jobs: jobs_tx, - thread - } - } - - pub async fn apply(&self, function: ThreadGuardJob) -> anyhow::Result { - let (return_tx, return_rx) = tokio::sync::oneshot::channel(); - self.jobs.send((function, return_tx))?; - - Ok(return_rx.await??) - } -} \ No newline at end of file