Add a small amount of async_backtrace tracking
This commit is contained in:
parent
a9379dba14
commit
2c14654d29
|
@ -168,6 +168,34 @@ version = "0.7.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
|
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-backtrace"
|
||||||
|
version = "0.2.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3a2a4168316d920764f19e516d4d0f5b079ecdf0ced2ea2ef08a266102a38802"
|
||||||
|
dependencies = [
|
||||||
|
"async-backtrace-attributes",
|
||||||
|
"dashmap",
|
||||||
|
"futures",
|
||||||
|
"itertools",
|
||||||
|
"loom",
|
||||||
|
"once_cell",
|
||||||
|
"pin-project-lite",
|
||||||
|
"rustc-hash",
|
||||||
|
"static_assertions",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-backtrace-attributes"
|
||||||
|
version = "0.2.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "840d2e9edec91ac974365978efc6f00781ff497e706a12306fff29ae92f8ad46"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.13",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-recursion"
|
name = "async-recursion"
|
||||||
version = "1.0.4"
|
version = "1.0.4"
|
||||||
|
@ -752,6 +780,7 @@ dependencies = [
|
||||||
name = "datman"
|
name = "datman"
|
||||||
version = "0.7.0-alpha.1"
|
version = "0.7.0-alpha.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-backtrace",
|
||||||
"bytesize",
|
"bytesize",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
|
@ -1129,6 +1158,19 @@ dependencies = [
|
||||||
"slab",
|
"slab",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "generator"
|
||||||
|
version = "0.7.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"rustversion",
|
||||||
|
"windows",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "generic-array"
|
name = "generic-array"
|
||||||
version = "0.14.7"
|
version = "0.14.7"
|
||||||
|
@ -1586,6 +1628,19 @@ dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "loom"
|
||||||
|
version = "0.5.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"generator",
|
||||||
|
"scoped-tls",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lscolors"
|
name = "lscolors"
|
||||||
version = "0.13.0"
|
version = "0.13.0"
|
||||||
|
@ -2427,6 +2482,12 @@ version = "0.1.23"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-hash"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustix"
|
name = "rustix"
|
||||||
version = "0.37.6"
|
version = "0.37.6"
|
||||||
|
@ -2464,6 +2525,12 @@ dependencies = [
|
||||||
"base64 0.21.0",
|
"base64 0.21.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustversion"
|
||||||
|
version = "1.0.14"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rxml"
|
name = "rxml"
|
||||||
version = "0.8.2"
|
version = "0.8.2"
|
||||||
|
@ -2507,6 +2574,12 @@ dependencies = [
|
||||||
"windows-sys 0.42.0",
|
"windows-sys 0.42.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "scoped-tls"
|
||||||
|
version = "1.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
@ -2652,6 +2725,16 @@ dependencies = [
|
||||||
"winapi-build",
|
"winapi-build",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook"
|
||||||
|
version = "0.3.17"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"signal-hook-registry",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
@ -3903,6 +3986,7 @@ name = "yama"
|
||||||
version = "0.7.0-alpha.1"
|
version = "0.7.0-alpha.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"appdirs",
|
"appdirs",
|
||||||
|
"async-backtrace",
|
||||||
"async-recursion",
|
"async-recursion",
|
||||||
"clap",
|
"clap",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
|
@ -3918,6 +4002,7 @@ dependencies = [
|
||||||
"memmap2",
|
"memmap2",
|
||||||
"patricia_tree",
|
"patricia_tree",
|
||||||
"serde",
|
"serde",
|
||||||
|
"signal-hook",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -3939,6 +4024,7 @@ dependencies = [
|
||||||
name = "yama_localcache"
|
name = "yama_localcache"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-backtrace",
|
||||||
"eyre",
|
"eyre",
|
||||||
"itertools",
|
"itertools",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
|
@ -3971,6 +4057,7 @@ dependencies = [
|
||||||
name = "yama_pile"
|
name = "yama_pile"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-backtrace",
|
||||||
"backtrace",
|
"backtrace",
|
||||||
"chrono",
|
"chrono",
|
||||||
"eyre",
|
"eyre",
|
||||||
|
@ -4028,6 +4115,7 @@ dependencies = [
|
||||||
name = "yama_wormfile_sftp"
|
name = "yama_wormfile_sftp"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-backtrace",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"eyre",
|
"eyre",
|
||||||
"openssh",
|
"openssh",
|
||||||
|
|
|
@ -35,4 +35,5 @@ yama_wormfile = { path = "../yama_wormfile" }
|
||||||
#yama_wormfile_sftp = { path = "../yama_wormfile_sftp" }
|
#yama_wormfile_sftp = { path = "../yama_wormfile_sftp" }
|
||||||
yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
|
yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
|
||||||
|
|
||||||
patricia_tree = "0.5.7"
|
patricia_tree = "0.5.7"
|
||||||
|
async-backtrace = "0.2.6"
|
|
@ -66,14 +66,12 @@ pub async fn backup(
|
||||||
let pwc = pwc.clone();
|
let pwc = pwc.clone();
|
||||||
|
|
||||||
let bds_span = info_span!("storing");
|
let bds_span = info_span!("storing");
|
||||||
tokio::spawn(
|
tokio::spawn(async_backtrace::frame!(async move {
|
||||||
async move {
|
backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
|
||||||
backup_dir_sources(dir_sources, pwc, new_unflushed_chunks)
|
.await
|
||||||
.await
|
.context("whilst backing up dir sources")
|
||||||
.context("whilst backing up dir sources")
|
}
|
||||||
}
|
.instrument(bds_span)))
|
||||||
.instrument(bds_span),
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// (virtual source streams) Store to bloblog writers
|
// (virtual source streams) Store to bloblog writers
|
||||||
|
@ -82,14 +80,12 @@ pub async fn backup(
|
||||||
let new_unflushed_chunks = new_unflushed_chunks.clone();
|
let new_unflushed_chunks = new_unflushed_chunks.clone();
|
||||||
let pwc = pwc.clone();
|
let pwc = pwc.clone();
|
||||||
|
|
||||||
tokio::spawn(
|
tokio::spawn(async_backtrace::frame!(async move {
|
||||||
async move {
|
backup_virtual_sources(&sources_to_backup, now, pwc, new_unflushed_chunks)
|
||||||
backup_virtual_sources(&sources_to_backup, now, pwc, new_unflushed_chunks)
|
.await
|
||||||
.await
|
.context("whilst backing up virtual sources")
|
||||||
.context("whilst backing up virtual sources")
|
}
|
||||||
}
|
.instrument(bvs_span)))
|
||||||
.instrument(bvs_span),
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let (dir_sources_and_chunkmaps, virt_sources) =
|
let (dir_sources_and_chunkmaps, virt_sources) =
|
||||||
|
|
|
@ -31,6 +31,7 @@ use tracing_subscriber::filter::filter_fn;
|
||||||
use tracing_subscriber::layer::SubscriberExt;
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
use tracing_subscriber::util::SubscriberInitExt;
|
use tracing_subscriber::util::SubscriberInitExt;
|
||||||
use tracing_subscriber::Layer;
|
use tracing_subscriber::Layer;
|
||||||
|
use yama::debugging::register_sigusr1_backtrace_helper;
|
||||||
use yama::get_hostname;
|
use yama::get_hostname;
|
||||||
use yama::open::open_lock_and_update_cache;
|
use yama::open::open_lock_and_update_cache;
|
||||||
|
|
||||||
|
@ -171,6 +172,8 @@ pub async fn main() -> eyre::Result<()> {
|
||||||
.with(indicatif_layer)
|
.with(indicatif_layer)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
|
register_sigusr1_backtrace_helper();
|
||||||
|
|
||||||
let args: DatmanArgs = dbg!(DatmanArgs::parse());
|
let args: DatmanArgs = dbg!(DatmanArgs::parse());
|
||||||
|
|
||||||
let descriptor = load_descriptor(&args.config)
|
let descriptor = load_descriptor(&args.config)
|
||||||
|
|
|
@ -51,7 +51,8 @@ io-streams = "0.14.3"
|
||||||
|
|
||||||
|
|
||||||
dust_style_filetree_display = "0.8.5"
|
dust_style_filetree_display = "0.8.5"
|
||||||
|
async-backtrace = "0.2.6"
|
||||||
|
signal-hook = "0.3.17"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
maplit = "1.0.2"
|
maplit = "1.0.2"
|
|
@ -34,6 +34,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||||
use tracing_subscriber::util::SubscriberInitExt;
|
use tracing_subscriber::util::SubscriberInitExt;
|
||||||
use tracing_subscriber::Layer;
|
use tracing_subscriber::Layer;
|
||||||
use users::{get_current_gid, get_current_uid};
|
use users::{get_current_gid, get_current_uid};
|
||||||
|
use yama::debugging::register_sigusr1_backtrace_helper;
|
||||||
use yama::extract::flatten_treenode;
|
use yama::extract::flatten_treenode;
|
||||||
use yama::init::pack_keyring;
|
use yama::init::pack_keyring;
|
||||||
use yama::open::{
|
use yama::open::{
|
||||||
|
@ -293,6 +294,8 @@ async fn main() -> eyre::Result<()> {
|
||||||
.with(indicatif_layer)
|
.with(indicatif_layer)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
|
register_sigusr1_backtrace_helper();
|
||||||
|
|
||||||
match dbg!(YamaCommand::parse()) {
|
match dbg!(YamaCommand::parse()) {
|
||||||
YamaCommand::Init {
|
YamaCommand::Init {
|
||||||
sftp,
|
sftp,
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
use tokio::signal::unix::SignalKind;
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
|
/// Registers a signal handler on SIGUSR1 that dumps a backtrace of the tokio task tree.
|
||||||
|
///
|
||||||
|
/// May be useful for debugging deadlocks etc.
|
||||||
|
pub fn register_sigusr1_backtrace_helper() {
|
||||||
|
tokio::spawn(async {
|
||||||
|
while let Some(()) = tokio::signal::unix::signal(SignalKind::user_defined1())
|
||||||
|
.unwrap()
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
"SIGUSR1 received; debug task backtrace:\n{}",
|
||||||
|
async_backtrace::taskdump_tree(false)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
|
@ -21,3 +21,5 @@ pub fn get_hostname() -> String {
|
||||||
.into_string()
|
.into_string()
|
||||||
.expect("Hostname string must be sensible.")
|
.expect("Hostname string must be sensible.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod debugging;
|
||||||
|
|
|
@ -108,11 +108,11 @@ pub fn create_fixed_retriever(
|
||||||
rint.set_up_job(job_id, job);
|
rint.set_up_job(job_id, job);
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async_backtrace::frame!(async move {
|
||||||
if let Err(e) = rint.retrieval_task().await {
|
if let Err(e) = rint.retrieval_task().await {
|
||||||
error!("retriever failed: {e:?}");
|
error!("retriever failed: {e:?}");
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
|
|
||||||
Ok(results_rx)
|
Ok(results_rx)
|
||||||
}
|
}
|
||||||
|
@ -165,13 +165,13 @@ impl RetrieverInternals {
|
||||||
|
|
||||||
let (subjob_tx, subjob_rx) = flume::unbounded();
|
let (subjob_tx, subjob_rx) = flume::unbounded();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async_backtrace::frame!(async move {
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
Self::reader_task(bloblog_reader, subjob_rx, ack_tx, completion_tx).await
|
Self::reader_task(bloblog_reader, subjob_rx, ack_tx, completion_tx).await
|
||||||
{
|
{
|
||||||
error!("error in reader for {bloblog_id:?}: {e:?}");
|
error!("error in reader for {bloblog_id:?}: {e:?}");
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
|
|
||||||
self.open_files.insert(
|
self.open_files.insert(
|
||||||
file_id,
|
file_id,
|
||||||
|
|
|
@ -58,11 +58,11 @@ impl PipelineDecompressor {
|
||||||
processing: Default::default(),
|
processing: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async_backtrace::frame!(async move {
|
||||||
if let Err(e) = pd.decompressor_manager().await {
|
if let Err(e) = pd.decompressor_manager().await {
|
||||||
eprintln!("pipeline decompressor error: {e:?}");
|
eprintln!("pipeline decompressor error: {e:?}");
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
|
|
||||||
Ok(out_rx)
|
Ok(out_rx)
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,7 @@ impl StoringBloblogWriters {
|
||||||
|
|
||||||
impl StoringState {
|
impl StoringState {
|
||||||
/// Acquire a bloblog writer handle, reusing the existing one in the slot if suitable.
|
/// Acquire a bloblog writer handle, reusing the existing one in the slot if suitable.
|
||||||
|
#[async_backtrace::framed]
|
||||||
async fn obtain_bloblog_writer<'a>(
|
async fn obtain_bloblog_writer<'a>(
|
||||||
&mut self,
|
&mut self,
|
||||||
slot: &'a mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
|
slot: &'a mut Option<BloblogWriter<Pin<Box<dyn WormFileWriter>>>>,
|
||||||
|
@ -136,6 +137,7 @@ impl StoringState {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For internal use only.
|
/// For internal use only.
|
||||||
|
#[async_backtrace::framed]
|
||||||
fn process_chunk(
|
fn process_chunk(
|
||||||
&mut self,
|
&mut self,
|
||||||
chunk_bytes: &[u8],
|
chunk_bytes: &[u8],
|
||||||
|
@ -164,6 +166,7 @@ impl StoringState {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_backtrace::framed]
|
||||||
fn store_full_slice_returning_chunks(
|
fn store_full_slice_returning_chunks(
|
||||||
&mut self,
|
&mut self,
|
||||||
store_slice: &[u8],
|
store_slice: &[u8],
|
||||||
|
@ -185,6 +188,7 @@ impl StoringState {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_backtrace::framed]
|
||||||
fn store_full_stream_returning_chunks(
|
fn store_full_stream_returning_chunks(
|
||||||
&mut self,
|
&mut self,
|
||||||
store_stream: impl Read,
|
store_stream: impl Read,
|
||||||
|
@ -208,6 +212,7 @@ impl StoringState {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_backtrace::framed]
|
||||||
pub fn store_full_slice(
|
pub fn store_full_slice(
|
||||||
&mut self,
|
&mut self,
|
||||||
store_slice: &[u8],
|
store_slice: &[u8],
|
||||||
|
@ -241,6 +246,7 @@ impl StoringState {
|
||||||
|
|
||||||
/// Stores a full stream (`Read`) and returns the recursive chunk ref plus the length of the
|
/// Stores a full stream (`Read`) and returns the recursive chunk ref plus the length of the
|
||||||
/// stream.
|
/// stream.
|
||||||
|
#[async_backtrace::framed]
|
||||||
pub fn store_full_stream(
|
pub fn store_full_stream(
|
||||||
&mut self,
|
&mut self,
|
||||||
store_stream: impl Read,
|
store_stream: impl Read,
|
||||||
|
@ -278,6 +284,7 @@ impl StoringState {
|
||||||
|
|
||||||
/// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished)
|
/// Stores a file, returning Ok(Some(...)) if fine, Ok(None) if the file doesn't exist (vanished)
|
||||||
/// or Err(...) for any other error.
|
/// or Err(...) for any other error.
|
||||||
|
#[async_backtrace::framed]
|
||||||
async fn store_file(
|
async fn store_file(
|
||||||
file_path: &Path,
|
file_path: &Path,
|
||||||
storing_state: &mut StoringState,
|
storing_state: &mut StoringState,
|
||||||
|
@ -304,6 +311,7 @@ pub struct StoragePipeline<JobName> {
|
||||||
join_set: JoinSet<eyre::Result<StoringIntermediate>>,
|
join_set: JoinSet<eyre::Result<StoringIntermediate>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_backtrace::framed]
|
||||||
async fn storage_pipeline_worker<JobName: Debug>(
|
async fn storage_pipeline_worker<JobName: Debug>(
|
||||||
job_rx: Receiver<(JobName, PathBuf)>,
|
job_rx: Receiver<(JobName, PathBuf)>,
|
||||||
result_tx: Sender<(JobName, Option<(RecursiveChunkRef, u64)>)>,
|
result_tx: Sender<(JobName, Option<(RecursiveChunkRef, u64)>)>,
|
||||||
|
@ -362,16 +370,14 @@ impl<JobName: Debug + Send + 'static> StoragePipeline<JobName> {
|
||||||
.context("failed to create storing state")?;
|
.context("failed to create storing state")?;
|
||||||
// make a logging span for the Storage Pipeline Workers
|
// make a logging span for the Storage Pipeline Workers
|
||||||
let spw_span = info_span!("spw", n = spw_num);
|
let spw_span = info_span!("spw", n = spw_num);
|
||||||
join_set.spawn(
|
join_set.spawn(async_backtrace::frame!(async move {
|
||||||
async move {
|
let result = storage_pipeline_worker(job_rx, result_tx, storing_state).await;
|
||||||
let result = storage_pipeline_worker(job_rx, result_tx, storing_state).await;
|
if let Err(ref err) = result {
|
||||||
if let Err(ref err) = result {
|
error!("Error in SPW {err:?}");
|
||||||
error!("Error in SPW {err:?}");
|
|
||||||
}
|
|
||||||
result
|
|
||||||
}
|
}
|
||||||
.instrument(spw_span),
|
result
|
||||||
);
|
}
|
||||||
|
.instrument(spw_span)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
|
|
|
@ -12,4 +12,5 @@ eyre = "0.6.8"
|
||||||
tokio = "1.27.0"
|
tokio = "1.27.0"
|
||||||
yama_pile = { path = "../yama_pile" }
|
yama_pile = { path = "../yama_pile" }
|
||||||
yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
|
yama_midlevel_crypto = { path = "../yama_midlevel_crypto" }
|
||||||
itertools = "0.10.5"
|
itertools = "0.10.5"
|
||||||
|
async-backtrace = "0.2.6"
|
|
@ -22,4 +22,5 @@ chrono = { version = "0.4.24", features = ["serde"] }
|
||||||
uuid = { version = "1.3.0", features = ["fast-rng", "v4"] }
|
uuid = { version = "1.3.0", features = ["fast-rng", "v4"] }
|
||||||
unix_mode = "0.1.3"
|
unix_mode = "0.1.3"
|
||||||
|
|
||||||
backtrace = "0.3.67"
|
backtrace = "0.3.67"
|
||||||
|
async-backtrace = "0.2.6"
|
|
@ -181,7 +181,7 @@ impl LockHandle {
|
||||||
// Good. Now start a background task for refreshing it as necessary.
|
// Good. Now start a background task for refreshing it as necessary.
|
||||||
// TODO spawn this onto a joinset and then make sure we release locks at end of program...
|
// TODO spawn this onto a joinset and then make sure we release locks at end of program...
|
||||||
let lock_path2 = lock_path.clone();
|
let lock_path2 = lock_path.clone();
|
||||||
let lock_task_join_handle = Some(tokio::spawn(async move {
|
let lock_task_join_handle = Some(tokio::spawn(async_backtrace::frame!(async move {
|
||||||
if let Err(err) = lock_renewal(
|
if let Err(err) = lock_renewal(
|
||||||
provider,
|
provider,
|
||||||
lock_path2,
|
lock_path2,
|
||||||
|
@ -194,7 +194,7 @@ impl LockHandle {
|
||||||
{
|
{
|
||||||
error!("Lock renewal task failed: {err:?}");
|
error!("Lock renewal task failed: {err:?}");
|
||||||
}
|
}
|
||||||
}));
|
})));
|
||||||
|
|
||||||
break Ok(LockHandle {
|
break Ok(LockHandle {
|
||||||
lock,
|
lock,
|
||||||
|
|
|
@ -19,3 +19,4 @@ rand = "0.8.5"
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
eyre = "0.6.8"
|
eyre = "0.6.8"
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
|
async-backtrace = "0.2.6"
|
||||||
|
|
Loading…
Reference in New Issue