From 0a02eea478e825437c420ca2d82ef78bbdb3ddfe Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 20 Nov 2021 10:33:30 +0000 Subject: [PATCH] Add primitive pipelined integrity stage --- yama/src/pile/integrity.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index 6cabb7c..4d2cbcd 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -120,6 +120,19 @@ impl RawPile for RawPileIntegrityChecker { settings: StoragePipelineSettings, controller_send: Sender, ) -> anyhow::Result)>> { - todo!() + // TODO primitive implementation but good enough for now. + // May want metrics later? + let next_stage = self.underlying.build_storage_pipeline(settings, controller_send)?; + let (input, receiver) = crossbeam_channel::bounded::<(ChunkId, Vec)>(64); + std::thread::spawn(move || { + while let Ok((chunk_id, mut chunk)) = receiver.recv() { + let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED); + hasher.write(&chunk); + let computed_hash = hasher.finish().to_be_bytes(); + chunk.extend_from_slice(&computed_hash); + next_stage.send((chunk_id, chunk)).unwrap(); + } + }); + Ok(input) } }