From 23b352f93601b8e7be2e327825c9efd2821e5c07 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 28 May 2022 23:09:34 +0100 Subject: [PATCH] Remove non-pipelined storage --- datman/src/commands/backup.rs | 1 - datman/src/remote/backup_source_responder.rs | 2 - yama/src/operations/storing.rs | 56 ++++++++------------ 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/datman/src/commands/backup.rs b/datman/src/commands/backup.rs index caca8d0..616a606 100644 --- a/datman/src/commands/backup.rs +++ b/datman/src/commands/backup.rs @@ -202,7 +202,6 @@ pub fn backup_source_to_destination( parent, num_workers, progress_bar, - true, )?; info!("Stored!"); diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs index dd84ad4..23b8a90 100644 --- a/datman/src/remote/backup_source_responder.rs +++ b/datman/src/remote/backup_source_responder.rs @@ -71,7 +71,6 @@ pub fn chunking( parent, get_number_of_workers("YAMA_CHUNKERS"), progress_bar, - true, )?; for join_handle in requester_join_handles { @@ -169,7 +168,6 @@ pub fn chunking_stdio() -> anyhow::Result<()> { parent, get_number_of_workers("YAMA_CHUNKERS"), &mut progress_bar, - true, )?; requester_join_handles diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs index 9a83137..48fbf8e 100644 --- a/yama/src/operations/storing.rs +++ b/yama/src/operations/storing.rs @@ -254,41 +254,31 @@ pub fn store_fully( fully_integrate_pointer_node(&pile, &mut parent_node.node, &mut parent_pointer)?; differentiate_node_in_place(&mut root_node, &parent_node.node)?; } + + // TODO make these configurable + let sps = StoragePipelineSettings { + num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, + compressor_input_bound: 64, + writer_input_bound: 64, + }; + let (control_tx, control_rx) = crossbeam_channel::unbounded(); + let pile2 = pile.clone(); + let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?; - if use_pipelined_storage { - // TODO make these configurable - let sps = StoragePipelineSettings { - num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, - compressor_input_bound: 64, - writer_input_bound: 64, - }; - let (control_tx, control_rx) = crossbeam_channel::unbounded(); - let pile2 = pile.clone(); - let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?; + // TODO(newver) The existence checker stage should be able to be swapped between different implementations. + let pipeline = existence_checker_stage(pile2, pipeline); - // TODO(newver) The existence checker stage should be able to be swapped between different implementations. - let pipeline = existence_checker_stage(pile2, pipeline); - - store( - &root_dir, - &mut root_node, - &pipeline, - progress_bar, - num_workers, - )?; - // must drop the pipeline to allow the threads to close - drop(pipeline); - while let Ok(_) = control_rx.recv() { - // TODO nothing for now. - } - } else { - store( - &root_dir, - &mut root_node, - pile.as_ref(), - progress_bar, - num_workers, - )?; + store( + &root_dir, + &mut root_node, + &pipeline, + progress_bar, + num_workers, + )?; + // must drop the pipeline to allow the threads to close + drop(pipeline); + while let Ok(_) = control_rx.recv() { + // TODO nothing for now. } let mut uid_lookup = BTreeMap::new();