Skip to main content

shared/models/
user_activity.rs

1use crate::{State, models::InsertQueryBuilder, prelude::*};
2use compact_str::ToCompactString;
3use garde::Validate;
4use serde::{Deserialize, Serialize};
5use sqlx::{Row, postgres::PgRow};
6use std::{
7    collections::BTreeMap,
8    sync::{Arc, LazyLock},
9};
10use utoipa::ToSchema;
11
12pub type GetUserActivityLogger = crate::extract::ConsumingExtension<UserActivityLogger>;
13
14#[derive(Clone)]
15pub struct UserActivityLogger {
16    pub state: State,
17    pub user_uuid: uuid::Uuid,
18    pub impersonator_uuid: Option<uuid::Uuid>,
19    pub api_key_uuid: Option<uuid::Uuid>,
20    pub ip: std::net::IpAddr,
21}
22
23impl UserActivityLogger {
24    pub async fn log(&self, event: impl Into<compact_str::CompactString>, data: serde_json::Value) {
25        let options = CreateUserActivityOptions {
26            user_uuid: self.user_uuid,
27            impersonator_uuid: self.impersonator_uuid,
28            api_key_uuid: self.api_key_uuid,
29            event: event.into(),
30            ip: Some(self.ip.into()),
31            data,
32            created: None,
33        };
34        if let Err(err) = UserActivity::create(&self.state, options).await {
35            tracing::warn!(
36                user = %self.user_uuid,
37                "failed to log user activity: {:#?}",
38                err
39            );
40        }
41    }
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct UserActivity {
46    pub impersonator: Option<Fetchable<super::user::User>>,
47    pub api_key: Option<Fetchable<super::user_api_key::UserApiKey>>,
48
49    pub event: compact_str::CompactString,
50    pub ip: Option<sqlx::types::ipnetwork::IpNetwork>,
51    pub data: serde_json::Value,
52
53    pub created: chrono::NaiveDateTime,
54
55    extension_data: super::ModelExtensionData,
56}
57
58impl BaseModel for UserActivity {
59    const NAME: &'static str = "user_activity";
60
61    fn get_extension_list() -> &'static super::ModelExtensionList {
62        static EXTENSIONS: LazyLock<super::ModelExtensionList> =
63            LazyLock::new(|| std::sync::RwLock::new(Vec::new()));
64
65        &EXTENSIONS
66    }
67
68    fn get_extension_data(&self) -> &super::ModelExtensionData {
69        &self.extension_data
70    }
71
72    #[inline]
73    fn base_columns(prefix: Option<&str>) -> BTreeMap<&'static str, compact_str::CompactString> {
74        let prefix = prefix.unwrap_or_default();
75
76        BTreeMap::from([
77            (
78                "user_activities.impersonator_uuid",
79                compact_str::format_compact!("{prefix}impersonator_uuid"),
80            ),
81            (
82                "user_activities.api_key_uuid",
83                compact_str::format_compact!("{prefix}api_key_uuid"),
84            ),
85            (
86                "user_activities.event",
87                compact_str::format_compact!("{prefix}event"),
88            ),
89            (
90                "user_activities.ip",
91                compact_str::format_compact!("{prefix}ip"),
92            ),
93            (
94                "user_activities.data",
95                compact_str::format_compact!("{prefix}data"),
96            ),
97            (
98                "user_activities.created",
99                compact_str::format_compact!("{prefix}created"),
100            ),
101        ])
102    }
103
104    #[inline]
105    fn map(prefix: Option<&str>, row: &PgRow) -> Result<Self, crate::database::DatabaseError> {
106        let prefix = prefix.unwrap_or_default();
107
108        Ok(Self {
109            impersonator: super::user::User::get_fetchable_from_row(
110                row,
111                compact_str::format_compact!("{prefix}impersonator_uuid"),
112            ),
113            api_key: super::user_api_key::UserApiKey::get_fetchable_from_row(
114                row,
115                compact_str::format_compact!("{prefix}api_key_uuid"),
116            ),
117            event: row.try_get(compact_str::format_compact!("{prefix}event").as_str())?,
118            ip: row.try_get(compact_str::format_compact!("{prefix}ip").as_str())?,
119            data: row.try_get(compact_str::format_compact!("{prefix}data").as_str())?,
120            created: row.try_get(compact_str::format_compact!("{prefix}created").as_str())?,
121            extension_data: Self::map_extensions(prefix, row)?,
122        })
123    }
124}
125
126impl UserActivity {
127    pub async fn by_user_uuid_with_pagination(
128        database: &crate::database::Database,
129        user_uuid: uuid::Uuid,
130        page: i64,
131        per_page: i64,
132        search: Option<&str>,
133    ) -> Result<super::Pagination<Self>, crate::database::DatabaseError> {
134        let offset = (page - 1) * per_page;
135
136        let rows = sqlx::query(&format!(
137            r#"
138            SELECT {}, COUNT(*) OVER() AS total_count
139            FROM user_activities
140            WHERE user_activities.user_uuid = $1 AND ($2 IS NULL OR user_activities.event ILIKE '%' || $2 || '%')
141            ORDER BY user_activities.created DESC
142            LIMIT $3 OFFSET $4
143            "#,
144            Self::columns_sql(None)
145        ))
146        .bind(user_uuid)
147        .bind(search)
148        .bind(per_page)
149        .bind(offset)
150        .fetch_all(database.read())
151        .await?;
152
153        Ok(super::Pagination {
154            total: rows
155                .first()
156                .map_or(Ok(0), |row| row.try_get("total_count"))?,
157            per_page,
158            page,
159            data: rows
160                .into_iter()
161                .map(|row| Self::map(None, &row))
162                .try_collect_vec()?,
163        })
164    }
165
166    pub async fn delete_older_than(
167        database: &crate::database::Database,
168        cutoff: chrono::DateTime<chrono::Utc>,
169    ) -> Result<u64, crate::database::DatabaseError> {
170        let result = sqlx::query(
171            r#"
172            DELETE FROM user_activities
173            WHERE user_activities.created < $1
174            "#,
175        )
176        .bind(cutoff.naive_utc())
177        .execute(database.write())
178        .await?;
179
180        Ok(result.rows_affected())
181    }
182
183    pub async fn retain_latest_logs_per_user(
184        database: &crate::database::Database,
185        keep_count: i64,
186    ) -> Result<u64, crate::database::DatabaseError> {
187        let result = sqlx::query(
188            r#"
189            DELETE FROM user_activities
190            WHERE ctid IN (
191                SELECT ctid 
192                FROM (
193                    SELECT
194                        ctid, 
195                        ROW_NUMBER() OVER (PARTITION BY user_activities.user_uuid ORDER BY user_activities.created DESC) rn
196                    FROM user_activities
197                ) sub
198                WHERE sub.rn > $1
199            )
200            "#,
201        )
202        .bind(keep_count)
203        .execute(database.write())
204        .await?;
205
206        Ok(result.rows_affected())
207    }
208}
209
210#[async_trait::async_trait]
211impl IntoApiObject for UserActivity {
212    type ApiObject = ApiUserActivity;
213    type ExtraArgs<'a> = &'a crate::storage::StorageUrlRetriever<'a>;
214
215    async fn into_api_object<'a>(
216        self,
217        state: &crate::State,
218        storage_url_retriever: Self::ExtraArgs<'a>,
219    ) -> Result<Self::ApiObject, crate::database::DatabaseError> {
220        let api_object = ApiUserActivity::init_hooks(&self, state).await?;
221
222        let impersonator = if let Some(impersonator) = self.impersonator {
223            Some(
224                impersonator
225                    .fetch_cached(&state.database)
226                    .await?
227                    .into_api_object(state, storage_url_retriever)
228                    .await?,
229            )
230        } else {
231            None
232        };
233
234        let api_object = finish_extendible!(
235            ApiUserActivity {
236                impersonator,
237                event: self.event,
238                ip: self.ip.map(|ip| ip.ip().to_compact_string()),
239                data: self.data,
240                is_api: self.api_key.is_some(),
241                created: self.created.and_utc(),
242            },
243            api_object,
244            state
245        )?;
246
247        Ok(api_object)
248    }
249}
250
251#[derive(ToSchema, Deserialize, Validate)]
252pub struct CreateUserActivityOptions {
253    #[garde(skip)]
254    pub user_uuid: uuid::Uuid,
255    #[garde(skip)]
256    pub impersonator_uuid: Option<uuid::Uuid>,
257    #[garde(skip)]
258    pub api_key_uuid: Option<uuid::Uuid>,
259    #[garde(length(chars, min = 1, max = 255))]
260    #[schema(min_length = 1, max_length = 255)]
261    pub event: compact_str::CompactString,
262    #[garde(skip)]
263    #[schema(value_type = Option<String>)]
264    pub ip: Option<sqlx::types::ipnetwork::IpNetwork>,
265    #[garde(skip)]
266    pub data: serde_json::Value,
267    #[garde(skip)]
268    pub created: Option<chrono::NaiveDateTime>,
269}
270
271#[async_trait::async_trait]
272impl CreatableModel for UserActivity {
273    type CreateOptions<'a> = CreateUserActivityOptions;
274    type CreateResult = ();
275
276    fn get_create_handlers() -> &'static LazyLock<CreateListenerList<Self>> {
277        static CREATE_LISTENERS: LazyLock<CreateListenerList<UserActivity>> =
278            LazyLock::new(|| Arc::new(ModelHandlerList::default()));
279
280        &CREATE_LISTENERS
281    }
282
283    async fn create_with_transaction(
284        state: &crate::State,
285        mut options: Self::CreateOptions<'_>,
286        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
287    ) -> Result<Self::CreateResult, crate::database::DatabaseError> {
288        options.validate()?;
289
290        let mut query_builder = InsertQueryBuilder::new("user_activities");
291
292        Self::run_create_handlers(&mut options, &mut query_builder, state, transaction).await?;
293
294        query_builder
295            .set("user_uuid", options.user_uuid)
296            .set("impersonator_uuid", options.impersonator_uuid)
297            .set("api_key_uuid", options.api_key_uuid)
298            .set("event", &options.event)
299            .set("ip", options.ip)
300            .set("data", &options.data);
301
302        if let Some(created) = options.created {
303            query_builder.set("created", created);
304        }
305
306        query_builder.execute(&mut **transaction).await?;
307
308        let mut result = ();
309
310        Self::run_after_create_handlers(&mut result, &options, state, transaction).await?;
311
312        Ok(result)
313    }
314}
315
316#[schema_extension_derive::extendible]
317#[init_args(UserActivity, crate::State)]
318#[hook_args(crate::State)]
319#[derive(ToSchema, Serialize)]
320#[schema(title = "UserActivity")]
321pub struct ApiUserActivity {
322    pub impersonator: Option<super::user::ApiUser>,
323
324    pub event: compact_str::CompactString,
325    pub ip: Option<compact_str::CompactString>,
326    pub data: serde_json::Value,
327
328    pub is_api: bool,
329
330    pub created: chrono::DateTime<chrono::Utc>,
331}