Allow importing 'weeds' as opposed to 'seeds'

This commit is contained in:
Olivier 'reivilibre' 2022-03-22 19:52:50 +00:00
parent 05ebfc8998
commit 641c575660
3 changed files with 163 additions and 15 deletions

View File

@ -6,7 +6,7 @@ use env_logger::Env;
use anyhow::{anyhow, bail, Context}; use anyhow::{anyhow, bail, Context};
use colour::{dark_green_ln, dark_yellow, green, yellow_ln}; use colour::{dark_green_ln, dark_red_ln, dark_yellow, green, red, yellow_ln};
use log::warn; use log::warn;
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use std::path::PathBuf; use std::path::PathBuf;
@ -14,13 +14,17 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use quickpeep_raker::config; use quickpeep_raker::config;
use quickpeep_raker::config::RakerConfig;
use quickpeep_raker::raking::analysis::get_reduced_domain; use quickpeep_raker::raking::analysis::get_reduced_domain;
use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent}; use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent};
use quickpeep_raker::storage::records::AllowedDomainRecord; use quickpeep_raker::storage::records::{AllowedDomainRecord, WeedDomainRecord};
use quickpeep_raker::storage::RakerStore; use quickpeep_raker::storage::RakerStore;
use quickpeep_seed_parser::parse_seeds; use quickpeep_seed_parser::parse_seeds;
use quickpeep_utils::dirty::DirtyTracker; use quickpeep_utils::dirty::DirtyTracker;
pub const SEED_EXTENSION: &'static str = ".seed";
pub const WEED_EXTENSION: &'static str = ".weed";
/// Seeds a raker's queue with URLs /// Seeds a raker's queue with URLs
#[derive(Clone, Debug, Parser)] #[derive(Clone, Debug, Parser)]
pub struct Opts { pub struct Opts {
@ -55,19 +59,27 @@ pub async fn main() -> anyhow::Result<()> {
let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?; let store = RakerStore::open(&config.workbench_dir.join("raker.mdbx"))?;
let (seed_tx, seed_rx) = mpsc::channel(128); import_seeds(store.clone(), &config).await?;
let seed_files = find_seed_files(config.seed_dir.clone()).await?; import_weeds(store.clone(), &config).await?;
eprintln!("{:?}", seed_files); Ok(())
}
pub async fn import_seeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> {
let (weed_tx, weed_rx) = mpsc::channel(128);
let weed_files = find_seed_files(config.seed_dir.clone(), WEED_EXTENSION).await?;
eprintln!("{:?}", weed_files);
tokio::spawn(async move { tokio::spawn(async move {
seed_loader(seed_files, &seed_tx).await?; seed_loader(weed_files, &weed_tx).await?;
Ok(()) as anyhow::Result<()> Ok(()) as anyhow::Result<()>
}); });
let stats = importer(store, seed_rx).await?; let stats = importer(store, weed_rx, false).await?;
dark_green_ln!("=== Seeds Imported! ==="); dark_green_ln!("=== Seeds Imported! ===");
green!("New URLs: "); green!("New URLs: ");
@ -78,6 +90,30 @@ pub async fn main() -> anyhow::Result<()> {
yellow_ln!("{:?}", stats.new_domains); yellow_ln!("{:?}", stats.new_domains);
dark_yellow!("Seen URLs: "); dark_yellow!("Seen URLs: ");
yellow_ln!("{:?}", stats.already_present_urls); yellow_ln!("{:?}", stats.already_present_urls);
println!();
Ok(())
}
pub async fn import_weeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> {
let (seed_tx, seed_rx) = mpsc::channel(128);
let seed_files = find_seed_files(config.seed_dir.clone(), SEED_EXTENSION).await?;
eprintln!("{:?}", seed_files);
tokio::spawn(async move {
seed_loader(seed_files, &seed_tx).await?;
Ok(()) as anyhow::Result<()>
});
let stats = importer(store, seed_rx, true).await?;
dark_red_ln!("=== Weeds Imported! ===");
red!("New domains: ");
yellow_ln!("{:?}", stats.new_domains);
println!();
Ok(()) Ok(())
} }
@ -151,7 +187,7 @@ fn seed_url_parse_pattern(mut url: String) -> UrlOrUrlPattern {
} }
} }
async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> { async fn find_seed_files(seed_dir: PathBuf, extension: &str) -> anyhow::Result<Vec<PathBuf>> {
let mut dirs = vec![seed_dir]; let mut dirs = vec![seed_dir];
let mut seedfiles = Vec::new(); let mut seedfiles = Vec::new();
@ -174,7 +210,7 @@ async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> {
if file_name.starts_with(".") { if file_name.starts_with(".") {
continue; continue;
} }
if file_name.ends_with(".seed") { if file_name.ends_with(extension) {
seedfiles.push(path); seedfiles.push(path);
continue; continue;
} }
@ -199,7 +235,11 @@ pub struct SeedImportStats {
} }
/// Task that imports seeds into the store /// Task that imports seeds into the store
async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result<SeedImportStats> { async fn importer(
store: RakerStore,
mut recv: Receiver<Seed>,
are_weeds: bool,
) -> anyhow::Result<SeedImportStats> {
let mut buf = Vec::with_capacity(BATCH_SIZE); let mut buf = Vec::with_capacity(BATCH_SIZE);
let mut stats = SeedImportStats::default(); let mut stats = SeedImportStats::default();
let client = Client::new(); let client = Client::new();
@ -207,15 +247,23 @@ async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result
buf.push(seed); buf.push(seed);
if buf.len() == BATCH_SIZE { if buf.len() == BATCH_SIZE {
import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?; if are_weeds {
import_and_flush_batch_weeds(&store, &mut buf, &mut stats).await?;
} else {
import_and_flush_batch_seeds(&store, &mut buf, &mut stats, &client).await?;
} }
} }
import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?; }
if are_weeds {
import_and_flush_batch_weeds(&store, &mut buf, &mut stats).await?;
} else {
import_and_flush_batch_seeds(&store, &mut buf, &mut stats, &client).await?;
}
Ok(stats) Ok(stats)
} }
async fn import_and_flush_batch( async fn import_and_flush_batch_seeds(
store: &RakerStore, store: &RakerStore,
buf: &mut Vec<Seed>, buf: &mut Vec<Seed>,
stats: &mut SeedImportStats, stats: &mut SeedImportStats,
@ -290,3 +338,63 @@ async fn import_and_flush_batch(
txn.commit()?; txn.commit()?;
Ok(()) Ok(())
} }
async fn import_and_flush_batch_weeds(
store: &RakerStore,
buf: &mut Vec<Seed>,
stats: &mut SeedImportStats,
) -> anyhow::Result<()> {
let txn = store.rw_txn()?;
for seed in buf.drain(..) {
let as_url = Url::parse(seed.url.as_str())
.with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
let domain = get_reduced_domain(&as_url)?;
let weed_domain_record = txn.get_weed_domain_record(domain.borrow())?;
let is_domain_new = weed_domain_record.is_none();
if is_domain_new {
stats.new_domains += 1;
}
let mut weed_domain_record =
DirtyTracker::new(weed_domain_record.unwrap_or_else(|| WeedDomainRecord::default()));
if is_domain_new {
// Mark it as dirty
let _: &mut WeedDomainRecord = weed_domain_record.borrow_mut();
}
match &seed.url {
UrlOrUrlPattern::Url(url_str) => {
let url = Url::parse(url_str.as_str())?;
if txn.enqueue_url(url.as_str(), None, RakeIntent::Any)? {
stats.new_urls += 1;
} else {
stats.already_present_urls += 1;
}
url
}
UrlOrUrlPattern::UrlPrefix(prefix) => {
let prefix_as_url = Url::parse(prefix.as_str())?;
if txn.enqueue_url(prefix_as_url.as_str(), None, RakeIntent::Any)? {
stats.new_urls += 1;
} else {
stats.already_present_urls += 1;
}
if is_domain_new {
let weed_domain_record: &mut WeedDomainRecord = weed_domain_record.borrow_mut();
weed_domain_record
.restricted_prefixes
.insert(prefix_as_url.path().to_string());
}
prefix_as_url
}
};
if weed_domain_record.is_dirty() {
txn.put_weed_domain_record(domain.borrow(), weed_domain_record.into_inner())?;
}
}
txn.commit()?;
Ok(())
}

View File

@ -4,7 +4,7 @@ use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION}; use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
use crate::storage::records::{ use crate::storage::records::{
ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord, ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, OnHoldUrlRecord,
QueueUrlRecord, UrlVisitedRecord, QueueUrlRecord, UrlVisitedRecord, WeedDomainRecord,
}; };
use anyhow::{anyhow, bail, ensure, Context}; use anyhow::{anyhow, bail, ensure, Context};
use libmdbx::{ use libmdbx::{
@ -44,6 +44,8 @@ pub struct Databases<'env> {
pub allowed_domains: Database<'env>, pub allowed_domains: Database<'env>,
/// Domain \n URL → OnHoldUrlRecord Number of refs (INT VALUE) /// Domain \n URL → OnHoldUrlRecord Number of refs (INT VALUE)
pub urls_on_hold: Database<'env>, pub urls_on_hold: Database<'env>,
/// Domain → WeedDomainRecord
pub weed_domains: Database<'env>,
} }
impl<'env> Databases<'env> { impl<'env> Databases<'env> {
@ -60,13 +62,14 @@ impl<'env> Databases<'env> {
("visited_urls", &self.visited_urls), ("visited_urls", &self.visited_urls),
("allowed_domains", &self.allowed_domains), ("allowed_domains", &self.allowed_domains),
("urls_on_hold", &self.urls_on_hold), ("urls_on_hold", &self.urls_on_hold),
("weed_domains", &self.weed_domains),
] ]
.into_iter() .into_iter()
} }
} }
// Must match the order of the Databases struct fields. // Must match the order of the Databases struct fields.
pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [ pub const DATABASES: [(&'static str, DatabaseFlags); 9] = [
("urls_queue", DatabaseFlags::empty()), ("urls_queue", DatabaseFlags::empty()),
("active_domains", DatabaseFlags::empty()), ("active_domains", DatabaseFlags::empty()),
("active_domain_raffle", DatabaseFlags::INTEGER_KEY), ("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
@ -78,6 +81,7 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [
("urls_visited", DatabaseFlags::empty()), ("urls_visited", DatabaseFlags::empty()),
("allowed_domains", DatabaseFlags::empty()), ("allowed_domains", DatabaseFlags::empty()),
("urls_on_hold", DatabaseFlags::empty()), ("urls_on_hold", DatabaseFlags::empty()),
("weed_domains", DatabaseFlags::empty()),
]; ];
#[self_referencing] #[self_referencing]
@ -165,6 +169,7 @@ impl RakerStore {
visited_urls: dbs.next().unwrap(), visited_urls: dbs.next().unwrap(),
allowed_domains: dbs.next().unwrap(), allowed_domains: dbs.next().unwrap(),
urls_on_hold: dbs.next().unwrap(), urls_on_hold: dbs.next().unwrap(),
weed_domains: dbs.next().unwrap(),
} }
}, },
} }
@ -499,6 +504,22 @@ impl<'a> RakerTxn<'a, RW> {
)?; )?;
Ok(()) Ok(())
} }
pub fn put_weed_domain_record(
&self,
domain: &str,
weed_domain_record: WeedDomainRecord,
) -> anyhow::Result<()> {
let weed_domains = &self.mdbx.borrow_dbs().weed_domains;
self.mdbx_txn.put(
weed_domains,
domain.as_bytes(),
MdbxBare(weed_domain_record).as_bytes(),
WriteFlags::empty(),
)?;
Ok(())
}
} }
/// Registers metrics for the datastore. Call this once at startup. /// Registers metrics for the datastore. Call this once at startup.
@ -658,6 +679,18 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
} }
} }
pub fn get_weed_domain_record(&self, domain: &str) -> anyhow::Result<Option<WeedDomainRecord>> {
let weed_domains = &self.mdbx.borrow_dbs().weed_domains;
match self
.mdbx_txn
.get::<MdbxBare<WeedDomainRecord>>(weed_domains, domain.as_bytes())?
{
None => Ok(None),
Some(MdbxBare(record)) => Ok(Some(record)),
}
}
/// Emits metrics for the datastore. Call this occasionally. /// Emits metrics for the datastore. Call this occasionally.
pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> { pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() { for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {

View File

@ -51,3 +51,10 @@ pub struct AllowedDomainRecord {
/// Empty if ALL path prefixes are permitted. /// Empty if ALL path prefixes are permitted.
pub restricted_prefixes: BTreeSet<String>, pub restricted_prefixes: BTreeSet<String>,
} }
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct WeedDomainRecord {
/// Set of weedy path prefixes.
/// Empty if ALL path prefixes are weedy.
pub restricted_prefixes: BTreeSet<String>,
}