diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 445226d..8d1ac5f 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -28,7 +28,7 @@ use anyhow::{bail, Context}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use log::{info, warn}; use nix::unistd::sync; -use rusqlite::{params, Error, ErrorCode}; +use rusqlite::{params, Error, ErrorCode, Transaction}; use rusqlite::{Connection, OptionalExtension}; use crate::definitions::ChunkId; @@ -145,47 +145,60 @@ pub struct Inner { queued_pointer_writes: HashMap, } +fn raw_put_chunk_pointer_txn( + txn: &Transaction, + chunk_id: &ChunkId, + bloblog: BloblogId, + offset_i64: i64, +) -> anyhow::Result<()> { + match txn.execute( + "INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)", + params![&chunk_id[..], bloblog, offset_i64], + ) { + Ok(_) => Ok(()), + Err(Error::SqliteFailure(e, str)) => { + if e.code == ConstraintViolation { + warn!( + "(ignoring) SQLite constraint violation on insertion... {:?}", + str + ); + Ok(()) + } else { + Err(Error::SqliteFailure(e, str).into()) + } + } + Err(other) => Err(other.into()), + } +} + impl Inner { pub fn raw_put_chunk_pointer( - &self, + &mut self, chunk_id: &ChunkId, bloblog: BloblogId, offset_i64: i64, ) -> anyhow::Result<()> { - match self.connection.execute( - "INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)", - params![&chunk_id[..], bloblog, offset_i64], - ) { - Ok(_) => Ok(()), - Err(Error::SqliteFailure(e, str)) => { - if e.code == ConstraintViolation { - warn!( - "(ignoring) SQLite constraint violation on insertion... {:?}", - str - ); - Ok(()) - } else { - Err(Error::SqliteFailure(e, str))?; - unreachable!(); - } - } - other => { - other?; - unreachable!(); - } - } + let txn = self.connection.transaction()?; + raw_put_chunk_pointer_txn(&txn, chunk_id, bloblog, offset_i64)?; + txn.commit()?; + Ok(()) } pub fn flush(&mut self) -> anyhow::Result<()> { // Create a non-allocated hashmap to satisfy borrow checker, then swap it in and out let mut queued_pointer_writes = HashMap::with_capacity(0); std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes); + + let txn = self.connection.transaction()?; + for (chunk_id, pointer) in queued_pointer_writes.drain() { let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); - self.raw_put_chunk_pointer(&chunk_id, pointer.bloblog, offset_i64)?; + raw_put_chunk_pointer_txn(&txn, &chunk_id, pointer.bloblog, offset_i64)?; } std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes); + + txn.commit()?; Ok(()) } } @@ -355,7 +368,7 @@ impl SqliteBloblogPile { } fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> { - let inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); inner.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64) }