Probably dead-end of client half of stream reader

Let's go for something simpler
This commit is contained in:
Olivier 'reivilibre' 2022-01-23 17:53:01 +00:00
parent 01a68a6f6f
commit f3b867b2d5

View File

@ -21,7 +21,7 @@ use crate::configuration::StreamingReaderConfig;
// data as soon as it doesn't look like it will be read by a sequential reader! // data as soon as it doesn't look like it will be read by a sequential reader!
/// We won't send a different hold_on_at payload until it would unlock at least this many bytes. /// We won't send a different hold_on_at payload until it would unlock at least this many bytes.
pub const HOLD_ON_HYSTERESIS: i64 = 8192; pub const HOLD_ON_HYSTERESIS: i64 = 32768;
pub struct StreamingReaderState { pub struct StreamingReaderState {
pub ring_buffer: VecDeque<u8>, pub ring_buffer: VecDeque<u8>,
@ -33,6 +33,9 @@ pub struct StreamingReaderState {
pub network_next_offset: i64, pub network_next_offset: i64,
/// The `hold_on_at` position we have sent to the server. /// The `hold_on_at` position we have sent to the server.
pub stated_hold_on_at: i64, pub stated_hold_on_at: i64,
/// The number of Seek operations in flight. This determines whether we can update the
/// `hold_on_at` position right now.
pub seeks_in_flight: u32,
/// The next offset we expect a sequential reader to command us to fetch /// The next offset we expect a sequential reader to command us to fetch
pub command_next_offset: i64, pub command_next_offset: i64,
pub accept_more_responses: bool, pub accept_more_responses: bool,
@ -91,10 +94,6 @@ impl StreamingReaderState {
self.process_response(response).await?; self.process_response(response).await?;
} }
} }
if self.stated_hold_on_at + HOLD_ON_HYSTERESIS
< self.config.read_ahead as i64 + self.ring_buffer_read_next_index
{}
} }
// TODO check command queue is empty... // TODO check command queue is empty...
@ -120,6 +119,29 @@ impl StreamingReaderState {
available.start <= wanted_range.start && available.end >= wanted_range.end available.start <= wanted_range.start && available.end >= wanted_range.end
} }
fn satisfy_read(
&mut self,
wanted_byte_range: Range<i64>,
reply_to: OneshotSender<Vec<u8>>,
) -> anyhow::Result<()> {
let skip = wanted_byte_range.start - self.ring_buffer_read_next_index;
let size = wanted_byte_range.end - wanted_byte_range.start;
let drain = skip + size;
let response_buf: Vec<u8> = self
.ring_buffer
.drain(0..(drain as usize))
.skip(skip.try_into().unwrap())
.collect();
reply_to
.send(response_buf)
.map_err(|_| anyhow!("Reply to Read failed"))?;
self.ring_buffer_read_next_index += drain;
Ok(())
}
async fn process_command(&mut self, command: Option<StreamingReaderCmd>) -> anyhow::Result<()> { async fn process_command(&mut self, command: Option<StreamingReaderCmd>) -> anyhow::Result<()> {
match command { match command {
Some(StreamingReaderCmd::Read { Some(StreamingReaderCmd::Read {
@ -131,21 +153,7 @@ impl StreamingReaderState {
if self.is_directly_satisfiable(wanted_range.clone()) { if self.is_directly_satisfiable(wanted_range.clone()) {
// If we can, answer right away. // If we can, answer right away.
// If we do, make sure we trigger more streaming. // If we do, make sure we trigger more streaming.
self.satisfy_read(wanted_range, reply)?;
let skip = offset - self.ring_buffer_read_next_index;
let drain = skip + size as i64;
let response_buf: Vec<u8> = self
.ring_buffer
.drain(0..(drain as usize))
.skip(skip.try_into().unwrap())
.collect();
reply
.send(response_buf)
.map_err(|_| anyhow!("Reply to Read failed"))?;
self.ring_buffer_read_next_index += drain;
} else { } else {
// Otherwise, plop it on the queue. // Otherwise, plop it on the queue.
self.read_queue.push_back((wanted_range, reply)); self.read_queue.push_back((wanted_range, reply));
@ -157,6 +165,18 @@ impl StreamingReaderState {
&StreamingReadRequest::Seek { offset, size }, &StreamingReadRequest::Seek { offset, size },
) )
.await?; .await?;
// Also set the 'hold on at' point
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Continue {
hold_on_at: offset + size as i64 + self.config.read_ahead as i64,
},
)
.await?;
// Prevent this getting overwritten in the meantime: we'll only automatically
// advance the `hold_on_at` position if no Seeks are in flight.
self.seeks_in_flight += 1;
} }
} }
} }
@ -186,12 +206,66 @@ impl StreamingReaderState {
self.running = false; self.running = false;
// TODO cancel existing commands if there are any? // TODO cancel existing commands if there are any?
} }
Some(StreamingReadResponse::Block(_block)) => { Some(StreamingReadResponse::Block(block)) => {
// We might be able to complete a command // We might be able to complete a command
todo!() if self.ring_buffer_write_next_index != self.network_next_offset {
// Clear the buffer because we're not at the right position
self.ring_buffer.clear();
self.ring_buffer_write_next_index = self.network_next_offset;
self.ring_buffer_read_next_index = self.network_next_offset;
}
let block_size: i64 = block.len().try_into().unwrap();
// Append the data to the buffer
self.ring_buffer.extend(block);
self.ring_buffer_write_next_index += block_size;
// Can we now satisfy a read command?
loop {
let can_satisfy_read = self
.read_queue
.front()
.map(|(wanted_byte_range, _)| {
self.is_directly_satisfiable(wanted_byte_range.clone())
})
.unwrap_or(false);
if can_satisfy_read {
let (wanted_byte_range, reply_to) = self.read_queue.pop_front().unwrap();
self.satisfy_read(wanted_byte_range, reply_to)?;
} else {
break;
}
}
if self.seeks_in_flight == 0
&& (self.stated_hold_on_at + HOLD_ON_HYSTERESIS
< self.config.read_ahead as i64 + self.ring_buffer_read_next_index)
{
// Allow the stream to carry on...
// We only do this if no seeks are in flight since a seek suggests we will have
// to reset our buffer shortly.
let next_hold_on_at =
self.config.read_ahead as i64 + self.ring_buffer_read_next_index;
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Continue {
hold_on_at: next_hold_on_at,
},
)
.await?;
self.stated_hold_on_at = next_hold_on_at;
}
} }
Some(StreamingReadResponse::Sought(seek)) => { Some(StreamingReadResponse::Sought(seek)) => {
self.network_next_offset = seek; self.network_next_offset = seek;
assert_ne!(
self.seeks_in_flight, 0,
"Supposedly no seeks in flight. This is a bug."
);
self.seeks_in_flight -= 1;
} }
Some(StreamingReadResponse::Eof) => { Some(StreamingReadResponse::Eof) => {
let _eof_at = self.network_next_offset; let _eof_at = self.network_next_offset;