Get ever closer to a raker being usable

Olivier 'reivilibre' 2022-03-20 00:08:37 +00:00
parent ea4f2d1332
commit 085020b80d
8 changed files with 541 additions and 39 deletions

View File

@ -2,21 +2,35 @@ use clap::Parser;
use env_logger::Env;
use adblock::lists::RuleTypes;
use anyhow::{bail, Context};
-use log::error;
+use log::{error, warn};
use lru::LruCache;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::redirect::Policy;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::sync::{mpsc, Semaphore};
use quickpeep_raker::config;
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
use quickpeep_structs::rake_entries::AnalysisAntifeatures;
/// The ordering is slightly important on these: more specific things should come first.
/// This means they filter out the troublesome elements before the broader filters do.
pub const ADBLOCK_FILTER_PATHS: [(AnalysisAntifeatures, &'static str); 5] = [
(AnalysisAntifeatures::COOKIE_NAG, "cookie_nag"),
(AnalysisAntifeatures::ANNOYANCE, "annoyance"),
(AnalysisAntifeatures::SOCIAL, "social"),
(AnalysisAntifeatures::PRIVACY, "privacy"),
(AnalysisAntifeatures::ADVERTS, "adverts"),
];
/// Seeds a raker's queue with URLs
#[derive(Clone, Debug, Parser)]
@ -66,9 +80,30 @@ pub async fn main() -> anyhow::Result<()> {
let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
let mut adblock_engines = Vec::new();
for (antifeature, name) in &ADBLOCK_FILTER_PATHS {
let path = PathBuf::from(config.data_dir.join(format!("{}.adblock", name)));
if !path.exists() {
warn!("Missing adblock rules: {:?}.", path);
continue;
}
let file = File::open(&path).await?;
adblock_engines.push((
*antifeature,
preload_adblock_engine(file, RuleTypes::All).await?,
));
}
let mut antifeature_ip_set = IpSet::new();
let ips_file = File::open(config.data_dir.join("cf_ips.txt"))
.await
.context("Failed to open CF IPs file")?;
antifeature_ip_set.add_all_from_file(ips_file).await?;
let raker = Raker {
-antifeature_ip_set: todo!(),
+antifeature_ip_set,
-page_extraction: PageExtractionService::new(vec![])?, // TODO
+page_extraction: PageExtractionService::new(adblock_engines)?,
};
let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
@ -76,10 +111,12 @@ pub async fn main() -> anyhow::Result<()> {
let (pages_tx, pages_rx) = mpsc::channel(32);
let (refs_tx, refs_rx) = mpsc::channel(32);
let (rejections_tx, rejections_rx) = mpsc::channel(32);
let submission = TaskResultSubmission {
pages: pages_tx,
references: refs_tx,
rejections: rejections_tx,
};
let task_context = TaskContext {

View File

@ -8,7 +8,7 @@ use html5ever::tendril::fmt::Slice;
use itertools::Itertools;
use lazy_static::lazy_static;
use log::debug;
-use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
+use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry, ReferenceKind};
use reqwest::header::HeaderMap;
use reqwest::{Client, Response, Url};
use serde::{Deserialize, Serialize};
@ -70,27 +70,29 @@ pub struct RobotsTxt {
pub rules: Cylon,
}
-#[derive(Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TemporaryFailure {
pub reason: TemporaryFailureReason,
pub backoff_sec: u32,
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct PermanentFailure {
pub reason: PermanentFailureReason,
}
-#[derive(Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TemporaryFailureReason {
MissingInformation(String),
ServerError(u16),
UnknownClientError(String),
ExcruciatingCrawlDelay(u64),
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub enum PermanentFailureReason {
ResourceDenied(u16),
DeniedToRobots,
WrongLanguage(String),
UnknownContentType(String),
}
@ -103,6 +105,28 @@ pub enum RakeIntent {
SiteMap,
}
impl From<ReferenceKind> for RakeIntent {
fn from(kind: ReferenceKind) -> Self {
match kind {
ReferenceKind::CanonicalUrl => {
// FIXME We don't know what this is a canonical URL for. Suppose it doesn't matter...
RakeIntent::Any
}
ReferenceKind::Redirect => {
// FIXME We don't know what this is a redirect for. Suppose it doesn't matter...
RakeIntent::Any
}
ReferenceKind::Link => {
// Links can go to pages but also to RSS feeds
RakeIntent::Any
}
ReferenceKind::HeaderLinkedFeed => RakeIntent::Feed,
ReferenceKind::FeedEntry => RakeIntent::Page,
ReferenceKind::SitemapEntry => RakeIntent::Page,
}
}
}
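
A quick sanity check of this mapping, written as a hypothetical test (not part of this commit):

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn intents_follow_reference_kinds() {
// Feed and sitemap entries point at pages; header-linked feeds point at feeds.
assert!(matches!(RakeIntent::from(ReferenceKind::FeedEntry), RakeIntent::Page));
assert!(matches!(RakeIntent::from(ReferenceKind::SitemapEntry), RakeIntent::Page));
assert!(matches!(RakeIntent::from(ReferenceKind::HeaderLinkedFeed), RakeIntent::Feed));
}
}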
lazy_static! {
static ref SITEMAP_MIME_TYPES: HashSet<&'static str> =
HashSet::from_iter(vec!["text/xml", "application/xml",]);

View File

@ -1,6 +1,7 @@
use crate::raking::UrlRaked;
use quickpeep_densedoc::{DenseDocument, DenseTree};
use quickpeep_structs::rake_entries::{RakedReference, ReferenceKind};
use quickpeep_utils::dates::date_to_quickpeep_days;
use reqwest::Url;
use std::collections::BTreeSet;
@ -42,6 +43,7 @@ pub fn find_references(
refs.insert(RakedReference {
target: clean_url(&full_url).to_string(),
kind: ReferenceKind::Link,
last_mod: None,
});
}
}
@ -60,6 +62,7 @@ pub fn find_references(
refs.insert(RakedReference {
target: clean_url(feed).as_str().to_owned(),
kind: ReferenceKind::HeaderLinkedFeed,
last_mod: None,
});
}
@ -75,6 +78,10 @@ pub fn references_from_urlrakes(
.map(|url_raked| RakedReference {
target: url_raked.url.to_string(),
kind: ref_kind,
last_mod: url_raked
.last_changed
.map(|datetime| date_to_quickpeep_days(&datetime.date()).ok())
.flatten(),
})
.collect()
}
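
The map-then-flatten above collapses an Option<Option<u16>>; an equivalent sketch using and_then:

// Sketch: same effect as .map(...).flatten() on the nested Option.
let last_mod: Option<u16> = url_raked
.last_changed
.and_then(|datetime| date_to_quickpeep_days(&datetime.date()).ok());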

View File

@ -1,16 +1,22 @@
use crate::raking::analysis::get_reduced_domain;
use crate::raking::references::references_from_urlrakes;
use crate::raking::{
-get_robots_txt_for, robots_txt_url_for, RakeOutcome, Raker, RobotsTxt, TemporaryFailure,
-TemporaryFailureReason, UrlRaked,
+get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome,
+Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
};
-use crate::storage::records::UrlVisitedRecord;
+use crate::storage::records::{ActiveDomainRecord, UrlVisitedRecord};
use crate::storage::RakerStore;
use anyhow::{anyhow, Context};
use chrono::Utc;
use cylon::Cylon;
use log::warn;
use lru::LruCache;
-use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
+use quickpeep_structs::rake_entries::{
+RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
+};
use quickpeep_utils::dates::date_to_quickpeep_days;
use reqwest::{Client, Url};
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration;
@ -36,6 +42,7 @@ enum NextAction {
pub struct TaskResultSubmission {
pub pages: Sender<(Url, RakedPageEntry)>,
pub references: Sender<(Url, RakedReferrerEntry)>,
pub rejections: Sender<(Url, PermanentFailure)>,
}
#[derive(Clone)]
@ -62,9 +69,40 @@ pub struct TaskContext {
}
impl TaskContext {
-pub async fn run(self) -> anyhow::Result<()> {
+pub async fn run(mut self) -> anyhow::Result<()> {
// Get a domain to process
-todo!();
+loop {
let domain = {
let txn = self.store.ro_txn()?;
txn.choose_random_active_domain()?
};
match domain {
Some((domain, active_record)) => {
let is_ours = {
let mut busy_domains = self
.busy_domains
.lock()
.map_err(|_| anyhow!("Busy Domains set poisoned"))?;
busy_domains.insert(domain.clone())
};
if is_ours {
self.process_domain(domain).await?;
} else {
// Already in use; need to pick again
// TODO be smarter about this.
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
None => {
// No domains left! Woot.
// TODO come up with some stopping conditions
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
}
Ok(())
}
@ -128,7 +166,14 @@ impl TaskContext {
if let Some(robot_rules) = current_robot_rules.as_ref() {
if !robot_rules.allow(url.path().as_bytes()) {
// Permanently reject this.
-todo!();
+self.process_outcome(
&url,
RakeOutcome::PermanentFailure(PermanentFailure {
reason: PermanentFailureReason::DeniedToRobots,
}),
)
.await?;
continue;
}
}
@ -137,11 +182,23 @@ impl TaskContext {
tokio::time::sleep_until(wait_until).await;
}
let delay = if let Some(robot_rules) = current_robot_rules.as_ref() {
robot_rules
.delay()
.map(Duration::from_secs)
.unwrap_or(DEFAULT_CRAWL_DELAY)
} else {
DEFAULT_CRAWL_DELAY
};
// Now acquire a permit to go and fetch the desired URL
let permit = self.semaphore.acquire().await?;
let raked = self.raker.rake(&url, url_record.intent, &self.client).await;
drop(permit);
// Next time, we need to wait before our request.
wait_until = Some(Instant::now() + delay);
let rake_outcome = raked.unwrap_or_else(|err| {
warn!("Failed to rake {:?}: {:?}", url, err);
// Treat this as a temporary rejection (backoff).
@ -166,15 +223,50 @@ impl TaskContext {
let domain = domain.clone();
self.store
.async_rw_txn(move |txn| {
-txn.mark_url_as_visited(domain, url_str, record)?;
+txn.mark_url_as_visited(&domain, &url_str, record)?;
Ok(())
})
.await?;
}
NextAction::ChangeDomain => {
-todo!()
+let mut cache = self
.robotstxt_cache
.write()
.map_err(|_| anyhow!("Robots.txt cache poisoned"))?;
cache.put(domain, current_robot_rules);
// Choose another domain
return Ok(());
}
}
if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
// This crawl-delay is painfully long!
// We'll respect it, but we treat this as a backoff and let the site lose its place
// in the queue.
let domain = domain.clone();
let url = url.clone();
let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX);
self.store
.async_rw_txn(move |txn| {
txn.start_backing_off(
&domain,
backoff,
url.to_string(),
TemporaryFailure {
reason: TemporaryFailureReason::ExcruciatingCrawlDelay(
delay.as_secs(),
),
// Don't stack this up with a backoff; it's not an actual failure!
backoff_sec: 0,
},
)?;
Ok(())
})
.await?;
}
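
The two crawl-delay constants used above are not shown in this hunk; a sketch of plausible definitions (the names come from the code, the values are assumptions only):

// Illustrative values only; the real constants are defined elsewhere in this file.
const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(2);
const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(60);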
}
Ok(())
@ -182,18 +274,200 @@ impl TaskContext {
/// Processes the outcome of
async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
let today = date_to_quickpeep_days(&Utc::today())?;
match outcome {
-RakeOutcome::RakedPage(page) => {}
-RakeOutcome::RakedFeed(feed) => {}
-RakeOutcome::RakedSitemap(sitemap) => {}
-RakeOutcome::Redirect { reason, new_url } => {}
-RakeOutcome::TemporaryFailure(failure) => {}
-RakeOutcome::PermanentFailure(failure) => {}
+RakeOutcome::RakedPage(page) => {
+self.submission
+.pages
+.send((url.clone(), page.page_entry))
+.await
+.context("Page processor shut down; can't stream page!")?;
self.submission
.references
.send((url.clone(), page.referrer_entry.clone()))
.await
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_page(url.clone(), today)
.await?;
self.as_event_processor()
.process_refs(url.clone(), page.referrer_entry, today)
.await?;
Ok(NextAction::Continue)
}
RakeOutcome::RakedFeed(feed) => {
let refs = RakedReferrerEntry {
references: references_from_urlrakes(&feed, ReferenceKind::FeedEntry),
};
self.submission
.references
.send((url.clone(), refs.clone()))
.await
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.await?;
Ok(NextAction::Continue)
}
RakeOutcome::RakedSitemap(sitemap) => {
let refs = RakedReferrerEntry {
references: references_from_urlrakes(&sitemap, ReferenceKind::SitemapEntry),
};
self.submission
.references
.send((url.clone(), refs.clone()))
.await
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.await?;
Ok(NextAction::Continue)
}
RakeOutcome::Redirect { reason, new_url } => {
let refs = RakedReferrerEntry {
references: [RakedReference {
target: new_url.to_string(),
kind: match reason {
RedirectReason::Redirected { .. } => ReferenceKind::Redirect,
RedirectReason::NotCanonical { .. } => ReferenceKind::CanonicalUrl,
},
last_mod: None,
}]
.into_iter()
.collect(),
};
self.submission
.references
.send((url.clone(), refs.clone()))
.await
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.await?;
Ok(NextAction::Continue)
}
RakeOutcome::TemporaryFailure(failure) => {
// TODO do we want to log this somewhere?
let domain = get_reduced_domain(url)?;
let url = url.clone();
// TODO add 1.1× the previous backoff, if there was one.
let new_backoff = failure.backoff_sec;
let domain = domain.into_owned();
self.store
.async_rw_txn(move |txn| {
txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?;
Ok(())
})
.await?;
// Change domain now
Ok(NextAction::ChangeDomain)
}
RakeOutcome::PermanentFailure(failure) => {
self.submission
.rejections
.send((url.clone(), failure.clone()))
.await
.context("Rejection processor shut down; can't stream rejection!!")?;
self.as_event_processor()
.process_rejection(url.clone(), today)
.await?;
// Reasons for permanent rejection aren't our fault or a site-wide fault;
// so don't worry about carrying on.
Ok(NextAction::Continue)
}
}
-todo!()
}
-async fn process_feed_or_sitemap(&self, feed: &Vec<UrlRaked>) -> anyhow::Result<()> {
-todo!()
+fn as_event_processor(&self) -> EventProcessor {
+EventProcessor {
store: Cow::Borrowed(&self.store),
}
}
}
/// Processes events that are either emitted into or loaded from a RakePack.
/// Processing events this way means we can bring up a raker's store to the same state as another,
/// just by replaying the stream of RakePacks and importing seeds.
pub struct EventProcessor<'a> {
store: Cow<'a, RakerStore>,
}
impl EventProcessor<'_> {
pub async fn process_page(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
self.store
.as_ref()
.async_rw_txn(move |txn| {
let domain = get_reduced_domain(&url)?;
txn.mark_url_as_visited(
domain.as_ref(),
url.as_ref(),
UrlVisitedRecord {
last_visited_days: datestamp,
},
)
})
.await
}
pub async fn process_refs(
&self,
url: Url,
refs: RakedReferrerEntry,
datestamp: u16,
) -> anyhow::Result<()> {
self.store
.as_ref()
.async_rw_txn(move |txn| {
let domain = get_reduced_domain(&url)?;
txn.mark_url_as_visited(
domain.as_ref(),
url.as_ref(),
UrlVisitedRecord {
last_visited_days: datestamp,
},
)?;
// track all the referred-to URLs!
for reference in refs.references {
txn.enqueue_url(&reference.target, reference.last_mod, reference.kind.into())?;
}
Ok(())
})
.await
}
pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
self.store
.as_ref()
.async_rw_txn(move |txn| {
let domain = get_reduced_domain(&url)?;
txn.mark_url_as_visited(
domain.as_ref(),
url.as_ref(),
UrlVisitedRecord {
last_visited_days: datestamp,
},
)
})
.await
}
}
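
As the doc comment above describes, another store can be brought to the same state by replaying events. A minimal replay sketch, assuming a hypothetical RakePackEvent type (not defined in this commit):

// Hypothetical event shape, for illustration only.
enum RakePackEvent {
Page(Url, u16),
Refs(Url, RakedReferrerEntry, u16),
Rejection(Url, u16),
}

async fn replay(proc: &EventProcessor<'_>, events: Vec<RakePackEvent>) -> anyhow::Result<()> {
for event in events {
match event {
RakePackEvent::Page(url, day) => proc.process_page(url, day).await?,
RakePackEvent::Refs(url, refs, day) => proc.process_refs(url, refs, day).await?,
RakePackEvent::Rejection(url, day) => proc.process_rejection(url, day).await?,
}
}
Ok(())
}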

View File

@ -1,6 +1,10 @@
-use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32};
+use crate::raking::analysis::get_reduced_domain;
+use crate::raking::{RakeIntent, TemporaryFailure};
+use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
-use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord, UrlVisitedRecord};
+use crate::storage::records::{
+ActiveDomainRecord, BackingOffDomainRecord, QueueUrlRecord, UrlVisitedRecord,
+};
use anyhow::{bail, ensure, Context};
use libmdbx::{
Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
@ -8,9 +12,11 @@ use libmdbx::{
};
use log::info;
use ouroboros::self_referencing;
use reqwest::Url;
use std::borrow::Cow;
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
mod mdbx_helper_types;
mod migrations;
@ -29,12 +35,12 @@ pub struct Databases<'env> {
/// Domain → BackingOffDomainRecord
pub backing_off_domains: Database<'env>,
/// URL → VisitedDomainRecord
-pub visited: Database<'env>,
+pub visited_urls: Database<'env>,
}
// Must match the order of the Databases struct fields.
pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
-("queue_urls", DatabaseFlags::empty()),
+("urls_queue", DatabaseFlags::empty()),
("active_domains", DatabaseFlags::empty()),
("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
(
@ -42,7 +48,7 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
DatabaseFlags::INTEGER_KEY.union(DatabaseFlags::DUP_SORT),
),
("backing_off_domains", DatabaseFlags::empty()),
-("visited", DatabaseFlags::empty()),
+("urls_visited", DatabaseFlags::empty()),
];
#[self_referencing]
@ -126,7 +132,7 @@ impl RakerStore {
active_domain_raffle: dbs.next().unwrap(),
backing_off_reinstatements: dbs.next().unwrap(),
backing_off_domains: dbs.next().unwrap(),
-visited: dbs.next().unwrap(),
+visited_urls: dbs.next().unwrap(),
}
},
}
@ -178,13 +184,14 @@ impl<'a> RakerTxn<'a, RW> {
/// Inserts a domain into the active domain table,
/// generating a raffle ticket (and inserting it too).
///
-/// No-op if the domain is already an active domain.
+/// No-op if the domain is already an active domain or being backed off from.
pub fn insert_active_domain_with_new_raffle_ticket(
&self,
new_domain: String,
) -> anyhow::Result<()> {
let active_domains = &self.mdbx.borrow_dbs().active_domains;
let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
let new_domain = MdbxString(Cow::Owned(new_domain));
@ -196,6 +203,14 @@ impl<'a> RakerTxn<'a, RW> {
return Ok(());
}
if self
.mdbx_txn
.get::<()>(backing_off_domains, new_domain.as_bytes())?
.is_some()
{
return Ok(());
}
let reserved_raffle_ticket = loop {
let next_raffle_ticket = MdbxU32(rand::random());
if self
@ -229,11 +244,11 @@ impl<'a> RakerTxn<'a, RW> {
/// table.
///
/// Returns true if a deletion took place, and false if it did not.
-pub fn remove_active_domain(&self, domain: String) -> anyhow::Result<bool> {
+pub fn remove_active_domain(&self, domain: &str) -> anyhow::Result<bool> {
let active_domains = &self.mdbx.borrow_dbs().active_domains;
let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
-let domain = MdbxString(Cow::Owned(domain));
+let domain = MdbxString(Cow::Borrowed(domain));
if let Some(MdbxBare(active_domain)) = self
.mdbx_txn
@ -253,12 +268,12 @@ impl<'a> RakerTxn<'a, RW> {
/// Marks a URL as visited and takes it out of the queue.
pub fn mark_url_as_visited(
&self,
-domain: String,
-url_str: String,
+domain: &str,
+url_str: &str,
record: UrlVisitedRecord,
) -> anyhow::Result<()> {
let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
-let visited_urls = &self.mdbx.borrow_dbs().visited;
+let visited_urls = &self.mdbx.borrow_dbs().visited_urls;
let queue_key = format!("{}\n{}", domain, url_str);
@ -276,6 +291,108 @@ impl<'a> RakerTxn<'a, RW> {
Ok(())
}
pub fn start_backing_off(
&self,
domain: &str,
backoff_for: u32,
failed_url: String,
failure: TemporaryFailure,
) -> anyhow::Result<()> {
ensure!(
self.remove_active_domain(domain)?,
"Can't back off from domain that's not active"
);
let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements;
let reinstate_at =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() + backoff_for as u64;
let backoff_record = BackingOffDomainRecord {
failed_url,
failure,
backoff: backoff_for,
reinstate_at,
};
self.mdbx_txn.put(
backing_off_domains,
domain.as_bytes(),
&MdbxBare(backoff_record).as_bytes(),
WriteFlags::empty(),
)?;
self.mdbx_txn.put(
backing_off_reinstatements,
MdbxU64(reinstate_at).as_bytes(),
domain.as_bytes(),
WriteFlags::empty(),
)?;
Ok(())
}
/// Enqueues a URL.
/// If `last_modified` is specified, then this is a no-op if the page has already been
/// visited since then.
/// If `last_modified` is not specified, then this is a no-op if the page has already
/// been visited.
///
/// Returns: true if it was enqueued, false if nothing changed.
pub fn enqueue_url(
&self,
url_str: &str,
last_modified: Option<u16>,
intent: RakeIntent,
) -> anyhow::Result<bool> {
let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
let visited_urls = &self.mdbx.borrow_dbs().visited_urls;
let url = Url::parse(url_str)?;
let url_domain = get_reduced_domain(&url)?;
let queue_key = format!("{}\n{}", url_domain, url);
if self
.mdbx_txn
.get::<()>(queue_urls, queue_key.as_bytes())?
.is_some()
{
// Already in the queue. Nothing to do here.
return Ok(false);
}
if let Some(MdbxBare(visited_entry)) = self
.mdbx_txn
.get::<MdbxBare<UrlVisitedRecord>>(visited_urls, url_str.as_bytes())?
{
match last_modified {
None => {
// Already visited. Nothing to do here.
return Ok(false);
}
Some(last_modified) => {
if last_modified <= visited_entry.last_visited_days {
// Hasn't been modified since our last visit
return Ok(false);
}
}
}
}
// Add the entry to the queue
self.mdbx_txn.put(
queue_urls,
queue_key.as_bytes(),
&MdbxBare(QueueUrlRecord { intent }).as_bytes(),
WriteFlags::empty(),
)?;
// Activate the domain if needed...
self.insert_active_domain_with_new_raffle_ticket(url_domain.into_owned())?;
Ok(true)
}
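
A usage sketch (inside a read-write transaction; the URL and intent are made up): re-enqueueing an already-visited URL only takes effect when the reference carries a newer last-modified datestamp.

// Returns false if the URL is already queued, or visited and unmodified since.
let enqueued = txn.enqueue_url("https://example.org/blog/post", None, RakeIntent::Page)?;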
}
/// Read-only implementations (but can also be used on RW transactions)

View File

@ -30,6 +30,32 @@ impl TableObject<'_> for MdbxU32 {
}
}
/// u64 in native byte endianness (as required by INTEGERKEY mode)
#[derive(Copy, Clone, Debug)]
pub struct MdbxU64(pub u64);
impl MdbxU64 {
pub fn as_bytes(&self) -> Cow<'_, [u8]> {
Cow::Owned(self.0.to_ne_bytes().to_vec())
}
}
impl TableObject<'_> for MdbxU64 {
fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
if data_val.len() != 8 {
return Err(libmdbx::Error::DecodeError(
anyhow!("MDBX Key not 8 bytes; can't be decoded as u64").into(),
));
}
let mut buf = [0u8; 8];
buf.copy_from_slice(&data_val);
Ok(MdbxU64(u64::from_ne_bytes(buf)))
}
}
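
A round-trip sketch: INTEGER_KEY tables want keys in native byte order, hence to_ne_bytes/from_ne_bytes rather than a portable big-endian encoding.

// Encode and decode a reinstatement-timestamp key.
let key = MdbxU64(1_647_734_400);
assert_eq!(MdbxU64::decode(&key.as_bytes()).unwrap().0, key.0);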
/// UTF-8 String
#[derive(Clone, Debug)]
pub struct MdbxString<'txn>(pub Cow<'txn, str>);

View File

@ -1,4 +1,4 @@
-use crate::raking::RakeIntent;
+use crate::raking::{RakeIntent, TemporaryFailure};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
@ -21,3 +21,16 @@ pub struct UrlVisitedRecord {
pub struct QueueUrlRecord {
pub intent: RakeIntent, // TODO
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BackingOffDomainRecord {
/// The URL that caused the backoff.
pub failed_url: String,
/// The reason that this backoff is in place
pub failure: TemporaryFailure,
/// Duration of the backoff. Used to provide increasing backoffs if the failures persist.
pub backoff: u32,
/// When the domain should be reinstated
/// MUST match the timestamp present in the reinstatements table.
pub reinstate_at: u64,
}
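
The `backoff` field feeds the escalation planned in task.rs ("add 1.1× the previous backoff, if there was one" is still a TODO there); a sketch of that intent, as an assumption:

// Sketch only: escalate by 1.1x the previously stored backoff on repeated failures.
fn next_backoff(new_backoff_sec: u32, previous: Option<&BackingOffDomainRecord>) -> u32 {
let escalation = previous.map(|r| (r.backoff as f64 * 1.1) as u32).unwrap_or(0);
new_backoff_sec.saturating_add(escalation)
}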

View File

@ -14,6 +14,8 @@ bitflags! {
const COOKIE_NAG = 0x04;
/// Unspecified annoyances are present on this page, according to a cosmetic filter.
const ANNOYANCE = 0x08;
/// Social trackers are a subcategory of privacy...
const SOCIAL = 0x10;
/// The web page was served over CloudFlare at the time of indexing, which is not in the
/// spirit of decentralisation.
@ -38,6 +40,8 @@ pub struct RakedReferrerEntry {
pub struct RakedReference {
pub target: String,
pub kind: ReferenceKind,
/// Date of last modification (if known), as a QuickPeep datestamp.
pub last_mod: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]