Remove non-pipelined storage

This commit is contained in:
Olivier 'reivilibre' 2022-05-28 23:09:34 +01:00
parent d82176075a
commit 23b352f936
3 changed files with 23 additions and 36 deletions

View File

@ -202,7 +202,6 @@ pub fn backup_source_to_destination<PT: ProgressTracker>(
parent,
num_workers,
progress_bar,
true,
)?;
info!("Stored!");

View File

@ -71,7 +71,6 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
parent,
get_number_of_workers("YAMA_CHUNKERS"),
progress_bar,
true,
)?;
for join_handle in requester_join_handles {
@ -169,7 +168,6 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
parent,
get_number_of_workers("YAMA_CHUNKERS"),
&mut progress_bar,
true,
)?;
requester_join_handles

View File

@ -254,41 +254,31 @@ pub fn store_fully<PT: ProgressTracker>(
fully_integrate_pointer_node(&pile, &mut parent_node.node, &mut parent_pointer)?;
differentiate_node_in_place(&mut root_node, &parent_node.node)?;
}
// TODO make these configurable
let sps = StoragePipelineSettings {
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
compressor_input_bound: 64,
writer_input_bound: 64,
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pile2 = pile.clone();
let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?;
if use_pipelined_storage {
// TODO make these configurable
let sps = StoragePipelineSettings {
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
compressor_input_bound: 64,
writer_input_bound: 64,
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pile2 = pile.clone();
let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?;
// TODO(newver) The existence checker stage should be able to be swapped between different implementations.
let pipeline = existence_checker_stage(pile2, pipeline);
// TODO(newver) The existence checker stage should be able to be swapped between different implementations.
let pipeline = existence_checker_stage(pile2, pipeline);
store(
&root_dir,
&mut root_node,
&pipeline,
progress_bar,
num_workers,
)?;
// must drop the pipeline to allow the threads to close
drop(pipeline);
while let Ok(_) = control_rx.recv() {
// TODO nothing for now.
}
} else {
store(
&root_dir,
&mut root_node,
pile.as_ref(),
progress_bar,
num_workers,
)?;
store(
&root_dir,
&mut root_node,
&pipeline,
progress_bar,
num_workers,
)?;
// must drop the pipeline to allow the threads to close
drop(pipeline);
while let Ok(_) = control_rx.recv() {
// TODO nothing for now.
}
let mut uid_lookup = BTreeMap::new();