diff --git a/.gitignore b/.gitignore index cce471e..6b923ae 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,5 @@ /quickpeep.ron /index_icons /index_icons-lck + +target \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 28137ac..5305c5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3820,6 +3820,7 @@ dependencies = [ "signal-hook 0.3.13", "sitemap", "smartstring", + "tempfile", "tokio", "webp", "zstd", diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml index 983a77c..bea9cb3 100644 --- a/quickpeep_raker/Cargo.toml +++ b/quickpeep_raker/Cargo.toml @@ -89,3 +89,6 @@ metrics = "0.18.1" metrics-exporter-prometheus = { version = "0.9.0", default-features = false, features = ["http-listener"] } metrics-process-promstyle = "0.18.0" bare-metrics-recorder = "0.1.0" + +[dev-dependencies] +tempfile = "3.3.0" diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index b3dc2c3..1e4c8fa 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -17,10 +17,11 @@ use quickpeep_utils::urls::get_reduced_domain; use reqwest::Url; use std::borrow::{Borrow, Cow}; use std::collections::HashSet; +use std::ops::Add; use std::path::Path; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub mod maintenance; pub mod mdbx_helper_types; @@ -400,6 +401,39 @@ impl<'a> RakerTxn<'a, RW> { Ok(()) } + /// Reinstates backing-off domains up to the specified time. + /// Returns the time of the next reinstatement, if there is one. + pub fn reinstate_backoffs(&self, up_to_ts: SystemTime) -> anyhow::Result> { + let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains; + let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements; + + let reinstate_up_to = up_to_ts.duration_since(UNIX_EPOCH)?.as_secs(); + + let mut cur = self.mdbx_txn.cursor(backing_off_reinstatements)?; + cur.first::()?; + loop { + let (MdbxU64(reinstatement_time), domain_to_reinstate) = + match cur.get_current::()? { + Some(x) => x, + None => break, + }; + + if reinstatement_time > reinstate_up_to { + return Ok(Some( + UNIX_EPOCH.add(Duration::from_secs(reinstatement_time)), + )); + } + + let dom_str = domain_to_reinstate.into_string(); + self.mdbx_txn + .del(backing_off_domains, dom_str.clone(), None)?; + self.insert_active_domain_with_new_raffle_ticket(dom_str)?; + cur.del(WriteFlags::empty())?; + } + + Ok(None) + } + /// Enqueues a URL. /// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been /// visited since then. @@ -553,6 +587,7 @@ pub fn register_datastore_metrics() -> anyhow::Result<()> { Ok(()) } +#[derive(Clone, Debug, Eq, PartialEq)] pub enum RandomActiveDomainAcquisition { GotOne { domain: String, @@ -717,3 +752,89 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> { Ok(()) } } + +#[cfg(test)] +pub mod test { + use super::*; + use crate::raking::TemporaryFailureReason; + use std::collections::BTreeSet; + use tempfile::NamedTempFile; + + #[test] + fn test_reinstate_multiple_domains() -> anyhow::Result<()> { + let tfile = NamedTempFile::new()?; + let store = RakerStore::open(tfile.path())?; + { + let txn = store.rw_txn()?; + txn.insert_active_domain_with_new_raffle_ticket("a.invalid".to_owned())?; + txn.insert_active_domain_with_new_raffle_ticket("b.invalid".to_owned())?; + txn.commit()?; + } + + let now = SystemTime::now(); + + { + let txn = store.rw_txn()?; + txn.start_backing_off( + "a.invalid", + 300, + "".to_owned(), + TemporaryFailure { + reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), + backoff_sec: 300, + }, + )?; + txn.start_backing_off( + "b.invalid", + 300, + "".to_owned(), + TemporaryFailure { + reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), + backoff_sec: 300, + }, + )?; + txn.commit()?; + } + + { + let txn = store.ro_txn()?; + assert_eq!( + txn.acquire_random_active_domain(Default::default())?, + RandomActiveDomainAcquisition::NoneLeft + ); + } + + { + let txn = store.rw_txn()?; + txn.reinstate_backoffs(now + Duration::from_secs(600))?; + txn.commit()?; + } + + { + let txn = store.ro_txn()?; + let busy = Default::default(); + + let acq1 = txn.acquire_random_active_domain(Arc::clone(&busy))?; + let acq2 = txn.acquire_random_active_domain(Arc::clone(&busy))?; + + assert!( + matches!((acq1.clone(), acq2.clone()), ( + RandomActiveDomainAcquisition::GotOne { + domain: dom1, + .. + }, + RandomActiveDomainAcquisition::GotOne { + domain: dom2, + .. + } + ) if vec![dom1.as_ref(), dom2.as_ref()].into_iter().collect::>() == vec![ + "a.invalid", "b.invalid" + ].into_iter().collect::>()), + "{:#?}", + (acq1, acq2) + ); + } + + Ok(()) + } +} diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs index bdc2e94..ec3ac7e 100644 --- a/quickpeep_raker/src/storage/records.rs +++ b/quickpeep_raker/src/storage/records.rs @@ -3,7 +3,7 @@ use reqwest::Url; use serde::{Deserialize, Serialize}; use std::collections::BTreeSet; -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] pub struct ActiveDomainRecord { /// The raffle ticket number owned by this domain. pub raffle_ticket: u32,