Add core implementation of datman backup from remote source
This commit is contained in:
parent
998cca51eb
commit
40636a098a
@ -17,7 +17,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor, VirtualSourceKind};
|
||||
use crate::labelling::{label_node, load_labelling_rules, str_to_label, Label, State};
|
||||
use crate::tree::{scan, FileTree};
|
||||
use crate::tree::{scan, FileTree, FileTree1};
|
||||
use anyhow::anyhow;
|
||||
use arc_interner::ArcIntern;
|
||||
use chrono::{DateTime, Utc};
|
||||
@ -63,6 +63,65 @@ pub fn open_stdout_backup_process(
|
||||
Ok(child)
|
||||
}
|
||||
|
||||
pub fn label_filter_and_convert(
|
||||
tree: FileTree1<()>,
|
||||
descriptor: &Descriptor,
|
||||
desc_path: &Path,
|
||||
source_name: &str,
|
||||
dest: &DestPileDescriptor,
|
||||
) -> anyhow::Result<Option<TreeNode>> {
|
||||
info!("Labelling.");
|
||||
let mut tree = tree.replace_meta(&None);
|
||||
let labels = descriptor
|
||||
.labels
|
||||
.iter()
|
||||
.map(|l| Label(ArcIntern::new(l.clone())))
|
||||
.collect();
|
||||
let rules = load_labelling_rules(desc_path, source_name)?;
|
||||
label_node("".to_owned(), None, &mut tree, &labels, &rules)?;
|
||||
|
||||
let included_labels: HashSet<Label> = dest.included_labels.iter().map(str_to_label).collect();
|
||||
|
||||
info!("Filtering.");
|
||||
let mut unlabelled_included = false;
|
||||
if !tree.filter_inclusive(&mut |node| {
|
||||
match node.get_metadata().unwrap() {
|
||||
None => {
|
||||
// unlabelled -- include by default for safety
|
||||
unlabelled_included = true;
|
||||
true
|
||||
}
|
||||
Some(State::Excluded) => {
|
||||
// don't include excluded things
|
||||
false
|
||||
}
|
||||
Some(State::Labelled(label)) => {
|
||||
// include things only if we want the label
|
||||
included_labels.contains(&label)
|
||||
}
|
||||
Some(State::Split) => {
|
||||
// no point retaining this directory if its children aren't going to be!
|
||||
assert!(
|
||||
node.is_dir(),
|
||||
"Non-directories should not be labelled for Split!"
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}) {
|
||||
info!("Empty filter. Stopping.");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if unlabelled_included {
|
||||
warn!("Unlabelled nodes. They have been included for safety, but you should consider running\n\t'datman ilabel {}'\nat some point to assign labels.", source_name);
|
||||
}
|
||||
|
||||
let root = convert_filetree_to_yamatree(&tree);
|
||||
|
||||
Ok(Some(root))
|
||||
}
|
||||
|
||||
pub fn backup_source_to_destination(
|
||||
source: &SourceDescriptor,
|
||||
dest: &DestPileDescriptor,
|
||||
@ -81,59 +140,17 @@ pub fn backup_source_to_destination(
|
||||
info!("Scanning.");
|
||||
let tree = scan(directory)?.ok_or_else(|| anyhow!("Source does not exist."))?;
|
||||
|
||||
info!("Labelling.");
|
||||
let mut tree = tree.replace_meta(&None);
|
||||
let labels = descriptor
|
||||
.labels
|
||||
.iter()
|
||||
.map(|l| Label(ArcIntern::new(l.clone())))
|
||||
.collect();
|
||||
let rules = load_labelling_rules(desc_path, source_name)?;
|
||||
label_node("".to_owned(), None, &mut tree, &labels, &rules)?;
|
||||
|
||||
let included_labels: HashSet<Label> =
|
||||
dest.included_labels.iter().map(str_to_label).collect();
|
||||
|
||||
info!("Filtering.");
|
||||
let mut unlabelled_included = false;
|
||||
if !tree.filter_inclusive(&mut |node| {
|
||||
match node.get_metadata().unwrap() {
|
||||
None => {
|
||||
// unlabelled -- include by default for safety
|
||||
unlabelled_included = true;
|
||||
true
|
||||
}
|
||||
Some(State::Excluded) => {
|
||||
// don't include excluded things
|
||||
false
|
||||
}
|
||||
Some(State::Labelled(label)) => {
|
||||
// include things only if we want the label
|
||||
included_labels.contains(&label)
|
||||
}
|
||||
Some(State::Split) => {
|
||||
// no point retaining this directory if its children aren't going to be!
|
||||
assert!(
|
||||
node.is_dir(),
|
||||
"Non-directories should not be labelled for Split!"
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}) {
|
||||
info!("Empty filter. Stopping.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if unlabelled_included {
|
||||
warn!("Unlabelled nodes. They have been included for safety, but you should consider running\n\t'datman ilabel {}'\nat some point to assign labels.", source_name);
|
||||
}
|
||||
|
||||
let absolute_source_path = desc_path.join(directory);
|
||||
let absolute_dest_path = desc_path.join(&dest.path);
|
||||
let pile_descriptor = load_pile_descriptor(&absolute_dest_path)?;
|
||||
let pile = open_pile(&absolute_dest_path, &pile_descriptor)?;
|
||||
let root = convert_filetree_to_yamatree(&tree);
|
||||
let root = if let Some(root) =
|
||||
label_filter_and_convert(tree, descriptor, desc_path, source_name, dest)?
|
||||
{
|
||||
root
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let pointer_name = get_pointer_name_at(&source_name, Utc::now());
|
||||
|
||||
|
@ -1,14 +1,23 @@
|
||||
use crate::commands::backup::{get_pointer_name_at, label_filter_and_convert};
|
||||
use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor};
|
||||
use crate::tree::FileTree;
|
||||
use anyhow::bail;
|
||||
use anyhow::{anyhow, bail};
|
||||
use chrono::Utc;
|
||||
use log::info;
|
||||
use std::io::{Read, Write};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::sync::Arc;
|
||||
use yama::commands::{load_pile_descriptor, open_pile};
|
||||
use yama::definitions::TreeNode;
|
||||
use yama::pile::RawPile;
|
||||
use yama::pile::{Pile, RawPile};
|
||||
use yama::remote::responder::Responder;
|
||||
use yama::remote::{read_message, write_message};
|
||||
use yama::utils::get_number_of_workers;
|
||||
|
||||
// SECURITY WARNING: the system you connect to using this mechanism will receive full access to
|
||||
// your Yama pile. Do NOT connect to untrusted or compromised systems using this mechanism (yet).
|
||||
|
||||
pub fn introduction<R: Read, W: Write>(read: &mut R, write: &mut W) -> anyhow::Result<()> {
|
||||
let version = env!("CARGO_PKG_VERSION");
|
||||
write_message(
|
||||
@ -63,3 +72,105 @@ pub fn chunking<R: Read + Send + 'static, W: Write + Send + 'static, RP: RawPile
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn backup_remote_source_to_destination(
|
||||
source: &SourceDescriptor,
|
||||
dest: &DestPileDescriptor,
|
||||
descriptor: &Descriptor,
|
||||
desc_path: &Path,
|
||||
source_name: &str,
|
||||
dest_name: &str,
|
||||
user_at_host: &str,
|
||||
_num_workers: u8,
|
||||
) -> anyhow::Result<()> {
|
||||
match source {
|
||||
SourceDescriptor::DirectorySource {
|
||||
hostname: _,
|
||||
directory,
|
||||
} => {
|
||||
info!(
|
||||
"Looking to backup {} (from {}) to {}",
|
||||
source_name, user_at_host, dest_name
|
||||
);
|
||||
|
||||
let connection = Command::new("ssh")
|
||||
.arg(user_at_host)
|
||||
.arg("--")
|
||||
.arg("datman")
|
||||
.arg("_backup_source_responder")
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()?;
|
||||
|
||||
let mut read = connection.stdout.expect("Requested stdout");
|
||||
let mut write = connection.stdin.expect("Requested stdin");
|
||||
|
||||
// first start off with an introduction
|
||||
info!("Connecting...");
|
||||
introduction(&mut read, &mut write)?;
|
||||
|
||||
// then request to scan
|
||||
info!("Requesting scan... (this may take some time)");
|
||||
let scan_result = scanning(&mut read, &mut write, directory.as_ref())?
|
||||
.ok_or_else(|| anyhow!("Remote scan failed (does the directory exist?)"))?;
|
||||
|
||||
let root =
|
||||
label_filter_and_convert(scan_result, descriptor, desc_path, source_name, dest)?
|
||||
.ok_or_else(|| anyhow!("Empty filter..."))?;
|
||||
|
||||
let absolute_dest_path = desc_path.join(&dest.path);
|
||||
let pile_descriptor = load_pile_descriptor(&absolute_dest_path)?;
|
||||
let pile = open_pile(&absolute_dest_path, &pile_descriptor)?;
|
||||
|
||||
let pointer_name = get_pointer_name_at(&source_name, Utc::now());
|
||||
|
||||
info!("Will write as pointer {:?}.", pointer_name);
|
||||
|
||||
info!("Searching for suitable parents.");
|
||||
let mut parent: Option<String> = None;
|
||||
let prefix = format!("{}+", source_name);
|
||||
for pointer in pile.list_pointers()?.iter() {
|
||||
if pointer.starts_with(&prefix) {
|
||||
match parent.as_ref() {
|
||||
None => {
|
||||
parent = Some(pointer.to_owned());
|
||||
}
|
||||
Some(cur_parent) => {
|
||||
if cur_parent < pointer {
|
||||
parent = Some(pointer.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match parent.as_ref() {
|
||||
Some(parent) => {
|
||||
info!("Using parent: {}", parent);
|
||||
}
|
||||
None => {
|
||||
info!("No suitable parent found.");
|
||||
}
|
||||
}
|
||||
|
||||
info!("Storing remote using Yama (this may take some time)...");
|
||||
|
||||
let raw_pile = Arc::new(pile.raw_pile);
|
||||
let pile = Pile::new(raw_pile.clone());
|
||||
|
||||
chunking(read, write, pointer_name.clone(), &root, raw_pile)?;
|
||||
|
||||
info!("Stored! Checking for existence...");
|
||||
if pile.list_pointers()?.contains(&pointer_name) {
|
||||
info!("Exists!");
|
||||
} else {
|
||||
bail!("Pointer {:?} does not exist...", &pointer_name);
|
||||
}
|
||||
}
|
||||
SourceDescriptor::VirtualSource { .. } => {
|
||||
unimplemented!("Can't currently back up virtualsources on remotes...")
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user