From b00a6da9939afa20b6a84f03fd8ccca80034930d Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 20 Nov 2021 12:57:00 +0000 Subject: [PATCH] Make chunking() able to use writing pipelines --- datman/src/remote/backup_source_requester.rs | 23 +++++++++++++++++--- yama/src/remote/responder.rs | 2 +- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index f70c417..2272b3c 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -10,9 +10,9 @@ use std::process::{Child, Command, Stdio}; use std::sync::Arc; use yama::commands::{load_pile_descriptor, open_pile}; use yama::definitions::TreeNode; -use yama::pile::{Pile, RawPile}; +use yama::pile::{Pile, RawPile, StoragePipelineSettings}; use yama::progress::ProgressTracker; -use yama::remote::responder::Responder; +use yama::remote::responder::{Responder, ResponderWritingPipeline}; use yama::remote::{read_message, write_message}; use yama::utils::get_number_of_workers; @@ -69,6 +69,7 @@ pub fn chunking< raw_pile: Arc, parent: Option, progress_bar: PT, + use_writing_pipeline: bool, ) -> anyhow::Result<(R, W)> { info!("Chunking."); write_message(&mut write, &"chunk")?; @@ -78,12 +79,27 @@ pub fn chunking< write_message(&mut write, &parent)?; write.flush()?; + let writing_pipeline = if use_writing_pipeline { + let sps = StoragePipelineSettings { + num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, + compressor_input_bound: 32, + writer_input_bound: 32, + }; + let (control_tx, control_rx) = crossbeam_channel::unbounded(); + let pipeline = raw_pile.build_storage_pipeline(sps, control_tx)?; + Some(ResponderWritingPipeline { + pipeline_submission: pipeline, + }) + } else { + None + }; + let (r_handle, w_handle, join_handles) = Responder::start( read, write, get_number_of_workers("YAMA_RESPONDERS") as u16, raw_pile, - None, + writing_pipeline, progress_bar, ); @@ -232,6 +248,7 @@ pub fn backup_remote_source_to_destination raw_pile, parent, progress_bar, + false, )?; quit(&mut read, &mut write)?; diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index ce756cc..748d65b 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -16,7 +16,7 @@ use crate::remote::{read_message, write_message, Request, RequestBody, Response, #[derive(Clone)] pub struct ResponderWritingPipeline { - pipeline_submission: Sender<(ChunkId, Vec)>, + pub pipeline_submission: Sender<(ChunkId, Vec)>, } #[derive(Clone)]