use chrono::Utc; use loco_rs::{bgworker::BackgroundWorker, prelude::*}; use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, Order, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set, }; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use crate::{ models::_entities::{notification_deliveries, worker_jobs}, services::subscriptions, workers::{ ai_reindex::{AiReindexWorker, AiReindexWorkerArgs}, downloader::{DownloadWorker, DownloadWorkerArgs}, notification_delivery::{NotificationDeliveryWorker, NotificationDeliveryWorkerArgs}, }, }; pub const JOB_KIND_WORKER: &str = "worker"; pub const JOB_KIND_TASK: &str = "task"; pub const JOB_STATUS_QUEUED: &str = "queued"; pub const JOB_STATUS_RUNNING: &str = "running"; pub const JOB_STATUS_SUCCEEDED: &str = "succeeded"; pub const JOB_STATUS_FAILED: &str = "failed"; pub const JOB_STATUS_CANCELLED: &str = "cancelled"; pub const WORKER_DOWNLOAD_MEDIA: &str = "worker.download_media"; pub const WORKER_NOTIFICATION_DELIVERY: &str = "worker.notification_delivery"; pub const WORKER_AI_REINDEX: &str = "worker.ai_reindex"; pub const TASK_RETRY_DELIVERIES: &str = "task.retry_deliveries"; pub const TASK_SEND_WEEKLY_DIGEST: &str = "task.send_weekly_digest"; pub const TASK_SEND_MONTHLY_DIGEST: &str = "task.send_monthly_digest"; #[derive(Clone, Debug, Default)] pub struct WorkerJobListQuery { pub status: Option, pub job_kind: Option, pub worker_name: Option, pub search: Option, pub limit: Option, } #[derive(Clone, Debug, Serialize)] pub struct WorkerCatalogEntry { pub worker_name: String, pub job_kind: String, pub label: String, pub description: String, pub queue_name: Option, pub supports_cancel: bool, pub supports_retry: bool, } #[derive(Clone, Debug, Serialize)] pub struct WorkerStats { pub worker_name: String, pub job_kind: String, pub label: String, pub queued: usize, pub running: usize, pub succeeded: usize, pub failed: usize, pub cancelled: usize, pub last_job_at: Option, } #[derive(Clone, Debug, Serialize)] pub struct WorkerOverview { pub total_jobs: usize, pub queued: usize, pub running: usize, pub succeeded: usize, pub failed: usize, pub cancelled: usize, pub active_jobs: usize, pub worker_stats: Vec, pub catalog: Vec, } #[derive(Clone, Debug, Serialize)] pub struct WorkerJobRecord { pub created_at: String, pub updated_at: String, pub id: i32, pub parent_job_id: Option, pub job_kind: String, pub worker_name: String, pub display_name: Option, pub status: String, pub queue_name: Option, pub requested_by: Option, pub requested_source: Option, pub trigger_mode: Option, pub payload: Option, pub result: Option, pub error_text: Option, pub tags: Option, pub related_entity_type: Option, pub related_entity_id: Option, pub attempts_count: i32, pub max_attempts: i32, pub cancel_requested: bool, pub queued_at: Option, pub started_at: Option, pub finished_at: Option, pub can_cancel: bool, pub can_retry: bool, } #[derive(Clone, Debug, Serialize)] pub struct WorkerJobListResult { pub total: u64, pub jobs: Vec, } #[derive(Clone, Debug, Serialize)] pub struct WorkerTaskDispatchResult { pub queued: bool, pub job: WorkerJobRecord, } #[derive(Clone, Debug)] struct CreateWorkerJobInput { parent_job_id: Option, job_kind: String, worker_name: String, display_name: Option, queue_name: Option, requested_by: Option, requested_source: Option, trigger_mode: Option, payload: Option, tags: Option, related_entity_type: Option, related_entity_id: Option, max_attempts: i32, } #[derive(Clone, Debug, Deserialize, Serialize)] struct RetryDeliveriesTaskPayload { #[serde(default)] limit: Option, } #[derive(Clone, Debug, Deserialize, Serialize)] struct DigestTaskPayload { period: String, } fn now_rfc3339() -> String { Utc::now().to_rfc3339() } fn trim_to_option(value: Option) -> Option { value.and_then(|item| { let trimmed = item.trim().to_string(); if trimmed.is_empty() { None } else { Some(trimmed) } }) } fn queue_name_for(worker_name: &str) -> Option { match worker_name { WORKER_AI_REINDEX => Some("ai".to_string()), WORKER_DOWNLOAD_MEDIA => Some("media".to_string()), WORKER_NOTIFICATION_DELIVERY => Some("notifications".to_string()), TASK_RETRY_DELIVERIES => Some("maintenance".to_string()), TASK_SEND_WEEKLY_DIGEST | TASK_SEND_MONTHLY_DIGEST => Some("digests".to_string()), _ => None, } } fn label_for(worker_name: &str) -> String { match worker_name { WORKER_AI_REINDEX => "AI 索引重建".to_string(), WORKER_DOWNLOAD_MEDIA => "远程媒体下载".to_string(), WORKER_NOTIFICATION_DELIVERY => "通知投递".to_string(), TASK_RETRY_DELIVERIES => "重试待投递通知".to_string(), TASK_SEND_WEEKLY_DIGEST => "发送周报".to_string(), TASK_SEND_MONTHLY_DIGEST => "发送月报".to_string(), _ => worker_name.to_string(), } } fn description_for(worker_name: &str) -> String { match worker_name { WORKER_AI_REINDEX => "按当前站点内容重新生成 AI 检索索引,并分批写入向量数据。" .to_string(), WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。".to_string(), WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。".to_string(), TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。".to_string(), TASK_SEND_WEEKLY_DIGEST => "根据近期内容生成周报,并为活跃订阅目标入队。".to_string(), TASK_SEND_MONTHLY_DIGEST => "根据近期内容生成月报,并为活跃订阅目标入队。".to_string(), _ => "后台异步任务。".to_string(), } } fn tags_for(worker_name: &str) -> Value { match worker_name { WORKER_AI_REINDEX => json!(["ai", "reindex"]), WORKER_DOWNLOAD_MEDIA => json!(["media", "download"]), WORKER_NOTIFICATION_DELIVERY => json!(["notifications", "delivery"]), TASK_RETRY_DELIVERIES => json!(["maintenance", "retry"]), TASK_SEND_WEEKLY_DIGEST => json!(["digest", "weekly"]), TASK_SEND_MONTHLY_DIGEST => json!(["digest", "monthly"]), _ => json!([]), } } fn can_cancel_status(status: &str, cancel_requested: bool) -> bool { !cancel_requested && matches!(status, JOB_STATUS_QUEUED | JOB_STATUS_RUNNING) } fn can_retry_status(status: &str) -> bool { matches!( status, JOB_STATUS_FAILED | JOB_STATUS_CANCELLED | JOB_STATUS_SUCCEEDED ) } fn to_job_record(item: worker_jobs::Model) -> WorkerJobRecord { WorkerJobRecord { created_at: item.created_at.to_rfc3339(), updated_at: item.updated_at.to_rfc3339(), id: item.id, parent_job_id: item.parent_job_id, job_kind: item.job_kind, worker_name: item.worker_name, display_name: item.display_name, status: item.status.clone(), queue_name: item.queue_name, requested_by: item.requested_by, requested_source: item.requested_source, trigger_mode: item.trigger_mode, payload: item.payload, result: item.result, error_text: item.error_text, tags: item.tags, related_entity_type: item.related_entity_type, related_entity_id: item.related_entity_id, attempts_count: item.attempts_count, max_attempts: item.max_attempts, cancel_requested: item.cancel_requested, queued_at: item.queued_at, started_at: item.started_at, finished_at: item.finished_at, can_cancel: can_cancel_status(&item.status, item.cancel_requested), can_retry: can_retry_status(&item.status), } } fn catalog_entries() -> Vec { [ (WORKER_AI_REINDEX, JOB_KIND_WORKER, true, true), (WORKER_DOWNLOAD_MEDIA, JOB_KIND_WORKER, true, true), (WORKER_NOTIFICATION_DELIVERY, JOB_KIND_WORKER, true, true), (TASK_RETRY_DELIVERIES, JOB_KIND_TASK, true, true), (TASK_SEND_WEEKLY_DIGEST, JOB_KIND_TASK, true, true), (TASK_SEND_MONTHLY_DIGEST, JOB_KIND_TASK, true, true), ] .into_iter() .map( |(worker_name, job_kind, supports_cancel, supports_retry)| WorkerCatalogEntry { worker_name: worker_name.to_string(), job_kind: job_kind.to_string(), label: label_for(worker_name), description: description_for(worker_name), queue_name: queue_name_for(worker_name), supports_cancel, supports_retry, }, ) .collect() } async fn create_job(ctx: &AppContext, input: CreateWorkerJobInput) -> Result { Ok(worker_jobs::ActiveModel { parent_job_id: Set(input.parent_job_id), job_kind: Set(input.job_kind), worker_name: Set(input.worker_name), display_name: Set(trim_to_option(input.display_name)), status: Set(JOB_STATUS_QUEUED.to_string()), queue_name: Set(trim_to_option(input.queue_name)), requested_by: Set(trim_to_option(input.requested_by)), requested_source: Set(trim_to_option(input.requested_source)), trigger_mode: Set(trim_to_option(input.trigger_mode)), payload: Set(input.payload), result: Set(None), error_text: Set(None), tags: Set(input.tags), related_entity_type: Set(trim_to_option(input.related_entity_type)), related_entity_id: Set(trim_to_option(input.related_entity_id)), attempts_count: Set(0), max_attempts: Set(input.max_attempts.max(1)), cancel_requested: Set(false), queued_at: Set(Some(now_rfc3339())), started_at: Set(None), finished_at: Set(None), ..Default::default() } .insert(&ctx.db) .await?) } async fn find_job(ctx: &AppContext, id: i32) -> Result { worker_jobs::Entity::find_by_id(id) .one(&ctx.db) .await? .ok_or(Error::NotFound) } async fn dispatch_download(args_ctx: AppContext, args: DownloadWorkerArgs) { let worker = DownloadWorker::build(&args_ctx); if let Err(error) = worker.perform(args).await { tracing::warn!("download worker execution failed: {error}"); } } async fn dispatch_ai_reindex(args_ctx: AppContext, args: AiReindexWorkerArgs) { let worker = AiReindexWorker::build(&args_ctx); if let Err(error) = worker.perform(args).await { tracing::warn!("ai reindex worker execution failed: {error}"); } } async fn dispatch_notification_delivery( args_ctx: AppContext, args: NotificationDeliveryWorkerArgs, ) { let worker = NotificationDeliveryWorker::build(&args_ctx); if let Err(error) = worker.perform(args).await { tracing::warn!("notification delivery worker execution failed: {error}"); } } async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) -> Result<()> { match DownloadWorker::perform_later(ctx, args.clone()).await { Ok(_) => Ok(()), Err(Error::QueueProviderMissing) => { tokio::spawn(dispatch_download(ctx.clone(), args)); Ok(()) } Err(error) => { tracing::warn!( "download worker queue unavailable, falling back to local task: {error}" ); tokio::spawn(dispatch_download(ctx.clone(), args)); Ok(()) } } } async fn enqueue_ai_reindex_worker(ctx: &AppContext, args: AiReindexWorkerArgs) -> Result<()> { match AiReindexWorker::perform_later(ctx, args.clone()).await { Ok(_) => Ok(()), Err(Error::QueueProviderMissing) => { tokio::spawn(dispatch_ai_reindex(ctx.clone(), args)); Ok(()) } Err(error) => { tracing::warn!("ai reindex worker queue unavailable, falling back to local task: {error}"); tokio::spawn(dispatch_ai_reindex(ctx.clone(), args)); Ok(()) } } } async fn enqueue_notification_worker( ctx: &AppContext, args: NotificationDeliveryWorkerArgs, ) -> Result<()> { match NotificationDeliveryWorker::perform_later(ctx, args.clone()).await { Ok(_) => Ok(()), Err(Error::QueueProviderMissing) => { tokio::spawn(dispatch_notification_delivery(ctx.clone(), args)); Ok(()) } Err(error) => { tracing::warn!( "notification worker queue unavailable, falling back to local task: {error}" ); tokio::spawn(dispatch_notification_delivery(ctx.clone(), args)); Ok(()) } } } async fn run_retry_deliveries_task(ctx: AppContext, job_id: i32, limit: Option) { match begin_job_execution(&ctx, job_id).await { Ok(true) => {} Ok(false) => return, Err(error) => { tracing::warn!("failed to start retry deliveries job #{job_id}: {error}"); return; } } let result = async { let effective_limit = limit.unwrap_or(60); let queued = subscriptions::retry_due_deliveries(&ctx, effective_limit).await?; mark_job_succeeded( &ctx, job_id, Some(json!({ "limit": effective_limit, "queued": queued, })), ) .await } .await; if let Err(error) = result { let _ = mark_job_failed(&ctx, job_id, error.to_string()).await; } } async fn run_digest_task(ctx: AppContext, job_id: i32, period: String) { match begin_job_execution(&ctx, job_id).await { Ok(true) => {} Ok(false) => return, Err(error) => { tracing::warn!("failed to start digest job #{job_id}: {error}"); return; } } let result = async { let summary = subscriptions::send_digest(&ctx, &period).await?; mark_job_succeeded( &ctx, job_id, Some(json!({ "period": summary.period, "post_count": summary.post_count, "queued": summary.queued, "skipped": summary.skipped, })), ) .await } .await; if let Err(error) = result { let _ = mark_job_failed(&ctx, job_id, error.to_string()).await; } } pub async fn get_overview(ctx: &AppContext) -> Result { let items = worker_jobs::Entity::find() .order_by(worker_jobs::Column::CreatedAt, Order::Desc) .all(&ctx.db) .await?; let mut overview = WorkerOverview { total_jobs: items.len(), queued: 0, running: 0, succeeded: 0, failed: 0, cancelled: 0, active_jobs: 0, worker_stats: Vec::new(), catalog: catalog_entries(), }; let mut grouped = std::collections::BTreeMap::::new(); for item in items { match item.status.as_str() { JOB_STATUS_QUEUED => overview.queued += 1, JOB_STATUS_RUNNING => overview.running += 1, JOB_STATUS_SUCCEEDED => overview.succeeded += 1, JOB_STATUS_FAILED => overview.failed += 1, JOB_STATUS_CANCELLED => overview.cancelled += 1, _ => {} } let entry = grouped .entry(item.worker_name.clone()) .or_insert_with(|| WorkerStats { worker_name: item.worker_name.clone(), job_kind: item.job_kind.clone(), label: label_for(&item.worker_name), queued: 0, running: 0, succeeded: 0, failed: 0, cancelled: 0, last_job_at: None, }); match item.status.as_str() { JOB_STATUS_QUEUED => entry.queued += 1, JOB_STATUS_RUNNING => entry.running += 1, JOB_STATUS_SUCCEEDED => entry.succeeded += 1, JOB_STATUS_FAILED => entry.failed += 1, JOB_STATUS_CANCELLED => entry.cancelled += 1, _ => {} } if entry.last_job_at.is_none() { entry.last_job_at = Some(item.created_at.to_rfc3339()); } } overview.active_jobs = overview.queued + overview.running; overview.worker_stats = grouped.into_values().collect(); Ok(overview) } pub async fn list_jobs(ctx: &AppContext, query: WorkerJobListQuery) -> Result { let mut db_query = worker_jobs::Entity::find().order_by(worker_jobs::Column::CreatedAt, Order::Desc); if let Some(status) = query .status .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) { db_query = db_query.filter(worker_jobs::Column::Status.eq(status)); } if let Some(job_kind) = query .job_kind .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) { db_query = db_query.filter(worker_jobs::Column::JobKind.eq(job_kind)); } if let Some(worker_name) = query .worker_name .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) { db_query = db_query.filter(worker_jobs::Column::WorkerName.eq(worker_name)); } if let Some(search) = query .search .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) { db_query = db_query.filter( Condition::any() .add(worker_jobs::Column::WorkerName.contains(search.clone())) .add(worker_jobs::Column::DisplayName.contains(search.clone())) .add(worker_jobs::Column::RelatedEntityId.contains(search.clone())) .add(worker_jobs::Column::RelatedEntityType.contains(search)), ); } let total = db_query.clone().count(&ctx.db).await?; let limit = query.limit.unwrap_or(120); let items = db_query.limit(limit).all(&ctx.db).await?; Ok(WorkerJobListResult { total, jobs: items.into_iter().map(to_job_record).collect(), }) } pub async fn get_job_record(ctx: &AppContext, id: i32) -> Result { Ok(to_job_record(find_job(ctx, id).await?)) } pub async fn find_latest_job_by_related_entity( ctx: &AppContext, related_entity_type: &str, related_entity_id: &str, worker_name: Option<&str>, ) -> Result> { let mut query = worker_jobs::Entity::find() .filter(worker_jobs::Column::RelatedEntityType.eq(related_entity_type.to_string())) .filter(worker_jobs::Column::RelatedEntityId.eq(related_entity_id.to_string())) .order_by(worker_jobs::Column::CreatedAt, Order::Desc); if let Some(worker_name) = worker_name.map(str::trim).filter(|value| !value.is_empty()) { query = query.filter(worker_jobs::Column::WorkerName.eq(worker_name.to_string())); } Ok(query.one(&ctx.db).await?.map(to_job_record)) } pub async fn begin_job_execution(ctx: &AppContext, id: i32) -> Result { let item = find_job(ctx, id).await?; if item.status == JOB_STATUS_CANCELLED { return Ok(false); } if item.cancel_requested { finish_job_cancelled(ctx, id, Some("job cancelled before execution".to_string())).await?; return Ok(false); } let attempts_count = item.attempts_count + 1; let mut active = item.into_active_model(); active.status = Set(JOB_STATUS_RUNNING.to_string()); active.started_at = Set(Some(now_rfc3339())); active.finished_at = Set(None); active.error_text = Set(None); active.result = Set(None); active.attempts_count = Set(attempts_count); let _ = active.update(&ctx.db).await?; Ok(true) } pub async fn mark_job_succeeded(ctx: &AppContext, id: i32, result: Option) -> Result<()> { let item = find_job(ctx, id).await?; let mut active = item.into_active_model(); active.status = Set(JOB_STATUS_SUCCEEDED.to_string()); active.result = Set(result); active.error_text = Set(None); active.finished_at = Set(Some(now_rfc3339())); active.update(&ctx.db).await?; Ok(()) } pub async fn mark_job_failed(ctx: &AppContext, id: i32, error_text: String) -> Result<()> { let item = find_job(ctx, id).await?; let mut active = item.into_active_model(); active.status = Set(JOB_STATUS_FAILED.to_string()); active.error_text = Set(Some(error_text)); active.finished_at = Set(Some(now_rfc3339())); active.update(&ctx.db).await?; Ok(()) } pub async fn finish_job_cancelled( ctx: &AppContext, id: i32, error_text: Option, ) -> Result<()> { let item = find_job(ctx, id).await?; let mut active = item.into_active_model(); active.status = Set(JOB_STATUS_CANCELLED.to_string()); active.cancel_requested = Set(true); active.finished_at = Set(Some(now_rfc3339())); if error_text.is_some() { active.error_text = Set(error_text); } active.update(&ctx.db).await?; Ok(()) } pub async fn request_cancel(ctx: &AppContext, id: i32) -> Result { let item = find_job(ctx, id).await?; let mut active = item.clone().into_active_model(); active.cancel_requested = Set(true); if item.status == JOB_STATUS_QUEUED { active.status = Set(JOB_STATUS_CANCELLED.to_string()); active.finished_at = Set(Some(now_rfc3339())); active.error_text = Set(Some("job cancelled before start".to_string())); } let updated = active.update(&ctx.db).await?; Ok(to_job_record(updated)) } pub async fn queue_download_job( ctx: &AppContext, args: &DownloadWorkerArgs, requested_by: Option, requested_source: Option, parent_job_id: Option, trigger_mode: Option, ) -> Result { let payload = serde_json::to_value(args)?; let job = create_job( ctx, CreateWorkerJobInput { parent_job_id, job_kind: JOB_KIND_WORKER.to_string(), worker_name: WORKER_DOWNLOAD_MEDIA.to_string(), display_name: Some( args.title .clone() .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| format!("download {}", args.source_url)), ), queue_name: queue_name_for(WORKER_DOWNLOAD_MEDIA), requested_by, requested_source, trigger_mode, payload: Some(payload), tags: Some(tags_for(WORKER_DOWNLOAD_MEDIA)), related_entity_type: Some("media_download".to_string()), related_entity_id: Some(args.source_url.clone()), max_attempts: 1, }, ) .await?; let mut worker_args = args.clone(); worker_args.job_id = Some(job.id); enqueue_download_worker(ctx, worker_args).await?; get_job_record(ctx, job.id).await } pub async fn queue_notification_delivery_job( ctx: &AppContext, delivery_id: i32, requested_by: Option, requested_source: Option, parent_job_id: Option, trigger_mode: Option, ) -> Result { let delivery = notification_deliveries::Entity::find_by_id(delivery_id) .one(&ctx.db) .await? .ok_or(Error::NotFound)?; let base_args = NotificationDeliveryWorkerArgs { delivery_id, job_id: None, }; let payload = serde_json::to_value(&base_args)?; let display_name = format!("{} → {}", delivery.event_type, delivery.target); let job = create_job( ctx, CreateWorkerJobInput { parent_job_id, job_kind: JOB_KIND_WORKER.to_string(), worker_name: WORKER_NOTIFICATION_DELIVERY.to_string(), display_name: Some(display_name), queue_name: queue_name_for(WORKER_NOTIFICATION_DELIVERY), requested_by, requested_source, trigger_mode, payload: Some(payload), tags: Some(tags_for(WORKER_NOTIFICATION_DELIVERY)), related_entity_type: Some("notification_delivery".to_string()), related_entity_id: Some(delivery_id.to_string()), max_attempts: 1, }, ) .await?; let args = NotificationDeliveryWorkerArgs { delivery_id, job_id: Some(job.id), }; enqueue_notification_worker(ctx, args).await?; get_job_record(ctx, job.id).await } pub async fn queue_ai_reindex_job( ctx: &AppContext, requested_by: Option, requested_source: Option, parent_job_id: Option, trigger_mode: Option, ) -> Result { let base_args = AiReindexWorkerArgs { job_id: None }; let payload = serde_json::to_value(&base_args)?; let job = create_job( ctx, CreateWorkerJobInput { parent_job_id, job_kind: JOB_KIND_WORKER.to_string(), worker_name: WORKER_AI_REINDEX.to_string(), display_name: Some("重建 AI 索引".to_string()), queue_name: queue_name_for(WORKER_AI_REINDEX), requested_by, requested_source, trigger_mode, payload: Some(payload), tags: Some(tags_for(WORKER_AI_REINDEX)), related_entity_type: Some("ai_index".to_string()), related_entity_id: Some("site".to_string()), max_attempts: 1, }, ) .await?; enqueue_ai_reindex_worker( ctx, AiReindexWorkerArgs { job_id: Some(job.id), }, ) .await?; get_job_record(ctx, job.id).await } pub async fn spawn_retry_deliveries_task( ctx: &AppContext, limit: Option, requested_by: Option, requested_source: Option, parent_job_id: Option, trigger_mode: Option, ) -> Result { let payload = serde_json::to_value(RetryDeliveriesTaskPayload { limit })?; let job = create_job( ctx, CreateWorkerJobInput { parent_job_id, job_kind: JOB_KIND_TASK.to_string(), worker_name: TASK_RETRY_DELIVERIES.to_string(), display_name: Some("重试待投递通知".to_string()), queue_name: queue_name_for(TASK_RETRY_DELIVERIES), requested_by, requested_source, trigger_mode, payload: Some(payload), tags: Some(tags_for(TASK_RETRY_DELIVERIES)), related_entity_type: Some("notification_delivery".to_string()), related_entity_id: None, max_attempts: 1, }, ) .await?; tokio::spawn(run_retry_deliveries_task(ctx.clone(), job.id, limit)); get_job_record(ctx, job.id).await } pub async fn spawn_digest_task( ctx: &AppContext, period: &str, requested_by: Option, requested_source: Option, parent_job_id: Option, trigger_mode: Option, ) -> Result { let normalized_period = match period.trim().to_ascii_lowercase().as_str() { "monthly" => "monthly", _ => "weekly", } .to_string(); let payload = serde_json::to_value(DigestTaskPayload { period: normalized_period.clone(), })?; let worker_name = if normalized_period == "monthly" { TASK_SEND_MONTHLY_DIGEST } else { TASK_SEND_WEEKLY_DIGEST }; let job = create_job( ctx, CreateWorkerJobInput { parent_job_id, job_kind: JOB_KIND_TASK.to_string(), worker_name: worker_name.to_string(), display_name: Some(if normalized_period == "monthly" { "发送月报".to_string() } else { "发送周报".to_string() }), queue_name: queue_name_for(worker_name), requested_by, requested_source, trigger_mode, payload: Some(payload), tags: Some(tags_for(worker_name)), related_entity_type: Some("subscription_digest".to_string()), related_entity_id: Some(normalized_period.clone()), max_attempts: 1, }, ) .await?; tokio::spawn(run_digest_task(ctx.clone(), job.id, normalized_period)); get_job_record(ctx, job.id).await } pub async fn retry_job( ctx: &AppContext, id: i32, requested_by: Option, requested_source: Option, ) -> Result { let item = find_job(ctx, id).await?; let payload = item.payload.clone().unwrap_or(Value::Null); match item.worker_name.as_str() { WORKER_AI_REINDEX => { let _ = serde_json::from_value::(payload)?; queue_ai_reindex_job( ctx, requested_by, requested_source, Some(item.id), Some("retry".to_string()), ) .await } WORKER_DOWNLOAD_MEDIA => { let args = serde_json::from_value::(payload)?; queue_download_job( ctx, &args, requested_by, requested_source, Some(item.id), Some("retry".to_string()), ) .await } WORKER_NOTIFICATION_DELIVERY => { let args = serde_json::from_value::(payload)?; queue_notification_delivery_job( ctx, args.delivery_id, requested_by, requested_source, Some(item.id), Some("retry".to_string()), ) .await } TASK_RETRY_DELIVERIES => { let args = serde_json::from_value::(payload)?; spawn_retry_deliveries_task( ctx, args.limit, requested_by, requested_source, Some(item.id), Some("retry".to_string()), ) .await } TASK_SEND_WEEKLY_DIGEST | TASK_SEND_MONTHLY_DIGEST => { let args = serde_json::from_value::(payload)?; spawn_digest_task( ctx, &args.period, requested_by, requested_source, Some(item.id), Some("retry".to_string()), ) .await } _ => Err(Error::BadRequest(format!( "不支持重试任务:{}", item.worker_name ))), } }