STASH work on Raking
continuous-integration/drone: the build failed

Olivier 'reivilibre' 2022-03-19 21:04:12 +00:00
parent cd9e4215ee
commit ab0b1e84ee
13 changed files with 388 additions and 59 deletions

Cargo.lock (generated)
View File

@@ -2019,6 +2019,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lru"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb87f3080f6d1d69e8c564c0fcfde1d7aa8cc451ce40cae89479111f03bc0eb"
dependencies = [
"hashbrown",
]
[[package]]
name = "mac"
version = "0.1.1"
@@ -2599,6 +2608,18 @@ version = "2.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8eda7c62d9ecaafdf8b62374c006de0adf61666ae96a96ba74a37134aa4e470"
[[package]]
name = "publicsuffix"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "292972edad6bbecc137ab84c5e36421a4a6c979ea31d3cc73540dd04315b33e1"
dependencies = [
"byteorder",
"hashbrown",
"idna",
"psl-types",
]
[[package]]
name = "quick-xml"
version = "0.22.0"
@@ -2678,8 +2699,10 @@ dependencies = [
"libmdbx",
"lingua",
"log",
"lru",
"mdbx-sys", "mdbx-sys",
"ouroboros", "ouroboros",
"publicsuffix",
"quickpeep_densedoc", "quickpeep_densedoc",
"quickpeep_moz_readability", "quickpeep_moz_readability",
"quickpeep_structs", "quickpeep_structs",
@@ -2706,6 +2729,11 @@ dependencies = [
[[package]]
name = "quickpeep_utils"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"lazy_static",
]
[[package]]
name = "quote"

View File

@@ -44,6 +44,7 @@ log = "0.4.14"
env_logger = "0.9.0"
ouroboros = "0.14.2"
rand = "0.8.5"
lru = "0.7.3"
### Raking helpers
# HTTP Requests
@@ -57,6 +58,8 @@ cylon = { version = "0.2.0", features = ["crawl-delay"] }
feed-rs = "1.0.0"
# Sitemaps
sitemap = "0.4.1"
# Public Suffix List handling
publicsuffix = "2.1.1"
### Filtering helpers
# AdBlock

View File

@@ -0,0 +1,71 @@
use clap::Parser;
use env_logger::Env;
use anyhow::{bail, Context};
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::redirect::Policy;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use quickpeep_raker::config;
use quickpeep_raker::raking::{RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
/// Rakes URLs that have been queued up for the raker
#[derive(Clone, Debug, Parser)]
pub struct Opts {
#[clap(long = "config")]
config: Option<PathBuf>,
#[clap(long = "concurrency")]
/// How many concurrent requests to perform
concurrent_jobs: u32,
#[clap(long = "sleepers")]
/// Allow an additional <sleepers> number of sleeping tasks
/// (Waiting for a crawl delay before moving on to the next request)
concurrent_sleepers: u32,
}
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init();
let opts: Opts = Opts::parse();
let config_path = opts
.config
.unwrap_or_else(|| PathBuf::from("qp_raker.toml"));
let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.workbench_dir.exists() {
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.workbench_dir
);
}
let mut header_map = HeaderMap::new();
header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT));
let _client = reqwest::ClientBuilder::new()
.pool_idle_timeout(Duration::from_secs(90))
.pool_max_idle_per_host(1)
.timeout(TIME_LIMIT)
.default_headers(header_map)
// We want to handle redirects ourselves so we can track them...
.redirect(Policy::none())
.build()?;
let _store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
let _num_tasks = opts.concurrent_jobs + opts.concurrent_sleepers;
let _semaphore = Arc::new(Semaphore::new(opts.concurrent_jobs as usize));
eprintln!("{:#?}", config);
Ok(())
}
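To illustrate the concurrency model described by the --concurrency and --sleepers flags above, here is a minimal sketch (not part of this diff) of a worker loop: a semaphore permit is held only while a request is in flight and is released while the task sleeps out a crawl delay, so up to concurrent_jobs requests run at once while up to concurrent_sleepers additional tasks may be sleeping. The "rake one URL" step is hypothetical.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

async fn worker_loop(semaphore: Arc<Semaphore>, crawl_delay: Duration) -> anyhow::Result<()> {
    loop {
        // Only `concurrent_jobs` permits exist, so at most that many requests are in flight.
        let permit = semaphore.acquire().await?;
        // ... rake one URL here: HTTP request plus analysis (hypothetical step) ...
        drop(permit);
        // Wait out the crawl delay *without* holding a permit; tasks in this state are
        // the "sleepers" budgeted for by --sleepers.
        tokio::time::sleep(crawl_delay).await;
    }
}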

View File

@@ -39,6 +39,8 @@ pub async fn main() -> anyhow::Result<()> {
bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir);
}
// TODO discover sitemaps at the same time as digging up robots.txt files
eprintln!("{:#?}", config);
Ok(())

View File

@@ -14,7 +14,7 @@ use lazy_static::lazy_static;
use log::debug;
use quickpeep_densedoc::{DenseDocument, DenseHead, DenseTree};
use quickpeep_structs::rake_entries::{AnalysisAntifeatures, RakedPageEntry, RakedReferrerEntry};
use quickpeep_utils::lazy::Lazy;
use reqwest::header::HeaderMap;
use reqwest::{Client, Response, Url};
use serde::{Deserialize, Serialize};
@@ -25,6 +25,7 @@ use tokio::time::Instant;
pub mod analysis;
pub mod references;
pub mod task;
/// 4 MiB ought to be enough for anybody.
pub const SIZE_LIMIT: usize = 4 * 1024 * 1024;
@@ -547,10 +548,13 @@ pub fn rake_sitemap(content: &[u8]) -> anyhow::Result<Vec<UrlRaked>> {
Ok(urls)
}
pub fn robots_txt_url_for(url: &Url) -> anyhow::Result<Url> {
url.join("/robots.txt")
.context("Whilst resolving /robots.txt on URL")
}
pub async fn get_robots_txt_for(url: &Url, client: &Client) -> anyhow::Result<Option<RobotsTxt>> {
let robots_url = robots_txt_url_for(url)?;
let resp = client.get(robots_url.clone()).send().await?;
if !resp.status().is_success() {

View File

@@ -4,6 +4,8 @@ use anyhow::Context;
use ipnetwork::IpNetwork;
use kuchiki::NodeRef;
use lingua::Language;
use reqwest::Url;
use std::borrow::Cow;
use std::collections::{BTreeSet, HashSet};
use std::net::IpAddr;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
@@ -177,6 +179,15 @@ impl IpSet {
}
}
pub fn get_reduced_domain(url: &Url) -> anyhow::Result<Cow<'_, str>> {
let domain = url.domain().context("URLs must have domains")?;
Ok(Cow::Borrowed(match domain.strip_prefix("www.") {
Some(stripped) => stripped,
None => domain,
}))
}
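A quick usage sketch (not part of this diff), written as a test, assuming get_reduced_domain and reqwest::Url are in scope:

#[test]
fn reduces_www_domain() {
    // The "www." prefix is stripped...
    let url = Url::parse("https://www.example.org/some/page").unwrap();
    assert_eq!(get_reduced_domain(&url).unwrap(), "example.org");
    // ...while a bare apex domain passes through unchanged.
    let apex = Url::parse("https://example.org/").unwrap();
    assert_eq!(get_reduced_domain(&apex).unwrap(), "example.org");
}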
#[cfg(test)]
mod test {
use crate::raking::analysis::IpSet;

View File

@@ -0,0 +1,124 @@
use crate::raking::{get_robots_txt_for, RakeOutcome, Raker, RobotsTxt, UrlRaked};
use crate::storage::RakerStore;
use cylon::Cylon;
use lru::LruCache;
use quickpeep_structs::rake_entries::{RakedPageEntry, RakedReferrerEntry};
use reqwest::{Client, Url};
use std::collections::HashSet;
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;
/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
/// queue and get turned into a backoff.
/// (This is necessary because some crawl delays can easily be hours or days.)
pub const MAX_CRAWL_DELAY_BEFORE_BACKOFF: Duration = Duration::from_secs(61);
/// Most sites request a crawl delay of 10 sec or less.
/// If unspecified, let's go with a reasonable-sounding number of 15 secs.
pub const DEFAULT_CRAWL_DELAY: Duration = Duration::from_secs(15);
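As a sketch of the policy these constants describe (not part of this diff; the commit's task loop is still todo!()): fall back to the default delay when robots.txt does not specify one, and turn very long delays into a backoff rather than keeping the domain's place in the queue. The DelayAction type and decide_delay helper are hypothetical.

enum DelayAction {
    /// Sleep this long, then continue with the same domain.
    Sleep(Duration),
    /// Give up the domain's place in the queue and back off for this long.
    Backoff(Duration),
}

fn decide_delay(robots_crawl_delay: Option<Duration>) -> DelayAction {
    let delay = robots_crawl_delay.unwrap_or(DEFAULT_CRAWL_DELAY);
    if delay > MAX_CRAWL_DELAY_BEFORE_BACKOFF {
        DelayAction::Backoff(delay)
    } else {
        DelayAction::Sleep(delay)
    }
}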
enum NextAction {
Continue,
ChangeDomain,
}
#[derive(Clone)]
pub struct TaskResultSubmission {
pages: Sender<(Url, RakedPageEntry)>,
references: Sender<(Url, RakedReferrerEntry)>,
}
#[derive(Clone)]
pub struct TaskContext {
/// The backing database store
store: RakerStore,
/// HTTP client
client: Client,
/// The raker
raker: Arc<Raker>,
/// Busy domains (that are being processed by other tasks)
busy_domains: Arc<StdMutex<HashSet<String>>>,
/// Cache of robots.txt entries for recently-made dormant sites
robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
/// Semaphore that gives permits to make HTTP requests
semaphore: Arc<Semaphore>,
submission: TaskResultSubmission,
}
impl TaskContext {
pub async fn run(self) -> anyhow::Result<()> {
// Get a domain to process
todo!();
Ok(())
}
pub async fn get_robot_rules(&self, url_of_site: &Url) -> anyhow::Result<Option<Cylon>> {
let robots = get_robots_txt_for(url_of_site, &self.client).await?;
Ok(robots.map(|robots: RobotsTxt| robots.rules))
}
pub async fn process_domain(&mut self, domain: String) -> anyhow::Result<()> {
let mut current_robot_rules_url: Option<Url> = None;
let mut current_robot_rules: Option<Cylon> = None;
loop {
// Get a URL to process
let url = {
let txn = self.store.ro_txn()?;
txn.choose_url_for_domain(&domain)?
};
let url = todo!();
let intent = todo!();
// Check our robot rules are valid for that URL
todo!();
let permit = self.semaphore.acquire().await?;
// TODO process errors
match self.raker.rake(url, intent, &self.client).await {
Ok(rake_outcome) => match self.process_outcome(&url, rake_outcome).await? {
NextAction::Continue => {
todo!()
}
NextAction::ChangeDomain => {
todo!()
}
},
Err(_) => {
todo!();
}
}
drop(permit);
}
Ok(())
}
/// Processes the outcome of raking one URL.
async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
match outcome {
RakeOutcome::RakedPage(page) => {}
RakeOutcome::RakedFeed(feed) => {}
RakeOutcome::RakedSitemap(sitemap) => {}
RakeOutcome::Redirect { reason, new_url } => {}
RakeOutcome::TemporaryFailure(failure) => {}
RakeOutcome::PermanentFailure(failure) => {}
}
todo!()
}
async fn process_feed_or_sitemap(&self, feed: &Vec<UrlRaked>) -> anyhow::Result<()> {
todo!()
}
}

View File

@@ -1,10 +1,10 @@
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32};
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
use crate::storage::records::{ActiveDomainRecord, QueueUrlRecord};
use anyhow::{bail, ensure, Context};
use libmdbx::{
Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, TransactionKind,
WriteFlags, WriteMap, RO, RW,
};
use log::info;
use ouroboros::self_referencing;
@@ -18,7 +18,7 @@ mod records;
/// The databases available in an environment.
pub struct Databases<'env> {
/// Domain \n URL → QueueUrlRecord
pub queue_urls: Database<'env>,
/// Domain → ActiveDomainRecord
pub active_domains: Database<'env>,
@@ -53,6 +53,8 @@ pub struct RakerDb {
pub dbs: Databases<'this>,
}
/// Handle to the store.
/// Doesn't need wrapping in Arc because it already is.
#[derive(Clone)]
pub struct RakerStore {
pub mdbx: Arc<RakerDb>,
@@ -60,7 +62,13 @@ pub struct RakerStore {
impl RakerStore {
pub fn open(path: &Path) -> anyhow::Result<RakerStore> {
let mut flags = EnvironmentFlags::default();
flags.no_sub_dir = true;
let env = Environment::new()
.set_max_dbs(256)
.set_flags(flags)
.open(path)?;
let mdbx: RakerDb = RakerDbBuilder {
env,
@@ -243,7 +251,8 @@
}
}
/// Read-only implementations (but can also be used on RW transactions)
impl<'a, K: TransactionKind> RakerTxn<'a, K> {
pub fn choose_random_active_domain(
&self,
) -> anyhow::Result<Option<(String, ActiveDomainRecord)>> {
@@ -288,4 +297,37 @@
Ok(Some((domain.into_string(), record)))
}
pub fn choose_url_for_domain(
&self,
domain: &str,
) -> anyhow::Result<Option<(String, QueueUrlRecord)>> {
let queue: &Database = &self.mdbx.borrow_dbs().queue_urls;
let mut cur = self.mdbx_txn.cursor(queue)?;
match cur
.iter_from::<MdbxString, MdbxBare<QueueUrlRecord>>(
MdbxString(Cow::Owned(format!("{}\n", domain))).as_bytes(),
)
.next()
{
Some(entry) => {
let (k, MdbxBare(record)) = entry?;
let domain_followed_by_url = k.0.as_ref();
let mut split = domain_followed_by_url.split("\n");
let actual_domain = split.next().context("No domain")?;
let url = split.next().context("No URL")?;
if domain != actual_domain {
// This means we've run out of URLs for the domain in question.
return Ok(None);
}
ensure!(split.next().is_none(), "Should be no more splits.");
Ok(Some((url.to_owned(), record)))
}
None => Ok(None),
}
}
}
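A sketch (not part of this diff) of the composite key layout that choose_url_for_domain relies on: the queue key is the domain and the URL joined by a newline, so seeking the cursor to "{domain}\n" lands on that domain's first queued URL. The helper names below are hypothetical.

/// Build the "Domain \n URL" composite key used by the queue_urls database.
fn queue_key(domain: &str, url: &str) -> String {
    format!("{}\n{}", domain, url)
}

/// Split a composite key back into (domain, url); URLs cannot contain a newline.
fn split_queue_key(key: &str) -> Option<(&str, &str)> {
    let mut parts = key.splitn(2, '\n');
    Some((parts.next()?, parts.next()?))
}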

View File

@@ -5,3 +5,18 @@ pub struct ActiveDomainRecord {
/// The raffle ticket number owned by this domain.
pub raffle_ticket: u32,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct VisitedDomainRecord {
/// Number of days since the QuickPeep Epoch that this page was last raked at.
/// We'd really rather stick with small integers to reduce the storage space requirements,
/// and a u16 number of days still gives us 179 years' range:
/// sitemaps and feeds usually only tell you the date the page was last updated.
pub last_visited_days: u16,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct QueueUrlRecord {
// TODO
}

View File

@@ -6,3 +6,7 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4.19"
lazy_static = "1.4.0"
anyhow = "1.0.56"

View File

@@ -0,0 +1,23 @@
use anyhow::Context;
use chrono::{Date, Duration, TimeZone, Utc};
use lazy_static::lazy_static;
lazy_static! {
/// The QuickPeep Epoch is 2022-01-01, as this gives us 52 years of extra headroom compared to the
/// Unix one. QuickPeep didn't exist before 2022 so we needn't worry about negative dates!
pub static ref QUICKPEEP_EPOCH: Date<Utc> = Utc.ymd(2022, 1, 1);
}
pub fn date_from_quickpeep_days(days: u16) -> Date<Utc> {
let dt = QUICKPEEP_EPOCH.and_hms(0, 0, 0);
(dt + Duration::days(days as i64)).date()
}
pub fn date_to_quickpeep_days(date: &Date<Utc>) -> anyhow::Result<u16> {
let dt = date.and_hms(0, 0, 0);
let duration = dt - QUICKPEEP_EPOCH.and_hms(0, 0, 0);
duration
.num_days()
.try_into()
.context("Failed to convert date to QuickPeep datestamp")
}
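A worked example (not part of this diff) of the datestamp encoding, assuming the helpers above are in scope: this commit's date, 2022-03-19, is 77 days after the 2022-01-01 epoch, and the conversion round-trips.

use chrono::{TimeZone, Utc};

#[test]
fn quickpeep_days_round_trip() {
    let date = Utc.ymd(2022, 3, 19);
    // 31 (January) + 28 (February) + 18 = 77 days after the epoch.
    let days = date_to_quickpeep_days(&date).unwrap();
    assert_eq!(days, 77);
    assert_eq!(date_from_quickpeep_days(days), date);
}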

View File

@@ -0,0 +1,48 @@
use std::cell::RefCell;
use std::ops::Deref;
enum LazyInner<'a, T> {
Uncomputed(Option<Box<dyn FnOnce() -> T + 'a>>),
Computed(T),
}
pub struct Lazy<'a, T> {
inner: RefCell<LazyInner<'a, T>>,
}
impl<'a, T> Lazy<'a, T> {
pub fn new(func: Box<dyn FnOnce() -> T + 'a>) -> Lazy<T> {
Lazy {
inner: RefCell::new(LazyInner::Uncomputed(Some(func))),
}
}
}
impl<'a, T: 'a> Deref for Lazy<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe fn extend_lifetime<'a, 'b, A>(a: &'a A) -> &'b A {
std::mem::transmute(a)
}
let mut inner_mut = self.inner.borrow_mut();
if let LazyInner::Uncomputed(func) = &mut *inner_mut {
if let Some(func) = func.take() {
*inner_mut = LazyInner::Computed(func());
} else {
panic!("Unreachable: uncomputed but no function to compute with")
}
}
match &*inner_mut {
LazyInner::Computed(computed) => unsafe {
// Extending the lifetime *should* be safe because we don't ever overwrite
// a computed value...
extend_lifetime(computed)
},
LazyInner::Uncomputed(_) => {
panic!("Unreachable: Should have been computed");
}
}
}
}
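A small usage sketch of Lazy (not part of this diff): the boxed closure runs on the first dereference only, and the computed value is cached for later ones.

use quickpeep_utils::lazy::Lazy;

fn main() {
    let lazy = Lazy::new(Box::new(|| {
        println!("computing...");
        6 * 7
    }));
    // The closure has not run yet; the first deref computes and caches the value.
    assert_eq!(*lazy, 42);
    // Subsequent derefs reuse the cached value; "computing..." is printed only once.
    assert_eq!(*lazy, 42);
}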

View File

@@ -1,48 +1,2 @@
pub mod dates;
pub mod lazy;