Olivier 'reivilibre' 2021-07-02 20:54:30 +01:00
parent 8a0ce6754c
commit 336b7bcb7f
27 changed files with 98 additions and 71 deletions

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::path::{Path, PathBuf};
 use clap::Clap;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::Write;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor, VirtualSourceKind};
 use crate::labelling::{label_node, load_labelling_rules, str_to_label, Label, State};
 use crate::tree::{scan, FileTree};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use crate::commands::backup::POINTER_DATETIME_FORMAT;
 use crate::descriptor::load_descriptor;
 use anyhow::bail;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::path::Path;
 use anyhow::{anyhow, bail};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::io;
 use std::io::{StdinLock, Stdout, Write};
 use std::path::Path;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fs::File;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::{BufRead, BufReader, Write};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::fs::{read_link, symlink_metadata, DirEntry, Metadata};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::path::{Path, PathBuf};
 use anyhow::{bail, Context};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::cmp::min;
 use std::io;
 use std::io::{Cursor, Read, Write};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::fs::File;
 use std::io;
 use std::io::{Read, Write};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use crate::commands::{fully_integrate_pointer_node, retrieve_tree_node, store_tree_node};
 use crate::definitions::{FilesystemOwnership, FilesystemPermissions, TreeNode};
 use crate::pile::{Pile, PileDescriptor, RawPile};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::BTreeMap;
 use std::fmt::{Debug, Formatter};

View File

@@ -1,4 +1,4 @@
 pub mod checking;
 pub mod extracting;
-pub mod storing;
 pub mod pushpull;
+pub mod storing;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use crate::chunking::RecursiveUnchunker;
 use crate::commands::retrieve_tree_node;
 use crate::definitions::{ChunkId, TreeNode};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::convert::TryInto;
 use std::fs::OpenOptions;
 use std::os::unix::fs::PermissionsExt;

View File

@@ -1,27 +1,35 @@
-use crate::pile::{RawPile, PileDescriptor, PileStorage, Pile, Keyspace};
-use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
-use std::path::Path;
-use log::error;
-use crate::utils::get_number_of_workers;
+use crate::chunking::RecursiveUnchunker;
+use crate::commands::fully_load_pointer;
+use crate::definitions::{ChunkId, RecursiveChunkRef, TreeNode};
+use crate::operations::checking::VacuumRawPile;
+use crate::operations::pushpull::PushWorkerToManagerMessage::{NewTask, TaskDone};
+use crate::pile::compression::{CompressionSettings, RawPileCompressor};
 use crate::pile::integrity::RawPileIntegrityChecker;
 use crate::pile::local_sqlitebloblogs::SqliteBloblogPile;
-use crate::pile::compression::{RawPileCompressor, CompressionSettings};
-use std::sync::Arc;
-use std::fs::File;
+use crate::pile::{Keyspace, Pile, PileDescriptor, PileStorage, RawPile};
+use crate::utils::get_number_of_workers;
 use anyhow::{bail, Context};
-use std::io::Read;
-use crate::commands::fully_load_pointer;
 use crossbeam_channel::{Receiver, Sender};
-use crate::definitions::{RecursiveChunkRef, TreeNode, ChunkId};
-use std::sync::atomic::{Ordering, AtomicU32};
-use crate::operations::pushpull::PushWorkerToManagerMessage::{NewTask, TaskDone};
-use crate::operations::checking::VacuumRawPile;
-use crate::chunking::RecursiveUnchunker;
+use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
+use log::error;
 use std::collections::HashSet;
+use std::fs::File;
+use std::io::Read;
+use std::path::Path;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;

 /// Pushes chunks (and pointers) from one pile to another.
 /// This is a thorough implementation that could be slow but at least should give good confidence.
-pub fn push_to(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Arc<Box<dyn RawPile>>, to_pile: Arc<Pile<Box<dyn RawPile>>>, to_rp_bypass: Arc<Box<dyn RawPile>>, pointers: Vec<String>, make_progress_bar: bool, num_workers: u32) -> anyhow::Result<()> {
+pub fn push_to(
+    from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
+    from_rp_bypass: Arc<Box<dyn RawPile>>,
+    to_pile: Arc<Pile<Box<dyn RawPile>>>,
+    to_rp_bypass: Arc<Box<dyn RawPile>>,
+    pointers: Vec<String>,
+    make_progress_bar: bool,
+    num_workers: u32,
+) -> anyhow::Result<()> {
     let pbar = if make_progress_bar {
         ProgressBar::with_draw_target(
             1, // TODO
@@ -54,15 +62,21 @@ pub fn push_to(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Arc<
         // copy across the pointer data
         to_pile.write_pointer(&pointer, &pointer_data)?;

-        root_node.node.visit(&mut |node, _path| {
-            match node {
-                TreeNode::NormalFile { content, .. } => {
-                    to_process.push(content.clone());
-                }
-                _ => {} // nop
-            }
-            Ok(())
-        }, String::new()).expect("No fail");
+        root_node
+            .node
+            .visit(
+                &mut |node, _path| {
+                    match node {
+                        TreeNode::NormalFile { content, .. } => {
+                            to_process.push(content.clone());
+                        }
+                        _ => {} // nop
+                    }
+                    Ok(())
+                },
+                String::new(),
+            )
+            .expect("No fail");
     }

     let initial_tasks = to_process.len() as u64;
@@ -85,7 +99,14 @@ pub fn push_to(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Arc<
         std::thread::Builder::new()
             .name(format!("yama pusher {}", worker_num))
             .spawn(move || {
-                if let Err(e) = pusher_worker(from_pile, from_rp_bypass, to_pile, to_rp_bypass, jobs_rx, stat_tx) {
+                if let Err(e) = pusher_worker(
+                    from_pile,
+                    from_rp_bypass,
+                    to_pile,
+                    to_rp_bypass,
+                    jobs_rx,
+                    stat_tx,
+                ) {
                     error!("[critical!] Push worker {} FAILED: {:?}", worker_num, e);
                     critical_failures.fetch_add(1, Ordering::Relaxed);
                 }
@@ -94,14 +115,15 @@
     }

     for task in to_process {
-        stat_tx.send(NewTask(task)).expect("unbounded so should be able to send");
+        stat_tx
+            .send(NewTask(task))
+            .expect("unbounded so should be able to send");
     }

     // must drop here for ending to happen
     drop(jobs_rx);
     drop(stat_tx);

     if let Err(e) = pusher_manager(&pbar, stat_rx, jobs_tx) {
         error!("[critical!] Push manager FAILED: {:?}", e);
         critical_failures.fetch_add(1, Ordering::Relaxed);
@@ -115,7 +137,11 @@ enum PushWorkerToManagerMessage {
     TaskDone,
 }

-fn pusher_manager(pbar: &ProgressBar, update_receiver: Receiver<PushWorkerToManagerMessage>, job_queue: Sender<RecursiveChunkRef>) -> anyhow::Result<()> {
+fn pusher_manager(
+    pbar: &ProgressBar,
+    update_receiver: Receiver<PushWorkerToManagerMessage>,
+    job_queue: Sender<RecursiveChunkRef>,
+) -> anyhow::Result<()> {
     let mut outstanding = 0;
     let mut already_done = HashSet::new();
     while let Ok(status) = update_receiver.recv() {
@@ -139,9 +165,16 @@ fn pusher_manager(pbar: &ProgressBar, update_receiver: Receiver<PushWorkerToMana
     Ok(())
 }

-fn pusher_worker(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Arc<Box<dyn RawPile>>, to_pile: Arc<Pile<Box<dyn RawPile>>>, to_rp_bypass: Arc<Box<dyn RawPile>>, jobs_rx: Receiver<RecursiveChunkRef>, stat_tx: Sender<PushWorkerToManagerMessage>) -> anyhow::Result<()> {
+fn pusher_worker(
+    from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>,
+    from_rp_bypass: Arc<Box<dyn RawPile>>,
+    to_pile: Arc<Pile<Box<dyn RawPile>>>,
+    to_rp_bypass: Arc<Box<dyn RawPile>>,
+    jobs_rx: Receiver<RecursiveChunkRef>,
+    stat_tx: Sender<PushWorkerToManagerMessage>,
+) -> anyhow::Result<()> {
     while let Ok(job) = jobs_rx.recv() {
-        if ! to_pile.chunk_exists(&job.chunk_id)? {
+        if !to_pile.chunk_exists(&job.chunk_id)? {
             if let Some(bypass_chunk_data) = from_rp_bypass.read(Keyspace::Chunk, &job.chunk_id)? {
                 to_rp_bypass.write(Keyspace::Chunk, &job.chunk_id, &bypass_chunk_data)?;
             } else {
@@ -175,20 +208,30 @@ fn pusher_worker(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Ar
                 unchunker.read_exact(&mut chunk_id_buf[read_bytes..])?;
             }

-            stat_tx.send(NewTask(RecursiveChunkRef {
-                chunk_id: chunk_id_buf.clone(),
-                depth: 0
-            })).expect("Should be able to send");
+            stat_tx
+                .send(NewTask(RecursiveChunkRef {
+                    chunk_id: chunk_id_buf.clone(),
+                    depth: 0,
+                }))
+                .expect("Should be able to send");
         }

         // Then track the chunks that we read whilst doing the above
-        for needed_chunk_id in vacuum_pile.raw_pile.retrieved_chunks.lock().expect("Should be able to lock").iter() {
+        for needed_chunk_id in vacuum_pile
+            .raw_pile
+            .retrieved_chunks
+            .lock()
+            .expect("Should be able to lock")
+            .iter()
+        {
             if needed_chunk_id != &job.chunk_id {
                 // only track them if they're not the same as the one on this job.
-                stat_tx.send(NewTask(RecursiveChunkRef {
-                    chunk_id: needed_chunk_id.clone(),
-                    depth: 0
-                })).expect("Should be able to send");
+                stat_tx
+                    .send(NewTask(RecursiveChunkRef {
+                        chunk_id: needed_chunk_id.clone(),
+                        depth: 0,
+                    }))
+                    .expect("Should be able to send");
             }
         }
     }
@@ -200,10 +243,15 @@ fn pusher_worker(from_pile: Arc<Pile<Arc<Box<dyn RawPile>>>>, from_rp_bypass: Ar
 pub enum BypassLevel {
     NoBypass,
-    CompressionBypass
+    CompressionBypass,
 }

-pub fn determine_bypass_level(desc1: &PileDescriptor, dir1: &Path, desc2: &PileDescriptor, dir2: &Path) -> anyhow::Result<BypassLevel> {
+pub fn determine_bypass_level(
+    desc1: &PileDescriptor,
+    dir1: &Path,
+    desc2: &PileDescriptor,
+    dir2: &Path,
+) -> anyhow::Result<BypassLevel> {
     if desc1.compression.is_some() && desc2.compression.is_some() {
         let mut dictionary1 = Vec::new();
         let dict_path1 = dir1.join("important_zstd.dict");
@@ -232,7 +280,11 @@ pub fn determine_bypass_level(desc1: &PileDescriptor, dir1: &Path, desc2: &PileD
 /// pile, which, for example, skips performing compression operations.
 ///
 /// Return tuple: (actual pile, bypass raw pile)
-pub fn open_pile_with_work_bypass(dir: &Path, desc: &PileDescriptor, bypass_level: BypassLevel) -> anyhow::Result<(Pile<Box<dyn RawPile>>, Box<dyn RawPile>)> {
+pub fn open_pile_with_work_bypass(
+    dir: &Path,
+    desc: &PileDescriptor,
+    bypass_level: BypassLevel,
+) -> anyhow::Result<(Pile<Box<dyn RawPile>>, Box<dyn RawPile>)> {
     let num_compressors = get_number_of_workers("YAMA_COMPRESSORS");
     let num_decompressors = get_number_of_workers("YAMA_DECOMPRESSORS");
@@ -277,4 +329,4 @@ pub fn open_pile_with_work_bypass(dir: &Path, desc: &PileDescriptor, bypass_leve
             }
         }
     }
 }

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::fs::File;
 use std::io;
 use std::io::ErrorKind;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::path::PathBuf;
 use serde::{Deserialize, Serialize};
@@ -23,7 +22,7 @@ use serde::{Deserialize, Serialize};
 use crate::chunking::calculate_chunkid;
 use crate::definitions::{ChunkId, PointerData};
 use std::collections::HashSet;
-use std::sync::{Condvar, Mutex, Arc};
+use std::sync::{Arc, Condvar, Mutex};

 pub mod compression;
 pub mod encryption;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::sync::Arc;
 use std::thread;
 use std::thread::JoinHandle;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use anyhow::anyhow;
 use sodiumoxide::crypto::secretbox;
 use sodiumoxide::crypto::secretbox::{Key, Nonce, NONCEBYTES};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::hash::Hasher;
 use thiserror::Error;

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, VecDeque};
 use std::convert::{TryFrom, TryInto};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::convert::TryInto;
 use std::io::{Read, Write};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::collections::btree_map::Entry;
 use std::collections::BTreeMap;
 use std::fs::{read_link, symlink_metadata, DirEntry, Metadata};

View File

@@ -15,7 +15,6 @@ You should have received a copy of the GNU General Public License
 along with Yama. If not, see <https://www.gnu.org/licenses/>.
 */
 use std::fmt::Write;

 pub fn bytes_to_hexstring(chunkid: &[u8]) -> String {
pub fn bytes_to_hexstring(chunkid: &[u8]) -> String { pub fn bytes_to_hexstring(chunkid: &[u8]) -> String {