From 4f85aebd380def6db167108e883ae117031a1cf2 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sun, 20 Mar 2022 06:33:39 +0000 Subject: [PATCH] Theoretically allow graceful stop --- quickpeep_raker/src/bin/qp-raker.rs | 3 +++ quickpeep_raker/src/raking/task.rs | 11 +++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/quickpeep_raker/src/bin/qp-raker.rs b/quickpeep_raker/src/bin/qp-raker.rs index 1c0f51e..5eddb0c 100644 --- a/quickpeep_raker/src/bin/qp-raker.rs +++ b/quickpeep_raker/src/bin/qp-raker.rs @@ -9,6 +9,7 @@ use lru::LruCache; use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; use reqwest::redirect::Policy; use std::path::PathBuf; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use tokio::fs::File; @@ -119,6 +120,7 @@ pub async fn main() -> anyhow::Result<()> { rejections: rejections_tx, }; + let graceful_stop = Arc::new(AtomicBool::new(false)); let task_context = TaskContext { store: store.clone(), client: Default::default(), @@ -127,6 +129,7 @@ pub async fn main() -> anyhow::Result<()> { robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))), semaphore, submission, + graceful_stop, }; let mut tasks = Vec::with_capacity(num_tasks as usize); diff --git a/quickpeep_raker/src/raking/task.rs b/quickpeep_raker/src/raking/task.rs index 0d9e837..1ee256e 100644 --- a/quickpeep_raker/src/raking/task.rs +++ b/quickpeep_raker/src/raking/task.rs @@ -4,7 +4,7 @@ use crate::raking::{ get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, }; -use crate::storage::records::{ActiveDomainRecord, UrlVisitedRecord}; +use crate::storage::records::UrlVisitedRecord; use crate::storage::RakerStore; use anyhow::{anyhow, Context}; use chrono::Utc; @@ -18,6 +18,7 @@ use quickpeep_utils::dates::date_to_quickpeep_days; use reqwest::{Client, Url}; use std::borrow::Cow; use std::collections::HashSet; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::time::Duration; use tokio::sync::mpsc::Sender; @@ -66,19 +67,21 @@ pub struct TaskContext { pub semaphore: Arc, pub submission: TaskResultSubmission, + + pub graceful_stop: Arc, } impl TaskContext { pub async fn run(mut self) -> anyhow::Result<()> { // Get a domain to process - loop { + while !self.graceful_stop.load(Ordering::Relaxed) { let domain = { let txn = self.store.ro_txn()?; txn.choose_random_active_domain()? }; match domain { - Some((domain, active_record)) => { + Some((domain, _active_record)) => { let is_ours = { let mut busy_domains = self .busy_domains @@ -116,7 +119,7 @@ impl TaskContext { let mut current_robot_rules: Option = None; let mut wait_until: Option = None; - loop { + while !self.graceful_stop.load(Ordering::Relaxed) { // Get a URL to process let url = { let txn = self.store.ro_txn()?;