From 760626d01ed2535f8c0d1d1df979d80f5ccf0a99 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 28 May 2022 22:44:36 +0100 Subject: [PATCH] Add operation to describe the pipeline --- yama/src/operations/checking.rs | 8 +++- yama/src/pile.rs | 19 +++++++- yama/src/pile/compression.rs | 19 +++++++- yama/src/pile/encryption.rs | 10 ++++- yama/src/pile/integrity.rs | 11 ++++- yama/src/pile/local_sqlitebloblogs.rs | 9 +++- yama/src/remote.rs | 6 ++- yama/src/remote/requester.rs | 62 +++++++++++---------------- yama/src/remote/responder.rs | 14 ++++++ 9 files changed, 112 insertions(+), 46 deletions(-) diff --git a/yama/src/operations/checking.rs b/yama/src/operations/checking.rs index 4532792..d29bb28 100644 --- a/yama/src/operations/checking.rs +++ b/yama/src/operations/checking.rs @@ -18,7 +18,9 @@ along with Yama. If not, see . use crate::chunking::RecursiveUnchunker; use crate::commands::retrieve_tree_node; use crate::definitions::{ChunkId, TreeNode}; -use crate::pile::{ControllerMessage, Keyspace, Pile, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, Keyspace, Pile, PipelineDescription, RawPile, StoragePipelineSettings, +}; use anyhow::bail; use crossbeam_channel::Sender; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; @@ -131,6 +133,10 @@ impl RawPile for VacuumRawPile { self.underlying .build_storage_pipeline(settings, controller_send) } + + fn describe_pipeline(&self) -> anyhow::Result> { + self.underlying.describe_pipeline() + } } /// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time. diff --git a/yama/src/pile.rs b/yama/src/pile.rs index 4c04f46..8ae7639 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -125,7 +125,14 @@ pub enum ControllerMessage { }, } -// TODO(newver) Make piles async +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum PipelineDescription { + Store, + Remote, + Integrity, + Compression { dictionary_fingerprint: u64 }, + Encryption, +} pub trait RawPile: Send + Sync + Debug + 'static { // TODO expose verification errors? @@ -157,6 +164,8 @@ pub trait RawPile: Send + Sync + Debug + 'static { settings: StoragePipelineSettings, controller_send: Sender, ) -> anyhow::Result)>>; + + fn describe_pipeline(&self) -> anyhow::Result>; } impl RawPile for Box { @@ -196,6 +205,10 @@ impl RawPile for Box { self.as_ref() .build_storage_pipeline(settings, controller_send) } + + fn describe_pipeline(&self) -> anyhow::Result> { + self.as_ref().describe_pipeline() + } } impl RawPile for Arc { @@ -235,6 +248,10 @@ impl RawPile for Arc { self.as_ref() .build_storage_pipeline(settings, controller_send) } + + fn describe_pipeline(&self) -> anyhow::Result> { + self.as_ref().describe_pipeline() + } } #[derive(Debug)] diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index ef6355f..3962c1b 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License along with Yama. If not, see . */ +use std::convert::TryInto; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; @@ -27,7 +28,10 @@ use metrics::{register_counter, Unit}; use zstd::bulk::{Compressor, Decompressor}; use crate::definitions::ChunkId; -use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile, + StoragePipelineSettings, +}; pub const DECOMPRESS_CAPACITY: usize = 32 * 1024 * 1024; @@ -331,4 +335,17 @@ impl RawPile for RawPileCompressor { Ok(input_to_this_stage) } + + fn describe_pipeline(&self) -> anyhow::Result> { + let mut underlying = self.underlying.describe_pipeline()?; + + let mut dict_fingerprint_u256 = [0; 32]; + blake::hash(256, &self.settings.dictionary, &mut dict_fingerprint_u256)?; + let dictionary_fingerprint: u64 = + u64::from_be_bytes(dict_fingerprint_u256[0..8].try_into().unwrap()); + underlying.push(PipelineDescription::Compression { + dictionary_fingerprint, + }); + Ok(underlying) + } } diff --git a/yama/src/pile/encryption.rs b/yama/src/pile/encryption.rs index b870272..167833a 100644 --- a/yama/src/pile/encryption.rs +++ b/yama/src/pile/encryption.rs @@ -21,7 +21,9 @@ use sodiumoxide::crypto::secretbox; use sodiumoxide::crypto::secretbox::{Key, Nonce, NONCEBYTES}; use crate::definitions::ChunkId; -use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings, +}; use crossbeam_channel::Sender; /// A RawPile that provides encryption of chunk contents. @@ -119,4 +121,10 @@ impl RawPile for RawPileEncryptor { ) -> anyhow::Result)>> { todo!() } + + fn describe_pipeline(&self) -> anyhow::Result> { + let mut underlying = self.underlying.describe_pipeline()?; + underlying.push(PipelineDescription::Encryption); + Ok(underlying) + } } diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index 34ac52f..e4b88b5 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -20,7 +20,10 @@ use std::hash::Hasher; use thiserror::Error; use crate::definitions::{ChunkId, XXH64_SEED}; -use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile, + StoragePipelineSettings, +}; use crate::utils::bytes_to_hexstring; use crossbeam_channel::Sender; @@ -140,4 +143,10 @@ impl RawPile for RawPileIntegrityChecker { .unwrap(); Ok(input) } + + fn describe_pipeline(&self) -> anyhow::Result> { + let mut underlying = self.underlying.describe_pipeline()?; + underlying.push(PipelineDescription::Integrity); + Ok(underlying) + } } diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index f287688..2069265 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -32,7 +32,10 @@ use rusqlite::{params, Error, ErrorCode}; use rusqlite::{Connection, OptionalExtension}; use crate::definitions::ChunkId; -use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile, + StoragePipelineSettings, +}; use crate::utils::bytes_to_hexstring; use crossbeam_channel::{Receiver, Sender}; use rusqlite::ffi::ErrorCode::ConstraintViolation; @@ -723,6 +726,10 @@ impl RawPile for SqliteBloblogPile { Ok(sender) } + + fn describe_pipeline(&self) -> anyhow::Result> { + Ok(vec![PipelineDescription::Store]) + } } struct KeyIterator { diff --git a/yama/src/remote.rs b/yama/src/remote.rs index ca207f1..40a84ed 100644 --- a/yama/src/remote.rs +++ b/yama/src/remote.rs @@ -22,7 +22,7 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use crate::pile::Keyspace; +use crate::pile::{Keyspace, PipelineDescription}; pub mod requester; pub mod responder; @@ -60,6 +60,7 @@ pub enum RequestBody { }, Flush, LowLevelCheck, + Describe, Shutdown, Progress { current: u64, @@ -73,7 +74,7 @@ pub struct Response { body: ResponseBody, } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum ResponseBody { Success, Failed(String), @@ -83,6 +84,7 @@ pub enum ResponseBody { batch: Vec>, next_token: u16, }, + Description(Vec), } pub fn read_message(read: &mut R) -> anyhow::Result { diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index b4eb04d..14d3e63 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -9,7 +9,9 @@ use crossbeam_channel::{Receiver, Sender}; use log::{error, info}; use crate::definitions::ChunkId; -use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings}; +use crate::pile::{ + ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings, +}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; use metrics::{ gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram, Unit, @@ -269,8 +271,7 @@ impl RawPile for Requester { ResponseBody::Success => Ok(true), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), ResponseBody::NotExists => Ok(false), - ResponseBody::Data(_) => Err(anyhow!("Received Data for exists.")), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for exists.")), + other => Err(anyhow!("Received {:?} for Exists", other)), } } fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result>> { @@ -282,7 +283,7 @@ impl RawPile for Requester { ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), ResponseBody::NotExists => Ok(None), ResponseBody::Data(data) => Ok(Some(data)), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for read.")), + other => Err(anyhow!("Received {:?} for Read", other)), } } fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> { @@ -293,9 +294,7 @@ impl RawPile for Requester { })? { ResponseBody::Success => Ok(()), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), - ResponseBody::NotExists => Err(anyhow!("Received NotExists for write.")), - ResponseBody::Data(_) => Err(anyhow!("Received Data for write.")), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for write.")), + other => Err(anyhow!("Received {:?} for Write", other)), } } fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> { @@ -305,9 +304,7 @@ impl RawPile for Requester { })? { ResponseBody::Success => Ok(()), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), - ResponseBody::NotExists => Err(anyhow!("Received NotExists for delete.")), - ResponseBody::Data(_) => Err(anyhow!("Received Data for delete.")), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for delete.")), + other => Err(anyhow!("Received {:?} for Delete", other)), } } fn list_keys( @@ -321,31 +318,26 @@ impl RawPile for Requester { buffer: Vec::with_capacity(0), })), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), - ResponseBody::NotExists => Err(anyhow!("Received NotExists for list_keys.")), - ResponseBody::Data(_) => Err(anyhow!("Received Data for list_keys.")), ResponseBody::BatchData { batch, next_token } => Ok(Box::new(ListKeyIterator { command_sender: self.commands.clone(), batch_token: Some(next_token), buffer: batch, })), + other => Err(anyhow!("Received {:?} for List", other)), } } fn flush(&self) -> anyhow::Result<()> { match self.request(RequestBody::Flush)? { ResponseBody::Success => Ok(()), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), - ResponseBody::NotExists => Err(anyhow!("Received NotExists for Flush.")), - ResponseBody::Data(_) => Err(anyhow!("Received Data for Flush.")), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for Flush.")), + other => Err(anyhow!("Received {:?} for Flush", other)), } } fn check_lowlevel(&self) -> anyhow::Result { match self.request(RequestBody::LowLevelCheck)? { ResponseBody::Success => Ok(true), ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), - ResponseBody::NotExists => Err(anyhow!("Received NotExists for LowLevelCheck.")), - ResponseBody::Data(_) => Err(anyhow!("Received Data for LowLevelCheck.")), - ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for LowLevelCheck.")), + other => Err(anyhow!("Received {:?} for LowLevelCheck", other)), } } @@ -396,15 +388,7 @@ impl RawPile for Requester { ResponseBody::Failed(string) => { panic!("Requester pipeline fail {}", string); } - ResponseBody::BatchData { .. } => { - panic!("wtf BatchData"); - } - ResponseBody::NotExists => { - panic!("wtf NotExists"); - } - ResponseBody::Data(_) => { - panic!("wtf Data"); - } + other => panic!("wtf {:?}", other), } } recv(receiver) -> resp => { @@ -434,15 +418,7 @@ impl RawPile for Requester { ResponseBody::Failed(string) => { panic!("Requester pipeline fail {}", string); } - ResponseBody::BatchData { .. } => { - panic!("wtf BatchData"); - } - ResponseBody::NotExists => { - panic!("wtf NotExists"); - } - ResponseBody::Data(_) => { - panic!("wtf Data"); - } + other => panic!("wtf {:?}", other), } } } @@ -451,6 +427,17 @@ impl RawPile for Requester { Ok(input) } + + fn describe_pipeline(&self) -> anyhow::Result> { + match self.request(RequestBody::Describe)? { + ResponseBody::Description(mut description) => { + description.push(PipelineDescription::Remote); + Ok(description) + } + ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)), + other => Err(anyhow!("Received {:?} for Describe", other)), + } + } } pub struct ListKeyIterator { @@ -478,8 +465,6 @@ impl Iterator for ListKeyIterator { None } ResponseBody::Failed(err_msg) => Some(Err(anyhow!("Remote failure: {}", err_msg))), - ResponseBody::NotExists => Some(Err(anyhow!("Received NotExists for NextBatch."))), - ResponseBody::Data(_) => Some(Err(anyhow!("Received Data for NextBatch."))), ResponseBody::BatchData { batch, next_token } => { self.batch_token = Some(next_token); self.buffer = batch; @@ -491,6 +476,7 @@ impl Iterator for ListKeyIterator { None } } + other => Some(Err(anyhow!("Received {:?} for NextBatch", other))), } } else { None diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index 54bfe3d..6c17593 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -349,6 +349,20 @@ impl Responder { RequestBody::Progress { .. } => { unreachable!("handled by readea") } + RequestBody::Describe => match pile.describe_pipeline() { + Ok(description) => Response { + response_to: request.id, + body: ResponseBody::Description(description), + }, + Err(err) => { + warn!("Error whilst doing a raw describe_pipeline: {:?}", err); + let err = format!("{:?}", err); + Response { + response_to: request.id, + body: ResponseBody::Failed(err), + } + } + }, }; responses