Deep checking and vacuuming
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Olivier 'reivilibre' 2021-06-15 23:58:39 +01:00
parent 9fff39d66a
commit 4b2afc261c

View File

@ -1 +1,223 @@
use crate::chunking::RecursiveUnchunker;
use crate::commands::retrieve_tree_node;
use crate::definitions::{ChunkId, TreeNode};
use crate::pile::{Keyspace, Pile, RawPile};
use anyhow::bail;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::{error, info, warn};
use std::collections::HashSet;
use std::io::Write;
use std::sync::Mutex;
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub enum VacuumMode {
NoVacuum,
DryRunVacuum,
Vacuum,
}
pub struct NullWriter {}
impl Write for NullWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
/// Mark-and-sweep style vacuuming system.
pub struct VacuumRawPile<RP: RawPile> {
underlying: RP,
vacuum_tracking_enabled: bool,
retrieved_chunks: Mutex<HashSet<ChunkId>>,
}
impl<RP: RawPile> VacuumRawPile<RP> {
pub fn new(underlying: RP, vacuum_tracking_enabled: bool) -> Self {
VacuumRawPile {
underlying,
vacuum_tracking_enabled,
retrieved_chunks: Default::default(),
}
}
pub fn calculate_vacuum_for_sweeping(&self) -> anyhow::Result<HashSet<ChunkId>> {
if !self.vacuum_tracking_enabled {
bail!("Vacuum tracking not enabled, you can't calculate the vacuum set!");
}
let mut to_sweep = HashSet::new();
let retrieved_chunks = self.retrieved_chunks.lock().unwrap();
let mut chunk_id: ChunkId = Default::default();
for key in self.list_keys(Keyspace::Chunk)? {
chunk_id.clone_from_slice(&key?);
if !retrieved_chunks.contains(&chunk_id) {
to_sweep.insert(chunk_id.clone());
}
}
Ok(to_sweep)
}
}
impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool> {
self.underlying.exists(kind, key)
}
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
self.underlying.read(kind, key)
}
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
if self.vacuum_tracking_enabled && kind == Keyspace::Chunk {
let mut chunk_id: ChunkId = Default::default();
chunk_id.clone_from_slice(key);
self.retrieved_chunks.lock().unwrap().insert(chunk_id);
}
self.underlying.write(kind, key, value)
}
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
self.underlying.delete(kind, key)
}
fn list_keys(
&self,
kind: Keyspace,
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<Vec<u8>>>>> {
self.underlying.list_keys(kind)
}
fn flush(&self) -> anyhow::Result<()> {
self.underlying.flush()
}
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.underlying.check_lowlevel()
}
}
pub fn check_deep<RP: RawPile>(
pile: Pile<RP>,
vacuum: VacuumMode,
make_progress_bar: bool,
) -> anyhow::Result<u32> {
let pile = Pile::new(VacuumRawPile::new(
pile.raw_pile,
vacuum != VacuumMode::NoVacuum,
));
let mut errors = 0;
let mut to_check = Vec::new();
let pointer_list = pile.list_pointers()?;
for pointer in pointer_list.iter() {
info!("Checking pointer {:?}", pointer);
match pile.read_pointer(&pointer)? {
Some(pointer_data) => {
if let Some(parent) = pointer_data.parent_pointer {
if !pointer_list.contains(&parent) {
errors += 1;
error!(
"Pointer {:?} has a parent {:?} which does not exist.",
pointer, parent
);
}
}
let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?;
tree_node.node.visit(
&mut |node, _| {
if let TreeNode::NormalFile { content, .. } = node {
to_check.push(content.clone());
}
Ok(())
},
"".to_owned(),
)?;
}
None => {
errors += 1;
error!("Pointer {:?} does not seem to exist.", pointer);
}
}
}
let pbar = if make_progress_bar {
ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
} else {
ProgressBar::hidden()
};
pbar.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
);
pbar.set_message("checking");
let mut done = 0;
while let Some(next_to_check) = to_check.pop() {
done += 1;
pbar.set_length(done + to_check.len() as u64);
pbar.set_position(done);
let mut unchunker = RecursiveUnchunker::new(&pile, next_to_check.clone());
match std::io::copy(&mut unchunker, &mut NullWriter {}) {
Ok(_) => {}
Err(err) => {
errors += 1;
warn!(
"Error occurred when reading {:?}: {:?}.",
next_to_check, err
);
}
}
}
pbar.finish_and_clear();
if errors > 0 {
error!("There were {:?}", errors);
} else {
info!("No errors.");
}
if errors == 0 && vacuum != VacuumMode::NoVacuum {
info!("Calculating sweep set for vacuuming.");
let to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?;
info!("{} chunks are ready to be vacuumed.", to_vacuum.len());
if vacuum == VacuumMode::Vacuum {
let pbar = if make_progress_bar {
ProgressBar::with_draw_target(
to_vacuum.len() as u64,
ProgressDrawTarget::stdout_with_hz(10),
)
} else {
ProgressBar::hidden()
};
pbar.set_style(
ProgressStyle::default_bar().template(
"[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
),
);
pbar.set_message("vacuuming");
// actually do the vacuum!
info!("Going to vacuum them up.");
for vacuum_id in to_vacuum {
pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
pbar.inc(1);
}
pbar.finish_and_clear();
}
}
Ok(errors)
}