Put reader bloblogs in an LRU map to prevent hitting open FD limit

Olivier 'reivilibre' 2022-07-23 22:38:29 +01:00
parent 05c6d3e662
commit ee9ca73224
2 changed files with 139 additions and 39 deletions
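In outline, the change routes reader bloblogs through a bounded LRU cache so that only a limited number of files are ever open for reading at once. A minimal sketch of the pattern, using the LruMap added below; the open_cached helper, its signature, and the use of a bare File are illustrative only, not the pile's real API:

use crate::utils::LruMap;
use std::fs::File;
use std::sync::{Arc, Mutex};

// Illustrative helper: look a reader up in the bounded cache first, and only
// open the file if it is not already cached. When the cache exceeds its
// capacity, the least recently used handle is evicted; its file descriptor is
// released once no reader still holds a clone of the Arc.
fn open_cached(
    cache: &mut LruMap<u32, Arc<Mutex<File>>>,
    id: u32,
) -> std::io::Result<Arc<Mutex<File>>> {
    if let Some(handle) = cache.get(&id) {
        return Ok(handle.clone());
    }
    let handle = Arc::new(Mutex::new(File::open(id.to_string())?));
    cache.insert(id, handle.clone());
    Ok(handle)
}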

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 
-use std::collections::hash_map::Entry;
 use std::collections::{HashMap, VecDeque};
 use std::convert::{TryFrom, TryInto};
 use std::fs::{read_dir, File, OpenOptions};
@@ -36,7 +35,7 @@ use crate::pile::{
     ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
     StoragePipelineSettings,
 };
-use crate::utils::bytes_to_hexstring;
+use crate::utils::{bytes_to_hexstring, LruMap};
 use crossbeam_channel::{Receiver, Sender};
 use rusqlite::ffi::ErrorCode::ConstraintViolation;
 use std::time::Duration;
@@ -47,6 +46,9 @@ pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
 /// This many pointers will be batched up for writing.
 pub const POINTER_WRITE_BATCHES: usize = 2048;
 
+/// This many bloblogs will be kept open for reading, at maximum.
+pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
+
 /// A file storing a log of blobs.
 /// Format:
 /// Repeated:
@@ -136,8 +138,8 @@ pub type BloblogId = u32;
 #[derive(Debug)]
 pub struct Inner {
     next_bloblog_id: BloblogId,
-    writer_bloblogs: Vec<BloblogId>,
-    open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...?
+    writer_bloblogs: Vec<(BloblogId, Arc<Mutex<Bloblog>>)>,
+    reader_bloblogs: LruMap<BloblogId, Arc<Mutex<Bloblog>>>,
     connection: Connection,
     writers_in_progress: u16,
     // We batch up pointer writes because sync() performance really hurts us if we do them one by
@@ -271,7 +273,7 @@ impl SqliteBloblogPile {
             inner: Arc::new(Mutex::new(Inner {
                 next_bloblog_id: 0,
                 writer_bloblogs: Vec::new(),
-                open_bloblogs: HashMap::new(),
+                reader_bloblogs: LruMap::new(BLOBLOG_MAX_READING_FILE_COUNT),
                 connection,
                 writers_in_progress: 0,
                 queued_pointer_writes: Default::default(),
@@ -284,51 +286,53 @@ impl SqliteBloblogPile {
     fn open_bloblog(&self, bloblog_id: BloblogId) -> anyhow::Result<Arc<Mutex<Bloblog>>> {
         let mut inner = self.inner.lock().unwrap();
-        Ok(match inner.open_bloblogs.entry(bloblog_id) {
-            Entry::Occupied(entry) => entry.get().clone(),
-            Entry::Vacant(entry) => {
+        match inner.reader_bloblogs.get(&bloblog_id) {
+            Some(bloblog) => Ok(bloblog.clone()),
+            None => {
                 let bloblog = Arc::new(Mutex::new(Bloblog::open(
                     &self.path.join(&bloblog_id.to_string()),
                 )?));
-                entry.insert(bloblog.clone());
-                bloblog
+                inner.reader_bloblogs.insert(bloblog_id, bloblog.clone());
+                Ok(bloblog)
             }
-        })
+        }
     }
 
     fn get_writing_bloblog(&self) -> anyhow::Result<(BloblogId, Arc<Mutex<Bloblog>>)> {
         let mut inner = self.inner.lock().unwrap();
-        let writing_bloblog_id: BloblogId = match inner.writer_bloblogs.pop() {
-            None => {
-                loop {
-                    let pre_inc = inner.next_bloblog_id;
-                    inner.next_bloblog_id += 1;
-                    // only start writing here if it doesn't already exist!
-                    let bloblog_path = &self.path.join(&pre_inc.to_string());
-                    if !bloblog_path.exists() {
-                        break pre_inc;
-                    }
-                }
-            }
-            Some(id) => id,
-        };
+        inner.writers_in_progress += 1;
+
+        if let Some(writing_bloblog) = inner.writer_bloblogs.pop() {
+            // We already have an open bloblog to give back.
+            return Ok(writing_bloblog);
+        }
+
+        // No open bloblogs to reuse; create a new one.
+        // It's very important to create a fresh one here; we definitely don't want to use a file
+        // that someone else is using!
+        let writing_bloblog_id = loop {
+            let pre_inc = inner.next_bloblog_id;
+            inner.next_bloblog_id += 1;
+            // only start writing here if it doesn't already exist!
+            let bloblog_path = &self.path.join(&pre_inc.to_string());
+            if !bloblog_path.exists() {
+                break pre_inc;
+            }
+        };
 
-        let result = Ok((
-            writing_bloblog_id,
-            match inner.open_bloblogs.entry(writing_bloblog_id) {
-                Entry::Occupied(entry) => entry.get().clone(),
-                Entry::Vacant(entry) => {
-                    let bloblog = Arc::new(Mutex::new(Bloblog::open(
-                        &self.path.join(&writing_bloblog_id.to_string()),
-                    )?));
-                    entry.insert(bloblog.clone());
-                    bloblog
-                }
-            },
-        ));
-        inner.writers_in_progress += 1;
-        result
+        let bloblog = Arc::new(Mutex::new(Bloblog::open(
+            &self.path.join(&writing_bloblog_id.to_string()),
+        )?));
+
+        // MAYBE FUTURE
+        // Insert a weak reference so we can easily get a reader for this if desired.
+        // inner.open_bloblogs.insert(writing_bloblog_id, Arc::downgrade(&bloblog));
+        // For now, I don't think we actually care about reading a bloblog that we've written
+        // (at least not usually?)
+
+        Ok((writing_bloblog_id, bloblog))
     }
 
     /// Should be called once the bloblog has been finished writing to for the moment.
@@ -341,7 +345,7 @@ impl SqliteBloblogPile {
         let size = bloblog.lock().unwrap().filesize()?;
         let mut inner = self.inner.lock().unwrap();
         if size < MAX_BLOBLOG_REUSE_SIZE {
-            inner.writer_bloblogs.push(id);
+            inner.writer_bloblogs.push((id, bloblog));
         }
         inner.writers_in_progress -= 1;
         if inner.writers_in_progress == 0 {
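One property worth noting, although the diff does not spell it out: evicting an entry from the reader LRU only drops the cache's Arc, so a bloblog that a reader is still using stays open until that reader drops its own handle too. A standalone sketch of that behaviour, with a plain BTreeMap standing in for the cache and a String standing in for an open Bloblog:

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

fn main() {
    // Values in the cache are shared handles, as in the real pile.
    let mut cache: BTreeMap<u32, Arc<Mutex<String>>> = BTreeMap::new();
    let handle = Arc::new(Mutex::new(String::from("bloblog 0")));
    cache.insert(0, handle.clone());

    // "Evicting" the entry only drops the cache's reference...
    cache.remove(&0);

    // ...while the caller's clone keeps the value (and, in the real pile,
    // the underlying file descriptor) alive until it is dropped as well.
    assert_eq!(Arc::strong_count(&handle), 1);
    println!("{}", handle.lock().unwrap());
}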

View File

@@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 
+use std::collections::{BTreeMap, BTreeSet};
 use std::fmt::Write;
 
 pub fn bytes_to_hexstring(chunkid: &[u8]) -> String {
@@ -42,3 +43,98 @@ pub fn get_number_of_workers(first_try_env_name: &str) -> u8 {
         }
     }
 }
+
+#[derive(Clone, Debug)]
+pub struct LruMap<K, V> {
+    capacity: usize,
+    last_access: BTreeSet<(u64, K)>,
+    items: BTreeMap<K, (V, u64)>,
+    counter: u64,
+}
+
+impl<K: Ord + Clone, V> LruMap<K, V> {
+    pub fn new(capacity: usize) -> LruMap<K, V> {
+        LruMap {
+            capacity,
+            last_access: BTreeSet::new(),
+            items: BTreeMap::new(),
+            counter: 0,
+        }
+    }
+
+    /// Gets an item from the LRU map.
+    pub fn get(&mut self, key: &K) -> Option<&V> {
+        match self.items.get_mut(key) {
+            Some((value, last_used_instant)) => {
+                assert!(
+                    self.last_access.remove(&(*last_used_instant, key.clone())),
+                    "Corrupt LRU map: freshen not correct."
+                );
+                let new_instant = self.counter;
+                self.counter += 1;
+                self.last_access.insert((new_instant, key.clone()));
+                *last_used_instant = new_instant;
+                Some(value)
+            }
+            None => None,
+        }
+    }
+
+    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
+        let new_instant = self.counter;
+        self.counter += 1;
+
+        let retval = match self.items.insert(key.clone(), (value, new_instant)) {
+            Some((old_entry, old_instant)) => {
+                assert!(
+                    self.last_access.remove(&(old_instant, key.clone())),
+                    "Corrupt LRU map: insert not correct."
+                );
+                Some(old_entry)
+            }
+            None => None,
+        };
+        self.last_access.insert((new_instant, key));
+
+        if retval.is_none() {
+            // We didn't replace any item, so we have grown by 1.
+            // Check if we need to evict.
+            if self.items.len() > self.capacity {
+                self.evict();
+            }
+        }
+
+        retval
+    }
+
+    pub fn evict(&mut self) -> Option<(K, V)> {
+        if let Some(first_entry) = self.last_access.iter().next().cloned() {
+            self.last_access.remove(&first_entry);
+            let (_, key) = first_entry;
+            let (value, _) = self
+                .items
+                .remove(&key)
+                .expect("Corrupt LRU map: last access and items out of sync");
+            Some((key, value))
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::utils::LruMap;
+
+    #[test]
+    fn test_lru_map() {
+        let mut lmap = LruMap::new(3);
+        lmap.insert(1, 1);
+        lmap.insert(2, 1);
+        lmap.insert(3, 1);
+
+        assert_eq!(lmap.get(&1), Some(&1));
+
+        lmap.insert(4, 1);
+        assert_eq!(lmap.get(&2), None);
+    }
+}
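For reference, a small usage sketch of the access-ordering semantics, in the same spirit as the test above; it assumes the LruMap just added is in scope and is not part of the commit itself:

#[test]
fn lru_refresh_and_eviction() {
    use crate::utils::LruMap;

    // Capacity two: inserting a third distinct key evicts the least recently used.
    let mut map = LruMap::new(2);
    assert_eq!(map.insert("a", 1), None);
    assert_eq!(map.insert("b", 2), None);

    // Reading "a" freshens it, so "b" becomes the least recently used entry.
    assert_eq!(map.get(&"a"), Some(&1));

    // "c" pushes the map over capacity and silently evicts "b".
    map.insert("c", 3);
    assert_eq!(map.get(&"b"), None);
    assert_eq!(map.get(&"a"), Some(&1));
    assert_eq!(map.get(&"c"), Some(&3));
}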