quickpeep/quickpeep_raker/src/raking/task.rs


use crate::config::RerakeTimings;
use crate::raking::references::{clean_url, references_from_urlrakes};
use crate::raking::{
    get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent,
    RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
};
use crate::storage::records::{AllowedDomainRecord, UrlVisitedRecord, WeedDomainRecord};
use crate::storage::{RakerStore, RandomActiveDomainAcquisition};
use anyhow::{anyhow, ensure, Context};
use chrono::Utc;
use cylon::Cylon;
use log::{error, warn};
use lru::LruCache;
use metrics::increment_counter;
use quickpeep_structs::rake_entries::{
    IconEntry, RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
};
use quickpeep_utils::dates::date_to_quickpeep_days;
use quickpeep_utils::urls::get_reduced_domain;
use reqwest::{Client, Url};
use std::borrow::{Borrow, Cow};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, Semaphore};
use tokio::time::Instant;

/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
/// queue and get turned into a backoff.
/// (This is necessary because some crawl delays can easily be hours or days.)
pub const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(61);

/// Most sites request a crawl delay of 10 sec or less.
/// If unspecified, let's go with a reasonable-sounding number of 10 secs.
pub const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(10);

enum NextAction {
    Continue,
    ChangeDomain,
}

#[derive(Clone)]
pub struct TaskResultSubmission {
    pub pages: Sender<(Url, RakedPageEntry)>,
    pub references: Sender<(Url, RakedReferrerEntry)>,
    pub rejections: Sender<(Url, PermanentFailure)>,
    pub icons: Sender<(Url, IconEntry)>,
}

#[derive(Clone)]
pub struct TaskContext {
    /// The backing database store
    pub store: RakerStore,
    /// HTTP client
    pub client: Client,
    /// HTTP client that follows redirects automatically. Only used for favicons so far.
    pub redirect_following_client: Client,
    /// The raker
    pub raker: Arc<Raker>,
    /// Busy domains (that are being processed by other tasks)
    pub busy_domains: Arc<StdMutex<HashSet<String>>>,
    /// Cache of robots.txt entries for recently-made dormant sites
    pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
    /// Semaphore that gives permits to make HTTP requests
    pub semaphore: Arc<Semaphore>,
    pub submission: TaskResultSubmission,
    pub graceful_stop: Arc<AtomicBool>,
    /// Notifier used to wake up sleepers (either to stop them gracefully, or because work
    /// is available (not implemented))
    pub notify: Arc<Notify>,
    pub rerake_timings: Arc<RerakeTimings>,
}

impl TaskContext {
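    /// Main task loop: acquires a random non-busy active domain and rakes it, repeating until a
    /// graceful stop is requested or there are no active domains left.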
    pub async fn run(mut self) -> anyhow::Result<()> {
        // Get a domain to process
        while !self.graceful_stop.load(Ordering::SeqCst) {
            let domain = {
                let txn = self.store.ro_txn()?;
                txn.acquire_random_active_domain(self.busy_domains.clone())?
            };

            match domain {
                RandomActiveDomainAcquisition::GotOne {
                    domain,
                    record: _active_record,
                } => {
                    if let Err(err) = self.process_domain(domain.clone()).await {
                        error!("Encountered error processing {:?}: {:?}", domain, err);
                    }
                    ensure!(
                        self.busy_domains
                            .lock()
                            .map_err(|_| anyhow!("busy domains set poisoned"))?
                            .remove(&domain),
                        "Our domain was not busy after processing!"
                    );
                }
                RandomActiveDomainAcquisition::AllBusy => {
                    // TODO(perf): notify waiters when new domains are available.
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(60)) => {
                            // nop
                        },
                        _ = self.notify.notified() => {
                            // nop (we allow the notifier to wake us up in case we need to
                            // gracefully stop).
                        },
                    };
                }
                RandomActiveDomainAcquisition::NoneLeft => {
                    // Nothing left to do, and it's not temporary because there aren't even any
                    // busy domains left.
                    break;
                }
            }
        }

        Ok(())
    }
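
    /// Fetches the robots.txt rules for the site of the given URL, if there are any.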
    pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
        let robots = get_robots_txt_for(url_of_site, &self.redirect_following_client).await?;
        Ok(robots.map(|robots: RobotsTxt| robots.rules))
    }
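
    /// Rakes URLs from the given domain one at a time, respecting robots.txt rules and crawl
    /// delays, until the domain is exhausted, a graceful stop is requested, or a failure (or an
    /// excessive crawl delay) puts the domain on a backoff.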
    pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> {
        let mut current_robot_rules_url: Option<Url> = None;
        let mut current_robot_rules: Option<Cylon> = None;
        let mut wait_until: Option<Instant> = None;

        while !self.graceful_stop.load(Ordering::Relaxed) {
            // Get a URL to process
            let url = {
                let txn = self.store.ro_txn()?;
                txn.choose_url_for_domain(&domain)
                    .context("failed to choose URL for domain")?
            };

            let (url_str, url_record) = if let Some(url) = url {
                url
            } else {
                // Exhausted all the URLs for this domain. Let's remove it from the active domains
                // queue.
                let domain = domain.to_owned();
                let out_of_urls = self
                    .store
                    .async_rw_txn(move |txn| {
                        // Double-check we're still out of URLs (another could have been added since
                        // we last checked!)
                        let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none();
                        if !out_of_urls {
                            return Ok(false);
                        }

                        // Delete the active domain from the store
                        txn.remove_active_domain(&domain)
                            .context("failed to remove active domain")?;

                        txn.commit()?;
                        Ok(true)
                    })
                    .await
                    .context("failed to check if we're out of URLs")?;
                if out_of_urls {
                    break;
                } else {
                    continue;
                }
            };

            let url = Url::parse(&url_str)
                .with_context(|| format!("failed to parse as URL: {url_str:?}"))?;

            // Check our robot rules are valid for that URL.
            let robot_url = robots_txt_url_for(&url)
                .with_context(|| format!("failed to get robots.txt URL for {url_str:?}"))?;
            if Some(&robot_url) != current_robot_rules_url.as_ref() {
                // We need to update our robot rules!
                match self.get_robot_rules(&url).await {
                    Ok(rules) => {
                        current_robot_rules = rules;
                    }
                    Err(err) => {
                        self.process_outcome(
                            &url,
                            RakeOutcome::TemporaryFailure(TemporaryFailure {
                                reason: TemporaryFailureReason::UnknownClientError(format!(
                                    "robots.txt failure {:?}: {:?}",
                                    url, err
                                )),
                                // Back off for a day: this ought to be enough time for the operator to fix the problem... maybe?
                                backoff_sec: 86400,
                            }),
                        )
                        .await
                        .context("failed to handle TemporaryFailure outcome for robots.txt")?;

                        // Forcefully change domain
                        return Ok(());
                    }
                }
                current_robot_rules_url = Some(robot_url);
            }

            // Check our robot rules allow us to visit the desired URL
            if let Some(robot_rules) = current_robot_rules.as_ref() {
                if !robot_rules.allow(url.path().as_bytes()) {
                    // Permanently reject this.
                    self.process_outcome(
                        &url,
                        RakeOutcome::PermanentFailure(PermanentFailure {
                            reason: PermanentFailureReason::DeniedToRobots,
                        }),
                    )
                    .await
                    .context("failed to process PermanentFailure outcome for robots.txt")?;
                    continue;
                }
            }

            if let Some(wait_until) = wait_until.take() {
                // Sleep to respect a crawl-delay
                tokio::select! {
                    _ = tokio::time::sleep_until(wait_until) => {
                    }
                    _ = self.notify.notified() => {
                        if self.graceful_stop.load(Ordering::SeqCst) {
                            // It's time to shut down
                            break;
                        }
                    }
                };
            }

            let delay = if let Some(robot_rules) = current_robot_rules.as_ref() {
                robot_rules
                    .delay()
                    .map(Duration::from_secs)
                    .unwrap_or(DEFAULT_CRAWL_DELAY)
            } else {
                DEFAULT_CRAWL_DELAY
            };

            // Now acquire a permit to go and fetch the desired URL
            let permit = self.semaphore.acquire().await?;
            let client = if url_record.intent == RakeIntent::Icon {
                &self.redirect_following_client
            } else {
                &self.client
            };
            let raked = self.raker.rake(&url, url_record.intent, client).await;
            drop(permit);

            // Next time, we need to wait before our request.
            wait_until = Some(Instant::now() + delay);

            let rake_outcome = raked.unwrap_or_else(|err| {
                warn!("Failed to rake {:?}: {:?}", url, err);

                match err.downcast::<PermanentFailure>() {
                    Ok(permanent) => RakeOutcome::PermanentFailure(permanent),
                    Err(err) => {
                        // Treat this as a temporary rejection (backoff).
                        RakeOutcome::TemporaryFailure(TemporaryFailure {
                            reason: TemporaryFailureReason::UnknownClientError(format!(
                                "Failed to rake {:?}: {:?}",
                                url, err
                            )),
                            // Back off for a day: this ought to be enough time for the operator to fix the problem... maybe?
                            backoff_sec: 86400,
                        })
                    }
                }
            });

            match self.process_outcome(&url, rake_outcome).await? {
                NextAction::Continue => {
                    // The URL has already been taken off the queue.
                    // We just need to continue!
                }
                NextAction::ChangeDomain => {
                    let mut cache = self
                        .robotstxt_cache
                        .write()
                        .map_err(|_| anyhow!("Robots.txt cache poisoned"))?;
                    cache.put(domain, current_robot_rules);

                    // Choose another domain
                    return Ok(());
                }
            }

            if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
                // This crawl-delay is painfully long!
                // We'll respect it, but we treat this as a backoff and let the site lose its place
                // in the queue.
                let domain = domain.clone();
                let url = url.clone();
                let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX);
                self.store
                    .async_rw_txn(move |txn| {
                        txn.start_backing_off(
                            &domain,
                            backoff,
                            url.to_string(),
                            TemporaryFailure {
                                reason: TemporaryFailureReason::ExcruciatingCrawlDelay(
                                    delay.as_secs(),
                                ),
                                // Don't stack this up with a backoff; it's not an actual failure!
                                backoff_sec: 0,
                            },
                        )?;
                        txn.commit()?;
                        Ok(())
                    })
                    .await
                    .context("failure whilst turning long crawl delay into backoff")?;
            }
        }

        Ok(())
    }

    /// Processes the outcome of raking a URL: streams the results to the appropriate processors,
    /// records them in the store, and returns the next action to take (carry on with this domain
    /// or change domain).
    async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
        let today = date_to_quickpeep_days(&Utc::today())?;
        match outcome {
            RakeOutcome::RakedPage(page) => {
                self.submission
                    .pages
                    .send((url.clone(), page.page_entry.clone()))
                    .await
                    .context("Page processor shut down; can't stream page!")?;

                self.submission
                    .references
                    .send((url.clone(), page.referrer_entry.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_page(url.clone(), page.page_entry, today)
                    .await
                    .context("failure processing page for RakedPage")?;
                self.as_event_processor()
                    .process_refs(url.clone(), page.referrer_entry, today, false)
                    .await
                    .context("failure processing refs for RakedPage")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedFeed(feed) => {
                let refs = RakedReferrerEntry {
                    references: references_from_urlrakes(&feed, ReferenceKind::FeedEntry),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, true)
                    .await
                    .context("failure processing refs for RakedFeed")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedSitemap(sitemap) => {
                let refs = RakedReferrerEntry {
                    references: references_from_urlrakes(&sitemap, ReferenceKind::SitemapEntry),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, true)
                    .await
                    .context("failure processing refs for RakedSitemap")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedIcon(icon) => {
                // Store icon to icon store
                self.submission
                    .icons
                    .send((
                        url.clone(),
                        IconEntry {
                            webp_bytes: icon.webp_bytes,
                        },
                    ))
                    .await?;

                self.as_event_processor()
                    .process_icon(url.clone(), today)
                    .await
                    .context("failure processing icon for RakedIcon")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::Redirect { reason, new_url } => {
                let refs = RakedReferrerEntry {
                    references: [RakedReference {
                        target: clean_url(&new_url).to_string(),
                        kind: match reason {
                            RedirectReason::Redirected { .. } => ReferenceKind::Redirect,
                            RedirectReason::NotCanonical { .. } => ReferenceKind::CanonicalUrl,
                            RedirectReason::SecureUpgrade => ReferenceKind::SecureUpgrade,
                        },
                        last_mod: None,
                    }]
                    .into_iter()
                    .collect(),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, false)
                    .await
                    .context("Failure processing refs for Redirect")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::TemporaryFailure(failure) => {
                // TODO(future) do we want to log this somewhere?
                // or at least a metric

                let domain = get_reduced_domain(url)?;
                let url = url.clone();

                // TODO(feature) add 1.1× the previous backoff, if there was one.
                let new_backoff = failure.backoff_sec;
                let domain = domain.into_owned();

                self.store
                    .async_rw_txn(move |txn| {
                        txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?;
                        txn.commit()?;
                        Ok(())
                    })
                    .await
                    .context("failed to store backoff")?;

                // Change domain now
                Ok(NextAction::ChangeDomain)
            }
            RakeOutcome::PermanentFailure(failure) => {
                self.submission
                    .rejections
                    .send((url.clone(), failure.clone()))
                    .await
                    .context("Rejection processor shut down; can't stream rejection!!")?;
                self.as_event_processor()
                    .process_rejection(url.clone(), today)
                    .await
                    .context("failed to process rejection for PermanentFailure")?;

                // Reasons for permanent rejection aren't our fault or a site-wide fault;
                // so don't worry about carrying on.
                Ok(NextAction::Continue)
            }
        }
    }
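
    /// Returns an `EventProcessor` borrowing this context's store, used to record rake outcomes
    /// in the local store.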
    fn as_event_processor(&self) -> EventProcessor {
        EventProcessor {
            store: Cow::Borrowed(&self.store),
            rerake_timings: &self.rerake_timings,
        }
    }
}

/// Processes events that are either emitted into or loaded from a RakePack.
/// Processing events this way means we can bring up a raker's store to the same state as another,
/// just by replaying the stream of RakePacks and importing seeds.
pub struct EventProcessor<'a> {
    store: Cow<'a, RakerStore>,
    rerake_timings: &'a RerakeTimings,
}

impl EventProcessor<'_> {
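    /// Records a raked page: marks the URL as visited (scheduling a re-rake) and, if the page
    /// references a favicon, enqueues that favicon URL for raking.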
    pub async fn process_page(
        &self,
        url: Url,
        page: RakedPageEntry,
        datestamp: u16,
    ) -> anyhow::Result<()> {
        let rerake_on = Some(datestamp + self.rerake_timings.page);
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url)?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )?;

                // If there's a favicon to be tried, add it to the list...
                let favicon_url_rel = page.document.head.effective_favicon_url();
                if let Ok(favicon_url) = url.join(favicon_url_rel) {
                    txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon)?;
                }

                txn.commit()?;
                Ok(())
            })
            .await
    }
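
    /// Records a raked icon: marks the URL as visited and schedules a re-rake.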
    pub async fn process_icon(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
        let rerake_on = Some(datestamp + self.rerake_timings.icon);
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url)?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )?;
                txn.commit()?;
                Ok(())
            })
            .await
    }
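
    /// Records the references found at a URL: marks the referring URL as visited, then enqueues
    /// each referenced URL whose domain is allowed, ignores it if the domain is a weed, and
    /// otherwise puts it on hold for later inspection.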
    pub async fn process_refs(
        &self,
        url: Url,
        refs: RakedReferrerEntry,
        datestamp: u16,
        rerakeable_feed: bool,
    ) -> anyhow::Result<()> {
        let rerake_on = if rerakeable_feed {
            Some(self.rerake_timings.feed)
        } else {
            None
        };
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url)?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )
                .context("failed to mark URL as visited")?;

                // track all the referred-to URLs!
                for reference in refs.references {
                    let ref_url = Url::parse(&reference.target).with_context(|| {
                        format!(
                            "failed to parse target URL of reference: {:?}",
                            reference.target
                        )
                    })?;
                    let domain = get_reduced_domain(&ref_url).with_context(|| {
                        format!("failed to reduce domain: {:?}", reference.target)
                    })?;

                    // First check if this URL is an allowed URL (hence should be enqueued)
                    let allowed = txn
                        .get_allowed_domain_record(domain.borrow())?
                        .map(|record: AllowedDomainRecord| record.applies_to_url(&ref_url))
                        .unwrap_or(false);
                    if allowed {
                        let is_fresh = txn.enqueue_url(
                            &reference.target,
                            reference.last_mod,
                            reference.kind.into(),
                        )?;
                        if is_fresh {
                            increment_counter!("qprake_queue_new_url");
                        }
                        continue;
                    }

                    // Then check if this URL is a weed (hence should be ignored)
                    let is_weed = txn
                        .get_weed_domain_record(domain.borrow())?
                        .map(|record: WeedDomainRecord| record.applies_to_url(&ref_url))
                        .unwrap_or(false);
                    if !is_weed {
                        // It's neither allowed nor weeded, so put it on hold for later inspection
                        txn.put_url_on_hold(&reference.target, reference.kind.into())?;
                    }
                }

                txn.commit()?;
                Ok(())
            })
            .await
    }
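
    /// Records a permanent rejection: marks the URL as visited, with no re-rake scheduled.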
    pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url)?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    None,
                )?;
                txn.commit()?;
                Ok(())
            })
            .await
    }
}