refactor: profile switch (#5197)

* refactor: proxy refresh

* fix(proxy-store): properly hydrate and filter backend provider snapshots

* fix(proxy-store): add monotonic fetch guard and event bridge cleanup

* fix(proxy-store): tweak fetch sequencing guard to prevent snapshot invalidation from wiping fast responses

* docs: UPDATELOG.md

* fix(proxy-snapshot, proxy-groups): restore last-selected proxy and group info

* fix(proxy): merge static and provider entries in snapshot; fix Virtuoso viewport height

* fix(proxy-groups): restrict reduced-height viewport to chain-mode column

* refactor(profiles): introduce a state machine

* refactor: replace state machine with reducer

* refactor: introduce a profile switch worker

* refactor: hooked up a backend-driven profile switch flow

* refactor(profile-switch): serialize switches with async queue and enrich frontend events

* feat(profiles): centralize profile switching with reducer/driver queue to fix stuck UI on rapid toggles

* chore: translate comments and log messages to English to avoid encoding issues

* refactor: migrate backend queue to SwitchDriver actor

* fix(profile): unify error string types in validation helper

* refactor(profile): make switch driver fully async and handle panics safely

* refactor(cmd): move switch-validation helper into new profile_switch module

* refactor(profile): modularize switch logic into profile_switch.rs

* refactor(profile_switch): modularize switch handler

- Break monolithic switch handler into proper module hierarchy
- Move shared globals, constants, and SwitchScope guard to state.rs
- Isolate queue orchestration and async task spawning in driver.rs
- Consolidate switch pipeline and config patching in workflow.rs
- Extract request pre-checks/YAML validation into validation.rs

* refactor(profile_switch): centralize state management and add cancellation flow

- Introduced SwitchManager in state.rs to unify mutex, sequencing, and SwitchScope handling.
- Added SwitchCancellation and SwitchRequest wrappers to encapsulate cancel tokens and notifications.
- Updated driver to allocate task IDs via SwitchManager, cancel old tokens, and queue next jobs in order.
- Updated workflow to check cancellation and sequence at each phase, replacing global flags with manager APIs.
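
A minimal sketch of what these wrappers might look like, assuming tokio_util's CancellationToken as the underlying cancel primitive (the real `SwitchCancellation`/`SwitchRequest` live in state.rs and carry extra notification plumbing):

```rust
use smartstring::alias::String as SmartString;
use tokio_util::sync::CancellationToken;

// Illustrative cancellation wrapper; the project's own type also exposes
// notification hooks that are omitted here.
#[derive(Clone, Debug, Default)]
pub struct SwitchCancellation {
    token: CancellationToken,
}

impl SwitchCancellation {
    pub fn cancel(&self) {
        self.token.cancel();
    }

    pub fn is_cancelled(&self) -> bool {
        self.token.is_cancelled()
    }
}

// Illustrative request wrapper: one queued switch, identified by a task id
// allocated by SwitchManager, plus the target profile and a notify flag.
#[derive(Clone, Debug)]
pub struct SwitchRequest {
    task_id: u64,
    profile_id: SmartString,
    notify_success: bool,
    cancel: SwitchCancellation,
}

impl SwitchRequest {
    pub fn new(task_id: u64, profile_id: impl Into<SmartString>, notify_success: bool) -> Self {
        Self {
            task_id,
            profile_id: profile_id.into(),
            notify_success,
            cancel: SwitchCancellation::default(),
        }
    }

    pub fn task_id(&self) -> u64 {
        self.task_id
    }

    pub fn profile_id(&self) -> &SmartString {
        &self.profile_id
    }

    pub fn notify(&self) -> bool {
        self.notify_success
    }

    pub fn cancel_token(&self) -> &SwitchCancellation {
        &self.cancel
    }
}
```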

* feat(profile_switch): integrate explicit state machine for profile switching

- workflow.rs:24 now delegates each switch to SwitchStateMachine, passing an owned SwitchRequest.
  Queue cancellation and state-sequence checks are centralized inside the machine instead of scattered guards.
- workflow.rs:176 replaces the old helper with `SwitchStateMachine::new(manager(), None, profiles).run().await`,
  ensuring manual profile patches follow the same workflow (locking, validation, rollback) as queued switches.
- workflow.rs:180 & 275 expose `validate_profile_yaml` and `restore_previous_profile` for reuse inside the state machine.

- workflow/state_machine.rs:1 introduces a dedicated state machine module.
  It manages global mutex acquisition, request/cancellation state, YAML validation, draft patching,
  `CoreManager::update_config`, failure rollback, and tray/notification side-effects.
  Transitions check for cancellations and stale sequences; completions release guards via `SwitchScope` drop.

* refactor(profile-switch): integrate stage-aware panic handling

- src-tauri/src/cmd/profile_switch/workflow/state_machine.rs:1
  Defines SwitchStage and SwitchPanicInfo as crate-visible, wraps each transition in with_stage(...) with catch_unwind, and propagates CmdResult<bool> to distinguish validation failures from panics while keeping cancellation semantics.

- src-tauri/src/cmd/profile_switch/workflow.rs:25
  Updates run_switch_job to return Result<bool, SwitchPanicInfo>, routing timeout, validation, config, and stage panic cases separately. Reuses SwitchPanicInfo for logging/UI notifications; patch_profiles_config maps state-machine panics into user-facing error strings.

- src-tauri/src/cmd/profile_switch/driver.rs:1
  Adds SwitchJobOutcome to unify workflow results: normal completions carry bool, and panics propagate SwitchPanicInfo. The driver loop now logs panics explicitly and uses AssertUnwindSafe(...).catch_unwind() to guard setup-phase panics.

* refactor(profile-switch): add watchdog, heartbeat, and async timeout guards

- Introduce SwitchHeartbeat for stage tracking and timing; log stage transitions with elapsed durations.
- Add watchdog in driver to cancel stalled switches (5s heartbeat timeout).
- Wrap blocking ops (Config::apply, tray updates, profiles_save_file_safe, etc.) with time::timeout to prevent async stalls.
- Improve logs for stage transitions and watchdog timeouts to clarify cancellation points.
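
One plausible shape for the heartbeat (a sketch only; the real `SwitchHeartbeat` may track stages differently). The driver's watchdog ticks every 500 ms and cancels the switch once `elapsed()` exceeds the 5 s limit:

```rust
use std::{
    sync::{
        Arc,
        atomic::{AtomicU8, AtomicU64, Ordering},
    },
    time::{Duration, Instant},
};

#[derive(Clone)]
pub struct SwitchHeartbeat {
    stage: Arc<AtomicU8>,
    entered_ms: Arc<AtomicU64>,
    origin: Instant,
}

impl SwitchHeartbeat {
    pub fn new() -> Self {
        Self {
            stage: Arc::new(AtomicU8::new(0)),
            entered_ms: Arc::new(AtomicU64::new(0)),
            origin: Instant::now(),
        }
    }

    // Record entering a new stage; the stage code is an opaque u8 here.
    pub fn beat(&self, stage_code: u8) {
        self.stage.store(stage_code, Ordering::SeqCst);
        self.entered_ms
            .store(self.origin.elapsed().as_millis() as u64, Ordering::SeqCst);
    }

    pub fn stage_code(&self) -> u8 {
        self.stage.load(Ordering::SeqCst)
    }

    // Time spent in the current stage; the watchdog cancels when this exceeds 5s.
    pub fn elapsed(&self) -> Duration {
        let entered = Duration::from_millis(self.entered_ms.load(Ordering::SeqCst));
        self.origin.elapsed().saturating_sub(entered)
    }
}
```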

* refactor(profile-switch): async post-switch tasks, early lock release, and spawn_blocking for IO

* feat(profile-switch): track cleanup and coordinate pipeline

- Add explicit cleanup tracking in the driver (`cleanup_profiles` map + `CleanupDone` messages) to know when background post-switch work is still running before starting a new workflow. (driver.rs:29-50)
- Update `handle_enqueue` to detect “cleanup in progress”: same-profile retries are short-circuited; other requests collapse the pending queue, cancelling old tokens so only the latest intent survives. (driver.rs:176-247)
- Rework scheduling helpers: `start_next_job` refuses to start while cleanup is outstanding; discarded requests release cancellation tokens; cleanup completion explicitly restarts the pipeline. (driver.rs:258-442)

* feat(profile-switch): unify post-switch cleanup handling

- workflow.rs (25-427) returns `SwitchWorkflowResult` (success + CleanupHandle) or `SwitchWorkflowError`.
  All failure/timeout paths stash post-switch work into a single CleanupHandle.
  Cleanup helpers (`notify_profile_switch_finished` and `close_connections_after_switch`) run inside that task for proper lifetime handling.

- driver.rs (29-439) propagates CleanupHandle through `SwitchJobOutcome`, spawns a bridge to wait for completion, and blocks `start_next_job` until done.
  Direct driver-side panics now schedule failure cleanup via the shared helper.
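
A plausible reading of `CleanupHandle`, assuming it is simply the JoinHandle of a spawned post-switch task that the driver awaits before scheduling the next job (helper names below are placeholders, not the real API):

```rust
use tokio::task::JoinHandle;

// Assumed alias: one handle per batch of post-switch work.
pub type CleanupHandle = JoinHandle<()>;

// Stash all post-switch work (frontend notification, connection draining)
// behind a single handle so success, failure, and timeout paths all return
// the same shape to the driver.
pub fn schedule_post_switch_cleanup(profile: String, notify: bool, task_id: u64) -> CleanupHandle {
    tokio::spawn(async move {
        // Placeholders for notify_profile_switch_finished /
        // close_connections_after_switch in the real workflow helpers.
        let _ = (profile, notify, task_id);
    })
}
```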

* tmp

* Revert "tmp"

This reverts commit e582cf4a65.

* refactor: queue frontend events through async dispatcher

* refactor: queue frontend switch/proxy events and throttle notices

* chore: frontend debug log

* fix: re-enable only ProfileSwitchFinished events - keep others suppressed for crash isolation

- Re-enabled only ProfileSwitchFinished events; RefreshClash, RefreshProxy, and ProfileChanged remain suppressed (they log suppression messages)
- Allows frontend to receive task completion notifications for UI feedback while crash isolation continues
- src-tauri/src/core/handle.rs now only suppresses notify_profile_changed
- Serialized emitter, frontend logging bridge, and other diagnostics unchanged

* refactor: refreshClashData

* refactor(proxy): stabilize proxy switch pipeline and rendering

- Add coalescing buffer in notification.rs to emit only the latest proxies-updated snapshot
- Replace nextTick with queueMicrotask in asyncQueue.ts for same-frame hydration
- Hide auto-generated GLOBAL snapshot and preserve optional metadata in proxy-snapshot.ts
- Introduce stable proxy rendering state in AppDataProvider (proxyTargetProfileId, proxyDisplayProfileId, isProxyRefreshPending)
- Update proxy page to fade content during refresh and overlay status banner instead of showing incomplete snapshot

* refactor(profiles): move manual activating logic to reducer for deterministic queue tracking

* refactor: replace proxy-data event bridge with pure polling and simplify proxy store

- Replaced the proxy-data event bridge with pure polling: AppDataProvider now fetches the initial snapshot and drives refreshes from the polled switchStatus, removing verge://refresh-* listeners (src/providers/app-data-provider.tsx).
- Simplified proxy-store by dropping the proxies-updated listener queue and unused payload/normalizer helpers; relies on SWR/provider fetch path + calcuProxies for live updates (src/stores/proxy-store.ts).
- Trimmed layout-level event wiring to keep only notice/show/hide subscriptions, removing obsolete refresh listeners (src/pages/_layout/useLayoutEvents.ts).

* refactor(proxy): streamline proxies-updated handling and store event flow

- AppDataProvider now treats `proxies-updated` as the fast path: the listener
  calls `applyLiveProxyPayload` immediately and schedules only a single fallback
  `fetchLiveProxies` ~600 ms later (replacing the old 0/250/1000/2000 cascade).
  Expensive provider/rule refreshes run in parallel via `Promise.allSettled`, and
  the multi-stage queue on profile updates completion was removed
  (src/providers/app-data-provider.tsx).

- Rebuilt proxy-store to support the event flow: restored `setLive`, provider
  normalization, and an animation-frame + async queue that applies payloads without
  blocking. Exposed `applyLiveProxyPayload` so providers can push events directly
  into the store (src/stores/proxy-store.ts).

* refactor: switch delay

* refactor(app-data-provider): trigger getProfileSwitchStatus revalidation on profile-switch-finished

- AppDataProvider now listens to `profile-switch-finished` and calls `mutate("getProfileSwitchStatus")` to immediately update state and unlock buttons (src/providers/app-data-provider.tsx).
- Retain existing detailed timing logs for monitoring other stages.
- Frontend success notifications remain instant; background refreshes continue asynchronously.

* fix(profiles): prevent duplicate toast on page remount

* refactor(profile-switch): make active switches preemptible and prevent queue piling

- Add notify mechanism to SwitchCancellation to await cancellation without busy-waiting (state.rs:82)
- Collapse pending queue to a single entry in the driver; cancel in-flight task on newer request (driver.rs:232)
- Update handle_update_core to watch cancel token and 30s timeout; release locks, discard draft, and exit early if canceled (state_machine.rs:301)
- Providers revalidate status immediately on profile-switch-finished events (app-data-provider.tsx:208)
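
Roughly how the cancellable core-update phase can be expressed. This is a sketch: tokio_util's CancellationToken stands in for the project's notify-based `SwitchCancellation`, and `update_core` is a placeholder for `CoreManager::update_config`:

```rust
use std::time::Duration;

use tokio::time;
use tokio_util::sync::CancellationToken;

// Race the core update against cancellation and a 30s ceiling without busy-waiting.
pub async fn run_core_update_phase(cancel: CancellationToken) -> Result<bool, String> {
    tokio::select! {
        _ = cancel.cancelled() => {
            // The real code releases locks and discards the draft before exiting early.
            Ok(false)
        }
        result = time::timeout(Duration::from_secs(30), update_core()) => match result {
            Ok(Ok(())) => Ok(true),
            Ok(Err(e)) => Err(e),
            Err(_) => Err("core update timed out after 30s".into()),
        },
    }
}

async fn update_core() -> Result<(), String> {
    // Placeholder for the actual CoreManager::update_config() call.
    Ok(())
}
```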

* refactor(core): make core reload phase controllable, reduce 0xcfffffff risk

- CoreManager::apply_config now calls `reload_config_with_retry`, each attempt waits up to 5s, retries 3 times; on failure, returns error with duration logged and triggers core restart if needed (src-tauri/src/core/manager/config.rs:175, 205)
- `reload_config_with_retry` logs attempt info on timeout or error; if error is a Mihomo connection issue, fallback to original restart logic (src-tauri/src/core/manager/config.rs:211)
- `reload_config_once` retains original Mihomo call for retry wrapper usage (src-tauri/src/core/manager/config.rs:247)
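
The retry wrapper likely reduces to something like the following sketch (constants mirror the commit text; `reload_config_once` is a stand-in for the real Mihomo reload call, and the real code additionally falls back to a core restart on connection errors):

```rust
use std::time::Duration;
use tokio::time;

pub async fn reload_config_with_retry() -> Result<(), String> {
    const ATTEMPTS: u32 = 3;
    const PER_ATTEMPT: Duration = Duration::from_secs(5);
    let mut last_err = String::new();
    for attempt in 1..=ATTEMPTS {
        match time::timeout(PER_ATTEMPT, reload_config_once()).await {
            Ok(Ok(())) => return Ok(()),
            Ok(Err(e)) => last_err = format!("attempt {attempt} failed: {e}"),
            Err(_) => last_err = format!("attempt {attempt} timed out after {:?}", PER_ATTEMPT),
        }
        log::warn!(target: "app", "{last_err}");
    }
    Err(last_err)
}

async fn reload_config_once() -> Result<(), String> {
    // Placeholder for the actual Mihomo reload request.
    Ok(())
}
```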

* chore(frontend-logs): downgrade routine event logs from info to debug

- Logs like `emit_via_app entering spawn_blocking`, `Async emit…`, `Buffered proxies…` are now debug-level (src-tauri/src/core/notification.rs:155, :265, :309…)
- Genuine warnings/errors (failures/timeouts) remain at warn/error
- Core stage logs remain info to keep backend tracking visible

* refactor(frontend-emit): make emit_via_app fire-and-forget async

- `emit_via_app` now a regular function; spawns with `tokio::spawn` and logs a warn if `emit_to` fails, caller returns immediately (src-tauri/src/core/notification.rs:269)
- Removed `.await` at Async emit and flush_proxies calls; only record dispatch duration and warn on failure (src-tauri/src/core/notification.rs:211, :329)
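
In essence the emit path becomes fire-and-forget, along these lines (`emit_to_frontend` is a placeholder for the real Tauri emit call):

```rust
// Spawn the emit and return immediately; failures are only logged.
pub fn emit_via_app(event: &'static str, payload: String) {
    tokio::spawn(async move {
        if let Err(err) = emit_to_frontend(event, payload).await {
            log::warn!(target: "app", "frontend emit '{event}' failed: {err}");
        }
    });
}

async fn emit_to_frontend(_event: &str, _payload: String) -> Result<(), String> {
    // Placeholder for app_handle.emit_to(...) in the real code.
    Ok(())
}
```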

* refactor(ui): restructure profile switch for event-driven speed + polling stability

- Backend
  - SwitchManager maintains a lightweight event queue: added `event_sequence`, `recent_events`, and `SwitchResultEvent`; provides `push_event` / `events_after` (state.rs)
  - `handle_completion` pushes events on success/failure and keeps `last_result` (driver.rs) for frontend incremental fetch
  - New Tauri command `get_profile_switch_events(after_sequence)` exposes `events_after` (profile_switch/mod.rs → profile.rs → lib.rs)
- Notification system
  - `NotificationSystem::process_event` only logs debug, disables WebView `emit_to`, fixes 0xcfffffff
  - Related emit/buffer functions now safe no-op, removed unused structures and warnings (notification.rs)
- Frontend
  - services/cmds.ts defines `SwitchResultEvent` and `getProfileSwitchEvents`
  - `AppDataProvider` holds `switchEventSeqRef`, polls incremental events every 0.25s (busy) / 1s (idle); each event triggers:
      - immediate `globalMutate("getProfiles")` to refresh current profile
      - background refresh of proxies/providers/rules via `Promise.allSettled` (failures logged, non-blocking)
      - forced `mutateSwitchStatus` to correct state
  - original switchStatus effect calls `handleSwitchResult` as fallback; other toast/activation logic handled in profiles.tsx
- Commands / API cleanup
  - removed `pub use profile_switch::*;` in cmd::mod.rs to avoid conflicts; frontend uses new command polling
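
A minimal sketch of the backend event queue described above; the struct below is hypothetical (the real state lives inside SwitchManager) but shows the `push_event` / `events_after` contract the frontend polls against:

```rust
use std::collections::VecDeque;

use parking_lot::RwLock;

#[derive(Clone, Debug)]
pub struct SwitchResultEvent {
    pub sequence: u64,
    pub task_id: u64,
    pub profile_id: String,
    pub success: bool,
}

#[derive(Default)]
pub struct SwitchEventLog {
    event_sequence: RwLock<u64>,
    recent_events: RwLock<VecDeque<SwitchResultEvent>>,
}

impl SwitchEventLog {
    const CAPACITY: usize = 64;

    pub fn push_event(&self, mut event: SwitchResultEvent) {
        let mut seq = self.event_sequence.write();
        *seq += 1;
        event.sequence = *seq;
        let mut events = self.recent_events.write();
        if events.len() == Self::CAPACITY {
            events.pop_front();
        }
        events.push_back(event);
    }

    // Return only events newer than the caller's last-seen sequence number,
    // which is what get_profile_switch_events(after_sequence) exposes.
    pub fn events_after(&self, after_sequence: u64) -> Vec<SwitchResultEvent> {
        self.recent_events
            .read()
            .iter()
            .filter(|event| event.sequence > after_sequence)
            .cloned()
            .collect()
    }
}
```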

* refactor(frontend): optimize profile switch with optimistic updates

* refactor(profile-switch): switch to event-driven flow with Profile Store

- SwitchManager pushes events; frontend polls get_profile_switch_events
- Zustand store handles optimistic profiles; AppDataProvider applies updates and background-fetches
- UI flicker removed

* fix(app-data): re-hook profile store updates during switch hydration

* fix(notification): restore frontend event dispatch and non-blocking emits

* fix(app-data-provider): restore proxy refresh and seed snapshot after refactor

* fix: ensure switch completion events are received and handle proxies-updated

* fix(app-data-provider): dedupe switch results by taskId and fix stale profile state

* fix(profile-switch): ensure patch_profiles_config_by_profile_index waits for real completion and handle join failures in apply_config_with_timeout

* docs: UPDATELOG.md

* chore: add necessary comments

* fix(core): always dispatch async proxy snapshot after RefreshClash event

* fix(proxy-store, provider): handle pending snapshots and proxy profiles

- Added pending snapshot tracking in proxy-store so `lastAppliedFetchId` no longer jumps on seed. Profile adoption is deferred until a qualifying fetch completes. Exposed `clearPendingProfile` for rollback support.
- Cleared pending snapshot state whenever live payloads apply or the store resets, preventing stale optimistic profile IDs after failures.
- In provider integration, subscribed to the pending proxy profile and fed it into target-profile derivation. Cleared it on failed switch results so hydration can advance and UI status remains accurate.

* fix(proxy): re-hook tray refresh events into proxy refresh queue

- Reattached listen("verge://refresh-proxy-config", …) at src/providers/app-data-provider.tsx:402 and registered it for cleanup.
- Added matching window fallback handler at src/providers/app-data-provider.tsx:430 so in-app dispatches share the same refresh path.

* fix(proxy-snapshot/proxy-groups): address review findings on snapshot placeholders

- src/utils/proxy-snapshot.ts:72-95 now derives snapshot group members solely from proxy-groups.proxies, so provider ids under `use` no longer generate placeholder proxy items.
- src/components/proxy/proxy-groups.tsx:665-677 lets the hydration overlay capture pointer events (and shows a wait cursor) so users can’t interact with snapshot-only placeholders before live data is ready.

* fix(profile-switch): preserve queued requests and avoid stale connection teardown

- Keep earlier queued switches intact by dropping the blanket “collapse” call: after removing duplicates for the same profile, new requests are simply appended, leaving other profiles pending (driver.rs:376). Resolves queue-loss scenario.
- Gate connection cleanup on real successes so cancelled/stale runs no longer tear down Mihomo connections; success handler now skips close_connections_after_switch when success == false (workflow.rs:419).

* fix(profile-switch, layout): improve profile validation and restore backend refresh

- Hardened profile validation using `tokio::fs` with a 5s timeout and offloading YAML parsing to `AsyncHandler::spawn_blocking`, preventing slow disks or malformed files from freezing the runtime (src-tauri/src/cmd/profile_switch/validation.rs:9, 71).
- Restored backend-triggered refresh handling by listening for `verge://refresh-clash-config` / `verge://refresh-verge-config` and invoking shared refresh services so SWR caches stay in sync with core events (src/pages/_layout/useLayoutEvents.ts:6, 45, 55).

* feat(profile-switch): handle cancellations for superseded requests

- Added a `cancelled` flag and constructor so superseded requests publish an explicit cancellation instead of a failure (src-tauri/src/cmd/profile_switch/state.rs:249, src-tauri/src/cmd/profile_switch/driver.rs:482)
- Updated the profile switch effect to log cancellations as info, retain the shared `mutate` call, and skip emitting error toasts while still refreshing follow-up work (src/pages/profiles.tsx:554, src/pages/profiles.tsx:581)
- Exposed the new flag on the TypeScript contract to keep downstream consumers type-safe (src/services/cmds.ts:20)
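
A guess at how the result record might carry the new flag (field names are assumptions; the real `SwitchResultStatus` includes more metadata, and serde's camelCase renaming is what keeps the TypeScript contract in sync):

```rust
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SwitchResultStatus {
    pub task_id: u64,
    pub profile_id: String,
    pub success: bool,
    pub cancelled: bool,
    pub reason: Option<String>,
}

impl SwitchResultStatus {
    // Constructor for superseded requests: not a failure, just an explicit cancellation.
    pub fn cancelled(task_id: u64, profile_id: &str, reason: Option<String>) -> Self {
        Self {
            task_id,
            profile_id: profile_id.to_string(),
            success: false,
            cancelled: true,
            reason,
        }
    }
}
```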

* fix(profiles): wrap logging payload for Tauri frontend_log

* fix(profile-switch): add rollback and error propagation for failed persistence

- Added rollback on apply failure so Mihomo restores to the previous profile
  before exiting the success path early (state_machine.rs:474).
- Reworked persist_profiles_with_timeout to surface timeout/join/save errors,
  convert them into CmdResult failures, and trigger rollback + error propagation
  when persistence fails (state_machine.rs:703).
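
The persistence step presumably distinguishes the three failure classes roughly as follows (a sketch only; the real helper wraps `profiles_save_file_safe` and feeds errors into the rollback path):

```rust
use std::time::Duration;
use tokio::{task, time};

pub async fn persist_profiles_with_timeout() -> Result<(), String> {
    let save = task::spawn_blocking(|| -> Result<(), String> {
        // Placeholder for the real profiles_save_file_safe() call.
        Ok(())
    });
    match time::timeout(Duration::from_secs(5), save).await {
        Ok(Ok(Ok(()))) => Ok(()),
        // Save error: the write itself failed.
        Ok(Ok(Err(save_err))) => Err(format!("save failed: {save_err}")),
        // Join error: the blocking task panicked.
        Ok(Err(join_err)) => Err(format!("save task panicked: {join_err}")),
        // Timeout: the blocking task did not finish in time.
        Err(_) => Err("save timed out after 5s".into()),
    }
}
```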

* fix(profile-switch): prevent mid-finalize reentrancy and lingering tasks

* fix(profile-switch): preserve pending queue and surface discarded switches

* fix(profile-switch): avoid draining Mihomo sockets on failed/cancelled switches

* fix(app-data-provider): restore backend-driven refresh and reattach fallbacks

* fix(profile-switch): queue concurrent updates and add bounded wait/backoff

* fix(proxy): trigger live refresh on app start for proxy snapshot

* refactor(profile-switch): split flow into layers and centralize async cleanup

- Introduced `SwitchDriver` to encapsulate queue and driver logic while keeping the public Tauri command API.
- Added workflow/cleanup helpers for notification dispatch and Mihomo connection draining, re-exported for API consistency.
- Replaced monolithic state machine with `core.rs`, `context.rs`, and `stages.rs`, plus a thin `mod.rs` re-export layer; stage methods are now individually testable.
- Removed legacy `workflow/state_machine.rs` and adjusted visibility on re-exported types/constants to ensure compilation.
Sline authored on 2025-10-30 17:29:15 +08:00 · committed by GitHub
parent af79bcd1cf · commit c2dcd86722
36 changed files with 5912 additions and 1275 deletions


@@ -0,0 +1,48 @@
use super::CmdResult;
use crate::{logging, utils::logging::Type};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FrontendLogPayload {
pub level: Option<String>,
pub message: String,
pub context: Option<serde_json::Value>,
}
#[tauri::command]
pub fn frontend_log(payload: FrontendLogPayload) -> CmdResult<()> {
let level = payload.level.as_deref().unwrap_or("info");
match level {
"trace" | "debug" => logging!(
debug,
Type::Frontend,
"[frontend] {}",
payload.message.as_str()
),
"warn" => logging!(
warn,
Type::Frontend,
"[frontend] {}",
payload.message.as_str()
),
"error" => logging!(
error,
Type::Frontend,
"[frontend] {}",
payload.message.as_str()
),
_ => logging!(
info,
Type::Frontend,
"[frontend] {}",
payload.message.as_str()
),
}
if let Some(context) = payload.context {
logging!(info, Type::Frontend, "[frontend] context: {}", context);
}
Ok(())
}


@@ -7,10 +7,12 @@ pub type CmdResult<T = ()> = Result<T, String>;
pub mod app;
pub mod backup;
pub mod clash;
pub mod frontend;
pub mod lightweight;
pub mod media_unlock_checker;
pub mod network;
pub mod profile;
mod profile_switch;
pub mod proxy;
pub mod runtime;
pub mod save_profile;
@@ -25,6 +27,7 @@ pub mod webdav;
pub use app::*;
pub use backup::*;
pub use clash::*;
pub use frontend::*;
pub use lightweight::*;
pub use media_unlock_checker::*;
pub use network::*;


@@ -1,5 +1,4 @@
use super::CmdResult;
use super::StringifyErr;
use super::{CmdResult, StringifyErr, profile_switch};
use crate::{
config::{
Config, IProfiles, PrfItem, PrfOption,
@@ -9,77 +8,191 @@ use crate::{
},
profiles_append_item_safe,
},
core::{CoreManager, handle, timer::Timer, tray::Tray},
feat, logging,
process::AsyncHandler,
ret_err,
core::{CoreManager, handle, timer::Timer},
feat, logging, ret_err,
utils::{dirs, help, logging::Type},
};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use smartstring::alias::String;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
// Global request sequence tracking, used to avoid queued execution
static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
use crate::cmd::profile_switch::{ProfileSwitchStatus, SwitchResultEvent};
static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
#[tauri::command]
pub async fn get_profiles() -> CmdResult<IProfiles> {
// Strategy 1: try to quickly fetch the latest data
let latest_result = tokio::time::timeout(Duration::from_millis(500), async {
let profiles = Config::profiles().await;
let latest = profiles.latest_ref();
IProfiles {
current: latest.current.clone(),
items: latest.items.clone(),
}
})
.await;
match latest_result {
Ok(profiles) => {
logging!(info, Type::Cmd, "Fast fetch of profile list succeeded");
return Ok(profiles);
}
Err(_) => {
logging!(warn, Type::Cmd, "Fast profile fetch timed out (500ms)");
}
}
// Strategy 2: if the fast fetch fails, try fetching data()
let data_result = tokio::time::timeout(Duration::from_secs(2), async {
let profiles = Config::profiles().await;
let data = profiles.latest_ref();
IProfiles {
current: data.current.clone(),
items: data.items.clone(),
}
})
.await;
match data_result {
Ok(profiles) => {
logging!(info, Type::Cmd, "Fetched draft profile list successfully");
return Ok(profiles);
}
Err(join_err) => {
logging!(
error,
Type::Cmd,
"Draft profile fetch task failed or timed out: {}",
join_err
);
}
}
// Strategy 3: as a fallback, try to recreate the configuration
logging!(warn, Type::Cmd, "All profile fetch strategies failed; attempting fallback");
Ok(IProfiles::new().await)
#[derive(Clone)]
struct CachedProfiles {
snapshot: IProfiles,
captured_at: Instant,
}
/// Enhance profiles
static PROFILES_CACHE: Lazy<RwLock<Option<CachedProfiles>>> = Lazy::new(|| RwLock::new(None));
#[derive(Default)]
struct SnapshotMetrics {
fast_hits: AtomicU64,
cache_hits: AtomicU64,
blocking_hits: AtomicU64,
refresh_scheduled: AtomicU64,
last_log_ms: AtomicU64,
}
static SNAPSHOT_METRICS: Lazy<SnapshotMetrics> = Lazy::new(SnapshotMetrics::default);
/// Store the latest snapshot so cache consumers can reuse it without hitting the lock again.
fn update_profiles_cache(snapshot: &IProfiles) {
*PROFILES_CACHE.write() = Some(CachedProfiles {
snapshot: snapshot.clone(),
captured_at: Instant::now(),
});
}
/// Return the cached snapshot and how old it is, if present.
fn cached_profiles_snapshot() -> Option<(IProfiles, u128)> {
PROFILES_CACHE.read().as_ref().map(|entry| {
(
entry.snapshot.clone(),
entry.captured_at.elapsed().as_millis(),
)
})
}
/// Return the latest profiles snapshot, preferring cached data so UI requests never block.
#[tauri::command]
pub async fn get_profiles() -> CmdResult<IProfiles> {
let started_at = Instant::now();
// Resolve snapshots in three tiers so UI reads never stall on a mutex:
// 1) try a non-blocking read, 2) fall back to the last cached copy while a
// writer holds the lock, 3) block and refresh the cache as a final resort.
if let Some(snapshot) = read_profiles_snapshot_nonblocking().await {
let item_count = snapshot
.items
.as_ref()
.map(|items| items.len())
.unwrap_or(0);
update_profiles_cache(&snapshot);
SNAPSHOT_METRICS.fast_hits.fetch_add(1, Ordering::Relaxed);
logging!(
debug,
Type::Cmd,
"[Profiles] Snapshot served (path=fast, items={}, elapsed={}ms)",
item_count,
started_at.elapsed().as_millis()
);
maybe_log_snapshot_metrics();
return Ok(snapshot);
}
if let Some((cached, age_ms)) = cached_profiles_snapshot() {
SNAPSHOT_METRICS.cache_hits.fetch_add(1, Ordering::Relaxed);
logging!(
debug,
Type::Cmd,
"[Profiles] Served cached snapshot while lock busy (age={}ms)",
age_ms
);
schedule_profiles_snapshot_refresh();
maybe_log_snapshot_metrics();
return Ok(cached);
}
let snapshot = read_profiles_snapshot_blocking().await;
let item_count = snapshot
.items
.as_ref()
.map(|items| items.len())
.unwrap_or(0);
update_profiles_cache(&snapshot);
SNAPSHOT_METRICS
.blocking_hits
.fetch_add(1, Ordering::Relaxed);
logging!(
debug,
Type::Cmd,
"[Profiles] Snapshot served (path=blocking, items={}, elapsed={}ms)",
item_count,
started_at.elapsed().as_millis()
);
maybe_log_snapshot_metrics();
Ok(snapshot)
}
/// Try to grab the latest profile data without waiting for the writer.
async fn read_profiles_snapshot_nonblocking() -> Option<IProfiles> {
let profiles = Config::profiles().await;
profiles.try_latest_ref().map(|guard| (**guard).clone())
}
/// Fall back to a blocking read when we absolutely must have fresh data.
async fn read_profiles_snapshot_blocking() -> IProfiles {
let profiles = Config::profiles().await;
let guard = profiles.latest_ref();
(**guard).clone()
}
/// Schedule a background cache refresh once the exclusive lock becomes available again.
fn schedule_profiles_snapshot_refresh() {
crate::process::AsyncHandler::spawn(|| async {
// Once the lock is released we refresh the cached snapshot so the next
// request observes the latest data instead of the stale fallback.
SNAPSHOT_METRICS
.refresh_scheduled
.fetch_add(1, Ordering::Relaxed);
let snapshot = read_profiles_snapshot_blocking().await;
update_profiles_cache(&snapshot);
logging!(
debug,
Type::Cmd,
"[Profiles] Cache refreshed after busy snapshot"
);
});
}
fn maybe_log_snapshot_metrics() {
const LOG_INTERVAL_MS: u64 = 5_000;
let now_ms = current_millis();
let last_ms = SNAPSHOT_METRICS.last_log_ms.load(Ordering::Relaxed);
if now_ms.saturating_sub(last_ms) < LOG_INTERVAL_MS {
return;
}
if SNAPSHOT_METRICS
.last_log_ms
.compare_exchange(last_ms, now_ms, Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
return;
}
let fast = SNAPSHOT_METRICS.fast_hits.swap(0, Ordering::SeqCst);
let cache = SNAPSHOT_METRICS.cache_hits.swap(0, Ordering::SeqCst);
let blocking = SNAPSHOT_METRICS.blocking_hits.swap(0, Ordering::SeqCst);
let refresh = SNAPSHOT_METRICS.refresh_scheduled.swap(0, Ordering::SeqCst);
if fast == 0 && cache == 0 && blocking == 0 && refresh == 0 {
return;
}
logging!(
debug,
Type::Cmd,
"[Profiles][Metrics] 5s window => fast={}, cache={}, blocking={}, refresh_jobs={}",
fast,
cache,
blocking,
refresh
);
}
fn current_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
/// Run the optional enhancement pipeline and refresh Clash when it completes.
#[tauri::command]
pub async fn enhance_profiles() -> CmdResult {
match feat::enhance_profiles().await {
@@ -93,79 +206,106 @@ pub async fn enhance_profiles() -> CmdResult {
Ok(())
}
/// Import a profile
/// Download a profile from the given URL and persist it to the local catalog.
#[tauri::command]
pub async fn import_profile(url: std::string::String, option: Option<PrfOption>) -> CmdResult {
logging!(info, Type::Cmd, "[Profile Import] Starting import: {}", url);
logging!(info, Type::Cmd, "[Profile Import] Begin: {}", url);
// Rely on PrfItem::from_url's own timeout/retry logic; no longer wrap it with tokio::time::timeout
// Rely on PrfItem::from_url internal timeout/retry logic instead of wrapping with tokio::time::timeout
let item = match PrfItem::from_url(&url, None, None, option).await {
Ok(it) => {
logging!(info, Type::Cmd, "[Profile Import] Download complete; saving configuration");
logging!(
info,
Type::Cmd,
"[Profile Import] Download complete; saving configuration"
);
it
}
Err(e) => {
logging!(error, Type::Cmd, "[Profile Import] Download failed: {}", e);
return Err(format!("Profile import failed: {}", e).into());
logging!(error, Type::Cmd, "[Profile Import] Download failed: {}", e);
return Err(format!("Profile import failed: {}", e).into());
}
};
match profiles_append_item_safe(item.clone()).await {
Ok(_) => match profiles_save_file_safe().await {
Ok(_) => {
logging!(info, Type::Cmd, "[Profile Import] Configuration file saved successfully");
logging!(
info,
Type::Cmd,
"[Profile Import] Configuration file saved successfully"
);
}
Err(e) => {
logging!(error, Type::Cmd, "[Profile Import] Failed to save configuration file: {}", e);
logging!(
error,
Type::Cmd,
"[Profile Import] Failed to save configuration file: {}",
e
);
}
},
Err(e) => {
logging!(error, Type::Cmd, "[Profile Import] Failed to persist configuration: {}", e);
return Err(format!("Profile import failed: {}", e).into());
logging!(
error,
Type::Cmd,
"[Profile Import] Failed to persist configuration: {}",
e
);
return Err(format!("Profile import failed: {}", e).into());
}
}
// Immediately emit a configuration change notification
// Immediately emit a configuration change notification
if let Some(uid) = &item.uid {
logging!(info, Type::Cmd, "[Profile Import] Emitting configuration change event: {}", uid);
logging!(
info,
Type::Cmd,
"[Profile Import] Emitting configuration change event: {}",
uid
);
handle::Handle::notify_profile_changed(uid.clone());
}
// Save the configuration file asynchronously and emit a global notification
// Save configuration asynchronously and emit a global notification
let uid_clone = item.uid.clone();
if let Some(uid) = uid_clone {
// Delay the notification to ensure the file has been fully written
// Delay notification to ensure the file is fully written
tokio::time::sleep(Duration::from_millis(100)).await;
handle::Handle::notify_profile_changed(uid);
}
logging!(info, Type::Cmd, "[Profile Import] Import completed: {}", url);
logging!(info, Type::Cmd, "[Profile Import] Completed: {}", url);
Ok(())
}
/// Adjust the order of profiles
/// Move a profile in the list relative to another entry.
#[tauri::command]
pub async fn reorder_profile(active_id: String, over_id: String) -> CmdResult {
match profiles_reorder_safe(active_id, over_id).await {
Ok(_) => {
log::info!(target: "app", "Reordered profiles");
log::info!(target: "app", "Reordered profiles");
Ok(())
}
Err(err) => {
log::error!(target: "app", "Failed to reorder profiles: {}", err);
Err(format!("Failed to reorder profiles: {}", err).into())
log::error!(target: "app", "Failed to reorder profiles: {}", err);
Err(format!("Failed to reorder profiles: {}", err).into())
}
}
}
/// Create a new profile
/// Create a new configuration file
/// Create a new profile entry and optionally write its backing file.
#[tauri::command]
pub async fn create_profile(item: PrfItem, file_data: Option<String>) -> CmdResult {
match profiles_append_item_with_filedata_safe(item.clone(), file_data).await {
Ok(_) => {
// Emit a configuration change notification
// Emit configuration change notification
if let Some(uid) = &item.uid {
logging!(info, Type::Cmd, "[Profile Create] Emitting configuration change event: {}", uid);
logging!(
info,
Type::Cmd,
"[Profile Create] Emitting configuration change event: {}",
uid
);
handle::Handle::notify_profile_changed(uid.clone());
}
Ok(())
@@ -177,7 +317,7 @@ pub async fn create_profile(item: PrfItem, file_data: Option<String>) -> CmdResu
}
}
/// Update the profile
/// Force-refresh a profile from its remote source, if available.
#[tauri::command]
pub async fn update_profile(index: String, option: Option<PrfOption>) -> CmdResult {
match feat::update_profile(index, option, Some(true)).await {
@@ -189,11 +329,11 @@ pub async fn update_profile(index: String, option: Option<PrfOption>) -> CmdResu
}
}
/// Delete the profile
/// Remove a profile and refresh the running configuration if necessary.
#[tauri::command]
pub async fn delete_profile(index: String) -> CmdResult {
println!("delete_profile: {}", index);
// Use the Send-safe helper function
// Use send-safe helper function
let should_update = profiles_delete_item_safe(index.clone())
.await
.stringify_err()?;
@@ -203,8 +343,13 @@ pub async fn delete_profile(index: String) -> CmdResult {
match CoreManager::global().update_config().await {
Ok(_) => {
handle::Handle::refresh_clash();
// Emit a configuration change notification
logging!(info, Type::Cmd, "[Profile Delete] Emitting configuration change event: {}", index);
// Emit configuration change notification
logging!(
info,
Type::Cmd,
"[Profile Delete] Emitting configuration change event: {}",
index
);
handle::Handle::notify_profile_changed(index);
}
Err(e) => {
@@ -216,361 +361,28 @@ pub async fn delete_profile(index: String) -> CmdResult {
Ok(())
}
/// Validate the syntax of the new profile
async fn validate_new_profile(new_profile: &String) -> Result<(), ()> {
logging!(info, Type::Cmd, "Switching to new profile: {}", new_profile);
// Resolve the target profile file path
let config_file_result = {
let profiles_config = Config::profiles().await;
let profiles_data = profiles_config.latest_ref();
match profiles_data.get_item(new_profile) {
Ok(item) => {
if let Some(file) = &item.file {
let path = dirs::app_profiles_dir().map(|dir| dir.join(file.as_str()));
path.ok()
} else {
None
}
}
Err(e) => {
logging!(error, Type::Cmd, "Failed to get target profile info: {}", e);
None
}
}
};
// If a file path was resolved, check the YAML syntax
if let Some(file_path) = config_file_result {
if !file_path.exists() {
logging!(
error,
Type::Cmd,
"Target profile file does not exist: {}",
file_path.display()
);
handle::Handle::notice_message(
"config_validate::file_not_found",
format!("{}", file_path.display()),
);
return Err(());
}
// Timeout protection
let file_read_result = tokio::time::timeout(
Duration::from_secs(5),
tokio::fs::read_to_string(&file_path),
)
.await;
match file_read_result {
Ok(Ok(content)) => {
let yaml_parse_result = AsyncHandler::spawn_blocking(move || {
serde_yaml_ng::from_str::<serde_yaml_ng::Value>(&content)
})
.await;
match yaml_parse_result {
Ok(Ok(_)) => {
logging!(info, Type::Cmd, "Target profile file syntax is valid");
Ok(())
}
Ok(Err(err)) => {
let error_msg = format!(" {err}");
logging!(
error,
Type::Cmd,
"Target profile file contains YAML syntax errors: {}",
error_msg
);
handle::Handle::notice_message(
"config_validate::yaml_syntax_error",
error_msg.clone(),
);
Err(())
}
Err(join_err) => {
let error_msg = format!("YAML parsing task failed: {join_err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::yaml_parse_error",
error_msg.clone(),
);
Err(())
}
}
}
Ok(Err(err)) => {
let error_msg = format!("Failed to read target profile file: {err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::file_read_error",
error_msg.clone(),
);
Err(())
}
Err(_) => {
let error_msg = "Reading the profile file timed out (5 seconds)".to_string();
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::file_read_timeout",
error_msg.clone(),
);
Err(())
}
}
} else {
Ok(())
}
}
/// Perform the configuration update and handle the result
async fn restore_previous_profile(prev_profile: String) -> CmdResult<()> {
logging!(info, Type::Cmd, "Attempting to restore the previous profile: {}", prev_profile);
let restore_profiles = IProfiles {
current: Some(prev_profile),
items: None,
};
Config::profiles()
.await
.draft_mut()
.patch_config(restore_profiles)
.stringify_err()?;
Config::profiles().await.apply();
crate::process::AsyncHandler::spawn(|| async move {
if let Err(e) = profiles_save_file_safe().await {
log::warn!(target: "app", "Async save of the restored profile file failed: {e}");
}
});
logging!(info, Type::Cmd, "Successfully restored the previous profile");
Ok(())
}
async fn handle_success(current_sequence: u64, current_value: Option<String>) -> CmdResult<bool> {
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
logging!(
info,
Type::Cmd,
"Found a newer request after the core operation (sequence: {} < {}); ignoring the current result",
current_sequence,
latest_sequence
);
Config::profiles().await.discard();
return Ok(false);
}
logging!(
info,
Type::Cmd,
"Configuration updated successfully, sequence: {}",
current_sequence
);
Config::profiles().await.apply();
handle::Handle::refresh_clash();
if let Err(e) = Tray::global().update_tooltip().await {
log::warn!(target: "app", "Async tray tooltip update failed: {e}");
}
if let Err(e) = Tray::global().update_menu().await {
log::warn!(target: "app", "Async tray menu update failed: {e}");
}
if let Err(e) = profiles_save_file_safe().await {
log::warn!(target: "app", "Async save of the profile file failed: {e}");
}
if let Some(current) = &current_value {
logging!(
info,
Type::Cmd,
"Sending configuration change event to frontend: {}, sequence: {}",
current,
current_sequence
);
handle::Handle::notify_profile_changed(current.clone());
}
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(true)
}
async fn handle_validation_failure(
error_msg: String,
current_profile: Option<String>,
) -> CmdResult<bool> {
logging!(warn, Type::Cmd, "Configuration validation failed: {}", error_msg);
Config::profiles().await.discard();
if let Some(prev_profile) = current_profile {
restore_previous_profile(prev_profile).await?;
}
handle::Handle::notice_message("config_validate::error", error_msg);
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
async fn handle_update_error<E: std::fmt::Display>(e: E, current_sequence: u64) -> CmdResult<bool> {
logging!(
warn,
Type::Cmd,
"Error occurred during update: {}, sequence: {}",
e,
current_sequence
);
Config::profiles().await.discard();
handle::Handle::notice_message("config_validate::boot_error", e.to_string());
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
async fn handle_timeout(current_profile: Option<String>, current_sequence: u64) -> CmdResult<bool> {
let timeout_msg = "Configuration update timed out (30s); config validation or core communication may be blocked";
logging!(
error,
Type::Cmd,
"{}, sequence: {}",
timeout_msg,
current_sequence
);
Config::profiles().await.discard();
if let Some(prev_profile) = current_profile {
restore_previous_profile(prev_profile).await?;
}
handle::Handle::notice_message("config_validate::timeout", timeout_msg);
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
async fn perform_config_update(
current_sequence: u64,
current_value: Option<String>,
current_profile: Option<String>,
) -> CmdResult<bool> {
logging!(
info,
Type::Cmd,
"Starting core configuration update, sequence: {}",
current_sequence
);
let update_result = tokio::time::timeout(
Duration::from_secs(30),
CoreManager::global().update_config(),
)
.await;
match update_result {
Ok(Ok((true, _))) => handle_success(current_sequence, current_value).await,
Ok(Ok((false, error_msg))) => handle_validation_failure(error_msg, current_profile).await,
Ok(Err(e)) => handle_update_error(e, current_sequence).await,
Err(_) => handle_timeout(current_profile, current_sequence).await,
}
}
/// Modify the profiles configuration
/// Apply partial profile list updates through the switching workflow.
#[tauri::command]
pub async fn patch_profiles_config(profiles: IProfiles) -> CmdResult<bool> {
if CURRENT_SWITCHING_PROFILE.load(Ordering::SeqCst) {
logging!(info, Type::Cmd, "A profile switch is already in progress; dropping the request");
return Ok(false);
}
CURRENT_SWITCHING_PROFILE.store(true, Ordering::SeqCst);
// Assign a sequence number to the current request
let current_sequence = CURRENT_REQUEST_SEQUENCE.fetch_add(1, Ordering::SeqCst) + 1;
let target_profile = profiles.current.clone();
logging!(
info,
Type::Cmd,
"Starting profile configuration update, request sequence: {}, target profile: {:?}",
current_sequence,
target_profile
);
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
logging!(
info,
Type::Cmd,
"Found a newer request after acquiring the lock (sequence: {} < {}); abandoning the current request",
current_sequence,
latest_sequence
);
return Ok(false);
}
// Save the current profile so it can be restored if validation fails
let current_profile = Config::profiles().await.latest_ref().current.clone();
logging!(info, Type::Cmd, "Current profile: {:?}", current_profile);
// When switching profiles, first check the target profile file for syntax errors
if let Some(new_profile) = profiles.current.as_ref()
&& current_profile.as_ref() != Some(new_profile)
&& validate_new_profile(new_profile).await.is_err()
{
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
return Ok(false);
}
// Check request validity
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
logging!(
info,
Type::Cmd,
"Found a newer request before the core operation (sequence: {} < {}); abandoning the current request",
current_sequence,
latest_sequence
);
return Ok(false);
}
// Update the profiles configuration
logging!(
info,
Type::Cmd,
"Updating the configuration draft, sequence: {}",
current_sequence
);
let current_value = profiles.current.clone();
let _ = Config::profiles().await.draft_mut().patch_config(profiles);
// Re-verify request validity before calling the core
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
logging!(
info,
Type::Cmd,
"Found a newer request before interacting with the core (sequence: {} < {}); abandoning the current request",
current_sequence,
latest_sequence
);
Config::profiles().await.discard();
return Ok(false);
}
perform_config_update(current_sequence, current_value, current_profile).await
profile_switch::patch_profiles_config(profiles).await
}
/// Modify profiles by profile name
/// Switch to the provided profile index and wait for completion before returning.
#[tauri::command]
pub async fn patch_profiles_config_by_profile_index(profile_index: String) -> CmdResult<bool> {
logging!(info, Type::Cmd, "Switching profile to: {}", profile_index);
let profiles = IProfiles {
current: Some(profile_index),
items: None,
};
patch_profiles_config(profiles).await
profile_switch::patch_profiles_config_by_profile_index(profile_index).await
}
/// Modify a specific profile item
/// Enqueue a profile switch request and optionally notify on success.
#[tauri::command]
pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdResult<bool> {
profile_switch::switch_profile(profile_index, notify_success).await
}
/// Update a specific profile item and refresh timers if its schedule changed.
#[tauri::command]
pub async fn patch_profile(index: String, profile: PrfItem) -> CmdResult {
// Check whether update_interval changed before saving
// Check for update_interval changes before saving
let profiles = Config::profiles().await;
let should_refresh_timer = if let Ok(old_profile) = profiles.latest_ref().get_item(&index) {
let old_interval = old_profile.option.as_ref().and_then(|o| o.update_interval);
@@ -589,15 +401,19 @@ pub async fn patch_profile(index: String, profile: PrfItem) -> CmdResult {
.await
.stringify_err()?;
// If the update interval or auto-update flag changes, refresh the timer asynchronously
// If the interval or auto-update flag changes, refresh the timer asynchronously
if should_refresh_timer {
let index_clone = index.clone();
crate::process::AsyncHandler::spawn(move || async move {
logging!(info, Type::Timer, "Timer update interval changed; refreshing timer...");
logging!(
info,
Type::Timer,
"Timer interval changed; refreshing timer..."
);
if let Err(e) = crate::core::Timer::global().refresh().await {
logging!(error, Type::Timer, "Failed to refresh timer: {}", e);
logging!(error, Type::Timer, "Failed to refresh timer: {}", e);
} else {
// After a successful refresh, emit a custom event without triggering a config reload
// After refreshing successfully, emit a custom event without triggering a reload
crate::core::handle::Handle::notify_timer_updated(index_clone);
}
});
@@ -606,7 +422,7 @@ pub async fn patch_profile(index: String, profile: PrfItem) -> CmdResult {
Ok(())
}
/// View the profile file
/// Open the profile file in the system viewer.
#[tauri::command]
pub async fn view_profile(index: String) -> CmdResult {
let profiles = Config::profiles().await;
@@ -628,7 +444,7 @@ pub async fn view_profile(index: String) -> CmdResult {
help::open_file(path).stringify_err()
}
/// Read the profile file contents
/// Return the raw YAML contents for the given profile file.
#[tauri::command]
pub async fn read_profile_file(index: String) -> CmdResult<String> {
let profiles = Config::profiles().await;
@@ -638,10 +454,22 @@ pub async fn read_profile_file(index: String) -> CmdResult<String> {
Ok(data)
}
/// Get the next update time
/// Report the scheduled refresh timestamp (if any) for the profile timer.
#[tauri::command]
pub async fn get_next_update_time(uid: String) -> CmdResult<Option<i64>> {
let timer = Timer::global();
let next_time = timer.get_next_update_time(&uid).await;
Ok(next_time)
}
/// Return the latest driver snapshot describing active and queued switch tasks.
#[tauri::command]
pub async fn get_profile_switch_status() -> CmdResult<ProfileSwitchStatus> {
profile_switch::get_switch_status()
}
/// Fetch switch result events newer than the provided sequence number.
#[tauri::command]
pub async fn get_profile_switch_events(after_sequence: u64) -> CmdResult<Vec<SwitchResultEvent>> {
profile_switch::get_switch_events(after_sequence)
}


@@ -0,0 +1,683 @@
use super::{
CmdResult,
state::{
ProfileSwitchStatus, SwitchCancellation, SwitchManager, SwitchRequest, SwitchResultStatus,
SwitchTaskStatus, current_millis, manager,
},
workflow::{self, SwitchPanicInfo, SwitchStage},
};
use crate::{logging, utils::logging::Type};
use futures::FutureExt;
use once_cell::sync::OnceCell;
use smartstring::alias::String as SmartString;
use std::{
collections::{HashMap, VecDeque},
panic::AssertUnwindSafe,
time::Duration,
};
use tokio::{
sync::{
Mutex as AsyncMutex,
mpsc::{self, error::TrySendError},
oneshot,
},
time::{self, MissedTickBehavior},
};
// Single shared queue so profile switches are executed sequentially and can
// collapse redundant requests for the same profile.
const SWITCH_QUEUE_CAPACITY: usize = 32;
static SWITCH_QUEUE: OnceCell<mpsc::Sender<SwitchDriverMessage>> = OnceCell::new();
type CompletionRegistry = AsyncMutex<HashMap<u64, oneshot::Sender<SwitchResultStatus>>>;
static SWITCH_COMPLETION_WAITERS: OnceCell<CompletionRegistry> = OnceCell::new();
/// Global map of task id -> completion channel sender used when callers await the result.
fn completion_waiters() -> &'static CompletionRegistry {
SWITCH_COMPLETION_WAITERS.get_or_init(|| AsyncMutex::new(HashMap::new()))
}
/// Register a oneshot sender so `switch_profile_and_wait` can be notified when its task finishes.
async fn register_completion_waiter(task_id: u64) -> oneshot::Receiver<SwitchResultStatus> {
let (sender, receiver) = oneshot::channel();
let mut guard = completion_waiters().lock().await;
if guard.insert(task_id, sender).is_some() {
logging!(
warn,
Type::Cmd,
"Replacing existing completion waiter for task {}",
task_id
);
}
receiver
}
/// Remove an outstanding completion waiter; used when enqueue fails or succeeds immediately.
async fn remove_completion_waiter(task_id: u64) -> Option<oneshot::Sender<SwitchResultStatus>> {
completion_waiters().lock().await.remove(&task_id)
}
/// Fire-and-forget notify helper so we do not block the driver loop.
fn notify_completion_waiter(task_id: u64, result: SwitchResultStatus) {
tokio::spawn(async move {
let sender = completion_waiters().lock().await.remove(&task_id);
if let Some(sender) = sender {
let _ = sender.send(result);
}
});
}
const WATCHDOG_TIMEOUT: Duration = Duration::from_secs(5);
const WATCHDOG_TICK: Duration = Duration::from_millis(500);
// Mutable snapshot of the driver's world; all mutations happen on the driver task.
#[derive(Debug, Default)]
struct SwitchDriverState {
active: Option<SwitchRequest>,
queue: VecDeque<SwitchRequest>,
latest_tokens: HashMap<SmartString, SwitchCancellation>,
cleanup_profiles: HashMap<SmartString, tokio::task::JoinHandle<()>>,
last_result: Option<SwitchResultStatus>,
}
// Messages passed through SWITCH_QUEUE so the driver can react to events in order.
#[derive(Debug)]
enum SwitchDriverMessage {
Request {
request: SwitchRequest,
respond_to: oneshot::Sender<bool>,
},
Completion {
request: SwitchRequest,
outcome: SwitchJobOutcome,
},
CleanupDone {
profile: SmartString,
},
}
#[derive(Debug)]
enum SwitchJobOutcome {
Completed {
success: bool,
cleanup: workflow::CleanupHandle,
},
Panicked {
info: SwitchPanicInfo,
cleanup: workflow::CleanupHandle,
},
}
pub(super) async fn switch_profile(
profile_index: impl Into<SmartString>,
notify_success: bool,
) -> CmdResult<bool> {
switch_profile_impl(profile_index.into(), notify_success, false).await
}
pub(super) async fn switch_profile_and_wait(
profile_index: impl Into<SmartString>,
notify_success: bool,
) -> CmdResult<bool> {
switch_profile_impl(profile_index.into(), notify_success, true).await
}
async fn switch_profile_impl(
profile_index: SmartString,
notify_success: bool,
wait_for_completion: bool,
) -> CmdResult<bool> {
// wait_for_completion is used by CLI flows that must block until the switch finishes.
let manager = manager();
let sender = switch_driver_sender();
let request = SwitchRequest::new(
manager.next_task_id(),
profile_index.clone(),
notify_success,
);
logging!(
info,
Type::Cmd,
"Queue profile switch task {} -> {} (notify={})",
request.task_id(),
profile_index,
notify_success
);
let task_id = request.task_id();
let mut completion_rx = if wait_for_completion {
Some(register_completion_waiter(task_id).await)
} else {
None
};
let (tx, rx) = oneshot::channel();
let enqueue_result = match sender.try_send(SwitchDriverMessage::Request {
request,
respond_to: tx,
}) {
Ok(_) => match rx.await {
Ok(result) => Ok(result),
Err(err) => {
logging!(
error,
Type::Cmd,
"Failed to receive enqueue result for profile {}: {}",
profile_index,
err
);
Err("switch profile queue unavailable".into())
}
},
Err(TrySendError::Full(msg)) => {
logging!(
warn,
Type::Cmd,
"Profile switch queue is full; waiting for space: {}",
profile_index
);
match sender.send(msg).await {
Ok(_) => match rx.await {
Ok(result) => Ok(result),
Err(err) => {
logging!(
error,
Type::Cmd,
"Failed to receive enqueue result after wait for {}: {}",
profile_index,
err
);
Err("switch profile queue unavailable".into())
}
},
Err(err) => {
logging!(
error,
Type::Cmd,
"Profile switch queue closed while waiting ({}): {}",
profile_index,
err
);
Err("switch profile queue unavailable".into())
}
}
}
Err(TrySendError::Closed(_)) => {
logging!(
error,
Type::Cmd,
"Profile switch queue is closed, cannot enqueue: {}",
profile_index
);
Err("switch profile queue unavailable".into())
}
};
let accepted = match enqueue_result {
Ok(result) => result,
Err(err) => {
if completion_rx.is_some() {
remove_completion_waiter(task_id).await;
}
return Err(err);
}
};
if !accepted {
if completion_rx.is_some() {
remove_completion_waiter(task_id).await;
}
return Ok(false);
}
if let Some(rx_completion) = completion_rx.take() {
match rx_completion.await {
Ok(status) => Ok(status.success),
Err(err) => {
logging!(
error,
Type::Cmd,
"Switch task {} completion channel dropped: {}",
task_id,
err
);
Err("profile switch completion unavailable".into())
}
}
} else {
Ok(true)
}
}
fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
SWITCH_QUEUE.get_or_init(|| {
let (tx, rx) = mpsc::channel::<SwitchDriverMessage>(SWITCH_QUEUE_CAPACITY);
let driver_tx = tx.clone();
tokio::spawn(async move {
let manager = manager();
let driver = SwitchDriver::new(manager, driver_tx);
driver.run(rx).await;
});
tx
})
}
struct SwitchDriver {
manager: &'static SwitchManager,
sender: mpsc::Sender<SwitchDriverMessage>,
state: SwitchDriverState,
}
impl SwitchDriver {
fn new(manager: &'static SwitchManager, sender: mpsc::Sender<SwitchDriverMessage>) -> Self {
let state = SwitchDriverState::default();
manager.set_status(state.snapshot(manager));
Self {
manager,
sender,
state,
}
}
async fn run(mut self, mut rx: mpsc::Receiver<SwitchDriverMessage>) {
while let Some(message) = rx.recv().await {
match message {
SwitchDriverMessage::Request {
request,
respond_to,
} => {
self.handle_enqueue(request, respond_to);
}
SwitchDriverMessage::Completion { request, outcome } => {
self.handle_completion(request, outcome);
}
SwitchDriverMessage::CleanupDone { profile } => {
self.handle_cleanup_done(profile);
}
}
}
}
fn handle_enqueue(&mut self, request: SwitchRequest, respond_to: oneshot::Sender<bool>) {
// Each new request supersedes older ones for the same profile to avoid thrashing the core.
let mut responder = Some(respond_to);
let accepted = true;
let profile_key = request.profile_id().clone();
let cleanup_pending =
self.state.active.is_none() && !self.state.cleanup_profiles.is_empty();
if cleanup_pending && self.state.cleanup_profiles.contains_key(&profile_key) {
logging!(
debug,
Type::Cmd,
"Cleanup running for {}; queueing switch task {} -> {} to run afterwards",
profile_key,
request.task_id(),
profile_key
);
if let Some(previous) = self
.state
.latest_tokens
.insert(profile_key.clone(), request.cancel_token().clone())
{
previous.cancel();
}
self.state
.queue
.retain(|queued| queued.profile_id() != &profile_key);
self.state.queue.push_back(request);
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
self.publish_status();
return;
}
if cleanup_pending {
logging!(
debug,
Type::Cmd,
"Cleanup running for {} profile(s); queueing task {} -> {} to run after cleanup without clearing existing requests",
self.state.cleanup_profiles.len(),
request.task_id(),
profile_key
);
}
if let Some(previous) = self
.state
.latest_tokens
.insert(profile_key.clone(), request.cancel_token().clone())
{
previous.cancel();
}
if let Some(active) = self.state.active.as_mut()
&& active.profile_id() == &profile_key
{
active.cancel_token().cancel();
active.merge_notify(request.notify());
self.state
.queue
.retain(|queued| queued.profile_id() != &profile_key);
self.state.queue.push_front(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
self.publish_status();
return;
}
if let Some(active) = self.state.active.as_ref() {
logging!(
debug,
Type::Cmd,
"Cancelling active switch task {} (profile={}) in favour of task {} -> {}",
active.task_id(),
active.profile_id(),
request.task_id(),
profile_key
);
active.cancel_token().cancel();
}
self.state
.queue
.retain(|queued| queued.profile_id() != &profile_key);
self.state.queue.push_back(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
self.start_next_job();
self.publish_status();
}
fn handle_completion(&mut self, request: SwitchRequest, outcome: SwitchJobOutcome) {
// Translate the workflow result into an event the frontend can understand.
let result_record = match &outcome {
SwitchJobOutcome::Completed { success, .. } => {
logging!(
info,
Type::Cmd,
"Switch task {} completed (success={})",
request.task_id(),
success
);
if *success {
SwitchResultStatus::success(request.task_id(), request.profile_id())
} else {
SwitchResultStatus::failed(request.task_id(), request.profile_id(), None, None)
}
}
SwitchJobOutcome::Panicked { info, .. } => {
logging!(
error,
Type::Cmd,
"Switch task {} panicked at stage {:?}: {}",
request.task_id(),
info.stage,
info.detail
);
SwitchResultStatus::failed(
request.task_id(),
request.profile_id(),
Some(format!("{:?}", info.stage)),
Some(info.detail.clone()),
)
}
};
if let Some(active) = self.state.active.as_ref()
&& active.task_id() == request.task_id()
{
self.state.active = None;
}
if let Some(latest) = self.state.latest_tokens.get(request.profile_id())
&& latest.same_token(request.cancel_token())
{
self.state.latest_tokens.remove(request.profile_id());
}
let cleanup = match outcome {
SwitchJobOutcome::Completed { cleanup, .. } => cleanup,
SwitchJobOutcome::Panicked { cleanup, .. } => cleanup,
};
self.track_cleanup(request.profile_id().clone(), cleanup);
let event_record = result_record.clone();
self.state.last_result = Some(result_record);
notify_completion_waiter(request.task_id(), event_record.clone());
self.manager.push_event(event_record);
self.start_next_job();
self.publish_status();
}
fn handle_cleanup_done(&mut self, profile: SmartString) {
if let Some(handle) = self.state.cleanup_profiles.remove(&profile) {
handle.abort();
}
self.start_next_job();
self.publish_status();
}
fn start_next_job(&mut self) {
if self.state.active.is_some() || !self.state.cleanup_profiles.is_empty() {
self.publish_status();
return;
}
while let Some(request) = self.state.queue.pop_front() {
if request.cancel_token().is_cancelled() {
self.discard_request(request);
continue;
}
self.state.active = Some(request.clone());
self.start_switch_job(request);
break;
}
self.publish_status();
}
fn track_cleanup(&mut self, profile: SmartString, cleanup: workflow::CleanupHandle) {
if let Some(existing) = self.state.cleanup_profiles.remove(&profile) {
existing.abort();
}
let driver_tx = self.sender.clone();
let profile_clone = profile.clone();
let handle = tokio::spawn(async move {
let profile_label = profile_clone.clone();
if let Err(err) = cleanup.await {
logging!(
warn,
Type::Cmd,
"Cleanup task for profile {} failed: {}",
profile_label.as_str(),
err
);
}
if let Err(err) = driver_tx
.send(SwitchDriverMessage::CleanupDone {
profile: profile_clone,
})
.await
{
logging!(
error,
Type::Cmd,
"Failed to push cleanup completion for profile {}: {}",
profile_label.as_str(),
err
);
}
});
self.state.cleanup_profiles.insert(profile, handle);
}
fn start_switch_job(&self, request: SwitchRequest) {
// Run the workflow in a background task while the driver keeps processing messages.
let driver_tx = self.sender.clone();
let manager = self.manager;
let completion_request = request.clone();
let heartbeat = request.heartbeat().clone();
let cancel_token = request.cancel_token().clone();
let task_id = request.task_id();
let profile_label = request.profile_id().clone();
tokio::spawn(async move {
let mut watchdog_interval = time::interval(WATCHDOG_TICK);
watchdog_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let workflow_fut =
AssertUnwindSafe(workflow::run_switch_job(manager, request)).catch_unwind();
tokio::pin!(workflow_fut);
let job_result = loop {
tokio::select! {
res = workflow_fut.as_mut() => {
break match res {
Ok(Ok(result)) => SwitchJobOutcome::Completed {
success: result.success,
cleanup: result.cleanup,
},
Ok(Err(error)) => SwitchJobOutcome::Panicked {
info: error.info,
cleanup: error.cleanup,
},
Err(payload) => {
let info = SwitchPanicInfo::driver_task(
workflow::describe_panic_payload(payload.as_ref()),
);
let cleanup = workflow::schedule_post_switch_failure(
profile_label.clone(),
completion_request.notify(),
completion_request.task_id(),
);
SwitchJobOutcome::Panicked { info, cleanup }
}
};
}
_ = watchdog_interval.tick() => {
if cancel_token.is_cancelled() {
continue;
}
let elapsed = heartbeat.elapsed();
if elapsed > WATCHDOG_TIMEOUT {
let stage = SwitchStage::from_code(heartbeat.stage_code())
.unwrap_or(SwitchStage::Workflow);
logging!(
warn,
Type::Cmd,
"Switch task {} watchdog timeout (profile={} stage={:?}, elapsed={:?}); cancelling",
task_id,
profile_label.as_str(),
stage,
elapsed
);
cancel_token.cancel();
}
}
}
};
let request_for_error = completion_request.clone();
if let Err(err) = driver_tx
.send(SwitchDriverMessage::Completion {
request: completion_request,
outcome: job_result,
})
.await
{
logging!(
error,
Type::Cmd,
"Failed to push switch completion to driver: {}",
err
);
notify_completion_waiter(
request_for_error.task_id(),
SwitchResultStatus::failed(
request_for_error.task_id(),
request_for_error.profile_id(),
Some("driver".to_string()),
Some(format!("completion dispatch failed: {}", err)),
),
);
}
});
}
/// Mark a request as cancelled because a newer request superseded it.
fn discard_request(&mut self, request: SwitchRequest) {
let key = request.profile_id().clone();
let should_remove = self
.state
.latest_tokens
.get(&key)
.map(|latest| latest.same_token(request.cancel_token()))
.unwrap_or(false);
if should_remove {
self.state.latest_tokens.remove(&key);
}
if !request.cancel_token().is_cancelled() {
request.cancel_token().cancel();
}
let event = SwitchResultStatus::cancelled(
request.task_id(),
request.profile_id(),
Some("request superseded".to_string()),
);
self.state.last_result = Some(event.clone());
notify_completion_waiter(request.task_id(), event.clone());
self.manager.push_event(event);
}
fn publish_status(&self) {
self.manager.set_status(self.state.snapshot(self.manager));
}
}
impl SwitchDriverState {
/// Build a lightweight status snapshot suitable for sharing across the command boundary.
fn snapshot(&self, manager: &SwitchManager) -> ProfileSwitchStatus {
let active = self
.active
.as_ref()
.map(|req| SwitchTaskStatus::from_request(req, false));
let queue = self
.queue
.iter()
.map(|req| SwitchTaskStatus::from_request(req, true))
.collect::<Vec<_>>();
let cleanup_profiles = self
.cleanup_profiles
.keys()
.map(|key| key.to_string())
.collect::<Vec<_>>();
ProfileSwitchStatus {
is_switching: manager.is_switching(),
active,
queue,
cleanup_profiles,
last_result: self.last_result.clone(),
last_updated: current_millis(),
}
}
}

View File

@@ -0,0 +1,34 @@
// Profile switch orchestration: plumbing between the public tauri commands,
// the async driver queue, validation helpers, and the state machine workflow.
mod driver;
mod state;
mod validation;
mod workflow;
pub use state::{ProfileSwitchStatus, SwitchResultEvent};
use smartstring::alias::String;
use super::CmdResult;
pub(super) async fn patch_profiles_config(profiles: crate::config::IProfiles) -> CmdResult<bool> {
workflow::patch_profiles_config(profiles).await
}
pub(super) async fn patch_profiles_config_by_profile_index(
profile_index: String,
) -> CmdResult<bool> {
driver::switch_profile_and_wait(profile_index, false).await
}
pub(super) async fn switch_profile(profile_index: String, notify_success: bool) -> CmdResult<bool> {
driver::switch_profile(profile_index, notify_success).await
}
pub(super) fn get_switch_status() -> CmdResult<ProfileSwitchStatus> {
Ok(state::manager().status_snapshot())
}
pub(super) fn get_switch_events(after_sequence: u64) -> CmdResult<Vec<SwitchResultEvent>> {
Ok(state::manager().events_after(after_sequence))
}
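
Not part of the diff: a rough sketch of how the parent cmd module might expose this facade as Tauri commands. The names cmd_switch_profile and cmd_get_switch_events are hypothetical; the actual command wiring lives elsewhere in the command layer.

// Hypothetical wrappers living in the parent `cmd` module (illustration only).
#[tauri::command]
pub async fn cmd_switch_profile(profile_index: String, notify: bool) -> CmdResult<bool> {
    // Hands the request to the driver queue; see driver.rs for the exact
    // enqueue/completion semantics behind the returned bool.
    profile_switch::switch_profile(profile_index.into(), notify).await
}

#[tauri::command]
pub fn cmd_get_switch_events(after_sequence: u64) -> CmdResult<Vec<profile_switch::SwitchResultEvent>> {
    // The UI polls with the highest sequence it has already seen so results
    // that finished while it was not listening are replayed.
    profile_switch::get_switch_events(after_sequence)
}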

View File

@@ -0,0 +1,353 @@
use once_cell::sync::OnceCell;
use parking_lot::RwLock;
use serde::Serialize;
use smartstring::alias::String as SmartString;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify};
pub(super) const SWITCH_JOB_TIMEOUT: Duration = Duration::from_secs(30);
pub(super) const SWITCH_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
static SWITCH_MANAGER: OnceCell<SwitchManager> = OnceCell::new();
pub(super) fn manager() -> &'static SwitchManager {
SWITCH_MANAGER.get_or_init(SwitchManager::default)
}
/// Central coordination point shared between the driver and workflow state machine.
#[derive(Debug)]
pub(super) struct SwitchManager {
core_mutex: Mutex<()>,
request_sequence: AtomicU64,
switching: AtomicBool,
task_sequence: AtomicU64,
status: RwLock<ProfileSwitchStatus>,
event_sequence: AtomicU64,
recent_events: RwLock<VecDeque<SwitchResultEvent>>,
}
impl Default for SwitchManager {
fn default() -> Self {
Self {
core_mutex: Mutex::new(()),
request_sequence: AtomicU64::new(0),
switching: AtomicBool::new(false),
task_sequence: AtomicU64::new(0),
status: RwLock::new(ProfileSwitchStatus::default()),
event_sequence: AtomicU64::new(0),
recent_events: RwLock::new(VecDeque::with_capacity(32)),
}
}
}
impl SwitchManager {
pub(super) fn core_mutex(&self) -> &Mutex<()> {
&self.core_mutex
}
// Monotonic identifiers so logs can correlate enqueue/finish pairs.
pub(super) fn next_task_id(&self) -> u64 {
self.task_sequence.fetch_add(1, Ordering::SeqCst) + 1
}
/// Sequence id assigned to each enqueued request so we can spot stale work.
pub(super) fn next_request_sequence(&self) -> u64 {
self.request_sequence.fetch_add(1, Ordering::SeqCst) + 1
}
pub(super) fn latest_request_sequence(&self) -> u64 {
self.request_sequence.load(Ordering::SeqCst)
}
pub(super) fn begin_switch(&'static self) -> SwitchScope<'static> {
self.switching.store(true, Ordering::SeqCst);
SwitchScope { manager: self }
}
pub(super) fn is_switching(&self) -> bool {
self.switching.load(Ordering::SeqCst)
}
pub(super) fn set_status(&self, status: ProfileSwitchStatus) {
*self.status.write() = status;
}
pub(super) fn status_snapshot(&self) -> ProfileSwitchStatus {
self.status.read().clone()
}
pub(super) fn push_event(&self, result: SwitchResultStatus) {
const MAX_EVENTS: usize = 64;
let sequence = self.event_sequence.fetch_add(1, Ordering::SeqCst) + 1;
let mut guard = self.recent_events.write();
if guard.len() == MAX_EVENTS {
guard.pop_front();
}
guard.push_back(SwitchResultEvent { sequence, result });
}
pub(super) fn events_after(&self, sequence: u64) -> Vec<SwitchResultEvent> {
self.recent_events
.read()
.iter()
.filter(|event| event.sequence > sequence)
.cloned()
.collect()
}
}
pub(super) struct SwitchScope<'a> {
manager: &'a SwitchManager,
}
impl Drop for SwitchScope<'_> {
fn drop(&mut self) {
self.manager.switching.store(false, Ordering::SeqCst);
}
}
#[derive(Debug, Clone)]
pub(super) struct SwitchCancellation {
flag: Arc<AtomicBool>,
notify: Arc<Notify>,
}
impl SwitchCancellation {
pub(super) fn new() -> Self {
Self {
flag: Arc::new(AtomicBool::new(false)),
notify: Arc::new(Notify::new()),
}
}
pub(super) fn cancel(&self) {
self.flag.store(true, Ordering::SeqCst);
self.notify.notify_waiters();
}
/// True if another request already cancelled this job.
pub(super) fn is_cancelled(&self) -> bool {
self.flag.load(Ordering::SeqCst)
}
pub(super) fn same_token(&self, other: &SwitchCancellation) -> bool {
Arc::ptr_eq(&self.flag, &other.flag)
}
pub(super) async fn cancelled_future(&self) {
// Used by async blocks that want to pause until a newer request pre-empts them.
if self.is_cancelled() {
return;
}
self.notify.notified().await;
}
}
#[derive(Debug, Clone)]
pub(super) struct SwitchRequest {
task_id: u64,
profile_id: SmartString,
notify: bool,
cancel_token: SwitchCancellation,
heartbeat: SwitchHeartbeat,
}
impl SwitchRequest {
pub(super) fn new(task_id: u64, profile_id: SmartString, notify: bool) -> Self {
Self {
task_id,
profile_id,
notify,
cancel_token: SwitchCancellation::new(),
heartbeat: SwitchHeartbeat::new(),
}
}
pub(super) fn task_id(&self) -> u64 {
self.task_id
}
pub(super) fn profile_id(&self) -> &SmartString {
&self.profile_id
}
pub(super) fn notify(&self) -> bool {
self.notify
}
pub(super) fn merge_notify(&mut self, notify: bool) {
// When a new request wants a toast, remember it even if an older request did not.
if notify {
self.notify = true;
}
}
pub(super) fn cancel_token(&self) -> &SwitchCancellation {
&self.cancel_token
}
pub(super) fn heartbeat(&self) -> &SwitchHeartbeat {
&self.heartbeat
}
}
#[derive(Debug, Clone)]
pub(super) struct SwitchHeartbeat {
last_tick_millis: Arc<AtomicU64>,
stage_code: Arc<AtomicU32>,
}
fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
#[derive(Debug, Clone, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ProfileSwitchStatus {
pub is_switching: bool,
pub active: Option<SwitchTaskStatus>,
pub queue: Vec<SwitchTaskStatus>,
pub cleanup_profiles: Vec<String>,
pub last_result: Option<SwitchResultStatus>,
pub last_updated: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SwitchTaskStatus {
pub task_id: u64,
pub profile_id: String,
pub notify: bool,
pub stage: Option<u32>,
pub queued: bool,
}
impl SwitchTaskStatus {
pub(super) fn from_request(request: &SwitchRequest, queued: bool) -> Self {
Self {
task_id: request.task_id(),
profile_id: request.profile_id().to_string(),
notify: request.notify(),
stage: if queued {
None
} else {
Some(request.heartbeat().stage_code())
},
queued,
}
}
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SwitchResultStatus {
pub task_id: u64,
pub profile_id: String,
pub success: bool,
pub cancelled: bool,
pub finished_at: u64,
pub error_stage: Option<String>,
pub error_detail: Option<String>,
}
impl SwitchResultStatus {
pub(super) fn success(task_id: u64, profile_id: &SmartString) -> Self {
Self {
task_id,
profile_id: profile_id.to_string(),
success: true,
cancelled: false,
finished_at: now_millis(),
error_stage: None,
error_detail: None,
}
}
pub(super) fn failed(
task_id: u64,
profile_id: &SmartString,
stage: Option<String>,
detail: Option<String>,
) -> Self {
Self {
task_id,
profile_id: profile_id.to_string(),
success: false,
cancelled: false,
finished_at: now_millis(),
error_stage: stage,
error_detail: detail,
}
}
pub(super) fn cancelled(
task_id: u64,
profile_id: &SmartString,
detail: Option<String>,
) -> Self {
Self {
task_id,
profile_id: profile_id.to_string(),
success: false,
cancelled: true,
finished_at: now_millis(),
error_stage: Some("cancelled".to_string()),
error_detail: detail,
}
}
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SwitchResultEvent {
pub sequence: u64,
pub result: SwitchResultStatus,
}
pub(super) fn current_millis() -> u64 {
now_millis()
}
impl SwitchHeartbeat {
fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
pub(super) fn new() -> Self {
let heartbeat = Self {
last_tick_millis: Arc::new(AtomicU64::new(Self::now_millis())),
stage_code: Arc::new(AtomicU32::new(0)),
};
heartbeat.touch();
heartbeat
}
pub(super) fn touch(&self) {
self.last_tick_millis
.store(Self::now_millis(), Ordering::SeqCst);
}
/// Time elapsed since the last heartbeat tick.
pub(super) fn elapsed(&self) -> Duration {
let last = self.last_tick_millis.load(Ordering::SeqCst);
let now = Self::now_millis();
Duration::from_millis(now.saturating_sub(last))
}
pub(super) fn set_stage(&self, stage: u32) {
self.stage_code.store(stage, Ordering::SeqCst);
self.touch();
}
pub(super) fn stage_code(&self) -> u32 {
self.stage_code.load(Ordering::SeqCst)
}
}
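
Not part of the diff: a sketch of unit tests that could sit at the bottom of this file to pin the token-sharing and heartbeat semantics the driver's watchdog relies on.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cloned_cancellation_tokens_share_one_flag() {
        // The driver clones the token into the watchdog/worker tasks; cancelling
        // either handle must be visible to the other, and same_token() is how
        // the driver recognises the pair when pruning latest_tokens.
        let token = SwitchCancellation::new();
        let clone = token.clone();
        assert!(token.same_token(&clone));
        assert!(!clone.is_cancelled());
        token.cancel();
        assert!(clone.is_cancelled());
    }

    #[test]
    fn heartbeat_tracks_stage_codes() {
        // set_stage() stores the code and refreshes the tick the watchdog reads.
        let heartbeat = SwitchHeartbeat::new();
        heartbeat.set_stage(4);
        assert_eq!(heartbeat.stage_code(), 4);
        assert!(heartbeat.elapsed() < Duration::from_secs(1));
    }
}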

View File

@@ -0,0 +1,113 @@
use crate::{
config::Config,
logging,
process::AsyncHandler,
utils::{dirs, logging::Type},
};
use serde_yaml_ng as serde_yaml;
use smartstring::alias::String;
use std::time::Duration;
use tokio::{fs as tokio_fs, time};
const YAML_READ_TIMEOUT: Duration = Duration::from_secs(5);
/// Verify that the requested profile exists locally and is well-formed before switching.
pub(super) async fn validate_switch_request(task_id: u64, profile_id: &str) -> Result<(), String> {
logging!(
info,
Type::Cmd,
"Validating profile switch task {} -> {}",
task_id,
profile_id
);
let profile_key: String = profile_id.into();
let (file_path, profile_type, is_current, remote_url) = {
let profiles_guard = Config::profiles().await;
let latest = profiles_guard.latest_ref();
let item = latest.get_item(&profile_key).map_err(|err| -> String {
format!("Target profile {} not found: {}", profile_id, err).into()
})?;
(
item.file.clone().map(|f| f.to_string()),
item.itype.clone().map(|t| t.to_string()),
latest
.current
.as_ref()
.map(|current| current.as_str() == profile_id)
.unwrap_or(false),
item.url.clone().map(|u| u.to_string()),
)
};
if is_current {
logging!(
info,
Type::Cmd,
"Switch task {} is targeting the current profile {}; skipping validation",
task_id,
profile_id
);
return Ok(());
}
if matches!(profile_type.as_deref(), Some("remote")) {
// Remote profiles must retain a URL so the subsequent refresh job knows where to download.
let has_url = remote_url.as_ref().map(|u| !u.is_empty()).unwrap_or(false);
if !has_url {
return Err(
format!("Remote profile {} is missing a download URL", profile_id).into(),
);
}
}
if let Some(file) = file_path {
let profiles_dir = dirs::app_profiles_dir().map_err(|err| -> String {
format!("Failed to resolve profiles directory: {}", err).into()
})?;
let path = profiles_dir.join(&file);
let contents = match time::timeout(YAML_READ_TIMEOUT, tokio_fs::read_to_string(&path)).await
{
Ok(Ok(contents)) => contents,
Ok(Err(err)) => {
return Err(
format!("Failed to read profile file {}: {}", path.display(), err).into(),
);
}
Err(_) => {
return Err(format!(
"Timed out reading profile file {} after {:?}",
path.display(),
YAML_READ_TIMEOUT
)
.into());
}
};
let parse_result = AsyncHandler::spawn_blocking(move || {
serde_yaml::from_str::<serde_yaml::Value>(&contents)
})
.await;
match parse_result {
Ok(Ok(_)) => {}
Ok(Err(err)) => {
return Err(
format!("Profile YAML parse failed for {}: {}", path.display(), err).into(),
);
}
Err(join_err) => {
return Err(format!(
"Profile YAML parse task panicked for {}: {}",
path.display(),
join_err
)
.into());
}
}
}
Ok(())
}

View File

@@ -0,0 +1,385 @@
use super::{
CmdResult,
state::{SWITCH_JOB_TIMEOUT, SwitchManager, SwitchRequest, manager},
validation::validate_switch_request,
};
use crate::cmd::StringifyErr;
use crate::{
config::{Config, IProfiles, profiles::profiles_save_file_safe},
core::handle,
logging,
process::AsyncHandler,
utils::{dirs, logging::Type},
};
use futures::FutureExt;
use serde_yaml_ng as serde_yaml;
use smartstring::alias::String as SmartString;
use std::{any::Any, panic::AssertUnwindSafe, time::Duration};
use tokio::{fs as tokio_fs, time};
mod cleanup;
mod state_machine;
pub(super) use cleanup::{
CleanupHandle, schedule_post_switch_failure, schedule_post_switch_success,
};
use state_machine::{CONFIG_APPLY_TIMEOUT, SAVE_PROFILES_TIMEOUT, SwitchStateMachine};
pub(super) use state_machine::{SwitchPanicInfo, SwitchStage};
pub(super) struct SwitchWorkflowResult {
pub success: bool,
pub cleanup: CleanupHandle,
}
pub(super) struct SwitchWorkflowError {
pub info: SwitchPanicInfo,
pub cleanup: CleanupHandle,
}
pub(super) async fn run_switch_job(
manager: &'static SwitchManager,
request: SwitchRequest,
) -> Result<SwitchWorkflowResult, SwitchWorkflowError> {
// Short-circuit cancelled jobs before we allocate resources or emit events.
if request.cancel_token().is_cancelled() {
logging!(
info,
Type::Cmd,
"Switch task {} cancelled before validation",
request.task_id()
);
let cleanup = schedule_post_switch_failure(
request.profile_id().clone(),
request.notify(),
request.task_id(),
);
return Ok(SwitchWorkflowResult {
success: false,
cleanup,
});
}
let profile_id = request.profile_id().clone();
let task_id = request.task_id();
let notify = request.notify();
if let Err(err) = validate_switch_request(task_id, profile_id.as_str()).await {
logging!(
warn,
Type::Cmd,
"Validation failed for switch task {} -> {}: {}",
task_id,
profile_id,
err
);
handle::Handle::notice_message("config_validate::error", err.clone());
let cleanup = schedule_post_switch_failure(profile_id.clone(), notify, task_id);
return Ok(SwitchWorkflowResult {
success: false,
cleanup,
});
}
logging!(
info,
Type::Cmd,
"Starting switch task {} for profile {} (notify={})",
task_id,
profile_id,
notify
);
let pipeline_request = request;
// The state machine owns the heavy lifting. We wrap it with timeout/panic guards so the driver never hangs.
let pipeline = async move {
let target_profile = pipeline_request.profile_id().clone();
SwitchStateMachine::new(
manager,
Some(pipeline_request),
IProfiles {
current: Some(target_profile),
items: None,
},
)
.run()
.await
};
match time::timeout(
SWITCH_JOB_TIMEOUT,
AssertUnwindSafe(pipeline).catch_unwind(),
)
.await
{
Err(_) => {
logging!(
error,
Type::Cmd,
"Profile switch task {} timed out after {:?}",
task_id,
SWITCH_JOB_TIMEOUT
);
handle::Handle::notice_message(
"config_validate::error",
format!("profile switch timed out: {}", profile_id),
);
let cleanup = schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Ok(SwitchWorkflowResult {
success: false,
cleanup,
})
}
Ok(Err(panic_payload)) => {
let panic_message = describe_panic_payload(panic_payload.as_ref());
logging!(
error,
Type::Cmd,
"Panic captured during profile switch task {} ({}): {}",
task_id,
profile_id,
panic_message
);
handle::Handle::notice_message(
"config_validate::panic",
format!("profile switch panic: {}", profile_id),
);
let cleanup = schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Err(SwitchWorkflowError {
info: SwitchPanicInfo::workflow_root(panic_message),
cleanup,
})
}
Ok(Ok(machine_result)) => match machine_result {
Ok(cmd_result) => match cmd_result {
Ok(success) => {
let cleanup =
schedule_post_switch_success(profile_id.clone(), success, notify, task_id);
Ok(SwitchWorkflowResult { success, cleanup })
}
Err(err) => {
logging!(
error,
Type::Cmd,
"Profile switch failed ({}): {}",
profile_id,
err
);
handle::Handle::notice_message("config_validate::error", err.clone());
let cleanup = schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Ok(SwitchWorkflowResult {
success: false,
cleanup,
})
}
},
Err(panic_info) => {
logging!(
error,
Type::Cmd,
"State machine panic during profile switch task {} ({} {:?}): {}",
task_id,
profile_id,
panic_info.stage,
panic_info.detail
);
handle::Handle::notice_message(
"config_validate::panic",
format!("profile switch panic: {}", profile_id),
);
let cleanup = schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Err(SwitchWorkflowError {
info: panic_info,
cleanup,
})
}
},
}
}
/// Allow patch operations (no driver request) to use the same state machine pipeline.
pub(super) async fn patch_profiles_config(profiles: IProfiles) -> CmdResult<bool> {
match SwitchStateMachine::new(manager(), None, profiles)
.run()
.await
{
Ok(result) => result,
Err(panic_info) => Err(format!(
"profile switch panic ({:?}): {}",
panic_info.stage, panic_info.detail
)
.into()),
}
}
/// Parse the target profile YAML on a background thread to catch syntax errors early.
pub(super) async fn validate_profile_yaml(profile: &SmartString) -> CmdResult<bool> {
let file_path = {
let profiles_guard = Config::profiles().await;
let profiles_data = profiles_guard.latest_ref();
match profiles_data.get_item(profile) {
Ok(item) => item.file.as_ref().and_then(|file| {
dirs::app_profiles_dir()
.ok()
.map(|dir| dir.join(file.as_str()))
}),
Err(e) => {
logging!(
error,
Type::Cmd,
"Failed to load target profile metadata: {}",
e
);
return Ok(false);
}
}
};
let Some(path) = file_path else {
return Ok(true);
};
if !path.exists() {
logging!(
error,
Type::Cmd,
"Target profile file does not exist: {}",
path.display()
);
handle::Handle::notice_message(
"config_validate::file_not_found",
format!("{}", path.display()),
);
return Ok(false);
}
let file_read_result =
time::timeout(Duration::from_secs(5), tokio_fs::read_to_string(&path)).await;
match file_read_result {
Ok(Ok(content)) => {
let yaml_parse_result = AsyncHandler::spawn_blocking(move || {
serde_yaml::from_str::<serde_yaml::Value>(&content)
})
.await;
match yaml_parse_result {
Ok(Ok(_)) => {
logging!(info, Type::Cmd, "Target profile YAML syntax is valid");
Ok(true)
}
Ok(Err(err)) => {
let error_msg = format!("{err}");
logging!(
error,
Type::Cmd,
"Target profile contains YAML syntax errors: {}",
error_msg
);
handle::Handle::notice_message(
"config_validate::yaml_syntax_error",
error_msg.clone(),
);
Ok(false)
}
Err(join_err) => {
let error_msg = format!("YAML parsing task failed: {join_err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::yaml_parse_error",
error_msg.clone(),
);
Ok(false)
}
}
}
Ok(Err(err)) => {
let error_msg = format!("Failed to read target profile file: {err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message("config_validate::file_read_error", error_msg.clone());
Ok(false)
}
Err(_) => {
let error_msg = "Timed out reading profile file (5s)".to_string();
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message("config_validate::file_read_timeout", error_msg.clone());
Err(error_msg.into())
}
}
}
/// Best-effort rollback invoked when a switch fails midway through the pipeline.
pub(super) async fn restore_previous_profile(previous: Option<SmartString>) -> CmdResult<()> {
if let Some(prev_profile) = previous {
logging!(
info,
Type::Cmd,
"Attempting to restore previous configuration: {}",
prev_profile
);
let restore_profiles = IProfiles {
current: Some(prev_profile),
items: None,
};
Config::profiles()
.await
.draft_mut()
.patch_config(restore_profiles)
.stringify_err()?;
if time::timeout(CONFIG_APPLY_TIMEOUT, async {
Config::profiles().await.apply();
})
.await
.is_err()
{
logging!(
warn,
Type::Cmd,
"Restoring previous configuration timed out after {:?}",
CONFIG_APPLY_TIMEOUT
);
return Ok(());
}
AsyncHandler::spawn(|| async move {
let save_future = AsyncHandler::spawn_blocking(|| {
futures::executor::block_on(async { profiles_save_file_safe().await })
});
match time::timeout(SAVE_PROFILES_TIMEOUT, save_future).await {
Ok(join_res) => match join_res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
logging!(
warn,
Type::Cmd,
"Failed to persist restored configuration asynchronously: {}",
err
);
}
Err(join_err) => {
logging!(warn, Type::Cmd, "Blocking save task failed: {}", join_err);
}
},
Err(_) => {
logging!(
warn,
Type::Cmd,
"Persisting restored configuration timed out after {:?}",
SAVE_PROFILES_TIMEOUT
);
}
}
});
}
Ok(())
}
pub(super) fn describe_panic_payload(payload: &(dyn Any + Send)) -> String {
if let Some(message) = payload.downcast_ref::<&str>() {
(*message).to_string()
} else if let Some(message) = payload.downcast_ref::<std::string::String>() {
message.clone()
} else {
"unknown panic".into()
}
}
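
Not part of the diff: describe_panic_payload only understands &str and String panic payloads and falls back to "unknown panic" for anything else; a test sketch documenting that contract could look like this.

#[cfg(test)]
mod panic_payload_tests {
    use super::*;

    #[test]
    fn extracts_str_and_string_messages() {
        // catch_unwind hands back a Box<dyn Any + Send>; both &str and String
        // payloads should surface their message verbatim.
        let as_str: Box<dyn Any + Send> = Box::new("boom");
        assert_eq!(describe_panic_payload(as_str.as_ref()), "boom");

        let as_string: Box<dyn Any + Send> = Box::new(std::string::String::from("kaboom"));
        assert_eq!(describe_panic_payload(as_string.as_ref()), "kaboom");

        // Anything else (e.g. a non-string panic payload) degrades gracefully.
        let other: Box<dyn Any + Send> = Box::new(42_u32);
        assert_eq!(describe_panic_payload(other.as_ref()), "unknown panic");
    }
}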

View File

@@ -0,0 +1,65 @@
use super::super::state::SWITCH_CLEANUP_TIMEOUT;
use crate::{core::handle, logging, process::AsyncHandler, utils::logging::Type};
use smartstring::alias::String as SmartString;
use tokio::time;
pub(crate) type CleanupHandle = tauri::async_runtime::JoinHandle<()>;
pub(crate) fn schedule_post_switch_success(
profile_id: SmartString,
success: bool,
notify: bool,
task_id: u64,
) -> CleanupHandle {
// Post-success cleanup runs detached from the driver so the queue keeps moving.
AsyncHandler::spawn(move || async move {
handle::Handle::notify_profile_switch_finished(
profile_id.clone(),
success,
notify,
task_id,
);
if success {
close_connections_after_switch(profile_id).await;
}
})
}
pub(crate) fn schedule_post_switch_failure(
profile_id: SmartString,
notify: bool,
task_id: u64,
) -> CleanupHandle {
// Failures or cancellations do not alter the active profile, so skip draining live connections.
AsyncHandler::spawn(move || async move {
handle::Handle::notify_profile_switch_finished(profile_id.clone(), false, notify, task_id);
})
}
async fn close_connections_after_switch(profile_id: SmartString) {
match time::timeout(SWITCH_CLEANUP_TIMEOUT, async {
handle::Handle::mihomo().await.close_all_connections().await
})
.await
{
Ok(Ok(())) => {}
Ok(Err(err)) => {
logging!(
warn,
Type::Cmd,
"Failed to close connections after profile switch ({}): {}",
profile_id,
err
);
}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Closing connections after profile switch ({}) timed out after {:?}",
profile_id,
SWITCH_CLEANUP_TIMEOUT
);
}
}
}

View File

@@ -0,0 +1,178 @@
use super::{CmdResult, core::SwitchStage};
use crate::{
cmd::profile_switch::state::{
SwitchCancellation, SwitchHeartbeat, SwitchManager, SwitchRequest, SwitchScope,
},
config::IProfiles,
logging,
utils::logging::Type,
};
use smartstring::alias::String as SmartString;
use tokio::sync::MutexGuard;
pub(super) struct SwitchContext {
pub(super) manager: &'static SwitchManager,
pub(super) request: Option<SwitchRequest>,
pub(super) profiles_patch: Option<IProfiles>,
pub(super) sequence: Option<u64>,
pub(super) target_profile: Option<SmartString>,
pub(super) previous_profile: Option<SmartString>,
pub(super) new_profile_for_event: Option<SmartString>,
pub(super) switch_scope: Option<SwitchScope<'static>>,
pub(super) core_guard: Option<MutexGuard<'static, ()>>,
pub(super) heartbeat: SwitchHeartbeat,
pub(super) task_id: Option<u64>,
pub(super) profile_label: SmartString,
pub(super) active_stage: SwitchStage,
}
impl SwitchContext {
// Captures all mutable data required across states (locks, profile ids, etc).
pub(super) fn new(
manager: &'static SwitchManager,
request: Option<SwitchRequest>,
profiles: IProfiles,
heartbeat: SwitchHeartbeat,
) -> Self {
let task_id = request.as_ref().map(|req| req.task_id());
let profile_label = request
.as_ref()
.map(|req| req.profile_id().clone())
.or_else(|| profiles.current.clone())
.unwrap_or_else(|| SmartString::from("unknown"));
heartbeat.touch();
Self {
manager,
request,
profiles_patch: Some(profiles),
sequence: None,
target_profile: None,
previous_profile: None,
new_profile_for_event: None,
switch_scope: None,
core_guard: None,
heartbeat,
task_id,
profile_label,
active_stage: SwitchStage::Start,
}
}
pub(super) fn ensure_target_profile(&mut self) {
// Lazily determine which profile we're switching to so shared paths (patch vs. driver) behave the same.
if let Some(patch) = self.profiles_patch.as_mut() {
if patch.current.is_none()
&& let Some(request) = self.request.as_ref()
{
patch.current = Some(request.profile_id().clone());
}
self.target_profile = patch.current.clone();
}
}
pub(super) fn take_profiles_patch(&mut self) -> CmdResult<IProfiles> {
self.profiles_patch
.take()
.ok_or_else(|| "profiles patch already consumed".into())
}
pub(super) fn cancel_token(&self) -> Option<SwitchCancellation> {
self.request.as_ref().map(|req| req.cancel_token().clone())
}
pub(super) fn cancelled(&self) -> bool {
self.request
.as_ref()
.map(|req| req.cancel_token().is_cancelled())
.unwrap_or(false)
}
pub(super) fn log_cancelled(&self, stage: &str) {
if let Some(request) = self.request.as_ref() {
logging!(
info,
Type::Cmd,
"Switch task {} cancelled {}; profile={}",
request.task_id(),
stage,
request.profile_id()
);
} else {
logging!(info, Type::Cmd, "Profile switch cancelled {}", stage);
}
}
pub(super) fn should_validate_target(&self) -> bool {
match (&self.target_profile, &self.previous_profile) {
(Some(target), Some(current)) => current != target,
(Some(_), None) => true,
_ => false,
}
}
pub(super) fn stale(&self) -> bool {
self.sequence
.map(|seq| seq < self.manager.latest_request_sequence())
.unwrap_or(false)
}
pub(super) fn sequence(&self) -> u64 {
self.sequence.unwrap_or_else(|| {
logging!(
warn,
Type::Cmd,
"Sequence unexpectedly missing in switch context; defaulting to 0"
);
0
})
}
pub(super) fn record_stage(&mut self, stage: SwitchStage) {
let since_last = self.heartbeat.elapsed();
let previous = self.active_stage;
self.active_stage = stage;
self.heartbeat.set_stage(stage.as_code());
match self.task_id {
Some(task_id) => logging!(
debug,
Type::Cmd,
"Switch task {} (profile={}) transitioned {:?} -> {:?} after {:?}",
task_id,
self.profile_label,
previous,
stage,
since_last
),
None => logging!(
debug,
Type::Cmd,
"Profile patch {} transitioned {:?} -> {:?} after {:?}",
self.profile_label,
previous,
stage,
since_last
),
}
}
pub(super) fn release_core_guard(&mut self) {
self.core_guard = None;
}
pub(super) fn release_switch_scope(&mut self) {
self.switch_scope = None;
}
pub(super) fn release_locks(&mut self) {
self.release_core_guard();
self.release_switch_scope();
}
}
impl Drop for SwitchContext {
fn drop(&mut self) {
self.core_guard.take();
self.switch_scope.take();
}
}
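
Not part of the diff: a test sketch pinning the rule implemented by ensure_target_profile and should_validate_target (re-selecting the already-active profile skips YAML validation, switching to a different one does not).

#[cfg(test)]
mod context_tests {
    use super::*;
    use crate::cmd::profile_switch::state::manager;

    #[test]
    fn validation_is_skipped_when_target_equals_previous() {
        let profiles = IProfiles {
            current: Some(SmartString::from("profile-a")),
            items: None,
        };
        let mut ctx = SwitchContext::new(manager(), None, profiles, SwitchHeartbeat::new());
        ctx.ensure_target_profile();

        // Same profile as the one already active: no YAML validation needed.
        ctx.previous_profile = Some(SmartString::from("profile-a"));
        assert!(!ctx.should_validate_target());

        // Different profile: the target must be validated before patching the draft.
        ctx.previous_profile = Some(SmartString::from("profile-b"));
        assert!(ctx.should_validate_target());
    }
}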

View File

@@ -0,0 +1,284 @@
use super::{CmdResult, context::SwitchContext, describe_panic_payload};
use crate::{
cmd::profile_switch::state::{SwitchHeartbeat, SwitchManager, SwitchRequest},
config::IProfiles,
logging,
utils::logging::Type,
};
use futures::FutureExt;
use std::{
mem,
panic::AssertUnwindSafe,
time::{Duration, Instant},
};
pub(crate) const CONFIG_APPLY_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const TRAY_UPDATE_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const REFRESH_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const SAVE_PROFILES_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const SWITCH_IDLE_WAIT_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) const SWITCH_IDLE_WAIT_POLL: Duration = Duration::from_millis(25);
pub(crate) const SWITCH_IDLE_WAIT_MAX_BACKOFF: Duration = Duration::from_millis(250);
/// Explicit state machine for profile switching so we can reason about
/// cancellation, stale requests, and side effects at each stage.
pub(crate) struct SwitchStateMachine {
pub(super) ctx: SwitchContext,
state: SwitchState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SwitchStage {
Start,
AcquireCore,
Prepare,
ValidateTarget,
PatchDraft,
UpdateCore,
Finalize,
Workflow,
DriverTask,
}
impl SwitchStage {
pub(crate) fn as_code(self) -> u32 {
match self {
SwitchStage::Start => 0,
SwitchStage::AcquireCore => 1,
SwitchStage::Prepare => 2,
SwitchStage::ValidateTarget => 3,
SwitchStage::PatchDraft => 4,
SwitchStage::UpdateCore => 5,
SwitchStage::Finalize => 6,
SwitchStage::Workflow => 7,
SwitchStage::DriverTask => 8,
}
}
pub(crate) fn from_code(code: u32) -> Option<Self> {
Some(match code {
0 => SwitchStage::Start,
1 => SwitchStage::AcquireCore,
2 => SwitchStage::Prepare,
3 => SwitchStage::ValidateTarget,
4 => SwitchStage::PatchDraft,
5 => SwitchStage::UpdateCore,
6 => SwitchStage::Finalize,
7 => SwitchStage::Workflow,
8 => SwitchStage::DriverTask,
_ => return None,
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct SwitchPanicInfo {
pub(crate) stage: SwitchStage,
pub(crate) detail: String,
}
impl SwitchPanicInfo {
pub(crate) fn new(stage: SwitchStage, detail: String) -> Self {
Self { stage, detail }
}
pub(crate) fn workflow_root(detail: String) -> Self {
Self::new(SwitchStage::Workflow, detail)
}
pub(crate) fn driver_task(detail: String) -> Self {
Self::new(SwitchStage::DriverTask, detail)
}
}
/// High-level state machine nodes executed in strict sequence.
pub(crate) enum SwitchState {
Start,
AcquireCore,
Prepare,
ValidateTarget,
PatchDraft,
UpdateCore,
Finalize(CoreUpdateOutcome),
Complete(bool),
}
/// Result of trying to apply the draft configuration to the core.
pub(crate) enum CoreUpdateOutcome {
Success,
ValidationFailed { message: String },
CoreError { message: String },
Timeout,
}
/// Indicates where a stale request was detected so logs stay descriptive.
pub(crate) enum StaleStage {
AfterLock,
BeforeCoreOperation,
BeforeCoreInteraction,
AfterCoreOperation,
}
impl StaleStage {
pub(super) fn log(&self, ctx: &SwitchContext) {
let sequence = ctx.sequence();
let latest = ctx.manager.latest_request_sequence();
match self {
StaleStage::AfterLock => logging!(
info,
Type::Cmd,
"Detected a newer request after acquiring the lock (sequence: {} < {}), abandoning current request",
sequence,
latest
),
StaleStage::BeforeCoreOperation => logging!(
info,
Type::Cmd,
"Detected a newer request before core operation (sequence: {} < {}), abandoning current request",
sequence,
latest
),
StaleStage::BeforeCoreInteraction => logging!(
info,
Type::Cmd,
"Detected a newer request before core interaction (sequence: {} < {}), abandoning current request",
sequence,
latest
),
StaleStage::AfterCoreOperation => logging!(
info,
Type::Cmd,
"Detected a newer request after core operation (sequence: {} < {}), ignoring current result",
sequence,
latest
),
}
}
}
impl SwitchStateMachine {
pub(crate) fn new(
manager: &'static SwitchManager,
request: Option<SwitchRequest>,
profiles: IProfiles,
) -> Self {
let heartbeat = request
.as_ref()
.map(|req| req.heartbeat().clone())
.unwrap_or_else(SwitchHeartbeat::new);
Self {
ctx: SwitchContext::new(manager, request, profiles, heartbeat),
state: SwitchState::Start,
}
}
pub(crate) async fn run(mut self) -> Result<CmdResult<bool>, SwitchPanicInfo> {
// Drive the state machine until we either complete successfully or bubble up a panic.
loop {
let current_state = mem::replace(&mut self.state, SwitchState::Complete(false));
match current_state {
SwitchState::Complete(result) => return Ok(Ok(result)),
_ => match self.run_state(current_state).await? {
Ok(state) => self.state = state,
Err(err) => return Ok(Err(err)),
},
}
}
}
async fn run_state(
&mut self,
current: SwitchState,
) -> Result<CmdResult<SwitchState>, SwitchPanicInfo> {
match current {
SwitchState::Start => {
self.with_stage(
SwitchStage::Start,
|this| async move { this.handle_start() },
)
.await
}
SwitchState::AcquireCore => {
self.with_stage(SwitchStage::AcquireCore, |this| async move {
this.handle_acquire_core().await
})
.await
}
SwitchState::Prepare => {
self.with_stage(SwitchStage::Prepare, |this| async move {
this.handle_prepare().await
})
.await
}
SwitchState::ValidateTarget => {
self.with_stage(SwitchStage::ValidateTarget, |this| async move {
this.handle_validate_target().await
})
.await
}
SwitchState::PatchDraft => {
self.with_stage(SwitchStage::PatchDraft, |this| async move {
this.handle_patch_draft().await
})
.await
}
SwitchState::UpdateCore => {
self.with_stage(SwitchStage::UpdateCore, |this| async move {
this.handle_update_core().await
})
.await
}
SwitchState::Finalize(outcome) => {
self.with_stage(SwitchStage::Finalize, |this| async move {
this.handle_finalize(outcome).await
})
.await
}
SwitchState::Complete(result) => Ok(Ok(SwitchState::Complete(result))),
}
}
/// Helper that wraps each stage with consistent logging and panic reporting.
async fn with_stage<'a, F, Fut>(
&'a mut self,
stage: SwitchStage,
f: F,
) -> Result<CmdResult<SwitchState>, SwitchPanicInfo>
where
F: FnOnce(&'a mut Self) -> Fut,
Fut: std::future::Future<Output = CmdResult<SwitchState>> + 'a,
{
let sequence = self.ctx.sequence();
let task = self.ctx.task_id;
let profile = self.ctx.profile_label.clone();
logging!(
info,
Type::Cmd,
"Enter {:?} (sequence={}, task={:?}, profile={})",
stage,
sequence,
task,
profile
);
let stage_start = Instant::now();
self.ctx.record_stage(stage);
AssertUnwindSafe(f(self))
.catch_unwind()
.await
.map_err(|payload| {
SwitchPanicInfo::new(stage, describe_panic_payload(payload.as_ref()))
})
.inspect(|_| {
logging!(
info,
Type::Cmd,
"Exit {:?} (sequence={}, task={:?}, profile={}, elapsed={}ms)",
stage,
sequence,
task,
profile,
stage_start.elapsed().as_millis()
);
})
}
}
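
Not part of the diff: the heartbeat stores stages as raw u32 codes, so a round-trip test like this sketch keeps as_code and from_code in lockstep whenever a stage is added.

#[cfg(test)]
mod stage_code_tests {
    use super::*;

    #[test]
    fn stage_codes_round_trip() {
        for stage in [
            SwitchStage::Start,
            SwitchStage::AcquireCore,
            SwitchStage::Prepare,
            SwitchStage::ValidateTarget,
            SwitchStage::PatchDraft,
            SwitchStage::UpdateCore,
            SwitchStage::Finalize,
            SwitchStage::Workflow,
            SwitchStage::DriverTask,
        ] {
            assert_eq!(SwitchStage::from_code(stage.as_code()), Some(stage));
        }
        // Unknown codes map to None so callers fall back to a default stage.
        assert_eq!(SwitchStage::from_code(99), None);
    }
}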

View File

@@ -0,0 +1,11 @@
mod context;
mod core;
mod stages;
pub(crate) use self::core::{
CONFIG_APPLY_TIMEOUT, SAVE_PROFILES_TIMEOUT, SwitchPanicInfo, SwitchStage, SwitchStateMachine,
};
pub(super) use super::{
CmdResult, describe_panic_payload, restore_previous_profile, validate_profile_yaml,
};

View File

@@ -0,0 +1,597 @@
use super::{
CmdResult,
core::{
CONFIG_APPLY_TIMEOUT, CoreUpdateOutcome, REFRESH_TIMEOUT, SAVE_PROFILES_TIMEOUT,
SWITCH_IDLE_WAIT_MAX_BACKOFF, SWITCH_IDLE_WAIT_POLL, SWITCH_IDLE_WAIT_TIMEOUT, StaleStage,
SwitchState, SwitchStateMachine, TRAY_UPDATE_TIMEOUT,
},
restore_previous_profile, validate_profile_yaml,
};
use crate::{
config::{Config, profiles::profiles_save_file_safe},
core::{CoreManager, handle, tray::Tray},
logging,
process::AsyncHandler,
utils::logging::Type,
};
use anyhow::Error;
use futures::future;
use smartstring::alias::String as SmartString;
use std::{
pin::Pin,
time::{Duration, Instant},
};
use tokio::time;
impl SwitchStateMachine {
pub(super) fn handle_start(&mut self) -> CmdResult<SwitchState> {
if self.ctx.manager.is_switching() {
logging!(
info,
Type::Cmd,
"Profile switch already in progress; queuing request for task={:?}, profile={}",
self.ctx.task_id,
self.ctx.profile_label
);
}
Ok(SwitchState::AcquireCore)
}
/// Grab the core lock, mark the manager as switching, and compute the target profile.
pub(super) async fn handle_acquire_core(&mut self) -> CmdResult<SwitchState> {
let manager = self.ctx.manager;
let core_guard = manager.core_mutex().lock().await;
if manager.is_switching() {
logging!(
info,
Type::Cmd,
"Active profile switch detected; waiting before acquiring scope"
);
let wait_start = Instant::now();
let mut backoff = SWITCH_IDLE_WAIT_POLL;
while manager.is_switching() {
if self.ctx.cancelled() {
self.ctx
.log_cancelled("while waiting for active switch to finish");
return Ok(SwitchState::Complete(false));
}
if wait_start.elapsed() >= SWITCH_IDLE_WAIT_TIMEOUT {
let message = format!(
"Timed out after {:?} waiting for active profile switch to finish",
SWITCH_IDLE_WAIT_TIMEOUT
);
logging!(error, Type::Cmd, "{}", message);
return Err(message.into());
}
time::sleep(backoff).await;
backoff = backoff.saturating_mul(2).min(SWITCH_IDLE_WAIT_MAX_BACKOFF);
}
let waited = wait_start.elapsed().as_millis();
if waited > 0 {
logging!(
info,
Type::Cmd,
"Waited {}ms for active switch to finish before acquiring scope",
waited
);
}
}
self.ctx.core_guard = Some(core_guard);
self.ctx.switch_scope = Some(manager.begin_switch());
self.ctx.sequence = Some(manager.next_request_sequence());
self.ctx.ensure_target_profile();
logging!(
info,
Type::Cmd,
"Begin modifying configuration; sequence: {}, target profile: {:?}",
self.ctx.sequence(),
self.ctx.target_profile
);
if self.ctx.cancelled() {
self.ctx.log_cancelled("after acquiring core lock");
return Ok(SwitchState::Complete(false));
}
if self.ctx.stale() {
StaleStage::AfterLock.log(&self.ctx);
return Ok(SwitchState::Complete(false));
}
Ok(SwitchState::Prepare)
}
pub(super) async fn handle_prepare(&mut self) -> CmdResult<SwitchState> {
let current_profile = {
let profiles_guard = Config::profiles().await;
profiles_guard.latest_ref().current.clone()
};
logging!(info, Type::Cmd, "Current profile: {:?}", current_profile);
self.ctx.previous_profile = current_profile;
Ok(SwitchState::ValidateTarget)
}
pub(super) async fn handle_validate_target(&mut self) -> CmdResult<SwitchState> {
if self.ctx.cancelled() {
self.ctx.log_cancelled("before validation");
return Ok(SwitchState::Complete(false));
}
if self.ctx.should_validate_target() {
let Some(target) = self.ctx.target_profile.clone() else {
logging!(
error,
Type::Cmd,
"Missing target profile while validation was requested; aborting switch"
);
return Err("missing target profile at validation".into());
};
if !validate_profile_yaml(&target).await? {
return Ok(SwitchState::Complete(false));
}
}
if self.ctx.stale() {
StaleStage::BeforeCoreOperation.log(&self.ctx);
return Ok(SwitchState::Complete(false));
}
Ok(SwitchState::PatchDraft)
}
pub(super) async fn handle_patch_draft(&mut self) -> CmdResult<SwitchState> {
if self.ctx.cancelled() {
self.ctx.log_cancelled("before patching configuration");
return Ok(SwitchState::Complete(false));
}
logging!(
info,
Type::Cmd,
"Updating configuration draft, sequence: {}",
self.ctx.sequence()
);
let patch = self.ctx.take_profiles_patch()?;
self.ctx.new_profile_for_event = patch.current.clone();
let _ = Config::profiles().await.draft_mut().patch_config(patch);
if self.ctx.stale() {
StaleStage::BeforeCoreInteraction.log(&self.ctx);
Config::profiles().await.discard();
return Ok(SwitchState::Complete(false));
}
Ok(SwitchState::UpdateCore)
}
pub(super) async fn handle_update_core(&mut self) -> CmdResult<SwitchState> {
let sequence = self.ctx.sequence();
let task_id = self.ctx.task_id;
let profile = self.ctx.profile_label.clone();
logging!(
info,
Type::Cmd,
"Starting core configuration update, sequence: {}, task={:?}, profile={}",
sequence,
task_id,
profile
);
let heartbeat = self.ctx.heartbeat.clone();
let start = Instant::now();
let mut ticker = time::interval(Duration::from_secs(1));
ticker.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
let update_future = CoreManager::global().update_config();
tokio::pin!(update_future);
let timeout = time::sleep(Duration::from_secs(30));
tokio::pin!(timeout);
let cancel_token = self.ctx.cancel_token();
let mut cancel_notifier: Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
match cancel_token {
Some(token) => Box::pin(async move {
token.cancelled_future().await;
}),
None => Box::pin(future::pending()),
};
enum UpdateOutcome {
Finished(Result<(bool, SmartString), Error>),
Timeout,
Cancelled,
}
let update_outcome = loop {
tokio::select! {
res = &mut update_future => break UpdateOutcome::Finished(res),
_ = &mut timeout => break UpdateOutcome::Timeout,
_ = &mut cancel_notifier => break UpdateOutcome::Cancelled,
_ = ticker.tick() => {
let elapsed_ms = start.elapsed().as_millis();
heartbeat.touch();
match task_id {
Some(id) => logging!(
debug,
Type::Cmd,
"Switch task {} (profile={}) UpdateCore still running (elapsed={}ms)",
id,
profile,
elapsed_ms
),
None => logging!(
debug,
Type::Cmd,
"Profile patch {} UpdateCore still running (elapsed={}ms)",
profile,
elapsed_ms
),
}
}
}
};
let elapsed_ms = start.elapsed().as_millis();
let outcome = match update_outcome {
UpdateOutcome::Finished(Ok((true, _))) => {
logging!(
info,
Type::Cmd,
"Core configuration update succeeded in {}ms",
elapsed_ms
);
CoreUpdateOutcome::Success
}
UpdateOutcome::Finished(Ok((false, msg))) => {
logging!(
warn,
Type::Cmd,
"Core configuration update validation failed in {}ms: {}",
elapsed_ms,
msg
);
CoreUpdateOutcome::ValidationFailed {
message: msg.to_string(),
}
}
UpdateOutcome::Finished(Err(err)) => {
logging!(
error,
Type::Cmd,
"Core configuration update errored in {}ms: {}",
elapsed_ms,
err
);
CoreUpdateOutcome::CoreError {
message: err.to_string(),
}
}
UpdateOutcome::Timeout => {
logging!(
error,
Type::Cmd,
"Core configuration update timed out after {}ms",
elapsed_ms
);
CoreUpdateOutcome::Timeout
}
UpdateOutcome::Cancelled => {
self.ctx.log_cancelled("during core update");
logging!(
info,
Type::Cmd,
"Core configuration update cancelled after {}ms",
elapsed_ms
);
self.ctx.release_locks();
Config::profiles().await.discard();
return Ok(SwitchState::Complete(false));
}
};
self.ctx.release_core_guard();
Ok(SwitchState::Finalize(outcome))
}
pub(super) async fn handle_finalize(
&mut self,
outcome: CoreUpdateOutcome,
) -> CmdResult<SwitchState> {
let next_state = match outcome {
CoreUpdateOutcome::Success => self.finalize_success().await,
CoreUpdateOutcome::ValidationFailed { message } => {
self.finalize_validation_failed(message).await
}
CoreUpdateOutcome::CoreError { message } => self.finalize_core_error(message).await,
CoreUpdateOutcome::Timeout => self.finalize_timeout().await,
};
if next_state.is_err() || matches!(next_state, Ok(SwitchState::Complete(_))) {
self.ctx.release_switch_scope();
}
next_state
}
pub(super) async fn finalize_success(&mut self) -> CmdResult<SwitchState> {
if self.abort_if_stale_post_core().await? {
return Ok(SwitchState::Complete(false));
}
self.log_successful_update();
if !self.apply_config_with_timeout().await? {
logging!(
warn,
Type::Cmd,
"Apply step failed; attempting to restore previous profile before completing"
);
restore_previous_profile(self.ctx.previous_profile.clone()).await?;
return Ok(SwitchState::Complete(false));
}
self.refresh_clash_with_timeout().await;
self.update_tray_tooltip_with_timeout().await;
self.update_tray_menu_with_timeout().await;
if let Err(err) = self.persist_profiles_with_timeout().await {
logging!(
error,
Type::Cmd,
"Persisting new profile configuration failed; attempting to restore previous profile: {}",
err
);
restore_previous_profile(self.ctx.previous_profile.clone()).await?;
return Err(err);
}
self.emit_profile_change_event();
logging!(
debug,
Type::Cmd,
"Finalize success pipeline completed for sequence {}",
self.ctx.sequence()
);
Ok(SwitchState::Complete(true))
}
pub(super) async fn finalize_validation_failed(
&mut self,
message: String,
) -> CmdResult<SwitchState> {
logging!(
warn,
Type::Cmd,
"Configuration validation failed: {}",
message
);
Config::profiles().await.discard();
restore_previous_profile(self.ctx.previous_profile.clone()).await?;
handle::Handle::notice_message("config_validate::error", message);
Ok(SwitchState::Complete(false))
}
pub(super) async fn finalize_core_error(&mut self, message: String) -> CmdResult<SwitchState> {
logging!(
warn,
Type::Cmd,
"Error occurred during update: {}, sequence: {}",
message,
self.ctx.sequence()
);
Config::profiles().await.discard();
handle::Handle::notice_message("config_validate::boot_error", message);
Ok(SwitchState::Complete(false))
}
pub(super) async fn finalize_timeout(&mut self) -> CmdResult<SwitchState> {
let timeout_msg =
"Configuration update timed out (30s); possible validation or core communication stall";
logging!(
error,
Type::Cmd,
"{}, sequence: {}",
timeout_msg,
self.ctx.sequence()
);
Config::profiles().await.discard();
restore_previous_profile(self.ctx.previous_profile.clone()).await?;
handle::Handle::notice_message("config_validate::timeout", timeout_msg);
Ok(SwitchState::Complete(false))
}
pub(super) async fn abort_if_stale_post_core(&mut self) -> CmdResult<bool> {
if self.ctx.stale() {
StaleStage::AfterCoreOperation.log(&self.ctx);
Config::profiles().await.discard();
return Ok(true);
}
Ok(false)
}
pub(super) fn log_successful_update(&self) {
logging!(
info,
Type::Cmd,
"Configuration update succeeded, sequence: {}",
self.ctx.sequence()
);
}
pub(super) async fn apply_config_with_timeout(&mut self) -> CmdResult<bool> {
let apply_result = time::timeout(CONFIG_APPLY_TIMEOUT, async {
Config::profiles().await.apply()
})
.await;
if apply_result.is_ok() {
Ok(true)
} else {
logging!(
warn,
Type::Cmd,
"Applying profile configuration timed out after {:?}",
CONFIG_APPLY_TIMEOUT
);
Config::profiles().await.discard();
Ok(false)
}
}
pub(super) async fn refresh_clash_with_timeout(&self) {
let start = Instant::now();
let result = time::timeout(REFRESH_TIMEOUT, async {
handle::Handle::refresh_clash();
})
.await;
let elapsed = start.elapsed().as_millis();
match result {
Ok(_) => logging!(
debug,
Type::Cmd,
"refresh_clash_with_timeout completed in {}ms",
elapsed
),
Err(_) => logging!(
warn,
Type::Cmd,
"Refreshing Clash state timed out after {:?} (elapsed={}ms)",
REFRESH_TIMEOUT,
elapsed
),
}
}
pub(super) async fn update_tray_tooltip_with_timeout(&self) {
let start = Instant::now();
let update_tooltip = time::timeout(TRAY_UPDATE_TIMEOUT, async {
Tray::global().update_tooltip().await
})
.await;
let elapsed = start.elapsed().as_millis();
if update_tooltip.is_err() {
logging!(
warn,
Type::Cmd,
"Updating tray tooltip timed out after {:?} (elapsed={}ms)",
TRAY_UPDATE_TIMEOUT,
elapsed
);
} else if let Ok(Err(err)) = update_tooltip {
logging!(
warn,
Type::Cmd,
"Failed to update tray tooltip asynchronously: {}",
err
);
} else {
logging!(
debug,
Type::Cmd,
"update_tray_tooltip_with_timeout completed in {}ms",
elapsed
);
}
}
pub(super) async fn update_tray_menu_with_timeout(&self) {
let start = Instant::now();
let update_menu = time::timeout(TRAY_UPDATE_TIMEOUT, async {
Tray::global().update_menu().await
})
.await;
let elapsed = start.elapsed().as_millis();
if update_menu.is_err() {
logging!(
warn,
Type::Cmd,
"Updating tray menu timed out after {:?} (elapsed={}ms)",
TRAY_UPDATE_TIMEOUT,
elapsed
);
} else if let Ok(Err(err)) = update_menu {
logging!(
warn,
Type::Cmd,
"Failed to update tray menu asynchronously: {}",
err
);
} else {
logging!(
debug,
Type::Cmd,
"update_tray_menu_with_timeout completed in {}ms",
elapsed
);
}
}
pub(super) async fn persist_profiles_with_timeout(&self) -> CmdResult<()> {
let start = Instant::now();
let save_future = AsyncHandler::spawn_blocking(|| {
futures::executor::block_on(async { profiles_save_file_safe().await })
});
let save_result = time::timeout(SAVE_PROFILES_TIMEOUT, save_future).await;
let elapsed = start.elapsed().as_millis();
match save_result {
Err(_) => {
let message = format!(
"Persisting configuration file timed out after {:?} (elapsed={}ms)",
SAVE_PROFILES_TIMEOUT, elapsed
);
logging!(warn, Type::Cmd, "{}", message);
Err(message.into())
}
Ok(join_result) => match join_result {
Err(join_err) => {
let message = format!(
"Persisting configuration file failed: blocking task join error: {join_err}"
);
logging!(error, Type::Cmd, "{}", message);
Err(message.into())
}
Ok(save_result) => match save_result {
Ok(()) => {
logging!(
debug,
Type::Cmd,
"persist_profiles_with_timeout completed in {}ms",
elapsed
);
Ok(())
}
Err(err) => {
let message = format!("Persisting configuration file failed: {}", err);
logging!(error, Type::Cmd, "{}", message);
Err(message.into())
}
},
},
}
}
pub(super) fn emit_profile_change_event(&self) {
if let Some(current) = self.ctx.new_profile_for_event.clone() {
logging!(
info,
Type::Cmd,
"Emitting configuration change event to frontend: {}, sequence: {}",
current,
self.ctx.sequence()
);
handle::Handle::notify_profile_changed(current);
}
}
}
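
Not part of the diff: a small sanity-test sketch for the entry transition. Queuing and pre-emption live in the driver, so Start itself never blocks and always advances to AcquireCore; the import paths below assume the module layout introduced in this PR.

#[cfg(test)]
mod transition_tests {
    use super::*;
    use crate::cmd::profile_switch::state::manager;
    use crate::config::IProfiles;

    #[test]
    fn start_advances_to_acquire_core() {
        // Happy path: Start -> AcquireCore -> Prepare -> ValidateTarget
        // -> PatchDraft -> UpdateCore -> Finalize -> Complete. Only the first
        // hop is cheap enough to exercise without a running core.
        let mut machine = SwitchStateMachine::new(
            manager(),
            None,
            IProfiles {
                current: None,
                items: None,
            },
        );
        assert!(matches!(
            machine.handle_start(),
            Ok(SwitchState::AcquireCore)
        ));
    }
}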

View File

@@ -1,7 +1,14 @@
use crate::{APP_HANDLE, constants::timing, singleton};
use crate::{
APP_HANDLE, config::Config, constants::timing, logging, singleton, utils::logging::Type,
};
use parking_lot::RwLock;
use serde_json::{Value, json};
use smartstring::alias::String;
use std::{sync::Arc, thread};
use std::{
sync::Arc,
thread,
time::{SystemTime, UNIX_EPOCH},
};
use tauri::{AppHandle, Manager, WebviewWindow};
use tauri_plugin_mihomo::{Mihomo, MihomoExt};
use tokio::sync::RwLockReadGuard;
@@ -66,10 +73,14 @@ impl Handle {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshClash);
{
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::RefreshClash);
}
}
Self::spawn_proxy_snapshot();
}
pub fn refresh_verge() {
@@ -85,11 +96,37 @@ impl Handle {
}
pub fn notify_profile_changed(profile_id: String) {
Self::send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
let handle = Self::global();
if handle.is_exiting() {
return;
}
let system_opt = handle.notification_system.read();
if let Some(system) = system_opt.as_ref() {
system.send_event(FrontendEvent::ProfileChanged {
current_profile_id: profile_id,
});
}
}
pub fn notify_profile_switch_finished(
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
) {
Self::send_event(FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
task_id,
});
}
pub fn notify_rust_panic(message: String, location: String) {
Self::send_event(FrontendEvent::RustPanic { message, location });
}
pub fn notify_timer_updated(profile_index: String) {
Self::send_event(FrontendEvent::TimerUpdated { profile_index });
}
@@ -100,6 +137,86 @@ impl Handle {
pub fn notify_profile_update_completed(uid: String) {
Self::send_event(FrontendEvent::ProfileUpdateCompleted { uid });
Self::spawn_proxy_snapshot();
}
pub fn notify_proxies_updated(payload: Value) {
Self::send_event(FrontendEvent::ProxiesUpdated { payload });
}
pub async fn build_proxy_snapshot() -> Option<Value> {
let mihomo_guard = Self::mihomo().await;
let proxies = match mihomo_guard.get_proxies().await {
Ok(data) => match serde_json::to_value(&data) {
Ok(value) => value,
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to serialize proxies snapshot: {error}"
);
return None;
}
},
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to fetch proxies for snapshot: {error}"
);
return None;
}
};
drop(mihomo_guard);
let providers_guard = Self::mihomo().await;
let providers_value = match providers_guard.get_proxy_providers().await {
Ok(data) => serde_json::to_value(&data).unwrap_or_else(|error| {
logging!(
warn,
Type::Frontend,
"Failed to serialize proxy providers for snapshot: {error}"
);
Value::Null
}),
Err(error) => {
logging!(
warn,
Type::Frontend,
"Failed to fetch proxy providers for snapshot: {error}"
);
Value::Null
}
};
drop(providers_guard);
let profile_guard = Config::profiles().await;
let profile_id = profile_guard.latest_ref().current.clone();
drop(profile_guard);
let emitted_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as i64)
.unwrap_or(0);
let payload = json!({
"proxies": proxies,
"providers": providers_value,
"profileId": profile_id,
"emittedAt": emitted_at,
});
Some(payload)
}
fn spawn_proxy_snapshot() {
tauri::async_runtime::spawn(async {
if let Some(payload) = Handle::build_proxy_snapshot().await {
Handle::notify_proxies_updated(payload);
}
});
}
pub fn notice_message<S: Into<String>, M: Into<String>>(status: S, msg: M) {
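
Not part of the diff: for readers of the frontend proxy store, the snapshot payload assembled by build_proxy_snapshot and delivered through FrontendEvent::ProxiesUpdated has the shape sketched below. Field names mirror the json! call above; the struct itself is purely illustrative and does not exist in this PR.

use serde::Deserialize;
use serde_json::Value;

// Illustrative mirror of the snapshot payload emitted to the frontend.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProxySnapshotPayload {
    /// Raw `get_proxies` response, serialized as-is.
    proxies: Value,
    /// Raw `get_proxy_providers` response, or `Value::Null` when it could not be fetched.
    providers: Value,
    /// Currently selected profile id, if any.
    profile_id: Option<String>,
    /// Unix timestamp in milliseconds at which the snapshot was built.
    emitted_at: i64,
}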

View File

@@ -10,7 +10,10 @@ use anyhow::{Result, anyhow};
use smartstring::alias::String;
use std::{path::PathBuf, time::Instant};
use tauri_plugin_mihomo::Error as MihomoError;
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
const RELOAD_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const MAX_RELOAD_ATTEMPTS: usize = 3;
impl CoreManager {
pub async fn use_default_config(&self, error_key: &str, error_msg: &str) -> Result<()> {
@@ -39,12 +42,38 @@ impl CoreManager {
return Ok((true, String::new()));
}
let start = Instant::now();
let _permit = self
.update_semaphore
.try_acquire()
.map_err(|_| anyhow!("Config update already in progress"))?;
self.perform_config_update().await
let result = self.perform_config_update().await;
match &result {
Ok((success, msg)) => {
logging!(
info,
Type::Core,
"[ConfigUpdate] Finished (success={}, elapsed={}ms, msg={})",
success,
start.elapsed().as_millis(),
msg
);
}
Err(err) => {
logging!(
error,
Type::Core,
"[ConfigUpdate] Failed after {}ms: {}",
start.elapsed().as_millis(),
err
);
}
}
result
}
fn should_update_config(&self) -> Result<bool> {
@@ -62,20 +91,73 @@ impl CoreManager {
}
async fn perform_config_update(&self) -> Result<(bool, String)> {
Config::generate().await?;
logging!(debug, Type::Core, "[ConfigUpdate] Pipeline start");
let total_start = Instant::now();
match CoreConfigValidator::global().validate_config().await {
let mut stage_timer = Instant::now();
Config::generate().await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Generation completed in {}ms",
stage_timer.elapsed().as_millis()
);
stage_timer = Instant::now();
let validation_result = CoreConfigValidator::global().validate_config().await;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Validation completed in {}ms",
stage_timer.elapsed().as_millis()
);
match validation_result {
Ok((true, _)) => {
stage_timer = Instant::now();
let run_path = Config::generate_file(ConfigType::Run).await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Runtime file generated in {}ms",
stage_timer.elapsed().as_millis()
);
stage_timer = Instant::now();
self.apply_config(run_path).await?;
logging!(
debug,
Type::Core,
"[ConfigUpdate] Core apply completed in {}ms",
stage_timer.elapsed().as_millis()
);
logging!(
debug,
Type::Core,
"[ConfigUpdate] Pipeline succeeded in {}ms",
total_start.elapsed().as_millis()
);
Ok((true, String::new()))
}
Ok((false, error_msg)) => {
Config::runtime().await.discard();
logging!(
warn,
Type::Core,
"[ConfigUpdate] Validation reported failure after {}ms: {}",
total_start.elapsed().as_millis(),
error_msg
);
Ok((false, error_msg))
}
Err(e) => {
Config::runtime().await.discard();
logging!(
error,
Type::Core,
"[ConfigUpdate] Validation errored after {}ms: {}",
total_start.elapsed().as_millis(),
e
);
Err(e)
}
}
@@ -88,17 +170,49 @@ impl CoreManager {
pub(super) async fn apply_config(&self, path: PathBuf) -> Result<()> {
let path_str = dirs::path_to_str(&path)?;
match self.reload_config(path_str).await {
let reload_start = Instant::now();
match self.reload_config_with_retry(path_str).await {
Ok(_) => {
Config::runtime().await.apply();
logging!(info, Type::Core, "Configuration applied");
logging!(
debug,
Type::Core,
"Configuration applied (reload={}ms)",
reload_start.elapsed().as_millis()
);
Ok(())
}
Err(err) if Self::should_restart_on_error(&err) => {
self.retry_with_restart(path_str).await
}
Err(err) => {
if Self::should_restart_for_anyhow(&err) {
logging!(
warn,
Type::Core,
"Reload failed after {}ms with retryable/timeout error; attempting restart: {}",
reload_start.elapsed().as_millis(),
err
);
match self.retry_with_restart(path_str).await {
Ok(_) => return Ok(()),
Err(retry_err) => {
logging!(
error,
Type::Core,
"Reload retry with restart failed: {}",
retry_err
);
Config::runtime().await.discard();
return Err(retry_err);
}
}
}
Config::runtime().await.discard();
logging!(
error,
Type::Core,
"Failed to apply config after {}ms: {}",
reload_start.elapsed().as_millis(),
err
);
Err(anyhow!("Failed to apply config: {}", err))
}
}
@@ -113,17 +227,116 @@ impl CoreManager {
self.restart_core().await?;
sleep(timing::CONFIG_RELOAD_DELAY).await;
self.reload_config(config_path).await?;
self.reload_config_with_retry(config_path).await?;
Config::runtime().await.apply();
logging!(info, Type::Core, "Configuration applied after restart");
Ok(())
}
async fn reload_config(&self, path: &str) -> Result<(), MihomoError> {
handle::Handle::mihomo()
async fn reload_config_with_retry(&self, path: &str) -> Result<()> {
for attempt in 1..=MAX_RELOAD_ATTEMPTS {
let attempt_start = Instant::now();
let reload_future = self.reload_config_once(path);
match timeout(RELOAD_TIMEOUT, reload_future).await {
Ok(Ok(())) => {
logging!(
debug,
Type::Core,
"reload_config attempt {}/{} succeeded in {}ms",
attempt,
MAX_RELOAD_ATTEMPTS,
attempt_start.elapsed().as_millis()
);
return Ok(());
}
Ok(Err(err)) => {
logging!(
warn,
Type::Core,
"reload_config attempt {}/{} failed after {}ms: {}",
attempt,
MAX_RELOAD_ATTEMPTS,
attempt_start.elapsed().as_millis(),
err
);
if attempt == MAX_RELOAD_ATTEMPTS {
return Err(anyhow!(
"Failed to reload config after {} attempts: {}",
attempt,
err
));
}
}
Err(_) => {
logging!(
warn,
Type::Core,
"reload_config attempt {}/{} timed out after {:?}",
attempt,
MAX_RELOAD_ATTEMPTS,
RELOAD_TIMEOUT
);
if attempt == MAX_RELOAD_ATTEMPTS {
return Err(anyhow!(
"Config reload timed out after {:?} ({} attempts)",
RELOAD_TIMEOUT,
MAX_RELOAD_ATTEMPTS
));
}
}
}
}
Err(anyhow!(
"Config reload retry loop exited unexpectedly ({} attempts)",
MAX_RELOAD_ATTEMPTS
))
}
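The retry loop above distinguishes three outcomes of `tokio::time::timeout`; a compact standalone sketch (with a hypothetical `reload_once` stand-in) showing inner success, inner failure, and elapsed timeout:

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Hypothetical stand-in for a single reload attempt.
async fn reload_once(fail: bool, delay: Duration) -> Result<(), String> {
    sleep(delay).await;
    if fail { Err("reload rejected".into()) } else { Ok(()) }
}

#[tokio::main]
async fn main() {
    let limit = Duration::from_millis(100);

    // Inner future finishes in time and succeeds: Ok(Ok(())).
    assert!(matches!(timeout(limit, reload_once(false, Duration::ZERO)).await, Ok(Ok(()))));

    // Inner future finishes in time but fails: Ok(Err(_)).
    assert!(matches!(timeout(limit, reload_once(true, Duration::ZERO)).await, Ok(Err(_))));

    // Inner future exceeds the limit: Err(Elapsed).
    assert!(timeout(limit, reload_once(false, Duration::from_secs(1))).await.is_err());
}
```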
async fn reload_config_once(&self, path: &str) -> Result<(), MihomoError> {
logging!(
info,
Type::Core,
"[ConfigUpdate] reload_config_once begin path={} ",
path
);
let start = Instant::now();
let result = handle::Handle::mihomo()
.await
.reload_config(true, path)
.await
.await;
let elapsed = start.elapsed().as_millis();
match result {
Ok(()) => {
logging!(
info,
Type::Core,
"[ConfigUpdate] reload_config_once succeeded (elapsed={}ms)",
elapsed
);
Ok(())
}
Err(err) => {
logging!(
warn,
Type::Core,
"[ConfigUpdate] reload_config_once failed (elapsed={}ms, err={})",
elapsed,
err
);
Err(err)
}
}
}
fn should_restart_for_anyhow(err: &anyhow::Error) -> bool {
if let Some(mihomo_err) = err.downcast_ref::<MihomoError>() {
return Self::should_restart_on_error(mihomo_err);
}
let msg = err.to_string();
msg.contains("timed out")
|| msg.contains("reload")
|| msg.contains("Failed to apply config")
}
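`should_restart_for_anyhow` relies on `anyhow::Error::downcast_ref` to recover the typed error before falling back to substring matching on the rendered message; a hedged sketch of that pattern, with a hypothetical `CoreError` in place of `MihomoError`:

```rust
use anyhow::{anyhow, Error};
use std::fmt;

// Hypothetical typed error standing in for MihomoError.
#[derive(Debug)]
enum CoreError {
    ConnectionRefused,
}

impl fmt::Display for CoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "connection refused")
    }
}

impl std::error::Error for CoreError {}

fn should_restart(err: &Error) -> bool {
    // Prefer the typed variant when the anyhow error wraps one...
    if let Some(core_err) = err.downcast_ref::<CoreError>() {
        return matches!(core_err, CoreError::ConnectionRefused);
    }
    // ...and fall back to substring matching on the rendered message.
    err.to_string().contains("timed out")
}

fn main() {
    assert!(should_restart(&Error::new(CoreError::ConnectionRefused)));
    assert!(should_restart(&anyhow!("request timed out")));
    assert!(!should_restart(&anyhow!("permission denied")));
}
```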
fn should_restart_on_error(err: &MihomoError) -> bool {

View File

@@ -1,38 +1,71 @@
use crate::{
constants::{retry, timing},
logging,
utils::logging::Type,
};
use crate::{constants::retry, logging, utils::logging::Type};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use smartstring::alias::String;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc,
},
thread,
time::Instant,
};
use tauri::{Emitter, WebviewWindow};
use tauri::Emitter;
use tauri::async_runtime;
#[allow(dead_code)] // Temporarily suppress warnings while diagnostics disable certain events
#[derive(Debug, Clone)]
pub enum FrontendEvent {
RefreshClash,
RefreshVerge,
NoticeMessage { status: String, message: String },
ProfileChanged { current_profile_id: String },
TimerUpdated { profile_index: String },
ProfileUpdateStarted { uid: String },
ProfileUpdateCompleted { uid: String },
RefreshProxy,
ProxiesUpdated {
payload: serde_json::Value,
},
NoticeMessage {
status: String,
message: String,
},
ProfileChanged {
current_profile_id: String,
},
ProfileSwitchFinished {
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
},
TimerUpdated {
profile_index: String,
},
ProfileUpdateStarted {
uid: String,
},
ProfileUpdateCompleted {
uid: String,
},
RustPanic {
message: String,
location: String,
},
}
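For context on what the new `ProfileSwitchFinished` variant looks like on the wire, a small sketch of the camelCase payload produced by the `serialize_event` arm shown further below (the concrete values here are illustrative):

```rust
use serde_json::json;

fn main() {
    // Mirrors the serialize_event arm: enum fields are flattened into a
    // camelCase JSON object for the "profile-switch-finished" event.
    let payload = json!({
        "profileId": "remote-profile-uid",
        "success": true,
        "notify": false,
        "taskId": 42u64,
    });
    assert_eq!(payload["profileId"], "remote-profile-uid");
    assert_eq!(payload["taskId"], 42);
}
```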
static EMIT_SERIALIZER: Lazy<tokio::sync::Mutex<()>> = Lazy::new(|| tokio::sync::Mutex::new(()));
#[derive(Debug, Default)]
struct EventStats {
total_sent: AtomicU64,
total_errors: AtomicU64,
last_error_time: RwLock<Option<Instant>>,
}
#[derive(Debug, Default)]
#[allow(dead_code)]
struct BufferedProxies {
pending: parking_lot::Mutex<Option<serde_json::Value>>,
in_flight: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct ErrorMessage {
pub status: String,
@@ -47,6 +80,7 @@ pub struct NotificationSystem {
pub(super) is_running: bool,
stats: EventStats,
emergency_mode: RwLock<bool>,
proxies_buffer: Arc<BufferedProxies>,
}
impl Default for NotificationSystem {
@@ -63,6 +97,7 @@ impl NotificationSystem {
is_running: false,
stats: EventStats::default(),
emergency_mode: RwLock::new(false),
proxies_buffer: Arc::new(BufferedProxies::default()),
}
}
@@ -117,13 +152,78 @@ impl NotificationSystem {
return;
};
if system.should_skip_event(&event) {
return;
let event_label = Self::describe_event(&event);
match event {
FrontendEvent::ProxiesUpdated { payload } => {
logging!(
debug,
Type::Frontend,
"Queueing proxies-updated event for buffered emit: {}",
event_label
);
system.enqueue_proxies_updated(payload);
}
other => {
logging!(
debug,
Type::Frontend,
"Queueing event for async emit: {}",
event_label
);
let (event_name, payload_result) = system.serialize_event(other);
let payload = match payload_result {
Ok(value) => value,
Err(err) => {
logging!(
warn,
Type::Frontend,
"Failed to serialize event {}: {}",
event_name,
err
);
return;
}
};
logging!(
debug,
Type::Frontend,
"Dispatching async emit: {}",
event_name
);
let _ = Self::emit_via_app(event_name, payload);
}
}
}
fn enqueue_proxies_updated(&self, payload: serde_json::Value) {
let replaced = {
let mut slot = self.proxies_buffer.pending.lock();
let had_pending = slot.is_some();
*slot = Some(payload);
had_pending
};
if replaced {
logging!(
debug,
Type::Frontend,
"Replaced pending proxies-updated payload with latest snapshot"
);
}
if let Some(window) = super::handle::Handle::get_window() {
system.emit_to_window(&window, event);
thread::sleep(timing::EVENT_EMIT_DELAY);
if self
.proxies_buffer
.in_flight
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let buffer = Arc::clone(&self.proxies_buffer);
async_runtime::spawn(async move {
Self::flush_proxies(buffer).await;
});
}
}
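The pending slot plus `in_flight` flag above forms a coalescing, single-flusher pattern: rapid snapshots overwrite each other and at most one drain task runs at a time. Below is a condensed, self-contained sketch under assumed names (`Coalescer`, `push`, `flush`), using a plain thread instead of the Tauri async runtime:

```rust
use parking_lot::Mutex;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

#[derive(Default)]
struct Coalescer {
    pending: Mutex<Option<String>>, // latest payload wins
    in_flight: AtomicBool,          // true while a flusher is draining
}

impl Coalescer {
    fn push(self: Arc<Self>, payload: String) {
        *self.pending.lock() = Some(payload); // overwrite any stale snapshot
        // Only the caller that flips in_flight from false to true starts a flusher.
        if self
            .in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            std::thread::spawn(move || self.flush());
        }
    }

    fn flush(&self) {
        loop {
            let Some(payload) = self.pending.lock().take() else {
                self.in_flight.store(false, Ordering::Release);
                // Re-check: a push may have landed between take() and the store.
                if self.pending.lock().is_some()
                    && self
                        .in_flight
                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                        .is_ok()
                {
                    continue;
                }
                return;
            };
            println!("emitting {payload}");
        }
    }
}

fn main() {
    let c = Arc::new(Coalescer::default());
    for i in 0..5 {
        c.clone().push(format!("snapshot-{i}"));
    }
    std::thread::sleep(std::time::Duration::from_millis(50));
}
```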
@@ -135,25 +235,95 @@ impl NotificationSystem {
)
}
fn emit_to_window(&self, window: &WebviewWindow, event: FrontendEvent) {
let (event_name, payload) = self.serialize_event(event);
let Ok(payload) = payload else {
self.stats.total_errors.fetch_add(1, Ordering::Relaxed);
return;
};
match window.emit(event_name, payload) {
Ok(_) => {
self.stats.total_sent.fetch_add(1, Ordering::Relaxed);
fn emit_via_app(event_name: &'static str, payload: serde_json::Value) -> Result<(), String> {
let app_handle = super::handle::Handle::app_handle().clone();
let event_name = event_name.to_string();
async_runtime::spawn(async move {
if let Err(err) = app_handle.emit_to("main", event_name.as_str(), payload) {
logging!(
warn,
Type::Frontend,
"emit_to failed for {}: {}",
event_name,
err
);
}
Err(e) => {
logging!(warn, Type::Frontend, "Event emit failed: {}", e);
self.handle_emit_error();
});
Ok(())
}
async fn flush_proxies(buffer: Arc<BufferedProxies>) {
const EVENT_NAME: &str = "proxies-updated";
loop {
let payload_opt = {
let mut guard = buffer.pending.lock();
guard.take()
};
let Some(payload) = payload_opt else {
buffer.in_flight.store(false, Ordering::Release);
if buffer.pending.lock().is_some()
&& buffer
.in_flight
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
continue;
}
break;
};
logging!(debug, Type::Frontend, "Dispatching buffered proxies emit");
let _guard = EMIT_SERIALIZER.lock().await;
if let Err(err) = Self::emit_via_app(EVENT_NAME, payload) {
logging!(
warn,
Type::Frontend,
"Buffered proxies emit failed: {}",
err
);
}
}
}
fn describe_event(event: &FrontendEvent) -> String {
match event {
FrontendEvent::RefreshClash => "RefreshClash".into(),
FrontendEvent::RefreshVerge => "RefreshVerge".into(),
FrontendEvent::RefreshProxy => "RefreshProxy".into(),
FrontendEvent::ProxiesUpdated { .. } => "ProxiesUpdated".into(),
FrontendEvent::NoticeMessage { status, .. } => {
format!("NoticeMessage({})", status).into()
}
FrontendEvent::ProfileChanged { current_profile_id } => {
format!("ProfileChanged({})", current_profile_id).into()
}
FrontendEvent::ProfileSwitchFinished {
profile_id,
task_id,
..
} => format!(
"ProfileSwitchFinished(profile={}, task={})",
profile_id, task_id
)
.into(),
FrontendEvent::TimerUpdated { profile_index } => {
format!("TimerUpdated({})", profile_index).into()
}
FrontendEvent::ProfileUpdateStarted { uid } => {
format!("ProfileUpdateStarted({})", uid).into()
}
FrontendEvent::ProfileUpdateCompleted { uid } => {
format!("ProfileUpdateCompleted({})", uid).into()
}
FrontendEvent::RustPanic { message, .. } => format!("RustPanic({})", message).into(),
}
}
#[allow(dead_code)]
fn serialize_event(
&self,
event: FrontendEvent,
@@ -167,9 +337,25 @@ impl NotificationSystem {
"verge://notice-message",
serde_json::to_value((status, message)),
),
FrontendEvent::RefreshProxy => ("verge://refresh-proxy-config", Ok(json!("yes"))),
FrontendEvent::ProxiesUpdated { payload } => ("proxies-updated", Ok(payload)),
FrontendEvent::ProfileChanged { current_profile_id } => {
("profile-changed", Ok(json!(current_profile_id)))
}
FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
task_id,
} => (
"profile-switch-finished",
Ok(json!({
"profileId": profile_id,
"success": success,
"notify": notify,
"taskId": task_id
})),
),
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(json!(profile_index)))
}
@@ -179,6 +365,10 @@ impl NotificationSystem {
FrontendEvent::ProfileUpdateCompleted { uid } => {
("profile-update-completed", Ok(json!({ "uid": uid })))
}
FrontendEvent::RustPanic { message, location } => (
"rust-panic",
Ok(json!({ "message": message, "location": location })),
),
}
}
@@ -204,10 +394,19 @@ impl NotificationSystem {
}
if let Some(sender) = &self.sender {
sender.send(event).is_ok()
} else {
false
if sender.send(event).is_err() {
logging!(
warn,
Type::Frontend,
"Failed to send event to worker thread"
);
self.handle_emit_error();
return false;
}
return true;
}
false
}
pub fn shutdown(&mut self) {

View File

@@ -192,6 +192,7 @@ mod app_init {
cmd::get_profiles,
cmd::enhance_profiles,
cmd::patch_profiles_config,
cmd::switch_profile,
cmd::view_profile,
cmd::patch_profile,
cmd::create_profile,
@@ -202,6 +203,8 @@ mod app_init {
cmd::read_profile_file,
cmd::save_profile_file,
cmd::get_next_update_time,
cmd::get_profile_switch_status,
cmd::get_profile_switch_events,
cmd::script_validate_notice,
cmd::validate_script_file,
cmd::create_local_backup,
@@ -218,6 +221,7 @@ mod app_init {
cmd::get_system_info,
cmd::get_unlock_items,
cmd::check_media_unlock,
cmd::frontend_log,
]
}
}
@@ -356,6 +360,28 @@ pub fn run() {
}
}
std::panic::set_hook(Box::new(|info| {
let payload = info
.payload()
.downcast_ref::<&'static str>()
.map(|s| (*s).to_string())
.or_else(|| info.payload().downcast_ref::<String>().cloned())
.unwrap_or_else(|| "Unknown panic".to_string());
let location = info
.location()
.map(|loc| format!("{}:{}", loc.file(), loc.line()))
.unwrap_or_else(|| "unknown location".to_string());
logging!(
error,
Type::System,
"Rust panic captured: {} @ {}",
payload,
location
);
handle::Handle::notify_rust_panic(payload.into(), location.into());
}));
#[cfg(feature = "clippy")]
let context = tauri::test::mock_context(tauri::test::noop_assets());
#[cfg(feature = "clippy")]

View File

@@ -68,6 +68,13 @@ impl<T: Clone + ToOwned> Draft<Box<T>> {
})
}
/// Try to get the latest read-only view; returns `None` if the write lock is currently held
pub fn try_latest_ref(&self) -> Option<MappedRwLockReadGuard<'_, Box<T>>> {
self.inner
.try_read()
.map(|guard| RwLockReadGuard::map(guard, |inner| inner.1.as_ref().unwrap_or(&inner.0)))
}
/// Apply the draft, returning the previous committed data
pub fn apply(&self) -> Option<Box<T>> {
let mut inner = self.inner.write();