Write down basic implementation of datman pull

This commit is contained in:
Olivier 'reivilibre' 2022-06-14 19:56:45 +01:00
parent e357547777
commit d384b1bcbd
2 changed files with 73 additions and 5 deletions

View File

@ -18,11 +18,12 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use std::fs::File; use std::fs::File;
use std::io::{BufReader, BufWriter, Write}; use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use clap::Parser; use clap::Parser;
use env_logger::Env; use env_logger::Env;
use anyhow::bail; use anyhow::{bail, Context};
use bare_metrics_recorder::recording::BareMetricsRecorderCore; use bare_metrics_recorder::recording::BareMetricsRecorderCore;
use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc}; use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination}; use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination};
@ -33,6 +34,7 @@ use datman::get_hostname;
use datman::remote::backup_source_requester::backup_remote_source_to_destination; use datman::remote::backup_source_requester::backup_remote_source_to_destination;
use datman::remote::backup_source_responder; use datman::remote::backup_source_responder;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use itertools::Itertools;
use log::info; use log::info;
use std::str::FromStr; use std::str::FromStr;
use yama::commands::load_pile_descriptor; use yama::commands::load_pile_descriptor;
@ -136,7 +138,10 @@ pub enum DatmanCommand {
}, },
#[clap(name = "_pull_responder_offerer")] #[clap(name = "_pull_responder_offerer")]
InternalPullResponderOfferer { pile_name: String }, InternalPullResponderOfferer {
datman_path: PathBuf,
pile_name: String,
},
} }
pub struct HumanDateTime(pub DateTime<Local>); pub struct HumanDateTime(pub DateTime<Local>);
@ -344,8 +349,15 @@ fn main() -> anyhow::Result<()> {
datman::commands::report::print_filesystem_space(&destination.path)?; datman::commands::report::print_filesystem_space(&destination.path)?;
datman::commands::report::print_report(&report)?; datman::commands::report::print_report(&report)?;
} }
DatmanCommand::Pull { .. } => {} DatmanCommand::Pull {
DatmanCommand::InternalPullResponderOfferer { pile_name } => { remote_and_remote_pile,
pile_name,
} => {
let (hostname, remote_datman_path, remote_pile_name) = remote_and_remote_pile
.split(':')
.collect_tuple()
.context("You must pull from a remote pile specified as remote:path:pile.")?;
let descriptor = load_descriptor(Path::new(".")).unwrap(); let descriptor = load_descriptor(Path::new(".")).unwrap();
let source = &descriptor.piles[&pile_name]; let source = &descriptor.piles[&pile_name];
@ -356,6 +368,62 @@ fn main() -> anyhow::Result<()> {
BypassLevel::CompressionBypass, BypassLevel::CompressionBypass,
)?; )?;
let pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10));
pbar.set_style(
ProgressStyle::default_bar().template(
"[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
),
);
pbar.set_message("pulling");
let remote_host_descriptor = descriptor
.remote_hosts
.get(hostname)
.ok_or_else(|| anyhow::anyhow!("No remote host by that name: {:?}.", hostname))?;
let mut connection = Command::new("ssh")
.arg(&remote_host_descriptor.user_at_host)
.arg("--")
.arg(
&remote_host_descriptor
.path_to_datman
.as_ref()
.map(|x| x.as_str())
.unwrap_or("datman"),
)
.arg("_pull_responder_offerer")
.arg(remote_datman_path)
.arg(remote_pile_name)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()?;
let mut reader = BufReader::new(connection.stdout.take().unwrap());
let mut writer = BufWriter::new(connection.stdin.take().unwrap());
pushpull::accepting_side(
&pile,
&bypass_raw_pile,
&mut reader,
&mut writer,
Box::new(pbar),
)?;
}
DatmanCommand::InternalPullResponderOfferer {
datman_path,
pile_name,
} => {
let descriptor = load_descriptor(&datman_path).unwrap();
let source = &descriptor.piles[&pile_name];
let pile_desc = load_pile_descriptor(&source.path)?;
let (pile, bypass_raw_pile) = open_pile_with_work_bypass(
&source.path,
&pile_desc,
BypassLevel::CompressionBypass,
)?;
let mut stdin = BufReader::new(io_streams::StreamReader::stdin()?); let mut stdin = BufReader::new(io_streams::StreamReader::stdin()?);
let mut stdout = BufWriter::new(io_streams::StreamWriter::stdout()?); let mut stdout = BufWriter::new(io_streams::StreamWriter::stdout()?);

View File

@ -184,7 +184,7 @@ pub fn offering_side<R: Read, W: Write>(
pub fn accepting_side<R: Read, W: Write>( pub fn accepting_side<R: Read, W: Write>(
pile: &Pile<Arc<Box<dyn RawPile>>>, pile: &Pile<Arc<Box<dyn RawPile>>>,
bypass_pile: Box<dyn RawPile>, bypass_pile: &Box<dyn RawPile>,
reader: &mut R, reader: &mut R,
writer: &mut W, writer: &mut W,
mut progress: Box<dyn ProgressTracker>, mut progress: Box<dyn ProgressTracker>,