Implement compressor's pipeline stage

This commit is contained in:
Olivier 'reivilibre' 2021-11-19 21:40:20 +00:00
parent ea2f48f437
commit b93cbe89e0

View File

@ -169,8 +169,46 @@ impl<R: RawPile> RawPileCompressor<R> {
next_stage: Sender<(ChunkId, Vec<u8>)>,
input: Receiver<(ChunkId, Vec<u8>)>,
controller_send: &Sender<ControllerMessage>,
worker_id: String,
) -> anyhow::Result<()> {
todo!();
// the worker ID has to live forever, so we leak it :/
let worker_id: &'static str = Box::leak(worker_id.into_boxed_str());
metrics::register_histogram!(
"compressor_idle_time",
metrics::Unit::Seconds,
"Time spent waiting between chunks",
"id" => worker_id
);
metrics::register_counter!(
"compressor_bytes_input",
metrics::Unit::Bytes,
"Number of bytes input into the compressor.",
"id" => worker_id
);
metrics::register_counter!(
"compressor_bytes_output",
metrics::Unit::Bytes,
"Number of bytes output from the compressor.",
"id" => worker_id
);
metrics::register_counter!(
"compressor_chunks_processed",
metrics::Unit::Count,
"Number of bytes input into the compressor.",
"id" => worker_id
);
let mut compressor = Compressor::with_dict(self.settings.dictionary.as_ref().clone());
let level = self.settings.level;
while let Ok((chunk_id, bytes)) = input.recv() {
let in_bytes = bytes.len();
let bytes = compressor.compress(&bytes, level)?;
let out_bytes = bytes.len();
next_stage.send((chunk_id, bytes))?;
metrics::counter!("compressor_bytes_input", in_bytes as u64, "id" => worker_id);
metrics::counter!("compressor_bytes_output", out_bytes as u64, "id" => worker_id);
}
Ok(())
}
}
@ -187,6 +225,7 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
Ok(None)
}
}
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
let compressed = self.compress(value)?;
self.underlying.write(kind, key, &compressed)
@ -233,9 +272,12 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
let this = (*self).clone();
thread::spawn(move || {
let worker_id = Arc::new(format!("compressor-{}", compressor_number));
if let Err(err) =
this.storage_pipeline_worker(subsequent_pipeline, receiver, &controller_send)
{
if let Err(err) = this.storage_pipeline_worker(
subsequent_pipeline,
receiver,
&controller_send,
worker_id.to_string(),
) {
controller_send
.send(ControllerMessage::Failure {
worker_id,