Process pipeline stopping properly.

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 10:40:59 +00:00
parent efa1e6d51f
commit aebd32da4a
1 changed files with 16 additions and 10 deletions

View File

@ -320,10 +320,11 @@ impl RawPile for Requester {
let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32); let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32);
let mut in_flight_writes = 0; let mut in_flight_writes = 0;
const MAX_IN_FLIGHT_WRITES: u32 = 32; const MAX_IN_FLIGHT_WRITES: u32 = 32;
let mut pipeline_still_going = true;
loop { while pipeline_still_going || in_flight_writes > 0 {
// TODO this won't handle channel closure properly. // TODO this won't handle channel closure properly.
if in_flight_writes < MAX_IN_FLIGHT_WRITES { if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going {
crossbeam_channel::select! { crossbeam_channel::select! {
recv(response_rx) -> resp => { recv(response_rx) -> resp => {
in_flight_writes -= 1; in_flight_writes -= 1;
@ -346,17 +347,22 @@ impl RawPile for Requester {
} }
} }
recv(receiver) -> resp => { recv(receiver) -> resp => {
if let Ok((chunk_id, write)) = resp {
in_flight_writes += 1; in_flight_writes += 1;
let (chunk_id, write) = resp.unwrap();
todo!();
command_sender.send((RequestBody::Write { command_sender.send((RequestBody::Write {
kind: Keyspace::Chunk, kind: Keyspace::Chunk,
key: chunk_id.to_vec(), key: chunk_id.to_vec(),
value: write value: write
}, Some(response_tx.clone()))).unwrap(); }, Some(response_tx.clone()))).unwrap();
} else {
// the input has stopped
pipeline_still_going = false;
}
} }
} }
} else { } else {
// Either the pipeline is stopping or we are too busy to accept new chunks,
// so only process responses.
let resp = response_rx.recv().unwrap(); let resp = response_rx.recv().unwrap();
match resp { match resp {
ResponseBody::Success => { ResponseBody::Success => {