diff --git a/olivefs/src/configuration.rs b/olivefs/src/configuration.rs index 5498e90..9f0b1aa 100644 --- a/olivefs/src/configuration.rs +++ b/olivefs/src/configuration.rs @@ -48,4 +48,11 @@ pub struct StreamingReaderConfig { /// needlessly for random reads. This setting also influences memory usage. /// This might be configurable per-filetype in the future. pub read_ahead: u32, + + /// How many bytes will be requested in each read block. Ideally should be a multiple of + /// the page size on the remote... + pub read_ahead_block_size: u32, + + /// Maximum number of commands in flight at one time. + pub command_limit: u32, } diff --git a/olivefs/src/requester/streaming_reader.rs b/olivefs/src/requester/streaming_reader.rs index 1e0ad32..9abb415 100644 --- a/olivefs/src/requester/streaming_reader.rs +++ b/olivefs/src/requester/streaming_reader.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, bail}; -use olivefs_common::messages::{DataResponse, StreamingReadRequest, StreamingReadResponse}; +use olivefs_common::messages::{DataCommand, DataResponse}; use olivefs_common::networking::{read_bare_message, send_bare_message}; use quinn::{RecvStream, SendStream}; use std::collections::VecDeque; @@ -9,6 +9,7 @@ use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::oneshot::Receiver as OneshotReceiver; use tokio::sync::oneshot::Sender as OneshotSender; +use tokio::sync::{mpsc, oneshot}; use crate::configuration::StreamingReaderConfig; @@ -20,35 +21,46 @@ use crate::configuration::StreamingReaderConfig; // The streaming reader is therefore only designed to work with sequential reads. It will discard // data as soon as it doesn't look like it will be read by a sequential reader! -/// We won't send a different hold_on_at payload until it would unlock at least this many bytes. -pub const HOLD_ON_HYSTERESIS: i64 = 32768; - pub struct StreamingReaderState { + /// The ID of the file that's open + pub file_handle: u32, + + /// The next offset we expect a sequential reader to command us to fetch + pub command_next_offset: i64, + pub commands_in_flight: u32, + + pub responses: Receiver>>, + pub command_rx: Receiver, + pub rx_retrieval: OneshotReceiver, + pub read_queue: VecDeque<(Range, OneshotSender>)>, + pub tx: SendStream, + pub close_command_reply: Option>, + pub running: bool, + + /// Configuration values that tweak our behaviour + pub config: Arc, + /// The ring buffer containing bytes that we can use to service commands pub ring_buffer: VecDeque, /// The next 'read' index in the ring buffer pub ring_buffer_read_next_index: i64, /// The next 'write' index in the ring buffer pub ring_buffer_write_next_index: i64, - /// The next offset we will read from the network - pub network_next_offset: i64, - /// The `hold_on_at` position we have sent to the server. - pub stated_hold_on_at: i64, - /// The number of Seek operations in flight. This determines whether we can update the - /// `hold_on_at` position right now. - pub seeks_in_flight: u32, - /// The next offset we expect a sequential reader to command us to fetch - pub command_next_offset: i64, - pub accept_more_responses: bool, - pub commands_in_flight: u32, - pub command_limit: u32, - pub command_rx: Receiver, - pub responses: Receiver, - pub running: bool, - pub rx_retrieval: OneshotReceiver, - pub read_queue: VecDeque<(Range, OneshotSender>)>, - pub tx: SendStream, - pub close_command_reply: Option>, - pub config: Arc, + /// (offset, true iff this was a jump) + pub requested_offsets: VecDeque<(i64, bool)>, + + /// Next position to issue a readahead at. + pub next_readahead_position: i64, + + /// The latest upcoming jump in position. If set, this is used as the base for readahead + /// rather than the current read position. + /// It is cleared when we jump to that location. + pub upcoming_jump: Option, + /// Number of upcoming jumps. Used to dodge a bug that might occur if the `upcoming_jump` flag + /// is cleared too early (e.g. because we read something that we later jump to). + pub jumps_in_progress: i64, + + /// One-shot channel. Sending a message in this channel will stop the stream reader. + pub reader_stop: OneshotSender<()>, } pub enum StreamingReaderCmd { @@ -66,7 +78,6 @@ pub enum StreamingReaderCmd { pub struct StreamingReader { command_queue_tx: Sender, - // TODO ... commands_in_flight_semaphore: Sem } pub async fn streaming_reader(_command_queue: Receiver) -> anyhow::Result<()> { @@ -74,20 +85,91 @@ pub async fn streaming_reader(_command_queue: Receiver) -> a } impl StreamingReader { - pub async fn read(&self, _offset: i64, _size: u32) -> anyhow::Result> { - todo!() + pub fn new( + file_handle: u32, + tx: SendStream, + rx: RecvStream, + config: Arc, + ) -> StreamingReader { + let (state, command_tx) = StreamingReaderState::new(file_handle, tx, rx, config); + + state.spawn(); + + StreamingReader { + command_queue_tx: command_tx, + } + } + + pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + self.command_queue_tx + .send(StreamingReaderCmd::Read { + offset, + size, + reply: reply_tx, + }) + .await + .map_err(|_| anyhow!("Unable to send Read command (cascade failure?)"))?; + + Ok(reply_rx.await?) } pub async fn close(&self) -> anyhow::Result<(SendStream, RecvStream)> { - todo!() + let (reply_tx, reply_rx) = oneshot::channel(); + self.command_queue_tx + .send(StreamingReaderCmd::Close { reply: reply_tx }) + .await + .map_err(|_| anyhow!("Unable to send Close command (cascade failure?)"))?; + + Ok(reply_rx.await?) } } impl StreamingReaderState { + pub fn new( + file_handle: u32, + tx: SendStream, + rx: RecvStream, + config: Arc, + ) -> (StreamingReaderState, Sender) { + let (response_tx, response_rx) = mpsc::channel(32); + let (rx_tx, rx_rx) = oneshot::channel(); + let (command_tx, command_rx) = mpsc::channel(32); + let (reader_stop_tx, reader_stop_rx) = oneshot::channel(); + + tokio::spawn(async { stream_reader(rx, response_tx, rx_tx, reader_stop_rx) }); + + let state = StreamingReaderState { + file_handle, + command_next_offset: 0, + commands_in_flight: 0, + responses: response_rx, + command_rx, + rx_retrieval: rx_rx, + read_queue: Default::default(), + tx, + close_command_reply: None, + running: true, + config, + ring_buffer: Default::default(), + ring_buffer_read_next_index: 0, + ring_buffer_write_next_index: 0, + requested_offsets: Default::default(), + next_readahead_position: 0, + upcoming_jump: None, + jumps_in_progress: 0, + reader_stop: reader_stop_tx, + }; + + (state, command_tx) + } + + pub fn spawn(self) {} + pub async fn command_handler(mut self) -> anyhow::Result<()> { - while self.running { + while self.running || !self.requested_offsets.is_empty() { tokio::select! { - command = self.command_rx.recv(), if self.commands_in_flight < self.command_limit => { + command = self.command_rx.recv(), if self.commands_in_flight < self.config.command_limit => { self.process_command(command).await?; } response = self.responses.recv() => { @@ -98,6 +180,11 @@ impl StreamingReaderState { // TODO check command queue is empty... + // Stop the stream reader + self.reader_stop + .send(()) + .map_err(|_| anyhow!("Can't stop stream reader"))?; + // Get the RecvStream back from the stream reader let rx = self.rx_retrieval.await?; if let Some(stream_disposal) = self.close_command_reply.take() { stream_disposal @@ -142,6 +229,37 @@ impl StreamingReaderState { Ok(()) } + async fn issue_readaheads(&mut self) -> anyhow::Result<()> { + let readahead_base = self + .upcoming_jump + .unwrap_or(self.ring_buffer_read_next_index); + let readahead_limit = readahead_base + self.config.read_ahead as i64; + + let bytes_we_could_read_ahead = readahead_limit - self.next_readahead_position; + if bytes_we_could_read_ahead <= 0 { + return Ok(()); + } + + let block_size = self.config.read_ahead_block_size as i64; + + while readahead_limit - self.next_readahead_position > block_size { + let block_offset = self.next_readahead_position; + send_bare_message( + &mut self.tx, + &DataCommand::ReadFile { + file_handle: self.file_handle, + offset: block_offset, + size: block_size.try_into().unwrap(), + }, + ) + .await?; + self.requested_offsets.push_back((block_offset, false)); + self.next_readahead_position += block_size; + } + + Ok(()) + } + async fn process_command(&mut self, command: Option) -> anyhow::Result<()> { match command { Some(StreamingReaderCmd::Read { @@ -162,23 +280,19 @@ impl StreamingReaderState { // We should also request a seek as this isn't a sequential read... send_bare_message( &mut self.tx, - &StreamingReadRequest::Seek { offset, size }, - ) - .await?; - - // Also set the 'hold on at' point - send_bare_message( - &mut self.tx, - &StreamingReadRequest::Continue { - hold_on_at: offset + size as i64 + self.config.read_ahead as i64, + &DataCommand::ReadFile { + file_handle: self.file_handle, + offset, + size, }, ) .await?; - // Prevent this getting overwritten in the meantime: we'll only automatically - // advance the `hold_on_at` position if no Seeks are in flight. - self.seeks_in_flight += 1; + // (This is a jump!) + self.requested_offsets.push_back((offset, true)); + self.next_readahead_position = offset + size as i64; } } + self.issue_readaheads().await?; } Some(StreamingReaderCmd::Close { reply }) => { self.close_command_reply = Some(reply); @@ -188,7 +302,9 @@ impl StreamingReaderState { self.commands_in_flight = u32::MAX; // Instruct the stream to shut down. - send_bare_message(&mut self.tx, &StreamingReadRequest::Stop).await?; + // TODO Drain down in-flight requests + // TODO and close the file + todo!() } None => { bail!("Command sender shut down unexpectedly."); @@ -199,20 +315,35 @@ impl StreamingReaderState { async fn process_response( &mut self, - response: Option, + response: Option>>, ) -> anyhow::Result<()> { match response { - Some(StreamingReadResponse::Stopped) => { - self.running = false; - // TODO cancel existing commands if there are any? - } - Some(StreamingReadResponse::Block(block)) => { + Some(DataResponse::Success(block)) => { + let (requested_offset, is_jump) = self + .requested_offsets + .pop_front() + .ok_or_else(|| anyhow!("Received block without a tracked requested offset."))?; // We might be able to complete a command - if self.ring_buffer_write_next_index != self.network_next_offset { + + // These should be one and the same. + assert_eq!( + is_jump, + self.ring_buffer_write_next_index != requested_offset + ); + + if self.ring_buffer_write_next_index != requested_offset { // Clear the buffer because we're not at the right position self.ring_buffer.clear(); - self.ring_buffer_write_next_index = self.network_next_offset; - self.ring_buffer_read_next_index = self.network_next_offset; + self.ring_buffer_write_next_index = requested_offset; + self.ring_buffer_read_next_index = requested_offset; + } + + if is_jump { + self.jumps_in_progress -= 1; + if self.jumps_in_progress == 0 { + // Clear the 'upcoming jump' since we WERE the jump + self.upcoming_jump = None; + } } let block_size: i64 = block.len().try_into().unwrap(); @@ -238,38 +369,10 @@ impl StreamingReaderState { } } - if self.seeks_in_flight == 0 - && (self.stated_hold_on_at + HOLD_ON_HYSTERESIS - < self.config.read_ahead as i64 + self.ring_buffer_read_next_index) - { - // Allow the stream to carry on... - // We only do this if no seeks are in flight since a seek suggests we will have - // to reset our buffer shortly. - - let next_hold_on_at = - self.config.read_ahead as i64 + self.ring_buffer_read_next_index; - - send_bare_message( - &mut self.tx, - &StreamingReadRequest::Continue { - hold_on_at: next_hold_on_at, - }, - ) - .await?; - self.stated_hold_on_at = next_hold_on_at; - } + self.issue_readaheads().await?; } - Some(StreamingReadResponse::Sought(seek)) => { - self.network_next_offset = seek; - assert_ne!( - self.seeks_in_flight, 0, - "Supposedly no seeks in flight. This is a bug." - ); - self.seeks_in_flight -= 1; - } - Some(StreamingReadResponse::Eof) => { - let _eof_at = self.network_next_offset; - // Handling EOF means completing commands with as much data as they can get... + Some(DataResponse::Error { code, message }) => { + // should probably bubble this up to the reads... todo!() } None => { @@ -285,14 +388,23 @@ impl StreamingReaderState { /// That's why we send them through a channel. async fn stream_reader( mut rx: RecvStream, - sender: Sender, + sender: Sender>>, rx_disposal: OneshotSender, + mut time_to_close: OneshotReceiver<()>, ) -> anyhow::Result<()> { loop { - let msg: Option> = read_bare_message(&mut rx).await?; + let message = tokio::select! { + message = read_bare_message(&mut rx) => { + message + }, + _ = &mut time_to_close => { + return Ok(()); + } + }; + let msg: Option>> = message?; match msg { - Some(DataResponse::Success(response)) => { - let is_stopping = &response == &StreamingReadResponse::Stopped; + Some(response) => { + let is_stopping = false; // TODO!!! sender.send(response).await?; if is_stopping { rx_disposal @@ -301,10 +413,6 @@ async fn stream_reader( break; } } - Some(DataResponse::Error { code, message }) => { - // TODO we probably want to bubble this error up to the read requests somehow. - bail!("stream_reader: Error from remote: {:?} {:?}", code, message); - } None => { // We never run the stream dry ourselves, so this is odd. bail!("Unexpected end of stream (stream_reader)."); diff --git a/olivefs_common/src/messages.rs b/olivefs_common/src/messages.rs index c00efed..e69edfd 100644 --- a/olivefs_common/src/messages.rs +++ b/olivefs_common/src/messages.rs @@ -157,28 +157,3 @@ pub struct DirectoryEntry { pub kind: FileKind, pub name: String, } - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum StreamingReadRequest { - /// Unblocks streaming up until a certain position. - /// This is the mechanism used to apply backpressure. - Continue { hold_on_at: i64 }, - /// Please move to a different position in the file and read at least this much for the first - /// block. Usually will be followed up by a 'Continue' request. - Seek { offset: i64, size: u32 }, - /// Please stop. Further messages are not StreamingReadRequests. - Stop, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub enum StreamingReadResponse { - /// This is what 'streams' — sequential blocks of data - Block(Vec), - /// The end of the file has been reached - Eof, - /// Stream moved due to request. Here is the actual position. - Sought(i64), - /// Stream stopped as requested. - /// Further messages are not StreamingReadResponses. - Stopped, -}