Convert storage_pipeline_worker to blocking

This commit is contained in:
Olivier 'reivilibre' 2023-08-13 16:54:23 +01:00
parent 96deadd270
commit 27c5f09d0d

View File

@ -1,6 +1,6 @@
use crate::pile_with_cache::PileWithCache;
use dashmap::DashSet;
use eyre::{bail, Context};
use eyre::{bail, eyre, Context};
use fastcdc::v2020::{FastCDC, StreamCDC};
use flume::{Receiver, RecvError, SendError, Sender};
use std::cmp::Reverse;
@ -11,11 +11,11 @@ use std::io::Read;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::thread::JoinHandle;
use tokio::fs::File;
use tokio::runtime::Handle;
use tokio::task;
use tokio::task::JoinSet;
use tracing::{debug, error, info_span, warn, Instrument};
use tracing::{debug, error, info_span, warn};
use yama_localcache::StoreConnection;
use yama_midlevel_crypto::chunk_id::{ChunkId, ChunkIdKey};
use yama_pile::bloblogs::BloblogWriter;
@ -308,40 +308,40 @@ async fn store_file(
/// Holds the result channel and the worker handles of a storage pipeline, generic over the
/// caller-chosen job identifier type `JobName`.
pub struct StoragePipeline<JobName> {
/// Receiving end of the results channel: each item pairs a job name with the optional
/// `(RecursiveChunkRef, u64)` that a worker sent back for that job.
result_rx: Receiver<(JobName, Option<(RecursiveChunkRef, u64)>)>,
// NOTE(review): two `join_set` declarations appear here because this view interleaves the
// pre- and post-change lines of the commit; presumably only one exists in the real file.
// Variant backed by a tokio `JoinSet` of worker tasks.
join_set: JoinSet<eyre::Result<StoringIntermediate>>,
// Variant backed by OS thread handles; joining a handle yields that worker's
// `eyre::Result<StoringIntermediate>`.
join_set: Vec<JoinHandle<eyre::Result<StoringIntermediate>>>,
}
#[async_backtrace::framed]
async fn storage_pipeline_worker<JobName: Debug>(
fn storage_pipeline_worker_blocking<JobName: Debug>(
job_rx: Receiver<(JobName, PathBuf)>,
result_tx: Sender<(JobName, Option<(RecursiveChunkRef, u64)>)>,
mut storing_state: StoringState,
tokio_handle: Handle,
) -> eyre::Result<StoringIntermediate> {
let mut bloblog_writers = StoringBloblogWriters::default();
debug!("SPW startup");
while let Ok((job_id, file_path)) = job_rx.recv_async().await {
while let Ok((job_id, file_path)) = job_rx.recv() {
// TODO(span): is this correctly a child of the parent span?
let span = info_span!("store_file", file=?file_path);
let _span_entered = span.enter();
async {
// debug!("SPW job {job_id:?}");
let file_store_opt = store_file(&file_path, &mut storing_state, &mut bloblog_writers)
.await
.with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx.send_async((job_id, file_store_opt)).await
{
bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
}
Ok(())
// debug!("SPW job {job_id:?}");
let file_store_opt = tokio_handle
.block_on(store_file(
&file_path,
&mut storing_state,
&mut bloblog_writers,
))
.with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) {
bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
}
.instrument(span)
.await?
}
debug!("finishing bloblogs");
bloblog_writers.finish_bloblogs(&mut storing_state).await?;
tokio_handle.block_on(bloblog_writers.finish_bloblogs(&mut storing_state))?;
debug!("finished bloblogs!");
Ok(StoringIntermediate::from(storing_state))
@ -361,7 +361,7 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
let (job_tx, job_rx) = flume::bounded(16);
let (result_tx, result_rx) = flume::bounded(4);
let mut join_set = JoinSet::new();
let mut join_set = Vec::new();
for spw_num in 0..workers {
let job_rx = job_rx.clone();
let result_tx = result_tx.clone();
@ -370,14 +370,26 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
.context("failed to create storing state")?;
// make a logging span for the Storage Pipeline Workers
let spw_span = info_span!("spw", n = spw_num);
join_set.spawn(async_backtrace::frame!(async move {
let result = storage_pipeline_worker(job_rx, result_tx, storing_state).await;
if let Err(ref err) = result {
error!("Error in SPW {err:?}");
}
result
}
.instrument(spw_span)));
let tokio_handle = Handle::current();
let thread = std::thread::Builder::new()
.name(format!("spw-{spw_num}"))
.spawn(move || {
let _spw_span_entered = spw_span.enter();
let result = storage_pipeline_worker_blocking(
job_rx,
result_tx,
storing_state,
tokio_handle,
);
if let Err(ref err) = result {
error!("Error in SPW {err:?}");
}
result
})
.expect("failed to spawn SPW thread!");
join_set.push(thread);
}
Ok((
@ -406,7 +418,9 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
let mut chunkmap = BTreeMap::new();
while let Some(join_resres) = self.join_set.join_next().await {
while let Some(thread) = self.join_set.pop() {
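// TODO(blocking on async thread): `thread.join()` blocks the current thread until the worker exits, and this code is reached from async context; do this differently (e.g. join from somewhere that is allowed to block).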
let join_resres = thread.join().map_err(|panic_err| eyre!("{panic_err:?}"));
chunkmap.extend(join_resres??.new_bloblogs.into_iter().map(|(k, nb)| {
(
k,