diff --git a/Cargo.lock b/Cargo.lock index 8f07f2a..90996fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,6 +389,9 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -1321,6 +1324,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.56" @@ -3045,6 +3057,7 @@ dependencies = [ "smartstring", "tokio", "toml", + "zstd", ] [[package]] @@ -4453,3 +4466,32 @@ dependencies = [ "thiserror", "time", ] + +[[package]] +name = "zstd" +version = "0.11.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml index b8b0b0c..69a6370 100644 --- a/quickpeep_raker/Cargo.toml +++ b/quickpeep_raker/Cargo.toml @@ -32,6 +32,8 @@ chrono = "0.4.19" libmdbx = "0.1.1" # Used for FFI. Must match the version in libmdbx. 
mdbx-sys = "0.11.4-git.20210105" +# For compression of emitted packs. 0.11.1+zstd.1.5.2 +zstd = "0.11.1" ### Utils lazy_static = "1.4.0" diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index af5bd20..96259fa 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -18,6 +18,7 @@ use tokio::sync::{mpsc, oneshot, Semaphore}; use tokio::time::MissedTickBehavior; use quickpeep_raker::config; +use quickpeep_raker::rakepack_emitter::pack_emitter; use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet}; use quickpeep_raker::raking::page_extraction::PageExtractionService; use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission}; @@ -124,6 +125,47 @@ pub async fn main() -> anyhow::Result<()> { let (refs_tx, refs_rx) = mpsc::channel(32); let (rejections_tx, rejections_rx) = mpsc::channel(32); + let mut emitters = Vec::with_capacity(3); + + { + let emit_dir = config.emit_dir.clone(); + let settings = config.pack_emitter.clone(); + emitters.push( + std::thread::Builder::new() + .name("pages emitter".to_owned()) + .spawn(move || -> anyhow::Result<()> { + pack_emitter(&emit_dir, "pages", pages_rx, &settings)?; + Ok(()) + })?, + ); + } + + { + let emit_dir = config.emit_dir.clone(); + let settings = config.pack_emitter.clone(); + emitters.push( + std::thread::Builder::new() + .name("refs emitter".to_owned()) + .spawn(move || -> anyhow::Result<()> { + pack_emitter(&emit_dir, "refs", refs_rx, &settings)?; + Ok(()) + })?, + ); + } + + { + let emit_dir = config.emit_dir.clone(); + let settings = config.pack_emitter.clone(); + emitters.push( + std::thread::Builder::new() + .name("rejections emitter".to_owned()) + .spawn(move || -> anyhow::Result<()> { + pack_emitter(&emit_dir, "rejections", rejections_rx, &settings)?; + Ok(()) + })?, + ); + } + let submission = TaskResultSubmission { pages: pages_tx, references: refs_tx, diff --git a/quickpeep_raker/src/config.rs 
b/quickpeep_raker/src/config.rs index a00a11d..cac0435 100644 --- a/quickpeep_raker/src/config.rs +++ b/quickpeep_raker/src/config.rs @@ -1,3 +1,4 @@ +use crate::rakepack_emitter::PackEmitterSettings; use anyhow::Context; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; @@ -20,6 +21,8 @@ pub struct RakerConfig { pub emit_dir: PathBuf, pub metrics: MetricsConfig, + + pub pack_emitter: PackEmitterSettings, } impl RakerConfig { diff --git a/quickpeep_raker/src/lib.rs b/quickpeep_raker/src/lib.rs index 2c250df..297954e 100644 --- a/quickpeep_raker/src/lib.rs +++ b/quickpeep_raker/src/lib.rs @@ -4,5 +4,7 @@ pub mod config; pub mod storage; +pub mod rakepack_emitter; + #[cfg(test)] mod test;
diff --git a/quickpeep_raker/src/rakepack_emitter.rs b/quickpeep_raker/src/rakepack_emitter.rs
new file mode 100644
index 0000000..80a88c1
--- /dev/null
+++ b/quickpeep_raker/src/rakepack_emitter.rs
@@ -0,0 +1,123 @@
+use chrono::Utc;
+use log::warn;
+use reqwest::Url;
+use serde::{Deserialize, Serialize};
+use std::fs::OpenOptions;
+use std::io::Write;
+use std::path::Path;
+use std::time::Duration;
+use tokio::sync::mpsc::Receiver;
+
+/// Suggested cutoff, in serialised (pre-compression) bytes, after which a new pack file is
+/// started. 256 MiB; used when `PackEmitterSettings::size_cutoff` is unset.
+pub const SUGGESTED_SIZE_CUTOFF: usize = 256 * 1024 * 1024;
+
+/// The Zstd compression level used when `PackEmitterSettings::zstd_level` is unset.
+/// 16 is quite high, but we really want the compact file sizes;
+/// willing to pay quite a lot in compression speed.
+/// If this turns out to be too slow, should probably go down to 10 or 7.
+pub const SUGGESTED_ZSTD_LEVEL: i32 = 16;
+
+/// Tunables for [`pack_emitter`]. Unset fields fall back to the `SUGGESTED_*` constants.
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct PackEmitterSettings {
+    /// Serialised (pre-compression) byte count after which a new pack file is started.
+    #[serde(default)]
+    pub size_cutoff: Option<usize>,
+    /// Zstd compression level for emitted packs.
+    #[serde(default)]
+    pub zstd_level: Option<i32>,
+}
+
+/// An emitter for some kind of pack.
+/// Usual types of T: RakedPageEntry, RakedReferrerEntry, PermanentFailure
+///
+/// Drains `rx` into Zstd-compressed files named `<timestamp>.<name>.pack` inside `directory`,
+/// starting a new file whenever the current one reaches the size cutoff. Returns once `rx` is
+/// closed and fully drained.
+///
+/// This blocks the calling thread (`Receiver::blocking_recv`), so run it on a dedicated thread,
+/// not inside an async task.
+pub fn pack_emitter<T: Serialize>(
+    directory: &Path,
+    name: &str,
+    mut rx: Receiver<(Url, T)>,
+    settings: &PackEmitterSettings,
+) -> anyhow::Result<()> {
+    loop {
+        let new_pack_file_path = loop {
+            // Take a fresh timestamp on every attempt. If it were taken once outside this
+            // loop, a name collision would make us sleep and then retry the same path forever.
+            // The name looks like: 2022-01-01_01:01:01.<name>.pack
+            let now = Utc::now();
+            let new_pack_file_path =
+                directory.join(format!("{}.{}.pack", now.format("%F_%T"), name));
+            if new_pack_file_path.exists() {
+                warn!(
+                    "{:?} already exists; sleeping to generate new timestamp.",
+                    new_pack_file_path
+                );
+                std::thread::sleep(Duration::from_secs(2));
+            } else {
+                break new_pack_file_path;
+            }
+        };
+
+        if !pack_emitter_to_file(&new_pack_file_path, &mut rx, settings)? {
+            // File wasn't filled; the receiver was exhausted (we're shutting down).
+            break;
+        }
+    }
+    Ok(())
+}
+
+/// One entry of a pack file: the URL (borrowed as a string) and the record emitted for it.
+#[derive(Serialize)]
+struct PackRecord<'a, T> {
+    url: &'a str,
+    record: T,
+}
+
+/// Writes records from `rx` into a new Zstd-compressed pack file at `file`.
+/// Fails if `file` already exists, since it is opened with `create_new`.
+///
+/// Returns: true if the file was filled (size cutoff reached), false if the receiver was exhausted.
+fn pack_emitter_to_file<T: Serialize>(
+    file: &Path,
+    rx: &mut Receiver<(Url, T)>,
+    settings: &PackEmitterSettings,
+) -> anyhow::Result<bool> {
+    let file = OpenOptions::new().create_new(true).write(true).open(file)?;
+    let mut compressor =
+        zstd::stream::Encoder::new(file, settings.zstd_level.unwrap_or(SUGGESTED_ZSTD_LEVEL))?;
+    let size_cutoff = settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF);
+    let mut ser_buf = Vec::new();
+
+    // Counts serialised bytes before compression, not bytes written to disk.
+    let mut length_so_far = 0usize;
+    let mut filled = false;
+
+    while let Some((url, record)) = rx.blocking_recv() {
+        serde_bare::to_writer(
+            &mut ser_buf,
+            &PackRecord {
+                url: url.as_str(),
+                record,
+            },
+        )?;
+
+        compressor.write_all(&ser_buf)?;
+        length_so_far += ser_buf.len();
+        ser_buf.clear();
+
+        if length_so_far > size_cutoff {
+            filled = true;
+            break;
+        }
+    }
+
+    // MUST CALL: finish() completes the Zstd frame; then flush the underlying file.
+    compressor.finish()?.flush()?;
+
+    Ok(filled)
+}
diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs index 24d27d4..f6a0d42 100644 --- 
a/quickpeep_raker/src/raking.rs +++ b/quickpeep_raker/src/raking.rs @@ -76,7 +76,7 @@ pub struct TemporaryFailure { pub backoff_sec: u32, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct PermanentFailure { pub reason: PermanentFailureReason, } @@ -89,7 +89,7 @@ pub enum TemporaryFailureReason { ExcruciatingCrawlDelay(u64), } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum PermanentFailureReason { ResourceDenied(u16), DeniedToRobots,