Accept a function pointer in Subscription::map

Instead of a closure, a function pointer can be hashed and used to
uniquely identify a particular `Subscription`.

This should fix a bug where two different instances of `Subscription`
producing the same output were not treated differently by the runtime,
causing one of them to be ignored.
This commit is contained in:
Héctor Ramón Jiménez 2020-10-29 10:26:33 +01:00
parent b40775fb74
commit dd1b1cac0a

View File

@ -76,10 +76,6 @@ where
/// ///
/// The value will be part of the identity of a [`Subscription`]. /// The value will be part of the identity of a [`Subscription`].
/// ///
/// This is necessary if you want to use multiple instances of the same
/// [`Subscription`] to produce different kinds of messages based on some
/// external data.
///
/// [`Subscription`]: struct.Subscription.html /// [`Subscription`]: struct.Subscription.html
pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)> pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)>
where where
@ -103,24 +99,19 @@ where
/// Transforms the [`Subscription`] output with the given function. /// Transforms the [`Subscription`] output with the given function.
/// ///
/// [`Subscription`]: struct.Subscription.html /// [`Subscription`]: struct.Subscription.html
pub fn map<A>( pub fn map<A>(mut self, f: fn(O) -> A) -> Subscription<H, E, A>
mut self,
f: impl Fn(O) -> A + Send + Sync + 'static,
) -> Subscription<H, E, A>
where where
H: 'static, H: 'static,
E: 'static, E: 'static,
O: 'static, O: 'static,
A: 'static, A: 'static,
{ {
let function = std::sync::Arc::new(f);
Subscription { Subscription {
recipes: self recipes: self
.recipes .recipes
.drain(..) .drain(..)
.map(|recipe| { .map(|recipe| {
Box::new(Map::new(recipe, function.clone())) Box::new(Map::new(recipe, f))
as Box<dyn Recipe<H, E, Output = A>> as Box<dyn Recipe<H, E, Output = A>>
}) })
.collect(), .collect(),
@ -186,13 +177,13 @@ pub trait Recipe<Hasher: std::hash::Hasher, Event> {
struct Map<Hasher, Event, A, B> { struct Map<Hasher, Event, A, B> {
recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>, mapper: fn(A) -> B,
} }
impl<H, E, A, B> Map<H, E, A, B> { impl<H, E, A, B> Map<H, E, A, B> {
fn new( fn new(
recipe: Box<dyn Recipe<H, E, Output = A>>, recipe: Box<dyn Recipe<H, E, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>, mapper: fn(A) -> B,
) -> Self { ) -> Self {
Map { recipe, mapper } Map { recipe, mapper }
} }
@ -209,8 +200,8 @@ where
fn hash(&self, state: &mut H) { fn hash(&self, state: &mut H) {
use std::hash::Hash; use std::hash::Hash;
std::any::TypeId::of::<B>().hash(state);
self.recipe.hash(state); self.recipe.hash(state);
self.mapper.hash(state);
} }
fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> {