feat: add worker operations and fix gitea actions
Some checks failed
docker-images / build-and-push (admin, admin, termi-astro-admin, admin/Dockerfile) (push) Successful in 29s
docker-images / build-and-push (backend, backend, termi-astro-backend, backend/Dockerfile) (push) Successful in 33m13s
docker-images / build-and-push (frontend, frontend, termi-astro-frontend, frontend/Dockerfile) (push) Successful in 58s
ui-regression / playwright-regression (push) Failing after 13m24s

This commit is contained in:
2026-04-02 03:43:37 +08:00
parent ee0bec4a78
commit a516be2e91
37 changed files with 3890 additions and 879 deletions

View File

@@ -0,0 +1,835 @@
use chrono::Utc;
use loco_rs::{
bgworker::BackgroundWorker,
prelude::*,
};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, Order,
PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::{
models::_entities::{notification_deliveries, worker_jobs},
services::subscriptions,
workers::{
downloader::{DownloadWorker, DownloadWorkerArgs},
notification_delivery::{NotificationDeliveryWorker, NotificationDeliveryWorkerArgs},
},
};
/// Job kind for queue-backed workers that process a serialized payload.
pub const JOB_KIND_WORKER: &str = "worker";
/// Job kind for one-shot maintenance / digest tasks run in-process.
pub const JOB_KIND_TASK: &str = "task";
// Lifecycle states stored in `worker_jobs.status`.
pub const JOB_STATUS_QUEUED: &str = "queued";
pub const JOB_STATUS_RUNNING: &str = "running";
pub const JOB_STATUS_SUCCEEDED: &str = "succeeded";
pub const JOB_STATUS_FAILED: &str = "failed";
pub const JOB_STATUS_CANCELLED: &str = "cancelled";
// Canonical identifiers stored in `worker_jobs.worker_name`.
pub const WORKER_DOWNLOAD_MEDIA: &str = "worker.download_media";
pub const WORKER_NOTIFICATION_DELIVERY: &str = "worker.notification_delivery";
pub const TASK_RETRY_DELIVERIES: &str = "task.retry_deliveries";
pub const TASK_SEND_WEEKLY_DIGEST: &str = "task.send_weekly_digest";
pub const TASK_SEND_MONTHLY_DIGEST: &str = "task.send_monthly_digest";
/// Optional filters accepted by `list_jobs`; blank/whitespace values are ignored.
#[derive(Clone, Debug, Default)]
pub struct WorkerJobListQuery {
    pub status: Option<String>,
    pub job_kind: Option<String>,
    pub worker_name: Option<String>,
    // Substring match across worker name, display name, and related-entity columns.
    pub search: Option<String>,
    // Maximum rows to return; `list_jobs` defaults this to 120 when unset.
    pub limit: Option<u64>,
}
/// UI metadata describing one dispatchable worker or task (see `catalog_entries`).
#[derive(Clone, Debug, Serialize)]
pub struct WorkerCatalogEntry {
    pub worker_name: String,
    pub job_kind: String,
    // Human-readable label / description (currently Chinese UI strings).
    pub label: String,
    pub description: String,
    pub queue_name: Option<String>,
    pub supports_cancel: bool,
    pub supports_retry: bool,
}
/// Per-worker status counters aggregated by `get_overview`.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerStats {
    pub worker_name: String,
    pub job_kind: String,
    pub label: String,
    pub queued: usize,
    pub running: usize,
    pub succeeded: usize,
    pub failed: usize,
    pub cancelled: usize,
    // RFC 3339 timestamp of this worker's most recent job, if any.
    pub last_job_at: Option<String>,
}
/// Dashboard summary: global status counters plus per-worker stats and the
/// static worker catalog.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerOverview {
    pub total_jobs: usize,
    pub queued: usize,
    pub running: usize,
    pub succeeded: usize,
    pub failed: usize,
    pub cancelled: usize,
    // queued + running.
    pub active_jobs: usize,
    pub worker_stats: Vec<WorkerStats>,
    pub catalog: Vec<WorkerCatalogEntry>,
}
/// API-facing view of a `worker_jobs` row; built by `to_job_record`.
///
/// `can_cancel` / `can_retry` are derived from `status` and
/// `cancel_requested` rather than stored in the database.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerJobRecord {
    // RFC 3339 timestamps.
    pub created_at: String,
    pub updated_at: String,
    pub id: i32,
    // Set when this job was spawned as a retry of another job.
    pub parent_job_id: Option<i32>,
    pub job_kind: String,
    pub worker_name: String,
    pub display_name: Option<String>,
    pub status: String,
    pub queue_name: Option<String>,
    pub requested_by: Option<String>,
    pub requested_source: Option<String>,
    pub trigger_mode: Option<String>,
    // Serialized worker/task arguments, replayed by `retry_job`.
    pub payload: Option<Value>,
    pub result: Option<Value>,
    pub error_text: Option<String>,
    pub tags: Option<Value>,
    pub related_entity_type: Option<String>,
    pub related_entity_id: Option<String>,
    pub attempts_count: i32,
    pub max_attempts: i32,
    pub cancel_requested: bool,
    pub queued_at: Option<String>,
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
    // Derived action flags (not persisted).
    pub can_cancel: bool,
    pub can_retry: bool,
}
/// Result page for `list_jobs`: `total` counts all matches, `jobs` is capped
/// by the query limit.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerJobListResult {
    pub total: u64,
    pub jobs: Vec<WorkerJobRecord>,
}
/// Response payload returned after dispatching a task from the admin UI.
#[derive(Clone, Debug, Serialize)]
pub struct WorkerTaskDispatchResult {
    pub queued: bool,
    pub job: WorkerJobRecord,
}
/// Internal input for `create_job`; free-text fields are trimmed on insert
/// and `max_attempts` is clamped to at least 1.
#[derive(Clone, Debug)]
struct CreateWorkerJobInput {
    parent_job_id: Option<i32>,
    job_kind: String,
    worker_name: String,
    display_name: Option<String>,
    queue_name: Option<String>,
    requested_by: Option<String>,
    requested_source: Option<String>,
    trigger_mode: Option<String>,
    payload: Option<Value>,
    tags: Option<Value>,
    related_entity_type: Option<String>,
    related_entity_id: Option<String>,
    max_attempts: i32,
}
/// Stored payload for `task.retry_deliveries`; `limit` caps how many
/// deliveries are re-queued per run (defaults to 60 at execution time).
#[derive(Clone, Debug, Deserialize, Serialize)]
struct RetryDeliveriesTaskPayload {
    #[serde(default)]
    limit: Option<u64>,
}
/// Stored payload for digest tasks; `period` is normalized to
/// "weekly" or "monthly" before persisting.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct DigestTaskPayload {
    period: String,
}
/// Current UTC time serialized as an RFC 3339 string, used for job timestamps.
fn now_rfc3339() -> String {
    let now = Utc::now();
    now.to_rfc3339()
}
/// Normalize optional user input: trim surrounding whitespace and treat
/// empty (or all-whitespace) strings as absent.
fn trim_to_option(value: Option<String>) -> Option<String> {
    value
        .map(|raw| raw.trim().to_string())
        .filter(|trimmed| !trimmed.is_empty())
}
/// Logical queue name for a known worker/task; unknown names have no queue.
fn queue_name_for(worker_name: &str) -> Option<String> {
    let queue = match worker_name {
        WORKER_DOWNLOAD_MEDIA => "media",
        WORKER_NOTIFICATION_DELIVERY => "notifications",
        TASK_RETRY_DELIVERIES => "maintenance",
        TASK_SEND_WEEKLY_DIGEST | TASK_SEND_MONTHLY_DIGEST => "digests",
        _ => return None,
    };
    Some(queue.to_string())
}
/// Human-readable label for a worker/task; falls back to the raw name.
fn label_for(worker_name: &str) -> String {
    let label = match worker_name {
        WORKER_DOWNLOAD_MEDIA => "远程媒体下载",
        WORKER_NOTIFICATION_DELIVERY => "通知投递",
        TASK_RETRY_DELIVERIES => "重试待投递通知",
        TASK_SEND_WEEKLY_DIGEST => "发送周报",
        TASK_SEND_MONTHLY_DIGEST => "发送月报",
        other => other,
    };
    label.to_string()
}
/// Catalog description shown in the admin UI; generic text for unknown names.
fn description_for(worker_name: &str) -> String {
    let description = match worker_name {
        WORKER_DOWNLOAD_MEDIA => "抓取远程图片 / PDF 到媒体库,并回写媒体元数据。",
        WORKER_NOTIFICATION_DELIVERY => "执行订阅通知、测试通知与 digest 投递。",
        TASK_RETRY_DELIVERIES => "扫描 retry_pending 的通知记录并重新入队。",
        TASK_SEND_WEEKLY_DIGEST => "根据近期内容生成周报,并为活跃订阅目标入队。",
        TASK_SEND_MONTHLY_DIGEST => "根据近期内容生成月报,并为活跃订阅目标入队。",
        _ => "后台异步任务。",
    };
    description.to_string()
}
/// JSON tag array stored on the job row for filtering in the UI.
fn tags_for(worker_name: &str) -> Value {
    let tags: &[&str] = match worker_name {
        WORKER_DOWNLOAD_MEDIA => &["media", "download"],
        WORKER_NOTIFICATION_DELIVERY => &["notifications", "delivery"],
        TASK_RETRY_DELIVERIES => &["maintenance", "retry"],
        TASK_SEND_WEEKLY_DIGEST => &["digest", "weekly"],
        TASK_SEND_MONTHLY_DIGEST => &["digest", "monthly"],
        _ => &[],
    };
    json!(tags)
}
fn can_cancel_status(status: &str, cancel_requested: bool) -> bool {
!cancel_requested && matches!(status, JOB_STATUS_QUEUED | JOB_STATUS_RUNNING)
}
fn can_retry_status(status: &str) -> bool {
matches!(status, JOB_STATUS_FAILED | JOB_STATUS_CANCELLED | JOB_STATUS_SUCCEEDED)
}
/// Convert a `worker_jobs` row into its API representation, deriving the
/// `can_cancel` / `can_retry` action flags from the current status.
fn to_job_record(item: worker_jobs::Model) -> WorkerJobRecord {
    // Compute the derived flags before the fields are moved out of `item`,
    // which also avoids cloning the status string.
    let can_cancel = can_cancel_status(&item.status, item.cancel_requested);
    let can_retry = can_retry_status(&item.status);
    WorkerJobRecord {
        created_at: item.created_at.to_rfc3339(),
        updated_at: item.updated_at.to_rfc3339(),
        id: item.id,
        parent_job_id: item.parent_job_id,
        job_kind: item.job_kind,
        worker_name: item.worker_name,
        display_name: item.display_name,
        status: item.status,
        queue_name: item.queue_name,
        requested_by: item.requested_by,
        requested_source: item.requested_source,
        trigger_mode: item.trigger_mode,
        payload: item.payload,
        result: item.result,
        error_text: item.error_text,
        tags: item.tags,
        related_entity_type: item.related_entity_type,
        related_entity_id: item.related_entity_id,
        attempts_count: item.attempts_count,
        max_attempts: item.max_attempts,
        cancel_requested: item.cancel_requested,
        queued_at: item.queued_at,
        started_at: item.started_at,
        finished_at: item.finished_at,
        can_cancel,
        can_retry,
    }
}
/// Static catalog of every dispatchable worker/task with its UI metadata.
fn catalog_entries() -> Vec<WorkerCatalogEntry> {
    // (worker_name, job_kind, supports_cancel, supports_retry)
    let specs = [
        (WORKER_DOWNLOAD_MEDIA, JOB_KIND_WORKER, true, true),
        (WORKER_NOTIFICATION_DELIVERY, JOB_KIND_WORKER, true, true),
        (TASK_RETRY_DELIVERIES, JOB_KIND_TASK, true, true),
        (TASK_SEND_WEEKLY_DIGEST, JOB_KIND_TASK, true, true),
        (TASK_SEND_MONTHLY_DIGEST, JOB_KIND_TASK, true, true),
    ];
    let mut entries = Vec::with_capacity(specs.len());
    for (worker_name, job_kind, supports_cancel, supports_retry) in specs {
        entries.push(WorkerCatalogEntry {
            worker_name: worker_name.to_string(),
            job_kind: job_kind.to_string(),
            label: label_for(worker_name),
            description: description_for(worker_name),
            queue_name: queue_name_for(worker_name),
            supports_cancel,
            supports_retry,
        });
    }
    entries
}
/// Insert a new `worker_jobs` row in the `queued` state.
///
/// Free-text fields are normalized via `trim_to_option` (blank strings become
/// NULL), `max_attempts` is clamped to at least 1, and `queued_at` is stamped
/// with the current UTC time. Returns the freshly inserted model.
async fn create_job(ctx: &AppContext, input: CreateWorkerJobInput) -> Result<worker_jobs::Model> {
    Ok(worker_jobs::ActiveModel {
        parent_job_id: Set(input.parent_job_id),
        job_kind: Set(input.job_kind),
        worker_name: Set(input.worker_name),
        display_name: Set(trim_to_option(input.display_name)),
        status: Set(JOB_STATUS_QUEUED.to_string()),
        queue_name: Set(trim_to_option(input.queue_name)),
        requested_by: Set(trim_to_option(input.requested_by)),
        requested_source: Set(trim_to_option(input.requested_source)),
        trigger_mode: Set(trim_to_option(input.trigger_mode)),
        payload: Set(input.payload),
        // Result/error are only populated once the job finishes.
        result: Set(None),
        error_text: Set(None),
        tags: Set(input.tags),
        related_entity_type: Set(trim_to_option(input.related_entity_type)),
        related_entity_id: Set(trim_to_option(input.related_entity_id)),
        attempts_count: Set(0),
        // Guard against callers passing 0 or negative attempt budgets.
        max_attempts: Set(input.max_attempts.max(1)),
        cancel_requested: Set(false),
        queued_at: Set(Some(now_rfc3339())),
        started_at: Set(None),
        finished_at: Set(None),
        ..Default::default()
    }
    .insert(&ctx.db)
    .await?)
}
async fn find_job(ctx: &AppContext, id: i32) -> Result<worker_jobs::Model> {
worker_jobs::Entity::find_by_id(id)
.one(&ctx.db)
.await?
.ok_or(Error::NotFound)
}
/// In-process fallback execution of the download worker (used when no queue
/// provider is available); failures are logged, not propagated.
async fn dispatch_download(args_ctx: AppContext, args: DownloadWorkerArgs) {
    let outcome = DownloadWorker::build(&args_ctx).perform(args).await;
    if let Err(error) = outcome {
        tracing::warn!("download worker execution failed: {error}");
    }
}
/// In-process fallback execution of the notification delivery worker;
/// failures are logged, not propagated.
async fn dispatch_notification_delivery(args_ctx: AppContext, args: NotificationDeliveryWorkerArgs) {
    let outcome = NotificationDeliveryWorker::build(&args_ctx).perform(args).await;
    if let Err(error) = outcome {
        tracing::warn!("notification delivery worker execution failed: {error}");
    }
}
/// Enqueue the download worker on the background queue, falling back to a
/// local tokio task when the queue provider is missing (silently) or when
/// enqueueing fails (with a warning).
async fn enqueue_download_worker(ctx: &AppContext, args: DownloadWorkerArgs) -> Result<()> {
    match DownloadWorker::perform_later(ctx, args.clone()).await {
        Ok(_) => {}
        Err(Error::QueueProviderMissing) => {
            // Expected in queue-less deployments; no warning needed.
            tokio::spawn(dispatch_download(ctx.clone(), args));
        }
        Err(error) => {
            tracing::warn!("download worker queue unavailable, falling back to local task: {error}");
            tokio::spawn(dispatch_download(ctx.clone(), args));
        }
    }
    Ok(())
}
/// Enqueue the notification delivery worker on the background queue, falling
/// back to a local tokio task when the queue provider is missing (silently)
/// or when enqueueing fails (with a warning).
async fn enqueue_notification_worker(
    ctx: &AppContext,
    args: NotificationDeliveryWorkerArgs,
) -> Result<()> {
    match NotificationDeliveryWorker::perform_later(ctx, args.clone()).await {
        Ok(_) => {}
        Err(Error::QueueProviderMissing) => {
            // Expected in queue-less deployments; no warning needed.
            tokio::spawn(dispatch_notification_delivery(ctx.clone(), args));
        }
        Err(error) => {
            tracing::warn!("notification worker queue unavailable, falling back to local task: {error}");
            tokio::spawn(dispatch_notification_delivery(ctx.clone(), args));
        }
    }
    Ok(())
}
/// In-process executor for `task.retry_deliveries`: re-queues due deliveries
/// and records the outcome on the tracking job `job_id`.
async fn run_retry_deliveries_task(ctx: AppContext, job_id: i32, limit: Option<u64>) {
    // Move the job to `running`; bail out if it was cancelled or the
    // transition itself failed.
    match begin_job_execution(&ctx, job_id).await {
        Ok(true) => {}
        Ok(false) => return,
        Err(error) => {
            tracing::warn!("failed to start retry deliveries job #{job_id}: {error}");
            return;
        }
    }
    let effective_limit = limit.unwrap_or(60);
    let outcome = match subscriptions::retry_due_deliveries(&ctx, effective_limit).await {
        Ok(queued) => {
            mark_job_succeeded(
                &ctx,
                job_id,
                Some(json!({
                    "limit": effective_limit,
                    "queued": queued,
                })),
            )
            .await
        }
        Err(error) => Err(error),
    };
    if let Err(error) = outcome {
        let _ = mark_job_failed(&ctx, job_id, error.to_string()).await;
    }
}
/// In-process executor for digest tasks: sends the digest for `period` and
/// records the summary (or failure) on the tracking job `job_id`.
async fn run_digest_task(ctx: AppContext, job_id: i32, period: String) {
    // Move the job to `running`; bail out if it was cancelled or the
    // transition itself failed.
    match begin_job_execution(&ctx, job_id).await {
        Ok(true) => {}
        Ok(false) => return,
        Err(error) => {
            tracing::warn!("failed to start digest job #{job_id}: {error}");
            return;
        }
    }
    let outcome = match subscriptions::send_digest(&ctx, &period).await {
        Ok(summary) => {
            mark_job_succeeded(
                &ctx,
                job_id,
                Some(json!({
                    "period": summary.period,
                    "post_count": summary.post_count,
                    "queued": summary.queued,
                    "skipped": summary.skipped,
                })),
            )
            .await
        }
        Err(error) => Err(error),
    };
    if let Err(error) = outcome {
        let _ = mark_job_failed(&ctx, job_id, error.to_string()).await;
    }
}
/// Build the dashboard overview: global status counters, per-worker stats,
/// and the static worker catalog.
///
/// NOTE(review): this loads every job row and aggregates in memory; fine for
/// modest tables, but consider a SQL GROUP BY if the table grows large.
pub async fn get_overview(ctx: &AppContext) -> Result<WorkerOverview> {
    let jobs = worker_jobs::Entity::find()
        .order_by(worker_jobs::Column::CreatedAt, Order::Desc)
        .all(&ctx.db)
        .await?;
    let mut overview = WorkerOverview {
        total_jobs: jobs.len(),
        queued: 0,
        running: 0,
        succeeded: 0,
        failed: 0,
        cancelled: 0,
        active_jobs: 0,
        worker_stats: Vec::new(),
        catalog: catalog_entries(),
    };
    // BTreeMap keeps worker stats sorted by worker name.
    let mut per_worker = std::collections::BTreeMap::<String, WorkerStats>::new();
    for job in jobs {
        // Global counters.
        match job.status.as_str() {
            JOB_STATUS_QUEUED => overview.queued += 1,
            JOB_STATUS_RUNNING => overview.running += 1,
            JOB_STATUS_SUCCEEDED => overview.succeeded += 1,
            JOB_STATUS_FAILED => overview.failed += 1,
            JOB_STATUS_CANCELLED => overview.cancelled += 1,
            _ => {}
        }
        let stats = per_worker.entry(job.worker_name.clone()).or_insert_with(|| WorkerStats {
            worker_name: job.worker_name.clone(),
            job_kind: job.job_kind.clone(),
            label: label_for(&job.worker_name),
            queued: 0,
            running: 0,
            succeeded: 0,
            failed: 0,
            cancelled: 0,
            last_job_at: None,
        });
        // Per-worker counters.
        match job.status.as_str() {
            JOB_STATUS_QUEUED => stats.queued += 1,
            JOB_STATUS_RUNNING => stats.running += 1,
            JOB_STATUS_SUCCEEDED => stats.succeeded += 1,
            JOB_STATUS_FAILED => stats.failed += 1,
            JOB_STATUS_CANCELLED => stats.cancelled += 1,
            _ => {}
        }
        // Rows are ordered newest-first, so the first row seen per worker
        // carries its most recent job timestamp.
        if stats.last_job_at.is_none() {
            stats.last_job_at = Some(job.created_at.to_rfc3339());
        }
    }
    overview.active_jobs = overview.queued + overview.running;
    overview.worker_stats = per_worker.into_values().collect();
    Ok(overview)
}
/// List jobs newest-first with optional status/kind/name/search filters.
///
/// `total` counts every matching row; `jobs` is capped at `query.limit`
/// (default 120).
pub async fn list_jobs(ctx: &AppContext, query: WorkerJobListQuery) -> Result<WorkerJobListResult> {
    // Trim each optional filter and drop it entirely when blank.
    let normalize = |value: Option<String>| {
        value
            .map(|item| item.trim().to_string())
            .filter(|item| !item.is_empty())
    };
    let mut db_query = worker_jobs::Entity::find().order_by(worker_jobs::Column::CreatedAt, Order::Desc);
    if let Some(status) = normalize(query.status) {
        db_query = db_query.filter(worker_jobs::Column::Status.eq(status));
    }
    if let Some(job_kind) = normalize(query.job_kind) {
        db_query = db_query.filter(worker_jobs::Column::JobKind.eq(job_kind));
    }
    if let Some(worker_name) = normalize(query.worker_name) {
        db_query = db_query.filter(worker_jobs::Column::WorkerName.eq(worker_name));
    }
    if let Some(search) = normalize(query.search) {
        // Substring match across name / display / related-entity columns.
        db_query = db_query.filter(
            Condition::any()
                .add(worker_jobs::Column::WorkerName.contains(search.clone()))
                .add(worker_jobs::Column::DisplayName.contains(search.clone()))
                .add(worker_jobs::Column::RelatedEntityId.contains(search.clone()))
                .add(worker_jobs::Column::RelatedEntityType.contains(search)),
        );
    }
    // Count before applying the limit so `total` reflects all matches.
    let total = db_query.clone().count(&ctx.db).await?;
    let limit = query.limit.unwrap_or(120);
    let items = db_query.limit(limit).all(&ctx.db).await?;
    Ok(WorkerJobListResult {
        total,
        jobs: items.into_iter().map(to_job_record).collect(),
    })
}
/// Fetch one job by id as its API representation (`Error::NotFound` if missing).
pub async fn get_job_record(ctx: &AppContext, id: i32) -> Result<WorkerJobRecord> {
    let job = find_job(ctx, id).await?;
    Ok(to_job_record(job))
}
/// Find the most recent job linked to a given related entity, optionally
/// narrowed to a single worker name (blank names are ignored).
pub async fn find_latest_job_by_related_entity(
    ctx: &AppContext,
    related_entity_type: &str,
    related_entity_id: &str,
    worker_name: Option<&str>,
) -> Result<Option<WorkerJobRecord>> {
    let mut db_query = worker_jobs::Entity::find()
        .filter(worker_jobs::Column::RelatedEntityType.eq(related_entity_type.to_string()))
        .filter(worker_jobs::Column::RelatedEntityId.eq(related_entity_id.to_string()))
        .order_by(worker_jobs::Column::CreatedAt, Order::Desc);
    let name_filter = worker_name.map(str::trim).filter(|value| !value.is_empty());
    if let Some(name) = name_filter {
        db_query = db_query.filter(worker_jobs::Column::WorkerName.eq(name.to_string()));
    }
    let latest = db_query.one(&ctx.db).await?;
    Ok(latest.map(to_job_record))
}
/// Transition a job to `running`, incrementing its attempt counter.
///
/// Returns `Ok(false)` without running when the job is already cancelled, or
/// when a cancel was requested before execution started (in which case the
/// job is finalised as cancelled). Returns `Ok(true)` once the row is updated.
///
/// NOTE(review): the cancel check and the status update are separate queries,
/// so a cancel arriving between them can be missed — confirm whether this
/// read-modify-write needs to be atomic for your deployment.
pub async fn begin_job_execution(ctx: &AppContext, id: i32) -> Result<bool> {
    let item = find_job(ctx, id).await?;
    if item.status == JOB_STATUS_CANCELLED {
        return Ok(false);
    }
    if item.cancel_requested {
        finish_job_cancelled(ctx, id, Some("job cancelled before execution".to_string())).await?;
        return Ok(false);
    }
    let attempts_count = item.attempts_count + 1;
    let mut active = item.into_active_model();
    active.status = Set(JOB_STATUS_RUNNING.to_string());
    active.started_at = Set(Some(now_rfc3339()));
    active.finished_at = Set(None);
    // Clear any stale result/error left over from a previous attempt.
    active.error_text = Set(None);
    active.result = Set(None);
    active.attempts_count = Set(attempts_count);
    let _ = active.update(&ctx.db).await?;
    Ok(true)
}
/// Finalize a job as succeeded, storing its optional result JSON and
/// clearing any previous error text.
pub async fn mark_job_succeeded(ctx: &AppContext, id: i32, result: Option<Value>) -> Result<()> {
    let mut active = find_job(ctx, id).await?.into_active_model();
    active.status = Set(JOB_STATUS_SUCCEEDED.to_string());
    active.result = Set(result);
    active.error_text = Set(None);
    active.finished_at = Set(Some(now_rfc3339()));
    active.update(&ctx.db).await?;
    Ok(())
}
/// Finalize a job as failed, recording the error message.
pub async fn mark_job_failed(ctx: &AppContext, id: i32, error_text: String) -> Result<()> {
    let mut active = find_job(ctx, id).await?.into_active_model();
    active.status = Set(JOB_STATUS_FAILED.to_string());
    active.error_text = Set(Some(error_text));
    active.finished_at = Set(Some(now_rfc3339()));
    active.update(&ctx.db).await?;
    Ok(())
}
/// Finalize a job as cancelled; an optional message overwrites `error_text`
/// (an existing message is kept when `None` is passed).
pub async fn finish_job_cancelled(
    ctx: &AppContext,
    id: i32,
    error_text: Option<String>,
) -> Result<()> {
    let mut active = find_job(ctx, id).await?.into_active_model();
    active.status = Set(JOB_STATUS_CANCELLED.to_string());
    active.cancel_requested = Set(true);
    active.finished_at = Set(Some(now_rfc3339()));
    if let Some(message) = error_text {
        active.error_text = Set(Some(message));
    }
    active.update(&ctx.db).await?;
    Ok(())
}
/// Flag a job for cancellation. Jobs still in the queue are finalised as
/// cancelled immediately; running jobs keep the flag for the worker to honor.
pub async fn request_cancel(ctx: &AppContext, id: i32) -> Result<WorkerJobRecord> {
    let item = find_job(ctx, id).await?;
    let still_queued = item.status == JOB_STATUS_QUEUED;
    let mut active = item.into_active_model();
    active.cancel_requested = Set(true);
    if still_queued {
        active.status = Set(JOB_STATUS_CANCELLED.to_string());
        active.finished_at = Set(Some(now_rfc3339()));
        active.error_text = Set(Some("job cancelled before start".to_string()));
    }
    let updated = active.update(&ctx.db).await?;
    Ok(to_job_record(updated))
}
/// Create a tracking job for a media download and enqueue the download worker.
///
/// The worker args are persisted as the job payload so `retry_job` can replay
/// them, and the new job id is stamped into the args so the worker can report
/// progress back onto the tracking row.
pub async fn queue_download_job(
    ctx: &AppContext,
    args: &DownloadWorkerArgs,
    requested_by: Option<String>,
    requested_source: Option<String>,
    parent_job_id: Option<i32>,
    trigger_mode: Option<String>,
) -> Result<WorkerJobRecord> {
    let payload = serde_json::to_value(args)?;
    // Prefer the caller-supplied title; fall back to a URL-derived name.
    let display_name = args
        .title
        .clone()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| format!("download {}", args.source_url));
    let input = CreateWorkerJobInput {
        parent_job_id,
        job_kind: JOB_KIND_WORKER.to_string(),
        worker_name: WORKER_DOWNLOAD_MEDIA.to_string(),
        display_name: Some(display_name),
        queue_name: queue_name_for(WORKER_DOWNLOAD_MEDIA),
        requested_by,
        requested_source,
        trigger_mode,
        payload: Some(payload),
        tags: Some(tags_for(WORKER_DOWNLOAD_MEDIA)),
        related_entity_type: Some("media_download".to_string()),
        related_entity_id: Some(args.source_url.clone()),
        max_attempts: 1,
    };
    let job = create_job(ctx, input).await?;
    let mut worker_args = args.clone();
    worker_args.job_id = Some(job.id);
    enqueue_download_worker(ctx, worker_args).await?;
    get_job_record(ctx, job.id).await
}
/// Create a tracking job for a notification delivery and enqueue the worker.
///
/// Looks up the delivery row (`Error::NotFound` if missing), records a
/// `worker_jobs` row whose payload is the worker args, then enqueues the
/// delivery worker with the new job id stamped in so it can report back.
pub async fn queue_notification_delivery_job(
    ctx: &AppContext,
    delivery_id: i32,
    requested_by: Option<String>,
    requested_source: Option<String>,
    parent_job_id: Option<i32>,
    trigger_mode: Option<String>,
) -> Result<WorkerJobRecord> {
    let delivery = notification_deliveries::Entity::find_by_id(delivery_id)
        .one(&ctx.db)
        .await?
        .ok_or(Error::NotFound)?;
    let base_args = NotificationDeliveryWorkerArgs {
        delivery_id,
        job_id: None,
    };
    let payload = serde_json::to_value(&base_args)?;
    // Fix: separate event type and target with a space; the previous
    // `format!("{}{}")` fused them into one unreadable token in the UI.
    let display_name = format!("{} {}", delivery.event_type, delivery.target);
    let job = create_job(
        ctx,
        CreateWorkerJobInput {
            parent_job_id,
            job_kind: JOB_KIND_WORKER.to_string(),
            worker_name: WORKER_NOTIFICATION_DELIVERY.to_string(),
            display_name: Some(display_name),
            queue_name: queue_name_for(WORKER_NOTIFICATION_DELIVERY),
            requested_by,
            requested_source,
            trigger_mode,
            payload: Some(payload),
            tags: Some(tags_for(WORKER_NOTIFICATION_DELIVERY)),
            related_entity_type: Some("notification_delivery".to_string()),
            related_entity_id: Some(delivery_id.to_string()),
            max_attempts: 1,
        },
    )
    .await?;
    let args = NotificationDeliveryWorkerArgs {
        delivery_id,
        job_id: Some(job.id),
    };
    enqueue_notification_worker(ctx, args).await?;
    get_job_record(ctx, job.id).await
}
/// Create a tracking job for `task.retry_deliveries` and run it in-process.
///
/// The spawned task transitions the job through running/succeeded/failed
/// itself; this function returns as soon as the job row exists.
pub async fn spawn_retry_deliveries_task(
    ctx: &AppContext,
    limit: Option<u64>,
    requested_by: Option<String>,
    requested_source: Option<String>,
    parent_job_id: Option<i32>,
    trigger_mode: Option<String>,
) -> Result<WorkerJobRecord> {
    let payload = serde_json::to_value(RetryDeliveriesTaskPayload { limit })?;
    let input = CreateWorkerJobInput {
        parent_job_id,
        job_kind: JOB_KIND_TASK.to_string(),
        worker_name: TASK_RETRY_DELIVERIES.to_string(),
        display_name: Some("重试待投递通知".to_string()),
        queue_name: queue_name_for(TASK_RETRY_DELIVERIES),
        requested_by,
        requested_source,
        trigger_mode,
        payload: Some(payload),
        tags: Some(tags_for(TASK_RETRY_DELIVERIES)),
        related_entity_type: Some("notification_delivery".to_string()),
        related_entity_id: None,
        max_attempts: 1,
    };
    let job = create_job(ctx, input).await?;
    tokio::spawn(run_retry_deliveries_task(ctx.clone(), job.id, limit));
    get_job_record(ctx, job.id).await
}
/// Create a tracking job for a digest send and run it in-process.
///
/// `period` is normalized case-insensitively: anything other than "monthly"
/// falls back to "weekly". The spawned task records its own outcome.
pub async fn spawn_digest_task(
    ctx: &AppContext,
    period: &str,
    requested_by: Option<String>,
    requested_source: Option<String>,
    parent_job_id: Option<i32>,
    trigger_mode: Option<String>,
) -> Result<WorkerJobRecord> {
    let is_monthly = period.trim().eq_ignore_ascii_case("monthly");
    let normalized_period = if is_monthly { "monthly" } else { "weekly" }.to_string();
    let worker_name = if is_monthly {
        TASK_SEND_MONTHLY_DIGEST
    } else {
        TASK_SEND_WEEKLY_DIGEST
    };
    let display_name = if is_monthly { "发送月报" } else { "发送周报" };
    let payload = serde_json::to_value(DigestTaskPayload {
        period: normalized_period.clone(),
    })?;
    let input = CreateWorkerJobInput {
        parent_job_id,
        job_kind: JOB_KIND_TASK.to_string(),
        worker_name: worker_name.to_string(),
        display_name: Some(display_name.to_string()),
        queue_name: queue_name_for(worker_name),
        requested_by,
        requested_source,
        trigger_mode,
        payload: Some(payload),
        tags: Some(tags_for(worker_name)),
        related_entity_type: Some("subscription_digest".to_string()),
        related_entity_id: Some(normalized_period.clone()),
        max_attempts: 1,
    };
    let job = create_job(ctx, input).await?;
    tokio::spawn(run_digest_task(ctx.clone(), job.id, normalized_period));
    get_job_record(ctx, job.id).await
}
pub async fn retry_job(
ctx: &AppContext,
id: i32,
requested_by: Option<String>,
requested_source: Option<String>,
) -> Result<WorkerJobRecord> {
let item = find_job(ctx, id).await?;
let payload = item.payload.clone().unwrap_or(Value::Null);
match item.worker_name.as_str() {
WORKER_DOWNLOAD_MEDIA => {
let args = serde_json::from_value::<DownloadWorkerArgs>(payload)?;
queue_download_job(
ctx,
&args,
requested_by,
requested_source,
Some(item.id),
Some("retry".to_string()),
)
.await
}
WORKER_NOTIFICATION_DELIVERY => {
let args = serde_json::from_value::<NotificationDeliveryWorkerArgs>(payload)?;
queue_notification_delivery_job(
ctx,
args.delivery_id,
requested_by,
requested_source,
Some(item.id),
Some("retry".to_string()),
)
.await
}
TASK_RETRY_DELIVERIES => {
let args = serde_json::from_value::<RetryDeliveriesTaskPayload>(payload)?;
spawn_retry_deliveries_task(
ctx,
args.limit,
requested_by,
requested_source,
Some(item.id),
Some("retry".to_string()),
)
.await
}
TASK_SEND_WEEKLY_DIGEST | TASK_SEND_MONTHLY_DIGEST => {
let args = serde_json::from_value::<DigestTaskPayload>(payload)?;
spawn_digest_task(
ctx,
&args.period,
requested_by,
requested_source,
Some(item.id),
Some("retry".to_string()),
)
.await
}
_ => Err(Error::BadRequest(format!("不支持重试任务:{}", item.worker_name))),
}
}