From 7ec617b6a5f3432428dcd15f3457185fee3945dc Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 12 Nov 2021 06:56:42 +0000 Subject: [PATCH] Fix remote source backup and add progress bar Signed-off-by: Olivier 'reivilibre --- Cargo.lock | 1 + datman/Cargo.toml | 1 + datman/src/bin/datman.rs | 61 +++++++++++------- datman/src/commands/backup.rs | 9 ++- datman/src/remote/backup_source_requester.rs | 32 ++++++++-- datman/src/remote/backup_source_responder.rs | 67 +++++++++++++++++++- docs/SUMMARY.md | 2 + docs/datman/remote_backups.md | 25 ++++++++ yama/src/lib.rs | 1 + yama/src/operations/storing.rs | 35 ++++------ yama/src/progress.rs | 42 ++++++++++++ yama/src/remote.rs | 4 ++ yama/src/remote/requester.rs | 21 ++++-- yama/src/remote/responder.rs | 21 +++++- 14 files changed, 258 insertions(+), 64 deletions(-) create mode 100644 docs/datman/remote_backups.md create mode 100644 yama/src/progress.rs diff --git a/Cargo.lock b/Cargo.lock index dbcd041..fbadd94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,6 +275,7 @@ dependencies = [ "byteorder", "chrono", "clap", + "crossbeam-channel 0.4.4", "env_logger", "glob", "hostname", diff --git a/datman/Cargo.toml b/datman/Cargo.toml index c8d49e9..aecb1c6 100644 --- a/datman/Cargo.toml +++ b/datman/Cargo.toml @@ -12,6 +12,7 @@ description = "A chunked and deduplicated backup system using Yama" [dependencies] clap = "= 3.0.0-beta.5" +crossbeam-channel = "0.4" anyhow = "1.0" thiserror = "1.0" serde = { version = "1.0.104", features = ["derive"] } diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index 1fd41ce..3784685 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -28,6 +28,8 @@ use datman::commands::init_descriptor; use datman::descriptor::{load_descriptor, SourceDescriptor}; use datman::remote::backup_source_requester::backup_remote_source_to_destination; use datman::remote::backup_source_responder; +use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use log::info; use std::str::FromStr; pub const FAILURE_SYMBOL_OBNOXIOUS_FLASHING: &str = "\x1b[5m\x1b[31m⚠️ \x1b[25m\x1b[22m"; @@ -57,16 +59,13 @@ pub enum DatmanCommand { source_name: String, }, + /// Back up a source locally or over the network. BackupOne { /// Name of the source to back up. source_name: String, /// Name of the destination to back up to. destination_name: String, - - /// Specify the remote name. - #[clap(short, long)] - remote: Option, }, BackupAll { @@ -174,6 +173,16 @@ fn main() -> anyhow::Result<()> { unimplemented!(); } DatmanCommand::InteractiveLabelling { source_name } => { + // TODO: check for now, then support remote functionality + // if let SourceDescriptor::DirectorySource { hostname, .. } = source { + // if hostname != &my_hostname && remote.is_none() { + // bail!( + // "Wrong hostname. Hostname should be {:?} but is {:?}", + // hostname, + // my_hostname + // ); + // } + // } interactive_labelling_session(Path::new("."), source_name).unwrap(); } DatmanCommand::InteractiveBrowsing { source_name } => { @@ -182,7 +191,6 @@ fn main() -> anyhow::Result<()> { DatmanCommand::BackupOne { source_name, destination_name, - remote, } => { let my_hostname = hostname::get() .expect("No hostname") @@ -192,23 +200,21 @@ fn main() -> anyhow::Result<()> { let source = &descriptor.source[&source_name]; let destination = &descriptor.piles[&destination_name]; - if let SourceDescriptor::DirectorySource { hostname, .. } = source { - if hostname != &my_hostname && remote.is_none() { - bail!( - "Wrong hostname. 
Hostname should be {:?} but is {:?}", - hostname, - my_hostname - ); - } - } + let mut pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10)); + pbar.set_style( + ProgressStyle::default_bar().template( + "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ), + ); + pbar.set_message("storing"); - let result = if let Some(remote_name) = remote { - let remote_host_descriptor = - if let Some(rhd) = descriptor.remote_hosts.get(&remote_name) { - rhd - } else { - bail!("No remote found by that name."); - }; + let is_remote = if let SourceDescriptor::DirectorySource { hostname, .. } = source { + hostname != &my_hostname + } else { + false + }; + + let result = if is_remote { backup_remote_source_to_destination( source, destination, @@ -216,8 +222,8 @@ fn main() -> anyhow::Result<()> { Path::new("."), &source_name, &destination_name, - remote_host_descriptor, yama::utils::get_number_of_workers("YAMA_CHUNKERS"), + pbar, ) } else { backup_source_to_destination( @@ -228,6 +234,7 @@ fn main() -> anyhow::Result<()> { &source_name, &destination_name, yama::utils::get_number_of_workers("YAMA_CHUNKERS"), + &mut pbar, ) }; with_exitcode(with_obvious_successfail_message(result)) @@ -236,12 +243,21 @@ fn main() -> anyhow::Result<()> { let descriptor = load_descriptor(Path::new(".")).unwrap(); let destination = &descriptor.piles[&destination_name]; + let mut pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(10)); + pbar.set_style( + ProgressStyle::default_bar().template( + "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ), + ); + pbar.set_message("storing"); + backup_all_sources_to_destination( destination, &descriptor, Path::new("."), &destination_name, yama::utils::get_number_of_workers("YAMA_CHUNKERS"), + &mut pbar, ) .unwrap(); } @@ -280,6 +296,7 @@ fn main() -> anyhow::Result<()> { } DatmanCommand::InternalBackupSourceResponder => { + info!("Datman responder at {:?}", std::env::current_exe()?); backup_source_responder::handler_stdio()?; } } diff --git a/datman/src/commands/backup.rs b/datman/src/commands/backup.rs index e6ee358..d7c1658 100644 --- a/datman/src/commands/backup.rs +++ b/datman/src/commands/backup.rs @@ -33,6 +33,7 @@ use yama::definitions::{ FilesystemOwnership, FilesystemPermissions, PointerData, RecursiveChunkRef, RootTreeNode, TreeNode, }; +use yama::progress::ProgressTracker; pub const POINTER_DATETIME_FORMAT: &'static str = "%F_%T"; pub const POINTER_FIELD_SEPARATOR: char = '+'; @@ -123,7 +124,7 @@ pub fn label_filter_and_convert( Ok(Some(root)) } -pub fn backup_source_to_destination( +pub fn backup_source_to_destination( source: &SourceDescriptor, dest: &DestPileDescriptor, descriptor: &Descriptor, @@ -131,6 +132,7 @@ pub fn backup_source_to_destination( source_name: &str, dest_name: &str, num_workers: u8, + progress_bar: &mut PT, ) -> anyhow::Result<()> { match source { SourceDescriptor::DirectorySource { @@ -192,6 +194,7 @@ pub fn backup_source_to_destination( root, parent, num_workers, + progress_bar, )?; info!("Stored!"); @@ -320,12 +323,13 @@ where } } -pub fn backup_all_sources_to_destination( +pub fn backup_all_sources_to_destination( dest: &DestPileDescriptor, descriptor: &Descriptor, desc_path: &Path, dest_name: &str, num_workers: u8, + progress_bar: &mut PT, ) -> anyhow::Result<()> { for (source_name, source_descriptor) in descriptor.source.iter() { backup_source_to_destination( @@ -336,6 +340,7 @@ pub fn backup_all_sources_to_destination( source_name.as_str(), dest_name, 
num_workers, + progress_bar, )?; } Ok(()) diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index b355dfe..ba70f87 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -1,5 +1,5 @@ use crate::commands::backup::{get_pointer_name_at, label_filter_and_convert}; -use crate::descriptor::{Descriptor, DestPileDescriptor, RemoteHostDescriptor, SourceDescriptor}; +use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor}; use crate::tree::FileTree; use anyhow::{anyhow, bail}; use chrono::Utc; @@ -11,6 +11,7 @@ use std::sync::Arc; use yama::commands::{load_pile_descriptor, open_pile}; use yama::definitions::TreeNode; use yama::pile::{Pile, RawPile}; +use yama::progress::ProgressTracker; use yama::remote::responder::Responder; use yama::remote::{read_message, write_message}; use yama::utils::get_number_of_workers; @@ -53,13 +54,19 @@ pub fn scanning( Ok(scan_result) } -pub fn chunking( +pub fn chunking< + R: Read + Send + 'static, + W: Write + Send + 'static, + RP: RawPile + 'static, + PT: ProgressTracker + Send + 'static, +>( read: R, mut write: W, pointer_name: String, tree_node: &TreeNode, raw_pile: Arc, parent: Option, + progress_bar: PT, ) -> anyhow::Result<()> { info!("Chunking."); write_message(&mut write, &pointer_name)?; @@ -72,6 +79,7 @@ pub fn chunking( source: &SourceDescriptor, dest: &DestPileDescriptor, descriptor: &Descriptor, desc_path: &Path, source_name: &str, dest_name: &str, - remote_host_descriptor: &RemoteHostDescriptor, _num_workers: u8, + progress_bar: PT, ) -> anyhow::Result<()> { match source { SourceDescriptor::DirectorySource { - hostname: _, + hostname, directory, } => { + let remote_host_descriptor = descriptor + .remote_hosts + .get(hostname) + .ok_or_else(|| anyhow::anyhow!("No remote host by that name: {:?}.", hostname))?; info!( "Looking to backup {} (from {}) to {}", source_name, remote_host_descriptor.user_at_host, dest_name @@ -177,7 +189,15 @@ pub fn backup_remote_source_to_destination( let raw_pile = Arc::new(pile.raw_pile); let pile = Pile::new(raw_pile.clone()); - chunking(read, write, pointer_name.clone(), &root, raw_pile, parent)?; + chunking( + read, + write, + pointer_name.clone(), + &root, + raw_pile, + parent, + progress_bar, + )?; info!("Stored! 
Checking for existence..."); if pile.list_pointers()?.contains(&pointer_name) { diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs index 9c12165..c46e9c4 100644 --- a/datman/src/remote/backup_source_responder.rs +++ b/datman/src/remote/backup_source_responder.rs @@ -3,13 +3,16 @@ use crate::tree::scan; use anyhow::bail; +use crossbeam_channel::Sender; use log::info; use std::io::{stdin, stdout, Read, Write}; use std::path::PathBuf; +use std::time::Instant; use yama::definitions::TreeNode; use yama::pile::{Pile, RawPile}; +use yama::progress::ProgressTracker; use yama::remote::requester::Requester; -use yama::remote::{read_message, write_message}; +use yama::remote::{read_message, write_message, RequestBody, ResponseBody}; use yama::utils::get_number_of_workers; pub fn introduction(read: &mut R, write: &mut W) -> anyhow::Result<()> { @@ -57,6 +60,9 @@ pub fn chunking( let pile = Pile::new(raw_pile); + // TODO TODO progress + let progress_bar = &mut (); + yama::operations::storing::store_fully( &pile, path, @@ -64,6 +70,7 @@ pub fn chunking( tree_node, parent, get_number_of_workers("YAMA_CHUNKERS"), + progress_bar, )?; for join_handle in requester_join_handles { @@ -73,6 +80,54 @@ pub fn chunking( Ok(()) } +pub struct ProgressSender { + pub last_sent: Instant, + pub current_progress: u64, + pub current_max: u64, + // TODO actually propagate this + pub current_message: String, + pub sender: Sender<(RequestBody, Option>)>, +} + +impl ProgressSender { + pub fn send_now(&mut self, include_message: bool) { + self.sender + .send(( + RequestBody::Progress { + current: self.current_progress, + max: self.current_max, + }, + None, + )) + .expect("Progress sender failed"); + self.last_sent = Instant::now(); + } + + pub fn send_if_overdue(&mut self) { + //info!("send if overdue..."); + if Instant::now().duration_since(self.last_sent).as_millis() >= 1024 { + self.send_now(false); + } + } +} + +impl ProgressTracker for ProgressSender { + fn inc_progress(&mut self, delta_progress: u64) { + self.current_progress += delta_progress; + self.send_if_overdue(); + } + + fn set_current(&mut self, current_progress: u64) { + self.current_progress = current_progress; + self.send_if_overdue(); + } + + fn set_max_size(&mut self, max_size: u64) { + self.current_max = max_size; + self.send_if_overdue(); + } +} + pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { let (pointer_name, tree_node, parent) = { let stdin = stdin(); @@ -90,6 +145,15 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { let requester_join_handles = { let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); + let command_sender = yama_requester.clone_command_sender(); + info!("progress sender in use"); + let mut progress_bar = ProgressSender { + last_sent: Instant::now(), + current_progress: 0, + current_max: 0, + current_message: "".to_string(), + sender: command_sender, + }; let raw_pile: Box = Box::new(yama_requester); @@ -102,6 +166,7 @@ pub fn chunking_stdio(path: &PathBuf) -> anyhow::Result<()> { tree_node, parent, get_number_of_workers("YAMA_CHUNKERS"), + &mut progress_bar, )?; requester_join_handles diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 3234ba1..851c8cf 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -7,3 +7,5 @@ - [Pointers and Nodes](./yama/internals/pointers-and-nodes.md) - [Datman](./datman/index.md) - [Getting Started](./datman/getting_started.md) + - [Remote Backups](./datman/remote_backups.md) + diff --git 
a/docs/datman/remote_backups.md b/docs/datman/remote_backups.md
new file mode 100644
index 0000000..75699be
--- /dev/null
+++ b/docs/datman/remote_backups.md
@@ -0,0 +1,25 @@
+# Remote Backups
+
+## Backing up a remote source to a local destination
+
+First, set up a remote host entry like the following:
+
+```toml
+[remote_hosts.myhost]
+# Specify an SSH user@host pair
+user_at_host = "backup@myhost.example.com"
+# Optional: override the path to Datman (may be useful for running a copy with special privileges)
+path_to_datman = "/path/to/datman"
+```
+
+Then set up a source whose `hostname` matches the remote host's name:
+
+```toml
+[source.mysourcename]
+directory = "/path/to/source/on/remote"
+hostname = "myhost"
+```
+
+*Helpers cannot currently be used with this approach, but some helpers have their own direct support for SSH targets.*
+
+*Unimplemented: It would be useful to offload compression onto the remote host.*
diff --git a/yama/src/lib.rs b/yama/src/lib.rs
index 59a5c89..79c4f2f 100644
--- a/yama/src/lib.rs
+++ b/yama/src/lib.rs
@@ -4,6 +4,7 @@ pub mod debug;
 pub mod definitions;
 pub mod operations;
 pub mod pile;
+pub mod progress;
 pub mod remote;
 pub mod tree;
 pub mod utils;
diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs
index 0c3c7ac..6662efa 100644
--- a/yama/src/operations/storing.rs
+++ b/yama/src/operations/storing.rs
@@ -24,7 +24,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
 use anyhow::{anyhow, bail, Context};
 use crossbeam_channel::{Receiver, Sender};
 use crossbeam_utils::thread;
-use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
 use log::{error, warn};
 
 use crate::chunking::{RecursiveChunker, SENSIBLE_THRESHOLD};
@@ -32,33 +31,22 @@ use crate::commands;
 use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
 use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode};
 use crate::pile::{Pile, RawPile};
+use crate::progress::ProgressTracker;
 use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
 use std::collections::BTreeMap;
 
-pub fn store<RP: RawPile>(
+pub fn store<RP: RawPile, PT: ProgressTracker>(
     root_path: &Path,
     root: &mut TreeNode,
     pile: &Pile<RP>,
-    make_progress_bar: bool,
+    progress_bar: &mut PT,
     num_workers: u8,
 ) -> anyhow::Result<()> {
-    let pbar = if make_progress_bar {
-        ProgressBar::with_draw_target(
-            root.count_normal_files() as u64,
-            ProgressDrawTarget::stdout_with_hz(10),
-        )
-    } else {
-        ProgressBar::hidden()
-    };
-    pbar.set_style(
-        ProgressStyle::default_bar()
-            .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
-    );
-    pbar.set_message("storing");
-
     let (paths_send, paths_recv) = crossbeam_channel::unbounded();
     let (results_send, results_recv) = crossbeam_channel::bounded(16);
 
+    progress_bar.set_max_size(root.count_normal_files() as u64);
+
     let critical_failures = AtomicU32::new(0);
 
     thread::scope(|s| {
@@ -80,7 +68,7 @@ pub fn store(
         drop(results_send);
         drop(paths_recv);
 
-        if let Err(e) = manager(root, paths_send, results_recv, &pbar) {
+        if let Err(e) = manager(root, paths_send, results_recv, progress_bar) {
             error!("[critical!] 
Storage manager FAILED: {:?}", e); critical_failures.fetch_add(1, Ordering::Relaxed); } @@ -204,11 +192,11 @@ fn update_node( Ok(()) } -pub fn manager( +pub fn manager( root: &mut TreeNode, paths_sender: Sender, results_receiver: Receiver<(String, Option)>, - progress_bar: &ProgressBar, + progress_bar: &mut PT, ) -> anyhow::Result<()> { root.visit( &mut |tree_node, name| { @@ -225,7 +213,7 @@ pub fn manager( drop(paths_sender); while let Ok((path, opt_chunk_ref)) = results_receiver.recv() { - progress_bar.inc(1); + progress_bar.inc_progress(1); match opt_chunk_ref { None => { delete_node(root, &path)?; @@ -242,13 +230,14 @@ pub fn manager( /// Stores files into the pile, potentially differentiating using a parent pointer (which will be /// loaded and fully-integrated). /// This also creates a pointer (which is why this is called `store_fully`). -pub fn store_fully( +pub fn store_fully( pile: &Pile>, root_dir: &PathBuf, new_pointer_name: &String, mut root_node: TreeNode, parent: Option, num_workers: u8, + progress_bar: &mut PT, ) -> anyhow::Result<()> { if let Some(parent) = parent.as_ref() { let mut parent_pointer = pile.read_pointer(parent)?.ok_or_else(|| { @@ -263,7 +252,7 @@ pub fn store_fully( differentiate_node_in_place(&mut root_node, &parent_node.node)?; } - store(&root_dir, &mut root_node, &pile, true, num_workers)?; + store(&root_dir, &mut root_node, &pile, progress_bar, num_workers)?; let mut uid_lookup = BTreeMap::new(); let mut gid_lookup = BTreeMap::new(); diff --git a/yama/src/progress.rs b/yama/src/progress.rs new file mode 100644 index 0000000..05e11c7 --- /dev/null +++ b/yama/src/progress.rs @@ -0,0 +1,42 @@ +use indicatif::ProgressBar; + +pub trait ProgressTracker { + fn inc_progress(&mut self, delta_progress: u64); + fn set_current(&mut self, current_progress: u64); + fn set_max_size(&mut self, max_size: u64); +} + +impl ProgressTracker for ProgressBar { + #[inline] + fn set_max_size(&mut self, max_size: u64) { + self.set_length(max_size); + } + + #[inline] + fn inc_progress(&mut self, delta_progress: u64) { + self.inc(delta_progress); + } + + #[inline] + fn set_current(&mut self, current_progress: u64) { + self.set_position(current_progress); + } +} + +/// No-operation progress tracker. +impl ProgressTracker for () { + #[inline] + fn set_max_size(&mut self, _max_size: u64) { + // nop + } + + #[inline] + fn inc_progress(&mut self, _delta_progress: u64) { + // nop + } + + #[inline] + fn set_current(&mut self, _current_progress: u64) { + // nop + } +} diff --git a/yama/src/remote.rs b/yama/src/remote.rs index 9901ee7..ca207f1 100644 --- a/yama/src/remote.rs +++ b/yama/src/remote.rs @@ -61,6 +61,10 @@ pub enum RequestBody { Flush, LowLevelCheck, Shutdown, + Progress { + current: u64, + max: u64, + }, } #[derive(Serialize, Deserialize, Clone)] diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 7d7c980..75f9a7f 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; /// SSH connection). /// The requests are handled by a `Responder` on the other end of the pipe. pub struct Requester { - commands: Sender<(RequestBody, Sender)>, + commands: Sender<(RequestBody, Option>)>, } impl Requester { @@ -103,6 +103,10 @@ impl Requester { ) } + pub fn clone_command_sender(&self) -> Sender<(RequestBody, Option>)> { + self.commands.clone() + } + /// Thread that reads messages and sends them along. 
fn reader( mut read: R, @@ -130,19 +134,22 @@ impl Requester { fn writer( mut write: W, in_flight: Arc>>>, - command_receiver: Receiver<(RequestBody, Sender)>, + command_receiver: Receiver<(RequestBody, Option>)>, shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, ) -> anyhow::Result<()> { while let Ok((req_body, response_channel)) = command_receiver.recv() { - let request_id = { + let request_id = if let Some(response_channel) = response_channel { let mut map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; - let request_id = (0u16..u16::max_value()) + let request_id = (1u16..u16::MAX) .into_iter() .find(|id| !map.contains_key(&id)) .expect("No ID found"); map.insert(request_id, response_channel); request_id + } else { + 0 }; + let shutting_down = &req_body == &RequestBody::Shutdown; if shutting_down { @@ -198,7 +205,7 @@ impl Requester { fn request(&self, req: RequestBody) -> anyhow::Result { let (sender, receiver) = crossbeam_channel::bounded(0); self.commands - .send((req, sender)) + .send((req, Some(sender))) .or(Err(anyhow!("Failed to queue request")))?; Ok(receiver @@ -298,7 +305,7 @@ impl RawPile for Requester { } pub struct ListKeyIterator { - pub(crate) command_sender: Sender<(RequestBody, Sender)>, + pub(crate) command_sender: Sender<(RequestBody, Option>)>, pub(crate) batch_token: Option, /// in reverse order pub(crate) buffer: Vec>, @@ -313,7 +320,7 @@ impl Iterator for ListKeyIterator { } else if let Some(batch_token) = self.batch_token { let (send, recv) = crossbeam_channel::bounded(0); self.command_sender - .send((RequestBody::NextBatch { token: batch_token }, send)) + .send((RequestBody::NextBatch { token: batch_token }, Some(send))) .expect("Unable to send"); let resp = recv.recv().expect("Unable to recv"); match resp { diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index ba824c1..fd9dc98 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -10,6 +10,7 @@ use itertools::Itertools; use log::{error, info, warn}; use crate::pile::RawPile; +use crate::progress::ProgressTracker; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; #[derive(Clone)] @@ -21,11 +22,17 @@ pub struct Responder { impl Responder { /// Start a 'responder' (command processor). - pub fn start( + pub fn start< + RP: RawPile + 'static, + R: Read + Send + 'static, + W: Write + Send + 'static, + PT: ProgressTracker + Send + 'static, + >( read: R, write: W, num_workers: u16, pile: Arc, + mut progress_bar: PT, ) -> Vec> { let mut handles = Vec::new(); let (work_queue_send, work_queue_recv) = crossbeam_channel::bounded::(16); @@ -39,7 +46,7 @@ impl Responder { let work_queue_send = work_queue_send.clone(); let responder = responder.clone(); handles.push(thread::spawn(move || { - if let Err(e) = responder.reader(read, work_queue_send) { + if let Err(e) = responder.reader(read, work_queue_send, &mut progress_bar) { error!("reader failed: {:?}", e); } })); @@ -73,10 +80,11 @@ impl Responder { } /// Thread that reads messages and sends them along. 
-    fn reader<R: Read>(
+    fn reader<R: Read, PT: ProgressTracker>(
         &self,
         mut read: R,
         worker_queue_send: Sender<Request>,
+        progress_tracker: &mut PT,
     ) -> anyhow::Result<()> {
         loop {
             let request: Request = read_message(&mut read)?;
@@ -100,6 +108,10 @@ impl Responder {
                         "Failed to send continuation token to continuer"
                     )))?;
                 }
+                RequestBody::Progress { current, max } => {
+                    progress_tracker.set_max_size(max);
+                    progress_tracker.set_current(current);
+                }
                 _ => {
                     worker_queue_send.send(request)?;
                 }
@@ -288,6 +300,9 @@ impl Responder {
                 response_to: request.id,
                 body: ResponseBody::Success,
             },
+            RequestBody::Progress { .. } => {
+                unreachable!("handled by reader")
+            }
         };
 
         responses
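
Note on the new `ProgressTracker` trait introduced by this patch: it decouples the storing pipeline from `indicatif` (the `ProgressBar` impl now lives in `yama/src/progress.rs`) and lets the remote responder forward progress over the existing request channel via `RequestBody::Progress`. Any type implementing the three trait methods can be threaded through `store`, `store_fully` and `Responder::start`. A minimal sketch of a custom implementation, assuming the `yama` crate from this patch as a dependency (`LoggingTracker` and its 10% reporting threshold are illustrative only, not part of the patch):

```rust
use yama::progress::ProgressTracker;

/// Illustrative tracker that logs a line roughly every 10% of progress.
pub struct LoggingTracker {
    current: u64,
    max: u64,
    last_reported_percent: u64,
}

impl LoggingTracker {
    pub fn new() -> Self {
        LoggingTracker {
            current: 0,
            max: 0,
            last_reported_percent: 0,
        }
    }

    fn report_if_needed(&mut self) {
        if self.max == 0 {
            return;
        }
        let percent = self.current * 100 / self.max;
        // Only log every 10% to avoid flooding the output.
        if percent >= self.last_reported_percent + 10 {
            self.last_reported_percent = percent;
            eprintln!("progress: {}/{} ({}%)", self.current, self.max, percent);
        }
    }
}

impl ProgressTracker for LoggingTracker {
    fn inc_progress(&mut self, delta_progress: u64) {
        self.current += delta_progress;
        self.report_if_needed();
    }

    fn set_current(&mut self, current_progress: u64) {
        self.current = current_progress;
        self.report_if_needed();
    }

    fn set_max_size(&mut self, max_size: u64) {
        self.max = max_size;
    }
}
```

Such a tracker is passed as `&mut` wherever a `PT: ProgressTracker` parameter appears, mirroring how `datman` passes an `indicatif::ProgressBar` locally and how `backup_source_responder` passes a `ProgressSender` over the wire.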