Batch up pointer writes to reduce sync() bottleneck in SQLite
Some checks failed
continuous-integration/drone the build failed

This commit is contained in:
Olivier 'reivilibre' 2021-10-27 22:08:28 +01:00
parent 1ba2f09676
commit dbd4d42ac6

View File

@ -39,6 +39,9 @@ use rusqlite::ffi::ErrorCode::ConstraintViolation;
/// Bloblogs will not be reused if they are already 2 GiB large. /// Bloblogs will not be reused if they are already 2 GiB large.
pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024; pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
/// Number of chunk pointers queued in memory before they are flushed to the
/// SQLite index in one batch (amortises the per-write sync() cost).
pub const POINTER_WRITE_BATCHES: usize = 2048;
/// A file storing a log of blobs. /// A file storing a log of blobs.
/// Format: /// Format:
/// Repeated: /// Repeated:
@ -130,6 +133,54 @@ pub struct Inner {
open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...? open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...?
connection: Connection, connection: Connection,
writers_in_progress: u16, writers_in_progress: u16,
// We batch up pointer writes because sync() performance really hurts us if we do them one by
// one.
queued_pointer_writes: HashMap<ChunkId, BloblogPointer>,
}
impl Inner {
pub fn raw_put_chunk_pointer(
&self,
chunk_id: &ChunkId,
bloblog: BloblogId,
offset_i64: i64,
) -> anyhow::Result<()> {
match self.connection.execute(
"INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
params![&chunk_id[..], bloblog, offset_i64],
) {
Ok(_) => Ok(()),
Err(Error::SqliteFailure(e, str)) => {
if e.code == ConstraintViolation {
warn!(
"(ignoring) SQLite constraint violation on insertion... {:?}",
str
);
Ok(())
} else {
Err(Error::SqliteFailure(e, str))?;
unreachable!();
}
}
other => {
other?;
unreachable!();
}
}
}
/// Write all queued chunk pointers out to the SQLite index.
///
/// The queue is only cleared once *every* insert has succeeded. The original
/// swap-out/drain/swap-back dance lost the remaining (and already-drained)
/// entries if an insert failed partway: the `?` early return skipped the
/// swap-back, leaving `queued_pointer_writes` as the empty placeholder map.
/// Iterating in place keeps the entries available for a retry; re-running an
/// already-written pointer is safe because `raw_put_chunk_pointer` ignores
/// constraint violations.
pub fn flush(&mut self) -> anyhow::Result<()> {
    for (chunk_id, pointer) in self.queued_pointer_writes.iter() {
        let offset_i64 =
            i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
        self.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64)?;
    }
    // All writes succeeded; clear the queue (retains its allocation for the
    // next batch).
    self.queued_pointer_writes.clear();
    Ok(())
}
} }
/// A Pile built on the idea of SQLite-indexed 'blob logs'. /// A Pile built on the idea of SQLite-indexed 'blob logs'.
@ -144,6 +195,7 @@ pub struct SqliteBloblogPile {
inner: Arc<Mutex<Inner>>, inner: Arc<Mutex<Inner>>,
path: PathBuf, path: PathBuf,
writers_reach_zero: Condvar, writers_reach_zero: Condvar,
should_batch_pointer_writes: bool,
} }
/// A pointer to a blob in a 'blob log'. /// A pointer to a blob in a 'blob log'.
@ -193,9 +245,11 @@ impl SqliteBloblogPile {
open_bloblogs: HashMap::new(), open_bloblogs: HashMap::new(),
connection, connection,
writers_in_progress: 0, writers_in_progress: 0,
queued_pointer_writes: Default::default(),
})), })),
path: path.to_owned(), path: path.to_owned(),
writers_reach_zero: Default::default(), writers_reach_zero: Default::default(),
should_batch_pointer_writes: true,
}) })
} }
@ -287,28 +341,34 @@ impl SqliteBloblogPile {
fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> { fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
match inner.connection.execute( inner.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64)
"INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)", }
params![&chunk_id[..], pointer.bloblog, offset_i64],
) { fn batched_put_chunk_pointer(
Ok(_) => Ok(()), &self,
Err(Error::SqliteFailure(e, str)) => { chunk_id: &ChunkId,
if e.code == ConstraintViolation { pointer: BloblogPointer,
warn!( ) -> anyhow::Result<()> {
"(ignoring) SQLite constraint violation on insertion... {:?}", let mut inner = self.inner.lock().unwrap();
str inner
); .queued_pointer_writes
.insert(chunk_id.clone(), pointer);
if inner.queued_pointer_writes.len() >= POINTER_WRITE_BATCHES {
inner.flush()?;
}
Ok(()) Ok(())
} else {
Err(Error::SqliteFailure(e, str))?;
unreachable!();
}
}
other => {
other?;
unreachable!();
} }
/// Lock the pile and flush all queued pointer writes out to the SQLite index.
fn flush_queued_pointer_writes(&self) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.flush()
} }
}
impl Drop for SqliteBloblogPile {
// On drop, persist any pointers still queued in memory — discarding them
// would lose the index entries for blobs already written to bloblogs.
fn drop(&mut self) {
// NOTE(review): a panic inside Drop aborts the process if a panic is already
// unwinding — consider logging the error instead; TODO confirm intent.
self.flush_queued_pointer_writes()
.expect("POSSIBILITY OF LOSS OF NEW DATA: failed to flush queued pointer writes!");
} }
} }
@ -385,7 +445,11 @@ impl RawPile for SqliteBloblogPile {
offset, offset,
}; };
if self.should_batch_pointer_writes {
self.batched_put_chunk_pointer(&chunk_id, pointer)?;
} else {
self.put_chunk_pointer(&chunk_id, pointer)?; self.put_chunk_pointer(&chunk_id, pointer)?;
}
Ok(()) Ok(())
} }
Keyspace::ChunkHash => unimplemented!(), Keyspace::ChunkHash => unimplemented!(),