diff --git a/datman/src/commands/backup.rs b/datman/src/commands/backup.rs
index 4e8058c..95a4655 100644
--- a/datman/src/commands/backup.rs
+++ b/datman/src/commands/backup.rs
@@ -28,6 +28,7 @@ use std::fmt::Debug;
 use std::io::Write;
 use std::path::Path;
 use std::process::{Child, Command, Stdio};
+use std::sync::Arc;
 use yama::chunking::SENSIBLE_THRESHOLD;
 use yama::commands::{load_pile_descriptor, open_pile, store_tree_node};
 use yama::definitions::{
@@ -194,7 +195,7 @@ pub fn backup_source_to_destination(
     info!("Storing using yama.");
 
     yama::operations::storing::store_fully(
-        &pile,
+        Arc::new(pile),
         &absolute_source_path,
         &pointer_name,
         root,
diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs
index bb7ed7f..65cf394 100644
--- a/datman/src/remote/backup_source_responder.rs
+++ b/datman/src/remote/backup_source_responder.rs
@@ -7,6 +7,7 @@ use crossbeam_channel::Sender;
 use log::info;
 use std::io::{stdin, stdout, Read, Write};
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Instant;
 use yama::definitions::TreeNode;
 use yama::pile::{Pile, RawPile};
@@ -63,7 +64,7 @@ pub fn chunking(
     let progress_bar = &mut ();
 
     yama::operations::storing::store_fully(
-        &pile,
+        Arc::new(pile),
         &path,
         &pointer_name,
         tree_node,
@@ -161,7 +162,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
     let pile = Pile::new(raw_pile);
 
     yama::operations::storing::store_fully(
-        &pile,
+        Arc::new(pile),
         &path,
         &pointer_name,
         tree_node,
diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs
index f536938..aa8e78f 100644
--- a/yama/src/operations/storing.rs
+++ b/yama/src/operations/storing.rs
@@ -30,10 +30,12 @@ use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOLD};
 use crate::commands;
 use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
 use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode};
-use crate::pile::{Pile, RawPile, StoragePipelineSettings};
+use crate::pile::{existence_checker_stage, Pile, RawPile, StoragePipelineSettings};
 use crate::progress::ProgressTracker;
 use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
+use crate::utils::get_number_of_workers;
 use std::collections::BTreeMap;
+use std::sync::Arc;
 
 pub fn store(
     root_path: &Path,
@@ -231,7 +233,7 @@ pub fn manager(
 /// loaded and fully-integrated).
 /// This also creates a pointer (which is why this is called `store_fully`).
 pub fn store_fully(
-    pile: &Pile<Box<dyn RawPile>>,
+    pile: Arc<Pile<Box<dyn RawPile>>>,
     root_dir: &PathBuf,
     new_pointer_name: &String,
     mut root_node: TreeNode,
@@ -256,12 +258,14 @@ pub fn store_fully(
     if use_pipelined_storage {
         // TODO make these configurable
         let sps = StoragePipelineSettings {
-            num_compressors: num_cpus::get() as u32,
+            num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
             compressor_input_bound: 64,
             writer_input_bound: 64,
         };
         let (control_tx, control_rx) = crossbeam_channel::unbounded();
+        let pile2 = pile.clone();
         let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?;
+        let pipeline = existence_checker_stage(pile2, pipeline);
         store(
             &root_dir,
             &mut root_node,
@@ -270,7 +274,13 @@ pub fn store_fully(
             num_workers,
         )?;
     } else {
-        store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?;
+        store(
+            &root_dir,
+            &mut root_node,
+            pile.as_ref(),
+            progress_bar,
+            num_workers,
+        )?;
     }
 
     let mut uid_lookup = BTreeMap::new();
diff --git a/yama/src/pile.rs b/yama/src/pile.rs
index 0f99fd9..a2e2fb4 100644
--- a/yama/src/pile.rs
+++ b/yama/src/pile.rs
@@ -20,6 +20,7 @@ use std::path::PathBuf;
 use serde::{Deserialize, Serialize};
 
 use crate::definitions::{ChunkId, PointerData};
+use crate::utils::get_number_of_workers;
 use crossbeam_channel::Sender;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -82,6 +83,30 @@ pub struct StoragePipelineSettings {
     pub writer_input_bound: u32,
 }
 
+pub fn existence_checker_stage<RP: RawPile>(
+    pile: Arc<Pile<RP>>,
+    next_stage: Sender<(ChunkId, Vec<u8>)>,
+) -> Sender<(ChunkId, Vec<u8>)> {
+    let (tx, rx) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(32);
+
+    // TODO would like something better for the networked case
+    for _ in 0..get_number_of_workers("YAMA_EXISTENCE_CHECKERS") {
+        let next_stage = next_stage.clone();
+        let rx = rx.clone();
+        let pile = pile.clone();
+        std::thread::spawn(move || {
+            while let Ok((chunk_id, chunk)) = rx.recv() {
+                // TODO handle errors properly
+                if !pile.chunk_exists(&chunk_id).unwrap() {
+                    next_stage.send((chunk_id, chunk)).unwrap();
+                }
+            }
+        });
+    }
+
+    tx
+}
+
 pub enum ControllerMessage {
     Failure {
         worker_id: Arc<String>,
diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs
index 748d65b..3786029 100644
--- a/yama/src/remote/responder.rs
+++ b/yama/src/remote/responder.rs
@@ -198,7 +198,9 @@ impl Responder {
             {
                 let mut chunk_id = ChunkId::default();
                 chunk_id.copy_from_slice(&key[..]);
-                writing_pipeline.pipeline_submission.send((chunk_id, value));
+                writing_pipeline
+                    .pipeline_submission
+                    .send((chunk_id, value))?;
                 // We lie and say it was successful once we submit.
                 // We'll complain on our side if anything goes wrong, anyway.
                 Response {
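
Note (not part of the diff): the new call sites rely on `get_number_of_workers` from `crate::utils`, whose definition is not shown above. Below is a minimal sketch of what such a helper might look like, assuming it treats the named environment variable as a per-stage override and falls back to the logical CPU count via the `num_cpus` crate (which the replaced `num_cpus::get()` call already depended on); the real signature and fallback behaviour in `yama::utils` may differ.

```rust
use std::env;

/// Sketch only: worker count for a pipeline stage. An environment
/// variable (e.g. YAMA_PL_COMPRESSORS or YAMA_EXISTENCE_CHECKERS)
/// overrides the default, which is the number of logical CPUs.
pub fn get_number_of_workers(env_var_name: &str) -> usize {
    env::var(env_var_name)
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or_else(num_cpus::get)
}
```

Composed this way, `existence_checker_stage(pile2, pipeline)` wraps the raw pile's storage pipeline: callers submit `(ChunkId, Vec<u8>)` pairs to the returned `Sender`, and the checker threads forward only chunks not already present in the pile, so duplicate chunks are filtered out before any compressor spends work on them.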