Fix bug when using gradual scans

parent 2e08b2df47
commit 22beea0c50
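With gradual scans, each run only chunks a size-limited subset of the scanned entries (via limit_scan_entry_map_to_size), while unchanged entries are reused from the parent pointer via prepopulate_unmodified. The diff below keeps those two sets separate (new_scan_entry_map and reused_scan_entry_map), assembles the pointer tree from both of them, and makes prepopulate_unmodified pull parent directories across so that a partial subset always contains the parents of its children. As a minimal, self-contained sketch of that parent-directory rule — using hypothetical stand-in types (BTreeMap in place of PatriciaMap, a toy Entry in place of ScanEntry), not the crate's real API:

    use std::collections::BTreeMap;

    // Hypothetical stand-ins: BTreeMap instead of PatriciaMap, a toy Entry instead of ScanEntry.
    #[derive(Debug)]
    enum Entry {
        Dir,
        File,
    }

    /// Move `path` and every ancestor directory of `path` out of `remaining` into `subset`,
    /// so that a partial (gradual) selection always carries the parents of its children.
    fn pull_with_ancestors(
        path: &str,
        remaining: &mut BTreeMap<String, Entry>,
        subset: &mut BTreeMap<String, Entry>,
    ) {
        if let Some(entry) = remaining.remove(path) {
            subset.insert(path.to_owned(), entry);
        }
        // Trim the path back at each '/' to visit every ancestor directory.
        let mut fragment = path;
        while let Some(index) = fragment.rfind('/') {
            fragment = &fragment[..index];
            if let Some(dir) = remaining.remove(fragment) {
                subset.insert(fragment.to_owned(), dir);
            }
        }
    }

    fn main() {
        let mut remaining = BTreeMap::new();
        remaining.insert("a".to_owned(), Entry::Dir);
        remaining.insert("a/b".to_owned(), Entry::Dir);
        remaining.insert("a/b/file.txt".to_owned(), Entry::File);

        let mut subset = BTreeMap::new();
        pull_with_ancestors("a/b/file.txt", &mut remaining, &mut subset);

        // The file and both of its ancestor directories end up in the subset together.
        assert_eq!(subset.len(), 3);
        assert!(remaining.is_empty());
    }

In the commit itself the same idea is applied byte-wise over the PatriciaMap keys: the path is trimmed back at each b'/' and the directory entry is moved into prepopulated_scan_entry_map.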
@@ -7,7 +7,6 @@ use dashmap::DashSet;
 use eyre::{bail, ensure, eyre, Context, ContextCompat};
 use indicatif::ProgressStyle;
 use patricia_tree::PatriciaMap;
-use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
 use std::io::Write;
 use std::path::PathBuf;
@@ -55,7 +54,7 @@ pub async fn backup(
     let now = Utc::now();
 
     // (dirtrees) Scan
-    let dir_sources = scan_dir_sources(&sources_to_backup, parents_to_use, now)
+    let dir_sources = scan_dir_sources(&sources_to_backup, parents_to_use, now, options)
         .await
         .context("failed to scan directory sources")?;
 
@@ -67,10 +66,9 @@ pub async fn backup(
         let pwc = pwc.clone();
 
         let bds_span = info_span!("storing");
-        let options = options.clone();
         tokio::spawn(
             async move {
-                backup_dir_sources(dir_sources, pwc, new_unflushed_chunks, &options)
+                backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
                     .await
                     .context("whilst backing up dir sources")
             }
@@ -125,9 +123,12 @@ pub async fn backup(
 
     for (dir_source_prep, chunk_file_map) in dir_sources_and_chunkmaps.dir_source_returns {
         // Assemble and write a pointer
-        let mut tree =
-            assemble_tree_from_scan_entries(dir_source_prep.scan_entry_map, chunk_file_map)
-                .context("failed to assemble tree")?;
+        let mut tree = assemble_tree_from_scan_entries(
+            dir_source_prep.new_scan_entry_map,
+            dir_source_prep.reused_scan_entry_map,
+            chunk_file_map,
+        )
+        .context("failed to assemble tree")?;
         let (uids, gids) =
             create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?;
 
@@ -217,17 +218,22 @@ async fn find_suitable_parent_pointers(
 }
 
 struct DirSourcePrep {
-    scan_entry_map: PatriciaMap<ScanEntry>,
+    /// New entries only.
+    new_scan_entry_map: PatriciaMap<ScanEntry>,
+    /// Files: Reused entries only. Directories: can be partially changed but there's no chunking to be done.
+    reused_scan_entry_map: PatriciaMap<ScanEntry>,
     parent_name: Option<String>,
     parent: Option<Pointer>,
     path: PathBuf,
     new_pointer_name: String,
+    chunk_file_map: PatriciaMap<Option<(RecursiveChunkRef, u64)>>,
 }
 
 async fn scan_dir_sources(
     sources_to_backup: &BTreeMap<String, SourceDescriptor>,
     mut parents: BTreeMap<String, (String, Pointer)>,
     now: DateTime<Utc>,
+    options: &BackupOptions,
 ) -> eyre::Result<Vec<DirSourcePrep>> {
     let mut joinset = JoinSet::new();
 
@@ -244,10 +250,40 @@ async fn scan_dir_sources(
         let ignore = ignore.to_owned();
         let (parent_name, parent) = parents.remove(source_name).unzip();
         let new_pointer_name = get_pointer_name_at(&source_name, now);
+        let options = options.clone();
         joinset.spawn_blocking(move || -> eyre::Result<DirSourcePrep> {
             let scan_entry_map = scan::scan(&path, &ignore).context("Failed to scan")?;
+
+            // TODO This whole section is messy.
+            // Maybe we should consider combining prepopulate_unmodified and limit_scan_entry_map_to_size
+            // as the latter might benefit from being able to see what is in the parent pointer...
+
+            let (chunk_file_map, pruned_scan_entry_map, prepopulated_scan_entry_map) =
+                if let Some(ref parent_node) = parent {
+                    let (cfm, prepopulated, pruned) =
+                        scan::prepopulate_unmodified(&parent_node.root.node, &scan_entry_map);
+
+                    (cfm, pruned, prepopulated)
+                } else {
+                    (
+                        PatriciaMap::<Option<(RecursiveChunkRef, u64)>>::new(),
+                        scan_entry_map,
+                        PatriciaMap::new(),
+                    )
+                };
+
+            let pruned_scan_entry_map = match options.gradual {
+                Some(gradual_size_limit) => limit_scan_entry_map_to_size(
+                    pruned_scan_entry_map,
+                    gradual_size_limit.as_u64(),
+                ),
+                None => pruned_scan_entry_map,
+            };
+
             Ok(DirSourcePrep {
-                scan_entry_map,
+                chunk_file_map,
+                new_scan_entry_map: pruned_scan_entry_map,
+                reused_scan_entry_map: prepopulated_scan_entry_map,
                 parent_name,
                 parent,
                 path,
@@ -271,38 +307,19 @@ struct BackupDirSourcesReturn {
 }
 
 async fn backup_dir_sources(
-    dir_sources: Vec<DirSourcePrep>,
+    mut dir_sources: Vec<DirSourcePrep>,
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
     new_unflushed_chunks: Arc<DashSet<ChunkId>>,
-    options: &BackupOptions,
 ) -> eyre::Result<BackupDirSourcesReturn> {
     let mut chunk_file_maps = Vec::new();
     let mut pruned_scan_entry_maps = Vec::new();
 
+    // First collect all that stuff together...
+    for dir_source in &mut dir_sources {
+        chunk_file_maps.push(std::mem::take(&mut dir_source.chunk_file_map));
+    }
     for dir_source in &dir_sources {
-        let (chunk_file_map, pruned_scan_entry_map) = if let Some(ref parent_node) =
-            dir_source.parent
-        {
-            let (cfm, pruned) =
-                scan::prepopulate_unmodified(&parent_node.root.node, &dir_source.scan_entry_map);
-
-            (cfm, Cow::Owned(pruned))
-        } else {
-            (
-                PatriciaMap::<Option<(RecursiveChunkRef, u64)>>::new(),
-                Cow::Borrowed(&dir_source.scan_entry_map),
-            )
-        };
-        chunk_file_maps.push(chunk_file_map);
-
-        pruned_scan_entry_maps.push(match options.gradual {
-            Some(gradual_size_limit) => Cow::Owned(limit_scan_entry_map_to_size(
-                pruned_scan_entry_map.into_owned(),
-                gradual_size_limit.as_u64(),
-            )),
-            None => pruned_scan_entry_map,
-        });
+        pruned_scan_entry_maps.push(&dir_source.new_scan_entry_map);
     }
 
     let store_span = Span::current();
 
@@ -19,7 +19,6 @@ use clap::{Parser, Subcommand};
 use eyre::{bail, eyre, Context, ContextCompat};
 use indicatif::ProgressStyle;
 use patricia_tree::PatriciaMap;
-use std::borrow::Cow;
 use std::collections::BTreeSet;
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
@@ -433,16 +432,17 @@ async fn main() -> eyre::Result<()> {
 
     let pwc = Arc::new(pwc);
 
-    let (mut chunk_file_map, pruned_scan_entry_map) =
+    let (mut chunk_file_map, pruned_scan_entry_map, prepopulated_scan_entry_map) =
         if let Some(ref parent_node) = parent_pointer {
-            let (cfm, pruned) =
+            let (cfm, pruned, prepopulated) =
                 scan::prepopulate_unmodified(&parent_node.root.node, &scan_entry_map);
 
-            (cfm, Cow::Owned(pruned))
+            (cfm, pruned, prepopulated)
         } else {
             (
                 PatriciaMap::<Option<(RecursiveChunkRef, u64)>>::new(),
-                Cow::Borrowed(&scan_entry_map),
+                scan_entry_map,
+                PatriciaMap::new(),
             )
         };
 
@@ -467,10 +467,11 @@ async fn main() -> eyre::Result<()> {
         StoragePipeline::launch_new(4, pwc.clone(), new_unflushed_chunks).await?;
 
     let source2 = source.clone();
+    let pruned_scan_entry_map2 = &pruned_scan_entry_map;
     let (submitter_task, receiver_task) = tokio::join!(
         async move {
             let pipeline_job_tx = pipeline_job_tx;
-            for (name_bytes, scan_entry) in pruned_scan_entry_map.iter() {
+            for (name_bytes, scan_entry) in pruned_scan_entry_map2.iter() {
                 if let ScanEntry::NormalFile { .. } = scan_entry {
                     let name = std::str::from_utf8(name_bytes.as_slice())
                         .context("name is not str")?;
@@ -517,8 +518,12 @@ async fn main() -> eyre::Result<()> {
     info!("All indices stored, writing pointer...");
 
     // Assemble and write a pointer
-    let mut tree = assemble_tree_from_scan_entries(scan_entry_map, chunk_file_map)
-        .context("failed to assemble tree")?;
+    let mut tree = assemble_tree_from_scan_entries(
+        pruned_scan_entry_map,
+        prepopulated_scan_entry_map,
+        chunk_file_map,
+    )
+    .context("failed to assemble tree")?;
     let (uids, gids) =
         create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?;
 
@@ -220,19 +220,23 @@ fn scan_one_no_recurse(path: &Path, metadata: Metadata) -> eyre::Result<Option<S
 
 /// Given the parent pointer's root TreeNode and a scan entry map of the current pointer,
 /// return a chunkings map prepopulated with the reusable entries.
-/// Also returns a pruned copy of the scan entry map.
+/// Also returns a prepopulated and pruned copy of the scan entry map.
 pub fn prepopulate_unmodified(
     parent_tree: &TreeNode,
     scan_entry_map: &PatriciaMap<ScanEntry>,
 ) -> (
     PatriciaMap<Option<(RecursiveChunkRef, u64)>>,
+    PatriciaMap<ScanEntry>,
     PatriciaMap<ScanEntry>,
 ) {
     let mut reusable_chunkings = PatriciaMap::new();
+    let mut prepopulated_scan_entry_map = PatriciaMap::new();
     let mut pruned_scan_entry_map = scan_entry_map.clone();
     parent_tree
         .visit(
             &mut |tree_node, path| {
                 // TODO We should consider prepopulating symlinks and empty dirs too, if they're
                 // included in the parent.
                 if let TreeNode::NormalFile {
                     mtime: prev_mtime,
                     ownership: prev_ownership,
@@ -255,7 +259,26 @@ pub fn prepopulate_unmodified(
                 {
                     // Nothing seems to have changed about this file, let's just reuse the `content` from last time.
                     reusable_chunkings.insert(path, Some((*prev_content, *size)));
-                    pruned_scan_entry_map.remove(path);
+                    prepopulated_scan_entry_map.insert(
+                        path,
+                        pruned_scan_entry_map.remove(path).expect("checked removal"),
+                    );
+
+                    // Pull out parent directories so our subset always contains the parents for their children.
+                    let mut path_fragment = path.as_bytes();
+                    while let Some((index, _)) = path_fragment
+                        .iter()
+                        .enumerate()
+                        .rev()
+                        .find(|(_idx, char_byte)| **char_byte == b'/')
+                    {
+                        path_fragment = &path_fragment[0..index];
+
+                        if let Some(directory) = pruned_scan_entry_map.remove(path_fragment)
+                        {
+                            prepopulated_scan_entry_map.insert(path_fragment, directory);
+                        }
+                    }
                 }
             }
         }
@@ -265,7 +288,11 @@ pub fn prepopulate_unmodified(
             String::new(),
         )
         .expect("no reason to fail");
-    (reusable_chunkings, pruned_scan_entry_map)
+    (
+        reusable_chunkings,
+        prepopulated_scan_entry_map,
+        pruned_scan_entry_map,
+    )
 }
 
 pub fn integrate_uid_or_gid_map(new: &mut BTreeMap<u16, String>, old: &BTreeMap<u16, String>) {
@@ -287,6 +314,7 @@ pub fn limit_scan_entry_map_to_size(
     let mut accum_size: u64 = 0;
     let mut have_file = false;
     let mut result = PatriciaMap::new();
+    // let mut removeds = PatriciaSet::new();
    let mut unincluded_directories = PatriciaMap::new();
 
     for (path_bytes, entry) in scan_entry_map.into_iter() {
@@ -334,9 +362,16 @@ pub fn limit_scan_entry_map_to_size(
         } else if matches!(&entry, &ScanEntry::Directory { .. }) {
             // put the directory to one side in case we need it...
             unincluded_directories.insert(path_bytes, entry);
+        } else {
+            // removeds.insert(path_bytes);
         }
     }
 
+    // for (key, _) in unincluded_directories {
+    //     removeds.insert(key);
+    // }
+
+    // (result, removeds)
     result
 }
 
@@ -391,14 +391,15 @@ pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) {
 }
 
 pub fn assemble_tree_from_scan_entries(
-    scan: PatriciaMap<ScanEntry>,
+    new_scan: PatriciaMap<ScanEntry>,
+    reused_scan: PatriciaMap<ScanEntry>,
     mut chunkings: PatriciaMap<Option<(RecursiveChunkRef, u64)>>,
 ) -> eyre::Result<TreeNode> {
     let mut dirs: BTreeMap<String, BTreeMap<String, TreeNode>> = BTreeMap::new();
     // special-case the root ("")
     dirs.insert(String::new(), BTreeMap::new());
 
-    for (key, entry) in scan.into_iter() {
+    for (key, entry) in reused_scan.into_iter().chain(new_scan.into_iter()) {
         let key_string = String::from_utf8(key).context("bad UTF-8 in PMap")?;
         let (parent_dir_name, child_name) =
             key_string.rsplit_once('/').unwrap_or(("", &key_string));