Improve domain acquisition and shutdown logic

This commit is contained in:
Olivier 'reivilibre' 2022-03-20 23:01:20 +00:00
parent 06b3c54b81
commit f60031a462
3 changed files with 88 additions and 30 deletions

View File

@ -140,6 +140,7 @@ pub async fn main() -> anyhow::Result<()> {
semaphore, semaphore,
submission, submission,
graceful_stop, graceful_stop,
notify: Arc::new(Default::default()),
}; };
let mut tasks = Vec::with_capacity(num_tasks as usize); let mut tasks = Vec::with_capacity(num_tasks as usize);

View File

@ -5,8 +5,8 @@ use crate::raking::{
Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
}; };
use crate::storage::records::UrlVisitedRecord; use crate::storage::records::UrlVisitedRecord;
use crate::storage::RakerStore; use crate::storage::{RakerStore, RandomActiveDomainAcquisition};
use anyhow::{anyhow, Context}; use anyhow::{anyhow, ensure, Context};
use chrono::Utc; use chrono::Utc;
use cylon::Cylon; use cylon::Cylon;
use log::warn; use log::warn;
@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore; use tokio::sync::{Notify, Semaphore};
use tokio::time::Instant; use tokio::time::Instant;
/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the /// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
@ -69,6 +69,10 @@ pub struct TaskContext {
pub submission: TaskResultSubmission, pub submission: TaskResultSubmission,
pub graceful_stop: Arc<AtomicBool>, pub graceful_stop: Arc<AtomicBool>,
/// Notifier used to wake up sleepers (either to stop them gracefully, or because work
/// is available (not implemented))
pub notify: Arc<Notify>,
} }
impl TaskContext { impl TaskContext {
@ -77,31 +81,39 @@ impl TaskContext {
while !self.graceful_stop.load(Ordering::Relaxed) { while !self.graceful_stop.load(Ordering::Relaxed) {
let domain = { let domain = {
let txn = self.store.ro_txn()?; let txn = self.store.ro_txn()?;
txn.choose_random_active_domain()? txn.acquire_random_active_domain(self.busy_domains.clone())?
}; };
match domain { match domain {
Some((domain, _active_record)) => { RandomActiveDomainAcquisition::GotOne {
let is_ours = { domain,
let mut busy_domains = self record: _active_record,
.busy_domains } => {
self.process_domain(domain.clone()).await?;
ensure!(
self.busy_domains
.lock() .lock()
.map_err(|_| anyhow!("Busy Domains set poisoned"))?; .map_err(|_| anyhow!("busy domains set poisoned"))?
busy_domains.insert(domain.clone()) .remove(&domain),
"Our domain was not busy after processing!"
);
}
RandomActiveDomainAcquisition::AllBusy => {
// TODO(perf): notify waiters when new domains are available.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60)) => {
// nop
},
_ = self.notify.notified() => {
// nop (we allow the notifier to wake us up in case we need to gracefully
// stop).
},
}; };
if is_ours {
self.process_domain(domain).await?;
} else {
// Already in use; need to pick again
// TODO(perf) be smarter about this.
tokio::time::sleep(Duration::from_secs(60)).await;
} }
} RandomActiveDomainAcquisition::NoneLeft => {
None => { // Nothing left to do, and it's not temporary because there aren't even any
// No domains left! Woot. // busy domains left.
// TODO(perf, feature) come up with some stopping conditions break;
tokio::time::sleep(Duration::from_secs(60)).await;
} }
} }
} }

View File

@ -6,7 +6,7 @@ use crate::storage::records::{
ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord, ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord,
UrlVisitedRecord, UrlVisitedRecord,
}; };
use anyhow::{bail, ensure, Context}; use anyhow::{anyhow, bail, ensure, Context};
use libmdbx::{ use libmdbx::{
Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
WriteFlags, WriteMap, RO, RW, WriteFlags, WriteMap, RO, RW,
@ -15,10 +15,11 @@ use log::info;
use metrics::{describe_gauge, gauge, Unit}; use metrics::{describe_gauge, gauge, Unit};
use ouroboros::self_referencing; use ouroboros::self_referencing;
use reqwest::Url; use reqwest::Url;
use std::borrow::Cow; use std::borrow::{Borrow, Cow};
use std::collections::HashSet;
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
mod mdbx_helper_types; mod mdbx_helper_types;
@ -473,13 +474,27 @@ pub fn register_datastore_metrics() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
pub enum RandomActiveDomainAcquisition {
GotOne {
domain: String,
record: ActiveDomainRecord,
},
AllBusy,
NoneLeft,
}
/// Read-only implementations (but can also be used on RW transactions) /// Read-only implementations (but can also be used on RW transactions)
impl<'a, K: TransactionKind> RakerTxn<'a, K> { impl<'a, K: TransactionKind> RakerTxn<'a, K> {
pub fn choose_random_active_domain( /// Chooses a domain that is not busy, then marks it as busy.
pub fn acquire_random_active_domain(
&self, &self,
) -> anyhow::Result<Option<(String, ActiveDomainRecord)>> { busy_domains: Arc<Mutex<HashSet<String>>>,
) -> anyhow::Result<RandomActiveDomainAcquisition> {
let active_domains = &self.mdbx.borrow_dbs().active_domains; let active_domains = &self.mdbx.borrow_dbs().active_domains;
let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle; let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle;
let mut busy_domains = busy_domains
.lock()
.map_err(|_| anyhow!("busy domain set poisoned"))?;
let mut cur = self.mdbx_txn.cursor(&active_domain_raffle)?; let mut cur = self.mdbx_txn.cursor(&active_domain_raffle)?;
@ -499,11 +514,38 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
(k, v) (k, v)
} else { } else {
// No entries left! // No entries left!
return Ok(None); return Ok(RandomActiveDomainAcquisition::NoneLeft);
} }
} }
}; };
let (raffle_ticket, domain) = if busy_domains.contains::<str>(domain.0.borrow()) {
// This domain is already busy.
// As a fallback, sequential-scan the raffle ticket table and look for something new.
let mut found = None;
for entry in cur.iter_start::<MdbxU32, MdbxString>() {
let (k, v) = entry?;
if !busy_domains.contains::<str>(v.0.borrow()) {
found = Some((k, v));
break;
}
}
match found {
None => {
// ALL the rows are busy!
return Ok(RandomActiveDomainAcquisition::AllBusy);
}
Some(entry) => entry,
}
} else {
(raffle_ticket, domain)
};
ensure!(
busy_domains.insert(domain.clone().into_string()),
"Domain already present even though it was checked to be free"
);
let record = if let Some(record) = self let record = if let Some(record) = self
.mdbx_txn .mdbx_txn
.get::<MdbxBare<ActiveDomainRecord>>(active_domains, domain.as_bytes())? .get::<MdbxBare<ActiveDomainRecord>>(active_domains, domain.as_bytes())?
@ -517,7 +559,10 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
bail!("Inconsistent database: picked raffle ticket {:?} but domain {:?} thinks it had {:?}", raffle_ticket, domain.0, record.raffle_ticket); bail!("Inconsistent database: picked raffle ticket {:?} but domain {:?} thinks it had {:?}", raffle_ticket, domain.0, record.raffle_ticket);
} }
Ok(Some((domain.into_string(), record))) Ok(RandomActiveDomainAcquisition::GotOne {
domain: domain.into_string(),
record,
})
} }
pub fn choose_url_for_domain( pub fn choose_url_for_domain(