STASH
continuous-integration/drone: the build failed

Olivier 'reivilibre' 2022-03-19 15:39:59 +00:00
parent ab0b1e84ee
commit 5bab279cc2
7 changed files with 140 additions and 33 deletions

Cargo.lock (generated)

@@ -2733,6 +2733,8 @@ dependencies = [
  "anyhow",
  "chrono",
  "lazy_static",
+ "log",
+ "tokio",
 ]

 [[package]]


@@ -6,12 +6,15 @@ use anyhow::{bail, Context};
 use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
 use reqwest::redirect::Policy;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex, RwLock};
 use std::time::Duration;
-use tokio::sync::Semaphore;
+use log::error;
+use lru::LruCache;
+use tokio::sync::{mpsc, Semaphore};
 use quickpeep_raker::config;
-use quickpeep_raker::raking::{RAKER_USER_AGENT, TIME_LIMIT};
+use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
+use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
 use quickpeep_raker::storage::RakerStore;

 /// Seeds a raker's queue with URLs
@@ -60,10 +63,49 @@ pub async fn main() -> anyhow::Result<()> {
         .redirect(Policy::none())
         .build()?;

-    let _store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
+    let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;

-    let _num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
-    let _semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
+    let raker = Raker {
+        adblock_engines: vec![],
+        antifeature_ip_set: todo!()
+    };
+
+    let num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
+    let semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
+
+    let (pages_tx, pages_rx) = mpsc::channel(32);
+    let (refs_tx, refs_rx) = mpsc::channel(32);
+
+    let submission = TaskResultSubmission {
+        pages: pages_tx,
+        references: refs_tx
+    };
+
+    let task_context = TaskContext {
+        store: store.clone(),
+        client: Default::default(),
+        raker: Arc::new(raker),
+        busy_domains: Arc::new(Mutex::new(Default::default())),
+        robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
+        semaphore,
+        submission
+    };
+
+    let mut tasks = Vec::with_capacity(num_tasks as usize);
+
+    for task_num in 0..num_tasks {
+        let task_context = task_context.clone();
+
+        tasks.push(tokio::spawn(async move {
+            if let Err(err) = task_context.run().await {
+                error!("Raker task {:?} encountered an error: {:?}", task_num, err);
+            }
+        }));
+        // TODO spawn task
+    }
+
+    drop(task_context);

     eprintln!("{:#?}", config);
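
For illustration, the block above follows the usual Tokio fan-out shape: build one prototype context, hand each spawned worker a clone() of it, and drop the prototype afterwards so the result channels close once the last worker exits. A minimal, self-contained sketch of that pattern with hypothetical names, not the raker's own types (assumes tokio with the "rt-multi-thread", "macros" and "sync" features):

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

// Hypothetical stand-in for a task context: cheap to clone, shares the semaphore and result channel.
#[derive(Clone)]
struct WorkerContext {
    semaphore: Arc<Semaphore>,
    results: mpsc::Sender<u32>,
}

#[tokio::main]
async fn main() {
    let (results_tx, mut results_rx) = mpsc::channel(32);
    let prototype = WorkerContext {
        semaphore: Arc::new(Semaphore::new(4)), // at most 4 workers hold a permit at once
        results: results_tx,
    };

    let mut tasks = Vec::new();
    for task_num in 0..8u32 {
        let ctx = prototype.clone();
        tasks.push(tokio::spawn(async move {
            // A real worker would loop over jobs here; the permit bounds concurrency.
            let _permit = ctx.semaphore.acquire().await.unwrap();
            let _ = ctx.results.send(task_num).await;
        }));
    }

    // Drop the prototype so the receiver sees the channel close once every worker is done.
    drop(prototype);

    while let Some(n) = results_rx.recv().await {
        println!("worker {n} reported in");
    }
}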


@@ -21,7 +21,9 @@ use serde::{Deserialize, Serialize};
 use sitemap::reader::SiteMapEntity;
 use std::collections::HashSet;
 use std::time::Duration;
+use tokio::sync::Mutex;
 use tokio::time::Instant;
+use quickpeep_utils::threadguard::ThreadGuard;

 pub mod analysis;
 pub mod references;
@@ -166,7 +168,7 @@ async fn response_to_bytes_limited(
 }

 pub struct Raker {
-    pub adblock_engines: Vec<(AnalysisAntifeatures, Engine)>,
+    pub adblock_engines: Vec<(AnalysisAntifeatures, ThreadGuard<Engine, ()>)>,
     pub antifeature_ip_set: IpSet,
 }
@@ -247,7 +249,7 @@ impl Raker {
         {
             // We don't try any fallbacks for an HTML page
             return Ok(self
-                .rake_html_page(&content, url, is_cf, &headers)
+                .rake_html_page(&content, url, is_cf, &headers).await
                 .context("Raking HTML page")?);
         }
@@ -282,7 +284,7 @@ impl Raker {
         }));
     }

-    pub fn rake_html_page(
+    pub async fn rake_html_page(
         &self,
         content: &[u8],
         url: &Url,
@@ -343,9 +345,10 @@ impl Raker {
         }

         for (engine_antifeature_flag, adblock_engine) in &self.adblock_engines {
+            adblock_engine.apply(Box::new(|adblock_engine| {
             match analyse_with_ad_block_cosmetic_filter(
                 &root_node,
-                adblock_engine,
+                &adblock_engine,
                 url.as_str(),
                 true,
             ) {
@@ -357,6 +360,8 @@ impl Raker {
                     eprintln!("Cosmetic Filter Err {:?}", err);
                 }
             };
+            Ok(())
+            }));
         }

         let dense_doc = DenseTree::from_body(root_node.clone());


@@ -26,31 +26,31 @@ enum NextAction {

 #[derive(Clone)]
 pub struct TaskResultSubmission {
-    pages: Sender<(Url, RakedPageEntry)>,
-    references: Sender<(Url, RakedReferrerEntry)>,
+    pub pages: Sender<(Url, RakedPageEntry)>,
+    pub references: Sender<(Url, RakedReferrerEntry)>,
 }

 #[derive(Clone)]
 pub struct TaskContext {
     /// The backing database store
-    store: RakerStore,
+    pub store: RakerStore,

     /// HTTP client
-    client: Client,
+    pub client: Client,

     /// The raker
-    raker: Arc<Raker>,
+    pub raker: Arc<Raker>,

     /// Busy domains (that are being processed by other tasks)
-    busy_domains: Arc<StdMutex<HashSet<String>>>,
+    pub busy_domains: Arc<StdMutex<HashSet<String>>>,

     /// Cache of robots.txt entries for recently-made dormant sites
-    robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
+    pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,

     /// Semaphore that gives permits to make HTTP requests
-    semaphore: Arc<Semaphore>,
-    submission: TaskResultSubmission,
+    pub semaphore: Arc<Semaphore>,
+    pub submission: TaskResultSubmission,
 }

 impl TaskContext {


@@ -9,4 +9,5 @@ edition = "2021"
 chrono = "0.4.19"
 lazy_static = "1.4.0"
 anyhow = "1.0.56"
+tokio = { version = "1.17.0", features = ["sync"] }
+log = "0.4.14"


@@ -1,2 +1,3 @@
 pub mod dates;
 pub mod lazy;
+pub mod threadguard;


@@ -0,0 +1,56 @@
+use std::sync::mpsc;
+use std::sync::mpsc::Sender;
+use std::thread::JoinHandle;
+
+use anyhow::anyhow;
+use tokio::sync::oneshot::Sender as OneshotSender;
+use log::warn;
+
+pub type ThreadGuardJob<T, R> = Box<dyn FnOnce(&T) -> anyhow::Result<R> + Send + Sync>;
+
+/// Helper to use structs that aren't Sync or Send.
+/// Spawns a thread upon which operations can be sent to.
+pub struct ThreadGuard<T, R> {
+    jobs: Sender<(ThreadGuardJob<T, R>, OneshotSender<anyhow::Result<R>>)>,
+    thread: Option<JoinHandle<anyhow::Result<()>>>
+}
+
+impl<T, R> Drop for ThreadGuard<T, R> {
+    fn drop(&mut self) {
+        if let Some(thread) = self.thread.take() {
+            if let Err(err) = thread.join()
+                .unwrap_or_else(|e| Err(anyhow!("Can't join: {:?}", e))) {
+                warn!("Failed to join threadguard, or it returned an error.");
+            }
+        }
+    }
+}
+
+impl<T: 'static, R: Send + 'static> ThreadGuard<T, R> {
+    pub fn new(initialiser: Box<dyn FnOnce() -> T + Send>) -> ThreadGuard<T, R> {
+        let (jobs_tx, jobs_rx) = mpsc::channel::<(ThreadGuardJob<T, R>, OneshotSender<anyhow::Result<R>>)>();
+
+        let thread = Some(std::thread::spawn(move || -> anyhow::Result<()> {
+            let object = initialiser();
+            while let Ok((job, job_return_tx)) = jobs_rx.recv() {
+                let result = job(&object);
+                job_return_tx.send(result);
+            }
+            Ok(())
+        }));
+
+        ThreadGuard {
+            jobs: jobs_tx,
+            thread
+        }
+    }
+
+    pub async fn apply(&self, function: ThreadGuardJob<T, R>) -> anyhow::Result<R> {
+        let (return_tx, return_rx) = tokio::sync::oneshot::channel();
+        self.jobs.send((function, return_tx))?;
+        Ok(return_rx.await??)
+    }
+}
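
For illustration, a minimal usage sketch of ThreadGuard with hypothetical names (Widget stands in for a value that is neither Send nor Sync, which is how the adblock Engine is treated above); it assumes a Tokio runtime with the "rt" and "macros" features alongside anyhow:

use std::cell::Cell;
use std::rc::Rc;

use quickpeep_utils::threadguard::ThreadGuard;

// Hypothetical stand-in for a value that is neither Send nor Sync.
struct Widget {
    hits: Rc<Cell<u32>>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The initialiser runs on the guard's own thread, so the Widget never crosses threads.
    let guard: ThreadGuard<Widget, u32> = ThreadGuard::new(Box::new(|| Widget {
        hits: Rc::new(Cell::new(0)),
    }));

    // Each job borrows the Widget on that thread; the result comes back over a oneshot channel.
    let count = guard
        .apply(Box::new(|w: &Widget| -> anyhow::Result<u32> {
            w.hits.set(w.hits.get() + 1);
            Ok(w.hits.get())
        }))
        .await?;

    assert_eq!(count, 1);
    Ok(())
}

Dropping the guard closes the job channel, which ends the worker loop on the dedicated thread and joins it via the Drop impl.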