Theoretically allow graceful stop

This commit is contained in:
Olivier 'reivilibre' 2022-03-20 06:33:39 +00:00
parent 085020b80d
commit 4f85aebd38
2 changed files with 10 additions and 4 deletions

View File

@ -9,6 +9,7 @@ use lru::LruCache;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::redirect::Policy; use reqwest::redirect::Policy;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
use tokio::fs::File; use tokio::fs::File;
@ -119,6 +120,7 @@ pub async fn main() -> anyhow::Result<()> {
rejections: rejections_tx, rejections: rejections_tx,
}; };
let graceful_stop = Arc::new(AtomicBool::new(false));
let task_context = TaskContext { let task_context = TaskContext {
store: store.clone(), store: store.clone(),
client: Default::default(), client: Default::default(),
@ -127,6 +129,7 @@ pub async fn main() -> anyhow::Result<()> {
robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))), robotstxt_cache: Arc::new(RwLock::new(LruCache::new(64))),
semaphore, semaphore,
submission, submission,
graceful_stop,
}; };
let mut tasks = Vec::with_capacity(num_tasks as usize); let mut tasks = Vec::with_capacity(num_tasks as usize);

View File

@ -4,7 +4,7 @@ use crate::raking::{
get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome, get_robots_txt_for, robots_txt_url_for, PermanentFailure, PermanentFailureReason, RakeOutcome,
Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason, Raker, RedirectReason, RobotsTxt, TemporaryFailure, TemporaryFailureReason,
}; };
use crate::storage::records::{ActiveDomainRecord, UrlVisitedRecord}; use crate::storage::records::UrlVisitedRecord;
use crate::storage::RakerStore; use crate::storage::RakerStore;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use chrono::Utc; use chrono::Utc;
@ -18,6 +18,7 @@ use quickpeep_utils::dates::date_to_quickpeep_days;
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock}; use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -66,19 +67,21 @@ pub struct TaskContext {
pub semaphore: Arc<Semaphore>, pub semaphore: Arc<Semaphore>,
pub submission: TaskResultSubmission, pub submission: TaskResultSubmission,
pub graceful_stop: Arc<AtomicBool>,
} }
impl TaskContext { impl TaskContext {
pub async fn run(mut self) -> anyhow::Result<()> { pub async fn run(mut self) -> anyhow::Result<()> {
// Get a domain to process // Get a domain to process
loop { while !self.graceful_stop.load(Ordering::Relaxed) {
let domain = { let domain = {
let txn = self.store.ro_txn()?; let txn = self.store.ro_txn()?;
txn.choose_random_active_domain()? txn.choose_random_active_domain()?
}; };
match domain { match domain {
Some((domain, active_record)) => { Some((domain, _active_record)) => {
let is_ours = { let is_ours = {
let mut busy_domains = self let mut busy_domains = self
.busy_domains .busy_domains
@ -116,7 +119,7 @@ impl TaskContext {
let mut current_robot_rules: Option<Cylon> = None; let mut current_robot_rules: Option<Cylon> = None;
let mut wait_until: Option<Instant> = None; let mut wait_until: Option<Instant> = None;
loop { while !self.graceful_stop.load(Ordering::Relaxed) {
// Get a URL to process // Get a URL to process
let url = { let url = {
let txn = self.store.ro_txn()?; let txn = self.store.ro_txn()?;