Add pipelined requester stage

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 10:33:44 +00:00
parent 0a02eea478
commit 005da2589d

View File

@ -311,7 +311,75 @@ impl RawPile for Requester {
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
// this one is a little bit more complex.
todo!()
// We want to be able to send off multiple write requests at once, but not too many, so we
// need to be able to apply backpressure.
let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(128);
let command_sender = self.commands.clone();
std::thread::spawn(move || {
let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32);
let mut in_flight_writes = 0;
const MAX_IN_FLIGHT_WRITES: u32 = 32;
loop {
// TODO this won't handle channel closure properly.
if in_flight_writes < MAX_IN_FLIGHT_WRITES {
crossbeam_channel::select! {
recv(response_rx) -> resp => {
in_flight_writes -= 1;
match resp.unwrap() {
ResponseBody::Success => {
// nop
}
ResponseBody::Failed(string) => {
panic!("Requester pipeline fail {}", string);
}
ResponseBody::BatchData { .. } => {
panic!("wtf BatchData");
}
ResponseBody::NotExists => {
panic!("wtf NotExists");
}
ResponseBody::Data(_) => {
panic!("wtf Data");
}
}
}
recv(receiver) -> resp => {
in_flight_writes += 1;
let (chunk_id, write) = resp.unwrap();
todo!();
command_sender.send((RequestBody::Write {
kind: Keyspace::Chunk,
key: chunk_id.to_vec(),
value: write
}, Some(response_tx.clone()))).unwrap();
}
}
} else {
let resp = response_rx.recv().unwrap();
match resp {
ResponseBody::Success => {
// nop
}
ResponseBody::Failed(string) => {
panic!("Requester pipeline fail {}", string);
}
ResponseBody::BatchData { .. } => {
panic!("wtf BatchData");
}
ResponseBody::NotExists => {
panic!("wtf NotExists");
}
ResponseBody::Data(_) => {
panic!("wtf Data");
}
}
}
}
});
Ok(input)
}
}