Add untested push command to yama
This commit is contained in:
parent
336b7bcb7f
commit
786e943bbd
@ -19,10 +19,13 @@ use std::path::{Path, PathBuf};
|
|||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use clap::{crate_authors, crate_description, crate_version, Clap};
|
use clap::{crate_authors, crate_description, crate_version, Clap};
|
||||||
|
use log::info;
|
||||||
|
|
||||||
use env_logger::Env;
|
use env_logger::Env;
|
||||||
|
use std::sync::Arc;
|
||||||
use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile};
|
use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile};
|
||||||
use yama::operations::checking::VacuumMode;
|
use yama::operations::checking::VacuumMode;
|
||||||
|
use yama::operations::pushpull::{determine_bypass_level, open_pile_with_work_bypass, push_to};
|
||||||
use yama::operations::{checking, extracting};
|
use yama::operations::{checking, extracting};
|
||||||
use yama::pile::{Pile, PileDescriptor, RawPile};
|
use yama::pile::{Pile, PileDescriptor, RawPile};
|
||||||
use yama::{commands, debug};
|
use yama::{commands, debug};
|
||||||
@ -79,6 +82,15 @@ enum PileCommand {
|
|||||||
},
|
},
|
||||||
/// Enter a debug prompt for manually operating on the yama pile.
|
/// Enter a debug prompt for manually operating on the yama pile.
|
||||||
Debug,
|
Debug,
|
||||||
|
|
||||||
|
/// Pushes a pointer from this pile to another pile.
|
||||||
|
Push {
|
||||||
|
/// The name of the pointer to push.
|
||||||
|
pointer_name: String,
|
||||||
|
|
||||||
|
/// The path to the other pile to push the pointer to.
|
||||||
|
other_pile_path: PathBuf,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> anyhow::Result<()> {
|
||||||
@ -200,6 +212,45 @@ fn main() -> anyhow::Result<()> {
|
|||||||
let (pdesc, pile) = open_pile()?;
|
let (pdesc, pile) = open_pile()?;
|
||||||
debug::debug_prompt(pdesc, pile)?;
|
debug::debug_prompt(pdesc, pile)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PileCommand::Push {
|
||||||
|
pointer_name,
|
||||||
|
other_pile_path,
|
||||||
|
} => {
|
||||||
|
let this_pile_path = PathBuf::from(".");
|
||||||
|
|
||||||
|
let descriptor_this = load_pile_descriptor(".".as_ref())
|
||||||
|
.context("Failed to load this pile descriptor")?;
|
||||||
|
|
||||||
|
let descriptor_other = load_pile_descriptor(other_pile_path)
|
||||||
|
.context("Failed to load foreign pile descriptor.")?;
|
||||||
|
|
||||||
|
let bypass_level = determine_bypass_level(
|
||||||
|
&descriptor_this,
|
||||||
|
&this_pile_path,
|
||||||
|
&descriptor_other,
|
||||||
|
&other_pile_path,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
info!("Using bypass level: {:?}", bypass_level);
|
||||||
|
|
||||||
|
let (this_pile, this_rp_bypass) =
|
||||||
|
open_pile_with_work_bypass(&this_pile_path, &descriptor_this, bypass_level)?;
|
||||||
|
let (other_pile, other_rp_bypass) =
|
||||||
|
open_pile_with_work_bypass(&other_pile_path, &descriptor_other, bypass_level)?;
|
||||||
|
|
||||||
|
info!("loaded");
|
||||||
|
|
||||||
|
push_to(
|
||||||
|
Arc::new(this_pile),
|
||||||
|
this_rp_bypass,
|
||||||
|
Arc::new(other_pile),
|
||||||
|
other_rp_bypass,
|
||||||
|
vec![pointer_name.clone()],
|
||||||
|
true,
|
||||||
|
32,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -24,7 +24,7 @@ use std::sync::Arc;
|
|||||||
pub fn push_to(
|
pub fn push_to(
|
||||||
from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
||||||
from_rp_bypass: Arc<Box<dyn RawPile>>,
|
from_rp_bypass: Arc<Box<dyn RawPile>>,
|
||||||
to_pile: Arc<Pile<Box<dyn RawPile>>>,
|
to_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
||||||
to_rp_bypass: Arc<Box<dyn RawPile>>,
|
to_rp_bypass: Arc<Box<dyn RawPile>>,
|
||||||
pointers: Vec<String>,
|
pointers: Vec<String>,
|
||||||
make_progress_bar: bool,
|
make_progress_bar: bool,
|
||||||
@ -79,9 +79,6 @@ pub fn push_to(
|
|||||||
.expect("No fail");
|
.expect("No fail");
|
||||||
}
|
}
|
||||||
|
|
||||||
let initial_tasks = to_process.len() as u64;
|
|
||||||
pbar.set_length(initial_tasks);
|
|
||||||
|
|
||||||
// start the work
|
// start the work
|
||||||
|
|
||||||
let critical_failures = Arc::new(AtomicU32::new(0));
|
let critical_failures = Arc::new(AtomicU32::new(0));
|
||||||
@ -124,6 +121,7 @@ pub fn push_to(
|
|||||||
drop(jobs_rx);
|
drop(jobs_rx);
|
||||||
drop(stat_tx);
|
drop(stat_tx);
|
||||||
|
|
||||||
|
pbar.set_length(0);
|
||||||
if let Err(e) = pusher_manager(&pbar, stat_rx, jobs_tx) {
|
if let Err(e) = pusher_manager(&pbar, stat_rx, jobs_tx) {
|
||||||
error!("[critical!] Push manager FAILED: {:?}", e);
|
error!("[critical!] Push manager FAILED: {:?}", e);
|
||||||
critical_failures.fetch_add(1, Ordering::Relaxed);
|
critical_failures.fetch_add(1, Ordering::Relaxed);
|
||||||
@ -168,7 +166,7 @@ fn pusher_manager(
|
|||||||
fn pusher_worker(
|
fn pusher_worker(
|
||||||
from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
||||||
from_rp_bypass: Arc<Box<dyn RawPile>>,
|
from_rp_bypass: Arc<Box<dyn RawPile>>,
|
||||||
to_pile: Arc<Pile<Box<dyn RawPile>>>,
|
to_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
|
||||||
to_rp_bypass: Arc<Box<dyn RawPile>>,
|
to_rp_bypass: Arc<Box<dyn RawPile>>,
|
||||||
jobs_rx: Receiver<RecursiveChunkRef>,
|
jobs_rx: Receiver<RecursiveChunkRef>,
|
||||||
stat_tx: Sender<PushWorkerToManagerMessage>,
|
stat_tx: Sender<PushWorkerToManagerMessage>,
|
||||||
@ -241,6 +239,7 @@ fn pusher_worker(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub enum BypassLevel {
|
pub enum BypassLevel {
|
||||||
NoBypass,
|
NoBypass,
|
||||||
CompressionBypass,
|
CompressionBypass,
|
||||||
@ -284,7 +283,7 @@ pub fn open_pile_with_work_bypass(
|
|||||||
dir: &Path,
|
dir: &Path,
|
||||||
desc: &PileDescriptor,
|
desc: &PileDescriptor,
|
||||||
bypass_level: BypassLevel,
|
bypass_level: BypassLevel,
|
||||||
) -> anyhow::Result<(Pile<Box<dyn RawPile>>, Box<dyn RawPile>)> {
|
) -> anyhow::Result<(Pile<Arc<Box<dyn RawPile>>>, Arc<Box<dyn RawPile>>)> {
|
||||||
let num_compressors = get_number_of_workers("YAMA_COMPRESSORS");
|
let num_compressors = get_number_of_workers("YAMA_COMPRESSORS");
|
||||||
let num_decompressors = get_number_of_workers("YAMA_DECOMPRESSORS");
|
let num_decompressors = get_number_of_workers("YAMA_DECOMPRESSORS");
|
||||||
|
|
||||||
@ -302,8 +301,8 @@ pub fn open_pile_with_work_bypass(
|
|||||||
BypassLevel::CompressionBypass => {
|
BypassLevel::CompressionBypass => {
|
||||||
let common_raw_pile: Arc<Box<dyn RawPile>> = Arc::new(Box::new(blob_raw_pile));
|
let common_raw_pile: Arc<Box<dyn RawPile>> = Arc::new(Box::new(blob_raw_pile));
|
||||||
|
|
||||||
let raw_pile: Box<dyn RawPile> = match desc.compression {
|
let raw_pile: Arc<Box<dyn RawPile>> = match desc.compression {
|
||||||
None => Box::new(common_raw_pile.clone()),
|
None => common_raw_pile.clone(),
|
||||||
Some(comp_level) => {
|
Some(comp_level) => {
|
||||||
let mut dictionary = Vec::new();
|
let mut dictionary = Vec::new();
|
||||||
let dict_path = dir.join("important_zstd.dict");
|
let dict_path = dir.join("important_zstd.dict");
|
||||||
@ -321,10 +320,10 @@ pub fn open_pile_with_work_bypass(
|
|||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Box::new(compressed_pile)
|
Arc::new(Box::new(compressed_pile))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok((Pile::new(raw_pile), Box::new(common_raw_pile)))
|
Ok((Pile::new(raw_pile), common_raw_pile))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user