diff --git a/Cargo.lock b/Cargo.lock index 21eea2e..f8e285c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,16 +185,6 @@ dependencies = [ "tiny-keccak", ] -[[package]] -name = "crossbeam-channel" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" -dependencies = [ - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -202,7 +192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils 0.8.5", + "crossbeam-utils", ] [[package]] @@ -213,7 +203,7 @@ checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils 0.8.5", + "crossbeam-utils", ] [[package]] @@ -223,23 +213,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils 0.8.5", + "crossbeam-utils", "lazy_static", "memoffset", "scopeguard", ] -[[package]] -name = "crossbeam-utils" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" -dependencies = [ - "autocfg", - "cfg-if 0.1.10", - "lazy_static", -] - [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -275,7 +254,7 @@ dependencies = [ "byteorder", "chrono", "clap", - "crossbeam-channel 0.4.4", + "crossbeam-channel", "env_logger", "glob", "hostname", @@ -292,6 +271,17 @@ dependencies = [ "zstd", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -565,12 +555,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - [[package]] name = "memchr" version = "2.4.0" @@ -792,9 +776,9 @@ version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" dependencies = [ - "crossbeam-channel 0.5.1", + "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils 0.8.5", + "crossbeam-utils", "lazy_static", "num_cpus", ] @@ -1205,8 +1189,9 @@ dependencies = [ "byteorder", "chrono", "clap", - "crossbeam-channel 0.4.4", - "crossbeam-utils 0.8.5", + "crossbeam-channel", + "crossbeam-utils", + "derivative", "env_logger", "fastcdc", "glob", diff --git a/datman/Cargo.toml b/datman/Cargo.toml index a0e176d..f1c0a50 100644 --- a/datman/Cargo.toml +++ b/datman/Cargo.toml @@ -12,7 +12,7 @@ description = "A chunked and deduplicated backup system using Yama" [dependencies] clap = "= 3.0.0-beta.5" -crossbeam-channel = "0.4" +crossbeam-channel = "0.5.1" anyhow = "1.0" thiserror = "1.0" serde = { version = "1.0.104", features = ["derive"] } diff --git a/yama/Cargo.toml b/yama/Cargo.toml index cabc19f..359ff68 100644 --- a/yama/Cargo.toml +++ b/yama/Cargo.toml @@ -20,8 +20,8 @@ twox-hash = "1.5.0" serde = { version = "1.0.104", features = ["derive"] } serde_bare = "0.3.0" users = "0.9.1" -crossbeam-channel = "0.4" -crossbeam-utils = "0.8.1" +crossbeam-channel = "0.5.1" +crossbeam-utils = "0.8.5" toml = "0.5.5" glob = "0.3.0" nix = "0.17.0" @@ -38,6 +38,7 @@ rayon = "1.5.0" rusqlite = "0.24.2" chrono = "0.4.19" rustyline = "7.1.0" +derivative = "2.2.0" [dev-dependencies] diff --git a/yama/src/operations/checking.rs b/yama/src/operations/checking.rs index a09eb7e..4532792 100644 --- a/yama/src/operations/checking.rs +++ b/yama/src/operations/checking.rs @@ -18,8 +18,9 @@ along with Yama. If not, see . use crate::chunking::RecursiveUnchunker; use crate::commands::retrieve_tree_node; use crate::definitions::{ChunkId, TreeNode}; -use crate::pile::{Keyspace, Pile, RawPile}; +use crate::pile::{ControllerMessage, Keyspace, Pile, RawPile, StoragePipelineSettings}; use anyhow::bail; +use crossbeam_channel::Sender; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use log::{error, info, warn}; use std::collections::HashSet; @@ -121,6 +122,15 @@ impl RawPile for VacuumRawPile { fn check_lowlevel(&self) -> anyhow::Result { self.underlying.check_lowlevel() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + self.underlying + .build_storage_pipeline(settings, controller_send) + } } /// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time. diff --git a/yama/src/pile.rs b/yama/src/pile.rs index b7afaca..df2ec2d 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize}; use crate::chunking::calculate_chunkid; use crate::definitions::{ChunkId, PointerData}; +use crossbeam_channel::Sender; use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Condvar, Mutex}; @@ -75,7 +76,21 @@ pub struct DebugStatistics { pub total_chunk_size: u64, } -pub trait RawPile: Send + Sync + Debug { +#[derive(Debug, Clone)] +pub struct StoragePipelineSettings { + pub num_compressors: u32, + pub compressor_input_bound: u32, + pub writer_input_bound: u32, +} + +pub enum ControllerMessage { + Failure { + worker_id: Arc, + error_message: String, + }, +} + +pub trait RawPile: Send + Sync + Debug + 'static { // TODO expose verification errors? fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result; fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result>>; @@ -99,6 +114,12 @@ pub trait RawPile: Send + Sync + Debug { fn debug_statistics(&self) -> anyhow::Result> { Ok(None) } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>>; } impl RawPile for Box { @@ -129,6 +150,15 @@ impl RawPile for Box { fn debug_statistics(&self) -> anyhow::Result> { self.as_ref().debug_statistics() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + self.as_ref() + .build_storage_pipeline(settings, controller_send) + } } impl RawPile for Arc { @@ -159,6 +189,15 @@ impl RawPile for Arc { fn debug_statistics(&self) -> anyhow::Result> { self.as_ref().debug_statistics() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + self.as_ref() + .build_storage_pipeline(settings, controller_send) + } } #[derive(Debug)] diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index 25369fb..3a80588 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -21,14 +21,16 @@ use std::thread::JoinHandle; use anyhow::anyhow; use crossbeam_channel::{Receiver, Sender}; +use derivative::Derivative; use log::error; use zstd::block::{Compressor, Decompressor}; -use crate::pile::{DebugStatistics, Keyspace, RawPile}; +use crate::definitions::ChunkId; +use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; pub const DECOMPRESS_CAPACITY: usize = 32 * 1024 * 1024; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct CompressionSettings { /// Raw dictionary to pass to Zstd for compression and decompression pub dictionary: Arc>, @@ -40,11 +42,15 @@ pub struct CompressionSettings { pub num_decompressors: u32, } -#[derive(Debug)] +#[derive(Debug, Derivative)] +#[derivative(Clone(bound = ""))] +// we need to use derivative's Clone impl because Arc causes R to have a bound on Clone +// even though that's not needed. https://github.com/rust-lang/rust/issues/26925 pub struct RawPileCompressor { - underlying: R, - compressor: Sender<(Vec, Sender>)>, - decompressor: Sender<(Vec, Sender>)>, + underlying: Arc, + compressor: Option, Sender>)>>, + decompressor: Option, Sender>)>>, + settings: Arc, } impl RawPileCompressor { @@ -52,6 +58,20 @@ impl RawPileCompressor { underlying: R, settings: CompressionSettings, ) -> anyhow::Result<(Self, Vec>)> { + if settings.num_compressors == 0 && settings.num_decompressors == 0 { + // optimisation for when we're only building a pipeline: we don't want to + return Ok(( + RawPileCompressor { + underlying: Arc::new(underlying), + compressor: None, + decompressor: None, + + settings: Arc::new(settings), + }, + Vec::with_capacity(0), + )); + } + let (com_s, com_r) = crossbeam_channel::bounded(4); let (dec_s, dec_r) = crossbeam_channel::bounded(4); @@ -85,9 +105,10 @@ impl RawPileCompressor { Ok(( RawPileCompressor { - underlying, - compressor: com_s, - decompressor: dec_s, + underlying: Arc::new(underlying), + compressor: Some(com_s), + decompressor: Some(dec_s), + settings: Arc::new(settings), }, handles, )) @@ -124,6 +145,8 @@ impl RawPileCompressor { fn decompress(&self, data: &[u8]) -> anyhow::Result> { let (ret_s, ret_r) = crossbeam_channel::bounded(0); self.decompressor + .as_ref() + .expect("No decompressors configured") .send((data.to_vec(), ret_s)) .or(Err(anyhow!("couldn't send to decompressor")))?; @@ -133,11 +156,23 @@ impl RawPileCompressor { fn compress(&self, compressed_data: &[u8]) -> anyhow::Result> { let (ret_s, ret_r) = crossbeam_channel::bounded(0); self.compressor + .as_ref() + .expect("No compressors configured") .send((compressed_data.to_vec(), ret_s)) .or(Err(anyhow!("couldn't send to compressor")))?; Ok(ret_r.recv().or(Err(anyhow!("couldn't receive result")))?) } + + fn storage_pipeline_worker( + &self, + next_stage: Sender<(ChunkId, Vec)>, + input: Receiver<(ChunkId, Vec)>, + controller_send: &Sender, + ) -> anyhow::Result<()> { + todo!(); + Ok(()) + } } impl RawPile for RawPileCompressor { @@ -177,4 +212,40 @@ impl RawPile for RawPileCompressor { fn debug_statistics(&self) -> anyhow::Result> { self.underlying.debug_statistics() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + // this one should have a few threads behind it! yarr! + let subsequent_pipeline = self + .underlying + .build_storage_pipeline(settings.clone(), controller_send.clone())?; + + let (input_to_this_stage, receiver) = + crossbeam_channel::bounded(settings.compressor_input_bound as usize); + + for compressor_number in 0..settings.num_compressors { + let subsequent_pipeline = subsequent_pipeline.clone(); + let receiver = receiver.clone(); + let controller_send = controller_send.clone(); + let this = (*self).clone(); + thread::spawn(move || { + let worker_id = Arc::new(format!("compressor-{}", compressor_number)); + if let Err(err) = + this.storage_pipeline_worker(subsequent_pipeline, receiver, &controller_send) + { + controller_send + .send(ControllerMessage::Failure { + worker_id, + error_message: format!("err {:?}", err), + }) + .expect("This is BAD: failed to send failure message to controller."); + } + }); + } + + Ok(input_to_this_stage) + } } diff --git a/yama/src/pile/encryption.rs b/yama/src/pile/encryption.rs index 9f27a66..0b61b75 100644 --- a/yama/src/pile/encryption.rs +++ b/yama/src/pile/encryption.rs @@ -20,7 +20,9 @@ use log::warn; use sodiumoxide::crypto::secretbox; use sodiumoxide::crypto::secretbox::{Key, Nonce, NONCEBYTES}; -use crate::pile::{Keyspace, RawPile}; +use crate::definitions::ChunkId; +use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; +use crossbeam_channel::Sender; /// A RawPile that provides encryption of chunk contents. /// Please note that keys are not currently encrypted, so this scheme is not CPA-secure. @@ -109,4 +111,12 @@ impl RawPile for RawPileEncryptor { fn check_lowlevel(&self) -> anyhow::Result { self.underlying.check_lowlevel() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + todo!() + } } diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index accf18d..6cabb7c 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -19,9 +19,10 @@ use std::hash::Hasher; use thiserror::Error; -use crate::definitions::XXH64_SEED; -use crate::pile::{DebugStatistics, Keyspace, RawPile}; +use crate::definitions::{ChunkId, XXH64_SEED}; +use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; use crate::utils::bytes_to_hexstring; +use crossbeam_channel::Sender; /// This RawPile enables checking the integrity of stored chunks. /// This is done by storing a hash along with the chunk contents, which can later be verified. @@ -113,4 +114,12 @@ impl RawPile for RawPileIntegrityChecker { fn debug_statistics(&self) -> anyhow::Result> { self.underlying.debug_statistics() } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + todo!() + } } diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 0246b63..5cf597b 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -18,11 +18,11 @@ along with Yama. If not, see . use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::convert::{TryFrom, TryInto}; -use std::fs; use std::fs::{read_dir, File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Condvar, Mutex}; +use std::{fs, thread}; use anyhow::{bail, Context}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -32,9 +32,11 @@ use rusqlite::{params, Error}; use rusqlite::{Connection, OptionalExtension}; use crate::definitions::ChunkId; -use crate::pile::{DebugStatistics, Keyspace, RawPile}; +use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; use crate::utils::bytes_to_hexstring; +use crossbeam_channel::{Receiver, Sender}; use rusqlite::ffi::ErrorCode::ConstraintViolation; +use std::time::Duration; /// Bloblogs will not be reused if they are already 2 GiB large. pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024; @@ -193,11 +195,11 @@ impl Inner { /// Because random access is important for performance, an additional SQLite database is used /// as a map from chunk IDs to their positions in the blob logs, allowing readers to seek to the /// appropriate place and read a chunk randomly. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SqliteBloblogPile { inner: Arc>, path: PathBuf, - writers_reach_zero: Condvar, + writers_reach_zero: Arc, should_batch_pointer_writes: bool, } @@ -367,6 +369,79 @@ impl SqliteBloblogPile { let mut inner = self.inner.lock().unwrap(); inner.flush() } + + fn storage_pipeline_worker( + &self, + incoming: Receiver<(ChunkId, Vec)>, + controller_sender: &Sender, + ) -> anyhow::Result<()> { + // can hold on to the same bloblog as long as we'd like! + const POINTERS_BUFFER_SIZE: usize = 256; + let mut pointers_buffered = Vec::with_capacity(POINTERS_BUFFER_SIZE); + + fn flush_pointers( + this: &SqliteBloblogPile, + pointers_buffered: &mut Vec, + ) -> anyhow::Result<()> { + todo!() + } + + fn write_blob( + this: &SqliteBloblogPile, + bloblog_id: BloblogId, + bloblog: &mut Bloblog, + pointers_buffered: &mut Vec, + (chunk_id, chunk): (ChunkId, Vec), + ) -> anyhow::Result<()> { + let offset = bloblog.write_blob(&chunk_id, &chunk)?; + let pointer = BloblogPointer { + bloblog: bloblog_id, + offset, + }; + pointers_buffered.push(pointer); + + if pointers_buffered.len() >= POINTERS_BUFFER_SIZE { + flush_pointers(this, pointers_buffered)?; + } + + Ok(()) + } + + while let Ok(chunk) = incoming.recv() { + let (bloblog_id, bloglog_mutex) = self.get_writing_bloblog()?; + let mut bloblog = bloglog_mutex.lock().expect("Failed to lock bloblog?"); + write_blob( + self, + bloblog_id, + &mut bloblog, + &mut pointers_buffered, + chunk, + )?; + + while let Ok(chunk) = incoming.recv_timeout(Duration::from_secs(5)) { + write_blob( + self, + bloblog_id, + &mut bloblog, + &mut pointers_buffered, + chunk, + )?; + if bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE { + // get a new bloblog to write with. + break; + } + } + + drop(bloblog); + self.return_writing_bloblog(bloblog_id, bloglog_mutex)?; + } + + flush_pointers(self, &mut pointers_buffered)?; + + // we MUST have flushed ALL the pointers by now. + assert!(pointers_buffered.is_empty()); + Ok(()) + } } impl Drop for SqliteBloblogPile { @@ -594,6 +669,29 @@ impl RawPile for SqliteBloblogPile { total_chunk_size, })) } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + let (sender, incoming) = crossbeam_channel::bounded(settings.writer_input_bound as usize); + + let this = self.clone(); + + thread::spawn(move || { + let worker_id = Arc::new(format!("bloblogwriter")); + if let Err(err) = this.storage_pipeline_worker(incoming, &controller_send) { + controller_send + .send(ControllerMessage::Failure { + worker_id, + error_message: format!("err {:?}", err), + }) + .expect("This is BAD: failed to send failure message to controller."); + } + }); + todo!() + } } struct KeyIterator { diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index ea4870f..9b77357 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -8,7 +8,8 @@ use anyhow::anyhow; use crossbeam_channel::{Receiver, Sender}; use log::{error, info}; -use crate::pile::{Keyspace, RawPile}; +use crate::definitions::ChunkId; +use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; @@ -303,6 +304,15 @@ impl RawPile for Requester { ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for LowLevelCheck.")), } } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + // this one is a little bit more complex. + todo!() + } } pub struct ListKeyIterator {