Finish off the basic offering side implementation

This commit is contained in:
Olivier 'reivilibre' 2022-06-13 23:27:43 +01:00
parent bb8fc355f0
commit a24778209e
1 changed files with 76 additions and 12 deletions

View File

@ -3,8 +3,11 @@
use anyhow::{ensure, Context}; use anyhow::{ensure, Context};
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write}; use std::io::{Read, Write};
use yama::definitions::{ChunkId, PointerData}; use yama::chunking::RecursiveUnchunker;
use yama::pile::{Pile, PipelineDescription, RawPile}; use yama::commands::{fully_load_pointer, retrieve_tree_node};
use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode};
use yama::pile::{Keyspace, Pile, PipelineDescription, RawPile};
use yama::progress::ProgressTracker;
use yama::remote::{read_message, write_message}; use yama::remote::{read_message, write_message};
pub fn offer_pointers<W: Write, RP: RawPile>( pub fn offer_pointers<W: Write, RP: RawPile>(
@ -78,11 +81,59 @@ pub fn negotiate_bypassed_pile<R: Read, W: Write>(
Ok(()) Ok(())
} }
fn collect_chunk_ids(
pile: &Pile<Box<dyn RawPile>>,
root: &TreeNode,
chunk_ids: &mut BTreeSet<ChunkId>,
) -> anyhow::Result<()> {
root.visit(
&mut |tree_node, _| {
match tree_node {
TreeNode::NormalFile { content, .. } => {
collect_chunk_ids_from_chunkref(pile, content, chunk_ids)?;
}
_ => {}
}
Ok(())
},
"".to_owned(),
)?;
Ok(())
}
fn collect_chunk_ids_from_chunkref(
pile: &Pile<Box<dyn RawPile>>,
chunk_ref: &RecursiveChunkRef,
collection: &mut BTreeSet<ChunkId>,
) -> anyhow::Result<()> {
if chunk_ref.depth == 0 {
collection.insert(condense_chunk_id(chunk_ref.chunk_id));
} else {
let shallower_chunk_ref = RecursiveChunkRef {
chunk_id: chunk_ref.chunk_id,
depth: chunk_ref.depth - 1,
};
let mut unchunker = RecursiveUnchunker::new(pile, shallower_chunk_ref);
let mut next_chunk_id: ChunkId = Default::default();
loop {
let read = unchunker.read(&mut next_chunk_id[..])?;
if read == 0 {
break;
} else if read < next_chunk_id.len() {
unchunker.read_exact(&mut next_chunk_id[read..])?;
}
collection.insert(condense_chunk_id(next_chunk_id));
}
}
Ok(())
}
pub fn offering_side<R: Read, W: Write>( pub fn offering_side<R: Read, W: Write>(
pile: &Pile<Box<dyn RawPile>>, pile: &Pile<Box<dyn RawPile>>,
bypass_pile: Box<dyn RawPile>, bypass_pile: Box<dyn RawPile>,
reader: &mut R, reader: &mut R,
writer: &mut W, writer: &mut W,
mut progress: Box<dyn ProgressTracker>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// First 'negotiate' (for now: assert) a pile bypass. // First 'negotiate' (for now: assert) a pile bypass.
// This lets us avoid decompressing things before recompressing them at the other end, // This lets us avoid decompressing things before recompressing them at the other end,
@ -92,26 +143,39 @@ pub fn offering_side<R: Read, W: Write>(
let offered_pointers = offer_pointers(pile, writer)?; let offered_pointers = offer_pointers(pile, writer)?;
let wanted_pointers = read_message::<_, Vec<String>>(reader)?; let wanted_pointers = read_message::<_, Vec<String>>(reader)?;
for pointer in &wanted_pointers { let mut chunks_to_offer: BTreeSet<ChunkId> = BTreeSet::new();
ensure!(
offered_pointers.contains_key(pointer),
"Requested pointer {:?} not offered",
pointer
);
}
let chunks_to_offer: BTreeSet<ChunkId> = todo!(); for pointer_name in &wanted_pointers {
let pointer_data = offered_pointers
.get(pointer_name)
.with_context(|| format!("Requested pointer {:?} was not offered", pointer_name))?;
collect_chunk_ids_from_chunkref(pile, &pointer_data.chunk_ref, &mut chunks_to_offer)?;
let root_node = retrieve_tree_node(pile, pointer_data.chunk_ref.clone())?;
collect_chunk_ids(pile, &root_node.node, &mut chunks_to_offer)?;
}
write_message(writer, &chunks_to_offer)?; write_message(writer, &chunks_to_offer)?;
let chunks_to_skip: BTreeSet<ChunkId> = read_message(reader)?; let chunks_to_skip: BTreeSet<ChunkId> = read_message(reader)?;
let chunks_to_send: Vec<ChunkId> = chunks_to_offer.difference(&chunks_to_skip).collect(); let chunks_to_send: Vec<ChunkId> = chunks_to_offer
.difference(&chunks_to_skip)
.cloned()
.collect();
drop(chunks_to_offer); drop(chunks_to_offer);
drop(chunks_to_skip); drop(chunks_to_skip);
progress.set_max_size(chunks_to_send.len() as u64);
progress.set_current(0);
for chunk_id in chunks_to_send { for chunk_id in chunks_to_send {
todo!(); let chunk_data = bypass_pile
.read(Keyspace::Chunk, &chunk_id)?
.context("Chunk vanished")?;
write_message(writer, &Some((chunk_id, chunk_data)))?;
progress.inc_progress(1);
} }
Ok(()) Ok(())