feat: add more network control methods

This commit is contained in:
wonfen
2025-05-03 23:17:51 +08:00
parent c72413cbe6
commit 3983762202
2 changed files with 173 additions and 57 deletions

View File

@@ -5,6 +5,7 @@ use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_yaml::Mapping;
use std::fs;
use std::time::Duration;
use super::Config;
@@ -247,7 +248,7 @@ impl PrfItem {
opt_ref.is_some_and(|o| o.danger_accept_invalid_certs.unwrap_or(false));
let user_agent = opt_ref.and_then(|o| o.user_agent.clone());
let update_interval = opt_ref.and_then(|o| o.update_interval);
let timeout = opt_ref.and_then(|o| o.timeout_seconds).unwrap_or(60);
let timeout = opt_ref.and_then(|o| o.timeout_seconds).unwrap_or(20);
let mut merge = opt_ref.and_then(|o| o.merge.clone());
let mut script = opt_ref.and_then(|o| o.script.clone());
let mut rules = opt_ref.and_then(|o| o.rules.clone());
@@ -264,15 +265,22 @@ impl PrfItem {
};
// 使用网络管理器发送请求
let resp = NetworkManager::global()
.get_with_retry(
let resp = match NetworkManager::global()
.get_with_interrupt(
url,
proxy_type,
Some(timeout),
user_agent.clone(),
accept_invalid_certs,
)
.await?;
.await
{
Ok(r) => r,
Err(e) => {
tokio::time::sleep(Duration::from_millis(100)).await;
bail!("failed to fetch remote profile: {}", e);
}
};
let status_code = resp.status();
if !StatusCode::is_success(&status_code) {

View File

@@ -1,12 +1,21 @@
use anyhow::{Context, Result};
use anyhow::Result;
use lazy_static::lazy_static;
use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder, Response};
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::runtime::{Builder, Runtime};
use crate::{config::Config, logging, utils::logging::Type};
// HTTP2 相关
const H2_CONNECTION_WINDOW_SIZE: u32 = 1024 * 1024;
const H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024;
const H2_MAX_FRAME_SIZE: u32 = 16 * 1024;
const H2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
const H2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// 网络管理器
pub struct NetworkManager {
runtime: Arc<Runtime>,
@@ -14,6 +23,8 @@ pub struct NetworkManager {
system_proxy_client: Arc<Mutex<Option<Client>>>,
no_proxy_client: Arc<Mutex<Option<Client>>>,
init: Once,
last_connection_error: Arc<Mutex<Option<(Instant, String)>>>,
connection_error_count: Arc<Mutex<usize>>,
}
lazy_static! {
@@ -37,6 +48,8 @@ impl NetworkManager {
system_proxy_client: Arc::new(Mutex::new(None)),
no_proxy_client: Arc::new(Mutex::new(None)),
init: Once::new(),
last_connection_error: Arc::new(Mutex::new(None)),
connection_error_count: Arc::new(Mutex::new(0)),
}
}
@@ -69,8 +82,57 @@ impl NetworkManager {
});
}
fn record_connection_error(&self, error: &str) {
let mut last_error = self.last_connection_error.lock().unwrap();
*last_error = Some((Instant::now(), error.to_string()));
let mut error_count = self.connection_error_count.lock().unwrap();
*error_count += 1;
}
fn should_reset_clients(&self) -> bool {
let error_count = *self.connection_error_count.lock().unwrap();
let last_error = self.last_connection_error.lock().unwrap();
if error_count > 5 {
return true;
}
if let Some((time, _)) = *last_error {
if time.elapsed() < Duration::from_secs(30) && error_count > 2 {
return true;
}
}
false
}
fn reset_clients(&self) {
logging!(info, Type::Network, true, "正在重置所有HTTP客户端");
{
let mut client = self.self_proxy_client.lock().unwrap();
*client = None;
}
{
let mut client = self.system_proxy_client.lock().unwrap();
*client = None;
}
{
let mut client = self.no_proxy_client.lock().unwrap();
*client = None;
}
{
let mut error_count = self.connection_error_count.lock().unwrap();
*error_count = 0;
}
}
/// 获取或创建自代理客户端
fn get_or_create_self_proxy_client(&self) -> Client {
if self.should_reset_clients() {
self.reset_clients();
}
let mut client_guard = self.self_proxy_client.lock().unwrap();
if client_guard.is_none() {
@@ -85,10 +147,18 @@ impl NetworkManager {
.use_rustls_tls()
.pool_max_idle_per_host(5)
.pool_idle_timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(60));
.connect_timeout(DEFAULT_CONNECT_TIMEOUT)
.timeout(DEFAULT_REQUEST_TIMEOUT)
.http2_initial_stream_window_size(H2_STREAM_WINDOW_SIZE)
.http2_initial_connection_window_size(H2_CONNECTION_WINDOW_SIZE)
.http2_adaptive_window(true)
.http2_keep_alive_interval(Some(H2_KEEP_ALIVE_INTERVAL))
.http2_keep_alive_timeout(H2_KEEP_ALIVE_TIMEOUT)
.http2_max_frame_size(H2_MAX_FRAME_SIZE)
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_prior_knowledge()
.http2_max_header_list_size(16 * 1024);
// 添加所有代理类型
if let Ok(proxy) = Proxy::http(&proxy_scheme) {
builder = builder.proxy(proxy);
}
@@ -108,6 +178,10 @@ impl NetworkManager {
/// 获取或创建系统代理客户端
fn get_or_create_system_proxy_client(&self) -> Client {
if self.should_reset_clients() {
self.reset_clients();
}
let mut client_guard = self.system_proxy_client.lock().unwrap();
if client_guard.is_none() {
@@ -117,8 +191,17 @@ impl NetworkManager {
.use_rustls_tls()
.pool_max_idle_per_host(5)
.pool_idle_timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(60));
.connect_timeout(DEFAULT_CONNECT_TIMEOUT)
.timeout(DEFAULT_REQUEST_TIMEOUT)
.http2_initial_stream_window_size(H2_STREAM_WINDOW_SIZE)
.http2_initial_connection_window_size(H2_CONNECTION_WINDOW_SIZE)
.http2_adaptive_window(true)
.http2_keep_alive_interval(Some(H2_KEEP_ALIVE_INTERVAL))
.http2_keep_alive_timeout(H2_KEEP_ALIVE_TIMEOUT)
.http2_max_frame_size(H2_MAX_FRAME_SIZE)
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_prior_knowledge()
.http2_max_header_list_size(16 * 1024);
if let Ok(p @ Sysproxy { enable: true, .. }) = Sysproxy::get_system_proxy() {
let proxy_scheme = format!("http://{}:{}", p.host, p.port);
@@ -164,18 +247,29 @@ impl NetworkManager {
user_agent: Option<String>,
accept_invalid_certs: bool,
) -> RequestBuilder {
if self.should_reset_clients() {
self.reset_clients();
}
let mut builder = ClientBuilder::new()
.use_rustls_tls()
.connect_timeout(Duration::from_secs(10));
.connect_timeout(DEFAULT_CONNECT_TIMEOUT)
.http2_initial_stream_window_size(H2_STREAM_WINDOW_SIZE)
.http2_initial_connection_window_size(H2_CONNECTION_WINDOW_SIZE)
.http2_adaptive_window(true)
.http2_keep_alive_interval(Some(H2_KEEP_ALIVE_INTERVAL))
.http2_keep_alive_timeout(H2_KEEP_ALIVE_TIMEOUT)
.http2_max_frame_size(H2_MAX_FRAME_SIZE)
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_prior_knowledge()
.http2_max_header_list_size(16 * 1024);
// 超时
if let Some(timeout) = timeout_secs {
builder = builder.timeout(Duration::from_secs(timeout));
} else {
builder = builder.timeout(Duration::from_secs(60));
builder = builder.timeout(DEFAULT_REQUEST_TIMEOUT);
}
// 设置代理
match proxy_type {
ProxyType::NoProxy => {
builder = builder.no_proxy();
@@ -217,10 +311,8 @@ impl NetworkManager {
}
}
// 证书验证选项
builder = builder.danger_accept_invalid_certs(accept_invalid_certs);
// 用户代理
if let Some(ua) = user_agent {
builder = builder.user_agent(ua);
} else {
@@ -234,13 +326,12 @@ impl NetworkManager {
builder = builder.user_agent(version);
}
// 构建请求
let client = builder.build().expect("Failed to build custom HTTP client");
client.get(url)
}
/// 执行GET请求
/// 执行GET请求,添加错误跟踪
pub async fn get(
&self,
url: &str,
@@ -249,18 +340,35 @@ impl NetworkManager {
user_agent: Option<String>,
accept_invalid_certs: bool,
) -> Result<Response> {
self.create_request(
let request = self.create_request(
url,
proxy_type,
timeout_secs,
user_agent,
accept_invalid_certs,
)
.send()
.await
.context("Failed to send HTTP request")
);
let timeout_duration = timeout_secs.unwrap_or(30);
match tokio::time::timeout(Duration::from_secs(timeout_duration), request.send()).await {
Ok(result) => match result {
Ok(response) => Ok(response),
Err(e) => {
self.record_connection_error(&e.to_string());
Err(anyhow::anyhow!("Failed to send HTTP request: {}", e))
}
pub async fn get_with_retry(
},
Err(_) => {
self.record_connection_error("Request timeout");
Err(anyhow::anyhow!(
"HTTP request timed out after {} seconds",
timeout_duration
))
}
}
}
pub async fn get_with_interrupt(
&self,
url: &str,
proxy_type: ProxyType,
@@ -268,43 +376,43 @@ impl NetworkManager {
user_agent: Option<String>,
accept_invalid_certs: bool,
) -> Result<Response> {
let max_retries = 2;
let mut last_error = None;
for attempt in 0..=max_retries {
if attempt > 0 {
logging!(info, Type::Network, "重试第{}次请求: {}", attempt, url);
tokio::time::sleep(Duration::from_millis(500)).await;
}
match self
.get(
let request = self.create_request(
url,
proxy_type,
timeout_secs,
user_agent.clone(),
user_agent,
accept_invalid_certs,
)
.await
{
Ok(resp) => return Ok(resp),
Err(e) => {
logging!(
warn,
Type::Network,
"请求失败 (尝试 {}/{}): {} - {}",
attempt + 1,
max_retries + 1,
url,
e
);
last_error = Some(e);
continue;
}
}
}
Err(last_error.unwrap_or_else(|| anyhow::anyhow!("请求失败,但没有具体错误信息")))
let timeout_duration = timeout_secs.unwrap_or(20);
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
let url_clone = url.to_string();
let watchdog = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(timeout_duration)).await;
let _ = cancel_tx.send(());
logging!(warn, Type::Network, true, "请求超时取消: {}", url_clone);
});
let result = tokio::select! {
result = request.send() => {
watchdog.abort();
result
},
_ = cancel_rx => {
self.record_connection_error(&format!("Request interrupted for: {}", url));
return Err(anyhow::anyhow!("Request interrupted after {} seconds", timeout_duration));
}
};
match result {
Ok(response) => Ok(response),
Err(e) => {
self.record_connection_error(&e.to_string());
Err(anyhow::anyhow!("Failed to send HTTP request: {}", e))
}
}
}
}