Convert store_file to blocking

This commit is contained in:
Olivier 'reivilibre' 2023-08-13 16:59:57 +01:00
parent 27c5f09d0d
commit 6434190cf1
3 changed files with 31 additions and 22 deletions

View File

@ -13,6 +13,7 @@ use std::path::PathBuf;
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio};
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::runtime::Handle;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tracing::{debug, info, info_span, Instrument, Span}; use tracing::{debug, info, info_span, Instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt; use tracing_indicatif::span_ext::IndicatifSpanExt;
@ -461,7 +462,7 @@ async fn backup_virtual_source(
bail!("bug: non-VS SDI passed to BVS"); bail!("bug: non-VS SDI passed to BVS");
}; };
let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks) let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks, Handle::current())
.await .await
.context("failed to create storing state")?; .context("failed to create storing state")?;
let mut sbw = StoringBloblogWriters::default(); let mut sbw = StoringBloblogWriters::default();

View File

@ -26,6 +26,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{stdin, AsyncBufReadExt, BufReader}; use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::runtime::Handle;
use tracing::{info, info_span, warn, Instrument, Span}; use tracing::{info, info_span, warn, Instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt; use tracing_indicatif::span_ext::IndicatifSpanExt;
use tracing_indicatif::IndicatifLayer; use tracing_indicatif::IndicatifLayer;
@ -596,9 +597,10 @@ async fn main() -> eyre::Result<()> {
let _store_span_entered = store_span.enter(); let _store_span_entered = store_span.enter();
let new_unflushed_chunks = Arc::new(Default::default()); let new_unflushed_chunks = Arc::new(Default::default());
let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks) let mut storing_state =
.await StoringState::new(pwc.clone(), new_unflushed_chunks, Handle::current())
.context("failed to create storing state")?; .await
.context("failed to create storing state")?;
let mut sbw = StoringBloblogWriters::default(); let mut sbw = StoringBloblogWriters::default();
let stdin = std::io::BufReader::new( let stdin = std::io::BufReader::new(
io_streams::StreamReader::stdin().context("failed to open stdin")?, io_streams::StreamReader::stdin().context("failed to open stdin")?,

View File

@ -6,13 +6,13 @@ use flume::{Receiver, RecvError, SendError, Sender};
use std::cmp::Reverse; use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug; use std::fmt::Debug;
use std::fs::File;
use std::io; use std::io;
use std::io::Read; use std::io::Read;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use tokio::fs::File;
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::task; use tokio::task;
use tracing::{debug, error, info_span, warn}; use tracing::{debug, error, info_span, warn};
@ -46,12 +46,15 @@ pub struct StoringState {
pub chunk_id_key: ChunkIdKey, pub chunk_id_key: ChunkIdKey,
pub compressor: zstd::bulk::Compressor<'static>, pub compressor: zstd::bulk::Compressor<'static>,
pub tokio_handle: Handle,
} }
impl StoringState { impl StoringState {
pub async fn new( pub async fn new(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>, pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
new_unflushed_chunks: Arc<DashSet<ChunkId>>, new_unflushed_chunks: Arc<DashSet<ChunkId>>,
tokio_handle: Handle,
) -> eyre::Result<Self> { ) -> eyre::Result<Self> {
let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() { let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() {
None => { None => {
@ -69,6 +72,7 @@ impl StoringState {
pwc, pwc,
chunk_id_key, chunk_id_key,
compressor, compressor,
tokio_handle,
}) })
} }
} }
@ -146,7 +150,8 @@ impl StoringState {
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key); let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
result.push(chunk_id); result.push(chunk_id);
let is_new = Handle::current().block_on(async { let tokio_handle = self.tokio_handle.clone();
let is_new = tokio_handle.block_on(async {
Ok::<bool, eyre::Report>( Ok::<bool, eyre::Report>(
self.cache_conn.is_chunk_new(chunk_id).await? self.cache_conn.is_chunk_new(chunk_id).await?
&& self.new_unflushed_chunks.insert(chunk_id), && self.new_unflushed_chunks.insert(chunk_id),
@ -156,7 +161,7 @@ impl StoringState {
if is_new { if is_new {
let compressed_bytes = self.compressor.compress(&chunk_bytes)?; let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
Handle::current().block_on(async { tokio_handle.block_on(async {
let writer = self.obtain_bloblog_writer(slot).await?; let writer = self.obtain_bloblog_writer(slot).await?;
writer.write_chunk(chunk_id, &compressed_bytes).await?; writer.write_chunk(chunk_id, &compressed_bytes).await?;
Ok::<(), eyre::Report>(()) Ok::<(), eyre::Report>(())
@ -284,14 +289,13 @@ impl StoringState {
/// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished) /// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished)
/// or Err(...) for any other error. /// or Err(...) for any other error.
#[async_backtrace::framed] fn store_file_blocking(
async fn store_file(
file_path: &Path, file_path: &Path,
storing_state: &mut StoringState, storing_state: &mut StoringState,
sbw: &mut StoringBloblogWriters, sbw: &mut StoringBloblogWriters,
) -> eyre::Result<Option<(RecursiveChunkRef, u64)>> { ) -> eyre::Result<Option<(RecursiveChunkRef, u64)>> {
let file = match File::open(file_path).await { let file = match File::open(file_path) {
Ok(file) => file.into_std().await, Ok(file) => file,
Err(err) if err.kind() == io::ErrorKind::NotFound => { Err(err) if err.kind() == io::ErrorKind::NotFound => {
warn!("file vanished: {file_path:?}"); warn!("file vanished: {file_path:?}");
return Ok(None); return Ok(None);
@ -327,13 +331,9 @@ fn storage_pipeline_worker_blocking<JobName: Debug>(
let _span_entered = span.enter(); let _span_entered = span.enter();
// debug!("SPW job {job_id:?}"); // debug!("SPW job {job_id:?}");
let file_store_opt = tokio_handle let file_store_opt =
.block_on(store_file( store_file_blocking(&file_path, &mut storing_state, &mut bloblog_writers)
&file_path, .with_context(|| format!("failed to store {file_path:?}"))?;
&mut storing_state,
&mut bloblog_writers,
))
.with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}"); // debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) { if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) {
bail!("Can't return result for {to_be_sent:?} — result_tx shut down."); bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
@ -365,12 +365,18 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
for spw_num in 0..workers { for spw_num in 0..workers {
let job_rx = job_rx.clone(); let job_rx = job_rx.clone();
let result_tx = result_tx.clone(); let result_tx = result_tx.clone();
let storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks.clone())
.await let tokio_handle = Handle::current();
.context("failed to create storing state")?;
let storing_state = StoringState::new(
pwc.clone(),
new_unflushed_chunks.clone(),
tokio_handle.clone(),
)
.await
.context("failed to create storing state")?;
// make a logging span for the Storage Pipeline Workers // make a logging span for the Storage Pipeline Workers
let spw_span = info_span!("spw", n = spw_num); let spw_span = info_span!("spw", n = spw_num);
let tokio_handle = Handle::current();
let thread = std::thread::Builder::new() let thread = std::thread::Builder::new()
.name(format!("spw-{spw_num}")) .name(format!("spw-{spw_num}"))