Sketch out the client half of streaming reading

Olivier 'reivilibre' 2022-01-23 16:55:57 +00:00
parent e3621aacb8
commit 01a68a6f6f
6 changed files with 297 additions and 1 deletion

View File

@ -18,7 +18,7 @@ tracing-futures = { version = "0.2.5", features = ["tokio"] }
tokio = { version = "1.15.0", features = ["full"] }
# Serialisation
-serde = { version = "1.0.133", features = ["derive"] }
+serde = { version = "1.0.133", features = ["derive", "rc"] }
serde_bare = "0.5.0"
toml = "0.5.8"
clap = { version = "3.0.7", features = ["derive"] }

View File

@ -1,10 +1,12 @@
use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OlivefsClientConfiguration {
pub connection: ConnectionConfiguration,
pub performance: PerformanceConfiguration,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -31,3 +33,19 @@ pub struct ConnectionConfiguration {
/// Keep-alive in seconds
pub keep_alive: u32,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PerformanceConfiguration {
/// If specified, we will use a streaming reader to improve read performance.
#[serde(default)]
pub streaming_reader: Option<Arc<StreamingReaderConfig>>, // TODO pipelined writer?
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StreamingReaderConfig {
/// How many bytes we will pre-emptively read before they are requested.
/// A larger number will benefit sequential reads, but requires more data to be transmitted
/// needlessly for random reads. This setting also influences memory usage.
/// This might be configurable per-filetype in the future.
pub read_ahead: u32,
}
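For illustration, a configuration file that switches the streaming reader on might look like this (a minimal sketch assuming serde's default field naming; the connection section is abbreviated and the values are placeholders):

[connection]
# host, port, etc. omitted; only keep_alive appears in this diff
keep_alive = 30

[performance.streaming_reader]
# Pre-fetch up to 1 MiB ahead of the last requested offset
read_ahead = 1048576

Leaving out the [performance.streaming_reader] table keeps streaming_reader as None, and the "rc" feature newly enabled on serde in Cargo.toml above is what lets the Arc<StreamingReaderConfig> field deserialize.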

View File

@ -21,6 +21,8 @@ use olivefs_common::networking::{
};
use tokio::sync::Mutex;
pub mod streaming_reader;
pub struct Requester {
internal: RequesterInternal,
}

View File

@ -0,0 +1,241 @@
use anyhow::{anyhow, bail};
use olivefs_common::messages::{DataResponse, StreamingReadRequest, StreamingReadResponse};
use olivefs_common::networking::{read_bare_message, send_bare_message};
use quinn::{RecvStream, SendStream};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot::Receiver as OneshotReceiver;
use tokio::sync::oneshot::Sender as OneshotSender;
use crate::configuration::StreamingReaderConfig;
// We could use a B-Tree of Spans → bytes
// and then use a pseudo-LRU (e.g. tree PLRU) to decide which old ones to get rid of.
// But when the common case is a sequential read (e.g. media files), that's a lot of aggravation
// when we could just use a ring buffer.
// The streaming reader is therefore only designed to work with sequential reads. It will discard
// data as soon as it doesn't look like it will be read by a sequential reader!
/// We won't send a different hold_on_at payload until it would unlock at least this many bytes.
pub const HOLD_ON_HYSTERESIS: i64 = 8192;
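To make the hysteresis rule concrete, here is a minimal sketch of the check (not code from this commit; the helper name and signature are made up, mirroring the fields on StreamingReaderState below):

/// Sketch: decide whether a new `Continue { hold_on_at }` is worth sending.
/// We only send one once it would unlock at least HOLD_ON_HYSTERESIS bytes.
fn new_hold_on_at(stated_hold_on_at: i64, read_next_index: i64, read_ahead: i64) -> Option<i64> {
    let desired = read_next_index + read_ahead;
    if stated_hold_on_at + HOLD_ON_HYSTERESIS < desired {
        Some(desired)
    } else {
        None
    }
}

For example, with read_ahead = 1 MiB (1_048_576), a reader that has consumed up to byte 100_000 wants hold_on_at = 1_148_576; if we last stated 1_145_000, the gap is only 3_576 bytes, under the 8192-byte hysteresis, so no message is sent.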
pub struct StreamingReaderState {
pub ring_buffer: VecDeque<u8>,
/// The next 'read' index in the ring buffer
pub ring_buffer_read_next_index: i64,
/// The next 'write' index in the ring buffer
pub ring_buffer_write_next_index: i64,
/// The next offset we will read from the network
pub network_next_offset: i64,
/// The `hold_on_at` position we have sent to the server.
pub stated_hold_on_at: i64,
/// The next offset we expect a sequential reader to command us to fetch
pub command_next_offset: i64,
pub accept_more_responses: bool,
pub commands_in_flight: u32,
pub command_limit: u32,
pub command_rx: Receiver<StreamingReaderCmd>,
pub responses: Receiver<StreamingReadResponse>,
pub running: bool,
pub rx_retrieval: OneshotReceiver<RecvStream>,
pub read_queue: VecDeque<(Range<i64>, OneshotSender<Vec<u8>>)>,
pub tx: SendStream,
pub close_command_reply: Option<OneshotSender<(SendStream, RecvStream)>>,
pub config: Arc<StreamingReaderConfig>,
}
pub enum StreamingReaderCmd {
Read {
offset: i64,
size: u32,
reply: OneshotSender<Vec<u8>>,
},
Close {
/// The response gives the stream back to the application so it can be put back into the
/// pool.
reply: OneshotSender<(SendStream, RecvStream)>,
},
}
pub struct StreamingReader {
command_queue_tx: Sender<StreamingReaderCmd>,
// TODO ... commands_in_flight_semaphore: Sem
}
pub async fn streaming_reader(_command_queue: Receiver<StreamingReaderCmd>) -> anyhow::Result<()> {
todo!()
}
impl StreamingReader {
pub async fn read(&self, _offset: i64, _size: u32) -> anyhow::Result<Vec<u8>> {
todo!()
}
pub async fn close(&self) -> anyhow::Result<(SendStream, RecvStream)> {
todo!()
}
}
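Presumably read will end up as a thin wrapper over the command channel. A hypothetical sketch of the body (the oneshot wiring is an assumption; the real implementation is still todo!() above):

// Inside `impl StreamingReader`, hypothetically:
pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result<Vec<u8>> {
    let (reply, reply_rx) = tokio::sync::oneshot::channel();
    self.command_queue_tx
        .send(StreamingReaderCmd::Read { offset, size, reply })
        .await
        .map_err(|_| anyhow!("command handler has gone away"))?;
    Ok(reply_rx.await?)
}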
impl StreamingReaderState {
pub async fn command_handler(mut self) -> anyhow::Result<()> {
while self.running {
tokio::select! {
command = self.command_rx.recv(), if self.commands_in_flight < self.command_limit => {
self.process_command(command).await?;
}
response = self.responses.recv() => {
self.process_response(response).await?;
}
}
if self.stated_hold_on_at + HOLD_ON_HYSTERESIS
< self.config.read_ahead as i64 + self.ring_buffer_read_next_index
{
// TODO: send an updated `Continue { hold_on_at }` to the server here.
}
}
// TODO check command queue is empty...
let rx = self.rx_retrieval.await?;
if let Some(stream_disposal) = self.close_command_reply.take() {
stream_disposal
.send((self.tx, rx))
.map_err(|_| anyhow!("Failed to return stream afterwards..."))?;
}
Ok(())
}
fn available_byte_range(&self) -> Range<i64> {
let offset = self.ring_buffer_read_next_index;
let number: i64 = self.ring_buffer.len().try_into().unwrap();
offset..(offset + number)
}
fn is_directly_satisfiable(&self, wanted_range: Range<i64>) -> bool {
let available = self.available_byte_range();
available.start <= wanted_range.start && available.end >= wanted_range.end
}
async fn process_command(&mut self, command: Option<StreamingReaderCmd>) -> anyhow::Result<()> {
match command {
Some(StreamingReaderCmd::Read {
offset,
size,
reply,
}) => {
let wanted_range = offset..(offset + size as i64);
if self.is_directly_satisfiable(wanted_range.clone()) {
// If we can, answer right away.
// If we do, make sure we trigger more streaming.
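// Worked example: if read_next_index = 100, offset = 104 and size = 8,
// then skip = 4 and drain = 12: bytes 100..104 are discarded, bytes
// 104..112 are returned, and read_next_index advances to 112.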
let skip = offset - self.ring_buffer_read_next_index;
let drain = skip + size as i64;
let response_buf: Vec<u8> = self
.ring_buffer
.drain(0..(drain as usize))
.skip(skip.try_into().unwrap())
.collect();
reply
.send(response_buf)
.map_err(|_| anyhow!("Reply to Read failed"))?;
self.ring_buffer_read_next_index += drain;
} else {
// Otherwise, plop it on the queue.
self.read_queue.push_back((wanted_range, reply));
if self.command_next_offset != offset {
// We should also request a seek as this isn't a sequential read...
send_bare_message(
&mut self.tx,
&StreamingReadRequest::Seek { offset, size },
)
.await?;
}
}
}
Some(StreamingReaderCmd::Close { reply }) => {
self.close_command_reply = Some(reply);
// Ensure that we don't accept any more commands.
// (This also prevents us repeatedly reading None...)
self.commands_in_flight = u32::MAX;
// Instruct the stream to shut down.
send_bare_message(&mut self.tx, &StreamingReadRequest::Stop).await?;
}
None => {
bail!("Command sender shut down unexpectedly.");
}
}
Ok(())
}
async fn process_response(
&mut self,
response: Option<StreamingReadResponse>,
) -> anyhow::Result<()> {
match response {
Some(StreamingReadResponse::Stopped) => {
self.running = false;
// TODO cancel existing commands if there are any?
}
Some(StreamingReadResponse::Block(_block)) => {
// We might be able to complete a command
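// Sketch of the intended handling (not implemented yet): append the
// block to ring_buffer, advance ring_buffer_write_next_index and
// network_next_offset, then pop any entries at the front of read_queue
// that have become directly satisfiable.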
todo!()
}
Some(StreamingReadResponse::Sought(seek)) => {
self.network_next_offset = seek;
}
Some(StreamingReadResponse::Eof) => {
let _eof_at = self.network_next_offset;
// Handling EOF means completing commands with as much data as they can get...
todo!()
}
None => {
bail!("Cascade failure: stream reader suddenly shut down.");
}
}
Ok(())
}
}
/// Reading messages from a RecvStream uses read_exact, which is not cancellation-safe, so it
/// can't be used in tokio::select! directly.
/// That's why we send them through a channel.
async fn stream_reader(
mut rx: RecvStream,
sender: Sender<StreamingReadResponse>,
rx_disposal: OneshotSender<RecvStream>,
) -> anyhow::Result<()> {
loop {
let msg: Option<DataResponse<StreamingReadResponse>> = read_bare_message(&mut rx).await?;
match msg {
Some(DataResponse::Success(response)) => {
let is_stopping = response == StreamingReadResponse::Stopped;
sender.send(response).await?;
if is_stopping {
rx_disposal
.send(rx)
.map_err(|_| anyhow!("Failed to give back rx"))?;
break;
}
}
Some(DataResponse::Error { code, message }) => {
// TODO we probably want to bubble this error up to the read requests somehow.
bail!("stream_reader: Error from remote: {:?} {:?}", code, message);
}
None => {
// We never run the stream dry ourselves, so this is odd.
bail!("Unexpected end of stream (stream_reader).");
}
}
}
Ok(())
}
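For context, here is how this task might be wired up to the state machine above (an illustrative sketch; recv_stream, the channel capacity, and the spawn site are assumptions):

// Illustrative wiring, not from this commit:
let (response_tx, responses) = tokio::sync::mpsc::channel(16);
let (rx_disposal, rx_retrieval) = tokio::sync::oneshot::channel();
tokio::spawn(stream_reader(recv_stream, response_tx, rx_disposal));
// `responses` and `rx_retrieval` then fill the corresponding fields of
// StreamingReaderState, whose command_handler can select! on `responses`
// safely because the non-cancellation-safe read_exact stays in this task.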

View File

@ -66,6 +66,10 @@ pub enum DataCommand {
offset: i64,
size: u32,
},
StreamingRead {
file_handle: u32,
offset: i64,
},
}
pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {}
@ -153,3 +157,28 @@ pub struct DirectoryEntry {
pub kind: FileKind,
pub name: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StreamingReadRequest {
/// Unblocks streaming up until a certain position.
/// This is the mechanism used to apply backpressure.
Continue { hold_on_at: i64 },
/// Please move to a different position in the file and read at least this much for the first
/// block. This will usually be followed up by a 'Continue' request.
Seek { offset: i64, size: u32 },
/// Please stop. Further messages are not StreamingReadRequests.
Stop,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum StreamingReadResponse {
/// This is what 'streams' — sequential blocks of data
Block(Vec<u8>),
/// The end of the file has been reached
Eof,
/// Stream moved due to request. Here is the actual position.
Sought(i64),
/// Stream stopped as requested.
/// Further messages are not StreamingReadResponses.
Stopped,
}
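Put together, an illustrative trace of a sequential read session (the ordering is a sketch; the server half is still todo!() below):

// client -> server: DataCommand::StreamingRead { file_handle, offset: 0 }
// server -> client: Block(...)                  (repeated, up to hold_on_at)
// client -> server: Continue { hold_on_at }     (unblocks more streaming)
// client -> server: Seek { offset, size }       (only on a non-sequential read)
// server -> client: Sought(offset)              (the actual position)
// server -> client: Eof                         (if the file ends)
// client -> server: Stop
// server -> client: Stopped                     (stream is returned to the pool)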

View File

@ -62,6 +62,12 @@ pub async fn handle_command_stream(
)
.await?;
}
DataCommand::StreamingRead {
file_handle,
offset,
} => {
todo!()
}
}
}