1use crate::database::DatabaseError;
2use compact_str::CompactStringExt;
3use futures_util::{StreamExt, TryStreamExt};
4use garde::Validate;
5use serde::{Deserialize, Serialize, de::DeserializeOwned};
6use sqlx::{
7 Arguments, Postgres, QueryBuilder, Row,
8 encode::IsNull,
9 error::BoxDynError,
10 postgres::{PgArgumentBuffer, PgArguments, PgRow, PgTypeInfo},
11};
12use std::{
13 collections::{BTreeMap, HashSet},
14 marker::PhantomData,
15 pin::Pin,
16 sync::{Arc, LazyLock},
17};
18use tokio::sync::RwLock;
19use utoipa::ToSchema;
20
21pub mod admin_activity;
22pub mod announcement;
23pub mod backup_configuration;
24pub mod database_host;
25pub mod egg_configuration;
26pub mod egg_repository;
27pub mod egg_repository_egg;
28pub mod location;
29pub mod location_database_host;
30pub mod mount;
31pub mod nest;
32pub mod nest_egg;
33pub mod nest_egg_mount;
34pub mod nest_egg_variable;
35pub mod node;
36pub mod node_allocation;
37pub mod node_mount;
38pub mod oauth_provider;
39pub mod role;
40pub mod server;
41pub mod server_activity;
42pub mod server_allocation;
43pub mod server_backup;
44pub mod server_database;
45pub mod server_mount;
46pub mod server_schedule;
47pub mod server_schedule_step;
48pub mod server_subuser;
49pub mod server_variable;
50pub mod user;
51pub mod user_activity;
52pub mod user_api_key;
53pub mod user_command_snippet;
54pub mod user_oauth_link;
55pub mod user_password_reset;
56pub mod user_recovery_code;
57pub mod user_security_key;
58pub mod user_server_group;
59pub mod user_session;
60pub mod user_ssh_key;
61
62#[derive(ToSchema, Validate, Deserialize, Serialize)]
63pub struct PaginationParams {
64 #[garde(range(min = 1))]
65 #[schema(minimum = 1)]
66 #[serde(default = "Pagination::default_page")]
67 pub page: i64,
68 #[garde(range(min = 1, max = 100))]
69 #[schema(minimum = 1, maximum = 100)]
70 #[serde(default = "Pagination::default_per_page")]
71 pub per_page: i64,
72}
73
74#[derive(ToSchema, Validate, Deserialize, Serialize)]
75pub struct PaginationParamsWithSearch {
76 #[garde(range(min = 1))]
77 #[schema(minimum = 1)]
78 #[serde(default = "Pagination::default_page")]
79 pub page: i64,
80 #[garde(range(min = 1, max = 100))]
81 #[schema(minimum = 1, maximum = 100)]
82 #[serde(default = "Pagination::default_per_page")]
83 pub per_page: i64,
84 #[garde(length(chars, min = 1, max = 128))]
85 #[schema(min_length = 1, max_length = 128)]
86 #[serde(
87 default,
88 deserialize_with = "crate::deserialize::deserialize_string_option"
89 )]
90 pub search: Option<compact_str::CompactString>,
91}
92
93#[derive(ToSchema, Deserialize, Serialize)]
94pub struct Pagination<T: Serialize = serde_json::Value> {
95 pub total: i64,
96 pub per_page: i64,
97 pub page: i64,
98
99 pub data: Vec<T>,
100}
101
102impl Pagination {
103 #[inline]
104 pub const fn default_page() -> i64 {
105 1
106 }
107
108 #[inline]
109 pub const fn default_per_page() -> i64 {
110 25
111 }
112}
113
114impl<T: Serialize> Pagination<T> {
115 pub async fn async_map<R: serde::Serialize, Fut: Future<Output = R>>(
116 self,
117 mapper: impl Fn(T) -> Fut,
118 ) -> Pagination<R> {
119 let mut results = Vec::new();
120 results.reserve_exact(self.data.len());
121 let mut result_stream =
122 futures_util::stream::iter(self.data.into_iter().map(mapper)).buffered(25);
123
124 while let Some(result) = result_stream.next().await {
125 results.push(result);
126 }
127
128 Pagination {
129 total: self.total,
130 per_page: self.per_page,
131 page: self.page,
132 data: results,
133 }
134 }
135
136 pub async fn try_async_map<R: serde::Serialize, E, Fut: Future<Output = Result<R, E>>>(
137 self,
138 mapper: impl Fn(T) -> Fut,
139 ) -> Result<Pagination<R>, E> {
140 let mut results = Vec::new();
141 results.reserve_exact(self.data.len());
142 let mut result_stream =
143 futures_util::stream::iter(self.data.into_iter().map(mapper)).buffered(25);
144
145 while let Some(result) = result_stream.try_next().await? {
146 results.push(result);
147 }
148
149 Ok(Pagination {
150 total: self.total,
151 per_page: self.per_page,
152 page: self.page,
153 data: results,
154 })
155 }
156}
157
158pub type ModelExtensionList = std::sync::RwLock<Vec<Box<dyn ModelExtension + Send + Sync>>>;
159pub type ModelExtensionData = Vec<(compact_str::CompactString, Vec<u8>)>;
160pub type ModelExtensionMapType = Box<dyn erased_serde::Serialize>;
161
162pub trait ModelExtension {
163 fn extension_name(&self) -> &'static str;
164
165 fn extended_columns(&self, prefix: &str) -> BTreeMap<&'static str, compact_str::CompactString>;
166
167 fn map_extended(
168 &self,
169 prefix: &str,
170 row: &PgRow,
171 ) -> Result<ModelExtensionMapType, crate::database::DatabaseError>;
172}
173
174pub trait SafeModelExtension: ModelExtension {
175 type Value: Serialize + DeserializeOwned;
176
177 fn name() -> &'static str;
178}
179
180pub trait BaseModel: Serialize + DeserializeOwned {
181 const NAME: &'static str;
182
183 fn get_extension_list() -> &'static ModelExtensionList;
184 fn get_extension_data(&self) -> &ModelExtensionData;
185
186 fn register_model_extension(extension: impl ModelExtension + Send + Sync + 'static) {
188 let mut extensions = Self::get_extension_list().write().unwrap();
189
190 if extensions
191 .iter()
192 .any(|e| e.extension_name() == extension.extension_name())
193 {
194 return;
195 }
196
197 extensions.push(Box::new(extension));
198 }
199
200 fn parse_model_extension<Extension: SafeModelExtension>(
204 &self,
205 ) -> Result<Extension::Value, crate::database::DatabaseError>
206 where
207 Extension::Value: Serialize + DeserializeOwned,
208 {
209 let data = self.get_extension_data();
210
211 for (name, value) in data.iter() {
212 if name.as_str() == Extension::name() {
213 let deserialized =
214 rmp_serde::from_slice::<Extension::Value>(value).map_err(anyhow::Error::new)?;
215
216 return Ok(deserialized);
217 }
218 }
219
220 Err(crate::database::DatabaseError::Any(anyhow::anyhow!(
221 "model extension not found"
222 )))
223 }
224
225 fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString>;
226 fn columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
227 if let Ok(extensions) = Self::get_extension_list().read() {
228 let mut columns = Self::base_columns(prefix);
229
230 for extension in extensions.iter() {
231 columns.extend(extension.extended_columns(prefix.unwrap_or_default()));
232 }
233
234 columns
235 } else {
236 Self::base_columns(prefix)
237 }
238 }
239
240 #[inline]
241 fn columns_sql(prefix: Option<&str>) -> compact_str::CompactString {
242 Self::columns(prefix)
243 .iter()
244 .map(|(key, value)| compact_str::format_compact!("{key} as {value}"))
245 .join_compact(", ")
246 }
247
248 fn map_extensions(
249 prefix: &str,
250 row: &PgRow,
251 ) -> Result<ModelExtensionData, crate::database::DatabaseError> {
252 let mut data = Vec::new();
253
254 if let Ok(extensions) = Self::get_extension_list().read() {
255 for extension in extensions.iter() {
256 let value = extension.map_extended(prefix, row)?;
257 let serialized = rmp_serde::to_vec(&value).map_err(anyhow::Error::new)?;
258
259 data.push((
260 compact_str::CompactString::const_new(extension.extension_name()),
261 serialized,
262 ));
263 }
264 }
265
266 Ok(data)
267 }
268
269 fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError>;
270}
271
272#[async_trait::async_trait]
273pub trait EventEmittingModel: BaseModel {
274 type Event: Send + Sync + 'static;
275
276 fn get_event_emitter() -> &'static crate::events::EventEmitter<Self::Event>;
277
278 async fn register_event_handler<
279 F: Fn(crate::State, Arc<Self::Event>) -> Fut + Send + Sync + 'static,
280 Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
281 >(
282 listener: F,
283 ) -> crate::events::EventHandlerHandle {
284 Self::get_event_emitter()
285 .register_event_handler(listener)
286 .await
287 }
288
289 fn blocking_register_event_handler<
292 F: Fn(crate::State, Arc<Self::Event>) -> Fut + Send + Sync + 'static,
293 Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
294 >(
295 listener: F,
296 ) -> crate::events::EventHandlerHandle {
297 Self::get_event_emitter().blocking_register_event_handler(listener)
298 }
299}
300
301type CreateHandlerResult<'a> =
302 Pin<Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>>;
303type CreateHandler<M> = dyn for<'a> Fn(
304 &'a mut <M as CreatableModel>::CreateOptions<'_>,
305 &'a mut InsertQueryBuilder,
306 &'a crate::State,
307 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
308 ) -> CreateHandlerResult<'a>
309 + Send
310 + Sync;
311type CreateAfterHandler<M> = dyn for<'a> Fn(
312 &'a mut <M as CreatableModel>::CreateResult,
313 &'a <M as CreatableModel>::CreateOptions<'_>,
314 &'a crate::State,
315 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
316 ) -> CreateHandlerResult<'a>
317 + Send
318 + Sync;
319pub type CreateListenerList<M> =
320 Arc<ModelHandlerList<Box<CreateHandler<M>>, Box<CreateAfterHandler<M>>>>;
321
322#[async_trait::async_trait]
323pub trait CreatableModel: BaseModel + Send + Sync + 'static {
324 type CreateOptions<'a>: Send + Sync + Validate;
325 type CreateResult: Send;
326
327 fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>>;
328
329 async fn register_create_handler<
330 F: for<'a> Fn(
331 &'a mut Self::CreateOptions<'_>,
332 &'a mut InsertQueryBuilder,
333 &'a crate::State,
334 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
335 ) -> Pin<
336 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
337 > + Send
338 + Sync
339 + 'static,
340 >(
341 priority: ListenerPriority,
342 callback: F,
343 ) {
344 let erased = Box::new(callback) as Box<CreateHandler<Self>>;
345
346 Self::get_create_handlers()
347 .register_handler(priority, erased)
348 .await;
349 }
350
351 async fn register_after_create_handler<
352 F: for<'a> Fn(
353 &'a mut Self::CreateResult,
354 &'a Self::CreateOptions<'_>,
355 &'a crate::State,
356 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
357 ) -> Pin<
358 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
359 > + Send
360 + Sync
361 + 'static,
362 >(
363 priority: ListenerPriority,
364 callback: F,
365 ) {
366 let erased = Box::new(callback) as Box<CreateAfterHandler<Self>>;
367
368 Self::get_create_handlers()
369 .register_after_handler(priority, erased)
370 .await;
371 }
372
373 fn blocking_register_create_handler<
376 F: for<'a> Fn(
377 &'a mut Self::CreateOptions<'_>,
378 &'a mut InsertQueryBuilder,
379 &'a crate::State,
380 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
381 ) -> Pin<
382 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
383 > + Send
384 + Sync
385 + 'static,
386 >(
387 priority: ListenerPriority,
388 callback: F,
389 ) {
390 let erased = Box::new(callback) as Box<CreateHandler<Self>>;
391
392 Self::get_create_handlers().blocking_register_handler(priority, erased);
393 }
394
395 fn blocking_register_after_create_handler<
398 F: for<'a> Fn(
399 &'a mut Self::CreateResult,
400 &'a Self::CreateOptions<'_>,
401 &'a crate::State,
402 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
403 ) -> Pin<
404 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
405 > + Send
406 + Sync
407 + 'static,
408 >(
409 priority: ListenerPriority,
410 callback: F,
411 ) {
412 let erased = Box::new(callback) as Box<CreateAfterHandler<Self>>;
413
414 Self::get_create_handlers().blocking_register_after_handler(priority, erased);
415 }
416
417 async fn run_create_handlers(
418 options: &mut Self::CreateOptions<'_>,
419 query_builder: &mut InsertQueryBuilder,
420 state: &crate::State,
421 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
422 ) -> Result<(), crate::database::DatabaseError> {
423 let listeners = Self::get_create_handlers().before_handlers.read().await;
424
425 for listener in listeners.iter() {
426 (*listener.callback)(options, query_builder, state, transaction).await?;
427 }
428
429 Ok(())
430 }
431
432 async fn run_after_create_handlers(
433 result: &mut Self::CreateResult,
434 options: &Self::CreateOptions<'_>,
435 state: &crate::State,
436 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
437 ) -> Result<(), crate::database::DatabaseError> {
438 let listeners = Self::get_create_handlers().after_handlers.read().await;
439
440 for listener in listeners.iter() {
441 (*listener.callback)(result, options, state, transaction).await?;
442 }
443
444 Ok(())
445 }
446
447 async fn create_with_transaction(
448 state: &crate::State,
449 options: Self::CreateOptions<'_>,
450 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
451 ) -> Result<Self::CreateResult, crate::database::DatabaseError>;
452
453 async fn create(
454 state: &crate::State,
455 options: Self::CreateOptions<'_>,
456 ) -> Result<Self::CreateResult, crate::database::DatabaseError> {
457 let mut transaction = state.database.write().begin().await?;
458
459 let result = Self::create_with_transaction(state, options, &mut transaction).await?;
460
461 transaction.commit().await?;
462
463 Ok(result)
464 }
465}
466
467type UpdateHandlerResult<'a> =
468 Pin<Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>>;
469type UpdateHandler<M> = dyn for<'a> Fn(
470 &'a mut M,
471 &'a mut <M as UpdatableModel>::UpdateOptions,
472 &'a mut UpdateQueryBuilder,
473 &'a crate::State,
474 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
475 ) -> UpdateHandlerResult<'a>
476 + Send
477 + Sync;
478type UpdateAfterHandler<M> = dyn for<'a> Fn(
479 &'a mut M,
480 &'a crate::State,
481 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
482 ) -> UpdateHandlerResult<'a>
483 + Send
484 + Sync;
485pub type UpdateHandlerList<M> =
486 Arc<ModelHandlerList<Box<UpdateHandler<M>>, Box<UpdateAfterHandler<M>>>>;
487
488#[async_trait::async_trait]
489pub trait UpdatableModel: BaseModel + Send + Sync + 'static {
490 type UpdateOptions: Send + Sync + Default + ToSchema + DeserializeOwned + Serialize + Validate;
491
492 fn get_update_handlers() -> &'static LazyLock<UpdateHandlerList<Self>>;
493
494 async fn register_update_handler<
495 F: for<'a> Fn(
496 &'a mut Self,
497 &'a mut Self::UpdateOptions,
498 &'a mut UpdateQueryBuilder,
499 &'a crate::State,
500 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
501 ) -> Pin<
502 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
503 > + Send
504 + Sync
505 + 'static,
506 >(
507 priority: ListenerPriority,
508 callback: F,
509 ) {
510 let erased = Box::new(callback) as Box<UpdateHandler<Self>>;
511
512 Self::get_update_handlers()
513 .register_handler(priority, erased)
514 .await;
515 }
516
517 async fn register_after_update_handler<
518 F: for<'a> Fn(
519 &'a mut Self,
520 &'a crate::State,
521 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
522 ) -> Pin<
523 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
524 > + Send
525 + Sync
526 + 'static,
527 >(
528 priority: ListenerPriority,
529 callback: F,
530 ) {
531 let erased = Box::new(callback) as Box<UpdateAfterHandler<Self>>;
532
533 Self::get_update_handlers()
534 .register_after_handler(priority, erased)
535 .await;
536 }
537
538 fn blocking_register_update_handler<
541 F: for<'a> Fn(
542 &'a mut Self,
543 &'a mut Self::UpdateOptions,
544 &'a mut UpdateQueryBuilder,
545 &'a crate::State,
546 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
547 ) -> Pin<
548 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
549 > + Send
550 + Sync
551 + 'static,
552 >(
553 priority: ListenerPriority,
554 callback: F,
555 ) {
556 let erased = Box::new(callback) as Box<UpdateHandler<Self>>;
557
558 Self::get_update_handlers().blocking_register_handler(priority, erased);
559 }
560
561 fn blocking_register_after_update_handler<
564 F: for<'a> Fn(
565 &'a mut Self,
566 &'a crate::State,
567 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
568 ) -> Pin<
569 Box<dyn Future<Output = Result<(), crate::database::DatabaseError>> + Send + 'a>,
570 > + Send
571 + Sync
572 + 'static,
573 >(
574 priority: ListenerPriority,
575 callback: F,
576 ) {
577 let erased = Box::new(callback) as Box<UpdateAfterHandler<Self>>;
578
579 Self::get_update_handlers().blocking_register_after_handler(priority, erased);
580 }
581
582 async fn run_update_handlers(
583 &mut self,
584 options: &mut Self::UpdateOptions,
585 query_builder: &mut UpdateQueryBuilder,
586 state: &crate::State,
587 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
588 ) -> Result<(), crate::database::DatabaseError> {
589 let listeners = Self::get_update_handlers().before_handlers.read().await;
590
591 for listener in listeners.iter() {
592 (*listener.callback)(self, options, query_builder, state, transaction).await?;
593 }
594
595 Ok(())
596 }
597
598 async fn run_after_update_handlers(
599 &mut self,
600 state: &crate::State,
601 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
602 ) -> Result<(), crate::database::DatabaseError> {
603 let listeners = Self::get_update_handlers().after_handlers.read().await;
604
605 for listener in listeners.iter() {
606 (*listener.callback)(self, state, transaction).await?;
607 }
608
609 Ok(())
610 }
611
612 async fn update_with_transaction(
613 &mut self,
614 state: &crate::State,
615 options: Self::UpdateOptions,
616 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
617 ) -> Result<(), crate::database::DatabaseError>;
618
619 async fn update(
620 &mut self,
621 state: &crate::State,
622 options: Self::UpdateOptions,
623 ) -> Result<(), crate::database::DatabaseError> {
624 let mut transaction = state.database.write().begin().await?;
625
626 self.update_with_transaction(state, options, &mut transaction)
627 .await?;
628
629 transaction.commit().await?;
630
631 Ok(())
632 }
633}
634
635type DeleteHandlerResult<'a> = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>;
636type DeleteHandler<M> = dyn for<'a> Fn(
637 &'a M,
638 &'a <M as DeletableModel>::DeleteOptions,
639 &'a crate::State,
640 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
641 ) -> DeleteHandlerResult<'a>
642 + Send
643 + Sync;
644type DeleteAfterHandler<M> = dyn for<'a> Fn(
645 &'a M,
646 &'a <M as DeletableModel>::DeleteOptions,
647 &'a crate::State,
648 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
649 ) -> DeleteHandlerResult<'a>
650 + Send
651 + Sync;
652pub type DeleteHandlerList<M> =
653 Arc<ModelHandlerList<Box<DeleteHandler<M>>, Box<DeleteAfterHandler<M>>>>;
654
655#[async_trait::async_trait]
656pub trait DeletableModel: BaseModel + Send + Sync + 'static {
657 type DeleteOptions: Send + Sync + Default + Clone;
658
659 fn get_delete_handlers() -> &'static LazyLock<DeleteHandlerList<Self>>;
660
661 async fn register_delete_handler<
662 F: for<'a> Fn(
663 &'a Self,
664 &'a Self::DeleteOptions,
665 &'a crate::State,
666 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
667 )
668 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
669 + Send
670 + Sync
671 + 'static,
672 >(
673 priority: ListenerPriority,
674 callback: F,
675 ) {
676 let erased = Box::new(callback) as Box<DeleteHandler<Self>>;
677
678 Self::get_delete_handlers()
679 .register_handler(priority, erased)
680 .await;
681 }
682
683 async fn register_after_delete_handler<
684 F: for<'a> Fn(
685 &'a Self,
686 &'a Self::DeleteOptions,
687 &'a crate::State,
688 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
689 )
690 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
691 + Send
692 + Sync
693 + 'static,
694 >(
695 priority: ListenerPriority,
696 callback: F,
697 ) {
698 let erased = Box::new(callback) as Box<DeleteAfterHandler<Self>>;
699
700 Self::get_delete_handlers()
701 .register_after_handler(priority, erased)
702 .await;
703 }
704
705 fn blocking_register_delete_handler<
708 F: for<'a> Fn(
709 &'a Self,
710 &'a Self::DeleteOptions,
711 &'a crate::State,
712 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
713 )
714 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
715 + Send
716 + Sync
717 + 'static,
718 >(
719 priority: ListenerPriority,
720 callback: F,
721 ) {
722 let erased = Box::new(callback) as Box<DeleteHandler<Self>>;
723
724 Self::get_delete_handlers().blocking_register_handler(priority, erased);
725 }
726
727 fn blocking_register_after_delete_handler<
730 F: for<'a> Fn(
731 &'a Self,
732 &'a Self::DeleteOptions,
733 &'a crate::State,
734 &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
735 )
736 -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>>
737 + Send
738 + Sync
739 + 'static,
740 >(
741 priority: ListenerPriority,
742 callback: F,
743 ) {
744 let erased = Box::new(callback) as Box<DeleteAfterHandler<Self>>;
745
746 Self::get_delete_handlers().blocking_register_after_handler(priority, erased);
747 }
748
749 async fn run_delete_handlers(
750 &self,
751 options: &Self::DeleteOptions,
752 state: &crate::State,
753 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
754 ) -> Result<(), anyhow::Error> {
755 let listeners = Self::get_delete_handlers().before_handlers.read().await;
756
757 for listener in listeners.iter() {
758 (*listener.callback)(self, options, state, transaction).await?;
759 }
760
761 Ok(())
762 }
763
764 async fn run_after_delete_handlers(
765 &self,
766 options: &Self::DeleteOptions,
767 state: &crate::State,
768 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
769 ) -> Result<(), anyhow::Error> {
770 let listeners = Self::get_delete_handlers().after_handlers.read().await;
771
772 for listener in listeners.iter() {
773 (*listener.callback)(self, options, state, transaction).await?;
774 }
775
776 Ok(())
777 }
778
779 async fn delete_with_transaction(
780 &self,
781 state: &crate::State,
782 options: Self::DeleteOptions,
783 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
784 ) -> Result<(), anyhow::Error>;
785
786 async fn delete(
787 &self,
788 state: &crate::State,
789 options: Self::DeleteOptions,
790 ) -> Result<(), anyhow::Error> {
791 let mut transaction = state.database.write().begin().await?;
792
793 self.delete_with_transaction(state, options, &mut transaction)
794 .await?;
795
796 transaction.commit().await?;
797
798 Ok(())
799 }
800}
801
802#[async_trait::async_trait]
803pub trait ByUuid: BaseModel {
804 async fn by_uuid(
805 database: &crate::database::Database,
806 uuid: uuid::Uuid,
807 ) -> Result<Self, DatabaseError>;
808
809 async fn by_uuid_with_transaction(
810 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
811 uuid: uuid::Uuid,
812 ) -> Result<Self, DatabaseError>;
813
814 async fn by_uuid_cached(
815 database: &crate::database::Database,
816 uuid: uuid::Uuid,
817 ) -> Result<Self, anyhow::Error> {
818 database
819 .cache
820 .cached(&format!("{}::{uuid}", Self::NAME), 10, || {
821 Self::by_uuid(database, uuid)
822 })
823 .await
824 }
825
826 async fn by_uuid_optional(
827 database: &crate::database::Database,
828 uuid: uuid::Uuid,
829 ) -> Result<Option<Self>, DatabaseError> {
830 match Self::by_uuid(database, uuid).await {
831 Ok(res) => Ok(Some(res)),
832 Err(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) => Ok(None),
833 Err(err) => Err(err),
834 }
835 }
836
837 async fn by_uuid_optional_with_transaction(
838 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
839 uuid: uuid::Uuid,
840 ) -> Result<Option<Self>, DatabaseError> {
841 match Self::by_uuid_with_transaction(transaction, uuid).await {
842 Ok(res) => Ok(Some(res)),
843 Err(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) => Ok(None),
844 Err(err) => Err(err),
845 }
846 }
847
848 async fn by_uuid_optional_cached(
849 database: &crate::database::Database,
850 uuid: uuid::Uuid,
851 ) -> Result<Option<Self>, anyhow::Error> {
852 match Self::by_uuid_cached(database, uuid).await {
853 Ok(res) => Ok(Some(res)),
854 Err(err) => {
855 if let Some(DatabaseError::Sqlx(sqlx::Error::RowNotFound)) =
856 err.downcast_ref::<DatabaseError>()
857 {
858 Ok(None)
859 } else {
860 Err(err)
861 }
862 }
863 }
864 }
865
866 #[inline]
867 fn get_fetchable(uuid: uuid::Uuid) -> Fetchable<Self> {
868 Fetchable {
869 uuid,
870 _model: PhantomData,
871 }
872 }
873
874 #[inline]
875 fn get_fetchable_from_row(row: &PgRow, column: impl AsRef<str>) -> Option<Fetchable<Self>> {
876 match row.try_get(column.as_ref()) {
877 Ok(uuid) => Some(Fetchable {
878 uuid,
879 _model: PhantomData,
880 }),
881 Err(_) => None,
882 }
883 }
884}
885
886#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
887pub enum ListenerPriority {
888 Highest,
889 High,
890 #[default]
891 Normal,
892 Low,
893 Lowest,
894}
895
896impl ListenerPriority {
897 #[inline]
898 fn rank(self) -> u8 {
899 match self {
900 Self::Highest => 5,
901 Self::High => 4,
902 Self::Normal => 3,
903 Self::Low => 2,
904 Self::Lowest => 1,
905 }
906 }
907}
908
909impl PartialOrd for ListenerPriority {
910 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
911 Some(self.cmp(other))
912 }
913}
914
915impl Ord for ListenerPriority {
916 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
917 let self_rank = self.rank();
918 let other_rank = other.rank();
919
920 other_rank.cmp(&self_rank)
921 }
922}
923
924#[async_trait::async_trait]
925impl<F: Send + Sync, AfterF: Send + Sync> crate::events::DisconnectEventHandler
926 for ModelHandlerList<F, AfterF>
927{
928 #[inline]
929 async fn disconnect(&self, id: uuid::Uuid) {
930 self.before_handlers.write().await.retain(|l| l.uuid != id);
931 self.after_handlers.write().await.retain(|l| l.uuid != id);
932 }
933
934 #[inline]
935 fn blocking_disconnect(&self, id: uuid::Uuid) {
936 self.before_handlers
937 .blocking_write()
938 .retain(|l| l.uuid != id);
939 self.after_handlers
940 .blocking_write()
941 .retain(|l| l.uuid != id);
942 }
943}
944
945pub struct ModelHandlerList<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> {
946 before_handlers: RwLock<Vec<ModelHandler<F>>>,
947 after_handlers: RwLock<Vec<ModelHandler<AfterF>>>,
948}
949
950impl<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> Default
951 for ModelHandlerList<F, AfterF>
952{
953 fn default() -> Self {
954 Self {
955 before_handlers: RwLock::new(Vec::new()),
956 after_handlers: RwLock::new(Vec::new()),
957 }
958 }
959}
960
961impl<F: Send + Sync + 'static, AfterF: Send + Sync + 'static> ModelHandlerList<F, AfterF> {
962 pub async fn register_handler(
963 self: &Arc<Self>,
964 priority: ListenerPriority,
965 callback: F,
966 ) -> ModelHandlerHandle {
967 let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
968
969 let mut self_listeners = self.before_handlers.write().await;
970 self_listeners.push(listener);
971 self_listeners.sort_by_key(|a| a.priority);
972
973 aborter
974 }
975
976 pub async fn register_after_handler(
977 self: &Arc<Self>,
978 priority: ListenerPriority,
979 callback: AfterF,
980 ) -> ModelHandlerHandle {
981 let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
982
983 let mut self_listeners = self.after_handlers.write().await;
984 self_listeners.push(listener);
985 self_listeners.sort_by_key(|a| a.priority);
986
987 aborter
988 }
989
990 pub fn blocking_register_handler(
993 self: &Arc<Self>,
994 priority: ListenerPriority,
995 callback: F,
996 ) -> ModelHandlerHandle {
997 let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
998
999 let mut self_listeners = self.before_handlers.blocking_write();
1000 self_listeners.push(listener);
1001 self_listeners.sort_by_key(|a| a.priority);
1002
1003 aborter
1004 }
1005
1006 pub fn blocking_register_after_handler(
1009 self: &Arc<Self>,
1010 priority: ListenerPriority,
1011 callback: AfterF,
1012 ) -> ModelHandlerHandle {
1013 let (listener, aborter) = ModelHandler::new(callback, priority, self.clone());
1014
1015 let mut self_listeners = self.after_handlers.blocking_write();
1016 self_listeners.push(listener);
1017 self_listeners.sort_by_key(|a| a.priority);
1018
1019 aborter
1020 }
1021}
1022
1023pub struct ModelHandler<F: Send + Sync + 'static> {
1024 uuid: uuid::Uuid,
1025 priority: ListenerPriority,
1026
1027 pub callback: F,
1028}
1029
1030impl<F: Send + Sync + 'static> ModelHandler<F> {
1031 pub(crate) fn new(
1032 callback: F,
1033 priority: ListenerPriority,
1034 list: Arc<dyn crate::events::DisconnectEventHandler + Send + Sync>,
1035 ) -> (Self, ModelHandlerHandle) {
1036 let handler = Self {
1037 uuid: uuid::Uuid::new_v4(),
1038 priority,
1039 callback,
1040 };
1041 let handle = ModelHandlerHandle {
1042 list_ref: list,
1043 id: handler.uuid,
1044 };
1045 (handler, handle)
1046 }
1047}
1048
1049pub struct ModelHandlerHandle {
1050 list_ref: Arc<dyn crate::events::DisconnectEventHandler + Send + Sync>,
1051 id: uuid::Uuid,
1052}
1053
1054impl ModelHandlerHandle {
1055 pub async fn disconnect(&self) {
1056 self.list_ref.disconnect(self.id).await;
1057 }
1058
1059 pub fn blocking_disconnect(&self) {
1062 self.list_ref.blocking_disconnect(self.id);
1063 }
1064}
1065
1066#[derive(Serialize, Deserialize, Clone, Copy)]
1067pub struct Fetchable<M: ByUuid> {
1068 pub uuid: uuid::Uuid,
1069 #[serde(skip)]
1070 _model: PhantomData<M>,
1071}
1072
1073impl<M: ByUuid + Send> Fetchable<M> {
1074 #[inline]
1075 pub async fn fetch(&self, database: &crate::database::Database) -> Result<M, DatabaseError> {
1076 M::by_uuid(database, self.uuid).await
1077 }
1078
1079 #[inline]
1080 pub async fn fetch_cached(
1081 &self,
1082 database: &crate::database::Database,
1083 ) -> Result<M, anyhow::Error> {
1084 M::by_uuid_cached(database, self.uuid).await
1085 }
1086
1087 #[inline]
1088 pub async fn fetch_optional(
1089 &self,
1090 database: &crate::database::Database,
1091 ) -> Result<Option<M>, DatabaseError> {
1092 M::by_uuid_optional(database, self.uuid).await
1093 }
1094
1095 #[inline]
1096 pub async fn fetch_optional_cached(
1097 &self,
1098 database: &crate::database::Database,
1099 ) -> Result<Option<M>, anyhow::Error> {
1100 M::by_uuid_optional_cached(database, self.uuid).await
1101 }
1102}
1103
1104pub struct InsertQueryBuilder<'a> {
1105 table: &'a str,
1106 columns: Vec<&'a str>,
1107 expressions: Vec<String>,
1108 arguments: PgArguments,
1109 returning_clause: Option<&'a str>,
1110}
1111
1112impl<'a> InsertQueryBuilder<'a> {
1113 pub fn new(table: &'a str) -> Self {
1114 Self {
1115 table,
1116 columns: Vec::new(),
1117 expressions: Vec::new(),
1118 arguments: PgArguments::default(),
1119 returning_clause: None,
1120 }
1121 }
1122
1123 pub fn set<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1124 &mut self,
1125 column: &'a str,
1126 value: T,
1127 ) -> &mut Self {
1128 if self.columns.contains(&column) {
1129 return self;
1130 }
1131
1132 if self.arguments.add(value).is_ok() {
1133 self.columns.push(column);
1134 let idx = self.arguments.len();
1135 self.expressions.push(format!("${}", idx));
1136 }
1137
1138 self
1139 }
1140
1141 pub fn set_expr<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1142 &mut self,
1143 column: &'a str,
1144 expression: &str,
1145 values: Vec<T>,
1146 ) -> &mut Self {
1147 if self.columns.contains(&column) {
1148 return self;
1149 }
1150
1151 let start_len = self.arguments.len();
1152
1153 for value in values {
1154 if self.arguments.add(value).is_err() {
1155 return self;
1156 }
1157 }
1158
1159 let mut expr = expression.to_string();
1160 let added_count = self.arguments.len() - start_len;
1161
1162 for i in (1..=added_count).rev() {
1163 let global_idx = start_len + i;
1164 expr = expr.replace(&format!("${}", i), &format!("${}", global_idx));
1165 }
1166
1167 self.columns.push(column);
1168 self.expressions.push(expr);
1169
1170 self
1171 }
1172
1173 pub fn returning(mut self, clause: &'a str) -> Self {
1174 self.returning_clause = Some(clause);
1175 self
1176 }
1177
1178 fn build_sql(&self) -> String {
1179 let columns_sql = self.columns.join(", ");
1180 let values_sql = self.expressions.join(", ");
1181
1182 let mut sql = format!(
1183 "INSERT INTO {} ({}) VALUES ({})",
1184 self.table, columns_sql, values_sql
1185 );
1186
1187 if let Some(clause) = self.returning_clause {
1188 sql.push_str(" RETURNING ");
1189 sql.push_str(clause);
1190 }
1191
1192 sql
1193 }
1194
1195 pub async fn execute(
1196 self,
1197 executor: impl sqlx::Executor<'a, Database = Postgres>,
1198 ) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error> {
1199 let sql = self.build_sql();
1200 sqlx::query_with(&sql, self.arguments)
1201 .execute(executor)
1202 .await
1203 }
1204
1205 pub async fn fetch_one(
1206 self,
1207 executor: impl sqlx::Executor<'a, Database = Postgres>,
1208 ) -> Result<sqlx::postgres::PgRow, sqlx::Error> {
1209 let sql = self.build_sql();
1210 sqlx::query_with(&sql, self.arguments)
1211 .fetch_one(executor)
1212 .await
1213 }
1214}
1215
1216pub struct UpdateQueryBuilder<'a> {
1217 builder: QueryBuilder<'a, Postgres>,
1218 updated_fields: HashSet<&'a str>,
1219 has_set_fields: bool,
1220 has_where: bool,
1221}
1222
1223impl<'a> UpdateQueryBuilder<'a> {
1224 pub fn new(table: &'a str) -> Self {
1225 let mut builder = QueryBuilder::new("UPDATE ");
1226 builder.push(table);
1227 builder.push(" SET ");
1228
1229 Self {
1230 builder,
1231 updated_fields: HashSet::new(),
1232 has_set_fields: false,
1233 has_where: false,
1234 }
1235 }
1236
1237 pub fn set<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1240 &mut self,
1241 column: &'a str,
1242 value: Option<T>,
1243 ) -> &mut Self {
1244 let Some(value) = value else {
1245 return self;
1246 };
1247
1248 if !self.updated_fields.insert(column) {
1249 return self;
1250 }
1251
1252 if self.has_set_fields {
1253 self.builder.push(", ");
1254 }
1255
1256 self.builder.push(column);
1257 self.builder.push(" = ");
1258 self.builder.push_bind(value);
1259
1260 self.has_set_fields = true;
1261 self
1262 }
1263
1264 pub fn where_eq<T: 'a + sqlx::Encode<'a, Postgres> + sqlx::Type<Postgres> + Send>(
1265 &mut self,
1266 column: &'a str,
1267 value: T,
1268 ) -> &mut Self {
1269 if self.has_where {
1270 self.builder.push(" AND ");
1271 } else {
1272 self.builder.push(" WHERE ");
1273 self.has_where = true;
1274 }
1275
1276 self.builder.push(column);
1277 self.builder.push(" = ");
1278 self.builder.push_bind(value);
1279 self
1280 }
1281
1282 pub async fn execute(
1283 mut self,
1284 executor: impl sqlx::Executor<'a, Database = Postgres>,
1285 ) -> Result<sqlx::any::AnyQueryResult, sqlx::Error> {
1286 if !self.has_set_fields {
1287 return Ok(sqlx::any::AnyQueryResult::default());
1288 }
1289
1290 let query = self.builder.build();
1291 query.execute(executor).await.map(|r| r.into())
1292 }
1293}
1294
1295pub struct OrderedJson<T>(pub T);
1297
1298impl<T: Serialize> sqlx::Encode<'_, sqlx::Postgres> for OrderedJson<T> {
1299 fn encode_by_ref(&self, buf: &mut PgArgumentBuffer) -> Result<IsNull, BoxDynError> {
1300 serde_json::to_writer(&mut **buf, &self.0)?;
1301 Ok(IsNull::No)
1302 }
1303}
1304
1305impl<T> sqlx::Type<sqlx::Postgres> for OrderedJson<T> {
1306 fn type_info() -> PgTypeInfo {
1307 PgTypeInfo::with_oid(sqlx::postgres::types::Oid(114))
1309 }
1310}
1311
1312#[async_trait::async_trait]
1313pub trait IntoApiObject {
1314 type ApiObject: Send;
1315 type ExtraArgs<'a>: Send;
1316
1317 async fn into_api_object<'a>(
1318 self,
1319 state: &crate::State,
1320 args: Self::ExtraArgs<'a>,
1321 ) -> Result<Self::ApiObject, DatabaseError>;
1322}
1323
1324#[async_trait::async_trait]
1325pub trait IntoAdminApiObject {
1326 type AdminApiObject: Send;
1327 type ExtraArgs<'a>: Send;
1328
1329 async fn into_admin_api_object<'a>(
1330 self,
1331 state: &crate::State,
1332 args: Self::ExtraArgs<'a>,
1333 ) -> Result<Self::AdminApiObject, DatabaseError>;
1334}