diff --git a/Cargo.lock b/Cargo.lock
index fa720c5..5337350 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -377,6 +377,12 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
 
+[[package]]
+name = "bytesize"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5"
+
 [[package]]
 name = "cap-fs-ext"
 version = "1.0.10"
@@ -746,6 +752,7 @@ dependencies = [
 name = "datman"
 version = "0.7.0-alpha.1"
 dependencies = [
+ "bytesize",
  "chrono",
  "clap",
  "dashmap",
@@ -1589,6 +1596,12 @@ dependencies = [
  "nu-ansi-term",
 ]
 
+[[package]]
+name = "maplit"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3901,6 +3914,7 @@ dependencies = [
  "ignore",
  "indicatif",
  "io-streams",
+ "maplit",
  "memmap2",
  "patricia_tree",
  "serde",
diff --git a/datman/Cargo.toml b/datman/Cargo.toml
index bab8e89..678baef 100644
--- a/datman/Cargo.toml
+++ b/datman/Cargo.toml
@@ -24,6 +24,7 @@ tokio = { version = "1.28.0", features = ["fs", "macros", "rt-multi-thread"] }
 dashmap = "5.4.0"
 chrono = "0.4.24"
 users = "0.11.0"
+bytesize = "1.2.0"
 
 yama = { version = "0.7.0-alpha.1", path = "../yama" }
 yama_pile = { path = "../yama_pile" }
diff --git a/datman/src/backup.rs b/datman/src/backup.rs
index 0a2c492..c9e9516 100644
--- a/datman/src/backup.rs
+++ b/datman/src/backup.rs
@@ -1,6 +1,8 @@
 use crate::descriptor_config::{SourceDescriptor, SourceDescriptorInner, VirtualSourceKind};
 use crate::pointer_names::{get_pointer_name_at, POINTER_NAME_DATETIME_SPLITTER};
+use bytesize::ByteSize;
 use chrono::{DateTime, Utc};
+use clap::Args;
 use dashmap::DashSet;
 use eyre::{bail, ensure, eyre, Context, ContextCompat};
 use indicatif::ProgressStyle;
@@ -17,7 +19,7 @@ use tracing::{debug, info, info_span, Instrument, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use users::{get_current_gid, get_current_uid};
 use yama::pile_with_cache::PileWithCache;
-use yama::scan::create_uidgid_lookup_tables;
+use yama::scan::{create_uidgid_lookup_tables, limit_scan_entry_map_to_size};
 use yama::storing::{
     assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState,
 };
@@ -32,9 +34,19 @@ use yama_pile::tree::{
 };
 use yama_wormfile::boxed::BoxedWormFileProvider;
 
+#[derive(Args, Clone, Debug)]
+pub struct BackupOptions {
+    /// Number of bytes to back up in one go. Intended for gradually getting a backup started.
+    /// Supports suffixes like MiB and MB.
+    /// Applies per-source. Does not apply to virtual sources.
+    #[clap(long)]
+    gradual: Option<ByteSize>,
+}
+
 pub async fn backup(
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
     sources_to_backup: BTreeMap<String, SourceDescriptor>,
+    options: &BackupOptions,
 ) -> eyre::Result<()> {
     // Locate suitable parent pointers
     let parents_to_use = find_suitable_parent_pointers(&pwc, &sources_to_backup)
@@ -55,9 +67,10 @@ pub async fn backup(
     let pwc = pwc.clone();
 
     let bds_span = info_span!("storing");
+    let options = options.clone();
     tokio::spawn(
         async move {
-            backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
+            backup_dir_sources(dir_sources, pwc, new_unflushed_chunks, &options)
                 .await
                 .context("whilst backing up dir sources")
         }
@@ -261,6 +274,7 @@ async fn backup_dir_sources(
     dir_sources: Vec<DirSourcePrep>,
     pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
     new_unflushed_chunks: Arc<DashSet<ChunkId>>,
+    options: &BackupOptions,
 ) -> eyre::Result<BackupDirSourcesReturn> {
     let mut chunk_file_maps = Vec::new();
     let mut pruned_scan_entry_maps = Vec::new();
@@ -281,7 +295,14 @@ async fn backup_dir_sources(
             )
         };
         chunk_file_maps.push(chunk_file_map);
-        pruned_scan_entry_maps.push(pruned_scan_entry_map);
+
+        pruned_scan_entry_maps.push(match options.gradual {
+            Some(gradual_size_limit) => Cow::Owned(limit_scan_entry_map_to_size(
+                pruned_scan_entry_map.into_owned(),
+                gradual_size_limit.as_u64(),
+            )),
+            None => pruned_scan_entry_map,
+        });
     }
 
     let store_span = Span::current();
diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs
index 70e59c6..926e00e 100644
--- a/datman/src/bin/datman.rs
+++ b/datman/src/bin/datman.rs
@@ -16,7 +16,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 
 use clap::{Parser, Subcommand};
-use datman::backup::backup;
+use datman::backup::{backup, BackupOptions};
 use datman::descriptor_config::{load_descriptor, SourceDescriptor};
 use datman::extract::{
     extract, load_pointers_for_extraction, merge_roots_for_batch_extract, select_to_extract,
@@ -122,10 +122,14 @@ pub enum DatmanCommand {
     BackupOne {
         source_name: String,
         pile_name: String,
+        #[clap(flatten)]
+        options: BackupOptions,
     },
 
     BackupAll {
         pile_name: String,
+        #[clap(flatten)]
+        options: BackupOptions,
     },
 
     ExtractOne {
@@ -178,6 +182,7 @@ pub async fn main() -> eyre::Result<()> {
         DatmanCommand::BackupOne {
             source_name,
             pile_name,
+            options,
         } => {
             let pile_connector_path = descriptor
                 .piles
@@ -203,9 +208,9 @@ pub async fn main() -> eyre::Result<()> {
 
             let mut sources_to_backup = BTreeMap::new();
             sources_to_backup.insert(source_name.clone(), source.clone());
 
-            backup(pwc, sources_to_backup).await?;
+            backup(pwc, sources_to_backup, &options).await?;
         }
-        DatmanCommand::BackupAll { pile_name } => {
+        DatmanCommand::BackupAll { pile_name, options } => {
             let pile_connector_path = descriptor
                 .piles
@@ -235,7 +240,7 @@ pub async fn main() -> eyre::Result<()> {
                 sources_to_backup.keys().collect::<Vec<_>>()
             );
 
-            backup(pwc, sources_to_backup).await?;
+            backup(pwc, sources_to_backup, &options).await?;
         }
         DatmanCommand::ExtractOne {
             pile_name,
diff --git a/yama/Cargo.toml b/yama/Cargo.toml
index 55d1e61..e88212b 100644
--- a/yama/Cargo.toml
+++ b/yama/Cargo.toml
@@ -51,3 +51,7 @@
 io-streams = "0.14.3"
 
 dust_style_filetree_display = "0.8.5"
+
+
+[dev-dependencies]
+maplit = "1.0.2"
\ No newline at end of file
diff --git a/yama/src/scan.rs b/yama/src/scan.rs
index b08f622..82600d1 100644
--- a/yama/src/scan.rs
+++ b/yama/src/scan.rs
@@ -1,6 +1,7 @@
 use eyre::{bail, eyre, Context, ContextCompat};
 use ignore::WalkBuilder;
 use patricia_tree::PatriciaMap;
+use std::cmp::max;
 use std::collections::{BTreeMap, BTreeSet};
 use std::fs::{read_link, Metadata};
 use std::io::ErrorKind;
@@ -272,3 +273,145 @@ pub fn integrate_uid_or_gid_map(new: &mut BTreeMap, old: &BTreeMap<
         new.entry(*old_uid).or_insert_with(|| old_user.clone());
     }
 }
+
+/// Given a scan entry map, creates another one whose size is limited to not containing more than
+/// the given number of file bytes to be stored.
+/// There is one exception: if an individual file exceeds the max size, the returned scan entry map
+/// may contain just that one file.
+
+/// Useful for gradually starting backups without having to do the first in one shot.
+pub fn limit_scan_entry_map_to_size(
+    scan_entry_map: PatriciaMap<ScanEntry>,
+    soft_max_size: u64,
+) -> PatriciaMap<ScanEntry> {
+    let mut accum_size: u64 = 0;
+    let mut have_file = false;
+    let mut result = PatriciaMap::new();
+    let mut unincluded_directories = PatriciaMap::new();
+
+    for (path_bytes, entry) in scan_entry_map.into_iter() {
+        if accum_size >= soft_max_size {
+            // we're already full!
+            break;
+        }
+        let size_of_entry = match entry {
+            ScanEntry::NormalFile { size, .. } => {
+                // even zero-byte files are not for free, so don't let them be.
+                max(size, 4096)
+            }
+            ScanEntry::Directory { .. } => {
+                // slightly made up number, but typical size of an inode?
+                4096
+            }
+            ScanEntry::SymbolicLink { .. } => {
+                // slightly made up number, but typical size of an inode?
+                4096
+            }
+        };
+
+        let size_limit_would_be_respected = accum_size + size_of_entry <= soft_max_size;
+        let this_is_the_only_file = !have_file && matches!(&entry, ScanEntry::NormalFile { .. });
+        if size_limit_would_be_respected || this_is_the_only_file {
+            have_file |= matches!(&entry, ScanEntry::NormalFile { .. });
+            result.insert(&path_bytes, entry);
+            accum_size += size_of_entry;
+
+            // Pull out parent directories so our subset always contains the parents for their children.
+            let mut path_fragment = &path_bytes[..];
+            while let Some((index, _)) = path_fragment
+                .iter()
+                .enumerate()
+                .rev()
+                .find(|(_idx, char_byte)| **char_byte == b'/')
+            {
+                path_fragment = &path_bytes[0..index];
+
+                if let Some(directory) = unincluded_directories.remove(path_fragment) {
+                    result.insert(path_fragment, directory);
+                    accum_size += 4096;
+                }
+            }
+        } else if matches!(&entry, &ScanEntry::Directory { .. }) {
+            // put the directory to one side in case we need it...
+            unincluded_directories.insert(path_bytes, entry);
+        }
+    }
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::scan::limit_scan_entry_map_to_size;
+    use maplit::btreeset;
+    use patricia_tree::PatriciaMap;
+    use std::collections::BTreeSet;
+    use yama_pile::tree::unpopulated::ScanEntry;
+    use yama_pile::tree::{FilesystemOwnership, FilesystemPermissions};
+
+    #[test]
+    fn test_limit_scan_entry_map_to_size() {
+        let mut orig = PatriciaMap::new();
+
+        orig.insert(
+            "somedir".as_bytes(),
+            ScanEntry::Directory {
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+            },
+        );
+        orig.insert(
+            "somedir/a_small_file".as_bytes(),
+            ScanEntry::NormalFile {
+                mtime: 0,
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+                size: 4,
+            },
+        );
+        orig.insert(
+            "somedir/somefile".as_bytes(),
+            ScanEntry::NormalFile {
+                mtime: 0,
+                ownership: FilesystemOwnership { uid: 0, gid: 0 },
+                permissions: FilesystemPermissions { mode: 0 },
+                size: 8192,
+            },
+        );
+
+        // 16k = 4k (dir) + 8k (somefile) + 4k (small file; minimum)
+        assert_eq!(
+            limit_scan_entry_map_to_size(orig.clone(), 16384)
+                .keys()
+                .collect::<BTreeSet<_>>(),
+            btreeset! {
{ + b"somedir".to_vec(), + b"somedir/a_small_file".to_vec(), + b"somedir/somefile".to_vec(), + } + ); + + // now we don't have room for the big file. + assert_eq!( + limit_scan_entry_map_to_size(orig.clone(), 16383) + .keys() + .collect::>(), + btreeset! { + b"somedir".to_vec(), + b"somedir/a_small_file".to_vec(), + } + ); + + // because we must always include at least one file so we make forward progress, it doesn't + // matter that this violates the size limit. + assert_eq!( + limit_scan_entry_map_to_size(orig.clone(), 1) + .keys() + .collect::>(), + btreeset! { + b"somedir".to_vec(), + b"somedir/a_small_file".to_vec(), + } + ); + } +}