From 98c05f59b5595c129fada4cd870523d91b40899c Mon Sep 17 00:00:00 2001 From: Olivier Date: Mon, 28 Mar 2022 22:40:21 +0100 Subject: [PATCH] Support dumping seeds as files --- Cargo.lock | 2 + quickpeep/Cargo.toml | 2 + .../20220328172903_seed_dumping.sql | 7 + quickpeep/src/bin/qp-seedcoll-dump.rs | 163 ++++++++++++++++++ 4 files changed, 174 insertions(+) create mode 100644 quickpeep/migrations/20220328172903_seed_dumping.sql create mode 100644 quickpeep/src/bin/qp-seedcoll-dump.rs diff --git a/Cargo.lock b/Cargo.lock index 15b75c5..39d5233 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3672,11 +3672,13 @@ dependencies = [ "axum", "colour", "env_logger", + "futures-util", "itertools", "log", "quickpeep_index", "ron", "serde", + "smartstring", "sqlx", "tokio", "tower-http", diff --git a/quickpeep/Cargo.toml b/quickpeep/Cargo.toml index 9a1b121..e926ee6 100644 --- a/quickpeep/Cargo.toml +++ b/quickpeep/Cargo.toml @@ -18,5 +18,7 @@ env_logger = "0.9.0" sqlx = { version = "0.5.11", features = ["sqlite", "runtime-tokio-rustls"] } itertools = "0.10.3" colour = "0.6.0" +futures-util = "0.3.21" +smartstring = "1.0.1" quickpeep_index = { path = "../quickpeep_index" } diff --git a/quickpeep/migrations/20220328172903_seed_dumping.sql b/quickpeep/migrations/20220328172903_seed_dumping.sql new file mode 100644 index 0000000..85c906d --- /dev/null +++ b/quickpeep/migrations/20220328172903_seed_dumping.sql @@ -0,0 +1,7 @@ +CREATE TABLE seed_processing_positions ( + -- Name of the processing relevant to this position + name TEXT NOT NULL PRIMARY KEY, + + -- Position (last processed ID) + last_processed INTEGER NOT NULL +); diff --git a/quickpeep/src/bin/qp-seedcoll-dump.rs b/quickpeep/src/bin/qp-seedcoll-dump.rs new file mode 100644 index 0000000..21e5942 --- /dev/null +++ b/quickpeep/src/bin/qp-seedcoll-dump.rs @@ -0,0 +1,163 @@ +use anyhow::{bail, Context}; +use env_logger::Env; +use futures_util::stream::StreamExt; +use quickpeep::config::WebConfig; +use smartstring::alias::CompactString; +use sqlx::sqlite::SqlitePoolOptions; +use sqlx::{Connection, SqlitePool}; +use std::collections::{BTreeSet, HashMap}; +use std::path::{Path, PathBuf}; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncWriteExt, BufWriter}; + +pub const DECISION_INCLUDED: i64 = 0; + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + env_logger::Builder::from_env( + Env::default().default_filter_or("info,quickpeep=debug,sqlx=warn"), + ) + .init(); + + let seed_dump_path = PathBuf::from( + std::env::args() + .skip(1) + .next() + .context("Must specify output file as arg № 1! :)")?, + ); + + let config_path = + PathBuf::from(std::env::var("QP_WEB_CONFIG").unwrap_or_else(|_| "qp_web.ron".to_owned())); + + if !config_path.exists() { + bail!( + "Config path {:?} doesn't exist. QP_WEB_CONFIG env var overrides.", + config_path + ); + } + + let file_bytes = std::fs::read(&config_path).context("Failed to read web config file")?; + let web_config: WebConfig = + ron::de::from_bytes(&file_bytes).context("Failed to parse web config")?; + + let pool = SqlitePoolOptions::new() + .min_connections(1) + .after_connect(|conn| { + Box::pin(async move { + // Use the WAL because it just makes more sense :) + sqlx::query("PRAGMA journal_mode = WAL") + .execute(&mut *conn) + .await?; + + // Enable foreign keys because we like them! + sqlx::query("PRAGMA foreign_keys = ON") + .execute(&mut *conn) + .await?; + + Ok(()) + }) + }) + .connect( + &web_config + .sqlite_db_path + .to_str() + .context("SQLite DB path should be UTF-8")?, + ) + .await?; + + sqlx::migrate!().run(&pool).await?; + + seed_dump(&pool, &seed_dump_path).await?; + + Ok(()) +} + +#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] +struct PublishableSeed { + url: String, + comment: String, +} + +/// Dumps seeds from the database to a seed file. +/// Only dumps new seeds, then updates the 'last dumped' position. +pub async fn seed_dump(pool: &SqlitePool, path: &Path) -> anyhow::Result<()> { + let mut conn = pool.acquire().await?; + let mut txn = conn.begin().await?; + + // Get the last updated position + let position: Option = sqlx::query!( + " + SELECT last_processed FROM seed_processing_positions WHERE name = 'dumped' + " + ) + .map(|row| row.last_processed) + .fetch_optional(&mut *txn) + .await?; + + let file = OpenOptions::new().create_new(true).open(path).await?; + + let mut last_processed_position: Option = None; + + let process_from = position.unwrap_or(i64::MIN); + + let mut seeds_to_process_stream = sqlx::query!( + " + SELECT collected_seed_id, tag_diff, comment_published, url, tags + FROM sorted_seeds + JOIN collected_seeds USING (collected_seed_id) + WHERE collected_seed_id > ? AND decision = ? + ORDER BY collected_seed_id ASC + ", + DECISION_INCLUDED, + process_from + ) + .fetch(&mut *txn); + + // {{Tags} -> {URLs}} + let mut seed_sets_to_seeds: HashMap, BTreeSet> = + HashMap::new(); + + while let Some(row_result) = seeds_to_process_stream.next().await { + let row = row_result?; + + let mut tags: BTreeSet<&str> = row.tags.split(",").collect(); + let diff_tags: Vec<&str> = row.tag_diff.split(",").collect(); + for diff_tag in diff_tags { + if diff_tag.starts_with('+') { + tags.insert(&diff_tag[1..]); + } else if diff_tag.starts_with('-') { + tags.remove(&diff_tag[1..]); + } else { + bail!("!!! Unknown diff tag {:?}", diff_tag); + } + } + + let record = PublishableSeed { + url: row.url, + comment: row.comment_published, + }; + seed_sets_to_seeds + .entry(tags.into_iter().map(|s| s.into()).collect()) + .or_insert_with(BTreeSet::new) + .insert(record); + last_processed_position = Some(row.collected_seed_id); + } + drop(seeds_to_process_stream); + + let mut buf_writer = BufWriter::new(file); + + buf_writer.flush().await?; + + sqlx::query!( + " + REPLACE INTO seed_processing_positions (name, last_processed) + VALUES ('dumped', ?) + ", + last_processed_position + ) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + Ok(()) +}