Reinstate re-rakable URLs on startup
This commit is contained in:
parent
6ecbc0561f
commit
52d0183942
|
@ -4,6 +4,7 @@ use env_logger::Env;
|
|||
|
||||
use adblock::lists::RuleTypes;
|
||||
use anyhow::{bail, Context};
|
||||
use chrono::Utc;
|
||||
use log::{debug, error, warn};
|
||||
use lru::LruCache;
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
|
@ -31,6 +32,7 @@ use quickpeep_structs::rake_entries::{
|
|||
AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
|
||||
SCHEMA_RAKED_REJECTIONS,
|
||||
};
|
||||
use quickpeep_utils::dates::date_to_quickpeep_days;
|
||||
|
||||
/// The ordering is slightly important on these: more specific things should come first.
|
||||
/// This means they filter out the troublesome elements before the broader filters do.
|
||||
|
@ -280,6 +282,16 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
})
|
||||
.await?;
|
||||
|
||||
// Reinstate re-rakable URLs
|
||||
let today = date_to_quickpeep_days(&Utc::today())?;
|
||||
store
|
||||
.async_rw_txn(move |txn| {
|
||||
txn.reinstate_rerakables(today)?;
|
||||
txn.commit()?;
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut tasks = Vec::with_capacity(num_tasks as usize);
|
||||
|
||||
for task_num in 0..num_tasks {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::raking::{RakeIntent, TemporaryFailure};
|
||||
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
|
||||
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU16BE, MdbxU32, MdbxU64};
|
||||
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
|
||||
use crate::storage::records::{
|
||||
ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord,
|
||||
|
@ -16,7 +16,7 @@ use ouroboros::self_referencing;
|
|||
use quickpeep_utils::urls::get_reduced_domain;
|
||||
use reqwest::Url;
|
||||
use std::borrow::{Borrow, Cow};
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::ops::Add;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
@ -451,6 +451,54 @@ impl<'a> RakerTxn<'a, RW> {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
/// Reinstates URLs that are now re-rakable.
|
||||
pub fn reinstate_rerakables(&self, today: u16) -> anyhow::Result<()> {
|
||||
let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
|
||||
let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue;
|
||||
|
||||
let mut reinstatable_domains: BTreeSet<String> = BTreeSet::new();
|
||||
|
||||
let mut cur = self.mdbx_txn.cursor(rerake_queue)?;
|
||||
cur.first::<MdbxU16BE, MdbxString>()?;
|
||||
loop {
|
||||
let (MdbxU16BE(rerake_datestamp), url_to_rerake) =
|
||||
match cur.get_current::<MdbxU16BE, MdbxString>()? {
|
||||
Some(x) => x,
|
||||
None => break,
|
||||
};
|
||||
|
||||
if rerake_datestamp > today {
|
||||
break;
|
||||
}
|
||||
|
||||
let url_str = url_to_rerake.into_string();
|
||||
let url = Url::parse(&url_str).context("Failed to parse rerakable URL")?;
|
||||
let url_domain =
|
||||
get_reduced_domain(&url).context("Unable to reduce domain for rerakable URL")?;
|
||||
|
||||
self.mdbx_txn.put(
|
||||
queue_urls,
|
||||
format!("{}\n{}", url_domain, url_str).as_bytes(),
|
||||
// TODO(correctness): should specify the same intent as before.
|
||||
&MdbxBare(QueueUrlRecord {
|
||||
intent: RakeIntent::Any,
|
||||
})
|
||||
.as_bytes(),
|
||||
WriteFlags::NO_OVERWRITE,
|
||||
)?;
|
||||
|
||||
reinstatable_domains.insert(url_domain.into_owned());
|
||||
|
||||
cur.del(WriteFlags::empty())?;
|
||||
}
|
||||
|
||||
for domain in reinstatable_domains {
|
||||
self.insert_active_domain_with_new_raffle_ticket(domain)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueues a URL.
|
||||
/// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been
|
||||
/// visited since then.
|
||||
|
|
|
@ -4,6 +4,32 @@ use serde::de::DeserializeOwned;
|
|||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// u16 in BIG byte endianness (u16 not supported by INTEGERKEY mode!)
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct MdbxU16BE(pub u16);
|
||||
|
||||
impl MdbxU16BE {
|
||||
pub fn as_bytes(&self) -> Cow<'_, [u8]> {
|
||||
Cow::Owned(self.0.to_be_bytes().to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
impl TableObject<'_> for MdbxU16BE {
|
||||
fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
if data_val.len() != 2 {
|
||||
return Err(libmdbx::Error::DecodeError(
|
||||
anyhow!("MDBX Key not 2 bytes; can't be decoded as u16").into(),
|
||||
));
|
||||
}
|
||||
let mut buf = [0u8; 2];
|
||||
buf.copy_from_slice(&data_val);
|
||||
Ok(MdbxU16BE(u16::from_be_bytes(buf)))
|
||||
}
|
||||
}
|
||||
|
||||
/// u32 in native byte endianness (as required by INTEGERKEY mode)
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct MdbxU32(pub u32);
|
||||
|
|
Loading…
Reference in New Issue