From a9e1d38051acf3d34e5d2f2ddbe3bda48929cbf4 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 9 Jul 2021 20:15:55 +0100 Subject: [PATCH] Reluctantly, have separate code path for Stdio --- datman/src/bin/datman.rs | 8 +++ datman/src/remote/backup_source_responder.rs | 52 +++++++++++++++++++- yama/src/remote/requester.rs | 45 +++++++++++++++-- 3 files changed, 101 insertions(+), 4 deletions(-) diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index c866d53..eeac400 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -27,6 +27,7 @@ use datman::commands::ilabel::interactive_labelling_session; use datman::commands::init_descriptor; use datman::descriptor::{load_descriptor, SourceDescriptor}; use datman::remote::backup_source_requester::backup_remote_source_to_destination; +use datman::remote::backup_source_responder; use std::str::FromStr; #[derive(Clap)] @@ -95,6 +96,9 @@ pub enum DatmanCommand { #[clap(long)] skip_metadata: bool, }, + + #[clap(name = "_backup_source_responder")] + InternalBackupSourceResponder, } pub struct HumanDateTime(pub DateTime); @@ -241,6 +245,10 @@ fn main() -> anyhow::Result<()> { yama::utils::get_number_of_workers("YAMA_EXTRACTORS"), )?; } + + DatmanCommand::InternalBackupSourceResponder => { + backup_source_responder::handler_stdio()?; + } } Ok(()) } diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs index 8e79fc1..e9cda0e 100644 --- a/datman/src/remote/backup_source_responder.rs +++ b/datman/src/remote/backup_source_responder.rs @@ -3,7 +3,7 @@ use crate::tree::scan; use anyhow::bail; -use std::io::{Read, Write}; +use std::io::{stdin, stdout, Read, Write}; use std::path::PathBuf; use yama::definitions::TreeNode; use yama::pile::{Pile, RawPile}; @@ -70,6 +70,38 @@ pub fn chunking( Ok(()) } +pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { + let (pointer_name, tree_node, parent) = { + let stdin = stdin(); + let mut read = stdin.lock(); + let pointer_name: String = read_message(&mut read)?; + let tree_node: TreeNode = read_message(&mut read)?; + let parent: Option = read_message(&mut read)?; + (pointer_name, tree_node, parent) + }; + + let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); + + let raw_pile: Box = Box::new(yama_requester); + + let pile = Pile::new(raw_pile); + + yama::operations::storing::store_fully( + &pile, + path, + &pointer_name, + tree_node, + parent, + get_number_of_workers("YAMA_CHUNKERS"), + )?; + + for join_handle in requester_join_handles { + join_handle.join().expect("Expected to join handle"); + } + + Ok(()) +} + pub fn handler( mut read: R, mut write: W, @@ -82,3 +114,21 @@ pub fn handler( Ok(()) } + +pub fn handler_stdio() -> anyhow::Result<()> { + let path = { + let stdin = stdin(); + let stdout = stdout(); + + let mut read = stdin.lock(); + let mut write = stdout.lock(); + introduction(&mut read, &mut write)?; + + let path = scanning(&mut read, &mut write)?; + path + }; + + chunking_stdio(&path)?; + + Ok(()) +} diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 04037e2..16212ac 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::io::{Read, Write}; +use std::io::{stdin, stdout, Read, Write}; use std::sync::{Arc, Mutex}; use std::thread; use std::thread::JoinHandle; @@ -54,8 +54,47 @@ impl Requester { ) } + pub fn new_from_stdio() -> (Self, Vec>) { + let in_flight: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let (command_sender, command_receiver) = crossbeam_channel::bounded(16); + let mut handles = Vec::new(); + + { + // Spawn a reader + let in_flight = in_flight.clone(); + handles.push(thread::spawn(move || { + let stdin = stdin(); + let read = stdin.lock(); + if let Err(e) = Self::reader(read, in_flight) { + error!("reader failed: {:?}", e); + } + })); + } + + { + // Spawn a writer + let in_flight = in_flight.clone(); + let command_receiver = command_receiver.clone(); + handles.push(thread::spawn(move || { + let stdout = stdout(); + let write = stdout.lock(); + if let Err(e) = Self::writer(write, in_flight, command_receiver) { + error!("writer failed: {:?}", e); + } + })); + } + + ( + Requester { + commands: command_sender, + }, + handles, + ) + } + /// Thread that reads messages and sends them along. - fn reader( + fn reader( mut read: R, in_flight: Arc>>>, ) -> anyhow::Result<()> { @@ -71,7 +110,7 @@ impl Requester { } /// Thread that writes messages. - fn writer( + fn writer( mut write: W, in_flight: Arc>>>, command_receiver: Receiver<(RequestBody, Sender)>,