From 470420665f599b831909a67b8b2adbb561087a2c Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 May 2023 20:44:24 +0100 Subject: [PATCH] Tweaks that tracked down SFTP infinite buffer problem --- .gitignore | 3 +- Cargo.lock | 24 +++-- datman/src/bin/datman.rs | 5 +- yama/src/storing.rs | 4 +- yama_wormfile_sftp/Cargo.toml | 6 +- yama_wormfile_sftp/src/lib.rs | 166 +++++++++++++++++----------------- 6 files changed, 110 insertions(+), 98 deletions(-) diff --git a/.gitignore b/.gitignore index 0761fab..328550d 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ __pycache__ yama7demo yamaSFTPdemo -yama_localcache/testdb.sqlite \ No newline at end of file +yama_localcache/testdb.sqlite +sftp7demo \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 8ab3762..fa720c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1826,15 +1826,17 @@ dependencies = [ [[package]] name = "openssh-sftp-client" -version = "0.12.2" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620" +checksum = "866d0eab409a2fcb6b8c3838fdbf10d7399d486548c19179a80f1c1142e93348" dependencies = [ "bytes", "derive_destructure2", + "futures-core", "once_cell", "openssh-sftp-client-lowlevel", "openssh-sftp-error", + "pin-project", "scopeguard", "tokio", "tokio-io-utility", @@ -1843,9 +1845,9 @@ dependencies = [ [[package]] name = "openssh-sftp-client-lowlevel" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa" +checksum = "f4975d0a824e82d4f61e3edf870254ce97bd7f8154751d2afdd97c7f43e57dff" dependencies = [ "awaitable", "bytes", @@ -1860,9 +1862,9 @@ dependencies = [ [[package]] name = "openssh-sftp-error" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486" +checksum = "f4c3356e914b8006417188efd534105d5bcb230b4a9fd67782a6b4a4e15fa006" dependencies = [ "awaitable-error", "openssh-sftp-protocol-error", @@ -3124,9 +3126,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", @@ -3135,9 +3137,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -4020,6 +4022,8 @@ dependencies = [ "rand", "thiserror", "tokio", + "tokio-stream", + "tracing", "yama_wormfile", ] diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index 2d7ecf0..70e59c6 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -159,8 +159,9 @@ pub async fn main() -> eyre::Result<()> { tracing_subscriber::registry() .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "sqlx=warn,yama=debug,datman=debug,info".into()), + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + "sqlx=warn,yama=debug,datman=debug,yama_wormfile_sftp=debug,info".into() + }), ) .with(tracing_subscriber::fmt::layer().with_writer(stderr_writer)) .with(indicatif_layer) diff --git a/yama/src/storing.rs b/yama/src/storing.rs index cd8c65c..4d2a8b5 100644 --- a/yama/src/storing.rs +++ b/yama/src/storing.rs @@ -323,9 +323,9 @@ async fn storage_pipeline_worker( .await? } - debug!("SPW shutdown"); - + debug!("finishing bloblogs"); bloblog_writers.finish_bloblogs(&mut storing_state).await?; + debug!("finished bloblogs!"); Ok(StoringIntermediate::from(storing_state)) } diff --git a/yama_wormfile_sftp/Cargo.toml b/yama_wormfile_sftp/Cargo.toml index 6348a0c..70e4812 100644 --- a/yama_wormfile_sftp/Cargo.toml +++ b/yama_wormfile_sftp/Cargo.toml @@ -10,9 +10,11 @@ yama_wormfile = { version = "0.1.0", path = "../yama_wormfile" } ouroboros = "0.15.6" openssh = "0.9.9" -openssh-sftp-client = "0.12.2" +openssh-sftp-client = "0.13.5" async-trait = "0.1.68" tokio = { version = "1.27.0", features = ["io-std"] } +tokio-stream = "0.1.14" rand = "0.8.5" thiserror = "1.0.40" -eyre = "0.6.8" \ No newline at end of file +eyre = "0.6.8" +tracing = "0.1.37" diff --git a/yama_wormfile_sftp/src/lib.rs b/yama_wormfile_sftp/src/lib.rs index 8dd930e..740ceee 100644 --- a/yama_wormfile_sftp/src/lib.rs +++ b/yama_wormfile_sftp/src/lib.rs @@ -4,8 +4,8 @@ use async_trait::async_trait; use eyre::{bail, Context as EyreContext, ContextCompat}; use openssh::{KnownHosts, RemoteChild, Session, Stdio}; use openssh_sftp_client::error::SftpErrorKind; -use openssh_sftp_client::file::TokioCompatFile; -use openssh_sftp_client::fs::Fs; +use openssh_sftp_client::file::{File, TokioCompatFile}; +use openssh_sftp_client::fs::{DirEntry, Fs}; use openssh_sftp_client::Error::SftpError; use openssh_sftp_client::Sftp; use ouroboros::self_referencing; @@ -15,9 +15,11 @@ use std::io::{ErrorKind, SeekFrom}; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; -use tokio::runtime::Handle; +use tokio_stream::StreamExt; +use tracing::debug; use yama_wormfile::paths::{WormPath, WormPathBuf}; use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; @@ -50,14 +52,6 @@ struct SftpConn { // fs: Fs<'this>, } -#[self_referencing] -struct FileWithSftpConn { - conn: Arc, - #[borrows(conn)] - #[covariant] - file: Option>, -} - impl SftpConn { pub async fn create(ssh_connect: &str, root_dir: impl Into) -> eyre::Result { let root_dir = root_dir.into(); @@ -98,13 +92,27 @@ impl SftpConn { Ok(res) } - pub fn get_fs(&self) -> Fs<'_> { + pub fn get_fs(&self) -> Fs { let mut fs = self.borrow_sftp().fs(); fs.set_cwd(&self.borrow_root_dir()); fs } async fn create_dir_all(&self, worm_path_as_pathbuf: PathBuf) -> eyre::Result<()> { + // Try twice to try and shake out race conditions if another worker is doing this exactly + // now... + if self + .create_dir_all_impl_once(worm_path_as_pathbuf.clone()) + .await + .is_ok() + { + return Ok(()); + } + tokio::time::sleep(Duration::from_secs(3)).await; + self.create_dir_all_impl_once(worm_path_as_pathbuf).await + } + + async fn create_dir_all_impl_once(&self, worm_path_as_pathbuf: PathBuf) -> eyre::Result<()> { let mut fs = self.get_fs(); let mut stack = vec![]; @@ -163,7 +171,7 @@ impl SftpWormFilesystem { Ok(SftpWormFilesystem { conn, root_dir }) } - fn get_fs(&self) -> Fs<'_> { + fn get_fs(&self) -> Fs { let mut fs = self.conn.borrow_sftp().fs(); fs.set_cwd(&self.root_dir); fs @@ -200,7 +208,7 @@ impl WormFileProvider for SftpWormFilesystem { let path = worm_path.as_str(); let mut fs = self.get_fs(); - let mut remote_dir = match fs.open_dir(path).await { + let remote_dir = match fs.open_dir(path).await { Ok(ok) => ok, Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => { return Ok(Vec::new()); @@ -209,7 +217,10 @@ impl WormFileProvider for SftpWormFilesystem { return Err(other.into()); } }; - let dir_reader = remote_dir.read_dir().await?; + let dir_reader: Vec = remote_dir + .read_dir() + .collect::, _>>() + .await?; Ok(dir_reader .iter() @@ -235,7 +246,7 @@ impl WormFileProvider for SftpWormFilesystem { let path = worm_path.as_str(); let mut fs = self.get_fs(); - let mut remote_dir = match fs.open_dir(path).await { + let remote_dir = match fs.open_dir(path).await { Ok(ok) => ok, Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => { return Ok(Vec::new()); @@ -244,7 +255,10 @@ impl WormFileProvider for SftpWormFilesystem { return Err(other.into()); } }; - let dir_reader = remote_dir.read_dir().await?; + let dir_reader: Vec = remote_dir + .read_dir() + .collect::, _>>() + .await?; Ok(dir_reader .iter() @@ -271,36 +285,18 @@ impl WormFileProvider for SftpWormFilesystem { let real_path = self.root_dir.join(path.as_ref().as_str()); let real_path2 = real_path.clone(); - // the `Send` in the below line is very important... - let mut file_with_conn: FileWithSftpConn = FileWithSftpConnAsyncSendTryBuilder { - conn: self.conn.clone(), - file_builder: |conn| { - Box::pin(async move { - let file = conn.borrow_sftp().open(real_path).await?; - Ok::<_, eyre::Report>(Some(TokioCompatFile::new(file))) - }) - }, - } - .try_build() - .await?; - // yucky hacks... but we need to get to the file to get the length out, so we can seek from the end... - let file_length = tokio::task::block_in_place(|| { - file_with_conn.with_file_mut(|file| { - Handle::current().block_on(async move { - file.as_mut() - .unwrap() - .metadata() - .await? - .len() - .context("no len in SFTP file metadata!") - }) - }) - })?; + let mut file: File = self.conn.borrow_sftp().open(real_path).await?; + let file_length = file + .metadata() + .await? + .len() + .context("no len in SFTP file metadata!")?; + let file = Some(Box::pin(TokioCompatFile::new(file))); Ok(SftpWormReader { path: real_path2, - file_with_conn, + file, length: file_length, }) } @@ -333,21 +329,15 @@ impl WormFileProvider for SftpWormFilesystem { continue; } - let file_with_conn: FileWithSftpConn = FileWithSftpConnAsyncSendTryBuilder { - conn: self.conn.clone(), - file_builder: |conn| { - Box::pin(async move { - let file = conn.borrow_sftp().create(real_path).await?; - Ok::<_, eyre::Report>(Some(TokioCompatFile::new(file))) - }) - }, - } - .try_build() - .await?; + let file = Some(Box::pin(TokioCompatFile::new( + self.conn.borrow_sftp().create(real_path).await?, + ))); break Ok(SftpWormWriter { temp_path: WormPathBuf::new(try_path).unwrap(), - file_with_conn: Some(file_with_conn), + file, + conn: self.conn.clone(), + unflushed_written_bytes: 0, }); } } @@ -363,7 +353,7 @@ impl WormFileProvider for SftpWormFilesystem { pub struct SftpWormReader { path: PathBuf, - file_with_conn: FileWithSftpConn, + file: Option>>, length: u64, } @@ -379,8 +369,7 @@ impl AsyncRead for SftpWormReader { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - self.file_with_conn - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_read(cx, buf)) + Pin::new(self.file.as_mut().unwrap()).poll_read(cx, buf) } } @@ -392,13 +381,11 @@ impl AsyncSeek for SftpWormReader { io::Error::new(ErrorKind::InvalidInput, "SeekFrom::End out of bounds") })?); } - self.file_with_conn - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).start_seek(position)) + Pin::new(self.file.as_mut().unwrap()).start_seek(position) } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.file_with_conn - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_complete(cx)) + Pin::new(self.file.as_mut().unwrap()).poll_complete(cx) } } @@ -406,7 +393,9 @@ impl WormFileReader for SftpWormReader {} pub struct SftpWormWriter { temp_path: WormPathBuf, - file_with_conn: Option, + file: Option>>, + conn: Arc, + unflushed_written_bytes: u64, } impl Debug for SftpWormWriter { @@ -415,54 +404,66 @@ impl Debug for SftpWormWriter { } } +/// Maximum number of unflushed bytes to have written, before we force-schedule a flush. +const MAX_UNFLUSHED_WRITTEN_BYTES: u64 = 32 * 1024 * 1024; + impl AsyncWrite for SftpWormWriter { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - self.file_with_conn - .as_mut() - .unwrap() - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_write(cx, buf)) + // Workaround for openssh-sftp-client bug that means that writes are infinitely buffered: + // only allow a certain number of bytes to be written before flushing. + if self.unflushed_written_bytes > MAX_UNFLUSHED_WRITTEN_BYTES { + // Flush and only continue if a flush was a no-op/immediately performed. + //ready!(self.as_mut().poll_flush(cx))?; + } + let r = Pin::new(self.file.as_mut().unwrap()).poll_write(cx, buf); + if r.is_ready() { + self.unflushed_written_bytes += buf.len() as u64; + } + r } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.file_with_conn - .as_mut() - .unwrap() - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_flush(cx)) + let r = Pin::new(self.file.as_mut().unwrap()).poll_flush(cx); + if r.is_ready() { + self.unflushed_written_bytes = 0; + } + r } fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.file_with_conn - .as_mut() - .unwrap() - .with_file_mut(|file| Pin::new(file.as_mut().unwrap()).poll_shutdown(cx)) + Pin::new(self.file.as_mut().unwrap()).poll_shutdown(cx) } } #[async_trait] impl WormFileWriter for SftpWormWriter { async fn finalise(&mut self, target_path: &WormPath, replace: bool) -> io::Result<()> { + debug!("finalising SFTP file to {target_path:?}"); self.flush().await?; + debug!("flushed SFTP file to {target_path:?}"); let SftpWormWriter { temp_path, - file_with_conn, + file, + conn, .. } = self; - let mut file_with_conn = file_with_conn.take().unwrap(); - let file = file_with_conn.with_file_mut(|file| file.take().unwrap()); - file.close() + let file = file.take().unwrap(); + let file: TokioCompatFile = TokioCompatFile::clone(&file); + // This looks wrong but might be OK. Hack mostly needed due to the very awkward Pinning. + file.into_inner() + .close() .await .map_err(|e| io::Error::new(ErrorKind::Other, e))?; - let conn: Arc = file_with_conn.into_heads().conn; let mut fs = conn.get_fs(); // Directories will be created as needed. @@ -471,6 +472,7 @@ impl WormFileWriter for SftpWormWriter { .await .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; } + debug!("created dirs SFTP file to {target_path:?}"); // Avoid allowing a replacement if not intended. // But this is currently not atomic, so it's just a sanity check rather than a foolproof // safeguard! @@ -491,10 +493,12 @@ impl WormFileWriter for SftpWormWriter { } } + debug!("moving SFTP file to {target_path:?}"); // Perform the move. fs.rename(&temp_path.as_ref().as_str(), target_path.as_str()) .await .map_err(|e| io::Error::new(ErrorKind::Other, e))?; + debug!("moved SFTP file to {target_path:?}"); Ok(()) } }