Maintain an index file of rakepacks and append when a rakepack is finished

rei/rakerstore_postgres_overhaul
Olivier 'reivilibre' 2022-11-26 20:07:12 +00:00
parent 52d0183942
commit bd16f58d9e
1 changed files with 22 additions and 8 deletions

View File

@ -57,12 +57,15 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
Unit::Count,
"Records emitted into a pack file"
);
let pack_index_file = directory.join("index");
loop {
let now = Utc::now();
// 2022-01-01 01:01:01
let new_pack_file_path = loop {
let new_pack_file_path =
directory.join(format!("{}.{}.pack", now.format("%F_%T"), name));
// 2022-01-01_01:01:01
let (pack_name, new_pack_file_path) = loop {
let pack_name = format!("{}.{}.pack", now.format("%F_%T"), name);
let new_pack_file_path = directory.join(&pack_name);
if new_pack_file_path.exists() {
warn!(
"{:?} already exists; sleeping to generate new timestamp.",
@ -70,11 +73,11 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
);
std::thread::sleep(Duration::from_secs(2));
} else {
break new_pack_file_path;
break (pack_name, new_pack_file_path);
}
};
if !pack_emitter_to_file(
let file_cutoff_reached = pack_emitter_to_file(
&new_pack_file_path,
&mut rx,
name,
@ -82,8 +85,19 @@ pub fn pack_emitter<T: Serialize + Send + 'static>(
settings,
shutdown.clone(),
shutdown_notify.clone(),
)? {
// File wasn't filled; the receiver was exhausted (we're shutting down).
)?;
// Add an entry to the index. This essentially marks it as 'done' and enables
// a follower to catch up.
let mut index_file = OpenOptions::new()
.create(true)
.append(true)
.open(&pack_index_file)?;
index_file.write(format!("\n{}", pack_name).as_bytes())?;
index_file.flush()?;
if !file_cutoff_reached {
// File wasn't filled; the receiver was exhausted (that means we're shutting down).
break;
}
}