From f3b867b2d58dba25676761f8bbb1460f85b31bcd Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 23 Jan 2022 17:53:01 +0000 Subject: [PATCH] Probably dead-end of client half of stream reader Let's go for something simpler --- olivefs/src/requester/streaming_reader.rs | 118 ++++++++++++++++++---- 1 file changed, 96 insertions(+), 22 deletions(-) diff --git a/olivefs/src/requester/streaming_reader.rs b/olivefs/src/requester/streaming_reader.rs index 6061f9f..1e0ad32 100644 --- a/olivefs/src/requester/streaming_reader.rs +++ b/olivefs/src/requester/streaming_reader.rs @@ -21,7 +21,7 @@ use crate::configuration::StreamingReaderConfig; // data as soon as it doesn't look like it will be read by a sequential reader! /// We won't send a different hold_on_at payload until it would unlock at least this many bytes. -pub const HOLD_ON_HYSTERESIS: i64 = 8192; +pub const HOLD_ON_HYSTERESIS: i64 = 32768; pub struct StreamingReaderState { pub ring_buffer: VecDeque, @@ -33,6 +33,9 @@ pub struct StreamingReaderState { pub network_next_offset: i64, /// The `hold_on_at` position we have sent to the server. pub stated_hold_on_at: i64, + /// The number of Seek operations in flight. This determines whether we can update the + /// `hold_on_at` position right now. + pub seeks_in_flight: u32, /// The next offset we expect a sequential reader to command us to fetch pub command_next_offset: i64, pub accept_more_responses: bool, @@ -91,10 +94,6 @@ impl StreamingReaderState { self.process_response(response).await?; } } - - if self.stated_hold_on_at + HOLD_ON_HYSTERESIS - < self.config.read_ahead as i64 + self.ring_buffer_read_next_index - {} } // TODO check command queue is empty... @@ -120,6 +119,29 @@ impl StreamingReaderState { available.start <= wanted_range.start && available.end >= wanted_range.end } + fn satisfy_read( + &mut self, + wanted_byte_range: Range, + reply_to: OneshotSender>, + ) -> anyhow::Result<()> { + let skip = wanted_byte_range.start - self.ring_buffer_read_next_index; + let size = wanted_byte_range.end - wanted_byte_range.start; + let drain = skip + size; + + let response_buf: Vec = self + .ring_buffer + .drain(0..(drain as usize)) + .skip(skip.try_into().unwrap()) + .collect(); + + reply_to + .send(response_buf) + .map_err(|_| anyhow!("Reply to Read failed"))?; + + self.ring_buffer_read_next_index += drain; + Ok(()) + } + async fn process_command(&mut self, command: Option) -> anyhow::Result<()> { match command { Some(StreamingReaderCmd::Read { @@ -131,21 +153,7 @@ impl StreamingReaderState { if self.is_directly_satisfiable(wanted_range.clone()) { // If we can, answer right away. // If we do, make sure we trigger more streaming. - - let skip = offset - self.ring_buffer_read_next_index; - let drain = skip + size as i64; - - let response_buf: Vec = self - .ring_buffer - .drain(0..(drain as usize)) - .skip(skip.try_into().unwrap()) - .collect(); - - reply - .send(response_buf) - .map_err(|_| anyhow!("Reply to Read failed"))?; - - self.ring_buffer_read_next_index += drain; + self.satisfy_read(wanted_range, reply)?; } else { // Otherwise, plop it on the queue. self.read_queue.push_back((wanted_range, reply)); @@ -157,6 +165,18 @@ impl StreamingReaderState { &StreamingReadRequest::Seek { offset, size }, ) .await?; + + // Also set the 'hold on at' point + send_bare_message( + &mut self.tx, + &StreamingReadRequest::Continue { + hold_on_at: offset + size as i64 + self.config.read_ahead as i64, + }, + ) + .await?; + // Prevent this getting overwritten in the meantime: we'll only automatically + // advance the `hold_on_at` position if no Seeks are in flight. + self.seeks_in_flight += 1; } } } @@ -186,12 +206,66 @@ impl StreamingReaderState { self.running = false; // TODO cancel existing commands if there are any? } - Some(StreamingReadResponse::Block(_block)) => { + Some(StreamingReadResponse::Block(block)) => { // We might be able to complete a command - todo!() + if self.ring_buffer_write_next_index != self.network_next_offset { + // Clear the buffer because we're not at the right position + self.ring_buffer.clear(); + self.ring_buffer_write_next_index = self.network_next_offset; + self.ring_buffer_read_next_index = self.network_next_offset; + } + + let block_size: i64 = block.len().try_into().unwrap(); + + // Append the data to the buffer + self.ring_buffer.extend(block); + self.ring_buffer_write_next_index += block_size; + + // Can we now satisfy a read command? + loop { + let can_satisfy_read = self + .read_queue + .front() + .map(|(wanted_byte_range, _)| { + self.is_directly_satisfiable(wanted_byte_range.clone()) + }) + .unwrap_or(false); + if can_satisfy_read { + let (wanted_byte_range, reply_to) = self.read_queue.pop_front().unwrap(); + self.satisfy_read(wanted_byte_range, reply_to)?; + } else { + break; + } + } + + if self.seeks_in_flight == 0 + && (self.stated_hold_on_at + HOLD_ON_HYSTERESIS + < self.config.read_ahead as i64 + self.ring_buffer_read_next_index) + { + // Allow the stream to carry on... + // We only do this if no seeks are in flight since a seek suggests we will have + // to reset our buffer shortly. + + let next_hold_on_at = + self.config.read_ahead as i64 + self.ring_buffer_read_next_index; + + send_bare_message( + &mut self.tx, + &StreamingReadRequest::Continue { + hold_on_at: next_hold_on_at, + }, + ) + .await?; + self.stated_hold_on_at = next_hold_on_at; + } } Some(StreamingReadResponse::Sought(seek)) => { self.network_next_offset = seek; + assert_ne!( + self.seeks_in_flight, 0, + "Supposedly no seeks in flight. This is a bug." + ); + self.seeks_in_flight -= 1; } Some(StreamingReadResponse::Eof) => { let _eof_at = self.network_next_offset;