// quickpeep/quickpeep_raker/src/bin/qp-raker.rs

use clap::Parser;
use env_logger::Env;

use adblock::lists::RuleTypes;
use anyhow::{bail, Context};
use chrono::Utc;
use log::{debug, error, warn};
use lru::LruCache;
use metrics_exporter_prometheus::PrometheusBuilder;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::redirect::Policy;
use signal_hook::consts::{SIGINT, SIGTERM};
use signal_hook::iterator::Signals;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use tokio::fs::File;
use tokio::sync::{mpsc, oneshot, Notify, Semaphore};
use tokio::time::MissedTickBehavior;

use quickpeep_raker::config;
use quickpeep_raker::rakepack_emitter::pack_emitter;
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::rakemetrics::describe_raking_metrics;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
use quickpeep_structs::rake_entries::{
AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
SCHEMA_RAKED_REJECTIONS,
};
use quickpeep_utils::dates::date_to_quickpeep_days;

/// The ordering is slightly important on these: more specific things should come first.
/// This means they filter out the troublesome elements before the broader filters do.
pub const ADBLOCK_FILTER_PATHS: [(AnalysisAntifeatures, &'static str); 5] = [
(AnalysisAntifeatures::COOKIE_NAG, "cookie_nag"),
(AnalysisAntifeatures::ANNOYANCE, "annoyance"),
(AnalysisAntifeatures::SOCIAL, "social"),
(AnalysisAntifeatures::PRIVACY, "privacy"),
(AnalysisAntifeatures::ADVERTS, "adverts"),
];

/// Rakes (fetches and analyses) URLs from the queue
#[derive(Clone, Debug, Parser)]
pub struct Opts {
#[clap(long = "config")]
config: Option<PathBuf>,
#[clap(long = "concurrency")]
/// How many concurrent requests to perform
concurrent_jobs: u32,
#[clap(long = "sleepers")]
/// Allow an additional <sleepers> number of sleeping tasks
/// (Waiting for a crawl delay before moving on to the next request)
concurrent_sleepers: u32,
}
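
// Illustrative invocation (the flag values here are examples, not defaults):
//   qp-raker --config quickpeep.ron --concurrency 64 --sleepers 512
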
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(
Env::default().default_filter_or("info,quickpeep_raker=debug,qp_raker=debug"),
)
.init();
debug!("Starting up...");
let opts: Opts = Opts::parse();
let config_path = opts
.config
.unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.raker.workbench_dir.exists() {
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.raker.workbench_dir
);
}
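
// Two HTTP clients share the raker User-Agent and pool settings: one never
// follows redirects (so every hop can be recorded), the other follows up to
// five redirects automatically.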
let mut header_map = HeaderMap::new();
header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT));
let client = reqwest::ClientBuilder::new()
.pool_idle_timeout(Duration::from_secs(90))
.pool_max_idle_per_host(1)
.timeout(TIME_LIMIT)
.default_headers(header_map.clone())
// We want to handle redirects ourselves so we can track them...
.redirect(Policy::none())
.build()?;
let redirect_following_client = reqwest::ClientBuilder::new()
.pool_idle_timeout(Duration::from_secs(90))
.pool_max_idle_per_host(1)
.timeout(TIME_LIMIT)
.default_headers(header_map)
// This client is allowed to follow redirects itself (up to 5 hops).
.redirect(Policy::limited(5))
.build()?;
let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?;
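
// Load one adblock engine per antifeature from `<data_dir>/<name>.adblock`;
// missing lists are skipped with a warning.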
let mut adblock_engines = Vec::new();
for (antifeature, name) in &ADBLOCK_FILTER_PATHS {
let path = config.raker.data_dir.join(format!("{}.adblock", name));
if !path.exists() {
warn!("Missing adblock rules: {:?}.", path);
continue;
}
let file = File::open(&path).await?;
adblock_engines.push((
*antifeature,
preload_adblock_engine(file, RuleTypes::All).await?,
));
}
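
// Load the antifeature IP set (Cloudflare address ranges from `cf_ips.txt`)
// used by the raker's analysis.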
let mut antifeature_ip_set = IpSet::new();
let ips_file = File::open(config.raker.data_dir.join("cf_ips.txt"))
.await
.context("Failed to open CF IPs file")?;
antifeature_ip_set.add_all_from_file(ips_file).await?;
let raker = Raker {
antifeature_ip_set,
page_extraction: PageExtractionService::new(adblock_engines)?,
};
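
// Optionally expose metrics: a Prometheus HTTP exporter if an address is
// configured; "bare metrics" is recognised but not implemented yet.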
if let Some(addr) = config.raker.metrics.prometheus {
PrometheusBuilder::new()
.with_http_listener(addr)
.install()?;
} else if config.raker.metrics.bare_metrics {
warn!("BARE Metrics not supported yet, sorry.");
}
let metrics_enabled =
config.raker.metrics.prometheus.is_some() || config.raker.metrics.bare_metrics;
if metrics_enabled {
metrics_process_promstyle::describe();
describe_raking_metrics();
}
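
// Spawn `concurrent_jobs + concurrent_sleepers` tasks in total, but the
// semaphore only holds `concurrent_jobs` permits, so at most that many do
// active work at once; the rest sit out crawl delays.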
let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
let semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
let (pages_tx, pages_rx) = mpsc::channel(32);
let (refs_tx, refs_rx) = mpsc::channel(32);
let (rejections_tx, rejections_rx) = mpsc::channel(32);
let (icons_tx, icons_rx) = mpsc::channel(32);
let graceful_stop = Arc::new(AtomicBool::new(false));
let graceful_stop_notify = Arc::new(Notify::new());
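
// Four emitter threads, one per output stream (pages, refs, rejections,
// icons), each draining its channel into rake packs under `emit_dir` until
// asked to stop.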
let mut emitters = Vec::with_capacity(4);
{
let emit_dir = config.raker.emit_dir.clone();
let settings = config.raker.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push(
std::thread::Builder::new()
.name("pages emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(
&emit_dir,
"pages",
SCHEMA_RAKED_PAGES,
pages_rx,
&settings,
stop,
notify,
)?;
Ok(())
})?,
);
}
{
let emit_dir = config.raker.emit_dir.clone();
let settings = config.raker.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push(
std::thread::Builder::new()
.name("refs emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(
&emit_dir,
"refs",
SCHEMA_RAKED_REFERENCES,
refs_rx,
&settings,
stop,
notify,
)?;
Ok(())
})?,
);
}
{
let emit_dir = config.raker.emit_dir.clone();
let settings = config.raker.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push(
std::thread::Builder::new()
.name("rejections emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(
&emit_dir,
"rejections",
SCHEMA_RAKED_REJECTIONS,
rejections_rx,
&settings,
stop,
notify,
)?;
Ok(())
})?,
);
}
{
let emit_dir = config.raker.emit_dir.clone();
let settings = config.raker.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push(
std::thread::Builder::new()
.name("icons emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(
&emit_dir,
"icons",
SCHEMA_RAKED_ICONS,
icons_rx,
&settings,
stop,
notify,
)?;
Ok(())
})?,
);
}
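
// Bundle the sender halves of the output channels; they are cloned into
// every raking task as part of the shared TaskContext below.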
let submission = TaskResultSubmission {
pages: pages_tx,
references: refs_tx,
rejections: rejections_tx,
icons: icons_tx,
};
let task_context = TaskContext {
store: store.clone(),
client,
redirect_following_client,
raker: Arc::new(raker),
busy_domains: Arc::new(Mutex::new(Default::default())),
robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
semaphore,
submission,
graceful_stop,
notify: graceful_stop_notify,
rerake_timings: Arc::new(config.raker.rerake_timings.clone()),
};
// Reinstate old backoffs
store
.async_rw_txn(|txn| {
txn.reinstate_backoffs(SystemTime::now())?;
txn.commit()?;
Ok(())
})
.await?;
// Reinstate re-rakable URLs
let today = date_to_quickpeep_days(&Utc::today())?;
store
.async_rw_txn(move |txn| {
txn.reinstate_rerakables(today)?;
txn.commit()?;
Ok(())
})
.await?;
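
// Spawn the raking tasks; each runs the shared TaskContext's main loop and
// logs (rather than propagates) any error it hits.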
let mut tasks = Vec::with_capacity(num_tasks as usize);
for task_num in 0..num_tasks {
let task_context = task_context.clone();
tasks.push(tokio::spawn(async move {
if let Err(err) = task_context.run().await {
error!("Raker task {:?} encountered an error: {:?}", task_num, err);
}
}));
}
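
// Background task that emits datastore and process metrics every 30 seconds
// until the oneshot cancellation below fires.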
let (dsmu_cancel_tx, mut dsmu_cancel_rx) = oneshot::channel();
let datastore_metrics_updater = {
let store = task_context.store.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
let txn = store.ro_txn()?;
txn.emit_datastore_metrics()?;
metrics_process_promstyle::emit_now()?;
}
_ = &mut dsmu_cancel_rx => {
break;
}
}
}
let r: anyhow::Result<()> = Ok(());
r
})
};
let TaskContext {
graceful_stop,
notify,
submission,
..
} = task_context;
// Manually drop submission otherwise the senders don't hang up.
drop(submission);
// ^C is SIGINT; systemd sends SIGTERM
start_signal_handler(Signals::new([SIGINT, SIGTERM])?, graceful_stop, notify)?;
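
// Shutdown order: wait for the raking tasks, then for the emitter threads to
// finish, and finally stop the metrics updater.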
for task in tasks {
task.await?;
}
for task in emitters {
let result = task.join().expect("Can't join thread");
result?;
}
let _ = dsmu_cancel_tx.send(());
datastore_metrics_updater.await??;
Ok(())
}
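
/// Spawns a small thread that waits for SIGINT/SIGTERM, sets the shutdown
/// flag and wakes anything waiting on the notify handle.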
fn start_signal_handler(
mut signals: Signals,
shutdown: Arc<AtomicBool>,
shutdown_notify: Arc<Notify>,
) -> anyhow::Result<()> {
std::thread::Builder::new()
.name("signals".to_string())
.stack_size(512)
.spawn(move || {
for sig in signals.forever() {
eprintln!("Received signal {:?}, shutting down.", sig);
shutdown.store(true, Ordering::SeqCst);
shutdown_notify.notify_waiters();
break;
}
})?;
Ok(())
}