Fix web push delivery handling and worker console
Some checks failed
docker-images / resolve-build-targets (push) Successful in 5s
docker-images / build-and-push (admin) (push) Successful in 30s
docker-images / submit-indexnow (push) Has been cancelled
docker-images / build-and-push (frontend) (push) Has been cancelled
docker-images / build-and-push (backend) (push) Has been cancelled

This commit is contained in:
2026-04-04 04:15:20 +08:00
parent ab18bbaf23
commit 381dc9b854
19 changed files with 1607 additions and 747 deletions

View File

@@ -2537,6 +2537,18 @@ async fn update_reindex_job_progress(
.await
}
async fn stop_reindex_if_cancel_requested(ctx: &AppContext, job_id: Option<i32>) -> Result<()> {
let Some(job_id) = job_id else {
return Ok(());
};
if worker_jobs::cancel_job_if_requested(ctx, job_id, "job cancelled during reindex").await? {
return Err(Error::BadRequest("job cancelled".to_string()));
}
Ok(())
}
async fn load_runtime_settings(
ctx: &AppContext,
require_enabled: bool,
@@ -2729,6 +2741,7 @@ pub async fn rebuild_index(ctx: &AppContext, job_id: Option<i32>) -> Result<AiIn
batch_size,
);
update_reindex_job_progress(ctx, job_id, &preparing_progress).await?;
stop_reindex_if_cancel_requested(ctx, job_id).await?;
let txn = ctx.db.begin().await?;
txn.execute(Statement::from_string(
@@ -2740,6 +2753,7 @@ pub async fn rebuild_index(ctx: &AppContext, job_id: Option<i32>) -> Result<AiIn
let mut processed_chunks = 0usize;
for chunk_batch in chunk_drafts.chunks(batch_size) {
stop_reindex_if_cancel_requested(ctx, job_id).await?;
let embeddings = embed_texts_locally_with_batch_size(
chunk_batch
.iter()
@@ -2799,6 +2813,7 @@ pub async fn rebuild_index(ctx: &AppContext, job_id: Option<i32>) -> Result<AiIn
update_reindex_job_progress(ctx, job_id, &embedding_progress).await?;
}
stop_reindex_if_cancel_requested(ctx, job_id).await?;
let last_indexed_at = update_indexed_at(&txn, &settings.raw).await?;
txn.commit().await?;

View File

@@ -40,7 +40,12 @@ pub const DELIVERY_STATUS_RETRY_PENDING: &str = "retry_pending";
pub const DELIVERY_STATUS_EXHAUSTED: &str = "exhausted";
pub const DELIVERY_STATUS_SKIPPED: &str = "skipped";
const MAX_DELIVERY_ATTEMPTS: i32 = 5;
const WEB_PUSH_TITLE_MAX_CHARS: usize = 72;
const WEB_PUSH_BODY_MAX_CHARS: usize = 160;
const WEB_PUSH_MAX_PAYLOAD_BYTES: usize = 2800;
const WEB_PUSH_AUTO_PAUSE_FAILURE_THRESHOLD: i32 = 2;
const WEB_PUSH_AUTO_PAUSE_NOTE: &str =
"浏览器推送订阅连续投递失败,系统已自动暂停。请在浏览器里重新开启提醒。";
#[derive(Clone, Debug, Serialize)]
pub struct DigestDispatchSummary {
@@ -259,6 +264,97 @@ fn merge_browser_push_metadata(
Value::Object(object)
}
fn merge_subscription_note(existing: Option<&str>, note: &str) -> Option<String> {
let note = note.trim();
let mut lines = existing
.unwrap_or_default()
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
.map(ToString::to_string)
.collect::<Vec<_>>();
if !note.is_empty() && !lines.iter().any(|line| line == note) {
lines.push(note.to_string());
}
if lines.is_empty() {
None
} else {
Some(lines.join("\n"))
}
}
fn remove_subscription_note(existing: Option<&str>, note: &str) -> Option<String> {
let note = note.trim();
let lines = existing
.unwrap_or_default()
.lines()
.map(str::trim)
.filter(|line| !line.is_empty() && *line != note)
.map(ToString::to_string)
.collect::<Vec<_>>();
if lines.is_empty() {
None
} else {
Some(lines.join("\n"))
}
}
fn web_push_error_looks_terminal(error_text: &str) -> bool {
let normalized = error_text.trim().to_ascii_lowercase();
normalized.contains("endpoint_host=fcm.googleapis.com")
&& normalized.contains("unspecified error")
|| normalized.contains("410")
|| normalized.contains("404")
|| normalized.contains("gone")
|| normalized.contains("not found")
|| normalized.contains("expired")
|| normalized.contains("unsubscribed")
|| normalized.contains("invalid subscription")
|| normalized.contains("push subscription")
}
fn should_auto_pause_failed_web_push_subscription(
failure_count_after_error: i32,
error_text: &str,
) -> bool {
failure_count_after_error >= WEB_PUSH_AUTO_PAUSE_FAILURE_THRESHOLD
|| web_push_error_looks_terminal(error_text)
}
async fn maybe_pause_failed_web_push_subscription(
ctx: &AppContext,
subscription: Option<&subscriptions::Model>,
error_text: &str,
) -> Result<()> {
let Some(subscription) = subscription else {
return Ok(());
};
if subscription.channel_type != CHANNEL_WEB_PUSH
|| normalize_status(&subscription.status) != STATUS_ACTIVE
{
return Ok(());
}
let failure_count_after_error = subscription.failure_count.unwrap_or(0) + 1;
if !should_auto_pause_failed_web_push_subscription(failure_count_after_error, error_text) {
return Ok(());
}
let mut active = subscription.clone().into_active_model();
active.status = Set(STATUS_PAUSED.to_string());
active.notes = Set(merge_subscription_note(
subscription.notes.as_deref(),
WEB_PUSH_AUTO_PAUSE_NOTE,
));
let _ = active.update(&ctx.db).await?;
Ok(())
}
fn json_string_list(value: Option<&Value>, key: &str) -> Vec<String> {
value
.and_then(Value::as_object)
@@ -321,16 +417,6 @@ fn payload_match_strings(payload: &Value, key: &str) -> Vec<String> {
values
}
fn delivery_retry_delay(attempts: i32) -> Duration {
match attempts {
0 | 1 => Duration::minutes(1),
2 => Duration::minutes(5),
3 => Duration::minutes(15),
4 => Duration::minutes(60),
_ => Duration::hours(6),
}
}
fn effective_period(period: &str) -> (&'static str, i64, &'static str) {
match period.trim().to_ascii_lowercase().as_str() {
"monthly" | "month" | "30d" => ("monthly", 30, EVENT_DIGEST_MONTHLY),
@@ -680,6 +766,12 @@ pub async fn create_public_web_push_subscription(
active.status = Set(STATUS_ACTIVE.to_string());
active.confirm_token = Set(None);
active.verified_at = Set(Some(Utc::now().to_rfc3339()));
active.failure_count = Set(Some(0));
active.last_delivery_status = Set(None);
active.notes = Set(remove_subscription_note(
existing.notes.as_deref(),
WEB_PUSH_AUTO_PAUSE_NOTE,
));
active.metadata = Set(Some(merge_browser_push_metadata(
existing.metadata.as_ref(),
metadata,
@@ -1066,26 +1158,47 @@ fn web_push_target_url(message: &QueuedDeliveryPayload) -> Option<String> {
}
fn build_web_push_payload(message: &QueuedDeliveryPayload) -> Value {
let body = truncate_chars(&collapse_whitespace(&message.text), 220);
let title = truncate_chars(
&collapse_whitespace(&message.subject),
WEB_PUSH_TITLE_MAX_CHARS,
);
let body = truncate_chars(&collapse_whitespace(&message.text), WEB_PUSH_BODY_MAX_CHARS);
let url = web_push_target_url(message);
let event_type = message
.payload
.get("event_type")
.and_then(Value::as_str)
.unwrap_or("subscription");
serde_json::json!({
"title": message.subject,
"title": title,
"body": body,
"icon": site_asset_url(message.site_url.as_deref(), "/favicon.svg"),
"badge": site_asset_url(message.site_url.as_deref(), "/favicon.ico"),
"url": web_push_target_url(message),
"tag": message
.payload
.get("event_type")
.and_then(Value::as_str)
.unwrap_or("subscription"),
"url": url.clone(),
"tag": event_type,
"data": {
"event_type": message.payload.get("event_type").cloned().unwrap_or(Value::Null),
"payload": message.payload,
"url": url,
"event_type": event_type,
}
})
}
fn encode_web_push_payload(message: &QueuedDeliveryPayload) -> Result<Vec<u8>> {
let payload = build_web_push_payload(message);
let encoded = serde_json::to_vec(&payload)?;
if encoded.len() > WEB_PUSH_MAX_PAYLOAD_BYTES {
return Err(Error::BadRequest(format!(
"web push payload too large: {} bytes exceeds safe limit {} bytes",
encoded.len(),
WEB_PUSH_MAX_PAYLOAD_BYTES
)));
}
Ok(encoded)
}
async fn deliver_via_channel(
ctx: &AppContext,
channel_type: &str,
@@ -1126,7 +1239,7 @@ async fn deliver_via_channel(
CHANNEL_WEB_PUSH => {
let settings = crate::controllers::site_settings::load_current(ctx).await?;
let subscription_info = web_push_service::subscription_info_from_metadata(metadata)?;
let payload = serde_json::to_vec(&build_web_push_payload(message))?;
let payload = encode_web_push_payload(message)?;
web_push_service::send_payload(
&settings,
&subscription_info,
@@ -1275,24 +1388,25 @@ pub async fn process_delivery(ctx: &AppContext, delivery_id: i32) -> Result<()>
.await?;
}
Err(error) => {
let next_retry_at = (attempts < MAX_DELIVERY_ATTEMPTS)
.then(|| (Utc::now() + delivery_retry_delay(attempts)).to_rfc3339());
let status = if next_retry_at.is_some() {
DELIVERY_STATUS_RETRY_PENDING
} else {
DELIVERY_STATUS_EXHAUSTED
};
let error_text = error.to_string();
let mut active = delivery.into_active_model();
active.status = Set(status.to_string());
active.status = Set(DELIVERY_STATUS_EXHAUSTED.to_string());
active.provider = Set(Some(provider_name(&delivery_channel_type).to_string()));
active.response_text = Set(Some(error.to_string()));
active.response_text = Set(Some(error_text.clone()));
active.attempts_count = Set(attempts);
active.last_attempt_at = Set(Some(Utc::now().to_rfc3339()));
active.next_retry_at = Set(next_retry_at);
active.next_retry_at = Set(None);
active.delivered_at = Set(Some(Utc::now().to_rfc3339()));
let _ = active.update(&ctx.db).await?;
update_subscription_delivery_state(ctx, subscription_id, status, false).await?;
update_subscription_delivery_state(
ctx,
subscription_id,
DELIVERY_STATUS_EXHAUSTED,
false,
)
.await?;
maybe_pause_failed_web_push_subscription(ctx, subscription.as_ref(), &error_text)
.await?;
Err(error)?;
}
}

View File

@@ -1,4 +1,5 @@
use loco_rs::prelude::*;
use reqwest::Url;
use serde_json::Value;
use web_push::{
ContentEncoding, HyperWebPushClient, SubscriptionInfo, Urgency, VapidSignatureBuilder,
@@ -46,17 +47,30 @@ pub fn vapid_subject(settings: &site_settings::Model) -> Option<String> {
.or_else(|| env_value(ENV_WEB_PUSH_VAPID_SUBJECT))
}
fn normalize_vapid_subject(value: Option<String>) -> Option<String> {
value.and_then(|item| {
let trimmed = item.trim();
if trimmed.starts_with("mailto:") || trimmed.starts_with("https://") {
Some(trimmed.to_string())
} else {
None
}
})
}
fn effective_vapid_subject(settings: &site_settings::Model, site_url: Option<&str>) -> String {
vapid_subject(settings)
.or_else(|| {
site_url
.map(str::trim)
.filter(|value| value.starts_with("http://") || value.starts_with("https://"))
.map(ToString::to_string)
})
normalize_vapid_subject(vapid_subject(settings))
.or_else(|| normalize_vapid_subject(site_url.map(ToString::to_string)))
.unwrap_or_else(|| "mailto:noreply@example.com".to_string())
}
fn subscription_endpoint_host(subscription_info: &SubscriptionInfo) -> String {
Url::parse(&subscription_info.endpoint)
.ok()
.and_then(|url| url.host_str().map(ToString::to_string))
.unwrap_or_else(|| "unknown".to_string())
}
pub fn public_key_configured(settings: &site_settings::Model) -> bool {
public_key(settings).is_some()
}
@@ -90,10 +104,12 @@ pub async fn send_payload(
) -> Result<()> {
let private_key = private_key(settings)
.ok_or_else(|| Error::BadRequest("web push VAPID private key 未配置".to_string()))?;
let subject = effective_vapid_subject(settings, site_url);
let endpoint_host = subscription_endpoint_host(subscription_info);
let mut signature_builder = VapidSignatureBuilder::from_base64(&private_key, subscription_info)
.map_err(|error| Error::BadRequest(format!("web push vapid build failed: {error}")))?;
signature_builder.add_claim("sub", effective_vapid_subject(settings, site_url));
signature_builder.add_claim("sub", subject.clone());
let signature = signature_builder
.build()
.map_err(|error| Error::BadRequest(format!("web push vapid sign failed: {error}")))?;
@@ -111,10 +127,11 @@ pub async fn send_payload(
.build()
.map_err(|error| Error::BadRequest(format!("web push message build failed: {error}")))?;
client
.send(message)
.await
.map_err(|error| Error::BadRequest(format!("web push send failed: {error}")))?;
client.send(message).await.map_err(|error| {
Error::BadRequest(format!(
"web push send failed: {error}; vapid subject={subject}; endpoint_host={endpoint_host}"
))
})?;
Ok(())
}

View File

@@ -614,6 +614,20 @@ pub async fn update_job_result(ctx: &AppContext, id: i32, result: Value) -> Resu
Ok(())
}
pub async fn cancel_job_if_requested(ctx: &AppContext, id: i32, reason: &str) -> Result<bool> {
let item = find_job(ctx, id).await?;
if item.status == JOB_STATUS_CANCELLED {
return Ok(true);
}
if item.cancel_requested {
finish_job_cancelled(ctx, id, Some(reason.to_string())).await?;
return Ok(true);
}
Ok(false)
}
pub async fn mark_job_failed(ctx: &AppContext, id: i32, error_text: String) -> Result<()> {
let item = find_job(ctx, id).await?;
let mut active = item.into_active_model();
@@ -708,6 +722,13 @@ pub async fn queue_notification_delivery_job(
.one(&ctx.db)
.await?
.ok_or(Error::NotFound)?;
let mut delivery_active = delivery.clone().into_active_model();
delivery_active.status = Set(subscriptions::DELIVERY_STATUS_QUEUED.to_string());
delivery_active.response_text = Set(None);
delivery_active.next_retry_at = Set(None);
delivery_active.delivered_at = Set(None);
delivery_active.attempts_count = Set(0);
let delivery = delivery_active.update(&ctx.db).await?;
let base_args = NotificationDeliveryWorkerArgs {
delivery_id,

View File

@@ -55,6 +55,16 @@ impl BackgroundWorker<AiReindexWorkerArgs> for AiReindexWorker {
Ok(())
}
Err(error) => {
if worker_jobs::cancel_job_if_requested(
&self.ctx,
job_id,
"job cancelled during reindex",
)
.await?
{
return Ok(());
}
worker_jobs::mark_job_failed(&self.ctx, job_id, error.to_string()).await?;
Err(error)
}

View File

@@ -20,10 +20,6 @@ impl BackgroundWorker<NotificationDeliveryWorkerArgs> for NotificationDeliveryWo
Self { ctx: ctx.clone() }
}
fn tags() -> Vec<String> {
vec!["notifications".to_string()]
}
async fn perform(&self, args: NotificationDeliveryWorkerArgs) -> Result<()> {
if let Some(job_id) = args.job_id {
if !worker_jobs::begin_job_execution(&self.ctx, job_id).await? {