Add skeleton of implementation for backup remote source

This commit is contained in:
Olivier 'reivilibre' 2021-07-04 12:03:13 +01:00
parent 33d2078413
commit 998cca51eb
5 changed files with 154 additions and 7 deletions

View File

@ -1,4 +1,5 @@
pub mod commands;
pub mod descriptor;
pub mod labelling;
pub mod remote;
pub mod tree;

2
datman/src/remote.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod backup_source_requester;
pub mod backup_source_responder;

View File

@ -0,0 +1,65 @@
use crate::tree::FileTree;
use anyhow::bail;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;
use yama::definitions::TreeNode;
use yama::pile::RawPile;
use yama::remote::responder::Responder;
use yama::remote::{read_message, write_message};
use yama::utils::get_number_of_workers;
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
let version = env!("CARGO_PKG_VERSION");
write_message(
write,
&format!("Datman v{} Backup Source Requester", version),
)?;
let foreign_side: String = read_message(read)?;
let expected_foreign_side = format!("Datman v{} Backup Source Responder", version);
if &foreign_side != &expected_foreign_side {
bail!(
"Datman version mismatch. Expected {:?}, got {:?}",
expected_foreign_side,
foreign_side
);
}
Ok(())
}
pub fn scanning<R: Read, W: Write>(
read: &mut R,
write: &mut W,
path: &Path,
) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
write_message(write, &path)?;
let scan_result: Option<FileTree<(), (), (), ()>> = read_message(read)?;
Ok(scan_result)
}
pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile + 'static>(
read: R,
mut write: W,
pointer_name: String,
tree_node: &TreeNode,
raw_pile: Arc<RP>,
) -> anyhow::Result<()> {
write_message(&mut write, &pointer_name)?;
write_message(&mut write, tree_node)?;
let join_handles = Responder::start(
read,
write,
get_number_of_workers("YAMA_RESPONDERS") as u16,
raw_pile,
);
for handle in join_handles {
handle.join().expect("Join handle should not fail");
}
Ok(())
}

View File

@ -0,0 +1,84 @@
// This file implements the responder side of the backup source protocol -- the protocol used
// to connect to remote backup sources.
use crate::tree::scan;
use anyhow::bail;
use std::io::{Read, Write};
use std::path::PathBuf;
use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile};
use yama::remote::requester::Requester;
use yama::remote::{read_message, write_message};
use yama::utils::get_number_of_workers;
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
let version = env!("CARGO_PKG_VERSION");
write_message(
write,
&format!("Datman v{} Backup Source Responder", version),
)?;
let foreign_side: String = read_message(read)?;
let expected_foreign_side = format!("Datman v{} Backup Source Requester", version);
if &foreign_side != &expected_foreign_side {
bail!(
"Datman version mismatch. Expected {:?}, got {:?}",
expected_foreign_side,
foreign_side
);
}
Ok(())
}
pub fn scanning<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<PathBuf> {
let path: PathBuf = read_message(read)?;
let scan_result = scan(&path)?;
write_message(write, &scan_result)?;
Ok(path)
}
pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
mut read: R,
write: W,
path: &PathBuf,
) -> anyhow::Result<()> {
let pointer_name: String = read_message(&mut read)?;
let tree_node: TreeNode = read_message(&mut read)?;
let parent: Option<String> = read_message(&mut read)?;
let (yama_requester, requester_join_handles) = Requester::new(read, write);
let raw_pile: Box<dyn RawPile> = Box::new(yama_requester);
let pile = Pile::new(raw_pile);
yama::operations::storing::store_fully(
&pile,
path,
&pointer_name,
tree_node,
parent,
get_number_of_workers("YAMA_CHUNKERS"),
)?;
for join_handle in requester_join_handles {
join_handle.join().expect("Expected to join handle");
}
Ok(())
}
pub fn handler<R: Read + Send + 'static, W: Write + Send + 'static>(
mut read: R,
mut write: W,
) -> anyhow::Result<()> {
introduction(&mut read, &mut write)?;
let path = scanning(&mut read, &mut write)?;
chunking(read, write, &path)?;
Ok(())
}

View File

@ -80,19 +80,14 @@ pub enum ResponseBody {
},
}
pub fn read_message<R: Read + Send + 'static, D: DeserializeOwned + Send + 'static>(
read: &mut R,
) -> anyhow::Result<D> {
pub fn read_message<R: Read, D: DeserializeOwned>(read: &mut R) -> anyhow::Result<D> {
let len = read.read_u32::<BigEndian>()?;
let mut data_vec = vec![0u8; len as usize];
read.read_exact(&mut data_vec)?;
Ok(serde_bare::from_slice(&data_vec)?)
}
pub fn write_message<W: Write + Send + 'static, S: Serialize + Send + 'static>(
write: &mut W,
message: &S,
) -> anyhow::Result<()> {
pub fn write_message<W: Write, S: Serialize>(write: &mut W, message: &S) -> anyhow::Result<()> {
let data_vec = serde_bare::to_vec(&message)?;
write.write_u32::<BigEndian>(data_vec.len().try_into()?)?;
write.write_all(&data_vec)?;