Exhaust controller receiver on finish so that we don't exit too soon
continuous-integration/drone the build failed Details

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 15:05:22 +00:00
parent b4cac57ec5
commit 155d31626e
2 changed files with 17 additions and 5 deletions

View File

@ -79,7 +79,7 @@ pub fn chunking<
write_message(&mut write, &parent)?;
write.flush()?;
let writing_pipeline = if use_writing_pipeline {
let (writing_pipeline, control_rx) = if use_writing_pipeline {
let sps = StoragePipelineSettings {
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
compressor_input_bound: 32,
@ -87,11 +87,14 @@ pub fn chunking<
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pipeline = raw_pile.build_storage_pipeline(sps, control_tx)?;
Some(ResponderWritingPipeline {
pipeline_submission: pipeline,
})
(
Some(ResponderWritingPipeline {
pipeline_submission: pipeline,
}),
Some(control_rx),
)
} else {
None
(None, None)
};
let (r_handle, w_handle, join_handles) = Responder::start(
@ -111,6 +114,12 @@ pub fn chunking<
let read = r_handle.join().unwrap();
let write = w_handle.join().unwrap();
if let Some(control_rx) = control_rx {
while let Ok(_) = control_rx.recv() {
// TODO nop
}
}
info!("Remote finished chunking.");
Ok((read, write))

View File

@ -273,6 +273,9 @@ pub fn store_fully<PT: ProgressTracker>(
progress_bar,
num_workers,
)?;
while let Ok(_) = control_rx.recv() {
// TODO nothing for now.
}
} else {
store(
&root_dir,