Revert "refactor: profile switch (#5197)"

This reverts commit c2dcd86722.
This commit is contained in:
Tunglies
2025-10-30 18:11:04 +08:00
parent 928f226d10
commit a869dbb441
36 changed files with 1257 additions and 5894 deletions

View File

@@ -1,14 +1,7 @@
use crate::{
APP_HANDLE, config::Config, constants::timing, logging, singleton, utils::logging::Type,
};
use crate::{APP_HANDLE, constants::timing, singleton};
use parking_lot::RwLock;
use serde_json::{Value, json};
use smartstring::alias::String;
use std::{
sync::Arc,
thread,
time::{SystemTime, UNIX_EPOCH},
};
use std::{sync::Arc, thread};
use tauri::{AppHandle, Manager, WebviewWindow};
use tauri_plugin_mihomo::{Mihomo, MihomoExt};
use tokio::sync::RwLockReadGuard;
@@ -73,14 +66,10 @@ impl Handle {
return;
}
{
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshClash);
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshClash);
}
Self::spawn_proxy_snapshot();
}
pub fn refresh_verge() {
@@ -96,37 +85,11 @@ impl Handle {
}
pub fn notify_profile_changed(profile_id: String) {
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
});
}
}
pub fn notify_profile_switch_finished(
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
) {
Self::send_event(FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
task_id,
Self::send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
});
}
pub fn notify_rust_panic(message: String, location: String) {
Self::send_event(FrontendEvent::RustPanic { message, location });
}
pub fn notify_timer_updated(profile_index: String) {
Self::send_event(FrontendEvent::TimerUpdated { profile_index });
}
@@ -137,86 +100,6 @@ impl Handle {
pub fn notify_profile_update_completed(uid: String) {
Self::send_event(FrontendEvent::ProfileUpdateCompleted { uid });
Self::spawn_proxy_snapshot();
}
pub fn notify_proxies_updated(payload: Value) {
Self::send_event(FrontendEvent::ProxiesUpdated { payload });
}
pub async fn build_proxy_snapshot() -> Option<Value> {
let mihomo_guard = Self::mihomo().await;
let proxies = match mihomo_guard.get_proxies().await {
Ok(data) => match serde_json::to_value(&data) {
Ok(value) => value,
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to serialize proxies snapshot: {error}"
);
return None;
}
},
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to fetch proxies for snapshot: {error}"
);
return None;
}
};
drop(mihomo_guard);
let providers_guard = Self::mihomo().await;
let providers_value = match providers_guard.get_proxy_providers().await {
Ok(data) => serde_json::to_value(&data).unwrap_or_else(|error| {
logging!(
warn,
Type::Frontend,
"Failed to serialize proxy providers for snapshot: {error}"
);
Value::Null
}),
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to fetch proxy providers for snapshot: {error}"
);
Value::Null
}
};
drop(providers_guard);
let profile_guard = Config::profiles().await;
let profile_id = profile_guard.latest_ref().current.clone();
drop(profile_guard);
let emitted_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as i64)
.unwrap_or(0);
let payload = json!({
"proxies": proxies,
"providers": providers_value,
"profileId": profile_id,
"emittedAt": emitted_at,
});
Some(payload)
}
fn spawn_proxy_snapshot() {
tauri::async_runtime::spawn(async {
if let Some(payload) = Handle::build_proxy_snapshot().await {
Handle::notify_proxies_updated(payload);
}
});
}
pub fn notice_message<S: Into<String>, M: Into<String>>(status: S, msg: M) {

View File

@@ -10,10 +10,7 @@ use anyhow::{Result, anyhow};
use smartstring::alias::String;
use std::{path::PathBuf, time::Instant};
use tauri_plugin_mihomo::Error as MihomoError;
use tokio::time::{sleep, timeout};
const RELOAD_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const MAX_RELOAD_ATTEMPTS: usize = 3;
use tokio::time::sleep;
impl CoreManager {
pub async fn use_default_config(&self, error_key: &str, error_msg: &str) -> Result<()> {
@@ -42,38 +39,12 @@ impl CoreManager {
return Ok((true, String::new()));
}
let start = Instant::now();
let _permit = self
.update_semaphore
.try_acquire()
.map_err(|_| anyhow!("Config update already in progress"))?;
let result = self.perform_config_update().await;
match &result {
Ok((success, msg)) => {
logging!(
info,
Type::Core,
"[ConfigUpdate] Finished (success={}, elapsed={}ms, msg={})",
success,
start.elapsed().as_millis(),
msg
);
}
Err(err) => {
logging!(
error,
Type::Core,
"[ConfigUpdate] Failed after {}ms: {}",
start.elapsed().as_millis(),
err
);
}
}
result
self.perform_config_update().await
}
fn should_update_config(&self) -> Result<bool> {
@@ -91,73 +62,20 @@ impl CoreManager {
}
async fn perform_config_update(&self) -> Result<(bool, String)> {
logging!(debug, Type::Core, "[ConfigUpdate] Pipeline start");
let total_start = Instant::now();
let mut stage_timer = Instant::now();
Config::generate().await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Generation completed in {}ms",
stage_timer.elapsed().as_millis()
);
stage_timer = Instant::now();
let validation_result = CoreConfigValidator::global().validate_config().await;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Validation completed in {}ms",
stage_timer.elapsed().as_millis()
);
match validation_result {
match CoreConfigValidator::global().validate_config().await {
Ok((true, _)) => {
stage_timer = Instant::now();
let run_path = Config::generate_file(ConfigType::Run).await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Runtime file generated in {}ms",
stage_timer.elapsed().as_millis()
);
stage_timer = Instant::now();
self.apply_config(run_path).await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Core apply completed in {}ms",
stage_timer.elapsed().as_millis()
);
logging!(
debug,
Type::Core,
"[ConfigUpdate] Pipeline succeeded in {}ms",
total_start.elapsed().as_millis()
);
Ok((true, String::new()))
}
Ok((false, error_msg)) => {
Config::runtime().await.discard();
logging!(
warn,
Type::Core,
"[ConfigUpdate] Validation reported failure after {}ms: {}",
total_start.elapsed().as_millis(),
error_msg
);
Ok((false, error_msg))
}
Err(e) => {
Config::runtime().await.discard();
logging!(
error,
Type::Core,
"[ConfigUpdate] Validation errored after {}ms: {}",
total_start.elapsed().as_millis(),
e
);
Err(e)
}
}
@@ -170,49 +88,17 @@ impl CoreManager {
pub(super) async fn apply_config(&self, path: PathBuf) -> Result<()> {
let path_str = dirs::path_to_str(&path)?;
let reload_start = Instant::now();
match self.reload_config_with_retry(path_str).await {
match self.reload_config(path_str).await {
Ok(_) => {
Config::runtime().await.apply();
logging!(
debug,
Type::Core,
"Configuration applied (reload={}ms)",
reload_start.elapsed().as_millis()
);
logging!(info, Type::Core, "Configuration applied");
Ok(())
}
Err(err) if Self::should_restart_on_error(&err) => {
self.retry_with_restart(path_str).await
}
Err(err) => {
if Self::should_restart_for_anyhow(&err) {
logging!(
warn,
Type::Core,
"Reload failed after {}ms with retryable/timeout error; attempting restart: {}",
reload_start.elapsed().as_millis(),
err
);
match self.retry_with_restart(path_str).await {
Ok(_) => return Ok(()),
Err(retry_err) => {
logging!(
error,
Type::Core,
"Reload retry with restart failed: {}",
retry_err
);
Config::runtime().await.discard();
return Err(retry_err);
}
}
}
Config::runtime().await.discard();
logging!(
error,
Type::Core,
"Failed to apply config after {}ms: {}",
reload_start.elapsed().as_millis(),
err
);
Err(anyhow!("Failed to apply config: {}", err))
}
}
@@ -227,116 +113,17 @@ impl CoreManager {
self.restart_core().await?;
sleep(timing::CONFIG_RELOAD_DELAY).await;
self.reload_config_with_retry(config_path).await?;
self.reload_config(config_path).await?;
Config::runtime().await.apply();
logging!(info, Type::Core, "Configuration applied after restart");
Ok(())
}
async fn reload_config_with_retry(&self, path: &str) -> Result<()> {
for attempt in 1..=MAX_RELOAD_ATTEMPTS {
let attempt_start = Instant::now();
let reload_future = self.reload_config_once(path);
match timeout(RELOAD_TIMEOUT, reload_future).await {
Ok(Ok(())) => {
logging!(
debug,
Type::Core,
"reload_config attempt {}/{} succeeded in {}ms",
attempt,
MAX_RELOAD_ATTEMPTS,
attempt_start.elapsed().as_millis()
);
return Ok(());
}
Ok(Err(err)) => {
logging!(
warn,
Type::Core,
"reload_config attempt {}/{} failed after {}ms: {}",
attempt,
MAX_RELOAD_ATTEMPTS,
attempt_start.elapsed().as_millis(),
err
);
if attempt == MAX_RELOAD_ATTEMPTS {
return Err(anyhow!(
"Failed to reload config after {} attempts: {}",
attempt,
err
));
}
}
Err(_) => {
logging!(
warn,
Type::Core,
"reload_config attempt {}/{} timed out after {:?}",
attempt,
MAX_RELOAD_ATTEMPTS,
RELOAD_TIMEOUT
);
if attempt == MAX_RELOAD_ATTEMPTS {
return Err(anyhow!(
"Config reload timed out after {:?} ({} attempts)",
RELOAD_TIMEOUT,
MAX_RELOAD_ATTEMPTS
));
}
}
}
}
Err(anyhow!(
"Config reload retry loop exited unexpectedly ({} attempts)",
MAX_RELOAD_ATTEMPTS
))
}
async fn reload_config_once(&self, path: &str) -> Result<(), MihomoError> {
logging!(
info,
Type::Core,
"[ConfigUpdate] reload_config_once begin path={} ",
path
);
let start = Instant::now();
let result = handle::Handle::mihomo()
async fn reload_config(&self, path: &str) -> Result<(), MihomoError> {
handle::Handle::mihomo()
.await
.reload_config(true, path)
.await;
let elapsed = start.elapsed().as_millis();
match result {
Ok(()) => {
logging!(
info,
Type::Core,
"[ConfigUpdate] reload_config_once succeeded (elapsed={}ms)",
elapsed
);
Ok(())
}
Err(err) => {
logging!(
warn,
Type::Core,
"[ConfigUpdate] reload_config_once failed (elapsed={}ms, err={})",
elapsed,
err
);
Err(err)
}
}
}
fn should_restart_for_anyhow(err: &anyhow::Error) -> bool {
if let Some(mihomo_err) = err.downcast_ref::<MihomoError>() {
return Self::should_restart_on_error(mihomo_err);
}
let msg = err.to_string();
msg.contains("timed out")
|| msg.contains("reload")
|| msg.contains("Failed to apply config")
.await
}
fn should_restart_on_error(err: &MihomoError) -> bool {

View File

@@ -1,71 +1,38 @@
use crate::{constants::retry, logging, utils::logging::Type};
use once_cell::sync::Lazy;
use crate::{
constants::{retry, timing},
logging,
utils::logging::Type,
};
use parking_lot::RwLock;
use smartstring::alias::String;
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
mpsc,
},
thread,
time::Instant,
};
use tauri::Emitter;
use tauri::async_runtime;
use tauri::{Emitter, WebviewWindow};
#[allow(dead_code)] // Temporarily suppress warnings while diagnostics disable certain events
#[derive(Debug, Clone)]
pub enum FrontendEvent {
RefreshClash,
RefreshVerge,
RefreshProxy,
ProxiesUpdated {
payload: serde_json::Value,
},
NoticeMessage {
status: String,
message: String,
},
ProfileChanged {
current_profile_id: String,
},
ProfileSwitchFinished {
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
},
TimerUpdated {
profile_index: String,
},
ProfileUpdateStarted {
uid: String,
},
ProfileUpdateCompleted {
uid: String,
},
RustPanic {
message: String,
location: String,
},
NoticeMessage { status: String, message: String },
ProfileChanged { current_profile_id: String },
TimerUpdated { profile_index: String },
ProfileUpdateStarted { uid: String },
ProfileUpdateCompleted { uid: String },
}
static EMIT_SERIALIZER: Lazy<tokio::sync::Mutex<()>> = Lazy::new(|| tokio::sync::Mutex::new(()));
#[derive(Debug, Default)]
struct EventStats {
total_sent: AtomicU64,
total_errors: AtomicU64,
last_error_time: RwLock<Option<Instant>>,
}
#[derive(Debug, Default)]
#[allow(dead_code)]
struct BufferedProxies {
pending: parking_lot::Mutex<Option<serde_json::Value>>,
in_flight: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct ErrorMessage {
pub status: String,
@@ -80,7 +47,6 @@ pub struct NotificationSystem {
pub(super) is_running: bool,
stats: EventStats,
emergency_mode: RwLock<bool>,
proxies_buffer: Arc<BufferedProxies>,
}
impl Default for NotificationSystem {
@@ -97,7 +63,6 @@ impl NotificationSystem {
is_running: false,
stats: EventStats::default(),
emergency_mode: RwLock::new(false),
proxies_buffer: Arc::new(BufferedProxies::default()),
}
}
@@ -152,78 +117,13 @@ impl NotificationSystem {
return;
};
let event_label = Self::describe_event(&event);
match event {
FrontendEvent::ProxiesUpdated { payload } => {
logging!(
debug,
Type::Frontend,
"Queueing proxies-updated event for buffered emit: {}",
event_label
);
system.enqueue_proxies_updated(payload);
}
other => {
logging!(
debug,
Type::Frontend,
"Queueing event for async emit: {}",
event_label
);
let (event_name, payload_result) = system.serialize_event(other);
let payload = match payload_result {
Ok(value) => value,
Err(err) => {
logging!(
warn,
Type::Frontend,
"Failed to serialize event {}: {}",
event_name,
err
);
return;
}
};
logging!(
debug,
Type::Frontend,
"Dispatching async emit: {}",
event_name
);
let _ = Self::emit_via_app(event_name, payload);
}
}
}
fn enqueue_proxies_updated(&self, payload: serde_json::Value) {
let replaced = {
let mut slot = self.proxies_buffer.pending.lock();
let had_pending = slot.is_some();
*slot = Some(payload);
had_pending
};
if replaced {
logging!(
debug,
Type::Frontend,
"Replaced pending proxies-updated payload with latest snapshot"
);
if system.should_skip_event(&event) {
return;
}
if self
.proxies_buffer
.in_flight
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let buffer = Arc::clone(&self.proxies_buffer);
async_runtime::spawn(async move {
Self::flush_proxies(buffer).await;
});
if let Some(window) = super::handle::Handle::get_window() {
system.emit_to_window(&window, event);
thread::sleep(timing::EVENT_EMIT_DELAY);
}
}
@@ -235,95 +135,25 @@ impl NotificationSystem {
)
}
fn emit_via_app(event_name: &'static str, payload: serde_json::Value) -> Result<(), String> {
let app_handle = super::handle::Handle::app_handle().clone();
let event_name = event_name.to_string();
async_runtime::spawn(async move {
if let Err(err) = app_handle.emit_to("main", event_name.as_str(), payload) {
logging!(
warn,
Type::Frontend,
"emit_to failed for {}: {}",
event_name,
err
);
fn emit_to_window(&self, window: &WebviewWindow, event: FrontendEvent) {
let (event_name, payload) = self.serialize_event(event);
let Ok(payload) = payload else {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
return;
};
match window.emit(event_name, payload) {
Ok(_) => {
self.stats.total_sent.fetch_add(1, Ordering::Relaxed);
}
});
Ok(())
}
async fn flush_proxies(buffer: Arc<BufferedProxies>) {
const EVENT_NAME: &str = "proxies-updated";
loop {
let payload_opt = {
let mut guard = buffer.pending.lock();
guard.take()
};
let Some(payload) = payload_opt else {
buffer.in_flight.store(false, Ordering::Release);
if buffer.pending.lock().is_some()
&& buffer
.in_flight
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
continue;
}
break;
};
logging!(debug, Type::Frontend, "Dispatching buffered proxies emit");
let _guard = EMIT_SERIALIZER.lock().await;
if let Err(err) = Self::emit_via_app(EVENT_NAME, payload) {
logging!(
warn,
Type::Frontend,
"Buffered proxies emit failed: {}",
err
);
Err(e) => {
logging!(warn, Type::Frontend, "Event emit failed: {}", e);
self.handle_emit_error();
}
}
}
fn describe_event(event: &FrontendEvent) -> String {
match event {
FrontendEvent::RefreshClash => "RefreshClash".into(),
FrontendEvent::RefreshVerge => "RefreshVerge".into(),
FrontendEvent::RefreshProxy => "RefreshProxy".into(),
FrontendEvent::ProxiesUpdated { .. } => "ProxiesUpdated".into(),
FrontendEvent::NoticeMessage { status, .. } => {
format!("NoticeMessage({})", status).into()
}
FrontendEvent::ProfileChanged { current_profile_id } => {
format!("ProfileChanged({})", current_profile_id).into()
}
FrontendEvent::ProfileSwitchFinished {
profile_id,
task_id,
..
} => format!(
"ProfileSwitchFinished(profile={}, task={})",
profile_id, task_id
)
.into(),
FrontendEvent::TimerUpdated { profile_index } => {
format!("TimerUpdated({})", profile_index).into()
}
FrontendEvent::ProfileUpdateStarted { uid } => {
format!("ProfileUpdateStarted({})", uid).into()
}
FrontendEvent::ProfileUpdateCompleted { uid } => {
format!("ProfileUpdateCompleted({})", uid).into()
}
FrontendEvent::RustPanic { message, .. } => format!("RustPanic({})", message).into(),
}
}
#[allow(dead_code)]
fn serialize_event(
&self,
event: FrontendEvent,
@@ -337,25 +167,9 @@ impl NotificationSystem {
"verge://notice-message",
serde_json::to_value((status, message)),
),
FrontendEvent::RefreshProxy => ("verge://refresh-proxy-config", Ok(json!("yes"))),
FrontendEvent::ProxiesUpdated { payload } => ("proxies-updated", Ok(payload)),
FrontendEvent::ProfileChanged { current_profile_id } => {
("profile-changed", Ok(json!(current_profile_id)))
}
FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
task_id,
} => (
"profile-switch-finished",
Ok(json!({
"profileId": profile_id,
"success": success,
"notify": notify,
"taskId": task_id
})),
),
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(json!(profile_index)))
}
@@ -365,10 +179,6 @@ impl NotificationSystem {
FrontendEvent::ProfileUpdateCompleted { uid } => {
("profile-update-completed", Ok(json!({ "uid": uid })))
}
FrontendEvent::RustPanic { message, location } => (
"rust-panic",
Ok(json!({ "message": message, "location": location })),
),
}
}
@@ -394,19 +204,10 @@ impl NotificationSystem {
}
if let Some(sender) = &self.sender {
if sender.send(event).is_err() {
logging!(
warn,
Type::Frontend,
"Failed to send event to worker thread"
);
self.handle_emit_error();
return false;
}
return true;
sender.send(event).is_ok()
} else {
false
}
false
}
pub fn shutdown(&mut self) {