diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 49ecab6..f82f148 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -4,6 +4,7 @@ use env_logger::Env; use adblock::lists::RuleTypes; use anyhow::{bail, Context}; +use chrono::Utc; use log::{debug, error, warn}; use lru::LruCache; use metrics_exporter_prometheus::PrometheusBuilder; @@ -31,6 +32,7 @@ use quickpeep_structs::rake_entries::{ AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES, SCHEMA_RAKED_REJECTIONS, }; +use quickpeep_utils::dates::date_to_quickpeep_days; /// The ordering is slightly important on these: more specific things should come first. /// This means they filter out the troublesome elements before the broader filters do. @@ -280,6 +282,16 @@ pub async fn main() -> anyhow::Result<()> { }) .await?; + // Reinstate re-rakable URLs + let today = date_to_quickpeep_days(&Utc::today())?; + store + .async_rw_txn(move |txn| { + txn.reinstate_rerakables(today)?; + txn.commit()?; + Ok(()) + }) + .await?; + let mut tasks = Vec::with_capacity(num_tasks as usize); for task_num in 0..num_tasks { diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index d064cd4..82df6cf 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -1,5 +1,5 @@ use crate::raking::{RakeIntent, TemporaryFailure}; -use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64}; +use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU16BE, MdbxU32, MdbxU64}; use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION}; use crate::storage::records::{ ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord, @@ -16,7 +16,7 @@ use ouroboros::self_referencing; use quickpeep_utils::urls::get_reduced_domain; use reqwest::Url; use std::borrow::{Borrow, Cow}; -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use std::ops::Add; use std::path::Path; use std::sync::atomic::AtomicU64; @@ -451,6 +451,54 @@ impl<'a> RakerTxn<'a, RW> { Ok(None) } + /// Reinstates URLs that are now re-rakable. + pub fn reinstate_rerakables(&self, today: u16) -> anyhow::Result<()> { + let queue_urls = &self.mdbx.borrow_dbs().queue_urls; + let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue; + + let mut reinstatable_domains: BTreeSet = BTreeSet::new(); + + let mut cur = self.mdbx_txn.cursor(rerake_queue)?; + cur.first::()?; + loop { + let (MdbxU16BE(rerake_datestamp), url_to_rerake) = + match cur.get_current::()? { + Some(x) => x, + None => break, + }; + + if rerake_datestamp > today { + break; + } + + let url_str = url_to_rerake.into_string(); + let url = Url::parse(&url_str).context("Failed to parse rerakable URL")?; + let url_domain = + get_reduced_domain(&url).context("Unable to reduce domain for rerakable URL")?; + + self.mdbx_txn.put( + queue_urls, + format!("{}\n{}", url_domain, url_str).as_bytes(), + // TODO(correctness): should specify the same intent as before. + &MdbxBare(QueueUrlRecord { + intent: RakeIntent::Any, + }) + .as_bytes(), + WriteFlags::NO_OVERWRITE, + )?; + + reinstatable_domains.insert(url_domain.into_owned()); + + cur.del(WriteFlags::empty())?; + } + + for domain in reinstatable_domains { + self.insert_active_domain_with_new_raffle_ticket(domain)?; + } + + Ok(()) + } + /// Enqueues a URL. /// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been /// visited since then. diff --git a/quickpeep_raker/src/storage/mdbx_helper_types.rs b/quickpeep_raker/src/storage/mdbx_helper_types.rs index 5d8c2d1..caf3b1f 100644 --- a/quickpeep_raker/src/storage/mdbx_helper_types.rs +++ b/quickpeep_raker/src/storage/mdbx_helper_types.rs @@ -4,6 +4,32 @@ use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Cow; +/// u16 in BIG byte endianness (u16 not supported by INTEGERKEY mode!) +#[derive(Copy, Clone, Debug)] +pub struct MdbxU16BE(pub u16); + +impl MdbxU16BE { + pub fn as_bytes(&self) -> Cow<'_, [u8]> { + Cow::Owned(self.0.to_be_bytes().to_vec()) + } +} + +impl TableObject<'_> for MdbxU16BE { + fn decode(data_val: &[u8]) -> Result + where + Self: Sized, + { + if data_val.len() != 2 { + return Err(libmdbx::Error::DecodeError( + anyhow!("MDBX Key not 2 bytes; can't be decoded as u16").into(), + )); + } + let mut buf = [0u8; 2]; + buf.copy_from_slice(&data_val); + Ok(MdbxU16BE(u16::from_be_bytes(buf))) + } +} + /// u32 in native byte endianness (as required by INTEGERKEY mode) #[derive(Copy, Clone, Debug)] pub struct MdbxU32(pub u32);