Add facility to qp-indexer that lets it download rakepacks from a feed
ci/woodpecker/push/check Pipeline was successful Details
ci/woodpecker/push/manual Pipeline failed Details
ci/woodpecker/push/release Pipeline was successful Details

rei/rakerstore_postgres_overhaul
Olivier 'reivilibre' 2022-11-26 20:48:06 +00:00
parent bd16f58d9e
commit 54a468d079
4 changed files with 163 additions and 37 deletions

2
Cargo.lock generated
View File

@ -3766,11 +3766,13 @@ dependencies = [
"quickpeep_seed_parser",
"quickpeep_structs",
"quickpeep_utils",
"reqwest",
"ron",
"serde",
"serde_bare",
"serde_json",
"smartstring",
"tempfile",
"tokio",
"url",
"zstd",

View File

@ -30,6 +30,11 @@ patricia_tree = "0.3.1"
# For decompression of emitted packs. 0.11.1+zstd.1.5.2
zstd = "0.11.1"
# HTTP Requests
reqwest = { version = "0.11.9", features = ["blocking"] }
tempfile = "3.3.0"
quickpeep_densedoc = { path = "../quickpeep_densedoc" }
quickpeep_index = { path = "../quickpeep_index" }
quickpeep_structs = { path = "../quickpeep_structs" }

View File

@ -2,9 +2,9 @@ use anyhow::{bail, Context};
use clap::Parser;
use colour::{blue, yellow_ln};
use env_logger::Env;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::collections::{BTreeSet, HashMap};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use patricia_tree::PatriciaMap;
use quickpeep_densedoc::DenseTree;
@ -20,7 +20,8 @@ use quickpeep_structs::rake_entries::{
};
use quickpeep_utils::urls::get_reduced_domain;
use smartstring::alias::CompactString;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tempfile::NamedTempFile;
use tokio::sync::mpsc::Receiver;
use url::Url;
@ -30,6 +31,11 @@ pub struct Opts {
#[clap(long = "config")]
config: Option<PathBuf>,
/// If specified, rakepacks from a feed will automatically be fetched and indexed.
/// The rakepacks are tracked as having been processed.
#[clap(long = "feed")]
feed: Option<Url>,
rakepacks: Vec<PathBuf>,
}
@ -62,46 +68,146 @@ pub async fn main() -> anyhow::Result<()> {
let mut indexer_backend = config.open_indexer_backend()?;
if let Some(feed) = opts.feed {
let processed_rakepack_path = config
.processed_rakepack_path()
.context("can't get a suitable location to track processed rakepacks")?;
handle_pack_feed(
feed,
&mut indexer_backend,
processed_rakepack_path,
&seed_lookup,
&icon_store,
)
.context("failed to handle pack feed")?;
}
for pack in opts.rakepacks {
blue!("Indexing: ");
yellow_ln!("{:?}", pack);
let file = File::open(&pack)?;
let decompressor = zstd::stream::Decoder::new(file)?;
// TODO the decompressor has a buffer already, but we need this to see the end
let mut buf_reader = BufReader::new(decompressor);
let schema: String =
serde_bare::from_reader(&mut buf_reader).context("failed to read schema ver")?;
match schema.as_ref() {
SCHEMA_RAKED_PAGES => {
// TODO(unstable): this condition is `.has_data_left()` but it's unstable.
while buf_reader.fill_buf().map(|b| !b.is_empty())? {
handle_page_pack(&mut buf_reader, &seed_lookup, &mut indexer_backend)
.context("failed to handle page pack")?;
}
}
SCHEMA_RAKED_ICONS => {
// TODO(unstable): this condition is `.has_data_left()` but it's unstable.
while buf_reader.fill_buf().map(|b| !b.is_empty())? {
handle_icon_pack(&mut buf_reader, &icon_store)
.context("failed to handle icon pack")?;
}
}
_ => {
bail!(
"Wrong schema version: wanted e.g. {:?}, got {:?}",
SCHEMA_RAKED_PAGES,
&schema
);
}
}
handle_pack(&pack, &mut indexer_backend, &seed_lookup, &icon_store)
.with_context(|| format!("Whilst handling pack: {pack:?}"))?;
}
indexer_backend.flush()?;
Ok(())
}
/// Downloads and indexes every rakepack in the feed that has not already been
/// processed, appending each successfully-indexed pack name to the processed
/// list so it is skipped on subsequent runs.
///
/// `processed_list_path` is an append-only text file, one pack name per line.
pub fn handle_pack_feed(
    feed_url: Url,
    indexer_backend: &mut Box<dyn Backend>,
    processed_list_path: PathBuf,
    seed_lookup: &SeedLookupTable,
    icon_store: &IconStore,
) -> anyhow::Result<()> {
    blue!("Scanning feed: ");
    yellow_ln!("{:?}", feed_url);

    let new_packs =
        find_new_packs(feed_url.clone(), &processed_list_path).context("finding new packs")?;

    // Open for append so names recorded by previous runs are preserved.
    let mut processed_log = OpenOptions::new()
        .append(true)
        .create(true)
        .open(&processed_list_path)
        .context("can't open processed list for append")?;

    for pack_name in new_packs {
        // Pack names in the feed are relative to the feed URL.
        let pack_url = feed_url
            .join(&pack_name)
            .context("Can't resolve URL of new pack")?;

        blue!("Downloading: ");
        yellow_ln!("{:?}", pack_url);

        // Download into a temp file first so we never index a half-written pack.
        // NOTE(review): blocking reqwest is used here although the caller (`main`)
        // is async — consider `spawn_blocking` or the async client; TODO confirm
        // this doesn't stall the runtime.
        let mut temp_file = NamedTempFile::new().context("opening temp file")?;
        reqwest::blocking::get(pack_url.clone())
            .context("failed to request pack")?
            .error_for_status()?
            .copy_to(temp_file.as_file_mut())
            .context("failed to download pack to temp file")?;

        handle_pack(temp_file.path(), indexer_backend, seed_lookup, icon_store).with_context(
            || {
                format!(
                    "Whilst handling pack: {:?} ({:?})",
                    temp_file.path(),
                    pack_url
                )
            },
        )?;

        // `write_all` rather than `write`: a short write would corrupt the
        // processed list and cause packs to be re-indexed or skipped later.
        processed_log.write_all(format!("\n{}", &pack_name).as_bytes())?;
        processed_log.flush()?;
    }
    Ok(())
}
fn find_new_packs(feed_url: Url, processed_list_path: &Path) -> anyhow::Result<BTreeSet<String>> {
let processed_file = OpenOptions::new()
.read(true)
.create(true)
.open(processed_list_path)?;
let br = BufReader::new(processed_file);
let processed: Result<BTreeSet<String>, _> = br.lines().collect();
let processed = processed.context("failed to read local processed list")?;
let mut unprocessed: BTreeSet<String> = BTreeSet::new();
let feed_lines = BufReader::new(reqwest::blocking::get(feed_url)?.error_for_status()?).lines();
for line in feed_lines {
let line = line?;
if line.is_empty() {
continue;
}
if processed.contains(&line) {
continue;
}
unprocessed.insert(line.to_owned());
}
Ok(unprocessed)
}
pub fn handle_pack(
pack: &Path,
indexer_backend: &mut Box<dyn Backend>,
seed_lookup: &SeedLookupTable,
icon_store: &IconStore,
) -> anyhow::Result<()> {
blue!("Indexing: ");
yellow_ln!("{:?}", pack);
let file = File::open(&pack)?;
let decompressor = zstd::stream::Decoder::new(file)?;
// TODO the decompressor has a buffer already, but we need this to see the end
let mut buf_reader = BufReader::new(decompressor);
let schema: String =
serde_bare::from_reader(&mut buf_reader).context("failed to read schema ver")?;
match schema.as_ref() {
SCHEMA_RAKED_PAGES => {
// TODO(unstable): this condition is `.has_data_left()` but it's unstable.
while buf_reader.fill_buf().map(|b| !b.is_empty())? {
handle_page_pack(&mut buf_reader, &seed_lookup, indexer_backend)
.context("failed to handle page pack")?;
}
}
SCHEMA_RAKED_ICONS => {
// TODO(unstable): this condition is `.has_data_left()` but it's unstable.
while buf_reader.fill_buf().map(|b| !b.is_empty())? {
handle_icon_pack(&mut buf_reader, &icon_store)
.context("failed to handle icon pack")?;
}
}
_ => {
bail!(
"Wrong schema version: wanted e.g. {:?}, got {:?}",
SCHEMA_RAKED_PAGES,
&schema
);
}
}
Ok(())
}
pub fn handle_page_pack(
buf_reader: &mut impl BufRead,
seed_lookup: &SeedLookupTable,

View File

@ -55,4 +55,17 @@ impl IndexerConfig {
}
}
}
/// Returns the path to a text file which can be used for storing a list of processed rakepacks
/// (needed for following rakepack streams over a network).
pub fn processed_rakepack_path(&self) -> anyhow::Result<PathBuf> {
    match &self.index.backend {
        // Keep the list alongside the index so it travels with it.
        BackendConfig::Tantivy(tantivy) => {
            let path = tantivy.index_dir.join("processed_rakepacks.lst");
            Ok(path)
        }
        // No obvious on-disk home for this with the Meili backend yet.
        BackendConfig::Meili(_) => todo!(),
    }
}
}