Put URLs on hold rather than on the queue if they are not allowed

Olivier 'reivilibre' 2022-03-21 19:16:48 +00:00
parent 9ef4fef858
commit 51d5b9208b
3 changed files with 92 additions and 6 deletions

View File

@@ -4,7 +4,7 @@ use crate::raking::{
     get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome,
     Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
 };
-use crate::storage::records::UrlVisitedRecord;
+use crate::storage::records::{AllowedDomainRecord, UrlVisitedRecord};
 use crate::storage::{RakerStore, RandomActiveDomainAcquisition};
 use anyhow::{anyhow, ensure, Context};
 use chrono::Utc;
@@ -16,7 +16,7 @@ use quickpeep_structs::rake_entries::{
 };
 use quickpeep_utils::dates::date_to_quickpeep_days;
 use reqwest::{Client, Url};
-use std::borrow::Cow;
+use std::borrow::{Borrow, Cow};
 use std::collections::HashSet;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex as StdMutex, RwLock};
@@ -469,7 +469,42 @@ impl EventProcessor<'_> {
         // track all the referred-to URLs!
         for reference in refs.references {
-            txn.enqueue_url(&reference.target, reference.last_mod, reference.kind.into())?;
+            let ref_url = Url::parse(&reference.target)?;
+            let domain = get_reduced_domain(&ref_url)?;
+            let allowed = match txn.get_allowed_domain_record(domain.borrow())? {
+                None => false,
+                Some(AllowedDomainRecord {
+                    restricted_prefixes,
+                }) => {
+                    if restricted_prefixes.is_empty() {
+                        true
+                    } else {
+                        let mut allowed = false;
+                        for prefix in restricted_prefixes.iter() {
+                            if ref_url.path().starts_with(prefix) {
+                                allowed = true;
+                                break;
+                            }
+                            if prefix.as_str() > ref_url.path() {
+                                // e.g. /dog > /cat/xyz
+                                // This means we've missed all chances to see our prefix,
+                                // so we break here (efficiency).
+                                break;
+                            }
+                        }
+                        allowed
+                    }
+                }
+            };
+            if allowed {
+                txn.enqueue_url(
+                    &reference.target,
+                    reference.last_mod,
+                    reference.kind.into(),
+                )?;
+            } else {
+                txn.put_url_on_hold(&reference.target, reference.kind.into())?;
+            }
         }
         txn.commit()?;
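
A note on the early exit above: the break on `prefix.as_str() > ref_url.path()` is only sound if `restricted_prefixes` iterates in ascending order (e.g. a BTreeSet), because every prefix of a path sorts lexicographically at or before the path itself. Below is a minimal standalone sketch of the same check under that sorted-set assumption; the `path_allowed` helper is hypothetical, for illustration only.

// Sketch: the restricted-prefix check from the hunk above, in isolation.
// Assumes sorted iteration (BTreeSet), which justifies the early break.
use std::collections::BTreeSet;

fn path_allowed(restricted_prefixes: &BTreeSet<String>, path: &str) -> bool {
    if restricted_prefixes.is_empty() {
        // No restrictions recorded: the whole domain is allowed.
        return true;
    }
    for prefix in restricted_prefixes {
        if path.starts_with(prefix.as_str()) {
            return true;
        }
        if prefix.as_str() > path {
            // Ascending order: every remaining prefix also sorts after
            // `path`, so none of them can be a prefix of it.
            break;
        }
    }
    false
}

fn main() {
    let prefixes: BTreeSet<String> =
        ["/blog", "/docs"].iter().map(|s| s.to_string()).collect();
    assert!(path_allowed(&prefixes, "/docs/intro"));
    assert!(!path_allowed(&prefixes, "/admin")); // "/admin" < "/blog": early break
    println!("ok");
}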

View File

@@ -3,8 +3,8 @@ use crate::raking::{RakeIntent, TemporaryFailure};
 use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
 use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
 use crate::storage::records::{
-    ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord,
-    UrlVisitedRecord,
+    ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord,
+    QueueUrlRecord, UrlVisitedRecord,
 };
 use anyhow::{anyhow, bail, ensure, Context};
 use libmdbx::{
@@ -42,7 +42,7 @@ pub struct Databases<'env> {
     pub visited_urls: Database<'env>,
     /// Domain → AllowedDomainRecord
    pub allowed_domains: Database<'env>,
-    /// Domain \n URL → Number of refs (INT VALUE)
+    /// Domain \n URL → OnHoldUrlRecord
     pub urls_on_hold: Database<'env>,
 }
@@ -58,6 +58,8 @@ impl<'env> Databases<'env> {
             ),
             ("backing_off_domains", &self.backing_off_domains),
             ("visited_urls", &self.visited_urls),
+            ("allowed_domains", &self.allowed_domains),
+            ("urls_on_hold", &self.urls_on_hold),
         ]
         .into_iter()
     }
@@ -439,6 +441,46 @@ impl<'a> RakerTxn<'a, RW> {
         Ok(true)
     }
 
+    /// Enqueues a URL to the 'on hold' queue.
+    ///
+    /// Returns: true if it was enqueued, false if nothing changed.
+    pub fn put_url_on_hold(&self, url_str: &str, intent: RakeIntent) -> anyhow::Result<bool> {
+        let urls_on_hold = &self.mdbx.borrow_dbs().urls_on_hold;
+
+        let url = Url::parse(url_str)?;
+        let url_domain = get_reduced_domain(&url)?;
+
+        let queue_key = format!("{}\n{}", url_domain, url);
+
+        let (record, is_new) = if let Some(mut record) = self
+            .mdbx_txn
+            .get::<MdbxBare<OnHoldUrlRecord>>(urls_on_hold, queue_key.as_bytes())?
+        {
+            // Already in the queue. Nothing to do here, except bump up the refs count.
+            record.0.refs = record.0.refs.saturating_add(1);
+            (record, false)
+        } else {
+            (
+                MdbxBare(OnHoldUrlRecord {
+                    refs: 1,
+                    queue_record: QueueUrlRecord { intent },
+                }),
+                true,
+            )
+        };
+
+        // Add the entry to the queue
+        self.mdbx_txn.put(
+            urls_on_hold,
+            queue_key.as_bytes(),
+            &record.as_bytes(),
+            WriteFlags::empty(),
+        )?;
+
+        Ok(is_new)
+    }
+
     pub fn put_allowed_domain_record(
         &self,
         domain: &str,
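
To illustrate the semantics of put_url_on_hold in isolation: the first sighting of a URL creates a record with one reference, and every later sighting only bumps the saturating ref count. A sketch with a plain HashMap standing in for the MDBX urls_on_hold table (`HoldEntry` and `put_on_hold` are hypothetical stand-ins, not names from the codebase):

// Sketch: ref-counting behaviour of the 'on hold' table, with a HashMap
// playing the role of the MDBX database.
use std::collections::HashMap;

#[derive(Debug)]
struct HoldEntry {
    refs: u8,
}

fn put_on_hold(table: &mut HashMap<String, HoldEntry>, key: &str) -> bool {
    if let Some(entry) = table.get_mut(key) {
        // Already held: just bump the ref count, capped at u8::MAX.
        entry.refs = entry.refs.saturating_add(1);
        false
    } else {
        table.insert(key.to_owned(), HoldEntry { refs: 1 });
        true
    }
}

fn main() {
    let mut table = HashMap::new();
    // Same "domain \n URL" key shape as the real queue key.
    let key = "example.org\nhttps://example.org/page";
    assert!(put_on_hold(&mut table, key)); // first sighting: new entry
    assert!(!put_on_hold(&mut table, key)); // second sighting: refs bumped
    assert_eq!(table[key].refs, 2);
}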

View File

@@ -23,6 +23,15 @@ pub struct QueueUrlRecord {
     pub intent: RakeIntent, // TODO CONSIDER
 }
 
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct OnHoldUrlRecord {
+    /// Record that should be emitted once this is released.
+    pub queue_record: QueueUrlRecord,
+    /// Number of times this URL has been 'enqueued'; capped at 255.
+    pub refs: u8,
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct BackingOffDomainRecord {
     /// The URL that caused the backoff.
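
Because OnHoldUrlRecord derives Serialize and Deserialize, it round-trips through any serde format; the real store wraps records in MdbxBare before writing them. A sketch using serde_json for visibility, with `RakeIntent` stubbed as a one-variant enum since its real definition lives elsewhere in the crate:

// Sketch: serde round-trip of a simplified OnHoldUrlRecord.
// Requires the `serde` (with derive) and `serde_json` crates.
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
enum RakeIntent {
    Page, // stub variant for illustration only
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
struct QueueUrlRecord {
    intent: RakeIntent,
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
struct OnHoldUrlRecord {
    queue_record: QueueUrlRecord,
    refs: u8,
}

fn main() -> Result<(), serde_json::Error> {
    let record = OnHoldUrlRecord {
        queue_record: QueueUrlRecord { intent: RakeIntent::Page },
        refs: 1,
    };
    let bytes = serde_json::to_vec(&record)?;
    let back: OnHoldUrlRecord = serde_json::from_slice(&bytes)?;
    assert_eq!(record, back); // serialization is lossless
    Ok(())
}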