From 786e943bbdfdd01ee30baec1edaba7069f8d2ecf Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 2 Jul 2021 21:28:51 +0100
Subject: [PATCH] Add untested push command to yama

---
 yama/src/bin/yama.rs            | 51 +++++++++++++++++++++++++++++++++
 yama/src/operations/pushpull.rs | 19 ++++++------
 2 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/yama/src/bin/yama.rs b/yama/src/bin/yama.rs
index 5be9885..d30b68c 100644
--- a/yama/src/bin/yama.rs
+++ b/yama/src/bin/yama.rs
@@ -19,10 +19,13 @@ use std::path::{Path, PathBuf};
 
 use anyhow::{bail, Context};
 use clap::{crate_authors, crate_description, crate_version, Clap};
+use log::info;
 use env_logger::Env;
+use std::sync::Arc;
 
 use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile};
 use yama::operations::checking::VacuumMode;
+use yama::operations::pushpull::{determine_bypass_level, open_pile_with_work_bypass, push_to};
 use yama::operations::{checking, extracting};
 use yama::pile::{Pile, PileDescriptor, RawPile};
 use yama::{commands, debug};
@@ -79,6 +82,15 @@ enum PileCommand {
     },
     /// Enter a debug prompt for manually operating on the yama pile.
     Debug,
+
+    /// Pushes a pointer from this pile to another pile.
+    Push {
+        /// The name of the pointer to push.
+        pointer_name: String,
+
+        /// The path to the other pile to push the pointer to.
+        other_pile_path: PathBuf,
+    },
 }
 
 fn main() -> anyhow::Result<()> {
@@ -200,6 +212,45 @@ fn main() -> anyhow::Result<()> {
             let (pdesc, pile) = open_pile()?;
             debug::debug_prompt(pdesc, pile)?;
         }
+
+        PileCommand::Push {
+            pointer_name,
+            other_pile_path,
+        } => {
+            let this_pile_path = PathBuf::from(".");
+
+            let descriptor_this = load_pile_descriptor(".".as_ref())
+                .context("Failed to load this pile descriptor")?;
+
+            let descriptor_other = load_pile_descriptor(other_pile_path)
+                .context("Failed to load foreign pile descriptor.")?;
+
+            let bypass_level = determine_bypass_level(
+                &descriptor_this,
+                &this_pile_path,
+                &descriptor_other,
+                &other_pile_path,
+            )?;
+
+            info!("Using bypass level: {:?}", bypass_level);
+
+            let (this_pile, this_rp_bypass) =
+                open_pile_with_work_bypass(&this_pile_path, &descriptor_this, bypass_level)?;
+            let (other_pile, other_rp_bypass) =
+                open_pile_with_work_bypass(&other_pile_path, &descriptor_other, bypass_level)?;
+
+            info!("loaded");
+
+            push_to(
+                Arc::new(this_pile),
+                this_rp_bypass,
+                Arc::new(other_pile),
+                other_rp_bypass,
+                vec![pointer_name.clone()],
+                true,
+                32,
+            )?;
+        }
     }
 
     Ok(())
diff --git a/yama/src/operations/pushpull.rs b/yama/src/operations/pushpull.rs
index f185776..63f9f2a 100644
--- a/yama/src/operations/pushpull.rs
+++ b/yama/src/operations/pushpull.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 pub fn push_to(
     from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
     from_rp_bypass: Arc<Box<dyn RawPile>>,
-    to_pile: Arc<Pile<Box<dyn RawPile>>>,
+    to_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
     to_rp_bypass: Arc<Box<dyn RawPile>>,
     pointers: Vec<String>,
     make_progress_bar: bool,
@@ -79,9 +79,6 @@ pub fn push_to(
             .expect("No fail");
     }
 
-    let initial_tasks = to_process.len() as u64;
-    pbar.set_length(initial_tasks);
-
     // start the work
     let critical_failures = Arc::new(AtomicU32::new(0));
 
@@ -124,6 +121,7 @@ pub fn push_to(
     drop(jobs_rx);
     drop(stat_tx);
 
+    pbar.set_length(0);
    if let Err(e) = pusher_manager(&pbar, stat_rx, jobs_tx) {
        error!("[critical!] Push manager FAILED: {:?}", e);
        critical_failures.fetch_add(1, Ordering::Relaxed);
@@ -168,7 +166,7 @@ fn pusher_manager(
 fn pusher_worker(
     from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
     from_rp_bypass: Arc<Box<dyn RawPile>>,
-    to_pile: Arc<Pile<Box<dyn RawPile>>>,
+    to_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
     to_rp_bypass: Arc<Box<dyn RawPile>>,
     jobs_rx: Receiver,
     stat_tx: Sender,
@@ -241,6 +239,7 @@ fn pusher_worker(
     Ok(())
 }
 
+#[derive(Copy, Clone, Debug)]
 pub enum BypassLevel {
     NoBypass,
     CompressionBypass,
@@ -284,7 +283,7 @@ pub fn open_pile_with_work_bypass(
     dir: &Path,
     desc: &PileDescriptor,
     bypass_level: BypassLevel,
-) -> anyhow::Result<(Pile<Box<dyn RawPile>>, Box<dyn RawPile>)> {
+) -> anyhow::Result<(Pile<Arc<Box<dyn RawPile>>>, Arc<Box<dyn RawPile>>)> {
     let num_compressors = get_number_of_workers("YAMA_COMPRESSORS");
     let num_decompressors = get_number_of_workers("YAMA_DECOMPRESSORS");
 
@@ -302,8 +301,8 @@ pub fn open_pile_with_work_bypass(
         BypassLevel::CompressionBypass => {
             let common_raw_pile: Arc<Box<dyn RawPile>> = Arc::new(Box::new(blob_raw_pile));
 
-            let raw_pile: Box<dyn RawPile> = match desc.compression {
-                None => Box::new(common_raw_pile.clone()),
+            let raw_pile: Arc<Box<dyn RawPile>> = match desc.compression {
+                None => common_raw_pile.clone(),
                 Some(comp_level) => {
                     let mut dictionary = Vec::new();
                     let dict_path = dir.join("important_zstd.dict");
@@ -321,10 +320,10 @@ pub fn open_pile_with_work_bypass(
                         },
                     )?;
-                    Box::new(compressed_pile)
+                    Arc::new(Box::new(compressed_pile))
                 }
             };
 
-            Ok((Pile::new(raw_pile), Box::new(common_raw_pile)))
+            Ok((Pile::new(raw_pile), common_raw_pile))
        }
    }
 }
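
Usage note (illustrative, not part of the patch): the new subcommand operates on the pile
in the current working directory, since the source pile path is hard-coded as ".". Assuming
a pointer named "my-backup" and a destination pile at /mnt/spare-pile (both hypothetical
values), the invocation would look roughly like:

    cd /path/to/source/pile
    yama push my-backup /mnt/spare-pile

The arguments passed to push_to are currently fixed in yama.rs: a single pointer, the
progress bar enabled, and a final value of 32 (presumably a worker count). The commit
message marks the command as untested.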