diff --git a/olivefs/src/filesystem.rs b/olivefs/src/filesystem.rs index 315df2a..8542ed0 100644 --- a/olivefs/src/filesystem.rs +++ b/olivefs/src/filesystem.rs @@ -444,7 +444,7 @@ impl Filesystem for OliveFilesystem { /// will be undefined if the open method didn't set any value. /// /// write_flags: will contain FUSE_WRITE_CACHE, if this write is from the page cache. If set, - /// the pid, uid, gid, and fh may not match the value that would have been sent if write cachin + /// the pid, uid, gid, and fh may not match the value that would have been sent if write caching /// is disabled /// flags: these are the file flags, such as O_SYNC. Only supported with ABI >= 7.9 /// lock_owner: only supported with ABI >= 7.9 @@ -460,18 +460,38 @@ impl Filesystem for OliveFilesystem { lock_owner: Option, reply: ReplyWrite, ) { - debug!( - "[Not Implemented] write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \ - write_flags: {:#x?}, flags: {:#x?}, lock_owner: {:?})", - ino, - fh, - offset, - data.len(), - write_flags, - flags, - lock_owner + let requester = self.requester.clone(); + let file_handle: u32 = fh.try_into().unwrap(); + let data = data.to_vec(); + + self.spawn_with_error_handler( + async move { + let data_len = data.len(); + match requester.write(file_handle, offset, data).await? { + DataResponse::Success(()) => { + reply.written(data_len as u32); + } + DataResponse::Error { code, message } => { + warn!( + "[Not Implemented] write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \ + write_flags: {:#x?}, flags: {:#x?}, lock_owner: {:?}) failed: {:?}", + ino, + fh, + offset, + data_len, + write_flags, + flags, + lock_owner, + message + ); + reply.error(code as c_int); + } + } + + Ok(()) + }, + "read", ); - reply.error(ENOSYS); } /// Flush method. diff --git a/olivefs/src/requester.rs b/olivefs/src/requester.rs index 086d115..c4ee2a8 100644 --- a/olivefs/src/requester.rs +++ b/olivefs/src/requester.rs @@ -248,6 +248,21 @@ impl Requester { } } + pub async fn write( + &self, + file_handle: u32, + offset: i64, + data: Vec, + ) -> anyhow::Result> { + self.internal + .command(&DataCommand::WriteFile { + file_handle, + offset, + data, + }) + .await + } + pub async fn start_streaming_reader( &self, file_handle: u32, diff --git a/olivefs_common/src/messages.rs b/olivefs_common/src/messages.rs index 193a4bb..a612751 100644 --- a/olivefs_common/src/messages.rs +++ b/olivefs_common/src/messages.rs @@ -71,6 +71,11 @@ pub enum DataCommand { offset: i64, size: u32, }, + WriteFile { + file_handle: u32, + offset: i64, + data: Vec, + }, } pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {} diff --git a/olivefsd/src/server/connections.rs b/olivefsd/src/server/connections.rs index 58fb332..39842b4 100644 --- a/olivefsd/src/server/connections.rs +++ b/olivefsd/src/server/connections.rs @@ -70,6 +70,20 @@ pub async fn handle_command_stream( send_bare_message(&mut tx, &file_access.create(dir_vnode, mode, name).await?) .await?; } + DataCommand::WriteFile { + file_handle, + offset, + data, + } => { + send_bare_message( + &mut tx, + &file_access + .write(file_handle, offset, data) + .await + .unwrap_or_else(error_to_response), + ) + .await?; + } } } diff --git a/olivefsd/src/server/file_access.rs b/olivefsd/src/server/file_access.rs index af2e797..337436c 100644 --- a/olivefsd/src/server/file_access.rs +++ b/olivefsd/src/server/file_access.rs @@ -9,12 +9,13 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::io::SeekFrom; +use anyhow::bail; use std::os::unix::fs::{MetadataExt, PermissionsExt}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::fs::{OpenOptions, ReadDir}; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::sync::RwLock; /// Server-wide state that all clients might need to mess with @@ -516,4 +517,49 @@ impl FileAccess { }), } } + + pub async fn write( + &self, + file_handle: u32, + offset: i64, + data: Vec, + ) -> anyhow::Result> { + let file_handle = file_handle as usize; + let file_handles = self.client_state.file_handles.read().await; + match file_handles.get(file_handle) { + Some(file_handle) => { + let offset: u64 = offset.try_into()?; + let mut handle = file_handle.write().await; + // TODO check this is valid for all modes including a+. + let starting_offset = if handle.offset != offset { + handle + .file + .seek(SeekFrom::Start(offset.try_into()?)) + .await? + } else { + handle.offset + }; + + handle.offset = starting_offset; + + if starting_offset != offset { + bail!( + "write: can't seek to {:?} (sought to {:?})", + offset, + starting_offset + ); + } else { + handle.file.write_all(&data).await?; + + handle.offset = starting_offset + data.len() as u64; + + Ok(DataResponse::Success(())) + } + } + None => Ok(DataResponse::Error { + code: EBADFD, + message: format!("write(fh: {:#x?}): bad fd", file_handle), + }), + } + } }