From bb8fc355f0bb9f3eaff8693bd6af8f7e644a417b Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre <olivier@librepush.net>
Date: Mon, 13 Jun 2022 23:15:46 +0100
Subject: [PATCH] Lay down the basic structure of push/pull offerer

---
 datman/src/commands.rs                       |   1 +
 datman/src/commands/pushpull.rs              | 118 +++++++++++++++++++
 datman/src/remote/backup_source_responder.rs |   1 +
 yama/src/bin/yama.rs                         |   4 +-
 yama/src/pile.rs                             |   2 +-
 5 files changed, 124 insertions(+), 2 deletions(-)
 create mode 100644 datman/src/commands/pushpull.rs

diff --git a/datman/src/commands.rs b/datman/src/commands.rs
index a8efb24..fc41ab0 100644
--- a/datman/src/commands.rs
+++ b/datman/src/commands.rs
@@ -26,6 +26,7 @@ pub mod backup;
 pub mod extract;
 pub mod ibrowse;
 pub mod ilabel;
+pub mod pushpull;
 pub mod report;
 
 pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {
diff --git a/datman/src/commands/pushpull.rs b/datman/src/commands/pushpull.rs
new file mode 100644
index 0000000..5a10811
--- /dev/null
+++ b/datman/src/commands/pushpull.rs
@@ -0,0 +1,118 @@
+// Push and Pull support for Datman
+
+use anyhow::{ensure, Context};
+use std::collections::{BTreeMap, BTreeSet};
+use std::io::{Read, Write};
+use yama::definitions::{ChunkId, PointerData};
+use yama::pile::{Pile, PipelineDescription, RawPile};
+use yama::remote::{read_message, write_message};
+
+pub fn offer_pointers<RP: RawPile, W: Write>(
+    pile: &Pile<RP>,
+    writer: &mut W,
+) -> anyhow::Result<BTreeMap<String, PointerData>> {
+    let mut pointers_to_offer: BTreeMap<String, PointerData> = BTreeMap::new();
+
+    for pointer_name in pile.list_pointers()? {
+        let pointer_data = pile
+            .read_pointer(&pointer_name)?
+            .context("Listed pointer not present")?;
+
+        pointers_to_offer.insert(pointer_name, pointer_data);
+    }
+
+    write_message(writer, &pointers_to_offer)?;
+    Ok(pointers_to_offer)
+}
+
+pub fn ensure_compatible_bypasses(
+    my_full: &Vec<PipelineDescription>,
+    my_bypass: &Vec<PipelineDescription>,
+    their_full: &Vec<PipelineDescription>,
+    their_bypass: &Vec<PipelineDescription>,
+) -> anyhow::Result<()> {
+    ensure!(
+        my_full.starts_with(&my_bypass),
+        "Our full pipeline is not an extension of the bypass pipeline."
+    );
+    ensure!(
+        their_full.starts_with(&their_bypass),
+        "Their full pipeline is not an extension of their bypass pipeline."
+    );
+
+    let my_bypassed_parts = &my_full[my_bypass.len()..];
+    let their_bypassed_parts = &their_full[their_bypass.len()..];
+
+    ensure!(
+        my_bypassed_parts == their_bypassed_parts,
+        "Our bypassed parts and their bypassed parts are not the same.\nOurs: {:?}\nTheirs: {:?}",
+        my_bypassed_parts,
+        their_bypassed_parts
+    );
+
+    Ok(())
+}
+
+pub fn negotiate_bypassed_pile<R: Read, W: Write>(
+    pile: &Pile<Box<dyn RawPile>>,
+    bypass_pile: &Box<dyn RawPile>,
+    reader: &mut R,
+    writer: &mut W,
+) -> anyhow::Result<()> {
+    let my_full_pipeline = pile.raw_pile.describe_pipeline()?;
+    let my_bypass_pipeline = bypass_pile.describe_pipeline()?;
+
+    write_message(writer, &my_full_pipeline)?;
+    write_message(writer, &my_bypass_pipeline)?;
+
+    let their_full_pipeline = read_message::<_, Vec<PipelineDescription>>(reader)?;
+    let their_bypass_pipeline = read_message::<_, Vec<PipelineDescription>>(reader)?;
+
+    ensure_compatible_bypasses(
+        &my_full_pipeline,
+        &my_bypass_pipeline,
+        &their_full_pipeline,
+        &their_bypass_pipeline,
+    )?;
+
+    Ok(())
+}
+
+pub fn offering_side<R: Read, W: Write>(
+    pile: &Pile<Box<dyn RawPile>>,
+    bypass_pile: Box<dyn RawPile>,
+    reader: &mut R,
+    writer: &mut W,
+) -> anyhow::Result<()> {
+    // First 'negotiate' (for now: assert) a pile bypass.
+    // This lets us avoid decompressing things before recompressing them at the other end,
+    // assuming both ends use the same dictionary.
+    negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?;
+
+    let offered_pointers = offer_pointers(pile, writer)?;
+    let wanted_pointers = read_message::<_, Vec<String>>(reader)?;
+
+    for pointer in &wanted_pointers {
+        ensure!(
+            offered_pointers.contains_key(pointer),
+            "Requested pointer {:?} not offered",
+            pointer
+        );
+    }
+
+    let chunks_to_offer: BTreeSet<ChunkId> = todo!();
+
+    write_message(writer, &chunks_to_offer)?;
+
+    let chunks_to_skip: BTreeSet<ChunkId> = read_message(reader)?;
+    let chunks_to_send: Vec<ChunkId> = chunks_to_offer.difference(&chunks_to_skip).cloned().collect();
+
+    drop(chunks_to_offer);
+    drop(chunks_to_skip);
+
+    for chunk_id in chunks_to_send {
+        todo!();
+    }
+
+    Ok(())
+}
diff --git a/datman/src/remote/backup_source_responder.rs b/datman/src/remote/backup_source_responder.rs
index 8efac4d..af28007 100644
--- a/datman/src/remote/backup_source_responder.rs
+++ b/datman/src/remote/backup_source_responder.rs
@@ -96,6 +96,7 @@ impl ProgressTracker for ProgressSender {
     }
 }
 
+// TODO use io-streams crate and get rid of the duplication!!
 pub fn chunking_stdio() -> anyhow::Result {
     let (path, tree_node) = {
         let stdin = stdin();
diff --git a/yama/src/bin/yama.rs b/yama/src/bin/yama.rs
index bec6c51..bb63adf 100644
--- a/yama/src/bin/yama.rs
+++ b/yama/src/bin/yama.rs
@@ -26,7 +26,9 @@ use std::sync::Arc;
 use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile};
 use yama::debug::{debug_command, DebugCommand};
 use yama::operations::checking::VacuumMode;
-use yama::operations::legacy_pushpull::{determine_bypass_level, open_pile_with_work_bypass, push_to};
+use yama::operations::legacy_pushpull::{
+    determine_bypass_level, open_pile_with_work_bypass, push_to,
+};
 use yama::operations::{checking, extracting};
 use yama::pile::{Pile, PileDescriptor, RawPile};
 use yama::{commands, debug};
diff --git a/yama/src/pile.rs b/yama/src/pile.rs
index 11c7074..7dfeb45 100644
--- a/yama/src/pile.rs
+++ b/yama/src/pile.rs
@@ -126,7 +126,7 @@ pub enum ControllerMessage {
     },
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub enum PipelineDescription {
     Store,
     Remote,