mirror of
https://github.com/clash-verge-rev/clash-verge-rev.git
synced 2026-01-28 16:30:52 +08:00
feat: migrate logs API from REST to IPC streaming (#4277)
* feat: migrate logs API from REST to IPC streaming - Replace REST API `/logs` calls with IPC streaming implementation - Add new `src-tauri/src/ipc/logs.rs` with `LogsMonitor` for real-time log streaming - Implement duplicate stream prevention with level tracking - Add frontend-backend communication via Tauri commands for log management - Remove WebSocket compatibility, maintain IPC-only mode - Fix duplicate monitoring task startup when toggling log service - Add proper task lifecycle management with JoinHandle cleanup * refactor: remove dead code from logs.rs to fix clippy warnings - Remove unused `timestamp` field from LogItem struct - Remove unused `client` field from LogsMonitor struct - Remove unused methods: `is_fresh`, `get_current_monitoring_level`, `get_current_logs` - Simplify LogsMonitor initialization by removing client dependency - All clippy warnings with -D warnings now resolved * refactor: extract duplicate fmt_bytes function to utils module - Create new utils/format.rs module with fmt_bytes function - Remove duplicate fmt_bytes implementations from traffic.rs and memory.rs - Update imports to use shared utils::format::fmt_bytes - Add comprehensive unit tests for fmt_bytes function - Ensure DRY principle compliance and code maintainability
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
use super::CmdResult;
|
||||
use crate::{
|
||||
config::*, core::*, feat, ipc::IpcManager, process::AsyncHandler,
|
||||
state::proxy::ProxyRequestCache, wrap_err,
|
||||
config::*,
|
||||
core::*,
|
||||
feat,
|
||||
ipc::{self, IpcManager},
|
||||
process::AsyncHandler,
|
||||
state::proxy::ProxyRequestCache,
|
||||
wrap_err,
|
||||
};
|
||||
use serde_yaml::Mapping;
|
||||
use std::time::Duration;
|
||||
@@ -572,3 +577,23 @@ pub async fn is_clash_debug_enabled() -> CmdResult<bool> {
|
||||
pub async fn clash_gc() -> CmdResult {
|
||||
wrap_err!(IpcManager::global().gc().await)
|
||||
}
|
||||
|
||||
/// 获取日志 (使用新的流式实现)
|
||||
#[tauri::command]
|
||||
pub async fn get_clash_logs(level: Option<String>) -> CmdResult<serde_json::Value> {
|
||||
Ok(ipc::get_logs_json(level).await)
|
||||
}
|
||||
|
||||
/// 启动日志监控
|
||||
#[tauri::command]
|
||||
pub async fn start_logs_monitoring(level: Option<String>) -> CmdResult {
|
||||
ipc::start_logs_monitoring(level).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 清除日志
|
||||
#[tauri::command]
|
||||
pub async fn clear_logs() -> CmdResult {
|
||||
ipc::clear_logs().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -382,29 +382,5 @@ impl IpcManager {
|
||||
}
|
||||
}
|
||||
|
||||
// 流量数据相关
|
||||
#[allow(dead_code)]
|
||||
pub async fn get_traffic(&self) -> AnyResult<serde_json::Value> {
|
||||
let url = "/traffic";
|
||||
logging!(info, Type::Ipc, true, "IPC: 发送 GET 请求到 {}", url);
|
||||
let result = self.send_request("GET", url, None).await;
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"IPC: /traffic 请求结果: {:?}",
|
||||
result
|
||||
);
|
||||
result
|
||||
}
|
||||
|
||||
// 内存相关
|
||||
#[allow(dead_code)]
|
||||
pub async fn get_memory(&self) -> AnyResult<serde_json::Value> {
|
||||
let url = "/memory";
|
||||
logging!(info, Type::Ipc, true, "IPC: 发送 GET 请求到 {}", url);
|
||||
let result = self.send_request("GET", url, None).await;
|
||||
logging!(info, Type::Ipc, true, "IPC: /memory 请求结果: {:?}", result);
|
||||
result
|
||||
}
|
||||
// 日志相关功能已迁移到 logs.rs 模块,使用流式处理
|
||||
}
|
||||
|
||||
295
src-tauri/src/ipc/logs.rs
Normal file
295
src-tauri/src/ipc/logs.rs
Normal file
@@ -0,0 +1,295 @@
|
||||
use kode_bridge::IpcStreamClient;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, OnceLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::{sync::RwLock, task::JoinHandle, time::Duration};
|
||||
|
||||
use crate::{
|
||||
logging,
|
||||
utils::{dirs::ipc_path, logging::Type},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct LogData {
|
||||
#[serde(rename = "type")]
|
||||
pub log_type: String,
|
||||
pub payload: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LogItem {
|
||||
pub log_type: String,
|
||||
pub payload: String,
|
||||
pub time: String,
|
||||
}
|
||||
|
||||
impl LogItem {
|
||||
fn new(log_type: String, payload: String) -> Self {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
// Simple time formatting (HH:MM:SS)
|
||||
let hours = (now / 3600) % 24;
|
||||
let minutes = (now / 60) % 60;
|
||||
let seconds = now % 60;
|
||||
let time_str = format!("{hours:02}:{minutes:02}:{seconds:02}");
|
||||
|
||||
Self {
|
||||
log_type,
|
||||
payload,
|
||||
time: time_str,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CurrentLogs {
|
||||
pub logs: VecDeque<LogItem>,
|
||||
pub level: String,
|
||||
pub last_updated: Instant,
|
||||
}
|
||||
|
||||
impl Default for CurrentLogs {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
logs: VecDeque::with_capacity(1000),
|
||||
level: "info".to_string(),
|
||||
last_updated: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Logs monitor with streaming support
|
||||
pub struct LogsMonitor {
|
||||
current: Arc<RwLock<CurrentLogs>>,
|
||||
task_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
|
||||
current_monitoring_level: Arc<RwLock<Option<String>>>,
|
||||
}
|
||||
|
||||
static INSTANCE: OnceLock<LogsMonitor> = OnceLock::new();
|
||||
|
||||
impl LogsMonitor {
|
||||
pub fn global() -> &'static LogsMonitor {
|
||||
INSTANCE.get_or_init(|| {
|
||||
let instance = LogsMonitor::new();
|
||||
logging!(info, Type::Ipc, true, "LogsMonitor initialized");
|
||||
instance
|
||||
})
|
||||
}
|
||||
|
||||
fn new() -> Self {
|
||||
let current = Arc::new(RwLock::new(CurrentLogs::default()));
|
||||
|
||||
Self {
|
||||
current,
|
||||
task_handle: Arc::new(RwLock::new(None)),
|
||||
current_monitoring_level: Arc::new(RwLock::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_monitoring(&self, level: Option<String>) {
|
||||
let filter_level = level.clone().unwrap_or_else(|| "info".to_string());
|
||||
|
||||
// Check if we're already monitoring the same level
|
||||
{
|
||||
let current_level = self.current_monitoring_level.read().await;
|
||||
if let Some(existing_level) = current_level.as_ref() {
|
||||
if existing_level == &filter_level {
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"LogsMonitor: Already monitoring level '{}', skipping duplicate request",
|
||||
filter_level
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop existing monitoring task if level changed or first time
|
||||
{
|
||||
let mut handle = self.task_handle.write().await;
|
||||
if let Some(task) = handle.take() {
|
||||
task.abort();
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"LogsMonitor: Stopped previous monitoring task (level changed)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Update current monitoring level
|
||||
{
|
||||
let mut current_level = self.current_monitoring_level.write().await;
|
||||
*current_level = Some(filter_level.clone());
|
||||
}
|
||||
|
||||
let monitor_current = self.current.clone();
|
||||
let ipc_path_buf = ipc_path().unwrap();
|
||||
let ipc_path = ipc_path_buf.to_str().unwrap_or_default();
|
||||
let client = IpcStreamClient::new(ipc_path).unwrap();
|
||||
|
||||
// Update current level in data structure
|
||||
{
|
||||
let mut current = monitor_current.write().await;
|
||||
current.level = filter_level.clone();
|
||||
}
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
loop {
|
||||
let url = if filter_level == "info" {
|
||||
"/logs".to_string()
|
||||
} else {
|
||||
let level_param = if filter_level == "all" {
|
||||
"debug"
|
||||
} else {
|
||||
&filter_level
|
||||
};
|
||||
format!("/logs?level={level_param}")
|
||||
};
|
||||
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"LogsMonitor: Starting stream for {}",
|
||||
url
|
||||
);
|
||||
|
||||
let _ = client
|
||||
.get(&url)
|
||||
.timeout(Duration::from_secs(30))
|
||||
.process_lines(|line| {
|
||||
if let Ok(log_data) = serde_json::from_str::<LogData>(line.trim()) {
|
||||
// Filter logs based on level if needed
|
||||
let should_include = match filter_level.as_str() {
|
||||
"all" => true,
|
||||
level => log_data.log_type.to_lowercase() == level.to_lowercase(),
|
||||
};
|
||||
|
||||
if should_include {
|
||||
let log_item = LogItem::new(log_data.log_type, log_data.payload);
|
||||
|
||||
tokio::spawn({
|
||||
let current = monitor_current.clone();
|
||||
async move {
|
||||
let mut logs = current.write().await;
|
||||
|
||||
// Add new log
|
||||
logs.logs.push_back(log_item);
|
||||
|
||||
// Keep only the last 1000 logs
|
||||
if logs.logs.len() > 1000 {
|
||||
logs.logs.pop_front();
|
||||
}
|
||||
|
||||
logs.last_updated = Instant::now();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
|
||||
// Wait before retrying
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
});
|
||||
|
||||
// Store the task handle
|
||||
{
|
||||
let mut handle = self.task_handle.write().await;
|
||||
*handle = Some(task);
|
||||
}
|
||||
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"LogsMonitor: Started new monitoring task for level: {:?}",
|
||||
level
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn current(&self) -> CurrentLogs {
|
||||
self.current.read().await.clone()
|
||||
}
|
||||
|
||||
pub async fn clear_logs(&self) {
|
||||
let mut current = self.current.write().await;
|
||||
current.logs.clear();
|
||||
current.last_updated = Instant::now();
|
||||
|
||||
// Also reset monitoring level when clearing logs
|
||||
{
|
||||
let mut monitoring_level = self.current_monitoring_level.write().await;
|
||||
*monitoring_level = None;
|
||||
}
|
||||
|
||||
// Abort current monitoring task
|
||||
{
|
||||
let mut handle = self.task_handle.write().await;
|
||||
if let Some(task) = handle.take() {
|
||||
task.abort();
|
||||
logging!(
|
||||
info,
|
||||
Type::Ipc,
|
||||
true,
|
||||
"LogsMonitor: Stopped monitoring task due to clear_logs"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_logs_as_json(&self, level: Option<String>) -> serde_json::Value {
|
||||
let current = self.current().await;
|
||||
|
||||
let filtered_logs: Vec<serde_json::Value> = current
|
||||
.logs
|
||||
.iter()
|
||||
.filter(|log| {
|
||||
if let Some(ref filter_level) = level {
|
||||
if filter_level == "all" {
|
||||
true
|
||||
} else {
|
||||
log.log_type.to_lowercase() == filter_level.to_lowercase()
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|log| {
|
||||
serde_json::json!({
|
||||
"type": log.log_type,
|
||||
"payload": log.payload,
|
||||
"time": log.time
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
serde_json::Value::Array(filtered_logs)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_logs_monitoring(level: Option<String>) {
|
||||
LogsMonitor::global().start_monitoring(level).await;
|
||||
}
|
||||
|
||||
pub async fn clear_logs() {
|
||||
LogsMonitor::global().clear_logs().await;
|
||||
}
|
||||
|
||||
pub async fn get_logs_json(level: Option<String>) -> serde_json::Value {
|
||||
LogsMonitor::global().get_logs_as_json(level).await
|
||||
}
|
||||
@@ -8,7 +8,7 @@ use tokio::{sync::RwLock, time::Duration};
|
||||
|
||||
use crate::{
|
||||
logging,
|
||||
utils::{dirs::ipc_path, logging::Type},
|
||||
utils::{dirs::ipc_path, format::fmt_bytes, logging::Type},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
@@ -101,16 +101,6 @@ impl MemoryMonitor {
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_bytes(bytes: u64) -> String {
|
||||
const UNITS: &[&str] = &["B", "KB", "MB", "GB"];
|
||||
let (mut val, mut unit) = (bytes as f64, 0);
|
||||
while val >= 1024.0 && unit < 3 {
|
||||
val /= 1024.0;
|
||||
unit += 1;
|
||||
}
|
||||
format!("{:.1}{}", val, UNITS[unit])
|
||||
}
|
||||
|
||||
pub async fn get_current_memory() -> CurrentMemory {
|
||||
MemoryMonitor::global().current().await
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
pub mod general;
|
||||
pub mod logs;
|
||||
pub mod memory;
|
||||
pub mod traffic;
|
||||
|
||||
pub use general::IpcManager;
|
||||
pub use logs::{clear_logs, get_logs_json, start_logs_monitoring};
|
||||
pub use memory::{get_current_memory, get_formatted_memory};
|
||||
pub use traffic::{get_current_traffic, get_formatted_traffic};
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use tokio::{sync::RwLock, time::Duration};
|
||||
|
||||
use crate::{
|
||||
logging,
|
||||
utils::{dirs::ipc_path, logging::Type},
|
||||
utils::{dirs::ipc_path, format::fmt_bytes, logging::Type},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
@@ -119,16 +119,6 @@ impl TrafficMonitor {
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_bytes(bytes: u64) -> String {
|
||||
const UNITS: &[&str] = &["B", "KB", "MB", "GB"];
|
||||
let (mut val, mut unit) = (bytes as f64, 0);
|
||||
while val >= 1024.0 && unit < 3 {
|
||||
val /= 1024.0;
|
||||
unit += 1;
|
||||
}
|
||||
format!("{:.1}{}", val, UNITS[unit])
|
||||
}
|
||||
|
||||
pub async fn get_current_traffic() -> CurrentTraffic {
|
||||
TrafficMonitor::global().current().await
|
||||
}
|
||||
|
||||
@@ -289,6 +289,9 @@ pub fn run() {
|
||||
cmd::get_group_proxy_delays,
|
||||
cmd::is_clash_debug_enabled,
|
||||
cmd::clash_gc,
|
||||
cmd::get_clash_logs,
|
||||
cmd::start_logs_monitoring,
|
||||
cmd::clear_logs,
|
||||
cmd::get_traffic_data,
|
||||
cmd::get_memory_data,
|
||||
cmd::get_formatted_traffic_data,
|
||||
|
||||
25
src-tauri/src/utils/format.rs
Normal file
25
src-tauri/src/utils/format.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
/// Format bytes into human readable string (B, KB, MB, GB)
|
||||
pub fn fmt_bytes(bytes: u64) -> String {
|
||||
const UNITS: &[&str] = &["B", "KB", "MB", "GB"];
|
||||
let (mut val, mut unit) = (bytes as f64, 0);
|
||||
while val >= 1024.0 && unit < 3 {
|
||||
val /= 1024.0;
|
||||
unit += 1;
|
||||
}
|
||||
format!("{:.1}{}", val, UNITS[unit])
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_fmt_bytes() {
|
||||
assert_eq!(fmt_bytes(0), "0.0B");
|
||||
assert_eq!(fmt_bytes(512), "512.0B");
|
||||
assert_eq!(fmt_bytes(1024), "1.0KB");
|
||||
assert_eq!(fmt_bytes(1536), "1.5KB");
|
||||
assert_eq!(fmt_bytes(1024 * 1024), "1.0MB");
|
||||
assert_eq!(fmt_bytes(1024 * 1024 * 1024), "1.0GB");
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod autostart;
|
||||
pub mod dirs;
|
||||
pub mod format;
|
||||
pub mod help;
|
||||
pub mod i18n;
|
||||
pub mod init;
|
||||
|
||||
Reference in New Issue
Block a user