Some progress on remote source backup

This commit is contained in:
Olivier 'reivilibre' 2021-07-09 20:54:12 +01:00
parent a9e1d38051
commit b008a80ca4
5 changed files with 57 additions and 14 deletions

View File

@ -19,11 +19,13 @@ use yama::utils::get_number_of_workers;
// your Yama pile. Do NOT connect to untrusted or compromised systems using this mechanism (yet).
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
info!("Introduction.");
let version = env!("CARGO_PKG_VERSION");
write_message(
write,
&format!("Datman v{} Backup Source Requester", version),
)?;
write.flush()?;
let foreign_side: String = read_message(read)?;
let expected_foreign_side = format!("Datman v{} Backup Source Responder", version);
@ -43,7 +45,9 @@ pub fn scanning<R: Read, W: Write>(
write: &mut W,
path: &Path,
) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
info!("Scanning.");
write_message(write, &path)?;
write.flush()?;
let scan_result: Option<FileTree<(), (), (), ()>> = read_message(read)?;
Ok(scan_result)
@ -55,9 +59,13 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile
pointer_name: String,
tree_node: &TreeNode,
raw_pile: Arc<RP>,
parent: Option<String>,
) -> anyhow::Result<()> {
info!("Chunking.");
write_message(&mut write, &pointer_name)?;
write_message(&mut write, tree_node)?;
write_message(&mut write, &parent)?;
write.flush()?;
let join_handles = Responder::start(
read,
@ -66,10 +74,14 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile
raw_pile,
);
info!("JH");
for handle in join_handles {
handle.join().expect("Join handle should not fail");
}
info!("J!");
Ok(())
}
@ -165,7 +177,7 @@ pub fn backup_remote_source_to_destination(
let raw_pile = Arc::new(pile.raw_pile);
let pile = Pile::new(raw_pile.clone());
chunking(read, write, pointer_name.clone(), &root, raw_pile)?;
chunking(read, write, pointer_name.clone(), &root, raw_pile, parent)?;
info!("Stored! Checking for existence...");
if pile.list_pointers()?.contains(&pointer_name) {

View File

@ -3,6 +3,7 @@
use crate::tree::scan;
use anyhow::bail;
use log::info;
use std::io::{stdin, stdout, Read, Write};
use std::path::PathBuf;
use yama::definitions::TreeNode;
@ -17,6 +18,7 @@ pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::R
write,
&format!("Datman v{} Backup Source Responder", version),
)?;
write.flush()?;
let foreign_side: String = read_message(read)?;
let expected_foreign_side = format!("Datman v{} Backup Source Requester", version);
@ -35,6 +37,7 @@ pub fn scanning<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Resul
let path: PathBuf = read_message(read)?;
let scan_result = scan(&path)?;
write_message(write, &scan_result)?;
write.flush()?;
Ok(path)
}
@ -80,6 +83,12 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
(pointer_name, tree_node, parent)
};
info!(
"Have pointer_name = {:?}, parent = {:?}",
pointer_name, parent
);
let requester_join_handles = {
let (yama_requester, requester_join_handles) = Requester::new_from_stdio();
let raw_pile: Box<dyn RawPile> = Box::new(yama_requester);
@ -95,10 +104,18 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
get_number_of_workers("YAMA_CHUNKERS"),
)?;
requester_join_handles
};
info!("Waiting to join DBG.");
for join_handle in requester_join_handles {
join_handle.join().expect("Expected to join handle");
info!("handle");
}
info!("Chunking completed.");
Ok(())
}
@ -122,12 +139,15 @@ pub fn handler_stdio() -> anyhow::Result<()> {
let mut read = stdin.lock();
let mut write = stdout.lock();
info!("Introduction.");
introduction(&mut read, &mut write)?;
info!("Scanning.");
let path = scanning(&mut read, &mut write)?;
path
};
info!("Chunking.");
chunking_stdio(&path)?;
Ok(())

View File

@ -22,6 +22,8 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use log::debug;
use crate::pile::Keyspace;
pub mod requester;
@ -82,6 +84,7 @@ pub enum ResponseBody {
pub fn read_message<R: Read, D: DeserializeOwned>(read: &mut R) -> anyhow::Result<D> {
let len = read.read_u32::<BigEndian>()?;
debug!("RM {:?}", len);
let mut data_vec = vec![0u8; len as usize];
read.read_exact(&mut data_vec)?;
Ok(serde_bare::from_slice(&data_vec)?)
@ -89,7 +92,9 @@ pub fn read_message<R: Read, D: DeserializeOwned>(read: &mut R) -> anyhow::Resul
pub fn write_message<W: Write, S: Serialize>(write: &mut W, message: &S) -> anyhow::Result<()> {
let data_vec = serde_bare::to_vec(&message)?;
debug!("WM {:?}", data_vec.len());
write.write_u32::<BigEndian>(data_vec.len().try_into()?)?;
write.write_all(&data_vec)?;
write.flush()?;
Ok(())
}

View File

@ -6,7 +6,7 @@ use std::thread::JoinHandle;
use anyhow::anyhow;
use crossbeam_channel::{Receiver, Sender};
use log::error;
use log::{error, info};
use crate::pile::{Keyspace, RawPile};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
@ -132,12 +132,14 @@ impl Requester {
body: req_body,
},
)?;
write.flush()?;
}
Ok(())
}
/// Helper to make a request and wait for the result.
fn request(&self, req: RequestBody) -> anyhow::Result<ResponseBody> {
info!("Req...");
let (sender, receiver) = crossbeam_channel::bounded(0);
self.commands
.send((req, sender))
@ -151,6 +153,7 @@ impl Requester {
impl RawPile for Requester {
fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool> {
info!("Ex?");
match self.request(RequestBody::CheckExists {
kind,
key: key.to_vec(),
@ -175,6 +178,7 @@ impl RawPile for Requester {
}
}
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
info!("Wr?");
match self.request(RequestBody::Write {
kind,
key: key.to_vec(),

View File

@ -8,7 +8,7 @@ use anyhow::anyhow;
use crossbeam_channel::{Receiver, Sender};
use itertools::Itertools;
use log::error;
use log::warn;
use log::{info, warn};
use crate::pile::RawPile;
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
@ -79,6 +79,7 @@ impl Responder {
) -> anyhow::Result<()> {
loop {
let request: Request = read_message(&mut read)?;
info!("M!<");
match request.body {
RequestBody::NextBatch { token } => {
@ -109,6 +110,7 @@ impl Responder {
) -> anyhow::Result<()> {
while let Ok(response) = responses.recv() {
write_message(&mut write, &response)?;
write.flush()?;
}
Ok(())
}