using IpcLibrary.Connection;
using IpcLibrary.Core;
using IpcLibrary.Core.Exceptions;
using IpcLibrary.Serialization;
using IpcLibrary.Services;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IpcLibrary.Server
{
    /// <summary>
    /// IPC server implementation. Owns a <see cref="NamedPipeConnectionManager"/>, a service
    /// registry, a heartbeat manager and a message router, and exposes method calls and
    /// notifications towards connected processes.
    /// </summary>
    public class IPCServer : IIPCServer
    {
        private readonly NamedPipeConnectionManager _connectionManager;
        private readonly IServiceRegistry _serviceRegistry;
        private readonly HeartbeatManager _heartbeatManager;
        private readonly MessageRouter _messageRouter;
        private readonly IPCConfiguration _configuration;
        private readonly string _serverId;
        private bool _disposed;

        /// <summary>True between a successful <see cref="StartAsync"/> and a successful <see cref="StopAsync"/>.</summary>
        public bool IsRunning { get; private set; }

        /// <summary>Processes currently reported by the connection manager.</summary>
        public IReadOnlyList<ProcessInfo> ConnectedProcesses => _connectionManager.GetAllProcesses();

        /// <summary>
        /// UTC timestamp assigned in the constructor.
        /// NOTE(review): this is the construction time, not the time StartAsync completed — confirm intent.
        /// </summary>
        public DateTime StartTime { get; internal set; }

        /// <summary>Raised after the connection manager reports a newly connected process.</summary>
        public event EventHandler<ProcessInfo> ProcessConnected;

        /// <summary>Raised after the connection manager reports a disconnected process.</summary>
        public event EventHandler<ProcessInfo> ProcessDisconnected;

        /// <summary>Raised from <see cref="HandleIncomingMessage"/> for each non-heartbeat message that was routed.</summary>
        public event EventHandler<IPCMessage> MessageReceived;

        /// <summary>
        /// Creates the server and wires up its collaborators. Nothing is started until
        /// <see cref="StartAsync"/> is called.
        /// </summary>
        /// <param name="configuration">Server settings; a default <see cref="IPCConfiguration"/> is used when null.</param>
        public IPCServer(IPCConfiguration configuration = null)
        {
            _configuration = configuration ?? new IPCConfiguration();
            _serverId = $"Server_{Environment.ProcessId}_{Guid.NewGuid():N}";

            var serializer = new CompositeMessageSerializer();
            // "IPCPipe" is presumably the pipe name — TODO confirm against NamedPipeConnectionManager.
            _connectionManager = new NamedPipeConnectionManager(serializer, _configuration, "IPCPipe");
            _serviceRegistry = new ServiceRegistry();
            _heartbeatManager = new HeartbeatManager();

            var methodCallHandler = new MethodCallHandler(_serviceRegistry);
            _messageRouter = new MessageRouter(methodCallHandler);

            InitializeEventHandlers();
            StartTime = DateTime.UtcNow;
        }

        /// <summary>
        /// Starts the named-pipe server and, when enabled, applies the heartbeat settings.
        /// Does nothing if the server is already running.
        /// </summary>
        /// <param name="address">
        /// Address to report in log output. NOTE(review): this value is only logged here; it is not
        /// passed to the connection manager — confirm whether that is intended.
        /// </param>
        /// <exception cref="ConnectionException">Any failure while starting is wrapped in this type.</exception>
        public async Task StartAsync(string address)
        {
            if (IsRunning)
            {
                return;
            }

            try
            {
                // Announce the attempt; the "started" message is only printed once startup succeeded.
                Console.WriteLine($"正在启动IPC服务器,地址: {address}");

                await _connectionManager.StartServerAsync();
                IsRunning = true;

                if (_configuration.EnableHeartbeat)
                {
                    _heartbeatManager.SetHeartbeatInterval(_configuration.HeartbeatInterval);
                    _heartbeatManager.SetTimeoutThreshold(_configuration.ConnectionTimeout);
                }

                Console.WriteLine($"IPC服务器已启动,地址: {address}");
            }
            catch (Exception ex)
            {
                throw new ConnectionException($"启动服务器失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Broadcasts a "ServerShutdown" notification, disconnects every connected process and
        /// marks the server as stopped. Does nothing if the server is not running.
        /// </summary>
        /// <exception cref="ConnectionException">Any failure while stopping is wrapped in this type.</exception>
        public async Task StopAsync()
        {
            if (!IsRunning)
            {
                return;
            }

            try
            {
                // Tell every connected process that the server is about to shut down.
                var shutdownMessage = new IPCMessage
                {
                    Type = MessageType.Notification,
                    Method = "ServerShutdown",
                    SourceProcessId = _serverId
                };
                await _connectionManager.BroadcastMessageAsync(shutdownMessage);

                // Disconnect everything; iterate over a snapshot because disconnecting may change the list.
                var processes = ConnectedProcesses.ToList();
                foreach (var process in processes)
                {
                    await _connectionManager.DisconnectFromProcessAsync(process.ProcessId);
                }

                IsRunning = false;
                Console.WriteLine("IPC服务器已停止");
            }
            catch (Exception ex)
            {
                throw new ConnectionException($"停止服务器失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Calls a remote service method and returns its result as <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Expected result type.</typeparam>
        /// <param name="targetProcessId">Process that hosts the service.</param>
        /// <param name="serviceName">Name of the service to call.</param>
        /// <param name="methodName">Name of the method to call.</param>
        /// <param name="parameters">Arguments forwarded to the remote method.</param>
        /// <returns>
        /// The result when it is an instance of <typeparamref name="T"/>; otherwise
        /// <c>default(T)</c> (this includes a null result and a result of a different type).
        /// </returns>
        public async Task<T> CallMethodAsync<T>(string targetProcessId, string serviceName, string methodName, params object[] parameters)
        {
            var result = await CallMethodAsync(targetProcessId, serviceName, methodName, typeof(T), parameters);
            return result is T typedResult ? typedResult : default(T);
        }

        /// <summary>
        /// Sends a "CallMethod" request to <paramref name="targetProcessId"/> and waits for the response.
        /// </summary>
        /// <param name="targetProcessId">Process that hosts the service.</param>
        /// <param name="serviceName">Name of the service to call.</param>
        /// <param name="methodName">Name of the method to call.</param>
        /// <param name="returnType">Expected result type. Not used by this method's body.</param>
        /// <param name="parameters">Arguments forwarded to the remote method; may be null.</param>
        /// <returns>The unwrapped method result, or the raw response result when it is not a <see cref="MethodCallResponse"/>.</returns>
        /// <exception cref="InvalidOperationException">The server is not running.</exception>
        /// <exception cref="MethodCallException">
        /// The response is an error, the remote call reported failure, or a non-IPC exception occurred while calling.
        /// </exception>
        public async Task<object> CallMethodAsync(string targetProcessId, string serviceName, string methodName, Type returnType, params object[] parameters)
        {
            if (!IsRunning)
            {
                throw new InvalidOperationException("服务器未运行");
            }

            try
            {
                var request = new MethodCallRequest
                {
                    ServiceName = serviceName,
                    MethodName = methodName,
                    Parameters = parameters,
                    // A null argument yields a null entry in the type list.
                    ParameterTypes = parameters != null
                        ? Array.ConvertAll(parameters, p => p?.GetType())
                        : Array.Empty<Type>()
                };

                var message = new IPCMessage
                {
                    Type = MessageType.Request,
                    Method = "CallMethod",
                    Parameters = new object[] { request },
                    SourceProcessId = _serverId,
                    TargetProcessId = targetProcessId
                };

                var response = await _messageRouter.SendRequestAsync(
                    message,
                    msg => _connectionManager.SendMessageAsync(targetProcessId, msg),
                    _configuration.MethodCallTimeout);

                if (response.Type == MessageType.Error)
                {
                    throw new MethodCallException(response.Error);
                }

                if (response.Result is MethodCallResponse methodResponse)
                {
                    if (!methodResponse.IsSuccess)
                    {
                        throw new MethodCallException("方法调用失败", methodResponse.Exception);
                    }

                    return methodResponse.Result;
                }

                return response.Result;
            }
            catch (Exception ex) when (!(ex is IPCException))
            {
                // IPC exceptions pass through untouched; everything else is wrapped.
                throw new MethodCallException($"调用方法失败: {ex.Message}", ex);
            }
        }

        /// <summary>Sends a notification message to a single process.</summary>
        /// <param name="targetProcessId">Receiving process.</param>
        /// <param name="method">Notification name.</param>
        /// <param name="parameters">Notification arguments.</param>
        /// <exception cref="InvalidOperationException">The server is not running.</exception>
        public async Task SendNotificationAsync(string targetProcessId, string method, params object[] parameters)
        {
            if (!IsRunning)
            {
                throw new InvalidOperationException("服务器未运行");
            }

            var message = new IPCMessage
            {
                Type = MessageType.Notification,
                Method = method,
                Parameters = parameters,
                SourceProcessId = _serverId,
                TargetProcessId = targetProcessId
            };

            await _connectionManager.SendMessageAsync(targetProcessId, message);
        }

        /// <summary>Broadcasts a notification message through the connection manager.</summary>
        /// <param name="method">Notification name.</param>
        /// <param name="parameters">Notification arguments.</param>
        /// <exception cref="InvalidOperationException">The server is not running.</exception>
        public async Task BroadcastNotificationAsync(string method, params object[] parameters)
        {
            if (!IsRunning)
            {
                throw new InvalidOperationException("服务器未运行");
            }

            var message = new IPCMessage
            {
                Type = MessageType.Notification,
                Method = method,
                Parameters = parameters,
                SourceProcessId = _serverId
            };

            await _connectionManager.BroadcastMessageAsync(message);
        }

        /// <summary>Registers a service instance with the service registry under <paramref name="serviceName"/>.</summary>
        /// <typeparam name="T">Service type.</typeparam>
        /// <param name="serviceName">Name the service is registered under.</param>
        /// <param name="serviceInstance">Instance to register.</param>
        public async Task RegisterServiceAsync<T>(string serviceName, T serviceInstance)
        {
            await _serviceRegistry.RegisterServiceAsync(serviceName, serviceInstance);
        }

        /// <summary>Removes the service registered under <paramref name="serviceName"/> from the service registry.</summary>
        /// <param name="serviceName">Name of the service to remove.</param>
        public async Task UnregisterServiceAsync(string serviceName)
        {
            await _serviceRegistry.UnregisterServiceAsync(serviceName);
        }

        /// <summary>Subscribes to connection and heartbeat-timeout events of the owned managers.</summary>
        private void InitializeEventHandlers()
        {
            _connectionManager.ProcessConnected += OnProcessConnected;
            _connectionManager.ProcessDisconnected += OnProcessDisconnected;

            _heartbeatManager.ProcessTimeout += async (sender, processId) =>
            {
                // Nothing awaits this handler, so a failure has to be handled here rather than thrown.
                try
                {
                    Console.WriteLine($"进程 {processId} 心跳超时,断开连接");
                    await _connectionManager.DisconnectFromProcessAsync(processId);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"断开超时进程 {processId} 时发生错误: {ex.Message}");
                }
            };
        }

        /// <summary>Logs the connection, starts heartbeat monitoring when enabled, and raises <see cref="ProcessConnected"/>.</summary>
        private void OnProcessConnected(object sender, ProcessInfo processInfo)
        {
            Console.WriteLine($"进程已连接: {processInfo.ProcessId}");

            if (_configuration.EnableHeartbeat)
            {
                _heartbeatManager.StartMonitoring(processInfo.ProcessId);
            }

            ProcessConnected?.Invoke(this, processInfo);
        }

        /// <summary>Logs the disconnection, stops heartbeat monitoring, and raises <see cref="ProcessDisconnected"/>.</summary>
        private void OnProcessDisconnected(object sender, ProcessInfo processInfo)
        {
            Console.WriteLine($"进程已断开: {processInfo.ProcessId}");
            _heartbeatManager.StopMonitoring(processInfo.ProcessId);
            ProcessDisconnected?.Invoke(this, processInfo);
        }

        /// <summary>
        /// Handles one incoming message: heartbeat messages only refresh the sender's heartbeat
        /// (when heartbeats are enabled); all other messages are routed, any response is sent back
        /// to the sender, and <see cref="MessageReceived"/> is raised. On failure an error message
        /// carrying the original message id is sent to the sender.
        /// NOTE(review): no subscription to this handler is visible in this file — confirm it is wired up.
        /// </summary>
        /// <param name="message">The received message.</param>
        private async Task HandleIncomingMessage(IPCMessage message)
        {
            try
            {
                // Heartbeats only refresh the timestamp; they are not routed.
                if (_configuration.EnableHeartbeat && message.Type == MessageType.Heartbeat)
                {
                    _heartbeatManager.UpdateHeartbeat(message.SourceProcessId);
                    return;
                }

                var response = await _messageRouter.HandleMessageAsync(message);
                if (response != null)
                {
                    await _connectionManager.SendMessageAsync(message.SourceProcessId, response);
                }

                MessageReceived?.Invoke(this, message);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"处理消息时发生错误: {ex.Message}");

                var errorResponse = new IPCMessage
                {
                    Id = message.Id,
                    Type = MessageType.Error,
                    Error = ex.Message,
                    SourceProcessId = _serverId,
                    TargetProcessId = message.SourceProcessId
                };

                // Reporting the error is best effort: the send can fail as well (for example when
                // the sender has already gone), and that must not escape the handler.
                try
                {
                    await _connectionManager.SendMessageAsync(message.SourceProcessId, errorResponse);
                }
                catch (Exception sendEx)
                {
                    Console.WriteLine($"发送错误响应失败: {sendEx.Message}");
                }
            }
        }

        /// <summary>
        /// Stops the server (waiting at most 10 seconds) and disposes the connection and heartbeat
        /// managers. Safe to call more than once; failures while stopping are logged, not thrown.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                // Run the stop sequence on the thread pool so its awaits never need the caller's
                // synchronization context while this thread is blocked. The Wait result is
                // ignored: on timeout disposal simply continues.
                Task.Run(() => StopAsync()).Wait(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                Console.WriteLine($"释放资源时发生错误: {ex.Message}");
            }

            _connectionManager?.Dispose();
            _heartbeatManager?.Dispose();
        }
    }
}