Introduce Option<> on file store entries

Olivier 'reivilibre' 2023-05-26 23:30:14 +01:00
parent 2b4608e06b
commit 32e514bd2e
5 changed files with 25 additions and 27 deletions

View File

@@ -254,7 +254,7 @@ async fn scan_dir_sources(

 struct BackupDirSourcesReturn {
     pub chunkmaps: BTreeMap<BloblogId, IndexBloblogEntry>,
-    pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap<(RecursiveChunkRef, u64)>)>,
+    pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap<Option<(RecursiveChunkRef, u64)>>)>,
 }

 async fn backup_dir_sources(

@@ -276,7 +276,7 @@ async fn backup_dir_sources(
         (cfm, Cow::Owned(pruned))
     } else {
         (
-            PatriciaMap::<(RecursiveChunkRef, u64)>::new(),
+            PatriciaMap::<Option<(RecursiveChunkRef, u64)>>::new(),
             Cow::Borrowed(&dir_source.scan_entry_map),
         )
     };

@@ -337,10 +337,9 @@ async fn backup_dir_sources(
             Ok::<_, eyre::Report>(())
         },
         async {
-            while let Ok(((dir_source_idx, job_id), rec_chunk_ref, real_size)) =
-                pipeline.next_result().await
+            while let Ok(((dir_source_idx, job_id), file_store_opt)) = pipeline.next_result().await
             {
-                chunk_file_maps[dir_source_idx].insert(&job_id, (rec_chunk_ref, real_size));
+                chunk_file_maps[dir_source_idx].insert(&job_id, file_store_opt);
                 completed += 1;
                 Span::current().pb_inc(1);
             }
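
The map change above gives each scanned path three possible states instead of two: absent from the map (never entered the pipeline), present as Some((chunkref, size)) (content stored), and present as None (an entry exists but carries no stored content). This commit does not yet pin down what None will mean in practice (store_file below still always returns Some); the following is a minimal sketch of the shape only, assuming the byte-keyed API of the patricia_tree crate and stubbing RecursiveChunkRef:

    use patricia_tree::PatriciaMap;

    #[derive(Debug, Clone, Copy)]
    struct RecursiveChunkRef; // stub for yama's real chunk reference type

    fn main() {
        let mut chunk_file_map: PatriciaMap<Option<(RecursiveChunkRef, u64)>> = PatriciaMap::new();
        chunk_file_map.insert("photos/cat.jpg", Some((RecursiveChunkRef, 1024)));
        chunk_file_map.insert("photos/skipped.jpg", None);

        match chunk_file_map.get("photos/skipped.jpg") {
            Some(Some((_chunkref, size))) => println!("stored, {size} bytes"),
            Some(None) => println!("entry present, but no stored content"),
            None => println!("path never entered the pipeline"),
        }
    }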

View File

@@ -437,7 +437,7 @@ async fn main() -> eyre::Result<()> {
         (cfm, Cow::Owned(pruned))
     } else {
         (
-            PatriciaMap::<(RecursiveChunkRef, u64)>::new(),
+            PatriciaMap::<Option<(RecursiveChunkRef, u64)>>::new(),
             Cow::Borrowed(&scan_entry_map),
         )
     };

@@ -485,9 +485,8 @@ async fn main() -> eyre::Result<()> {
             Ok::<_, eyre::Report>(())
         },
         async {
-            while let Ok((job_id, rec_chunk_ref, real_size)) = pipeline.next_result().await
-            {
-                chunk_file_map.insert(&job_id, (rec_chunk_ref, real_size));
+            while let Ok((job_id, file_store_opt)) = pipeline.next_result().await {
+                chunk_file_map.insert(&job_id, file_store_opt);
                 Span::current().pb_inc(1);
             }
             // eprintln!("fin rec");
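
Both call sites consume results with the same pattern: the loop runs until next_result() returns Err, which happens once every worker's Sender has been dropped. A self-contained sketch of that shutdown behaviour, assuming the flume channels and tokio runtime the surrounding code appears to use, with the chunk reference type stubbed:

    use flume::unbounded;

    #[derive(Debug, Clone, Copy)]
    struct RecursiveChunkRef; // stub for the real chunk reference type

    #[tokio::main]
    async fn main() {
        let (result_tx, result_rx) = unbounded::<(String, Option<(RecursiveChunkRef, u64)>)>();

        // Worker: sends one result per file, then drops its Sender on exit.
        tokio::spawn(async move {
            result_tx
                .send_async(("a.txt".to_owned(), Some((RecursiveChunkRef, 42))))
                .await
                .unwrap();
            result_tx.send_async(("b.txt".to_owned(), None)).await.unwrap();
        });

        // Consumer: mirrors the loop in the diff; Err(RecvError) ends it cleanly.
        while let Ok((job_id, file_store_opt)) = result_rx.recv_async().await {
            println!("{job_id}: {file_store_opt:?}");
        }
    }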

View File

@@ -224,7 +224,7 @@ pub fn prepopulate_unmodified(
     parent_tree: &TreeNode,
     scan_entry_map: &PatriciaMap<ScanEntry>,
 ) -> (
-    PatriciaMap<(RecursiveChunkRef, u64)>,
+    PatriciaMap<Option<(RecursiveChunkRef, u64)>>,
     PatriciaMap<ScanEntry>,
 ) {
     let mut reusable_chunkings = PatriciaMap::new();

@@ -253,7 +253,7 @@ pub fn prepopulate_unmodified(
             && prev_permissions == permissions
         {
             // Nothing seems to have changed about this file, let's just reuse the `content` from last time.
-            reusable_chunkings.insert(path, (*prev_content, *size));
+            reusable_chunkings.insert(path, Some((*prev_content, *size)));
             pruned_scan_entry_map.remove(path);
         }
     }
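
prepopulate_unmodified now has to wrap each reused chunking in Some(...) so its output matches the new map type. A simplified sketch of the reuse rule, with std's BTreeMap standing in for PatriciaMap and a made-up FileMeta in place of the real ScanEntry fields:

    use std::collections::BTreeMap;

    type Chunking = ([u8; 32], u64); // stand-in for (RecursiveChunkRef, u64)

    #[derive(Debug, Clone, Copy, PartialEq)]
    struct FileMeta {
        mtime: u64,
        permissions: u32,
    }

    /// Split the scan set into entries whose previous chunking can be reused
    /// (wrapped in Some, matching the new map type) and entries to re-store.
    fn prepopulate_unmodified(
        previous: &BTreeMap<String, (FileMeta, Chunking)>,
        scan_entry_map: &BTreeMap<String, FileMeta>,
    ) -> (BTreeMap<String, Option<Chunking>>, BTreeMap<String, FileMeta>) {
        let mut reusable_chunkings = BTreeMap::new();
        let mut pruned_scan_entry_map = scan_entry_map.clone();
        for (path, (prev_meta, prev_chunking)) in previous {
            if scan_entry_map.get(path) == Some(prev_meta) {
                // Nothing observable changed: reuse last snapshot's content.
                reusable_chunkings.insert(path.clone(), Some(*prev_chunking));
                pruned_scan_entry_map.remove(path);
            }
        }
        (reusable_chunkings, pruned_scan_entry_map)
    }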

View File

@@ -279,22 +279,22 @@ async fn store_file(
     file_path: &Path,
     storing_state: &mut StoringState,
     sbw: &mut StoringBloblogWriters,
-) -> eyre::Result<(RecursiveChunkRef, u64)> {
+) -> eyre::Result<Option<(RecursiveChunkRef, u64)>> {
     let file = File::open(file_path).await?.into_std().await;
     let mapped = unsafe { memmap2::Mmap::map(&file) }?;
     let size_of_file = mapped.as_ref().len();
     let chunkref = storing_state.store_full_slice(mapped.as_ref(), sbw)?;
-    Ok((chunkref, size_of_file as u64))
+    Ok(Some((chunkref, size_of_file as u64)))
 }

 pub struct StoragePipeline<JobName> {
-    result_rx: Receiver<(JobName, RecursiveChunkRef, u64)>,
+    result_rx: Receiver<(JobName, Option<(RecursiveChunkRef, u64)>)>,
     join_set: JoinSet<eyre::Result<StoringIntermediate>>,
 }

 async fn storage_pipeline_worker<JobName: Debug>(
     job_rx: Receiver<(JobName, PathBuf)>,
-    result_tx: Sender<(JobName, RecursiveChunkRef, u64)>,
+    result_tx: Sender<(JobName, Option<(RecursiveChunkRef, u64)>)>,
     mut storing_state: StoringState,
 ) -> eyre::Result<StoringIntermediate> {
     let mut bloblog_writers = StoringBloblogWriters::default();

@@ -306,14 +306,11 @@ async fn storage_pipeline_worker<JobName: Debug>(
         async {
             // debug!("SPW job {job_id:?}");
-            let (rec_chunk_ref, file_length) =
-                store_file(&file_path, &mut storing_state, &mut bloblog_writers)
-                    .await
-                    .with_context(|| format!("failed to store {file_path:?}"))?;
-            // debug!("SPW good {job_id:?}");
-            if let Err(SendError(to_be_sent)) = result_tx
-                .send_async((job_id, rec_chunk_ref, file_length))
-                .await
+            let file_store_opt = store_file(&file_path, &mut storing_state, &mut bloblog_writers)
+                .await
+                .with_context(|| format!("failed to store {file_path:?}"))?;
+            // debug!("SPW good {job_id:?}");
+            if let Err(SendError(to_be_sent)) = result_tx.send_async((job_id, file_store_opt)).await
             {
                 bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
             }

@@ -375,7 +372,9 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
     }

     #[inline]
-    pub async fn next_result(&self) -> Result<(JobName, RecursiveChunkRef, u64), RecvError> {
+    pub async fn next_result(
+        &self,
+    ) -> Result<(JobName, Option<(RecursiveChunkRef, u64)>), RecvError> {
         self.result_rx.recv_async().await
     }
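
End to end, the pipeline's channel now carries Option<(RecursiveChunkRef, u64)> from worker to consumer. A condensed, hypothetical sketch of the worker side under the same assumptions (flume and eyre; store_file reduced to a synchronous stub, generics and the storing state omitted):

    use eyre::{bail, WrapErr};
    use flume::{Receiver, SendError, Sender};

    #[derive(Debug, Clone, Copy)]
    struct ChunkRef; // stand-in for RecursiveChunkRef

    // Stand-in for the real store_file(): always Some(...) for now, exactly as
    // in this commit; the Option leaves room for "nothing stored" outcomes later.
    fn store_file(path: &str) -> eyre::Result<Option<(ChunkRef, u64)>> {
        Ok(Some((ChunkRef, path.len() as u64)))
    }

    async fn storage_pipeline_worker(
        job_rx: Receiver<(u64, String)>,
        result_tx: Sender<(u64, Option<(ChunkRef, u64)>)>,
    ) -> eyre::Result<()> {
        while let Ok((job_id, file_path)) = job_rx.recv_async().await {
            let file_store_opt =
                store_file(&file_path).with_context(|| format!("failed to store {file_path:?}"))?;
            if let Err(SendError(to_be_sent)) = result_tx.send_async((job_id, file_store_opt)).await {
                bail!("can't return result for {to_be_sent:?}: result_tx shut down");
            }
        }
        Ok(())
    }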

View File

@@ -392,7 +392,7 @@ pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) {

 pub fn assemble_tree_from_scan_entries(
     scan: PatriciaMap<ScanEntry>,
-    mut chunkings: PatriciaMap<(RecursiveChunkRef, u64)>,
+    mut chunkings: PatriciaMap<Option<(RecursiveChunkRef, u64)>>,
 ) -> eyre::Result<TreeNode> {
     let mut dirs: BTreeMap<String, BTreeMap<String, TreeNode>> = BTreeMap::new();

     // special-case the root ("")

@@ -409,9 +409,10 @@ pub fn assemble_tree_from_scan_entries(
                 permissions,
                 size: _unverified_size_ignore,
             } => {
-                let (content, size) = chunkings.remove(&key_string).with_context(|| {
-                    format!("bad chunkings PMap: missing entry: {key_string:?}")
-                })?;
+                let (content, size) = chunkings
+                    .remove(&key_string)
+                    .with_context(|| format!("bad chunkings PMap: missing entry: {key_string:?}"))?
+                    .unwrap(); // TODO
                 // note: for the root, this inserts the root file entry as a child called "" within a fake root 'directory'.
                 // That's fine. We'll patch this up later.
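
The .unwrap() is explicitly left as a TODO: nothing inserts None into the chunkings map yet, but once something does, tree assembly would panic here. One hypothetical way the inner Option could be surfaced as an error instead (illustrative only, not part of this commit):

    use eyre::eyre;

    type ChunkRef = [u8; 32]; // stand-in for RecursiveChunkRef

    fn resolve_content(
        key_string: &str,
        removed_entry: Option<Option<(ChunkRef, u64)>>, // outer: map hit; inner: stored content
    ) -> eyre::Result<(ChunkRef, u64)> {
        removed_entry
            .ok_or_else(|| eyre!("bad chunkings PMap: missing entry: {key_string:?}"))?
            .ok_or_else(|| eyre!("file store entry for {key_string:?} has no content"))
    }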