Fix EOF in the streaming reader

This commit is contained in:
Olivier 'reivilibre' 2022-01-24 07:07:52 +00:00
parent 3b9f9ab7cd
commit bf3f8828d9
2 changed files with 101 additions and 30 deletions

View File

@ -11,3 +11,8 @@ ca_certificate = "ca/ca.crt"
timeout = 60 timeout = 60
keep_alive = 30 keep_alive = 30
[performance.streaming_reader]
read_ahead = 1048576
read_ahead_block_size = 131072
command_limit = 64

View File

@ -1,5 +1,7 @@
use anyhow::{anyhow, bail}; use anyhow::{anyhow, bail};
use std::cmp::min;
use log::{debug, error, warn};
use olivefs_common::messages::{DataCommand, DataResponse}; use olivefs_common::messages::{DataCommand, DataResponse};
use olivefs_common::networking::{read_bare_message, send_bare_message}; use olivefs_common::networking::{read_bare_message, send_bare_message};
use quinn::{RecvStream, SendStream}; use quinn::{RecvStream, SendStream};
@ -41,12 +43,14 @@ pub struct StreamingReaderState {
pub config: Arc<StreamingReaderConfig>, pub config: Arc<StreamingReaderConfig>,
/// The ring buffer containing bytes that we can use to service commands /// The ring buffer containing bytes that we can use to service commands
pub ring_buffer: VecDeque<u8>, pub ring_buffer: VecDeque<u8>,
/// Whether the end of the ring buffer is the end of the file
pub buffer_end_is_eof: bool,
/// The next 'read' index in the ring buffer /// The next 'read' index in the ring buffer
pub ring_buffer_read_next_index: i64, pub ring_buffer_read_next_index: i64,
/// The next 'write' index in the ring buffer /// The next 'write' index in the ring buffer
pub ring_buffer_write_next_index: i64, pub ring_buffer_write_next_index: i64,
/// (offset, true iff this was a jump) /// (offset, requested size, true iff this was a jump)
pub requested_offsets: VecDeque<(i64, bool)>, pub in_flight_requests: VecDeque<(i64, u32, bool)>,
/// Next position to issue a readahead at. /// Next position to issue a readahead at.
pub next_readahead_position: i64, pub next_readahead_position: i64,
@ -137,7 +141,11 @@ impl StreamingReaderState {
let (command_tx, command_rx) = mpsc::channel(32); let (command_tx, command_rx) = mpsc::channel(32);
let (reader_stop_tx, reader_stop_rx) = oneshot::channel(); let (reader_stop_tx, reader_stop_rx) = oneshot::channel();
tokio::spawn(async { stream_reader(rx, response_tx, rx_tx, reader_stop_rx) }); tokio::spawn(async {
if let Err(error) = stream_reader(rx, response_tx, rx_tx, reader_stop_rx).await {
error!("stream_reader Error: {:?}", error);
}
});
let state = StreamingReaderState { let state = StreamingReaderState {
file_handle, file_handle,
@ -152,9 +160,10 @@ impl StreamingReaderState {
running: true, running: true,
config, config,
ring_buffer: Default::default(), ring_buffer: Default::default(),
buffer_end_is_eof: false,
ring_buffer_read_next_index: 0, ring_buffer_read_next_index: 0,
ring_buffer_write_next_index: 0, ring_buffer_write_next_index: 0,
requested_offsets: Default::default(), in_flight_requests: Default::default(),
next_readahead_position: 0, next_readahead_position: 0,
upcoming_jump: None, upcoming_jump: None,
jumps_in_progress: 0, jumps_in_progress: 0,
@ -164,10 +173,16 @@ impl StreamingReaderState {
(state, command_tx) (state, command_tx)
} }
pub fn spawn(self) {} pub fn spawn(self) {
tokio::spawn(async move {
if let Err(error) = self.command_handler().await {
error!("StreamingReader Error: {:?}", error);
}
});
}
pub async fn command_handler(mut self) -> anyhow::Result<()> { pub async fn command_handler(mut self) -> anyhow::Result<()> {
while self.running || !self.requested_offsets.is_empty() { while self.running || !self.in_flight_requests.is_empty() {
tokio::select! { tokio::select! {
command = self.command_rx.recv(), if self.commands_in_flight < self.config.command_limit => { command = self.command_rx.recv(), if self.commands_in_flight < self.config.command_limit => {
self.process_command(command).await?; self.process_command(command).await?;
@ -178,14 +193,35 @@ impl StreamingReaderState {
} }
} }
// TODO check command queue is empty... if !self.read_queue.is_empty() {
warn!("Read queue not empty before shutting down.");
}
// Stop the stream reader // Stop the stream reader
self.reader_stop self.reader_stop
.send(()) .send(())
.map_err(|_| anyhow!("Can't stop stream reader"))?; .map_err(|_| anyhow!("Can't stop stream reader"))?;
// Get the RecvStream back from the stream reader // Get the RecvStream back from the stream reader
let rx = self.rx_retrieval.await?; let mut rx = self.rx_retrieval.await?;
// Close the file handle on the server
send_bare_message(
&mut self.tx,
&DataCommand::ReleaseFile {
file_handle: self.file_handle,
},
)
.await?;
let response: DataResponse<()> = read_bare_message(&mut rx)
.await?
.ok_or_else(|| anyhow!("End of stream whilst waiting for ReleaseFile confirmation"))?;
if let DataResponse::Error { code, message } = response {
error!("Failed to ReleaseFile: {:?} {:?}", code, message);
} else {
debug!("ReleaseFile fine.");
}
if let Some(stream_disposal) = self.close_command_reply.take() { if let Some(stream_disposal) = self.close_command_reply.take() {
stream_disposal stream_disposal
.send((self.tx, rx)) .send((self.tx, rx))
@ -203,7 +239,8 @@ impl StreamingReaderState {
fn is_directly_satisfiable(&self, wanted_range: Range<i64>) -> bool { fn is_directly_satisfiable(&self, wanted_range: Range<i64>) -> bool {
let available = self.available_byte_range(); let available = self.available_byte_range();
available.start <= wanted_range.start && available.end >= wanted_range.end available.start <= wanted_range.start
&& (available.end >= wanted_range.end || self.buffer_end_is_eof)
} }
fn satisfy_read( fn satisfy_read(
@ -212,7 +249,11 @@ impl StreamingReaderState {
reply_to: OneshotSender<DataResponse<Vec<u8>>>, reply_to: OneshotSender<DataResponse<Vec<u8>>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let skip = wanted_byte_range.start - self.ring_buffer_read_next_index; let skip = wanted_byte_range.start - self.ring_buffer_read_next_index;
let size = wanted_byte_range.end - wanted_byte_range.start; let size = if self.buffer_end_is_eof {
min(self.ring_buffer_write_next_index, wanted_byte_range.end) - wanted_byte_range.start
} else {
wanted_byte_range.end - wanted_byte_range.start
};
let drain = skip + size; let drain = skip + size;
let response_buf: Vec<u8> = self let response_buf: Vec<u8> = self
@ -230,6 +271,10 @@ impl StreamingReaderState {
} }
async fn issue_readaheads(&mut self) -> anyhow::Result<()> { async fn issue_readaheads(&mut self) -> anyhow::Result<()> {
if self.upcoming_jump.is_none() && self.buffer_end_is_eof {
// Don't read ahead any more — there's nothing to read ahead!
}
let readahead_base = self let readahead_base = self
.upcoming_jump .upcoming_jump
.unwrap_or(self.ring_buffer_read_next_index); .unwrap_or(self.ring_buffer_read_next_index);
@ -253,7 +298,11 @@ impl StreamingReaderState {
}, },
) )
.await?; .await?;
self.requested_offsets.push_back((block_offset, false)); self.in_flight_requests.push_back((
block_offset,
block_size.try_into().unwrap(),
false,
));
self.next_readahead_position += block_size; self.next_readahead_position += block_size;
} }
@ -267,6 +316,7 @@ impl StreamingReaderState {
size, size,
reply, reply,
}) => { }) => {
debug!("Read @{:?} {:?}", offset, size);
let wanted_range = offset..(offset + size as i64); let wanted_range = offset..(offset + size as i64);
if self.is_directly_satisfiable(wanted_range.clone()) { if self.is_directly_satisfiable(wanted_range.clone()) {
// If we can, answer right away. // If we can, answer right away.
@ -288,23 +338,25 @@ impl StreamingReaderState {
) )
.await?; .await?;
// (This is a jump!) // (This is a jump!)
self.requested_offsets.push_back((offset, true)); self.in_flight_requests.push_back((offset, size, true));
self.next_readahead_position = offset + size as i64; self.next_readahead_position = offset + size as i64;
} }
} }
self.command_next_offset = offset + size as i64;
self.issue_readaheads().await?; self.issue_readaheads().await?;
} }
Some(StreamingReaderCmd::Close { reply }) => { Some(StreamingReaderCmd::Close { reply }) => {
debug!("Close");
self.close_command_reply = Some(reply); self.close_command_reply = Some(reply);
// Ensure that we don't accept any more commands. // Ensure that we don't accept any more commands.
// (This also prevents us repeatedly reading None...) // (This also prevents us repeatedly reading None...)
self.commands_in_flight = u32::MAX; self.commands_in_flight = u32::MAX;
// Instruct the stream to shut down. // Wind down.
// TODO Drain down in-flight requests self.running = false;
// TODO and close the file
todo!()
} }
None => { None => {
bail!("Command sender shut down unexpectedly."); bail!("Command sender shut down unexpectedly.");
@ -319,17 +371,32 @@ impl StreamingReaderState {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match response { match response {
Some(DataResponse::Success(block)) => { Some(DataResponse::Success(block)) => {
let (requested_offset, is_jump) = self debug!("Success (block: {:?} bytes)", block.len());
.requested_offsets let (requested_offset, requested_size, is_jump) = self
.in_flight_requests
.pop_front() .pop_front()
.ok_or_else(|| anyhow!("Received block without a tracked requested offset."))?; .ok_or_else(|| anyhow!("Received block without a tracked requested offset."))?;
// We might be able to complete a command
debug!(
"req offs {:?}, is_jump {:?}, rb write {:?}",
requested_offset, is_jump, self.ring_buffer_write_next_index
);
if self.buffer_end_is_eof && block.is_empty() {
debug!("already EOF, have read 0 B. Ignoring.");
return Ok(());
}
let is_eof = requested_size as usize > block.len();
self.buffer_end_is_eof = is_eof;
// We might be able to complete a command.
// These should be one and the same. // These should be one and the same.
assert_eq!( // How do `is_jump` and `self.ring_buffer_write_next_index != requested_offset` differ?
is_jump, // is_jump implies `self.ring_buffer_write_next_index != requested_offset`
self.ring_buffer_write_next_index != requested_offset // but `self.ring_buffer_write_next_index != requested_offset` can also be caused
// if we've run over the end of the file previously.
if self.ring_buffer_write_next_index != requested_offset { if self.ring_buffer_write_next_index != requested_offset {
// Clear the buffer because we're not at the right position // Clear the buffer because we're not at the right position
@ -372,6 +439,7 @@ impl StreamingReaderState {
self.issue_readaheads().await?; self.issue_readaheads().await?;
} }
Some(DataResponse::Error { code, message }) => { Some(DataResponse::Error { code, message }) => {
warn!("Error {:?} {:?}", code, message);
// should probably bubble this up to the reads... // should probably bubble this up to the reads...
for (_, reply) in self.read_queue.drain(..) { for (_, reply) in self.read_queue.drain(..) {
reply reply
@ -405,23 +473,21 @@ async fn stream_reader(
message message
}, },
_ = &mut time_to_close => { _ = &mut time_to_close => {
return Ok(()); rx_disposal
.send(rx)
.map_err(|_| anyhow!("Failed to give back rx"))?;
break;
} }
}; };
let msg: Option<DataResponse<Vec<u8>>> = message?; let msg: Option<DataResponse<Vec<u8>>> = message?;
match msg { match msg {
Some(response) => { Some(response) => {
let is_stopping = false; // TODO!!! debug!("stream_reader: new DataResponse");
sender.send(response).await?; sender.send(response).await?;
if is_stopping {
rx_disposal
.send(rx)
.map_err(|_| anyhow!("Failed to give back rx"))?;
break;
}
} }
None => { None => {
// We never run the stream dry ourselves, so this is odd. // We never run the stream dry ourselves, so this is odd.
warn!("stream_reader: Unexpected end of stream (stream_reader).");
bail!("Unexpected end of stream (stream_reader)."); bail!("Unexpected end of stream (stream_reader).");
} }
} }