diff --git a/olivefs/src/filesystem.rs b/olivefs/src/filesystem.rs index 8542ed0..1ec1ffe 100644 --- a/olivefs/src/filesystem.rs +++ b/olivefs/src/filesystem.rs @@ -473,16 +473,9 @@ impl Filesystem for OliveFilesystem { } DataResponse::Error { code, message } => { warn!( - "[Not Implemented] write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \ + "write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \ write_flags: {:#x?}, flags: {:#x?}, lock_owner: {:?}) failed: {:?}", - ino, - fh, - offset, - data_len, - write_flags, - flags, - lock_owner, - message + ino, fh, offset, data_len, write_flags, flags, lock_owner, message ); reply.error(code as c_int); } @@ -490,7 +483,7 @@ impl Filesystem for OliveFilesystem { Ok(()) }, - "read", + "write", ); } @@ -505,11 +498,30 @@ impl Filesystem for OliveFilesystem { /// filesystem wants to return write errors. If the filesystem supports file locking /// operations (setlk, getlk) it should remove all locks belonging to 'lock_owner'. fn flush(&mut self, _req: &Request<'_>, ino: u64, fh: u64, lock_owner: u64, reply: ReplyEmpty) { - debug!( - "[Not Implemented] flush(ino: {:#x?}, fh: {}, lock_owner: {:?})", - ino, fh, lock_owner + let requester = self.requester.clone(); + let file_handle: u32 = fh.try_into().unwrap(); + + // TODO(setlk, getlk): remove locks belonging to `lock_owner`. + + self.spawn_with_error_handler( + async move { + match requester.flush(file_handle).await? { + DataResponse::Success(()) => { + reply.ok(); + } + DataResponse::Error { code, message } => { + warn!( + "flush(ino: {:#x?}, fh: {}, lock_owner: {:?}): {:?}", + ino, fh, lock_owner, message + ); + reply.error(code as c_int); + } + } + + Ok(()) + }, + "flush", ); - reply.error(ENOSYS); } /// Release an open file. diff --git a/olivefs/src/requester.rs b/olivefs/src/requester.rs index c4ee2a8..dfefbff 100644 --- a/olivefs/src/requester.rs +++ b/olivefs/src/requester.rs @@ -263,6 +263,12 @@ impl Requester { .await } + pub async fn flush(&self, file_handle: u32) -> anyhow::Result> { + self.internal + .command(&DataCommand::FlushFile { file_handle }) + .await + } + pub async fn start_streaming_reader( &self, file_handle: u32, diff --git a/olivefs_common/src/messages.rs b/olivefs_common/src/messages.rs index a612751..231b844 100644 --- a/olivefs_common/src/messages.rs +++ b/olivefs_common/src/messages.rs @@ -76,6 +76,9 @@ pub enum DataCommand { offset: i64, data: Vec, }, + FlushFile { + file_handle: u32, + }, } pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {} diff --git a/olivefsd/src/server/connections.rs b/olivefsd/src/server/connections.rs index 39842b4..51ab1ef 100644 --- a/olivefsd/src/server/connections.rs +++ b/olivefsd/src/server/connections.rs @@ -84,6 +84,16 @@ pub async fn handle_command_stream( ) .await?; } + DataCommand::FlushFile { file_handle } => { + send_bare_message( + &mut tx, + &file_access + .flush(file_handle) + .await + .unwrap_or_else(error_to_response), + ) + .await?; + } } } diff --git a/olivefsd/src/server/file_access.rs b/olivefsd/src/server/file_access.rs index 337436c..33fe9f6 100644 --- a/olivefsd/src/server/file_access.rs +++ b/olivefsd/src/server/file_access.rs @@ -562,4 +562,20 @@ impl FileAccess { }), } } + + pub async fn flush(&self, file_handle: u32) -> anyhow::Result> { + let file_handle = file_handle as usize; + let file_handles = self.client_state.file_handles.read().await; + match file_handles.get(file_handle) { + Some(file_handle) => { + let mut file_handle = file_handle.write().await; + file_handle.file.flush().await?; + Ok(DataResponse::Success(())) + } + None => Ok(DataResponse::Error { + code: EBADFD, + message: format!("write(fh: {:#x?}): bad fd", file_handle), + }), + } + } }