diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index dda2786..5c7dc7c 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -9,7 +9,8 @@ use std::path::Path; use std::process::{Child, Command, Stdio}; use std::sync::Arc; use yama::commands::{load_pile_descriptor, open_pile}; -use yama::definitions::TreeNode; +use yama::definitions::{PartialPointerData, TreeNode}; +use yama::operations::storing::{pointer_ops_prepare_to_store, pointers_ops_after_store}; use yama::pile::{Pile, RawPile, StoragePipelineSettings}; use yama::progress::ProgressTracker; use yama::remote::responder::{Responder, ResponderWritingPipeline}; @@ -64,19 +65,15 @@ pub fn chunking< read: R, mut write: W, path: &Path, - pointer_name: String, tree_node: &TreeNode, raw_pile: Arc, - parent: Option, progress_bar: PT, use_writing_pipeline: bool, -) -> anyhow::Result<(R, W)> { +) -> anyhow::Result<(R, W, PartialPointerData)> { info!("Chunking."); write_message(&mut write, &"chunk")?; write_message(&mut write, &path)?; - write_message(&mut write, &pointer_name)?; write_message(&mut write, tree_node)?; - write_message(&mut write, &parent)?; write.flush()?; let (writing_pipeline, control_rx) = if use_writing_pipeline { @@ -111,7 +108,7 @@ pub fn chunking< for handle in join_handles { handle.join().expect("Join handle should not fail"); } - let read = r_handle.join().unwrap(); + let mut read = r_handle.join().unwrap(); let write = w_handle.join().unwrap(); if let Some(control_rx) = control_rx { @@ -122,7 +119,9 @@ pub fn chunking< info!("Remote finished chunking."); - Ok((read, write)) + let pointer_data: PartialPointerData = read_message(&mut read)?; + + Ok((read, write, pointer_data)) } pub fn quit(read: &mut R, write: &mut W) -> anyhow::Result<()> { @@ -199,7 +198,7 @@ pub fn backup_remote_source_to_destination let scan_result = scanning(&mut read, &mut write, directory.as_ref())? .ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?; - let root = + let mut root = label_filter_and_convert(scan_result, descriptor, desc_path, source_name, dest)? .ok_or_else(|| anyhow!("Empty filter..."))?; @@ -248,19 +247,27 @@ pub fn backup_remote_source_to_destination let raw_pile = Arc::new(pile.raw_pile); let pile = Pile::new(raw_pile.clone()); - let (mut read, mut write) = chunking( + pointer_ops_prepare_to_store(&pile, &mut root, &parent)?; + + info!( + "Have pointer_name = {:?}, parent = {:?}", + pointer_name, parent + ); + + let (mut read, mut write, pointer_data) = chunking( read, write, directory.as_ref(), - pointer_name.clone(), &root, raw_pile, - parent, progress_bar, true, )?; quit(&mut read, &mut write)?; + + pointers_ops_after_store(&pile, &pointer_name, &pointer_data.complete(parent))?; + pile.flush()?; info!("Stored! Checking for existence..."); diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs index 23b8a90..dffce83 100644 --- a/datman/src/remote/backup_source_responder.rs +++ b/datman/src/remote/backup_source_responder.rs @@ -1,21 +1,24 @@ // This file implements the responder side of the backup source protocol -- the protocol used // to connect to remote backup sources. -use crate::tree::scan; -use anyhow::bail; -use crossbeam_channel::Sender; -use log::info; use std::io::{stdin, stdout, Read, Write}; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use yama::definitions::TreeNode; + +use anyhow::bail; +use crossbeam_channel::Sender; +use log::info; + +use yama::definitions::{PartialPointerData, TreeNode}; use yama::pile::{Pile, RawPile}; use yama::progress::ProgressTracker; use yama::remote::requester::Requester; use yama::remote::{read_message, write_message, RequestBody, ResponseBody}; use yama::utils::get_number_of_workers; +use crate::tree::scan; + pub fn introduction(read: &mut R, write: &mut W) -> anyhow::Result<()> { let version = env!("CARGO_PKG_VERSION"); write_message( @@ -45,41 +48,6 @@ pub fn scanning(read: &mut R, write: &mut W) -> anyhow::Resul Ok(()) } -pub fn chunking( - mut read: R, - write: W, -) -> anyhow::Result<()> { - let path: PathBuf = read_message(&mut read)?; - let pointer_name: String = read_message(&mut read)?; - let tree_node: TreeNode = read_message(&mut read)?; - let parent: Option = read_message(&mut read)?; - - let (yama_requester, requester_join_handles) = Requester::new(read, write); - - let raw_pile: Box = Box::new(yama_requester); - - let pile = Pile::new(raw_pile); - - // TODO TODO progress - let progress_bar = &mut (); - - yama::operations::storing::store_fully( - Arc::new(pile), - &path, - &pointer_name, - tree_node, - parent, - get_number_of_workers("YAMA_CHUNKERS"), - progress_bar, - )?; - - for join_handle in requester_join_handles { - join_handle.join().expect("Expected to join handle"); - } - - Ok(()) -} - pub struct ProgressSender { pub last_sent: Instant, pub current_progress: u64, @@ -128,23 +96,16 @@ impl ProgressTracker for ProgressSender { } } -pub fn chunking_stdio() -> anyhow::Result<()> { - let (path, pointer_name, tree_node, parent) = { +pub fn chunking_stdio() -> anyhow::Result { + let (path, tree_node) = { let stdin = stdin(); let mut read = stdin.lock(); let path: PathBuf = read_message(&mut read)?; - let pointer_name: String = read_message(&mut read)?; let tree_node: TreeNode = read_message(&mut read)?; - let parent: Option = read_message(&mut read)?; - (path, pointer_name, tree_node, parent) + (path, tree_node) }; - info!( - "Have pointer_name = {:?}, parent = {:?}", - pointer_name, parent - ); - - let requester_join_handles = { + let (pointer_data, requester_join_handles) = { let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); let command_sender = yama_requester.clone_command_sender(); info!("progress sender in use"); @@ -160,17 +121,15 @@ pub fn chunking_stdio() -> anyhow::Result<()> { let pile = Pile::new(raw_pile); - yama::operations::storing::store_fully( - Arc::new(pile), + let pointer_data = yama::operations::storing::store_without_pointer_ops( + &Arc::new(pile), &path, - &pointer_name, tree_node, - parent, get_number_of_workers("YAMA_CHUNKERS"), &mut progress_bar, )?; - requester_join_handles + (pointer_data, requester_join_handles) }; info!("Waiting to join."); @@ -181,20 +140,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> { info!("Chunking completed."); - Ok(()) -} - -pub fn handler( - mut read: R, - mut write: W, -) -> anyhow::Result<()> { - introduction(&mut read, &mut write)?; - - scanning(&mut read, &mut write)?; - - chunking(read, write)?; - - Ok(()) + Ok(pointer_data) } pub fn handler_stdio() -> anyhow::Result<()> { @@ -217,9 +163,10 @@ pub fn handler_stdio() -> anyhow::Result<()> { info!("Chunking."); drop(read); drop(write); - chunking_stdio()?; + let pointer_data = chunking_stdio()?; read = stdin.lock(); write = stdout.lock(); + write_message(&mut write, &pointer_data)?; } "exit" => { write_message(&mut write, &"exit")?; diff --git a/yama/src/definitions.rs b/yama/src/definitions.rs index 787f32a..b5c441d 100644 --- a/yama/src/definitions.rs +++ b/yama/src/definitions.rs @@ -35,6 +35,24 @@ pub struct PointerData { pub gid_lookup: BTreeMap>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PartialPointerData { + pub chunk_ref: RecursiveChunkRef, + pub uid_lookup: BTreeMap>, + pub gid_lookup: BTreeMap>, +} + +impl PartialPointerData { + pub fn complete(self, parent_pointer: Option) -> PointerData { + PointerData { + chunk_ref: self.chunk_ref, + parent_pointer, + uid_lookup: self.uid_lookup, + gid_lookup: self.gid_lookup, + } + } +} + #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct RecursiveChunkRef { /// The root Chunk ID. diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs index ee86daf..563464a 100644 --- a/yama/src/operations/storing.rs +++ b/yama/src/operations/storing.rs @@ -29,7 +29,9 @@ use log::{error, warn}; use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOLD}; use crate::commands; use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node}; -use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode}; +use crate::definitions::{ + PartialPointerData, PointerData, RecursiveChunkRef, RootTreeNode, TreeNode, +}; use crate::pile::{existence_checker_stage, Pile, RawPile, StoragePipelineSettings}; use crate::progress::ProgressTracker; use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place}; @@ -242,21 +244,16 @@ pub fn store_fully( progress_bar: &mut PT, ) -> anyhow::Result<()> { pointer_ops_prepare_to_store(&pile, &mut root_node, &parent)?; - let pointer_data = store_without_pointer_ops( - &pile, - &root_dir, - root_node, - parent, - num_workers, - progress_bar, - )?; - pointers_ops_after_store(pile, &new_pointer_name, &pointer_data)?; + let pointer_data = + store_without_pointer_ops(&pile, &root_dir, root_node, num_workers, progress_bar)? + .complete(parent); + pointers_ops_after_store(&pile, &new_pointer_name, &pointer_data)?; Ok(()) } pub fn pointers_ops_after_store( - pile: Arc>>, - new_pointer_name: &&String, + pile: &Pile, + new_pointer_name: &str, pointer_data: &PointerData, ) -> anyhow::Result<()> { pile.write_pointer(&new_pointer_name, &pointer_data)?; @@ -265,7 +262,7 @@ pub fn pointers_ops_after_store( } pub fn pointer_ops_prepare_to_store( - pile: &Arc>>, + pile: &Pile, mut root_node: &mut TreeNode, parent: &Option, ) -> anyhow::Result<()> { @@ -288,10 +285,9 @@ pub fn store_without_pointer_ops( pile: &Arc>>, root_dir: &PathBuf, mut root_node: TreeNode, - parent: Option, num_workers: u8, progress_bar: &mut PT, -) -> anyhow::Result { +) -> anyhow::Result { // TODO make these configurable let sps = StoragePipelineSettings { num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, @@ -332,9 +328,8 @@ pub fn store_without_pointer_ops( }, )?; - let pointer_data = PointerData { + let pointer_data = PartialPointerData { chunk_ref, - parent_pointer: parent, uid_lookup, gid_lookup, };