From b93cbe89e0447289bddaded9b567792f8c1b1e41 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 19 Nov 2021 21:40:20 +0000 Subject: [PATCH] Implement compressor's pipeline stage --- yama/src/pile/compression.rs | 50 +++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index 3a80588..aa293ae 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -169,8 +169,46 @@ impl RawPileCompressor { next_stage: Sender<(ChunkId, Vec)>, input: Receiver<(ChunkId, Vec)>, controller_send: &Sender, + worker_id: String, ) -> anyhow::Result<()> { - todo!(); + // the worker ID has to live forever, so we leak it :/ + let worker_id: &'static str = Box::leak(worker_id.into_boxed_str()); + metrics::register_histogram!( + "compressor_idle_time", + metrics::Unit::Seconds, + "Time spent waiting between chunks", + "id" => worker_id + ); + metrics::register_counter!( + "compressor_bytes_input", + metrics::Unit::Bytes, + "Number of bytes input into the compressor.", + "id" => worker_id + ); + metrics::register_counter!( + "compressor_bytes_output", + metrics::Unit::Bytes, + "Number of bytes output from the compressor.", + "id" => worker_id + ); + metrics::register_counter!( + "compressor_chunks_processed", + metrics::Unit::Count, + "Number of bytes input into the compressor.", + "id" => worker_id + ); + + let mut compressor = Compressor::with_dict(self.settings.dictionary.as_ref().clone()); + let level = self.settings.level; + while let Ok((chunk_id, bytes)) = input.recv() { + let in_bytes = bytes.len(); + let bytes = compressor.compress(&bytes, level)?; + let out_bytes = bytes.len(); + next_stage.send((chunk_id, bytes))?; + metrics::counter!("compressor_bytes_input", in_bytes as u64, "id" => worker_id); + metrics::counter!("compressor_bytes_output", out_bytes as u64, "id" => worker_id); + } + Ok(()) } } @@ -187,6 +225,7 @@ impl RawPile for RawPileCompressor { Ok(None) } } + fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> { let compressed = self.compress(value)?; self.underlying.write(kind, key, &compressed) @@ -233,9 +272,12 @@ impl RawPile for RawPileCompressor { let this = (*self).clone(); thread::spawn(move || { let worker_id = Arc::new(format!("compressor-{}", compressor_number)); - if let Err(err) = - this.storage_pipeline_worker(subsequent_pipeline, receiver, &controller_send) - { + if let Err(err) = this.storage_pipeline_worker( + subsequent_pipeline, + receiver, + &controller_send, + worker_id.to_string(), + ) { controller_send .send(ControllerMessage::Failure { worker_id,