Add a basic access guard
This commit is contained in:
parent
081a1922c7
commit
db0d9dd493
@ -26,6 +26,7 @@ use std::collections::HashSet;
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::{Arc, Condvar, Mutex};
|
use std::sync::{Arc, Condvar, Mutex};
|
||||||
|
|
||||||
|
pub mod access_guard;
|
||||||
pub mod compression;
|
pub mod compression;
|
||||||
pub mod encryption;
|
pub mod encryption;
|
||||||
pub mod integrity;
|
pub mod integrity;
|
||||||
|
|||||||
126
yama/src/pile/access_guard.rs
Normal file
126
yama/src/pile/access_guard.rs
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
use crate::chunking::calculate_chunkid;
|
||||||
|
use crate::definitions::ChunkId;
|
||||||
|
use crate::pile::{
|
||||||
|
ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings,
|
||||||
|
};
|
||||||
|
use anyhow::{anyhow, bail};
|
||||||
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
|
use derivative::Derivative;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
/// PileGuard is a wrapper around a pile that prevents data exfiltration and malicious corruption.
|
||||||
|
/// It's basically a firewall for a Pile?
|
||||||
|
/// Preventing malicious corruption requires the chunks to be unprocessed. This way, their ID can be
|
||||||
|
/// checked by this module.
|
||||||
|
#[derive(Debug, Derivative)]
|
||||||
|
#[derivative(Clone(bound = ""))]
|
||||||
|
// we need to use derivative's Clone impl because Arc<R> causes R to have a bound on Clone
|
||||||
|
// even though that's not needed. https://github.com/rust-lang/rust/issues/26925
|
||||||
|
pub struct PileGuard<R: RawPile> {
|
||||||
|
underlying: Arc<R>,
|
||||||
|
/// Whether to verify chunk IDs to prevent malicious corruption
|
||||||
|
verify_chunk_ids: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pipeline(
|
||||||
|
subsequent_pipeline: Sender<(ChunkId, Vec<u8>)>,
|
||||||
|
input: Receiver<(ChunkId, Vec<u8>)>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
while let Ok((claimed_chunk_id, chunk)) = input.recv() {
|
||||||
|
let actual_chunk_id = calculate_chunkid(&chunk);
|
||||||
|
if actual_chunk_id != claimed_chunk_id {
|
||||||
|
bail!("CHUNK ID MISMATCH — is this forgery? (malicious storage process?) claimed{:?} actually{:?}", claimed_chunk_id, actual_chunk_id);
|
||||||
|
}
|
||||||
|
subsequent_pipeline
|
||||||
|
.send((claimed_chunk_id, chunk))
|
||||||
|
.map_err(|_| anyhow!("Subsequent step closed"))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: RawPile> PileGuard<R> {}
|
||||||
|
|
||||||
|
impl<R: RawPile> RawPile for PileGuard<R> {
|
||||||
|
fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool> {
|
||||||
|
match kind {
|
||||||
|
Keyspace::Chunk => self.underlying.exists(kind, key),
|
||||||
|
Keyspace::ChunkHash => {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
Keyspace::Pointer => {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read(&self, _kind: Keyspace, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write(&self, kind: Keyspace, _key: &[u8], _value: &[u8]) -> anyhow::Result<()> {
|
||||||
|
match kind {
|
||||||
|
Keyspace::Chunk => {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
Keyspace::ChunkHash => {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
Keyspace::Pointer => {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn delete(&self, _kind: Keyspace, _key: &[u8]) -> anyhow::Result<()> {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_keys(
|
||||||
|
&self,
|
||||||
|
_kind: Keyspace,
|
||||||
|
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<Vec<u8>>>>> {
|
||||||
|
bail!("Access denied");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&self) -> anyhow::Result<()> {
|
||||||
|
self.underlying.flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_lowlevel(&self) -> anyhow::Result<bool> {
|
||||||
|
self.underlying.check_lowlevel()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_storage_pipeline(
|
||||||
|
&self,
|
||||||
|
settings: StoragePipelineSettings,
|
||||||
|
controller_send: Sender<ControllerMessage>,
|
||||||
|
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
|
||||||
|
let subsequent_pipeline = self
|
||||||
|
.underlying
|
||||||
|
.build_storage_pipeline(settings.clone(), controller_send.clone())?;
|
||||||
|
|
||||||
|
let (input_to_this_stage, receiver) = crossbeam_channel::bounded(8);
|
||||||
|
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("yama Aguard".to_owned())
|
||||||
|
.spawn(move || {
|
||||||
|
if let Err(err) = pipeline(subsequent_pipeline, receiver) {
|
||||||
|
controller_send
|
||||||
|
.send(ControllerMessage::Failure {
|
||||||
|
worker_id: Arc::new(String::from("accessguard")),
|
||||||
|
error_message: format!("err {:?}", err),
|
||||||
|
})
|
||||||
|
.expect("This is BAD: failed to send failure message to controller.");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(input_to_this_stage)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||||
|
// TODO(question) Should we be described in the pipeline?
|
||||||
|
self.underlying.describe_pipeline()
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user