From bd16f58d9e6e4bd097f5e693cb1ffea7fba4e3e5 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 26 Nov 2022 20:07:12 +0000 Subject: [PATCH] Maintain an index file of rakepacks and append when a rakepack is finished --- quickpeep_raker/src/rakepack_emitter.rs | 30 ++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/quickpeep_raker/src/rakepack_emitter.rs b/quickpeep_raker/src/rakepack_emitter.rs index df0977b..3bf25d2 100644 --- a/quickpeep_raker/src/rakepack_emitter.rs +++ b/quickpeep_raker/src/rakepack_emitter.rs @@ -57,12 +57,15 @@ pub fn pack_emitter( Unit::Count, "Records emitted into a pack file" ); + + let pack_index_file = directory.join("index"); + loop { let now = Utc::now(); - // 2022-01-01 01:01:01 - let new_pack_file_path = loop { - let new_pack_file_path = - directory.join(format!("{}.{}.pack", now.format("%F_%T"), name)); + // 2022-01-01_01:01:01 + let (pack_name, new_pack_file_path) = loop { + let pack_name = format!("{}.{}.pack", now.format("%F_%T"), name); + let new_pack_file_path = directory.join(&pack_name); if new_pack_file_path.exists() { warn!( "{:?} already exists; sleeping to generate new timestamp.", @@ -70,11 +73,11 @@ pub fn pack_emitter( ); std::thread::sleep(Duration::from_secs(2)); } else { - break new_pack_file_path; + break (pack_name, new_pack_file_path); } }; - if !pack_emitter_to_file( + let file_cutoff_reached = pack_emitter_to_file( &new_pack_file_path, &mut rx, name, @@ -82,8 +85,19 @@ pub fn pack_emitter( settings, shutdown.clone(), shutdown_notify.clone(), - )? { - // File wasn't filled; the receiver was exhausted (we're shutting down). + )?; + + // Add an entry to the index. This essentially marks it as 'done' and enables + // a follower to catch up. + let mut index_file = OpenOptions::new() + .create(true) + .append(true) + .open(&pack_index_file)?; + index_file.write(format!("\n{}", pack_name).as_bytes())?; + index_file.flush()?; + + if !file_cutoff_reached { + // File wasn't filled; the receiver was exhausted (that means we're shutting down). break; } }