Start an attempt to create a high-performance chunking pipeline

Olivier 'reivilibre' 2021-11-18 09:21:55 +00:00
parent ccb50f2dd9
commit cc60ae88a4
10 changed files with 290 additions and 57 deletions

Cargo.lock (generated; 55 lines changed)

@@ -185,16 +185,6 @@ dependencies = [
"tiny-keccak",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
@@ -202,7 +192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils 0.8.5",
"crossbeam-utils",
]
[[package]]
@@ -213,7 +203,7 @@ checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-epoch",
"crossbeam-utils 0.8.5",
"crossbeam-utils",
]
[[package]]
@@ -223,23 +213,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils 0.8.5",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
@@ -275,7 +254,7 @@ dependencies = [
"byteorder",
"chrono",
"clap",
"crossbeam-channel 0.4.4",
"crossbeam-channel",
"env_logger",
"glob",
"hostname",
@@ -292,6 +271,17 @@ dependencies = [
"zstd",
]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
@@ -565,12 +555,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]]
name = "memchr"
version = "2.4.0"
@@ -792,9 +776,9 @@ version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel 0.5.1",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils 0.8.5",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
@@ -1205,8 +1189,9 @@ dependencies = [
"byteorder",
"chrono",
"clap",
"crossbeam-channel 0.4.4",
"crossbeam-utils 0.8.5",
"crossbeam-channel",
"crossbeam-utils",
"derivative",
"env_logger",
"fastcdc",
"glob",

datman/Cargo.toml

@@ -12,7 +12,7 @@ description = "A chunked and deduplicated backup system using Yama"
[dependencies]
clap = "= 3.0.0-beta.5"
crossbeam-channel = "0.4"
crossbeam-channel = "0.5.1"
anyhow = "1.0"
thiserror = "1.0"
serde = { version = "1.0.104", features = ["derive"] }

yama/Cargo.toml

@@ -20,8 +20,8 @@ twox-hash = "1.5.0"
serde = { version = "1.0.104", features = ["derive"] }
serde_bare = "0.3.0"
users = "0.9.1"
crossbeam-channel = "0.4"
crossbeam-utils = "0.8.1"
crossbeam-channel = "0.5.1"
crossbeam-utils = "0.8.5"
toml = "0.5.5"
glob = "0.3.0"
nix = "0.17.0"
@@ -38,6 +38,7 @@ rayon = "1.5.0"
rusqlite = "0.24.2"
chrono = "0.4.19"
rustyline = "7.1.0"
derivative = "2.2.0"
[dev-dependencies]

yama/src/operations/checking.rs

@@ -18,8 +18,9 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use crate::chunking::RecursiveUnchunker;
use crate::commands::retrieve_tree_node;
use crate::definitions::{ChunkId, TreeNode};
use crate::pile::{Keyspace, Pile, RawPile};
use crate::pile::{ControllerMessage, Keyspace, Pile, RawPile, StoragePipelineSettings};
use anyhow::bail;
use crossbeam_channel::Sender;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::{error, info, warn};
use std::collections::HashSet;
@@ -121,6 +122,15 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.underlying.check_lowlevel()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
self.underlying
.build_storage_pipeline(settings, controller_send)
}
}
/// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time.

yama/src/pile.rs

@@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
use crate::chunking::calculate_chunkid;
use crate::definitions::{ChunkId, PointerData};
use crossbeam_channel::Sender;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Condvar, Mutex};
@@ -75,7 +76,21 @@ pub struct DebugStatistics {
pub total_chunk_size: u64,
}
pub trait RawPile: Send + Sync + Debug {
#[derive(Debug, Clone)]
pub struct StoragePipelineSettings {
pub num_compressors: u32,
pub compressor_input_bound: u32,
pub writer_input_bound: u32,
}
pub enum ControllerMessage {
Failure {
worker_id: Arc<String>,
error_message: String,
},
}
pub trait RawPile: Send + Sync + Debug + 'static {
// TODO expose verification errors?
fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool>;
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
@@ -99,6 +114,12 @@ pub trait RawPile: Send + Sync + Debug {
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
Ok(None)
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>>;
}
impl RawPile for Box<dyn RawPile> {
@@ -129,6 +150,15 @@ impl RawPile for Box<dyn RawPile> {
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
self.as_ref().debug_statistics()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
self.as_ref()
.build_storage_pipeline(settings, controller_send)
}
}
impl<RP: RawPile> RawPile for Arc<RP> {
@@ -159,6 +189,15 @@ impl<RP: RawPile> RawPile for Arc<RP> {
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
self.as_ref().debug_statistics()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
self.as_ref()
.build_storage_pipeline(settings, controller_send)
}
}
#[derive(Debug)]
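
Taken together, these additions define the pipeline's public surface: the outermost RawPile hands the caller a Sender for (ChunkId, Vec<u8>) pairs, and worker failures are reported out-of-band on the controller channel. A minimal sketch of how a caller might drive this API (the store_all helper and the settings values here are hypothetical, not part of this commit):

use anyhow::anyhow;
use crossbeam_channel::unbounded;

use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, RawPile, StoragePipelineSettings};

// Hypothetical driver for the new pipeline API.
fn store_all<RP: RawPile>(pile: &RP, chunks: Vec<(ChunkId, Vec<u8>)>) -> anyhow::Result<()> {
    let settings = StoragePipelineSettings {
        num_compressors: 4, // illustrative values only
        compressor_input_bound: 64,
        writer_input_bound: 64,
    };
    let (controller_send, controller_recv) = unbounded();
    let pipeline = pile.build_storage_pipeline(settings, controller_send)?;

    for pair in chunks {
        pipeline
            .send(pair)
            .or(Err(anyhow!("storage pipeline disconnected")))?;
    }
    // Dropping the sender closes the channel, letting the workers drain and exit.
    drop(pipeline);

    // recv() yields a message if a worker failed, or Err once every worker has
    // exited and dropped its clone of the controller sender.
    if let Ok(ControllerMessage::Failure { worker_id, error_message }) = controller_recv.recv() {
        anyhow::bail!("worker {} failed: {}", worker_id, error_message);
    }
    Ok(())
}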

yama/src/pile/compression.rs

@@ -21,14 +21,16 @@ use std::thread::JoinHandle;
use anyhow::anyhow;
use crossbeam_channel::{Receiver, Sender};
use derivative::Derivative;
use log::error;
use zstd::block::{Compressor, Decompressor};
use crate::pile::{DebugStatistics, Keyspace, RawPile};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
pub const DECOMPRESS_CAPACITY: usize = 32 * 1024 * 1024;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CompressionSettings {
/// Raw dictionary to pass to Zstd for compression and decompression
pub dictionary: Arc<Vec<u8>>,
@@ -40,11 +42,15 @@ pub struct CompressionSettings {
pub num_decompressors: u32,
}
#[derive(Debug)]
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
// we need to use derivative's Clone impl because deriving Clone on a struct containing
// Arc<R> would add an `R: Clone` bound, even though Arc<R> is Clone for any R.
// https://github.com/rust-lang/rust/issues/26925
pub struct RawPileCompressor<R: RawPile> {
underlying: R,
compressor: Sender<(Vec<u8>, Sender<Vec<u8>>)>,
decompressor: Sender<(Vec<u8>, Sender<Vec<u8>>)>,
underlying: Arc<R>,
compressor: Option<Sender<(Vec<u8>, Sender<Vec<u8>>)>>,
decompressor: Option<Sender<(Vec<u8>, Sender<Vec<u8>>)>>,
settings: Arc<CompressionSettings>,
}
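
The bound problem that comment refers to can be seen on a tiny example (illustrative only, not from this commit):

use std::sync::Arc;

// #[derive(Clone)] expands to `impl<R: Clone> Clone for Wrapper<R>`, so this
// Wrapper<R> is only Clone when R: Clone, even though Arc<R> is Clone for
// every R. derivative's Clone(bound = "") generates the impl without the bound.
#[derive(Clone)]
struct Wrapper<R> {
    inner: Arc<R>,
}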
impl<R: RawPile> RawPileCompressor<R> {
@@ -52,6 +58,20 @@ impl<R: RawPile> RawPileCompressor<R> {
underlying: R,
settings: CompressionSettings,
) -> anyhow::Result<(Self, Vec<JoinHandle<()>>)> {
if settings.num_compressors == 0 && settings.num_decompressors == 0 {
// optimisation for when we're only building a pipeline: we don't want to
// spawn compressor/decompressor threads that would never be used.
return Ok((
RawPileCompressor {
underlying: Arc::new(underlying),
compressor: None,
decompressor: None,
settings: Arc::new(settings),
},
Vec::with_capacity(0),
));
}
let (com_s, com_r) = crossbeam_channel::bounded(4);
let (dec_s, dec_r) = crossbeam_channel::bounded(4);
@@ -85,9 +105,10 @@ impl<R: RawPile> RawPileCompressor<R> {
Ok((
RawPileCompressor {
underlying,
compressor: com_s,
decompressor: dec_s,
underlying: Arc::new(underlying),
compressor: Some(com_s),
decompressor: Some(dec_s),
settings: Arc::new(settings),
},
handles,
))
@@ -124,6 +145,8 @@ impl<R: RawPile> RawPileCompressor<R> {
fn decompress(&self, data: &[u8]) -> anyhow::Result<Vec<u8>> {
let (ret_s, ret_r) = crossbeam_channel::bounded(0);
self.decompressor
.as_ref()
.expect("No decompressors configured")
.send((data.to_vec(), ret_s))
.or(Err(anyhow!("couldn't send to decompressor")))?;
@@ -133,11 +156,23 @@ impl<R: RawPile> RawPileCompressor<R> {
fn compress(&self, compressed_data: &[u8]) -> anyhow::Result<Vec<u8>> {
let (ret_s, ret_r) = crossbeam_channel::bounded(0);
self.compressor
.as_ref()
.expect("No compressors configured")
.send((compressed_data.to_vec(), ret_s))
.or(Err(anyhow!("couldn't send to compressor")))?;
Ok(ret_r.recv().or(Err(anyhow!("couldn't receive result")))?)
}
fn storage_pipeline_worker(
&self,
next_stage: Sender<(ChunkId, Vec<u8>)>,
input: Receiver<(ChunkId, Vec<u8>)>,
controller_send: &Sender<ControllerMessage>,
) -> anyhow::Result<()> {
todo!()
}
}
impl<R: RawPile> RawPile for RawPileCompressor<R> {
@@ -177,4 +212,40 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
self.underlying.debug_statistics()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
// this one should have a few threads behind it! yarr!
let subsequent_pipeline = self
.underlying
.build_storage_pipeline(settings.clone(), controller_send.clone())?;
let (input_to_this_stage, receiver) =
crossbeam_channel::bounded(settings.compressor_input_bound as usize);
for compressor_number in 0..settings.num_compressors {
let subsequent_pipeline = subsequent_pipeline.clone();
let receiver = receiver.clone();
let controller_send = controller_send.clone();
let this = (*self).clone();
thread::spawn(move || {
let worker_id = Arc::new(format!("compressor-{}", compressor_number));
if let Err(err) =
this.storage_pipeline_worker(subsequent_pipeline, receiver, &controller_send)
{
controller_send
.send(ControllerMessage::Failure {
worker_id,
error_message: format!("err {:?}", err),
})
.expect("This is BAD: failed to send failure message to controller.");
}
});
}
Ok(input_to_this_stage)
}
}
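
The storage_pipeline_worker above is left as a todo!() stub. One plausible body, sketched under two assumptions: each worker owns one of the zstd block-level Compressors already imported in this file, and CompressionSettings carries a `level` field (its field list is elided in this diff). The controller sender goes unused here because the spawning code in build_storage_pipeline already reports failures:

fn storage_pipeline_worker(
    &self,
    next_stage: Sender<(ChunkId, Vec<u8>)>,
    input: Receiver<(ChunkId, Vec<u8>)>,
    _controller_send: &Sender<ControllerMessage>,
) -> anyhow::Result<()> {
    // One zstd Compressor per worker thread, seeded with the shared dictionary.
    let mut compressor = Compressor::with_dict(self.settings.dictionary.to_vec());
    while let Ok((chunk_id, bytes)) = input.recv() {
        // `level` is an assumed field on CompressionSettings.
        let compressed = compressor.compress(&bytes, self.settings.level)?;
        next_stage
            .send((chunk_id, compressed))
            .or(Err(anyhow!("next pipeline stage disconnected")))?;
    }
    // Input channel closed: every chunk has been compressed and forwarded.
    Ok(())
}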

yama/src/pile/encryption.rs

@@ -20,7 +20,9 @@ use log::warn;
use sodiumoxide::crypto::secretbox;
use sodiumoxide::crypto::secretbox::{Key, Nonce, NONCEBYTES};
use crate::pile::{Keyspace, RawPile};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crossbeam_channel::Sender;
/// A RawPile that provides encryption of chunk contents.
/// Please note that keys are not currently encrypted, so this scheme is not CPA-secure.
@@ -109,4 +111,12 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.underlying.check_lowlevel()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
todo!()
}
}

yama/src/pile/integrity.rs

@@ -19,9 +19,10 @@ use std::hash::Hasher;
use thiserror::Error;
use crate::definitions::XXH64_SEED;
use crate::pile::{DebugStatistics, Keyspace, RawPile};
use crate::definitions::{ChunkId, XXH64_SEED};
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
use crate::utils::bytes_to_hexstring;
use crossbeam_channel::Sender;
/// This RawPile enables checking the integrity of stored chunks.
/// This is done by storing a hash along with the chunk contents, which can later be verified.
@@ -113,4 +114,12 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
self.underlying.debug_statistics()
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
todo!()
}
}

yama/src/pile/local_sqlitebloblogs.rs

@@ -18,11 +18,11 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::fs::{read_dir, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::{fs, thread};
use anyhow::{bail, Context};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
@@ -32,9 +32,11 @@ use rusqlite::{params, Error};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
use crate::pile::{DebugStatistics, Keyspace, RawPile};
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
use crate::utils::bytes_to_hexstring;
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
use std::time::Duration;
/// Bloblogs will not be reused if they are already 2 GiB large.
pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
@@ -193,11 +195,11 @@ impl Inner {
/// Because random access is important for performance, an additional SQLite database is used
/// as a map from chunk IDs to their positions in the blob logs, allowing readers to seek to the
/// appropriate place and read a chunk randomly.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SqliteBloblogPile {
inner: Arc<Mutex<Inner>>,
path: PathBuf,
writers_reach_zero: Condvar,
writers_reach_zero: Arc<Condvar>,
should_batch_pointer_writes: bool,
}
@@ -367,6 +369,79 @@ impl SqliteBloblogPile {
let mut inner = self.inner.lock().unwrap();
inner.flush()
}
fn storage_pipeline_worker(
&self,
incoming: Receiver<(ChunkId, Vec<u8>)>,
controller_sender: &Sender<ControllerMessage>,
) -> anyhow::Result<()> {
// can hold on to the same bloblog as long as we'd like!
const POINTERS_BUFFER_SIZE: usize = 256;
let mut pointers_buffered = Vec::with_capacity(POINTERS_BUFFER_SIZE);
fn flush_pointers(
this: &SqliteBloblogPile,
pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
) -> anyhow::Result<()> {
todo!()
}
fn write_blob(
this: &SqliteBloblogPile,
bloblog_id: BloblogId,
bloblog: &mut Bloblog,
pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
(chunk_id, chunk): (ChunkId, Vec<u8>),
) -> anyhow::Result<()> {
let offset = bloblog.write_blob(&chunk_id, &chunk)?;
let pointer = BloblogPointer {
bloblog: bloblog_id,
offset,
};
pointers_buffered.push((chunk_id, pointer));
if pointers_buffered.len() >= POINTERS_BUFFER_SIZE {
flush_pointers(this, pointers_buffered)?;
}
Ok(())
}
while let Ok(chunk) = incoming.recv() {
let (bloblog_id, bloblog_mutex) = self.get_writing_bloblog()?;
let mut bloblog = bloblog_mutex.lock().expect("Failed to lock bloblog?");
write_blob(
self,
bloblog_id,
&mut bloblog,
&mut pointers_buffered,
chunk,
)?;
while let Ok(chunk) = incoming.recv_timeout(Duration::from_secs(5)) {
write_blob(
self,
bloblog_id,
&mut bloblog,
&mut pointers_buffered,
chunk,
)?;
if bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE {
// get a new bloblog to write with.
break;
}
}
drop(bloblog);
self.return_writing_bloblog(bloblog_id, bloblog_mutex)?;
}
flush_pointers(self, &mut pointers_buffered)?;
// we MUST have flushed ALL the pointers by now.
assert!(pointers_buffered.is_empty());
Ok(())
}
}
impl Drop for SqliteBloblogPile {
@@ -594,6 +669,29 @@ impl RawPile for SqliteBloblogPile {
total_chunk_size,
}))
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
let (sender, incoming) = crossbeam_channel::bounded(settings.writer_input_bound as usize);
let this = self.clone();
thread::spawn(move || {
let worker_id = Arc::new("bloblogwriter".to_string());
if let Err(err) = this.storage_pipeline_worker(incoming, &controller_send) {
controller_send
.send(ControllerMessage::Failure {
worker_id,
error_message: format!("err {:?}", err),
})
.expect("This is BAD: failed to send failure message to controller.");
}
});
Ok(sender)
}
}
struct KeyIterator {
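
The flush_pointers helper above is still a todo!() stub. A sketch of one way to write it, assuming Inner exposes its rusqlite Connection as `connection`, that the chunk index lives in a chunks(chunk_id, bloblog, offset) table, and that BloblogId and the offset convert to SQL integers (all assumptions; the schema is not shown in this diff):

fn flush_pointers(
    this: &SqliteBloblogPile,
    pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
) -> anyhow::Result<()> {
    // Batch all buffered pointers into one SQLite transaction rather than
    // paying for an implicit transaction per chunk.
    let mut inner = this.inner.lock().unwrap();
    let txn = inner.connection.transaction()?;
    for (chunk_id, pointer) in pointers_buffered.drain(..) {
        txn.execute(
            "INSERT OR REPLACE INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
            params![&chunk_id[..], pointer.bloblog, pointer.offset as i64],
        )?;
    }
    txn.commit()?;
    // drain(..) leaves the buffer empty, which satisfies the caller's assert.
    Ok(())
}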

yama/src/remote/requester.rs

@@ -8,7 +8,8 @@ use anyhow::anyhow;
use crossbeam_channel::{Receiver, Sender};
use log::{error, info};
use crate::pile::{Keyspace, RawPile};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
@@ -303,6 +304,15 @@ impl RawPile for Requester {
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for LowLevelCheck.")),
}
}
fn build_storage_pipeline(
&self,
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
// this one is a little bit more complex.
todo!()
}
}
pub struct ListKeyIterator {
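
Until the real implementation lands, the remote stage could be sketched by funnelling pipeline items through the existing write() path on a single worker thread. This assumes Requester is (or can be made) Clone and that chunk data belongs in Keyspace::Chunk; both are assumptions, not facts established by this commit:

fn build_storage_pipeline(
    &self,
    settings: StoragePipelineSettings,
    controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
    let (sender, incoming) =
        crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(settings.writer_input_bound as usize);
    let this = self.clone(); // assumes a Clone impl for Requester
    std::thread::spawn(move || {
        let worker_id = std::sync::Arc::new("requester-pipeline".to_string());
        while let Ok((chunk_id, chunk)) = incoming.recv() {
            // Forward each chunk through the existing (slower) write() path.
            if let Err(err) = this.write(Keyspace::Chunk, &chunk_id, &chunk) {
                controller_send
                    .send(ControllerMessage::Failure {
                        worker_id: worker_id.clone(),
                        error_message: format!("err {:?}", err),
                    })
                    .expect("failed to send failure message to controller");
                break;
            }
        }
    });
    Ok(sender)
}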