Add backoff reinstatement function to store
This commit is contained in:
parent
75afb8b559
commit
fc69b1b192
2
.gitignore
vendored
2
.gitignore
vendored
@ -20,3 +20,5 @@
|
||||
/quickpeep.ron
|
||||
/index_icons
|
||||
/index_icons-lck
|
||||
|
||||
target
|
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3820,6 +3820,7 @@ dependencies = [
|
||||
"signal-hook 0.3.13",
|
||||
"sitemap",
|
||||
"smartstring",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"webp",
|
||||
"zstd",
|
||||
|
@ -89,3 +89,6 @@ metrics = "0.18.1"
|
||||
metrics-exporter-prometheus = { version = "0.9.0", default-features = false, features = ["http-listener"] }
|
||||
metrics-process-promstyle = "0.18.0"
|
||||
bare-metrics-recorder = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.3.0"
|
||||
|
@ -17,10 +17,11 @@ use quickpeep_utils::urls::get_reduced_domain;
|
||||
use reqwest::Url;
|
||||
use std::borrow::{Borrow, Cow};
|
||||
use std::collections::HashSet;
|
||||
use std::ops::Add;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
pub mod maintenance;
|
||||
pub mod mdbx_helper_types;
|
||||
@ -400,6 +401,39 @@ impl<'a> RakerTxn<'a, RW> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reinstates backing-off domains up to the specified time.
|
||||
/// Returns the time of the next reinstatement, if there is one.
|
||||
pub fn reinstate_backoffs(&self, up_to_ts: SystemTime) -> anyhow::Result<Option<SystemTime>> {
|
||||
let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
|
||||
let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements;
|
||||
|
||||
let reinstate_up_to = up_to_ts.duration_since(UNIX_EPOCH)?.as_secs();
|
||||
|
||||
let mut cur = self.mdbx_txn.cursor(backing_off_reinstatements)?;
|
||||
cur.first::<MdbxU64, MdbxString>()?;
|
||||
loop {
|
||||
let (MdbxU64(reinstatement_time), domain_to_reinstate) =
|
||||
match cur.get_current::<MdbxU64, MdbxString>()? {
|
||||
Some(x) => x,
|
||||
None => break,
|
||||
};
|
||||
|
||||
if reinstatement_time > reinstate_up_to {
|
||||
return Ok(Some(
|
||||
UNIX_EPOCH.add(Duration::from_secs(reinstatement_time)),
|
||||
));
|
||||
}
|
||||
|
||||
let dom_str = domain_to_reinstate.into_string();
|
||||
self.mdbx_txn
|
||||
.del(backing_off_domains, dom_str.clone(), None)?;
|
||||
self.insert_active_domain_with_new_raffle_ticket(dom_str)?;
|
||||
cur.del(WriteFlags::empty())?;
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Enqueues a URL.
|
||||
/// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been
|
||||
/// visited since then.
|
||||
@ -553,6 +587,7 @@ pub fn register_datastore_metrics() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum RandomActiveDomainAcquisition {
|
||||
GotOne {
|
||||
domain: String,
|
||||
@ -717,3 +752,89 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use crate::raking::TemporaryFailureReason;
|
||||
use std::collections::BTreeSet;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
#[test]
|
||||
fn test_reinstate_multiple_domains() -> anyhow::Result<()> {
|
||||
let tfile = NamedTempFile::new()?;
|
||||
let store = RakerStore::open(tfile.path())?;
|
||||
{
|
||||
let txn = store.rw_txn()?;
|
||||
txn.insert_active_domain_with_new_raffle_ticket("a.invalid".to_owned())?;
|
||||
txn.insert_active_domain_with_new_raffle_ticket("b.invalid".to_owned())?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
let now = SystemTime::now();
|
||||
|
||||
{
|
||||
let txn = store.rw_txn()?;
|
||||
txn.start_backing_off(
|
||||
"a.invalid",
|
||||
300,
|
||||
"".to_owned(),
|
||||
TemporaryFailure {
|
||||
reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1),
|
||||
backoff_sec: 300,
|
||||
},
|
||||
)?;
|
||||
txn.start_backing_off(
|
||||
"b.invalid",
|
||||
300,
|
||||
"".to_owned(),
|
||||
TemporaryFailure {
|
||||
reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1),
|
||||
backoff_sec: 300,
|
||||
},
|
||||
)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let txn = store.ro_txn()?;
|
||||
assert_eq!(
|
||||
txn.acquire_random_active_domain(Default::default())?,
|
||||
RandomActiveDomainAcquisition::NoneLeft
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let txn = store.rw_txn()?;
|
||||
txn.reinstate_backoffs(now + Duration::from_secs(600))?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let txn = store.ro_txn()?;
|
||||
let busy = Default::default();
|
||||
|
||||
let acq1 = txn.acquire_random_active_domain(Arc::clone(&busy))?;
|
||||
let acq2 = txn.acquire_random_active_domain(Arc::clone(&busy))?;
|
||||
|
||||
assert!(
|
||||
matches!((acq1.clone(), acq2.clone()), (
|
||||
RandomActiveDomainAcquisition::GotOne {
|
||||
domain: dom1,
|
||||
..
|
||||
},
|
||||
RandomActiveDomainAcquisition::GotOne {
|
||||
domain: dom2,
|
||||
..
|
||||
}
|
||||
) if vec![dom1.as_ref(), dom2.as_ref()].into_iter().collect::<BTreeSet<&str>>() == vec![
|
||||
"a.invalid", "b.invalid"
|
||||
].into_iter().collect::<BTreeSet<&str>>()),
|
||||
"{:#?}",
|
||||
(acq1, acq2)
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
|
||||
pub struct ActiveDomainRecord {
|
||||
/// The raffle ticket number owned by this domain.
|
||||
pub raffle_ticket: u32,
|
||||
|
Loading…
Reference in New Issue
Block a user