Skip to main content

shared/models/
mod.rs

1use crate::database::DatabaseError;
2use compact_str::CompactStringExt;
3use futures_util::{StreamExt, TryStreamExt};
4use garde::Validate;
5use serde::{Deserialize, Serialize, de::DeserializeOwned};
6use sqlx::{
7    Arguments, Postgres, QueryBuilder, Row,
8    encode::IsNull,
9    error::BoxDynError,
10    postgres::{PgArgumentBuffer, PgArguments, PgRow, PgTypeInfo},
11};
12use std::{
13    collections::{BTreeMap, HashSet},
14    marker::PhantomData,
15    pin::Pin,
16    sync::{Arc, LazyLock},
17};
18use tokio::sync::RwLock;
19use utoipa::ToSchema;
20
21pub mod admin_activity;
22pub mod announcement;
23pub mod backup_configuration;
24pub mod database_host;
25pub mod egg_configuration;
26pub mod egg_repository;
27pub mod egg_repository_egg;
28pub mod location;
29pub mod location_database_host;
30pub mod mount;
31pub mod nest;
32pub mod nest_egg;
33pub mod nest_egg_mount;
34pub mod nest_egg_variable;
35pub mod node;
36pub mod node_allocation;
37pub mod node_mount;
38pub mod oauth_provider;
39pub mod role;
40pub mod server;
41pub mod server_activity;
42pub mod server_allocation;
43pub mod server_backup;
44pub mod server_database;
45pub mod server_mount;
46pub mod server_schedule;
47pub mod server_schedule_step;
48pub mod server_subuser;
49pub mod server_variable;
50pub mod user;
51pub mod user_activity;
52pub mod user_api_key;
53pub mod user_command_snippet;
54pub mod user_oauth_link;
55pub mod user_password_reset;
56pub mod user_recovery_code;
57pub mod user_security_key;
58pub mod user_server_group;
59pub mod user_session;
60pub mod user_ssh_key;
61
62#[derive(ToSchema, Validate, Deserialize, Serialize)]
63pub struct PaginationParams {
64    #[garde(range(min = 1))]
65    #[schema(minimum = 1)]
66    #[serde(default = "Pagination::default_page")]
67    pub page: i64,
68    #[garde(range(min = 1, max = 100))]
69    #[schema(minimum = 1, maximum = 100)]
70    #[serde(default = "Pagination::default_per_page")]
71    pub per_page: i64,
72}
73
74#[derive(ToSchema, Validate, Deserialize, Serialize)]
75pub struct PaginationParamsWithSearch {
76    #[garde(range(min = 1))]
77    #[schema(minimum = 1)]
78    #[serde(default = "Pagination::default_page")]
79    pub page: i64,
80    #[garde(range(min = 1, max = 100))]
81    #[schema(minimum = 1, maximum = 100)]
82    #[serde(default = "Pagination::default_per_page")]
83    pub per_page: i64,
84    #[garde(length(chars, min = 1, max = 128))]
85    #[schema(min_length = 1, max_length = 128)]
86    #[serde(
87        default,
88        deserialize_with = "crate::deserialize::deserialize_string_option"
89    )]
90    pub search: Option<compact_str::CompactString>,
91}
92
93#[derive(ToSchema, Deserialize, Serialize)]
94pub struct Pagination<T: Serialize = serde_json::Value> {
95    pub total: i64,
96    pub per_page: i64,
97    pub page: i64,
98
99    pub data: Vec<T>,
100}
101
102impl Pagination {
103    #[inline]
104    pub const fn default_page() -> i64 {
105        1
106    }
107
108    #[inline]
109    pub const fn default_per_page() -> i64 {
110        25
111    }
112}
113
114impl<T: Serialize> Pagination<T> {
115    pub async fn async_map<R: serde::Serialize, Fut: Future<Output = R>>(
116        self,
117        mapper: impl Fn(T) -> Fut,
118    ) -> Pagination<R> {
119        let mut results = Vec::new();
120        results.reserve_exact(self.data.len());
121        let mut result_stream =
122            futures_util::stream::iter(self.data.into_iter().map(mapper)).buffered(25);
123
124        while let Some(result) = result_stream.next().await {
125            results.push(result);
126        }
127
128        Pagination {
129            total: self.total,
130            per_page: self.per_page,
131            page: self.page,
132            data: results,
133        }
134    }
135
136    pub async fn try_async_map<R: serde::Serialize, E, Fut: Future<Output = Result<R, E>>>(
137        self,
138        mapper: impl Fn(T) -> Fut,
139    ) -> Result<Pagination<R>, E> {
140        let mut results = Vec::new();
141        results.reserve_exact(self.data.len());
142        let mut result_stream =
143            futures_util::stream::iter(self.data.into_iter().map(mapper)).buffered(25);
144
145        while let Some(result) = result_stream.try_next().await? {
146            results.push(result);
147        }
148
149        Ok(Pagination {
150            total: self.total,
151            per_page: self.per_page,
152            page: self.page,
153            data: results,
154        })
155    }
156}
157
158pub type ModelExtensionList = std::sync::RwLock<Vec<Box<dyn ModelExtension + Send + Sync>>>;
159pub type ModelExtensionData = Vec<(compact_str::CompactString, Vec<u8>)>;
160pub type ModelExtensionMapType = Box<dyn erased_serde::Serialize>;
161
162pub trait ModelExtension {
163    fn extension_name(&self) -> &'static str;
164
165    fn extended_columns(&self, prefix: &str) -> BTreeMap<&'static str, compact_str::CompactString>;
166
167    fn map_extended(
168        &self,
169        prefix: &str,
170        row: &PgRow,
171    ) -> Result<ModelExtensionMapType, crate::database::DatabaseError>;
172}
173
174pub trait SafeModelExtension: ModelExtension {
175    type Value: Serialize + DeserializeOwned;
176
177    fn name() -> &'static str;
178}
179
180pub trait BaseModel: Serialize + DeserializeOwned {
181    const NAME: &'static str;
182
183    fn get_extension_list() -> &'static ModelExtensionList;
184    fn get_extension_data(&self) -> &ModelExtensionData;
185
186    /// Registers a model extension. If an extension with the same name is already registered, this function will do nothing.
187    fn register_model_extension(extension: impl ModelExtension + Send + Sync + 'static) {
188        let mut extensions = Self::get_extension_list().write().unwrap();
189
190        if extensions
191            .iter()
192            .any(|e| e.extension_name() == extension.extension_name())
193        {
194            return;
195        }
196
197        extensions.push(Box::new(extension));
198    }
199
200    /// Parses a model extension from the model's extension data. If the extension is not found, or if the data cannot be deserialized, an error is returned.
201    ///
202    /// This can be costly depending on what is stored, so use sparingly.
203    fn parse_model_extension<Extension: SafeModelExtension>(
204        &self,
205    ) -> Result<Extension::Value, crate::database::DatabaseError>
206    where
207        Extension::Value: Serialize + DeserializeOwned,
208    {
209        let data = self.get_extension_data();
210
211        for (name, value) in data.iter() {
212            if name.as_str() == Extension::name() {
213                let deserialized =
214                    rmp_serde::from_slice::<Extension::Value>(value).map_err(anyhow::Error::new)?;
215
216                return Ok(deserialized);
217            }
218        }
219
220        Err(crate::database::DatabaseError::Any(anyhow::anyhow!(
221            "model extension not found"
222        )))
223    }
224
225    fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString>;
226    fn columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
227        if let Ok(extensions) = Self::get_extension_list().read() {
228            let mut columns = Self::base_columns(prefix);
229
230            for extension in extensions.iter() {
231                columns.extend(extension.extended_columns(prefix.unwrap_or_default()));
232            }
233
234            columns
235        } else {
236            Self::base_columns(prefix)
237        }
238    }
239
240    #[inline]
241    fn columns_sql(prefix: Option<&str>) -> compact_str::CompactString {
242        Self::columns(prefix)
243            .iter()
244            .map(|(key, value)| compact_str::format_compact!("{key} as {value}"))
245            .join_compact(", ")
246    }
247
248    fn map_extensions(
249        prefix: &str,
250        row: &PgRow,
251    ) -> Result<ModelExtensionData, crate::database::DatabaseError> {
252        let mut data = Vec::new();
253
254        if let Ok(extensions) = Self::get_extension_list().read() {
255            for extension in extensions.iter() {
256                let value = extension.map_extended(prefix, row)?;
257                let serialized = rmp_serde::to_vec(&value).map_err(anyhow::Error::new)?;
258
259                data.push((
260                    compact_str::CompactString::const_new(extension.extension_name()),
261                    serialized,
262                ));
263            }
264        }
265
266        Ok(data)
267    }
268
269    fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError>;
270}
271
272#[async_trait::async_trait]
273pub trait EventEmittingModel: BaseModel {
274    type Event: Send + Sync + 'static;
275
276    fn get_event_emitter() -> &'static crate::events::EventEmitter<Self::Event>;
277
278    async fn register_event_handler<
279        F: Fn(crate::State, Arc<Self::Event>) -> Fut + Send + Sync + 'static,
280        Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
281    >(
282        listener: F,
283    ) -> crate::events::EventHandlerHandle {
284        Self::get_event_emitter()
285            .register_event_handler(listener)
286            .await
287    }
288
289    /// # Warning
290    /// This method will block the current thread if the lock is not available
291    fn blocking_register_event_handler<
292        F: Fn(crate::State, Arc<Self::Event>) -> Fut + Send + Sync + 'static,
293        Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
294    >(
295        listener: F,
296    ) -> crate::events::EventHandlerHandle {
297        Self::get_event_emitter().blocking_register_event_handler(listener)
298    }
299}
300
301type CreateHandlerResult<'a> =
302    Pin<Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>>;
303type CreateHandler<M> = dyn for<'a> Fn(
304        &'a mut <M as CreatableModel>::CreateOptions<'_>,
305        &'a mut InsertQueryBuilder,
306        &'a crate::State,
307        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
308    ) -> CreateHandlerResult<'a>
309    + Send
310    + Sync;
311type CreateAfterHandler<M> = dyn for<'a> Fn(
312        &'a mut <M as CreatableModel>::CreateResult,
313        &'a <M as CreatableModel>::CreateOptions<'_>,
314        &'a crate::State,
315        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
316    ) -> CreateHandlerResult<'a>
317    + Send
318    + Sync;
319pub type CreateListenerList<M> =
320    Arc<ModelHandlerList<Box<CreateHandler<M>>, Box<CreateAfterHandler<M>>>>;
321
322#[async_trait::async_trait]
323pub trait CreatableModel: BaseModel + Send + Sync + 'static {
324    type CreateOptions<'a>: Send + Sync + Validate;
325    type CreateResult: Send;
326
327    fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>>;
328
329    async fn register_create_handler<
330        F: for<'a> Fn(
331                &'a mut Self::CreateOptions<'_>,
332                &'a mut InsertQueryBuilder,
333                &'a crate::State,
334                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
335            ) -> Pin<
336                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
337            > + Send
338            + Sync
339            + 'static,
340    >(
341        priority: ListenerPriority,
342        callback: F,
343    ) {
344        let erased = Box::new(callback) as Box<CreateHandler<Self>>;
345
346        Self::get_create_handlers()
347            .register_handler(priority, erased)
348            .await;
349    }
350
351    async fn register_after_create_handler<
352        F: for<'a> Fn(
353                &'a mut Self::CreateResult,
354                &'a Self::CreateOptions<'_>,
355                &'a crate::State,
356                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
357            ) -> Pin<
358                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
359            > + Send
360            + Sync
361            + 'static,
362    >(
363        priority: ListenerPriority,
364        callback: F,
365    ) {
366        let erased = Box::new(callback) as Box<CreateAfterHandler<Self>>;
367
368        Self::get_create_handlers()
369            .register_after_handler(priority, erased)
370            .await;
371    }
372
373    /// # Warning
374    /// This method will block the current thread if the lock is not available
375    fn blocking_register_create_handler<
376        F: for<'a> Fn(
377                &'a mut Self::CreateOptions<'_>,
378                &'a mut InsertQueryBuilder,
379                &'a crate::State,
380                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
381            ) -> Pin<
382                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
383            > + Send
384            + Sync
385            + 'static,
386    >(
387        priority: ListenerPriority,
388        callback: F,
389    ) {
390        let erased = Box::new(callback) as Box<CreateHandler<Self>>;
391
392        Self::get_create_handlers().blocking_register_handler(priority, erased);
393    }
394
395    /// # Warning
396    /// This method will block the current thread if the lock is not available
397    fn blocking_register_after_create_handler<
398        F: for<'a> Fn(
399                &'a mut Self::CreateResult,
400                &'a Self::CreateOptions<'_>,
401                &'a crate::State,
402                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
403            ) -> Pin<
404                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
405            > + Send
406            + Sync
407            + 'static,
408    >(
409        priority: ListenerPriority,
410        callback: F,
411    ) {
412        let erased = Box::new(callback) as Box<CreateAfterHandler<Self>>;
413
414        Self::get_create_handlers().blocking_register_after_handler(priority, erased);
415    }
416
417    async fn run_create_handlers(
418        options: &mut Self::CreateOptions<'_>,
419        query_builder: &mut InsertQueryBuilder,
420        state: &crate::State,
421        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
422    ) -> Result<(), crate::database::DatabaseError> {
423        let listeners = Self::get_create_handlers().before_handlers.read().await;
424
425        for listener in listeners.iter() {
426            (*listener.callback)(options, query_builder, state, transaction).await?;
427        }
428
429        Ok(())
430    }
431
432    async fn run_after_create_handlers(
433        result: &mut Self::CreateResult,
434        options: &Self::CreateOptions<'_>,
435        state: &crate::State,
436        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
437    ) -> Result<(), crate::database::DatabaseError> {
438        let listeners = Self::get_create_handlers().after_handlers.read().await;
439
440        for listener in listeners.iter() {
441            (*listener.callback)(result, options, state, transaction).await?;
442        }
443
444        Ok(())
445    }
446
447    async fn create_with_transaction(
448        state: &crate::State,
449        options: Self::CreateOptions<'_>,
450        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
451    ) -> Result<Self::CreateResult, crate::database::DatabaseError>;
452
453    async fn create(
454        state: &crate::State,
455        options: Self::CreateOptions<'_>,
456    ) -> Result<Self::CreateResult, crate::database::DatabaseError> {
457        let mut transaction = state.database.write().begin().await?;
458
459        let result = Self::create_with_transaction(state, options, &mut transaction).await?;
460
461        transaction.commit().await?;
462
463        Ok(result)
464    }
465}
466
467type UpdateHandlerResult<'a> =
468    Pin<Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>>;
469type UpdateHandler<M> = dyn for<'a> Fn(
470        &'a mut M,
471        &'a mut <M as UpdatableModel>::UpdateOptions,
472        &'a mut UpdateQueryBuilder,
473        &'a crate::State,
474        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
475    ) -> UpdateHandlerResult<'a>
476    + Send
477    + Sync;
478type UpdateAfterHandler<M> = dyn for<'a> Fn(
479        &'a mut M,
480        &'a crate::State,
481        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
482    ) -> UpdateHandlerResult<'a>
483    + Send
484    + Sync;
485pub type UpdateHandlerList<M> =
486    Arc<ModelHandlerList<Box<UpdateHandler<M>>, Box<UpdateAfterHandler<M>>>>;
487
488#[async_trait::async_trait]
489pub trait UpdatableModel: BaseModel + Send + Sync + 'static {
490    type UpdateOptions: Send + Sync + Default + ToSchema + DeserializeOwned + Serialize + Validate;
491
492    fn get_update_handlers() -> &'static LazyLock<UpdateHandlerList<Self>>;
493
494    async fn register_update_handler<
495        F: for<'a> Fn(
496                &'a mut Self,
497                &'a mut Self::UpdateOptions,
498                &'a mut UpdateQueryBuilder,
499                &'a crate::State,
500                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
501            ) -> Pin<
502                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
503            > + Send
504            + Sync
505            + 'static,
506    >(
507        priority: ListenerPriority,
508        callback: F,
509    ) {
510        let erased = Box::new(callback) as Box<UpdateHandler<Self>>;
511
512        Self::get_update_handlers()
513            .register_handler(priority, erased)
514            .await;
515    }
516
517    async fn register_after_update_handler<
518        F: for<'a> Fn(
519                &'a mut Self,
520                &'a crate::State,
521                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
522            ) -> Pin<
523                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
524            > + Send
525            + Sync
526            + 'static,
527    >(
528        priority: ListenerPriority,
529        callback: F,
530    ) {
531        let erased = Box::new(callback) as Box<UpdateAfterHandler<Self>>;
532
533        Self::get_update_handlers()
534            .register_after_handler(priority, erased)
535            .await;
536    }
537
538    /// # Warning
539    /// This method will block the current thread if the lock is not available
540    fn blocking_register_update_handler<
541        F: for<'a> Fn(
542                &'a mut Self,
543                &'a mut Self::UpdateOptions,
544                &'a mut UpdateQueryBuilder,
545                &'a crate::State,
546                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
547            ) -> Pin<
548                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
549            > + Send
550            + Sync
551            + 'static,
552    >(
553        priority: ListenerPriority,
554        callback: F,
555    ) {
556        let erased = Box::new(callback) as Box<UpdateHandler<Self>>;
557
558        Self::get_update_handlers().blocking_register_handler(priority, erased);
559    }
560
561    /// # Warning
562    /// This method will block the current thread if the lock is not available
563    fn blocking_register_after_update_handler<
564        F: for<'a> Fn(
565                &'a mut Self,
566                &'a crate::State,
567                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
568            ) -> Pin<
569                Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
570            > + Send
571            + Sync
572            + 'static,
573    >(
574        priority: ListenerPriority,
575        callback: F,
576    ) {
577        let erased = Box::new(callback) as Box<UpdateAfterHandler<Self>>;
578
579        Self::get_update_handlers().blocking_register_after_handler(priority, erased);
580    }
581
582    async fn run_update_handlers(
583        &mut self,
584        options: &mut Self::UpdateOptions,
585        query_builder: &mut UpdateQueryBuilder,
586        state: &crate::State,
587        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
588    ) -> Result<(), crate::database::DatabaseError> {
589        let listeners = Self::get_update_handlers().before_handlers.read().await;
590
591        for listener in listeners.iter() {
592            (*listener.callback)(self, options, query_builder, state, transaction).await?;
593        }
594
595        Ok(())
596    }
597
598    async fn run_after_update_handlers(
599        &mut self,
600        state: &crate::State,
601        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
602    ) -> Result<(), crate::database::DatabaseError> {
603        let listeners = Self::get_update_handlers().after_handlers.read().await;
604
605        for listener in listeners.iter() {
606            (*listener.callback)(self, state, transaction).await?;
607        }
608
609        Ok(())
610    }
611
612    async fn update_with_transaction(
613        &mut self,
614        state: &crate::State,
615        options: Self::UpdateOptions,
616        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
617    ) -> Result<(), crate::database::DatabaseError>;
618
619    async fn update(
620        &mut self,
621        state: &crate::State,
622        options: Self::UpdateOptions,
623    ) -> Result<(), crate::database::DatabaseError> {
624        let mut transaction = state.database.write().begin().await?;
625
626        self.update_with_transaction(state, options, &mut transaction)
627            .await?;
628
629        transaction.commit().await?;
630
631        Ok(())
632    }
633}
634
635type DeleteHandlerResult<'a> = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>;
636type DeleteHandler<M> = dyn for<'a> Fn(
637        &'a M,
638        &'a <M as DeletableModel>::DeleteOptions,
639        &'a crate::State,
640        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
641    ) -> DeleteHandlerResult<'a>
642    + Send
643    + Sync;
644type DeleteAfterHandler<M> = dyn for<'a> Fn(
645        &'a M,
646        &'a <M as DeletableModel>::DeleteOptions,
647        &'a crate::State,
648        &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
649    ) -> DeleteHandlerResult<'a>
650    + Send
651    + Sync;
652pub type DeleteHandlerList<M> =
653    Arc<ModelHandlerList<Box<DeleteHandler<M>>, Box<DeleteAfterHandler<M>>>>;
654
655#[async_trait::async_trait]
656pub trait DeletableModel: BaseModel + Send + Sync + 'static {
657    type DeleteOptions: Send + Sync + Default + Clone;
658
659    fn get_delete_handlers() -> &'static LazyLock<DeleteHandlerList<Self>>;
660
661    async fn register_delete_handler<
662        F: for<'a> Fn(
663                &'a Self,
664                &'a Self::DeleteOptions,
665                &'a crate::State,
666                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
667            )
668                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
669            + Send
670            + Sync
671            + 'static,
672    >(
673        priority: ListenerPriority,
674        callback: F,
675    ) {
676        let erased = Box::new(callback) as Box<DeleteHandler<Self>>;
677
678        Self::get_delete_handlers()
679            .register_handler(priority, erased)
680            .await;
681    }
682
683    async fn register_after_delete_handler<
684        F: for<'a> Fn(
685                &'a Self,
686                &'a Self::DeleteOptions,
687                &'a crate::State,
688                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
689            )
690                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
691            + Send
692            + Sync
693            + 'static,
694    >(
695        priority: ListenerPriority,
696        callback: F,
697    ) {
698        let erased = Box::new(callback) as Box<DeleteAfterHandler<Self>>;
699
700        Self::get_delete_handlers()
701            .register_after_handler(priority, erased)
702            .await;
703    }
704
705    /// # Warning
706    /// This method will block the current thread if the lock is not available
707    fn blocking_register_delete_handler<
708        F: for<'a> Fn(
709                &'a Self,
710                &'a Self::DeleteOptions,
711                &'a crate::State,
712                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
713            )
714                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
715            + Send
716            + Sync
717            + 'static,
718    >(
719        priority: ListenerPriority,
720        callback: F,
721    ) {
722        let erased = Box::new(callback) as Box<DeleteHandler<Self>>;
723
724        Self::get_delete_handlers().blocking_register_handler(priority, erased);
725    }
726
727    /// # Warning
728    /// This method will block the current thread if the lock is not available
729    fn blocking_register_after_delete_handler<
730        F: for<'a> Fn(
731                &'a Self,
732                &'a Self::DeleteOptions,
733                &'a crate::State,
734                &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
735            )
736                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
737            + Send
738            + Sync
739            + 'static,
740    >(
741        priority: ListenerPriority,
742        callback: F,
743    ) {
744        let erased = Box::new(callback) as Box<DeleteAfterHandler<Self>>;
745
746        Self::get_delete_handlers().blocking_register_after_handler(priority, erased);
747    }
748
749    async fn run_delete_handlers(
750        &self,
751        options: &Self::DeleteOptions,
752        state: &crate::State,
753        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
754    ) -> Result<(), anyhow::Error> {
755        let listeners = Self::get_delete_handlers().before_handlers.read().await;
756
757        for listener in listeners.iter() {
758            (*listener.callback)(self, options, state, transaction).await?;
759        }
760
761        Ok(())
762    }
763
764    async fn run_after_delete_handlers(
765        &self,
766        options: &Self::DeleteOptions,
767        state: &crate::State,
768        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
769    ) -> Result<(), anyhow::Error> {
770        let listeners = Self::get_delete_handlers().after_handlers.read().await;
771
772        for listener in listeners.iter() {
773            (*listener.callback)(self, options, state, transaction).await?;
774        }
775
776        Ok(())
777    }
778
779    async fn delete_with_transaction(
780        &self,
781        state: &crate::State,
782        options: Self::DeleteOptions,
783        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
784    ) -> Result<(), anyhow::Error>;
785
786    async fn delete(
787        &self,
788        state: &crate::State,
789        options: Self::DeleteOptions,
790    ) -> Result<(), anyhow::Error> {
791        let mut transaction = state.database.write().begin().await?;
792
793        self.delete_with_transaction(state, options, &mut transaction)
794            .await?;
795
796        transaction.commit().await?;
797
798        Ok(())
799    }
800}
801
802#[async_trait::async_trait]
803pub trait ByUuid: BaseModel {
804    async fn by_uuid(
805        database: &crate::database::Database,
806        uuid: uuid::Uuid,
807    ) -> Result<Self, DatabaseError>;
808
809    async fn by_uuid_with_transaction(
810        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
811        uuid: uuid::Uuid,
812    ) -> Result<Self, DatabaseError>;
813
814    async fn by_uuid_cached(
815        database: &crate::database::Database,
816        uuid: uuid::Uuid,
817    ) -> Result<Self, anyhow::Error> {
818        database
819            .cache
820            .cached(&format!("{}::{uuid}", Self::NAME), 10, || {
821                Self::by_uuid(database, uuid)
822            })
823            .await
824    }
825
826    async fn by_uuid_optional(
827        database: &crate::database::Database,
828        uuid: uuid::Uuid,
829    ) -> Result<Option<Self>, DatabaseError> {
830        match Self::by_uuid(database, uuid).await {
831            Ok(res) => Ok(Some(res)),
832            Err(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) => Ok(None),
833            Err(err) => Err(err),
834        }
835    }
836
837    async fn by_uuid_optional_with_transaction(
838        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
839        uuid: uuid::Uuid,
840    ) -> Result<Option<Self>, DatabaseError> {
841        match Self::by_uuid_with_transaction(transaction, uuid).await {
842            Ok(res) => Ok(Some(res)),
843            Err(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) => Ok(None),
844            Err(err) => Err(err),
845        }
846    }
847
848    async fn by_uuid_optional_cached(
849        database: &crate::database::Database,
850        uuid: uuid::Uuid,
851    ) -> Result<Option<Self>, anyhow::Error> {
852        match Self::by_uuid_cached(database, uuid).await {
853            Ok(res) => Ok(Some(res)),
854            Err(err) => {
855                if let Some(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) =
856                    err.downcast_ref::<DatabaseError>()
857                {
858                    Ok(None)
859                } else {
860                    Err(err)
861                }
862            }
863        }
864    }
865
866    #[inline]
867    fn get_fetchable(uuid: uuid::Uuid) -> Fetchable<Self> {
868        Fetchable {
869            uuid,
870            _model: PhantomData,
871        }
872    }
873
874    #[inline]
875    fn get_fetchable_from_row(row: &PgRow, column: impl AsRef<str>) -> Option<Fetchable<Self>> {
876        match row.try_get(column.as_ref()) {
877            Ok(uuid) => Some(Fetchable {
878                uuid,
879                _model: PhantomData,
880            }),
881            Err(_) => None,
882        }
883    }
884}
885
886#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
887pub enum ListenerPriority {
888    Highest,
889    High,
890    #[default]
891    Normal,
892    Low,
893    Lowest,
894}
895
896impl ListenerPriority {
897    #[inline]
898    fn rank(self) -> u8 {
899        match self {
900            Self::Highest => 5,
901            Self::High => 4,
902            Self::Normal => 3,
903            Self::Low => 2,
904            Self::Lowest => 1,
905        }
906    }
907}
908
909impl PartialOrd for ListenerPriority {
910    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
911        Some(self.cmp(other))
912    }
913}
914
915impl Ord for ListenerPriority {
916    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
917        let self_rank = self.rank();
918        let other_rank = other.rank();
919
920        other_rank.cmp(&self_rank)
921    }
922}
923
924#[async_trait::async_trait]
925impl<F: Send + Sync, AfterF: Send + Sync> crate::events::DisconnectEventHandler
926    for ModelHandlerList<F, AfterF>
927{
928    #[inline]
929    async fn disconnect(&self, id: uuid::Uuid) {
930        self.before_handlers.write().await.retain(|l| l.uuid != id);
931        self.after_handlers.write().await.retain(|l| l.uuid != id);
932    }
933
934    #[inline]
935    fn blocking_disconnect(&self, id: uuid::Uuid) {
936        self.before_handlers
937            .blocking_write()
938            .retain(|l| l.uuid != id);
939        self.after_handlers
940            .blocking_write()
941            .retain(|l| l.uuid != id);
942    }
943}
944
945pub struct ModelHandlerList<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> {
946    before_handlers: RwLock<Vec<ModelHandler<F>>>,
947    after_handlers: RwLock<Vec<ModelHandler<AfterF>>>,
948}
949
950impl<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> Default
951    for ModelHandlerList<F, AfterF>
952{
953    fn default() -> Self {
954        Self {
955            before_handlers: RwLock::new(Vec::new()),
956            after_handlers: RwLock::new(Vec::new()),
957        }
958    }
959}
960
961impl<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> ModelHandlerList<F, AfterF> {
962    pub async fn register_handler(
963        self: &Arc<Self>,
964        priority: ListenerPriority,
965        callback: F,
966    ) -> ModelHandlerHandle {
967        let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
968
969        let mut self_listeners = self.before_handlers.write().await;
970        self_listeners.push(listener);
971        self_listeners.sort_by_key(|a| a.priority);
972
973        aborter
974    }
975
976    pub async fn register_after_handler(
977        self: &Arc<Self>,
978        priority: ListenerPriority,
979        callback: AfterF,
980    ) -> ModelHandlerHandle {
981        let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
982
983        let mut self_listeners = self.after_handlers.write().await;
984        self_listeners.push(listener);
985        self_listeners.sort_by_key(|a| a.priority);
986
987        aborter
988    }
989
990    /// # Warning
991    /// This method will block the current thread if the lock is not available
992    pub fn blocking_register_handler(
993        self: &Arc<Self>,
994        priority: ListenerPriority,
995        callback: F,
996    ) -> ModelHandlerHandle {
997        let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
998
999        let mut self_listeners = self.before_handlers.blocking_write();
1000        self_listeners.push(listener);
1001        self_listeners.sort_by_key(|a| a.priority);
1002
1003        aborter
1004    }
1005
1006    /// # Warning
1007    /// This method will block the current thread if the lock is not available
1008    pub fn blocking_register_after_handler(
1009        self: &Arc<Self>,
1010        priority: ListenerPriority,
1011        callback: AfterF,
1012    ) -> ModelHandlerHandle {
1013        let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
1014
1015        let mut self_listeners = self.after_handlers.blocking_write();
1016        self_listeners.push(listener);
1017        self_listeners.sort_by_key(|a| a.priority);
1018
1019        aborter
1020    }
1021}
1022
1023pub struct ModelHandler<F: Send + Sync + 'static> {
1024    uuid: uuid::Uuid,
1025    priority: ListenerPriority,
1026
1027    pub callback: F,
1028}
1029
1030impl<F: Send + Sync + 'static> ModelHandler<F> {
1031    pub(crate) fn new(
1032        callback: F,
1033        priority: ListenerPriority,
1034        list: Arc<dyn crate::events::DisconnectEventHandler + Send + Sync>,
1035    ) -> (Self, ModelHandlerHandle) {
1036        let handler = Self {
1037            uuid: uuid::Uuid::new_v4(),
1038            priority,
1039            callback,
1040        };
1041        let handle = ModelHandlerHandle {
1042            list_ref: list,
1043            id: handler.uuid,
1044        };
1045        (handler, handle)
1046    }
1047}
1048
1049pub struct ModelHandlerHandle {
1050    list_ref: Arc<dyn crate::events::DisconnectEventHandler + Send + Sync>,
1051    id: uuid::Uuid,
1052}
1053
1054impl ModelHandlerHandle {
1055    pub async fn disconnect(&self) {
1056        self.list_ref.disconnect(self.id).await;
1057    }
1058
1059    /// # Warning
1060    /// This method will block the current thread if the lists' lock is not available
1061    pub fn blocking_disconnect(&self) {
1062        self.list_ref.blocking_disconnect(self.id);
1063    }
1064}
1065
1066#[derive(Serialize, Deserialize, Clone, Copy)]
1067pub struct Fetchable<M: ByUuid> {
1068    pub uuid: uuid::Uuid,
1069    #[serde(skip)]
1070    _model: PhantomData<M>,
1071}
1072
1073impl<M: ByUuid + Send> Fetchable<M> {
1074    #[inline]
1075    pub async fn fetch(&self, database: &crate::database::Database) -> Result<M, DatabaseError> {
1076        M::by_uuid(database, self.uuid).await
1077    }
1078
1079    #[inline]
1080    pub async fn fetch_cached(
1081        &self,
1082        database: &crate::database::Database,
1083    ) -> Result<M, anyhow::Error> {
1084        M::by_uuid_cached(database, self.uuid).await
1085    }
1086
1087    #[inline]
1088    pub async fn fetch_optional(
1089        &self,
1090        database: &crate::database::Database,
1091    ) -> Result<Option<M>, DatabaseError> {
1092        M::by_uuid_optional(database, self.uuid).await
1093    }
1094
1095    #[inline]
1096    pub async fn fetch_optional_cached(
1097        &self,
1098        database: &crate::database::Database,
1099    ) -> Result<Option<M>, anyhow::Error> {
1100        M::by_uuid_optional_cached(database, self.uuid).await
1101    }
1102}
1103
1104pub struct InsertQueryBuilder<'a> {
1105    table: &'a str,
1106    columns: Vec<&'a str>,
1107    expressions: Vec<String>,
1108    arguments: PgArguments,
1109    returning_clause: Option<&'a str>,
1110}
1111
1112impl<'a> InsertQueryBuilder<'a> {
1113    pub fn new(table: &'a str) -> Self {
1114        Self {
1115            table,
1116            columns: Vec::new(),
1117            expressions: Vec::new(),
1118            arguments: PgArguments::default(),
1119            returning_clause: None,
1120        }
1121    }
1122
1123    pub fn set<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1124        &mut self,
1125        column: &'a str,
1126        value: T,
1127    ) -> &mut Self {
1128        if self.columns.contains(&column) {
1129            return self;
1130        }
1131
1132        if self.arguments.add(value).is_ok() {
1133            self.columns.push(column);
1134            let idx = self.arguments.len();
1135            self.expressions.push(format!("${}", idx));
1136        }
1137
1138        self
1139    }
1140
1141    pub fn set_expr<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1142        &mut self,
1143        column: &'a str,
1144        expression: &str,
1145        values: Vec<T>,
1146    ) -> &mut Self {
1147        if self.columns.contains(&column) {
1148            return self;
1149        }
1150
1151        let start_len = self.arguments.len();
1152
1153        for value in values {
1154            if self.arguments.add(value).is_err() {
1155                return self;
1156            }
1157        }
1158
1159        let mut expr = expression.to_string();
1160        let added_count = self.arguments.len() - start_len;
1161
1162        for i in (1..=added_count).rev() {
1163            let global_idx = start_len + i;
1164            expr = expr.replace(&format!("${}", i), &format!("${}", global_idx));
1165        }
1166
1167        self.columns.push(column);
1168        self.expressions.push(expr);
1169
1170        self
1171    }
1172
1173    pub fn returning(mut self, clause: &'a str) -> Self {
1174        self.returning_clause = Some(clause);
1175        self
1176    }
1177
1178    fn build_sql(&self) -> String {
1179        let columns_sql = self.columns.join(", ");
1180        let values_sql = self.expressions.join(", ");
1181
1182        let mut sql = format!(
1183            "INSERT INTO {} ({}) VALUES ({})",
1184            self.table, columns_sql, values_sql
1185        );
1186
1187        if let Some(clause) = self.returning_clause {
1188            sql.push_str(" RETURNING ");
1189            sql.push_str(clause);
1190        }
1191
1192        sql
1193    }
1194
1195    pub async fn execute(
1196        self,
1197        executor: impl sqlx::Executor<'a, Database = Postgres>,
1198    ) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error> {
1199        let sql = self.build_sql();
1200        sqlx::query_with(&sql, self.arguments)
1201            .execute(executor)
1202            .await
1203    }
1204
1205    pub async fn fetch_one(
1206        self,
1207        executor: impl sqlx::Executor<'a, Database = Postgres>,
1208    ) -> Result<sqlx::postgres::PgRow, sqlx::Error> {
1209        let sql = self.build_sql();
1210        sqlx::query_with(&sql, self.arguments)
1211            .fetch_one(executor)
1212            .await
1213    }
1214}
1215
1216pub struct UpdateQueryBuilder<'a> {
1217    builder: QueryBuilder<'a, Postgres>,
1218    updated_fields: HashSet<&'a str>,
1219    has_set_fields: bool,
1220    has_where: bool,
1221}
1222
1223impl<'a> UpdateQueryBuilder<'a> {
1224    pub fn new(table: &'a str) -> Self {
1225        let mut builder = QueryBuilder::new("UPDATE ");
1226        builder.push(table);
1227        builder.push(" SET ");
1228
1229        Self {
1230            builder,
1231            updated_fields: HashSet::new(),
1232            has_set_fields: false,
1233            has_where: false,
1234        }
1235    }
1236
1237    /// Adds a field to be updated, if `None`, will not add the field
1238    /// To set a field to null (`None`), you need a `Some(None)`
1239    pub fn set<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1240        &mut self,
1241        column: &'a str,
1242        value: Option<T>,
1243    ) -> &mut Self {
1244        let Some(value) = value else {
1245            return self;
1246        };
1247
1248        if !self.updated_fields.insert(column) {
1249            return self;
1250        }
1251
1252        if self.has_set_fields {
1253            self.builder.push(", ");
1254        }
1255
1256        self.builder.push(column);
1257        self.builder.push(" = ");
1258        self.builder.push_bind(value);
1259
1260        self.has_set_fields = true;
1261        self
1262    }
1263
1264    pub fn where_eq<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1265        &mut self,
1266        column: &'a str,
1267        value: T,
1268    ) -> &mut Self {
1269        if self.has_where {
1270            self.builder.push(" AND ");
1271        } else {
1272            self.builder.push(" WHERE ");
1273            self.has_where = true;
1274        }
1275
1276        self.builder.push(column);
1277        self.builder.push(" = ");
1278        self.builder.push_bind(value);
1279        self
1280    }
1281
1282    pub async fn execute(
1283        mut self,
1284        executor: impl sqlx::Executor<'a, Database = Postgres>,
1285    ) -> Result<sqlx::any::AnyQueryResult, sqlx::Error> {
1286        if !self.has_set_fields {
1287            return Ok(sqlx::any::AnyQueryResult::default());
1288        }
1289
1290        let query = self.builder.build();
1291        query.execute(executor).await.map(|r| r.into())
1292    }
1293}
1294
1295/// SQLx helper type to preserve order of keys when encoding JSON. By default, SQLx encodes JSON using `serde_json::Value`, which does not preserve order of keys. This type allows you to encode any serializable type as JSON while preserving the order of keys.
1296pub struct OrderedJson<T>(pub T);
1297
1298impl<T: Serialize> sqlx::Encode<'_, sqlx::Postgres> for OrderedJson<T> {
1299    fn encode_by_ref(&self, buf: &mut PgArgumentBuffer) -> Result<IsNull, BoxDynError> {
1300        serde_json::to_writer(&mut **buf, &self.0)?;
1301        Ok(IsNull::No)
1302    }
1303}
1304
1305impl<T> sqlx::Type<sqlx::Postgres> for OrderedJson<T> {
1306    fn type_info() -> PgTypeInfo {
1307        // JSON, not JSONB, to preserve order of keys
1308        PgTypeInfo::with_oid(sqlx::postgres::types::Oid(114))
1309    }
1310}
1311
1312#[async_trait::async_trait]
1313pub trait IntoApiObject {
1314    type ApiObject: Send;
1315    type ExtraArgs<'a>: Send;
1316
1317    async fn into_api_object<'a>(
1318        self,
1319        state: &crate::State,
1320        args: Self::ExtraArgs<'a>,
1321    ) -> Result<Self::ApiObject, DatabaseError>;
1322}
1323
1324#[async_trait::async_trait]
1325pub trait IntoAdminApiObject {
1326    type AdminApiObject: Send;
1327    type ExtraArgs<'a>: Send;
1328
1329    async fn into_admin_api_object<'a>(
1330        self,
1331        state: &crate::State,
1332        args: Self::ExtraArgs<'a>,
1333    ) -> Result<Self::AdminApiObject, DatabaseError>;
1334}