diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs
index 9b3ff2e..01f03ea 100644
--- a/datman/src/bin/datman.rs
+++ b/datman/src/bin/datman.rs
@@ -28,6 +28,7 @@ use bare_metrics_recorder::recording::BareMetricsRecorderCore;
 use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
 use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination};
 use datman::commands::ilabel::interactive_labelling_session;
+use datman::commands::prune::{prune_with_retention_policy, RetentionPolicy};
 use datman::commands::{init_descriptor, pushpull};
 use datman::descriptor::{load_descriptor, SourceDescriptor};
 use datman::get_hostname;
@@ -137,6 +138,13 @@ pub enum DatmanCommand {
         pile_name: String,
     },
 
+    /// Applies a retention policy by removing unnecessary backups.
+    /// Does not reclaim space by itself: use
+    /// `yama check --apply-gc --shallow`
+    /// & `yama compact`
+    /// to do that.
+    Prune { pile_name: String },
+
     #[clap(name = "_pull_responder_offerer")]
     InternalPullResponderOfferer {
         datman_path: PathBuf,
@@ -410,6 +418,24 @@ fn main() -> anyhow::Result<()> {
                 Box::new(pbar),
             )?;
         }
+
+        DatmanCommand::Prune { pile_name } => {
+            let descriptor = load_descriptor(Path::new(".")).context("Failed to load descriptor")?;
+            let retention_policy = descriptor
+                .retention
+                .context("No retention policy set in descriptor")?;
+            let dest_desc = &descriptor.piles[&pile_name];
+
+            let pile_desc = load_pile_descriptor(&dest_desc.path)?;
+
+            prune_with_retention_policy(
+                &dest_desc.path,
+                &pile_desc,
+                &RetentionPolicy::from_config(retention_policy),
+                true,
+            )?;
+        }
+
         DatmanCommand::InternalPullResponderOfferer {
             datman_path,
             pile_name,
diff --git a/datman/src/commands.rs b/datman/src/commands.rs
index fc41ab0..58d171d 100644
--- a/datman/src/commands.rs
+++ b/datman/src/commands.rs
@@ -20,12 +20,13 @@ use std::fs::File;
 use std::io::Write;
 use std::path::Path;
 
-use crate::descriptor::{Descriptor, SourceDescriptor};
+use crate::descriptor::{Descriptor, RetentionPolicyConfig, SourceDescriptor};
 
 pub mod backup;
 pub mod extract;
 pub mod ibrowse;
 pub mod ilabel;
+pub mod prune;
 pub mod pushpull;
 pub mod report;
 
@@ -51,6 +52,12 @@ pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {
         sources: source,
         piles: Default::default(),
         remote_hosts: Default::default(),
+        retention: Some(RetentionPolicyConfig {
+            daily: 14,
+            weekly: 12,
+            monthly: 24,
+            yearly: 9001,
+        }),
     })?;
 
     datman_toml_file.write_all(&bytes)?;
diff --git a/datman/src/commands/prune.rs b/datman/src/commands/prune.rs
new file mode 100644
index 0000000..7280a45
--- /dev/null
+++ b/datman/src/commands/prune.rs
@@ -0,0 +1,215 @@
+use crate::commands::backup::split_pointer_name;
+use crate::descriptor::RetentionPolicyConfig;
+use anyhow::{bail, Context};
+use log::info;
+use std::collections::{BTreeMap, BTreeSet};
+use std::io;
+use std::path::Path;
+use yama::commands::open_pile;
+use yama::operations::remove_pointer_safely;
+use yama::pile::PileDescriptor;
+
+pub struct RetentionBand {
+    pub interval_s: u64,
+    pub number_to_retain: u32,
+}
+
+pub struct RetentionPolicy {
+    pub retention_bands: Vec<RetentionBand>,
+}
+
+const DAY: u64 = 86400;
+const WEEK: u64 = 7 * DAY;
+const MONTH: u64 = 31 * DAY;
+const YEAR: u64 = 365 * DAY;
+
+impl RetentionPolicy {
+    pub fn from_config(descriptor: RetentionPolicyConfig) -> RetentionPolicy {
+        let mut policy = RetentionPolicy {
+            retention_bands: vec![],
+        };
+
+        if descriptor.daily != 0 {
+            policy.retention_bands.push(RetentionBand {
+                interval_s: DAY,
+                number_to_retain: descriptor.daily,
+            });
+        }
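+
+    // Example: the defaults written by `init_descriptor` (daily: 14, weekly: 12,
+    // monthly: 24, yearly: 9001) yield four bands. A band with
+    // `number_to_retain: n` keeps up to `n` look-back snapshots spaced
+    // `interval_s` apart (see `apply_returning_prunable` below).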
+
+        if descriptor.weekly != 0 {
+            policy.retention_bands.push(RetentionBand {
+                interval_s: WEEK,
+                number_to_retain: descriptor.weekly,
+            });
+        }
+
+        if descriptor.monthly != 0 {
+            policy.retention_bands.push(RetentionBand {
+                interval_s: MONTH,
+                number_to_retain: descriptor.monthly,
+            });
+        }
+
+        if descriptor.yearly != 0 {
+            policy.retention_bands.push(RetentionBand {
+                interval_s: YEAR,
+                number_to_retain: descriptor.yearly,
+            });
+        }
+
+        policy
+    }
+
+    /// Returns the set of snapshots to remove.
+    pub fn apply_returning_prunable(
+        &self,
+        snapshots_by_unix_time: BTreeMap<u64, String>,
+    ) -> BTreeSet<String> {
+        if snapshots_by_unix_time.is_empty() {
+            return BTreeSet::new();
+        }
+        let mut snapshots_included: BTreeSet<u64> = BTreeSet::new();
+
+        // Always mark the most recent snapshot as retained!
+        let last_snapshot = snapshots_by_unix_time.keys().rev().next().unwrap();
+        snapshots_included.insert(*last_snapshot);
+
+        let now_time = *last_snapshot;
+
+        for band in &self.retention_bands {
+            for multiple in 1..=band.number_to_retain {
+                // Saturating: a long look-back (e.g. `yearly: 9001`) can reach
+                // back past the Unix epoch.
+                let target_time = now_time.saturating_sub((multiple as u64) * band.interval_s);
+                if let Some((k, _)) = snapshots_by_unix_time.range(0..=target_time).rev().next() {
+                    snapshots_included.insert(*k);
+                }
+            }
+        }
+
+        // Find all prunable (unincluded) snapshots.
+        snapshots_by_unix_time
+            .into_iter()
+            .filter(|(k, _v)| !snapshots_included.contains(k))
+            .map(|(_k, v)| v)
+            .collect()
+    }
+}
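+
+// Worked example for `apply_returning_prunable` (mirrors `test_prunable_pointers`
+// below): with `weekly: 3` and alice's snapshots from the test, the newest
+// snapshot (Sep 28 05:00) is always kept, and the three weekly look-back targets
+// (Sep 21, Sep 14, Sep 7) each keep the newest snapshot at or before them:
+// Sep 21, Sep 14 and Sep 7. That leaves Sep 1, Sep 8 and the older Sep 28
+// (02:00) snapshot prunable.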
+
+pub fn prune_with_retention_policy(
+    pile_path: &Path,
+    pile_desc: &PileDescriptor,
+    policy: &RetentionPolicy,
+    prompt_first: bool,
+) -> anyhow::Result<()> {
+    let pile = open_pile(&pile_path, &pile_desc).context("Failed to open pile")?;
+
+    let pointers = pile
+        .list_pointers()
+        .context("Failed to list pointers in pile")?;
+
+    let mut pointers_to_keep: BTreeSet<String> = pointers.iter().cloned().collect();
+
+    let pointers_to_remove = get_prunable_pointers(&policy, pointers);
+
+    for remove in &pointers_to_remove {
+        pointers_to_keep.remove(remove);
+    }
+
+    info!("Gory details:\n---\nKeep: {pointers_to_keep:?}\n---\nRemove: {pointers_to_remove:?}");
+    info!(
+        "{} pointers to remove ({} to keep) based on retention policy.",
+        pointers_to_remove.len(),
+        pointers_to_keep.len()
+    );
+
+    if prompt_first {
+        println!("Would you like to proceed? [y/N]: ");
+        let mut buffer = String::new();
+        let stdin = io::stdin();
+        stdin.read_line(&mut buffer)?;
+        if buffer.trim().to_ascii_lowercase() != "y" {
+            bail!("Aborted by user.");
+        }
+    }
+
+    for to_remove in pointers_to_remove {
+        remove_pointer_safely(&pile, &to_remove)?;
+    }
+
+    Ok(())
+}
+
+fn get_prunable_pointers(policy: &RetentionPolicy, pointers: Vec<String>) -> BTreeSet<String> {
+    let mut split_pointers_by_name: BTreeMap<String, BTreeMap<u64, String>> = BTreeMap::new();
+
+    for pointer in pointers {
+        let (name, datetime) = if let Some(x) = split_pointer_name(&pointer) {
+            x
+        } else {
+            continue;
+        };
+
+        split_pointers_by_name
+            .entry(name)
+            .or_default()
+            .insert(datetime.timestamp().try_into().unwrap(), pointer);
+    }
+
+    let mut pointers_to_remove = BTreeSet::new();
+
+    for (_pointer_base_name, ts_to_pointer) in split_pointers_by_name {
+        let to_remove = policy.apply_returning_prunable(ts_to_pointer);
+
+        pointers_to_remove.extend(to_remove);
+    }
+
+    pointers_to_remove
+}
+
+#[cfg(test)]
+mod test {
+    use crate::commands::prune::{get_prunable_pointers, RetentionPolicy};
+    use crate::descriptor::RetentionPolicyConfig;
+
+    #[test]
+    fn test_prunable_pointers() {
+        let pointers = vec![
+            "alice+2022-09-28_05:00:00",
+            "alice+2022-09-28_02:00:00",
+            "alice+2022-09-21_05:00:00",
+            "alice+2022-09-14_05:00:00",
+            "alice+2022-09-08_05:00:00",
+            "alice+2022-09-07_05:00:00",
+            "alice+2022-09-01_05:00:00",
+            "bob+2022-09-28_06:00:00",
+            "bob+2022-09-28_03:00:00",
+            "bob+2022-09-21_06:00:00",
+            "bob+2022-09-14_06:00:00",
+            "bob+2022-09-08_06:00:00",
+            "bob+2022-09-07_06:00:00",
+            "bob+2022-09-01_06:00:00",
+        ]
+        .into_iter()
+        .map(|s| s.to_owned())
+        .collect();
+        let policy = RetentionPolicy::from_config(RetentionPolicyConfig {
+            daily: 0,
+            weekly: 3,
+            monthly: 0,
+            yearly: 0,
+        });
+
+        assert_eq!(
+            get_prunable_pointers(&policy, pointers)
+                .into_iter()
+                .collect::<Vec<_>>(),
+            vec![
+                "alice+2022-09-01_05:00:00",
+                "alice+2022-09-08_05:00:00",
+                "alice+2022-09-28_02:00:00",
+                "bob+2022-09-01_06:00:00",
+                "bob+2022-09-08_06:00:00",
+                "bob+2022-09-28_03:00:00",
+            ]
+        );
+    }
+}
diff --git a/datman/src/descriptor.rs b/datman/src/descriptor.rs
index 03d8dc3..0647825 100644
--- a/datman/src/descriptor.rs
+++ b/datman/src/descriptor.rs
@@ -38,6 +38,10 @@ pub struct Descriptor {
     pub piles: HashMap<String, DestPileDescriptor>,
 
     pub remote_hosts: HashMap<String, RemoteHostDescriptor>,
+
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retention: Option<RetentionPolicyConfig>,
 }
 
 #[derive(Clone, Serialize, Deserialize, Debug)]
@@ -46,6 +50,14 @@ pub struct RemoteHostDescriptor {
     pub path_to_datman: Option<String>,
 }
 
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct RetentionPolicyConfig {
+    pub daily: u32,
+    pub weekly: u32,
+    pub monthly: u32,
+    pub yearly: u32,
+}
+
 #[derive(Clone, Serialize, Deserialize, Debug)]
 #[serde(untagged)]
 pub enum SourceDescriptor {
diff --git a/yama/src/operations.rs b/yama/src/operations.rs
index 772552d..14fe27b 100644
--- a/yama/src/operations.rs
+++ b/yama/src/operations.rs
@@ -1,5 +1,73 @@
+use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
+use crate::pile::{Pile, RawPile};
+use crate::tree::{differentiate_node_in_place, integrate_node_in_place};
+use anyhow::anyhow;
+use log::info;
+
 pub mod checking;
 pub mod cleanup;
 pub mod extracting;
 pub mod legacy_pushpull;
 pub mod storing;
+
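+/// Removes the named pointer from the pile without corrupting its dependants:
+/// any pointer whose parent is the one being removed is first fully integrated,
+/// then re-parented onto the removed pointer's own parent (if any) and
+/// rewritten, before the pointer itself is deleted.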
+pub fn remove_pointer_safely<RP: RawPile>(pile: &Pile<RP>, name: &str) -> anyhow::Result<()> {
+    // Retrieve this pointer.
+    let mut this_pointer = pile
+        .read_pointer(name)?
+        .ok_or_else(|| anyhow!("Pointer {:?} does not exist so it cannot be deleted.", name))?;
+    let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())?;
+
+    let new_parent_name = this_pointer.parent_pointer.clone();
+    fully_integrate_pointer_node(pile, &mut this_node.node, &mut this_pointer)?;
+
+    let new_parent = if let Some(ref new_parent_name) = new_parent_name {
+        let new_parent_pointer = pile
+            .read_pointer(new_parent_name.as_str())?
+            .ok_or_else(|| anyhow!("Parent pointer {:?} does not exist.", new_parent_name))?;
+        let new_parent_node = retrieve_tree_node(&pile, new_parent_pointer.chunk_ref.clone())?;
+        Some((new_parent_pointer, new_parent_node))
+    } else {
+        None
+    };
+
+    // Now integrate any pointers that rely on this one,
+    // so that they no longer rely on this one.
+    for pointer in pile.list_pointers()?.iter() {
+        if pointer == name {
+            continue;
+        }
+        if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
+            if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
+                if parent_pointer == name {
+                    info!("Pointer would be orphaned: {:?}; integrating", pointer);
+
+                    // Need to integrate this node, so retrieve it.
+                    let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
+
+                    // Integrate it in-place.
+                    integrate_node_in_place(&mut node.node, &this_node.node)?;
+
+                    if let Some((_, ref new_parent_node)) = new_parent {
+                        // Then differentiate with respect to the NEW parent.
+                        differentiate_node_in_place(&mut node.node, &new_parent_node.node)?;
+                    }
+
+                    // Pass through the parent.
+                    pointer_data.parent_pointer = new_parent_name.clone();
+
+                    // Store the updated version of the node...
+                    let new_chunk_ref = store_tree_node(&pile, &node)?;
+                    // ...associate it with the new version of the pointer...
+                    pointer_data.chunk_ref = new_chunk_ref;
+                    // ...and write the pointer back.
+                    pile.write_pointer(pointer.as_str(), &pointer_data)?;
+                }
+            }
+        }
+    }
+
+    // Then delete the pointer.
+    pile.delete_pointer(name)?;
+    info!("Deleted pointer: {:?}", name);
+    Ok(())
+}
diff --git a/yama/src/tree.rs b/yama/src/tree.rs
index eb2f5e8..0652bd6 100644
--- a/yama/src/tree.rs
+++ b/yama/src/tree.rs
@@ -185,7 +185,7 @@ pub fn differentiate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow
 /// result is in-place.
 ///
 /// Preconditions:
-/// - `old` must be an integrated pointer.
+/// - `old` must be an integrated pointer. (Otherwise this algorithm is not correct.)
 /// - `old` is the parent of `new`
 pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow::Result<()> {
     if let TreeNode::Directory { children, .. } = new {
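
For reference, a minimal sketch of how the new retention types compose, assuming `datman` is consumed as a library (as its own binary does above); the snapshot timeline is invented for illustration:

```rust
use std::collections::BTreeMap;

use datman::commands::prune::RetentionPolicy;
use datman::descriptor::RetentionPolicyConfig;

fn main() {
    // Keep the newest snapshot plus up to 3 weekly look-backs.
    let policy = RetentionPolicy::from_config(RetentionPolicyConfig {
        daily: 0,
        weekly: 3,
        monthly: 0,
        yearly: 0,
    });

    // Toy timeline: one snapshot per day for 30 days (keys are unix times).
    let snapshots: BTreeMap<u64, String> = (0..30u64)
        .map(|day| (day * 86400, format!("alice+day{day}")))
        .collect();

    // Days 29, 22, 15 and 8 are retained; the other 26 snapshots are prunable.
    let prunable = policy.apply_returning_prunable(snapshots);
    assert_eq!(prunable.len(), 26);
}
```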