diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index d56ea76..9b3ff2e 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -18,11 +18,12 @@ along with Yama. If not, see . use std::fs::File; use std::io::{BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; use clap::Parser; use env_logger::Env; -use anyhow::bail; +use anyhow::{bail, Context}; use bare_metrics_recorder::recording::BareMetricsRecorderCore; use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc}; use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination}; @@ -33,6 +34,7 @@ use datman::get_hostname; use datman::remote::backup_source_requester::backup_remote_source_to_destination; use datman::remote::backup_source_responder; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use itertools::Itertools; use log::info; use std::str::FromStr; use yama::commands::load_pile_descriptor; @@ -136,7 +138,10 @@ pub enum DatmanCommand { }, #[clap(name = "_pull_responder_offerer")] - InternalPullResponderOfferer { pile_name: String }, + InternalPullResponderOfferer { + datman_path: PathBuf, + pile_name: String, + }, } pub struct HumanDateTime(pub DateTime); @@ -344,8 +349,15 @@ fn main() -> anyhow::Result<()> { datman::commands::report::print_filesystem_space(&destination.path)?; datman::commands::report::print_report(&report)?; } - DatmanCommand::Pull { .. } => {} - DatmanCommand::InternalPullResponderOfferer { pile_name } => { + DatmanCommand::Pull { + remote_and_remote_pile, + pile_name, + } => { + let (hostname, remote_datman_path, remote_pile_name) = remote_and_remote_pile + .split(':') + .collect_tuple() + .context("You must pull from a remote pile specified as remote:path:pile.")?; + let descriptor = load_descriptor(Path::new(".")).unwrap(); let source = &descriptor.piles[&pile_name]; @@ -356,6 +368,62 @@ fn main() -> anyhow::Result<()> { BypassLevel::CompressionBypass, )?; + let pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10)); + pbar.set_style( + ProgressStyle::default_bar().template( + "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ), + ); + pbar.set_message("pulling"); + + let remote_host_descriptor = descriptor + .remote_hosts + .get(hostname) + .ok_or_else(|| anyhow::anyhow!("No remote host by that name: {:?}.", hostname))?; + + let mut connection = Command::new("ssh") + .arg(&remote_host_descriptor.user_at_host) + .arg("--") + .arg( + &remote_host_descriptor + .path_to_datman + .as_ref() + .map(|x| x.as_str()) + .unwrap_or("datman"), + ) + .arg("_pull_responder_offerer") + .arg(remote_datman_path) + .arg(remote_pile_name) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let mut reader = BufReader::new(connection.stdout.take().unwrap()); + let mut writer = BufWriter::new(connection.stdin.take().unwrap()); + + pushpull::accepting_side( + &pile, + &bypass_raw_pile, + &mut reader, + &mut writer, + Box::new(pbar), + )?; + } + DatmanCommand::InternalPullResponderOfferer { + datman_path, + pile_name, + } => { + let descriptor = load_descriptor(&datman_path).unwrap(); + let source = &descriptor.piles[&pile_name]; + + let pile_desc = load_pile_descriptor(&source.path)?; + let (pile, bypass_raw_pile) = open_pile_with_work_bypass( + &source.path, + &pile_desc, + BypassLevel::CompressionBypass, + )?; + let mut stdin = BufReader::new(io_streams::StreamReader::stdin()?); let mut stdout = BufWriter::new(io_streams::StreamWriter::stdout()?); diff --git a/datman/src/commands/pushpull.rs b/datman/src/commands/pushpull.rs index 6ada2e1..25f71d1 100644 --- a/datman/src/commands/pushpull.rs +++ b/datman/src/commands/pushpull.rs @@ -184,7 +184,7 @@ pub fn offering_side( pub fn accepting_side( pile: &Pile>>, - bypass_pile: Box, + bypass_pile: &Box, reader: &mut R, writer: &mut W, mut progress: Box,