Flush chunk pointers in one transaction
ci/woodpecker/push/build Pipeline was successful Details
ci/woodpecker/push/release Pipeline was successful Details
ci/woodpecker/tag/build Pipeline was successful Details
ci/woodpecker/tag/release Pipeline was successful Details

This commit is contained in:
Olivier 'reivilibre' 2022-06-15 21:13:58 +01:00
parent eef22e7009
commit 0b84c793bf
1 changed files with 39 additions and 26 deletions

View File

@ -28,7 +28,7 @@ use anyhow::{bail, Context};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use log::{info, warn}; use log::{info, warn};
use nix::unistd::sync; use nix::unistd::sync;
use rusqlite::{params, Error, ErrorCode}; use rusqlite::{params, Error, ErrorCode, Transaction};
use rusqlite::{Connection, OptionalExtension}; use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId; use crate::definitions::ChunkId;
@ -145,14 +145,13 @@ pub struct Inner {
queued_pointer_writes: HashMap<ChunkId, BloblogPointer>, queued_pointer_writes: HashMap<ChunkId, BloblogPointer>,
} }
impl Inner { fn raw_put_chunk_pointer_txn(
pub fn raw_put_chunk_pointer( txn: &Transaction,
&self,
chunk_id: &ChunkId, chunk_id: &ChunkId,
bloblog: BloblogId, bloblog: BloblogId,
offset_i64: i64, offset_i64: i64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match self.connection.execute( match txn.execute(
"INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)", "INSERT INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
params![&chunk_id[..], bloblog, offset_i64], params![&chunk_id[..], bloblog, offset_i64],
) { ) {
@ -165,27 +164,41 @@ impl Inner {
); );
Ok(()) Ok(())
} else { } else {
Err(Error::SqliteFailure(e, str))?; Err(Error::SqliteFailure(e, str).into())
unreachable!();
} }
} }
other => { Err(other) => Err(other.into()),
other?;
unreachable!();
}
} }
}
impl Inner {
pub fn raw_put_chunk_pointer(
&mut self,
chunk_id: &ChunkId,
bloblog: BloblogId,
offset_i64: i64,
) -> anyhow::Result<()> {
let txn = self.connection.transaction()?;
raw_put_chunk_pointer_txn(&txn, chunk_id, bloblog, offset_i64)?;
txn.commit()?;
Ok(())
} }
pub fn flush(&mut self) -> anyhow::Result<()> { pub fn flush(&mut self) -> anyhow::Result<()> {
// Create a non-allocated hashmap to satisfy borrow checker, then swap it in and out // Create a non-allocated hashmap to satisfy borrow checker, then swap it in and out
let mut queued_pointer_writes = HashMap::with_capacity(0); let mut queued_pointer_writes = HashMap::with_capacity(0);
std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes); std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes);
let txn = self.connection.transaction()?;
for (chunk_id, pointer) in queued_pointer_writes.drain() { for (chunk_id, pointer) in queued_pointer_writes.drain() {
let offset_i64 = let offset_i64 =
i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
self.raw_put_chunk_pointer(&chunk_id, pointer.bloblog, offset_i64)?; raw_put_chunk_pointer_txn(&txn, &chunk_id, pointer.bloblog, offset_i64)?;
} }
std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes); std::mem::swap(&mut self.queued_pointer_writes, &mut queued_pointer_writes);
txn.commit()?;
Ok(()) Ok(())
} }
} }
@ -355,7 +368,7 @@ impl SqliteBloblogPile {
} }
fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> { fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
let inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64..."); let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
inner.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64) inner.raw_put_chunk_pointer(chunk_id, pointer.bloblog, offset_i64)
} }