Make Responder technically capable of using a writing pipeline

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 12:37:39 +00:00
parent aaf2ea1493
commit e6d618a29e
2 changed files with 39 additions and 11 deletions

View File

@ -83,6 +83,7 @@ pub fn chunking<
write,
get_number_of_workers("YAMA_RESPONDERS") as u16,
raw_pile,
None,
progress_bar,
);

View File

@ -9,15 +9,22 @@ use crossbeam_channel::{Receiver, Sender};
use itertools::Itertools;
use log::{error, info, warn};
use crate::pile::RawPile;
use crate::definitions::ChunkId;
use crate::pile::{Keyspace, RawPile};
use crate::progress::ProgressTracker;
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
#[derive(Clone)]
pub struct ResponderWritingPipeline {
pipeline_submission: Sender<(ChunkId, Vec<u8>)>,
}
#[derive(Clone)]
/// A wrapper for a RawPile which allows a `Requester` to access it over a pipe (e.g. TCP socket or
/// an SSH connection).
pub struct Responder {
continuation_tokens: Arc<Mutex<HashMap<u16, Sender<u16>>>>,
writing_pipeline: Option<ResponderWritingPipeline>,
}
impl Responder {
@ -32,6 +39,7 @@ impl Responder {
write: W,
num_workers: u16,
pile: Arc<RP>,
writing_pipeline: Option<ResponderWritingPipeline>,
mut progress_bar: PT,
) -> (JoinHandle<R>, JoinHandle<W>, Vec<JoinHandle<()>>) {
let mut handles = Vec::new();
@ -39,6 +47,7 @@ impl Responder {
let (resp_send, resp_recv) = crossbeam_channel::bounded::<Response>(4);
let responder = Responder {
continuation_tokens: Arc::new(Mutex::new(Default::default())),
writing_pipeline,
};
let r_handle = {
@ -181,20 +190,38 @@ impl Responder {
}
}
},
RequestBody::Write { kind, key, value } => match pile.write(kind, &key, &value) {
Ok(_) => Response {
response_to: request.id,
body: ResponseBody::Success,
},
Err(err) => {
warn!("Error whilst doing a raw write: {:?}", err);
let err = format!("{:?}", err);
RequestBody::Write { kind, key, value } => {
if let Some(writing_pipeline) = self
.writing_pipeline
.as_ref()
.filter(|_| kind == Keyspace::Chunk)
{
let mut chunk_id = ChunkId::default();
chunk_id.copy_from_slice(&key[..]);
writing_pipeline.pipeline_submission.send((chunk_id, value));
// We lie and say it was successful once we submit.
// We'll complain on our side if anything goes wrong, anyway.
Response {
response_to: request.id,
body: ResponseBody::Failed(err),
body: ResponseBody::Success,
}
} else {
match pile.write(kind, &key, &value) {
Ok(_) => Response {
response_to: request.id,
body: ResponseBody::Success,
},
Err(err) => {
warn!("Error whilst doing a raw write: {:?}", err);
let err = format!("{:?}", err);
Response {
response_to: request.id,
body: ResponseBody::Failed(err),
}
}
}
}
},
}
RequestBody::Delete { kind, key } => match pile.delete(kind, &key) {
Ok(_) => Response {
response_to: request.id,