Fix remote source backup and add progress bar
continuous-integration/drone the build failed Details

Signed-off-by: Olivier 'reivilibre' <olivier@librepush.net>
This commit is contained in:
Olivier 'reivilibre' 2021-11-12 06:56:42 +00:00
parent 74c369e0f6
commit 7ec617b6a5
14 changed files with 258 additions and 64 deletions

1
Cargo.lock generated
View File

@ -275,6 +275,7 @@ dependencies = [
"byteorder", "byteorder",
"chrono", "chrono",
"clap", "clap",
"crossbeam-channel 0.4.4",
"env_logger", "env_logger",
"glob", "glob",
"hostname", "hostname",

View File

@ -12,6 +12,7 @@ description = "A chunked and deduplicated backup system using Yama"
[dependencies] [dependencies]
clap = "= 3.0.0-beta.5" clap = "= 3.0.0-beta.5"
crossbeam-channel = "0.4"
anyhow = "1.0" anyhow = "1.0"
thiserror = "1.0" thiserror = "1.0"
serde = { version = "1.0.104", features = ["derive"] } serde = { version = "1.0.104", features = ["derive"] }

View File

@ -28,6 +28,8 @@ use datman::commands::init_descriptor;
use datman::descriptor::{load_descriptor, SourceDescriptor}; use datman::descriptor::{load_descriptor, SourceDescriptor};
use datman::remote::backup_source_requester::backup_remote_source_to_destination; use datman::remote::backup_source_requester::backup_remote_source_to_destination;
use datman::remote::backup_source_responder; use datman::remote::backup_source_responder;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::info;
use std::str::FromStr; use std::str::FromStr;
pub const FAILURE_SYMBOL_OBNOXIOUS_FLASHING: &str = "\x1b[5m\x1b[31m⚠ \x1b[25m\x1b[22m"; pub const FAILURE_SYMBOL_OBNOXIOUS_FLASHING: &str = "\x1b[5m\x1b[31m⚠ \x1b[25m\x1b[22m";
@ -57,16 +59,13 @@ pub enum DatmanCommand {
source_name: String, source_name: String,
}, },
/// Back up a source locally or over the network.
BackupOne { BackupOne {
/// Name of the source to back up. /// Name of the source to back up.
source_name: String, source_name: String,
/// Name of the destination to back up to. /// Name of the destination to back up to.
destination_name: String, destination_name: String,
/// Specify the remote name.
#[clap(short, long)]
remote: Option<String>,
}, },
BackupAll { BackupAll {
@ -174,6 +173,16 @@ fn main() -> anyhow::Result<()> {
unimplemented!(); unimplemented!();
} }
DatmanCommand::InteractiveLabelling { source_name } => { DatmanCommand::InteractiveLabelling { source_name } => {
// TODO: check for now, then support remote functionality
// if let SourceDescriptor::DirectorySource { hostname, .. } = source {
// if hostname != &my_hostname && remote.is_none() {
// bail!(
// "Wrong hostname. Hostname should be {:?} but is {:?}",
// hostname,
// my_hostname
// );
// }
// }
interactive_labelling_session(Path::new("."), source_name).unwrap(); interactive_labelling_session(Path::new("."), source_name).unwrap();
} }
DatmanCommand::InteractiveBrowsing { source_name } => { DatmanCommand::InteractiveBrowsing { source_name } => {
@ -182,7 +191,6 @@ fn main() -> anyhow::Result<()> {
DatmanCommand::BackupOne { DatmanCommand::BackupOne {
source_name, source_name,
destination_name, destination_name,
remote,
} => { } => {
let my_hostname = hostname::get() let my_hostname = hostname::get()
.expect("No hostname") .expect("No hostname")
@ -192,23 +200,21 @@ fn main() -> anyhow::Result<()> {
let source = &descriptor.source[&source_name]; let source = &descriptor.source[&source_name];
let destination = &descriptor.piles[&destination_name]; let destination = &descriptor.piles[&destination_name];
if let SourceDescriptor::DirectorySource { hostname, .. } = source { let mut pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10));
if hostname != &my_hostname && remote.is_none() { pbar.set_style(
bail!( ProgressStyle::default_bar().template(
"Wrong hostname. Hostname should be {:?} but is {:?}", "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
hostname, ),
my_hostname
); );
} pbar.set_message("storing");
}
let result = if let Some(remote_name) = remote { let is_remote = if let SourceDescriptor::DirectorySource { hostname, .. } = source {
let remote_host_descriptor = hostname != &my_hostname
if let Some(rhd) = descriptor.remote_hosts.get(&remote_name) {
rhd
} else { } else {
bail!("No remote found by that name."); false
}; };
let result = if is_remote {
backup_remote_source_to_destination( backup_remote_source_to_destination(
source, source,
destination, destination,
@ -216,8 +222,8 @@ fn main() -> anyhow::Result<()> {
Path::new("."), Path::new("."),
&source_name, &source_name,
&destination_name, &destination_name,
remote_host_descriptor,
yama::utils::get_number_of_workers("YAMA_CHUNKERS"), yama::utils::get_number_of_workers("YAMA_CHUNKERS"),
pbar,
) )
} else { } else {
backup_source_to_destination( backup_source_to_destination(
@ -228,6 +234,7 @@ fn main() -> anyhow::Result<()> {
&source_name, &source_name,
&destination_name, &destination_name,
yama::utils::get_number_of_workers("YAMA_CHUNKERS"), yama::utils::get_number_of_workers("YAMA_CHUNKERS"),
&mut pbar,
) )
}; };
with_exitcode(with_obvious_successfail_message(result)) with_exitcode(with_obvious_successfail_message(result))
@ -236,12 +243,21 @@ fn main() -> anyhow::Result<()> {
let descriptor = load_descriptor(Path::new(".")).unwrap(); let descriptor = load_descriptor(Path::new(".")).unwrap();
let destination = &descriptor.piles[&destination_name]; let destination = &descriptor.piles[&destination_name];
let mut pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10));
pbar.set_style(
ProgressStyle::default_bar().template(
"[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
),
);
pbar.set_message("storing");
backup_all_sources_to_destination( backup_all_sources_to_destination(
destination, destination,
&descriptor, &descriptor,
Path::new("."), Path::new("."),
&destination_name, &destination_name,
yama::utils::get_number_of_workers("YAMA_CHUNKERS"), yama::utils::get_number_of_workers("YAMA_CHUNKERS"),
&mut pbar,
) )
.unwrap(); .unwrap();
} }
@ -280,6 +296,7 @@ fn main() -> anyhow::Result<()> {
} }
DatmanCommand::InternalBackupSourceResponder => { DatmanCommand::InternalBackupSourceResponder => {
info!("Datman responder at {:?}", std::env::current_exe()?);
backup_source_responder::handler_stdio()?; backup_source_responder::handler_stdio()?;
} }
} }

View File

@ -33,6 +33,7 @@ use yama::definitions::{
FilesystemOwnership, FilesystemPermissions, PointerData, RecursiveChunkRef, RootTreeNode, FilesystemOwnership, FilesystemPermissions, PointerData, RecursiveChunkRef, RootTreeNode,
TreeNode, TreeNode,
}; };
use yama::progress::ProgressTracker;
pub const POINTER_DATETIME_FORMAT: &'static str = "%F_%T"; pub const POINTER_DATETIME_FORMAT: &'static str = "%F_%T";
pub const POINTER_FIELD_SEPARATOR: char = '+'; pub const POINTER_FIELD_SEPARATOR: char = '+';
@ -123,7 +124,7 @@ pub fn label_filter_and_convert(
Ok(Some(root)) Ok(Some(root))
} }
pub fn backup_source_to_destination( pub fn backup_source_to_destination<PT: ProgressTracker>(
source: &SourceDescriptor, source: &SourceDescriptor,
dest: &DestPileDescriptor, dest: &DestPileDescriptor,
descriptor: &Descriptor, descriptor: &Descriptor,
@ -131,6 +132,7 @@ pub fn backup_source_to_destination(
source_name: &str, source_name: &str,
dest_name: &str, dest_name: &str,
num_workers: u8, num_workers: u8,
progress_bar: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match source { match source {
SourceDescriptor::DirectorySource { SourceDescriptor::DirectorySource {
@ -192,6 +194,7 @@ pub fn backup_source_to_destination(
root, root,
parent, parent,
num_workers, num_workers,
progress_bar,
)?; )?;
info!("Stored!"); info!("Stored!");
@ -320,12 +323,13 @@ where
} }
} }
pub fn backup_all_sources_to_destination( pub fn backup_all_sources_to_destination<PT: ProgressTracker>(
dest: &DestPileDescriptor, dest: &DestPileDescriptor,
descriptor: &Descriptor, descriptor: &Descriptor,
desc_path: &Path, desc_path: &Path,
dest_name: &str, dest_name: &str,
num_workers: u8, num_workers: u8,
progress_bar: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
for (source_name, source_descriptor) in descriptor.source.iter() { for (source_name, source_descriptor) in descriptor.source.iter() {
backup_source_to_destination( backup_source_to_destination(
@ -336,6 +340,7 @@ pub fn backup_all_sources_to_destination(
source_name.as_str(), source_name.as_str(),
dest_name, dest_name,
num_workers, num_workers,
progress_bar,
)?; )?;
} }
Ok(()) Ok(())

View File

@ -1,5 +1,5 @@
use crate::commands::backup::{get_pointer_name_at, label_filter_and_convert}; use crate::commands::backup::{get_pointer_name_at, label_filter_and_convert};
use crate::descriptor::{Descriptor, DestPileDescriptor, RemoteHostDescriptor, SourceDescriptor}; use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor};
use crate::tree::FileTree; use crate::tree::FileTree;
use anyhow::{anyhow, bail}; use anyhow::{anyhow, bail};
use chrono::Utc; use chrono::Utc;
@ -11,6 +11,7 @@ use std::sync::Arc;
use yama::commands::{load_pile_descriptor, open_pile}; use yama::commands::{load_pile_descriptor, open_pile};
use yama::definitions::TreeNode; use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile}; use yama::pile::{Pile, RawPile};
use yama::progress::ProgressTracker;
use yama::remote::responder::Responder; use yama::remote::responder::Responder;
use yama::remote::{read_message, write_message}; use yama::remote::{read_message, write_message};
use yama::utils::get_number_of_workers; use yama::utils::get_number_of_workers;
@ -53,13 +54,19 @@ pub fn scanning<R: Read, W: Write>(
Ok(scan_result) Ok(scan_result)
} }
pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile + 'static>( pub fn chunking<
R: Read + Send + 'static,
W: Write + Send + 'static,
RP: RawPile + 'static,
PT: ProgressTracker + Send + 'static,
>(
read: R, read: R,
mut write: W, mut write: W,
pointer_name: String, pointer_name: String,
tree_node: &TreeNode, tree_node: &TreeNode,
raw_pile: Arc<RP>, raw_pile: Arc<RP>,
parent: Option<String>, parent: Option<String>,
progress_bar: PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Chunking."); info!("Chunking.");
write_message(&mut write, &pointer_name)?; write_message(&mut write, &pointer_name)?;
@ -72,6 +79,7 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile
write, write,
get_number_of_workers("YAMA_RESPONDERS") as u16, get_number_of_workers("YAMA_RESPONDERS") as u16,
raw_pile, raw_pile,
progress_bar,
); );
info!("Waiting for remote to finish."); info!("Waiting for remote to finish.");
@ -85,21 +93,25 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile
Ok(()) Ok(())
} }
pub fn backup_remote_source_to_destination( pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>(
source: &SourceDescriptor, source: &SourceDescriptor,
dest: &DestPileDescriptor, dest: &DestPileDescriptor,
descriptor: &Descriptor, descriptor: &Descriptor,
desc_path: &Path, desc_path: &Path,
source_name: &str, source_name: &str,
dest_name: &str, dest_name: &str,
remote_host_descriptor: &RemoteHostDescriptor,
_num_workers: u8, _num_workers: u8,
progress_bar: PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match source { match source {
SourceDescriptor::DirectorySource { SourceDescriptor::DirectorySource {
hostname: _, hostname,
directory, directory,
} => { } => {
let remote_host_descriptor = descriptor
.remote_hosts
.get(hostname)
.ok_or_else(|| anyhow::anyhow!("No remote host by that name: {:?}.", hostname))?;
info!( info!(
"Looking to backup {} (from {}) to {}", "Looking to backup {} (from {}) to {}",
source_name, remote_host_descriptor.user_at_host, dest_name source_name, remote_host_descriptor.user_at_host, dest_name
@ -177,7 +189,15 @@ pub fn backup_remote_source_to_destination(
let raw_pile = Arc::new(pile.raw_pile); let raw_pile = Arc::new(pile.raw_pile);
let pile = Pile::new(raw_pile.clone()); let pile = Pile::new(raw_pile.clone());
chunking(read, write, pointer_name.clone(), &root, raw_pile, parent)?; chunking(
read,
write,
pointer_name.clone(),
&root,
raw_pile,
parent,
progress_bar,
)?;
info!("Stored! Checking for existence..."); info!("Stored! Checking for existence...");
if pile.list_pointers()?.contains(&pointer_name) { if pile.list_pointers()?.contains(&pointer_name) {

View File

@ -3,13 +3,16 @@
use crate::tree::scan; use crate::tree::scan;
use anyhow::bail; use anyhow::bail;
use crossbeam_channel::Sender;
use log::info; use log::info;
use std::io::{stdin, stdout, Read, Write}; use std::io::{stdin, stdout, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Instant;
use yama::definitions::TreeNode; use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile}; use yama::pile::{Pile, RawPile};
use yama::progress::ProgressTracker;
use yama::remote::requester::Requester; use yama::remote::requester::Requester;
use yama::remote::{read_message, write_message}; use yama::remote::{read_message, write_message, RequestBody, ResponseBody};
use yama::utils::get_number_of_workers; use yama::utils::get_number_of_workers;
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> { pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
@ -57,6 +60,9 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
let pile = Pile::new(raw_pile); let pile = Pile::new(raw_pile);
// TODO TODO progress
let progress_bar = &mut ();
yama::operations::storing::store_fully( yama::operations::storing::store_fully(
&pile, &pile,
path, path,
@ -64,6 +70,7 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
tree_node, tree_node,
parent, parent,
get_number_of_workers("YAMA_CHUNKERS"), get_number_of_workers("YAMA_CHUNKERS"),
progress_bar,
)?; )?;
for join_handle in requester_join_handles { for join_handle in requester_join_handles {
@ -73,6 +80,54 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
Ok(()) Ok(())
} }
/// Requester-side progress tracker that periodically relays progress to the
/// remote responder as `RequestBody::Progress` messages over the command channel.
pub struct ProgressSender {
    /// When a progress update was last sent; used to rate-limit messages.
    pub last_sent: Instant,
    /// Units of work completed so far.
    pub current_progress: u64,
    /// Total units of work expected.
    pub current_max: u64,
    // TODO actually propagate this
    /// Status message to accompany updates (not yet transmitted over the wire).
    pub current_message: String,
    /// Command channel to the requester's writer thread; progress messages are
    /// sent fire-and-forget with no response channel (`None`).
    pub sender: Sender<(RequestBody, Option<Sender<ResponseBody>>)>,
}
impl ProgressSender {
    /// Immediately send a progress update over the command channel and reset
    /// the rate-limit timestamp.
    ///
    /// The message is fire-and-forget (no response channel is attached), so
    /// this does not wait for the remote to acknowledge it.
    ///
    /// `_include_message` is currently unused: `current_message` is not yet
    /// propagated over the wire (see the TODO on the struct field).
    ///
    /// # Panics
    /// Panics if the command channel has been disconnected.
    pub fn send_now(&mut self, _include_message: bool) {
        self.sender
            .send((
                RequestBody::Progress {
                    current: self.current_progress,
                    max: self.current_max,
                },
                None,
            ))
            .expect("Progress sender failed");
        self.last_sent = Instant::now();
    }

    /// Send a progress update only if roughly a second (1024 ms) has elapsed
    /// since the last one, so frequent small increments don't flood the channel.
    pub fn send_if_overdue(&mut self) {
        if self.last_sent.elapsed().as_millis() >= 1024 {
            self.send_now(false);
        }
    }
}
/// Forward progress updates to the remote responder; each mutation goes
/// through `send_if_overdue`, which rate-limits the actual channel traffic.
impl ProgressTracker for ProgressSender {
    fn set_max_size(&mut self, max: u64) {
        self.current_max = max;
        self.send_if_overdue();
    }

    fn inc_progress(&mut self, delta: u64) {
        self.current_progress += delta;
        self.send_if_overdue();
    }

    fn set_current(&mut self, position: u64) {
        self.current_progress = position;
        self.send_if_overdue();
    }
}
pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
let (pointer_name, tree_node, parent) = { let (pointer_name, tree_node, parent) = {
let stdin = stdin(); let stdin = stdin();
@ -90,6 +145,15 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
let requester_join_handles = { let requester_join_handles = {
let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); let (yama_requester, requester_join_handles) = Requester::new_from_stdio();
let command_sender = yama_requester.clone_command_sender();
info!("progress sender in use");
let mut progress_bar = ProgressSender {
last_sent: Instant::now(),
current_progress: 0,
current_max: 0,
current_message: "".to_string(),
sender: command_sender,
};
let raw_pile: Box<dyn RawPile> = Box::new(yama_requester); let raw_pile: Box<dyn RawPile> = Box::new(yama_requester);
@ -102,6 +166,7 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> {
tree_node, tree_node,
parent, parent,
get_number_of_workers("YAMA_CHUNKERS"), get_number_of_workers("YAMA_CHUNKERS"),
&mut progress_bar,
)?; )?;
requester_join_handles requester_join_handles

View File

@ -7,3 +7,5 @@
- [Pointers and Nodes](./yama/internals/pointers-and-nodes.md) - [Pointers and Nodes](./yama/internals/pointers-and-nodes.md)
- [Datman](./datman/index.md) - [Datman](./datman/index.md)
- [Getting Started](./datman/getting_started.md) - [Getting Started](./datman/getting_started.md)
- [Remote Backups](./datman/remote_backups.md)

View File

@ -0,0 +1,25 @@
# Remote Backups
## Backing up a remote source into a local destination
First set up a remote host like the following:
```toml
[remote_hosts.myhost]
# Specify an SSH user@host pair
user_at_host = "backup@myhost.example.com"
# Optional: override path to Datman (may be useful for using one with special privileges)
path_to_datman = "/path/to/datman"
```
And set up a source using the same hostname:
```toml
[source.mysourcename]
directory = "/path/to/source/on/remote"
hostname = "myhost"
```
*Currently the use of helpers is not possible using this approach, but some of them have direct support for SSH targets.*
*Unimplemented: It would be useful to offload compression onto the remote.*

View File

@ -4,6 +4,7 @@ pub mod debug;
pub mod definitions; pub mod definitions;
pub mod operations; pub mod operations;
pub mod pile; pub mod pile;
pub mod progress;
pub mod remote; pub mod remote;
pub mod tree; pub mod tree;
pub mod utils; pub mod utils;

View File

@ -24,7 +24,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
use anyhow::{anyhow, bail, Context}; use anyhow::{anyhow, bail, Context};
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use crossbeam_utils::thread; use crossbeam_utils::thread;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::{error, warn}; use log::{error, warn};
use crate::chunking::{RecursiveChunker, SENSIBLE_THRESHOLD}; use crate::chunking::{RecursiveChunker, SENSIBLE_THRESHOLD};
@ -32,33 +31,22 @@ use crate::commands;
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node}; use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode}; use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode};
use crate::pile::{Pile, RawPile}; use crate::pile::{Pile, RawPile};
use crate::progress::ProgressTracker;
use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place}; use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
use std::collections::BTreeMap; use std::collections::BTreeMap;
pub fn store<RP: RawPile>( pub fn store<RP: RawPile, PT: ProgressTracker>(
root_path: &Path, root_path: &Path,
root: &mut TreeNode, root: &mut TreeNode,
pile: &Pile<RP>, pile: &Pile<RP>,
make_progress_bar: bool, progress_bar: &mut PT,
num_workers: u8, num_workers: u8,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let pbar = if make_progress_bar {
ProgressBar::with_draw_target(
root.count_normal_files() as u64,
ProgressDrawTarget::stdout_with_hz(10),
)
} else {
ProgressBar::hidden()
};
pbar.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
);
pbar.set_message("storing");
let (paths_send, paths_recv) = crossbeam_channel::unbounded(); let (paths_send, paths_recv) = crossbeam_channel::unbounded();
let (results_send, results_recv) = crossbeam_channel::bounded(16); let (results_send, results_recv) = crossbeam_channel::bounded(16);
progress_bar.set_max_size(root.count_normal_files() as u64);
let critical_failures = AtomicU32::new(0); let critical_failures = AtomicU32::new(0);
thread::scope(|s| { thread::scope(|s| {
@ -80,7 +68,7 @@ pub fn store<RP: RawPile>(
drop(results_send); drop(results_send);
drop(paths_recv); drop(paths_recv);
if let Err(e) = manager(root, paths_send, results_recv, &pbar) { if let Err(e) = manager(root, paths_send, results_recv, progress_bar) {
error!("[critical!] Storage manager FAILED: {:?}", e); error!("[critical!] Storage manager FAILED: {:?}", e);
critical_failures.fetch_add(1, Ordering::Relaxed); critical_failures.fetch_add(1, Ordering::Relaxed);
} }
@ -204,11 +192,11 @@ fn update_node(
Ok(()) Ok(())
} }
pub fn manager( pub fn manager<PT: ProgressTracker>(
root: &mut TreeNode, root: &mut TreeNode,
paths_sender: Sender<String>, paths_sender: Sender<String>,
results_receiver: Receiver<(String, Option<RecursiveChunkRef>)>, results_receiver: Receiver<(String, Option<RecursiveChunkRef>)>,
progress_bar: &ProgressBar, progress_bar: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
root.visit( root.visit(
&mut |tree_node, name| { &mut |tree_node, name| {
@ -225,7 +213,7 @@ pub fn manager(
drop(paths_sender); drop(paths_sender);
while let Ok((path, opt_chunk_ref)) = results_receiver.recv() { while let Ok((path, opt_chunk_ref)) = results_receiver.recv() {
progress_bar.inc(1); progress_bar.inc_progress(1);
match opt_chunk_ref { match opt_chunk_ref {
None => { None => {
delete_node(root, &path)?; delete_node(root, &path)?;
@ -242,13 +230,14 @@ pub fn manager(
/// Stores files into the pile, potentially differentiating using a parent pointer (which will be /// Stores files into the pile, potentially differentiating using a parent pointer (which will be
/// loaded and fully-integrated). /// loaded and fully-integrated).
/// This also creates a pointer (which is why this is called `store_fully`). /// This also creates a pointer (which is why this is called `store_fully`).
pub fn store_fully( pub fn store_fully<PT: ProgressTracker>(
pile: &Pile<Box<dyn RawPile>>, pile: &Pile<Box<dyn RawPile>>,
root_dir: &PathBuf, root_dir: &PathBuf,
new_pointer_name: &String, new_pointer_name: &String,
mut root_node: TreeNode, mut root_node: TreeNode,
parent: Option<String>, parent: Option<String>,
num_workers: u8, num_workers: u8,
progress_bar: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if let Some(parent) = parent.as_ref() { if let Some(parent) = parent.as_ref() {
let mut parent_pointer = pile.read_pointer(parent)?.ok_or_else(|| { let mut parent_pointer = pile.read_pointer(parent)?.ok_or_else(|| {
@ -263,7 +252,7 @@ pub fn store_fully(
differentiate_node_in_place(&mut root_node, &parent_node.node)?; differentiate_node_in_place(&mut root_node, &parent_node.node)?;
} }
store(&root_dir, &mut root_node, &pile, true, num_workers)?; store(&root_dir, &mut root_node, &pile, progress_bar, num_workers)?;
let mut uid_lookup = BTreeMap::new(); let mut uid_lookup = BTreeMap::new();
let mut gid_lookup = BTreeMap::new(); let mut gid_lookup = BTreeMap::new();

42
yama/src/progress.rs Normal file
View File

@ -0,0 +1,42 @@
use indicatif::ProgressBar;
/// Minimal interface for reporting the progress of a long-running operation.
///
/// Implementations in this crate include `indicatif::ProgressBar` (interactive
/// display) and `()` (no-op, for callers that don't want progress output).
pub trait ProgressTracker {
    /// Advance the progress position by `delta_progress` units.
    fn inc_progress(&mut self, delta_progress: u64);
    /// Set the absolute progress position.
    fn set_current(&mut self, current_progress: u64);
    /// Set the total number of units of work (the maximum progress value).
    fn set_max_size(&mut self, max_size: u64);
}
/// Delegate progress tracking to an interactive `indicatif` progress bar.
impl ProgressTracker for ProgressBar {
    #[inline]
    fn inc_progress(&mut self, delta: u64) {
        self.inc(delta);
    }

    #[inline]
    fn set_current(&mut self, position: u64) {
        self.set_position(position);
    }

    #[inline]
    fn set_max_size(&mut self, max: u64) {
        self.set_length(max);
    }
}
/// No-operation progress tracker: pass `()` wherever progress reporting is
/// not wanted. Every method silently discards its input.
impl ProgressTracker for () {
    #[inline]
    fn set_max_size(&mut self, _max: u64) {}

    #[inline]
    fn inc_progress(&mut self, _delta: u64) {}

    #[inline]
    fn set_current(&mut self, _position: u64) {}
}

View File

@ -61,6 +61,10 @@ pub enum RequestBody {
Flush, Flush,
LowLevelCheck, LowLevelCheck,
Shutdown, Shutdown,
Progress {
current: u64,
max: u64,
},
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]

View File

@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
/// SSH connection). /// SSH connection).
/// The requests are handled by a `Responder` on the other end of the pipe. /// The requests are handled by a `Responder` on the other end of the pipe.
pub struct Requester { pub struct Requester {
commands: Sender<(RequestBody, Sender<ResponseBody>)>, commands: Sender<(RequestBody, Option<Sender<ResponseBody>>)>,
} }
impl Requester { impl Requester {
@ -103,6 +103,10 @@ impl Requester {
) )
} }
pub fn clone_command_sender(&self) -> Sender<(RequestBody, Option<Sender<ResponseBody>>)> {
self.commands.clone()
}
/// Thread that reads messages and sends them along. /// Thread that reads messages and sends them along.
fn reader<R: Read>( fn reader<R: Read>(
mut read: R, mut read: R,
@ -130,19 +134,22 @@ impl Requester {
fn writer<W: Write>( fn writer<W: Write>(
mut write: W, mut write: W,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>, in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
command_receiver: Receiver<(RequestBody, Sender<ResponseBody>)>, command_receiver: Receiver<(RequestBody, Option<Sender<ResponseBody>>)>,
shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
while let Ok((req_body, response_channel)) = command_receiver.recv() { while let Ok((req_body, response_channel)) = command_receiver.recv() {
let request_id = { let request_id = if let Some(response_channel) = response_channel {
let mut map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; let mut map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?;
let request_id = (0u16..u16::max_value()) let request_id = (1u16..u16::MAX)
.into_iter() .into_iter()
.find(|id| !map.contains_key(&id)) .find(|id| !map.contains_key(&id))
.expect("No ID found"); .expect("No ID found");
map.insert(request_id, response_channel); map.insert(request_id, response_channel);
request_id request_id
} else {
0
}; };
let shutting_down = &req_body == &RequestBody::Shutdown; let shutting_down = &req_body == &RequestBody::Shutdown;
if shutting_down { if shutting_down {
@ -198,7 +205,7 @@ impl Requester {
fn request(&self, req: RequestBody) -> anyhow::Result<ResponseBody> { fn request(&self, req: RequestBody) -> anyhow::Result<ResponseBody> {
let (sender, receiver) = crossbeam_channel::bounded(0); let (sender, receiver) = crossbeam_channel::bounded(0);
self.commands self.commands
.send((req, sender)) .send((req, Some(sender)))
.or(Err(anyhow!("Failed to queue request")))?; .or(Err(anyhow!("Failed to queue request")))?;
Ok(receiver Ok(receiver
@ -298,7 +305,7 @@ impl RawPile for Requester {
} }
pub struct ListKeyIterator { pub struct ListKeyIterator {
pub(crate) command_sender: Sender<(RequestBody, Sender<ResponseBody>)>, pub(crate) command_sender: Sender<(RequestBody, Option<Sender<ResponseBody>>)>,
pub(crate) batch_token: Option<u16>, pub(crate) batch_token: Option<u16>,
/// in reverse order /// in reverse order
pub(crate) buffer: Vec<Vec<u8>>, pub(crate) buffer: Vec<Vec<u8>>,
@ -313,7 +320,7 @@ impl Iterator for ListKeyIterator {
} else if let Some(batch_token) = self.batch_token { } else if let Some(batch_token) = self.batch_token {
let (send, recv) = crossbeam_channel::bounded(0); let (send, recv) = crossbeam_channel::bounded(0);
self.command_sender self.command_sender
.send((RequestBody::NextBatch { token: batch_token }, send)) .send((RequestBody::NextBatch { token: batch_token }, Some(send)))
.expect("Unable to send"); .expect("Unable to send");
let resp = recv.recv().expect("Unable to recv"); let resp = recv.recv().expect("Unable to recv");
match resp { match resp {

View File

@ -10,6 +10,7 @@ use itertools::Itertools;
use log::{error, info, warn}; use log::{error, info, warn};
use crate::pile::RawPile; use crate::pile::RawPile;
use crate::progress::ProgressTracker;
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
#[derive(Clone)] #[derive(Clone)]
@ -21,11 +22,17 @@ pub struct Responder {
impl Responder { impl Responder {
/// Start a 'responder' (command processor). /// Start a 'responder' (command processor).
pub fn start<RP: RawPile + 'static, R: Read + Send + 'static, W: Write + Send + 'static>( pub fn start<
RP: RawPile + 'static,
R: Read + Send + 'static,
W: Write + Send + 'static,
PT: ProgressTracker + Send + 'static,
>(
read: R, read: R,
write: W, write: W,
num_workers: u16, num_workers: u16,
pile: Arc<RP>, pile: Arc<RP>,
mut progress_bar: PT,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new(); let mut handles = Vec::new();
let (work_queue_send, work_queue_recv) = crossbeam_channel::bounded::<Request>(16); let (work_queue_send, work_queue_recv) = crossbeam_channel::bounded::<Request>(16);
@ -39,7 +46,7 @@ impl Responder {
let work_queue_send = work_queue_send.clone(); let work_queue_send = work_queue_send.clone();
let responder = responder.clone(); let responder = responder.clone();
handles.push(thread::spawn(move || { handles.push(thread::spawn(move || {
if let Err(e) = responder.reader(read, work_queue_send) { if let Err(e) = responder.reader(read, work_queue_send, &mut progress_bar) {
error!("reader failed: {:?}", e); error!("reader failed: {:?}", e);
} }
})); }));
@ -73,10 +80,11 @@ impl Responder {
} }
/// Thread that reads messages and sends them along. /// Thread that reads messages and sends them along.
fn reader<R: Read + Send + 'static>( fn reader<R: Read + Send + 'static, PT: ProgressTracker>(
&self, &self,
mut read: R, mut read: R,
worker_queue_send: Sender<Request>, worker_queue_send: Sender<Request>,
progress_tracker: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
loop { loop {
let request: Request = read_message(&mut read)?; let request: Request = read_message(&mut read)?;
@ -100,6 +108,10 @@ impl Responder {
"Failed to send continuation token to continuer" "Failed to send continuation token to continuer"
)))?; )))?;
} }
RequestBody::Progress { current, max } => {
progress_tracker.set_max_size(max);
progress_tracker.set_current(current);
}
_ => { _ => {
worker_queue_send.send(request)?; worker_queue_send.send(request)?;
} }
@ -288,6 +300,9 @@ impl Responder {
response_to: request.id, response_to: request.id,
body: ResponseBody::Success, body: ResponseBody::Success,
}, },
RequestBody::Progress { .. } => {
unreachable!("handled by readea")
}
}; };
responses responses