Add --gradual flag to datman backup commands: allows soft-limiting the size of a backup round
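For example, to cap one backup round at roughly 500 MiB of new file data per source (hypothetical invocation; the subcommand name assumes clap's usual kebab-casing of the BackupOne variant shown below):

    datman backup-one my_source my_pile --gradual 500MiB

Per the doc comment introduced below, the intent is to let a large first backup be built up over several bounded rounds instead of in one shot.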
This commit is contained in:
parent 00fa9d0951
commit c812532541

Cargo.lock (generated): 14 changed lines
@@ -377,6 +377,12 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
 
+[[package]]
+name = "bytesize"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5"
+
 [[package]]
 name = "cap-fs-ext"
 version = "1.0.10"
@@ -746,6 +752,7 @@ dependencies = [
 name = "datman"
 version = "0.7.0-alpha.1"
 dependencies = [
+ "bytesize",
  "chrono",
  "clap",
  "dashmap",
@@ -1589,6 +1596,12 @@ dependencies = [
  "nu-ansi-term",
 ]
 
+[[package]]
+name = "maplit"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3901,6 +3914,7 @@ dependencies = [
  "ignore",
  "indicatif",
  "io-streams",
+ "maplit",
  "memmap2",
  "patricia_tree",
  "serde",
datman/Cargo.toml
@@ -24,6 +24,7 @@ tokio = { version = "1.28.0", features = ["fs", "macros", "rt-multi-thread"] }
 dashmap = "5.4.0"
 chrono = "0.4.24"
 users = "0.11.0"
+bytesize = "1.2.0"
 
 yama = { version = "0.7.0-alpha.1", path = "../yama" }
 yama_pile = { path = "../yama_pile" }
datman/src/backup.rs
@@ -1,6 +1,8 @@
 use crate::descriptor_config::{SourceDescriptor, SourceDescriptorInner, VirtualSourceKind};
 use crate::pointer_names::{get_pointer_name_at, POINTER_NAME_DATETIME_SPLITTER};
+use bytesize::ByteSize;
 use chrono::{DateTime, Utc};
+use clap::Args;
 use dashmap::DashSet;
 use eyre::{bail, ensure, eyre, Context, ContextCompat};
 use indicatif::ProgressStyle;
@@ -17,7 +19,7 @@ use tracing::{debug, info, info_span, Instrument, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use users::{get_current_gid, get_current_uid};
 use yama::pile_with_cache::PileWithCache;
-use yama::scan::create_uidgid_lookup_tables;
+use yama::scan::{create_uidgid_lookup_tables, limit_scan_entry_map_to_size};
 use yama::storing::{
     assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState,
 };
@@ -32,9 +34,19 @@ use yama_pile::tree::{
 };
 use yama_wormfile::boxed::BoxedWormFileProvider;
 
+#[derive(Args, Clone, Debug)]
+pub struct BackupOptions {
+    /// Number of bytes to back up in one go. Intended for gradually getting a backup started.
+    /// Supports suffixes like MiB and MB.
+    /// Applies per-source. Does not apply to virtual sources.
+    #[clap(long)]
+    gradual: Option<ByteSize>,
+}
+
 pub async fn backup(
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
     sources_to_backup: BTreeMap<String, SourceDescriptor>,
+    options: &BackupOptions,
 ) -> eyre::Result<()> {
     // Locate suitable parent pointers
     let parents_to_use = find_suitable_parent_pointers(&pwc, &sources_to_backup)
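The gradual field is an Option<ByteSize> from the bytesize crate, which is what gives the flag its human-readable size parsing (presumably picked up by clap via ByteSize's FromStr impl). A minimal standalone sketch of that parsing behaviour, assuming bytesize 1.2:

    use bytesize::ByteSize;

    fn main() {
        // MiB is 1024-based, MB is 1000-based; both suffix styles parse.
        let binary: ByteSize = "500MiB".parse().unwrap();
        let decimal: ByteSize = "500MB".parse().unwrap();
        assert_eq!(binary.as_u64(), 500 * 1024 * 1024); // 524_288_000
        assert_eq!(decimal.as_u64(), 500 * 1000 * 1000); // 500_000_000
    }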
@@ -55,9 +67,10 @@ pub async fn backup(
         let pwc = pwc.clone();
 
         let bds_span = info_span!("storing");
+        let options = options.clone();
         tokio::spawn(
             async move {
-                backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
+                backup_dir_sources(dir_sources, pwc, new_unflushed_chunks, &options)
                     .await
                     .context("whilst backing up dir sources")
             }
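(The added `let options = options.clone();` is needed because tokio::spawn requires a 'static future: the spawned per-source task cannot borrow options from the surrounding function, so each task captures its own clone, which is also why BackupOptions derives Clone.)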
@@ -261,6 +274,7 @@ async fn backup_dir_sources(
     dir_sources: Vec<DirSourcePrep>,
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
     new_unflushed_chunks: Arc<DashSet<ChunkId>>,
+    options: &BackupOptions,
 ) -> eyre::Result<BackupDirSourcesReturn> {
     let mut chunk_file_maps = Vec::new();
     let mut pruned_scan_entry_maps = Vec::new();
@@ -281,7 +295,14 @@
             )
         };
         chunk_file_maps.push(chunk_file_map);
-        pruned_scan_entry_maps.push(pruned_scan_entry_map);
+
+        pruned_scan_entry_maps.push(match options.gradual {
+            Some(gradual_size_limit) => Cow::Owned(limit_scan_entry_map_to_size(
+                pruned_scan_entry_map.into_owned(),
+                gradual_size_limit.as_u64(),
+            )),
+            None => pruned_scan_entry_map,
+        });
     }
 
     let store_span = Span::current();
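Note the Cow handling above: pruned_scan_entry_map is only converted into an owned map (into_owned, then Cow::Owned) when --gradual is actually set; with no limit, the map, borrowed or owned, passes through untouched. A standalone sketch of the same pattern with simplified types (not the real code):

    use std::borrow::Cow;

    // Truncate only when a limit is given; otherwise hand the Cow back
    // unchanged, so the common case never copies borrowed data.
    fn maybe_limit(data: Cow<'_, [u8]>, limit: Option<usize>) -> Cow<'_, [u8]> {
        match limit {
            Some(n) => {
                let mut owned = data.into_owned(); // copies only if borrowed
                owned.truncate(n);
                Cow::Owned(owned)
            }
            None => data,
        }
    }

    fn main() {
        let bytes = vec![1, 2, 3, 4];
        assert_eq!(maybe_limit(Cow::Borrowed(&bytes[..]), Some(2)).len(), 2);
        assert_eq!(maybe_limit(Cow::Borrowed(&bytes[..]), None).len(), 4);
    }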
@@ -16,7 +16,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 
 use clap::{Parser, Subcommand};
-use datman::backup::backup;
+use datman::backup::{backup, BackupOptions};
 use datman::descriptor_config::{load_descriptor, SourceDescriptor};
 use datman::extract::{
     extract, load_pointers_for_extraction, merge_roots_for_batch_extract, select_to_extract,
@@ -122,10 +122,14 @@ pub enum DatmanCommand {
     BackupOne {
         source_name: String,
         pile_name: String,
+        #[clap(flatten)]
+        options: BackupOptions,
     },
 
     BackupAll {
         pile_name: String,
+        #[clap(flatten)]
+        options: BackupOptions,
     },
 
     ExtractOne {
@@ -178,6 +182,7 @@ pub async fn main() -> eyre::Result<()> {
         DatmanCommand::BackupOne {
             source_name,
             pile_name,
+            options,
         } => {
             let pile_connector_path = descriptor
                 .piles
@@ -203,9 +208,9 @@
             let mut sources_to_backup = BTreeMap::new();
             sources_to_backup.insert(source_name.clone(), source.clone());
 
-            backup(pwc, sources_to_backup).await?;
+            backup(pwc, sources_to_backup, &options).await?;
         }
-        DatmanCommand::BackupAll { pile_name } => {
+        DatmanCommand::BackupAll { pile_name, options } => {
             let pile_connector_path = descriptor
                 .piles
                 .get(&pile_name)
@@ -235,7 +240,7 @@
                 sources_to_backup.keys().collect::<Vec<_>>()
             );
 
-            backup(pwc, sources_to_backup).await?;
+            backup(pwc, sources_to_backup, &options).await?;
         }
         DatmanCommand::ExtractOne {
             pile_name,
yama/Cargo.toml
@@ -51,3 +51,7 @@ io-streams = "0.14.3"
 
 
 dust_style_filetree_display = "0.8.5"
+
+
+[dev-dependencies]
+maplit = "1.0.2"
yama/src/scan.rs: 143 changed lines
@@ -1,6 +1,7 @@
 use eyre::{bail, eyre, Context, ContextCompat};
 use ignore::WalkBuilder;
 use patricia_tree::PatriciaMap;
+use std::cmp::max;
 use std::collections::{BTreeMap, BTreeSet};
 use std::fs::{read_link, Metadata};
 use std::io::ErrorKind;
@@ -272,3 +273,145 @@ pub fn integrate_uid_or_gid_map(new: &mut BTreeMap<u16, String>, old: &BTreeMap<
         new.entry(*old_uid).or_insert_with(|| old_user.clone());
     }
 }
+
+/// Given a scan entry map, creates another one whose size is limited to not containing more than
+/// the given number of file bytes to be stored.
+/// There is one exception: if an individual file exceeds the max size, the returned scan entry map
+/// may contain just that one file.
+///
+/// Useful for gradually starting backups without having to do the first in one shot.
+pub fn limit_scan_entry_map_to_size(
+    scan_entry_map: PatriciaMap<ScanEntry>,
+    soft_max_size: u64,
+) -> PatriciaMap<ScanEntry> {
+    let mut accum_size: u64 = 0;
+    let mut have_file = false;
+    let mut result = PatriciaMap::new();
+    let mut unincluded_directories = PatriciaMap::new();
+
+    for (path_bytes, entry) in scan_entry_map.into_iter() {
+        if accum_size >= soft_max_size {
+            // we're already full!
+            break;
+        }
+        let size_of_entry = match entry {
+            ScanEntry::NormalFile { size, .. } => {
+                // even zero-byte files are not for free, so don't let them be.
+                max(size, 4096)
+            }
+            ScanEntry::Directory { .. } => {
+                // slightly made up number, but typical size of an inode?
+                4096
+            }
+            ScanEntry::SymbolicLink { .. } => {
+                // slightly made up number, but typical size of an inode?
+                4096
+            }
+        };
+
+        let size_limit_would_be_respected = accum_size + size_of_entry <= soft_max_size;
+        let this_is_the_only_file = !have_file && matches!(&entry, ScanEntry::NormalFile { .. });
+        if size_limit_would_be_respected || this_is_the_only_file {
+            have_file |= matches!(&entry, ScanEntry::NormalFile { .. });
+            result.insert(&path_bytes, entry);
+            accum_size += size_of_entry;
+
+            // Pull out parent directories so our subset always contains the parents for their children.
+            let mut path_fragment = &path_bytes[..];
+            while let Some((index, _)) = path_fragment
+                .iter()
+                .enumerate()
+                .rev()
+                .find(|(_idx, char_byte)| **char_byte == b'/')
+            {
+                path_fragment = &path_bytes[0..index];
+
+                if let Some(directory) = unincluded_directories.remove(path_fragment) {
+                    result.insert(path_fragment, directory);
+                    accum_size += 4096;
+                }
+            }
+        } else if matches!(&entry, &ScanEntry::Directory { .. }) {
+            // put the directory to one side in case we need it...
+            unincluded_directories.insert(path_bytes, entry);
+        }
+    }
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::scan::limit_scan_entry_map_to_size;
+    use maplit::btreeset;
+    use patricia_tree::PatriciaMap;
+    use std::collections::BTreeSet;
+    use yama_pile::tree::unpopulated::ScanEntry;
+    use yama_pile::tree::{FilesystemOwnership, FilesystemPermissions};
+
+    #[test]
+    fn test_limit_scan_entry_map_to_size() {
+        let mut orig = PatriciaMap::new();
+
+        orig.insert(
+            "somedir".as_bytes(),
+            ScanEntry::Directory {
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+            },
+        );
+        orig.insert(
+            "somedir/a_small_file".as_bytes(),
+            ScanEntry::NormalFile {
+                mtime: 0,
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+                size: 4,
+            },
+        );
+        orig.insert(
+            "somedir/somefile".as_bytes(),
+            ScanEntry::NormalFile {
+                mtime: 0,
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+                size: 8192,
+            },
+        );
+
+        // 16k = 4k (dir) + 8k (somefile) + 4k (small file; minimum)
+        assert_eq!(
+            limit_scan_entry_map_to_size(orig.clone(), 16384)
+                .keys()
+                .collect::<BTreeSet<_>>(),
+            btreeset! {
+                b"somedir".to_vec(),
+                b"somedir/a_small_file".to_vec(),
+                b"somedir/somefile".to_vec(),
+            }
+        );
+
+        // now we don't have room for the big file.
+        assert_eq!(
+            limit_scan_entry_map_to_size(orig.clone(), 16383)
+                .keys()
+                .collect::<BTreeSet<_>>(),
+            btreeset! {
+                b"somedir".to_vec(),
+                b"somedir/a_small_file".to_vec(),
+            }
+        );
+
+        // because we must always include at least one file so we make forward progress, it doesn't
+        // matter that this violates the size limit.
+        assert_eq!(
+            limit_scan_entry_map_to_size(orig.clone(), 1)
+                .keys()
+                .collect::<BTreeSet<_>>(),
+            btreeset! {
+                b"somedir".to_vec(),
+                b"somedir/a_small_file".to_vec(),
+            }
+        );
+    }
+}
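One detail limit_scan_entry_map_to_size leans on: PatriciaMap iterates its keys in lexicographic byte order, so a directory is always visited before the entries inside it (a parent path is a strict prefix of its children). That is why a directory that does not fit can be parked in unincluded_directories and still be re-inserted later, when a forced-in file needs it as an ancestor. A quick standalone check of that ordering, assuming the patricia_tree crate as imported above:

    use patricia_tree::PatriciaMap;

    fn main() {
        let mut map = PatriciaMap::new();
        // Insert children before the parent; iteration is still parent-first.
        map.insert("somedir/somefile", 1u8);
        map.insert("somedir/a_small_file", 2);
        map.insert("somedir", 3);

        let keys: Vec<Vec<u8>> = map.keys().collect();
        assert_eq!(
            keys,
            vec![
                b"somedir".to_vec(),
                b"somedir/a_small_file".to_vec(),
                b"somedir/somefile".to_vec(),
            ]
        );
    }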