fix(subscription): resolve issues causing import failures in some cases #4534, #4436, #4552, #4519, #4517, #4503, #4336, #4301 (#4553)

* fix(subscription): resolve issues causing import failures in some cases #4534, #4436, #4552, #4519, #4517, #4503, #4336, #4301

* fix(profile): update profile creation to include file data handling

* fix(app): improve singleton instance exit handling

* fix: remove unsued handle method
This commit is contained in:
Tunglies
2025-08-29 17:46:46 +08:00
committed by GitHub
parent a9951e4eca
commit 6eecd70bd5
11 changed files with 341 additions and 309 deletions

View File

@@ -2,10 +2,10 @@ use super::CmdResult;
use crate::{
config::{
profiles::{
profiles_append_item_safe, profiles_delete_item_safe, profiles_patch_item_safe,
profiles_reorder_safe, profiles_save_file_safe,
profiles_append_item_with_filedata_safe, profiles_delete_item_safe,
profiles_patch_item_safe, profiles_reorder_safe, profiles_save_file_safe,
},
Config, IProfiles, PrfItem, PrfOption,
profiles_append_item_safe, Config, IProfiles, PrfItem, PrfOption,
},
core::{handle, timer::Timer, tray::Tray, CoreManager},
feat, logging,
@@ -225,8 +225,8 @@ pub async fn reorder_profile(active_id: String, over_id: String) -> CmdResult {
/// 创建新的profile
/// 创建一个新的配置文件
#[tauri::command]
pub async fn create_profile(item: PrfItem, _file_data: Option<String>) -> CmdResult {
match profiles_append_item_safe(item).await {
pub async fn create_profile(item: PrfItem, file_data: Option<String>) -> CmdResult {
match profiles_append_item_with_filedata_safe(item, file_data).await {
Ok(_) => Ok(()),
Err(err) => match err.to_string().as_str() {
"the file already exists" => Err("the file already exists".into()),

View File

@@ -1,6 +1,6 @@
use super::{Draft, IClashTemp, IProfiles, IRuntime, IVerge};
use crate::{
config::{profiles::profiles_append_item_safe, PrfItem},
config::{profiles_append_item_safe, PrfItem},
core::{handle, CoreManager},
enhance, logging,
process::AsyncHandler,

View File

@@ -4,7 +4,6 @@ use crate::utils::{
tmpl,
};
use anyhow::{bail, Context, Result};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_yaml::Mapping;
use std::{fs, time::Duration};
@@ -266,7 +265,7 @@ impl PrfItem {
};
// 使用网络管理器发送请求
let resp = match NetworkManager::global()
let resp = match NetworkManager::new()
.get_with_interrupt(
url,
proxy_type,
@@ -284,7 +283,7 @@ impl PrfItem {
};
let status_code = resp.status();
if !StatusCode::is_success(&status_code) {
if !status_code.is_success() {
bail!("failed to fetch remote profile with status {status_code}")
}
@@ -350,7 +349,7 @@ impl PrfItem {
let uid = help::get_uid("R");
let file = format!("{uid}.yaml");
let name = name.unwrap_or(filename.unwrap_or("Remote File".into()));
let data = resp.text_with_charset("utf-8").await?;
let data = resp.text_with_charset()?;
// process the charset "UTF-8 with BOM"
let data = data.trim_start_matches('\u{feff}');

View File

@@ -740,6 +740,22 @@ impl IProfiles {
// 特殊的Send-safe helper函数完全避免跨await持有guard
use crate::config::Config;
pub async fn profiles_append_item_with_filedata_safe(
item: PrfItem,
file_data: Option<String>,
) -> Result<()> {
AsyncHandler::spawn_blocking(move || {
AsyncHandler::handle().block_on(async {
let item = PrfItem::from(item, file_data).await?;
let profiles = Config::profiles().await;
let mut profiles_guard = profiles.data_mut();
profiles_guard.append_item(item).await
})
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))?
}
pub async fn profiles_append_item_safe(item: PrfItem) -> Result<()> {
AsyncHandler::spawn_blocking(move || {
AsyncHandler::handle().block_on(async {

View File

@@ -294,10 +294,6 @@ impl Handle {
}
}
pub fn is_initialized(&self) -> bool {
self.app_handle().is_some()
}
/// 获取 AppHandle
pub fn app_handle(&self) -> Option<AppHandle> {
self.app_handle.read().clone()

View File

@@ -122,7 +122,7 @@ pub async fn test_delay(url: String) -> anyhow::Result<u32> {
let start = Instant::now();
let response = NetworkManager::global()
let response = NetworkManager::new()
.get_with_interrupt(&url, proxy_type, Some(10), user_agent, false)
.await;

View File

@@ -40,8 +40,8 @@ mod app_init {
Ok(result) => {
if result.is_err() {
logging!(info, Type::Setup, true, "检测到已有应用实例运行");
if handle::Handle::global().is_initialized() {
handle::Handle::global().app_handle().unwrap().exit(0);
if let Some(app_handle) = handle::Handle::global().app_handle() {
app_handle.exit(0);
} else {
std::process::exit(0);
}
@@ -309,7 +309,7 @@ pub fn run() {
app_init::init_singleton_check();
// Initialize network manager
utils::network::NetworkManager::global().init();
utils::network::NetworkManager::new().init();
// Initialize portable flag
let _ = utils::dirs::init_portable_flag();

View File

@@ -1,143 +1,97 @@
use anyhow::Result;
use parking_lot::Mutex;
use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder, Response};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Once,
},
time::{Duration, Instant},
use isahc::http::{
header::{HeaderMap, HeaderValue, USER_AGENT},
StatusCode, Uri,
};
use tokio::runtime::{Builder, Runtime};
use isahc::prelude::*;
use isahc::{config::SslOption, HttpClient};
use std::sync::Once;
use std::time::{Duration, Instant};
use sysproxy::Sysproxy;
use tokio::sync::Mutex;
use tokio::time::timeout;
use crate::utils::logging::Type;
use crate::{config::Config, logging, process::AsyncHandler, singleton_lazy};
use crate::config::Config;
// HTTP2 相关
const H2_CONNECTION_WINDOW_SIZE: u32 = 1024 * 1024;
const H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024;
const H2_MAX_FRAME_SIZE: u32 = 16 * 1024;
const H2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
const H2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
const POOL_MAX_IDLE_PER_HOST: usize = 5;
const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(15);
/// 网络管理器
pub struct NetworkManager {
runtime: Arc<Runtime>,
self_proxy_client: Mutex<Option<Client>>,
system_proxy_client: Mutex<Option<Client>>,
no_proxy_client: Mutex<Option<Client>>,
init: Once,
last_connection_error: Mutex<Option<(Instant, String)>>,
connection_error_count: AtomicUsize,
#[derive(Debug)]
pub struct HttpResponse {
status: StatusCode,
headers: HeaderMap,
body: String,
}
// Use singleton_lazy macro to replace lazy_static!
singleton_lazy!(NetworkManager, NETWORK_MANAGER, NetworkManager::new);
impl HttpResponse {
pub fn new(status: StatusCode, headers: HeaderMap, body: String) -> Self {
Self {
status,
headers,
body,
}
}
pub fn status(&self) -> StatusCode {
self.status
}
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
pub fn text_with_charset(&self) -> Result<&str> {
Ok(&self.body)
}
}
#[derive(Debug, Clone, Copy)]
pub enum ProxyType {
None,
Localhost,
System,
}
pub struct NetworkManager {
self_proxy_client: Mutex<Option<HttpClient>>,
system_proxy_client: Mutex<Option<HttpClient>>,
no_proxy_client: Mutex<Option<HttpClient>>,
init: Once,
last_connection_error: Mutex<Option<(Instant, String)>>,
connection_error_count: Mutex<usize>,
}
impl NetworkManager {
fn new() -> Self {
// 创建专用的异步运行时线程数限制为4个
let runtime = match Builder::new_multi_thread()
.worker_threads(4)
.thread_name("clash-verge-network")
.enable_io()
.enable_time()
.build()
{
Ok(runtime) => runtime,
Err(e) => {
log::error!(
"Failed to create network runtime: {}. Using fallback single-threaded runtime.",
e
);
// Fallback to current thread runtime
match Builder::new_current_thread()
.enable_io()
.enable_time()
.thread_name("clash-verge-network-fallback")
.build()
{
Ok(fallback_runtime) => fallback_runtime,
Err(fallback_err) => {
log::error!(
"Failed to create fallback runtime: {}. This is critical.",
fallback_err
);
std::process::exit(1);
}
}
}
};
NetworkManager {
runtime: Arc::new(runtime),
pub fn new() -> Self {
Self {
self_proxy_client: Mutex::new(None),
system_proxy_client: Mutex::new(None),
no_proxy_client: Mutex::new(None),
init: Once::new(),
last_connection_error: Mutex::new(None),
connection_error_count: AtomicUsize::new(0),
connection_error_count: Mutex::new(0),
}
}
/// 初始化网络客户端
pub fn init(&self) {
self.init.call_once(|| {
self.runtime.spawn(async {
logging!(info, Type::Network, true, "初始化网络管理器");
// 创建无代理客户端
let no_proxy_client = match ClientBuilder::new()
.use_rustls_tls()
.no_proxy()
.pool_max_idle_per_host(POOL_MAX_IDLE_PER_HOST)
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(30))
.build()
{
Ok(client) => client,
Err(e) => {
logging!(
error,
Type::Network,
true,
"Failed to build no_proxy client: {}",
e
);
return;
}
};
let mut no_proxy_guard = NetworkManager::global().no_proxy_client.lock();
*no_proxy_guard = Some(no_proxy_client);
logging!(info, Type::Network, true, "网络管理器初始化完成");
});
});
self.init.call_once(|| {});
}
fn record_connection_error(&self, error: &str) {
let mut last_error = self.last_connection_error.lock();
async fn record_connection_error(&self, error: &str) {
let mut last_error = self.last_connection_error.lock().await;
*last_error = Some((Instant::now(), error.to_string()));
self.connection_error_count.fetch_add(1, Ordering::Relaxed);
let mut count = self.connection_error_count.lock().await;
*count += 1;
}
fn should_reset_clients(&self) -> bool {
let error_count = self.connection_error_count.load(Ordering::Relaxed);
let last_error = self.last_connection_error.lock();
async fn should_reset_clients(&self) -> bool {
let count = *self.connection_error_count.lock().await;
let last_error_guard = self.last_connection_error.lock().await;
if error_count > 5 {
if count > 5 {
return true;
}
if let Some((time, _)) = *last_error {
if time.elapsed() < Duration::from_secs(30) && error_count > 2 {
if let Some((time, _)) = &*last_error_guard {
if time.elapsed() < Duration::from_secs(30) && count > 2 {
return true;
}
}
@@ -145,60 +99,57 @@ impl NetworkManager {
false
}
pub fn reset_clients(&self) {
logging!(info, Type::Network, true, "正在重置所有HTTP客户端");
{
let mut client = self.self_proxy_client.lock();
*client = None;
}
{
let mut client = self.system_proxy_client.lock();
*client = None;
}
{
let mut client = self.no_proxy_client.lock();
*client = None;
}
self.connection_error_count.store(0, Ordering::Relaxed);
pub async fn reset_clients(&self) {
*self.self_proxy_client.lock().await = None;
*self.system_proxy_client.lock().await = None;
*self.no_proxy_client.lock().await = None;
*self.connection_error_count.lock().await = 0;
}
fn build_client(
&self,
proxy_uri: Option<Uri>,
default_headers: HeaderMap,
accept_invalid_certs: bool,
timeout_secs: Option<u64>,
) -> Result<HttpClient> {
let proxy_uri_clone = proxy_uri.clone();
let headers_clone = default_headers.clone();
let client = {
let mut builder = HttpClient::builder();
builder = match proxy_uri_clone {
Some(uri) => builder.proxy(Some(uri)),
None => builder.proxy(None),
};
for (name, value) in headers_clone.iter() {
builder = builder.default_header(name, value);
}
if accept_invalid_certs {
builder = builder.ssl_options(SslOption::DANGER_ACCEPT_INVALID_CERTS);
}
if let Some(secs) = timeout_secs {
builder = builder.timeout(Duration::from_secs(secs));
}
Ok(builder.build()?)
};
client
}
/// 创建带有自定义选项的HTTP请求
pub async fn create_request(
&self,
url: &str,
proxy_type: ProxyType,
timeout_secs: Option<u64>,
user_agent: Option<String>,
accept_invalid_certs: bool,
) -> RequestBuilder {
if self.should_reset_clients() {
self.reset_clients();
}
let mut builder = ClientBuilder::new()
.use_rustls_tls()
.pool_max_idle_per_host(POOL_MAX_IDLE_PER_HOST)
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.connect_timeout(DEFAULT_CONNECT_TIMEOUT)
.http2_initial_stream_window_size(H2_STREAM_WINDOW_SIZE)
.http2_initial_connection_window_size(H2_CONNECTION_WINDOW_SIZE)
.http2_adaptive_window(true)
.http2_keep_alive_interval(Some(H2_KEEP_ALIVE_INTERVAL))
.http2_keep_alive_timeout(H2_KEEP_ALIVE_TIMEOUT)
.http2_max_frame_size(H2_MAX_FRAME_SIZE)
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_max_header_list_size(16 * 1024);
if let Some(timeout) = timeout_secs {
builder = builder.timeout(Duration::from_secs(timeout));
} else {
builder = builder.timeout(DEFAULT_REQUEST_TIMEOUT);
}
match proxy_type {
ProxyType::None => {
builder = builder.no_proxy();
}
) -> Result<HttpClient> {
let proxy_uri = match proxy_type {
ProxyType::None => None,
ProxyType::Localhost => {
let port = {
let verge_port = Config::verge().await.latest_ref().verge_mixed_port;
@@ -207,94 +158,31 @@ impl NetworkManager {
None => Config::clash().await.latest_ref().get_mixed_port(),
}
};
let proxy_scheme = format!("http://127.0.0.1:{port}");
if let Ok(proxy) = Proxy::http(&proxy_scheme) {
builder = builder.proxy(proxy);
}
if let Ok(proxy) = Proxy::https(&proxy_scheme) {
builder = builder.proxy(proxy);
}
if let Ok(proxy) = Proxy::all(&proxy_scheme) {
builder = builder.proxy(proxy);
}
proxy_scheme.parse::<Uri>().ok()
}
ProxyType::System => {
use sysproxy::Sysproxy;
if let Ok(p @ Sysproxy { enable: true, .. }) = Sysproxy::get_system_proxy() {
let proxy_scheme = format!("http://{}:{}", p.host, p.port);
if let Ok(proxy) = Proxy::http(&proxy_scheme) {
builder = builder.proxy(proxy);
}
if let Ok(proxy) = Proxy::https(&proxy_scheme) {
builder = builder.proxy(proxy);
}
if let Ok(proxy) = Proxy::all(&proxy_scheme) {
builder = builder.proxy(proxy);
}
}
}
}
builder = builder.danger_accept_invalid_certs(accept_invalid_certs);
if let Some(ua) = user_agent {
builder = builder.user_agent(ua);
} else {
use crate::utils::resolve::VERSION;
let version = match VERSION.get() {
Some(v) => format!("clash-verge/v{v}"),
None => "clash-verge/unknown".to_string(),
};
builder = builder.user_agent(version);
}
let client = match builder.build() {
Ok(client) => client,
Err(e) => {
logging!(
error,
Type::Network,
true,
"Failed to build custom HTTP client: {}",
e
);
// Return a simple no-proxy client as fallback
match ClientBuilder::new()
.use_rustls_tls()
.no_proxy()
.timeout(DEFAULT_REQUEST_TIMEOUT)
.build()
{
Ok(fallback_client) => fallback_client,
Err(fallback_err) => {
logging!(
error,
Type::Network,
true,
"Failed to create fallback client: {}",
fallback_err
);
self.record_connection_error(&format!(
"Critical client build failure: {}",
fallback_err
));
// Return a minimal client that will likely fail but won't panic
ClientBuilder::new().build().unwrap_or_else(|_| {
// If even the most basic client fails, this is truly critical
std::process::exit(1);
})
}
proxy_scheme.parse::<Uri>().ok()
} else {
None
}
}
};
client.get(url)
let mut headers = HeaderMap::new();
headers.insert(
USER_AGENT,
HeaderValue::from_str(
&user_agent
.unwrap_or_else(|| format!("clash-verge/v{}", env!("CARGO_PKG_VERSION"))),
)?,
);
let client = self.build_client(proxy_uri, headers, accept_invalid_certs, timeout_secs)?;
Ok(client)
}
pub async fn get_with_interrupt(
@@ -304,51 +192,38 @@ impl NetworkManager {
timeout_secs: Option<u64>,
user_agent: Option<String>,
accept_invalid_certs: bool,
) -> Result<Response> {
let request = self
.create_request(
url,
proxy_type,
timeout_secs,
user_agent,
accept_invalid_certs,
)
.await;
) -> Result<HttpResponse> {
if self.should_reset_clients().await {
self.reset_clients().await;
}
let timeout_duration = timeout_secs.unwrap_or(20);
let client = self
.create_request(proxy_type, timeout_secs, user_agent, accept_invalid_certs)
.await?;
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
let timeout_duration = Duration::from_secs(timeout_secs.unwrap_or(20));
let url_owned = url.to_string();
let url_clone = url.to_string();
let watchdog = AsyncHandler::spawn(move || async move {
tokio::time::sleep(Duration::from_secs(timeout_duration)).await;
let _ = cancel_tx.send(());
logging!(warn, Type::Network, true, "请求超时取消: {}", url_clone);
});
let result = tokio::select! {
result = request.send() => result,
_ = cancel_rx => {
self.record_connection_error(&format!("Request interrupted for: {url}"));
return Err(anyhow::anyhow!("Request interrupted after {} seconds", timeout_duration));
let response = match timeout(timeout_duration, async {
let mut response = client.get_async(&url_owned).await?;
let status = response.status();
let headers = response.headers().clone();
let body = response.text().await?;
Ok::<_, anyhow::Error>(HttpResponse::new(status, headers, body))
})
.await
{
Ok(res) => res?,
Err(_) => {
self.record_connection_error(&format!("Request interrupted: {}", url))
.await;
return Err(anyhow::anyhow!(
"Request interrupted after {}s",
timeout_duration.as_secs()
));
}
};
watchdog.abort();
match result {
Ok(response) => Ok(response),
Err(e) => {
self.record_connection_error(&e.to_string());
Err(anyhow::anyhow!("Failed to send HTTP request: {}", e))
}
}
Ok(response)
}
}
/// 代理类型
#[derive(Debug, Clone, Copy)]
pub enum ProxyType {
None,
Localhost,
System,
}