Some partial progress towards raking pages

Olivier 'reivilibre' 2022-03-19 22:57:36 +00:00
parent 5bab279cc2
commit ea4f2d1332
12 changed files with 487 additions and 277 deletions

Cargo.lock (generated): 11 lines changed

@@ -594,6 +594,16 @@ dependencies = [
"generic-array",
]
[[package]]
name = "diplomatic-bag"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6448adcf5616f6dd9627e545f447cf13dc5fda312f938364240d8feb84b0d12b"
dependencies = [
"crossbeam-channel",
"once_cell",
]
[[package]]
name = "dotenv"
version = "0.15.0"
@@ -2687,6 +2697,7 @@ dependencies = [
"clap",
"colour",
"cylon",
"diplomatic-bag",
"env_logger",
"feed-rs",
"futures-util",


@@ -45,6 +45,7 @@ env_logger = "0.9.0"
ouroboros = "0.14.2"
rand = "0.8.5"
lru = "0.7.3"
diplomatic-bag = "0.2.0"
### Raking helpers
# HTTP Requests
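Note: diplomatic-bag is a crate for driving values that are neither `Send` nor `Sync` from a dedicated worker thread. The hand-rolled `PageExtractionService` introduced later in this commit solves the same problem directly, so the dependency looks exploratory at this stage.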


@@ -4,7 +4,8 @@ use clap::Parser;
use colour::{blue_ln, green_ln, red_ln, yellow_ln};
use env_logger::Env;
use log::warn;
use quickpeep_raker::raking::analysis::{load_adblock_engine, IpSet};
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::references::references_from_urlrakes;
use quickpeep_raker::raking::{RakeIntent, RakeOutcome};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
@@ -57,7 +58,7 @@ pub async fn main() -> anyhow::Result<()> {
let file = File::open(&path).await?;
adblock_engines.push((
*antifeature,
load_adblock_engine(file, RuleTypes::All).await?,
preload_adblock_engine(file, RuleTypes::All).await?,
));
}
@@ -69,8 +70,8 @@ pub async fn main() -> anyhow::Result<()> {
antifeature_ip_set.add_all_from_file(ips_file).await?;
let raker = Raker {
adblock_engines,
antifeature_ip_set,
page_extraction: PageExtractionService::new(adblock_engines)?,
};
let outcome = raker.rake(&opts.url, RakeIntent::Any, &client).await?;


@@ -3,18 +3,19 @@ use clap::Parser;
use env_logger::Env;
use anyhow::{bail, Context};
use log::error;
use lru::LruCache;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::redirect::Policy;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use log::error;
use lru::LruCache;
use tokio::sync::{mpsc, Semaphore};
use quickpeep_raker::config;
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
/// Seeds a raker's queue with URLs
@@ -66,8 +67,8 @@ pub async fn main() -> anyhow::Result<()> {
let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
let raker = Raker {
adblock_engines: vec![],
antifeature_ip_set: todo!()
antifeature_ip_set: todo!(),
page_extraction: PageExtractionService::new(vec![])?, // TODO
};
let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
@@ -78,7 +79,7 @@ pub async fn main() -> anyhow::Result<()> {
let submission = TaskResultSubmission {
pages: pages_tx,
references: refs_tx
references: refs_tx,
};
let task_context = TaskContext {
@@ -88,7 +89,7 @@ pub async fn main() -> anyhow::Result<()> {
busy_domains: Arc::new(Mutex::new(Default::default())),
robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
semaphore,
submission
submission,
};
let mut tasks = Vec::with_capacity(num_tasks as usize);
@@ -101,8 +102,6 @@ pub async fn main() -> anyhow::Result<()> {
error!("Raker task {:?} encountered an error: {:?}", task_num, err);
}
}));
// TODO spawn task
}
drop(task_context);


@@ -1,31 +1,24 @@
use crate::raking::analysis::{
analyse_with_ad_block_cosmetic_filter, guess_document_language, IpSet,
};
use adblock::engine::Engine;
use crate::raking::analysis::IpSet;
use crate::raking::page_extraction::{ExtractedPage, PageExtractionService};
use anyhow::{bail, Context};
use chrono::{DateTime, FixedOffset, Utc};
use cylon::Cylon;
use futures_util::stream::StreamExt;
use html5ever::tendril::fmt::Slice;
use itertools::Itertools;
use kuchiki::traits::TendrilSink;
use kuchiki::NodeRef;
use lazy_static::lazy_static;
use log::debug;
use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree};
use quickpeep_structs::rake_entries::{AnalysisAntifeatures, RakedPageEntry, RakedReferrerEntry};
use quickpeep_utils::lazy::Lazy;
use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
use reqwest::header::HeaderMap;
use reqwest::{Client, Response, Url};
use serde::{Deserialize, Serialize};
use sitemap::reader::SiteMapEntity;
use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
use quickpeep_utils::threadguard::ThreadGuard;
pub mod analysis;
pub mod page_extraction;
pub mod references;
pub mod task;
@@ -92,6 +85,7 @@ pub struct PermanentFailure {
pub enum TemporaryFailureReason {
MissingInformation(String),
ServerError(u16),
UnknownClientError(String),
}
#[derive(Debug)]
@@ -168,8 +162,8 @@ async fn response_to_bytes_limited(
}
pub struct Raker {
pub adblock_engines: Vec<(AnalysisAntifeatures, ThreadGuard<Engine, ()>)>,
pub antifeature_ip_set: IpSet,
pub page_extraction: PageExtractionService,
}
impl Raker {
@@ -249,7 +243,8 @@ impl Raker {
{
// We don't try any fallbacks for an HTML page
return Ok(self
.rake_html_page(&content, url, is_cf, &headers).await
.rake_html_page(&content, url, is_cf, &headers)
.await
.context("Raking HTML page")?);
}
@@ -291,172 +286,31 @@ impl Raker {
is_cf: bool,
headers: &HeaderMap,
) -> anyhow::Result<RakeOutcome> {
let content_str = std::str::from_utf8(content)?;
let content_str = std::str::from_utf8(content)?.to_owned();
let root_node: NodeRef = kuchiki::parse_html().one(content_str);
// See whether this page is at the canonical URL for the page.
// If it's not, then we redirect the raker to the canonical URL.
if let Ok(canonical_link_node) = root_node.select_first("head link[rel=canonical]") {
if let Some(canonical_href) = canonical_link_node.attributes.borrow().get("href") {
let canonical_url = url
.join(canonical_href)
.context("Failed to resolve or parse canonical URL")?;
if &canonical_url != url {
return Ok(RakeOutcome::Redirect {
reason: RedirectReason::NotCanonical,
new_url: canonical_url,
});
}
}
}
// Try and dig up the page's language.
// First try <html lang=...> since this is the modern way, and potentially the most trustworthy...
let mut language = None;
if let Ok(html_node) = root_node.select_first("html") {
if let Some(lang) = html_node.attributes.borrow().get("lang") {
language = Some(lang.trim().to_string());
}
}
if language.is_none() {
// Next fallback: prefer the content-language header baked into the page itself
if let Ok(meta_node) = root_node.select_first("meta[http-equiv=content-language]") {
if let Some(lang) = meta_node.attributes.borrow().get("content") {
language = Some(lang.trim().to_string());
}
}
}
if language.is_none() {
// Next fallback: prefer the content-language received as a header
if let Some(lang) = headers.get("content-language") {
language = Some(lang.to_str()?.to_owned());
}
}
let mut antifeature_flags = AnalysisAntifeatures::empty();
if is_cf {
antifeature_flags |= AnalysisAntifeatures::CLOUDFLARE;
}
for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines {
adblock_engine.apply(Box::new(|adblock_engine| {
match analyse_with_ad_block_cosmetic_filter(
&root_node,
&adblock_engine,
url.as_str(),
true,
) {
Ok(cosmetic_filters_tripped) => {
eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped);
antifeature_flags |= *engine_antifeature_flag;
}
Err(err) => {
eprintln!("Cosmetic Filter Err {:?}", err);
}
};
Ok(())
}));
}
let dense_doc = DenseTree::from_body(root_node.clone());
let dense_doc_text = Lazy::new(Box::new(|| DenseTree::generate_textual_format(&dense_doc)));
//eprintln!("^^^^^\n{}\n^^^^^", *dense_doc_text);
if language.is_none() {
// Final fallback: guess the language
language = guess_document_language(&*dense_doc_text);
}
// Try and enforce some consistency in the language code;
// we want codes like en_US rather than en-us.
if let Some(language) = language.as_mut() {
normalise_language(language);
}
let mut title = "".to_owned();
if let Ok(title_node) = root_node.select_first("head title") {
title = title_node.text_contents();
}
let mut feeds = Vec::new();
let mut icon = None;
for link_node in root_node.select("head link").into_iter().flatten() {
if let Some(rel) = link_node.attributes.borrow().get("rel") {
let rels = rel.split_whitespace().collect_vec();
if rels.contains(&"icon") {
// This is an icon
if let Some(href) = link_node.attributes.borrow().get("href") {
let icon_url = url
.join(href)
.context("Failed to resolve or parse canonical URL to icon")?;
icon = Some(icon_url);
}
} else if rels.contains(&"alternate") {
if let Some(rel_type) = link_node.attributes.borrow().get("type") {
if FEED_LINK_MIME_TYPES.contains(rel_type) {
if let Some(href) = link_node.attributes.borrow().get("href") {
let feed_url = url
.join(href)
.context("Failed to resolve or parse canonical URL to feed")?;
feeds.push(feed_url);
}
}
}
}
}
}
let mut readability =
quickpeep_moz_readability::Readability::new_from_node(root_node.clone());
if let Err(err) = readability.parse(url.as_str()) {
debug!("Failed to analyse readability: {:?}", err);
}
eprintln!("{:#?}", readability.metadata);
if title.is_empty() && !readability.metadata.title().is_empty() {
// Fall back to the readability-derived page title
title = readability.metadata.title().to_owned();
}
let mut document = DenseDocument {
head: DenseHead {
title,
language: language.unwrap_or(String::with_capacity(0)),
icon: icon
.map(|url| url.as_str().to_owned())
.unwrap_or(String::with_capacity(0)),
},
body_content: Vec::with_capacity(0),
body_remainder: Vec::with_capacity(0),
};
if let Some(article_node) = readability.article_node {
document.body_remainder = DenseTree::from_body(root_node.clone());
document.body_content = DenseTree::from_body(article_node);
}
let bare_size = serde_bare::to_vec(&dense_doc)?.len();
eprintln!("CS {:?}{:?}", content.len(), bare_size);
let references = references::find_references(&document, &feeds, url);
Ok(RakeOutcome::RakedPage(RakedPage {
page_entry: RakedPageEntry {
analysed_antifeatures: antifeature_flags,
match self
.page_extraction
.extract(content_str, url.clone(), headers.clone(), is_cf)
.await?
{
ExtractedPage::Success {
document,
},
referrer_entry: RakedReferrerEntry { references },
}))
feeds,
antifeature_flags,
} => {
let references = references::find_references(&document, &feeds, url);
Ok(RakeOutcome::RakedPage(RakedPage {
page_entry: RakedPageEntry {
analysed_antifeatures: antifeature_flags,
document,
},
referrer_entry: RakedReferrerEntry { references },
}))
}
ExtractedPage::Redirect { reason, new_url } => {
Ok(RakeOutcome::Redirect { reason, new_url })
}
}
}
}


@@ -10,10 +10,30 @@ use std::collections::{BTreeSet, HashSet};
use std::net::IpAddr;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
pub async fn load_adblock_engine<R: AsyncRead + Unpin>(
pub struct PreloadedEngine {
rules: Vec<String>,
rule_types: RuleTypes,
}
impl PreloadedEngine {
pub fn build(&self) -> Engine {
Engine::from_rules(
&self.rules,
ParseOptions {
format: Default::default(),
include_redirect_urls: false,
rule_types: self.rule_types,
},
)
}
}
/// 'Pre-load' the adblock engine.
/// This doesn't construct the adblock engine yet because the adblock engine is not `Send`.
pub async fn preload_adblock_engine<R: AsyncRead + Unpin>(
reader: R,
rule_types: RuleTypes,
) -> anyhow::Result<Engine> {
) -> anyhow::Result<PreloadedEngine> {
let mut br = BufReader::new(reader);
let mut rules = Vec::new();
let mut buf = String::new();
@@ -24,14 +44,7 @@ pub async fn load_adblock_engine<R: AsyncRead + Unpin>(
}
rules.push(buf.trim().to_owned());
}
Ok(Engine::from_rules(
&rules,
ParseOptions {
format: Default::default(),
include_redirect_urls: false,
rule_types,
},
))
Ok(PreloadedEngine { rules, rule_types })
}
// Relevant:
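A minimal usage sketch (hypothetical call site, not part of this commit): the rule list is parsed asynchronously up front, and the non-`Send` `Engine` is only built on the thread that will own it.
// Hypothetical call site: read the rule list on the async side...
let file = tokio::fs::File::open("adblock_rules.txt").await?;
let preloaded = preload_adblock_engine(file, RuleTypes::All).await?;
// ...then construct the Engine on the thread that keeps it. PreloadedEngine
// is just rules plus parse options, so it can cross the thread boundary
// even though the built Engine cannot.
std::thread::spawn(move || {
    let engine = preloaded.build();
    // use `engine` here, on this thread only
});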


@@ -0,0 +1,285 @@
use crate::raking::analysis::{
analyse_with_ad_block_cosmetic_filter, guess_document_language, PreloadedEngine,
};
use crate::raking::{normalise_language, RedirectReason, FEED_LINK_MIME_TYPES};
use adblock::engine::Engine;
use anyhow::{bail, Context};
use html5ever::tendril::TendrilSink;
use itertools::Itertools;
use kuchiki::NodeRef;
use log::{debug, warn};
use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree};
use quickpeep_structs::rake_entries::AnalysisAntifeatures;
use quickpeep_utils::lazy::Lazy;
use reqwest::header::HeaderMap;
use reqwest::Url;
use tokio::runtime;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;
/// As page extraction requires dealing with things that are not `Send`, we send those tasks off to
/// another thread.
pub struct PageExtractionService {
submission: Sender<ExtractionTask>,
}
pub struct ExtractionTask {
content: String,
url: Url,
headers: HeaderMap,
is_cf: bool,
response: oneshot::Sender<anyhow::Result<ExtractedPage>>,
}
impl PageExtractionService {
pub async fn extract(
&self,
content: String,
url: Url,
headers: HeaderMap,
is_cf: bool,
) -> anyhow::Result<ExtractedPage> {
let (resp_tx, resp_rx) = oneshot::channel();
if let Err(_) = self
.submission
.send(ExtractionTask {
content,
url,
headers,
is_cf,
response: resp_tx,
})
.await
{
bail!("Page extraction service has shut down");
}
resp_rx.await?
}
}
impl PageExtractionService {
pub fn new(
adblock_engines: Vec<(AnalysisAntifeatures, PreloadedEngine)>,
) -> anyhow::Result<PageExtractionService> {
let (submission, mut recv) = mpsc::channel::<ExtractionTask>(16);
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()?;
std::thread::spawn(move || {
let internal = PageExtractionServiceInternal {
adblock_engines: adblock_engines
.into_iter()
.map(|(aa, engine)| (aa, engine.build()))
.collect(),
};
let local = LocalSet::new();
local.spawn_local(async move {
while let Some(new_task) = recv.recv().await {
let result = internal.extract_page(
new_task.content,
new_task.url,
new_task.headers,
new_task.is_cf,
);
if let Err(_) = new_task.response.send(result) {
warn!("Requester went away before could send extracted page");
}
}
});
// This will return once all senders are dropped and all spawned tasks have returned.
rt.block_on(local);
});
Ok(PageExtractionService { submission })
}
}
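Design note: the service handle is itself `Send` and `Sync` (it only holds a channel sender), so it can be shared across the multi-threaded raker, while the non-`Send` adblock `Engine`s are built and driven entirely on this one dedicated thread. The current-thread runtime plus `LocalSet` is what lets that non-`Send` state live inside async tasks there.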
struct PageExtractionServiceInternal {
adblock_engines: Vec<(AnalysisAntifeatures, Engine)>,
}
impl PageExtractionServiceInternal {
fn extract_page(
&self,
content_str: String,
url: Url,
headers: HeaderMap,
is_cf: bool,
) -> anyhow::Result<ExtractedPage> {
let root_node: NodeRef = kuchiki::parse_html().one(content_str.as_ref());
// See whether this page is at the canonical URL for the page.
// If it's not, then we redirect the raker to the canonical URL.
if let Ok(canonical_link_node) = root_node.select_first("head link[rel=canonical]") {
if let Some(canonical_href) = canonical_link_node.attributes.borrow().get("href") {
let canonical_url = url
.join(canonical_href)
.context("Failed to resolve or parse canonical URL")?;
if &canonical_url != &url {
return Ok(ExtractedPage::Redirect {
reason: RedirectReason::NotCanonical,
new_url: canonical_url,
});
}
}
}
// Try and dig up the page's language.
// First try <html lang=...> since this is the modern way, and potentially the most trustworthy...
let mut language = None;
if let Ok(html_node) = root_node.select_first("html") {
if let Some(lang) = html_node.attributes.borrow().get("lang") {
language = Some(lang.trim().to_string());
}
}
if language.is_none() {
// Next fallback: prefer the content-language header baked into the page itself
if let Ok(meta_node) = root_node.select_first("meta[http-equiv=content-language]") {
if let Some(lang) = meta_node.attributes.borrow().get("content") {
language = Some(lang.trim().to_string());
}
}
}
if language.is_none() {
// Next fallback: prefer the content-language received as a header
if let Some(lang) = headers.get("content-language") {
language = Some(lang.to_str()?.to_owned());
}
}
let mut antifeature_flags = AnalysisAntifeatures::empty();
if is_cf {
antifeature_flags |= AnalysisAntifeatures::CLOUDFLARE;
}
for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines {
match analyse_with_ad_block_cosmetic_filter(
&root_node,
&adblock_engine,
url.as_str(),
true,
) {
Ok(cosmetic_filters_tripped) => {
eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped);
antifeature_flags |= *engine_antifeature_flag;
}
Err(err) => {
eprintln!("Cosmetic Filter Err {:?}", err);
}
};
}
let dense_doc = DenseTree::from_body(root_node.clone());
let dense_doc_text = Lazy::new(Box::new(|| DenseTree::generate_textual_format(&dense_doc)));
//eprintln!("^^^^^\n{}\n^^^^^", *dense_doc_text);
if language.is_none() {
// Final fallback: guess the language
language = guess_document_language(&*dense_doc_text);
}
// Try and enforce some consistency in the language code;
// we want codes like en_US rather than en-us.
if let Some(language) = language.as_mut() {
normalise_language(language);
}
let mut title = "".to_owned();
if let Ok(title_node) = root_node.select_first("head title") {
title = title_node.text_contents();
}
let mut feeds = Vec::new();
let mut icon = None;
for link_node in root_node.select("head link").into_iter().flatten() {
if let Some(rel) = link_node.attributes.borrow().get("rel") {
let rels = rel.split_whitespace().collect_vec();
if rels.contains(&"icon") {
// This is an icon
if let Some(href) = link_node.attributes.borrow().get("href") {
let icon_url = url
.join(href)
.context("Failed to resolve or parse canonical URL to icon")?;
icon = Some(icon_url);
}
} else if rels.contains(&"alternate") {
if let Some(rel_type) = link_node.attributes.borrow().get("type") {
if FEED_LINK_MIME_TYPES.contains(rel_type) {
if let Some(href) = link_node.attributes.borrow().get("href") {
let feed_url = url
.join(href)
.context("Failed to resolve or parse canonical URL to feed")?;
feeds.push(feed_url);
}
}
}
}
}
}
let mut readability =
quickpeep_moz_readability::Readability::new_from_node(root_node.clone());
if let Err(err) = readability.parse(url.as_str()) {
debug!("Failed to analyse readability: {:?}", err);
}
eprintln!("{:#?}", readability.metadata);
if title.is_empty() && !readability.metadata.title().is_empty() {
// Fall back to the readability-derived page title
title = readability.metadata.title().to_owned();
}
let mut document = DenseDocument {
head: DenseHead {
title,
language: language.unwrap_or(String::with_capacity(0)),
icon: icon
.map(|url| url.as_str().to_owned())
.unwrap_or(String::with_capacity(0)),
},
body_content: Vec::with_capacity(0),
body_remainder: Vec::with_capacity(0),
};
if let Some(article_node) = readability.article_node {
document.body_remainder = DenseTree::from_body(root_node.clone());
document.body_content = DenseTree::from_body(article_node);
} else {
todo!()
}
Ok(ExtractedPage::Success {
document,
feeds,
antifeature_flags,
})
}
}
pub enum ExtractedPage {
Success {
document: DenseDocument,
feeds: Vec<Url>,
antifeature_flags: AnalysisAntifeatures,
},
Redirect {
reason: RedirectReason,
new_url: Url,
},
}


@@ -1,14 +1,22 @@
use crate::raking::{get_robots_txt_for, RakeOutcome, Raker, RobotsTxt, UrlRaked};
use crate::raking::{
get_robots_txt_for, robots_txt_url_for, RakeOutcome, Raker, RobotsTxt, TemporaryFailure,
TemporaryFailureReason, UrlRaked,
};
use crate::storage::records::UrlVisitedRecord;
use crate::storage::RakerStore;
use chrono::Utc;
use cylon::Cylon;
use log::warn;
use lru::LruCache;
use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
use quickpeep_utils::dates::date_to_quickpeep_days;
use reqwest::{Client, Url};
use std::collections::HashSet;
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;
use tokio::time::Instant;
/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
/// queue and get turned into a backoff.
@@ -68,6 +76,7 @@ impl TaskContext {
pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> {
let mut current_robot_rules_url: Option<Url> = None;
let mut current_robot_rules: Option<Cylon> = None;
let mut wait_until: Option<Instant> = None;
loop {
// Get a URL to process
@@ -76,30 +85,96 @@
txn.choose_url_for_domain(&domain)?
};
let url = todo!();
let intent = todo!();
let (url_str, url_record) = if let Some(url) = url {
url
} else {
// Exhausted all the URLs for this domain. Let's remove it from the active domains
// queue.
let domain = domain.to_owned();
let out_of_urls = self
.store
.async_rw_txn(move |txn| {
// Double-check we're still out of URLs (another could have been added since
// we last checked!)
let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none();
// Check our robot rules are valid for that URL
todo!();
if !out_of_urls {
return Ok(false);
}
let permit = self.semaphore.acquire().await?;
// TODO delete the domain from the store
// TODO process errors
match self.raker.rake(url, intent, &self.client).await {
Ok(rake_outcome) => match self.process_outcome(&url, rake_outcome).await? {
NextAction::Continue => {
todo!()
}
NextAction::ChangeDomain => {
todo!()
}
},
Err(_) => {
Ok(true)
})
.await?;
if out_of_urls {
break;
} else {
continue;
}
};
let url = Url::parse(&url_str)?;
// Check our robot rules are valid for that URL.
let robot_url = robots_txt_url_for(&url)?;
if Some(&robot_url) != current_robot_rules_url.as_ref() {
// We need to update our robot rules!
current_robot_rules = self.get_robot_rules(&url).await?;
current_robot_rules_url = Some(robot_url);
}
// Check our robot rules allow us to visit the desired URL
if let Some(robot_rules) = current_robot_rules.as_ref() {
if !robot_rules.allow(url.path().as_bytes()) {
// Permanently reject this.
todo!();
}
}
if let Some(wait_until) = wait_until.take() {
// Sleep to respect a crawl-delay
tokio::time::sleep_until(wait_until).await;
}
// Now acquire a permit to go and fetch the desired URL
let permit = self.semaphore.acquire().await?;
let raked = self.raker.rake(&url, url_record.intent, &self.client).await;
drop(permit);
let rake_outcome = raked.unwrap_or_else(|err| {
warn!("Failed to rake {:?}: {:?}", url, err);
// Treat this as a temporary rejection (backoff).
RakeOutcome::TemporaryFailure(TemporaryFailure {
reason: TemporaryFailureReason::UnknownClientError(format!(
"Failed to rake {:?}: {:?}",
url, err
)),
// Back off for a day: this ought to be enough time for the operator to fix the problem... maybe?
backoff_sec: 86400,
})
});
match self.process_outcome(&url, rake_outcome).await? {
NextAction::Continue => {
// Take that URL off the queue
let now = Utc::today();
let record = UrlVisitedRecord {
last_visited_days: date_to_quickpeep_days(&now)?,
};
let domain = domain.clone();
self.store
.async_rw_txn(move |txn| {
txn.mark_url_as_visited(domain, url_str, record)?;
Ok(())
})
.await?;
}
NextAction::ChangeDomain => {
todo!()
}
}
}
Ok(())


@@ -1,6 +1,6 @@
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32};
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord};
use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord, UrlVisitedRecord};
use anyhow::{bail, ensure, Context};
use libmdbx::{
Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
@@ -14,7 +14,7 @@ use std::sync::Arc;
mod mdbx_helper_types;
mod migrations;
mod records;
pub mod records;
/// The databases available in an environment.
pub struct Databases<'env> {
@@ -249,6 +249,33 @@ impl<'a> RakerTxn<'a, RW> {
Ok(false)
}
}
/// Marks a URL as visited and takes it out of the queue.
pub fn mark_url_as_visited(
&self,
domain: String,
url_str: String,
record: UrlVisitedRecord,
) -> anyhow::Result<()> {
let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
let visited_urls = &self.mdbx.borrow_dbs().visited;
let queue_key = format!("{}\n{}", domain, url_str);
ensure!(
self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?,
"No queued URL to delete"
);
self.mdbx_txn.put(
visited_urls,
url_str.as_bytes(),
&MdbxBare(record).as_bytes(),
WriteFlags::empty(),
)?;
Ok(())
}
}
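A usage sketch mirroring the call made in the task loop above (`store`, `domain`, and `url_str` are assumed bindings):
let record = UrlVisitedRecord {
    last_visited_days: date_to_quickpeep_days(&Utc::today())?,
};
store
    .async_rw_txn(move |txn| {
        // Removes the URL from the queue and records the visit, atomically.
        txn.mark_url_as_visited(domain, url_str, record)?;
        Ok(())
    })
    .await?;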
/// Read-only implementations (but can also be used on RW transactions)


@@ -1,3 +1,4 @@
use crate::raking::RakeIntent;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -7,7 +8,7 @@ pub struct ActiveDomainRecord {
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct VisitedDomainRecord {
pub struct UrlVisitedRecord {
/// Number of minutes since the QuickPeep Epoch that this page was last raked at.
/// We store minutes to give us 60× the range of times.
/// We'd really rather stick with 32-bit ints to reduce the space storage requirements.
@@ -18,5 +19,5 @@ pub struct VisitedDomainRecord {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct QueueUrlRecord {
// TODO
pub intent: RakeIntent, // TODO
}


@@ -1,3 +1,2 @@
pub mod dates;
pub mod lazy;
pub mod threadguard;


@@ -1,56 +0,0 @@
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;
use anyhow::anyhow;
use tokio::sync::oneshot::Sender as OneshotSender;
use log::warn;
pub type ThreadGuardJob<T, R> = Box<dyn FnOnce(&T) -> anyhow::Result<R> + Send + Sync>;
/// Helper to use structs that aren't Sync or Send.
/// Spawns a thread to which operations can be sent.
pub struct ThreadGuard<T, R> {
jobs: Sender<(ThreadGuardJob<T, R>, OneshotSender<anyhow::Result<R>>)>,
thread: Option<JoinHandle<anyhow::Result<()>>>
}
impl<T, R> Drop for ThreadGuard<T, R> {
fn drop(&mut self) {
if let Some(thread) = self.thread.take() {
if let Err(err) = thread.join()
.unwrap_or_else(|e| Err(anyhow!("Can't join: {:?}", e))) {
warn!("Failed to join threadguard, or it returned an error.");
}
}
}
}
impl<T: 'static, R: Send + 'static> ThreadGuard<T, R> {
pub fn new(initialiser: Box<dyn FnOnce() -> T + Send>) -> ThreadGuard<T, R> {
let (jobs_tx, jobs_rx) = mpsc::channel::<(ThreadGuardJob<T, R>, OneshotSender<anyhow::Result<R>>)>();
let thread = Some(std::thread::spawn(move || -> anyhow::Result<()> {
let object = initialiser();
while let Ok((job, job_return_tx)) = jobs_rx.recv() {
let result = job(&object);
job_return_tx.send(result);
}
Ok(())
}));
ThreadGuard {
jobs: jobs_tx,
thread
}
}
pub async fn apply(&self, function: ThreadGuardJob<T, R>) -> anyhow::Result<R> {
let (return_tx, return_rx) = tokio::sync::oneshot::channel();
self.jobs.send((function, return_tx))?;
Ok(return_rx.await??)
}
}
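With extraction moved onto its own single-threaded runtime, the generic ThreadGuard indirection (a oneshot round-trip per adblock operation) is no longer needed: the engines now live inside PageExtractionService on the thread that owns them, which is presumably why this helper is deleted here.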