Skip to main content

shared/
updates.rs

1use compact_str::ToCompactString;
2use serde::{Deserialize, Serialize};
3use sqlx::Row;
4use std::{collections::BTreeMap, fmt::Display, str::FromStr, sync::Arc};
5use tokio::sync::{RwLock, RwLockReadGuard};
6use utoipa::ToSchema;
7
/// One entry of the version history: a version and the time it was recorded.
#[derive(ToSchema, Serialize, Deserialize, Clone)]
pub struct VersionHistoryEntry {
    /// The recorded version, kept as the raw string.
    version: compact_str::CompactString,
    /// When the version was recorded, in UTC.
    timestamp: chrono::DateTime<chrono::Utc>,
}
13
/// A version string split into its semver part and optional commit/branch.
///
/// Accepted formats:
/// - `1.0.0`
/// - `1.0.0:commit`
/// - `1.0.0:commit@branch`
#[derive(ToSchema, Serialize, Deserialize, Clone)]
pub struct ParsedVersionInformation {
    /// The semantic version (the part before the first `:`).
    #[schema(value_type = String)]
    pub version: semver::Version,
    /// The commit identifier following `:`, if one was given.
    pub commit: Option<compact_str::CompactString>,
    /// The branch name following `@`, if one was given.
    pub branch: Option<compact_str::CompactString>,
}
25
26impl FromStr for ParsedVersionInformation {
27    type Err = anyhow::Error;
28
29    fn from_str(s: &str) -> Result<Self, Self::Err> {
30        if let Ok(version) = semver::Version::parse(s) {
31            return Ok(Self {
32                version,
33                commit: None,
34                branch: None,
35            });
36        }
37
38        let (version, commit_branch) = if let Some((version, commit)) = s.split_once(':') {
39            (version, Some(commit))
40        } else {
41            (s, None)
42        };
43        let (commit, branch) =
44            if let Some((commit, branch)) = commit_branch.and_then(|cb| cb.split_once('@')) {
45                (
46                    Some(commit.to_compact_string()),
47                    Some(branch.to_compact_string()),
48                )
49            } else {
50                (commit_branch.map(|c| c.to_compact_string()), None)
51            };
52
53        Ok(Self {
54            version: semver::Version::parse(version)?,
55            commit,
56            branch,
57        })
58    }
59}
60
61impl Display for ParsedVersionInformation {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        if let Some(commit) = &self.commit {
64            if let Some(branch) = &self.branch {
65                write!(f, "{}:{}@{}", self.version, commit, branch)
66            } else {
67                write!(f, "{}:{}", self.version, commit)
68            }
69        } else {
70            write!(f, "{}", self.version)
71        }
72    }
73}
74
/// Outcome of an update check for a single extension.
///
/// Serialized with an internal `type` tag in snake_case (`no_update`,
/// `update_available`, `error`).
#[derive(ToSchema, Serialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExtensionUpdateCheckResult {
    /// The check succeeded and reported no newer version.
    NoUpdate,
    /// The check succeeded and reported a newer version.
    UpdateAvailable {
        /// The extension version the check was run against.
        #[schema(value_type = String)]
        version: semver::Version,
        /// The newer version reported by the check.
        #[schema(value_type = String)]
        latest_version: semver::Version,
        /// The list of changes reported by the check.
        changes: Vec<compact_str::CompactString>,
    },
    /// The check itself failed.
    Error {
        /// The failure rendered as text.
        error: compact_str::CompactString,
    },
}
90
/// Result of one complete update check: panel, wings and every extension.
#[derive(ToSchema, Serialize, Clone)]
pub struct UpdateInformation {
    /// The version of the running panel.
    pub panel_version: compact_str::CompactString,
    /// The latest panel version reported by the update endpoint.
    #[schema(value_type = String)]
    pub latest_panel_version: semver::Version,
    /// The latest wings version reported by the update endpoint.
    #[schema(value_type = String)]
    pub latest_wings_version: semver::Version,

    /// Per-extension check results, keyed by the extension's package name.
    pub extensions: BTreeMap<&'static str, ExtensionUpdateCheckResult>,
}
101
/// Payload broadcast after every update check: the fresh information on
/// success, or the error that aborted the check. Both sides are `Arc`s so the
/// value can be cloned to every receiver.
type ChannelData = Result<Arc<UpdateInformation>, Arc<anyhow::Error>>;
103
/// Holds the latest update-check result and the recorded version history, and
/// lets callers request an out-of-schedule recheck.
pub struct UpdateManager {
    /// Wakes the background check loop ahead of its regular sleep.
    recheck_notifier: Arc<tokio::sync::Notify>,
    /// Template receiver; callers derive their own receiver from it via
    /// `resubscribe()` rather than receiving on it directly.
    recheck_finished_receiver: tokio::sync::broadcast::Receiver<ChannelData>,
    /// Sender on which the background loop publishes each check result.
    recheck_finished_sender: tokio::sync::broadcast::Sender<ChannelData>,

    /// Most recent successful check result; `None` until one has completed.
    latest_info: Arc<RwLock<Option<Arc<UpdateInformation>>>>,
    /// Recorded panel versions.
    panel_version_history: Arc<RwLock<Vec<VersionHistoryEntry>>>,
    /// Recorded extension versions, keyed by package name.
    extension_version_history: Arc<RwLock<BTreeMap<&'static str, Vec<VersionHistoryEntry>>>>,
}
113
114impl Default for UpdateManager {
115    fn default() -> Self {
116        let (recheck_finished_sender, recheck_finished_receiver) =
117            tokio::sync::broadcast::channel(1);
118
119        Self {
120            recheck_notifier: Arc::new(tokio::sync::Notify::new()),
121            recheck_finished_receiver,
122            recheck_finished_sender,
123            latest_info: Arc::new(RwLock::new(None)),
124            panel_version_history: Arc::new(RwLock::new(Vec::new())),
125            extension_version_history: Arc::new(RwLock::new(BTreeMap::new())),
126        }
127    }
128}
129
130impl UpdateManager {
131    pub fn init(&self, state: super::State) {
132        if !state.env.app_primary {
133            return;
134        }
135
136        let recheck_notifier = self.recheck_notifier.clone();
137        let recheck_finished_sender = self.recheck_finished_sender.clone();
138        let latest_info = self.latest_info.clone();
139
140        tokio::spawn({
141            let state = state.clone();
142
143            async move {
144                loop {
145                    let run_inner = async || -> Result<(), anyhow::Error> {
146                        let data: Response = state
147                            .client
148                            .get("https://calagopus.com/api/latest")
149                            .send()
150                            .await?
151                            .json()
152                            .await?;
153
154                        #[derive(Deserialize)]
155                        struct Response {
156                            versions: ResponseVersions,
157                        }
158
159                        #[derive(Deserialize)]
160                        struct ResponseVersions {
161                            panel: semver::Version,
162                            wings: semver::Version,
163                        }
164
165                        let mut update_info = UpdateInformation {
166                            panel_version: state.version.to_compact_string(),
167                            latest_panel_version: data.versions.panel,
168                            latest_wings_version: data.versions.wings,
169                            extensions: BTreeMap::new(),
170                        };
171
172                        for extension in state.extensions.extensions().await.iter() {
173                            let update_information = match extension
174                                .check_for_updates(state.clone(), &extension.version)
175                                .await
176                            {
177                                Ok(info) => info,
178                                Err(err) => {
179                                    tracing::error!(
180                                        "failed to check for updates for extension {}: {:#?}",
181                                        extension.package_name,
182                                        err
183                                    );
184
185                                    update_info.extensions.insert(
186                                        extension.package_name,
187                                        ExtensionUpdateCheckResult::Error {
188                                            error: err.to_compact_string(),
189                                        },
190                                    );
191
192                                    continue;
193                                }
194                            };
195
196                            if let Some(info) = update_information {
197                                update_info.extensions.insert(
198                                    extension.package_name,
199                                    ExtensionUpdateCheckResult::UpdateAvailable {
200                                        version: extension.version.clone(),
201                                        latest_version: info.version,
202                                        changes: info.changes,
203                                    },
204                                );
205                            } else {
206                                update_info.extensions.insert(
207                                    extension.package_name,
208                                    ExtensionUpdateCheckResult::NoUpdate,
209                                );
210                            }
211                        }
212
213                        let update_info = Arc::new(update_info);
214                        *latest_info.write().await = Some(update_info.clone());
215                        let _ = recheck_finished_sender.send(Ok(update_info));
216
217                        Ok(())
218                    };
219
220                    if let Err(err) = run_inner().await {
221                        tracing::error!("failed to check for updates: {:#?}", err);
222                        let _ = recheck_finished_sender.send(Err(Arc::new(err)));
223                    }
224
225                    tracing::info!("finished update check, waiting for 12h or recheck trigger");
226
227                    tokio::select! {
228                        _ = recheck_notifier.notified() => {}
229                        _ = tokio::time::sleep(std::time::Duration::from_hours(12)) => {}
230                    }
231                }
232            }
233        });
234
235        tokio::spawn(async move {
236            let run = async || -> Result<(), anyhow::Error> {
237                sqlx::query(
238                    "INSERT INTO version_history (extension, version) VALUES ('', $1)
239                    ON CONFLICT (extension, version) DO NOTHING",
240                )
241                .bind(&state.version)
242                .execute(state.database.write())
243                .await?;
244
245                for extension in state.extensions.extensions().await.iter() {
246                    sqlx::query(
247                        "INSERT INTO version_history (extension, version) VALUES ($1, $2)
248                        ON CONFLICT (extension, version) DO NOTHING",
249                    )
250                    .bind(extension.package_name)
251                    .bind(extension.version.to_string())
252                    .execute(state.database.write())
253                    .await?;
254                }
255
256                let rows = sqlx::query("SELECT extension, version, installed FROM version_history ORDER BY version_history.installed DESC")
257                    .fetch_all(state.database.read())
258                    .await?;
259
260                let mut panel_history = Vec::new();
261                let mut extension_history = BTreeMap::new();
262
263                for row in rows {
264                    let extension: compact_str::CompactString = row.try_get("extension")?;
265
266                    let entry = VersionHistoryEntry {
267                        version: row.try_get("version")?,
268                        timestamp: row
269                            .try_get::<chrono::NaiveDateTime, _>("installed")?
270                            .and_utc(),
271                    };
272
273                    if extension.is_empty() {
274                        panel_history.push(entry);
275                    } else if let Some(ext) = state
276                        .extensions
277                        .extensions()
278                        .await
279                        .iter()
280                        .find(|ext| ext.package_name == extension)
281                    {
282                        extension_history
283                            .entry(ext.package_name)
284                            .or_insert_with(Vec::new)
285                            .push(entry);
286                    }
287                }
288
289                *state.updates.panel_version_history.write().await = panel_history;
290                *state.updates.extension_version_history.write().await = extension_history;
291
292                Ok(())
293            };
294
295            if let Err(err) = run().await {
296                tracing::error!("failed to track version history: {:#?}", err);
297            }
298        });
299    }
300
301    pub async fn get_panel_version_history(&self) -> RwLockReadGuard<'_, Vec<VersionHistoryEntry>> {
302        self.panel_version_history.read().await
303    }
304
305    pub async fn get_extension_version_history(
306        &self,
307    ) -> RwLockReadGuard<'_, BTreeMap<&'static str, Vec<VersionHistoryEntry>>> {
308        self.extension_version_history.read().await
309    }
310
311    pub async fn get_update_information(&self) -> Option<Arc<UpdateInformation>> {
312        self.latest_info.read().await.clone()
313    }
314
315    pub fn trigger_recheck(&self) {
316        self.recheck_notifier.notify_waiters();
317    }
318
319    pub async fn trigger_recheck_and_wait(&self) -> ChannelData {
320        self.trigger_recheck();
321        self.recheck_finished_receiver
322            .resubscribe()
323            .recv()
324            .await
325            .map_err(|err| {
326                Arc::new(anyhow::anyhow!(
327                    "failed to receive update check result: {:#?}",
328                    err
329                ))
330            })
331            .flatten()
332    }
333}