//! Storage layer for the raker: MDBX environment, named databases, and transaction helpers.
use crate::raking::{RakeIntent, TemporaryFailure};
|
|
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU16BE, MdbxU32, MdbxU64};
|
|
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
|
|
use crate::storage::records::{
|
|
ActiveDomainRecord, BackingOffDomainRecord, DomainRecord, OnHoldUrlRecord, QueueUrlRecord,
|
|
UrlVisitedRecord,
|
|
};
|
|
use anyhow::{anyhow, bail, ensure, Context};
|
|
use libmdbx::{
|
|
Database, DatabaseFlags, Environment, EnvironmentFlags, Geometry, Transaction, TransactionKind,
|
|
WriteFlags, WriteMap, RO, RW,
|
|
};
|
|
use log::info;
|
|
use metrics::{describe_gauge, gauge, Unit};
|
|
use ouroboros::self_referencing;
|
|
use quickpeep_utils::urls::get_reduced_domain;
|
|
use reqwest::Url;
|
|
use std::borrow::{Borrow, Cow};
|
|
use std::collections::{BTreeSet, HashSet};
|
|
use std::ops::Add;
|
|
use std::path::Path;
|
|
use std::sync::atomic::AtomicU64;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
|
|
|
pub mod maintenance;
|
|
pub mod mdbx_helper_types;
|
|
mod migrations;
|
|
pub mod records;
|
|
|
|
/// The databases available in an environment.
///
/// Each field is a handle to one named MDBX database; the handles borrow from the
/// environment (`'env`). Key → value layouts are noted per field.
pub struct Databases<'env> {
    /// Domain \n URL → QueueUrlRecord
    pub queue_urls: Database<'env>,
    /// u16 → URL (MULTI-VALUE; INT16). The u16 is the day-precision QuickPeep timestamp
    /// at which the URL should be enqueued again for reraking.
    pub rerake_queue: Database<'env>,
    /// Domain → ActiveDomainRecord
    pub active_domains: Database<'env>,
    /// u32 → domain name. Used to try and give some fairness.
    pub active_domain_raffle: Database<'env>,
    /// timestamp → BackingOffReinstatementRecord (MULTI-VALUE; INT)
    pub backing_off_reinstatements: Database<'env>,
    /// Domain → BackingOffDomainRecord
    pub backing_off_domains: Database<'env>,
    /// URL → VisitedDomainRecord
    pub visited_urls: Database<'env>,
    /// Domain → DomainRecord
    pub domains: Database<'env>,
    /// Domain \n URL → OnHoldUrlRecord (the record carries the number of refs)
    pub urls_on_hold: Database<'env>,
}
|
|
|
|
impl<'env> Databases<'env> {
|
|
pub fn iter_all_databases(&self) -> impl Iterator<Item = (&'static str, &Database<'env>)> {
|
|
[
|
|
("queue_urls", &self.queue_urls),
|
|
("rerake_queue", &self.rerake_queue),
|
|
("active_domains", &self.active_domains),
|
|
("active_domain_raffle", &self.active_domain_raffle),
|
|
(
|
|
"backing_off_reinstatements",
|
|
&self.backing_off_reinstatements,
|
|
),
|
|
("backing_off_domains", &self.backing_off_domains),
|
|
("visited_urls", &self.visited_urls),
|
|
("domains", &self.domains),
|
|
("urls_on_hold", &self.urls_on_hold),
|
|
]
|
|
.into_iter()
|
|
}
|
|
}
|
|
|
|
// Must match the order of the Databases struct fields.
// NOTE(review): the on-disk database names here ("urls_queue", "urls_visited") differ from the
// corresponding struct field names (`queue_urls`, `visited_urls`) and from the metric labels in
// `iter_all_databases` — presumably historical; confirm before renaming either side, as renaming
// here would orphan existing on-disk databases.
pub const DATABASES: [(&'static str, DatabaseFlags); 9] = [
    ("urls_queue", DatabaseFlags::empty()),
    // Multiple URLs may share one day-stamp key.
    ("rerake_queue", DatabaseFlags::DUP_SORT),
    ("active_domains", DatabaseFlags::empty()),
    // Keys are u32 raffle tickets.
    ("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
    (
        "backing_off_reinstatements",
        // Integer timestamp keys; multiple domains may share one reinstatement time.
        DatabaseFlags::INTEGER_KEY.union(DatabaseFlags::DUP_SORT),
    ),
    ("backing_off_domains", DatabaseFlags::empty()),
    ("urls_visited", DatabaseFlags::empty()),
    ("domains", DatabaseFlags::empty()),
    ("urls_on_hold", DatabaseFlags::empty()),
];
|
|
|
|
/// Self-referencing pair of an MDBX environment and the databases opened within it.
///
/// The `ouroboros` macro generates the builder (`RakerDbBuilder`) and the borrow
/// accessors (`borrow_env`, `borrow_dbs`) used throughout this module.
#[self_referencing]
pub struct RakerDb {
    pub env: Environment<WriteMap>,
    // `dbs` borrows from `env` above; ouroboros ties the lifetimes together.
    #[borrows(env)]
    #[covariant]
    pub dbs: Databases<'this>,
}
|
|
|
|
/// Handle to the store.
/// Doesn't need wrapping in Arc because it already is.
#[derive(Clone)]
pub struct RakerStore {
    /// Shared MDBX environment plus opened database handles.
    pub mdbx: Arc<RakerDb>,
    /// Optional metric counters; populated by `open`.
    pub metrics: Option<Arc<RakerStoreMetrics>>,
}
|
|
|
|
impl RakerStore {
    /// Opens (creating if necessary) the store backed by a single MDBX file at `path`.
    ///
    /// Checks the stored migration version against `MIGRATION_VERSION` (panicking on a
    /// mismatch), creates all databases listed in `DATABASES`, and returns a store with
    /// metrics enabled.
    pub fn open(path: &Path) -> anyhow::Result<RakerStore> {
        let mut flags = EnvironmentFlags::default();
        // The path points at a single file, not a directory containing the DB.
        flags.no_sub_dir = true;

        let mut geom = Geometry::default();
        // Don't stop the database growing until it hits 64 GiB.
        // (The default is 1 MiB which is just not enough!)
        geom.size = Some(1024 * 1024..64 * 1024 * 1024 * 1024);

        // Grow 16 MiB at a time.
        geom.growth_step = Some(16 * 1024 * 1024);
        // Shrink 64 MiB at a time.
        geom.shrink_threshold = Some(64 * 1024 * 1024);
        // (Yes these numbers represent a large database).

        let env = Environment::new()
            .set_geometry(geom)
            .set_max_dbs(256)
            .set_flags(flags)
            .open(path)?;

        // ouroboros builder: `dbs` borrows from `env`, so the databases are opened
        // inside this closure. Failures here are unrecoverable, hence the `expect`s.
        let mdbx: RakerDb = RakerDbBuilder {
            env,
            dbs_builder: |env: &Environment<WriteMap>| {
                let txn = env
                    .begin_rw_txn()
                    .expect("Should be able to start a transaction");

                // The unnamed root database holds store-level metadata (migration version).
                let root_meta_db = txn
                    .create_db(None, DatabaseFlags::empty())
                    .expect("txn.create_db failed");
                match txn
                    .get::<MdbxString>(&root_meta_db, MIGRATION_KEY)
                    .expect("txn.get failed")
                {
                    None => {
                        info!("Loading database with no migration version. Assuming it's fresh!");
                        // Stamp the fresh store with the current migration version.
                        txn.put(
                            &root_meta_db,
                            MIGRATION_KEY,
                            MIGRATION_VERSION.as_bytes(),
                            WriteFlags::empty(),
                        )
                        .expect("txn.put failed");
                    }
                    Some(version) => {
                        info!("Loading database with migration version {:?}", version.0);
                        if MIGRATION_VERSION != version.0.as_ref() {
                            // No automatic migration path: refuse to run on a mismatched store.
                            panic!(
                                "Migration version not supported: {:?} (I support {:?})",
                                version.0.as_ref(),
                                MIGRATION_VERSION
                            );
                        }
                    }
                }

                // Open every database and keep its handle alive past commit.
                for (db_name, db_flags) in DATABASES {
                    let db = txn
                        .create_db(Some(db_name), db_flags)
                        .expect("Failed to open database");

                    txn.prime_for_permaopen(db);
                }

                let (_bool, dbs) = txn
                    .commit_and_rebind_open_dbs()
                    .expect("Failed to commit & rebind");
                let mut dbs = dbs.into_iter();

                // Must match the order of the DATABASES constant and the struct field definitions
                Databases {
                    queue_urls: dbs.next().unwrap(),
                    rerake_queue: dbs.next().unwrap(),
                    active_domains: dbs.next().unwrap(),
                    active_domain_raffle: dbs.next().unwrap(),
                    backing_off_reinstatements: dbs.next().unwrap(),
                    backing_off_domains: dbs.next().unwrap(),
                    visited_urls: dbs.next().unwrap(),
                    domains: dbs.next().unwrap(),
                    urls_on_hold: dbs.next().unwrap(),
                }
            },
        }
        .build();

        Ok(RakerStore {
            mdbx: Arc::new(mdbx),
            metrics: Some(Arc::new(Default::default())),
        })
    }

    /// Runs a read-write transaction on a blocking thread and commits or aborts per `f`'s
    /// own logic (note: `f` is responsible for calling `commit`; dropping the txn aborts it
    /// — TODO confirm against callers).
    pub async fn async_rw_txn<F, R>(&self, f: F) -> anyhow::Result<R>
    where
        F: FnOnce(RakerTxn<'_, RW>) -> anyhow::Result<R> + Send + 'static,
        R: Send + 'static,
    {
        // TODO(robustness) consider adding a lock here to prevent all the async executors getting stuck here...
        let this = self.clone();
        Ok(tokio::task::spawn_blocking(move || -> anyhow::Result<R> {
            let txn = this.rw_txn()?;
            let r = f(txn)?;
            Ok(r)
        })
        .await??)
    }

    /// Starts a read-write transaction against the store.
    pub fn rw_txn(&self) -> anyhow::Result<RakerTxn<'_, RW>> {
        let mdbx_txn = self.mdbx.borrow_env().begin_rw_txn()?;
        Ok(RakerTxn {
            mdbx_txn,
            mdbx: self.mdbx.clone(),
        })
    }

    /// Starts a read-only transaction against the store.
    pub fn ro_txn(&self) -> anyhow::Result<RakerTxn<'_, RO>> {
        let mdbx_txn = self.mdbx.borrow_env().begin_ro_txn()?;
        Ok(RakerTxn {
            mdbx_txn,
            mdbx: self.mdbx.clone(),
        })
    }
}
|
|
|
|
/// Optional struct to store metrics. Intended for exporting over Prometheus so we can see at a
/// glance what the shape of the datastore looks like.
#[derive(Default)]
pub struct RakerStoreMetrics {
    // Number of domains currently in the active set.
    pub active_domains_count: AtomicU64,
    // Number of domains currently backing off.
    pub backoff_domains_count: AtomicU64,
    // Number of URLs currently queued.
    pub queued_url_count: AtomicU64,
}
|
|
|
|
/// A transaction (read-only or read-write, per `K`) over the raker store.
///
/// Holds an `Arc` to the store so database handles stay valid for the
/// transaction's lifetime.
pub struct RakerTxn<'a, K: TransactionKind> {
    pub mdbx_txn: Transaction<'a, K, WriteMap>,
    pub mdbx: Arc<RakerDb>,
}
|
|
|
|
impl<'a> RakerTxn<'a, RW> {
    /// Commits the transaction, making its writes visible and durable.
    pub fn commit(self) -> anyhow::Result<()> {
        self.mdbx_txn.commit()?;
        Ok(())
    }

    /// Inserts a domain into the active domain table,
    /// generating a raffle ticket (and inserting it too).
    ///
    /// No-op if the domain is already an active domain or being backed off from.
    pub fn insert_active_domain_with_new_raffle_ticket(
        &self,
        new_domain: String,
    ) -> anyhow::Result<()> {
        let active_domains = &self.mdbx.borrow_dbs().active_domains;
        let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
        let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;

        let new_domain = MdbxString(Cow::Owned(new_domain));

        // Already active: nothing to do.
        if self
            .mdbx_txn
            .get::<()>(active_domains, new_domain.as_bytes())?
            .is_some()
        {
            return Ok(());
        }

        // Backing off: the reinstatement machinery will re-activate it later.
        if self
            .mdbx_txn
            .get::<()>(backing_off_domains, new_domain.as_bytes())?
            .is_some()
        {
            return Ok(());
        }

        // Draw random tickets until one is free. (Assumes the ticket space is sparse
        // enough that this terminates quickly — TODO confirm at scale.)
        let reserved_raffle_ticket = loop {
            let next_raffle_ticket = MdbxU32(rand::random());
            if self
                .mdbx_txn
                .get::<()>(active_domain_raffle, &next_raffle_ticket.as_bytes())?
                .is_none()
            {
                break next_raffle_ticket;
            }
        };

        // Write both directions of the ticket ↔ domain association.
        self.mdbx_txn.put(
            active_domain_raffle,
            &reserved_raffle_ticket.as_bytes(),
            new_domain.as_bytes(),
            WriteFlags::empty(),
        )?;
        self.mdbx_txn.put(
            active_domains,
            new_domain.as_bytes(),
            MdbxBare(ActiveDomainRecord {
                raffle_ticket: reserved_raffle_ticket.0,
            })
            .as_bytes(),
            WriteFlags::empty(),
        )?;
        Ok(())
    }

    /// Removes a domain from the active domains table, deleting its raffle ticket from the raffle
    /// table.
    ///
    /// Returns true if a deletion took place, and false if it did not.
    pub fn remove_active_domain(&self, domain: &str) -> anyhow::Result<bool> {
        let active_domains = &self.mdbx.borrow_dbs().active_domains;
        let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;

        let domain = MdbxString(Cow::Borrowed(domain));

        if let Some(MdbxBare(active_domain)) = self
            .mdbx_txn
            .get::<MdbxBare<ActiveDomainRecord>>(active_domains, domain.as_bytes())?
        {
            // Both deletions must succeed: we just read the record, so absence
            // would indicate an inconsistent store.
            ensure!(self.mdbx_txn.del(active_domains, domain.as_bytes(), None)?);
            let raffle_ticket = MdbxU32(active_domain.raffle_ticket);
            ensure!(self
                .mdbx_txn
                .del(active_domain_raffle, raffle_ticket.as_bytes(), None)?);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Marks a URL as visited and takes it out of the queue.
    pub fn mark_url_as_visited(
        &self,
        domain: &str,
        url_str: &str,
        record: UrlVisitedRecord,
        rerake_on: Option<u16>,
    ) -> anyhow::Result<()> {
        let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
        let visited_urls = &self.mdbx.borrow_dbs().visited_urls;
        let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue;

        // Queue keys are "domain\nurl".
        let queue_key = format!("{}\n{}", domain, url_str);

        // We legitimately want this to NOP when already dequeued; so don't ensure the opposite.
        // ensure!(
        //     self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?,
        //     "No queued URL to delete ({})", queue_key
        // );

        self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?;

        self.mdbx_txn.put(
            visited_urls,
            url_str.as_bytes(),
            &MdbxBare(record).as_bytes(),
            WriteFlags::empty(),
        )?;

        // Optionally schedule the URL for reraking on a later day-stamp.
        if let Some(rerake_on) = rerake_on {
            self.mdbx_txn.put(
                rerake_queue,
                &rerake_on.to_be_bytes(),
                url_str.as_bytes(),
                WriteFlags::empty(),
            )?;
        }

        Ok(())
    }

    /// Takes a URL out of the queue (without marking it as visited).
    /// No-op if the URL is not queued.
    pub fn dequeue_url(&self, domain: &str, url_str: &str) -> anyhow::Result<()> {
        let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
        let queue_key = format!("{}\n{}", domain, url_str);
        self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?;

        Ok(())
    }

    /// Moves an active domain into the backing-off state for `backoff_for` seconds,
    /// recording the failure that caused it and scheduling its reinstatement.
    ///
    /// Fails if the domain is not currently active.
    pub fn start_backing_off(
        &self,
        domain: &str,
        backoff_for: u32,
        failed_url: String,
        failure: TemporaryFailure,
    ) -> anyhow::Result<()> {
        ensure!(
            self.remove_active_domain(domain)?,
            "Can't back off from domain that's not active"
        );
        let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
        let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements;

        // Reinstatement time as a UNIX timestamp (seconds).
        let reinstate_at =
            SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() + backoff_for as u64;

        let backoff_record = BackingOffDomainRecord {
            failed_url,
            failure,
            backoff: backoff_for,
            reinstate_at,
        };

        self.mdbx_txn.put(
            backing_off_domains,
            domain.as_bytes(),
            &MdbxBare(backoff_record).as_bytes(),
            WriteFlags::empty(),
        )?;
        // Index by reinstatement time so `reinstate_backoffs` can sweep in order.
        self.mdbx_txn.put(
            backing_off_reinstatements,
            MdbxU64(reinstate_at).as_bytes(),
            domain.as_bytes(),
            WriteFlags::empty(),
        )?;

        Ok(())
    }

    /// Reinstates backing-off domains up to the specified time.
    /// Returns the time of the next reinstatement, if there is one.
    pub fn reinstate_backoffs(&self, up_to_ts: SystemTime) -> anyhow::Result<Option<SystemTime>> {
        let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains;
        let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements;

        let reinstate_up_to = up_to_ts.duration_since(UNIX_EPOCH)?.as_secs();

        // Sweep the reinstatement index in timestamp order (keys are sorted).
        let mut cur = self.mdbx_txn.cursor(backing_off_reinstatements)?;
        cur.first::<MdbxU64, MdbxString>()?;
        loop {
            let (MdbxU64(reinstatement_time), domain_to_reinstate) =
                match cur.get_current::<MdbxU64, MdbxString>()? {
                    Some(x) => x,
                    None => break,
                };

            if reinstatement_time > reinstate_up_to {
                // Everything further is in the future; report when to sweep next.
                return Ok(Some(
                    UNIX_EPOCH.add(Duration::from_secs(reinstatement_time)),
                ));
            }

            let dom_str = domain_to_reinstate.into_string();
            self.mdbx_txn
                .del(backing_off_domains, dom_str.clone(), None)?;
            self.insert_active_domain_with_new_raffle_ticket(dom_str)?;
            // Remove the reinstatement entry; the cursor then reads the next one.
            cur.del(WriteFlags::empty())?;
        }

        Ok(None)
    }

    /// Reinstates URLs that are now re-rakable.
    pub fn reinstate_rerakables(&self, today: u16) -> anyhow::Result<()> {
        let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
        let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue;

        // Collect affected domains so each is (re)activated once, after the sweep.
        let mut reinstatable_domains: BTreeSet<String> = BTreeSet::new();

        let mut cur = self.mdbx_txn.cursor(rerake_queue)?;
        cur.first::<MdbxU16BE, MdbxString>()?;
        loop {
            let (MdbxU16BE(rerake_datestamp), url_to_rerake) =
                match cur.get_current::<MdbxU16BE, MdbxString>()? {
                    Some(x) => x,
                    None => break,
                };

            if rerake_datestamp > today {
                // Keys are sorted, so everything further is not yet due.
                break;
            }

            let url_str = url_to_rerake.into_string();
            let url = Url::parse(&url_str).context("Failed to parse rerakable URL")?;
            let url_domain =
                get_reduced_domain(&url).context("Unable to reduce domain for rerakable URL")?;

            self.mdbx_txn.put(
                queue_urls,
                format!("{}\n{}", url_domain, url_str).as_bytes(),
                // TODO(correctness): should specify the same intent as before.
                &MdbxBare(QueueUrlRecord {
                    intent: RakeIntent::Any,
                })
                .as_bytes(),
                // Don't clobber an existing queue entry for the same URL.
                WriteFlags::NO_OVERWRITE,
            )?;

            reinstatable_domains.insert(url_domain.into_owned());

            cur.del(WriteFlags::empty())?;
        }

        for domain in reinstatable_domains {
            self.insert_active_domain_with_new_raffle_ticket(domain)?;
        }

        Ok(())
    }

    /// Enqueues a URL.
    /// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been
    /// visited since then.
    /// If `only_if_not_visited_since` is not specified, then this is a no-op if the page has already
    /// been visited.
    ///
    /// Returns: true if it was enqueued, false if nothing changed.
    pub fn enqueue_url(
        &self,
        url_str: &str,
        last_modified: Option<u16>,
        intent: RakeIntent,
    ) -> anyhow::Result<bool> {
        let queue_urls = &self.mdbx.borrow_dbs().queue_urls;
        let visited_urls = &self.mdbx.borrow_dbs().visited_urls;

        let url = Url::parse(url_str)?;
        let url_domain = get_reduced_domain(&url)
            .with_context(|| format!("No domain for to-be-enqueued URL: '{url}'!"))?;

        let queue_key = format!("{}\n{}", url_domain, url);

        if self
            .mdbx_txn
            .get::<()>(queue_urls, queue_key.as_bytes())?
            .is_some()
        {
            // Already in the queue. Nothing to do here.
            return Ok(false);
        }

        if let Some(MdbxBare(visited_entry)) = self
            .mdbx_txn
            .get::<MdbxBare<UrlVisitedRecord>>(visited_urls, url_str.as_bytes())?
        {
            match last_modified {
                None => {
                    // Already visited. Nothing to do here.
                    return Ok(false);
                }
                Some(last_modified) => {
                    if last_modified <= visited_entry.last_visited_days {
                        // Hasn't been modified since our last visit
                        return Ok(false);
                    }
                }
            }
        }

        // Add the entry to the queue
        self.mdbx_txn.put(
            queue_urls,
            queue_key.as_bytes(),
            &MdbxBare(QueueUrlRecord { intent }).as_bytes(),
            WriteFlags::empty(),
        )?;

        // Activate the domain if needed...
        self.insert_active_domain_with_new_raffle_ticket(url_domain.into_owned())?;

        Ok(true)
    }

    /// Enqueues a URL to the 'on hold' queue.
    ///
    /// Returns: true if it was enqueued, false if nothing changed.
    pub fn put_url_on_hold(&self, url_str: &str, intent: RakeIntent) -> anyhow::Result<bool> {
        let urls_on_hold = &self.mdbx.borrow_dbs().urls_on_hold;

        let url = Url::parse(url_str)?;
        let url_domain = get_reduced_domain(&url)
            .with_context(|| format!("No domain for to-be-put-on-hold URL: '{url}'!"))?;

        let queue_key = format!("{}\n{}", url_domain, url);

        let (record, is_new) = if let Some(mut record) = self
            .mdbx_txn
            .get::<MdbxBare<OnHoldUrlRecord>>(urls_on_hold, queue_key.as_bytes())?
        {
            // Already in the queue. Nothing to do here, except bump up the refs count.
            record.0.refs = record.0.refs.saturating_add(1);

            (record, false)
        } else {
            (
                MdbxBare(OnHoldUrlRecord {
                    refs: 1,
                    queue_record: QueueUrlRecord { intent },
                }),
                true,
            )
        };

        // Add the entry to the queue
        self.mdbx_txn.put(
            urls_on_hold,
            queue_key.as_bytes(),
            &record.as_bytes(),
            WriteFlags::empty(),
        )?;

        Ok(is_new)
    }

    /// Stores (or overwrites) the `DomainRecord` for a domain.
    pub fn put_domain_record(
        &self,
        domain: &str,
        domain_record: DomainRecord,
    ) -> anyhow::Result<()> {
        let domains = &self.mdbx.borrow_dbs().domains;

        self.mdbx_txn.put(
            domains,
            domain.as_bytes(),
            MdbxBare(domain_record).as_bytes(),
            WriteFlags::empty(),
        )?;
        Ok(())
    }
}
|
|
|
|
/// Registers metrics for the datastore. Call this once at startup.
|
|
pub fn register_datastore_metrics() -> anyhow::Result<()> {
|
|
// Registration is if you want a handle to the Gauge. We don't care for now so leave it be...
|
|
// register_gauge!("db_entries");
|
|
describe_gauge!(
|
|
"db_entries",
|
|
Unit::Count,
|
|
"Number of entries in a specific database."
|
|
);
|
|
// register_gauge!("db_size_bytes");
|
|
describe_gauge!(
|
|
"db_size_bytes",
|
|
Unit::Bytes,
|
|
"Size in bytes of all the pages used by a specific database."
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Outcome of trying to acquire a random active domain for raking.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RandomActiveDomainAcquisition {
    /// A non-busy domain was found and marked busy.
    GotOne {
        domain: String,
        record: ActiveDomainRecord,
    },
    /// There are active domains, but all of them are currently busy.
    AllBusy,
    /// There are no active domains at all.
    NoneLeft,
}
|
|
|
|
/// Read-only implementations (but can also be used on RW transactions)
impl<'a, K: TransactionKind> RakerTxn<'a, K> {
    /// Chooses a domain that is not busy, then marks it as busy.
    pub fn acquire_random_active_domain(
        &self,
        busy_domains: Arc<Mutex<HashSet<String>>>,
    ) -> anyhow::Result<RandomActiveDomainAcquisition> {
        let active_domains = &self.mdbx.borrow_dbs().active_domains;
        let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
        let mut busy_domains = busy_domains
            .lock()
            .map_err(|_| anyhow!("busy domain set poisoned"))?;

        let mut cur = self.mdbx_txn.cursor(&active_domain_raffle)?;

        // Start the scan at a random ticket so that, over many calls, domains are
        // picked roughly uniformly rather than always from the start of the table.
        let rand_key: u32 = rand::random();

        let (raffle_ticket, domain) = match cur
            .iter_from::<MdbxU32, MdbxString>(&MdbxU32(rand_key).as_bytes())
            .next()
        {
            Some(entry) => {
                let (k, v) = entry?;
                (k, v)
            }
            None => {
                // Wrap around to the start for fairness
                if let Some((k, v)) = cur.first::<MdbxU32, MdbxString>()? {
                    (k, v)
                } else {
                    // No entries left!
                    return Ok(RandomActiveDomainAcquisition::NoneLeft);
                }
            }
        };

        let (raffle_ticket, domain) = if busy_domains.contains::<str>(domain.0.borrow()) {
            // This domain is already busy.
            // As a fallback, sequential-scan the raffle ticket table and look for something new.
            let mut found = None;
            for entry in cur.iter_start::<MdbxU32, MdbxString>() {
                let (k, v) = entry?;
                if !busy_domains.contains::<str>(v.0.borrow()) {
                    found = Some((k, v));
                    break;
                }
            }
            match found {
                None => {
                    // ALL the rows are busy!
                    return Ok(RandomActiveDomainAcquisition::AllBusy);
                }
                Some(entry) => entry,
            }
        } else {
            (raffle_ticket, domain)
        };

        // Mark the chosen domain busy; insertion must succeed since we only
        // selected non-busy candidates above (and we hold the lock).
        ensure!(
            busy_domains.insert(domain.clone().into_string()),
            "Domain already present even though it was checked to be free"
        );

        let record = if let Some(record) = self
            .mdbx_txn
            .get::<MdbxBare<ActiveDomainRecord>>(active_domains, domain.as_bytes())?
        {
            record.0
        } else {
            bail!("Inconsistent database: raffle ticket received for domain that isn't present.");
        };

        // Cross-check that both sides of the ticket ↔ domain mapping agree.
        if record.raffle_ticket != raffle_ticket.0 {
            bail!("Inconsistent database: picked raffle ticket {:?} but domain {:?} thinks it had {:?}", raffle_ticket, domain.0, record.raffle_ticket);
        }

        Ok(RandomActiveDomainAcquisition::GotOne {
            domain: domain.into_string(),
            record,
        })
    }

    /// Picks the first queued URL for the given domain, if any.
    ///
    /// Returns `(url, record)`, or `None` when the domain has no queued URLs.
    pub fn choose_url_for_domain(
        &self,
        domain: &str,
    ) -> anyhow::Result<Option<(String, QueueUrlRecord)>> {
        let queue: &Database = &self.mdbx.borrow_dbs().queue_urls;

        let mut cur = self.mdbx_txn.cursor(queue)?;
        // Queue keys are "domain\nurl"; seeking to "domain\n" lands on the
        // domain's first URL (if it has any).
        match cur
            .iter_from::<MdbxString, MdbxBare<QueueUrlRecord>>(
                MdbxString(Cow::Owned(format!("{}\n", domain))).as_bytes(),
            )
            .next()
        {
            Some(entry) => {
                let (k, MdbxBare(record)) = entry?;
                let domain_followed_by_url = k.0.as_ref();
                let mut split = domain_followed_by_url.split("\n");
                let actual_domain = split.next().context("No domain")?;
                let url = split.next().context("No URL")?;

                if domain != actual_domain {
                    // This means we've ran out of URLs for the domain in question.
                    return Ok(None);
                }

                ensure!(split.next().is_none(), "Should be no more splits.");

                Ok(Some((url.to_owned(), record)))
            }
            None => Ok(None),
        }
    }

    /// Looks up the `DomainRecord` for a domain, if one is stored.
    pub fn get_domain_record(&self, domain: &str) -> anyhow::Result<Option<DomainRecord>> {
        let domains = &self.mdbx.borrow_dbs().domains;

        match self
            .mdbx_txn
            .get::<MdbxBare<DomainRecord>>(domains, domain.as_bytes())?
        {
            None => Ok(None),
            Some(MdbxBare(record)) => Ok(Some(record)),
        }
    }

    /// Emits metrics for the datastore. Call this occasionally.
    pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
        for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {
            let stat = self.mdbx_txn.db_stat(db)?;
            let entries = stat.entries() as f64;
            // Total pages of every kind used by this database.
            let size_in_pages = stat.branch_pages() + stat.leaf_pages() + stat.overflow_pages();
            let size_in_bytes = stat.page_size() as f64 * size_in_pages as f64;
            gauge!("db_entries", entries, "db" => db_name);
            gauge!("db_size_bytes", size_in_bytes, "db" => db_name);
        }
        Ok(())
    }
}
|
|
|
|
#[cfg(test)]
pub mod test {
    use super::*;
    use crate::raking::TemporaryFailureReason;
    use std::collections::BTreeSet;
    use tempfile::NamedTempFile;

    /// End-to-end check of the backoff/reinstatement cycle: two domains are
    /// activated, backed off, and then both become acquirable again once
    /// `reinstate_backoffs` is run past their reinstatement time.
    #[test]
    fn test_reinstate_multiple_domains() -> anyhow::Result<()> {
        let tfile = NamedTempFile::new()?;
        let store = RakerStore::open(tfile.path())?;
        {
            let txn = store.rw_txn()?;
            txn.insert_active_domain_with_new_raffle_ticket("a.invalid".to_owned())?;
            txn.insert_active_domain_with_new_raffle_ticket("b.invalid".to_owned())?;
            txn.commit()?;
        }

        let now = SystemTime::now();

        // Back off both domains for 300 s.
        {
            let txn = store.rw_txn()?;
            txn.start_backing_off(
                "a.invalid",
                300,
                "".to_owned(),
                TemporaryFailure {
                    reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1),
                    backoff_sec: 300,
                },
            )?;
            txn.start_backing_off(
                "b.invalid",
                300,
                "".to_owned(),
                TemporaryFailure {
                    reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1),
                    backoff_sec: 300,
                },
            )?;
            txn.commit()?;
        }

        // While backing off, no domain should be acquirable.
        {
            let txn = store.ro_txn()?;
            assert_eq!(
                txn.acquire_random_active_domain(Default::default())?,
                RandomActiveDomainAcquisition::NoneLeft
            );
        }

        // Sweep past the reinstatement time (now + 600 s > now + 300 s).
        {
            let txn = store.rw_txn()?;
            txn.reinstate_backoffs(now + Duration::from_secs(600))?;
            txn.commit()?;
        }

        // Both domains should now be acquirable exactly once each.
        {
            let txn = store.ro_txn()?;
            let busy = Default::default();

            let acq1 = txn.acquire_random_active_domain(Arc::clone(&busy))?;
            let acq2 = txn.acquire_random_active_domain(Arc::clone(&busy))?;

            assert!(
                matches!((acq1.clone(), acq2.clone()), (
                    RandomActiveDomainAcquisition::GotOne {
                        domain: dom1,
                        ..
                    },
                    RandomActiveDomainAcquisition::GotOne {
                        domain: dom2,
                        ..
                    }
                ) if vec![dom1.as_ref(), dom2.as_ref()].into_iter().collect::<BTreeSet<&str>>() == vec![
                    "a.invalid", "b.invalid"
                ].into_iter().collect::<BTreeSet<&str>>()),
                "{:#?}",
                (acq1, acq2)
            );
        }

        Ok(())
    }
}
|