From 3b9f9ab7cd527f8c6ddb7b89f11ffdf86cea15fe Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 23 Jan 2022 21:40:11 +0000 Subject: [PATCH] Polish off streaming reader support --- Cargo.lock | 1 + olivefs/Cargo.toml | 1 + olivefs/src/configuration.rs | 4 +- olivefs/src/filesystem.rs | 22 ++++++++- olivefs/src/main.rs | 4 +- olivefs/src/requester.rs | 59 ++++++++++++++++++----- olivefs/src/requester/streaming_reader.rs | 19 +++++--- 7 files changed, 88 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d747d89..cf14da6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -845,6 +845,7 @@ dependencies = [ "anyhow", "bare-metrics-recorder", "clap", + "dashmap 5.0.0", "env_logger", "fuser", "libc", diff --git a/olivefs/Cargo.toml b/olivefs/Cargo.toml index 6425b15..1861448 100644 --- a/olivefs/Cargo.toml +++ b/olivefs/Cargo.toml @@ -16,6 +16,7 @@ tracing-futures = { version = "0.2.5", features = ["tokio"] } # Asynchronous tokio = { version = "1.15.0", features = ["full"] } +dashmap = "5.0.0" # Serialisation serde = { version = "1.0.133", features = ["derive", "rc"] } diff --git a/olivefs/src/configuration.rs b/olivefs/src/configuration.rs index 9f0b1aa..3c08220 100644 --- a/olivefs/src/configuration.rs +++ b/olivefs/src/configuration.rs @@ -6,6 +6,8 @@ use std::sync::Arc; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct OlivefsClientConfiguration { pub connection: ConnectionConfiguration, + + #[serde(default)] pub performance: PerformanceConfiguration, } @@ -34,7 +36,7 @@ pub struct ConnectionConfiguration { pub keep_alive: u32, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct PerformanceConfiguration { /// If specified, we will use a streaming reader to improve read performance. #[serde(default)] diff --git a/olivefs/src/filesystem.rs b/olivefs/src/filesystem.rs index 6fc5c0e..ee77518 100644 --- a/olivefs/src/filesystem.rs +++ b/olivefs/src/filesystem.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; +use crate::configuration::StreamingReaderConfig; use crate::Requester; use anyhow::{anyhow, Context}; use fuser::consts::FOPEN_DIRECT_IO; @@ -24,7 +25,9 @@ pub mod encryption; pub const ROOT_INODE: u64 = 0x01; -pub struct OliveFilesystemSettings {} +pub struct OliveFilesystemSettings { + pub streaming_reader: Option>, +} // TODO support multiple filesystems per FUSE fs so that operations between multiple of them // can be made efficient. @@ -301,7 +304,7 @@ impl Filesystem for OliveFilesystem { /// structure in for more details. fn open(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: ReplyOpen) { let requester = self.requester.clone(); - // let buffers = self.buffers.clone(); + let streaming_reader_opts = self.settings.streaming_reader.clone(); // TODO This needs more careful consideration before using to write files // How does truncation work? @@ -350,6 +353,21 @@ impl Filesystem for OliveFilesystem { // rather than letting the kernel assume the getattr size is valid // and caching up to that point. // We might wind up wanting to do our own buffering... + + if let Some(config) = streaming_reader_opts { + requester + .start_streaming_reader(file_handle, config) + .await?; + debug!( + "open(ino: {:#x?}) = fh {:?} (streaming reader: yes)", + ino, file_handle + ); + } + debug!( + "open(ino: {:#x?}) = fh {:?} (streaming reader: no)", + ino, file_handle + ); + reply.opened(file_handle as u64, FOPEN_DIRECT_IO); } DataResponse::Error { code, message } => { diff --git a/olivefs/src/main.rs b/olivefs/src/main.rs index 39607de..4d0afee 100644 --- a/olivefs/src/main.rs +++ b/olivefs/src/main.rs @@ -64,7 +64,9 @@ async fn main() -> anyhow::Result<()> { requester: Arc::new(requester), buffers: Default::default(), tokio_runtime: Handle::current(), - settings: OliveFilesystemSettings {}, + settings: OliveFilesystemSettings { + streaming_reader: config.performance.streaming_reader.clone(), + }, }; // Convenience: try to unmount the filesystem at that path if there is one diff --git a/olivefs/src/requester.rs b/olivefs/src/requester.rs index 6d0adc3..ac24606 100644 --- a/olivefs/src/requester.rs +++ b/olivefs/src/requester.rs @@ -2,16 +2,20 @@ use log::info; use quinn::{Endpoint, RecvStream, SendStream}; use std::future::Future; -use anyhow::anyhow; +use anyhow::{anyhow, bail}; use rustls::{PrivateKey, RootCertStore}; use serde::de::DeserializeOwned; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; use std::net::SocketAddr; use std::path::Path; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use crate::configuration::StreamingReaderConfig; +use crate::requester::streaming_reader::StreamingReader; use olivefs_common::io::read_file; use olivefs_common::messages::{ DataCommand, DataResponse, DirectoryEntry, FileMetadata, OpenMode, VnodeId, @@ -25,6 +29,7 @@ pub mod streaming_reader; pub struct Requester { internal: RequesterInternal, + streaming_readers: DashMap, } pub struct RequesterInternal { @@ -152,7 +157,10 @@ impl RequesterInternal { impl Requester { pub fn new(internal: RequesterInternal) -> Requester { - Requester { internal } + Requester { + internal, + streaming_readers: Default::default(), + } } pub async fn getattr(&self, vnode: VnodeId) -> anyhow::Result> { @@ -195,9 +203,15 @@ impl Requester { } pub async fn release(&self, file_handle: u32) -> anyhow::Result> { - self.internal - .command(&DataCommand::ReleaseFile { file_handle }) - .await + if let Some((_, streaming_reader)) = self.streaming_readers.remove(&file_handle) { + let (tx, rx) = streaming_reader.close().await?; + self.internal.release_stream_to_pool(tx, rx).await?; + Ok(DataResponse::Success(())) + } else { + self.internal + .command(&DataCommand::ReleaseFile { file_handle }) + .await + } } pub async fn read( @@ -206,12 +220,33 @@ impl Requester { offset: i64, size: u32, ) -> anyhow::Result>> { - self.internal - .command(&DataCommand::ReadFile { - file_handle, - offset, - size, - }) - .await + if let Some(streaming_reader) = self.streaming_readers.get(&file_handle) { + streaming_reader.read(offset, size).await + } else { + self.internal + .command(&DataCommand::ReadFile { + file_handle, + offset, + size, + }) + .await + } + } + + pub async fn start_streaming_reader( + &self, + file_handle: u32, + config: Arc, + ) -> anyhow::Result<()> { + match self.streaming_readers.entry(file_handle) { + Entry::Occupied(_oe) => { + bail!("Streaming reader already started for FH{:?}", file_handle); + } + Entry::Vacant(ve) => { + let (tx, rx) = self.internal.acquire_stream_from_pool().await?; + ve.insert(StreamingReader::new(file_handle, tx, rx, config)); + } + } + Ok(()) } } diff --git a/olivefs/src/requester/streaming_reader.rs b/olivefs/src/requester/streaming_reader.rs index 9abb415..0c2d04e 100644 --- a/olivefs/src/requester/streaming_reader.rs +++ b/olivefs/src/requester/streaming_reader.rs @@ -32,7 +32,7 @@ pub struct StreamingReaderState { pub responses: Receiver>>, pub command_rx: Receiver, pub rx_retrieval: OneshotReceiver, - pub read_queue: VecDeque<(Range, OneshotSender>)>, + pub read_queue: VecDeque<(Range, OneshotSender>>)>, pub tx: SendStream, pub close_command_reply: Option>, pub running: bool, @@ -67,7 +67,7 @@ pub enum StreamingReaderCmd { Read { offset: i64, size: u32, - reply: OneshotSender>, + reply: OneshotSender>>, }, Close { /// The response gives the stream back to the application so it can be put back into the @@ -100,7 +100,7 @@ impl StreamingReader { } } - pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result> { + pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result>> { let (reply_tx, reply_rx) = oneshot::channel(); self.command_queue_tx .send(StreamingReaderCmd::Read { @@ -209,7 +209,7 @@ impl StreamingReaderState { fn satisfy_read( &mut self, wanted_byte_range: Range, - reply_to: OneshotSender>, + reply_to: OneshotSender>>, ) -> anyhow::Result<()> { let skip = wanted_byte_range.start - self.ring_buffer_read_next_index; let size = wanted_byte_range.end - wanted_byte_range.start; @@ -222,7 +222,7 @@ impl StreamingReaderState { .collect(); reply_to - .send(response_buf) + .send(DataResponse::Success(response_buf)) .map_err(|_| anyhow!("Reply to Read failed"))?; self.ring_buffer_read_next_index += drain; @@ -373,7 +373,14 @@ impl StreamingReaderState { } Some(DataResponse::Error { code, message }) => { // should probably bubble this up to the reads... - todo!() + for (_, reply) in self.read_queue.drain(..) { + reply + .send(DataResponse::Error { + code, + message: message.clone(), + }) + .map_err(|_| anyhow!("Can't reply to in-flight read with error"))? + } } None => { bail!("Cascade failure: stream reader suddenly shut down.");