diff --git a/olivefs.toml b/olivefs.toml index eb3e905..49bb2be 100644 --- a/olivefs.toml +++ b/olivefs.toml @@ -11,3 +11,8 @@ ca_certificate = "ca/ca.crt" timeout = 60 keep_alive = 30 + +[performance.streaming_reader] +read_ahead = 1048576 +read_ahead_block_size = 131072 +command_limit = 64 diff --git a/olivefs/src/requester/streaming_reader.rs b/olivefs/src/requester/streaming_reader.rs index 0c2d04e..c2bba10 100644 --- a/olivefs/src/requester/streaming_reader.rs +++ b/olivefs/src/requester/streaming_reader.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, bail}; +use std::cmp::min; +use log::{debug, error, warn}; use olivefs_common::messages::{DataCommand, DataResponse}; use olivefs_common::networking::{read_bare_message, send_bare_message}; use quinn::{RecvStream, SendStream}; @@ -41,12 +43,14 @@ pub struct StreamingReaderState { pub config: Arc, /// The ring buffer containing bytes that we can use to service commands pub ring_buffer: VecDeque, + /// Whether the end of the ring buffer is the end of the file + pub buffer_end_is_eof: bool, /// The next 'read' index in the ring buffer pub ring_buffer_read_next_index: i64, /// The next 'write' index in the ring buffer pub ring_buffer_write_next_index: i64, /// (offset, true iff this was a jump) - pub requested_offsets: VecDeque<(i64, bool)>, + pub in_flight_requests: VecDeque<(i64, u32, bool)>, /// Next position to issue a readahead at. pub next_readahead_position: i64, @@ -137,7 +141,11 @@ impl StreamingReaderState { let (command_tx, command_rx) = mpsc::channel(32); let (reader_stop_tx, reader_stop_rx) = oneshot::channel(); - tokio::spawn(async { stream_reader(rx, response_tx, rx_tx, reader_stop_rx) }); + tokio::spawn(async { + if let Err(error) = stream_reader(rx, response_tx, rx_tx, reader_stop_rx).await { + error!("stream_reader Error: {:?}", error); + } + }); let state = StreamingReaderState { file_handle, @@ -152,9 +160,10 @@ impl StreamingReaderState { running: true, config, ring_buffer: Default::default(), + buffer_end_is_eof: false, ring_buffer_read_next_index: 0, ring_buffer_write_next_index: 0, - requested_offsets: Default::default(), + in_flight_requests: Default::default(), next_readahead_position: 0, upcoming_jump: None, jumps_in_progress: 0, @@ -164,10 +173,16 @@ impl StreamingReaderState { (state, command_tx) } - pub fn spawn(self) {} + pub fn spawn(self) { + tokio::spawn(async move { + if let Err(error) = self.command_handler().await { + error!("StreamingReader Error: {:?}", error); + } + }); + } pub async fn command_handler(mut self) -> anyhow::Result<()> { - while self.running || !self.requested_offsets.is_empty() { + while self.running || !self.in_flight_requests.is_empty() { tokio::select! { command = self.command_rx.recv(), if self.commands_in_flight < self.config.command_limit => { self.process_command(command).await?; @@ -178,14 +193,35 @@ impl StreamingReaderState { } } - // TODO check command queue is empty... + if !self.read_queue.is_empty() { + warn!("Read queue not empty before shutting down."); + } // Stop the stream reader self.reader_stop .send(()) .map_err(|_| anyhow!("Can't stop stream reader"))?; // Get the RecvStream back from the stream reader - let rx = self.rx_retrieval.await?; + let mut rx = self.rx_retrieval.await?; + + // Close the file handle on the server + send_bare_message( + &mut self.tx, + &DataCommand::ReleaseFile { + file_handle: self.file_handle, + }, + ) + .await?; + let response: DataResponse<()> = read_bare_message(&mut rx) + .await? + .ok_or_else(|| anyhow!("End of stream whilst waiting for ReleaseFile confirmation"))?; + + if let DataResponse::Error { code, message } = response { + error!("Failed to ReleaseFile: {:?} {:?}", code, message); + } else { + debug!("ReleaseFile fine."); + } + if let Some(stream_disposal) = self.close_command_reply.take() { stream_disposal .send((self.tx, rx)) @@ -203,7 +239,8 @@ impl StreamingReaderState { fn is_directly_satisfiable(&self, wanted_range: Range) -> bool { let available = self.available_byte_range(); - available.start <= wanted_range.start && available.end >= wanted_range.end + available.start <= wanted_range.start + && (available.end >= wanted_range.end || self.buffer_end_is_eof) } fn satisfy_read( @@ -212,7 +249,11 @@ impl StreamingReaderState { reply_to: OneshotSender>>, ) -> anyhow::Result<()> { let skip = wanted_byte_range.start - self.ring_buffer_read_next_index; - let size = wanted_byte_range.end - wanted_byte_range.start; + let size = if self.buffer_end_is_eof { + min(self.ring_buffer_write_next_index, wanted_byte_range.end) - wanted_byte_range.start + } else { + wanted_byte_range.end - wanted_byte_range.start + }; let drain = skip + size; let response_buf: Vec = self @@ -230,6 +271,10 @@ impl StreamingReaderState { } async fn issue_readaheads(&mut self) -> anyhow::Result<()> { + if self.upcoming_jump.is_none() && self.buffer_end_is_eof { + // Don't read ahead any more — there's nothing to read ahead! + } + let readahead_base = self .upcoming_jump .unwrap_or(self.ring_buffer_read_next_index); @@ -253,7 +298,11 @@ impl StreamingReaderState { }, ) .await?; - self.requested_offsets.push_back((block_offset, false)); + self.in_flight_requests.push_back(( + block_offset, + block_size.try_into().unwrap(), + false, + )); self.next_readahead_position += block_size; } @@ -267,6 +316,7 @@ impl StreamingReaderState { size, reply, }) => { + debug!("Read @{:?} {:?}", offset, size); let wanted_range = offset..(offset + size as i64); if self.is_directly_satisfiable(wanted_range.clone()) { // If we can, answer right away. @@ -288,23 +338,25 @@ impl StreamingReaderState { ) .await?; // (This is a jump!) - self.requested_offsets.push_back((offset, true)); + self.in_flight_requests.push_back((offset, size, true)); self.next_readahead_position = offset + size as i64; } } + + self.command_next_offset = offset + size as i64; + self.issue_readaheads().await?; } Some(StreamingReaderCmd::Close { reply }) => { + debug!("Close"); self.close_command_reply = Some(reply); // Ensure that we don't accept any more commands. // (This also prevents us repeatedly reading None...) self.commands_in_flight = u32::MAX; - // Instruct the stream to shut down. - // TODO Drain down in-flight requests - // TODO and close the file - todo!() + // Wind down. + self.running = false; } None => { bail!("Command sender shut down unexpectedly."); @@ -319,17 +371,32 @@ impl StreamingReaderState { ) -> anyhow::Result<()> { match response { Some(DataResponse::Success(block)) => { - let (requested_offset, is_jump) = self - .requested_offsets + debug!("Success (block: {:?} bytes)", block.len()); + let (requested_offset, requested_size, is_jump) = self + .in_flight_requests .pop_front() .ok_or_else(|| anyhow!("Received block without a tracked requested offset."))?; - // We might be able to complete a command + + debug!( + "req offs {:?}, is_jump {:?}, rb write {:?}", + requested_offset, is_jump, self.ring_buffer_write_next_index + ); + + if self.buffer_end_is_eof && block.is_empty() { + debug!("already EOF, have read 0 B. Ignoring."); + return Ok(()); + } + + let is_eof = requested_size as usize > block.len(); + self.buffer_end_is_eof = is_eof; + + // We might be able to complete a command. // These should be one and the same. - assert_eq!( - is_jump, - self.ring_buffer_write_next_index != requested_offset - ); + // How do `is_jump` and `self.ring_buffer_write_next_index != requested_offset` differ? + // is_jump implies `self.ring_buffer_write_next_index != requested_offset` + // but `self.ring_buffer_write_next_index != requested_offset` can also be caused + // if we've ran over the end of the file previously. if self.ring_buffer_write_next_index != requested_offset { // Clear the buffer because we're not at the right position @@ -372,6 +439,7 @@ impl StreamingReaderState { self.issue_readaheads().await?; } Some(DataResponse::Error { code, message }) => { + warn!("Error {:?} {:?}", code, message); // should probably bubble this up to the reads... for (_, reply) in self.read_queue.drain(..) { reply @@ -405,23 +473,21 @@ async fn stream_reader( message }, _ = &mut time_to_close => { - return Ok(()); + rx_disposal + .send(rx) + .map_err(|_| anyhow!("Failed to give back rx"))?; + break; } }; let msg: Option>> = message?; match msg { Some(response) => { - let is_stopping = false; // TODO!!! + debug!("stream_reader: new DataResponse"); sender.send(response).await?; - if is_stopping { - rx_disposal - .send(rx) - .map_err(|_| anyhow!("Failed to give back rx"))?; - break; - } } None => { // We never run the stream dry ourselves, so this is odd. + warn!("stream_reader: Unexpected end of stream (stream_reader)."); bail!("Unexpected end of stream (stream_reader)."); } }