Sort chunk IDs by hint to make pull more efficient
This commit is contained in:
parent
ee9ca73224
commit
4aa1948350
|
@ -1,9 +1,11 @@
|
||||||
// Push and Pull support for Datman
|
// Push and Pull support for Datman
|
||||||
|
|
||||||
use anyhow::{bail, ensure, Context};
|
use anyhow::{bail, ensure, Context};
|
||||||
|
use log::info;
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use yama::chunking::RecursiveUnchunker;
|
use yama::chunking::RecursiveUnchunker;
|
||||||
use yama::commands::retrieve_tree_node;
|
use yama::commands::retrieve_tree_node;
|
||||||
use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode};
|
use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode};
|
||||||
|
@ -183,9 +185,25 @@ pub fn offering_side<R: Read, W: Write>(
|
||||||
drop(chunks_to_offer);
|
drop(chunks_to_offer);
|
||||||
drop(chunks_to_skip);
|
drop(chunks_to_skip);
|
||||||
|
|
||||||
progress.set_max_size(chunks_to_send.len() as u64);
|
let start_sort_by_hints = Instant::now();
|
||||||
|
let chunks_to_send_with_hints: BTreeSet<(u64, ChunkId)> = chunks_to_send
|
||||||
|
.into_iter()
|
||||||
|
.map(|chunk_id| {
|
||||||
|
pile.raw_pile
|
||||||
|
.chunk_id_transfer_ordering_hint(&chunk_id)
|
||||||
|
.map(|hint| (hint, chunk_id))
|
||||||
|
})
|
||||||
|
.collect::<anyhow::Result<_>>()?;
|
||||||
|
let time_to_sort_by_hints = Instant::now() - start_sort_by_hints;
|
||||||
|
info!(
|
||||||
|
"{} s to sort {} chunks by their hints",
|
||||||
|
time_to_sort_by_hints.as_secs_f32(),
|
||||||
|
chunks_to_send_with_hints.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
progress.set_max_size(chunks_to_send_with_hints.len() as u64);
|
||||||
progress.set_current(0);
|
progress.set_current(0);
|
||||||
for chunk_id in chunks_to_send {
|
for (_hint, chunk_id) in chunks_to_send_with_hints {
|
||||||
let chunk_data = bypass_pile
|
let chunk_data = bypass_pile
|
||||||
.read(Keyspace::Chunk, &chunk_id)?
|
.read(Keyspace::Chunk, &chunk_id)?
|
||||||
.context("Chunk vanished")?;
|
.context("Chunk vanished")?;
|
||||||
|
|
|
@ -137,6 +137,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
|
||||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||||
self.underlying.describe_pipeline()
|
self.underlying.describe_pipeline()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time.
|
/// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time.
|
||||||
|
|
|
@ -167,6 +167,10 @@ pub trait RawPile: Send + Sync + Debug + 'static {
|
||||||
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>>;
|
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>>;
|
||||||
|
|
||||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>>;
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>>;
|
||||||
|
|
||||||
|
/// Return a u64 order token that indicates the optimum order to read this chunk in
|
||||||
|
/// compared to other chunks.
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RawPile for Box<dyn RawPile> {
|
impl RawPile for Box<dyn RawPile> {
|
||||||
|
@ -210,6 +214,10 @@ impl RawPile for Box<dyn RawPile> {
|
||||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||||
self.as_ref().describe_pipeline()
|
self.as_ref().describe_pipeline()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.as_ref().chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<RP: RawPile> RawPile for Arc<RP> {
|
impl<RP: RawPile> RawPile for Arc<RP> {
|
||||||
|
@ -253,6 +261,10 @@ impl<RP: RawPile> RawPile for Arc<RP> {
|
||||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||||
self.as_ref().describe_pipeline()
|
self.as_ref().describe_pipeline()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.as_ref().chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
|
@ -130,4 +130,8 @@ impl<R: Clone + RawPile> RawPile for PileGuard<R> {
|
||||||
// TODO(question) Should we be described in the pipeline?
|
// TODO(question) Should we be described in the pipeline?
|
||||||
self.underlying.describe_pipeline()
|
self.underlying.describe_pipeline()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -348,4 +348,8 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
|
||||||
});
|
});
|
||||||
Ok(underlying)
|
Ok(underlying)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,4 +127,8 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
|
||||||
underlying.push(PipelineDescription::Encryption);
|
underlying.push(PipelineDescription::Encryption);
|
||||||
Ok(underlying)
|
Ok(underlying)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,4 +149,8 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
|
||||||
underlying.push(PipelineDescription::Integrity);
|
underlying.push(PipelineDescription::Integrity);
|
||||||
Ok(underlying)
|
Ok(underlying)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -754,6 +754,16 @@ impl RawPile for SqliteBloblogPile {
|
||||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||||
Ok(vec![PipelineDescription::Store])
|
Ok(vec![PipelineDescription::Store])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
let chunk_pointer = self
|
||||||
|
.get_chunk_pointer(chunk_id)?
|
||||||
|
.context("Can't get chunk ID transfer ordering hint for chunk without pointer.")?;
|
||||||
|
|
||||||
|
// Scheme: 24-bit bloblog ID
|
||||||
|
// followed by 40-bit offset
|
||||||
|
Ok(((chunk_pointer.bloblog as u64) << 40) | (chunk_pointer.offset & 0xFF_FF_FF_FF_FF))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct KeyIterator {
|
struct KeyIterator {
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
|
|
||||||
use anyhow::anyhow;
|
use anyhow::{anyhow, bail};
|
||||||
use crossbeam_channel::{Receiver, Sender};
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
|
||||||
|
@ -438,6 +438,10 @@ impl RawPile for Requester {
|
||||||
other => Err(anyhow!("Received {:?} for Describe", other)),
|
other => Err(anyhow!("Received {:?} for Describe", other)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn chunk_id_transfer_ordering_hint(&self, _chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||||
|
bail!("You probably shouldn't be using chunk ID transfer ordering hints with a remote.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ListKeyIterator {
|
pub struct ListKeyIterator {
|
||||||
|
|
Loading…
Reference in New Issue