Merge pull request #122 from hecrj/feature/event-subscriptions

Event subscriptions
This commit is contained in:
Héctor Ramón 2019-12-16 21:38:56 +01:00 committed by GitHub
commit 0f2e20f5e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 726 additions and 39 deletions

View File

@ -37,14 +37,16 @@ iced_wgpu = { version = "0.1.0", path = "wgpu" }
iced_web = { version = "0.1.0", path = "web" }
[dev-dependencies]
iced_native = { version = "0.1", path = "./native" }
iced_wgpu = { version = "0.1", path = "./wgpu" }
env_logger = "0.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
directories = "2.0"
reqwest = "0.9"
futures = "0.3"
async-std = { version = "1.3", features = ["unstable"] }
surf = { version = "1.0", git = "https://github.com/http-rs/surf.git", rev = "2ff0f95513e82bdb5ccc56767f9dd0985f2eb8fe" }
rand = "0.7"
iced_native = { version = "0.1", path = "./native" }
iced_wgpu = { version = "0.1", path = "./wgpu" }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = "0.2.51"

View File

@ -10,6 +10,8 @@ repository = "https://github.com/hecrj/iced"
[features]
# Exposes a future-based `Command` type
command = ["futures"]
# Exposes a future-based `Subscription` type
subscription = ["futures"]
[dependencies]
futures = { version = "0.3", optional = true }

View File

@ -34,8 +34,8 @@ impl<T> Command<T> {
}
}
/// Creates a [`Command`] that performs the actions of all the givens
/// futures.
/// Creates a [`Command`] that performs the actions of all the given
/// commands.
///
/// Once this command is run, all the futures will be exectued at once.
///

View File

@ -38,3 +38,9 @@ mod command;
#[cfg(feature = "command")]
pub use command::Command;
#[cfg(feature = "subscription")]
pub mod subscription;
#[cfg(feature = "subscription")]
pub use subscription::Subscription;

182
core/src/subscription.rs Normal file
View File

@ -0,0 +1,182 @@
//! Listen to external events in your application.
/// A request to listen to external events.
///
/// Besides performing async actions on demand with [`Command`], most
/// applications also need to listen to external events passively.
///
/// A [`Subscription`] is normally provided to some runtime, like a [`Command`],
/// and it will generate events as long as the user keeps requesting it.
///
/// For instance, you can use a [`Subscription`] to listen to a WebSocket
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// This type is normally aliased by runtimes with a specific `Input` and/or
/// `Hasher`.
///
/// [`Command`]: ../struct.Command.html
/// [`Subscription`]: struct.Subscription.html
pub struct Subscription<Hasher, Input, Output> {
recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>,
}
impl<H, I, O> Subscription<H, I, O>
where
H: std::hash::Hasher,
{
/// Returns an empty [`Subscription`] that will not produce any output.
///
/// [`Subscription`]: struct.Subscription.html
pub fn none() -> Self {
Self {
recipes: Vec::new(),
}
}
/// Creates a [`Subscription`] from a [`Recipe`] describing it.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
pub fn from_recipe(
recipe: impl Recipe<H, I, Output = O> + 'static,
) -> Self {
Self {
recipes: vec![Box::new(recipe)],
}
}
/// Batches all the provided subscriptions and returns the resulting
/// [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
pub fn batch(
subscriptions: impl Iterator<Item = Subscription<H, I, O>>,
) -> Self {
Self {
recipes: subscriptions
.flat_map(|subscription| subscription.recipes)
.collect(),
}
}
/// Returns the different recipes of the [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> {
self.recipes
}
/// Transforms the [`Subscription`] output with the given function.
///
/// [`Subscription`]: struct.Subscription.html
pub fn map<A>(
mut self,
f: impl Fn(O) -> A + Send + Sync + 'static,
) -> Subscription<H, I, A>
where
H: 'static,
I: 'static,
O: 'static,
A: 'static,
{
let function = std::sync::Arc::new(f);
Subscription {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
Box::new(Map::new(recipe, function.clone()))
as Box<dyn Recipe<H, I, Output = A>>
})
.collect(),
}
}
}
impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription").finish()
}
}
/// The description of a [`Subscription`].
///
/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
pub trait Recipe<Hasher: std::hash::Hasher, Input> {
/// The events that will be produced by a [`Subscription`] with this
/// [`Recipe`].
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
type Output;
/// Hashes the [`Recipe`].
///
/// This is used by runtimes to uniquely identify a [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
fn hash(&self, state: &mut Hasher);
/// Executes the [`Recipe`] and produces the stream of events of its
/// [`Subscription`].
///
/// It receives some generic `Input`, which is normally defined by runtimes.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
fn stream(
self: Box<Self>,
input: Input,
) -> futures::stream::BoxStream<'static, Self::Output>;
}
struct Map<Hasher, Input, A, B> {
recipe: Box<dyn Recipe<Hasher, Input, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
}
impl<H, I, A, B> Map<H, I, A, B> {
fn new(
recipe: Box<dyn Recipe<H, I, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
) -> Self {
Map { recipe, mapper }
}
}
impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B>
where
A: 'static,
B: 'static,
H: std::hash::Hasher,
{
type Output = B;
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<B>().hash(state);
self.recipe.hash(state);
}
fn stream(
self: Box<Self>,
input: I,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::StreamExt;
let mapper = self.mapper;
self.recipe
.stream(input)
.map(move |element| mapper(element))
.boxed()
}
}

91
examples/events.rs Normal file
View File

@ -0,0 +1,91 @@
use iced::{
Align, Application, Checkbox, Column, Command, Container, Element, Length,
Settings, Subscription, Text,
};
pub fn main() {
Events::run(Settings::default())
}
#[derive(Debug, Default)]
struct Events {
last: Vec<iced_native::Event>,
enabled: bool,
}
#[derive(Debug, Clone)]
enum Message {
EventOccurred(iced_native::Event),
Toggled(bool),
}
impl Application for Events {
type Message = Message;
fn new() -> (Events, Command<Message>) {
(Events::default(), Command::none())
}
fn title(&self) -> String {
String::from("Events - Iced")
}
fn update(&mut self, message: Message) -> Command<Message> {
match message {
Message::EventOccurred(event) => {
self.last.push(event);
if self.last.len() > 5 {
let _ = self.last.remove(0);
}
}
Message::Toggled(enabled) => {
self.enabled = enabled;
}
};
Command::none()
}
fn subscription(&self) -> Subscription<Message> {
if self.enabled {
iced_native::subscription::events().map(Message::EventOccurred)
} else {
Subscription::none()
}
}
fn view(&mut self) -> Element<Message> {
let events = self.last.iter().fold(
Column::new().width(Length::Shrink).spacing(10),
|column, event| {
column.push(
Text::new(format!("{:?}", event))
.size(40)
.width(Length::Shrink),
)
},
);
let toggle = Checkbox::new(
self.enabled,
"Listen to runtime events",
Message::Toggled,
)
.width(Length::Shrink);
let content = Column::new()
.width(Length::Shrink)
.align_items(Align::Center)
.spacing(20)
.push(events)
.push(toggle);
Container::new(content)
.width(Length::Fill)
.height(Length::Fill)
.center_x()
.center_y()
.into()
}
}

View File

@ -150,7 +150,6 @@ impl Pokemon {
async fn search() -> Result<Pokemon, Error> {
use rand::Rng;
use serde::Deserialize;
use std::io::Read;
#[derive(Debug, Deserialize)]
struct Entry {
@ -179,7 +178,11 @@ impl Pokemon {
let url = format!("https://pokeapi.co/api/v2/pokemon-species/{}", id);
let sprite = format!("https://raw.githubusercontent.com/PokeAPI/sprites/master/sprites/pokemon/{}.png", id);
let entry: Entry = reqwest::get(&url)?.json()?;
let (entry, sprite): (Entry, _) = futures::future::try_join(
surf::get(&url).recv_json(),
surf::get(&sprite).recv_bytes(),
)
.await?;
let description = entry
.flavor_text_entries
@ -188,13 +191,6 @@ impl Pokemon {
.next()
.ok_or(Error::LanguageError)?;
let mut sprite = reqwest::get(&sprite)?;
let mut bytes = Vec::new();
sprite
.read_to_end(&mut bytes)
.map_err(|_| Error::ImageError)?;
Ok(Pokemon {
number: id,
name: entry.name.to_uppercase(),
@ -203,7 +199,7 @@ impl Pokemon {
.chars()
.map(|c| if c.is_control() { ' ' } else { c })
.collect(),
image: image::Handle::from_memory(bytes),
image: image::Handle::from_memory(sprite),
})
}
}
@ -211,13 +207,12 @@ impl Pokemon {
#[derive(Debug, Clone)]
enum Error {
APIError,
ImageError,
LanguageError,
}
impl From<reqwest::Error> for Error {
fn from(error: reqwest::Error) -> Error {
dbg!(&error);
impl From<surf::Exception> for Error {
fn from(exception: surf::Exception) -> Error {
dbg!(&exception);
Error::APIError
}

182
examples/stopwatch.rs Normal file
View File

@ -0,0 +1,182 @@
use iced::{
button, Align, Application, Background, Button, Color, Column, Command,
Container, Element, HorizontalAlignment, Length, Row, Settings,
Subscription, Text,
};
use std::time::{Duration, Instant};
pub fn main() {
Stopwatch::run(Settings::default())
}
struct Stopwatch {
duration: Duration,
state: State,
toggle: button::State,
reset: button::State,
}
enum State {
Idle,
Ticking { last_tick: Instant },
}
#[derive(Debug, Clone)]
enum Message {
Toggle,
Reset,
Tick(Instant),
}
impl Application for Stopwatch {
type Message = Message;
fn new() -> (Stopwatch, Command<Message>) {
(
Stopwatch {
duration: Duration::default(),
state: State::Idle,
toggle: button::State::new(),
reset: button::State::new(),
},
Command::none(),
)
}
fn title(&self) -> String {
String::from("Stopwatch - Iced")
}
fn update(&mut self, message: Message) -> Command<Message> {
match message {
Message::Toggle => match self.state {
State::Idle => {
self.state = State::Ticking {
last_tick: Instant::now(),
};
}
State::Ticking { .. } => {
self.state = State::Idle;
}
},
Message::Tick(now) => match &mut self.state {
State::Ticking { last_tick } => {
self.duration += now - *last_tick;
*last_tick = now;
}
_ => {}
},
Message::Reset => {
self.duration = Duration::default();
}
}
Command::none()
}
fn subscription(&self) -> Subscription<Message> {
match self.state {
State::Idle => Subscription::none(),
State::Ticking { .. } => {
time::every(Duration::from_millis(10)).map(Message::Tick)
}
}
}
fn view(&mut self) -> Element<Message> {
const MINUTE: u64 = 60;
const HOUR: u64 = 60 * MINUTE;
let seconds = self.duration.as_secs();
let duration = Text::new(format!(
"{:0>2}:{:0>2}:{:0>2}.{:0>2}",
seconds / HOUR,
(seconds % HOUR) / MINUTE,
seconds % MINUTE,
self.duration.subsec_millis() / 10,
))
.width(Length::Shrink)
.size(40);
let button = |state, label, color: [f32; 3]| {
Button::new(
state,
Text::new(label)
.color(Color::WHITE)
.horizontal_alignment(HorizontalAlignment::Center),
)
.min_width(80)
.background(Background::Color(color.into()))
.border_radius(10)
.padding(10)
};
let toggle_button = {
let (label, color) = match self.state {
State::Idle => ("Start", [0.11, 0.42, 0.87]),
State::Ticking { .. } => ("Stop", [0.9, 0.4, 0.4]),
};
button(&mut self.toggle, label, color).on_press(Message::Toggle)
};
let reset_button = button(&mut self.reset, "Reset", [0.7, 0.7, 0.7])
.on_press(Message::Reset);
let controls = Row::new()
.width(Length::Shrink)
.spacing(20)
.push(toggle_button)
.push(reset_button);
let content = Column::new()
.width(Length::Shrink)
.align_items(Align::Center)
.spacing(20)
.push(duration)
.push(controls);
Container::new(content)
.width(Length::Fill)
.height(Length::Fill)
.center_x()
.center_y()
.into()
}
}
mod time {
pub fn every(
duration: std::time::Duration,
) -> iced::Subscription<std::time::Instant> {
iced::Subscription::from_recipe(Every(duration))
}
struct Every(std::time::Duration);
impl<H, I> iced_native::subscription::Recipe<H, I> for Every
where
H: std::hash::Hasher,
{
type Output = std::time::Instant;
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<Self>().hash(state);
self.0.hash(state);
}
fn stream(
self: Box<Self>,
_input: I,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::stream::StreamExt;
async_std::stream::interval(self.0)
.map(|_| std::time::Instant::now())
.boxed()
}
}
}

View File

@ -517,21 +517,23 @@ impl SavedState {
}
async fn load() -> Result<SavedState, LoadError> {
use std::io::Read;
use async_std::prelude::*;
let mut contents = String::new();
let mut file = std::fs::File::open(Self::path())
let mut file = async_std::fs::File::open(Self::path())
.await
.map_err(|_| LoadError::FileError)?;
file.read_to_string(&mut contents)
.await
.map_err(|_| LoadError::FileError)?;
serde_json::from_str(&contents).map_err(|_| LoadError::FormatError)
}
async fn save(self) -> Result<(), SaveError> {
use std::io::Write;
use async_std::prelude::*;
let json = serde_json::to_string_pretty(&self)
.map_err(|_| SaveError::FormatError)?;
@ -539,20 +541,23 @@ impl SavedState {
let path = Self::path();
if let Some(dir) = path.parent() {
std::fs::create_dir_all(dir)
async_std::fs::create_dir_all(dir)
.await
.map_err(|_| SaveError::DirectoryError)?;
}
let mut file =
std::fs::File::create(path).map_err(|_| SaveError::FileError)?;
{
let mut file = async_std::fs::File::create(path)
.await
.map_err(|_| SaveError::FileError)?;
file.write_all(json.as_bytes())
.map_err(|_| SaveError::WriteError)?;
file.write_all(json.as_bytes())
.await
.map_err(|_| SaveError::WriteError)?;
}
// This is a simple way to save at most once every couple seconds
// We will be able to get rid of it once we implement event
// subscriptions
std::thread::sleep(std::time::Duration::from_secs(2));
async_std::task::sleep(std::time::Duration::from_secs(2)).await;
Ok(())
}

View File

@ -8,7 +8,8 @@ license = "MIT"
repository = "https://github.com/hecrj/iced"
[dependencies]
iced_core = { version = "0.1.0", path = "../core", features = ["command"] }
iced_core = { version = "0.1.0", path = "../core", features = ["command", "subscription"] }
twox-hash = "1.5"
raw-window-handle = "0.3"
unicode-segmentation = "1.6"
futures = "0.3"

View File

@ -42,6 +42,7 @@
pub mod input;
pub mod layout;
pub mod renderer;
pub mod subscription;
pub mod widget;
mod element;
@ -63,5 +64,6 @@ pub use layout::Layout;
pub use mouse_cursor::MouseCursor;
pub use renderer::Renderer;
pub use size::Size;
pub use subscription::Subscription;
pub use user_interface::{Cache, UserInterface};
pub use widget::*;

View File

@ -0,0 +1,42 @@
//! Listen to external events in your application.
use crate::{Event, Hasher};
use futures::stream::BoxStream;
/// A request to listen to external events.
///
/// Besides performing async actions on demand with [`Command`], most
/// applications also need to listen to external events passively.
///
/// A [`Subscription`] is normally provided to some runtime, like a [`Command`],
/// and it will generate events as long as the user keeps requesting it.
///
/// For instance, you can use a [`Subscription`] to listen to a WebSocket
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// [`Command`]: ../struct.Command.html
/// [`Subscription`]: struct.Subscription.html
pub type Subscription<T> = iced_core::Subscription<Hasher, EventStream, T>;
/// A stream of runtime events.
///
/// It is the input of a [`Subscription`] in the native runtime.
///
/// [`Subscription`]: type.Subscription.html
pub type EventStream = BoxStream<'static, Event>;
pub use iced_core::subscription::Recipe;
mod events;
use events::Events;
/// Returns a [`Subscription`] to all the runtime events.
///
/// This subscription will notify your application of any [`Event`] handled by
/// the runtime.
///
/// [`Subscription`]: type.Subscription.html
/// [`Event`]: ../enum.Event.html
pub fn events() -> Subscription<Event> {
Subscription::from_recipe(Events)
}

View File

@ -0,0 +1,23 @@
use crate::{
subscription::{EventStream, Recipe},
Event, Hasher,
};
pub struct Events;
impl Recipe<Hasher, EventStream> for Events {
type Output = Event;
fn hash(&self, state: &mut Hasher) {
use std::hash::Hash;
std::any::TypeId::of::<Self>().hash(state);
}
fn stream(
self: Box<Self>,
event_stream: EventStream,
) -> futures::stream::BoxStream<'static, Self::Output> {
event_stream
}
}

View File

@ -31,6 +31,7 @@ pub struct Checkbox<Message> {
on_toggle: Box<dyn Fn(bool) -> Message>,
label: String,
label_color: Option<Color>,
width: Length,
}
impl<Message> Checkbox<Message> {
@ -53,6 +54,7 @@ impl<Message> Checkbox<Message> {
on_toggle: Box::new(f),
label: String::from(label),
label_color: None,
width: Length::Fill,
}
}
@ -63,6 +65,14 @@ impl<Message> Checkbox<Message> {
self.label_color = Some(color.into());
self
}
/// Sets the width of the [`Checkbox`].
///
/// [`Checkbox`]: struct.Checkbox.html
pub fn width(mut self, width: Length) -> Self {
self.width = width;
self
}
}
impl<Message, Renderer> Widget<Message, Renderer> for Checkbox<Message>
@ -70,7 +80,7 @@ where
Renderer: self::Renderer + text::Renderer + row::Renderer,
{
fn width(&self) -> Length {
Length::Fill
self.width
}
fn height(&self) -> Length {
@ -85,6 +95,7 @@ where
let size = self::Renderer::default_size(renderer);
Row::<(), Renderer>::new()
.width(self.width)
.spacing(15)
.align_items(Align::Center)
.push(
@ -92,7 +103,7 @@ where
.width(Length::Units(size as u16))
.height(Length::Units(size as u16)),
)
.push(Text::new(&self.label))
.push(Text::new(&self.label).width(self.width))
.layout(renderer, limits)
}

View File

@ -1,4 +1,4 @@
use crate::{Command, Element, Settings};
use crate::{Command, Element, Settings, Subscription};
/// An interactive cross-platform application.
///
@ -117,6 +117,20 @@ pub trait Application: Sized {
/// [`Command`]: struct.Command.html
fn update(&mut self, message: Self::Message) -> Command<Self::Message>;
/// Returns the event [`Subscription`] for the current state of the
/// application.
///
/// A [`Subscription`] will be kept alive as long as you keep returning it,
/// and the __messages__ produced will be handled by
/// [`update`](#tymethod.update).
///
/// By default, this method returns an empty [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
fn subscription(&self) -> Subscription<Self::Message> {
Subscription::none()
}
/// Returns the widgets to display in the [`Application`].
///
/// These widgets can produce __messages__ based on user interaction.
@ -168,6 +182,10 @@ where
self.0.update(message)
}
fn subscription(&self) -> Subscription<Self::Message> {
self.0.subscription()
}
fn view(&mut self) -> Element<'_, Self::Message> {
self.0.view()
}

View File

@ -1,6 +1,6 @@
pub use iced_winit::{
Align, Background, Color, Command, Font, HorizontalAlignment, Length,
VerticalAlignment,
Subscription, VerticalAlignment,
};
pub mod widget {

View File

@ -1,4 +1,4 @@
use crate::{Application, Command, Element, Settings};
use crate::{Application, Command, Element, Settings, Subscription};
/// A sandboxed [`Application`].
///
@ -149,6 +149,10 @@ where
Command::none()
}
fn subscription(&self) -> Subscription<T::Message> {
Subscription::none()
}
fn view(&mut self) -> Element<'_, T::Message> {
T::view(self)
}

View File

@ -2,8 +2,8 @@ use crate::{
conversion,
input::{keyboard, mouse},
renderer::{Target, Windowed},
Cache, Command, Container, Debug, Element, Event, Length, MouseCursor,
Settings, UserInterface,
subscription, Cache, Command, Container, Debug, Element, Event, Length,
MouseCursor, Settings, Subscription, UserInterface,
};
/// An interactive, native cross-platform application.
@ -57,6 +57,15 @@ pub trait Application: Sized {
/// [`Command`]: struct.Command.html
fn update(&mut self, message: Self::Message) -> Command<Self::Message>;
/// Returns the event `Subscription` for the current state of the
/// application.
///
/// The messages produced by the `Subscription` will be handled by
/// [`update`](#tymethod.update).
///
/// A `Subscription` will be kept alive as long as you keep returning it!
fn subscription(&self) -> Subscription<Self::Message>;
/// Returns the widgets to display in the [`Application`].
///
/// These widgets can produce __messages__ based on user interaction.
@ -89,11 +98,15 @@ pub trait Application: Sized {
let proxy = event_loop.create_proxy();
let mut thread_pool =
futures::executor::ThreadPool::new().expect("Create thread pool");
let mut subscription_pool = subscription::Pool::new();
let mut external_messages = Vec::new();
let (mut application, init_command) = Self::new();
spawn(init_command, &mut thread_pool, &proxy);
let subscription = application.subscription();
subscription_pool.update(subscription, &mut thread_pool, &proxy);
let mut title = application.title();
let window = {
@ -176,6 +189,10 @@ pub trait Application: Sized {
debug.layout_finished();
debug.event_processing_started();
events.iter().for_each(|event| {
subscription_pool.broadcast_event(*event)
});
let mut messages =
user_interface.update(&renderer, events.drain(..));
messages.extend(external_messages.drain(..));
@ -199,11 +216,17 @@ pub trait Application: Sized {
debug.update_started();
let command = application.update(message);
spawn(command, &mut thread_pool, &proxy);
debug.update_finished();
}
let subscription = application.subscription();
subscription_pool.update(
subscription,
&mut thread_pool,
&proxy,
);
// Update window title
let new_title = application.title();

View File

@ -29,6 +29,7 @@ pub mod conversion;
pub mod settings;
mod application;
mod subscription;
pub use application::Application;
pub use settings::Settings;

97
winit/src/subscription.rs Normal file
View File

@ -0,0 +1,97 @@
use iced_native::{Event, Hasher, Subscription};
use std::collections::HashMap;
pub struct Pool {
alive: HashMap<u64, Handle>,
}
pub struct Handle {
_cancel: futures::channel::oneshot::Sender<()>,
listener: Option<futures::channel::mpsc::Sender<Event>>,
}
impl Pool {
pub fn new() -> Self {
Self {
alive: HashMap::new(),
}
}
pub fn update<Message: Send>(
&mut self,
subscription: Subscription<Message>,
thread_pool: &mut futures::executor::ThreadPool,
proxy: &winit::event_loop::EventLoopProxy<Message>,
) {
use futures::{future::FutureExt, stream::StreamExt};
let recipes = subscription.recipes();
let mut alive = std::collections::HashSet::new();
for recipe in recipes {
let id = {
use std::hash::Hasher as _;
let mut hasher = Hasher::default();
recipe.hash(&mut hasher);
hasher.finish()
};
let _ = alive.insert(id);
if !self.alive.contains_key(&id) {
let (cancel, cancelled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async
let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100);
let stream = recipe.stream(event_receiver.boxed());
let proxy = proxy.clone();
let future = futures::future::select(
cancelled,
stream.for_each(move |message| {
proxy
.send_event(message)
.expect("Send subscription result to event loop");
futures::future::ready(())
}),
)
.map(|_| ());
thread_pool.spawn_ok(future);
let _ = self.alive.insert(
id,
Handle {
_cancel: cancel,
listener: if event_sender.is_closed() {
None
} else {
Some(event_sender)
},
},
);
}
}
self.alive.retain(|id, _| alive.contains(&id));
}
pub fn broadcast_event(&mut self, event: Event) {
self.alive
.values_mut()
.filter_map(|connection| connection.listener.as_mut())
.for_each(|listener| {
if let Err(error) = listener.try_send(event) {
log::error!(
"Error sending event to subscription: {:?}",
error
);
}
});
}
}