Add check routine that checks all chunk hashes

Olivier 'reivilibre' 2023-08-15 20:13:17 +01:00
parent 6f0e3de350
commit feb05cfecf
5 changed files with 191 additions and 6 deletions

View File

@@ -151,6 +151,7 @@ const PROGRESS_SPANS: &'static [&'static str] = &[
     "unpack_files",
     "expand_chunkrefs",
     "extract_files",
+    "check_all_chunks",
 ];
 
 #[tokio::main]
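For context, PROGRESS_SPANS is the allow-list of tracing span names that get rendered as indicatif progress bars, which is why the new check_all_chunks span has to be registered here (and again in the second binary below). A rough sketch of how an allow-listed span drives a bar via tracing-indicatif; the IndicatifSpanExt calls mirror the ones used in check.rs later in this commit, while the subscriber setup noted in the comment is an assumption:

    use indicatif::ProgressStyle;
    use tracing::info_span;
    use tracing_indicatif::span_ext::IndicatifSpanExt;

    fn demo() {
        // Assumes a tracing_indicatif::IndicatifLayer is installed on the
        // global subscriber; yama only grants bars to allow-listed span names.
        let span = info_span!("check_all_chunks");
        span.pb_set_style(&ProgressStyle::default_bar());
        span.pb_set_length(42); // hypothetical number of work items
        span.pb_set_message("checking all bloblogs");

        let _entered = span.enter(); // the bar renders while the span is active
        span.pb_inc(1); // advance as each item completes
    }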

View File

@@ -276,6 +276,7 @@ const PROGRESS_SPANS: &'static [&'static str] = &[
     "unpack_files",
     "expand_chunkrefs",
     "extract_files",
+    "check_all_chunks",
 ];
 
 #[tokio::main]
@@ -826,9 +827,6 @@ async fn main() -> eyre::Result<()> {
             if pointers {
                 bail!("pointers check not implemented yet. Try -2");
             }
-            if intensive {
-                bail!("intensive check not implemented yet. Try -2");
-            }
 
             let pile_connector_path = Path::new(".");
             let keyring = pre_open_keyring(&pile_connector_path).await?;
@@ -851,6 +849,14 @@ async fn main() -> eyre::Result<()> {
                     .context("shallow check failed")?;
             }
 
+            if intensive {
+                let all_bloblogs = pwc.localcache.read().await?.list_bloblogs().await?;
+                check::check_all_chunks_in_bloblogs(&pwc, &all_bloblogs)
+                    .await
+                    .context("intensive check failed")?;
+            }
+
             Arc::try_unwrap(pwc)
                 .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
                 .close()
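Worth noting: pwc lives in an Arc because the new checker workers clone it, and the graceful close at the end relies on Arc::try_unwrap, which only yields the inner value once every clone has been dropped. A toy sketch of that shutdown pattern (Pile is a stand-in type, not from the codebase):

    use std::sync::Arc;

    #[derive(Debug)]
    struct Pile; // stand-in for PileWithCache

    impl Pile {
        // close() consumes self, so it requires sole ownership.
        fn close(self) {
            println!("closed cleanly");
        }
    }

    fn main() {
        let pile = Arc::new(Pile);
        let worker_handle = Arc::clone(&pile);

        // While a worker still holds a clone, try_unwrap fails and returns the Arc.
        let pile = Arc::try_unwrap(pile).expect_err("a worker clone is still alive");

        // Once every clone is dropped, the sole owner can unwrap and close cleanly.
        drop(worker_handle);
        Arc::try_unwrap(pile)
            .expect("pile still in use; can't close down gracefully")
            .close();
    }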

View File

@@ -1,12 +1,20 @@
 use crate::extract::expand_chunkrefs;
 use crate::pile_with_cache::PileWithCache;
-use eyre::{bail, ContextCompat};
-use std::collections::BTreeSet;
+use crate::retriever::decompressor::DECOMPRESS_CAPACITY;
+use crate::PROGRESS_BAR_STYLE;
+use eyre::{bail, eyre, Context, ContextCompat};
+use flume::{Receiver, Sender};
+use indicatif::ProgressStyle;
+use std::collections::{BTreeMap, BTreeSet};
 use std::sync::Arc;
-use tracing::info;
+use tokio::task::JoinSet;
+use tracing::{error, info, info_span, Instrument, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use yama_midlevel_crypto::chunk_id::ChunkId;
+use yama_pile::definitions::BloblogId;
 use yama_pile::tree::TreeNode;
 use yama_wormfile::boxed::BoxedWormFileProvider;
+use zstd::bulk::Decompressor;
 
 /// Check that all pointers point to chunks that exist **in our local cache**.
 pub async fn check_pointers_point_to_indexed_chunks(
@@ -62,3 +70,154 @@ pub async fn check_pointers_point_to_indexed_chunks(
 
     Ok(())
 }
+
+/// Checks all the chunks in the bloblog and then returns the number of chunks that were checked.
+pub async fn check_all_chunk_hashes_in_bloblog(
+    pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
+    bloblog_id: BloblogId,
+) -> eyre::Result<u64> {
+    let mut decompressor = match &pwc.pile.pile_config.zstd_dict {
+        Some(dict) => Decompressor::with_dictionary(dict)?,
+        None => Decompressor::new()?,
+    };
+    let chunk_id_key = &pwc.pile.pile_config.chunk_id_key;
+
+    let mut bloblog = pwc
+        .pile
+        .read_bloblog(bloblog_id)
+        .await
+        .with_context(|| format!("could not open bloblog for checking: {bloblog_id:?}"))?;
+    let offsets_and_chunks_to_read: BTreeMap<u64, ChunkId> = bloblog
+        .footer()
+        .chunks
+        .iter()
+        .map(|(chunk_id, locator)| (locator.offset, *chunk_id))
+        .collect();
+
+    let mut buf = Vec::with_capacity(DECOMPRESS_CAPACITY);
+    let mut checked = 0;
+    for (_, chunk_id) in offsets_and_chunks_to_read {
+        let blob = bloblog
+            .read_chunk(chunk_id)
+            .await?
+            .context("missing chunk")?;
+        (decompressor, buf) = tokio::task::spawn_blocking(move || {
+            decompressor
+                .decompress_to_buffer(&blob, &mut buf)
+                .with_context(|| format!("failed to decompress {chunk_id:?} in {bloblog_id:?}"))?;
+            Ok::<_, eyre::Error>((decompressor, buf))
+        })
+        .await??;
+        if !chunk_id.verify(&buf, chunk_id_key) {
+            bail!("verification failure: chunk {chunk_id:?} in bloblog {bloblog_id:?} is corrupt!");
+        }
+        checked += 1;
+    }
+
+    Ok(checked)
+}
+
+pub async fn check_all_chunks_in_bloblogs(
+    pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
+    bloblogs: &BTreeSet<BloblogId>,
+) -> eyre::Result<()> {
+    let prog_span = info_span!("check_all_chunks");
+    async {
+        let prog_span = Span::current();
+        prog_span.pb_set_style(
+            &ProgressStyle::default_bar()
+                .template(PROGRESS_BAR_STYLE)
+                .unwrap(),
+        );
+        prog_span.pb_set_length(bloblogs.len() as u64);
+        prog_span.pb_set_message("checking all bloblogs");
+
+        let mut workers = JoinSet::new();
+        let (bloblog_id_tx, bloblog_id_rx) = flume::bounded(16);
+        let (progress_tx, progress_rx) = flume::bounded(16);
+
+        for _ in 0..4 {
+            let pwc = pwc.clone();
+            let bloblog_id_rx = bloblog_id_rx.clone();
+            let progress_tx = progress_tx.clone();
+            workers.spawn(async {
+                if let Err(err) =
+                    check_all_chunks_in_bloblogs_worker(pwc, bloblog_id_rx, progress_tx).await
+                {
+                    error!("Error in chunk checker worker: {err:?}")
+                }
+            });
+        }
+        drop(progress_tx);
+        drop(bloblog_id_rx);
+
+        let mut success = true;
+        let mut num_bloblogs_outstanding = bloblogs.len();
+        let mut total_chunks_checked = 0u64;
+        tokio::join!(
+            async move {
+                for bloblog_id in bloblogs {
+                    bloblog_id_tx
+                        .send_async(*bloblog_id)
+                        .await
+                        .expect("can no longer submit new bloblogs to be checked; fault?");
+                }
+                drop(bloblog_id_tx);
+            },
+            async {
+                while let Ok(res) = progress_rx.recv_async().await {
+                    match res {
+                        Ok(chunks_checked) => {
+                            total_chunks_checked += chunks_checked;
+                        }
+                        Err(err) => {
+                            error!("check failure: {err:?}");
+                            success = false;
+                        }
+                    }
+                    prog_span.pb_inc(1);
+                    num_bloblogs_outstanding = num_bloblogs_outstanding
+                        .checked_sub(1)
+                        .expect("more bloblogs progress reports than expected?");
+                }
+            }
+        );
+
+        if num_bloblogs_outstanding > 0 {
+            bail!("{num_bloblogs_outstanding} bloblogs outstanding somehow");
+        }
+
+        info!("{total_chunks_checked} chunks checked!");
+
+        if !success {
+            bail!("There were chunk check failures.");
+        }
+
+        Ok(())
+    }
+    .instrument(prog_span)
+    .await
+}
+
+pub async fn check_all_chunks_in_bloblogs_worker(
+    pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
+    bloblogs_rx: Receiver<BloblogId>,
+    progress_tx: Sender<eyre::Result<u64>>,
+) -> eyre::Result<()> {
+    while let Ok(bloblog_id) = bloblogs_rx.recv_async().await {
+        let check = check_all_chunk_hashes_in_bloblog(&pwc, bloblog_id).await;
+        progress_tx
+            .send_async(check)
+            .await
+            .map_err(|_| eyre!("check progress tx shut down"))?;
+    }
+    Ok(())
+}
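One detail in check_all_chunk_hashes_in_bloblog deserves a note: zstd decompression is CPU-bound, so it runs under spawn_blocking, but the Decompressor and the scratch buffer are reused across chunks rather than rebuilt each iteration. The closure takes ownership of both and hands them back through its return value, which the caller reclaims with a destructuring assignment. A minimal standalone sketch of that ownership round-trip, with stand-in state instead of the real decompressor:

    async fn reuse_state_across_blocking_calls() -> eyre::Result<()> {
        // Stand-ins for the zstd Decompressor and the DECOMPRESS_CAPACITY buffer.
        let mut decoder_state: u64 = 0;
        let mut buf: Vec<u8> = Vec::with_capacity(1024);

        for _ in 0..3 {
            // Move both into the blocking closure (off the async executor),
            // then take ownership back from the closure's return value.
            (decoder_state, buf) = tokio::task::spawn_blocking(move || {
                buf.clear();
                buf.extend_from_slice(b"decompressed bytes");
                Ok::<_, eyre::Error>((decoder_state + 1, buf))
            })
            .await??; // first ? is a panicked/cancelled task, second ? the closure's error
        }
        Ok(())
    }

Shuttling owned state through spawn_blocking like this avoids both a Mutex and a fresh Decompressor allocation per chunk.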

View File

@@ -337,6 +337,21 @@ impl<const RW: bool> StoreConnection<RW> {
         row_results.into_iter().collect()
     }
 
+    pub async fn list_bloblogs(&mut self) -> eyre::Result<BTreeSet<BloblogId>> {
+        let row_results = query!(
+            "
+            SELECT bloblog_sha256 FROM bloblogs
+            "
+        )
+        .map(|row| {
+            BloblogId::try_from(row.bloblog_sha256.as_ref())
+                .context("failed to decode BloblogId in local cache")
+        })
+        .fetch_all(&mut *self.conn)
+        .await?;
+        row_results.into_iter().collect()
+    }
+
     pub async fn is_chunk_new(&mut self, chunk_id: ChunkId) -> eyre::Result<bool> {
         let chunk_id_text = chunk_id.to_string();
         let is_new = query!(
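A small idiom worth flagging in list_bloblogs: fetch_all yields a Vec of per-row eyre::Result<BloblogId> values (the .map can fail on a bad row), and the trailing row_results.into_iter().collect() flips that into a single eyre::Result<BTreeSet<BloblogId>>, stopping at the first bad row. The same trick in miniature, with toy types:

    use std::collections::BTreeSet;

    fn main() {
        // Vec<Result<T, E>> collects into Result<BTreeSet<T>, E>:
        // Ok(set) if every element is Ok, otherwise the first Err encountered.
        let rows: Vec<Result<u32, String>> = vec![Ok(2), Ok(1), Ok(2)];
        let set: Result<BTreeSet<u32>, String> = rows.into_iter().collect();
        assert_eq!(set, Ok(BTreeSet::from([1, 2])));

        let bad: Vec<Result<u32, String>> = vec![Ok(1), Err("corrupt row".into())];
        assert!(bad.into_iter().collect::<Result<BTreeSet<u32>, String>>().is_err());
    }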

View File

@@ -211,4 +211,8 @@ impl<R: WormFileReader + Unpin> BloblogReader<R> {
             None => Ok(None),
         }
     }
+
+    pub fn footer(&self) -> &BloblogFooter {
+        &self.footer
+    }
 }