diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 9b77357..065a74a 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -311,7 +311,75 @@ impl RawPile for Requester { controller_send: Sender, ) -> anyhow::Result)>> { // this one is a little bit more complex. - todo!() + // We want to be able to send off multiple write requests at once, but not too many, so we + // need to be able to apply backpressure. + let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec)>(128); + let command_sender = self.commands.clone(); + + std::thread::spawn(move || { + let (response_tx, response_rx) = crossbeam_channel::bounded::(32); + let mut in_flight_writes = 0; + const MAX_IN_FLIGHT_WRITES: u32 = 32; + + loop { + // TODO this won't handle channel closure properly. + if in_flight_writes < MAX_IN_FLIGHT_WRITES { + crossbeam_channel::select! { + recv(response_rx) -> resp => { + in_flight_writes -= 1; + match resp.unwrap() { + ResponseBody::Success => { + // nop + } + ResponseBody::Failed(string) => { + panic!("Requester pipeline fail {}", string); + } + ResponseBody::BatchData { .. } => { + panic!("wtf BatchData"); + } + ResponseBody::NotExists => { + panic!("wtf NotExists"); + } + ResponseBody::Data(_) => { + panic!("wtf Data"); + } + } + } + recv(receiver) -> resp => { + in_flight_writes += 1; + let (chunk_id, write) = resp.unwrap(); + todo!(); + command_sender.send((RequestBody::Write { + kind: Keyspace::Chunk, + key: chunk_id.to_vec(), + value: write + }, Some(response_tx.clone()))).unwrap(); + } + } + } else { + let resp = response_rx.recv().unwrap(); + match resp { + ResponseBody::Success => { + // nop + } + ResponseBody::Failed(string) => { + panic!("Requester pipeline fail {}", string); + } + ResponseBody::BatchData { .. } => { + panic!("wtf BatchData"); + } + ResponseBody::NotExists => { + panic!("wtf NotExists"); + } + ResponseBody::Data(_) => { + panic!("wtf Data"); + } + } + } + } + }); + + Ok(input) } }