Add minimum free space cutoff feature for the raker

This commit is contained in:
Olivier 'reivilibre' 2022-04-03 10:18:41 +01:00
parent f335d0daaa
commit 6c2ff9daec
6 changed files with 125 additions and 10 deletions

24
Cargo.lock generated
View File

@ -451,6 +451,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "bytesize"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bzip2" name = "bzip2"
version = "0.4.3" version = "0.4.3"
@ -3031,6 +3040,19 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54"
[[package]]
name = "nix"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]] [[package]]
name = "nodrop" name = "nodrop"
version = "0.1.14" version = "0.1.14"
@ -3756,6 +3778,7 @@ dependencies = [
"arc-interner", "arc-interner",
"bare-metrics-recorder", "bare-metrics-recorder",
"bytes", "bytes",
"bytesize",
"chrono", "chrono",
"clap", "clap",
"colour", "colour",
@ -3779,6 +3802,7 @@ dependencies = [
"metrics 0.18.1", "metrics 0.18.1",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
"metrics-process-promstyle", "metrics-process-promstyle",
"nix",
"ouroboros", "ouroboros",
"publicsuffix", "publicsuffix",
"quickpeep_densedoc", "quickpeep_densedoc",

View File

@ -8,3 +8,6 @@ prometheus = "127.0.0.1:9774"
# bare_metrics = true # bare_metrics = true
[pack_emitter] [pack_emitter]
# size_cutoff = "4 GiB"
# zstd_level = 16
# min_free_space = "1 GiB"

View File

@ -28,6 +28,7 @@ serde_bare = "0.5.0"
serde_json = "1.0.79" serde_json = "1.0.79"
toml = "0.5.8" toml = "0.5.8"
bytesize = {version = "1.1.0", features = ["serde"]}
### Dates ### Dates
chrono = "0.4.19" chrono = "0.4.19"
@ -56,6 +57,7 @@ diplomatic-bag = "0.2.0"
arc-interner = "0.7.0" arc-interner = "0.7.0"
smartstring = "1.0.0" smartstring = "1.0.0"
signal-hook = "0.3.13" signal-hook = "0.3.13"
nix = "0.23.1"
### Raking helpers ### Raking helpers
# HTTP Requests # HTTP Requests

View File

@ -152,16 +152,29 @@ pub async fn main() -> anyhow::Result<()> {
let (rejections_tx, rejections_rx) = mpsc::channel(32); let (rejections_tx, rejections_rx) = mpsc::channel(32);
let (icons_tx, icons_rx) = mpsc::channel(32); let (icons_tx, icons_rx) = mpsc::channel(32);
let graceful_stop = Arc::new(AtomicBool::new(false));
let graceful_stop_notify = Arc::new(Notify::new());
let mut emitters = Vec::with_capacity(3); let mut emitters = Vec::with_capacity(3);
{ {
let emit_dir = config.emit_dir.clone(); let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone(); let settings = config.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push( emitters.push(
std::thread::Builder::new() std::thread::Builder::new()
.name("pages emitter".to_owned()) .name("pages emitter".to_owned())
.spawn(move || -> anyhow::Result<()> { .spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "pages", SCHEMA_RAKED_PAGES, pages_rx, &settings)?; pack_emitter(
&emit_dir,
"pages",
SCHEMA_RAKED_PAGES,
pages_rx,
&settings,
stop,
notify,
)?;
Ok(()) Ok(())
})?, })?,
); );
@ -170,6 +183,8 @@ pub async fn main() -> anyhow::Result<()> {
{ {
let emit_dir = config.emit_dir.clone(); let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone(); let settings = config.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push( emitters.push(
std::thread::Builder::new() std::thread::Builder::new()
.name("refs emitter".to_owned()) .name("refs emitter".to_owned())
@ -180,6 +195,8 @@ pub async fn main() -> anyhow::Result<()> {
SCHEMA_RAKED_REFERENCES, SCHEMA_RAKED_REFERENCES,
refs_rx, refs_rx,
&settings, &settings,
stop,
notify,
)?; )?;
Ok(()) Ok(())
})?, })?,
@ -189,6 +206,8 @@ pub async fn main() -> anyhow::Result<()> {
{ {
let emit_dir = config.emit_dir.clone(); let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone(); let settings = config.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push( emitters.push(
std::thread::Builder::new() std::thread::Builder::new()
.name("rejections emitter".to_owned()) .name("rejections emitter".to_owned())
@ -199,6 +218,8 @@ pub async fn main() -> anyhow::Result<()> {
SCHEMA_RAKED_REJECTIONS, SCHEMA_RAKED_REJECTIONS,
rejections_rx, rejections_rx,
&settings, &settings,
stop,
notify,
)?; )?;
Ok(()) Ok(())
})?, })?,
@ -208,11 +229,21 @@ pub async fn main() -> anyhow::Result<()> {
{ {
let emit_dir = config.emit_dir.clone(); let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone(); let settings = config.pack_emitter.clone();
let stop = graceful_stop.clone();
let notify = graceful_stop_notify.clone();
emitters.push( emitters.push(
std::thread::Builder::new() std::thread::Builder::new()
.name("icons emitter".to_owned()) .name("icons emitter".to_owned())
.spawn(move || -> anyhow::Result<()> { .spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "icons", SCHEMA_RAKED_ICONS, icons_rx, &settings)?; pack_emitter(
&emit_dir,
"icons",
SCHEMA_RAKED_ICONS,
icons_rx,
&settings,
stop,
notify,
)?;
Ok(()) Ok(())
})?, })?,
); );
@ -225,7 +256,6 @@ pub async fn main() -> anyhow::Result<()> {
icons: icons_tx, icons: icons_tx,
}; };
let graceful_stop = Arc::new(AtomicBool::new(false));
let task_context = TaskContext { let task_context = TaskContext {
store: store.clone(), store: store.clone(),
client, client,
@ -236,7 +266,7 @@ pub async fn main() -> anyhow::Result<()> {
semaphore, semaphore,
submission, submission,
graceful_stop, graceful_stop,
notify: Arc::new(Default::default()), notify: graceful_stop_notify,
}; };
let mut tasks = Vec::with_capacity(num_tasks as usize); let mut tasks = Vec::with_capacity(num_tasks as usize);

View File

@ -1,5 +1,6 @@
use bytesize::ByteSize;
use chrono::Utc; use chrono::Utc;
use log::warn; use log::{error, warn};
use metrics::{describe_counter, register_counter, Counter, Unit}; use metrics::{describe_counter, register_counter, Counter, Unit};
use quickpeep_structs::rake_entries::PackRecord; use quickpeep_structs::rake_entries::PackRecord;
use reqwest::Url; use reqwest::Url;
@ -8,23 +9,31 @@ use std::borrow::Cow;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::sync::Notify;
/// Size at which a new pack file will be created. 4 GiB, will later be configurable? /// Size at which a new pack file will be created. 4 GiB, will later be configurable?
pub const SUGGESTED_SIZE_CUTOFF: usize = 4 * 1024 * 1024 * 1024; pub const SUGGESTED_SIZE_CUTOFF: ByteSize = ByteSize::gib(4);
/// The Zstd compression level to use. 16 is quite high, but we really want the compact file sizes; /// The Zstd compression level to use. 16 is quite high, but we really want the compact file sizes;
/// willing to pay quite a lot in compression speed. /// willing to pay quite a lot in compression speed.
/// If this turns out to be too slow, should probably go down to 10 or 7. /// If this turns out to be too slow, should probably go down to 10 or 7.
pub const SUGGESTED_ZSTD_LEVEL: i32 = 16; pub const SUGGESTED_ZSTD_LEVEL: i32 = 16;
pub const SUGGESTED_FREE_SPACE_CUTOFF: ByteSize = ByteSize::gib(1);
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PackEmitterSettings { pub struct PackEmitterSettings {
#[serde(default)] #[serde(default)]
pub size_cutoff: Option<usize>, pub size_cutoff: Option<ByteSize>,
#[serde(default)] #[serde(default)]
pub zstd_level: Option<i32>, pub zstd_level: Option<i32>,
/// Raking will stop when we have less than this amount of free space.
#[serde(default)]
pub min_free_space: Option<ByteSize>,
} }
/// An emitter for some kind of pack. /// An emitter for some kind of pack.
@ -35,6 +44,8 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
schema_name: &'static str, schema_name: &'static str,
mut rx: Receiver<(Url, T)>, mut rx: Receiver<(Url, T)>,
settings: &PackEmitterSettings, settings: &PackEmitterSettings,
shutdown: Arc<AtomicBool>,
shutdown_notify: Arc<Notify>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
describe_counter!( describe_counter!(
"emitted_pack_bytes", "emitted_pack_bytes",
@ -63,7 +74,15 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
} }
}; };
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, name, schema_name, settings)? { if !pack_emitter_to_file(
&new_pack_file_path,
&mut rx,
name,
schema_name,
settings,
shutdown.clone(),
shutdown_notify.clone(),
)? {
// File wasn't filled; the receiver was exhausted (we're shutting down). // File wasn't filled; the receiver was exhausted (we're shutting down).
break; break;
} }
@ -78,6 +97,8 @@ fn pack_emitter_to_file<T: Serialize>(
name: &str, name: &str,
schema_name: &'static str, schema_name: &'static str,
settings: &PackEmitterSettings, settings: &PackEmitterSettings,
shutdown: Arc<AtomicBool>,
shutdown_notify: Arc<Notify>,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
let file = OpenOptions::new().create_new(true).write(true).open(file)?; let file = OpenOptions::new().create_new(true).write(true).open(file)?;
let mut compressor = let mut compressor =
@ -92,6 +113,8 @@ fn pack_emitter_to_file<T: Serialize>(
serde_bare::to_writer(&mut ser_buf, schema_name)?; serde_bare::to_writer(&mut ser_buf, schema_name)?;
let mut doc_count = 0;
while let Some((url, record)) = rx.blocking_recv() { while let Some((url, record)) = rx.blocking_recv() {
serde_bare::to_writer( serde_bare::to_writer(
&mut ser_buf, &mut ser_buf,
@ -106,12 +129,45 @@ fn pack_emitter_to_file<T: Serialize>(
byte_counter.increment(ser_buf.len() as u64); byte_counter.increment(ser_buf.len() as u64);
record_counter.increment(1); record_counter.increment(1);
if length_so_far > settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF) { if length_so_far as u64
> settings
.size_cutoff
.unwrap_or(SUGGESTED_SIZE_CUTOFF)
.as_u64()
{
// MUST CALL // MUST CALL
compressor.finish()?.flush()?; compressor.finish()?.flush()?;
return Ok(true); return Ok(true);
} }
if {
doc_count += 1;
doc_count
} % 32
== 0
{
let fs_stats = nix::sys::statvfs::fstatvfs(compressor.get_ref())?;
let free_bytes: u64 = fs_stats.block_size() * fs_stats.blocks_available();
if free_bytes
< settings
.min_free_space
.unwrap_or(SUGGESTED_FREE_SPACE_CUTOFF)
.as_u64()
{
// Signal to shut down. We must still keep processing incoming entries so none get lost,
// though.
if !shutdown.load(Ordering::SeqCst) {
error!(
"Running low on disk space ({}); shutting down.",
ByteSize::b(free_bytes)
);
shutdown.store(true, Ordering::SeqCst);
shutdown_notify.notify_waiters();
}
}
}
ser_buf.clear(); ser_buf.clear();
} }

View File

@ -1,10 +1,10 @@
use crate::raking::UrlRaked; use crate::raking::UrlRaked;
use log::debug;
use quickpeep_densedoc::DenseTree; use quickpeep_densedoc::DenseTree;
use quickpeep_structs::rake_entries::{RakedReference, ReferenceKind}; use quickpeep_structs::rake_entries::{RakedReference, ReferenceKind};
use quickpeep_utils::dates::date_to_quickpeep_days; use quickpeep_utils::dates::date_to_quickpeep_days;
use reqwest::Url; use reqwest::Url;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use log::debug;
pub fn find_references( pub fn find_references(
doc: &Vec<DenseTree>, doc: &Vec<DenseTree>,