diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs
index 33e9e8f..a37ddf8 100644
--- a/yama/src/pile/local_sqlitebloblogs.rs
+++ b/yama/src/pile/local_sqlitebloblogs.rs
@@ -15,19 +15,20 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
-use std::fs::{read_dir, File, OpenOptions};
+use std::fs::{read_dir, remove_file, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
+use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::{fs, thread};
-use anyhow::{bail, Context};
+use anyhow::{bail, ensure, Context};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use log::{info, warn};
use nix::unistd::sync;
-use rusqlite::{params, Error, ErrorCode, Transaction};
+use rusqlite::{params, Error, ErrorCode, Transaction, TransactionBehavior, NO_PARAMS};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
@@ -35,6 +36,7 @@ use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
+use crate::progress::ProgressTracker;
use crate::utils::{bytes_to_hexstring, LruMap};
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
@@ -49,6 +51,11 @@ pub const POINTER_WRITE_BATCHES: usize = 2048;
/// This many bloblogs will be kept open for reading, at maximum.
pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
+/// Size of a blob header within a bloblog.
+/// 32 byte Chunk Id
+/// 4 byte (u32) Blob size
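+///
+/// Each deleted blob therefore frees its payload plus this header: for example
+/// (illustrative numbers only), deleting 10 blobs whose payloads total 4096 bytes
+/// reclaims 4096 + 10 * 36 = 4456 bytes. `analyse_for_compaction` applies this
+/// correction when summing deleted bytes.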
+pub const BLOB_HEADER_SIZE: u64 = 32 + 4;
+
/// A file storing a log of blobs.
/// Format:
/// Repeated:
@@ -496,6 +503,336 @@ impl SqliteBloblogPile {
assert!(pointers_buffered.is_empty());
Ok(())
}
+
+ /// Look at the bloblogs in this pile and see where space may be reclaimable if we were to
+ /// compact.
+ ///
+ /// Next step: plan_compaction
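+ ///
+ /// A rough end-to-end sketch (not compiled; `pile` is an open `SqliteBloblogPile`,
+ /// `progress` is any `Box<dyn ProgressTracker>`, and the threshold values are
+ /// illustrative placeholders, not recommendations):
+ ///
+ /// ```ignore
+ /// let analysis = pile.analyse_for_compaction()?;
+ /// let thresholds = CompactionThresholds {
+ ///     minimum_to_reclaim: 64 * 1024 * 1024,
+ ///     minimum_small_bloblogs_to_merge: 16,
+ ///     cond_if_more_deallocated_than: 64 * 1024 * 1024,
+ ///     cond_if_less_allocated_than: 4 * 1024 * 1024,
+ /// };
+ /// let plan = pile.plan_compaction(&thresholds, analysis)?;
+ /// if !plan.bloblogs_to_replace.is_empty() {
+ ///     pile.perform_compaction(progress, plan)?;
+ /// }
+ /// ```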
+ pub fn analyse_for_compaction(&self) -> anyhow::Result<BTreeMap<BloblogId, BloblogStats>> {
+ let mut inner = self.inner.lock().unwrap();
+ // Lock the database right away.
+ let txn = inner
+ .connection
+ .transaction_with_behavior(TransactionBehavior::Exclusive)?;
+ let mut stmt = txn.prepare(
+ "
+ SELECT bloblog, COUNT(c.offset), COUNT(d.offset), SUM(COALESCE(d.size, 0))
+ FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
+ GROUP BY bloblog
+ ",
+ )?;
+
+ struct UnpopulatedBloblogStats {
+ pub bloblog_id: BloblogId,
+ pub chunks_total: u64,
+ pub chunks_deleted: u64,
+ pub bytes_deleted: u64,
+ }
+
+ let unpopul_bloblog_stats = stmt.query_map(NO_PARAMS, |row| {
+ Ok(UnpopulatedBloblogStats {
+ bloblog_id: row.get(0)?,
+ chunks_total: row.get::<_, i64>(1)?.try_into().expect("i64 -> u64"),
+ chunks_deleted: row.get::<_, i64>(2)?.try_into().expect("i64 -> u64"),
+ bytes_deleted: row.get::<_, i64>(3)?.try_into().expect("i64 -> u64"),
+ })
+ })?;
+
+ let mut final_stats = BTreeMap::new();
+
+ for unpopul_stat in unpopul_bloblog_stats {
+ let UnpopulatedBloblogStats {
+ bloblog_id,
+ chunks_total,
+ chunks_deleted,
+ bytes_deleted,
+ } = unpopul_stat?;
+ let bloblog_path = self.path.join(&bloblog_id.to_string());
+ let bytes_total = std::fs::metadata(&bloblog_path)
+ .with_context(|| format!("Failed to get metadata for bloblog: {:?}", bloblog_path))?
+ .size();
+
+ final_stats.insert(
+ bloblog_id,
+ BloblogStats {
+ chunks_total,
+ chunks_deleted,
+ bytes_total,
+ // Add a slight correction since we can count the blob headers of deleted blobs
+ // as deleted.
+ bytes_deleted: bytes_deleted + chunks_deleted * BLOB_HEADER_SIZE,
+ },
+ );
+ }
+
+ Ok(final_stats)
+ }
+
+ /// Look at the compaction analysis and, using the specified thresholds, come up with a plan
+ /// to perform compaction.
+ ///
+ /// May return an empty plan if compaction isn't worthwhile.
+ ///
+ /// Previous step: analyse_for_compaction
+ /// Next step: perform_compaction
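+ ///
+ /// An empty plan can be recognised by `bloblogs_to_replace.is_empty()` on the returned
+ /// `CompactionPlan`, in which case `perform_compaction` can be skipped.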
+ pub fn plan_compaction(
+ &self,
+ thresholds: &CompactionThresholds,
+ analysis: BTreeMap<BloblogId, BloblogStats>,
+ ) -> anyhow::Result<CompactionPlan> {
+ let bloblogs_to_replace: BTreeMap<BloblogId, BloblogStats> = analysis
+ .into_iter()
+ .filter(|(_id, stats)| thresholds.should_replace_bloblog(stats))
+ .collect();
+ let reclaimable_space: u64 = bloblogs_to_replace
+ .values()
+ .map(|bs| bs.bytes_deleted)
+ .sum();
+ let bytes_to_write: u64 = bloblogs_to_replace
+ .values()
+ .map(|bs| bs.bytes_total - bs.bytes_deleted)
+ .sum();
+ let small_bloblogs: u32 = bloblogs_to_replace
+ .values()
+ .filter(|bs| bs.bytes_total - bs.bytes_deleted < thresholds.cond_if_less_allocated_than)
+ .count() as u32;
+
+ if reclaimable_space < thresholds.minimum_to_reclaim
+ && small_bloblogs < thresholds.minimum_small_bloblogs_to_merge
+ {
+ // Nothing worth doing: return an empty plan.
+ return Ok(CompactionPlan {
+ bloblogs_to_replace: Default::default(),
+ bytes_to_write: 0,
+ reclaimable_space: 0,
+ small_bloblogs: 0,
+ });
+ }
+
+ Ok(CompactionPlan {
+ bloblogs_to_replace: bloblogs_to_replace.keys().copied().collect(),
+ bytes_to_write,
+ reclaimable_space,
+ small_bloblogs,
+ })
+ }
+
+ /// Given a compaction plan, perform the compaction.
+ /// There shouldn't be any decisions left to be made at this point: just action.
+ ///
+ /// TODO flock the bloblogs to be removed and make readers and writers also flock them too.
+ ///
+ /// TODO find a way to deal with bloblogs that are entirely unreferenced from the index
+ /// (e.g. bloblogs that weren't written properly, such as when compaction fails).
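+ ///
+ /// The steps below run in order: live blobs are first rewritten into fresh bloblogs,
+ /// then the index rows are repointed at their new locations, and only then are the old
+ /// bloblogs' index rows and files deleted.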
+ pub fn perform_compaction(
+ &self,
+ mut progress: Box<dyn ProgressTracker>,
+ plan: CompactionPlan,
+ ) -> anyhow::Result<()> {
+ #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
+ struct ReplacedBlobRow {
+ pub old_bloblog: BloblogId,
+ pub old_offset: u64,
+ pub chunk_id: ChunkId,
+ }
+
+ let mut to_preserve = BTreeSet::new();
+ let mut replacements = BTreeMap::new();
+
+ progress.set_max_size(plan.bytes_to_write);
+
+ // First find all the blobs we need to replace.
+ {
+ let mut inner = self.inner.lock().unwrap();
+ // Lock the database right away.
+ let txn = inner
+ .connection
+ .transaction_with_behavior(TransactionBehavior::Exclusive)?;
+ let mut stmt = txn.prepare(
+ "
+ SELECT chunk_id, c.offset
+ FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
+ WHERE bloblog = ?1 AND d.offset IS NULL
+ ",
+ )?;
+ for bloblog in plan.bloblogs_to_replace.iter().copied() {
+ to_preserve.extend(
+ stmt.query_map([bloblog], |row| {
+ let mut chunk_id = ChunkId::default();
+ chunk_id.copy_from_slice(row.get::<_, Vec<u8>>(0).unwrap().as_slice());
+ Ok(ReplacedBlobRow {
+ old_bloblog: bloblog,
+ chunk_id,
+ old_offset: row.get::<_, i64>(1).unwrap().try_into().unwrap(),
+ })
+ })?
+ .collect::<Result<Vec<ReplacedBlobRow>, _>>()?,
+ );
+ }
+ }
+
+ // Then make the replacements
+ info!("Rewriting bloblogs...");
+ let mut buf = Vec::new();
+ let mut iterator = to_preserve.into_iter();
+ loop {
+ let (new_bloblog_id, bloblog_mutex) = self.get_writing_bloblog()?;
+ let mut new_bloblog = bloblog_mutex.lock().expect("Failed to lock bloblog?");
+ let mut is_more = false;
+
+ while let Some(preserve) = iterator.next() {
+ is_more = true;
+
+ // Get hold of the old bloblog
+ let old_bloblog = self.open_bloblog(preserve.old_bloblog)?;
+ let mut old_bloblog = old_bloblog.lock().unwrap();
+
+ // Transfer the blob
+ buf.clear();
+ old_bloblog.read_blob(preserve.old_offset, &preserve.chunk_id, &mut buf)?;
+ let new_offset = new_bloblog.write_blob(&preserve.chunk_id, &buf)?;
+
+ // Make a note of the replacement
+ replacements.insert(
+ preserve,
+ BloblogPointer {
+ bloblog: new_bloblog_id,
+ offset: new_offset,
+ },
+ );
+
+ progress.inc_progress(buf.len() as u64);
+
+ if new_bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE {
+ // get a new bloblog to write with.
+ break;
+ }
+ }
+
+ drop(new_bloblog);
+ self.return_writing_bloblog(new_bloblog_id, bloblog_mutex)?;
+
+ if !is_more {
+ break;
+ }
+ }
+
+ info!("Applying replacements...");
+ {
+ let mut inner = self.inner.lock().unwrap();
+ // Lock the database right away.
+ let txn = inner
+ .connection
+ .transaction_with_behavior(TransactionBehavior::Exclusive)?;
+ let mut stmt = txn.prepare(
+ "
+ UPDATE chunks
+ SET bloblog = ?1, offset = ?2
+ WHERE chunk_id = ?3
+ ",
+ )?;
+
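+ // Each chunk_id is expected to identify exactly one row in `chunks`, so every
+ // repointing UPDATE should affect exactly one row; the `ensure!` below turns any
+ // mismatch into a hard error.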
+ for (replacement_row, new_pos) in replacements {
+ ensure!(
+ stmt.execute(params![
+ new_pos.bloblog,
+ new_pos.offset as i64,
+ &replacement_row.chunk_id as &[u8]
+ ])? == 1,
+ "Wrong number of rows updated for replacement!"
+ );
+ }
+
+ drop(stmt);
+ txn.commit().context("committing replacements")?;
+ }
+
+ // TODO fsync new bloblogs
+
+ info!("Deleting old bloblogs...");
+ {
+ let mut inner = self.inner.lock().unwrap();
+ // Lock the database right away.
+ let txn = inner
+ .connection
+ .transaction_with_behavior(TransactionBehavior::Exclusive)?;
+
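+ // After the repointing above, the only `chunks` rows still referencing an old bloblog
+ // should be the ones recorded in `deleted`, so the two DELETEs below must remove the
+ // same number of rows.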
+ for bloblog_id in plan.bloblogs_to_replace.iter().copied() {
+ let deleted_chunks = txn.execute(
+ "
+ DELETE FROM chunks WHERE bloblog = ?1
+ ",
+ params![bloblog_id],
+ )?;
+
+ let deleted_deleted = txn.execute(
+ "
+ DELETE FROM deleted WHERE bloblog = ?1
+ ",
+ params![bloblog_id],
+ )?;
+
+ ensure!(deleted_chunks == deleted_deleted, "Undeleted chunks left in bloblog {bloblog_id}: CHUNKS={deleted_chunks} DELETED={deleted_deleted}");
+
+ let bloblog_path = self.path.join(bloblog_id.to_string());
+ remove_file(&bloblog_path).with_context(|| {
+ format!("Failed to remove obsolete bloblog: {:?}", bloblog_path)
+ })?;
+ }
+
+ txn.commit()?;
+ }
+
+ Ok(())
+ }
+}
+
+pub struct BloblogStats {
+ pub chunks_total: u64,
+ pub chunks_deleted: u64,
+ pub bytes_total: u64,
+ pub bytes_deleted: u64,
+}
+
+pub struct CompactionPlan {
+ pub bloblogs_to_replace: BTreeSet<BloblogId>,
+ pub bytes_to_write: u64,
+ pub reclaimable_space: u64,
+ pub small_bloblogs: u32,
+}
+
+pub struct CompactionThresholds {
+ /// Minimum bytes to be reclaimable overall for compaction to be worthwhile.
+ pub minimum_to_reclaim: u64,
+
+ /// (alternative reason) Minimum number of files to be undersized in order for compaction
+ /// to be worthwhile.
+ /// This gives us a way to make compaction run if we have lots of tiny bloblogs.
+ pub minimum_small_bloblogs_to_merge: u32,
+
+ /// A bloblog will be replaced if the deallocated size is greater than this.
+ pub cond_if_more_deallocated_than: u64,
+
+ /// A bloblog will be replaced if the allocated size is less than this.
+ pub cond_if_less_allocated_than: u64,
+}
+
+impl CompactionThresholds {
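+ /// Decide whether a single bloblog is worth rewriting.
+ ///
+ /// Worked example (illustrative numbers only): with `cond_if_less_allocated_than` =
+ /// 64 MiB and `cond_if_more_deallocated_than` = 256 MiB, a 1 GiB bloblog with 300 MiB
+ /// deleted still has 724 MiB allocated, so it is not 'small', but it does have large
+ /// deallocations and will be replaced. A 10 MiB bloblog with nothing deleted is
+ /// replaced purely for being small, which lets compaction merge tiny bloblogs together.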
+ pub fn should_replace_bloblog(&self, bloblog_stats: &BloblogStats) -> bool {
+ let allocated = bloblog_stats.bytes_total - bloblog_stats.bytes_deleted;
+ // Note that this will also trigger for fully-deallocated files, provided that
+ // cond_if_less_allocated_than is greater than zero.
+ let is_small = allocated < self.cond_if_less_allocated_than;
+ let has_large_deallocations =
+ bloblog_stats.bytes_deleted > self.cond_if_more_deallocated_than;
+ is_small || has_large_deallocations
+ }
+}
+
+pub struct CompactionOutcome {
+ pub bloblogs_deleted: u32,
+ pub bloblogs_created: u32,
+ pub bytes_deleted: u32,
+ pub bytes_created: u32,
}
impl Drop for SqliteBloblogPile {