shared/
events.rs

1use futures_util::StreamExt;
2use std::{collections::HashMap, pin::Pin, sync::Arc};
3use tokio::sync::RwLock;
4
5type Listener<Event> = dyn Fn(
6        crate::State,
7        Arc<Event>,
8    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
9    + Send
10    + Sync;
11
12#[derive(Clone)]
13pub struct EventHandlerHandle {
14    listeners_ref: Arc<dyn DisconnectEventHandler + Send + Sync>,
15    id: uuid::Uuid,
16}
17
18impl EventHandlerHandle {
19    #[inline]
20    pub async fn disconnect(self) {
21        self.listeners_ref.disconnect(self.id).await;
22    }
23
24    /// # Warning
25    /// This method will block the current thread if the lock is not available
26    #[inline]
27    pub fn blocking_disconnect(self) {
28        self.listeners_ref.blocking_disconnect(self.id);
29    }
30}
31
32#[async_trait::async_trait]
33pub(crate) trait DisconnectEventHandler {
34    async fn disconnect(&self, id: uuid::Uuid);
35    fn blocking_disconnect(&self, id: uuid::Uuid);
36}
37
38#[async_trait::async_trait]
39impl<Event> DisconnectEventHandler for RwLock<HashMap<uuid::Uuid, Box<Listener<Event>>>> {
40    #[inline]
41    async fn disconnect(&self, id: uuid::Uuid) {
42        self.write().await.remove(&id);
43    }
44
45    #[inline]
46    fn blocking_disconnect(&self, id: uuid::Uuid) {
47        self.blocking_write().remove(&id);
48    }
49}
50
51pub struct EventEmitter<Event: 'static + Send + Sync> {
52    listeners: Arc<RwLock<HashMap<uuid::Uuid, Box<Listener<Event>>>>>,
53    event_channel: tokio::sync::mpsc::Sender<(crate::State, Event)>,
54    task: tokio::task::JoinHandle<()>,
55}
56
57impl<Event: 'static + Send + Sync> Default for EventEmitter<Event> {
58    fn default() -> Self {
59        let listeners = Arc::new(RwLock::new(HashMap::new()));
60        let (event_channel_sender, mut event_channel_receiver) = tokio::sync::mpsc::channel(64);
61
62        Self {
63            listeners: listeners.clone(),
64            event_channel: event_channel_sender,
65            task: tokio::spawn(async move {
66                let semaphore = Arc::new(tokio::sync::Semaphore::new(8));
67
68                while let Some((state, event)) = event_channel_receiver.recv().await {
69                    tracing::debug!("emitting event {:?}", std::any::type_name::<Event>());
70
71                    let listeners = listeners.clone();
72                    let permit = match semaphore.clone().acquire_owned().await {
73                        Ok(permit) => permit,
74                        Err(_) => {
75                            tracing::error!("semaphore closed, shutting down event emitter");
76                            break;
77                        }
78                    };
79
80                    tokio::spawn(async move {
81                        let event = Arc::new(event);
82                        let listeners = listeners
83                            .read()
84                            .await
85                            .values()
86                            .map(|listener| listener(state.clone(), event.clone()))
87                            .collect::<Vec<_>>();
88
89                        let mut result_stream =
90                            futures_util::stream::iter(listeners).buffer_unordered(8);
91
92                        while let Some(result) = result_stream.next().await {
93                            if let Err(err) = result {
94                                tracing::error!(
95                                    "event listener error for {:?}: {:?}",
96                                    std::any::type_name::<Event>(),
97                                    err
98                                );
99                            }
100                        }
101
102                        drop(permit);
103                    });
104                }
105            }),
106        }
107    }
108}
109
110impl<Event: 'static + Send + Sync> EventEmitter<Event> {
111    pub async fn register_event_handler<
112        F: Fn(crate::State, Arc<Event>) -> Fut + Send + Sync + 'static,
113        Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
114    >(
115        &self,
116        listener: F,
117    ) -> EventHandlerHandle {
118        let id = uuid::Uuid::new_v4();
119        let listener_box = Box::new(move |state: crate::State, event: Arc<Event>| {
120            Box::pin(listener(state, event))
121                as Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
122        }) as Box<Listener<Event>>;
123
124        self.listeners.write().await.insert(id, listener_box);
125
126        EventHandlerHandle {
127            listeners_ref: self.listeners.clone(),
128            id,
129        }
130    }
131
132    /// # Warning
133    /// This method will block the current thread if the lock is not available
134    pub fn blocking_register_event_handler<
135        F: Fn(crate::State, Arc<Event>) -> Fut + Send + Sync + 'static,
136        Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
137    >(
138        &self,
139        listener: F,
140    ) -> EventHandlerHandle {
141        let id = uuid::Uuid::new_v4();
142        let listener_box = Box::new(move |state: crate::State, event: Arc<Event>| {
143            Box::pin(listener(state, event))
144                as Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>
145        }) as Box<Listener<Event>>;
146
147        self.listeners.blocking_write().insert(id, listener_box);
148
149        EventHandlerHandle {
150            listeners_ref: self.listeners.clone(),
151            id,
152        }
153    }
154
155    #[inline]
156    pub fn emit(&self, state: crate::State, event: Event) {
157        let _ = self.event_channel.try_send((state, event));
158    }
159}
160
161impl<Event: 'static + Send + Sync> Drop for EventEmitter<Event> {
162    #[inline]
163    fn drop(&mut self) {
164        self.task.abort();
165    }
166}