diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index 5bdba6f..df19d78 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -151,6 +151,7 @@ const PROGRESS_SPANS: &'static [&'static str] = &[ "unpack_files", "expand_chunkrefs", "extract_files", + "check_all_chunks", ]; #[tokio::main] diff --git a/yama/src/bin/yama.rs b/yama/src/bin/yama.rs index ad91350..faa217f 100644 --- a/yama/src/bin/yama.rs +++ b/yama/src/bin/yama.rs @@ -276,6 +276,7 @@ const PROGRESS_SPANS: &'static [&'static str] = &[ "unpack_files", "expand_chunkrefs", "extract_files", + "check_all_chunks", ]; #[tokio::main] @@ -826,9 +827,6 @@ async fn main() -> eyre::Result<()> { if pointers { bail!("pointers check not implemented yet. Try -2"); } - if intensive { - bail!("intensive check not implemented yet. Try -2"); - } let pile_connector_path = Path::new("."); let keyring = pre_open_keyring(&pile_connector_path).await?; @@ -851,6 +849,14 @@ async fn main() -> eyre::Result<()> { .context("shallow check failed")?; } + if intensive { + let all_bloblogs = pwc.localcache.read().await?.list_bloblogs().await?; + + check::check_all_chunks_in_bloblogs(&pwc, &all_bloblogs) + .await + .context("intensive check failed")?; + } + Arc::try_unwrap(pwc) .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? .close() diff --git a/yama/src/check.rs b/yama/src/check.rs index 7b08197..f953b32 100644 --- a/yama/src/check.rs +++ b/yama/src/check.rs @@ -1,12 +1,20 @@ use crate::extract::expand_chunkrefs; use crate::pile_with_cache::PileWithCache; -use eyre::{bail, ContextCompat}; -use std::collections::BTreeSet; +use crate::retriever::decompressor::DECOMPRESS_CAPACITY; +use crate::PROGRESS_BAR_STYLE; +use eyre::{bail, eyre, Context, ContextCompat}; +use flume::{Receiver, Sender}; +use indicatif::ProgressStyle; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; -use tracing::info; +use tokio::task::JoinSet; +use tracing::{error, info, info_span, Instrument, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use yama_midlevel_crypto::chunk_id::ChunkId; +use yama_pile::definitions::BloblogId; use yama_pile::tree::TreeNode; use yama_wormfile::boxed::BoxedWormFileProvider; +use zstd::bulk::Decompressor; /// Check that all pointers point to chunks that exist **in our local cache**. pub async fn check_pointers_point_to_indexed_chunks( @@ -62,3 +70,154 @@ pub async fn check_pointers_point_to_indexed_chunks( Ok(()) } + +/// Checks all the chunks in the bloblog and then returns the number of chunks that were checked. +pub async fn check_all_chunk_hashes_in_bloblog( + pwc: &Arc>, + bloblog_id: BloblogId, +) -> eyre::Result { + let mut decompressor = match &pwc.pile.pile_config.zstd_dict { + Some(dict) => Decompressor::with_dictionary(dict)?, + None => Decompressor::new()?, + }; + let chunk_id_key = &pwc.pile.pile_config.chunk_id_key; + + let mut bloblog = pwc + .pile + .read_bloblog(bloblog_id) + .await + .with_context(|| format!("could not open bloblog for checking: {bloblog_id:?}"))?; + let offsets_and_chunks_to_read: BTreeMap = bloblog + .footer() + .chunks + .iter() + .map(|(chunk_id, locator)| (locator.offset, *chunk_id)) + .collect(); + + let mut buf = Vec::with_capacity(DECOMPRESS_CAPACITY); + + let mut checked = 0; + + for (_, chunk_id) in offsets_and_chunks_to_read { + let blob = bloblog + .read_chunk(chunk_id) + .await? + .context("missing chunk")?; + + (decompressor, buf) = tokio::task::spawn_blocking(move || { + decompressor + .decompress_to_buffer(&blob, &mut buf) + .with_context(|| format!("failed to decompress {chunk_id:?} in {bloblog_id:?}"))?; + Ok::<_, eyre::Error>((decompressor, buf)) + }) + .await??; + + if !chunk_id.verify(&buf, chunk_id_key) { + bail!("verification failure: chunk {chunk_id:?} in bloblog {bloblog_id:?} is corrupt!"); + } + + checked += 1; + } + + Ok(checked) +} + +pub async fn check_all_chunks_in_bloblogs( + pwc: &Arc>, + bloblogs: &BTreeSet, +) -> eyre::Result<()> { + let prog_span = info_span!("check_all_chunks"); + async { + let prog_span = Span::current(); + prog_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); + prog_span.pb_set_length(bloblogs.len() as u64); + prog_span.pb_set_message("checking all bloblogs"); + + let mut workers = JoinSet::new(); + + let (bloblog_id_tx, bloblog_id_rx) = flume::bounded(16); + let (progress_tx, progress_rx) = flume::bounded(16); + + for _ in 0..4 { + let pwc = pwc.clone(); + let bloblog_id_rx = bloblog_id_rx.clone(); + let progress_tx = progress_tx.clone(); + workers.spawn(async { + if let Err(err) = + check_all_chunks_in_bloblogs_worker(pwc, bloblog_id_rx, progress_tx).await + { + error!("Error in chunk checker worker: {err:?}") + } + }); + } + + drop(progress_tx); + drop(bloblog_id_rx); + + let mut success = true; + let mut num_bloblogs_outstanding = bloblogs.len(); + let mut total_chunks_checked = 0u64; + + tokio::join!( + async move { + for bloblog_id in bloblogs { + bloblog_id_tx + .send_async(*bloblog_id) + .await + .expect("can no longer submit new bloblogs to be checked; fault?"); + } + drop(bloblog_id_tx); + }, + async { + while let Ok(res) = progress_rx.recv_async().await { + match res { + Ok(chunks_checked) => { + total_chunks_checked += chunks_checked; + } + Err(err) => { + error!("check failure: {err:?}"); + success = false; + } + } + prog_span.pb_inc(1); + num_bloblogs_outstanding = num_bloblogs_outstanding + .checked_sub(1) + .expect("more bloblogs progress reports than expected?"); + } + } + ); + + if num_bloblogs_outstanding > 0 { + bail!("{num_bloblogs_outstanding} bloblogs outstanding somehow"); + } + + info!("{total_chunks_checked} chunks checked!"); + + if !success { + bail!("There were chunk check failures."); + } + + Ok(()) + } + .instrument(prog_span) + .await +} + +pub async fn check_all_chunks_in_bloblogs_worker( + pwc: Arc>, + bloblogs_rx: Receiver, + progress_tx: Sender>, +) -> eyre::Result<()> { + while let Ok(bloblog_id) = bloblogs_rx.recv_async().await { + let check = check_all_chunk_hashes_in_bloblog(&pwc, bloblog_id).await; + progress_tx + .send_async(check) + .await + .map_err(|_| eyre!("check progress tx shut down"))?; + } + Ok(()) +} diff --git a/yama_localcache/src/lib.rs b/yama_localcache/src/lib.rs index 3f372df..b7e9f36 100644 --- a/yama_localcache/src/lib.rs +++ b/yama_localcache/src/lib.rs @@ -337,6 +337,21 @@ impl StoreConnection { row_results.into_iter().collect() } + pub async fn list_bloblogs(&mut self) -> eyre::Result> { + let row_results = query!( + " + SELECT bloblog_sha256 FROM bloblogs + " + ) + .map(|row| { + BloblogId::try_from(row.bloblog_sha256.as_ref()) + .context("failed to decode BloblogId in local cache") + }) + .fetch_all(&mut *self.conn) + .await?; + row_results.into_iter().collect() + } + pub async fn is_chunk_new(&mut self, chunk_id: ChunkId) -> eyre::Result { let chunk_id_text = chunk_id.to_string(); let is_new = query!( diff --git a/yama_pile/src/bloblogs.rs b/yama_pile/src/bloblogs.rs index 0a37247..a3c8d28 100644 --- a/yama_pile/src/bloblogs.rs +++ b/yama_pile/src/bloblogs.rs @@ -211,4 +211,8 @@ impl BloblogReader { None => Ok(None), } } + + pub fn footer(&self) -> &BloblogFooter { + &self.footer + } }