diff --git a/src-tauri/src/config/clash.rs b/src-tauri/src/config/clash.rs index 6540d9798..3ca9b6e5c 100644 --- a/src-tauri/src/config/clash.rs +++ b/src-tauri/src/config/clash.rs @@ -47,30 +47,32 @@ impl IClashTemp { } pub fn template() -> Self { + use crate::constants::{network, tun as tun_const}; + let mut map = Mapping::new(); - let mut tun = Mapping::new(); + let mut tun_config = Mapping::new(); let mut cors_map = Mapping::new(); - tun.insert("enable".into(), false.into()); - #[cfg(target_os = "linux")] - tun.insert("stack".into(), "mixed".into()); - #[cfg(not(target_os = "linux"))] - tun.insert("stack".into(), "gvisor".into()); - tun.insert("auto-route".into(), true.into()); - tun.insert("strict-route".into(), false.into()); - tun.insert("auto-detect-interface".into(), true.into()); - tun.insert("dns-hijack".into(), vec!["any:53"].into()); + + tun_config.insert("enable".into(), false.into()); + tun_config.insert("stack".into(), tun_const::DEFAULT_STACK.into()); + tun_config.insert("auto-route".into(), true.into()); + tun_config.insert("strict-route".into(), false.into()); + tun_config.insert("auto-detect-interface".into(), true.into()); + tun_config.insert("dns-hijack".into(), tun_const::DNS_HIJACK.into()); + #[cfg(not(target_os = "windows"))] - map.insert("redir-port".into(), 7895.into()); + map.insert("redir-port".into(), network::ports::DEFAULT_REDIR.into()); #[cfg(target_os = "linux")] - map.insert("tproxy-port".into(), 7896.into()); - map.insert("mixed-port".into(), 7897.into()); - map.insert("socks-port".into(), 7898.into()); - map.insert("port".into(), 7899.into()); + map.insert("tproxy-port".into(), network::ports::DEFAULT_TPROXY.into()); + + map.insert("mixed-port".into(), network::ports::DEFAULT_MIXED.into()); + map.insert("socks-port".into(), network::ports::DEFAULT_SOCKS.into()); + map.insert("port".into(), network::ports::DEFAULT_HTTP.into()); map.insert("log-level".into(), "info".into()); map.insert("allow-lan".into(), false.into()); map.insert("ipv6".into(), true.into()); map.insert("mode".into(), "rule".into()); - map.insert("external-controller".into(), "127.0.0.1:9097".into()); + map.insert("external-controller".into(), network::DEFAULT_EXTERNAL_CONTROLLER.into()); #[cfg(unix)] map.insert( "external-controller-unix".into(), @@ -81,9 +83,9 @@ impl IClashTemp { "external-controller-pipe".into(), Self::guard_external_controller_ipc().into(), ); + map.insert("tun".into(), tun_config.into()); cors_map.insert("allow-private-network".into(), true.into()); - cors_map.insert( - "allow-origins".into(), + cors_map.insert("allow-origins".into(), vec![ "tauri://localhost", "http://tauri.localhost", @@ -97,7 +99,6 @@ impl IClashTemp { .into(), ); map.insert("secret".into(), "set-your-secret".into()); - map.insert("tun".into(), tun.into()); map.insert("external-controller-cors".into(), cors_map.into()); map.insert("unified-delay".into(), true.into()); Self(map) @@ -325,8 +326,8 @@ impl IClashTemp { .ok() .and_then(|path| path_to_str(&path).ok().map(|s| s.into())) .unwrap_or_else(|| { - log::error!(target: "app", "Failed to get IPC path, using default"); - "127.0.0.1:9090".into() + log::error!(target: "app", "Failed to get IPC path"); + crate::constants::network::DEFAULT_EXTERNAL_CONTROLLER.into() }) } } diff --git a/src-tauri/src/config/config.rs b/src-tauri/src/config/config.rs index 52bbaabf1..89eda8e68 100644 --- a/src-tauri/src/config/config.rs +++ b/src-tauri/src/config/config.rs @@ -1,6 +1,7 @@ use super::{IClashTemp, IProfiles, IRuntime, IVerge}; use crate::{ config::{PrfItem, profiles_append_item_safe}, + constants::{files, timing}, core::{CoreManager, handle, validate::CoreConfigValidator}, enhance, logging, utils::{Draft, dirs, help, logging::Type}, @@ -8,13 +9,9 @@ use crate::{ use anyhow::{Result, anyhow}; use backoff::{Error as BackoffError, ExponentialBackoff}; use std::path::PathBuf; -use std::time::Duration; use tokio::sync::OnceCell; use tokio::time::sleep; -pub const RUNTIME_CONFIG: &str = "clash-verge.yaml"; -pub const CHECK_CONFIG: &str = "clash-verge-check.yaml"; - pub struct Config { clash_config: Draft>, verge_config: Draft>, @@ -123,20 +120,18 @@ impl Config { Some(("config_validate::error", String::new())) }; - // 在单独的任务中发送通知 if let Some((msg_type, msg_content)) = validation_result { - sleep(Duration::from_secs(2)).await; + sleep(timing::STARTUP_ERROR_DELAY).await; handle::Handle::notice_message(msg_type, &msg_content); } Ok(()) } - /// 将订阅丢到对应的文件中 pub async fn generate_file(typ: ConfigType) -> Result { let path = match typ { - ConfigType::Run => dirs::app_home_dir()?.join(RUNTIME_CONFIG), - ConfigType::Check => dirs::app_home_dir()?.join(CHECK_CONFIG), + ConfigType::Run => dirs::app_home_dir()?.join(files::RUNTIME_CONFIG), + ConfigType::Check => dirs::app_home_dir()?.join(files::CHECK_CONFIG), }; let runtime = Config::runtime().await; @@ -152,7 +147,6 @@ impl Config { Ok(path) } - /// 生成订阅存好 pub async fn generate() -> Result<()> { let (config, exists_keys, logs) = enhance::enhance().await; @@ -166,8 +160,6 @@ impl Config { } pub async fn verify_config_initialization() { - logging!(info, Type::Setup, "Verifying config initialization..."); - let backoff_strategy = ExponentialBackoff { initial_interval: std::time::Duration::from_millis(100), max_interval: std::time::Duration::from_secs(2), @@ -178,48 +170,15 @@ impl Config { let operation = || async { if Config::runtime().await.latest_ref().config.is_some() { - logging!( - info, - Type::Setup, - "Config initialization verified successfully" - ); return Ok::<(), BackoffError>(()); } - logging!( - warn, - Type::Setup, - "Runtime config not found, attempting to regenerate..." - ); - - match Config::generate().await { - Ok(_) => { - logging!(info, Type::Setup, "Config successfully regenerated"); - Ok(()) - } - Err(e) => { - logging!(warn, Type::Setup, "Failed to generate config: {}", e); - Err(BackoffError::transient(e)) - } - } + Config::generate().await + .map_err(BackoffError::transient) }; - match backoff::future::retry(backoff_strategy, operation).await { - Ok(_) => { - logging!( - info, - Type::Setup, - "Config initialization verified with backoff retry" - ); - } - Err(e) => { - logging!( - error, - Type::Setup, - "Failed to verify config initialization after retries: {}", - e - ); - } + if let Err(e) = backoff::future::retry(backoff_strategy, operation).await { + logging!(error, Type::Setup, "Config init verification failed: {}", e); } } } diff --git a/src-tauri/src/config/verge.rs b/src-tauri/src/config/verge.rs index d2b12f896..d522e8a18 100644 --- a/src-tauri/src/config/verge.rs +++ b/src-tauri/src/config/verge.rs @@ -513,13 +513,8 @@ impl IVerge { patch!(enable_external_controller); } - /// 在初始化前尝试拿到单例端口的值 pub fn get_singleton_port() -> u16 { - #[cfg(not(feature = "verge-dev"))] - const SERVER_PORT: u16 = 33331; - #[cfg(feature = "verge-dev")] - const SERVER_PORT: u16 = 11233; - SERVER_PORT + crate::constants::network::ports::SINGLETON_SERVER } /// 获取日志等级 diff --git a/src-tauri/src/constants.rs b/src-tauri/src/constants.rs new file mode 100644 index 000000000..5d291b124 --- /dev/null +++ b/src-tauri/src/constants.rs @@ -0,0 +1,107 @@ +use std::time::Duration; + +pub mod network { + pub const DEFAULT_PROXY_HOST: &str = "127.0.0.1"; + pub const DEFAULT_EXTERNAL_CONTROLLER: &str = "127.0.0.1:9097"; + + pub mod ports { + #[allow(dead_code)] + pub const DEFAULT_REDIR: u16 = 7895; + #[allow(dead_code)] + pub const DEFAULT_TPROXY: u16 = 7896; + pub const DEFAULT_MIXED: u16 = 7897; + pub const DEFAULT_SOCKS: u16 = 7898; + pub const DEFAULT_HTTP: u16 = 7899; + #[allow(dead_code)] + pub const DEFAULT_EXTERNAL_CONTROLLER: u16 = 9097; + + #[cfg(not(feature = "verge-dev"))] + pub const SINGLETON_SERVER: u16 = 33331; + #[cfg(feature = "verge-dev")] + pub const SINGLETON_SERVER: u16 = 11233; + } +} + +pub mod bypass { + #[cfg(target_os = "windows")] + pub const DEFAULT: &str = "localhost;127.*;192.168.*;10.*;172.16.*;172.17.*;172.18.*;172.19.*;172.20.*;172.21.*;172.22.*;172.23.*;172.24.*;172.25.*;172.26.*;172.27.*;172.28.*;172.29.*;172.30.*;172.31.*;"; + + #[cfg(target_os = "linux")] + pub const DEFAULT: &str = "localhost,127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,::1"; + + #[cfg(target_os = "macos")] + pub const DEFAULT: &str = "127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,localhost,*.local,*.crashlytics.com,"; +} + +pub mod timing { + use super::Duration; + + pub const CONFIG_UPDATE_DEBOUNCE: Duration = Duration::from_millis(500); + pub const CONFIG_RELOAD_DELAY: Duration = Duration::from_millis(300); + pub const PROCESS_VERIFY_DELAY: Duration = Duration::from_millis(100); + #[allow(dead_code)] + pub const EVENT_EMIT_DELAY: Duration = Duration::from_millis(20); + pub const STARTUP_ERROR_DELAY: Duration = Duration::from_secs(2); + #[allow(dead_code)] + pub const ERROR_BATCH_DELAY: Duration = Duration::from_millis(300); + + #[cfg(target_os = "windows")] + pub const SERVICE_WAIT_MAX: Duration = Duration::from_millis(3000); + #[cfg(target_os = "windows")] + pub const SERVICE_WAIT_INTERVAL: Duration = Duration::from_millis(200); +} + +pub mod retry { + #[allow(dead_code)] + pub const EVENT_EMIT_THRESHOLD: u64 = 10; + #[allow(dead_code)] + pub const SWR_ERROR_RETRY: usize = 2; +} + +pub mod files { + pub const RUNTIME_CONFIG: &str = "clash-verge.yaml"; + pub const CHECK_CONFIG: &str = "clash-verge-check.yaml"; + #[allow(dead_code)] + pub const DNS_CONFIG: &str = "dns_config.yaml"; + #[allow(dead_code)] + pub const WINDOW_STATE: &str = "window_state.json"; +} + +pub mod process { + pub const VERGE_MIHOMO: &str = "verge-mihomo"; + pub const VERGE_MIHOMO_ALPHA: &str = "verge-mihomo-alpha"; + + pub fn process_names() -> [&'static str; 2] { + [VERGE_MIHOMO, VERGE_MIHOMO_ALPHA] + } + + #[cfg(windows)] + pub fn with_extension(name: &str) -> String { + format!("{}.exe", name) + } + + #[cfg(not(windows))] + pub fn with_extension(name: &str) -> String { + name.to_string() + } +} + +pub mod error_patterns { + pub const CONNECTION_ERRORS: &[&str] = &[ + "Failed to create connection", + "The system cannot find the file specified", + "operation timed out", + "connection refused", + ]; +} + +pub mod tun { + #[cfg(target_os = "linux")] + pub const DEFAULT_STACK: &str = "mixed"; + + #[cfg(not(target_os = "linux"))] + pub const DEFAULT_STACK: &str = "gvisor"; + + pub const DNS_HIJACK: &[&str] = &["any:53"]; +} + diff --git a/src-tauri/src/core/core.rs b/src-tauri/src/core/core.rs deleted file mode 100644 index be5df01f8..000000000 --- a/src-tauri/src/core/core.rs +++ /dev/null @@ -1,837 +0,0 @@ -use crate::AsyncHandler; -use crate::core::logger::ClashLogger; -use crate::core::validate::CoreConfigValidator; -use crate::process::CommandChildGuard; -use crate::utils::init::sidecar_writer; -use crate::utils::logging::{SharedWriter, write_sidecar_log}; -use crate::{ - config::*, - core::{ - handle, - service::{self, SERVICE_MANAGER, ServiceStatus}, - }, - logging, logging_error, singleton_lazy, - utils::{ - dirs, - help::{self}, - logging::Type, - }, -}; -use anyhow::{Result, anyhow}; -#[cfg(target_os = "windows")] -use backoff::backoff::Backoff; -#[cfg(target_os = "windows")] -use backoff::{Error as BackoffError, ExponentialBackoff}; -use compact_str::CompactString; -use flexi_logger::DeferredNow; -use log::Level; -use parking_lot::Mutex; -use std::collections::VecDeque; -use std::time::Instant; -use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Duration}; -use tauri_plugin_mihomo::Error as MihomoError; -use tauri_plugin_shell::ShellExt; -use tokio::sync::Semaphore; -use tokio::time::sleep; - -// TODO: -// - 重构,提升模式切换速度 -// - 内核启动添加启动 IPC 启动参数, `-ext-ctl-unix` / `-ext-ctl-pipe`, 运行时配置需要删除相关配置项 - -#[derive(Debug)] -pub struct CoreManager { - running: Arc>, - child_sidecar: Arc>>, - update_semaphore: Arc, - last_update: Arc>>, -} - -#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq, Eq)] -pub enum RunningMode { - Service, - Sidecar, - NotRunning, -} - -impl fmt::Display for RunningMode { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RunningMode::Service => write!(f, "Service"), - RunningMode::Sidecar => write!(f, "Sidecar"), - RunningMode::NotRunning => write!(f, "NotRunning"), - } - } -} - -use crate::config::IVerge; - -const CONNECTION_ERROR_PATTERNS: &[&str] = &[ - "Failed to create connection", - "The system cannot find the file specified", - "operation timed out", - "connection refused", -]; - -impl CoreManager { - pub async fn use_default_config(&self, msg_type: &str, msg_content: &str) -> Result<()> { - let runtime_path = dirs::app_home_dir()?.join(RUNTIME_CONFIG); - let clash_config = Config::clash().await.latest_ref().0.clone(); - - *Config::runtime().await.draft_mut() = Box::new(IRuntime { - config: Some(clash_config.clone()), - exists_keys: vec![], - chain_logs: Default::default(), - }); - help::save_yaml(&runtime_path, &clash_config, Some("# Clash Verge Runtime")).await?; - handle::Handle::notice_message(msg_type, msg_content); - Ok(()) - } - - /// 更新proxies等配置 - pub async fn update_config(&self) -> Result<(bool, String)> { - if handle::Handle::global().is_exiting() { - logging!(info, Type::Config, "应用正在退出,跳过验证"); - return Ok((true, String::new())); - } - - let now = Instant::now(); - { - let mut last = self.last_update.lock(); - if let Some(last_time) = *last { - if now.duration_since(last_time) < Duration::from_millis(500) { - logging!(debug, Type::Config, "防抖:跳过重复的配置更新请求"); - return Ok((true, String::new())); - } - } - *last = Some(now); - } - - let permit = match self.update_semaphore.try_acquire() { - Ok(p) => p, - Err(_) => { - logging!(debug, Type::Config, "配置更新已在进行中,跳过"); - return Ok((true, String::new())); - } - }; - - let result = async { - logging!(info, Type::Config, "生成新的配置内容"); - Config::generate().await?; - - match CoreConfigValidator::global().validate_config().await { - Ok((true, _)) => { - logging!(info, Type::Config, "配置验证通过, 生成运行时配置"); - let run_path = Config::generate_file(ConfigType::Run).await?; - self.put_configs_force(run_path).await?; - Ok((true, String::new())) - } - Ok((false, error_msg)) => { - logging!(warn, Type::Config, "配置验证失败: {}", error_msg); - Config::runtime().await.discard(); - Ok((false, error_msg)) - } - Err(e) => { - logging!(warn, Type::Config, "验证过程发生错误: {}", e); - Config::runtime().await.discard(); - Err(e) - } - } - } - .await; - - drop(permit); - result - } - pub async fn put_configs_force(&self, path_buf: PathBuf) -> Result<()> { - let run_path_str = dirs::path_to_str(&path_buf).map_err(|e| { - let msg = e.to_string(); - logging_error!(Type::Core, "{}", msg); - anyhow!(msg) - })?; - - match self.reload_config_once(run_path_str).await { - Ok(_) => { - Config::runtime().await.apply(); - logging!(info, Type::Core, "Configuration updated successfully"); - Ok(()) - } - Err(err) => { - let should_retry = Self::should_restart_on_reload_error(&err); - let err_msg = err.to_string(); - - if should_retry && !handle::Handle::global().is_exiting() { - logging!( - warn, - Type::Core, - "Reload config failed ({}), restarting core and retrying", - err_msg - ); - if let Err(restart_err) = self.restart_core().await { - Config::runtime().await.discard(); - logging_error!( - Type::Core, - "Failed to restart core after reload error: {}", - restart_err - ); - return Err(restart_err); - } - sleep(Duration::from_millis(300)).await; - - match self.reload_config_once(run_path_str).await { - Ok(_) => { - Config::runtime().await.apply(); - logging!( - info, - Type::Core, - "Configuration updated successfully after restarting core" - ); - return Ok(()); - } - Err(retry_err) => { - let retry_msg = retry_err.to_string(); - Config::runtime().await.discard(); - logging_error!( - Type::Core, - "Failed to update configuration after restart: {}", - retry_msg - ); - return Err(anyhow!(retry_msg)); - } - } - } - - Config::runtime().await.discard(); - logging_error!(Type::Core, "Failed to update configuration: {}", err_msg); - Err(anyhow!(err_msg)) - } - } - } - - async fn reload_config_once(&self, config_path: &str) -> std::result::Result<(), MihomoError> { - handle::Handle::mihomo() - .await - .reload_config(true, config_path) - .await - } - - fn should_restart_on_reload_error(err: &MihomoError) -> bool { - fn is_connection_io_error(kind: std::io::ErrorKind) -> bool { - matches!( - kind, - std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::ConnectionRefused - | std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::NotFound - ) - } - - fn contains_error_pattern(text: &str) -> bool { - CONNECTION_ERROR_PATTERNS.iter().any(|p| text.contains(p)) - } - - match err { - MihomoError::ConnectionFailed | MihomoError::ConnectionLost => true, - MihomoError::Io(io_err) => is_connection_io_error(io_err.kind()), - MihomoError::Reqwest(req_err) => { - if req_err.is_connect() || req_err.is_timeout() { - return true; - } - if let Some(source) = req_err.source() { - if let Some(io_err) = source.downcast_ref::() { - if is_connection_io_error(io_err.kind()) { - return true; - } - } else if contains_error_pattern(&source.to_string()) { - return true; - } - } - contains_error_pattern(&req_err.to_string()) - } - MihomoError::FailedResponse(msg) => contains_error_pattern(msg), - _ => false, - } - } -} - -impl CoreManager { - async fn cleanup_orphaned_mihomo_processes(&self) -> Result<()> { - logging!(info, Type::Core, "开始清理多余的 mihomo 进程"); - - let current_pid = self - .child_sidecar - .lock() - .as_ref() - .and_then(|child| child.pid()); - let target_processes = ["verge-mihomo", "verge-mihomo-alpha"]; - - let process_futures = target_processes.iter().map(|&target| { - let process_name = if cfg!(windows) { - format!("{target}.exe") - } else { - target.to_string() - }; - self.find_processes_by_name(process_name, target) - }); - - let process_results = futures::future::join_all(process_futures).await; - - let pids_to_kill: Vec<_> = process_results - .into_iter() - .filter_map(|result| result.ok()) - .flat_map(|(pids, process_name)| { - pids.into_iter() - .filter(|&pid| Some(pid) != current_pid) - .map(move |pid| (pid, process_name.clone())) - }) - .collect(); - - if pids_to_kill.is_empty() { - logging!(debug, Type::Core, "未发现多余的 mihomo 进程"); - return Ok(()); - } - - let kill_futures = pids_to_kill - .iter() - .map(|(pid, name)| self.kill_process_with_verification(*pid, name.clone())); - - let killed_count = futures::future::join_all(kill_futures) - .await - .into_iter() - .filter(|&success| success) - .count(); - - if killed_count > 0 { - logging!( - info, - Type::Core, - "清理完成,共终止了 {} 个多余的 mihomo 进程", - killed_count - ); - } - - Ok(()) - } - - /// 根据进程名查找进程PID列 - async fn find_processes_by_name( - &self, - process_name: String, - _target: &str, - ) -> Result<(Vec, String)> { - #[cfg(windows)] - { - use std::mem; - use winapi::um::handleapi::CloseHandle; - use winapi::um::tlhelp32::{ - CreateToolhelp32Snapshot, PROCESSENTRY32W, Process32FirstW, Process32NextW, - TH32CS_SNAPPROCESS, - }; - use winapi::um::winnt::HANDLE; - - let process_name_clone = process_name.clone(); - let pids = AsyncHandler::spawn_blocking(move || -> Result> { - let mut pids = Vec::with_capacity(8); - - unsafe { - let snapshot: HANDLE = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); - if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE { - return Err(anyhow::anyhow!("Failed to create process snapshot")); - } - - let mut pe32: PROCESSENTRY32W = mem::zeroed(); - pe32.dwSize = mem::size_of::() as u32; - - if Process32FirstW(snapshot, &mut pe32) != 0 { - loop { - let end_pos = pe32 - .szExeFile - .iter() - .position(|&x| x == 0) - .unwrap_or(pe32.szExeFile.len()); - - if end_pos > 0 { - let exe_file = String::from_utf16_lossy(&pe32.szExeFile[..end_pos]); - if exe_file.eq_ignore_ascii_case(&process_name_clone) { - pids.push(pe32.th32ProcessID); - } - } - - if Process32NextW(snapshot, &mut pe32) == 0 { - break; - } - } - } - - CloseHandle(snapshot); - } - - Ok(pids) - }) - .await??; - - Ok((pids, process_name)) - } - - #[cfg(not(windows))] - { - let output = if cfg!(target_os = "macos") { - tokio::process::Command::new("pgrep") - .arg(&process_name) - .output() - .await? - } else { - tokio::process::Command::new("pidof") - .arg(&process_name) - .output() - .await? - }; - - if !output.status.success() { - return Ok((Vec::new(), process_name)); - } - - let stdout = String::from_utf8_lossy(&output.stdout); - let pids: Vec = stdout - .split_whitespace() - .filter_map(|s| s.parse().ok()) - .collect(); - - Ok((pids, process_name)) - } - } - - async fn kill_process_with_verification(&self, pid: u32, process_name: String) -> bool { - logging!( - info, - Type::Core, - "尝试终止进程: {} (PID: {})", - process_name, - pid - ); - - #[cfg(windows)] - let success = { - use winapi::um::handleapi::CloseHandle; - use winapi::um::processthreadsapi::{OpenProcess, TerminateProcess}; - use winapi::um::winnt::{HANDLE, PROCESS_TERMINATE}; - - AsyncHandler::spawn_blocking(move || unsafe { - let handle: HANDLE = OpenProcess(PROCESS_TERMINATE, 0, pid); - if handle.is_null() { - return false; - } - let result = TerminateProcess(handle, 1) != 0; - CloseHandle(handle); - result - }) - .await - .unwrap_or(false) - }; - - #[cfg(not(windows))] - let success = tokio::process::Command::new("kill") - .args(["-9", &pid.to_string()]) - .output() - .await - .map(|output| output.status.success()) - .unwrap_or(false); - - if !success { - logging!( - warn, - Type::Core, - "无法终止进程: {} (PID: {})", - process_name, - pid - ); - return false; - } - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - if self.is_process_running(pid).await.unwrap_or(false) { - logging!( - warn, - Type::Core, - "进程 {} (PID: {}) 终止命令成功但进程仍在运行", - process_name, - pid - ); - false - } else { - logging!( - info, - Type::Core, - "成功终止进程: {} (PID: {})", - process_name, - pid - ); - true - } - } - - async fn is_process_running(&self, pid: u32) -> Result { - #[cfg(windows)] - { - use winapi::shared::minwindef::DWORD; - use winapi::um::handleapi::CloseHandle; - use winapi::um::processthreadsapi::{GetExitCodeProcess, OpenProcess}; - use winapi::um::winnt::{HANDLE, PROCESS_QUERY_INFORMATION}; - - AsyncHandler::spawn_blocking(move || unsafe { - let handle: HANDLE = OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid); - if handle.is_null() { - return Ok(false); - } - let mut exit_code: DWORD = 0; - let result = GetExitCodeProcess(handle, &mut exit_code); - CloseHandle(handle); - Ok(result != 0 && exit_code == 259) - }) - .await? - } - - #[cfg(not(windows))] - { - let output = tokio::process::Command::new("ps") - .args(["-p", &pid.to_string()]) - .output() - .await?; - - Ok(output.status.success() && !output.stdout.is_empty()) - } - } - - async fn start_core_by_sidecar(&self) -> Result<()> { - logging!(info, Type::Core, "Running core by sidecar"); - - let config_file = &Config::generate_file(ConfigType::Run).await?; - let app_handle = handle::Handle::app_handle(); - let clash_core = Config::verge().await.latest_ref().get_valid_clash_core(); - let config_dir = dirs::app_home_dir()?; - - let (mut rx, child) = app_handle - .shell() - .sidecar(&clash_core)? - .args([ - "-d", - dirs::path_to_str(&config_dir)?, - "-f", - dirs::path_to_str(config_file)?, - ]) - .spawn()?; - - let pid = child.pid(); - logging!(trace, Type::Core, "Started core by sidecar pid: {}", pid); - *self.child_sidecar.lock() = Some(CommandChildGuard::new(child)); - self.set_running_mode(RunningMode::Sidecar); - - let shared_writer: SharedWriter = - Arc::new(tokio::sync::Mutex::new(sidecar_writer().await?)); - - AsyncHandler::spawn(|| async move { - while let Some(event) = rx.recv().await { - match event { - tauri_plugin_shell::process::CommandEvent::Stdout(line) - | tauri_plugin_shell::process::CommandEvent::Stderr(line) => { - let mut now = DeferredNow::default(); - let message = CompactString::from(String::from_utf8_lossy(&line).as_ref()); - let w = shared_writer.lock().await; - write_sidecar_log(w, &mut now, Level::Error, &message); - ClashLogger::global().append_log(message); - } - tauri_plugin_shell::process::CommandEvent::Terminated(term) => { - let mut now = DeferredNow::default(); - let message = if let Some(code) = term.code { - CompactString::from(format!("Process terminated with code: {}", code)) - } else if let Some(signal) = term.signal { - CompactString::from(format!("Process terminated by signal: {}", signal)) - } else { - CompactString::from("Process terminated") - }; - let w = shared_writer.lock().await; - write_sidecar_log(w, &mut now, Level::Info, &message); - ClashLogger::global().clear_logs(); - break; - } - _ => {} - } - } - }); - - Ok(()) - } - fn stop_core_by_sidecar(&self) -> Result<()> { - logging!(info, Type::Core, "Stopping core by sidecar"); - - if let Some(child) = self.child_sidecar.lock().take() { - let pid = child.pid(); - drop(child); - logging!(trace, Type::Core, "Stopped core by sidecar pid: {:?}", pid); - } - self.set_running_mode(RunningMode::NotRunning); - Ok(()) - } -} - -impl CoreManager { - async fn start_core_by_service(&self) -> Result<()> { - logging!(info, Type::Core, "Running core by service"); - let config_file = &Config::generate_file(ConfigType::Run).await?; - service::run_core_by_service(config_file).await?; - self.set_running_mode(RunningMode::Service); - Ok(()) - } - - async fn stop_core_by_service(&self) -> Result<()> { - logging!(info, Type::Core, "Stopping core by service"); - service::stop_core_by_service().await?; - self.set_running_mode(RunningMode::NotRunning); - Ok(()) - } -} - -impl Default for CoreManager { - fn default() -> Self { - CoreManager { - running: Arc::new(Mutex::new(RunningMode::NotRunning)), - child_sidecar: Arc::new(Mutex::new(None)), - update_semaphore: Arc::new(Semaphore::new(1)), - last_update: Arc::new(Mutex::new(None)), - } - } -} - -impl CoreManager { - pub async fn init(&self) -> Result<()> { - logging!(info, Type::Core, "开始核心初始化"); - - if let Err(e) = self.cleanup_orphaned_mihomo_processes().await { - logging!(warn, Type::Core, "清理遗留 mihomo 进程失败: {}", e); - } - - self.start_core().await?; - logging!(info, Type::Core, "核心初始化完成"); - Ok(()) - } - - pub fn set_running_mode(&self, mode: RunningMode) { - let mut guard = self.running.lock(); - *guard = mode; - } - - pub fn get_running_mode(&self) -> RunningMode { - *self.running.lock() - } - - #[cfg(target_os = "windows")] - async fn wait_for_service_ready_if_tun_enabled(&self) { - let require_service = Config::verge() - .await - .latest_ref() - .enable_tun_mode - .unwrap_or(false); - - if !require_service { - return; - } - - let max_wait = Duration::from_millis(3000); - let mut backoff_strategy = ExponentialBackoff { - initial_interval: Duration::from_millis(200), - max_interval: Duration::from_millis(200), - max_elapsed_time: Some(max_wait), - multiplier: 1.0, - randomization_factor: 0.0, - ..Default::default() - }; - backoff_strategy.reset(); - - let mut attempts = 0usize; - - let operation = || { - attempts += 1; - let attempt = attempts; - - async move { - let mut manager = SERVICE_MANAGER.lock().await; - - if matches!(manager.current(), ServiceStatus::Ready) { - if attempt > 1 { - logging!( - info, - Type::Core, - "Service became ready for TUN after {} attempt(s)", - attempt - ); - } - return Ok(()); - } - - if attempt == 1 { - logging!( - info, - Type::Core, - "TUN mode enabled but service not ready; waiting for service availability" - ); - } - - match manager.init().await { - Ok(_) => { - logging_error!(Type::Core, manager.refresh().await); - } - Err(err) => { - logging!( - debug, - Type::Core, - "Service connection attempt {} failed while waiting for TUN: {}", - attempt, - err - ); - return Err(BackoffError::transient(err)); - } - } - - if matches!(manager.current(), ServiceStatus::Ready) { - logging!( - info, - Type::Core, - "Service became ready for TUN after {} attempt(s)", - attempt - ); - return Ok(()); - } - - logging!( - debug, - Type::Core, - "Service not ready after attempt {}; retrying with backoff", - attempt - ); - - Err(BackoffError::transient(anyhow!("Service not ready yet"))) - } - }; - - let wait_started = Instant::now(); - - if let Err(err) = backoff::future::retry(backoff_strategy, operation).await { - let waited_ms = wait_started.elapsed().as_millis(); - logging!( - warn, - Type::Core, - "Service still not ready after waiting approximately {} ms ({} attempt(s)); falling back to sidecar mode: {}", - waited_ms, - attempts, - err - ); - } - } - - // TODO: 是否需要在非windows平台上进行检测 - #[allow(clippy::unused_async)] - #[cfg(not(target_os = "windows"))] - async fn wait_for_service_ready_if_tun_enabled(&self) {} - - pub async fn prestart_core(&self) -> Result<()> { - self.wait_for_service_ready_if_tun_enabled().await; - - match SERVICE_MANAGER.lock().await.current() { - ServiceStatus::Ready => { - self.set_running_mode(RunningMode::Service); - } - _ => { - self.set_running_mode(RunningMode::Sidecar); - } - } - Ok(()) - } - - /// 启动核心 - pub async fn start_core(&self) -> Result<()> { - self.prestart_core().await?; - - match self.get_running_mode() { - RunningMode::Service => { - logging_error!(Type::Core, self.start_core_by_service().await); - } - RunningMode::NotRunning | RunningMode::Sidecar => { - logging_error!(Type::Core, self.start_core_by_sidecar().await); - } - }; - - Ok(()) - } - - pub async fn get_clash_logs(&self) -> Result> { - logging!(info, Type::Core, "get clash logs"); - let logs = match self.get_running_mode() { - RunningMode::Service => service::get_clash_logs_by_service().await?, - RunningMode::Sidecar => ClashLogger::global().get_logs().clone(), - _ => VecDeque::new(), - }; - Ok(logs) - } - - /// 停止核心运行 - pub async fn stop_core(&self) -> Result<()> { - ClashLogger::global().clear_logs(); - match self.get_running_mode() { - RunningMode::Service => self.stop_core_by_service().await, - RunningMode::Sidecar => self.stop_core_by_sidecar(), - RunningMode::NotRunning => Ok(()), - } - } - - /// 重启内核 - pub async fn restart_core(&self) -> Result<()> { - logging!(info, Type::Core, "Restarting core"); - self.stop_core().await?; - if SERVICE_MANAGER.lock().await.init().await.is_ok() { - logging_error!(Type::Setup, SERVICE_MANAGER.lock().await.refresh().await); - } - self.start_core().await?; - Ok(()) - } - - /// 切换核心 - pub async fn change_core(&self, clash_core: Option) -> Result<(), String> { - if clash_core.is_none() { - let error_message = "Clash core should not be Null"; - logging!(error, Type::Core, "{}", error_message); - return Err(error_message.into()); - } - let core = clash_core.as_ref().ok_or_else(|| { - let msg = "Clash core should not be None"; - logging!(error, Type::Core, "{}", msg); - msg.to_string() - })?; - if !IVerge::VALID_CLASH_CORES.contains(&core.as_str()) { - let error_message = format!("Clash core invalid name: {core}"); - logging!(error, Type::Core, "{}", error_message); - return Err(error_message); - } - - Config::verge().await.draft_mut().clash_core = clash_core.clone(); - Config::verge().await.apply(); - - // 分离数据获取和异步调用避免Send问题 - let verge_data = Config::verge().await.latest_ref().clone(); - logging_error!(Type::Core, verge_data.save_file().await); - - let run_path = Config::generate_file(ConfigType::Run).await.map_err(|e| { - let msg = e.to_string(); - logging_error!(Type::Core, "{}", msg); - msg - })?; - - self.put_configs_force(run_path) - .await - .map_err(|e| e.to_string())?; - - Ok(()) - } -} - -// Use simplified singleton_lazy macro -singleton_lazy!(CoreManager, CORE_MANAGER, CoreManager::default); diff --git a/src-tauri/src/core/event_driven_proxy.rs b/src-tauri/src/core/event_driven_proxy.rs index 396afd091..567166f19 100644 --- a/src-tauri/src/core/event_driven_proxy.rs +++ b/src-tauri/src/core/event_driven_proxy.rs @@ -412,47 +412,42 @@ impl EventDrivenProxyManager { } async fn get_expected_sys_proxy() -> Sysproxy { - let verge_config = Config::verge().await; - let verge_mixed_port = verge_config.latest_ref().verge_mixed_port; - let proxy_host = verge_config.latest_ref().proxy_host.clone(); - - let port = verge_mixed_port.unwrap_or(Config::clash().await.latest_ref().get_mixed_port()); - let proxy_host = proxy_host.unwrap_or_else(|| "127.0.0.1".into()); + use crate::constants::network; + + let (verge_mixed_port, proxy_host) = { + let verge_config = Config::verge().await; + let verge_ref = verge_config.latest_ref(); + (verge_ref.verge_mixed_port, verge_ref.proxy_host.clone()) + }; + + let default_port = { + let clash_config = Config::clash().await; + clash_config.latest_ref().get_mixed_port() + }; + + let port = verge_mixed_port.unwrap_or(default_port); + let host = proxy_host.unwrap_or_else(|| network::DEFAULT_PROXY_HOST.into()); Sysproxy { enable: true, - host: proxy_host, + host, port, bypass: Self::get_bypass_config().await, } } async fn get_bypass_config() -> String { - let (use_default, custom_bypass) = { - let verge_config = Config::verge().await; - let verge = verge_config.latest_ref(); - ( - verge.use_default_bypass.unwrap_or(true), - verge.system_proxy_bypass.clone().unwrap_or_default(), - ) - }; + use crate::constants::bypass; + + let verge_config = Config::verge().await; + let verge = verge_config.latest_ref(); + let use_default = verge.use_default_bypass.unwrap_or(true); + let custom = verge.system_proxy_bypass.as_deref().unwrap_or(""); - #[cfg(target_os = "windows")] - let default_bypass = "localhost;127.*;192.168.*;10.*;172.16.*;172.17.*;172.18.*;172.19.*;172.20.*;172.21.*;172.22.*;172.23.*;172.24.*;172.25.*;172.26.*;172.27.*;172.28.*;172.29.*;172.30.*;172.31.*;"; - - #[cfg(target_os = "linux")] - let default_bypass = - "localhost,127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,::1"; - - #[cfg(target_os = "macos")] - let default_bypass = "127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,172.29.0.0/16,localhost,*.local,*.crashlytics.com,"; - - if custom_bypass.is_empty() { - default_bypass.to_string() - } else if use_default { - format!("{default_bypass},{custom_bypass}") - } else { - custom_bypass + match (use_default, custom.is_empty()) { + (_, true) => bypass::DEFAULT.to_string(), + (true, false) => format!("{},{}", bypass::DEFAULT, custom), + (false, false) => custom.to_string(), } } diff --git a/src-tauri/src/core/handle.rs b/src-tauri/src/core/handle.rs index f3c83f743..c0adf62d7 100644 --- a/src-tauri/src/core/handle.rs +++ b/src-tauri/src/core/handle.rs @@ -1,260 +1,18 @@ -use crate::{APP_HANDLE, singleton}; +use crate::{APP_HANDLE, singleton, constants::timing}; use parking_lot::RwLock; -use std::{ - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - mpsc, - }, - thread, - time::{Duration, Instant}, -}; -use tauri::{AppHandle, Emitter, Manager, WebviewWindow}; +use std::{sync::Arc, thread}; +use tauri::{AppHandle, Manager, WebviewWindow}; use tauri_plugin_mihomo::{Mihomo, MihomoExt}; -use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; +use tokio::sync::RwLockReadGuard; -use crate::{logging, utils::logging::Type}; - -/// 不同类型的前端通知 -#[derive(Debug, Clone)] -enum FrontendEvent { - RefreshClash, - RefreshVerge, - NoticeMessage { status: String, message: String }, - ProfileChanged { current_profile_id: String }, - TimerUpdated { profile_index: String }, - ProfileUpdateStarted { uid: String }, - ProfileUpdateCompleted { uid: String }, -} - -/// 事件发送统计和监控 -#[derive(Debug, Default)] -struct EventStats { - total_sent: AtomicU64, - total_errors: AtomicU64, - last_error_time: RwLock>, -} - -/// 存储启动期间的错误消息 -#[derive(Debug, Clone)] -struct ErrorMessage { - status: String, - message: String, -} - -/// 全局前端通知系统 -#[derive(Debug)] -struct NotificationSystem { - sender: Option>, - worker_handle: Option>, - is_running: bool, - stats: EventStats, - last_emit_time: RwLock, - /// 当通知系统失败超过阈值时,进入紧急模式 - emergency_mode: RwLock, -} - -impl Default for NotificationSystem { - fn default() -> Self { - Self::new() - } -} - -impl NotificationSystem { - fn new() -> Self { - Self { - sender: None, - worker_handle: None, - is_running: false, - stats: EventStats::default(), - last_emit_time: RwLock::new(Instant::now()), - emergency_mode: RwLock::new(false), - } - } - - /// 启动通知处理线程 - fn start(&mut self) { - if self.is_running { - return; - } - - let (tx, rx) = mpsc::channel(); - self.sender = Some(tx); - self.is_running = true; - - *self.last_emit_time.write() = Instant::now(); - - match thread::Builder::new() - .name("frontend-notifier".into()) - .spawn(move || { - let handle = Handle::global(); - - while !handle.is_exiting() { - match rx.recv_timeout(Duration::from_millis(100)) { - Ok(event) => { - let system_guard = handle.notification_system.read(); - let Some(system) = system_guard.as_ref() else { - log::warn!("NotificationSystem not found in handle while processing event."); - continue; - }; - - let is_emergency = *system.emergency_mode.read(); - - if is_emergency - && let FrontendEvent::NoticeMessage { ref status, .. } = event - && status == "info" { - log::warn!( - "Emergency mode active, skipping info message" - ); - continue; - } - - if let Some(window) = Handle::get_window() { - *system.last_emit_time.write() = Instant::now(); - - let (event_name_str, payload_result) = match event { - FrontendEvent::RefreshClash => { - ("verge://refresh-clash-config", Ok(serde_json::json!("yes"))) - } - FrontendEvent::RefreshVerge => { - ("verge://refresh-verge-config", Ok(serde_json::json!("yes"))) - } - FrontendEvent::NoticeMessage { status, message } => { - match serde_json::to_value((status, message)) { - Ok(p) => ("verge://notice-message", Ok(p)), - Err(e) => { - log::error!("Failed to serialize NoticeMessage payload: {e}"); - ("verge://notice-message", Err(e)) - } - } - } - FrontendEvent::ProfileChanged { current_profile_id } => { - ("profile-changed", Ok(serde_json::json!(current_profile_id))) - } - FrontendEvent::TimerUpdated { profile_index } => { - ("verge://timer-updated", Ok(serde_json::json!(profile_index))) - } - FrontendEvent::ProfileUpdateStarted { uid } => { - ("profile-update-started", Ok(serde_json::json!({ "uid": uid }))) - } - FrontendEvent::ProfileUpdateCompleted { uid } => { - ("profile-update-completed", Ok(serde_json::json!({ "uid": uid }))) - } - }; - - if let Ok(payload) = payload_result { - match window.emit(event_name_str, payload) { - Ok(_) => { - system.stats.total_sent.fetch_add(1, Ordering::SeqCst); - // 记录成功发送的事件 - if log::log_enabled!(log::Level::Debug) { - log::debug!("Successfully emitted event: {event_name_str}"); - } - } - Err(e) => { - log::warn!("Failed to emit event {event_name_str}: {e}"); - system.stats.total_errors.fetch_add(1, Ordering::SeqCst); - *system.stats.last_error_time.write() = Some(Instant::now()); - - let errors = system.stats.total_errors.load(Ordering::SeqCst); - const EMIT_ERROR_THRESHOLD: u64 = 10; - if errors > EMIT_ERROR_THRESHOLD && !*system.emergency_mode.read() { - log::warn!( - "Reached {EMIT_ERROR_THRESHOLD} emit errors, entering emergency mode" - ); - *system.emergency_mode.write() = true; - } - } - } - } else { - system.stats.total_errors.fetch_add(1, Ordering::SeqCst); - *system.stats.last_error_time.write() = Some(Instant::now()); - log::warn!("Skipped emitting event due to payload serialization error for {event_name_str}"); - } - } else { - log::warn!("No window found, skipping event emit."); - } - thread::sleep(Duration::from_millis(20)); - } - Err(mpsc::RecvTimeoutError::Timeout) => { - } - Err(mpsc::RecvTimeoutError::Disconnected) => { - log::info!( - "Notification channel disconnected, exiting worker thread" - ); - break; - } - } - } - - log::info!("Notification worker thread exiting"); - }) { - Ok(handle) => { - self.worker_handle = Some(handle); - } - Err(e) => { - log::error!("Failed to start notification worker thread: {e}"); - } - } - } - - /// 发送事件到队列 - fn send_event(&self, event: FrontendEvent) -> bool { - if *self.emergency_mode.read() - && let FrontendEvent::NoticeMessage { ref status, .. } = event - && status == "info" - { - log::info!("Skipping info message in emergency mode"); - return false; - } - - if let Some(sender) = &self.sender { - match sender.send(event) { - Ok(_) => true, - Err(e) => { - log::warn!("Failed to send event to notification queue: {e:?}"); - self.stats.total_errors.fetch_add(1, Ordering::SeqCst); - *self.stats.last_error_time.write() = Some(Instant::now()); - false - } - } - } else { - log::warn!("Notification system not started, can't send event"); - false - } - } - - fn shutdown(&mut self) { - log::info!("NotificationSystem shutdown initiated"); - self.is_running = false; - - // 先关闭发送端,让接收端知道不会再有新消息 - if let Some(sender) = self.sender.take() { - drop(sender); - } - - // 设置超时避免无限等待 - if let Some(handle) = self.worker_handle.take() { - match handle.join() { - Ok(_) => { - log::info!("NotificationSystem worker thread joined successfully"); - } - Err(e) => { - log::error!("NotificationSystem worker thread join failed: {e:?}"); - } - } - } - - log::info!("NotificationSystem shutdown completed"); - } -} +use super::notification::{ErrorMessage, FrontendEvent, NotificationSystem}; #[derive(Debug, Clone)] pub struct Handle { - pub is_exiting: Arc>, + is_exiting: Arc>, startup_errors: Arc>>, startup_completed: Arc>, - notification_system: Arc>>, + pub(crate) notification_system: Arc>>, } impl Default for Handle { @@ -268,7 +26,6 @@ impl Default for Handle { } } -// Use singleton macro singleton!(Handle, HANDLE); impl Handle { @@ -277,45 +34,28 @@ impl Handle { } pub fn init(&self) { - // 如果正在退出,不要重新初始化 if self.is_exiting() { - log::debug!("Handle::init called while exiting, skipping initialization"); return; } let mut system_opt = self.notification_system.write(); if let Some(system) = system_opt.as_mut() { - // 只在未运行时启动 if !system.is_running { system.start(); - } else { - log::debug!("NotificationSystem already running, skipping start"); } } } - /// 获取 AppHandle - #[allow(clippy::expect_used)] pub fn app_handle() -> &'static AppHandle { - APP_HANDLE.get().expect("failed to get global app handle") + APP_HANDLE.get().expect("App handle not initialized") } pub async fn mihomo() -> RwLockReadGuard<'static, Mihomo> { Self::app_handle().mihomo().read().await } - #[allow(unused)] - pub async fn mihomo_mut() -> RwLockWriteGuard<'static, Mihomo> { - Self::app_handle().mihomo().write().await - } - pub fn get_window() -> Option { - let app_handle = Self::app_handle(); - let window: Option = app_handle.get_webview_window("main"); - if window.is_none() { - log::debug!(target:"app", "main window not found"); - } - window + Self::app_handle().get_webview_window("main") } pub fn refresh_clash() { @@ -343,86 +83,29 @@ impl Handle { } pub fn notify_profile_changed(profile_id: String) { - let handle = Self::global(); - if handle.is_exiting() { - return; - } - - let system_opt = handle.notification_system.read(); - if let Some(system) = system_opt.as_ref() { - system.send_event(FrontendEvent::ProfileChanged { - current_profile_id: profile_id, - }); - } else { - log::warn!( - "Notification system not initialized when trying to send ProfileChanged event." - ); - } + Self::send_event(FrontendEvent::ProfileChanged { + current_profile_id: profile_id, + }); } pub fn notify_timer_updated(profile_index: String) { - let handle = Self::global(); - if handle.is_exiting() { - return; - } - - let system_opt = handle.notification_system.read(); - if let Some(system) = system_opt.as_ref() { - system.send_event(FrontendEvent::TimerUpdated { profile_index }); - } else { - log::warn!( - "Notification system not initialized when trying to send TimerUpdated event." - ); - } + Self::send_event(FrontendEvent::TimerUpdated { profile_index }); } pub fn notify_profile_update_started(uid: String) { - let handle = Self::global(); - if handle.is_exiting() { - return; - } - - let system_opt = handle.notification_system.read(); - if let Some(system) = system_opt.as_ref() { - system.send_event(FrontendEvent::ProfileUpdateStarted { uid }); - } else { - log::warn!( - "Notification system not initialized when trying to send ProfileUpdateStarted event." - ); - } + Self::send_event(FrontendEvent::ProfileUpdateStarted { uid }); } pub fn notify_profile_update_completed(uid: String) { - let handle = Self::global(); - if handle.is_exiting() { - return; - } - - let system_opt = handle.notification_system.read(); - if let Some(system) = system_opt.as_ref() { - system.send_event(FrontendEvent::ProfileUpdateCompleted { uid }); - } else { - log::warn!( - "Notification system not initialized when trying to send ProfileUpdateCompleted event." - ); - } + Self::send_event(FrontendEvent::ProfileUpdateCompleted { uid }); } - /// 通知前端显示消息队列 pub fn notice_message, M: Into>(status: S, msg: M) { let handle = Self::global(); let status_str = status.into(); let msg_str = msg.into(); if !*handle.startup_completed.read() { - logging!( - info, - Type::Frontend, - "启动过程中发现错误,加入消息队列: {} - {}", - status_str, - msg_str - ); - let mut errors = handle.startup_errors.write(); errors.push(ErrorMessage { status: status_str, @@ -435,25 +118,29 @@ impl Handle { return; } + Self::send_event(FrontendEvent::NoticeMessage { + status: status_str, + message: msg_str, + }); + } + + fn send_event(event: FrontendEvent) { + let handle = Self::global(); + if handle.is_exiting() { + return; + } + let system_opt = handle.notification_system.read(); if let Some(system) = system_opt.as_ref() { - system.send_event(FrontendEvent::NoticeMessage { - status: status_str, - message: msg_str, - }); + system.send_event(event); } } pub fn mark_startup_completed(&self) { - { - let mut completed = self.startup_completed.write(); - *completed = true; - } - + *self.startup_completed.write() = true; self.send_startup_errors(); } - /// 发送启动时累积的所有错误消息 fn send_startup_errors(&self) { let errors = { let mut errors = self.startup_errors.write(); @@ -464,19 +151,10 @@ impl Handle { return; } - logging!( - info, - Type::Frontend, - "发送{}条启动时累积的错误消息: {:?}", - errors.len(), - errors - ); - - // 启动单独线程处理启动错误,避免阻塞主线程 - let thread_result = thread::Builder::new() + let _ = thread::Builder::new() .name("startup-errors-sender".into()) .spawn(move || { - thread::sleep(Duration::from_secs(2)); + thread::sleep(timing::STARTUP_ERROR_DELAY); let handle = Handle::global(); if handle.is_exiting() { @@ -495,19 +173,14 @@ impl Handle { message: error.message, }); - thread::sleep(Duration::from_millis(300)); + thread::sleep(timing::ERROR_BATCH_DELAY); } } }); - - if let Err(e) = thread_result { - log::error!("Failed to spawn startup errors thread: {e}"); - } } pub fn set_is_exiting(&self) { - let mut is_exiting = self.is_exiting.write(); - *is_exiting = true; + *self.is_exiting.write() = true; let mut system_opt = self.notification_system.write(); if let Some(system) = system_opt.as_mut() { @@ -523,33 +196,17 @@ impl Handle { #[cfg(target_os = "macos")] impl Handle { pub fn set_activation_policy(&self, policy: tauri::ActivationPolicy) -> Result<(), String> { - let app_handle = Self::app_handle(); - app_handle + Self::app_handle() .set_activation_policy(policy) .map_err(|e| e.to_string()) } pub fn set_activation_policy_regular(&self) { - if let Err(e) = self.set_activation_policy(tauri::ActivationPolicy::Regular) { - logging!( - warn, - Type::Setup, - "Failed to set regular activation policy: {}", - e - ); - } + let _ = self.set_activation_policy(tauri::ActivationPolicy::Regular); } pub fn set_activation_policy_accessory(&self) { - if let Err(e) = self.set_activation_policy(tauri::ActivationPolicy::Accessory) { - logging!( - warn, - Type::Setup, - "Failed to set accessory activation policy: {}", - e - ); - } + let _ = self.set_activation_policy(tauri::ActivationPolicy::Accessory); } - - // Remove dead code policy prohibited function since https://github.com/clash-verge-rev/clash-verge-rev/pull/5103 } + diff --git a/src-tauri/src/core/manager/config.rs b/src-tauri/src/core/manager/config.rs new file mode 100644 index 000000000..825eb889a --- /dev/null +++ b/src-tauri/src/core/manager/config.rs @@ -0,0 +1,152 @@ +use super::CoreManager; +use crate::{ + config::*, + constants::timing, + core::{handle, validate::CoreConfigValidator}, + logging, + utils::{dirs, help, logging::Type}, +}; +use anyhow::{Result, anyhow}; +use std::{path::PathBuf, time::Instant}; +use tauri_plugin_mihomo::Error as MihomoError; +use tokio::time::sleep; + +impl CoreManager { + pub async fn use_default_config(&self, error_key: &str, error_msg: &str) -> Result<()> { + use crate::constants::files::RUNTIME_CONFIG; + + let runtime_path = dirs::app_home_dir()?.join(RUNTIME_CONFIG); + let clash_config = Config::clash().await.latest_ref().0.clone(); + + *Config::runtime().await.draft_mut() = Box::new(IRuntime { + config: Some(clash_config.clone()), + exists_keys: vec![], + chain_logs: Default::default(), + }); + + help::save_yaml(&runtime_path, &clash_config, Some("# Clash Verge Runtime")).await?; + handle::Handle::notice_message(error_key, error_msg); + Ok(()) + } + + pub async fn update_config(&self) -> Result<(bool, String)> { + if handle::Handle::global().is_exiting() { + return Ok((true, String::new())); + } + + if !self.should_update_config()? { + return Ok((true, String::new())); + } + + let _permit = self.update_semaphore.try_acquire() + .map_err(|_| anyhow!("Config update already in progress"))?; + + self.perform_config_update().await + } + + fn should_update_config(&self) -> Result { + let now = Instant::now(); + let mut last = self.last_update.lock(); + + if let Some(last_time) = *last { + if now.duration_since(last_time) < timing::CONFIG_UPDATE_DEBOUNCE { + return Ok(false); + } + } + + *last = Some(now); + Ok(true) + } + + async fn perform_config_update(&self) -> Result<(bool, String)> { + Config::generate().await?; + + match CoreConfigValidator::global().validate_config().await { + Ok((true, _)) => { + let run_path = Config::generate_file(ConfigType::Run).await?; + self.apply_config(run_path).await?; + Ok((true, String::new())) + } + Ok((false, error_msg)) => { + Config::runtime().await.discard(); + Ok((false, error_msg)) + } + Err(e) => { + Config::runtime().await.discard(); + Err(e) + } + } + } + + pub async fn put_configs_force(&self, path: PathBuf) -> Result<()> { + self.apply_config(path).await + } + + pub(super) async fn apply_config(&self, path: PathBuf) -> Result<()> { + let path_str = dirs::path_to_str(&path)?; + + match self.reload_config(path_str).await { + Ok(_) => { + Config::runtime().await.apply(); + logging!(info, Type::Core, "Configuration applied"); + Ok(()) + } + Err(err) if Self::should_restart_on_error(&err) => { + self.retry_with_restart(path_str).await + } + Err(err) => { + Config::runtime().await.discard(); + Err(anyhow!("Failed to apply config: {}", err)) + } + } + } + + async fn retry_with_restart(&self, config_path: &str) -> Result<()> { + if handle::Handle::global().is_exiting() { + return Err(anyhow!("Application exiting")); + } + + logging!(warn, Type::Core, "Restarting core for config reload"); + self.restart_core().await?; + sleep(timing::CONFIG_RELOAD_DELAY).await; + + self.reload_config(config_path).await?; + Config::runtime().await.apply(); + logging!(info, Type::Core, "Configuration applied after restart"); + Ok(()) + } + + async fn reload_config(&self, path: &str) -> Result<(), MihomoError> { + handle::Handle::mihomo().await.reload_config(true, path).await + } + + fn should_restart_on_error(err: &MihomoError) -> bool { + match err { + MihomoError::ConnectionFailed | MihomoError::ConnectionLost => true, + MihomoError::Io(io_err) => Self::is_connection_io_error(io_err.kind()), + MihomoError::Reqwest(req_err) => { + req_err.is_connect() + || req_err.is_timeout() + || Self::contains_error_pattern(&req_err.to_string()) + } + MihomoError::FailedResponse(msg) => Self::contains_error_pattern(msg), + _ => false, + } + } + + fn is_connection_io_error(kind: std::io::ErrorKind) -> bool { + matches!( + kind, + std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::ConnectionRefused + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::NotFound + ) + } + + fn contains_error_pattern(text: &str) -> bool { + use crate::constants::error_patterns::CONNECTION_ERRORS; + CONNECTION_ERRORS.iter().any(|p| text.contains(p)) + } +} + diff --git a/src-tauri/src/core/manager/lifecycle.rs b/src-tauri/src/core/manager/lifecycle.rs new file mode 100644 index 000000000..394488412 --- /dev/null +++ b/src-tauri/src/core/manager/lifecycle.rs @@ -0,0 +1,120 @@ +use super::{CoreManager, RunningMode}; +use crate::{ + core::{logger::ClashLogger, service::{ServiceStatus, SERVICE_MANAGER}}, + logging, + utils::logging::Type, +}; +use anyhow::Result; + +impl CoreManager { + pub async fn start_core(&self) -> Result<()> { + self.prepare_startup().await?; + + match self.get_running_mode() { + RunningMode::Service => self.start_core_by_service().await, + RunningMode::NotRunning | RunningMode::Sidecar => self.start_core_by_sidecar().await, + } + } + + pub async fn stop_core(&self) -> Result<()> { + ClashLogger::global().clear_logs(); + + match self.get_running_mode() { + RunningMode::Service => self.stop_core_by_service().await, + RunningMode::Sidecar => self.stop_core_by_sidecar(), + RunningMode::NotRunning => Ok(()), + } + } + + pub async fn restart_core(&self) -> Result<()> { + logging!(info, Type::Core, "Restarting core"); + self.stop_core().await?; + + if SERVICE_MANAGER.lock().await.init().await.is_ok() { + let _ = SERVICE_MANAGER.lock().await.refresh().await; + } + + self.start_core().await + } + + pub async fn change_core(&self, clash_core: Option) -> Result<(), String> { + use crate::config::{Config, ConfigType, IVerge}; + + let core = clash_core.as_ref() + .ok_or_else(|| "Clash core cannot be None".to_string())?; + + if !IVerge::VALID_CLASH_CORES.contains(&core.as_str()) { + return Err(format!("Invalid clash core: {}", core)); + } + + Config::verge().await.draft_mut().clash_core = clash_core; + Config::verge().await.apply(); + + let verge_data = Config::verge().await.latest_ref().clone(); + verge_data.save_file().await.map_err(|e| e.to_string())?; + + let run_path = Config::generate_file(ConfigType::Run).await + .map_err(|e| e.to_string())?; + + self.apply_config(run_path).await.map_err(|e| e.to_string()) + } + + async fn prepare_startup(&self) -> Result<()> { + self.wait_for_service_if_needed().await; + + let mode = match SERVICE_MANAGER.lock().await.current() { + ServiceStatus::Ready => RunningMode::Service, + _ => RunningMode::Sidecar, + }; + + self.set_running_mode(mode); + Ok(()) + } + + #[cfg(target_os = "windows")] + async fn wait_for_service_if_needed(&self) { + use crate::{config::Config, constants::timing}; + use backoff::{Error as BackoffError, ExponentialBackoff}; + + let needs_service = Config::verge().await + .latest_ref() + .enable_tun_mode + .unwrap_or(false); + + if !needs_service { + return; + } + + let backoff = ExponentialBackoff { + initial_interval: timing::SERVICE_WAIT_INTERVAL, + max_interval: timing::SERVICE_WAIT_INTERVAL, + max_elapsed_time: Some(timing::SERVICE_WAIT_MAX), + multiplier: 1.0, + randomization_factor: 0.0, + ..Default::default() + }; + + let operation = || async { + let mut manager = SERVICE_MANAGER.lock().await; + + if matches!(manager.current(), ServiceStatus::Ready) { + return Ok(()); + } + + manager.init().await.map_err(BackoffError::transient)?; + let _ = manager.refresh().await; + + if matches!(manager.current(), ServiceStatus::Ready) { + Ok(()) + } else { + Err(BackoffError::transient(anyhow::anyhow!("Service not ready"))) + } + }; + + let _ = backoff::future::retry(backoff, operation).await; + } + + #[cfg(not(target_os = "windows"))] + async fn wait_for_service_if_needed(&self) {} +} + diff --git a/src-tauri/src/core/manager/mod.rs b/src-tauri/src/core/manager/mod.rs new file mode 100644 index 000000000..85f28395a --- /dev/null +++ b/src-tauri/src/core/manager/mod.rs @@ -0,0 +1,80 @@ +mod config; +mod lifecycle; +mod process; +mod state; + +use anyhow::Result; +use parking_lot::Mutex; +use std::{fmt, sync::Arc, time::Instant}; +use tokio::sync::Semaphore; + +use crate::process::CommandChildGuard; +use crate::singleton_lazy; + +#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq, Eq)] +pub enum RunningMode { + Service, + Sidecar, + NotRunning, +} + +impl fmt::Display for RunningMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Service => write!(f, "Service"), + Self::Sidecar => write!(f, "Sidecar"), + Self::NotRunning => write!(f, "NotRunning"), + } + } +} + +#[derive(Debug)] +pub struct CoreManager { + state: Arc>, + update_semaphore: Arc, + last_update: Arc>>, +} + +#[derive(Debug)] +struct State { + running_mode: RunningMode, + child_sidecar: Option, +} + +impl Default for State { + fn default() -> Self { + Self { + running_mode: RunningMode::NotRunning, + child_sidecar: None, + } + } +} + +impl Default for CoreManager { + fn default() -> Self { + Self { + state: Arc::new(Mutex::new(State::default())), + update_semaphore: Arc::new(Semaphore::new(1)), + last_update: Arc::new(Mutex::new(None)), + } + } +} + +impl CoreManager { + pub fn get_running_mode(&self) -> RunningMode { + self.state.lock().running_mode + } + + pub fn set_running_mode(&self, mode: RunningMode) { + self.state.lock().running_mode = mode; + } + + pub async fn init(&self) -> Result<()> { + self.cleanup_orphaned_processes().await?; + self.start_core().await?; + Ok(()) + } +} + +singleton_lazy!(CoreManager, CORE_MANAGER, CoreManager::default); + diff --git a/src-tauri/src/core/manager/process.rs b/src-tauri/src/core/manager/process.rs new file mode 100644 index 000000000..267439686 --- /dev/null +++ b/src-tauri/src/core/manager/process.rs @@ -0,0 +1,204 @@ +use super::CoreManager; +use crate::{ + AsyncHandler, + constants::{process, timing}, + logging, + utils::logging::Type, +}; +use anyhow::{Result, anyhow}; + +impl CoreManager { + pub async fn cleanup_orphaned_processes(&self) -> Result<()> { + logging!(info, Type::Core, "Cleaning orphaned mihomo processes"); + + let current_pid = self.state.lock().child_sidecar.as_ref().and_then(|c| c.pid()); + let target_processes = process::process_names(); + + let process_futures = target_processes.iter().map(|&name| { + let process_name = process::with_extension(name); + self.find_processes_by_name(process_name, name) + }); + + let process_results = futures::future::join_all(process_futures).await; + + let pids_to_kill: Vec<_> = process_results + .into_iter() + .filter_map(Result::ok) + .flat_map(|(pids, name)| { + pids.into_iter() + .filter(move |&pid| Some(pid) != current_pid) + .map(move |pid| (pid, name.clone())) + }) + .collect(); + + if pids_to_kill.is_empty() { + return Ok(()); + } + + let kill_futures = pids_to_kill + .iter() + .map(|(pid, name)| self.kill_process_verified(*pid, name.clone())); + + let killed_count = futures::future::join_all(kill_futures) + .await + .into_iter() + .filter(|&success| success) + .count(); + + if killed_count > 0 { + logging!(info, Type::Core, "Cleaned {} orphaned processes", killed_count); + } + + Ok(()) + } + + async fn find_processes_by_name(&self, process_name: String, _target: &str) -> Result<(Vec, String)> { + #[cfg(windows)] + { + use std::mem; + use winapi::um::{ + handleapi::CloseHandle, + tlhelp32::{CreateToolhelp32Snapshot, PROCESSENTRY32W, Process32FirstW, Process32NextW, TH32CS_SNAPPROCESS}, + }; + + let process_name_clone = process_name.clone(); + let pids = AsyncHandler::spawn_blocking(move || -> Result> { + let mut pids = Vec::with_capacity(8); + + unsafe { + let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE { + return Err(anyhow!("Failed to create process snapshot")); + } + + let mut pe32: PROCESSENTRY32W = mem::zeroed(); + pe32.dwSize = mem::size_of::() as u32; + + if Process32FirstW(snapshot, &mut pe32) != 0 { + loop { + let end_pos = pe32.szExeFile.iter().position(|&x| x == 0) + .unwrap_or(pe32.szExeFile.len()); + + if end_pos > 0 { + let exe_file = String::from_utf16_lossy(&pe32.szExeFile[..end_pos]); + if exe_file.eq_ignore_ascii_case(&process_name_clone) { + pids.push(pe32.th32ProcessID); + } + } + + if Process32NextW(snapshot, &mut pe32) == 0 { + break; + } + } + } + + CloseHandle(snapshot); + } + + Ok(pids) + }).await??; + + Ok((pids, process_name)) + } + + #[cfg(not(windows))] + { + let cmd = if cfg!(target_os = "macos") { "pgrep" } else { "pidof" }; + let output = tokio::process::Command::new(cmd) + .arg(&process_name) + .output() + .await?; + + if !output.status.success() { + return Ok((Vec::new(), process_name)); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let pids: Vec = stdout + .split_whitespace() + .filter_map(|s| s.parse().ok()) + .collect(); + + Ok((pids, process_name)) + } + } + + async fn kill_process_verified(&self, pid: u32, process_name: String) -> bool { + #[cfg(windows)] + let success = { + use winapi::um::{ + handleapi::CloseHandle, + processthreadsapi::{OpenProcess, TerminateProcess}, + winnt::{HANDLE, PROCESS_TERMINATE}, + }; + + AsyncHandler::spawn_blocking(move || unsafe { + let handle: HANDLE = OpenProcess(PROCESS_TERMINATE, 0, pid); + if handle.is_null() { + return false; + } + let result = TerminateProcess(handle, 1) != 0; + CloseHandle(handle); + result + }).await.unwrap_or(false) + }; + + #[cfg(not(windows))] + let success = tokio::process::Command::new("kill") + .args(["-9", &pid.to_string()]) + .output() + .await + .map(|output| output.status.success()) + .unwrap_or(false); + + if !success { + return false; + } + + tokio::time::sleep(timing::PROCESS_VERIFY_DELAY).await; + + if self.is_process_running(pid).await.unwrap_or(false) { + logging!(warn, Type::Core, "Process {} (PID: {}) still running after termination", process_name, pid); + false + } else { + logging!(info, Type::Core, "Terminated process {} (PID: {})", process_name, pid); + true + } + } + + async fn is_process_running(&self, pid: u32) -> Result { + #[cfg(windows)] + { + use winapi::{ + shared::minwindef::DWORD, + um::{ + handleapi::CloseHandle, + processthreadsapi::{GetExitCodeProcess, OpenProcess}, + winnt::{HANDLE, PROCESS_QUERY_INFORMATION}, + }, + }; + + AsyncHandler::spawn_blocking(move || unsafe { + let handle: HANDLE = OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid); + if handle.is_null() { + return Ok(false); + } + let mut exit_code: DWORD = 0; + let result = GetExitCodeProcess(handle, &mut exit_code); + CloseHandle(handle); + Ok(result != 0 && exit_code == 259) + }).await? + } + + #[cfg(not(windows))] + { + let output = tokio::process::Command::new("ps") + .args(["-p", &pid.to_string()]) + .output() + .await?; + + Ok(output.status.success() && !output.stdout.is_empty()) + } + } +} + diff --git a/src-tauri/src/core/manager/state.rs b/src-tauri/src/core/manager/state.rs new file mode 100644 index 000000000..1335b3f73 --- /dev/null +++ b/src-tauri/src/core/manager/state.rs @@ -0,0 +1,125 @@ +use super::{CoreManager, RunningMode}; +use crate::{ + AsyncHandler, + config::Config, + core::{ + handle, + logger::ClashLogger, + service, + }, + logging, + process::CommandChildGuard, + utils::{ + dirs, + init::sidecar_writer, + logging::{SharedWriter, Type, write_sidecar_log}, + }, +}; +use anyhow::Result; +use compact_str::CompactString; +use flexi_logger::DeferredNow; +use log::Level; +use std::collections::VecDeque; +use tauri_plugin_shell::ShellExt; + +impl CoreManager { + pub async fn get_clash_logs(&self) -> Result> { + match self.get_running_mode() { + RunningMode::Service => service::get_clash_logs_by_service().await, + RunningMode::Sidecar => Ok(ClashLogger::global().get_logs().clone()), + RunningMode::NotRunning => Ok(VecDeque::new()), + } + } + + pub(super) async fn start_core_by_sidecar(&self) -> Result<()> { + logging!(info, Type::Core, "Starting core in sidecar mode"); + + let config_file = Config::generate_file(crate::config::ConfigType::Run).await?; + let app_handle = handle::Handle::app_handle(); + let clash_core = Config::verge().await.latest_ref().get_valid_clash_core(); + let config_dir = dirs::app_home_dir()?; + + let (mut rx, child) = app_handle + .shell() + .sidecar(&clash_core)? + .args([ + "-d", + dirs::path_to_str(&config_dir)?, + "-f", + dirs::path_to_str(&config_file)?, + ]) + .spawn()?; + + let pid = child.pid(); + logging!(trace, Type::Core, "Sidecar started with PID: {}", pid); + + { + let mut state = self.state.lock(); + state.child_sidecar = Some(CommandChildGuard::new(child)); + state.running_mode = RunningMode::Sidecar; + } + + let shared_writer: SharedWriter = std::sync::Arc::new(tokio::sync::Mutex::new(sidecar_writer().await?)); + + AsyncHandler::spawn(|| async move { + while let Some(event) = rx.recv().await { + match event { + tauri_plugin_shell::process::CommandEvent::Stdout(line) + | tauri_plugin_shell::process::CommandEvent::Stderr(line) => { + let mut now = DeferredNow::default(); + let message = CompactString::from(String::from_utf8_lossy(&line).as_ref()); + let w = shared_writer.lock().await; + write_sidecar_log(w, &mut now, Level::Error, &message); + ClashLogger::global().append_log(message); + } + tauri_plugin_shell::process::CommandEvent::Terminated(term) => { + let mut now = DeferredNow::default(); + let message = if let Some(code) = term.code { + CompactString::from(format!("Process terminated with code: {}", code)) + } else if let Some(signal) = term.signal { + CompactString::from(format!("Process terminated by signal: {}", signal)) + } else { + CompactString::from("Process terminated") + }; + let w = shared_writer.lock().await; + write_sidecar_log(w, &mut now, Level::Info, &message); + ClashLogger::global().clear_logs(); + break; + } + _ => {} + } + } + }); + + Ok(()) + } + + pub(super) fn stop_core_by_sidecar(&self) -> Result<()> { + logging!(info, Type::Core, "Stopping sidecar"); + + let mut state = self.state.lock(); + if let Some(child) = state.child_sidecar.take() { + let pid = child.pid(); + drop(child); + logging!(trace, Type::Core, "Sidecar stopped (PID: {:?})", pid); + } + state.running_mode = RunningMode::NotRunning; + Ok(()) + } + + pub(super) async fn start_core_by_service(&self) -> Result<()> { + logging!(info, Type::Core, "Starting core in service mode"); + let config_file = Config::generate_file(crate::config::ConfigType::Run).await?; + service::run_core_by_service(&config_file).await?; + self.set_running_mode(RunningMode::Service); + Ok(()) + } + + pub(super) async fn stop_core_by_service(&self) -> Result<()> { + logging!(info, Type::Core, "Stopping service"); + service::stop_core_by_service().await?; + self.set_running_mode(RunningMode::NotRunning); + Ok(()) + } +} + diff --git a/src-tauri/src/core/mod.rs b/src-tauri/src/core/mod.rs index 03e44d57f..b4e2119c1 100644 --- a/src-tauri/src/core/mod.rs +++ b/src-tauri/src/core/mod.rs @@ -1,11 +1,11 @@ pub mod async_proxy_query; pub mod backup; -#[allow(clippy::module_inception)] -mod core; pub mod event_driven_proxy; pub mod handle; pub mod hotkey; pub mod logger; +pub mod manager; +mod notification; pub mod service; pub mod sysopt; pub mod timer; @@ -13,4 +13,8 @@ pub mod tray; pub mod validate; pub mod win_uwp; -pub use self::{core::*, event_driven_proxy::EventDrivenProxyManager, timer::Timer}; +pub use self::{ + event_driven_proxy::EventDrivenProxyManager, + manager::CoreManager, + timer::Timer, +}; diff --git a/src-tauri/src/core/notification.rs b/src-tauri/src/core/notification.rs new file mode 100644 index 000000000..18f17eed0 --- /dev/null +++ b/src-tauri/src/core/notification.rs @@ -0,0 +1,203 @@ +use crate::{ + constants::{retry, timing}, + logging, + utils::logging::Type, +}; +use parking_lot::RwLock; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc, + }, + thread, + time::Instant, +}; +use tauri::{Emitter, WebviewWindow}; + +#[derive(Debug, Clone)] +pub enum FrontendEvent { + RefreshClash, + RefreshVerge, + NoticeMessage { status: String, message: String }, + ProfileChanged { current_profile_id: String }, + TimerUpdated { profile_index: String }, + ProfileUpdateStarted { uid: String }, + ProfileUpdateCompleted { uid: String }, +} + +#[derive(Debug, Default)] +struct EventStats { + total_sent: AtomicU64, + total_errors: AtomicU64, + last_error_time: RwLock>, +} + +#[derive(Debug, Clone)] +pub struct ErrorMessage { + pub status: String, + pub message: String, +} + +#[derive(Debug)] +pub struct NotificationSystem { + sender: Option>, + #[allow(clippy::type_complexity)] + worker_handle: Option>, + pub(super) is_running: bool, + stats: EventStats, + emergency_mode: RwLock, +} + +impl Default for NotificationSystem { + fn default() -> Self { + Self::new() + } +} + +impl NotificationSystem { + pub fn new() -> Self { + Self { + sender: None, + worker_handle: None, + is_running: false, + stats: EventStats::default(), + emergency_mode: RwLock::new(false), + } + } + + pub fn start(&mut self) { + if self.is_running { + return; + } + + let (tx, rx) = mpsc::channel(); + self.sender = Some(tx); + self.is_running = true; + + let result = thread::Builder::new() + .name("frontend-notifier".into()) + .spawn(move || Self::worker_loop(rx)); + + match result { + Ok(handle) => self.worker_handle = Some(handle), + Err(e) => logging!(error, Type::System, "Failed to start notification worker: {}", e), + } + } + + fn worker_loop(rx: mpsc::Receiver) { + use super::handle::Handle; + + let handle = Handle::global(); + + while !handle.is_exiting() { + match rx.recv_timeout(std::time::Duration::from_millis(100)) { + Ok(event) => Self::process_event(&handle, event), + Err(mpsc::RecvTimeoutError::Disconnected) => break, + Err(mpsc::RecvTimeoutError::Timeout) => continue, + } + } + } + + fn process_event(handle: &super::handle::Handle, event: FrontendEvent) { + let system_guard = handle.notification_system.read(); + let Some(system) = system_guard.as_ref() else { + return; + }; + + if system.should_skip_event(&event) { + return; + } + + if let Some(window) = super::handle::Handle::get_window() { + system.emit_to_window(&window, event); + thread::sleep(timing::EVENT_EMIT_DELAY); + } + } + + fn should_skip_event(&self, event: &FrontendEvent) -> bool { + let is_emergency = *self.emergency_mode.read(); + matches!( + (is_emergency, event), + (true, FrontendEvent::NoticeMessage { status, .. }) if status == "info" + ) + } + + fn emit_to_window(&self, window: &WebviewWindow, event: FrontendEvent) { + let (event_name, payload) = self.serialize_event(event); + + let Ok(payload) = payload else { + self.stats.total_errors.fetch_add(1, Ordering::Relaxed); + return; + }; + + match window.emit(event_name, payload) { + Ok(_) => { + self.stats.total_sent.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + logging!(warn, Type::Frontend, "Event emit failed: {}", e); + self.handle_emit_error(); + } + } + } + + fn serialize_event(&self, event: FrontendEvent) -> (&'static str, Result) { + use serde_json::json; + + match event { + FrontendEvent::RefreshClash => ("verge://refresh-clash-config", Ok(json!("yes"))), + FrontendEvent::RefreshVerge => ("verge://refresh-verge-config", Ok(json!("yes"))), + FrontendEvent::NoticeMessage { status, message } => { + ("verge://notice-message", serde_json::to_value((status, message))) + } + FrontendEvent::ProfileChanged { current_profile_id } => { + ("profile-changed", Ok(json!(current_profile_id))) + } + FrontendEvent::TimerUpdated { profile_index } => { + ("verge://timer-updated", Ok(json!(profile_index))) + } + FrontendEvent::ProfileUpdateStarted { uid } => { + ("profile-update-started", Ok(json!({ "uid": uid }))) + } + FrontendEvent::ProfileUpdateCompleted { uid } => { + ("profile-update-completed", Ok(json!({ "uid": uid }))) + } + } + } + + fn handle_emit_error(&self) { + self.stats.total_errors.fetch_add(1, Ordering::Relaxed); + *self.stats.last_error_time.write() = Some(Instant::now()); + + let errors = self.stats.total_errors.load(Ordering::Relaxed); + if errors > retry::EVENT_EMIT_THRESHOLD && !*self.emergency_mode.read() { + logging!(warn, Type::Frontend, "Entering emergency mode after {} errors", errors); + *self.emergency_mode.write() = true; + } + } + + pub fn send_event(&self, event: FrontendEvent) -> bool { + if self.should_skip_event(&event) { + return false; + } + + if let Some(sender) = &self.sender { + sender.send(event).is_ok() + } else { + false + } + } + + pub fn shutdown(&mut self) { + self.is_running = false; + + if let Some(sender) = self.sender.take() { + drop(sender); + } + + if let Some(handle) = self.worker_handle.take() { + let _ = handle.join(); + } + } +} + diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 680df052f..1b5da0d5f 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -3,6 +3,7 @@ mod cmd; pub mod config; +mod constants; mod core; mod enhance; mod feat; diff --git a/src-tauri/src/utils/logging.rs b/src-tauri/src/utils/logging.rs index 6718b3578..ec08e4e26 100644 --- a/src-tauri/src/utils/logging.rs +++ b/src-tauri/src/utils/logging.rs @@ -122,7 +122,7 @@ macro_rules! wrap_err { macro_rules! logging { // 不带 print 参数的版本(默认不打印) ($level:ident, $type:expr, $($arg:tt)*) => { - log::$level!(target: "app", "{} {}", $type, format_args!($($arg)*)); + log::$level!(target: "app", "{} {}", $type, format_args!($($arg)*)) }; } diff --git a/src/providers/app-data-provider.tsx b/src/providers/app-data-provider.tsx index 9511b795e..1944722f4 100644 --- a/src/providers/app-data-provider.tsx +++ b/src/providers/app-data-provider.tsx @@ -15,6 +15,7 @@ import { getRunningMode, getSystemProxy, } from "@/services/cmds"; +import { SWR_DEFAULTS, SWR_REALTIME, SWR_SLOW_POLL } from "@/services/config"; import { AppDataContext, AppDataContextType } from "./app-data-context"; @@ -30,61 +31,33 @@ export const AppDataProvider = ({ "getProxies", calcuProxies, { - refreshInterval: 8000, - revalidateOnFocus: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 3000, - onError: (err) => { - console.warn("[DataProvider] 代理数据获取失败:", err); - }, + ...SWR_REALTIME, + onError: (err) => console.warn("[DataProvider] Proxy fetch failed:", err), }, ); const { data: clashConfig, mutate: refreshClashConfig } = useSWR( "getClashConfig", getBaseConfig, - { - refreshInterval: 60000, - revalidateOnFocus: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 5000, - }, + SWR_SLOW_POLL, ); const { data: proxyProviders, mutate: refreshProxyProviders } = useSWR( "getProxyProviders", calcuProxyProviders, - { - revalidateOnFocus: false, - revalidateOnReconnect: false, - dedupingInterval: 5000, - suspense: false, - errorRetryCount: 2, - }, + SWR_DEFAULTS, ); const { data: ruleProviders, mutate: refreshRuleProviders } = useSWR( "getRuleProviders", getRuleProviders, - { - revalidateOnFocus: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 5000, - }, + SWR_DEFAULTS, ); const { data: rulesData, mutate: refreshRules } = useSWR( "getRules", getRules, - { - revalidateOnFocus: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 5000, - }, + SWR_DEFAULTS, ); useEffect(() => { @@ -101,7 +74,7 @@ export const AppDataProvider = ({ try { fn(); } catch (error) { - console.error("[数据提供者] 立即清理失败:", error); + console.error("[DataProvider] Immediate cleanup failed:", error); } } else { cleanupFns.push(fn); @@ -151,10 +124,10 @@ export const AppDataProvider = ({ scheduleTimeout(() => { refreshRules().catch((error) => - console.warn("[数据提供者] 规则刷新失败:", error), + console.warn("[DataProvider] Rules refresh failed:", error), ); refreshRuleProviders().catch((error) => - console.warn("[数据提供者] 规则提供者刷新失败:", error), + console.warn("[DataProvider] Rule providers refresh failed:", error), ); }, 200); }; @@ -166,7 +139,7 @@ export const AppDataProvider = ({ lastUpdateTime = now; scheduleTimeout(() => { refreshProxy().catch((error) => - console.error("[数据提供者] 代理刷新失败:", error), + console.error("[DataProvider] Proxy refresh failed:", error), ); }, 200); }; @@ -178,7 +151,7 @@ export const AppDataProvider = ({ lastUpdateTime = now; scheduleTimeout(() => { refreshProxy().catch((error) => - console.warn("[数据提供者] 代理刷新失败:", error), + console.warn("[DataProvider] Proxy refresh failed:", error), ); }, 200); }; @@ -241,7 +214,7 @@ export const AppDataProvider = ({ if (errors.length > 0) { console.error( - `[数据提供者] 清理过程中发生 ${errors.length} 个错误:`, + `[DataProvider] ${errors.length} errors during cleanup:`, errors, ); } @@ -251,26 +224,18 @@ export const AppDataProvider = ({ const { data: sysproxy, mutate: refreshSysproxy } = useSWR( "getSystemProxy", getSystemProxy, - { - revalidateOnFocus: false, - revalidateOnReconnect: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 5000, - }, + SWR_DEFAULTS, ); - const { data: runningMode } = useSWR("getRunningMode", getRunningMode, { - revalidateOnFocus: false, - suspense: false, - errorRetryCount: 2, - dedupingInterval: 5000, - }); + const { data: runningMode } = useSWR( + "getRunningMode", + getRunningMode, + SWR_DEFAULTS, + ); const { data: uptimeData } = useSWR("appUptime", getAppUptime, { + ...SWR_DEFAULTS, refreshInterval: 3000, - revalidateOnFocus: false, - suspense: false, errorRetryCount: 1, }); diff --git a/src/services/config.ts b/src/services/config.ts new file mode 100644 index 000000000..f49b86ffc --- /dev/null +++ b/src/services/config.ts @@ -0,0 +1,25 @@ +import { useSWRConfig } from "swr"; + +export const SWR_DEFAULTS = { + revalidateOnFocus: false, + revalidateOnReconnect: false, + suspense: false, + errorRetryCount: 2, + dedupingInterval: 5000, +} as const; + +export const SWR_REALTIME = { + ...SWR_DEFAULTS, + refreshInterval: 8000, + dedupingInterval: 3000, +} as const; + +export const SWR_SLOW_POLL = { + ...SWR_DEFAULTS, + refreshInterval: 60000, +} as const; + +export const useSWRMutate = () => { + const { mutate } = useSWRConfig(); + return mutate; +};