Add compaction logic

Olivier 'reivilibre' 2022-11-19 15:27:41 +00:00
parent 30b261d172
commit 58c5c3f039
1 changed file with 341 additions and 4 deletions


@@ -15,19 +15,20 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fs::{read_dir, remove_file, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::{fs, thread};
use anyhow::{bail, ensure, Context};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use log::{info, warn};
use nix::unistd::sync;
use rusqlite::{params, Error, ErrorCode, Transaction, TransactionBehavior, NO_PARAMS};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
@@ -35,6 +36,7 @@ use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
use crate::progress::ProgressTracker;
use crate::utils::{bytes_to_hexstring, LruMap};
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
@@ -49,6 +51,11 @@ pub const POINTER_WRITE_BATCHES: usize = 2048;
/// This many bloblogs will be kept open for reading, at maximum.
pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
/// Size of a blob header within a bloblog.
/// 32 byte Chunk Id
/// 4 byte (u32) Blob size
pub const BLOB_HEADER_SIZE: u64 = 32 + 4;
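// Illustrative sketch only (not part of this commit): one way the header described
// above could be serialised, assuming the blob size is stored big-endian (as the
// BigEndian import suggests). The real encoding is owned by the Bloblog read/write code.
fn sketch_write_blob_header<W: Write>(
    out: &mut W,
    chunk_id: &ChunkId,
    blob_size: u32,
) -> anyhow::Result<()> {
    out.write_all(chunk_id)?; // 32-byte chunk ID
    out.write_u32::<BigEndian>(blob_size)?; // 4-byte (u32) blob size
    // Together these account for the BLOB_HEADER_SIZE (36) bytes preceding each blob.
    Ok(())
}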
/// A file storing a log of blobs.
/// Format:
/// Repeated:
@@ -496,6 +503,336 @@ impl SqliteBloblogPile {
assert!(pointers_buffered.is_empty());
Ok(())
}
/// Look at the bloblogs in this pile and see where space may be reclaimable if we were to
/// compact.
///
/// Next step: plan_compaction
pub fn analyse_for_compaction(&self) -> anyhow::Result<BTreeMap<BloblogId, BloblogStats>> {
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
SELECT bloblog, COUNT(c.offset), COUNT(d.offset), SUM(COALESCE(d.size, 0))
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
GROUP BY bloblog
",
)?;
struct UnpopulatedBloblogStats {
pub bloblog_id: BloblogId,
pub chunks_total: u64,
pub chunks_deleted: u64,
pub bytes_deleted: u64,
}
let unpopul_bloblog_stats = stmt.query_map(NO_PARAMS, |row| {
Ok(UnpopulatedBloblogStats {
bloblog_id: row.get(0)?,
chunks_total: row.get::<_, i64>(1)?.try_into().expect("i64 -> u64"),
chunks_deleted: row.get::<_, i64>(2)?.try_into().expect("i64 -> u64"),
bytes_deleted: row.get::<_, i64>(3)?.try_into().expect("i64 -> u64"),
})
})?;
let mut final_stats = BTreeMap::new();
for unpopul_stat in unpopul_bloblog_stats {
let UnpopulatedBloblogStats {
bloblog_id,
chunks_total,
chunks_deleted,
bytes_deleted,
} = unpopul_stat?;
let bloblog_path = self.path.join(&bloblog_id.to_string());
let bytes_total = std::fs::metadata(&bloblog_path)
.with_context(|| format!("Failed to get metadata for bloblog: {:?}", bloblog_path))?
.size();
final_stats.insert(
bloblog_id,
BloblogStats {
chunks_total,
chunks_deleted,
bytes_total,
// Apply a slight correction: the blob headers of deleted blobs can also be
// counted as deleted (reclaimable) bytes.
bytes_deleted: bytes_deleted + chunks_deleted * BLOB_HEADER_SIZE,
},
);
}
Ok(final_stats)
}
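// Worked example of the header correction above (illustrative numbers): a bloblog
// with 10 deleted blobs whose payloads sum to 4096 bytes is reported as
// 4096 + 10 * BLOB_HEADER_SIZE = 4456 deleted bytes, because each deleted blob's
// 36-byte header becomes reclaimable along with its payload.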
/// Look at the analysis of compaction and, using the specified thresholds, come up with a plan
/// to perform compaction.
///
/// May return an empty plan if compaction isn't worthwhile.
///
/// Previous step: analyse_for_compaction
/// Next step: perform_compaction
pub fn plan_compaction(
&self,
thresholds: &CompactionThresholds,
analysis: BTreeMap<BloblogId, BloblogStats>,
) -> anyhow::Result<CompactionPlan> {
let bloblogs_to_replace: BTreeMap<BloblogId, BloblogStats> = analysis
.into_iter()
.filter(|(_id, stats)| thresholds.should_replace_bloblog(stats))
.collect();
let reclaimable_space: u64 = bloblogs_to_replace
.values()
.map(|bs| bs.bytes_deleted)
.sum();
let bytes_to_write: u64 = bloblogs_to_replace
.values()
.map(|bs| bs.bytes_total - bs.bytes_deleted)
.sum();
let small_bloblogs: u32 = bloblogs_to_replace
.values()
.filter(|bs| bs.bytes_total - bs.bytes_deleted < thresholds.cond_if_less_allocated_than)
.count() as u32;
if reclaimable_space < thresholds.minimum_to_reclaim
&& small_bloblogs < thresholds.minimum_small_bloblogs_to_merge
{
// Nothing worth doing: return an empty plan.
return Ok(CompactionPlan {
bloblogs_to_replace: Default::default(),
bytes_to_write: 0,
reclaimable_space: 0,
small_bloblogs: 0,
});
}
Ok(CompactionPlan {
bloblogs_to_replace: bloblogs_to_replace.keys().copied().collect(),
bytes_to_write,
reclaimable_space,
small_bloblogs,
})
}
/// Given a compaction plan, perform the compaction.
/// There shouldn't be any decisions left to be made at this point: just action.
///
/// TODO flock the bloblogs to be removed, and make readers and writers flock them too.
///
/// TODO find a way to deal with bloblogs that are entirely unreferenced from the index
/// (e.g. bloblogs that weren't written properly, such as when compaction fails).
pub fn perform_compaction(
&self,
mut progress: Box<dyn ProgressTracker>,
plan: CompactionPlan,
) -> anyhow::Result<()> {
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ReplacedBlobRow {
pub old_bloblog: BloblogId,
pub old_offset: u64,
pub chunk_id: ChunkId,
}
let mut to_preserve = BTreeSet::new();
let mut replacements = BTreeMap::new();
progress.set_max_size(plan.bytes_to_write);
// First find all the blobs we need to replace.
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
SELECT chunk_id, c.offset
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
WHERE bloblog = ?1 AND d.offset IS NULL
",
)?;
for bloblog in plan.bloblogs_to_replace.iter().copied() {
to_preserve.extend(
stmt.query_map([bloblog], |row| {
let mut chunk_id = ChunkId::default();
chunk_id.copy_from_slice(row.get::<_, Vec<u8>>(0).unwrap().as_slice());
Ok(ReplacedBlobRow {
old_bloblog: bloblog,
chunk_id,
old_offset: row.get::<_, i64>(1).unwrap().try_into().unwrap(),
})
})?
.collect::<Result<Vec<ReplacedBlobRow>, _>>()?,
);
}
}
// Then make the replacements
info!("Rewriting bloblogs...");
let mut buf = Vec::new();
let mut iterator = to_preserve.into_iter();
loop {
let (new_bloblog_id, bloblog_mutex) = self.get_writing_bloblog()?;
let mut new_bloblog = bloblog_mutex.lock().expect("Failed to lock bloblog?");
let mut is_more = false;
while let Some(preserve) = iterator.next() {
is_more = true;
// Get hold of the old bloblog
let old_bloblog = self.open_bloblog(preserve.old_bloblog)?;
let mut old_bloblog = old_bloblog.lock().unwrap();
// Transfer the blob
buf.clear();
old_bloblog.read_blob(preserve.old_offset, &preserve.chunk_id, &mut buf)?;
let new_offset = new_bloblog.write_blob(&preserve.chunk_id, &buf)?;
// Make a note of the replacement
replacements.insert(
preserve,
BloblogPointer {
bloblog: new_bloblog_id,
offset: new_offset,
},
);
progress.inc_progress(buf.len() as u64);
if new_bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE {
// get a new bloblog to write with.
break;
}
}
drop(new_bloblog);
self.return_writing_bloblog(new_bloblog_id, bloblog_mutex)?;
if !is_more {
break;
}
}
info!("Applying replacements...");
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
UPDATE chunks
SET bloblog = ?1, offset = ?2
WHERE chunk_id = ?3
",
)?;
for (replacement_row, new_pos) in replacements {
ensure!(
stmt.execute(params![
new_pos.bloblog,
new_pos.offset as i64,
&replacement_row.chunk_id as &[u8]
])? == 1,
"Wrong number of rows updated for replacement!"
);
}
drop(stmt);
txn.commit().context("committing replacements")?;
}
// TODO fsync new bloblogs
info!("Deleting old bloblogs...");
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
for bloblog_id in plan.bloblogs_to_replace.iter().copied() {
let deleted_chunks = txn.execute(
"
DELETE FROM chunks WHERE bloblog = ?1
",
params![bloblog_id],
)?;
let deleted_deleted = txn.execute(
"
DELETE FROM deleted WHERE bloblog = ?1
",
params![bloblog_id],
)?;
ensure!(deleted_chunks == deleted_deleted, "Undeleted chunks left in bloblog {bloblog_id}: CHUNKS={deleted_chunks} DELETED={deleted_deleted}");
let bloblog_path = self.path.join(bloblog_id.to_string());
remove_file(&bloblog_path).with_context(|| {
format!("Failed to remove obsolete bloblog: {:?}", bloblog_path)
})?;
}
txn.commit()?;
}
Ok(())
}
}
pub struct BloblogStats {
pub chunks_total: u64,
pub chunks_deleted: u64,
pub bytes_total: u64,
pub bytes_deleted: u64,
}
pub struct CompactionPlan {
pub bloblogs_to_replace: BTreeSet<BloblogId>,
pub bytes_to_write: u64,
pub reclaimable_space: u64,
pub small_bloblogs: u32,
}
pub struct CompactionThresholds {
/// Minimum bytes to be reclaimable overall for compaction to be worthwhile.
pub minimum_to_reclaim: u64,
/// (alternative reason) Minimum number of files to be undersized in order for compaction
/// to be worthwhile.
/// This gives us a way to make compaction run if we have lots of tiny bloblogs.
pub minimum_small_bloblogs_to_merge: u32,
/// A bloblog will be replaced if the deallocated size is greater than this.
pub cond_if_more_deallocated_than: u64,
/// A bloblog will be replaced if the allocated size is less than this.
pub cond_if_less_allocated_than: u64,
}
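// Illustrative only (these numbers are invented, not defaults of this crate): one
// plausible set of thresholds, to make the fields above concrete. With these values,
// a bloblog is selected once it has more than 32 MiB deallocated or holds under
// 1 MiB of live data, and compaction only proceeds if the selected bloblogs would
// reclaim at least 64 MiB overall or merge at least 8 small bloblogs.
fn example_compaction_thresholds() -> CompactionThresholds {
    CompactionThresholds {
        minimum_to_reclaim: 64 * 1024 * 1024,
        minimum_small_bloblogs_to_merge: 8,
        cond_if_more_deallocated_than: 32 * 1024 * 1024,
        cond_if_less_allocated_than: 1024 * 1024,
    }
}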
impl CompactionThresholds {
pub fn should_replace_bloblog(&self, bloblog_stats: &BloblogStats) -> bool {
let allocated = bloblog_stats.bytes_total - bloblog_stats.bytes_deleted;
// Note that this will also trigger for fully-deallocated files if
let is_small = allocated < self.cond_if_less_allocated_than;
let has_large_deallocations =
bloblog_stats.bytes_deleted > self.cond_if_more_deallocated_than;
is_small || has_large_deallocations
}
}
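// A minimal end-to-end usage sketch (illustrative, not part of this commit), showing
// how the three steps chain together; `thresholds` and `progress` stand in for
// whatever CompactionThresholds and ProgressTracker implementation the caller has.
fn sketch_compact(
    pile: &SqliteBloblogPile,
    thresholds: &CompactionThresholds,
    progress: Box<dyn ProgressTracker>,
) -> anyhow::Result<()> {
    let analysis = pile.analyse_for_compaction()?;
    let plan = pile.plan_compaction(thresholds, analysis)?;
    if plan.bloblogs_to_replace.is_empty() {
        info!("Compaction not worthwhile; nothing to do.");
        return Ok(());
    }
    info!(
        "Compacting {} bloblogs; expecting to reclaim about {} bytes.",
        plan.bloblogs_to_replace.len(),
        plan.reclaimable_space
    );
    pile.perform_compaction(progress, plan)
}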
pub struct CompactionOutcome {
pub bloblogs_deleted: u32,
pub bloblogs_created: u32,
pub bytes_deleted: u32,
pub bytes_created: u32,
}
impl Drop for SqliteBloblogPile {