refactor(notification): streamline notification system by removing unused timing constants and simplifying event handling

This commit is contained in:
Tunglies
2025-12-30 20:23:35 +08:00
parent 9ce343fb45
commit 1ea707699c
4 changed files with 88 additions and 234 deletions

View File

@@ -1,7 +1,7 @@
use super::{IClashTemp, IProfiles, IVerge};
use crate::{
config::{PrfItem, profiles_append_item_safe},
constants::{files, timing},
constants::files,
core::{
CoreManager,
handle::{self, Handle},
@@ -21,7 +21,6 @@ use smartstring::alias::String;
use std::path::PathBuf;
use tauri_plugin_clash_verge_sysinfo::is_current_app_handle_admin;
use tokio::sync::OnceCell;
use tokio::time::sleep;
pub struct Config {
clash_config: Draft<IClashTemp>,
@@ -88,7 +87,6 @@ impl Config {
let validation_result = Self::generate_and_validate().await?;
if let Some((msg_type, msg_content)) = validation_result {
sleep(timing::STARTUP_ERROR_DELAY).await;
handle::Handle::notice_message(msg_type, msg_content);
}

View File

@@ -23,9 +23,6 @@ pub mod timing {
use super::Duration;
pub const CONFIG_UPDATE_DEBOUNCE: Duration = Duration::from_millis(300);
pub const EVENT_EMIT_DELAY: Duration = Duration::from_millis(20);
pub const STARTUP_ERROR_DELAY: Duration = Duration::from_secs(2);
pub const ERROR_BATCH_DELAY: Duration = Duration::from_millis(300);
#[cfg(target_os = "windows")]
pub const SERVICE_WAIT_MAX: Duration = Duration::from_millis(3000);
@@ -33,10 +30,6 @@ pub mod timing {
pub const SERVICE_WAIT_INTERVAL: Duration = Duration::from_millis(200);
}
pub mod retry {
pub const EVENT_EMIT_THRESHOLD: u64 = 10;
}
pub mod files {
pub const RUNTIME_CONFIG: &str = "clash-verge.yaml";
pub const CHECK_CONFIG: &str = "clash-verge-check.yaml";

View File

@@ -1,12 +1,10 @@
use crate::{APP_HANDLE, constants::timing, singleton};
use crate::{APP_HANDLE, singleton};
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use smartstring::alias::String;
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use tauri::{AppHandle, Manager as _, WebviewWindow};
use tauri_plugin_mihomo::{Mihomo, MihomoExt as _};
@@ -14,23 +12,12 @@ use tokio::sync::RwLockReadGuard;
use super::notification::{ErrorMessage, FrontendEvent, NotificationSystem};
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Handle {
is_exiting: AtomicBool,
startup_errors: Arc<RwLock<Vec<ErrorMessage>>>,
startup_completed: AtomicBool,
pub(crate) notification_system: Arc<RwLock<Option<NotificationSystem>>>,
}
impl Default for Handle {
fn default() -> Self {
Self {
is_exiting: AtomicBool::new(false),
startup_errors: Arc::new(RwLock::new(Vec::new())),
startup_completed: AtomicBool::new(false),
notification_system: Arc::new(RwLock::new(Some(NotificationSystem::new()))),
}
}
startup_errors: Arc<RwLock<Vec<ErrorMessage>>>,
pub(super) notification_system: ArcSwap<NotificationSystem>,
}
singleton!(Handle, HANDLE);
@@ -44,13 +31,7 @@ impl Handle {
if self.is_exiting() {
return;
}
let mut system_opt = self.notification_system.write();
if let Some(system) = system_opt.as_mut()
&& !system.is_running
{
system.start();
}
self.notification_system.load().start();
}
pub fn app_handle() -> &'static AppHandle {
@@ -67,27 +48,11 @@ impl Handle {
}
pub fn refresh_clash() {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshClash);
}
Self::send_event(FrontendEvent::RefreshClash);
}
pub fn refresh_verge() {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshVerge);
}
Self::send_event(FrontendEvent::RefreshVerge);
}
pub fn notify_profile_changed(profile_id: String) {
@@ -137,11 +102,7 @@ impl Handle {
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(event);
}
handle.notification_system.load().send_event(event);
}
pub fn mark_startup_completed(&self) {
@@ -150,50 +111,18 @@ impl Handle {
}
fn send_startup_errors(&self) {
let errors = {
let mut errors = self.startup_errors.write();
std::mem::take(&mut *errors)
};
if errors.is_empty() {
return;
}
let _ = thread::Builder::new()
.name("startup-errors-sender".into())
.spawn(move || {
thread::sleep(timing::STARTUP_ERROR_DELAY);
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
for error in errors {
if handle.is_exiting() {
break;
}
system.send_event(FrontendEvent::NoticeMessage {
status: error.status,
message: error.message,
});
thread::sleep(timing::ERROR_BATCH_DELAY);
}
}
let errors = std::mem::take(&mut *self.startup_errors.write());
for error in errors {
Self::send_event(FrontendEvent::NoticeMessage {
status: error.status,
message: error.message,
});
}
}
pub fn set_is_exiting(&self) {
self.is_exiting.store(true, Ordering::Release);
let mut system_opt = self.notification_system.write();
if let Some(system) = system_opt.as_mut() {
system.shutdown();
}
self.notification_system.load().shutdown();
}
pub fn is_exiting(&self) -> bool {

View File

@@ -1,17 +1,15 @@
use std::sync::atomic::{AtomicBool, Ordering};
use super::handle::Handle;
use crate::constants::{retry, timing};
use clash_verge_logging::{Type, logging};
use parking_lot::RwLock;
use parking_lot::Mutex;
use serde_json::json;
use smartstring::alias::String;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc,
},
thread,
time::Instant,
use tauri::{
Emitter as _, WebviewWindow,
async_runtime::{Receiver, Sender, channel},
};
use tauri::{Emitter as _, WebviewWindow};
use tokio::task::JoinHandle;
// TODO 重构或优化,避免 Clone 过多
#[derive(Debug, Clone)]
@@ -25,129 +23,100 @@ pub enum FrontendEvent {
ProfileUpdateCompleted { uid: String },
}
#[derive(Debug, Default)]
struct EventStats {
total_sent: AtomicU64,
total_errors: AtomicU64,
last_error_time: RwLock<Option<Instant>>,
}
#[derive(Debug, Clone)]
pub struct ErrorMessage {
pub status: String,
pub message: String,
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct NotificationSystem {
sender: Option<mpsc::Sender<FrontendEvent>>,
#[allow(clippy::type_complexity)]
worker_handle: Option<thread::JoinHandle<()>>,
pub(super) is_running: bool,
stats: EventStats,
emergency_mode: AtomicBool,
sender: Mutex<Option<Sender<FrontendEvent>>>,
worker_task: Mutex<Option<JoinHandle<()>>>,
pub(super) is_running: AtomicBool,
}
impl Default for NotificationSystem {
fn default() -> Self {
Self::new()
impl NotificationSystem {
pub fn start(&self) {
if self
.is_running
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
.is_err()
{
return;
}
let (tx, rx) = channel(32);
*self.sender.lock() = Some(tx);
#[allow(clippy::expect_used)]
let rt = tokio::runtime::Builder::new_current_thread()
.worker_threads(1)
.enable_all()
.build()
.expect("Failed to build runtime for notification system");
let task = rt.spawn(async move {
Self::worker_loop(rx).await;
});
*self.worker_task.lock() = Some(task);
}
pub fn shutdown(&self) {
if self
.is_running
.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed)
.is_err()
{
return;
}
self.sender.lock().take();
let value = self.worker_task.lock().take();
if let Some(task) = value {
task.abort();
}
}
pub fn send_event(&self, event: FrontendEvent) -> bool {
if let Some(sender) = &*self.sender.lock() {
sender.try_send(event).is_ok()
} else {
false
}
}
}
impl NotificationSystem {
pub fn new() -> Self {
Self {
sender: None,
worker_handle: None,
is_running: false,
stats: EventStats::default(),
emergency_mode: AtomicBool::new(false),
}
}
pub fn start(&mut self) {
if self.is_running {
return;
}
let (tx, rx) = mpsc::channel();
self.sender = Some(tx);
self.is_running = true;
let result = thread::Builder::new()
.name("frontend-notifier".into())
.spawn(move || Self::worker_loop(rx));
match result {
Ok(handle) => self.worker_handle = Some(handle),
Err(e) => logging!(error, Type::System, "Failed to start notification worker: {}", e),
}
}
fn worker_loop(rx: mpsc::Receiver<FrontendEvent>) {
async fn worker_loop(mut rx: Receiver<FrontendEvent>) {
let handle = Handle::global();
loop {
while let Some(event) = rx.recv().await {
if handle.is_exiting() {
break;
}
match rx.recv() {
Ok(event) => Self::process_event(handle, event),
Err(e) => {
logging!(error, Type::System, "Notification System will exit, recv error: {}", e);
break;
}
}
Self::process_event_sync(handle, event);
}
}
fn process_event(handle: &super::handle::Handle, event: FrontendEvent) {
let binding = handle.notification_system.read();
let system = match binding.as_ref() {
Some(s) => s,
None => return,
};
if system.should_skip_event(&event) {
return;
}
fn process_event_sync(handle: &super::handle::Handle, event: FrontendEvent) {
let system = handle.notification_system.load();
if let Some(window) = super::handle::Handle::get_window() {
system.emit_to_window(&window, event);
drop(binding);
thread::sleep(timing::EVENT_EMIT_DELAY);
}
}
fn should_skip_event(&self, event: &FrontendEvent) -> bool {
let is_emergency = self.emergency_mode.load(Ordering::Acquire);
matches!(
(is_emergency, event),
(true, FrontendEvent::NoticeMessage { status, .. }) if status == "info"
)
}
fn emit_to_window(&self, window: &WebviewWindow, event: FrontendEvent) {
let (event_name, payload) = self.serialize_event(event);
let Ok(payload) = payload else {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
return;
};
match window.emit(event_name, payload) {
Ok(_) => {
self.stats.total_sent.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
logging!(warn, Type::Frontend, "Event emit failed: {}", e);
self.handle_emit_error();
}
if let Err(e) = window.emit(event_name, payload) {
logging!(warn, Type::Frontend, "Event emit failed: {}", e);
}
}
fn serialize_event(&self, event: FrontendEvent) -> (&'static str, Result<serde_json::Value, serde_json::Error>) {
use serde_json::json;
match event {
FrontendEvent::RefreshClash => ("verge://refresh-clash-config", Ok(json!("yes"))),
FrontendEvent::RefreshVerge => ("verge://refresh-verge-config", Ok(json!("yes"))),
@@ -160,39 +129,4 @@ impl NotificationSystem {
FrontendEvent::ProfileUpdateCompleted { uid } => ("profile-update-completed", Ok(json!({ "uid": uid }))),
}
}
fn handle_emit_error(&self) {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
*self.stats.last_error_time.write() = Some(Instant::now());
let errors = self.stats.total_errors.load(Ordering::Relaxed);
if errors > retry::EVENT_EMIT_THRESHOLD && !self.emergency_mode.load(Ordering::Acquire) {
logging!(warn, Type::Frontend, "Entering emergency mode after {} errors", errors);
self.emergency_mode.store(true, Ordering::Release);
}
}
pub fn send_event(&self, event: FrontendEvent) -> bool {
if self.should_skip_event(&event) {
return false;
}
if let Some(sender) = &self.sender {
sender.send(event).is_ok()
} else {
false
}
}
pub fn shutdown(&mut self) {
self.is_running = false;
if let Some(sender) = self.sender.take() {
drop(sender);
}
if let Some(handle) = self.worker_handle.take() {
let _ = handle.join();
}
}
}