From 8eeafa762654f353454e366a5efa4e31da078827 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 21 Nov 2021 11:57:14 +0000 Subject: [PATCH] Fix hang after storing --- Cargo.lock | 4 ++-- datman/Cargo.toml | 2 +- yama/Cargo.toml | 2 +- yama/src/operations/storing.rs | 2 ++ yama/src/pile.rs | 17 ++++++++++------- yama/src/pile/compression.rs | 33 ++++++++++++++++++--------------- yama/src/pile/integrity.rs | 21 ++++++++++++--------- 7 files changed, 46 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2afcd9e..9afe12e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,7 +258,7 @@ dependencies = [ [[package]] name = "datman" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" dependencies = [ "anyhow", "arc-interner", @@ -1218,7 +1218,7 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "yama" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" dependencies = [ "anyhow", "blake", diff --git a/datman/Cargo.toml b/datman/Cargo.toml index f1c0a50..4d2e3f5 100644 --- a/datman/Cargo.toml +++ b/datman/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "datman" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" authors = ["Olivier 'reivilibre' "] edition = "2018" repository = "https://bics.ga/reivilibre/yama" diff --git a/yama/Cargo.toml b/yama/Cargo.toml index c8b4225..bb09dc0 100644 --- a/yama/Cargo.toml +++ b/yama/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yama" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" authors = ["Olivier 'reivilibre' "] edition = "2018" description = "Deduplicated, compressed and encrypted content pile manager" diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs index b788a9a..f9afa74 100644 --- a/yama/src/operations/storing.rs +++ b/yama/src/operations/storing.rs @@ -273,6 +273,8 @@ pub fn store_fully( progress_bar, num_workers, )?; + // must drop the pipeline to allow the threads to close + drop(pipeline); while let Ok(_) = control_rx.recv() { // TODO nothing for now. } diff --git a/yama/src/pile.rs b/yama/src/pile.rs index a2e2fb4..2bc58db 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -94,14 +94,17 @@ pub fn existence_checker_stage( let next_stage = next_stage.clone(); let rx = rx.clone(); let pile = pile.clone(); - std::thread::spawn(move || { - while let Ok((chunk_id, chunk)) = rx.recv() { - // TODO handle errors properly - if !pile.chunk_exists(&chunk_id).unwrap() { - next_stage.send((chunk_id, chunk)).unwrap(); + std::thread::Builder::new() + .name("yama exist?er".to_string()) + .spawn(move || { + while let Ok((chunk_id, chunk)) = rx.recv() { + // TODO handle errors properly + if !pile.chunk_exists(&chunk_id).unwrap() { + next_stage.send((chunk_id, chunk)).unwrap(); + } } - } - }); + }) + .unwrap(); } tx diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index b1afd9e..0675ea0 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -269,21 +269,24 @@ impl RawPile for RawPileCompressor { let receiver = receiver.clone(); let controller_send = controller_send.clone(); let this = (*self).clone(); - thread::spawn(move || { - let worker_id = Arc::new(format!("compressor-{}", compressor_number)); - if let Err(err) = this.storage_pipeline_worker( - subsequent_pipeline, - receiver, - worker_id.to_string(), - ) { - controller_send - .send(ControllerMessage::Failure { - worker_id, - error_message: format!("err {:?}", err), - }) - .expect("This is BAD: failed to send failure message to controller."); - } - }); + thread::Builder::new() + .name(format!("yama Pcomp{}", compressor_number)) + .spawn(move || { + let worker_id = Arc::new(format!("compressor-{}", compressor_number)); + if let Err(err) = this.storage_pipeline_worker( + subsequent_pipeline, + receiver, + worker_id.to_string(), + ) { + controller_send + .send(ControllerMessage::Failure { + worker_id, + error_message: format!("err {:?}", err), + }) + .expect("This is BAD: failed to send failure message to controller."); + } + }) + .unwrap(); } Ok(input_to_this_stage) diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index b741736..34ac52f 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -126,15 +126,18 @@ impl RawPile for RawPileIntegrityChecker { .underlying .build_storage_pipeline(settings, controller_send)?; let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec)>(64); - std::thread::spawn(move || { - while let Ok((chunk_id, mut chunk)) = receiver.recv() { - let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED); - hasher.write(&chunk); - let computed_hash = hasher.finish().to_be_bytes(); - chunk.extend_from_slice(&computed_hash); - next_stage.send((chunk_id, chunk)).unwrap(); - } - }); + std::thread::Builder::new() + .name("yama integrity".to_string()) + .spawn(move || { + while let Ok((chunk_id, mut chunk)) = receiver.recv() { + let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED); + hasher.write(&chunk); + let computed_hash = hasher.finish().to_be_bytes(); + chunk.extend_from_slice(&computed_hash); + next_stage.send((chunk_id, chunk)).unwrap(); + } + }) + .unwrap(); Ok(input) } }