Make chunking() able to use writing pipelines

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 12:57:00 +00:00
parent e6d618a29e
commit b00a6da993
2 changed files with 21 additions and 4 deletions

View File

@ -10,9 +10,9 @@ use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use yama::commands::{load_pile_descriptor, open_pile};
use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile};
use yama::pile::{Pile, RawPile, StoragePipelineSettings};
use yama::progress::ProgressTracker;
use yama::remote::responder::Responder;
use yama::remote::responder::{Responder, ResponderWritingPipeline};
use yama::remote::{read_message, write_message};
use yama::utils::get_number_of_workers;
@ -69,6 +69,7 @@ pub fn chunking<
raw_pile: Arc<RP>,
parent: Option<String>,
progress_bar: PT,
use_writing_pipeline: bool,
) -> anyhow::Result<(R, W)> {
info!("Chunking.");
write_message(&mut write, &"chunk")?;
@ -78,12 +79,27 @@ pub fn chunking<
write_message(&mut write, &parent)?;
write.flush()?;
let writing_pipeline = if use_writing_pipeline {
let sps = StoragePipelineSettings {
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
compressor_input_bound: 32,
writer_input_bound: 32,
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pipeline = raw_pile.build_storage_pipeline(sps, control_tx)?;
Some(ResponderWritingPipeline {
pipeline_submission: pipeline,
})
} else {
None
};
let (r_handle, w_handle, join_handles) = Responder::start(
read,
write,
get_number_of_workers("YAMA_RESPONDERS") as u16,
raw_pile,
None,
writing_pipeline,
progress_bar,
);
@ -232,6 +248,7 @@ pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>
raw_pile,
parent,
progress_bar,
false,
)?;
quit(&mut read, &mut write)?;

View File

@ -16,7 +16,7 @@ use crate::remote::{read_message, write_message, Request, RequestBody, Response,
#[derive(Clone)]
pub struct ResponderWritingPipeline {
pipeline_submission: Sender<(ChunkId, Vec<u8>)>,
pub pipeline_submission: Sender<(ChunkId, Vec<u8>)>,
}
#[derive(Clone)]