1use crate::prelude::*;
2use compact_str::ToCompactString;
3use serde::{Deserialize, Serialize};
4use sqlx::{Row, postgres::PgRow};
5use std::{
6 collections::{BTreeMap, HashMap},
7 sync::LazyLock,
8};
9#[derive(Serialize, Deserialize, Clone)]
10pub struct NodeAllocation {
11 pub uuid: uuid::Uuid,
12 pub server: Option<Fetchable<super::server::Server>>,
13
14 pub ip: sqlx::types::ipnetwork::IpNetwork,
15 pub ip_alias: Option<compact_str::CompactString>,
16 pub port: i32,
17
18 pub created: chrono::NaiveDateTime,
19
20 extension_data: super::ModelExtensionData,
21}
22
23impl BaseModel for NodeAllocation {
24 const NAME: &'static str = "node_allocation";
25
26 fn get_extension_list() -> &'static super::ModelExtensionList {
27 static EXTENSIONS: LazyLock<super::ModelExtensionList> =
28 LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
29
30 &EXTENSIONS
31 }
32
33 fn get_extension_data(&self) -> &super::ModelExtensionData {
34 &self.extension_data
35 }
36
37 #[inline]
38 fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
39 let prefix = prefix.unwrap_or_default();
40
41 BTreeMap::from([
42 (
43 "node_allocations.uuid",
44 compact_str::format_compact!("{prefix}uuid"),
45 ),
46 (
47 "node_allocations.ip",
48 compact_str::format_compact!("{prefix}ip"),
49 ),
50 (
51 "node_allocations.ip_alias",
52 compact_str::format_compact!("{prefix}ip_alias"),
53 ),
54 (
55 "node_allocations.port",
56 compact_str::format_compact!("{prefix}port"),
57 ),
58 (
59 "node_allocations.created",
60 compact_str::format_compact!("{prefix}created"),
61 ),
62 ])
63 }
64
65 #[inline]
66 fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
67 let prefix = prefix.unwrap_or_default();
68
69 Ok(Self {
70 uuid: row.try_get(compact_str::format_compact!("{prefix}uuid").as_str())?,
71 server: if let Ok(server_uuid) = row.try_get::<uuid::Uuid, _>("server_uuid") {
72 Some(super::server::Server::get_fetchable(server_uuid))
73 } else {
74 None
75 },
76 ip: row.try_get(compact_str::format_compact!("{prefix}ip").as_str())?,
77 ip_alias: row.try_get(compact_str::format_compact!("{prefix}ip_alias").as_str())?,
78 port: row.try_get(compact_str::format_compact!("{prefix}port").as_str())?,
79 created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
80 extension_data: Self::map_extensions(prefix, row)?,
81 })
82 }
83}
84
85impl NodeAllocation {
86 pub async fn create(
87 database: &crate::database::Database,
88 node_uuid: uuid::Uuid,
89 ip: &sqlx::types::ipnetwork::IpNetwork,
90 ip_alias: Option<&str>,
91 port: i32,
92 ) -> Result<(), crate::database::DatabaseError> {
93 sqlx::query(
94 r#"
95 INSERT INTO node_allocations (node_uuid, ip, ip_alias, port)
96 VALUES ($1, $2, $3, $4)
97 "#,
98 )
99 .bind(node_uuid)
100 .bind(ip)
101 .bind(ip_alias)
102 .bind(port)
103 .execute(database.write())
104 .await?;
105
106 Ok(())
107 }
108
109 pub async fn get_random(
110 database: &crate::database::Database,
111 node_uuid: uuid::Uuid,
112 start_port: u16,
113 end_port: u16,
114 amount: i64,
115 ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
116 let rows = sqlx::query(
117 r#"
118 WITH eligible_ips AS (
119 SELECT node_allocations.ip
120 FROM node_allocations
121 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
122 WHERE
123 node_allocations.node_uuid = $1
124 AND node_allocations.port BETWEEN $2 AND $3
125 AND server_allocations.uuid IS NULL
126 GROUP BY node_allocations.ip
127 HAVING COUNT(*) >= $4
128 ),
129 random_ip AS (
130 SELECT ip FROM eligible_ips ORDER BY RANDOM() LIMIT 1
131 )
132 SELECT node_allocations.uuid
133 FROM node_allocations
134 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
135 WHERE
136 node_allocations.node_uuid = $1
137 AND node_allocations.port BETWEEN $2 AND $3
138 AND server_allocations.uuid IS NULL
139 AND node_allocations.ip = (SELECT ip FROM random_ip)
140 ORDER BY RANDOM()
141 LIMIT $4
142 "#,
143 )
144 .bind(node_uuid)
145 .bind(start_port as i32)
146 .bind(end_port as i32)
147 .bind(amount)
148 .fetch_all(database.write())
149 .await?;
150
151 if rows.len() != amount as usize {
152 return Err(anyhow::anyhow!("only found {} available allocations", rows.len()).into());
153 }
154
155 Ok(rows
156 .into_iter()
157 .map(|row| row.get::<uuid::Uuid, _>("uuid"))
158 .collect())
159 }
160
161 pub async fn get_random_ip(
162 database: &crate::database::Database,
163 node_uuid: uuid::Uuid,
164 ip: &sqlx::types::ipnetwork::IpNetwork,
165 start_port: u16,
166 end_port: u16,
167 amount: i64,
168 ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
169 let rows = sqlx::query(
170 r#"
171 SELECT node_allocations.uuid
172 FROM node_allocations
173 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
174 WHERE
175 node_allocations.node_uuid = $1
176 AND node_allocations.ip = $2
177 AND node_allocations.port BETWEEN $3 AND $4
178 AND server_allocations.uuid IS NULL
179 ORDER BY RANDOM()
180 LIMIT $5
181 "#,
182 )
183 .bind(node_uuid)
184 .bind(ip)
185 .bind(start_port as i32)
186 .bind(end_port as i32)
187 .bind(amount)
188 .fetch_all(database.write())
189 .await?;
190
191 if rows.len() != amount as usize {
192 return Err(anyhow::anyhow!(
193 "only found {} available allocations on this IP",
194 rows.len()
195 )
196 .into());
197 }
198
199 Ok(rows
200 .into_iter()
201 .map(|row| row.get::<uuid::Uuid, _>("uuid"))
202 .collect())
203 }
204
205 pub async fn get_random_dedicated(
206 database: &crate::database::Database,
207 node_uuid: uuid::Uuid,
208 start_port: u16,
209 end_port: u16,
210 amount: i64,
211 ) -> Result<Vec<uuid::Uuid>, crate::database::DatabaseError> {
212 let rows = sqlx::query(
213 r#"
214 WITH eligible_ips AS (
215 SELECT node_allocations.ip
216 FROM node_allocations
217 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
218 WHERE node_allocations.node_uuid = $1
219 GROUP BY node_allocations.ip
220 HAVING
221 COUNT(server_allocations.uuid) = 0
222 AND SUM(CASE WHEN node_allocations.port BETWEEN $2 AND $3 THEN 1 ELSE 0 END) >= $4
223 ),
224 random_ip AS (
225 SELECT ip FROM eligible_ips ORDER BY RANDOM() LIMIT 1
226 )
227 SELECT node_allocations.uuid
228 FROM node_allocations
229 WHERE
230 node_allocations.node_uuid = $1
231 AND node_allocations.port BETWEEN $2 AND $3
232 AND node_allocations.ip = (SELECT ip FROM random_ip)
233 ORDER BY RANDOM()
234 LIMIT $4
235 "#,
236 )
237 .bind(node_uuid)
238 .bind(start_port as i32)
239 .bind(end_port as i32)
240 .bind(amount)
241 .fetch_all(database.write())
242 .await?;
243
244 if rows.len() != amount as usize {
245 return Err(anyhow::anyhow!(
246 "only found {} available dedicated allocations",
247 rows.len()
248 )
249 .into());
250 }
251
252 Ok(rows
253 .into_iter()
254 .map(|row| row.get::<uuid::Uuid, _>("uuid"))
255 .collect())
256 }
257
258 pub async fn by_node_uuid_ip_port_unused(
259 database: &crate::database::Database,
260 node_uuid: uuid::Uuid,
261 ip: &sqlx::types::ipnetwork::IpNetwork,
262 port: i32,
263 ) -> Result<Option<Self>, crate::database::DatabaseError> {
264 let row = sqlx::query(&format!(
265 r#"
266 SELECT {}
267 FROM node_allocations
268 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
269 WHERE node_allocations.node_uuid = $1 AND node_allocations.ip = $2 AND node_allocations.port = $3 AND server_allocations.uuid IS NULL
270 "#,
271 Self::columns_sql(None)
272 ))
273 .bind(node_uuid)
274 .bind(ip)
275 .bind(port)
276 .fetch_optional(database.read())
277 .await?;
278
279 row.try_map(|row| Self::map(None, &row))
280 }
281
282 pub async fn get_from_deployment<'a>(
283 database: &crate::database::Database,
284 deployment: &'a super::egg_configuration::EggConfigAllocationsDeployment,
285 node_uuid: uuid::Uuid,
286 variables: &mut HashMap<&'a str, compact_str::CompactString>,
287 ) -> Result<(Option<uuid::Uuid>, Vec<uuid::Uuid>), crate::database::DatabaseError> {
288 let mut primary = None;
289 let mut additional = Vec::new();
290
291 const MAX_ITER: usize = 100;
292
293 macro_rules! is_unused {
294 ($uuid:expr) => {
295 primary.as_ref().map_or(true, |p| p.uuid != $uuid) && !additional.contains(&$uuid)
296 };
297 }
298
299 macro_rules! get_random {
300 ($start_port:expr, $end_port:expr) => {
301 if let Some(primary) = &primary {
302 Self::get_random_ip(database, node_uuid, &primary.ip, $start_port, $end_port, 1)
303 .await?
304 } else {
305 Self::get_random(database, node_uuid, $start_port, $end_port, 1).await?
306 }
307 };
308 }
309
310 'primary: for i in 0..MAX_ITER {
311 if i != 0 && deployment.primary.is_none() {
312 break;
313 }
314
315 additional.clear();
316 variables.clear();
317
318 if let Some(primary_allocation) = &deployment.primary {
319 let random = if deployment.dedicated {
320 Self::get_random_dedicated(
321 database,
322 node_uuid,
323 primary_allocation.start_port,
324 primary_allocation.end_port,
325 1,
326 )
327 .await?
328 } else {
329 Self::get_random(
330 database,
331 node_uuid,
332 primary_allocation.start_port,
333 primary_allocation.end_port,
334 1,
335 )
336 .await?
337 };
338
339 let Some(allocation) = random.into_iter().next() else {
340 return Err(anyhow::anyhow!("no available primary allocation found").into());
341 };
342 let allocation =
343 match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
344 Some(allocation) => allocation,
345 None => {
346 return Err(
347 anyhow::anyhow!("allocated primary allocation not found").into()
348 );
349 }
350 };
351
352 if let Some(variable_name) = &primary_allocation.assign_to_variable {
353 variables.insert(variable_name, allocation.port.to_compact_string());
354 }
355
356 primary = Some(allocation);
357 }
358
359 for additional_allocation in &deployment.additional {
360 match additional_allocation.mode {
361 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::Random => {
362 let mut found_allocation = None;
363 for _ in 0..MAX_ITER {
364 let random = get_random!(
365 1,
366 u16::MAX
367 );
368
369 if let Some(allocation) = random.into_iter().next() {
370 if is_unused!(allocation) {
371 found_allocation = Some(allocation);
372 break;
373 }
374 } else {
375 return Err(anyhow::anyhow!("no available additional allocation found").into());
376 }
377 }
378
379 let Some(allocation) = found_allocation else {
380 return Err(anyhow::anyhow!("no available additional allocation found").into());
381 };
382 additional.push(allocation);
383
384 if let Some(variable_name) = &additional_allocation.assign_to_variable {
385 let allocation = match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
386 Some(allocation) => allocation,
387 None => {
388 return Err(
389 anyhow::anyhow!("allocated additional allocation not found").into()
390 );
391 }
392 };
393
394 variables.insert(variable_name, allocation.port.to_compact_string());
395 }
396 },
397 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::Range {
398 start_port,
399 end_port,
400 } => {
401 let mut found_allocation = None;
402 for _ in 0..MAX_ITER {
403 let random = get_random!(
404 start_port,
405 end_port
406 );
407
408 if let Some(allocation) = random.into_iter().next() {
409 if is_unused!(allocation) {
410 found_allocation = Some(allocation);
411 break;
412 }
413 } else {
414 return Err(anyhow::anyhow!("no available additional allocation found").into());
415 }
416 }
417
418 let Some(allocation) = found_allocation else {
419 return Err(anyhow::anyhow!("no available additional allocation found").into());
420 };
421 additional.push(allocation);
422
423 if let Some(variable_name) = &additional_allocation.assign_to_variable {
424 let allocation = match Self::by_node_uuid_uuid(database, node_uuid, allocation).await? {
425 Some(allocation) => allocation,
426 None => {
427 return Err(
428 anyhow::anyhow!("allocated additional allocation not found").into()
429 );
430 }
431 };
432
433 variables.insert(variable_name, allocation.port.to_compact_string());
434 }
435 }
436 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::AddPrimary { value } => {
437 let primary = match &primary {
438 Some(primary) => primary,
439 None => {
440 return Err(anyhow::anyhow!("primary allocation is required for `add_primary` mode").into());
441 }
442 };
443
444 let allocation_port = primary.port + value as i32;
445
446 let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
447 Some(allocation) => allocation,
448 None => continue 'primary,
449 };
450 if !is_unused!(allocation.uuid) {
451 return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
452 }
453 additional.push(allocation.uuid);
454
455 if let Some(variable_name) = &additional_allocation.assign_to_variable {
456 variables.insert(variable_name, allocation.port.to_compact_string());
457 }
458 }
459 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::SubtractPrimary { value } => {
460 let primary = match &primary {
461 Some(primary) => primary,
462 None => {
463 return Err(anyhow::anyhow!("primary allocation is required for `subtract_primary` mode").into());
464 }
465 };
466
467 let allocation_port = primary.port - value as i32;
468
469 let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
470 Some(allocation) => allocation,
471 None => continue 'primary,
472 };
473 if !is_unused!(allocation.uuid) {
474 return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
475 }
476 additional.push(allocation.uuid);
477
478 if let Some(variable_name) = &additional_allocation.assign_to_variable {
479 variables.insert(variable_name, allocation.port.to_compact_string());
480 }
481 }
482 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::MultiplyPrimary { value } => {
483 let primary = match &primary {
484 Some(primary) => primary,
485 None => {
486 return Err(anyhow::anyhow!("primary allocation is required for `multiply_primary` mode").into());
487 }
488 };
489
490 let allocation_port = (primary.port as f64 * value) as i32;
491
492 let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
493 Some(allocation) => allocation,
494 None => continue 'primary,
495 };
496 if !is_unused!(allocation.uuid) {
497 return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
498 }
499 additional.push(allocation.uuid);
500
501 if let Some(variable_name) = &additional_allocation.assign_to_variable {
502 variables.insert(variable_name, allocation.port.to_compact_string());
503 }
504 }
505 super::egg_configuration::EggConfigAllocationDeploymentAdditionalAllocationMode::DividePrimary { value } => {
506 let primary = match &primary {
507 Some(primary) => primary,
508 None => {
509 return Err(anyhow::anyhow!("primary allocation is required for `divide_primary` mode").into());
510 }
511 };
512
513 let allocation_port = (primary.port as f64 / value) as i32;
514
515 let allocation = match Self::by_node_uuid_ip_port_unused(database, node_uuid, &primary.ip, allocation_port).await? {
516 Some(allocation) => allocation,
517 None => continue 'primary,
518 };
519 if !is_unused!(allocation.uuid) {
520 return Err(anyhow::anyhow!("allocated additional allocation is already in use").into());
521 }
522 additional.push(allocation.uuid);
523
524 if let Some(variable_name) = &additional_allocation.assign_to_variable {
525 variables.insert(variable_name, allocation.port.to_compact_string());
526 }
527 }
528 }
529 }
530
531 break;
532 }
533
534 Ok((primary.map(|p| p.uuid), additional))
535 }
536
537 pub async fn by_node_uuid_uuid(
538 database: &crate::database::Database,
539 node_uuid: uuid::Uuid,
540 uuid: uuid::Uuid,
541 ) -> Result<Option<Self>, crate::database::DatabaseError> {
542 let row = sqlx::query(&format!(
543 r#"
544 SELECT {}
545 FROM node_allocations
546 WHERE node_allocations.node_uuid = $1 AND node_allocations.uuid = $2
547 "#,
548 Self::columns_sql(None)
549 ))
550 .bind(node_uuid)
551 .bind(uuid)
552 .fetch_optional(database.read())
553 .await?;
554
555 row.try_map(|row| Self::map(None, &row))
556 }
557
558 pub async fn available_by_node_uuid_with_pagination(
559 database: &crate::database::Database,
560 node_uuid: uuid::Uuid,
561 page: i64,
562 per_page: i64,
563 search: Option<&str>,
564 ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
565 let offset = (page - 1) * per_page;
566
567 let rows = sqlx::query(&format!(
568 r#"
569 SELECT {}, COUNT(*) OVER() AS total_count
570 FROM node_allocations
571 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
572 WHERE
573 ($2 IS NULL OR host(node_allocations.ip) || ':' || node_allocations.port ILIKE '%' || $2 || '%')
574 AND (node_allocations.node_uuid = $1 AND server_allocations.uuid IS NULL)
575 ORDER BY node_allocations.ip, node_allocations.port
576 LIMIT $3 OFFSET $4
577 "#,
578 Self::columns_sql(None)
579 ))
580 .bind(node_uuid)
581 .bind(search)
582 .bind(per_page)
583 .bind(offset)
584 .fetch_all(database.read())
585 .await?;
586
587 Ok(super::Pagination {
588 total: rows
589 .first()
590 .map_or(Ok(0), |row| row.try_get("total_count"))?,
591 per_page,
592 page,
593 data: rows
594 .into_iter()
595 .map(|row| Self::map(None, &row))
596 .try_collect_vec()?,
597 })
598 }
599
600 pub async fn by_node_uuid_with_pagination(
601 database: &crate::database::Database,
602 node_uuid: uuid::Uuid,
603 page: i64,
604 per_page: i64,
605 search: Option<&str>,
606 ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
607 let offset = (page - 1) * per_page;
608
609 let rows = sqlx::query(&format!(
610 r#"
611 SELECT {}, server_allocations.server_uuid, COUNT(*) OVER() AS total_count
612 FROM node_allocations
613 LEFT JOIN server_allocations ON server_allocations.allocation_uuid = node_allocations.uuid
614 WHERE node_allocations.node_uuid = $1 AND ($2 IS NULL OR host(node_allocations.ip) || ':' || node_allocations.port ILIKE '%' || $2 || '%')
615 ORDER BY node_allocations.ip, node_allocations.port
616 LIMIT $3 OFFSET $4
617 "#,
618 Self::columns_sql(None)
619 ))
620 .bind(node_uuid)
621 .bind(search)
622 .bind(per_page)
623 .bind(offset)
624 .fetch_all(database.read())
625 .await?;
626
627 Ok(super::Pagination {
628 total: rows
629 .first()
630 .map_or(Ok(0), |row| row.try_get("total_count"))?,
631 per_page,
632 page,
633 data: rows
634 .into_iter()
635 .map(|row| Self::map(None, &row))
636 .try_collect_vec()?,
637 })
638 }
639
640 pub async fn delete_by_uuids(
641 database: &crate::database::Database,
642 node_uuid: uuid::Uuid,
643 uuids: &[uuid::Uuid],
644 ) -> Result<u64, crate::database::DatabaseError> {
645 let deleted = sqlx::query(
646 r#"
647 DELETE FROM node_allocations
648 WHERE node_allocations.node_uuid = $1 AND node_allocations.uuid = ANY($2)
649 "#,
650 )
651 .bind(node_uuid)
652 .bind(uuids)
653 .execute(database.write())
654 .await?
655 .rows_affected();
656
657 Ok(deleted)
658 }
659}
660
661#[async_trait::async_trait]
662impl IntoAdminApiObject for NodeAllocation {
663 type AdminApiObject = AdminApiNodeAllocation;
664 type ExtraArgs<'a> = &'a crate::storage::StorageUrlRetriever<'a>;
665
666 async fn into_admin_api_object<'a>(
667 self,
668 state: &crate::State,
669 storage_url_retriever: Self::ExtraArgs<'a>,
670 ) -> Result<Self::AdminApiObject, crate::database::DatabaseError> {
671 let api_object = AdminApiNodeAllocation::init_hooks(&self, state).await?;
672
673 let server = match self.server {
674 Some(fetchable) => Some(
675 fetchable
676 .fetch_cached(&state.database)
677 .await?
678 .into_admin_api_object(state, storage_url_retriever)
679 .await?,
680 ),
681 None => None,
682 };
683
684 let api_object = finish_extendible!(
685 AdminApiNodeAllocation {
686 uuid: self.uuid,
687 server,
688 ip: self.ip.ip().to_compact_string(),
689 ip_alias: self.ip_alias,
690 port: self.port,
691 created: self.created.and_utc(),
692 },
693 api_object,
694 state
695 )?;
696
697 Ok(api_object)
698 }
699}
700
// Admin API representation of a `NodeAllocation`, built by
// `NodeAllocation::into_admin_api_object`.
#[schema_extension_derive::extendible]
#[init_args(NodeAllocation, crate::State)]
#[hook_args(crate::State)]
#[derive(ToSchema, Serialize)]
#[schema(title = "NodeAllocation")]
pub struct AdminApiNodeAllocation {
    // Identifier of the allocation.
    pub uuid: uuid::Uuid,
    // Converted server attached to the allocation, if any.
    pub server: Option<super::server::AdminApiServer>,

    // Host address of the allocation rendered as text.
    pub ip: compact_str::CompactString,
    // Optional alias stored alongside the IP.
    pub ip_alias: Option<compact_str::CompactString>,
    pub port: i32,

    // Creation time, exposed as a UTC timestamp.
    pub created: chrono::DateTime<chrono::Utc>,
}