From 01a68a6f6fd02373136dd51a11db7dd9bdc4d3ed Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 23 Jan 2022 16:55:57 +0000 Subject: [PATCH] Sketching out of the client half of streaming reading --- olivefs/Cargo.toml | 2 +- olivefs/src/configuration.rs | 18 ++ olivefs/src/requester.rs | 2 + olivefs/src/requester/streaming_reader.rs | 241 ++++++++++++++++++++++ olivefs_common/src/messages.rs | 29 +++ olivefsd/src/server/connections.rs | 6 + 6 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 olivefs/src/requester/streaming_reader.rs diff --git a/olivefs/Cargo.toml b/olivefs/Cargo.toml index 5721ed2..6425b15 100644 --- a/olivefs/Cargo.toml +++ b/olivefs/Cargo.toml @@ -18,7 +18,7 @@ tracing-futures = { version = "0.2.5", features = ["tokio"] } tokio = { version = "1.15.0", features = ["full"] } # Serialisation -serde = { version = "1.0.133", features = ["derive"] } +serde = { version = "1.0.133", features = ["derive", "rc"] } serde_bare = "0.5.0" toml = "0.5.8" clap = { version = "3.0.7", features = ["derive"] } diff --git a/olivefs/src/configuration.rs b/olivefs/src/configuration.rs index 5c9ab23..5498e90 100644 --- a/olivefs/src/configuration.rs +++ b/olivefs/src/configuration.rs @@ -1,10 +1,12 @@ use serde::Deserialize; use serde::Serialize; use std::path::PathBuf; +use std::sync::Arc; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct OlivefsClientConfiguration { pub connection: ConnectionConfiguration, + pub performance: PerformanceConfiguration, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -31,3 +33,19 @@ pub struct ConnectionConfiguration { /// Keep-alive in seconds pub keep_alive: u32, } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PerformanceConfiguration { + /// If specified, we will use a streaming reader to improve read performance. + #[serde(default)] + pub streaming_reader: Option<Arc<StreamingReaderConfig>>, // TODO pipelined writer?
+} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct StreamingReaderConfig { + /// How many bytes we will pre-emptively read before they are requested. + /// A larger number will benefit sequential reads, but requires more data to be transmitted + /// needlessly for random reads. This setting also influences memory usage. + /// This might be configurable per-filetype in the future. + pub read_ahead: u32, +} diff --git a/olivefs/src/requester.rs b/olivefs/src/requester.rs index 986f2fd..6d0adc3 100644 --- a/olivefs/src/requester.rs +++ b/olivefs/src/requester.rs @@ -21,6 +21,8 @@ use olivefs_common::networking::{ }; use tokio::sync::Mutex; +pub mod streaming_reader; + pub struct Requester { internal: RequesterInternal, } diff --git a/olivefs/src/requester/streaming_reader.rs b/olivefs/src/requester/streaming_reader.rs new file mode 100644 index 0000000..6061f9f --- /dev/null +++ b/olivefs/src/requester/streaming_reader.rs @@ -0,0 +1,241 @@ +use anyhow::{anyhow, bail}; + +use olivefs_common::messages::{DataResponse, StreamingReadRequest, StreamingReadResponse}; +use olivefs_common::networking::{read_bare_message, send_bare_message}; +use quinn::{RecvStream, SendStream}; +use std::collections::VecDeque; +use std::ops::Range; +use std::sync::Arc; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::oneshot::Receiver as OneshotReceiver; +use tokio::sync::oneshot::Sender as OneshotSender; + +use crate::configuration::StreamingReaderConfig; + +// We could use a B-Tree of Spans → bytes +// and then use a pseudo-LRU (e.g. tree PLRU) to decide which old ones to get rid of. +// But when the common case is a sequential read (e.g. media files), that's a lot of aggravation +// when we could just use a ring buffer. + +// The streaming reader is therefore only designed to work with sequential reads. It will discard +// data as soon as it doesn't look like it will be read by a sequential reader! 
+ +/// We won't send a different hold_on_at payload until it would unlock at least this many bytes. +pub const HOLD_ON_HYSTERESIS: i64 = 8192; + +pub struct StreamingReaderState { + pub ring_buffer: VecDeque<u8>, + /// The next 'read' index in the ring buffer + pub ring_buffer_read_next_index: i64, + /// The next 'write' index in the ring buffer + pub ring_buffer_write_next_index: i64, + /// The next offset we will read from the network + pub network_next_offset: i64, + /// The `hold_on_at` position we have sent to the server. + pub stated_hold_on_at: i64, + /// The next offset we expect a sequential reader to command us to fetch + pub command_next_offset: i64, + pub accept_more_responses: bool, + pub commands_in_flight: u32, + pub command_limit: u32, + pub command_rx: Receiver<StreamingReaderCmd>, + pub responses: Receiver<StreamingReadResponse>, + pub running: bool, + pub rx_retrieval: OneshotReceiver<RecvStream>, + pub read_queue: VecDeque<(Range<i64>, OneshotSender<Vec<u8>>)>, + pub tx: SendStream, + pub close_command_reply: Option<OneshotSender<(SendStream, RecvStream)>>, + pub config: Arc<StreamingReaderConfig>, +} + +pub enum StreamingReaderCmd { + Read { + offset: i64, + size: u32, + reply: OneshotSender<Vec<u8>>, + }, + Close { + /// The response gives the stream back to the application so it can be put back into the + /// pool. + reply: OneshotSender<(SendStream, RecvStream)>, + }, +} + +pub struct StreamingReader { + command_queue_tx: Sender<StreamingReaderCmd>, + // TODO ... commands_in_flight_semaphore: Sem +} + +pub async fn streaming_reader(_command_queue: Receiver<StreamingReaderCmd>) -> anyhow::Result<()> { + todo!() +} + +impl StreamingReader { + pub async fn read(&self, _offset: i64, _size: u32) -> anyhow::Result<Vec<u8>> { + todo!() + } + + pub async fn close(&self) -> anyhow::Result<(SendStream, RecvStream)> { + todo!() + } +} + +impl StreamingReaderState { + pub async fn command_handler(mut self) -> anyhow::Result<()> { + while self.running { + tokio::select!
{ + command = self.command_rx.recv(), if self.commands_in_flight < self.command_limit => { + self.process_command(command).await?; + } + response = self.responses.recv() => { + self.process_response(response).await?; + } + } + + if self.stated_hold_on_at + HOLD_ON_HYSTERESIS + < self.config.read_ahead as i64 + self.ring_buffer_read_next_index + {} + } + + // TODO check command queue is empty... + + let rx = self.rx_retrieval.await?; + if let Some(stream_disposal) = self.close_command_reply.take() { + stream_disposal + .send((self.tx, rx)) + .map_err(|_| anyhow!("Failed to return stream afterwards..."))?; + } + + Ok(()) + } + + fn available_byte_range(&self) -> Range<i64> { + let offset = self.ring_buffer_read_next_index; + let number: i64 = self.ring_buffer.len().try_into().unwrap(); + offset..(offset + number) + } + + fn is_directly_satisfiable(&self, wanted_range: Range<i64>) -> bool { + let available = self.available_byte_range(); + available.start <= wanted_range.start && available.end >= wanted_range.end + } + + async fn process_command(&mut self, command: Option<StreamingReaderCmd>) -> anyhow::Result<()> { + match command { + Some(StreamingReaderCmd::Read { + offset, + size, + reply, + }) => { + let wanted_range = offset..(offset + size as i64); + if self.is_directly_satisfiable(wanted_range.clone()) { + // If we can, answer right away. + // If we do, make sure we trigger more streaming. + + let skip = offset - self.ring_buffer_read_next_index; + let drain = skip + size as i64; + + let response_buf: Vec<u8> = self + .ring_buffer + .drain(0..(drain as usize)) + .skip(skip.try_into().unwrap()) + .collect(); + + reply + .send(response_buf) + .map_err(|_| anyhow!("Reply to Read failed"))?; + + self.ring_buffer_read_next_index += drain; + } else { + // Otherwise, plop it on the queue. + self.read_queue.push_back((wanted_range, reply)); + + if self.command_next_offset != offset { + // We should also request a seek as this isn't a sequential read...
+ send_bare_message( + &mut self.tx, + &StreamingReadRequest::Seek { offset, size }, + ) + .await?; + } + } + } + Some(StreamingReaderCmd::Close { reply }) => { + self.close_command_reply = Some(reply); + + // Ensure that we don't accept any more commands. + // (This also prevents us repeatedly reading None...) + self.commands_in_flight = u32::MAX; + + // Instruct the stream to shut down. + send_bare_message(&mut self.tx, &StreamingReadRequest::Stop).await?; + } + None => { + bail!("Command sender shut down unexpectedly."); + } + } + Ok(()) + } + + async fn process_response( + &mut self, + response: Option<StreamingReadResponse>, + ) -> anyhow::Result<()> { + match response { + Some(StreamingReadResponse::Stopped) => { + self.running = false; + // TODO cancel existing commands if there are any? + } + Some(StreamingReadResponse::Block(_block)) => { + // We might be able to complete a command + todo!() + } + Some(StreamingReadResponse::Sought(seek)) => { + self.network_next_offset = seek; + } + Some(StreamingReadResponse::Eof) => { + let _eof_at = self.network_next_offset; + // Handling EOF means completing commands with as much data as they can get... + todo!() + } + None => { + bail!("Cascade failure: stream reader suddenly shut down."); + } + } + Ok(()) + } +} + +/// Reading messages from a RecvStream uses read_exact which is not cancellation safe, so it can't +/// be used in tokio::select! directly. +/// That's why we send them through a channel.
+async fn stream_reader( + mut rx: RecvStream, + sender: Sender<StreamingReadResponse>, + rx_disposal: OneshotSender<RecvStream>, +) -> anyhow::Result<()> { + loop { + let msg: Option<DataResponse<StreamingReadResponse>> = read_bare_message(&mut rx).await?; + match msg { + Some(DataResponse::Success(response)) => { + let is_stopping = &response == &StreamingReadResponse::Stopped; + sender.send(response).await?; + if is_stopping { + rx_disposal + .send(rx) + .map_err(|_| anyhow!("Failed to give back rx"))?; + break; + } + } + Some(DataResponse::Error { code, message }) => { + // TODO we probably want to bubble this error up to the read requests somehow. + bail!("stream_reader: Error from remote: {:?} {:?}", code, message); + } + None => { + // We never run the stream dry ourselves, so this is odd. + bail!("Unexpected end of stream (stream_reader)."); + } + } + } + Ok(()) +} diff --git a/olivefs_common/src/messages.rs b/olivefs_common/src/messages.rs index 3c4931d..c00efed 100644 --- a/olivefs_common/src/messages.rs +++ b/olivefs_common/src/messages.rs @@ -66,6 +66,10 @@ pub enum DataCommand { offset: i64, size: u32, }, + StreamingRead { + file_handle: u32, + offset: i64, + }, } pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {} @@ -153,3 +157,28 @@ pub struct DirectoryEntry { pub kind: FileKind, pub name: String, } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum StreamingReadRequest { + /// Unblocks streaming up until a certain position. + /// This is the mechanism used to apply backpressure. + Continue { hold_on_at: i64 }, + /// Please move to a different position in the file and read at least this much for the first + /// block. Usually will be followed up by a 'Continue' request. + Seek { offset: i64, size: u32 }, + /// Please stop. Further messages are not StreamingReadRequests.
+ Stop, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum StreamingReadResponse { + /// This is what 'streams' — sequential blocks of data + Block(Vec<u8>), + /// The end of the file has been reached + Eof, + /// Stream moved due to request. Here is the actual position. + Sought(i64), + /// Stream stopped as requested. + /// Further messages are not StreamingReadResponses. + Stopped, +} diff --git a/olivefsd/src/server/connections.rs b/olivefsd/src/server/connections.rs index 0e0edb4..2b31c12 100644 --- a/olivefsd/src/server/connections.rs +++ b/olivefsd/src/server/connections.rs @@ -62,6 +62,12 @@ pub async fn handle_command_stream( ) .await?; } + DataCommand::StreamingRead { + file_handle, + offset, + } => { + todo!() + } } }