// Patch overview. These lines sit before the first `diff --git` header, outside every hunk.
//
// NOTE(review): in this copy the patch's line breaks are collapsed and generic arguments look
// stripped (`Arc,`, `Arc>>`, `anyhow::Result>`, `contains::(`, `iter_start::()`), so it will
// not apply as-is; regenerate it from the original commit before use.
//
// bin/qp-raker.rs
//   - The struct literal in `main` sets the new field `notify: Arc::new(Default::default())`.
// raking/task.rs
//   - `TaskContext` gains a `notify` field; imports add `RandomActiveDomainAcquisition`,
//     `ensure` and `Notify`.
//   - The worker loop calls `txn.acquire_random_active_domain(self.busy_domains.clone())` and
//     matches on the result:
//       GotOne   -> `process_domain`, then `ensure!` that the domain could be removed from
//                   `busy_domains`.
//       AllBusy  -> `tokio::select!` over a 60-second sleep and `self.notify.notified()`.
//       NoneLeft -> `break` out of the `while` loop.
// storage.rs
//   - Adds the `RandomActiveDomainAcquisition` enum (`GotOne { domain, record }`, `AllBusy`,
//     `NoneLeft`).
//   - `choose_random_active_domain` becomes `acquire_random_active_domain`: it locks the busy
//     set, returns `NoneLeft` where the old code returned `Ok(None)`, and when the picked
//     domain is already busy walks `cur.iter_start` for the first non-busy entry (`AllBusy` if
//     there is none). The chosen domain is inserted into the busy set before the record lookup.
//
// NOTE(review): two paths appear to leave a domain marked busy -- confirm whether intended:
//   1. in `acquire_random_active_domain`, the `?` / `bail!` that follow the `insert`;
//   2. in the worker loop, `process_domain(..).await?` returning early before the `remove`.
// NOTE(review): nothing in this patch calls a `notify_*` method on `notify`, so as shown the
// AllBusy branch wakes only on the 60-second timer -- confirm against the rest of the tree.
diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 065055c..af5bd20 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -140,6 +140,7 @@ pub async fn main() -> anyhow::Result<()> { semaphore, submission, graceful_stop, + notify: Arc::new(Default::default()), }; let mut tasks = Vec::with_capacity(num_tasks as usize); diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index 62b60f1..ce4beb2 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -5,8 +5,8 @@ use crate::raking::{ Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, }; use crate::storage::records::UrlVisitedRecord; -use crate::storage::RakerStore; -use anyhow::{anyhow, Context}; +use crate::storage::{RakerStore, RandomActiveDomainAcquisition}; +use anyhow::{anyhow, ensure, Context}; use chrono::Utc; use cylon::Cylon; use log::warn; @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::time::Duration; use tokio::sync::mpsc::Sender; -use tokio::sync::Semaphore; +use tokio::sync::{Notify, Semaphore}; use tokio::time::Instant; /// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the @@ -69,6 +69,10 @@ pub struct TaskContext { pub submission: TaskResultSubmission, pub graceful_stop: Arc, + + /// Notifier used to wake up sleepers (either to stop them gracefully, or because work + /// is available (not implemented)) + pub notify: Arc, } impl TaskContext { @@ -77,31 +81,39 @@ impl TaskContext { while !self.graceful_stop.load(Ordering::Relaxed) { let domain = { let txn = self.store.ro_txn()?; - txn.choose_random_active_domain()? + txn.acquire_random_active_domain(self.busy_domains.clone())? 
}; match domain { - Some((domain, _active_record)) => { - let is_ours = { - let mut busy_domains = self - .busy_domains + RandomActiveDomainAcquisition::GotOne { + domain, + record: _active_record, + } => { + self.process_domain(domain.clone()).await?; + ensure!( + self.busy_domains .lock() - .map_err(|_| anyhow!("Busy Domains set poisoned"))?; - busy_domains.insert(domain.clone()) - }; - - if is_ours { - self.process_domain(domain).await?; - } else { - // Already in use; need to pick again - // TODO(perf) be smarter about this. - tokio::time::sleep(Duration::from_secs(60)).await; - } + .map_err(|_| anyhow!("busy domains set poisoned"))? + .remove(&domain), + "Our domain was not busy after processing!" + ); } - None => { - // No domains left! Woot. - // TODO(perf, feature) come up with some stopping conditions - tokio::time::sleep(Duration::from_secs(60)).await; + RandomActiveDomainAcquisition::AllBusy => { + // TODO(perf): notify waiters when new domains are available. + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => { + // nop + }, + _ = self.notify.notified() => { + // nop (we allow the notifier to wake us up in case we need to gracefully + // stop). + }, + }; + } + RandomActiveDomainAcquisition::NoneLeft => { + // Nothing left to do, and it's not temporary because there aren't even any + // busy domains left. 
+ break; } } } diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index 6b56af8..e8d1fc7 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -6,7 +6,7 @@ use crate::storage::records::{ ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord, UrlVisitedRecord, }; -use anyhow::{bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context}; use libmdbx::{ Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind, WriteFlags, WriteMap, RO, RW, @@ -15,10 +15,11 @@ use log::info; use metrics::{describe_gauge, gauge, Unit}; use ouroboros::self_referencing; use reqwest::Url; -use std::borrow::Cow; +use std::borrow::{Borrow, Cow}; +use std::collections::HashSet; use std::path::Path; use std::sync::atomic::AtomicU64; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; mod mdbx_helper_types; @@ -473,13 +474,27 @@ pub fn register_datastore_metrics() -> anyhow::Result<()> { Ok(()) } +pub enum RandomActiveDomainAcquisition { + GotOne { + domain: String, + record: ActiveDomainRecord, + }, + AllBusy, + NoneLeft, +} + /// Read-only implementations (but can also be used on RW transactions) impl<'a, K: TransactionKind> RakerTxn<'a, K> { - pub fn choose_random_active_domain( + /// Chooses a domain that is not busy, then marks it as busy. + pub fn acquire_random_active_domain( &self, - ) -> anyhow::Result> { + busy_domains: Arc>>, + ) -> anyhow::Result { let active_domains = &self.mdbx.borrow_dbs().active_domains; let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle; + let mut busy_domains = busy_domains + .lock() + .map_err(|_| anyhow!("busy domain set poisoned"))?; let mut cur = self.mdbx_txn.cursor(&active_domain_raffle)?; @@ -499,11 +514,38 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> { (k, v) } else { // No entries left! 
- return Ok(None); + return Ok(RandomActiveDomainAcquisition::NoneLeft); } } }; + let (raffle_ticket, domain) = if busy_domains.contains::(domain.0.borrow()) { + // This domain is already busy. + // As a fallback, sequential-scan the raffle ticket table and look for something new. + let mut found = None; + for entry in cur.iter_start::() { + let (k, v) = entry?; + if !busy_domains.contains::(v.0.borrow()) { + found = Some((k, v)); + break; + } + } + match found { + None => { + // ALL the rows are busy! + return Ok(RandomActiveDomainAcquisition::AllBusy); + } + Some(entry) => entry, + } + } else { + (raffle_ticket, domain) + }; + + ensure!( + busy_domains.insert(domain.clone().into_string()), + "Domain already present even though it was checked to be free" + ); + let record = if let Some(record) = self .mdbx_txn .get::>(active_domains, domain.as_bytes())? @@ -517,7 +559,10 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> { bail!("Inconsistent database: picked raffle ticket {:?} but domain {:?} thinks it had {:?}", raffle_ticket, domain.0, record.raffle_ticket); } - Ok(Some((domain.into_string(), record))) + Ok(RandomActiveDomainAcquisition::GotOne { + domain: domain.into_string(), + record, + }) } pub fn choose_url_for_domain(