Add configurable re-rake times for different kinds of raked things

rei/rakerstore_postgres_overhaul
Olivier 'reivilibre' 2022-11-26 19:05:36 +00:00
parent d5255410f5
commit 6ecbc0561f
5 changed files with 64 additions and 5 deletions

View File

@ -96,5 +96,11 @@
pack_emitter: (
),
rerake_timings: (
page: 300,
icon: 365,
feed: 10,
)
),
)
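
For reference, the new rerake_timings block is plain RON and maps field-for-field onto the RerakeTimings struct added later in this commit; the intervals are measured in days. A minimal round-trip sketch (assuming the ron and serde crates; the local struct here just mirrors the real one):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct RerakeTimings {
    page: u16,
    icon: u16,
    feed: u16,
}

fn main() {
    // Same values as the config fragment above: days between re-rakes.
    let timings: RerakeTimings =
        ron::from_str("(page: 300, icon: 365, feed: 10)").unwrap();
    assert_eq!(timings.page, 300);
}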

View File

@ -268,6 +268,7 @@ pub async fn main() -> anyhow::Result<()> {
submission,
graceful_stop,
notify: graceful_stop_notify,
rerake_timings: Arc::new(config.raker.rerake_timings.clone()),
};
// Reinstate old backoffs

View File

@ -28,6 +28,23 @@ pub struct RakerOnlyConfig {
pub metrics: MetricsConfig,
pub pack_emitter: PackEmitterSettings,
pub rerake_timings: RerakeTimings,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RerakeTimings {
/// How long, in days, between re-rakes of the same page?
/// Suggested: 300
pub page: u16,
/// How long, in days, between re-rakes of feeds?
/// Suggested: 10
pub feed: u16,
/// How long, in days, between re-rakes of icons?
/// Suggested: 365
pub icon: u16,
}
impl RakerConfig {

View File

@ -1,3 +1,4 @@
use crate::config::RerakeTimings;
use crate::raking::references::references_from_urlrakes;
use crate::raking::{
get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent,
@ -78,6 +79,8 @@ pub struct TaskContext {
/// Notifier used to wake up sleepers (either to stop them gracefully, or because work
/// is available (not implemented))
pub notify: Arc<Notify>,
pub rerake_timings: Arc<RerakeTimings>,
}
impl TaskContext {
@ -348,7 +351,7 @@ impl TaskContext {
.process_page(url.clone(), page.page_entry, today)
.await?;
self.as_event_processor()
.process_refs(url.clone(), page.referrer_entry, today)
.process_refs(url.clone(), page.referrer_entry, today, false)
.await?;
Ok(NextAction::Continue)
@ -365,7 +368,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.process_refs(url.clone(), refs, today, true)
.await?;
Ok(NextAction::Continue)
@ -382,7 +385,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.process_refs(url.clone(), refs, today, true)
.await?;
Ok(NextAction::Continue)
@ -427,7 +430,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?;
self.as_event_processor()
.process_refs(url.clone(), refs, today)
.process_refs(url.clone(), refs, today, false)
.await?;
Ok(NextAction::Continue)
@ -474,6 +477,7 @@ impl TaskContext {
fn as_event_processor(&self) -> EventProcessor {
EventProcessor {
store: Cow::Borrowed(&self.store),
rerake_timings: &self.rerake_timings,
}
}
}
@ -483,6 +487,7 @@ impl TaskContext {
/// just by replaying the stream of RakePacks and importing seeds.
pub struct EventProcessor<'a> {
store: Cow<'a, RakerStore>,
rerake_timings: &'a RerakeTimings,
}
impl EventProcessor<'_> {
@ -492,6 +497,7 @@ impl EventProcessor<'_> {
page: RakedPageEntry,
datestamp: u16,
) -> anyhow::Result<()> {
let rerake_on = Some(datestamp + self.rerake_timings.page);
self.store
.as_ref()
.async_rw_txn(move |txn| {
@ -502,6 +508,7 @@ impl EventProcessor<'_> {
UrlVisitedRecord {
last_visited_days: datestamp,
},
rerake_on,
)?;
// If there's a favicon to be tried, add it to the list...
@ -517,6 +524,8 @@ impl EventProcessor<'_> {
}
pub async fn process_icon(&self, url: Url, datestamp: u16) -> anyhow::Result<()> {
let rerake_on = Some(datestamp + self.rerake_timings.icon);
self.store
.as_ref()
.async_rw_txn(move |txn| {
@ -527,6 +536,7 @@ impl EventProcessor<'_> {
UrlVisitedRecord {
last_visited_days: datestamp,
},
rerake_on,
)?;
txn.commit()?;
@ -540,7 +550,13 @@ impl EventProcessor<'_> {
url: Url,
refs: RakedReferrerEntry,
datestamp: u16,
rerakeable_feed: bool,
) -> anyhow::Result<()> {
let rerake_on = if rerakeable_feed {
Some(datestamp + self.rerake_timings.feed)
} else {
None
};
self.store
.as_ref()
.async_rw_txn(move |txn| {
@ -551,6 +567,7 @@ impl EventProcessor<'_> {
UrlVisitedRecord {
last_visited_days: datestamp,
},
rerake_on,
)?;
// track all the referred-to URLs!
@ -603,6 +620,7 @@ impl EventProcessor<'_> {
UrlVisitedRecord {
last_visited_days: datestamp,
},
None,
)?;
txn.commit()?;
Ok(())
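
Taken together, the EventProcessor changes in this file schedule a re-rake day for pages and icons on every visit, and for reference sets only when they were produced by a feed or sitemap (the new rerakeable_feed flag); everything else passes None. A compact restatement of that rule (the helper, enum and struct here are hypothetical, not part of this commit):

struct Timings { page: u16, icon: u16, feed: u16 } // mirrors RerakeTimings
enum Raked {
    Page,
    Icon,
    Refs { rerakeable_feed: bool },
}

fn schedule_rerake(t: &Timings, datestamp: u16, kind: Raked) -> Option<u16> {
    match kind {
        Raked::Page => Some(datestamp + t.page),
        Raked::Icon => Some(datestamp + t.icon),
        Raked::Refs { rerakeable_feed: true } => Some(datestamp + t.feed),
        Raked::Refs { rerakeable_feed: false } => None,
    }
}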

View File

@ -32,6 +32,9 @@ pub mod records;
pub struct Databases<'env> {
/// Domain \n URL → QueueUrlRecord
pub queue_urls: Database<'env>,
/// u16 → URL (MULTI-VALUE; INT16). The u16 is the day-precision QuickPeep timestamp at which
/// the URL should be enqueued again for reraking.
pub rerake_queue: Database<'env>,
/// Domain → ActiveDomainRecord
pub active_domains: Database<'env>,
/// u32 → domain name. Used to try and give some fairness.
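
The rerake_queue is keyed by day rather than by URL: each day-precision timestamp can hold many URLs (hence MULTI-VALUE), and the intended read path is a range scan over every day key up to "today". An illustration of those semantics only (a BTreeMap standing in for the MDBX DUP_SORT database; not the real implementation):

use std::collections::BTreeMap;

/// Collect every URL whose scheduled re-rake day has arrived.
fn urls_due_for_rerake(queue: &BTreeMap<u16, Vec<String>>, today: u16) -> Vec<String> {
    queue
        .range(..=today) // all day keys up to and including today
        .flat_map(|(_, urls)| urls.iter().cloned())
        .collect()
}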
@ -54,6 +57,7 @@ impl<'env> Databases<'env> {
pub fn iter_all_databases(&self) -> impl Iterator<Item = (&'static str, &Database<'env>)> {
[
("queue_urls", &self.queue_urls),
("rerake_queue", &self.rerake_queue),
("active_domains", &self.active_domains),
("active_domain_raffle", &self.active_domain_raffle),
(
@ -71,8 +75,9 @@ impl<'env> Databases<'env> {
}
// Must match the order of the Databases struct fields.
pub const DATABASES: [(&'static str, DatabaseFlags); 9] = [
pub const DATABASES: [(&'static str, DatabaseFlags); 10] = [
("urls_queue", DatabaseFlags::empty()),
("rerake_queue", DatabaseFlags::DUP_SORT),
("active_domains", DatabaseFlags::empty()),
("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
(
@ -176,6 +181,7 @@ impl RakerStore {
// Must match the order of the DATABASES constant and the struct field definitions
Databases {
queue_urls: dbs.next().unwrap(),
rerake_queue: dbs.next().unwrap(),
active_domains: dbs.next().unwrap(),
active_domain_raffle: dbs.next().unwrap(),
backing_off_reinstatements: dbs.next().unwrap(),
@ -337,9 +343,11 @@ impl<'a> RakerTxn<'a, RW> {
domain: &str,
url_str: &str,
record: UrlVisitedRecord,
rerake_on: Option<u16>,
) -> anyhow::Result<()> {
let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
let visited_urls = &self.mdbx.borrow_dbs().visited_urls;
let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue;
let queue_key = format!("{}\n{}", domain, url_str);
@ -358,6 +366,15 @@ impl<'a> RakerTxn<'a, RW> {
WriteFlags::empty(),
)?;
if let Some(rerake_on) = rerake_on {
self.mdbx_txn.put(
rerake_queue,
&rerake_on.to_be_bytes(),
url_str.as_bytes(),
WriteFlags::empty(),
)?;
}
Ok(())
}
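
A note on the key encoding: the re-rake day is written with to_be_bytes() because MDBX compares keys as byte strings by default, and big-endian byte order makes that lexicographic comparison agree with numeric order, so a cursor walking rerake_queue sees days chronologically. A standalone check of that property (illustration only, not from this commit):

fn main() {
    let earlier: u16 = 255; // day 255
    let later: u16 = 256;   // day 256
    // Big-endian keys sort in day order: [0, 255] < [1, 0].
    assert!(earlier.to_be_bytes() < later.to_be_bytes());
    // Little-endian keys would not: [255, 0] > [0, 1].
    assert!(earlier.to_le_bytes() > later.to_le_bytes());
}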