Switch over from Pile to ChunkSubmissionTarget

Olivier 'reivilibre' 2021-11-20 11:20:34 +00:00
parent 5442dc582b
commit c73ac35df1
4 changed files with 14 additions and 15 deletions
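
Context for the rename: the calls in the diff below (`self.target.submit(chunk_id, chunk_data)?`) and the `crossbeam_channel::Sender` impl visible in a hunk header suggest that `ChunkSubmissionTarget` is roughly a one-method trait that anything capable of accepting chunks can implement. A minimal sketch of that shape; the `ChunkId` alias and the use of `anyhow` for the error type are assumptions for illustration, not the crate's actual definitions:

// Sketch only; the real definitions live in yama's source.
// The ChunkId alias and the anyhow error type are assumptions.
pub type ChunkId = [u8; 32];

pub trait ChunkSubmissionTarget {
    /// Submit one chunk (ID plus raw bytes) for storage.
    fn submit(&self, chunk_id: ChunkId, data: &[u8]) -> anyhow::Result<()>;
}

// Mirrors the impl named in the hunk context below: a crossbeam sender forwards
// an owned copy of each chunk to whatever thread drains the channel.
impl ChunkSubmissionTarget for crossbeam_channel::Sender<(ChunkId, Vec<u8>)> {
    fn submit(&self, chunk_id: ChunkId, data: &[u8]) -> anyhow::Result<()> {
        self.send((chunk_id, data.to_vec()))
            .map_err(|_| anyhow::anyhow!("chunk receiver disconnected"))?;
        Ok(())
    }
}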

View File

@@ -231,8 +231,7 @@ pub fn backup_source_to_destination<PT: ProgressTracker>(
}
info!("Will write as pointer {:?}.", pointer_name);
-let mut chunker =
-    yama::chunking::RecursiveChunker::new_from_pile(SENSIBLE_THRESHOLD, &pile);
+let mut chunker = yama::chunking::RecursiveChunker::new(SENSIBLE_THRESHOLD, &pile);
let mut process = open_stdout_backup_process(extra_args, helper)?;

View File

@@ -57,22 +57,22 @@ impl ChunkSubmissionTarget for crossbeam_channel::Sender<(ChunkId, Vec<u8>)> {
/// The root RecursiveChunker is fed data bytes.
/// If it exceeds the nominated threshold, it grows a child RecursiveChunker (which may do the same).
/// When done, `finish` should be called to flush the buffers and obtain a `RecursiveChunkRef`.
-pub struct RecursiveChunker<'pile, RP: RawPile> {
+pub struct RecursiveChunker<'cst, CST: ChunkSubmissionTarget> {
/// The pile to submit chunks to.
-pile: &'pile Pile<RP>,
+target: &'cst CST,
/// Buffer of data at this level.
buffer: Vec<u8>,
/// The next-layer recursive chunker. This is where this chunker will submit chunk IDs to for
/// recursive chunking.
-next_layer: Option<Box<RecursiveChunker<'pile, RP>>>,
+next_layer: Option<Box<RecursiveChunker<'cst, CST>>>,
/// The size at which this chunker will perform recursive chunking.
threshold: usize,
}
-impl<'pile, RP: RawPile> RecursiveChunker<'pile, RP> {
-pub fn new_from_pile(threshold: usize, pile: &'pile Pile<RP>) -> Self {
+impl<'cst, CST: ChunkSubmissionTarget> RecursiveChunker<'cst, CST> {
+pub fn new(threshold: usize, target: &'cst CST) -> Self {
RecursiveChunker {
-pile,
+target,
buffer: vec![],
next_layer: None,
threshold,
@@ -93,7 +93,7 @@ impl<'pile, RP: RawPile> RecursiveChunker<'pile, RP> {
let chunk_data = &self.buffer[chunk.offset..chunk.offset + chunk.length];
let chunk_id = calculate_chunkid(chunk_data);
new_chunks.extend_from_slice(&chunk_id);
-self.pile.submit_chunk(chunk_id, chunk_data)?;
+self.target.submit(chunk_id, chunk_data)?;
}
}
@@ -119,7 +119,7 @@ impl<'pile, RP: RawPile> RecursiveChunker<'pile, RP> {
} else {
// no chunking, so depth=0 (raw) and just emit our unchunked data
let chunk_id = calculate_chunkid(&self.buffer);
-self.pile.submit_chunk(chunk_id, &self.buffer)?;
+self.target.submit(chunk_id, &self.buffer)?;
Ok(RecursiveChunkRef { chunk_id, depth: 0 })
}
}
@@ -130,9 +130,9 @@ impl<'pile, RP: RawPile> RecursiveChunker<'pile, RP> {
if self.buffer.len() > self.threshold {
if self.next_layer.is_none() {
// start chunking
-self.next_layer = Some(Box::new(RecursiveChunker::new_from_pile(
+self.next_layer = Some(Box::new(RecursiveChunker::new(
self.threshold,
-self.pile.clone(),
+self.target.clone(),
)));
}
@@ -145,7 +145,7 @@ impl<'pile, RP: RawPile> RecursiveChunker<'pile, RP> {
}
}
-impl<'pile, RP: RawPile> Write for RecursiveChunker<'pile, RP> {
+impl<'cst, CST: ChunkSubmissionTarget> Write for RecursiveChunker<'cst, CST> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.write_impl(buf) {
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),

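The `Write` impl above is what lets call sites stream data straight into the chunker; the hunks below do exactly that with `io::copy` followed by `finish()`. A rough usage sketch against the new constructor; the channel wiring, the helper name, and the function signature are hypothetical, not taken from the commit:

use std::fs::File;
use std::io;

// Sketch (not from the commit): stream a file through the chunker into a
// channel-backed ChunkSubmissionTarget, mirroring the call sites below.
// RecursiveChunker, RecursiveChunkRef, ChunkId and SENSIBLE_THRESHOLD are
// assumed to be in scope from yama.
fn chunk_file_to_channel(path: &str) -> anyhow::Result<RecursiveChunkRef> {
    // A real caller would hand `_rx` to a worker thread that drains chunks into a pile.
    let (tx, _rx) = crossbeam_channel::unbounded::<(ChunkId, Vec<u8>)>();

    let mut chunker = RecursiveChunker::new(SENSIBLE_THRESHOLD, &tx);
    let mut file = File::open(path)?;
    io::copy(&mut file, &mut chunker)?; // the Write impl feeds the recursive chunker
    Ok(chunker.finish()?)               // flush buffers, obtain the RecursiveChunkRef
}
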
View File

@@ -112,7 +112,7 @@ pub fn store_tree_node<RP: RawPile>(
root_tree_node: &RootTreeNode,
) -> anyhow::Result<RecursiveChunkRef> {
let serialised = serde_bare::to_vec(root_tree_node)?;
-let mut chunker = RecursiveChunker::new_from_pile(SENSIBLE_THRESHOLD, pile);
+let mut chunker = RecursiveChunker::new(SENSIBLE_THRESHOLD, pile);
io::copy(&mut (&serialised[..]), &mut chunker)?;
let chunk_ref = chunker.finish()?;
Ok(chunk_ref)

View File

@@ -93,7 +93,7 @@ pub fn store_worker<RP: RawPile>(
let full_path = root.join(&path);
match File::open(&full_path) {
Ok(mut file) => {
-let mut chunker = RecursiveChunker::new_from_pile(SENSIBLE_THRESHOLD, &pile);
+let mut chunker = RecursiveChunker::new(SENSIBLE_THRESHOLD, pile);
// streaming copy from file to chunker, really cool :)
io::copy(&mut file, &mut chunker)?;
let chunk_ref = chunker.finish()?;