Emit rakepacks from the raker

Olivier 'reivilibre' 2022-03-21 19:07:35 +00:00
parent f60031a462
commit 71c22daf0d
7 changed files with 198 additions and 2 deletions

Cargo.lock

@@ -389,6 +389,9 @@ name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -1321,6 +1324,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.56"
@@ -3045,6 +3057,7 @@ dependencies = [
"smartstring",
"tokio",
"toml",
"zstd",
]
[[package]]
@@ -4453,3 +4466,32 @@ dependencies = [
"thiserror",
"time",
]
[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]


@@ -32,6 +32,8 @@ chrono = "0.4.19"
libmdbx = "0.1.1"
# Used for FFI. Must match the version in libmdbx.
mdbx-sys = "0.11.4-git.20210105"
# For compression of emitted packs. 0.11.1+zstd.1.5.2
zstd = "0.11.1"
### Utils
lazy_static = "1.4.0"
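The new dependency here is zstd 0.11, which provides the streaming Encoder the pack emitter below is built on. As a quick orientation, a minimal round trip through that API looks roughly like this (an illustrative sketch, not code from this commit):

use std::io::Write;

fn main() -> anyhow::Result<()> {
    // Compress into an in-memory buffer at the same level the emitter defaults to.
    let mut encoder = zstd::stream::Encoder::new(Vec::new(), 16)?;
    encoder.write_all(b"hello pack")?;
    // finish() writes the zstd frame footer and hands back the inner writer.
    let compressed = encoder.finish()?;
    // decode_all inflates a complete frame from any reader.
    let round_trip = zstd::stream::decode_all(&compressed[..])?;
    assert_eq!(round_trip, b"hello pack".to_vec());
    Ok(())
}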


@@ -18,6 +18,7 @@ use tokio::sync::{mpsc, oneshot, Semaphore};
use tokio::time::MissedTickBehavior;
use quickpeep_raker::config;
use quickpeep_raker::rakepack_emitter::pack_emitter;
use quickpeep_raker::raking::analysis::{preload_adblock_engine, IpSet};
use quickpeep_raker::raking::page_extraction::PageExtractionService;
use quickpeep_raker::raking::task::{TaskContext, TaskResultSubmission};
@@ -124,6 +125,47 @@ pub async fn main() -> anyhow::Result<()> {
let (refs_tx, refs_rx) = mpsc::channel(32);
let (rejections_tx, rejections_rx) = mpsc::channel(32);
let mut emitters = Vec::with_capacity(3);
{
let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone();
emitters.push(
std::thread::Builder::new()
.name("pages emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "pages", pages_rx, &settings)?;
Ok(())
})?,
);
}
{
let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone();
emitters.push(
std::thread::Builder::new()
.name("refs emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "refs", refs_rx, &settings)?;
Ok(())
})?,
);
}
{
let emit_dir = config.emit_dir.clone();
let settings = config.pack_emitter.clone();
emitters.push(
std::thread::Builder::new()
.name("rejections emitter".to_owned())
.spawn(move || -> anyhow::Result<()> {
pack_emitter(&emit_dir, "rejections", rejections_rx, &settings)?;
Ok(())
})?,
);
}
let submission = TaskResultSubmission {
pages: pages_tx,
references: refs_tx,

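The three blocks above are identical except for the thread name and the channel being drained. If more pack kinds are added later, the pattern could be factored into a small helper along these lines (a sketch only; spawn_emitter is a hypothetical function, not part of this commit):

use std::path::Path;
use std::thread::JoinHandle;

use quickpeep_raker::rakepack_emitter::{pack_emitter, PackEmitterSettings};
use reqwest::Url;
use serde::Serialize;
use tokio::sync::mpsc::Receiver;

fn spawn_emitter<T: Serialize + Send + 'static>(
    name: &'static str,
    emit_dir: &Path,
    settings: &PackEmitterSettings,
    rx: Receiver<(Url, T)>,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
    let emit_dir = emit_dir.to_owned();
    let settings = settings.clone();
    let handle = std::thread::Builder::new()
        .name(format!("{} emitter", name))
        // The emitter does blocking file I/O, hence a plain OS thread rather than a tokio task.
        .spawn(move || -> anyhow::Result<()> {
            pack_emitter(&emit_dir, name, rx, &settings)?;
            Ok(())
        })?;
    Ok(handle)
}

// Usage, mirroring the three blocks above:
// emitters.push(spawn_emitter("pages", &config.emit_dir, &config.pack_emitter, pages_rx)?);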

@@ -1,3 +1,4 @@
use crate::rakepack_emitter::PackEmitterSettings;
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
@@ -20,6 +21,8 @@ pub struct RakerConfig {
pub emit_dir: PathBuf,
pub metrics: MetricsConfig,
pub pack_emitter: PackEmitterSettings,
}
impl RakerConfig {

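The new pack_emitter field means the raker config gains a corresponding section; both of its keys are optional and fall back to the SUGGESTED_* constants in rakepack_emitter. A sketch of how such a section deserialises (the TOML shape is an assumption for illustration, not taken from the repository's actual config file):

use quickpeep_raker::rakepack_emitter::PackEmitterSettings;

fn main() -> anyhow::Result<()> {
    let settings: PackEmitterSettings = toml::from_str(
        r#"
        # Both keys are optional thanks to #[serde(default)].
        size_cutoff = 268435456  # 256 MiB
        zstd_level = 16
        "#,
    )?;
    assert_eq!(settings.size_cutoff, Some(256 * 1024 * 1024));
    assert_eq!(settings.zstd_level, Some(16));
    Ok(())
}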

@@ -4,5 +4,7 @@ pub mod config;
pub mod storage;
pub mod rakepack_emitter;
#[cfg(test)]
mod test;


@@ -0,0 +1,105 @@
use chrono::Utc;
use log::warn;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
/// Size at which a new pack file will be created. Defaults to 256 MiB; overridable via `PackEmitterSettings::size_cutoff`.
pub const SUGGESTED_SIZE_CUTOFF: usize = 256 * 1024 * 1024;
/// The Zstd compression level to use (overridable via `PackEmitterSettings::zstd_level`). 16 is quite high,
/// but compact file sizes matter more here than compression speed.
/// If this turns out to be too slow, it should probably go down to 10 or 7.
pub const SUGGESTED_ZSTD_LEVEL: i32 = 16;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PackEmitterSettings {
#[serde(default)]
pub size_cutoff: Option<usize>,
#[serde(default)]
pub zstd_level: Option<i32>,
}
/// An emitter for some kind of pack.
/// Usual types of T: RakedPageEntry, RakedReferrerEntry, PermanentFailure
pub fn pack_emitter<T: Serialize + Send + 'static>(
directory: &Path,
name: &str,
mut rx: Receiver<(Url, T)>,
settings: &PackEmitterSettings,
) -> anyhow::Result<()> {
loop {
let now = Utc::now();
// Timestamp prefix for the pack file name, e.g. 2022-01-01_01:01:01
let new_pack_file_path = loop {
let new_pack_file_path =
directory.join(format!("{}.{}.pack", now.format("%F_%T"), name));
if new_pack_file_path.exists() {
warn!(
"{:?} already exists; sleeping to generate new timestamp.",
new_pack_file_path
);
std::thread::sleep(Duration::from_secs(2));
} else {
break new_pack_file_path;
}
};
if !pack_emitter_to_file(&new_pack_file_path, &mut rx, settings)? {
// File wasn't filled; the receiver was exhausted (we're shutting down).
break;
}
}
Ok(())
}
#[derive(Serialize)]
struct PackRecord<'a, T> {
url: &'a str,
record: T,
}
/// Returns: true if the file was filled (size cutoff reached), false if the receiver was exhausted.
fn pack_emitter_to_file<T: Serialize>(
file: &Path,
rx: &mut Receiver<(Url, T)>,
settings: &PackEmitterSettings,
) -> anyhow::Result<bool> {
let file = OpenOptions::new().create_new(true).write(true).open(file)?;
let mut compressor =
zstd::stream::Encoder::new(file, settings.zstd_level.unwrap_or(SUGGESTED_ZSTD_LEVEL))?;
let mut ser_buf = Vec::new();
let mut length_so_far = 0usize;
while let Some((url, record)) = rx.blocking_recv() {
serde_bare::to_writer(
&mut ser_buf,
&PackRecord {
url: url.as_str(),
record,
},
)?;
compressor.write_all(&ser_buf)?;
length_so_far += ser_buf.len();
if length_so_far > settings.size_cutoff.unwrap_or(SUGGESTED_SIZE_CUTOFF) {
// MUST CALL: finish() writes the zstd frame footer; without it the pack file would be truncated and unreadable.
compressor.finish()?.flush()?;
return Ok(true);
}
ser_buf.clear();
}
// MUST CALL: as above, finish() must run before the file can be considered complete.
compressor.finish()?.flush()?;
// Exhausted
Ok(false)
}
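For completeness, packs written this way can be read back by reversing the two layers: a zstd stream decoder feeding BARE deserialisation. The sketch below is illustrative rather than part of the commit; OwnedPackRecord is a hypothetical mirror of PackRecord with an owned URL string, and the end-of-stream handling is deliberately simplified:

use std::fs::File;
use std::path::Path;

use serde::de::DeserializeOwned;
use serde::Deserialize;

#[derive(Deserialize)]
struct OwnedPackRecord<T> {
    url: String,
    record: T,
}

fn read_pack<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Vec<OwnedPackRecord<T>>> {
    let file = File::open(path)?;
    let mut decompressor = zstd::stream::Decoder::new(file)?;
    let mut records = Vec::new();
    loop {
        // BARE records carry no framing of their own, so keep decoding until the
        // stream runs out. (Real code should distinguish a clean end-of-stream
        // from a truncated or corrupt pack.)
        let next: Result<OwnedPackRecord<T>, _> = serde_bare::from_reader(&mut decompressor);
        match next {
            Ok(record) => records.push(record),
            Err(_) => break,
        }
    }
    Ok(records)
}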


@@ -76,7 +76,7 @@ pub struct TemporaryFailure {
pub backoff_sec: u32,
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PermanentFailure {
pub reason: PermanentFailureReason,
}
@@ -89,7 +89,7 @@ pub enum TemporaryFailureReason {
ExcruciatingCrawlDelay(u64),
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PermanentFailureReason {
ResourceDenied(u16),
DeniedToRobots,
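These added derives are what let PermanentFailure records travel down the rejections channel and be written into the "rejections" packs by the emitter above. A minimal round-trip sketch (illustrative only; it assumes the failure types are exported from quickpeep_raker::raking, as the imports in the raker binary suggest):

use quickpeep_raker::raking::{PermanentFailure, PermanentFailureReason};

fn main() -> anyhow::Result<()> {
    let failure = PermanentFailure {
        reason: PermanentFailureReason::DeniedToRobots,
    };
    // The pack emitter serialises records with serde_bare; the same crate reads them back.
    let bytes = serde_bare::to_vec(&failure)?;
    let decoded: PermanentFailure = serde_bare::from_slice(&bytes)?;
    assert!(matches!(decoded.reason, PermanentFailureReason::DeniedToRobots));
    Ok(())
}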