diff --git a/datman/src/lib.rs b/datman/src/lib.rs index 4720ebd..ab23eaf 100644 --- a/datman/src/lib.rs +++ b/datman/src/lib.rs @@ -1,4 +1,5 @@ pub mod commands; pub mod descriptor; pub mod labelling; +pub mod remote; pub mod tree; diff --git a/datman/src/remote.rs b/datman/src/remote.rs new file mode 100644 index 0000000..04e6521 --- /dev/null +++ b/datman/src/remote.rs @@ -0,0 +1,2 @@ +pub mod backup_source_requester; +pub mod backup_source_responder; diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs new file mode 100644 index 0000000..12dcc4c --- /dev/null +++ b/datman/src/remote/backup_source_requester.rs @@ -0,0 +1,65 @@ +use crate::tree::FileTree; +use anyhow::bail; +use std::io::{Read, Write}; +use std::path::Path; +use std::sync::Arc; +use yama::definitions::TreeNode; +use yama::pile::RawPile; +use yama::remote::responder::Responder; +use yama::remote::{read_message, write_message}; +use yama::utils::get_number_of_workers; + +pub fn introduction(read: &mut R, write: &mut W) -> anyhow::Result<()> { + let version = env!("CARGO_PKG_VERSION"); + write_message( + write, + &format!("Datman v{} Backup Source Requester", version), + )?; + + let foreign_side: String = read_message(read)?; + let expected_foreign_side = format!("Datman v{} Backup Source Responder", version); + if &foreign_side != &expected_foreign_side { + bail!( + "Datman version mismatch. Expected {:?}, got {:?}", + expected_foreign_side, + foreign_side + ); + } + + Ok(()) +} + +pub fn scanning( + read: &mut R, + write: &mut W, + path: &Path, +) -> anyhow::Result>> { + write_message(write, &path)?; + let scan_result: Option> = read_message(read)?; + + Ok(scan_result) +} + +pub fn chunking( + read: R, + mut write: W, + pointer_name: String, + tree_node: &TreeNode, + raw_pile: Arc, +) -> anyhow::Result<()> { + write_message(&mut write, &pointer_name)?; + write_message(&mut write, tree_node)?; + + let join_handles = Responder::start( + read, + write, + get_number_of_workers("YAMA_RESPONDERS") as u16, + raw_pile, + ); + + for handle in join_handles { + handle.join().expect("Join handle should not fail"); + } + + Ok(()) +} diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs new file mode 100644 index 0000000..8e79fc1 --- /dev/null +++ b/datman/src/remote/backup_source_responder.rs @@ -0,0 +1,84 @@ +// This file implements the responder side of the backup source protocol -- the protocol used +// to connect to remote backup sources. + +use crate::tree::scan; +use anyhow::bail; +use std::io::{Read, Write}; +use std::path::PathBuf; +use yama::definitions::TreeNode; +use yama::pile::{Pile, RawPile}; +use yama::remote::requester::Requester; +use yama::remote::{read_message, write_message}; +use yama::utils::get_number_of_workers; + +pub fn introduction(read: &mut R, write: &mut W) -> anyhow::Result<()> { + let version = env!("CARGO_PKG_VERSION"); + write_message( + write, + &format!("Datman v{} Backup Source Responder", version), + )?; + + let foreign_side: String = read_message(read)?; + let expected_foreign_side = format!("Datman v{} Backup Source Requester", version); + if &foreign_side != &expected_foreign_side { + bail!( + "Datman version mismatch. Expected {:?}, got {:?}", + expected_foreign_side, + foreign_side + ); + } + + Ok(()) +} + +pub fn scanning(read: &mut R, write: &mut W) -> anyhow::Result { + let path: PathBuf = read_message(read)?; + let scan_result = scan(&path)?; + write_message(write, &scan_result)?; + + Ok(path) +} + +pub fn chunking( + mut read: R, + write: W, + path: &PathBuf, +) -> anyhow::Result<()> { + let pointer_name: String = read_message(&mut read)?; + let tree_node: TreeNode = read_message(&mut read)?; + let parent: Option = read_message(&mut read)?; + + let (yama_requester, requester_join_handles) = Requester::new(read, write); + + let raw_pile: Box = Box::new(yama_requester); + + let pile = Pile::new(raw_pile); + + yama::operations::storing::store_fully( + &pile, + path, + &pointer_name, + tree_node, + parent, + get_number_of_workers("YAMA_CHUNKERS"), + )?; + + for join_handle in requester_join_handles { + join_handle.join().expect("Expected to join handle"); + } + + Ok(()) +} + +pub fn handler( + mut read: R, + mut write: W, +) -> anyhow::Result<()> { + introduction(&mut read, &mut write)?; + + let path = scanning(&mut read, &mut write)?; + + chunking(read, write, &path)?; + + Ok(()) +} diff --git a/yama/src/remote.rs b/yama/src/remote.rs index 5df86e6..43f1fa3 100644 --- a/yama/src/remote.rs +++ b/yama/src/remote.rs @@ -80,19 +80,14 @@ pub enum ResponseBody { }, } -pub fn read_message( - read: &mut R, -) -> anyhow::Result { +pub fn read_message(read: &mut R) -> anyhow::Result { let len = read.read_u32::()?; let mut data_vec = vec![0u8; len as usize]; read.read_exact(&mut data_vec)?; Ok(serde_bare::from_slice(&data_vec)?) } -pub fn write_message( - write: &mut W, - message: &S, -) -> anyhow::Result<()> { +pub fn write_message(write: &mut W, message: &S) -> anyhow::Result<()> { let data_vec = serde_bare::to_vec(&message)?; write.write_u32::(data_vec.len().try_into()?)?; write.write_all(&data_vec)?;