diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs
index e39da66..8b6d9b9 100644
--- a/yama/src/pile/local_sqlitebloblogs.rs
+++ b/yama/src/pile/local_sqlitebloblogs.rs
@@ -39,6 +39,9 @@ use rusqlite::ffi::ErrorCode::ConstraintViolation;
 /// Bloblogs will not be reused if they are already 2 GiB large.
 pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
 
+/// This many pointers will be batched up for writing.
+pub const POINTER_WRITE_BATCHES: usize = 2048;
+
 /// A file storing a log of blobs.
 /// Format:
 /// Repeated:
@@ -130,6 +133,54 @@ pub struct Inner {
     open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...?
     connection: Connection,
     writers_in_progress: u16,
+    // We batch up pointer writes because sync() performance really hurts us if we do them one by
+    // one.
+    queued_pointer_writes: HashMap<ChunkId, BloblogPointer>,
+}
+
+impl Inner {
+    pub fn raw_put_chunk_pointer(
+        &self,
+        chunk_id: &ChunkId,
+        bloblog: BloblogId,
+        offset_i64: i64,
+    ) -> anyhow::Result<()> {
+        match self.connection.execute(
+            "INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
+            params![&chunk_id[..], bloblog, offset_i64],
+        ) {
+            Ok(_) => Ok(()),
+            Err(Error::SqliteFailure(e, str)) => {
+                if e.code == ConstraintViolation {
+                    warn!(
+                        "(ignoring) SQLite constraint violation on insertion... {:?}",
+                        str
+                    );
+                    Ok(())
+                } else {
+                    Err(Error::SqliteFailure(e, str))?;
+                    unreachable!();
+                }
+            }
+            other => {
+                other?;
+                unreachable!();
+            }
+        }
+    }
+
+    pub fn flush(&mut self) -> anyhow::Result<()> {
+        // Create a non-allocated hashmap to satisfy borrow checker, then swap it in and out
+        let mut queued_pointer_writes = HashMap::with_capacity(0);
+        std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes);
+        for (chunk_id, pointer) in queued_pointer_writes.drain() {
+            let offset_i64 =
+                i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
+            self.raw_put_chunk_pointer(&chunk_id, pointer.bloblog, offset_i64)?;
+        }
+        std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes);
+        Ok(())
+    }
 }
 
 /// A Pile built on the idea of SQLite-indexed 'blob logs'.
@@ -144,6 +195,7 @@ pub struct SqliteBloblogPile {
     inner: Arc<Mutex<Inner>>,
     path: PathBuf,
     writers_reach_zero: Condvar,
+    should_batch_pointer_writes: bool,
 }
 
 /// A pointer to a blob in a 'blob log'.
@@ -193,9 +245,11 @@ impl SqliteBloblogPile {
                 open_bloblogs: HashMap::new(),
                 connection,
                 writers_in_progress: 0,
+                queued_pointer_writes: Default::default(),
             })),
             path: path.to_owned(),
             writers_reach_zero: Default::default(),
+            should_batch_pointer_writes: true,
         })
     }
 
@@ -287,28 +341,34 @@ impl SqliteBloblogPile {
     fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
         let inner = self.inner.lock().unwrap();
         let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
-        match inner.connection.execute(
-            "INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
-            params![&chunk_id[..], pointer.bloblog, offset_i64],
-        ) {
-            Ok(_) => Ok(()),
-            Err(Error::SqliteFailure(e, str)) => {
-                if e.code == ConstraintViolation {
-                    warn!(
-                        "(ignoring) SQLite constraint violation on insertion... {:?}",
-                        str
-                    );
-                    Ok(())
-                } else {
-                    Err(Error::SqliteFailure(e, str))?;
-                    unreachable!();
-                }
-            }
-            other => {
-                other?;
-                unreachable!();
-            }
+        inner.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64)
+    }
+
+    fn batched_put_chunk_pointer(
+        &self,
+        chunk_id: &ChunkId,
+        pointer: BloblogPointer,
+    ) -> anyhow::Result<()> {
+        let mut inner = self.inner.lock().unwrap();
+        inner
+            .queued_pointer_writes
+            .insert(chunk_id.clone(), pointer);
+        if inner.queued_pointer_writes.len() >= POINTER_WRITE_BATCHES {
+            inner.flush()?;
         }
+        Ok(())
+    }
+
+    fn flush_queued_pointer_writes(&self) -> anyhow::Result<()> {
+        let mut inner = self.inner.lock().unwrap();
+        inner.flush()
+    }
+}
+
+impl Drop for SqliteBloblogPile {
+    fn drop(&mut self) {
+        self.flush_queued_pointer_writes()
+            .expect("POSSIBILITY OF LOSS OF NEW DATA: failed to flush queued pointer writes!");
     }
 }
 
@@ -385,7 +445,11 @@ impl RawPile for SqliteBloblogPile {
                     offset,
                 };
 
-                self.put_chunk_pointer(&chunk_id, pointer)?;
+                if self.should_batch_pointer_writes {
+                    self.batched_put_chunk_pointer(&chunk_id, pointer)?;
+                } else {
+                    self.put_chunk_pointer(&chunk_id, pointer)?;
+                }
                 Ok(())
             }
             Keyspace::ChunkHash => unimplemented!(),