From cf00dc5e8e9e1f75c52fc595b267a2f79e14e1f8 Mon Sep 17 00:00:00 2001
From: limitcool
Date: Fri, 3 Apr 2026 15:48:33 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20AI=20=E7=B4=A2?=
=?UTF-8?q?=E5=BC=95=E9=87=8D=E5=BB=BA=E5=8A=9F=E8=83=BD=EF=BC=8C=E4=BC=98?=
=?UTF-8?q?=E5=8C=96=E7=9B=B8=E5=85=B3=20API=20=E5=92=8C=E5=B7=A5=E4=BD=9C?=
=?UTF-8?q?=E6=B5=81=EF=BC=8C=E5=A2=9E=E5=BC=BA=E5=86=85=E5=AD=98=E7=AE=A1?=
=?UTF-8?q?=E7=90=86=E9=85=8D=E7=BD=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
admin/src/lib/api.ts | 3 +-
admin/src/lib/types.ts | 5 -
admin/src/pages/site-settings-page.tsx | 99 +++++++++++++++++++-
backend/src/app.rs | 6 +-
backend/src/controllers/admin_api.rs | 33 +++++--
backend/src/controllers/ai.rs | 21 +++--
backend/src/services/ai.rs | 122 +++++++++++++------------
backend/src/services/worker_jobs.rs | 81 ++++++++++++++++
backend/src/workers/ai_reindex.rs | 55 +++++++++++
backend/src/workers/mod.rs | 1 +
deploy/docker/.env.example | 11 +++
deploy/docker/README.md | 13 +++
deploy/docker/compose.package.yml | 19 ++++
deploy/docker/config.yaml.example | 8 ++
mcp-server/server.js | 2 +-
15 files changed, 391 insertions(+), 88 deletions(-)
create mode 100644 backend/src/workers/ai_reindex.rs
diff --git a/admin/src/lib/api.ts b/admin/src/lib/api.ts
index 5e7c8d1..945dcd6 100644
--- a/admin/src/lib/api.ts
+++ b/admin/src/lib/api.ts
@@ -1,7 +1,6 @@
import type {
AdminAnalyticsResponse,
AdminAiImageProviderTestResponse,
- AdminAiReindexResponse,
AdminAiProviderTestResponse,
AdminImageUploadResponse,
AdminMediaBatchDeleteResponse,
@@ -362,7 +361,7 @@ export const adminApi = {
body: JSON.stringify(payload),
}),
reindexAi: () =>
-    request<AdminAiReindexResponse>('/api/admin/ai/reindex', {
+ request('/api/admin/ai/reindex', {
method: 'POST',
}),
testAiProvider: (provider: {
diff --git a/admin/src/lib/types.ts b/admin/src/lib/types.ts
index 8db3c3f..95eb587 100644
--- a/admin/src/lib/types.ts
+++ b/admin/src/lib/types.ts
@@ -545,11 +545,6 @@ export interface TaxonomyPayload {
seoDescription?: string | null
}
-export interface AdminAiReindexResponse {
- indexed_chunks: number
- last_indexed_at: string | null
-}
-
export interface AdminAiProviderTestResponse {
provider: string
endpoint: string
diff --git a/admin/src/pages/site-settings-page.tsx b/admin/src/pages/site-settings-page.tsx
index a100da9..f5c7233 100644
--- a/admin/src/pages/site-settings-page.tsx
+++ b/admin/src/pages/site-settings-page.tsx
@@ -1,6 +1,7 @@
import { Bot, Check, Plus, RefreshCcw, Save, Trash2 } from 'lucide-react'
import type { ReactNode } from 'react'
import { startTransition, useCallback, useEffect, useMemo, useState } from 'react'
+import { Link } from 'react-router-dom'
import { toast } from 'sonner'
import { MediaUrlControls } from '@/components/media-url-controls'
@@ -19,6 +20,7 @@ import type {
HumanVerificationMode,
MusicTrack,
SiteSettingsPayload,
+ WorkerJobRecord,
} from '@/lib/types'
function createEmptyMusicTrack(): MusicTrack {
@@ -237,6 +239,8 @@ export function SiteSettingsPage() {
const [loading, setLoading] = useState(true)
const [saving, setSaving] = useState(false)
const [reindexing, setReindexing] = useState(false)
+  const [reindexJobId, setReindexJobId] = useState<number | null>(null)
+  const [reindexJobStatus, setReindexJobStatus] = useState<string | null>(null)
const [testingProvider, setTestingProvider] = useState(false)
const [testingImageProvider, setTestingImageProvider] = useState(false)
const [testingR2Storage, setTestingR2Storage] = useState(false)
@@ -295,6 +299,74 @@ export function SiteSettingsPage() {
})
}, [form?.ai_active_provider_id, form?.ai_providers])
+ useEffect(() => {
+ if (!reindexJobId) {
+ return
+ }
+
+ let cancelled = false
+    let timer: ReturnType<typeof setTimeout> | null = null
+
+ const scheduleNextPoll = () => {
+ timer = setTimeout(() => {
+ void pollJob()
+ }, 4000)
+ }
+
+ const pollJob = async () => {
+ try {
+ const job = await adminApi.getWorkerJob(reindexJobId)
+ if (cancelled) {
+ return
+ }
+
+ setReindexJobStatus(job.status)
+
+ if (job.status === 'succeeded') {
+ setReindexing(false)
+ setReindexJobId(null)
+ setReindexJobStatus(null)
+ const indexedChunks = Number(job.result?.indexed_chunks ?? 0)
+ toast.success(
+ indexedChunks > 0
+ ? `AI 索引重建完成,共生成 ${indexedChunks} 个分块。`
+ : 'AI 索引重建完成。',
+ )
+ await loadSettings(false)
+ return
+ }
+
+ if (job.status === 'failed' || job.status === 'cancelled') {
+ setReindexing(false)
+ setReindexJobId(null)
+ setReindexJobStatus(null)
+ toast.error(job.error_text?.trim() || 'AI 重建索引失败。')
+ return
+ }
+
+ scheduleNextPoll()
+ } catch (error) {
+ if (cancelled) {
+ return
+ }
+
+ scheduleNextPoll()
+ if (error instanceof ApiError && error.status === 401) {
+ return
+ }
+ }
+ }
+
+ void pollJob()
+
+ return () => {
+ cancelled = true
+ if (timer) {
+ clearTimeout(timer)
+ }
+ }
+ }, [loadSettings, reindexJobId])
+
  const updateField = <K extends keyof AdminSiteSettingsResponse>(
key: K,
value: AdminSiteSettingsResponse[K],
@@ -498,25 +570,38 @@ export function SiteSettingsPage() {
刷新
+ {reindexJobId ? (
+
+ ) : null}
+ {reindexJobId ? (
+
+ 当前后台任务 #{reindexJobId}
+ {reindexJobStatus ? `,状态:${reindexJobStatus}` : ''}
+
+ ) : null}
diff --git a/backend/src/app.rs b/backend/src/app.rs
index 6e10757..e7ab805 100644
--- a/backend/src/app.rs
+++ b/backend/src/app.rs
@@ -28,7 +28,10 @@ use crate::{
ai_chunks, categories, comments, friend_links, posts, reviews, site_settings, tags, users,
},
tasks,
- workers::{downloader::DownloadWorker, notification_delivery::NotificationDeliveryWorker},
+ workers::{
+ ai_reindex::AiReindexWorker, downloader::DownloadWorker,
+ notification_delivery::NotificationDeliveryWorker,
+ },
};
pub struct App;
@@ -153,6 +156,7 @@ impl Hooks for App {
Ok(router.layer(cors))
}
async fn connect_workers(ctx: &AppContext, queue: &Queue) -> Result<()> {
+ queue.register(AiReindexWorker::build(ctx)).await?;
queue.register(DownloadWorker::build(ctx)).await?;
queue
.register(NotificationDeliveryWorker::build(ctx))
diff --git a/backend/src/controllers/admin_api.rs b/backend/src/controllers/admin_api.rs
index eb92522..560c4db 100644
--- a/backend/src/controllers/admin_api.rs
+++ b/backend/src/controllers/admin_api.rs
@@ -230,8 +230,8 @@ pub struct AdminSiteSettingsResponse {
#[derive(Clone, Debug, Serialize)]
pub struct AdminAiReindexResponse {
- pub indexed_chunks: usize,
-    pub last_indexed_at: Option<String>,
+ pub queued: bool,
+ pub job: worker_jobs::WorkerJobRecord,
}
#[derive(Clone, Debug, Deserialize)]
@@ -1395,15 +1395,30 @@ pub async fn update_site_settings(
#[debug_handler]
 pub async fn reindex_ai(headers: HeaderMap, State(ctx): State<AppContext>) -> Result<Response> {
- check_auth(&headers)?;
- let summary = ai::rebuild_index(&ctx).await?;
+ let actor = check_auth(&headers)?;
+ let job = worker_jobs::queue_ai_reindex_job(
+ &ctx,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ None,
+ Some("manual".to_string()),
+ )
+ .await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "worker.ai_reindex",
+ "worker_job",
+ Some(job.id.to_string()),
+ Some(job.worker_name.clone()),
+ None,
+ )
+ .await?;
format::json(AdminAiReindexResponse {
- indexed_chunks: summary.indexed_chunks,
- last_indexed_at: format_timestamp(
- summary.last_indexed_at.map(Into::into),
- "%Y-%m-%d %H:%M:%S UTC",
- ),
+ queued: true,
+ job,
})
}
diff --git a/backend/src/controllers/ai.rs b/backend/src/controllers/ai.rs
index 9c1b40d..9dbe278 100644
--- a/backend/src/controllers/ai.rs
+++ b/backend/src/controllers/ai.rs
@@ -16,7 +16,7 @@ use std::time::Instant;
use crate::{
controllers::{admin::check_auth, site_settings},
- services::{abuse_guard, ai, analytics},
+ services::{abuse_guard, ai, analytics, worker_jobs},
};
#[derive(Clone, Debug, Deserialize)]
@@ -35,8 +35,8 @@ pub struct AskResponse {
#[derive(Clone, Debug, Serialize)]
pub struct ReindexResponse {
- pub indexed_chunks: usize,
-    pub last_indexed_at: Option<String>,
+ pub queued: bool,
+ pub job: worker_jobs::WorkerJobRecord,
}
#[derive(Clone, Debug, Serialize)]
@@ -514,12 +514,19 @@ pub async fn ask_stream(
#[debug_handler]
 pub async fn reindex(headers: HeaderMap, State(ctx): State<AppContext>) -> Result<Response> {
- check_auth(&headers)?;
- let summary = ai::rebuild_index(&ctx).await?;
+ let actor = check_auth(&headers)?;
+ let job = worker_jobs::queue_ai_reindex_job(
+ &ctx,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ None,
+ Some("manual".to_string()),
+ )
+ .await?;
format::json(ReindexResponse {
- indexed_chunks: summary.indexed_chunks,
- last_indexed_at: format_timestamp(summary.last_indexed_at),
+ queued: true,
+ job,
})
}
diff --git a/backend/src/services/ai.rs b/backend/src/services/ai.rs
index a5a9cda..d6b2a5e 100644
--- a/backend/src/services/ai.rs
+++ b/backend/src/services/ai.rs
@@ -7,7 +7,7 @@ use loco_rs::prelude::*;
use reqwest::{Client, Url, header::CONTENT_TYPE, multipart};
use sea_orm::{
ActiveModelTrait, ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, IntoActiveModel,
- PaginatorTrait, QueryOrder, Set, Statement,
+ PaginatorTrait, QueryOrder, Set, Statement, TransactionTrait,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
@@ -36,6 +36,7 @@ const DEFAULT_TOP_K: usize = 4;
const DEFAULT_CHUNK_SIZE: usize = 1200;
const DEFAULT_SYSTEM_PROMPT: &str = "你是这个博客的站内 AI 助手。请严格基于提供的博客上下文回答,优先给出准确结论,再补充细节;如果上下文不足,请明确说明。";
const EMBEDDING_BATCH_SIZE: usize = 32;
+const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4;
const EMBEDDING_DIMENSION: usize = 384;
const LOCAL_EMBEDDING_MODEL_LABEL: &str = "fastembed / local all-MiniLM-L6-v2";
const LOCAL_EMBEDDING_CACHE_DIR: &str = "storage/ai_embedding_models/all-minilm-l6-v2";
@@ -771,6 +772,14 @@ pub fn default_image_model_for_provider(provider: &str) -> &'static str {
}
 async fn embed_texts_locally(inputs: Vec<String>, kind: EmbeddingKind) -> Result<Vec<Vec<f32>>> {
+    embed_texts_locally_with_batch_size(inputs, kind, EMBEDDING_BATCH_SIZE).await
+}
+
+async fn embed_texts_locally_with_batch_size(
+    inputs: Vec<String>,
+    kind: EmbeddingKind,
+    batch_size: usize,
+) -> Result<Vec<Vec<f32>>> {
tokio::task::spawn_blocking(move || {
let model = local_embedding_engine()?;
let prepared = inputs
@@ -783,7 +792,7 @@ async fn embed_texts_locally(inputs: Vec<String>, kind: EmbeddingKind) -> Result
})?;
let embeddings = guard
- .embed(prepared, Some(EMBEDDING_BATCH_SIZE))
+ .embed(prepared, Some(batch_size.max(1)))
.map_err(|error| Error::BadRequest(format!("本地 embedding 生成失败: {error}")))?;
Ok(embeddings
@@ -2555,14 +2564,14 @@ async fn load_runtime_settings(
})
}
-async fn update_indexed_at(
-    ctx: &AppContext,
+async fn update_indexed_at<C: ConnectionTrait>(
+    db: &C,
     settings: &site_settings::Model,
 ) -> Result<DateTime<Utc>> {
let now = Utc::now();
let mut model = settings.clone().into_active_model();
model.ai_last_indexed_at = Set(Some(now.into()));
- let _ = model.update(&ctx.db).await?;
+ let _ = model.update(db).await?;
Ok(now)
}
@@ -2571,14 +2580,8 @@ async fn retrieve_matches(
settings: &AiRuntimeSettings,
question: &str,
) -> Result<(Vec, usize, Option>)> {
- let mut indexed_chunks = ai_chunks::Entity::find().count(&ctx.db).await? as usize;
- let mut last_indexed_at = settings.raw.ai_last_indexed_at.map(Into::into);
-
- if indexed_chunks == 0 {
- let summary = rebuild_index(ctx).await?;
- indexed_chunks = summary.indexed_chunks;
- last_indexed_at = summary.last_indexed_at;
- }
+ let indexed_chunks = ai_chunks::Entity::find().count(&ctx.db).await? as usize;
+ let last_indexed_at = settings.raw.ai_last_indexed_at.map(Into::into);
if indexed_chunks == 0 {
return Ok((Vec::new(), 0, last_indexed_at));
@@ -2645,61 +2648,62 @@ pub async fn rebuild_index(ctx: &AppContext) -> Result<AiIndexSummary> {
let posts = content::load_markdown_posts_from_store(ctx).await?;
let mut chunk_drafts = build_chunks(&posts, settings.chunk_size);
chunk_drafts.extend(build_profile_chunks(&settings.raw, settings.chunk_size));
- let embeddings = if chunk_drafts.is_empty() {
- Vec::new()
- } else {
- embed_texts_locally(
- chunk_drafts
+ let txn = ctx.db.begin().await?;
+
+ txn.execute(Statement::from_string(
+ DbBackend::Postgres,
+ "TRUNCATE TABLE ai_chunks RESTART IDENTITY".to_string(),
+ ))
+ .await?;
+
+ for chunk_batch in chunk_drafts.chunks(REINDEX_EMBEDDING_BATCH_SIZE.max(1)) {
+ let embeddings = embed_texts_locally_with_batch_size(
+ chunk_batch
.iter()
.map(|chunk| chunk.content.clone())
                .collect::<Vec<String>>(),
EmbeddingKind::Passage,
+ REINDEX_EMBEDDING_BATCH_SIZE,
)
- .await?
- };
-
- ctx.db
- .execute(Statement::from_string(
- DbBackend::Postgres,
- "TRUNCATE TABLE ai_chunks RESTART IDENTITY".to_string(),
- ))
.await?;
- for (draft, embedding) in chunk_drafts.iter().zip(embeddings.into_iter()) {
- let embedding_literal = vector_literal(&embedding)?;
- let statement = Statement::from_sql_and_values(
- DbBackend::Postgres,
- r#"
- INSERT INTO ai_chunks (
- source_slug,
- source_title,
- source_path,
- source_type,
- chunk_index,
- content,
- content_preview,
- embedding,
- word_count
- ) VALUES (
- $1, $2, $3, $4, $5, $6, $7, $8::vector, $9
- )
- "#,
- vec![
- draft.source_slug.clone().into(),
- draft.source_title.clone().into(),
- draft.source_path.clone().into(),
- draft.source_type.clone().into(),
- draft.chunk_index.into(),
- draft.content.clone().into(),
- draft.content_preview.clone().into(),
- embedding_literal.into(),
- draft.word_count.into(),
- ],
- );
- ctx.db.execute(statement).await?;
+ for (draft, embedding) in chunk_batch.iter().zip(embeddings.into_iter()) {
+ let embedding_literal = vector_literal(&embedding)?;
+ let statement = Statement::from_sql_and_values(
+ DbBackend::Postgres,
+ r#"
+ INSERT INTO ai_chunks (
+ source_slug,
+ source_title,
+ source_path,
+ source_type,
+ chunk_index,
+ content,
+ content_preview,
+ embedding,
+ word_count
+ ) VALUES (
+ $1, $2, $3, $4, $5, $6, $7, $8::vector, $9
+ )
+ "#,
+ vec![
+ draft.source_slug.clone().into(),
+ draft.source_title.clone().into(),
+ draft.source_path.clone().into(),
+ draft.source_type.clone().into(),
+ draft.chunk_index.into(),
+ draft.content.clone().into(),
+ draft.content_preview.clone().into(),
+ embedding_literal.into(),
+ draft.word_count.into(),
+ ],
+ );
+ txn.execute(statement).await?;
+ }
}
- let last_indexed_at = update_indexed_at(ctx, &settings.raw).await?;
+ let last_indexed_at = update_indexed_at(&txn, &settings.raw).await?;
+ txn.commit().await?;
Ok(AiIndexSummary {
indexed_chunks: chunk_drafts.len(),
diff --git a/backend/src/services/worker_jobs.rs b/backend/src/services/worker_jobs.rs
index bec477f..2651d84 100644
--- a/backend/src/services/worker_jobs.rs
+++ b/backend/src/services/worker_jobs.rs
@@ -11,6 +11,7 @@ use crate::{
models::_entities::{notification_deliveries, worker_jobs},
services::subscriptions,
workers::{
+ ai_reindex::{AiReindexWorker, AiReindexWorkerArgs},
downloader::{DownloadWorker, DownloadWorkerArgs},
notification_delivery::{NotificationDeliveryWorker, NotificationDeliveryWorkerArgs},
},
@@ -27,6 +28,7 @@ pub const JOB_STATUS_CANCELLED: &str = "cancelled";
pub const WORKER_DOWNLOAD_MEDIA: &str = "worker.download_media";
pub const WORKER_NOTIFICATION_DELIVERY: &str = "worker.notification_delivery";
+pub const WORKER_AI_REINDEX: &str = "worker.ai_reindex";
pub const TASK_RETRY_DELIVERIES: &str = "task.retry_deliveries";
pub const TASK_SEND_WEEKLY_DIGEST: &str = "task.send_weekly_digest";
pub const TASK_SEND_MONTHLY_DIGEST: &str = "task.send_monthly_digest";
@@ -164,6 +166,7 @@ fn trim_to_option(value: Option<String>) -> Option<String> {
 fn queue_name_for(worker_name: &str) -> Option<String> {
match worker_name {
+ WORKER_AI_REINDEX => Some("ai".to_string()),
WORKER_DOWNLOAD_MEDIA => Some("media".to_string()),
WORKER_NOTIFICATION_DELIVERY => Some("notifications".to_string()),
TASK_RETRY_DELIVERIES => Some("maintenance".to_string()),
@@ -174,6 +177,7 @@ fn queue_name_for(worker_name: &str) -> Option<String> {
fn label_for(worker_name: &str) -> String {
match worker_name {
+ WORKER_AI_REINDEX => "AI 索引重建".to_string(),
WORKER_DOWNLOAD_MEDIA => "远程媒体下载".to_string(),
WORKER_NOTIFICATION_DELIVERY => "通知投递".to_string(),
TASK_RETRY_DELIVERIES => "重试待投递通知".to_string(),
@@ -185,6 +189,8 @@ fn label_for(worker_name: &str) -> String {
fn description_for(worker_name: &str) -> String {
match worker_name {
+ WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。"
+ .to_string(),
WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。".to_string(),
WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。".to_string(),
TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。".to_string(),
@@ -196,6 +202,7 @@ fn description_for(worker_name: &str) -> String {
fn tags_for(worker_name: &str) -> Value {
match worker_name {
+ WORKER_AI_REINDEX => json!(["ai", "reindex"]),
WORKER_DOWNLOAD_MEDIA => json!(["media", "download"]),
WORKER_NOTIFICATION_DELIVERY => json!(["notifications", "delivery"]),
TASK_RETRY_DELIVERIES => json!(["maintenance", "retry"]),
@@ -249,6 +256,7 @@ fn to_job_record(item: worker_jobs::Model) -> WorkerJobRecord {
fn catalog_entries() -> Vec {
[
+ (WORKER_AI_REINDEX, JOB_KIND_WORKER, true, true),
(WORKER_DOWNLOAD_MEDIA, JOB_KIND_WORKER, true, true),
(WORKER_NOTIFICATION_DELIVERY, JOB_KIND_WORKER, true, true),
(TASK_RETRY_DELIVERIES, JOB_KIND_TASK, true, true),
@@ -313,6 +321,13 @@ async fn dispatch_download(args_ctx: AppContext, args: DownloadWorkerArgs) {
}
}
+async fn dispatch_ai_reindex(args_ctx: AppContext, args: AiReindexWorkerArgs) {
+ let worker = AiReindexWorker::build(&args_ctx);
+ if let Err(error) = worker.perform(args).await {
+ tracing::warn!("ai reindex worker execution failed: {error}");
+ }
+}
+
async fn dispatch_notification_delivery(
args_ctx: AppContext,
args: NotificationDeliveryWorkerArgs,
@@ -340,6 +355,21 @@ async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) ->
}
}
+async fn enqueue_ai_reindex_worker(ctx: &AppContext, args: AiReindexWorkerArgs) -> Result<()> {
+ match AiReindexWorker::perform_later(ctx, args.clone()).await {
+ Ok(_) => Ok(()),
+ Err(Error::QueueProviderMissing) => {
+ tokio::spawn(dispatch_ai_reindex(ctx.clone(), args));
+ Ok(())
+ }
+ Err(error) => {
+ tracing::warn!("ai reindex worker queue unavailable, falling back to local task: {error}");
+ tokio::spawn(dispatch_ai_reindex(ctx.clone(), args));
+ Ok(())
+ }
+ }
+}
+
async fn enqueue_notification_worker(
ctx: &AppContext,
args: NotificationDeliveryWorkerArgs,
@@ -717,6 +747,46 @@ pub async fn queue_notification_delivery_job(
get_job_record(ctx, job.id).await
}
+pub async fn queue_ai_reindex_job(
+    ctx: &AppContext,
+    requested_by: Option<String>,
+    requested_source: Option<String>,
+    parent_job_id: Option<i32>,
+    trigger_mode: Option<String>,
+) -> Result<WorkerJobRecord> {
+ let base_args = AiReindexWorkerArgs { job_id: None };
+ let payload = serde_json::to_value(&base_args)?;
+
+ let job = create_job(
+ ctx,
+ CreateWorkerJobInput {
+ parent_job_id,
+ job_kind: JOB_KIND_WORKER.to_string(),
+ worker_name: WORKER_AI_REINDEX.to_string(),
+ display_name: Some("重建 AI 索引".to_string()),
+ queue_name: queue_name_for(WORKER_AI_REINDEX),
+ requested_by,
+ requested_source,
+ trigger_mode,
+ payload: Some(payload),
+ tags: Some(tags_for(WORKER_AI_REINDEX)),
+ related_entity_type: Some("ai_index".to_string()),
+ related_entity_id: Some("site".to_string()),
+ max_attempts: 1,
+ },
+ )
+ .await?;
+
+ enqueue_ai_reindex_worker(
+ ctx,
+ AiReindexWorkerArgs {
+ job_id: Some(job.id),
+ },
+ )
+ .await?;
+ get_job_record(ctx, job.id).await
+}
+
pub async fn spawn_retry_deliveries_task(
ctx: &AppContext,
limit: Option,
@@ -810,6 +880,17 @@ pub async fn retry_job(
let payload = item.payload.clone().unwrap_or(Value::Null);
match item.worker_name.as_str() {
+ WORKER_AI_REINDEX => {
+            let _ = serde_json::from_value::<AiReindexWorkerArgs>(payload)?;
+ queue_ai_reindex_job(
+ ctx,
+ requested_by,
+ requested_source,
+ Some(item.id),
+ Some("retry".to_string()),
+ )
+ .await
+ }
WORKER_DOWNLOAD_MEDIA => {
             let args = serde_json::from_value::<DownloadWorkerArgs>(payload)?;
queue_download_job(
diff --git a/backend/src/workers/ai_reindex.rs b/backend/src/workers/ai_reindex.rs
new file mode 100644
index 0000000..30a2aff
--- /dev/null
+++ b/backend/src/workers/ai_reindex.rs
@@ -0,0 +1,55 @@
+use loco_rs::prelude::*;
+use serde::{Deserialize, Serialize};
+
+use crate::services::{ai, worker_jobs};
+
+pub struct AiReindexWorker {
+ pub ctx: AppContext,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+pub struct AiReindexWorkerArgs {
+ #[serde(default)]
+ pub job_id: Option,
+}
+
+#[async_trait]
+impl BackgroundWorker<AiReindexWorkerArgs> for AiReindexWorker {
+ fn build(ctx: &AppContext) -> Self {
+ Self { ctx: ctx.clone() }
+ }
+
+    fn tags() -> Vec<String> {
+ vec!["ai".to_string(), "reindex".to_string()]
+ }
+
+ async fn perform(&self, args: AiReindexWorkerArgs) -> Result<()> {
+ if let Some(job_id) = args.job_id {
+ if !worker_jobs::begin_job_execution(&self.ctx, job_id).await? {
+ return Ok(());
+ }
+
+ match ai::rebuild_index(&self.ctx).await {
+ Ok(summary) => {
+ worker_jobs::mark_job_succeeded(
+ &self.ctx,
+ job_id,
+ Some(serde_json::json!({
+ "indexed_chunks": summary.indexed_chunks,
+ "last_indexed_at": summary.last_indexed_at.map(|value| value.to_rfc3339()),
+ })),
+ )
+ .await?;
+ Ok(())
+ }
+ Err(error) => {
+ worker_jobs::mark_job_failed(&self.ctx, job_id, error.to_string()).await?;
+ Err(error)
+ }
+ }
+ } else {
+ ai::rebuild_index(&self.ctx).await?;
+ Ok(())
+ }
+ }
+}
diff --git a/backend/src/workers/mod.rs b/backend/src/workers/mod.rs
index 9df2b47..b5fc4c4 100644
--- a/backend/src/workers/mod.rs
+++ b/backend/src/workers/mod.rs
@@ -1,2 +1,3 @@
+pub mod ai_reindex;
pub mod downloader;
pub mod notification_delivery;
diff --git a/deploy/docker/.env.example b/deploy/docker/.env.example
index 382da69..97f34d0 100644
--- a/deploy/docker/.env.example
+++ b/deploy/docker/.env.example
@@ -3,6 +3,17 @@ BACKEND_PORT=5150
FRONTEND_PORT=4321
ADMIN_PORT=4322
+# 建议在小内存主机上给每个服务设置明确上限,避免 backend 在 AI 重建索引时
+# 把整台主机拖进 swap 抖动。默认值与 compose.package.yml 保持一致。
+BACKEND_MEMORY_LIMIT=768m
+BACKEND_MEMORY_SWAP_LIMIT=768m
+BACKEND_WORKER_MEMORY_LIMIT=512m
+BACKEND_WORKER_MEMORY_SWAP_LIMIT=512m
+FRONTEND_MEMORY_LIMIT=256m
+FRONTEND_MEMORY_SWAP_LIMIT=256m
+ADMIN_MEMORY_LIMIT=128m
+ADMIN_MEMORY_SWAP_LIMIT=128m
+
# frontend SSR 服务端访问 backend 用这个内部地址(compose 默认可直接使用)
INTERNAL_API_BASE_URL=http://backend:5150/api
diff --git a/deploy/docker/README.md b/deploy/docker/README.md
index 0390614..c3e6c63 100644
--- a/deploy/docker/README.md
+++ b/deploy/docker/README.md
@@ -43,6 +43,10 @@ python deploy/scripts/render_compose_env.py \
建议在 `config.yaml -> compose_env` 下同时检查这些运行时变量:
+- `BACKEND_MEMORY_LIMIT / BACKEND_MEMORY_SWAP_LIMIT`:backend 容器内存 / swap 上限;对小内存主机建议显式设置
+- `BACKEND_WORKER_MEMORY_LIMIT / BACKEND_WORKER_MEMORY_SWAP_LIMIT`:worker 容器内存 / swap 上限
+- `FRONTEND_MEMORY_LIMIT / FRONTEND_MEMORY_SWAP_LIMIT`:frontend 容器内存 / swap 上限
+- `ADMIN_MEMORY_LIMIT / ADMIN_MEMORY_SWAP_LIMIT`:admin 容器内存 / swap 上限
- `INTERNAL_API_BASE_URL`:frontend SSR 容器访问 backend 用,compose 默认推荐 `http://backend:5150/api`
- `PUBLIC_API_BASE_URL`:浏览器访问 backend API 用;留空时前台会回退到“当前主机 + `:5150/api`”
- `PUBLIC_COMMENT_TURNSTILE_SITE_KEY`:前台评论 / 订阅表单使用的 Cloudflare Turnstile site key
@@ -62,6 +66,14 @@ python deploy/scripts/render_compose_env.py \
```yaml
compose_env:
+ BACKEND_MEMORY_LIMIT: 768m
+ BACKEND_MEMORY_SWAP_LIMIT: 768m
+ BACKEND_WORKER_MEMORY_LIMIT: 512m
+ BACKEND_WORKER_MEMORY_SWAP_LIMIT: 512m
+ FRONTEND_MEMORY_LIMIT: 256m
+ FRONTEND_MEMORY_SWAP_LIMIT: 256m
+ ADMIN_MEMORY_LIMIT: 128m
+ ADMIN_MEMORY_SWAP_LIMIT: 128m
PUBLIC_API_BASE_URL: https://api.blog.init.cool
PUBLIC_COMMENT_TURNSTILE_SITE_KEY: 1x00000000000000000000AA
PUBLIC_WEB_PUSH_VAPID_PUBLIC_KEY: replace-with-web-push-vapid-public-key
@@ -178,6 +190,7 @@ A:
A:
- `backend` 镜像启动时会先执行 `db migrate`
- `backend` 提供 `/healthz`
+- `backend-worker` 不提供 HTTP `/healthz`;compose 会覆盖镜像默认 healthcheck,改为检查主进程是否仍以 `--worker` 模式运行
- `frontend` 提供 `/healthz`
- `admin` 继续由 Nginx 提供 `/healthz`
- compose 现在使用 `depends_on.condition: service_healthy`
diff --git a/deploy/docker/compose.package.yml b/deploy/docker/compose.package.yml
index 928583c..048559a 100644
--- a/deploy/docker/compose.package.yml
+++ b/deploy/docker/compose.package.yml
@@ -3,6 +3,10 @@ services:
image: ${BACKEND_IMAGE:-git.init.cool/cool/termi-astro-backend:latest}
pull_policy: always
restart: unless-stopped
+ # 对 tohka 这类小内存主机,建议给服务设置明确上限,
+ # 避免 AI 重建索引时把整机拖进 swap 抖动 / OOM。
+ mem_limit: ${BACKEND_MEMORY_LIMIT:-768m}
+ memswap_limit: ${BACKEND_MEMORY_SWAP_LIMIT:-768m}
environment:
PORT: 5150
APP_BASE_URL: ${APP_BASE_URL:-http://localhost:5150}
@@ -30,6 +34,8 @@ services:
image: ${BACKEND_IMAGE:-git.init.cool/cool/termi-astro-backend:latest}
pull_policy: always
restart: unless-stopped
+ mem_limit: ${BACKEND_WORKER_MEMORY_LIMIT:-512m}
+ memswap_limit: ${BACKEND_WORKER_MEMORY_SWAP_LIMIT:-512m}
depends_on:
backend:
condition: service_healthy
@@ -48,11 +54,22 @@ services:
TERMI_WEB_PUSH_VAPID_SUBJECT: ${TERMI_WEB_PUSH_VAPID_SUBJECT:-}
RUST_LOG: ${RUST_LOG:-info}
TERMI_SKIP_MIGRATIONS: 'true'
+ # backend 镜像默认 healthcheck 会探测 HTTP /healthz,
+ # 但 worker 模式不监听 5150,所以这里改成“主进程仍然是 --worker”检查。
+ healthcheck:
+ test:
        ['CMD-SHELL', "test -r /proc/1/cmdline && tr '\\000' ' ' < /proc/1/cmdline | grep -q -- '--worker'"]
diff --git a/mcp-server/server.js b/mcp-server/server.js
--- a/mcp-server/server.js
+++ b/mcp-server/server.js
@@ ... @@ async () => {
const data = await requestBackend('POST', '/ai/reindex');
- return createToolResult('AI index rebuilt', data);
+ return createToolResult('AI index rebuild job queued', data);
}
);