diff --git a/quickpeep_indexer/src/bin/qp-indexer.rs b/quickpeep_indexer/src/bin/qp-indexer.rs index 0ce731d..43cd067 100644 --- a/quickpeep_indexer/src/bin/qp-indexer.rs +++ b/quickpeep_indexer/src/bin/qp-indexer.rs @@ -1,9 +1,12 @@ -use anyhow::Context; +use anyhow::{bail, Context}; use clap::Parser; use colour::{blue, yellow_ln}; use env_logger::Env; +use std::fs::File; +use std::io::{BufRead, BufReader}; use quickpeep_indexer::config::IndexerConfig; +use quickpeep_structs::rake_entries::{PackRecord, RakedPageEntry, SCHEMA_RAKED_PAGES}; use std::path::PathBuf; /// Seeds a raker's queue with URLs @@ -30,6 +33,23 @@ pub fn main() -> anyhow::Result<()> { for pack in opts.rakepacks { blue!("Indexing: "); yellow_ln!("{:?}", pack); + + let file = File::open(&pack)?; + let mut buf_reader = BufReader::new(file); + let schema: String = serde_bare::from_reader(&mut buf_reader)?; + if &schema != SCHEMA_RAKED_PAGES { + bail!( + "Wrong schema version: wanted {:?}, got {:?}", + SCHEMA_RAKED_PAGES, + &schema + ); + } + + // TODO(unstable): this condition is `.has_data_left()` but it's unstable. + while buf_reader.fill_buf().map(|b| !b.is_empty())? { + let _page_record: PackRecord = + serde_bare::from_reader(&mut buf_reader)?; + } } Ok(()) diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 6b5b18c..0a8686a 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -26,7 +26,9 @@ use quickpeep_raker::raking::page_extraction::PageExtractionService; use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission}; use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT}; use quickpeep_raker::storage::RakerStore; -use quickpeep_structs::rake_entries::AnalysisAntifeatures; +use quickpeep_structs::rake_entries::{ + AnalysisAntifeatures, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES, SCHEMA_RAKED_REJECTIONS, +}; /// The ordering is slightly important on these: more specific things should come first. /// This means they filter out the troublesome elements before the broader filters do. @@ -146,7 +148,7 @@ pub async fn main() -> anyhow::Result<()> { std::thread::Builder::new() .name("pages emitter".to_owned()) .spawn(move || -> anyhow::Result<()> { - pack_emitter(&emit_dir, "pages", pages_rx, &settings)?; + pack_emitter(&emit_dir, "pages", SCHEMA_RAKED_PAGES, pages_rx, &settings)?; Ok(()) })?, ); @@ -159,7 +161,13 @@ pub async fn main() -> anyhow::Result<()> { std::thread::Builder::new() .name("refs emitter".to_owned()) .spawn(move || -> anyhow::Result<()> { - pack_emitter(&emit_dir, "refs", refs_rx, &settings)?; + pack_emitter( + &emit_dir, + "refs", + SCHEMA_RAKED_REFERENCES, + refs_rx, + &settings, + )?; Ok(()) })?, ); @@ -172,7 +180,13 @@ pub async fn main() -> anyhow::Result<()> { std::thread::Builder::new() .name("rejections emitter".to_owned()) .spawn(move || -> anyhow::Result<()> { - pack_emitter(&emit_dir, "rejections", rejections_rx, &settings)?; + pack_emitter( + &emit_dir, + "rejections", + SCHEMA_RAKED_REJECTIONS, + rejections_rx, + &settings, + )?; Ok(()) })?, ); diff --git a/quickpeep_raker/src/rakepack_emitter.rs b/quickpeep_raker/src/rakepack_emitter.rs index 7642445..eec8015 100644 --- a/quickpeep_raker/src/rakepack_emitter.rs +++ b/quickpeep_raker/src/rakepack_emitter.rs @@ -1,8 +1,10 @@ use chrono::Utc; use log::warn; use metrics::{describe_counter, register_counter, Counter, Unit}; +use quickpeep_structs::rake_entries::PackRecord; use reqwest::Url; use serde::{Deserialize, Serialize}; +use std::borrow::Cow; use std::fs::OpenOptions; use std::io::Write; use std::path::Path; @@ -30,6 +32,7 @@ pub struct PackEmitterSettings { pub fn pack_emitter( directory: &Path, name: &str, + schema_name: &'static str, mut rx: Receiver<(Url, T)>, settings: &PackEmitterSettings, ) -> anyhow::Result<()> { @@ -60,7 +63,7 @@ pub fn pack_emitter( } }; - if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, settings)? { + if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, schema_name, settings)? { // File wasn't filled; the receiver was exhausted (we're shutting down). break; } @@ -68,17 +71,12 @@ pub fn pack_emitter( Ok(()) } -#[derive(Serialize)] -struct PackRecord<'a, T> { - url: &'a str, - record: T, -} - /// Returns: true if the file was filled (size cutoff reached), false if the receiver was exhausted. fn pack_emitter_to_file( file: &Path, rx: &mut Receiver<(Url, T)>, name: &str, + schema_name: &'static str, settings: &PackEmitterSettings, ) -> anyhow::Result { let file = OpenOptions::new().create_new(true).write(true).open(file)?; @@ -92,11 +90,13 @@ fn pack_emitter_to_file( let record_counter: Counter = register_counter!("emitted_pack_count", "pack" => name.to_owned()); + serde_bare::to_writer(&mut ser_buf, schema_name)?; + while let Some((url, record)) = rx.blocking_recv() { serde_bare::to_writer( &mut ser_buf, &PackRecord { - url: url.as_str(), + url: Cow::Borrowed(url.as_str()), record, }, )?; diff --git a/quickpeep_structs/src/rake_entries.rs b/quickpeep_structs/src/rake_entries.rs index 0e3d9a0..484c660 100644 --- a/quickpeep_structs/src/rake_entries.rs +++ b/quickpeep_structs/src/rake_entries.rs @@ -2,6 +2,7 @@ use bitflags::bitflags; use bitflags_serde_shim::impl_serde_for_bitflags; use quickpeep_densedoc::DenseDocument; use serde::{Deserialize, Serialize}; +use std::borrow::Cow; use std::collections::BTreeSet; bitflags! { @@ -25,6 +26,10 @@ bitflags! { impl_serde_for_bitflags!(AnalysisAntifeatures); +pub const SCHEMA_RAKED_PAGES: &str = "quickpeep_pages:0.1.0"; +pub const SCHEMA_RAKED_REFERENCES: &str = "quickpeep_references:0.1.0"; +pub const SCHEMA_RAKED_REJECTIONS: &str = "quickpeep_rejections:0.1.0"; + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RakedPageEntry { pub analysed_antifeatures: AnalysisAntifeatures, @@ -57,3 +62,9 @@ pub enum ReferenceKind { FeedEntry, SitemapEntry, } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PackRecord<'a, T> { + pub url: Cow<'a, str>, + pub record: T, +}