From e6d618a29e77963777ca5e3f8ba1f021a6e6526e Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 20 Nov 2021 12:37:39 +0000 Subject: [PATCH] Make Responder technically capable of using a writing pipeline --- datman/src/remote/backup_source_requester.rs | 1 + yama/src/remote/responder.rs | 49 +++++++++++++++----- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index bd7ff21..f70c417 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -83,6 +83,7 @@ pub fn chunking< write, get_number_of_workers("YAMA_RESPONDERS") as u16, raw_pile, + None, progress_bar, ); diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index 61da54f..ce756cc 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -9,15 +9,22 @@ use crossbeam_channel::{Receiver, Sender}; use itertools::Itertools; use log::{error, info, warn}; -use crate::pile::RawPile; +use crate::definitions::ChunkId; +use crate::pile::{Keyspace, RawPile}; use crate::progress::ProgressTracker; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; +#[derive(Clone)] +pub struct ResponderWritingPipeline { + pipeline_submission: Sender<(ChunkId, Vec)>, +} + #[derive(Clone)] /// A wrapper for a RawPile which allows a `Requester` to access it over a pipe (e.g. TCP socket or /// an SSH connection). pub struct Responder { continuation_tokens: Arc>>>, + writing_pipeline: Option, } impl Responder { @@ -32,6 +39,7 @@ impl Responder { write: W, num_workers: u16, pile: Arc, + writing_pipeline: Option, mut progress_bar: PT, ) -> (JoinHandle, JoinHandle, Vec>) { let mut handles = Vec::new(); @@ -39,6 +47,7 @@ impl Responder { let (resp_send, resp_recv) = crossbeam_channel::bounded::(4); let responder = Responder { continuation_tokens: Arc::new(Mutex::new(Default::default())), + writing_pipeline, }; let r_handle = { @@ -181,20 +190,38 @@ impl Responder { } } }, - RequestBody::Write { kind, key, value } => match pile.write(kind, &key, &value) { - Ok(_) => Response { - response_to: request.id, - body: ResponseBody::Success, - }, - Err(err) => { - warn!("Error whilst doing a raw write: {:?}", err); - let err = format!("{:?}", err); + RequestBody::Write { kind, key, value } => { + if let Some(writing_pipeline) = self + .writing_pipeline + .as_ref() + .filter(|_| kind == Keyspace::Chunk) + { + let mut chunk_id = ChunkId::default(); + chunk_id.copy_from_slice(&key[..]); + writing_pipeline.pipeline_submission.send((chunk_id, value)); + // We lie and say it was successful once we submit. + // We'll complain on our side if anything goes wrong, anyway. Response { response_to: request.id, - body: ResponseBody::Failed(err), + body: ResponseBody::Success, + } + } else { + match pile.write(kind, &key, &value) { + Ok(_) => Response { + response_to: request.id, + body: ResponseBody::Success, + }, + Err(err) => { + warn!("Error whilst doing a raw write: {:?}", err); + let err = format!("{:?}", err); + Response { + response_to: request.id, + body: ResponseBody::Failed(err), + } + } } } - }, + } RequestBody::Delete { kind, key } => match pile.delete(kind, &key) { Ok(_) => Response { response_to: request.id,