From bf150d61d896282fe864bb896cb942e0eda59b0d Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Sun, 21 Nov 2021 12:17:37 +0000
Subject: [PATCH] Prevent duplicate chunk insertions in new pipeline

---
 yama/src/pile.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/yama/src/pile.rs b/yama/src/pile.rs
index 2bc58db..544b667 100644
--- a/yama/src/pile.rs
+++ b/yama/src/pile.rs
@@ -87,10 +87,12 @@ pub fn existence_checker_stage(
     pile: Arc<Pile<RP>>,
     next_stage: Sender<(ChunkId, Vec<u8>)>,
 ) -> Sender<(ChunkId, Vec<u8>)> {
+    let shared_seen_set: Arc<Mutex<HashSet<ChunkId>>> = Default::default();
     let (tx, rx) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(32);
 
     // TODO would like something better for the networked case
     for _ in 0..get_number_of_workers("YAMA_EXISTENCE_CHECKERS") {
+        let shared_seen_set = shared_seen_set.clone();
         let next_stage = next_stage.clone();
         let rx = rx.clone();
         let pile = pile.clone();
@@ -99,6 +101,10 @@ pub fn existence_checker_stage(
             .spawn(move || {
                 while let Ok((chunk_id, chunk)) = rx.recv() {
                     // TODO handle errors properly
+                    let is_new = { shared_seen_set.lock().unwrap().insert(chunk_id) };
+                    if !is_new {
+                        continue;
+                    }
                     if !pile.chunk_exists(&chunk_id).unwrap() {
                         next_stage.send((chunk_id, chunk)).unwrap();
                     }