From fdd82b2fef317ebcf1bf71a632376b5df9de91e1 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 24 Mar 2022 19:12:45 +0000 Subject: [PATCH] Lay down some more structs --- src/database.rs | 2 + src/database/raw.rs | 16 ++++++++ src/database/wrapped.rs | 7 ++++ src/environment.rs | 74 +++++++++++++++++++++++++----------- src/lib.rs | 5 ++- src/wrapper.rs | 83 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 src/database.rs create mode 100644 src/database/raw.rs create mode 100644 src/database/wrapped.rs create mode 100644 src/wrapper.rs diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..20db276 --- /dev/null +++ b/src/database.rs @@ -0,0 +1,2 @@ +pub(crate) mod raw; +pub(crate) mod wrapped; diff --git a/src/database/raw.rs b/src/database/raw.rs new file mode 100644 index 0000000..451ed9a --- /dev/null +++ b/src/database/raw.rs @@ -0,0 +1,16 @@ +use libmdbx::{Database, Environment, WriteMap}; +use ouroboros::self_referencing; +use std::marker::PhantomData; +use std::sync::Arc; + +#[self_referencing(pub_extras)] +pub struct RawTable { + pub mdbx_env: Arc>, + + #[borrows(mdbx_env)] + #[covariant] + pub mdbx_db: Database<'this>, + + pub(crate) phantom_k: PhantomData, + pub(crate) phantom_v: PhantomData, +} diff --git a/src/database/wrapped.rs b/src/database/wrapped.rs new file mode 100644 index 0000000..81d1dab --- /dev/null +++ b/src/database/wrapped.rs @@ -0,0 +1,7 @@ +use crate::database::raw::RawTable; + +pub struct WrappedTable { + pub raw: RawTable<[u8], [u8]>, + pub k_wrapper: K, + pub v_wrapper: V, +} diff --git a/src/environment.rs b/src/environment.rs index 8ee8fa6..8eee486 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -1,30 +1,60 @@ +use crate::database::raw::{RawTable, RawTableTryBuilder}; +use crate::database::wrapped::WrappedTable; +use crate::wrapper::{ByteWrapper, ZeroCopyByteWrapper}; +use anyhow::{ensure, Context}; +use libmdbx::{DatabaseFlags, Environment, WriteMap}; +use std::marker::PhantomData; use std::sync::Arc; -use libmdbx::{Database, Environment, EnvironmentKind, WriteMap}; pub struct Env { - pub mdbx_env: Environment, + pub mdbx_env: Arc>, } -#[self_referencing] -pub struct Table { - pub mdbx_env: Arc, - - #[borrows(mdbx_env)] - #[covariant] - pub mdbx_db: Database<'this> -} - -// TODO: -// - raw if wanted -// - serde if wanted -// - compression if wanted -// - ??? does it make sense to offer ALL these things in one interface; could the raw stuff -// be done slightly differently? - impl Env { - pub fn open_table(&self, name: &str, flags: ()) -> anyhow::Result { - todo!() + pub fn open_raw_table( + &self, + name: Option<&str>, + _flags: (), + ) -> anyhow::Result> { + Ok(RawTableTryBuilder { + mdbx_env: self.mdbx_env.clone(), + mdbx_db_builder: |mdbx_env: &Arc>| { + let txn = mdbx_env + .begin_rw_txn() + .context("Can't start RW transaction")?; + // TODO database flags + let db = txn + .create_db(name, DatabaseFlags::empty()) + .context("Can't open database")?; + txn.prime_for_permaopen(db); + + let (_bool, mut dbs) = txn.commit_and_rebind_open_dbs()?; + + ensure!(dbs.len() == 1); + + let mdbx_db = dbs.pop().context("Expected database!")?; + + Ok(mdbx_db) + }, + phantom_k: PhantomData::default(), + phantom_v: PhantomData::default(), + } + .try_build()?) } - pub fn open_ -} \ No newline at end of file + pub fn open_wrapped_table( + &self, + name: Option<&str>, + flags: (), + k_wrapper: K, + v_wrapper: V, + ) -> anyhow::Result> { + let raw_table = self.open_raw_table(name, flags)?; + + Ok(WrappedTable { + raw: raw_table, + k_wrapper, + v_wrapper, + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index e3be3a4..941c1af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ - -mod environment; +pub(crate) mod database; +pub(crate) mod environment; +pub(crate) mod wrapper; #[cfg(test)] mod tests { diff --git a/src/wrapper.rs b/src/wrapper.rs new file mode 100644 index 0000000..8630b2b --- /dev/null +++ b/src/wrapper.rs @@ -0,0 +1,83 @@ +use anyhow::anyhow; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::marker::PhantomData; +use std::sync::Mutex; +use zstd::bulk::{Compressor, Decompressor}; + +pub trait ByteWrapper { + type Item; + + fn load_from_db_bytes(&self, bytes: &[u8]) -> anyhow::Result; + fn dump_to_db_bytes(&self, item: &Self::Item) -> anyhow::Result>; +} + +pub struct SerdeBareWrapper { + phantom: PhantomData, +} + +impl ByteWrapper for SerdeBareWrapper { + type Item = T; + + fn load_from_db_bytes(&self, bytes: &[u8]) -> anyhow::Result { + Ok(serde_bare::from_slice(bytes)?) + } + + fn dump_to_db_bytes(&self, item: &Self::Item) -> anyhow::Result> { + Ok(serde_bare::to_vec(item)?) + } +} + +pub struct CompressorWrapper { + inner: T, + compressor: Mutex>, + decompressor: Mutex>, +} + +impl ByteWrapper for CompressorWrapper { + type Item = T::Item; + + fn load_from_db_bytes(&self, bytes: &[u8]) -> anyhow::Result { + let mut decompressor = self + .decompressor + .lock() + .map_err(|_| anyhow!("Can't lock decompressor"))?; + // TODO be more sensible about capacity. 8× should be good though. + let raw_bytes = decompressor.decompress(bytes, bytes.len() * 8)?; + Ok(self.inner.load_from_db_bytes(&raw_bytes)?) + } + + fn dump_to_db_bytes(&self, item: &Self::Item) -> anyhow::Result> { + let raw_bytes = self.inner.dump_to_db_bytes(item)?; + let mut compressor = self + .compressor + .lock() + .map_err(|_| anyhow!("Can't lock compressor"))?; + Ok(compressor.compress(&raw_bytes)?) + } +} + +pub trait ZeroCopyByteWrapper { + fn as_byte_slice(&self) -> &[u8]; + fn from_byte_slice(bytes: &[u8]) -> anyhow::Result<&Self>; +} + +impl ZeroCopyByteWrapper for [u8] { + fn as_byte_slice(&self) -> &[u8] { + self + } + + fn from_byte_slice(bytes: &[u8]) -> anyhow::Result<&Self> { + Ok(bytes) + } +} + +impl ZeroCopyByteWrapper for str { + fn as_byte_slice(&self) -> &[u8] { + self.as_bytes() + } + + fn from_byte_slice(bytes: &[u8]) -> anyhow::Result<&Self> { + Ok(std::str::from_utf8(bytes)?) + } +}