Remove needless bare_cnr crate
This commit is contained in:
parent
4216243dcf
commit
e8fc448ace
|
@ -56,32 +56,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "03d96e5bb1b3f9313145dfc9c15d22036fa900533d1a21744f684c642919dd09"
|
checksum = "03d96e5bb1b3f9313145dfc9c15d22036fa900533d1a21744f684c642919dd09"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash 0.3.8",
|
"ahash 0.3.8",
|
||||||
"dashmap 4.0.2",
|
"dashmap",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream-impl",
|
|
||||||
"futures-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream-impl"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atty"
|
name = "atty"
|
||||||
version = "0.2.14"
|
version = "0.2.14"
|
||||||
|
@ -119,7 +98,7 @@ dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bare-metrics-core",
|
"bare-metrics-core",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"dashmap 4.0.2",
|
"dashmap",
|
||||||
"fxhash",
|
"fxhash",
|
||||||
"hdrhistogram",
|
"hdrhistogram",
|
||||||
"log",
|
"log",
|
||||||
|
@ -128,20 +107,6 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "bare_cnr"
|
|
||||||
version = "0.1.0-alpha.1"
|
|
||||||
dependencies = [
|
|
||||||
"dashmap 5.3.4",
|
|
||||||
"log",
|
|
||||||
"rand",
|
|
||||||
"serde",
|
|
||||||
"serde_bare 0.5.0",
|
|
||||||
"thiserror",
|
|
||||||
"tokio",
|
|
||||||
"tokio-test",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64"
|
name = "base64"
|
||||||
version = "0.13.0"
|
version = "0.13.0"
|
||||||
|
@ -170,12 +135,6 @@ version = "1.4.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "bytes"
|
|
||||||
version = "1.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cc"
|
name = "cc"
|
||||||
version = "1.0.73"
|
version = "1.0.73"
|
||||||
|
@ -393,18 +352,6 @@ dependencies = [
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "dashmap"
|
|
||||||
version = "5.3.4"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if 1.0.0",
|
|
||||||
"hashbrown 0.12.1",
|
|
||||||
"lock_api",
|
|
||||||
"parking_lot_core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "datman"
|
name = "datman"
|
||||||
version = "0.6.0-alpha.1"
|
version = "0.6.0-alpha.1"
|
||||||
|
@ -521,13 +468,11 @@ checksum = "861d7b3427fbf3e06300b4aca5c430a2e263b7a7b6821faff8b200d3dc4a61cb"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.0.23"
|
version = "1.0.24"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af"
|
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if 1.0.0",
|
|
||||||
"crc32fast",
|
"crc32fast",
|
||||||
"libc",
|
|
||||||
"miniz_oxide",
|
"miniz_oxide",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -541,12 +486,6 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-core"
|
|
||||||
version = "0.3.21"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fxhash"
|
name = "fxhash"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
|
@ -588,12 +527,6 @@ version = "0.11.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hashbrown"
|
|
||||||
version = "0.12.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashlink"
|
name = "hashlink"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
|
@ -827,9 +760,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "miniz_oxide"
|
name = "miniz_oxide"
|
||||||
version = "0.5.1"
|
version = "0.5.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082"
|
checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"adler",
|
"adler",
|
||||||
]
|
]
|
||||||
|
@ -957,12 +890,6 @@ dependencies = [
|
||||||
"windows-sys",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "pin-project-lite"
|
|
||||||
version = "0.2.9"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pkg-config"
|
name = "pkg-config"
|
||||||
version = "0.3.25"
|
version = "0.3.25"
|
||||||
|
@ -1283,16 +1210,6 @@ version = "1.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
|
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "socket2"
|
|
||||||
version = "0.4.4"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sodiumoxide"
|
name = "sodiumoxide"
|
||||||
version = "0.2.7"
|
version = "0.2.7"
|
||||||
|
@ -1430,61 +1347,6 @@ dependencies = [
|
||||||
"crunchy",
|
"crunchy",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio"
|
|
||||||
version = "1.18.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"libc",
|
|
||||||
"memchr",
|
|
||||||
"mio",
|
|
||||||
"num_cpus",
|
|
||||||
"once_cell",
|
|
||||||
"parking_lot",
|
|
||||||
"pin-project-lite",
|
|
||||||
"signal-hook-registry",
|
|
||||||
"socket2",
|
|
||||||
"tokio-macros",
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-macros"
|
|
||||||
version = "1.7.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-stream"
|
|
||||||
version = "0.1.8"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
|
|
||||||
dependencies = [
|
|
||||||
"futures-core",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-test"
|
|
||||||
version = "0.4.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream",
|
|
||||||
"bytes",
|
|
||||||
"futures-core",
|
|
||||||
"tokio",
|
|
||||||
"tokio-stream",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml"
|
name = "toml"
|
||||||
version = "0.5.9"
|
version = "0.5.9"
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
members = [
|
members = [
|
||||||
"yama",
|
"yama",
|
||||||
"datman",
|
"datman",
|
||||||
"bare_cnr",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "bare_cnr"
|
|
||||||
version = "0.1.0-alpha.1"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
thiserror = "1.0.31"
|
|
||||||
serde = "1.0.137"
|
|
||||||
serde_bare = "0.5.0"
|
|
||||||
dashmap = "5.3.4"
|
|
||||||
log = "0.4.17"
|
|
||||||
rand = "0.8.5"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
|
||||||
tokio-test = "0.4.2"
|
|
||||||
|
|
||||||
[dependencies.tokio]
|
|
||||||
# TODO optional = true
|
|
||||||
version = "1.18.2"
|
|
||||||
features = ["full"] # TODO restrict this
|
|
||||||
|
|
||||||
|
|
||||||
[features]
|
|
||||||
# TODO default = ["tokio"]
|
|
|
@ -1,23 +0,0 @@
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
mod multiplexer;
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
|
||||||
pub enum CnrError {
|
|
||||||
#[error("Version mismatch.")]
|
|
||||||
VersionMismatch,
|
|
||||||
|
|
||||||
#[error("Ser/deserialisation error.")]
|
|
||||||
Serde(#[from] serde_bare::error::Error),
|
|
||||||
|
|
||||||
#[error("Input/output error.")]
|
|
||||||
Io(#[from] std::io::Error),
|
|
||||||
|
|
||||||
#[error("Channel closed.")]
|
|
||||||
Closed,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub use multiplexer::Channel;
|
|
||||||
pub use multiplexer::ChannelHandle;
|
|
||||||
pub use multiplexer::ChannelLock;
|
|
||||||
pub use multiplexer::TransportMultiplexer;
|
|
|
@ -1,332 +0,0 @@
|
||||||
use crate::CnrError;
|
|
||||||
use dashmap::mapref::entry::Entry;
|
|
||||||
use dashmap::DashMap;
|
|
||||||
use log::{error, warn};
|
|
||||||
use serde::de::DeserializeOwned;
|
|
||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use std::sync::{Arc, Weak};
|
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
|
||||||
use tokio::sync::{Mutex, MutexGuard};
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
pub struct TransportMultiplexer {
|
|
||||||
task_tx: JoinHandle<()>,
|
|
||||||
task_rx: JoinHandle<()>,
|
|
||||||
|
|
||||||
/// Whether this side is the initiator.
|
|
||||||
/// The initiator can open even-numbered channels.
|
|
||||||
initiator: bool,
|
|
||||||
|
|
||||||
/// Queue to send new bytes out on the wire
|
|
||||||
tx_queue: Sender<(u16, Vec<u8>)>,
|
|
||||||
|
|
||||||
/// Senders for sending messages from the wire to channels
|
|
||||||
channels: Arc<DashMap<u16, Sender<Vec<u8>>>>,
|
|
||||||
|
|
||||||
/// Channel receivers for channels that have received messages but haven't yet been claimed
|
|
||||||
unclaimed_channels: Arc<DashMap<u16, Receiver<Vec<u8>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
static CURRENT_MULTIPLEXER: RefCell<Weak<TransportMultiplexer>> = Default::default();
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TransportMultiplexer {
|
|
||||||
pub fn new<R: AsyncRead + Send + 'static + Unpin, W: AsyncWrite + Send + 'static + Unpin>(
|
|
||||||
rx: R,
|
|
||||||
tx: W,
|
|
||||||
initiator: bool,
|
|
||||||
) -> Result<Arc<TransportMultiplexer>, CnrError> {
|
|
||||||
let (txqueue_tx, txqueue_rx) = tokio::sync::mpsc::channel(8);
|
|
||||||
|
|
||||||
let channels = Arc::new(Default::default());
|
|
||||||
let unclaimed_channels = Arc::new(Default::default());
|
|
||||||
|
|
||||||
let task_tx = tokio::spawn(async move {
|
|
||||||
if let Err(err) = TransportMultiplexer::handle_tx(tx, txqueue_rx).await {
|
|
||||||
error!("TX handler failed: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let channels2 = Arc::clone(&channels);
|
|
||||||
let unclaimed_channels2 = Arc::clone(&unclaimed_channels);
|
|
||||||
let task_rx = tokio::spawn(async move {
|
|
||||||
if let Err(err) =
|
|
||||||
TransportMultiplexer::handle_rx(rx, channels2, unclaimed_channels2).await
|
|
||||||
{
|
|
||||||
error!("RX handler failed: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Arc::new(TransportMultiplexer {
|
|
||||||
task_tx,
|
|
||||||
task_rx,
|
|
||||||
initiator,
|
|
||||||
tx_queue: txqueue_tx,
|
|
||||||
channels,
|
|
||||||
unclaimed_channels,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn open_channel_with_id<R, W>(self: &Arc<Self>, channel_id: u16) -> Option<Channel<R, W>> {
|
|
||||||
match self.channels.entry(channel_id) {
|
|
||||||
Entry::Occupied(_) => {
|
|
||||||
match self.unclaimed_channels.remove(&channel_id) {
|
|
||||||
Some((_k, chan_rx)) => Some(Channel {
|
|
||||||
id: channel_id,
|
|
||||||
inner: Arc::new(Mutex::new(ChannelInner {
|
|
||||||
chan_id: channel_id,
|
|
||||||
tx: self.tx_queue.clone(),
|
|
||||||
rx: chan_rx,
|
|
||||||
multiplexer: Arc::downgrade(&self),
|
|
||||||
})),
|
|
||||||
marker: Default::default(),
|
|
||||||
}),
|
|
||||||
None => {
|
|
||||||
// Channel ID already in use.
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Entry::Vacant(ve) => {
|
|
||||||
let (chan_tx, chan_rx) = tokio::sync::mpsc::channel(8);
|
|
||||||
ve.insert(chan_tx);
|
|
||||||
Some(Channel {
|
|
||||||
id: channel_id,
|
|
||||||
inner: Arc::new(Mutex::new(ChannelInner {
|
|
||||||
chan_id: channel_id,
|
|
||||||
tx: self.tx_queue.clone(),
|
|
||||||
rx: chan_rx,
|
|
||||||
multiplexer: Arc::downgrade(&self),
|
|
||||||
})),
|
|
||||||
marker: Default::default(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn open_unused_channel<R, W>(self: &Arc<Self>) -> Option<Channel<R, W>> {
|
|
||||||
loop {
|
|
||||||
let id: u16 = rand::random();
|
|
||||||
let id = if self.initiator { id & !1 } else { id | 1 };
|
|
||||||
if let Some(chan) = self.open_channel_with_id(id) {
|
|
||||||
return Some(chan);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_tx<W: AsyncWrite + Unpin>(
|
|
||||||
mut tx: W,
|
|
||||||
mut txqueue_rx: Receiver<(u16, Vec<u8>)>,
|
|
||||||
) -> Result<(), CnrError> {
|
|
||||||
while let Some((chan_id, next_msg)) = txqueue_rx.recv().await {
|
|
||||||
tx.write_u16(chan_id).await?;
|
|
||||||
tx.write_u32(next_msg.len().try_into().unwrap()).await?;
|
|
||||||
tx.write_all(&next_msg).await?;
|
|
||||||
// TODO(performance) would be nice not to flush if something else is just behind.
|
|
||||||
tx.flush().await?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_rx<R: AsyncRead + Unpin>(
|
|
||||||
mut rx: R,
|
|
||||||
channels: Arc<DashMap<u16, Sender<Vec<u8>>>>,
|
|
||||||
unclaimed_channels: Arc<DashMap<u16, Receiver<Vec<u8>>>>,
|
|
||||||
) -> Result<(), CnrError> {
|
|
||||||
loop {
|
|
||||||
// TODO(features): need to be able to support graceful EOF
|
|
||||||
let chan_id = rx.read_u16().await?;
|
|
||||||
let length = rx.read_u32().await? as usize;
|
|
||||||
// TODO(perf): use uninit?
|
|
||||||
let mut buf = vec![0u8; length];
|
|
||||||
|
|
||||||
rx.read_exact(&mut buf[..]).await?;
|
|
||||||
|
|
||||||
match channels.entry(chan_id) {
|
|
||||||
Entry::Occupied(oe) => {
|
|
||||||
if let Err(err) = oe.get().send(buf).await {
|
|
||||||
// TODO this channel has died. What can we do about it?
|
|
||||||
warn!("Message received but channel {} dead", chan_id);
|
|
||||||
// TODO maybe we should clean it up at this point...
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Entry::Vacant(ve) => {
|
|
||||||
let (chan_tx, chan_rx) = tokio::sync::mpsc::channel(8);
|
|
||||||
unclaimed_channels.insert(chan_id, chan_rx);
|
|
||||||
chan_tx.try_send(buf).expect("empty channel succeeds");
|
|
||||||
ve.insert(chan_tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ChannelHandle<LTR, RTL, const INITIATOR: bool> {
|
|
||||||
marker: PhantomData<(LTR, RTL)>,
|
|
||||||
chan_id: u16,
|
|
||||||
multiplexer: Weak<TransportMultiplexer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<LTR, RTL> ChannelHandle<LTR, RTL, false> {
|
|
||||||
pub fn into_channel(self) -> Option<Channel<RTL, LTR>> {
|
|
||||||
let multiplexer = self.multiplexer.upgrade()?;
|
|
||||||
multiplexer.open_channel_with_id(self.chan_id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//impl<LTR, RTL, const INITIATOR: bool> Serialize for ChannelHandle<LTR, RTL, INITIATOR> {
|
|
||||||
impl<LTR, RTL> Serialize for ChannelHandle<LTR, RTL, true> {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
self.chan_id.serialize(serializer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//impl<'de, LTR, RTL, const INITIATOR: bool> Deserialize<'de> for ChannelHandle<LTR, RTL, INITIATOR> {
|
|
||||||
impl<'de, LTR, RTL> Deserialize<'de> for ChannelHandle<LTR, RTL, false> {
|
|
||||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
|
||||||
where
|
|
||||||
D: Deserializer<'de>,
|
|
||||||
{
|
|
||||||
// TODO We could check the initiator flags are the right way around by embedding a bool in
|
|
||||||
// the format.
|
|
||||||
let chan_id = u16::deserialize(deserializer)?;
|
|
||||||
Ok(ChannelHandle {
|
|
||||||
marker: Default::default(),
|
|
||||||
chan_id,
|
|
||||||
multiplexer: CURRENT_MULTIPLEXER.with(|refcell| refcell.borrow().clone()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ChannelInner {
|
|
||||||
chan_id: u16,
|
|
||||||
tx: Sender<(u16, Vec<u8>)>,
|
|
||||||
rx: Receiver<Vec<u8>>,
|
|
||||||
multiplexer: Weak<TransportMultiplexer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for ChannelInner {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(multiplexer) = self.multiplexer.upgrade() {
|
|
||||||
multiplexer.channels.remove(&self.chan_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ChannelLock<'a, R, W> {
|
|
||||||
// TODO we could use an OwnedMutexGuard if needed
|
|
||||||
guard: MutexGuard<'a, ChannelInner>,
|
|
||||||
marker: PhantomData<(R, W)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, R: DeserializeOwned, W: Serialize> ChannelLock<'a, R, W> {
|
|
||||||
pub async fn send(&mut self, message: &W) -> Result<(), CnrError> {
|
|
||||||
let bytes = serde_bare::to_vec(message)?;
|
|
||||||
self.guard
|
|
||||||
.tx
|
|
||||||
.send((self.guard.chan_id, bytes))
|
|
||||||
.await
|
|
||||||
.map_err(|_| CnrError::Closed)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> Result<R, CnrError> {
|
|
||||||
let new_weak = self.guard.multiplexer.clone();
|
|
||||||
CURRENT_MULTIPLEXER.with(move |refcell| {
|
|
||||||
*(refcell.borrow_mut()) = new_weak;
|
|
||||||
});
|
|
||||||
let res = match self.guard.rx.recv().await {
|
|
||||||
Some(bytes) => Ok(serde_bare::from_slice(&bytes)?),
|
|
||||||
None => Err(CnrError::Closed),
|
|
||||||
};
|
|
||||||
CURRENT_MULTIPLEXER.with(|refcell| {
|
|
||||||
*(refcell.borrow_mut()) = Weak::new();
|
|
||||||
});
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Channel<R, W> {
|
|
||||||
pub id: u16,
|
|
||||||
inner: Arc<Mutex<ChannelInner>>,
|
|
||||||
marker: PhantomData<(R, W)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R, W> Channel<R, W> {
|
|
||||||
/// Channel handles only get sent in one direction.
|
|
||||||
pub fn handle(&self) -> ChannelHandle<R, W, true> {
|
|
||||||
ChannelHandle {
|
|
||||||
marker: Default::default(),
|
|
||||||
chan_id: self.id,
|
|
||||||
multiplexer: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: Send + DeserializeOwned, W: Serialize> Channel<R, W> {
|
|
||||||
pub async fn lock(&self) -> ChannelLock<'_, R, W> {
|
|
||||||
ChannelLock {
|
|
||||||
guard: self.inner.lock().await,
|
|
||||||
marker: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send(&mut self, message: &W) -> Result<(), CnrError> {
|
|
||||||
self.lock().await.send(message).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> Result<R, CnrError> {
|
|
||||||
self.lock().await.recv().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use crate::multiplexer::{Channel, ChannelHandle, TransportMultiplexer};
|
|
||||||
|
|
||||||
/// Tests that data reaches the other side!
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_data_reaches_the_other_side() {
|
|
||||||
let (commander, responder) = tokio::io::duplex(64);
|
|
||||||
let (commander_r, commander_w) = tokio::io::split(commander);
|
|
||||||
let (responder_r, responder_w) = tokio::io::split(responder);
|
|
||||||
let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap();
|
|
||||||
let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap();
|
|
||||||
|
|
||||||
let mut c_chan0: Channel<u64, u8> = commander.open_channel_with_id(0).unwrap();
|
|
||||||
c_chan0.send(&32).await.unwrap();
|
|
||||||
|
|
||||||
let mut r_chan0: Channel<u8, u64> = responder.open_channel_with_id(0).unwrap();
|
|
||||||
assert_eq!(r_chan0.recv().await.unwrap(), 32);
|
|
||||||
r_chan0.send(&2048).await.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(c_chan0.recv().await.unwrap(), 2048);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tests that you can hand over a channel just by including a ChannelHandle in the message.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_channel_handover() {
|
|
||||||
let (commander, responder) = tokio::io::duplex(64);
|
|
||||||
let (commander_r, commander_w) = tokio::io::split(commander);
|
|
||||||
let (responder_r, responder_w) = tokio::io::split(responder);
|
|
||||||
let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap();
|
|
||||||
let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap();
|
|
||||||
|
|
||||||
let mut c_chan0: Channel<(), ChannelHandle<u8, u64, true>> =
|
|
||||||
commander.open_channel_with_id(0).unwrap();
|
|
||||||
let mut c_chan2: Channel<u8, u64> = commander.open_channel_with_id(2).unwrap();
|
|
||||||
c_chan0.send(&c_chan2.handle()).await.unwrap();
|
|
||||||
c_chan2.send(&42).await.unwrap();
|
|
||||||
|
|
||||||
let mut r_chan0: Channel<ChannelHandle<u8, u64, false>, ()> =
|
|
||||||
responder.open_channel_with_id(0).unwrap();
|
|
||||||
let mut r_chan2 = r_chan0.recv().await.unwrap().into_channel().unwrap();
|
|
||||||
assert_eq!(r_chan2.recv().await.unwrap(), 42);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue