Add some basic pile statistics, as a debug command

This commit is contained in:
Olivier 'reivilibre' 2021-11-16 19:55:09 +00:00
parent c3c0fdd240
commit ccb50f2dd9
8 changed files with 107 additions and 5 deletions

View File

@ -46,6 +46,9 @@ pub enum DebugCommand {
/// Name of the pointer to read.
name: String,
},
/// Reads statistics from the Pile
#[clap(name = "stats")]
Statistics {},
}
pub fn debug_prompt<RP: RawPile>(pdesc: PileDescriptor, pile: Pile<RP>) -> anyhow::Result<()> {
@ -176,6 +179,22 @@ pub fn debug_command<RP: RawPile>(
//eprintln!("{:#?}", this_node.node);
tree_node_printer(&this_node.name, &this_node.node, 0);
}
DebugCommand::Statistics { .. } => {
    // Statistics are optional: not every RawPile backend implements them.
    if let Some(stats) = pile.raw_pile.debug_statistics()? {
        println!("Statistics for this pile");
        println!(" chunk count: {} chunks", stats.number_of_chunks);
        println!(
            " total chunk stored space: {} bytes (may exclude deleted chunks)",
            stats.total_chunk_size
        );
        // Guard the division: with zero chunks, the f64 division yields NaN,
        // which `as u64` converts to 0 and would print a misleading average.
        if stats.number_of_chunks > 0 {
            let average_size =
                ((stats.total_chunk_size as f64) / (stats.number_of_chunks as f64)) as u64;
            println!(" (average chunk size: {} bytes)", average_size);
        }
    } else {
        eprintln!("{:?}", pile);
        eprintln!("Statistics appear not to be supported on this kind of pile?");
    }
}
}
Ok(())
}

View File

@ -48,6 +48,7 @@ impl Write for NullWriter {
/// Mark-and-sweep style vacuuming system.
/// We mark all the chunks that we run into (following the structure of all the pointers and
/// recursive chunk references) and sweep the chunks that have not been read.
#[derive(Debug)]
pub struct VacuumRawPile<RP: RawPile> {
underlying: RP,
vacuum_tracking_enabled: bool,

View File

@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use crate::chunking::calculate_chunkid;
use crate::definitions::{ChunkId, PointerData};
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Condvar, Mutex};
pub mod compression;
@ -65,7 +66,16 @@ pub enum Keyspace {
Pointer,
}
pub trait RawPile: Send + Sync {
/// Useful information for humans. Doesn't need to be spot on, but kind of interesting.
#[derive(Debug, Clone)]
pub struct DebugStatistics {
/// Number of chunks in the pile.
pub number_of_chunks: u64,
/// Smallest chunk size in bytes, if the backend tracks it (currently unfilled by the SQLite backend).
pub minimum_chunk_size: Option<u32>,
/// Largest chunk size in bytes, if the backend tracks it (currently unfilled by the SQLite backend).
pub maximum_chunk_size: Option<u32>,
/// Total stored size of chunk data in bytes; may exclude deleted chunks.
pub total_chunk_size: u64,
}
pub trait RawPile: Send + Sync + Debug {
// TODO expose verification errors?
fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool>;
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
@ -84,6 +94,11 @@ pub trait RawPile: Send + Sync {
fn flush(&self) -> anyhow::Result<()>;
// TODO return a progress Receiver
fn check_lowlevel(&self) -> anyhow::Result<bool>;
/// Return a few statistics, if possible.
///
/// Default implementation reports `Ok(None)` (statistics unavailable) rather
/// than an error, so backends that cannot gather statistics need do nothing.
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
Ok(None)
}
}
impl RawPile for Box<dyn RawPile> {
@ -111,6 +126,9 @@ impl RawPile for Box<dyn RawPile> {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.as_ref().check_lowlevel()
}
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
self.as_ref().debug_statistics()
}
}
impl<RP: RawPile> RawPile for Arc<RP> {
@ -138,8 +156,12 @@ impl<RP: RawPile> RawPile for Arc<RP> {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.as_ref().check_lowlevel()
}
/// Forward the statistics query to the pile behind the `Arc`.
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
    (**self).debug_statistics()
}
}
#[derive(Debug)]
pub struct Pile<R: RawPile> {
pub raw_pile: R,
pub racy_submission_mutex: Mutex<HashSet<ChunkId>>,

View File

@ -24,7 +24,7 @@ use crossbeam_channel::{Receiver, Sender};
use log::error;
use zstd::block::{Compressor, Decompressor};
use crate::pile::{Keyspace, RawPile};
use crate::pile::{DebugStatistics, Keyspace, RawPile};
pub const DECOMPRESS_CAPACITY: usize = 32 * 1024 * 1024;
@ -40,6 +40,7 @@ pub struct CompressionSettings {
pub num_decompressors: u32,
}
#[derive(Debug)]
pub struct RawPileCompressor<R: RawPile> {
underlying: R,
compressor: Sender<(Vec<u8>, Sender<Vec<u8>>)>,
@ -172,4 +173,8 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
self.underlying.check_lowlevel()
}
/// Delegate the statistics query to the underlying pile.
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
    let underlying = &self.underlying;
    underlying.debug_statistics()
}
}

View File

@ -30,6 +30,7 @@ use crate::pile::{Keyspace, RawPile};
/// to rely on that.
/// This feature will be revisited soon...
/// Notably, keys should be passed through a secure permutation first.
#[derive(Debug)]
pub struct RawPileEncryptor<R: RawPile> {
underlying: R,
secret_key: Key,

View File

@ -20,11 +20,12 @@ use std::hash::Hasher;
use thiserror::Error;
use crate::definitions::XXH64_SEED;
use crate::pile::{Keyspace, RawPile};
use crate::pile::{DebugStatistics, Keyspace, RawPile};
use crate::utils::bytes_to_hexstring;
/// This RawPile enables checking the integrity of stored chunks.
/// This is done by storing a hash along with the chunk contents, which can later be verified.
#[derive(Debug)]
pub struct RawPileIntegrityChecker<RP: RawPile> {
/// The wrapped pile that actually stores the (hash-prefixed) data.
underlying: RP,
}
@ -108,4 +109,8 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
// TODO integrity check ...?
self.underlying.check_lowlevel()
}
/// Delegate the statistics query to the wrapped pile.
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
    let wrapped = &self.underlying;
    wrapped.debug_statistics()
}
}

View File

@ -19,7 +19,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::fs::{File, OpenOptions};
use std::fs::{read_dir, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
@ -32,7 +32,7 @@ use rusqlite::{params, Error};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
use crate::pile::{Keyspace, RawPile};
use crate::pile::{DebugStatistics, Keyspace, RawPile};
use crate::utils::bytes_to_hexstring;
use rusqlite::ffi::ErrorCode::ConstraintViolation;
@ -46,6 +46,7 @@ pub const POINTER_WRITE_BATCHES: usize = 2048;
/// A handle to a single 'blob log' file, which stores chunk contents back-to-back.
///
/// Format:
/// Repeated:
/// <32 byte ChunkId><u32: length><length × u8: data>
#[derive(Debug)]
pub struct Bloblog {
/// Open handle to the bloblog file on disk.
pub file: File,
}
@ -127,6 +128,7 @@ impl Bloblog {
pub type BloblogId = u32;
#[derive(Debug)]
pub struct Inner {
next_bloblog_id: BloblogId,
writer_bloblogs: Vec<BloblogId>,
@ -191,6 +193,7 @@ impl Inner {
/// Because random access is important for performance, an additional SQLite database is used
/// as a map from chunk IDs to their positions in the blob logs, allowing readers to seek to the
/// appropriate place and read a chunk randomly.
#[derive(Debug)]
pub struct SqliteBloblogPile {
inner: Arc<Mutex<Inner>>,
path: PathBuf,
@ -199,6 +202,7 @@ pub struct SqliteBloblogPile {
}
/// A pointer to a blob in a 'blob log'.
#[derive(Debug)]
pub struct BloblogPointer {
/// Which blob log the blob is stored in.
bloblog: BloblogId,
@ -546,6 +550,50 @@ impl RawPile for SqliteBloblogPile {
fn check_lowlevel(&self) -> anyhow::Result<bool> {
unimplemented!()
}
/// Gather pile statistics by combining the SQLite chunk index with the
/// on-disk sizes of the bloblog files.
fn debug_statistics(&self) -> anyhow::Result<Option<DebugStatistics>> {
    let inner = self.inner.lock().unwrap();

    // Number of live chunks, from the chunk index.
    let chunk_count: i64 =
        inner
            .connection
            .query_row("SELECT COUNT(1) FROM chunks", params![], |row| row.get(0))?;

    // Deleted chunks still occupy space in the bloblogs until vacuumed.
    let (deleted_chunk_count, deleted_chunk_space): (i64, i64) = inner.connection.query_row(
        "SELECT COUNT(1), COALESCE(SUM(size), 0) FROM deleted",
        params![],
        |row| Ok((row.get(0)?, row.get(1)?)),
    )?;

    // Sum the on-disk sizes of all bloblog files in the pile directory.
    let mut total_on_disk_size: u64 = 0;
    for dir_entry in read_dir(&self.path)? {
        let dir_entry = dir_entry?;
        if !dir_entry.file_type()?.is_file() {
            continue;
        }
        if let Some(name) = dir_entry.file_name().to_str() {
            // Bloblogs have non-empty, purely ASCII-numeric names; skip anything
            // else (e.g. the SQLite index database). Note `is_ascii_digit` is used
            // rather than `is_numeric`, which would also accept non-ASCII digits
            // and be vacuously true for an empty name.
            if name.is_empty() || !name.bytes().all(|b| b.is_ascii_digit()) {
                continue;
            }
            total_on_disk_size += dir_entry.metadata()?.len();
        }
    }

    // Each stored record carries a 32-byte chunk ID plus a u32 length prefix.
    let chunk_overhead_per_chunk: u64 = 32 + 4;
    let overhead = chunk_overhead_per_chunk * (deleted_chunk_count + chunk_count) as u64;

    // Use saturating subtraction: if the index and the files on disk disagree
    // (e.g. a bloblog is missing or truncated), plain `-` on u64 would panic in
    // debug builds and wrap around in release builds. Saturating to 0 keeps this
    // debug command best-effort.
    let total_chunk_size = total_on_disk_size
        .saturating_sub(overhead)
        .saturating_sub(deleted_chunk_space as u64);

    Ok(Some(DebugStatistics {
        // COUNT(1) cannot be negative, so this conversion cannot fail.
        number_of_chunks: chunk_count.try_into().unwrap(),
        minimum_chunk_size: None,
        maximum_chunk_size: None,
        total_chunk_size,
    }))
}
}
struct KeyIterator {

View File

@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
/// A kind of RawPile which can make requests to a RawPile over a pipe (e.g. TCP socket or an
/// SSH connection).
/// The requests are handled by a `Responder` on the other end of the pipe.
#[derive(Debug)]
pub struct Requester {
/// Channel for submitting a request body, paired with an optional channel on
/// which the corresponding response(s) should be sent back.
commands: Sender<(RequestBody, Option<Sender<ResponseBody>>)>,
}