diff --git a/olivefsd/src/server/connections.rs b/olivefsd/src/server/connections.rs index 67ac0fd..0e0edb4 100644 --- a/olivefsd/src/server/connections.rs +++ b/olivefsd/src/server/connections.rs @@ -1,5 +1,5 @@ use crate::server::file_access::{ - ClientInformation, ClientSpecificState, FileAccess, ServerwideState, + error_to_response, ClientInformation, ClientSpecificState, FileAccess, ServerwideState, }; use anyhow::Context; use futures_util::StreamExt; @@ -48,6 +48,20 @@ pub async fn handle_command_stream( DataCommand::ReleaseFile { file_handle } => { send_bare_message(&mut tx, &file_access.release(file_handle).await?).await?; } + DataCommand::ReadFile { + file_handle, + offset, + size, + } => { + send_bare_message( + &mut tx, + &file_access + .read(file_handle, offset, size) + .await + .unwrap_or_else(error_to_response), + ) + .await?; + } } } diff --git a/olivefsd/src/server/file_access.rs b/olivefsd/src/server/file_access.rs index da48fc7..5d12786 100644 --- a/olivefsd/src/server/file_access.rs +++ b/olivefsd/src/server/file_access.rs @@ -7,12 +7,14 @@ use path_absolutize::Absolutize; use slab::Slab; use std::borrow::Borrow; use std::collections::HashMap; +use std::io::SeekFrom; use std::os::unix::fs::{MetadataExt, PermissionsExt}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::fs::{OpenOptions, ReadDir}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::sync::RwLock; /// Server-wide state that all clients might need to mess with @@ -52,6 +54,9 @@ pub struct ClientSpecificState { pub struct FileHandle { /// Underlying file handle on our filesystem pub file: tokio::fs::File, + + /// Current offset + pub offset: u64, } pub struct DirHandle { @@ -86,6 +91,16 @@ fn io_error_to_response( } } +pub fn error_to_response(error: anyhow::Error) -> DataResponse { + match error.downcast() { + Ok(io_error) => io_error_to_response(io_error, EFAULT, "Failure"), + Err(original) => DataResponse::Error { + code: EFAULT, + message: format!("Failure ({:?})", original), + }, + } +} + impl FileAccess { async fn resolve_vnode(&self, vnode: VnodeId) -> Result> { let inode_map = self.client_state.inode_map.read().await; @@ -342,7 +357,8 @@ impl FileAccess { match open_options.open(&path).await { Ok(file) => { - let handle = Arc::new(RwLock::new(FileHandle { file })); + // TODO offset & append files: how does that work? + let handle = Arc::new(RwLock::new(FileHandle { file, offset: 0 })); let file_handle: u32 = self .client_state @@ -377,4 +393,62 @@ impl FileAccess { }) } } + + pub async fn read( + &self, + file_handle: u32, + offset: i64, + size: u32, + ) -> anyhow::Result>> { + let file_handle = file_handle as usize; + let file_handles = self.client_state.file_handles.read().await; + match file_handles.get(file_handle) { + Some(file_handle) => { + let offset: u64 = offset.try_into()?; + let mut handle = file_handle.write().await; + // TODO check this is valid for all modes including a+. + let starting_offset = if handle.offset != offset { + handle + .file + .seek(SeekFrom::Start(offset.try_into()?)) + .await? + } else { + handle.offset + }; + + handle.offset = starting_offset; + + if starting_offset != offset { + Ok(DataResponse::Success(Vec::with_capacity(0))) + } else { + // Read a full buffer if possible. + let mut target = vec![0u8; size as usize]; + let mut read_so_far = 0; + + while TryInto::::try_into(read_so_far)? < offset { + let how_many_read = handle.file.read(&mut target[read_so_far..]).await?; + read_so_far += how_many_read; + if how_many_read == 0 { + // This is end of file. + assert!( + target[read_so_far..].len() > 0, + "bug: reading to empty buffer" + ); + // Delete the excess + target.drain(read_so_far..); + break; + } + } + + handle.offset = starting_offset + TryInto::::try_into(read_so_far)?; + + Ok(DataResponse::Success(target)) + } + } + None => Ok(DataResponse::Error { + code: EBADFD, + message: format!("read(fh: {:#x?}): bad fd", file_handle), + }), + } + } }