Push things around so that the chunking process doesn't need to know about pointers
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/release Pipeline was successful Details

This commit is contained in:
Olivier 'reivilibre' 2022-05-29 00:10:53 +01:00
parent f1b73b28ee
commit 8692d83510
4 changed files with 67 additions and 100 deletions

View File

@ -9,7 +9,8 @@ use std::path::Path;
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio};
use std::sync::Arc; use std::sync::Arc;
use yama::commands::{load_pile_descriptor, open_pile}; use yama::commands::{load_pile_descriptor, open_pile};
use yama::definitions::TreeNode; use yama::definitions::{PartialPointerData, TreeNode};
use yama::operations::storing::{pointer_ops_prepare_to_store, pointers_ops_after_store};
use yama::pile::{Pile, RawPile, StoragePipelineSettings}; use yama::pile::{Pile, RawPile, StoragePipelineSettings};
use yama::progress::ProgressTracker; use yama::progress::ProgressTracker;
use yama::remote::responder::{Responder, ResponderWritingPipeline}; use yama::remote::responder::{Responder, ResponderWritingPipeline};
@ -64,19 +65,15 @@ pub fn chunking<
read: R, read: R,
mut write: W, mut write: W,
path: &Path, path: &Path,
pointer_name: String,
tree_node: &TreeNode, tree_node: &TreeNode,
raw_pile: Arc<RP>, raw_pile: Arc<RP>,
parent: Option<String>,
progress_bar: PT, progress_bar: PT,
use_writing_pipeline: bool, use_writing_pipeline: bool,
) -> anyhow::Result<(R, W)> { ) -> anyhow::Result<(R, W, PartialPointerData)> {
info!("Chunking."); info!("Chunking.");
write_message(&mut write, &"chunk")?; write_message(&mut write, &"chunk")?;
write_message(&mut write, &path)?; write_message(&mut write, &path)?;
write_message(&mut write, &pointer_name)?;
write_message(&mut write, tree_node)?; write_message(&mut write, tree_node)?;
write_message(&mut write, &parent)?;
write.flush()?; write.flush()?;
let (writing_pipeline, control_rx) = if use_writing_pipeline { let (writing_pipeline, control_rx) = if use_writing_pipeline {
@ -111,7 +108,7 @@ pub fn chunking<
for handle in join_handles { for handle in join_handles {
handle.join().expect("Join handle should not fail"); handle.join().expect("Join handle should not fail");
} }
let read = r_handle.join().unwrap(); let mut read = r_handle.join().unwrap();
let write = w_handle.join().unwrap(); let write = w_handle.join().unwrap();
if let Some(control_rx) = control_rx { if let Some(control_rx) = control_rx {
@ -122,7 +119,9 @@ pub fn chunking<
info!("Remote finished chunking."); info!("Remote finished chunking.");
Ok((read, write)) let pointer_data: PartialPointerData = read_message(&mut read)?;
Ok((read, write, pointer_data))
} }
pub fn quit<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> { pub fn quit<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
@ -199,7 +198,7 @@ pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>
let scan_result = scanning(&mut read, &mut write, directory.as_ref())? let scan_result = scanning(&mut read, &mut write, directory.as_ref())?
.ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?; .ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?;
let root = let mut root =
label_filter_and_convert(scan_result, descriptor, desc_path, source_name, dest)? label_filter_and_convert(scan_result, descriptor, desc_path, source_name, dest)?
.ok_or_else(|| anyhow!("Empty filter..."))?; .ok_or_else(|| anyhow!("Empty filter..."))?;
@ -248,19 +247,27 @@ pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>
let raw_pile = Arc::new(pile.raw_pile); let raw_pile = Arc::new(pile.raw_pile);
let pile = Pile::new(raw_pile.clone()); let pile = Pile::new(raw_pile.clone());
let (mut read, mut write) = chunking( pointer_ops_prepare_to_store(&pile, &mut root, &parent)?;
info!(
"Have pointer_name = {:?}, parent = {:?}",
pointer_name, parent
);
let (mut read, mut write, pointer_data) = chunking(
read, read,
write, write,
directory.as_ref(), directory.as_ref(),
pointer_name.clone(),
&root, &root,
raw_pile, raw_pile,
parent,
progress_bar, progress_bar,
true, true,
)?; )?;
quit(&mut read, &mut write)?; quit(&mut read, &mut write)?;
pointers_ops_after_store(&pile, &pointer_name, &pointer_data.complete(parent))?;
pile.flush()?; pile.flush()?;
info!("Stored! Checking for existence..."); info!("Stored! Checking for existence...");

View File

@ -1,21 +1,24 @@
// This file implements the responder side of the backup source protocol -- the protocol used // This file implements the responder side of the backup source protocol -- the protocol used
// to connect to remote backup sources. // to connect to remote backup sources.
use crate::tree::scan;
use anyhow::bail;
use crossbeam_channel::Sender;
use log::info;
use std::io::{stdin, stdout, Read, Write}; use std::io::{stdin, stdout, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use yama::definitions::TreeNode;
use anyhow::bail;
use crossbeam_channel::Sender;
use log::info;
use yama::definitions::{PartialPointerData, TreeNode};
use yama::pile::{Pile, RawPile}; use yama::pile::{Pile, RawPile};
use yama::progress::ProgressTracker; use yama::progress::ProgressTracker;
use yama::remote::requester::Requester; use yama::remote::requester::Requester;
use yama::remote::{read_message, write_message, RequestBody, ResponseBody}; use yama::remote::{read_message, write_message, RequestBody, ResponseBody};
use yama::utils::get_number_of_workers; use yama::utils::get_number_of_workers;
use crate::tree::scan;
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> { pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
let version = env!("CARGO_PKG_VERSION"); let version = env!("CARGO_PKG_VERSION");
write_message( write_message(
@ -45,41 +48,6 @@ pub fn scanning<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Resul
Ok(()) Ok(())
} }
pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static>(
mut read: R,
write: W,
) -> anyhow::Result<()> {
let path: PathBuf = read_message(&mut read)?;
let pointer_name: String = read_message(&mut read)?;
let tree_node: TreeNode = read_message(&mut read)?;
let parent: Option<String> = read_message(&mut read)?;
let (yama_requester, requester_join_handles) = Requester::new(read, write);
let raw_pile: Box<dyn RawPile> = Box::new(yama_requester);
let pile = Pile::new(raw_pile);
// TODO TODO progress
let progress_bar = &mut ();
yama::operations::storing::store_fully(
Arc::new(pile),
&path,
&pointer_name,
tree_node,
parent,
get_number_of_workers("YAMA_CHUNKERS"),
progress_bar,
)?;
for join_handle in requester_join_handles {
join_handle.join().expect("Expected to join handle");
}
Ok(())
}
pub struct ProgressSender { pub struct ProgressSender {
pub last_sent: Instant, pub last_sent: Instant,
pub current_progress: u64, pub current_progress: u64,
@ -128,23 +96,16 @@ impl ProgressTracker for ProgressSender {
} }
} }
pub fn chunking_stdio() -> anyhow::Result<()> { pub fn chunking_stdio() -> anyhow::Result<PartialPointerData> {
let (path, pointer_name, tree_node, parent) = { let (path, tree_node) = {
let stdin = stdin(); let stdin = stdin();
let mut read = stdin.lock(); let mut read = stdin.lock();
let path: PathBuf = read_message(&mut read)?; let path: PathBuf = read_message(&mut read)?;
let pointer_name: String = read_message(&mut read)?;
let tree_node: TreeNode = read_message(&mut read)?; let tree_node: TreeNode = read_message(&mut read)?;
let parent: Option<String> = read_message(&mut read)?; (path, tree_node)
(path, pointer_name, tree_node, parent)
}; };
info!( let (pointer_data, requester_join_handles) = {
"Have pointer_name = {:?}, parent = {:?}",
pointer_name, parent
);
let requester_join_handles = {
let (yama_requester, requester_join_handles) = Requester::new_from_stdio(); let (yama_requester, requester_join_handles) = Requester::new_from_stdio();
let command_sender = yama_requester.clone_command_sender(); let command_sender = yama_requester.clone_command_sender();
info!("progress sender in use"); info!("progress sender in use");
@ -160,17 +121,15 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
let pile = Pile::new(raw_pile); let pile = Pile::new(raw_pile);
yama::operations::storing::store_fully( let pointer_data = yama::operations::storing::store_without_pointer_ops(
Arc::new(pile), &Arc::new(pile),
&path, &path,
&pointer_name,
tree_node, tree_node,
parent,
get_number_of_workers("YAMA_CHUNKERS"), get_number_of_workers("YAMA_CHUNKERS"),
&mut progress_bar, &mut progress_bar,
)?; )?;
requester_join_handles (pointer_data, requester_join_handles)
}; };
info!("Waiting to join."); info!("Waiting to join.");
@ -181,20 +140,7 @@ pub fn chunking_stdio() -> anyhow::Result<()> {
info!("Chunking completed."); info!("Chunking completed.");
Ok(()) Ok(pointer_data)
}
pub fn handler<R: Read + Send + 'static, W: Write + Send + 'static>(
mut read: R,
mut write: W,
) -> anyhow::Result<()> {
introduction(&mut read, &mut write)?;
scanning(&mut read, &mut write)?;
chunking(read, write)?;
Ok(())
} }
pub fn handler_stdio() -> anyhow::Result<()> { pub fn handler_stdio() -> anyhow::Result<()> {
@ -217,9 +163,10 @@ pub fn handler_stdio() -> anyhow::Result<()> {
info!("Chunking."); info!("Chunking.");
drop(read); drop(read);
drop(write); drop(write);
chunking_stdio()?; let pointer_data = chunking_stdio()?;
read = stdin.lock(); read = stdin.lock();
write = stdout.lock(); write = stdout.lock();
write_message(&mut write, &pointer_data)?;
} }
"exit" => { "exit" => {
write_message(&mut write, &"exit")?; write_message(&mut write, &"exit")?;

View File

@ -35,6 +35,24 @@ pub struct PointerData {
pub gid_lookup: BTreeMap<u16, Option<String>>, pub gid_lookup: BTreeMap<u16, Option<String>>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialPointerData {
pub chunk_ref: RecursiveChunkRef,
pub uid_lookup: BTreeMap<u16, Option<String>>,
pub gid_lookup: BTreeMap<u16, Option<String>>,
}
impl PartialPointerData {
pub fn complete(self, parent_pointer: Option<String>) -> PointerData {
PointerData {
chunk_ref: self.chunk_ref,
parent_pointer,
uid_lookup: self.uid_lookup,
gid_lookup: self.gid_lookup,
}
}
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RecursiveChunkRef { pub struct RecursiveChunkRef {
/// The root Chunk ID. /// The root Chunk ID.

View File

@ -29,7 +29,9 @@ use log::{error, warn};
use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOLD}; use crate::chunking::{ChunkSubmissionTarget, RecursiveChunker, SENSIBLE_THRESHOLD};
use crate::commands; use crate::commands;
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node}; use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node};
use crate::definitions::{PointerData, RecursiveChunkRef, RootTreeNode, TreeNode}; use crate::definitions::{
PartialPointerData, PointerData, RecursiveChunkRef, RootTreeNode, TreeNode,
};
use crate::pile::{existence_checker_stage, Pile, RawPile, StoragePipelineSettings}; use crate::pile::{existence_checker_stage, Pile, RawPile, StoragePipelineSettings};
use crate::progress::ProgressTracker; use crate::progress::ProgressTracker;
use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place}; use crate::tree::{create_uidgid_lookup_tables, differentiate_node_in_place};
@ -242,21 +244,16 @@ pub fn store_fully<PT: ProgressTracker>(
progress_bar: &mut PT, progress_bar: &mut PT,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
pointer_ops_prepare_to_store(&pile, &mut root_node, &parent)?; pointer_ops_prepare_to_store(&pile, &mut root_node, &parent)?;
let pointer_data = store_without_pointer_ops( let pointer_data =
&pile, store_without_pointer_ops(&pile, &root_dir, root_node, num_workers, progress_bar)?
&root_dir, .complete(parent);
root_node, pointers_ops_after_store(&pile, &new_pointer_name, &pointer_data)?;
parent,
num_workers,
progress_bar,
)?;
pointers_ops_after_store(pile, &new_pointer_name, &pointer_data)?;
Ok(()) Ok(())
} }
pub fn pointers_ops_after_store( pub fn pointers_ops_after_store(
pile: Arc<Pile<Box<dyn RawPile>>>, pile: &Pile<impl RawPile>,
new_pointer_name: &&String, new_pointer_name: &str,
pointer_data: &PointerData, pointer_data: &PointerData,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
pile.write_pointer(&new_pointer_name, &pointer_data)?; pile.write_pointer(&new_pointer_name, &pointer_data)?;
@ -265,7 +262,7 @@ pub fn pointers_ops_after_store(
} }
pub fn pointer_ops_prepare_to_store( pub fn pointer_ops_prepare_to_store(
pile: &Arc<Pile<Box<dyn RawPile>>>, pile: &Pile<impl RawPile>,
mut root_node: &mut TreeNode, mut root_node: &mut TreeNode,
parent: &Option<String>, parent: &Option<String>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -288,10 +285,9 @@ pub fn store_without_pointer_ops<PT: ProgressTracker>(
pile: &Arc<Pile<Box<dyn RawPile>>>, pile: &Arc<Pile<Box<dyn RawPile>>>,
root_dir: &PathBuf, root_dir: &PathBuf,
mut root_node: TreeNode, mut root_node: TreeNode,
parent: Option<String>,
num_workers: u8, num_workers: u8,
progress_bar: &mut PT, progress_bar: &mut PT,
) -> anyhow::Result<PointerData> { ) -> anyhow::Result<PartialPointerData> {
// TODO make these configurable // TODO make these configurable
let sps = StoragePipelineSettings { let sps = StoragePipelineSettings {
num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32, num_compressors: get_number_of_workers("YAMA_PL_COMPRESSORS") as u32,
@ -332,9 +328,8 @@ pub fn store_without_pointer_ops<PT: ProgressTracker>(
}, },
)?; )?;
let pointer_data = PointerData { let pointer_data = PartialPointerData {
chunk_ref, chunk_ref,
parent_pointer: parent,
uid_lookup, uid_lookup,
gid_lookup, gid_lookup,
}; };