1use crate::{
2 models::{InsertQueryBuilder, UpdateQueryBuilder},
3 prelude::*,
4};
5use garde::Validate;
6use serde::{Deserialize, Serialize};
7use sqlx::{Row, postgres::PgRow, prelude::Type};
8use std::{
9 collections::{BTreeMap, HashMap},
10 hash::Hash,
11 sync::{Arc, LazyLock},
12};
13use tokio::sync::Mutex;
14use utoipa::ToSchema;
15
/// An open sqlx transaction against one of the supported database backends.
pub enum DatabaseTransaction<'a> {
    /// A transaction on a MySQL connection.
    Mysql(sqlx::Transaction<'a, sqlx::MySql>),
    /// A transaction on a PostgreSQL connection, paired with a shared handle to a
    /// PostgreSQL pool.
    // NOTE(review): presumably the pool the transaction was started from — confirm at the
    // construction site, which is not in this file.
    Postgres(
        sqlx::Transaction<'a, sqlx::Postgres>,
        Arc<sqlx::Pool<sqlx::Postgres>>,
    ),
}
23
/// A shared connection pool for one of the supported database backends.
///
/// Cloning only clones the inner [`Arc`], so every clone refers to the same pool.
#[derive(Clone)]
pub enum DatabasePool {
    /// Pool of MySQL connections.
    Mysql(Arc<sqlx::Pool<sqlx::MySql>>),
    /// Pool of PostgreSQL connections.
    Postgres(Arc<sqlx::Pool<sqlx::Postgres>>),
}
29
30type DatabasePoolValue = (std::time::Instant, DatabasePool);
31static DATABASE_CLIENTS: LazyLock<Arc<Mutex<HashMap<uuid::Uuid, DatabasePoolValue>>>> =
32 LazyLock::new(|| {
33 let clients = Arc::new(Mutex::new(HashMap::<uuid::Uuid, DatabasePoolValue>::new()));
34
35 tokio::spawn({
36 let clients = Arc::clone(&clients);
37 async move {
38 loop {
39 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
40
41 let mut clients = clients.lock().await;
42 clients.retain(|_, &mut (last_used, _)| {
43 last_used.elapsed() < std::time::Duration::from_secs(300)
44 });
45 }
46 }
47 });
48
49 clients
50 });
51
// The database engine a host runs.
//
// serde and the generated schema use lowercase variant names; sqlx maps the enum to the
// `database_type` type using SCREAMING_SNAKE_CASE variant names.
#[derive(ToSchema, Serialize, Deserialize, Type, PartialEq, Eq, Hash, Clone, Copy)]
#[serde(rename_all = "lowercase")]
#[schema(rename_all = "lowercase")]
#[sqlx(type_name = "database_type", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum DatabaseType {
    // A MySQL server.
    Mysql,
    // A PostgreSQL server.
    Postgres,
}
60
/// A row of the `database_hosts` table: an external database server together with the
/// credentials used to connect to it.
#[derive(Serialize, Deserialize, Clone)]
pub struct DatabaseHost {
    /// Primary identifier; also the key of the connection-pool cache.
    pub uuid: uuid::Uuid,

    /// Display name; `all_with_pagination` searches on this column.
    pub name: compact_str::CompactString,
    /// Engine running on the host; selects which kind of pool `get_connection` opens.
    pub r#type: DatabaseType,

    // NOTE(review): the two flags below are only stored and passed through in this file;
    // what they gate is decided elsewhere.
    pub deployment_enabled: bool,
    pub maintenance_enabled: bool,

    /// Optional address exposed through the non-admin API object instead of `host`.
    pub public_host: Option<compact_str::CompactString>,
    /// Address `get_connection` connects to.
    pub host: compact_str::CompactString,
    /// Optional port exposed through the non-admin API object instead of `port`.
    pub public_port: Option<i32>,
    /// Port `get_connection` connects to (cast to `u16` there).
    pub port: i32,

    /// Login used when opening the pool.
    pub username: compact_str::CompactString,
    /// Encrypted password bytes as produced by `Database::encrypt`; decrypted on demand.
    pub password: Vec<u8>,

    /// Creation timestamp without a time zone; converted with `and_utc()` for the admin API.
    pub created: chrono::NaiveDateTime,
}
81
82impl BaseModel for DatabaseHost {
83 const NAME: &'static str = "database_host";
84
85 #[inline]
86 fn columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
87 let prefix = prefix.unwrap_or_default();
88
89 BTreeMap::from([
90 (
91 "database_hosts.uuid",
92 compact_str::format_compact!("{prefix}uuid"),
93 ),
94 (
95 "database_hosts.name",
96 compact_str::format_compact!("{prefix}name"),
97 ),
98 (
99 "database_hosts.type",
100 compact_str::format_compact!("{prefix}type"),
101 ),
102 (
103 "database_hosts.deployment_enabled",
104 compact_str::format_compact!("{prefix}deployment_enabled"),
105 ),
106 (
107 "database_hosts.maintenance_enabled",
108 compact_str::format_compact!("{prefix}maintenance_enabled"),
109 ),
110 (
111 "database_hosts.public_host",
112 compact_str::format_compact!("{prefix}public_host"),
113 ),
114 (
115 "database_hosts.host",
116 compact_str::format_compact!("{prefix}host"),
117 ),
118 (
119 "database_hosts.public_port",
120 compact_str::format_compact!("{prefix}public_port"),
121 ),
122 (
123 "database_hosts.port",
124 compact_str::format_compact!("{prefix}port"),
125 ),
126 (
127 "database_hosts.username",
128 compact_str::format_compact!("{prefix}username"),
129 ),
130 (
131 "database_hosts.password",
132 compact_str::format_compact!("{prefix}password"),
133 ),
134 (
135 "database_hosts.created",
136 compact_str::format_compact!("{prefix}created"),
137 ),
138 ])
139 }
140
141 #[inline]
142 fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
143 let prefix = prefix.unwrap_or_default();
144
145 Ok(Self {
146 uuid: row.try_get(compact_str::format_compact!("{prefix}uuid").as_str())?,
147 name: row.try_get(compact_str::format_compact!("{prefix}name").as_str())?,
148 r#type: row.try_get(compact_str::format_compact!("{prefix}type").as_str())?,
149 deployment_enabled: row
150 .try_get(compact_str::format_compact!("{prefix}deployment_enabled").as_str())?,
151 maintenance_enabled: row
152 .try_get(compact_str::format_compact!("{prefix}maintenance_enabled").as_str())?,
153 public_host: row
154 .try_get(compact_str::format_compact!("{prefix}public_host").as_str())?,
155 host: row.try_get(compact_str::format_compact!("{prefix}host").as_str())?,
156 public_port: row
157 .try_get(compact_str::format_compact!("{prefix}public_port").as_str())?,
158 port: row.try_get(compact_str::format_compact!("{prefix}port").as_str())?,
159 username: row.try_get(compact_str::format_compact!("{prefix}username").as_str())?,
160 password: row.try_get(compact_str::format_compact!("{prefix}password").as_str())?,
161 created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
162 })
163 }
164}
165
166impl DatabaseHost {
167 pub async fn get_connection(
168 &self,
169 database: &crate::database::Database,
170 ) -> Result<DatabasePool, crate::database::DatabaseError> {
171 let mut clients = DATABASE_CLIENTS.lock().await;
172
173 if let Some((last_used, pool)) = clients.get_mut(&self.uuid) {
174 *last_used = std::time::Instant::now();
175
176 return Ok(pool.clone());
177 }
178
179 drop(clients);
180
181 let password = database.decrypt(self.password.clone()).await?;
182
183 let pool = match self.r#type {
184 DatabaseType::Mysql => {
185 let options = sqlx::mysql::MySqlConnectOptions::new()
186 .host(&self.host)
187 .port(self.port as u16)
188 .username(&self.username)
189 .password(&password);
190
191 let pool = sqlx::Pool::connect_with(options).await?;
192 DatabasePool::Mysql(Arc::new(pool))
193 }
194 DatabaseType::Postgres => {
195 let options = sqlx::postgres::PgConnectOptions::new()
196 .host(&self.host)
197 .port(self.port as u16)
198 .username(&self.username)
199 .password(&password)
200 .database("postgres");
201
202 let pool = sqlx::Pool::connect_with(options).await?;
203 DatabasePool::Postgres(Arc::new(pool))
204 }
205 };
206
207 DATABASE_CLIENTS
208 .lock()
209 .await
210 .insert(self.uuid, (std::time::Instant::now(), pool.clone()));
211 Ok(pool)
212 }
213
214 pub async fn all_with_pagination(
215 database: &crate::database::Database,
216 page: i64,
217 per_page: i64,
218 search: Option<&str>,
219 ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
220 let offset = (page - 1) * per_page;
221
222 let rows = sqlx::query(&format!(
223 r#"
224 SELECT {}, COUNT(*) OVER() AS total_count
225 FROM database_hosts
226 WHERE ($1 IS NULL OR database_hosts.name ILIKE '%' || $1 || '%')
227 ORDER BY database_hosts.created
228 LIMIT $2 OFFSET $3
229 "#,
230 Self::columns_sql(None)
231 ))
232 .bind(search)
233 .bind(per_page)
234 .bind(offset)
235 .fetch_all(database.read())
236 .await?;
237
238 Ok(super::Pagination {
239 total: rows
240 .first()
241 .map_or(Ok(0), |row| row.try_get("total_count"))?,
242 per_page,
243 page,
244 data: rows
245 .into_iter()
246 .map(|row| Self::map(None, &row))
247 .try_collect_vec()?,
248 })
249 }
250
251 pub async fn by_location_uuid_uuid(
252 database: &crate::database::Database,
253 location_uuid: uuid::Uuid,
254 uuid: uuid::Uuid,
255 ) -> Result<Option<Self>, crate::database::DatabaseError> {
256 let row = sqlx::query(&format!(
257 r#"
258 SELECT {}
259 FROM database_hosts
260 JOIN location_database_hosts ON location_database_hosts.database_host_uuid = database_hosts.uuid AND location_database_hosts.location_uuid = $1
261 WHERE database_hosts.uuid = $2
262 "#,
263 Self::columns_sql(None)
264 ))
265 .bind(location_uuid)
266 .bind(uuid)
267 .fetch_optional(database.read())
268 .await?;
269
270 row.try_map(|row| Self::map(None, &row))
271 }
272
273 #[inline]
274 pub fn into_admin_api_object(self) -> AdminApiDatabaseHost {
275 AdminApiDatabaseHost {
276 uuid: self.uuid,
277 name: self.name,
278 r#type: self.r#type,
279 deployment_enabled: self.deployment_enabled,
280 maintenance_enabled: self.maintenance_enabled,
281 public_host: self.public_host,
282 host: self.host,
283 public_port: self.public_port,
284 port: self.port,
285 username: self.username,
286 created: self.created.and_utc(),
287 }
288 }
289
290 #[inline]
291 pub fn into_api_object(self) -> ApiDatabaseHost {
292 ApiDatabaseHost {
293 uuid: self.uuid,
294 name: self.name,
295 maintenance_enabled: self.maintenance_enabled,
296 r#type: self.r#type,
297 host: self.public_host.unwrap_or(self.host),
298 port: self.public_port.unwrap_or(self.port),
299 }
300 }
301}
302
303#[async_trait::async_trait]
304impl ByUuid for DatabaseHost {
305 async fn by_uuid(
306 database: &crate::database::Database,
307 uuid: uuid::Uuid,
308 ) -> Result<Self, crate::database::DatabaseError> {
309 let row = sqlx::query(&format!(
310 r#"
311 SELECT {}
312 FROM database_hosts
313 WHERE database_hosts.uuid = $1
314 "#,
315 Self::columns_sql(None)
316 ))
317 .bind(uuid)
318 .fetch_one(database.read())
319 .await?;
320
321 Self::map(None, &row)
322 }
323}
324
// Request payload for creating a database host.
//
// The `garde` rules are enforced by `validate()` at the start of `DatabaseHost::create`;
// the `schema` attributes describe the same limits in the generated schema.
#[derive(ToSchema, Deserialize, Validate)]
pub struct CreateDatabaseHostOptions {
    // Display name, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    pub name: compact_str::CompactString,
    // Engine running on the host.
    #[garde(skip)]
    pub r#type: DatabaseType,

    #[garde(skip)]
    pub deployment_enabled: bool,
    #[garde(skip)]
    pub maintenance_enabled: bool,

    // Optional public address, 3-255 characters when present.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    pub public_host: Option<compact_str::CompactString>,
    // Address used to connect to the host, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    pub host: compact_str::CompactString,
    // Optional public port; must be at least 1 when present.
    #[garde(range(min = 1))]
    #[schema(minimum = 1)]
    pub public_port: Option<u16>,
    // Port used to connect to the host; must be at least 1.
    #[garde(range(min = 1))]
    #[schema(minimum = 1)]
    pub port: u16,

    // Login name, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    pub username: compact_str::CompactString,
    // Plain-text password, 1-512 characters; encrypted by `create` before it is stored.
    #[garde(length(chars, min = 1, max = 512))]
    #[schema(min_length = 1, max_length = 512)]
    pub password: compact_str::CompactString,
}
358
359#[async_trait::async_trait]
360impl CreatableModel for DatabaseHost {
361 type CreateOptions<'a> = CreateDatabaseHostOptions;
362 type CreateResult = Self;
363
364 fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>> {
365 static CREATE_LISTENERS: LazyLock<CreateListenerList<DatabaseHost>> =
366 LazyLock::new(|| Arc::new(ModelHandlerList::default()));
367
368 &CREATE_LISTENERS
369 }
370
371 async fn create(
372 state: &crate::State,
373 mut options: Self::CreateOptions<'_>,
374 ) -> Result<Self, crate::database::DatabaseError> {
375 options.validate()?;
376
377 let mut transaction = state.database.write().begin().await?;
378
379 let mut query_builder = InsertQueryBuilder::new("database_hosts");
380
381 Self::run_create_handlers(&mut options, &mut query_builder, state, &mut transaction)
382 .await?;
383
384 query_builder
385 .set("name", &options.name)
386 .set("type", options.r#type)
387 .set("deployment_enabled", options.deployment_enabled)
388 .set("maintenance_enabled", options.maintenance_enabled)
389 .set("public_host", &options.public_host)
390 .set("host", &options.host)
391 .set("public_port", options.public_port.map(|p| p as i32))
392 .set("port", options.port as i32)
393 .set("username", &options.username)
394 .set(
395 "password",
396 state.database.encrypt(options.password.to_string()).await?,
397 );
398
399 let row = query_builder
400 .returning(&Self::columns_sql(None))
401 .fetch_one(&mut *transaction)
402 .await?;
403 let database_host = Self::map(None, &row)?;
404
405 transaction.commit().await?;
406
407 Ok(database_host)
408 }
409}
410
// Request payload for partially updating a database host.
//
// Every field is optional: `DatabaseHost::update` only copies a field onto the model when
// it is `Some`. `public_host` and `public_port` are double options so that an outer
// `Some` holding an inner `None` clears the stored value.
#[derive(ToSchema, Serialize, Deserialize, Validate, Clone, Default)]
pub struct UpdateDatabaseHostOptions {
    // New display name, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    name: Option<compact_str::CompactString>,

    #[garde(skip)]
    deployment_enabled: Option<bool>,
    #[garde(skip)]
    maintenance_enabled: Option<bool>,

    // New public address, or an inner `None` to clear it.
    // NOTE(review): unlike `CreateDatabaseHostOptions::public_host` this rule has no
    // `chars` mode and no `min = 3` — confirm whether the difference is intentional.
    #[garde(length(max = 255))]
    #[schema(max_length = 255)]
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "::serde_with::rust::double_option"
    )]
    public_host: Option<Option<compact_str::CompactString>>,
    // New connection address, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    host: Option<compact_str::CompactString>,
    // New public port (at least 1), or an inner `None` to clear it.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "::serde_with::rust::double_option"
    )]
    #[garde(range(min = 1))]
    #[schema(minimum = 1)]
    public_port: Option<Option<u16>>,
    // New connection port; must be at least 1.
    #[garde(range(min = 1))]
    #[schema(minimum = 1)]
    port: Option<u16>,

    // New login name, 3-255 characters.
    #[garde(length(chars, min = 3, max = 255))]
    #[schema(min_length = 3, max_length = 255)]
    username: Option<compact_str::CompactString>,
    // New plain-text password, 1-512 characters; encrypted by `update` before it is stored.
    #[garde(length(chars, min = 1, max = 512))]
    #[schema(min_length = 1, max_length = 512)]
    password: Option<compact_str::CompactString>,
}
452
453#[async_trait::async_trait]
454impl UpdatableModel for DatabaseHost {
455 type UpdateOptions = UpdateDatabaseHostOptions;
456
457 fn get_update_handlers() -> &'static LazyLock<UpdateListenerList<Self>> {
458 static UPDATE_LISTENERS: LazyLock<UpdateListenerList<DatabaseHost>> =
459 LazyLock::new(|| Arc::new(ModelHandlerList::default()));
460
461 &UPDATE_LISTENERS
462 }
463
464 async fn update(
465 &mut self,
466 state: &crate::State,
467 mut options: Self::UpdateOptions,
468 ) -> Result<(), crate::database::DatabaseError> {
469 options.validate()?;
470
471 let mut transaction = state.database.write().begin().await?;
472
473 let mut query_builder = UpdateQueryBuilder::new("database_hosts");
474
475 Self::run_update_handlers(
476 self,
477 &mut options,
478 &mut query_builder,
479 state,
480 &mut transaction,
481 )
482 .await?;
483
484 let password = if let Some(password) = options.password {
485 Some(state.database.encrypt(password.to_string()).await?)
486 } else {
487 None
488 };
489
490 query_builder
491 .set("name", options.name.as_ref())
492 .set("deployment_enabled", options.deployment_enabled)
493 .set("maintenance_enabled", options.maintenance_enabled)
494 .set("public_host", options.public_host.as_ref())
495 .set("host", options.host.as_ref())
496 .set(
497 "public_port",
498 options
499 .public_port
500 .as_ref()
501 .map(|p| p.as_ref().map(|port| *port as i32)),
502 )
503 .set("port", options.port.as_ref().map(|p| *p as i32))
504 .set("username", options.username.as_ref())
505 .set("password", password.as_ref())
506 .where_eq("uuid", self.uuid);
507
508 query_builder.execute(&mut *transaction).await?;
509
510 if let Some(name) = options.name {
511 self.name = name;
512 }
513 if let Some(deployment_enabled) = options.deployment_enabled {
514 self.deployment_enabled = deployment_enabled;
515 }
516 if let Some(maintenance_enabled) = options.maintenance_enabled {
517 self.maintenance_enabled = maintenance_enabled;
518 }
519 if let Some(public_host) = options.public_host {
520 self.public_host = public_host;
521 }
522 if let Some(host) = options.host {
523 self.host = host;
524 }
525 if let Some(public_port) = options.public_port {
526 self.public_port = public_port.map(|port| port as i32);
527 }
528 if let Some(port) = options.port {
529 self.port = port as i32;
530 }
531 if let Some(username) = options.username {
532 self.username = username;
533 }
534 if let Some(password) = password {
535 self.password = password;
536 }
537
538 transaction.commit().await?;
539
540 Ok(())
541 }
542}
543
544#[async_trait::async_trait]
545impl DeletableModel for DatabaseHost {
546 type DeleteOptions = ();
547
548 fn get_delete_handlers() -> &'static LazyLock<DeleteListenerList<Self>> {
549 static DELETE_LISTENERS: LazyLock<DeleteListenerList<DatabaseHost>> =
550 LazyLock::new(|| Arc::new(ModelHandlerList::default()));
551
552 &DELETE_LISTENERS
553 }
554
555 async fn delete(
556 &self,
557 state: &crate::State,
558 options: Self::DeleteOptions,
559 ) -> Result<(), anyhow::Error> {
560 let mut transaction = state.database.write().begin().await?;
561
562 self.run_delete_handlers(&options, state, &mut transaction)
563 .await?;
564
565 sqlx::query(
566 r#"
567 DELETE FROM database_hosts
568 WHERE database_hosts.uuid = $1
569 "#,
570 )
571 .bind(self.uuid)
572 .execute(&mut *transaction)
573 .await?;
574
575 transaction.commit().await?;
576
577 Ok(())
578 }
579}
580
// Admin-facing API representation of a database host, published in the schema as
// `AdminDatabaseHost`.
//
// Built by `DatabaseHost::into_admin_api_object`; it carries every model field except the
// password.
#[derive(ToSchema, Serialize)]
#[schema(title = "AdminDatabaseHost")]
pub struct AdminApiDatabaseHost {
    pub uuid: uuid::Uuid,

    pub name: compact_str::CompactString,
    pub deployment_enabled: bool,
    pub maintenance_enabled: bool,
    pub r#type: DatabaseType,

    // Public and internal endpoints are both exposed, unmerged.
    pub public_host: Option<compact_str::CompactString>,
    pub host: compact_str::CompactString,
    pub public_port: Option<i32>,
    pub port: i32,

    pub username: compact_str::CompactString,

    // The model's naive creation timestamp, interpreted as UTC.
    pub created: chrono::DateTime<chrono::Utc>,
}
600
// Non-admin API representation of a database host, published in the schema as
// `DatabaseHost`.
//
// Built by `DatabaseHost::into_api_object`; it contains no credentials.
#[derive(ToSchema, Serialize)]
#[schema(title = "DatabaseHost")]
pub struct ApiDatabaseHost {
    pub uuid: uuid::Uuid,

    pub name: compact_str::CompactString,
    pub maintenance_enabled: bool,
    pub r#type: DatabaseType,

    // The public host when one is configured, otherwise the internal host.
    pub host: compact_str::CompactString,
    // The public port when one is configured, otherwise the internal port.
    pub port: i32,
}