diff --git a/yama/src/pile.rs b/yama/src/pile.rs index 8ae7639..11c7074 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -26,6 +26,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Condvar, Mutex}; +pub mod access_guard; pub mod compression; pub mod encryption; pub mod integrity; diff --git a/yama/src/pile/access_guard.rs b/yama/src/pile/access_guard.rs new file mode 100644 index 0000000..723a126 --- /dev/null +++ b/yama/src/pile/access_guard.rs @@ -0,0 +1,126 @@ +use crate::chunking::calculate_chunkid; +use crate::definitions::ChunkId; +use crate::pile::{ + ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings, +}; +use anyhow::{anyhow, bail}; +use crossbeam_channel::{Receiver, Sender}; +use derivative::Derivative; +use std::sync::Arc; +use std::thread; + +/// PileGuard is a wrapper around a pile that prevents data exfiltration and malicious corruption. +/// It's basically a firewall for a Pile? +/// Preventing malicious corruption requires the chunks to be unprocessed. This way, their ID can be +/// checked by this module. +#[derive(Debug, Derivative)] +#[derivative(Clone(bound = ""))] +// we need to use derivative's Clone impl because Arc causes R to have a bound on Clone +// even though that's not needed. https://github.com/rust-lang/rust/issues/26925 +pub struct PileGuard { + underlying: Arc, + /// Whether to verify chunk IDs to prevent malicious corruption + verify_chunk_ids: bool, +} + +fn pipeline( + subsequent_pipeline: Sender<(ChunkId, Vec)>, + input: Receiver<(ChunkId, Vec)>, +) -> anyhow::Result<()> { + while let Ok((claimed_chunk_id, chunk)) = input.recv() { + let actual_chunk_id = calculate_chunkid(&chunk); + if actual_chunk_id != claimed_chunk_id { + bail!("CHUNK ID MISMATCH — is this forgery? (malicious storage process?) claimed{:?} actually{:?}", claimed_chunk_id, actual_chunk_id); + } + subsequent_pipeline + .send((claimed_chunk_id, chunk)) + .map_err(|_| anyhow!("Subsequent step closed"))?; + } + Ok(()) +} + +impl PileGuard {} + +impl RawPile for PileGuard { + fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result { + match kind { + Keyspace::Chunk => self.underlying.exists(kind, key), + Keyspace::ChunkHash => { + bail!("Access denied"); + } + Keyspace::Pointer => { + bail!("Access denied"); + } + } + } + + fn read(&self, _kind: Keyspace, _key: &[u8]) -> anyhow::Result>> { + bail!("Access denied"); + } + + fn write(&self, kind: Keyspace, _key: &[u8], _value: &[u8]) -> anyhow::Result<()> { + match kind { + Keyspace::Chunk => { + todo!() + } + Keyspace::ChunkHash => { + bail!("Access denied"); + } + Keyspace::Pointer => { + bail!("Access denied"); + } + } + } + + fn delete(&self, _kind: Keyspace, _key: &[u8]) -> anyhow::Result<()> { + bail!("Access denied"); + } + + fn list_keys( + &self, + _kind: Keyspace, + ) -> anyhow::Result>>>> { + bail!("Access denied"); + } + + fn flush(&self) -> anyhow::Result<()> { + self.underlying.flush() + } + + fn check_lowlevel(&self) -> anyhow::Result { + self.underlying.check_lowlevel() + } + + fn build_storage_pipeline( + &self, + settings: StoragePipelineSettings, + controller_send: Sender, + ) -> anyhow::Result)>> { + let subsequent_pipeline = self + .underlying + .build_storage_pipeline(settings.clone(), controller_send.clone())?; + + let (input_to_this_stage, receiver) = crossbeam_channel::bounded(8); + + thread::Builder::new() + .name("yama Aguard".to_owned()) + .spawn(move || { + if let Err(err) = pipeline(subsequent_pipeline, receiver) { + controller_send + .send(ControllerMessage::Failure { + worker_id: Arc::new(String::from("accessguard")), + error_message: format!("err {:?}", err), + }) + .expect("This is BAD: failed to send failure message to controller."); + } + }) + .unwrap(); + + Ok(input_to_this_stage) + } + + fn describe_pipeline(&self) -> anyhow::Result> { + // TODO(question) Should we be described in the pipeline? + self.underlying.describe_pipeline() + } +}