1use crate::settings::SettingsReadGuard;
2use compact_str::ToCompactString;
3use serde::{Deserialize, Serialize};
4use std::{path::Path, sync::Arc};
5use tokio::io::AsyncWriteExt;
6use utoipa::ToSchema;
7
8#[derive(ToSchema, Deserialize, Serialize)]
9pub struct StorageAsset {
10 pub name: compact_str::CompactString,
11 pub url: String,
12 pub size: u64,
13 pub is_directory: bool,
14 pub created: chrono::DateTime<chrono::Utc>,
15}
16
17fn get_s3_client(
18 access_key: &str,
19 secret_key: &str,
20 bucket: &str,
21 region: &str,
22 endpoint: &str,
23 path_style: bool,
24) -> Result<Box<s3::Bucket>, anyhow::Error> {
25 let mut bucket = s3::Bucket::new(
26 bucket,
27 s3::Region::Custom {
28 region: region.to_string(),
29 endpoint: endpoint.to_string(),
30 },
31 s3::creds::Credentials::new(Some(access_key), Some(secret_key), None, None, None)?,
32 )?;
33
34 if path_style {
35 bucket.set_path_style();
36 }
37
38 Ok(bucket)
39}
40
41pub struct StorageUrlRetriever<'a> {
42 settings: SettingsReadGuard<'a>,
43}
44
45impl<'a> StorageUrlRetriever<'a> {
46 pub fn new(settings: SettingsReadGuard<'a>) -> Self {
47 Self { settings }
48 }
49
50 pub fn get_settings(&self) -> &super::settings::AppSettings {
51 &self.settings
52 }
53
54 pub fn get_url(&self, path: impl AsRef<str>) -> String {
55 match &self.settings.storage_driver {
56 super::settings::StorageDriver::Filesystem { .. } => {
57 format!(
58 "{}/{}",
59 self.settings.app.url.trim_end_matches('/'),
60 path.as_ref()
61 )
62 }
63 super::settings::StorageDriver::S3 { public_url, .. } => {
64 format!("{}/{}", public_url.trim_end_matches('/'), path.as_ref())
65 }
66 }
67 }
68}
69
70pub struct Storage {
71 settings: Arc<super::settings::Settings>,
72}
73
74impl Storage {
75 pub fn new(settings: Arc<super::settings::Settings>) -> Self {
76 Self { settings }
77 }
78
79 pub async fn retrieve_urls(&self) -> Result<StorageUrlRetriever<'_>, anyhow::Error> {
80 let settings = self.settings.get().await?;
81
82 Ok(StorageUrlRetriever::new(settings))
83 }
84
85 pub async fn remove(&self, path: Option<impl AsRef<str>>) -> Result<(), anyhow::Error> {
86 let path = match path {
87 Some(path) => path,
88 None => return Ok(()),
89 };
90 let path = path.as_ref();
91
92 if path.is_empty() || path.contains("..") || path.starts_with("/") {
93 return Err(anyhow::anyhow!("invalid path"));
94 }
95
96 let settings = self.settings.get().await?;
97
98 tracing::debug!(path, "removing file");
99
100 match &settings.storage_driver {
101 super::settings::StorageDriver::Filesystem { path: base_path } => {
102 let base_filesystem =
103 match crate::cap::CapFilesystem::async_new(base_path.into()).await {
104 Ok(base_filesystem) => base_filesystem,
105 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
106 Err(err) => return Err(err.into()),
107 };
108 drop(settings);
109
110 if let Err(err) = base_filesystem.async_remove_file(&path).await
111 && err
112 .downcast_ref::<std::io::Error>()
113 .is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound)
114 {
115 return Err(err);
116 }
117
118 if let Some(parent) = Path::new(path).parent().map(|p| p.to_path_buf()) {
119 tokio::spawn(async move {
120 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
121
122 let mut directory = match base_filesystem.async_read_dir(&parent).await {
123 Ok(directory) => directory,
124 Err(_) => return,
125 };
126
127 if directory.next_entry().await.is_none() {
128 base_filesystem.async_remove_dir(parent).await.ok();
129 }
130 });
131 }
132 }
133 super::settings::StorageDriver::S3 {
134 access_key,
135 secret_key,
136 bucket,
137 region,
138 endpoint,
139 path_style,
140 ..
141 } => {
142 let s3_client = get_s3_client(
143 access_key,
144 secret_key,
145 bucket,
146 region,
147 endpoint,
148 *path_style,
149 )?;
150 drop(settings);
151
152 s3_client.delete_object(path).await?;
153 }
154 }
155
156 Ok(())
157 }
158
159 pub async fn store(
160 &self,
161 path: impl AsRef<str>,
162 mut data: impl tokio::io::AsyncRead + Unpin,
163 content_type: impl AsRef<str>,
164 ) -> Result<u64, anyhow::Error> {
165 let path = path.as_ref();
166 let content_type = content_type.as_ref();
167
168 if path.is_empty() || path.contains("..") || path.starts_with("/") {
169 return Err(anyhow::anyhow!("invalid path"));
170 }
171
172 let settings = self.settings.get().await?;
173
174 tracing::debug!(path, content_type, "storing file");
175
176 match &settings.storage_driver {
177 super::settings::StorageDriver::Filesystem { path: base_path } => {
178 tokio::fs::create_dir_all(base_path).await?;
179
180 let base_filesystem =
181 crate::cap::CapFilesystem::async_new(base_path.into()).await?;
182 drop(settings);
183
184 if let Some(parent) = Path::new(path).parent() {
185 base_filesystem.async_create_dir_all(parent).await?;
186 }
187
188 let mut file = base_filesystem.async_create(path).await?;
189 let bytes = tokio::io::copy(&mut data, &mut file).await?;
190
191 file.shutdown().await?;
192 Ok(bytes)
193 }
194 super::settings::StorageDriver::S3 {
195 access_key,
196 secret_key,
197 bucket,
198 region,
199 endpoint,
200 path_style,
201 ..
202 } => {
203 let s3_client = get_s3_client(
204 access_key,
205 secret_key,
206 bucket,
207 region,
208 endpoint,
209 *path_style,
210 )?;
211 drop(settings);
212
213 let response = s3_client
214 .put_object_stream_with_content_type(&mut data, path, content_type)
215 .await?;
216 Ok(response.uploaded_bytes() as u64)
217 }
218 }
219 }
220
221 pub async fn list(
222 &self,
223 base: impl AsRef<str>,
224 directory: impl AsRef<str>,
225 page: usize,
226 per_page: usize,
227 ) -> Result<crate::models::Pagination<StorageAsset>, anyhow::Error> {
228 let base = base.as_ref();
229 let directory = directory.as_ref();
230
231 if base.is_empty() || base.contains("..") || base.starts_with('/') {
232 return Err(anyhow::anyhow!("invalid base path"));
233 }
234 if !directory.is_empty()
235 && (directory.contains("..") || directory.starts_with('/') || directory.ends_with('/'))
236 {
237 return Err(anyhow::anyhow!("invalid directory path"));
238 }
239
240 let settings = self.settings.get().await?;
241
242 match &settings.storage_driver {
243 super::settings::StorageDriver::Filesystem { path: base_path } => {
244 let dir_path = if directory.is_empty() {
245 Path::new(base_path).join(base)
246 } else {
247 Path::new(base_path).join(base).join(directory)
248 };
249
250 let base_filesystem = match crate::cap::CapFilesystem::async_new(dir_path).await {
251 Ok(base_filesystem) => base_filesystem,
252 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
253 return Ok(crate::models::Pagination {
254 total: 0,
255 per_page: per_page as i64,
256 page: page as i64,
257 data: Vec::new(),
258 });
259 }
260 Err(err) => return Err(err.into()),
261 };
262 drop(settings);
263
264 let mut dir_reader = base_filesystem.async_read_dir("").await?;
265 let mut raw_dirs: Vec<String> = Vec::new();
266 let mut raw_files: Vec<String> = Vec::new();
267
268 while let Some(Ok((is_dir, name))) = dir_reader.next_entry().await {
269 if is_dir {
270 raw_dirs.push(name);
271 } else {
272 raw_files.push(name);
273 }
274 }
275
276 raw_dirs.sort_unstable();
277 raw_files.sort_unstable();
278
279 let total = (raw_dirs.len() + raw_files.len()) as i64;
280 let start = (page - 1) * per_page;
281
282 let storage_url_retriever = self.retrieve_urls().await?;
283
284 let mut entries = Vec::new();
285
286 for (is_dir, name) in raw_dirs
287 .into_iter()
288 .map(|n| (true, n))
289 .chain(raw_files.into_iter().map(|n| (false, n)))
290 .skip(start)
291 .take(per_page)
292 {
293 let full_name = if directory.is_empty() {
294 name.clone()
295 } else {
296 format!("{directory}/{name}")
297 };
298
299 let (size, created) = if is_dir {
300 (0u64, chrono::DateTime::<chrono::Utc>::default())
301 } else {
302 let metadata = match base_filesystem.async_metadata(&name).await {
303 Ok(m) => m,
304 Err(_) => continue,
305 };
306 let created = metadata
307 .created()
308 .or_else(|_| metadata.modified())?
309 .into_std()
310 .into();
311 (metadata.len(), created)
312 };
313
314 entries.push(StorageAsset {
315 url: storage_url_retriever.get_url(format!("{base}/{full_name}")),
316 name: full_name.to_compact_string(),
317 size,
318 is_directory: is_dir,
319 created,
320 });
321 }
322
323 Ok(crate::models::Pagination {
324 total,
325 per_page: per_page as i64,
326 page: page as i64,
327 data: entries,
328 })
329 }
330 super::settings::StorageDriver::S3 {
331 access_key,
332 secret_key,
333 bucket,
334 region,
335 endpoint,
336 path_style,
337 ..
338 } => {
339 let s3_client = get_s3_client(
340 access_key,
341 secret_key,
342 bucket,
343 region,
344 endpoint,
345 *path_style,
346 )?;
347 drop(settings);
348
349 let s3_prefix = if directory.is_empty() {
350 format!("{base}/")
351 } else {
352 format!("{base}/{directory}/")
353 };
354 let strip_prefix = format!("{base}/");
355
356 let results = s3_client
357 .list(s3_prefix.clone(), Some("/".to_string()))
358 .await?;
359
360 let storage_url_retriever = self.retrieve_urls().await?;
361
362 let mut dirs: Vec<StorageAsset> = Vec::new();
363 let mut files: Vec<StorageAsset> = Vec::new();
364
365 for result in &results {
366 if let Some(prefixes) = &result.common_prefixes {
367 for cp in prefixes {
368 let name = cp
369 .prefix
370 .trim_start_matches(&strip_prefix)
371 .trim_end_matches('/')
372 .to_compact_string();
373 dirs.push(StorageAsset {
374 url: storage_url_retriever.get_url(&cp.prefix),
375 name,
376 size: 0,
377 is_directory: true,
378 created: chrono::DateTime::<chrono::Utc>::default(),
379 });
380 }
381 }
382
383 for entry in &result.contents {
384 if entry.key == s3_prefix {
385 continue;
386 }
387 let name = entry
388 .key
389 .trim_start_matches(&strip_prefix)
390 .to_compact_string();
391 files.push(StorageAsset {
392 url: storage_url_retriever.get_url(&entry.key),
393 name,
394 size: entry.size,
395 is_directory: false,
396 created: entry.last_modified.parse().unwrap_or_default(),
397 });
398 }
399 }
400
401 let total = (dirs.len() + files.len()) as i64;
402 let start = (page - 1) * per_page;
403
404 Ok(crate::models::Pagination {
405 total,
406 per_page: per_page as i64,
407 page: page as i64,
408 data: dirs
409 .into_iter()
410 .chain(files)
411 .skip(start)
412 .take(per_page)
413 .collect(),
414 })
415 }
416 }
417 }
418}