Tweaks that tracked down SFTP infinite buffer problem
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/release Pipeline was successful Details

This commit is contained in:
Olivier 'reivilibre' 2023-05-22 20:44:24 +01:00
parent a47924dc80
commit 470420665f
6 changed files with 110 additions and 98 deletions

3
.gitignore vendored
View File

@ -22,4 +22,5 @@ __pycache__
yama7demo yama7demo
yamaSFTPdemo yamaSFTPdemo
yama_localcache/testdb.sqlite yama_localcache/testdb.sqlite
sftp7demo

24
Cargo.lock generated
View File

@ -1826,15 +1826,17 @@ dependencies = [
[[package]] [[package]]
name = "openssh-sftp-client" name = "openssh-sftp-client"
version = "0.12.2" version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620" checksum = "866d0eab409a2fcb6b8c3838fdbf10d7399d486548c19179a80f1c1142e93348"
dependencies = [ dependencies = [
"bytes", "bytes",
"derive_destructure2", "derive_destructure2",
"futures-core",
"once_cell", "once_cell",
"openssh-sftp-client-lowlevel", "openssh-sftp-client-lowlevel",
"openssh-sftp-error", "openssh-sftp-error",
"pin-project",
"scopeguard", "scopeguard",
"tokio", "tokio",
"tokio-io-utility", "tokio-io-utility",
@ -1843,9 +1845,9 @@ dependencies = [
[[package]] [[package]]
name = "openssh-sftp-client-lowlevel" name = "openssh-sftp-client-lowlevel"
version = "0.4.1" version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa" checksum = "f4975d0a824e82d4f61e3edf870254ce97bd7f8154751d2afdd97c7f43e57dff"
dependencies = [ dependencies = [
"awaitable", "awaitable",
"bytes", "bytes",
@ -1860,9 +1862,9 @@ dependencies = [
[[package]] [[package]]
name = "openssh-sftp-error" name = "openssh-sftp-error"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486" checksum = "f4c3356e914b8006417188efd534105d5bcb230b4a9fd67782a6b4a4e15fa006"
dependencies = [ dependencies = [
"awaitable-error", "awaitable-error",
"openssh-sftp-protocol-error", "openssh-sftp-protocol-error",
@ -3124,9 +3126,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.12" version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
@ -3135,9 +3137,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.7" version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@ -4020,6 +4022,8 @@ dependencies = [
"rand", "rand",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"tracing",
"yama_wormfile", "yama_wormfile",
] ]

View File

@ -159,8 +159,9 @@ pub async fn main() -> eyre::Result<()> {
tracing_subscriber::registry() tracing_subscriber::registry()
.with( .with(
tracing_subscriber::EnvFilter::try_from_default_env() tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
.unwrap_or_else(|_| "sqlx=warn,yama=debug,datman=debug,info".into()), "sqlx=warn,yama=debug,datman=debug,yama_wormfile_sftp=debug,info".into()
}),
) )
.with(tracing_subscriber::fmt::layer().with_writer(stderr_writer)) .with(tracing_subscriber::fmt::layer().with_writer(stderr_writer))
.with(indicatif_layer) .with(indicatif_layer)

View File

@ -323,9 +323,9 @@ async fn storage_pipeline_worker<JobName: Debug>(
.await? .await?
} }
debug!("SPW shutdown"); debug!("finishing bloblogs");
bloblog_writers.finish_bloblogs(&mut storing_state).await?; bloblog_writers.finish_bloblogs(&mut storing_state).await?;
debug!("finished bloblogs!");
Ok(StoringIntermediate::from(storing_state)) Ok(StoringIntermediate::from(storing_state))
} }

View File

@ -10,9 +10,11 @@ yama_wormfile = { version = "0.1.0", path = "../yama_wormfile" }
ouroboros = "0.15.6" ouroboros = "0.15.6"
openssh = "0.9.9" openssh = "0.9.9"
openssh-sftp-client = "0.12.2" openssh-sftp-client = "0.13.5"
async-trait = "0.1.68" async-trait = "0.1.68"
tokio = { version = "1.27.0", features = ["io-std"] } tokio = { version = "1.27.0", features = ["io-std"] }
tokio-stream = "0.1.14"
rand = "0.8.5" rand = "0.8.5"
thiserror = "1.0.40" thiserror = "1.0.40"
eyre = "0.6.8" eyre = "0.6.8"
tracing = "0.1.37"

View File

@ -4,8 +4,8 @@ use async_trait::async_trait;
use eyre::{bail, Context as EyreContext, ContextCompat}; use eyre::{bail, Context as EyreContext, ContextCompat};
use openssh::{KnownHosts, RemoteChild, Session, Stdio}; use openssh::{KnownHosts, RemoteChild, Session, Stdio};
use openssh_sftp_client::error::SftpErrorKind; use openssh_sftp_client::error::SftpErrorKind;
use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::file::{File, TokioCompatFile};
use openssh_sftp_client::fs::Fs; use openssh_sftp_client::fs::{DirEntry, Fs};
use openssh_sftp_client::Error::SftpError; use openssh_sftp_client::Error::SftpError;
use openssh_sftp_client::Sftp; use openssh_sftp_client::Sftp;
use ouroboros::self_referencing; use ouroboros::self_referencing;
@ -15,9 +15,11 @@ use std::io::{ErrorKind, SeekFrom};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{ready, Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::runtime::Handle; use tokio_stream::StreamExt;
use tracing::debug;
use yama_wormfile::paths::{WormPath, WormPathBuf}; use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter};
@ -50,14 +52,6 @@ struct SftpConn {
// fs: Fs<'this>, // fs: Fs<'this>,
} }
#[self_referencing]
struct FileWithSftpConn {
conn: Arc<SftpConn>,
#[borrows(conn)]
#[covariant]
file: Option<TokioCompatFile<'this>>,
}
impl SftpConn { impl SftpConn {
pub async fn create(ssh_connect: &str, root_dir: impl Into<PathBuf>) -> eyre::Result<Self> { pub async fn create(ssh_connect: &str, root_dir: impl Into<PathBuf>) -> eyre::Result<Self> {
let root_dir = root_dir.into(); let root_dir = root_dir.into();
@ -98,13 +92,27 @@ impl SftpConn {
Ok(res) Ok(res)
} }
pub fn get_fs(&self) -> Fs<'_> { pub fn get_fs(&self) -> Fs {
let mut fs = self.borrow_sftp().fs(); let mut fs = self.borrow_sftp().fs();
fs.set_cwd(&self.borrow_root_dir()); fs.set_cwd(&self.borrow_root_dir());
fs fs
} }
async fn create_dir_all(&self, worm_path_as_pathbuf: PathBuf) -> eyre::Result<()> { async fn create_dir_all(&self, worm_path_as_pathbuf: PathBuf) -> eyre::Result<()> {
// Try twice to try and shake out race conditions if another worker is doing this exactly
// now...
if self
.create_dir_all_impl_once(worm_path_as_pathbuf.clone())
.await
.is_ok()
{
return Ok(());
}
tokio::time::sleep(Duration::from_secs(3)).await;
self.create_dir_all_impl_once(worm_path_as_pathbuf).await
}
async fn create_dir_all_impl_once(&self, worm_path_as_pathbuf: PathBuf) -> eyre::Result<()> {
let mut fs = self.get_fs(); let mut fs = self.get_fs();
let mut stack = vec![]; let mut stack = vec![];
@ -163,7 +171,7 @@ impl SftpWormFilesystem {
Ok(SftpWormFilesystem { conn, root_dir }) Ok(SftpWormFilesystem { conn, root_dir })
} }
fn get_fs(&self) -> Fs<'_> { fn get_fs(&self) -> Fs {
let mut fs = self.conn.borrow_sftp().fs(); let mut fs = self.conn.borrow_sftp().fs();
fs.set_cwd(&self.root_dir); fs.set_cwd(&self.root_dir);
fs fs
@ -200,7 +208,7 @@ impl WormFileProvider for SftpWormFilesystem {
let path = worm_path.as_str(); let path = worm_path.as_str();
let mut fs = self.get_fs(); let mut fs = self.get_fs();
let mut remote_dir = match fs.open_dir(path).await { let remote_dir = match fs.open_dir(path).await {
Ok(ok) => ok, Ok(ok) => ok,
Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => { Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => {
return Ok(Vec::new()); return Ok(Vec::new());
@ -209,7 +217,10 @@ impl WormFileProvider for SftpWormFilesystem {
return Err(other.into()); return Err(other.into());
} }
}; };
let dir_reader = remote_dir.read_dir().await?; let dir_reader: Vec<DirEntry> = remote_dir
.read_dir()
.collect::<Result<Vec<DirEntry>, _>>()
.await?;
Ok(dir_reader Ok(dir_reader
.iter() .iter()
@ -235,7 +246,7 @@ impl WormFileProvider for SftpWormFilesystem {
let path = worm_path.as_str(); let path = worm_path.as_str();
let mut fs = self.get_fs(); let mut fs = self.get_fs();
let mut remote_dir = match fs.open_dir(path).await { let remote_dir = match fs.open_dir(path).await {
Ok(ok) => ok, Ok(ok) => ok,
Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => { Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => {
return Ok(Vec::new()); return Ok(Vec::new());
@ -244,7 +255,10 @@ impl WormFileProvider for SftpWormFilesystem {
return Err(other.into()); return Err(other.into());
} }
}; };
let dir_reader = remote_dir.read_dir().await?; let dir_reader: Vec<DirEntry> = remote_dir
.read_dir()
.collect::<Result<Vec<DirEntry>, _>>()
.await?;
Ok(dir_reader Ok(dir_reader
.iter() .iter()
@ -271,36 +285,18 @@ impl WormFileProvider for SftpWormFilesystem {
let real_path = self.root_dir.join(path.as_ref().as_str()); let real_path = self.root_dir.join(path.as_ref().as_str());
let real_path2 = real_path.clone(); let real_path2 = real_path.clone();
// the `Send` in the below line is very important...
let mut file_with_conn: FileWithSftpConn = FileWithSftpConnAsyncSendTryBuilder {
conn: self.conn.clone(),
file_builder: |conn| {
Box::pin(async move {
let file = conn.borrow_sftp().open(real_path).await?;
Ok::<_, eyre::Report>(Some(TokioCompatFile::new(file)))
})
},
}
.try_build()
.await?;
// yucky hacks... but we need to get to the file to get the length out, so we can seek from the end... let mut file: File = self.conn.borrow_sftp().open(real_path).await?;
let file_length = tokio::task::block_in_place(|| { let file_length = file
file_with_conn.with_file_mut(|file| { .metadata()
Handle::current().block_on(async move { .await?
file.as_mut() .len()
.unwrap() .context("no len in SFTP file metadata!")?;
.metadata() let file = Some(Box::pin(TokioCompatFile::new(file)));
.await?
.len()
.context("no len in SFTP file metadata!")
})
})
})?;
Ok(SftpWormReader { Ok(SftpWormReader {
path: real_path2, path: real_path2,
file_with_conn, file,
length: file_length, length: file_length,
}) })
} }
@ -333,21 +329,15 @@ impl WormFileProvider for SftpWormFilesystem {
continue; continue;
} }
let file_with_conn: FileWithSftpConn = FileWithSftpConnAsyncSendTryBuilder { let file = Some(Box::pin(TokioCompatFile::new(
conn: self.conn.clone(), self.conn.borrow_sftp().create(real_path).await?,
file_builder: |conn| { )));
Box::pin(async move {
let file = conn.borrow_sftp().create(real_path).await?;
Ok::<_, eyre::Report>(Some(TokioCompatFile::new(file)))
})
},
}
.try_build()
.await?;
break Ok(SftpWormWriter { break Ok(SftpWormWriter {
temp_path: WormPathBuf::new(try_path).unwrap(), temp_path: WormPathBuf::new(try_path).unwrap(),
file_with_conn: Some(file_with_conn), file,
conn: self.conn.clone(),
unflushed_written_bytes: 0,
}); });
} }
} }
@ -363,7 +353,7 @@ impl WormFileProvider for SftpWormFilesystem {
pub struct SftpWormReader { pub struct SftpWormReader {
path: PathBuf, path: PathBuf,
file_with_conn: FileWithSftpConn, file: Option<Pin<Box<TokioCompatFile>>>,
length: u64, length: u64,
} }
@ -379,8 +369,7 @@ impl AsyncRead for SftpWormReader {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<()>> {
self.file_with_conn Pin::new(self.file.as_mut().unwrap()).poll_read(cx, buf)
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_read(cx, buf))
} }
} }
@ -392,13 +381,11 @@ impl AsyncSeek for SftpWormReader {
io::Error::new(ErrorKind::InvalidInput, "SeekFrom::End out of bounds") io::Error::new(ErrorKind::InvalidInput, "SeekFrom::End out of bounds")
})?); })?);
} }
self.file_with_conn Pin::new(self.file.as_mut().unwrap()).start_seek(position)
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).start_seek(position))
} }
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
self.file_with_conn Pin::new(self.file.as_mut().unwrap()).poll_complete(cx)
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_complete(cx))
} }
} }
@ -406,7 +393,9 @@ impl WormFileReader for SftpWormReader {}
pub struct SftpWormWriter { pub struct SftpWormWriter {
temp_path: WormPathBuf, temp_path: WormPathBuf,
file_with_conn: Option<FileWithSftpConn>, file: Option<Pin<Box<TokioCompatFile>>>,
conn: Arc<SftpConn>,
unflushed_written_bytes: u64,
} }
impl Debug for SftpWormWriter { impl Debug for SftpWormWriter {
@ -415,54 +404,66 @@ impl Debug for SftpWormWriter {
} }
} }
/// Maximum number of unflushed bytes to have written, before we force-schedule a flush.
const MAX_UNFLUSHED_WRITTEN_BYTES: u64 = 32 * 1024 * 1024;
impl AsyncWrite for SftpWormWriter { impl AsyncWrite for SftpWormWriter {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, io::Error>> { ) -> Poll<Result<usize, io::Error>> {
self.file_with_conn // Workaround for openssh-sftp-client bug that means that writes are infinitely buffered:
.as_mut() // only allow a certain number of bytes to be written before flushing.
.unwrap() if self.unflushed_written_bytes > MAX_UNFLUSHED_WRITTEN_BYTES {
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_write(cx, buf)) // Flush and only continue if a flush was a no-op/immediately performed.
//ready!(self.as_mut().poll_flush(cx))?;
}
let r = Pin::new(self.file.as_mut().unwrap()).poll_write(cx, buf);
if r.is_ready() {
self.unflushed_written_bytes += buf.len() as u64;
}
r
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.file_with_conn let r = Pin::new(self.file.as_mut().unwrap()).poll_flush(cx);
.as_mut() if r.is_ready() {
.unwrap() self.unflushed_written_bytes = 0;
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_flush(cx)) }
r
} }
fn poll_shutdown( fn poll_shutdown(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> { ) -> Poll<Result<(), io::Error>> {
self.file_with_conn Pin::new(self.file.as_mut().unwrap()).poll_shutdown(cx)
.as_mut()
.unwrap()
.with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_shutdown(cx))
} }
} }
#[async_trait] #[async_trait]
impl WormFileWriter for SftpWormWriter { impl WormFileWriter for SftpWormWriter {
async fn finalise(&mut self, target_path: &WormPath, replace: bool) -> io::Result<()> { async fn finalise(&mut self, target_path: &WormPath, replace: bool) -> io::Result<()> {
debug!("finalising SFTP file to {target_path:?}");
self.flush().await?; self.flush().await?;
debug!("flushed SFTP file to {target_path:?}");
let SftpWormWriter { let SftpWormWriter {
temp_path, temp_path,
file_with_conn, file,
conn,
.. ..
} = self; } = self;
let mut file_with_conn = file_with_conn.take().unwrap();
let file = file_with_conn.with_file_mut(|file| file.take().unwrap()); let file = file.take().unwrap();
file.close() let file: TokioCompatFile = TokioCompatFile::clone(&file);
// This looks wrong but might be OK. Hack mostly needed due to the very awkward Pinning.
file.into_inner()
.close()
.await .await
.map_err(|e| io::Error::new(ErrorKind::Other, e))?; .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
let conn: Arc<SftpConn> = file_with_conn.into_heads().conn;
let mut fs = conn.get_fs(); let mut fs = conn.get_fs();
// Directories will be created as needed. // Directories will be created as needed.
@ -471,6 +472,7 @@ impl WormFileWriter for SftpWormWriter {
.await .await
.map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
} }
debug!("created dirs SFTP file to {target_path:?}");
// Avoid allowing a replacement if not intended. // Avoid allowing a replacement if not intended.
// But this is currently not atomic, so it's just a sanity check rather than a foolproof // But this is currently not atomic, so it's just a sanity check rather than a foolproof
// safeguard! // safeguard!
@ -491,10 +493,12 @@ impl WormFileWriter for SftpWormWriter {
} }
} }
debug!("moving SFTP file to {target_path:?}");
// Perform the move. // Perform the move.
fs.rename(&temp_path.as_ref().as_str(), target_path.as_str()) fs.rename(&temp_path.as_ref().as_str(), target_path.as_str())
.await .await
.map_err(|e| io::Error::new(ErrorKind::Other, e))?; .map_err(|e| io::Error::new(ErrorKind::Other, e))?;
debug!("moved SFTP file to {target_path:?}");
Ok(()) Ok(())
} }
} }