use crate::config::RerakeTimings;
use crate::raking::references::{clean_url, references_from_urlrakes, SUPPORTED_SCHEMES};
use crate::raking::{
    get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent,
    RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
};
use crate::storage::records::{DomainRecord, UrlVisitedRecord};
use crate::storage::RakerStore;
use anyhow::{anyhow, Context};
use chrono::Utc;
use cylon::Cylon;
use log::{debug, warn};
use lru::LruCache;
use metrics::increment_counter;
use quickpeep_structs::rake_entries::{
    IconEntry, RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
};
use quickpeep_utils::dates::date_to_quickpeep_days;
use quickpeep_utils::urls::get_reduced_domain;
use reqwest::{Client, Url};
use std::borrow::{Borrow, Cow};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, Semaphore};
use tokio::time::Instant;

/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
/// queue and get turned into a backoff.
/// (This is necessary because some crawl delays can easily be hours or days.)
pub const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(61);

/// Most sites request a crawl delay of 10 seconds or less.
/// If unspecified, default to a reasonable-sounding 10 seconds.
pub const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(10);

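// Note: crawl delays longer than `MAX_CRAWL_DELAY_BEFORE_BACKOFF` are still respected, but by
// recording a backoff for the domain (see `process_domain`) rather than by holding the domain's
// place in the queue whilst sleeping.
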
/// What the domain-processing loop should do after one URL's outcome has been handled.
enum NextAction {
    /// Continue with the next URL from the same domain.
    Continue,
    /// Stop working on this domain and pick another (e.g. because it is now backing off).
    ChangeDomain,
}

#[derive(Clone)]
pub struct TaskResultSubmission {
    pub pages: Sender<(Url, RakedPageEntry)>,
    pub references: Sender<(Url, RakedReferrerEntry)>,
    pub rejections: Sender<(Url, PermanentFailure)>,
    pub icons: Sender<(Url, IconEntry)>,
}

#[derive(Clone)]
pub struct TaskContext {
    /// The backing database store
    pub store: RakerStore,

    /// HTTP client
    pub client: Client,

    /// HTTP client that follows redirects automatically. Only used for favicons so far.
    pub redirect_following_client: Client,

    /// The raker
    pub raker: Arc<Raker>,

    /// Busy domains (that are being processed by other tasks)
    pub busy_domains: Arc<StdMutex<HashSet<String>>>,

    /// Cache of robots.txt entries for recently-made dormant sites
    pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,

    /// Semaphore that gives permits to make HTTP requests
    pub semaphore: Arc<Semaphore>,

    pub submission: TaskResultSubmission,

    pub graceful_stop: Arc<AtomicBool>,

    /// Notifier used to wake up sleepers, either to stop them gracefully or because work is
    /// available (the latter is not implemented yet).
    pub notify: Arc<Notify>,

    pub rerake_timings: Arc<RerakeTimings>,
}

impl TaskContext {
    pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
        let robots = get_robots_txt_for(url_of_site, &self.redirect_following_client).await?;
        Ok(robots.map(|robots: RobotsTxt| robots.rules))
    }

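    /// Processes a domain: takes URLs from the domain's queue one at a time and rakes them,
    /// respecting robots.txt rules and crawl delays, until the domain runs out of URLs, the
    /// task must move on to another domain (e.g. for a backoff), or a graceful stop is
    /// requested.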
    pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> {
        let mut current_robot_rules_url: Option<Url> = None;
        let mut current_robot_rules: Option<Cylon> = None;
        let mut wait_until: Option<Instant> = None;

        while !self.graceful_stop.load(Ordering::Relaxed) {
            // Get a URL to process
            let url = {
                let txn = self.store.ro_txn()?;
                txn.choose_url_for_domain(&domain)
                    .context("failed to choose URL for domain")?
            };

            let (url_str, url_record) = if let Some(url) = url {
                url
            } else {
                // Exhausted all the URLs for this domain. Let's remove it from the active domains
                // queue.
                let domain = domain.to_owned();
                let out_of_urls = self
                    .store
                    .async_rw_txn(move |txn| {
                        // Double-check we're still out of URLs (another could have been added
                        // since we last checked!)
                        let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none();

                        if !out_of_urls {
                            return Ok(false);
                        }

                        // Delete the active domain from the store
                        txn.remove_active_domain(&domain)
                            .context("failed to remove active domain")?;

                        txn.commit()?;
                        Ok(true)
                    })
                    .await
                    .context("failed to check if we're out of URLs")?;
                if out_of_urls {
                    break;
                } else {
                    continue;
                }
            };

            let url = Url::parse(&url_str)
                .with_context(|| format!("failed to parse as URL: {url_str:?}"))?;

            // Check our robot rules are valid for that URL.
            let robot_url = robots_txt_url_for(&url)
                .with_context(|| format!("failed to get robots.txt URL for {url_str:?}"))?;
            if Some(&robot_url) != current_robot_rules_url.as_ref() {
                // We need to update our robot rules!
                match self.get_robot_rules(&url).await {
                    Ok(rules) => {
                        current_robot_rules = rules;
                    }
                    Err(err) => {
                        self.process_outcome(
                            &url,
                            RakeOutcome::TemporaryFailure(TemporaryFailure {
                                reason: TemporaryFailureReason::UnknownClientError(format!(
                                    "robots.txt failure {:?}: {:?}",
                                    url, err
                                )),
                                // Back off for a day: this ought to be enough time for the
                                // operator to fix the problem... maybe?
                                backoff_sec: 86400,
                            }),
                        )
                        .await
                        .context("failed to handle TemporaryFailure outcome for robots.txt")?;
                        // Forcefully change domain
                        return Ok(());
                    }
                }
                current_robot_rules_url = Some(robot_url);
            }

            // Check our robot rules allow us to visit the desired URL
            if let Some(robot_rules) = current_robot_rules.as_ref() {
                if !robot_rules.allow(url.path().as_bytes()) {
                    // Permanently reject this.
                    self.process_outcome(
                        &url,
                        RakeOutcome::PermanentFailure(PermanentFailure {
                            reason: PermanentFailureReason::DeniedToRobots,
                        }),
                    )
                    .await
                    .context("failed to process PermanentFailure outcome for robots.txt")?;
                    continue;
                }
            }

            if let Some(wait_until) = wait_until.take() {
                // Sleep to respect a crawl-delay
                tokio::select! {
                    _ = tokio::time::sleep_until(wait_until) => {}
                    _ = self.notify.notified() => {
                        if self.graceful_stop.load(Ordering::SeqCst) {
                            // It's time to shut down
                            break;
                        }
                    }
                };
            }

            let delay = if let Some(robot_rules) = current_robot_rules.as_ref() {
                robot_rules
                    .delay()
                    .map(Duration::from_secs)
                    .unwrap_or(DEFAULT_CRAWL_DELAY)
            } else {
                DEFAULT_CRAWL_DELAY
            };

            // Now acquire a permit to go and fetch the desired URL
            let permit = self.semaphore.acquire().await?;
            let client = if url_record.intent == RakeIntent::Icon {
                &self.redirect_following_client
            } else {
                &self.client
            };
            debug!("Rake: {url}");
            let raked = self.raker.rake(&url, url_record.intent, client).await;
            drop(permit);

            // Next time, we need to wait before our request.
            wait_until = Some(Instant::now() + delay);

            let rake_outcome = raked.unwrap_or_else(|err| {
                warn!("Failed to rake {:?}: {:?}", url, err);

                match err.downcast::<PermanentFailure>() {
                    Ok(permanent) => RakeOutcome::PermanentFailure(permanent),
                    Err(err) => {
                        // Treat this as a temporary rejection (backoff).
                        RakeOutcome::TemporaryFailure(TemporaryFailure {
                            reason: TemporaryFailureReason::UnknownClientError(format!(
                                "Failed to rake {:?}: {:?}",
                                url, err
                            )),
                            // Back off for a day: this ought to be enough time for the
                            // operator to fix the problem... maybe?
                            backoff_sec: 86400,
                        })
                    }
                }
            });

            match self.process_outcome(&url, rake_outcome).await? {
                NextAction::Continue => {
                    // The URL has already been taken off the queue.
                    // We just need to continue!
                }
                NextAction::ChangeDomain => {
                    // Cache the robots.txt rules for this (now dormant) domain, in case it
                    // wakes up again soon.
                    let mut cache = self
                        .robotstxt_cache
                        .write()
                        .map_err(|_| anyhow!("Robots.txt cache poisoned"))?;
                    cache.put(domain, current_robot_rules);

                    // Choose another domain
                    return Ok(());
                }
            }

            if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
                // This crawl-delay is painfully long!
                // We'll respect it, but we treat this as a backoff and let the site lose its
                // place in the queue.

                let domain = domain.clone();
                let url = url.clone();
                let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX);

                self.store
                    .async_rw_txn(move |txn| {
                        txn.start_backing_off(
                            &domain,
                            backoff,
                            url.to_string(),
                            TemporaryFailure {
                                reason: TemporaryFailureReason::ExcruciatingCrawlDelay(
                                    delay.as_secs(),
                                ),
                                // Don't stack this up with a backoff; it's not an actual failure!
                                backoff_sec: 0,
                            },
                        )?;
                        txn.commit()?;
                        Ok(())
                    })
                    .await
                    .context("failure whilst turning long crawl delay into backoff")?;
            }
        }

        Ok(())
    }

    /// Processes the outcome of raking a URL and returns the next action the domain-processing
    /// loop should take.
    async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
        let today = date_to_quickpeep_days(&Utc::today())?;
        match outcome {
            RakeOutcome::RakedPage(page) => {
                self.submission
                    .pages
                    .send((url.clone(), page.page_entry.clone()))
                    .await
                    .context("Page processor shut down; can't stream page!")?;

                self.submission
                    .references
                    .send((url.clone(), page.referrer_entry.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_page(url.clone(), page.page_entry, today)
                    .await
                    .context("failure processing page for RakedPage")?;
                self.as_event_processor()
                    .process_refs(url.clone(), page.referrer_entry, today, false)
                    .await
                    .context("failure processing refs for RakedPage")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedFeed(feed) => {
                let refs = RakedReferrerEntry {
                    references: references_from_urlrakes(&feed, ReferenceKind::FeedEntry),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, true)
                    .await
                    .context("failure processing refs for RakedFeed")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedSitemap(sitemap) => {
                let refs = RakedReferrerEntry {
                    references: references_from_urlrakes(&sitemap, ReferenceKind::SitemapEntry),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, true)
                    .await
                    .context("failure processing refs for RakedSitemap")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::RakedIcon(icon) => {
                // Store icon to icon store
                self.submission
                    .icons
                    .send((
                        url.clone(),
                        IconEntry {
                            webp_bytes: icon.webp_bytes,
                        },
                    ))
                    .await?;

                self.as_event_processor()
                    .process_icon(url.clone(), today)
                    .await
                    .context("failure processing icon for RakedIcon")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::Redirect { reason, new_url } => {
                let refs = RakedReferrerEntry {
                    references: [RakedReference {
                        target: clean_url(&new_url).to_string(),
                        kind: match reason {
                            RedirectReason::Redirected { .. } => ReferenceKind::Redirect,
                            RedirectReason::NotCanonical { .. } => ReferenceKind::CanonicalUrl,
                            RedirectReason::SecureUpgrade => ReferenceKind::SecureUpgrade,
                        },
                        last_mod: None,
                    }]
                    .into_iter()
                    .collect(),
                };

                self.submission
                    .references
                    .send((url.clone(), refs.clone()))
                    .await
                    .context("Reference processor shut down; can't stream references!")?;

                self.as_event_processor()
                    .process_refs(url.clone(), refs, today, false)
                    .await
                    .context("failure processing refs for Redirect")?;

                Ok(NextAction::Continue)
            }
            RakeOutcome::TemporaryFailure(failure) => {
                // TODO(future) do we want to log this somewhere?
                // or at least a metric

                let domain = get_reduced_domain(url).with_context(|| {
                    format!("No domain in URL '{url}' for which we are processing the outcome!")
                })?;
                let url = url.clone();

                // TODO(feature) add 1.1× the previous backoff, if there was one.
                let new_backoff = failure.backoff_sec;
                let domain = domain.into_owned();

                self.store
                    .async_rw_txn(move |txn| {
                        txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?;
                        txn.commit()?;
                        Ok(())
                    })
                    .await
                    .context("failed to store backoff")?;

                // Change domain now
                Ok(NextAction::ChangeDomain)
            }
            RakeOutcome::PermanentFailure(failure) => {
                self.submission
                    .rejections
                    .send((url.clone(), failure.clone()))
                    .await
                    .context("Rejection processor shut down; can't stream rejection!")?;
                self.as_event_processor()
                    .process_rejection(url.clone(), today)
                    .await
                    .context("failed to process rejection for PermanentFailure")?;

                // Reasons for permanent rejection aren't our fault or a site-wide fault,
                // so it's safe to carry on with this domain.
                Ok(NextAction::Continue)
            }
        }
    }

    fn as_event_processor(&self) -> EventProcessor {
        EventProcessor {
            store: Cow::Borrowed(&self.store),
            rerake_timings: &self.rerake_timings,
        }
    }
}

/// Processes events that are either emitted into or loaded from a RakePack.
/// Processing events this way means we can bring up a raker's store to the same state as another,
/// just by replaying the stream of RakePacks and importing seeds.
pub struct EventProcessor<'a> {
    // Cow allows the store to be either borrowed (from a live TaskContext) or owned
    // (e.g. when replaying RakePacks).
    store: Cow<'a, RakerStore>,
    rerake_timings: &'a RerakeTimings,
}

impl EventProcessor<'_> {
    /// Marks a raked page's URL as visited (scheduling a re-rake) and, if the page declares a
    /// favicon with a supported scheme, enqueues the favicon's URL for raking.
    pub async fn process_page(
        &self,
        url: Url,
        page: RakedPageEntry,
        datestamp: u16,
    ) -> anyhow::Result<()> {
        let rerake_on = Some(datestamp + self.rerake_timings.page);
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url).with_context(|| {
                    format!("No domain for URL '{url}' for which we are processing the page!")
                })?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )?;

                // If there's a favicon to be tried, add it to the list...
                let favicon_url_rel = page.document.head.effective_favicon_url();
                if let Ok(favicon_url) = url.join(favicon_url_rel) {
                    if SUPPORTED_SCHEMES.contains(&favicon_url.scheme()) {
                        txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon)?;
                    }
                }

                txn.commit()?;
                Ok(())
            })
            .await
    }

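    /// Marks an icon's URL as visited, scheduling a re-rake after the configured icon re-rake
    /// interval.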
    pub async fn process_icon(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
        let rerake_on = Some(datestamp + self.rerake_timings.icon);

        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url).with_context(|| {
                    format!("No domain for URL '{url}' for which we are processing an icon!")
                })?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )?;

                txn.commit()?;
                Ok(())
            })
            .await
    }

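    /// Marks `url` as visited and handles its outbound references: references whose
    /// DomainRecord says they are rakeable are enqueued, references that are ruled out
    /// ("weeds") are dropped, and references to domains with no DomainRecord yet are put on
    /// hold for later inspection.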
    pub async fn process_refs(
        &self,
        url: Url,
        refs: RakedReferrerEntry,
        datestamp: u16,
        rerakeable_feed: bool,
    ) -> anyhow::Result<()> {
        let rerake_on = if rerakeable_feed {
            // Re-rake the feed after the configured interval (in days) has elapsed.
            Some(datestamp + self.rerake_timings.feed)
        } else {
            None
        };
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url).with_context(|| {
                    format!("No domain for URL '{url}' for which we are processing refs!")
                })?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    rerake_on,
                )
                .context("failed to mark URL as visited")?;

                // Track all the referred-to URLs!
                for reference in refs.references {
                    let ref_url = Url::parse(&reference.target).with_context(|| {
                        format!(
                            "failed to parse target URL of reference: {:?}",
                            reference.target
                        )
                    })?;
                    let domain = get_reduced_domain(&ref_url).with_context(|| {
                        format!("failed to reduce domain: {:?}", reference.target)
                    })?;

                    // Check if this URL is an allowed URL (hence should be enqueued)
                    let allowed = txn
                        .get_domain_record(domain.borrow())?
                        .and_then(|record: DomainRecord| record.is_url_rakeable(&ref_url));

                    match allowed {
                        Some(true) => {
                            let is_fresh = txn.enqueue_url(
                                &reference.target,
                                reference.last_mod,
                                reference.kind.into(),
                            )?;
                            if is_fresh {
                                increment_counter!("qprake_queue_new_url");
                            }
                        }
                        Some(false) => {
                            // Weed! Do nothing.
                        }
                        None => {
                            // It's neither allowed nor weeded, so put it on hold for later
                            // inspection.
                            txn.put_url_on_hold(&reference.target, reference.kind.into())?;
                        }
                    }
                }

                txn.commit()?;
                Ok(())
            })
            .await
    }

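    /// Records a permanent rejection by marking the URL as visited with no re-rake date, so
    /// that it won't be raked again.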
    pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
        self.store
            .as_ref()
            .async_rw_txn(move |txn| {
                let domain = get_reduced_domain(&url).with_context(|| {
                    format!("No domain for URL '{url}' for which we are processing a rejection!")
                })?;
                txn.mark_url_as_visited(
                    domain.as_ref(),
                    url.as_ref(),
                    UrlVisitedRecord {
                        last_visited_days: datestamp,
                    },
                    None,
                )?;
                txn.commit()?;
                Ok(())
            })
            .await
    }
}