diff --git a/datman/src/backup.rs b/datman/src/backup.rs
index c63159c..19b2d39 100644
--- a/datman/src/backup.rs
+++ b/datman/src/backup.rs
@@ -318,6 +318,7 @@ struct BackupDirSourcesReturn {
     pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap>)>,
 }
 
+#[async_backtrace::framed]
 async fn backup_dir_sources(
     mut dir_sources: Vec<DirSourcePrep>,
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
diff --git a/yama/src/storing.rs b/yama/src/storing.rs
index e9a016e..28c173d 100644
--- a/yama/src/storing.rs
+++ b/yama/src/storing.rs
@@ -14,7 +14,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::thread::JoinHandle;
 use tokio::runtime::Handle;
-use tokio::task;
 use tracing::{debug, error, info_span, warn};
 use yama_localcache::StoreConnection;
 use yama_midlevel_crypto::chunk_id::{ChunkId, ChunkIdKey};
@@ -141,7 +140,6 @@ impl StoringState {
     }
 
     /// For internal use only.
-    #[async_backtrace::framed]
     fn process_chunk(
         &mut self,
         chunk_bytes: &[u8],
@@ -151,73 +149,66 @@ impl StoringState {
         let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
         result.push(chunk_id);
         let tokio_handle = self.tokio_handle.clone();
-        let is_new = tokio_handle.block_on(async {
+        let is_new = tokio_handle.block_on(async_backtrace::frame!(async {
             Ok::<bool, eyre::Report>(
                 self.cache_conn.is_chunk_new(chunk_id).await?
                     && self.new_unflushed_chunks.insert(chunk_id),
             )
-        })?;
+        }))?;
         if is_new {
             let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
-            tokio_handle.block_on(async {
+            tokio_handle.block_on(async_backtrace::frame!(async {
                 let writer = self.obtain_bloblog_writer(slot).await?;
                 writer.write_chunk(chunk_id, &compressed_bytes).await?;
                 Ok::<(), eyre::Report>(())
-            })?;
+            }))?;
         }
         Ok(())
     }
 
-    #[async_backtrace::framed]
     fn store_full_slice_returning_chunks(
         &mut self,
         store_slice: &[u8],
         slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter + Send>>>>,
     ) -> eyre::Result<Vec<ChunkId>> {
-        task::block_in_place(|| {
-            let mut result = Vec::new();
+        let mut result = Vec::new();
 
-            for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
-                let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length];
-                self.process_chunk(chunk_bytes, &mut result, slot)?
-            }
+        for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
+            let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length];
+            self.process_chunk(chunk_bytes, &mut result, slot)?
+        }
 
-            if result.is_empty() {
-                self.process_chunk(&[], &mut result, slot)?;
-            }
+        if result.is_empty() {
+            self.process_chunk(&[], &mut result, slot)?;
+        }
 
-            Ok(result)
-        })
+        Ok(result)
     }
 
-    #[async_backtrace::framed]
     fn store_full_stream_returning_chunks(
         &mut self,
         store_stream: impl Read,
         slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter + Send>>>>,
     ) -> eyre::Result<(Vec<ChunkId>, u64)> {
-        task::block_in_place(|| {
-            let mut stream_length = 0u64;
-            let mut result = Vec::new();
-            for chunk in StreamCDC::new(store_stream, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
-                let chunk = chunk.context("failed to read in for StreamCDC")?;
-                let chunk_bytes = chunk.data.as_slice();
-                stream_length += chunk_bytes.len() as u64;
-                self.process_chunk(chunk_bytes, &mut result, slot)?;
-            }
+        let mut stream_length = 0u64;
+        let mut result = Vec::new();
+        for chunk in StreamCDC::new(store_stream, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
+            let chunk = chunk.context("failed to read in for StreamCDC")?;
+            let chunk_bytes = chunk.data.as_slice();
+            stream_length += chunk_bytes.len() as u64;
+            self.process_chunk(chunk_bytes, &mut result, slot)?;
+        }
 
-            if result.is_empty() {
-                self.process_chunk(&[], &mut result, slot)?;
-            }
+        if result.is_empty() {
+            self.process_chunk(&[], &mut result, slot)?;
+        }
 
-            Ok((result, stream_length))
-        })
+        Ok((result, stream_length))
     }
 
-    #[async_backtrace::framed]
     pub fn store_full_slice(
         &mut self,
         store_slice: &[u8],
@@ -353,6 +344,7 @@ fn get_zstd_level() -> i32 {
 }
 
 impl StoragePipeline {
+    #[async_backtrace::framed]
     pub async fn launch_new(
         workers: u32,
         pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
@@ -415,6 +407,7 @@ impl StoragePipeline {
     }
 
     /// Must be sure that all results have been collected first.
+    #[async_backtrace::framed]
     pub async fn finish_into_chunkmaps(
         mut self,
     ) -> eyre::Result<BTreeMap<BloblogId, IndexBloblogEntry>> {
@@ -505,6 +498,7 @@ async fn write_indices(
     Ok(())
 }
 
+#[async_backtrace::framed]
 pub async fn assemble_and_write_indices(
     pwc: &PileWithCache<BoxedWormFileProvider>,
     chunkmap: BTreeMap<BloblogId, IndexBloblogEntry>,
diff --git a/yama_pile/src/bloblogs.rs b/yama_pile/src/bloblogs.rs
index 5d0405f..0a37247 100644
--- a/yama_pile/src/bloblogs.rs
+++ b/yama_pile/src/bloblogs.rs
@@ -32,6 +32,7 @@ pub struct BloblogWriter<WFW: WormFileWriter> {
 
 impl<WFW: WormFileWriter> BloblogWriter<WFW> {
     /// Creates a bloblog writer.
+    #[async_backtrace::framed]
     pub async fn new(
         worm_writer: WFW,
         keyring: &Keyring,
@@ -67,6 +68,7 @@ impl<WFW: WormFileWriter> BloblogWriter<WFW> {
     }
 
     /// Adds a chunk to the bloblog.
+    #[async_backtrace::framed]
     pub async fn write_chunk(&mut self, chunk_id: ChunkId, chunk: &[u8]) -> eyre::Result<()> {
         let locator = BlobLocator {
             offset: self.writer.offset(),
@@ -93,6 +95,7 @@ impl<WFW: WormFileWriter> BloblogWriter<WFW> {
     /// - flushes
     /// - finishes computing the hash of the file
     /// - moves the bloblog to the correct place
+    #[async_backtrace::framed]
     pub async fn finish(
         mut self,
     ) -> eyre::Result<(WormPathBuf, BloblogId, BTreeMap<ChunkId, BlobLocator>)> {
diff --git a/yama_wormfile_sftp/src/lib.rs b/yama_wormfile_sftp/src/lib.rs
index 2bae46f..5423f02 100644
--- a/yama_wormfile_sftp/src/lib.rs
+++ b/yama_wormfile_sftp/src/lib.rs
@@ -195,6 +195,7 @@ impl WormFileProvider for SftpWormFilesystem {
     type WormFileReader = SftpWormReader;
     type WormFileWriter = SftpWormWriter;
 
+    #[async_backtrace::framed]
     async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<bool> {
         let path = path.as_ref().as_str();
         let mut fs = self.get_fs();
@@ -205,6 +206,7 @@ impl WormFileProvider for SftpWormFilesystem {
         }
     }
 
+    #[async_backtrace::framed]
     async fn is_regular_file(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<bool> {
         let path = path.as_ref().as_str();
         let mut fs = self.get_fs();
@@ -215,6 +217,7 @@ impl WormFileProvider for SftpWormFilesystem {
         }
     }
 
+    #[async_backtrace::framed]
     async fn list(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Vec<WormPathBuf>> {
         let worm_path = path.as_ref();
         let path = worm_path.as_str();
@@ -250,6 +253,7 @@ impl WormFileProvider for SftpWormFilesystem {
             .collect())
     }
 
+    #[async_backtrace::framed]
     async fn list_meta(
         &self,
         path: impl AsRef<WormPath> + Send,
     ) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
@@ -293,6 +297,7 @@ impl WormFileProvider for SftpWormFilesystem {
             .collect())
     }
 
+    #[async_backtrace::framed]
     async fn read(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Self::WormFileReader> {
         let real_path = self.root_dir.join(path.as_ref().as_str());
@@ -313,6 +318,7 @@ impl WormFileProvider for SftpWormFilesystem {
         })
     }
 
+    #[async_backtrace::framed]
     async fn write(&self) -> eyre::Result<Self::WormFileWriter> {
         if !self
             .is_dir(WormPath::new("tmp").unwrap())
@@ -354,6 +360,7 @@ impl WormFileProvider for SftpWormFilesystem {
         }
     }
 
+    #[async_backtrace::framed]
     async fn delete(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<()> {
         let worm_path = path.as_ref();
         let path = worm_path.as_str();
@@ -456,6 +463,7 @@ impl AsyncWrite for SftpWormWriter {
 
 #[async_trait]
 impl WormFileWriter for SftpWormWriter {
+    #[async_backtrace::framed]
    async fn finalise(&mut self, target_path: &WormPath, replace: bool) -> io::Result<()> {
        debug!("finalising SFTP file to {target_path:?}");
        self.flush().await?;
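
Note (not part of the patch): the point of spreading `#[async_backtrace::framed]` and `async_backtrace::frame!` across the backup/storage path is that every framed future becomes a node in the tree returned by `async_backtrace::taskdump_tree`, so a wedged backup can be diagnosed from a live task dump. Below is a minimal, self-contained sketch of that usage; it assumes `tokio` (with the `rt-multi-thread`, `macros`, and `time` features) and `async-backtrace` as dependencies, and the function names are illustrative only, not taken from this codebase.

use std::time::Duration;

#[async_backtrace::framed]
async fn outer() {
    inner().await;
}

#[async_backtrace::framed]
async fn inner() {
    // Park this task forever so the dump below catches it mid-await.
    std::future::pending::<()>().await;
}

#[tokio::main]
async fn main() {
    tokio::spawn(outer());
    // Give the spawned task a moment to reach its await point.
    tokio::time::sleep(Duration::from_millis(50)).await;
    // Prints the tree of framed futures, roughly:
    //   ╼ outer at src/main.rs:3:1
    //     └╼ inner at src/main.rs:8:1
    println!("{}", async_backtrace::taskdump_tree(true));
}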