From 3c09e741d464b1d09376c83ef4ab7ef2066a4fa3 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 9 Jan 2022 07:43:13 +0000 Subject: [PATCH] Add some metrics and emit metric logs from Datman --- Cargo.lock | 148 +++++++++++++++++++++++++++++++---- datman/Cargo.toml | 2 + datman/src/bin/datman.rs | 11 +++ yama/Cargo.toml | 2 +- yama/src/pile/compression.rs | 39 +++++++++ 5 files changed, 186 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9afe12e..f543e35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.3.8" @@ -39,9 +45,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.41" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" +checksum = "84450d0b4a8bd1ba4144ce8ce718fbc5d071358b1e5384bace6536b3d1f2d5b3" [[package]] name = "arc-interner" @@ -72,6 +78,37 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "bare-metrics-core" +version = "0.1.0" +dependencies = [ + "hdrhistogram", + "serde", + "serde_bare 0.5.0", +] + +[[package]] +name = "bare-metrics-recorder" +version = "0.1.0" +dependencies = [ + "anyhow", + "bare-metrics-core", + "crossbeam-channel", + "dashmap", + "fxhash", + "hdrhistogram", + "log", + "metrics", + "serde_bare 0.5.0", + "thiserror", +] + +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + [[package]] name = "bitflags" version = "1.2.1" @@ -196,6 +233,15 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "crc32fast" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "738c290dfaea84fc1ca15ad9c168d083b05a714e1efddd8edaab678dc28d2836" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -262,6 +308,7 @@ version = "0.5.0-alpha.2" dependencies = [ "anyhow", "arc-interner", + "bare-metrics-recorder", "byteorder", "chrono", "clap", @@ -273,6 +320,7 @@ dependencies = [ "indicatif", "itertools 0.10.1", "log", + "metrics", "serde", "serde_json", "termion", @@ -357,6 +405,18 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f" +[[package]] +name = "flate2" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" +dependencies = [ + "cfg-if 1.0.0", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "fs2" version = "0.4.3" @@ -367,6 +427,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "gcc" version = "0.3.55" @@ -419,6 +488,20 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "hdrhistogram" +version = "7.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6490be71f07a5f62b564bc58e36953f675833df11c7e4a0647bee7a07ca1ec5e" +dependencies = [ + "base64", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.3.3" @@ -583,13 +666,12 @@ dependencies = [ [[package]] name = "metrics" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a00f42f354a2ed4894db863b3a4db47aef2d2e4435b937221749bd37a8a7aaa8" +checksum = "55586aa936c35f34ba8aa5d97356d554311206e1ce1f9e68fe7b07288e5ad827" dependencies = [ "ahash 0.7.6", "metrics-macros", - "proc-macro-hack", ] [[package]] @@ -606,6 +688,22 @@ dependencies = [ "syn", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "nix" version = "0.17.0" @@ -631,6 +729,17 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -914,9 +1023,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.126" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a" dependencies = [ "serde_derive", ] @@ -931,10 +1040,19 @@ dependencies = [ ] [[package]] -name = "serde_derive" -version = "1.0.126" +name = "serde_bare" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "51c55386eed0f1ae957b091dc2ca8122f287b60c79c774cbe3d5f2b69fded660" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.133" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537" dependencies = [ "proc-macro2", "quote", @@ -1055,18 +1173,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.25" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa6f76457f59514c7eeb4e59d891395fab0b2fd1d40723ae737d64153392e9c6" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.25" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a36768c0fbf1bb15eca10defa29526bda730a2376c2ab4393ccfa16fb1a318d" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" dependencies = [ "proc-macro2", "quote", @@ -1241,7 +1359,7 @@ dependencies = [ "rusqlite", "rustyline", "serde", - "serde_bare", + "serde_bare 0.3.0", "sodiumoxide", "sshish", "temp-dir", diff --git a/datman/Cargo.toml b/datman/Cargo.toml index 4d2e3f5..7559721 100644 --- a/datman/Cargo.toml +++ b/datman/Cargo.toml @@ -31,3 +31,5 @@ chrono = "0.4.19" itertools = "0.10.1" hostname = "0.3.1" yama = { path = "../yama", version = "0.5.0-alpha.1" } +metrics = "0.17.1" +bare-metrics-recorder = { path = "../../../libraries/bare-metrics/bare-metrics-recorder" } diff --git a/datman/src/bin/datman.rs b/datman/src/bin/datman.rs index 89ecc8a..1667476 100644 --- a/datman/src/bin/datman.rs +++ b/datman/src/bin/datman.rs @@ -15,12 +15,14 @@ You should have received a copy of the GNU General Public License along with Yama. If not, see . */ +use std::fs::File; use std::path::{Path, PathBuf}; use clap::Parser; use env_logger::Env; use anyhow::bail; +use bare_metrics_recorder::recording::BareMetricsRecorderCore; use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, TimeZone, Utc}; use datman::commands::backup::{backup_all_sources_to_destination, backup_source_to_destination}; use datman::commands::ilabel::interactive_labelling_session; @@ -169,6 +171,15 @@ fn with_exitcode(result: anyhow::Result) { fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + let now = Utc::now(); + + let (shard, stopper) = BareMetricsRecorderCore::new(File::create(format!( + "/tmp/datman_{}.baremetrics", + now.format("%F_%H%M%S") + ))?) + .start("datman".to_string())?; + shard.install_as_metrics_recorder()?; + let opts: DatmanCommand = DatmanCommand::parse(); match opts { diff --git a/yama/Cargo.toml b/yama/Cargo.toml index bb09dc0..4fdbedc 100644 --- a/yama/Cargo.toml +++ b/yama/Cargo.toml @@ -39,7 +39,7 @@ rusqlite = "0.24.2" chrono = "0.4.19" rustyline = "7.1.0" derivative = "2.2.0" -metrics = "0.17.0" +metrics = "0.17.1" [dev-dependencies] temp-dir = "0.1.11" diff --git a/yama/src/pile/compression.rs b/yama/src/pile/compression.rs index 0675ea0..d233e33 100644 --- a/yama/src/pile/compression.rs +++ b/yama/src/pile/compression.rs @@ -23,6 +23,7 @@ use anyhow::anyhow; use crossbeam_channel::{Receiver, Sender}; use derivative::Derivative; use log::error; +use metrics::{register_counter, Unit}; use zstd::block::{Compressor, Decompressor}; use crate::definitions::ChunkId; @@ -58,6 +59,37 @@ impl RawPileCompressor { underlying: R, settings: CompressionSettings, ) -> anyhow::Result<(Self, Vec>)> { + register_counter!( + "compressor_in_bytes", + Unit::Bytes, + "Number of bytes that have been fed into the compressor" + ); + register_counter!( + "compressor_out_bytes", + Unit::Bytes, + "Number of bytes that have come out of the compressor" + ); + register_counter!( + "compressor_chunks", + Unit::Count, + "Number of chunks that have been compressed" + ); + register_counter!( + "decompressor_in_bytes", + Unit::Bytes, + "Number of bytes that have been fed into the decompressor" + ); + register_counter!( + "decompressor_out_bytes", + Unit::Bytes, + "Number of bytes that have come out of the decompressor" + ); + register_counter!( + "decompressor_chunks", + Unit::Count, + "Number of chunks that have been decompressed" + ); + if settings.num_compressors == 0 && settings.num_decompressors == 0 { // optimisation for when we're only building a pipeline: we don't want to return Ok(( @@ -204,8 +236,15 @@ impl RawPileCompressor { let bytes = compressor.compress(&bytes, level)?; let out_bytes = bytes.len(); next_stage.send((chunk_id, bytes))?; + // Per-worker metrics + // TODO rename metrics::counter!("compressor_bytes_input", in_bytes as u64, "id" => worker_id); metrics::counter!("compressor_bytes_output", out_bytes as u64, "id" => worker_id); + + // Global metrics + metrics::counter!("compressor_in_bytes", in_bytes as u64); + metrics::counter!("compressor_out_bytes", out_bytes as u64); + metrics::increment_counter!("compressor_chunks"); } Ok(())