Flush pointers when writing using new high-throughput pipeline

Olivier 'reivilibre' 2021-11-19 21:09:18 +00:00
parent cc60ae88a4
commit ea2f48f437

@@ -28,7 +28,7 @@ use anyhow::{bail, Context};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use log::warn;
 use nix::unistd::sync;
-use rusqlite::{params, Error};
+use rusqlite::{params, Error, ErrorCode};
 use rusqlite::{Connection, OptionalExtension};
 
 use crate::definitions::ChunkId;
@@ -381,16 +381,40 @@ impl SqliteBloblogPile {
     fn flush_pointers(
         this: &SqliteBloblogPile,
-        pointers_buffered: &mut Vec<BloblogPointer>,
+        pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
     ) -> anyhow::Result<()> {
-        todo!()
+        let mut inner = this.inner.lock().unwrap();
+        // One transaction for the whole batch of buffered pointers.
+        let txn = inner.connection.transaction()?;
+        let mut stmt = txn.prepare(
+            "INSERT OR FAIL INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
+        )?;
+        // Drain the buffer so it is empty for the next batch.
+        for (chunk_id, pointer) in pointers_buffered.drain(..) {
+            match stmt.execute(params![
+                &chunk_id[..],
+                pointer.bloblog,
+                pointer.offset as i64
+            ]) {
+                // A constraint violation means this chunk is already
+                // indexed, which is harmless: log it and carry on.
+                Err(Error::SqliteFailure(e, str))
+                    if e.code == ErrorCode::ConstraintViolation =>
+                {
+                    warn!(
+                        "(ignoring) SQLite constraint violation on insertion... {:?}",
+                        str
+                    );
+                }
+                other => {
+                    other?;
+                }
+            }
+        }
+        // `stmt` borrows `txn`, so drop it before committing; without the
+        // commit, rusqlite rolls the transaction back when `txn` is dropped.
+        drop(stmt);
+        txn.commit()?;
+        Ok(())
     }
 
     fn write_blob(
         this: &SqliteBloblogPile,
         bloblog_id: BloblogId,
         bloblog: &mut Bloblog,
-        pointers_buffered: &mut Vec<BloblogPointer>,
+        pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
         (chunk_id, chunk): (ChunkId, Vec<u8>),
     ) -> anyhow::Result<()> {
         let offset = bloblog.write_blob(&chunk_id, &chunk)?;
@@ -398,7 +422,7 @@ impl SqliteBloblogPile {
             bloblog: bloblog_id,
             offset,
         };
-        pointers_buffered.push(pointer);
+        pointers_buffered.push((chunk_id, pointer));
         if pointers_buffered.len() >= POINTERS_BUFFER_SIZE {
             flush_pointers(this, pointers_buffered)?;
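
The batched flush this commit introduces can be illustrated in isolation. The sketch below is a minimal, self-contained approximation, not yama's actual code: ChunkId, BloblogPointer, the chunks schema, and the POINTERS_BUFFER_SIZE value are simplified stand-ins, and the warn! logging is dropped so the example depends only on rusqlite. It shows the same mechanics as the diff: pointers accumulate in a Vec, a full buffer is drained into a single SQLite transaction via INSERT OR FAIL, and duplicate-chunk constraint violations are tolerated rather than failing the batch.

use rusqlite::{params, Connection, Error, ErrorCode};

const CHUNK_ID_LEN: usize = 32;
const POINTERS_BUFFER_SIZE: usize = 4;

type ChunkId = [u8; CHUNK_ID_LEN];

// Where a chunk lives: which bloblog file and at what byte offset.
struct BloblogPointer {
    bloblog: u32,
    offset: u64,
}

fn flush_pointers(
    conn: &mut Connection,
    buffered: &mut Vec<(ChunkId, BloblogPointer)>,
) -> rusqlite::Result<()> {
    // One transaction per batch, not one per pointer.
    let txn = conn.transaction()?;
    {
        let mut stmt = txn.prepare(
            "INSERT OR FAIL INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
        )?;
        for (chunk_id, pointer) in buffered.drain(..) {
            match stmt.execute(params![
                &chunk_id[..],
                pointer.bloblog,
                pointer.offset as i64
            ]) {
                // A duplicate chunk_id is already indexed (the real code
                // warn!()s here), so the batch just carries on.
                Err(Error::SqliteFailure(e, _)) if e.code == ErrorCode::ConstraintViolation => {}
                other => {
                    other?;
                }
            }
        }
    } // `stmt` drops here, releasing its borrow of `txn`.
    txn.commit()
}

fn main() -> rusqlite::Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute(
        "CREATE TABLE chunks (chunk_id BLOB PRIMARY KEY, bloblog INTEGER, offset INTEGER)",
        [],
    )?;

    let mut buffered: Vec<(ChunkId, BloblogPointer)> = Vec::new();
    for i in 0..6u8 {
        // Every ID repeats once (i % 3), exercising the duplicate path.
        let chunk_id: ChunkId = [i % 3; CHUNK_ID_LEN];
        let pointer = BloblogPointer { bloblog: 1, offset: u64::from(i) * 128 };
        buffered.push((chunk_id, pointer));
        if buffered.len() >= POINTERS_BUFFER_SIZE {
            flush_pointers(&mut conn, &mut buffered)?;
        }
    }
    // Flush the remainder once the input stream ends.
    flush_pointers(&mut conn, &mut buffered)?;

    let count: i64 = conn.query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))?;
    println!("{count} distinct chunks indexed"); // prints: 3
    Ok(())
}

Committing one transaction per batch rather than per pointer is what buys the throughput: each committed SQLite transaction implies a journal sync, so flushing every POINTERS_BUFFER_SIZE pointers amortizes that cost across the whole batch.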