overhaul: streaming store support
ci/woodpecker/push/build: pipeline failed
ci/woodpecker/push/release: pipeline was successful

Olivier 'reivilibre' 2023-05-03 23:50:55 +01:00
parent a8e1cc45ef
commit 00dec17da0
4 changed files with 367 additions and 37 deletions

Cargo.lock (generated)

@@ -49,6 +49,12 @@ version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
 
+[[package]]
+name = "ambient-authority"
+version = "0.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec8ad6edb4840b78c5c3d88de606b22252d552b55f3a4699fbb10fc070ec3049"
+
 [[package]]
 name = "android_system_properties"
 version = "0.1.5"
@@ -371,6 +377,47 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
 
+[[package]]
+name = "cap-fs-ext"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b0c86006edbaf13bbe0cdf2d7492cff638cd24cd6b717fa2aadcab09b532353"
+dependencies = [
+ "cap-primitives",
+ "cap-std",
+ "io-lifetimes",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
+name = "cap-primitives"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613f377e5b016d3d2b9d150b8e8f711d88d42046b89294572d504596f19e59ca"
+dependencies = [
+ "ambient-authority",
+ "fs-set-times",
+ "io-extras",
+ "io-lifetimes",
+ "ipnet",
+ "maybe-owned",
+ "rustix",
+ "windows-sys 0.48.0",
+ "winx",
+]
+
+[[package]]
+name = "cap-std"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14bfc13243563bf62ee9a31b6659d2fc2bf20e75f2d3d58d87a0c420778e1399"
+dependencies = [
+ "cap-primitives",
+ "io-extras",
+ "io-lifetimes",
+ "rustix",
+]
+
 [[package]]
 name = "cc"
 version = "1.0.73"
@@ -766,6 +813,12 @@ version = "0.15.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
 
+[[package]]
+name = "duplex"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4f4b4ccdcb95f0ced5ddc8e3dbac4a2f029e9433c5ee94e9b9d7c148c86ffcd4"
+
 [[package]]
 name = "dust_style_filetree_display"
 version = "0.8.5"
@@ -881,6 +934,17 @@ dependencies = [
  "instant",
 ]
 
+[[package]]
+name = "fd-lock"
+version = "3.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39ae6b3d9530211fb3b12a95374b8b0823be812f53d09e18c5675c0146b09642"
+dependencies = [
+ "cfg-if",
+ "rustix",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "fiat-crypto"
 version = "0.1.20"
@@ -930,6 +994,17 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs-set-times"
+version = "0.19.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7833d0f115a013d51c55950a3b09d30e4b057be9961b709acb9b5b17a1108861"
+dependencies = [
+ "io-lifetimes",
+ "rustix",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "futures"
 version = "0.3.28"
@@ -1334,6 +1409,17 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "io-extras"
+version = "0.17.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fde93d48f0d9277f977a333eca8313695ddd5301dc96f7e02aeddcb0dd99096f"
+dependencies = [
+ "io-lifetimes",
+ "os_pipe",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "io-lifetimes"
 version = "1.0.9"
@@ -1342,9 +1428,26 @@ checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb"
 dependencies = [
  "hermit-abi 0.3.1",
  "libc",
+ "os_pipe",
  "windows-sys 0.45.0",
 ]
 
+[[package]]
+name = "io-streams"
+version = "0.14.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b18f85497e7fd4b4d3ada035e29273dde90f5b188349fda32a7cb1bc4457afbe"
+dependencies = [
+ "duplex",
+ "io-extras",
+ "io-lifetimes",
+ "memchr",
+ "os_pipe",
+ "parking",
+ "rustix",
+ "system-interface",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.5.0"
@@ -1495,6 +1598,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "maybe-owned"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4"
+
 [[package]]
 name = "md5"
 version = "0.7.0"
@@ -1825,6 +1934,16 @@ dependencies = [
  "hashbrown",
 ]
 
+[[package]]
+name = "os_pipe"
+version = "1.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177"
+dependencies = [
+ "libc",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "ouroboros"
 version = "0.15.6"
@@ -1864,6 +1983,12 @@ dependencies = [
  "libm",
 ]
 
+[[package]]
+name = "parking"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
+
 [[package]]
 name = "parking_lot"
 version = "0.11.2"
@@ -2279,8 +2404,10 @@ dependencies = [
  "bitflags 1.3.2",
  "errno",
  "io-lifetimes",
+ "itoa",
  "libc",
  "linux-raw-sys",
+ "once_cell",
  "windows-sys 0.45.0",
 ]
@@ -2542,6 +2669,19 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "socketpair"
+version = "0.19.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "365cc8b798dfcc4f5518e75655521d47f089ded0ef7df335c637e30b6cc717de"
+dependencies = [
+ "io-extras",
+ "io-lifetimes",
+ "rustix",
+ "uuid",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "spin"
 version = "0.5.2"
@@ -2750,6 +2890,23 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "system-interface"
+version = "0.25.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "928ebd55ab758962e230f51ca63735c5b283f26292297c81404289cda5d78631"
+dependencies = [
+ "bitflags 1.3.2",
+ "cap-fs-ext",
+ "fd-lock",
+ "io-lifetimes",
+ "os_pipe",
+ "rustix",
+ "socketpair",
+ "windows-sys 0.48.0",
+ "winx",
+]
+
 [[package]]
 name = "tempfile"
 version = "3.5.0"
@@ -3680,6 +3837,17 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "winx"
+version = "0.35.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c52a121f0fbf9320d5f2a9a5d82f6cb7557eda5e8b47fc3e7f359ec866ae960"
+dependencies = [
+ "bitflags 1.3.2",
+ "io-lifetimes",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "x25519-dalek"
 version = "2.0.0-rc.2"
@@ -3713,6 +3881,7 @@ dependencies = [
  "hostname",
  "ignore",
  "indicatif",
+ "io-streams",
  "memmap2",
  "patricia_tree",
  "serde",

Cargo.toml

@@ -47,6 +47,7 @@ flume = "0.10.14"
 async-recursion = "1.0.4"
 toml = "0.7.3"
+io-streams = "0.14.3"
 dust_style_filetree_display = "0.8.5"

yama CLI main source file (path not shown in this capture)

@@ -23,6 +23,7 @@ use std::iter::Iterator;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
 use indicatif::ProgressStyle;
 use tokio::io::{stdin, AsyncBufReadExt, BufReader};
 use tracing::{info, info_span, warn, Span, Instrument};
@@ -32,12 +33,13 @@ use tracing_subscriber::filter::filter_fn;
 use tracing_subscriber::Layer;
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
+use users::{get_current_gid, get_current_uid};
 use yama::extract::flatten_treenode;
 use yama::init::{generate_master_keyring, pack_keyring};
 use yama::open::{open_keyring_interactive, open_pile, pre_open_keyring, update_cache};
 use yama::pile_connector::PileConnectionScheme;
 use yama::scan::create_uidgid_lookup_tables;
-use yama::storing::{assemble_and_write_indices, StoragePipeline};
+use yama::storing::{assemble_and_write_indices, StoragePipeline, StoringBloblogWriters, StoringState};
 use yama::{extract, get_hostname, init, PROGRESS_BAR_STYLE, scan};
 use yama_midlevel_crypto::byte_layer::{ByteLayer, CborSerde};
 use yama_midlevel_crypto::chunk_id::ChunkIdKey;
@@ -47,9 +49,7 @@ use yama_pile::definitions::{
 use yama_pile::locks::LockKind;
 use yama_pile::pointers::Pointer;
 use yama_pile::tree::unpopulated::ScanEntry;
-use yama_pile::tree::{
-    assemble_tree_from_scan_entries, differentiate_node_in_place, RootTreeNode, TreeNode,
-};
+use yama_pile::tree::{assemble_tree_from_scan_entries, differentiate_node_in_place, FilesystemOwnership, FilesystemPermissions, RootTreeNode, TreeNode};
 use yama_pile::FILE_YAMA_CONNECTOR;
 
 #[derive(Clone, Debug)]
@@ -147,14 +147,11 @@ pub enum YamaCommand {
     #[command(subcommand)]
     Keyring(KeyringCommand),
 
-    /// Store a file, directory or input stream to a Yama pile.
+    /// Store a file or directory to a Yama pile.
     Store {
         source: PathBuf,
         destination: PileAndPointer,
 
-        #[arg(long)]
-        stdin: bool,
-
         #[arg(long)]
         overwrite: bool,
@@ -165,6 +162,19 @@ pub enum YamaCommand {
         parent: Option<PointerName>,
     },
 
+    /// Store an input stream to the Yama pile.
+    StoreStdin {
+        destination: PileAndPointer,
+
+        #[arg(long)]
+        overwrite: bool,
+
+        /// A name to give to the file that the stream is stored as. Otherwise it will just be
+        /// called 'stream'.
+        #[arg(short = 'n', long)]
+        name: Option<String>,
+    },
+
     /// Extract a file, directory or output stream from a Yama pile.
     Extract {
         source: PileAndPointerWithSubTree,
@@ -172,9 +182,6 @@ pub enum YamaCommand {
         #[arg(long)]
         stdout: bool,
-
-        #[arg(long)]
-        overwrite: bool,
     },
 
     // TODO Mount { ... },
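With clap's derive conventions, the new variant should surface as a store-stdin subcommand (kebab-case renaming is clap's default; the generated name is not shown in the diff). Since the handler defaults the pile path to `.`, a typical invocation run from inside the pile directory might look like `pg_dump mydb | yama store-stdin mydb-backup --name mydb.sql` (the exact `PileAndPointer` destination syntax is an assumption here, not something the diff confirms).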
@@ -346,11 +353,9 @@ async fn main() -> eyre::Result<()> {
         YamaCommand::Store {
             source,
             destination,
-            stdin,
             overwrite,
             parent,
         } => {
-            ensure!(!stdin, "stdin not supported yet");
             let pile_connector_path = destination.pile_path.unwrap_or(PathBuf::from("."));
             let keyring = pre_open_keyring(&pile_connector_path).await?;
             let keyring = open_keyring_interactive(keyring).await?;
@@ -491,12 +496,93 @@ async fn main() -> eyre::Result<()> {
                 .context("failed to write pointer")?;
 
             Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
-        }
+        },
+        YamaCommand::StoreStdin {
+            destination,
+            overwrite,
+            name,
+        } => {
+            let pile_connector_path = destination.pile_path.unwrap_or(PathBuf::from("."));
+            let keyring = pre_open_keyring(&pile_connector_path).await?;
+            let keyring = open_keyring_interactive(keyring).await?;
+            let pwc = open_pile(
+                &pile_connector_path,
+                keyring,
+                LockKind::Shared,
+                format!("{} store {:?}", get_hostname(), destination.pointer),
+            )
+            .await?;
+            update_cache(&pwc).await?;
+            let pwc = Arc::new(pwc);
+
+            let store_span = info_span!("storing");
+            // store_span.pb_set_style(&ProgressStyle::default_bar());
+            // TODO INDETERMINATE PROGRESS BAR with bytes shown?
+            store_span.pb_set_style(&ProgressStyle::default_bar().template(
+                PROGRESS_BAR_STYLE,
+            ).unwrap());
+            store_span.pb_set_message("storing files");
+            store_span.pb_set_length(1u64);
+            // TODO Dirty
+            let store_span_entered = store_span.enter();
+
+            let mut storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?;
+            let mut sbw = StoringBloblogWriters::default();
+            let stdin = std::io::BufReader::new(io_streams::StreamReader::stdin().context("failed to open stdin")?);
+            let (chunkref, size) = storing_state.store_full_stream(stdin, &mut sbw).context("Failed to store stream into Yama pile")?;
+            sbw.finish_bloblogs(&mut storing_state).await.context("Failed to finish bloblogs")?;
+
+            info!("Stream stored, writing indices...");
+
+            // Write indices for the new bloblogs we have created. This is a prerequisite for creating a pointer.
+            let chunkmaps = storing_state.new_bloblogs;
+            assemble_and_write_indices(&pwc, chunkmaps)
+                .await
+                .context("failed to assemble and write indices")?;
+
+            info!("All indices stored, writing pointer...");
+
+            // Assemble and write a pointer
+            let uid = get_current_uid() as u16;
+            let gid = get_current_gid() as u16;
+            let tree = TreeNode::NormalFile {
+                mtime: SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_millis() as u64).unwrap_or(0),
+                ownership: FilesystemOwnership { uid, gid },
+                permissions: FilesystemPermissions { mode: 0o600 },
+                size,
+                content: chunkref,
+            };
+            let (uids, gids) =
+                create_uidgid_lookup_tables(&tree).context("failed to create uid/gid tables")?;
+
+            pwc.pile
+                .write_pointer(
+                    destination.pointer.0.as_str(),
+                    overwrite,
+                    &Pointer {
+                        parent: None,
+                        root: RootTreeNode {
+                            name: name.unwrap_or_else(|| String::from("stream")),
+                            node: tree,
+                        },
+                        uids,
+                        gids,
+                    },
+                )
+                .await
+                .context("failed to write pointer")?;
+
+            Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
+        },
         YamaCommand::Extract {
             source,
             destination,
             stdout,
-            overwrite,
         } => {
             ensure!(!stdout, "stdout not supported yet");
             let pile_connector_path = source
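The new arm mirrors `Store`'s setup (keyring unlock, shared pile lock, cache refresh) but swaps the filesystem scan for a single `store_full_stream` call. Stdin is wrapped in `std::io::BufReader` around `io_streams::StreamReader` because the chunker consumes a synchronous `Read`. The stream then lands in the pointer as one synthetic `NormalFile` node: mtime is the current time in milliseconds, ownership is the invoking user's uid/gid truncated to `u16`, the mode is fixed at 0o600, and the root name comes from `--name`, defaulting to `stream`.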
@@ -555,7 +641,7 @@ async fn main() -> eyre::Result<()> {
             Arc::try_unwrap(pwc).map_err(|_| eyre!("pwc still in use; can't close down gracefully"))?.close().await?;
         }
-        other => todo!(),
+        _other => todo!(),
     }
 
     Ok(())

yama storing module source file (path not shown in this capture)

@@ -1,10 +1,11 @@
 use crate::pile_with_cache::PileWithCache;
 use dashmap::DashSet;
 use eyre::{bail, Context};
-use fastcdc::v2020::FastCDC;
+use fastcdc::v2020::{FastCDC, StreamCDC};
 use flume::{Receiver, RecvError, SendError, Sender};
 use std::cmp::Reverse;
 use std::collections::{BTreeMap, BTreeSet};
+use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::sync::Arc;
@@ -45,6 +46,28 @@ pub struct StoringState {
     pub compressor: zstd::bulk::Compressor<'static>,
 }
 
+impl StoringState {
+    pub async fn new(pwc: Arc<PileWithCache<BoxedWormFileProvider>>) -> eyre::Result<Self> {
+        let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() {
+            None => {
+                Compressor::new(get_zstd_level()).context("can't create dictless compressor")?
+            }
+            Some(dict_bytes) => Compressor::with_dictionary(get_zstd_level(), dict_bytes)
+                .context("can't create dictful compressor")?,
+        };
+
+        let chunk_id_key = pwc.pile.pile_config.chunk_id_key;
+        Ok(StoringState {
+            cache_conn: pwc.localcache.read().await?,
+            new_unflushed_chunks: Arc::new(Default::default()),
+            new_bloblogs: vec![],
+            pwc,
+            chunk_id_key,
+            compressor,
+        })
+    }
+}
+
 struct StoringIntermediate {
     /// New bloblogs that we have created but not yet written out indices for.
     pub new_bloblogs: Vec<(BloblogId, BTreeMap<ChunkId, BlobLocator>)>,
@@ -68,7 +91,7 @@ pub struct StoringBloblogWriters {
 }
 
 impl StoringBloblogWriters {
-    async fn finish_bloblogs(&mut self, ss: &mut StoringState) -> eyre::Result<()> {
+    pub async fn finish_bloblogs(&mut self, ss: &mut StoringState) -> eyre::Result<()> {
         if let Some(writer_to_finish) = self.file_contents.take() {
             let (_bloblog_path, bloblog_id, chunkmap) = writer_to_finish.finish().await?;
             ss.new_bloblogs.push((bloblog_id, chunkmap));
@@ -140,6 +163,42 @@ impl StoringState {
         })
     }
 
+    fn store_full_stream_returning_chunks(
+        &mut self,
+        store_stream: impl Read,
+        slot: &mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
+    ) -> eyre::Result<(Vec<ChunkId>, u64)> {
+        task::block_in_place(|| {
+            let mut stream_length = 0u64;
+            let mut result = Vec::new();
+
+            for chunk in StreamCDC::new(store_stream, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX) {
+                let chunk = chunk.context("failed to read in for StreamCDC")?;
+                let chunk_bytes = chunk.data.as_slice();
+                stream_length += chunk_bytes.len() as u64;
+                let chunk_id = ChunkId::compute(chunk_bytes, &self.chunk_id_key);
+                result.push(chunk_id);
+                let is_new = Handle::current().block_on(async {
+                    Ok::<bool, eyre::Report>(
+                        self.cache_conn.is_chunk_new(chunk_id).await?
+                            && self.new_unflushed_chunks.insert(chunk_id),
+                    )
+                })?;
+
+                if is_new {
+                    let compressed_bytes = self.compressor.compress(&chunk_bytes)?;
+
+                    Handle::current().block_on(async {
+                        let writer = self.obtain_bloblog_writer(slot).await?;
+                        writer.write_chunk(chunk_id, &compressed_bytes).await?;
+                        Ok::<(), eyre::Report>(())
+                    })?;
+                }
+            }
+
+            Ok((result, stream_length))
+        })
+    }
+
     pub fn store_full_slice(
         &mut self,
         store_slice: &[u8],
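`StreamCDC` is the streaming counterpart to the `FastCDC` slice chunker already used in this module: it pulls from any `std::io::Read` and yields owned chunks, which is what lets the new code chunk stdin without buffering the whole stream, at the cost of running synchronously (hence `task::block_in_place` above, with `Handle::current().block_on` to hop back into the runtime for the cache lookup and bloblog write). A minimal standalone sketch of the chunking API; the cut-point parameters are illustrative placeholders, not Yama's real `FASTCDC_MIN`/`FASTCDC_AVG`/`FASTCDC_MAX` values:

    use fastcdc::v2020::StreamCDC;
    use std::io::Cursor;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Any Read works here; stdin would do just as well as this in-memory buffer.
        let data = vec![7u8; 4 * 1024 * 1024];
        // min / average / max chunk sizes in bytes (placeholder values).
        for result in StreamCDC::new(Cursor::new(data), 64 * 1024, 256 * 1024, 1024 * 1024) {
            let chunk = result?; // each item is an owned chunk with offset, length and data
            println!("offset={} len={}", chunk.offset, chunk.length);
        }
        Ok(())
    }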
@@ -170,6 +229,39 @@ impl StoringState {
             depth,
         })
     }
+
+    /// Stores a full stream (`Read`) and returns the recursive chunk ref plus the length of the
+    /// stream.
+    pub fn store_full_stream(
+        &mut self,
+        store_stream: impl Read,
+        sbw: &mut StoringBloblogWriters,
+    ) -> eyre::Result<(RecursiveChunkRef, u64)> {
+        // First calculate all the chunk IDs needed to be written here.
+        let (mut chunk_ids, stream_length) =
+            self.store_full_stream_returning_chunks(store_stream, &mut sbw.file_contents)?;
+        let mut depth = 0;
+
+        // If we have the wrong number of chunks, we should chunk the chunk list...
+        while chunk_ids.len() != 1 {
+            let mut metachunks_list_bytes: Vec<u8> = Vec::with_capacity(chunk_ids.len() * 32);
+            for chunk_id in chunk_ids {
+                metachunks_list_bytes.extend_from_slice(&chunk_id.to_bytes());
+            }
+
+            // TODO It might be nice to store these in opposite order, so a read is a true
+            // sequential scan, i.e. (depth=3) (depth=2) (depth=1) (depth=0) ...
+            chunk_ids = self
+                .store_full_slice_returning_chunks(&metachunks_list_bytes, &mut sbw.metachunks)?;
+            depth += 1;
+        }
+
+        Ok((RecursiveChunkRef {
+            chunk_id: chunk_ids[0],
+            depth,
+        }, stream_length))
+    }
 }
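The `while chunk_ids.len() != 1` loop above is what makes the returned ref recursive: each pass serialises the current level's chunk IDs into one buffer (32 bytes per ID, going by the `with_capacity` hint) and pushes that buffer through the chunker again, bumping `depth` until a single root chunk remains. A rough self-contained sketch of the resulting depth arithmetic; the average chunk length is a made-up parameter, not a Yama constant:

    // Illustrative only: levels of metachunks needed for a stream of a given size,
    // assuming 32-byte chunk IDs and a hypothetical average chunk length.
    fn metachunk_depth(stream_len: u64, avg_chunk_len: u64) -> u32 {
        let mut chunks = (stream_len / avg_chunk_len).max(1);
        let mut depth = 0;
        while chunks > 1 {
            // Serialise this level's IDs (32 bytes each) and chunk the buffer again.
            chunks = ((chunks * 32) + avg_chunk_len - 1) / avg_chunk_len;
            depth += 1;
        }
        depth
    }

    fn main() {
        // A 10 GiB stream at ~256 KiB per chunk needs two metachunk levels.
        assert_eq!(metachunk_depth(10 << 30, 256 << 10), 2);
    }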
 
 async fn store_file(
@@ -242,25 +334,7 @@ impl StoragePipeline {
         for spw_num in 0..workers {
             let job_rx = job_rx.clone();
             let result_tx = result_tx.clone();
-            let pwc = pwc.clone();
-
-            let compressor = match pwc.pile.pile_config.zstd_dict.as_ref() {
-                None => {
-                    Compressor::new(get_zstd_level()).context("can't create dictless compressor")?
-                }
-                Some(dict_bytes) => Compressor::with_dictionary(get_zstd_level(), dict_bytes)
-                    .context("can't create dictful compressor")?,
-            };
-
-            let chunk_id_key = pwc.pile.pile_config.chunk_id_key;
-            let storing_state = StoringState {
-                cache_conn: pwc.localcache.read().await?,
-                new_unflushed_chunks: Arc::new(Default::default()),
-                new_bloblogs: vec![],
-                pwc,
-                chunk_id_key,
-                compressor,
-            };
+            let storing_state = StoringState::new(pwc.clone()).await.context("failed to create storing state")?;
 
             // make a logging span for the Storage Pipeline Workers
             let spw_span = info_span!("spw", n = spw_num);
             join_set.spawn(