From ea2f48f4379ebbd38188c1eb79cf99129cf8f23f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 19 Nov 2021 21:09:18 +0000 Subject: [PATCH] Flush pointers when writing using new high-throughput pipeline --- yama/src/pile/local_sqlitebloblogs.rs | 34 +++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 5cf597b..7efe1fc 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -28,7 +28,7 @@ use anyhow::{bail, Context}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use log::warn; use nix::unistd::sync; -use rusqlite::{params, Error}; +use rusqlite::{params, Error, ErrorCode}; use rusqlite::{Connection, OptionalExtension}; use crate::definitions::ChunkId; @@ -381,16 +381,40 @@ impl SqliteBloblogPile { fn flush_pointers( this: &SqliteBloblogPile, - pointers_buffered: &mut Vec, + pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>, ) -> anyhow::Result<()> { - todo!() + let mut inner = this.inner.lock().unwrap(); + let txn = inner.connection.transaction()?; + let mut stmt = txn.prepare( + "INSERT OR FAIL INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)", + )?; + for (chunk_id, pointer) in pointers_buffered.drain(..) { + match stmt.execute(params![ + &chunk_id[..], + pointer.bloblog, + pointer.offset as i64 + ]) { + Err(Error::SqliteFailure(e, str)) + if e.code == ErrorCode::ConstraintViolation => + { + warn!( + "(ignoring) SQLite constraint violation on insertion... {:?}", + str + ); + } + other => { + other?; + } + } + } + Ok(()) } fn write_blob( this: &SqliteBloblogPile, bloblog_id: BloblogId, bloblog: &mut Bloblog, - pointers_buffered: &mut Vec, + pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>, (chunk_id, chunk): (ChunkId, Vec), ) -> anyhow::Result<()> { let offset = bloblog.write_blob(&chunk_id, &chunk)?; @@ -398,7 +422,7 @@ impl SqliteBloblogPile { bloblog: bloblog_id, offset, }; - pointers_buffered.push(pointer); + pointers_buffered.push((chunk_id, pointer)); if pointers_buffered.len() >= POINTERS_BUFFER_SIZE { flush_pointers(this, pointers_buffered)?;