Support read on the server side

This commit is contained in:
Olivier 'reivilibre' 2022-01-22 11:53:51 +00:00
parent 05def61ee6
commit a629041387
2 changed files with 90 additions and 2 deletions

View File

@ -1,5 +1,5 @@
use crate::server::file_access::{
ClientInformation, ClientSpecificState, FileAccess, ServerwideState,
error_to_response, ClientInformation, ClientSpecificState, FileAccess, ServerwideState,
};
use anyhow::Context;
use futures_util::StreamExt;
@ -48,6 +48,20 @@ pub async fn handle_command_stream(
DataCommand::ReleaseFile { file_handle } => {
send_bare_message(&mut tx, &file_access.release(file_handle).await?).await?;
}
DataCommand::ReadFile {
file_handle,
offset,
size,
} => {
send_bare_message(
&mut tx,
&file_access
.read(file_handle, offset, size)
.await
.unwrap_or_else(error_to_response),
)
.await?;
}
}
}

View File

@ -7,12 +7,14 @@ use path_absolutize::Absolutize;
use slab::Slab;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::io::SeekFrom;
use std::os::unix::fs::{MetadataExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::{OpenOptions, ReadDir};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::sync::RwLock;
/// Server-wide state that all clients might need to mess with
@ -52,6 +54,9 @@ pub struct ClientSpecificState {
pub struct FileHandle {
/// Underlying file handle on our filesystem
pub file: tokio::fs::File,
/// Current offset
pub offset: u64,
}
pub struct DirHandle {
@ -86,6 +91,16 @@ fn io_error_to_response<A>(
}
}
pub fn error_to_response<A>(error: anyhow::Error) -> DataResponse<A> {
match error.downcast() {
Ok(io_error) => io_error_to_response(io_error, EFAULT, "Failure"),
Err(original) => DataResponse::Error {
code: EFAULT,
message: format!("Failure ({:?})", original),
},
}
}
impl FileAccess {
async fn resolve_vnode<R>(&self, vnode: VnodeId) -> Result<PathBuf, DataResponse<R>> {
let inode_map = self.client_state.inode_map.read().await;
@ -342,7 +357,8 @@ impl FileAccess {
match open_options.open(&path).await {
Ok(file) => {
let handle = Arc::new(RwLock::new(FileHandle { file }));
// TODO offset & append files: how does that work?
let handle = Arc::new(RwLock::new(FileHandle { file, offset: 0 }));
let file_handle: u32 = self
.client_state
@ -377,4 +393,62 @@ impl FileAccess {
})
}
}
pub async fn read(
&self,
file_handle: u32,
offset: i64,
size: u32,
) -> anyhow::Result<DataResponse<Vec<u8>>> {
let file_handle = file_handle as usize;
let file_handles = self.client_state.file_handles.read().await;
match file_handles.get(file_handle) {
Some(file_handle) => {
let offset: u64 = offset.try_into()?;
let mut handle = file_handle.write().await;
// TODO check this is valid for all modes including a+.
let starting_offset = if handle.offset != offset {
handle
.file
.seek(SeekFrom::Start(offset.try_into()?))
.await?
} else {
handle.offset
};
handle.offset = starting_offset;
if starting_offset != offset {
Ok(DataResponse::Success(Vec::with_capacity(0)))
} else {
// Read a full buffer if possible.
let mut target = vec![0u8; size as usize];
let mut read_so_far = 0;
while TryInto::<u64>::try_into(read_so_far)? < offset {
let how_many_read = handle.file.read(&mut target[read_so_far..]).await?;
read_so_far += how_many_read;
if how_many_read == 0 {
// This is end of file.
assert!(
target[read_so_far..].len() > 0,
"bug: reading to empty buffer"
);
// Delete the excess
target.drain(read_so_far..);
break;
}
}
handle.offset = starting_offset + TryInto::<u64>::try_into(read_so_far)?;
Ok(DataResponse::Success(target))
}
}
None => Ok(DataResponse::Error {
code: EBADFD,
message: format!("read(fh: {:#x?}): bad fd", file_handle),
}),
}
}
}