feat: add tokio-stream dependency and refactor event loop handling in EventDrivenProxyManager

This commit is contained in:
Tunglies
2025-08-22 03:16:59 +08:00
parent 7d5fd295ed
commit 02f67961a9
3 changed files with 22 additions and 26 deletions

1
src-tauri/Cargo.lock generated
View File

@@ -1150,6 +1150,7 @@ dependencies = [
"tauri-plugin-updater", "tauri-plugin-updater",
"tauri-plugin-window-state", "tauri-plugin-window-state",
"tokio", "tokio",
"tokio-stream",
"users", "users",
"warp", "warp",
"winapi", "winapi",

View File

@@ -77,6 +77,7 @@ kode-bridge = "0.2.1-rc1"
dashmap = "6.1.0" dashmap = "6.1.0"
tauri-plugin-notification = "2.3.0" tauri-plugin-notification = "2.3.0"
console-subscriber = { version = "0.4.1", optional = true } console-subscriber = { version = "0.4.1", optional = true }
tokio-stream = "0.1.17"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
runas = "=1.2.0" runas = "=1.2.0"

View File

@@ -2,6 +2,7 @@ use parking_lot::RwLock;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, timeout, Duration}; use tokio::time::{sleep, timeout, Duration};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use crate::config::{Config, IVerge}; use crate::config::{Config, IVerge};
use crate::core::async_proxy_query::AsyncProxyQuery; use crate::core::async_proxy_query::AsyncProxyQuery;
@@ -74,7 +75,7 @@ pub struct EventDrivenProxyManager {
} }
#[derive(Debug)] #[derive(Debug)]
struct QueryRequest { pub struct QueryRequest {
response_tx: oneshot::Sender<Autoproxy>, response_tx: oneshot::Sender<Autoproxy>,
} }
@@ -170,39 +171,32 @@ impl EventDrivenProxyManager {
} }
} }
fn start_event_loop( pub fn start_event_loop(
state: Arc<RwLock<ProxyState>>, state: Arc<RwLock<ProxyState>>,
mut event_rx: mpsc::UnboundedReceiver<ProxyEvent>, event_rx: mpsc::UnboundedReceiver<ProxyEvent>,
mut query_rx: mpsc::UnboundedReceiver<QueryRequest>, query_rx: mpsc::UnboundedReceiver<QueryRequest>,
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
log::info!(target: "app", "事件驱动代理管理器启动"); log::info!(target: "app", "事件驱动代理管理器启动");
// 将 mpsc 接收器包装成 Stream避免每次循环创建 future
let mut event_stream = UnboundedReceiverStream::new(event_rx);
let mut query_stream = UnboundedReceiverStream::new(query_rx);
loop { loop {
tokio::select! { tokio::select! {
event = event_rx.recv() => { Some(event) = event_stream.next() => {
match event { log::debug!(target: "app", "处理代理事件: {event:?}");
Some(event) => { Self::handle_event(&state, event).await;
log::debug!(target: "app", "处理代理事件: {event:?}");
Self::handle_event(&state, event).await;
}
None => {
log::info!(target: "app", "事件通道关闭,代理管理器停止");
break;
}
}
} }
query = query_rx.recv() => { Some(query) = query_stream.next() => {
match query { let result = Self::handle_query(&state).await;
Some(query) => { let _ = query.response_tx.send(result);
let result = Self::handle_query(&state).await; }
let _ = query.response_tx.send(result); else => {
} // 两个通道都关闭时退出
None => { log::info!(target: "app", "事件或查询通道关闭,代理管理器停止");
log::info!(target: "app", "查询通道关闭"); break;
break;
}
}
} }
} }
} }