From c83e2be66dcfceec813a25be3fe5df9e8411ddd4 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 14 Jun 2022 08:53:47 +0100 Subject: [PATCH] Flesh out both sides --- datman/src/commands/pushpull.rs | 80 ++++++++++++++++++++++++++++++--- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/datman/src/commands/pushpull.rs b/datman/src/commands/pushpull.rs index 8624a4d..9e56ac5 100644 --- a/datman/src/commands/pushpull.rs +++ b/datman/src/commands/pushpull.rs @@ -1,10 +1,10 @@ // Push and Pull support for Datman -use anyhow::{ensure, Context}; +use anyhow::{bail, ensure, Context}; use std::collections::{BTreeMap, BTreeSet}; use std::io::{Read, Write}; use yama::chunking::RecursiveUnchunker; -use yama::commands::{fully_load_pointer, retrieve_tree_node}; +use yama::commands::retrieve_tree_node; use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode}; use yama::pile::{Keyspace, Pile, PipelineDescription, RawPile}; use yama::progress::ProgressTracker; @@ -107,7 +107,7 @@ fn collect_chunk_ids_from_chunkref( collection: &mut BTreeSet, ) -> anyhow::Result<()> { if chunk_ref.depth == 0 { - collection.insert(condense_chunk_id(chunk_ref.chunk_id)); + collection.insert(chunk_ref.chunk_id); } else { let shallower_chunk_ref = RecursiveChunkRef { chunk_id: chunk_ref.chunk_id, @@ -122,7 +122,7 @@ fn collect_chunk_ids_from_chunkref( } else if read < next_chunk_id.len() { unchunker.read_exact(&mut next_chunk_id[read..])?; } - collection.insert(condense_chunk_id(next_chunk_id)); + collection.insert(next_chunk_id); } } Ok(()) @@ -141,7 +141,7 @@ pub fn offering_side( negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?; let offered_pointers = offer_pointers(pile, writer)?; - let wanted_pointers = read_message::<_, Vec>(reader)?; + let wanted_pointers = read_message::<_, BTreeSet>(reader)?; let mut chunks_to_offer: BTreeSet = BTreeSet::new(); @@ -180,3 +180,73 @@ pub fn offering_side( Ok(()) } + +pub fn accepting_side( + pile: &Pile>, + bypass_pile: Box, + reader: &mut R, + writer: &mut W, + mut progress: Box, +) -> anyhow::Result<()> { + // First 'negotiate' (for now: assert) a pile bypass. + // This lets us avoid decompressing things before recompressing them at the other end, + // assuming both ends use the same dictionary. + negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?; + + let offered_pointers: BTreeMap = read_message(reader)?; + let mut wanted_pointers: BTreeSet = BTreeSet::new(); + + for (pointer_name, pointer_data) in &offered_pointers { + if pile.read_pointer(pointer_name)?.is_none() { + wanted_pointers.insert(pointer_name.clone()); + if let Some(parent) = &pointer_data.parent_pointer { + if pile.read_pointer(parent)?.is_none() && !offered_pointers.contains_key(parent) { + bail!("Offered pointer {:?} requires parent {:?} which we don't have and isn't offered.", pointer_name, parent); + } + } + } + } + + write_message(writer, &wanted_pointers)?; + + let offered_chunks: BTreeSet = read_message(reader)?; + let mut chunks_to_skip: BTreeSet = BTreeSet::new(); + for chunk_id in &offered_chunks { + if pile.chunk_exists(chunk_id)? { + chunks_to_skip.insert(*chunk_id); + } + } + + write_message(writer, &chunks_to_skip)?; + + let num_chunks_to_recv = offered_chunks.len() - chunks_to_skip.len(); + + let mut chunks_to_recv: BTreeSet = offered_chunks + .difference(&chunks_to_skip) + .cloned() + .collect(); + + drop(offered_chunks); + drop(chunks_to_skip); + + progress.set_max_size(num_chunks_to_recv as u64); + progress.set_current(0); + + while let Some((chunk_id, chunk_data)) = read_message::<_, Option<(ChunkId, Vec)>>(reader)? + { + ensure!( + chunks_to_recv.remove(&chunk_id), + "Received unexpected chunk" + ); + bypass_pile.write(Keyspace::Chunk, &chunk_id, &chunk_data)?; + progress.inc_progress(1); + } + + ensure!(chunks_to_recv.is_empty(), "Unreceived chunks."); + + for (pointer_name, pointer_data) in &offered_pointers { + pile.write_pointer(pointer_name, pointer_data)?; + } + + Ok(()) +}