From 00b06963d7b2f6e8b74308f6962c71766e265553 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 28 May 2022 21:21:40 +0100 Subject: [PATCH] Add tests and useful functionality --- bare_cnr/Cargo.toml | 4 +++ bare_cnr/src/lib.rs | 5 +++ bare_cnr/src/multiplexer.rs | 65 +++++++++++++++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/bare_cnr/Cargo.toml b/bare_cnr/Cargo.toml index 063b170..b1d23a1 100644 --- a/bare_cnr/Cargo.toml +++ b/bare_cnr/Cargo.toml @@ -11,6 +11,10 @@ serde = "1.0.137" serde_bare = "0.5.0" dashmap = "5.3.4" log = "0.4.17" +rand = "0.8.5" + +[dev-dependencies] +tokio-test = "0.4.2" [dependencies.tokio] # TODO optional = true diff --git a/bare_cnr/src/lib.rs b/bare_cnr/src/lib.rs index 13debef..be55a1a 100644 --- a/bare_cnr/src/lib.rs +++ b/bare_cnr/src/lib.rs @@ -17,3 +17,8 @@ pub enum CnrError { #[error("Channel closed.")] Closed, } + +pub use multiplexer::Channel; +pub use multiplexer::ChannelHandle; +pub use multiplexer::ChannelLock; +pub use multiplexer::TransportMultiplexer; diff --git a/bare_cnr/src/multiplexer.rs b/bare_cnr/src/multiplexer.rs index 408f809..ccac1dc 100644 --- a/bare_cnr/src/multiplexer.rs +++ b/bare_cnr/src/multiplexer.rs @@ -1,11 +1,9 @@ use crate::CnrError; -use crate::CnrError::Closed; use dashmap::mapref::entry::Entry; use dashmap::DashMap; use log::{error, warn}; use serde::de::DeserializeOwned; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::borrow::{Borrow, BorrowMut}; use std::cell::RefCell; use std::marker::PhantomData; use std::sync::{Arc, Weak}; @@ -109,6 +107,16 @@ impl TransportMultiplexer { } } + pub fn open_unused_channel(self: &Arc) -> Option> { + loop { + let id: u16 = rand::random(); + let id = if self.initiator { id & !1 } else { id | 1 }; + if let Some(chan) = self.open_channel_with_id(id) { + return Some(chan); + } + } + } + async fn handle_tx( mut tx: W, mut txqueue_rx: Receiver<(u16, Vec)>, @@ -203,6 +211,14 @@ struct ChannelInner { multiplexer: Weak, } +impl Drop for ChannelInner { + fn drop(&mut self) { + if let Some(multiplexer) = self.multiplexer.upgrade() { + multiplexer.channels.remove(&self.chan_id); + } + } +} + pub struct ChannelLock<'a, R, W> { // TODO we could use an OwnedMutexGuard if needed guard: MutexGuard<'a, ChannelInner>, @@ -269,3 +285,48 @@ impl Channel { self.lock().await.recv().await } } + +#[cfg(test)] +mod test { + use crate::multiplexer::{Channel, ChannelHandle, TransportMultiplexer}; + + /// Tests that data reaches the other side! + #[tokio::test] + async fn test_data_reaches_the_other_side() { + let (commander, responder) = tokio::io::duplex(64); + let (commander_r, commander_w) = tokio::io::split(commander); + let (responder_r, responder_w) = tokio::io::split(responder); + let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap(); + let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap(); + + let mut c_chan0: Channel = commander.open_channel_with_id(0).unwrap(); + c_chan0.send(&32).await.unwrap(); + + let mut r_chan0: Channel = responder.open_channel_with_id(0).unwrap(); + assert_eq!(r_chan0.recv().await.unwrap(), 32); + r_chan0.send(&2048).await.unwrap(); + + assert_eq!(c_chan0.recv().await.unwrap(), 2048); + } + + /// Tests that you can hand over a channel just by including a ChannelHandle in the message. + #[tokio::test] + async fn test_channel_handover() { + let (commander, responder) = tokio::io::duplex(64); + let (commander_r, commander_w) = tokio::io::split(commander); + let (responder_r, responder_w) = tokio::io::split(responder); + let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap(); + let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap(); + + let mut c_chan0: Channel<(), ChannelHandle> = + commander.open_channel_with_id(0).unwrap(); + let mut c_chan2: Channel = commander.open_channel_with_id(2).unwrap(); + c_chan0.send(&c_chan2.handle()).await.unwrap(); + c_chan2.send(&42).await.unwrap(); + + let mut r_chan0: Channel, ()> = + responder.open_channel_with_id(0).unwrap(); + let mut r_chan2 = r_chan0.recv().await.unwrap().into_channel().unwrap(); + assert_eq!(r_chan2.recv().await.unwrap(), 42); + } +}