1use crate::{State, models::InsertQueryBuilder, prelude::*};
2use compact_str::ToCompactString;
3use garde::Validate;
4use serde::{Deserialize, Serialize};
5use sqlx::{Row, postgres::PgRow};
6use std::{
7 collections::BTreeMap,
8 sync::{Arc, LazyLock},
9};
10use utoipa::ToSchema;
11
12pub type GetUserActivityLogger = crate::extract::ConsumingExtension<UserActivityLogger>;
13
14#[derive(Clone)]
15pub struct UserActivityLogger {
16 pub state: State,
17 pub user_uuid: uuid::Uuid,
18 pub impersonator_uuid: Option<uuid::Uuid>,
19 pub api_key_uuid: Option<uuid::Uuid>,
20 pub ip: std::net::IpAddr,
21}
22
23impl UserActivityLogger {
24 pub async fn log(&self, event: impl Into<compact_str::CompactString>, data: serde_json::Value) {
25 let options = CreateUserActivityOptions {
26 user_uuid: self.user_uuid,
27 impersonator_uuid: self.impersonator_uuid,
28 api_key_uuid: self.api_key_uuid,
29 event: event.into(),
30 ip: Some(self.ip.into()),
31 data,
32 created: None,
33 };
34 if let Err(err) = UserActivity::create(&self.state, options).await {
35 tracing::warn!(
36 user = %self.user_uuid,
37 "failed to log user activity: {:#?}",
38 err
39 );
40 }
41 }
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct UserActivity {
46 pub impersonator: Option<Fetchable<super::user::User>>,
47 pub api_key: Option<Fetchable<super::user_api_key::UserApiKey>>,
48
49 pub event: compact_str::CompactString,
50 pub ip: Option<sqlx::types::ipnetwork::IpNetwork>,
51 pub data: serde_json::Value,
52
53 pub created: chrono::NaiveDateTime,
54
55 extension_data: super::ModelExtensionData,
56}
57
58impl BaseModel for UserActivity {
59 const NAME: &'static str = "user_activity";
60
61 fn get_extension_list() -> &'static super::ModelExtensionList {
62 static EXTENSIONS: LazyLock<super::ModelExtensionList> =
63 LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
64
65 &EXTENSIONS
66 }
67
68 fn get_extension_data(&self) -> &super::ModelExtensionData {
69 &self.extension_data
70 }
71
72 #[inline]
73 fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
74 let prefix = prefix.unwrap_or_default();
75
76 BTreeMap::from([
77 (
78 "user_activities.impersonator_uuid",
79 compact_str::format_compact!("{prefix}impersonator_uuid"),
80 ),
81 (
82 "user_activities.api_key_uuid",
83 compact_str::format_compact!("{prefix}api_key_uuid"),
84 ),
85 (
86 "user_activities.event",
87 compact_str::format_compact!("{prefix}event"),
88 ),
89 (
90 "user_activities.ip",
91 compact_str::format_compact!("{prefix}ip"),
92 ),
93 (
94 "user_activities.data",
95 compact_str::format_compact!("{prefix}data"),
96 ),
97 (
98 "user_activities.created",
99 compact_str::format_compact!("{prefix}created"),
100 ),
101 ])
102 }
103
104 #[inline]
105 fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
106 let prefix = prefix.unwrap_or_default();
107
108 Ok(Self {
109 impersonator: super::user::User::get_fetchable_from_row(
110 row,
111 compact_str::format_compact!("{prefix}impersonator_uuid"),
112 ),
113 api_key: super::user_api_key::UserApiKey::get_fetchable_from_row(
114 row,
115 compact_str::format_compact!("{prefix}api_key_uuid"),
116 ),
117 event: row.try_get(compact_str::format_compact!("{prefix}event").as_str())?,
118 ip: row.try_get(compact_str::format_compact!("{prefix}ip").as_str())?,
119 data: row.try_get(compact_str::format_compact!("{prefix}data").as_str())?,
120 created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
121 extension_data: Self::map_extensions(prefix, row)?,
122 })
123 }
124}
125
126impl UserActivity {
127 pub async fn by_user_uuid_with_pagination(
128 database: &crate::database::Database,
129 user_uuid: uuid::Uuid,
130 page: i64,
131 per_page: i64,
132 search: Option<&str>,
133 ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
134 let offset = (page - 1) * per_page;
135
136 let rows = sqlx::query(&format!(
137 r#"
138 SELECT {}, COUNT(*) OVER() AS total_count
139 FROM user_activities
140 WHERE user_activities.user_uuid = $1 AND ($2 IS NULL OR user_activities.event ILIKE '%' || $2 || '%')
141 ORDER BY user_activities.created DESC
142 LIMIT $3 OFFSET $4
143 "#,
144 Self::columns_sql(None)
145 ))
146 .bind(user_uuid)
147 .bind(search)
148 .bind(per_page)
149 .bind(offset)
150 .fetch_all(database.read())
151 .await?;
152
153 Ok(super::Pagination {
154 total: rows
155 .first()
156 .map_or(Ok(0), |row| row.try_get("total_count"))?,
157 per_page,
158 page,
159 data: rows
160 .into_iter()
161 .map(|row| Self::map(None, &row))
162 .try_collect_vec()?,
163 })
164 }
165
166 pub async fn delete_older_than(
167 database: &crate::database::Database,
168 cutoff: chrono::DateTime<chrono::Utc>,
169 ) -> Result<u64, crate::database::DatabaseError> {
170 let result = sqlx::query(
171 r#"
172 DELETE FROM user_activities
173 WHERE user_activities.created < $1
174 "#,
175 )
176 .bind(cutoff.naive_utc())
177 .execute(database.write())
178 .await?;
179
180 Ok(result.rows_affected())
181 }
182
183 pub async fn retain_latest_logs_per_user(
184 database: &crate::database::Database,
185 keep_count: i64,
186 ) -> Result<u64, crate::database::DatabaseError> {
187 let result = sqlx::query(
188 r#"
189 DELETE FROM user_activities
190 WHERE ctid IN (
191 SELECT ctid
192 FROM (
193 SELECT
194 ctid,
195 ROW_NUMBER() OVER (PARTITION BY user_activities.user_uuid ORDER BY user_activities.created DESC) rn
196 FROM user_activities
197 ) sub
198 WHERE sub.rn > $1
199 )
200 "#,
201 )
202 .bind(keep_count)
203 .execute(database.write())
204 .await?;
205
206 Ok(result.rows_affected())
207 }
208}
209
210#[async_trait::async_trait]
211impl IntoApiObject for UserActivity {
212 type ApiObject = ApiUserActivity;
213 type ExtraArgs<'a> = &'a crate::storage::StorageUrlRetriever<'a>;
214
215 async fn into_api_object<'a>(
216 self,
217 state: &crate::State,
218 storage_url_retriever: Self::ExtraArgs<'a>,
219 ) -> Result<Self::ApiObject, crate::database::DatabaseError> {
220 let api_object = ApiUserActivity::init_hooks(&self, state).await?;
221
222 let impersonator = if let Some(impersonator) = self.impersonator {
223 Some(
224 impersonator
225 .fetch_cached(&state.database)
226 .await?
227 .into_api_object(state, storage_url_retriever)
228 .await?,
229 )
230 } else {
231 None
232 };
233
234 let api_object = finish_extendible!(
235 ApiUserActivity {
236 impersonator,
237 event: self.event,
238 ip: self.ip.map(|ip| ip.ip().to_compact_string()),
239 data: self.data,
240 is_api: self.api_key.is_some(),
241 created: self.created.and_utc(),
242 },
243 api_object,
244 state
245 )?;
246
247 Ok(api_object)
248 }
249}
250
251#[derive(ToSchema, Deserialize, Validate)]
252pub struct CreateUserActivityOptions {
253 #[garde(skip)]
254 pub user_uuid: uuid::Uuid,
255 #[garde(skip)]
256 pub impersonator_uuid: Option<uuid::Uuid>,
257 #[garde(skip)]
258 pub api_key_uuid: Option<uuid::Uuid>,
259 #[garde(length(chars, min = 1, max = 255))]
260 #[schema(min_length = 1, max_length = 255)]
261 pub event: compact_str::CompactString,
262 #[garde(skip)]
263 #[schema(value_type = Option<String>)]
264 pub ip: Option<sqlx::types::ipnetwork::IpNetwork>,
265 #[garde(skip)]
266 pub data: serde_json::Value,
267 #[garde(skip)]
268 pub created: Option<chrono::NaiveDateTime>,
269}
270
271#[async_trait::async_trait]
272impl CreatableModel for UserActivity {
273 type CreateOptions<'a> = CreateUserActivityOptions;
274 type CreateResult = ();
275
276 fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>> {
277 static CREATE_LISTENERS: LazyLock<CreateListenerList<UserActivity>> =
278 LazyLock::new(|| Arc::new(ModelHandlerList::default()));
279
280 &CREATE_LISTENERS
281 }
282
283 async fn create_with_transaction(
284 state: &crate::State,
285 mut options: Self::CreateOptions<'_>,
286 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
287 ) -> Result<Self::CreateResult, crate::database::DatabaseError> {
288 options.validate()?;
289
290 let mut query_builder = InsertQueryBuilder::new("user_activities");
291
292 Self::run_create_handlers(&mut options, &mut query_builder, state, transaction).await?;
293
294 query_builder
295 .set("user_uuid", options.user_uuid)
296 .set("impersonator_uuid", options.impersonator_uuid)
297 .set("api_key_uuid", options.api_key_uuid)
298 .set("event", &options.event)
299 .set("ip", options.ip)
300 .set("data", &options.data);
301
302 if let Some(created) = options.created {
303 query_builder.set("created", created);
304 }
305
306 query_builder.execute(&mut **transaction).await?;
307
308 let mut result = ();
309
310 Self::run_after_create_handlers(&mut result, &options, state, transaction).await?;
311
312 Ok(result)
313 }
314}
315
316#[schema_extension_derive::extendible]
317#[init_args(UserActivity, crate::State)]
318#[hook_args(crate::State)]
319#[derive(ToSchema, Serialize)]
320#[schema(title = "UserActivity")]
321pub struct ApiUserActivity {
322 pub impersonator: Option<super::user::ApiUser>,
323
324 pub event: compact_str::CompactString,
325 pub ip: Option<compact_str::CompactString>,
326 pub data: serde_json::Value,
327
328 pub is_api: bool,
329
330 pub created: chrono::DateTime<chrono::Utc>,
331}