diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 065a74a..0b47f39 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -320,10 +320,11 @@ impl RawPile for Requester { let (response_tx, response_rx) = crossbeam_channel::bounded::(32); let mut in_flight_writes = 0; const MAX_IN_FLIGHT_WRITES: u32 = 32; + let mut pipeline_still_going = true; - loop { + while pipeline_still_going || in_flight_writes > 0 { // TODO this won't handle channel closure properly. - if in_flight_writes < MAX_IN_FLIGHT_WRITES { + if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going { crossbeam_channel::select! { recv(response_rx) -> resp => { in_flight_writes -= 1; @@ -346,17 +347,22 @@ impl RawPile for Requester { } } recv(receiver) -> resp => { - in_flight_writes += 1; - let (chunk_id, write) = resp.unwrap(); - todo!(); - command_sender.send((RequestBody::Write { - kind: Keyspace::Chunk, - key: chunk_id.to_vec(), - value: write - }, Some(response_tx.clone()))).unwrap(); + if let Ok((chunk_id, write)) = resp { + in_flight_writes += 1; + command_sender.send((RequestBody::Write { + kind: Keyspace::Chunk, + key: chunk_id.to_vec(), + value: write + }, Some(response_tx.clone()))).unwrap(); + } else { + // the input has stopped + pipeline_still_going = false; + } } } } else { + // Either the pipeline is stopping or we are too busy to accept new chunks, + // so only process responses. let resp = response_rx.recv().unwrap(); match resp { ResponseBody::Success => {