Compare commits

..

24 Commits

Author SHA1 Message Date
565c99cf8c Update flake and fix it 2024-05-08 20:41:28 +01:00
b57dbad890 Simplify flake lock
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2023-04-01 16:57:04 +01:00
9001177143 Batch up chunk deletions in an attempt to make vacuuming more performant
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-28 21:03:07 +00:00
c9d64b2962 Make sure to flush + add some error contexts
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-21 21:23:38 +00:00
50ff9bb36a Fix including trailing empty line as pointer name
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-20 22:11:05 +00:00
7e41408815 Add test for incremental backup with mid delete
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
Just for validation that delete does the right thing
2022-11-20 20:58:45 +00:00
4072c5ae82 Fix parent not being integrated before being used to differentiate whilst removing a pointer
Some checks are pending
ci/woodpecker/push/build Pipeline is pending
ci/woodpecker/push/release Pipeline is pending
2022-11-20 20:42:26 +00:00
d3fe111a06 Replace debug rmp with new implementation
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-20 19:44:21 +00:00
6e1e173cb6 Implement datman prune 2022-11-20 19:43:20 +00:00
fcc79ca95d Hopefully fix descriptors to compare in test
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-20 10:02:27 +00:00
c1de1341ef Tweak wording 2022-11-20 10:02:13 +00:00
e85c606c95 Make a no-op compaction really a no-op compaction
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-20 08:57:22 +00:00
34c619ef41 Fix compact thresholds in tests to demonstrate what we need
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-19 17:43:27 +00:00
b9dce3ddfc rustfmt 2022-11-19 17:42:09 +00:00
52202874f2 Update images to remove deprecated ones
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-19 16:47:25 +00:00
69656131af Fix linter
Some checks failed
ci/woodpecker/push/build Pipeline failed
ci/woodpecker/push/release Pipeline was successful
2022-11-19 16:35:36 +00:00
cc93997230 Linting 2022-11-19 16:35:33 +00:00
41248fe396 Add tests for yama compact 2022-11-19 16:35:24 +00:00
e7eb9ef288 Update nix shell to have python 2022-11-19 16:33:14 +00:00
b5e9e55cad Add yama compact command 2022-11-19 15:49:09 +00:00
cf502b7f7e rustfmt 2022-11-19 15:28:36 +00:00
58c5c3f039 Add compaction logic 2022-11-19 15:27:41 +00:00
30b261d172 Add Nix shell for Rust devel 2022-11-19 13:13:19 +00:00
0811c11c48 Add ability to extract subset of files from yama
Some checks are pending
ci/woodpecker/push/build Pipeline is pending
ci/woodpecker/push/release Pipeline is pending
2022-10-04 20:21:23 +01:00
29 changed files with 1411 additions and 151 deletions

2
.envrc Normal file
View File

@ -0,0 +1,2 @@
use nix

View File

@ -5,28 +5,19 @@ platform: linux/amd64
pipeline:
unitTests:
image: "docker.bics.ga/rei_ci/rust-sccache:latest-amd64"
image: "rust:1.65.0"
pull: true
commands:
- DEBIAN_FRONTEND=noninteractive apt-get -qq update > /dev/null
- DEBIAN_FRONTEND=noninteractive apt-get -yqq install pkg-config libssl-dev build-essential libsqlite3-dev > /dev/null
- cargo build --all
- cargo test --all
- sccache --show-stats
environment:
RUSTC_WRAPPER: /usr/local/bin/sccache
SCCACHE_S3_USE_SSL: "true"
SCCACHE_ENDPOINT: "richie.m4.tanukitsu.net:443"
secrets:
- sccache_bucket
- aws_access_key_id
- aws_secret_access_key
when:
event: [push, pull_request]
testSuite:
image: "docker.bics.ga/rei_ci/rust-sccache:latest-amd64"
image: "rust:1.65.0"
commands:
- DEBIAN_FRONTEND=noninteractive apt-get -qq update > /dev/null
- DEBIAN_FRONTEND=noninteractive apt-get -yqq -o=Dpkg::Use-Pty=0 install pkg-config libssl-dev build-essential libsqlite3-dev python3.9 python3.9-venv postgresql postgresql-client mariadb-server mariadb-client zstd lz4 > /dev/null
@ -41,20 +32,11 @@ pipeline:
- python3.9 -m venv testsuite/.venv
- ./testsuite/.venv/bin/pip install ./testsuite ./datman-helper-postgres ./datman-helper-mysql
- cd testsuite && . .venv/bin/activate && TEST_POSTGRES=$(hostname),testsuitedb,root TEST_MYSQL=$(hostname),testsuitemydb,root green
- sccache --show-stats
environment:
RUSTC_WRAPPER: /usr/local/bin/sccache
SCCACHE_S3_USE_SSL: "true"
SCCACHE_ENDPOINT: "richie.m4.tanukitsu.net:443"
secrets:
- sccache_bucket
- aws_access_key_id
- aws_secret_access_key
when:
event: [push, pull_request]
deployManual:
image: "docker.bics.ga/rei_ci/mdbook:latest-amd64"
image: "docker.emunest.net/rei_ci/mdbook:latest-amd64"
pull: true
when:
branch:

View File

@ -39,10 +39,7 @@ def cli():
# The process (if any) that is our LZ4 decompressor.
lz4_process = None
dump_command = [
"pg_dump",
database_to_use
]
dump_command = ["pg_dump", database_to_use]
if host_to_use is not None:
if use_lz4:
@ -63,21 +60,19 @@ def cli():
# (rather than lz4 covering it).
command = [
"ssh",
f"{user_to_use}@{host_to_use}" if user_to_use is not None else f"{host_to_use}",
f"{user_to_use}@{host_to_use}"
if user_to_use is not None
else f"{host_to_use}",
"bash",
"-o",
"pipefail",
"-c",
shlex.quote(" ".join(dump_command))
shlex.quote(" ".join(dump_command)),
]
elif user_to_use is not None:
current_username = pwd.getpwuid(os.getuid()).pw_name
if current_username != user_to_use:
command = [
"sudo",
"-u",
user_to_use
] + dump_command
command = ["sudo", "-u", user_to_use] + dump_command
else:
command = dump_command
else:

View File

@ -28,6 +28,7 @@ use bare_metrics_recorder::recording::BareMetricsRecorderCore;
use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination};
use datman::commands::ilabel::interactive_labelling_session;
use datman::commands::prune::{prune_with_retention_policy, RetentionPolicy};
use datman::commands::{init_descriptor, pushpull};
use datman::descriptor::{load_descriptor, SourceDescriptor};
use datman::get_hostname;
@ -137,6 +138,13 @@ pub enum DatmanCommand {
pile_name: String,
},
/// Applies a retention policy by removing unnecessary backups.
/// Does not reclaim space by itself: use
/// `yama check --apply-gc --shallow`
/// & `yama compact`
/// to do that.
Prune { pile_name: String },
#[clap(name = "_pull_responder_offerer")]
InternalPullResponderOfferer {
datman_path: PathBuf,
@ -410,6 +418,24 @@ fn main() -> anyhow::Result<()> {
Box::new(pbar),
)?;
}
DatmanCommand::Prune { pile_name } => {
let descriptor = load_descriptor(Path::new(".")).unwrap();
let retention_policy = descriptor
.retention
.context("No retention policy set in descriptor")?;
let dest_desc = &descriptor.piles[&pile_name];
let pile_desc = load_pile_descriptor(&dest_desc.path)?;
prune_with_retention_policy(
&dest_desc.path,
&pile_desc,
&RetentionPolicy::from_config(retention_policy),
true,
)?;
}
DatmanCommand::InternalPullResponderOfferer {
datman_path,
pile_name,

View File

@ -20,12 +20,13 @@ use std::fs::File;
use std::io::Write;
use std::path::Path;
use crate::descriptor::{Descriptor, SourceDescriptor};
use crate::descriptor::{Descriptor, RetentionPolicyConfig, SourceDescriptor};
pub mod backup;
pub mod extract;
pub mod ibrowse;
pub mod ilabel;
pub mod prune;
pub mod pushpull;
pub mod report;
@ -51,6 +52,12 @@ pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {
sources: source,
piles: Default::default(),
remote_hosts: Default::default(),
retention: Some(RetentionPolicyConfig {
daily: 14,
weekly: 12,
monthly: 24,
yearly: 9001,
}),
})?;
datman_toml_file.write_all(&bytes)?;

View File

@ -0,0 +1,220 @@
use crate::commands::backup::split_pointer_name;
use crate::descriptor::RetentionPolicyConfig;
use anyhow::{bail, Context};
use log::info;
use std::collections::{BTreeMap, BTreeSet};
use std::io;
use std::path::Path;
use yama::commands::open_pile;
use yama::operations::remove_pointer_safely;
use yama::pile::PileDescriptor;
pub struct RetentionBand {
pub interval_s: u64,
pub number_to_retain: u32,
}
pub struct RetentionPolicy {
pub retention_bands: Vec<RetentionBand>,
}
const DAY: u64 = 86400;
const WEEK: u64 = 7 * DAY;
const MONTH: u64 = 31 * DAY;
const YEAR: u64 = 365 * DAY;
impl RetentionPolicy {
pub fn from_config(descriptor: RetentionPolicyConfig) -> RetentionPolicy {
let mut policy = RetentionPolicy {
retention_bands: vec![],
};
if descriptor.daily != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: DAY,
number_to_retain: descriptor.daily,
});
}
if descriptor.weekly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: WEEK,
number_to_retain: descriptor.weekly,
});
}
if descriptor.monthly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: MONTH,
number_to_retain: descriptor.monthly,
});
}
if descriptor.yearly != 0 {
policy.retention_bands.push(RetentionBand {
interval_s: YEAR,
number_to_retain: descriptor.yearly,
});
}
policy
}
/// Returns the set of snapshots to remove.
pub fn apply_returning_prunable(
&self,
snapshots_by_unix_time: BTreeMap<u64, String>,
) -> BTreeSet<String> {
if snapshots_by_unix_time.is_empty() {
return BTreeSet::new();
}
let mut snapshots_included: BTreeSet<u64> = BTreeSet::new();
// Always mark the most recent snapshot as retained!
let last_snapshot = snapshots_by_unix_time.keys().rev().next().unwrap();
snapshots_included.insert(*last_snapshot);
let now_time = *last_snapshot;
for band in &self.retention_bands {
for multiple in 1..=band.number_to_retain {
let target_time = now_time - (multiple as u64) * band.interval_s;
if let Some((k, _)) = snapshots_by_unix_time.range(0..=target_time).rev().next() {
snapshots_included.insert(*k);
}
}
}
// Find all prunable (unincluded) snapshots.
snapshots_by_unix_time
.into_iter()
.filter(|(k, _v)| !snapshots_included.contains(k))
.map(|(_k, v)| v)
.collect()
}
}
pub fn prune_with_retention_policy(
pile_path: &Path,
pile_desc: &PileDescriptor,
policy: &RetentionPolicy,
prompt_first: bool,
) -> anyhow::Result<()> {
let pile = open_pile(&pile_path, &pile_desc).context("Failed to open pile")?;
let pointers = pile
.list_pointers()
.context("Failed to list pointers in pile")?;
let mut pointers_to_keep: BTreeSet<String> = pointers.iter().cloned().collect();
let pointers_to_remove = get_prunable_pointers(&policy, pointers);
for remove in &pointers_to_remove {
pointers_to_keep.remove(remove);
}
info!("Gory details:\n---\nKeep: {pointers_to_keep:?}\n---\nRemove: {pointers_to_remove:?}");
info!(
"{} pointers to remove ({} to keep) based on retention policy.",
pointers_to_remove.len(),
pointers_to_keep.len()
);
if prompt_first {
println!("Would you like to proceed? [y/N]: ");
let mut buffer = String::new();
let stdin = io::stdin(); // We get `Stdin` here.
stdin.read_line(&mut buffer)?;
if buffer.trim().to_ascii_lowercase() != "y" {
bail!("Aborted by user.");
}
}
for to_remove in pointers_to_remove {
let res = remove_pointer_safely(&pile, &to_remove).context("removing prunable pointers");
pile.flush()
.context("flushing pile after removing pointers")?;
res?;
}
Ok(())
}
fn get_prunable_pointers(policy: &RetentionPolicy, pointers: Vec<String>) -> BTreeSet<String> {
let mut split_pointers_by_name: BTreeMap<String, BTreeMap<u64, String>> = BTreeMap::new();
for pointer in pointers {
let (name, datetime) = if let Some(x) = split_pointer_name(&pointer) {
x
} else {
continue;
};
split_pointers_by_name
.entry(name)
.or_default()
.insert(datetime.timestamp().try_into().unwrap(), pointer);
}
let mut pointers_to_remove = BTreeSet::new();
for (_pointer_base_name, ts_to_pointer) in split_pointers_by_name {
let to_remove = policy.apply_returning_prunable(ts_to_pointer);
pointers_to_remove.extend(to_remove);
}
pointers_to_remove
}
#[cfg(test)]
mod test {
use crate::commands::prune::{get_prunable_pointers, RetentionPolicy};
use crate::descriptor::RetentionPolicyConfig;
#[test]
fn test_prunable_pointers() {
let pointers = vec![
"alice+2022-09-28_05:00:00",
"alice+2022-09-28_02:00:00",
"alice+2022-09-21_05:00:00",
"alice+2022-09-14_05:00:00",
"alice+2022-09-08_05:00:00",
"alice+2022-09-07_05:00:00",
"alice+2022-09-01_05:00:00",
"bob+2022-09-28_06:00:00",
"bob+2022-09-28_03:00:00",
"bob+2022-09-21_06:00:00",
"bob+2022-09-14_06:00:00",
"bob+2022-09-08_06:00:00",
"bob+2022-09-07_06:00:00",
"bob+2022-09-01_06:00:00",
]
.into_iter()
.map(|s| s.to_owned())
.collect();
let policy = RetentionPolicy::from_config(RetentionPolicyConfig {
daily: 0,
weekly: 3,
monthly: 0,
yearly: 0,
});
assert_eq!(
get_prunable_pointers(&policy, pointers)
.into_iter()
.collect::<Vec<_>>(),
vec![
"alice+2022-09-01_05:00:00",
"alice+2022-09-08_05:00:00",
"alice+2022-09-28_02:00:00",
"bob+2022-09-01_06:00:00",
"bob+2022-09-08_06:00:00",
"bob+2022-09-28_03:00:00",
]
);
}
}

View File

@ -38,6 +38,10 @@ pub struct Descriptor {
pub piles: HashMap<String, DestPileDescriptor>,
pub remote_hosts: HashMap<String, RemoteHostDescriptor>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub retention: Option<RetentionPolicyConfig>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
@ -46,6 +50,14 @@ pub struct RemoteHostDescriptor {
pub path_to_datman: Option<String>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RetentionPolicyConfig {
pub daily: u32,
pub weekly: u32,
pub monthly: u32,
pub yearly: u32,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum SourceDescriptor {

147
flake.lock generated
View File

@ -1,12 +1,15 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1653893745,
"narHash": "sha256-0jntwV3Z8//YwuOjzhV2sgJJPt+HY6KhU7VZUL0fKZQ=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "1ed9fb1935d260de5fe1c2f7ee0ebaae17ed2fa1",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@ -17,14 +20,16 @@
},
"naersk": {
"inputs": {
"nixpkgs": "nixpkgs"
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1653413650,
"narHash": "sha256-wojDHjb+eU80MPH+3HQaK0liUy8EgR95rvmCl24i58Y=",
"lastModified": 1662220400,
"narHash": "sha256-9o2OGQqu4xyLZP9K6kNe1pTHnyPz0Wr3raGYnr9AIgY=",
"owner": "nix-community",
"repo": "naersk",
"rev": "69daaceebe12c070cd5ae69ba38f277bbf033695",
"rev": "6944160c19cb591eb85bbf9b2f2768a935623ed3",
"type": "github"
},
"original": {
@ -33,60 +38,58 @@
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"poetry2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1703863825,
"narHash": "sha256-rXwqjtwiGKJheXB43ybM8NwWB8rO2dSRrEqes0S7F5Y=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "5163432afc817cf8bd1f031418d1869e4c9d5547",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1653738054,
"narHash": "sha256-IaR8iLN4Ms3f5EjU1CJkXSc49ZzyS5qv03DtVAti6/s=",
"lastModified": 1714971268,
"narHash": "sha256-IKwMSwHj9+ec660l+I4tki/1NRoeGpyA2GdtdYpAgEw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "17b62c338f2a0862a58bb6951556beecd98ccda9",
"rev": "27c13997bf450a01219899f5a83bd6ffbfc70d3c",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "nixos-23.11",
"type": "indirect"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1653738054,
"narHash": "sha256-IaR8iLN4Ms3f5EjU1CJkXSc49ZzyS5qv03DtVAti6/s=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "17b62c338f2a0862a58bb6951556beecd98ccda9",
"type": "github"
},
"original": {
"id": "nixpkgs",
"type": "indirect"
}
},
"nixpkgs_3": {
"locked": {
"lastModified": 1654176133,
"narHash": "sha256-XhjUlU+q9LPFM8Z7X3h504vS2FUToCyKhaf5hVF6nsw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "63f15db5291aec276924d907d3e083e74d68e8b9",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"type": "github"
}
},
"poetry2nix": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs_3"
"nix-github-actions": "nix-github-actions",
"nixpkgs": [
"nixpkgs"
],
"systems": "systems_2",
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1653561148,
"narHash": "sha256-JzAttqACdvMOTwkzkJ0jFF8MWIo8Uau4w/XUMyqpnd8=",
"lastModified": 1715017507,
"narHash": "sha256-RN2Vsba56PfX02DunWcZYkMLsipp928h+LVAWMYmbZg=",
"owner": "nix-community",
"repo": "poetry2nix",
"rev": "3b01c3e3dc57d511848d8433153ab67db79640e1",
"rev": "e6b36523407ae6a7a4dfe29770c30b3a3563b43a",
"type": "github"
},
"original": {
@ -98,18 +101,68 @@
"root": {
"inputs": {
"naersk": "naersk",
"nixpkgs": "nixpkgs_2",
"nixpkgs": "nixpkgs",
"poetry2nix": "poetry2nix",
"utils": "utils"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"id": "systems",
"type": "indirect"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"poetry2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1714058656,
"narHash": "sha256-Qv4RBm4LKuO4fNOfx9wl40W2rBbv5u5m+whxRYUMiaA=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "c6aaf729f34a36c445618580a9f95a48f5e4e03f",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
},
"utils": {
"locked": {
"lastModified": 1652776076,
"narHash": "sha256-gzTw/v1vj4dOVbpBSJX4J0DwUR6LIyXo7/SuuTJp1kM=",
"lastModified": 1659877975,
"narHash": "sha256-zllb8aq3YO3h8B/U0/J1WBgAL8EX5yWf5pMj3G0NAmc=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "04c1b180862888302ddfb2e3ad9eaa63afc60cf8",
"rev": "c0e246b9b83f637f4681389ecabcb2681b4f3af0",
"type": "github"
},
"original": {

View File

@ -3,13 +3,21 @@
inputs = {
utils.url = "github:numtide/flake-utils";
naersk.url = "github:nix-community/naersk";
poetry2nix.url = "github:nix-community/poetry2nix";
naersk = {
url = "github:nix-community/naersk";
inputs.nixpkgs.follows = "nixpkgs";
};
nixpkgs.url = "nixpkgs/nixos-23.11";
poetry2nix = {
url = "github:nix-community/poetry2nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, utils, naersk, poetry2nix }:
utils.lib.eachDefaultSystem (system: let
pkgs = nixpkgs.legacyPackages."${system}";
inherit (poetry2nix.lib.mkPoetry2Nix { inherit pkgs; }) mkPoetryApplication;
naersk-lib = naersk.lib."${system}";
rustComponents = naersk-lib.buildPackage {
@ -18,16 +26,16 @@
buildInputs = with pkgs; [
openssl
pkgconfig
pkg-config
sqlite
];
};
mysqlHelper = pkgs.poetry2nix.mkPoetryApplication {
mysqlHelper = mkPoetryApplication {
projectDir = ./datman-helper-mysql;
};
postgresHelper = pkgs.poetry2nix.mkPoetryApplication {
postgresHelper = mkPoetryApplication {
projectDir = ./datman-helper-postgres;
};

View File

@ -4,7 +4,7 @@ if [ $# -ge 1 ]
then
files=$*
else
files="testsuite/setup.py testsuite/datmantests testsuite/helpers testsuite/yamatests datman-helper-postgres/datman_helper_postgres datman-helper-postgres/setup.py datman-helper-mysql/datman_helper_mysql datman-helper-mysql/setup.py"
files="testsuite/setup.py testsuite/datmantests testsuite/helpers testsuite/yamatests datman-helper-postgres/datman_helper_postgres datman-helper-mysql/datman_helper_mysql"
fi
echo "Linting these locations: $files"

50
shell.nix Normal file
View File

@ -0,0 +1,50 @@
{ pkgs ? import <nixpkgs> {} }:
let
# We may need some packages from nixpkgs-unstable
#unstable = import <nixpkgs-unstable> {};
rust-toolchain = pkgs.symlinkJoin {
name = "rust-toolchain";
paths = [pkgs.rustc pkgs.cargo pkgs.rustfmt pkgs.rustPlatform.rustcSrc];
};
in
pkgs.mkShell {
buildInputs = [
rust-toolchain
pkgs.pkg-config
pkgs.alsa-lib
pkgs.sqlite
#pkgs.libclang # ??
];
nativeBuildInputs = [
pkgs.openssl
pkgs.python3
];
# Needed for bindgen when binding to avahi
LIBCLANG_PATH="${pkgs.llvmPackages_latest.libclang.lib}/lib";
# Cargo culted:
# Add to rustc search path
RUSTFLAGS = (builtins.map (a: ''-L ${a}/lib'') [
]);
# Add to bindgen search path
BINDGEN_EXTRA_CLANG_ARGS =
# Includes with normal include path
(builtins.map (a: ''-I"${a}/include"'') [
])
# Includes with special directory paths
++ [
''-I"${pkgs.llvmPackages_latest.libclang.lib}/lib/clang/${pkgs.llvmPackages_latest.libclang.version}/include"''
#''-I"${pkgs.glib.dev}/include/glib-2.0"''
#''-I${pkgs.glib.out}/lib/glib-2.0/include/''
];
}

View File

@ -251,7 +251,8 @@ kind = {{ stdout = "blahblah.txt" }}
seed = 7555
print(f"seed: {seed}")
rng.seed(seed)
# min_files is 8 because we need enough files to use each label for this test to succeed.
# min_files is 8 because we need enough files to use each label for this
# test to succeed.
initial_descriptor, _ = generate_random_dir(rng, src_path, 32, min_files=8)
labellings = generate_labels(initial_descriptor, rng)
save_labelling_rules(labelling_path.joinpath("srca.zst"), labellings)
@ -298,3 +299,81 @@ kind = {{ stdout = "blahblah.txt" }}
)
td.cleanup()
def test_backup_incremental_with_mid_delete(self):
td = TemporaryDirectory("test_backup_incremental_with_mid_delete")
tdpath = Path(td.name)
datman_path = tdpath.joinpath("datman")
src_path = datman_path.joinpath("srca")
yama_path = datman_path.joinpath("main")
set_up_simple_datman(datman_path)
set_up_simple_yama(yama_path)
rng = Random()
seed = rng.randint(0, 9001)
print(f"seed: {seed}")
rng.seed(seed)
initial_descriptor, _ = generate_random_dir(rng, src_path, 32)
print("storing")
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
# now mutate and store incremental
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
time.sleep(2)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
# now mutate and store incremental again!
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
mutated_descriptor = scan_dir(src_path)
time.sleep(2)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
pointer_names = [
line
for line in subprocess.check_output(("yama", "debug", "lsp"), cwd=yama_path)
.decode()
.split("\n")
if line
]
self.assertEqual(len(pointer_names), 3)
self.assertLess(pointer_names[0], pointer_names[1])
self.assertLess(pointer_names[1], pointer_names[2])
print(f"removing mid pointer {pointer_names[1]}")
subprocess.check_call(
("yama", "debug", "rmp", pointer_names[1]),
cwd=yama_path,
)
print("extracting last pointer to check still valid")
dest_path = tdpath.joinpath("desta")
subprocess.check_call(
(
"datman",
"extract",
"--skip-metadata",
"--accept-partial",
"main",
"../desta",
),
cwd=datman_path,
)
# this will be wrapped in a directory that starts with the name srca+
extracted_dir_descriptor_wrapper = scan_dir(dest_path)
contents = extracted_dir_descriptor_wrapper.contents
self.assertEqual(len(contents), 1)
key, value = next(iter(contents.items()))
self.assertTrue(key.startswith("srca+"))
self.assertIsInstance(value, DirectoryDescriptor)
key, value = next(iter(value.contents.items()))
self.assertEqual(key, "srca")
self.assertEqual(value.ignore_metadata(), mutated_descriptor.ignore_metadata())
td.cleanup()

View File

@ -1,6 +1,7 @@
import shutil
import subprocess
from pathlib import Path
from typing import Set
def set_up_simple_yama(path: Path):
@ -10,3 +11,13 @@ def set_up_simple_yama(path: Path):
"example_zstd.dict"
)
shutil.copyfile(example_zstd_path, path.joinpath("important_zstd.dict"))
def list_bloblog_ids(pile: Path) -> Set[int]:
result = set()
for p in pile.joinpath("bloblog").iterdir():
try:
result.add(int(p.name))
except ValueError:
pass
return result

View File

@ -22,7 +22,7 @@ REQUIRED = ["green", "attrs", "immutabledict"]
# What packages are optional?
EXTRAS = {"dev": ["black==21.7b0", "flake8==3.9.2", "isort==5.9.2"]}
EXTRAS = {"dev": ["black==22.10.0", "flake8==3.9.2", "isort==5.9.2"]}
# The rest you shouldn't have to touch too much :)
# ------------------------------------------------

View File

@ -0,0 +1,175 @@
import subprocess
from pathlib import Path
from random import Random
from tempfile import TemporaryDirectory
from unittest import TestCase
from helpers import (
DirectoryDescriptor,
generate_random_dir,
randomly_mutate_directory_in_descriptor,
scan_dir,
)
from helpers.datman_helpers import set_up_simple_datman
from helpers.yama_helpers import list_bloblog_ids, set_up_simple_yama
class TestYamaCompact(TestCase):
def test_compaction_merge_two_small_bloblogs(self):
td = TemporaryDirectory("test_check_fails_after_random_corruption")
tdpath = Path(td.name)
datman_path = tdpath.joinpath("datman")
src_path = datman_path.joinpath("srca")
yama_path = datman_path.joinpath("main")
set_up_simple_datman(datman_path)
set_up_simple_yama(yama_path)
rng = Random()
seed = rng.randint(0, 9001)
print(f"seed: {seed}")
rng.seed(seed)
later_expected_descriptor, _ = generate_random_dir(rng, src_path, 32)
# Back up twice: that way we should get at least two bloblogs!
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
old_bloblog_ids = list_bloblog_ids(yama_path)
self.assertGreater(
len(old_bloblog_ids), 1, "Should be many bloblogs at this point"
)
subprocess.check_call(
(
"yama",
"compact",
"--mergeable",
"2",
"--small",
str(2 * 1024 * 1024 * 1024),
),
cwd=yama_path,
)
new_bloblog_ids = list_bloblog_ids(yama_path)
self.assertEqual(
len(new_bloblog_ids), 1, "Should only be 1 bloblog at this point."
)
self.assertEqual(
list(new_bloblog_ids)[0],
max(old_bloblog_ids) + 1,
"New bloblog ID should be 1 greater than the max old one.",
)
def test_gc_then_compact(self):
td = TemporaryDirectory("test_gc_then_compact")
tdpath = Path(td.name)
datman_path = tdpath.joinpath("datman")
src_path = datman_path.joinpath("srca")
yama_path = datman_path.joinpath("main")
set_up_simple_datman(datman_path)
set_up_simple_yama(yama_path)
rng = Random()
seed = rng.randint(0, 9001)
print(f"seed: {seed}")
rng.seed(seed)
initial_descriptor, _ = generate_random_dir(rng, src_path, 32)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
orig_pointer_name = (
subprocess.check_output(("yama", "debug", "lsp"), cwd=yama_path)
.decode()
.split("\n")[0]
)
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
mutated_descriptor = scan_dir(src_path)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
old_bloblog_ids = list_bloblog_ids(yama_path)
# Try a GC and check that it's a no-op
subprocess.check_call(
("yama", "check", "--shallow", "--apply-gc"), cwd=yama_path
)
subprocess.check_call(
(
"yama",
"compact",
"--mergeable",
"2000",
"--reclaim",
"1",
"--max-dealloc",
"1",
),
cwd=yama_path,
)
unchanged_bloblog_ids = list_bloblog_ids(yama_path)
self.assertEqual(
old_bloblog_ids,
unchanged_bloblog_ids,
"No GC: no compaction should have happened.",
)
subprocess.check_call(
("yama", "debug", "rmp", orig_pointer_name), cwd=yama_path
)
# Try a GC and check that it did something
subprocess.check_call(
("yama", "check", "--shallow", "--apply-gc"), cwd=yama_path
)
subprocess.check_call(
(
"yama",
"compact",
"--mergeable",
"2000",
"--reclaim",
"1",
"--max-dealloc",
"1",
),
cwd=yama_path,
)
new_bloblog_ids = list_bloblog_ids(yama_path)
self.assertNotEqual(
old_bloblog_ids, new_bloblog_ids, "GC: compaction should have happened."
)
# Check that we can still extract the files!
dest_path = tdpath.joinpath("desta")
subprocess.check_call(
(
"datman",
"extract",
"--skip-metadata",
"--accept-partial",
"main",
"../desta",
),
cwd=datman_path,
)
extracted_dir_descriptor_wrapper = scan_dir(dest_path)
contents = extracted_dir_descriptor_wrapper.contents
self.assertEqual(len(contents), 1)
key, value = next(iter(contents.items()))
self.assertTrue(key.startswith("srca+"))
self.assertIsInstance(value, DirectoryDescriptor)
key, value = next(iter(value.contents.items()))
self.assertEqual(key, "srca")
self.assertEqual(value.ignore_metadata(), mutated_descriptor.ignore_metadata())
td.cleanup()

View File

@ -29,7 +29,8 @@ use yama::operations::checking::VacuumMode;
use yama::operations::legacy_pushpull::{
determine_bypass_level, open_pile_with_work_bypass, push_to,
};
use yama::operations::{checking, extracting};
use yama::operations::{checking, cleanup, extracting};
use yama::pile::local_sqlitebloblogs::CompactionThresholds;
use yama::pile::{Pile, PileDescriptor, RawPile};
use yama::{commands, debug};
@ -56,8 +57,9 @@ enum PileCommand {
pointer_name: String,
/// Limited expression(s) of files to retrieve.
/// LIMITATION OF CURRENT VERSION: ONLY ONE EXACT PATH ALLOWED, PLEASE.
#[clap(short, long)]
subset: Vec<PathBuf>,
subset: Option<String>,
destination: PathBuf,
@ -82,6 +84,29 @@ enum PileCommand {
shallow: bool,
},
Compact {
/// Don't actually perform any compaction; just plan it out.
#[clap(long)]
dry_run: bool,
/// Allocated size under which a bloblog is considered small.
#[clap(long = "small")]
small_thresh: Option<u64>,
/// Minimum amount of space to reclaim in order to run compaction for reclaim.
#[clap(long = "reclaim")]
min_reclaim: Option<u64>,
/// Maximum amount of space that can be deallocated in a bloblog before we consider it
/// worthwhile to replace.
#[clap(long = "max-dealloc")]
max_deallocated: Option<u64>,
/// Minimum number of mergeable small bloblogs in order to run compaction for merge.
#[clap(long)]
mergeable: Option<u32>,
},
/// Enter a debug prompt for manually operating on the yama pile.
Debug { supplied_command: Vec<String> },
@ -118,7 +143,7 @@ fn wrapped_main() -> anyhow::Result<i32> {
match &opts.command {
PileCommand::Retrieve {
pointer_name,
subset: _,
subset,
destination,
num_workers: workers,
} => {
@ -136,10 +161,25 @@ fn wrapped_main() -> anyhow::Result<i32> {
fully_integrate_pointer_node(&pile, &mut root_tree_node.node, &mut pointer)?;
let mut node_to_extract = &mut root_tree_node.node;
if let Some(subset) = subset {
for path_to_descend in subset.split('/').filter(|s| !s.is_empty()) {
match node_to_extract.child(path_to_descend) {
Ok(new_node) => {
node_to_extract = new_node;
}
Err(msg) => {
bail!("Can't descend into {path_to_descend:?}: {msg}");
}
}
}
}
// todo allow disabling apply metadata
extracting::extract(
destination,
&mut root_tree_node.node,
node_to_extract,
&pile,
true,
workers.unwrap_or(2),
@ -175,6 +215,29 @@ fn wrapped_main() -> anyhow::Result<i32> {
return Ok(1);
}
}
PileCommand::Compact {
dry_run,
small_thresh,
min_reclaim,
max_deallocated,
mergeable,
} => {
let this_dir = Path::new(".");
let descriptor =
load_pile_descriptor(this_dir).context("Failed to load pile descriptor")?;
cleanup::compact(
this_dir,
&descriptor,
!*dry_run,
true,
CompactionThresholds {
minimum_to_reclaim: min_reclaim.unwrap_or(2 * 1024 * 1024 * 1024),
minimum_small_bloblogs_to_merge: mergeable.unwrap_or(64),
cond_if_more_deallocated_than: max_deallocated.unwrap_or(256 * 1024 * 1024),
cond_if_less_allocated_than: small_thresh.unwrap_or(64 * 1024 * 1024),
},
)?;
}
PileCommand::Init {} => {
commands::init(".".as_ref())?;
}

View File

@ -15,13 +15,12 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
use crate::commands::retrieve_tree_node;
use crate::definitions::{FilesystemOwnership, FilesystemPermissions, TreeNode};
use crate::operations::remove_pointer_safely;
use crate::pile::{Pile, PileDescriptor, RawPile};
use crate::tree::integrate_node_in_place;
use anyhow::anyhow;
use clap::Parser;
use log::info;
use rustyline::error::ReadlineError;
use rustyline::Editor;
@ -123,50 +122,7 @@ pub fn debug_command<RP: RawPile>(
}
}
DebugCommand::DeletePointer { name } => {
// retrieve this pointer
let mut this_pointer = pile.read_pointer(name.as_str())?.ok_or_else(|| {
anyhow!("Pointer {:?} does not exist so can not be deleted.", name)
})?;
let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())?;
// fully integrate the pointer
fully_integrate_pointer_node(&pile, &mut this_node.node, &mut this_pointer)?;
assert!(this_pointer.parent_pointer.is_none());
// now integrate any pointers that rely on this one
// so that they no longer rely on this one.
for pointer in pile.list_pointers()?.iter() {
if pointer == name {
continue;
}
if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
if parent_pointer == name {
info!("Pointer is now an orphan: {:?}", pointer);
// need to integrate this node, so retrieve it
let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
// integrate it in-place
integrate_node_in_place(&mut node.node, &this_node.node)?;
// mark it as orphaned (no parent)
pointer_data.parent_pointer = None;
// store the orphaned node
let new_chunk_ref = store_tree_node(&pile, &node)?;
// associate the orphaned node with the orphaned pointer
pointer_data.chunk_ref = new_chunk_ref;
// write the pointer back.
pile.write_pointer(pointer.as_str(), &pointer_data)?;
}
}
}
}
// then delete the pointer
pile.delete_pointer(name)?;
info!("Deleted pointer: {:?}", name);
remove_pointer_safely(pile, name)?;
}
DebugCommand::PointerInfo { name } => {
let this_pointer = pile

View File

@ -270,6 +270,19 @@ impl TreeNode {
}
}
}
/// Recurses into a child by name, or returns Err with a reason.
pub fn child(&mut self, name: &str) -> Result<&mut TreeNode, &'static str> {
match self {
TreeNode::NormalFile { .. } => Err("not a directory: normal file"),
TreeNode::Directory { children, .. } => match children.get_mut(name) {
None => Err("child not in directory"),
Some(node) => Ok(node),
},
TreeNode::SymbolicLink { .. } => Err("not a directory: symlink"),
TreeNode::Deleted => Err("not a directory: deleted"),
}
}
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@ -1,4 +1,80 @@
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
use crate::pile::{Pile, RawPile};
use crate::tree::{differentiate_node_in_place, integrate_node_in_place};
use anyhow::{anyhow, Context};
use log::info;
pub mod checking;
pub mod cleanup;
pub mod extracting;
pub mod legacy_pushpull;
pub mod storing;
pub fn remove_pointer_safely<P: RawPile>(pile: &Pile<P>, name: &str) -> anyhow::Result<()> {
// retrieve this pointer
let mut this_pointer = pile
.read_pointer(name)?
.ok_or_else(|| anyhow!("Pointer {:?} does not exist so can not be deleted.", name))?;
let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())
.context("retrieving 'this' node")?;
let new_parent_name = this_pointer.parent_pointer.clone();
fully_integrate_pointer_node(pile, &mut this_node.node, &mut this_pointer)
.context("integrating new parent")?;
let new_parent = if let Some(ref new_parent_name) = new_parent_name {
let mut new_parent_pointer = pile
.read_pointer(new_parent_name.as_str())?
.ok_or_else(|| anyhow!("Parent pointer {:?} does not exist.", name))?;
let mut new_parent_node = retrieve_tree_node(&pile, new_parent_pointer.chunk_ref.clone())?;
fully_integrate_pointer_node(pile, &mut new_parent_node.node, &mut new_parent_pointer)?;
Some((new_parent_pointer, new_parent_node))
} else {
None
};
// now integrate any pointers that rely on this one
// so that they no longer rely on this one.
for pointer in pile.list_pointers()?.iter() {
if pointer == name {
continue;
}
if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
if parent_pointer == name {
info!("Pointer would be orphaned: {:?}; integrating", pointer);
// need to integrate this node, so retrieve it
let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
// integrate it in-place
integrate_node_in_place(&mut node.node, &this_node.node)?;
if let Some((_, ref new_parent_node)) = new_parent {
// then differentiate with respect to the NEW parent
differentiate_node_in_place(&mut node.node, &new_parent_node.node)?;
}
// pass through the parent
pointer_data.parent_pointer = new_parent_name.clone();
// store the updated version of the pointer
let new_chunk_ref = store_tree_node(&pile, &node)?;
// associate the new node with the new version of the pointer
pointer_data.chunk_ref = new_chunk_ref;
// write the pointer back.
pile.write_pointer(pointer.as_str(), &pointer_data)?;
// we must flush chunks before deleting the pointer
pile.flush()
.context("flushing after writing pointer back")?;
}
}
}
}
// then delete the pointer
pile.delete_pointer(name)?;
info!("Deleted pointer: {:?}", name);
Ok(())
}

View File

@ -24,8 +24,10 @@ use crate::pile::{
use anyhow::bail;
use crossbeam_channel::Sender;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use itertools::Itertools;
use log::{error, info, warn};
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::{Read, Write};
use std::sync::Mutex;
@ -110,6 +112,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
self.underlying.delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.underlying.delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,
@ -407,9 +413,21 @@ pub fn check_shallow<RP: RawPile>(
// actually do the vacuum!
info!("Going to vacuum them up.");
for vacuum_id in to_vacuum {
pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
pbar.inc(1);
for vacuum_ids_chunk in to_vacuum
.into_iter()
.chunks(512)
.into_iter()
.map(|c| c.collect::<Vec<ChunkId>>())
{
pile.raw_pile.delete_many(
Keyspace::Chunk,
vacuum_ids_chunk
.iter()
.map(|ci| ci.as_slice())
.collect::<Vec<&[u8]>>()
.as_slice(),
)?;
pbar.inc(vacuum_ids_chunk.len().try_into().unwrap());
}
pile.flush()?;
pbar.finish_and_clear();

View File

@ -0,0 +1,64 @@
use crate::pile::local_sqlitebloblogs::{CompactionThresholds, SqliteBloblogPile};
use crate::pile::{PileDescriptor, PileStorage};
use anyhow::{bail, Context};
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::info;
use std::path::Path;
pub fn compact(
pile_path: &Path,
pile_desc: &PileDescriptor,
actually_run: bool,
make_progress_bar: bool,
thresholds: CompactionThresholds,
) -> anyhow::Result<()> {
let pbar = if make_progress_bar {
ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
} else {
ProgressBar::hidden()
};
pbar.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
);
pbar.set_message("compacting");
match pile_desc.storage {
PileStorage::SqliteIndexedBloblog => {
let bloblog_pile = SqliteBloblogPile::open(&pile_path)
.context("Failed to open SQLite-indexed Bloblog Pile")?;
compact_bloblogs(bloblog_pile, pbar, actually_run, thresholds)?;
Ok(())
}
other @ PileStorage::RemoteOnly => {
bail!("Cannot use compaction on this kind of pile: {other:?}!");
}
}
}
fn compact_bloblogs(
bloblog_pile: SqliteBloblogPile,
pbar: ProgressBar,
actually_run: bool,
thresholds: CompactionThresholds,
) -> anyhow::Result<()> {
info!("=== Analysing for compaction ===");
let analysis = bloblog_pile.analyse_for_compaction()?;
let chunks_total: u64 = analysis.values().map(|bs| bs.chunks_total).sum();
let chunks_deleted: u64 = analysis.values().map(|bs| bs.chunks_deleted).sum();
let bytes_total: u64 = analysis.values().map(|bs| bs.bytes_total).sum();
let bytes_deleted: u64 = analysis.values().map(|bs| bs.bytes_deleted).sum();
info!("{} bloblogs in this pile, with {chunks_total} chunks ({bytes_total} B) of which {chunks_deleted} ({bytes_deleted} B) are deleted.", analysis.len());
info!("=== Planning compaction ===");
let plan = bloblog_pile.plan_compaction(&thresholds, analysis)?;
info!("Planned compaction: replace {} bloblogs (of which {} are small), freeing up {} B and rewriting {} B", plan.bloblogs_to_replace.len(), plan.small_bloblogs, plan.reclaimable_space, plan.bytes_to_write);
if actually_run {
info!("=== Compacting ===");
bloblog_pile.perform_compaction(Box::new(pbar), plan)?;
}
Ok(())
}

View File

@ -141,6 +141,7 @@ pub trait RawPile: Send + Sync + Debug + 'static {
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()>;
fn delete_many(&self, kind: Keyspace, key: &[&[u8]]) -> anyhow::Result<()>;
fn list_keys(
&self,
kind: Keyspace,
@ -186,6 +187,9 @@ impl RawPile for Box<dyn RawPile> {
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
self.as_ref().delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.as_ref().delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,
@ -233,6 +237,9 @@ impl<RP: RawPile> RawPile for Arc<RP> {
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
self.as_ref().delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.as_ref().delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,

View File

@ -83,6 +83,10 @@ impl<R: Clone + RawPile> RawPile for PileGuard<R> {
bail!("Access denied");
}
fn delete_many(&self, _kind: Keyspace, _keys: &[&[u8]]) -> anyhow::Result<()> {
bail!("Access denied");
}
fn list_keys(
&self,
_kind: Keyspace,

View File

@ -278,6 +278,10 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
self.underlying.delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.underlying.delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,

View File

@ -101,6 +101,10 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
self.underlying.delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.underlying.delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,

View File

@ -98,6 +98,10 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
self.underlying.delete(kind, key)
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
self.underlying.delete_many(kind, keys)
}
fn list_keys(
&self,
kind: Keyspace,

View File

@ -15,19 +15,23 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::{HashMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fs::{read_dir, File, OpenOptions};
use std::fs::{read_dir, remove_file, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::{fs, thread};
use anyhow::{bail, Context};
use anyhow::{bail, ensure, Context};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use crossbeam_channel::{Receiver, Sender};
use log::{info, warn};
use nix::unistd::sync;
use rusqlite::{params, Error, ErrorCode, Transaction};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
use rusqlite::{params, Error, ErrorCode, Transaction, TransactionBehavior, NO_PARAMS};
use rusqlite::{Connection, OptionalExtension};
use crate::definitions::ChunkId;
@ -35,10 +39,8 @@ use crate::pile::{
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
StoragePipelineSettings,
};
use crate::progress::ProgressTracker;
use crate::utils::{bytes_to_hexstring, LruMap};
use crossbeam_channel::{Receiver, Sender};
use rusqlite::ffi::ErrorCode::ConstraintViolation;
use std::time::Duration;
/// Bloblogs will not be reused if they are already 2 GiB large.
pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
@ -49,6 +51,11 @@ pub const POINTER_WRITE_BATCHES: usize = 2048;
/// This many bloblogs will be kept open for reading, at maximum.
pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
/// Size of a blob header within a bloblog.
/// 32 byte Chunk Id
/// 4 byte (u32) Blob size
pub const BLOB_HEADER_SIZE: u64 = 32 + 4;
/// A file storing a log of blobs.
/// Format:
/// Repeated:
@ -371,6 +378,31 @@ impl SqliteBloblogPile {
.optional()?)
}
fn get_chunk_pointers(
&self,
chunk_ids: &[&[u8]],
) -> anyhow::Result<Vec<Option<BloblogPointer>>> {
let mut inner = self.inner.lock().unwrap();
let txn = inner.connection.transaction()?;
let mut result = Vec::with_capacity(chunk_ids.len());
{
let mut stmt = txn.prepare("SELECT bloblog, offset FROM chunks WHERE chunk_id = ?1")?;
for &chunk_id in chunk_ids {
let bloglog_pointer: Option<BloblogPointer> = stmt
.query_row(params![chunk_id], |row| {
Ok(BloblogPointer {
bloblog: row.get(0)?,
offset: row.get::<_, i64>(1)? as u64,
})
})
.optional()?;
result.push(bloglog_pointer);
}
}
txn.commit()?;
Ok(result)
}
fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
@ -496,6 +528,341 @@ impl SqliteBloblogPile {
assert!(pointers_buffered.is_empty());
Ok(())
}
/// Look at the bloblogs in this pile and see where space may be reclaimable if we were to
/// compact.
///
/// Next step: plan_compaction
pub fn analyse_for_compaction(&self) -> anyhow::Result<BTreeMap<BloblogId, BloblogStats>> {
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
SELECT bloblog, COUNT(c.offset), COUNT(d.offset), SUM(COALESCE(d.size, 0))
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
GROUP BY bloblog
",
)?;
struct UnpopulatedBloblogStats {
pub bloblog_id: BloblogId,
pub chunks_total: u64,
pub chunks_deleted: u64,
pub bytes_deleted: u64,
}
let unpopul_bloblog_stats = stmt.query_map(NO_PARAMS, |row| {
Ok(UnpopulatedBloblogStats {
bloblog_id: row.get(0)?,
chunks_total: row.get::<_, i64>(1)?.try_into().expect("i64 -> u64"),
chunks_deleted: row.get::<_, i64>(2)?.try_into().expect("i64 -> u64"),
bytes_deleted: row.get::<_, i64>(3)?.try_into().expect("i64 -> u64"),
})
})?;
let mut final_stats = BTreeMap::new();
for unpopul_stat in unpopul_bloblog_stats {
let UnpopulatedBloblogStats {
bloblog_id,
chunks_total,
chunks_deleted,
bytes_deleted,
} = unpopul_stat?;
let bloblog_path = self.path.join(&bloblog_id.to_string());
let bytes_total = std::fs::metadata(&bloblog_path)
.with_context(|| format!("Failed to get metadata for bloblog: {:?}", bloblog_path))?
.size();
final_stats.insert(
bloblog_id,
BloblogStats {
chunks_total,
chunks_deleted,
bytes_total,
// Add a slight correction since we can count the blob headers of deleted blobs
// as deleted.
bytes_deleted: bytes_deleted + chunks_deleted * BLOB_HEADER_SIZE,
},
);
}
Ok(final_stats)
}
/// Look at the analysis of compaction and, using the specified thresholds, come up with a plan
/// to perform compaction.
///
/// May return an empty plan if compaction isn't worthwhile.
///
/// Previous step: analyse_for_compaction
/// Next step: perform_compaction
pub fn plan_compaction(
&self,
thresholds: &CompactionThresholds,
analysis: BTreeMap<BloblogId, BloblogStats>,
) -> anyhow::Result<CompactionPlan> {
let bloblogs_to_replace: BTreeMap<BloblogId, BloblogStats> = analysis
.into_iter()
.filter(|(_id, stats)| thresholds.should_replace_bloblog(stats))
.collect();
let reclaimable_space: u64 = bloblogs_to_replace
.values()
.map(|bs| bs.bytes_deleted)
.sum();
let bytes_to_write: u64 = bloblogs_to_replace
.values()
.map(|bs| bs.bytes_total - bs.bytes_deleted)
.sum();
let small_bloblogs: u32 = bloblogs_to_replace
.values()
.filter(|bs| bs.bytes_total - bs.bytes_deleted < thresholds.cond_if_less_allocated_than)
.count() as u32;
if reclaimable_space < thresholds.minimum_to_reclaim
&& small_bloblogs < thresholds.minimum_small_bloblogs_to_merge
{
// Nothing worth doing: return an empty plan.
return Ok(CompactionPlan {
bloblogs_to_replace: Default::default(),
bytes_to_write: 0,
reclaimable_space: 0,
small_bloblogs: 0,
});
}
Ok(CompactionPlan {
bloblogs_to_replace: bloblogs_to_replace.keys().copied().collect(),
bytes_to_write,
reclaimable_space,
small_bloblogs,
})
}
/// Given a compaction plan, perform the compaction.
/// There shouldn't be any decisions left to be made at this point: just action.
///
/// TODO flock the bloblogs to be removed and make readers and writers also flock them too.
///
/// TODO find a way to deal with bloblogs that are entirely unreferenced from the index
/// (e.g. bloblogs that weren't written properly, e.g. if compaction fails.)
pub fn perform_compaction(
&self,
mut progress: Box<dyn ProgressTracker>,
plan: CompactionPlan,
) -> anyhow::Result<()> {
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ReplacedBlobRow {
pub old_bloblog: BloblogId,
pub old_offset: u64,
pub chunk_id: ChunkId,
}
if plan.bloblogs_to_replace.is_empty() {
info!("No compaction to be done.");
return Ok(());
}
let mut to_preserve = BTreeSet::new();
let mut replacements = BTreeMap::new();
progress.set_max_size(plan.bytes_to_write);
// First find all the blobs we need to replace.
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
SELECT chunk_id, c.offset
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
WHERE bloblog = ?1 AND d.offset IS NULL
",
)?;
for bloblog in plan.bloblogs_to_replace.iter().copied() {
to_preserve.extend(
stmt.query_map([bloblog], |row| {
let mut chunk_id = ChunkId::default();
chunk_id.copy_from_slice(row.get::<_, Vec<u8>>(0).unwrap().as_slice());
Ok(ReplacedBlobRow {
old_bloblog: bloblog,
chunk_id,
old_offset: row.get::<_, i64>(1).unwrap().try_into().unwrap(),
})
})?
.collect::<Result<Vec<ReplacedBlobRow>, _>>()?,
);
}
}
// Then make the replacements
info!("Rewriting bloblogs...");
let mut buf = Vec::new();
let mut iterator = to_preserve.into_iter();
loop {
let (new_bloblog_id, bloglog_mutex) = self.get_writing_bloblog()?;
let mut new_bloblog = bloglog_mutex.lock().expect("Failed to lock bloblog?");
let mut is_more = false;
while let Some(preserve) = iterator.next() {
is_more = true;
// Get hold of the old bloblog
let old_bloblog = self.open_bloblog(preserve.old_bloblog)?;
let mut old_bloblog = old_bloblog.lock().unwrap();
// Transfer the blob
buf.clear();
old_bloblog.read_blob(preserve.old_offset, &preserve.chunk_id, &mut buf)?;
let new_offset = new_bloblog.write_blob(&preserve.chunk_id, &buf)?;
// Make a note of the replacement
replacements.insert(
preserve,
BloblogPointer {
bloblog: new_bloblog_id,
offset: new_offset,
},
);
progress.inc_progress(buf.len() as u64);
if new_bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE {
// get a new bloblog to write with.
break;
}
}
drop(new_bloblog);
self.return_writing_bloblog(new_bloblog_id, bloglog_mutex)?;
if !is_more {
break;
}
}
info!("Applying replacements...");
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let mut stmt = txn.prepare(
"
UPDATE chunks
SET bloblog = ?1, offset = ?2
WHERE chunk_id = ?3
",
)?;
for (replacement_row, new_pos) in replacements {
ensure!(
stmt.execute(params![
new_pos.bloblog,
new_pos.offset as i64,
&replacement_row.chunk_id as &[u8]
])? == 1,
"Wrong number of rows updated for replacement!"
);
}
drop(stmt);
txn.commit().context("committing replacements")?;
}
// TODO fsync new bloblogs
info!("Deleting old bloblogs...");
{
let mut inner = self.inner.lock().unwrap();
// Lock the database right away.
let txn = inner
.connection
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
for bloblog_id in plan.bloblogs_to_replace.iter().copied() {
let deleted_chunks = txn.execute(
"
DELETE FROM chunks WHERE bloblog = ?1
",
params![bloblog_id],
)?;
let deleted_deleted = txn.execute(
"
DELETE FROM deleted WHERE bloblog = ?1
",
params![bloblog_id],
)?;
ensure!(deleted_chunks == deleted_deleted, "Undeleted chunks left in bloblog {bloblog_id}: CHUNKS={deleted_chunks} DELETED={deleted_deleted}");
let bloblog_path = self.path.join(bloblog_id.to_string());
remove_file(&bloblog_path).with_context(|| {
format!("Failed to remove obsolete bloblog: {:?}", bloblog_path)
})?;
}
txn.commit()?;
}
Ok(())
}
}
pub struct BloblogStats {
pub chunks_total: u64,
pub chunks_deleted: u64,
pub bytes_total: u64,
pub bytes_deleted: u64,
}
pub struct CompactionPlan {
pub bloblogs_to_replace: BTreeSet<BloblogId>,
pub bytes_to_write: u64,
pub reclaimable_space: u64,
pub small_bloblogs: u32,
}
pub struct CompactionThresholds {
/// Minimum bytes to be reclaimable overall for compaction to be worthwhile.
pub minimum_to_reclaim: u64,
/// (alternative reason) Minimum number of files to be undersized in order for compaction
/// to be worthwhile.
/// This gives us a way to make compaction run if we have lots of tiny bloblogs.
pub minimum_small_bloblogs_to_merge: u32,
/// A bloblog will be replaced if the deallocated size is greater than this.
pub cond_if_more_deallocated_than: u64,
/// A bloblog will be replaced if the allocated size is less than this.
pub cond_if_less_allocated_than: u64,
}
impl CompactionThresholds {
pub fn should_replace_bloblog(&self, bloblog_stats: &BloblogStats) -> bool {
let allocated = bloblog_stats.bytes_total - bloblog_stats.bytes_deleted;
// Note that this will also trigger for fully-deallocated files if
let is_small = allocated < self.cond_if_less_allocated_than;
let has_large_deallocations =
bloblog_stats.bytes_deleted > self.cond_if_more_deallocated_than;
is_small || has_large_deallocations
}
}
pub struct CompactionOutcome {
pub bloblogs_deleted: u32,
pub bloblogs_created: u32,
pub bytes_deleted: u32,
pub bytes_created: u32,
}
impl Drop for SqliteBloblogPile {
@ -640,6 +1007,59 @@ impl RawPile for SqliteBloblogPile {
}
}
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
match kind {
Keyspace::Chunk => {
let mut chunk_pointers_by_bloblog: BTreeMap<BloblogId, Vec<(u64, &[u8])>> =
BTreeMap::new();
for (chunk_pointer, chunk_id) in self
.get_chunk_pointers(keys)
.context("failed to get chunk pointers")?
.into_iter()
.zip(keys)
.filter_map(|(pointer, &chunk_id)| match pointer {
Some(pointer) => Some((pointer, chunk_id)),
None => None,
})
{
chunk_pointers_by_bloblog
.entry(chunk_pointer.bloblog)
.or_default()
.push((chunk_pointer.offset, chunk_id));
}
let mut inner = self.inner.lock().unwrap();
let txn = inner.connection.transaction()?;
{
let mut stmt = txn.prepare(
"INSERT OR IGNORE INTO deleted (bloblog, offset, size)
VALUES (?1, ?2, ?3)",
)?;
for (bloblog_id, entries) in chunk_pointers_by_bloblog {
let bloblog_mutex = self.open_bloblog(bloblog_id)?;
let mut bloblog = bloblog_mutex.lock().unwrap();
for (chunk_offset, raw_chunk_id) in entries {
let mut chunk_id: ChunkId = Default::default();
chunk_id.copy_from_slice(raw_chunk_id);
let size = bloblog.blob_len(chunk_offset, &chunk_id)?;
let offset_i64 = i64::try_from(chunk_offset)
.expect("ouch! can't turn u64 into i64...");
stmt.execute(params![bloblog_id, offset_i64, size])?;
}
}
}
txn.commit().context("Failed to commit chunk deletions")?;
}
_ => {
for &key in keys {
self.delete(kind, key)?;
}
}
}
Ok(())
}
fn list_keys(
&self,
kind: Keyspace,
@ -809,9 +1229,10 @@ impl Iterator for KeyIterator {
#[cfg(test)]
mod tests {
use crate::pile::local_sqlitebloblogs::Bloblog;
use temp_dir::TempDir;
use crate::pile::local_sqlitebloblogs::Bloblog;
#[test]
pub fn bloblog_read_write_test() {
let td = TempDir::new().unwrap();

View File

@ -307,6 +307,12 @@ impl RawPile for Requester {
other => Err(anyhow!("Received {:?} for Delete", other)),
}
}
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
for &key in keys {
self.delete(kind, key)?;
}
Ok(())
}
fn list_keys(
&self,
kind: Keyspace,

View File

@ -185,7 +185,7 @@ pub fn differentiate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow
/// result is in-place.
///
/// Preconditions:
/// - `old` must be an integrated pointer.
/// - `old` must be an integrated pointer. (Otherwise this algorithm is not correct.)
/// - `old` is the parent of `new`
pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow::Result<()> {
if let TreeNode::Directory { children, .. } = new {