Add tests and useful functionality

This commit is contained in:
Olivier 'reivilibre' 2022-05-28 21:21:40 +01:00
parent b659a5ddac
commit 00b06963d7
3 changed files with 72 additions and 2 deletions

View File

@ -11,6 +11,10 @@ serde = "1.0.137"
serde_bare = "0.5.0"
dashmap = "5.3.4"
log = "0.4.17"
rand = "0.8.5"
[dev-dependencies]
tokio-test = "0.4.2"
[dependencies.tokio]
# TODO optional = true

View File

@ -17,3 +17,8 @@ pub enum CnrError {
#[error("Channel closed.")]
Closed,
}
pub use multiplexer::Channel;
pub use multiplexer::ChannelHandle;
pub use multiplexer::ChannelLock;
pub use multiplexer::TransportMultiplexer;

View File

@ -1,11 +1,9 @@
use crate::CnrError;
use crate::CnrError::Closed;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use log::{error, warn};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::borrow::{Borrow, BorrowMut};
use std::cell::RefCell;
use std::marker::PhantomData;
use std::sync::{Arc, Weak};
@ -109,6 +107,16 @@ impl TransportMultiplexer {
}
}
pub fn open_unused_channel<R, W>(self: &Arc<Self>) -> Option<Channel<R, W>> {
loop {
let id: u16 = rand::random();
let id = if self.initiator { id & !1 } else { id | 1 };
if let Some(chan) = self.open_channel_with_id(id) {
return Some(chan);
}
}
}
async fn handle_tx<W: AsyncWrite + Unpin>(
mut tx: W,
mut txqueue_rx: Receiver<(u16, Vec<u8>)>,
@ -203,6 +211,14 @@ struct ChannelInner {
multiplexer: Weak<TransportMultiplexer>,
}
impl Drop for ChannelInner {
fn drop(&mut self) {
if let Some(multiplexer) = self.multiplexer.upgrade() {
multiplexer.channels.remove(&self.chan_id);
}
}
}
pub struct ChannelLock<'a, R, W> {
// TODO we could use an OwnedMutexGuard if needed
guard: MutexGuard<'a, ChannelInner>,
@ -269,3 +285,48 @@ impl<R: Send + DeserializeOwned, W: Serialize> Channel<R, W> {
self.lock().await.recv().await
}
}
#[cfg(test)]
mod test {
use crate::multiplexer::{Channel, ChannelHandle, TransportMultiplexer};
/// Tests that data reaches the other side!
#[tokio::test]
async fn test_data_reaches_the_other_side() {
let (commander, responder) = tokio::io::duplex(64);
let (commander_r, commander_w) = tokio::io::split(commander);
let (responder_r, responder_w) = tokio::io::split(responder);
let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap();
let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap();
let mut c_chan0: Channel<u64, u8> = commander.open_channel_with_id(0).unwrap();
c_chan0.send(&32).await.unwrap();
let mut r_chan0: Channel<u8, u64> = responder.open_channel_with_id(0).unwrap();
assert_eq!(r_chan0.recv().await.unwrap(), 32);
r_chan0.send(&2048).await.unwrap();
assert_eq!(c_chan0.recv().await.unwrap(), 2048);
}
/// Tests that you can hand over a channel just by including a ChannelHandle in the message.
#[tokio::test]
async fn test_channel_handover() {
let (commander, responder) = tokio::io::duplex(64);
let (commander_r, commander_w) = tokio::io::split(commander);
let (responder_r, responder_w) = tokio::io::split(responder);
let commander = TransportMultiplexer::new(commander_r, commander_w, true).unwrap();
let responder = TransportMultiplexer::new(responder_r, responder_w, false).unwrap();
let mut c_chan0: Channel<(), ChannelHandle<u8, u64, true>> =
commander.open_channel_with_id(0).unwrap();
let mut c_chan2: Channel<u8, u64> = commander.open_channel_with_id(2).unwrap();
c_chan0.send(&c_chan2.handle()).await.unwrap();
c_chan2.send(&42).await.unwrap();
let mut r_chan0: Channel<ChannelHandle<u8, u64, false>, ()> =
responder.open_channel_with_id(0).unwrap();
let mut r_chan2 = r_chan0.recv().await.unwrap().into_channel().unwrap();
assert_eq!(r_chan2.recv().await.unwrap(), 42);
}
}