Prevent duplicate chunk insertions in new pipeline
All checks were successful
continuous-integration/drone the build was successful
All checks were successful
continuous-integration/drone the build was successful
This commit is contained in:
parent
a37acf3e74
commit
bf150d61d8
@ -87,10 +87,12 @@ pub fn existence_checker_stage<RP: RawPile>(
|
||||
pile: Arc<Pile<RP>>,
|
||||
next_stage: Sender<(ChunkId, Vec<u8>)>,
|
||||
) -> Sender<(ChunkId, Vec<u8>)> {
|
||||
let shared_seen_set: Arc<Mutex<HashSet<ChunkId>>> = Default::default();
|
||||
let (tx, rx) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(32);
|
||||
|
||||
// TODO would like something better for the networked case
|
||||
for _ in 0..get_number_of_workers("YAMA_EXISTENCE_CHECKERS") {
|
||||
let shared_seen_set = shared_seen_set.clone();
|
||||
let next_stage = next_stage.clone();
|
||||
let rx = rx.clone();
|
||||
let pile = pile.clone();
|
||||
@ -99,6 +101,10 @@ pub fn existence_checker_stage<RP: RawPile>(
|
||||
.spawn(move || {
|
||||
while let Ok((chunk_id, chunk)) = rx.recv() {
|
||||
// TODO handle errors properly
|
||||
let is_new = { shared_seen_set.lock().unwrap().insert(chunk_id) };
|
||||
if !is_new {
|
||||
continue;
|
||||
}
|
||||
if !pile.chunk_exists(&chunk_id).unwrap() {
|
||||
next_stage.send((chunk_id, chunk)).unwrap();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user