From 7e2b13416b4a3777f291fe76634bf89491aac8c2 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 10 Jan 2022 21:51:17 +0000 Subject: [PATCH] Give a name to all non-main threads --- yama/src/pile/local_sqlitebloblogs.rs | 25 ++-- yama/src/remote/requester.rs | 193 +++++++++++++++----------- yama/src/remote/responder.rs | 51 ++++--- 3 files changed, 156 insertions(+), 113 deletions(-) diff --git a/yama/src/pile/local_sqlitebloblogs.rs b/yama/src/pile/local_sqlitebloblogs.rs index 81f3f64..de2d91e 100644 --- a/yama/src/pile/local_sqlitebloblogs.rs +++ b/yama/src/pile/local_sqlitebloblogs.rs @@ -706,17 +706,20 @@ impl RawPile for SqliteBloblogPile { let this = self.clone(); - thread::spawn(move || { - let worker_id = Arc::new(format!("bloblogwriter")); - if let Err(err) = this.storage_pipeline_worker(incoming) { - controller_send - .send(ControllerMessage::Failure { - worker_id, - error_message: format!("err {:?}", err), - }) - .expect("This is BAD: failed to send failure message to controller."); - } - }); + thread::Builder::new() + .name("SQLBloblogStPpln".to_string()) + .spawn(move || { + let worker_id = Arc::new(format!("bloblogwriter")); + if let Err(err) = this.storage_pipeline_worker(incoming) { + controller_send + .send(ControllerMessage::Failure { + worker_id, + error_message: format!("err {:?}", err), + }) + .expect("This is BAD: failed to send failure message to controller."); + } + }) + .unwrap(); Ok(sender) } diff --git a/yama/src/remote/requester.rs b/yama/src/remote/requester.rs index 0965359..c39f2f6 100644 --- a/yama/src/remote/requester.rs +++ b/yama/src/remote/requester.rs @@ -39,22 +39,34 @@ impl Requester { // Spawn a reader let in_flight = in_flight.clone(); let shutdown_signal = shutdown_signal.clone(); - handles.push(thread::spawn(move || { - if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { - error!("reader failed: {:?}", e); - } - })); + handles.push( + thread::Builder::new() + .name("ReqstrReader".to_string()) + .spawn(move || { + if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { + error!("reader failed: {:?}", e); + } + }) + .unwrap(), + ); } { // Spawn a writer let in_flight = in_flight.clone(); let command_receiver = command_receiver.clone(); - handles.push(thread::spawn(move || { - if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) { - error!("writer failed: {:?}", e); - } - })); + handles.push( + thread::Builder::new() + .name("ReqstrWriter".to_string()) + .spawn(move || { + if let Err(e) = + Self::writer(write, in_flight, command_receiver, shutdown_signal) + { + error!("writer failed: {:?}", e); + } + }) + .unwrap(), + ); } ( @@ -77,26 +89,38 @@ impl Requester { // Spawn a reader let in_flight = in_flight.clone(); let shutdown_signal = shutdown_signal.clone(); - handles.push(thread::spawn(move || { - let stdin = stdin(); - let read = stdin.lock(); - if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { - error!("reader failed: {:?}", e); - } - })); + handles.push( + thread::Builder::new() + .name("ReqstrReaderSI".to_string()) + .spawn(move || { + let stdin = stdin(); + let read = stdin.lock(); + if let Err(e) = Self::reader(read, in_flight, shutdown_signal) { + error!("reader failed: {:?}", e); + } + }) + .unwrap(), + ); } { // Spawn a writer let in_flight = in_flight.clone(); let command_receiver = command_receiver.clone(); - handles.push(thread::spawn(move || { - let stdout = stdout(); - let write = stdout.lock(); - if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) { - error!("writer failed: {:?}", e); - } - })); + handles.push( + thread::Builder::new() + .name("ReqstrWriterSO".to_string()) + .spawn(move || { + let stdout = stdout(); + let write = stdout.lock(); + if let Err(e) = + Self::writer(write, in_flight, command_receiver, shutdown_signal) + { + error!("writer failed: {:?}", e); + } + }) + .unwrap(), + ); } ( @@ -327,74 +351,77 @@ impl RawPile for Requester { let command_sender = self.commands.clone(); register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds); - std::thread::spawn(move || { - let (response_tx, response_rx) = crossbeam_channel::bounded::(32); - let mut in_flight_writes = 0; - const MAX_IN_FLIGHT_WRITES: u32 = 32; - let mut pipeline_still_going = true; + std::thread::Builder::new() + .name("ReqStPpln".to_string()) + .spawn(move || { + let (response_tx, response_rx) = crossbeam_channel::bounded::(32); + let mut in_flight_writes = 0; + const MAX_IN_FLIGHT_WRITES: u32 = 32; + let mut pipeline_still_going = true; - while pipeline_still_going || in_flight_writes > 0 { - // TODO this won't handle channel closure properly. - if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going { - crossbeam_channel::select! { - recv(response_rx) -> resp => { - in_flight_writes -= 1; - match resp.unwrap() { - ResponseBody::Success => { - // nop + while pipeline_still_going || in_flight_writes > 0 { + // TODO this won't handle channel closure properly. + if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going { + crossbeam_channel::select! { + recv(response_rx) -> resp => { + in_flight_writes -= 1; + match resp.unwrap() { + ResponseBody::Success => { + // nop + } + ResponseBody::Failed(string) => { + panic!("Requester pipeline fail {}", string); + } + ResponseBody::BatchData { .. } => { + panic!("wtf BatchData"); + } + ResponseBody::NotExists => { + panic!("wtf NotExists"); + } + ResponseBody::Data(_) => { + panic!("wtf Data"); + } } - ResponseBody::Failed(string) => { - panic!("Requester pipeline fail {}", string); - } - ResponseBody::BatchData { .. } => { - panic!("wtf BatchData"); - } - ResponseBody::NotExists => { - panic!("wtf NotExists"); - } - ResponseBody::Data(_) => { - panic!("wtf Data"); + } + recv(receiver) -> resp => { + if let Ok((chunk_id, write)) = resp { + in_flight_writes += 1; + command_sender.send((RequestBody::Write { + kind: Keyspace::Chunk, + key: chunk_id.to_vec(), + value: write + }, Some(response_tx.clone()))).unwrap(); + } else { + // the input has stopped + pipeline_still_going = false; } } } - recv(receiver) -> resp => { - if let Ok((chunk_id, write)) = resp { - in_flight_writes += 1; - command_sender.send((RequestBody::Write { - kind: Keyspace::Chunk, - key: chunk_id.to_vec(), - value: write - }, Some(response_tx.clone()))).unwrap(); - } else { - // the input has stopped - pipeline_still_going = false; + } else { + // Either the pipeline is stopping or we are too busy to accept new chunks, + // so only process responses. + let resp = response_rx.recv().unwrap(); + match resp { + ResponseBody::Success => { + // nop + } + ResponseBody::Failed(string) => { + panic!("Requester pipeline fail {}", string); + } + ResponseBody::BatchData { .. } => { + panic!("wtf BatchData"); + } + ResponseBody::NotExists => { + panic!("wtf NotExists"); + } + ResponseBody::Data(_) => { + panic!("wtf Data"); } - } - } - } else { - // Either the pipeline is stopping or we are too busy to accept new chunks, - // so only process responses. - let resp = response_rx.recv().unwrap(); - match resp { - ResponseBody::Success => { - // nop - } - ResponseBody::Failed(string) => { - panic!("Requester pipeline fail {}", string); - } - ResponseBody::BatchData { .. } => { - panic!("wtf BatchData"); - } - ResponseBody::NotExists => { - panic!("wtf NotExists"); - } - ResponseBody::Data(_) => { - panic!("wtf Data"); } } } - } - }); + }) + .unwrap(); Ok(input) } diff --git a/yama/src/remote/responder.rs b/yama/src/remote/responder.rs index 3786029..58c8525 100644 --- a/yama/src/remote/responder.rs +++ b/yama/src/remote/responder.rs @@ -54,26 +54,33 @@ impl Responder { // spawn the reader let work_queue_send = work_queue_send.clone(); let responder = responder.clone(); - thread::spawn(move || { - let mut read = read; - if let Err(e) = responder.reader(&mut read, work_queue_send, &mut progress_bar) { - error!("reader failed: {:?}", e); - } - read - }) + thread::Builder::new() + .name("RespdrReader".to_string()) + .spawn(move || { + let mut read = read; + if let Err(e) = responder.reader(&mut read, work_queue_send, &mut progress_bar) + { + error!("reader failed: {:?}", e); + } + read + }) + .unwrap() }; let w_handle = { // spawn the writer let resp_recv = resp_recv.clone(); let responder = responder.clone(); - thread::spawn(move || { - let mut write = write; - if let Err(e) = responder.writer(&mut write, resp_recv) { - error!("writer failed: {:?}", e); - } - write - }) + thread::Builder::new() + .name("RespdrWriter".to_string()) + .spawn(move || { + let mut write = write; + if let Err(e) = responder.writer(&mut write, resp_recv) { + error!("writer failed: {:?}", e); + } + write + }) + .unwrap() }; for worker_num in 0..num_workers { @@ -82,11 +89,17 @@ impl Responder { let work_queue_recv = work_queue_recv.clone(); let resp_send = resp_send.clone(); let pile = pile.clone(); - handles.push(thread::spawn(move || { - if let Err(e) = responder.worker(pile.as_ref(), work_queue_recv, resp_send) { - error!("worker {} failed: {:?}", worker_num, e); - } - })); + handles.push( + thread::Builder::new() + .name("RespdrWorker".to_string()) + .spawn(move || { + if let Err(e) = responder.worker(pile.as_ref(), work_queue_recv, resp_send) + { + error!("worker {} failed: {:?}", worker_num, e); + } + }) + .unwrap(), + ); } (r_handle, w_handle, handles)