From 54a468d079fee916fa99db619c31de5d9790b772 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 26 Nov 2022 20:48:06 +0000 Subject: [PATCH] Add facility to qp-indexer that lets it download rakepacks from a feed --- Cargo.lock | 2 + quickpeep_indexer/Cargo.toml | 5 + quickpeep_indexer/src/bin/qp-indexer.rs | 180 +++++++++++++++++++----- quickpeep_indexer/src/config.rs | 13 ++ 4 files changed, 163 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8c82af..945dafb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3766,11 +3766,13 @@ dependencies = [ "quickpeep_seed_parser", "quickpeep_structs", "quickpeep_utils", + "reqwest", "ron", "serde", "serde_bare", "serde_json", "smartstring", + "tempfile", "tokio", "url", "zstd", diff --git a/quickpeep_indexer/Cargo.toml b/quickpeep_indexer/Cargo.toml index c0f12b5..a26322c 100644 --- a/quickpeep_indexer/Cargo.toml +++ b/quickpeep_indexer/Cargo.toml @@ -30,6 +30,11 @@ patricia_tree = "0.3.1" # For decompression of emitted packs. 
0.11.1+zstd.1.5.2
 zstd = "0.11.1"
 
+# HTTP Requests
+reqwest = { version = "0.11.9", features = ["blocking"] }
+
+tempfile = "3.3.0"
+
 quickpeep_densedoc = { path = "../quickpeep_densedoc" }
 quickpeep_index = { path = "../quickpeep_index" }
 quickpeep_structs = { path = "../quickpeep_structs" }
diff --git a/quickpeep_indexer/src/bin/qp-indexer.rs b/quickpeep_indexer/src/bin/qp-indexer.rs
index 332ca3c..d431a30 100644
--- a/quickpeep_indexer/src/bin/qp-indexer.rs
+++ b/quickpeep_indexer/src/bin/qp-indexer.rs
@@ -2,9 +2,9 @@ use anyhow::{bail, Context};
 use clap::Parser;
 use colour::{blue, yellow_ln};
 use env_logger::Env;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::{BufRead, BufReader};
+use std::collections::{BTreeSet, HashMap};
+use std::fs::{File, OpenOptions};
+use std::io::{BufRead, BufReader, Write};
 
 use patricia_tree::PatriciaMap;
 use quickpeep_densedoc::DenseTree;
@@ -20,7 +20,8 @@ use quickpeep_structs::rake_entries::{
 };
 use quickpeep_utils::urls::get_reduced_domain;
 use smartstring::alias::CompactString;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
+use tempfile::NamedTempFile;
 use tokio::sync::mpsc::Receiver;
 use url::Url;
 
@@ -30,6 +31,11 @@ pub struct Opts {
     #[clap(long = "config")]
     config: Option<PathBuf>,
 
+    /// If specified, rakepacks from a feed will automatically be fetched and indexed.
+    /// The rakepacks are tracked as having been processed.
+    #[clap(long = "feed")]
+    feed: Option<Url>,
+
     rakepacks: Vec<PathBuf>,
 }
 
@@ -62,46 +68,146 @@ pub async fn main() -> anyhow::Result<()> {
 
     let mut indexer_backend = config.open_indexer_backend()?;
 
+    if let Some(feed) = opts.feed {
+        let processed_rakepack_path = config
+            .processed_rakepack_path()
+            .context("can't get a suitable location to track processed rakepacks")?;
+        handle_pack_feed(
+            feed,
+            &mut indexer_backend,
+            processed_rakepack_path,
+            &seed_lookup,
+            &icon_store,
+        )
+        .context("failed to handle pack feed")?;
+    }
+
     for pack in opts.rakepacks {
-        blue!("Indexing: ");
-        yellow_ln!("{:?}", pack);
-
-        let file = File::open(&pack)?;
-        let decompressor = zstd::stream::Decoder::new(file)?;
-        // TODO the decompressor has a buffer already, but we need this to see the end
-        let mut buf_reader = BufReader::new(decompressor);
-        let schema: String =
-            serde_bare::from_reader(&mut buf_reader).context("failed to read schema ver")?;
-
-        match schema.as_ref() {
-            SCHEMA_RAKED_PAGES => {
-                // TODO(unstable): this condition is `.has_data_left()` but it's unstable.
-                while buf_reader.fill_buf().map(|b| !b.is_empty())? {
-                    handle_page_pack(&mut buf_reader, &seed_lookup, &mut indexer_backend)
-                        .context("failed to handle page pack")?;
-                }
-            }
-            SCHEMA_RAKED_ICONS => {
-                // TODO(unstable): this condition is `.has_data_left()` but it's unstable.
-                while buf_reader.fill_buf().map(|b| !b.is_empty())? {
-                    handle_icon_pack(&mut buf_reader, &icon_store)
-                        .context("failed to handle icon pack")?;
-                }
-            }
-            _ => {
-                bail!(
-                    "Wrong schema version: wanted e.g.
{:?}, got {:?}",
-                    SCHEMA_RAKED_PAGES,
-                    &schema
-                );
-            }
-        }
+        handle_pack(&pack, &mut indexer_backend, &seed_lookup, &icon_store)
+            .with_context(|| format!("Whilst handling pack: {pack:?}"))?;
     }
 
     indexer_backend.flush()?;
 
     Ok(())
 }
 
+pub fn handle_pack_feed(
+    feed_url: Url,
+    indexer_backend: &mut Box<dyn Backend>,
+    processed_list_path: PathBuf,
+    seed_lookup: &SeedLookupTable,
+    icon_store: &IconStore,
+) -> anyhow::Result<()> {
+    blue!("Scanning feed: ");
+    yellow_ln!("{:?}", feed_url);
+
+    let new_packs =
+        find_new_packs(feed_url.clone(), &processed_list_path).context("finding new packs")?;
+    let mut processed_log = OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(&processed_list_path)
+        .context("can't open processed list for append")?;
+
+    for pack_name in new_packs {
+        let pack_url = feed_url
+            .join(&pack_name)
+            .context("Can't resolve URL of new pack")?;
+
+        blue!("Downloading: ");
+        yellow_ln!("{:?}", pack_url);
+        let mut temp_file = NamedTempFile::new().context("opening temp file")?;
+
+        reqwest::blocking::get(pack_url.clone())
+            .context("failed to request pack")?
+            .error_for_status()?
+            .copy_to(temp_file.as_file_mut())
+            .context("failed to download pack to temp file")?;
+
+        handle_pack(temp_file.path(), indexer_backend, seed_lookup, icon_store).with_context(
+            || {
+                format!(
+                    "Whilst handling pack: {:?} ({:?})",
+                    temp_file.path(),
+                    pack_url
+                )
+            },
+        )?;
+
+        processed_log.write(format!("\n{}", &pack_name).as_bytes())?;
+        processed_log.flush()?;
+    }
+    Ok(())
+}
+
+fn find_new_packs(feed_url: Url, processed_list_path: &Path) -> anyhow::Result<BTreeSet<String>> {
+    let processed_file = OpenOptions::new()
+        .read(true)
+        .create(true)
+        .open(processed_list_path)?;
+    let br = BufReader::new(processed_file);
+    let processed: Result<BTreeSet<String>, _> = br.lines().collect();
+    let processed = processed.context("failed to read local processed list")?;
+
+    let mut unprocessed: BTreeSet<String> = BTreeSet::new();
+
+    let feed_lines = BufReader::new(reqwest::blocking::get(feed_url)?.error_for_status()?).lines();
+    for line in feed_lines {
+        let line = line?;
+        if line.is_empty() {
+            continue;
+        }
+        if processed.contains(&line) {
+            continue;
+        }
+        unprocessed.insert(line.to_owned());
+    }
+
+    Ok(unprocessed)
+}
+
+pub fn handle_pack(
+    pack: &Path,
+    indexer_backend: &mut Box<dyn Backend>,
+    seed_lookup: &SeedLookupTable,
+    icon_store: &IconStore,
+) -> anyhow::Result<()> {
+    blue!("Indexing: ");
+    yellow_ln!("{:?}", pack);
+
+    let file = File::open(&pack)?;
+    let decompressor = zstd::stream::Decoder::new(file)?;
+    // TODO the decompressor has a buffer already, but we need this to see the end
+    let mut buf_reader = BufReader::new(decompressor);
+    let schema: String =
+        serde_bare::from_reader(&mut buf_reader).context("failed to read schema ver")?;
+
+    match schema.as_ref() {
+        SCHEMA_RAKED_PAGES => {
+            // TODO(unstable): this condition is `.has_data_left()` but it's unstable.
+            while buf_reader.fill_buf().map(|b| !b.is_empty())?
{
+                handle_page_pack(&mut buf_reader, &seed_lookup, indexer_backend)
+                    .context("failed to handle page pack")?;
+            }
+        }
+        SCHEMA_RAKED_ICONS => {
+            // TODO(unstable): this condition is `.has_data_left()` but it's unstable.
+            while buf_reader.fill_buf().map(|b| !b.is_empty())? {
+                handle_icon_pack(&mut buf_reader, &icon_store)
+                    .context("failed to handle icon pack")?;
+            }
+        }
+        _ => {
+            bail!(
+                "Wrong schema version: wanted e.g. {:?}, got {:?}",
+                SCHEMA_RAKED_PAGES,
+                &schema
+            );
+        }
+    }
+    Ok(())
+}
+
 pub fn handle_page_pack(
     buf_reader: &mut impl BufRead,
     seed_lookup: &SeedLookupTable,
diff --git a/quickpeep_indexer/src/config.rs b/quickpeep_indexer/src/config.rs
index 710f622..2157924 100644
--- a/quickpeep_indexer/src/config.rs
+++ b/quickpeep_indexer/src/config.rs
@@ -55,4 +55,17 @@ impl IndexerConfig {
             }
         }
     }
+
+    /// Returns the path to a text file which can be used for storing a list of processed rakepacks
+    /// (needed for following rakepack streams over a network).
+    pub fn processed_rakepack_path(&self) -> anyhow::Result<PathBuf> {
+        match &self.index.backend {
+            BackendConfig::Tantivy(tantivy) => {
+                Ok(tantivy.index_dir.join("processed_rakepacks.lst"))
+            }
+            BackendConfig::Meili(_) => {
+                todo!()
+            }
+        }
+    }
 }