Improve the raker to perform reinstatements periodically and to respawn workers
ci/woodpecker/push/check Pipeline failed
ci/woodpecker/push/manual Pipeline failed
ci/woodpecker/push/release Pipeline was successful

rei/rakerstore_postgres_overhaul
Olivier 'reivilibre' 2023-03-28 21:09:24 +01:00
parent 626b448245
commit 83fecf1464
2 changed files with 117 additions and 89 deletions
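
The change below replaces the fixed pool of long-lived worker tasks with an orchestrator loop: it periodically reinstates backed-off and re-rakable URLs, and spawns a short-lived worker, bounded by a semaphore, for each domain it manages to acquire. A minimal sketch of that pattern, using hypothetical reinstate/acquire_domain/process_domain stand-ins rather than the raker's real store API (the real loop additionally wakes on a Notify and checks a graceful-stop flag):

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::{interval, MissedTickBehavior};

// Hypothetical stand-ins for the raker's store and per-domain work.
async fn reinstate() { /* reinstate_backoffs + reinstate_rerakables in one txn */ }
async fn acquire_domain() -> Option<String> { None }
async fn process_domain(_domain: String) {}

async fn orchestrate(max_workers: usize) {
    let semaphore = Arc::new(Semaphore::new(max_workers));
    let mut reinstate_tick = interval(Duration::from_secs(1800));
    reinstate_tick.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        let next_domain = acquire_domain().await;
        tokio::select! {
            // Periodically reinstate backed-off and re-rakable URLs.
            _ = reinstate_tick.tick() => reinstate().await,
            // If a domain was acquired, wait for a free worker slot and spawn a worker for it.
            Ok(permit) = semaphore.clone().acquire_owned(), if next_domain.is_some() => {
                let domain = next_domain.unwrap();
                tokio::spawn(async move {
                    process_domain(domain).await;
                    drop(permit); // frees the worker slot
                });
            }
            // Nothing to do right now: poll again shortly (the real code uses a Notify instead).
            _ = tokio::time::sleep(Duration::from_secs(5)), if next_domain.is_none() => {}
        }
    }
}

#[tokio::main]
async fn main() {
    orchestrate(8).await;
}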

View File

@@ -3,9 +3,9 @@ use clap::Parser;
use env_logger::Env;
use adblock::lists::RuleTypes;
use anyhow::{bail, Context};
use anyhow::{anyhow, bail, ensure, Context};
use chrono::Utc;
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use lru::LruCache;
use metrics_exporter_prometheus::PrometheusBuilder;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
@@ -15,7 +15,7 @@ use signal_hook::iterator::Signals;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use tokio::fs::File;
use tokio::sync::{mpsc, oneshot, Notify, Semaphore};
use tokio::time::MissedTickBehavior;
@@ -27,7 +27,7 @@ use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::rakemetrics::describe_raking_metrics;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
use quickpeep_raker::storage::{RakerStore, RandomActiveDomainAcquisition};
use quickpeep_structs::rake_entries::{
AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
SCHEMA_RAKED_REJECTIONS,
@@ -150,8 +150,7 @@ pub async fn main() -> anyhow::Result<()> {
describe_raking_metrics();
}
let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
let semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
let active_fetch_semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
let (pages_tx, pages_rx) = mpsc::channel(32);
let (refs_tx, refs_rx) = mpsc::channel(32);
@@ -269,44 +268,24 @@ pub async fn main() -> anyhow::Result<()> {
raker: Arc::new(raker),
busy_domains: Arc::new(Mutex::new(Default::default())),
robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
semaphore,
semaphore: active_fetch_semaphore,
submission,
graceful_stop,
notify: graceful_stop_notify,
rerake_timings: Arc::new(config.raker.rerake_timings.clone()),
};
// Reinstate old backoffs
// Reinstate old backoffs and re-rakable URLs
store
.async_rw_txn(|txn| {
let today = date_to_quickpeep_days(&Utc::today())?;
txn.reinstate_backoffs(SystemTime::now())?;
txn.commit()?;
Ok(())
})
.await?;
// Reinstate re-rakable URLs
let today = date_to_quickpeep_days(&Utc::today())?;
store
.async_rw_txn(move |txn| {
txn.reinstate_rerakables(today)?;
txn.commit()?;
Ok(())
})
.await?;
let mut tasks = Vec::with_capacity(num_tasks as usize);
for task_num in 0..num_tasks {
let task_context = task_context.clone();
tasks.push(tokio::spawn(async move {
if let Err(err) = task_context.run().await {
error!("Raker task {:?} encountered an error: {:?}", task_num, err);
}
}));
}
let (dsmu_cancel_tx, mut dsmu_cancel_rx) = oneshot::channel();
let datastore_metrics_updater = {
let store = task_context.store.clone();
@@ -331,21 +310,22 @@ pub async fn main() -> anyhow::Result<()> {
})
};
let TaskContext {
graceful_stop,
notify,
submission,
..
} = task_context;
let graceful_stop = task_context.graceful_stop.clone();
let notify = task_context.notify.clone();
// Manually drop submission otherwise the senders don't hang up.
drop(submission);
let worker_semaphore =
Semaphore::new((opts.concurrent_jobs + opts.concurrent_sleepers) as usize);
let orchestrator_handle = tokio::spawn(async move {
if let Err(err) = orchestrator(task_context, Arc::new(worker_semaphore)).await {
error!("Error in orchestrator: {err:?}");
}
});
// ^C is SIGINT; systemd sends SIGTERM
start_signal_handler(Signals::new([SIGINT, SIGTERM])?, graceful_stop, notify)?;
for task in tasks {
task.await?;
if let Err(panic_err) = orchestrator_handle.await {
error!("orchestrator panic: {panic_err:?}");
}
for task in emitters {
@@ -359,6 +339,101 @@ pub async fn main() -> anyhow::Result<()> {
Ok(())
}
async fn acquire_active_domain(task_context: &TaskContext) -> anyhow::Result<Option<String>> {
// Acquire a domain for the task to run against
let domain = {
let txn = task_context.store.ro_txn()?;
// TODO: don't clone the Arc here; convert to a reference.
txn.acquire_random_active_domain(task_context.busy_domains.clone())?
};
match domain {
RandomActiveDomainAcquisition::GotOne { domain, record: _ } => Ok(Some(domain)),
RandomActiveDomainAcquisition::AllBusy => Ok(None),
RandomActiveDomainAcquisition::NoneLeft => Ok(None),
}
}
/// Spawns tasks to do the work as necessary.
/// Performs back-off and re-rake reinstatements periodically and spawns new workers if needed.
async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
let mut next_reinstate = Instant::now() + Duration::from_secs(1800);
let max_permits = semaphore.available_permits();
while !task_context.graceful_stop.load(Ordering::Relaxed) {
// Spawn new tasks if there are available worker permits.
let domain_to_process = acquire_active_domain(&task_context)
.await
.context("failed trying to acquire active domain")?;
if domain_to_process.is_none() && semaphore.available_permits() == max_permits {
// There's nothing to do and nothing is being processed.
ensure!(
task_context.busy_domains.lock().unwrap().is_empty(),
"Shutting down orchestrator but set of busy domains is not empty."
);
}
tokio::select! {
_ = tokio::time::sleep_until(next_reinstate.into()) => {
// Reinstate backoffs and rerakables
if let Err(err) = task_context.store.async_rw_txn(|txn| {
txn.reinstate_backoffs(SystemTime::now())?;
let today = date_to_quickpeep_days(&Utc::today())?;
txn.reinstate_rerakables(today)?;
txn.commit()?;
Ok(())
}).await {
error!("Error performing periodic reinstatements: {err:?}");
}
next_reinstate = Instant::now() + Duration::from_secs(1800);
},
_ = task_context.notify.notified() => {
// nop: just wake from the loop
}
Ok(new_permit) = semaphore.clone().acquire_owned(), if domain_to_process.is_some() => {
let domain = domain_to_process.unwrap();
let mut task_context = task_context.clone();
tokio::spawn(async move {
if let Err(err) = task_context.process_domain(domain.clone()).await {
error!("Encountered error processing {:?}: {:?}", domain, err);
}
ensure!(
task_context.busy_domains
.lock()
.map_err(|_| anyhow!("busy domains set poisoned"))?
.remove(&domain),
"Our domain was not busy after processing!"
);
// Release the permit here, within the task.
drop(new_permit);
Ok(())
});
}
};
}
info!("Orchestrator shutting down gracefully...");
// Wind up:
let TaskContext { submission, .. } = task_context;
// Manually drop submission otherwise the senders don't hang up.
drop(submission);
let num_active_tasks = max_permits - semaphore.available_permits();
info!("Waiting for {num_active_tasks} rake tasks to close.");
info!(
"Acquired all remaining permits: {:?}",
semaphore.acquire_many(num_active_tasks as u32).await
);
Ok(())
}
fn start_signal_handler(
mut signals: Signals,
shutdown: Arc<AtomicBool>,

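A note on the shutdown path in the orchestrator above: each spawned worker holds an owned semaphore permit, so once the loop exits, re-acquiring all outstanding permits doubles as a completion barrier for the in-flight workers. A self-contained sketch of that idea (hypothetical workload, not the raker's code):

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let max_permits = 4usize;
    let semaphore = Arc::new(Semaphore::new(max_permits));

    // Each worker owns a permit for as long as it runs.
    for i in 0..3u64 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50 * i)).await; // hypothetical work
            drop(permit); // the permit returns to the pool when the worker finishes
        });
    }

    // Barrier: getting every outstanding permit back means every worker has finished.
    let in_flight = max_permits - semaphore.available_permits();
    let _all = semaphore.acquire_many(in_flight as u32).await.unwrap();
    println!("all {in_flight} workers finished");
}
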
View File

@@ -5,11 +5,11 @@ use crate::raking::{
RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
};
use crate::storage::records::{AllowedDomainRecord, UrlVisitedRecord, WeedDomainRecord};
use crate::storage::{RakerStore, RandomActiveDomainAcquisition};
use anyhow::{anyhow, ensure, Context};
use crate::storage::RakerStore;
use anyhow::{anyhow, Context};
use chrono::Utc;
use cylon::Cylon;
use log::{error, warn};
use log::warn;
use lru::LruCache;
use metrics::increment_counter;
use quickpeep_structs::rake_entries::{
@@ -84,53 +84,6 @@ pub struct TaskContext {
}
impl TaskContext {
pub async fn run(mut self) -> anyhow::Result<()> {
// Get a domain to process
while !self.graceful_stop.load(Ordering::SeqCst) {
let domain = {
let txn = self.store.ro_txn()?;
txn.acquire_random_active_domain(self.busy_domains.clone())?
};
match domain {
RandomActiveDomainAcquisition::GotOne {
domain,
record: _active_record,
} => {
if let Err(err) = self.process_domain(domain.clone()).await {
error!("Encountered error processing {:?}: {:?}", domain, err);
}
ensure!(
self.busy_domains
.lock()
.map_err(|_| anyhow!("busy domains set poisoned"))?
.remove(&domain),
"Our domain was not busy after processing!"
);
}
RandomActiveDomainAcquisition::AllBusy => {
// TODO(perf): notify waiters when new domains are available.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60)) => {
// nop
},
_ = self.notify.notified() => {
// nop (we allow the notifier to wake us up in case we need to gracefully
// stop).
},
};
}
RandomActiveDomainAcquisition::NoneLeft => {
// Nothing left to do, and it's not temporary because there aren't even any
// busy domains left.
break;
}
}
}
Ok(())
}
pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
let robots = get_robots_txt_for(url_of_site, &self.redirect_following_client).await?;
Ok(robots.map(|robots: RobotsTxt| robots.rules))