From 2876939b59454e4393e5ecdfe69a21c61a18a927 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 2 Apr 2023 10:52:58 +0100 Subject: [PATCH] Port to Postgres --- Cargo.lock | 343 +++-- quickpeep/Cargo.toml | 2 +- quickpeep_raker/.env | 1 + quickpeep_raker/Cargo.toml | 8 +- quickpeep_raker/build.rs | 5 + quickpeep_raker/devdb.sh | 21 + .../20230401164804_initial_raker_store.sql | 42 + quickpeep_raker/sqlx-data.json | 3 + quickpeep_raker/src/bin/qp-raker-db.rs | 173 --- quickpeep_raker/src/bin/qp-raker-onhold.rs | 73 +- quickpeep_raker/src/bin/qp-raker.rs | 62 +- quickpeep_raker/src/bin/qp-seedrake.rs | 196 +-- quickpeep_raker/src/config.rs | 14 +- quickpeep_raker/src/raking.rs | 25 +- quickpeep_raker/src/raking/task.rs | 369 ++--- quickpeep_raker/src/storage.rs | 1274 ++++++++--------- quickpeep_raker/src/storage/maintenance.rs | 97 +- .../src/storage/mdbx_helper_types.rs | 190 --- quickpeep_raker/src/storage/migrations.rs | 2 - quickpeep_raker/src/storage/records.rs | 39 +- quickpeep_utils/src/dates.rs | 10 +- shell.nix | 2 + 22 files changed, 1356 insertions(+), 1595 deletions(-) create mode 100644 quickpeep_raker/.env create mode 100644 quickpeep_raker/build.rs create mode 100644 quickpeep_raker/devdb.sh create mode 100644 quickpeep_raker/migrations/20230401164804_initial_raker_store.sql create mode 100644 quickpeep_raker/sqlx-data.json delete mode 100644 quickpeep_raker/src/bin/qp-raker-db.rs delete mode 100644 quickpeep_raker/src/storage/mdbx_helper_types.rs delete mode 100644 quickpeep_raker/src/storage/migrations.rs diff --git a/Cargo.lock b/Cargo.lock index 7c731ee..5750bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,7 +15,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab3f32d1eb0f323dcdd51cbb8d68cff415153870ff3bd60e12d5d56298bfcb1" dependencies = [ "addr", - "base64", + "base64 0.13.0", "bitflags", "flate2", "idna", @@ -206,9 +206,9 @@ dependencies = [ [[package]] name = "atoi" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" dependencies = [ "num-traits", ] @@ -318,6 +318,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" + [[package]] name = "bindgen" version = "0.59.2" @@ -394,15 +400,6 @@ dependencies = [ "generic-array 0.12.4", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array 0.14.5", -] - [[package]] name = "block-buffer" version = "0.10.2" @@ -700,18 +697,18 @@ dependencies = [ [[package]] name = "crc" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" dependencies = [ "crc-catalog", ] [[package]] name = "crc-catalog" -version = "1.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" [[package]] name = "crc32fast" @@ -947,15 +944,6 @@ dependencies = [ "generic-array 0.12.4", ] -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array 0.14.5", -] - [[package]] name = "digest" version = "0.10.3" @@ -978,10 +966,30 @@ dependencies = [ ] [[package]] -name = "dotenv" -version = "0.15.0" +name = "dirs" +version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "downcast-rs" @@ -1009,6 +1017,9 @@ name = "either" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +dependencies = [ + "serde", +] [[package]] name = "encoding_rs" @@ -1243,9 +1254,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" @@ -1349,13 +1360,13 @@ checksum = "aa12dfaa57be769c6681b4d193398cae8db7f7b9af3e86d362d7f0a3c294a1a0" dependencies = [ "anyhow", "ring", - "rustls", + "rustls 0.19.1", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.22.0", "url", - "webpki", - "webpki-roots", + "webpki 0.21.4", + "webpki-roots 0.21.1", "x509-signature", ] @@ -1453,12 +1464,21 @@ dependencies = [ ] [[package]] -name = "hashlink" -version = "0.7.0" +name = "hashbrown" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "hashbrown", + "ahash 0.7.6", +] + +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown 0.12.3", ] [[package]] @@ -1467,7 +1487,7 @@ version = "7.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" dependencies = [ - "base64", + "base64 0.13.0", "byteorder", "crossbeam-channel", "flate2", @@ -1489,6 +1509,9 @@ name = "heck" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +dependencies = [ + 
"unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -1505,6 +1528,24 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "html5ever" version = "0.25.1" @@ -1671,7 +1712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -1890,9 +1931,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.23.2" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cafc7c74096c336d9d27145f7ebd4f4b6f95ba16aa5a282387267e6925cb58" +checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" dependencies = [ "cc", "pkg-config", @@ -2720,7 +2761,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb87f3080f6d1d69e8c564c0fcfde1d7aa8cc451ce40cae89479111f03bc0eb" dependencies = [ - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -2779,6 +2820,15 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9376a4f0340565ad675d11fc1419227faf5f60cd7ac9cb2e7185a471f30af833" +[[package]] +name = "md-5" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66b48670c893079d3c2ed79114e3644b7004df1c361a4e0ad52e2e6940d07c3d" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "mdbx-sys" version = "0.11.4-git.20210105" @@ -2924,7 +2974,7 @@ dependencies = [ "atomic-shim", "crossbeam-epoch", "crossbeam-utils", - "hashbrown", + "hashbrown 0.11.2", "metrics 0.18.1", "num_cpus", "parking_lot 0.11.2", @@ -3219,12 +3269,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "openssl" version = "0.10.38" @@ -3615,11 +3659,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "1d0dd4be24fcdcfeaa12a432d588dc59bbad6cad3510c67e74a2b6b2fc950564" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -3659,7 +3703,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "292972edad6bbecc137ab84c5e36421a4a6c979ea31d3cc73540dd04315b33e1" dependencies = [ "byteorder", - "hashbrown", + "hashbrown 0.11.2", "idna", "psl-types", ] @@ -3807,6 +3851,7 @@ dependencies = [ "diplomatic-bag", "env_logger", "feed-rs", + "futures-core", 
"futures-util", "gemini-fetch", "html5ever", @@ -3815,12 +3860,10 @@ dependencies = [ "itertools", "kuchiki", "lazy_static", - "libmdbx", "lingua", "log", "lru", "markup5ever", - "mdbx-sys", "metrics 0.18.1", "metrics-exporter-prometheus", "metrics-process-promstyle", @@ -3843,7 +3886,7 @@ dependencies = [ "signal-hook 0.3.13", "sitemap", "smartstring", - "tempfile", + "sqlx", "tikv-jemallocator", "tokio", "webp", @@ -4017,6 +4060,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom 0.2.5", + "redox_syscall", + "thiserror", +] + [[package]] name = "regex" version = "1.5.5" @@ -4066,7 +4120,7 @@ version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" dependencies = [ - "base64", + "base64 0.13.0", "bytes", "encoding_rs", "futures-core", @@ -4150,7 +4204,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b861ecaade43ac97886a512b360d01d66be9f41f3c61088b42cedf92e03d678" dependencies = [ - "base64", + "base64 0.13.0", "bitflags", "serde", ] @@ -4186,11 +4240,32 @@ version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" dependencies = [ - "base64", + "base64 0.13.0", "log", "ring", - "sct", - "webpki", + "sct 0.6.1", + "webpki 0.21.4", +] + +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct 0.7.0", + "webpki 0.22.0", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", ] [[package]] @@ -4237,6 +4312,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "seahash" version = "3.0.7" @@ -4363,20 +4448,29 @@ dependencies = [ "block-buffer 0.7.3", "digest 0.8.1", "fake-simd", - "opaque-debug 0.2.3", + "opaque-debug", +] + +[[package]] +name = "sha1" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "006769ba83e921b3085caa8334186b00cf92b4cb1a6cf4632fbccc8eff5c7549" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.3", ] [[package]] name = "sha2" -version = "0.9.9" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +checksum = "cf9db03534dff993187064c4e0c05a5708d2a9728ace9a8959b77bedf415dac5" dependencies = [ - "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest 0.9.0", - "opaque-debug 0.3.0", + "digest 0.10.3", ] [[package]] @@ -4500,9 +4594,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.1.8" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" dependencies = [ "itertools", "nom 7.1.1", @@ -4511,9 +4605,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.11" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc15591eb44ffb5816a4a70a7efd5dd87bfd3aa84c4c200401c4396140525826" +checksum = "f8de3b03a925878ed54a954f621e64bf55a3c1bd29652d0d1a17830405350188" dependencies = [ "sqlx-core", "sqlx-macros", @@ -4521,18 +4615,22 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.11" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "195183bf6ff8328bb82c0511a83faf60aacf75840103388851db61d7a9854ae3" +checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029" dependencies = [ "ahash 0.7.6", "atoi", + "base64 0.13.0", "bitflags", "byteorder", "bytes", "crc", "crossbeam-queue", + "dirs", + "dotenvy", "either", + "event-listener", "flume", "futures-channel", "futures-core", @@ -4541,16 +4639,24 @@ dependencies = [ "futures-util", "hashlink", "hex", + "hkdf", + "hmac", "indexmap", "itoa 1.0.1", "libc", "libsqlite3-sys", "log", + "md-5", "memchr", "once_cell", "paste", "percent-encoding", - "rustls", + "rand 0.8.5", + "rustls 0.20.8", + "rustls-pemfile", + "serde", + "serde_json", + "sha1", "sha2", "smallvec", "sqlformat", @@ -4559,22 +4665,25 @@ dependencies = [ "thiserror", "tokio-stream", "url", - "webpki", - "webpki-roots", + "webpki-roots 0.22.6", + "whoami", ] [[package]] name = "sqlx-macros" -version = "0.5.11" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eee35713129561f5e55c554bba1c378e2a7e67f81257b7311183de98c50e6f94" +checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" dependencies = [ - "dotenv", + "dotenvy", "either", - "heck 0.3.3", + "heck 0.4.0", + "hex", "once_cell", "proc-macro2", "quote", + "serde", + "serde_json", "sha2", "sqlx-core", "sqlx-rt", @@ -4584,13 +4693,13 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.11" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b555e70fbbf84e269ec3858b7a6515bcfe7a166a7cc9c636dd6efd20431678b6" +checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" dependencies = [ "once_cell", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", ] [[package]] @@ -4683,13 +4792,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.89" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -4705,7 +4814,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "264c2549892aa83975386a924ef8d0b8e909674c837d37ea58b4bd8739495c6e" dependencies = [ "async-trait", - "base64", + "base64 0.13.0", "bitpacking", "byteorder", "census", @@ -4994,9 +5103,20 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ - "rustls", + "rustls 0.19.1", "tokio", - "webpki", + "webpki 0.21.4", +] + 
+[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls 0.20.8", + "tokio", + "webpki 0.22.0", ] [[package]] @@ -5203,6 +5323,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" +[[package]] +name = "unicode-ident" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" + [[package]] name = "unicode-normalization" version = "0.1.19" @@ -5218,12 +5344,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" -[[package]] -name = "unicode-xid" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" - [[package]] name = "unicode_categories" version = "0.1.1" @@ -5412,13 +5532,32 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" dependencies = [ - "webpki", + "webpki 0.21.4", +] + +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki 0.22.0", ] [[package]] @@ -5436,6 +5575,16 @@ dependencies = [ "cc", ] +[[package]] +name = "whoami" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c70234412ca409cc04e864e89523cb0fc37f5e1344ebed5a3ebf4192b6b9f68" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/quickpeep/Cargo.toml b/quickpeep/Cargo.toml index 2ab0019..0bf7b9f 100644 --- a/quickpeep/Cargo.toml +++ b/quickpeep/Cargo.toml @@ -17,7 +17,7 @@ ron = "0.7.0" tower-http = { version = "0.2.5", features = ["fs"] } log = "0.4.14" env_logger = "0.9.0" -sqlx = { version = "0.5.11", features = ["sqlite", "runtime-tokio-rustls"] } +sqlx = { version = "0.6.3", features = ["sqlite", "runtime-tokio-rustls"] } itertools = "0.10.3" colour = "0.6.0" futures-util = "0.3.21" diff --git a/quickpeep_raker/.env b/quickpeep_raker/.env new file mode 100644 index 0000000..5955be7 --- /dev/null +++ b/quickpeep_raker/.env @@ -0,0 +1 @@ +DATABASE_URL=${RAKER_DATABASE_URL} diff --git a/quickpeep_raker/Cargo.toml b/quickpeep_raker/Cargo.toml index 0bee5fb..7adb3d3 100644 --- a/quickpeep_raker/Cargo.toml +++ b/quickpeep_raker/Cargo.toml @@ -34,9 +34,7 @@ bytesize = {version = "1.1.0", features = ["serde"]} chrono = "0.4.19" ### Storage -libmdbx = "0.1.1" -# Used for FFI. Must match the version in libmdbx. -mdbx-sys = "0.11.4-git.20210105" +sqlx = { version = "0.6.3", features = ["postgres", "runtime-tokio-rustls", "offline", "json"] } # For compression of emitted packs. 
0.11.1+zstd.1.5.2 zstd = "0.11.1" @@ -45,6 +43,7 @@ lazy_static = "1.4.0" bytes = "1.1.0" itertools = "0.10.3" ipnetwork = "0.18.0" +futures-core = "0.3.28" futures-util = "0.3.21" tokio = { version = "1.17.0", features = ["full"] } anyhow = "1.0.55" @@ -91,6 +90,3 @@ metrics = "0.18.1" metrics-exporter-prometheus = { version = "0.9.0", default-features = false, features = ["http-listener"] } metrics-process-promstyle = "0.18.0" bare-metrics-recorder = "0.1.0" - -[dev-dependencies] -tempfile = "3.3.0" diff --git a/quickpeep_raker/build.rs b/quickpeep_raker/build.rs new file mode 100644 index 0000000..d506869 --- /dev/null +++ b/quickpeep_raker/build.rs @@ -0,0 +1,5 @@ +// generated by `sqlx migrate build-script` +fn main() { + // trigger recompilation when a new migration is added + println!("cargo:rerun-if-changed=migrations"); +} diff --git a/quickpeep_raker/devdb.sh b/quickpeep_raker/devdb.sh new file mode 100644 index 0000000..74ce4de --- /dev/null +++ b/quickpeep_raker/devdb.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +cname=`docker run -d --rm -e POSTGRES_PASSWORD=password -p 127.0.0.10:55432:5432 postgres:15` + +onstop() { + docker stop $cname +} + +trap onstop EXIT + +export RAKER_DATABASE_URL=postgres://postgres:password@127.0.0.10:55432/postgres + +echo "Running migrations" +sqlx migrate run +echo "Preparing offline mode SQLx data" +#cargo sqlx prepare +cargo sqlx prepare -- --lib +#cargo sqlx prepare --merged -- --all-features --all-targets --lib + + + diff --git a/quickpeep_raker/migrations/20230401164804_initial_raker_store.sql b/quickpeep_raker/migrations/20230401164804_initial_raker_store.sql new file mode 100644 index 0000000..dc943cb --- /dev/null +++ b/quickpeep_raker/migrations/20230401164804_initial_raker_store.sql @@ -0,0 +1,42 @@ +-- All timestamps are in sec since 2023-01-01. + +CREATE TYPE rakeintent AS ENUM ('Any', 'Page', 'Feed', 'SiteMap', 'Icon'); + +CREATE TABLE domains ( + domain_id SERIAL PRIMARY KEY NOT NULL, + name TEXT UNIQUE NOT NULL, + domain_record JSONB NOT NULL +); + +CREATE TABLE urls ( + url_id SERIAL PRIMARY KEY NOT NULL, + domain_id INTEGER NOT NULL REFERENCES domains(domain_id), + url TEXT NOT NULL, + intent rakeintent NOT NULL, + visited_at_ts INTEGER, + + UNIQUE (domain_id, url) +); + +CREATE TABLE url_queue ( + url_id INTEGER PRIMARY KEY NOT NULL REFERENCES urls(url_id), + rake_after_ts INTEGER NOT NULL +); +-- Used for finding things to rake. +CREATE INDEX url_queue_rake_after_ts_idx ON url_queue(rake_after_ts); + + +CREATE TABLE active_domain_raffle ( + domain_id INTEGER PRIMARY KEY NOT NULL REFERENCES domains(domain_id), + raffle INTEGER UNIQUE NOT NULL +); + + +CREATE TABLE domain_backoffs ( + domain_id INTEGER PRIMARY KEY NOT NULL REFERENCES domains(domain_id), + backoff_until_ts INTEGER NOT NULL, + backoff_sec INTEGER NOT NULL, + reason TEXT NOT NULL +); +-- Used for finding things to rake. 
+CREATE INDEX domain_backoffs_backoff_until_ts_idx ON domain_backoffs(backoff_until_ts); diff --git a/quickpeep_raker/sqlx-data.json b/quickpeep_raker/sqlx-data.json new file mode 100644 index 0000000..95c8c85 --- /dev/null +++ b/quickpeep_raker/sqlx-data.json @@ -0,0 +1,3 @@ +{ + "db": "PostgreSQL" +} \ No newline at end of file diff --git a/quickpeep_raker/src/bin/qp-raker-db.rs b/quickpeep_raker/src/bin/qp-raker-db.rs deleted file mode 100644 index b23c352..0000000 --- a/quickpeep_raker/src/bin/qp-raker-db.rs +++ /dev/null @@ -1,173 +0,0 @@ -use clap::Parser; -use std::borrow::Cow; - -use std::fmt::Debug; - -use env_logger::Env; - -use anyhow::{bail, Context}; - -use colour::{dark_yellow_ln, red_ln}; -use libmdbx::{Database, TableObject, RO}; - -use std::path::PathBuf; - -use quickpeep_raker::config; - -use quickpeep_raker::storage::mdbx_helper_types::MdbxBare; -use quickpeep_raker::storage::records::{ - ActiveDomainRecord, BackingOffDomainRecord, DomainRecord, OnHoldUrlRecord, QueueUrlRecord, - UrlVisitedRecord, -}; -use quickpeep_raker::storage::{RakerStore, RakerTxn}; - -/// Seeds a raker's queue with URLs -#[derive(Clone, Debug, Parser)] -pub struct Opts { - #[clap(long = "config")] - config: Option, - - /// Table name - table: String, - - /// Key name to look up - key_name: String, - - /// Search for any prefix, not an exact match. - #[clap(long = "prefix", short = 'p')] - prefix: bool, -} - -#[tokio::main] -pub async fn main() -> anyhow::Result<()> { - env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init(); - - let opts: Opts = Opts::parse(); - - let config_path = opts - .config - .unwrap_or_else(|| PathBuf::from("quickpeep.ron")); - let config = config::RakerConfig::load(&config_path).context("Failed to load config")?; - - if !config.raker.workbench_dir.exists() { - bail!( - "Workbench directory ({:?}) doesn't exist.", - config.raker.workbench_dir - ); - } - if !config.seed_dir.exists() { - bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir); - } - - let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?; - - let txn = store.ro_txn()?; - match opts.table.as_ref() { - "queue_urls" | "urls_queue" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().queue_urls, - &txn, - )?; - } - "active_domains" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().active_domains, - &txn, - )?; - } - "active_domains_raffle" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().active_domain_raffle, - &txn, - )?; - } - "backing_off_reinstatements" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().backing_off_reinstatements, - &txn, - )?; - } - "backing_off_domains" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().backing_off_domains, - &txn, - )?; - } - "visited_urls" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().visited_urls, - &txn, - )?; - } - "domains" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().domains, - &txn, - )?; - } - "urls_on_hold" => { - inspect::>( - opts.key_name.as_ref(), - opts.prefix, - &txn.mdbx.borrow_dbs().urls_on_hold, - &txn, - )?; - } - other => { - dark_yellow_ln!("Unknown database {:?}", other); - } - } - - Ok(()) -} - -trait Inspectable { - fn inspect(&self) -> String; -} - -impl Inspectable for MdbxBare { - fn inspect(&self) -> String { - format!("{:?}", 
&self.0)
-    }
-}
-
-fn inspect<'a, IV: Inspectable + TableObject<'a> + 'static>(
-    key: &str,
-    prefix: bool,
-    database: &Database<'a>,
-    txn: &'a RakerTxn<'a, RO>,
-) -> anyhow::Result<()> {
-    if prefix {
-        let mut cur = txn.mdbx_txn.cursor(database)?;
-        for item in cur.iter_from::<Cow<'_, [u8]>, IV>(key.as_bytes()) {
-            let (k, v) = item?;
-            if !k.starts_with(key.as_bytes()) {
-                break;
-            }
-            println!("• {}", std::str::from_utf8(&k).unwrap_or(""));
-            println!("  = {}", v.inspect());
-        }
-    } else {
-        if let Some(entry) = txn.mdbx_txn.get::<IV>(database, key.as_bytes())? {
-            println!("{}", entry.inspect());
-        } else {
-            red_ln!("no value");
-        }
-    }
-
-    Ok(())
-}
diff --git a/quickpeep_raker/src/bin/qp-raker-onhold.rs b/quickpeep_raker/src/bin/qp-raker-onhold.rs
index 05bdb58..ed3446d 100644
--- a/quickpeep_raker/src/bin/qp-raker-onhold.rs
+++ b/quickpeep_raker/src/bin/qp-raker-onhold.rs
@@ -1,13 +1,11 @@
-use anyhow::{bail, Context};
+use anyhow::Context;
 use clap::Parser;
 use colour::{blue, yellow_ln};
 use env_logger::Env;
 use itertools::Itertools;
-use libmdbx::Database;
 use quickpeep_raker::config;
-use quickpeep_raker::storage::mdbx_helper_types::{MdbxBare, MdbxString};
-use quickpeep_raker::storage::records::OnHoldUrlRecord;
 use quickpeep_raker::storage::RakerStore;
+use sqlx::query;
 use std::collections::HashMap;
 use std::path::PathBuf;
 
@@ -16,12 +14,9 @@ pub struct Opts {
     #[clap(long = "config")]
     config: Option<PathBuf>,
-
-    /// Whether to show URLs instead of domains
-    #[clap(long = "urls")]
-    urls: bool,
 }
 
+// TODO re-introduce refcounting
 #[tokio::main]
 pub async fn main() -> anyhow::Result<()> {
     env_logger::Builder::from_env(
@@ -36,19 +31,11 @@ pub async fn main() -> anyhow::Result<()> {
         .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
     let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
 
-    if !config.raker.workbench_dir.exists() {
-        bail!(
-            "Workbench directory ({:?}) doesn't exist.",
-            config.raker.workbench_dir
-        );
-    }
+    let store = RakerStore::open(&config.raker.database_uri).await?;
 
-    let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?;
-    let is_urls = opts.urls;
+    let counts = count_on_hold(&store).await?;
 
     let counts = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<(u32, String)>> {
-        let counts = count_on_hold(&store, is_urls)?;
-
         let sorted_counts = counts
             .into_iter()
             .map(|(string, count)| (count, string))
@@ -60,12 +47,8 @@ pub async fn main() -> anyhow::Result<()> {
     })
     .await??;
 
-    blue!("№ Refs  ");
-    if opts.urls {
-        yellow_ln!("URL");
-    } else {
-        yellow_ln!("Domain");
-    }
+    blue!("№ URLs  ");
+    yellow_ln!("Domain");
     for (count, string) in counts {
         println!("{:>6} {}", count, string);
    }
@@ -73,25 +56,27 @@ pub async fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
-pub fn count_on_hold(store: &RakerStore, urls: bool) -> anyhow::Result<HashMap<String, u32>> {
-    let mut map: HashMap<String, u32> = Default::default();
+pub async fn count_on_hold(store: &RakerStore) -> anyhow::Result<Vec<(String, u32)>> {
+    store
+        .ro_txn(move |txn| {
+            Box::pin(async move {
+                let rows = query!(
+                    r#"
+                SELECT d.name AS "domain_name", COUNT(1) AS "url_count" FROM urls u
+                LEFT JOIN url_queue q ON u.url_id = q.url_id
+                JOIN domains d USING (domain_id)
+                WHERE u.visited_at_ts IS NULL AND q.url_id IS NULL
+                GROUP BY d.name
+                "#
+                )
+                .fetch_all(&mut *txn.txn)
+                .await?;
 
-    let txn = store.ro_txn()?;
-    let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold;
-
-    let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?;
-
-    for row in cur.iter_start::<MdbxString, MdbxBare<OnHoldUrlRecord>>() {
-        let (domain_then_url, record) = row?;
-        let mut split = domain_then_url.0.as_ref().split('\n');
-        if urls {
-            // Skip one
-            split.next();
-        }
-        let piece = split.next().context("Missing piece")?;
-        let count = map.entry(piece.to_owned()).or_insert(0);
-        *count += record.0.refs as u32;
-    }
-
-    Ok(map)
+                Ok(rows
+                    .into_iter()
+                    .map(|row| (row.domain_name, row.url_count.unwrap_or(0) as u32))
+                    .collect::<Vec<_>>())
+            })
+        })
+        .await
 }
diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs
index c0677b0..08d9eec 100644
--- a/quickpeep_raker/src/bin/qp-raker.rs
+++ b/quickpeep_raker/src/bin/qp-raker.rs
@@ -3,8 +3,7 @@ use clap::Parser;
 use env_logger::Env;
 
 use adblock::lists::RuleTypes;
-use anyhow::{anyhow, bail, ensure, Context};
-use chrono::Utc;
+use anyhow::{ensure, Context};
 use log::{debug, error, info, warn};
 use lru::LruCache;
 use metrics_exporter_prometheus::PrometheusBuilder;
@@ -14,10 +13,10 @@ use signal_hook::consts::{SIGINT, SIGTERM};
 use signal_hook::iterator::Signals;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant, SystemTime};
 use tokio::fs::File;
-use tokio::sync::{mpsc, oneshot, Notify, Semaphore};
+use tokio::sync::{mpsc, oneshot, Mutex, Notify, Semaphore};
 use tokio::time::MissedTickBehavior;
 
 use quickpeep_raker::config;
@@ -32,7 +31,6 @@ use quickpeep_structs::rake_entries::{
     AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
     SCHEMA_RAKED_REJECTIONS,
 };
-use quickpeep_utils::dates::date_to_quickpeep_days;
 
 /// The ordering is slightly important on these: more specific things should come first.
 /// This means they filter out the troublesome elements before the broader filters do.
@@ -78,13 +76,6 @@ pub async fn main() -> anyhow::Result<()> {
         .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
     let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
 
-    if !config.raker.workbench_dir.exists() {
-        bail!(
-            "Workbench directory ({:?}) doesn't exist.",
-            config.raker.workbench_dir
-        );
-    }
-
     let mut header_map = HeaderMap::new();
     header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT));
 
@@ -106,7 +97,7 @@ pub async fn main() -> anyhow::Result<()> {
         .redirect(Policy::limited(5))
         .build()?;
 
-    let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?;
+    let store = RakerStore::open(&config.raker.database_uri).await?;
 
     let mut adblock_engines = Vec::new();
     for (antifeature, name) in &ADBLOCK_FILTER_PATHS {
@@ -277,12 +268,8 @@ pub async fn main() -> anyhow::Result<()> {
 
     // Reinstate old backoffs and re-rakable URLs
     store
-        .async_rw_txn(|txn| {
-            let today = date_to_quickpeep_days(&Utc::today())?;
-            txn.reinstate_backoffs(SystemTime::now())?;
-            txn.reinstate_rerakables(today)?;
-            txn.commit()?;
-            Ok(())
+        .rw_txn(move |mut txn| {
+            Box::pin(async move { txn.reinstate_backoffs(SystemTime::now()).await })
         })
         .await?;
 
@@ -295,8 +282,9 @@ pub async fn main() -> anyhow::Result<()> {
 
     loop {
         tokio::select! {
             _ = interval.tick() => {
-                let txn = store.ro_txn()?;
-                txn.emit_datastore_metrics()?;
+                store.ro_txn(move |mut txn| Box::pin(async move {
+                    txn.emit_datastore_metrics().await
+                })).await?;
                 metrics_process_promstyle::emit_now()?;
             }
             _ = &mut dsmu_cancel_rx => {
@@ -341,14 +329,19 @@ pub async fn main() -> anyhow::Result<()> {
 
 async fn acquire_active_domain(task_context: &TaskContext) -> anyhow::Result<Option<String>> {
     // Acquire a domain for the task to run against
-    let domain = {
-        let txn = task_context.store.ro_txn()?;
-        // TODO: don't clone teh Arc here — conv to ref.
-        txn.acquire_random_active_domain(task_context.busy_domains.clone())?
-    };
+    let busy_domains = task_context.busy_domains.clone();
+    let domain = task_context
+        .store
+        .ro_txn(move |mut txn| {
+            Box::pin(async move {
+                // TODO: don't clone the Arc here — conv to ref.
+                txn.acquire_random_active_domain(busy_domains).await
+            })
+        })
+        .await?;
 
     match domain {
-        RandomActiveDomainAcquisition::GotOne { domain, record: _ } => Ok(Some(domain)),
+        RandomActiveDomainAcquisition::GotOne { domain } => Ok(Some(domain)),
         RandomActiveDomainAcquisition::AllBusy => Ok(None),
         RandomActiveDomainAcquisition::NoneLeft => Ok(None),
     }
@@ -370,7 +363,7 @@ async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> a
         if domain_to_process.is_none() && semaphore.available_permits() == max_permits {
             // There's nothing to do and nothing is being processed.
             ensure!(
-                task_context.busy_domains.lock().unwrap().is_empty(),
+                task_context.busy_domains.lock().await.is_empty(),
                 "Shutting down orchestrator but set of busy domains is not empty."
             );
         }
@@ -378,13 +371,9 @@
         tokio::select! {
             _ = tokio::time::sleep_until(next_reinstate.into()) => {
                 // Reinstate backoffs and rerakables
-                if let Err(err) = task_context.store.async_rw_txn(|txn| {
-                    txn.reinstate_backoffs(SystemTime::now())?;
-                    let today = date_to_quickpeep_days(&Utc::today())?;
-                    txn.reinstate_rerakables(today)?;
-                    txn.commit()?;
-                    Ok(())
-                }).await {
+                if let Err(err) = task_context.store.rw_txn(move |mut txn| Box::pin(async move {
+                    txn.reinstate_backoffs(SystemTime::now()).await
+                })).await {
                     error!("Error performing periodic reinstatements: {err:?}");
                 }
 
@@ -402,8 +391,7 @@
             }
             ensure!(
                 task_context.busy_domains
-                    .lock()
-                    .map_err(|_| anyhow!("busy domains set poisoned"))?
+                    .lock().await
                     .remove(&domain),
                 "Our domain was not busy after processing!"
             );
diff --git a/quickpeep_raker/src/bin/qp-seedrake.rs b/quickpeep_raker/src/bin/qp-seedrake.rs
index c586575..3eea693 100644
--- a/quickpeep_raker/src/bin/qp-seedrake.rs
+++ b/quickpeep_raker/src/bin/qp-seedrake.rs
@@ -8,6 +8,8 @@ use anyhow::{bail, Context};
 use colour::{dark_green_ln, dark_red_ln, dark_yellow, green, red, yellow_ln};
 use reqwest::{Client, Url};
 use std::path::PathBuf;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::Receiver;
 
@@ -42,17 +44,11 @@ pub async fn main() -> anyhow::Result<()> {
         .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
     let config = RakerConfig::load(&config_path).context("Failed to load config")?;
 
-    if !config.raker.workbench_dir.exists() {
-        bail!(
-            "Workbench directory ({:?}) doesn't exist.",
-            config.raker.workbench_dir
-        );
-    }
     if !config.seed_dir.exists() {
         bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir);
     }
 
-    let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?;
+    let store = RakerStore::open(&config.raker.database_uri).await?;
 
     import_seeds(store.clone(), &config).await?;
 
@@ -60,7 +56,9 @@ pub async fn main() -> anyhow::Result<()> {
     eprintln!("... re-applying seeds and weeds to on-hold URLs ...");
 
     store
-        .async_rw_txn(|txn| maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn))
+        .rw_txn(move |mut txn| {
+            Box::pin(async move { maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn).await })
+        })
         .await?;
 
     eprintln!("... done!");
@@ -137,18 +135,23 @@ async fn importer(
 ) -> anyhow::Result<SeedImportStats> {
     let mut buf = Vec::with_capacity(BATCH_SIZE);
     let mut stats = SeedImportStats::default();
-    let client = Client::new();
+    let client = Arc::new(Client::new());
     while let Some(seed) = recv.recv().await {
         buf.push(seed);
 
         if buf.len() == BATCH_SIZE {
-            import_and_flush_batch_seeds_or_weeds(
-                &store, &mut buf, &mut stats, &client, !are_weeds,
+            stats = import_and_flush_batch_seeds_or_weeds(
+                &store,
+                buf,
+                stats,
+                client.clone(),
+                !are_weeds,
             )
             .await?;
+            buf = Vec::new();
         }
     }
 
-    import_and_flush_batch_seeds_or_weeds(&store, &mut buf, &mut stats, &client, !are_weeds)
+    stats = import_and_flush_batch_seeds_or_weeds(&store, buf, stats, client.clone(), !are_weeds)
         .await?;
 
     Ok(stats)
 }
 
 async fn import_and_flush_batch_seeds_or_weeds(
     store: &RakerStore,
-    buf: &mut Vec<Seed>,
-    stats: &mut SeedImportStats,
-    client: &Client,
+    mut buf: Vec<Seed>,
+    mut stats: SeedImportStats,
+    client: Arc<Client>,
     is_seed: bool,
-) -> anyhow::Result<()> {
-    let txn = store.rw_txn()?;
-    for seed in buf.drain(..) {
-        let as_url = Url::parse(seed.url.as_str())
-            .with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
-        let domain = get_reduced_domain(&as_url)
-            .with_context(|| format!("No domain in seed URL '{as_url}'!"))?;
+) -> anyhow::Result<SeedImportStats> {
+    store
+        .rw_txn(move |mut txn| {
+            Box::pin(async move {
+                for seed in buf.drain(..) 
{ + let as_url = Url::parse(seed.url.as_str()) + .with_context(|| format!("Failed to parse {:?} as URL", seed.url))?; + let domain = get_reduced_domain(&as_url) + .with_context(|| format!("No domain in seed URL '{as_url}'!"))?; - let domain_record = txn.get_domain_record(domain.borrow())?; - let is_domain_new = domain_record.is_none(); - let mut domain_record = domain_record.unwrap_or_default(); - if is_domain_new { - stats.new_domains += 1; - } - let mut dirty = is_domain_new; + let domain_record = txn.get_domain_record(domain.borrow()).await?; + let is_domain_new = domain_record.is_none(); + let mut domain_record = domain_record.unwrap_or_default(); + if is_domain_new { + stats.new_domains += 1; + } + let mut dirty = is_domain_new; - // Register the domain. This is a no-op if it's already active or backing off. - txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned())?; + // Register the domain. This is a no-op if it's already active or backing off. + txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned()) + .await?; - let url_like = match &seed.url { - UrlOrUrlPattern::Url(url_str) => { - let url = Url::parse(url_str.as_str())?; - if is_seed { - if txn.enqueue_url(url.as_str(), None, RakeIntent::Any)? { - stats.new_urls += 1; - } else { - stats.already_present_urls += 1; + let url_like = match &seed.url { + UrlOrUrlPattern::Url(url_str) => { + let url = Url::parse(url_str.as_str())?; + if is_seed { + if txn + .enqueue_url(url.as_str(), None, RakeIntent::Any, false) + .await? + { + stats.new_urls += 1; + } else { + stats.already_present_urls += 1; + } + } + + // Seed/weed with empty prefix + dirty |= domain_record + .rakeable_path_prefixes + .insert(String::new(), is_seed) + != Some(is_seed); + + url + } + UrlOrUrlPattern::UrlPrefix(prefix) => { + let prefix_as_url = Url::parse(prefix.as_str())?; + if is_seed { + if txn + .enqueue_url( + prefix_as_url.as_str(), + None, + RakeIntent::Any, + false, + ) + .await? + { + stats.new_urls += 1; + } else { + stats.already_present_urls += 1; + } + } + + dirty |= domain_record + .rakeable_path_prefixes + .insert(prefix_as_url.path().to_string(), is_seed) + != Some(is_seed); + + prefix_as_url + } + }; + + if dirty { + txn.put_domain_record(domain.borrow(), domain_record) + .await?; + } + + if is_seed { + // look at robots.txt and discover sitemaps! + if let Some(robots_txt) = get_robots_txt_for(&url_like, &client).await? { + for sitemap in robots_txt.sitemaps { + if SUPPORTED_SCHEMES.contains(&sitemap.url.scheme()) { + txn.enqueue_url( + sitemap.url.as_str(), + None, + RakeIntent::SiteMap, + false, + ) + .await?; + stats.new_sitemaps += 1; + } + } + } } } - - // Seed/weed with empty prefix - dirty |= domain_record - .rakeable_path_prefixes - .insert(String::new(), is_seed) - != Some(is_seed); - - url - } - UrlOrUrlPattern::UrlPrefix(prefix) => { - let prefix_as_url = Url::parse(prefix.as_str())?; - if is_seed { - if txn.enqueue_url(prefix_as_url.as_str(), None, RakeIntent::Any)? { - stats.new_urls += 1; - } else { - stats.already_present_urls += 1; - } - } - - dirty |= domain_record - .rakeable_path_prefixes - .insert(prefix_as_url.path().to_string(), is_seed) - != Some(is_seed); - - prefix_as_url - } - }; - - if dirty { - txn.put_domain_record(domain.borrow(), domain_record)?; - } - - if is_seed { - // look at robots.txt and discover sitemaps! - if let Some(robots_txt) = get_robots_txt_for(&url_like, &client).await? 
{ - for sitemap in robots_txt.sitemaps { - if SUPPORTED_SCHEMES.contains(&sitemap.url.scheme()) { - txn.enqueue_url(sitemap.url.as_str(), None, RakeIntent::SiteMap)?; - stats.new_sitemaps += 1; - } - } - } - } - } - txn.commit()?; - Ok(()) + Ok(stats) + }) + }) + .await } diff --git a/quickpeep_raker/src/config.rs b/quickpeep_raker/src/config.rs index 5b9b332..67a0d5e 100644 --- a/quickpeep_raker/src/config.rs +++ b/quickpeep_raker/src/config.rs @@ -19,8 +19,9 @@ pub struct RakerOnlyConfig { /// Path to data files pub data_dir: PathBuf, - /// Path to the raker's workbench (queue etc) - pub workbench_dir: PathBuf, + /// URI to connect to Postgres. + /// e.g. `postgres://user:password@host:port/dbname` + pub database_uri: String, /// Directory where new rake packs will be emitted pub emit_dir: PathBuf, @@ -36,17 +37,19 @@ pub struct RakerOnlyConfig { pub struct RerakeTimings { /// How long, in days, between re-rakes of the same page? /// Suggested: 300 - pub page: u16, + pub page: i32, /// How long, in days, between re-rakes of feeds? /// Suggested: 10 - pub feed: u16, + pub feed: i32, /// How long, in days, between re-rakes of icons? /// Suggested: 365 - pub icon: u16, + pub icon: i32, } +pub const DAY_SEC: i32 = 86400; + impl RakerConfig { /// Loads a config at the specified path. /// Will resolve all the paths in the RakerConfig for you. @@ -57,7 +60,6 @@ impl RakerConfig { raker_config.raker.data_dir = config_dir.join(raker_config.raker.data_dir); raker_config.seed_dir = config_dir.join(raker_config.seed_dir); - raker_config.raker.workbench_dir = config_dir.join(raker_config.raker.workbench_dir); raker_config.raker.emit_dir = config_dir.join(raker_config.raker.emit_dir); Ok(raker_config) diff --git a/quickpeep_raker/src/raking.rs b/quickpeep_raker/src/raking.rs index 78e3dae..4e96a1a 100644 --- a/quickpeep_raker/src/raking.rs +++ b/quickpeep_raker/src/raking.rs @@ -126,7 +126,8 @@ impl Display for PermanentFailure { impl Error for PermanentFailure {} -#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, sqlx::Type)] +// supposedly we need this, but doesn't seem to work ... 
#[sqlx(postgres(oid = 16386))]
 pub enum RakeIntent {
     Any,
     Page,
@@ -140,11 +141,11 @@ impl FromStr for RakeIntent {
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Ok(match s.to_lowercase().as_ref() {
-            "any" => RakeIntent::Any,
-            "page" => RakeIntent::Page,
-            "feed" => RakeIntent::Feed,
-            "sitemap" => RakeIntent::SiteMap,
-            "icon" => RakeIntent::Icon,
+        Ok(match s {
+            "Any" => RakeIntent::Any,
+            "Page" => RakeIntent::Page,
+            "Feed" => RakeIntent::Feed,
+            "SiteMap" => RakeIntent::SiteMap,
+            "Icon" => RakeIntent::Icon,
             other => {
                 bail!("Unrecognised intent: {:?}", other)
             }
@@ -152,6 +153,18 @@ impl FromStr for RakeIntent {
     }
 }
 
+impl Into<&'static str> for RakeIntent {
+    fn into(self) -> &'static str {
+        match self {
+            RakeIntent::Any => "Any",
+            RakeIntent::Page => "Page",
+            RakeIntent::Feed => "Feed",
+            RakeIntent::SiteMap => "SiteMap",
+            RakeIntent::Icon => "Icon",
+        }
+    }
+}
+
 impl From<ReferenceKind> for RakeIntent {
     fn from(kind: ReferenceKind) -> Self {
         match kind {
diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs
index 8c9dfdc..b36411a 100644
--- a/quickpeep_raker/src/raking/task.rs
+++ b/quickpeep_raker/src/raking/task.rs
@@ -1,13 +1,12 @@
-use crate::config::RerakeTimings;
+use crate::config::{RerakeTimings, DAY_SEC};
 use crate::raking::references::{clean_url, references_from_urlrakes, SUPPORTED_SCHEMES};
 use crate::raking::{
     get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent,
     RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
 };
-use crate::storage::records::{DomainRecord, UrlVisitedRecord};
+use crate::storage::records::DomainRecord;
 use crate::storage::RakerStore;
 use anyhow::{anyhow, Context};
-use chrono::Utc;
 use cylon::Cylon;
 use log::{debug, warn};
 use lru::LruCache;
@@ -15,16 +14,16 @@ use metrics::increment_counter;
 use quickpeep_structs::rake_entries::{
     IconEntry, RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
 };
-use quickpeep_utils::dates::date_to_quickpeep_days;
+use quickpeep_utils::dates::QUICKPEEP_EPOCH_ST;
 use quickpeep_utils::urls::get_reduced_domain;
 use reqwest::{Client, Url};
 use std::borrow::{Borrow, Cow};
 use std::collections::HashSet;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex as StdMutex, RwLock};
-use std::time::Duration;
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, SystemTime};
 use tokio::sync::mpsc::Sender;
-use tokio::sync::{Notify, Semaphore};
+use tokio::sync::{Mutex, Notify, Semaphore};
 use tokio::time::Instant;
 
 /// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
@@ -64,7 +63,7 @@ pub struct TaskContext {
     pub raker: Arc<Raker>,
 
     /// Busy domains (that are being processed by other tasks)
-    pub busy_domains: Arc<StdMutex<HashSet<String>>>,
+    pub busy_domains: Arc<Mutex<HashSet<String>>>,
 
     /// Cache of robots.txt entries for recently-made dormant sites
     pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
@@ -94,24 +93,31 @@ impl TaskContext {
         let mut current_robot_rules: Option<Cylon> = None;
         let mut wait_until: Option<Instant> = None;
 
-        let domain_record = {
-            let txn = self.store.ro_txn()?;
-            let dr = txn.get_domain_record(&domain)?;
-            match dr {
-                None => {
-                    return Ok(());
-                }
-                Some(dr) => dr,
+        let domain2 = domain.clone();
+        let dr = self
+            .store
+            .ro_txn(move |mut txn| Box::pin(async move { txn.get_domain_record(&domain2).await }))
+            .await?;
+        let domain_record = match dr {
+            None => {
+                return Ok(());
             }
+            Some(dr) => dr,
         };
 
         while !self.graceful_stop.load(Ordering::Relaxed) {
             // Get a URL to process
-            let url = {
-                let txn = self.store.ro_txn()?;
- 
txn.choose_url_for_domain(&domain) - .context("failed to choose URL for domain")? - }; + let domain2 = domain.clone(); + let url = self + .store + .ro_txn(move |mut txn| { + Box::pin(async move { + txn.choose_url_for_domain(&domain2) + .await + .context("failed to choose URL for domain") + }) + }) + .await?; let (url_str, url_record) = if let Some(url) = url { url @@ -121,21 +127,22 @@ impl TaskContext { let domain = domain.to_owned(); let out_of_urls = self .store - .async_rw_txn(move |txn| { - // Double-check we're still out of URLs (another could have been added since - // we last checked!) - let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none(); + .rw_txn(move |mut txn| { + Box::pin(async move { + // Double-check we're still out of URLs (another could have been added since + // we last checked!) + let out_of_urls = txn.choose_url_for_domain(&domain).await?.is_none(); - if !out_of_urls { - return Ok(false); - } + if !out_of_urls { + return Ok(false); + } - // Delete the active domain from the store - txn.remove_active_domain(&domain) - .context("failed to remove active domain")?; - - txn.commit()?; - Ok(true) + // Delete the active domain from the store + txn.remove_active_domain(&domain) + .await + .context("failed to remove active domain")?; + Ok(true) + }) }) .await .context("failed to check if we're out of URLs")?; @@ -154,10 +161,8 @@ impl TaskContext { let domain = domain.clone(); let url = url.clone(); self.store - .async_rw_txn(move |txn| { - txn.dequeue_url(&domain, url.as_str())?; - txn.commit()?; - Ok(()) + .rw_txn(move |mut txn| { + Box::pin(async move { txn.dequeue_url(&domain, url.as_str()).await }) }) .await?; continue; @@ -290,24 +295,26 @@ impl TaskContext { let domain = domain.clone(); let url = url.clone(); - let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX); + let backoff = delay.as_secs().try_into().unwrap_or(i32::MAX); self.store - .async_rw_txn(move |txn| { - txn.start_backing_off( - &domain, - backoff, - url.to_string(), - TemporaryFailure { - reason: TemporaryFailureReason::ExcruciatingCrawlDelay( - delay.as_secs(), - ), - // Don't stack this up with a backoff; it's not an actual failure! - backoff_sec: 0, - }, - )?; - txn.commit()?; - Ok(()) + .rw_txn(move |mut txn| { + Box::pin(async move { + txn.start_backing_off( + &domain, + backoff, + url.to_string(), + TemporaryFailure { + reason: TemporaryFailureReason::ExcruciatingCrawlDelay( + delay.as_secs(), + ), + // Don't stack this up with a backoff; it's not an actual failure! 
+ backoff_sec: 0, + }, + ) + .await?; + Ok(()) + }) }) .await .context("failure whilst turning long crawl delay into backoff")?; @@ -319,7 +326,10 @@ impl TaskContext { /// Processes the outcome of async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result { - let today = date_to_quickpeep_days(&Utc::today())?; + let visited_on_ts = SystemTime::now() + .duration_since(*QUICKPEEP_EPOCH_ST) + .unwrap() + .as_secs() as i32; match outcome { RakeOutcome::RakedPage(page) => { self.submission @@ -335,11 +345,11 @@ impl TaskContext { .context("Reference processor shut down; can't stream references!")?; self.as_event_processor() - .process_page(url.clone(), page.page_entry, today) + .process_page(url.clone(), page.page_entry, visited_on_ts) .await .context("failure processing page for RakedPage")?; self.as_event_processor() - .process_refs(url.clone(), page.referrer_entry, today, false) + .process_refs(url.clone(), page.referrer_entry, visited_on_ts, false) .await .context("failure processing refs for RakedPage")?; @@ -357,7 +367,7 @@ impl TaskContext { .context("Reference processor shut down; can't stream references!")?; self.as_event_processor() - .process_refs(url.clone(), refs, today, true) + .process_refs(url.clone(), refs, visited_on_ts, true) .await .context("failure processing refs for RakedFeed")?; @@ -375,7 +385,7 @@ impl TaskContext { .context("Reference processor shut down; can't stream references!")?; self.as_event_processor() - .process_refs(url.clone(), refs, today, true) + .process_refs(url.clone(), refs, visited_on_ts, true) .await .context("failure processing refs for RakedSitemap")?; @@ -395,7 +405,7 @@ impl TaskContext { .await?; self.as_event_processor() - .process_icon(url.clone(), today) + .process_icon(url.clone(), visited_on_ts) .await .context("failure processing icon for RakedIcon")?; @@ -423,7 +433,7 @@ impl TaskContext { .context("Reference processor shut down; can't stream references!")?; self.as_event_processor() - .process_refs(url.clone(), refs, today, false) + .process_refs(url.clone(), refs, visited_on_ts, false) .await .context("Failure processing refs for Redirect")?; @@ -439,14 +449,15 @@ impl TaskContext { let url = url.clone(); // TODO(feature) add 1.1× the previous backoff, if there was one. 
- let new_backoff = failure.backoff_sec; + let new_backoff = failure.backoff_sec as i32; let domain = domain.into_owned(); self.store - .async_rw_txn(move |txn| { - txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?; - txn.commit()?; - Ok(()) + .rw_txn(move |mut txn| { + Box::pin(async move { + txn.start_backing_off(&domain, new_backoff, url.to_string(), failure) + .await + }) }) .await .context("failed to store backoff")?; @@ -461,7 +472,7 @@ impl TaskContext { .await .context("Rejection processor shut down; can't stream rejection!!")?; self.as_event_processor() - .process_rejection(url.clone(), today) + .process_rejection(url.clone(), visited_on_ts) .await .context("failed to process rejection for PermanentFailure")?; @@ -493,58 +504,58 @@ impl EventProcessor<'_> { &self, url: Url, page: RakedPageEntry, - datestamp: u16, + visited_on_ts: i32, ) -> anyhow::Result<()> { - let rerake_on = Some(datestamp + self.rerake_timings.page); + let rerake_on = Some(visited_on_ts + self.rerake_timings.page * DAY_SEC); self.store .as_ref() - .async_rw_txn(move |txn| { - let domain = get_reduced_domain(&url).with_context(|| { - format!("No domain for URL '{url}' for which we are processing the page!") - })?; - txn.mark_url_as_visited( - domain.as_ref(), - url.as_ref(), - UrlVisitedRecord { - last_visited_days: datestamp, - }, - rerake_on, - )?; + .rw_txn(move |mut txn| { + Box::pin(async move { + let domain = get_reduced_domain(&url).with_context(|| { + format!("No domain for URL '{url}' for which we are processing the page!") + })?; + txn.mark_url_as_visited( + domain.as_ref(), + url.as_ref(), + visited_on_ts, + rerake_on, + ) + .await?; - // If there's a favicon to be tried, add it to the list... - let favicon_url_rel = page.document.head.effective_favicon_url(); - if let Ok(favicon_url) = url.join(favicon_url_rel) { - if SUPPORTED_SCHEMES.contains(&favicon_url.scheme()) { - txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon)?; + // If there's a favicon to be tried, add it to the list... 
+ let favicon_url_rel = page.document.head.effective_favicon_url(); + if let Ok(favicon_url) = url.join(favicon_url_rel) { + if SUPPORTED_SCHEMES.contains(&favicon_url.scheme()) { + txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon, false) + .await?; + } } - } - txn.commit()?; - Ok(()) + Ok(()) + }) }) .await } - pub async fn process_icon(&self, url: Url, datestamp: u16) -> anyhow::Result<()> { - let rerake_on = Some(datestamp + self.rerake_timings.icon); + pub async fn process_icon(&self, url: Url, visited_on_ts: i32) -> anyhow::Result<()> { + let rerake_on = Some(visited_on_ts + self.rerake_timings.icon * DAY_SEC); self.store .as_ref() - .async_rw_txn(move |txn| { - let domain = get_reduced_domain(&url).with_context(|| { - format!("No domain for URL '{url}' for which we are processing an icon!") - })?; - txn.mark_url_as_visited( - domain.as_ref(), - url.as_ref(), - UrlVisitedRecord { - last_visited_days: datestamp, - }, - rerake_on, - )?; - - txn.commit()?; - Ok(()) + .rw_txn(move |mut txn| { + Box::pin(async move { + let domain = get_reduced_domain(&url).with_context(|| { + format!("No domain for URL '{url}' for which we are processing an icon!") + })?; + txn.mark_url_as_visited( + domain.as_ref(), + url.as_ref(), + visited_on_ts, + rerake_on, + ) + .await?; + Ok(()) + }) }) .await } @@ -553,93 +564,105 @@ impl EventProcessor<'_> { &self, url: Url, refs: RakedReferrerEntry, - datestamp: u16, + visited_on_ts: i32, rerakeable_feed: bool, ) -> anyhow::Result<()> { let rerake_on = if rerakeable_feed { - Some(self.rerake_timings.feed) + Some(visited_on_ts + self.rerake_timings.feed * DAY_SEC) } else { None }; self.store .as_ref() - .async_rw_txn(move |txn| { - let domain = get_reduced_domain(&url).with_context(|| { - format!("No domain for URL '{url}' for which we are processing refs!") - })?; - txn.mark_url_as_visited( - domain.as_ref(), - url.as_ref(), - UrlVisitedRecord { - last_visited_days: datestamp, - }, - rerake_on, - ) - .context("failed to mark URL as visited")?; - - // track all the referred-to URLs! - for reference in refs.references { - let ref_url = Url::parse(&reference.target).with_context(|| { - format!( - "failed to parse target URL of reference: {:?}", - reference.target - ) - })?; - let domain = get_reduced_domain(&ref_url).with_context(|| { - format!("failed to reduce domain: {:?}", reference.target) + .rw_txn(move |mut txn| { + Box::pin(async move { + let domain = get_reduced_domain(&url).with_context(|| { + format!("No domain for URL '{url}' for which we are processing refs!") })?; + txn.mark_url_as_visited( + domain.as_ref(), + url.as_ref(), + visited_on_ts, + rerake_on, + ) + .await + .context("failed to mark URL as visited")?; - // Check if this URL is an allowed URL (hence should be enqueued) - let allowed = txn - .get_domain_record(domain.borrow())? - .map(|record: DomainRecord| record.is_url_rakeable(&ref_url)) - .flatten(); + // track all the referred-to URLs! 
+ for reference in refs.references { + let ref_url = Url::parse(&reference.target).with_context(|| { + format!( + "failed to parse target URL of reference: {:?}", + reference.target + ) + })?; + let domain = get_reduced_domain(&ref_url).with_context(|| { + format!("failed to reduce domain: {:?}", reference.target) + })?; - match allowed { - Some(true) => { - let is_fresh = txn.enqueue_url( - &reference.target, - reference.last_mod, - reference.kind.into(), - )?; - if is_fresh { - increment_counter!("qprake_queue_new_url"); + // Check if this URL is an allowed URL (hence should be enqueued) + let allowed = txn + .get_domain_record(domain.borrow()) + .await? + .map(|record: DomainRecord| record.is_url_rakeable(&ref_url)) + .flatten(); + + match allowed { + Some(true) => { + let is_fresh = txn + .enqueue_url( + &reference.target, + reference.last_mod.map(|days| { + *QUICKPEEP_EPOCH_ST + + Duration::from_secs(days as u64 * DAY_SEC as u64) + }), + reference.kind.into(), + false, + ) + .await?; + if is_fresh { + increment_counter!("qprake_queue_new_url"); + } + continue; + } + Some(false) => { + // Weed! Do nothing. + } + None => { + // It's neither allowed nor weeded, so put it on hold for later inspection + txn.enqueue_url( + &reference.target, + reference.last_mod.map(|days| { + *QUICKPEEP_EPOCH_ST + + Duration::from_secs(days as u64 * DAY_SEC as u64) + }), + reference.kind.into(), + true, + ) + .await?; } - continue; - } - Some(false) => { - // Weed! Do nothing. - } - None => { - // It's neither allowed nor weeded, so put it on hold for later inspection - txn.put_url_on_hold(&reference.target, reference.kind.into())?; } } - } - - txn.commit()?; - Ok(()) + Ok(()) + }) }) .await } - pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> { + pub async fn process_rejection(&self, url: Url, visited_on_ts: i32) -> anyhow::Result<()> { self.store .as_ref() - .async_rw_txn(move |txn| { - let domain = get_reduced_domain(&url).with_context(|| { - format!("No domain for URL '{url}' for which we are processing a rejection!") - })?; - txn.mark_url_as_visited( - domain.as_ref(), - url.as_ref(), - UrlVisitedRecord { - last_visited_days: datestamp, - }, - None, - )?; - txn.commit()?; - Ok(()) + .rw_txn(move |mut txn| { + Box::pin(async move { + let domain = get_reduced_domain(&url).with_context(|| { + format!( + "No domain for URL '{url}' for which we are processing a rejection!" 
+ ) + })?; + txn.mark_url_as_visited(domain.as_ref(), url.as_ref(), visited_on_ts, None) + .await?; + Ok(()) + }) }) .await } diff --git a/quickpeep_raker/src/storage.rs b/quickpeep_raker/src/storage.rs index f9f6fdd..f450771 100644 --- a/quickpeep_raker/src/storage.rs +++ b/quickpeep_raker/src/storage.rs @@ -1,230 +1,131 @@ use crate::raking::{RakeIntent, TemporaryFailure}; -use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString, MdbxU16BE, MdbxU32, MdbxU64}; -use crate::storage::migrations::{MIGRATION_KEY, MIGRATION_VERSION}; -use crate::storage::records::{ - ActiveDomainRecord, BackingOffDomainRecord, DomainRecord, OnHoldUrlRecord, QueueUrlRecord, - UrlVisitedRecord, -}; -use anyhow::{anyhow, bail, ensure, Context}; -use libmdbx::{ - Database, DatabaseFlags, Environment, EnvironmentFlags, Geometry, Transaction, TransactionKind, - WriteFlags, WriteMap, RO, RW, -}; -use log::info; +use crate::storage::records::{DomainRecord, QueueUrlRecord}; +use anyhow::{bail, ensure, Context}; +use futures_core::future::BoxFuture; +use log::warn; use metrics::{describe_gauge, gauge, Unit}; -use ouroboros::self_referencing; +use quickpeep_utils::dates::QUICKPEEP_EPOCH_ST; use quickpeep_utils::urls::get_reduced_domain; +use rand::random; use reqwest::Url; -use std::borrow::{Borrow, Cow}; -use std::collections::{BTreeSet, HashSet}; -use std::ops::Add; -use std::path::Path; +use sqlx::postgres::PgPoolOptions; +use sqlx::types::Json; +use sqlx::{query, query_as, Executor, PgPool, Postgres, Transaction}; +use std::collections::HashSet; use std::sync::atomic::AtomicU64; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::Mutex; pub mod maintenance; -pub mod mdbx_helper_types; -mod migrations; pub mod records; -/// The databases available in an environment. -pub struct Databases<'env> { - /// Domain \n URL → QueueUrlRecord - pub queue_urls: Database<'env>, - /// u16 → URL. The u16 is the day-precision QuickPeep timestamp at which the URL should (MULTI-VALUE; INT16) - /// be enqueued again for reraking. - pub rerake_queue: Database<'env>, - /// Domain → ActiveDomainRecord - pub active_domains: Database<'env>, - /// u32 → domain name. Used to try and give some fairness. - pub active_domain_raffle: Database<'env>, - /// timestamp → BackingOffReinstatementRecord (MULTI-VALUE; INT) - pub backing_off_reinstatements: Database<'env>, - /// Domain → BackingOffDomainRecord - pub backing_off_domains: Database<'env>, - /// URL → VisitedDomainRecord - pub visited_urls: Database<'env>, - /// Domain → DomainRecord - pub domains: Database<'env>, - /// Domain \n URL → OnHoldUrlRecord Number of refs (INT VALUE) - pub urls_on_hold: Database<'env>, -} - -impl<'env> Databases<'env> { - pub fn iter_all_databases(&self) -> impl Iterator)> { - [ - ("queue_urls", &self.queue_urls), - ("rerake_queue", &self.rerake_queue), - ("active_domains", &self.active_domains), - ("active_domain_raffle", &self.active_domain_raffle), - ( - "backing_off_reinstatements", - &self.backing_off_reinstatements, - ), - ("backing_off_domains", &self.backing_off_domains), - ("visited_urls", &self.visited_urls), - ("domains", &self.domains), - ("urls_on_hold", &self.urls_on_hold), - ] - .into_iter() - } -} - -// Must match the order of the Databases struct fields. 
-pub const DATABASES: [(&'static str, DatabaseFlags); 9] = [ - ("urls_queue", DatabaseFlags::empty()), - ("rerake_queue", DatabaseFlags::DUP_SORT), - ("active_domains", DatabaseFlags::empty()), - ("active_domain_raffle", DatabaseFlags::INTEGER_KEY), - ( - "backing_off_reinstatements", - DatabaseFlags::INTEGER_KEY.union(DatabaseFlags::DUP_SORT), - ), - ("backing_off_domains", DatabaseFlags::empty()), - ("urls_visited", DatabaseFlags::empty()), - ("domains", DatabaseFlags::empty()), - ("urls_on_hold", DatabaseFlags::empty()), -]; - -#[self_referencing] -pub struct RakerDb { - pub env: Environment, - #[borrows(env)] - #[covariant] - pub dbs: Databases<'this>, -} - /// Handle to the store. /// Doesn't need wrapping in Arc because it already is. #[derive(Clone)] pub struct RakerStore { - pub mdbx: Arc, + pub pool: Arc, pub metrics: Option>, } impl RakerStore { - pub fn open(path: &Path) -> anyhow::Result { - let mut flags = EnvironmentFlags::default(); - flags.no_sub_dir = true; + pub async fn open(db_uri: &str) -> anyhow::Result { + let pool = PgPoolOptions::new() + .after_connect(|conn, _meta| { + Box::pin(async move { + conn.execute("SET default_transaction_isolation TO 'serializable'") + .await?; + Ok(()) + }) + }) + .connect(db_uri) + .await + .context("Failed to connect to Postgres pool")?; - let mut geom = Geometry::default(); - // Don't stop the database growing until it hits 64 GiB. - // (The default is 1 MiB which is just not enough!) - geom.size = Some(1024 * 1024..64 * 1024 * 1024 * 1024); - - // Grow 16 MiB at a time. - geom.growth_step = Some(16 * 1024 * 1024); - // Shrink 64 MiB at a time. - geom.shrink_threshold = Some(64 * 1024 * 1024); - // (Yes these numbers represent a large database). - - let env = Environment::new() - .set_geometry(geom) - .set_max_dbs(256) - .set_flags(flags) - .open(path)?; - - let mdbx: RakerDb = RakerDbBuilder { - env, - dbs_builder: |env: &Environment| { - let txn = env - .begin_rw_txn() - .expect("Should be able to start a transaction"); - - let root_meta_db = txn - .create_db(None, DatabaseFlags::empty()) - .expect("txn.create_db failed"); - match txn - .get::(&root_meta_db, MIGRATION_KEY) - .expect("txn.get failed") - { - None => { - info!("Loading database with no migration version. 
Assuming it's fresh!"); - txn.put( - &root_meta_db, - MIGRATION_KEY, - MIGRATION_VERSION.as_bytes(), - WriteFlags::empty(), - ) - .expect("txn.put failed"); - } - Some(version) => { - info!("Loading database with migration version {:?}", version.0); - if MIGRATION_VERSION != version.0.as_ref() { - panic!( - "Migration version not supported: {:?} (I support {:?})", - version.0.as_ref(), - MIGRATION_VERSION - ); - } - } - } - - for (db_name, db_flags) in DATABASES { - let db = txn - .create_db(Some(db_name), db_flags) - .expect("Failed to open database"); - - txn.prime_for_permaopen(db); - } - - let (_bool, dbs) = txn - .commit_and_rebind_open_dbs() - .expect("Failed to commit & rebind"); - let mut dbs = dbs.into_iter(); - - // Must match the order of the DATABASES constant and the struct field definitions - Databases { - queue_urls: dbs.next().unwrap(), - rerake_queue: dbs.next().unwrap(), - active_domains: dbs.next().unwrap(), - active_domain_raffle: dbs.next().unwrap(), - backing_off_reinstatements: dbs.next().unwrap(), - backing_off_domains: dbs.next().unwrap(), - visited_urls: dbs.next().unwrap(), - domains: dbs.next().unwrap(), - urls_on_hold: dbs.next().unwrap(), - } - }, - } - .build(); + sqlx::migrate!() + .run(&pool) + .await + .context("Failed to apply database migrations")?; Ok(RakerStore { - mdbx: Arc::new(mdbx), + pool: Arc::new(pool), metrics: Some(Arc::new(Default::default())), }) } - pub async fn async_rw_txn(&self, f: F) -> anyhow::Result + pub async fn rw_txn<'a, F, R>(&'a self, f: F) -> anyhow::Result where - F: FnOnce(RakerTxn<'_, RW>) -> anyhow::Result + Send + 'static, + F: for<'c> FnOnce(RakerTxn<'c, '_, true>) -> BoxFuture<'c, anyhow::Result> + + Send + + Sync + + 'a, R: Send + 'static, { - // TODO(robustness) consider adding a lock here to prevent all the async executors getting stuck here... - let this = self.clone(); - Ok(tokio::task::spawn_blocking(move || -> anyhow::Result { - let txn = this.rw_txn()?; - let r = f(txn)?; - Ok(r) - }) - .await??) + self.txn(f).await } - pub fn rw_txn(&self) -> anyhow::Result> { - let mdbx_txn = self.mdbx.borrow_env().begin_rw_txn()?; - Ok(RakerTxn { - mdbx_txn, - mdbx: self.mdbx.clone(), - }) + pub async fn ro_txn<'a, F, R>(&'a self, f: F) -> anyhow::Result + where + F: for<'c> FnOnce(RakerTxn<'c, '_, false>) -> BoxFuture<'c, anyhow::Result> + + Send + + Sync + + 'a, + R: Send + 'static, + { + self.txn(f).await } - pub fn ro_txn(&self) -> anyhow::Result> { - let mdbx_txn = self.mdbx.borrow_env().begin_ro_txn()?; - Ok(RakerTxn { - mdbx_txn, - mdbx: self.mdbx.clone(), - }) + async fn txn<'a, const RW: bool, F, R>(&'a self, f: F) -> anyhow::Result + where + F: for<'c> FnOnce(RakerTxn<'c, '_, RW>) -> BoxFuture<'c, anyhow::Result> + + Send + + Sync + + 'a, + R: Send + 'static, + { + fn should_retry_db_err(sqlx_err: &sqlx::Error, retry_num: i32) -> bool { + if let sqlx::Error::Database(db_err) = sqlx_err { + if db_err.code().map(|code| code == "40001").unwrap_or(false) { + // This is a serialisation fault, retry the transaction. 
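+ // (SQLSTATE 40001 is Postgres's `serialization_failure`: under the
+ // SERIALIZABLE isolation level set as the default in `open`, one of two
+ // conflicting transactions is aborted and the client is expected to
+ // re-run it. The surrounding retry loop is still a TODO because `f` is
+ // `FnOnce` and so cannot be invoked a second time; for now a retryable
+ // failure sleeps briefly and then surfaces as an error.)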
+ warn!("serialisation failure: {db_err:?}; retrying try №{retry_num}"); + return true; + } + } + false + } + + fn should_retry_err(err: &anyhow::Error, retry_num: i32) -> bool { + if let Some(sqlx_err) = err.downcast_ref::() { + should_retry_db_err(sqlx_err, retry_num) + } else { + false + } + } + + let mut retry_num = 0; + //TODO loop { + retry_num += 1; + let mut txn = self.pool.begin().await.context("Failed to get txn")?; + + match f(RakerTxn { txn: &mut txn }).await { + Ok(r) => { + if let Err(err) = txn.commit().await { + if should_retry_db_err(&err, retry_num) { + tokio::time::sleep(Duration::from_millis(50)).await; + bail!("continue;"); // TODO + } + } + return Ok(r); + } + Err(err) => { + if should_retry_err(&err, retry_num) { + tokio::time::sleep(Duration::from_millis(50)).await; + bail!("continue;"); // TODO + } + return Err(err); + } + } + //} } } @@ -237,14 +138,51 @@ pub struct RakerStoreMetrics { pub queued_url_count: AtomicU64, } -pub struct RakerTxn<'a, K: TransactionKind> { - pub mdbx_txn: Transaction<'a, K, WriteMap>, - pub mdbx: Arc, +pub struct RakerTxn<'a, 't, const RW: bool> { + pub txn: &'a mut Transaction<'t, Postgres>, } -impl<'a> RakerTxn<'a, RW> { - pub fn commit(self) -> anyhow::Result<()> { - self.mdbx_txn.commit()?; +impl<'a, 't> RakerTxn<'a, 't, true> { + pub async fn domain_to_id(&mut self, domain: &str) -> anyhow::Result { + let new_id_row = query!( + r#" + INSERT INTO domains + (name, domain_record) + VALUES ($1, '{"rakeable_path_prefixes": {}}') + ON CONFLICT DO NOTHING + RETURNING domain_id + "#, + &domain + ) + .fetch_optional(&mut *self.txn) + .await?; + + if let Some(new_row) = new_id_row { + return Ok(new_row.domain_id); + } + + let row = query!( + r#" + SELECT domain_id FROM domains WHERE name = $1 + "#, + &domain + ) + .fetch_one(&mut *self.txn) + .await?; + Ok(row.domain_id) + } + + /// Inserts a domain into the active domain table, + /// generating a raffle ticket (and inserting it too). + /// + /// No-op if the domain is already an active domain or being backed off from. + pub async fn insert_active_domain_with_new_raffle_ticket( + &mut self, + new_domain: String, + ) -> anyhow::Result<()> { + let domain_id = self.domain_to_id(&new_domain).await?; + self.insert_active_domain_id_with_new_raffle_ticket(domain_id) + .await?; Ok(()) } @@ -252,58 +190,54 @@ impl<'a> RakerTxn<'a, RW> { /// generating a raffle ticket (and inserting it too). /// /// No-op if the domain is already an active domain or being backed off from. - pub fn insert_active_domain_with_new_raffle_ticket( - &self, - new_domain: String, + pub async fn insert_active_domain_id_with_new_raffle_ticket( + &mut self, + domain_id: i32, ) -> anyhow::Result<()> { - let active_domains = &self.mdbx.borrow_dbs().active_domains; - let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle; - let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains; - - let new_domain = MdbxString(Cow::Owned(new_domain)); - - if self - .mdbx_txn - .get::<()>(active_domains, new_domain.as_bytes())? - .is_some() + if query!( + "SELECT 1 AS _meh FROM domain_backoffs WHERE domain_id = $1", + domain_id + ) + .fetch_optional(&mut *self.txn) + .await? + .is_some() { + // nop: backoff active return Ok(()); } - if self - .mdbx_txn - .get::<()>(backing_off_domains, new_domain.as_bytes())? 
- .is_some() - { - return Ok(()); - } + loop { + let random_raffle_ticket: i32 = random(); + let rows_affected = query!( + r#" + INSERT INTO active_domain_raffle (domain_id, raffle) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + "#, + domain_id, + random_raffle_ticket + ) + .execute(&mut *self.txn) + .await? + .rows_affected(); - let reserved_raffle_ticket = loop { - let next_raffle_ticket = MdbxU32(rand::random()); - if self - .mdbx_txn - .get::<()>(active_domain_raffle, &next_raffle_ticket.as_bytes())? - .is_none() - { - break next_raffle_ticket; + if rows_affected == 0 { + let r = query!( + r#" + SELECT raffle FROM active_domain_raffle WHERE domain_id = $1 + "#, + domain_id, + ) + .fetch_optional(&mut *self.txn) + .await?; + if r.is_some() { + break; + } + } else { + break; } - }; + } - self.mdbx_txn.put( - active_domain_raffle, - &reserved_raffle_ticket.as_bytes(), - new_domain.as_bytes(), - WriteFlags::empty(), - )?; - self.mdbx_txn.put( - active_domains, - new_domain.as_bytes(), - MdbxBare(ActiveDomainRecord { - raffle_ticket: reserved_raffle_ticket.0, - }) - .as_bytes(), - WriteFlags::empty(), - )?; Ok(()) } @@ -311,315 +245,282 @@ impl<'a> RakerTxn<'a, RW> { /// table. /// /// Returns true if a deletion took place, and false if it did not. - pub fn remove_active_domain(&self, domain: &str) -> anyhow::Result { - let active_domains = &self.mdbx.borrow_dbs().active_domains; - let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle; + pub async fn remove_active_domain(&mut self, domain: &str) -> anyhow::Result { + let domain_id = self.domain_to_id(domain).await?; + let rowcount = query!( + r#" + DELETE FROM active_domain_raffle + WHERE domain_id = $1 + "#, + domain_id + ) + .execute(&mut *self.txn) + .await? + .rows_affected(); - let domain = MdbxString(Cow::Borrowed(domain)); - - if let Some(MdbxBare(active_domain)) = self - .mdbx_txn - .get::>(active_domains, domain.as_bytes())? - { - ensure!(self.mdbx_txn.del(active_domains, domain.as_bytes(), None)?); - let raffle_ticket = MdbxU32(active_domain.raffle_ticket); - ensure!(self - .mdbx_txn - .del(active_domain_raffle, raffle_ticket.as_bytes(), None)?); - Ok(true) - } else { - Ok(false) - } + Ok(rowcount > 0) } /// Marks a URL as visited and takes it out of the queue. - pub fn mark_url_as_visited( - &self, + pub async fn mark_url_as_visited( + &mut self, domain: &str, url_str: &str, - record: UrlVisitedRecord, - rerake_on: Option, + visited_on_ts: i32, + rerake_at_ts: Option, ) -> anyhow::Result<()> { - let queue_urls = &self.mdbx.borrow_dbs().queue_urls; - let visited_urls = &self.mdbx.borrow_dbs().visited_urls; - let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue; + // url_id SERIAL PRIMARY KEY NOT NULL, + // domain_id INTEGER NOT NULL REFERENCES domains(domain_id), + // scheme urlscheme NOT NULL, + // path TEXT NOT NULL, + // intent rakeintent NOT NULL, + // visited_at_ts INTEGER, + let domain_id = self.domain_to_id(domain).await?; + // TODO are domains always the same as on + query!( + r#" + INSERT INTO urls (domain_id, url, intent, visited_at_ts) + VALUES ($1, $2, $3::text::rakeintent, $4) + ON CONFLICT (domain_id, url) + DO UPDATE SET + visited_at_ts = EXCLUDED.visited_at_ts + "#, + domain_id, + url_str, + "Any", + visited_on_ts + ) + .execute(&mut *self.txn) + .await?; - let queue_key = format!("{}\n{}", domain, url_str); - - // We legitimately want this to NOP when already dequeued; so don't ensure the opposite. 
- // ensure!( - // self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?, - // "No queued URL to delete ({})", queue_key - // ); - - self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?; - - self.mdbx_txn.put( - visited_urls, - url_str.as_bytes(), - &MdbxBare(record).as_bytes(), - WriteFlags::empty(), - )?; - - if let Some(rerake_on) = rerake_on { - self.mdbx_txn.put( - rerake_queue, - &rerake_on.to_be_bytes(), - url_str.as_bytes(), - WriteFlags::empty(), - )?; + if let Some(rerake_at) = rerake_at_ts { + query!( + r#" + INSERT INTO url_queue (url_id, rake_after_ts) + SELECT url_id, $3 FROM urls WHERE domain_id = $1 AND url = $2 + "#, + domain_id, + url_str, + rerake_at + ) + .execute(&mut *self.txn) + .await?; + } else { + self.dequeue_url(domain, url_str).await?; } Ok(()) } - /// Marks a URL as visited and takes it out of the queue. - pub fn dequeue_url(&self, domain: &str, url_str: &str) -> anyhow::Result<()> { - let queue_urls = &self.mdbx.borrow_dbs().queue_urls; - let queue_key = format!("{}\n{}", domain, url_str); - self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?; + /// Takes a URL out of the queue. + pub async fn dequeue_url(&mut self, domain: &str, url_str: &str) -> anyhow::Result<()> { + let domain_id = self.domain_to_id(domain).await?; + query!( + r#" + DELETE FROM url_queue + WHERE url_id IN (SELECT url_id FROM urls WHERE domain_id = $1 AND url = $2) + "#, + domain_id, + url_str + ) + .execute(&mut *self.txn) + .await?; Ok(()) } - pub fn start_backing_off( - &self, + pub async fn start_backing_off( + &mut self, domain: &str, - backoff_for: u32, + backoff_for: i32, failed_url: String, failure: TemporaryFailure, ) -> anyhow::Result<()> { + let domain_id = self.domain_to_id(domain).await?; + + let reinstate_at = SystemTime::now() + .duration_since(*QUICKPEEP_EPOCH_ST)? + .as_secs() as i32 + + backoff_for; + + let reason = format!("{failed_url} {failure:?}"); + query!( + r#" + INSERT INTO domain_backoffs (domain_id, backoff_until_ts, backoff_sec, reason) + VALUES ($1, $2, $3, $4) + ON CONFLICT (domain_id) DO UPDATE SET + backoff_until_ts = EXCLUDED.backoff_until_ts, + backoff_sec = EXCLUDED.backoff_sec, + reason = EXCLUDED.reason + "#, + domain_id, + reinstate_at, + backoff_for, + reason + ) + .execute(&mut *self.txn) + .await?; + ensure!( - self.remove_active_domain(domain)?, + self.remove_active_domain(domain).await?, "Can't back off from domain that's not active" ); - let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains; - let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements; - - let reinstate_at = - SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() + backoff_for as u64; - - let backoff_record = BackingOffDomainRecord { - failed_url, - failure, - backoff: backoff_for, - reinstate_at, - }; - - self.mdbx_txn.put( - backing_off_domains, - domain.as_bytes(), - &MdbxBare(backoff_record).as_bytes(), - WriteFlags::empty(), - )?; - self.mdbx_txn.put( - backing_off_reinstatements, - MdbxU64(reinstate_at).as_bytes(), - domain.as_bytes(), - WriteFlags::empty(), - )?; Ok(()) } /// Reinstates backing-off domains up to the specified time. /// Returns the time of the next reinstatement, if there is one. 
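+    /// Every domain reinstated here is given a fresh raffle ticket, so it
+    /// immediately re-enters the random-acquisition rotation.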
- pub fn reinstate_backoffs(&self, up_to_ts: SystemTime) -> anyhow::Result> { - let backing_off_domains = &self.mdbx.borrow_dbs().backing_off_domains; - let backing_off_reinstatements = &self.mdbx.borrow_dbs().backing_off_reinstatements; + pub async fn reinstate_backoffs( + &mut self, + up_to_ts: SystemTime, + ) -> anyhow::Result> { + let reinstate_up_to = up_to_ts.duration_since(*QUICKPEEP_EPOCH_ST)?.as_secs() as i32; - let reinstate_up_to = up_to_ts.duration_since(UNIX_EPOCH)?.as_secs(); + let domain_id_rows = query!( + r#" + SELECT domain_id FROM domain_backoffs WHERE backoff_until_ts <= $1 + "#, + reinstate_up_to + ) + .fetch_all(&mut *self.txn) + .await?; - let mut cur = self.mdbx_txn.cursor(backing_off_reinstatements)?; - cur.first::()?; - loop { - let (MdbxU64(reinstatement_time), domain_to_reinstate) = - match cur.get_current::()? { - Some(x) => x, - None => break, - }; + query!( + r#" + DELETE FROM domain_backoffs WHERE backoff_until_ts <= $1 + "#, + reinstate_up_to + ) + .execute(&mut *self.txn) + .await?; - if reinstatement_time > reinstate_up_to { - return Ok(Some( - UNIX_EPOCH.add(Duration::from_secs(reinstatement_time)), - )); - } - - let dom_str = domain_to_reinstate.into_string(); - self.mdbx_txn - .del(backing_off_domains, dom_str.clone(), None)?; - self.insert_active_domain_with_new_raffle_ticket(dom_str)?; - cur.del(WriteFlags::empty())?; + for domain_id_row in domain_id_rows { + self.insert_active_domain_id_with_new_raffle_ticket(domain_id_row.domain_id) + .await?; } - Ok(None) - } - - /// Reinstates URLs that are now re-rakable. - pub fn reinstate_rerakables(&self, today: u16) -> anyhow::Result<()> { - let queue_urls = &self.mdbx.borrow_dbs().queue_urls; - let rerake_queue = &self.mdbx.borrow_dbs().rerake_queue; - - let mut reinstatable_domains: BTreeSet = BTreeSet::new(); - - let mut cur = self.mdbx_txn.cursor(rerake_queue)?; - cur.first::()?; - loop { - let (MdbxU16BE(rerake_datestamp), url_to_rerake) = - match cur.get_current::()? { - Some(x) => x, - None => break, - }; - - if rerake_datestamp > today { - break; - } - - let url_str = url_to_rerake.into_string(); - let url = Url::parse(&url_str).context("Failed to parse rerakable URL")?; - let url_domain = - get_reduced_domain(&url).context("Unable to reduce domain for rerakable URL")?; - - self.mdbx_txn.put( - queue_urls, - format!("{}\n{}", url_domain, url_str).as_bytes(), - // TODO(correctness): should specify the same intent as before. - &MdbxBare(QueueUrlRecord { - intent: RakeIntent::Any, - }) - .as_bytes(), - WriteFlags::NO_OVERWRITE, - )?; - - reinstatable_domains.insert(url_domain.into_owned()); - - cur.del(WriteFlags::empty())?; - } - - for domain in reinstatable_domains { - self.insert_active_domain_with_new_raffle_ticket(domain)?; - } - - Ok(()) + Ok(query!( + r#" + SELECT backoff_until_ts FROM domain_backoffs ORDER BY backoff_until_ts LIMIT 1 + "# + ) + .fetch_optional(&mut *self.txn) + .await? + .map(|row| *QUICKPEEP_EPOCH_ST + Duration::from_secs(row.backoff_until_ts as u64))) } /// Enqueues a URL. - /// If `only_if_not_visited_since` is specified, then this is a no-op if the page has already been + /// If `last_modified` is specified, then this is a no-op if the page has already been /// visited since then. - /// If `only_if_not_visited_since` is not specified, then this is a no-op if the page has already + /// If `last_modified` is not specified, then this is a no-op if the page has already /// been visited. /// /// Returns: true if it was enqueued, false if nothing changed. 
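+    /// If `on_hold` is true, the URL row is stored but kept out of `url_queue`
+    /// (and its domain is not activated); this replaces the old dedicated
+    /// on-hold table.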
- pub fn enqueue_url( - &self, + pub async fn enqueue_url( + &mut self, url_str: &str, - last_modified: Option, + last_modified: Option, intent: RakeIntent, + on_hold: bool, ) -> anyhow::Result { - let queue_urls = &self.mdbx.borrow_dbs().queue_urls; - let visited_urls = &self.mdbx.borrow_dbs().visited_urls; - let url = Url::parse(url_str)?; let url_domain = get_reduced_domain(&url) .with_context(|| format!("No domain for to-be-enqueued URL: '{url}'!"))?; + let domain_id = self.domain_to_id(&url_domain).await?; - let queue_key = format!("{}\n{}", url_domain, url); + // Don't enqueue if it hasn't been modified since the last visit + if let Some(last_modified) = last_modified { + let last_visited = query!( + r#" + SELECT visited_at_ts FROM urls + WHERE domain_id = $1 AND url = $2 + "#, + domain_id, + url_str + ) + .fetch_optional(&mut *self.txn) + .await?; - if self - .mdbx_txn - .get::<()>(queue_urls, queue_key.as_bytes())? - .is_some() - { - // Already in the queue. Nothing to do here. - return Ok(false); - } + let last_visited = last_visited + .map(|lv| lv.visited_at_ts) + .flatten() + .map(|lv| *QUICKPEEP_EPOCH_ST + Duration::from_secs(lv as u64)); - if let Some(MdbxBare(visited_entry)) = self - .mdbx_txn - .get::>(visited_urls, url_str.as_bytes())? - { - match last_modified { - None => { - // Already visited. Nothing to do here. + if let Some(last_visited) = last_visited { + if last_visited >= last_modified { return Ok(false); } - Some(last_modified) => { - if last_modified <= visited_entry.last_visited_days { - // Hasn't been modified since our last visit - return Ok(false); - } - } } } - // Add the entry to the queue - self.mdbx_txn.put( - queue_urls, - queue_key.as_bytes(), - &MdbxBare(QueueUrlRecord { intent }).as_bytes(), - WriteFlags::empty(), - )?; + let intent_str: &'static str = intent.into(); + let rowcount = query!( + r#" + INSERT INTO urls (domain_id, url, intent, visited_at_ts) + VALUES ($1, $2, $3::text::rakeintent, NULL) + ON CONFLICT DO NOTHING + "#, + domain_id, + url_str, + intent_str + ) + .execute(&mut *self.txn) + .await? + .rows_affected(); - // Activate the domain if needed... - self.insert_active_domain_with_new_raffle_ticket(url_domain.into_owned())?; + if !(last_modified.is_some() || (rowcount > 0)) { + return Ok(false); + } + + if !on_hold { + let url_id = query!( + r#" + SELECT url_id FROM urls WHERE domain_id = $1 AND url = $2 + "#, + domain_id, + url_str + ) + .fetch_one(&mut *self.txn) + .await? + .url_id; + + query!( + r#" + INSERT INTO url_queue (url_id, rake_after_ts) + VALUES ($1, 0) + ON CONFLICT DO NOTHING + "#, + url_id + ) + .execute(&mut *self.txn) + .await?; + + // Activate the domain if needed... + self.insert_active_domain_id_with_new_raffle_ticket(domain_id) + .await?; + } Ok(true) } - /// Enqueues a URL to the 'on hold' queue. - /// - /// Returns: true if it was enqueued, false if nothing changed. - pub fn put_url_on_hold(&self, url_str: &str, intent: RakeIntent) -> anyhow::Result { - let urls_on_hold = &self.mdbx.borrow_dbs().urls_on_hold; - - let url = Url::parse(url_str)?; - let url_domain = get_reduced_domain(&url) - .with_context(|| format!("No domain for to-be-put-on-hold URL: '{url}'!"))?; - - let queue_key = format!("{}\n{}", url_domain, url); - - let (record, is_new) = if let Some(mut record) = self - .mdbx_txn - .get::>(urls_on_hold, queue_key.as_bytes())? - { - // Already in the queue. Nothing to do here, except bump up the refs count. 
- record.0.refs = record.0.refs.saturating_add(1); - - (record, false) - } else { - ( - MdbxBare(OnHoldUrlRecord { - refs: 1, - queue_record: QueueUrlRecord { intent }, - }), - true, - ) - }; - - // Add the entry to the queue - self.mdbx_txn.put( - urls_on_hold, - queue_key.as_bytes(), - &record.as_bytes(), - WriteFlags::empty(), - )?; - - Ok(is_new) - } - - pub fn put_domain_record( - &self, + pub async fn put_domain_record( + &mut self, domain: &str, domain_record: DomainRecord, ) -> anyhow::Result<()> { - let domains = &self.mdbx.borrow_dbs().domains; - - self.mdbx_txn.put( - domains, - domain.as_bytes(), - MdbxBare(domain_record).as_bytes(), - WriteFlags::empty(), - )?; + let drjson = + serde_json::to_string(&domain_record).context("failed to serialise DomainRecord")?; + query!( + r#" + INSERT INTO domains (name, domain_record) VALUES ($1, $2::text::jsonb) + ON CONFLICT (name) DO UPDATE SET domain_record = EXCLUDED.domain_record + "#, + domain, + drjson + ) + .execute(&mut *self.txn) + .await?; Ok(()) } } @@ -644,151 +545,171 @@ pub fn register_datastore_metrics() -> anyhow::Result<()> { #[derive(Clone, Debug, Eq, PartialEq)] pub enum RandomActiveDomainAcquisition { - GotOne { - domain: String, - record: ActiveDomainRecord, - }, + GotOne { domain: String }, AllBusy, NoneLeft, } /// Read-only implementations (but can also be used on RW transactions) -impl<'a, K: TransactionKind> RakerTxn<'a, K> { +impl<'a, 't, const RW: bool> RakerTxn<'a, 't, RW> { /// Chooses a domain that is not busy, then marks it as busy. - pub fn acquire_random_active_domain( - &self, + pub async fn acquire_random_active_domain( + &mut self, busy_domains: Arc>>, ) -> anyhow::Result { - let active_domains = &self.mdbx.borrow_dbs().active_domains; - let active_domain_raffle = &self.mdbx.borrow_dbs().active_domain_raffle; - let mut busy_domains = busy_domains - .lock() - .map_err(|_| anyhow!("busy domain set poisoned"))?; + let mut busy_domains = busy_domains.lock().await; - let mut cur = self.mdbx_txn.cursor(&active_domain_raffle)?; + let rand_key: i32 = random(); - let rand_key: u32 = rand::random(); + let next_domain_row_opt = query!( + r#" + SELECT name + FROM active_domain_raffle + JOIN domains USING (domain_id) + WHERE raffle >= $1 + ORDER BY raffle + LIMIT 1 + "#, + rand_key + ) + .fetch_optional(&mut *self.txn) + .await?; - let (raffle_ticket, domain) = match cur - .iter_from::(&MdbxU32(rand_key).as_bytes()) - .next() - { - Some(entry) => { - let (k, v) = entry?; - (k, v) + if let Some(next_domain_row) = next_domain_row_opt { + if !busy_domains.contains(&next_domain_row.name) { + ensure!( + busy_domains.insert(next_domain_row.name.clone()), + "Domain already present even though it was checked to be free" + ); + return Ok(RandomActiveDomainAcquisition::GotOne { + domain: next_domain_row.name, + }); } - None => { - // Wrap around to the start for fairness - if let Some((k, v)) = cur.first::()? { - (k, v) - } else { - // No entries left! - return Ok(RandomActiveDomainAcquisition::NoneLeft); - } - } - }; - - let (raffle_ticket, domain) = if busy_domains.contains::(domain.0.borrow()) { - // This domain is already busy. - // As a fallback, sequential-scan the raffle ticket table and look for something new. - let mut found = None; - for entry in cur.iter_start::() { - let (k, v) = entry?; - if !busy_domains.contains::(v.0.borrow()) { - found = Some((k, v)); - break; - } - } - match found { - None => { - // ALL the rows are busy! 
- return Ok(RandomActiveDomainAcquisition::AllBusy); - } - Some(entry) => entry, - } - } else { - (raffle_ticket, domain) - }; - - ensure!( - busy_domains.insert(domain.clone().into_string()), - "Domain already present even though it was checked to be free" - ); - - let record = if let Some(record) = self - .mdbx_txn - .get::>(active_domains, domain.as_bytes())? - { - record.0 - } else { - bail!("Inconsistent database: raffle ticket received for domain that isn't present."); - }; - - if record.raffle_ticket != raffle_ticket.0 { - bail!("Inconsistent database: picked raffle ticket {:?} but domain {:?} thinks it had {:?}", raffle_ticket, domain.0, record.raffle_ticket); } - Ok(RandomActiveDomainAcquisition::GotOne { - domain: domain.into_string(), - record, - }) + // If we hit this, then our raffle ticket didn't bring anything non-busy up. + // Fall back to a simple mechanism. + let row_limit = (busy_domains.len() + 1) as i64; + let next_domain_rows = query!( + r#" + SELECT name + FROM active_domain_raffle + JOIN domains USING (domain_id) + WHERE raffle >= $1 + ORDER BY raffle + LIMIT $2 + "#, + rand_key, + row_limit + ) + .fetch_all(&mut *self.txn) + .await?; + + if next_domain_rows.is_empty() { + return Ok(RandomActiveDomainAcquisition::NoneLeft); + } + + for next_domain_row in next_domain_rows { + if !busy_domains.contains(&next_domain_row.name) { + ensure!( + busy_domains.insert(next_domain_row.name.clone()), + "Domain already present even though it was checked to be free" + ); + return Ok(RandomActiveDomainAcquisition::GotOne { + domain: next_domain_row.name, + }); + } + } + + Ok(RandomActiveDomainAcquisition::AllBusy) } - pub fn choose_url_for_domain( - &self, + pub async fn choose_url_for_domain( + &mut self, domain: &str, ) -> anyhow::Result> { - let queue: &Database = &self.mdbx.borrow_dbs().queue_urls; + // TODO(perf): consider denormalising domain_id into url_queue + let now_ts = SystemTime::now() + .duration_since(*QUICKPEEP_EPOCH_ST) + .unwrap() + .as_secs() as i32; + let row_opt = query!( + r#" + SELECT url, intent AS "intent: RakeIntent" FROM domains d + JOIN urls u USING (domain_id) + JOIN url_queue USING (url_id) + WHERE d.name = $1 AND rake_after_ts < $2 + ORDER BY rake_after_ts + LIMIT 1 + "#, + domain, + now_ts + ) + .fetch_optional(&mut *self.txn) + .await?; - let mut cur = self.mdbx_txn.cursor(queue)?; - match cur - .iter_from::>( - MdbxString(Cow::Owned(format!("{}\n", domain))).as_bytes(), - ) - .next() - { - Some(entry) => { - let (k, MdbxBare(record)) = entry?; - let domain_followed_by_url = k.0.as_ref(); - let mut split = domain_followed_by_url.split("\n"); - let actual_domain = split.next().context("No domain")?; - let url = split.next().context("No URL")?; - - if domain != actual_domain { - // This means we've ran out of URLs for the domain in question. - return Ok(None); - } - - ensure!(split.next().is_none(), "Should be no more splits."); - - Ok(Some((url.to_owned(), record))) - } - None => Ok(None), - } + Ok(row_opt.map(|row| (row.url, QueueUrlRecord { intent: row.intent }))) } - pub fn get_domain_record(&self, domain: &str) -> anyhow::Result> { - let domains = &self.mdbx.borrow_dbs().domains; - - match self - .mdbx_txn - .get::>(domains, domain.as_bytes())? 
- { - None => Ok(None), - Some(MdbxBare(record)) => Ok(Some(record)), + pub async fn get_domain_record( + &mut self, + domain: &str, + ) -> anyhow::Result> { + struct Row { + pub domain_record: Json, } + + let domain_row = query_as!( + Row, + r#" + SELECT domain_record AS "domain_record: Json" FROM domains WHERE name = $1 + "#, + domain + ) + .fetch_optional(&mut *self.txn) + .await?; + + Ok(domain_row.map(|row| { + let Json(result) = row.domain_record; + result + })) } /// Emits metrics for the datastore. Call this occasionally. - pub fn emit_datastore_metrics(&self) -> anyhow::Result<()> { - for (db_name, db) in self.mdbx.borrow_dbs().iter_all_databases() { - let stat = self.mdbx_txn.db_stat(db)?; - let entries = stat.entries() as f64; - let size_in_pages = stat.branch_pages() + stat.leaf_pages() + stat.overflow_pages(); - let size_in_bytes = stat.page_size() as f64 * size_in_pages as f64; - gauge!("db_entries", entries, "db" => db_name); - gauge!("db_size_bytes", size_in_bytes, "db" => db_name); + pub async fn emit_datastore_metrics(&mut self) -> anyhow::Result<()> { + for table in [ + "domains", + "urls", + "url_queue", + "active_domain_raffle", + "domain_backoffs", + ] { + let size_bytes = query!( + "SELECT pg_relation_size($1::text::regclass) AS size_bytes", + table + ) + .fetch_one(&mut *self.txn) + .await? + .size_bytes + .unwrap_or(-1); + + let num_rows = query!( + r#" + SELECT n_live_tup + FROM pg_stat_user_tables + WHERE relname = $1 + "#, + table + ) + .fetch_one(&mut *self.txn) + .await? + .n_live_tup + .unwrap_or(-1); + + gauge!("db_entries", num_rows as f64, "db" => table); + gauge!("db_size_bytes", size_bytes as f64, "db" => table); } + Ok(()) } } @@ -798,67 +719,85 @@ pub mod test { use super::*; use crate::raking::TemporaryFailureReason; use std::collections::BTreeSet; - use tempfile::NamedTempFile; - #[test] - fn test_reinstate_multiple_domains() -> anyhow::Result<()> { - let tfile = NamedTempFile::new()?; - let store = RakerStore::open(tfile.path())?; - { - let txn = store.rw_txn()?; - txn.insert_active_domain_with_new_raffle_ticket("a.invalid".to_owned())?; - txn.insert_active_domain_with_new_raffle_ticket("b.invalid".to_owned())?; - txn.commit()?; - } + #[tokio::test] + async fn test_reinstate_multiple_domains() -> anyhow::Result<()> { + let store = RakerStore::open(env!("RAKER_DATABASE_URI")).await?; + + store + .rw_txn(move |mut txn| { + Box::pin(async move { + txn.insert_active_domain_with_new_raffle_ticket("a.invalid".to_owned()) + .await?; + txn.insert_active_domain_with_new_raffle_ticket("b.invalid".to_owned()) + .await?; + Ok(()) + }) + }) + .await?; let now = SystemTime::now(); - { - let txn = store.rw_txn()?; - txn.start_backing_off( - "a.invalid", - 300, - "".to_owned(), - TemporaryFailure { - reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), - backoff_sec: 300, - }, - )?; - txn.start_backing_off( - "b.invalid", - 300, - "".to_owned(), - TemporaryFailure { - reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), - backoff_sec: 300, - }, - )?; - txn.commit()?; - } + store + .rw_txn(move |mut txn| { + Box::pin(async move { + txn.start_backing_off( + "a.invalid", + 300, + "".to_owned(), + TemporaryFailure { + reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), + backoff_sec: 300, + }, + ) + .await?; + txn.start_backing_off( + "b.invalid", + 300, + "".to_owned(), + TemporaryFailure { + reason: TemporaryFailureReason::ExcruciatingCrawlDelay(1), + backoff_sec: 300, + }, + ) + .await?; + Ok(()) + }) + }) + .await?; - { - let txn = 
store.ro_txn()?; - assert_eq!( - txn.acquire_random_active_domain(Default::default())?, - RandomActiveDomainAcquisition::NoneLeft - ); - } + store + .ro_txn(move |mut txn| { + Box::pin(async move { + assert_eq!( + txn.acquire_random_active_domain(Default::default()).await?, + RandomActiveDomainAcquisition::NoneLeft + ); + Ok(()) + }) + }) + .await?; - { - let txn = store.rw_txn()?; - txn.reinstate_backoffs(now + Duration::from_secs(600))?; - txn.commit()?; - } + store + .rw_txn(move |mut txn| { + Box::pin(async move { + txn.reinstate_backoffs(now + Duration::from_secs(600)) + .await?; + Ok(()) + }) + }) + .await?; - { - let txn = store.ro_txn()?; - let busy = Default::default(); + store + .ro_txn(move |mut txn| { + Box::pin(async move { + let busy = Default::default(); - let acq1 = txn.acquire_random_active_domain(Arc::clone(&busy))?; - let acq2 = txn.acquire_random_active_domain(Arc::clone(&busy))?; + let acq1 = txn.acquire_random_active_domain(Arc::clone(&busy)).await?; + let acq2 = txn.acquire_random_active_domain(Arc::clone(&busy)).await?; - assert!( - matches!((acq1.clone(), acq2.clone()), ( + assert!( + matches!((acq1.clone(), acq2.clone()), ( RandomActiveDomainAcquisition::GotOne { domain: dom1, .. @@ -870,10 +809,13 @@ pub mod test { ) if vec![dom1.as_ref(), dom2.as_ref()].into_iter().collect::>() == vec![ "a.invalid", "b.invalid" ].into_iter().collect::>()), - "{:#?}", - (acq1, acq2) - ); - } + "{:#?}", + (acq1, acq2) + ); + Ok(()) + }) + }) + .await?; Ok(()) } diff --git a/quickpeep_raker/src/storage/maintenance.rs b/quickpeep_raker/src/storage/maintenance.rs index f0f1dd8..3d28229 100644 --- a/quickpeep_raker/src/storage/maintenance.rs +++ b/quickpeep_raker/src/storage/maintenance.rs @@ -1,80 +1,41 @@ -use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString}; -use crate::storage::records::{DomainRecord, OnHoldUrlRecord}; +use crate::storage::records::DomainRecord; use crate::storage::RakerTxn; -use anyhow::Context; -use libmdbx::{Database, WriteFlags, RW}; -use reqwest::Url; +use itertools::Itertools; +use sqlx::query_as; +use sqlx::types::Json; -/// Runs one big transaction that: -/// - scans on-hold URLs -/// - moves 'allowed' ones to the queue -/// - deletes 'weeds' -/// - leaves unknown ones alone -/// +/// Re-enqueues domains /// Ideally should be applied after importing seeds and weeds on an existing database. -pub fn reapply_seeds_and_weeds_to_on_hold_urls(txn: RakerTxn) -> anyhow::Result<()> { +pub async fn reapply_seeds_and_weeds_to_on_hold_urls( + mut txn: RakerTxn<'_, '_, true>, +) -> anyhow::Result<()> { struct DomainState { - pub domain: String, - pub domain_record: Option, + pub domain_id: i32, + pub domain_record: Json, } - let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold; + let reinstatable_domains: Vec = query_as!(DomainState, r#" + SELECT DISTINCT u.domain_id, domain_record AS "domain_record: Json" FROM url_queue + JOIN urls u USING (url_id) + JOIN domains USING (domain_id) + LEFT JOIN domain_backoffs db ON u.domain_id = db.domain_id + WHERE db.domain_id IS NULL + "#).fetch_all(&mut *txn.txn) + .await?; - let mut domain_state = None; - - // Scan through the on-hold URLs - let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?; - let mut first_iteration = true; - - while let Some((MdbxString(domain_then_url), MdbxBare(record))) = if first_iteration { - first_iteration = false; - cur.first::>() - } else { - cur.next::>() - }? 
{ - let mut split = domain_then_url.as_ref().split("\n"); - let domain = split.next().context("No first split..?")?; - let url_str = split.next().context("No URL")?; - - // Is the domain new? - if domain_state - .as_ref() - .map(|ds: &DomainState| &ds.domain != domain) - .unwrap_or(true) - { - // Then load the relevant records for it. - domain_state = Some(DomainState { - domain: domain.to_owned(), - domain_record: txn.get_domain_record(domain)?, - }); - } - - let url = Url::parse(url_str)?; - - let domain_state = domain_state.as_ref().unwrap(); - - let is_rakeable = domain_state + for domain_state in reinstatable_domains { + let any_seeds = domain_state .domain_record - .as_ref() - .map(|dr: &DomainRecord| dr.is_url_rakeable(&url)) - .flatten(); - - match is_rakeable { - Some(true) => { - // ALLOWED - // Make it a queued URL - txn.enqueue_url(url_str, None, record.queue_record.intent)?; - cur.del(WriteFlags::empty())?; - } - Some(false) => { - // WEED - // Just delete - cur.del(WriteFlags::empty())?; - } - None => { /* nop: neither allowed nor a weed. Keep on hold. */ } + .rakeable_path_prefixes + .values() + .contains(&true); + if !any_seeds { + continue; } - } - txn.commit()?; + // This domain has *some* seeds, let's just reinstate it. + txn.insert_active_domain_id_with_new_raffle_ticket(domain_state.domain_id) + .await?; + } Ok(()) } diff --git a/quickpeep_raker/src/storage/mdbx_helper_types.rs b/quickpeep_raker/src/storage/mdbx_helper_types.rs deleted file mode 100644 index caf3b1f..0000000 --- a/quickpeep_raker/src/storage/mdbx_helper_types.rs +++ /dev/null @@ -1,190 +0,0 @@ -use anyhow::{anyhow, Context}; -use libmdbx::{Error, TableObject, TransactionKind}; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::borrow::Cow; - -/// u16 in BIG byte endianness (u16 not supported by INTEGERKEY mode!) 
-#[derive(Copy, Clone, Debug)] -pub struct MdbxU16BE(pub u16); - -impl MdbxU16BE { - pub fn as_bytes(&self) -> Cow<'_, [u8]> { - Cow::Owned(self.0.to_be_bytes().to_vec()) - } -} - -impl TableObject<'_> for MdbxU16BE { - fn decode(data_val: &[u8]) -> Result - where - Self: Sized, - { - if data_val.len() != 2 { - return Err(libmdbx::Error::DecodeError( - anyhow!("MDBX Key not 2 bytes; can't be decoded as u16").into(), - )); - } - let mut buf = [0u8; 2]; - buf.copy_from_slice(&data_val); - Ok(MdbxU16BE(u16::from_be_bytes(buf))) - } -} - -/// u32 in native byte endianness (as required by INTEGERKEY mode) -#[derive(Copy, Clone, Debug)] -pub struct MdbxU32(pub u32); - -impl MdbxU32 { - pub fn as_bytes(&self) -> Cow<'_, [u8]> { - Cow::Owned(self.0.to_ne_bytes().to_vec()) - } -} - -impl TableObject<'_> for MdbxU32 { - fn decode(data_val: &[u8]) -> Result - where - Self: Sized, - { - if data_val.len() != 4 { - return Err(libmdbx::Error::DecodeError( - anyhow!("MDBX Key not 4 bytes; can't be decoded as u32").into(), - )); - } - let mut buf = [0u8; 4]; - buf.copy_from_slice(&data_val); - Ok(MdbxU32(u32::from_ne_bytes(buf))) - } -} - -/// u64 in native byte endianness (as required by INTEGERKEY mode) -#[derive(Copy, Clone, Debug)] -pub struct MdbxU64(pub u64); - -impl MdbxU64 { - pub fn as_bytes(&self) -> Cow<'_, [u8]> { - Cow::Owned(self.0.to_ne_bytes().to_vec()) - } -} - -impl TableObject<'_> for MdbxU64 { - fn decode(data_val: &[u8]) -> Result - where - Self: Sized, - { - if data_val.len() != 8 { - return Err(libmdbx::Error::DecodeError( - anyhow!("MDBX Key not 8 bytes; can't be decoded as u64").into(), - )); - } - let mut buf = [0u8; 8]; - buf.copy_from_slice(&data_val); - Ok(MdbxU64(u64::from_ne_bytes(buf))) - } -} - -/// UTF-8 String -#[derive(Clone, Debug)] -pub struct MdbxString<'txn>(pub Cow<'txn, str>); - -impl MdbxString<'_> { - pub fn as_bytes(&self) -> &[u8] { - self.0.as_bytes() - } - - pub fn into_string(self) -> String { - self.0.into_owned() - } -} - -impl<'a> TableObject<'_> for MdbxString<'a> { - fn decode(_data_val: &[u8]) -> Result - where - Self: Sized, - { - unreachable!() - } - - unsafe fn decode_val( - txnptr: *const mdbx_sys::MDBX_txn, - data_val: &mdbx_sys::MDBX_val, - ) -> Result - where - Self: Sized, - { - let bytes = MdbxBytes::decode_val::(txnptr, data_val)?; - let string_cow = match bytes { - Cow::Borrowed(data) => { - let string = std::str::from_utf8(data) - .context("Failed to decode MDBX key as string") - .map_err(|e| libmdbx::Error::DecodeError(e.into()))?; - Cow::Borrowed(string) - } - Cow::Owned(data) => { - let string = String::from_utf8(data) - .context("Failed to decode MDBX key as string") - .map_err(|e| libmdbx::Error::DecodeError(e.into()))?; - Cow::Owned(string) - } - }; - Ok(MdbxString(string_cow)) - } -} - -// /// UTF-8 String -// /// Using Cow<'txn, str> would've needed some unsafe code (see `Cow<'txn, [u8]>` for inspiration), -// /// so I didn't bother for now. 
-// #[derive(Clone, Debug)] -// pub struct MdbxString(pub String); -// -// impl MdbxString { -// pub fn as_bytes(&self) -> &[u8] { -// self.0.as_bytes() -// } -// } -// -// impl TableObject<'_> for MdbxString { -// fn decode(data_val: &[u8]) -> Result where Self: Sized { -// let string = String::from_utf8(data_val.to_vec()) -// .context("Failed to decode MDBX key as string") -// .map_err(|e| libmdbx::Error::DecodeError(e.into()))?; -// Ok(MdbxString(string)) -// } -// } - -/// Any BARE payload -#[derive(Clone, Debug)] -pub struct MdbxBare(pub T); - -impl MdbxBare { - pub fn as_bytes(&self) -> Vec { - serde_bare::to_vec(&self.0).expect("It's unreasonable to expect serialisation will fail") - } -} - -impl TableObject<'_> for MdbxBare { - fn decode(_data_val: &[u8]) -> Result - where - Self: Sized, - { - unreachable!() - } - - unsafe fn decode_val( - txnptr: *const mdbx_sys::MDBX_txn, - data_val: &mdbx_sys::MDBX_val, - ) -> Result - where - Self: Sized, - { - let bytes = MdbxBytes::decode_val::(txnptr, data_val)?; - - let record = serde_bare::from_slice(bytes.as_ref()) - .context("Failed to decode MDBX key as BARE object") - .map_err(|e| libmdbx::Error::DecodeError(e.into()))?; - - Ok(MdbxBare(record)) - } -} - -/// Supported natively by libmdbx. -pub type MdbxBytes<'txn> = Cow<'txn, [u8]>; diff --git a/quickpeep_raker/src/storage/migrations.rs b/quickpeep_raker/src/storage/migrations.rs deleted file mode 100644 index 5af7165..0000000 --- a/quickpeep_raker/src/storage/migrations.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub const MIGRATION_KEY: &[u8] = b"MIGRATION_VERSION"; -pub const MIGRATION_VERSION: &str = "quickpeep_raker:0.1.0"; diff --git a/quickpeep_raker/src/storage/records.rs b/quickpeep_raker/src/storage/records.rs index e4c7f79..2265a8f 100644 --- a/quickpeep_raker/src/storage/records.rs +++ b/quickpeep_raker/src/storage/records.rs @@ -1,50 +1,13 @@ -use crate::raking::{RakeIntent, TemporaryFailure}; +use crate::raking::RakeIntent; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] -pub struct ActiveDomainRecord { - /// The raffle ticket number owned by this domain. - pub raffle_ticket: u32, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct UrlVisitedRecord { - /// Number of days since the QuickPeep Epoch that this page was last raked at. - /// A u16 is fine here, giving 179 years worth of values. This allows compact encoding. - /// We don't really care about a more granular timestamp: sitemaps and feeds usually only - /// give the date of last update anyway. - pub last_visited_days: u16, -} - #[derive(Clone, Debug, Deserialize, Serialize)] pub struct QueueUrlRecord { pub intent: RakeIntent, // TODO CONSIDER } -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct OnHoldUrlRecord { - /// Record that should be emitted once this is released. - pub queue_record: QueueUrlRecord, - - /// Number of times this URL has been 'enqueued'; capped at 255. - pub refs: u8, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct BackingOffDomainRecord { - /// The URL that caused the backoff. - pub failed_url: String, - /// The reason that this backoff is in place - pub failure: TemporaryFailure, - /// Duration of the backoff. Used to provide increasing backoffs if the failures persist. - pub backoff: u32, - /// When the domain should be reinstated - /// MUST match the timestamp present in the reinstatements table. 
- pub reinstate_at: u64, -} - #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct DomainRecord { pub rakeable_path_prefixes: BTreeMap, diff --git a/quickpeep_utils/src/dates.rs b/quickpeep_utils/src/dates.rs index 6c01bff..26a2f61 100644 --- a/quickpeep_utils/src/dates.rs +++ b/quickpeep_utils/src/dates.rs @@ -1,11 +1,15 @@ use anyhow::Context; use chrono::{Date, Duration, TimeZone, Utc}; use lazy_static::lazy_static; +use std::time::SystemTime; lazy_static! { - /// The QuickPeep Epoch is 2022-01-01, as this gives us 52 years of extra headroom compared to the - /// Unix one. QuickPeep didn't exist before 2022 so we needn't worry about negative dates! - pub static ref QUICKPEEP_EPOCH: Date = Utc.ymd(2022, 1, 1); + /// The QuickPeep Epoch is 2023-01-01, as this gives us 53 years of extra headroom compared to the + /// Unix one. This QuickPeep database format didn't exist before 2023 so we needn't worry about negative dates! + pub static ref QUICKPEEP_EPOCH: Date = Utc.ymd(2023, 1, 1); + + /// 2023-01-01 + pub static ref QUICKPEEP_EPOCH_ST: SystemTime = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1672531200); } pub fn date_from_quickpeep_days(days: u16) -> Date { diff --git a/shell.nix b/shell.nix index eb0ec79..76c71aa 100644 --- a/shell.nix +++ b/shell.nix @@ -17,6 +17,8 @@ pkgs.mkShell { pkgs.pkg-config + pkgs.sqlx-cli + #pkgs.libclang # ?? ];
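For reference, a minimal sketch of how a caller drives the ported store; the
connection string is illustrative only, and the `quickpeep_raker::storage`
module path is assumed from this patch's file layout:

    use quickpeep_raker::storage::RakerStore;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // `open` connects a pool with SERIALIZABLE as the default isolation
        // level and applies the bundled sqlx migrations.
        let store = RakerStore::open("postgres://quickpeep@localhost/qp_raker").await?;

        // All access now goes through closures returning boxed futures, in
        // place of the old MDBX transaction objects.
        store
            .rw_txn(move |mut txn| {
                Box::pin(async move {
                    txn.insert_active_domain_with_new_raffle_ticket(
                        "example.org".to_owned(),
                    )
                    .await?;
                    Ok(())
                })
            })
            .await?;
        Ok(())
    }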