refactor: streamline SWR configuration and improve error handling in AppDataProvider

This commit is contained in:
xmk23333
2025-10-21 17:51:12 +08:00
parent bafe2ae164
commit 0e933597f5
18 changed files with 1139 additions and 1383 deletions

View File

@@ -47,30 +47,32 @@ impl IClashTemp {
}
pub fn template() -> Self {
use crate::constants::{network, tun as tun_const};
let mut map = Mapping::new();
let mut tun = Mapping::new();
let mut tun_config = Mapping::new();
let mut cors_map = Mapping::new();
tun.insert("enable".into(), false.into());
#[cfg(target_os = "linux")]
tun.insert("stack".into(), "mixed".into());
#[cfg(not(target_os = "linux"))]
tun.insert("stack".into(), "gvisor".into());
tun.insert("auto-route".into(), true.into());
tun.insert("strict-route".into(), false.into());
tun.insert("auto-detect-interface".into(), true.into());
tun.insert("dns-hijack".into(), vec!["any:53"].into());
tun_config.insert("enable".into(), false.into());
tun_config.insert("stack".into(), tun_const::DEFAULT_STACK.into());
tun_config.insert("auto-route".into(), true.into());
tun_config.insert("strict-route".into(), false.into());
tun_config.insert("auto-detect-interface".into(), true.into());
tun_config.insert("dns-hijack".into(), tun_const::DNS_HIJACK.into());
#[cfg(not(target_os = "windows"))]
map.insert("redir-port".into(), 7895.into());
map.insert("redir-port".into(), network::ports::DEFAULT_REDIR.into());
#[cfg(target_os = "linux")]
map.insert("tproxy-port".into(), 7896.into());
map.insert("mixed-port".into(), 7897.into());
map.insert("socks-port".into(), 7898.into());
map.insert("port".into(), 7899.into());
map.insert("tproxy-port".into(), network::ports::DEFAULT_TPROXY.into());
map.insert("mixed-port".into(), network::ports::DEFAULT_MIXED.into());
map.insert("socks-port".into(), network::ports::DEFAULT_SOCKS.into());
map.insert("port".into(), network::ports::DEFAULT_HTTP.into());
map.insert("log-level".into(), "info".into());
map.insert("allow-lan".into(), false.into());
map.insert("ipv6".into(), true.into());
map.insert("mode".into(), "rule".into());
map.insert("external-controller".into(), "127.0.0.1:9097".into());
map.insert("external-controller".into(), network::DEFAULT_EXTERNAL_CONTROLLER.into());
#[cfg(unix)]
map.insert(
"external-controller-unix".into(),
@@ -81,9 +83,9 @@ impl IClashTemp {
"external-controller-pipe".into(),
Self::guard_external_controller_ipc().into(),
);
map.insert("tun".into(), tun_config.into());
cors_map.insert("allow-private-network".into(), true.into());
cors_map.insert(
"allow-origins".into(),
cors_map.insert("allow-origins".into(),
vec![
"tauri://localhost",
"http://tauri.localhost",
@@ -97,7 +99,6 @@ impl IClashTemp {
.into(),
);
map.insert("secret".into(), "set-your-secret".into());
map.insert("tun".into(), tun.into());
map.insert("external-controller-cors".into(), cors_map.into());
map.insert("unified-delay".into(), true.into());
Self(map)
@@ -325,8 +326,8 @@ impl IClashTemp {
.ok()
.and_then(|path| path_to_str(&path).ok().map(|s| s.into()))
.unwrap_or_else(|| {
log::error!(target: "app", "Failed to get IPC path, using default");
"127.0.0.1:9090".into()
log::error!(target: "app", "Failed to get IPC path");
crate::constants::network::DEFAULT_EXTERNAL_CONTROLLER.into()
})
}
}

View File

@@ -1,6 +1,7 @@
use super::{IClashTemp, IProfiles, IRuntime, IVerge};
use crate::{
config::{PrfItem, profiles_append_item_safe},
constants::{files, timing},
core::{CoreManager, handle, validate::CoreConfigValidator},
enhance, logging,
utils::{Draft, dirs, help, logging::Type},
@@ -8,13 +9,9 @@ use crate::{
use anyhow::{Result, anyhow};
use backoff::{Error as BackoffError, ExponentialBackoff};
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::OnceCell;
use tokio::time::sleep;
pub const RUNTIME_CONFIG: &str = "clash-verge.yaml";
pub const CHECK_CONFIG: &str = "clash-verge-check.yaml";
pub struct Config {
clash_config: Draft<Box<IClashTemp>>,
verge_config: Draft<Box<IVerge>>,
@@ -123,20 +120,18 @@ impl Config {
Some(("config_validate::error", String::new()))
};
// 在单独的任务中发送通知
if let Some((msg_type, msg_content)) = validation_result {
sleep(Duration::from_secs(2)).await;
sleep(timing::STARTUP_ERROR_DELAY).await;
handle::Handle::notice_message(msg_type, &msg_content);
}
Ok(())
}
/// 将订阅丢到对应的文件中
pub async fn generate_file(typ: ConfigType) -> Result<PathBuf> {
let path = match typ {
ConfigType::Run => dirs::app_home_dir()?.join(RUNTIME_CONFIG),
ConfigType::Check => dirs::app_home_dir()?.join(CHECK_CONFIG),
ConfigType::Run => dirs::app_home_dir()?.join(files::RUNTIME_CONFIG),
ConfigType::Check => dirs::app_home_dir()?.join(files::CHECK_CONFIG),
};
let runtime = Config::runtime().await;
@@ -152,7 +147,6 @@ impl Config {
Ok(path)
}
/// 生成订阅存好
pub async fn generate() -> Result<()> {
let (config, exists_keys, logs) = enhance::enhance().await;
@@ -166,8 +160,6 @@ impl Config {
}
pub async fn verify_config_initialization() {
logging!(info, Type::Setup, "Verifying config initialization...");
let backoff_strategy = ExponentialBackoff {
initial_interval: std::time::Duration::from_millis(100),
max_interval: std::time::Duration::from_secs(2),
@@ -178,48 +170,15 @@ impl Config {
let operation = || async {
if Config::runtime().await.latest_ref().config.is_some() {
logging!(
info,
Type::Setup,
"Config initialization verified successfully"
);
return Ok::<(), BackoffError<anyhow::Error>>(());
}
logging!(
warn,
Type::Setup,
"Runtime config not found, attempting to regenerate..."
);
match Config::generate().await {
Ok(_) => {
logging!(info, Type::Setup, "Config successfully regenerated");
Ok(())
}
Err(e) => {
logging!(warn, Type::Setup, "Failed to generate config: {}", e);
Err(BackoffError::transient(e))
}
}
Config::generate().await
.map_err(BackoffError::transient)
};
match backoff::future::retry(backoff_strategy, operation).await {
Ok(_) => {
logging!(
info,
Type::Setup,
"Config initialization verified with backoff retry"
);
}
Err(e) => {
logging!(
error,
Type::Setup,
"Failed to verify config initialization after retries: {}",
e
);
}
if let Err(e) = backoff::future::retry(backoff_strategy, operation).await {
logging!(error, Type::Setup, "Config init verification failed: {}", e);
}
}
}

View File

@@ -513,13 +513,8 @@ impl IVerge {
patch!(enable_external_controller);
}
/// 在初始化前尝试拿到单例端口的值
pub fn get_singleton_port() -> u16 {
#[cfg(not(feature = "verge-dev"))]
const SERVER_PORT: u16 = 33331;
#[cfg(feature = "verge-dev")]
const SERVER_PORT: u16 = 11233;
SERVER_PORT
crate::constants::network::ports::SINGLETON_SERVER
}
/// 获取日志等级

107
src-tauri/src/constants.rs Normal file
View File

@@ -0,0 +1,107 @@
use std::time::Duration;
/// Network-related defaults shared across the app.
pub mod network {
    /// Default host for the local system proxy when the user has not set one.
    pub const DEFAULT_PROXY_HOST: &str = "127.0.0.1";
    /// Default address of the core's external controller (REST API).
    pub const DEFAULT_EXTERNAL_CONTROLLER: &str = "127.0.0.1:9097";
    /// Default listening ports for the proxy core.
    pub mod ports {
        // Kept for platform-gated call sites; not referenced on every target.
        #[allow(dead_code)]
        pub const DEFAULT_REDIR: u16 = 7895;
        #[allow(dead_code)]
        pub const DEFAULT_TPROXY: u16 = 7896;
        pub const DEFAULT_MIXED: u16 = 7897;
        pub const DEFAULT_SOCKS: u16 = 7898;
        pub const DEFAULT_HTTP: u16 = 7899;
        // Port component of `network::DEFAULT_EXTERNAL_CONTROLLER`.
        #[allow(dead_code)]
        pub const DEFAULT_EXTERNAL_CONTROLLER: u16 = 9097;
        // Single-instance detection server; a different port is used for dev
        // builds so a dev instance can run alongside a release install.
        #[cfg(not(feature = "verge-dev"))]
        pub const SINGLETON_SERVER: u16 = 33331;
        #[cfg(feature = "verge-dev")]
        pub const SINGLETON_SERVER: u16 = 11233;
    }
}
/// Per-OS default system-proxy bypass lists.
pub mod bypass {
    // Windows uses `;`-separated wildcard patterns plus the `<local>` token.
    #[cfg(target_os = "windows")]
    pub const DEFAULT: &str = "localhost;127.*;192.168.*;10.*;172.16.*;172.17.*;172.18.*;172.19.*;172.20.*;172.21.*;172.22.*;172.23.*;172.24.*;172.25.*;172.26.*;172.27.*;172.28.*;172.29.*;172.30.*;172.31.*;<local>";
    // Linux uses `,`-separated hosts and CIDR ranges.
    #[cfg(target_os = "linux")]
    pub const DEFAULT: &str = "localhost,127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,::1";
    // macOS additionally bypasses `*.local` and crashlytics hosts.
    #[cfg(target_os = "macos")]
    pub const DEFAULT: &str = "127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,localhost,*.local,*.crashlytics.com,<local>";
}
/// Delays and debounce intervals used across config and service handling.
pub mod timing {
    use super::Duration;
    /// Debounce window for repeated config-update requests.
    pub const CONFIG_UPDATE_DEBOUNCE: Duration = Duration::from_millis(500);
    /// Delay before reloading the config (e.g. after a core restart).
    pub const CONFIG_RELOAD_DELAY: Duration = Duration::from_millis(300);
    /// Wait after killing a process before verifying it actually exited.
    pub const PROCESS_VERIFY_DELAY: Duration = Duration::from_millis(100);
    #[allow(dead_code)]
    pub const EVENT_EMIT_DELAY: Duration = Duration::from_millis(20);
    /// Delay before surfacing startup validation errors to the UI.
    pub const STARTUP_ERROR_DELAY: Duration = Duration::from_secs(2);
    #[allow(dead_code)]
    pub const ERROR_BATCH_DELAY: Duration = Duration::from_millis(300);
    /// Max total time to wait for the Windows service to become ready.
    #[cfg(target_os = "windows")]
    pub const SERVICE_WAIT_MAX: Duration = Duration::from_millis(3000);
    /// Poll interval while waiting for the Windows service.
    #[cfg(target_os = "windows")]
    pub const SERVICE_WAIT_INTERVAL: Duration = Duration::from_millis(200);
}
/// Retry counts and failure thresholds.
pub mod retry {
    #[allow(dead_code)]
    pub const EVENT_EMIT_THRESHOLD: u64 = 10;
    // NOTE(review): presumably mirrors the frontend SWR `errorRetryCount`
    // mentioned in the commit title — confirm against the TS side.
    #[allow(dead_code)]
    pub const SWR_ERROR_RETRY: usize = 2;
}
/// Well-known file names under the app's home directory.
pub mod files {
    /// Runtime config fed to the running core.
    pub const RUNTIME_CONFIG: &str = "clash-verge.yaml";
    /// Scratch config written for validation checks.
    pub const CHECK_CONFIG: &str = "clash-verge-check.yaml";
    #[allow(dead_code)]
    pub const DNS_CONFIG: &str = "dns_config.yaml";
    #[allow(dead_code)]
    pub const WINDOW_STATE: &str = "window_state.json";
}
/// Names of the bundled mihomo core processes.
pub mod process {
    /// Stable-channel core binary name (without platform extension).
    pub const VERGE_MIHOMO: &str = "verge-mihomo";
    /// Alpha-channel core binary name (without platform extension).
    pub const VERGE_MIHOMO_ALPHA: &str = "verge-mihomo-alpha";

    /// All known core process names, e.g. for scanning orphaned processes.
    pub fn process_names() -> [&'static str; 2] {
        [VERGE_MIHOMO, VERGE_MIHOMO_ALPHA]
    }

    /// Appends the platform executable suffix (".exe" on Windows, empty
    /// elsewhere). Uses `std::env::consts::EXE_SUFFIX` instead of the previous
    /// pair of cfg-gated implementations, so one body covers every target.
    pub fn with_extension(name: &str) -> String {
        format!("{}{}", name, std::env::consts::EXE_SUFFIX)
    }
}
/// Message fragments identifying transport-level connection failures
/// (matched against error strings to decide whether the core's IPC is gone).
pub mod error_patterns {
    pub const CONNECTION_ERRORS: &[&str] = &[
        "Failed to create connection",
        "The system cannot find the file specified",
        "operation timed out",
        "connection refused",
    ];
}
/// TUN-mode defaults.
pub mod tun {
    // "mixed" stack on Linux; "gvisor" on every other platform.
    #[cfg(target_os = "linux")]
    pub const DEFAULT_STACK: &str = "mixed";
    #[cfg(not(target_os = "linux"))]
    pub const DEFAULT_STACK: &str = "gvisor";
    /// DNS hijack rules applied while TUN mode is active.
    pub const DNS_HIJACK: &[&str] = &["any:53"];
}

View File

@@ -1,837 +0,0 @@
use crate::AsyncHandler;
use crate::core::logger::ClashLogger;
use crate::core::validate::CoreConfigValidator;
use crate::process::CommandChildGuard;
use crate::utils::init::sidecar_writer;
use crate::utils::logging::{SharedWriter, write_sidecar_log};
use crate::{
config::*,
core::{
handle,
service::{self, SERVICE_MANAGER, ServiceStatus},
},
logging, logging_error, singleton_lazy,
utils::{
dirs,
help::{self},
logging::Type,
},
};
use anyhow::{Result, anyhow};
#[cfg(target_os = "windows")]
use backoff::backoff::Backoff;
#[cfg(target_os = "windows")]
use backoff::{Error as BackoffError, ExponentialBackoff};
use compact_str::CompactString;
use flexi_logger::DeferredNow;
use log::Level;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::time::Instant;
use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Duration};
use tauri_plugin_mihomo::Error as MihomoError;
use tauri_plugin_shell::ShellExt;
use tokio::sync::Semaphore;
use tokio::time::sleep;
// TODO:
// - 重构,提升模式切换速度
// - 内核启动添加启动 IPC 启动参数, `-ext-ctl-unix` / `-ext-ctl-pipe`, 运行时配置需要删除相关配置项
/// Orchestrates the lifecycle of the mihomo core, whether it runs via the
/// system service or as a Tauri sidecar process.
#[derive(Debug)]
pub struct CoreManager {
    // Current run state; parking_lot mutex so it can be read synchronously.
    running: Arc<Mutex<RunningMode>>,
    // Guard over the sidecar child process (Some only in sidecar mode).
    child_sidecar: Arc<Mutex<Option<CommandChildGuard>>>,
    // Single-permit semaphore: at most one config update in flight.
    update_semaphore: Arc<Semaphore>,
    // Timestamp of the last config update, used for debouncing.
    last_update: Arc<Mutex<Option<Instant>>>,
}
/// How (or whether) the core is currently running.
#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq, Eq)]
pub enum RunningMode {
    // Core managed by the installed system service.
    Service,
    // Core spawned as a Tauri sidecar child process.
    Sidecar,
    // No core process is running.
    NotRunning,
}
impl fmt::Display for RunningMode {
    /// Renders the variant name for logs and user-facing status strings.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            RunningMode::Service => "Service",
            RunningMode::Sidecar => "Sidecar",
            RunningMode::NotRunning => "NotRunning",
        };
        f.write_str(label)
    }
}
use crate::config::IVerge;
// Substrings of transport-level error messages that indicate the core's IPC
// endpoint is unreachable, so a core restart (not a plain retry) is warranted.
const CONNECTION_ERROR_PATTERNS: &[&str] = &[
    "Failed to create connection",
    "The system cannot find the file specified",
    "operation timed out",
    "connection refused",
];
impl CoreManager {
    /// Rolls the runtime config back to the plain clash template, saves it to
    /// disk, and notifies the frontend with the given message.
    pub async fn use_default_config(&self, msg_type: &str, msg_content: &str) -> Result<()> {
        let runtime_path = dirs::app_home_dir()?.join(RUNTIME_CONFIG);
        let clash_config = Config::clash().await.latest_ref().0.clone();
        *Config::runtime().await.draft_mut() = Box::new(IRuntime {
            config: Some(clash_config.clone()),
            exists_keys: vec![],
            chain_logs: Default::default(),
        });
        help::save_yaml(&runtime_path, &clash_config, Some("# Clash Verge Runtime")).await?;
        handle::Handle::notice_message(msg_type, msg_content);
        Ok(())
    }

    /// Updates proxies and related configuration: regenerates the config,
    /// validates it, and pushes it to the running core.
    /// Returns `(ok, error_message)`; debounced (500 ms) and serialized via
    /// the single-permit semaphore so only one update runs at a time.
    pub async fn update_config(&self) -> Result<(bool, String)> {
        // Skip entirely while the app is shutting down.
        if handle::Handle::global().is_exiting() {
            logging!(info, Type::Config, "应用正在退出,跳过验证");
            return Ok((true, String::new()));
        }
        let now = Instant::now();
        {
            // Debounce: drop requests arriving within 500 ms of the last one.
            let mut last = self.last_update.lock();
            if let Some(last_time) = *last {
                if now.duration_since(last_time) < Duration::from_millis(500) {
                    logging!(debug, Type::Config, "防抖:跳过重复的配置更新请求");
                    return Ok((true, String::new()));
                }
            }
            *last = Some(now);
        }
        // try_acquire (not acquire): a concurrent update means we can bail out.
        let permit = match self.update_semaphore.try_acquire() {
            Ok(p) => p,
            Err(_) => {
                logging!(debug, Type::Config, "配置更新已在进行中,跳过");
                return Ok((true, String::new()));
            }
        };
        let result = async {
            logging!(info, Type::Config, "生成新的配置内容");
            Config::generate().await?;
            match CoreConfigValidator::global().validate_config().await {
                Ok((true, _)) => {
                    // Validation passed: write the runtime file and push it.
                    logging!(info, Type::Config, "配置验证通过, 生成运行时配置");
                    let run_path = Config::generate_file(ConfigType::Run).await?;
                    self.put_configs_force(run_path).await?;
                    Ok((true, String::new()))
                }
                Ok((false, error_msg)) => {
                    // Invalid config: discard the draft, report the reason.
                    logging!(warn, Type::Config, "配置验证失败: {}", error_msg);
                    Config::runtime().await.discard();
                    Ok((false, error_msg))
                }
                Err(e) => {
                    // Validator itself failed: discard the draft and propagate.
                    logging!(warn, Type::Config, "验证过程发生错误: {}", e);
                    Config::runtime().await.discard();
                    Err(e)
                }
            }
        }
        .await;
        drop(permit);
        result
    }

    /// Pushes the config at `path_buf` to the core unconditionally. If the
    /// reload fails with a connection-class error, restarts the core once and
    /// retries; applies or discards the runtime draft accordingly.
    pub async fn put_configs_force(&self, path_buf: PathBuf) -> Result<()> {
        let run_path_str = dirs::path_to_str(&path_buf).map_err(|e| {
            let msg = e.to_string();
            logging_error!(Type::Core, "{}", msg);
            anyhow!(msg)
        })?;
        match self.reload_config_once(run_path_str).await {
            Ok(_) => {
                Config::runtime().await.apply();
                logging!(info, Type::Core, "Configuration updated successfully");
                Ok(())
            }
            Err(err) => {
                let should_retry = Self::should_restart_on_reload_error(&err);
                let err_msg = err.to_string();
                // Only restart-and-retry for connection-class errors, and
                // never while the app is exiting.
                if should_retry && !handle::Handle::global().is_exiting() {
                    logging!(
                        warn,
                        Type::Core,
                        "Reload config failed ({}), restarting core and retrying",
                        err_msg
                    );
                    if let Err(restart_err) = self.restart_core().await {
                        Config::runtime().await.discard();
                        logging_error!(
                            Type::Core,
                            "Failed to restart core after reload error: {}",
                            restart_err
                        );
                        return Err(restart_err);
                    }
                    // Give the freshly restarted core a moment before retrying.
                    sleep(Duration::from_millis(300)).await;
                    match self.reload_config_once(run_path_str).await {
                        Ok(_) => {
                            Config::runtime().await.apply();
                            logging!(
                                info,
                                Type::Core,
                                "Configuration updated successfully after restarting core"
                            );
                            return Ok(());
                        }
                        Err(retry_err) => {
                            let retry_msg = retry_err.to_string();
                            Config::runtime().await.discard();
                            logging_error!(
                                Type::Core,
                                "Failed to update configuration after restart: {}",
                                retry_msg
                            );
                            return Err(anyhow!(retry_msg));
                        }
                    }
                }
                Config::runtime().await.discard();
                logging_error!(Type::Core, "Failed to update configuration: {}", err_msg);
                Err(anyhow!(err_msg))
            }
        }
    }

    /// Single reload attempt against the core's controller API.
    async fn reload_config_once(&self, config_path: &str) -> std::result::Result<(), MihomoError> {
        handle::Handle::mihomo()
            .await
            .reload_config(true, config_path)
            .await
    }

    /// Classifies a reload error: true when it looks like the connection to
    /// the core is dead (so restarting the core is worth trying), false for
    /// config-level errors where a restart would not help.
    fn should_restart_on_reload_error(err: &MihomoError) -> bool {
        // Connection-class io::ErrorKind values (NotFound covers a missing
        // named pipe / unix socket path).
        fn is_connection_io_error(kind: std::io::ErrorKind) -> bool {
            matches!(
                kind,
                std::io::ErrorKind::ConnectionAborted
                    | std::io::ErrorKind::ConnectionRefused
                    | std::io::ErrorKind::ConnectionReset
                    | std::io::ErrorKind::NotFound
            )
        }
        // Fallback string match against the known connection-error fragments.
        fn contains_error_pattern(text: &str) -> bool {
            CONNECTION_ERROR_PATTERNS.iter().any(|p| text.contains(p))
        }
        match err {
            MihomoError::ConnectionFailed | MihomoError::ConnectionLost => true,
            MihomoError::Io(io_err) => is_connection_io_error(io_err.kind()),
            MihomoError::Reqwest(req_err) => {
                if req_err.is_connect() || req_err.is_timeout() {
                    return true;
                }
                // Inspect the error source chain for a wrapped io::Error.
                if let Some(source) = req_err.source() {
                    if let Some(io_err) = source.downcast_ref::<std::io::Error>() {
                        if is_connection_io_error(io_err.kind()) {
                            return true;
                        }
                    } else if contains_error_pattern(&source.to_string()) {
                        return true;
                    }
                }
                contains_error_pattern(&req_err.to_string())
            }
            MihomoError::FailedResponse(msg) => contains_error_pattern(msg),
            _ => false,
        }
    }
}
impl CoreManager {
    /// Finds and kills mihomo processes not owned by this instance (orphans
    /// left behind by a crash or a previous run). The current sidecar child,
    /// if any, is excluded.
    async fn cleanup_orphaned_mihomo_processes(&self) -> Result<()> {
        logging!(info, Type::Core, "开始清理多余的 mihomo 进程");
        let current_pid = self
            .child_sidecar
            .lock()
            .as_ref()
            .and_then(|child| child.pid());
        let target_processes = ["verge-mihomo", "verge-mihomo-alpha"];
        // Look up both core binary names concurrently.
        let process_futures = target_processes.iter().map(|&target| {
            let process_name = if cfg!(windows) {
                format!("{target}.exe")
            } else {
                target.to_string()
            };
            self.find_processes_by_name(process_name, target)
        });
        let process_results = futures::future::join_all(process_futures).await;
        let pids_to_kill: Vec<_> = process_results
            .into_iter()
            .filter_map(|result| result.ok())
            .flat_map(|(pids, process_name)| {
                pids.into_iter()
                    // Never kill our own sidecar child.
                    .filter(|&pid| Some(pid) != current_pid)
                    .map(move |pid| (pid, process_name.clone()))
            })
            .collect();
        if pids_to_kill.is_empty() {
            logging!(debug, Type::Core, "未发现多余的 mihomo 进程");
            return Ok(());
        }
        // Kill all orphans concurrently and count the successes.
        let kill_futures = pids_to_kill
            .iter()
            .map(|(pid, name)| self.kill_process_with_verification(*pid, name.clone()));
        let killed_count = futures::future::join_all(kill_futures)
            .await
            .into_iter()
            .filter(|&success| success)
            .count();
        if killed_count > 0 {
            logging!(
                info,
                Type::Core,
                "清理完成,共终止了 {} 个多余的 mihomo 进程",
                killed_count
            );
        }
        Ok(())
    }

    /// Finds PIDs of processes matching `process_name`. On Windows this walks
    /// a toolhelp snapshot on a blocking thread; elsewhere it shells out to
    /// `pgrep` (macOS) or `pidof` (Linux). Returns the PIDs plus the name.
    async fn find_processes_by_name(
        &self,
        process_name: String,
        _target: &str,
    ) -> Result<(Vec<u32>, String)> {
        #[cfg(windows)]
        {
            use std::mem;
            use winapi::um::handleapi::CloseHandle;
            use winapi::um::tlhelp32::{
                CreateToolhelp32Snapshot, PROCESSENTRY32W, Process32FirstW, Process32NextW,
                TH32CS_SNAPPROCESS,
            };
            use winapi::um::winnt::HANDLE;
            let process_name_clone = process_name.clone();
            // WinAPI snapshot iteration is blocking — run it off the runtime.
            let pids = AsyncHandler::spawn_blocking(move || -> Result<Vec<u32>> {
                let mut pids = Vec::with_capacity(8);
                unsafe {
                    let snapshot: HANDLE = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
                    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
                        return Err(anyhow::anyhow!("Failed to create process snapshot"));
                    }
                    let mut pe32: PROCESSENTRY32W = mem::zeroed();
                    pe32.dwSize = mem::size_of::<PROCESSENTRY32W>() as u32;
                    if Process32FirstW(snapshot, &mut pe32) != 0 {
                        loop {
                            // szExeFile is a NUL-terminated UTF-16 buffer.
                            let end_pos = pe32
                                .szExeFile
                                .iter()
                                .position(|&x| x == 0)
                                .unwrap_or(pe32.szExeFile.len());
                            if end_pos > 0 {
                                let exe_file = String::from_utf16_lossy(&pe32.szExeFile[..end_pos]);
                                if exe_file.eq_ignore_ascii_case(&process_name_clone) {
                                    pids.push(pe32.th32ProcessID);
                                }
                            }
                            if Process32NextW(snapshot, &mut pe32) == 0 {
                                break;
                            }
                        }
                    }
                    CloseHandle(snapshot);
                }
                Ok(pids)
            })
            .await??;
            Ok((pids, process_name))
        }
        #[cfg(not(windows))]
        {
            let output = if cfg!(target_os = "macos") {
                tokio::process::Command::new("pgrep")
                    .arg(&process_name)
                    .output()
                    .await?
            } else {
                tokio::process::Command::new("pidof")
                    .arg(&process_name)
                    .output()
                    .await?
            };
            // Non-zero status means "no matching process" — not an error here.
            if !output.status.success() {
                return Ok((Vec::new(), process_name));
            }
            let stdout = String::from_utf8_lossy(&output.stdout);
            let pids: Vec<u32> = stdout
                .split_whitespace()
                .filter_map(|s| s.parse().ok())
                .collect();
            Ok((pids, process_name))
        }
    }

    /// Force-kills `pid` and then verifies it is actually gone.
    /// Returns true only when the process no longer runs afterwards.
    async fn kill_process_with_verification(&self, pid: u32, process_name: String) -> bool {
        logging!(
            info,
            Type::Core,
            "尝试终止进程: {} (PID: {})",
            process_name,
            pid
        );
        #[cfg(windows)]
        let success = {
            use winapi::um::handleapi::CloseHandle;
            use winapi::um::processthreadsapi::{OpenProcess, TerminateProcess};
            use winapi::um::winnt::{HANDLE, PROCESS_TERMINATE};
            AsyncHandler::spawn_blocking(move || unsafe {
                let handle: HANDLE = OpenProcess(PROCESS_TERMINATE, 0, pid);
                if handle.is_null() {
                    return false;
                }
                let result = TerminateProcess(handle, 1) != 0;
                CloseHandle(handle);
                result
            })
            .await
            .unwrap_or(false)
        };
        #[cfg(not(windows))]
        let success = tokio::process::Command::new("kill")
            .args(["-9", &pid.to_string()])
            .output()
            .await
            .map(|output| output.status.success())
            .unwrap_or(false);
        if !success {
            logging!(
                warn,
                Type::Core,
                "无法终止进程: {} (PID: {})",
                process_name,
                pid
            );
            return false;
        }
        // Brief grace period before confirming the process is gone.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        if self.is_process_running(pid).await.unwrap_or(false) {
            logging!(
                warn,
                Type::Core,
                "进程 {} (PID: {}) 终止命令成功但进程仍在运行",
                process_name,
                pid
            );
            false
        } else {
            logging!(
                info,
                Type::Core,
                "成功终止进程: {} (PID: {})",
                process_name,
                pid
            );
            true
        }
    }

    /// Checks whether `pid` refers to a live process.
    async fn is_process_running(&self, pid: u32) -> Result<bool> {
        #[cfg(windows)]
        {
            use winapi::shared::minwindef::DWORD;
            use winapi::um::handleapi::CloseHandle;
            use winapi::um::processthreadsapi::{GetExitCodeProcess, OpenProcess};
            use winapi::um::winnt::{HANDLE, PROCESS_QUERY_INFORMATION};
            AsyncHandler::spawn_blocking(move || unsafe {
                let handle: HANDLE = OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid);
                if handle.is_null() {
                    return Ok(false);
                }
                let mut exit_code: DWORD = 0;
                let result = GetExitCodeProcess(handle, &mut exit_code);
                CloseHandle(handle);
                // 259 == STILL_ACTIVE: the process has not exited.
                Ok(result != 0 && exit_code == 259)
            })
            .await?
        }
        #[cfg(not(windows))]
        {
            let output = tokio::process::Command::new("ps")
                .args(["-p", &pid.to_string()])
                .output()
                .await?;
            Ok(output.status.success() && !output.stdout.is_empty())
        }
    }

    /// Spawns the core as a Tauri sidecar and wires its stdout/stderr into
    /// the sidecar log writer and the in-memory ClashLogger.
    async fn start_core_by_sidecar(&self) -> Result<()> {
        logging!(info, Type::Core, "Running core by sidecar");
        let config_file = &Config::generate_file(ConfigType::Run).await?;
        let app_handle = handle::Handle::app_handle();
        let clash_core = Config::verge().await.latest_ref().get_valid_clash_core();
        let config_dir = dirs::app_home_dir()?;
        let (mut rx, child) = app_handle
            .shell()
            .sidecar(&clash_core)?
            .args([
                "-d",
                dirs::path_to_str(&config_dir)?,
                "-f",
                dirs::path_to_str(config_file)?,
            ])
            .spawn()?;
        let pid = child.pid();
        logging!(trace, Type::Core, "Started core by sidecar pid: {}", pid);
        *self.child_sidecar.lock() = Some(CommandChildGuard::new(child));
        self.set_running_mode(RunningMode::Sidecar);
        let shared_writer: SharedWriter =
            Arc::new(tokio::sync::Mutex::new(sidecar_writer().await?));
        // Background task: forward core output to log sinks until termination.
        AsyncHandler::spawn(|| async move {
            while let Some(event) = rx.recv().await {
                match event {
                    tauri_plugin_shell::process::CommandEvent::Stdout(line)
                    | tauri_plugin_shell::process::CommandEvent::Stderr(line) => {
                        let mut now = DeferredNow::default();
                        let message = CompactString::from(String::from_utf8_lossy(&line).as_ref());
                        let w = shared_writer.lock().await;
                        write_sidecar_log(w, &mut now, Level::Error, &message);
                        ClashLogger::global().append_log(message);
                    }
                    tauri_plugin_shell::process::CommandEvent::Terminated(term) => {
                        let mut now = DeferredNow::default();
                        let message = if let Some(code) = term.code {
                            CompactString::from(format!("Process terminated with code: {}", code))
                        } else if let Some(signal) = term.signal {
                            CompactString::from(format!("Process terminated by signal: {}", signal))
                        } else {
                            CompactString::from("Process terminated")
                        };
                        let w = shared_writer.lock().await;
                        write_sidecar_log(w, &mut now, Level::Info, &message);
                        ClashLogger::global().clear_logs();
                        break;
                    }
                    _ => {}
                }
            }
        });
        Ok(())
    }

    /// Drops the sidecar child guard (which terminates the process) and marks
    /// the core as not running.
    fn stop_core_by_sidecar(&self) -> Result<()> {
        logging!(info, Type::Core, "Stopping core by sidecar");
        if let Some(child) = self.child_sidecar.lock().take() {
            let pid = child.pid();
            // Dropping the guard terminates the child process.
            drop(child);
            logging!(trace, Type::Core, "Stopped core by sidecar pid: {:?}", pid);
        }
        self.set_running_mode(RunningMode::NotRunning);
        Ok(())
    }
}
impl CoreManager {
    /// Starts the core through the installed system service, feeding it the
    /// freshly generated runtime config, then records Service mode.
    async fn start_core_by_service(&self) -> Result<()> {
        logging!(info, Type::Core, "Running core by service");
        let config_path = Config::generate_file(ConfigType::Run).await?;
        service::run_core_by_service(&config_path).await?;
        self.set_running_mode(RunningMode::Service);
        Ok(())
    }

    /// Stops the service-managed core and records NotRunning mode.
    async fn stop_core_by_service(&self) -> Result<()> {
        logging!(info, Type::Core, "Stopping core by service");
        service::stop_core_by_service().await?;
        self.set_running_mode(RunningMode::NotRunning);
        Ok(())
    }
}
impl Default for CoreManager {
    /// Fresh manager: core not running, no sidecar child, a single update
    /// permit, and no recorded last-update timestamp.
    fn default() -> Self {
        Self {
            running: Arc::new(Mutex::new(RunningMode::NotRunning)),
            child_sidecar: Arc::new(Mutex::new(None)),
            update_semaphore: Arc::new(Semaphore::new(1)),
            last_update: Arc::new(Mutex::new(None)),
        }
    }
}
impl CoreManager {
    /// Initializes the core: sweeps orphaned mihomo processes (best-effort),
    /// then starts the core in whichever mode is available.
    pub async fn init(&self) -> Result<()> {
        logging!(info, Type::Core, "开始核心初始化");
        if let Err(e) = self.cleanup_orphaned_mihomo_processes().await {
            logging!(warn, Type::Core, "清理遗留 mihomo 进程失败: {}", e);
        }
        self.start_core().await?;
        logging!(info, Type::Core, "核心初始化完成");
        Ok(())
    }

    /// Records the current running mode.
    pub fn set_running_mode(&self, mode: RunningMode) {
        let mut guard = self.running.lock();
        *guard = mode;
    }

    /// Returns the current running mode.
    pub fn get_running_mode(&self) -> RunningMode {
        *self.running.lock()
    }

    /// TUN mode requires the service; when enabled, poll (with bounded
    /// backoff, ~3 s total) until the service reports Ready. Logs and gives
    /// up quietly on timeout — the caller then falls back to sidecar mode.
    #[cfg(target_os = "windows")]
    async fn wait_for_service_ready_if_tun_enabled(&self) {
        let require_service = Config::verge()
            .await
            .latest_ref()
            .enable_tun_mode
            .unwrap_or(false);
        if !require_service {
            return;
        }
        let max_wait = Duration::from_millis(3000);
        // Fixed 200 ms interval: multiplier 1.0 and no randomization turn the
        // exponential backoff into a plain bounded poll.
        let mut backoff_strategy = ExponentialBackoff {
            initial_interval: Duration::from_millis(200),
            max_interval: Duration::from_millis(200),
            max_elapsed_time: Some(max_wait),
            multiplier: 1.0,
            randomization_factor: 0.0,
            ..Default::default()
        };
        backoff_strategy.reset();
        let mut attempts = 0usize;
        let operation = || {
            attempts += 1;
            let attempt = attempts;
            async move {
                let mut manager = SERVICE_MANAGER.lock().await;
                if matches!(manager.current(), ServiceStatus::Ready) {
                    if attempt > 1 {
                        logging!(
                            info,
                            Type::Core,
                            "Service became ready for TUN after {} attempt(s)",
                            attempt
                        );
                    }
                    return Ok(());
                }
                if attempt == 1 {
                    logging!(
                        info,
                        Type::Core,
                        "TUN mode enabled but service not ready; waiting for service availability"
                    );
                }
                // Try to (re)connect to the service, then refresh its status.
                match manager.init().await {
                    Ok(_) => {
                        logging_error!(Type::Core, manager.refresh().await);
                    }
                    Err(err) => {
                        logging!(
                            debug,
                            Type::Core,
                            "Service connection attempt {} failed while waiting for TUN: {}",
                            attempt,
                            err
                        );
                        return Err(BackoffError::transient(err));
                    }
                }
                if matches!(manager.current(), ServiceStatus::Ready) {
                    logging!(
                        info,
                        Type::Core,
                        "Service became ready for TUN after {} attempt(s)",
                        attempt
                    );
                    return Ok(());
                }
                logging!(
                    debug,
                    Type::Core,
                    "Service not ready after attempt {}; retrying with backoff",
                    attempt
                );
                Err(BackoffError::transient(anyhow!("Service not ready yet")))
            }
        };
        let wait_started = Instant::now();
        if let Err(err) = backoff::future::retry(backoff_strategy, operation).await {
            let waited_ms = wait_started.elapsed().as_millis();
            logging!(
                warn,
                Type::Core,
                "Service still not ready after waiting approximately {} ms ({} attempt(s)); falling back to sidecar mode: {}",
                waited_ms,
                attempts,
                err
            );
        }
    }

    // TODO: decide whether this check is needed on non-Windows platforms.
    #[allow(clippy::unused_async)]
    #[cfg(not(target_os = "windows"))]
    async fn wait_for_service_ready_if_tun_enabled(&self) {}

    /// Chooses the running mode: Service when the service manager reports
    /// Ready (waiting for it first if TUN mode requires it), else Sidecar.
    pub async fn prestart_core(&self) -> Result<()> {
        self.wait_for_service_ready_if_tun_enabled().await;
        match SERVICE_MANAGER.lock().await.current() {
            ServiceStatus::Ready => {
                self.set_running_mode(RunningMode::Service);
            }
            _ => {
                self.set_running_mode(RunningMode::Sidecar);
            }
        }
        Ok(())
    }

    /// Starts the core in the mode selected by `prestart_core`.
    /// Start failures are logged, not propagated.
    pub async fn start_core(&self) -> Result<()> {
        self.prestart_core().await?;
        match self.get_running_mode() {
            RunningMode::Service => {
                logging_error!(Type::Core, self.start_core_by_service().await);
            }
            RunningMode::NotRunning | RunningMode::Sidecar => {
                logging_error!(Type::Core, self.start_core_by_sidecar().await);
            }
        };
        Ok(())
    }

    /// Returns the core's log buffer for the current mode (empty when the
    /// core is not running).
    pub async fn get_clash_logs(&self) -> Result<VecDeque<CompactString>> {
        logging!(info, Type::Core, "get clash logs");
        let logs = match self.get_running_mode() {
            RunningMode::Service => service::get_clash_logs_by_service().await?,
            RunningMode::Sidecar => ClashLogger::global().get_logs().clone(),
            _ => VecDeque::new(),
        };
        Ok(logs)
    }

    /// Stops the running core (no-op when not running) and clears the
    /// in-memory log buffer.
    pub async fn stop_core(&self) -> Result<()> {
        ClashLogger::global().clear_logs();
        match self.get_running_mode() {
            RunningMode::Service => self.stop_core_by_service().await,
            RunningMode::Sidecar => self.stop_core_by_sidecar(),
            RunningMode::NotRunning => Ok(()),
        }
    }

    /// Restarts the core, re-probing service availability in between so the
    /// best mode is re-selected.
    pub async fn restart_core(&self) -> Result<()> {
        logging!(info, Type::Core, "Restarting core");
        self.stop_core().await?;
        if SERVICE_MANAGER.lock().await.init().await.is_ok() {
            logging_error!(Type::Setup, SERVICE_MANAGER.lock().await.refresh().await);
        }
        self.start_core().await?;
        Ok(())
    }

    /// Switches the clash core binary, persists the choice, and pushes a
    /// fresh runtime config to the core. Errors are returned as strings for
    /// the frontend.
    pub async fn change_core(&self, clash_core: Option<String>) -> Result<(), String> {
        // Single None check via let-else. Previously this was an `is_none()`
        // early-return immediately followed by an `ok_or_else` that could
        // never fire — the duplicate is collapsed here.
        let Some(core) = clash_core.as_deref() else {
            let error_message = "Clash core should not be Null";
            logging!(error, Type::Core, "{}", error_message);
            return Err(error_message.into());
        };
        if !IVerge::VALID_CLASH_CORES.contains(&core) {
            let error_message = format!("Clash core invalid name: {core}");
            logging!(error, Type::Core, "{}", error_message);
            return Err(error_message);
        }
        Config::verge().await.draft_mut().clash_core = clash_core.clone();
        Config::verge().await.apply();
        // Fetch the data first, then call async, to avoid Send problems.
        let verge_data = Config::verge().await.latest_ref().clone();
        logging_error!(Type::Core, verge_data.save_file().await);
        let run_path = Config::generate_file(ConfigType::Run).await.map_err(|e| {
            let msg = e.to_string();
            logging_error!(Type::Core, "{}", msg);
            msg
        })?;
        self.put_configs_force(run_path)
            .await
            .map_err(|e| e.to_string())?;
        Ok(())
    }
}
// Use simplified singleton_lazy macro
singleton_lazy!(CoreManager, CORE_MANAGER, CoreManager::default);

View File

@@ -412,47 +412,42 @@ impl EventDrivenProxyManager {
}
async fn get_expected_sys_proxy() -> Sysproxy {
let verge_config = Config::verge().await;
let verge_mixed_port = verge_config.latest_ref().verge_mixed_port;
let proxy_host = verge_config.latest_ref().proxy_host.clone();
use crate::constants::network;
let port = verge_mixed_port.unwrap_or(Config::clash().await.latest_ref().get_mixed_port());
let proxy_host = proxy_host.unwrap_or_else(|| "127.0.0.1".into());
let (verge_mixed_port, proxy_host) = {
let verge_config = Config::verge().await;
let verge_ref = verge_config.latest_ref();
(verge_ref.verge_mixed_port, verge_ref.proxy_host.clone())
};
let default_port = {
let clash_config = Config::clash().await;
clash_config.latest_ref().get_mixed_port()
};
let port = verge_mixed_port.unwrap_or(default_port);
let host = proxy_host.unwrap_or_else(|| network::DEFAULT_PROXY_HOST.into());
Sysproxy {
enable: true,
host: proxy_host,
host,
port,
bypass: Self::get_bypass_config().await,
}
}
async fn get_bypass_config() -> String {
let (use_default, custom_bypass) = {
let verge_config = Config::verge().await;
let verge = verge_config.latest_ref();
(
verge.use_default_bypass.unwrap_or(true),
verge.system_proxy_bypass.clone().unwrap_or_default(),
)
};
use crate::constants::bypass;
#[cfg(target_os = "windows")]
let default_bypass = "localhost;127.*;192.168.*;10.*;172.16.*;172.17.*;172.18.*;172.19.*;172.20.*;172.21.*;172.22.*;172.23.*;172.24.*;172.25.*;172.26.*;172.27.*;172.28.*;172.29.*;172.30.*;172.31.*;<local>";
let verge_config = Config::verge().await;
let verge = verge_config.latest_ref();
let use_default = verge.use_default_bypass.unwrap_or(true);
let custom = verge.system_proxy_bypass.as_deref().unwrap_or("");
#[cfg(target_os = "linux")]
let default_bypass =
"localhost,127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,::1";
#[cfg(target_os = "macos")]
let default_bypass = "127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,localhost,*.local,*.crashlytics.com,<local>";
if custom_bypass.is_empty() {
default_bypass.to_string()
} else if use_default {
format!("{default_bypass},{custom_bypass}")
} else {
custom_bypass
match (use_default, custom.is_empty()) {
(_, true) => bypass::DEFAULT.to_string(),
(true, false) => format!("{},{}", bypass::DEFAULT, custom),
(false, false) => custom.to_string(),
}
}

View File

@@ -1,260 +1,18 @@
use crate::{APP_HANDLE, singleton};
use crate::{APP_HANDLE, singleton, constants::timing};
use parking_lot::RwLock;
use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
mpsc,
},
thread,
time::{Duration, Instant},
};
use tauri::{AppHandle, Emitter, Manager, WebviewWindow};
use std::{sync::Arc, thread};
use tauri::{AppHandle, Manager, WebviewWindow};
use tauri_plugin_mihomo::{Mihomo, MihomoExt};
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::RwLockReadGuard;
use crate::{logging, utils::logging::Type};
/// 不同类型的前端通知
#[derive(Debug, Clone)]
enum FrontendEvent {
RefreshClash,
RefreshVerge,
NoticeMessage { status: String, message: String },
ProfileChanged { current_profile_id: String },
TimerUpdated { profile_index: String },
ProfileUpdateStarted { uid: String },
ProfileUpdateCompleted { uid: String },
}
/// 事件发送统计和监控
#[derive(Debug, Default)]
struct EventStats {
total_sent: AtomicU64,
total_errors: AtomicU64,
last_error_time: RwLock<Option<Instant>>,
}
/// 存储启动期间的错误消息
#[derive(Debug, Clone)]
struct ErrorMessage {
status: String,
message: String,
}
/// 全局前端通知系统
#[derive(Debug)]
struct NotificationSystem {
sender: Option<mpsc::Sender<FrontendEvent>>,
worker_handle: Option<thread::JoinHandle<()>>,
is_running: bool,
stats: EventStats,
last_emit_time: RwLock<Instant>,
/// 当通知系统失败超过阈值时,进入紧急模式
emergency_mode: RwLock<bool>,
}
impl Default for NotificationSystem {
fn default() -> Self {
Self::new()
}
}
impl NotificationSystem {
fn new() -> Self {
Self {
sender: None,
worker_handle: None,
is_running: false,
stats: EventStats::default(),
last_emit_time: RwLock::new(Instant::now()),
emergency_mode: RwLock::new(false),
}
}
/// 启动通知处理线程
fn start(&mut self) {
if self.is_running {
return;
}
let (tx, rx) = mpsc::channel();
self.sender = Some(tx);
self.is_running = true;
*self.last_emit_time.write() = Instant::now();
match thread::Builder::new()
.name("frontend-notifier".into())
.spawn(move || {
let handle = Handle::global();
while !handle.is_exiting() {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(event) => {
let system_guard = handle.notification_system.read();
let Some(system) = system_guard.as_ref() else {
log::warn!("NotificationSystem not found in handle while processing event.");
continue;
};
let is_emergency = *system.emergency_mode.read();
if is_emergency
&& let FrontendEvent::NoticeMessage { ref status, .. } = event
&& status == "info" {
log::warn!(
"Emergency mode active, skipping info message"
);
continue;
}
if let Some(window) = Handle::get_window() {
*system.last_emit_time.write() = Instant::now();
let (event_name_str, payload_result) = match event {
FrontendEvent::RefreshClash => {
("verge://refresh-clash-config", Ok(serde_json::json!("yes")))
}
FrontendEvent::RefreshVerge => {
("verge://refresh-verge-config", Ok(serde_json::json!("yes")))
}
FrontendEvent::NoticeMessage { status, message } => {
match serde_json::to_value((status, message)) {
Ok(p) => ("verge://notice-message", Ok(p)),
Err(e) => {
log::error!("Failed to serialize NoticeMessage payload: {e}");
("verge://notice-message", Err(e))
}
}
}
FrontendEvent::ProfileChanged { current_profile_id } => {
("profile-changed", Ok(serde_json::json!(current_profile_id)))
}
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(serde_json::json!(profile_index)))
}
FrontendEvent::ProfileUpdateStarted { uid } => {
("profile-update-started", Ok(serde_json::json!({ "uid": uid })))
}
FrontendEvent::ProfileUpdateCompleted { uid } => {
("profile-update-completed", Ok(serde_json::json!({ "uid": uid })))
}
};
if let Ok(payload) = payload_result {
match window.emit(event_name_str, payload) {
Ok(_) => {
system.stats.total_sent.fetch_add(1, Ordering::SeqCst);
// 记录成功发送的事件
if log::log_enabled!(log::Level::Debug) {
log::debug!("Successfully emitted event: {event_name_str}");
}
}
Err(e) => {
log::warn!("Failed to emit event {event_name_str}: {e}");
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
let errors = system.stats.total_errors.load(Ordering::SeqCst);
const EMIT_ERROR_THRESHOLD: u64 = 10;
if errors > EMIT_ERROR_THRESHOLD && !*system.emergency_mode.read() {
log::warn!(
"Reached {EMIT_ERROR_THRESHOLD} emit errors, entering emergency mode"
);
*system.emergency_mode.write() = true;
}
}
}
} else {
system.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*system.stats.last_error_time.write() = Some(Instant::now());
log::warn!("Skipped emitting event due to payload serialization error for {event_name_str}");
}
} else {
log::warn!("No window found, skipping event emit.");
}
thread::sleep(Duration::from_millis(20));
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::info!(
"Notification channel disconnected, exiting worker thread"
);
break;
}
}
}
log::info!("Notification worker thread exiting");
}) {
Ok(handle) => {
self.worker_handle = Some(handle);
}
Err(e) => {
log::error!("Failed to start notification worker thread: {e}");
}
}
}
/// 发送事件到队列
fn send_event(&self, event: FrontendEvent) -> bool {
if *self.emergency_mode.read()
&& let FrontendEvent::NoticeMessage { ref status, .. } = event
&& status == "info"
{
log::info!("Skipping info message in emergency mode");
return false;
}
if let Some(sender) = &self.sender {
match sender.send(event) {
Ok(_) => true,
Err(e) => {
log::warn!("Failed to send event to notification queue: {e:?}");
self.stats.total_errors.fetch_add(1, Ordering::SeqCst);
*self.stats.last_error_time.write() = Some(Instant::now());
false
}
}
} else {
log::warn!("Notification system not started, can't send event");
false
}
}
fn shutdown(&mut self) {
log::info!("NotificationSystem shutdown initiated");
self.is_running = false;
// 先关闭发送端,让接收端知道不会再有新消息
if let Some(sender) = self.sender.take() {
drop(sender);
}
// 设置超时避免无限等待
if let Some(handle) = self.worker_handle.take() {
match handle.join() {
Ok(_) => {
log::info!("NotificationSystem worker thread joined successfully");
}
Err(e) => {
log::error!("NotificationSystem worker thread join failed: {e:?}");
}
}
}
log::info!("NotificationSystem shutdown completed");
}
}
use super::notification::{ErrorMessage, FrontendEvent, NotificationSystem};
#[derive(Debug, Clone)]
pub struct Handle {
pub is_exiting: Arc<RwLock<bool>>,
is_exiting: Arc<RwLock<bool>>,
startup_errors: Arc<RwLock<Vec<ErrorMessage>>>,
startup_completed: Arc<RwLock<bool>>,
notification_system: Arc<RwLock<Option<NotificationSystem>>>,
pub(crate) notification_system: Arc<RwLock<Option<NotificationSystem>>>,
}
impl Default for Handle {
@@ -268,7 +26,6 @@ impl Default for Handle {
}
}
// Use singleton macro
singleton!(Handle, HANDLE);
impl Handle {
@@ -277,45 +34,28 @@ impl Handle {
}
pub fn init(&self) {
// 如果正在退出,不要重新初始化
if self.is_exiting() {
log::debug!("Handle::init called while exiting, skipping initialization");
return;
}
let mut system_opt = self.notification_system.write();
if let Some(system) = system_opt.as_mut() {
// 只在未运行时启动
if !system.is_running {
system.start();
} else {
log::debug!("NotificationSystem already running, skipping start");
}
}
}
/// 获取 AppHandle
#[allow(clippy::expect_used)]
pub fn app_handle() -> &'static AppHandle {
APP_HANDLE.get().expect("failed to get global app handle")
APP_HANDLE.get().expect("App handle not initialized")
}
pub async fn mihomo() -> RwLockReadGuard<'static, Mihomo> {
Self::app_handle().mihomo().read().await
}
#[allow(unused)]
pub async fn mihomo_mut() -> RwLockWriteGuard<'static, Mihomo> {
Self::app_handle().mihomo().write().await
}
pub fn get_window() -> Option<WebviewWindow> {
let app_handle = Self::app_handle();
let window: Option<WebviewWindow> = app_handle.get_webview_window("main");
if window.is_none() {
log::debug!(target:"app", "main window not found");
}
window
Self::app_handle().get_webview_window("main")
}
pub fn refresh_clash() {
@@ -343,86 +83,29 @@ impl Handle {
}
pub fn notify_profile_changed(profile_id: String) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
});
} else {
log::warn!(
"Notification system not initialized when trying to send ProfileChanged event."
);
}
Self::send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
});
}
pub fn notify_timer_updated(profile_index: String) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::TimerUpdated { profile_index });
} else {
log::warn!(
"Notification system not initialized when trying to send TimerUpdated event."
);
}
Self::send_event(FrontendEvent::TimerUpdated { profile_index });
}
pub fn notify_profile_update_started(uid: String) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::ProfileUpdateStarted { uid });
} else {
log::warn!(
"Notification system not initialized when trying to send ProfileUpdateStarted event."
);
}
Self::send_event(FrontendEvent::ProfileUpdateStarted { uid });
}
pub fn notify_profile_update_completed(uid: String) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::ProfileUpdateCompleted { uid });
} else {
log::warn!(
"Notification system not initialized when trying to send ProfileUpdateCompleted event."
);
}
Self::send_event(FrontendEvent::ProfileUpdateCompleted { uid });
}
/// 通知前端显示消息队列
pub fn notice_message<S: Into<String>, M: Into<String>>(status: S, msg: M) {
let handle = Self::global();
let status_str = status.into();
let msg_str = msg.into();
if !*handle.startup_completed.read() {
logging!(
info,
Type::Frontend,
"启动过程中发现错误,加入消息队列: {} - {}",
status_str,
msg_str
);
let mut errors = handle.startup_errors.write();
errors.push(ErrorMessage {
status: status_str,
@@ -435,25 +118,29 @@ impl Handle {
return;
}
Self::send_event(FrontendEvent::NoticeMessage {
status: status_str,
message: msg_str,
});
}
fn send_event(event: FrontendEvent) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::NoticeMessage {
status: status_str,
message: msg_str,
});
system.send_event(event);
}
}
pub fn mark_startup_completed(&self) {
{
let mut completed = self.startup_completed.write();
*completed = true;
}
*self.startup_completed.write() = true;
self.send_startup_errors();
}
/// 发送启动时累积的所有错误消息
fn send_startup_errors(&self) {
let errors = {
let mut errors = self.startup_errors.write();
@@ -464,19 +151,10 @@ impl Handle {
return;
}
logging!(
info,
Type::Frontend,
"发送{}条启动时累积的错误消息: {:?}",
errors.len(),
errors
);
// 启动单独线程处理启动错误,避免阻塞主线程
let thread_result = thread::Builder::new()
let _ = thread::Builder::new()
.name("startup-errors-sender".into())
.spawn(move || {
thread::sleep(Duration::from_secs(2));
thread::sleep(timing::STARTUP_ERROR_DELAY);
let handle = Handle::global();
if handle.is_exiting() {
@@ -495,19 +173,14 @@ impl Handle {
message: error.message,
});
thread::sleep(Duration::from_millis(300));
thread::sleep(timing::ERROR_BATCH_DELAY);
}
}
});
if let Err(e) = thread_result {
log::error!("Failed to spawn startup errors thread: {e}");
}
}
pub fn set_is_exiting(&self) {
let mut is_exiting = self.is_exiting.write();
*is_exiting = true;
*self.is_exiting.write() = true;
let mut system_opt = self.notification_system.write();
if let Some(system) = system_opt.as_mut() {
@@ -523,33 +196,17 @@ impl Handle {
#[cfg(target_os = "macos")]
impl Handle {
pub fn set_activation_policy(&self, policy: tauri::ActivationPolicy) -> Result<(), String> {
let app_handle = Self::app_handle();
app_handle
Self::app_handle()
.set_activation_policy(policy)
.map_err(|e| e.to_string())
}
pub fn set_activation_policy_regular(&self) {
if let Err(e) = self.set_activation_policy(tauri::ActivationPolicy::Regular) {
logging!(
warn,
Type::Setup,
"Failed to set regular activation policy: {}",
e
);
}
let _ = self.set_activation_policy(tauri::ActivationPolicy::Regular);
}
pub fn set_activation_policy_accessory(&self) {
if let Err(e) = self.set_activation_policy(tauri::ActivationPolicy::Accessory) {
logging!(
warn,
Type::Setup,
"Failed to set accessory activation policy: {}",
e
);
}
let _ = self.set_activation_policy(tauri::ActivationPolicy::Accessory);
}
// Remove dead code policy prohibited function since https://github.com/clash-verge-rev/clash-verge-rev/pull/5103
}

View File

@@ -0,0 +1,152 @@
use super::CoreManager;
use crate::{
config::*,
constants::timing,
core::{handle, validate::CoreConfigValidator},
logging,
utils::{dirs, help, logging::Type},
};
use anyhow::{Result, anyhow};
use std::{path::PathBuf, time::Instant};
use tauri_plugin_mihomo::Error as MihomoError;
use tokio::time::sleep;
impl CoreManager {
/// Fall back to the raw Clash config as the runtime config.
///
/// Copies the current Clash config into the runtime draft (with no merged
/// profile chain), persists it to the runtime YAML file, and surfaces
/// `error_key`/`error_msg` to the frontend via a notice message.
///
/// # Errors
/// Fails if the app home dir cannot be resolved or the YAML cannot be saved.
pub async fn use_default_config(&self, error_key: &str, error_msg: &str) -> Result<()> {
    use crate::constants::files::RUNTIME_CONFIG;
    let runtime_path = dirs::app_home_dir()?.join(RUNTIME_CONFIG);
    let clash_config = Config::clash().await.latest_ref().0.clone();
    // Replace the runtime draft wholesale: no profile keys, no chain logs.
    *Config::runtime().await.draft_mut() = Box::new(IRuntime {
        config: Some(clash_config.clone()),
        exists_keys: vec![],
        chain_logs: Default::default(),
    });
    help::save_yaml(&runtime_path, &clash_config, Some("# Clash Verge Runtime")).await?;
    handle::Handle::notice_message(error_key, error_msg);
    Ok(())
}

/// Regenerate and apply the runtime config.
///
/// Returns `(true, "")` on success — and also when the update is skipped
/// because the app is exiting or the debounce window has not elapsed.
/// Returns `(false, msg)` when validation rejects the generated config.
///
/// # Errors
/// Fails if another update is already in flight (semaphore busy) or if
/// generation/application fails.
pub async fn update_config(&self) -> Result<(bool, String)> {
    if handle::Handle::global().is_exiting() {
        return Ok((true, String::new()));
    }
    if !self.should_update_config()? {
        return Ok((true, String::new()));
    }
    // Non-blocking acquire: a concurrent update is an error, not a wait.
    // The permit is held for the duration of this call via `_permit`.
    let _permit = self.update_semaphore.try_acquire()
        .map_err(|_| anyhow!("Config update already in progress"))?;
    self.perform_config_update().await
}
/// Debounce guard for config updates.
///
/// Returns `Ok(false)` when the previous update happened less than
/// `timing::CONFIG_UPDATE_DEBOUNCE` ago; otherwise records `now` as the
/// latest update time and returns `Ok(true)`.
///
/// The `Result` return is kept for interface stability even though this
/// function is currently infallible.
fn should_update_config(&self) -> Result<bool> {
    let now = Instant::now();
    let mut last = self.last_update.lock();
    // Collapsed the nested `if let` + `if` into one predicate (clippy:
    // collapsible_if); `Instant` is `Copy`, so reading it out of the
    // guarded `Option` is free.
    if last.is_some_and(|prev| now.duration_since(prev) < timing::CONFIG_UPDATE_DEBOUNCE) {
        return Ok(false);
    }
    *last = Some(now);
    Ok(true)
}
/// Generate, validate, and apply a fresh runtime config.
///
/// On validation failure or error the runtime draft is discarded so the
/// previously applied config stays in effect.
async fn perform_config_update(&self) -> Result<(bool, String)> {
    Config::generate().await?;
    match CoreConfigValidator::global().validate_config().await {
        Ok((true, _)) => {
            let run_path = Config::generate_file(ConfigType::Run).await?;
            self.apply_config(run_path).await?;
            Ok((true, String::new()))
        }
        Ok((false, error_msg)) => {
            // Validation rejected the config: roll back the draft and
            // report the reason to the caller instead of erroring.
            Config::runtime().await.discard();
            Ok((false, error_msg))
        }
        Err(e) => {
            Config::runtime().await.discard();
            Err(e)
        }
    }
}

/// Apply a config file unconditionally, bypassing debounce and validation.
pub async fn put_configs_force(&self, path: PathBuf) -> Result<()> {
    self.apply_config(path).await
}

/// Push the config at `path` to the running core.
///
/// On success the runtime draft is committed. Connection-type failures
/// trigger one restart-and-retry; any other failure discards the draft.
pub(super) async fn apply_config(&self, path: PathBuf) -> Result<()> {
    let path_str = dirs::path_to_str(&path)?;
    match self.reload_config(path_str).await {
        Ok(_) => {
            Config::runtime().await.apply();
            logging!(info, Type::Core, "Configuration applied");
            Ok(())
        }
        Err(err) if Self::should_restart_on_error(&err) => {
            // Core looks unreachable — restart it and try once more.
            self.retry_with_restart(path_str).await
        }
        Err(err) => {
            Config::runtime().await.discard();
            Err(anyhow!("Failed to apply config: {}", err))
        }
    }
}

/// Restart the core, wait for it to come up, then re-attempt the reload.
/// Used only when the initial reload failed with a connection-type error.
async fn retry_with_restart(&self, config_path: &str) -> Result<()> {
    if handle::Handle::global().is_exiting() {
        return Err(anyhow!("Application exiting"));
    }
    logging!(warn, Type::Core, "Restarting core for config reload");
    self.restart_core().await?;
    // Give the freshly started core time to open its API endpoint.
    sleep(timing::CONFIG_RELOAD_DELAY).await;
    self.reload_config(config_path).await?;
    Config::runtime().await.apply();
    logging!(info, Type::Core, "Configuration applied after restart");
    Ok(())
}

/// Ask the mihomo API to hot-reload the config at `path` (force = true).
async fn reload_config(&self, path: &str) -> Result<(), MihomoError> {
    handle::Handle::mihomo().await.reload_config(true, path).await
}

/// Classify a mihomo error: `true` means the core is likely dead or
/// unreachable and a restart-and-retry is worthwhile.
fn should_restart_on_error(err: &MihomoError) -> bool {
    match err {
        MihomoError::ConnectionFailed | MihomoError::ConnectionLost => true,
        MihomoError::Io(io_err) => Self::is_connection_io_error(io_err.kind()),
        MihomoError::Reqwest(req_err) => {
            req_err.is_connect()
                || req_err.is_timeout()
                // Some transport failures only surface in the message text.
                || Self::contains_error_pattern(&req_err.to_string())
        }
        MihomoError::FailedResponse(msg) => Self::contains_error_pattern(msg),
        _ => false,
    }
}
/// Returns `true` for I/O error kinds that indicate the core endpoint is
/// unreachable (connection aborted/refused/reset, or the target is gone) —
/// the cases where restarting the core is a sensible recovery.
fn is_connection_io_error(kind: std::io::ErrorKind) -> bool {
    use std::io::ErrorKind;
    // Membership test against the fixed set of "core is unreachable" kinds.
    [
        ErrorKind::ConnectionAborted,
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionReset,
        ErrorKind::NotFound,
    ]
    .contains(&kind)
}
/// Returns `true` if `text` contains any of the known connection-error
/// substrings (see `constants::error_patterns::CONNECTION_ERRORS`).
/// Used to detect connection failures that only appear in message text.
fn contains_error_pattern(text: &str) -> bool {
    use crate::constants::error_patterns::CONNECTION_ERRORS;
    CONNECTION_ERRORS.iter().any(|p| text.contains(p))
}
}

View File

@@ -0,0 +1,120 @@
use super::{CoreManager, RunningMode};
use crate::{
core::{logger::ClashLogger, service::{ServiceStatus, SERVICE_MANAGER}},
logging,
utils::logging::Type,
};
use anyhow::Result;
impl CoreManager {
/// Start the core, preferring service mode when the system service is ready,
/// otherwise falling back to the bundled sidecar binary.
pub async fn start_core(&self) -> Result<()> {
    self.prepare_startup().await?;
    match self.get_running_mode() {
        RunningMode::Service => self.start_core_by_service().await,
        RunningMode::NotRunning | RunningMode::Sidecar => self.start_core_by_sidecar().await,
    }
}

/// Stop the core using whichever mechanism matches the current mode.
/// Clears the in-memory core log buffer first. No-op when not running.
pub async fn stop_core(&self) -> Result<()> {
    ClashLogger::global().clear_logs();
    match self.get_running_mode() {
        RunningMode::Service => self.stop_core_by_service().await,
        RunningMode::Sidecar => self.stop_core_by_sidecar(),
        RunningMode::NotRunning => Ok(()),
    }
}

/// Stop, re-probe the service (best effort), then start again.
/// The restart may switch running mode if service availability changed.
pub async fn restart_core(&self) -> Result<()> {
    logging!(info, Type::Core, "Restarting core");
    self.stop_core().await?;
    // Refresh service status so prepare_startup picks the right mode;
    // failures here are non-fatal (sidecar fallback still works).
    if SERVICE_MANAGER.lock().await.init().await.is_ok() {
        let _ = SERVICE_MANAGER.lock().await.refresh().await;
    }
    self.start_core().await
}

/// Switch to a different clash core binary and re-apply the config.
///
/// Validates the name against `IVerge::VALID_CLASH_CORES`, persists the
/// choice to the verge config file, then applies the regenerated config.
/// String errors (not anyhow) because this is surfaced to the frontend.
pub async fn change_core(&self, clash_core: Option<String>) -> Result<(), String> {
    use crate::config::{Config, ConfigType, IVerge};
    let core = clash_core.as_ref()
        .ok_or_else(|| "Clash core cannot be None".to_string())?;
    if !IVerge::VALID_CLASH_CORES.contains(&core.as_str()) {
        return Err(format!("Invalid clash core: {}", core));
    }
    Config::verge().await.draft_mut().clash_core = clash_core;
    Config::verge().await.apply();
    let verge_data = Config::verge().await.latest_ref().clone();
    verge_data.save_file().await.map_err(|e| e.to_string())?;
    let run_path = Config::generate_file(ConfigType::Run).await
        .map_err(|e| e.to_string())?;
    self.apply_config(run_path).await.map_err(|e| e.to_string())
}

/// Decide the running mode for the upcoming start: Service if the system
/// service reports Ready, Sidecar otherwise. On Windows this may first
/// wait for the service when TUN mode requires it.
async fn prepare_startup(&self) -> Result<()> {
    self.wait_for_service_if_needed().await;
    let mode = match SERVICE_MANAGER.lock().await.current() {
        ServiceStatus::Ready => RunningMode::Service,
        _ => RunningMode::Sidecar,
    };
    self.set_running_mode(mode);
    Ok(())
}

/// Windows only: when TUN mode is enabled the elevated service is required,
/// so poll (fixed-interval backoff, bounded by SERVICE_WAIT_MAX) until the
/// service reports Ready. Gives up silently on timeout — startup then
/// proceeds in sidecar mode without TUN privileges.
#[cfg(target_os = "windows")]
async fn wait_for_service_if_needed(&self) {
    use crate::{config::Config, constants::timing};
    use backoff::{Error as BackoffError, ExponentialBackoff};
    let needs_service = Config::verge().await
        .latest_ref()
        .enable_tun_mode
        .unwrap_or(false);
    if !needs_service {
        return;
    }
    // multiplier 1.0 + equal initial/max interval = constant-rate polling.
    let backoff = ExponentialBackoff {
        initial_interval: timing::SERVICE_WAIT_INTERVAL,
        max_interval: timing::SERVICE_WAIT_INTERVAL,
        max_elapsed_time: Some(timing::SERVICE_WAIT_MAX),
        multiplier: 1.0,
        randomization_factor: 0.0,
        ..Default::default()
    };
    let operation = || async {
        let mut manager = SERVICE_MANAGER.lock().await;
        if matches!(manager.current(), ServiceStatus::Ready) {
            return Ok(());
        }
        manager.init().await.map_err(BackoffError::transient)?;
        let _ = manager.refresh().await;
        if matches!(manager.current(), ServiceStatus::Ready) {
            Ok(())
        } else {
            Err(BackoffError::transient(anyhow::anyhow!("Service not ready")))
        }
    };
    // Best effort: errors/timeouts are intentionally ignored.
    let _ = backoff::future::retry(backoff, operation).await;
}

/// Non-Windows platforms never block startup on the service.
#[cfg(not(target_os = "windows"))]
async fn wait_for_service_if_needed(&self) {}
}

View File

@@ -0,0 +1,80 @@
mod config;
mod lifecycle;
mod process;
mod state;
use anyhow::Result;
use parking_lot::Mutex;
use std::{fmt, sync::Arc, time::Instant};
use tokio::sync::Semaphore;
use crate::process::CommandChildGuard;
use crate::singleton_lazy;
/// How (and whether) the clash core is currently being run.
/// Serialized to the frontend, hence the `serde::Serialize` derive.
#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq, Eq)]
pub enum RunningMode {
    // Managed by the privileged system service (required for TUN mode).
    Service,
    // Spawned as a Tauri sidecar child process of this app.
    Sidecar,
    // No core process is running.
    NotRunning,
}
impl fmt::Display for RunningMode {
    /// Human-readable name; mirrors the variant identifiers exactly.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Service => "Service",
            Self::Sidecar => "Sidecar",
            Self::NotRunning => "NotRunning",
        };
        f.write_str(name)
    }
}
/// Singleton coordinator for the clash core process lifecycle.
#[derive(Debug)]
pub struct CoreManager {
    // Mutable process state (mode + sidecar child), behind a parking_lot Mutex.
    state: Arc<Mutex<State>>,
    // Single permit: at most one config update in flight at a time.
    update_semaphore: Arc<Semaphore>,
    // Timestamp of the last config update, used for debouncing.
    last_update: Arc<Mutex<Option<Instant>>>,
}

/// Inner mutable state guarded by `CoreManager::state`.
#[derive(Debug)]
struct State {
    running_mode: RunningMode,
    // Guard kills the sidecar child on drop; None in service/not-running modes.
    child_sidecar: Option<CommandChildGuard>,
}

impl Default for State {
    // Manual impl because RunningMode has no Default derive.
    fn default() -> Self {
        Self {
            running_mode: RunningMode::NotRunning,
            child_sidecar: None,
        }
    }
}
impl Default for CoreManager {
    fn default() -> Self {
        Self {
            state: Arc::new(Mutex::new(State::default())),
            // One permit = serialized config updates.
            update_semaphore: Arc::new(Semaphore::new(1)),
            last_update: Arc::new(Mutex::new(None)),
        }
    }
}

impl CoreManager {
    /// Current running mode (copied out; `RunningMode` is `Copy`).
    pub fn get_running_mode(&self) -> RunningMode {
        self.state.lock().running_mode
    }

    /// Record the current running mode.
    pub fn set_running_mode(&self, mode: RunningMode) {
        self.state.lock().running_mode = mode;
    }

    /// One-time startup: reap any leftover mihomo processes from a previous
    /// run, then start the core.
    pub async fn init(&self) -> Result<()> {
        self.cleanup_orphaned_processes().await?;
        self.start_core().await?;
        Ok(())
    }
}

// Lazily-constructed global instance (project singleton macro).
singleton_lazy!(CoreManager, CORE_MANAGER, CoreManager::default);

View File

@@ -0,0 +1,204 @@
use super::CoreManager;
use crate::{
AsyncHandler,
constants::{process, timing},
logging,
utils::logging::Type,
};
use anyhow::{Result, anyhow};
impl CoreManager {
/// Find and kill leftover mihomo processes from previous runs.
///
/// Scans for each known core process name concurrently, excludes the PID of
/// our own managed sidecar child (if any), then kills and verifies each
/// remaining PID concurrently. Lookup failures for individual names are
/// silently skipped (`filter_map(Result::ok)`), so a partial scan still
/// cleans what it can.
pub async fn cleanup_orphaned_processes(&self) -> Result<()> {
    logging!(info, Type::Core, "Cleaning orphaned mihomo processes");
    let current_pid = self.state.lock().child_sidecar.as_ref().and_then(|c| c.pid());
    let target_processes = process::process_names();
    let process_futures = target_processes.iter().map(|&name| {
        // with_extension appends ".exe" on Windows (per constants module).
        let process_name = process::with_extension(name);
        self.find_processes_by_name(process_name, name)
    });
    let process_results = futures::future::join_all(process_futures).await;
    let pids_to_kill: Vec<_> = process_results
        .into_iter()
        .filter_map(Result::ok)
        .flat_map(|(pids, name)| {
            pids.into_iter()
                // Never kill the sidecar child we are currently managing.
                .filter(move |&pid| Some(pid) != current_pid)
                .map(move |pid| (pid, name.clone()))
        })
        .collect();
    if pids_to_kill.is_empty() {
        return Ok(());
    }
    let kill_futures = pids_to_kill
        .iter()
        .map(|(pid, name)| self.kill_process_verified(*pid, name.clone()));
    let killed_count = futures::future::join_all(kill_futures)
        .await
        .into_iter()
        .filter(|&success| success)
        .count();
    if killed_count > 0 {
        logging!(info, Type::Core, "Cleaned {} orphaned processes", killed_count);
    }
    Ok(())
}
/// Return all PIDs whose executable name matches `process_name`,
/// along with the name itself (echoed back for the caller's bookkeeping).
///
/// Windows: walks a Toolhelp32 process snapshot on a blocking thread.
/// Unix: shells out to `pgrep` (macOS) or `pidof` (other); a non-zero
/// exit status is treated as "no matches", not an error.
async fn find_processes_by_name(&self, process_name: String, _target: &str) -> Result<(Vec<u32>, String)> {
    #[cfg(windows)]
    {
        use std::mem;
        use winapi::um::{
            handleapi::CloseHandle,
            tlhelp32::{CreateToolhelp32Snapshot, PROCESSENTRY32W, Process32FirstW, Process32NextW, TH32CS_SNAPPROCESS},
        };
        let process_name_clone = process_name.clone();
        // Raw Win32 snapshot iteration is blocking — run off the async runtime.
        let pids = AsyncHandler::spawn_blocking(move || -> Result<Vec<u32>> {
            let mut pids = Vec::with_capacity(8);
            unsafe {
                let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
                if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
                    return Err(anyhow!("Failed to create process snapshot"));
                }
                let mut pe32: PROCESSENTRY32W = mem::zeroed();
                pe32.dwSize = mem::size_of::<PROCESSENTRY32W>() as u32;
                if Process32FirstW(snapshot, &mut pe32) != 0 {
                    loop {
                        // szExeFile is a NUL-terminated UTF-16 buffer.
                        let end_pos = pe32.szExeFile.iter().position(|&x| x == 0)
                            .unwrap_or(pe32.szExeFile.len());
                        if end_pos > 0 {
                            let exe_file = String::from_utf16_lossy(&pe32.szExeFile[..end_pos]);
                            if exe_file.eq_ignore_ascii_case(&process_name_clone) {
                                pids.push(pe32.th32ProcessID);
                            }
                        }
                        if Process32NextW(snapshot, &mut pe32) == 0 {
                            break;
                        }
                    }
                }
                CloseHandle(snapshot);
            }
            Ok(pids)
        }).await??;
        Ok((pids, process_name))
    }
    #[cfg(not(windows))]
    {
        let cmd = if cfg!(target_os = "macos") { "pgrep" } else { "pidof" };
        let output = tokio::process::Command::new(cmd)
            .arg(&process_name)
            .output()
            .await?;
        if !output.status.success() {
            // Both tools exit non-zero when nothing matches.
            return Ok((Vec::new(), process_name));
        }
        let stdout = String::from_utf8_lossy(&output.stdout);
        let pids: Vec<u32> = stdout
            .split_whitespace()
            .filter_map(|s| s.parse().ok())
            .collect();
        Ok((pids, process_name))
    }
}
/// Forcefully terminate `pid` and verify it actually died.
///
/// Windows: OpenProcess + TerminateProcess on a blocking thread.
/// Unix: `kill -9`. After a short delay the PID is re-checked; returns
/// `true` only when the process is confirmed gone.
/// NOTE(review): a PID-reuse race between kill and re-check is possible
/// but unhandled — acceptable for best-effort cleanup.
async fn kill_process_verified(&self, pid: u32, process_name: String) -> bool {
    #[cfg(windows)]
    let success = {
        use winapi::um::{
            handleapi::CloseHandle,
            processthreadsapi::{OpenProcess, TerminateProcess},
            winnt::{HANDLE, PROCESS_TERMINATE},
        };
        AsyncHandler::spawn_blocking(move || unsafe {
            let handle: HANDLE = OpenProcess(PROCESS_TERMINATE, 0, pid);
            if handle.is_null() {
                // Process already gone or access denied.
                return false;
            }
            let result = TerminateProcess(handle, 1) != 0;
            CloseHandle(handle);
            result
        }).await.unwrap_or(false)
    };
    #[cfg(not(windows))]
    let success = tokio::process::Command::new("kill")
        .args(["-9", &pid.to_string()])
        .output()
        .await
        .map(|output| output.status.success())
        .unwrap_or(false);
    if !success {
        return false;
    }
    // Give the OS a moment to reap, then confirm the process is gone.
    tokio::time::sleep(timing::PROCESS_VERIFY_DELAY).await;
    if self.is_process_running(pid).await.unwrap_or(false) {
        logging!(warn, Type::Core, "Process {} (PID: {}) still running after termination", process_name, pid);
        false
    } else {
        logging!(info, Type::Core, "Terminated process {} (PID: {})", process_name, pid);
        true
    }
}
/// Check whether `pid` refers to a live process.
///
/// Windows: GetExitCodeProcess == 259 (STILL_ACTIVE) means running.
/// Unix: `ps -p <pid>` succeeding with non-empty output means running.
async fn is_process_running(&self, pid: u32) -> Result<bool> {
    #[cfg(windows)]
    {
        use winapi::{
            shared::minwindef::DWORD,
            um::{
                handleapi::CloseHandle,
                processthreadsapi::{GetExitCodeProcess, OpenProcess},
                winnt::{HANDLE, PROCESS_QUERY_INFORMATION},
            },
        };
        AsyncHandler::spawn_blocking(move || unsafe {
            let handle: HANDLE = OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid);
            if handle.is_null() {
                // Can't open it — treat as not running.
                return Ok(false);
            }
            let mut exit_code: DWORD = 0;
            let result = GetExitCodeProcess(handle, &mut exit_code);
            CloseHandle(handle);
            // 259 == STILL_ACTIVE per the Win32 API.
            Ok(result != 0 && exit_code == 259)
        }).await?
    }
    #[cfg(not(windows))]
    {
        let output = tokio::process::Command::new("ps")
            .args(["-p", &pid.to_string()])
            .output()
            .await?;
        Ok(output.status.success() && !output.stdout.is_empty())
    }
}
}

View File

@@ -0,0 +1,125 @@
use super::{CoreManager, RunningMode};
use crate::{
AsyncHandler,
config::Config,
core::{
handle,
logger::ClashLogger,
service,
},
logging,
process::CommandChildGuard,
utils::{
dirs,
init::sidecar_writer,
logging::{SharedWriter, Type, write_sidecar_log},
},
};
use anyhow::Result;
use compact_str::CompactString;
use flexi_logger::DeferredNow;
use log::Level;
use std::collections::VecDeque;
use tauri_plugin_shell::ShellExt;
impl CoreManager {
/// Fetch the core's recent log lines, routed by running mode:
/// service logs come from the service API, sidecar logs from the
/// in-process `ClashLogger` buffer, and NotRunning yields an empty deque.
pub async fn get_clash_logs(&self) -> Result<VecDeque<CompactString>> {
    match self.get_running_mode() {
        RunningMode::Service => service::get_clash_logs_by_service().await,
        RunningMode::Sidecar => Ok(ClashLogger::global().get_logs().clone()),
        RunningMode::NotRunning => Ok(VecDeque::new()),
    }
}

/// Spawn the clash core as a Tauri sidecar child process.
///
/// Generates the runtime config file, launches the configured core binary
/// with `-d <home> -f <config>`, records the child guard + mode in state,
/// and spawns an async task that forwards the child's output into the
/// sidecar log file and the in-memory `ClashLogger` until termination.
pub(super) async fn start_core_by_sidecar(&self) -> Result<()> {
    logging!(info, Type::Core, "Starting core in sidecar mode");
    let config_file = Config::generate_file(crate::config::ConfigType::Run).await?;
    let app_handle = handle::Handle::app_handle();
    let clash_core = Config::verge().await.latest_ref().get_valid_clash_core();
    let config_dir = dirs::app_home_dir()?;
    let (mut rx, child) = app_handle
        .shell()
        .sidecar(&clash_core)?
        .args([
            "-d",
            dirs::path_to_str(&config_dir)?,
            "-f",
            dirs::path_to_str(&config_file)?,
        ])
        .spawn()?;
    let pid = child.pid();
    logging!(trace, Type::Core, "Sidecar started with PID: {}", pid);
    {
        // Guard kills the child on drop; scope keeps the lock short.
        let mut state = self.state.lock();
        state.child_sidecar = Some(CommandChildGuard::new(child));
        state.running_mode = RunningMode::Sidecar;
    }
    let shared_writer: SharedWriter = std::sync::Arc::new(tokio::sync::Mutex::new(sidecar_writer().await?));
    AsyncHandler::spawn(|| async move {
        while let Some(event) = rx.recv().await {
            match event {
                tauri_plugin_shell::process::CommandEvent::Stdout(line)
                | tauri_plugin_shell::process::CommandEvent::Stderr(line) => {
                    let mut now = DeferredNow::default();
                    let message = CompactString::from(String::from_utf8_lossy(&line).as_ref());
                    let w = shared_writer.lock().await;
                    // NOTE(review): stdout and stderr are both written at
                    // Level::Error — confirm this is intentional (the core's
                    // own level markers may be embedded in the line text).
                    write_sidecar_log(w, &mut now, Level::Error, &message);
                    ClashLogger::global().append_log(message);
                }
                tauri_plugin_shell::process::CommandEvent::Terminated(term) => {
                    let mut now = DeferredNow::default();
                    let message = if let Some(code) = term.code {
                        CompactString::from(format!("Process terminated with code: {}", code))
                    } else if let Some(signal) = term.signal {
                        CompactString::from(format!("Process terminated by signal: {}", signal))
                    } else {
                        CompactString::from("Process terminated")
                    };
                    let w = shared_writer.lock().await;
                    write_sidecar_log(w, &mut now, Level::Info, &message);
                    // Buffered logs belong to the dead process — drop them.
                    ClashLogger::global().clear_logs();
                    break;
                }
                _ => {}
            }
        }
    });
    Ok(())
}

/// Stop the sidecar child (its guard kills the process on drop) and mark
/// the manager as not running. Safe to call when no child exists.
pub(super) fn stop_core_by_sidecar(&self) -> Result<()> {
    logging!(info, Type::Core, "Stopping sidecar");
    let mut state = self.state.lock();
    if let Some(child) = state.child_sidecar.take() {
        let pid = child.pid();
        drop(child);
        logging!(trace, Type::Core, "Sidecar stopped (PID: {:?})", pid);
    }
    state.running_mode = RunningMode::NotRunning;
    Ok(())
}

/// Ask the privileged system service to run the core with a freshly
/// generated runtime config.
pub(super) async fn start_core_by_service(&self) -> Result<()> {
    logging!(info, Type::Core, "Starting core in service mode");
    let config_file = Config::generate_file(crate::config::ConfigType::Run).await?;
    service::run_core_by_service(&config_file).await?;
    self.set_running_mode(RunningMode::Service);
    Ok(())
}

/// Ask the system service to stop the core it manages.
pub(super) async fn stop_core_by_service(&self) -> Result<()> {
    logging!(info, Type::Core, "Stopping service");
    service::stop_core_by_service().await?;
    self.set_running_mode(RunningMode::NotRunning);
    Ok(())
}
}

View File

@@ -1,11 +1,11 @@
pub mod async_proxy_query;
pub mod backup;
#[allow(clippy::module_inception)]
mod core;
pub mod event_driven_proxy;
pub mod handle;
pub mod hotkey;
pub mod logger;
pub mod manager;
mod notification;
pub mod service;
pub mod sysopt;
pub mod timer;
@@ -13,4 +13,8 @@ pub mod tray;
pub mod validate;
pub mod win_uwp;
pub use self::{core::*, event_driven_proxy::EventDrivenProxyManager, timer::Timer};
pub use self::{
event_driven_proxy::EventDrivenProxyManager,
manager::CoreManager,
timer::Timer,
};

View File

@@ -0,0 +1,203 @@
use crate::{
constants::{retry, timing},
logging,
utils::logging::Type,
};
use parking_lot::RwLock;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
mpsc,
},
thread,
time::Instant,
};
use tauri::{Emitter, WebviewWindow};
/// Events forwarded from the backend to the webview frontend.
/// Each variant maps to a fixed event-name string in `serialize_event`.
#[derive(Debug, Clone)]
pub enum FrontendEvent {
    RefreshClash,
    RefreshVerge,
    NoticeMessage { status: String, message: String },
    ProfileChanged { current_profile_id: String },
    TimerUpdated { profile_index: String },
    ProfileUpdateStarted { uid: String },
    ProfileUpdateCompleted { uid: String },
}

/// Counters for emitted events; atomics so the worker thread can update
/// them through a shared reference.
#[derive(Debug, Default)]
struct EventStats {
    total_sent: AtomicU64,
    total_errors: AtomicU64,
    last_error_time: RwLock<Option<Instant>>,
}

/// A notice captured before the frontend was ready; replayed after startup.
#[derive(Debug, Clone)]
pub struct ErrorMessage {
    pub status: String,
    pub message: String,
}
/// Queue-backed dispatcher that emits `FrontendEvent`s to the main webview
/// from a dedicated worker thread.
#[derive(Debug)]
pub struct NotificationSystem {
    // Producer side of the event queue; None until `start` is called.
    sender: Option<mpsc::Sender<FrontendEvent>>,
    // Removed stale `#[allow(clippy::type_complexity)]`:
    // `Option<JoinHandle<()>>` does not trigger that lint.
    worker_handle: Option<thread::JoinHandle<()>>,
    // Visible to the parent module so `Handle::init` can avoid double-start.
    pub(super) is_running: bool,
    stats: EventStats,
    // Set after repeated emit failures; info-level notices are then dropped.
    emergency_mode: RwLock<bool>,
}
impl Default for NotificationSystem {
    // Delegates to `new`: a stopped system with no worker thread.
    fn default() -> Self {
        Self::new()
    }
}
impl NotificationSystem {
/// Create a stopped system; call `start` to spawn the worker thread.
pub fn new() -> Self {
    Self {
        sender: None,
        worker_handle: None,
        is_running: false,
        stats: EventStats::default(),
        emergency_mode: RwLock::new(false),
    }
}

/// Spawn the worker thread and open the event channel. Idempotent:
/// a second call while running is a no-op.
pub fn start(&mut self) {
    if self.is_running {
        return;
    }
    let (tx, rx) = mpsc::channel();
    self.sender = Some(tx);
    self.is_running = true;
    let result = thread::Builder::new()
        .name("frontend-notifier".into())
        .spawn(move || Self::worker_loop(rx));
    match result {
        Ok(handle) => self.worker_handle = Some(handle),
        // Spawn failure leaves is_running=true with a dead channel;
        // events will queue unread but sends still succeed.
        Err(e) => logging!(error, Type::System, "Failed to start notification worker: {}", e),
    }
}

/// Worker thread body: drain the channel until the app exits or the
/// sender is dropped. The 100ms recv timeout lets the loop re-check the
/// exit flag even when no events arrive.
fn worker_loop(rx: mpsc::Receiver<FrontendEvent>) {
    use super::handle::Handle;
    let handle = Handle::global();
    while !handle.is_exiting() {
        match rx.recv_timeout(std::time::Duration::from_millis(100)) {
            Ok(event) => Self::process_event(&handle, event),
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
            Err(mpsc::RecvTimeoutError::Timeout) => continue,
        }
    }
}

/// Deliver one event to the main window, honoring emergency-mode
/// filtering. Events are dropped silently when the system is gone or no
/// window exists. EVENT_EMIT_DELAY throttles back-to-back emits.
fn process_event(handle: &super::handle::Handle, event: FrontendEvent) {
    let system_guard = handle.notification_system.read();
    let Some(system) = system_guard.as_ref() else {
        return;
    };
    if system.should_skip_event(&event) {
        return;
    }
    if let Some(window) = super::handle::Handle::get_window() {
        system.emit_to_window(&window, event);
        thread::sleep(timing::EVENT_EMIT_DELAY);
    }
}
fn should_skip_event(&self, event: &FrontendEvent) -> bool {
let is_emergency = *self.emergency_mode.read();
matches!(
(is_emergency, event),
(true, FrontendEvent::NoticeMessage { status, .. }) if status == "info"
)
}
fn emit_to_window(&self, window: &WebviewWindow, event: FrontendEvent) {
let (event_name, payload) = self.serialize_event(event);
let Ok(payload) = payload else {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
return;
};
match window.emit(event_name, payload) {
Ok(_) => {
self.stats.total_sent.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
logging!(warn, Type::Frontend, "Event emit failed: {}", e);
self.handle_emit_error();
}
}
}
fn serialize_event(&self, event: FrontendEvent) -> (&'static str, Result<serde_json::Value, serde_json::Error>) {
use serde_json::json;
match event {
FrontendEvent::RefreshClash => ("verge://refresh-clash-config", Ok(json!("yes"))),
FrontendEvent::RefreshVerge => ("verge://refresh-verge-config", Ok(json!("yes"))),
FrontendEvent::NoticeMessage { status, message } => {
("verge://notice-message", serde_json::to_value((status, message)))
}
FrontendEvent::ProfileChanged { current_profile_id } => {
("profile-changed", Ok(json!(current_profile_id)))
}
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(json!(profile_index)))
}
FrontendEvent::ProfileUpdateStarted { uid } => {
("profile-update-started", Ok(json!({ "uid": uid })))
}
FrontendEvent::ProfileUpdateCompleted { uid } => {
("profile-update-completed", Ok(json!({ "uid": uid })))
}
}
}
fn handle_emit_error(&self) {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
*self.stats.last_error_time.write() = Some(Instant::now());
let errors = self.stats.total_errors.load(Ordering::Relaxed);
if errors > retry::EVENT_EMIT_THRESHOLD && !*self.emergency_mode.read() {
logging!(warn, Type::Frontend, "Entering emergency mode after {} errors", errors);
*self.emergency_mode.write() = true;
}
}
pub fn send_event(&self, event: FrontendEvent) -> bool {
if self.should_skip_event(&event) {
return false;
}
if let Some(sender) = &self.sender {
sender.send(event).is_ok()
} else {
false
}
}
pub fn shutdown(&mut self) {
self.is_running = false;
if let Some(sender) = self.sender.take() {
drop(sender);
}
if let Some(handle) = self.worker_handle.take() {
let _ = handle.join();
}
}
}

View File

@@ -3,6 +3,7 @@
mod cmd;
pub mod config;
mod constants;
mod core;
mod enhance;
mod feat;

View File

@@ -122,7 +122,7 @@ macro_rules! wrap_err {
macro_rules! logging {
// 不带 print 参数的版本(默认不打印)
($level:ident, $type:expr, $($arg:tt)*) => {
log::$level!(target: "app", "{} {}", $type, format_args!($($arg)*));
log::$level!(target: "app", "{} {}", $type, format_args!($($arg)*))
};
}

View File

@@ -15,6 +15,7 @@ import {
getRunningMode,
getSystemProxy,
} from "@/services/cmds";
import { SWR_DEFAULTS, SWR_REALTIME, SWR_SLOW_POLL } from "@/services/config";
import { AppDataContext, AppDataContextType } from "./app-data-context";
@@ -30,61 +31,33 @@ export const AppDataProvider = ({
"getProxies",
calcuProxies,
{
refreshInterval: 8000,
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 3000,
onError: (err) => {
console.warn("[DataProvider] 代理数据获取失败:", err);
},
...SWR_REALTIME,
onError: (err) => console.warn("[DataProvider] Proxy fetch failed:", err),
},
);
const { data: clashConfig, mutate: refreshClashConfig } = useSWR(
"getClashConfig",
getBaseConfig,
{
refreshInterval: 60000,
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 5000,
},
SWR_SLOW_POLL,
);
const { data: proxyProviders, mutate: refreshProxyProviders } = useSWR(
"getProxyProviders",
calcuProxyProviders,
{
revalidateOnFocus: false,
revalidateOnReconnect: false,
dedupingInterval: 5000,
suspense: false,
errorRetryCount: 2,
},
SWR_DEFAULTS,
);
const { data: ruleProviders, mutate: refreshRuleProviders } = useSWR(
"getRuleProviders",
getRuleProviders,
{
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 5000,
},
SWR_DEFAULTS,
);
const { data: rulesData, mutate: refreshRules } = useSWR(
"getRules",
getRules,
{
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 5000,
},
SWR_DEFAULTS,
);
useEffect(() => {
@@ -101,7 +74,7 @@ export const AppDataProvider = ({
try {
fn();
} catch (error) {
console.error("[数据提供者] 立即清理失败:", error);
console.error("[DataProvider] Immediate cleanup failed:", error);
}
} else {
cleanupFns.push(fn);
@@ -151,10 +124,10 @@ export const AppDataProvider = ({
scheduleTimeout(() => {
refreshRules().catch((error) =>
console.warn("[数据提供者] 规则刷新失败:", error),
console.warn("[DataProvider] Rules refresh failed:", error),
);
refreshRuleProviders().catch((error) =>
console.warn("[数据提供者] 规则提供者刷新失败:", error),
console.warn("[DataProvider] Rule providers refresh failed:", error),
);
}, 200);
};
@@ -166,7 +139,7 @@ export const AppDataProvider = ({
lastUpdateTime = now;
scheduleTimeout(() => {
refreshProxy().catch((error) =>
console.error("[数据提供者] 代理刷新失败:", error),
console.error("[DataProvider] Proxy refresh failed:", error),
);
}, 200);
};
@@ -178,7 +151,7 @@ export const AppDataProvider = ({
lastUpdateTime = now;
scheduleTimeout(() => {
refreshProxy().catch((error) =>
console.warn("[数据提供者] 代理刷新失败:", error),
console.warn("[DataProvider] Proxy refresh failed:", error),
);
}, 200);
};
@@ -241,7 +214,7 @@ export const AppDataProvider = ({
if (errors.length > 0) {
console.error(
`[数据提供者] 清理过程中发生 ${errors.length} 个错误:`,
`[DataProvider] ${errors.length} errors during cleanup:`,
errors,
);
}
@@ -251,26 +224,18 @@ export const AppDataProvider = ({
const { data: sysproxy, mutate: refreshSysproxy } = useSWR(
"getSystemProxy",
getSystemProxy,
{
revalidateOnFocus: false,
revalidateOnReconnect: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 5000,
},
SWR_DEFAULTS,
);
const { data: runningMode } = useSWR("getRunningMode", getRunningMode, {
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 2,
dedupingInterval: 5000,
});
const { data: runningMode } = useSWR(
"getRunningMode",
getRunningMode,
SWR_DEFAULTS,
);
const { data: uptimeData } = useSWR("appUptime", getAppUptime, {
...SWR_DEFAULTS,
refreshInterval: 3000,
revalidateOnFocus: false,
suspense: false,
errorRetryCount: 1,
});

25
src/services/config.ts Normal file
View File

@@ -0,0 +1,25 @@
import { useSWRConfig } from "swr";
/**
 * Baseline SWR options shared by most data hooks: no focus/reconnect
 * revalidation, no suspense, up to 2 retries, 5 s request deduping.
 */
export const SWR_DEFAULTS = {
  revalidateOnFocus: false,
  revalidateOnReconnect: false,
  suspense: false,
  errorRetryCount: 2,
  dedupingInterval: 5000,
} as const;
/**
 * Near-real-time variant: polls every 8 s with a tighter 3 s dedupe
 * window (overriding the 5 s baseline), for fast-changing data.
 */
export const SWR_REALTIME = {
  ...SWR_DEFAULTS,
  refreshInterval: 8000,
  dedupingInterval: 3000,
} as const;
/** Slow-poll variant: baseline options plus a 60 s refresh interval. */
export const SWR_SLOW_POLL = {
  ...SWR_DEFAULTS,
  refreshInterval: 60000,
} as const;
export const useSWRMutate = () => {
const { mutate } = useSWRConfig();
return mutate;
};