diff --git a/.gitea/workflows/ui-regression.yml b/.gitea/workflows/ui-regression.yml
index e09241e..6fa4d2d 100644
--- a/.gitea/workflows/ui-regression.yml
+++ b/.gitea/workflows/ui-regression.yml
@@ -100,67 +100,27 @@ jobs:
cp -R playwright-smoke/test-results playwright-smoke/.artifacts/admin/test-results
fi
- - name: Upload frontend HTML report
+ - name: Summarize Playwright artifact paths
if: always()
- uses: actions/upload-artifact@v4
- with:
- name: playwright-html-report-frontend
- path: playwright-smoke/.artifacts/frontend/playwright-report
- retention-days: 14
- if-no-files-found: ignore
+ shell: bash
+ run: |
+ set -euo pipefail
- - name: Upload admin HTML report
- if: always()
- uses: actions/upload-artifact@v4
- with:
- name: playwright-html-report-admin
- path: playwright-smoke/.artifacts/admin/playwright-report
- retention-days: 14
- if-no-files-found: ignore
+ echo "Gitea Actions 当前不支持 actions/upload-artifact@v4,改为直接输出产物目录:"
- - name: Upload frontend raw results
- if: always()
- uses: actions/upload-artifact@v4
- with:
- name: playwright-raw-results-frontend
- path: playwright-smoke/.artifacts/frontend/test-results
- retention-days: 14
- if-no-files-found: ignore
-
- - name: Upload admin raw results
- if: always()
- uses: actions/upload-artifact@v4
- with:
- name: playwright-raw-results-admin
- path: playwright-smoke/.artifacts/admin/test-results
- retention-days: 14
- if-no-files-found: ignore
-
- - name: Upload frontend failure screenshots / videos / traces
- if: steps.ui_frontend.outcome != 'success'
- uses: actions/upload-artifact@v4
- with:
- name: playwright-failure-artifacts-frontend
- path: |
- playwright-smoke/.artifacts/frontend/test-results/**/*.png
- playwright-smoke/.artifacts/frontend/test-results/**/*.webm
- playwright-smoke/.artifacts/frontend/test-results/**/*.zip
- playwright-smoke/.artifacts/frontend/test-results/**/error-context.md
- retention-days: 21
- if-no-files-found: ignore
-
- - name: Upload admin failure screenshots / videos / traces
- if: steps.ui_admin.outcome != 'success'
- uses: actions/upload-artifact@v4
- with:
- name: playwright-failure-artifacts-admin
- path: |
- playwright-smoke/.artifacts/admin/test-results/**/*.png
- playwright-smoke/.artifacts/admin/test-results/**/*.webm
- playwright-smoke/.artifacts/admin/test-results/**/*.zip
- playwright-smoke/.artifacts/admin/test-results/**/error-context.md
- retention-days: 21
- if-no-files-found: ignore
+ for path in \
+ "playwright-smoke/.artifacts/frontend/playwright-report" \
+ "playwright-smoke/.artifacts/frontend/test-results" \
+ "playwright-smoke/.artifacts/admin/playwright-report" \
+ "playwright-smoke/.artifacts/admin/test-results"
+ do
+ if [ -d "${path}" ]; then
+ echo "- ${path}"
+ find "${path}" -maxdepth 2 -type f | sort | head -n 20
+ else
+ echo "- ${path} (missing)"
+ fi
+ done
- name: Mark workflow failed when any suite failed
if: steps.ui_frontend.outcome != 'success' || steps.ui_admin.outcome != 'success'
diff --git a/admin/src/App.tsx b/admin/src/App.tsx
index aee3dbf..0b2b026 100644
--- a/admin/src/App.tsx
+++ b/admin/src/App.tsx
@@ -94,6 +94,10 @@ const SubscriptionsPage = lazy(async () => {
const mod = await import('@/pages/subscriptions-page')
return { default: mod.SubscriptionsPage }
})
+const WorkersPage = lazy(async () => {
+ const mod = await import('@/pages/workers-page')
+ return { default: mod.WorkersPage }
+})
type SessionContextValue = {
session: AdminSessionResponse
@@ -389,6 +393,14 @@ function AppRoutes() {
}
/>
+
+
+
+ }
+ />
- request<{ queued: boolean; id: number; delivery_id: number }>(`/api/admin/subscriptions/${id}/test`, {
+ request<{ queued: boolean; id: number; delivery_id: number; job_id?: number | null }>(`/api/admin/subscriptions/${id}/test`, {
method: 'POST',
}),
listSubscriptionDeliveries: async (limit = 80) =>
@@ -248,6 +254,42 @@ export const adminApi = {
method: 'POST',
body: JSON.stringify({ period }),
}),
+ getWorkersOverview: () => request('/api/admin/workers/overview'),
+ listWorkerJobs: (query?: {
+ status?: string
+ jobKind?: string
+ workerName?: string
+ search?: string
+ limit?: number
+ }) =>
+ request(
+ appendQueryParams('/api/admin/workers/jobs', {
+ status: query?.status,
+ job_kind: query?.jobKind,
+ worker_name: query?.workerName,
+ search: query?.search,
+ limit: query?.limit,
+ }),
+ ),
+ getWorkerJob: (id: number) => request(`/api/admin/workers/jobs/${id}`),
+ cancelWorkerJob: (id: number) =>
+ request(`/api/admin/workers/jobs/${id}/cancel`, {
+ method: 'POST',
+ }),
+ retryWorkerJob: (id: number) =>
+ request(`/api/admin/workers/jobs/${id}/retry`, {
+ method: 'POST',
+ }),
+ runRetryDeliveriesWorker: (limit?: number) =>
+ request('/api/admin/workers/tasks/retry-deliveries', {
+ method: 'POST',
+ body: JSON.stringify({ limit }),
+ }),
+ runDigestWorker: (period: 'weekly' | 'monthly') =>
+ request('/api/admin/workers/tasks/digest', {
+ method: 'POST',
+ body: JSON.stringify({ period }),
+ }),
dashboard: () => request('/api/admin/dashboard'),
analytics: () => request('/api/admin/analytics'),
listCategories: () => request('/api/admin/categories'),
@@ -405,6 +447,19 @@ export const adminApi = {
body: formData,
})
},
+ downloadMediaObject: (payload: MediaDownloadPayload) =>
+ request('/api/admin/storage/media/download', {
+ method: 'POST',
+ body: JSON.stringify({
+ source_url: payload.sourceUrl,
+ prefix: payload.prefix,
+ title: payload.title,
+ alt_text: payload.altText,
+ caption: payload.caption,
+ tags: payload.tags,
+ notes: payload.notes,
+ }),
+ }),
updateMediaObjectMetadata: (payload: MediaAssetMetadataPayload) =>
request('/api/admin/storage/media/metadata', {
method: 'PATCH',
diff --git a/admin/src/lib/types.ts b/admin/src/lib/types.ts
index 28f072e..878effe 100644
--- a/admin/src/lib/types.ts
+++ b/admin/src/lib/types.ts
@@ -125,6 +125,79 @@ export interface SubscriptionDigestResponse {
skipped: number
}
+export interface WorkerCatalogEntry {
+ worker_name: string
+ job_kind: string
+ label: string
+ description: string
+ queue_name: string | null
+ supports_cancel: boolean
+ supports_retry: boolean
+}
+
+export interface WorkerStats {
+ worker_name: string
+ job_kind: string
+ label: string
+ queued: number
+ running: number
+ succeeded: number
+ failed: number
+ cancelled: number
+ last_job_at: string | null
+}
+
+export interface WorkerOverview {
+ total_jobs: number
+ queued: number
+ running: number
+ succeeded: number
+ failed: number
+ cancelled: number
+ active_jobs: number
+ worker_stats: WorkerStats[]
+ catalog: WorkerCatalogEntry[]
+}
+
+export interface WorkerJobRecord {
+ created_at: string
+ updated_at: string
+ id: number
+ parent_job_id: number | null
+ job_kind: string
+ worker_name: string
+ display_name: string | null
+ status: string
+ queue_name: string | null
+ requested_by: string | null
+ requested_source: string | null
+ trigger_mode: string | null
+ payload: Record | null
+ result: Record | null
+ error_text: string | null
+ tags: unknown[] | Record | null
+ related_entity_type: string | null
+ related_entity_id: string | null
+ attempts_count: number
+ max_attempts: number
+ cancel_requested: boolean
+ queued_at: string | null
+ started_at: string | null
+ finished_at: string | null
+ can_cancel: boolean
+ can_retry: boolean
+}
+
+export interface WorkerJobListResponse {
+ total: number
+ jobs: WorkerJobRecord[]
+}
+
+export interface WorkerTaskActionResponse {
+ queued: boolean
+ job: WorkerJobRecord
+}
+
export interface DashboardStats {
total_posts: number
total_comments: number
@@ -533,6 +606,22 @@ export interface AdminMediaReplaceResponse {
url: string
}
+export interface MediaDownloadPayload {
+ sourceUrl: string
+ prefix?: string | null
+ title?: string | null
+ altText?: string | null
+ caption?: string | null
+ tags?: string[]
+ notes?: string | null
+}
+
+export interface AdminMediaDownloadResponse {
+ queued: boolean
+ job_id: number
+ status: string
+}
+
export interface MediaAssetMetadataPayload {
key: string
title?: string | null
diff --git a/admin/src/pages/dashboard-page.tsx b/admin/src/pages/dashboard-page.tsx
index 74295d4..36672bd 100644
--- a/admin/src/pages/dashboard-page.tsx
+++ b/admin/src/pages/dashboard-page.tsx
@@ -8,8 +8,10 @@ import {
Rss,
Star,
Tags,
+ Workflow,
} from 'lucide-react'
import { startTransition, useCallback, useEffect, useState } from 'react'
+import { Link } from 'react-router-dom'
import { toast } from 'sonner'
import { Badge } from '@/components/ui/badge'
@@ -35,7 +37,7 @@ import {
formatReviewStatus,
formatReviewType,
} from '@/lib/admin-format'
-import type { AdminDashboardResponse } from '@/lib/types'
+import type { AdminDashboardResponse, WorkerOverview } from '@/lib/types'
function StatCard({
label,
@@ -66,6 +68,7 @@ function StatCard({
export function DashboardPage() {
const [data, setData] = useState(null)
+ const [workerOverview, setWorkerOverview] = useState(null)
const [loading, setLoading] = useState(true)
const [refreshing, setRefreshing] = useState(false)
@@ -75,9 +78,13 @@ export function DashboardPage() {
setRefreshing(true)
}
- const next = await adminApi.dashboard()
+ const [next, nextWorkerOverview] = await Promise.all([
+ adminApi.dashboard(),
+ adminApi.getWorkersOverview(),
+ ])
startTransition(() => {
setData(next)
+ setWorkerOverview(nextWorkerOverview)
})
if (showToast) {
@@ -98,7 +105,7 @@ export function DashboardPage() {
void loadDashboard(false)
}, [loadDashboard])
- if (loading || !data) {
+ if (loading || !data || !workerOverview) {
return (
@@ -146,6 +153,12 @@ export function DashboardPage() {
note: data.stats.ai_enabled ? '知识库已启用' : 'AI 功能当前关闭',
icon: BrainCircuit,
},
+ {
+ label: 'Worker 活动',
+ value: workerOverview.active_jobs,
+ note: `失败 ${workerOverview.failed} / 运行 ${workerOverview.running}`,
+ icon: Workflow,
+ },
]
return (
@@ -314,6 +327,75 @@ export function DashboardPage() {
{data.site.ai_last_indexed_at ?? '站点还没有建立过索引。'}
+
+
+
+
+
+ Worker 健康
+
+
+ 当前排队 {workerOverview.queued}、运行 {workerOverview.running}、失败 {workerOverview.failed}。
+
+
+
+
+
+
+
+
Queued
+
{workerOverview.queued}
+
+
+
Running
+
{workerOverview.running}
+
+
+
Failed
+
{workerOverview.failed}
+
+
+
+ {workerOverview.worker_stats.length ? (
+
+ {workerOverview.worker_stats.slice(0, 3).map((item) => (
+
+
+
{item.label}
+
{item.worker_name}
+
+
+
Q {item.queued} · R {item.running}
+
ERR {item.failed}
+
+
+ ))}
+
+ ) : null}
+
diff --git a/admin/src/pages/media-page.tsx b/admin/src/pages/media-page.tsx
index 85e2f56..7fd2eba 100644
--- a/admin/src/pages/media-page.tsx
+++ b/admin/src/pages/media-page.tsx
@@ -1,6 +1,7 @@
import {
CheckSquare,
Copy,
+ Download,
Image as ImageIcon,
RefreshCcw,
Replace,
@@ -10,6 +11,7 @@ import {
Upload,
} from 'lucide-react'
import { startTransition, useCallback, useEffect, useMemo, useState } from 'react'
+import { Link } from 'react-router-dom'
import { toast } from 'sonner'
import { Badge } from '@/components/ui/badge'
@@ -58,6 +60,24 @@ const defaultMetadataForm: MediaMetadataFormState = {
notes: '',
}
+type RemoteDownloadFormState = {
+ sourceUrl: string
+ title: string
+ altText: string
+ caption: string
+ tags: string
+ notes: string
+}
+
+const defaultRemoteDownloadForm: RemoteDownloadFormState = {
+ sourceUrl: '',
+ title: '',
+ altText: '',
+ caption: '',
+ tags: '',
+ notes: '',
+}
+
function normalizeMediaTags(value: unknown): string[] {
if (!Array.isArray(value)) {
return []
@@ -121,6 +141,11 @@ export function MediaPage() {
const [metadataSaving, setMetadataSaving] = useState(false)
const [compressBeforeUpload, setCompressBeforeUpload] = useState(true)
const [compressQuality, setCompressQuality] = useState('0.82')
+ const [remoteDownloadForm, setRemoteDownloadForm] = useState(
+ defaultRemoteDownloadForm,
+ )
+ const [downloadingRemote, setDownloadingRemote] = useState(false)
+ const [lastRemoteDownloadJobId, setLastRemoteDownloadJobId] = useState(null)
const loadItems = useCallback(async (showToast = false) => {
try {
@@ -352,6 +377,147 @@ export function MediaPage() {
: ''}
) : null}
+
+
+
+
远程抓取到媒体库
+
+ 输入可访问的图片 / PDF 直链后,会创建异步 worker 任务;下载完成后写入当前目录前缀,并同步媒体元数据。
+
+
+
+
+
+
+
+ {lastRemoteDownloadJobId ? (
+
+ ) : null}
+
+
diff --git a/admin/src/pages/subscriptions-page.tsx b/admin/src/pages/subscriptions-page.tsx
index cb2da5f..f4e3ce3 100644
--- a/admin/src/pages/subscriptions-page.tsx
+++ b/admin/src/pages/subscriptions-page.tsx
@@ -1,5 +1,6 @@
import { BellRing, MailPlus, Pencil, RefreshCcw, Save, Send, Trash2, X } from 'lucide-react'
import { startTransition, useCallback, useEffect, useMemo, useState } from 'react'
+import { Link } from 'react-router-dom'
import { toast } from 'sonner'
import { Badge } from '@/components/ui/badge'
@@ -19,7 +20,7 @@ import {
} from '@/components/ui/table'
import { Textarea } from '@/components/ui/textarea'
import { adminApi, ApiError } from '@/lib/api'
-import type { NotificationDeliveryRecord, SubscriptionRecord } from '@/lib/types'
+import type { NotificationDeliveryRecord, SubscriptionRecord, WorkerJobRecord } from '@/lib/types'
const CHANNEL_OPTIONS = [
{ value: 'email', label: 'Email' },
@@ -80,6 +81,8 @@ export function SubscriptionsPage() {
const [digesting, setDigesting] = useState<'weekly' | 'monthly' | null>(null)
const [actioningId, setActioningId] = useState(null)
const [editingId, setEditingId] = useState(null)
+ const [workerJobs, setWorkerJobs] = useState([])
+ const [lastActionJobId, setLastActionJobId] = useState(null)
const [form, setForm] = useState(emptyForm())
const loadData = useCallback(async (showToast = false) => {
@@ -87,13 +90,18 @@ export function SubscriptionsPage() {
if (showToast) {
setRefreshing(true)
}
- const [nextSubscriptions, nextDeliveries] = await Promise.all([
+ const [nextSubscriptions, nextDeliveries, nextWorkerJobs] = await Promise.all([
adminApi.listSubscriptions(),
adminApi.listSubscriptionDeliveries(),
+ adminApi.listWorkerJobs({
+ workerName: 'worker.notification_delivery',
+ limit: 200,
+ }),
])
startTransition(() => {
setSubscriptions(nextSubscriptions)
setDeliveries(nextDeliveries)
+ setWorkerJobs(nextWorkerJobs.jobs)
})
if (showToast) {
toast.success('订阅中心已刷新。')
@@ -123,6 +131,17 @@ export function SubscriptionsPage() {
[deliveries],
)
+ const deliveryJobMap = useMemo(() => {
+ const map = new Map()
+ for (const item of workerJobs) {
+ const relatedId = Number.parseInt(String(item.related_entity_id || ''), 10)
+ if (Number.isFinite(relatedId) && !map.has(relatedId)) {
+ map.set(relatedId, item)
+ }
+ }
+ return map
+ }, [workerJobs])
+
const resetForm = useCallback(() => {
setEditingId(null)
setForm(emptyForm())
@@ -192,8 +211,9 @@ export function SubscriptionsPage() {
onClick={async () => {
try {
setDigesting('weekly')
- const result = await adminApi.sendSubscriptionDigest('weekly')
- toast.success(`周报已入队:queued ${result.queued},skipped ${result.skipped}`)
+ const result = await adminApi.runDigestWorker('weekly')
+ setLastActionJobId(result.job.id)
+ toast.success(`周报任务已入队:#${result.job.id}`)
await loadData(false)
} catch (error) {
toast.error(error instanceof ApiError ? error.message : '发送周报失败。')
@@ -211,8 +231,9 @@ export function SubscriptionsPage() {
onClick={async () => {
try {
setDigesting('monthly')
- const result = await adminApi.sendSubscriptionDigest('monthly')
- toast.success(`月报已入队:queued ${result.queued},skipped ${result.skipped}`)
+ const result = await adminApi.runDigestWorker('monthly')
+ setLastActionJobId(result.job.id)
+ toast.success(`月报任务已入队:#${result.job.id}`)
await loadData(false)
} catch (error) {
toast.error(error instanceof ApiError ? error.message : '发送月报失败。')
@@ -224,6 +245,11 @@ export function SubscriptionsPage() {
{digesting === 'monthly' ? '入队中...' : '发送月报'}
+ {lastActionJobId ? (
+
+ ) : null}
@@ -414,8 +440,15 @@ export function SubscriptionsPage() {
onClick={async () => {
try {
setActioningId(item.id)
- await adminApi.testSubscription(item.id)
- toast.success('测试通知已入队。')
+ const result = await adminApi.testSubscription(item.id)
+ if (result.job_id) {
+ setLastActionJobId(result.job_id)
+ }
+ toast.success(
+ result.job_id
+ ? `测试通知已入队:#${result.job_id}`
+ : '测试通知已入队。',
+ )
await loadData(false)
} catch (error) {
toast.error(error instanceof ApiError ? error.message : '测试发送失败。')
@@ -478,11 +511,14 @@ export function SubscriptionsPage() {
频道
状态
重试
+ Worker
响应
- {deliveries.map((item) => (
+ {deliveries.map((item) => {
+ const workerJob = deliveryJobMap.get(item.id)
+ return (
{item.delivered_at ?? item.created_at}
@@ -504,11 +540,26 @@ export function SubscriptionsPage() {
attempts: {item.attempts_count}
next: {item.next_retry_at ?? '—'}
+
+ {workerJob ? (
+
+ ) : (
+ —
+ )}
+
{item.response_text ?? '—'}
- ))}
+ )
+ })}
diff --git a/admin/src/pages/workers-page.tsx b/admin/src/pages/workers-page.tsx
new file mode 100644
index 0000000..a779043
--- /dev/null
+++ b/admin/src/pages/workers-page.tsx
@@ -0,0 +1,529 @@
+import {
+ LoaderCircle,
+ RefreshCcw,
+ RotateCcw,
+ Send,
+ SquareTerminal,
+ StopCircle,
+ TimerReset,
+ Workflow,
+} from 'lucide-react'
+import { startTransition, useCallback, useEffect, useMemo, useState } from 'react'
+import { toast } from 'sonner'
+
+import { Badge } from '@/components/ui/badge'
+import { Button } from '@/components/ui/button'
+import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'
+import { Input } from '@/components/ui/input'
+import { Select } from '@/components/ui/select'
+import { Skeleton } from '@/components/ui/skeleton'
+import {
+ Table,
+ TableBody,
+ TableCell,
+ TableHead,
+ TableHeader,
+ TableRow,
+} from '@/components/ui/table'
+import { adminApi, ApiError } from '@/lib/api'
+import type { WorkerJobRecord, WorkerOverview } from '@/lib/types'
+import { cn } from '@/lib/utils'
+import { useSearchParams } from 'react-router-dom'
+
+function prettyJson(value: unknown) {
+ if (value === null || value === undefined) {
+ return '—'
+ }
+
+ try {
+ return JSON.stringify(value, null, 2)
+ } catch {
+ return String(value)
+ }
+}
+
+function statusVariant(status: string) {
+ switch (status) {
+ case 'succeeded':
+ return 'success' as const
+ case 'running':
+ return 'default' as const
+ case 'queued':
+ return 'secondary' as const
+ case 'failed':
+ return 'danger' as const
+ case 'cancelled':
+ return 'warning' as const
+ default:
+ return 'outline' as const
+ }
+}
+
+const EMPTY_OVERVIEW: WorkerOverview = {
+ total_jobs: 0,
+ queued: 0,
+ running: 0,
+ succeeded: 0,
+ failed: 0,
+ cancelled: 0,
+ active_jobs: 0,
+ worker_stats: [],
+ catalog: [],
+}
+
+export function WorkersPage() {
+ const [searchParams] = useSearchParams()
+ const [overview, setOverview] = useState(EMPTY_OVERVIEW)
+ const [jobs, setJobs] = useState([])
+ const [total, setTotal] = useState(0)
+ const [loading, setLoading] = useState(true)
+ const [refreshing, setRefreshing] = useState(false)
+ const [actioning, setActioning] = useState(null)
+ const [selectedJobId, setSelectedJobId] = useState(null)
+ const [statusFilter, setStatusFilter] = useState(searchParams.get('status') || 'all')
+ const [kindFilter, setKindFilter] = useState(searchParams.get('kind') || 'all')
+ const [workerFilter, setWorkerFilter] = useState(searchParams.get('worker') || 'all')
+ const [search, setSearch] = useState(searchParams.get('search') || '')
+
+ const requestedJobId = useMemo(() => {
+ const raw = Number.parseInt(searchParams.get('job') || '', 10)
+ return Number.isFinite(raw) ? raw : null
+ }, [searchParams])
+
+ useEffect(() => {
+ setStatusFilter(searchParams.get('status') || 'all')
+ setKindFilter(searchParams.get('kind') || 'all')
+ setWorkerFilter(searchParams.get('worker') || 'all')
+ setSearch(searchParams.get('search') || '')
+ }, [searchParams])
+
+ const loadData = useCallback(async (showToast = false) => {
+ try {
+ if (showToast) {
+ setRefreshing(true)
+ }
+ const [nextOverview, nextJobs] = await Promise.all([
+ adminApi.getWorkersOverview(),
+ adminApi.listWorkerJobs({
+ status: statusFilter === 'all' ? undefined : statusFilter,
+ jobKind: kindFilter === 'all' ? undefined : kindFilter,
+ workerName: workerFilter === 'all' ? undefined : workerFilter,
+ search: search.trim() || undefined,
+ limit: 120,
+ }),
+ ])
+ let nextJobList = nextJobs.jobs
+ if (requestedJobId && !nextJobList.some((item) => item.id === requestedJobId)) {
+ try {
+ const requestedJob = await adminApi.getWorkerJob(requestedJobId)
+ nextJobList = [requestedJob, ...nextJobList]
+ } catch {
+ // ignore deep-link miss and keep current list
+ }
+ }
+ startTransition(() => {
+ setOverview(nextOverview)
+ setJobs(nextJobList)
+ setTotal(nextJobs.total)
+ })
+ if (showToast) {
+ toast.success('Worker 管理面板已刷新。')
+ }
+ } catch (error) {
+ toast.error(error instanceof ApiError ? error.message : '无法加载 worker 面板。')
+ } finally {
+ setLoading(false)
+ setRefreshing(false)
+ }
+ }, [kindFilter, requestedJobId, search, statusFilter, workerFilter])
+
+ useEffect(() => {
+ void loadData(false)
+ }, [loadData])
+
+ useEffect(() => {
+ const timer = window.setInterval(() => {
+ void loadData(false)
+ }, 5000)
+
+ return () => window.clearInterval(timer)
+ }, [loadData])
+
+ useEffect(() => {
+ setSelectedJobId((current) => {
+ if (requestedJobId) {
+ return requestedJobId
+ }
+ if (current && jobs.some((item) => item.id === current)) {
+ return current
+ }
+ return jobs[0]?.id ?? null
+ })
+ }, [jobs, requestedJobId])
+
+ const selectedJob = useMemo(
+ () => jobs.find((item) => item.id === selectedJobId) ?? null,
+ [jobs, selectedJobId],
+ )
+
+ const runTask = useCallback(async (task: 'weekly' | 'monthly' | 'retry') => {
+ try {
+ setActioning(task)
+ const result =
+ task === 'retry'
+ ? await adminApi.runRetryDeliveriesWorker(80)
+ : await adminApi.runDigestWorker(task)
+ toast.success(`已入队:#${result.job.id} ${result.job.display_name ?? result.job.worker_name}`)
+ await loadData(false)
+ setSelectedJobId(result.job.id)
+ } catch (error) {
+ toast.error(error instanceof ApiError ? error.message : '任务入队失败。')
+ } finally {
+ setActioning(null)
+ }
+ }, [loadData])
+
+ const workerOptions = overview.catalog.map((item) => ({
+ value: item.worker_name,
+ label: item.label,
+ }))
+
+ if (loading) {
+ return (
+
+
+
+
+ )
+ }
+
+ return (
+
+
+
+
Workers / Queue
+
+
异步 Worker 控制台
+
+ 统一查看后台下载、通知投递与 digest / 重试任务;支持筛选、查看详情、取消、重跑与手动触发。
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {[
+ { label: '总任务', value: overview.total_jobs, hint: `${overview.worker_stats.length} 种 worker`, icon: SquareTerminal },
+ { label: '排队中', value: overview.queued, hint: 'queued', icon: LoaderCircle },
+ { label: '运行中', value: overview.running, hint: 'running', icon: Workflow },
+ { label: '成功', value: overview.succeeded, hint: 'succeeded', icon: Send },
+ { label: '失败', value: overview.failed, hint: 'failed', icon: RotateCcw },
+ { label: '已取消', value: overview.cancelled, hint: 'cancelled', icon: StopCircle },
+ ].map((item) => {
+ const Icon = item.icon
+ return (
+
+
+
+
{item.label}
+
{item.value}
+
{item.hint}
+
+
+
+
+
+
+ )
+ })}
+
+
+
+
+ Worker 分类视图
+ 快速看每类 worker / task 当前堆积、失败与最近执行情况。
+
+
+ {overview.worker_stats.map((item) => (
+
+ ))}
+
+
+
+
+
+
+ 任务历史
+ 当前筛选后共 {total} 条,列表保留最近 120 条任务记录。
+
+
+
+
+
+
+ setSearch(event.target.value)}
+ />
+
+
+
+
+
+ ID
+ 任务
+ 状态
+ 来源
+ 实体
+ 时间
+
+
+
+ {jobs.map((item) => (
+ setSelectedJobId(item.id)}
+ >
+ #{item.id}
+
+
+
{item.display_name ?? item.worker_name}
+
{item.worker_name}
+
+
+
+
+
{item.status}
+ {item.cancel_requested ?
cancel requested
: null}
+
+
+
+
+
{item.job_kind}
+
{item.requested_by ?? 'system'}
+
+
+
+ {item.related_entity_type && item.related_entity_id
+ ? `${item.related_entity_type}:${item.related_entity_id}`
+ : '—'}
+
+
+ {item.queued_at ?? item.created_at}
+ done: {item.finished_at ?? '—'}
+
+
+ ))}
+ {!jobs.length ? (
+
+
+ 当前筛选没有匹配任务。
+
+
+ ) : null}
+
+
+
+
+
+
+
+ 任务详情
+ 查看 payload / result / error,并对单个任务执行取消或重跑。
+
+
+ {selectedJob ? (
+
+
+ #{selectedJob.id}
+ {selectedJob.status}
+ {selectedJob.job_kind}
+
+
+
+
+ {selectedJob.display_name ?? selectedJob.worker_name}
+
+
{selectedJob.worker_name}
+
+
+
+ {[
+ ['请求人', selectedJob.requested_by ?? 'system'],
+ ['来源', selectedJob.requested_source ?? 'system'],
+ ['关联实体', selectedJob.related_entity_type && selectedJob.related_entity_id ? `${selectedJob.related_entity_type}:${selectedJob.related_entity_id}` : '—'],
+ ['尝试次数', `${selectedJob.attempts_count} / ${selectedJob.max_attempts}`],
+ ['排队时间', selectedJob.queued_at ?? selectedJob.created_at],
+ ['开始时间', selectedJob.started_at ?? '—'],
+ ['完成时间', selectedJob.finished_at ?? '—'],
+ ['上游任务', selectedJob.parent_job_id ? `#${selectedJob.parent_job_id}` : '—'],
+ ].map(([label, value]) => (
+
+ ))}
+
+
+
+
+
+
+
+
+
+
Payload
+
{prettyJson(selectedJob.payload)}
+
+
+
+
Result
+
{prettyJson(selectedJob.result)}
+
+
+
+
Error
+
{selectedJob.error_text ?? '—'}
+
+
+
+ ) : (
+
+ 暂无可查看的任务详情。
+
+ )}
+
+
+
+
+ )
+}
diff --git a/backend/Dockerfile b/backend/Dockerfile
index 835513f..1e6cc14 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -1,5 +1,3 @@
-# syntax=docker/dockerfile:1.7
-
FROM rust:1.94-trixie AS chef
RUN cargo install cargo-chef --locked
WORKDIR /app
diff --git a/backend/migration/src/lib.rs b/backend/migration/src/lib.rs
index f1a0bfa..8a7bd0f 100644
--- a/backend/migration/src/lib.rs
+++ b/backend/migration/src/lib.rs
@@ -43,6 +43,7 @@ mod m20260401_000032_add_runtime_security_keys_to_site_settings;
mod m20260401_000033_add_taxonomy_metadata_and_media_assets;
mod m20260401_000034_add_source_markdown_to_posts;
mod m20260401_000035_add_human_verification_modes_to_site_settings;
+mod m20260402_000036_create_worker_jobs;
pub struct Migrator;
#[async_trait::async_trait]
@@ -90,6 +91,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260401_000033_add_taxonomy_metadata_and_media_assets::Migration),
Box::new(m20260401_000034_add_source_markdown_to_posts::Migration),
Box::new(m20260401_000035_add_human_verification_modes_to_site_settings::Migration),
+ Box::new(m20260402_000036_create_worker_jobs::Migration),
// inject-above (do not remove this comment)
]
}
diff --git a/backend/migration/src/m20260402_000036_create_worker_jobs.rs b/backend/migration/src/m20260402_000036_create_worker_jobs.rs
new file mode 100644
index 0000000..fed958a
--- /dev/null
+++ b/backend/migration/src/m20260402_000036_create_worker_jobs.rs
@@ -0,0 +1,98 @@
+use loco_rs::schema::*;
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+ async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ create_table(
+ manager,
+ "worker_jobs",
+ &[
+ ("id", ColType::PkAuto),
+ ("parent_job_id", ColType::IntegerNull),
+ ("job_kind", ColType::String),
+ ("worker_name", ColType::String),
+ ("display_name", ColType::StringNull),
+ ("status", ColType::String),
+ ("queue_name", ColType::StringNull),
+ ("requested_by", ColType::StringNull),
+ ("requested_source", ColType::StringNull),
+ ("trigger_mode", ColType::StringNull),
+ ("payload", ColType::JsonBinaryNull),
+ ("result", ColType::JsonBinaryNull),
+ ("error_text", ColType::TextNull),
+ ("tags", ColType::JsonBinaryNull),
+ ("related_entity_type", ColType::StringNull),
+ ("related_entity_id", ColType::StringNull),
+ ("attempts_count", ColType::Integer),
+ ("max_attempts", ColType::Integer),
+ ("cancel_requested", ColType::Boolean),
+ ("queued_at", ColType::StringNull),
+ ("started_at", ColType::StringNull),
+ ("finished_at", ColType::StringNull),
+ ],
+ &[],
+ )
+ .await?;
+
+ for (name, columns) in [
+ (
+ "idx_worker_jobs_status_created_at",
+ vec![Alias::new("status"), Alias::new("created_at")],
+ ),
+ (
+ "idx_worker_jobs_worker_status_created_at",
+ vec![
+ Alias::new("worker_name"),
+ Alias::new("status"),
+ Alias::new("created_at"),
+ ],
+ ),
+ (
+ "idx_worker_jobs_kind_created_at",
+ vec![Alias::new("job_kind"), Alias::new("created_at")],
+ ),
+ (
+ "idx_worker_jobs_related_entity",
+ vec![Alias::new("related_entity_type"), Alias::new("related_entity_id")],
+ ),
+ (
+ "idx_worker_jobs_parent_job_id",
+ vec![Alias::new("parent_job_id")],
+ ),
+ ] {
+ let mut statement = Index::create();
+ statement.name(name).table(Alias::new("worker_jobs"));
+ for column in columns {
+ statement.col(column);
+ }
+ manager.create_index(statement.to_owned()).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ for index_name in [
+ "idx_worker_jobs_parent_job_id",
+ "idx_worker_jobs_related_entity",
+ "idx_worker_jobs_kind_created_at",
+ "idx_worker_jobs_worker_status_created_at",
+ "idx_worker_jobs_status_created_at",
+ ] {
+ manager
+ .drop_index(
+ Index::drop()
+ .name(index_name)
+ .table(Alias::new("worker_jobs"))
+ .to_owned(),
+ )
+ .await?;
+ }
+
+ drop_table(manager, "worker_jobs").await
+ }
+}
diff --git a/backend/src/controllers/admin_api.rs b/backend/src/controllers/admin_api.rs
index 5ccf42d..1ed3ff7 100644
--- a/backend/src/controllers/admin_api.rs
+++ b/backend/src/controllers/admin_api.rs
@@ -22,7 +22,10 @@ use crate::{
ai_chunks, comment_blacklist, comment_persona_analysis_logs, comments, friend_links, posts,
reviews,
},
- services::{admin_audit, ai, analytics, comment_guard, content, media_assets, storage},
+ services::{
+ admin_audit, ai, analytics, comment_guard, content, media_assets, storage, worker_jobs,
+ },
+ workers::downloader::DownloadWorkerArgs,
};
#[derive(Clone, Debug, Deserialize)]
@@ -346,6 +349,30 @@ pub struct AdminMediaMetadataResponse {
pub notes: Option,
}
+#[derive(Clone, Debug, Deserialize)]
+pub struct AdminMediaDownloadPayload {
+ pub source_url: String,
+ #[serde(default)]
+ pub prefix: Option,
+ #[serde(default)]
+ pub title: Option,
+ #[serde(default)]
+ pub alt_text: Option,
+ #[serde(default)]
+ pub caption: Option,
+ #[serde(default)]
+ pub tags: Option>,
+ #[serde(default)]
+ pub notes: Option,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct AdminMediaDownloadResponse {
+ pub queued: bool,
+ pub job_id: i32,
+ pub status: String,
+}
+
#[derive(Clone, Debug, Deserialize)]
pub struct AdminMediaListQuery {
pub prefix: Option,
@@ -1457,6 +1484,55 @@ pub async fn replace_media_object(
})
}
+#[debug_handler]
+pub async fn download_media_object(
+ headers: HeaderMap,
+ State(ctx): State,
+ Json(payload): Json,
+) -> Result {
+ let actor = check_auth(&headers)?;
+ let worker_args = DownloadWorkerArgs {
+ source_url: payload.source_url.clone(),
+ prefix: payload.prefix.clone(),
+ title: payload.title.clone(),
+ alt_text: payload.alt_text.clone(),
+ caption: payload.caption.clone(),
+ tags: payload.tags.unwrap_or_default(),
+ notes: payload.notes.clone(),
+ job_id: None,
+ };
+ let job = worker_jobs::queue_download_job(
+ &ctx,
+ &worker_args,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ None,
+ Some("manual".to_string()),
+ )
+ .await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "media.download",
+ "media",
+ Some(job.id.to_string()),
+ Some(payload.source_url.clone()),
+ Some(serde_json::json!({
+ "job_id": job.id,
+ "queued": true,
+ "source_url": payload.source_url,
+ })),
+ )
+ .await?;
+
+ format::json(AdminMediaDownloadResponse {
+ queued: true,
+ job_id: job.id,
+ status: job.status,
+ })
+}
+
#[debug_handler]
pub async fn list_comment_blacklist(
headers: HeaderMap,
@@ -1982,6 +2058,7 @@ pub fn routes() -> Routes {
"/storage/media/metadata",
patch(update_media_object_metadata),
)
+ .add("/storage/media/download", post(download_media_object))
.add("/storage/media/replace", post(replace_media_object))
.add(
"/comments/blacklist",
diff --git a/backend/src/controllers/admin_ops.rs b/backend/src/controllers/admin_ops.rs
index c55acbb..2d6fa6c 100644
--- a/backend/src/controllers/admin_ops.rs
+++ b/backend/src/controllers/admin_ops.rs
@@ -13,7 +13,7 @@ use crate::{
},
services::{
admin_audit, backups, post_revisions as revision_service,
- subscriptions as subscription_service,
+ subscriptions as subscription_service, worker_jobs,
},
};
@@ -35,6 +35,15 @@ pub struct DeliveriesQuery {
pub limit: Option<u64>,
}
+#[derive(Clone, Debug, Default, Deserialize)]
+pub struct WorkerJobsQuery {
+ pub status: Option<String>,
+ pub job_kind: Option<String>,
+ pub worker_name: Option<String>,
+ pub search: Option<String>,
+ pub limit: Option<u64>,
+}
+
#[derive(Clone, Debug, Deserialize)]
pub struct SubscriptionPayload {
#[serde(alias = "channelType")]
@@ -85,6 +94,11 @@ pub struct DigestDispatchRequest {
pub period: Option<String>,
}
+#[derive(Clone, Debug, Default, Deserialize)]
+pub struct RetryDeliveriesRequest {
+ pub limit: Option<u64>,
+}
+
#[derive(Clone, Debug, Deserialize)]
pub struct SiteBackupImportRequest {
pub backup: backups::SiteBackupDocument,
@@ -132,6 +146,12 @@ pub struct DeliveryListResponse {
pub deliveries: Vec,
}
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerTaskActionResponse {
+ pub queued: bool,
+ pub job: worker_jobs::WorkerJobRecord,
+}
+
fn trim_to_option(value: Option<String>) -> Option<String> {
value.and_then(|item| {
let trimmed = item.trim().to_string();
@@ -408,6 +428,13 @@ pub async fn test_subscription(
.ok_or(Error::NotFound)?;
let delivery = subscription_service::send_test_notification(&ctx, &item).await?;
+ let job = worker_jobs::find_latest_job_by_related_entity(
+ &ctx,
+ "notification_delivery",
+ &delivery.id.to_string(),
+ Some(worker_jobs::WORKER_NOTIFICATION_DELIVERY),
+ )
+ .await?;
admin_audit::log_event(
&ctx,
Some(&actor),
@@ -419,7 +446,12 @@ pub async fn test_subscription(
)
.await?;
- format::json(serde_json::json!({ "queued": true, "id": item.id, "delivery_id": delivery.id }))
+ format::json(serde_json::json!({
+ "queued": true,
+ "id": item.id,
+ "delivery_id": delivery.id,
+ "job_id": job.as_ref().map(|value| value.id),
+ }))
}
#[debug_handler]
@@ -450,6 +482,162 @@ pub async fn send_subscription_digest(
format::json(summary)
}
+#[debug_handler]
+pub async fn workers_overview(
+ headers: HeaderMap,
+ State(ctx): State<AppContext>,
+) -> Result<Response> {
+ check_auth(&headers)?;
+ format::json(worker_jobs::get_overview(&ctx).await?)
+}
+
+#[debug_handler]
+pub async fn list_worker_jobs(
+ headers: HeaderMap,
+ Query(query): Query<WorkerJobsQuery>,
+ State(ctx): State<AppContext>,
+) -> Result<Response> {
+ check_auth(&headers)?;
+ format::json(
+ worker_jobs::list_jobs(
+ &ctx,
+ worker_jobs::WorkerJobListQuery {
+ status: query.status,
+ job_kind: query.job_kind,
+ worker_name: query.worker_name,
+ search: query.search,
+ limit: query.limit,
+ },
+ )
+ .await?,
+ )
+}
+
+#[debug_handler]
+pub async fn get_worker_job(
+ headers: HeaderMap,
+ Path(id): Path<i32>,
+ State(ctx): State<AppContext>,
+) -> Result<Response> {
+ check_auth(&headers)?;
+ format::json(worker_jobs::get_job_record(&ctx, id).await?)
+}
+
+#[debug_handler]
+pub async fn cancel_worker_job(
+ headers: HeaderMap,
+ Path(id): Path<i32>,
+ State(ctx): State<AppContext>,
+) -> Result<Response> {
+ let actor = check_auth(&headers)?;
+ let updated = worker_jobs::request_cancel(&ctx, id).await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "worker.cancel",
+ "worker_job",
+ Some(id.to_string()),
+ Some(updated.worker_name.clone()),
+ Some(serde_json::json!({ "status": updated.status })),
+ )
+ .await?;
+
+ format::json(updated)
+}
+
+#[debug_handler]
+pub async fn retry_worker_job(
+ headers: HeaderMap,
+ Path(id): Path<i32>,
+ State(ctx): State<AppContext>,
+) -> Result<Response> {
+ let actor = check_auth(&headers)?;
+ let job = worker_jobs::retry_job(
+ &ctx,
+ id,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ )
+ .await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "worker.retry",
+ "worker_job",
+ Some(job.id.to_string()),
+ Some(job.worker_name.clone()),
+ Some(serde_json::json!({ "source_job_id": id })),
+ )
+ .await?;
+
+ format::json(WorkerTaskActionResponse { queued: true, job })
+}
+
+#[debug_handler]
+pub async fn run_retry_deliveries_job(
+ headers: HeaderMap,
+ State(ctx): State<AppContext>,
+ Json(payload): Json<RetryDeliveriesRequest>,
+) -> Result<Response> {
+ let actor = check_auth(&headers)?;
+ let job = worker_jobs::spawn_retry_deliveries_task(
+ &ctx,
+ payload.limit,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ None,
+ Some("manual".to_string()),
+ )
+ .await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "worker.task.retry_deliveries",
+ "worker_job",
+ Some(job.id.to_string()),
+ Some(job.worker_name.clone()),
+ Some(serde_json::json!({ "limit": payload.limit })),
+ )
+ .await?;
+
+ format::json(WorkerTaskActionResponse { queued: true, job })
+}
+
+#[debug_handler]
+pub async fn run_digest_worker_job(
+ headers: HeaderMap,
+ State(ctx): State<AppContext>,
+ Json(payload): Json<DigestDispatchRequest>,
+) -> Result<Response> {
+ let actor = check_auth(&headers)?;
+ let period = payload.period.unwrap_or_else(|| "weekly".to_string());
+ let job = worker_jobs::spawn_digest_task(
+ &ctx,
+ &period,
+ Some(actor.username.clone()),
+ Some(actor.source.clone()),
+ None,
+ Some("manual".to_string()),
+ )
+ .await?;
+
+ admin_audit::log_event(
+ &ctx,
+ Some(&actor),
+ "worker.task.digest",
+ "worker_job",
+ Some(job.id.to_string()),
+ Some(job.worker_name.clone()),
+ Some(serde_json::json!({ "period": period })),
+ )
+ .await?;
+
+ format::json(WorkerTaskActionResponse { queued: true, job })
+}
+
#[debug_handler]
pub async fn export_site_backup(
headers: HeaderMap,
@@ -481,6 +669,13 @@ pub fn routes() -> Routes {
.add("/subscriptions/digest", post(send_subscription_digest))
.add("/subscriptions/{id}", patch(update_subscription).delete(delete_subscription))
.add("/subscriptions/{id}/test", post(test_subscription))
+ .add("/workers/overview", get(workers_overview))
+ .add("/workers/jobs", get(list_worker_jobs))
+ .add("/workers/jobs/{id}", get(get_worker_job))
+ .add("/workers/jobs/{id}/cancel", post(cancel_worker_job))
+ .add("/workers/jobs/{id}/retry", post(retry_worker_job))
+ .add("/workers/tasks/retry-deliveries", post(run_retry_deliveries_job))
+ .add("/workers/tasks/digest", post(run_digest_worker_job))
.add("/site-backup/export", get(export_site_backup))
.add("/site-backup/import", post(import_site_backup))
}
diff --git a/backend/src/controllers/review.rs b/backend/src/controllers/review.rs
index a95d491..2b7c50a 100644
--- a/backend/src/controllers/review.rs
+++ b/backend/src/controllers/review.rs
@@ -7,12 +7,19 @@ use sea_orm::{EntityTrait, QueryOrder, Set};
use serde::{Deserialize, Serialize};
use crate::{
- controllers::admin::check_auth,
+ controllers::admin::{check_auth, resolve_admin_identity},
models::_entities::reviews::{self, Entity as ReviewEntity},
services::{admin_audit, storage},
};
-#[derive(Serialize, Deserialize, Debug)]
+fn is_public_review_status(status: Option<&str>) -> bool {
+ matches!(
+ status.unwrap_or_default().trim().to_ascii_lowercase().as_str(),
+ "published" | "completed" | "done"
+ )
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CreateReviewRequest {
pub title: String,
pub review_type: String,
@@ -25,7 +32,7 @@ pub struct CreateReviewRequest {
pub link_url: Option<String>,
}
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UpdateReviewRequest {
pub title: Option<String>,
pub review_type: Option<String>,
@@ -38,23 +45,30 @@ pub struct UpdateReviewRequest {
pub link_url: Option<String>,
}
-pub async fn list(State(ctx): State<AppContext>) -> Result<Response> {
+pub async fn list(headers: HeaderMap, State(ctx): State<AppContext>) -> Result<Response> {
+ let include_private = resolve_admin_identity(&headers).is_some();
let reviews = ReviewEntity::find()
.order_by_desc(reviews::Column::CreatedAt)
.all(&ctx.db)
- .await?;
+ .await?
+ .into_iter()
+ .filter(|review| include_private || is_public_review_status(review.status.as_deref()))
+ .collect::<Vec<_>>();
format::json(reviews)
}
pub async fn get_one(
+ headers: HeaderMap,
Path(id): Path<i32>,
State(ctx): State<AppContext>,
) -> Result<Response> {
+ let include_private = resolve_admin_identity(&headers).is_some();
let review = ReviewEntity::find_by_id(id).one(&ctx.db).await?;
match review {
- Some(r) => format::json(r),
+ Some(r) if include_private || is_public_review_status(r.status.as_deref()) => format::json(r),
+ Some(_) => Err(Error::NotFound),
None => Err(Error::NotFound),
}
}
diff --git a/backend/src/models/_entities/mod.rs b/backend/src/models/_entities/mod.rs
index 4c9a987..5474404 100644
--- a/backend/src/models/_entities/mod.rs
+++ b/backend/src/models/_entities/mod.rs
@@ -20,3 +20,4 @@ pub mod site_settings;
pub mod subscriptions;
pub mod tags;
pub mod users;
+pub mod worker_jobs;
diff --git a/backend/src/models/_entities/prelude.rs b/backend/src/models/_entities/prelude.rs
index aaf8fc5..4de8aa5 100644
--- a/backend/src/models/_entities/prelude.rs
+++ b/backend/src/models/_entities/prelude.rs
@@ -18,3 +18,4 @@ pub use super::site_settings::Entity as SiteSettings;
pub use super::subscriptions::Entity as Subscriptions;
pub use super::tags::Entity as Tags;
pub use super::users::Entity as Users;
+pub use super::worker_jobs::Entity as WorkerJobs;
diff --git a/backend/src/models/_entities/worker_jobs.rs b/backend/src/models/_entities/worker_jobs.rs
new file mode 100644
index 0000000..8354fa6
--- /dev/null
+++ b/backend/src/models/_entities/worker_jobs.rs
@@ -0,0 +1,43 @@
+//! `SeaORM` Entity, manually maintained
+
+use sea_orm::entity::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
+#[sea_orm(table_name = "worker_jobs")]
+pub struct Model {
+ pub created_at: DateTimeWithTimeZone,
+ pub updated_at: DateTimeWithTimeZone,
+ #[sea_orm(primary_key)]
+ pub id: i32,
+ pub parent_job_id: Option<i32>,
+ pub job_kind: String,
+ pub worker_name: String,
+ pub display_name: Option<String>,
+ pub status: String,
+ pub queue_name: Option<String>,
+ pub requested_by: Option<String>,
+ pub requested_source: Option<String>,
+ pub trigger_mode: Option<String>,
+ #[sea_orm(column_type = "JsonBinary", nullable)]
+ pub payload: Option<Json>,
+ #[sea_orm(column_type = "JsonBinary", nullable)]
+ pub result: Option<Json>,
+ #[sea_orm(column_type = "Text", nullable)]
+ pub error_text: Option<String>,
+ #[sea_orm(column_type = "JsonBinary", nullable)]
+ pub tags: Option<Json>,
+ pub related_entity_type: Option<String>,
+ pub related_entity_id: Option<String>,
+ pub attempts_count: i32,
+ pub max_attempts: i32,
+ pub cancel_requested: bool,
+ pub queued_at: Option<String>,
+ pub started_at: Option<String>,
+ pub finished_at: Option<String>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs
index f360dff..b5d8692 100644
--- a/backend/src/services/mod.rs
+++ b/backend/src/services/mod.rs
@@ -12,3 +12,4 @@ pub mod storage;
pub mod subscriptions;
pub mod turnstile;
pub mod web_push;
+pub mod worker_jobs;
diff --git a/backend/src/services/subscriptions.rs b/backend/src/services/subscriptions.rs
index de9bf13..cd017dd 100644
--- a/backend/src/services/subscriptions.rs
+++ b/backend/src/services/subscriptions.rs
@@ -1,8 +1,5 @@
use chrono::{Duration, Utc};
-use loco_rs::{
- bgworker::BackgroundWorker,
- prelude::*,
-};
+use loco_rs::prelude::*;
use reqwest::Client;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, Order, QueryFilter, QueryOrder,
@@ -15,10 +12,7 @@ use uuid::Uuid;
use crate::{
mailers::subscription::SubscriptionMailer,
models::_entities::{notification_deliveries, posts, subscriptions},
- services::{content, web_push as web_push_service},
- workers::notification_delivery::{
- NotificationDeliveryWorker, NotificationDeliveryWorkerArgs,
- },
+ services::{content, web_push as web_push_service, worker_jobs},
};
pub const CHANNEL_EMAIL: &str = "email";
@@ -837,14 +831,16 @@ async fn update_subscription_delivery_state(
}
async fn enqueue_delivery(ctx: &AppContext, delivery_id: i32) -> Result<()> {
- match NotificationDeliveryWorker::perform_later(ctx, NotificationDeliveryWorkerArgs { delivery_id }).await {
- Ok(_) => Ok(()),
- Err(Error::QueueProviderMissing) => process_delivery(ctx, delivery_id).await,
- Err(error) => {
- tracing::warn!("failed to enqueue delivery #{delivery_id}, falling back to sync processing: {error}");
- process_delivery(ctx, delivery_id).await
- }
- }
+ let _ = worker_jobs::queue_notification_delivery_job(
+ ctx,
+ delivery_id,
+ None,
+ Some("system".to_string()),
+ None,
+ Some("system".to_string()),
+ )
+ .await?;
+ Ok(())
}
pub async fn queue_direct_notification(
diff --git a/backend/src/services/worker_jobs.rs b/backend/src/services/worker_jobs.rs
new file mode 100644
index 0000000..4d856da
--- /dev/null
+++ b/backend/src/services/worker_jobs.rs
@@ -0,0 +1,835 @@
+use chrono::Utc;
+use loco_rs::{
+ bgworker::BackgroundWorker,
+ prelude::*,
+};
+use sea_orm::{
+ ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, Order,
+ PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set,
+};
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+
+use crate::{
+ models::_entities::{notification_deliveries, worker_jobs},
+ services::subscriptions,
+ workers::{
+ downloader::{DownloadWorker, DownloadWorkerArgs},
+ notification_delivery::{NotificationDeliveryWorker, NotificationDeliveryWorkerArgs},
+ },
+};
+
+pub const JOB_KIND_WORKER: &str = "worker";
+pub const JOB_KIND_TASK: &str = "task";
+
+pub const JOB_STATUS_QUEUED: &str = "queued";
+pub const JOB_STATUS_RUNNING: &str = "running";
+pub const JOB_STATUS_SUCCEEDED: &str = "succeeded";
+pub const JOB_STATUS_FAILED: &str = "failed";
+pub const JOB_STATUS_CANCELLED: &str = "cancelled";
+
+pub const WORKER_DOWNLOAD_MEDIA: &str = "worker.download_media";
+pub const WORKER_NOTIFICATION_DELIVERY: &str = "worker.notification_delivery";
+pub const TASK_RETRY_DELIVERIES: &str = "task.retry_deliveries";
+pub const TASK_SEND_WEEKLY_DIGEST: &str = "task.send_weekly_digest";
+pub const TASK_SEND_MONTHLY_DIGEST: &str = "task.send_monthly_digest";
+
+#[derive(Clone, Debug, Default)]
+pub struct WorkerJobListQuery {
+ pub status: Option<String>,
+ pub job_kind: Option<String>,
+ pub worker_name: Option<String>,
+ pub search: Option<String>,
+ pub limit: Option<u64>,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerCatalogEntry {
+ pub worker_name: String,
+ pub job_kind: String,
+ pub label: String,
+ pub description: String,
+ pub queue_name: Option<String>,
+ pub supports_cancel: bool,
+ pub supports_retry: bool,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerStats {
+ pub worker_name: String,
+ pub job_kind: String,
+ pub label: String,
+ pub queued: usize,
+ pub running: usize,
+ pub succeeded: usize,
+ pub failed: usize,
+ pub cancelled: usize,
+ pub last_job_at: Option<String>,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerOverview {
+ pub total_jobs: usize,
+ pub queued: usize,
+ pub running: usize,
+ pub succeeded: usize,
+ pub failed: usize,
+ pub cancelled: usize,
+ pub active_jobs: usize,
+ pub worker_stats: Vec,
+ pub catalog: Vec,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerJobRecord {
+ pub created_at: String,
+ pub updated_at: String,
+ pub id: i32,
+ pub parent_job_id: Option<i32>,
+ pub job_kind: String,
+ pub worker_name: String,
+ pub display_name: Option<String>,
+ pub status: String,
+ pub queue_name: Option<String>,
+ pub requested_by: Option<String>,
+ pub requested_source: Option<String>,
+ pub trigger_mode: Option<String>,
+ pub payload: Option<Value>,
+ pub result: Option<Value>,
+ pub error_text: Option<String>,
+ pub tags: Option<Value>,
+ pub related_entity_type: Option<String>,
+ pub related_entity_id: Option<String>,
+ pub attempts_count: i32,
+ pub max_attempts: i32,
+ pub cancel_requested: bool,
+ pub queued_at: Option<String>,
+ pub started_at: Option<String>,
+ pub finished_at: Option<String>,
+ pub can_cancel: bool,
+ pub can_retry: bool,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerJobListResult {
+ pub total: u64,
+ pub jobs: Vec<WorkerJobRecord>,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WorkerTaskDispatchResult {
+ pub queued: bool,
+ pub job: WorkerJobRecord,
+}
+
+#[derive(Clone, Debug)]
+struct CreateWorkerJobInput {
+ parent_job_id: Option<i32>,
+ job_kind: String,
+ worker_name: String,
+ display_name: Option<String>,
+ queue_name: Option<String>,
+ requested_by: Option<String>,
+ requested_source: Option<String>,
+ trigger_mode: Option<String>,
+ payload: Option<Value>,
+ tags: Option<Value>,
+ related_entity_type: Option<String>,
+ related_entity_id: Option<String>,
+ max_attempts: i32,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct RetryDeliveriesTaskPayload {
+ #[serde(default)]
+ limit: Option<u64>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DigestTaskPayload {
+ period: String,
+}
+
+fn now_rfc3339() -> String {
+ Utc::now().to_rfc3339()
+}
+
+fn trim_to_option(value: Option<String>) -> Option<String> {
+ value.and_then(|item| {
+ let trimmed = item.trim().to_string();
+ if trimmed.is_empty() {
+ None
+ } else {
+ Some(trimmed)
+ }
+ })
+}
+
+fn queue_name_for(worker_name: &str) -> Option<String> {
+ match worker_name {
+ WORKER_DOWNLOAD_MEDIA => Some("media".to_string()),
+ WORKER_NOTIFICATION_DELIVERY => Some("notifications".to_string()),
+ TASK_RETRY_DELIVERIES => Some("maintenance".to_string()),
+ TASK_SEND_WEEKLY_DIGEST | TASK_SEND_MONTHLY_DIGEST => Some("digests".to_string()),
+ _ => None,
+ }
+}
+
+fn label_for(worker_name: &str) -> String {
+ match worker_name {
+ WORKER_DOWNLOAD_MEDIA => "远程媒体下载".to_string(),
+ WORKER_NOTIFICATION_DELIVERY => "通知投递".to_string(),
+ TASK_RETRY_DELIVERIES => "重试待投递通知".to_string(),
+ TASK_SEND_WEEKLY_DIGEST => "发送周报".to_string(),
+ TASK_SEND_MONTHLY_DIGEST => "发送月报".to_string(),
+ _ => worker_name.to_string(),
+ }
+}
+
+fn description_for(worker_name: &str) -> String {
+ match worker_name {
+ WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。".to_string(),
+ WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。".to_string(),
+ TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。".to_string(),
+ TASK_SEND_WEEKLY_DIGEST => "根据近期内容生成周报,并为活跃订阅目标入队。".to_string(),
+ TASK_SEND_MONTHLY_DIGEST => "根据近期内容生成月报,并为活跃订阅目标入队。".to_string(),
+ _ => "后台异步任务。".to_string(),
+ }
+}
+
+fn tags_for(worker_name: &str) -> Value {
+ match worker_name {
+ WORKER_DOWNLOAD_MEDIA => json!(["media", "download"]),
+ WORKER_NOTIFICATION_DELIVERY => json!(["notifications", "delivery"]),
+ TASK_RETRY_DELIVERIES => json!(["maintenance", "retry"]),
+ TASK_SEND_WEEKLY_DIGEST => json!(["digest", "weekly"]),
+ TASK_SEND_MONTHLY_DIGEST => json!(["digest", "monthly"]),
+ _ => json!([]),
+ }
+}
+
+fn can_cancel_status(status: &str, cancel_requested: bool) -> bool {
+ !cancel_requested && matches!(status, JOB_STATUS_QUEUED | JOB_STATUS_RUNNING)
+}
+
+fn can_retry_status(status: &str) -> bool {
+ matches!(status, JOB_STATUS_FAILED | JOB_STATUS_CANCELLED | JOB_STATUS_SUCCEEDED)
+}
+
+fn to_job_record(item: worker_jobs::Model) -> WorkerJobRecord {
+ WorkerJobRecord {
+ created_at: item.created_at.to_rfc3339(),
+ updated_at: item.updated_at.to_rfc3339(),
+ id: item.id,
+ parent_job_id: item.parent_job_id,
+ job_kind: item.job_kind,
+ worker_name: item.worker_name,
+ display_name: item.display_name,
+ status: item.status.clone(),
+ queue_name: item.queue_name,
+ requested_by: item.requested_by,
+ requested_source: item.requested_source,
+ trigger_mode: item.trigger_mode,
+ payload: item.payload,
+ result: item.result,
+ error_text: item.error_text,
+ tags: item.tags,
+ related_entity_type: item.related_entity_type,
+ related_entity_id: item.related_entity_id,
+ attempts_count: item.attempts_count,
+ max_attempts: item.max_attempts,
+ cancel_requested: item.cancel_requested,
+ queued_at: item.queued_at,
+ started_at: item.started_at,
+ finished_at: item.finished_at,
+ can_cancel: can_cancel_status(&item.status, item.cancel_requested),
+ can_retry: can_retry_status(&item.status),
+ }
+}
+
+fn catalog_entries() -> Vec<WorkerCatalogEntry> {
+ [
+ (WORKER_DOWNLOAD_MEDIA, JOB_KIND_WORKER, true, true),
+ (WORKER_NOTIFICATION_DELIVERY, JOB_KIND_WORKER, true, true),
+ (TASK_RETRY_DELIVERIES, JOB_KIND_TASK, true, true),
+ (TASK_SEND_WEEKLY_DIGEST, JOB_KIND_TASK, true, true),
+ (TASK_SEND_MONTHLY_DIGEST, JOB_KIND_TASK, true, true),
+ ]
+ .into_iter()
+ .map(|(worker_name, job_kind, supports_cancel, supports_retry)| WorkerCatalogEntry {
+ worker_name: worker_name.to_string(),
+ job_kind: job_kind.to_string(),
+ label: label_for(worker_name),
+ description: description_for(worker_name),
+ queue_name: queue_name_for(worker_name),
+ supports_cancel,
+ supports_retry,
+ })
+ .collect()
+}
+
+async fn create_job(ctx: &AppContext, input: CreateWorkerJobInput) -> Result<worker_jobs::Model> {
+ Ok(worker_jobs::ActiveModel {
+ parent_job_id: Set(input.parent_job_id),
+ job_kind: Set(input.job_kind),
+ worker_name: Set(input.worker_name),
+ display_name: Set(trim_to_option(input.display_name)),
+ status: Set(JOB_STATUS_QUEUED.to_string()),
+ queue_name: Set(trim_to_option(input.queue_name)),
+ requested_by: Set(trim_to_option(input.requested_by)),
+ requested_source: Set(trim_to_option(input.requested_source)),
+ trigger_mode: Set(trim_to_option(input.trigger_mode)),
+ payload: Set(input.payload),
+ result: Set(None),
+ error_text: Set(None),
+ tags: Set(input.tags),
+ related_entity_type: Set(trim_to_option(input.related_entity_type)),
+ related_entity_id: Set(trim_to_option(input.related_entity_id)),
+ attempts_count: Set(0),
+ max_attempts: Set(input.max_attempts.max(1)),
+ cancel_requested: Set(false),
+ queued_at: Set(Some(now_rfc3339())),
+ started_at: Set(None),
+ finished_at: Set(None),
+ ..Default::default()
+ }
+ .insert(&ctx.db)
+ .await?)
+}
+
+async fn find_job(ctx: &AppContext, id: i32) -> Result<worker_jobs::Model> {
+ worker_jobs::Entity::find_by_id(id)
+ .one(&ctx.db)
+ .await?
+ .ok_or(Error::NotFound)
+}
+
+async fn dispatch_download(args_ctx: AppContext, args: DownloadWorkerArgs) {
+ let worker = DownloadWorker::build(&args_ctx);
+ if let Err(error) = worker.perform(args).await {
+ tracing::warn!("download worker execution failed: {error}");
+ }
+}
+
+async fn dispatch_notification_delivery(args_ctx: AppContext, args: NotificationDeliveryWorkerArgs) {
+ let worker = NotificationDeliveryWorker::build(&args_ctx);
+ if let Err(error) = worker.perform(args).await {
+ tracing::warn!("notification delivery worker execution failed: {error}");
+ }
+}
+
+async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) -> Result<()> {
+ match DownloadWorker::perform_later(ctx, args.clone()).await {
+ Ok(_) => Ok(()),
+ Err(Error::QueueProviderMissing) => {
+ tokio::spawn(dispatch_download(ctx.clone(), args));
+ Ok(())
+ }
+ Err(error) => {
+ tracing::warn!("download worker queue unavailable, falling back to local task: {error}");
+ tokio::spawn(dispatch_download(ctx.clone(), args));
+ Ok(())
+ }
+ }
+}
+
+async fn enqueue_notification_worker(
+ ctx: &AppContext,
+ args: NotificationDeliveryWorkerArgs,
+) -> Result<()> {
+ match NotificationDeliveryWorker::perform_later(ctx, args.clone()).await {
+ Ok(_) => Ok(()),
+ Err(Error::QueueProviderMissing) => {
+ tokio::spawn(dispatch_notification_delivery(ctx.clone(), args));
+ Ok(())
+ }
+ Err(error) => {
+ tracing::warn!("notification worker queue unavailable, falling back to local task: {error}");
+ tokio::spawn(dispatch_notification_delivery(ctx.clone(), args));
+ Ok(())
+ }
+ }
+}
+
+async fn run_retry_deliveries_task(ctx: AppContext, job_id: i32, limit: Option<u64>) {
+ match begin_job_execution(&ctx, job_id).await {
+ Ok(true) => {}
+ Ok(false) => return,
+ Err(error) => {
+ tracing::warn!("failed to start retry deliveries job #{job_id}: {error}");
+ return;
+ }
+ }
+
+ let result = async {
+ let effective_limit = limit.unwrap_or(60);
+ let queued = subscriptions::retry_due_deliveries(&ctx, effective_limit).await?;
+ mark_job_succeeded(
+ &ctx,
+ job_id,
+ Some(json!({
+ "limit": effective_limit,
+ "queued": queued,
+ })),
+ )
+ .await
+ }
+ .await;
+
+ if let Err(error) = result {
+ let _ = mark_job_failed(&ctx, job_id, error.to_string()).await;
+ }
+}
+
+async fn run_digest_task(ctx: AppContext, job_id: i32, period: String) {
+ match begin_job_execution(&ctx, job_id).await {
+ Ok(true) => {}
+ Ok(false) => return,
+ Err(error) => {
+ tracing::warn!("failed to start digest job #{job_id}: {error}");
+ return;
+ }
+ }
+
+ let result = async {
+ let summary = subscriptions::send_digest(&ctx, &period).await?;
+ mark_job_succeeded(
+ &ctx,
+ job_id,
+ Some(json!({
+ "period": summary.period,
+ "post_count": summary.post_count,
+ "queued": summary.queued,
+ "skipped": summary.skipped,
+ })),
+ )
+ .await
+ }
+ .await;
+
+ if let Err(error) = result {
+ let _ = mark_job_failed(&ctx, job_id, error.to_string()).await;
+ }
+}
+
+pub async fn get_overview(ctx: &AppContext) -> Result<WorkerOverview> {
+ let items = worker_jobs::Entity::find()
+ .order_by(worker_jobs::Column::CreatedAt, Order::Desc)
+ .all(&ctx.db)
+ .await?;
+
+ let mut overview = WorkerOverview {
+ total_jobs: items.len(),
+ queued: 0,
+ running: 0,
+ succeeded: 0,
+ failed: 0,
+ cancelled: 0,
+ active_jobs: 0,
+ worker_stats: Vec::new(),
+ catalog: catalog_entries(),
+ };
+
+ let mut grouped = std::collections::BTreeMap::<String, WorkerStats>::new();
+
+ for item in items {
+ match item.status.as_str() {
+ JOB_STATUS_QUEUED => overview.queued += 1,
+ JOB_STATUS_RUNNING => overview.running += 1,
+ JOB_STATUS_SUCCEEDED => overview.succeeded += 1,
+ JOB_STATUS_FAILED => overview.failed += 1,
+ JOB_STATUS_CANCELLED => overview.cancelled += 1,
+ _ => {}
+ }
+
+ let entry = grouped.entry(item.worker_name.clone()).or_insert_with(|| WorkerStats {
+ worker_name: item.worker_name.clone(),
+ job_kind: item.job_kind.clone(),
+ label: label_for(&item.worker_name),
+ queued: 0,
+ running: 0,
+ succeeded: 0,
+ failed: 0,
+ cancelled: 0,
+ last_job_at: None,
+ });
+
+ match item.status.as_str() {
+ JOB_STATUS_QUEUED => entry.queued += 1,
+ JOB_STATUS_RUNNING => entry.running += 1,
+ JOB_STATUS_SUCCEEDED => entry.succeeded += 1,
+ JOB_STATUS_FAILED => entry.failed += 1,
+ JOB_STATUS_CANCELLED => entry.cancelled += 1,
+ _ => {}
+ }
+ if entry.last_job_at.is_none() {
+ entry.last_job_at = Some(item.created_at.to_rfc3339());
+ }
+ }
+
+ overview.active_jobs = overview.queued + overview.running;
+ overview.worker_stats = grouped.into_values().collect();
+ Ok(overview)
+}
+
+pub async fn list_jobs(ctx: &AppContext, query: WorkerJobListQuery) -> Result<WorkerJobListResult> {
+ let mut db_query = worker_jobs::Entity::find().order_by(worker_jobs::Column::CreatedAt, Order::Desc);
+
+ if let Some(status) = query.status.map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) {
+ db_query = db_query.filter(worker_jobs::Column::Status.eq(status));
+ }
+ if let Some(job_kind) = query.job_kind.map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) {
+ db_query = db_query.filter(worker_jobs::Column::JobKind.eq(job_kind));
+ }
+ if let Some(worker_name) = query.worker_name.map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) {
+ db_query = db_query.filter(worker_jobs::Column::WorkerName.eq(worker_name));
+ }
+ if let Some(search) = query.search.map(|value| value.trim().to_string()).filter(|value| !value.is_empty()) {
+ db_query = db_query.filter(
+ Condition::any()
+ .add(worker_jobs::Column::WorkerName.contains(search.clone()))
+ .add(worker_jobs::Column::DisplayName.contains(search.clone()))
+ .add(worker_jobs::Column::RelatedEntityId.contains(search.clone()))
+ .add(worker_jobs::Column::RelatedEntityType.contains(search)),
+ );
+ }
+
+ let total = db_query.clone().count(&ctx.db).await?;
+ let limit = query.limit.unwrap_or(120);
+ let items = db_query.limit(limit).all(&ctx.db).await?;
+
+ Ok(WorkerJobListResult {
+ total,
+ jobs: items.into_iter().map(to_job_record).collect(),
+ })
+}
+
+pub async fn get_job_record(ctx: &AppContext, id: i32) -> Result<WorkerJobRecord> {
+ Ok(to_job_record(find_job(ctx, id).await?))
+}
+
+pub async fn find_latest_job_by_related_entity(
+ ctx: &AppContext,
+ related_entity_type: &str,
+ related_entity_id: &str,
+ worker_name: Option<&str>,
+) -> Result