Fix AI reindex job execution and progress
Some checks failed
docker-images / resolve-build-targets (push) Failing after 1s
docker-images / build-and-push (admin) (push) Has been cancelled
docker-images / submit-indexnow (push) Has been cancelled
docker-images / build-and-push (backend) (push) Has been cancelled
docker-images / build-and-push (frontend) (push) Has been cancelled
ui-regression / playwright-regression (push) Has been cancelled

This commit is contained in:
2026-04-04 00:40:46 +08:00
parent 320595ee1c
commit 11ec00281c
6 changed files with 250 additions and 22 deletions

View File

@@ -0,0 +1,116 @@
import type { WorkerJobRecord } from "@/lib/types";
type WorkerProgressShape = {
phase?: string;
message?: string;
total_chunks?: number;
processed_chunks?: number;
total_batches?: number;
current_batch?: number;
batch_size?: number;
percent?: number;
};
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function asNumber(value: unknown): number | null {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string") {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : null;
}
return null;
}
function asText(value: unknown): string | null {
return typeof value === "string" && value.trim() ? value.trim() : null;
}
export function getWorkerProgress(
job: Pick<WorkerJobRecord, "result">,
): WorkerProgressShape | null {
const result = asRecord(job.result);
if (!result) {
return null;
}
const nested = asRecord(result.progress);
const source = nested ?? result;
const percent = asNumber(source.percent);
const totalChunks = asNumber(source.total_chunks);
const processedChunks = asNumber(source.processed_chunks);
const totalBatches = asNumber(source.total_batches);
const currentBatch = asNumber(source.current_batch);
const batchSize = asNumber(source.batch_size);
const phase = asText(source.phase) ?? asText(result.phase) ?? undefined;
const message = asText(source.message) ?? asText(result.message) ?? undefined;
if (
percent === null &&
totalChunks === null &&
processedChunks === null &&
totalBatches === null &&
currentBatch === null &&
batchSize === null &&
!phase &&
!message
) {
return null;
}
return {
phase,
message,
total_chunks: totalChunks ?? undefined,
processed_chunks: processedChunks ?? undefined,
total_batches: totalBatches ?? undefined,
current_batch: currentBatch ?? undefined,
batch_size: batchSize ?? undefined,
percent: percent ?? undefined,
};
}
export function formatWorkerProgress(
job: Pick<WorkerJobRecord, "result">,
): string | null {
const progress = getWorkerProgress(job);
if (!progress) {
return null;
}
const percentText =
typeof progress.percent === "number"
? `${Math.max(0, Math.min(100, Math.round(progress.percent)))}%`
: null;
const chunkText =
typeof progress.processed_chunks === "number" &&
typeof progress.total_chunks === "number"
? `${progress.processed_chunks}/${progress.total_chunks} 分块`
: null;
const batchText =
typeof progress.current_batch === "number" &&
typeof progress.total_batches === "number" &&
progress.total_batches > 0
? `${progress.current_batch}/${progress.total_batches}`
: null;
const details = [percentText, chunkText, batchText]
.filter(Boolean)
.join(" · ");
if (progress.message && details) {
return `${progress.message} ${details}`;
}
return progress.message ?? (details || null);
}

View File

@@ -19,7 +19,7 @@ use uuid::Uuid;
use crate::{
controllers::site_settings as site_settings_controller,
models::_entities::{ai_chunks, site_settings},
services::{content, storage},
services::{content, storage, worker_jobs},
};
const DEFAULT_AI_PROVIDER: &str = "openai";
@@ -36,7 +36,7 @@ const DEFAULT_TOP_K: usize = 4;
const DEFAULT_CHUNK_SIZE: usize = 1200;
const DEFAULT_SYSTEM_PROMPT: &str = "你是这个博客的站内 AI 助手。请严格基于提供的博客上下文回答,优先给出准确结论,再补充细节;如果上下文不足,请明确说明。";
const EMBEDDING_BATCH_SIZE: usize = 32;
const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4;
pub(crate) const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4;
const EMBEDDING_DIMENSION: usize = 384;
const LOCAL_EMBEDDING_MODEL_LABEL: &str = "fastembed / local all-MiniLM-L6-v2";
const LOCAL_EMBEDDING_CACHE_DIR: &str = "storage/ai_embedding_models/all-minilm-l6-v2";
@@ -203,6 +203,18 @@ pub struct AiIndexSummary {
pub last_indexed_at: Option<DateTime<Utc>>,
}
#[derive(Clone, Debug, Serialize)]
struct AiReindexProgress {
phase: String,
message: String,
total_chunks: usize,
processed_chunks: usize,
total_batches: usize,
current_batch: usize,
batch_size: usize,
percent: usize,
}
fn trim_to_option(value: Option<String>) -> Option<String> {
value.and_then(|item| {
let trimmed = item.trim().to_string();
@@ -2470,6 +2482,61 @@ fn retrieval_only_answer(matches: &[ScoredChunk]) -> String {
)
}
fn build_reindex_progress(
phase: &str,
message: String,
total_chunks: usize,
processed_chunks: usize,
batch_size: usize,
) -> AiReindexProgress {
let normalized_batch_size = batch_size.max(1);
let total_batches = total_chunks.div_ceil(normalized_batch_size);
let current_batch = if processed_chunks == 0 {
0
} else {
processed_chunks
.div_ceil(normalized_batch_size)
.min(total_batches)
};
let percent = if total_chunks == 0 {
100
} else {
((processed_chunks * 100) / total_chunks).min(100)
};
AiReindexProgress {
phase: phase.to_string(),
message,
total_chunks,
processed_chunks,
total_batches,
current_batch,
batch_size: normalized_batch_size,
percent,
}
}
async fn update_reindex_job_progress(
ctx: &AppContext,
job_id: Option<i32>,
progress: &AiReindexProgress,
) -> Result<()> {
let Some(job_id) = job_id else {
return Ok(());
};
worker_jobs::update_job_result(
ctx,
job_id,
serde_json::json!({
"phase": progress.phase,
"message": progress.message,
"progress": progress,
}),
)
.await
}
async fn load_runtime_settings(
ctx: &AppContext,
require_enabled: bool,
@@ -2643,11 +2710,25 @@ async fn retrieve_matches(
Ok((matches, indexed_chunks, last_indexed_at))
}
pub async fn rebuild_index(ctx: &AppContext) -> Result<AiIndexSummary> {
pub async fn rebuild_index(ctx: &AppContext, job_id: Option<i32>) -> Result<AiIndexSummary> {
let settings = load_runtime_settings(ctx, false).await?;
let posts = content::load_markdown_posts_from_store(ctx).await?;
let mut chunk_drafts = build_chunks(&posts, settings.chunk_size);
chunk_drafts.extend(build_profile_chunks(&settings.raw, settings.chunk_size));
let total_chunks = chunk_drafts.len();
let batch_size = REINDEX_EMBEDDING_BATCH_SIZE.max(1);
let preparing_progress = build_reindex_progress(
"preparing",
if total_chunks == 0 {
"没有可写入的内容,正在清理旧索引。".to_string()
} else {
format!("已收集 {total_chunks} 个分块,准备重建向量索引。")
},
total_chunks,
0,
batch_size,
);
update_reindex_job_progress(ctx, job_id, &preparing_progress).await?;
let txn = ctx.db.begin().await?;
txn.execute(Statement::from_string(
@@ -2656,14 +2737,16 @@ pub async fn rebuild_index(ctx: &AppContext) -> Result<AiIndexSummary> {
))
.await?;
for chunk_batch in chunk_drafts.chunks(REINDEX_EMBEDDING_BATCH_SIZE.max(1)) {
let mut processed_chunks = 0usize;
for chunk_batch in chunk_drafts.chunks(batch_size) {
let embeddings = embed_texts_locally_with_batch_size(
chunk_batch
.iter()
.map(|chunk| chunk.content.clone())
.collect::<Vec<_>>(),
EmbeddingKind::Passage,
REINDEX_EMBEDDING_BATCH_SIZE,
batch_size,
)
.await?;
@@ -2700,6 +2783,20 @@ pub async fn rebuild_index(ctx: &AppContext) -> Result<AiIndexSummary> {
);
txn.execute(statement).await?;
}
processed_chunks += chunk_batch.len();
let embedding_progress = build_reindex_progress(
"embedding",
format!(
"正在写入第 {}/{} 批向量。",
processed_chunks.div_ceil(batch_size),
total_chunks.div_ceil(batch_size)
),
total_chunks,
processed_chunks,
batch_size,
);
update_reindex_job_progress(ctx, job_id, &embedding_progress).await?;
}
let last_indexed_at = update_indexed_at(&txn, &settings.raw).await?;

View File

@@ -189,8 +189,7 @@ fn label_for(worker_name: &str) -> String {
fn description_for(worker_name: &str) -> String {
match worker_name {
WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。"
.to_string(),
WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。".to_string(),
WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。".to_string(),
WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。".to_string(),
TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。".to_string(),
@@ -356,18 +355,8 @@ async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) ->
}
async fn enqueue_ai_reindex_worker(ctx: &AppContext, args: AiReindexWorkerArgs) -> Result<()> {
match AiReindexWorker::perform_later(ctx, args.clone()).await {
Ok(_) => Ok(()),
Err(Error::QueueProviderMissing) => {
tokio::spawn(dispatch_ai_reindex(ctx.clone(), args));
Ok(())
}
Err(error) => {
tracing::warn!("ai reindex worker queue unavailable, falling back to local task: {error}");
tokio::spawn(dispatch_ai_reindex(ctx.clone(), args));
Ok(())
}
}
}
async fn enqueue_notification_worker(
@@ -617,6 +606,14 @@ pub async fn mark_job_succeeded(ctx: &AppContext, id: i32, result: Option<Value>
Ok(())
}
pub async fn update_job_result(ctx: &AppContext, id: i32, result: Value) -> Result<()> {
let item = find_job(ctx, id).await?;
let mut active = item.into_active_model();
active.result = Set(Some(result));
active.update(&ctx.db).await?;
Ok(())
}
pub async fn mark_job_failed(ctx: &AppContext, id: i32, error_text: String) -> Result<()> {
let item = find_job(ctx, id).await?;
let mut active = item.into_active_model();

View File

@@ -29,12 +29,24 @@ impl BackgroundWorker<AiReindexWorkerArgs> for AiReindexWorker {
return Ok(());
}
match ai::rebuild_index(&self.ctx).await {
match ai::rebuild_index(&self.ctx, Some(job_id)).await {
Ok(summary) => {
worker_jobs::mark_job_succeeded(
&self.ctx,
job_id,
Some(serde_json::json!({
"phase": "completed",
"message": "AI 索引重建完成。",
"progress": {
"phase": "completed",
"message": "AI 索引重建完成。",
"total_chunks": summary.indexed_chunks,
"processed_chunks": summary.indexed_chunks,
"total_batches": summary.indexed_chunks.div_ceil(ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1)),
"current_batch": summary.indexed_chunks.div_ceil(ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1)),
"batch_size": ai::REINDEX_EMBEDDING_BATCH_SIZE.max(1),
"percent": 100,
},
"indexed_chunks": summary.indexed_chunks,
"last_indexed_at": summary.last_indexed_at.map(|value| value.to_rfc3339()),
})),
@@ -48,7 +60,7 @@ impl BackgroundWorker<AiReindexWorkerArgs> for AiReindexWorker {
}
}
} else {
ai::rebuild_index(&self.ctx).await?;
ai::rebuild_index(&self.ctx, None).await?;
Ok(())
}
}

View File

@@ -74,6 +74,11 @@ backend-worker
如果只启动 `backend` 而没有 `backend-worker`,通知会入队但没人消费。
补充说明:
- `backend-worker` 目前主要消费 Redis 队列里的通知相关任务。
- AI 索引重建会直接在 `backend` 进程本地启动,这样创建任务后会立即进入执行,不再依赖独立 worker 消费。
## 2.1 推荐的后台认证链路
当前最推荐:

View File

@@ -148,6 +148,7 @@ A: 当前站点对外内容页优先 SEO 与首屏可见性,保留 SSR 更稳
A: 推荐前置 Caddy/Nginx 统一暴露 `80/443``frontend:4321` / `backend:5150` / `admin:80` 仅走内网。
当前 `compose.package.yml` 属于直连端口版,便于快速部署与联调。
另外因为通知已经走异步队列,生产务必同时启动 `backend-worker`
AI 索引重建当前直接在 `backend` 进程本地启动,不依赖 `backend-worker` 消费 Redis 队列。
### Q5: 为什么 compose 里没看到 `ADMIN_VITE_FRONTEND_BASE_URL`
A: