Fix hang after storing
continuous-integration/drone the build failed Details

This commit is contained in:
Olivier 'reivilibre' 2021-11-21 11:57:14 +00:00
parent 155d31626e
commit 8eeafa7626
7 changed files with 46 additions and 35 deletions

4
Cargo.lock generated
View File

@ -258,7 +258,7 @@ dependencies = [
[[package]] [[package]]
name = "datman" name = "datman"
version = "0.5.0-alpha.1" version = "0.5.0-alpha.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arc-interner", "arc-interner",
@ -1218,7 +1218,7 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "yama" name = "yama"
version = "0.5.0-alpha.1" version = "0.5.0-alpha.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"blake", "blake",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "datman" name = "datman"
version = "0.5.0-alpha.1" version = "0.5.0-alpha.2"
authors = ["Olivier 'reivilibre' <olivier@librepush.net>"] authors = ["Olivier 'reivilibre' <olivier@librepush.net>"]
edition = "2018" edition = "2018"
repository = "https://bics.ga/reivilibre/yama" repository = "https://bics.ga/reivilibre/yama"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "yama" name = "yama"
version = "0.5.0-alpha.1" version = "0.5.0-alpha.2"
authors = ["Olivier 'reivilibre' <olivier@librepush.net>"] authors = ["Olivier 'reivilibre' <olivier@librepush.net>"]
edition = "2018" edition = "2018"
description = "Deduplicated, compressed and encrypted content pile manager" description = "Deduplicated, compressed and encrypted content pile manager"

View File

@ -273,6 +273,8 @@ pub fn store_fully<PT: ProgressTracker>(
progress_bar, progress_bar,
num_workers, num_workers,
)?; )?;
// must drop the pipeline to allow the threads to close
drop(pipeline);
while let Ok(_) = control_rx.recv() { while let Ok(_) = control_rx.recv() {
// TODO nothing for now. // TODO nothing for now.
} }

View File

@ -94,14 +94,17 @@ pub fn existence_checker_stage<RP: RawPile>(
let next_stage = next_stage.clone(); let next_stage = next_stage.clone();
let rx = rx.clone(); let rx = rx.clone();
let pile = pile.clone(); let pile = pile.clone();
std::thread::spawn(move || { std::thread::Builder::new()
while let Ok((chunk_id, chunk)) = rx.recv() { .name("yama exist?er".to_string())
// TODO handle errors properly .spawn(move || {
if !pile.chunk_exists(&chunk_id).unwrap() { while let Ok((chunk_id, chunk)) = rx.recv() {
next_stage.send((chunk_id, chunk)).unwrap(); // TODO handle errors properly
if !pile.chunk_exists(&chunk_id).unwrap() {
next_stage.send((chunk_id, chunk)).unwrap();
}
} }
} })
}); .unwrap();
} }
tx tx

View File

@ -269,21 +269,24 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
let receiver = receiver.clone(); let receiver = receiver.clone();
let controller_send = controller_send.clone(); let controller_send = controller_send.clone();
let this = (*self).clone(); let this = (*self).clone();
thread::spawn(move || { thread::Builder::new()
let worker_id = Arc::new(format!("compressor-{}", compressor_number)); .name(format!("yama Pcomp{}", compressor_number))
if let Err(err) = this.storage_pipeline_worker( .spawn(move || {
subsequent_pipeline, let worker_id = Arc::new(format!("compressor-{}", compressor_number));
receiver, if let Err(err) = this.storage_pipeline_worker(
worker_id.to_string(), subsequent_pipeline,
) { receiver,
controller_send worker_id.to_string(),
.send(ControllerMessage::Failure { ) {
worker_id, controller_send
error_message: format!("err {:?}", err), .send(ControllerMessage::Failure {
}) worker_id,
.expect("This is BAD: failed to send failure message to controller."); error_message: format!("err {:?}", err),
} })
}); .expect("This is BAD: failed to send failure message to controller.");
}
})
.unwrap();
} }
Ok(input_to_this_stage) Ok(input_to_this_stage)

View File

@ -126,15 +126,18 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
.underlying .underlying
.build_storage_pipeline(settings, controller_send)?; .build_storage_pipeline(settings, controller_send)?;
let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(64); let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(64);
std::thread::spawn(move || { std::thread::Builder::new()
while let Ok((chunk_id, mut chunk)) = receiver.recv() { .name("yama integrity".to_string())
let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED); .spawn(move || {
hasher.write(&chunk); while let Ok((chunk_id, mut chunk)) = receiver.recv() {
let computed_hash = hasher.finish().to_be_bytes(); let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED);
chunk.extend_from_slice(&computed_hash); hasher.write(&chunk);
next_stage.send((chunk_id, chunk)).unwrap(); let computed_hash = hasher.finish().to_be_bytes();
} chunk.extend_from_slice(&computed_hash);
}); next_stage.send((chunk_id, chunk)).unwrap();
}
})
.unwrap();
Ok(input) Ok(input)
} }
} }