Give a name to all non-main threads
This commit is contained in:
parent
9a74fa2cdc
commit
7e2b13416b
@ -706,17 +706,20 @@ impl RawPile for SqliteBloblogPile {
|
|||||||
|
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::Builder::new()
|
||||||
let worker_id = Arc::new(format!("bloblogwriter"));
|
.name("SQLBloblogStPpln".to_string())
|
||||||
if let Err(err) = this.storage_pipeline_worker(incoming) {
|
.spawn(move || {
|
||||||
controller_send
|
let worker_id = Arc::new(format!("bloblogwriter"));
|
||||||
.send(ControllerMessage::Failure {
|
if let Err(err) = this.storage_pipeline_worker(incoming) {
|
||||||
worker_id,
|
controller_send
|
||||||
error_message: format!("err {:?}", err),
|
.send(ControllerMessage::Failure {
|
||||||
})
|
worker_id,
|
||||||
.expect("This is BAD: failed to send failure message to controller.");
|
error_message: format!("err {:?}", err),
|
||||||
}
|
})
|
||||||
});
|
.expect("This is BAD: failed to send failure message to controller.");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
Ok(sender)
|
Ok(sender)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,22 +39,34 @@ impl Requester {
|
|||||||
// Spawn a reader
|
// Spawn a reader
|
||||||
let in_flight = in_flight.clone();
|
let in_flight = in_flight.clone();
|
||||||
let shutdown_signal = shutdown_signal.clone();
|
let shutdown_signal = shutdown_signal.clone();
|
||||||
handles.push(thread::spawn(move || {
|
handles.push(
|
||||||
if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
|
thread::Builder::new()
|
||||||
error!("reader failed: {:?}", e);
|
.name("ReqstrReader".to_string())
|
||||||
}
|
.spawn(move || {
|
||||||
}));
|
if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
|
||||||
|
error!("reader failed: {:?}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
// Spawn a writer
|
// Spawn a writer
|
||||||
let in_flight = in_flight.clone();
|
let in_flight = in_flight.clone();
|
||||||
let command_receiver = command_receiver.clone();
|
let command_receiver = command_receiver.clone();
|
||||||
handles.push(thread::spawn(move || {
|
handles.push(
|
||||||
if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) {
|
thread::Builder::new()
|
||||||
error!("writer failed: {:?}", e);
|
.name("ReqstrWriter".to_string())
|
||||||
}
|
.spawn(move || {
|
||||||
}));
|
if let Err(e) =
|
||||||
|
Self::writer(write, in_flight, command_receiver, shutdown_signal)
|
||||||
|
{
|
||||||
|
error!("writer failed: {:?}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
(
|
(
|
||||||
@ -77,26 +89,38 @@ impl Requester {
|
|||||||
// Spawn a reader
|
// Spawn a reader
|
||||||
let in_flight = in_flight.clone();
|
let in_flight = in_flight.clone();
|
||||||
let shutdown_signal = shutdown_signal.clone();
|
let shutdown_signal = shutdown_signal.clone();
|
||||||
handles.push(thread::spawn(move || {
|
handles.push(
|
||||||
let stdin = stdin();
|
thread::Builder::new()
|
||||||
let read = stdin.lock();
|
.name("ReqstrReaderSI".to_string())
|
||||||
if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
|
.spawn(move || {
|
||||||
error!("reader failed: {:?}", e);
|
let stdin = stdin();
|
||||||
}
|
let read = stdin.lock();
|
||||||
}));
|
if let Err(e) = Self::reader(read, in_flight, shutdown_signal) {
|
||||||
|
error!("reader failed: {:?}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
// Spawn a writer
|
// Spawn a writer
|
||||||
let in_flight = in_flight.clone();
|
let in_flight = in_flight.clone();
|
||||||
let command_receiver = command_receiver.clone();
|
let command_receiver = command_receiver.clone();
|
||||||
handles.push(thread::spawn(move || {
|
handles.push(
|
||||||
let stdout = stdout();
|
thread::Builder::new()
|
||||||
let write = stdout.lock();
|
.name("ReqstrWriterSO".to_string())
|
||||||
if let Err(e) = Self::writer(write, in_flight, command_receiver, shutdown_signal) {
|
.spawn(move || {
|
||||||
error!("writer failed: {:?}", e);
|
let stdout = stdout();
|
||||||
}
|
let write = stdout.lock();
|
||||||
}));
|
if let Err(e) =
|
||||||
|
Self::writer(write, in_flight, command_receiver, shutdown_signal)
|
||||||
|
{
|
||||||
|
error!("writer failed: {:?}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
(
|
(
|
||||||
@ -327,74 +351,77 @@ impl RawPile for Requester {
|
|||||||
let command_sender = self.commands.clone();
|
let command_sender = self.commands.clone();
|
||||||
register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds);
|
register_histogram!("requester_cmd_response_time_ms", Unit::Milliseconds);
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::Builder::new()
|
||||||
let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32);
|
.name("ReqStPpln".to_string())
|
||||||
let mut in_flight_writes = 0;
|
.spawn(move || {
|
||||||
const MAX_IN_FLIGHT_WRITES: u32 = 32;
|
let (response_tx, response_rx) = crossbeam_channel::bounded::<ResponseBody>(32);
|
||||||
let mut pipeline_still_going = true;
|
let mut in_flight_writes = 0;
|
||||||
|
const MAX_IN_FLIGHT_WRITES: u32 = 32;
|
||||||
|
let mut pipeline_still_going = true;
|
||||||
|
|
||||||
while pipeline_still_going || in_flight_writes > 0 {
|
while pipeline_still_going || in_flight_writes > 0 {
|
||||||
// TODO this won't handle channel closure properly.
|
// TODO this won't handle channel closure properly.
|
||||||
if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going {
|
if in_flight_writes < MAX_IN_FLIGHT_WRITES && pipeline_still_going {
|
||||||
crossbeam_channel::select! {
|
crossbeam_channel::select! {
|
||||||
recv(response_rx) -> resp => {
|
recv(response_rx) -> resp => {
|
||||||
in_flight_writes -= 1;
|
in_flight_writes -= 1;
|
||||||
match resp.unwrap() {
|
match resp.unwrap() {
|
||||||
ResponseBody::Success => {
|
ResponseBody::Success => {
|
||||||
// nop
|
// nop
|
||||||
|
}
|
||||||
|
ResponseBody::Failed(string) => {
|
||||||
|
panic!("Requester pipeline fail {}", string);
|
||||||
|
}
|
||||||
|
ResponseBody::BatchData { .. } => {
|
||||||
|
panic!("wtf BatchData");
|
||||||
|
}
|
||||||
|
ResponseBody::NotExists => {
|
||||||
|
panic!("wtf NotExists");
|
||||||
|
}
|
||||||
|
ResponseBody::Data(_) => {
|
||||||
|
panic!("wtf Data");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ResponseBody::Failed(string) => {
|
}
|
||||||
panic!("Requester pipeline fail {}", string);
|
recv(receiver) -> resp => {
|
||||||
}
|
if let Ok((chunk_id, write)) = resp {
|
||||||
ResponseBody::BatchData { .. } => {
|
in_flight_writes += 1;
|
||||||
panic!("wtf BatchData");
|
command_sender.send((RequestBody::Write {
|
||||||
}
|
kind: Keyspace::Chunk,
|
||||||
ResponseBody::NotExists => {
|
key: chunk_id.to_vec(),
|
||||||
panic!("wtf NotExists");
|
value: write
|
||||||
}
|
}, Some(response_tx.clone()))).unwrap();
|
||||||
ResponseBody::Data(_) => {
|
} else {
|
||||||
panic!("wtf Data");
|
// the input has stopped
|
||||||
|
pipeline_still_going = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
recv(receiver) -> resp => {
|
} else {
|
||||||
if let Ok((chunk_id, write)) = resp {
|
// Either the pipeline is stopping or we are too busy to accept new chunks,
|
||||||
in_flight_writes += 1;
|
// so only process responses.
|
||||||
command_sender.send((RequestBody::Write {
|
let resp = response_rx.recv().unwrap();
|
||||||
kind: Keyspace::Chunk,
|
match resp {
|
||||||
key: chunk_id.to_vec(),
|
ResponseBody::Success => {
|
||||||
value: write
|
// nop
|
||||||
}, Some(response_tx.clone()))).unwrap();
|
}
|
||||||
} else {
|
ResponseBody::Failed(string) => {
|
||||||
// the input has stopped
|
panic!("Requester pipeline fail {}", string);
|
||||||
pipeline_still_going = false;
|
}
|
||||||
|
ResponseBody::BatchData { .. } => {
|
||||||
|
panic!("wtf BatchData");
|
||||||
|
}
|
||||||
|
ResponseBody::NotExists => {
|
||||||
|
panic!("wtf NotExists");
|
||||||
|
}
|
||||||
|
ResponseBody::Data(_) => {
|
||||||
|
panic!("wtf Data");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Either the pipeline is stopping or we are too busy to accept new chunks,
|
|
||||||
// so only process responses.
|
|
||||||
let resp = response_rx.recv().unwrap();
|
|
||||||
match resp {
|
|
||||||
ResponseBody::Success => {
|
|
||||||
// nop
|
|
||||||
}
|
|
||||||
ResponseBody::Failed(string) => {
|
|
||||||
panic!("Requester pipeline fail {}", string);
|
|
||||||
}
|
|
||||||
ResponseBody::BatchData { .. } => {
|
|
||||||
panic!("wtf BatchData");
|
|
||||||
}
|
|
||||||
ResponseBody::NotExists => {
|
|
||||||
panic!("wtf NotExists");
|
|
||||||
}
|
|
||||||
ResponseBody::Data(_) => {
|
|
||||||
panic!("wtf Data");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
});
|
.unwrap();
|
||||||
|
|
||||||
Ok(input)
|
Ok(input)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -54,26 +54,33 @@ impl Responder {
|
|||||||
// spawn the reader
|
// spawn the reader
|
||||||
let work_queue_send = work_queue_send.clone();
|
let work_queue_send = work_queue_send.clone();
|
||||||
let responder = responder.clone();
|
let responder = responder.clone();
|
||||||
thread::spawn(move || {
|
thread::Builder::new()
|
||||||
let mut read = read;
|
.name("RespdrReader".to_string())
|
||||||
if let Err(e) = responder.reader(&mut read, work_queue_send, &mut progress_bar) {
|
.spawn(move || {
|
||||||
error!("reader failed: {:?}", e);
|
let mut read = read;
|
||||||
}
|
if let Err(e) = responder.reader(&mut read, work_queue_send, &mut progress_bar)
|
||||||
read
|
{
|
||||||
})
|
error!("reader failed: {:?}", e);
|
||||||
|
}
|
||||||
|
read
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
let w_handle = {
|
let w_handle = {
|
||||||
// spawn the writer
|
// spawn the writer
|
||||||
let resp_recv = resp_recv.clone();
|
let resp_recv = resp_recv.clone();
|
||||||
let responder = responder.clone();
|
let responder = responder.clone();
|
||||||
thread::spawn(move || {
|
thread::Builder::new()
|
||||||
let mut write = write;
|
.name("RespdrWriter".to_string())
|
||||||
if let Err(e) = responder.writer(&mut write, resp_recv) {
|
.spawn(move || {
|
||||||
error!("writer failed: {:?}", e);
|
let mut write = write;
|
||||||
}
|
if let Err(e) = responder.writer(&mut write, resp_recv) {
|
||||||
write
|
error!("writer failed: {:?}", e);
|
||||||
})
|
}
|
||||||
|
write
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
for worker_num in 0..num_workers {
|
for worker_num in 0..num_workers {
|
||||||
@ -82,11 +89,17 @@ impl Responder {
|
|||||||
let work_queue_recv = work_queue_recv.clone();
|
let work_queue_recv = work_queue_recv.clone();
|
||||||
let resp_send = resp_send.clone();
|
let resp_send = resp_send.clone();
|
||||||
let pile = pile.clone();
|
let pile = pile.clone();
|
||||||
handles.push(thread::spawn(move || {
|
handles.push(
|
||||||
if let Err(e) = responder.worker(pile.as_ref(), work_queue_recv, resp_send) {
|
thread::Builder::new()
|
||||||
error!("worker {} failed: {:?}", worker_num, e);
|
.name("RespdrWorker".to_string())
|
||||||
}
|
.spawn(move || {
|
||||||
}));
|
if let Err(e) = responder.worker(pile.as_ref(), work_queue_recv, resp_send)
|
||||||
|
{
|
||||||
|
error!("worker {} failed: {:?}", worker_num, e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
(r_handle, w_handle, handles)
|
(r_handle, w_handle, handles)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user