From ee9ca7322492752c47e4496cbb9d8dab2705f9c0 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 23 Jul 2022 22:38:29 +0100 Subject: [PATCH] Put reader bloblogs in an LRU map to prevent hitting open FD limit --- yama/src/pile/local_sqlitebloblogs.rs | 82 ++++++++++++----------- yama/src/utils.rs | 96 +++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 39 deletions(-) diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 8d1ac5f..934f793 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License along with Yama. If not, see . */ -use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::convert::{TryFrom, TryInto}; use std::fs::{read_dir, File, OpenOptions}; @@ -36,7 +35,7 @@ use crate::pile::{ ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings, }; -use crate::utils::bytes_to_hexstring; +use crate::utils::{bytes_to_hexstring, LruMap}; use crossbeam_channel::{Receiver, Sender}; use rusqlite::ffi::ErrorCode::ConstraintViolation; use std::time::Duration; @@ -47,6 +46,9 @@ pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024; /// This many pointers will be batched up for writing. pub const POINTER_WRITE_BATCHES: usize = 2048; +/// This many bloblogs will be kept open for reading, at maximum. +pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128; + /// A file storing a log of blobs. /// Format: /// Repeated: @@ -136,8 +138,8 @@ pub type BloblogId = u32; #[derive(Debug)] pub struct Inner { next_bloblog_id: BloblogId, - writer_bloblogs: Vec, - open_bloblogs: HashMap>>, // TODO want an LRU cache with a weak hashmap...? + writer_bloblogs: Vec<(BloblogId, Arc>)>, + reader_bloblogs: LruMap>>, connection: Connection, writers_in_progress: u16, // We batch up pointer writes because sync() performance really hurts us if we do them one by @@ -271,7 +273,7 @@ impl SqliteBloblogPile { inner: Arc::new(Mutex::new(Inner { next_bloblog_id: 0, writer_bloblogs: Vec::new(), - open_bloblogs: HashMap::new(), + reader_bloblogs: LruMap::new(BLOBLOG_MAX_READING_FILE_COUNT), connection, writers_in_progress: 0, queued_pointer_writes: Default::default(), @@ -284,51 +286,53 @@ impl SqliteBloblogPile { fn open_bloblog(&self, bloblog_id: BloblogId) -> anyhow::Result>> { let mut inner = self.inner.lock().unwrap(); - Ok(match inner.open_bloblogs.entry(bloblog_id) { - Entry::Occupied(entry) => entry.get().clone(), - Entry::Vacant(entry) => { + + match inner.reader_bloblogs.get(&bloblog_id) { + Some(bloblog) => Ok(bloblog.clone()), + None => { let bloblog = Arc::new(Mutex::new(Bloblog::open( &self.path.join(&bloblog_id.to_string()), )?)); - entry.insert(bloblog.clone()); - bloblog + inner.reader_bloblogs.insert(bloblog_id, bloblog.clone()); + Ok(bloblog) } - }) + } } fn get_writing_bloblog(&self) -> anyhow::Result<(BloblogId, Arc>)> { let mut inner = self.inner.lock().unwrap(); - let writing_bloblog_id: BloblogId = match inner.writer_bloblogs.pop() { - None => { - loop { - let pre_inc = inner.next_bloblog_id; - inner.next_bloblog_id += 1; - // only start writing here if it doesn't already exist! - let bloblog_path = &self.path.join(&pre_inc.to_string()); - if !bloblog_path.exists() { - break pre_inc; - } - } + inner.writers_in_progress += 1; + + if let Some(writing_bloblog) = inner.writer_bloblogs.pop() { + // We already have an open bloblog to give back. + return Ok(writing_bloblog); + } + + // No open bloblogs to reuse; create a new one. + // It's very important to create a fresh one here; we definitely don't want to use a file + // that someone else is using! + let writing_bloblog_id = loop { + let pre_inc = inner.next_bloblog_id; + inner.next_bloblog_id += 1; + + // only start writing here if it doesn't already exist! + let bloblog_path = &self.path.join(&pre_inc.to_string()); + if !bloblog_path.exists() { + break pre_inc; } - Some(id) => id, }; - let result = Ok(( - writing_bloblog_id, - match inner.open_bloblogs.entry(writing_bloblog_id) { - Entry::Occupied(entry) => entry.get().clone(), - Entry::Vacant(entry) => { - let bloblog = Arc::new(Mutex::new(Bloblog::open( - &self.path.join(&writing_bloblog_id.to_string()), - )?)); - entry.insert(bloblog.clone()); - bloblog - } - }, - )); - inner.writers_in_progress += 1; - result + let bloblog = Arc::new(Mutex::new(Bloblog::open( + &self.path.join(&writing_bloblog_id.to_string()), + )?)); + + // MAYBE FUTURE // Insert a weak reference so we can easily get a reader for this if desired. + // inner.open_bloblogs.insert(writing_bloblog_id, Arc::downgrade(&bloblog)); + // For now, I don't think we actually care about reading a bloblog that we've written + // (at least not usually?) + + Ok((writing_bloblog_id, bloblog)) } /// Should be called once the bloblog has been finished writing to for the moment. @@ -341,7 +345,7 @@ impl SqliteBloblogPile { let size = bloblog.lock().unwrap().filesize()?; let mut inner = self.inner.lock().unwrap(); if size < MAX_BLOBLOG_REUSE_SIZE { - inner.writer_bloblogs.push(id); + inner.writer_bloblogs.push((id, bloblog)); } inner.writers_in_progress -= 1; if inner.writers_in_progress == 0 { diff --git a/yama/src/utils.rs b/yama/src/utils.rs index 7801801..6519aa8 100644 --- a/yama/src/utils.rs +++ b/yama/src/utils.rs @@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License along with Yama. If not, see . */ +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Write; pub fn bytes_to_hexstring(chunkid: &[u8]) -> String { @@ -42,3 +43,98 @@ pub fn get_number_of_workers(first_try_env_name: &str) -> u8 { } } } + +#[derive(Clone, Debug)] +pub struct LruMap { + capacity: usize, + last_access: BTreeSet<(u64, K)>, + items: BTreeMap, + counter: u64, +} + +impl LruMap { + pub fn new(capacity: usize) -> LruMap { + LruMap { + capacity, + last_access: BTreeSet::new(), + items: BTreeMap::new(), + counter: 0, + } + } + + /// Gets an item from the LRU map. + pub fn get(&mut self, key: &K) -> Option<&V> { + match self.items.get_mut(key) { + Some((value, last_used_instant)) => { + assert!( + self.last_access.remove(&(*last_used_instant, key.clone())), + "Corrupt LRU map: freshen not correct." + ); + let new_instant = self.counter; + self.counter += 1; + self.last_access.insert((new_instant, key.clone())); + *last_used_instant = new_instant; + Some(value) + } + None => None, + } + } + + pub fn insert(&mut self, key: K, value: V) -> Option { + let new_instant = self.counter; + self.counter += 1; + + let retval = match self.items.insert(key.clone(), (value, new_instant)) { + Some((old_entry, old_instant)) => { + assert!( + self.last_access.remove(&(old_instant, key.clone())), + "Corrupt LRU map: insert not correct." + ); + Some(old_entry) + } + None => None, + }; + self.last_access.insert((new_instant, key)); + + if retval.is_none() { + // We didn't replace any item, so we have grown by 1. + // Check if we need to evict. + if self.items.len() > self.capacity { + self.evict(); + } + } + + retval + } + + pub fn evict(&mut self) -> Option<(K, V)> { + if let Some(first_entry) = self.last_access.iter().next().cloned() { + self.last_access.remove(&first_entry); + let (_, key) = first_entry; + let (value, _) = self + .items + .remove(&key) + .expect("Corrupt LRU map: last access and items out of sync"); + + Some((key, value)) + } else { + None + } + } +} + +#[cfg(test)] +mod test { + use crate::utils::LruMap; + + #[test] + fn test_lru_map() { + let mut lmap = LruMap::new(3); + lmap.insert(1, 1); + lmap.insert(2, 1); + lmap.insert(3, 1); + assert_eq!(lmap.get(&1), Some(&1)); + lmap.insert(4, 1); + assert_eq!(lmap.get(&2), None); + } +}