Lay down the basic structure of push/pull offerer

This commit is contained in:
Olivier 'reivilibre' 2022-06-13 23:15:46 +01:00
parent 14fc925dbc
commit bb8fc355f0
5 changed files with 124 additions and 2 deletions

View File

@ -26,6 +26,7 @@ pub mod backup;
pub mod extract; pub mod extract;
pub mod ibrowse; pub mod ibrowse;
pub mod ilabel; pub mod ilabel;
pub mod pushpull;
pub mod report; pub mod report;
pub fn init_descriptor(path: &Path) -> anyhow::Result<()> { pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {

View File

@ -0,0 +1,118 @@
// Push and Pull support for Datman
use anyhow::{ensure, Context};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write};
use yama::definitions::{ChunkId, PointerData};
use yama::pile::{Pile, PipelineDescription, RawPile};
use yama::remote::{read_message, write_message};
pub fn offer_pointers<W: Write, RP: RawPile>(
pile: &Pile<RP>,
writer: &mut W,
) -> anyhow::Result<BTreeMap<String, PointerData>> {
let mut pointers_to_offer: BTreeMap<String, PointerData> = BTreeMap::new();
for pointer_name in pile.list_pointers()? {
let pointer_data = pile
.read_pointer(&pointer_name)?
.context("Listed pointer not present")?;
pointers_to_offer.insert(pointer_name, pointer_data);
}
write_message(writer, &pointers_to_offer)?;
Ok(pointers_to_offer)
}
pub fn ensure_compatible_bypasses(
my_full: &Vec<PipelineDescription>,
my_bypass: &Vec<PipelineDescription>,
their_full: &Vec<PipelineDescription>,
their_bypass: &Vec<PipelineDescription>,
) -> anyhow::Result<()> {
ensure!(
my_full.starts_with(&my_bypass),
"Our full pipeline is not an extension of the bypass pipeline."
);
ensure!(
their_full.starts_with(&their_bypass),
"Their full pipeline is not an extension of their bypass pipeline."
);
let my_bypassed_parts = &my_full[my_bypass.len()..];
let their_bypassed_parts = &their_full[their_bypass.len()..];
ensure!(
my_bypassed_parts == their_bypassed_parts,
"Our bypassed parts and their bypassed parts are not the same.\nOurs: {:?}\nTheirs: {:?}",
my_bypassed_parts,
their_bypassed_parts
);
Ok(())
}
pub fn negotiate_bypassed_pile<R: Read, W: Write>(
pile: &Pile<Box<dyn RawPile>>,
bypass_pile: &Box<dyn RawPile>,
reader: &mut R,
writer: &mut W,
) -> anyhow::Result<()> {
let my_full_pipeline = pile.raw_pile.describe_pipeline()?;
let my_bypass_pipeline = bypass_pile.describe_pipeline()?;
write_message(writer, &my_full_pipeline)?;
write_message(writer, &my_bypass_pipeline)?;
let their_full_pipeline = read_message::<_, Vec<PipelineDescription>>(reader)?;
let their_bypass_pipeline = read_message::<_, Vec<PipelineDescription>>(reader)?;
ensure_compatible_bypasses(
&my_full_pipeline,
&my_bypass_pipeline,
&their_full_pipeline,
&their_bypass_pipeline,
)?;
Ok(())
}
pub fn offering_side<R: Read, W: Write>(
pile: &Pile<Box<dyn RawPile>>,
bypass_pile: Box<dyn RawPile>,
reader: &mut R,
writer: &mut W,
) -> anyhow::Result<()> {
// First 'negotiate' (for now: assert) a pile bypass.
// This lets us avoid decompressing things before recompressing them at the other end,
// assuming both ends use the same dictionary.
negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?;
let offered_pointers = offer_pointers(pile, writer)?;
let wanted_pointers = read_message::<_, Vec<String>>(reader)?;
for pointer in &wanted_pointers {
ensure!(
offered_pointers.contains_key(pointer),
"Requested pointer {:?} not offered",
pointer
);
}
let chunks_to_offer: BTreeSet<ChunkId> = todo!();
write_message(writer, &chunks_to_offer)?;
let chunks_to_skip: BTreeSet<ChunkId> = read_message(reader)?;
let chunks_to_send: Vec<ChunkId> = chunks_to_offer.difference(&chunks_to_skip).collect();
drop(chunks_to_offer);
drop(chunks_to_skip);
for chunk_id in chunks_to_send {
todo!();
}
Ok(())
}

View File

@ -96,6 +96,7 @@ impl ProgressTracker for ProgressSender {
} }
} }
// TODO use io-streams crate and get rid of the duplication!!
pub fn chunking_stdio() -> anyhow::Result<PartialPointerData> { pub fn chunking_stdio() -> anyhow::Result<PartialPointerData> {
let (path, tree_node) = { let (path, tree_node) = {
let stdin = stdin(); let stdin = stdin();

View File

@ -26,7 +26,9 @@ use std::sync::Arc;
use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile}; use yama::commands::{fully_integrate_pointer_node, load_pile_descriptor, open_pile};
use yama::debug::{debug_command, DebugCommand}; use yama::debug::{debug_command, DebugCommand};
use yama::operations::checking::VacuumMode; use yama::operations::checking::VacuumMode;
use yama::operations::legacy_pushpull::{determine_bypass_level, open_pile_with_work_bypass, push_to}; use yama::operations::legacy_pushpull::{
determine_bypass_level, open_pile_with_work_bypass, push_to,
};
use yama::operations::{checking, extracting}; use yama::operations::{checking, extracting};
use yama::pile::{Pile, PileDescriptor, RawPile}; use yama::pile::{Pile, PileDescriptor, RawPile};
use yama::{commands, debug}; use yama::{commands, debug};

View File

@ -126,7 +126,7 @@ pub enum ControllerMessage {
}, },
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum PipelineDescription { pub enum PipelineDescription {
Store, Store,
Remote, Remote,