Files
RhythmicWallpaper/IpcLibrary/Server/IPCServer.cs

259 lines
10 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using IpcLibrary.Connection;
using IpcLibrary.Core;
using IpcLibrary.Core.Exceptions;
using IpcLibrary.Serialization;
using IpcLibrary.Services;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace IpcLibrary.Server {
/// <summary>
/// IPC server implementation built on a <see cref="NamedPipeConnectionManager"/>.
/// Wires together the connection manager, service registry, heartbeat manager and
/// message router, and exposes request/notification helpers to the host process.
/// </summary>
public class IPCServer : IIPCServer {
    // Pipe name handed to the connection manager at construction time.
    // NOTE(review): the address passed to StartAsync is only logged; the pipe name is fixed here.
    private const string DefaultPipeName = "IPCPipe";

    // Upper bound on how long Dispose waits for StopAsync to finish.
    private static readonly TimeSpan DisposeStopTimeout = TimeSpan.FromSeconds(10);

    private readonly NamedPipeConnectionManager _connectionManager;
    private readonly IServiceRegistry _serviceRegistry;
    private readonly HeartbeatManager _heartbeatManager;
    private readonly MessageRouter _messageRouter;
    private readonly IPCConfiguration _configuration;
    private readonly string _serverId;
    private bool _disposed;

    /// <summary>True between a successful <see cref="StartAsync"/> and a successful <see cref="StopAsync"/> (or <see cref="Dispose"/>).</summary>
    public bool IsRunning { get; private set; }

    /// <summary>Processes currently reported by the connection manager.</summary>
    public IReadOnlyList<ProcessInfo> ConnectedProcesses => _connectionManager.GetAllProcesses();

    /// <summary>UTC timestamp assigned when this instance is constructed.</summary>
    public DateTime StartTime { get; internal set; }

    /// <summary>Raised after the connection manager reports a newly connected process.</summary>
    public event EventHandler<ProcessInfo> ProcessConnected;

    /// <summary>Raised after the connection manager reports a disconnected process.</summary>
    public event EventHandler<ProcessInfo> ProcessDisconnected;

    /// <summary>Raised by the incoming-message handler after a non-heartbeat message has been routed.</summary>
    public event EventHandler<IPCMessage> MessageReceived;

    /// <summary>
    /// Creates the server and all of its collaborators.
    /// </summary>
    /// <param name="configuration">Server settings; a default <see cref="IPCConfiguration"/> is used when null.</param>
    public IPCServer(IPCConfiguration configuration = null) {
        _configuration = configuration ?? new IPCConfiguration();
        _serverId = $"Server_{Environment.ProcessId}_{Guid.NewGuid():N}";

        var serializer = new CompositeMessageSerializer();
        _connectionManager = new NamedPipeConnectionManager(serializer, _configuration, DefaultPipeName);
        _serviceRegistry = new ServiceRegistry();
        _heartbeatManager = new HeartbeatManager();

        var methodCallHandler = new MethodCallHandler(_serviceRegistry);
        _messageRouter = new MessageRouter(methodCallHandler);

        InitializeEventHandlers();
        StartTime = DateTime.UtcNow;
    }

    /// <summary>
    /// Starts the pipe server. Does nothing when the server is already running.
    /// </summary>
    /// <param name="address">Address used in the start-up log line.</param>
    /// <exception cref="ConnectionException">Any failure while starting is wrapped in this exception.</exception>
    public async Task StartAsync(string address) {
        if (IsRunning) {
            return;
        }

        try {
            // Configure heartbeat thresholds before accepting connections so that a client
            // connecting immediately is not monitored with unconfigured values.
            if (_configuration.EnableHeartbeat) {
                _heartbeatManager.SetHeartbeatInterval(_configuration.HeartbeatInterval);
                _heartbeatManager.SetTimeoutThreshold(_configuration.ConnectionTimeout);
            }

            await _connectionManager.StartServerAsync().ConfigureAwait(false);
            IsRunning = true;

            // Logged once, and only after the server has actually started.
            Console.WriteLine($"IPC服务器已启动地址: {address}");
        } catch (Exception ex) {
            throw new ConnectionException($"启动服务器失败: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Notifies connected processes of the shutdown, disconnects them and marks the server as stopped.
    /// Does nothing when the server is not running.
    /// </summary>
    /// <exception cref="ConnectionException">A failure while disconnecting clients is wrapped in this exception.</exception>
    public async Task StopAsync() {
        if (!IsRunning) {
            return;
        }

        try {
            // Tell every connected process that the server is about to shut down.
            var shutdownMessage = new IPCMessage {
                Type = MessageType.Notification,
                Method = "ServerShutdown",
                SourceProcessId = _serverId
            };

            try {
                await _connectionManager.BroadcastMessageAsync(shutdownMessage).ConfigureAwait(false);
            } catch (Exception notifyEx) {
                // The notification is a courtesy: a failed broadcast must not prevent
                // the disconnect loop below from running.
                Console.WriteLine($"发送服务器关闭通知失败: {notifyEx.Message}");
            }

            // Disconnect everything; iterate over a snapshot because disconnecting changes the list.
            foreach (var process in ConnectedProcesses.ToList()) {
                await _connectionManager.DisconnectFromProcessAsync(process.ProcessId).ConfigureAwait(false);
            }

            IsRunning = false;
            Console.WriteLine("IPC服务器已停止");
        } catch (Exception ex) {
            throw new ConnectionException($"停止服务器失败: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Calls a remote method and returns the result as <typeparamref name="T"/>.
    /// </summary>
    /// <returns>The result when it is a <typeparamref name="T"/>; otherwise <c>default(T)</c>.</returns>
    public async Task<T> CallMethodAsync<T>(string targetProcessId, string serviceName, string methodName, params object[] parameters) {
        var result = await CallMethodAsync(targetProcessId, serviceName, methodName, typeof(T), parameters).ConfigureAwait(false);
        return result is T typedResult ? typedResult : default(T);
    }

    /// <summary>
    /// Sends a "CallMethod" request to <paramref name="targetProcessId"/> and waits for the response.
    /// </summary>
    /// <param name="targetProcessId">Identifier of the process that should execute the call.</param>
    /// <param name="serviceName">Name of the remote service.</param>
    /// <param name="methodName">Name of the method on that service.</param>
    /// <param name="returnType">Expected return type (not used by this method's body).</param>
    /// <param name="parameters">Arguments for the call; may be null.</param>
    /// <returns>The unwrapped <see cref="MethodCallResponse"/> result, or the raw response result.</returns>
    /// <exception cref="InvalidOperationException">The server is not running.</exception>
    /// <exception cref="MethodCallException">The remote side reported an error, or a non-IPC exception occurred.</exception>
    public async Task<object> CallMethodAsync(string targetProcessId, string serviceName, string methodName, Type returnType, params object[] parameters) {
        if (!IsRunning) {
            throw new InvalidOperationException("服务器未运行");
        }

        try {
            var request = new MethodCallRequest {
                ServiceName = serviceName,
                MethodName = methodName,
                Parameters = parameters,
                // A null argument yields a null entry in the type array.
                ParameterTypes = parameters != null
                    ? Array.ConvertAll(parameters, p => p?.GetType())
                    : Array.Empty<Type>()
            };

            var message = new IPCMessage {
                Type = MessageType.Request,
                Method = "CallMethod",
                Parameters = new object[] { request },
                SourceProcessId = _serverId,
                TargetProcessId = targetProcessId
            };

            var response = await _messageRouter.SendRequestAsync(
                message,
                msg => _connectionManager.SendMessageAsync(targetProcessId, msg),
                _configuration.MethodCallTimeout).ConfigureAwait(false);

            if (response.Type == MessageType.Error) {
                throw new MethodCallException(response.Error);
            }

            if (response.Result is MethodCallResponse methodResponse) {
                if (!methodResponse.IsSuccess) {
                    throw new MethodCallException("方法调用失败", methodResponse.Exception);
                }
                return methodResponse.Result;
            }

            return response.Result;
        } catch (Exception ex) when (!(ex is IPCException)) {
            // IPC exceptions pass through untouched; everything else is wrapped.
            throw new MethodCallException($"调用方法失败: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Sends a one-way notification to a single process.
    /// </summary>
    /// <exception cref="InvalidOperationException">The server is not running.</exception>
    public async Task SendNotificationAsync(string targetProcessId, string method, params object[] parameters) {
        if (!IsRunning) {
            throw new InvalidOperationException("服务器未运行");
        }

        var message = new IPCMessage {
            Type = MessageType.Notification,
            Method = method,
            Parameters = parameters,
            SourceProcessId = _serverId,
            TargetProcessId = targetProcessId
        };
        await _connectionManager.SendMessageAsync(targetProcessId, message).ConfigureAwait(false);
    }

    /// <summary>
    /// Broadcasts a one-way notification through the connection manager.
    /// </summary>
    /// <exception cref="InvalidOperationException">The server is not running.</exception>
    public async Task BroadcastNotificationAsync(string method, params object[] parameters) {
        if (!IsRunning) {
            throw new InvalidOperationException("服务器未运行");
        }

        var message = new IPCMessage {
            Type = MessageType.Notification,
            Method = method,
            Parameters = parameters,
            SourceProcessId = _serverId
        };
        await _connectionManager.BroadcastMessageAsync(message).ConfigureAwait(false);
    }

    /// <summary>Registers <paramref name="serviceInstance"/> under <paramref name="serviceName"/> in the service registry.</summary>
    public async Task RegisterServiceAsync<T>(string serviceName, T serviceInstance) {
        await _serviceRegistry.RegisterServiceAsync(serviceName, serviceInstance).ConfigureAwait(false);
    }

    /// <summary>Removes the service registered under <paramref name="serviceName"/> from the service registry.</summary>
    public async Task UnregisterServiceAsync(string serviceName) {
        await _serviceRegistry.UnregisterServiceAsync(serviceName).ConfigureAwait(false);
    }

    /// <summary>
    /// Subscribes to connection and heartbeat-timeout events.
    /// </summary>
    private void InitializeEventHandlers() {
        _connectionManager.ProcessConnected += OnProcessConnected;
        _connectionManager.ProcessDisconnected += OnProcessDisconnected;
        _heartbeatManager.ProcessTimeout += async (sender, processId) => {
            Console.WriteLine($"进程 {processId} 心跳超时,断开连接");
            try {
                await _connectionManager.DisconnectFromProcessAsync(processId).ConfigureAwait(false);
            } catch (Exception ex) {
                // Nothing awaits this handler, so a failed disconnect must be
                // handled here instead of escaping as an unobserved/unhandled exception.
                Console.WriteLine($"断开超时进程 {processId} 时发生错误: {ex.Message}");
            }
        };
    }

    /// <summary>Starts heartbeat monitoring (when enabled) and forwards the connect event.</summary>
    private void OnProcessConnected(object sender, ProcessInfo processInfo) {
        Console.WriteLine($"进程已连接: {processInfo.ProcessId}");
        if (_configuration.EnableHeartbeat) {
            _heartbeatManager.StartMonitoring(processInfo.ProcessId);
        }
        ProcessConnected?.Invoke(this, processInfo);
    }

    /// <summary>Stops heartbeat monitoring and forwards the disconnect event.</summary>
    private void OnProcessDisconnected(object sender, ProcessInfo processInfo) {
        Console.WriteLine($"进程已断开: {processInfo.ProcessId}");
        _heartbeatManager.StopMonitoring(processInfo.ProcessId);
        ProcessDisconnected?.Invoke(this, processInfo);
    }

    /// <summary>
    /// Handles one incoming message: heartbeats refresh the heartbeat manager, everything else
    /// goes through the message router, with any router response sent back to the sender.
    /// </summary>
    /// <remarks>
    /// NOTE(review): nothing inside this class calls or subscribes this method — confirm where it is wired up.
    /// </remarks>
    private async Task HandleIncomingMessage(IPCMessage message) {
        if (message == null) {
            // Without a message there is no sender to answer; the error path below would dereference null.
            return;
        }

        try {
            // Heartbeat messages only refresh the sender's last-seen state.
            if (_configuration.EnableHeartbeat && message.Type == MessageType.Heartbeat) {
                _heartbeatManager.UpdateHeartbeat(message.SourceProcessId);
                return;
            }

            // Route the message and reply when the router produced a response.
            var response = await _messageRouter.HandleMessageAsync(message).ConfigureAwait(false);
            if (response != null) {
                await _connectionManager.SendMessageAsync(message.SourceProcessId, response).ConfigureAwait(false);
            }

            MessageReceived?.Invoke(this, message);
        } catch (Exception ex) {
            Console.WriteLine($"处理消息时发生错误: {ex.Message}");

            // Report the failure back to the sender, correlated by the original message id.
            var errorResponse = new IPCMessage {
                Id = message.Id,
                Type = MessageType.Error,
                Error = ex.Message,
                SourceProcessId = _serverId,
                TargetProcessId = message.SourceProcessId
            };

            try {
                await _connectionManager.SendMessageAsync(message.SourceProcessId, errorResponse).ConfigureAwait(false);
            } catch (Exception sendEx) {
                // The error reply is best-effort: a failure here must not escape the handler.
                Console.WriteLine($"发送错误响应失败: {sendEx.Message}");
            }
        }
    }

    /// <summary>
    /// Stops the server (waiting at most <see cref="DisposeStopTimeout"/>), detaches the
    /// connection-manager handlers and disposes the connection and heartbeat managers.
    /// Safe to call more than once; never throws because of a failed stop.
    /// </summary>
    public void Dispose() {
        if (_disposed) {
            return;
        }
        _disposed = true;

        try {
            // Run the stop on the thread pool so its continuations never need the
            // (blocked) calling thread's synchronization context.
            var stopped = Task.Run(() => StopAsync()).Wait(DisposeStopTimeout);
            if (!stopped) {
                Console.WriteLine("释放资源时发生错误: 停止服务器超时");
            }
        } catch (AggregateException ex) {
            // Task.Wait wraps failures; log the underlying error instead of the generic wrapper text.
            Console.WriteLine($"释放资源时发生错误: {(ex.InnerException ?? ex).Message}");
        } catch (Exception ex) {
            Console.WriteLine($"释放资源时发生错误: {ex.Message}");
        }

        // Detach before disposing so late callbacks cannot reach the disposed heartbeat manager.
        _connectionManager.ProcessConnected -= OnProcessConnected;
        _connectionManager.ProcessDisconnected -= OnProcessDisconnected;

        _connectionManager.Dispose();
        _heartbeatManager.Dispose();

        // A disposed server is never running, even when the graceful stop failed or timed out.
        IsRunning = false;
    }
}
}