diff --git a/datman/src/commands/pushpull.rs b/datman/src/commands/pushpull.rs index 4477d75..9177b87 100644 --- a/datman/src/commands/pushpull.rs +++ b/datman/src/commands/pushpull.rs @@ -1,9 +1,11 @@ // Push and Pull support for Datman use anyhow::{bail, ensure, Context}; +use log::info; use std::collections::{BTreeMap, BTreeSet}; use std::io::{Read, Write}; use std::sync::Arc; +use std::time::Instant; use yama::chunking::RecursiveUnchunker; use yama::commands::retrieve_tree_node; use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode}; @@ -183,9 +185,25 @@ pub fn offering_side( drop(chunks_to_offer); drop(chunks_to_skip); - progress.set_max_size(chunks_to_send.len() as u64); + let start_sort_by_hints = Instant::now(); + let chunks_to_send_with_hints: BTreeSet<(u64, ChunkId)> = chunks_to_send + .into_iter() + .map(|chunk_id| { + pile.raw_pile + .chunk_id_transfer_ordering_hint(&chunk_id) + .map(|hint| (hint, chunk_id)) + }) + .collect::>()?; + let time_to_sort_by_hints = Instant::now() - start_sort_by_hints; + info!( + "{} s to sort {} chunks by their hints", + time_to_sort_by_hints.as_secs_f32(), + chunks_to_send_with_hints.len() + ); + + progress.set_max_size(chunks_to_send_with_hints.len() as u64); progress.set_current(0); - for chunk_id in chunks_to_send { + for (_hint, chunk_id) in chunks_to_send_with_hints { let chunk_data = bypass_pile .read(Keyspace::Chunk, &chunk_id)? .context("Chunk vanished")?; diff --git a/yama/src/operations/checking.rs b/yama/src/operations/checking.rs index d29bb28..a218ce8 100644 --- a/yama/src/operations/checking.rs +++ b/yama/src/operations/checking.rs @@ -137,6 +137,10 @@ impl RawPile for VacuumRawPile { fn describe_pipeline(&self) -> anyhow::Result> { self.underlying.describe_pipeline() } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.underlying.chunk_id_transfer_ordering_hint(chunk_id) + } } /// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time. diff --git a/yama/src/pile.rs b/yama/src/pile.rs index 7dfeb45..caaa673 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -167,6 +167,10 @@ pub trait RawPile: Send + Sync + Debug + 'static { ) -> anyhow::Result)>>; fn describe_pipeline(&self) -> anyhow::Result>; + + /// Return a u64 order token that indicates the optimum order to read this chunk in + /// compared to other chunks. + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result; } impl RawPile for Box { @@ -210,6 +214,10 @@ impl RawPile for Box { fn describe_pipeline(&self) -> anyhow::Result> { self.as_ref().describe_pipeline() } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.as_ref().chunk_id_transfer_ordering_hint(chunk_id) + } } impl RawPile for Arc { @@ -253,6 +261,10 @@ impl RawPile for Arc { fn describe_pipeline(&self) -> anyhow::Result> { self.as_ref().describe_pipeline() } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.as_ref().chunk_id_transfer_ordering_hint(chunk_id) + } } #[derive(Debug)] diff --git a/yama/src/pile/access_guard.rs b/yama/src/pile/access_guard.rs index 3c22adf..63720af 100644 --- a/yama/src/pile/access_guard.rs +++ b/yama/src/pile/access_guard.rs @@ -130,4 +130,8 @@ impl RawPile for PileGuard { // TODO(question) Should we be described in the pipeline? self.underlying.describe_pipeline() } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.underlying.chunk_id_transfer_ordering_hint(chunk_id) + } } diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index 3962c1b..e79b67b 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -348,4 +348,8 @@ impl RawPile for RawPileCompressor { }); Ok(underlying) } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.underlying.chunk_id_transfer_ordering_hint(chunk_id) + } } diff --git a/yama/src/pile/encryption.rs b/yama/src/pile/encryption.rs index 167833a..5a0e548 100644 --- a/yama/src/pile/encryption.rs +++ b/yama/src/pile/encryption.rs @@ -127,4 +127,8 @@ impl RawPile for RawPileEncryptor { underlying.push(PipelineDescription::Encryption); Ok(underlying) } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.underlying.chunk_id_transfer_ordering_hint(chunk_id) + } } diff --git a/yama/src/pile/integrity.rs b/yama/src/pile/integrity.rs index e4b88b5..4326580 100644 --- a/yama/src/pile/integrity.rs +++ b/yama/src/pile/integrity.rs @@ -149,4 +149,8 @@ impl RawPile for RawPileIntegrityChecker { underlying.push(PipelineDescription::Integrity); Ok(underlying) } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + self.underlying.chunk_id_transfer_ordering_hint(chunk_id) + } } diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 934f793..33e9e8f 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -754,6 +754,16 @@ impl RawPile for SqliteBloblogPile { fn describe_pipeline(&self) -> anyhow::Result> { Ok(vec![PipelineDescription::Store]) } + + fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result { + let chunk_pointer = self + .get_chunk_pointer(chunk_id)? + .context("Can't get chunk ID transfer ordering hint for chunk without pointer.")?; + + // Scheme: 24-bit bloblog ID + // followed by 40-bit offset + Ok(((chunk_pointer.bloblog as u64) << 40) | (chunk_pointer.offset & 0xFF_FF_FF_FF_FF)) + } } struct KeyIterator { diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 14d3e63..8d07985 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::thread::JoinHandle; -use anyhow::anyhow; +use anyhow::{anyhow, bail}; use crossbeam_channel::{Receiver, Sender}; use log::{error, info}; @@ -438,6 +438,10 @@ impl RawPile for Requester { other => Err(anyhow!("Received {:?} for Describe", other)), } } + + fn chunk_id_transfer_ordering_hint(&self, _chunk_id: &ChunkId) -> anyhow::Result { + bail!("You probably shouldn't be using chunk ID transfer ordering hints with a remote."); + } } pub struct ListKeyIterator {