refactor: move event dispatch logic into notifier thread to improve efficiency

This commit is contained in:
wonfen
2025-05-17 19:42:43 +08:00
parent 305a64c3e3
commit 8448217bd4

View File

@@ -87,10 +87,13 @@ impl NotificationSystem {
match rx.recv_timeout(Duration::from_millis(100)) { match rx.recv_timeout(Duration::from_millis(100)) {
Ok(event) => { Ok(event) => {
let system_guard = handle.notification_system.read(); let system_guard = handle.notification_system.read();
let is_emergency = system_guard if system_guard.as_ref().is_none() {
.as_ref() log::warn!("NotificationSystem not found in handle while processing event.");
.map(|sys| *sys.emergency_mode.read()) continue;
.unwrap_or(false); }
let system = system_guard.as_ref().unwrap();
let is_emergency = *system.emergency_mode.read();
if is_emergency { if is_emergency {
if let FrontendEvent::NoticeMessage { ref status, .. } = event { if let FrontendEvent::NoticeMessage { ref status, .. } = event {
@@ -104,37 +107,55 @@ impl NotificationSystem {
} }
if let Some(window) = handle.get_window() { if let Some(window) = handle.get_window() {
match event { *system.last_emit_time.write() = Instant::now();
let (event_name_str, payload_result) = match event {
FrontendEvent::RefreshClash => { FrontendEvent::RefreshClash => {
Self::emit_with_timeout( ("verge://refresh-clash-config", Ok(serde_json::json!("yes")))
&window,
"verge://refresh-clash-config",
"yes",
handle,
);
} }
FrontendEvent::RefreshVerge => { FrontendEvent::RefreshVerge => {
Self::emit_with_timeout( ("verge://refresh-verge-config", Ok(serde_json::json!("yes")))
&window,
"verge://refresh-verge-config",
"yes",
handle,
);
} }
FrontendEvent::NoticeMessage { FrontendEvent::NoticeMessage { status, message } => {
ref status, match serde_json::to_value((status, message)) {
ref message, Ok(p) => ("verge://notice-message", Ok(p)),
} => { Err(e) => {
Self::emit_with_timeout( log::error!("Failed to serialize NoticeMessage payload: {}", e);
&window, ("verge://notice-message", Err(e))
"verge://notice-message", }
(status.clone(), message.clone()), }
handle,
);
} }
} };
}
if let Ok(payload) = payload_result {
match window.emit(event_name_str, payload) {
Ok(_) => {
system.stats.total_sent.fetch_add(1, Ordering::SeqCst);
}
Err(e) => {
log::warn!("Failed to emit event {}: {}", event_name_str, e);
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
let errors = system.stats.total_errors.load(Ordering::SeqCst);
const EMIT_ERROR_THRESHOLD: u64 = 10;
if errors > EMIT_ERROR_THRESHOLD && !*system.emergency_mode.read() {
log::warn!(
"Reached {} emit errors, entering emergency mode",
EMIT_ERROR_THRESHOLD
);
*system.emergency_mode.write() = true;
}
}
}
} else {
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
log::warn!("Skipped emitting event due to payload serialization error for {}", event_name_str);
}
} else {
log::warn!("No window found, skipping event emit.");
}
thread::sleep(Duration::from_millis(20)); thread::sleep(Duration::from_millis(20));
} }
Err(mpsc::RecvTimeoutError::Timeout) => { Err(mpsc::RecvTimeoutError::Timeout) => {
@@ -155,63 +176,6 @@ impl NotificationSystem {
); );
} }
/// 使用超时控制发送事件,防止无限阻塞
fn emit_with_timeout<P: serde::Serialize + Clone + Send + 'static>(
window: &WebviewWindow,
event: &str,
payload: P,
handle: &Handle,
) {
let start = Instant::now();
let system_guard = handle.notification_system.read();
if let Some(system) = system_guard.as_ref() {
*system.last_emit_time.write() = start;
let window_label = window.label().to_string();
let event_clone = event.to_string();
let app_handle_clone = match handle.app_handle() {
Some(h) => h,
None => return,
};
let (tx, rx) = mpsc::channel();
let _ = thread::Builder::new()
.name("emit-timeout".into())
.spawn(move || {
if let Some(win) = app_handle_clone.get_webview_window(&window_label) {
let result = win.emit(&event_clone, payload);
let _ = tx.send(result);
} else {
let _ = tx.send(Err(tauri::Error::WebviewNotFound));
}
});
match rx.recv_timeout(Duration::from_millis(500)) {
Ok(result) => {
if let Err(e) = result {
log::warn!("Failed to emit event {}: {}", event, e);
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
} else {
system.stats.total_sent.fetch_add(1, Ordering::SeqCst);
}
}
Err(_) => {
log::error!("Emit timed out for event: {}", event);
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
let errors = system.stats.total_errors.load(Ordering::SeqCst);
if errors > 5 {
log::warn!("Too many emit errors, entering emergency mode");
*system.emergency_mode.write() = true;
}
}
}
}
}
/// 发送事件到队列 /// 发送事件到队列
fn send_event(&self, event: FrontendEvent) -> bool { fn send_event(&self, event: FrontendEvent) -> bool {
if *self.emergency_mode.read() { if *self.emergency_mode.read() {