From 6f0e3de350161aeab256c9180a22718a62646e1a Mon Sep 17 00:00:00 2001 From: Olivier Date: Tue, 15 Aug 2023 19:53:28 +0100 Subject: [PATCH] Don't use mmap for storing due to concurrency bug scare --- yama/src/storing.rs | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/yama/src/storing.rs b/yama/src/storing.rs index 28c173d..7f3c206 100644 --- a/yama/src/storing.rs +++ b/yama/src/storing.rs @@ -280,7 +280,17 @@ impl StoringState { /// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished) /// or Err(...) for any other error. -fn store_file_blocking( +/// +/// WARNING! This memory-maps the file and should NOT be used on files that are being written to +/// by other applications. If the underlying data changes during storage, it can cause issues with +/// Zstd (and presumably can also cause the chunk hashes to be invalid). +/// +/// Further, I have had issues with this seeming to 'use' a lot of memory. Whilst it should only +/// be virtual memory, for some reason it seems to cause swap to be used and it makes diagnosis +/// of REAL memory issues much harder. +/// For that reason it is hard to recommend this approach for now. +#[allow(dead_code)] +fn store_file_mmap_blocking( file_path: &Path, storing_state: &mut StoringState, sbw: &mut StoringBloblogWriters, @@ -301,6 +311,24 @@ fn store_file_blocking( Ok(Some((chunkref, size_of_file as u64))) } +fn store_file_non_mmap_blocking( + file_path: &Path, + storing_state: &mut StoringState, + sbw: &mut StoringBloblogWriters, +) -> eyre::Result> { + let file = match File::open(file_path) { + Ok(file) => file, + Err(err) if err.kind() == io::ErrorKind::NotFound => { + warn!("file vanished: {file_path:?}"); + return Ok(None); + } + Err(other) => { + bail!("error storing {file_path:?}: {other:?}"); + } + }; + storing_state.store_full_stream(file, sbw).map(Some) +} + pub struct StoragePipeline { result_rx: Receiver<(JobName, Option<(RecursiveChunkRef, u64)>)>, join_set: Vec>>, @@ -321,11 +349,9 @@ fn storage_pipeline_worker_blocking( let span = info_span!("store_file", file=?file_path); let _span_entered = span.enter(); - // debug!("SPW job {job_id:?}"); let file_store_opt = - store_file_blocking(&file_path, &mut storing_state, &mut bloblog_writers) + store_file_non_mmap_blocking(&file_path, &mut storing_state, &mut bloblog_writers) .with_context(|| format!("failed to store {file_path:?}"))?; - // debug!("SPW good {job_id:?}"); if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) { bail!("Can't return result for {to_be_sent:?} — result_tx shut down."); }