use std::collections::{BTreeMap, HashMap}; use axum::http::HeaderMap; use chrono::{DateTime, Duration, NaiveDate, Utc}; use loco_rs::prelude::*; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set, }; use serde::Serialize; use crate::models::_entities::{content_events, posts, query_events}; const EVENT_TYPE_SEARCH: &str = "search"; const EVENT_TYPE_AI_QUESTION: &str = "ai_question"; pub const CONTENT_EVENT_PAGE_VIEW: &str = "page_view"; pub const CONTENT_EVENT_READ_PROGRESS: &str = "read_progress"; pub const CONTENT_EVENT_READ_COMPLETE: &str = "read_complete"; #[derive(Clone, Debug, Default)] pub struct QueryEventRequestContext { pub request_path: Option, pub referrer: Option, pub user_agent: Option, } #[derive(Clone, Debug)] pub struct QueryEventDraft { pub event_type: String, pub query_text: String, pub request_context: QueryEventRequestContext, pub result_count: Option, pub success: Option, pub response_mode: Option, pub provider: Option, pub chat_model: Option, pub latency_ms: Option, } #[derive(Clone, Debug, Default)] pub struct ContentEventRequestContext { pub path: Option, pub referrer: Option, pub user_agent: Option, } #[derive(Clone, Debug)] pub struct ContentEventDraft { pub event_type: String, pub path: String, pub post_slug: Option, pub session_id: Option, pub request_context: ContentEventRequestContext, pub duration_ms: Option, pub progress_percent: Option, pub metadata: Option, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsOverview { pub total_searches: u64, pub total_ai_questions: u64, pub searches_last_24h: u64, pub ai_questions_last_24h: u64, pub searches_last_7d: u64, pub ai_questions_last_7d: u64, pub unique_search_terms_last_7d: usize, pub unique_ai_questions_last_7d: usize, pub avg_search_results_last_7d: f64, pub avg_ai_latency_ms_last_7d: Option, } #[derive(Clone, Debug, Serialize)] pub struct ContentAnalyticsOverview { pub total_page_views: u64, pub page_views_last_24h: u64, pub page_views_last_7d: u64, pub total_read_completes: u64, pub read_completes_last_7d: u64, pub avg_read_progress_last_7d: f64, pub avg_read_duration_ms_last_7d: Option, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsTopQuery { pub query: String, pub count: u64, pub last_seen_at: String, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsRecentEvent { pub id: i32, pub event_type: String, pub query: String, pub result_count: Option, pub success: Option, pub response_mode: Option, pub provider: Option, pub chat_model: Option, pub latency_ms: Option, pub created_at: String, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsProviderBucket { pub provider: String, pub count: u64, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsReferrerBucket { pub referrer: String, pub count: u64, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsPopularPost { pub slug: String, pub title: String, pub page_views: u64, pub read_completes: u64, pub avg_progress_percent: f64, pub avg_duration_ms: Option, } #[derive(Clone, Debug, Serialize)] pub struct AnalyticsDailyBucket { pub date: String, pub searches: u64, pub ai_questions: u64, } #[derive(Clone, Debug, Serialize)] pub struct AdminAnalyticsResponse { pub overview: AnalyticsOverview, pub content_overview: ContentAnalyticsOverview, pub top_search_terms: Vec, pub top_ai_questions: Vec, pub recent_events: Vec, pub providers_last_7d: Vec, pub top_referrers: Vec, pub ai_referrers_last_7d: Vec, pub ai_discovery_page_views_last_7d: u64, pub popular_posts: Vec, pub daily_activity: Vec, } #[derive(Clone, Debug, Serialize)] pub struct PublicContentHighlights { pub overview: ContentAnalyticsOverview, pub popular_posts: Vec, } #[derive(Clone, Debug, Serialize)] pub struct PublicContentWindowOverview { pub page_views: u64, pub read_completes: u64, pub avg_read_progress: f64, pub avg_read_duration_ms: Option, } #[derive(Clone, Debug, Serialize)] pub struct PublicContentWindowHighlights { pub key: String, pub label: String, pub days: i32, pub overview: PublicContentWindowOverview, pub popular_posts: Vec, } #[derive(Clone, Debug)] struct QueryAggregate { query: String, count: u64, last_seen_at: DateTime, } fn trim_to_option(value: Option) -> Option { value.and_then(|item| { let trimmed = item.trim().to_string(); if trimmed.is_empty() { None } else { Some(trimmed) } }) } fn normalize_query(value: &str) -> String { value .split_whitespace() .collect::>() .join(" ") .to_lowercase() } fn format_timestamp(value: DateTime) -> String { value.format("%Y-%m-%d %H:%M").to_string() } fn metadata_string(metadata: Option<&serde_json::Value>, key: &str) -> Option { metadata .and_then(|value| value.get(key)) .and_then(|value| value.as_str()) .map(ToString::to_string) .and_then(|value| trim_to_option(Some(value))) } fn parse_path_query_value(path: Option<&str>, key: &str) -> Option { let path = path.and_then(|value| trim_to_option(Some(value.to_string())))?; let synthetic_url = if path.starts_with("http://") || path.starts_with("https://") { path } else if path.starts_with('/') { format!("https://local.test{path}") } else { format!("https://local.test/{path}") }; reqwest::Url::parse(&synthetic_url) .ok() .and_then(|url| { url.query_pairs() .find(|(item_key, _)| item_key == key) .map(|(_, value)| value.to_string()) }) .and_then(|value| trim_to_option(Some(value))) } fn normalize_tracking_source_token(value: Option) -> String { let Some(value) = trim_to_option(value) else { return "direct".to_string(); }; let normalized = reqwest::Url::parse(&value) .ok() .and_then(|url| url.host_str().map(ToString::to_string)) .filter(|item| !item.trim().is_empty()) .unwrap_or(value) .trim() .to_ascii_lowercase(); match normalized.as_str() { "direct" => "direct".to_string(), value if value.contains("chatgpt") || value.contains("openai") => { "chatgpt-search".to_string() } value if value.contains("perplexity") => "perplexity".to_string(), value if value.contains("copilot") || value.contains("bing") => "copilot-bing".to_string(), value if value.contains("gemini") => "gemini".to_string(), value if value.contains("google") => "google".to_string(), value if value.contains("claude") => "claude".to_string(), value if value.contains("duckduckgo") => "duckduckgo".to_string(), value if value.contains("kagi") => "kagi".to_string(), _ => normalized, } } fn normalize_tracking_source( path: Option<&str>, referrer: Option, metadata: Option<&serde_json::Value>, ) -> String { let preferred = metadata_string(metadata, "landingSource") .or_else(|| metadata_string(metadata, "landing_source")) .or_else(|| metadata_string(metadata, "utmSource")) .or_else(|| metadata_string(metadata, "utm_source")) .or_else(|| parse_path_query_value(path, "utm_source")) .or_else(|| metadata_string(metadata, "referrerHost")) .or_else(|| referrer); normalize_tracking_source_token(preferred) } fn is_ai_discovery_source(value: &str) -> bool { matches!( value, "chatgpt-search" | "perplexity" | "copilot-bing" | "gemini" | "claude" ) } fn sorted_referrer_buckets( breakdown: &HashMap, predicate: impl Fn(&str) -> bool, limit: usize, ) -> Vec { let mut items = breakdown .iter() .filter_map(|(referrer, count)| { predicate(referrer).then(|| AnalyticsReferrerBucket { referrer: referrer.clone(), count: *count, }) }) .collect::>(); items.sort_by(|left, right| { right .count .cmp(&left.count) .then_with(|| left.referrer.cmp(&right.referrer)) }); items.truncate(limit); items } fn header_value(headers: &HeaderMap, key: &str) -> Option { headers .get(key) .and_then(|value| value.to_str().ok()) .map(ToString::to_string) .and_then(|value| trim_to_option(Some(value))) } fn clamp_latency(latency_ms: i64) -> i32 { latency_ms.clamp(0, i64::from(i32::MAX)) as i32 } fn clamp_percentage(value: i32) -> i32 { value.clamp(0, 100) } fn build_query_aggregates( events: &[query_events::Model], wanted_type: &str, ) -> Vec { let mut grouped: HashMap = HashMap::new(); for event in events .iter() .filter(|event| event.event_type == wanted_type) { let entry = grouped .entry(event.normalized_query.clone()) .or_insert_with(|| QueryAggregate { query: event.query_text.clone(), count: 0, last_seen_at: event.created_at.into(), }); entry.count += 1; let created_at = DateTime::::from(event.created_at); if created_at >= entry.last_seen_at { entry.query = event.query_text.clone(); entry.last_seen_at = created_at; } } let mut items = grouped.into_values().collect::>(); items.sort_by(|left, right| { right .count .cmp(&left.count) .then_with(|| right.last_seen_at.cmp(&left.last_seen_at)) }); items } fn aggregate_queries( events: &[query_events::Model], wanted_type: &str, limit: usize, ) -> (usize, Vec) { let aggregates = build_query_aggregates(events, wanted_type); let total_unique = aggregates.len(); let items = aggregates .into_iter() .take(limit) .map(|item| AnalyticsTopQuery { query: item.query, count: item.count, last_seen_at: format_timestamp(item.last_seen_at), }) .collect::>(); (total_unique, items) } pub fn request_context_from_headers(path: &str, headers: &HeaderMap) -> QueryEventRequestContext { QueryEventRequestContext { request_path: trim_to_option(Some(path.to_string())), referrer: header_value(headers, "referer"), user_agent: header_value(headers, "user-agent"), } } pub fn content_request_context_from_headers( path: &str, headers: &HeaderMap, ) -> ContentEventRequestContext { ContentEventRequestContext { path: trim_to_option(Some(path.to_string())), referrer: header_value(headers, "referer"), user_agent: header_value(headers, "user-agent"), } } pub async fn record_event(ctx: &AppContext, draft: QueryEventDraft) { let query_text = draft.query_text.trim().to_string(); if query_text.is_empty() { return; } let active_model = query_events::ActiveModel { event_type: Set(draft.event_type), query_text: Set(query_text.clone()), normalized_query: Set(normalize_query(&query_text)), request_path: Set(trim_to_option(draft.request_context.request_path)), referrer: Set(trim_to_option(draft.request_context.referrer)), user_agent: Set(trim_to_option(draft.request_context.user_agent)), result_count: Set(draft.result_count), success: Set(draft.success), response_mode: Set(trim_to_option(draft.response_mode)), provider: Set(trim_to_option(draft.provider)), chat_model: Set(trim_to_option(draft.chat_model)), latency_ms: Set(draft.latency_ms.map(|value| value.max(0))), ..Default::default() }; if let Err(error) = active_model.insert(&ctx.db).await { tracing::warn!("failed to record query analytics event: {error}"); } } pub async fn record_content_event(ctx: &AppContext, draft: ContentEventDraft) { let path = draft.path.trim().to_string(); if path.is_empty() { return; } let event_type = draft.event_type.trim().to_ascii_lowercase(); if !matches!( event_type.as_str(), CONTENT_EVENT_PAGE_VIEW | CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { return; } let active_model = content_events::ActiveModel { event_type: Set(event_type), path: Set(path), post_slug: Set(trim_to_option(draft.post_slug)), session_id: Set(trim_to_option(draft.session_id)), referrer: Set(trim_to_option(draft.request_context.referrer)), user_agent: Set(trim_to_option(draft.request_context.user_agent)), duration_ms: Set(draft.duration_ms.map(|value| value.max(0))), progress_percent: Set(draft.progress_percent.map(clamp_percentage)), metadata: Set(draft.metadata), ..Default::default() }; if let Err(error) = active_model.insert(&ctx.db).await { tracing::warn!("failed to record content analytics event: {error}"); } } pub async fn record_search_event( ctx: &AppContext, query_text: &str, result_count: usize, headers: &HeaderMap, latency_ms: i64, ) { record_event( ctx, QueryEventDraft { event_type: EVENT_TYPE_SEARCH.to_string(), query_text: query_text.to_string(), request_context: request_context_from_headers("/api/search", headers), result_count: Some(result_count.min(i32::MAX as usize) as i32), success: Some(true), response_mode: None, provider: None, chat_model: None, latency_ms: Some(clamp_latency(latency_ms)), }, ) .await; } pub async fn record_ai_question_event( ctx: &AppContext, question: &str, headers: &HeaderMap, success: bool, response_mode: &str, provider: Option, chat_model: Option, result_count: Option, latency_ms: i64, ) { record_event( ctx, QueryEventDraft { event_type: EVENT_TYPE_AI_QUESTION.to_string(), query_text: question.to_string(), request_context: request_context_from_headers( if response_mode == "stream" { "/api/ai/ask/stream" } else { "/api/ai/ask" }, headers, ), result_count: result_count.map(|value| value.min(i32::MAX as usize) as i32), success: Some(success), response_mode: Some(response_mode.to_string()), provider, chat_model, latency_ms: Some(clamp_latency(latency_ms)), }, ) .await; } pub async fn build_admin_analytics(ctx: &AppContext) -> Result { let now = Utc::now(); let since_24h = now - Duration::hours(24); let since_7d = now - Duration::days(7); let total_searches = query_events::Entity::find() .filter(query_events::Column::EventType.eq(EVENT_TYPE_SEARCH)) .count(&ctx.db) .await?; let total_ai_questions = query_events::Entity::find() .filter(query_events::Column::EventType.eq(EVENT_TYPE_AI_QUESTION)) .count(&ctx.db) .await?; let searches_last_24h = query_events::Entity::find() .filter(query_events::Column::EventType.eq(EVENT_TYPE_SEARCH)) .filter(query_events::Column::CreatedAt.gte(since_24h)) .count(&ctx.db) .await?; let ai_questions_last_24h = query_events::Entity::find() .filter(query_events::Column::EventType.eq(EVENT_TYPE_AI_QUESTION)) .filter(query_events::Column::CreatedAt.gte(since_24h)) .count(&ctx.db) .await?; let total_page_views = content_events::Entity::find() .filter(content_events::Column::EventType.eq(CONTENT_EVENT_PAGE_VIEW)) .count(&ctx.db) .await?; let total_read_completes = content_events::Entity::find() .filter(content_events::Column::EventType.eq(CONTENT_EVENT_READ_COMPLETE)) .count(&ctx.db) .await?; let last_7d_events = query_events::Entity::find() .filter(query_events::Column::CreatedAt.gte(since_7d)) .order_by_desc(query_events::Column::CreatedAt) .all(&ctx.db) .await?; let last_7d_content_events = content_events::Entity::find() .filter(content_events::Column::CreatedAt.gte(since_7d)) .order_by_desc(content_events::Column::CreatedAt) .all(&ctx.db) .await?; let searches_last_7d = last_7d_events .iter() .filter(|event| event.event_type == EVENT_TYPE_SEARCH) .count() as u64; let ai_questions_last_7d = last_7d_events .iter() .filter(|event| event.event_type == EVENT_TYPE_AI_QUESTION) .count() as u64; let (unique_search_terms_last_7d, top_search_terms) = aggregate_queries(&last_7d_events, EVENT_TYPE_SEARCH, 8); let (unique_ai_questions_last_7d, top_ai_questions) = aggregate_queries(&last_7d_events, EVENT_TYPE_AI_QUESTION, 8); let mut provider_breakdown: HashMap = HashMap::new(); let mut daily_map: BTreeMap = BTreeMap::new(); let mut total_search_results = 0.0_f64; let mut counted_search_results = 0_u64; let mut total_ai_latency = 0.0_f64; let mut counted_ai_latency = 0_u64; let mut referrer_breakdown: HashMap = HashMap::new(); let mut total_read_progress = 0.0_f64; let mut counted_read_progress = 0_u64; let mut total_read_duration = 0.0_f64; let mut counted_read_duration = 0_u64; let mut page_views_last_24h = 0_u64; let mut page_views_last_7d = 0_u64; let mut read_completes_last_7d = 0_u64; for offset in 0..7 { let date = (now - Duration::days(offset)).date_naive(); daily_map.entry(date).or_insert((0, 0)); } for event in &last_7d_events { let day = DateTime::::from(event.created_at).date_naive(); let entry = daily_map.entry(day).or_insert((0, 0)); if event.event_type == EVENT_TYPE_SEARCH { entry.0 += 1; if let Some(result_count) = event.result_count { total_search_results += f64::from(result_count.max(0)); counted_search_results += 1; } continue; } if event.event_type == EVENT_TYPE_AI_QUESTION { entry.1 += 1; let provider = event .provider .clone() .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| "local-or-unspecified".to_string()); *provider_breakdown.entry(provider).or_insert(0) += 1; if let Some(latency_ms) = event.latency_ms { total_ai_latency += f64::from(latency_ms.max(0)); counted_ai_latency += 1; } } } let post_titles = posts::Entity::find() .all(&ctx.db) .await? .into_iter() .map(|post| { ( post.slug, post.title.unwrap_or_else(|| "Untitled post".to_string()), ) }) .collect::>(); let mut post_breakdown: HashMap = HashMap::new(); for event in &last_7d_content_events { let created_at = DateTime::::from(event.created_at); if event.event_type == CONTENT_EVENT_PAGE_VIEW { page_views_last_7d += 1; if created_at >= since_24h { page_views_last_24h += 1; } let referrer = normalize_tracking_source( Some(&event.path), event.referrer.clone(), event.metadata.as_ref(), ); *referrer_breakdown.entry(referrer).or_insert(0) += 1; } if event.event_type == CONTENT_EVENT_READ_COMPLETE { read_completes_last_7d += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { total_read_progress += f64::from(progress); counted_read_progress += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { total_read_duration += f64::from(duration_ms); counted_read_duration += 1; } } let Some(post_slug) = event .post_slug .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) .map(ToString::to_string) else { continue; }; let entry = post_breakdown .entry(post_slug) .or_insert((0, 0, 0.0, 0, 0.0, 0)); if event.event_type == CONTENT_EVENT_PAGE_VIEW { entry.0 += 1; } if event.event_type == CONTENT_EVENT_READ_COMPLETE { entry.1 += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { entry.2 += f64::from(progress); entry.3 += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { entry.4 += f64::from(duration_ms); entry.5 += 1; } } } let mut providers_last_7d = provider_breakdown .into_iter() .map(|(provider, count)| AnalyticsProviderBucket { provider, count }) .collect::>(); providers_last_7d.sort_by(|left, right| { right .count .cmp(&left.count) .then_with(|| left.provider.cmp(&right.provider)) }); providers_last_7d.truncate(6); let top_referrers = sorted_referrer_buckets(&referrer_breakdown, |_| true, 8); let ai_referrers_last_7d = sorted_referrer_buckets(&referrer_breakdown, is_ai_discovery_source, 6); let ai_discovery_page_views_last_7d = referrer_breakdown .iter() .filter(|(referrer, _)| is_ai_discovery_source(referrer)) .map(|(_, count)| *count) .sum::(); let mut popular_posts = post_breakdown .into_iter() .map( |( slug, ( page_views, read_completes, total_progress, progress_count, total_duration, duration_count, ), )| { AnalyticsPopularPost { title: post_titles .get(&slug) .cloned() .unwrap_or_else(|| slug.clone()), slug, page_views, read_completes, avg_progress_percent: if progress_count > 0 { total_progress / progress_count as f64 } else { 0.0 }, avg_duration_ms: (duration_count > 0) .then(|| total_duration / duration_count as f64), } }, ) .collect::>(); popular_posts.sort_by(|left, right| { right .page_views .cmp(&left.page_views) .then_with(|| right.read_completes.cmp(&left.read_completes)) .then_with(|| left.slug.cmp(&right.slug)) }); popular_posts.truncate(10); let mut daily_activity = daily_map .into_iter() .map(|(date, (searches, ai_questions))| AnalyticsDailyBucket { date: date.format("%Y-%m-%d").to_string(), searches, ai_questions, }) .collect::>(); daily_activity.sort_by(|left, right| left.date.cmp(&right.date)); let recent_events = query_events::Entity::find() .order_by_desc(query_events::Column::CreatedAt) .limit(24) .all(&ctx.db) .await? .into_iter() .map(|event| AnalyticsRecentEvent { id: event.id, event_type: event.event_type, query: event.query_text, result_count: event.result_count, success: event.success, response_mode: event.response_mode, provider: event.provider, chat_model: event.chat_model, latency_ms: event.latency_ms, created_at: format_timestamp(event.created_at.into()), }) .collect::>(); Ok(AdminAnalyticsResponse { overview: AnalyticsOverview { total_searches, total_ai_questions, searches_last_24h, ai_questions_last_24h, searches_last_7d, ai_questions_last_7d, unique_search_terms_last_7d, unique_ai_questions_last_7d, avg_search_results_last_7d: if counted_search_results > 0 { total_search_results / counted_search_results as f64 } else { 0.0 }, avg_ai_latency_ms_last_7d: (counted_ai_latency > 0) .then(|| total_ai_latency / counted_ai_latency as f64), }, content_overview: ContentAnalyticsOverview { total_page_views, page_views_last_24h, page_views_last_7d, total_read_completes, read_completes_last_7d, avg_read_progress_last_7d: if counted_read_progress > 0 { total_read_progress / counted_read_progress as f64 } else { 0.0 }, avg_read_duration_ms_last_7d: (counted_read_duration > 0) .then(|| total_read_duration / counted_read_duration as f64), }, top_search_terms, top_ai_questions, recent_events, providers_last_7d, top_referrers, ai_referrers_last_7d, ai_discovery_page_views_last_7d, popular_posts, daily_activity, }) } pub async fn build_public_content_highlights( ctx: &AppContext, public_posts: &[posts::Model], ) -> Result { if public_posts.is_empty() { return Ok(PublicContentHighlights { overview: ContentAnalyticsOverview { total_page_views: 0, page_views_last_24h: 0, page_views_last_7d: 0, total_read_completes: 0, read_completes_last_7d: 0, avg_read_progress_last_7d: 0.0, avg_read_duration_ms_last_7d: None, }, popular_posts: Vec::new(), }); } let now = Utc::now(); let since_24h = now - Duration::hours(24); let since_7d = now - Duration::days(7); let public_slugs = public_posts .iter() .map(|post| post.slug.clone()) .collect::>(); let post_titles = public_posts .iter() .map(|post| { ( post.slug.clone(), trim_to_option(post.title.clone()).unwrap_or_else(|| post.slug.clone()), ) }) .collect::>(); let total_page_views = content_events::Entity::find() .filter(content_events::Column::EventType.eq(CONTENT_EVENT_PAGE_VIEW)) .filter(content_events::Column::PostSlug.is_in(public_slugs.clone())) .count(&ctx.db) .await?; let total_read_completes = content_events::Entity::find() .filter(content_events::Column::EventType.eq(CONTENT_EVENT_READ_COMPLETE)) .filter(content_events::Column::PostSlug.is_in(public_slugs.clone())) .count(&ctx.db) .await?; let last_7d_content_events = content_events::Entity::find() .filter(content_events::Column::CreatedAt.gte(since_7d)) .filter(content_events::Column::PostSlug.is_in(public_slugs)) .all(&ctx.db) .await?; let mut page_views_last_24h = 0_u64; let mut page_views_last_7d = 0_u64; let mut read_completes_last_7d = 0_u64; let mut total_read_progress = 0.0_f64; let mut counted_read_progress = 0_u64; let mut total_read_duration = 0.0_f64; let mut counted_read_duration = 0_u64; let mut post_breakdown = HashMap::::new(); for event in &last_7d_content_events { let created_at = DateTime::::from(event.created_at); let Some(post_slug) = event .post_slug .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) .map(ToString::to_string) else { continue; }; if event.event_type == CONTENT_EVENT_PAGE_VIEW { page_views_last_7d += 1; if created_at >= since_24h { page_views_last_24h += 1; } } if event.event_type == CONTENT_EVENT_READ_COMPLETE { read_completes_last_7d += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { total_read_progress += f64::from(progress); counted_read_progress += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { total_read_duration += f64::from(duration_ms); counted_read_duration += 1; } } let entry = post_breakdown .entry(post_slug) .or_insert((0, 0, 0.0, 0, 0.0, 0)); if event.event_type == CONTENT_EVENT_PAGE_VIEW { entry.0 += 1; } if event.event_type == CONTENT_EVENT_READ_COMPLETE { entry.1 += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { entry.2 += f64::from(progress); entry.3 += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { entry.4 += f64::from(duration_ms); entry.5 += 1; } } } let mut popular_posts = post_breakdown .into_iter() .map( |( slug, ( page_views, read_completes, total_progress, progress_count, total_duration, duration_count, ), )| AnalyticsPopularPost { title: post_titles .get(&slug) .cloned() .unwrap_or_else(|| slug.clone()), slug, page_views, read_completes, avg_progress_percent: if progress_count > 0 { total_progress / progress_count as f64 } else { 0.0 }, avg_duration_ms: (duration_count > 0) .then(|| total_duration / duration_count as f64), }, ) .collect::>(); popular_posts.sort_by(|left, right| { right .page_views .cmp(&left.page_views) .then_with(|| right.read_completes.cmp(&left.read_completes)) .then_with(|| left.slug.cmp(&right.slug)) }); popular_posts.truncate(6); Ok(PublicContentHighlights { overview: ContentAnalyticsOverview { total_page_views, page_views_last_24h, page_views_last_7d, total_read_completes, read_completes_last_7d, avg_read_progress_last_7d: if counted_read_progress > 0 { total_read_progress / counted_read_progress as f64 } else { 0.0 }, avg_read_duration_ms_last_7d: (counted_read_duration > 0) .then(|| total_read_duration / counted_read_duration as f64), }, popular_posts, }) } pub async fn build_public_content_windows( ctx: &AppContext, public_posts: &[posts::Model], ) -> Result> { if public_posts.is_empty() { return Ok(vec![ build_empty_public_content_window("24h", "24h", 1), build_empty_public_content_window("7d", "7d", 7), build_empty_public_content_window("30d", "30d", 30), ]); } let now = Utc::now(); let since_30d = now - Duration::days(30); let public_slugs = public_posts .iter() .map(|post| post.slug.clone()) .collect::>(); let post_titles = public_posts .iter() .map(|post| { ( post.slug.clone(), trim_to_option(post.title.clone()).unwrap_or_else(|| post.slug.clone()), ) }) .collect::>(); let events = content_events::Entity::find() .filter(content_events::Column::CreatedAt.gte(since_30d)) .filter(content_events::Column::PostSlug.is_in(public_slugs)) .all(&ctx.db) .await?; Ok(vec![ summarize_public_content_window( &events, &post_titles, now - Duration::hours(24), "24h", "24h", 1, ), summarize_public_content_window( &events, &post_titles, now - Duration::days(7), "7d", "7d", 7, ), summarize_public_content_window(&events, &post_titles, since_30d, "30d", "30d", 30), ]) } fn build_empty_public_content_window( key: &str, label: &str, days: i32, ) -> PublicContentWindowHighlights { PublicContentWindowHighlights { key: key.to_string(), label: label.to_string(), days, overview: PublicContentWindowOverview { page_views: 0, read_completes: 0, avg_read_progress: 0.0, avg_read_duration_ms: None, }, popular_posts: Vec::new(), } } fn summarize_public_content_window( events: &[content_events::Model], post_titles: &HashMap, since: DateTime, key: &str, label: &str, days: i32, ) -> PublicContentWindowHighlights { let mut page_views = 0_u64; let mut read_completes = 0_u64; let mut total_read_progress = 0.0_f64; let mut counted_read_progress = 0_u64; let mut total_read_duration = 0.0_f64; let mut counted_read_duration = 0_u64; let mut post_breakdown = HashMap::::new(); for event in events { let created_at = DateTime::::from(event.created_at); if created_at < since { continue; } let Some(post_slug) = event .post_slug .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) .map(ToString::to_string) else { continue; }; if event.event_type == CONTENT_EVENT_PAGE_VIEW { page_views += 1; } if event.event_type == CONTENT_EVENT_READ_COMPLETE { read_completes += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { total_read_progress += f64::from(progress); counted_read_progress += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { total_read_duration += f64::from(duration_ms); counted_read_duration += 1; } } let entry = post_breakdown .entry(post_slug) .or_insert((0, 0, 0.0, 0, 0.0, 0)); if event.event_type == CONTENT_EVENT_PAGE_VIEW { entry.0 += 1; } if event.event_type == CONTENT_EVENT_READ_COMPLETE { entry.1 += 1; } if matches!( event.event_type.as_str(), CONTENT_EVENT_READ_PROGRESS | CONTENT_EVENT_READ_COMPLETE ) { let progress = event.progress_percent.unwrap_or({ if event.event_type == CONTENT_EVENT_READ_COMPLETE { 100 } else { 0 } }); if progress > 0 { entry.2 += f64::from(progress); entry.3 += 1; } if let Some(duration_ms) = event.duration_ms.filter(|value| *value >= 0) { entry.4 += f64::from(duration_ms); entry.5 += 1; } } } let mut popular_posts = post_breakdown .into_iter() .map( |( slug, ( item_page_views, item_read_completes, total_progress, progress_count, total_duration, duration_count, ), )| AnalyticsPopularPost { title: post_titles .get(&slug) .cloned() .unwrap_or_else(|| slug.clone()), slug, page_views: item_page_views, read_completes: item_read_completes, avg_progress_percent: if progress_count > 0 { total_progress / progress_count as f64 } else { 0.0 }, avg_duration_ms: (duration_count > 0) .then(|| total_duration / duration_count as f64), }, ) .collect::>(); popular_posts.sort_by(|left, right| { right .page_views .cmp(&left.page_views) .then_with(|| right.read_completes.cmp(&left.read_completes)) .then_with(|| { right .avg_progress_percent .partial_cmp(&left.avg_progress_percent) .unwrap_or(std::cmp::Ordering::Equal) }) .then_with(|| left.slug.cmp(&right.slug)) }); popular_posts.truncate(6); PublicContentWindowHighlights { key: key.to_string(), label: label.to_string(), days, overview: PublicContentWindowOverview { page_views, read_completes, avg_read_progress: if counted_read_progress > 0 { total_read_progress / counted_read_progress as f64 } else { 0.0 }, avg_read_duration_ms: (counted_read_duration > 0) .then(|| total_read_duration / counted_read_duration as f64), }, popular_posts, } }