shared/models/
database_host.rs

1use crate::{
2    models::{InsertQueryBuilder, UpdateQueryBuilder},
3    prelude::*,
4};
5use garde::Validate;
6use serde::{Deserialize, Serialize};
7use sqlx::{Row, postgres::PgRow, prelude::Type};
8use std::{
9    collections::{BTreeMap, HashMap},
10    hash::Hash,
11    sync::{Arc, LazyLock},
12};
13use tokio::sync::Mutex;
14use utoipa::ToSchema;
15
16pub enum DatabaseTransaction<'a> {
17    Mysql(sqlx::Transaction<'a, sqlx::MySql>),
18    Postgres(
19        sqlx::Transaction<'a, sqlx::Postgres>,
20        Arc<sqlx::Pool<sqlx::Postgres>>,
21    ),
22}
23
24#[derive(Clone)]
25pub enum DatabasePool {
26    Mysql(Arc<sqlx::Pool<sqlx::MySql>>),
27    Postgres(Arc<sqlx::Pool<sqlx::Postgres>>),
28}
29
30type DatabasePoolValue = (std::time::Instant, DatabasePool);
31static DATABASE_CLIENTS: LazyLock<Arc<Mutex<HashMap<uuid::Uuid, DatabasePoolValue>>>> =
32    LazyLock::new(|| {
33        let clients = Arc::new(Mutex::new(HashMap::<uuid::Uuid, DatabasePoolValue>::new()));
34
35        tokio::spawn({
36            let clients = Arc::clone(&clients);
37            async move {
38                loop {
39                    tokio::time::sleep(std::time::Duration::from_secs(60)).await;
40
41                    let mut clients = clients.lock().await;
42                    clients.retain(|_, &mut (last_used, _)| {
43                        last_used.elapsed() < std::time::Duration::from_secs(300)
44                    });
45                }
46            }
47        });
48
49        clients
50    });
51
52#[derive(ToSchema, Serialize, Deserialize, Type, PartialEq, Eq, Hash, Clone, Copy)]
53#[serde(rename_all = "lowercase")]
54#[schema(rename_all = "lowercase")]
55#[sqlx(type_name = "database_type", rename_all = "SCREAMING_SNAKE_CASE")]
56pub enum DatabaseType {
57    Mysql,
58    Postgres,
59}
60
61#[derive(Serialize, Deserialize, Clone)]
62pub struct DatabaseHost {
63    pub uuid: uuid::Uuid,
64
65    pub name: compact_str::CompactString,
66    pub r#type: DatabaseType,
67
68    pub deployment_enabled: bool,
69    pub maintenance_enabled: bool,
70
71    pub public_host: Option<compact_str::CompactString>,
72    pub host: compact_str::CompactString,
73    pub public_port: Option<i32>,
74    pub port: i32,
75
76    pub username: compact_str::CompactString,
77    pub password: Vec<u8>,
78
79    pub created: chrono::NaiveDateTime,
80}
81
82impl BaseModel for DatabaseHost {
83    const NAME: &'static str = "database_host";
84
85    #[inline]
86    fn columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
87        let prefix = prefix.unwrap_or_default();
88
89        BTreeMap::from([
90            (
91                "database_hosts.uuid",
92                compact_str::format_compact!("{prefix}uuid"),
93            ),
94            (
95                "database_hosts.name",
96                compact_str::format_compact!("{prefix}name"),
97            ),
98            (
99                "database_hosts.type",
100                compact_str::format_compact!("{prefix}type"),
101            ),
102            (
103                "database_hosts.deployment_enabled",
104                compact_str::format_compact!("{prefix}deployment_enabled"),
105            ),
106            (
107                "database_hosts.maintenance_enabled",
108                compact_str::format_compact!("{prefix}maintenance_enabled"),
109            ),
110            (
111                "database_hosts.public_host",
112                compact_str::format_compact!("{prefix}public_host"),
113            ),
114            (
115                "database_hosts.host",
116                compact_str::format_compact!("{prefix}host"),
117            ),
118            (
119                "database_hosts.public_port",
120                compact_str::format_compact!("{prefix}public_port"),
121            ),
122            (
123                "database_hosts.port",
124                compact_str::format_compact!("{prefix}port"),
125            ),
126            (
127                "database_hosts.username",
128                compact_str::format_compact!("{prefix}username"),
129            ),
130            (
131                "database_hosts.password",
132                compact_str::format_compact!("{prefix}password"),
133            ),
134            (
135                "database_hosts.created",
136                compact_str::format_compact!("{prefix}created"),
137            ),
138        ])
139    }
140
141    #[inline]
142    fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
143        let prefix = prefix.unwrap_or_default();
144
145        Ok(Self {
146            uuid: row.try_get(compact_str::format_compact!("{prefix}uuid").as_str())?,
147            name: row.try_get(compact_str::format_compact!("{prefix}name").as_str())?,
148            r#type: row.try_get(compact_str::format_compact!("{prefix}type").as_str())?,
149            deployment_enabled: row
150                .try_get(compact_str::format_compact!("{prefix}deployment_enabled").as_str())?,
151            maintenance_enabled: row
152                .try_get(compact_str::format_compact!("{prefix}maintenance_enabled").as_str())?,
153            public_host: row
154                .try_get(compact_str::format_compact!("{prefix}public_host").as_str())?,
155            host: row.try_get(compact_str::format_compact!("{prefix}host").as_str())?,
156            public_port: row
157                .try_get(compact_str::format_compact!("{prefix}public_port").as_str())?,
158            port: row.try_get(compact_str::format_compact!("{prefix}port").as_str())?,
159            username: row.try_get(compact_str::format_compact!("{prefix}username").as_str())?,
160            password: row.try_get(compact_str::format_compact!("{prefix}password").as_str())?,
161            created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
162        })
163    }
164}
165
166impl DatabaseHost {
167    pub async fn get_connection(
168        &self,
169        database: &crate::database::Database,
170    ) -> Result<DatabasePool, crate::database::DatabaseError> {
171        let mut clients = DATABASE_CLIENTS.lock().await;
172
173        if let Some((last_used, pool)) = clients.get_mut(&self.uuid) {
174            *last_used = std::time::Instant::now();
175
176            return Ok(pool.clone());
177        }
178
179        drop(clients);
180
181        let password = database.decrypt(self.password.clone()).await?;
182
183        let pool = match self.r#type {
184            DatabaseType::Mysql => {
185                let options = sqlx::mysql::MySqlConnectOptions::new()
186                    .host(&self.host)
187                    .port(self.port as u16)
188                    .username(&self.username)
189                    .password(&password);
190
191                let pool = sqlx::Pool::connect_with(options).await?;
192                DatabasePool::Mysql(Arc::new(pool))
193            }
194            DatabaseType::Postgres => {
195                let options = sqlx::postgres::PgConnectOptions::new()
196                    .host(&self.host)
197                    .port(self.port as u16)
198                    .username(&self.username)
199                    .password(&password)
200                    .database("postgres");
201
202                let pool = sqlx::Pool::connect_with(options).await?;
203                DatabasePool::Postgres(Arc::new(pool))
204            }
205        };
206
207        DATABASE_CLIENTS
208            .lock()
209            .await
210            .insert(self.uuid, (std::time::Instant::now(), pool.clone()));
211        Ok(pool)
212    }
213
214    pub async fn all_with_pagination(
215        database: &crate::database::Database,
216        page: i64,
217        per_page: i64,
218        search: Option<&str>,
219    ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
220        let offset = (page - 1) * per_page;
221
222        let rows = sqlx::query(&format!(
223            r#"
224            SELECT {}, COUNT(*) OVER() AS total_count
225            FROM database_hosts
226            WHERE ($1 IS NULL OR database_hosts.name ILIKE '%' || $1 || '%')
227            ORDER BY database_hosts.created
228            LIMIT $2 OFFSET $3
229            "#,
230            Self::columns_sql(None)
231        ))
232        .bind(search)
233        .bind(per_page)
234        .bind(offset)
235        .fetch_all(database.read())
236        .await?;
237
238        Ok(super::Pagination {
239            total: rows
240                .first()
241                .map_or(Ok(0), |row| row.try_get("total_count"))?,
242            per_page,
243            page,
244            data: rows
245                .into_iter()
246                .map(|row| Self::map(None, &row))
247                .try_collect_vec()?,
248        })
249    }
250
251    pub async fn by_location_uuid_uuid(
252        database: &crate::database::Database,
253        location_uuid: uuid::Uuid,
254        uuid: uuid::Uuid,
255    ) -> Result<Option<Self>, crate::database::DatabaseError> {
256        let row = sqlx::query(&format!(
257            r#"
258            SELECT {}
259            FROM database_hosts
260            JOIN location_database_hosts ON location_database_hosts.database_host_uuid = database_hosts.uuid AND location_database_hosts.location_uuid = $1
261            WHERE database_hosts.uuid = $2
262            "#,
263            Self::columns_sql(None)
264        ))
265        .bind(location_uuid)
266        .bind(uuid)
267        .fetch_optional(database.read())
268        .await?;
269
270        row.try_map(|row| Self::map(None, &row))
271    }
272
273    #[inline]
274    pub fn into_admin_api_object(self) -> AdminApiDatabaseHost {
275        AdminApiDatabaseHost {
276            uuid: self.uuid,
277            name: self.name,
278            r#type: self.r#type,
279            deployment_enabled: self.deployment_enabled,
280            maintenance_enabled: self.maintenance_enabled,
281            public_host: self.public_host,
282            host: self.host,
283            public_port: self.public_port,
284            port: self.port,
285            username: self.username,
286            created: self.created.and_utc(),
287        }
288    }
289
290    #[inline]
291    pub fn into_api_object(self) -> ApiDatabaseHost {
292        ApiDatabaseHost {
293            uuid: self.uuid,
294            name: self.name,
295            maintenance_enabled: self.maintenance_enabled,
296            r#type: self.r#type,
297            host: self.public_host.unwrap_or(self.host),
298            port: self.public_port.unwrap_or(self.port),
299        }
300    }
301}
302
303#[async_trait::async_trait]
304impl ByUuid for DatabaseHost {
305    async fn by_uuid(
306        database: &crate::database::Database,
307        uuid: uuid::Uuid,
308    ) -> Result<Self, crate::database::DatabaseError> {
309        let row = sqlx::query(&format!(
310            r#"
311            SELECT {}
312            FROM database_hosts
313            WHERE database_hosts.uuid = $1
314            "#,
315            Self::columns_sql(None)
316        ))
317        .bind(uuid)
318        .fetch_one(database.read())
319        .await?;
320
321        Self::map(None, &row)
322    }
323}
324
325#[derive(ToSchema, Deserialize, Validate)]
326pub struct CreateDatabaseHostOptions {
327    #[garde(length(chars, min = 3, max = 255))]
328    #[schema(min_length = 3, max_length = 255)]
329    pub name: compact_str::CompactString,
330    #[garde(skip)]
331    pub r#type: DatabaseType,
332
333    #[garde(skip)]
334    pub deployment_enabled: bool,
335    #[garde(skip)]
336    pub maintenance_enabled: bool,
337
338    #[garde(length(chars, min = 3, max = 255))]
339    #[schema(min_length = 3, max_length = 255)]
340    pub public_host: Option<compact_str::CompactString>,
341    #[garde(length(chars, min = 3, max = 255))]
342    #[schema(min_length = 3, max_length = 255)]
343    pub host: compact_str::CompactString,
344    #[garde(range(min = 1))]
345    #[schema(minimum = 1)]
346    pub public_port: Option<u16>,
347    #[garde(range(min = 1))]
348    #[schema(minimum = 1)]
349    pub port: u16,
350
351    #[garde(length(chars, min = 3, max = 255))]
352    #[schema(min_length = 3, max_length = 255)]
353    pub username: compact_str::CompactString,
354    #[garde(length(chars, min = 1, max = 512))]
355    #[schema(min_length = 1, max_length = 512)]
356    pub password: compact_str::CompactString,
357}
358
359#[async_trait::async_trait]
360impl CreatableModel for DatabaseHost {
361    type CreateOptions<'a> = CreateDatabaseHostOptions;
362    type CreateResult = Self;
363
364    fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>> {
365        static CREATE_LISTENERS: LazyLock<CreateListenerList<DatabaseHost>> =
366            LazyLock::new(|| Arc::new(ModelHandlerList::default()));
367
368        &CREATE_LISTENERS
369    }
370
371    async fn create(
372        state: &crate::State,
373        mut options: Self::CreateOptions<'_>,
374    ) -> Result<Self, crate::database::DatabaseError> {
375        options.validate()?;
376
377        let mut transaction = state.database.write().begin().await?;
378
379        let mut query_builder = InsertQueryBuilder::new("database_hosts");
380
381        Self::run_create_handlers(&mut options, &mut query_builder, state, &mut transaction)
382            .await?;
383
384        query_builder
385            .set("name", &options.name)
386            .set("type", options.r#type)
387            .set("deployment_enabled", options.deployment_enabled)
388            .set("maintenance_enabled", options.maintenance_enabled)
389            .set("public_host", &options.public_host)
390            .set("host", &options.host)
391            .set("public_port", options.public_port.map(|p| p as i32))
392            .set("port", options.port as i32)
393            .set("username", &options.username)
394            .set(
395                "password",
396                state.database.encrypt(options.password.to_string()).await?,
397            );
398
399        let row = query_builder
400            .returning(&Self::columns_sql(None))
401            .fetch_one(&mut *transaction)
402            .await?;
403        let database_host = Self::map(None, &row)?;
404
405        transaction.commit().await?;
406
407        Ok(database_host)
408    }
409}
410
411#[derive(ToSchema, Serialize, Deserialize, Validate, Clone, Default)]
412pub struct UpdateDatabaseHostOptions {
413    #[garde(length(chars, min = 3, max = 255))]
414    #[schema(min_length = 3, max_length = 255)]
415    name: Option<compact_str::CompactString>,
416
417    #[garde(skip)]
418    deployment_enabled: Option<bool>,
419    #[garde(skip)]
420    maintenance_enabled: Option<bool>,
421
422    #[garde(length(max = 255))]
423    #[schema(max_length = 255)]
424    #[serde(
425        default,
426        skip_serializing_if = "Option::is_none",
427        with = "::serde_with::rust::double_option"
428    )]
429    public_host: Option<Option<compact_str::CompactString>>,
430    #[garde(length(chars, min = 3, max = 255))]
431    #[schema(min_length = 3, max_length = 255)]
432    host: Option<compact_str::CompactString>,
433    #[serde(
434        default,
435        skip_serializing_if = "Option::is_none",
436        with = "::serde_with::rust::double_option"
437    )]
438    #[garde(range(min = 1))]
439    #[schema(minimum = 1)]
440    public_port: Option<Option<u16>>,
441    #[garde(range(min = 1))]
442    #[schema(minimum = 1)]
443    port: Option<u16>,
444
445    #[garde(length(chars, min = 3, max = 255))]
446    #[schema(min_length = 3, max_length = 255)]
447    username: Option<compact_str::CompactString>,
448    #[garde(length(chars, min = 1, max = 512))]
449    #[schema(min_length = 1, max_length = 512)]
450    password: Option<compact_str::CompactString>,
451}
452
453#[async_trait::async_trait]
454impl UpdatableModel for DatabaseHost {
455    type UpdateOptions = UpdateDatabaseHostOptions;
456
457    fn get_update_handlers() -> &'static LazyLock<UpdateListenerList<Self>> {
458        static UPDATE_LISTENERS: LazyLock<UpdateListenerList<DatabaseHost>> =
459            LazyLock::new(|| Arc::new(ModelHandlerList::default()));
460
461        &UPDATE_LISTENERS
462    }
463
464    async fn update(
465        &mut self,
466        state: &crate::State,
467        mut options: Self::UpdateOptions,
468    ) -> Result<(), crate::database::DatabaseError> {
469        options.validate()?;
470
471        let mut transaction = state.database.write().begin().await?;
472
473        let mut query_builder = UpdateQueryBuilder::new("database_hosts");
474
475        Self::run_update_handlers(
476            self,
477            &mut options,
478            &mut query_builder,
479            state,
480            &mut transaction,
481        )
482        .await?;
483
484        let password = if let Some(password) = options.password {
485            Some(state.database.encrypt(password.to_string()).await?)
486        } else {
487            None
488        };
489
490        query_builder
491            .set("name", options.name.as_ref())
492            .set("deployment_enabled", options.deployment_enabled)
493            .set("maintenance_enabled", options.maintenance_enabled)
494            .set("public_host", options.public_host.as_ref())
495            .set("host", options.host.as_ref())
496            .set(
497                "public_port",
498                options
499                    .public_port
500                    .as_ref()
501                    .map(|p| p.as_ref().map(|port| *port as i32)),
502            )
503            .set("port", options.port.as_ref().map(|p| *p as i32))
504            .set("username", options.username.as_ref())
505            .set("password", password.as_ref())
506            .where_eq("uuid", self.uuid);
507
508        query_builder.execute(&mut *transaction).await?;
509
510        if let Some(name) = options.name {
511            self.name = name;
512        }
513        if let Some(deployment_enabled) = options.deployment_enabled {
514            self.deployment_enabled = deployment_enabled;
515        }
516        if let Some(maintenance_enabled) = options.maintenance_enabled {
517            self.maintenance_enabled = maintenance_enabled;
518        }
519        if let Some(public_host) = options.public_host {
520            self.public_host = public_host;
521        }
522        if let Some(host) = options.host {
523            self.host = host;
524        }
525        if let Some(public_port) = options.public_port {
526            self.public_port = public_port.map(|port| port as i32);
527        }
528        if let Some(port) = options.port {
529            self.port = port as i32;
530        }
531        if let Some(username) = options.username {
532            self.username = username;
533        }
534        if let Some(password) = password {
535            self.password = password;
536        }
537
538        transaction.commit().await?;
539
540        Ok(())
541    }
542}
543
544#[async_trait::async_trait]
545impl DeletableModel for DatabaseHost {
546    type DeleteOptions = ();
547
548    fn get_delete_handlers() -> &'static LazyLock<DeleteListenerList<Self>> {
549        static DELETE_LISTENERS: LazyLock<DeleteListenerList<DatabaseHost>> =
550            LazyLock::new(|| Arc::new(ModelHandlerList::default()));
551
552        &DELETE_LISTENERS
553    }
554
555    async fn delete(
556        &self,
557        state: &crate::State,
558        options: Self::DeleteOptions,
559    ) -> Result<(), anyhow::Error> {
560        let mut transaction = state.database.write().begin().await?;
561
562        self.run_delete_handlers(&options, state, &mut transaction)
563            .await?;
564
565        sqlx::query(
566            r#"
567            DELETE FROM database_hosts
568            WHERE database_hosts.uuid = $1
569            "#,
570        )
571        .bind(self.uuid)
572        .execute(&mut *transaction)
573        .await?;
574
575        transaction.commit().await?;
576
577        Ok(())
578    }
579}
580
581#[derive(ToSchema, Serialize)]
582#[schema(title = "AdminDatabaseHost")]
583pub struct AdminApiDatabaseHost {
584    pub uuid: uuid::Uuid,
585
586    pub name: compact_str::CompactString,
587    pub deployment_enabled: bool,
588    pub maintenance_enabled: bool,
589    pub r#type: DatabaseType,
590
591    pub public_host: Option<compact_str::CompactString>,
592    pub host: compact_str::CompactString,
593    pub public_port: Option<i32>,
594    pub port: i32,
595
596    pub username: compact_str::CompactString,
597
598    pub created: chrono::DateTime<chrono::Utc>,
599}
600
601#[derive(ToSchema, Serialize)]
602#[schema(title = "DatabaseHost")]
603pub struct ApiDatabaseHost {
604    pub uuid: uuid::Uuid,
605
606    pub name: compact_str::CompactString,
607    pub maintenance_enabled: bool,
608    pub r#type: DatabaseType,
609
610    pub host: compact_str::CompactString,
611    pub port: i32,
612}