import {
  Activity,
  AlertTriangle,
  CheckCircle2,
  Clock3,
  Filter,
  PlayCircle,
  RefreshCcw,
  RotateCcw,
  Send,
  SquareTerminal,
  StopCircle,
  TimerReset,
  Workflow,
} from "lucide-react";
import {
  startTransition,
  useCallback,
  useEffect,
  useMemo,
  useState,
} from "react";
import { useSearchParams } from "react-router-dom";
import { toast } from "sonner";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import {
  Card,
  CardContent,
  CardDescription,
  CardHeader,
  CardTitle,
} from "@/components/ui/card";
import { Input } from "@/components/ui/input";
import { Select } from "@/components/ui/select";
import { Skeleton } from "@/components/ui/skeleton";
import { Textarea } from "@/components/ui/textarea";
import { adminApi, ApiError } from "@/lib/api";
import { formatDateTime } from "@/lib/admin-format";
import {
  formatWorkerProgress,
  getWorkerProgressPercent,
} from "@/lib/worker-progress";
import type { WorkerJobRecord, WorkerOverview, WorkerStats } from "@/lib/types";
import { cn } from "@/lib/utils";

/**
 * Renders an arbitrary value as indented (2-space) JSON text.
 *
 * @param value - Any value; `null` and `undefined` produce an em dash.
 * @returns Always a string. Values that `JSON.stringify` rejects by throwing
 *   (e.g. BigInt, circular structures) or by returning `undefined`
 *   (e.g. functions, symbols) fall back to `String(value)`.
 */
function prettyJson(value: unknown): string {
  if (value === null || value === undefined) {
    return "—";
  }
  try {
    // JSON.stringify is typed as returning string, but yields undefined for
    // top-level functions and symbols; widen the type so the fallback applies.
    const serialized: string | undefined = JSON.stringify(value, null, 2);
    return serialized ?? String(value);
  } catch {
    return String(value);
  }
}

/**
 * Maps a job status string to a visual variant name.
 *
 * @param status - Job status; unrecognised values map to `"outline"`.
 * @returns One of the literal variant names below.
 */
function statusVariant(status: string) {
  switch (status) {
    case "succeeded":
      return "success" as const;
    case "running":
      return "default" as const;
    case "queued":
      return "secondary" as const;
    case "failed":
      return "danger" as const;
    case "cancelled":
      return "warning" as const;
    default:
      return "outline" as const;
  }
}

/**
 * Picks the border/background/text class string for a summary icon.
 *
 * @param status - Status key; anything other than running, succeeded, failed
 *   or queued gets the primary-coloured classes.
 * @returns A space-separated class string.
 */
function summaryIconTone(status: string): string {
  switch (status) {
    case "running":
      return "border-blue-500/20 bg-blue-500/10 text-blue-600";
    case "succeeded":
      return "border-emerald-500/20 bg-emerald-500/10 text-emerald-600";
    case "failed":
      return "border-rose-500/20 bg-rose-500/10 text-rose-600";
    case "queued":
      return "border-sky-500/20 bg-sky-500/10 text-sky-600";
    default:
      return "border-primary/20 bg-primary/10 text-primary";
  }
}

/**
 * Sums the five per-status counters of a worker's stats.
 *
 * @param item - Stats record for one worker.
 * @returns queued + running + succeeded + failed + cancelled.
 */
function workerStatTotal(item: WorkerStats): number {
  return (
    item.queued + item.running + item.succeeded + item.failed + item.cancelled
  );
}

/**
 * Produces the health label for a worker.
 *
 * Precedence: any failed jobs, then any running jobs, then any queued jobs;
 * otherwise the "stable" label is returned.
 *
 * @param item - Stats record for one worker.
 * @returns The label string for the highest-precedence non-zero counter.
 */
function workerHealthLabel(item: WorkerStats): string {
  if (item.failed > 0) {
    return "有失败";
  }
  if (item.running > 0) {
    return "执行中";
  }
  if (item.queued > 0) {
    return "排队中";
  }
  return "稳定";
}

/**
 * Produces the text colour class matching {@link workerHealthLabel}.
 *
 * Uses the same precedence (failed, running, queued, otherwise stable) so the
 * colour and the label always describe the same state.
 *
 * @param item - Stats record for one worker.
 * @returns A text colour class name.
 */
function workerHealthTone(item: WorkerStats): string {
  if (item.failed > 0) {
    return "text-rose-600";
  }
  if (item.running > 0) {
    return "text-blue-600";
  }
  if (item.queued > 0) {
    return "text-sky-600";
  }
  return "text-emerald-600";
}

/**
 * Formats a job's related entity as `type:id`.
 *
 * @param job - Job record.
 * @returns `"<type>:<id>"`, or an em dash when either the entity type or the
 *   entity id is falsy.
 */
function relatedEntityText(job: WorkerJobRecord): string {
  if (!job.related_entity_type || !job.related_entity_id) {
    return "—";
  }
  return `${job.related_entity_type}:${job.related_entity_id}`;
}

/**
 * Returns the title to show for a job.
 *
 * @param job - Job record.
 * @returns `display_name` unless it is `null`/`undefined`, in which case
 *   `worker_name` is used.
 */
function jobTitle(job: WorkerJobRecord) {
  return job.display_name ?? job.worker_name;
}

// Summary card component: takes a label, a value, a hint, an icon component
// and a tone class string. Its markup continues on the following lines.
function SummaryCard({
  label,
  value,
  hint,
  icon: Icon,
  tone,
}: {
  label: string;
  value: string | number;
  hint: string;
  icon: typeof SquareTerminal;
  tone: string;
}) {
  return (
{label}
{value}
{hint}
); } const EMPTY_OVERVIEW: WorkerOverview = { total_jobs: 0, queued: 0, running: 0, succeeded: 0, failed: 0, cancelled: 0, active_jobs: 0, worker_stats: [], catalog: [], }; export function WorkersPage() { const [searchParams] = useSearchParams(); const [overview, setOverview] = useState(EMPTY_OVERVIEW); const [jobs, setJobs] = useState([]); const [total, setTotal] = useState(0); const [loading, setLoading] = useState(true); const [refreshing, setRefreshing] = useState(false); const [actioning, setActioning] = useState(null); const [selectedJobId, setSelectedJobId] = useState(null); const [statusFilter, setStatusFilter] = useState( searchParams.get("status") || "all", ); const [kindFilter, setKindFilter] = useState( searchParams.get("kind") || "all", ); const [workerFilter, setWorkerFilter] = useState( searchParams.get("worker") || "all", ); const [search, setSearch] = useState(searchParams.get("search") || ""); const requestedJobId = useMemo(() => { const raw = Number.parseInt(searchParams.get("job") || "", 10); return Number.isFinite(raw) ? raw : null; }, [searchParams]); useEffect(() => { setStatusFilter(searchParams.get("status") || "all"); setKindFilter(searchParams.get("kind") || "all"); setWorkerFilter(searchParams.get("worker") || "all"); setSearch(searchParams.get("search") || ""); }, [searchParams]); const loadData = useCallback( async (showToast = false) => { try { if (showToast) { setRefreshing(true); } const [nextOverview, nextJobs] = await Promise.all([ adminApi.getWorkersOverview(), adminApi.listWorkerJobs({ status: statusFilter === "all" ? undefined : statusFilter, jobKind: kindFilter === "all" ? undefined : kindFilter, workerName: workerFilter === "all" ? 
undefined : workerFilter, search: search.trim() || undefined, limit: 120, }), ]); let nextJobList = nextJobs.jobs; if ( requestedJobId && !nextJobList.some((item) => item.id === requestedJobId) ) { try { const requestedJob = await adminApi.getWorkerJob(requestedJobId); nextJobList = [requestedJob, ...nextJobList]; } catch { // ignore deep-link miss and keep current list } } startTransition(() => { setOverview(nextOverview); setJobs(nextJobList); setTotal(nextJobs.total); }); if (showToast) { toast.success("Worker 面板已刷新。"); } } catch (error) { toast.error( error instanceof ApiError ? error.message : "无法加载 worker 面板。", ); } finally { setLoading(false); setRefreshing(false); } }, [kindFilter, requestedJobId, search, statusFilter, workerFilter], ); useEffect(() => { void loadData(false); }, [loadData]); useEffect(() => { const timer = window.setInterval(() => { void loadData(false); }, 5000); return () => window.clearInterval(timer); }, [loadData]); useEffect(() => { setSelectedJobId((current) => { if (requestedJobId) { return requestedJobId; } if (current && jobs.some((item) => item.id === current)) { return current; } return jobs[0]?.id ?? null; }); }, [jobs, requestedJobId]); const selectedJob = useMemo( () => jobs.find((item) => item.id === selectedJobId) ?? null, [jobs, selectedJobId], ); const selectedWorkerLabel = useMemo(() => { if (workerFilter === "all") { return null; } return ( overview.catalog.find((item) => item.worker_name === workerFilter) ?.label ?? workerFilter ); }, [overview.catalog, workerFilter]); const selectedProgressText = selectedJob ? formatWorkerProgress(selectedJob) : null; const selectedProgressPercent = selectedJob ? getWorkerProgressPercent(selectedJob) : null; const runTask = useCallback( async (task: "weekly" | "monthly" | "retry") => { try { setActioning(task); const result = task === "retry" ? 
await adminApi.runRetryDeliveriesWorker(80) : await adminApi.runDigestWorker(task); toast.success( `已入队:#${result.job.id} ${result.job.display_name ?? result.job.worker_name}`, ); await loadData(false); setSelectedJobId(result.job.id); } catch (error) { toast.error( error instanceof ApiError ? error.message : "任务入队失败。", ); } finally { setActioning(null); } }, [loadData], ); const workerOptions = overview.catalog.map((item) => ({ value: item.worker_name, label: item.label, })); if (loading) { return (
); } return (
Workers / Queue 每 5 秒自动刷新 {selectedWorkerLabel ? ( 当前聚焦 {selectedWorkerLabel} ) : null}

异步任务控制台

统一查看队列堆积、执行进度和失败任务。把手动调度、失败排查和运行状态都收在同一页里。

0 ? "failed" : "succeeded")} /> item.can_retry || item.can_cancel).length}`} hint="可取消或重跑" icon={PlayCircle} tone={summaryIconTone("queued")} />
快捷操作
保留手动调度入口,但不再做成一整块独立视觉。
Worker 视图 每张卡代表一个 worker,优先显示失败、排队和最近一次活动。
点击任意卡片即可快速锁定该 worker。
{overview.worker_stats.length ? (
{overview.worker_stats.map((item) => { const totalJobs = Math.max(workerStatTotal(item), 1); const isSelected = workerFilter === item.worker_name; return ( ); })}
) : (
还没有可展示的 worker 统计。
)}
任务流 当前筛选后共 {total} 条,列表保留最近 120 条记录。
{statusFilter !== "all" ? ( {statusFilter} ) : null} {kindFilter !== "all" ? ( {kindFilter} ) : null} {selectedWorkerLabel ? ( {selectedWorkerLabel} ) : null}
setSearch(event.target.value)} />
{jobs.length ? (
{jobs.map((item) => { const progressText = formatWorkerProgress(item); const progressPercent = getWorkerProgressPercent(item); const isSelected = selectedJobId === item.id; return ( ); })}
) : (
当前筛选没有匹配任务。
)}
任务详情 选中左侧任务后,这里展示上下文、进度和原始数据。
{selectedJob ? (
#{selectedJob.id} {selectedJob.status} {selectedJob.job_kind}
{jobTitle(selectedJob)}
{selectedJob.worker_name}
{selectedProgressText ? (
执行进度 {selectedProgressPercent !== null ? ( {selectedProgressPercent}% ) : null}
{selectedProgressPercent !== null ? (
) : null}
{selectedProgressText}
) : null}
{[ ["请求人", selectedJob.requested_by ?? "system"], ["来源", selectedJob.requested_source ?? "system"], ["关联实体", relatedEntityText(selectedJob)], [ "尝试次数", `${selectedJob.attempts_count} / ${selectedJob.max_attempts}`, ], [ "排队时间", formatDateTime( selectedJob.queued_at ?? selectedJob.created_at, ), ], [ "开始时间", selectedJob.started_at ? formatDateTime(selectedJob.started_at) : "—", ], [ "完成时间", selectedJob.finished_at ? formatDateTime(selectedJob.finished_at) : "—", ], [ "上游任务", selectedJob.parent_job_id ? `#${selectedJob.parent_job_id}` : "—", ], ].map(([label, value]) => (
{label}
{value}
))}
Payload