Skip to main content

shared/
storage.rs

1use crate::settings::SettingsReadGuard;
2use compact_str::ToCompactString;
3use serde::{Deserialize, Serialize};
4use std::{path::Path, sync::Arc};
5use tokio::io::AsyncWriteExt;
6use utoipa::ToSchema;
7
8#[derive(ToSchema, Deserialize, Serialize)]
9pub struct StorageAsset {
10    pub name: compact_str::CompactString,
11    pub url: String,
12    pub size: u64,
13    pub is_directory: bool,
14    pub created: chrono::DateTime<chrono::Utc>,
15}
16
17fn get_s3_client(
18    access_key: &str,
19    secret_key: &str,
20    bucket: &str,
21    region: &str,
22    endpoint: &str,
23    path_style: bool,
24) -> Result<Box<s3::Bucket>, anyhow::Error> {
25    let mut bucket = s3::Bucket::new(
26        bucket,
27        s3::Region::Custom {
28            region: region.to_string(),
29            endpoint: endpoint.to_string(),
30        },
31        s3::creds::Credentials::new(Some(access_key), Some(secret_key), None, None, None)?,
32    )?;
33
34    if path_style {
35        bucket.set_path_style();
36    }
37
38    Ok(bucket)
39}
40
41pub struct StorageUrlRetriever<'a> {
42    settings: SettingsReadGuard<'a>,
43}
44
45impl<'a> StorageUrlRetriever<'a> {
46    pub fn new(settings: SettingsReadGuard<'a>) -> Self {
47        Self { settings }
48    }
49
50    pub fn get_settings(&self) -> &super::settings::AppSettings {
51        &self.settings
52    }
53
54    pub fn get_url(&self, path: impl AsRef<str>) -> String {
55        match &self.settings.storage_driver {
56            super::settings::StorageDriver::Filesystem { .. } => {
57                format!(
58                    "{}/{}",
59                    self.settings.app.url.trim_end_matches('/'),
60                    path.as_ref()
61                )
62            }
63            super::settings::StorageDriver::S3 { public_url, .. } => {
64                format!("{}/{}", public_url.trim_end_matches('/'), path.as_ref())
65            }
66        }
67    }
68}
69
70pub struct Storage {
71    settings: Arc<super::settings::Settings>,
72}
73
74impl Storage {
75    pub fn new(settings: Arc<super::settings::Settings>) -> Self {
76        Self { settings }
77    }
78
79    pub async fn retrieve_urls(&self) -> Result<StorageUrlRetriever<'_>, anyhow::Error> {
80        let settings = self.settings.get().await?;
81
82        Ok(StorageUrlRetriever::new(settings))
83    }
84
85    pub async fn remove(&self, path: Option<impl AsRef<str>>) -> Result<(), anyhow::Error> {
86        let path = match path {
87            Some(path) => path,
88            None => return Ok(()),
89        };
90        let path = path.as_ref();
91
92        if path.is_empty() || path.contains("..") || path.starts_with("/") {
93            return Err(anyhow::anyhow!("invalid path"));
94        }
95
96        let settings = self.settings.get().await?;
97
98        tracing::debug!(path, "removing file");
99
100        match &settings.storage_driver {
101            super::settings::StorageDriver::Filesystem { path: base_path } => {
102                let base_filesystem =
103                    match crate::cap::CapFilesystem::async_new(base_path.into()).await {
104                        Ok(base_filesystem) => base_filesystem,
105                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
106                        Err(err) => return Err(err.into()),
107                    };
108                drop(settings);
109
110                if let Err(err) = base_filesystem.async_remove_file(&path).await
111                    && err
112                        .downcast_ref::<std::io::Error>()
113                        .is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound)
114                {
115                    return Err(err);
116                }
117
118                if let Some(parent) = Path::new(path).parent().map(|p| p.to_path_buf()) {
119                    tokio::spawn(async move {
120                        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
121
122                        let mut directory = match base_filesystem.async_read_dir(&parent).await {
123                            Ok(directory) => directory,
124                            Err(_) => return,
125                        };
126
127                        if directory.next_entry().await.is_none() {
128                            base_filesystem.async_remove_dir(parent).await.ok();
129                        }
130                    });
131                }
132            }
133            super::settings::StorageDriver::S3 {
134                access_key,
135                secret_key,
136                bucket,
137                region,
138                endpoint,
139                path_style,
140                ..
141            } => {
142                let s3_client = get_s3_client(
143                    access_key,
144                    secret_key,
145                    bucket,
146                    region,
147                    endpoint,
148                    *path_style,
149                )?;
150                drop(settings);
151
152                s3_client.delete_object(path).await?;
153            }
154        }
155
156        Ok(())
157    }
158
159    pub async fn store(
160        &self,
161        path: impl AsRef<str>,
162        mut data: impl tokio::io::AsyncRead + Unpin,
163        content_type: impl AsRef<str>,
164    ) -> Result<u64, anyhow::Error> {
165        let path = path.as_ref();
166        let content_type = content_type.as_ref();
167
168        if path.is_empty() || path.contains("..") || path.starts_with("/") {
169            return Err(anyhow::anyhow!("invalid path"));
170        }
171
172        let settings = self.settings.get().await?;
173
174        tracing::debug!(path, content_type, "storing file");
175
176        match &settings.storage_driver {
177            super::settings::StorageDriver::Filesystem { path: base_path } => {
178                tokio::fs::create_dir_all(base_path).await?;
179
180                let base_filesystem =
181                    crate::cap::CapFilesystem::async_new(base_path.into()).await?;
182                drop(settings);
183
184                if let Some(parent) = Path::new(path).parent() {
185                    base_filesystem.async_create_dir_all(parent).await?;
186                }
187
188                let mut file = base_filesystem.async_create(path).await?;
189                let bytes = tokio::io::copy(&mut data, &mut file).await?;
190
191                file.shutdown().await?;
192                Ok(bytes)
193            }
194            super::settings::StorageDriver::S3 {
195                access_key,
196                secret_key,
197                bucket,
198                region,
199                endpoint,
200                path_style,
201                ..
202            } => {
203                let s3_client = get_s3_client(
204                    access_key,
205                    secret_key,
206                    bucket,
207                    region,
208                    endpoint,
209                    *path_style,
210                )?;
211                drop(settings);
212
213                let response = s3_client
214                    .put_object_stream_with_content_type(&mut data, path, content_type)
215                    .await?;
216                Ok(response.uploaded_bytes() as u64)
217            }
218        }
219    }
220
221    pub async fn list(
222        &self,
223        base: impl AsRef<str>,
224        directory: impl AsRef<str>,
225        page: usize,
226        per_page: usize,
227    ) -> Result<crate::models::Pagination<StorageAsset>, anyhow::Error> {
228        let base = base.as_ref();
229        let directory = directory.as_ref();
230
231        if base.is_empty() || base.contains("..") || base.starts_with('/') {
232            return Err(anyhow::anyhow!("invalid base path"));
233        }
234        if !directory.is_empty()
235            && (directory.contains("..") || directory.starts_with('/') || directory.ends_with('/'))
236        {
237            return Err(anyhow::anyhow!("invalid directory path"));
238        }
239
240        let settings = self.settings.get().await?;
241
242        match &settings.storage_driver {
243            super::settings::StorageDriver::Filesystem { path: base_path } => {
244                let dir_path = if directory.is_empty() {
245                    Path::new(base_path).join(base)
246                } else {
247                    Path::new(base_path).join(base).join(directory)
248                };
249
250                let base_filesystem = match crate::cap::CapFilesystem::async_new(dir_path).await {
251                    Ok(base_filesystem) => base_filesystem,
252                    Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
253                        return Ok(crate::models::Pagination {
254                            total: 0,
255                            per_page: per_page as i64,
256                            page: page as i64,
257                            data: Vec::new(),
258                        });
259                    }
260                    Err(err) => return Err(err.into()),
261                };
262                drop(settings);
263
264                let mut dir_reader = base_filesystem.async_read_dir("").await?;
265                let mut raw_dirs: Vec<String> = Vec::new();
266                let mut raw_files: Vec<String> = Vec::new();
267
268                while let Some(Ok((is_dir, name))) = dir_reader.next_entry().await {
269                    if is_dir {
270                        raw_dirs.push(name);
271                    } else {
272                        raw_files.push(name);
273                    }
274                }
275
276                raw_dirs.sort_unstable();
277                raw_files.sort_unstable();
278
279                let total = (raw_dirs.len() + raw_files.len()) as i64;
280                let start = (page - 1) * per_page;
281
282                let storage_url_retriever = self.retrieve_urls().await?;
283
284                let mut entries = Vec::new();
285
286                for (is_dir, name) in raw_dirs
287                    .into_iter()
288                    .map(|n| (true, n))
289                    .chain(raw_files.into_iter().map(|n| (false, n)))
290                    .skip(start)
291                    .take(per_page)
292                {
293                    let full_name = if directory.is_empty() {
294                        name.clone()
295                    } else {
296                        format!("{directory}/{name}")
297                    };
298
299                    let (size, created) = if is_dir {
300                        (0u64, chrono::DateTime::<chrono::Utc>::default())
301                    } else {
302                        let metadata = match base_filesystem.async_metadata(&name).await {
303                            Ok(m) => m,
304                            Err(_) => continue,
305                        };
306                        let created = metadata
307                            .created()
308                            .or_else(|_| metadata.modified())?
309                            .into_std()
310                            .into();
311                        (metadata.len(), created)
312                    };
313
314                    entries.push(StorageAsset {
315                        url: storage_url_retriever.get_url(format!("{base}/{full_name}")),
316                        name: full_name.to_compact_string(),
317                        size,
318                        is_directory: is_dir,
319                        created,
320                    });
321                }
322
323                Ok(crate::models::Pagination {
324                    total,
325                    per_page: per_page as i64,
326                    page: page as i64,
327                    data: entries,
328                })
329            }
330            super::settings::StorageDriver::S3 {
331                access_key,
332                secret_key,
333                bucket,
334                region,
335                endpoint,
336                path_style,
337                ..
338            } => {
339                let s3_client = get_s3_client(
340                    access_key,
341                    secret_key,
342                    bucket,
343                    region,
344                    endpoint,
345                    *path_style,
346                )?;
347                drop(settings);
348
349                let s3_prefix = if directory.is_empty() {
350                    format!("{base}/")
351                } else {
352                    format!("{base}/{directory}/")
353                };
354                let strip_prefix = format!("{base}/");
355
356                let results = s3_client
357                    .list(s3_prefix.clone(), Some("/".to_string()))
358                    .await?;
359
360                let storage_url_retriever = self.retrieve_urls().await?;
361
362                let mut dirs: Vec<StorageAsset> = Vec::new();
363                let mut files: Vec<StorageAsset> = Vec::new();
364
365                for result in &results {
366                    if let Some(prefixes) = &result.common_prefixes {
367                        for cp in prefixes {
368                            let name = cp
369                                .prefix
370                                .trim_start_matches(&strip_prefix)
371                                .trim_end_matches('/')
372                                .to_compact_string();
373                            dirs.push(StorageAsset {
374                                url: storage_url_retriever.get_url(&cp.prefix),
375                                name,
376                                size: 0,
377                                is_directory: true,
378                                created: chrono::DateTime::<chrono::Utc>::default(),
379                            });
380                        }
381                    }
382
383                    for entry in &result.contents {
384                        if entry.key == s3_prefix {
385                            continue;
386                        }
387                        let name = entry
388                            .key
389                            .trim_start_matches(&strip_prefix)
390                            .to_compact_string();
391                        files.push(StorageAsset {
392                            url: storage_url_retriever.get_url(&entry.key),
393                            name,
394                            size: entry.size,
395                            is_directory: false,
396                            created: entry.last_modified.parse().unwrap_or_default(),
397                        });
398                    }
399                }
400
401                let total = (dirs.len() + files.len()) as i64;
402                let start = (page - 1) * per_page;
403
404                Ok(crate::models::Pagination {
405                    total,
406                    per_page: per_page as i64,
407                    page: page as i64,
408                    data: dirs
409                        .into_iter()
410                        .chain(files)
411                        .skip(start)
412                        .take(per_page)
413                        .collect(),
414                })
415            }
416        }
417    }
418}