Polish off streaming reader support

Olivier 'reivilibre' 2022-01-23 21:40:11 +00:00
parent 86d94e62fa
commit 3b9f9ab7cd
7 changed files with 88 additions and 22 deletions

Cargo.lock (generated)

@@ -845,6 +845,7 @@ dependencies = [
"anyhow",
"bare-metrics-recorder",
"clap",
"dashmap 5.0.0",
"env_logger",
"fuser",
"libc",


@@ -16,6 +16,7 @@ tracing-futures = { version = "0.2.5", features = ["tokio"] }
# Asynchronous
tokio = { version = "1.15.0", features = ["full"] }
dashmap = "5.0.0"
# Serialisation
serde = { version = "1.0.133", features = ["derive", "rc"] }


@@ -6,6 +6,8 @@ use std::sync::Arc;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OlivefsClientConfiguration {
pub connection: ConnectionConfiguration,
#[serde(default)]
pub performance: PerformanceConfiguration,
}
@@ -34,7 +36,7 @@ pub struct ConnectionConfiguration {
pub keep_alive: u32,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct PerformanceConfiguration {
/// If specified, we will use a streaming reader to improve read performance.
#[serde(default)]

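Note: the pairing of #[serde(default)] on the new performance field with the added Default derive means an existing config file with no performance section still parses, and the streaming reader simply stays off. A minimal standalone sketch of that behaviour (the StreamingReaderConfig field below is a placeholder for illustration, not taken from this diff):

use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Serialize, Deserialize, Debug, Clone)]
struct StreamingReaderConfig {
    // Placeholder field for the sketch; the real fields are not shown in this diff.
    ring_buffer_size: usize,
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
struct PerformanceConfiguration {
    // If absent from the config file, this stays None and streaming reads are off.
    #[serde(default)]
    streaming_reader: Option<Arc<StreamingReaderConfig>>,
}

fn main() {
    // Omitting the whole performance section is equivalent to the derived default:
    let perf = PerformanceConfiguration::default();
    assert!(perf.streaming_reader.is_none());
}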

@ -9,6 +9,7 @@ use std::collections::HashMap;
use std::ffi::OsStr;
use std::future::Future;
use crate::configuration::StreamingReaderConfig;
use crate::Requester;
use anyhow::{anyhow, Context};
use fuser::consts::FOPEN_DIRECT_IO;
@@ -24,7 +25,9 @@ pub mod encryption;
pub const ROOT_INODE: u64 = 0x01;
pub struct OliveFilesystemSettings {}
pub struct OliveFilesystemSettings {
pub streaming_reader: Option<Arc<StreamingReaderConfig>>,
}
// TODO support multiple filesystems per FUSE fs so that operations between multiple of them
// can be made efficient.
@@ -301,7 +304,7 @@ impl Filesystem for OliveFilesystem {
/// structure in <fuse_common.h> for more details.
fn open(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: ReplyOpen) {
let requester = self.requester.clone();
// let buffers = self.buffers.clone();
let streaming_reader_opts = self.settings.streaming_reader.clone();
// TODO This needs more careful consideration before using to write files
// How does truncation work?
@@ -350,6 +353,21 @@ impl Filesystem for OliveFilesystem {
// rather than letting the kernel assume the getattr size is valid
// and caching up to that point.
// We might wind up wanting to do our own buffering...
if let Some(config) = streaming_reader_opts {
requester
.start_streaming_reader(file_handle, config)
.await?;
debug!(
"open(ino: {:#x?}) = fh {:?} (streaming reader: yes)",
ino, file_handle
);
} else {
debug!(
"open(ino: {:#x?}) = fh {:?} (streaming reader: no)",
ino, file_handle
);
}
reply.opened(file_handle as u64, FOPEN_DIRECT_IO);
}
DataResponse::Error { code, message } => {

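Note: OliveFilesystemSettings now carries the streaming-reader configuration as an Option<Arc<StreamingReaderConfig>>, so the .clone() at the top of open() is only a reference-count bump rather than a copy of the configuration. A standalone illustration of that pattern (struct bodies elided):

use std::sync::Arc;

struct StreamingReaderConfig; // fields elided for the sketch

struct OliveFilesystemSettings {
    streaming_reader: Option<Arc<StreamingReaderConfig>>,
}

fn main() {
    let settings = OliveFilesystemSettings {
        streaming_reader: Some(Arc::new(StreamingReaderConfig)),
    };
    // Each open() call can take its own handle on the shared config cheaply:
    let per_open = settings.streaming_reader.clone();
    assert!(per_open.is_some());
}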

@@ -64,7 +64,9 @@ async fn main() -> anyhow::Result<()> {
requester: Arc::new(requester),
buffers: Default::default(),
tokio_runtime: Handle::current(),
settings: OliveFilesystemSettings {},
settings: OliveFilesystemSettings {
streaming_reader: config.performance.streaming_reader.clone(),
},
};
// Convenience: try to unmount the filesystem at that path if there is one

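Note: tokio_runtime: Handle::current() (unchanged context here) is what lets the synchronous fuser callbacks drive async requester calls such as start_streaming_reader. A reduced sketch of that bridge, assuming a multi-threaded runtime; this is illustrative, not the project's actual plumbing:

use tokio::runtime::Handle;

// A synchronous (FUSE-style) callback can still drive async work by blocking
// on a runtime handle captured earlier with Handle::current().
fn sync_callback(handle: &Handle) -> u32 {
    handle.block_on(async { 40 + 2 })
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();
    // block_on must not be called on a runtime worker thread, so run the
    // synchronous callback on a blocking thread, much like a FUSE session thread.
    let answer = tokio::task::spawn_blocking(move || sync_callback(&handle))
        .await
        .expect("blocking task panicked");
    assert_eq!(answer, 42);
}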

@@ -2,16 +2,20 @@ use log::info;
use quinn::{Endpoint, RecvStream, SendStream};
use std::future::Future;
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use rustls::{PrivateKey, RootCertStore};
use serde::de::DeserializeOwned;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use crate::configuration::StreamingReaderConfig;
use crate::requester::streaming_reader::StreamingReader;
use olivefs_common::io::read_file;
use olivefs_common::messages::{
DataCommand, DataResponse, DirectoryEntry, FileMetadata, OpenMode, VnodeId,
@@ -25,6 +29,7 @@ pub mod streaming_reader;
pub struct Requester {
internal: RequesterInternal,
streaming_readers: DashMap<u32, StreamingReader>,
}
pub struct RequesterInternal {
@@ -152,7 +157,10 @@
impl Requester {
pub fn new(internal: RequesterInternal) -> Requester {
Requester { internal }
Requester {
internal,
streaming_readers: Default::default(),
}
}
pub async fn getattr(&self, vnode: VnodeId) -> anyhow::Result<DataResponse<FileMetadata>> {
@@ -195,9 +203,15 @@
}
pub async fn release(&self, file_handle: u32) -> anyhow::Result<DataResponse<()>> {
self.internal
.command(&DataCommand::ReleaseFile { file_handle })
.await
if let Some((_, streaming_reader)) = self.streaming_readers.remove(&file_handle) {
let (tx, rx) = streaming_reader.close().await?;
self.internal.release_stream_to_pool(tx, rx).await?;
Ok(DataResponse::Success(()))
} else {
self.internal
.command(&DataCommand::ReleaseFile { file_handle })
.await
}
}
pub async fn read(
@@ -206,12 +220,33 @@
offset: i64,
size: u32,
) -> anyhow::Result<DataResponse<Vec<u8>>> {
self.internal
.command(&DataCommand::ReadFile {
file_handle,
offset,
size,
})
.await
if let Some(streaming_reader) = self.streaming_readers.get(&file_handle) {
streaming_reader.read(offset, size).await
} else {
self.internal
.command(&DataCommand::ReadFile {
file_handle,
offset,
size,
})
.await
}
}
pub async fn start_streaming_reader(
&self,
file_handle: u32,
config: Arc<StreamingReaderConfig>,
) -> anyhow::Result<()> {
match self.streaming_readers.entry(file_handle) {
Entry::Occupied(_oe) => {
bail!("Streaming reader already started for FH{:?}", file_handle);
}
Entry::Vacant(ve) => {
let (tx, rx) = self.internal.acquire_stream_from_pool().await?;
ve.insert(StreamingReader::new(file_handle, tx, rx, config));
}
}
Ok(())
}
}

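Note: start_streaming_reader relies on DashMap's entry API so that registering a reader for a file handle is an atomic check-and-insert; a second registration for the same handle fails loudly instead of replacing the first. A reduced standalone sketch of that pattern, with a String standing in for StreamingReader:

use anyhow::bail;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

// Check-and-insert in one step: the map shard stays locked across the check,
// so two concurrent registrations for the same handle cannot both succeed.
fn register(readers: &DashMap<u32, String>, file_handle: u32, reader: String) -> anyhow::Result<()> {
    match readers.entry(file_handle) {
        Entry::Occupied(_) => {
            bail!("Streaming reader already started for FH{:?}", file_handle);
        }
        Entry::Vacant(vacant) => {
            vacant.insert(reader);
        }
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let readers: DashMap<u32, String> = DashMap::new();
    register(&readers, 7, "reader for fh 7".to_owned())?;
    assert!(register(&readers, 7, "duplicate".to_owned()).is_err());
    Ok(())
}

read and release then consult the same map: handles with a registered reader take the streaming path, everything else falls back to the plain DataCommand round-trip, and release additionally hands the reader's stream pair back to the pool.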

@@ -32,7 +32,7 @@ pub struct StreamingReaderState {
pub responses: Receiver<DataResponse<Vec<u8>>>,
pub command_rx: Receiver<StreamingReaderCmd>,
pub rx_retrieval: OneshotReceiver<RecvStream>,
pub read_queue: VecDeque<(Range<i64>, OneshotSender<Vec<u8>>)>,
pub read_queue: VecDeque<(Range<i64>, OneshotSender<DataResponse<Vec<u8>>>)>,
pub tx: SendStream,
pub close_command_reply: Option<OneshotSender<(SendStream, RecvStream)>>,
pub running: bool,
@@ -67,7 +67,7 @@ pub enum StreamingReaderCmd {
Read {
offset: i64,
size: u32,
reply: OneshotSender<Vec<u8>>,
reply: OneshotSender<DataResponse<Vec<u8>>>,
},
Close {
/// The response gives the stream back to the application so it can be put back into the
@@ -100,7 +100,7 @@ impl StreamingReader {
}
}
pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result<Vec<u8>> {
pub async fn read(&self, offset: i64, size: u32) -> anyhow::Result<DataResponse<Vec<u8>>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.command_queue_tx
.send(StreamingReaderCmd::Read {
@@ -209,7 +209,7 @@ impl StreamingReaderState {
fn satisfy_read(
&mut self,
wanted_byte_range: Range<i64>,
reply_to: OneshotSender<Vec<u8>>,
reply_to: OneshotSender<DataResponse<Vec<u8>>>,
) -> anyhow::Result<()> {
let skip = wanted_byte_range.start - self.ring_buffer_read_next_index;
let size = wanted_byte_range.end - wanted_byte_range.start;
@@ -222,7 +222,7 @@
.collect();
reply_to
.send(response_buf)
.send(DataResponse::Success(response_buf))
.map_err(|_| anyhow!("Reply to Read failed"))?;
self.ring_buffer_read_next_index += drain;
@@ -373,7 +373,14 @@ impl StreamingReaderState {
}
Some(DataResponse::Error { code, message }) => {
// should probably bubble this up to the reads...
todo!()
for (_, reply) in self.read_queue.drain(..) {
reply
.send(DataResponse::Error {
code,
message: message.clone(),
})
.map_err(|_| anyhow!("Can't reply to in-flight read with error"))?
}
}
None => {
bail!("Cascade failure: stream reader suddenly shut down.");
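
Note: the signature changes in this file all follow from one decision: each queued read now holds a OneshotSender<DataResponse<Vec<u8>>> instead of a plain OneshotSender<Vec<u8>>, so the reader task can complete a pending read with either data or the server error that previously hit the todo!(). A reduced sketch of that reply plumbing, with a simplified stand-in for DataResponse:

use tokio::sync::oneshot;

// Simplified stand-in for olivefs_common::messages::DataResponse; the real
// error code type is not shown in this diff.
#[derive(Debug)]
enum DataResponse<T> {
    Success(T),
    Error { code: u32, message: String },
}

#[tokio::main]
async fn main() {
    let (reply_tx, reply_rx) = oneshot::channel::<DataResponse<Vec<u8>>>();

    // Reader-task side: either satisfy the read with bytes from the ring buffer
    // (DataResponse::Success(buf)) or, as in the new error branch, fan the
    // server's error out to every queued read:
    let _ = reply_tx.send(DataResponse::Error {
        code: 5,
        message: "example server error".to_owned(),
    });

    // Caller side (Requester::read): await whichever outcome arrives.
    match reply_rx.await {
        Ok(DataResponse::Success(buf)) => println!("read {} bytes", buf.len()),
        Ok(DataResponse::Error { code, message }) => eprintln!("server error {code}: {message}"),
        Err(_) => eprintln!("streaming reader shut down before replying"),
    }
}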