diff --git a/datman/src/remote/backup_source_requester.rs b/datman/src/remote/backup_source_requester.rs index c9b8ed4..dda2786 100644 --- a/datman/src/remote/backup_source_requester.rs +++ b/datman/src/remote/backup_source_requester.rs @@ -79,7 +79,7 @@ pub fn chunking< write_message(&mut write, &parent)?; write.flush()?; - let writing_pipeline = if use_writing_pipeline { + let (writing_pipeline, control_rx) = if use_writing_pipeline { let sps = StoragePipelineSettings { num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, compressor_input_bound: 32, @@ -87,11 +87,14 @@ pub fn chunking< }; let (control_tx, control_rx) = crossbeam_channel::unbounded(); let pipeline = raw_pile.build_storage_pipeline(sps, control_tx)?; - Some(ResponderWritingPipeline { - pipeline_submission: pipeline, - }) + ( + Some(ResponderWritingPipeline { + pipeline_submission: pipeline, + }), + Some(control_rx), + ) } else { - None + (None, None) }; let (r_handle, w_handle, join_handles) = Responder::start( @@ -111,6 +114,12 @@ pub fn chunking< let read = r_handle.join().unwrap(); let write = w_handle.join().unwrap(); + if let Some(control_rx) = control_rx { + while let Ok(_) = control_rx.recv() { + // TODO nop + } + } + info!("Remote finished chunking."); Ok((read, write)) diff --git a/yama/src/operations/storing.rs b/yama/src/operations/storing.rs index aa8e78f..b788a9a 100644 --- a/yama/src/operations/storing.rs +++ b/yama/src/operations/storing.rs @@ -273,6 +273,9 @@ pub fn store_fully( progress_bar, num_workers, )?; + while let Ok(_) = control_rx.recv() { + // TODO nothing for now. + } } else { store( &root_dir,