Actually commit when flushing bloblog pointers
This commit is contained in:
parent
8eeafa7626
commit
a37acf3e74
@ -26,7 +26,7 @@ use std::{fs, thread};
|
|||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use log::warn;
|
use log::{info, warn};
|
||||||
use nix::unistd::sync;
|
use nix::unistd::sync;
|
||||||
use rusqlite::{params, Error, ErrorCode};
|
use rusqlite::{params, Error, ErrorCode};
|
||||||
use rusqlite::{Connection, OptionalExtension};
|
use rusqlite::{Connection, OptionalExtension};
|
||||||
@ -383,29 +383,32 @@ impl SqliteBloblogPile {
|
|||||||
pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
|
pointers_buffered: &mut Vec<(ChunkId, BloblogPointer)>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut inner = this.inner.lock().unwrap();
|
let mut inner = this.inner.lock().unwrap();
|
||||||
let txn = inner.connection.transaction()?;
|
let mut txn = inner.connection.transaction()?;
|
||||||
let mut stmt = txn.prepare(
|
{
|
||||||
"INSERT OR FAIL INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
|
let mut stmt = txn.prepare(
|
||||||
)?;
|
"INSERT OR FAIL INTO chunks (chunk_id, bloblog, offset) VALUES (?1, ?2, ?3)",
|
||||||
for (chunk_id, pointer) in pointers_buffered.drain(..) {
|
)?;
|
||||||
match stmt.execute(params![
|
for (chunk_id, pointer) in pointers_buffered.drain(..) {
|
||||||
&chunk_id[..],
|
match stmt.execute(params![
|
||||||
pointer.bloblog,
|
&chunk_id[..],
|
||||||
pointer.offset as i64
|
pointer.bloblog,
|
||||||
]) {
|
pointer.offset as i64
|
||||||
Err(Error::SqliteFailure(e, str))
|
]) {
|
||||||
if e.code == ErrorCode::ConstraintViolation =>
|
Err(Error::SqliteFailure(e, str))
|
||||||
{
|
if e.code == ErrorCode::ConstraintViolation =>
|
||||||
warn!(
|
{
|
||||||
"(ignoring) SQLite constraint violation on insertion... {:?}",
|
warn!(
|
||||||
str
|
"(ignoring) SQLite constraint violation on insertion... {:?}",
|
||||||
);
|
str
|
||||||
}
|
);
|
||||||
other => {
|
}
|
||||||
other?;
|
other => {
|
||||||
|
other?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
txn.commit()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,6 +462,7 @@ impl SqliteBloblogPile {
|
|||||||
self.return_writing_bloblog(bloblog_id, bloglog_mutex)?;
|
self.return_writing_bloblog(bloblog_id, bloglog_mutex)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("Flushing pointers (storage pipeline shutdown).");
|
||||||
flush_pointers(self, &mut pointers_buffered)?;
|
flush_pointers(self, &mut pointers_buffered)?;
|
||||||
|
|
||||||
// we MUST have flushed ALL the pointers by now.
|
// we MUST have flushed ALL the pointers by now.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user