Make the remote end shut down when done

This commit is contained in:
Olivier 'reivilibre' 2021-07-09 21:09:18 +01:00
parent b008a80ca4
commit d0934e12a8
4 changed files with 68 additions and 6 deletions

View File

@ -58,7 +58,7 @@ pub struct RemoteDescriptor {
pub path: PathBuf, pub path: PathBuf,
} }
#[derive(PartialOrd, PartialEq, Copy, Clone, Serialize, Deserialize)] #[derive(PartialOrd, PartialEq, Copy, Clone, Serialize, Deserialize, Eq)]
pub enum Keyspace { pub enum Keyspace {
Chunk, Chunk,
ChunkHash, ChunkHash,

View File

@ -35,7 +35,7 @@ pub struct Request {
body: RequestBody, body: RequestBody,
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum RequestBody { pub enum RequestBody {
Read { Read {
kind: Keyspace, kind: Keyspace,
@ -62,6 +62,7 @@ pub enum RequestBody {
}, },
Flush, Flush,
LowLevelCheck, LowLevelCheck,
Shutdown,
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]

View File

@ -10,6 +10,7 @@ use log::{error, info};
use crate::pile::{Keyspace, RawPile}; use crate::pile::{Keyspace, RawPile};
use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody}; use crate::remote::{read_message, write_message, Request, RequestBody, Response, ResponseBody};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
pub struct Requester { pub struct Requester {
commands: Sender<(RequestBody, Sender<ResponseBody>)>, commands: Sender<(RequestBody, Sender<ResponseBody>)>,
@ -25,11 +26,14 @@ impl Requester {
let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
let mut handles = Vec::new(); let mut handles = Vec::new();
let shutdown_signal: Arc<(AtomicU16, AtomicBool)> = Default::default();
{ {
// Spawn a reader // Spawn a reader
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
let shutdown_signal = shutdown_signal.clone();
handles.push(thread::spawn(move || { handles.push(thread::spawn(move || {
if let Err(e) = Self::reader(read, in_flight) { if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
error!("reader failed: {:?}", e); error!("reader failed: {:?}", e);
} }
})); }));
@ -40,7 +44,7 @@ impl Requester {
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
let command_receiver = command_receiver.clone(); let command_receiver = command_receiver.clone();
handles.push(thread::spawn(move || { handles.push(thread::spawn(move || {
if let Err(e) = Self::writer(write, in_flight, command_receiver) { if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) {
error!("writer failed: {:?}", e); error!("writer failed: {:?}", e);
} }
})); }));
@ -60,13 +64,16 @@ impl Requester {
let (command_sender, command_receiver) = crossbeam_channel::bounded(16); let (command_sender, command_receiver) = crossbeam_channel::bounded(16);
let mut handles = Vec::new(); let mut handles = Vec::new();
let shutdown_signal: Arc<(AtomicU16, AtomicBool)> = Default::default();
{ {
// Spawn a reader // Spawn a reader
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
let shutdown_signal = shutdown_signal.clone();
handles.push(thread::spawn(move || { handles.push(thread::spawn(move || {
let stdin = stdin(); let stdin = stdin();
let read = stdin.lock(); let read = stdin.lock();
if let Err(e) = Self::reader(read, in_flight) { if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
error!("reader failed: {:?}", e); error!("reader failed: {:?}", e);
} }
})); }));
@ -79,7 +86,7 @@ impl Requester {
handles.push(thread::spawn(move || { handles.push(thread::spawn(move || {
let stdout = stdout(); let stdout = stdout();
let write = stdout.lock(); let write = stdout.lock();
if let Err(e) = Self::writer(write, in_flight, command_receiver) { if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) {
error!("writer failed: {:?}", e); error!("writer failed: {:?}", e);
} }
})); }));
@ -97,10 +104,17 @@ impl Requester {
fn reader<R: Read>( fn reader<R: Read>(
mut read: R, mut read: R,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>, in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
loop { loop {
let response: Response = read_message(&mut read)?; let response: Response = read_message(&mut read)?;
if shutdown_request_channel.1.load(Ordering::Relaxed)
&& response.response_to == shutdown_request_channel.0.load(Ordering::Relaxed)
{
return Ok(());
}
let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?; let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?;
map.get(&response.response_to) map.get(&response.response_to)
.ok_or(anyhow!("Didn't find response channel..."))? .ok_or(anyhow!("Didn't find response channel..."))?
@ -114,6 +128,7 @@ impl Requester {
mut write: W, mut write: W,
in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>, in_flight: Arc<Mutex<HashMap<u16, Sender<ResponseBody>>>>,
command_receiver: Receiver<(RequestBody, Sender<ResponseBody>)>, command_receiver: Receiver<(RequestBody, Sender<ResponseBody>)>,
shutdown_request_channel: Arc<(AtomicU16, AtomicBool)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
while let Ok((req_body, response_channel)) = command_receiver.recv() { while let Ok((req_body, response_channel)) = command_receiver.recv() {
let request_id = { let request_id = {
@ -125,6 +140,15 @@ impl Requester {
map.insert(request_id, response_channel); map.insert(request_id, response_channel);
request_id request_id
}; };
let shutting_down = &req_body == &RequestBody::Shutdown;
if shutting_down {
shutdown_request_channel
.0
.store(request_id, Ordering::SeqCst);
shutdown_request_channel.1.store(true, Ordering::SeqCst);
}
write_message( write_message(
&mut write, &mut write,
&Request { &Request {
@ -133,7 +157,37 @@ impl Requester {
}, },
)?; )?;
write.flush()?; write.flush()?;
if shutting_down {
return Ok(());
} }
}
info!("Exited send loop without shutdown message, issuing Shutdown.");
// shutdown ourselves
let request_id = {
let map = in_flight.lock().or(Err(anyhow!("Mutex poisoned")))?;
let request_id = (0u16..u16::MAX)
.into_iter()
.find(|id| !map.contains_key(&id))
.expect("No ID found");
request_id
};
shutdown_request_channel
.0
.store(request_id, Ordering::SeqCst);
shutdown_request_channel.1.store(true, Ordering::SeqCst);
write_message(
&mut write,
&Request {
id: request_id,
body: RequestBody::Shutdown,
},
)?;
write.flush()?;
Ok(()) Ok(())
} }

View File

@ -279,6 +279,13 @@ impl Responder {
} }
} }
}, },
RequestBody::Shutdown => {
warn!("Not yet implemented: Shutdown");
Response {
response_to: request.id,
body: ResponseBody::Success,
}
}
}; };
responses responses