From b008a80ca41b3798c1fcf9f7efb1d71768c0929c Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 9 Jul 2021 20:54:12 +0100 Subject: [PATCH] Some progress on remote source backup --- datman/src/remote/backup_source_requester.rs | 14 ++++++- datman/src/remote/backup_source_responder.rs | 42 +++++++++++++++----- yama/src/remote.rs | 5 +++ yama/src/remote/requester.rs | 6 ++- yama/src/remote/responder.rs | 4 +- 5 files changed, 57 insertions(+), 14 deletions(-) diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index 22087a7..99665b0 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -19,11 +19,13 @@ use yama::utils::get_number_of_workers; // your Yama pile. Do NOT connect to untrusted or compromised systems using this mechanism (yet). pub fn introduction(read: &mut R, write: &mut W) -> anyhow::Result<()> { + info!("Introduction."); let version = env!("CARGO_PKG_VERSION"); write_message( write, &format!("Datman v{} Backup Source Requester", version), )?; + write.flush()?; let foreign_side: String = read_message(read)?; let expected_foreign_side = format!("Datman v{} Backup Source Responder", version); @@ -43,7 +45,9 @@ pub fn scanning( write: &mut W, path: &Path, ) -> anyhow::Result>> { + info!("Scanning."); write_message(write, &path)?; + write.flush()?; let scan_result: Option> = read_message(read)?; Ok(scan_result) @@ -55,9 +59,13 @@ pub fn chunking, + parent: Option, ) -> anyhow::Result<()> { + info!("Chunking."); write_message(&mut write, &pointer_name)?; write_message(&mut write, tree_node)?; + write_message(&mut write, &parent)?; + write.flush()?; let join_handles = Responder::start( read, @@ -66,10 +74,14 @@ pub fn chunking(read: &mut R, write: &mut W) -> anyhow::R write, &format!("Datman v{} Backup Source Responder", version), )?; + write.flush()?; let foreign_side: String = read_message(read)?; let expected_foreign_side = format!("Datman v{} Backup Source Requester", version); @@ -35,6 +37,7 @@ pub fn scanning(read: &mut R, write: &mut W) -> anyhow::Resul let path: PathBuf = read_message(read)?; let scan_result = scan(&path)?; write_message(write, &scan_result)?; + write.flush()?; Ok(path) } @@ -80,25 +83,39 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { (pointer_name, tree_node, parent) }; - let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); + info!( + "Have pointer_name = {:?}, parent = {:?}", + pointer_name, parent + ); - let raw_pile: Box = Box::new(yama_requester); + let requester_join_handles = { + let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); - let pile = Pile::new(raw_pile); + let raw_pile: Box = Box::new(yama_requester); - yama::operations::storing::store_fully( - &pile, - path, - &pointer_name, - tree_node, - parent, - get_number_of_workers("YAMA_CHUNKERS"), - )?; + let pile = Pile::new(raw_pile); + + yama::operations::storing::store_fully( + &pile, + path, + &pointer_name, + tree_node, + parent, + get_number_of_workers("YAMA_CHUNKERS"), + )?; + + requester_join_handles + }; + + info!("Waiting to join DBG."); for join_handle in requester_join_handles { join_handle.join().expect("Expected to join handle"); + info!("handle"); } + info!("Chunking completed."); + Ok(()) } @@ -122,12 +139,15 @@ pub fn handler_stdio() -> anyhow::Result<()> { let mut read = stdin.lock(); let mut write = stdout.lock(); + info!("Introduction."); introduction(&mut read, &mut write)?; + info!("Scanning."); let path = scanning(&mut read, &mut write)?; path }; + info!("Chunking."); chunking_stdio(&path)?; Ok(()) diff --git a/yama/src/remote.rs b/yama/src/remote.rs index 43f1fa3..d762fc8 100644 --- a/yama/src/remote.rs +++ b/yama/src/remote.rs @@ -22,6 +22,8 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use log::debug; + use crate::pile::Keyspace; pub mod requester; @@ -82,6 +84,7 @@ pub enum ResponseBody { pub fn read_message(read: &mut R) -> anyhow::Result { let len = read.read_u32::()?; + debug!("RM {:?}", len); let mut data_vec = vec![0u8; len as usize]; read.read_exact(&mut data_vec)?; Ok(serde_bare::from_slice(&data_vec)?) @@ -89,7 +92,9 @@ pub fn read_message(read: &mut R) -> anyhow::Resul pub fn write_message(write: &mut W, message: &S) -> anyhow::Result<()> { let data_vec = serde_bare::to_vec(&message)?; + debug!("WM {:?}", data_vec.len()); write.write_u32::(data_vec.len().try_into()?)?; write.write_all(&data_vec)?; + write.flush()?; Ok(()) } diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 16212ac..3827542 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -6,7 +6,7 @@ use std::thread::JoinHandle; use anyhow::anyhow; use crossbeam_channel::{Receiver, Sender}; -use log::error; +use log::{error, info}; use crate::pile::{Keyspace, RawPile}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; @@ -132,12 +132,14 @@ impl Requester { body: req_body, }, )?; + write.flush()?; } Ok(()) } /// Helper to make a request and wait for the result. fn request(&self, req: RequestBody) -> anyhow::Result { + info!("Req..."); let (sender, receiver) = crossbeam_channel::bounded(0); self.commands .send((req, sender)) @@ -151,6 +153,7 @@ impl Requester { impl RawPile for Requester { fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result { + info!("Ex?"); match self.request(RequestBody::CheckExists { kind, key: key.to_vec(), @@ -175,6 +178,7 @@ impl RawPile for Requester { } } fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> { + info!("Wr?"); match self.request(RequestBody::Write { kind, key: key.to_vec(), diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index 32109f8..84118a1 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use crossbeam_channel::{Receiver, Sender}; use itertools::Itertools; use log::error; -use log::warn; +use log::{info, warn}; use crate::pile::RawPile; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; @@ -79,6 +79,7 @@ impl Responder { ) -> anyhow::Result<()> { loop { let request: Request = read_message(&mut read)?; + info!("M!<"); match request.body { RequestBody::NextBatch { token } => { @@ -109,6 +110,7 @@ impl Responder { ) -> anyhow::Result<()> { while let Ok(response) = responses.recv() { write_message(&mut write, &response)?; + write.flush()?; } Ok(()) }