chore: checkpoint admin editor and perf work
This commit is contained in:
441
backend/src/services/analytics.rs
Normal file
441
backend/src/services/analytics.rs
Normal file
@@ -0,0 +1,441 @@
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use axum::http::HeaderMap;
|
||||
use chrono::{DateTime, Duration, NaiveDate, Utc};
|
||||
use loco_rs::prelude::*;
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
|
||||
QuerySelect, Set,
|
||||
};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::models::_entities::query_events;
|
||||
|
||||
/// `event_type` value stored for site-search queries.
const EVENT_TYPE_SEARCH: &str = "search";
/// `event_type` value stored for AI question submissions.
const EVENT_TYPE_AI_QUESTION: &str = "ai_question";
|
||||
|
||||
/// Request metadata captured alongside a query event.
#[derive(Clone, Debug, Default)]
pub struct QueryEventRequestContext {
    /// API route path that received the query (e.g. `/api/search`).
    pub request_path: Option<String>,
    /// Value of the `Referer` request header, when present and non-empty.
    pub referrer: Option<String>,
    /// Value of the `User-Agent` request header, when present and non-empty.
    pub user_agent: Option<String>,
}
|
||||
|
||||
/// Un-persisted query event; `record_event` trims/normalizes it and inserts a row.
#[derive(Clone, Debug)]
pub struct QueryEventDraft {
    /// One of the `EVENT_TYPE_*` constants.
    pub event_type: String,
    /// Raw query text as typed by the user; discarded if blank after trimming.
    pub query_text: String,
    /// Request path / referrer / user-agent captured from the HTTP request.
    pub request_context: QueryEventRequestContext,
    /// Number of results returned, when known.
    pub result_count: Option<i32>,
    /// Whether the query was answered successfully, when known.
    pub success: Option<bool>,
    /// AI response mode (e.g. "stream"); None for plain searches.
    pub response_mode: Option<String>,
    /// AI provider identifier, when one was used.
    pub provider: Option<String>,
    /// Chat model identifier, when one was used.
    pub chat_model: Option<String>,
    /// End-to-end latency in milliseconds; clamped non-negative on insert.
    pub latency_ms: Option<i32>,
}
|
||||
|
||||
/// Headline counters for the admin analytics dashboard.
#[derive(Clone, Debug, Serialize)]
pub struct AnalyticsOverview {
    /// All-time count of search events.
    pub total_searches: u64,
    /// All-time count of AI question events.
    pub total_ai_questions: u64,
    /// Searches within the last 24 hours.
    pub searches_last_24h: u64,
    /// AI questions within the last 24 hours.
    pub ai_questions_last_24h: u64,
    /// Searches within the last 7 days.
    pub searches_last_7d: u64,
    /// AI questions within the last 7 days.
    pub ai_questions_last_7d: u64,
    /// Distinct normalized search terms seen in the last 7 days.
    pub unique_search_terms_last_7d: usize,
    /// Distinct normalized AI questions seen in the last 7 days.
    pub unique_ai_questions_last_7d: usize,
    /// Mean result count over 7-day searches that reported one; 0.0 when none did.
    pub avg_search_results_last_7d: f64,
    /// Mean AI latency over 7-day questions that reported one; None when none did.
    pub avg_ai_latency_ms_last_7d: Option<f64>,
}
|
||||
|
||||
/// One ranked entry in a "top queries" list.
#[derive(Clone, Debug, Serialize)]
pub struct AnalyticsTopQuery {
    /// Most recent original spelling of the (normalized) query.
    pub query: String,
    /// How many times the normalized query occurred in the window.
    pub count: u64,
    /// Last occurrence, formatted "%Y-%m-%d %H:%M" (UTC).
    pub last_seen_at: String,
}
|
||||
|
||||
/// Serialized view of a single stored query event for the "recent events" feed.
#[derive(Clone, Debug, Serialize)]
pub struct AnalyticsRecentEvent {
    /// Database row id.
    pub id: i32,
    /// One of the `EVENT_TYPE_*` values.
    pub event_type: String,
    /// Original query text as stored.
    pub query: String,
    /// Reported result count, when recorded.
    pub result_count: Option<i32>,
    /// Reported success flag, when recorded.
    pub success: Option<bool>,
    /// AI response mode, when recorded.
    pub response_mode: Option<String>,
    /// AI provider, when recorded.
    pub provider: Option<String>,
    /// Chat model, when recorded.
    pub chat_model: Option<String>,
    /// Latency in milliseconds, when recorded.
    pub latency_ms: Option<i32>,
    /// Creation time, formatted "%Y-%m-%d %H:%M" (UTC).
    pub created_at: String,
}
|
||||
|
||||
/// AI question count attributed to one provider over the 7-day window.
#[derive(Clone, Debug, Serialize)]
pub struct AnalyticsProviderBucket {
    /// Provider name, or "local-or-unspecified" when the event had none.
    pub provider: String,
    /// Number of AI questions handled by this provider.
    pub count: u64,
}
|
||||
|
||||
/// Per-day activity totals for the dashboard chart.
#[derive(Clone, Debug, Serialize)]
pub struct AnalyticsDailyBucket {
    /// Calendar date formatted "%Y-%m-%d" (UTC).
    pub date: String,
    /// Search events on that date.
    pub searches: u64,
    /// AI question events on that date.
    pub ai_questions: u64,
}
|
||||
|
||||
/// Full payload returned by `build_admin_analytics` for the admin dashboard.
#[derive(Clone, Debug, Serialize)]
pub struct AdminAnalyticsResponse {
    /// Headline counters.
    pub overview: AnalyticsOverview,
    /// Up to 8 most frequent search terms over the last 7 days.
    pub top_search_terms: Vec<AnalyticsTopQuery>,
    /// Up to 8 most frequent AI questions over the last 7 days.
    pub top_ai_questions: Vec<AnalyticsTopQuery>,
    /// Up to 24 most recent events of any type.
    pub recent_events: Vec<AnalyticsRecentEvent>,
    /// Up to 6 provider buckets for AI questions over the last 7 days.
    pub providers_last_7d: Vec<AnalyticsProviderBucket>,
    /// One bucket per day, sorted ascending by date.
    pub daily_activity: Vec<AnalyticsDailyBucket>,
}
|
||||
|
||||
/// Intermediate per-normalized-query accumulator used by `build_query_aggregates`.
#[derive(Clone, Debug)]
struct QueryAggregate {
    // Display text: the query_text of the most recently seen occurrence.
    query: String,
    // Occurrences of this normalized query within the event slice.
    count: u64,
    // Timestamp of the most recent occurrence.
    last_seen_at: DateTime<Utc>,
}
|
||||
|
||||
/// Trims an optional string and collapses whitespace-only or missing values
/// to `None`; a non-empty trimmed value is returned as a fresh owned `String`.
fn trim_to_option(value: Option<String>) -> Option<String> {
    value.and_then(|item| {
        let trimmed = item.trim();
        // Only allocate the owned copy once we know the value is non-empty;
        // the original built the String first and then discarded it for
        // whitespace-only input.
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    })
}
|
||||
|
||||
/// Canonicalizes a query for grouping: runs of whitespace collapse to a
/// single space and the result is lowercased.
fn normalize_query(value: &str) -> String {
    let mut normalized = String::with_capacity(value.len());
    for word in value.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(word);
    }
    normalized.to_lowercase()
}
|
||||
|
||||
/// Renders a UTC timestamp with minute precision for dashboard display.
fn format_timestamp(value: DateTime<Utc>) -> String {
    let rendered = value.format("%Y-%m-%d %H:%M");
    rendered.to_string()
}
|
||||
|
||||
fn header_value(headers: &HeaderMap, key: &str) -> Option<String> {
|
||||
headers
|
||||
.get(key)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(ToString::to_string)
|
||||
.and_then(|value| trim_to_option(Some(value)))
|
||||
}
|
||||
|
||||
/// Saturates a measured latency into the `i32` column range:
/// negatives become 0 and values above `i32::MAX` become `i32::MAX`.
fn clamp_latency(latency_ms: i64) -> i32 {
    i32::try_from(latency_ms.max(0)).unwrap_or(i32::MAX)
}
|
||||
|
||||
/// Groups events of `wanted_type` by their normalized query and returns the
/// aggregates sorted by descending count, ties broken by most-recent
/// occurrence first.
fn build_query_aggregates(
    events: &[query_events::Model],
    wanted_type: &str,
) -> Vec<QueryAggregate> {
    // Keyed by normalized query so different spellings collapse together.
    let mut grouped: HashMap<String, QueryAggregate> = HashMap::new();

    for event in events
        .iter()
        .filter(|event| event.event_type == wanted_type)
    {
        let entry = grouped
            .entry(event.normalized_query.clone())
            .or_insert_with(|| QueryAggregate {
                query: event.query_text.clone(),
                count: 0,
                last_seen_at: event.created_at.into(),
            });

        entry.count += 1;

        // Keep the display text from the most recent occurrence; `>=` means
        // the freshly inserted entry also takes this branch, which is a no-op.
        let created_at = DateTime::<Utc>::from(event.created_at);
        if created_at >= entry.last_seen_at {
            entry.query = event.query_text.clone();
            entry.last_seen_at = created_at;
        }
    }

    let mut items = grouped.into_values().collect::<Vec<_>>();
    // Most frequent first; on equal counts, most recently seen first.
    items.sort_by(|left, right| {
        right
            .count
            .cmp(&left.count)
            .then_with(|| right.last_seen_at.cmp(&left.last_seen_at))
    });
    items
}
|
||||
|
||||
fn aggregate_queries(
|
||||
events: &[query_events::Model],
|
||||
wanted_type: &str,
|
||||
limit: usize,
|
||||
) -> (usize, Vec<AnalyticsTopQuery>) {
|
||||
let aggregates = build_query_aggregates(events, wanted_type);
|
||||
let total_unique = aggregates.len();
|
||||
let items = aggregates
|
||||
.into_iter()
|
||||
.take(limit)
|
||||
.map(|item| AnalyticsTopQuery {
|
||||
query: item.query,
|
||||
count: item.count,
|
||||
last_seen_at: format_timestamp(item.last_seen_at),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
(total_unique, items)
|
||||
}
|
||||
|
||||
pub fn request_context_from_headers(path: &str, headers: &HeaderMap) -> QueryEventRequestContext {
|
||||
QueryEventRequestContext {
|
||||
request_path: trim_to_option(Some(path.to_string())),
|
||||
referrer: header_value(headers, "referer"),
|
||||
user_agent: header_value(headers, "user-agent"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_event(ctx: &AppContext, draft: QueryEventDraft) {
|
||||
let query_text = draft.query_text.trim().to_string();
|
||||
if query_text.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let active_model = query_events::ActiveModel {
|
||||
event_type: Set(draft.event_type),
|
||||
query_text: Set(query_text.clone()),
|
||||
normalized_query: Set(normalize_query(&query_text)),
|
||||
request_path: Set(trim_to_option(draft.request_context.request_path)),
|
||||
referrer: Set(trim_to_option(draft.request_context.referrer)),
|
||||
user_agent: Set(trim_to_option(draft.request_context.user_agent)),
|
||||
result_count: Set(draft.result_count),
|
||||
success: Set(draft.success),
|
||||
response_mode: Set(trim_to_option(draft.response_mode)),
|
||||
provider: Set(trim_to_option(draft.provider)),
|
||||
chat_model: Set(trim_to_option(draft.chat_model)),
|
||||
latency_ms: Set(draft.latency_ms.map(|value| value.max(0))),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Err(error) = active_model.insert(&ctx.db).await {
|
||||
tracing::warn!("failed to record query analytics event: {error}");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_search_event(
|
||||
ctx: &AppContext,
|
||||
query_text: &str,
|
||||
result_count: usize,
|
||||
headers: &HeaderMap,
|
||||
latency_ms: i64,
|
||||
) {
|
||||
record_event(
|
||||
ctx,
|
||||
QueryEventDraft {
|
||||
event_type: EVENT_TYPE_SEARCH.to_string(),
|
||||
query_text: query_text.to_string(),
|
||||
request_context: request_context_from_headers("/api/search", headers),
|
||||
result_count: Some(result_count.min(i32::MAX as usize) as i32),
|
||||
success: Some(true),
|
||||
response_mode: None,
|
||||
provider: None,
|
||||
chat_model: None,
|
||||
latency_ms: Some(clamp_latency(latency_ms)),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn record_ai_question_event(
|
||||
ctx: &AppContext,
|
||||
question: &str,
|
||||
headers: &HeaderMap,
|
||||
success: bool,
|
||||
response_mode: &str,
|
||||
provider: Option<String>,
|
||||
chat_model: Option<String>,
|
||||
result_count: Option<usize>,
|
||||
latency_ms: i64,
|
||||
) {
|
||||
record_event(
|
||||
ctx,
|
||||
QueryEventDraft {
|
||||
event_type: EVENT_TYPE_AI_QUESTION.to_string(),
|
||||
query_text: question.to_string(),
|
||||
request_context: request_context_from_headers(
|
||||
if response_mode == "stream" {
|
||||
"/api/ai/ask/stream"
|
||||
} else {
|
||||
"/api/ai/ask"
|
||||
},
|
||||
headers,
|
||||
),
|
||||
result_count: result_count.map(|value| value.min(i32::MAX as usize) as i32),
|
||||
success: Some(success),
|
||||
response_mode: Some(response_mode.to_string()),
|
||||
provider,
|
||||
chat_model,
|
||||
latency_ms: Some(clamp_latency(latency_ms)),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Assembles the full admin analytics payload: all-time and windowed counts,
/// top queries, provider breakdown, daily activity, and the recent-event feed.
///
/// # Errors
/// Propagates any database error from the underlying queries.
pub async fn build_admin_analytics(ctx: &AppContext) -> Result<AdminAnalyticsResponse> {
    let now = Utc::now();
    let since_24h = now - Duration::hours(24);
    let since_7d = now - Duration::days(7);

    // All-time totals, counted in the database per event type.
    let total_searches = query_events::Entity::find()
        .filter(query_events::Column::EventType.eq(EVENT_TYPE_SEARCH))
        .count(&ctx.db)
        .await?;
    let total_ai_questions = query_events::Entity::find()
        .filter(query_events::Column::EventType.eq(EVENT_TYPE_AI_QUESTION))
        .count(&ctx.db)
        .await?;

    // 24-hour window counts.
    let searches_last_24h = query_events::Entity::find()
        .filter(query_events::Column::EventType.eq(EVENT_TYPE_SEARCH))
        .filter(query_events::Column::CreatedAt.gte(since_24h))
        .count(&ctx.db)
        .await?;
    let ai_questions_last_24h = query_events::Entity::find()
        .filter(query_events::Column::EventType.eq(EVENT_TYPE_AI_QUESTION))
        .filter(query_events::Column::CreatedAt.gte(since_24h))
        .count(&ctx.db)
        .await?;

    // The 7-day window is loaded once and aggregated in memory below.
    let last_7d_events = query_events::Entity::find()
        .filter(query_events::Column::CreatedAt.gte(since_7d))
        .order_by_desc(query_events::Column::CreatedAt)
        .all(&ctx.db)
        .await?;

    let searches_last_7d = last_7d_events
        .iter()
        .filter(|event| event.event_type == EVENT_TYPE_SEARCH)
        .count() as u64;
    let ai_questions_last_7d = last_7d_events
        .iter()
        .filter(|event| event.event_type == EVENT_TYPE_AI_QUESTION)
        .count() as u64;

    // Top-8 lists plus distinct-query counts per event type.
    let (unique_search_terms_last_7d, top_search_terms) =
        aggregate_queries(&last_7d_events, EVENT_TYPE_SEARCH, 8);
    let (unique_ai_questions_last_7d, top_ai_questions) =
        aggregate_queries(&last_7d_events, EVENT_TYPE_AI_QUESTION, 8);

    let mut provider_breakdown: HashMap<String, u64> = HashMap::new();
    // BTreeMap keeps daily buckets ordered by date.
    let mut daily_map: BTreeMap<NaiveDate, (u64, u64)> = BTreeMap::new();
    // Running sums for the two averages; only events that reported the
    // respective value are counted.
    let mut total_search_results = 0.0_f64;
    let mut counted_search_results = 0_u64;
    let mut total_ai_latency = 0.0_f64;
    let mut counted_ai_latency = 0_u64;

    // Seed the last 7 calendar days so quiet days still render as zero bars.
    for offset in 0..7 {
        let date = (now - Duration::days(offset)).date_naive();
        daily_map.entry(date).or_insert((0, 0));
    }

    for event in &last_7d_events {
        let day = DateTime::<Utc>::from(event.created_at).date_naive();
        // (searches, ai_questions) tuple for the event's calendar day.
        let entry = daily_map.entry(day).or_insert((0, 0));

        if event.event_type == EVENT_TYPE_SEARCH {
            entry.0 += 1;
            if let Some(result_count) = event.result_count {
                // max(0) guards against any negative legacy rows.
                total_search_results += f64::from(result_count.max(0));
                counted_search_results += 1;
            }
            continue;
        }

        if event.event_type == EVENT_TYPE_AI_QUESTION {
            entry.1 += 1;

            // Blank/missing providers are grouped under a sentinel bucket.
            let provider = event
                .provider
                .clone()
                .filter(|value| !value.trim().is_empty())
                .unwrap_or_else(|| "local-or-unspecified".to_string());
            *provider_breakdown.entry(provider).or_insert(0) += 1;

            if let Some(latency_ms) = event.latency_ms {
                total_ai_latency += f64::from(latency_ms.max(0));
                counted_ai_latency += 1;
            }
        }
    }

    let mut providers_last_7d = provider_breakdown
        .into_iter()
        .map(|(provider, count)| AnalyticsProviderBucket { provider, count })
        .collect::<Vec<_>>();
    // Highest count first; alphabetical tie-break keeps output deterministic.
    providers_last_7d.sort_by(|left, right| {
        right
            .count
            .cmp(&left.count)
            .then_with(|| left.provider.cmp(&right.provider))
    });
    providers_last_7d.truncate(6);

    let mut daily_activity = daily_map
        .into_iter()
        .map(|(date, (searches, ai_questions))| AnalyticsDailyBucket {
            date: date.format("%Y-%m-%d").to_string(),
            searches,
            ai_questions,
        })
        .collect::<Vec<_>>();
    // Ascending by date string (ISO format sorts chronologically).
    daily_activity.sort_by(|left, right| left.date.cmp(&right.date));

    // Newest 24 events of any type for the activity feed.
    let recent_events = query_events::Entity::find()
        .order_by_desc(query_events::Column::CreatedAt)
        .limit(24)
        .all(&ctx.db)
        .await?
        .into_iter()
        .map(|event| AnalyticsRecentEvent {
            id: event.id,
            event_type: event.event_type,
            query: event.query_text,
            result_count: event.result_count,
            success: event.success,
            response_mode: event.response_mode,
            provider: event.provider,
            chat_model: event.chat_model,
            latency_ms: event.latency_ms,
            created_at: format_timestamp(event.created_at.into()),
        })
        .collect::<Vec<_>>();

    Ok(AdminAnalyticsResponse {
        overview: AnalyticsOverview {
            total_searches,
            total_ai_questions,
            searches_last_24h,
            ai_questions_last_24h,
            searches_last_7d,
            ai_questions_last_7d,
            unique_search_terms_last_7d,
            unique_ai_questions_last_7d,
            avg_search_results_last_7d: if counted_search_results > 0 {
                total_search_results / counted_search_results as f64
            } else {
                0.0
            },
            avg_ai_latency_ms_last_7d: (counted_ai_latency > 0)
                .then(|| total_ai_latency / counted_ai_latency as f64),
        },
        top_search_terms,
        top_ai_questions,
        recent_events,
        providers_last_7d,
        daily_activity,
    })
}
|
||||
Reference in New Issue
Block a user