Import seeds (theoretically)
This commit is contained in:
parent
8df430c7f1
commit
abf814550a
|
@ -1,18 +1,23 @@
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
use std::borrow::{Borrow, BorrowMut};
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
use env_logger::Env;
|
use env_logger::Env;
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Context};
|
use anyhow::{anyhow, bail, Context};
|
||||||
use arc_interner::ArcIntern;
|
|
||||||
use smartstring::alias::CompactString;
|
use smartstring::alias::CompactString;
|
||||||
|
|
||||||
|
use reqwest::{Client, Url};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
|
|
||||||
use quickpeep_raker::config;
|
use quickpeep_raker::config;
|
||||||
|
use quickpeep_raker::raking::analysis::get_reduced_domain;
|
||||||
|
use quickpeep_raker::raking::{get_robots_txt_for, RakeIntent};
|
||||||
|
use quickpeep_raker::storage::records::AllowedDomainRecord;
|
||||||
use quickpeep_raker::storage::RakerStore;
|
use quickpeep_raker::storage::RakerStore;
|
||||||
use quickpeep_seed_parser::parse_seeds;
|
use quickpeep_seed_parser::parse_seeds;
|
||||||
|
use quickpeep_utils::dirty::DirtyTracker;
|
||||||
|
|
||||||
/// Seeds a raker's queue with URLs
|
/// Seeds a raker's queue with URLs
|
||||||
#[derive(Clone, Debug, Parser)]
|
#[derive(Clone, Debug, Parser)]
|
||||||
|
@ -69,6 +74,15 @@ pub enum UrlOrUrlPattern {
|
||||||
UrlPrefix(String),
|
UrlPrefix(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl UrlOrUrlPattern {
|
||||||
|
pub fn as_str(&self) -> &str {
|
||||||
|
match self {
|
||||||
|
UrlOrUrlPattern::Url(url) => url.as_str(),
|
||||||
|
UrlOrUrlPattern::UrlPrefix(url_prefix) => url_prefix.as_str(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Task that loads seeds from the filesystem
|
/// Task that loads seeds from the filesystem
|
||||||
async fn seed_loader(seed_files: Vec<PathBuf>, send: &Sender<Seed>) -> anyhow::Result<()> {
|
async fn seed_loader(seed_files: Vec<PathBuf>, send: &Sender<Seed>) -> anyhow::Result<()> {
|
||||||
for seed_file in seed_files {
|
for seed_file in seed_files {
|
||||||
|
@ -141,7 +155,100 @@ async fn find_seed_files(seed_dir: PathBuf) -> anyhow::Result<Vec<PathBuf>> {
|
||||||
Ok(seedfiles)
|
Ok(seedfiles)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Task that imports seeds into the store
|
/// Number of seeds accumulated before a batch is flushed to the store.
const BATCH_SIZE: usize = 256;

/// Counters describing the outcome of a seed-import run.
#[derive(Clone, Debug, Default)]
pub struct SeedImportStats {
    /// Domains seen for the first time during this import.
    pub new_domains: u32,
    /// Sitemap URLs enqueued (discovered via robots.txt of new domains).
    pub new_sitemaps: u32,
    /// Seed URLs that were newly enqueued.
    pub new_urls: u32,
    /// Seed URLs that were already present in the queue/store.
    pub already_present_urls: u32,
}
|
||||||
|
|
||||||
|
/// Task that imports seeds into the store
|
||||||
|
async fn importer(store: RakerStore, mut recv: Receiver<Seed>) -> anyhow::Result<SeedImportStats> {
|
||||||
|
let mut buf = Vec::with_capacity(BATCH_SIZE);
|
||||||
|
let mut stats = SeedImportStats::default();
|
||||||
|
let client = Client::new();
|
||||||
|
while let Some(seed) = recv.recv().await {
|
||||||
|
buf.push(seed);
|
||||||
|
|
||||||
|
if buf.len() == BATCH_SIZE {
|
||||||
|
import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
import_and_flush_batch(&store, &mut buf, &mut stats, &client).await?;
|
||||||
|
|
||||||
|
Ok(stats)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn import_and_flush_batch(
|
||||||
|
store: &RakerStore,
|
||||||
|
buf: &mut Vec<Seed>,
|
||||||
|
stats: &mut SeedImportStats,
|
||||||
|
client: &Client,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let txn = store.rw_txn()?;
|
||||||
|
for seed in buf.drain(..) {
|
||||||
|
let as_url = Url::parse(seed.url.as_str())
|
||||||
|
.with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
|
||||||
|
let domain = get_reduced_domain(&as_url)?;
|
||||||
|
|
||||||
|
let allowed_domain_record = txn.get_allowed_domain_record(domain.borrow())?;
|
||||||
|
|
||||||
|
let is_domain_new = allowed_domain_record.is_none();
|
||||||
|
if is_domain_new {
|
||||||
|
stats.new_domains += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut allowed_domain_record = DirtyTracker::new(
|
||||||
|
allowed_domain_record.unwrap_or_else(|| AllowedDomainRecord::default()),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Register the domain. This is a no-op if it's already active or backing off.
|
||||||
|
txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned())?;
|
||||||
|
|
||||||
|
let url_like = match &seed.url {
|
||||||
|
UrlOrUrlPattern::Url(url_str) => {
|
||||||
|
let url = Url::parse(url_str.as_str())?;
|
||||||
|
if txn.enqueue_url(url.as_str(), None, RakeIntent::Any)? {
|
||||||
|
stats.new_urls += 1;
|
||||||
|
} else {
|
||||||
|
stats.already_present_urls += 1;
|
||||||
|
}
|
||||||
|
url
|
||||||
|
}
|
||||||
|
UrlOrUrlPattern::UrlPrefix(prefix) => {
|
||||||
|
let prefix_as_url = Url::parse(prefix.as_str())?;
|
||||||
|
if txn.enqueue_url(prefix_as_url.as_str(), None, RakeIntent::Any)? {
|
||||||
|
stats.new_urls += 1;
|
||||||
|
} else {
|
||||||
|
stats.already_present_urls += 1;
|
||||||
|
}
|
||||||
|
if is_domain_new {
|
||||||
|
let allowed_domain_record: &mut AllowedDomainRecord =
|
||||||
|
allowed_domain_record.borrow_mut();
|
||||||
|
allowed_domain_record
|
||||||
|
.restricted_prefixes
|
||||||
|
.insert(prefix_as_url.path().to_string());
|
||||||
|
}
|
||||||
|
prefix_as_url
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if allowed_domain_record.is_dirty() {
|
||||||
|
txn.put_allowed_domain_record(domain.borrow(), allowed_domain_record.into_inner())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_domain_new {
|
||||||
|
// look at robots.txt and discover sitemaps!
|
||||||
|
if let Some(robots_txt) = get_robots_txt_for(&url_like, &client).await? {
|
||||||
|
for sitemap in robots_txt.sitemaps {
|
||||||
|
txn.enqueue_url(sitemap.url.as_str(), None, RakeIntent::SiteMap)?;
|
||||||
|
stats.new_sitemaps += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,8 @@ use crate::raking::{RakeIntent, TemporaryFailure};
|
||||||
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
|
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU32, MdbxU64};
|
||||||
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
|
use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION};
|
||||||
use crate::storage::records::{
|
use crate::storage::records::{
|
||||||
ActiveDomainRecord, BackingOffDomainRecord, QueueUrlRecord, UrlVisitedRecord,
|
ActiveDomainRecord, AllowedDomainRecord, BackingOffDomainRecord, QueueUrlRecord,
|
||||||
|
UrlVisitedRecord,
|
||||||
};
|
};
|
||||||
use anyhow::{bail, ensure, Context};
|
use anyhow::{bail, ensure, Context};
|
||||||
use libmdbx::{
|
use libmdbx::{
|
||||||
|
@ -38,6 +39,10 @@ pub struct Databases<'env> {
|
||||||
pub backing_off_domains: Database<'env>,
|
pub backing_off_domains: Database<'env>,
|
||||||
/// URL → VisitedDomainRecord
|
/// URL → VisitedDomainRecord
|
||||||
pub visited_urls: Database<'env>,
|
pub visited_urls: Database<'env>,
|
||||||
|
/// Domain → AllowedDomainRecord
|
||||||
|
pub allowed_domains: Database<'env>,
|
||||||
|
/// Domain \n URL → Number of refs (INT VALUE)
|
||||||
|
pub urls_on_hold: Database<'env>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'env> Databases<'env> {
|
impl<'env> Databases<'env> {
|
||||||
|
@ -58,7 +63,7 @@ impl<'env> Databases<'env> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Must match the order of the Databases struct fields.
|
// Must match the order of the Databases struct fields.
|
||||||
pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
|
pub const DATABASES: [(&'static str, DatabaseFlags); 8] = [
|
||||||
("urls_queue", DatabaseFlags::empty()),
|
("urls_queue", DatabaseFlags::empty()),
|
||||||
("active_domains", DatabaseFlags::empty()),
|
("active_domains", DatabaseFlags::empty()),
|
||||||
("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
|
("active_domain_raffle", DatabaseFlags::INTEGER_KEY),
|
||||||
|
@ -68,6 +73,8 @@ pub const DATABASES: [(&'static str, DatabaseFlags); 6] = [
|
||||||
),
|
),
|
||||||
("backing_off_domains", DatabaseFlags::empty()),
|
("backing_off_domains", DatabaseFlags::empty()),
|
||||||
("urls_visited", DatabaseFlags::empty()),
|
("urls_visited", DatabaseFlags::empty()),
|
||||||
|
("allowed_domains", DatabaseFlags::empty()),
|
||||||
|
("urls_on_hold", DatabaseFlags::empty()),
|
||||||
];
|
];
|
||||||
|
|
||||||
#[self_referencing]
|
#[self_referencing]
|
||||||
|
@ -153,6 +160,8 @@ impl RakerStore {
|
||||||
backing_off_reinstatements: dbs.next().unwrap(),
|
backing_off_reinstatements: dbs.next().unwrap(),
|
||||||
backing_off_domains: dbs.next().unwrap(),
|
backing_off_domains: dbs.next().unwrap(),
|
||||||
visited_urls: dbs.next().unwrap(),
|
visited_urls: dbs.next().unwrap(),
|
||||||
|
allowed_domains: dbs.next().unwrap(),
|
||||||
|
urls_on_hold: dbs.next().unwrap(),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -423,6 +432,22 @@ impl<'a> RakerTxn<'a, RW> {
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn put_allowed_domain_record(
|
||||||
|
&self,
|
||||||
|
domain: &str,
|
||||||
|
allowed_domain_record: AllowedDomainRecord,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let allowed_domains = &self.mdbx.borrow_dbs().allowed_domains;
|
||||||
|
|
||||||
|
self.mdbx_txn.put(
|
||||||
|
allowed_domains,
|
||||||
|
domain.as_bytes(),
|
||||||
|
MdbxBare(allowed_domain_record).as_bytes(),
|
||||||
|
WriteFlags::empty(),
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers metrics for the datastore. Call this once at startup.
|
/// Registers metrics for the datastore. Call this once at startup.
|
||||||
|
@ -523,6 +548,21 @@ impl<'a, K: TransactionKind> RakerTxn<'a, K> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_allowed_domain_record(
|
||||||
|
&self,
|
||||||
|
domain: &str,
|
||||||
|
) -> anyhow::Result<Option<AllowedDomainRecord>> {
|
||||||
|
let allowed_domains = &self.mdbx.borrow_dbs().allowed_domains;
|
||||||
|
|
||||||
|
match self
|
||||||
|
.mdbx_txn
|
||||||
|
.get::<MdbxBare<AllowedDomainRecord>>(allowed_domains, domain.as_bytes())?
|
||||||
|
{
|
||||||
|
None => Ok(None),
|
||||||
|
Some(MdbxBare(record)) => Ok(Some(record)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Emits metrics for the datastore. Call this occasionally.
|
/// Emits metrics for the datastore. Call this occasionally.
|
||||||
pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
|
pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> {
|
||||||
for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {
|
for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() {
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use crate::raking::{RakeIntent, TemporaryFailure};
|
use crate::raking::{RakeIntent, TemporaryFailure};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
pub struct ActiveDomainRecord {
|
pub struct ActiveDomainRecord {
|
||||||
|
@ -35,8 +36,9 @@ pub struct BackingOffDomainRecord {
|
||||||
pub reinstate_at: u64,
|
pub reinstate_at: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||||
pub struct DomainMaskRestriction {
|
pub struct AllowedDomainRecord {
|
||||||
/// TODO List of acceptable URL patterns...
|
/// Set of acceptable path prefixes.
|
||||||
pub patterns: Vec<String>,
|
/// Empty if ALL path prefixes are permitted.
|
||||||
|
pub restricted_prefixes: BTreeSet<String>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
use std::borrow::{Borrow, BorrowMut};
|
||||||
|
|
||||||
|
/// Wraps a value and records whether it may have been modified: handing out a
/// mutable borrow marks it dirty, so callers can skip write-backs for values
/// that were never touched.
pub struct DirtyTracker<T> {
    // The wrapped value.
    value: T,
    // True once a mutable reference has been handed out.
    modified: bool,
}

/// Immutable access never marks the tracker dirty.
impl<T> Borrow<T> for DirtyTracker<T> {
    fn borrow(&self) -> &T {
        &self.value
    }
}

/// Handing out a mutable reference conservatively marks the value dirty,
/// even if the caller never actually writes through it.
impl<T> BorrowMut<T> for DirtyTracker<T> {
    fn borrow_mut(&mut self) -> &mut T {
        self.modified = true;
        &mut self.value
    }
}

impl<T> DirtyTracker<T> {
    /// Creates a tracker around `inner`, initially clean.
    pub fn new(inner: T) -> DirtyTracker<T> {
        DirtyTracker {
            value: inner,
            modified: false,
        }
    }

    /// Returns whether the value may have been modified.
    pub fn is_dirty(&self) -> bool {
        self.modified
    }

    /// Resets the dirty flag without touching the value.
    pub fn make_clean(&mut self) {
        self.modified = false;
    }

    /// Consumes the tracker, yielding the wrapped value.
    pub fn into_inner(self) -> T {
        self.value
    }
}
|
|
@ -1,2 +1,3 @@
|
||||||
pub mod dates;
|
pub mod dates;
|
||||||
|
pub mod dirty;
|
||||||
pub mod lazy;
|
pub mod lazy;
|
||||||
|
|
Loading…
Reference in New Issue