From aaf2ea1493de8fbb5b441ed429203c852b555225 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 20 Nov 2021 12:02:49 +0000 Subject: [PATCH] Allow using pipelined storage for store_fully --- datman/src/commands/backup.rs | 1 + datman/src/remote/backup_source_responder.rs | 2 ++ yama/src/operations/storing.rs | 23 ++++++++++++++++++-- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/datman/src/commands/backup.rs b/datman/src/commands/backup.rs index 6407717..4e8058c 100644 --- a/datman/src/commands/backup.rs +++ b/datman/src/commands/backup.rs @@ -201,6 +201,7 @@ pub fn backup_source_to_destination( parent, num_workers, progress_bar, + false, )?; info!("Stored!"); diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs index f9ca2cc..bb7ed7f 100644 --- a/datman/src/remote/backup_source_responder.rs +++ b/datman/src/remote/backup_source_responder.rs @@ -70,6 +70,7 @@ pub fn chunking( parent, get_number_of_workers("YAMA_CHUNKERS"), progress_bar, + false, )?; for join_handle in requester_join_handles { @@ -167,6 +168,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> { parent, get_number_of_workers("YAMA_CHUNKERS"), &mut progress_bar, + false, )?; requester_join_handles diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs index d784fea..f536938 100644 --- a/yama/src/operations/storing.rs +++ b/yama/src/operations/storing.rs @@ -30,7 +30,7 @@ use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOL use crate::commands; use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node}; use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode}; -use crate::pile::{Pile, RawPile}; +use crate::pile::{Pile, RawPile, StoragePipelineSettings}; use crate::progress::ProgressTracker; use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place}; use std::collections::BTreeMap; @@ -238,6 +238,7 @@ pub fn store_fully( parent: Option, num_workers: u8, progress_bar: &mut PT, + use_pipelined_storage: bool, ) -> anyhow::Result<()> { if let Some(parent) = parent.as_ref() { let mut parent_pointer = pile.read_pointer(parent)?.ok_or_else(|| { @@ -252,7 +253,25 @@ pub fn store_fully( differentiate_node_in_place(&mut root_node, &parent_node.node)?; } - store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?; + if use_pipelined_storage { + // TODO make these configurable + let sps = StoragePipelineSettings { + num_compressors: num_cpus::get() as u32, + compressor_input_bound: 64, + writer_input_bound: 64, + }; + let (control_tx, control_rx) = crossbeam_channel::unbounded(); + let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?; + store( + &root_dir, + &mut root_node, + &pipeline, + progress_bar, + num_workers, + )?; + } else { + store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?; + } let mut uid_lookup = BTreeMap::new(); let mut gid_lookup = BTreeMap::new();