From 9001177143675f9c980ea288fa047b933660a256 Mon Sep 17 00:00:00 2001 From: Olivier Date: Mon, 28 Nov 2022 21:03:07 +0000 Subject: [PATCH] Batch up chunk deletions in an attempt to make vacuuming more performant --- yama/src/operations/checking.rs | 24 +++++++-- yama/src/pile.rs | 7 +++ yama/src/pile/access_guard.rs | 4 ++ yama/src/pile/compression.rs | 4 ++ yama/src/pile/encryption.rs | 4 ++ yama/src/pile/integrity.rs | 4 ++ yama/src/pile/local_sqlitebloblogs.rs | 78 +++++++++++++++++++++++++++ yama/src/remote/requester.rs | 6 +++ 8 files changed, 128 insertions(+), 3 deletions(-) diff --git a/yama/src/operations/checking.rs b/yama/src/operations/checking.rs index a218ce8..0e89a5f 100644 --- a/yama/src/operations/checking.rs +++ b/yama/src/operations/checking.rs @@ -24,8 +24,10 @@ use crate::pile::{ use anyhow::bail; use crossbeam_channel::Sender; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use itertools::Itertools; use log::{error, info, warn}; use std::collections::HashSet; +use std::convert::TryInto; use std::io::{Read, Write}; use std::sync::Mutex; @@ -110,6 +112,10 @@ impl RawPile for VacuumRawPile { self.underlying.delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.underlying.delete_many(kind, keys) + } + fn list_keys( &self, kind: Keyspace, @@ -407,9 +413,21 @@ pub fn check_shallow( // actually do the vacuum! info!("Going to vacuum them up."); - for vacuum_id in to_vacuum { - pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?; - pbar.inc(1); + for vacuum_ids_chunk in to_vacuum + .into_iter() + .chunks(512) + .into_iter() + .map(|c| c.collect::>()) + { + pile.raw_pile.delete_many( + Keyspace::Chunk, + vacuum_ids_chunk + .iter() + .map(|ci| ci.as_slice()) + .collect::>() + .as_slice(), + )?; + pbar.inc(vacuum_ids_chunk.len().try_into().unwrap()); } pile.flush()?; pbar.finish_and_clear(); diff --git a/yama/src/pile.rs b/yama/src/pile.rs index caaa673..78926f9 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -141,6 +141,7 @@ pub trait RawPile: Send + Sync + Debug + 'static { fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result>>; fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()>; fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()>; + fn delete_many(&self, kind: Keyspace, key: &[&[u8]]) -> anyhow::Result<()>; fn list_keys( &self, kind: Keyspace, @@ -186,6 +187,9 @@ impl RawPile for Box { fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> { self.as_ref().delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.as_ref().delete_many(kind, keys) + } fn list_keys( &self, kind: Keyspace, @@ -233,6 +237,9 @@ impl RawPile for Arc { fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> { self.as_ref().delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.as_ref().delete_many(kind, keys) + } fn list_keys( &self, kind: Keyspace, diff --git a/yama/src/pile/access_guard.rs b/yama/src/pile/access_guard.rs index 63720af..59a2bef 100644 --- a/yama/src/pile/access_guard.rs +++ b/yama/src/pile/access_guard.rs @@ -83,6 +83,10 @@ impl RawPile for PileGuard { bail!("Access denied"); } + fn delete_many(&self, _kind: Keyspace, _keys: &[&[u8]]) -> anyhow::Result<()> { + bail!("Access denied"); + } + fn list_keys( &self, _kind: Keyspace, diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index e79b67b..c6ef733 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -278,6 +278,10 @@ impl RawPile for RawPileCompressor { self.underlying.delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.underlying.delete_many(kind, keys) + } + fn list_keys( &self, kind: Keyspace, diff --git a/yama/src/pile/encryption.rs b/yama/src/pile/encryption.rs index 5a0e548..53584cf 100644 --- a/yama/src/pile/encryption.rs +++ b/yama/src/pile/encryption.rs @@ -101,6 +101,10 @@ impl RawPile for RawPileEncryptor { self.underlying.delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.underlying.delete_many(kind, keys) + } + fn list_keys( &self, kind: Keyspace, diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index 4326580..a41ab65 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -98,6 +98,10 @@ impl RawPile for RawPileIntegrityChecker { self.underlying.delete(kind, key) } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + self.underlying.delete_many(kind, keys) + } + fn list_keys( &self, kind: Keyspace, diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index b4cdac3..988c7fd 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -378,6 +378,31 @@ impl SqliteBloblogPile { .optional()?) } + fn get_chunk_pointers( + &self, + chunk_ids: &[&[u8]], + ) -> anyhow::Result>> { + let mut inner = self.inner.lock().unwrap(); + let txn = inner.connection.transaction()?; + let mut result = Vec::with_capacity(chunk_ids.len()); + { + let mut stmt = txn.prepare("SELECT bloblog, offset FROM chunks WHERE chunk_id = ?1")?; + for &chunk_id in chunk_ids { + let bloglog_pointer: Option = stmt + .query_row(params![chunk_id], |row| { + Ok(BloblogPointer { + bloblog: row.get(0)?, + offset: row.get::<_, i64>(1)? as u64, + }) + }) + .optional()?; + result.push(bloglog_pointer); + } + } + txn.commit()?; + Ok(result) + } + fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> { let mut inner = self.inner.lock().unwrap(); let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); @@ -982,6 +1007,59 @@ impl RawPile for SqliteBloblogPile { } } } + + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + match kind { + Keyspace::Chunk => { + let mut chunk_pointers_by_bloblog: BTreeMap> = + BTreeMap::new(); + + for (chunk_pointer, chunk_id) in self + .get_chunk_pointers(keys) + .context("failed to get chunk pointers")? + .into_iter() + .zip(keys) + .filter_map(|(pointer, &chunk_id)| match pointer { + Some(pointer) => Some((pointer, chunk_id)), + None => None, + }) + { + chunk_pointers_by_bloblog + .entry(chunk_pointer.bloblog) + .or_default() + .push((chunk_pointer.offset, chunk_id)); + } + let mut inner = self.inner.lock().unwrap(); + let txn = inner.connection.transaction()?; + { + let mut stmt = txn.prepare( + "INSERT OR IGNORE INTO deleted (bloblog, offset, size) + VALUES (?1, ?2, ?3)", + )?; + for (bloblog_id, entries) in chunk_pointers_by_bloblog { + let bloblog_mutex = self.open_bloblog(bloblog_id)?; + let mut bloblog = bloblog_mutex.lock().unwrap(); + for (chunk_offset, raw_chunk_id) in entries { + let mut chunk_id: ChunkId = Default::default(); + chunk_id.copy_from_slice(raw_chunk_id); + let size = bloblog.blob_len(chunk_offset, &chunk_id)?; + let offset_i64 = i64::try_from(chunk_offset) + .expect("ouch! can't turn u64 into i64..."); + stmt.execute(params![bloblog_id, offset_i64, size])?; + } + } + } + txn.commit().context("Failed to commit chunk deletions")?; + } + _ => { + for &key in keys { + self.delete(kind, key)?; + } + } + } + Ok(()) + } + fn list_keys( &self, kind: Keyspace, diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 8d07985..d8aa81d 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -307,6 +307,12 @@ impl RawPile for Requester { other => Err(anyhow!("Received {:?} for Delete", other)), } } + fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> { + for &key in keys { + self.delete(kind, key)?; + } + Ok(()) + } fn list_keys( &self, kind: Keyspace,