Add naïve existence checking stage
continuous-integration/drone the build was successful Details

This commit is contained in:
Olivier 'reivilibre' 2021-11-20 13:29:50 +00:00
parent b00a6da993
commit 8802994d96
5 changed files with 47 additions and 8 deletions

View File

@ -28,6 +28,7 @@ use std::fmt::Debug;
use std::io::Write;
use std::path::Path;
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use yama::chunking::SENSIBLE_THRESHOLD;
use yama::commands::{load_pile_descriptor, open_pile, store_tree_node};
use yama::definitions::{
@ -194,7 +195,7 @@ pub fn backup_source_to_destination<PT: ProgressTracker>(
info!("Storing using yama.");
yama::operations::storing::store_fully(
&pile,
Arc::new(pile),
&absolute_source_path,
&pointer_name,
root,

View File

@ -7,6 +7,7 @@ use crossbeam_channel::Sender;
use log::info;
use std::io::{stdin, stdout, Read, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use yama::definitions::TreeNode;
use yama::pile::{Pile, RawPile};
@ -63,7 +64,7 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
let progress_bar = &mut ();
yama::operations::storing::store_fully(
&pile,
Arc::new(pile),
&path,
&pointer_name,
tree_node,
@ -161,7 +162,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
let pile = Pile::new(raw_pile);
yama::operations::storing::store_fully(
&pile,
Arc::new(pile),
&path,
&pointer_name,
tree_node,

View File

@ -30,10 +30,12 @@ use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOL
use crate::commands;
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode};
use crate::pile::{Pile, RawPile, StoragePipelineSettings};
use crate::pile::{existence_checker_stage, Pile, RawPile, StoragePipelineSettings};
use crate::progress::ProgressTracker;
use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
use crate::utils::get_number_of_workers;
use std::collections::BTreeMap;
use std::sync::Arc;
pub fn store<CST: ChunkSubmissionTarget, PT: ProgressTracker>(
root_path: &Path,
@ -231,7 +233,7 @@ pub fn manager<PT: ProgressTracker>(
/// loaded and fully-integrated).
/// This also creates a pointer (which is why this is called `store_fully`).
pub fn store_fully<PT: ProgressTracker>(
pile: &Pile<Box<dyn RawPile>>,
pile: Arc<Pile<Box<dyn RawPile>>>,
root_dir: &PathBuf,
new_pointer_name: &String,
mut root_node: TreeNode,
@ -256,12 +258,14 @@ pub fn store_fully<PT: ProgressTracker>(
if use_pipelined_storage {
// TODO make these configurable
let sps = StoragePipelineSettings {
num_compressors: num_cpus::get() as u32,
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
compressor_input_bound: 64,
writer_input_bound: 64,
};
let (control_tx, control_rx) = crossbeam_channel::unbounded();
let pile2 = pile.clone();
let pipeline = pile.raw_pile.build_storage_pipeline(sps, control_tx)?;
let pipeline = existence_checker_stage(pile2, pipeline);
store(
&root_dir,
&mut root_node,
@ -270,7 +274,13 @@ pub fn store_fully<PT: ProgressTracker>(
num_workers,
)?;
} else {
store(&root_dir, &mut root_node, pile, progress_bar, num_workers)?;
store(
&root_dir,
&mut root_node,
pile.as_ref(),
progress_bar,
num_workers,
)?;
}
let mut uid_lookup = BTreeMap::new();

View File

@ -20,6 +20,7 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::definitions::{ChunkId, PointerData};
use crate::utils::get_number_of_workers;
use crossbeam_channel::Sender;
use std::collections::HashSet;
use std::fmt::Debug;
@ -82,6 +83,30 @@ pub struct StoragePipelineSettings {
pub writer_input_bound: u32,
}
/// Inserts an existence-checking stage in front of a storage pipeline.
///
/// Returns a new `Sender` that accepts `(ChunkId, chunk bytes)` pairs; a pool
/// of worker threads consults the pile and forwards only chunks that are not
/// already stored on to `next_stage`. Chunks that already exist are dropped.
///
/// Worker count is controlled by the `YAMA_EXISTENCE_CHECKERS` environment
/// variable (via `get_number_of_workers`).
pub fn existence_checker_stage<RP: RawPile>(
    pile: Arc<Pile<RP>>,
    next_stage: Sender<(ChunkId, Vec<u8>)>,
) -> Sender<(ChunkId, Vec<u8>)> {
    // Bounded input channel so submitters experience backpressure.
    let (input_tx, input_rx) = crossbeam_channel::bounded::<(ChunkId, Vec<u8>)>(32);

    // TODO would like something better for the networked case
    let worker_count = get_number_of_workers("YAMA_EXISTENCE_CHECKERS");
    for _ in 0..worker_count {
        let forward = next_stage.clone();
        let input = input_rx.clone();
        let pile = pile.clone();
        std::thread::spawn(move || {
            // Iterating the receiver blocks on recv and ends once every
            // sender has been dropped and the channel is drained —
            // equivalent to `while let Ok(..) = rx.recv()`.
            for (chunk_id, chunk) in input {
                // TODO handle errors properly
                let already_stored = pile.chunk_exists(&chunk_id).unwrap();
                if !already_stored {
                    forward.send((chunk_id, chunk)).unwrap();
                }
            }
        });
    }

    input_tx
}
pub enum ControllerMessage {
Failure {
worker_id: Arc<String>,

View File

@ -198,7 +198,9 @@ impl Responder {
{
let mut chunk_id = ChunkId::default();
chunk_id.copy_from_slice(&key[..]);
writing_pipeline.pipeline_submission.send((chunk_id, value));
writing_pipeline
.pipeline_submission
.send((chunk_id, value))?;
// We lie and say it was successful once we submit.
// We'll complain on our side if anything goes wrong, anyway.
Response {