Support dumping seeds as files

rei/raker_storage
Olivier 'reivilibre' 2022-03-28 22:40:21 +01:00
parent 507459b4ee
commit 98c05f59b5
4 changed files with 174 additions and 0 deletions

2
Cargo.lock generated
View File

@ -3672,11 +3672,13 @@ dependencies = [
"axum",
"colour",
"env_logger",
"futures-util",
"itertools",
"log",
"quickpeep_index",
"ron",
"serde",
"smartstring",
"sqlx",
"tokio",
"tower-http",

View File

@ -18,5 +18,7 @@ env_logger = "0.9.0"
sqlx = { version = "0.5.11", features = ["sqlite", "runtime-tokio-rustls"] }
itertools = "0.10.3"
colour = "0.6.0"
futures-util = "0.3.21"
smartstring = "1.0.1"
quickpeep_index = { path = "../quickpeep_index" }

View File

@ -0,0 +1,7 @@
CREATE TABLE seed_processing_positions (
-- Name of the processing relevant to this position
name TEXT NOT NULL PRIMARY KEY,
-- Position (last processed ID)
last_processed INTEGER NOT NULL
);

View File

@ -0,0 +1,163 @@
use anyhow::{bail, Context};
use env_logger::Env;
use futures_util::stream::StreamExt;
use quickpeep::config::WebConfig;
use smartstring::alias::CompactString;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::{Connection, SqlitePool};
use std::collections::{BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncWriteExt, BufWriter};
pub const DECISION_INCLUDED: i64 = 0;
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(
Env::default().default_filter_or("info,quickpeep=debug,sqlx=warn"),
)
.init();
let seed_dump_path = PathBuf::from(
std::env::args()
.skip(1)
.next()
.context("Must specify output file as arg № 1! :)")?,
);
let config_path =
PathBuf::from(std::env::var("QP_WEB_CONFIG").unwrap_or_else(|_| "qp_web.ron".to_owned()));
if !config_path.exists() {
bail!(
"Config path {:?} doesn't exist. QP_WEB_CONFIG env var overrides.",
config_path
);
}
let file_bytes = std::fs::read(&config_path).context("Failed to read web config file")?;
let web_config: WebConfig =
ron::de::from_bytes(&file_bytes).context("Failed to parse web config")?;
let pool = SqlitePoolOptions::new()
.min_connections(1)
.after_connect(|conn| {
Box::pin(async move {
// Use the WAL because it just makes more sense :)
sqlx::query("PRAGMA journal_mode = WAL")
.execute(&mut *conn)
.await?;
// Enable foreign keys because we like them!
sqlx::query("PRAGMA foreign_keys = ON")
.execute(&mut *conn)
.await?;
Ok(())
})
})
.connect(
&web_config
.sqlite_db_path
.to_str()
.context("SQLite DB path should be UTF-8")?,
)
.await?;
sqlx::migrate!().run(&pool).await?;
seed_dump(&pool, &seed_dump_path).await?;
Ok(())
}
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
struct PublishableSeed {
url: String,
comment: String,
}
/// Dumps seeds from the database to a seed file.
/// Only dumps new seeds, then updates the 'last dumped' position.
pub async fn seed_dump(pool: &SqlitePool, path: &Path) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let mut txn = conn.begin().await?;
// Get the last updated position
let position: Option<i64> = sqlx::query!(
"
SELECT last_processed FROM seed_processing_positions WHERE name = 'dumped'
"
)
.map(|row| row.last_processed)
.fetch_optional(&mut *txn)
.await?;
let file = OpenOptions::new().create_new(true).open(path).await?;
let mut last_processed_position: Option<i64> = None;
let process_from = position.unwrap_or(i64::MIN);
let mut seeds_to_process_stream = sqlx::query!(
"
SELECT collected_seed_id, tag_diff, comment_published, url, tags
FROM sorted_seeds
JOIN collected_seeds USING (collected_seed_id)
WHERE collected_seed_id > ? AND decision = ?
ORDER BY collected_seed_id ASC
",
DECISION_INCLUDED,
process_from
)
.fetch(&mut *txn);
// {{Tags} -> {URLs}}
let mut seed_sets_to_seeds: HashMap<BTreeSet<CompactString>, BTreeSet<PublishableSeed>> =
HashMap::new();
while let Some(row_result) = seeds_to_process_stream.next().await {
let row = row_result?;
let mut tags: BTreeSet<&str> = row.tags.split(",").collect();
let diff_tags: Vec<&str> = row.tag_diff.split(",").collect();
for diff_tag in diff_tags {
if diff_tag.starts_with('+') {
tags.insert(&diff_tag[1..]);
} else if diff_tag.starts_with('-') {
tags.remove(&diff_tag[1..]);
} else {
bail!("!!! Unknown diff tag {:?}", diff_tag);
}
}
let record = PublishableSeed {
url: row.url,
comment: row.comment_published,
};
seed_sets_to_seeds
.entry(tags.into_iter().map(|s| s.into()).collect())
.or_insert_with(BTreeSet::new)
.insert(record);
last_processed_position = Some(row.collected_seed_id);
}
drop(seeds_to_process_stream);
let mut buf_writer = BufWriter::new(file);
buf_writer.flush().await?;
sqlx::query!(
"
REPLACE INTO seed_processing_positions (name, last_processed)
VALUES ('dumped', ?)
",
last_processed_position
)
.execute(&mut *txn)
.await?;
txn.commit().await?;
Ok(())
}