overhaul: streaming extract support
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/release Pipeline was successful Details

This commit is contained in:
Olivier 'reivilibre' 2023-05-04 23:56:35 +01:00
parent 00dec17da0
commit 8e5649597b
2 changed files with 129 additions and 6 deletions

View File

@ -16,7 +16,7 @@ along with Yama. If not, see <https://www.gnu.org/licenses/>.
*/
use clap::{Parser, Subcommand};
use eyre::{bail, ensure, eyre, Context, ContextCompat};
use eyre::{bail, eyre, Context, ContextCompat};
use patricia_tree::PatriciaMap;
use std::borrow::Cow;
use std::iter::Iterator;
@ -179,9 +179,11 @@ pub enum YamaCommand {
Extract {
source: PileAndPointerWithSubTree,
destination: PathBuf,
},
#[arg(long)]
stdout: bool,
/// Extract an output stream from a Yama pile.
ExtractStdout {
source: PileAndPointerWithSubTree,
},
// TODO Mount { ... },
@ -582,9 +584,7 @@ async fn main() -> eyre::Result<()> {
YamaCommand::Extract {
source,
destination,
stdout,
} => {
ensure!(!stdout, "stdout not supported yet");
let pile_connector_path = source
.pile_path
.as_ref()
@ -640,7 +640,78 @@ async fn main() -> eyre::Result<()> {
.await?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
}
},
YamaCommand::ExtractStdout {
source,
} => {
let pile_connector_path = source
.pile_path
.as_ref()
.map(|p| p.as_ref())
.unwrap_or(Path::new("."));
let keyring = pre_open_keyring(&pile_connector_path).await?;
let keyring = open_keyring_interactive(keyring).await?;
let pwc = Arc::new(open_pile(
&pile_connector_path,
keyring,
LockKind::Shared,
format!("{} store {:?}", get_hostname(), source.pointer),
)
.await?);
update_cache(&pwc).await?;
let pointer = pwc
.read_pointer_fully_integrated(source.pointer.0.as_str())
.await
.context("failed to read pointer")?
.with_context(|| {
format!(
"it appears that the pointer {:?} does not exist",
source.pointer
)
})?;
assert!(pointer.parent.is_none());
let node = if source.sub_tree.is_empty() {
&pointer.root.node
} else {
let mut current = &pointer.root.node;
for subpath in source.sub_tree.split('/') {
if let TreeNode::Directory { children, .. } = current {
current = children.get(subpath).with_context(|| {
format!("can't descend into {subpath}: doesn't exist in directory.")
})?;
} else {
bail!("can't descend into {subpath}; parent isn't a directory...");
}
}
current
};
let chunkref = match node {
TreeNode::NormalFile { content, .. } => {
content
}
TreeNode::Directory { .. } => {
bail!("Can't extract `Directory` to stdout!");
}
TreeNode::SymbolicLink { .. } => {
bail!("Can't extract `SymbolicLink` to stdout!");
}
TreeNode::Deleted => {
bail!("Can't extract `Deleted` to stdout!");
}
};
let extract_span = info_span!("extract_files");
let stream = std::io::BufWriter::new(io_streams::StreamWriter::stdout().context("failed to open stdout")?);
extract::unpack_sync_stream(&pwc, *chunkref, stream)
.instrument(extract_span)
.await?;
Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
},
_other => todo!(),
}

View File

@ -7,6 +7,7 @@ use patricia_tree::PatriciaMap;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fs::Permissions;
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::Arc;
@ -235,6 +236,57 @@ pub async fn unpack_files(
}.instrument(unpack_span).await
}
/// Extracts a single file's content from the pile and writes the recovered
/// bytes, in order, to `stream` (used by `yama extract-stdout` to stream to
/// stdout rather than unpacking to the filesystem).
///
/// Parameters:
/// - `pwc`: shared handle to the opened pile (with cache) to read chunks from.
/// - `chunkref`: recursive chunk reference identifying the file content to expand.
/// - `stream`: any synchronous `Write` sink; flushed when the retrieval job completes.
///
/// Returns `Ok(())` once the job completed and the stream was flushed;
/// errors on write/flush failure or if retrieval ended without completing.
pub async fn unpack_sync_stream(pwc: &Arc<PileWithCache<BoxedWormFileProvider>>,
chunkref: RecursiveChunkRef,
mut stream: impl Write,
) -> eyre::Result<()> {
// Expand the single recursive chunkref into its flat list of leaf chunks.
// The `()` key is a placeholder: there is only one "file" in this job, so no
// per-file identifier is needed (unlike the multi-file unpack path).
let expanded_chunkrefs = expand_chunkrefs(
pwc,
vec![((), chunkref)].into_iter(),
)
.await?;
// Total leaf chunk count; used only as the progress-bar length below.
let total_chunks = expanded_chunkrefs.iter().map(|(_, cs)| cs.len() as u64).sum::<u64>();
let unpack_span = info_span!("unpack_files");
async move {
// Re-grab the span so the progress-bar helpers apply to the span this
// async block is instrumented with.
let unpack_span = Span::current();
// Attach a progress bar to the tracing span (tracing-indicatif-style
// `pb_*` extension methods), one tick per chunk written.
unpack_span.pb_set_style(&ProgressStyle::default_bar().template(
PROGRESS_BAR_STYLE,
).unwrap());
unpack_span.pb_set_message("unpack");
unpack_span.pb_set_length(total_chunks);
// Kick off chunk retrieval; decoded parts arrive over the channel in order.
// NOTE(review): in-order delivery is assumed from the single-job setup —
// confirm against the retriever's contract.
let (file_part_retriever, _) =
lookup_chunkrefs_and_create_retriever(pwc, expanded_chunkrefs).await?;
let mut done = false;
// Loop until the retriever drops its sender end (recv_async then errors).
while let Ok(next_part) = file_part_retriever.recv_async().await {
match next_part {
RetrieverResp::Blob { blob, .. } => {
// `stream` is a synchronous writer, so wrap the blocking
// write in `block_in_place` to avoid stalling the async
// runtime's worker thread.
tokio::task::block_in_place(|| {
stream.write_all(&blob)
}).context("Failed to write to output stream on Blob")?;
unpack_span.pb_inc(1);
}
RetrieverResp::JobComplete(_) => {
// Our single job finished: flush buffered output and mark
// success. The loop itself ends when the channel closes.
tokio::task::block_in_place(|| {
stream.flush()
}).context("Failed to flush output stream on JobComplete")?;
done = true;
}
}
}
// If the channel closed before a JobComplete was seen, the retriever
// bailed out early — surface that as an extraction failure.
if !done {
bail!("There were errors extracting.");
}
Ok(())
}.instrument(unpack_span).await
}
async fn file_unpacker_writer(path: PathBuf, permissions: FilesystemPermissions, restore_permissions: bool, rx: Receiver<Option<Vec<u8>>>) -> eyre::Result<()> {
let mut oo = OpenOptions::new();
oo.write(true).create_new(true);