Add a rather dodgy untested S3 implementation
This commit is contained in:
parent
a5d6bf3085
commit
8d5c373abc
975
Cargo.lock (generated)
File diff suppressed because it is too large
@@ -17,6 +17,8 @@ pub trait WormFileProvider: Debug + Send + Sync {
    /// Tests whether the path is a directory.
    ///
    /// Does not fail if the path does not exist, even if the parent path doesn't exist; returns
    /// `false` in that case.
    ///
    /// Beware! Some backends do not support the concept of a directory and will happily report
    /// `true` for any path.
    async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> Result<bool, Self::Error>;

    /// Tests whether the path is a regular file.
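Worth noting for downstream users: given the caveat above, a portable existence check has to go through is_regular_file, never is_dir. A minimal sketch against the trait (the exists helper is hypothetical, added here for illustration only):

// `is_dir` can't double as an existence check: the S3 backend below answers
// Ok(true) for *any* path, so only `is_regular_file` (which performs a real
// HEAD request) is meaningful for existence.
async fn exists<P: WormFileProvider>(provider: &P, path: &WormPath) -> Result<bool, P::Error> {
    let _ = provider.is_dir(path).await?; // always true on S3, regardless of path
    provider.is_regular_file(path).await  // HEAD-backed, trustworthy
}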
@@ -6,3 +6,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
yama_wormfile = { version = "0.1.0", path = "../yama_wormfile" }

#ouroboros = "0.15.6"
rust-s3 = { version = "0.33.0-beta5", features = ["with-tokio"] }
async-trait = "0.1.68"
tokio = { version = "1.27.0", features = ["io-std"] }
rand = "0.8.5"
thiserror = "1.0.40"

tokio-stream = "0.1.12"
tokio-util = "0.7.7"
bytes = "1.4.0"
uuid = { version = "1.3.0", features = ["fast-rng", "v4"] }
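For context, rust-s3's Bucket carries the endpoint, region and credentials; the provider below only adds a key prefix on top. A minimal construction sketch, assuming credentials come from the environment; the bucket name and endpoint are hypothetical, and exact constructor shapes shift a little between rust-s3 releases:

use s3::{creds::Credentials, Bucket, Region};

fn open_bucket() -> Result<Bucket, Box<dyn std::error::Error>> {
    // Hypothetical endpoint and bucket name; any S3-compatible store should do.
    let region = Region::Custom {
        region: "us-east-1".to_owned(),
        endpoint: "https://s3.example.com".to_owned(),
    };
    // Reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
    let creds = Credentials::from_env()?;
    // Non-AWS endpoints may additionally need `.with_path_style()`.
    Ok(Bucket::new("yama-backups", region, creds)?)
}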
@@ -0,0 +1,393 @@
use async_trait::async_trait;
use s3::error::S3Error;
use s3::serde_types::HeadObjectResult;
use s3::Bucket;
use std::fmt::{Debug, Formatter};
use std::io;
use std::io::{ErrorKind, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{duplex, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf};
use tokio::task::JoinHandle;
use uuid::Uuid;
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter};

/// WormFileProvider that uses an S3 bucket, with a given path prefix.
#[derive(Debug)]
pub struct S3WormFilesystem {
    /// The path prefix for all S3 objects.
    path_prefix: String,

    bucket: Bucket,
}

impl S3WormFilesystem {
    pub fn new(bucket: Bucket, path_prefix: String) -> io::Result<S3WormFilesystem> {
        Ok(S3WormFilesystem {
            path_prefix,
            bucket,
        })
    }

    pub fn resolve_real_path(&self, path: impl AsRef<WormPath>) -> String {
        format!("{}{}", self.path_prefix, path.as_ref().as_str())
    }

    async fn head_object(&self, full_path: &str) -> Result<Option<HeadObjectResult>, S3Error> {
        let (head, status) = self.bucket.head_object(full_path).await?;
        if status == 404 {
            return Ok(None);
        }
        if status == 200 {
            return Ok(Some(head));
        }
        Err(S3Error::Http(
            status,
            format!("bad status for HEAD {full_path}"),
        ))
    }
}
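Note that resolve_real_path is plain string concatenation, so the prefix is expected to carry its own trailing separator. A quick illustration (the demo_key helper and the "backups/" prefix are hypothetical):

// Illustration only: prefix "backups" without a slash would silently yield
// the key "backupschunks/00.dat".
fn demo_key(bucket: Bucket) -> io::Result<String> {
    let fs = S3WormFilesystem::new(bucket, "backups/".to_owned())?;
    Ok(fs.resolve_real_path(WormPath::new("chunks/00.dat").unwrap()))
    // -> "backups/chunks/00.dat"
}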

#[async_trait]
impl WormFileProvider for S3WormFilesystem {
    type WormFileReader = S3WormReader;
    type WormFileWriter = S3WormWriter;
    type Error = S3Error;

    async fn is_dir(&self, _path: impl AsRef<WormPath> + Send) -> Result<bool, Self::Error> {
        Ok(true)
    }

    async fn is_regular_file(
        &self,
        path: impl AsRef<WormPath> + Send,
    ) -> Result<bool, Self::Error> {
        let full_path = self.resolve_real_path(path.as_ref());
        Ok(self.head_object(&full_path).await?.is_some())
    }

    async fn list(
        &self,
        path: impl AsRef<WormPath> + Send,
    ) -> Result<Vec<WormPathBuf>, Self::Error> {
        let path = path.as_ref();
        let full_path = self.resolve_real_path(path);
        let list = self
            .bucket
            .list(format!("{full_path}/"), Some("/".to_owned()))
            .await?;
        Ok(list
            .into_iter()
            .flat_map(|lbr| lbr.contents)
            .filter_map(|obj| {
                // Strip the path prefix from items, plus convert to `WormPathBuf`s
                obj.key
                    .strip_prefix(&self.path_prefix)
                    .and_then(|s| WormPathBuf::new(s.to_owned()))
            })
            .collect())
    }

    async fn read(
        &self,
        path: impl AsRef<WormPath> + Send,
    ) -> Result<Self::WormFileReader, Self::Error> {
        let path = path.as_ref();
        let full_path = self.resolve_real_path(path);
        let head = self.head_object(&full_path).await?.ok_or_else(|| {
            S3Error::Http(404, format!("can't read {path:?} as it does not exist."))
        })?;

        let length = head
            .content_length
            .ok_or_else(|| S3Error::Http(999, format!("No content-length from HEAD {path:?}")))?;

        Ok(S3WormReader {
            bucket: self.bucket.clone(),
            path: full_path,
            reader: None,
            offset: 0,
            length: length as u64,
            next_read_size_hint: None,
        })
    }

    async fn write(&self) -> Result<Self::WormFileWriter, Self::Error> {
        let (tx, mut rx) = duplex(8192);
        // The rx half won't be doing any writing.
        rx.shutdown().await?;

        // let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, io::Error>>(4);
        // let mut reader = StreamReader::new(ReceiverStream::new(rx));

        let uuid = Uuid::new_v4();
        let temp_path = format!("tmp/{}.writing", uuid);
        let upload_to = self.resolve_real_path(WormPath::new(&temp_path).unwrap());

        let bucket = self.bucket.clone();
        let join_handle: JoinHandle<Result<u16, S3Error>> = tokio::spawn(async move {
            let resp_code = bucket.put_object_stream(&mut rx, upload_to).await?;
            if resp_code != 200 {
                eprintln!("non-200 resp code for put!")
            }
            Ok(resp_code)
        });

        Ok(S3WormWriter {
            tx,
            join_handle,
            temp_path,
            bucket: self.bucket.clone(),
            path_prefix: self.path_prefix.clone(),
        })
    }
}
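Taken together, the intended write flow is: obtain a writer (which immediately starts a background upload into tmp/), stream bytes into it through the duplex, then finalise to move the temporary object into place. A hedged usage sketch; the store helper and "chunks/00.dat" target are hypothetical:

use std::io::{self, ErrorKind};
use tokio::io::AsyncWriteExt;

async fn store(fs: &S3WormFilesystem, data: &[u8]) -> io::Result<()> {
    let mut writer = fs
        .write()
        .await
        .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
    writer.write_all(data).await?; // streamed through the duplex to the upload task
    let target = WormPath::new("chunks/00.dat").unwrap();
    writer.finalise(target, false).await // copy out of tmp/, then delete the tmp object
}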

pub struct S3WormReader {
    /// S3 bucket
    bucket: Bucket,
    /// S3-side path (absolute)
    path: String,

    /// A stream for reading bytes from S3, if open.
    reader: Option<(DuplexStream, Range<u64>)>,
    /// Our current stream offset
    offset: u64,
    /// The length of the file
    length: u64,
    /// Optionally, a hint for the expected size of the next read operation.
    /// Can be used to request the correct range from S3.
    next_read_size_hint: Option<u64>,
}

impl S3WormReader {
    fn spawn_reader_for_range(&self, range: Range<u64>) -> DuplexStream {
        let (mut tx, rx) = duplex(65536);

        let bucket = self.bucket.clone();
        let path = self.path.clone();

        tokio::spawn(async move {
            // TODO it says this is 'inclusive', but asserts start < end...
            // TODO it'd be nice to have a `get_object_range_stream`: PR one to rust-s3?
            // alternatively rebuild on top of reqwest and serde_xml_rs, since rust-s3 is
            // a little bit dodgy around the corners...
            if let Err(err) = bucket
                .get_object_range_to_writer(path, range.start, Some(range.end), &mut tx)
                .await
            {
                eprintln!("TODO error reading from S3 {err:?}");
            }
        });

        rx
    }
}

impl Debug for S3WormReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "S3WormReader({:?})", self.path)
    }
}

/// Default size of read request from S3.
const DEFAULT_READ_WINDOW_SIZE: u64 = 64 * 1024 * 1024;

impl AsyncRead for S3WormReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        if self.reader.is_none() {
            // Need to spawn a reader
            if self.offset > self.length {
                panic!("offset {} > length {}", self.offset, self.length);
            }

            if self.offset == self.length {
                // EOF
                return Poll::Ready(Ok(()));
            }

            let read_window_size = self
                .next_read_size_hint
                .take()
                .unwrap_or(DEFAULT_READ_WINDOW_SIZE)
                .min(self.length - self.offset);

            let read_range = self.offset..(self.offset + read_window_size);

            let rx = self.spawn_reader_for_range(read_range.clone());
            self.reader = Some((rx, read_range));
        }

        let (reader, read_range) = self.reader.as_mut().unwrap();
        let orig_remaining = buf.remaining();
        let read = ready!(Pin::new(reader).poll_read(cx, buf));
        // `remaining()` shrinks as the buffer fills, so the bytes just read are
        // the old remaining minus the new (the other way around would underflow).
        let bytes_read = (orig_remaining - buf.remaining()) as u64;

        if bytes_read == 0 && read_range.start != read_range.end {
            // Unexpected EOF
            return Poll::Ready(Err(io::Error::new(
                ErrorKind::UnexpectedEof,
                "early EOF reading from S3",
            )));
        }

        read_range.start += bytes_read;
        if read_range.start >= read_range.end {
            self.reader = None;
        }
        self.offset += bytes_read;

        Poll::Ready(read)
    }
}
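On the read side, data is fetched in windowed ranged GETs (64 MiB by default, capped at EOF), and a fresh range request is spawned whenever the previous window is exhausted. A minimal consumption sketch; the fetch helper and path are hypothetical:

use std::io::{self, ErrorKind};
use tokio::io::AsyncReadExt;

async fn fetch(fs: &S3WormFilesystem) -> io::Result<Vec<u8>> {
    let path = WormPath::new("chunks/00.dat").unwrap(); // hypothetical path
    let mut reader = fs
        .read(path)
        .await
        .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?; // one ranged GET per 64 MiB window
    Ok(buf)
}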

impl AsyncSeek for S3WormReader {
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
        let desired_offset = match position {
            SeekFrom::Start(offset) => offset,
            SeekFrom::End(end_offset) => {
                let wanted = self.length as i64 + end_offset;
                if wanted < 0 {
                    return Err(io::Error::new(
                        ErrorKind::InvalidInput,
                        "can't seek below zero (using relative end seek)",
                    ));
                }
                wanted as u64
            }
            SeekFrom::Current(delta) => {
                let wanted = self.offset as i64 + delta;
                if wanted < 0 {
                    return Err(io::Error::new(
                        ErrorKind::InvalidInput,
                        "can't seek below zero (using relative current seek)",
                    ));
                }
                wanted as u64
            }
        };

        // Don't seek beyond the end. (That makes no sense for reading, but is technically allowed.)
        let desired_offset = desired_offset.min(self.length);
        self.offset = desired_offset;

        // Discard any readers
        self.reader = None;
        self.next_read_size_hint = None;

        // Return OK right away, the actual work will get done when we read.
        Ok(())
    }

    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        // Return OK right away, the actual work gets done when we read.
        Poll::Ready(Ok(self.offset))
    }
}
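Because start_seek only records the new offset and drops any open range reader, seeks are free; the cost is paid by the ranged GET on the next read. A sketch of a random-access read built on that (the read_at helper is hypothetical):

use std::io::{self, SeekFrom};
use tokio::io::{AsyncReadExt, AsyncSeekExt};

async fn read_at(reader: &mut S3WormReader, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    reader.seek(SeekFrom::Start(offset)).await?; // pure bookkeeping, no I/O yet
    let mut buf = vec![0u8; len];
    reader.read_exact(&mut buf).await?;          // triggers a ranged GET from `offset`
    Ok(buf)
}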

impl WormFileReader for S3WormReader {}

pub struct S3WormWriter {
    tx: DuplexStream,
    temp_path: String,
    join_handle: JoinHandle<Result<u16, S3Error>>,
    bucket: Bucket,
    path_prefix: String,
}

impl Debug for S3WormWriter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "S3WormWriter({:?})", self.temp_path)
    }
}

impl AsyncWrite for S3WormWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        Pin::new(&mut self.tx).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.tx).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.tx).poll_shutdown(cx)
    }
}

#[async_trait]
impl WormFileWriter for S3WormWriter {
    async fn finalise(mut self, target_path: &WormPath, replace: bool) -> io::Result<()> {
        self.tx.shutdown().await?;

        let resp_code = self
            .join_handle
            .await?
            .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
        if resp_code != 200 {
            return Err(io::Error::new(
                ErrorKind::Other,
                "non-200 resp code for PUT",
            ));
        }

        // The temporary object was uploaded under the path prefix, so the
        // prefix must be applied here too when copying and deleting it.
        let full_temp_path = format!("{}{}", self.path_prefix, self.temp_path);
        let full_target_path = format!("{}{}", self.path_prefix, target_path.as_str());

        if !replace {
            let (_head, head_code) = self
                .bucket
                .head_object(&full_target_path)
                .await
                .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
            if head_code != 404 {
                return Err(io::Error::new(
                    ErrorKind::Other,
                    "won't replace file; HEAD of target path not 404",
                ));
            }
        }

        // S3 moves are done as a copy + delete

        let response_code = self
            .bucket
            .copy_object_internal(&full_temp_path, &full_target_path)
            .await
            .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
        if response_code != 200 {
            return Err(io::Error::new(
                ErrorKind::Other,
                "non-200 response for copy object",
            ));
        }

        let response = self
            .bucket
            .delete_object(&full_temp_path)
            .await
            .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
        // S3 normally answers 204 No Content for a successful delete.
        if !(200..300).contains(&response.status_code()) {
            return Err(io::Error::new(
                ErrorKind::Other,
                "non-2xx response for delete object",
            ));
        }

        Ok(())
    }
}
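Since S3 has no rename, finalise emulates a move with copy_object_internal followed by delete_object. The two steps are not atomic: the object briefly exists at both keys, and a crash in between strands the temporary object (hence the tmp/*.writing naming, which makes leftovers easy to sweep). The pattern in isolation, as a sketch over the same rust-s3 calls used above (the s3_move helper is hypothetical):

// Non-atomic "move": a failure after the copy leaves the source behind.
async fn s3_move(bucket: &Bucket, from: &str, to: &str) -> Result<(), S3Error> {
    let code = bucket.copy_object_internal(from, to).await?;
    if code != 200 {
        return Err(S3Error::Http(code, format!("copy {from} -> {to} failed")));
    }
    bucket.delete_object(from).await?; // S3 answers 204 No Content on success
    Ok(())
}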