Compare commits
32 Commits
v0.6.0-alp
...
develop
Author | SHA1 | Date | |
---|---|---|---|
565c99cf8c | |||
b57dbad890 | |||
9001177143 | |||
c9d64b2962 | |||
50ff9bb36a | |||
7e41408815 | |||
4072c5ae82 | |||
d3fe111a06 | |||
6e1e173cb6 | |||
fcc79ca95d | |||
c1de1341ef | |||
e85c606c95 | |||
34c619ef41 | |||
b9dce3ddfc | |||
52202874f2 | |||
69656131af | |||
cc93997230 | |||
41248fe396 | |||
e7eb9ef288 | |||
b5e9e55cad | |||
cf502b7f7e | |||
58c5c3f039 | |||
30b261d172 | |||
0811c11c48 | |||
aa2722607e | |||
8612804298 | |||
080875bfce | |||
098895d913 | |||
bd5e18bc9f | |||
e25e92b273 | |||
4aa1948350 | |||
ee9ca73224 |
@ -5,28 +5,19 @@ platform: linux/amd64
|
||||
|
||||
pipeline:
|
||||
unitTests:
|
||||
image: "docker.bics.ga/rei_ci/rust-sccache:latest-amd64"
|
||||
image: "rust:1.65.0"
|
||||
pull: true
|
||||
commands:
|
||||
- DEBIAN_FRONTEND=noninteractive apt-get -qq update > /dev/null
|
||||
- DEBIAN_FRONTEND=noninteractive apt-get -yqq install pkg-config libssl-dev build-essential libsqlite3-dev > /dev/null
|
||||
- cargo build --all
|
||||
- cargo test --all
|
||||
- sccache --show-stats
|
||||
environment:
|
||||
RUSTC_WRAPPER: /usr/local/bin/sccache
|
||||
SCCACHE_S3_USE_SSL: "true"
|
||||
SCCACHE_ENDPOINT: "richie.m4.tanukitsu.net:443"
|
||||
secrets:
|
||||
- sccache_bucket
|
||||
- aws_access_key_id
|
||||
- aws_secret_access_key
|
||||
when:
|
||||
event: [push, pull_request]
|
||||
|
||||
|
||||
testSuite:
|
||||
image: "docker.bics.ga/rei_ci/rust-sccache:latest-amd64"
|
||||
image: "rust:1.65.0"
|
||||
commands:
|
||||
- DEBIAN_FRONTEND=noninteractive apt-get -qq update > /dev/null
|
||||
- DEBIAN_FRONTEND=noninteractive apt-get -yqq -o=Dpkg::Use-Pty=0 install pkg-config libssl-dev build-essential libsqlite3-dev python3.9 python3.9-venv postgresql postgresql-client mariadb-server mariadb-client zstd lz4 > /dev/null
|
||||
@ -41,20 +32,11 @@ pipeline:
|
||||
- python3.9 -m venv testsuite/.venv
|
||||
- ./testsuite/.venv/bin/pip install ./testsuite ./datman-helper-postgres ./datman-helper-mysql
|
||||
- cd testsuite && . .venv/bin/activate && TEST_POSTGRES=$(hostname),testsuitedb,root TEST_MYSQL=$(hostname),testsuitemydb,root green
|
||||
- sccache --show-stats
|
||||
environment:
|
||||
RUSTC_WRAPPER: /usr/local/bin/sccache
|
||||
SCCACHE_S3_USE_SSL: "true"
|
||||
SCCACHE_ENDPOINT: "richie.m4.tanukitsu.net:443"
|
||||
secrets:
|
||||
- sccache_bucket
|
||||
- aws_access_key_id
|
||||
- aws_secret_access_key
|
||||
when:
|
||||
event: [push, pull_request]
|
||||
|
||||
deployManual:
|
||||
image: "docker.bics.ga/rei_ci/mdbook:latest-amd64"
|
||||
image: "docker.emunest.net/rei_ci/mdbook:latest-amd64"
|
||||
pull: true
|
||||
when:
|
||||
branch:
|
||||
|
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -404,7 +404,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datman"
|
||||
version = "0.6.0-alpha.4"
|
||||
version = "0.6.0-alpha.5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-interner",
|
||||
@ -1733,7 +1733,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "yama"
|
||||
version = "0.6.0-alpha.4"
|
||||
version = "0.6.0-alpha.5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"blake",
|
||||
|
@ -39,10 +39,7 @@ def cli():
|
||||
# The process (if any) that is our LZ4 decompressor.
|
||||
lz4_process = None
|
||||
|
||||
dump_command = [
|
||||
"pg_dump",
|
||||
database_to_use
|
||||
]
|
||||
dump_command = ["pg_dump", database_to_use]
|
||||
|
||||
if host_to_use is not None:
|
||||
if use_lz4:
|
||||
@ -63,21 +60,19 @@ def cli():
|
||||
# (rather than lz4 covering it).
|
||||
command = [
|
||||
"ssh",
|
||||
f"{user_to_use}@{host_to_use}" if user_to_use is not None else f"{host_to_use}",
|
||||
f"{user_to_use}@{host_to_use}"
|
||||
if user_to_use is not None
|
||||
else f"{host_to_use}",
|
||||
"bash",
|
||||
"-o",
|
||||
"pipefail",
|
||||
"-c",
|
||||
shlex.quote(" ".join(dump_command))
|
||||
shlex.quote(" ".join(dump_command)),
|
||||
]
|
||||
elif user_to_use is not None:
|
||||
current_username = pwd.getpwuid(os.getuid()).pw_name
|
||||
if current_username != user_to_use:
|
||||
command = [
|
||||
"sudo",
|
||||
"-u",
|
||||
user_to_use
|
||||
] + dump_command
|
||||
command = ["sudo", "-u", user_to_use] + dump_command
|
||||
else:
|
||||
command = dump_command
|
||||
else:
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "datman"
|
||||
version = "0.6.0-alpha.4"
|
||||
version = "0.6.0-alpha.5"
|
||||
authors = ["Olivier 'reivilibre' <olivier@librepush.net>"]
|
||||
edition = "2021"
|
||||
repository = "https://bics.ga/reivilibre/yama"
|
||||
@ -30,7 +30,7 @@ humansize = "1.1.1"
|
||||
chrono = "0.4.19"
|
||||
itertools = "0.10.1"
|
||||
hostname = "0.3.1"
|
||||
yama = { path = "../yama", version = "0.6.0-alpha.1" }
|
||||
yama = { path = "../yama", version = "0.6.0-alpha.5" }
|
||||
metrics = "0.17.1"
|
||||
bare-metrics-recorder = { version = "0.1.0" }
|
||||
comfy-table = "6.0.0-rc.1"
|
||||
|
@ -8,5 +8,6 @@ Features:
|
||||
* (optional) Compression using Zstd and a specifiable dictionary
|
||||
* (optional) Encryption
|
||||
* Ability to back up to remote machines over SSH
|
||||
* Labelling of files in a backup source; different destinations can choose to backup either all or a subset of the labels.
|
||||
|
||||
See the documentation for more information.
|
||||
|
@ -28,6 +28,7 @@ use bare_metrics_recorder::recording::BareMetricsRecorderCore;
|
||||
use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
|
||||
use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination};
|
||||
use datman::commands::ilabel::interactive_labelling_session;
|
||||
use datman::commands::prune::{prune_with_retention_policy, RetentionPolicy};
|
||||
use datman::commands::{init_descriptor, pushpull};
|
||||
use datman::descriptor::{load_descriptor, SourceDescriptor};
|
||||
use datman::get_hostname;
|
||||
@ -137,6 +138,13 @@ pub enum DatmanCommand {
|
||||
pile_name: String,
|
||||
},
|
||||
|
||||
/// Applies a retention policy by removing unnecessary backups.
|
||||
/// Does not reclaim space by itself: use
|
||||
/// `yama check --apply-gc --shallow`
|
||||
/// & `yama compact`
|
||||
/// to do that.
|
||||
Prune { pile_name: String },
|
||||
|
||||
#[clap(name = "_pull_responder_offerer")]
|
||||
InternalPullResponderOfferer {
|
||||
datman_path: PathBuf,
|
||||
@ -410,6 +418,24 @@ fn main() -> anyhow::Result<()> {
|
||||
Box::new(pbar),
|
||||
)?;
|
||||
}
|
||||
|
||||
DatmanCommand::Prune { pile_name } => {
|
||||
let descriptor = load_descriptor(Path::new(".")).unwrap();
|
||||
let retention_policy = descriptor
|
||||
.retention
|
||||
.context("No retention policy set in descriptor")?;
|
||||
let dest_desc = &descriptor.piles[&pile_name];
|
||||
|
||||
let pile_desc = load_pile_descriptor(&dest_desc.path)?;
|
||||
|
||||
prune_with_retention_policy(
|
||||
&dest_desc.path,
|
||||
&pile_desc,
|
||||
&RetentionPolicy::from_config(retention_policy),
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
|
||||
DatmanCommand::InternalPullResponderOfferer {
|
||||
datman_path,
|
||||
pile_name,
|
||||
|
@ -20,12 +20,13 @@ use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::descriptor::{Descriptor, SourceDescriptor};
|
||||
use crate::descriptor::{Descriptor, RetentionPolicyConfig, SourceDescriptor};
|
||||
|
||||
pub mod backup;
|
||||
pub mod extract;
|
||||
pub mod ibrowse;
|
||||
pub mod ilabel;
|
||||
pub mod prune;
|
||||
pub mod pushpull;
|
||||
pub mod report;
|
||||
|
||||
@ -51,6 +52,12 @@ pub fn init_descriptor(path: &Path) -> anyhow::Result<()> {
|
||||
sources: source,
|
||||
piles: Default::default(),
|
||||
remote_hosts: Default::default(),
|
||||
retention: Some(RetentionPolicyConfig {
|
||||
daily: 14,
|
||||
weekly: 12,
|
||||
monthly: 24,
|
||||
yearly: 9001,
|
||||
}),
|
||||
})?;
|
||||
|
||||
datman_toml_file.write_all(&bytes)?;
|
||||
|
@ -17,7 +17,9 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor, VirtualSourceKind};
|
||||
use crate::get_hostname;
|
||||
use crate::labelling::{label_node, load_labelling_rules, str_to_label, Label, State};
|
||||
use crate::labelling::{
|
||||
label_node, load_labelling_rules, str_to_label, Label, LabellingRules, State,
|
||||
};
|
||||
use crate::tree::{scan, FileTree, FileTree1};
|
||||
use anyhow::{anyhow, bail};
|
||||
use arc_interner::ArcIntern;
|
||||
@ -76,8 +78,8 @@ pub fn open_stdout_backup_process(
|
||||
pub fn label_filter_and_convert(
|
||||
tree: FileTree1<()>,
|
||||
descriptor: &Descriptor,
|
||||
desc_path: &Path,
|
||||
source_name: &str,
|
||||
rules: &LabellingRules,
|
||||
dest: &DestPileDescriptor,
|
||||
) -> anyhow::Result<Option<TreeNode>> {
|
||||
info!("Labelling.");
|
||||
@ -87,8 +89,7 @@ pub fn label_filter_and_convert(
|
||||
.iter()
|
||||
.map(|l| Label(ArcIntern::new(l.clone())))
|
||||
.collect();
|
||||
let rules = load_labelling_rules(desc_path, source_name)?;
|
||||
label_node("".to_owned(), None, &mut tree, &labels, &rules)?;
|
||||
label_node("".to_owned(), None, &mut tree, &labels, rules)?;
|
||||
|
||||
let included_labels: HashSet<Label> = dest.included_labels.iter().map(str_to_label).collect();
|
||||
|
||||
@ -150,16 +151,20 @@ pub fn backup_source_to_destination<PT: ProgressTracker>(
|
||||
cross_filesystems,
|
||||
} => {
|
||||
info!("Looking to backup {} to {}", source_name, dest_name);
|
||||
let rules = load_labelling_rules(desc_path, source_name)?;
|
||||
let exclusions = rules.get_exclusions_set(directory);
|
||||
|
||||
info!("Scanning.");
|
||||
let tree = scan(directory, !*cross_filesystems)?
|
||||
let tree = scan(directory, !*cross_filesystems, &exclusions)?
|
||||
.ok_or_else(|| anyhow!("Source does not exist."))?;
|
||||
|
||||
let absolute_source_path = desc_path.join(directory);
|
||||
let absolute_dest_path = desc_path.join(&dest.path);
|
||||
let pile_descriptor = load_pile_descriptor(&absolute_dest_path)?;
|
||||
let pile = open_pile(&absolute_dest_path, &pile_descriptor)?;
|
||||
|
||||
let root = if let Some(root) =
|
||||
label_filter_and_convert(tree, descriptor, desc_path, source_name, dest)?
|
||||
label_filter_and_convert(tree, descriptor, source_name, &rules, dest)?
|
||||
{
|
||||
root
|
||||
} else {
|
||||
|
@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{anyhow, bail};
|
||||
@ -80,7 +81,7 @@ pub fn session(path: &Path, source_name: String) -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
println!("Scanning source; this might take a little while...");
|
||||
let mut dir_scan: FileTree1<Option<State>> = scan(directory, one_filesystem)?
|
||||
let mut dir_scan: FileTree1<Option<State>> = scan(directory, one_filesystem, &BTreeSet::new())?
|
||||
.ok_or_else(|| anyhow!("Empty source."))?
|
||||
.replace_meta(&None);
|
||||
|
||||
|
@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::io;
|
||||
use std::io::{StdinLock, Stdout, Write};
|
||||
use std::path::Path;
|
||||
@ -192,7 +193,7 @@ pub fn interactive_labelling_session(path: &Path, source_name: String) -> anyhow
|
||||
let my_hostname = get_hostname();
|
||||
let mut dir_scan = if &my_hostname == hostname {
|
||||
info!("Scanning source; this might take a little while...");
|
||||
scan(directory, !*cross_filesystems)?
|
||||
scan(directory, !*cross_filesystems, &BTreeSet::new())?
|
||||
.ok_or_else(|| anyhow!("Empty source."))?
|
||||
.replace_meta(&None)
|
||||
} else {
|
||||
@ -212,6 +213,7 @@ pub fn interactive_labelling_session(path: &Path, source_name: String) -> anyhow
|
||||
&mut write,
|
||||
directory.as_ref(),
|
||||
!*cross_filesystems,
|
||||
&BTreeSet::new(),
|
||||
)?
|
||||
.ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?
|
||||
.replace_meta(&None);
|
||||
|
220
datman/src/commands/prune.rs
Normal file
220
datman/src/commands/prune.rs
Normal file
@ -0,0 +1,220 @@
|
||||
use crate::commands::backup::split_pointer_name;
|
||||
use crate::descriptor::RetentionPolicyConfig;
|
||||
use anyhow::{bail, Context};
|
||||
use log::info;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use yama::commands::open_pile;
|
||||
use yama::operations::remove_pointer_safely;
|
||||
use yama::pile::PileDescriptor;
|
||||
|
||||
pub struct RetentionBand {
|
||||
pub interval_s: u64,
|
||||
pub number_to_retain: u32,
|
||||
}
|
||||
|
||||
pub struct RetentionPolicy {
|
||||
pub retention_bands: Vec<RetentionBand>,
|
||||
}
|
||||
|
||||
const DAY: u64 = 86400;
|
||||
const WEEK: u64 = 7 * DAY;
|
||||
const MONTH: u64 = 31 * DAY;
|
||||
const YEAR: u64 = 365 * DAY;
|
||||
|
||||
impl RetentionPolicy {
|
||||
pub fn from_config(descriptor: RetentionPolicyConfig) -> RetentionPolicy {
|
||||
let mut policy = RetentionPolicy {
|
||||
retention_bands: vec![],
|
||||
};
|
||||
|
||||
if descriptor.daily != 0 {
|
||||
policy.retention_bands.push(RetentionBand {
|
||||
interval_s: DAY,
|
||||
number_to_retain: descriptor.daily,
|
||||
});
|
||||
}
|
||||
|
||||
if descriptor.weekly != 0 {
|
||||
policy.retention_bands.push(RetentionBand {
|
||||
interval_s: WEEK,
|
||||
number_to_retain: descriptor.weekly,
|
||||
});
|
||||
}
|
||||
|
||||
if descriptor.monthly != 0 {
|
||||
policy.retention_bands.push(RetentionBand {
|
||||
interval_s: MONTH,
|
||||
number_to_retain: descriptor.monthly,
|
||||
});
|
||||
}
|
||||
|
||||
if descriptor.yearly != 0 {
|
||||
policy.retention_bands.push(RetentionBand {
|
||||
interval_s: YEAR,
|
||||
number_to_retain: descriptor.yearly,
|
||||
});
|
||||
}
|
||||
|
||||
policy
|
||||
}
|
||||
|
||||
/// Returns the set of snapshots to remove.
|
||||
pub fn apply_returning_prunable(
|
||||
&self,
|
||||
snapshots_by_unix_time: BTreeMap<u64, String>,
|
||||
) -> BTreeSet<String> {
|
||||
if snapshots_by_unix_time.is_empty() {
|
||||
return BTreeSet::new();
|
||||
}
|
||||
let mut snapshots_included: BTreeSet<u64> = BTreeSet::new();
|
||||
|
||||
// Always mark the most recent snapshot as retained!
|
||||
let last_snapshot = snapshots_by_unix_time.keys().rev().next().unwrap();
|
||||
snapshots_included.insert(*last_snapshot);
|
||||
|
||||
let now_time = *last_snapshot;
|
||||
|
||||
for band in &self.retention_bands {
|
||||
for multiple in 1..=band.number_to_retain {
|
||||
let target_time = now_time - (multiple as u64) * band.interval_s;
|
||||
if let Some((k, _)) = snapshots_by_unix_time.range(0..=target_time).rev().next() {
|
||||
snapshots_included.insert(*k);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Find all prunable (unincluded) snapshots.
|
||||
snapshots_by_unix_time
|
||||
.into_iter()
|
||||
.filter(|(k, _v)| !snapshots_included.contains(k))
|
||||
.map(|(_k, v)| v)
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prune_with_retention_policy(
|
||||
pile_path: &Path,
|
||||
pile_desc: &PileDescriptor,
|
||||
policy: &RetentionPolicy,
|
||||
prompt_first: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let pile = open_pile(&pile_path, &pile_desc).context("Failed to open pile")?;
|
||||
|
||||
let pointers = pile
|
||||
.list_pointers()
|
||||
.context("Failed to list pointers in pile")?;
|
||||
|
||||
let mut pointers_to_keep: BTreeSet<String> = pointers.iter().cloned().collect();
|
||||
|
||||
let pointers_to_remove = get_prunable_pointers(&policy, pointers);
|
||||
|
||||
for remove in &pointers_to_remove {
|
||||
pointers_to_keep.remove(remove);
|
||||
}
|
||||
|
||||
info!("Gory details:\n---\nKeep: {pointers_to_keep:?}\n---\nRemove: {pointers_to_remove:?}");
|
||||
info!(
|
||||
"{} pointers to remove ({} to keep) based on retention policy.",
|
||||
pointers_to_remove.len(),
|
||||
pointers_to_keep.len()
|
||||
);
|
||||
|
||||
if prompt_first {
|
||||
println!("Would you like to proceed? [y/N]: ");
|
||||
let mut buffer = String::new();
|
||||
let stdin = io::stdin(); // We get `Stdin` here.
|
||||
stdin.read_line(&mut buffer)?;
|
||||
if buffer.trim().to_ascii_lowercase() != "y" {
|
||||
bail!("Aborted by user.");
|
||||
}
|
||||
}
|
||||
|
||||
for to_remove in pointers_to_remove {
|
||||
let res = remove_pointer_safely(&pile, &to_remove).context("removing prunable pointers");
|
||||
|
||||
pile.flush()
|
||||
.context("flushing pile after removing pointers")?;
|
||||
|
||||
res?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_prunable_pointers(policy: &RetentionPolicy, pointers: Vec<String>) -> BTreeSet<String> {
|
||||
let mut split_pointers_by_name: BTreeMap<String, BTreeMap<u64, String>> = BTreeMap::new();
|
||||
|
||||
for pointer in pointers {
|
||||
let (name, datetime) = if let Some(x) = split_pointer_name(&pointer) {
|
||||
x
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
split_pointers_by_name
|
||||
.entry(name)
|
||||
.or_default()
|
||||
.insert(datetime.timestamp().try_into().unwrap(), pointer);
|
||||
}
|
||||
|
||||
let mut pointers_to_remove = BTreeSet::new();
|
||||
|
||||
for (_pointer_base_name, ts_to_pointer) in split_pointers_by_name {
|
||||
let to_remove = policy.apply_returning_prunable(ts_to_pointer);
|
||||
|
||||
pointers_to_remove.extend(to_remove);
|
||||
}
|
||||
|
||||
pointers_to_remove
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::commands::prune::{get_prunable_pointers, RetentionPolicy};
|
||||
use crate::descriptor::RetentionPolicyConfig;
|
||||
|
||||
#[test]
|
||||
fn test_prunable_pointers() {
|
||||
let pointers = vec![
|
||||
"alice+2022-09-28_05:00:00",
|
||||
"alice+2022-09-28_02:00:00",
|
||||
"alice+2022-09-21_05:00:00",
|
||||
"alice+2022-09-14_05:00:00",
|
||||
"alice+2022-09-08_05:00:00",
|
||||
"alice+2022-09-07_05:00:00",
|
||||
"alice+2022-09-01_05:00:00",
|
||||
"bob+2022-09-28_06:00:00",
|
||||
"bob+2022-09-28_03:00:00",
|
||||
"bob+2022-09-21_06:00:00",
|
||||
"bob+2022-09-14_06:00:00",
|
||||
"bob+2022-09-08_06:00:00",
|
||||
"bob+2022-09-07_06:00:00",
|
||||
"bob+2022-09-01_06:00:00",
|
||||
]
|
||||
.into_iter()
|
||||
.map(|s| s.to_owned())
|
||||
.collect();
|
||||
let policy = RetentionPolicy::from_config(RetentionPolicyConfig {
|
||||
daily: 0,
|
||||
weekly: 3,
|
||||
monthly: 0,
|
||||
yearly: 0,
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
get_prunable_pointers(&policy, pointers)
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
"alice+2022-09-01_05:00:00",
|
||||
"alice+2022-09-08_05:00:00",
|
||||
"alice+2022-09-28_02:00:00",
|
||||
"bob+2022-09-01_06:00:00",
|
||||
"bob+2022-09-08_06:00:00",
|
||||
"bob+2022-09-28_03:00:00",
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
// Push and Pull support for Datman
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use log::info;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use yama::chunking::RecursiveUnchunker;
|
||||
use yama::commands::retrieve_tree_node;
|
||||
use yama::definitions::{ChunkId, PointerData, RecursiveChunkRef, TreeNode};
|
||||
@ -183,9 +185,25 @@ pub fn offering_side<R: Read, W: Write>(
|
||||
drop(chunks_to_offer);
|
||||
drop(chunks_to_skip);
|
||||
|
||||
progress.set_max_size(chunks_to_send.len() as u64);
|
||||
let start_sort_by_hints = Instant::now();
|
||||
let chunks_to_send_with_hints: BTreeSet<(u64, ChunkId)> = chunks_to_send
|
||||
.into_iter()
|
||||
.map(|chunk_id| {
|
||||
pile.raw_pile
|
||||
.chunk_id_transfer_ordering_hint(&chunk_id)
|
||||
.map(|hint| (hint, chunk_id))
|
||||
})
|
||||
.collect::<anyhow::Result<_>>()?;
|
||||
let time_to_sort_by_hints = Instant::now() - start_sort_by_hints;
|
||||
info!(
|
||||
"{} s to sort {} chunks by their hints",
|
||||
time_to_sort_by_hints.as_secs_f32(),
|
||||
chunks_to_send_with_hints.len()
|
||||
);
|
||||
|
||||
progress.set_max_size(chunks_to_send_with_hints.len() as u64);
|
||||
progress.set_current(0);
|
||||
for chunk_id in chunks_to_send {
|
||||
for (_hint, chunk_id) in chunks_to_send_with_hints {
|
||||
let chunk_data = bypass_pile
|
||||
.read(Keyspace::Chunk, &chunk_id)?
|
||||
.context("Chunk vanished")?;
|
||||
|
@ -38,6 +38,10 @@ pub struct Descriptor {
|
||||
pub piles: HashMap<String, DestPileDescriptor>,
|
||||
|
||||
pub remote_hosts: HashMap<String, RemoteHostDescriptor>,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub retention: Option<RetentionPolicyConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
@ -46,6 +50,14 @@ pub struct RemoteHostDescriptor {
|
||||
pub path_to_datman: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
pub struct RetentionPolicyConfig {
|
||||
pub daily: u32,
|
||||
pub weekly: u32,
|
||||
pub monthly: u32,
|
||||
pub yearly: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[serde(untagged)]
|
||||
pub enum SourceDescriptor {
|
||||
|
@ -15,10 +15,10 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Context;
|
||||
@ -222,6 +222,23 @@ impl LabellingRules {
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn get_exclusions_set(&self, base: &Path) -> BTreeSet<PathBuf> {
|
||||
let mut exclusions = BTreeSet::new();
|
||||
|
||||
for (ext_path, state) in &self.position_based_rules {
|
||||
assert!(ext_path.is_empty() || ext_path.starts_with('/'));
|
||||
let full_path = PathBuf::from(format!(
|
||||
"{}{ext_path}",
|
||||
base.to_str().expect("base path must always be utf-8")
|
||||
));
|
||||
if state == &Excluded {
|
||||
exclusions.insert(full_path);
|
||||
}
|
||||
}
|
||||
|
||||
exclusions
|
||||
}
|
||||
}
|
||||
|
||||
/// Uninteractively label the nodes.
|
||||
|
@ -1,11 +1,13 @@
|
||||
use crate::commands::backup::{get_pointer_name_at, label_filter_and_convert};
|
||||
use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor};
|
||||
use crate::labelling::load_labelling_rules;
|
||||
use crate::tree::FileTree;
|
||||
use anyhow::{anyhow, bail};
|
||||
use chrono::Utc;
|
||||
use log::info;
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::{Read, Write};
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::sync::Arc;
|
||||
use yama::commands::{load_pile_descriptor, open_pile};
|
||||
@ -48,11 +50,13 @@ pub fn scanning<R: Read, W: Write>(
|
||||
write: &mut W,
|
||||
path: &Path,
|
||||
one_filesystem: bool,
|
||||
exclusions: &BTreeSet<PathBuf>,
|
||||
) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
|
||||
info!("Scanning.");
|
||||
write_message(write, &"scan")?;
|
||||
write_message(write, &path)?;
|
||||
write_message(write, &one_filesystem)?;
|
||||
write_message(write, exclusions)?;
|
||||
write.flush()?;
|
||||
let scan_result: Option<FileTree<(), (), (), ()>> = read_message(read)?;
|
||||
|
||||
@ -199,6 +203,9 @@ pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>
|
||||
info!("Connecting...");
|
||||
introduction(&mut read, &mut write)?;
|
||||
|
||||
let rules = load_labelling_rules(desc_path, source_name)?;
|
||||
let exclusions = rules.get_exclusions_set(directory);
|
||||
|
||||
// then request to scan
|
||||
info!("Requesting scan... (this may take some time)");
|
||||
let scan_result = scanning(
|
||||
@ -206,11 +213,12 @@ pub fn backup_remote_source_to_destination<PT: ProgressTracker + Send + 'static>
|
||||
&mut write,
|
||||
directory.as_ref(),
|
||||
!*cross_filesystems,
|
||||
&exclusions,
|
||||
)?
|
||||
.ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?;
|
||||
|
||||
let mut root =
|
||||
label_filter_and_convert(scan_result, descriptor, desc_path, source_name, dest)?
|
||||
label_filter_and_convert(scan_result, descriptor, source_name, &rules, dest)?
|
||||
.ok_or_else(|| anyhow!("Empty filter..."))?;
|
||||
|
||||
let absolute_dest_path = desc_path.join(&dest.path);
|
||||
|
@ -1,6 +1,7 @@
|
||||
// This file implements the responder side of the backup source protocol -- the protocol used
|
||||
// to connect to remote backup sources.
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::{stdin, stdout, Read, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@ -43,7 +44,8 @@ pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::R
|
||||
pub fn scanning<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
|
||||
let path: PathBuf = read_message(read)?;
|
||||
let one_filesystem: bool = read_message(read)?;
|
||||
let scan_result = scan(&path, one_filesystem)?;
|
||||
let exclusions: BTreeSet<PathBuf> = read_message(read)?;
|
||||
let scan_result = scan(&path, one_filesystem, &exclusions)?;
|
||||
write_message(write, &scan_result)?;
|
||||
write.flush()?;
|
||||
Ok(())
|
||||
|
@ -15,16 +15,16 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::Debug;
|
||||
use std::fs::{read_link, symlink_metadata, DirEntry, Metadata};
|
||||
use std::io::ErrorKind;
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
|
||||
use log::{info, warn};
|
||||
use log::{debug, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use yama::definitions::FilesystemOwnership;
|
||||
@ -216,14 +216,18 @@ pub fn mtime_msec(metadata: &Metadata) -> u64 {
|
||||
}
|
||||
|
||||
/// Scan the filesystem to produce a Tree, using a default progress bar.
|
||||
pub fn scan(path: &Path, one_filesystem: bool) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
|
||||
pub fn scan(
|
||||
path: &Path,
|
||||
one_filesystem: bool,
|
||||
exclusions: &BTreeSet<PathBuf>,
|
||||
) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
|
||||
let pbar = ProgressBar::with_draw_target(0, ProgressDrawTarget::stdout_with_hz(2));
|
||||
pbar.set_style(ProgressStyle::default_spinner().template("{spinner} {pos:7} {msg}"));
|
||||
pbar.set_message("dir scan");
|
||||
|
||||
let one_filesystem = if one_filesystem { Some(None) } else { None };
|
||||
|
||||
let result = scan_with_progress_bar(path, &pbar, one_filesystem);
|
||||
let result = scan_with_progress_bar(path, &pbar, one_filesystem, exclusions);
|
||||
pbar.finish_at_current_pos();
|
||||
result
|
||||
}
|
||||
@ -233,7 +237,14 @@ pub fn scan_with_progress_bar(
|
||||
path: &Path,
|
||||
progress_bar: &ProgressBar,
|
||||
mut one_filesystem: Option<Option<u64>>,
|
||||
exclusions: &BTreeSet<PathBuf>,
|
||||
) -> anyhow::Result<Option<FileTree<(), (), (), ()>>> {
|
||||
if exclusions.contains(path) {
|
||||
// Don't enter excluded paths.
|
||||
debug!("Not descending into excluded path: {:?}", path);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let metadata_res = symlink_metadata(path);
|
||||
progress_bar.inc(1);
|
||||
if let Err(e) = &metadata_res {
|
||||
@ -305,7 +316,17 @@ pub fn scan_with_progress_bar(
|
||||
|
||||
for entry in dir_read? {
|
||||
let entry: DirEntry = entry?;
|
||||
let scanned = scan_with_progress_bar(&entry.path(), progress_bar, one_filesystem)?;
|
||||
|
||||
if entry.file_name() == ".datmanskip" {
|
||||
// Directories with .datmanskip in them are to be skipped entirely.
|
||||
// TODO(perf): should this be checked upfront before some children may already
|
||||
// have been scanned?
|
||||
debug!("Skipping {path:?} because it has a .datmanskip file.");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let scanned =
|
||||
scan_with_progress_bar(&entry.path(), progress_bar, one_filesystem, exclusions)?;
|
||||
if let Some(scanned) = scanned {
|
||||
if let Ok(filename) = entry.file_name().into_string() {
|
||||
children.insert(filename, scanned);
|
||||
|
147
flake.lock
generated
147
flake.lock
generated
@ -1,12 +1,15 @@
|
||||
{
|
||||
"nodes": {
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1653893745,
|
||||
"narHash": "sha256-0jntwV3Z8//YwuOjzhV2sgJJPt+HY6KhU7VZUL0fKZQ=",
|
||||
"lastModified": 1710146030,
|
||||
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "1ed9fb1935d260de5fe1c2f7ee0ebaae17ed2fa1",
|
||||
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@ -17,14 +20,16 @@
|
||||
},
|
||||
"naersk": {
|
||||
"inputs": {
|
||||
"nixpkgs": "nixpkgs"
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
]
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1653413650,
|
||||
"narHash": "sha256-wojDHjb+eU80MPH+3HQaK0liUy8EgR95rvmCl24i58Y=",
|
||||
"lastModified": 1662220400,
|
||||
"narHash": "sha256-9o2OGQqu4xyLZP9K6kNe1pTHnyPz0Wr3raGYnr9AIgY=",
|
||||
"owner": "nix-community",
|
||||
"repo": "naersk",
|
||||
"rev": "69daaceebe12c070cd5ae69ba38f277bbf033695",
|
||||
"rev": "6944160c19cb591eb85bbf9b2f2768a935623ed3",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@ -33,60 +38,58 @@
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nix-github-actions": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
"poetry2nix",
|
||||
"nixpkgs"
|
||||
]
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1703863825,
|
||||
"narHash": "sha256-rXwqjtwiGKJheXB43ybM8NwWB8rO2dSRrEqes0S7F5Y=",
|
||||
"owner": "nix-community",
|
||||
"repo": "nix-github-actions",
|
||||
"rev": "5163432afc817cf8bd1f031418d1869e4c9d5547",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-community",
|
||||
"repo": "nix-github-actions",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1653738054,
|
||||
"narHash": "sha256-IaR8iLN4Ms3f5EjU1CJkXSc49ZzyS5qv03DtVAti6/s=",
|
||||
"lastModified": 1714971268,
|
||||
"narHash": "sha256-IKwMSwHj9+ec660l+I4tki/1NRoeGpyA2GdtdYpAgEw=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "17b62c338f2a0862a58bb6951556beecd98ccda9",
|
||||
"rev": "27c13997bf450a01219899f5a83bd6ffbfc70d3c",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"id": "nixpkgs",
|
||||
"ref": "nixos-23.11",
|
||||
"type": "indirect"
|
||||
}
|
||||
},
|
||||
"nixpkgs_2": {
|
||||
"locked": {
|
||||
"lastModified": 1653738054,
|
||||
"narHash": "sha256-IaR8iLN4Ms3f5EjU1CJkXSc49ZzyS5qv03DtVAti6/s=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "17b62c338f2a0862a58bb6951556beecd98ccda9",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"id": "nixpkgs",
|
||||
"type": "indirect"
|
||||
}
|
||||
},
|
||||
"nixpkgs_3": {
|
||||
"locked": {
|
||||
"lastModified": 1654176133,
|
||||
"narHash": "sha256-XhjUlU+q9LPFM8Z7X3h504vS2FUToCyKhaf5hVF6nsw=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "63f15db5291aec276924d907d3e083e74d68e8b9",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"poetry2nix": {
|
||||
"inputs": {
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs_3"
|
||||
"nix-github-actions": "nix-github-actions",
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
],
|
||||
"systems": "systems_2",
|
||||
"treefmt-nix": "treefmt-nix"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1653561148,
|
||||
"narHash": "sha256-JzAttqACdvMOTwkzkJ0jFF8MWIo8Uau4w/XUMyqpnd8=",
|
||||
"lastModified": 1715017507,
|
||||
"narHash": "sha256-RN2Vsba56PfX02DunWcZYkMLsipp928h+LVAWMYmbZg=",
|
||||
"owner": "nix-community",
|
||||
"repo": "poetry2nix",
|
||||
"rev": "3b01c3e3dc57d511848d8433153ab67db79640e1",
|
||||
"rev": "e6b36523407ae6a7a4dfe29770c30b3a3563b43a",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@ -98,18 +101,68 @@
|
||||
"root": {
|
||||
"inputs": {
|
||||
"naersk": "naersk",
|
||||
"nixpkgs": "nixpkgs_2",
|
||||
"nixpkgs": "nixpkgs",
|
||||
"poetry2nix": "poetry2nix",
|
||||
"utils": "utils"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"systems_2": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"id": "systems",
|
||||
"type": "indirect"
|
||||
}
|
||||
},
|
||||
"treefmt-nix": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
"poetry2nix",
|
||||
"nixpkgs"
|
||||
]
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1714058656,
|
||||
"narHash": "sha256-Qv4RBm4LKuO4fNOfx9wl40W2rBbv5u5m+whxRYUMiaA=",
|
||||
"owner": "numtide",
|
||||
"repo": "treefmt-nix",
|
||||
"rev": "c6aaf729f34a36c445618580a9f95a48f5e4e03f",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "treefmt-nix",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"utils": {
|
||||
"locked": {
|
||||
"lastModified": 1652776076,
|
||||
"narHash": "sha256-gzTw/v1vj4dOVbpBSJX4J0DwUR6LIyXo7/SuuTJp1kM=",
|
||||
"lastModified": 1659877975,
|
||||
"narHash": "sha256-zllb8aq3YO3h8B/U0/J1WBgAL8EX5yWf5pMj3G0NAmc=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "04c1b180862888302ddfb2e3ad9eaa63afc60cf8",
|
||||
"rev": "c0e246b9b83f637f4681389ecabcb2681b4f3af0",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
18
flake.nix
18
flake.nix
@ -3,13 +3,21 @@
|
||||
|
||||
inputs = {
|
||||
utils.url = "github:numtide/flake-utils";
|
||||
naersk.url = "github:nix-community/naersk";
|
||||
poetry2nix.url = "github:nix-community/poetry2nix";
|
||||
naersk = {
|
||||
url = "github:nix-community/naersk";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
nixpkgs.url = "nixpkgs/nixos-23.11";
|
||||
poetry2nix = {
|
||||
url = "github:nix-community/poetry2nix";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, utils, naersk, poetry2nix }:
|
||||
utils.lib.eachDefaultSystem (system: let
|
||||
pkgs = nixpkgs.legacyPackages."${system}";
|
||||
inherit (poetry2nix.lib.mkPoetry2Nix { inherit pkgs; }) mkPoetryApplication;
|
||||
naersk-lib = naersk.lib."${system}";
|
||||
|
||||
rustComponents = naersk-lib.buildPackage {
|
||||
@ -18,16 +26,16 @@
|
||||
|
||||
buildInputs = with pkgs; [
|
||||
openssl
|
||||
pkgconfig
|
||||
pkg-config
|
||||
sqlite
|
||||
];
|
||||
};
|
||||
|
||||
mysqlHelper = pkgs.poetry2nix.mkPoetryApplication {
|
||||
mysqlHelper = mkPoetryApplication {
|
||||
projectDir = ./datman-helper-mysql;
|
||||
};
|
||||
|
||||
postgresHelper = pkgs.poetry2nix.mkPoetryApplication {
|
||||
postgresHelper = mkPoetryApplication {
|
||||
projectDir = ./datman-helper-postgres;
|
||||
};
|
||||
|
||||
|
@ -4,7 +4,7 @@ if [ $# -ge 1 ]
|
||||
then
|
||||
files=$*
|
||||
else
|
||||
files="testsuite/setup.py testsuite/datmantests testsuite/helpers testsuite/yamatests datman-helper-postgres/datman_helper_postgres datman-helper-postgres/setup.py datman-helper-mysql/datman_helper_mysql datman-helper-mysql/setup.py"
|
||||
files="testsuite/setup.py testsuite/datmantests testsuite/helpers testsuite/yamatests datman-helper-postgres/datman_helper_postgres datman-helper-mysql/datman_helper_mysql"
|
||||
fi
|
||||
|
||||
echo "Linting these locations: $files"
|
||||
|
50
shell.nix
Normal file
50
shell.nix
Normal file
@ -0,0 +1,50 @@
|
||||
{ pkgs ? import <nixpkgs> {} }:
|
||||
|
||||
let
|
||||
# We may need some packages from nixpkgs-unstable
|
||||
#unstable = import <nixpkgs-unstable> {};
|
||||
|
||||
rust-toolchain = pkgs.symlinkJoin {
|
||||
name = "rust-toolchain";
|
||||
paths = [pkgs.rustc pkgs.cargo pkgs.rustfmt pkgs.rustPlatform.rustcSrc];
|
||||
};
|
||||
in
|
||||
|
||||
pkgs.mkShell {
|
||||
|
||||
buildInputs = [
|
||||
rust-toolchain
|
||||
|
||||
pkgs.pkg-config
|
||||
|
||||
pkgs.alsa-lib
|
||||
pkgs.sqlite
|
||||
#pkgs.libclang # ??
|
||||
];
|
||||
|
||||
nativeBuildInputs = [
|
||||
pkgs.openssl
|
||||
pkgs.python3
|
||||
];
|
||||
|
||||
# Needed for bindgen when binding to avahi
|
||||
LIBCLANG_PATH="${pkgs.llvmPackages_latest.libclang.lib}/lib";
|
||||
|
||||
# Cargo culted:
|
||||
# Add to rustc search path
|
||||
RUSTFLAGS = (builtins.map (a: ''-L ${a}/lib'') [
|
||||
]);
|
||||
# Add to bindgen search path
|
||||
BINDGEN_EXTRA_CLANG_ARGS =
|
||||
# Includes with normal include path
|
||||
(builtins.map (a: ''-I"${a}/include"'') [
|
||||
])
|
||||
# Includes with special directory paths
|
||||
++ [
|
||||
''-I"${pkgs.llvmPackages_latest.libclang.lib}/lib/clang/${pkgs.llvmPackages_latest.libclang.version}/include"''
|
||||
#''-I"${pkgs.glib.dev}/include/glib-2.0"''
|
||||
#''-I${pkgs.glib.out}/lib/glib-2.0/include/''
|
||||
];
|
||||
|
||||
|
||||
}
|
@ -251,7 +251,8 @@ kind = {{ stdout = "blahblah.txt" }}
|
||||
seed = 7555
|
||||
print(f"seed: {seed}")
|
||||
rng.seed(seed)
|
||||
# min_files is 8 because we need enough files to use each label for this test to succeed.
|
||||
# min_files is 8 because we need enough files to use each label for this
|
||||
# test to succeed.
|
||||
initial_descriptor, _ = generate_random_dir(rng, src_path, 32, min_files=8)
|
||||
labellings = generate_labels(initial_descriptor, rng)
|
||||
save_labelling_rules(labelling_path.joinpath("srca.zst"), labellings)
|
||||
@ -298,3 +299,81 @@ kind = {{ stdout = "blahblah.txt" }}
|
||||
)
|
||||
|
||||
td.cleanup()
|
||||
|
||||
def test_backup_incremental_with_mid_delete(self):
|
||||
td = TemporaryDirectory("test_backup_incremental_with_mid_delete")
|
||||
tdpath = Path(td.name)
|
||||
|
||||
datman_path = tdpath.joinpath("datman")
|
||||
src_path = datman_path.joinpath("srca")
|
||||
yama_path = datman_path.joinpath("main")
|
||||
|
||||
set_up_simple_datman(datman_path)
|
||||
set_up_simple_yama(yama_path)
|
||||
|
||||
rng = Random()
|
||||
seed = rng.randint(0, 9001)
|
||||
print(f"seed: {seed}")
|
||||
rng.seed(seed)
|
||||
initial_descriptor, _ = generate_random_dir(rng, src_path, 32)
|
||||
|
||||
print("storing")
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
|
||||
# now mutate and store incremental
|
||||
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
|
||||
time.sleep(2)
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
|
||||
# now mutate and store incremental again!
|
||||
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
|
||||
mutated_descriptor = scan_dir(src_path)
|
||||
time.sleep(2)
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
|
||||
pointer_names = [
|
||||
line
|
||||
for line in subprocess.check_output(("yama", "debug", "lsp"), cwd=yama_path)
|
||||
.decode()
|
||||
.split("\n")
|
||||
if line
|
||||
]
|
||||
self.assertEqual(len(pointer_names), 3)
|
||||
self.assertLess(pointer_names[0], pointer_names[1])
|
||||
self.assertLess(pointer_names[1], pointer_names[2])
|
||||
|
||||
print(f"removing mid pointer {pointer_names[1]}")
|
||||
subprocess.check_call(
|
||||
("yama", "debug", "rmp", pointer_names[1]),
|
||||
cwd=yama_path,
|
||||
)
|
||||
|
||||
print("extracting last pointer to check still valid")
|
||||
dest_path = tdpath.joinpath("desta")
|
||||
subprocess.check_call(
|
||||
(
|
||||
"datman",
|
||||
"extract",
|
||||
"--skip-metadata",
|
||||
"--accept-partial",
|
||||
"main",
|
||||
"../desta",
|
||||
),
|
||||
cwd=datman_path,
|
||||
)
|
||||
|
||||
# this will be wrapped in a directory that starts with the name srca+
|
||||
extracted_dir_descriptor_wrapper = scan_dir(dest_path)
|
||||
|
||||
contents = extracted_dir_descriptor_wrapper.contents
|
||||
self.assertEqual(len(contents), 1)
|
||||
key, value = next(iter(contents.items()))
|
||||
self.assertTrue(key.startswith("srca+"))
|
||||
|
||||
self.assertIsInstance(value, DirectoryDescriptor)
|
||||
key, value = next(iter(value.contents.items()))
|
||||
self.assertEqual(key, "srca")
|
||||
|
||||
self.assertEqual(value.ignore_metadata(), mutated_descriptor.ignore_metadata())
|
||||
|
||||
td.cleanup()
|
||||
|
@ -1,6 +1,7 @@
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Set
|
||||
|
||||
|
||||
def set_up_simple_yama(path: Path):
|
||||
@ -10,3 +11,13 @@ def set_up_simple_yama(path: Path):
|
||||
"example_zstd.dict"
|
||||
)
|
||||
shutil.copyfile(example_zstd_path, path.joinpath("important_zstd.dict"))
|
||||
|
||||
|
||||
def list_bloblog_ids(pile: Path) -> Set[int]:
|
||||
result = set()
|
||||
for p in pile.joinpath("bloblog").iterdir():
|
||||
try:
|
||||
result.add(int(p.name))
|
||||
except ValueError:
|
||||
pass
|
||||
return result
|
||||
|
@ -22,7 +22,7 @@ REQUIRED = ["green", "attrs", "immutabledict"]
|
||||
|
||||
|
||||
# What packages are optional?
|
||||
EXTRAS = {"dev": ["black==21.7b0", "flake8==3.9.2", "isort==5.9.2"]}
|
||||
EXTRAS = {"dev": ["black==22.10.0", "flake8==3.9.2", "isort==5.9.2"]}
|
||||
|
||||
# The rest you shouldn't have to touch too much :)
|
||||
# ------------------------------------------------
|
||||
|
175
testsuite/yamatests/test_compact.py
Normal file
175
testsuite/yamatests/test_compact.py
Normal file
@ -0,0 +1,175 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from random import Random
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest import TestCase
|
||||
|
||||
from helpers import (
|
||||
DirectoryDescriptor,
|
||||
generate_random_dir,
|
||||
randomly_mutate_directory_in_descriptor,
|
||||
scan_dir,
|
||||
)
|
||||
from helpers.datman_helpers import set_up_simple_datman
|
||||
from helpers.yama_helpers import list_bloblog_ids, set_up_simple_yama
|
||||
|
||||
|
||||
class TestYamaCompact(TestCase):
|
||||
def test_compaction_merge_two_small_bloblogs(self):
|
||||
td = TemporaryDirectory("test_check_fails_after_random_corruption")
|
||||
tdpath = Path(td.name)
|
||||
|
||||
datman_path = tdpath.joinpath("datman")
|
||||
src_path = datman_path.joinpath("srca")
|
||||
yama_path = datman_path.joinpath("main")
|
||||
|
||||
set_up_simple_datman(datman_path)
|
||||
set_up_simple_yama(yama_path)
|
||||
|
||||
rng = Random()
|
||||
seed = rng.randint(0, 9001)
|
||||
print(f"seed: {seed}")
|
||||
rng.seed(seed)
|
||||
later_expected_descriptor, _ = generate_random_dir(rng, src_path, 32)
|
||||
|
||||
# Back up twice: that way we should get at least two bloblogs!
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
old_bloblog_ids = list_bloblog_ids(yama_path)
|
||||
self.assertGreater(
|
||||
len(old_bloblog_ids), 1, "Should be many bloblogs at this point"
|
||||
)
|
||||
|
||||
subprocess.check_call(
|
||||
(
|
||||
"yama",
|
||||
"compact",
|
||||
"--mergeable",
|
||||
"2",
|
||||
"--small",
|
||||
str(2 * 1024 * 1024 * 1024),
|
||||
),
|
||||
cwd=yama_path,
|
||||
)
|
||||
|
||||
new_bloblog_ids = list_bloblog_ids(yama_path)
|
||||
self.assertEqual(
|
||||
len(new_bloblog_ids), 1, "Should only be 1 bloblog at this point."
|
||||
)
|
||||
self.assertEqual(
|
||||
list(new_bloblog_ids)[0],
|
||||
max(old_bloblog_ids) + 1,
|
||||
"New bloblog ID should be 1 greater than the max old one.",
|
||||
)
|
||||
|
||||
def test_gc_then_compact(self):
|
||||
td = TemporaryDirectory("test_gc_then_compact")
|
||||
tdpath = Path(td.name)
|
||||
|
||||
datman_path = tdpath.joinpath("datman")
|
||||
src_path = datman_path.joinpath("srca")
|
||||
yama_path = datman_path.joinpath("main")
|
||||
|
||||
set_up_simple_datman(datman_path)
|
||||
set_up_simple_yama(yama_path)
|
||||
|
||||
rng = Random()
|
||||
seed = rng.randint(0, 9001)
|
||||
print(f"seed: {seed}")
|
||||
rng.seed(seed)
|
||||
initial_descriptor, _ = generate_random_dir(rng, src_path, 32)
|
||||
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
orig_pointer_name = (
|
||||
subprocess.check_output(("yama", "debug", "lsp"), cwd=yama_path)
|
||||
.decode()
|
||||
.split("\n")[0]
|
||||
)
|
||||
|
||||
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
|
||||
mutated_descriptor = scan_dir(src_path)
|
||||
|
||||
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
|
||||
|
||||
old_bloblog_ids = list_bloblog_ids(yama_path)
|
||||
|
||||
# Try a GC and check that it's a no-op
|
||||
subprocess.check_call(
|
||||
("yama", "check", "--shallow", "--apply-gc"), cwd=yama_path
|
||||
)
|
||||
subprocess.check_call(
|
||||
(
|
||||
"yama",
|
||||
"compact",
|
||||
"--mergeable",
|
||||
"2000",
|
||||
"--reclaim",
|
||||
"1",
|
||||
"--max-dealloc",
|
||||
"1",
|
||||
),
|
||||
cwd=yama_path,
|
||||
)
|
||||
|
||||
unchanged_bloblog_ids = list_bloblog_ids(yama_path)
|
||||
self.assertEqual(
|
||||
old_bloblog_ids,
|
||||
unchanged_bloblog_ids,
|
||||
"No GC: no compaction should have happened.",
|
||||
)
|
||||
|
||||
subprocess.check_call(
|
||||
("yama", "debug", "rmp", orig_pointer_name), cwd=yama_path
|
||||
)
|
||||
|
||||
# Try a GC and check that it did something
|
||||
subprocess.check_call(
|
||||
("yama", "check", "--shallow", "--apply-gc"), cwd=yama_path
|
||||
)
|
||||
subprocess.check_call(
|
||||
(
|
||||
"yama",
|
||||
"compact",
|
||||
"--mergeable",
|
||||
"2000",
|
||||
"--reclaim",
|
||||
"1",
|
||||
"--max-dealloc",
|
||||
"1",
|
||||
),
|
||||
cwd=yama_path,
|
||||
)
|
||||
|
||||
new_bloblog_ids = list_bloblog_ids(yama_path)
|
||||
self.assertNotEqual(
|
||||
old_bloblog_ids, new_bloblog_ids, "GC: compaction should have happened."
|
||||
)
|
||||
|
||||
# Check that we can still extract the files!
|
||||
dest_path = tdpath.joinpath("desta")
|
||||
subprocess.check_call(
|
||||
(
|
||||
"datman",
|
||||
"extract",
|
||||
"--skip-metadata",
|
||||
"--accept-partial",
|
||||
"main",
|
||||
"../desta",
|
||||
),
|
||||
cwd=datman_path,
|
||||
)
|
||||
|
||||
extracted_dir_descriptor_wrapper = scan_dir(dest_path)
|
||||
|
||||
contents = extracted_dir_descriptor_wrapper.contents
|
||||
self.assertEqual(len(contents), 1)
|
||||
key, value = next(iter(contents.items()))
|
||||
self.assertTrue(key.startswith("srca+"))
|
||||
|
||||
self.assertIsInstance(value, DirectoryDescriptor)
|
||||
key, value = next(iter(value.contents.items()))
|
||||
self.assertEqual(key, "srca")
|
||||
|
||||
self.assertEqual(value.ignore_metadata(), mutated_descriptor.ignore_metadata())
|
||||
|
||||
td.cleanup()
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "yama"
|
||||
version = "0.6.0-alpha.4"
|
||||
version = "0.6.0-alpha.5"
|
||||
authors = ["Olivier 'reivilibre' <olivier@librepush.net>"]
|
||||
edition = "2018"
|
||||
description = "Deduplicated, compressed and encrypted content pile manager"
|
||||
|
@ -29,7 +29,8 @@ use yama::operations::checking::VacuumMode;
|
||||
use yama::operations::legacy_pushpull::{
|
||||
determine_bypass_level, open_pile_with_work_bypass, push_to,
|
||||
};
|
||||
use yama::operations::{checking, extracting};
|
||||
use yama::operations::{checking, cleanup, extracting};
|
||||
use yama::pile::local_sqlitebloblogs::CompactionThresholds;
|
||||
use yama::pile::{Pile, PileDescriptor, RawPile};
|
||||
use yama::{commands, debug};
|
||||
|
||||
@ -56,8 +57,9 @@ enum PileCommand {
|
||||
pointer_name: String,
|
||||
|
||||
/// Limited expression(s) of files to retrieve.
|
||||
/// LIMITATION OF CURRENT VERSION: ONLY ONE EXACT PATH ALLOWED, PLEASE.
|
||||
#[clap(short, long)]
|
||||
subset: Vec<PathBuf>,
|
||||
subset: Option<String>,
|
||||
|
||||
destination: PathBuf,
|
||||
|
||||
@ -82,6 +84,29 @@ enum PileCommand {
|
||||
shallow: bool,
|
||||
},
|
||||
|
||||
Compact {
|
||||
/// Don't actually perform any compaction; just plan it out.
|
||||
#[clap(long)]
|
||||
dry_run: bool,
|
||||
|
||||
/// Allocated size under which a bloblog is considered small.
|
||||
#[clap(long = "small")]
|
||||
small_thresh: Option<u64>,
|
||||
|
||||
/// Minimum amount of space to reclaim in order to run compaction for reclaim.
|
||||
#[clap(long = "reclaim")]
|
||||
min_reclaim: Option<u64>,
|
||||
|
||||
/// Maximum amount of space that can be deallocated in a bloblog before we consider it
|
||||
/// worthwhile to replace.
|
||||
#[clap(long = "max-dealloc")]
|
||||
max_deallocated: Option<u64>,
|
||||
|
||||
/// Minimum number of mergeable small bloblogs in order to run compaction for merge.
|
||||
#[clap(long)]
|
||||
mergeable: Option<u32>,
|
||||
},
|
||||
|
||||
/// Enter a debug prompt for manually operating on the yama pile.
|
||||
Debug { supplied_command: Vec<String> },
|
||||
|
||||
@ -118,7 +143,7 @@ fn wrapped_main() -> anyhow::Result<i32> {
|
||||
match &opts.command {
|
||||
PileCommand::Retrieve {
|
||||
pointer_name,
|
||||
subset: _,
|
||||
subset,
|
||||
destination,
|
||||
num_workers: workers,
|
||||
} => {
|
||||
@ -136,10 +161,25 @@ fn wrapped_main() -> anyhow::Result<i32> {
|
||||
|
||||
fully_integrate_pointer_node(&pile, &mut root_tree_node.node, &mut pointer)?;
|
||||
|
||||
let mut node_to_extract = &mut root_tree_node.node;
|
||||
|
||||
if let Some(subset) = subset {
|
||||
for path_to_descend in subset.split('/').filter(|s| !s.is_empty()) {
|
||||
match node_to_extract.child(path_to_descend) {
|
||||
Ok(new_node) => {
|
||||
node_to_extract = new_node;
|
||||
}
|
||||
Err(msg) => {
|
||||
bail!("Can't descend into {path_to_descend:?}: {msg}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// todo allow disabling apply metadata
|
||||
extracting::extract(
|
||||
destination,
|
||||
&mut root_tree_node.node,
|
||||
node_to_extract,
|
||||
&pile,
|
||||
true,
|
||||
workers.unwrap_or(2),
|
||||
@ -175,6 +215,29 @@ fn wrapped_main() -> anyhow::Result<i32> {
|
||||
return Ok(1);
|
||||
}
|
||||
}
|
||||
PileCommand::Compact {
|
||||
dry_run,
|
||||
small_thresh,
|
||||
min_reclaim,
|
||||
max_deallocated,
|
||||
mergeable,
|
||||
} => {
|
||||
let this_dir = Path::new(".");
|
||||
let descriptor =
|
||||
load_pile_descriptor(this_dir).context("Failed to load pile descriptor")?;
|
||||
cleanup::compact(
|
||||
this_dir,
|
||||
&descriptor,
|
||||
!*dry_run,
|
||||
true,
|
||||
CompactionThresholds {
|
||||
minimum_to_reclaim: min_reclaim.unwrap_or(2 * 1024 * 1024 * 1024),
|
||||
minimum_small_bloblogs_to_merge: mergeable.unwrap_or(64),
|
||||
cond_if_more_deallocated_than: max_deallocated.unwrap_or(256 * 1024 * 1024),
|
||||
cond_if_less_allocated_than: small_thresh.unwrap_or(64 * 1024 * 1024),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
PileCommand::Init {} => {
|
||||
commands::init(".".as_ref())?;
|
||||
}
|
||||
|
@ -15,13 +15,12 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
|
||||
use crate::commands::retrieve_tree_node;
|
||||
use crate::definitions::{FilesystemOwnership, FilesystemPermissions, TreeNode};
|
||||
use crate::operations::remove_pointer_safely;
|
||||
use crate::pile::{Pile, PileDescriptor, RawPile};
|
||||
use crate::tree::integrate_node_in_place;
|
||||
use anyhow::anyhow;
|
||||
use clap::Parser;
|
||||
use log::info;
|
||||
use rustyline::error::ReadlineError;
|
||||
use rustyline::Editor;
|
||||
|
||||
@ -123,50 +122,7 @@ pub fn debug_command<RP: RawPile>(
|
||||
}
|
||||
}
|
||||
DebugCommand::DeletePointer { name } => {
|
||||
// retrieve this pointer
|
||||
let mut this_pointer = pile.read_pointer(name.as_str())?.ok_or_else(|| {
|
||||
anyhow!("Pointer {:?} does not exist so can not be deleted.", name)
|
||||
})?;
|
||||
let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())?;
|
||||
|
||||
// fully integrate the pointer
|
||||
fully_integrate_pointer_node(&pile, &mut this_node.node, &mut this_pointer)?;
|
||||
assert!(this_pointer.parent_pointer.is_none());
|
||||
|
||||
// now integrate any pointers that rely on this one
|
||||
// so that they no longer rely on this one.
|
||||
for pointer in pile.list_pointers()?.iter() {
|
||||
if pointer == name {
|
||||
continue;
|
||||
}
|
||||
if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
|
||||
if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
|
||||
if parent_pointer == name {
|
||||
info!("Pointer is now an orphan: {:?}", pointer);
|
||||
|
||||
// need to integrate this node, so retrieve it
|
||||
let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
|
||||
|
||||
// integrate it in-place
|
||||
integrate_node_in_place(&mut node.node, &this_node.node)?;
|
||||
|
||||
// mark it as orphaned (no parent)
|
||||
pointer_data.parent_pointer = None;
|
||||
|
||||
// store the orphaned node
|
||||
let new_chunk_ref = store_tree_node(&pile, &node)?;
|
||||
// associate the orphaned node with the orphaned pointer
|
||||
pointer_data.chunk_ref = new_chunk_ref;
|
||||
// write the pointer back.
|
||||
pile.write_pointer(pointer.as_str(), &pointer_data)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// then delete the pointer
|
||||
pile.delete_pointer(name)?;
|
||||
info!("Deleted pointer: {:?}", name);
|
||||
remove_pointer_safely(pile, name)?;
|
||||
}
|
||||
DebugCommand::PointerInfo { name } => {
|
||||
let this_pointer = pile
|
||||
|
@ -270,6 +270,19 @@ impl TreeNode {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Recurses into a child by name, or returns Err with a reason.
|
||||
pub fn child(&mut self, name: &str) -> Result<&mut TreeNode, &'static str> {
|
||||
match self {
|
||||
TreeNode::NormalFile { .. } => Err("not a directory: normal file"),
|
||||
TreeNode::Directory { children, .. } => match children.get_mut(name) {
|
||||
None => Err("child not in directory"),
|
||||
Some(node) => Ok(node),
|
||||
},
|
||||
TreeNode::SymbolicLink { .. } => Err("not a directory: symlink"),
|
||||
TreeNode::Deleted => Err("not a directory: deleted"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
@ -1,4 +1,80 @@
|
||||
use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
|
||||
use crate::pile::{Pile, RawPile};
|
||||
use crate::tree::{differentiate_node_in_place, integrate_node_in_place};
|
||||
use anyhow::{anyhow, Context};
|
||||
use log::info;
|
||||
|
||||
pub mod checking;
|
||||
pub mod cleanup;
|
||||
pub mod extracting;
|
||||
pub mod legacy_pushpull;
|
||||
pub mod storing;
|
||||
|
||||
pub fn remove_pointer_safely<P: RawPile>(pile: &Pile<P>, name: &str) -> anyhow::Result<()> {
|
||||
// retrieve this pointer
|
||||
let mut this_pointer = pile
|
||||
.read_pointer(name)?
|
||||
.ok_or_else(|| anyhow!("Pointer {:?} does not exist so can not be deleted.", name))?;
|
||||
let mut this_node = retrieve_tree_node(&pile, this_pointer.chunk_ref.clone())
|
||||
.context("retrieving 'this' node")?;
|
||||
|
||||
let new_parent_name = this_pointer.parent_pointer.clone();
|
||||
fully_integrate_pointer_node(pile, &mut this_node.node, &mut this_pointer)
|
||||
.context("integrating new parent")?;
|
||||
|
||||
let new_parent = if let Some(ref new_parent_name) = new_parent_name {
|
||||
let mut new_parent_pointer = pile
|
||||
.read_pointer(new_parent_name.as_str())?
|
||||
.ok_or_else(|| anyhow!("Parent pointer {:?} does not exist.", name))?;
|
||||
let mut new_parent_node = retrieve_tree_node(&pile, new_parent_pointer.chunk_ref.clone())?;
|
||||
fully_integrate_pointer_node(pile, &mut new_parent_node.node, &mut new_parent_pointer)?;
|
||||
Some((new_parent_pointer, new_parent_node))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// now integrate any pointers that rely on this one
|
||||
// so that they no longer rely on this one.
|
||||
for pointer in pile.list_pointers()?.iter() {
|
||||
if pointer == name {
|
||||
continue;
|
||||
}
|
||||
if let Some(mut pointer_data) = pile.read_pointer(pointer.as_str())? {
|
||||
if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
|
||||
if parent_pointer == name {
|
||||
info!("Pointer would be orphaned: {:?}; integrating", pointer);
|
||||
|
||||
// need to integrate this node, so retrieve it
|
||||
let mut node = retrieve_tree_node(&pile, pointer_data.chunk_ref)?;
|
||||
|
||||
// integrate it in-place
|
||||
integrate_node_in_place(&mut node.node, &this_node.node)?;
|
||||
|
||||
if let Some((_, ref new_parent_node)) = new_parent {
|
||||
// then differentiate with respect to the NEW parent
|
||||
differentiate_node_in_place(&mut node.node, &new_parent_node.node)?;
|
||||
}
|
||||
|
||||
// pass through the parent
|
||||
pointer_data.parent_pointer = new_parent_name.clone();
|
||||
|
||||
// store the updated version of the pointer
|
||||
let new_chunk_ref = store_tree_node(&pile, &node)?;
|
||||
// associate the new node with the new version of the pointer
|
||||
pointer_data.chunk_ref = new_chunk_ref;
|
||||
// write the pointer back.
|
||||
pile.write_pointer(pointer.as_str(), &pointer_data)?;
|
||||
|
||||
// we must flush chunks before deleting the pointer
|
||||
pile.flush()
|
||||
.context("flushing after writing pointer back")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// then delete the pointer
|
||||
pile.delete_pointer(name)?;
|
||||
info!("Deleted pointer: {:?}", name);
|
||||
Ok(())
|
||||
}
|
||||
|
@ -24,8 +24,10 @@ use crate::pile::{
|
||||
use anyhow::bail;
|
||||
use crossbeam_channel::Sender;
|
||||
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
|
||||
use itertools::Itertools;
|
||||
use log::{error, info, warn};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryInto;
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::Mutex;
|
||||
|
||||
@ -110,6 +112,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
|
||||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -137,6 +143,10 @@ impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
|
||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||
self.underlying.describe_pipeline()
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a full check of a Yama pile. This reads ALL the chunks, which can take a long time.
|
||||
@ -403,9 +413,21 @@ pub fn check_shallow<RP: RawPile>(
|
||||
|
||||
// actually do the vacuum!
|
||||
info!("Going to vacuum them up.");
|
||||
for vacuum_id in to_vacuum {
|
||||
pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
|
||||
pbar.inc(1);
|
||||
for vacuum_ids_chunk in to_vacuum
|
||||
.into_iter()
|
||||
.chunks(512)
|
||||
.into_iter()
|
||||
.map(|c| c.collect::<Vec<ChunkId>>())
|
||||
{
|
||||
pile.raw_pile.delete_many(
|
||||
Keyspace::Chunk,
|
||||
vacuum_ids_chunk
|
||||
.iter()
|
||||
.map(|ci| ci.as_slice())
|
||||
.collect::<Vec<&[u8]>>()
|
||||
.as_slice(),
|
||||
)?;
|
||||
pbar.inc(vacuum_ids_chunk.len().try_into().unwrap());
|
||||
}
|
||||
pile.flush()?;
|
||||
pbar.finish_and_clear();
|
||||
|
64
yama/src/operations/cleanup.rs
Normal file
64
yama/src/operations/cleanup.rs
Normal file
@ -0,0 +1,64 @@
|
||||
use crate::pile::local_sqlitebloblogs::{CompactionThresholds, SqliteBloblogPile};
|
||||
use crate::pile::{PileDescriptor, PileStorage};
|
||||
use anyhow::{bail, Context};
|
||||
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
|
||||
use log::info;
|
||||
use std::path::Path;
|
||||
|
||||
pub fn compact(
|
||||
pile_path: &Path,
|
||||
pile_desc: &PileDescriptor,
|
||||
actually_run: bool,
|
||||
make_progress_bar: bool,
|
||||
thresholds: CompactionThresholds,
|
||||
) -> anyhow::Result<()> {
|
||||
let pbar = if make_progress_bar {
|
||||
ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
|
||||
} else {
|
||||
ProgressBar::hidden()
|
||||
};
|
||||
pbar.set_style(
|
||||
ProgressStyle::default_bar()
|
||||
.template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
|
||||
);
|
||||
pbar.set_message("compacting");
|
||||
|
||||
match pile_desc.storage {
|
||||
PileStorage::SqliteIndexedBloblog => {
|
||||
let bloblog_pile = SqliteBloblogPile::open(&pile_path)
|
||||
.context("Failed to open SQLite-indexed Bloblog Pile")?;
|
||||
compact_bloblogs(bloblog_pile, pbar, actually_run, thresholds)?;
|
||||
Ok(())
|
||||
}
|
||||
other @ PileStorage::RemoteOnly => {
|
||||
bail!("Cannot use compaction on this kind of pile: {other:?}!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_bloblogs(
|
||||
bloblog_pile: SqliteBloblogPile,
|
||||
pbar: ProgressBar,
|
||||
actually_run: bool,
|
||||
thresholds: CompactionThresholds,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("=== Analysing for compaction ===");
|
||||
let analysis = bloblog_pile.analyse_for_compaction()?;
|
||||
let chunks_total: u64 = analysis.values().map(|bs| bs.chunks_total).sum();
|
||||
let chunks_deleted: u64 = analysis.values().map(|bs| bs.chunks_deleted).sum();
|
||||
let bytes_total: u64 = analysis.values().map(|bs| bs.bytes_total).sum();
|
||||
let bytes_deleted: u64 = analysis.values().map(|bs| bs.bytes_deleted).sum();
|
||||
|
||||
info!("{} bloblogs in this pile, with {chunks_total} chunks ({bytes_total} B) of which {chunks_deleted} ({bytes_deleted} B) are deleted.", analysis.len());
|
||||
|
||||
info!("=== Planning compaction ===");
|
||||
let plan = bloblog_pile.plan_compaction(&thresholds, analysis)?;
|
||||
info!("Planned compaction: replace {} bloblogs (of which {} are small), freeing up {} B and rewriting {} B", plan.bloblogs_to_replace.len(), plan.small_bloblogs, plan.reclaimable_space, plan.bytes_to_write);
|
||||
|
||||
if actually_run {
|
||||
info!("=== Compacting ===");
|
||||
bloblog_pile.perform_compaction(Box::new(pbar), plan)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -141,6 +141,7 @@ pub trait RawPile: Send + Sync + Debug + 'static {
|
||||
fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
|
||||
fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
|
||||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()>;
|
||||
fn delete_many(&self, kind: Keyspace, key: &[&[u8]]) -> anyhow::Result<()>;
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -167,6 +168,10 @@ pub trait RawPile: Send + Sync + Debug + 'static {
|
||||
) -> anyhow::Result<Sender<(ChunkId, Vec<u8>)>>;
|
||||
|
||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>>;
|
||||
|
||||
/// Return a u64 order token that indicates the optimum order to read this chunk in
|
||||
/// compared to other chunks.
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64>;
|
||||
}
|
||||
|
||||
impl RawPile for Box<dyn RawPile> {
|
||||
@ -182,6 +187,9 @@ impl RawPile for Box<dyn RawPile> {
|
||||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete(kind, key)
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete_many(kind, keys)
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -210,6 +218,10 @@ impl RawPile for Box<dyn RawPile> {
|
||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||
self.as_ref().describe_pipeline()
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.as_ref().chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<RP: RawPile> RawPile for Arc<RP> {
|
||||
@ -225,6 +237,9 @@ impl<RP: RawPile> RawPile for Arc<RP> {
|
||||
fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete(kind, key)
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.as_ref().delete_many(kind, keys)
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -253,6 +268,10 @@ impl<RP: RawPile> RawPile for Arc<RP> {
|
||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||
self.as_ref().describe_pipeline()
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.as_ref().chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -83,6 +83,10 @@ impl<R: Clone + RawPile> RawPile for PileGuard<R> {
|
||||
bail!("Access denied");
|
||||
}
|
||||
|
||||
fn delete_many(&self, _kind: Keyspace, _keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
bail!("Access denied");
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
_kind: Keyspace,
|
||||
@ -130,4 +134,8 @@ impl<R: Clone + RawPile> RawPile for PileGuard<R> {
|
||||
// TODO(question) Should we be described in the pipeline?
|
||||
self.underlying.describe_pipeline()
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
@ -278,6 +278,10 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
|
||||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -348,4 +352,8 @@ impl<R: RawPile> RawPile for RawPileCompressor<R> {
|
||||
});
|
||||
Ok(underlying)
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
@ -101,6 +101,10 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
|
||||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -127,4 +131,8 @@ impl<R: RawPile> RawPile for RawPileEncryptor<R> {
|
||||
underlying.push(PipelineDescription::Encryption);
|
||||
Ok(underlying)
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
@ -98,6 +98,10 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
|
||||
self.underlying.delete(kind, key)
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
self.underlying.delete_many(kind, keys)
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -149,4 +153,8 @@ impl<RP: RawPile> RawPile for RawPileIntegrityChecker<RP> {
|
||||
underlying.push(PipelineDescription::Integrity);
|
||||
Ok(underlying)
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
self.underlying.chunk_id_transfer_ordering_hint(chunk_id)
|
||||
}
|
||||
}
|
||||
|
@ -15,20 +15,23 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::fs::{read_dir, File, OpenOptions};
|
||||
use std::fs::{read_dir, remove_file, File, OpenOptions};
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use log::{info, warn};
|
||||
use nix::unistd::sync;
|
||||
use rusqlite::{params, Error, ErrorCode, Transaction};
|
||||
use rusqlite::ffi::ErrorCode::ConstraintViolation;
|
||||
use rusqlite::{params, Error, ErrorCode, Transaction, TransactionBehavior, NO_PARAMS};
|
||||
use rusqlite::{Connection, OptionalExtension};
|
||||
|
||||
use crate::definitions::ChunkId;
|
||||
@ -36,10 +39,8 @@ use crate::pile::{
|
||||
ControllerMessage, DebugStatistics, Keyspace, PipelineDescription, RawPile,
|
||||
StoragePipelineSettings,
|
||||
};
|
||||
use crate::utils::bytes_to_hexstring;
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use rusqlite::ffi::ErrorCode::ConstraintViolation;
|
||||
use std::time::Duration;
|
||||
use crate::progress::ProgressTracker;
|
||||
use crate::utils::{bytes_to_hexstring, LruMap};
|
||||
|
||||
/// Bloblogs will not be reused if they are already 2 GiB large.
|
||||
pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
|
||||
@ -47,6 +48,14 @@ pub const MAX_BLOBLOG_REUSE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
|
||||
/// This many pointers will be batched up for writing.
|
||||
pub const POINTER_WRITE_BATCHES: usize = 2048;
|
||||
|
||||
/// This many bloblogs will be kept open for reading, at maximum.
|
||||
pub const BLOBLOG_MAX_READING_FILE_COUNT: usize = 128;
|
||||
|
||||
/// Size of a blob header within a bloblog.
|
||||
/// 32 byte Chunk Id
|
||||
/// 4 byte (u32) Blob size
|
||||
pub const BLOB_HEADER_SIZE: u64 = 32 + 4;
|
||||
|
||||
/// A file storing a log of blobs.
|
||||
/// Format:
|
||||
/// Repeated:
|
||||
@ -136,8 +145,8 @@ pub type BloblogId = u32;
|
||||
#[derive(Debug)]
|
||||
pub struct Inner {
|
||||
next_bloblog_id: BloblogId,
|
||||
writer_bloblogs: Vec<BloblogId>,
|
||||
open_bloblogs: HashMap<BloblogId, Arc<Mutex<Bloblog>>>, // TODO want an LRU cache with a weak hashmap...?
|
||||
writer_bloblogs: Vec<(BloblogId, Arc<Mutex<Bloblog>>)>,
|
||||
reader_bloblogs: LruMap<BloblogId, Arc<Mutex<Bloblog>>>,
|
||||
connection: Connection,
|
||||
writers_in_progress: u16,
|
||||
// We batch up pointer writes because sync() performance really hurts us if we do them one by
|
||||
@ -271,7 +280,7 @@ impl SqliteBloblogPile {
|
||||
inner: Arc::new(Mutex::new(Inner {
|
||||
next_bloblog_id: 0,
|
||||
writer_bloblogs: Vec::new(),
|
||||
open_bloblogs: HashMap::new(),
|
||||
reader_bloblogs: LruMap::new(BLOBLOG_MAX_READING_FILE_COUNT),
|
||||
connection,
|
||||
writers_in_progress: 0,
|
||||
queued_pointer_writes: Default::default(),
|
||||
@ -284,51 +293,53 @@ impl SqliteBloblogPile {
|
||||
|
||||
fn open_bloblog(&self, bloblog_id: BloblogId) -> anyhow::Result<Arc<Mutex<Bloblog>>> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
Ok(match inner.open_bloblogs.entry(bloblog_id) {
|
||||
Entry::Occupied(entry) => entry.get().clone(),
|
||||
Entry::Vacant(entry) => {
|
||||
|
||||
match inner.reader_bloblogs.get(&bloblog_id) {
|
||||
Some(bloblog) => Ok(bloblog.clone()),
|
||||
None => {
|
||||
let bloblog = Arc::new(Mutex::new(Bloblog::open(
|
||||
&self.path.join(&bloblog_id.to_string()),
|
||||
)?));
|
||||
entry.insert(bloblog.clone());
|
||||
bloblog
|
||||
inner.reader_bloblogs.insert(bloblog_id, bloblog.clone());
|
||||
Ok(bloblog)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn get_writing_bloblog(&self) -> anyhow::Result<(BloblogId, Arc<Mutex<Bloblog>>)> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let writing_bloblog_id: BloblogId = match inner.writer_bloblogs.pop() {
|
||||
None => {
|
||||
loop {
|
||||
let pre_inc = inner.next_bloblog_id;
|
||||
inner.next_bloblog_id += 1;
|
||||
|
||||
// only start writing here if it doesn't already exist!
|
||||
let bloblog_path = &self.path.join(&pre_inc.to_string());
|
||||
if !bloblog_path.exists() {
|
||||
break pre_inc;
|
||||
}
|
||||
}
|
||||
inner.writers_in_progress += 1;
|
||||
|
||||
if let Some(writing_bloblog) = inner.writer_bloblogs.pop() {
|
||||
// We already have an open bloblog to give back.
|
||||
return Ok(writing_bloblog);
|
||||
}
|
||||
|
||||
// No open bloblogs to reuse; create a new one.
|
||||
// It's very important to create a fresh one here; we definitely don't want to use a file
|
||||
// that someone else is using!
|
||||
let writing_bloblog_id = loop {
|
||||
let pre_inc = inner.next_bloblog_id;
|
||||
inner.next_bloblog_id += 1;
|
||||
|
||||
// only start writing here if it doesn't already exist!
|
||||
let bloblog_path = &self.path.join(&pre_inc.to_string());
|
||||
if !bloblog_path.exists() {
|
||||
break pre_inc;
|
||||
}
|
||||
Some(id) => id,
|
||||
};
|
||||
|
||||
let result = Ok((
|
||||
writing_bloblog_id,
|
||||
match inner.open_bloblogs.entry(writing_bloblog_id) {
|
||||
Entry::Occupied(entry) => entry.get().clone(),
|
||||
Entry::Vacant(entry) => {
|
||||
let bloblog = Arc::new(Mutex::new(Bloblog::open(
|
||||
&self.path.join(&writing_bloblog_id.to_string()),
|
||||
)?));
|
||||
entry.insert(bloblog.clone());
|
||||
bloblog
|
||||
}
|
||||
},
|
||||
));
|
||||
inner.writers_in_progress += 1;
|
||||
result
|
||||
let bloblog = Arc::new(Mutex::new(Bloblog::open(
|
||||
&self.path.join(&writing_bloblog_id.to_string()),
|
||||
)?));
|
||||
|
||||
// MAYBE FUTURE // Insert a weak reference so we can easily get a reader for this if desired.
|
||||
// inner.open_bloblogs.insert(writing_bloblog_id, Arc::downgrade(&bloblog));
|
||||
// For now, I don't think we actually care about reading a bloblog that we've written
|
||||
// (at least not usually?)
|
||||
|
||||
Ok((writing_bloblog_id, bloblog))
|
||||
}
|
||||
|
||||
/// Should be called once the bloblog has been finished writing to for the moment.
|
||||
@ -341,7 +352,7 @@ impl SqliteBloblogPile {
|
||||
let size = bloblog.lock().unwrap().filesize()?;
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
if size < MAX_BLOBLOG_REUSE_SIZE {
|
||||
inner.writer_bloblogs.push(id);
|
||||
inner.writer_bloblogs.push((id, bloblog));
|
||||
}
|
||||
inner.writers_in_progress -= 1;
|
||||
if inner.writers_in_progress == 0 {
|
||||
@ -367,6 +378,31 @@ impl SqliteBloblogPile {
|
||||
.optional()?)
|
||||
}
|
||||
|
||||
fn get_chunk_pointers(
|
||||
&self,
|
||||
chunk_ids: &[&[u8]],
|
||||
) -> anyhow::Result<Vec<Option<BloblogPointer>>> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let txn = inner.connection.transaction()?;
|
||||
let mut result = Vec::with_capacity(chunk_ids.len());
|
||||
{
|
||||
let mut stmt = txn.prepare("SELECT bloblog, offset FROM chunks WHERE chunk_id = ?1")?;
|
||||
for &chunk_id in chunk_ids {
|
||||
let bloglog_pointer: Option<BloblogPointer> = stmt
|
||||
.query_row(params![chunk_id], |row| {
|
||||
Ok(BloblogPointer {
|
||||
bloblog: row.get(0)?,
|
||||
offset: row.get::<_, i64>(1)? as u64,
|
||||
})
|
||||
})
|
||||
.optional()?;
|
||||
result.push(bloglog_pointer);
|
||||
}
|
||||
}
|
||||
txn.commit()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn put_chunk_pointer(&self, chunk_id: &ChunkId, pointer: BloblogPointer) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let offset_i64 = i64::try_from(pointer.offset).expect("ouch! can't turn u64 into i64...");
|
||||
@ -492,6 +528,341 @@ impl SqliteBloblogPile {
|
||||
assert!(pointers_buffered.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look at the bloblogs in this pile and see where space may be reclaimable if we were to
|
||||
/// compact.
|
||||
///
|
||||
/// Next step: plan_compaction
|
||||
pub fn analyse_for_compaction(&self) -> anyhow::Result<BTreeMap<BloblogId, BloblogStats>> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
// Lock the database right away.
|
||||
let txn = inner
|
||||
.connection
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
let mut stmt = txn.prepare(
|
||||
"
|
||||
SELECT bloblog, COUNT(c.offset), COUNT(d.offset), SUM(COALESCE(d.size, 0))
|
||||
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
|
||||
GROUP BY bloblog
|
||||
",
|
||||
)?;
|
||||
|
||||
struct UnpopulatedBloblogStats {
|
||||
pub bloblog_id: BloblogId,
|
||||
pub chunks_total: u64,
|
||||
pub chunks_deleted: u64,
|
||||
pub bytes_deleted: u64,
|
||||
}
|
||||
|
||||
let unpopul_bloblog_stats = stmt.query_map(NO_PARAMS, |row| {
|
||||
Ok(UnpopulatedBloblogStats {
|
||||
bloblog_id: row.get(0)?,
|
||||
chunks_total: row.get::<_, i64>(1)?.try_into().expect("i64 -> u64"),
|
||||
chunks_deleted: row.get::<_, i64>(2)?.try_into().expect("i64 -> u64"),
|
||||
bytes_deleted: row.get::<_, i64>(3)?.try_into().expect("i64 -> u64"),
|
||||
})
|
||||
})?;
|
||||
|
||||
let mut final_stats = BTreeMap::new();
|
||||
|
||||
for unpopul_stat in unpopul_bloblog_stats {
|
||||
let UnpopulatedBloblogStats {
|
||||
bloblog_id,
|
||||
chunks_total,
|
||||
chunks_deleted,
|
||||
bytes_deleted,
|
||||
} = unpopul_stat?;
|
||||
let bloblog_path = self.path.join(&bloblog_id.to_string());
|
||||
let bytes_total = std::fs::metadata(&bloblog_path)
|
||||
.with_context(|| format!("Failed to get metadata for bloblog: {:?}", bloblog_path))?
|
||||
.size();
|
||||
|
||||
final_stats.insert(
|
||||
bloblog_id,
|
||||
BloblogStats {
|
||||
chunks_total,
|
||||
chunks_deleted,
|
||||
bytes_total,
|
||||
// Add a slight correction since we can count the blob headers of deleted blobs
|
||||
// as deleted.
|
||||
bytes_deleted: bytes_deleted + chunks_deleted * BLOB_HEADER_SIZE,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(final_stats)
|
||||
}
|
||||
|
||||
/// Look at the analysis of compaction and, using the specified thresholds, come up with a plan
|
||||
/// to perform compaction.
|
||||
///
|
||||
/// May return an empty plan if compaction isn't worthwhile.
|
||||
///
|
||||
/// Previous step: analyse_for_compaction
|
||||
/// Next step: perform_compaction
|
||||
pub fn plan_compaction(
|
||||
&self,
|
||||
thresholds: &CompactionThresholds,
|
||||
analysis: BTreeMap<BloblogId, BloblogStats>,
|
||||
) -> anyhow::Result<CompactionPlan> {
|
||||
let bloblogs_to_replace: BTreeMap<BloblogId, BloblogStats> = analysis
|
||||
.into_iter()
|
||||
.filter(|(_id, stats)| thresholds.should_replace_bloblog(stats))
|
||||
.collect();
|
||||
let reclaimable_space: u64 = bloblogs_to_replace
|
||||
.values()
|
||||
.map(|bs| bs.bytes_deleted)
|
||||
.sum();
|
||||
let bytes_to_write: u64 = bloblogs_to_replace
|
||||
.values()
|
||||
.map(|bs| bs.bytes_total - bs.bytes_deleted)
|
||||
.sum();
|
||||
let small_bloblogs: u32 = bloblogs_to_replace
|
||||
.values()
|
||||
.filter(|bs| bs.bytes_total - bs.bytes_deleted < thresholds.cond_if_less_allocated_than)
|
||||
.count() as u32;
|
||||
|
||||
if reclaimable_space < thresholds.minimum_to_reclaim
|
||||
&& small_bloblogs < thresholds.minimum_small_bloblogs_to_merge
|
||||
{
|
||||
// Nothing worth doing: return an empty plan.
|
||||
return Ok(CompactionPlan {
|
||||
bloblogs_to_replace: Default::default(),
|
||||
bytes_to_write: 0,
|
||||
reclaimable_space: 0,
|
||||
small_bloblogs: 0,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(CompactionPlan {
|
||||
bloblogs_to_replace: bloblogs_to_replace.keys().copied().collect(),
|
||||
bytes_to_write,
|
||||
reclaimable_space,
|
||||
small_bloblogs,
|
||||
})
|
||||
}
|
||||
|
||||
/// Given a compaction plan, perform the compaction.
|
||||
/// There shouldn't be any decisions left to be made at this point: just action.
|
||||
///
|
||||
/// TODO flock the bloblogs to be removed and make readers and writers also flock them too.
|
||||
///
|
||||
/// TODO find a way to deal with bloblogs that are entirely unreferenced from the index
|
||||
/// (e.g. bloblogs that weren't written properly, e.g. if compaction fails.)
|
||||
pub fn perform_compaction(
|
||||
&self,
|
||||
mut progress: Box<dyn ProgressTracker>,
|
||||
plan: CompactionPlan,
|
||||
) -> anyhow::Result<()> {
|
||||
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||
struct ReplacedBlobRow {
|
||||
pub old_bloblog: BloblogId,
|
||||
pub old_offset: u64,
|
||||
pub chunk_id: ChunkId,
|
||||
}
|
||||
|
||||
if plan.bloblogs_to_replace.is_empty() {
|
||||
info!("No compaction to be done.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut to_preserve = BTreeSet::new();
|
||||
let mut replacements = BTreeMap::new();
|
||||
|
||||
progress.set_max_size(plan.bytes_to_write);
|
||||
|
||||
// First find all the blobs we need to replace.
|
||||
{
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
// Lock the database right away.
|
||||
let txn = inner
|
||||
.connection
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
let mut stmt = txn.prepare(
|
||||
"
|
||||
SELECT chunk_id, c.offset
|
||||
FROM chunks c LEFT JOIN deleted d USING (bloblog, offset)
|
||||
WHERE bloblog = ?1 AND d.offset IS NULL
|
||||
",
|
||||
)?;
|
||||
for bloblog in plan.bloblogs_to_replace.iter().copied() {
|
||||
to_preserve.extend(
|
||||
stmt.query_map([bloblog], |row| {
|
||||
let mut chunk_id = ChunkId::default();
|
||||
chunk_id.copy_from_slice(row.get::<_, Vec<u8>>(0).unwrap().as_slice());
|
||||
Ok(ReplacedBlobRow {
|
||||
old_bloblog: bloblog,
|
||||
chunk_id,
|
||||
old_offset: row.get::<_, i64>(1).unwrap().try_into().unwrap(),
|
||||
})
|
||||
})?
|
||||
.collect::<Result<Vec<ReplacedBlobRow>, _>>()?,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Then make the replacements
|
||||
info!("Rewriting bloblogs...");
|
||||
let mut buf = Vec::new();
|
||||
let mut iterator = to_preserve.into_iter();
|
||||
loop {
|
||||
let (new_bloblog_id, bloglog_mutex) = self.get_writing_bloblog()?;
|
||||
let mut new_bloblog = bloglog_mutex.lock().expect("Failed to lock bloblog?");
|
||||
let mut is_more = false;
|
||||
|
||||
while let Some(preserve) = iterator.next() {
|
||||
is_more = true;
|
||||
|
||||
// Get hold of the old bloblog
|
||||
let old_bloblog = self.open_bloblog(preserve.old_bloblog)?;
|
||||
let mut old_bloblog = old_bloblog.lock().unwrap();
|
||||
|
||||
// Transfer the blob
|
||||
buf.clear();
|
||||
old_bloblog.read_blob(preserve.old_offset, &preserve.chunk_id, &mut buf)?;
|
||||
let new_offset = new_bloblog.write_blob(&preserve.chunk_id, &buf)?;
|
||||
|
||||
// Make a note of the replacement
|
||||
replacements.insert(
|
||||
preserve,
|
||||
BloblogPointer {
|
||||
bloblog: new_bloblog_id,
|
||||
offset: new_offset,
|
||||
},
|
||||
);
|
||||
|
||||
progress.inc_progress(buf.len() as u64);
|
||||
|
||||
if new_bloblog.filesize()? > MAX_BLOBLOG_REUSE_SIZE {
|
||||
// get a new bloblog to write with.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
drop(new_bloblog);
|
||||
self.return_writing_bloblog(new_bloblog_id, bloglog_mutex)?;
|
||||
|
||||
if !is_more {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
info!("Applying replacements...");
|
||||
{
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
// Lock the database right away.
|
||||
let txn = inner
|
||||
.connection
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
let mut stmt = txn.prepare(
|
||||
"
|
||||
UPDATE chunks
|
||||
SET bloblog = ?1, offset = ?2
|
||||
WHERE chunk_id = ?3
|
||||
",
|
||||
)?;
|
||||
|
||||
for (replacement_row, new_pos) in replacements {
|
||||
ensure!(
|
||||
stmt.execute(params![
|
||||
new_pos.bloblog,
|
||||
new_pos.offset as i64,
|
||||
&replacement_row.chunk_id as &[u8]
|
||||
])? == 1,
|
||||
"Wrong number of rows updated for replacement!"
|
||||
);
|
||||
}
|
||||
|
||||
drop(stmt);
|
||||
txn.commit().context("committing replacements")?;
|
||||
}
|
||||
|
||||
// TODO fsync new bloblogs
|
||||
|
||||
info!("Deleting old bloblogs...");
|
||||
{
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
// Lock the database right away.
|
||||
let txn = inner
|
||||
.connection
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
|
||||
for bloblog_id in plan.bloblogs_to_replace.iter().copied() {
|
||||
let deleted_chunks = txn.execute(
|
||||
"
|
||||
DELETE FROM chunks WHERE bloblog = ?1
|
||||
",
|
||||
params![bloblog_id],
|
||||
)?;
|
||||
|
||||
let deleted_deleted = txn.execute(
|
||||
"
|
||||
DELETE FROM deleted WHERE bloblog = ?1
|
||||
",
|
||||
params![bloblog_id],
|
||||
)?;
|
||||
|
||||
ensure!(deleted_chunks == deleted_deleted, "Undeleted chunks left in bloblog {bloblog_id}: CHUNKS={deleted_chunks} DELETED={deleted_deleted}");
|
||||
|
||||
let bloblog_path = self.path.join(bloblog_id.to_string());
|
||||
remove_file(&bloblog_path).with_context(|| {
|
||||
format!("Failed to remove obsolete bloblog: {:?}", bloblog_path)
|
||||
})?;
|
||||
}
|
||||
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BloblogStats {
|
||||
pub chunks_total: u64,
|
||||
pub chunks_deleted: u64,
|
||||
pub bytes_total: u64,
|
||||
pub bytes_deleted: u64,
|
||||
}
|
||||
|
||||
pub struct CompactionPlan {
|
||||
pub bloblogs_to_replace: BTreeSet<BloblogId>,
|
||||
pub bytes_to_write: u64,
|
||||
pub reclaimable_space: u64,
|
||||
pub small_bloblogs: u32,
|
||||
}
|
||||
|
||||
pub struct CompactionThresholds {
|
||||
/// Minimum bytes to be reclaimable overall for compaction to be worthwhile.
|
||||
pub minimum_to_reclaim: u64,
|
||||
|
||||
/// (alternative reason) Minimum number of files to be undersized in order for compaction
|
||||
/// to be worthwhile.
|
||||
/// This gives us a way to make compaction run if we have lots of tiny bloblogs.
|
||||
pub minimum_small_bloblogs_to_merge: u32,
|
||||
|
||||
/// A bloblog will be replaced if the deallocated size is greater than this.
|
||||
pub cond_if_more_deallocated_than: u64,
|
||||
|
||||
/// A bloblog will be replaced if the allocated size is less than this.
|
||||
pub cond_if_less_allocated_than: u64,
|
||||
}
|
||||
|
||||
impl CompactionThresholds {
|
||||
pub fn should_replace_bloblog(&self, bloblog_stats: &BloblogStats) -> bool {
|
||||
let allocated = bloblog_stats.bytes_total - bloblog_stats.bytes_deleted;
|
||||
// Note that this will also trigger for fully-deallocated files if
|
||||
let is_small = allocated < self.cond_if_less_allocated_than;
|
||||
let has_large_deallocations =
|
||||
bloblog_stats.bytes_deleted > self.cond_if_more_deallocated_than;
|
||||
is_small || has_large_deallocations
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CompactionOutcome {
|
||||
pub bloblogs_deleted: u32,
|
||||
pub bloblogs_created: u32,
|
||||
pub bytes_deleted: u32,
|
||||
pub bytes_created: u32,
|
||||
}
|
||||
|
||||
impl Drop for SqliteBloblogPile {
|
||||
@ -636,6 +1007,59 @@ impl RawPile for SqliteBloblogPile {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
match kind {
|
||||
Keyspace::Chunk => {
|
||||
let mut chunk_pointers_by_bloblog: BTreeMap<BloblogId, Vec<(u64, &[u8])>> =
|
||||
BTreeMap::new();
|
||||
|
||||
for (chunk_pointer, chunk_id) in self
|
||||
.get_chunk_pointers(keys)
|
||||
.context("failed to get chunk pointers")?
|
||||
.into_iter()
|
||||
.zip(keys)
|
||||
.filter_map(|(pointer, &chunk_id)| match pointer {
|
||||
Some(pointer) => Some((pointer, chunk_id)),
|
||||
None => None,
|
||||
})
|
||||
{
|
||||
chunk_pointers_by_bloblog
|
||||
.entry(chunk_pointer.bloblog)
|
||||
.or_default()
|
||||
.push((chunk_pointer.offset, chunk_id));
|
||||
}
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let txn = inner.connection.transaction()?;
|
||||
{
|
||||
let mut stmt = txn.prepare(
|
||||
"INSERT OR IGNORE INTO deleted (bloblog, offset, size)
|
||||
VALUES (?1, ?2, ?3)",
|
||||
)?;
|
||||
for (bloblog_id, entries) in chunk_pointers_by_bloblog {
|
||||
let bloblog_mutex = self.open_bloblog(bloblog_id)?;
|
||||
let mut bloblog = bloblog_mutex.lock().unwrap();
|
||||
for (chunk_offset, raw_chunk_id) in entries {
|
||||
let mut chunk_id: ChunkId = Default::default();
|
||||
chunk_id.copy_from_slice(raw_chunk_id);
|
||||
let size = bloblog.blob_len(chunk_offset, &chunk_id)?;
|
||||
let offset_i64 = i64::try_from(chunk_offset)
|
||||
.expect("ouch! can't turn u64 into i64...");
|
||||
stmt.execute(params![bloblog_id, offset_i64, size])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
txn.commit().context("Failed to commit chunk deletions")?;
|
||||
}
|
||||
_ => {
|
||||
for &key in keys {
|
||||
self.delete(kind, key)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -750,6 +1174,16 @@ impl RawPile for SqliteBloblogPile {
|
||||
fn describe_pipeline(&self) -> anyhow::Result<Vec<PipelineDescription>> {
|
||||
Ok(vec![PipelineDescription::Store])
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
let chunk_pointer = self
|
||||
.get_chunk_pointer(chunk_id)?
|
||||
.context("Can't get chunk ID transfer ordering hint for chunk without pointer.")?;
|
||||
|
||||
// Scheme: 24-bit bloblog ID
|
||||
// followed by 40-bit offset
|
||||
Ok(((chunk_pointer.bloblog as u64) << 40) | (chunk_pointer.offset & 0xFF_FF_FF_FF_FF))
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyIterator {
|
||||
@ -795,9 +1229,10 @@ impl Iterator for KeyIterator {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::pile::local_sqlitebloblogs::Bloblog;
|
||||
use temp_dir::TempDir;
|
||||
|
||||
use crate::pile::local_sqlitebloblogs::Bloblog;
|
||||
|
||||
#[test]
|
||||
pub fn bloblog_read_write_test() {
|
||||
let td = TempDir::new().unwrap();
|
||||
|
@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::{anyhow, bail};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use log::{error, info};
|
||||
|
||||
@ -307,6 +307,12 @@ impl RawPile for Requester {
|
||||
other => Err(anyhow!("Received {:?} for Delete", other)),
|
||||
}
|
||||
}
|
||||
fn delete_many(&self, kind: Keyspace, keys: &[&[u8]]) -> anyhow::Result<()> {
|
||||
for &key in keys {
|
||||
self.delete(kind, key)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
fn list_keys(
|
||||
&self,
|
||||
kind: Keyspace,
|
||||
@ -438,6 +444,10 @@ impl RawPile for Requester {
|
||||
other => Err(anyhow!("Received {:?} for Describe", other)),
|
||||
}
|
||||
}
|
||||
|
||||
fn chunk_id_transfer_ordering_hint(&self, _chunk_id: &ChunkId) -> anyhow::Result<u64> {
|
||||
bail!("You probably shouldn't be using chunk ID transfer ordering hints with a remote.");
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ListKeyIterator {
|
||||
|
@ -185,7 +185,7 @@ pub fn differentiate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow
|
||||
/// result is in-place.
|
||||
///
|
||||
/// Preconditions:
|
||||
/// - `old` must be an integrated pointer.
|
||||
/// - `old` must be an integrated pointer. (Otherwise this algorithm is not correct.)
|
||||
/// - `old` is the parent of `new`
|
||||
pub fn integrate_node_in_place(new: &mut TreeNode, old: &TreeNode) -> anyhow::Result<()> {
|
||||
if let TreeNode::Directory { children, .. } = new {
|
||||
|
@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::Write;
|
||||
|
||||
pub fn bytes_to_hexstring(chunkid: &[u8]) -> String {
|
||||
@ -42,3 +43,98 @@ pub fn get_number_of_workers(first_try_env_name: &str) -> u8 {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LruMap<K, V> {
|
||||
capacity: usize,
|
||||
last_access: BTreeSet<(u64, K)>,
|
||||
items: BTreeMap<K, (V, u64)>,
|
||||
counter: u64,
|
||||
}
|
||||
|
||||
impl<K: Ord + Clone, V> LruMap<K, V> {
|
||||
pub fn new(capacity: usize) -> LruMap<K, V> {
|
||||
LruMap {
|
||||
capacity,
|
||||
last_access: BTreeSet::new(),
|
||||
items: BTreeMap::new(),
|
||||
counter: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets an item from the LRU map.
|
||||
pub fn get(&mut self, key: &K) -> Option<&V> {
|
||||
match self.items.get_mut(key) {
|
||||
Some((value, last_used_instant)) => {
|
||||
assert!(
|
||||
self.last_access.remove(&(*last_used_instant, key.clone())),
|
||||
"Corrupt LRU map: freshen not correct."
|
||||
);
|
||||
let new_instant = self.counter;
|
||||
self.counter += 1;
|
||||
self.last_access.insert((new_instant, key.clone()));
|
||||
*last_used_instant = new_instant;
|
||||
Some(value)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
|
||||
let new_instant = self.counter;
|
||||
self.counter += 1;
|
||||
|
||||
let retval = match self.items.insert(key.clone(), (value, new_instant)) {
|
||||
Some((old_entry, old_instant)) => {
|
||||
assert!(
|
||||
self.last_access.remove(&(old_instant, key.clone())),
|
||||
"Corrupt LRU map: insert not correct."
|
||||
);
|
||||
Some(old_entry)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
self.last_access.insert((new_instant, key));
|
||||
|
||||
if retval.is_none() {
|
||||
// We didn't replace any item, so we have grown by 1.
|
||||
// Check if we need to evict.
|
||||
if self.items.len() > self.capacity {
|
||||
self.evict();
|
||||
}
|
||||
}
|
||||
|
||||
retval
|
||||
}
|
||||
|
||||
pub fn evict(&mut self) -> Option<(K, V)> {
|
||||
if let Some(first_entry) = self.last_access.iter().next().cloned() {
|
||||
self.last_access.remove(&first_entry);
|
||||
let (_, key) = first_entry;
|
||||
let (value, _) = self
|
||||
.items
|
||||
.remove(&key)
|
||||
.expect("Corrupt LRU map: last access and items out of sync");
|
||||
|
||||
Some((key, value))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::utils::LruMap;
|
||||
|
||||
#[test]
|
||||
fn test_lru_map() {
|
||||
let mut lmap = LruMap::new(3);
|
||||
lmap.insert(1, 1);
|
||||
lmap.insert(2, 1);
|
||||
lmap.insert(3, 1);
|
||||
assert_eq!(lmap.get(&1), Some(&1));
|
||||
lmap.insert(4, 1);
|
||||
assert_eq!(lmap.get(&2), None);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user