Add some more async_backtrace tracking and remove redundant block_in_places

This commit is contained in:
Olivier 'reivilibre' 2023-08-13 17:24:09 +01:00
parent 6434190cf1
commit e9c4400ea5
4 changed files with 40 additions and 34 deletions

View File

@ -318,6 +318,7 @@ struct BackupDirSourcesReturn {
pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap<Option<(RecursiveChunkRef, u64)>>)>,
}
#[async_backtrace::framed]
async fn backup_dir_sources(
mut dir_sources: Vec<DirSourcePrep>,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,

View File

@ -14,7 +14,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::thread::JoinHandle;
use tokio::runtime::Handle;
use tokio::task;
use tracing::{debug, error, info_span, warn};
use yama_localcache::StoreConnection;
use yama_midlevel_crypto::chunk_id::{ChunkId, ChunkIdKey};
@ -141,7 +140,6 @@ impl StoringState {
}
/// For internal use only.
#[async_backtrace::framed]
fn process_chunk(
&mut self,
chunk_bytes: &[u8],
@ -151,73 +149,66 @@ impl StoringState {
let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
result.push(chunk_id);
let tokio_handle = self.tokio_handle.clone();
let is_new = tokio_handle.block_on(async {
let is_new = tokio_handle.block_on(async_backtrace::frame!(async {
Ok::<bool, eyre::Report>(
self.cache_conn.is_chunk_new(chunk_id).await?
&& self.new_unflushed_chunks.insert(chunk_id),
)
})?;
}))?;
if is_new {
let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
tokio_handle.block_on(async {
tokio_handle.block_on(async_backtrace::frame!(async {
let writer = self.obtain_bloblog_writer(slot).await?;
writer.write_chunk(chunk_id, &compressed_bytes).await?;
Ok::<(), eyre::Report>(())
})?;
}))?;
}
Ok(())
}
#[async_backtrace::framed]
fn store_full_slice_returning_chunks(
&mut self,
store_slice: &[u8],
slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
) -> eyre::Result<Vec<ChunkId>> {
task::block_in_place(|| {
let mut result = Vec::new();
let mut result = Vec::new();
for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length];
self.process_chunk(chunk_bytes, &mut result, slot)?
}
for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length];
self.process_chunk(chunk_bytes, &mut result, slot)?
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
Ok(result)
})
Ok(result)
}
#[async_backtrace::framed]
fn store_full_stream_returning_chunks(
&mut self,
store_stream: impl Read,
slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
) -> eyre::Result<(Vec<ChunkId>, u64)> {
task::block_in_place(|| {
let mut stream_length = 0u64;
let mut result = Vec::new();
for chunk in StreamCDC::new(store_stream, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
let chunk = chunk.context("failed to read in for StreamCDC")?;
let chunk_bytes = chunk.data.as_slice();
stream_length += chunk_bytes.len() as u64;
self.process_chunk(chunk_bytes, &mut result, slot)?;
}
let mut stream_length = 0u64;
let mut result = Vec::new();
for chunk in StreamCDC::new(store_stream, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
let chunk = chunk.context("failed to read in for StreamCDC")?;
let chunk_bytes = chunk.data.as_slice();
stream_length += chunk_bytes.len() as u64;
self.process_chunk(chunk_bytes, &mut result, slot)?;
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
Ok((result, stream_length))
})
Ok((result, stream_length))
}
#[async_backtrace::framed]
pub fn store_full_slice(
&mut self,
store_slice: &[u8],
@ -353,6 +344,7 @@ fn get_zstd_level() -> i32 {
}
impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
#[async_backtrace::framed]
pub async fn launch_new(
workers: u32,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
@ -415,6 +407,7 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
}
/// Must be sure that all results have been collected first.
#[async_backtrace::framed]
pub async fn finish_into_chunkmaps(
mut self,
) -> eyre::Result<BTreeMap<BloblogId, IndexBloblogEntry>> {
@ -505,6 +498,7 @@ async fn write_indices(
Ok(())
}
#[async_backtrace::framed]
pub async fn assemble_and_write_indices(
pwc: &PileWithCache<BoxedWormFileProvider>,
chunkmap: BTreeMap<BloblogId, IndexBloblogEntry>,

View File

@ -32,6 +32,7 @@ pub struct BloblogWriter<W: WormFileWriter + Unpin> {
impl<WFW: WormFileWriter + Unpin> BloblogWriter<WFW> {
/// Creates a bloblog writer.
#[async_backtrace::framed]
pub async fn new(
worm_writer: WFW,
keyring: &Keyring,
@ -67,6 +68,7 @@ impl<WFW: WormFileWriter + Unpin> BloblogWriter<WFW> {
}
/// Adds a chunk to the bloblog.
#[async_backtrace::framed]
pub async fn write_chunk(&mut self, chunk_id: ChunkId, chunk: &[u8]) -> eyre::Result<()> {
let locator = BlobLocator {
offset: self.writer.offset(),
@ -93,6 +95,7 @@ impl<WFW: WormFileWriter + Unpin> BloblogWriter<WFW> {
/// - flushes
/// - finishes computing the hash of the file
/// - moves the bloblog to the correct place
#[async_backtrace::framed]
pub async fn finish(
mut self,
) -> eyre::Result<(WormPathBuf, BloblogId, BTreeMap<ChunkId, BlobLocator>)> {

View File

@ -195,6 +195,7 @@ impl WormFileProvider for SftpWormFilesystem {
type WormFileReader = SftpWormReader;
type WormFileWriter = SftpWormWriter;
#[async_backtrace::framed]
async fn is_dir(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<bool> {
let path = path.as_ref().as_str();
let mut fs = self.get_fs();
@ -205,6 +206,7 @@ impl WormFileProvider for SftpWormFilesystem {
}
}
#[async_backtrace::framed]
async fn is_regular_file(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<bool> {
let path = path.as_ref().as_str();
let mut fs = self.get_fs();
@ -215,6 +217,7 @@ impl WormFileProvider for SftpWormFilesystem {
}
}
#[async_backtrace::framed]
async fn list(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Vec<WormPathBuf>> {
let worm_path = path.as_ref();
let path = worm_path.as_str();
@ -250,6 +253,7 @@ impl WormFileProvider for SftpWormFilesystem {
.collect())
}
#[async_backtrace::framed]
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
@ -293,6 +297,7 @@ impl WormFileProvider for SftpWormFilesystem {
.collect())
}
#[async_backtrace::framed]
async fn read(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Self::WormFileReader> {
let real_path = self.root_dir.join(path.as_ref().as_str());
@ -313,6 +318,7 @@ impl WormFileProvider for SftpWormFilesystem {
})
}
#[async_backtrace::framed]
async fn write(&self) -> eyre::Result<Self::WormFileWriter> {
if !self
.is_dir(WormPath::new("tmp").unwrap())
@ -354,6 +360,7 @@ impl WormFileProvider for SftpWormFilesystem {
}
}
#[async_backtrace::framed]
async fn delete(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<()> {
let worm_path = path.as_ref();
let path = worm_path.as_str();
@ -456,6 +463,7 @@ impl AsyncWrite for SftpWormWriter {
#[async_trait]
impl WormFileWriter for SftpWormWriter {
#[async_backtrace::framed]
async fn finalise(&mut self, target_path: &WormPath, replace: bool) -> io::Result<()> {
debug!("finalising SFTP file to {target_path:?}");
self.flush().await?;