From 11ec00281c28092c364a146e43bff622aa398740 Mon Sep 17 00:00:00 2001 From: limitcool Date: Sat, 4 Apr 2026 00:40:46 +0800 Subject: [PATCH] Fix AI reindex job execution and progress --- admin/src/lib/worker-progress.ts | 116 ++++++++++++++++++++++++++++ backend/src/services/ai.rs | 107 +++++++++++++++++++++++-- backend/src/services/worker_jobs.rs | 25 +++--- backend/src/workers/ai_reindex.rs | 16 +++- deploy/docker/ARCHITECTURE.md | 5 ++ deploy/docker/README.md | 3 +- 6 files changed, 250 insertions(+), 22 deletions(-) create mode 100644 admin/src/lib/worker-progress.ts diff --git a/admin/src/lib/worker-progress.ts b/admin/src/lib/worker-progress.ts new file mode 100644 index 0000000..f4c8567 --- /dev/null +++ b/admin/src/lib/worker-progress.ts @@ -0,0 +1,116 @@ +import type { WorkerJobRecord } from "@/lib/types"; + +type WorkerProgressShape = { + phase?: string; + message?: string; + total_chunks?: number; + processed_chunks?: number; + total_batches?: number; + current_batch?: number; + batch_size?: number; + percent?: number; +}; + +function asRecord(value: unknown): Record | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + + return value as Record; +} + +function asNumber(value: unknown): number | null { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + + if (typeof value === "string") { + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : null; + } + + return null; +} + +function asText(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +export function getWorkerProgress( + job: Pick, +): WorkerProgressShape | null { + const result = asRecord(job.result); + if (!result) { + return null; + } + + const nested = asRecord(result.progress); + const source = nested ?? result; + const percent = asNumber(source.percent); + const totalChunks = asNumber(source.total_chunks); + const processedChunks = asNumber(source.processed_chunks); + const totalBatches = asNumber(source.total_batches); + const currentBatch = asNumber(source.current_batch); + const batchSize = asNumber(source.batch_size); + const phase = asText(source.phase) ?? asText(result.phase) ?? undefined; + const message = asText(source.message) ?? asText(result.message) ?? undefined; + + if ( + percent === null && + totalChunks === null && + processedChunks === null && + totalBatches === null && + currentBatch === null && + batchSize === null && + !phase && + !message + ) { + return null; + } + + return { + phase, + message, + total_chunks: totalChunks ?? undefined, + processed_chunks: processedChunks ?? undefined, + total_batches: totalBatches ?? undefined, + current_batch: currentBatch ?? undefined, + batch_size: batchSize ?? undefined, + percent: percent ?? undefined, + }; +} + +export function formatWorkerProgress( + job: Pick, +): string | null { + const progress = getWorkerProgress(job); + if (!progress) { + return null; + } + + const percentText = + typeof progress.percent === "number" + ? `${Math.max(0, Math.min(100, Math.round(progress.percent)))}%` + : null; + const chunkText = + typeof progress.processed_chunks === "number" && + typeof progress.total_chunks === "number" + ? `${progress.processed_chunks}/${progress.total_chunks} 分块` + : null; + const batchText = + typeof progress.current_batch === "number" && + typeof progress.total_batches === "number" && + progress.total_batches > 0 + ? `第 ${progress.current_batch}/${progress.total_batches} 批` + : null; + + const details = [percentText, chunkText, batchText] + .filter(Boolean) + .join(" · "); + + if (progress.message && details) { + return `${progress.message} ${details}`; + } + + return progress.message ?? (details || null); +} diff --git a/backend/src/services/ai.rs b/backend/src/services/ai.rs index d6b2a5e..c29545d 100644 --- a/backend/src/services/ai.rs +++ b/backend/src/services/ai.rs @@ -19,7 +19,7 @@ use uuid::Uuid; use crate::{ controllers::site_settings as site_settings_controller, models::_entities::{ai_chunks, site_settings}, - services::{content, storage}, + services::{content, storage, worker_jobs}, }; const DEFAULT_AI_PROVIDER: &str = "openai"; @@ -36,7 +36,7 @@ const DEFAULT_TOP_K: usize = 4; const DEFAULT_CHUNK_SIZE: usize = 1200; const DEFAULT_SYSTEM_PROMPT: &str = "你是这个博客的站内 AI 助手。请严格基于提供的博客上下文回答,优先给出准确结论,再补充细节;如果上下文不足,请明确说明。"; const EMBEDDING_BATCH_SIZE: usize = 32; -const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4; +pub(crate) const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4; const EMBEDDING_DIMENSION: usize = 384; const LOCAL_EMBEDDING_MODEL_LABEL: &str = "fastembed / local all-MiniLM-L6-v2"; const LOCAL_EMBEDDING_CACHE_DIR: &str = "storage/ai_embedding_models/all-minilm-l6-v2"; @@ -203,6 +203,18 @@ pub struct AiIndexSummary { pub last_indexed_at: Option>, } +#[derive(Clone, Debug, Serialize)] +struct AiReindexProgress { + phase: String, + message: String, + total_chunks: usize, + processed_chunks: usize, + total_batches: usize, + current_batch: usize, + batch_size: usize, + percent: usize, +} + fn trim_to_option(value: Option) -> Option { value.and_then(|item| { let trimmed = item.trim().to_string(); @@ -2470,6 +2482,61 @@ fn retrieval_only_answer(matches: &[ScoredChunk]) -> String { ) } +fn build_reindex_progress( + phase: &str, + message: String, + total_chunks: usize, + processed_chunks: usize, + batch_size: usize, +) -> AiReindexProgress { + let normalized_batch_size = batch_size.max(1); + let total_batches = total_chunks.div_ceil(normalized_batch_size); + let current_batch = if processed_chunks == 0 { + 0 + } else { + processed_chunks + .div_ceil(normalized_batch_size) + .min(total_batches) + }; + let percent = if total_chunks == 0 { + 100 + } else { + ((processed_chunks * 100) / total_chunks).min(100) + }; + + AiReindexProgress { + phase: phase.to_string(), + message, + total_chunks, + processed_chunks, + total_batches, + current_batch, + batch_size: normalized_batch_size, + percent, + } +} + +async fn update_reindex_job_progress( + ctx: &AppContext, + job_id: Option, + progress: &AiReindexProgress, +) -> Result<()> { + let Some(job_id) = job_id else { + return Ok(()); + }; + + worker_jobs::update_job_result( + ctx, + job_id, + serde_json::json!({ + "phase": progress.phase, + "message": progress.message, + "progress": progress, + }), + ) + .await +} + async fn load_runtime_settings( ctx: &AppContext, require_enabled: bool, @@ -2643,11 +2710,25 @@ async fn retrieve_matches( Ok((matches, indexed_chunks, last_indexed_at)) } -pub async fn rebuild_index(ctx: &AppContext) -> Result { +pub async fn rebuild_index(ctx: &AppContext, job_id: Option) -> Result { let settings = load_runtime_settings(ctx, false).await?; let posts = content::load_markdown_posts_from_store(ctx).await?; let mut chunk_drafts = build_chunks(&posts, settings.chunk_size); chunk_drafts.extend(build_profile_chunks(&settings.raw, settings.chunk_size)); + let total_chunks = chunk_drafts.len(); + let batch_size = REINDEX_EMBEDDING_BATCH_SIZE.max(1); + let preparing_progress = build_reindex_progress( + "preparing", + if total_chunks == 0 { + "没有可写入的内容,正在清理旧索引。".to_string() + } else { + format!("已收集 {total_chunks} 个分块,准备重建向量索引。") + }, + total_chunks, + 0, + batch_size, + ); + update_reindex_job_progress(ctx, job_id, &preparing_progress).await?; let txn = ctx.db.begin().await?; txn.execute(Statement::from_string( @@ -2656,14 +2737,16 @@ pub async fn rebuild_index(ctx: &AppContext) -> Result { )) .await?; - for chunk_batch in chunk_drafts.chunks(REINDEX_EMBEDDING_BATCH_SIZE.max(1)) { + let mut processed_chunks = 0usize; + + for chunk_batch in chunk_drafts.chunks(batch_size) { let embeddings = embed_texts_locally_with_batch_size( chunk_batch .iter() .map(|chunk| chunk.content.clone()) .collect::>(), EmbeddingKind::Passage, - REINDEX_EMBEDDING_BATCH_SIZE, + batch_size, ) .await?; @@ -2700,6 +2783,20 @@ pub async fn rebuild_index(ctx: &AppContext) -> Result { ); txn.execute(statement).await?; } + + processed_chunks += chunk_batch.len(); + let embedding_progress = build_reindex_progress( + "embedding", + format!( + "正在写入第 {}/{} 批向量。", + processed_chunks.div_ceil(batch_size), + total_chunks.div_ceil(batch_size) + ), + total_chunks, + processed_chunks, + batch_size, + ); + update_reindex_job_progress(ctx, job_id, &embedding_progress).await?; } let last_indexed_at = update_indexed_at(&txn, &settings.raw).await?; diff --git a/backend/src/services/worker_jobs.rs b/backend/src/services/worker_jobs.rs index 2651d84..f7da69d 100644 --- a/backend/src/services/worker_jobs.rs +++ b/backend/src/services/worker_jobs.rs @@ -189,8 +189,7 @@ fn label_for(worker_name: &str) -> String { fn description_for(worker_name: &str) -> String { match worker_name { - WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。" - .to_string(), + WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。".to_string(), WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。".to_string(), WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。".to_string(), TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。".to_string(), @@ -356,18 +355,8 @@ async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) -> } async fn enqueue_ai_reindex_worker(ctx: &AppContext, args: AiReindexWorkerArgs) -> Result<()> { - match AiReindexWorker::perform_later(ctx, args.clone()).await { - Ok(_) => Ok(()), - Err(Error::QueueProviderMissing) => { - tokio::spawn(dispatch_ai_reindex(ctx.clone(), args)); - Ok(()) - } - Err(error) => { - tracing::warn!("ai reindex worker queue unavailable, falling back to local task: {error}"); - tokio::spawn(dispatch_ai_reindex(ctx.clone(), args)); - Ok(()) - } - } + tokio::spawn(dispatch_ai_reindex(ctx.clone(), args)); + Ok(()) } async fn enqueue_notification_worker( @@ -617,6 +606,14 @@ pub async fn mark_job_succeeded(ctx: &AppContext, id: i32, result: Option Ok(()) } +pub async fn update_job_result(ctx: &AppContext, id: i32, result: Value) -> Result<()> { + let item = find_job(ctx, id).await?; + let mut active = item.into_active_model(); + active.result = Set(Some(result)); + active.update(&ctx.db).await?; + Ok(()) +} + pub async fn mark_job_failed(ctx: &AppContext, id: i32, error_text: String) -> Result<()> { let item = find_job(ctx, id).await?; let mut active = item.into_active_model(); diff --git a/backend/src/workers/ai_reindex.rs b/backend/src/workers/ai_reindex.rs index 30a2aff..4258788 100644 --- a/backend/src/workers/ai_reindex.rs +++ b/backend/src/workers/ai_reindex.rs @@ -29,12 +29,24 @@ impl BackgroundWorker for AiReindexWorker { return Ok(()); } - match ai::rebuild_index(&self.ctx).await { + match ai::rebuild_index(&self.ctx, Some(job_id)).await { Ok(summary) => { worker_jobs::mark_job_succeeded( &self.ctx, job_id, Some(serde_json::json!({ + "phase": "completed", + "message": "AI 索引重建完成。", + "progress": { + "phase": "completed", + "message": "AI 索引重建完成。", + "total_chunks": summary.indexed_chunks, + "processed_chunks": summary.indexed_chunks, + "total_batches": summary.indexed_chunks.div_ceil(ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1)), + "current_batch": summary.indexed_chunks.div_ceil(ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1)), + "batch_size": ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1), + "percent": 100, + }, "indexed_chunks": summary.indexed_chunks, "last_indexed_at": summary.last_indexed_at.map(|value| value.to_rfc3339()), })), @@ -48,7 +60,7 @@ impl BackgroundWorker for AiReindexWorker { } } } else { - ai::rebuild_index(&self.ctx).await?; + ai::rebuild_index(&self.ctx, None).await?; Ok(()) } } diff --git a/deploy/docker/ARCHITECTURE.md b/deploy/docker/ARCHITECTURE.md index 326a0f7..6ffc782 100644 --- a/deploy/docker/ARCHITECTURE.md +++ b/deploy/docker/ARCHITECTURE.md @@ -74,6 +74,11 @@ backend-worker 如果只启动 `backend` 而没有 `backend-worker`,通知会入队但没人消费。 +补充说明: + +- `backend-worker` 目前主要消费 Redis 队列里的通知相关任务。 +- AI 索引重建会直接在 `backend` 进程本地启动,这样创建任务后会立即进入执行,不再依赖独立 worker 消费。 + ## 2.1 推荐的后台认证链路 当前最推荐: diff --git a/deploy/docker/README.md b/deploy/docker/README.md index 337463c..bcefe1c 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -147,7 +147,8 @@ A: 当前站点对外内容页优先 SEO 与首屏可见性,保留 SSR 更稳 ### Q4: 生产推荐端口设计是什么? A: 推荐前置 Caddy/Nginx 统一暴露 `80/443`,`frontend:4321` / `backend:5150` / `admin:80` 仅走内网。 当前 `compose.package.yml` 属于直连端口版,便于快速部署与联调。 -另外因为通知已经走异步队列,生产务必同时启动 `backend-worker`。 +另外因为通知已经走异步队列,生产务必同时启动 `backend-worker`。 +AI 索引重建当前直接在 `backend` 进程本地启动,不依赖 `backend-worker` 消费 Redis 队列。 ### Q5: 为什么 compose 里没看到 `ADMIN_VITE_FRONTEND_BASE_URL`? A: