diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs
index e202497..c0677b0 100644
--- a/quickpeep_raker/src/bin/qp-raker.rs
+++ b/quickpeep_raker/src/bin/qp-raker.rs
@@ -3,9 +3,9 @@ use clap::Parser;
 use env_logger::Env;
 
 use adblock::lists::RuleTypes;
-use anyhow::{bail, Context};
+use anyhow::{anyhow, bail, ensure, Context};
 use chrono::Utc;
-use log::{debug, error, warn};
+use log::{debug, error, info, warn};
 use lru::LruCache;
 use metrics_exporter_prometheus::PrometheusBuilder;
 use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
@@ -15,7 +15,7 @@ use signal_hook::iterator::Signals;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex, RwLock};
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
 use tokio::fs::File;
 use tokio::sync::{mpsc, oneshot, Notify, Semaphore};
 use tokio::time::MissedTickBehavior;
@@ -27,7 +27,7 @@ use quickpeep_raker::raking::page_extraction::PageExtractionService;
 use quickpeep_raker::raking::rakemetrics::describe_raking_metrics;
 use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
 use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
-use quickpeep_raker::storage::RakerStore;
+use quickpeep_raker::storage::{RakerStore, RandomActiveDomainAcquisition};
 use quickpeep_structs::rake_entries::{
     AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
     SCHEMA_RAKED_REJECTIONS,
@@ -150,8 +150,7 @@ pub async fn main() -> anyhow::Result<()> {
         describe_raking_metrics();
     }
 
-    let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
-    let semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
+    let active_fetch_semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
 
     let (pages_tx, pages_rx) = mpsc::channel(32);
     let (refs_tx, refs_rx) = mpsc::channel(32);
@@ -269,44 +268,24 @@ pub async fn main() -> anyhow::Result<()> {
         raker: Arc::new(raker),
         busy_domains: Arc::new(Mutex::new(Default::default())),
         robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
-        semaphore,
+        semaphore: active_fetch_semaphore,
         submission,
         graceful_stop,
         notify: graceful_stop_notify,
         rerake_timings: Arc::new(config.raker.rerake_timings.clone()),
     };
 
-    // Reinstate old backoffs
+    // Reinstate old backoffs and re-rakable URLs
     store
         .async_rw_txn(|txn| {
+            let today = date_to_quickpeep_days(&Utc::today())?;
             txn.reinstate_backoffs(SystemTime::now())?;
-            txn.commit()?;
-            Ok(())
-        })
-        .await?;
-
-    // Reinstate re-rakable URLs
-    let today = date_to_quickpeep_days(&Utc::today())?;
-    store
-        .async_rw_txn(move |txn| {
             txn.reinstate_rerakables(today)?;
             txn.commit()?;
             Ok(())
         })
         .await?;
 
-    let mut tasks = Vec::with_capacity(num_tasks as usize);
-
-    for task_num in 0..num_tasks {
-        let task_context = task_context.clone();
-
-        tasks.push(tokio::spawn(async move {
-            if let Err(err) = task_context.run().await {
-                error!("Raker task {:?} encountered an error: {:?}", task_num, err);
-            }
-        }));
-    }
-
     let (dsmu_cancel_tx, mut dsmu_cancel_rx) = oneshot::channel();
     let datastore_metrics_updater = {
         let store = task_context.store.clone();
@@ -331,21 +310,22 @@ pub async fn main() -> anyhow::Result<()> {
         })
     };
 
-    let TaskContext {
-        graceful_stop,
-        notify,
-        submission,
-        ..
-    } = task_context;
+    let graceful_stop = task_context.graceful_stop.clone();
+    let notify = task_context.notify.clone();
 
-    // Manually drop submission otherwise the senders don't hang up.
-    drop(submission);
+    let worker_semaphore =
+        Semaphore::new((opts.concurrent_jobs + opts.concurrent_sleepers) as usize);
+    let orchestrator_handle = tokio::spawn(async move {
+        if let Err(err) = orchestrator(task_context, Arc::new(worker_semaphore)).await {
+            error!("Error in orchestrator: {err:?}");
+        }
+    });
 
     // ^C is SIGINT; systemd sends SIGTERM
     start_signal_handler(Signals::new([SIGINT, SIGTERM])?, graceful_stop, notify)?;
 
-    for task in tasks {
-        task.await?;
+    if let Err(panic_err) = orchestrator_handle.await {
+        error!("orchestrator panic: {panic_err:?}");
     }
 
     for task in emitters {
@@ -359,6 +339,101 @@ pub async fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn acquire_active_domain(task_context: &TaskContext) -> anyhow::Result<Option<String>> {
+    // Acquire a domain for the task to run against
+    let domain = {
+        let txn = task_context.store.ro_txn()?;
+        // TODO: don't clone the Arc here; convert it to a reference instead.
+        txn.acquire_random_active_domain(task_context.busy_domains.clone())?
+    };
+
+    match domain {
+        RandomActiveDomainAcquisition::GotOne { domain, record: _ } => Ok(Some(domain)),
+        RandomActiveDomainAcquisition::AllBusy => Ok(None),
+        RandomActiveDomainAcquisition::NoneLeft => Ok(None),
+    }
+}
+
+/// Spawns tasks to do the work as necessary.
+/// Performs back-off and re-rake reinstatements periodically and spawns new workers as needed.
+async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
+    let mut next_reinstate = Instant::now() + Duration::from_secs(1800);
+    let max_permits = semaphore.available_permits();
+
+    while !task_context.graceful_stop.load(Ordering::Relaxed) {
+        // Spawn new tasks if there are available worker permits.
+
+        let domain_to_process = acquire_active_domain(&task_context)
+            .await
+            .context("failed trying to acquire active domain")?;
+
+        if domain_to_process.is_none() && semaphore.available_permits() == max_permits {
+            // There's nothing to do and nothing is being processed.
+            ensure!(
+                task_context.busy_domains.lock().unwrap().is_empty(),
+                "Shutting down orchestrator but set of busy domains is not empty."
+            );
+        }
+
+        tokio::select! {
+            _ = tokio::time::sleep_until(next_reinstate.into()) => {
+                // Reinstate backoffs and rerakables
+                if let Err(err) = task_context.store.async_rw_txn(|txn| {
+                    txn.reinstate_backoffs(SystemTime::now())?;
+                    let today = date_to_quickpeep_days(&Utc::today())?;
+                    txn.reinstate_rerakables(today)?;
+                    txn.commit()?;
+                    Ok(())
+                }).await {
+                    error!("Error performing periodic reinstatements: {err:?}");
+                }
+
+                next_reinstate = Instant::now() + Duration::from_secs(1800);
+            },
+            _ = task_context.notify.notified() => {
+                // nop: just wake from the loop
+            }
+            Ok(new_permit) = semaphore.clone().acquire_owned(), if domain_to_process.is_some() => {
+                let domain = domain_to_process.unwrap();
+                let mut task_context = task_context.clone();
+                tokio::spawn(async move {
+                    if let Err(err) = task_context.process_domain(domain.clone()).await {
+                        error!("Encountered error processing {:?}: {:?}", domain, err);
+                    }
+                    ensure!(
+                        task_context.busy_domains
+                            .lock()
+                            .map_err(|_| anyhow!("busy domains set poisoned"))?
+                            .remove(&domain),
+                        "Our domain was not busy after processing!"
+                    );
+
+                    // Release the permit here, within the task.
+                    drop(new_permit);
+                    Ok(())
+                });
+            }
+        };
+    }
+
+    info!("Orchestrator shutting down gracefully...");
+
+    // Wind up:
+    let TaskContext { submission, .. } = task_context;
+    // Manually drop submission otherwise the senders don't hang up.
+    drop(submission);
+
+    let num_active_tasks = max_permits - semaphore.available_permits();
+    info!("Waiting for {num_active_tasks} rake tasks to close.");
+
+    info!(
+        "Acquired all remaining permits: {:?}",
+        semaphore.acquire_many(num_active_tasks as u32).await
+    );
+
+    Ok(())
+}
+
 fn start_signal_handler(
     mut signals: Signals,
     shutdown: Arc<AtomicBool>,
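A note on the pattern above: the new orchestrator bounds concurrency purely through semaphore permits. A worker is only spawned while holding an OwnedSemaphorePermit, the permit is dropped inside the task when the work finishes, and shutdown amounts to re-acquiring every permit. Below is a minimal standalone sketch of that pattern, assuming tokio's Semaphore; the job loop and the counts are illustrative, not part of the patch.

    // Sketch of the permit accounting used by `orchestrator` above.
    // Job numbers and counts are illustrative.
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let max_permits: usize = 4;
        let semaphore = Arc::new(Semaphore::new(max_permits));

        for job in 0..16u32 {
            // Acquire before spawning, so at most `max_permits` jobs run at
            // once; the owned permit moves into the task.
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            tokio::spawn(async move {
                // ... the real code would process a domain here ...
                println!("processed job {job}");
                // Dropping the permit releases one unit of capacity.
                drop(permit);
            });
        }

        // Shutdown: once every permit can be re-acquired, no worker is left.
        let _all = semaphore.acquire_many(max_permits as u32).await.unwrap();
        println!("all workers finished");
    }

Taking the permit before tokio::spawn is what keeps the number of in-flight tasks at or below max_permits; acquire_many then doubles as a barrier that only resolves once every worker has dropped its permit, which is exactly how the orchestrator waits for its rake tasks to close.
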
diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs
index 3a327eb..f27cee8 100644
--- a/quickpeep_raker/src/raking/task.rs
+++ b/quickpeep_raker/src/raking/task.rs
@@ -5,11 +5,11 @@ use crate::raking::{
     RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
 };
 use crate::storage::records::{AllowedDomainRecord, UrlVisitedRecord, WeedDomainRecord};
-use crate::storage::{RakerStore, RandomActiveDomainAcquisition};
-use anyhow::{anyhow, ensure, Context};
+use crate::storage::RakerStore;
+use anyhow::{anyhow, Context};
 use chrono::Utc;
 use cylon::Cylon;
-use log::{error, warn};
+use log::warn;
 use lru::LruCache;
 use metrics::increment_counter;
 use quickpeep_structs::rake_entries::{
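The hunk below removes TaskContext::run, the old per-worker loop. Its stop mechanism, an AtomicBool flag paired with a Notify so that sleeping loops can be woken up to re-check the flag, survives unchanged in the orchestrator. A minimal sketch of that handshake, with illustrative names:

    // Sketch of the graceful-stop handshake: the signal handler flips the
    // flag and pings the Notify; sleepers wake and re-read the flag.
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::Notify;

    async fn worker_loop(stop: Arc<AtomicBool>, notify: Arc<Notify>) {
        while !stop.load(Ordering::Relaxed) {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    // periodic re-check, as the removed loop did
                }
                _ = notify.notified() => {
                    // woken early: loop around and re-read `stop`
                }
            }
        }
    }

    fn request_stop(stop: &AtomicBool, notify: &Notify) {
        stop.store(true, Ordering::Relaxed);
        // Wake everything currently parked in `notified()`.
        notify.notify_waiters();
    }
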
@@ -84,53 +84,6 @@ pub struct TaskContext {
 }
 
 impl TaskContext {
-    pub async fn run(mut self) -> anyhow::Result<()> {
-        // Get a domain to process
-        while !self.graceful_stop.load(Ordering::SeqCst) {
-            let domain = {
-                let txn = self.store.ro_txn()?;
-                txn.acquire_random_active_domain(self.busy_domains.clone())?
-            };
-
-            match domain {
-                RandomActiveDomainAcquisition::GotOne {
-                    domain,
-                    record: _active_record,
-                } => {
-                    if let Err(err) = self.process_domain(domain.clone()).await {
-                        error!("Encountered error processing {:?}: {:?}", domain, err);
-                    }
-                    ensure!(
-                        self.busy_domains
-                            .lock()
-                            .map_err(|_| anyhow!("busy domains set poisoned"))?
-                            .remove(&domain),
-                        "Our domain was not busy after processing!"
-                    );
-                }
-                RandomActiveDomainAcquisition::AllBusy => {
-                    // TODO(perf): notify waiters when new domains are available.
-                    tokio::select! {
-                        _ = tokio::time::sleep(Duration::from_secs(60)) => {
-                            // nop
-                        },
-                        _ = self.notify.notified() => {
-                            // nop (we allow the notifier to wake us up in case we need to gracefully
-                            // stop).
-                        },
-                    };
-                }
-                RandomActiveDomainAcquisition::NoneLeft => {
-                    // Nothing left to do, and it's not temporary because there aren't even any
-                    // busy domains left.
-                    break;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
     pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
         let robots = get_robots_txt_for(url_of_site, &self.redirect_following_client).await?;
         Ok(robots.map(|robots: RobotsTxt| robots.rules))
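Finally, a remark on the tokio::select! in orchestrator (qp-raker.rs above): the `, if domain_to_process.is_some()` guard disables the permit-acquisition branch whenever there is no domain to hand out, so an idle orchestrator parks on the timer and notify branches instead of acquiring a permit it cannot use. A minimal sketch of that guarded-branch shape, with illustrative names:

    // Sketch of a select! with a disabled branch: when `work_available` is
    // false, the acquire branch is never polled, and the function only
    // returns on the timer or an explicit notification.
    use std::sync::Arc;
    use std::time::Instant;
    use tokio::sync::{Notify, Semaphore};

    async fn one_tick(
        semaphore: Arc<Semaphore>,
        notify: &Notify,
        next_deadline: Instant,
        work_available: bool,
    ) {
        tokio::select! {
            _ = tokio::time::sleep_until(next_deadline.into()) => {
                // periodic maintenance (cf. reinstate_backoffs / reinstate_rerakables)
            }
            _ = notify.notified() => {
                // woken early, e.g. so a graceful-stop flag can be observed
            }
            Ok(permit) = semaphore.acquire_owned(), if work_available => {
                // hand `permit` to a freshly spawned worker here
                drop(permit);
            }
        }
    }

This is also why the removed run() loop above needed its fixed 60-second sleep: without the permit-gated branch, idle workers had nothing to wait on except a timer or the notifier.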