Implement datman prune

This commit is contained in:
Olivier 'reivilibre' 2022-11-20 19:32:09 +00:00
parent fcc79ca95d
commit 6e1e173cb6
6 changed files with 330 additions and 2 deletions

View File

@ -28,6 +28,7 @@ use bare_metrics_recorder::recording::BareMetricsRecorderCore;
use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination};
use datman::commands::ilabel::interactive_labelling_session;
use datman::commands::prune::{prune_with_retention_policy, RetentionPolicy};
use datman::commands::{init_descriptor, pushpull};
use datman::descriptor::{load_descriptor, SourceDescriptor};
use datman::get_hostname;
@ -137,6 +138,13 @@ pub enum DatmanCommand {
pile_name: String,
},
/// Applies a retention policy by removing unnecessary backups.
/// Does not reclaim space by itself: use
/// `yama check --apply-gc --shallow`
/// & `yama compact`
/// to do that.
Prune { pile_name: String },
#[clap(name = "_pull_responder_offerer")]
InternalPullResponderOfferer {
datman_path: PathBuf,
@ -410,6 +418,24 @@ fn main() -> anyhow::Result<()> {
Box::new(pbar),
)?;
}
DatmanCommand::Prune { pile_name } => {
let descriptor = load_descriptor(Path::new(".")).unwrap();
let retention_policy = descriptor
.retention
.context("No retention policy set in descriptor")?;
let dest_desc = &descriptor.piles[&pile_name];
let pile_desc = load_pile_descriptor(&dest_desc.path)?;
prune_with_retention_policy(
&dest_desc.path,
&pile_desc,
&RetentionPolicy::from_config(retention_policy),
true,
)?;
}
DatmanCommand::InternalPullResponderOfferer {
datman_path,
pile_name,

View File

@ -20,12 +20,13 @@ use std::fs::File;
use std::io::Write;
use std::path::Path;
use crate::descriptor::{Descriptor, SourceDescriptor};
use crate::descriptor::{Descriptor, RetentionPolicyConfig, SourceDescriptor};
pub mod backup;
pub mod extract;
pub mod ibrowse;
pub mod ilabel;
pub mod prune;
pub mod pushpull;
pub mod report;
@ -51,6 +52,12 @@ pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {
sources: source,
piles: Default::default(),
remote_hosts: Default::default(),
retention: Some(RetentionPolicyConfig {
daily: 14,
weekly: 12,
monthly: 24,
yearly: 9001,
}),
})?;
datman_toml_file.write_all(&bytes)?;

View File

@ -0,0 +1,215 @@
use crate::commands::backup::split_pointer_name;
use crate::descriptor::RetentionPolicyConfig;
use anyhow::{bail, Context};
use log::info;
use std::collections::{BTreeMap, BTreeSet};
use std::io;
use std::path::Path;
use yama::commands::open_pile;
use yama::operations::remove_pointer_safely;
use yama::pile::PileDescriptor;
pub struct RetentionBand {
pub interval_s: u64,
pub number_to_retain: u32,
}
pub struct RetentionPolicy {
pub retention_bands: Vec<RetentionBand>,
}
const DAY: u64 = 86400;
const WEEK: u64 = 7 * DAY;
const MONTH: u64 = 31 * DAY;
const YEAR: u64 = 365 * DAY;
impl RetentionPolicy {
pub fn from_config(descriptor: RetentionPolicyConfig) -> RetentionPolicy {
let mut policy = RetentionPolicy {
retention_bands: vec![],
};
if descriptor.daily != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: DAY,
number_to_retain: descriptor.daily,
});
}
if descriptor.weekly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: WEEK,
number_to_retain: descriptor.weekly,
});
}
if descriptor.monthly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: MONTH,
number_to_retain: descriptor.monthly,
});
}
if descriptor.yearly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: YEAR,
number_to_retain: descriptor.yearly,
});
}
policy
}
/// Returns the set of snapshots to remove.
pub fn apply_returning_prunable(
&self,
snapshots_by_unix_time: BTreeMap<u64, String>,
) -> BTreeSet<String> {
if snapshots_by_unix_time.is_empty() {
return BTreeSet::new();
}
let mut snapshots_included: BTreeSet<u64> = BTreeSet::new();
// Always mark the most recent snapshot as retained!
let last_snapshot = snapshots_by_unix_time.keys().rev().next().unwrap();
snapshots_included.insert(*last_snapshot);
let now_time = *last_snapshot;
for band in &self.retention_bands {
for multiple in 1..=band.number_to_retain {
let target_time = now_time - (multiple as u64) * band.interval_s;
if let Some((k, _)) = snapshots_by_unix_time.range(0..=target_time).rev().next() {
snapshots_included.insert(*k);
}
}
}
// Find all prunable (unincluded) snapshots.
snapshots_by_unix_time
.into_iter()
.filter(|(k, _v)| !snapshots_included.contains(k))
.map(|(_k, v)| v)
.collect()
}
}
pub fn prune_with_retention_policy(
pile_path: &Path,
pile_desc: &PileDescriptor,
policy: &RetentionPolicy,
prompt_first: bool,
) -> anyhow::Result<()> {
let pile = open_pile(&pile_path, &pile_desc).context("Failed to open pile")?;
let pointers = pile
.list_pointers()
.context("Failed to list pointers in pile")?;
let mut pointers_to_keep: BTreeSet<String> = pointers.iter().cloned().collect();
let pointers_to_remove = get_prunable_pointers(&policy, pointers);
for remove in &pointers_to_remove {
pointers_to_keep.remove(remove);
}
info!("Gory details:\n---\nKeep: {pointers_to_keep:?}\n---\nRemove: {pointers_to_remove:?}");
info!(
"{} pointers to remove ({} to keep) based on retention policy.",
pointers_to_remove.len(),
pointers_to_keep.len()
);
if prompt_first {
println!("Would you like to proceed? [y/N]: ");
let mut buffer = String::new();
let stdin = io::stdin(); // We get `Stdin` here.
stdin.read_line(&mut buffer)?;
if buffer.trim().to_ascii_lowercase() != "y" {
bail!("Aborted by user.");
}
}
for to_remove in pointers_to_remove {
remove_pointer_safely(&pile, &to_remove)?;
}
Ok(())
}
fn get_prunable_pointers(policy: &RetentionPolicy, pointers: Vec<String>) -> BTreeSet<String> {
let mut split_pointers_by_name: BTreeMap<String, BTreeMap<u64, String>> = BTreeMap::new();
for pointer in pointers {
let (name, datetime) = if let Some(x) = split_pointer_name(&pointer) {
x
} else {
continue;
};
split_pointers_by_name
.entry(name)
.or_default()
.insert(datetime.timestamp().try_into().unwrap(), pointer);
}
let mut pointers_to_remove = BTreeSet::new();
for (_pointer_base_name, ts_to_pointer) in split_pointers_by_name {
let to_remove = policy.apply_returning_prunable(ts_to_pointer);
pointers_to_remove.extend(to_remove);
}
pointers_to_remove
}
#[cfg(test)]
mod test {
use crate::commands::prune::{get_prunable_pointers, RetentionPolicy};
use crate::descriptor::RetentionPolicyConfig;
#[test]
fn test_prunable_pointers() {
let pointers = vec![
"alice+2022-09-28_05:00:00",
"alice+2022-09-28_02:00:00",
"alice+2022-09-21_05:00:00",
"alice+2022-09-14_05:00:00",
"alice+2022-09-08_05:00:00",
"alice+2022-09-07_05:00:00",
"alice+2022-09-01_05:00:00",
"bob+2022-09-28_06:00:00",
"bob+2022-09-28_03:00:00",
"bob+2022-09-21_06:00:00",
"bob+2022-09-14_06:00:00",
"bob+2022-09-08_06:00:00",
"bob+2022-09-07_06:00:00",
"bob+2022-09-01_06:00:00",
]
.into_iter()
.map(|s| s.to_owned())
.collect();
let policy = RetentionPolicy::from_config(RetentionPolicyConfig {
daily: 0,
weekly: 3,
monthly: 0,
yearly: 0,
});
assert_eq!(
get_prunable_pointers(&policy, pointers)
.into_iter()
.collect::<Vec<_>>(),
vec![
"alice+2022-09-01_05:00:00",
"alice+2022-09-08_05:00:00",
"alice+2022-09-28_02:00:00",
"bob+2022-09-01_06:00:00",
"bob+2022-09-08_06:00:00",
"bob+2022-09-28_03:00:00",
]
);
}
}

View File

@ -38,6 +38,10 @@ pub struct Descriptor {
pub piles: HashMap<String, DestPileDescriptor>,
pub remote_hosts: HashMap<String, RemoteHostDescriptor>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub retention: Option<RetentionPolicyConfig>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
@ -46,6 +50,14 @@ pub struct RemoteHostDescriptor {
pub path_to_datman: Option<String>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RetentionPolicyConfig {
pub daily: u32,
pub weekly: u32,
pub monthly: u32,
pub yearly: u32,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum SourceDescriptor {

View File

@ -1,5 +1,73 @@
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
use crate::pile::{Pile, RawPile};
use crate::tree::{differentiate_node_in_place, integrate_node_in_place};
use anyhow::anyhow;
use log::info;
pub mod checking;
pub mod cleanup;
pub mod extracting;
pub mod legacy_pushpull;
pub mod storing;
pub fn remove_pointer_safely<P: RawPile>(pile: &Pile<P>, name: &str) -> anyhow::Result<()> {
// retrieve this pointer
let mut this_pointer = pile
.read_pointer(name)?
.ok_or_else(|| anyhow!("Pointer {:?} does not exist so can not be deleted.", name))?;
let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())?;
let new_parent_name = this_pointer.parent_pointer.clone();
fully_integrate_pointer_node(pile, &mut this_node.node, &mut this_pointer)?;
let new_parent = if let Some(ref new_parent_name) = new_parent_name {
let new_parent_pointer = pile
.read_pointer(new_parent_name.as_str())?
.ok_or_else(|| anyhow!("Parent pointer {:?} does not exist.", name))?;
let new_parent_node = retrieve_tree_node(&pile, new_parent_pointer.chunk_ref.clone())?;
Some((new_parent_pointer, new_parent_node))
} else {
None
};
// now integrate any pointers that rely on this one
// so that they no longer rely on this one.
for pointer in pile.list_pointers()?.iter() {
if pointer == name {
continue;
}
if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
if parent_pointer == name {
info!("Pointer would be orphaned: {:?}; integrating", pointer);
// need to integrate this node, so retrieve it
let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
// integrate it in-place
integrate_node_in_place(&mut node.node, &this_node.node)?;
if let Some((_, ref new_parent_node)) = new_parent {
// then differentiate with respect to the NEW parent
differentiate_node_in_place(&mut node.node, &new_parent_node.node)?;
}
// pass through the parent
pointer_data.parent_pointer = new_parent_name.clone();
// store the updated version of the pointer
let new_chunk_ref = store_tree_node(&pile, &node)?;
// associate the new node with the new version of the pointer
pointer_data.chunk_ref = new_chunk_ref;
// write the pointer back.
pile.write_pointer(pointer.as_str(), &pointer_data)?;
}
}
}
}
// then delete the pointer
pile.delete_pointer(name)?;
info!("Deleted pointer: {:?}", name);
Ok(())
}

View File

@ -185,7 +185,7 @@ pub fn differentiate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow
/// result is in-place.
///
/// Preconditions:
/// - `old` must be an integrated pointer.
/// - `old` must be an integrated pointer. (Otherwise this algorithm is not correct.)
/// - `old` is the parent of `new`
pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow::Result<()> {
if let TreeNode::Directory { children, .. } = new {