diff --git a/Cargo.lock b/Cargo.lock index c94e3eb..8ab3762 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -746,7 +746,24 @@ dependencies = [ name = "datman" version = "0.7.0-alpha.1" dependencies = [ + "chrono", + "clap", + "dashmap", "eyre", + "indicatif", + "patricia_tree", + "serde", + "serde_json", + "tokio", + "toml", + "tracing", + "tracing-indicatif", + "tracing-subscriber", + "users", + "yama", + "yama_midlevel_crypto", + "yama_pile", + "yama_wormfile", ] [[package]] @@ -3036,9 +3053,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ "autocfg", "bytes", @@ -3050,7 +3067,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -3065,9 +3082,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", diff --git a/datman/Cargo.toml b/datman/Cargo.toml index fa5e7c8..bab8e89 100644 --- a/datman/Cargo.toml +++ b/datman/Cargo.toml @@ -12,3 +12,26 @@ description = "A chunked and deduplicated backup system using Yama" [dependencies] eyre = "0.6.8" +clap = { version = "4.2.2", features = ["derive", "env"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.16", features = ["tracing-log", "env-filter"] } +tracing-indicatif = "0.3.0" +indicatif = "0.17.3" +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +toml = "0.7.3" +tokio = { version = "1.28.0", features = ["fs", "macros", "rt-multi-thread"] } +dashmap = "5.4.0" +chrono = "0.4.24" +users = "0.11.0" + +yama = { version = "0.7.0-alpha.1", path = "../yama" } +yama_pile = { path = "../yama_pile" } +#yama_localcache = { path = "../yama_localcache" } +yama_wormfile = { path = "../yama_wormfile" } +#yama_wormfile_fs = { path = "../yama_wormfile_fs" } +#yama_wormfile_s3 = { path = "../yama_wormfile_s3" } +#yama_wormfile_sftp = { path = "../yama_wormfile_sftp" } +yama_midlevel_crypto = { path = "../yama_midlevel_crypto" } + +patricia_tree = "0.5.7" \ No newline at end of file diff --git a/datman/src/backup.rs b/datman/src/backup.rs new file mode 100644 index 0000000..d980858 --- /dev/null +++ b/datman/src/backup.rs @@ -0,0 +1,471 @@ +use crate::descriptor_config::{SourceDescriptor, SourceDescriptorInner, VirtualSourceKind}; +use crate::pointer_names::{get_pointer_name_at, POINTER_NAME_DATETIME_SPLITTER}; +use chrono::{DateTime, Utc}; +use dashmap::DashSet; +use eyre::{bail, eyre, Context, ContextCompat}; +use indicatif::ProgressStyle; +use patricia_tree::PatriciaMap; +use std::borrow::Cow; +use std::collections::{BTreeMap, HashMap}; +use std::io::Write; +use std::path::PathBuf; +use std::process::{Child, Command, Stdio}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::task::JoinSet; +use tracing::{debug, info, info_span, Instrument, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; +use users::{get_current_gid, get_current_uid}; +use yama::pile_with_cache::PileWithCache; 
+use yama::scan::create_uidgid_lookup_tables; +use yama::storing::{ + assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState, +}; +use yama::{scan, PROGRESS_BAR_STYLE}; +use yama_midlevel_crypto::chunk_id::ChunkId; +use yama_pile::definitions::{BlobLocator, BloblogId, IndexBloblogEntry, RecursiveChunkRef}; +use yama_pile::pointers::Pointer; +use yama_pile::tree::unpopulated::ScanEntry; +use yama_pile::tree::{ + assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership, + FilesystemPermissions, RootTreeNode, TreeNode, +}; +use yama_wormfile::boxed::BoxedWormFileProvider; + +pub async fn backup( + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + sources_to_backup: BTreeMap<String, SourceDescriptor>, +) -> eyre::Result<()> { + // Locate suitable parent pointers + let parents_to_use = find_suitable_parent_pointers(&pwc, &sources_to_backup) + .await + .context("failed to look for suitable parent pointers")?; + let now = Utc::now(); + + // (dirtrees) Scan + let dir_sources = scan_dir_sources(&sources_to_backup, parents_to_use, now) + .await + .context("failed to scan directory sources")?; + + let new_unflushed_chunks: Arc<DashSet<ChunkId>> = Arc::new(Default::default()); + + // (dirtrees) Start a storage pipeline and submit jobs to it + let task_store_dirs = { + let new_unflushed_chunks = new_unflushed_chunks.clone(); + let pwc = pwc.clone(); + + let bds_span = info_span!("storing"); + tokio::spawn( + async move { + backup_dir_sources(dir_sources, pwc, new_unflushed_chunks) + .await + .context("whilst backing up dir sources") + } + .instrument(bds_span), + ) + }; + + // (virtual source streams) Store to bloblog writers + let task_store_virtuals = { + let bvs_span = info_span!("storing_virts"); + let new_unflushed_chunks = new_unflushed_chunks.clone(); + let pwc = pwc.clone(); + + tokio::spawn( + async move { + backup_virtual_sources(&sources_to_backup, now, pwc, new_unflushed_chunks) + .await + .context("whilst backing up virtual sources") + } + .instrument(bvs_span), + ) + }; + + let (dir_sources_and_chunkmaps, virt_sources) = + tokio::join!(task_store_dirs, task_store_virtuals); + let dir_sources_and_chunkmaps: BackupDirSourcesReturn = dir_sources_and_chunkmaps??; + let mut virt_sources: Vec<VirtualSourceReturn> = virt_sources??; + + let mut chunkmaps = dir_sources_and_chunkmaps.chunkmaps; + for source in &mut virt_sources { + chunkmaps.extend( + std::mem::take(&mut source.chunkmaps) + .into_iter() + .map(|(k, nb)| { + ( + k, + IndexBloblogEntry { + chunks: nb, + forgotten_bytes: 0, + }, + ) + }), + ); + } + + // Chunkmaps, indices and write pointers + assemble_and_write_indices(&pwc, chunkmaps) + .await + .context("failed to assemble and write indices")?; + + info!("All indices stored, writing pointer..."); + + for (dir_source_prep, chunk_file_map) in dir_sources_and_chunkmaps.dir_source_returns { + // Assemble and write a pointer + let mut tree = + assemble_tree_from_scan_entries(dir_source_prep.scan_entry_map, chunk_file_map) + .context("failed to assemble tree")?; + let (uids, gids) = + create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?; + + if let Some(ref parent_node) = dir_source_prep.parent { + differentiate_node_in_place(&mut tree, &parent_node.root.node) + .context("failed to differentiate?")?; + } + + pwc.pile + .write_pointer( + &dir_source_prep.new_pointer_name, + false, + &Pointer { + parent: dir_source_prep.parent_name.clone(), + root: RootTreeNode { + name: dir_source_prep + .path + .file_name() + .map(|oss| oss.to_str()) + .flatten() + .unwrap_or("") + .to_owned(), + node: tree, + }, + uids,
gids, + }, + ) + .await + .context("failed to write pointer")?; + } + + for virtual_source in virt_sources { + pwc.pile + .write_pointer(&virtual_source.pointer_name, false, &virtual_source.pointer) + .await + .context("failed to write pointer")?; + } + + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + + Ok(()) +} + +/// Given access to a PWC and a map of sources to back up, returns a map of pointer names to use as parents. +/// For virtual sources, no parent is chosen. +/// For directory sources, the most recent pointer from the same source is chosen as a parent. +async fn find_suitable_parent_pointers( + pwc: &PileWithCache<BoxedWormFileProvider>, + sources_to_backup: &BTreeMap<String, SourceDescriptor>, +) -> eyre::Result<BTreeMap<String, (String, Pointer)>> { + let mut result = BTreeMap::new(); + + let pointers = pwc + .pile + .list_pointers() + .await + .context("failed to list pointers")?; + + for (source_name, source) in sources_to_backup.iter() { + if source.is_directory_source() { + let starter = format!("{source_name}{POINTER_NAME_DATETIME_SPLITTER}"); + if let Some(most_recent_pointer) = pointers + .iter() + .rev() + .filter(|pn| pn.starts_with(&starter)) + .next() + { + debug!("for {source_name:?}, using parent {most_recent_pointer:?}"); + let pointer = pwc + .read_pointer_fully_integrated(&most_recent_pointer) + .await + .context("failed to read parent pointer")? + .context("no parent pointer despite having just listed it")?; + result.insert( + source_name.to_owned(), + (most_recent_pointer.clone(), pointer), + ); + } + } + } + + Ok(result) +} + +struct DirSourcePrep { + scan_entry_map: PatriciaMap<ScanEntry>, + parent_name: Option<String>, + parent: Option<Pointer>, + path: PathBuf, + new_pointer_name: String, +} + +async fn scan_dir_sources( + sources_to_backup: &BTreeMap<String, SourceDescriptor>, + mut parents: BTreeMap<String, (String, Pointer)>, + now: DateTime<Utc>, +) -> eyre::Result<Vec<DirSourcePrep>> { + let mut joinset = JoinSet::new(); + + for (source_name, source) in sources_to_backup { + if let SourceDescriptorInner::DirectorySource { + path, + cross_filesystems, + ignore, + } = &source.inner + { + let path = path.to_owned(); + let cross_filesystems = *cross_filesystems; + debug!("TODO: xf={cross_filesystems}"); + let ignore = ignore.to_owned(); + let (parent_name, parent) = parents.remove(source_name).unzip(); + let new_pointer_name = get_pointer_name_at(&source_name, now); + joinset.spawn_blocking(move || -> eyre::Result<DirSourcePrep> { + let scan_entry_map = scan::scan(&path, &ignore).context("Failed to scan")?; + Ok(DirSourcePrep { + scan_entry_map, + parent_name, + parent, + path, + new_pointer_name, + }) + }); + } + } + + let mut result = Vec::new(); + + while let Some(dsp_res_res) = joinset.join_next().await { + result.push(dsp_res_res??); + } + Ok(result) +} + +struct BackupDirSourcesReturn { + pub chunkmaps: BTreeMap<BloblogId, IndexBloblogEntry>, + pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap<(RecursiveChunkRef, u64)>)>, +} + +async fn backup_dir_sources( + dir_sources: Vec<DirSourcePrep>, + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + new_unflushed_chunks: Arc<DashSet<ChunkId>>, +) -> eyre::Result<BackupDirSourcesReturn> { + let mut chunk_file_maps = Vec::new(); + let mut pruned_scan_entry_maps = Vec::new(); + + // First collect all that stuff together...
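+ // For each directory source: if a parent snapshot exists, prepopulate the chunk-file map with the entries that are unmodified since that snapshot (so those files are not re-chunked or re-stored) and prune them from the scan entry map; otherwise start from an empty chunk-file map and the full scan entry map.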
+ for dir_source in &dir_sources { + let (chunk_file_map, pruned_scan_entry_map) = if let Some(ref parent_node) = + dir_source.parent + { + let (cfm, pruned) = + scan::prepopulate_unmodified(&parent_node.root.node, &dir_source.scan_entry_map); + + (cfm, Cow::Owned(pruned)) + } else { + ( + PatriciaMap::<(RecursiveChunkRef, u64)>::new(), + Cow::Borrowed(&dir_source.scan_entry_map), + ) + }; + chunk_file_maps.push(chunk_file_map); + pruned_scan_entry_maps.push(pruned_scan_entry_map); + } + + let store_span = Span::current(); + // store_span.pb_set_style(&ProgressStyle::default_bar()); + store_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); + store_span.pb_set_message("storing files"); + store_span.pb_set_length( + pruned_scan_entry_maps + .iter() + .map(|pruned_scan_entry_map| { + pruned_scan_entry_map + .values() + .filter(|v| matches!(v, ScanEntry::NormalFile { .. })) + .count() as u64 + }) + .sum(), + ); + + // + let (pipeline, pipeline_job_tx) = + StoragePipeline::launch_new(4, pwc.clone(), new_unflushed_chunks).await?; + + let dir_sources2 = &dir_sources; + let (submitter_task, receiver_task) = tokio::join!( + async move { + let pipeline_job_tx = pipeline_job_tx; + for (dir_source_idx, dir_source) in dir_sources2.iter().enumerate() { + for (name_bytes, scan_entry) in pruned_scan_entry_maps[dir_source_idx].iter() { + if let ScanEntry::NormalFile { .. } = scan_entry { + let name = std::str::from_utf8(name_bytes.as_slice()) + .context("name is not str")?; + // TODO(bug): if source name is a file, this doesn't work (.join("")) + pipeline_job_tx + .send_async(( + (dir_source_idx, name.to_owned()), + dir_source.path.join(name), + )) + .await + .map_err(|_| eyre!("unable to send to pipeline."))?; + } + } + } + + drop(pipeline_job_tx); + Ok::<_, eyre::Report>(()) + }, + async { + while let Ok(((dir_source_idx, job_id), rec_chunk_ref, real_size)) = + pipeline.next_result().await + { + chunk_file_maps[dir_source_idx].insert_str(&job_id, (rec_chunk_ref, real_size)); + Span::current().pb_inc(1); + } + // eprintln!("fin rec"); + Ok::<_, eyre::Report>(()) + } + ); + + submitter_task?; + receiver_task?; + + assert_eq!(dir_sources.len(), chunk_file_maps.len()); + + let chunkmaps = pipeline.finish_into_chunkmaps().await?; + + Ok(BackupDirSourcesReturn { + chunkmaps, + dir_source_returns: dir_sources + .into_iter() + .zip(chunk_file_maps.into_iter()) + .collect(), + }) +} + +async fn backup_virtual_sources( + sources: &BTreeMap<String, SourceDescriptor>, + now: DateTime<Utc>, + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + new_unflushed_chunks: Arc<DashSet<ChunkId>>, +) -> eyre::Result<Vec<VirtualSourceReturn>> { + let mut joinset: JoinSet<eyre::Result<VirtualSourceReturn>> = JoinSet::new(); + + for (source_name, source) in sources { + if source.is_virtual_source() { + joinset.spawn(backup_virtual_source( + get_pointer_name_at(source_name, now), + source.clone(), + pwc.clone(), + new_unflushed_chunks.clone(), + )); + } + } + + let mut results = Vec::new(); + while let Some(result_res_res) = joinset.join_next().await { + results.push(result_res_res??); + } + + Ok(results) +} + +struct VirtualSourceReturn { + pub pointer_name: String, + pub pointer: Pointer, + pub chunkmaps: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>, +} + +async fn backup_virtual_source( + pointer_name: String, + source: SourceDescriptor, + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + new_unflushed_chunks: Arc<DashSet<ChunkId>>, +) -> eyre::Result<VirtualSourceReturn> { + let SourceDescriptorInner::VirtualSource(virtual_source) = &source.inner else { + bail!("bug: non-VS SDI passed to BVS"); + }; + + let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks) + .await + .context("failed to
create storing state")?; + let mut sbw = StoringBloblogWriters::default(); + let ((chunkref, size), mut sbw, mut storing_state) = tokio::task::spawn_blocking({ + let virtual_source = virtual_source.clone(); + move || -> eyre::Result<((RecursiveChunkRef, u64), StoringBloblogWriters, StoringState)> { + let child = open_stdout_backup_process(&virtual_source.extra_args, &virtual_source.helper)?; + Ok((storing_state.store_full_stream(child.stdout.unwrap(), &mut sbw).context("Failed to store stream into Yama pile")?, sbw, storing_state)) + } + }).await??; + + sbw.finish_bloblogs(&mut storing_state) + .await + .context("Failed to finish bloblogs")?; + let chunkmaps = storing_state.new_bloblogs; + + // Assemble and write a pointer + let uid = get_current_uid() as u16; + let gid = get_current_gid() as u16; + let tree = TreeNode::NormalFile { + mtime: SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0), + ownership: FilesystemOwnership { uid, gid }, + permissions: FilesystemPermissions { mode: 0o600 }, + size, + content: chunkref, + }; + let (uids, gids) = + create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?; + + let VirtualSourceKind::Stdout { filename } = &virtual_source.kind; + + Ok(VirtualSourceReturn { + pointer_name, + pointer: Pointer { + parent: None, + root: RootTreeNode { + name: filename.clone(), + node: tree, + }, + uids, + gids, + }, + chunkmaps, + }) +} + +pub fn open_stdout_backup_process( + extra_args: &HashMap<String, toml::Value>, + program_name: &str, +) -> eyre::Result<Child> { + let mut child = Command::new(format!("datman-helper-{}-backup", program_name)) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .stdin(Stdio::piped()) + .spawn()?; + let mut child_stdin = child.stdin.as_mut().unwrap(); + serde_json::to_writer(&mut child_stdin, extra_args)?; + child_stdin.flush()?; + // close stdin! + child.stdin = None; + Ok(child) +} diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index 20b202f..3f3e027 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -15,6 +15,266 @@ You should have received a copy of the GNU General Public License along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/ -pub fn main() -> eyre::Result<()> { +use clap::{Parser, Subcommand}; +use datman::backup::backup; +use datman::descriptor_config::{load_descriptor, SourceDescriptor}; +use datman::extract::{ + extract, load_pointers_for_extraction, merge_roots_for_batch_extract, select_to_extract, +}; +use eyre::{bail, Context, ContextCompat}; +use std::collections::{BTreeMap, BTreeSet}; +use std::path::PathBuf; +use std::str::FromStr; +use tracing::info; +use tracing_indicatif::IndicatifLayer; +use tracing_subscriber::filter::filter_fn; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Layer; +use yama::get_hostname; +use yama::open::open_lock_and_update_cache; + +#[derive(Clone, Debug)] +pub struct PileAndPointer { + pub pile_path: Option<PathBuf>, + pub pointer: PointerName, +} + +#[derive(Clone, Debug)] +#[repr(transparent)] +pub struct PointerName(String); + +impl FromStr for PointerName { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + if !s + .chars() + .all(|c| c.is_alphanumeric() || ['_', '+', '-', ':'].contains(&c)) + { + bail!("Bad pointer name: {s:?}"); + } + Ok(PointerName(s.to_owned())) + } +} + +impl FromStr for PileAndPointer { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.split_once(":") { + None => Ok(PileAndPointer { + pile_path: None, + pointer: PointerName::from_str(s)?, + }), + Some((pile_path, pointer)) => Ok(PileAndPointer { + pile_path: Some(PathBuf::from(pile_path)), + pointer: PointerName::from_str(pointer)?, + }), + } + } +} + +#[derive(Clone, Debug)] +pub struct PileAndPointerWithSubTree { + pub pile_path: Option<PathBuf>, + pub pointer: PointerName, + // TODO how to represent... + pub sub_tree: String, +} + +impl FromStr for PileAndPointerWithSubTree { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + let (pile_path, pointer_and_subtree) = match s.split_once(":") { + None => (None, s), + Some((pile_path, pointer)) => (Some(PathBuf::from(pile_path)), pointer), + }; + + if let Some(slash) = pointer_and_subtree.find('/') { + Ok(PileAndPointerWithSubTree { + pile_path, + pointer: PointerName::from_str(&pointer_and_subtree[0..slash])?, + sub_tree: pointer_and_subtree[slash + 1..].to_owned(), + }) + } else { + Ok(PileAndPointerWithSubTree { + pile_path, + pointer: PointerName::from_str(&pointer_and_subtree)?, + sub_tree: String::new(), + }) + } + } +} + +#[derive(Parser, Clone, Debug)] +pub struct DatmanArgs { + #[arg(long, env = "DATMAN_CONFIG", default_value = "datman.toml")] + config: PathBuf, + + #[command(subcommand)] + command: DatmanCommand, +} + +#[derive(Subcommand, Clone, Debug)] +pub enum DatmanCommand { + BackupOne { + source_name: String, + pile_name: String, + }, + + BackupAll { + pile_name: String, + }, + + ExtractOne { + pile_name: String, + source_name: String, + destination: PathBuf, + }, + + ExtractAll { + pile_name: String, + destination: PathBuf, + }, +} + +const PROGRESS_SPANS: &'static [&'static str] = &[ + "store_file", + "storing", + "unpack_files", + "expand_chunkrefs", + "extract_files", +]; + +#[tokio::main] +pub async fn main() -> eyre::Result<()> { + let indicatif_layer = IndicatifLayer::new(); + let stderr_writer = indicatif_layer.get_stderr_writer(); + let indicatif_layer = indicatif_layer.with_filter(filter_fn(|span_metadata| { + span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name()) + })); + + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "sqlx=warn,yama=debug,datman=debug,info".into()), + ) + .with(tracing_subscriber::fmt::layer().with_writer(stderr_writer)) + .with(indicatif_layer) + .init(); + + let args: DatmanArgs = dbg!(DatmanArgs::parse()); + + let descriptor = load_descriptor(&args.config) + .await + .context("failed to load Datman descriptor")?; + dbg!(&descriptor); + + match args.command { + DatmanCommand::BackupOne { + source_name, + pile_name, + } => { + let pile_connector_path = descriptor + .piles + .get(&pile_name) + .cloned() + .context("no pile by that name")?; + let lock_name = format!("{} datman backup {:?}", get_hostname(), source_name); + let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?; + + let source = descriptor + .sources + .get(&source_name) + .context("no source by that name")?; + let my_hostname = get_hostname(); + if &source.host != &my_hostname { + bail!( + "Current hostname is {:?}, not {:?} as expected for this source.", + my_hostname, + source.host + ); + } + + let mut sources_to_backup = BTreeMap::new(); + sources_to_backup.insert(source_name.clone(), source.clone()); + + backup(pwc, sources_to_backup).await?; + } + DatmanCommand::BackupAll { pile_name } => { + let pile_connector_path = descriptor + .piles + .get(&pile_name) + .cloned() + .context("no pile by that name")?; + let lock_name = format!("{} datman backupall", get_hostname()); + let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?; + + let my_hostname = get_hostname(); + let sources_to_backup: BTreeMap<String, SourceDescriptor> = descriptor + .sources + .clone() + .into_iter() + .filter(|(_, source)| &source.host == &my_hostname) + .collect(); + + if sources_to_backup.len() == 0 { + bail!( + "No sources to back up! The current hostname is {:?}; is it correct?", + my_hostname + ); + } + + info!( + "Backing up the following {} sources: {:?}", + sources_to_backup.len(), + sources_to_backup.keys().collect::<Vec<_>>() + ); + + backup(pwc, sources_to_backup).await?; + } + DatmanCommand::ExtractOne { + pile_name, + source_name, + destination, + } => { + let pile_connector_path = descriptor + .piles + .get(&pile_name) + .cloned() + .context("no pile by that name")?; + let lock_name = format!("{} datman extract {:?}", get_hostname(), source_name); + let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?; + + let mut sources = BTreeSet::new(); + sources.insert(source_name.clone()); + let selected = select_to_extract(&pwc, sources, None, None, false).await?; + let mut for_extraction = load_pointers_for_extraction(pwc.clone(), selected).await?; + assert_eq!(for_extraction.len(), 1); + let root_node = for_extraction.remove(&source_name).unwrap(); + extract(pwc, root_node.node, &destination).await?; + } + DatmanCommand::ExtractAll { + pile_name, + destination, + } => { + let pile_connector_path = descriptor + .piles + .get(&pile_name) + .cloned() + .context("no pile by that name")?; + let lock_name = format!("{} datman extractall", get_hostname()); + let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?; + + let sources = descriptor.sources.keys().cloned().collect(); + let selected = select_to_extract(&pwc, sources, None, None, false).await?; + let for_extraction = load_pointers_for_extraction(pwc.clone(), selected).await?; + let merged_node = merge_roots_for_batch_extract(for_extraction); + extract(pwc, merged_node, &destination).await?; + } + } + Ok(()) } diff --git a/datman/src/datetime.rs b/datman/src/datetime.rs new file mode 100644 index 0000000..3291945 ---
/dev/null +++ b/datman/src/datetime.rs @@ -0,0 +1,26 @@ +use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone}; +use eyre::bail; +use std::str::FromStr; + +pub struct HumanDateTime(pub DateTime<Local>); + +impl FromStr for HumanDateTime { + type Err = eyre::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + if let Ok(date_only) = NaiveDate::parse_from_str(s, "%Y-%m-%d") { + let local_datetime = Local + .from_local_datetime(&date_only.and_hms_opt(0, 0, 0).unwrap()) + .unwrap(); + Ok(HumanDateTime(local_datetime)) + } else if let Ok(date_and_time) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { + let local_datetime = Local.from_local_datetime(&date_and_time).unwrap(); + Ok(HumanDateTime(local_datetime)) + } else if let Ok(date_and_time) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { + let local_datetime = Local.from_local_datetime(&date_and_time).unwrap(); + Ok(HumanDateTime(local_datetime)) + } else { + bail!("Couldn't parse using any format. Use one of: 2021-05-16 OR 2021-05-16T17:42:14 OR 2021-05-16 17:42:14"); + } + } +} diff --git a/datman/src/descriptor_config.rs b/datman/src/descriptor_config.rs new file mode 100644 index 0000000..2dba230 --- /dev/null +++ b/datman/src/descriptor_config.rs @@ -0,0 +1,126 @@ +/* +This file is part of Yama. + +Yama is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +Yama is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with Yama. If not, see <https://www.gnu.org/licenses/>. +*/ + +use eyre::{Context, ContextCompat}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +// TODO how do we handle?: +// - (important) yama push of one pile to another +// - backup policy stuff like 'minimum backup frequency' ... show when it's not been done +// - backup policy stuff like 'minimum on two different disks, not powered at the same time...' + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Descriptor { + /// Sources + pub sources: HashMap<String, SourceDescriptor>, + + /// Paths to destination Yama Piles. Remote Piles need a local virtual pile to specify the layers. + pub piles: HashMap<String, PathBuf>, + + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub retention: Option<RetentionPolicyConfig>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct RetentionPolicyConfig { + pub daily: u32, + pub weekly: u32, + pub monthly: u32, + pub yearly: u32, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct SourceDescriptor { + /// The host to run this backup task on. + pub host: String, + #[serde(flatten)] + pub inner: SourceDescriptorInner, +} + +impl SourceDescriptor { + pub fn is_directory_source(&self) -> bool { + matches!(&self.inner, &SourceDescriptorInner::DirectorySource { .. }) + } + + pub fn is_virtual_source(&self) -> bool { + matches!(&self.inner, &SourceDescriptorInner::VirtualSource { ..
}) + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum SourceDescriptorInner { + DirectorySource { + path: PathBuf, + #[serde(default)] + cross_filesystems: bool, + + /// TODO Paths to ignore + #[serde(default)] + ignore: Vec<PathBuf>, + }, + + VirtualSource(VirtualSource), +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct VirtualSource { + /// The name of the helper program that will be used to do this backup. + pub helper: String, + + /// The label that will be assigned to this source. + pub label: String, + + /// The kind of virtual source (how it operates). + pub kind: VirtualSourceKind, + + #[serde(flatten)] + pub extra_args: HashMap<String, toml::Value>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum VirtualSourceKind { + Stdout { + #[serde(rename = "stdout")] + filename: String, + }, + // TODO(feature) TempDir +} + +/// Loads a descriptor and resolves relative paths contained within. +pub async fn load_descriptor(path: &Path) -> eyre::Result<Descriptor> { + let text = tokio::fs::read_to_string(path).await?; + let mut descriptor: Descriptor = toml::de::from_str(&text)?; + + let dir = path + .parent() + .context("there must be a parent path for the descriptor file")?; + + // Absolutise pile paths + for (_, pile_path) in descriptor.piles.iter_mut() { + *pile_path = dir + .join(&*pile_path) + .canonicalize() + .context("Failed to canonicalise path in descriptor")?; + } + + Ok(descriptor) +} diff --git a/datman/src/extract.rs b/datman/src/extract.rs new file mode 100644 index 0000000..561283a --- /dev/null +++ b/datman/src/extract.rs @@ -0,0 +1,182 @@ +use crate::datetime::HumanDateTime; +use crate::pointer_names::split_pointer_name; +use chrono::{DateTime, Utc}; +use eyre::{bail, eyre, Context, ContextCompat}; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet}; +use std::path::Path; +use std::sync::Arc; +use tracing::{info_span, warn, Instrument}; +use yama::extract; +use yama::extract::flatten_treenode; +use yama::pile_with_cache::PileWithCache; +use yama_pile::tree::{FilesystemOwnership, FilesystemPermissions, RootTreeNode, TreeNode}; +use yama_wormfile::boxed::BoxedWormFileProvider; + +/// Given a list of source names and conditions to find pointers within, +/// returns a mapping of source names to pointers. +pub async fn select_to_extract( + pwc: &PileWithCache<BoxedWormFileProvider>, + sources: BTreeSet<String>, + before: Option<HumanDateTime>, + after: Option<HumanDateTime>, + accept_partial: bool, +) -> eyre::Result<BTreeMap<String, String>> { + let before = before.map(|dt| dt.0.with_timezone(&Utc)); + let after = after.map(|dt| dt.0.with_timezone(&Utc)); + let pointers_list = pwc + .pile + .list_pointers() + .await + .context("failed to list pointers")?; + + select_to_extract_impl(pointers_list, sources, before, after, accept_partial) +} + +/// Given a list of source names and conditions to find pointers within, +/// returns a mapping of source names to pointers. +fn select_to_extract_impl( + pointers_list: Vec<String>, + sources: BTreeSet<String>, + before: Option<DateTime<Utc>>, + after: Option<DateTime<Utc>>, + accept_partial: bool, +) -> eyre::Result<BTreeMap<String, String>> { + if after.is_some() && before.is_some() { + bail!("Can't specify both before and after!"); + } + + let mut pointers_by_source: BTreeMap<String, String> = BTreeMap::new(); + + for pointer in pointers_list { + if let Some((source_name, pointer_datetime)) = split_pointer_name(&pointer) { + if !sources.contains(&source_name) { + // Not a source that we're interested in.
+ continue; + } + if let Some(before) = before { + if before < pointer_datetime { + // datetime is after the 'before' time + continue; + } + } else if let Some(after) = after { + if pointer_datetime < after { + // datetime is before the 'after' time + continue; + } + } + + match pointers_by_source.entry(source_name) { + Entry::Vacant(ve) => { + ve.insert(pointer); + } + Entry::Occupied(mut oe) => { + let current_choice = oe.get_mut(); + let (_, current_datetime) = split_pointer_name(&current_choice).unwrap(); + let should_replace = if after.is_some() { + // if we want the first one after a time, we want the earliest option! + // so replace if new datetime is earlier than current + pointer_datetime < current_datetime + } else { + // replace if new datetime is after current datetime + current_datetime < pointer_datetime + }; + if should_replace { + *current_choice = pointer; + } + } + } + }; + } + + if pointers_by_source.is_empty() { + bail!("No pointers selected for ANY of the sources: {sources:?}"); + } + + let missing: Vec<&String> = sources + .iter() + .filter(|src| !pointers_by_source.contains_key(*src)) + .collect(); + if !missing.is_empty() { + if accept_partial { + warn!("Some sources didn't have any pointers selected: {missing:?}. Continuing because --accept-partial passed."); + } else { + bail!("Some sources didn't have any pointers selected: {missing:?}. Pass --accept-partial if this is intended anyway."); + } + } + + Ok(pointers_by_source) +} + +pub async fn load_pointers_for_extraction( + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + what_to_extract: BTreeMap<String, String>, +) -> eyre::Result<BTreeMap<String, RootTreeNode>> { + let mut result = BTreeMap::new(); + for (source_name, pointer_name) in &what_to_extract { + let pointer = pwc + .read_pointer_fully_integrated(&pointer_name) + .await? + .context("pointer doesn't exist??")?; + // TODO(ownership): adapt uid/gids here + result.insert(source_name.clone(), pointer.root); + } + Ok(result) +} + +pub fn merge_roots_for_batch_extract(extracts: BTreeMap<String, RootTreeNode>) -> TreeNode { + let mut children = BTreeMap::new(); + + for (name, entry) in extracts { + if matches!(entry.node, TreeNode::NormalFile { .. }) { + let mut children2 = BTreeMap::new(); + children2.insert(entry.name, entry.node); + children.insert( + name, + TreeNode::Directory { + ownership: FilesystemOwnership { + // TODO(ownership): populate this correctly (current user?) + uid: 0, + gid: 0, + }, + permissions: FilesystemPermissions { mode: 0o700 }, + children: children2, + }, + ); + } else { + children.insert(name, entry.node); + } + } + + TreeNode::Directory { + ownership: FilesystemOwnership { + // TODO(ownership): populate this correctly (current user?) + uid: 0, + gid: 0, + }, + permissions: FilesystemPermissions { mode: 0o700 }, + children, + } +} + +pub async fn extract( + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + node: TreeNode, + destination: &Path, +) -> eyre::Result<()> { + let flat = flatten_treenode(&node)?; + drop(node); + + extract::unpack_nonfiles(destination, &flat.nonfiles, false, true).await?; + + let extract_span = info_span!("extract_files"); + extract::unpack_files(&pwc, destination, &flat.files, false, true) + .instrument(extract_span) + .await?; + + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
+ .close() + .await?; + Ok(()) +} diff --git a/datman/src/lib.rs b/datman/src/lib.rs index 8b13789..c835c9a 100644 --- a/datman/src/lib.rs +++ b/datman/src/lib.rs @@ -1 +1,6 @@ +pub mod backup; +pub mod descriptor_config; +pub mod extract; +pub mod datetime; +pub mod pointer_names; diff --git a/datman/src/pointer_names.rs b/datman/src/pointer_names.rs new file mode 100644 index 0000000..1721ebe --- /dev/null +++ b/datman/src/pointer_names.rs @@ -0,0 +1,20 @@ +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; + +pub const POINTER_DATETIME_FORMAT: &'static str = "%F_%T"; +pub const POINTER_NAME_DATETIME_SPLITTER: &'static str = "+"; + +pub fn get_pointer_name_at(source_name: &str, datetime: DateTime<Utc>) -> String { + format!( + "{}{}{}", + source_name, + POINTER_NAME_DATETIME_SPLITTER, + datetime.format(POINTER_DATETIME_FORMAT).to_string() + ) +} + +pub fn split_pointer_name(pointer_name: &str) -> Option<(String, DateTime<Utc>)> { + let (source_name, date_time_str) = pointer_name.rsplit_once(POINTER_NAME_DATETIME_SPLITTER)?; + let date_time = NaiveDateTime::parse_from_str(date_time_str, POINTER_DATETIME_FORMAT).ok()?; + let date_time = Utc.from_utc_datetime(&date_time); + Some((source_name.to_owned(), date_time)) +} diff --git a/yama/Cargo.toml b/yama/Cargo.toml index d546e24..55d1e61 100644 --- a/yama/Cargo.toml +++ b/yama/Cargo.toml @@ -30,7 +30,7 @@ yama_midlevel_crypto = { path = "../yama_midlevel_crypto" } clap = { version = "4.2.2", features = ["derive"] } -tokio = { version = "1.27.0", features = ["io-std"] } +tokio = { version = "1.28.1", features = ["full"] } appdirs = "0.2.0" twox-hash = "1.6.3" hostname = "0.3.1" diff --git a/yama/src/bin/yama.rs b/yama/src/bin/yama.rs index 031f499..a6d6b02 100644 --- a/yama/src/bin/yama.rs +++ b/yama/src/bin/yama.rs @@ -17,6 +17,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use clap::{Parser, Subcommand}; use eyre::{bail, eyre, Context, ContextCompat}; +use indicatif::ProgressStyle; use patricia_tree::PatriciaMap; use std::borrow::Cow; use std::iter::Iterator; @@ -24,32 +25,41 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use indicatif::ProgressStyle; use tokio::io::{stdin, AsyncBufReadExt, BufReader}; -use tracing::{info, info_span, warn, Span, Instrument}; -use tracing_indicatif::IndicatifLayer; +use tracing::{info, info_span, warn, Instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; +use tracing_indicatif::IndicatifLayer; use tracing_subscriber::filter::filter_fn; -use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Layer; use users::{get_current_gid, get_current_uid}; use yama::extract::flatten_treenode; use yama::init::{generate_master_keyring, pack_keyring}; use yama::open::{open_keyring_interactive, open_pile, pre_open_keyring, update_cache}; use yama::pile_connector::PileConnectionScheme; use yama::scan::create_uidgid_lookup_tables; -use yama::storing::{assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState}; -use yama::{extract, get_hostname, init, PROGRESS_BAR_STYLE, scan}; +use yama::storing::{ + assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState, +}; +use yama::vacuum::forget_chunks::{find_forgettable_chunks, forget_chunks}; +use yama::vacuum::merge_indices::{MERGE_TARGET_SIZE, MERGE_THRESHOLD_SIZE}; +use yama::vacuum::repack_bloblogs_and_indices::{ + get_bloblogs_stats, perform_repack, select_bloblogs_for_repack, +}; +use yama::{check, extract, get_hostname, init, scan, vacuum, PROGRESS_BAR_STYLE}; use yama_midlevel_crypto::byte_layer::{ByteLayer, CborSerde}; use yama_midlevel_crypto::chunk_id::ChunkIdKey; use yama_pile::definitions::{ - PackedPileConfig, PileConfig, RecursiveChunkRef, SUPPORTED_YAMA_PILE_VERSION, + IndexBloblogEntry, PackedPileConfig, PileConfig, RecursiveChunkRef, SUPPORTED_YAMA_PILE_VERSION, }; use yama_pile::locks::LockKind; use yama_pile::pointers::Pointer; use yama_pile::tree::unpopulated::ScanEntry; -use yama_pile::tree::{assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership, FilesystemPermissions, RootTreeNode, TreeNode}; +use yama_pile::tree::{ + assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership, + FilesystemPermissions, RootTreeNode, TreeNode, +}; use yama_pile::FILE_YAMA_CONNECTOR; #[derive(Clone, Debug)] @@ -68,7 +78,7 @@ impl FromStr for PointerName { fn from_str(s: &str) -> Result<Self, Self::Err> { if !s .chars() - .all(|c| c.is_alphanumeric() || ['_', '+', '-'].contains(&c)) + .all(|c| c.is_alphanumeric() || ['_', '+', '-', ':'].contains(&c)) { bail!("Bad pointer name: {s:?}"); } @@ -182,9 +192,7 @@ pub enum YamaCommand { }, /// Extract an output stream from a Yama pile. - ExtractStdout { - source: PileAndPointerWithSubTree, - }, + ExtractStdout { source: PileAndPointerWithSubTree }, // TODO Mount { ... }, Check { @@ -198,10 +206,31 @@ pub enum YamaCommand { intensive: bool, }, // TODO lsp, rmp + /// Perform maintenance tasks, usually freeing up space or clumping together files to reduce + /// clutter. + Vacuum { + /// Perform all maintenance and space-saving tasks. + #[arg(long, short = 'a')] + all: bool, - - // TODO vacuum + /// Merge indices together. Implied by -a.
+ #[arg(long, short = 'm')] + merge: bool, - // TODO `locks` to inspect locks + /// Forget chunks from indices. Implied by -a. + /// This process is slow because it involves walking all pointers to see which chunks can be + /// forgotten. + #[arg(long, short = 'f')] + forget: bool, + + /// Repack bloblogs and corresponding indices. Implied by -a. + #[arg(long, short = 'r')] + repack: bool, + + /// Delete unreferenced bloblogs. Implied by -a. + #[arg(long, short = 'd')] + delete_unrefd_bloblogs: bool, + }, // TODO `locks` to inspect locks } #[derive(Subcommand, Clone, Debug)] @@ -236,16 +265,21 @@ pub enum KeyringCommand { }, // TODO ChangePassword } -const PROGRESS_SPANS: &'static [&'static str] = &["store_file", "storing", "unpack_files", "expand_chunkrefs", "extract_files"]; +const PROGRESS_SPANS: &'static [&'static str] = &[ + "store_file", + "storing", + "unpack_files", + "expand_chunkrefs", + "extract_files", +]; #[tokio::main] async fn main() -> eyre::Result<()> { let indicatif_layer = IndicatifLayer::new(); let stderr_writer = indicatif_layer.get_stderr_writer(); - let indicatif_layer = indicatif_layer - .with_filter(filter_fn(|span_metadata| { - span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name()) - })); + let indicatif_layer = indicatif_layer.with_filter(filter_fn(|span_metadata| { + span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name()) + })); tracing_subscriber::registry() .with( @@ -307,9 +341,11 @@ async fn main() -> eyre::Result<()> { None } else { let zstd_dict_path = zstd_dict.unwrap(); - Some(Arc::new(tokio::fs::read(&zstd_dict_path) - .await - .with_context(|| format!("failed to read Zstd dict at {zstd_dict_path:?}"))?)) + Some(Arc::new( + tokio::fs::read(&zstd_dict_path).await.with_context(|| { + format!("failed to read Zstd dict at {zstd_dict_path:?}") + })?, + )) }; let pile_config = PileConfig { @@ -408,15 +444,23 @@ async fn main() -> eyre::Result<()> { let store_span = info_span!("storing"); // store_span.pb_set_style(&ProgressStyle::default_bar()); - store_span.pb_set_style(&ProgressStyle::default_bar().template( - PROGRESS_BAR_STYLE, - ).unwrap()); + store_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); store_span.pb_set_message("storing files"); - store_span.pb_set_length(pruned_scan_entry_map.values() - .filter(|v| matches!(v, ScanEntry::NormalFile { .. })).count() as u64); + store_span.pb_set_length( + pruned_scan_entry_map + .values() + .filter(|v| matches!(v, ScanEntry::NormalFile { .. })) + .count() as u64, + ); let store_span_entered = store_span.enter(); - let (pipeline, pipeline_job_tx) = StoragePipeline::launch_new(4, pwc.clone()).await?; + let new_unflushed_chunks = Arc::new(Default::default()); + let (pipeline, pipeline_job_tx) = + StoragePipeline::launch_new(4, pwc.clone(), new_unflushed_chunks).await?; let source2 = source.clone(); let (submitter_task, receiver_task) = tokio::join!( @@ -426,8 +470,13 @@ async fn main() -> eyre::Result<()> { if let ScanEntry::NormalFile { .. 
} = scan_entry { let name = std::str::from_utf8(name_bytes.as_slice()) .context("name is not str")?; + let path = if name != "" { + source2.join(name) + } else { + source2.clone() + }; pipeline_job_tx - .send_async((name.to_owned(), source2.join(name))) + .send_async((name.to_owned(), path)) .await .map_err(|_| eyre!("unable to send to pipeline."))?; } @@ -497,8 +546,11 @@ async fn main() -> eyre::Result<()> { .await .context("failed to write pointer")?; - Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?; - }, + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + } YamaCommand::StoreStdin { destination, overwrite, @@ -514,7 +566,7 @@ async fn main() -> eyre::Result<()> { LockKind::Shared, format!("{} store {:?}", get_hostname(), destination.pointer), ) - .await?; + .await?; update_cache(&pwc).await?; let pwc = Arc::new(pwc); @@ -522,30 +574,54 @@ async fn main() -> eyre::Result<()> { let store_span = info_span!("storing"); // store_span.pb_set_style(&ProgressStyle::default_bar()); // TODO INDETERMINATE PROGRESS BAR with bytes shown? - store_span.pb_set_style(&ProgressStyle::default_bar().template( - PROGRESS_BAR_STYLE, - ).unwrap()); + store_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); store_span.pb_set_message("storing files"); store_span.pb_set_length(1u64); // TODO Dirty - let store_span_entered = store_span.enter(); + let _store_span_entered = store_span.enter(); - - let mut storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?; + let new_unflushed_chunks = Arc::new(Default::default()); + let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks) + .await + .context("failed to create storing state")?; let mut sbw = StoringBloblogWriters::default(); - let stdin = std::io::BufReader::new(io_streams::StreamReader::stdin().context("failed to open stdin")?); - let (chunkref, size) = storing_state.store_full_stream(stdin, &mut sbw).context("Failed to store stream into Yama pile")?; + let stdin = std::io::BufReader::new( + io_streams::StreamReader::stdin().context("failed to open stdin")?, + ); + let (chunkref, size) = storing_state + .store_full_stream(stdin, &mut sbw) + .context("Failed to store stream into Yama pile")?; - sbw.finish_bloblogs(&mut storing_state).await.context("Failed to finish bloblogs")?; + sbw.finish_bloblogs(&mut storing_state) + .await + .context("Failed to finish bloblogs")?; info!("Stream stored, writing indices..."); // Write indices for the new bloblogs we have created. This is a prerequisite for creating a pointer. 
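+ // Each entry in new_bloblogs maps a freshly written bloblog to the chunks stored in it; below, each map is wrapped in an IndexBloblogEntry with forgotten_bytes = 0, since nothing has yet been vacuumed from a brand-new bloblog.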
let chunkmaps = storing_state.new_bloblogs; - assemble_and_write_indices(&pwc, chunkmaps) - .await - .context("failed to assemble and write indices")?; + assemble_and_write_indices( + &pwc, + chunkmaps + .into_iter() + .map(|(k, nb)| { + ( + k, + IndexBloblogEntry { + chunks: nb, + forgotten_bytes: 0, + }, + ) + }) + .collect(), + ) + .await + .context("failed to assemble and write indices")?; info!("All indices stored, writing pointer..."); @@ -553,7 +629,10 @@ async fn main() -> eyre::Result<()> { let uid = get_current_uid() as u16; let gid = get_current_gid() as u16; let tree = TreeNode::NormalFile { - mtime: SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_millis() as u64).unwrap_or(0), + mtime: SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0), ownership: FilesystemOwnership { uid, gid }, permissions: FilesystemPermissions { mode: 0o600 }, size, @@ -579,8 +658,11 @@ async fn main() -> eyre::Result<()> { .await .context("failed to write pointer")?; - Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?; - }, + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + } YamaCommand::Extract { source, destination, @@ -593,13 +675,15 @@ async fn main() -> eyre::Result<()> { let keyring = pre_open_keyring(&pile_connector_path).await?; let keyring = open_keyring_interactive(keyring).await?; - let pwc = Arc::new(open_pile( - &pile_connector_path, - keyring, - LockKind::Shared, - format!("{} store {:?}", get_hostname(), source.pointer), - ) - .await?); + let pwc = Arc::new( + open_pile( + &pile_connector_path, + keyring, + LockKind::Shared, + format!("{} store {:?}", get_hostname(), source.pointer), + ) + .await?, + ); update_cache(&pwc).await?; let pointer = pwc @@ -639,11 +723,12 @@ async fn main() -> eyre::Result<()> { .instrument(extract_span) .await?; - Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?; - }, - YamaCommand::ExtractStdout { - source, - } => { + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + } + YamaCommand::ExtractStdout { source } => { let pile_connector_path = source .pile_path .as_ref() @@ -652,13 +737,15 @@ async fn main() -> eyre::Result<()> { let keyring = pre_open_keyring(&pile_connector_path).await?; let keyring = open_keyring_interactive(keyring).await?; - let pwc = Arc::new(open_pile( - &pile_connector_path, - keyring, - LockKind::Shared, - format!("{} store {:?}", get_hostname(), source.pointer), - ) - .await?); + let pwc = Arc::new( + open_pile( + &pile_connector_path, + keyring, + LockKind::Shared, + format!("{} store {:?}", get_hostname(), source.pointer), + ) + .await?, + ); update_cache(&pwc).await?; let pointer = pwc @@ -690,9 +777,7 @@ async fn main() -> eyre::Result<()> { }; let chunkref = match node { - TreeNode::NormalFile { content, .. } => { - content - } + TreeNode::NormalFile { content, .. } => content, TreeNode::Directory { .. 
} => { bail!("Can't extract `Directory` to stdout!"); } @@ -705,13 +790,126 @@ async fn main() -> eyre::Result<()> { }; let extract_span = info_span!("extract_files"); - let stream = std::io::BufWriter::new(io_streams::StreamWriter::stdout().context("failed to open stdout")?); + let stream = std::io::BufWriter::new( + io_streams::StreamWriter::stdout().context("failed to open stdout")?, + ); extract::unpack_sync_stream(&pwc, *chunkref, stream) .instrument(extract_span) .await?; - Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?; - }, + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + } + YamaCommand::Check { + pointers, + shallow, + intensive, + } => { + if !pointers && !shallow && !intensive { + bail!("Check level not chosen. Try -2"); + } + if pointers { + bail!("pointers check not implemented yet. Try -2"); + } + if intensive { + bail!("intensive check not implemented yet. Try -2"); + } + + let pile_connector_path = Path::new("."); + let keyring = pre_open_keyring(&pile_connector_path).await?; + let keyring = open_keyring_interactive(keyring).await?; + + let pwc = Arc::new( + open_pile( + &pile_connector_path, + keyring, + LockKind::Shared, + format!("{} check", get_hostname()), + ) + .await?, + ); + update_cache(&pwc).await?; + + if shallow { + check::check_pointers_point_to_indexed_chunks(&pwc) + .await + .context("shallow check failed")?; + } + + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? + .close() + .await?; + } + YamaCommand::Vacuum { + all, + merge, + forget, + repack, + delete_unrefd_bloblogs, + } => { + let pile_connector_path = Path::new("."); + let keyring = pre_open_keyring(&pile_connector_path).await?; + let keyring = open_keyring_interactive(keyring).await?; + + // TODO figure out how we use the pendingexclusive thing again... + let pwc = Arc::new( + open_pile( + &pile_connector_path, + keyring, + LockKind::Exclusive, + format!("{} vacuum", get_hostname()), + ) + .await?, + ); + update_cache(&pwc).await?; + + if all || merge { + let to_merge = vacuum::merge_indices::select_indices_for_merge( + &pwc, + MERGE_TARGET_SIZE, + MERGE_THRESHOLD_SIZE, + ) + .await + .context("failed to select indices for merge")?; + vacuum::merge_indices::merge_indices(&pwc, to_merge) + .await + .context("failed to merge indices")?; + update_cache(&pwc).await?; + } + + if all || forget { + // TODO: allow running on smaller sets of indices than all of them + let all_indices = { + let mut cache_conn = pwc.localcache.read().await?; + cache_conn.list_indices().await? + }; + + let forgettable_chunks = find_forgettable_chunks(&pwc, all_indices.clone()).await?; + info!("{} chunks can be forgotten", forgettable_chunks.len()); + forget_chunks(&pwc, all_indices, forgettable_chunks).await?; + update_cache(&pwc).await?; + } + + if all || repack { + let bloblog_stats = get_bloblogs_stats(&pwc).await?; + let to_repack = select_bloblogs_for_repack(bloblog_stats).await?; + info!("{} repack groups to be processed.", to_repack.len()); + perform_repack(pwc.clone(), to_repack).await?; + update_cache(&pwc).await?; + } + + if all || delete_unrefd_bloblogs { + todo!(); + } + + Arc::try_unwrap(pwc) + .map_err(|_| eyre!("pwc still in use; can't close down gracefully"))? 
+ .close() + .await?; + } _other => todo!(), } diff --git a/yama/src/bin/yamascan.rs b/yama/src/bin/yamascan.rs index 6b88a87..bf58473 100644 --- a/yama/src/bin/yamascan.rs +++ b/yama/src/bin/yamascan.rs @@ -99,16 +99,15 @@ async fn main() -> eyre::Result<()> { &root_display_node, false, ) - }, - YamaScanCommand::Ignore { - path, unanchored - } => { + } + YamaScanCommand::Ignore { path, unanchored } => { let mut oo = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(false) - .open(".yamaignore").await + .open(".yamaignore") + .await .context("failed to open .yamaignore for r/w")?; let pos = oo.seek(SeekFrom::End(0)).await?; if pos > 1 { @@ -127,7 +126,7 @@ async fn main() -> eyre::Result<()> { oo.flush().await?; drop(oo); - }, + } _other => todo!(), } diff --git a/yama/src/check.rs b/yama/src/check.rs new file mode 100644 index 0000000..7b08197 --- /dev/null +++ b/yama/src/check.rs @@ -0,0 +1,64 @@ +use crate::extract::expand_chunkrefs; +use crate::pile_with_cache::PileWithCache; +use eyre::{bail, ContextCompat}; +use std::collections::BTreeSet; +use std::sync::Arc; +use tracing::info; +use yama_midlevel_crypto::chunk_id::ChunkId; +use yama_pile::tree::TreeNode; +use yama_wormfile::boxed::BoxedWormFileProvider; + +/// Check that all pointers point to chunks that exist **in our local cache**. +pub async fn check_pointers_point_to_indexed_chunks( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, +) -> eyre::Result<()> { + let pointer_names = pwc.pile.list_pointers().await?; + let mut rcrs_to_check = BTreeSet::new(); + for pointer_name in &pointer_names { + let pointer = pwc + .pile + .read_pointer(pointer_name) + .await? + .context("pointer vanished")?; + if let Some(parent_name) = pointer.parent { + if !pointer_names.contains(&parent_name) { + bail!("{parent_name:?}, the parent of {pointer_name:?}, does not exist"); + } + } + + pointer + .root + .node + .visit( + &mut |node, _| { + if let TreeNode::NormalFile { content, .. } = node { + rcrs_to_check.insert(*content); + } + Ok(()) + }, + String::new(), + ) + .unwrap(); + } + + let chunk_ids: BTreeSet<ChunkId> = + expand_chunkrefs(pwc, rcrs_to_check.into_iter().map(|x| ((), x))) + .await? + .into_iter() + .map(|(_, x)| x) + .flatten() + .collect(); + + info!("{} chunks to check for existence", chunk_ids.len()); + + let mut cache = pwc.localcache.read().await?; + + let resolved_chunks = cache.locate_chunks(&chunk_ids).await?; + + if chunk_ids.len() != resolved_chunks.len() { + bail!("Not all chunk IDs could be resolved.
TODO: this check error is currently not granular enough."); + } + info!("All {} chunks accounted for!", resolved_chunks.len()); + + Ok(()) +} diff --git a/yama/src/extract.rs b/yama/src/extract.rs index cf5275d..61a2e8a 100644 --- a/yama/src/extract.rs +++ b/yama/src/extract.rs @@ -1,17 +1,18 @@ use crate::pile_with_cache::PileWithCache; use crate::retriever::decompressor::PipelineDecompressor; use crate::retriever::{create_fixed_retriever, FileId, JobChunkReq, JobId, RetrieverResp}; -use eyre::{bail, ensure, Context, ContextCompat, eyre}; +use crate::PROGRESS_BAR_STYLE; +use eyre::{bail, ensure, eyre, Context, ContextCompat}; use flume::Receiver; +use indicatif::ProgressStyle; use patricia_tree::PatriciaMap; use std::cmp::Reverse; use std::collections::{BTreeMap, BTreeSet}; use std::fs::Permissions; use std::io::Write; use std::os::unix::fs::PermissionsExt; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use indicatif::ProgressStyle; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; @@ -22,7 +23,6 @@ use yama_pile::definitions::{BloblogId, RecursiveChunkRef}; use yama_pile::tree::unpopulated::ScanEntry; use yama_pile::tree::{FilesystemPermissions, TreeNode}; use yama_wormfile::boxed::BoxedWormFileProvider; -use crate::PROGRESS_BAR_STYLE; #[derive(Clone, Debug, Default)] pub struct FlattenedTree { @@ -93,7 +93,7 @@ pub fn flatten_treenode(root_node: &TreeNode) -> eyre::Result<FlattenedTree> { /// Create directories and symbolic links. pub async fn unpack_nonfiles( - root: &PathBuf, + root: &Path, nonfiles: &PatriciaMap<ScanEntry>, restore_ownership: bool, restore_permissions: bool, @@ -133,7 +133,7 @@ pub async fn unpack_nonfiles( // TODO(perf): move out file writes into separate tasks... pub async fn unpack_files( pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, - root: &PathBuf, + root: &Path, files: &PatriciaMap<(ScanEntry, RecursiveChunkRef)>, restore_ownership: bool, restore_permissions: bool, @@ -149,7 +149,10 @@ pub async fn unpack_files( ) .await?; - let total_chunks = expanded_chunkrefs.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>(); + let total_chunks = expanded_chunkrefs + .iter() + .map(|(_, cs)| cs.len() as u64) + .sum::<u64>(); let unpack_span = info_span!("unpack_files"); async move { @@ -236,24 +239,26 @@ pub async fn unpack_files( }.instrument(unpack_span).await } -pub async fn unpack_sync_stream(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, - chunkref: RecursiveChunkRef, - mut stream: impl Write, +pub async fn unpack_sync_stream( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, + chunkref: RecursiveChunkRef, + mut stream: impl Write, ) -> eyre::Result<()> { - let expanded_chunkrefs = expand_chunkrefs( - pwc, - vec![((), chunkref)].into_iter(), - ) - .await?; + let expanded_chunkrefs = expand_chunkrefs(pwc, vec![((), chunkref)].into_iter()).await?; - let total_chunks = expanded_chunkrefs.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>(); + let total_chunks = expanded_chunkrefs + .iter() + .map(|(_, cs)| cs.len() as u64) + .sum::<u64>(); let unpack_span = info_span!("unpack_files"); async move { let unpack_span = Span::current(); - unpack_span.pb_set_style(&ProgressStyle::default_bar().template( - PROGRESS_BAR_STYLE, - ).unwrap()); + unpack_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); + unpack_span.pb_set_message("unpack"); unpack_span.pb_set_length(total_chunks); @@ -264,16 +269,14 @@ while let Ok(next_part) = file_part_retriever.recv_async().await { match next_part { RetrieverResp::Blob { blob, ..
} => { - tokio::task::block_in_place(|| { - stream.write_all(&blob) - }).context("Failed to write to output stream on Blob")?; + tokio::task::block_in_place(|| stream.write_all(&blob)) + .context("Failed to write to output stream on Blob")?; unpack_span.pb_inc(1); } RetrieverResp::JobComplete(_) => { - tokio::task::block_in_place(|| { - stream.flush() - }).context("Failed to flush output stream on JobComplete")?; + tokio::task::block_in_place(|| stream.flush()) + .context("Failed to flush output stream on JobComplete")?; done = true; } } @@ -284,10 +287,17 @@ } Ok(()) - }.instrument(unpack_span).await + } + .instrument(unpack_span) + .await } -async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions, restore_permissions: bool, rx: Receiver<Option<Vec<u8>>>) -> eyre::Result<()> { +async fn file_unpacker_writer( + path: PathBuf, + permissions: FilesystemPermissions, + restore_permissions: bool, + rx: Receiver<Option<Vec<u8>>>, +) -> eyre::Result<()> { let mut oo = OpenOptions::new(); oo.write(true).create_new(true); if restore_permissions { @@ -301,15 +311,12 @@ async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions, loop { match rx.recv_async().await { Ok(Some(next_block)) => { - file.write_all(&next_block) - .await?; - }, + file.write_all(&next_block).await?; + } Ok(None) => { - file.flush() - .await - .context("failed to flush")?; + file.flush().await.context("failed to flush")?; return Ok(()); - }, + } Err(_) => { bail!("rx for file unpacking into {path:?} disconnected unexpectedly"); } @@ -317,7 +324,7 @@ } } -async fn expand_chunkrefs<T>( +pub(crate) async fn expand_chunkrefs<T>( pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, chunkrefs: impl Iterator<Item = (T, RecursiveChunkRef)>, ) -> eyre::Result<Vec<(T, Vec<ChunkId>)>> { @@ -337,13 +344,21 @@ async fn expand_chunkrefs<T>( } let ec_span = info_span!("expand_chunkrefs"); - ec_span.pb_set_style(&ProgressStyle::default_bar().template( - PROGRESS_BAR_STYLE, - ).unwrap()); - ec_span.pb_set_length(ts_and_chunks.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>()); + ec_span.pb_set_style( + &ProgressStyle::default_bar() + .template(PROGRESS_BAR_STYLE) + .unwrap(), + ); + ec_span.pb_set_length( + ts_and_chunks + .iter() + .map(|(_, cs)| cs.len() as u64) + .sum::<u64>(), + ); ec_span.pb_set_message(&format!("resolve (d={next_depth})")); let expanded_ts_and_chunks = expand_chunkrefs_one_layer(pwc, ts_and_chunks) - .instrument(ec_span).await?; + .instrument(ec_span) + .await?; by_depth .entry(Reverse(next_depth - 1)) .or_default() @@ -413,7 +428,7 @@ async fn lookup_chunkrefs_and_create_retriever<T>( Ok((retriever, out_by_job)) } -async fn expand_chunkrefs_one_layer<T>( +pub(crate) async fn expand_chunkrefs_one_layer<T>( pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, input: Vec<(T, Vec<ChunkId>)>, ) -> eyre::Result<Vec<(T, Vec<ChunkId>)>> { diff --git a/yama/src/lib.rs b/yama/src/lib.rs index d391b1c..fbb9589 100644 --- a/yama/src/lib.rs +++ b/yama/src/lib.rs @@ -1,6 +1,7 @@ pub mod init; pub mod open; +pub mod check; pub mod extract; pub mod scan; pub mod storing; @@ -11,7 +12,8 @@ pub mod pile_with_cache; pub mod retriever; -pub const PROGRESS_BAR_STYLE: &'static str = "[{elapsed_precise}]/[{eta}] {wide_bar:.cyan/blue} {pos:>7}/{len:7} {msg}"; +pub const PROGRESS_BAR_STYLE: &'static str = + "[{elapsed_precise}]/[{eta}] {wide_bar:.cyan/blue} {pos:>7}/{len:7} {msg}"; pub fn get_hostname() -> String { hostname::get() diff --git a/yama/src/open.rs b/yama/src/open.rs index 3ccc6aa..ca7d86f 100644 --- a/yama/src/open.rs +++ b/yama/src/open.rs @@ -4,7 +4,7 @@ use eyre::{bail, Context,
ContextCompat}; use std::borrow::Cow; use std::collections::BTreeSet; use std::hash::{Hash, Hasher}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tracing::debug; @@ -98,7 +98,10 @@ pub async fn open_pile( connection_scheme.hash(&mut hasher); let u64_hash = hasher.finish(); - let base_name = connector_in_dir + let canon_connector_in_dir = connector_in_dir + .canonicalize() + .unwrap_or(connector_in_dir.to_owned()); + let base_name = canon_connector_in_dir .file_name() .map(|f| f.to_string_lossy()) .unwrap_or(Cow::Borrowed("_")); @@ -165,3 +168,16 @@ pub async fn update_cache(pwc: &PileWithCache) -> eyre::R Ok(()) } + +pub async fn open_lock_and_update_cache( + pile_connector_path: PathBuf, + lock_name: String, +) -> eyre::Result>> { + let keyring = pre_open_keyring(&pile_connector_path).await?; + let keyring = open_keyring_interactive(keyring).await?; + + let pwc = open_pile(&pile_connector_path, keyring, LockKind::Shared, lock_name).await?; + update_cache(&pwc).await?; + + Ok(Arc::new(pwc)) +} diff --git a/yama/src/retriever.rs b/yama/src/retriever.rs index 2f7cfa2..47bbe30 100644 --- a/yama/src/retriever.rs +++ b/yama/src/retriever.rs @@ -48,6 +48,7 @@ struct FileRegionMarker { pub subjob: u32, } +#[derive(Debug)] struct OpenFileState { pub req_tx: Sender, pub offset: u64, @@ -61,16 +62,13 @@ struct OpenFileReq { pub subjob: u32, } +#[derive(Debug)] struct ActiveJobState { pub subjobs: Vec, pub next_subjob: u32, pub inflight: u32, } -pub struct Retriever { - job_tx: Sender<(JobId, Vec)>, -} - struct RetrieverInternals { pwc: Arc>, jobs_queue: BTreeMap>, @@ -141,6 +139,7 @@ impl RetrieverInternals { offset: u64, length: u64, ) -> eyre::Result<()> { + // debug!("sched {job:?}->{subjob:?}"); open_file .req_tx .send_async(OpenFileReq { @@ -205,7 +204,8 @@ impl RetrieverInternals { }) .await .expect("completions shut"); - // eprintln!("completion of{next_job:?}"); + + // debug!("read,acking! {:?}", next_job); ack_tx.send_async(next_job.job).await?; } @@ -213,7 +213,12 @@ impl RetrieverInternals { } async fn retrieval_task(&mut self) -> eyre::Result<()> { + // let mut icount = 0u64; loop { + // icount += 1; + // debug!("[{icount}] active jobs {:#?}", self.active_jobs); + // debug!("[{icount}] open files {:#?}", self.open_files); + // 0. Try to progress open jobs if they are staring right at the bytes they need... let mut to_remove = Vec::new(); for (active_job_id, active_job) in &mut self.active_jobs { @@ -226,9 +231,17 @@ impl RetrieverInternals { to_remove.push(*active_job_id); continue; } + // Which file we are 'staring at' and requesting a run of chunks from + let mut stare_file = None; 'single_job_staring: loop { let desired_blob = &active_job.subjobs[active_job.next_subjob as usize]; + if stare_file.is_some() && stare_file != Some(desired_blob.file) { + // We have changed which file we are looking at, we can't request any further + // because they might get retrieved out of order. + break 'single_job_staring; + } if let Some(open_file) = self.open_files.get_mut(&desired_blob.file) { + stare_file = Some(desired_blob.file); if open_file.offset == desired_blob.offset { Self::file_request( open_file, @@ -255,12 +268,15 @@ impl RetrieverInternals { // this job is to be finished! 
break 'single_job_staring; } + } else { + break 'single_job_staring; } } else { break 'single_job_staring; } } } + for remove in to_remove { self.active_jobs.remove(&remove); // eprintln!("job complete {remove:?}"); @@ -354,6 +370,7 @@ impl RetrieverInternals { files_to_open.insert(desired_blob.file); } } + if !files_to_open.is_empty() { for file in files_to_open { self.open_file(file).await?; diff --git a/yama/src/retriever/decompressor.rs b/yama/src/retriever/decompressor.rs index 5bdbc5b..0720888 100644 --- a/yama/src/retriever/decompressor.rs +++ b/yama/src/retriever/decompressor.rs @@ -175,10 +175,10 @@ impl PipelineDecompressor { ); } - let state = self - .processing - .get_mut(&job) - .context("bad job/not starting at 0 for job")?; + // debug!("blob {job:?} {subjob:?}"); + let state = self.processing.get_mut(&job).with_context(|| { + format!("bad job/not starting at 0 for job {job:?} (subjob={subjob:?})") + })?; ensure!( state.next_enqueue_subjob == subjob, "out of order Blob commands" @@ -196,6 +196,8 @@ impl PipelineDecompressor { .context("bad job to complete")?; state.complete = true; + // debug!("complete {job:?}"); + let can_remove = state.next_submit_subjob == state.next_enqueue_subjob; if can_remove { diff --git a/yama/src/scan.rs b/yama/src/scan.rs index 2f6631a..dba486f 100644 --- a/yama/src/scan.rs +++ b/yama/src/scan.rs @@ -1,4 +1,4 @@ -use eyre::{bail, eyre, Context}; +use eyre::{bail, eyre, Context, ContextCompat}; use ignore::WalkBuilder; use patricia_tree::PatriciaMap; use std::collections::{BTreeMap, BTreeSet}; @@ -95,6 +95,19 @@ pub fn relative_path(base: &Path, leaf: &Path) -> Option { /// Aborts if any errors (permission, bad .yamaignore files, etc) are encountered. /// In the future, we possibly want to consider allowing pub fn scan(root: &Path, ignores: &Vec) -> eyre::Result> { + let mut entries: PatriciaMap = PatriciaMap::new(); + + if !root.is_dir() { + let metadata = std::fs::symlink_metadata(root).context("reading metadata of root")?; + entries.insert( + "", + scan_one_no_recurse(root, metadata) + .context("failed to generate scan entry for root")? + .context("root probably doesn't exist, or is ignored?")?, + ); + return Ok(entries); + } + let mut walker = WalkBuilder::new(root); walker .standard_filters(false) @@ -108,8 +121,6 @@ pub fn scan(root: &Path, ignores: &Vec) -> eyre::Result = PatriciaMap::new(); - for entry in walker { let entry = entry?; diff --git a/yama/src/storing.rs b/yama/src/storing.rs index 0331d94..cd8c65c 100644 --- a/yama/src/storing.rs +++ b/yama/src/storing.rs @@ -5,6 +5,7 @@ use fastcdc::v2020::{FastCDC, StreamCDC}; use flume::{Receiver, RecvError, SendError, Sender}; use std::cmp::Reverse; use std::collections::{BTreeMap, BTreeSet}; +use std::fmt::Debug; use std::io::Read; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -47,7 +48,10 @@ pub struct StoringState { } impl StoringState { - pub async fn new(pwc: Arc>) -> eyre::Result { + pub async fn new( + pwc: Arc>, + new_unflushed_chunks: Arc>, + ) -> eyre::Result { let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() { None => { Compressor::new(get_zstd_level()).context("can't create dictless compressor")? 
@@ -59,7 +63,7 @@ impl StoringState { let chunk_id_key = pwc.pile.pile_config.chunk_id_key; Ok(StoringState { cache_conn: pwc.localcache.read().await?, - new_unflushed_chunks: Arc::new(Default::default()), + new_unflushed_chunks, new_bloblogs: vec![], pwc, chunk_id_key, @@ -130,6 +134,35 @@ impl StoringState { Ok(slot.as_mut().unwrap()) } + /// For internal use only. + fn process_chunk( + &mut self, + chunk_bytes: &[u8], + result: &mut Vec, + slot: &mut Option>>>, + ) -> eyre::Result<()> { + let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key); + result.push(chunk_id); + let is_new = Handle::current().block_on(async { + Ok::( + self.cache_conn.is_chunk_new(chunk_id).await? + && self.new_unflushed_chunks.insert(chunk_id), + ) + })?; + + if is_new { + let compressed_bytes = self.compressor.compress(&chunk_bytes)?; + + Handle::current().block_on(async { + let writer = self.obtain_bloblog_writer(slot).await?; + writer.write_chunk(chunk_id, &compressed_bytes).await?; + Ok::<(), eyre::Report>(()) + })?; + } + + Ok(()) + } + fn store_full_slice_returning_chunks( &mut self, store_slice: &[u8], @@ -137,26 +170,14 @@ impl StoringState { ) -> eyre::Result> { task::block_in_place(|| { let mut result = Vec::new(); + for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) { let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length]; - let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key); - result.push(chunk_id); - let is_new = Handle::current().block_on(async { - Ok::( - self.cache_conn.is_chunk_new(chunk_id).await? - && self.new_unflushed_chunks.insert(chunk_id), - ) - })?; + self.process_chunk(chunk_bytes, &mut result, slot)? + } - if is_new { - let compressed_bytes = self.compressor.compress(&chunk_bytes)?; - - Handle::current().block_on(async { - let writer = self.obtain_bloblog_writer(slot).await?; - writer.write_chunk(chunk_id, &compressed_bytes).await?; - Ok::<(), eyre::Report>(()) - })?; - } + if result.is_empty() { + self.process_chunk(&[], &mut result, slot)?; } Ok(result) @@ -175,24 +196,11 @@ impl StoringState { let chunk = chunk.context("failed to read in for StreamCDC")?; let chunk_bytes = chunk.data.as_slice(); stream_length += chunk_bytes.len() as u64; - let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key); - result.push(chunk_id); - let is_new = Handle::current().block_on(async { - Ok::( - self.cache_conn.is_chunk_new(chunk_id).await? 
- && self.new_unflushed_chunks.insert(chunk_id), - ) - })?; + self.process_chunk(chunk_bytes, &mut result, slot)?; + } - if is_new { - let compressed_bytes = self.compressor.compress(&chunk_bytes)?; - - Handle::current().block_on(async { - let writer = self.obtain_bloblog_writer(slot).await?; - writer.write_chunk(chunk_id, &compressed_bytes).await?; - Ok::<(), eyre::Report>(()) - })?; - } + if result.is_empty() { + self.process_chunk(&[], &mut result, slot)?; } Ok((result, stream_length)) @@ -257,10 +265,13 @@ impl StoringState { depth += 1; } - Ok((RecursiveChunkRef { - chunk_id: chunk_ids[0], - depth, - }, stream_length)) + Ok(( + RecursiveChunkRef { + chunk_id: chunk_ids[0], + depth, + }, + stream_length, + )) } } @@ -276,14 +287,14 @@ async fn store_file( Ok((chunkref, size_of_file as u64)) } -pub struct StoragePipeline { - result_rx: Receiver<(String, RecursiveChunkRef, u64)>, +pub struct StoragePipeline { + result_rx: Receiver<(JobName, RecursiveChunkRef, u64)>, join_set: JoinSet>, } -async fn storage_pipeline_worker( - job_rx: Receiver<(String, PathBuf)>, - result_tx: Sender<(String, RecursiveChunkRef, u64)>, +async fn storage_pipeline_worker( + job_rx: Receiver<(JobName, PathBuf)>, + result_tx: Sender<(JobName, RecursiveChunkRef, u64)>, mut storing_state: StoringState, ) -> eyre::Result { let mut bloblog_writers = StoringBloblogWriters::default(); @@ -292,22 +303,24 @@ async fn storage_pipeline_worker( while let Ok((job_id, file_path)) = job_rx.recv_async().await { let span = info_span!("store_file", file=?file_path); - let span_enter = span.enter(); - // debug!("SPW job {job_id:?}"); - let (rec_chunk_ref, file_length) = - store_file(&file_path, &mut storing_state, &mut bloblog_writers) - .await - .with_context(|| format!("failed to store {file_path:?}"))?; - // debug!("SPW good {job_id:?}"); - if let Err(SendError(to_be_sent)) = result_tx - .send_async((job_id, rec_chunk_ref, file_length)) - .await - { - bail!("Can't return result for {to_be_sent:?} — result_tx shut down."); - } - drop(span_enter); - drop(span); + async { + // debug!("SPW job {job_id:?}"); + let (rec_chunk_ref, file_length) = + store_file(&file_path, &mut storing_state, &mut bloblog_writers) + .await + .with_context(|| format!("failed to store {file_path:?}"))?; + // debug!("SPW good {job_id:?}"); + if let Err(SendError(to_be_sent)) = result_tx + .send_async((job_id, rec_chunk_ref, file_length)) + .await + { + bail!("Can't return result for {to_be_sent:?} — result_tx shut down."); + } + Ok(()) + } + .instrument(span) + .await? 
} debug!("SPW shutdown"); @@ -322,11 +335,12 @@ fn get_zstd_level() -> i32 { return 12; } -impl StoragePipeline { +impl StoragePipeline { pub async fn launch_new( workers: u32, pwc: Arc>, - ) -> eyre::Result<(StoragePipeline, Sender<(String, PathBuf)>)> { + new_unflushed_chunks: Arc>, + ) -> eyre::Result<(StoragePipeline, Sender<(JobName, PathBuf)>)> { let (job_tx, job_rx) = flume::bounded(16); let (result_tx, result_rx) = flume::bounded(4); @@ -334,7 +348,9 @@ impl StoragePipeline { for spw_num in 0..workers { let job_rx = job_rx.clone(); let result_tx = result_tx.clone(); - let storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?; + let storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks.clone()) + .await + .context("failed to create storing state")?; // make a logging span for the Storage Pipeline Workers let spw_span = info_span!("spw", n = spw_num); join_set.spawn( @@ -359,48 +375,48 @@ impl StoragePipeline { } #[inline] - pub async fn next_result(&self) -> Result<(String, RecursiveChunkRef, u64), RecvError> { + pub async fn next_result(&self) -> Result<(JobName, RecursiveChunkRef, u64), RecvError> { self.result_rx.recv_async().await } /// Must be sure that all results have been collected first. pub async fn finish_into_chunkmaps( mut self, - ) -> eyre::Result)>> { + ) -> eyre::Result> { if let Ok(msg) = self.result_rx.recv_async().await { bail!("Haven't processed all results yet! {msg:?}"); } - let mut chunkmap = Vec::new(); + let mut chunkmap = BTreeMap::new(); while let Some(join_resres) = self.join_set.join_next().await { - chunkmap.extend(join_resres??.new_bloblogs); + chunkmap.extend(join_resres??.new_bloblogs.into_iter().map(|(k, nb)| { + ( + k, + IndexBloblogEntry { + chunks: nb, + forgotten_bytes: 0, + }, + ) + })); } Ok(chunkmap) } } -fn assemble_indices(chunkmap: Vec<(BloblogId, BTreeMap)>) -> Vec { +fn assemble_indices(chunkmap: BTreeMap) -> Vec { let mut sorted_map = BTreeMap::new(); for (idx, chunkmap) in chunkmap.into_iter().enumerate() { - let size_of_chunkmap = chunkmap.1.len() + 1; + let size_of_chunkmap = chunkmap.1.chunks.len() + 1; sorted_map.insert(Reverse((size_of_chunkmap, idx)), chunkmap); } let mut indices = Vec::new(); - while let Some(k) = sorted_map.keys().cloned().next() { - let (Reverse((size, _)), (bloblog_id, bloblog_chunks)) = - sorted_map.remove_entry(&k).unwrap(); + while let Some((Reverse((size, _)), (bloblog_id, bloblog_chunks))) = sorted_map.pop_first() { let mut new_index_contents = BTreeMap::new(); - new_index_contents.insert( - bloblog_id, - IndexBloblogEntry { - chunks: bloblog_chunks, - forgotten_bytes: 0, - }, - ); + new_index_contents.insert(bloblog_id, bloblog_chunks); let mut new_index_size_so_far = size; while new_index_size_so_far < DESIRED_INDEX_SIZE_ENTRIES && !sorted_map.is_empty() { @@ -417,13 +433,9 @@ fn assemble_indices(chunkmap: Vec<(BloblogId, BTreeMap)>) let (Reverse((add_size, _)), (bloblog_id, bloblog_chunks)) = sorted_map.remove_entry(&k).unwrap(); new_index_size_so_far += add_size; - new_index_contents.insert( - bloblog_id, - IndexBloblogEntry { - chunks: bloblog_chunks, - forgotten_bytes: 0, - }, - ); + new_index_contents.insert(bloblog_id, bloblog_chunks); + } else { + break; } } @@ -458,7 +470,7 @@ async fn write_indices( pub async fn assemble_and_write_indices( pwc: &PileWithCache, - chunkmap: Vec<(BloblogId, BTreeMap)>, + chunkmap: BTreeMap, ) -> eyre::Result<()> { let indices = assemble_indices(chunkmap); write_indices(pwc, indices).await diff --git 
a/yama/src/vacuum.rs b/yama/src/vacuum.rs index 8b13789..8957c12 100644 --- a/yama/src/vacuum.rs +++ b/yama/src/vacuum.rs @@ -1 +1,4 @@ - +pub mod delete_unrefd_bloblogs; +pub mod forget_chunks; +pub mod merge_indices; +pub mod repack_bloblogs_and_indices; diff --git a/yama/src/vacuum/delete_unrefd_bloblogs.rs b/yama/src/vacuum/delete_unrefd_bloblogs.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/yama/src/vacuum/delete_unrefd_bloblogs.rs @@ -0,0 +1 @@ + diff --git a/yama/src/vacuum/forget_chunks.rs b/yama/src/vacuum/forget_chunks.rs new file mode 100644 index 0000000..b7ac4bc --- /dev/null +++ b/yama/src/vacuum/forget_chunks.rs @@ -0,0 +1,171 @@ +use crate::extract::expand_chunkrefs_one_layer; +use crate::pile_with_cache::PileWithCache; +use eyre::{bail, ensure, Context, ContextCompat}; +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; +use tracing::info; +use yama_midlevel_crypto::chunk_id::ChunkId; +use yama_pile::definitions::IndexId; +use yama_pile::tree::TreeNode; +use yama_wormfile::boxed::BoxedWormFileProvider; + +pub async fn find_forgettable_chunks( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, + indices: BTreeSet<IndexId>, +) -> eyre::Result<BTreeSet<ChunkId>> { + let mut unseen_chunk_ids = BTreeSet::new(); + + // Find all chunks in the given indices + { + let mut cache_conn = pwc.localcache.read().await?; + for index_id in &indices { + unseen_chunk_ids.extend(cache_conn.list_chunks_in_index(*index_id).await?); + } + }; + + let chunks_to_scan = prepare_chunkrefs_to_scan(pwc).await?; + scan_chunks(pwc, &mut unseen_chunk_ids, chunks_to_scan) + .await + .context("failed to do a sweep")?; + + Ok(unseen_chunk_ids) +} + +async fn prepare_chunkrefs_to_scan( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, +) -> eyre::Result<BTreeMap<u32, BTreeSet<ChunkId>>> { + let pointer_names = pwc + .pile + .list_pointers() + .await + .context("failed to list pointers")?; + let mut chunks_to_scan_by_depth: BTreeMap<u32, BTreeSet<ChunkId>> = BTreeMap::new(); + + for pointer_name in &pointer_names { + let pointer = pwc + .pile + .read_pointer(pointer_name) + .await? + .context("pointer vanished")?; + if let Some(parent_name) = pointer.parent { + if !pointer_names.contains(&parent_name) { + bail!("{parent_name:?}, the parent of {pointer_name:?}, does not exist"); + } + } + + pointer + .root + .node + .visit( + &mut |node, _| { + if let TreeNode::NormalFile { content, .. } = node { + chunks_to_scan_by_depth + .entry(content.depth) + .or_default() + .insert(content.chunk_id); + } + Ok(()) + }, + String::new(), + ) + .unwrap(); + } + + Ok(chunks_to_scan_by_depth) +} + +/// Scans the recursive chunkrefs that are passed in, ticking off chunks from the `unseen` set as +/// we go. +async fn scan_chunks( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, + unseen: &mut BTreeSet<ChunkId>, + chunks_to_scan_by_depth: BTreeMap<u32, BTreeSet<ChunkId>>, +) -> eyre::Result<()> { + let mut to_scan: Vec<(u32, Vec<ChunkId>)> = chunks_to_scan_by_depth + .into_iter() + .flat_map(|(depth, chunkset)| { + chunkset + .into_iter() + .map(move |chunk_id| (depth, vec![chunk_id])) + }) + .collect(); + + while !to_scan.is_empty() { + // Mark as seen. + for (_, chunk_ids) in &to_scan { + for chunk_id in chunk_ids { + unseen.remove(chunk_id); + } + } + + // Don't descend further into zero-depth elements. + to_scan = to_scan + .into_iter() + .filter(|(depth, _)| *depth > 0) + .collect(); + + // Decrement depth counters. + to_scan = expand_chunkrefs_one_layer(pwc, to_scan) + .await?
+ .into_iter() + .map(|(old_depth, chunkids)| (old_depth - 1, chunkids)) + .collect(); + } + + Ok(()) +} + +pub async fn forget_chunks( + pwc: &Arc>, + indices: BTreeSet, + forgettable: BTreeSet, +) -> eyre::Result<()> { + let mut indices_to_rewrite = Vec::new(); + // First do a cache-only check to see which indices need rewriting. + { + let mut cache_conn = pwc.localcache.read().await?; + for index_id in &indices { + let chunks_in_this_index = cache_conn.list_chunks_in_index(*index_id).await?; + if !chunks_in_this_index.is_disjoint(&forgettable) { + indices_to_rewrite.push(index_id); + } + } + } + + info!( + "{} indices to rewrite in order to forget chunks", + indices_to_rewrite.len() + ); + + // Go through each index and clean out whatever needs forgetting (then re-create the index and + // remove the old one). + for index_id in indices_to_rewrite { + let mut index = pwc.pile.read_index(*index_id).await?; + let mut changed = false; + for bloblog_entry in index.bloblogs.values_mut() { + let removable: Vec = bloblog_entry + .chunks + .keys() + .filter(|ci| forgettable.contains(ci)) + .cloned() + .collect(); + changed |= !removable.is_empty(); + for chunk_id in removable { + bloblog_entry.forgotten_bytes += + bloblog_entry.chunks.remove(&chunk_id).unwrap().length; + } + } + + ensure!(changed, "no change to index {index_id:?}"); + + index.supersedes.clear(); + index.supersedes.insert(*index_id); + + // TODO APPLY THE NEW INDEX DIRECTLY (how do we do that again?) + let new_index_id = pwc.pile.create_index(&index).await?; + ensure!(new_index_id != *index_id, "index ID bounce"); + pwc.pile.delete_index_dangerous_exclusive(*index_id).await?; + } + + Ok(()) +} diff --git a/yama/src/vacuum/merge_indices.rs b/yama/src/vacuum/merge_indices.rs new file mode 100644 index 0000000..a82c3e4 --- /dev/null +++ b/yama/src/vacuum/merge_indices.rs @@ -0,0 +1,127 @@ +use crate::pile_with_cache::PileWithCache; +use eyre::{bail, Context}; +use std::collections::btree_map::Entry; +use std::collections::BTreeSet; +use std::sync::Arc; +use tracing::{debug, warn}; +use yama_pile::definitions::{Index, IndexId}; +use yama_wormfile::boxed::BoxedWormFileProvider; + +pub const MERGE_THRESHOLD_SIZE: u32 = 2 * 1024 * 1024; +pub const MERGE_TARGET_SIZE: u32 = 16 * 1024 * 1024; + +/// Selects indices for merge. +/// +/// Criteria: +/// - size is less than the `threshold_size` +/// - (FUTURE; TODO) two indices that cover the same bloblog should be merged +pub async fn select_indices_for_merge( + pwc: &Arc>, + target_size: u32, + threshold_size: u32, +) -> eyre::Result>> { + let mut result = Vec::new(); + let mut mergeable_indices: BTreeSet<(u64, IndexId)> = pwc + .pile + .list_indices_with_meta() + .await? + .into_iter() + .filter(|(_, meta)| meta.file_size < threshold_size as u64) + .map(|(index_id, meta)| (meta.file_size, index_id)) + .collect(); + + while mergeable_indices.len() >= 2 { + let mut merge_set = BTreeSet::new(); + let mut merge_size = 0u64; + + let (first_size, first_index) = mergeable_indices.pop_first().unwrap(); + merge_size += first_size; + merge_set.insert(first_index); + + while let Some((size, index)) = mergeable_indices.first() { + if merge_size + *size < target_size as u64 { + merge_size += *size; + merge_set.insert(*index); + mergeable_indices.pop_first(); + } else { + break; + } + } + + if merge_set.len() > 1 { + result.push(merge_set); + } + } + + Ok(result) +} + +/// Merges some indices, deleting them in the process. +/// Requires exclusive lock. 
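For orientation, here is a hedged sketch of how the two merge steps defined above and below are meant to be driven together. This is an editor's illustration, not part of this patch; it assumes the caller already holds a suitable lock on `pwc` and that the `merge_indices` module items are in scope:

    // Illustrative driver for the index-merge vacuum step (names assumed).
    async fn vacuum_merge_indices(
        pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
    ) -> eyre::Result<()> {
        // Group small indices (each under MERGE_THRESHOLD_SIZE) into sets
        // that together stay under MERGE_TARGET_SIZE...
        let merge_sets =
            select_indices_for_merge(pwc, MERGE_TARGET_SIZE, MERGE_THRESHOLD_SIZE).await?;
        // ...then rewrite each set as a single index and delete the mergees.
        merge_indices(pwc, merge_sets).await?;
        Ok(())
    }
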
+/// (Note: in the future we could only supersede the indices, which only needs a shared lock. +/// However you need an exclusive lock to eventually delete superseded indices...). +pub async fn merge_indices( + pwc: &Arc>, + merge_sets: Vec>, +) -> eyre::Result<()> { + for merge_set in merge_sets { + let mut final_index = Index { + supersedes: merge_set.clone(), + bloblogs: Default::default(), + }; + + for index_id in &merge_set { + let index_being_subsumed = pwc.pile.read_index(*index_id).await?; + // TODO: do we need to worry about the 'supersedes' property on the index here? + // I think not, or at least not if the superseded indices don't exist, + // but worth thinking about in the future if we don't immediately delete + // superseded indices... + for (bloblog_id, bloblog_entry) in index_being_subsumed.bloblogs { + match final_index.bloblogs.entry(bloblog_id) { + Entry::Vacant(ve) => { + ve.insert(bloblog_entry); + } + Entry::Occupied(mut oe) => { + let new_entry = oe.get_mut(); + let (existing_chunks, new_chunks): (Vec<_>, Vec<_>) = bloblog_entry + .chunks + .into_iter() + .partition(|(chunk_id, _)| new_entry.chunks.contains_key(chunk_id)); + for (chunk_id, locator) in new_chunks { + // Subtract from the forgotten byte count, since this may be us re-remembering bytes out of safety... + new_entry.forgotten_bytes = + new_entry.forgotten_bytes.saturating_sub(locator.length); + let is_new = new_entry.chunks.insert(chunk_id, locator).is_none(); + assert!(is_new); + } + for (chunk_id, locator) in existing_chunks { + if &new_entry.chunks[&chunk_id] != &locator { + bail!("Attempted to merge indices that disagree about {bloblog_id:?}/{chunk_id:?}"); + } + } + } + } + } + } + + let merged_index_id = pwc + .pile + .create_index(&final_index) + .await + .context("failed to create merged index")?; + + if merge_set.contains(&merged_index_id) { + // I don't see how this could be possible, but let's avoid deleting the new index if it somehow is a merge of itself... + warn!("strange: created index ID is one of its own merges..."); + continue; + } + + debug!("merged indices {merge_set:?} into {merged_index_id:?}; deleting mergees"); + for index_to_delete in merge_set { + pwc.pile + .delete_index_dangerous_exclusive(index_to_delete) + .await?; + } + } + Ok(()) +} diff --git a/yama/src/vacuum/repack_bloblogs_and_indices.rs b/yama/src/vacuum/repack_bloblogs_and_indices.rs new file mode 100644 index 0000000..454dc6f --- /dev/null +++ b/yama/src/vacuum/repack_bloblogs_and_indices.rs @@ -0,0 +1,191 @@ +use crate::pile_with_cache::PileWithCache; +use crate::storing::assemble_and_write_indices; +use eyre::ContextCompat; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; +use yama_localcache::BloblogStats; +use yama_midlevel_crypto::chunk_id::ChunkId; +use yama_pile::definitions::{BloblogId, IndexBloblogEntry}; +use yama_wormfile::boxed::BoxedWormFileProvider; + +/// Repack bloblogs that have this much forgotten space in them. +pub const REPACK_BLOBLOGS_TO_RECLAIM_SPACE_BYTES: u64 = 32 * 1024 * 1024; + +/// Defines what a 'small bloblog' is (one that is below a certain size, excluding forgotten bytes). +pub const SMALL_BLOBLOG_THRESHOLD: u64 = 64 * 1024 * 1024; + +/// Clump together small bloblogs when together they would hit or exceed this size. +pub const REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES: u64 = 2 * 1024 * 1024 * 1024; + +/// The target size to reach when repacking, in terms of blob bytes. 
+pub const REPACK_TARGET_SIZE: u64 = 4 * 1024 * 1024; + +/// The limit size to use when repacking, in terms of blob bytes. +pub const REPACK_TARGET_LIMIT: u64 = 5 * 1024 * 1024; + +/// Gets bloblogs' stats. Only considers bloblogs referenced by exactly one index, so we don't +/// have to deal with unifying indices. +pub async fn get_bloblogs_stats( + pwc: &Arc<PileWithCache<BoxedWormFileProvider>>, +) -> eyre::Result<BTreeMap<BloblogId, BloblogStats>> { + let mut cache_conn = pwc.localcache.read().await?; + let indices = cache_conn.list_indices().await?; + let mut bloblogs: BTreeMap<BloblogId, Option<BloblogStats>> = BTreeMap::new(); + + for index in indices { + for (bloblog, stats) in cache_conn.index_bloblog_stats(index).await? { + match bloblogs.entry(bloblog) { + Entry::Vacant(ve) => { + ve.insert(Some(stats)); + } + Entry::Occupied(mut oe) => { + // only allow one stats per bloblog, then replace with None. + oe.insert(None); + } + } + } + } + + Ok(bloblogs + .into_iter() + .flat_map(|(k, v)| v.map(|v| (k, v))) + .collect()) +} + +/// Choose some bloblogs to repack. Assumes an updated local cache. +/// +/// Only bloblogs referenced by exactly one index will be considered for repacking. +pub async fn select_bloblogs_for_repack( + stats: BTreeMap<BloblogId, BloblogStats>, +) -> eyre::Result<Vec<BTreeMap<BloblogId, BloblogStats>>> { + let mut repack_for_space: BTreeSet<BloblogId> = stats + .iter() + .filter(|(_, v)| v.forgotten_bytes >= REPACK_BLOBLOGS_TO_RECLAIM_SPACE_BYTES) + .map(|(&k, _)| k) + .collect(); + let maybe_repack_for_clumping: BTreeSet<BloblogId> = stats + .iter() + .filter(|(_, v)| v.blob_size <= SMALL_BLOBLOG_THRESHOLD) + .map(|(&k, _)| k) + .collect(); + + let should_repack_for_clumping = maybe_repack_for_clumping.len() > 1 + && maybe_repack_for_clumping + .iter() + .map(|bi| stats[bi].blob_size) + .sum::<u64>() + > REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES; + + if should_repack_for_clumping { + repack_for_space.extend(maybe_repack_for_clumping); + } + + let to_repack = repack_for_space.clone(); + + let mut to_repack: BTreeSet<(u64, BloblogId)> = to_repack + .into_iter() + .map(|bi| (stats[&bi].blob_size, bi)) + .collect(); + + let mut repack_sets = Vec::new(); + + while !to_repack.is_empty() { + let mut new_repack_group = BTreeMap::new(); + let mut new_repack_group_size = 0u64; + + let (first_sz, first_to_repack) = to_repack.pop_last().unwrap(); + new_repack_group_size += first_sz; + new_repack_group.insert(first_to_repack, stats[&first_to_repack].clone()); + + while new_repack_group_size < REPACK_TARGET_SIZE { + let Some((first_size, _)) = to_repack.first() else { break; }; + if new_repack_group_size + *first_size > REPACK_TARGET_LIMIT { + break; + } + let (extra_size, extra_bloblog_id) = to_repack.pop_first().unwrap(); + new_repack_group_size += extra_size; + new_repack_group.insert(extra_bloblog_id, stats[&extra_bloblog_id].clone()); + } + + // now check the repack group is good + if new_repack_group + .keys() + .any(|bi| repack_for_space.contains(bi)) + || new_repack_group_size > REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES + { + repack_sets.push(new_repack_group); + } + } + + Ok(repack_sets) +} + +pub async fn perform_repack( + pwc: Arc<PileWithCache<BoxedWormFileProvider>>, + repack_sets: Vec<BTreeMap<BloblogId, BloblogStats>>, +) -> eyre::Result<()> { + // 1.
Write new bloblogs + let mut indices_buffer = BTreeMap::new(); + let mut index_parts: BTreeMap<BloblogId, IndexBloblogEntry> = BTreeMap::new(); + for repack_set in &repack_sets { + let mut new_bloblog = pwc.pile.create_bloblog().await?; + + for (old_bloblog_id, old_bloblog_stats) in repack_set { + let index_id = old_bloblog_stats.in_index; + if !indices_buffer.contains_key(&index_id) { + indices_buffer.insert(index_id, pwc.pile.read_index(index_id).await?); + } + let index_bloblog_entry = indices_buffer + .get_mut(&index_id) + .unwrap() + .bloblogs + .remove(old_bloblog_id) + .context("bug: no IBE despite rewrite from context of this index")?; + let mut old_bloblog = pwc.pile.read_bloblog(*old_bloblog_id).await?; + let locators: BTreeMap<u64, ChunkId> = index_bloblog_entry + .chunks + .into_iter() + .map(|(blob, locator)| (locator.offset, blob)) + .collect(); + for chunk_id in locators.into_values() { + let chunk = old_bloblog + .read_chunk(chunk_id) + .await? + .context("bug or corrupt bloblog: promised chunk missing")?; + new_bloblog.write_chunk(chunk_id, &chunk).await?; + } + } + + let (_wormpath, new_bloblog_id, new_bloblog_index_info) = new_bloblog.finish().await?; + index_parts.insert( + new_bloblog_id, + IndexBloblogEntry { + chunks: new_bloblog_index_info, + forgotten_bytes: 0, + }, + ); + } + + // 2. Write new indices, but make sure to also write out index entries for unaffected bloblogs + // that appear in the indices we want to replace shortly. + for (_, index) in indices_buffer.iter_mut() { + index_parts.extend(std::mem::take(&mut index.bloblogs)); + } + assemble_and_write_indices(&pwc, index_parts).await?; + + // 3. Delete old indices + for index_id in indices_buffer.into_keys() { + pwc.pile.delete_index_dangerous_exclusive(index_id).await?; + } + + // 4. Delete old bloblogs + for repack_group in repack_sets { + for bloblog_id in repack_group.into_keys() { + pwc.pile + .delete_bloblog_dangerous_exclusive(bloblog_id) + .await?; + } + } + + Ok(()) +} diff --git a/yama_localcache/migrations/20230413133342_local_index_cache.sql b/yama_localcache/migrations/20230413133342_local_index_cache.sql index 1fb4975..aa24e90 100644 --- a/yama_localcache/migrations/20230413133342_local_index_cache.sql +++ b/yama_localcache/migrations/20230413133342_local_index_cache.sql @@ -6,23 +6,32 @@ CREATE TABLE indices ( ); CREATE UNIQUE INDEX indices_index_sha256 ON indices(index_sha256); -CREATE TABLE blobs ( - chunk_id TEXT NOT NULL, - bloblog_short_id INTEGER NOT NULL REFERENCES bloblogs(bloblog_short_id), - index_short_id INTEGER NOT NULL REFERENCES indices(index_short_id), - offset INTEGER NOT NULL, - size INTEGER NOT NULL, - PRIMARY KEY (chunk_id, bloblog_short_id, index_short_id) -); -CREATE INDEX blobs_bloblog_short_id ON blobs(bloblog_short_id); -CREATE INDEX blobs_index_short_id ON blobs(index_short_id); - CREATE TABLE bloblogs ( bloblog_short_id INTEGER PRIMARY KEY NOT NULL, bloblog_sha256 TEXT NOT NULL ); CREATE UNIQUE INDEX bloblogs_bloblog_sha256 ON bloblogs(bloblog_sha256); +-- Track the relationship between indices and bloblogs +CREATE TABLE indices_bloblogs ( + index_short_id INTEGER NOT NULL REFERENCES indices(index_short_id), + bloblog_short_id INTEGER NOT NULL REFERENCES bloblogs(bloblog_short_id), + forgotten_bytes INTEGER NOT NULL, + PRIMARY KEY (index_short_id, bloblog_short_id) +); + +CREATE TABLE blobs ( + chunk_id TEXT NOT NULL, + bloblog_short_id INTEGER NOT NULL, + index_short_id INTEGER NOT NULL, + offset INTEGER NOT NULL, + size INTEGER NOT NULL, + PRIMARY KEY (chunk_id, bloblog_short_id, index_short_id), +
FOREIGN KEY (index_short_id, bloblog_short_id) REFERENCES indices_bloblogs(index_short_id, bloblog_short_id) +); +CREATE INDEX blobs_bloblog_short_id ON blobs(bloblog_short_id); +CREATE INDEX blobs_index_short_id ON blobs(index_short_id); + CREATE TABLE indices_supersede ( superseded_sha256 TEXT NOT NULL, successor_sha256 TEXT NOT NULL REFERENCES indices(index_sha256), diff --git a/yama_localcache/src/lib.rs b/yama_localcache/src/lib.rs index 50c8c14..4584adb 100644 --- a/yama_localcache/src/lib.rs +++ b/yama_localcache/src/lib.rs @@ -150,6 +150,13 @@ impl StoreConnection { Some(row) => row.bloblog_short_id, }; + let forgotten_bytes = index_bloblog_entry.forgotten_bytes as i64; + query!(" + INSERT INTO indices_bloblogs (index_short_id, bloblog_short_id, forgotten_bytes) + VALUES (?, ?, ?) + ", index_short_id, bloblog_short_id, forgotten_bytes) + .execute(&mut *txn) + .await?; for (chunk_id, chunk_locator) in index_bloblog_entry.chunks.iter() { let chunk_id_txt = chunk_id.to_string(); @@ -201,6 +208,15 @@ impl StoreConnection { .execute(&mut *txn) .await?; + query!( + " + DELETE FROM indices_bloblogs WHERE index_short_id = ? + ", + index_short_id + ) + .execute(&mut *txn) + .await?; + query!( " DELETE FROM indices WHERE index_short_id = ? @@ -255,6 +271,8 @@ impl StoreConnection { } } + /// Returns all chunk locations. + /// If a chunk does not exist, it is just not returned in the output map. pub async fn locate_chunks( &mut self, chunk_ids: &BTreeSet, @@ -332,4 +350,59 @@ impl StoreConnection { .is_none(); Ok(is_new) } + + pub async fn list_chunks_in_index( + &mut self, + index_id: IndexId, + ) -> eyre::Result> { + let index_id_text = index_id.to_string(); + let row_results = query!( + " + SELECT chunk_id AS \"chunk_id!\" FROM indices i + LEFT JOIN blobs b USING (index_short_id) + WHERE index_sha256 = ? + ", + index_id_text + ) + .map(|row| { + ChunkId::from_str(&row.chunk_id).context("failed to decode ChunkId in local cache") + }) + .fetch_all(&mut *self.conn) + .await?; + row_results.into_iter().collect() + } + + pub async fn index_bloblog_stats( + &mut self, + index_id: IndexId, + ) -> eyre::Result> { + let index_id_text = index_id.to_string(); + let row_results = query!(" + SELECT bloblog_sha256 AS bloblog_id, ib.forgotten_bytes AS forgotten_bytes, COUNT(size) AS \"num_chunks!: i64\", SUM(size) AS \"num_bytes!: i64\" FROM indices i + LEFT JOIN indices_bloblogs ib USING (index_short_id) + LEFT JOIN bloblogs b USING (bloblog_short_id) + LEFT JOIN blobs USING (index_short_id, bloblog_short_id) + WHERE index_sha256 = ? + GROUP BY bloblog_sha256 + ", index_id_text) + .map(|row| { + Ok((BloblogId::try_from(row.bloblog_id.as_ref())?, BloblogStats { + in_index: index_id, + blob_size: row.num_bytes as u64, + forgotten_bytes: row.forgotten_bytes as u64, + num_chunks: row.num_chunks as u32, + })) + }) + .fetch_all(&mut *self.conn) + .await?; + row_results.into_iter().collect() + } +} + +#[derive(Clone, Debug)] +pub struct BloblogStats { + pub in_index: IndexId, + pub blob_size: u64, + pub forgotten_bytes: u64, + pub num_chunks: u32, } diff --git a/yama_pile/src/definitions.rs b/yama_pile/src/definitions.rs index 2db35c7..993f233 100644 --- a/yama_pile/src/definitions.rs +++ b/yama_pile/src/definitions.rs @@ -26,7 +26,7 @@ pub struct BloblogFooter { pub type PackedBloblogFooter = AsymBox>>; /// Locator for a blob within a bloblog. 
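Taken together with the vacuum code earlier in this patch, the new `indices_bloblogs` table and the `index_bloblog_stats` query above feed the bloblog repack pass. A hedged end-to-end sketch follows (editor's illustration, not part of the patch; it assumes an exclusive-locked `pwc` and the `repack_bloblogs_and_indices` items in scope):

    // Illustrative repack driver built from the functions added above.
    async fn vacuum_repack(pwc: Arc<PileWithCache<BoxedWormFileProvider>>) -> eyre::Result<()> {
        // Per-bloblog stats, only for bloblogs referenced by exactly one index.
        let stats = get_bloblogs_stats(&pwc).await?;
        // Group bloblogs that would reclaim space, or clump small bloblogs together.
        let groups = select_bloblogs_for_repack(stats).await?;
        // Rewrite each group into fresh bloblogs and indices, then delete the originals.
        perform_repack(pwc, groups).await?;
        Ok(())
    }
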
-#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Eq, PartialEq)] pub struct BlobLocator { pub offset: u64, pub length: u64, @@ -104,7 +104,7 @@ pub struct IndexBloblogEntry { pub type PackedIndex = AsymBox>>; -#[derive(Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Ord, PartialOrd)] pub struct RecursiveChunkRef { /// The root Chunk ID. pub chunk_id: ChunkId, diff --git a/yama_pile/src/keyring.rs b/yama_pile/src/keyring.rs index 0427840..abe53de 100644 --- a/yama_pile/src/keyring.rs +++ b/yama_pile/src/keyring.rs @@ -25,29 +25,58 @@ pub struct Keyring { pub fn generate_r_w_keys() -> (ReaderKey, WriterKey) { let (encrypt, decrypt) = generate_asym_keypair(); let (sign, verify) = asym_signing_keypair(); - (ReaderKey { decrypt, verify }, WriterKey { encrypt, sign }) + ( + ReaderKey::new(decrypt, verify), + WriterKey::new(encrypt, sign), + ) } #[derive(Clone, Serialize, Deserialize)] pub struct WriterKey { + // boxed because these take up a lot of stack space otherwise! + #[serde(flatten)] + inner: Box, +} + +#[derive(Clone, Serialize, Deserialize)] +struct WriterKeyInner { encrypt: EncryptingKey, sign: SigningKey, } impl WriterKey { + pub fn new(encrypt: EncryptingKey, sign: SigningKey) -> Self { + Self { + inner: Box::new(WriterKeyInner { encrypt, sign }), + } + } + pub fn make_locked_asymbox(&self, contents: T) -> AsymBox { - AsymBox::new(contents, &self.sign, &self.encrypt).unwrap() + AsymBox::new(contents, &self.inner.sign, &self.inner.encrypt).unwrap() } } #[derive(Clone, Serialize, Deserialize)] pub struct ReaderKey { + // boxed because these take up a lot of stack space otherwise! + #[serde(flatten)] + inner: Box, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ReaderKeyInner { decrypt: DecryptingKey, verify: VerifyingKey, } impl ReaderKey { + pub fn new(decrypt: DecryptingKey, verify: VerifyingKey) -> Self { + Self { + inner: Box::new(ReaderKeyInner { decrypt, verify }), + } + } + pub fn unlock_asymbox(&self, asymbox: AsymBox) -> Option { - asymbox.unlock(&self.decrypt, &self.verify) + asymbox.unlock(&self.inner.decrypt, &self.inner.verify) } } diff --git a/yama_pile/src/lib.rs b/yama_pile/src/lib.rs index e3b3939..254cf45 100644 --- a/yama_pile/src/lib.rs +++ b/yama_pile/src/lib.rs @@ -8,13 +8,13 @@ use crate::locks::{LockHandle, LockKind}; use crate::pointers::{PackedPointer, Pointer}; use crate::utils::HashedWormWriter; use eyre::{bail, Context, ContextCompat}; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use yama_midlevel_crypto::byte_layer::{ByteLayer, CborSerde}; use yama_midlevel_crypto::zstd_box::Zstd; use yama_wormfile::paths::{WormPath, WormPathBuf}; -use yama_wormfile::{WormFileProvider, WormFileWriter}; +use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileWriter}; pub mod definitions; @@ -109,10 +109,35 @@ impl Pile { Ok(BloblogReader::new(worm_reader, &self.keyring).await?) } + /// Delete a bloblog from the pile. + /// This is dangerous: should only really be done in the vacuum operation. + /// Requires an exclusive lock! 
+ /// + /// + pub async fn delete_bloblog_dangerous_exclusive( + &self, + bloblog_id: BloblogId, + ) -> eyre::Result<()> { + if !self.lock.is_active_now(LockKind::Exclusive) { + bail!("can't delete bloblog: exclusive lock not active"); + } + let bloblog_path = WormPathBuf::new(format!( + "bloblogs/{}/{}", + hex::encode(&bloblog_id.0 .0[0..1]), + bloblog_id.0.to_string() + )) + .unwrap(); + self.provider.delete(bloblog_path.as_ref()).await?; + Ok(()) + } + /// Create a new index, returning the index ID. /// /// Requires key: w_bloblog_footer pub async fn create_index(&self, index: &Index) -> eyre::Result { + if !self.lock.is_active_now(LockKind::Shared) { + bail!("can't create index: lock not active"); + } let worm_writer = self.provider.write().await?; let mut writer = HashedWormWriter::new(worm_writer); let packed_index: PackedIndex = self @@ -133,6 +158,9 @@ impl Pile { /// List all indices present in the pile. pub async fn list_indices(&self) -> eyre::Result> { + if !self.lock.is_active_now(LockKind::Shared) { + bail!("can't list indices: lock not active"); + } let files = self .provider .list(WormPath::new("indices").unwrap()) @@ -152,10 +180,37 @@ impl Pile { Ok(result) } + /// List all indices present in the pile, with their metadata. + pub async fn list_indices_with_meta(&self) -> eyre::Result> { + if !self.lock.is_active_now(LockKind::Shared) { + bail!("can't list indices: lock not active"); + } + let files = self + .provider + .list_meta(WormPath::new("indices").unwrap()) + .await + .context("failed to list indices")?; + let mut result = BTreeMap::new(); + for (file, meta) in files { + let (_, filename) = file + .as_ref() + .as_str() + .rsplit_once('/') + .context("index listing entry should split at /")?; + let index_id = IndexId::try_from(filename) + .with_context(|| format!("not a valid index ID: {filename:?}"))?; + result.insert(index_id, meta); + } + Ok(result) + } + /// Read an index from the pile. /// /// Requires key: r_bloblog_footer pub async fn read_index(&self, index_id: IndexId) -> eyre::Result { + if !self.lock.is_active_now(LockKind::Shared) { + bail!("can't read index: lock not active"); + } let r_bloblog_footer = self .keyring .r_bloblog_footer @@ -174,6 +229,20 @@ impl Pile { Ok(index) } + /// Delete an index from the pile. + /// This is dangerous: should only really be done in the vacuum operation. + /// Requires an exclusive lock! 
+ /// + /// + pub async fn delete_index_dangerous_exclusive(&self, index_id: IndexId) -> eyre::Result<()> { + if !self.lock.is_active_now(LockKind::Exclusive) { + bail!("can't delete index: exclusive lock not active"); + } + let target = WormPathBuf::new(format!("indices/{}", index_id.0)).unwrap(); + self.provider.delete(target.as_ref()).await?; + Ok(()) + } + pub async fn read_pointer(&self, name: &str) -> eyre::Result> { let r_pointer = self .keyring @@ -234,14 +303,33 @@ impl Pile { Ok(()) } - pub async fn close(mut self) -> eyre::Result<()> { + pub async fn list_pointers(&self) -> eyre::Result> { + let files = self + .provider + .list(WormPath::new("pointers").unwrap()) + .await?; + Ok(files + .into_iter() + .map(|file| { + let (_dir, pointer) = file.as_ref().as_str().rsplit_once('/').unwrap(); + pointer.to_owned() + }) + .collect()) + } + + pub async fn close(self) -> eyre::Result<()> { match Arc::try_unwrap(self.lock) { Ok(lock) => { - lock.close().await + lock.close() + .await .context("failed to release lock gracefully")?; } Err(arc) => { - bail!("could not close pile gracefully; lock arc has {} strong refs and {} weak refs", Arc::strong_count(&arc), Arc::weak_count(&arc)); + bail!( + "could not close pile gracefully; lock arc has {} strong refs and {} weak refs", + Arc::strong_count(&arc), + Arc::weak_count(&arc) + ); } } Ok(()) diff --git a/yama_pile/src/locks.rs b/yama_pile/src/locks.rs index 0d54f82..2ce48ec 100644 --- a/yama_pile/src/locks.rs +++ b/yama_pile/src/locks.rs @@ -1,6 +1,6 @@ use crate::keyring::{Keyring, ReaderKey, WriterKey}; use chrono::{DateTime, Duration, Utc}; -use eyre::{bail, Context, ContextCompat, eyre}; +use eyre::{bail, eyre, Context, ContextCompat}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::{Arc, RwLock}; @@ -74,19 +74,23 @@ pub struct LockHandle { impl Drop for LockHandle { fn drop(&mut self) { - if let Some(lock_release_tx) = self.lock_release_tx - .take() { - lock_release_tx - .send(()) - .expect("can't drop lock"); + if let Some(lock_release_tx) = self.lock_release_tx.take() { + lock_release_tx.send(()).expect("can't drop lock"); } } } impl LockHandle { pub async fn close(mut self) -> eyre::Result<()> { - self.lock_release_tx.take().unwrap().send(()).map_err(|_| eyre!("can't drop lock"))?; - self.lock_task_join_handle.take().unwrap().await + self.lock_release_tx + .take() + .unwrap() + .send(()) + .map_err(|_| eyre!("can't drop lock"))?; + self.lock_task_join_handle + .take() + .unwrap() + .await .context("lock task fail")?; Ok(()) } @@ -141,7 +145,7 @@ impl LockHandle { let stage1_locks = scan_locks(provider.as_ref(), &r_locks, now).await?; if let Some(blocker) = find_lock_blocker(&stage1_locks, &lock_id, kind) { let lock = &stage1_locks[blocker]; - warn!("{:?} lock {} held by {} currently expiring at {} is blocking our potential lock.", lock.kind, lock_id, lock.holder, lock.expires_at); + warn!("{:?} lock {} held by {:?} currently expiring at {} is blocking our potential lock.", lock.kind, lock_id, lock.holder, lock.expires_at); tokio::time::sleep(tokio::time::Duration::from_secs( (lock.expires_at - now).num_seconds().max(0) as u64 + 10, @@ -161,7 +165,7 @@ impl LockHandle { let stage2_locks = scan_locks(provider.as_ref(), &r_locks, now).await?; if let Some(blocker) = find_lock_blocker(&stage2_locks, &lock_id, kind) { let lock = &stage2_locks[blocker]; - warn!("{:?} lock {} held by {} currently expiring at {} blocked our lock; backing out.", lock.kind, lock_id, lock.holder, lock.expires_at); + warn!("{:?} 
lock {} held by {:?} currently expiring at {} blocked our lock; backing out.", lock.kind, lock_id, lock.holder, lock.expires_at); // Back out our lock. provider.delete(lock_path.as_ref()).await?; @@ -197,7 +201,7 @@ impl LockHandle { lock_path, lock_id, lock_release_tx: Some(lock_release_tx), - lock_task_join_handle + lock_task_join_handle, }); } } diff --git a/yama_pile/src/tree.rs b/yama_pile/src/tree.rs index 757f216..5dd1267 100644 --- a/yama_pile/src/tree.rs +++ b/yama_pile/src/tree.rs @@ -41,7 +41,9 @@ pub struct RootTreeNode { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(tag = "t")] pub enum TreeNode { + #[serde(rename = "F")] NormalFile { /// modification time in ms mtime: u64, @@ -53,6 +55,7 @@ pub enum TreeNode { #[serde(flatten)] content: RecursiveChunkRef, }, + #[serde(rename = "D")] Directory { #[serde(flatten)] ownership: FilesystemOwnership, @@ -60,12 +63,14 @@ pub enum TreeNode { permissions: FilesystemPermissions, children: BTreeMap, }, + #[serde(rename = "L")] SymbolicLink { #[serde(flatten)] ownership: FilesystemOwnership, target: String, }, // TODO is there any other kind of file we need to store? + #[serde(rename = "X")] Deleted, } diff --git a/yama_pile/src/utils.rs b/yama_pile/src/utils.rs index 1a7e50e..dec283b 100644 --- a/yama_pile/src/utils.rs +++ b/yama_pile/src/utils.rs @@ -131,6 +131,7 @@ impl AsyncWrite for SymStreamWriter { ) -> Poll> { let mut enc_buf = buf.to_vec(); // Safety: Deny use of unencrypted `buf` from here on. + #[allow(unused)] let buf = (); let offset = self.offset; self.sym_stream_key.apply_xor(offset, &mut enc_buf); diff --git a/yama_wormfile/src/boxed.rs b/yama_wormfile/src/boxed.rs index 3459b8b..d5d9a45 100644 --- a/yama_wormfile/src/boxed.rs +++ b/yama_wormfile/src/boxed.rs @@ -1,5 +1,5 @@ use crate::paths::{WormPath, WormPathBuf}; -use crate::{WormFileProvider, WormFileReader, WormFileWriter}; +use crate::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; use async_trait::async_trait; use std::error::Error; use std::fmt::{Debug, Display, Formatter}; @@ -32,6 +32,7 @@ trait BoxableWormFileProvider: Debug + Send + Sync { async fn is_dir_b(&self, path: &WormPath) -> eyre::Result; async fn is_regular_file_b(&self, path: &WormPath) -> eyre::Result; async fn list_b(&self, path: &WormPath) -> eyre::Result>; + async fn list_meta_b(&self, path: &WormPath) -> eyre::Result>; async fn read_b(&self, path: &WormPath) -> eyre::Result>>; async fn write_b(&self) -> eyre::Result>>; async fn delete_b(&self, path: &WormPath) -> eyre::Result<()>; @@ -51,6 +52,10 @@ impl BoxableWormFileProvider for T { self.list(path).await } + async fn list_meta_b(&self, path: &WormPath) -> eyre::Result> { + self.list_meta(path).await + } + async fn read_b(&self, path: &WormPath) -> eyre::Result>> { self.read(path) .await @@ -101,6 +106,14 @@ impl WormFileProvider for BoxedWormFileProvider { self.inner.list_b(path).await } + async fn list_meta( + &self, + path: impl AsRef + Send, + ) -> eyre::Result> { + let path = path.as_ref(); + self.inner.list_meta_b(path).await + } + async fn read(&self, path: impl AsRef + Send) -> eyre::Result { let path = path.as_ref(); self.inner.read_b(path).await diff --git a/yama_wormfile/src/lib.rs b/yama_wormfile/src/lib.rs index 762ca52..860d9eb 100644 --- a/yama_wormfile/src/lib.rs +++ b/yama_wormfile/src/lib.rs @@ -32,6 +32,15 @@ pub trait WormFileProvider: Debug + Send + Sync { /// TODO a streaming version of this might be beneficial. 
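The `WormFileMeta` plumbing added below threads file sizes through every provider via `list_meta`. A hedged usage sketch (editor's illustration; the `total_index_bytes` helper and its caller-supplied `provider` are assumed, not part of this patch):

    // Sum the on-disk size of everything under "indices" via the new list_meta.
    async fn total_index_bytes(provider: &impl WormFileProvider) -> eyre::Result<u64> {
        let entries = provider.list_meta(WormPath::new("indices").unwrap()).await?;
        Ok(entries.into_iter().map(|(_path, meta)| meta.file_size).sum())
    }
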
async fn list(&self, path: impl AsRef + Send) -> eyre::Result>; + /// Lists all the files and directories in the specified path, with metadata. + /// + /// If the path does not exist, gives an error. + /// TODO a streaming version of this might be beneficial. + async fn list_meta( + &self, + path: impl AsRef + Send, + ) -> eyre::Result>; + /// Reads a file. /// /// Fails if the file does not exist or is not a regular file. @@ -50,6 +59,11 @@ pub trait WormFileProvider: Debug + Send + Sync { async fn delete(&self, path: impl AsRef + Send) -> eyre::Result<()>; } +#[derive(Clone, Debug)] +pub struct WormFileMeta { + pub file_size: u64, +} + pub trait WormFileReader: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + 'static {} #[async_trait] diff --git a/yama_wormfile_fs/src/lib.rs b/yama_wormfile_fs/src/lib.rs index f0d6464..8bc1d49 100644 --- a/yama_wormfile_fs/src/lib.rs +++ b/yama_wormfile_fs/src/lib.rs @@ -3,13 +3,14 @@ use eyre::Context as EyreContext; use std::fmt::{Debug, Formatter}; use std::io; use std::io::{ErrorKind, SeekFrom}; +use std::os::unix::fs::MetadataExt; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; use yama_wormfile::paths::{WormPath, WormPathBuf}; -use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter}; +use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; /// WormFileProvider that uses the local filesystem, in a given root directory. #[derive(Debug)] @@ -70,6 +71,34 @@ impl WormFileProvider for LocalWormFilesystem { Ok(out) } + async fn list_meta( + &self, + path: impl AsRef + Send, + ) -> eyre::Result> { + let worm_path = path.as_ref(); + let real_path = self.resolve_real_path(worm_path); + let mut dir_reader = match tokio::fs::read_dir(real_path).await { + Ok(ok) => ok, + Err(e) if e.kind() == ErrorKind::NotFound => { + return Ok(Vec::new()); + } + Err(other) => return Err(other.into()), + }; + let mut out = Vec::new(); + while let Some(next_ent) = dir_reader.next_entry().await? { + if let Some(name_str) = next_ent.file_name().to_str() { + let metadata = next_ent.metadata().await?; + out.push(( + worm_path.join(name_str).unwrap(), + WormFileMeta { + file_size: metadata.size(), + }, + )); + } + } + Ok(out) + } + async fn read(&self, path: impl AsRef + Send) -> eyre::Result { let worm_path = path.as_ref(); let real_path = self.resolve_real_path(worm_path); diff --git a/yama_wormfile_s3/src/lib.rs b/yama_wormfile_s3/src/lib.rs index 34340e9..9934a25 100644 --- a/yama_wormfile_s3/src/lib.rs +++ b/yama_wormfile_s3/src/lib.rs @@ -12,7 +12,7 @@ use tokio::io::{duplex, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, DuplexS use tokio::task::JoinHandle; use uuid::Uuid; use yama_wormfile::paths::{WormPath, WormPathBuf}; -use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter}; +use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; /// WormFileProvider that uses an S3 bucket, with a given path prefix. #[derive(Debug)] @@ -68,6 +68,18 @@ impl WormFileProvider for S3WormFilesystem { } async fn list(&self, path: impl AsRef + Send) -> eyre::Result> { + Ok(self + .list_meta(path) + .await? 
+ .into_iter() + .map(|(name, _meta)| name) + .collect()) + } + + async fn list_meta( + &self, + path: impl AsRef + Send, + ) -> eyre::Result> { let path = path.as_ref(); let full_path = self.resolve_real_path(path); let list = self @@ -84,6 +96,14 @@ impl WormFileProvider for S3WormFilesystem { .strip_prefix(&self.path_prefix) .map(|s| WormPathBuf::new(s.to_owned())) .flatten() + .map(|x| { + ( + x, + WormFileMeta { + file_size: obj.size, + }, + ) + }) }) .collect()) } diff --git a/yama_wormfile_sftp/src/lib.rs b/yama_wormfile_sftp/src/lib.rs index b931e9d..8dd930e 100644 --- a/yama_wormfile_sftp/src/lib.rs +++ b/yama_wormfile_sftp/src/lib.rs @@ -19,7 +19,7 @@ use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::runtime::Handle; use yama_wormfile::paths::{WormPath, WormPathBuf}; -use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter}; +use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter}; /// WormFileProvider that uses an SFTP connection, in a given root directory. #[derive(Debug)] @@ -227,6 +227,46 @@ impl WormFileProvider for SftpWormFilesystem { .collect()) } + async fn list_meta( + &self, + path: impl AsRef + Send, + ) -> eyre::Result> { + let worm_path = path.as_ref(); + let path = worm_path.as_str(); + let mut fs = self.get_fs(); + + let mut remote_dir = match fs.open_dir(path).await { + Ok(ok) => ok, + Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => { + return Ok(Vec::new()); + } + Err(other) => { + return Err(other.into()); + } + }; + let dir_reader = remote_dir.read_dir().await?; + + Ok(dir_reader + .iter() + .filter_map(|entry| { + if let Some(name_str) = entry.filename().as_os_str().to_str() { + if name_str.is_empty() || name_str == "." || name_str == ".." { + None + } else { + Some(( + worm_path.join(name_str).expect("pre-checked"), + WormFileMeta { + file_size: entry.metadata().len().expect("no size on SFTP file?"), + }, + )) + } + } else { + None + } + }) + .collect()) + } + async fn read(&self, path: impl AsRef + Send) -> eyre::Result { let real_path = self.root_dir.join(path.as_ref().as_str());
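To round out the vacuum story from earlier in this patch, here is a hedged sketch of the chunk-forgetting pass (editor's illustration, not part of the patch; it assumes the `forget_chunks` module items in scope and appropriate locking):

    // Illustrative sweep: find chunks unreachable from any pointer, then
    // rewrite the affected indices without them.
    async fn vacuum_forget(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>) -> eyre::Result<()> {
        let mut cache_conn = pwc.localcache.read().await?;
        // Consider every index currently known to the local cache.
        let indices: BTreeSet<IndexId> = cache_conn.list_indices().await?.into_iter().collect();
        // Chunks present in those indices but unreachable from any pointer...
        let forgettable = find_forgettable_chunks(pwc, indices.clone()).await?;
        // ...get dropped by rewriting the affected indices.
        forget_chunks(pwc, indices, forgettable).await?;
        Ok(())
    }
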