Support write() in theory (must support flush() to do it properly)
Some checks are pending
continuous-integration/drone the build is running
Some checks are pending
continuous-integration/drone the build is running
This commit is contained in:
parent
6a16534ec7
commit
1301901c2d
@ -444,7 +444,7 @@ impl Filesystem for OliveFilesystem {
|
|||||||
/// will be undefined if the open method didn't set any value.
|
/// will be undefined if the open method didn't set any value.
|
||||||
///
|
///
|
||||||
/// write_flags: will contain FUSE_WRITE_CACHE, if this write is from the page cache. If set,
|
/// write_flags: will contain FUSE_WRITE_CACHE, if this write is from the page cache. If set,
|
||||||
/// the pid, uid, gid, and fh may not match the value that would have been sent if write cachin
|
/// the pid, uid, gid, and fh may not match the value that would have been sent if write caching
|
||||||
/// is disabled
|
/// is disabled
|
||||||
/// flags: these are the file flags, such as O_SYNC. Only supported with ABI >= 7.9
|
/// flags: these are the file flags, such as O_SYNC. Only supported with ABI >= 7.9
|
||||||
/// lock_owner: only supported with ABI >= 7.9
|
/// lock_owner: only supported with ABI >= 7.9
|
||||||
@ -460,18 +460,38 @@ impl Filesystem for OliveFilesystem {
|
|||||||
lock_owner: Option<u64>,
|
lock_owner: Option<u64>,
|
||||||
reply: ReplyWrite,
|
reply: ReplyWrite,
|
||||||
) {
|
) {
|
||||||
debug!(
|
let requester = self.requester.clone();
|
||||||
|
let file_handle: u32 = fh.try_into().unwrap();
|
||||||
|
let data = data.to_vec();
|
||||||
|
|
||||||
|
self.spawn_with_error_handler(
|
||||||
|
async move {
|
||||||
|
let data_len = data.len();
|
||||||
|
match requester.write(file_handle, offset, data).await? {
|
||||||
|
DataResponse::Success(()) => {
|
||||||
|
reply.written(data_len as u32);
|
||||||
|
}
|
||||||
|
DataResponse::Error { code, message } => {
|
||||||
|
warn!(
|
||||||
"[Not Implemented] write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \
|
"[Not Implemented] write(ino: {:#x?}, fh: {}, offset: {}, data.len(): {}, \
|
||||||
write_flags: {:#x?}, flags: {:#x?}, lock_owner: {:?})",
|
write_flags: {:#x?}, flags: {:#x?}, lock_owner: {:?}) failed: {:?}",
|
||||||
ino,
|
ino,
|
||||||
fh,
|
fh,
|
||||||
offset,
|
offset,
|
||||||
data.len(),
|
data_len,
|
||||||
write_flags,
|
write_flags,
|
||||||
flags,
|
flags,
|
||||||
lock_owner
|
lock_owner,
|
||||||
|
message
|
||||||
|
);
|
||||||
|
reply.error(code as c_int);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
"read",
|
||||||
);
|
);
|
||||||
reply.error(ENOSYS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Flush method.
|
/// Flush method.
|
||||||
|
@ -248,6 +248,21 @@ impl Requester {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn write(
|
||||||
|
&self,
|
||||||
|
file_handle: u32,
|
||||||
|
offset: i64,
|
||||||
|
data: Vec<u8>,
|
||||||
|
) -> anyhow::Result<DataResponse<()>> {
|
||||||
|
self.internal
|
||||||
|
.command(&DataCommand::WriteFile {
|
||||||
|
file_handle,
|
||||||
|
offset,
|
||||||
|
data,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn start_streaming_reader(
|
pub async fn start_streaming_reader(
|
||||||
&self,
|
&self,
|
||||||
file_handle: u32,
|
file_handle: u32,
|
||||||
|
@ -71,6 +71,11 @@ pub enum DataCommand {
|
|||||||
offset: i64,
|
offset: i64,
|
||||||
size: u32,
|
size: u32,
|
||||||
},
|
},
|
||||||
|
WriteFile {
|
||||||
|
file_handle: u32,
|
||||||
|
offset: i64,
|
||||||
|
data: Vec<u8>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {}
|
pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'static {}
|
||||||
|
@ -70,6 +70,20 @@ pub async fn handle_command_stream(
|
|||||||
send_bare_message(&mut tx, &file_access.create(dir_vnode, mode, name).await?)
|
send_bare_message(&mut tx, &file_access.create(dir_vnode, mode, name).await?)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
DataCommand::WriteFile {
|
||||||
|
file_handle,
|
||||||
|
offset,
|
||||||
|
data,
|
||||||
|
} => {
|
||||||
|
send_bare_message(
|
||||||
|
&mut tx,
|
||||||
|
&file_access
|
||||||
|
.write(file_handle, offset, data)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(error_to_response),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,12 +9,13 @@ use std::borrow::Borrow;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
|
|
||||||
|
use anyhow::bail;
|
||||||
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::fs::{OpenOptions, ReadDir};
|
use tokio::fs::{OpenOptions, ReadDir};
|
||||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
/// Server-wide state that all clients might need to mess with
|
/// Server-wide state that all clients might need to mess with
|
||||||
@ -516,4 +517,49 @@ impl FileAccess {
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn write(
|
||||||
|
&self,
|
||||||
|
file_handle: u32,
|
||||||
|
offset: i64,
|
||||||
|
data: Vec<u8>,
|
||||||
|
) -> anyhow::Result<DataResponse<()>> {
|
||||||
|
let file_handle = file_handle as usize;
|
||||||
|
let file_handles = self.client_state.file_handles.read().await;
|
||||||
|
match file_handles.get(file_handle) {
|
||||||
|
Some(file_handle) => {
|
||||||
|
let offset: u64 = offset.try_into()?;
|
||||||
|
let mut handle = file_handle.write().await;
|
||||||
|
// TODO check this is valid for all modes including a+.
|
||||||
|
let starting_offset = if handle.offset != offset {
|
||||||
|
handle
|
||||||
|
.file
|
||||||
|
.seek(SeekFrom::Start(offset.try_into()?))
|
||||||
|
.await?
|
||||||
|
} else {
|
||||||
|
handle.offset
|
||||||
|
};
|
||||||
|
|
||||||
|
handle.offset = starting_offset;
|
||||||
|
|
||||||
|
if starting_offset != offset {
|
||||||
|
bail!(
|
||||||
|
"write: can't seek to {:?} (sought to {:?})",
|
||||||
|
offset,
|
||||||
|
starting_offset
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
handle.file.write_all(&data).await?;
|
||||||
|
|
||||||
|
handle.offset = starting_offset + data.len() as u64;
|
||||||
|
|
||||||
|
Ok(DataResponse::Success(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => Ok(DataResponse::Error {
|
||||||
|
code: EBADFD,
|
||||||
|
message: format!("write(fh: {:#x?}): bad fd", file_handle),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user