diff --git a/yama/src/operations/checking.rs b/yama/src/operations/checking.rs index 8b13789..37c85ba 100644 --- a/yama/src/operations/checking.rs +++ b/yama/src/operations/checking.rs @@ -1 +1,223 @@ +use crate::chunking::RecursiveUnchunker; +use crate::commands::retrieve_tree_node; +use crate::definitions::{ChunkId, TreeNode}; +use crate::pile::{Keyspace, Pile, RawPile}; +use anyhow::bail; +use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use log::{error, info, warn}; +use std::collections::HashSet; +use std::io::Write; +use std::sync::Mutex; +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +pub enum VacuumMode { + NoVacuum, + DryRunVacuum, + Vacuum, +} + +pub struct NullWriter {} + +impl Write for NullWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +/// Mark-and-sweep style vacuuming system. +pub struct VacuumRawPile { + underlying: RP, + vacuum_tracking_enabled: bool, + retrieved_chunks: Mutex>, +} + +impl VacuumRawPile { + pub fn new(underlying: RP, vacuum_tracking_enabled: bool) -> Self { + VacuumRawPile { + underlying, + vacuum_tracking_enabled, + retrieved_chunks: Default::default(), + } + } + + pub fn calculate_vacuum_for_sweeping(&self) -> anyhow::Result> { + if !self.vacuum_tracking_enabled { + bail!("Vacuum tracking not enabled, you can't calculate the vacuum set!"); + } + + let mut to_sweep = HashSet::new(); + + let retrieved_chunks = self.retrieved_chunks.lock().unwrap(); + + let mut chunk_id: ChunkId = Default::default(); + for key in self.list_keys(Keyspace::Chunk)? { + chunk_id.clone_from_slice(&key?); + if !retrieved_chunks.contains(&chunk_id) { + to_sweep.insert(chunk_id.clone()); + } + } + + Ok(to_sweep) + } +} + +impl RawPile for VacuumRawPile { + fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result { + self.underlying.exists(kind, key) + } + + fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result>> { + self.underlying.read(kind, key) + } + + fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> { + if self.vacuum_tracking_enabled && kind == Keyspace::Chunk { + let mut chunk_id: ChunkId = Default::default(); + chunk_id.clone_from_slice(key); + self.retrieved_chunks.lock().unwrap().insert(chunk_id); + } + self.underlying.write(kind, key, value) + } + + fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> { + self.underlying.delete(kind, key) + } + + fn list_keys( + &self, + kind: Keyspace, + ) -> anyhow::Result>>>> { + self.underlying.list_keys(kind) + } + + fn flush(&self) -> anyhow::Result<()> { + self.underlying.flush() + } + + fn check_lowlevel(&self) -> anyhow::Result { + self.underlying.check_lowlevel() + } +} + +pub fn check_deep( + pile: Pile, + vacuum: VacuumMode, + make_progress_bar: bool, +) -> anyhow::Result { + let pile = Pile::new(VacuumRawPile::new( + pile.raw_pile, + vacuum != VacuumMode::NoVacuum, + )); + + let mut errors = 0; + + let mut to_check = Vec::new(); + let pointer_list = pile.list_pointers()?; + + for pointer in pointer_list.iter() { + info!("Checking pointer {:?}", pointer); + match pile.read_pointer(&pointer)? { + Some(pointer_data) => { + if let Some(parent) = pointer_data.parent_pointer { + if !pointer_list.contains(&parent) { + errors += 1; + error!( + "Pointer {:?} has a parent {:?} which does not exist.", + pointer, parent + ); + } + } + + let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?; + tree_node.node.visit( + &mut |node, _| { + if let TreeNode::NormalFile { content, .. } = node { + to_check.push(content.clone()); + } + Ok(()) + }, + "".to_owned(), + )?; + } + None => { + errors += 1; + error!("Pointer {:?} does not seem to exist.", pointer); + } + } + } + + let pbar = if make_progress_bar { + ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10)) + } else { + ProgressBar::hidden() + }; + pbar.set_style( + ProgressStyle::default_bar() + .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"), + ); + pbar.set_message("checking"); + + let mut done = 0; + + while let Some(next_to_check) = to_check.pop() { + done += 1; + pbar.set_length(done + to_check.len() as u64); + pbar.set_position(done); + + let mut unchunker = RecursiveUnchunker::new(&pile, next_to_check.clone()); + match std::io::copy(&mut unchunker, &mut NullWriter {}) { + Ok(_) => {} + Err(err) => { + errors += 1; + warn!( + "Error occurred when reading {:?}: {:?}.", + next_to_check, err + ); + } + } + } + + pbar.finish_and_clear(); + + if errors > 0 { + error!("There were {:?}", errors); + } else { + info!("No errors."); + } + + if errors == 0 && vacuum != VacuumMode::NoVacuum { + info!("Calculating sweep set for vacuuming."); + let to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?; + info!("{} chunks are ready to be vacuumed.", to_vacuum.len()); + if vacuum == VacuumMode::Vacuum { + let pbar = if make_progress_bar { + ProgressBar::with_draw_target( + to_vacuum.len() as u64, + ProgressDrawTarget::stdout_with_hz(10), + ) + } else { + ProgressBar::hidden() + }; + pbar.set_style( + ProgressStyle::default_bar().template( + "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ), + ); + pbar.set_message("vacuuming"); + + // actually do the vacuum! + info!("Going to vacuum them up."); + for vacuum_id in to_vacuum { + pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?; + pbar.inc(1); + } + pbar.finish_and_clear(); + } + } + + Ok(errors) +}