Add operation to describe the pipeline
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/release Pipeline was successful Details

This commit is contained in:
Olivier 'reivilibre' 2022-05-28 22:44:36 +01:00
parent f4debbc9fe
commit 760626d01e
9 changed files with 112 additions and 46 deletions

View File

@ -18,7 +18,9 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use crate::chunking::RecursiveUnchunker;
use crate::commands::retrieve_tree_node;
use crate::definitions::{ChunkId, TreeNode};
use crate::pile::{ControllerMessage, Keyspace, Pile, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, Keyspace, Pile, PipelineDescription, RawPile, StoragePipelineSettings,
};
use anyhow::bail;
use crossbeam_channel::Sender;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
@ -131,6 +133,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
self.underlying
.build_storage_pipeline(settings, controller_send)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
self.underlying.describe_pipeline()
}
}
/// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time.

View File

@ -125,7 +125,14 @@ pub enum ControllerMessage {
},
}
// TODO(newver) Make piles async
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PipelineDescription {
Store,
Remote,
Integrity,
Compression { dictionary_fingerprint: u64 },
Encryption,
}
pub trait RawPile: Send + Sync + Debug + 'static {
// TODO expose verification errors?
@ -157,6 +164,8 @@ pub trait RawPile: Send + Sync + Debug + 'static {
settings: StoragePipelineSettings,
controller_send: Sender<ControllerMessage>,
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>>;
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>>;
}
impl RawPile for Box<dyn RawPile> {
@ -196,6 +205,10 @@ impl RawPile for Box<dyn RawPile> {
self.as_ref()
.build_storage_pipeline(settings, controller_send)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
self.as_ref().describe_pipeline()
}
}
impl<RP: RawPile> RawPile for Arc<RP> {
@ -235,6 +248,10 @@ impl<RP: RawPile> RawPile for Arc<RP> {
self.as_ref()
.build_storage_pipeline(settings, controller_send)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
self.as_ref().describe_pipeline()
}
}
#[derive(Debug)]

View File

@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use std::convert::TryInto;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
@ -27,7 +28,10 @@ use metrics::{register_counter, Unit};
use zstd::bulk::{Compressor, Decompressor};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
pub const DECOMPRESS_CAPACITY: usize = 32 * 1024 * 1024;
@ -331,4 +335,17 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
Ok(input_to_this_stage)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
let mut underlying = self.underlying.describe_pipeline()?;
let mut dict_fingerprint_u256 = [0; 32];
blake::hash(256, &self.settings.dictionary, &mut dict_fingerprint_u256)?;
let dictionary_fingerprint: u64 =
u64::from_be_bytes(dict_fingerprint_u256[0..8].try_into().unwrap());
underlying.push(PipelineDescription::Compression {
dictionary_fingerprint,
});
Ok(underlying)
}
}

View File

@ -21,7 +21,9 @@ use sodiumoxide::crypto::secretbox;
use sodiumoxide::crypto::secretbox::{Key, Nonce, NONCEBYTES};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings,
};
use crossbeam_channel::Sender;
/// A RawPile that provides encryption of chunk contents.
@ -119,4 +121,10 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>> {
todo!()
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
let mut underlying = self.underlying.describe_pipeline()?;
underlying.push(PipelineDescription::Encryption);
Ok(underlying)
}
}

View File

@ -20,7 +20,10 @@ use std::hash::Hasher;
use thiserror::Error;
use crate::definitions::{ChunkId, XXH64_SEED};
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
use crate::utils::bytes_to_hexstring;
use crossbeam_channel::Sender;
@ -140,4 +143,10 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
.unwrap();
Ok(input)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
let mut underlying = self.underlying.describe_pipeline()?;
underlying.push(PipelineDescription::Integrity);
Ok(underlying)
}
}

View File

@ -32,7 +32,10 @@ use rusqlite::{params, Error, ErrorCode};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, DebugStatistics, Keyspace, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
use crate::utils::bytes_to_hexstring;
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
@ -723,6 +726,10 @@ impl RawPile for SqliteBloblogPile {
Ok(sender)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
Ok(vec![PipelineDescription::Store])
}
}
struct KeyIterator {

View File

@ -22,7 +22,7 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::pile::Keyspace;
use crate::pile::{Keyspace, PipelineDescription};
pub mod requester;
pub mod responder;
@ -60,6 +60,7 @@ pub enum RequestBody {
},
Flush,
LowLevelCheck,
Describe,
Shutdown,
Progress {
current: u64,
@ -73,7 +74,7 @@ pub struct Response {
body: ResponseBody,
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ResponseBody {
Success,
Failed(String),
@ -83,6 +84,7 @@ pub enum ResponseBody {
batch: Vec<Vec<u8>>,
next_token: u16,
},
Description(Vec<PipelineDescription>),
}
pub fn read_message<R: Read, D: DeserializeOwned>(read: &mut R) -> anyhow::Result<D> {

View File

@ -9,7 +9,9 @@ use crossbeam_channel::{Receiver, Sender};
use log::{error, info};
use crate::definitions::ChunkId;
use crate::pile::{ControllerMessage, Keyspace, RawPile, StoragePipelineSettings};
use crate::pile::{
ControllerMessage, Keyspace, PipelineDescription, RawPile, StoragePipelineSettings,
};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
use metrics::{
gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram, Unit,
@ -269,8 +271,7 @@ impl RawPile for Requester {
ResponseBody::Success => Ok(true),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Ok(false),
ResponseBody::Data(_) => Err(anyhow!("Received Data for exists.")),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for exists.")),
other => Err(anyhow!("Received {:?} for Exists", other)),
}
}
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
@ -282,7 +283,7 @@ impl RawPile for Requester {
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Ok(None),
ResponseBody::Data(data) => Ok(Some(data)),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for read.")),
other => Err(anyhow!("Received {:?} for Read", other)),
}
}
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
@ -293,9 +294,7 @@ impl RawPile for Requester {
})? {
ResponseBody::Success => Ok(()),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Err(anyhow!("Received NotExists for write.")),
ResponseBody::Data(_) => Err(anyhow!("Received Data for write.")),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for write.")),
other => Err(anyhow!("Received {:?} for Write", other)),
}
}
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
@ -305,9 +304,7 @@ impl RawPile for Requester {
})? {
ResponseBody::Success => Ok(()),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Err(anyhow!("Received NotExists for delete.")),
ResponseBody::Data(_) => Err(anyhow!("Received Data for delete.")),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for delete.")),
other => Err(anyhow!("Received {:?} for Delete", other)),
}
}
fn list_keys(
@ -321,31 +318,26 @@ impl RawPile for Requester {
buffer: Vec::with_capacity(0),
})),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Err(anyhow!("Received NotExists for list_keys.")),
ResponseBody::Data(_) => Err(anyhow!("Received Data for list_keys.")),
ResponseBody::BatchData { batch, next_token } => Ok(Box::new(ListKeyIterator {
command_sender: self.commands.clone(),
batch_token: Some(next_token),
buffer: batch,
})),
other => Err(anyhow!("Received {:?} for List", other)),
}
}
fn flush(&self) -> anyhow::Result<()> {
match self.request(RequestBody::Flush)? {
ResponseBody::Success => Ok(()),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Err(anyhow!("Received NotExists for Flush.")),
ResponseBody::Data(_) => Err(anyhow!("Received Data for Flush.")),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for Flush.")),
other => Err(anyhow!("Received {:?} for Flush", other)),
}
}
fn check_lowlevel(&self) -> anyhow::Result<bool> {
match self.request(RequestBody::LowLevelCheck)? {
ResponseBody::Success => Ok(true),
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
ResponseBody::NotExists => Err(anyhow!("Received NotExists for LowLevelCheck.")),
ResponseBody::Data(_) => Err(anyhow!("Received Data for LowLevelCheck.")),
ResponseBody::BatchData { .. } => Err(anyhow!("Received BatchData for LowLevelCheck.")),
other => Err(anyhow!("Received {:?} for LowLevelCheck", other)),
}
}
@ -396,15 +388,7 @@ impl RawPile for Requester {
ResponseBody::Failed(string) => {
panic!("Requester pipeline fail {}", string);
}
ResponseBody::BatchData { .. } => {
panic!("wtf BatchData");
}
ResponseBody::NotExists => {
panic!("wtf NotExists");
}
ResponseBody::Data(_) => {
panic!("wtf Data");
}
other => panic!("wtf {:?}", other),
}
}
recv(receiver) -> resp => {
@ -434,15 +418,7 @@ impl RawPile for Requester {
ResponseBody::Failed(string) => {
panic!("Requester pipeline fail {}", string);
}
ResponseBody::BatchData { .. } => {
panic!("wtf BatchData");
}
ResponseBody::NotExists => {
panic!("wtf NotExists");
}
ResponseBody::Data(_) => {
panic!("wtf Data");
}
other => panic!("wtf {:?}", other),
}
}
}
@ -451,6 +427,17 @@ impl RawPile for Requester {
Ok(input)
}
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
match self.request(RequestBody::Describe)? {
ResponseBody::Description(mut description) => {
description.push(PipelineDescription::Remote);
Ok(description)
}
ResponseBody::Failed(err_msg) => Err(anyhow!("Remote failure: {}", err_msg)),
other => Err(anyhow!("Received {:?} for Describe", other)),
}
}
}
pub struct ListKeyIterator {
@ -478,8 +465,6 @@ impl Iterator for ListKeyIterator {
None
}
ResponseBody::Failed(err_msg) => Some(Err(anyhow!("Remote failure: {}", err_msg))),
ResponseBody::NotExists => Some(Err(anyhow!("Received NotExists for NextBatch."))),
ResponseBody::Data(_) => Some(Err(anyhow!("Received Data for NextBatch."))),
ResponseBody::BatchData { batch, next_token } => {
self.batch_token = Some(next_token);
self.buffer = batch;
@ -491,6 +476,7 @@ impl Iterator for ListKeyIterator {
None
}
}
other => Some(Err(anyhow!("Received {:?} for NextBatch", other))),
}
} else {
None

View File

@ -349,6 +349,20 @@ impl Responder {
RequestBody::Progress { .. } => {
unreachable!("handled by readea")
}
RequestBody::Describe => match pile.describe_pipeline() {
Ok(description) => Response {
response_to: request.id,
body: ResponseBody::Description(description),
},
Err(err) => {
warn!("Error whilst doing a raw describe_pipeline: {:?}", err);
let err = format!("{:?}", err);
Response {
response_to: request.id,
body: ResponseBody::Failed(err),
}
}
},
};
responses