From 6c2ff9daec4a460e02d70c35dd0f39cdc281455f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 3 Apr 2022 10:18:41 +0100 Subject: [PATCH] Add minimum free space cutoff feature for the raker --- Cargo.lock | 24 +++++++++ qp_raker.sample.toml | 3 ++ quickpeep_raker/Cargo.toml | 2 + quickpeep_raker/src/bin/qp-raker.rs | 38 ++++++++++++-- quickpeep_raker/src/rakepack_emitter.rs | 66 ++++++++++++++++++++++-- quickpeep_raker/src/raking/references.rs | 2 +- 6 files changed, 125 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b47eea5..413c4a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -451,6 +451,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "bytesize" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70" +dependencies = [ + "serde", +] + [[package]] name = "bzip2" version = "0.4.3" @@ -3031,6 +3040,19 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +[[package]] +name = "nix" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6" +dependencies = [ + "bitflags", + "cc", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -3756,6 +3778,7 @@ dependencies = [ "arc-interner", "bare-metrics-recorder", "bytes", + "bytesize", "chrono", "clap", "colour", @@ -3779,6 +3802,7 @@ dependencies = [ "metrics 0.18.1", "metrics-exporter-prometheus", "metrics-process-promstyle", + "nix", "ouroboros", "publicsuffix", "quickpeep_densedoc", diff --git a/qp_raker.sample.toml b/qp_raker.sample.toml index 069a6d7..e145ba1 100644 --- a/qp_raker.sample.toml +++ b/qp_raker.sample.toml @@ -8,3 +8,6 @@ prometheus = "127.0.0.1:9774" # bare_metrics = true [pack_emitter] +# size_cutoff = "4 GiB" +# zstd_level = 16 +# min_free_space = "1 GiB" diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml index e0c3487..49b07f0 100644 --- a/quickpeep_raker/Cargo.toml +++ b/quickpeep_raker/Cargo.toml @@ -28,6 +28,7 @@ serde_bare = "0.5.0" serde_json = "1.0.79" toml = "0.5.8" +bytesize = {version = "1.1.0", features = ["serde"]} ### Dates chrono = "0.4.19" @@ -56,6 +57,7 @@ diplomatic-bag = "0.2.0" arc-interner = "0.7.0" smartstring = "1.0.0" signal-hook = "0.3.13" +nix = "0.23.1" ### Raking helpers # HTTP Requests diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 2f9a58b..b92c094 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -152,16 +152,29 @@ pub async fn main() -> anyhow::Result<()> { let (rejections_tx, rejections_rx) = mpsc::channel(32); let (icons_tx, icons_rx) = mpsc::channel(32); + let graceful_stop = Arc::new(AtomicBool::new(false)); + let graceful_stop_notify = Arc::new(Notify::new()); + let mut emitters = Vec::with_capacity(3); { let emit_dir = config.emit_dir.clone(); let settings = config.pack_emitter.clone(); + let stop = graceful_stop.clone(); + let notify = graceful_stop_notify.clone(); emitters.push( std::thread::Builder::new() .name("pages emitter".to_owned()) .spawn(move || -> anyhow::Result<()> { - pack_emitter(&emit_dir, "pages", SCHEMA_RAKED_PAGES, pages_rx, &settings)?; + pack_emitter( + &emit_dir, + "pages", + SCHEMA_RAKED_PAGES, + pages_rx, + &settings, + stop, + notify, + )?; Ok(()) })?, ); @@ -170,6 +183,8 @@ pub async fn main() -> anyhow::Result<()> { { let emit_dir = config.emit_dir.clone(); let settings = config.pack_emitter.clone(); + let stop = graceful_stop.clone(); + let notify = graceful_stop_notify.clone(); emitters.push( std::thread::Builder::new() .name("refs emitter".to_owned()) @@ -180,6 +195,8 @@ pub async fn main() -> anyhow::Result<()> { SCHEMA_RAKED_REFERENCES, refs_rx, &settings, + stop, + notify, )?; Ok(()) })?, @@ -189,6 +206,8 @@ pub async fn main() -> anyhow::Result<()> { { let emit_dir = config.emit_dir.clone(); let settings = config.pack_emitter.clone(); + let stop = graceful_stop.clone(); + let notify = graceful_stop_notify.clone(); emitters.push( std::thread::Builder::new() .name("rejections emitter".to_owned()) @@ -199,6 +218,8 @@ pub async fn main() -> anyhow::Result<()> { SCHEMA_RAKED_REJECTIONS, rejections_rx, &settings, + stop, + notify, )?; Ok(()) })?, @@ -208,11 +229,21 @@ pub async fn main() -> anyhow::Result<()> { { let emit_dir = config.emit_dir.clone(); let settings = config.pack_emitter.clone(); + let stop = graceful_stop.clone(); + let notify = graceful_stop_notify.clone(); emitters.push( std::thread::Builder::new() .name("icons emitter".to_owned()) .spawn(move || -> anyhow::Result<()> { - pack_emitter(&emit_dir, "icons", SCHEMA_RAKED_ICONS, icons_rx, &settings)?; + pack_emitter( + &emit_dir, + "icons", + SCHEMA_RAKED_ICONS, + icons_rx, + &settings, + stop, + notify, + )?; Ok(()) })?, ); @@ -225,7 +256,6 @@ pub async fn main() -> anyhow::Result<()> { icons: icons_tx, }; - let graceful_stop = Arc::new(AtomicBool::new(false)); let task_context = TaskContext { store: store.clone(), client, @@ -236,7 +266,7 @@ pub async fn main() -> anyhow::Result<()> { semaphore, submission, graceful_stop, - notify: Arc::new(Default::default()), + notify: graceful_stop_notify, }; let mut tasks = Vec::with_capacity(num_tasks as usize); diff --git a/quickpeep_raker/src/rakepack_emitter.rs b/quickpeep_raker/src/rakepack_emitter.rs index 50b1e06..df0977b 100644 --- a/quickpeep_raker/src/rakepack_emitter.rs +++ b/quickpeep_raker/src/rakepack_emitter.rs @@ -1,5 +1,6 @@ +use bytesize::ByteSize; use chrono::Utc; -use log::warn; +use log::{error, warn}; use metrics::{describe_counter, register_counter, Counter, Unit}; use quickpeep_structs::rake_entries::PackRecord; use reqwest::Url; @@ -8,23 +9,31 @@ use std::borrow::Cow; use std::fs::OpenOptions; use std::io::Write; use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; +use tokio::sync::Notify; /// Size at which a new pack file will be created. 4 GiB, will later be configurable? -pub const SUGGESTED_SIZE_CUTOFF: usize = 4 * 1024 * 1024 * 1024; +pub const SUGGESTED_SIZE_CUTOFF: ByteSize = ByteSize::gib(4); /// The Zstd compression level to use. 16 is quite high, but we really want the compact file sizes; /// willing to pay quite a lot in compression speed. /// If this turns out to be too slow, should probably go down to 10 or 7. pub const SUGGESTED_ZSTD_LEVEL: i32 = 16; +pub const SUGGESTED_FREE_SPACE_CUTOFF: ByteSize = ByteSize::gib(1); + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct PackEmitterSettings { #[serde(default)] - pub size_cutoff: Option, + pub size_cutoff: Option, #[serde(default)] pub zstd_level: Option, + /// Raking will stop when we have less than this amount of free space. + #[serde(default)] + pub min_free_space: Option, } /// An emitter for some kind of pack. @@ -35,6 +44,8 @@ pub fn pack_emitter( schema_name: &'static str, mut rx: Receiver<(Url, T)>, settings: &PackEmitterSettings, + shutdown: Arc, + shutdown_notify: Arc, ) -> anyhow::Result<()> { describe_counter!( "emitted_pack_bytes", @@ -63,7 +74,15 @@ pub fn pack_emitter( } }; - if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, schema_name, settings)? { + if !pack_emitter_to_file( + &new_pack_file_path, + &mut rx, + name, + schema_name, + settings, + shutdown.clone(), + shutdown_notify.clone(), + )? { // File wasn't filled; the receiver was exhausted (we're shutting down). break; } @@ -78,6 +97,8 @@ fn pack_emitter_to_file( name: &str, schema_name: &'static str, settings: &PackEmitterSettings, + shutdown: Arc, + shutdown_notify: Arc, ) -> anyhow::Result { let file = OpenOptions::new().create_new(true).write(true).open(file)?; let mut compressor = @@ -92,6 +113,8 @@ fn pack_emitter_to_file( serde_bare::to_writer(&mut ser_buf, schema_name)?; + let mut doc_count = 0; + while let Some((url, record)) = rx.blocking_recv() { serde_bare::to_writer( &mut ser_buf, @@ -106,12 +129,45 @@ fn pack_emitter_to_file( byte_counter.increment(ser_buf.len() as u64); record_counter.increment(1); - if length_so_far > settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF) { + if length_so_far as u64 + > settings + .size_cutoff + .unwrap_or(SUGGESTED_SIZE_CUTOFF) + .as_u64() + { // MUST CALL compressor.finish()?.flush()?; return Ok(true); } + if { + doc_count += 1; + doc_count + } % 32 + == 0 + { + let fs_stats = nix::sys::statvfs::fstatvfs(compressor.get_ref())?; + let free_bytes: u64 = fs_stats.block_size() * fs_stats.blocks_available(); + if free_bytes + < settings + .min_free_space + .unwrap_or(SUGGESTED_FREE_SPACE_CUTOFF) + .as_u64() + { + // Signal to shut down. We must still keep processing incoming entries so none get lost, + // though. + + if !shutdown.load(Ordering::SeqCst) { + error!( + "Running low on disk space ({}); shutting down.", + ByteSize::b(free_bytes) + ); + shutdown.store(true, Ordering::SeqCst); + shutdown_notify.notify_waiters(); + } + } + } + ser_buf.clear(); } diff --git a/quickpeep_raker/src/raking/references.rs b/quickpeep_raker/src/raking/references.rs index 5e243b2..5bb0cef 100644 --- a/quickpeep_raker/src/raking/references.rs +++ b/quickpeep_raker/src/raking/references.rs @@ -1,10 +1,10 @@ use crate::raking::UrlRaked; +use log::debug; use quickpeep_densedoc::DenseTree; use quickpeep_structs::rake_entries::{RakedReference, ReferenceKind}; use quickpeep_utils::dates::date_to_quickpeep_days; use reqwest::Url; use std::collections::BTreeSet; -use log::debug; pub fn find_references( doc: &Vec,