Add yama_wormfile crates

These will be a useful component of the new on-disk storage format
This commit is contained in:
Olivier 'reivilibre' 2023-04-02 20:47:34 +01:00
parent 7cd71695bc
commit 1fe4d9d2f3
13 changed files with 698 additions and 21 deletions

178
Cargo.lock generated
View File

@ -67,6 +67,17 @@ dependencies = [
"serde",
]
[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.13",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -141,6 +152,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cap-fs-ext"
version = "0.24.4"
@ -246,7 +263,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"syn 1.0.96",
]
[[package]]
@ -404,7 +421,7 @@ dependencies = [
[[package]]
name = "datman"
version = "0.6.0-alpha.5"
version = "0.7.0-alpha.1"
dependencies = [
"anyhow",
"arc-interner",
@ -441,7 +458,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.96",
]
[[package]]
@ -883,7 +900,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn",
"syn 1.0.96",
]
[[package]]
@ -910,7 +927,7 @@ dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys",
"windows-sys 0.36.1",
]
[[package]]
@ -1037,9 +1054,15 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.36.1",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pkg-config"
version = "0.3.25"
@ -1061,7 +1084,7 @@ dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"syn 1.0.96",
"version_check",
]
@ -1084,9 +1107,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.39"
version = "1.0.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f"
checksum = "1d0dd4be24fcdcfeaa12a432d588dc59bbad6cad3510c67e74a2b6b2fc950564"
dependencies = [
"unicode-ident",
]
@ -1099,9 +1122,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.18"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1"
checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
dependencies = [
"proc-macro2",
]
@ -1320,7 +1343,7 @@ checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.96",
]
[[package]]
@ -1429,7 +1452,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn",
"syn 1.0.96",
]
[[package]]
@ -1443,6 +1466,17 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "system-interface"
version = "0.20.0"
@ -1520,7 +1554,7 @@ checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.96",
]
[[package]]
@ -1543,6 +1577,18 @@ dependencies = [
"crunchy",
]
[[package]]
name = "tokio"
version = "1.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
dependencies = [
"autocfg",
"bytes",
"pin-project-lite",
"windows-sys 0.45.0",
]
[[package]]
name = "toml"
version = "0.5.9"
@ -1683,43 +1729,109 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
"windows_aarch64_msvc 0.36.1",
"windows_i686_gnu 0.36.1",
"windows_i686_msvc 0.36.1",
"windows_x86_64_gnu 0.36.1",
"windows_x86_64_msvc 0.36.1",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "winx"
version = "0.31.0"
@ -1733,7 +1845,7 @@ dependencies = [
[[package]]
name = "yama"
version = "0.6.0-alpha.5"
version = "0.7.0-alpha.1"
dependencies = [
"anyhow",
"blake",
@ -1766,6 +1878,32 @@ dependencies = [
"zstd",
]
[[package]]
name = "yama_wormfile"
version = "0.1.0"
dependencies = [
"async-trait",
"tokio",
]
[[package]]
name = "yama_wormfile_fs"
version = "0.1.0"
dependencies = [
"async-trait",
"rand",
"tokio",
"yama_wormfile",
]
[[package]]
name = "yama_wormfile_s3"
version = "0.1.0"
[[package]]
name = "yama_wormfile_sftp"
version = "0.1.0"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@ -2,6 +2,10 @@
members = [
"yama",
"datman",
"yama_wormfile",
"yama_wormfile_fs",
"yama_wormfile_sftp",
"yama_wormfile_s3",
]
[profile.release]

View File

@ -6,7 +6,7 @@ let
rust-toolchain = pkgs.symlinkJoin {
name = "rust-toolchain";
paths = [pkgs.rustc pkgs.cargo pkgs.rustfmt pkgs.rustPlatform.rustcSrc];
paths = [pkgs.rustc pkgs.cargo pkgs.clippy pkgs.rustfmt pkgs.rustPlatform.rustcSrc];
};
in

10
yama_wormfile/Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "yama_wormfile"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.68"
tokio = { version = "1.27.0", features = ["io-util"] }

120
yama_wormfile/src/boxed.rs Normal file
View File

@ -0,0 +1,120 @@
use crate::paths::{WormPath, WormPathBuf};
use crate::{WormFileProvider, WormFileReader, WormFileWriter};
use async_trait::async_trait;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::pin::Pin;
pub struct BoxErr(Box<dyn Error + Send + Sync>);
impl Debug for BoxErr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.0, f)
}
}
impl Display for BoxErr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
impl Error for BoxErr {}
impl BoxErr {
pub fn new(err: impl Error + Sync + Send + 'static) -> Self {
BoxErr(Box::new(err))
}
}
#[async_trait]
trait BoxableWormFileProvider: Debug + Send + Sync {
async fn is_dir_b(&self, path: &WormPath) -> Result<bool, BoxErr>;
async fn is_regular_file_b(&self, path: &WormPath) -> Result<bool, BoxErr>;
async fn list_b(&self, path: &WormPath) -> Result<Vec<WormPathBuf>, BoxErr>;
async fn read_b(&self, path: &WormPath) -> Result<Pin<Box<dyn WormFileReader>>, BoxErr>;
async fn write_b(&self) -> Result<Pin<Box<dyn WormFileWriter>>, BoxErr>;
}
#[async_trait]
impl<T: WormFileProvider> BoxableWormFileProvider for T {
async fn is_dir_b(&self, path: &WormPath) -> Result<bool, BoxErr> {
self.is_dir(path).await.map_err(BoxErr::new)
}
async fn is_regular_file_b(&self, path: &WormPath) -> Result<bool, BoxErr> {
self.is_regular_file(path).await.map_err(BoxErr::new)
}
async fn list_b(&self, path: &WormPath) -> Result<Vec<WormPathBuf>, BoxErr> {
self.list(path).await.map_err(BoxErr::new)
}
async fn read_b(&self, path: &WormPath) -> Result<Pin<Box<dyn WormFileReader>>, BoxErr> {
self.read(path)
.await
.map_err(BoxErr::new)
.map(|wfr| Box::pin(wfr) as Pin<Box<dyn WormFileReader>>)
}
async fn write_b(&self) -> Result<Pin<Box<dyn WormFileWriter>>, BoxErr> {
self.write()
.await
.map_err(BoxErr::new)
.map(|wfw| Box::pin(wfw) as Pin<Box<dyn WormFileWriter>>)
}
}
#[derive(Debug)]
pub struct BoxedWormFileProvider {
inner: Box<dyn BoxableWormFileProvider>,
}
#[async_trait]
impl WormFileProvider for BoxedWormFileProvider {
type WormFileReader = Pin<Box<dyn WormFileReader>>;
type WormFileWriter = Pin<Box<dyn WormFileWriter>>;
type Error = BoxErr;
async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> Result<bool, Self::Error> {
let path = path.as_ref();
self.inner.is_dir_b(path).await
}
async fn is_regular_file(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<bool, Self::Error> {
let path = path.as_ref();
self.inner.is_regular_file_b(path).await
}
async fn list(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Vec<WormPathBuf>, Self::Error> {
let path = path.as_ref();
self.inner.list_b(path).await
}
async fn read(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Self::WormFileReader, Self::Error> {
let path = path.as_ref();
self.inner.read_b(path).await
}
async fn write(&self) -> Result<Self::WormFileWriter, Self::Error> {
self.inner.write_b().await
}
}
#[async_trait]
impl WormFileReader for Pin<Box<dyn WormFileReader>> {}
#[async_trait]
impl WormFileWriter for Pin<Box<dyn WormFileWriter>> {
async fn finalise(self, target_path: &WormPath, replace: bool) -> std::io::Result<()> {
WormFileWriter::finalise(self, target_path, replace).await
}
}

64
yama_wormfile/src/lib.rs Normal file
View File

@ -0,0 +1,64 @@
use crate::paths::{WormPath, WormPathBuf};
use async_trait::async_trait;
use std::error::Error;
use std::fmt::Debug;
use std::io;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
pub mod boxed;
pub mod paths;
#[async_trait]
pub trait WormFileProvider: Debug + Send + Sync {
type WormFileReader: WormFileReader;
type WormFileWriter: WormFileWriter;
type Error: Error + Send + Sync + 'static;
/// Tests whether the path is a directory.
/// Does not fail if the path does not exist, even if the parent path doesn't exist — returns
/// false in that case.
async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> Result<bool, Self::Error>;
/// Tests whether the path is a regular file.
/// Does not fail if the path does not exist, even if the parent path doesn't exist — returns
/// false in that case.
async fn is_regular_file(&self, path: impl AsRef<WormPath> + Send)
-> Result<bool, Self::Error>;
/// Lists all the files and directories in the specified path.
///
/// If the path does not exist, gives an error.
/// TODO a streaming version of this might be beneficial.
async fn list(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Vec<WormPathBuf>, Self::Error>;
/// Reads a file.
///
/// Fails if the file does not exist or is not a regular file.
async fn read(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Self::WormFileReader, Self::Error>;
/// Writes to a file.
///
/// No path is supplied here because this interface provides atomic writing by first writing
/// to a temporary file and moving it in place once ready.
///
/// When applicable, the file is first created in the `tmp` directory during writing and then
/// moved into place afterwards.
async fn write(&self) -> Result<Self::WormFileWriter, Self::Error>;
}
pub trait WormFileReader: AsyncRead + AsyncSeek + Debug + Send + Sync + 'static {}
#[async_trait]
pub trait WormFileWriter: AsyncWrite + Debug + Send + Sync + 'static {
/// Finish writing the file.
/// Moves the file atomically to `target_path`.
/// If `replace` is false, will not overwrite a file. (May be best-effort depending on backend;
/// intended as a sanity check rather than a flawless safeguard.)
async fn finalise(self, target_path: &WormPath, replace: bool) -> io::Result<()>;
}

View File

@ -0,0 +1,96 @@
use std::borrow::Borrow;
/// Simplified version of `Path` for use in WormFile situations.
/// The Path is guaranteed to remain within the root and does not contain any `.` or `..` elements.
#[repr(transparent)]
pub struct WormPath {
inner: str,
}
impl WormPath {
pub fn new(path_str: &str) -> Option<&WormPath> {
if path_str
.split('/')
.any(|component| component.is_empty() || component == "." || component == "..")
{
None
} else {
Some(unsafe { Self::new_unchecked(path_str) })
}
}
unsafe fn new_unchecked(path_str: &str) -> &WormPath {
&*(path_str as *const str as *const WormPath)
}
pub fn as_str(&self) -> &str {
&self.inner
}
pub fn join(&self, extension: impl AsRef<str>) -> Option<WormPathBuf> {
let extension = extension.as_ref();
if extension.starts_with('/')
|| extension
.split('/')
.any(|component| component.is_empty() || component == "." || component == "..")
{
return None;
}
let mut owned = self.inner.to_owned();
owned.reserve_exact(extension.len() + 1);
owned.push('/');
owned.push_str(extension);
// TODO maybe convert to new_unchecked in the future.
Some(WormPathBuf::new(owned).expect("already checked upfront"))
}
}
impl ToOwned for WormPath {
type Owned = WormPathBuf;
fn to_owned(&self) -> Self::Owned {
WormPathBuf {
inner: self.inner.to_owned(),
}
}
}
impl AsRef<WormPath> for WormPath {
fn as_ref(&self) -> &WormPath {
self
}
}
/// Simplified version of `PathBuf` for use in WormFile situations.
/// Owned form of `WormPath`.
#[repr(transparent)]
pub struct WormPathBuf {
inner: String,
}
impl WormPathBuf {
pub fn new(path_string: String) -> Option<WormPathBuf> {
if path_string
.split('/')
.any(|component| component.is_empty() || component == "." || component == "..")
{
None
} else {
Some(WormPathBuf { inner: path_string })
}
}
}
impl AsRef<WormPath> for WormPathBuf {
fn as_ref(&self) -> &WormPath {
unsafe { WormPath::new_unchecked(&self.inner) }
}
}
impl Borrow<WormPath> for WormPathBuf {
fn borrow(&self) -> &WormPath {
unsafe { WormPath::new_unchecked(&self.inner) }
}
}

View File

@ -0,0 +1,13 @@
[package]
name = "yama_wormfile_fs"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
yama_wormfile = { version = "0.1.0", path = "../yama_wormfile" }
async-trait = "0.1.68"
tokio = { version = "1.27.0", features = ["io-std", "fs"] }
rand = "0.8.5"

216
yama_wormfile_fs/src/lib.rs Normal file
View File

@ -0,0 +1,216 @@
use async_trait::async_trait;
use std::fmt::{Debug, Formatter};
use std::io;
use std::io::{ErrorKind, SeekFrom};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter};
/// WormFileProvider that uses the local filesystem, in a given root directory.
#[derive(Debug)]
pub struct LocalWormFilesystem {
/// The root directory.
root_dir: PathBuf,
}
impl LocalWormFilesystem {
pub fn new(root_dir: impl Into<PathBuf>) -> io::Result<LocalWormFilesystem> {
let root_dir = root_dir.into();
if !root_dir.is_dir() {
return Err(io::Error::new(
ErrorKind::Other,
"root for LocalWormFilesystem is not a directory!",
));
}
Ok(LocalWormFilesystem { root_dir })
}
pub fn resolve_real_path(&self, path: impl AsRef<WormPath>) -> PathBuf {
let relpath = path.as_ref();
self.root_dir.join(relpath.as_str())
}
}
#[async_trait]
impl WormFileProvider for LocalWormFilesystem {
type WormFileReader = FileWormReader;
type WormFileWriter = FileWormWriter;
type Error = io::Error;
async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> Result<bool, Self::Error> {
let path = self.resolve_real_path(path.as_ref());
Ok(tokio::fs::metadata(path).await?.is_dir())
}
async fn is_regular_file(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<bool, Self::Error> {
let path = self.resolve_real_path(path.as_ref());
Ok(tokio::fs::metadata(path).await?.is_file())
}
async fn list(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Vec<WormPathBuf>, Self::Error> {
let worm_path = path.as_ref();
let real_path = self.resolve_real_path(worm_path);
let mut dir_reader = tokio::fs::read_dir(real_path).await?;
let mut out = Vec::new();
while let Some(next_ent) = dir_reader.next_entry().await? {
if let Some(name_str) = next_ent.file_name().to_str() {
out.push(worm_path.join(name_str).unwrap());
}
}
Ok(out)
}
async fn read(
&self,
path: impl AsRef<WormPath> + Send,
) -> Result<Self::WormFileReader, Self::Error> {
let worm_path = path.as_ref();
let real_path = self.resolve_real_path(worm_path);
let file = OpenOptions::new().read(true).open(&real_path).await?;
Ok(FileWormReader {
path: real_path,
file,
})
}
async fn write(&self) -> Result<Self::WormFileWriter, Self::Error> {
let tmp_dir = self.root_dir.join("tmp");
if !tokio::fs::try_exists(&tmp_dir).await? {
tokio::fs::create_dir(&tmp_dir).await?;
}
let (tmp_path, file) = loop {
let rand_num: u32 = rand::random();
let pid = std::process::id();
let try_fn = format!("pid{pid}-{rand_num:08X}.writing");
let try_path = tmp_dir.join(try_fn);
match OpenOptions::new().create_new(true).open(&try_path).await {
Ok(file) => break (try_path, file),
Err(err) => {
if err.kind() == ErrorKind::AlreadyExists {
continue;
} else {
return Err(err);
}
}
}
};
Ok(FileWormWriter {
temp_path: tmp_path,
file,
root_dir: self.root_dir.clone(),
})
}
}
pub struct FileWormReader {
path: PathBuf,
file: File,
}
impl Debug for FileWormReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "FileWormReader({:?})", self.path)
}
}
impl AsyncRead for FileWormReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.file).poll_read(cx, buf)
}
}
impl AsyncSeek for FileWormReader {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Pin::new(&mut self.file).start_seek(position)
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Pin::new(&mut self.file).poll_complete(cx)
}
}
impl WormFileReader for FileWormReader {}
pub struct FileWormWriter {
temp_path: PathBuf,
file: File,
root_dir: PathBuf,
}
impl Debug for FileWormWriter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "FileWormWriter({:?})", self.temp_path)
}
}
impl AsyncWrite for FileWormWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.file).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.file).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.file).poll_shutdown(cx)
}
}
#[async_trait]
impl WormFileWriter for FileWormWriter {
async fn finalise(mut self, target_path: &WormPath, replace: bool) -> io::Result<()> {
self.flush().await?;
let FileWormWriter {
root_dir,
temp_path,
..
} = self;
let worm_path = target_path;
let real_path = root_dir.join(worm_path.as_str());
// Directories will be created as needed.
let parent = real_path.parent().unwrap();
tokio::fs::create_dir_all(parent).await?;
// Avoid allowing a replacement if not intended.
// But this is currently not atomic, so it's just a sanity check rather than a foolproof
// safeguard!
if !replace && tokio::fs::try_exists(&real_path).await? {
return Err(io::Error::new(
ErrorKind::AlreadyExists,
"finalise()ing a writer: dest already exists and replace = false",
));
}
// Perform the move, atomically.
tokio::fs::rename(&temp_path, &real_path).await?;
Ok(())
}
}

View File

@ -0,0 +1,8 @@
[package]
name = "yama_wormfile_s3"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

View File

View File

@ -0,0 +1,8 @@
[package]
name = "yama_wormfile_sftp"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

View File