Re-apply seeds and weeds at import time, to on-hold URLs

This commit is contained in:
Olivier 'reivilibre' 2022-03-22 20:01:26 +00:00
parent 2f5131e690
commit db9fe77c16
3 changed files with 105 additions and 5 deletions

View File

@ -18,7 +18,7 @@ use quickpeep_raker::config::RakerConfig;
use quickpeep_raker::raking::analysis::get_reduced_domain; use quickpeep_raker::raking::analysis::get_reduced_domain;
use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent}; use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent};
use quickpeep_raker::storage::records::{AllowedDomainRecord, WeedDomainRecord}; use quickpeep_raker::storage::records::{AllowedDomainRecord, WeedDomainRecord};
use quickpeep_raker::storage::RakerStore; use quickpeep_raker::storage::{maintenance, RakerStore};
use quickpeep_seed_parser::parse_seeds; use quickpeep_seed_parser::parse_seeds;
use quickpeep_utils::dirty::DirtyTracker; use quickpeep_utils::dirty::DirtyTracker;
@ -63,6 +63,12 @@ pub async fn main() -> anyhow::Result<()> {
import_weeds(store.clone(), &config).await?; import_weeds(store.clone(), &config).await?;
eprintln!("... re-applying seeds and weeds to on-hold URLs ...");
store
.async_rw_txn(|txn| maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn))
.await?;
eprintln!("... done!");
Ok(()) Ok(())
} }
@ -96,19 +102,19 @@ pub async fn import_seeds(store: RakerStore, config: &RakerConfig) -> anyhow::Re
} }
pub async fn import_weeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> { pub async fn import_weeds(store: RakerStore, config: &RakerConfig) -> anyhow::Result<()> {
let (seed_tx, seed_rx) = mpsc::channel(128); let (weed_tx, weed_rx) = mpsc::channel(128);
let seed_files = find_seed_files(config.seed_dir.clone(), SEED_EXTENSION).await?; let seed_files = find_seed_files(config.seed_dir.clone(), WEED_EXTENSION).await?;
eprintln!("{:?}", seed_files); eprintln!("{:?}", seed_files);
tokio::spawn(async move { tokio::spawn(async move {
seed_loader(seed_files, &seed_tx).await?; seed_loader(seed_files, &weed_tx).await?;
Ok(()) as anyhow::Result<()> Ok(()) as anyhow::Result<()>
}); });
let stats = importer(store, seed_rx, true).await?; let stats = importer(store, weed_rx, true).await?;
dark_red_ln!("=== Weeds Imported! ==="); dark_red_ln!("=== Weeds Imported! ===");
red!("New domains: "); red!("New domains: ");

View File

@ -22,6 +22,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
pub mod maintenance;
pub mod mdbx_helper_types; pub mod mdbx_helper_types;
mod migrations; mod migrations;
pub mod records; pub mod records;

View File

@ -0,0 +1,93 @@
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString};
use crate::storage::records::{AllowedDomainRecord, OnHoldUrlRecord, WeedDomainRecord};
use crate::storage::RakerTxn;
use anyhow::Context;
use libmdbx::{Database, WriteFlags, RW};
use log::warn;
use reqwest::Url;
/// Runs one big transaction that:
/// - scans on-hold URLs
/// - moves 'allowed' ones to the queue
/// - deletes 'weeds'
/// - leaves unknown ones alone
///
/// Ideally should be applied after importing seeds and weeds on an existing database.
pub fn reapply_seeds_and_weeds_to_on_hold_urls(txn: RakerTxn<RW>) -> anyhow::Result<()> {
struct DomainState {
pub domain: String,
pub allowed_domain_record: Option<AllowedDomainRecord>,
pub weed_domain_record: Option<WeedDomainRecord>,
}
let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold;
let mut domain_state = None;
// Scan through the on-hold URLs
let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?;
let mut first_iteration = true;
while let Some((MdbxString(domain_then_url), MdbxBare(record))) = if first_iteration {
first_iteration = false;
cur.first::<MdbxString, MdbxBare<OnHoldUrlRecord>>()
} else {
cur.next::<MdbxString, MdbxBare<OnHoldUrlRecord>>()
}? {
let mut split = domain_then_url.as_ref().split("\n");
let domain = split.next().context("No first split..?")?;
let url_str = split.next().context("No URL")?;
// Is the domain new?
if domain_state
.as_ref()
.map(|ds: &DomainState| &ds.domain != domain)
.unwrap_or(true)
{
// Then load the relevant records for it.
domain_state = Some(DomainState {
domain: domain.to_owned(),
allowed_domain_record: txn.get_allowed_domain_record(domain)?,
weed_domain_record: txn.get_weed_domain_record(domain)?,
});
}
let url = Url::parse(url_str)?;
let domain_state = domain_state.as_ref().unwrap();
let is_allowed = domain_state
.allowed_domain_record
.as_ref()
.map(|adr: &AllowedDomainRecord| adr.applies_to_url(&url))
.unwrap_or(false);
let is_weed = domain_state
.weed_domain_record
.as_ref()
.map(|wdr: &WeedDomainRecord| wdr.applies_to_url(&url))
.unwrap_or(false);
match (is_allowed, is_weed) {
(false, false) => { /* nop */ }
(true, true) => {
warn!(
"Ambiguous: {:?} is both mentioned by a seed and a weed. Ignoring.",
url
);
}
(true, false) => {
// ALLOWED
// Make it a queued URL
txn.enqueue_url(url_str, None, record.queue_record.intent)?;
cur.del(WriteFlags::empty())?;
}
(false, true) => {
// WEED
// Just delete
cur.del(WriteFlags::empty())?;
}
}
}
txn.commit()?;
Ok(())
}