diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs
index 8d1ac5f..934f793 100644
--- a/yama/src/pile/local_sqlitebloblogs.rs
+++ b/yama/src/pile/local_sqlitebloblogs.rs
@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
-use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fs::{read_dir, File, OpenOptions};
@@ -36,7 +35,7 @@ use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
-use crate::utils::bytes_to_hexstring;
+use crate::utils::{bytes_to_hexstring, LruMap};
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
use std::time::Duration;
@@ -47,6 +46,9 @@ pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
/// This many pointers will be batched up for writing.
pub const POINTER_WRITE_BATCHES: usize = 2048;
+/// This many bloblogs will be kept open for reading, at maximum.
+pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
+
/// A file storing a log of blobs.
/// Format:
/// Repeated:
@@ -136,8 +138,8 @@ pub type BloblogId = u32;
#[derive(Debug)]
pub struct Inner {
next_bloblog_id: BloblogId,
- writer_bloblogs: Vec<BloblogId>,
- open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...?
+ writer_bloblogs: Vec<(BloblogId, Arc<Mutex<Bloblog>>)>,
+ reader_bloblogs: LruMap<BloblogId, Arc<Mutex<Bloblog>>>,
connection: Connection,
writers_in_progress: u16,
// We batch up pointer writes because sync() performance really hurts us if we do them one by
@@ -271,7 +273,7 @@ impl SqliteBloblogPile {
inner: Arc::new(Mutex::new(Inner {
next_bloblog_id: 0,
writer_bloblogs: Vec::new(),
- open_bloblogs: HashMap::new(),
+ reader_bloblogs: LruMap::new(BLOBLOG_MAX_READING_FILE_COUNT),
connection,
writers_in_progress: 0,
queued_pointer_writes: Default::default(),
@@ -284,51 +286,53 @@ impl SqliteBloblogPile {
fn open_bloblog(&self, bloblog_id: BloblogId) -> anyhow::Result<Arc<Mutex<Bloblog>>> {
let mut inner = self.inner.lock().unwrap();
- Ok(match inner.open_bloblogs.entry(bloblog_id) {
- Entry::Occupied(entry) => entry.get().clone(),
- Entry::Vacant(entry) => {
+
+ match inner.reader_bloblogs.get(&bloblog_id) {
+ Some(bloblog) => Ok(bloblog.clone()),
+ None => {
let bloblog = Arc::new(Mutex::new(Bloblog::open(
&self.path.join(&bloblog_id.to_string()),
)?));
- entry.insert(bloblog.clone());
- bloblog
+ inner.reader_bloblogs.insert(bloblog_id, bloblog.clone());
+ Ok(bloblog)
}
- })
+ }
}
fn get_writing_bloblog(&self) -> anyhow::Result<(BloblogId, Arc<Mutex<Bloblog>>)> {
let mut inner = self.inner.lock().unwrap();
- let writing_bloblog_id: BloblogId = match inner.writer_bloblogs.pop() {
- None => {
- loop {
- let pre_inc = inner.next_bloblog_id;
- inner.next_bloblog_id += 1;
- // only start writing here if it doesn't already exist!
- let bloblog_path = &self.path.join(&pre_inc.to_string());
- if !bloblog_path.exists() {
- break pre_inc;
- }
- }
+ inner.writers_in_progress += 1;
+
+ if let Some(writing_bloblog) = inner.writer_bloblogs.pop() {
+ // We already have an open bloblog to give back.
+ return Ok(writing_bloblog);
+ }
+
+ // No open bloblogs to reuse; create a new one.
+ // It's very important to create a fresh one here; we definitely don't want to use a file
+ // that someone else is using!
+ let writing_bloblog_id = loop {
+ let pre_inc = inner.next_bloblog_id;
+ inner.next_bloblog_id += 1;
+
+ // only start writing here if it doesn't already exist!
+ let bloblog_path = &self.path.join(&pre_inc.to_string());
+ if !bloblog_path.exists() {
+ break pre_inc;
}
- Some(id) => id,
};
- let result = Ok((
- writing_bloblog_id,
- match inner.open_bloblogs.entry(writing_bloblog_id) {
- Entry::Occupied(entry) => entry.get().clone(),
- Entry::Vacant(entry) => {
- let bloblog = Arc::new(Mutex::new(Bloblog::open(
- &self.path.join(&writing_bloblog_id.to_string()),
- )?));
- entry.insert(bloblog.clone());
- bloblog
- }
- },
- ));
- inner.writers_in_progress += 1;
- result
+ let bloblog = Arc::new(Mutex::new(Bloblog::open(
+ &self.path.join(&writing_bloblog_id.to_string()),
+ )?));
+
+ // MAYBE FUTURE // Insert a weak reference so we can easily get a reader for this if desired.
+ // inner.open_bloblogs.insert(writing_bloblog_id, Arc::downgrade(&bloblog));
+ // For now, I don't think we actually care about reading a bloblog that we've written
+ // (at least not usually?)
+
+ Ok((writing_bloblog_id, bloblog))
}
/// Should be called once the bloblog has been finished writing to for the moment.
@@ -341,7 +345,7 @@ impl SqliteBloblogPile {
let size = bloblog.lock().unwrap().filesize()?;
let mut inner = self.inner.lock().unwrap();
if size < MAX_BLOBLOG_REUSE_SIZE {
- inner.writer_bloblogs.push(id);
+ inner.writer_bloblogs.push((id, bloblog));
}
inner.writers_in_progress -= 1;
if inner.writers_in_progress == 0 {
diff --git a/yama/src/utils.rs b/yama/src/utils.rs
index 7801801..6519aa8 100644
--- a/yama/src/utils.rs
+++ b/yama/src/utils.rs
@@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
+use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
pub fn bytes_to_hexstring(chunkid: &[u8]) -> String {
@@ -42,3 +43,98 @@ pub fn get_number_of_workers(first_try_env_name: &str) -> u8 {
}
}
}
+
+#[derive(Clone, Debug)]
+pub struct LruMap<K, V> {
+ capacity: usize,
+ last_access: BTreeSet<(u64, K)>,
+ items: BTreeMap<K, (V, u64)>,
+ counter: u64,
+}
+
+impl<K: Ord + Clone, V> LruMap<K, V> {
+ pub fn new(capacity: usize) -> LruMap<K, V> {
+ LruMap {
+ capacity,
+ last_access: BTreeSet::new(),
+ items: BTreeMap::new(),
+ counter: 0,
+ }
+ }
+
+ /// Gets an item from the LRU map.
+ pub fn get(&mut self, key: &K) -> Option<&V> {
+ match self.items.get_mut(key) {
+ Some((value, last_used_instant)) => {
+ assert!(
+ self.last_access.remove(&(*last_used_instant, key.clone())),
+ "Corrupt LRU map: freshen not correct."
+ );
+ let new_instant = self.counter;
+ self.counter += 1;
+ self.last_access.insert((new_instant, key.clone()));
+ *last_used_instant = new_instant;
+ Some(value)
+ }
+ None => None,
+ }
+ }
+
+ pub fn insert(&mut self, key: K, value: V) -> Option<V> {
+ let new_instant = self.counter;
+ self.counter += 1;
+
+ let retval = match self.items.insert(key.clone(), (value, new_instant)) {
+ Some((old_entry, old_instant)) => {
+ assert!(
+ self.last_access.remove(&(old_instant, key.clone())),
+ "Corrupt LRU map: insert not correct."
+ );
+ Some(old_entry)
+ }
+ None => None,
+ };
+ self.last_access.insert((new_instant, key));
+
+ if retval.is_none() {
+ // We didn't replace any item, so we have grown by 1.
+ // Check if we need to evict.
+ if self.items.len() > self.capacity {
+ self.evict();
+ }
+ }
+
+ retval
+ }
+
+ pub fn evict(&mut self) -> Option<(K, V)> {
+ if let Some(first_entry) = self.last_access.iter().next().cloned() {
+ self.last_access.remove(&first_entry);
+ let (_, key) = first_entry;
+ let (value, _) = self
+ .items
+ .remove(&key)
+ .expect("Corrupt LRU map: last access and items out of sync");
+
+ Some((key, value))
+ } else {
+ None
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::utils::LruMap;
+
+ #[test]
+ fn test_lru_map() {
+ let mut lmap = LruMap::new(3);
+ lmap.insert(1, 1);
+ lmap.insert(2, 1);
+ lmap.insert(3, 1);
+ assert_eq!(lmap.get(&1), Some(&1));
+ lmap.insert(4, 1);
+ assert_eq!(lmap.get(&2), None);
+ }
+}