overhaul: datman support

Olivier 'reivilibre' 2023-05-20 13:11:30 +01:00
parent 8e5649597b
commit dabf7c5cf0
38 changed files with 2549 additions and 265 deletions

Cargo.lock (generated): 27 lines changed

@@ -746,7 +746,24 @@ dependencies = [
name = "datman"
version = "0.7.0-alpha.1"
dependencies = [
"chrono",
"clap",
"dashmap",
"eyre",
"indicatif",
"patricia_tree",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-indicatif",
"tracing-subscriber",
"users",
"yama",
"yama_midlevel_crypto",
"yama_pile",
"yama_wormfile",
]
[[package]]
@@ -3036,9 +3053,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.27.0"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [
"autocfg",
"bytes",
@@ -3050,7 +3067,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.45.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -3065,9 +3082,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",


@@ -12,3 +12,26 @@ description = "A chunked and deduplicated backup system using Yama"
[dependencies]
eyre = "0.6.8"
clap = { version = "4.2.2", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["tracing-log", "env-filter"] }
tracing-indicatif = "0.3.0"
indicatif = "0.17.3"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
toml = "0.7.3"
tokio = { version = "1.28.0", features = ["fs", "macros", "rt-multi-thread"] }
dashmap = "5.4.0"
chrono = "0.4.24"
users = "0.11.0"
yama = { version = "0.7.0-alpha.1", path = "../yama" }
yama_pile = { path = "../yama_pile" }
#yama_localcache = { path = "../yama_localcache" }
yama_wormfile = { path = "../yama_wormfile" }
#yama_wormfile_fs = { path = "../yama_wormfile_fs" }
#yama_wormfile_s3 = { path = "../yama_wormfile_s3" }
#yama_wormfile_sftp = { path = "../yama_wormfile_sftp" }
yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
patricia_tree = "0.5.7"

datman/src/backup.rs (new file, 471 lines)

@@ -0,0 +1,471 @@
use crate::descriptor_config::{SourceDescriptor, SourceDescriptorInner, VirtualSourceKind};
use crate::pointer_names::{get_pointer_name_at, POINTER_NAME_DATETIME_SPLITTER};
use chrono::{DateTime, Utc};
use dashmap::DashSet;
use eyre::{bail, eyre, Context, ContextCompat};
use indicatif::ProgressStyle;
use patricia_tree::PatriciaMap;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::io::Write;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinSet;
use tracing::{debug, info, info_span, Instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use users::{get_current_gid, get_current_uid};
use yama::pile_with_cache::PileWithCache;
use yama::scan::create_uidgid_lookup_tables;
use yama::storing::{
assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState,
};
use yama::{scan, PROGRESS_BAR_STYLE};
use yama_midlevel_crypto::chunk_id::ChunkId;
use yama_pile::definitions::{BlobLocator, BloblogId, IndexBloblogEntry, RecursiveChunkRef};
use yama_pile::pointers::Pointer;
use yama_pile::tree::unpopulated::ScanEntry;
use yama_pile::tree::{
assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership,
FilesystemPermissions, RootTreeNode, TreeNode,
};
use yama_wormfile::boxed::BoxedWormFileProvider;
pub async fn backup(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
sources_to_backup: BTreeMap<String, SourceDescriptor>,
) -> eyre::Result<()> {
// Locate suitable parent pointers
let parents_to_use = find_suitable_parent_pointers(&pwc, &sources_to_backup)
.await
.context("failed to look for suitable parent pointers")?;
let now = Utc::now();
// (dirtrees) Scan
let dir_sources = scan_dir_sources(&sources_to_backup, parents_to_use, now)
.await
.context("failed to scan directory sources")?;
let new_unflushed_chunks: Arc<DashSet<ChunkId>> = Arc::new(Default::default());
// (dirtrees) Start a storage pipeline and submit jobs to it
let task_store_dirs = {
let new_unflushed_chunks = new_unflushed_chunks.clone();
let pwc = pwc.clone();
let bds_span = info_span!("storing");
tokio::spawn(
async move {
backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
.await
.context("whilst backing up dir sources")
}
.instrument(bds_span),
)
};
// (virtual source streams) Store to bloblog writers
let task_store_virtuals = {
let bvs_span = info_span!("storing_virts");
let new_unflushed_chunks = new_unflushed_chunks.clone();
let pwc = pwc.clone();
tokio::spawn(
async move {
backup_virtual_sources(&sources_to_backup, now, pwc, new_unflushed_chunks)
.await
.context("whilst backing up virtual sources")
}
.instrument(bvs_span),
)
};
let (dir_sources_and_chunkmaps, virt_sources) =
tokio::join!(task_store_dirs, task_store_virtuals);
let dir_sources_and_chunkmaps: BackupDirSourcesReturn = dir_sources_and_chunkmaps??;
let mut virt_sources: Vec<VirtualSourceReturn> = virt_sources??;
let mut chunkmaps = dir_sources_and_chunkmaps.chunkmaps;
for source in &mut virt_sources {
chunkmaps.extend(
std::mem::take(&mut source.chunkmaps)
.into_iter()
.map(|(k, nb)| {
(
k,
IndexBloblogEntry {
chunks: nb,
forgotten_bytes: 0,
},
)
}),
);
}
// Assemble the chunkmaps into indices, write them out, then write pointers
assemble_and_write_indices(&pwc, chunkmaps)
.await
.context("failed to assemble and write indices")?;
info!("All indices stored, writing pointer...");
for (dir_source_prep, chunk_file_map) in dir_sources_and_chunkmaps.dir_source_returns {
// Assemble and write a pointer
let mut tree =
assemble_tree_from_scan_entries(dir_source_prep.scan_entry_map, chunk_file_map)
.context("failed to assemble tree")?;
let (uids, gids) =
create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?;
if let Some(ref parent_node) = dir_source_prep.parent {
differentiate_node_in_place(&mut tree, &parent_node.root.node)
.context("failed to differentiate?")?;
}
pwc.pile
.write_pointer(
&dir_source_prep.new_pointer_name,
false,
&Pointer {
parent: dir_source_prep.parent_name.clone(),
root: RootTreeNode {
name: dir_source_prep
.path
.file_name()
.map(|oss| oss.to_str())
.flatten()
.unwrap_or("")
.to_owned(),
node: tree,
},
uids,
gids,
},
)
.await
.context("failed to write pointer")?;
}
for virtual_source in virt_sources {
pwc.pile
.write_pointer(&virtual_source.pointer_name, false, &virtual_source.pointer)
.await
.context("failed to write pointer")?;
}
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
Ok(())
}
/// Given access to a PWC and a map of sources to back up, returns a map of pointer names to use as parents.
/// For virtual sources, no parent is chosen.
/// For directory sources, the most recent pointer from the same source is chosen as a parent.
async fn find_suitable_parent_pointers(
pwc: &PileWithCache<BoxedWormFileProvider>,
sources_to_backup: &BTreeMap<String, SourceDescriptor>,
) -> eyre::Result<BTreeMap<String, (String, Pointer)>> {
let mut result = BTreeMap::new();
let pointers = pwc
.pile
.list_pointers()
.await
.context("failed to list pointers")?;
for (source_name, source) in sources_to_backup.iter() {
if source.is_directory_source() {
let starter = format!("{source_name}{POINTER_NAME_DATETIME_SPLITTER}");
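// Note: this relies on list_pointers returning names in sorted order; the
// "%F_%T" datetime suffix sorts lexicographically, so the first match when
// iterating in reverse is the most recent pointer for this source.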
if let Some(most_recent_pointer) = pointers
.iter()
.rev()
.filter(|pn| pn.starts_with(&starter))
.next()
{
debug!("for {source_name:?}, using parent {most_recent_pointer:?}");
let pointer = pwc
.read_pointer_fully_integrated(&most_recent_pointer)
.await
.context("failed to read parent pointer")?
.context("no parent pointer despite having just listed it")?;
result.insert(
source_name.to_owned(),
(most_recent_pointer.clone(), pointer),
);
}
}
}
Ok(result)
}
struct DirSourcePrep {
scan_entry_map: PatriciaMap<ScanEntry>,
parent_name: Option<String>,
parent: Option<Pointer>,
path: PathBuf,
new_pointer_name: String,
}
async fn scan_dir_sources(
sources_to_backup: &BTreeMap<String, SourceDescriptor>,
mut parents: BTreeMap<String, (String, Pointer)>,
now: DateTime<Utc>,
) -> eyre::Result<Vec<DirSourcePrep>> {
let mut joinset = JoinSet::new();
for (source_name, source) in sources_to_backup {
if let SourceDescriptorInner::DirectorySource {
path,
cross_filesystems,
ignore,
} = &source.inner
{
let path = path.to_owned();
let cross_filesystems = *cross_filesystems;
debug!("TODO: xf={cross_filesystems}");
let ignore = ignore.to_owned();
let (parent_name, parent) = parents.remove(source_name).unzip();
let new_pointer_name = get_pointer_name_at(&source_name, now);
joinset.spawn_blocking(move || -> eyre::Result<DirSourcePrep> {
let scan_entry_map = scan::scan(&path, &ignore).context("Failed to scan")?;
Ok(DirSourcePrep {
scan_entry_map,
parent_name,
parent,
path,
new_pointer_name,
})
});
}
}
let mut result = Vec::new();
while let Some(dsp_res_res) = joinset.join_next().await {
result.push(dsp_res_res??);
}
Ok(result)
}
struct BackupDirSourcesReturn {
pub chunkmaps: BTreeMap<BloblogId, IndexBloblogEntry>,
pub dir_source_returns: Vec<(DirSourcePrep, PatriciaMap<(RecursiveChunkRef, u64)>)>,
}
async fn backup_dir_sources(
dir_sources: Vec<DirSourcePrep>,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
new_unflushed_chunks: Arc<DashSet<ChunkId>>,
) -> eyre::Result<BackupDirSourcesReturn> {
let mut chunk_file_maps = Vec::new();
let mut pruned_scan_entry_maps = Vec::new();
// First collect all that stuff together...
for dir_source in &dir_sources {
let (chunk_file_map, pruned_scan_entry_map) = if let Some(ref parent_node) =
dir_source.parent
{
let (cfm, pruned) =
scan::prepopulate_unmodified(&parent_node.root.node, &dir_source.scan_entry_map);
(cfm, Cow::Owned(pruned))
} else {
(
PatriciaMap::<(RecursiveChunkRef, u64)>::new(),
Cow::Borrowed(&dir_source.scan_entry_map),
)
};
chunk_file_maps.push(chunk_file_map);
pruned_scan_entry_maps.push(pruned_scan_entry_map);
}
let store_span = Span::current();
// store_span.pb_set_style(&ProgressStyle::default_bar());
store_span.pb_set_style(
&ProgressStyle::default_bar()
.template(PROGRESS_BAR_STYLE)
.unwrap(),
);
store_span.pb_set_message("storing files");
store_span.pb_set_length(
pruned_scan_entry_maps
.iter()
.map(|pruned_scan_entry_map| {
pruned_scan_entry_map
.values()
.filter(|v| matches!(v, ScanEntry::NormalFile { .. }))
.count() as u64
})
.sum(),
);
//
let (pipeline, pipeline_job_tx) =
StoragePipeline::launch_new(4, pwc.clone(), new_unflushed_chunks).await?;
let dir_sources2 = &dir_sources;
let (submitter_task, receiver_task) = tokio::join!(
async move {
let pipeline_job_tx = pipeline_job_tx;
for (dir_source_idx, dir_source) in dir_sources2.iter().enumerate() {
for (name_bytes, scan_entry) in pruned_scan_entry_maps[dir_source_idx].iter() {
if let ScanEntry::NormalFile { .. } = scan_entry {
let name = std::str::from_utf8(name_bytes.as_slice())
.context("name is not str")?;
// If the source is a single file, its scan entry name is empty;
// avoid `.join("")`, which would misbehave on a file source.
let path = if name.is_empty() {
dir_source.path.clone()
} else {
dir_source.path.join(name)
};
pipeline_job_tx
.send_async(((dir_source_idx, name.to_owned()), path))
.await
.map_err(|_| eyre!("unable to send to pipeline."))?;
}
}
}
drop(pipeline_job_tx);
Ok::<_, eyre::Report>(())
},
async {
while let Ok(((dir_source_idx, job_id), rec_chunk_ref, real_size)) =
pipeline.next_result().await
{
chunk_file_maps[dir_source_idx].insert_str(&job_id, (rec_chunk_ref, real_size));
Span::current().pb_inc(1);
}
// eprintln!("fin rec");
Ok::<_, eyre::Report>(())
}
);
submitter_task?;
receiver_task?;
assert_eq!(dir_sources.len(), chunk_file_maps.len());
let chunkmaps = pipeline.finish_into_chunkmaps().await?;
Ok(BackupDirSourcesReturn {
chunkmaps,
dir_source_returns: dir_sources
.into_iter()
.zip(chunk_file_maps.into_iter())
.collect(),
})
}
async fn backup_virtual_sources(
sources: &BTreeMap<String, SourceDescriptor>,
now: DateTime<Utc>,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
new_unflushed_chunks: Arc<DashSet<ChunkId>>,
) -> eyre::Result<Vec<VirtualSourceReturn>> {
let mut joinset: JoinSet<eyre::Result<VirtualSourceReturn>> = JoinSet::new();
for (source_name, source) in sources {
if source.is_virtual_source() {
joinset.spawn(backup_virtual_source(
get_pointer_name_at(source_name, now),
source.clone(),
pwc.clone(),
new_unflushed_chunks.clone(),
));
}
}
let mut results = Vec::new();
while let Some(result_res_res) = joinset.join_next().await {
results.push(result_res_res??);
}
Ok(results)
}
struct VirtualSourceReturn {
pub pointer_name: String,
pub pointer: Pointer,
pub chunkmaps: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>,
}
async fn backup_virtual_source(
pointer_name: String,
source: SourceDescriptor,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
new_unflushed_chunks: Arc<DashSet<ChunkId>>,
) -> eyre::Result<VirtualSourceReturn> {
let SourceDescriptorInner::VirtualSource(virtual_source) = &source.inner else {
bail!("bug: non-VS SDI passed to BVS");
};
let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks)
.await
.context("failed to create storing state")?;
let mut sbw = StoringBloblogWriters::default();
let ((chunkref, size), mut sbw, mut storing_state) = tokio::task::spawn_blocking({
let virtual_source = virtual_source.clone();
move || -> eyre::Result<((RecursiveChunkRef, u64), StoringBloblogWriters, StoringState)> {
let child = open_stdout_backup_process(&virtual_source.extra_args, &virtual_source.helper)?;
Ok((storing_state.store_full_stream(child.stdout.unwrap(), &mut sbw).context("Failed to store stream into Yama pile")?, sbw, storing_state))
}
}).await??;
sbw.finish_bloblogs(&mut storing_state)
.await
.context("Failed to finish bloblogs")?;
let chunkmaps = storing_state.new_bloblogs;
// Assemble and write a pointer
let uid = get_current_uid() as u16;
let gid = get_current_gid() as u16;
let tree = TreeNode::NormalFile {
mtime: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
ownership: FilesystemOwnership { uid, gid },
permissions: FilesystemPermissions { mode: 0o600 },
size,
content: chunkref,
};
let (uids, gids) =
create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?;
let VirtualSourceKind::Stdout { filename } = &virtual_source.kind;
Ok(VirtualSourceReturn {
pointer_name,
pointer: Pointer {
parent: None,
root: RootTreeNode {
name: filename.clone(),
node: tree,
},
uids,
gids,
},
chunkmaps,
})
}
pub fn open_stdout_backup_process(
extra_args: &HashMap<String, toml::Value>,
program_name: &str,
) -> eyre::Result<Child> {
let mut child = Command::new(format!("datman-helper-{}-backup", program_name))
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.stdin(Stdio::piped())
.spawn()?;
let mut child_stdin = child.stdin.as_mut().unwrap();
serde_json::to_writer(&mut child_stdin, extra_args)?;
child_stdin.flush()?;
// close stdin!
child.stdin = None;
Ok(child)
}
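The virtual-source flow above implies a small helper protocol: datman spawns
`datman-helper-{helper}-backup`, writes the source's extra TOML keys to the
child's stdin as a single JSON object, closes stdin, and then chunks whatever
the child writes to stdout into the pile. A minimal sketch of such a helper
(the binary name `datman-helper-example-backup` and the payload are
hypothetical, not part of this commit):

use std::io::{self, Read, Write};

fn main() -> io::Result<()> {
    // Datman serialises the source's extra keys to our stdin as JSON.
    let mut args_json = String::new();
    io::stdin().read_to_string(&mut args_json)?;

    // A real helper would parse `args_json` and stream a database dump,
    // tarball, etc.; here we just emit a trivial payload.
    let mut out = io::stdout().lock();
    out.write_all(b"example backup payload\n")?;
    out.flush()
}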


@@ -15,6 +15,266 @@ You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use clap::{Parser, Subcommand};
use datman::backup::backup;
use datman::descriptor_config::{load_descriptor, SourceDescriptor};
use datman::extract::{
extract, load_pointers_for_extraction, merge_roots_for_batch_extract, select_to_extract,
};
use eyre::{bail, Context, ContextCompat};
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::str::FromStr;
use tracing::info;
use tracing_indicatif::IndicatifLayer;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
use yama::get_hostname;
use yama::open::open_lock_and_update_cache;
#[derive(Clone, Debug)]
pub struct PileAndPointer {
pub pile_path: Option<PathBuf>,
pub pointer: PointerName,
}
#[derive(Clone, Debug)]
#[repr(transparent)]
pub struct PointerName(String);
impl FromStr for PointerName {
type Err = eyre::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if !s
.chars()
.all(|c| c.is_alphanumeric() || ['_', '+', '-', ':'].contains(&c))
{
bail!("Bad pointer name: {s:?}");
}
Ok(PointerName(s.to_owned()))
}
}
impl FromStr for PileAndPointer {
type Err = eyre::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.split_once(":") {
None => Ok(PileAndPointer {
pile_path: None,
pointer: PointerName::from_str(s)?,
}),
Some((pile_path, pointer)) => Ok(PileAndPointer {
pile_path: Some(PathBuf::from(pile_path)),
pointer: PointerName::from_str(pointer)?,
}),
}
}
}
#[derive(Clone, Debug)]
pub struct PileAndPointerWithSubTree {
pub pile_path: Option<PathBuf>,
pub pointer: PointerName,
// TODO how to represent...
pub sub_tree: String,
}
impl FromStr for PileAndPointerWithSubTree {
type Err = eyre::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (pile_path, pointer_and_subtree) = match s.split_once(":") {
None => (None, s),
Some((pile_path, pointer)) => (Some(PathBuf::from(pile_path)), pointer),
};
if let Some(slash) = pointer_and_subtree.find('/') {
Ok(PileAndPointerWithSubTree {
pile_path,
pointer: PointerName::from_str(&pointer_and_subtree[0..slash])?,
sub_tree: pointer_and_subtree[slash + 1..].to_owned(),
})
} else {
Ok(PileAndPointerWithSubTree {
pile_path,
pointer: PointerName::from_str(&pointer_and_subtree)?,
sub_tree: String::new(),
})
}
}
}
#[derive(Parser, Clone, Debug)]
pub struct DatmanArgs {
#[arg(long, env = "DATMAN_CONFIG", default_value = "datman.toml")]
config: PathBuf,
#[command(subcommand)]
command: DatmanCommand,
}
#[derive(Subcommand, Clone, Debug)]
pub enum DatmanCommand {
BackupOne {
source_name: String,
pile_name: String,
},
BackupAll {
pile_name: String,
},
ExtractOne {
pile_name: String,
source_name: String,
destination: PathBuf,
},
ExtractAll {
pile_name: String,
destination: PathBuf,
},
}
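// Illustrative invocations (pile/source names are hypothetical; clap derives
// kebab-case subcommand names from the variants above):
//   datman backup-one homedir main
//   datman backup-all main
//   datman extract-one main homedir ./restored
//   datman --config /etc/datman.toml extract-all main ./restored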
const PROGRESS_SPANS: &'static [&'static str] = &[
"store_file",
"storing",
"unpack_files",
"expand_chunkrefs",
"extract_files",
];
#[tokio::main]
pub async fn main() -> eyre::Result<()> {
let indicatif_layer = IndicatifLayer::new();
let stderr_writer = indicatif_layer.get_stderr_writer();
let indicatif_layer = indicatif_layer.with_filter(filter_fn(|span_metadata| {
span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name())
}));
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "sqlx=warn,yama=debug,datman=debug,info".into()),
)
.with(tracing_subscriber::fmt::layer().with_writer(stderr_writer))
.with(indicatif_layer)
.init();
let args: DatmanArgs = dbg!(DatmanArgs::parse());
let descriptor = load_descriptor(&args.config)
.await
.context("failed to load Datman descriptor")?;
dbg!(&descriptor);
match args.command {
DatmanCommand::BackupOne {
source_name,
pile_name,
} => {
let pile_connector_path = descriptor
.piles
.get(&pile_name)
.cloned()
.context("no pile by that name")?;
let lock_name = format!("{} datman backup {:?}", get_hostname(), source_name);
let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?;
let source = descriptor
.sources
.get(&source_name)
.context("no source by that name")?;
let my_hostname = get_hostname();
if &source.host != &my_hostname {
bail!(
"Current hostname is {:?}, not {:?} as expected for this source.",
my_hostname,
source.host
);
}
let mut sources_to_backup = BTreeMap::new();
sources_to_backup.insert(source_name.clone(), source.clone());
backup(pwc, sources_to_backup).await?;
}
DatmanCommand::BackupAll { pile_name } => {
let pile_connector_path = descriptor
.piles
.get(&pile_name)
.cloned()
.context("no pile by that name")?;
let lock_name = format!("{} datman backupall", get_hostname());
let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?;
let my_hostname = get_hostname();
let sources_to_backup: BTreeMap<String, SourceDescriptor> = descriptor
.sources
.clone()
.into_iter()
.filter(|(_, source)| &source.host == &my_hostname)
.collect();
if sources_to_backup.len() == 0 {
bail!(
"No sources to back up! The current hostname is {:?}; is it correct?",
my_hostname
);
}
info!(
"Backing up the following {} sources: {:?}",
sources_to_backup.len(),
sources_to_backup.keys().collect::<Vec<_>>()
);
backup(pwc, sources_to_backup).await?;
}
DatmanCommand::ExtractOne {
pile_name,
source_name,
destination,
} => {
let pile_connector_path = descriptor
.piles
.get(&pile_name)
.cloned()
.context("no pile by that name")?;
let lock_name = format!("{} datman extract {:?}", get_hostname(), source_name);
let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?;
let mut sources = BTreeSet::new();
sources.insert(source_name.clone());
let selected = select_to_extract(&pwc, sources, None, None, false).await?;
let mut for_extraction = load_pointers_for_extraction(pwc.clone(), selected).await?;
assert_eq!(for_extraction.len(), 1);
let root_node = for_extraction.remove(&source_name).unwrap();
extract(pwc, root_node.node, &destination).await?;
}
DatmanCommand::ExtractAll {
pile_name,
destination,
} => {
let pile_connector_path = descriptor
.piles
.get(&pile_name)
.cloned()
.context("no pile by that name")?;
let lock_name = format!("{} datman extractall", get_hostname());
let pwc = open_lock_and_update_cache(pile_connector_path, lock_name).await?;
let sources = descriptor.sources.keys().cloned().collect();
let selected = select_to_extract(&pwc, sources, None, None, false).await?;
let for_extraction = load_pointers_for_extraction(pwc.clone(), selected).await?;
let merged_node = merge_roots_for_batch_extract(for_extraction);
extract(pwc, merged_node, &destination).await?;
}
}
Ok(())
}

datman/src/datetime.rs (new file, 26 lines)

@@ -0,0 +1,26 @@
use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone};
use eyre::bail;
use std::str::FromStr;
pub struct HumanDateTime(pub DateTime<Local>);
impl FromStr for HumanDateTime {
type Err = eyre::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(date_only) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
let local_datetime = Local
.from_local_datetime(&date_only.and_hms_opt(0, 0, 0).unwrap())
.unwrap();
Ok(HumanDateTime(local_datetime))
} else if let Ok(date_and_time) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
let local_datetime = Local.from_local_datetime(&date_and_time).unwrap();
Ok(HumanDateTime(local_datetime))
} else if let Ok(date_and_time) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
let local_datetime = Local.from_local_datetime(&date_and_time).unwrap();
Ok(HumanDateTime(local_datetime))
} else {
bail!("Couldn't parse using any format. Use one of: 2021-05-16 OR 2021-05-16T17:42:14 OR 2021-05-16 17:42:14");
}
}
}
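A quick sketch of the accepted inputs, written as a hypothetical unit test for
this module (not part of this commit):

#[cfg(test)]
mod tests {
    use super::HumanDateTime;
    use std::str::FromStr;

    #[test]
    fn parses_all_three_formats() {
        // Date only: interpreted as local midnight.
        assert!(HumanDateTime::from_str("2021-05-16").is_ok());
        // Date and time, `T`-separated.
        assert!(HumanDateTime::from_str("2021-05-16T17:42:14").is_ok());
        // Date and time, space-separated.
        assert!(HumanDateTime::from_str("2021-05-16 17:42:14").is_ok());
        // Anything else is rejected.
        assert!(HumanDateTime::from_str("16/05/2021").is_err());
    }
}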


@@ -0,0 +1,126 @@
/*
This file is part of Yama.
Yama is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Yama is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use eyre::{Context, ContextCompat};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
// TODO how do we handle?:
// - (important) yama push of one pile to another
// - backup policy stuff like 'minimum backup frequency' ... show when it's not been done
// - backup policy stuff like 'minimum on two different disks, not powered at the same time...'
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Descriptor {
/// Sources
pub sources: HashMap<String, SourceDescriptor>,
/// Paths to destination Yama Piles. Remote Piles need a local virtual pile to specify the layers.
pub piles: HashMap<String, PathBuf>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub retention: Option<RetentionPolicyConfig>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RetentionPolicyConfig {
pub daily: u32,
pub weekly: u32,
pub monthly: u32,
pub yearly: u32,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SourceDescriptor {
/// The host to run this backup task on.
pub host: String,
#[serde(flatten)]
pub inner: SourceDescriptorInner,
}
impl SourceDescriptor {
pub fn is_directory_source(&self) -> bool {
matches!(&self.inner, &SourceDescriptorInner::DirectorySource { .. })
}
pub fn is_virtual_source(&self) -> bool {
matches!(&self.inner, &SourceDescriptorInner::VirtualSource { .. })
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum SourceDescriptorInner {
DirectorySource {
path: PathBuf,
#[serde(default)]
cross_filesystems: bool,
/// TODO Paths to ignore
#[serde(default)]
ignore: Vec<String>,
},
VirtualSource(VirtualSource),
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct VirtualSource {
/// The name of the helper program that will be used to do this backup.
pub helper: String,
/// The label that will be assigned to this source.
pub label: String,
/// The kind of virtual source (how it operates).
pub kind: VirtualSourceKind,
#[serde(flatten)]
pub extra_args: HashMap<String, toml::Value>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum VirtualSourceKind {
Stdout {
#[serde(rename = "stdout")]
filename: String,
},
// TODO(feature) TempDir
}
/// Loads a descriptor and resolves relative paths contained within.
pub async fn load_descriptor(path: &Path) -> eyre::Result<Descriptor> {
let text = tokio::fs::read_to_string(path).await?;
let mut descriptor: Descriptor = toml::de::from_str(&text)?;
let dir = path
.parent()
.context("there must be a parent path for the descriptor file")?;
// Absolutise pile paths
for (_, pile_path) in descriptor.piles.iter_mut() {
*pile_path = dir
.join(&*pile_path)
.canonicalize()
.context("Failed to canonicalise path in descriptor")?;
}
Ok(descriptor)
}
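For reference, a hypothetical `datman.toml` that these structures should
accept, round-tripped through the same `toml::de::from_str` path that
`load_descriptor` uses (all names are illustrative; this test is a sketch, not
part of this commit):

#[cfg(test)]
mod tests {
    use super::Descriptor;

    #[test]
    fn example_descriptor_parses() {
        let text = r#"
            [piles]
            main = "../main-pile"

            # Directory source: untagged, selected by the presence of `path`.
            [sources.homedir]
            host = "mylaptop"
            path = "/home/user"
            ignore = [".cache"]

            # Virtual source: backed by `datman-helper-postgres-backup`;
            # `database` is an extra key forwarded to the helper as JSON.
            [sources.maindb]
            host = "dbserver"
            helper = "postgres"
            label = "maindb"
            database = "mydb"

            [sources.maindb.kind]
            stdout = "mydb.sql"
        "#;
        let descriptor: Descriptor = toml::de::from_str(text).unwrap();
        assert!(descriptor.sources["homedir"].is_directory_source());
        assert!(descriptor.sources["maindb"].is_virtual_source());
    }
}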

datman/src/extract.rs (new file, 182 lines)

@@ -0,0 +1,182 @@
use crate::datetime::HumanDateTime;
use crate::pointer_names::split_pointer_name;
use chrono::{DateTime, Utc};
use eyre::{bail, eyre, Context, ContextCompat};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use std::sync::Arc;
use tracing::{info_span, warn, Instrument};
use yama::extract;
use yama::extract::flatten_treenode;
use yama::pile_with_cache::PileWithCache;
use yama_pile::tree::{FilesystemOwnership, FilesystemPermissions, RootTreeNode, TreeNode};
use yama_wormfile::boxed::BoxedWormFileProvider;
/// Given a list of source names and conditions to find pointers within,
/// returns a mapping of source names to pointers.
pub async fn select_to_extract(
pwc: &PileWithCache<BoxedWormFileProvider>,
sources: BTreeSet<String>,
before: Option<HumanDateTime>,
after: Option<HumanDateTime>,
accept_partial: bool,
) -> eyre::Result<BTreeMap<String, String>> {
let before = before.map(|dt| dt.0.with_timezone(&Utc));
let after = after.map(|dt| dt.0.with_timezone(&Utc));
let pointers_list = pwc
.pile
.list_pointers()
.await
.context("failed to list pointers")?;
select_to_extract_impl(pointers_list, sources, before, after, accept_partial)
}
/// Given a list of source names and conditions to find pointers within,
/// returns a mapping of source names to pointers.
fn select_to_extract_impl(
pointers_list: Vec<String>,
sources: BTreeSet<String>,
before: Option<DateTime<Utc>>,
after: Option<DateTime<Utc>>,
accept_partial: bool,
) -> eyre::Result<BTreeMap<String, String>> {
if after.is_some() && before.is_some() {
bail!("Can't specify both before and after!");
}
let mut pointers_by_source: BTreeMap<String, String> = BTreeMap::new();
for pointer in pointers_list {
if let Some((source_name, pointer_datetime)) = split_pointer_name(&pointer) {
if !sources.contains(&source_name) {
// Not a source that we're interested in.
continue;
}
if let Some(before) = before {
if before < pointer_datetime {
// datetime is after the 'before' time
continue;
}
} else if let Some(after) = after {
if pointer_datetime < after {
// datetime is before the 'after' time
continue;
}
}
match pointers_by_source.entry(source_name) {
Entry::Vacant(ve) => {
ve.insert(pointer);
}
Entry::Occupied(mut oe) => {
let current_choice = oe.get_mut();
let (_, current_datetime) = split_pointer_name(&current_choice).unwrap();
let should_replace = if after.is_some() {
// if we want the first one after a time, we want the earliest option!
// so replace if new datetime is earlier than current
pointer_datetime < current_datetime
} else {
// replace if new datetime is after current datetime
current_datetime < pointer_datetime
};
if should_replace {
*current_choice = pointer;
}
}
}
};
}
if pointers_by_source.is_empty() {
bail!("No pointers selected for ANY of the sources: {sources:?}");
}
let missing: Vec<&String> = sources
.iter()
.filter(|src| !pointers_by_source.contains_key(*src))
.collect();
if !missing.is_empty() {
if accept_partial {
warn!("Some sources didn't have any pointers selected: {missing:?}. Continuing because --accept-partial passed.");
} else {
bail!("Some sources didn't have any pointers selected: {missing:?}. Pass --accept-partial if this is intended anyway.");
}
}
Ok(pointers_by_source)
}
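// Sketch of the selection rule above (pointer names are illustrative):
// given "db+2023-05-01_00:00:00" and "db+2023-05-20_00:00:00",
//   no bound            -> the latest pointer       (2023-05-20)
//   before = 2023-05-10 -> the latest not after it  (2023-05-01)
//   after  = 2023-04-01 -> the earliest at/after it (2023-05-01)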
pub async fn load_pointers_for_extraction(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
what_to_extract: BTreeMap<String, String>,
) -> eyre::Result<BTreeMap<String, RootTreeNode>> {
let mut result = BTreeMap::new();
for (source_name, pointer_name) in &what_to_extract {
let pointer = pwc
.read_pointer_fully_integrated(&pointer_name)
.await?
.context("pointer doesn't exist??")?;
// TODO(ownership): adapt uid/gids here
result.insert(source_name.clone(), pointer.root);
}
Ok(result)
}
pub fn merge_roots_for_batch_extract(extracts: BTreeMap<String, RootTreeNode>) -> TreeNode {
let mut children = BTreeMap::new();
for (name, entry) in extracts {
if matches!(entry.node, TreeNode::NormalFile { .. }) {
let mut children2 = BTreeMap::new();
children2.insert(entry.name, entry.node);
children.insert(
name,
TreeNode::Directory {
ownership: FilesystemOwnership {
// TODO(ownership): populate this correctly (current user?)
uid: 0,
gid: 0,
},
permissions: FilesystemPermissions { mode: 0o700 },
children: children2,
},
);
} else {
children.insert(name, entry.node);
}
}
TreeNode::Directory {
ownership: FilesystemOwnership {
// TODO(ownership): populate this correctly (current user?)
uid: 0,
gid: 0,
},
permissions: FilesystemPermissions { mode: 0o700 },
children,
}
}
pub async fn extract(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
node: TreeNode,
destination: &Path,
) -> eyre::Result<()> {
let flat = flatten_treenode(&node)?;
drop(node);
extract::unpack_nonfiles(destination, &flat.nonfiles, false, true).await?;
let extract_span = info_span!("extract_files");
extract::unpack_files(&pwc, destination, &flat.files, false, true)
.instrument(extract_span)
.await?;
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
Ok(())
}


@@ -1 +1,6 @@
pub mod backup;
pub mod descriptor_config;
pub mod extract;
pub mod datetime;
pub mod pointer_names;


@@ -0,0 +1,20 @@
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
pub const POINTER_DATETIME_FORMAT: &'static str = "%F_%T";
pub const POINTER_NAME_DATETIME_SPLITTER: &'static str = "+";
pub fn get_pointer_name_at(source_name: &str, datetime: DateTime<Utc>) -> String {
format!(
"{}{}{}",
source_name,
POINTER_NAME_DATETIME_SPLITTER,
datetime.format(POINTER_DATETIME_FORMAT).to_string()
)
}
pub fn split_pointer_name(pointer_name: &str) -> Option<(String, DateTime<Utc>)> {
let (source_name, date_time_str) = pointer_name.rsplit_once(POINTER_NAME_DATETIME_SPLITTER)?;
let date_time = NaiveDateTime::parse_from_str(date_time_str, POINTER_DATETIME_FORMAT).ok()?;
let date_time = Utc.from_utc_datetime(&date_time);
Some((source_name.to_owned(), date_time))
}
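The scheme gives names like `homedir+2023-05-20_13:11:30`: a `+` splitter and
a `%F_%T` datetime, so pointer names for one source sort lexicographically in
chronological order (presumably why the PointerName validation now accepts
`:`). A hypothetical round-trip test (not part of this commit):

#[cfg(test)]
mod tests {
    use super::{get_pointer_name_at, split_pointer_name};
    use chrono::{TimeZone, Utc};

    #[test]
    fn pointer_names_round_trip() {
        let dt = Utc.with_ymd_and_hms(2023, 5, 20, 13, 11, 30).unwrap();
        let name = get_pointer_name_at("homedir", dt);
        assert_eq!(name, "homedir+2023-05-20_13:11:30");
        assert_eq!(split_pointer_name(&name), Some(("homedir".to_owned(), dt)));
    }
}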


@@ -30,7 +30,7 @@ yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
clap = { version = "4.2.2", features = ["derive"] }
tokio = { version = "1.27.0", features = ["io-std"] }
tokio = { version = "1.28.1", features = ["full"] }
appdirs = "0.2.0"
twox-hash = "1.6.3"
hostname = "0.3.1"


@@ -17,6 +17,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
use clap::{Parser, Subcommand};
use eyre::{bail, eyre, Context, ContextCompat};
use indicatif::ProgressStyle;
use patricia_tree::PatriciaMap;
use std::borrow::Cow;
use std::iter::Iterator;
@@ -24,32 +25,41 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use indicatif::ProgressStyle;
use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tracing::{info, info_span, warn, Span, Instrument};
use tracing_indicatif::IndicatifLayer;
use tracing::{info, info_span, warn, Instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tracing_indicatif::IndicatifLayer;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
use users::{get_current_gid, get_current_uid};
use yama::extract::flatten_treenode;
use yama::init::{generate_master_keyring, pack_keyring};
use yama::open::{open_keyring_interactive, open_pile, pre_open_keyring, update_cache};
use yama::pile_connector::PileConnectionScheme;
use yama::scan::create_uidgid_lookup_tables;
use yama::storing::{assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState};
use yama::{extract, get_hostname, init, PROGRESS_BAR_STYLE, scan};
use yama::storing::{
assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState,
};
use yama::vacuum::forget_chunks::{find_forgettable_chunks, forget_chunks};
use yama::vacuum::merge_indices::{MERGE_TARGET_SIZE, MERGE_THRESHOLD_SIZE};
use yama::vacuum::repack_bloblogs_and_indices::{
get_bloblogs_stats, perform_repack, select_bloblogs_for_repack,
};
use yama::{check, extract, get_hostname, init, scan, vacuum, PROGRESS_BAR_STYLE};
use yama_midlevel_crypto::byte_layer::{ByteLayer, CborSerde};
use yama_midlevel_crypto::chunk_id::ChunkIdKey;
use yama_pile::definitions::{
PackedPileConfig, PileConfig, RecursiveChunkRef, SUPPORTED_YAMA_PILE_VERSION,
IndexBloblogEntry, PackedPileConfig, PileConfig, RecursiveChunkRef, SUPPORTED_YAMA_PILE_VERSION,
};
use yama_pile::locks::LockKind;
use yama_pile::pointers::Pointer;
use yama_pile::tree::unpopulated::ScanEntry;
use yama_pile::tree::{assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership, FilesystemPermissions, RootTreeNode, TreeNode};
use yama_pile::tree::{
assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership,
FilesystemPermissions, RootTreeNode, TreeNode,
};
use yama_pile::FILE_YAMA_CONNECTOR;
#[derive(Clone, Debug)]
@@ -68,7 +78,7 @@ impl FromStr for PointerName {
fn from_str(s: &str) -> Result<Self, Self::Err> {
if !s
.chars()
.all(|c| c.is_alphanumeric() || ['_', '+', '-'].contains(&c))
.all(|c| c.is_alphanumeric() || ['_', '+', '-', ':'].contains(&c))
{
bail!("Bad pointer name: {s:?}");
}
@@ -182,9 +192,7 @@ pub enum YamaCommand {
},
/// Extract an output stream from a Yama pile.
ExtractStdout {
source: PileAndPointerWithSubTree,
},
ExtractStdout { source: PileAndPointerWithSubTree },
// TODO Mount { ... },
Check {
@@ -198,10 +206,31 @@
intensive: bool,
},
// TODO lsp, rmp
/// Perform maintenance tasks, usually freeing up space or clumping together files to reduce
/// clutter.
Vacuum {
/// Perform all maintenance and space-saving tasks.
#[arg(long, short = 'a')]
all: bool,
// TODO vacuum
/// Merge indices together. Implied by -a.
#[arg(long, short = 'm')]
merge: bool,
// TODO `locks` to inspect locks
/// Forget chunks from indices. Implied by -a.
/// This process is slow because it involves walking all pointers to see which chunks can be
/// forgotten.
#[arg(long, short = 'f')]
forget: bool,
/// Repack bloblogs and corresponding indices. Implied by -a.
#[arg(long, short = 'r')]
repack: bool,
/// Delete unreferenced bloblogs. Implied by -a.
#[arg(long, short = 'd')]
delete_unrefd_bloblogs: bool,
}, // TODO `locks` to inspect locks
}
#[derive(Subcommand, Clone, Debug)]
@@ -236,16 +265,21 @@ pub enum KeyringCommand {
}, // TODO ChangePassword
}
const PROGRESS_SPANS: &'static [&'static str] = &["store_file", "storing", "unpack_files", "expand_chunkrefs", "extract_files"];
const PROGRESS_SPANS: &'static [&'static str] = &[
"store_file",
"storing",
"unpack_files",
"expand_chunkrefs",
"extract_files",
];
#[tokio::main]
async fn main() -> eyre::Result<()> {
let indicatif_layer = IndicatifLayer::new();
let stderr_writer = indicatif_layer.get_stderr_writer();
let indicatif_layer = indicatif_layer
.with_filter(filter_fn(|span_metadata| {
span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name())
}));
let indicatif_layer = indicatif_layer.with_filter(filter_fn(|span_metadata| {
span_metadata.target().starts_with("yama") && PROGRESS_SPANS.contains(&span_metadata.name())
}));
tracing_subscriber::registry()
.with(
@@ -307,9 +341,11 @@ async fn main() -> eyre::Result<()> {
None
} else {
let zstd_dict_path = zstd_dict.unwrap();
Some(Arc::new(tokio::fs::read(&zstd_dict_path)
.await
.with_context(|| format!("failed to read Zstd dict at {zstd_dict_path:?}"))?))
Some(Arc::new(
tokio::fs::read(&zstd_dict_path).await.with_context(|| {
format!("failed to read Zstd dict at {zstd_dict_path:?}")
})?,
))
};
let pile_config = PileConfig {
@@ -408,15 +444,23 @@ async fn main() -> eyre::Result<()> {
let store_span = info_span!("storing");
// store_span.pb_set_style(&ProgressStyle::default_bar());
store_span.pb_set_style(&ProgressStyle::default_bar().template(
PROGRESS_BAR_STYLE,
).unwrap());
store_span.pb_set_style(
&ProgressStyle::default_bar()
.template(PROGRESS_BAR_STYLE)
.unwrap(),
);
store_span.pb_set_message("storing files");
store_span.pb_set_length(pruned_scan_entry_map.values()
.filter(|v| matches!(v, ScanEntry::NormalFile { .. })).count() as u64);
store_span.pb_set_length(
pruned_scan_entry_map
.values()
.filter(|v| matches!(v, ScanEntry::NormalFile { .. }))
.count() as u64,
);
let store_span_entered = store_span.enter();
let (pipeline, pipeline_job_tx) = StoragePipeline::launch_new(4, pwc.clone()).await?;
let new_unflushed_chunks = Arc::new(Default::default());
let (pipeline, pipeline_job_tx) =
StoragePipeline::launch_new(4, pwc.clone(), new_unflushed_chunks).await?;
let source2 = source.clone();
let (submitter_task, receiver_task) = tokio::join!(
@@ -426,8 +470,13 @@ async fn main() -> eyre::Result<()> {
if let ScanEntry::NormalFile { .. } = scan_entry {
let name = std::str::from_utf8(name_bytes.as_slice())
.context("name is not str")?;
let path = if name != "" {
source2.join(name)
} else {
source2.clone()
};
pipeline_job_tx
.send_async((name.to_owned(), source2.join(name)))
.send_async((name.to_owned(), path))
.await
.map_err(|_| eyre!("unable to send to pipeline."))?;
}
@@ -497,8 +546,11 @@ async fn main() -> eyre::Result<()> {
.await
.context("failed to write pointer")?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
},
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
YamaCommand::StoreStdin {
destination,
overwrite,
@@ -514,7 +566,7 @@ async fn main() -> eyre::Result<()> {
LockKind::Shared,
format!("{} store {:?}", get_hostname(), destination.pointer),
)
.await?;
.await?;
update_cache(&pwc).await?;
let pwc = Arc::new(pwc);
@@ -522,30 +574,54 @@ async fn main() -> eyre::Result<()> {
let store_span = info_span!("storing");
// store_span.pb_set_style(&ProgressStyle::default_bar());
// TODO INDETERMINATE PROGRESS BAR with bytes shown?
store_span.pb_set_style(&ProgressStyle::default_bar().template(
PROGRESS_BAR_STYLE,
).unwrap());
store_span.pb_set_style(
&ProgressStyle::default_bar()
.template(PROGRESS_BAR_STYLE)
.unwrap(),
);
store_span.pb_set_message("storing files");
store_span.pb_set_length(1u64);
// TODO Dirty
let store_span_entered = store_span.enter();
let _store_span_entered = store_span.enter();
let mut storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?;
let new_unflushed_chunks = Arc::new(Default::default());
let mut storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks)
.await
.context("failed to create storing state")?;
let mut sbw = StoringBloblogWriters::default();
let stdin = std::io::BufReader::new(io_streams::StreamReader::stdin().context("failed to open stdin")?);
let (chunkref, size) = storing_state.store_full_stream(stdin, &mut sbw).context("Failed to store stream into Yama pile")?;
let stdin = std::io::BufReader::new(
io_streams::StreamReader::stdin().context("failed to open stdin")?,
);
let (chunkref, size) = storing_state
.store_full_stream(stdin, &mut sbw)
.context("Failed to store stream into Yama pile")?;
sbw.finish_bloblogs(&mut storing_state).await.context("Failed to finish bloblogs")?;
sbw.finish_bloblogs(&mut storing_state)
.await
.context("Failed to finish bloblogs")?;
info!("Stream stored, writing indices...");
// Write indices for the new bloblogs we have created. This is a prerequisite for creating a pointer.
let chunkmaps = storing_state.new_bloblogs;
assemble_and_write_indices(&pwc, chunkmaps)
.await
.context("failed to assemble and write indices")?;
assemble_and_write_indices(
&pwc,
chunkmaps
.into_iter()
.map(|(k, nb)| {
(
k,
IndexBloblogEntry {
chunks: nb,
forgotten_bytes: 0,
},
)
})
.collect(),
)
.await
.context("failed to assemble and write indices")?;
info!("All indices stored, writing pointer...");
@@ -553,7 +629,10 @@ async fn main() -> eyre::Result<()> {
let uid = get_current_uid() as u16;
let gid = get_current_gid() as u16;
let tree = TreeNode::NormalFile {
mtime: SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_millis() as u64).unwrap_or(0),
mtime: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
ownership: FilesystemOwnership { uid, gid },
permissions: FilesystemPermissions { mode: 0o600 },
size,
@@ -579,8 +658,11 @@ async fn main() -> eyre::Result<()> {
.await
.context("failed to write pointer")?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
},
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
YamaCommand::Extract {
source,
destination,
@@ -593,13 +675,15 @@ async fn main() -> eyre::Result<()> {
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
let pwc = Arc::new(open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} store {:?}", get_hostname(), source.pointer),
)
.await?);
let pwc = Arc::new(
open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} store {:?}", get_hostname(), source.pointer),
)
.await?,
);
update_cache(&pwc).await?;
let pointer = pwc
@@ -639,11 +723,12 @@ async fn main() -> eyre::Result<()> {
.instrument(extract_span)
.await?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
},
YamaCommand::ExtractStdout {
source,
} => {
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
YamaCommand::ExtractStdout { source } => {
let pile_connector_path = source
.pile_path
.as_ref()
@@ -652,13 +737,15 @@ async fn main() -> eyre::Result<()> {
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
let pwc = Arc::new(open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} store {:?}", get_hostname(), source.pointer),
)
.await?);
let pwc = Arc::new(
open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} store {:?}", get_hostname(), source.pointer),
)
.await?,
);
update_cache(&pwc).await?;
let pointer = pwc
@@ -690,9 +777,7 @@ async fn main() -> eyre::Result<()> {
};
let chunkref = match node {
TreeNode::NormalFile { content, .. } => {
content
}
TreeNode::NormalFile { content, .. } => content,
TreeNode::Directory { .. } => {
bail!("Can't extract `Directory` to stdout!");
}
@@ -705,13 +790,126 @@ async fn main() -> eyre::Result<()> {
};
let extract_span = info_span!("extract_files");
let stream = std::io::BufWriter::new(io_streams::StreamWriter::stdout().context("failed to open stdout")?);
let stream = std::io::BufWriter::new(
io_streams::StreamWriter::stdout().context("failed to open stdout")?,
);
extract::unpack_sync_stream(&pwc, *chunkref, stream)
.instrument(extract_span)
.await?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
},
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
YamaCommand::Check {
pointers,
shallow,
intensive,
} => {
if !pointers && !shallow && !intensive {
bail!("Check level not chosen. Try -2");
}
if pointers {
bail!("pointers check not implemented yet. Try -2");
}
if intensive {
bail!("intensive check not implemented yet. Try -2");
}
let pile_connector_path = Path::new(".");
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
let pwc = Arc::new(
open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} check", get_hostname()),
)
.await?,
);
update_cache(&pwc).await?;
if shallow {
check::check_pointers_point_to_indexed_chunks(&pwc)
.await
.context("shallow check failed")?;
}
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
YamaCommand::Vacuum {
all,
merge,
forget,
repack,
delete_unrefd_bloblogs,
} => {
let pile_connector_path = Path::new(".");
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
// TODO figure out how we use the pendingexclusive thing again...
let pwc = Arc::new(
open_pile(
&pile_connector_path,
keyring,
LockKind::Exclusive,
format!("{} vacuum", get_hostname()),
)
.await?,
);
update_cache(&pwc).await?;
if all || merge {
let to_merge = vacuum::merge_indices::select_indices_for_merge(
&pwc,
MERGE_TARGET_SIZE,
MERGE_THRESHOLD_SIZE,
)
.await
.context("failed to select indices for merge")?;
vacuum::merge_indices::merge_indices(&pwc, to_merge)
.await
.context("failed to merge indices")?;
update_cache(&pwc).await?;
}
if all || forget {
// TODO: allow running on smaller sets of indices than all of them
let all_indices = {
let mut cache_conn = pwc.localcache.read().await?;
cache_conn.list_indices().await?
};
let forgettable_chunks = find_forgettable_chunks(&pwc, all_indices.clone()).await?;
info!("{} chunks can be forgotten", forgettable_chunks.len());
forget_chunks(&pwc, all_indices, forgettable_chunks).await?;
update_cache(&pwc).await?;
}
if all || repack {
let bloblog_stats = get_bloblogs_stats(&pwc).await?;
let to_repack = select_bloblogs_for_repack(bloblog_stats).await?;
info!("{} repack groups to be processed.", to_repack.len());
perform_repack(pwc.clone(), to_repack).await?;
update_cache(&pwc).await?;
}
if all || delete_unrefd_bloblogs {
todo!();
}
Arc::try_unwrap(pwc)
.map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?
.close()
.await?;
}
_other => todo!(),
}


@@ -99,16 +99,15 @@ async fn main() -> eyre::Result<()> {
&root_display_node,
false,
)
},
YamaScanCommand::Ignore {
path, unanchored
} => {
}
YamaScanCommand::Ignore { path, unanchored } => {
let mut oo = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(".yamaignore").await
.open(".yamaignore")
.await
.context("failed to open .yamaignore for r/w")?;
let pos = oo.seek(SeekFrom::End(0)).await?;
if pos > 1 {
@@ -127,7 +126,7 @@ async fn main() -> eyre::Result<()> {
oo.flush().await?;
drop(oo);
},
}
_other => todo!(),
}

yama/src/check.rs (new file, 64 lines)

@@ -0,0 +1,64 @@
use crate::extract::expand_chunkrefs;
use crate::pile_with_cache::PileWithCache;
use eyre::{bail, ContextCompat};
use std::collections::BTreeSet;
use std::sync::Arc;
use tracing::info;
use yama_midlevel_crypto::chunk_id::ChunkId;
use yama_pile::tree::TreeNode;
use yama_wormfile::boxed::BoxedWormFileProvider;
/// Check that all pointers point to chunks that exist **in our local cache**.
pub async fn check_pointers_point_to_indexed_chunks(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
) -> eyre::Result<()> {
let pointer_names = pwc.pile.list_pointers().await?;
let mut rcrs_to_check = BTreeSet::new();
for pointer_name in &pointer_names {
let pointer = pwc
.pile
.read_pointer(pointer_name)
.await?
.context("pointer vanished")?;
if let Some(parent_name) = pointer.parent {
if !pointer_names.contains(&parent_name) {
bail!("{parent_name:?}, the parent of {pointer_name:?}, does not exist");
}
}
pointer
.root
.node
.visit(
&mut |node, _| {
if let TreeNode::NormalFile { content, .. } = node {
rcrs_to_check.insert(*content);
}
Ok(())
},
String::new(),
)
.unwrap();
}
let chunk_ids: BTreeSet<ChunkId> =
expand_chunkrefs(pwc, rcrs_to_check.into_iter().map(|x| ((), x)))
.await?
.into_iter()
.map(|(_, x)| x)
.flatten()
.collect();
info!("{} chunks to check for existence", chunk_ids.len());
let mut cache = pwc.localcache.read().await?;
let resolved_chunks = cache.locate_chunks(&chunk_ids).await?;
if chunk_ids.len() != resolved_chunks.len() {
bail!("Not all chunk IDs could be resolved. TODO: this check error is currently not granular enough.");
}
info!("All {} chunks accounted for!", resolved_chunks.len());
Ok(())
}


@@ -1,17 +1,18 @@
use crate::pile_with_cache::PileWithCache;
use crate::retriever::decompressor::PipelineDecompressor;
use crate::retriever::{create_fixed_retriever, FileId, JobChunkReq, JobId, RetrieverResp};
use eyre::{bail, ensure, Context, ContextCompat, eyre};
use crate::PROGRESS_BAR_STYLE;
use eyre::{bail, ensure, eyre, Context, ContextCompat};
use flume::Receiver;
use indicatif::ProgressStyle;
use patricia_tree::PatriciaMap;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fs::Permissions;
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use indicatif::ProgressStyle;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -22,7 +23,6 @@ use yama_pile::definitions::{BloblogId, RecursiveChunkRef};
use yama_pile::tree::unpopulated::ScanEntry;
use yama_pile::tree::{FilesystemPermissions, TreeNode};
use yama_wormfile::boxed::BoxedWormFileProvider;
use crate::PROGRESS_BAR_STYLE;
#[derive(Clone, Debug, Default)]
pub struct FlattenedTree {
@@ -93,7 +93,7 @@ pub fn flatten_treenode(root_node: &TreeNode) -> eyre::Result<FlattenedTree> {
/// Create directories and symbolic links.
pub async fn unpack_nonfiles(
root: &PathBuf,
root: &Path,
nonfiles: &PatriciaMap<ScanEntry>,
restore_ownership: bool,
restore_permissions: bool,
@@ -133,7 +133,7 @@ pub async fn unpack_nonfiles(
// TODO(perf): move out file writes into separate tasks...
pub async fn unpack_files(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
root: &PathBuf,
root: &Path,
files: &PatriciaMap<(ScanEntry, RecursiveChunkRef)>,
restore_ownership: bool,
restore_permissions: bool,
@@ -149,7 +149,10 @@ pub async fn unpack_files(
)
.await?;
let total_chunks = expanded_chunkrefs.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>();
let total_chunks = expanded_chunkrefs
.iter()
.map(|(_, cs)| cs.len() as u64)
.sum::<u64>();
let unpack_span = info_span!("unpack_files");
async move {
@@ -236,24 +239,26 @@ pub async fn unpack_files(
}.instrument(unpack_span).await
}
pub async fn unpack_sync_stream(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
chunkref: RecursiveChunkRef,
mut stream: impl Write,
pub async fn unpack_sync_stream(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
chunkref: RecursiveChunkRef,
mut stream: impl Write,
) -> eyre::Result<()> {
let expanded_chunkrefs = expand_chunkrefs(
pwc,
vec![((), chunkref)].into_iter(),
)
.await?;
let expanded_chunkrefs = expand_chunkrefs(pwc, vec![((), chunkref)].into_iter()).await?;
let total_chunks = expanded_chunkrefs.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>();
let total_chunks = expanded_chunkrefs
.iter()
.map(|(_, cs)| cs.len() as u64)
.sum::<u64>();
let unpack_span = info_span!("unpack_files");
async move {
let unpack_span = Span::current();
unpack_span.pb_set_style(&ProgressStyle::default_bar().template(
PROGRESS_BAR_STYLE,
).unwrap());
unpack_span.pb_set_style(
&ProgressStyle::default_bar()
.template(PROGRESS_BAR_STYLE)
.unwrap(),
);
unpack_span.pb_set_message("unpack");
unpack_span.pb_set_length(total_chunks);
@@ -264,16 +269,14 @@ pub async fn unpack_sync_stream(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
while let Ok(next_part) = file_part_retriever.recv_async().await {
match next_part {
RetrieverResp::Blob { blob, .. } => {
tokio::task::block_in_place(|| {
stream.write_all(&blob)
}).context("Failed to write to output stream on Blob")?;
tokio::task::block_in_place(|| stream.write_all(&blob))
.context("Failed to write to output stream on Blob")?;
unpack_span.pb_inc(1);
}
RetrieverResp::JobComplete(_) => {
tokio::task::block_in_place(|| {
stream.flush()
}).context("Failed to flush output stream on JobComplete")?;
tokio::task::block_in_place(|| stream.flush())
.context("Failed to flush output stream on JobComplete")?;
done = true;
}
}
@@ -284,10 +287,17 @@ pub async fn unpack_sync_stream(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
}
Ok(())
}.instrument(unpack_span).await
}
.instrument(unpack_span)
.await
}
async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions, restore_permissions: bool, rx: Receiver<Option<Vec<u8>>>) -> eyre::Result<()> {
async fn file_unpacker_writer(
path: PathBuf,
permissions: FilesystemPermissions,
restore_permissions: bool,
rx: Receiver<Option<Vec<u8>>>,
) -> eyre::Result<()> {
let mut oo = OpenOptions::new();
oo.write(true).create_new(true);
if restore_permissions {
@@ -301,15 +311,12 @@ async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions,
loop {
match rx.recv_async().await {
Ok(Some(next_block)) => {
file.write_all(&next_block)
.await?;
},
file.write_all(&next_block).await?;
}
Ok(None) => {
file.flush()
.await
.context("failed to flush")?;
file.flush().await.context("failed to flush")?;
return Ok(());
},
}
Err(_) => {
bail!("rx for file unpacking into {path:?} disconnected unexpectedly");
}
@@ -317,7 +324,7 @@ async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions,
}
}
async fn expand_chunkrefs<T>(
pub(crate) async fn expand_chunkrefs<T>(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
chunkrefs: impl Iterator<Item = (T, RecursiveChunkRef)>,
) -> eyre::Result<Vec<(T, Vec<ChunkId>)>> {
@@ -337,13 +344,21 @@ async fn expand_chunkrefs<T>(
}
let ec_span = info_span!("expand_chunkrefs");
ec_span.pb_set_style(&ProgressStyle::default_bar().template(
PROGRESS_BAR_STYLE,
).unwrap());
ec_span.pb_set_length(ts_and_chunks.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>());
ec_span.pb_set_style(
&ProgressStyle::default_bar()
.template(PROGRESS_BAR_STYLE)
.unwrap(),
);
ec_span.pb_set_length(
ts_and_chunks
.iter()
.map(|(_, cs)| cs.len() as u64)
.sum::<u64>(),
);
ec_span.pb_set_message(&format!("resolve (d={next_depth})"));
let expanded_ts_and_chunks = expand_chunkrefs_one_layer(pwc, ts_and_chunks)
.instrument(ec_span).await?;
.instrument(ec_span)
.await?;
by_depth
.entry(Reverse(next_depth - 1))
.or_default()
@@ -413,7 +428,7 @@ async fn lookup_chunkrefs_and_create_retriever<T>(
Ok((retriever, out_by_job))
}
async fn expand_chunkrefs_one_layer<T>(
pub(crate) async fn expand_chunkrefs_one_layer<T>(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
input: Vec<(T, Vec<ChunkId>)>,
) -> eyre::Result<Vec<(T, Vec<ChunkId>)>> {


@@ -1,6 +1,7 @@
pub mod init;
pub mod open;
pub mod check;
pub mod extract;
pub mod scan;
pub mod storing;
@@ -11,7 +12,8 @@ pub mod pile_with_cache;
pub mod retriever;
pub const PROGRESS_BAR_STYLE: &'static str = "[{elapsed_precise}]/[{eta}] {wide_bar:.cyan/blue} {pos:>7}/{len:7} {msg}";
pub const PROGRESS_BAR_STYLE: &'static str =
"[{elapsed_precise}]/[{eta}] {wide_bar:.cyan/blue} {pos:>7}/{len:7} {msg}";
pub fn get_hostname() -> String {
hostname::get()

View File

@ -4,7 +4,7 @@ use eyre::{bail, Context, ContextCompat};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::hash::{Hash, Hasher};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tracing::debug;
@ -98,7 +98,10 @@ pub async fn open_pile(
connection_scheme.hash(&mut hasher);
let u64_hash = hasher.finish();
let base_name = connector_in_dir
let canon_connector_in_dir = connector_in_dir
.canonicalize()
.unwrap_or(connector_in_dir.to_owned());
let base_name = canon_connector_in_dir
.file_name()
.map(|f| f.to_string_lossy())
.unwrap_or(Cow::Borrowed("_"));
@ -165,3 +168,16 @@ pub async fn update_cache(pwc: &PileWithCache<BoxedWormFileProvider>) -> eyre::R
Ok(())
}
pub async fn open_lock_and_update_cache(
pile_connector_path: PathBuf,
lock_name: String,
) -> eyre::Result<Arc<PileWithCache<BoxedWormFileProvider>>> {
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
let pwc = open_pile(&pile_connector_path, keyring, LockKind::Shared, lock_name).await?;
update_cache(&pwc).await?;
Ok(Arc::new(pwc))
}
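
A hypothetical call site for this new helper (the pile path and lock-holder name are placeholders; assumes a tokio runtime and the surrounding crate context):

use std::path::PathBuf;

async fn example() -> eyre::Result<()> {
    // Opens the keyring interactively, takes a shared lock and refreshes the
    // local cache in one go.
    let pwc = open_lock_and_update_cache(
        PathBuf::from("/backups/pile"),
        "datman on myhost".to_string(),
    )
    .await?;
    // ... read pointers / store / extract via `pwc` ...
    Ok(())
}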

View File

@ -48,6 +48,7 @@ struct FileRegionMarker {
pub subjob: u32,
}
#[derive(Debug)]
struct OpenFileState {
pub req_tx: Sender<OpenFileReq>,
pub offset: u64,
@ -61,16 +62,13 @@ struct OpenFileReq {
pub subjob: u32,
}
#[derive(Debug)]
struct ActiveJobState {
pub subjobs: Vec<JobChunkReq>,
pub next_subjob: u32,
pub inflight: u32,
}
pub struct Retriever {
job_tx: Sender<(JobId, Vec<JobChunkReq>)>,
}
struct RetrieverInternals {
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
jobs_queue: BTreeMap<JobId, Vec<JobChunkReq>>,
@ -141,6 +139,7 @@ impl RetrieverInternals {
offset: u64,
length: u64,
) -> eyre::Result<()> {
// debug!("sched {job:?}->{subjob:?}");
open_file
.req_tx
.send_async(OpenFileReq {
@ -205,7 +204,8 @@ impl RetrieverInternals {
})
.await
.expect("completions shut");
// eprintln!("completion of{next_job:?}");
// debug!("read,acking! {:?}", next_job);
ack_tx.send_async(next_job.job).await?;
}
@ -213,7 +213,12 @@ impl RetrieverInternals {
}
async fn retrieval_task(&mut self) -> eyre::Result<()> {
// let mut icount = 0u64;
loop {
// icount += 1;
// debug!("[{icount}] active jobs {:#?}", self.active_jobs);
// debug!("[{icount}] open files {:#?}", self.open_files);
// 0. Try to progress open jobs if they are staring right at the bytes they need...
let mut to_remove = Vec::new();
for (active_job_id, active_job) in &mut self.active_jobs {
@ -226,9 +231,17 @@ impl RetrieverInternals {
to_remove.push(*active_job_id);
continue;
}
// Which file we are 'staring at' and requesting a run of chunks from
let mut stare_file = None;
'single_job_staring: loop {
let desired_blob = &active_job.subjobs[active_job.next_subjob as usize];
if stare_file.is_some() && stare_file != Some(desired_blob.file) {
// We have changed which file we are looking at, we can't request any further
// because they might get retrieved out of order.
break 'single_job_staring;
}
if let Some(open_file) = self.open_files.get_mut(&desired_blob.file) {
stare_file = Some(desired_blob.file);
if open_file.offset == desired_blob.offset {
Self::file_request(
open_file,
@ -255,12 +268,15 @@ impl RetrieverInternals {
// this job is to be finished!
break 'single_job_staring;
}
} else {
break 'single_job_staring;
}
} else {
break 'single_job_staring;
}
}
}
for remove in to_remove {
self.active_jobs.remove(&remove);
// eprintln!("job complete {remove:?}");
@ -354,6 +370,7 @@ impl RetrieverInternals {
files_to_open.insert(desired_blob.file);
}
}
if !files_to_open.is_empty() {
for file in files_to_open {
self.open_file(file).await?;

View File

@ -175,10 +175,10 @@ impl PipelineDecompressor {
);
}
let state = self
.processing
.get_mut(&job)
.context("bad job/not starting at 0 for job")?;
// debug!("blob {job:?} {subjob:?}");
let state = self.processing.get_mut(&job).with_context(|| {
format!("bad job/not starting at 0 for job {job:?} (subjob={subjob:?})")
})?;
ensure!(
state.next_enqueue_subjob == subjob,
"out of order Blob commands"
@ -196,6 +196,8 @@ impl PipelineDecompressor {
.context("bad job to complete")?;
state.complete = true;
// debug!("complete {job:?}");
let can_remove = state.next_submit_subjob == state.next_enqueue_subjob;
if can_remove {

View File

@ -1,4 +1,4 @@
use eyre::{bail, eyre, Context};
use eyre::{bail, eyre, Context, ContextCompat};
use ignore::WalkBuilder;
use patricia_tree::PatriciaMap;
use std::collections::{BTreeMap, BTreeSet};
@ -95,6 +95,19 @@ pub fn relative_path(base: &Path, leaf: &Path) -> Option<String> {
/// Aborts if any errors (permission, bad .yamaignore files, etc) are encountered.
/// In the future, we possibly want to consider allowing
pub fn scan(root: &Path, ignores: &Vec<String>) -> eyre::Result<PatriciaMap<ScanEntry>> {
let mut entries: PatriciaMap<ScanEntry> = PatriciaMap::new();
if !root.is_dir() {
let metadata = std::fs::symlink_metadata(root).context("reading metadata of root")?;
entries.insert(
"",
scan_one_no_recurse(root, metadata)
.context("failed to generate scan entry for root")?
.context("root probably doesn't exist, or is ignored?")?,
);
return Ok(entries);
}
let mut walker = WalkBuilder::new(root);
walker
.standard_filters(false)
@ -108,8 +121,6 @@ pub fn scan(root: &Path, ignores: &Vec<String>) -> eyre::Result<PatriciaMap<Scan
}
let walker = walker.build();
let mut entries: PatriciaMap<ScanEntry> = PatriciaMap::new();
for entry in walker {
let entry = entry?;

View File

@ -5,6 +5,7 @@ use fastcdc::v2020::{FastCDC, StreamCDC};
use flume::{Receiver, RecvError, SendError, Sender};
use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::pin::Pin;
@ -47,7 +48,10 @@ pub struct StoringState {
}
impl StoringState {
pub async fn new(pwc: Arc<PileWithCache<BoxedWormFileProvider>>) -> eyre::Result<Self> {
pub async fn new(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
new_unflushed_chunks: Arc<DashSet<ChunkId>>,
) -> eyre::Result<Self> {
let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() {
None => {
Compressor::new(get_zstd_level()).context("can't create dictless compressor")?
@ -59,7 +63,7 @@ impl StoringState {
let chunk_id_key = pwc.pile.pile_config.chunk_id_key;
Ok(StoringState {
cache_conn: pwc.localcache.read().await?,
new_unflushed_chunks: Arc::new(Default::default()),
new_unflushed_chunks,
new_bloblogs: vec![],
pwc,
chunk_id_key,
@ -130,6 +134,35 @@ impl StoringState {
Ok(slot.as_mut().unwrap())
}
/// For internal use only.
fn process_chunk(
&mut self,
chunk_bytes: &[u8],
result: &mut Vec<ChunkId>,
slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
) -> eyre::Result<()> {
let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
result.push(chunk_id);
let is_new = Handle::current().block_on(async {
Ok::<bool, eyre::Report>(
self.cache_conn.is_chunk_new(chunk_id).await?
&& self.new_unflushed_chunks.insert(chunk_id),
)
})?;
if is_new {
let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
Handle::current().block_on(async {
let writer = self.obtain_bloblog_writer(slot).await?;
writer.write_chunk(chunk_id, &compressed_bytes).await?;
Ok::<(), eyre::Report>(())
})?;
}
Ok(())
}
fn store_full_slice_returning_chunks(
&mut self,
store_slice: &[u8],
@ -137,26 +170,14 @@ impl StoringState {
) -> eyre::Result<Vec<ChunkId>> {
task::block_in_place(|| {
let mut result = Vec::new();
for chunk in FastCDC::new(store_slice, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
let chunk_bytes = &store_slice[chunk.offset..chunk.offset + chunk.length];
let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
result.push(chunk_id);
let is_new = Handle::current().block_on(async {
Ok::<bool, eyre::Report>(
self.cache_conn.is_chunk_new(chunk_id).await?
&& self.new_unflushed_chunks.insert(chunk_id),
)
})?;
self.process_chunk(chunk_bytes, &mut result, slot)?
}
if is_new {
let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
Handle::current().block_on(async {
let writer = self.obtain_bloblog_writer(slot).await?;
writer.write_chunk(chunk_id, &compressed_bytes).await?;
Ok::<(), eyre::Report>(())
})?;
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
Ok(result)
@ -175,24 +196,11 @@ impl StoringState {
let chunk = chunk.context("failed to read in for StreamCDC")?;
let chunk_bytes = chunk.data.as_slice();
stream_length += chunk_bytes.len() as u64;
let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
result.push(chunk_id);
let is_new = Handle::current().block_on(async {
Ok::<bool, eyre::Report>(
self.cache_conn.is_chunk_new(chunk_id).await?
&& self.new_unflushed_chunks.insert(chunk_id),
)
})?;
self.process_chunk(chunk_bytes, &mut result, slot)?;
}
if is_new {
let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
Handle::current().block_on(async {
let writer = self.obtain_bloblog_writer(slot).await?;
writer.write_chunk(chunk_id, &compressed_bytes).await?;
Ok::<(), eyre::Report>(())
})?;
}
if result.is_empty() {
self.process_chunk(&[], &mut result, slot)?;
}
Ok((result, stream_length))
@ -257,10 +265,13 @@ impl StoringState {
depth += 1;
}
Ok((RecursiveChunkRef {
chunk_id: chunk_ids[0],
depth,
}, stream_length))
Ok((
RecursiveChunkRef {
chunk_id: chunk_ids[0],
depth,
},
stream_length,
))
}
}
@ -276,14 +287,14 @@ async fn store_file(
Ok((chunkref, size_of_file as u64))
}
pub struct StoragePipeline {
result_rx: Receiver<(String, RecursiveChunkRef, u64)>,
pub struct StoragePipeline<JobName> {
result_rx: Receiver<(JobName, RecursiveChunkRef, u64)>,
join_set: JoinSet<eyre::Result<StoringIntermediate>>,
}
async fn storage_pipeline_worker(
job_rx: Receiver<(String, PathBuf)>,
result_tx: Sender<(String, RecursiveChunkRef, u64)>,
async fn storage_pipeline_worker<JobName: Debug>(
job_rx: Receiver<(JobName, PathBuf)>,
result_tx: Sender<(JobName, RecursiveChunkRef, u64)>,
mut storing_state: StoringState,
) -> eyre::Result<StoringIntermediate> {
let mut bloblog_writers = StoringBloblogWriters::default();
@ -292,22 +303,24 @@ async fn storage_pipeline_worker(
while let Ok((job_id, file_path)) = job_rx.recv_async().await {
let span = info_span!("store_file", file=?file_path);
let span_enter = span.enter();
// debug!("SPW job {job_id:?}");
let (rec_chunk_ref, file_length) =
store_file(&file_path, &mut storing_state, &mut bloblog_writers)
.await
.with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx
.send_async((job_id, rec_chunk_ref, file_length))
.await
{
bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
}
drop(span_enter);
drop(span);
async {
// debug!("SPW job {job_id:?}");
let (rec_chunk_ref, file_length) =
store_file(&file_path, &mut storing_state, &mut bloblog_writers)
.await
.with_context(|| format!("failed to store {file_path:?}"))?;
// debug!("SPW good {job_id:?}");
if let Err(SendError(to_be_sent)) = result_tx
.send_async((job_id, rec_chunk_ref, file_length))
.await
{
bail!("Can't return result for {to_be_sent:?} — result_tx shut down.");
}
Ok(())
}
.instrument(span)
.await?
}
debug!("SPW shutdown");
@ -322,11 +335,12 @@ fn get_zstd_level() -> i32 {
return 12;
}
impl StoragePipeline {
impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
pub async fn launch_new(
workers: u32,
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
) -> eyre::Result<(StoragePipeline, Sender<(String, PathBuf)>)> {
new_unflushed_chunks: Arc<DashSet<ChunkId>>,
) -> eyre::Result<(StoragePipeline<JobName>, Sender<(JobName, PathBuf)>)> {
let (job_tx, job_rx) = flume::bounded(16);
let (result_tx, result_rx) = flume::bounded(4);
@ -334,7 +348,9 @@ impl StoragePipeline {
for spw_num in 0..workers {
let job_rx = job_rx.clone();
let result_tx = result_tx.clone();
let storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?;
let storing_state = StoringState::new(pwc.clone(), new_unflushed_chunks.clone())
.await
.context("failed to create storing state")?;
// make a logging span for the Storage Pipeline Workers
let spw_span = info_span!("spw", n = spw_num);
join_set.spawn(
@ -359,48 +375,48 @@ impl StoragePipeline {
}
#[inline]
pub async fn next_result(&self) -> Result<(String, RecursiveChunkRef, u64), RecvError> {
pub async fn next_result(&self) -> Result<(JobName, RecursiveChunkRef, u64), RecvError> {
self.result_rx.recv_async().await
}
/// Must be sure that all results have been collected first.
pub async fn finish_into_chunkmaps(
mut self,
) -> eyre::Result<Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>> {
) -> eyre::Result<BTreeMap<BloblogId, IndexBloblogEntry>> {
if let Ok(msg) = self.result_rx.recv_async().await {
bail!("Haven't processed all results yet! {msg:?}");
}
let mut chunkmap = Vec::new();
let mut chunkmap = BTreeMap::new();
while let Some(join_resres) = self.join_set.join_next().await {
chunkmap.extend(join_resres??.new_bloblogs);
chunkmap.extend(join_resres??.new_bloblogs.into_iter().map(|(k, nb)| {
(
k,
IndexBloblogEntry {
chunks: nb,
forgotten_bytes: 0,
},
)
}));
}
Ok(chunkmap)
}
}
fn assemble_indices(chunkmap: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>) -> Vec<Index> {
fn assemble_indices(chunkmap: BTreeMap<BloblogId, IndexBloblogEntry>) -> Vec<Index> {
let mut sorted_map = BTreeMap::new();
for (idx, chunkmap) in chunkmap.into_iter().enumerate() {
let size_of_chunkmap = chunkmap.1.len() + 1;
let size_of_chunkmap = chunkmap.1.chunks.len() + 1;
sorted_map.insert(Reverse((size_of_chunkmap, idx)), chunkmap);
}
let mut indices = Vec::new();
while let Some(k) = sorted_map.keys().cloned().next() {
let (Reverse((size, _)), (bloblog_id, bloblog_chunks)) =
sorted_map.remove_entry(&k).unwrap();
while let Some((Reverse((size, _)), (bloblog_id, bloblog_chunks))) = sorted_map.pop_first() {
let mut new_index_contents = BTreeMap::new();
new_index_contents.insert(
bloblog_id,
IndexBloblogEntry {
chunks: bloblog_chunks,
forgotten_bytes: 0,
},
);
new_index_contents.insert(bloblog_id, bloblog_chunks);
let mut new_index_size_so_far = size;
while new_index_size_so_far < DESIRED_INDEX_SIZE_ENTRIES && !sorted_map.is_empty() {
@ -417,13 +433,9 @@ fn assemble_indices(chunkmap: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>)
let (Reverse((add_size, _)), (bloblog_id, bloblog_chunks)) =
sorted_map.remove_entry(&k).unwrap();
new_index_size_so_far += add_size;
new_index_contents.insert(
bloblog_id,
IndexBloblogEntry {
chunks: bloblog_chunks,
forgotten_bytes: 0,
},
);
new_index_contents.insert(bloblog_id, bloblog_chunks);
} else {
break;
}
}
@ -458,7 +470,7 @@ async fn write_indices(
pub async fn assemble_and_write_indices(
pwc: &PileWithCache<BoxedWormFileProvider>,
chunkmap: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>,
chunkmap: BTreeMap<BloblogId, IndexBloblogEntry>,
) -> eyre::Result<()> {
let indices = assemble_indices(chunkmap);
write_indices(pwc, indices).await
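
A self-contained sketch of the chunk-then-dedup shape that process_chunk factors out, using the same fastcdc v2020 iterator as the code above; a DefaultHasher and a HashSet stand in for the keyed ChunkId, the local cache and the shared new_unflushed_chunks set:

use fastcdc::v2020::FastCDC;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

fn main() {
    let data = vec![7u8; 1 << 20];
    let mut seen: HashSet<u64> = HashSet::new();
    // Content-defined chunking, then an "is this chunk new?" check per chunk.
    for chunk in FastCDC::new(&data, 4096, 16384, 65536) {
        let bytes = &data[chunk.offset..chunk.offset + chunk.length];
        let mut h = DefaultHasher::new();
        bytes.hash(&mut h);
        let id = h.finish();
        if seen.insert(id) {
            // new chunk: compress and append it to the current bloblog here
        }
    }
    println!("unique chunks: {}", seen.len());
}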

View File

@ -1 +1,4 @@
pub mod delete_unrefd_bloblogs;
pub mod forget_chunks;
pub mod merge_indices;
pub mod repack_bloblogs_and_indices;

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,171 @@
use crate::extract::expand_chunkrefs_one_layer;
use crate::pile_with_cache::PileWithCache;
use eyre::{bail, ensure, Context, ContextCompat};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use tracing::info;
use yama_midlevel_crypto::chunk_id::ChunkId;
use yama_pile::definitions::IndexId;
use yama_pile::tree::TreeNode;
use yama_wormfile::boxed::BoxedWormFileProvider;
pub async fn find_forgettable_chunks(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
indices: BTreeSet<IndexId>,
) -> eyre::Result<BTreeSet<ChunkId>> {
let mut unseen_chunk_ids = BTreeSet::new();
// Find all chunks in the given indices
{
let mut cache_conn = pwc.localcache.read().await?;
for index_id in &indices {
unseen_chunk_ids.extend(cache_conn.list_chunks_in_index(*index_id).await?);
}
};
let chunks_to_scan = prepare_chunkrefs_to_scan(pwc).await?;
scan_chunks(pwc, &mut unseen_chunk_ids, chunks_to_scan)
.await
.context("failed to do a sweep")?;
Ok(unseen_chunk_ids)
}
async fn prepare_chunkrefs_to_scan(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
) -> eyre::Result<BTreeMap<u32, BTreeSet<ChunkId>>> {
let pointer_names = pwc
.pile
.list_pointers()
.await
.context("failed to list pointers")?;
let mut chunks_to_scan_by_depth: BTreeMap<u32, BTreeSet<ChunkId>> = BTreeMap::new();
for pointer_name in &pointer_names {
let pointer = pwc
.pile
.read_pointer(pointer_name)
.await?
.context("pointer vanished")?;
if let Some(parent_name) = pointer.parent {
if !pointer_names.contains(&parent_name) {
bail!("{parent_name:?}, the parent of {pointer_name:?}, does not exist");
}
}
pointer
.root
.node
.visit(
&mut |node, _| {
if let TreeNode::NormalFile { content, .. } = node {
chunks_to_scan_by_depth
.entry(content.depth)
.or_default()
.insert(content.chunk_id);
}
Ok(())
},
String::new(),
)
.unwrap();
}
Ok(chunks_to_scan_by_depth)
}
/// Scans the recursive chunkrefs that are passed in, ticking off chunks from the `unseen` set as
/// we go.
async fn scan_chunks(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
unseen: &mut BTreeSet<ChunkId>,
chunks_to_scan_by_depth: BTreeMap<u32, BTreeSet<ChunkId>>,
) -> eyre::Result<()> {
let mut to_scan: Vec<(u32, Vec<ChunkId>)> = chunks_to_scan_by_depth
.into_iter()
.flat_map(|(depth, chunkset)| {
chunkset
.into_iter()
.map(move |chunk_id| (depth, vec![chunk_id]))
})
.collect();
while !to_scan.is_empty() {
// Mark as seen.
for (_, chunk_ids) in &to_scan {
for chunk_id in chunk_ids {
unseen.remove(chunk_id);
}
}
// Don't descend further into zero-depth elements.
to_scan = to_scan
.into_iter()
.filter(|(depth, _)| *depth > 0)
.collect();
// Expand one layer and decrement the depth counters.
to_scan = expand_chunkrefs_one_layer(pwc, to_scan)
.await?
.into_iter()
.map(|(old_depth, chunkids)| (old_depth - 1, chunkids))
.collect();
}
Ok(())
}
pub async fn forget_chunks(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
indices: BTreeSet<IndexId>,
forgettable: BTreeSet<ChunkId>,
) -> eyre::Result<()> {
let mut indices_to_rewrite = Vec::new();
// First do a cache-only check to see which indices need rewriting.
{
let mut cache_conn = pwc.localcache.read().await?;
for index_id in &indices {
let chunks_in_this_index = cache_conn.list_chunks_in_index(*index_id).await?;
if !chunks_in_this_index.is_disjoint(&forgettable) {
indices_to_rewrite.push(index_id);
}
}
}
info!(
"{} indices to rewrite in order to forget chunks",
indices_to_rewrite.len()
);
// Go through each index and clean out whatever needs forgetting (then re-create the index and
// remove the old one).
for index_id in indices_to_rewrite {
let mut index = pwc.pile.read_index(*index_id).await?;
let mut changed = false;
for bloblog_entry in index.bloblogs.values_mut() {
let removable: Vec<ChunkId> = bloblog_entry
.chunks
.keys()
.filter(|ci| forgettable.contains(ci))
.cloned()
.collect();
changed |= !removable.is_empty();
for chunk_id in removable {
bloblog_entry.forgotten_bytes +=
bloblog_entry.chunks.remove(&chunk_id).unwrap().length;
}
}
ensure!(changed, "no change to index {index_id:?}");
index.supersedes.clear();
index.supersedes.insert(*index_id);
// TODO APPLY THE NEW INDEX DIRECTLY (how do we do that again?)
let new_index_id = pwc.pile.create_index(&index).await?;
ensure!(new_index_id != *index_id, "index ID bounce");
pwc.pile.delete_index_dangerous_exclusive(*index_id).await?;
}
Ok(())
}
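
A condensed model of the mark-and-sweep above: each pass ticks the visited chunks off the unseen set, and only entries still at depth > 0 get expanded into the next layer (expansion is faked here; u32 stands in for ChunkId):

use std::collections::BTreeSet;

fn main() {
    let mut unseen: BTreeSet<u32> = (1..=6).collect();
    // (depth, chunk): chunk 1 at depth 1 expands to leaves 2 and 3.
    let mut layer = vec![(1u32, 1u32)];
    while !layer.is_empty() {
        for &(_, c) in &layer {
            unseen.remove(&c);
        }
        layer = layer
            .into_iter()
            .filter(|&(d, _)| d > 0)
            .flat_map(|(d, c)| fake_expand(c).into_iter().map(move |c2| (d - 1, c2)))
            .collect();
    }
    assert_eq!(unseen, BTreeSet::from([4, 5, 6])); // 4..6 are now forgettable
}

fn fake_expand(chunk: u32) -> Vec<u32> {
    match chunk {
        1 => vec![2, 3],
        _ => vec![],
    }
}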

View File

@ -0,0 +1,127 @@
use crate::pile_with_cache::PileWithCache;
use eyre::{bail, Context};
use std::collections::btree_map::Entry;
use std::collections::BTreeSet;
use std::sync::Arc;
use tracing::{debug, warn};
use yama_pile::definitions::{Index, IndexId};
use yama_wormfile::boxed::BoxedWormFileProvider;
pub const MERGE_THRESHOLD_SIZE: u32 = 2 * 1024 * 1024;
pub const MERGE_TARGET_SIZE: u32 = 16 * 1024 * 1024;
/// Selects indices for merge.
///
/// Criteria:
/// - size is less than the `threshold_size`
/// - (FUTURE; TODO) two indices that cover the same bloblog should be merged
pub async fn select_indices_for_merge(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
target_size: u32,
threshold_size: u32,
) -> eyre::Result<Vec<BTreeSet<IndexId>>> {
let mut result = Vec::new();
let mut mergeable_indices: BTreeSet<(u64, IndexId)> = pwc
.pile
.list_indices_with_meta()
.await?
.into_iter()
.filter(|(_, meta)| meta.file_size < threshold_size as u64)
.map(|(index_id, meta)| (meta.file_size, index_id))
.collect();
while mergeable_indices.len() >= 2 {
let mut merge_set = BTreeSet::new();
let mut merge_size = 0u64;
let (first_size, first_index) = mergeable_indices.pop_first().unwrap();
merge_size += first_size;
merge_set.insert(first_index);
while let Some((size, index)) = mergeable_indices.first() {
if merge_size + *size < target_size as u64 {
merge_size += *size;
merge_set.insert(*index);
mergeable_indices.pop_first();
} else {
break;
}
}
if merge_set.len() > 1 {
result.push(merge_set);
}
}
Ok(result)
}
/// Merges some indices, deleting them in the process.
/// Requires exclusive lock.
/// (Note: in the future we could merely supersede the indices, which only needs a shared lock.
/// However, you need an exclusive lock to eventually delete superseded indices...).
pub async fn merge_indices(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
merge_sets: Vec<BTreeSet<IndexId>>,
) -> eyre::Result<()> {
for merge_set in merge_sets {
let mut final_index = Index {
supersedes: merge_set.clone(),
bloblogs: Default::default(),
};
for index_id in &merge_set {
let index_being_subsumed = pwc.pile.read_index(*index_id).await?;
// TODO: do we need to worry about the 'supersedes' property on the index here?
// I think not, or at least not if the superseded indices don't exist,
// but worth thinking about in the future if we don't immediately delete
// superseded indices...
for (bloblog_id, bloblog_entry) in index_being_subsumed.bloblogs {
match final_index.bloblogs.entry(bloblog_id) {
Entry::Vacant(ve) => {
ve.insert(bloblog_entry);
}
Entry::Occupied(mut oe) => {
let new_entry = oe.get_mut();
let (existing_chunks, new_chunks): (Vec<_>, Vec<_>) = bloblog_entry
.chunks
.into_iter()
.partition(|(chunk_id, _)| new_entry.chunks.contains_key(chunk_id));
for (chunk_id, locator) in new_chunks {
// Subtract from the forgotten byte count, since this may be us re-remembering bytes out of caution...
new_entry.forgotten_bytes =
new_entry.forgotten_bytes.saturating_sub(locator.length);
let is_new = new_entry.chunks.insert(chunk_id, locator).is_none();
assert!(is_new);
}
for (chunk_id, locator) in existing_chunks {
if &new_entry.chunks[&chunk_id] != &locator {
bail!("Attempted to merge indices that disagree about {bloblog_id:?}/{chunk_id:?}");
}
}
}
}
}
}
let merged_index_id = pwc
.pile
.create_index(&final_index)
.await
.context("failed to create merged index")?;
if merge_set.contains(&merged_index_id) {
// I don't see how this could be possible, but let's avoid deleting the new index if it somehow is a merge of itself...
warn!("strange: created index ID is one of its own merges...");
continue;
}
debug!("merged indices {merge_set:?} into {merged_index_id:?}; deleting mergees");
for index_to_delete in merge_set {
pwc.pile
.delete_index_dangerous_exclusive(index_to_delete)
.await?;
}
}
Ok(())
}
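
A toy run of the greedy pass in select_indices_for_merge, showing how smallest-first packing below the threshold behaves (string labels stand in for IndexIds; the sizes are arbitrary):

use std::collections::BTreeSet;

fn main() {
    let threshold = 4u64;
    let target = 6u64;
    // Only indices under the threshold are candidates; "d" is filtered out.
    let mut mergeable: BTreeSet<(u64, &str)> = [(1, "a"), (2, "b"), (3, "c"), (10, "d")]
        .into_iter()
        .filter(|(size, _)| *size < threshold)
        .collect();
    let mut sets = Vec::new();
    while mergeable.len() >= 2 {
        let mut set = Vec::new();
        let mut size = 0u64;
        let (s0, i0) = mergeable.pop_first().unwrap();
        size += s0;
        set.push(i0);
        // Keep taking the smallest remaining index while under the target.
        while let Some(&(s, i)) = mergeable.first() {
            if size + s < target {
                size += s;
                set.push(i);
                mergeable.pop_first();
            } else {
                break;
            }
        }
        if set.len() > 1 {
            sets.push(set);
        }
    }
    assert_eq!(sets, vec![vec!["a", "b"]]); // "c" stays; "d" was over threshold
}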

View File

@ -0,0 +1,191 @@
use crate::pile_with_cache::PileWithCache;
use crate::storing::assemble_and_write_indices;
use eyre::ContextCompat;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use yama_localcache::BloblogStats;
use yama_midlevel_crypto::chunk_id::ChunkId;
use yama_pile::definitions::{BloblogId, IndexBloblogEntry};
use yama_wormfile::boxed::BoxedWormFileProvider;
/// Repack bloblogs that have this much forgotten space in them.
pub const REPACK_BLOBLOGS_TO_RECLAIM_SPACE_BYTES: u64 = 32 * 1024 * 1024;
/// Defines what a 'small bloblog' is (one that is below a certain size, excluding forgotten bytes).
pub const SMALL_BLOBLOG_THRESHOLD: u64 = 64 * 1024 * 1024;
/// Clump together small bloblogs when together they would hit or exceed this size.
pub const REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES: u64 = 2 * 1024 * 1024 * 1024;
/// The target size to reach when repacking, in terms of blob bytes.
pub const REPACK_TARGET_SIZE: u64 = 4 * 1024 * 1024 * 1024;
/// The limit size to use when repacking, in terms of blob bytes.
pub const REPACK_TARGET_LIMIT: u64 = 5 * 1024 * 1024 * 1024;
/// Gets bloblogs' stats. Only considers bloblogs referenced by exactly one index, so we don't
/// have to deal with unifying indices.
pub async fn get_bloblogs_stats(
pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
) -> eyre::Result<BTreeMap<BloblogId, BloblogStats>> {
let mut cache_conn = pwc.localcache.read().await?;
let indices = cache_conn.list_indices().await?;
let mut bloblogs: BTreeMap<BloblogId, Option<BloblogStats>> = BTreeMap::new();
for index in indices {
for (bloblog, stats) in cache_conn.index_bloblog_stats(index).await? {
match bloblogs.entry(bloblog) {
Entry::Vacant(ve) => {
ve.insert(Some(stats));
}
Entry::Occupied(mut oe) => {
// only allow one stats per bloblog, then replace with None.
oe.insert(None);
}
}
}
}
Ok(bloblogs
.into_iter()
.flat_map(|(k, v)| v.map(|v| (k, v)))
.collect())
}
/// Choose some bloblogs to repack. Assumes an updated local cache.
///
/// Only bloblogs referenced by exactly one index will be considered for repacking.
pub async fn select_bloblogs_for_repack(
stats: BTreeMap<BloblogId, BloblogStats>,
) -> eyre::Result<Vec<BTreeMap<BloblogId, BloblogStats>>> {
let repack_for_space: BTreeSet<BloblogId> = stats
.iter()
.filter(|(_, v)| v.forgotten_bytes >= REPACK_BLOBLOGS_TO_RECLAIM_SPACE_BYTES)
.map(|(&k, _)| k)
.collect();
let maybe_repack_for_clumping: BTreeSet<BloblogId> = stats
.iter()
.filter(|(_, v)| v.blob_size <= SMALL_BLOBLOG_THRESHOLD)
.map(|(&k, _)| k)
.collect();
let should_repack_for_clumping = maybe_repack_for_clumping.len() > 1
&& maybe_repack_for_clumping
.iter()
.map(|bi| stats[bi].blob_size)
.sum::<u64>()
> REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES;
let mut to_repack = repack_for_space.clone();
if should_repack_for_clumping {
to_repack.extend(maybe_repack_for_clumping);
}
let mut to_repack: BTreeSet<(u64, BloblogId)> = to_repack
.into_iter()
.map(|bi| (stats[&bi].blob_size, bi))
.collect();
let mut repack_sets = Vec::new();
while !to_repack.is_empty() {
let mut new_repack_group = BTreeMap::new();
let mut new_repack_group_size = 0u64;
let (first_sz, first_to_repack) = to_repack.pop_last().unwrap();
new_repack_group_size += first_sz;
new_repack_group.insert(first_to_repack, stats[&first_to_repack].clone());
while new_repack_group_size < REPACK_TARGET_SIZE {
let Some((first_size, _)) = to_repack.first() else { break; };
if new_repack_group_size + *first_size > REPACK_TARGET_LIMIT {
break;
}
let (extra_size, extra_bloblog_id) = to_repack.pop_first().unwrap();
new_repack_group_size += extra_size;
new_repack_group.insert(extra_bloblog_id, stats[&extra_bloblog_id].clone());
}
// now check the repack group is good
if new_repack_group
.keys()
.any(|bi| repack_for_space.contains(bi))
|| new_repack_group_size > REPACK_BLOBLOGS_TO_CLUMP_TOGETHER_SMALL_BLOBLOGS_BYTES
{
repack_sets.push(new_repack_group);
}
}
Ok(repack_sets)
}
pub async fn perform_repack(
pwc: Arc<PileWithCache<BoxedWormFileProvider>>,
repack_sets: Vec<BTreeMap<BloblogId, BloblogStats>>,
) -> eyre::Result<()> {
// 1. Write new bloblogs
let mut indices_buffer = BTreeMap::new();
let mut index_parts: BTreeMap<BloblogId, IndexBloblogEntry> = BTreeMap::new();
for repack_set in &repack_sets {
let mut new_bloblog = pwc.pile.create_bloblog().await?;
for (old_bloblog_id, old_bloblog_stats) in repack_set {
let index_id = old_bloblog_stats.in_index;
if !indices_buffer.contains_key(&index_id) {
indices_buffer.insert(index_id, pwc.pile.read_index(index_id).await?);
}
let index_bloblog_entry = indices_buffer
.get_mut(&index_id)
.unwrap()
.bloblogs
.remove(&old_bloblog_id)
.context("bug: no IBE despite rewrite from context of this index")?;
let mut old_bloblog = pwc.pile.read_bloblog(*old_bloblog_id).await?;
let locators: BTreeMap<u64, ChunkId> = index_bloblog_entry
.chunks
.into_iter()
.map(|(blob, locator)| (locator.offset, blob))
.collect();
for chunk_id in locators.into_values() {
let chunk = old_bloblog
.read_chunk(chunk_id)
.await?
.context("bug or corrupt bloblog: promised chunk missing")?;
new_bloblog.write_chunk(chunk_id, &chunk).await?;
}
}
let (_wormpath, new_bloblog_id, new_bloblog_index_info) = new_bloblog.finish().await?;
index_parts.insert(
new_bloblog_id,
IndexBloblogEntry {
chunks: new_bloblog_index_info,
forgotten_bytes: 0,
},
);
}
// 2. Write new indices, but make sure to also write out index entries for unaffected bloblogs
// that appear in the indices we want to replace shortly.
for (_, index) in indices_buffer.iter_mut() {
index_parts.extend(std::mem::take(&mut index.bloblogs));
}
assemble_and_write_indices(&pwc, index_parts).await?;
// 3. Delete old indices
for index_id in indices_buffer.into_keys() {
pwc.pile.delete_index_dangerous_exclusive(index_id).await?;
}
// 4. Delete old bloblogs
for repack_group in repack_sets {
for bloblog_id in repack_group.into_keys() {
pwc.pile
.delete_bloblog_dangerous_exclusive(bloblog_id)
.await?;
}
}
Ok(())
}
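
One detail worth noting in perform_repack: collecting (offset -> chunk) into a BTreeMap means the copy loop reads each old bloblog in offset order, i.e. sequentially. A tiny demonstration of that ordering trick:

use std::collections::BTreeMap;

fn main() {
    // (chunk id, offset in the old bloblog), in arbitrary order.
    let locators = [("c", 2048u64), ("a", 0u64), ("b", 512u64)];
    // Keying by offset makes BTreeMap iteration yield chunks in file order.
    let by_offset: BTreeMap<u64, &str> =
        locators.iter().map(|&(id, off)| (off, id)).collect();
    let order: Vec<&str> = by_offset.into_values().collect();
    assert_eq!(order, vec!["a", "b", "c"]);
}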

View File

@ -6,23 +6,32 @@ CREATE TABLE indices (
);
CREATE UNIQUE INDEX indices_index_sha256 ON indices(index_sha256);
CREATE TABLE blobs (
chunk_id TEXT NOT NULL,
bloblog_short_id INTEGER NOT NULL REFERENCES bloblogs(bloblog_short_id),
index_short_id INTEGER NOT NULL REFERENCES indices(index_short_id),
offset INTEGER NOT NULL,
size INTEGER NOT NULL,
PRIMARY KEY (chunk_id, bloblog_short_id, index_short_id)
);
CREATE INDEX blobs_bloblog_short_id ON blobs(bloblog_short_id);
CREATE INDEX blobs_index_short_id ON blobs(index_short_id);
CREATE TABLE bloblogs (
bloblog_short_id INTEGER PRIMARY KEY NOT NULL,
bloblog_sha256 TEXT NOT NULL
);
CREATE UNIQUE INDEX bloblogs_bloblog_sha256 ON bloblogs(bloblog_sha256);
-- Track the relationship between indices and bloblogs
CREATE TABLE indices_bloblogs (
index_short_id INTEGER NOT NULL REFERENCES indices(index_short_id),
bloblog_short_id INTEGER NOT NULL REFERENCES bloblogs(bloblog_short_id),
forgotten_bytes INTEGER NOT NULL,
PRIMARY KEY (index_short_id, bloblog_short_id)
);
CREATE TABLE blobs (
chunk_id TEXT NOT NULL,
bloblog_short_id INTEGER NOT NULL,
index_short_id INTEGER NOT NULL,
offset INTEGER NOT NULL,
size INTEGER NOT NULL,
PRIMARY KEY (chunk_id, bloblog_short_id, index_short_id),
FOREIGN KEY (index_short_id, bloblog_short_id) REFERENCES indices_bloblogs(index_short_id, bloblog_short_id)
);
CREATE INDEX blobs_bloblog_short_id ON blobs(bloblog_short_id);
CREATE INDEX blobs_index_short_id ON blobs(index_short_id);
CREATE TABLE indices_supersede (
superseded_sha256 TEXT NOT NULL,
successor_sha256 TEXT NOT NULL REFERENCES indices(index_sha256),

View File

@ -150,6 +150,13 @@ impl StoreConnection<true> {
Some(row) => row.bloblog_short_id,
};
let forgotten_bytes = index_bloblog_entry.forgotten_bytes as i64;
query!("
INSERT INTO indices_bloblogs (index_short_id, bloblog_short_id, forgotten_bytes)
VALUES (?, ?, ?)
", index_short_id, bloblog_short_id, forgotten_bytes)
.execute(&mut *txn)
.await?;
for (chunk_id, chunk_locator) in index_bloblog_entry.chunks.iter() {
let chunk_id_txt = chunk_id.to_string();
@ -201,6 +208,15 @@ impl StoreConnection<true> {
.execute(&mut *txn)
.await?;
query!(
"
DELETE FROM indices_bloblogs WHERE index_short_id = ?
",
index_short_id
)
.execute(&mut *txn)
.await?;
query!(
"
DELETE FROM indices WHERE index_short_id = ?
@ -255,6 +271,8 @@ impl<const RW: bool> StoreConnection<RW> {
}
}
/// Returns all chunk locations.
/// If a chunk does not exist, it is just not returned in the output map.
pub async fn locate_chunks(
&mut self,
chunk_ids: &BTreeSet<ChunkId>,
@ -332,4 +350,59 @@ impl<const RW: bool> StoreConnection<RW> {
.is_none();
Ok(is_new)
}
pub async fn list_chunks_in_index(
&mut self,
index_id: IndexId,
) -> eyre::Result<BTreeSet<ChunkId>> {
let index_id_text = index_id.to_string();
let row_results = query!(
"
SELECT chunk_id AS \"chunk_id!\" FROM indices i
LEFT JOIN blobs b USING (index_short_id)
WHERE index_sha256 = ?
",
index_id_text
)
.map(|row| {
ChunkId::from_str(&row.chunk_id).context("failed to decode ChunkId in local cache")
})
.fetch_all(&mut *self.conn)
.await?;
row_results.into_iter().collect()
}
pub async fn index_bloblog_stats(
&mut self,
index_id: IndexId,
) -> eyre::Result<BTreeMap<BloblogId, BloblogStats>> {
let index_id_text = index_id.to_string();
let row_results = query!("
SELECT bloblog_sha256 AS bloblog_id, ib.forgotten_bytes AS forgotten_bytes, COUNT(size) AS \"num_chunks!: i64\", SUM(size) AS \"num_bytes!: i64\" FROM indices i
LEFT JOIN indices_bloblogs ib USING (index_short_id)
LEFT JOIN bloblogs b USING (bloblog_short_id)
LEFT JOIN blobs USING (index_short_id, bloblog_short_id)
WHERE index_sha256 = ?
GROUP BY bloblog_sha256
", index_id_text)
.map(|row| {
Ok((BloblogId::try_from(row.bloblog_id.as_ref())?, BloblogStats {
in_index: index_id,
blob_size: row.num_bytes as u64,
forgotten_bytes: row.forgotten_bytes as u64,
num_chunks: row.num_chunks as u32,
}))
})
.fetch_all(&mut *self.conn)
.await?;
row_results.into_iter().collect()
}
}
#[derive(Clone, Debug)]
pub struct BloblogStats {
pub in_index: IndexId,
pub blob_size: u64,
pub forgotten_bytes: u64,
pub num_chunks: u32,
}

View File

@ -26,7 +26,7 @@ pub struct BloblogFooter {
pub type PackedBloblogFooter = AsymBox<Zstd<CborSerde<BloblogFooter>>>;
/// Locator for a blob within a bloblog.
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub struct BlobLocator {
pub offset: u64,
pub length: u64,
@ -104,7 +104,7 @@ pub struct IndexBloblogEntry {
pub type PackedIndex = AsymBox<Zstd<CborSerde<Index>>>;
#[derive(Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct RecursiveChunkRef {
/// The root Chunk ID.
pub chunk_id: ChunkId,

View File

@ -25,29 +25,58 @@ pub struct Keyring {
pub fn generate_r_w_keys() -> (ReaderKey, WriterKey) {
let (encrypt, decrypt) = generate_asym_keypair();
let (sign, verify) = asym_signing_keypair();
(ReaderKey { decrypt, verify }, WriterKey { encrypt, sign })
(
ReaderKey::new(decrypt, verify),
WriterKey::new(encrypt, sign),
)
}
#[derive(Clone, Serialize, Deserialize)]
pub struct WriterKey {
// boxed because these take up a lot of stack space otherwise!
#[serde(flatten)]
inner: Box<WriterKeyInner>,
}
#[derive(Clone, Serialize, Deserialize)]
struct WriterKeyInner {
encrypt: EncryptingKey,
sign: SigningKey,
}
impl WriterKey {
pub fn new(encrypt: EncryptingKey, sign: SigningKey) -> Self {
Self {
inner: Box::new(WriterKeyInner { encrypt, sign }),
}
}
pub fn make_locked_asymbox<T: ByteLayer>(&self, contents: T) -> AsymBox<T> {
AsymBox::new(contents, &self.sign, &self.encrypt).unwrap()
AsymBox::new(contents, &self.inner.sign, &self.inner.encrypt).unwrap()
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct ReaderKey {
// boxed because these take up a lot of stack space otherwise!
#[serde(flatten)]
inner: Box<ReaderKeyInner>,
}
#[derive(Clone, Serialize, Deserialize)]
struct ReaderKeyInner {
decrypt: DecryptingKey,
verify: VerifyingKey,
}
impl ReaderKey {
pub fn new(decrypt: DecryptingKey, verify: VerifyingKey) -> Self {
Self {
inner: Box::new(ReaderKeyInner { decrypt, verify }),
}
}
pub fn unlock_asymbox<T: ByteLayer>(&self, asymbox: AsymBox<T>) -> Option<T> {
asymbox.unlock(&self.decrypt, &self.verify)
asymbox.unlock(&self.inner.decrypt, &self.inner.verify)
}
}
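
The "boxed because these take up a lot of stack space" comment is the standard indirection trick: boxing shrinks the handle to one pointer. A stand-alone illustration with made-up sizes (the real key types are not 4 KiB arrays):

use std::mem::size_of;

struct InlineKeys {
    _encrypt: [u8; 4096],
    _sign: [u8; 4096],
}
struct BoxedKeys {
    _inner: Box<InlineKeys>,
}

fn main() {
    // The inline version occupies 8 KiB wherever it is stored or moved...
    assert_eq!(size_of::<InlineKeys>(), 8192);
    // ...whereas the boxed version is a single pointer on the stack.
    assert_eq!(size_of::<BoxedKeys>(), size_of::<usize>());
}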

View File

@ -8,13 +8,13 @@ use crate::locks::{LockHandle, LockKind};
use crate::pointers::{PackedPointer, Pointer};
use crate::utils::HashedWormWriter;
use eyre::{bail, Context, ContextCompat};
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use yama_midlevel_crypto::byte_layer::{ByteLayer, CborSerde};
use yama_midlevel_crypto::zstd_box::Zstd;
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileWriter};
use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileWriter};
pub mod definitions;
@ -109,10 +109,35 @@ impl<WFP: WormFileProvider + 'static> Pile<WFP> {
Ok(BloblogReader::new(worm_reader, &self.keyring).await?)
}
/// Delete a bloblog from the pile.
/// This is dangerous: it should only really be done in the vacuum operation.
/// Requires an exclusive lock!
pub async fn delete_bloblog_dangerous_exclusive(
&self,
bloblog_id: BloblogId,
) -> eyre::Result<()> {
if !self.lock.is_active_now(LockKind::Exclusive) {
bail!("can't delete bloblog: exclusive lock not active");
}
let bloblog_path = WormPathBuf::new(format!(
"bloblogs/{}/{}",
hex::encode(&bloblog_id.0 .0[0..1]),
bloblog_id.0.to_string()
))
.unwrap();
self.provider.delete(bloblog_path.as_ref()).await?;
Ok(())
}
/// Create a new index, returning the index ID.
///
/// Requires key: w_bloblog_footer
pub async fn create_index(&self, index: &Index) -> eyre::Result<IndexId> {
if !self.lock.is_active_now(LockKind::Shared) {
bail!("can't create index: lock not active");
}
let worm_writer = self.provider.write().await?;
let mut writer = HashedWormWriter::new(worm_writer);
let packed_index: PackedIndex = self
@ -133,6 +158,9 @@ impl<WFP: WormFileProvider + 'static> Pile<WFP> {
/// List all indices present in the pile.
pub async fn list_indices(&self) -> eyre::Result<BTreeSet<IndexId>> {
if !self.lock.is_active_now(LockKind::Shared) {
bail!("can't list indices: lock not active");
}
let files = self
.provider
.list(WormPath::new("indices").unwrap())
@ -152,10 +180,37 @@ impl<WFP: WormFileProvider + 'static> Pile<WFP> {
Ok(result)
}
/// List all indices present in the pile, with their metadata.
pub async fn list_indices_with_meta(&self) -> eyre::Result<BTreeMap<IndexId, WormFileMeta>> {
if !self.lock.is_active_now(LockKind::Shared) {
bail!("can't list indices: lock not active");
}
let files = self
.provider
.list_meta(WormPath::new("indices").unwrap())
.await
.context("failed to list indices")?;
let mut result = BTreeMap::new();
for (file, meta) in files {
let (_, filename) = file
.as_ref()
.as_str()
.rsplit_once('/')
.context("index listing entry should split at /")?;
let index_id = IndexId::try_from(filename)
.with_context(|| format!("not a valid index ID: {filename:?}"))?;
result.insert(index_id, meta);
}
Ok(result)
}
/// Read an index from the pile.
///
/// Requires key: r_bloblog_footer
pub async fn read_index(&self, index_id: IndexId) -> eyre::Result<Index> {
if !self.lock.is_active_now(LockKind::Shared) {
bail!("can't read index: lock not active");
}
let r_bloblog_footer = self
.keyring
.r_bloblog_footer
@ -174,6 +229,20 @@ impl<WFP: WormFileProvider + 'static> Pile<WFP> {
Ok(index)
}
/// Delete an index from the pile.
/// This is dangerous: it should only really be done in the vacuum operation.
/// Requires an exclusive lock!
pub async fn delete_index_dangerous_exclusive(&self, index_id: IndexId) -> eyre::Result<()> {
if !self.lock.is_active_now(LockKind::Exclusive) {
bail!("can't delete index: exclusive lock not active");
}
let target = WormPathBuf::new(format!("indices/{}", index_id.0)).unwrap();
self.provider.delete(target.as_ref()).await?;
Ok(())
}
pub async fn read_pointer(&self, name: &str) -> eyre::Result<Option<Pointer>> {
let r_pointer = self
.keyring
@ -234,14 +303,33 @@ impl<WFP: WormFileProvider + 'static> Pile<WFP> {
Ok(())
}
pub async fn close(mut self) -> eyre::Result<()> {
pub async fn list_pointers(&self) -> eyre::Result<Vec<String>> {
let files = self
.provider
.list(WormPath::new("pointers").unwrap())
.await?;
Ok(files
.into_iter()
.map(|file| {
let (_dir, pointer) = file.as_ref().as_str().rsplit_once('/').unwrap();
pointer.to_owned()
})
.collect())
}
pub async fn close(self) -> eyre::Result<()> {
match Arc::try_unwrap(self.lock) {
Ok(lock) => {
lock.close().await
lock.close()
.await
.context("failed to release lock gracefully")?;
}
Err(arc) => {
bail!("could not close pile gracefully; lock arc has {} strong refs and {} weak refs", Arc::strong_count(&arc), Arc::weak_count(&arc));
bail!(
"could not close pile gracefully; lock arc has {} strong refs and {} weak refs",
Arc::strong_count(&arc),
Arc::weak_count(&arc)
);
}
}
Ok(())

View File

@ -1,6 +1,6 @@
use crate::keyring::{Keyring, ReaderKey, WriterKey};
use chrono::{DateTime, Duration, Utc};
use eyre::{bail, Context, ContextCompat, eyre};
use eyre::{bail, eyre, Context, ContextCompat};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
@ -74,19 +74,23 @@ pub struct LockHandle {
impl Drop for LockHandle {
fn drop(&mut self) {
if let Some(lock_release_tx) = self.lock_release_tx
.take() {
lock_release_tx
.send(())
.expect("can't drop lock");
if let Some(lock_release_tx) = self.lock_release_tx.take() {
lock_release_tx.send(()).expect("can't drop lock");
}
}
}
impl LockHandle {
pub async fn close(mut self) -> eyre::Result<()> {
self.lock_release_tx.take().unwrap().send(()).map_err(|_| eyre!("can't drop lock"))?;
self.lock_task_join_handle.take().unwrap().await
self.lock_release_tx
.take()
.unwrap()
.send(())
.map_err(|_| eyre!("can't drop lock"))?;
self.lock_task_join_handle
.take()
.unwrap()
.await
.context("lock task fail")?;
Ok(())
}
@ -141,7 +145,7 @@ impl LockHandle {
let stage1_locks = scan_locks(provider.as_ref(), &r_locks, now).await?;
if let Some(blocker) = find_lock_blocker(&stage1_locks, &lock_id, kind) {
let lock = &stage1_locks[blocker];
warn!("{:?} lock {} held by {} currently expiring at {} is blocking our potential lock.", lock.kind, lock_id, lock.holder, lock.expires_at);
warn!("{:?} lock {} held by {:?} currently expiring at {} is blocking our potential lock.", lock.kind, lock_id, lock.holder, lock.expires_at);
tokio::time::sleep(tokio::time::Duration::from_secs(
(lock.expires_at - now).num_seconds().max(0) as u64 + 10,
@ -161,7 +165,7 @@ impl LockHandle {
let stage2_locks = scan_locks(provider.as_ref(), &r_locks, now).await?;
if let Some(blocker) = find_lock_blocker(&stage2_locks, &lock_id, kind) {
let lock = &stage2_locks[blocker];
warn!("{:?} lock {} held by {} currently expiring at {} blocked our lock; backing out.", lock.kind, lock_id, lock.holder, lock.expires_at);
warn!("{:?} lock {} held by {:?} currently expiring at {} blocked our lock; backing out.", lock.kind, lock_id, lock.holder, lock.expires_at);
// Back out our lock.
provider.delete(lock_path.as_ref()).await?;
@ -197,7 +201,7 @@ impl LockHandle {
lock_path,
lock_id,
lock_release_tx: Some(lock_release_tx),
lock_task_join_handle
lock_task_join_handle,
});
}
}
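
A condensed model of the two-stage acquisition above: scan for blockers, optimistically write our own lock file, then scan again and back out if a conflicting lock appeared in between (strings stand in for lock files; expiry and the sleep/retry loop are omitted):

use std::collections::BTreeMap;

fn try_acquire(
    locks: &mut BTreeMap<String, &'static str>,
    me: &str,
    kind: &'static str,
) -> bool {
    let blocked = |locks: &BTreeMap<String, &'static str>| {
        locks
            .iter()
            .any(|(holder, k)| holder != me && (*k == "exclusive" || kind == "exclusive"))
    };
    if blocked(locks) {
        return false; // stage 1: someone blocks us; wait for expiry and retry
    }
    locks.insert(me.to_string(), kind); // optimistic write
    if blocked(locks) {
        locks.remove(me); // stage 2: lost the race; back out
        return false;
    }
    true
}

fn main() {
    let mut locks = BTreeMap::new();
    assert!(try_acquire(&mut locks, "alice", "shared"));
    assert!(try_acquire(&mut locks, "bob", "shared"));
    assert!(!try_acquire(&mut locks, "carol", "exclusive"));
}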

View File

@ -41,7 +41,9 @@ pub struct RootTreeNode {
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "t")]
pub enum TreeNode {
#[serde(rename = "F")]
NormalFile {
/// modification time in ms
mtime: u64,
@ -53,6 +55,7 @@ pub enum TreeNode {
#[serde(flatten)]
content: RecursiveChunkRef,
},
#[serde(rename = "D")]
Directory {
#[serde(flatten)]
ownership: FilesystemOwnership,
@ -60,12 +63,14 @@ pub enum TreeNode {
permissions: FilesystemPermissions,
children: BTreeMap<String, TreeNode>,
},
#[serde(rename = "L")]
SymbolicLink {
#[serde(flatten)]
ownership: FilesystemOwnership,
target: String,
},
// TODO is there any other kind of file we need to store?
#[serde(rename = "X")]
Deleted,
}
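
The tag = "t" attribute plus the one-letter renames give a compact internally-tagged wire shape. A trimmed-down mirror (fields omitted; assumes the serde derive feature and serde_json) showing the resulting JSON; the real pile serializes via CBOR, JSON is just easier to eyeball:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(tag = "t")]
enum MiniTreeNode {
    #[serde(rename = "L")]
    SymbolicLink { target: String },
    #[serde(rename = "X")]
    Deleted,
}

fn main() {
    let link = MiniTreeNode::SymbolicLink { target: "/etc".into() };
    // prints {"t":"L","target":"/etc"}
    println!("{}", serde_json::to_string(&link).unwrap());
    // prints {"t":"X"}
    println!("{}", serde_json::to_string(&MiniTreeNode::Deleted).unwrap());
}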

View File

@ -131,6 +131,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for SymStreamWriter<W> {
) -> Poll<Result<usize, io::Error>> {
let mut enc_buf = buf.to_vec();
// Safety: Deny use of unencrypted `buf` from here on.
#[allow(unused)]
let buf = ();
let offset = self.offset;
self.sym_stream_key.apply_xor(offset, &mut enc_buf);

View File

@ -1,5 +1,5 @@
use crate::paths::{WormPath, WormPathBuf};
use crate::{WormFileProvider, WormFileReader, WormFileWriter};
use crate::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter};
use async_trait::async_trait;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
@ -32,6 +32,7 @@ trait BoxableWormFileProvider: Debug + Send + Sync {
async fn is_dir_b(&self, path: &WormPath) -> eyre::Result<bool>;
async fn is_regular_file_b(&self, path: &WormPath) -> eyre::Result<bool>;
async fn list_b(&self, path: &WormPath) -> eyre::Result<Vec<WormPathBuf>>;
async fn list_meta_b(&self, path: &WormPath) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>>;
async fn read_b(&self, path: &WormPath) -> eyre::Result<Pin<Box<dyn WormFileReader>>>;
async fn write_b(&self) -> eyre::Result<Pin<Box<dyn WormFileWriter>>>;
async fn delete_b(&self, path: &WormPath) -> eyre::Result<()>;
@ -51,6 +52,10 @@ impl<T: WormFileProvider> BoxableWormFileProvider for T {
self.list(path).await
}
async fn list_meta_b(&self, path: &WormPath) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
self.list_meta(path).await
}
async fn read_b(&self, path: &WormPath) -> eyre::Result<Pin<Box<dyn WormFileReader>>> {
self.read(path)
.await
@ -101,6 +106,14 @@ impl WormFileProvider for BoxedWormFileProvider {
self.inner.list_b(path).await
}
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
let path = path.as_ref();
self.inner.list_meta_b(path).await
}
async fn read(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Self::WormFileReader> {
let path = path.as_ref();
self.inner.read_b(path).await

View File

@ -32,6 +32,15 @@ pub trait WormFileProvider: Debug + Send + Sync {
/// TODO a streaming version of this might be beneficial.
async fn list(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Vec<WormPathBuf>>;
/// Lists all the files and directories in the specified path, with metadata.
///
/// If the path does not exist, gives an error.
/// TODO a streaming version of this might be beneficial.
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>>;
/// Reads a file.
///
/// Fails if the file does not exist or is not a regular file.
@ -50,6 +59,11 @@ pub trait WormFileProvider: Debug + Send + Sync {
async fn delete(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<()>;
}
#[derive(Clone, Debug)]
pub struct WormFileMeta {
pub file_size: u64,
}
pub trait WormFileReader: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + 'static {}
#[async_trait]

View File

@ -3,13 +3,14 @@ use eyre::Context as EyreContext;
use std::fmt::{Debug, Formatter};
use std::io;
use std::io::{ErrorKind, SeekFrom};
use std::os::unix::fs::MetadataExt;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter};
use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter};
/// WormFileProvider that uses the local filesystem, in a given root directory.
#[derive(Debug)]
@ -70,6 +71,34 @@ impl WormFileProvider for LocalWormFilesystem {
Ok(out)
}
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
let worm_path = path.as_ref();
let real_path = self.resolve_real_path(worm_path);
let mut dir_reader = match tokio::fs::read_dir(real_path).await {
Ok(ok) => ok,
Err(e) if e.kind() == ErrorKind::NotFound => {
return Ok(Vec::new());
}
Err(other) => return Err(other.into()),
};
let mut out = Vec::new();
while let Some(next_ent) = dir_reader.next_entry().await? {
if let Some(name_str) = next_ent.file_name().to_str() {
let metadata = next_ent.metadata().await?;
out.push((
worm_path.join(name_str).unwrap(),
WormFileMeta {
file_size: metadata.size(),
},
));
}
}
Ok(out)
}
async fn read(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Self::WormFileReader> {
let worm_path = path.as_ref();
let real_path = self.resolve_real_path(worm_path);

View File

@ -12,7 +12,7 @@ use tokio::io::{duplex, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, DuplexS
use tokio::task::JoinHandle;
use uuid::Uuid;
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter};
use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter};
/// WormFileProvider that uses an S3 bucket, with a given path prefix.
#[derive(Debug)]
@ -68,6 +68,18 @@ impl WormFileProvider for S3WormFilesystem {
}
async fn list(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Vec<WormPathBuf>> {
Ok(self
.list_meta(path)
.await?
.into_iter()
.map(|(name, _meta)| name)
.collect())
}
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
let path = path.as_ref();
let full_path = self.resolve_real_path(path);
let list = self
@ -84,6 +96,14 @@ impl WormFileProvider for S3WormFilesystem {
.strip_prefix(&self.path_prefix)
.map(|s| WormPathBuf::new(s.to_owned()))
.flatten()
.map(|x| {
(
x,
WormFileMeta {
file_size: obj.size,
},
)
})
})
.collect())
}

View File

@ -19,7 +19,7 @@ use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::runtime::Handle;
use yama_wormfile::paths::{WormPath, WormPathBuf};
use yama_wormfile::{WormFileProvider, WormFileReader, WormFileWriter};
use yama_wormfile::{WormFileMeta, WormFileProvider, WormFileReader, WormFileWriter};
/// WormFileProvider that uses an SFTP connection, in a given root directory.
#[derive(Debug)]
@ -227,6 +227,46 @@ impl WormFileProvider for SftpWormFilesystem {
.collect())
}
async fn list_meta(
&self,
path: impl AsRef<WormPath> + Send,
) -> eyre::Result<Vec<(WormPathBuf, WormFileMeta)>> {
let worm_path = path.as_ref();
let path = worm_path.as_str();
let mut fs = self.get_fs();
let mut remote_dir = match fs.open_dir(path).await {
Ok(ok) => ok,
Err(openssh_sftp_client::Error::SftpError(SftpErrorKind::NoSuchFile, _msg)) => {
return Ok(Vec::new());
}
Err(other) => {
return Err(other.into());
}
};
let dir_reader = remote_dir.read_dir().await?;
Ok(dir_reader
.iter()
.filter_map(|entry| {
if let Some(name_str) = entry.filename().as_os_str().to_str() {
if name_str.is_empty() || name_str == "." || name_str == ".." {
None
} else {
Some((
worm_path.join(name_str).expect("pre-checked"),
WormFileMeta {
file_size: entry.metadata().len().expect("no size on SFTP file?"),
},
))
}
} else {
None
}
})
.collect())
}
async fn read(&self, path: impl AsRef<WormPath> + Send) -> eyre::Result<Self::WormFileReader> {
let real_path = self.root_dir.join(path.as_ref().as_str());