Read rake pack records in the indexer

This commit is contained in:
Olivier 'reivilibre' 2022-03-24 23:37:11 +00:00
parent 4f4c3e36d1
commit 5418afe8dd
4 changed files with 58 additions and 13 deletions

View File

@ -1,9 +1,12 @@
use anyhow::Context;
use anyhow::{bail, Context};
use clap::Parser;
use colour::{blue, yellow_ln};
use env_logger::Env;
use std::fs::File;
use std::io::{BufRead, BufReader};
use quickpeep_indexer::config::IndexerConfig;
use quickpeep_structs::rake_entries::{PackRecord, RakedPageEntry, SCHEMA_RAKED_PAGES};
use std::path::PathBuf;
/// Seeds a raker's queue with URLs
@ -30,6 +33,23 @@ pub fn main() -> anyhow::Result<()> {
for pack in opts.rakepacks {
blue!("Indexing: ");
yellow_ln!("{:?}", pack);
let file = File::open(&pack)?;
let mut buf_reader = BufReader::new(file);
let schema: String = serde_bare::from_reader(&mut buf_reader)?;
if &schema != SCHEMA_RAKED_PAGES {
bail!(
"Wrong schema version: wanted {:?}, got {:?}",
SCHEMA_RAKED_PAGES,
&schema
);
}
// TODO(unstable): this condition is `.has_data_left()` but it's unstable.
while buf_reader.fill_buf().map(|b| !b.is_empty())? {
let _page_record: PackRecord<RakedPageEntry> =
serde_bare::from_reader(&mut buf_reader)?;
}
}
Ok(())

View File

@ -26,7 +26,9 @@ use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
use quickpeep_raker::raking::{Raker, RAKER_USER_AGENT, TIME_LIMIT};
use quickpeep_raker::storage::RakerStore;
use quickpeep_structs::rake_entries::AnalysisAntifeatures;
use quickpeep_structs::rake_entries::{
AnalysisAntifeatures, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES, SCHEMA_RAKED_REJECTIONS,
};
/// The ordering is slightly important on these: more specific things should come first.
/// This means they filter out the troublesome elements before the broader filters do.
@ -146,7 +148,7 @@ pub async fn main() -> anyhow::Result<()> {
std::thread::Builder::new()
.name("pages emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "pages", pages_rx, &settings)?;
pack_emitter(&emit_dir, "pages", SCHEMA_RAKED_PAGES, pages_rx, &settings)?;
Ok(())
})?,
);
@ -159,7 +161,13 @@ pub async fn main() -> anyhow::Result<()> {
std::thread::Builder::new()
.name("refs emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "refs", refs_rx, &settings)?;
pack_emitter(
&emit_dir,
"refs",
SCHEMA_RAKED_REFERENCES,
refs_rx,
&settings,
)?;
Ok(())
})?,
);
@ -172,7 +180,13 @@ pub async fn main() -> anyhow::Result<()> {
std::thread::Builder::new()
.name("rejections emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "rejections", rejections_rx, &settings)?;
pack_emitter(
&emit_dir,
"rejections",
SCHEMA_RAKED_REJECTIONS,
rejections_rx,
&settings,
)?;
Ok(())
})?,
);

View File

@ -1,8 +1,10 @@
use chrono::Utc;
use log::warn;
use metrics::{describe_counter, register_counter, Counter, Unit};
use quickpeep_structs::rake_entries::PackRecord;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
@ -30,6 +32,7 @@ pub struct PackEmitterSettings {
pub fn pack_emitter<T: Serialize + Send + 'static>(
directory: &Path,
name: &str,
schema_name: &'static str,
mut rx: Receiver<(Url, T)>,
settings: &PackEmitterSettings,
) -> anyhow::Result<()> {
@ -60,7 +63,7 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
}
};
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, settings)? {
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, schema_name, settings)? {
// File wasn't filled; the receiver was exhausted (we're shutting down).
break;
}
@ -68,17 +71,12 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
Ok(())
}
#[derive(Serialize)]
struct PackRecord<'a, T> {
url: &'a str,
record: T,
}
/// Returns: true if the file was filled (size cutoff reached), false if the receiver was exhausted.
fn pack_emitter_to_file<T: Serialize>(
file: &Path,
rx: &mut Receiver<(Url, T)>,
name: &str,
schema_name: &'static str,
settings: &PackEmitterSettings,
) -> anyhow::Result<bool> {
let file = OpenOptions::new().create_new(true).write(true).open(file)?;
@ -92,11 +90,13 @@ fn pack_emitter_to_file<T: Serialize>(
let record_counter: Counter =
register_counter!("emitted_pack_count", "pack" => name.to_owned());
serde_bare::to_writer(&mut ser_buf, schema_name)?;
while let Some((url, record)) = rx.blocking_recv() {
serde_bare::to_writer(
&mut ser_buf,
&PackRecord {
url: url.as_str(),
url: Cow::Borrowed(url.as_str()),
record,
},
)?;

View File

@ -2,6 +2,7 @@ use bitflags::bitflags;
use bitflags_serde_shim::impl_serde_for_bitflags;
use quickpeep_densedoc::DenseDocument;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::BTreeSet;
bitflags! {
@ -25,6 +26,10 @@ bitflags! {
impl_serde_for_bitflags!(AnalysisAntifeatures);
pub const SCHEMA_RAKED_PAGES: &str = "quickpeep_pages:0.1.0";
pub const SCHEMA_RAKED_REFERENCES: &str = "quickpeep_references:0.1.0";
pub const SCHEMA_RAKED_REJECTIONS: &str = "quickpeep_rejections:0.1.0";
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RakedPageEntry {
pub analysed_antifeatures: AnalysisAntifeatures,
@ -57,3 +62,9 @@ pub enum ReferenceKind {
FeedEntry,
SitemapEntry,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PackRecord<'a, T> {
pub url: Cow<'a, str>,
pub record: T,
}