1use futures_util::StreamExt;
2use std::{collections::HashMap, pin::Pin, sync::Arc};
3use tokio::sync::RwLock;
4
5type Listener<Event> = dyn Fn(
6 crate::State,
7 Arc<Event>,
8 ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
9 + Send
10 + Sync;
11
12#[derive(Clone)]
13pub struct EventHandlerHandle {
14 listeners_ref: Arc<dyn DisconnectEventHandler + Send + Sync>,
15 id: uuid::Uuid,
16}
17
18impl EventHandlerHandle {
19 #[inline]
20 pub async fn disconnect(self) {
21 self.listeners_ref.disconnect(self.id).await;
22 }
23
24 #[inline]
27 pub fn blocking_disconnect(self) {
28 self.listeners_ref.blocking_disconnect(self.id);
29 }
30}
31
32#[async_trait::async_trait]
33pub(crate) trait DisconnectEventHandler {
34 async fn disconnect(&self, id: uuid::Uuid);
35 fn blocking_disconnect(&self, id: uuid::Uuid);
36}
37
38#[async_trait::async_trait]
39impl<Event> DisconnectEventHandler for RwLock<HashMap<uuid::Uuid, Box<Listener<Event>>>> {
40 #[inline]
41 async fn disconnect(&self, id: uuid::Uuid) {
42 self.write().await.remove(&id);
43 }
44
45 #[inline]
46 fn blocking_disconnect(&self, id: uuid::Uuid) {
47 self.blocking_write().remove(&id);
48 }
49}
50
51pub struct EventEmitter<Event: 'static + Send + Sync> {
52 listeners: Arc<RwLock<HashMap<uuid::Uuid, Box<Listener<Event>>>>>,
53 event_channel: tokio::sync::mpsc::Sender<(crate::State, Event)>,
54 task: tokio::task::JoinHandle<()>,
55}
56
57impl<Event: 'static + Send + Sync> Default for EventEmitter<Event> {
58 fn default() -> Self {
59 let listeners = Arc::new(RwLock::new(HashMap::new()));
60 let (event_channel_sender, mut event_channel_receiver) = tokio::sync::mpsc::channel(64);
61
62 Self {
63 listeners: listeners.clone(),
64 event_channel: event_channel_sender,
65 task: tokio::spawn(async move {
66 let semaphore = Arc::new(tokio::sync::Semaphore::new(8));
67
68 while let Some((state, event)) = event_channel_receiver.recv().await {
69 tracing::debug!("emitting event {:?}", std::any::type_name::<Event>());
70
71 let listeners = listeners.clone();
72 let permit = match semaphore.clone().acquire_owned().await {
73 Ok(permit) => permit,
74 Err(_) => {
75 tracing::error!("semaphore closed, shutting down event emitter");
76 break;
77 }
78 };
79
80 tokio::spawn(async move {
81 let event = Arc::new(event);
82 let listeners = listeners
83 .read()
84 .await
85 .values()
86 .map(|listener| listener(state.clone(), event.clone()))
87 .collect::<Vec<_>>();
88
89 let mut result_stream =
90 futures_util::stream::iter(listeners).buffer_unordered(8);
91
92 while let Some(result) = result_stream.next().await {
93 if let Err(err) = result {
94 tracing::error!(
95 "event listener error for {:?}: {:?}",
96 std::any::type_name::<Event>(),
97 err
98 );
99 }
100 }
101
102 drop(permit);
103 });
104 }
105 }),
106 }
107 }
108}
109
110impl<Event: 'static + Send + Sync> EventEmitter<Event> {
111 pub async fn register_event_handler<
112 F: Fn(crate::State, Arc<Event>) -> Fut + Send + Sync + 'static,
113 Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
114 >(
115 &self,
116 listener: F,
117 ) -> EventHandlerHandle {
118 let id = uuid::Uuid::new_v4();
119 let listener_box = Box::new(move |state: crate::State, event: Arc<Event>| {
120 Box::pin(listener(state, event))
121 as Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
122 }) as Box<Listener<Event>>;
123
124 self.listeners.write().await.insert(id, listener_box);
125
126 EventHandlerHandle {
127 listeners_ref: self.listeners.clone(),
128 id,
129 }
130 }
131
132 pub fn blocking_register_event_handler<
135 F: Fn(crate::State, Arc<Event>) -> Fut + Send + Sync + 'static,
136 Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
137 >(
138 &self,
139 listener: F,
140 ) -> EventHandlerHandle {
141 let id = uuid::Uuid::new_v4();
142 let listener_box = Box::new(move |state: crate::State, event: Arc<Event>| {
143 Box::pin(listener(state, event))
144 as Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
145 }) as Box<Listener<Event>>;
146
147 self.listeners.blocking_write().insert(id, listener_box);
148
149 EventHandlerHandle {
150 listeners_ref: self.listeners.clone(),
151 id,
152 }
153 }
154
155 #[inline]
156 pub fn emit(&self, state: crate::State, event: Event) {
157 let _ = self.event_channel.try_send((state, event));
158 }
159}
160
161impl<Event: 'static + Send + Sync> Drop for EventEmitter<Event> {
162 #[inline]
163 fn drop(&mut self) {
164 self.task.abort();
165 }
166}