Batch up chunk deletions in an attempt to make vacuuming more performant
This commit is contained in:
parent
c9d64b2962
commit
9001177143
|
@ -24,8 +24,10 @@ use crate::pile::{
|
|||
use anyhow::bail;
|
||||
use crossbeam_channel::Sender;
|
||||
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
|
||||
use itertools::Itertools;
|
||||
use log::{error, info, warn};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryInto;
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::Mutex;
|
||||
|
||||
|
@ -110,6 +112,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
|
|||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
@ -407,9 +413,21 @@ pub fn check_shallow<RP: RawPile>(
|
|||
|
||||
// actually do the vacuum!
|
||||
info!("Going to vacuum them up.");
|
||||
for vacuum_id in to_vacuum {
|
||||
pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
|
||||
pbar.inc(1);
|
||||
for vacuum_ids_chunk in to_vacuum
|
||||
.into_iter()
|
||||
.chunks(512)
|
||||
.into_iter()
|
||||
.map(|c| c.collect::<Vec<ChunkId>>())
|
||||
{
|
||||
pile.raw_pile.delete_many(
|
||||
Keyspace::Chunk,
|
||||
vacuum_ids_chunk
|
||||
.iter()
|
||||
.map(|ci| ci.as_slice())
|
||||
.collect::<Vec<&[u8]>>()
|
||||
.as_slice(),
|
||||
)?;
|
||||
pbar.inc(vacuum_ids_chunk.len().try_into().unwrap());
|
||||
}
|
||||
pile.flush()?;
|
||||
pbar.finish_and_clear();
|
||||
|
|
|
@ -141,6 +141,7 @@ pub trait RawPile: Send + Sync + Debug + 'static {
|
|||
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
|
||||
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
|
||||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()>;
|
||||
fn delete_many(&self, kind: Keyspace, key: &[&[u8]]) -> anyhow::Result<()>;
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
@ -186,6 +187,9 @@ impl RawPile for Box<dyn RawPile> {
|
|||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete(kind, key)
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete_many(kind, keys)
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
@ -233,6 +237,9 @@ impl<RP: RawPile> RawPile for Arc<RP> {
|
|||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete(kind, key)
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete_many(kind, keys)
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
|
@ -83,6 +83,10 @@ impl<R: Clone + RawPile> RawPile for PileGuard<R> {
|
|||
bail!("Access denied");
|
||||
}
|
||||
|
||||
fn delete_many(&self, _kind: Keyspace, _keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
bail!("Access denied");
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
_kind: Keyspace,
|
||||
|
|
|
@ -278,6 +278,10 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
|
|||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
|
@ -101,6 +101,10 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
|
|||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
|
@ -98,6 +98,10 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
|
|||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
|
@ -378,6 +378,31 @@ impl SqliteBloblogPile {
|
|||
.optional()?)
|
||||
}
|
||||
|
||||
fn get_chunk_pointers(
|
||||
&self,
|
||||
chunk_ids: &[&[u8]],
|
||||
) -> anyhow::Result<Vec<Option<BloblogPointer>>> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let txn = inner.connection.transaction()?;
|
||||
let mut result = Vec::with_capacity(chunk_ids.len());
|
||||
{
|
||||
let mut stmt = txn.prepare("SELECT bloblog, offset FROM chunks WHERE chunk_id = ?1")?;
|
||||
for &chunk_id in chunk_ids {
|
||||
let bloglog_pointer: Option<BloblogPointer> = stmt
|
||||
.query_row(params![chunk_id], |row| {
|
||||
Ok(BloblogPointer {
|
||||
bloblog: row.get(0)?,
|
||||
offset: row.get::<_, i64>(1)? as u64,
|
||||
})
|
||||
})
|
||||
.optional()?;
|
||||
result.push(bloglog_pointer);
|
||||
}
|
||||
}
|
||||
txn.commit()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
|
||||
|
@ -982,6 +1007,59 @@ impl RawPile for SqliteBloblogPile {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
match kind {
|
||||
Keyspace::Chunk => {
|
||||
let mut chunk_pointers_by_bloblog: BTreeMap<BloblogId, Vec<(u64, &[u8])>> =
|
||||
BTreeMap::new();
|
||||
|
||||
for (chunk_pointer, chunk_id) in self
|
||||
.get_chunk_pointers(keys)
|
||||
.context("failed to get chunk pointers")?
|
||||
.into_iter()
|
||||
.zip(keys)
|
||||
.filter_map(|(pointer, &chunk_id)| match pointer {
|
||||
Some(pointer) => Some((pointer, chunk_id)),
|
||||
None => None,
|
||||
})
|
||||
{
|
||||
chunk_pointers_by_bloblog
|
||||
.entry(chunk_pointer.bloblog)
|
||||
.or_default()
|
||||
.push((chunk_pointer.offset, chunk_id));
|
||||
}
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let txn = inner.connection.transaction()?;
|
||||
{
|
||||
let mut stmt = txn.prepare(
|
||||
"INSERT OR IGNORE INTO deleted (bloblog, offset, size)
|
||||
VALUES (?1, ?2, ?3)",
|
||||
)?;
|
||||
for (bloblog_id, entries) in chunk_pointers_by_bloblog {
|
||||
let bloblog_mutex = self.open_bloblog(bloblog_id)?;
|
||||
let mut bloblog = bloblog_mutex.lock().unwrap();
|
||||
for (chunk_offset, raw_chunk_id) in entries {
|
||||
let mut chunk_id: ChunkId = Default::default();
|
||||
chunk_id.copy_from_slice(raw_chunk_id);
|
||||
let size = bloblog.blob_len(chunk_offset, &chunk_id)?;
|
||||
let offset_i64 = i64::try_from(chunk_offset)
|
||||
.expect("ouch! can't turn u64 into i64...");
|
||||
stmt.execute(params![bloblog_id, offset_i64, size])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
txn.commit().context("Failed to commit chunk deletions")?;
|
||||
}
|
||||
_ => {
|
||||
for &key in keys {
|
||||
self.delete(kind, key)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
|
@ -307,6 +307,12 @@ impl RawPile for Requester {
|
|||
other => Err(anyhow!("Received {:?} for Delete", other)),
|
||||
}
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
for &key in keys {
|
||||
self.delete(kind, key)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
|
|
Loading…
Reference in New Issue