Flesh out both sides

This commit is contained in:
Olivier 'reivilibre' 2022-06-14 08:53:47 +01:00
parent a24778209e
commit c83e2be66d
1 changed files with 75 additions and 5 deletions

View File

@ -1,10 +1,10 @@
// Push and Pull support for Datman // Push and Pull support for Datman
use anyhow::{ensure, Context}; use anyhow::{bail, ensure, Context};
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write}; use std::io::{Read, Write};
use yama::chunking::RecursiveUnchunker; use yama::chunking::RecursiveUnchunker;
use yama::commands::{fully_load_pointer, retrieve_tree_node}; use yama::commands::retrieve_tree_node;
use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode}; use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode};
use yama::pile::{Keyspace, Pile, PipelineDescription, RawPile}; use yama::pile::{Keyspace, Pile, PipelineDescription, RawPile};
use yama::progress::ProgressTracker; use yama::progress::ProgressTracker;
@ -107,7 +107,7 @@ fn collect_chunk_ids_from_chunkref(
collection: &mut BTreeSet<ChunkId>, collection: &mut BTreeSet<ChunkId>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if chunk_ref.depth == 0 { if chunk_ref.depth == 0 {
collection.insert(condense_chunk_id(chunk_ref.chunk_id)); collection.insert(chunk_ref.chunk_id);
} else { } else {
let shallower_chunk_ref = RecursiveChunkRef { let shallower_chunk_ref = RecursiveChunkRef {
chunk_id: chunk_ref.chunk_id, chunk_id: chunk_ref.chunk_id,
@ -122,7 +122,7 @@ fn collect_chunk_ids_from_chunkref(
} else if read < next_chunk_id.len() { } else if read < next_chunk_id.len() {
unchunker.read_exact(&mut next_chunk_id[read..])?; unchunker.read_exact(&mut next_chunk_id[read..])?;
} }
collection.insert(condense_chunk_id(next_chunk_id)); collection.insert(next_chunk_id);
} }
} }
Ok(()) Ok(())
@ -141,7 +141,7 @@ pub fn offering_side<R: Read, W: Write>(
negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?; negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?;
let offered_pointers = offer_pointers(pile, writer)?; let offered_pointers = offer_pointers(pile, writer)?;
let wanted_pointers = read_message::<_, Vec<String>>(reader)?; let wanted_pointers = read_message::<_, BTreeSet<String>>(reader)?;
let mut chunks_to_offer: BTreeSet<ChunkId> = BTreeSet::new(); let mut chunks_to_offer: BTreeSet<ChunkId> = BTreeSet::new();
@ -180,3 +180,73 @@ pub fn offering_side<R: Read, W: Write>(
Ok(()) Ok(())
} }
pub fn accepting_side<R: Read, W: Write>(
pile: &Pile<Box<dyn RawPile>>,
bypass_pile: Box<dyn RawPile>,
reader: &mut R,
writer: &mut W,
mut progress: Box<dyn ProgressTracker>,
) -> anyhow::Result<()> {
// First 'negotiate' (for now: assert) a pile bypass.
// This lets us avoid decompressing things before recompressing them at the other end,
// assuming both ends use the same dictionary.
negotiate_bypassed_pile(pile, &bypass_pile, reader, writer)?;
let offered_pointers: BTreeMap<String, PointerData> = read_message(reader)?;
let mut wanted_pointers: BTreeSet<String> = BTreeSet::new();
for (pointer_name, pointer_data) in &offered_pointers {
if pile.read_pointer(pointer_name)?.is_none() {
wanted_pointers.insert(pointer_name.clone());
if let Some(parent) = &pointer_data.parent_pointer {
if pile.read_pointer(parent)?.is_none() && !offered_pointers.contains_key(parent) {
bail!("Offered pointer {:?} requires parent {:?} which we don't have and isn't offered.", pointer_name, parent);
}
}
}
}
write_message(writer, &wanted_pointers)?;
let offered_chunks: BTreeSet<ChunkId> = read_message(reader)?;
let mut chunks_to_skip: BTreeSet<ChunkId> = BTreeSet::new();
for chunk_id in &offered_chunks {
if pile.chunk_exists(chunk_id)? {
chunks_to_skip.insert(*chunk_id);
}
}
write_message(writer, &chunks_to_skip)?;
let num_chunks_to_recv = offered_chunks.len() - chunks_to_skip.len();
let mut chunks_to_recv: BTreeSet<ChunkId> = offered_chunks
.difference(&chunks_to_skip)
.cloned()
.collect();
drop(offered_chunks);
drop(chunks_to_skip);
progress.set_max_size(num_chunks_to_recv as u64);
progress.set_current(0);
while let Some((chunk_id, chunk_data)) = read_message::<_, Option<(ChunkId, Vec<u8>)>>(reader)?
{
ensure!(
chunks_to_recv.remove(&chunk_id),
"Received unexpected chunk"
);
bypass_pile.write(Keyspace::Chunk, &chunk_id, &chunk_data)?;
progress.inc_progress(1);
}
ensure!(chunks_to_recv.is_empty(), "Unreceived chunks.");
for (pointer_name, pointer_data) in &offered_pointers {
pile.write_pointer(pointer_name, pointer_data)?;
}
Ok(())
}