1use compact_str::ToCompactString;
2use serde::{Deserialize, Serialize};
3use sqlx::Row;
4use std::{collections::BTreeMap, fmt::Display, str::FromStr, sync::Arc};
5use tokio::sync::{RwLock, RwLockReadGuard};
6use utoipa::ToSchema;
7
// One recorded version together with the time it was recorded.
//
// Entries are built from rows of the `version_history` table (see `UpdateManager::init`).
#[derive(ToSchema, Serialize, Deserialize, Clone)]
pub struct VersionHistoryEntry {
    // Version text exactly as read from the `version` column.
    version: compact_str::CompactString,
    // Value of the `installed` column, interpreted as UTC.
    timestamp: chrono::DateTime<chrono::Utc>,
}
13
// Version information split into a semantic version plus an optional commit and branch.
//
// The textual forms handled by the `FromStr` and `Display` impls are `<version>`,
// `<version>:<commit>` and `<version>:<commit>@<branch>`.
#[derive(ToSchema, Serialize, Deserialize, Clone)]
pub struct ParsedVersionInformation {
    // Exposed as a plain string in the generated schema.
    #[schema(value_type = String)]
    pub version: semver::Version,
    // Text after the first `:` (up to the first `@`, when one is present).
    pub commit: Option<compact_str::CompactString>,
    // Text after the first `@` that follows the `:`.
    pub branch: Option<compact_str::CompactString>,
}
25
26impl FromStr for ParsedVersionInformation {
27 type Err = anyhow::Error;
28
29 fn from_str(s: &str) -> Result<Self, Self::Err> {
30 if let Ok(version) = semver::Version::parse(s) {
31 return Ok(Self {
32 version,
33 commit: None,
34 branch: None,
35 });
36 }
37
38 let (version, commit_branch) = if let Some((version, commit)) = s.split_once(':') {
39 (version, Some(commit))
40 } else {
41 (s, None)
42 };
43 let (commit, branch) =
44 if let Some((commit, branch)) = commit_branch.and_then(|cb| cb.split_once('@')) {
45 (
46 Some(commit.to_compact_string()),
47 Some(branch.to_compact_string()),
48 )
49 } else {
50 (commit_branch.map(|c| c.to_compact_string()), None)
51 };
52
53 Ok(Self {
54 version: semver::Version::parse(version)?,
55 commit,
56 branch,
57 })
58 }
59}
60
61impl Display for ParsedVersionInformation {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 if let Some(commit) = &self.commit {
64 if let Some(branch) = &self.branch {
65 write!(f, "{}:{}@{}", self.version, commit, branch)
66 } else {
67 write!(f, "{}:{}", self.version, commit)
68 }
69 } else {
70 write!(f, "{}", self.version)
71 }
72 }
73}
74
// Outcome of an update check for a single extension.
//
// Serialized with an internal `type` tag whose value is the snake_case variant name.
#[derive(ToSchema, Serialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExtensionUpdateCheckResult {
    // The check succeeded and returned no update information.
    NoUpdate,
    // The check succeeded and returned update information.
    UpdateAvailable {
        // Version of the extension at the time of the check.
        #[schema(value_type = String)]
        version: semver::Version,
        // Version reported by the extension's update check.
        #[schema(value_type = String)]
        latest_version: semver::Version,
        // Change entries reported by the extension's update check.
        changes: Vec<compact_str::CompactString>,
    },
    // The check itself failed.
    Error {
        // Display text of the error returned by the check.
        error: compact_str::CompactString,
    },
}
90
// Snapshot produced by one successful update check run.
#[derive(ToSchema, Serialize, Clone)]
pub struct UpdateInformation {
    // Version of the running panel, stored as text.
    pub panel_version: compact_str::CompactString,
    // Latest panel version reported by the remote endpoint.
    #[schema(value_type = String)]
    pub latest_panel_version: semver::Version,
    // Latest wings version reported by the remote endpoint.
    #[schema(value_type = String)]
    pub latest_wings_version: semver::Version,

    // Per-extension check results, keyed by the extension's package name.
    pub extensions: BTreeMap<&'static str, ExtensionUpdateCheckResult>,
}
101
/// Payload broadcast after every update check: the fresh information on success, the error
/// otherwise. Both sides are wrapped in `Arc` so the value can be cloned for each receiver.
type ChannelData = Result<Arc<UpdateInformation>, Arc<anyhow::Error>>;
103
/// Holds the state shared between the background update-check task, the version-history task
/// and the accessor methods.
pub struct UpdateManager {
    /// Wakes the update-check loop before its 12 hour sleep elapses.
    recheck_notifier: Arc<tokio::sync::Notify>,
    /// Template receiver; callers `resubscribe()` from it to wait for the next result.
    recheck_finished_receiver: tokio::sync::broadcast::Receiver<ChannelData>,
    /// Used by the update-check loop to publish the outcome of every run.
    recheck_finished_sender: tokio::sync::broadcast::Sender<ChannelData>,

    /// Result of the most recent successful update check, `None` until one has completed.
    latest_info: Arc<RwLock<Option<Arc<UpdateInformation>>>>,
    /// Panel version history, in the order returned by the history query (newest first).
    panel_version_history: Arc<RwLock<Vec<VersionHistoryEntry>>>,
    /// Version history per extension, keyed by package name (newest first).
    extension_version_history: Arc<RwLock<BTreeMap<&'static str, Vec<VersionHistoryEntry>>>>,
}
113
114impl Default for UpdateManager {
115 fn default() -> Self {
116 let (recheck_finished_sender, recheck_finished_receiver) =
117 tokio::sync::broadcast::channel(1);
118
119 Self {
120 recheck_notifier: Arc::new(tokio::sync::Notify::new()),
121 recheck_finished_receiver,
122 recheck_finished_sender,
123 latest_info: Arc::new(RwLock::new(None)),
124 panel_version_history: Arc::new(RwLock::new(Vec::new())),
125 extension_version_history: Arc::new(RwLock::new(BTreeMap::new())),
126 }
127 }
128}
129
130impl UpdateManager {
131 pub fn init(&self, state: super::State) {
132 if !state.env.app_primary {
133 return;
134 }
135
136 let recheck_notifier = self.recheck_notifier.clone();
137 let recheck_finished_sender = self.recheck_finished_sender.clone();
138 let latest_info = self.latest_info.clone();
139
140 tokio::spawn({
141 let state = state.clone();
142
143 async move {
144 loop {
145 let run_inner = async || -> Result<(), anyhow::Error> {
146 let data: Response = state
147 .client
148 .get("https://calagopus.com/api/latest")
149 .send()
150 .await?
151 .json()
152 .await?;
153
154 #[derive(Deserialize)]
155 struct Response {
156 versions: ResponseVersions,
157 }
158
159 #[derive(Deserialize)]
160 struct ResponseVersions {
161 panel: semver::Version,
162 wings: semver::Version,
163 }
164
165 let mut update_info = UpdateInformation {
166 panel_version: state.version.to_compact_string(),
167 latest_panel_version: data.versions.panel,
168 latest_wings_version: data.versions.wings,
169 extensions: BTreeMap::new(),
170 };
171
172 for extension in state.extensions.extensions().await.iter() {
173 let update_information = match extension
174 .check_for_updates(state.clone(), &extension.version)
175 .await
176 {
177 Ok(info) => info,
178 Err(err) => {
179 tracing::error!(
180 "failed to check for updates for extension {}: {:#?}",
181 extension.package_name,
182 err
183 );
184
185 update_info.extensions.insert(
186 extension.package_name,
187 ExtensionUpdateCheckResult::Error {
188 error: err.to_compact_string(),
189 },
190 );
191
192 continue;
193 }
194 };
195
196 if let Some(info) = update_information {
197 update_info.extensions.insert(
198 extension.package_name,
199 ExtensionUpdateCheckResult::UpdateAvailable {
200 version: extension.version.clone(),
201 latest_version: info.version,
202 changes: info.changes,
203 },
204 );
205 } else {
206 update_info.extensions.insert(
207 extension.package_name,
208 ExtensionUpdateCheckResult::NoUpdate,
209 );
210 }
211 }
212
213 let update_info = Arc::new(update_info);
214 *latest_info.write().await = Some(update_info.clone());
215 let _ = recheck_finished_sender.send(Ok(update_info));
216
217 Ok(())
218 };
219
220 if let Err(err) = run_inner().await {
221 tracing::error!("failed to check for updates: {:#?}", err);
222 let _ = recheck_finished_sender.send(Err(Arc::new(err)));
223 }
224
225 tracing::info!("finished update check, waiting for 12h or recheck trigger");
226
227 tokio::select! {
228 _ = recheck_notifier.notified() => {}
229 _ = tokio::time::sleep(std::time::Duration::from_hours(12)) => {}
230 }
231 }
232 }
233 });
234
235 tokio::spawn(async move {
236 let run = async || -> Result<(), anyhow::Error> {
237 sqlx::query(
238 "INSERT INTO version_history (extension, version) VALUES ('', $1)
239 ON CONFLICT (extension, version) DO NOTHING",
240 )
241 .bind(&state.version)
242 .execute(state.database.write())
243 .await?;
244
245 for extension in state.extensions.extensions().await.iter() {
246 sqlx::query(
247 "INSERT INTO version_history (extension, version) VALUES ($1, $2)
248 ON CONFLICT (extension, version) DO NOTHING",
249 )
250 .bind(extension.package_name)
251 .bind(extension.version.to_string())
252 .execute(state.database.write())
253 .await?;
254 }
255
256 let rows = sqlx::query("SELECT extension, version, installed FROM version_history ORDER BY version_history.installed DESC")
257 .fetch_all(state.database.read())
258 .await?;
259
260 let mut panel_history = Vec::new();
261 let mut extension_history = BTreeMap::new();
262
263 for row in rows {
264 let extension: compact_str::CompactString = row.try_get("extension")?;
265
266 let entry = VersionHistoryEntry {
267 version: row.try_get("version")?,
268 timestamp: row
269 .try_get::<chrono::NaiveDateTime, _>("installed")?
270 .and_utc(),
271 };
272
273 if extension.is_empty() {
274 panel_history.push(entry);
275 } else if let Some(ext) = state
276 .extensions
277 .extensions()
278 .await
279 .iter()
280 .find(|ext| ext.package_name == extension)
281 {
282 extension_history
283 .entry(ext.package_name)
284 .or_insert_with(Vec::new)
285 .push(entry);
286 }
287 }
288
289 *state.updates.panel_version_history.write().await = panel_history;
290 *state.updates.extension_version_history.write().await = extension_history;
291
292 Ok(())
293 };
294
295 if let Err(err) = run().await {
296 tracing::error!("failed to track version history: {:#?}", err);
297 }
298 });
299 }
300
301 pub async fn get_panel_version_history(&self) -> RwLockReadGuard<'_, Vec<VersionHistoryEntry>> {
302 self.panel_version_history.read().await
303 }
304
305 pub async fn get_extension_version_history(
306 &self,
307 ) -> RwLockReadGuard<'_, BTreeMap<&'static str, Vec<VersionHistoryEntry>>> {
308 self.extension_version_history.read().await
309 }
310
311 pub async fn get_update_information(&self) -> Option<Arc<UpdateInformation>> {
312 self.latest_info.read().await.clone()
313 }
314
315 pub fn trigger_recheck(&self) {
316 self.recheck_notifier.notify_waiters();
317 }
318
319 pub async fn trigger_recheck_and_wait(&self) -> ChannelData {
320 self.trigger_recheck();
321 self.recheck_finished_receiver
322 .resubscribe()
323 .recv()
324 .await
325 .map_err(|err| {
326 Arc::new(anyhow::anyhow!(
327 "failed to receive update check result: {:#?}",
328 err
329 ))
330 })
331 .flatten()
332 }
333}