From d0934e12a85aaaff10c52276ab76702fdd195276 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 9 Jul 2021 21:09:18 +0100 Subject: [PATCH] Make the remote end shut down when done --- yama/src/pile.rs | 2 +- yama/src/remote.rs | 3 +- yama/src/remote/requester.rs | 62 +++++++++++++++++++++++++++++++++--- yama/src/remote/responder.rs | 7 ++++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/yama/src/pile.rs b/yama/src/pile.rs index a86af5b..d5ffd27 100644 --- a/yama/src/pile.rs +++ b/yama/src/pile.rs @@ -58,7 +58,7 @@ pub struct RemoteDescriptor { pub path: PathBuf, } -#[derive(PartialOrd, PartialEq, Copy, Clone, Serialize, Deserialize)] +#[derive(PartialOrd, PartialEq, Copy, Clone, Serialize, Deserialize, Eq)] pub enum Keyspace { Chunk, ChunkHash, diff --git a/yama/src/remote.rs b/yama/src/remote.rs index d762fc8..a24c1d0 100644 --- a/yama/src/remote.rs +++ b/yama/src/remote.rs @@ -35,7 +35,7 @@ pub struct Request { body: RequestBody, } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)] pub enum RequestBody { Read { kind: Keyspace, @@ -62,6 +62,7 @@ pub enum RequestBody { }, Flush, LowLevelCheck, + Shutdown, } #[derive(Serialize, Deserialize, Clone)] diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 3827542..3e77145 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -10,6 +10,7 @@ use log::{error, info}; use crate::pile::{Keyspace, RawPile}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; +use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; pub struct Requester { commands: Sender<(RequestBody, Sender)>, @@ -25,11 +26,14 @@ impl Requester { let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let mut handles = Vec::new(); + let shutdown_signal: Arc<(AtomicU16, AtomicBool)> = Default::default(); + { // Spawn a reader let in_flight = in_flight.clone(); + let shutdown_signal = shutdown_signal.clone(); handles.push(thread::spawn(move || { - if let Err(e) = Self::reader(read, in_flight) { + if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { error!("reader failed: {:?}", e); } })); @@ -40,7 +44,7 @@ impl Requester { let in_flight = in_flight.clone(); let command_receiver = command_receiver.clone(); handles.push(thread::spawn(move || { - if let Err(e) = Self::writer(write, in_flight, command_receiver) { + if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) { error!("writer failed: {:?}", e); } })); @@ -60,13 +64,16 @@ impl Requester { let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let mut handles = Vec::new(); + let shutdown_signal: Arc<(AtomicU16, AtomicBool)> = Default::default(); + { // Spawn a reader let in_flight = in_flight.clone(); + let shutdown_signal = shutdown_signal.clone(); handles.push(thread::spawn(move || { let stdin = stdin(); let read = stdin.lock(); - if let Err(e) = Self::reader(read, in_flight) { + if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { error!("reader failed: {:?}", e); } })); @@ -79,7 +86,7 @@ impl Requester { handles.push(thread::spawn(move || { let stdout = stdout(); let write = stdout.lock(); - if let Err(e) = Self::writer(write, in_flight, command_receiver) { + if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) { error!("writer failed: {:?}", e); } })); @@ -97,10 +104,17 @@ impl Requester { fn reader( mut read: R, in_flight: Arc>>>, + shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, ) -> anyhow::Result<()> { loop { let response: Response = read_message(&mut read)?; + if shutdown_request_channel.1.load(Ordering::Relaxed) + && response.response_to == shutdown_request_channel.0.load(Ordering::Relaxed) + { + return Ok(()); + } + let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; map.get(&response.response_to) .ok_or(anyhow!("Didn't find response channel..."))? @@ -114,6 +128,7 @@ impl Requester { mut write: W, in_flight: Arc>>>, command_receiver: Receiver<(RequestBody, Sender)>, + shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>, ) -> anyhow::Result<()> { while let Ok((req_body, response_channel)) = command_receiver.recv() { let request_id = { @@ -125,6 +140,15 @@ impl Requester { map.insert(request_id, response_channel); request_id }; + let shutting_down = &req_body == &RequestBody::Shutdown; + + if shutting_down { + shutdown_request_channel + .0 + .store(request_id, Ordering::SeqCst); + shutdown_request_channel.1.store(true, Ordering::SeqCst); + } + write_message( &mut write, &Request { @@ -133,7 +157,37 @@ impl Requester { }, )?; write.flush()?; + + if shutting_down { + return Ok(()); + } } + + info!("Exited send loop without shutdown message, issuing Shutdown."); + // shutdown ourselves + let request_id = { + let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; + let request_id = (0u16..u16::MAX) + .into_iter() + .find(|id| !map.contains_key(&id)) + .expect("No ID found"); + request_id + }; + + shutdown_request_channel + .0 + .store(request_id, Ordering::SeqCst); + shutdown_request_channel.1.store(true, Ordering::SeqCst); + + write_message( + &mut write, + &Request { + id: request_id, + body: RequestBody::Shutdown, + }, + )?; + write.flush()?; + Ok(()) } diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index 84118a1..69748b9 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -279,6 +279,13 @@ impl Responder { } } }, + RequestBody::Shutdown => { + warn!("Not yet implemented: Shutdown"); + Response { + response_to: request.id, + body: ResponseBody::Success, + } + } }; responses