Sketch out a simpler pipelined reader

Olivier 'reivilibre' 2022-01-23 20:02:35 +00:00
parent f3b867b2d5
commit 86d94e62fa
3 changed files with 206 additions and 116 deletions

View File

@@ -48,4 +48,11 @@ pub struct StreamingReaderConfig {
/// needlessly for random reads. This setting also influences memory usage.
/// This might be configurable per-filetype in the future.
pub read_ahead: u32,
/// How many bytes will be requested in each read block. Ideally this should be a multiple of
/// the page size on the remote...
pub read_ahead_block_size: u32,
/// Maximum number of commands in flight at one time.
pub command_limit: u32,
}
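One way to read these knobs together (an illustrative sketch, not part of this commit): the readahead window is filled in fixed-size blocks, so the number of read requests kept in flight for readahead purposes is roughly their ratio.

// Illustrative helper (assumed name, not in the commit): roughly how many whole
// read blocks fit in the readahead window, i.e. the depth of the readahead pipeline.
fn readahead_pipeline_depth(config: &StreamingReaderConfig) -> u32 {
    config.read_ahead / config.read_ahead_block_size
}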

View File

@@ -1,6 +1,6 @@
use anyhow::{anyhow, bail};
use olivefs_common::messages::{DataResponse, StreamingReadRequest, StreamingReadResponse};
use olivefs_common::messages::{DataCommand, DataResponse};
use olivefs_common::networking::{read_bare_message, send_bare_message};
use quinn::{RecvStream, SendStream};
use std::collections::VecDeque;
@@ -9,6 +9,7 @@ use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot::Receiver as OneshotReceiver;
use tokio::sync::oneshot::Sender as OneshotSender;
use tokio::sync::{mpsc, oneshot};
use crate::configuration::StreamingReaderConfig;
@@ -20,35 +21,46 @@ use crate::configuration::StreamingReaderConfig;
// The streaming reader is therefore only designed to work with sequential reads. It will discard
// data as soon as it doesn't look like it will be read by a sequential reader!
/// We won't send a different hold_on_at payload until it would unlock at least this many bytes.
pub const HOLD_ON_HYSTERESIS: i64 = 32768;
pub struct StreamingReaderState {
/// The ID of the file that's open
pub file_handle: u32,
/// The next offset we expect a sequential reader to command us to fetch
pub command_next_offset: i64,
pub commands_in_flight: u32,
pub responses: Receiver<DataResponse<Vec<u8>>>,
pub command_rx: Receiver<StreamingReaderCmd>,
pub rx_retrieval: OneshotReceiver<RecvStream>,
pub read_queue: VecDeque<(Range<i64>, OneshotSender<Vec<u8>>)>,
pub tx: SendStream,
pub close_command_reply: Option<OneshotSender<(SendStream, RecvStream)>>,
pub running: bool,
/// Configuration values that tweak our behaviour
pub config: Arc<StreamingReaderConfig>,
/// The ring buffer containing bytes that we can use to service commands
pub ring_buffer: VecDeque<u8>,
/// The next 'read' index in the ring buffer
pub ring_buffer_read_next_index: i64,
/// The next 'write' index in the ring buffer
pub ring_buffer_write_next_index: i64,
/// The next offset we will read from the network
pub network_next_offset: i64,
/// The `hold_on_at` position we have sent to the server.
pub stated_hold_on_at: i64,
/// The number of Seek operations in flight. This determines whether we can update the
/// `hold_on_at` position right now.
pub seeks_in_flight: u32,
/// The next offset we expect a sequential reader to command us to fetch
pub command_next_offset: i64,
pub accept_more_responses: bool,
pub commands_in_flight: u32,
pub command_limit: u32,
pub command_rx: Receiver<StreamingReaderCmd>,
pub responses: Receiver<StreamingReadResponse>,
pub running: bool,
pub rx_retrieval: OneshotReceiver<RecvStream>,
pub read_queue: VecDeque<(Range<i64>, OneshotSender<Vec<u8>>)>,
pub tx: SendStream,
pub close_command_reply: Option<OneshotSender<(SendStream, RecvStream)>>,
pub config: Arc<StreamingReaderConfig>,
/// (offset, true iff this was a jump)
pub requested_offsets: VecDeque<(i64, bool)>,
/// Next position to issue a readahead at.
pub next_readahead_position: i64,
/// The latest upcoming jump in position. If set, this is used as the base for readahead
/// rather than the current read position.
/// It is cleared when we jump to that location.
pub upcoming_jump: Option<i64>,
/// Number of upcoming jumps. Used to dodge a bug that might occur if the `upcoming_jump` flag
/// is cleared too early (e.g. because we read something that we later jump to).
pub jumps_in_progress: i64,
/// One-shot channel. Sending a message on this channel will stop the stream reader.
pub reader_stop: OneshotSender<()>,
}
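To make the ring-buffer bookkeeping above concrete, here is an editorial sketch (not part of the commit) of how a request for a file offset maps into the VecDeque, assuming the front of the deque holds the byte at `ring_buffer_read_next_index`:

// Sketch only: copy `len` buffered bytes starting at file offset `offset`.
// Returns None if that range is not (yet) fully inside the buffered window.
fn copy_from_window(
    ring_buffer: &VecDeque<u8>,
    ring_buffer_read_next_index: i64,
    offset: i64,
    len: usize,
) -> Option<Vec<u8>> {
    let start = usize::try_from(offset - ring_buffer_read_next_index).ok()?;
    let end = start.checked_add(len)?;
    if end > ring_buffer.len() {
        return None; // the tail of the requested range has not arrived yet
    }
    Some(ring_buffer.range(start..end).copied().collect())
}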
pub enum StreamingReaderCmd {
@@ -66,7 +78,6 @@ pub enum StreamingReaderCmd {
pub struct StreamingReader {
command_queue_tx: Sender<StreamingReaderCmd>,
// TODO ... commands_in_flight_semaphore: Sem
}
pub async fn streaming_reader(_command_queue: Receiver<StreamingReaderCmd>) -> anyhow::Result<()> {
@@ -74,20 +85,91 @@ pub async fn streaming_reader(_command_queue: Receiver<StreamingReaderCmd>) -> a
}
impl StreamingReader {
pub async fn read(&self, _offset: i64, _size: u32) -> anyhow::Result<Vec<u8>> {
todo!()
pub fn new(
file_handle: u32,
tx: SendStream,
rx: RecvStream,
config: Arc<StreamingReaderConfig>,
) -> StreamingReader {
let (state, command_tx) = StreamingReaderState::new(file_handle, tx, rx, config);
state.spawn();
StreamingReader {
command_queue_tx: command_tx,
}
}
pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_queue_tx
.send(StreamingReaderCmd::Read {
offset,
size,
reply: reply_tx,
})
.await
.map_err(|_| anyhow!("Unable to send Read command (cascade failure?)"))?;
Ok(reply_rx.await?)
}
pub async fn close(&self) -> anyhow::Result<(SendStream, RecvStream)> {
todo!()
let (reply_tx, reply_rx) = oneshot::channel();
self.command_queue_tx
.send(StreamingReaderCmd::Close { reply: reply_tx })
.await
.map_err(|_| anyhow!("Unable to send Close command (cascade failure?)"))?;
Ok(reply_rx.await?)
}
}
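A hedged usage sketch of the public API above; the open QUIC streams, the file handle and the configuration would come from the surrounding connection and file-open code, which is outside this diff:

// Sketch only: `tx`/`rx` are assumed to be the already-opened QUIC streams
// dedicated to this file; nothing here is part of the commit itself.
async fn read_first_8k(
    file_handle: u32,
    tx: SendStream,
    rx: RecvStream,
    config: Arc<StreamingReaderConfig>,
) -> anyhow::Result<Vec<u8>> {
    let reader = StreamingReader::new(file_handle, tx, rx, config);
    let first = reader.read(0, 4096).await?;      // sequential read; kicks off readahead
    let second = reader.read(4096, 4096).await?;  // follow-on read, ideally served from the buffer
    let (_tx, _rx) = reader.close().await?;       // the streams are handed back on close
    Ok([first, second].concat())
}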
impl StreamingReaderState {
pub fn new(
file_handle: u32,
tx: SendStream,
rx: RecvStream,
config: Arc<StreamingReaderConfig>,
) -> (StreamingReaderState, Sender<StreamingReaderCmd>) {
let (response_tx, response_rx) = mpsc::channel(32);
let (rx_tx, rx_rx) = oneshot::channel();
let (command_tx, command_rx) = mpsc::channel(32);
let (reader_stop_tx, reader_stop_rx) = oneshot::channel();
// Spawn the stream reader as its own task; it owns the RecvStream until it hands it back on close.
tokio::spawn(stream_reader(rx, response_tx, rx_tx, reader_stop_rx));
let state = StreamingReaderState {
file_handle,
command_next_offset: 0,
commands_in_flight: 0,
responses: response_rx,
command_rx,
rx_retrieval: rx_rx,
read_queue: Default::default(),
tx,
close_command_reply: None,
running: true,
config,
ring_buffer: Default::default(),
ring_buffer_read_next_index: 0,
ring_buffer_write_next_index: 0,
requested_offsets: Default::default(),
next_readahead_position: 0,
upcoming_jump: None,
jumps_in_progress: 0,
reader_stop: reader_stop_tx,
};
(state, command_tx)
}
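For orientation, the channel wiring set up here is roughly the following (editorial summary, not part of the commit):

// consumer --(command_tx / command_rx, mpsc)--> command handler --(tx: SendStream)--> server
// server --(rx: RecvStream)--> stream_reader task --(response_tx / response_rx, mpsc)--> command handler
// command handler --(reader_stop, oneshot)--> stream_reader task     (ask it to stop)
// stream_reader task --(rx_tx / rx_rx, oneshot)--> command handler   (returns the RecvStream on close)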
pub fn spawn(self) {}
pub async fn command_handler(mut self) -> anyhow::Result<()> {
while self.running {
while self.running || !self.requested_offsets.is_empty() {
tokio::select! {
command = self.command_rx.recv(), if self.commands_in_flight < self.command_limit => {
command = self.command_rx.recv(), if self.commands_in_flight < self.config.command_limit => {
self.process_command(command).await?;
}
response = self.responses.recv() => {
@@ -98,6 +180,11 @@ impl StreamingReaderState {
// TODO check command queue is empty...
// Stop the stream reader
self.reader_stop
.send(())
.map_err(|_| anyhow!("Can't stop stream reader"))?;
// Get the RecvStream back from the stream reader
let rx = self.rx_retrieval.await?;
if let Some(stream_disposal) = self.close_command_reply.take() {
stream_disposal
@@ -142,6 +229,37 @@ impl StreamingReaderState {
Ok(())
}
async fn issue_readaheads(&mut self) -> anyhow::Result<()> {
let readahead_base = self
.upcoming_jump
.unwrap_or(self.ring_buffer_read_next_index);
let readahead_limit = readahead_base + self.config.read_ahead as i64;
let bytes_we_could_read_ahead = readahead_limit - self.next_readahead_position;
if bytes_we_could_read_ahead <= 0 {
return Ok(());
}
let block_size = self.config.read_ahead_block_size as i64;
while readahead_limit - self.next_readahead_position > block_size {
let block_offset = self.next_readahead_position;
send_bare_message(
&mut self.tx,
&DataCommand::ReadFile {
file_handle: self.file_handle,
offset: block_offset,
size: block_size.try_into().unwrap(),
},
)
.await?;
self.requested_offsets.push_back((block_offset, false));
self.next_readahead_position += block_size;
}
Ok(())
}
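As a worked trace of the arithmetic above (illustrative numbers only): with read_ahead = 131072, read_ahead_block_size = 32768, a readahead base of 0 and next_readahead_position = 0, the loop sends ReadFile requests at offsets 0, 32768 and 65536 and then stops, because the strict comparison leaves the final whole block of the window unissued until the window advances; next_readahead_position ends up at 98304.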
async fn process_command(&mut self, command: Option<StreamingReaderCmd>) -> anyhow::Result<()> {
match command {
Some(StreamingReaderCmd::Read {
@@ -162,23 +280,19 @@ impl StreamingReaderState {
// We should also request a seek as this isn't a sequential read...
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Seek { offset, size },
)
.await?;
// Also set the 'hold on at' point
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Continue {
hold_on_at: offset + size as i64 + self.config.read_ahead as i64,
&DataCommand::ReadFile {
file_handle: self.file_handle,
offset,
size,
},
)
.await?;
// Prevent this getting overwritten in the meantime: we'll only automatically
// advance the `hold_on_at` position if no Seeks are in flight.
self.seeks_in_flight += 1;
// (This is a jump!)
self.requested_offsets.push_back((offset, true));
self.next_readahead_position = offset + size as i64;
}
}
self.issue_readaheads().await?;
}
Some(StreamingReaderCmd::Close { reply }) => {
self.close_command_reply = Some(reply);
@@ -188,7 +302,9 @@ impl StreamingReaderState {
self.commands_in_flight = u32::MAX;
// Instruct the stream to shut down.
send_bare_message(&mut self.tx, &StreamingReadRequest::Stop).await?;
// TODO Drain down in-flight requests
// TODO and close the file
todo!()
}
None => {
bail!("Command sender shut down unexpectedly.");
@@ -199,20 +315,35 @@ impl StreamingReaderState {
async fn process_response(
&mut self,
response: Option<StreamingReadResponse>,
response: Option<DataResponse<Vec<u8>>>,
) -> anyhow::Result<()> {
match response {
Some(StreamingReadResponse::Stopped) => {
self.running = false;
// TODO cancel existing commands if there are any?
}
Some(StreamingReadResponse::Block(block)) => {
Some(DataResponse::Success(block)) => {
let (requested_offset, is_jump) = self
.requested_offsets
.pop_front()
.ok_or_else(|| anyhow!("Received block without a tracked requested offset."))?;
// We might be able to complete a command
if self.ring_buffer_write_next_index != self.network_next_offset {
// These should be one and the same.
assert_eq!(
is_jump,
self.ring_buffer_write_next_index != requested_offset
);
if self.ring_buffer_write_next_index != requested_offset {
// Clear the buffer because we're not at the right position
self.ring_buffer.clear();
self.ring_buffer_write_next_index = self.network_next_offset;
self.ring_buffer_read_next_index = self.network_next_offset;
self.ring_buffer_write_next_index = requested_offset;
self.ring_buffer_read_next_index = requested_offset;
}
if is_jump {
self.jumps_in_progress -= 1;
if self.jumps_in_progress == 0 {
// Clear the 'upcoming jump' since we WERE the jump
self.upcoming_jump = None;
}
}
let block_size: i64 = block.len().try_into().unwrap();
@@ -238,38 +369,10 @@ impl StreamingReaderState {
}
}
if self.seeks_in_flight == 0
&& (self.stated_hold_on_at + HOLD_ON_HYSTERESIS
< self.config.read_ahead as i64 + self.ring_buffer_read_next_index)
{
// Allow the stream to carry on...
// We only do this if no seeks are in flight since a seek suggests we will have
// to reset our buffer shortly.
let next_hold_on_at =
self.config.read_ahead as i64 + self.ring_buffer_read_next_index;
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Continue {
hold_on_at: next_hold_on_at,
},
)
.await?;
self.stated_hold_on_at = next_hold_on_at;
self.issue_readaheads().await?;
}
}
Some(StreamingReadResponse::Sought(seek)) => {
self.network_next_offset = seek;
assert_ne!(
self.seeks_in_flight, 0,
"Supposedly no seeks in flight. This is a bug."
);
self.seeks_in_flight -= 1;
}
Some(StreamingReadResponse::Eof) => {
let _eof_at = self.network_next_offset;
// Handling EOF means completing commands with as much data as they can get...
Some(DataResponse::Error { code, message }) => {
// should probably bubble this up to the reads...
todo!()
}
None => {
@@ -285,14 +388,23 @@ impl StreamingReaderState {
/// That's why we send them through a channel.
async fn stream_reader(
mut rx: RecvStream,
sender: Sender<StreamingReadResponse>,
sender: Sender<DataResponse<Vec<u8>>>,
rx_disposal: OneshotSender<RecvStream>,
mut time_to_close: OneshotReceiver<()>,
) -> anyhow::Result<()> {
loop {
let msg: Option<DataResponse<StreamingReadResponse>> = read_bare_message(&mut rx).await?;
let message = tokio::select! {
message = read_bare_message(&mut rx) => {
message
},
_ = &mut time_to_close => {
return Ok(());
}
};
let msg: Option<DataResponse<Vec<u8>>> = message?;
match msg {
Some(DataResponse::Success(response)) => {
let is_stopping = &response == &StreamingReadResponse::Stopped;
Some(response) => {
let is_stopping = false; // TODO!!!
sender.send(response).await?;
if is_stopping {
rx_disposal
@@ -301,10 +413,6 @@ async fn stream_reader(
break;
}
}
Some(DataResponse::Error { code, message }) => {
// TODO we probably want to bubble this error up to the read requests somehow.
bail!("stream_reader: Error from remote: {:?} {:?}", code, message);
}
None => {
// We never run the stream dry ourselves, so this is odd.
bail!("Unexpected end of stream (stream_reader).");

View File

@@ -157,28 +157,3 @@ pub struct DirectoryEntry {
pub kind: FileKind,
pub name: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StreamingReadRequest {
/// Unblocks streaming up until a certain position.
/// This is the mechanism used to apply backpressure.
Continue { hold_on_at: i64 },
/// Please move to a different position in the file and read at least this much for the first
/// block. Usually will be followed up by a 'Continue' request.
Seek { offset: i64, size: u32 },
/// Please stop. Further messages are not StreamingReadRequests.
Stop,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum StreamingReadResponse {
/// This is what 'streams' — sequential blocks of data
Block(Vec<u8>),
/// The end of the file has been reached
Eof,
/// Stream moved due to request. Here is the actual position.
Sought(i64),
/// Stream stopped as requested.
/// Further messages are not StreamingReadResponses.
Stopped,
}