Some bugfixes that get the raker mostly going
This commit is contained in:
parent
51d5b9208b
commit
e0fb714f7a
|
@ -13,3 +13,4 @@ qp_web.ron
|
||||||
dist
|
dist
|
||||||
book
|
book
|
||||||
workbench
|
workbench
|
||||||
|
rakepacks
|
|
@ -6,3 +6,5 @@ emit_dir = "./rakepacks"
|
||||||
[metrics]
|
[metrics]
|
||||||
prometheus = "127.0.0.1:9774"
|
prometheus = "127.0.0.1:9774"
|
||||||
# bare_metrics = true
|
# bare_metrics = true
|
||||||
|
|
||||||
|
[pack_emitter]
|
||||||
|
|
|
@ -4,7 +4,7 @@ use env_logger::Env;
|
||||||
|
|
||||||
use adblock::lists::RuleTypes;
|
use adblock::lists::RuleTypes;
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use log::{error, warn};
|
use log::{debug, error, warn};
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
|
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
|
||||||
|
@ -56,7 +56,11 @@ pub struct Opts {
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() -> anyhow::Result<()> {
|
pub async fn main() -> anyhow::Result<()> {
|
||||||
env_logger::Builder::from_env(Env::default().default_filter_or("info,quickpeep=debug")).init();
|
env_logger::Builder::from_env(
|
||||||
|
Env::default().default_filter_or("info,quickpeep_raker=debug,qp_raker=debug"),
|
||||||
|
)
|
||||||
|
.init();
|
||||||
|
debug!("Starting up...");
|
||||||
|
|
||||||
let opts: Opts = Opts::parse();
|
let opts: Opts = Opts::parse();
|
||||||
|
|
||||||
|
@ -233,6 +237,12 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
for task in tasks {
|
for task in tasks {
|
||||||
task.await?;
|
task.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for task in emitters {
|
||||||
|
let result = task.join().expect("Can't join thread");
|
||||||
|
result?;
|
||||||
|
}
|
||||||
|
|
||||||
let _ = dsmu_cancel_tx.send(());
|
let _ = dsmu_cancel_tx.send(());
|
||||||
datastore_metrics_updater.await??;
|
datastore_metrics_updater.await??;
|
||||||
|
|
||||||
|
|
|
@ -231,20 +231,8 @@ impl TaskContext {
|
||||||
|
|
||||||
match self.process_outcome(&url, rake_outcome).await? {
|
match self.process_outcome(&url, rake_outcome).await? {
|
||||||
NextAction::Continue => {
|
NextAction::Continue => {
|
||||||
// Take that URL off the queue
|
// The URL has already been taken off the queue.
|
||||||
let now = Utc::today();
|
// We just need to continue!
|
||||||
let record = UrlVisitedRecord {
|
|
||||||
last_visited_days: date_to_quickpeep_days(&now)?,
|
|
||||||
};
|
|
||||||
|
|
||||||
let domain = domain.clone();
|
|
||||||
self.store
|
|
||||||
.async_rw_txn(move |txn| {
|
|
||||||
txn.mark_url_as_visited(&domain, &url_str, record)?;
|
|
||||||
txn.commit()?;
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
NextAction::ChangeDomain => {
|
NextAction::ChangeDomain => {
|
||||||
let mut cache = self
|
let mut cache = self
|
||||||
|
|
|
@ -324,10 +324,13 @@ impl<'a> RakerTxn<'a, RW> {
|
||||||
|
|
||||||
let queue_key = format!("{}\n{}", domain, url_str);
|
let queue_key = format!("{}\n{}", domain, url_str);
|
||||||
|
|
||||||
ensure!(
|
// We legitimately want this to NOP when already dequeued; so don't ensure the opposite.
|
||||||
self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?,
|
// ensure!(
|
||||||
"No queued URL to delete"
|
// self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?,
|
||||||
);
|
// "No queued URL to delete ({})", queue_key
|
||||||
|
// );
|
||||||
|
|
||||||
|
self.mdbx_txn.del(&queue_urls, queue_key.as_bytes(), None)?;
|
||||||
|
|
||||||
self.mdbx_txn.put(
|
self.mdbx_txn.put(
|
||||||
visited_urls,
|
visited_urls,
|
||||||
|
|
Loading…
Reference in New Issue