From 02f67961a97e5505fd74d01bbb7497e3d8f014f4 Mon Sep 17 00:00:00 2001 From: Tunglies <77394545+Tunglies@users.noreply.github.com> Date: Fri, 22 Aug 2025 03:16:59 +0800 Subject: [PATCH] feat: add tokio-stream dependency and refactor event loop handling in EventDrivenProxyManager --- src-tauri/Cargo.lock | 1 + src-tauri/Cargo.toml | 1 + src-tauri/src/core/event_driven_proxy.rs | 46 +++++++++++------------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 48efcd22b..93db77839 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -1150,6 +1150,7 @@ dependencies = [ "tauri-plugin-updater", "tauri-plugin-window-state", "tokio", + "tokio-stream", "users", "warp", "winapi", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 39760b04e..f50006002 100755 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -77,6 +77,7 @@ kode-bridge = "0.2.1-rc1" dashmap = "6.1.0" tauri-plugin-notification = "2.3.0" console-subscriber = { version = "0.4.1", optional = true } +tokio-stream = "0.1.17" [target.'cfg(windows)'.dependencies] runas = "=1.2.0" diff --git a/src-tauri/src/core/event_driven_proxy.rs b/src-tauri/src/core/event_driven_proxy.rs index 4ad852470..c6db3673c 100644 --- a/src-tauri/src/core/event_driven_proxy.rs +++ b/src-tauri/src/core/event_driven_proxy.rs @@ -2,6 +2,7 @@ use parking_lot::RwLock; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio::time::{sleep, timeout, Duration}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use crate::config::{Config, IVerge}; use crate::core::async_proxy_query::AsyncProxyQuery; @@ -74,7 +75,7 @@ pub struct EventDrivenProxyManager { } #[derive(Debug)] -struct QueryRequest { +pub struct QueryRequest { response_tx: oneshot::Sender, } @@ -170,39 +171,32 @@ impl EventDrivenProxyManager { } } - fn start_event_loop( + pub fn start_event_loop( state: Arc>, - mut event_rx: mpsc::UnboundedReceiver, - mut query_rx: mpsc::UnboundedReceiver, + event_rx: mpsc::UnboundedReceiver, + query_rx: mpsc::UnboundedReceiver, ) { tokio::spawn(async move { log::info!(target: "app", "事件驱动代理管理器启动"); + // 将 mpsc 接收器包装成 Stream,避免每次循环创建 future + let mut event_stream = UnboundedReceiverStream::new(event_rx); + let mut query_stream = UnboundedReceiverStream::new(query_rx); + loop { tokio::select! { - event = event_rx.recv() => { - match event { - Some(event) => { - log::debug!(target: "app", "处理代理事件: {event:?}"); - Self::handle_event(&state, event).await; - } - None => { - log::info!(target: "app", "事件通道关闭,代理管理器停止"); - break; - } - } + Some(event) = event_stream.next() => { + log::debug!(target: "app", "处理代理事件: {event:?}"); + Self::handle_event(&state, event).await; } - query = query_rx.recv() => { - match query { - Some(query) => { - let result = Self::handle_query(&state).await; - let _ = query.response_tx.send(result); - } - None => { - log::info!(target: "app", "查询通道关闭"); - break; - } - } + Some(query) = query_stream.next() => { + let result = Self::handle_query(&state).await; + let _ = query.response_tx.send(result); + } + else => { + // 两个通道都关闭时退出 + log::info!(target: "app", "事件或查询通道关闭,代理管理器停止"); + break; } } }