diff --git a/Cargo.lock b/Cargo.lock index 8f0f4b8..5ea8e37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2733,6 +2733,8 @@ dependencies = [ "anyhow", "chrono", "lazy_static", + "log", + "tokio", ] [[package]] diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 4743d4e..f059378 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -6,12 +6,15 @@ use anyhow::{bail, Context}; use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; use reqwest::redirect::Policy; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; -use tokio::sync::Semaphore; +use log::error; +use lru::LruCache; +use tokio::sync::{mpsc, Semaphore}; use quickpeep_raker::config; -use quickpeep_raker::raking::{RAKER_USER_AGENT, TIME_LIMIT}; +use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT}; +use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission}; use quickpeep_raker::storage::RakerStore; /// Seeds a raker's queue with URLs @@ -60,10 +63,49 @@ pub async fn main() -> anyhow::Result<()> { .redirect(Policy::none()) .build()?; - let _store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?; + let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?; - let _num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers; - let _semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize)); + let raker = Raker { + adblock_engines: vec![], + antifeature_ip_set: todo!() + }; + + let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers; + let semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize)); + + let (pages_tx, pages_rx) = mpsc::channel(32); + let (refs_tx, refs_rx) = mpsc::channel(32); + + let submission = TaskResultSubmission { + pages: pages_tx, + references: refs_tx + }; + + let task_context = TaskContext { + store: store.clone(), + client: Default::default(), + raker: Arc::new(raker), + 
busy_domains: Arc::new(Mutex::new(Default::default())), + robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))), + semaphore, + submission + }; + + let mut tasks = Vec::with_capacity(num_tasks as usize); + + for task_num in 0..num_tasks { + let task_context = task_context.clone(); + + tasks.push(tokio::spawn(async move { + if let Err(err) = task_context.run().await { + error!("Raker task {:?} encountered an error: {:?}", task_num, err); + } + })); + + // TODO spawn task + } + + drop(task_context); eprintln!("{:#?}", config); diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs index b08648f..d94773f 100644 --- a/quickpeep_raker/src/raking.rs +++ b/quickpeep_raker/src/raking.rs @@ -21,7 +21,9 @@ use serde::{Deserialize, Serialize}; use sitemap::reader::SiteMapEntity; use std::collections::HashSet; use std::time::Duration; +use tokio::sync::Mutex; use tokio::time::Instant; +use quickpeep_utils::threadguard::ThreadGuard; pub mod analysis; pub mod references; @@ -166,7 +168,7 @@ async fn response_to_bytes_limited( } pub struct Raker { - pub adblock_engines: Vec<(AnalysisAntifeatures, Engine)>, + pub adblock_engines: Vec<(AnalysisAntifeatures, ThreadGuard)>, pub antifeature_ip_set: IpSet, } @@ -247,7 +249,7 @@ impl Raker { { // We don't try any fallbacks for an HTML page return Ok(self - .rake_html_page(&content, url, is_cf, &headers) + .rake_html_page(&content, url, is_cf, &headers).await .context("Raking HTML page")?); } @@ -282,7 +284,7 @@ impl Raker { })); } - pub fn rake_html_page( + pub async fn rake_html_page( &self, content: &[u8], url: &Url, @@ -343,20 +345,23 @@ impl Raker { } for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines { - match analyse_with_ad_block_cosmetic_filter( - &root_node, - adblock_engine, - url.as_str(), - true, - ) { - Ok(cosmetic_filters_tripped) => { - eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped); - antifeature_flags |= *engine_antifeature_flag; - } - Err(err) => { - 
eprintln!("Cosmetic Filter Err {:?}", err); - } - }; + adblock_engine.apply(Box::new(|adblock_engine| { + match analyse_with_ad_block_cosmetic_filter( + &root_node, + &adblock_engine, + url.as_str(), + true, + ) { + Ok(cosmetic_filters_tripped) => { + eprintln!("?cosmetic filters tripped: {}", cosmetic_filters_tripped); + antifeature_flags |= *engine_antifeature_flag; + } + Err(err) => { + eprintln!("Cosmetic Filter Err {:?}", err); + } + }; + Ok(()) + })); } let dense_doc = DenseTree::from_body(root_node.clone()); diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index f81e45f..25b3a2f 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -26,31 +26,31 @@ enum NextAction { #[derive(Clone)] pub struct TaskResultSubmission { - pages: Sender<(Url, RakedPageEntry)>, - references: Sender<(Url, RakedReferrerEntry)>, + pub pages: Sender<(Url, RakedPageEntry)>, + pub references: Sender<(Url, RakedReferrerEntry)>, } #[derive(Clone)] pub struct TaskContext { /// The backing database store - store: RakerStore, + pub store: RakerStore, /// HTTP client - client: Client, + pub client: Client, /// The raker - raker: Arc, + pub raker: Arc, /// Busy domains (that are being processed by other tasks) - busy_domains: Arc>>, + pub busy_domains: Arc>>, /// Cache of robots.txt entries for recently-made dormant sites - robotstxt_cache: Arc>>>, + pub robotstxt_cache: Arc>>>, /// Semaphore that gives permits to make HTTP requests - semaphore: Arc, + pub semaphore: Arc, - submission: TaskResultSubmission, + pub submission: TaskResultSubmission, } impl TaskContext { diff --git a/quickpeep_utils/Cargo.toml b/quickpeep_utils/Cargo.toml index 197992c..e3f01e4 100644 --- a/quickpeep_utils/Cargo.toml +++ b/quickpeep_utils/Cargo.toml @@ -9,4 +9,5 @@ edition = "2021" chrono = "0.4.19" lazy_static = "1.4.0" anyhow = "1.0.56" - +tokio = { version = "1.17.0", features = ["sync"] } +log = "0.4.14" \ No newline at end of file diff 
--git a/quickpeep_utils/src/lib.rs b/quickpeep_utils/src/lib.rs
index 6bcb8ed..39634ce 100644
--- a/quickpeep_utils/src/lib.rs
+++ b/quickpeep_utils/src/lib.rs
@@ -1,2 +1,3 @@
 pub mod dates;
 pub mod lazy;
+pub mod threadguard;
\ No newline at end of file
diff --git a/quickpeep_utils/src/threadguard.rs b/quickpeep_utils/src/threadguard.rs
new file mode 100644
index 0000000..fd29ed1
--- /dev/null
+++ b/quickpeep_utils/src/threadguard.rs
@@ -0,0 +1,97 @@
+//! Runs a value that is neither `Send` nor `Sync` on a dedicated worker thread.
+
+use anyhow::anyhow;
+use log::warn;
+use std::sync::mpsc;
+use std::sync::mpsc::Sender;
+use std::thread::JoinHandle;
+use tokio::sync::oneshot::Sender as OneshotSender;
+
+/// A job to run against the guarded object; it is boxed so that it can be sent to
+/// the worker thread.
+pub type ThreadGuardJob<T, R> = Box<dyn Fn(&T) -> anyhow::Result<R> + Send + Sync>;
+
+/// A queued job paired with the channel on which its result is handed back.
+type QueuedJob<T, R> = (ThreadGuardJob<T, R>, OneshotSender<anyhow::Result<R>>);
+
+/// Helper to use structs that aren't Sync or Send.
+/// Spawns a thread upon which operations can be sent to.
+pub struct ThreadGuard<T, R> {
+    /// Sending half of the job queue; only `None` once `drop` has closed it.
+    jobs: Option<Sender<QueuedJob<T, R>>>,
+    /// Handle of the worker thread; taken and joined on drop.
+    thread: Option<JoinHandle<anyhow::Result<()>>>,
+}
+
+impl<T, R> Drop for ThreadGuard<T, R> {
+    /// Closes the job queue, then waits for the worker thread to finish.
+    fn drop(&mut self) {
+        // Close the queue *before* joining: the worker only leaves its `recv` loop
+        // once every sender is gone, and struct fields are dropped after this
+        // function returns, so joining first would block forever.
+        drop(self.jobs.take());
+
+        if let Some(thread) = self.thread.take() {
+            if let Err(err) = thread
+                .join()
+                .unwrap_or_else(|e| Err(anyhow!("Can't join: {:?}", e)))
+            {
+                warn!(
+                    "Failed to join threadguard, or it returned an error: {:?}",
+                    err
+                );
+            }
+        }
+    }
+}
+
+impl<T: 'static, R: Send + 'static> ThreadGuard<T, R> {
+    /// Spawns the worker thread and builds the guarded object on it.
+    ///
+    /// `initialiser` is called on the worker thread, so only the initialiser, not
+    /// the object it produces, has to be `Send`.
+    pub fn new(initialiser: Box<dyn Fn() -> T + Send>) -> ThreadGuard<T, R> {
+        let (jobs_tx, jobs_rx) = mpsc::channel::<QueuedJob<T, R>>();
+
+        let thread = std::thread::spawn(move || -> anyhow::Result<()> {
+            let object = initialiser();
+
+            // `recv` fails, ending the loop, once every sender has been dropped.
+            while let Ok((job, job_return_tx)) = jobs_rx.recv() {
+                let result = job(&object);
+                // Sending fails only if the receiving half is gone (the caller stopped
+                // waiting); there is nobody left to report that to.
+                let _ = job_return_tx.send(result);
+            }
+
+            Ok(())
+        });
+
+        ThreadGuard {
+            jobs: Some(jobs_tx),
+            thread: Some(thread),
+        }
+    }
+
+    /// Runs `function` against the guarded object on the worker thread and returns
+    /// its result.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the job could not be queued, if the worker dropped the
+    /// job without replying, or if the job itself returned an error.
+    pub async fn apply(&self, function: ThreadGuardJob<T, R>) -> anyhow::Result<R> {
+        let jobs = self
+            .jobs
+            .as_ref()
+            .ok_or_else(|| anyhow!("ThreadGuard has already been shut down"))?;
+
+        let (return_tx, return_rx) = tokio::sync::oneshot::channel();
+        jobs.send((function, return_tx))
+            .map_err(|_| anyhow!("ThreadGuard worker thread is no longer running"))?;
+
+        return_rx
+            .await
+            .map_err(|_| anyhow!("ThreadGuard worker dropped the job without replying"))?
+    }
+}