backend: unload local embedding model when idle
All checks were successful
docker-images / resolve-build-targets (push) Successful in 6s
docker-images / build-and-push (admin) (push) Successful in 3s
docker-images / build-and-push (backend) (push) Successful in 16s
docker-images / build-and-push (frontend) (push) Successful in 3s
docker-images / submit-indexnow (push) Has been skipped

This commit is contained in:
2026-04-16 13:35:14 +08:00
parent 7d4f027062
commit c9639ae04e

View File

@@ -14,6 +14,8 @@ use serde_json::{Value, json};
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Mutex, OnceLock}; use std::sync::{Mutex, OnceLock};
use std::thread;
use std::time::{Duration, Instant};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
@@ -40,6 +42,8 @@ pub(crate) const REINDEX_EMBEDDING_BATCH_SIZE: usize = 4;
// all-MiniLM-L6-v2 produces 384-dimensional embedding vectors.
const EMBEDDING_DIMENSION: usize = 384;
// Human-readable label reported for the local embedding backend.
const LOCAL_EMBEDDING_MODEL_LABEL: &str = "fastembed / local all-MiniLM-L6-v2";
// On-disk cache directory for the downloaded model files.
const LOCAL_EMBEDDING_CACHE_DIR: &str = "storage/ai_embedding_models/all-minilm-l6-v2";
// Unload the in-memory model after this many seconds without an embedding call.
const LOCAL_EMBEDDING_IDLE_TIMEOUT_SECS: u64 = 300;
// How often the background reaper thread checks for idleness.
const LOCAL_EMBEDDING_REAPER_INTERVAL_SECS: u64 = 30;
// Base URL the ONNX model files are fetched from.
const LOCAL_EMBEDDING_BASE_URL: &str =
    "https://huggingface.co/Qdrant/all-MiniLM-L6-v2-onnx/resolve/main";
const LOCAL_EMBEDDING_FILES: [&str; 5] = [ const LOCAL_EMBEDDING_FILES: [&str; 5] = [
@@ -50,7 +54,13 @@ const LOCAL_EMBEDDING_FILES: [&str; 5] = [
"tokenizer_config.json", "tokenizer_config.json",
]; ];
// Process-wide slot for the local embedding model. `None` means the model is
// currently unloaded (never loaded yet, or reclaimed by the idle reaper).
static TEXT_EMBEDDING_MODEL: OnceLock<Mutex<Option<LocalEmbeddingRuntime>>> = OnceLock::new();
// Guard ensuring the idle-reaper thread is spawned at most once per process.
static TEXT_EMBEDDING_REAPER_STARTED: OnceLock<()> = OnceLock::new();

/// The loaded embedding model together with the instant of its last use; the
/// idle reaper compares `last_used_at` against the idle timeout to decide when
/// to drop the model and free its memory.
struct LocalEmbeddingRuntime {
    model: TextEmbedding,
    last_used_at: Instant,
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AiImageRuntimeSettings { struct AiImageRuntimeSettings {
@@ -403,18 +413,78 @@ fn load_local_embedding_model() -> Result<TextEmbedding> {
.map_err(|error| Error::BadRequest(format!("本地 embedding 模型初始化失败: {error}"))) .map_err(|error| Error::BadRequest(format!("本地 embedding 模型初始化失败: {error}")))
} }
/// Returns the process-wide slot holding the (possibly unloaded) local
/// embedding model, creating the empty slot on first access.
fn local_embedding_state() -> &'static Mutex<Option<LocalEmbeddingRuntime>> {
    TEXT_EMBEDDING_MODEL.get_or_init(|| Mutex::new(None))
}
/// Starts, at most once per process, the background thread that unloads the
/// cached local embedding model after `LOCAL_EMBEDDING_IDLE_TIMEOUT_SECS`
/// seconds of inactivity, releasing its memory.
///
/// Spawn failure is logged and otherwise ignored: without the reaper the model
/// simply stays resident, which is the pre-existing behavior.
fn ensure_local_embedding_reaper_started() {
    TEXT_EMBEDDING_REAPER_STARTED.get_or_init(|| {
        if let Err(error) = thread::Builder::new()
            .name("local-embedding-reaper".to_string())
            .spawn(|| {
                let idle_timeout = Duration::from_secs(LOCAL_EMBEDDING_IDLE_TIMEOUT_SECS);
                let check_interval = Duration::from_secs(LOCAL_EMBEDDING_REAPER_INTERVAL_SECS);
                loop {
                    thread::sleep(check_interval);
                    // Nothing to reap until the state slot has been created.
                    let Some(state) = TEXT_EMBEDDING_MODEL.get() else {
                        continue;
                    };
                    // FIX: a poisoned lock (an embedding call panicked) used to
                    // make the reaper skip forever, pinning the model in memory.
                    // Recover the guard instead; the idle check below still
                    // unloads a stale model.
                    let mut guard = match state.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => {
                            tracing::warn!(
                                "local embedding model lock was poisoned; recovering for idle cleanup"
                            );
                            poisoned.into_inner()
                        }
                    };
                    let should_unload = guard
                        .as_ref()
                        .is_some_and(|runtime| runtime.last_used_at.elapsed() >= idle_timeout);
                    if should_unload {
                        *guard = None;
                        tracing::info!(
                            "unloaded local embedding model after {} seconds of inactivity",
                            LOCAL_EMBEDDING_IDLE_TIMEOUT_SECS
                        );
                    }
                }
            })
        {
            tracing::warn!("failed to start local embedding reaper thread: {error}");
        }
    });
}
fn with_local_embedding_engine<T>(
operation: impl FnOnce(&mut TextEmbedding) -> Result<T>,
) -> Result<T> {
ensure_local_embedding_reaper_started();
let state = local_embedding_state();
let mut guard = state
.lock()
.map_err(|_| Error::BadRequest("本地 embedding 模型当前不可用,请稍后重试".to_string()))?;
if guard.is_none() {
tracing::info!("loading local embedding model into memory");
*guard = Some(LocalEmbeddingRuntime {
model: load_local_embedding_model()?,
last_used_at: Instant::now(),
});
} }
let model = load_local_embedding_model()?; let runtime = guard
.as_mut()
.ok_or_else(|| Error::BadRequest("本地 embedding 模型未能成功缓存".to_string()))?;
runtime.last_used_at = Instant::now();
let _ = TEXT_EMBEDDING_MODEL.set(Mutex::new(model)); let result = operation(&mut runtime.model);
runtime.last_used_at = Instant::now();
TEXT_EMBEDDING_MODEL result
.get()
.ok_or_else(|| Error::BadRequest("本地 embedding 模型未能成功缓存".to_string()))
} }
fn vector_literal(embedding: &[f64]) -> Result<String> { fn vector_literal(embedding: &[f64]) -> Result<String> {
@@ -793,17 +863,13 @@ async fn embed_texts_locally_with_batch_size(
batch_size: usize, batch_size: usize,
) -> Result<Vec<Vec<f64>>> { ) -> Result<Vec<Vec<f64>>> {
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let model = local_embedding_engine()?;
let prepared = inputs let prepared = inputs
.iter() .iter()
.map(|item| prepare_embedding_text(kind, item)) .map(|item| prepare_embedding_text(kind, item))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut guard = model.lock().map_err(|_| { with_local_embedding_engine(|model| {
Error::BadRequest("本地 embedding 模型当前不可用,请稍后重试".to_string()) let embeddings = model
})?;
let embeddings = guard
.embed(prepared, Some(batch_size.max(1))) .embed(prepared, Some(batch_size.max(1)))
.map_err(|error| Error::BadRequest(format!("本地 embedding 生成失败: {error}")))?; .map_err(|error| Error::BadRequest(format!("本地 embedding 生成失败: {error}")))?;
@@ -812,6 +878,7 @@ async fn embed_texts_locally_with_batch_size(
.map(|embedding| embedding.into_iter().map(f64::from).collect::<Vec<_>>()) .map(|embedding| embedding.into_iter().map(f64::from).collect::<Vec<_>>())
.collect::<Vec<_>>()) .collect::<Vec<_>>())
}) })
})
.await .await
.map_err(|error| Error::BadRequest(format!("本地 embedding 任务执行失败: {error}")))? .map_err(|error| Error::BadRequest(format!("本地 embedding 任务执行失败: {error}")))?
} }