Compare commits

...

1 Commits

Author SHA1 Message Date
Olivier 'reivilibre' 2876939b59 Port to Postgres
ci/woodpecker/push/check Pipeline failed Details
ci/woodpecker/push/manual Pipeline was successful Details
ci/woodpecker/push/release Pipeline was successful Details
2023-04-02 10:52:58 +01:00
22 changed files with 1356 additions and 1595 deletions

343
Cargo.lock generated
View File

@ -15,7 +15,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab3f32d1eb0f323dcdd51cbb8d68cff415153870ff3bd60e12d5d56298bfcb1" checksum = "7ab3f32d1eb0f323dcdd51cbb8d68cff415153870ff3bd60e12d5d56298bfcb1"
dependencies = [ dependencies = [
"addr", "addr",
"base64", "base64 0.13.0",
"bitflags", "bitflags",
"flate2", "flate2",
"idna", "idna",
@ -206,9 +206,9 @@ dependencies = [
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "0.4.0" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e"
dependencies = [ dependencies = [
"num-traits", "num-traits",
] ]
@ -318,6 +318,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]] [[package]]
name = "bindgen" name = "bindgen"
version = "0.59.2" version = "0.59.2"
@ -394,15 +400,6 @@ dependencies = [
"generic-array 0.12.4", "generic-array 0.12.4",
] ]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.5",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.2" version = "0.10.2"
@ -700,18 +697,18 @@ dependencies = [
[[package]] [[package]]
name = "crc" name = "crc"
version = "2.1.0" version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
dependencies = [ dependencies = [
"crc-catalog", "crc-catalog",
] ]
[[package]] [[package]]
name = "crc-catalog" name = "crc-catalog"
version = "1.1.1" version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
@ -947,15 +944,6 @@ dependencies = [
"generic-array 0.12.4", "generic-array 0.12.4",
] ]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.5",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.3" version = "0.10.3"
@ -978,10 +966,30 @@ dependencies = [
] ]
[[package]] [[package]]
name = "dotenv" name = "dirs"
version = "0.15.0" version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]] [[package]]
name = "downcast-rs" name = "downcast-rs"
@ -1009,6 +1017,9 @@ name = "either"
version = "1.6.1" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "encoding_rs" name = "encoding_rs"
@ -1243,9 +1254,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.21" version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
@ -1349,13 +1360,13 @@ checksum = "aa12dfaa57be769c6681b4d193398cae8db7f7b9af3e86d362d7f0a3c294a1a0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"ring", "ring",
"rustls", "rustls 0.19.1",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.22.0",
"url", "url",
"webpki", "webpki 0.21.4",
"webpki-roots", "webpki-roots 0.21.1",
"x509-signature", "x509-signature",
] ]
@ -1453,12 +1464,21 @@ dependencies = [
] ]
[[package]] [[package]]
name = "hashlink" name = "hashbrown"
version = "0.7.0" version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [ dependencies = [
"hashbrown", "ahash 0.7.6",
]
[[package]]
name = "hashlink"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
dependencies = [
"hashbrown 0.12.3",
] ]
[[package]] [[package]]
@ -1467,7 +1487,7 @@ version = "7.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0"
dependencies = [ dependencies = [
"base64", "base64 0.13.0",
"byteorder", "byteorder",
"crossbeam-channel", "crossbeam-channel",
"flate2", "flate2",
@ -1489,6 +1509,9 @@ name = "heck"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
dependencies = [
"unicode-segmentation",
]
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
@ -1505,6 +1528,24 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hkdf"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437"
dependencies = [
"hmac",
]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest 0.10.3",
]
[[package]] [[package]]
name = "html5ever" name = "html5ever"
version = "0.25.1" version = "0.25.1"
@ -1671,7 +1712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown 0.11.2",
] ]
[[package]] [[package]]
@ -1890,9 +1931,9 @@ dependencies = [
[[package]] [[package]]
name = "libsqlite3-sys" name = "libsqlite3-sys"
version = "0.23.2" version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cafc7c74096c336d9d27145f7ebd4f4b6f95ba16aa5a282387267e6925cb58" checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14"
dependencies = [ dependencies = [
"cc", "cc",
"pkg-config", "pkg-config",
@ -2720,7 +2761,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb87f3080f6d1d69e8c564c0fcfde1d7aa8cc451ce40cae89479111f03bc0eb" checksum = "fcb87f3080f6d1d69e8c564c0fcfde1d7aa8cc451ce40cae89479111f03bc0eb"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.11.2",
] ]
[[package]] [[package]]
@ -2779,6 +2820,15 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9376a4f0340565ad675d11fc1419227faf5f60cd7ac9cb2e7185a471f30af833" checksum = "9376a4f0340565ad675d11fc1419227faf5f60cd7ac9cb2e7185a471f30af833"
[[package]]
name = "md-5"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66b48670c893079d3c2ed79114e3644b7004df1c361a4e0ad52e2e6940d07c3d"
dependencies = [
"digest 0.10.3",
]
[[package]] [[package]]
name = "mdbx-sys" name = "mdbx-sys"
version = "0.11.4-git.20210105" version = "0.11.4-git.20210105"
@ -2924,7 +2974,7 @@ dependencies = [
"atomic-shim", "atomic-shim",
"crossbeam-epoch", "crossbeam-epoch",
"crossbeam-utils", "crossbeam-utils",
"hashbrown", "hashbrown 0.11.2",
"metrics 0.18.1", "metrics 0.18.1",
"num_cpus", "num_cpus",
"parking_lot 0.11.2", "parking_lot 0.11.2",
@ -3219,12 +3269,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]] [[package]]
name = "openssl" name = "openssl"
version = "0.10.38" version = "0.10.38"
@ -3615,11 +3659,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.36" version = "1.0.55"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" checksum = "1d0dd4be24fcdcfeaa12a432d588dc59bbad6cad3510c67e74a2b6b2fc950564"
dependencies = [ dependencies = [
"unicode-xid", "unicode-ident",
] ]
[[package]] [[package]]
@ -3659,7 +3703,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "292972edad6bbecc137ab84c5e36421a4a6c979ea31d3cc73540dd04315b33e1" checksum = "292972edad6bbecc137ab84c5e36421a4a6c979ea31d3cc73540dd04315b33e1"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"hashbrown", "hashbrown 0.11.2",
"idna", "idna",
"psl-types", "psl-types",
] ]
@ -3807,6 +3851,7 @@ dependencies = [
"diplomatic-bag", "diplomatic-bag",
"env_logger", "env_logger",
"feed-rs", "feed-rs",
"futures-core",
"futures-util", "futures-util",
"gemini-fetch", "gemini-fetch",
"html5ever", "html5ever",
@ -3815,12 +3860,10 @@ dependencies = [
"itertools", "itertools",
"kuchiki", "kuchiki",
"lazy_static", "lazy_static",
"libmdbx",
"lingua", "lingua",
"log", "log",
"lru", "lru",
"markup5ever", "markup5ever",
"mdbx-sys",
"metrics 0.18.1", "metrics 0.18.1",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
"metrics-process-promstyle", "metrics-process-promstyle",
@ -3843,7 +3886,7 @@ dependencies = [
"signal-hook 0.3.13", "signal-hook 0.3.13",
"sitemap", "sitemap",
"smartstring", "smartstring",
"tempfile", "sqlx",
"tikv-jemallocator", "tikv-jemallocator",
"tokio", "tokio",
"webp", "webp",
@ -4017,6 +4060,17 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "redox_users"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
dependencies = [
"getrandom 0.2.5",
"redox_syscall",
"thiserror",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.5" version = "1.5.5"
@ -4066,7 +4120,7 @@ version = "0.11.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb"
dependencies = [ dependencies = [
"base64", "base64 0.13.0",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
@ -4150,7 +4204,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b861ecaade43ac97886a512b360d01d66be9f41f3c61088b42cedf92e03d678" checksum = "1b861ecaade43ac97886a512b360d01d66be9f41f3c61088b42cedf92e03d678"
dependencies = [ dependencies = [
"base64", "base64 0.13.0",
"bitflags", "bitflags",
"serde", "serde",
] ]
@ -4186,11 +4240,32 @@ version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [ dependencies = [
"base64", "base64 0.13.0",
"log", "log",
"ring", "ring",
"sct", "sct 0.6.1",
"webpki", "webpki 0.21.4",
]
[[package]]
name = "rustls"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log",
"ring",
"sct 0.7.0",
"webpki 0.22.0",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.0",
] ]
[[package]] [[package]]
@ -4237,6 +4312,16 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "seahash" name = "seahash"
version = "3.0.7" version = "3.0.7"
@ -4363,20 +4448,29 @@ dependencies = [
"block-buffer 0.7.3", "block-buffer 0.7.3",
"digest 0.8.1", "digest 0.8.1",
"fake-simd", "fake-simd",
"opaque-debug 0.2.3", "opaque-debug",
]
[[package]]
name = "sha1"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "006769ba83e921b3085caa8334186b00cf92b4cb1a6cf4632fbccc8eff5c7549"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.3",
] ]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.9.9" version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" checksum = "cf9db03534dff993187064c4e0c05a5708d2a9728ace9a8959b77bedf415dac5"
dependencies = [ dependencies = [
"block-buffer 0.9.0",
"cfg-if", "cfg-if",
"cpufeatures", "cpufeatures",
"digest 0.9.0", "digest 0.10.3",
"opaque-debug 0.3.0",
] ]
[[package]] [[package]]
@ -4500,9 +4594,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlformat" name = "sqlformat"
version = "0.1.8" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4" checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [ dependencies = [
"itertools", "itertools",
"nom 7.1.1", "nom 7.1.1",
@ -4511,9 +4605,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx" name = "sqlx"
version = "0.5.11" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc15591eb44ffb5816a4a70a7efd5dd87bfd3aa84c4c200401c4396140525826" checksum = "f8de3b03a925878ed54a954f621e64bf55a3c1bd29652d0d1a17830405350188"
dependencies = [ dependencies = [
"sqlx-core", "sqlx-core",
"sqlx-macros", "sqlx-macros",
@ -4521,18 +4615,22 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-core" name = "sqlx-core"
version = "0.5.11" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "195183bf6ff8328bb82c0511a83faf60aacf75840103388851db61d7a9854ae3" checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029"
dependencies = [ dependencies = [
"ahash 0.7.6", "ahash 0.7.6",
"atoi", "atoi",
"base64 0.13.0",
"bitflags", "bitflags",
"byteorder", "byteorder",
"bytes", "bytes",
"crc", "crc",
"crossbeam-queue", "crossbeam-queue",
"dirs",
"dotenvy",
"either", "either",
"event-listener",
"flume", "flume",
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -4541,16 +4639,24 @@ dependencies = [
"futures-util", "futures-util",
"hashlink", "hashlink",
"hex", "hex",
"hkdf",
"hmac",
"indexmap", "indexmap",
"itoa 1.0.1", "itoa 1.0.1",
"libc", "libc",
"libsqlite3-sys", "libsqlite3-sys",
"log", "log",
"md-5",
"memchr", "memchr",
"once_cell", "once_cell",
"paste", "paste",
"percent-encoding", "percent-encoding",
"rustls", "rand 0.8.5",
"rustls 0.20.8",
"rustls-pemfile",
"serde",
"serde_json",
"sha1",
"sha2", "sha2",
"smallvec", "smallvec",
"sqlformat", "sqlformat",
@ -4559,22 +4665,25 @@ dependencies = [
"thiserror", "thiserror",
"tokio-stream", "tokio-stream",
"url", "url",
"webpki", "webpki-roots 0.22.6",
"webpki-roots", "whoami",
] ]
[[package]] [[package]]
name = "sqlx-macros" name = "sqlx-macros"
version = "0.5.11" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eee35713129561f5e55c554bba1c378e2a7e67f81257b7311183de98c50e6f94" checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9"
dependencies = [ dependencies = [
"dotenv", "dotenvy",
"either", "either",
"heck 0.3.3", "heck 0.4.0",
"hex",
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"serde",
"serde_json",
"sha2", "sha2",
"sqlx-core", "sqlx-core",
"sqlx-rt", "sqlx-rt",
@ -4584,13 +4693,13 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-rt" name = "sqlx-rt"
version = "0.5.11" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b555e70fbbf84e269ec3858b7a6515bcfe7a166a7cc9c636dd6efd20431678b6" checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.23.4",
] ]
[[package]] [[package]]
@ -4683,13 +4792,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.89" version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"unicode-xid", "unicode-ident",
] ]
[[package]] [[package]]
@ -4705,7 +4814,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "264c2549892aa83975386a924ef8d0b8e909674c837d37ea58b4bd8739495c6e" checksum = "264c2549892aa83975386a924ef8d0b8e909674c837d37ea58b4bd8739495c6e"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64", "base64 0.13.0",
"bitpacking", "bitpacking",
"byteorder", "byteorder",
"census", "census",
@ -4994,9 +5103,20 @@ version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [ dependencies = [
"rustls", "rustls 0.19.1",
"tokio", "tokio",
"webpki", "webpki 0.21.4",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.8",
"tokio",
"webpki 0.22.0",
] ]
[[package]] [[package]]
@ -5203,6 +5323,12 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f"
[[package]]
name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]] [[package]]
name = "unicode-normalization" name = "unicode-normalization"
version = "0.1.19" version = "0.1.19"
@ -5218,12 +5344,6 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]] [[package]]
name = "unicode_categories" name = "unicode_categories"
version = "0.1.1" version = "0.1.1"
@ -5412,13 +5532,32 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "0.21.1" version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [ dependencies = [
"webpki", "webpki 0.21.4",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki 0.22.0",
] ]
[[package]] [[package]]
@ -5436,6 +5575,16 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "whoami"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c70234412ca409cc04e864e89523cb0fc37f5e1344ebed5a3ebf4192b6b9f68"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View File

@ -17,7 +17,7 @@ ron = "0.7.0"
tower-http = { version = "0.2.5", features = ["fs"] } tower-http = { version = "0.2.5", features = ["fs"] }
log = "0.4.14" log = "0.4.14"
env_logger = "0.9.0" env_logger = "0.9.0"
sqlx = { version = "0.5.11", features = ["sqlite", "runtime-tokio-rustls"] } sqlx = { version = "0.6.3", features = ["sqlite", "runtime-tokio-rustls"] }
itertools = "0.10.3" itertools = "0.10.3"
colour = "0.6.0" colour = "0.6.0"
futures-util = "0.3.21" futures-util = "0.3.21"

1
quickpeep_raker/.env Normal file
View File

@ -0,0 +1 @@
DATABASE_URL=${RAKER_DATABASE_URL}

View File

@ -34,9 +34,7 @@ bytesize = {version = "1.1.0", features = ["serde"]}
chrono = "0.4.19" chrono = "0.4.19"
### Storage ### Storage
libmdbx = "0.1.1" sqlx = { version = "0.6.3", features = ["postgres", "runtime-tokio-rustls", "offline", "json"] }
# Used for FFI. Must match the version in libmdbx.
mdbx-sys = "0.11.4-git.20210105"
# For compression of emitted packs. 0.11.1+zstd.1.5.2 # For compression of emitted packs. 0.11.1+zstd.1.5.2
zstd = "0.11.1" zstd = "0.11.1"
@ -45,6 +43,7 @@ lazy_static = "1.4.0"
bytes = "1.1.0" bytes = "1.1.0"
itertools = "0.10.3" itertools = "0.10.3"
ipnetwork = "0.18.0" ipnetwork = "0.18.0"
futures-core = "0.3.28"
futures-util = "0.3.21" futures-util = "0.3.21"
tokio = { version = "1.17.0", features = ["full"] } tokio = { version = "1.17.0", features = ["full"] }
anyhow = "1.0.55" anyhow = "1.0.55"
@ -91,6 +90,3 @@ metrics = "0.18.1"
metrics-exporter-prometheus = { version = "0.9.0", default-features = false, features = ["http-listener"] } metrics-exporter-prometheus = { version = "0.9.0", default-features = false, features = ["http-listener"] }
metrics-process-promstyle = "0.18.0" metrics-process-promstyle = "0.18.0"
bare-metrics-recorder = "0.1.0" bare-metrics-recorder = "0.1.0"
[dev-dependencies]
tempfile = "3.3.0"

5
quickpeep_raker/build.rs Normal file
View File

@ -0,0 +1,5 @@
// generated by `sqlx migrate build-script`

/// Build script entry point.
///
/// Prints a single Cargo build-script directive on stdout asking Cargo to
/// re-run this script whenever the `migrations` path changes, so that adding a
/// new migration triggers recompilation.
fn main() {
    let directive = "cargo:rerun-if-changed=migrations";
    println!("{}", directive);
}

21
quickpeep_raker/devdb.sh Normal file
View File

@ -0,0 +1,21 @@
#!/bin/sh
# Starts a throwaway Postgres 15 container, applies the SQLx migrations to it
# and regenerates the SQLx offline-mode query data.
# The container is stopped when this script exits, and `--rm` then removes it.

# Stop at the first failing command or use of an unset variable, so that e.g. a
# failed migration does not go on to regenerate the offline data.
set -eu

cname=$(docker run -d --rm -e POSTGRES_PASSWORD=password -p 127.0.0.10:55432:5432 postgres:15)

onstop() {
    docker stop "$cname"
}
trap onstop EXIT

export RAKER_DATABASE_URL=postgres://postgres:password@127.0.0.10:55432/postgres

# `docker run -d` returns as soon as the container has started, which is before
# Postgres accepts connections. Poll over TCP inside the container until the
# server is ready, giving up after roughly a minute.
echo "Waiting for Postgres to accept connections"
attempts=0
until docker exec "$cname" pg_isready -q -h 127.0.0.1 -U postgres; do
    attempts=$((attempts + 1))
    if [ "$attempts" -ge 60 ]; then
        echo "Postgres did not become ready in time" >&2
        exit 1
    fi
    sleep 1
done

echo "Running migrations"
sqlx migrate run

echo "Preparing offline mode SQLx data"
#cargo sqlx prepare
cargo sqlx prepare -- --lib
#cargo sqlx prepare --merged -- --all-features --all-targets --lib

View File

@ -0,0 +1,42 @@
-- Initial schema: domains, their URLs, the URL queue, the active-domain raffle and domain backoffs.
-- All timestamps are in sec since 2023-01-01.
-- Timestamp columns (the `*_ts` ones) are plain INTEGER, which in PostgreSQL is a signed 32-bit value.
-- Kinds of rake that can be requested for a URL.
CREATE TYPE rakeintent AS ENUM ('Any', 'Page', 'Feed', 'SiteMap', 'Icon');
-- One row per domain. `name` is unique; `domain_record` holds a JSONB document for the domain.
CREATE TABLE domains (
domain_id SERIAL PRIMARY KEY NOT NULL,
name TEXT UNIQUE NOT NULL,
domain_record JSONB NOT NULL
);
-- URLs, each belonging to exactly one domain. A given URL may appear only once per domain.
-- `visited_at_ts` is nullable; presumably NULL means the URL has not been visited yet (TODO confirm).
CREATE TABLE urls (
url_id SERIAL PRIMARY KEY NOT NULL,
domain_id INTEGER NOT NULL REFERENCES domains(domain_id),
url TEXT NOT NULL,
intent rakeintent NOT NULL,
visited_at_ts INTEGER,
UNIQUE (domain_id, url)
);
-- At most one queue entry per URL (url_id is the primary key), holding the
-- `rake_after_ts` timestamp for that URL.
CREATE TABLE url_queue (
url_id INTEGER PRIMARY KEY NOT NULL REFERENCES urls(url_id),
rake_after_ts INTEGER NOT NULL
);
-- Used for finding things to rake.
CREATE INDEX url_queue_rake_after_ts_idx ON url_queue(rake_after_ts);
-- At most one raffle entry per domain; `raffle` values are unique across domains.
-- NOTE(review): presumably used to pick a random active domain -- confirm against the queries.
CREATE TABLE active_domain_raffle (
domain_id INTEGER PRIMARY KEY NOT NULL REFERENCES domains(domain_id),
raffle INTEGER UNIQUE NOT NULL
);
-- At most one backoff entry per domain, with the time it lasts until, a
-- duration in seconds and a textual reason.
CREATE TABLE domain_backoffs (
domain_id INTEGER PRIMARY KEY NOT NULL REFERENCES domains(domain_id),
backoff_until_ts INTEGER NOT NULL,
backoff_sec INTEGER NOT NULL,
reason TEXT NOT NULL
);
-- Used for finding things to rake.
CREATE INDEX domain_backoffs_backoff_until_ts_idx ON domain_backoffs(backoff_until_ts);

View File

@ -0,0 +1,3 @@
{
"db": "PostgreSQL"
}

View File

@ -1,173 +0,0 @@
use clap::Parser;
use std::borrow::Cow;
use std::fmt::Debug;
use env_logger::Env;
use anyhow::{bail, Context};
use colour::{dark_yellow_ln, red_ln};
use libmdbx::{Database, TableObject, RO};
use std::path::PathBuf;
use quickpeep_raker::config;
use quickpeep_raker::storage::mdbx_helper_types::MdbxBare;
use quickpeep_raker::storage::records::{
ActiveDomainRecord, BackingOffDomainRecord, DomainRecord, OnHoldUrlRecord, QueueUrlRecord,
UrlVisitedRecord,
};
use quickpeep_raker::storage::{RakerStore, RakerTxn};
/// Inspects a raker's database: looks up a key (or key prefix) in one table
#[derive(Clone, Debug, Parser)]
pub struct Opts {
    // Optional path to the configuration file, given as `--config <path>`.
    #[clap(long = "config")]
    config: Option<PathBuf>,

    /// Table name
    table: String,

    /// Key name to look up
    key_name: String,

    /// Search for any prefix, not an exact match.
    #[clap(long = "prefix", short = 'p')]
    prefix: bool,
}
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init();
let opts: Opts = Opts::parse();
let config_path = opts
.config
.unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.raker.workbench_dir.exists() {
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.raker.workbench_dir
);
}
if !config.seed_dir.exists() {
bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir);
}
let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?;
let txn = store.ro_txn()?;
match opts.table.as_ref() {
"queue_urls" | "urls_queue" => {
inspect::<MdbxBare<QueueUrlRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().queue_urls,
&txn,
)?;
}
"active_domains" => {
inspect::<MdbxBare<ActiveDomainRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().active_domains,
&txn,
)?;
}
"active_domains_raffle" => {
inspect::<MdbxBare<String>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().active_domain_raffle,
&txn,
)?;
}
"backing_off_reinstatements" => {
inspect::<MdbxBare<String>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().backing_off_reinstatements,
&txn,
)?;
}
"backing_off_domains" => {
inspect::<MdbxBare<BackingOffDomainRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().backing_off_domains,
&txn,
)?;
}
"visited_urls" => {
inspect::<MdbxBare<UrlVisitedRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().visited_urls,
&txn,
)?;
}
"domains" => {
inspect::<MdbxBare<DomainRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().domains,
&txn,
)?;
}
"urls_on_hold" => {
inspect::<MdbxBare<OnHoldUrlRecord>>(
opts.key_name.as_ref(),
opts.prefix,
&txn.mdbx.borrow_dbs().urls_on_hold,
&txn,
)?;
}
other => {
dark_yellow_ln!("Unknown database {:?}", other);
}
}
Ok(())
}
/// A value that can be rendered as text for display by this tool.
trait Inspectable {
    /// Returns a textual rendering of the value.
    fn inspect(&self) -> String;
}

/// Any `MdbxBare` wrapper whose inner value is `Debug` is rendered using the
/// inner value's `Debug` output.
impl<T: Debug> Inspectable for MdbxBare<T> {
    fn inspect(&self) -> String {
        let inner = &self.0;
        format!("{:?}", inner)
    }
}
/// Prints the entries of `database` that match `key`.
///
/// * `key` - the key to look up, compared as raw bytes.
/// * `prefix` - when `false`, a single exact lookup is made and either the
///   value or "no value" is printed; when `true`, the cursor is iterated from
///   `key` and every entry is printed until a key no longer starts with `key`.
/// * `database` - the table to read from.
/// * `txn` - the transaction used for the lookup.
///
/// `IV` is the type the stored values are decoded into.
///
/// # Errors
///
/// Propagates any error from the lookup, from opening the cursor, or from an
/// item yielded while iterating.
fn inspect<'a, IV: Inspectable + TableObject<'a> + 'static>(
    key: &str,
    prefix: bool,
    database: &Database<'a>,
    txn: &'a RakerTxn<'a, RO>,
) -> anyhow::Result<()> {
    let needle = key.as_bytes();

    // Exact lookup: one `get`, then done.
    if !prefix {
        match txn.mdbx_txn.get::<IV>(database, needle)? {
            Some(entry) => {
                println!("{}", entry.inspect());
            }
            None => {
                red_ln!("no value");
            }
        }
        return Ok(());
    }

    // Prefix scan: iterate from `key` and stop at the first key that no
    // longer starts with it.
    let mut cursor = txn.mdbx_txn.cursor(database)?;
    for item in cursor.iter_from::<Cow<'_, [u8]>, IV>(needle) {
        let (found_key, value) = item?;
        if !found_key.starts_with(needle) {
            break;
        }
        println!(
            "{}",
            std::str::from_utf8(&found_key).unwrap_or("<Not UTF-8>")
        );
        println!(" = {}", value.inspect());
    }

    Ok(())
}

View File

@ -1,13 +1,11 @@
use anyhow::{bail, Context}; use anyhow::Context;
use clap::Parser; use clap::Parser;
use colour::{blue, yellow_ln}; use colour::{blue, yellow_ln};
use env_logger::Env; use env_logger::Env;
use itertools::Itertools; use itertools::Itertools;
use libmdbx::Database;
use quickpeep_raker::config; use quickpeep_raker::config;
use quickpeep_raker::storage::mdbx_helper_types::{MdbxBare, MdbxString};
use quickpeep_raker::storage::records::OnHoldUrlRecord;
use quickpeep_raker::storage::RakerStore; use quickpeep_raker::storage::RakerStore;
use sqlx::query;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
@ -16,12 +14,9 @@ use std::path::PathBuf;
pub struct Opts { pub struct Opts {
#[clap(long = "config")] #[clap(long = "config")]
config: Option<PathBuf>, config: Option<PathBuf>,
/// Whether to show URLs instead of domains
#[clap(long = "urls")]
urls: bool,
} }
// TODO re-introduce refcounting
#[tokio::main] #[tokio::main]
pub async fn main() -> anyhow::Result<()> { pub async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env( env_logger::Builder::from_env(
@ -36,19 +31,11 @@ pub async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|| PathBuf::from("quickpeep.ron")); .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
let config = config::RakerConfig::load(&config_path).context("Failed to load config")?; let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.raker.workbench_dir.exists() { let store = RakerStore::open(&config.raker.database_uri).await?;
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.raker.workbench_dir
);
}
let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?; let counts = count_on_hold(&store).await?;
let is_urls = opts.urls;
let counts = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<_>> { let counts = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<_>> {
let counts = count_on_hold(&store, is_urls)?;
let sorted_counts = counts let sorted_counts = counts
.into_iter() .into_iter()
.map(|(string, count)| (count, string)) .map(|(string, count)| (count, string))
@ -60,12 +47,8 @@ pub async fn main() -> anyhow::Result<()> {
}) })
.await??; .await??;
blue!("№ Refs "); blue!("№ URLs ");
if opts.urls {
yellow_ln!("URL");
} else {
yellow_ln!("Domain"); yellow_ln!("Domain");
}
for (count, string) in counts { for (count, string) in counts {
println!("{:>6} {}", count, string); println!("{:>6} {}", count, string);
} }
@ -73,25 +56,27 @@ pub async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
pub fn count_on_hold(store: &RakerStore, urls: bool) -> anyhow::Result<HashMap<String, u32>> { pub async fn count_on_hold(store: &RakerStore) -> anyhow::Result<HashMap<String, u32>> {
let mut map: HashMap<String, u32> = Default::default(); store
.ro_txn(move |txn| {
Box::pin(async move {
let rows = query!(
r#"
SELECT d.name AS "domain_name", COUNT(1) AS "url_count" FROM urls u
LEFT JOIN url_queue q ON u.url_id = q.url_id
JOIN domains d USING (domain_id)
WHERE u.visited_at_ts IS NULL AND q.url_id IS NULL
GROUP BY d.name
"#
)
.fetch_all(&mut *txn.txn)
.await?;
let txn = store.ro_txn()?; Ok(rows
let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold; .into_iter()
.map(|row| (row.domain_name, row.url_count.unwrap_or(0) as u32))
let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?; .collect::<HashMap<String, u32>>())
})
for row in cur.iter_start::<MdbxString, MdbxBare<OnHoldUrlRecord>>() { })
let (domain_then_url, record) = row?; .await
let mut split = domain_then_url.0.as_ref().split('\n');
if urls {
// Skip one
split.next();
}
let piece = split.next().context("Missing piece")?;
let count = map.entry(piece.to_owned()).or_insert(0);
*count += record.0.refs as u32;
}
Ok(map)
} }

View File

@ -3,8 +3,7 @@ use clap::Parser;
use env_logger::Env; use env_logger::Env;
use adblock::lists::RuleTypes; use adblock::lists::RuleTypes;
use anyhow::{anyhow, bail, ensure, Context}; use anyhow::{ensure, Context};
use chrono::Utc;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use lru::LruCache; use lru::LruCache;
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
@ -14,10 +13,10 @@ use signal_hook::consts::{SIGINT, SIGTERM};
use signal_hook::iterator::Signals; use signal_hook::iterator::Signals;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
use tokio::fs::File; use tokio::fs::File;
use tokio::sync::{mpsc, oneshot, Notify, Semaphore}; use tokio::sync::{mpsc, oneshot, Mutex, Notify, Semaphore};
use tokio::time::MissedTickBehavior; use tokio::time::MissedTickBehavior;
use quickpeep_raker::config; use quickpeep_raker::config;
@ -32,7 +31,6 @@ use quickpeep_structs::rake_entries::{
AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES, AnalysisAntifeatures, SCHEMA_RAKED_ICONS, SCHEMA_RAKED_PAGES, SCHEMA_RAKED_REFERENCES,
SCHEMA_RAKED_REJECTIONS, SCHEMA_RAKED_REJECTIONS,
}; };
use quickpeep_utils::dates::date_to_quickpeep_days;
/// The ordering is slightly important on these: more specific things should come first. /// The ordering is slightly important on these: more specific things should come first.
/// This means they filter out the troublesome elements before the broader filters do. /// This means they filter out the troublesome elements before the broader filters do.
@ -78,13 +76,6 @@ pub async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|| PathBuf::from("quickpeep.ron")); .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
let config = config::RakerConfig::load(&config_path).context("Failed to load config")?; let config = config::RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.raker.workbench_dir.exists() {
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.raker.workbench_dir
);
}
let mut header_map = HeaderMap::new(); let mut header_map = HeaderMap::new();
header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT)); header_map.insert(USER_AGENT, HeaderValue::from_static(RAKER_USER_AGENT));
@ -106,7 +97,7 @@ pub async fn main() -> anyhow::Result<()> {
.redirect(Policy::limited(5)) .redirect(Policy::limited(5))
.build()?; .build()?;
let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?; let store = RakerStore::open(&config.raker.database_uri).await?;
let mut adblock_engines = Vec::new(); let mut adblock_engines = Vec::new();
for (antifeature, name) in &ADBLOCK_FILTER_PATHS { for (antifeature, name) in &ADBLOCK_FILTER_PATHS {
@ -277,12 +268,8 @@ pub async fn main() -> anyhow::Result<()> {
// Reinstate old backoffs and re-rakable URLs // Reinstate old backoffs and re-rakable URLs
store store
.async_rw_txn(|txn| { .rw_txn(move |mut txn| {
let today = date_to_quickpeep_days(&Utc::today())?; Box::pin(async move { txn.reinstate_backoffs(SystemTime::now()).await })
txn.reinstate_backoffs(SystemTime::now())?;
txn.reinstate_rerakables(today)?;
txn.commit()?;
Ok(())
}) })
.await?; .await?;
@ -295,8 +282,9 @@ pub async fn main() -> anyhow::Result<()> {
loop { loop {
tokio::select! { tokio::select! {
_ = interval.tick() => { _ = interval.tick() => {
let txn = store.ro_txn()?; store.ro_txn(move |mut txn| Box::pin(async move {
txn.emit_datastore_metrics()?; txn.emit_datastore_metrics().await
})).await?;
metrics_process_promstyle::emit_now()?; metrics_process_promstyle::emit_now()?;
} }
_ = &mut dsmu_cancel_rx => { _ = &mut dsmu_cancel_rx => {
@ -341,14 +329,19 @@ pub async fn main() -> anyhow::Result<()> {
async fn acquire_active_domain(task_context: &TaskContext) -> anyhow::Result<Option<String>> { async fn acquire_active_domain(task_context: &TaskContext) -> anyhow::Result<Option<String>> {
// Acquire a domain for the task to run against // Acquire a domain for the task to run against
let domain = { let busy_domains = task_context.busy_domains.clone();
let txn = task_context.store.ro_txn()?; let domain = task_context
.store
.ro_txn(move |mut txn| {
Box::pin(async move {
// TODO: don't clone teh Arc here — conv to ref. // TODO: don't clone teh Arc here — conv to ref.
txn.acquire_random_active_domain(task_context.busy_domains.clone())? txn.acquire_random_active_domain(busy_domains).await
}; })
})
.await?;
match domain { match domain {
RandomActiveDomainAcquisition::GotOne { domain, record: _ } => Ok(Some(domain)), RandomActiveDomainAcquisition::GotOne { domain } => Ok(Some(domain)),
RandomActiveDomainAcquisition::AllBusy => Ok(None), RandomActiveDomainAcquisition::AllBusy => Ok(None),
RandomActiveDomainAcquisition::NoneLeft => Ok(None), RandomActiveDomainAcquisition::NoneLeft => Ok(None),
} }
@ -370,7 +363,7 @@ async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> a
if domain_to_process.is_none() && semaphore.available_permits() == max_permits { if domain_to_process.is_none() && semaphore.available_permits() == max_permits {
// There's nothing to do and nothing is being processed. // There's nothing to do and nothing is being processed.
ensure!( ensure!(
task_context.busy_domains.lock().unwrap().is_empty(), task_context.busy_domains.lock().await.is_empty(),
"Shutting down orchestrator but set of busy domains is not empty." "Shutting down orchestrator but set of busy domains is not empty."
); );
} }
@ -378,13 +371,9 @@ async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> a
tokio::select! { tokio::select! {
_ = tokio::time::sleep_until(next_reinstate.into()) => { _ = tokio::time::sleep_until(next_reinstate.into()) => {
// Reinstate backoffs and rerakables // Reinstate backoffs and rerakables
if let Err(err) = task_context.store.async_rw_txn(|txn| { if let Err(err) = task_context.store.rw_txn(move |mut txn| Box::pin(async move {
txn.reinstate_backoffs(SystemTime::now())?; txn.reinstate_backoffs(SystemTime::now()).await
let today = date_to_quickpeep_days(&Utc::today())?; })).await {
txn.reinstate_rerakables(today)?;
txn.commit()?;
Ok(())
}).await {
error!("Error performing periodic reinstatements: {err:?}"); error!("Error performing periodic reinstatements: {err:?}");
} }
@ -402,8 +391,7 @@ async fn orchestrator(task_context: TaskContext, semaphore: Arc<Semaphore>) -> a
} }
ensure!( ensure!(
task_context.busy_domains task_context.busy_domains
.lock() .lock().await
.map_err(|_| anyhow!("busy domains set poisoned"))?
.remove(&domain), .remove(&domain),
"Our domain was not busy after processing!" "Our domain was not busy after processing!"
); );

View File

@ -8,6 +8,8 @@ use anyhow::{bail, Context};
use colour::{dark_green_ln, dark_red_ln, dark_yellow, green, red, yellow_ln}; use colour::{dark_green_ln, dark_red_ln, dark_yellow, green, red, yellow_ln};
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
@ -42,17 +44,11 @@ pub async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|| PathBuf::from("quickpeep.ron")); .unwrap_or_else(|| PathBuf::from("quickpeep.ron"));
let config = RakerConfig::load(&config_path).context("Failed to load config")?; let config = RakerConfig::load(&config_path).context("Failed to load config")?;
if !config.raker.workbench_dir.exists() {
bail!(
"Workbench directory ({:?}) doesn't exist.",
config.raker.workbench_dir
);
}
if !config.seed_dir.exists() { if !config.seed_dir.exists() {
bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir); bail!("Seed directory ({:?}) doesn't exist.", config.seed_dir);
} }
let store = RakerStore::open(&config.raker.workbench_dir.join("raker.mdbx"))?; let store = RakerStore::open(&config.raker.database_uri).await?;
import_seeds(store.clone(), &config).await?; import_seeds(store.clone(), &config).await?;
@ -60,7 +56,9 @@ pub async fn main() -> anyhow::Result<()> {
eprintln!("... re-applying seeds and weeds to on-hold URLs ..."); eprintln!("... re-applying seeds and weeds to on-hold URLs ...");
store store
.async_rw_txn(|txn| maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn)) .rw_txn(move |mut txn| {
Box::pin(async move { maintenance::reapply_seeds_and_weeds_to_on_hold_urls(txn).await })
})
.await?; .await?;
eprintln!("... done!"); eprintln!("... done!");
@ -137,18 +135,23 @@ async fn importer(
) -> anyhow::Result<SeedImportStats> { ) -> anyhow::Result<SeedImportStats> {
let mut buf = Vec::with_capacity(BATCH_SIZE); let mut buf = Vec::with_capacity(BATCH_SIZE);
let mut stats = SeedImportStats::default(); let mut stats = SeedImportStats::default();
let client = Client::new(); let client = Arc::new(Client::new());
while let Some(seed) = recv.recv().await { while let Some(seed) = recv.recv().await {
buf.push(seed); buf.push(seed);
if buf.len() == BATCH_SIZE { if buf.len() == BATCH_SIZE {
import_and_flush_batch_seeds_or_weeds( stats = import_and_flush_batch_seeds_or_weeds(
&store, &mut buf, &mut stats, &client, !are_weeds, &store,
buf,
stats,
client.clone(),
!are_weeds,
) )
.await?; .await?;
buf = Vec::new();
} }
} }
import_and_flush_batch_seeds_or_weeds(&store, &mut buf, &mut stats, &client, !are_weeds) stats = import_and_flush_batch_seeds_or_weeds(&store, buf, stats, client.clone(), !are_weeds)
.await?; .await?;
Ok(stats) Ok(stats)
@ -156,19 +159,21 @@ async fn importer(
async fn import_and_flush_batch_seeds_or_weeds( async fn import_and_flush_batch_seeds_or_weeds(
store: &RakerStore, store: &RakerStore,
buf: &mut Vec<Seed>, mut buf: Vec<Seed>,
stats: &mut SeedImportStats, mut stats: SeedImportStats,
client: &Client, client: Arc<Client>,
is_seed: bool, is_seed: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<SeedImportStats> {
let txn = store.rw_txn()?; store
.rw_txn(move |mut txn| {
Box::pin(async move {
for seed in buf.drain(..) { for seed in buf.drain(..) {
let as_url = Url::parse(seed.url.as_str()) let as_url = Url::parse(seed.url.as_str())
.with_context(|| format!("Failed to parse {:?} as URL", seed.url))?; .with_context(|| format!("Failed to parse {:?} as URL", seed.url))?;
let domain = get_reduced_domain(&as_url) let domain = get_reduced_domain(&as_url)
.with_context(|| format!("No domain in seed URL '{as_url}'!"))?; .with_context(|| format!("No domain in seed URL '{as_url}'!"))?;
let domain_record = txn.get_domain_record(domain.borrow())?; let domain_record = txn.get_domain_record(domain.borrow()).await?;
let is_domain_new = domain_record.is_none(); let is_domain_new = domain_record.is_none();
let mut domain_record = domain_record.unwrap_or_default(); let mut domain_record = domain_record.unwrap_or_default();
if is_domain_new { if is_domain_new {
@ -177,13 +182,17 @@ async fn import_and_flush_batch_seeds_or_weeds(
let mut dirty = is_domain_new; let mut dirty = is_domain_new;
// Register the domain. This is a no-op if it's already active or backing off. // Register the domain. This is a no-op if it's already active or backing off.
txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned())?; txn.insert_active_domain_with_new_raffle_ticket(domain.clone().into_owned())
.await?;
let url_like = match &seed.url { let url_like = match &seed.url {
UrlOrUrlPattern::Url(url_str) => { UrlOrUrlPattern::Url(url_str) => {
let url = Url::parse(url_str.as_str())?; let url = Url::parse(url_str.as_str())?;
if is_seed { if is_seed {
if txn.enqueue_url(url.as_str(), None, RakeIntent::Any)? { if txn
.enqueue_url(url.as_str(), None, RakeIntent::Any, false)
.await?
{
stats.new_urls += 1; stats.new_urls += 1;
} else { } else {
stats.already_present_urls += 1; stats.already_present_urls += 1;
@ -201,7 +210,15 @@ async fn import_and_flush_batch_seeds_or_weeds(
UrlOrUrlPattern::UrlPrefix(prefix) => { UrlOrUrlPattern::UrlPrefix(prefix) => {
let prefix_as_url = Url::parse(prefix.as_str())?; let prefix_as_url = Url::parse(prefix.as_str())?;
if is_seed { if is_seed {
if txn.enqueue_url(prefix_as_url.as_str(), None, RakeIntent::Any)? { if txn
.enqueue_url(
prefix_as_url.as_str(),
None,
RakeIntent::Any,
false,
)
.await?
{
stats.new_urls += 1; stats.new_urls += 1;
} else { } else {
stats.already_present_urls += 1; stats.already_present_urls += 1;
@ -218,7 +235,8 @@ async fn import_and_flush_batch_seeds_or_weeds(
}; };
if dirty { if dirty {
txn.put_domain_record(domain.borrow(), domain_record)?; txn.put_domain_record(domain.borrow(), domain_record)
.await?;
} }
if is_seed { if is_seed {
@ -226,13 +244,21 @@ async fn import_and_flush_batch_seeds_or_weeds(
if let Some(robots_txt) = get_robots_txt_for(&url_like, &client).await? { if let Some(robots_txt) = get_robots_txt_for(&url_like, &client).await? {
for sitemap in robots_txt.sitemaps { for sitemap in robots_txt.sitemaps {
if SUPPORTED_SCHEMES.contains(&sitemap.url.scheme()) { if SUPPORTED_SCHEMES.contains(&sitemap.url.scheme()) {
txn.enqueue_url(sitemap.url.as_str(), None, RakeIntent::SiteMap)?; txn.enqueue_url(
sitemap.url.as_str(),
None,
RakeIntent::SiteMap,
false,
)
.await?;
stats.new_sitemaps += 1; stats.new_sitemaps += 1;
} }
} }
} }
} }
} }
txn.commit()?; Ok(stats)
Ok(()) })
})
.await
} }

View File

@ -19,8 +19,9 @@ pub struct RakerOnlyConfig {
/// Path to data files /// Path to data files
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Path to the raker's workbench (queue etc) /// URI to connect to Postgres.
pub workbench_dir: PathBuf, /// e.g. `postgres://user:password@host:port/dbname`
pub database_uri: String,
/// Directory where new rake packs will be emitted /// Directory where new rake packs will be emitted
pub emit_dir: PathBuf, pub emit_dir: PathBuf,
@ -36,17 +37,19 @@ pub struct RakerOnlyConfig {
pub struct RerakeTimings { pub struct RerakeTimings {
/// How long, in days, between re-rakes of the same page? /// How long, in days, between re-rakes of the same page?
/// Suggested: 300 /// Suggested: 300
pub page: u16, pub page: i32,
/// How long, in days, between re-rakes of feeds? /// How long, in days, between re-rakes of feeds?
/// Suggested: 10 /// Suggested: 10
pub feed: u16, pub feed: i32,
/// How long, in days, between re-rakes of icons? /// How long, in days, between re-rakes of icons?
/// Suggested: 365 /// Suggested: 365
pub icon: u16, pub icon: i32,
} }
pub const DAY_SEC: i32 = 86400;
impl RakerConfig { impl RakerConfig {
/// Loads a config at the specified path. /// Loads a config at the specified path.
/// Will resolve all the paths in the RakerConfig for you. /// Will resolve all the paths in the RakerConfig for you.
@ -57,7 +60,6 @@ impl RakerConfig {
raker_config.raker.data_dir = config_dir.join(raker_config.raker.data_dir); raker_config.raker.data_dir = config_dir.join(raker_config.raker.data_dir);
raker_config.seed_dir = config_dir.join(raker_config.seed_dir); raker_config.seed_dir = config_dir.join(raker_config.seed_dir);
raker_config.raker.workbench_dir = config_dir.join(raker_config.raker.workbench_dir);
raker_config.raker.emit_dir = config_dir.join(raker_config.raker.emit_dir); raker_config.raker.emit_dir = config_dir.join(raker_config.raker.emit_dir);
Ok(raker_config) Ok(raker_config)

View File

@ -126,7 +126,8 @@ impl Display for PermanentFailure {
impl Error for PermanentFailure {} impl Error for PermanentFailure {}
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, sqlx::Type)]
// supposedly we need this, but doesn't seem to work ... #[sqlx(postgres(oid = 16386))]
pub enum RakeIntent { pub enum RakeIntent {
Any, Any,
Page, Page,
@ -140,11 +141,11 @@ impl FromStr for RakeIntent {
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s.to_lowercase().as_ref() { Ok(match s.to_lowercase().as_ref() {
"any" => RakeIntent::Any, "Any" => RakeIntent::Any,
"page" => RakeIntent::Page, "Page" => RakeIntent::Page,
"feed" => RakeIntent::Feed, "Feed" => RakeIntent::Feed,
"sitemap" => RakeIntent::SiteMap, "SiteMap" => RakeIntent::SiteMap,
"icon" => RakeIntent::Icon, "Icon" => RakeIntent::Icon,
other => { other => {
bail!("Unrecognised intent: {:?}", other) bail!("Unrecognised intent: {:?}", other)
} }
@ -152,6 +153,18 @@ impl FromStr for RakeIntent {
} }
} }
impl Into<&'static str> for RakeIntent {
fn into(self) -> &'static str {
match self {
RakeIntent::Any => "Any",
RakeIntent::Page => "Page",
RakeIntent::Feed => "Feed",
RakeIntent::SiteMap => "SiteMap",
RakeIntent::Icon => "Icon",
}
}
}
impl From<ReferenceKind> for RakeIntent { impl From<ReferenceKind> for RakeIntent {
fn from(kind: ReferenceKind) -> Self { fn from(kind: ReferenceKind) -> Self {
match kind { match kind {

View File

@ -1,13 +1,12 @@
use crate::config::RerakeTimings; use crate::config::{RerakeTimings, DAY_SEC};
use crate::raking::references::{clean_url, references_from_urlrakes, SUPPORTED_SCHEMES}; use crate::raking::references::{clean_url, references_from_urlrakes, SUPPORTED_SCHEMES};
use crate::raking::{ use crate::raking::{
get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent, get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeIntent,
RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
}; };
use crate::storage::records::{DomainRecord, UrlVisitedRecord}; use crate::storage::records::DomainRecord;
use crate::storage::RakerStore; use crate::storage::RakerStore;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use chrono::Utc;
use cylon::Cylon; use cylon::Cylon;
use log::{debug, warn}; use log::{debug, warn};
use lru::LruCache; use lru::LruCache;
@ -15,16 +14,16 @@ use metrics::increment_counter;
use quickpeep_structs::rake_entries::{ use quickpeep_structs::rake_entries::{
IconEntry, RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind, IconEntry, RakedPageEntry, RakedReference, RakedReferrerEntry, ReferenceKind,
}; };
use quickpeep_utils::dates::date_to_quickpeep_days; use quickpeep_utils::dates::QUICKPEEP_EPOCH_ST;
use quickpeep_utils::urls::get_reduced_domain; use quickpeep_utils::urls::get_reduced_domain;
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use std::borrow::{Borrow, Cow}; use std::borrow::{Borrow, Cow};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::{Duration, SystemTime};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, Semaphore}; use tokio::sync::{Mutex, Notify, Semaphore};
use tokio::time::Instant; use tokio::time::Instant;
/// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the /// A crawl delay that is greater than 61 seconds will cause the domain to lose its place in the
@ -64,7 +63,7 @@ pub struct TaskContext {
pub raker: Arc<Raker>, pub raker: Arc<Raker>,
/// Busy domains (that are being processed by other tasks) /// Busy domains (that are being processed by other tasks)
pub busy_domains: Arc<StdMutex<HashSet<String>>>, pub busy_domains: Arc<Mutex<HashSet<String>>>,
/// Cache of robots.txt entries for recently-made dormant sites /// Cache of robots.txt entries for recently-made dormant sites
pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>, pub robotstxt_cache: Arc<RwLock<LruCache<String, Option<Cylon>>>>,
@ -94,24 +93,31 @@ impl TaskContext {
let mut current_robot_rules: Option<Cylon> = None; let mut current_robot_rules: Option<Cylon> = None;
let mut wait_until: Option<Instant> = None; let mut wait_until: Option<Instant> = None;
let domain_record = { let domain2 = domain.clone();
let txn = self.store.ro_txn()?; let dr = self
let dr = txn.get_domain_record(&domain)?; .store
match dr { .ro_txn(move |mut txn| Box::pin(async move { txn.get_domain_record(&domain2).await }))
.await?;
let domain_record = match dr {
None => { None => {
return Ok(()); return Ok(());
} }
Some(dr) => dr, Some(dr) => dr,
}
}; };
while !self.graceful_stop.load(Ordering::Relaxed) { while !self.graceful_stop.load(Ordering::Relaxed) {
// Get a URL to process // Get a URL to process
let url = { let domain2 = domain.clone();
let txn = self.store.ro_txn()?; let url = self
txn.choose_url_for_domain(&domain) .store
.context("failed to choose URL for domain")? .ro_txn(move |mut txn| {
}; Box::pin(async move {
txn.choose_url_for_domain(&domain2)
.await
.context("failed to choose URL for domain")
})
})
.await?;
let (url_str, url_record) = if let Some(url) = url { let (url_str, url_record) = if let Some(url) = url {
url url
@ -121,10 +127,11 @@ impl TaskContext {
let domain = domain.to_owned(); let domain = domain.to_owned();
let out_of_urls = self let out_of_urls = self
.store .store
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
// Double-check we're still out of URLs (another could have been added since // Double-check we're still out of URLs (another could have been added since
// we last checked!) // we last checked!)
let out_of_urls = txn.choose_url_for_domain(&domain)?.is_none(); let out_of_urls = txn.choose_url_for_domain(&domain).await?.is_none();
if !out_of_urls { if !out_of_urls {
return Ok(false); return Ok(false);
@ -132,11 +139,11 @@ impl TaskContext {
// Delete the active domain from the store // Delete the active domain from the store
txn.remove_active_domain(&domain) txn.remove_active_domain(&domain)
.await
.context("failed to remove active domain")?; .context("failed to remove active domain")?;
txn.commit()?;
Ok(true) Ok(true)
}) })
})
.await .await
.context("failed to check if we're out of URLs")?; .context("failed to check if we're out of URLs")?;
if out_of_urls { if out_of_urls {
@ -154,10 +161,8 @@ impl TaskContext {
let domain = domain.clone(); let domain = domain.clone();
let url = url.clone(); let url = url.clone();
self.store self.store
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
txn.dequeue_url(&domain, url.as_str())?; Box::pin(async move { txn.dequeue_url(&domain, url.as_str()).await })
txn.commit()?;
Ok(())
}) })
.await?; .await?;
continue; continue;
@ -290,10 +295,11 @@ impl TaskContext {
let domain = domain.clone(); let domain = domain.clone();
let url = url.clone(); let url = url.clone();
let backoff = delay.as_secs().try_into().unwrap_or(u32::MAX); let backoff = delay.as_secs().try_into().unwrap_or(i32::MAX);
self.store self.store
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
txn.start_backing_off( txn.start_backing_off(
&domain, &domain,
backoff, backoff,
@ -305,10 +311,11 @@ impl TaskContext {
// Don't stack this up with a backoff; it's not an actual failure! // Don't stack this up with a backoff; it's not an actual failure!
backoff_sec: 0, backoff_sec: 0,
}, },
)?; )
txn.commit()?; .await?;
Ok(()) Ok(())
}) })
})
.await .await
.context("failure whilst turning long crawl delay into backoff")?; .context("failure whilst turning long crawl delay into backoff")?;
} }
@ -319,7 +326,10 @@ impl TaskContext {
/// Processes the outcome of /// Processes the outcome of
async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> { async fn process_outcome(&self, url: &Url, outcome: RakeOutcome) -> anyhow::Result<NextAction> {
let today = date_to_quickpeep_days(&Utc::today())?; let visited_on_ts = SystemTime::now()
.duration_since(*QUICKPEEP_EPOCH_ST)
.unwrap()
.as_secs() as i32;
match outcome { match outcome {
RakeOutcome::RakedPage(page) => { RakeOutcome::RakedPage(page) => {
self.submission self.submission
@ -335,11 +345,11 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?; .context("Reference processor shut down; can't stream references!")?;
self.as_event_processor() self.as_event_processor()
.process_page(url.clone(), page.page_entry, today) .process_page(url.clone(), page.page_entry, visited_on_ts)
.await .await
.context("failure processing page for RakedPage")?; .context("failure processing page for RakedPage")?;
self.as_event_processor() self.as_event_processor()
.process_refs(url.clone(), page.referrer_entry, today, false) .process_refs(url.clone(), page.referrer_entry, visited_on_ts, false)
.await .await
.context("failure processing refs for RakedPage")?; .context("failure processing refs for RakedPage")?;
@ -357,7 +367,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?; .context("Reference processor shut down; can't stream references!")?;
self.as_event_processor() self.as_event_processor()
.process_refs(url.clone(), refs, today, true) .process_refs(url.clone(), refs, visited_on_ts, true)
.await .await
.context("failure processing refs for RakedFeed")?; .context("failure processing refs for RakedFeed")?;
@ -375,7 +385,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?; .context("Reference processor shut down; can't stream references!")?;
self.as_event_processor() self.as_event_processor()
.process_refs(url.clone(), refs, today, true) .process_refs(url.clone(), refs, visited_on_ts, true)
.await .await
.context("failure processing refs for RakedSitemap")?; .context("failure processing refs for RakedSitemap")?;
@ -395,7 +405,7 @@ impl TaskContext {
.await?; .await?;
self.as_event_processor() self.as_event_processor()
.process_icon(url.clone(), today) .process_icon(url.clone(), visited_on_ts)
.await .await
.context("failure processing icon for RakedIcon")?; .context("failure processing icon for RakedIcon")?;
@ -423,7 +433,7 @@ impl TaskContext {
.context("Reference processor shut down; can't stream references!")?; .context("Reference processor shut down; can't stream references!")?;
self.as_event_processor() self.as_event_processor()
.process_refs(url.clone(), refs, today, false) .process_refs(url.clone(), refs, visited_on_ts, false)
.await .await
.context("Failure processing refs for Redirect")?; .context("Failure processing refs for Redirect")?;
@ -439,14 +449,15 @@ impl TaskContext {
let url = url.clone(); let url = url.clone();
// TODO(feature) add 1.1× the previous backoff, if there was one. // TODO(feature) add 1.1× the previous backoff, if there was one.
let new_backoff = failure.backoff_sec; let new_backoff = failure.backoff_sec as i32;
let domain = domain.into_owned(); let domain = domain.into_owned();
self.store self.store
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)?; Box::pin(async move {
txn.commit()?; txn.start_backing_off(&domain, new_backoff, url.to_string(), failure)
Ok(()) .await
})
}) })
.await .await
.context("failed to store backoff")?; .context("failed to store backoff")?;
@ -461,7 +472,7 @@ impl TaskContext {
.await .await
.context("Rejection processor shut down; can't stream rejection!!")?; .context("Rejection processor shut down; can't stream rejection!!")?;
self.as_event_processor() self.as_event_processor()
.process_rejection(url.clone(), today) .process_rejection(url.clone(), visited_on_ts)
.await .await
.context("failed to process rejection for PermanentFailure")?; .context("failed to process rejection for PermanentFailure")?;
@ -493,59 +504,59 @@ impl EventProcessor<'_> {
&self, &self,
url: Url, url: Url,
page: RakedPageEntry, page: RakedPageEntry,
datestamp: u16, visited_on_ts: i32,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let rerake_on = Some(datestamp + self.rerake_timings.page); let rerake_on = Some(visited_on_ts + self.rerake_timings.page * DAY_SEC);
self.store self.store
.as_ref() .as_ref()
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
let domain = get_reduced_domain(&url).with_context(|| { let domain = get_reduced_domain(&url).with_context(|| {
format!("No domain for URL '{url}' for which we are processing the page!") format!("No domain for URL '{url}' for which we are processing the page!")
})?; })?;
txn.mark_url_as_visited( txn.mark_url_as_visited(
domain.as_ref(), domain.as_ref(),
url.as_ref(), url.as_ref(),
UrlVisitedRecord { visited_on_ts,
last_visited_days: datestamp,
},
rerake_on, rerake_on,
)?; )
.await?;
// If there's a favicon to be tried, add it to the list... // If there's a favicon to be tried, add it to the list...
let favicon_url_rel = page.document.head.effective_favicon_url(); let favicon_url_rel = page.document.head.effective_favicon_url();
if let Ok(favicon_url) = url.join(favicon_url_rel) { if let Ok(favicon_url) = url.join(favicon_url_rel) {
if SUPPORTED_SCHEMES.contains(&favicon_url.scheme()) { if SUPPORTED_SCHEMES.contains(&favicon_url.scheme()) {
txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon)?; txn.enqueue_url(favicon_url.as_str(), None, RakeIntent::Icon, false)
.await?;
} }
} }
txn.commit()?;
Ok(()) Ok(())
}) })
})
.await .await
} }
pub async fn process_icon(&self, url: Url, datestamp: u16) -> anyhow::Result<()> { pub async fn process_icon(&self, url: Url, visited_on_ts: i32) -> anyhow::Result<()> {
let rerake_on = Some(datestamp + self.rerake_timings.icon); let rerake_on = Some(visited_on_ts + self.rerake_timings.icon * DAY_SEC);
self.store self.store
.as_ref() .as_ref()
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
let domain = get_reduced_domain(&url).with_context(|| { let domain = get_reduced_domain(&url).with_context(|| {
format!("No domain for URL '{url}' for which we are processing an icon!") format!("No domain for URL '{url}' for which we are processing an icon!")
})?; })?;
txn.mark_url_as_visited( txn.mark_url_as_visited(
domain.as_ref(), domain.as_ref(),
url.as_ref(), url.as_ref(),
UrlVisitedRecord { visited_on_ts,
last_visited_days: datestamp,
},
rerake_on, rerake_on,
)?; )
.await?;
txn.commit()?;
Ok(()) Ok(())
}) })
})
.await .await
} }
@ -553,28 +564,28 @@ impl EventProcessor<'_> {
&self, &self,
url: Url, url: Url,
refs: RakedReferrerEntry, refs: RakedReferrerEntry,
datestamp: u16, visited_on_ts: i32,
rerakeable_feed: bool, rerakeable_feed: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let rerake_on = if rerakeable_feed { let rerake_on = if rerakeable_feed {
Some(self.rerake_timings.feed) Some(visited_on_ts + self.rerake_timings.feed * DAY_SEC)
} else { } else {
None None
}; };
self.store self.store
.as_ref() .as_ref()
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
let domain = get_reduced_domain(&url).with_context(|| { let domain = get_reduced_domain(&url).with_context(|| {
format!("No domain for URL '{url}' for which we are processing refs!") format!("No domain for URL '{url}' for which we are processing refs!")
})?; })?;
txn.mark_url_as_visited( txn.mark_url_as_visited(
domain.as_ref(), domain.as_ref(),
url.as_ref(), url.as_ref(),
UrlVisitedRecord { visited_on_ts,
last_visited_days: datestamp,
},
rerake_on, rerake_on,
) )
.await
.context("failed to mark URL as visited")?; .context("failed to mark URL as visited")?;
// track all the referred-to URLs! // track all the referred-to URLs!
@ -591,17 +602,24 @@ impl EventProcessor<'_> {
// Check if this URL is an allowed URL (hence should be enqueued) // Check if this URL is an allowed URL (hence should be enqueued)
let allowed = txn let allowed = txn
.get_domain_record(domain.borrow())? .get_domain_record(domain.borrow())
.await?
.map(|record: DomainRecord| record.is_url_rakeable(&ref_url)) .map(|record: DomainRecord| record.is_url_rakeable(&ref_url))
.flatten(); .flatten();
match allowed { match allowed {
Some(true) => { Some(true) => {
let is_fresh = txn.enqueue_url( let is_fresh = txn
.enqueue_url(
&reference.target, &reference.target,
reference.last_mod, reference.last_mod.map(|days| {
*QUICKPEEP_EPOCH_ST
+ Duration::from_secs(days as u64 * DAY_SEC as u64)
}),
reference.kind.into(), reference.kind.into(),
)?; false,
)
.await?;
if is_fresh { if is_fresh {
increment_counter!("qprake_queue_new_url"); increment_counter!("qprake_queue_new_url");
} }
@ -612,35 +630,40 @@ impl EventProcessor<'_> {
} }
None => { None => {
// It's neither allowed nor weeded, so put it on hold for later inspection // It's neither allowed nor weeded, so put it on hold for later inspection
txn.put_url_on_hold(&reference.target, reference.kind.into())?; txn.enqueue_url(
&reference.target,
reference.last_mod.map(|days| {
*QUICKPEEP_EPOCH_ST
+ Duration::from_secs(days as u64 * DAY_SEC as u64)
}),
reference.kind.into(),
true,
)
.await?;
} }
} }
} }
txn.commit()?;
Ok(()) Ok(())
}) })
})
.await .await
} }
pub async fn process_rejection(&self, url: Url, datestamp: u16) -> anyhow::Result<()> { pub async fn process_rejection(&self, url: Url, visited_on_ts: i32) -> anyhow::Result<()> {
self.store self.store
.as_ref() .as_ref()
.async_rw_txn(move |txn| { .rw_txn(move |mut txn| {
Box::pin(async move {
let domain = get_reduced_domain(&url).with_context(|| { let domain = get_reduced_domain(&url).with_context(|| {
format!("No domain for URL '{url}' for which we are processing a rejection!") format!(
"No domain for URL '{url}' for which we are processing a rejection!"
)
})?; })?;
txn.mark_url_as_visited( txn.mark_url_as_visited(domain.as_ref(), url.as_ref(), visited_on_ts, None)
domain.as_ref(), .await?;
url.as_ref(),
UrlVisitedRecord {
last_visited_days: datestamp,
},
None,
)?;
txn.commit()?;
Ok(()) Ok(())
}) })
})
.await .await
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,80 +1,41 @@
use crate::storage::mdbx_helper_types::{MdbxBare, MdbxString}; use crate::storage::records::DomainRecord;
use crate::storage::records::{DomainRecord, OnHoldUrlRecord};
use crate::storage::RakerTxn; use crate::storage::RakerTxn;
use anyhow::Context; use itertools::Itertools;
use libmdbx::{Database, WriteFlags, RW}; use sqlx::query_as;
use reqwest::Url; use sqlx::types::Json;
/// Runs one big transaction that: /// Re-enqueues domains
/// - scans on-hold URLs
/// - moves 'allowed' ones to the queue
/// - deletes 'weeds'
/// - leaves unknown ones alone
///
/// Ideally should be applied after importing seeds and weeds on an existing database. /// Ideally should be applied after importing seeds and weeds on an existing database.
pub fn reapply_seeds_and_weeds_to_on_hold_urls(txn: RakerTxn<RW>) -> anyhow::Result<()> { pub async fn reapply_seeds_and_weeds_to_on_hold_urls(
mut txn: RakerTxn<'_, '_, true>,
) -> anyhow::Result<()> {
struct DomainState { struct DomainState {
pub domain: String, pub domain_id: i32,
pub domain_record: Option<DomainRecord>, pub domain_record: Json<DomainRecord>,
} }
let urls_on_hold: &Database = &txn.mdbx.borrow_dbs().urls_on_hold; let reinstatable_domains: Vec<DomainState> = query_as!(DomainState, r#"
SELECT DISTINCT u.domain_id, domain_record AS "domain_record: Json<DomainRecord>" FROM url_queue
JOIN urls u USING (url_id)
JOIN domains USING (domain_id)
LEFT JOIN domain_backoffs db ON u.domain_id = db.domain_id
WHERE db.domain_id IS NULL
"#).fetch_all(&mut *txn.txn)
.await?;
let mut domain_state = None; for domain_state in reinstatable_domains {
let any_seeds = domain_state
// Scan through the on-hold URLs
let mut cur = txn.mdbx_txn.cursor(urls_on_hold)?;
let mut first_iteration = true;
while let Some((MdbxString(domain_then_url), MdbxBare(record))) = if first_iteration {
first_iteration = false;
cur.first::<MdbxString, MdbxBare<OnHoldUrlRecord>>()
} else {
cur.next::<MdbxString, MdbxBare<OnHoldUrlRecord>>()
}? {
let mut split = domain_then_url.as_ref().split("\n");
let domain = split.next().context("No first split..?")?;
let url_str = split.next().context("No URL")?;
// Is the domain new?
if domain_state
.as_ref()
.map(|ds: &DomainState| &ds.domain != domain)
.unwrap_or(true)
{
// Then load the relevant records for it.
domain_state = Some(DomainState {
domain: domain.to_owned(),
domain_record: txn.get_domain_record(domain)?,
});
}
let url = Url::parse(url_str)?;
let domain_state = domain_state.as_ref().unwrap();
let is_rakeable = domain_state
.domain_record .domain_record
.as_ref() .rakeable_path_prefixes
.map(|dr: &DomainRecord| dr.is_url_rakeable(&url)) .values()
.flatten(); .contains(&true);
if !any_seeds {
match is_rakeable { continue;
Some(true) => {
// ALLOWED
// Make it a queued URL
txn.enqueue_url(url_str, None, record.queue_record.intent)?;
cur.del(WriteFlags::empty())?;
}
Some(false) => {
// WEED
// Just delete
cur.del(WriteFlags::empty())?;
}
None => { /* nop: neither allowed nor a weed. Keep on hold. */ }
}
} }
txn.commit()?; // This domain has *some* seeds, let's just reinstate it.
txn.insert_active_domain_id_with_new_raffle_ticket(domain_state.domain_id)
.await?;
}
Ok(()) Ok(())
} }

View File

@ -1,190 +0,0 @@
use anyhow::{anyhow, Context};
use libmdbx::{Error, TableObject, TransactionKind};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
/// A `u16` stored as two bytes in big-endian order
/// (`u16` is not supported by INTEGERKEY mode, hence the explicit encoding).
#[derive(Copy, Clone, Debug)]
pub struct MdbxU16BE(pub u16);

impl MdbxU16BE {
    /// Returns the wrapped value as an owned, two-byte, big-endian buffer.
    pub fn as_bytes(&self) -> Cow<'_, [u8]> {
        let encoded: [u8; 2] = self.0.to_be_bytes();
        Cow::from(encoded.to_vec())
    }
}
impl TableObject<'_> for MdbxU16BE {
fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
if data_val.len() != 2 {
return Err(libmdbx::Error::DecodeError(
anyhow!("MDBX Key not 2 bytes; can't be decoded as u16").into(),
));
}
let mut buf = [0u8; 2];
buf.copy_from_slice(&data_val);
Ok(MdbxU16BE(u16::from_be_bytes(buf)))
}
}
/// A `u32` stored in the platform's native byte order (as required by INTEGERKEY mode).
#[derive(Copy, Clone, Debug)]
pub struct MdbxU32(pub u32);

impl MdbxU32 {
    /// Returns the wrapped value as an owned, four-byte, native-endian buffer.
    pub fn as_bytes(&self) -> Cow<'_, [u8]> {
        let encoded: [u8; 4] = self.0.to_ne_bytes();
        Cow::from(encoded.to_vec())
    }
}
impl TableObject<'_> for MdbxU32 {
fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
if data_val.len() != 4 {
return Err(libmdbx::Error::DecodeError(
anyhow!("MDBX Key not 4 bytes; can't be decoded as u32").into(),
));
}
let mut buf = [0u8; 4];
buf.copy_from_slice(&data_val);
Ok(MdbxU32(u32::from_ne_bytes(buf)))
}
}
/// A `u64` stored in the platform's native byte order (as required by INTEGERKEY mode).
#[derive(Copy, Clone, Debug)]
pub struct MdbxU64(pub u64);

impl MdbxU64 {
    /// Returns the wrapped value as an owned, eight-byte, native-endian buffer.
    pub fn as_bytes(&self) -> Cow<'_, [u8]> {
        let encoded: [u8; 8] = self.0.to_ne_bytes();
        Cow::from(encoded.to_vec())
    }
}
impl TableObject<'_> for MdbxU64 {
fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
if data_val.len() != 8 {
return Err(libmdbx::Error::DecodeError(
anyhow!("MDBX Key not 8 bytes; can't be decoded as u64").into(),
));
}
let mut buf = [0u8; 8];
buf.copy_from_slice(&data_val);
Ok(MdbxU64(u64::from_ne_bytes(buf)))
}
}
/// UTF-8 string, held either borrowed or owned through a `Cow`.
#[derive(Clone, Debug)]
pub struct MdbxString<'txn>(pub Cow<'txn, str>);

impl MdbxString<'_> {
    /// Borrows the UTF-8 bytes of the wrapped string.
    pub fn as_bytes(&self) -> &[u8] {
        let text: &str = &self.0;
        text.as_bytes()
    }

    /// Consumes the wrapper and yields an owned `String`; only borrowed data is copied.
    pub fn into_string(self) -> String {
        match self.0 {
            Cow::Borrowed(text) => text.to_owned(),
            Cow::Owned(text) => text,
        }
    }
}
// Lets `MdbxString` be decoded directly from an MDBX value.
impl<'a> TableObject<'_> for MdbxString<'a> {
/// Not implemented: the body is `unreachable!()`, so calling this panics.
/// NOTE(review): presumably libmdbx only reaches this type through `decode_val` — confirm.
fn decode(_data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
unreachable!()
}
/// Decodes the raw MDBX value as bytes via `MdbxBytes::decode_val`, then validates them as
/// UTF-8. Borrowed bytes stay borrowed and owned bytes stay owned.
///
/// # Errors
/// Propagates any error from `MdbxBytes::decode_val`; invalid UTF-8 is reported as
/// `libmdbx::Error::DecodeError`.
///
/// # Safety
/// NOTE(review): the caller's obligations are presumably those of `MdbxBytes::decode_val`
/// for `txnptr` and `data_val` — confirm against the libmdbx documentation.
/// NOTE(review): `'a` is not tied to any lifetime visible in this signature — verify that a
/// borrowed result cannot outlive the transaction it points into.
unsafe fn decode_val<K: TransactionKind>(
txnptr: *const mdbx_sys::MDBX_txn,
data_val: &mdbx_sys::MDBX_val,
) -> Result<Self, Error>
where
Self: Sized,
{
let bytes = MdbxBytes::decode_val::<K>(txnptr, data_val)?;
// Validate UTF-8 without changing whether the data is borrowed or owned.
let string_cow = match bytes {
Cow::Borrowed(data) => {
let string = std::str::from_utf8(data)
.context("Failed to decode MDBX key as string")
.map_err(|e| libmdbx::Error::DecodeError(e.into()))?;
Cow::Borrowed(string)
}
Cow::Owned(data) => {
// `String::from_utf8` takes over the existing buffer rather than copying it.
let string = String::from_utf8(data)
.context("Failed to decode MDBX key as string")
.map_err(|e| libmdbx::Error::DecodeError(e.into()))?;
Cow::Owned(string)
}
};
Ok(MdbxString(string_cow))
}
}
// /// UTF-8 String
// /// Using Cow<'txn, str> would've needed some unsafe code (see `Cow<'txn, [u8]>` for inspiration),
// /// so I didn't bother for now.
// #[derive(Clone, Debug)]
// pub struct MdbxString(pub String);
//
// impl MdbxString {
// pub fn as_bytes(&self) -> &[u8] {
// self.0.as_bytes()
// }
// }
//
// impl TableObject<'_> for MdbxString {
// fn decode(data_val: &[u8]) -> Result<Self, libmdbx::Error> where Self: Sized {
// let string = String::from_utf8(data_val.to_vec())
// .context("Failed to decode MDBX key as string")
// .map_err(|e| libmdbx::Error::DecodeError(e.into()))?;
// Ok(MdbxString(string))
// }
// }
/// Wrapper around any value that is stored as a BARE-encoded payload.
#[derive(Clone, Debug)]
pub struct MdbxBare<T>(pub T);

impl<T: Serialize> MdbxBare<T> {
    /// Serialises the wrapped value with `serde_bare`.
    ///
    /// # Panics
    /// Panics if `serde_bare::to_vec` returns an error.
    pub fn as_bytes(&self) -> Vec<u8> {
        let payload = &self.0;
        let encoded = serde_bare::to_vec(payload);
        encoded.expect("It's unreasonable to expect serialisation will fail")
    }
}
impl<T: DeserializeOwned> TableObject<'_> for MdbxBare<T> {
fn decode(_data_val: &[u8]) -> Result<Self, libmdbx::Error>
where
Self: Sized,
{
unreachable!()
}
unsafe fn decode_val<K: TransactionKind>(
txnptr: *const mdbx_sys::MDBX_txn,
data_val: &mdbx_sys::MDBX_val,
) -> Result<Self, Error>
where
Self: Sized,
{
let bytes = MdbxBytes::decode_val::<K>(txnptr, data_val)?;
let record = serde_bare::from_slice(bytes.as_ref())
.context("Failed to decode MDBX key as BARE object")
.map_err(|e| libmdbx::Error::DecodeError(e.into()))?;
Ok(MdbxBare(record))
}
}
/// Raw byte payload, borrowed or owned through a `Cow`.
/// Supported natively by libmdbx.
pub type MdbxBytes<'txn> = Cow<'txn, [u8]>;

View File

@ -1,2 +0,0 @@
/// NOTE(review): presumably the key under which the store's migration version is recorded —
/// confirm against the storage layer.
pub const MIGRATION_KEY: &[u8] = b"MIGRATION_VERSION";
/// NOTE(review): presumably the version tag expected under `MIGRATION_KEY` — confirm.
pub const MIGRATION_VERSION: &str = "quickpeep_raker:0.1.0";

View File

@ -1,50 +1,13 @@
use crate::raking::{RakeIntent, TemporaryFailure}; use crate::raking::RakeIntent;
use reqwest::Url; use reqwest::Url;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct ActiveDomainRecord {
/// The raffle ticket number owned by this domain.
pub raffle_ticket: u32,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct UrlVisitedRecord {
/// Number of days since the QuickPeep Epoch that this page was last raked at.
/// A u16 is fine here, giving 179 years worth of values. This allows compact encoding.
/// We don't really care about a more granular timestamp: sitemaps and feeds usually only
/// give the date of last update anyway.
pub last_visited_days: u16,
}
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct QueueUrlRecord { pub struct QueueUrlRecord {
pub intent: RakeIntent, // TODO CONSIDER pub intent: RakeIntent, // TODO CONSIDER
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct OnHoldUrlRecord {
/// Record that should be emitted once this is released.
pub queue_record: QueueUrlRecord,
/// Number of times this URL has been 'enqueued'; capped at 255.
pub refs: u8,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BackingOffDomainRecord {
/// The URL that caused the backoff.
pub failed_url: String,
/// The reason that this backoff is in place
pub failure: TemporaryFailure,
/// Duration of the backoff. Used to provide increasing backoffs if the failures persist.
pub backoff: u32,
/// When the domain should be reinstated
/// MUST match the timestamp present in the reinstatements table.
pub reinstate_at: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct DomainRecord { pub struct DomainRecord {
pub rakeable_path_prefixes: BTreeMap<String, bool>, pub rakeable_path_prefixes: BTreeMap<String, bool>,

View File

@ -1,11 +1,15 @@
use anyhow::Context; use anyhow::Context;
use chrono::{Date, Duration, TimeZone, Utc}; use chrono::{Date, Duration, TimeZone, Utc};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::time::SystemTime;
lazy_static! { lazy_static! {
/// The QuickPeep Epoch is 2022-01-01, as this gives us 52 years of extra headroom compared to the /// The QuickPeep Epoch is 2023-01-01, as this gives us 53 years of extra headroom compared to the
/// Unix one. QuickPeep didn't exist before 2022 so we needn't worry about negative dates! /// Unix one. This QuickPeep database format didn't exist before 2023 so we needn't worry about negative dates!
pub static ref QUICKPEEP_EPOCH: Date<Utc> = Utc.ymd(2022, 1, 1); pub static ref QUICKPEEP_EPOCH: Date<Utc> = Utc.ymd(2023, 1, 1);
/// 2023-01-01
pub static ref QUICKPEEP_EPOCH_ST: SystemTime = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1672531200);
} }
pub fn date_from_quickpeep_days(days: u16) -> Date<Utc> { pub fn date_from_quickpeep_days(days: u16) -> Date<Utc> {

View File

@ -17,6 +17,8 @@ pkgs.mkShell {
pkgs.pkg-config pkgs.pkg-config
pkgs.sqlx-cli
#pkgs.libclang # ?? #pkgs.libclang # ??
]; ];