Don't use mmap for storing due to concurrency bug scare

This commit is contained in:
Olivier 'reivilibre' 2023-08-15 19:53:28 +01:00
parent d07351d465
commit 6f0e3de350
1 changed files with 30 additions and 4 deletions

View File

@ -280,7 +280,17 @@ impl StoringState {
/// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished) /// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished)
/// or Err(...) for any other error. /// or Err(...) for any other error.
fn store_file_blocking( ///
/// WARNING! This memory-maps the file and should NOT be used on files that are being written to
/// by other applications. If the underlying data changes during storage, it can cause issues with
/// Zstd (and presumably can also cause the chunk hashes to be invalid).
///
/// Further, I have had issues with this seeming to 'use' a lot of memory. Whilst it should only
/// be virtual memory, for some reason it seems to cause swap to be used and it makes diagnosis
/// of REAL memory issues much harder.
/// For that reason it is hard to recommend this approach for now.
#[allow(dead_code)]
fn store_file_mmap_blocking(
file_path: &Path, file_path: &Path,
storing_state: &mut StoringState, storing_state: &mut StoringState,
sbw: &mut StoringBloblogWriters, sbw: &mut StoringBloblogWriters,
@ -301,6 +311,24 @@ fn store_file_blocking(
Ok(Some((chunkref, size_of_file as u64))) Ok(Some((chunkref, size_of_file as u64)))
} }
fn store_file_non_mmap_blocking(
file_path: &Path,
storing_state: &mut StoringState,
sbw: &mut StoringBloblogWriters,
) -> eyre::Result<Option<(RecursiveChunkRef, u64)>> {
let file = match File::open(file_path) {
Ok(file) => file,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
warn!("file vanished: {file_path:?}");
return Ok(None);
}
Err(other) => {
bail!("error storing {file_path:?}: {other:?}");
}
};
storing_state.store_full_stream(file, sbw).map(Some)
}
pub struct StoragePipeline<JobName> { pub struct StoragePipeline<JobName> {
result_rx: Receiver<(JobName, Option<(RecursiveChunkRef, u64)>)>, result_rx: Receiver<(JobName, Option<(RecursiveChunkRef, u64)>)>,
join_set: Vec<JoinHandle<eyre::Result<StoringIntermediate>>>, join_set: Vec<JoinHandle<eyre::Result<StoringIntermediate>>>,
@ -321,11 +349,9 @@ fn storage_pipeline_worker_blocking<JobName: Debug>(
let span = info_span!("store_file", file=?file_path); let span = info_span!("store_file", file=?file_path);
let _span_entered = span.enter(); let _span_entered = span.enter();
// debug!("SPW job {job_id:?}");
let file_store_opt = let file_store_opt =
store_file_blocking(&file_path, &mut storing_state, &mut bloblog_writers) store_file_non_mmap_blocking(&file_path, &mut storing_state, &mut bloblog_writers)
.with_context(|| format!("failed to store {file_path:?}"))?; .with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) { if let Err(SendError(to_be_sent)) = result_tx.send((job_id, file_store_opt)) {
bail!("Can't return result for {to_be_sent:?} — result_tx shut down."); bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
} }