Allow using pipelined storage for store_fully

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 12:02:49 +00:00
parent 752b07e0b1
commit aaf2ea1493
3 changed files with 24 additions and 2 deletions

View File

@ -201,6 +201,7 @@ pub fn backup_source_to_destination<PT: ProgressTracker>(
parent,
num_workers,
progress_bar,
false,
)?;
info!("Stored!");

View File

@ -70,6 +70,7 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
parent,
get_number_of_workers("YAMA_CHUNKERS"),
progress_bar,
false,
)?;
for join_handle in requester_join_handles {
@ -167,6 +168,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
parent,
get_number_of_workers("YAMA_CHUNKERS"),
&mut progress_bar,
false,
)?;
requester_join_handles

View File

@ -30,7 +30,7 @@ use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOL
use crate::commands;
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode};
use crate::pile::{Pile, RawPile};
use crate::pile::{Pile, RawPile, StoragePipelineSettings};
use crate::progress::ProgressTracker;
use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
use std::collections::BTreeMap;
@ -238,6 +238,7 @@ pub fn store_fully<PT: ProgressTracker>(
parent: Option<String>,
num_workers: u8,
progress_bar: &mut PT,
use_pipelined_storage: bool,
) -> anyhow::Result<()> {
if let Some(parent) = parent.as_ref() {
let mut parent_pointer = pile.read_pointer(parent)?.ok_or_else(|| {
@ -252,7 +253,25 @@ pub fn store_fully<PT: ProgressTracker>(
differentiate_node_in_place(&mut root_node, &parent_node.node)?;
}
store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?;
if use_pipelined_storage {
// TODO make these configurable
let sps = StoragePipelineSettings {
num_compressors: num_cpus::get() as u32,
compressor_input_bound: 64,
writer_input_bound: 64,
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?;
store(
&root_dir,
&mut root_node,
&pipeline,
progress_bar,
num_workers,
)?;
} else {
store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?;
}
let mut uid_lookup = BTreeMap::new();
let mut gid_lookup = BTreeMap::new();