Skip to main content

shared/models/
node_allocation.rs

1use crate::prelude::*;
2use compact_str::ToCompactString;
3use serde::{Deserialize, Serialize};
4use sqlx::{Row, postgres::PgRow};
5use std::{
6    collections::{BTreeMap, HashMap},
7    sync::LazyLock,
8};
/// A single IP/port allocation that belongs to a node.
///
/// Instances are built from database rows by the `BaseModel::map` implementation, which reads the
/// `uuid`, `ip`, `ip_alias`, `port` and `created` columns of `node_allocations`.
#[derive(Serialize, Deserialize, Clone)]
pub struct NodeAllocation {
    /// Unique identifier of the allocation row.
    pub uuid: uuid::Uuid,
    /// Lazily fetchable server using this allocation. `map` only sets this when the row carries a
    /// decodable `server_uuid` column; otherwise it is `None`.
    pub server: Option<Fetchable<super::server::Server>>,

    /// Address of the allocation, stored as an IP network value.
    pub ip: sqlx::types::ipnetwork::IpNetwork,
    /// Optional alias stored alongside the IP.
    pub ip_alias: Option<compact_str::CompactString>,
    /// Port of the allocation (stored as a 32-bit integer column).
    pub port: i32,

    /// Creation timestamp without timezone; the admin API converts it with `and_utc()`.
    pub created: chrono::NaiveDateTime,

    /// Extension data produced by `Self::map_extensions` when the row is mapped.
    extension_data: super::ModelExtensionData,
}
22
23impl BaseModel for NodeAllocation {
24    const NAME: &'static str = "node_allocation";
25
26    fn get_extension_list() -> &'static super::ModelExtensionList {
27        static EXTENSIONS: LazyLock<super::ModelExtensionList> =
28            LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
29
30        &EXTENSIONS
31    }
32
33    fn get_extension_data(&self) -> &super::ModelExtensionData {
34        &self.extension_data
35    }
36
37    #[inline]
38    fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
39        let prefix = prefix.unwrap_or_default();
40
41        BTreeMap::from([
42            (
43                "node_allocations.uuid",
44                compact_str::format_compact!("{prefix}uuid"),
45            ),
46            (
47                "node_allocations.ip",
48                compact_str::format_compact!("{prefix}ip"),
49            ),
50            (
51                "node_allocations.ip_alias",
52                compact_str::format_compact!("{prefix}ip_alias"),
53            ),
54            (
55                "node_allocations.port",
56                compact_str::format_compact!("{prefix}port"),
57            ),
58            (
59                "node_allocations.created",
60                compact_str::format_compact!("{prefix}created"),
61            ),
62        ])
63    }
64
65    #[inline]
66    fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
67        let prefix = prefix.unwrap_or_default();
68
69        Ok(Self {
70            uuid: row.try_get(compact_str::format_compact!("{prefix}uuid").as_str())?,
71            server: if let Ok(server_uuid) = row.try_get::<uuid::Uuid, _>("server_uuid") {
72                Some(super::server::Server::get_fetchable(server_uuid))
73            } else {
74                None
75            },
76            ip: row.try_get(compact_str::format_compact!("{prefix}ip").as_str())?,
77            ip_alias: row.try_get(compact_str::format_compact!("{prefix}ip_alias").as_str())?,
78            port: row.try_get(compact_str::format_compact!("{prefix}port").as_str())?,
79            created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
80            extension_data: Self::map_extensions(prefix, row)?,
81        })
82    }
83}
84
85impl NodeAllocation {
86    pub async fn create(
87        database: &crate::database::Database,
88        node_uuid: uuid::Uuid,
89        ip: &sqlx::types::ipnetwork::IpNetwork,
90        ip_alias: Option<&str>,
91        port: i32,
92    ) -> Result<(), crate::database::DatabaseError> {
93        sqlx::query(
94            r#"
95            INSERT INTO node_allocations (node_uuid, ip, ip_alias, port)
96            VALUES ($1, $2, $3, $4)
97            "#,
98        )
99        .bind(node_uuid)
100        .bind(ip)
101        .bind(ip_alias)
102        .bind(port)
103        .execute(database.write())
104        .await?;
105
106        Ok(())
107    }
108
109    pub async fn get_random(
110        database: &crate::database::Database,
111        node_uuid: uuid::Uuid,
112        start_port: u16,
113        end_port: u16,
114        amount: i64,
115    ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
116        let rows = sqlx::query(
117            r#"
118            WITH eligible_ips AS (
119                SELECT node_allocations.ip
120                FROM node_allocations
121                LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
122                WHERE
123                    node_allocations.node_uuid = $1
124                    AND node_allocations.port BETWEEN $2 AND $3
125                    AND server_allocations.uuid IS NULL
126                GROUP BY node_allocations.ip
127                HAVING COUNT(*) >= $4
128            ),
129            random_ip AS (
130                SELECT ip FROM eligible_ips ORDER BY RANDOM() LIMIT 1
131            )
132            SELECT node_allocations.uuid
133            FROM node_allocations
134            LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
135            WHERE
136                node_allocations.node_uuid = $1
137                AND node_allocations.port BETWEEN $2 AND $3
138                AND server_allocations.uuid IS NULL
139                AND node_allocations.ip = (SELECT ip FROM random_ip)
140            ORDER BY RANDOM()
141            LIMIT $4
142            "#,
143        )
144        .bind(node_uuid)
145        .bind(start_port as i32)
146        .bind(end_port as i32)
147        .bind(amount)
148        .fetch_all(database.write())
149        .await?;
150
151        if rows.len() != amount as usize {
152            return Err(anyhow::anyhow!("only found {} available allocations", rows.len()).into());
153        }
154
155        Ok(rows
156            .into_iter()
157            .map(|row| row.get::<uuid::Uuid, _>("uuid"))
158            .collect())
159    }
160
161    pub async fn get_random_ip(
162        database: &crate::database::Database,
163        node_uuid: uuid::Uuid,
164        ip: &sqlx::types::ipnetwork::IpNetwork,
165        start_port: u16,
166        end_port: u16,
167        amount: i64,
168    ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
169        let rows = sqlx::query(
170            r#"
171            SELECT node_allocations.uuid
172            FROM node_allocations
173            LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
174            WHERE
175                node_allocations.node_uuid = $1
176                AND node_allocations.ip = $2
177                AND node_allocations.port BETWEEN $3 AND $4
178                AND server_allocations.uuid IS NULL
179            ORDER BY RANDOM()
180            LIMIT $5
181            "#,
182        )
183        .bind(node_uuid)
184        .bind(ip)
185        .bind(start_port as i32)
186        .bind(end_port as i32)
187        .bind(amount)
188        .fetch_all(database.write())
189        .await?;
190
191        if rows.len() != amount as usize {
192            return Err(anyhow::anyhow!(
193                "only found {} available allocations on this IP",
194                rows.len()
195            )
196            .into());
197        }
198
199        Ok(rows
200            .into_iter()
201            .map(|row| row.get::<uuid::Uuid, _>("uuid"))
202            .collect())
203    }
204
205    pub async fn get_random_dedicated(
206        database: &crate::database::Database,
207        node_uuid: uuid::Uuid,
208        start_port: u16,
209        end_port: u16,
210        amount: i64,
211    ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
212        let rows = sqlx::query(
213            r#"
214            WITH eligible_ips AS (
215                SELECT node_allocations.ip
216                FROM node_allocations
217                LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
218                WHERE node_allocations.node_uuid = $1
219                GROUP BY node_allocations.ip
220                HAVING 
221                    COUNT(server_allocations.uuid) = 0
222                    AND SUM(CASE WHEN node_allocations.port BETWEEN $2 AND $3 THEN 1 ELSE 0 END) >= $4
223            ),
224            random_ip AS (
225                SELECT ip FROM eligible_ips ORDER BY RANDOM() LIMIT 1
226            )
227            SELECT node_allocations.uuid
228            FROM node_allocations
229            WHERE
230                node_allocations.node_uuid = $1
231                AND node_allocations.port BETWEEN $2 AND $3
232                AND node_allocations.ip = (SELECT ip FROM random_ip)
233            ORDER BY RANDOM()
234            LIMIT $4
235            "#,
236        )
237        .bind(node_uuid)
238        .bind(start_port as i32)
239        .bind(end_port as i32)
240        .bind(amount)
241        .fetch_all(database.write())
242        .await?;
243
244        if rows.len() != amount as usize {
245            return Err(anyhow::anyhow!(
246                "only found {} available dedicated allocations",
247                rows.len()
248            )
249            .into());
250        }
251
252        Ok(rows
253            .into_iter()
254            .map(|row| row.get::<uuid::Uuid, _>("uuid"))
255            .collect())
256    }
257
258    pub async fn by_node_uuid_ip_port_unused(
259        database: &crate::database::Database,
260        node_uuid: uuid::Uuid,
261        ip: &sqlx::types::ipnetwork::IpNetwork,
262        port: i32,
263    ) -> Result<Option<Self>, crate::database::DatabaseError> {
264        let row = sqlx::query(&format!(
265            r#"
266            SELECT {}
267            FROM node_allocations
268            LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
269            WHERE node_allocations.node_uuid = $1 AND node_allocations.ip = $2 AND node_allocations.port = $3 AND server_allocations.uuid IS NULL
270            "#,
271            Self::columns_sql(None)
272        ))
273        .bind(node_uuid)
274        .bind(ip)
275        .bind(port)
276        .fetch_optional(database.read())
277        .await?;
278
279        row.try_map(|row| Self::map(None, &row))
280    }
281
282    pub async fn get_from_deployment<'a>(
283        database: &crate::database::Database,
284        deployment: &'a super::egg_configuration::EggConfigAllocationsDeployment,
285        node_uuid: uuid::Uuid,
286        variables: &mut HashMap<&'a str, compact_str::CompactString>,
287    ) -> Result<(Option<uuid::Uuid>, Vec<uuid::Uuid>), crate::database::DatabaseError> {
288        let mut primary = None;
289        let mut additional = Vec::new();
290
291        const MAX_ITER: usize = 100;
292
293        macro_rules! is_unused {
294            ($uuid:expr) => {
295                primary.as_ref().map_or(true, |p| p.uuid != $uuid) && !additional.contains(&$uuid)
296            };
297        }
298
299        macro_rules! get_random {
300            ($start_port:expr, $end_port:expr) => {
301                if let Some(primary) = &primary {
302                    Self::get_random_ip(database, node_uuid, &primary.ip, $start_port, $end_port, 1)
303                        .await?
304                } else {
305                    Self::get_random(database, node_uuid, $start_port, $end_port, 1).await?
306                }
307            };
308        }
309
310        'primary: for i in 0..MAX_ITER {
311            if i != 0 && deployment.primary.is_none() {
312                break;
313            }
314
315            additional.clear();
316            variables.clear();
317
318            if let Some(primary_allocation) = &deployment.primary {
319                let random = if deployment.dedicated {
320                    Self::get_random_dedicated(
321                        database,
322                        node_uuid,
323                        primary_allocation.start_port,
324                        primary_allocation.end_port,
325                        1,
326                    )
327                    .await?
328                } else {
329                    Self::get_random(
330                        database,
331                        node_uuid,
332                        primary_allocation.start_port,
333                        primary_allocation.end_port,
334                        1,
335                    )
336                    .await?
337                };
338
339                let Some(allocation) = random.into_iter().next() else {
340                    return Err(anyhow::anyhow!("no available primary allocation found").into());
341                };
342                let allocation =
343                    match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
344                        Some(allocation) => allocation,
345                        None => {
346                            return Err(
347                                anyhow::anyhow!("allocated primary allocation not found").into()
348                            );
349                        }
350                    };
351
352                if let Some(variable_name) = &primary_allocation.assign_to_variable {
353                    variables.insert(variable_name, allocation.port.to_compact_string());
354                }
355
356                primary = Some(allocation);
357            }
358
359            for additional_allocation in &deployment.additional {
360                match additional_allocation.mode {
361                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::Random => {
362                        let mut found_allocation = None;
363                        for _ in 0..MAX_ITER {
364                            let random = get_random!(
365                                1,
366                                u16::MAX
367                            );
368
369                            if let Some(allocation) = random.into_iter().next() {
370                                if is_unused!(allocation) {
371                                    found_allocation = Some(allocation);
372                                    break;
373                                }
374                            } else {
375                                return Err(anyhow::anyhow!("no available additional allocation found").into());
376                            }
377                        }
378
379                        let Some(allocation) = found_allocation else {
380                            return Err(anyhow::anyhow!("no available additional allocation found").into());
381                        };
382                        additional.push(allocation);
383
384                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
385                            let allocation = match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
386                                Some(allocation) => allocation,
387                                None => {
388                                    return Err(
389                                        anyhow::anyhow!("allocated additional allocation not found").into()
390                                    );
391                                }
392                            };
393
394                            variables.insert(variable_name, allocation.port.to_compact_string());
395                        }
396                    },
397                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::Range {
398                        start_port,
399                        end_port,
400                    } => {
401                        let mut found_allocation = None;
402                        for _ in 0..MAX_ITER {
403                            let random = get_random!(
404                                start_port,
405                                end_port
406                            );
407
408                            if let Some(allocation) = random.into_iter().next() {
409                                if is_unused!(allocation) {
410                                    found_allocation = Some(allocation);
411                                    break;
412                                }
413                            } else {
414                                return Err(anyhow::anyhow!("no available additional allocation found").into());
415                            }
416                        }
417
418                        let Some(allocation) = found_allocation else {
419                            return Err(anyhow::anyhow!("no available additional allocation found").into());
420                        };
421                        additional.push(allocation);
422
423                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
424                            let allocation = match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
425                                Some(allocation) => allocation,
426                                None => {
427                                    return Err(
428                                        anyhow::anyhow!("allocated additional allocation not found").into()
429                                    );
430                                }
431                            };
432
433                            variables.insert(variable_name, allocation.port.to_compact_string());
434                        }
435                    }
436                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::AddPrimary { value } => {
437                        let primary = match &primary {
438                            Some(primary) => primary,
439                            None => {
440                                return Err(anyhow::anyhow!("primary allocation is required for `add_primary` mode").into());
441                            }
442                        };
443
444                        let allocation_port = primary.port + value as i32;
445
446                        let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
447                            Some(allocation) => allocation,
448                            None => continue 'primary,
449                        };
450                        if !is_unused!(allocation.uuid) {
451                            return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
452                        }
453                        additional.push(allocation.uuid);
454
455                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
456                            variables.insert(variable_name, allocation.port.to_compact_string());
457                        }
458                    }
459                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::SubtractPrimary { value } => {
460                        let primary = match &primary {
461                            Some(primary) => primary,
462                            None => {
463                                return Err(anyhow::anyhow!("primary allocation is required for `subtract_primary` mode").into());
464                            }
465                        };
466
467                        let allocation_port = primary.port - value as i32;
468
469                        let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
470                            Some(allocation) => allocation,
471                            None => continue 'primary,
472                        };
473                        if !is_unused!(allocation.uuid) {
474                            return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
475                        }
476                        additional.push(allocation.uuid);
477
478                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
479                            variables.insert(variable_name, allocation.port.to_compact_string());
480                        }
481                    }
482                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::MultiplyPrimary { value } => {
483                        let primary = match &primary {
484                            Some(primary) => primary,
485                            None => {
486                                return Err(anyhow::anyhow!("primary allocation is required for `multiply_primary` mode").into());
487                            }
488                        };
489
490                        let allocation_port = (primary.port as f64 * value) as i32;
491
492                        let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
493                            Some(allocation) => allocation,
494                            None => continue 'primary,
495                        };
496                        if !is_unused!(allocation.uuid) {
497                            return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
498                        }
499                        additional.push(allocation.uuid);
500
501                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
502                            variables.insert(variable_name, allocation.port.to_compact_string());
503                        }
504                    }
505                    super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::DividePrimary { value } => {
506                        let primary = match &primary {
507                            Some(primary) => primary,
508                            None => {
509                                return Err(anyhow::anyhow!("primary allocation is required for `divide_primary` mode").into());
510                            }
511                        };
512
513                        let allocation_port = (primary.port as f64 / value) as i32;
514
515                        let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
516                            Some(allocation) => allocation,
517                            None => continue 'primary,
518                        };
519                        if !is_unused!(allocation.uuid) {
520                            return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
521                        }
522                        additional.push(allocation.uuid);
523
524                        if let Some(variable_name) = &additional_allocation.assign_to_variable {
525                            variables.insert(variable_name, allocation.port.to_compact_string());
526                        }
527                    }
528                }
529            }
530
531            break;
532        }
533
534        Ok((primary.map(|p| p.uuid), additional))
535    }
536
537    pub async fn by_node_uuid_uuid(
538        database: &crate::database::Database,
539        node_uuid: uuid::Uuid,
540        uuid: uuid::Uuid,
541    ) -> Result<Option<Self>, crate::database::DatabaseError> {
542        let row = sqlx::query(&format!(
543            r#"
544            SELECT {}
545            FROM node_allocations
546            WHERE node_allocations.node_uuid = $1 AND node_allocations.uuid = $2
547            "#,
548            Self::columns_sql(None)
549        ))
550        .bind(node_uuid)
551        .bind(uuid)
552        .fetch_optional(database.read())
553        .await?;
554
555        row.try_map(|row| Self::map(None, &row))
556    }
557
558    pub async fn available_by_node_uuid_with_pagination(
559        database: &crate::database::Database,
560        node_uuid: uuid::Uuid,
561        page: i64,
562        per_page: i64,
563        search: Option<&str>,
564    ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
565        let offset = (page - 1) * per_page;
566
567        let rows = sqlx::query(&format!(
568            r#"
569            SELECT {}, COUNT(*) OVER() AS total_count
570            FROM node_allocations
571            LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
572            WHERE
573                ($2 IS NULL OR host(node_allocations.ip) || ':' || node_allocations.port ILIKE '%' || $2 || '%')
574                AND (node_allocations.node_uuid = $1 AND server_allocations.uuid IS NULL)
575            ORDER BY node_allocations.ip, node_allocations.port
576            LIMIT $3 OFFSET $4
577            "#,
578            Self::columns_sql(None)
579        ))
580        .bind(node_uuid)
581        .bind(search)
582        .bind(per_page)
583        .bind(offset)
584        .fetch_all(database.read())
585        .await?;
586
587        Ok(super::Pagination {
588            total: rows
589                .first()
590                .map_or(Ok(0), |row| row.try_get("total_count"))?,
591            per_page,
592            page,
593            data: rows
594                .into_iter()
595                .map(|row| Self::map(None, &row))
596                .try_collect_vec()?,
597        })
598    }
599
600    pub async fn by_node_uuid_with_pagination(
601        database: &crate::database::Database,
602        node_uuid: uuid::Uuid,
603        page: i64,
604        per_page: i64,
605        search: Option<&str>,
606    ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
607        let offset = (page - 1) * per_page;
608
609        let rows = sqlx::query(&format!(
610            r#"
611            SELECT {}, server_allocations.server_uuid, COUNT(*) OVER() AS total_count
612            FROM node_allocations
613            LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
614            WHERE node_allocations.node_uuid = $1 AND ($2 IS NULL OR host(node_allocations.ip) || ':' || node_allocations.port ILIKE '%' || $2 || '%')
615            ORDER BY node_allocations.ip, node_allocations.port
616            LIMIT $3 OFFSET $4
617            "#,
618            Self::columns_sql(None)
619        ))
620        .bind(node_uuid)
621        .bind(search)
622        .bind(per_page)
623        .bind(offset)
624        .fetch_all(database.read())
625        .await?;
626
627        Ok(super::Pagination {
628            total: rows
629                .first()
630                .map_or(Ok(0), |row| row.try_get("total_count"))?,
631            per_page,
632            page,
633            data: rows
634                .into_iter()
635                .map(|row| Self::map(None, &row))
636                .try_collect_vec()?,
637        })
638    }
639
640    pub async fn delete_by_uuids(
641        database: &crate::database::Database,
642        node_uuid: uuid::Uuid,
643        uuids: &[uuid::Uuid],
644    ) -> Result<u64, crate::database::DatabaseError> {
645        let deleted = sqlx::query(
646            r#"
647            DELETE FROM node_allocations
648            WHERE node_allocations.node_uuid = $1 AND node_allocations.uuid = ANY($2)
649            "#,
650        )
651        .bind(node_uuid)
652        .bind(uuids)
653        .execute(database.write())
654        .await?
655        .rows_affected();
656
657        Ok(deleted)
658    }
659}
660
661#[async_trait::async_trait]
662impl IntoAdminApiObject for NodeAllocation {
663    type AdminApiObject = AdminApiNodeAllocation;
664    type ExtraArgs<'a> = &'a crate::storage::StorageUrlRetriever<'a>;
665
666    async fn into_admin_api_object<'a>(
667        self,
668        state: &crate::State,
669        storage_url_retriever: Self::ExtraArgs<'a>,
670    ) -> Result<Self::AdminApiObject, crate::database::DatabaseError> {
671        let api_object = AdminApiNodeAllocation::init_hooks(&self, state).await?;
672
673        let server = match self.server {
674            Some(fetchable) => Some(
675                fetchable
676                    .fetch_cached(&state.database)
677                    .await?
678                    .into_admin_api_object(state, storage_url_retriever)
679                    .await?,
680            ),
681            None => None,
682        };
683
684        let api_object = finish_extendible!(
685            AdminApiNodeAllocation {
686                uuid: self.uuid,
687                server,
688                ip: self.ip.ip().to_compact_string(),
689                ip_alias: self.ip_alias,
690                port: self.port,
691                created: self.created.and_utc(),
692            },
693            api_object,
694            state
695        )?;
696
697        Ok(api_object)
698    }
699}
700
// Admin-API representation of a `NodeAllocation`, produced by
// `NodeAllocation::into_admin_api_object`.
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc attributes may be picked up by
// the `ToSchema` derive as schema descriptions — confirm before converting them.
#[schema_extension_derive::extendible]
#[init_args(NodeAllocation, crate::State)]
#[hook_args(crate::State)]
#[derive(ToSchema, Serialize)]
#[schema(title = "NodeAllocation")]
pub struct AdminApiNodeAllocation {
    // Identifier of the allocation.
    pub uuid: uuid::Uuid,
    // Admin-API form of the server attached to the allocation, if one was present on the model.
    pub server: Option<super::server::AdminApiServer>,

    // Address part of the allocation's IP network, rendered as a string.
    pub ip: compact_str::CompactString,
    // Optional alias of the IP, copied from the model.
    pub ip_alias: Option<compact_str::CompactString>,
    // Port of the allocation.
    pub port: i32,

    // Creation time; the model's naive timestamp converted with `and_utc()`.
    pub created: chrono::DateTime<chrono::Utc>,
}