From e8fc448acee63d196e1a87e78c41f5f3d012d862 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 30 May 2022 23:11:16 +0100 Subject: [PATCH] Remove needless bare_cnr crate --- Cargo.lock | 150 +--------------- Cargo.toml | 1 - bare_cnr/Cargo.toml | 26 --- bare_cnr/src/lib.rs | 23 --- bare_cnr/src/multiplexer.rs | 332 ------------------------------------ 5 files changed, 6 insertions(+), 526 deletions(-) delete mode 100644 bare_cnr/Cargo.toml delete mode 100644 bare_cnr/src/lib.rs delete mode 100644 bare_cnr/src/multiplexer.rs diff --git a/Cargo.lock b/Cargo.lock index 933d632..d2d074f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,32 +56,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03d96e5bb1b3f9313145dfc9c15d22036fa900533d1a21744f684c642919dd09" dependencies = [ "ahash 0.3.8", - "dashmap 4.0.2", + "dashmap", "once_cell", "serde", ] -[[package]] -name = "async-stream" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" -dependencies = [ - "async-stream-impl", - "futures-core", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atty" version = "0.2.14" @@ -119,7 +98,7 @@ dependencies = [ "anyhow", "bare-metrics-core", "crossbeam-channel", - "dashmap 4.0.2", + "dashmap", "fxhash", "hdrhistogram", "log", @@ -128,20 +107,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "bare_cnr" -version = "0.1.0-alpha.1" -dependencies = [ - "dashmap 5.3.4", - "log", - "rand", - "serde", - "serde_bare 0.5.0", - "thiserror", - "tokio", - "tokio-test", -] - [[package]] name = "base64" version = "0.13.0" @@ -170,12 +135,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" - [[package]] name = "cc" version = "1.0.73" @@ -393,18 +352,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "dashmap" -version = "5.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" -dependencies = [ - "cfg-if 1.0.0", - "hashbrown 0.12.1", - "lock_api", - "parking_lot_core", -] - [[package]] name = "datman" version = "0.6.0-alpha.1" @@ -521,13 +468,11 @@ checksum = "861d7b3427fbf3e06300b4aca5c430a2e263b7a7b6821faff8b200d3dc4a61cb" [[package]] name = "flate2" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ - "cfg-if 1.0.0", "crc32fast", - "libc", "miniz_oxide", ] @@ -541,12 +486,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "futures-core" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" - [[package]] name = "fxhash" version = "0.2.1" @@ -588,12 +527,6 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -[[package]] -name = "hashbrown" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" - [[package]] name = "hashlink" version = "0.6.0" @@ -827,9 +760,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.5.1" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" dependencies = [ "adler", ] @@ -957,12 +890,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "pin-project-lite" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" - [[package]] name = "pkg-config" version = "0.3.25" @@ -1283,16 +1210,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" -[[package]] -name = "socket2" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "sodiumoxide" version = "0.2.7" @@ -1430,61 +1347,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tokio" -version = "1.18.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" -dependencies = [ - "bytes", - "libc", - "memchr", - "mio", - "num_cpus", - "once_cell", - "parking_lot", - "pin-project-lite", - "signal-hook-registry", - "socket2", - "tokio-macros", - "winapi", -] - -[[package]] -name = "tokio-macros" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tokio-stream" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - -[[package]] -name = "tokio-test" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" -dependencies = [ - "async-stream", - "bytes", - "futures-core", - "tokio", - "tokio-stream", -] - [[package]] name = "toml" version = "0.5.9" diff --git a/Cargo.toml b/Cargo.toml index 9d7bf94..5c733e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "yama", "datman", - "bare_cnr", ] [profile.release] diff --git a/bare_cnr/Cargo.toml b/bare_cnr/Cargo.toml deleted file mode 100644 index b1d23a1..0000000 --- a/bare_cnr/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "bare_cnr" -version = "0.1.0-alpha.1" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -thiserror = "1.0.31" -serde = "1.0.137" -serde_bare = "0.5.0" -dashmap = "5.3.4" -log = "0.4.17" -rand = "0.8.5" - -[dev-dependencies] -tokio-test = "0.4.2" - -[dependencies.tokio] -# TODO optional = true -version = "1.18.2" -features = ["full"] # TODO restrict this - - -[features] -# TODO default = ["tokio"] \ No newline at end of file diff --git a/bare_cnr/src/lib.rs b/bare_cnr/src/lib.rs deleted file mode 100644 index d7daa50..0000000 --- a/bare_cnr/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -use thiserror::Error; - -mod multiplexer; - -#[derive(Error, Debug)] -pub enum CnrError { - #[error("Version mismatch.")] - VersionMismatch, - - #[error("Ser/deserialisation error.")] - Serde(#[from] serde_bare::error::Error), - - #[error("Input/output error.")] - Io(#[from] std::io::Error), - - #[error("Channel closed.")] - Closed, -} - -pub use multiplexer::Channel; -pub use multiplexer::ChannelHandle; -pub use multiplexer::ChannelLock; -pub use multiplexer::TransportMultiplexer; diff --git a/bare_cnr/src/multiplexer.rs b/bare_cnr/src/multiplexer.rs deleted file mode 100644 index ccac1dc..0000000 --- a/bare_cnr/src/multiplexer.rs +++ /dev/null @@ -1,332 +0,0 @@ -use crate::CnrError; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; -use log::{error, warn}; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::cell::RefCell; -use std::marker::PhantomData; -use std::sync::{Arc, Weak}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{Mutex, MutexGuard}; -use tokio::task::JoinHandle; - -pub struct TransportMultiplexer { - task_tx: JoinHandle<()>, - task_rx: JoinHandle<()>, - - /// Whether this side is the initiator. - /// The initiator can open even-numbered channels. - initiator: bool, - - /// Queue to send new bytes out on the wire - tx_queue: Sender<(u16, Vec)>, - - /// Senders for sending messages from the wire to channels - channels: Arc>>>, - - /// Channel receivers for channels that have received messages but haven't yet been claimed - unclaimed_channels: Arc>>>, -} - -thread_local! { - static CURRENT_MULTIPLEXER: RefCell> = Default::default(); -} - -impl TransportMultiplexer { - pub fn new( - rx: R, - tx: W, - initiator: bool, - ) -> Result, CnrError> { - let (txqueue_tx, txqueue_rx) = tokio::sync::mpsc::channel(8); - - let channels = Arc::new(Default::default()); - let unclaimed_channels = Arc::new(Default::default()); - - let task_tx = tokio::spawn(async move { - if let Err(err) = TransportMultiplexer::handle_tx(tx, txqueue_rx).await { - error!("TX handler failed: {:?}", err); - } - }); - let channels2 = Arc::clone(&channels); - let unclaimed_channels2 = Arc::clone(&unclaimed_channels); - let task_rx = tokio::spawn(async move { - if let Err(err) = - TransportMultiplexer::handle_rx(rx, channels2, unclaimed_channels2).await - { - error!("RX handler failed: {:?}", err); - } - }); - - Ok(Arc::new(TransportMultiplexer { - task_tx, - task_rx, - initiator, - tx_queue: txqueue_tx, - channels, - unclaimed_channels, - })) - } - - pub fn open_channel_with_id(self: &Arc, channel_id: u16) -> Option> { - match self.channels.entry(channel_id) { - Entry::Occupied(_) => { - match self.unclaimed_channels.remove(&channel_id) { - Some((_k, chan_rx)) => Some(Channel { - id: channel_id, - inner: Arc::new(Mutex::new(ChannelInner { - chan_id: channel_id, - tx: self.tx_queue.clone(), - rx: chan_rx, - multiplexer: Arc::downgrade(&self), - })), - marker: Default::default(), - }), - None => { - // Channel ID already in use. - None - } - } - } - Entry::Vacant(ve) => { - let (chan_tx, chan_rx) = tokio::sync::mpsc::channel(8); - ve.insert(chan_tx); - Some(Channel { - id: channel_id, - inner: Arc::new(Mutex::new(ChannelInner { - chan_id: channel_id, - tx: self.tx_queue.clone(), - rx: chan_rx, - multiplexer: Arc::downgrade(&self), - })), - marker: Default::default(), - }) - } - } - } - - pub fn open_unused_channel(self: &Arc) -> Option> { - loop { - let id: u16 = rand::random(); - let id = if self.initiator { id & !1 } else { id | 1 }; - if let Some(chan) = self.open_channel_with_id(id) { - return Some(chan); - } - } - } - - async fn handle_tx( - mut tx: W, - mut txqueue_rx: Receiver<(u16, Vec)>, - ) -> Result<(), CnrError> { - while let Some((chan_id, next_msg)) = txqueue_rx.recv().await { - tx.write_u16(chan_id).await?; - tx.write_u32(next_msg.len().try_into().unwrap()).await?; - tx.write_all(&next_msg).await?; - // TODO(performance) would be nice not to flush if something else is just behind. - tx.flush().await?; - } - Ok(()) - } - - async fn handle_rx( - mut rx: R, - channels: Arc>>>, - unclaimed_channels: Arc>>>, - ) -> Result<(), CnrError> { - loop { - // TODO(features): need to be able to support graceful EOF - let chan_id = rx.read_u16().await?; - let length = rx.read_u32().await? as usize; - // TODO(perf): use uninit? - let mut buf = vec![0u8; length]; - - rx.read_exact(&mut buf[..]).await?; - - match channels.entry(chan_id) { - Entry::Occupied(oe) => { - if let Err(err) = oe.get().send(buf).await { - // TODO this channel has died. What can we do about it? - warn!("Message received but channel {} dead", chan_id); - // TODO maybe we should clean it up at this point... - } - } - Entry::Vacant(ve) => { - let (chan_tx, chan_rx) = tokio::sync::mpsc::channel(8); - unclaimed_channels.insert(chan_id, chan_rx); - chan_tx.try_send(buf).expect("empty channel succeeds"); - ve.insert(chan_tx); - } - } - } - } -} - -pub struct ChannelHandle { - marker: PhantomData<(LTR, RTL)>, - chan_id: u16, - multiplexer: Weak, -} - -impl ChannelHandle { - pub fn into_channel(self) -> Option> { - let multiplexer = self.multiplexer.upgrade()?; - multiplexer.open_channel_with_id(self.chan_id) - } -} - -//impl Serialize for ChannelHandle { -impl Serialize for ChannelHandle { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - self.chan_id.serialize(serializer) - } -} - -//impl<'de, LTR, RTL, const INITIATOR: bool> Deserialize<'de> for ChannelHandle { -impl<'de, LTR, RTL> Deserialize<'de> for ChannelHandle { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - // TODO We could check the initiator flags are the right way around by embedding a bool in - // the format. - let chan_id = u16::deserialize(deserializer)?; - Ok(ChannelHandle { - marker: Default::default(), - chan_id, - multiplexer: CURRENT_MULTIPLEXER.with(|refcell| refcell.borrow().clone()), - }) - } -} - -struct ChannelInner { - chan_id: u16, - tx: Sender<(u16, Vec)>, - rx: Receiver>, - multiplexer: Weak, -} - -impl Drop for ChannelInner { - fn drop(&mut self) { - if let Some(multiplexer) = self.multiplexer.upgrade() { - multiplexer.channels.remove(&self.chan_id); - } - } -} - -pub struct ChannelLock<'a, R, W> { - // TODO we could use an OwnedMutexGuard if needed - guard: MutexGuard<'a, ChannelInner>, - marker: PhantomData<(R, W)>, -} - -impl<'a, R: DeserializeOwned, W: Serialize> ChannelLock<'a, R, W> { - pub async fn send(&mut self, message: &W) -> Result<(), CnrError> { - let bytes = serde_bare::to_vec(message)?; - self.guard - .tx - .send((self.guard.chan_id, bytes)) - .await - .map_err(|_| CnrError::Closed)?; - Ok(()) - } - - pub async fn recv(&mut self) -> Result { - let new_weak = self.guard.multiplexer.clone(); - CURRENT_MULTIPLEXER.with(move |refcell| { - *(refcell.borrow_mut()) = new_weak; - }); - let res = match self.guard.rx.recv().await { - Some(bytes) => Ok(serde_bare::from_slice(&bytes)?), - None => Err(CnrError::Closed), - }; - CURRENT_MULTIPLEXER.with(|refcell| { - *(refcell.borrow_mut()) = Weak::new(); - }); - res - } -} - -pub struct Channel { - pub id: u16, - inner: Arc>, - marker: PhantomData<(R, W)>, -} - -impl Channel { - /// Channel handles only get sent in one direction. - pub fn handle(&self) -> ChannelHandle { - ChannelHandle { - marker: Default::default(), - chan_id: self.id, - multiplexer: Default::default(), - } - } -} - -impl Channel { - pub async fn lock(&self) -> ChannelLock<'_, R, W> { - ChannelLock { - guard: self.inner.lock().await, - marker: Default::default(), - } - } - - pub async fn send(&mut self, message: &W) -> Result<(), CnrError> { - self.lock().await.send(message).await - } - - pub async fn recv(&mut self) -> Result { - self.lock().await.recv().await - } -} - -#[cfg(test)] -mod test { - use crate::multiplexer::{Channel, ChannelHandle, TransportMultiplexer}; - - /// Tests that data reaches the other side! - #[tokio::test] - async fn test_data_reaches_the_other_side() { - let (commander, responder) = tokio::io::duplex(64); - let (commander_r, commander_w) = tokio::io::split(commander); - let (responder_r, responder_w) = tokio::io::split(responder); - let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap(); - let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap(); - - let mut c_chan0: Channel = commander.open_channel_with_id(0).unwrap(); - c_chan0.send(&32).await.unwrap(); - - let mut r_chan0: Channel = responder.open_channel_with_id(0).unwrap(); - assert_eq!(r_chan0.recv().await.unwrap(), 32); - r_chan0.send(&2048).await.unwrap(); - - assert_eq!(c_chan0.recv().await.unwrap(), 2048); - } - - /// Tests that you can hand over a channel just by including a ChannelHandle in the message. - #[tokio::test] - async fn test_channel_handover() { - let (commander, responder) = tokio::io::duplex(64); - let (commander_r, commander_w) = tokio::io::split(commander); - let (responder_r, responder_w) = tokio::io::split(responder); - let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap(); - let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap(); - - let mut c_chan0: Channel<(), ChannelHandle> = - commander.open_channel_with_id(0).unwrap(); - let mut c_chan2: Channel = commander.open_channel_with_id(2).unwrap(); - c_chan0.send(&c_chan2.handle()).await.unwrap(); - c_chan2.send(&42).await.unwrap(); - - let mut r_chan0: Channel, ()> = - responder.open_channel_with_id(0).unwrap(); - let mut r_chan2 = r_chan0.recv().await.unwrap().into_channel().unwrap(); - assert_eq!(r_chan2.recv().await.unwrap(), 42); - } -}