diff --git a/datman/src/commands/pushpull.rs b/datman/src/commands/pushpull.rs index 5a10811..8624a4d 100644 --- a/datman/src/commands/pushpull.rs +++ b/datman/src/commands/pushpull.rs @@ -3,8 +3,11 @@ use anyhow::{ensure, Context}; use std::collections::{BTreeMap, BTreeSet}; use std::io::{Read, Write}; -use yama::definitions::{ChunkId, PointerData}; -use yama::pile::{Pile, PipelineDescription, RawPile}; +use yama::chunking::RecursiveUnchunker; +use yama::commands::{fully_load_pointer, retrieve_tree_node}; +use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode}; +use yama::pile::{Keyspace, Pile, PipelineDescription, RawPile}; +use yama::progress::ProgressTracker; use yama::remote::{read_message, write_message}; pub fn offer_pointers( @@ -78,11 +81,59 @@ pub fn negotiate_bypassed_pile( Ok(()) } +fn collect_chunk_ids( + pile: &Pile>, + root: &TreeNode, + chunk_ids: &mut BTreeSet, +) -> anyhow::Result<()> { + root.visit( + &mut |tree_node, _| { + match tree_node { + TreeNode::NormalFile { content, .. } => { + collect_chunk_ids_from_chunkref(pile, content, chunk_ids)?; + } + _ => {} + } + Ok(()) + }, + "".to_owned(), + )?; + Ok(()) +} + +fn collect_chunk_ids_from_chunkref( + pile: &Pile>, + chunk_ref: &RecursiveChunkRef, + collection: &mut BTreeSet, +) -> anyhow::Result<()> { + if chunk_ref.depth == 0 { + collection.insert(condense_chunk_id(chunk_ref.chunk_id)); + } else { + let shallower_chunk_ref = RecursiveChunkRef { + chunk_id: chunk_ref.chunk_id, + depth: chunk_ref.depth - 1, + }; + let mut unchunker = RecursiveUnchunker::new(pile, shallower_chunk_ref); + let mut next_chunk_id: ChunkId = Default::default(); + loop { + let read = unchunker.read(&mut next_chunk_id[..])?; + if read == 0 { + break; + } else if read < next_chunk_id.len() { + unchunker.read_exact(&mut next_chunk_id[read..])?; + } + collection.insert(condense_chunk_id(next_chunk_id)); + } + } + Ok(()) +} + pub fn offering_side( pile: &Pile>, bypass_pile: Box, reader: &mut R, writer: &mut W, + mut progress: Box, ) -> anyhow::Result<()> { // First 'negotiate' (for now: assert) a pile bypass. // This lets us avoid decompressing things before recompressing them at the other end, @@ -92,26 +143,39 @@ pub fn offering_side( let offered_pointers = offer_pointers(pile, writer)?; let wanted_pointers = read_message::<_, Vec>(reader)?; - for pointer in &wanted_pointers { - ensure!( - offered_pointers.contains_key(pointer), - "Requested pointer {:?} not offered", - pointer - ); - } + let mut chunks_to_offer: BTreeSet = BTreeSet::new(); - let chunks_to_offer: BTreeSet = todo!(); + for pointer_name in &wanted_pointers { + let pointer_data = offered_pointers + .get(pointer_name) + .with_context(|| format!("Requested pointer {:?} was not offered", pointer_name))?; + + collect_chunk_ids_from_chunkref(pile, &pointer_data.chunk_ref, &mut chunks_to_offer)?; + + let root_node = retrieve_tree_node(pile, pointer_data.chunk_ref.clone())?; + collect_chunk_ids(pile, &root_node.node, &mut chunks_to_offer)?; + } write_message(writer, &chunks_to_offer)?; let chunks_to_skip: BTreeSet = read_message(reader)?; - let chunks_to_send: Vec = chunks_to_offer.difference(&chunks_to_skip).collect(); + let chunks_to_send: Vec = chunks_to_offer + .difference(&chunks_to_skip) + .cloned() + .collect(); drop(chunks_to_offer); drop(chunks_to_skip); + progress.set_max_size(chunks_to_send.len() as u64); + progress.set_current(0); for chunk_id in chunks_to_send { - todo!(); + let chunk_data = bypass_pile + .read(Keyspace::Chunk, &chunk_id)? + .context("Chunk vanished")?; + + write_message(writer, &Some((chunk_id, chunk_data)))?; + progress.inc_progress(1); } Ok(())