Reluctantly, have separate code path for Stdio

This commit is contained in:
Olivier 'reivilibre' 2021-07-09 20:15:55 +01:00
parent be5fea47c6
commit a9e1d38051
3 changed files with 101 additions and 4 deletions

View File

@ -27,6 +27,7 @@ use datman::commands::ilabel::interactive_labelling_session;
use datman::commands::init_descriptor; use datman::commands::init_descriptor;
use datman::descriptor::{load_descriptor, SourceDescriptor}; use datman::descriptor::{load_descriptor, SourceDescriptor};
use datman::remote::backup_source_requester::backup_remote_source_to_destination; use datman::remote::backup_source_requester::backup_remote_source_to_destination;
use datman::remote::backup_source_responder;
use std::str::FromStr; use std::str::FromStr;
#[derive(Clap)] #[derive(Clap)]
@ -95,6 +96,9 @@ pub enum DatmanCommand {
#[clap(long)] #[clap(long)]
skip_metadata: bool, skip_metadata: bool,
}, },
#[clap(name = "_backup_source_responder")]
InternalBackupSourceResponder,
} }
pub struct HumanDateTime(pub DateTime<Local>); pub struct HumanDateTime(pub DateTime<Local>);
@ -241,6 +245,10 @@ fn main() -> anyhow::Result<()> {
yama::utils::get_number_of_workers("YAMA_EXTRACTORS"), yama::utils::get_number_of_workers("YAMA_EXTRACTORS"),
)?; )?;
} }
DatmanCommand::InternalBackupSourceResponder => {
backup_source_responder::handler_stdio()?;
}
} }
Ok(()) Ok(())
} }

View File

@ -3,7 +3,7 @@
use crate::tree::scan; use crate::tree::scan;
use anyhow::bail; use anyhow::bail;
use std::io::{Read, Write}; use std::io::{stdin, stdout, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use yama::definitions::TreeNode; use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile}; use yama::pile::{Pile, RawPile};
@ -70,6 +70,38 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
Ok(()) Ok(())
} }
pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
let (pointer_name, tree_node, parent) = {
let stdin = stdin();
let mut read = stdin.lock();
let pointer_name: String = read_message(&mut read)?;
let tree_node: TreeNode = read_message(&mut read)?;
let parent: Option<String> = read_message(&mut read)?;
(pointer_name, tree_node, parent)
};
let (yama_requester, requester_join_handles) = Requester::new_from_stdio();
let raw_pile: Box<dyn RawPile> = Box::new(yama_requester);
let pile = Pile::new(raw_pile);
yama::operations::storing::store_fully(
&pile,
path,
&pointer_name,
tree_node,
parent,
get_number_of_workers("YAMA_CHUNKERS"),
)?;
for join_handle in requester_join_handles {
join_handle.join().expect("Expected to join handle");
}
Ok(())
}
pub fn handler<R: Read + Send + 'static, W: Write + Send + 'static>( pub fn handler<R: Read + Send + 'static, W: Write + Send + 'static>(
mut read: R, mut read: R,
mut write: W, mut write: W,
@ -82,3 +114,21 @@ pub fn handler<R: Read + Send + 'static, W: Write + Send + 'static>(
Ok(()) Ok(())
} }
pub fn handler_stdio() -> anyhow::Result<()> {
let path = {
let stdin = stdin();
let stdout = stdout();
let mut read = stdin.lock();
let mut write = stdout.lock();
introduction(&mut read, &mut write)?;
let path = scanning(&mut read, &mut write)?;
path
};
chunking_stdio(&path)?;
Ok(())
}

View File

@ -1,5 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{stdin, stdout, Read, Write};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::thread::JoinHandle; use std::thread::JoinHandle;
@ -54,8 +54,47 @@ impl Requester {
) )
} }
pub fn new_from_stdio() -> (Self, Vec<JoinHandle<()>>) {
let in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>> =
Arc::new(Mutex::new(HashMap::new()));
let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
let mut handles = Vec::new();
{
// Spawn a reader
let in_flight = in_flight.clone();
handles.push(thread::spawn(move || {
let stdin = stdin();
let read = stdin.lock();
if let Err(e) = Self::reader(read, in_flight) {
error!("reader failed: {:?}", e);
}
}));
}
{
// Spawn a writer
let in_flight = in_flight.clone();
let command_receiver = command_receiver.clone();
handles.push(thread::spawn(move || {
let stdout = stdout();
let write = stdout.lock();
if let Err(e) = Self::writer(write, in_flight, command_receiver) {
error!("writer failed: {:?}", e);
}
}));
}
(
Requester {
commands: command_sender,
},
handles,
)
}
/// Thread that reads messages and sends them along. /// Thread that reads messages and sends them along.
fn reader<R: Read + Send + 'static>( fn reader<R: Read>(
mut read: R, mut read: R,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>, in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -71,7 +110,7 @@ impl Requester {
} }
/// Thread that writes messages. /// Thread that writes messages.
fn writer<W: Write + Send + 'static>( fn writer<W: Write>(
mut write: W, mut write: W,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>, in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
command_receiver: Receiver<(RequestBody, Sender<ResponseBody>)>, command_receiver: Receiver<(RequestBody, Sender<ResponseBody>)>,