using IpcLibrary.Connection;
using IpcLibrary.Core;
using IpcLibrary.Core.Exceptions;
using IpcLibrary.Serialization;
using IpcLibrary.Services;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IpcLibrary.Client
{
    /// <summary>
    /// IPC client implementation. Connects to a server through a
    /// <see cref="NamedPipeConnectionManager"/>, issues remote method calls and
    /// notifications, hosts locally registered services, and optionally sends
    /// heartbeats and reconnects after a heartbeat timeout.
    /// </summary>
    public class IPCClient : IIPCClient
    {
        /// <summary>Pipe name handed to the connection manager.</summary>
        private const string PipeName = "IPCPipe";

        /// <summary>Address used for reconnection when no successful connect has been recorded.</summary>
        private const string DefaultServerAddress = "default";

        private readonly NamedPipeConnectionManager _connectionManager;
        private readonly IServiceRegistry _serviceRegistry;
        private readonly HeartbeatManager _heartbeatManager;
        private readonly MessageRouter _messageRouter;
        private readonly IPCConfiguration _configuration;
        private readonly string _processId;

        // Address of the last successful ConnectAsync call; reused by TryReconnectAsync.
        private string _serverAddress;
        private bool _disposed;

        /// <summary>Identifier of this client, generated once in the constructor.</summary>
        public string ProcessId => _processId;

        /// <summary>
        /// True after a successful <see cref="ConnectAsync"/>; reset by
        /// <see cref="DisconnectAsync"/> and by a heartbeat timeout for this client.
        /// </summary>
        public bool IsConnected { get; private set; }

        // NOTE(review): these events are declared as plain EventHandler here. The handlers
        // below forward a second argument received from the connection manager, so the
        // declarations may be expected to carry an event-args type argument - confirm
        // against IIPCClient and NamedPipeConnectionManager.

        /// <summary>Forwarded from the connection manager's ProcessConnected event.</summary>
        public event EventHandler ProcessConnected;

        /// <summary>Forwarded from the connection manager's ProcessDisconnected event.</summary>
        public event EventHandler ProcessDisconnected;

        /// <summary>Declared for the client interface; not raised anywhere in this class.</summary>
        public event EventHandler MessageReceived;

        /// <summary>
        /// Creates a client with a generated process id of the form
        /// <c>Client_{pid}_{guid}</c> and wires up its collaborators.
        /// </summary>
        /// <param name="configuration">
        /// Client settings; when <c>null</c> a default <see cref="IPCConfiguration"/> is created.
        /// </param>
        public IPCClient(IPCConfiguration configuration = null)
        {
            _configuration = configuration ?? new IPCConfiguration();
            _processId = $"Client_{Environment.ProcessId}_{Guid.NewGuid():N}";

            var serializer = new CompositeMessageSerializer();
            _connectionManager = new NamedPipeConnectionManager(serializer, _configuration, PipeName);
            _serviceRegistry = new ServiceRegistry();
            _heartbeatManager = new HeartbeatManager();

            var methodCallHandler = new MethodCallHandler(_serviceRegistry);
            _messageRouter = new MessageRouter(methodCallHandler);

            InitializeEventHandlers();
        }

        /// <summary>
        /// Connects this client to the given server address. On success the address is
        /// remembered for automatic reconnection and, when heartbeats are enabled,
        /// heartbeat monitoring and the background heartbeat sender are started.
        /// </summary>
        /// <param name="serverAddress">Address passed to the connection manager.</param>
        /// <returns><c>true</c> when the connection manager reports success.</returns>
        /// <exception cref="ConnectionException">Wraps any exception thrown while connecting.</exception>
        public async Task<bool> ConnectAsync(string serverAddress)
        {
            try
            {
                var success = await _connectionManager.ConnectToProcessAsync(_processId, serverAddress);
                if (success)
                {
                    // Remember where we connected so a later reconnect targets the same server.
                    _serverAddress = serverAddress;
                    IsConnected = true;

                    if (_configuration.EnableHeartbeat)
                    {
                        _heartbeatManager.StartMonitoring(_processId);
                        // Fire-and-forget: the loop ends once IsConnected or _disposed changes.
                        _ = Task.Run(SendHeartbeats);
                    }
                }

                return success;
            }
            catch (Exception ex)
            {
                throw new ConnectionException($"连接失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Stops heartbeat monitoring and disconnects. Does nothing when not connected.
        /// </summary>
        /// <exception cref="ConnectionException">Wraps any exception thrown while disconnecting.</exception>
        public async Task DisconnectAsync()
        {
            if (!IsConnected)
            {
                return;
            }

            try
            {
                _heartbeatManager.StopMonitoring(_processId);
                await _connectionManager.DisconnectFromProcessAsync(_processId);
                IsConnected = false;
            }
            catch (Exception ex)
            {
                throw new ConnectionException($"断开连接失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Calls a remote method and returns its result as <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Expected result type.</typeparam>
        /// <param name="targetProcessId">Process that hosts the service.</param>
        /// <param name="serviceName">Name of the remote service.</param>
        /// <param name="methodName">Name of the method to invoke.</param>
        /// <param name="parameters">Arguments for the remote method.</param>
        /// <returns>
        /// The result when it is a <typeparamref name="T"/>; otherwise <c>default(T)</c>.
        /// </returns>
        /// <exception cref="ConnectionException">The client is not connected.</exception>
        /// <exception cref="MethodCallException">The call failed or the remote side reported an error.</exception>
        public async Task<T> CallMethodAsync<T>(string targetProcessId, string serviceName, string methodName, params object[] parameters)
        {
            var result = await CallMethodAsync(targetProcessId, serviceName, methodName, typeof(T), parameters);
            return result is T typedResult ? typedResult : default(T);
        }

        /// <summary>
        /// Calls a remote method by sending a "CallMethod" request through the message
        /// router and waiting up to the configured method-call timeout for the response.
        /// </summary>
        /// <param name="targetProcessId">Process that hosts the service.</param>
        /// <param name="serviceName">Name of the remote service.</param>
        /// <param name="methodName">Name of the method to invoke.</param>
        /// <param name="returnType">Expected result type; not used by this method's body.</param>
        /// <param name="parameters">Arguments for the remote method; may be <c>null</c>.</param>
        /// <returns>
        /// The inner result of a <see cref="MethodCallResponse"/>, or the raw response
        /// result when the response carries a different payload.
        /// </returns>
        /// <exception cref="ConnectionException">The client is not connected.</exception>
        /// <exception cref="MethodCallException">
        /// The response is an error, the remote call was unsuccessful, or a
        /// non-<see cref="IPCException"/> exception occurred while calling.
        /// </exception>
        public async Task<object> CallMethodAsync(string targetProcessId, string serviceName, string methodName, Type returnType, params object[] parameters)
        {
            if (!IsConnected)
            {
                throw new ConnectionException("客户端未连接");
            }

            try
            {
                var request = new MethodCallRequest
                {
                    ServiceName = serviceName,
                    MethodName = methodName,
                    Parameters = parameters,
                    // A null argument yields a null entry; a null array yields an empty type list.
                    ParameterTypes = parameters != null
                        ? Array.ConvertAll(parameters, p => p?.GetType())
                        : Array.Empty<Type>()
                };

                var message = new IPCMessage
                {
                    Type = MessageType.Request,
                    Method = "CallMethod",
                    Parameters = new object[] { request },
                    SourceProcessId = _processId,
                    TargetProcessId = targetProcessId
                };

                var response = await _messageRouter.SendRequestAsync(
                    message,
                    msg => _connectionManager.SendMessageAsync(targetProcessId, msg),
                    _configuration.MethodCallTimeout);

                if (response.Type == MessageType.Error)
                {
                    throw new MethodCallException(response.Error);
                }

                if (response.Result is MethodCallResponse methodResponse)
                {
                    if (!methodResponse.IsSuccess)
                    {
                        throw new MethodCallException("方法调用失败", methodResponse.Exception);
                    }

                    return methodResponse.Result;
                }

                return response.Result;
            }
            catch (Exception ex) when (!(ex is IPCException))
            {
                // IPC exceptions pass through untouched; everything else is wrapped.
                throw new MethodCallException($"调用方法失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Sends a notification message to the target process without waiting for a response.
        /// </summary>
        /// <param name="targetProcessId">Process that receives the notification.</param>
        /// <param name="method">Notification method name.</param>
        /// <param name="parameters">Notification arguments.</param>
        /// <exception cref="ConnectionException">The client is not connected.</exception>
        public async Task SendNotificationAsync(string targetProcessId, string method, params object[] parameters)
        {
            if (!IsConnected)
            {
                throw new ConnectionException("客户端未连接");
            }

            var message = new IPCMessage
            {
                Type = MessageType.Notification,
                Method = method,
                Parameters = parameters,
                SourceProcessId = _processId,
                TargetProcessId = targetProcessId
            };

            await _connectionManager.SendMessageAsync(targetProcessId, message);
        }

        /// <summary>Registers a service instance in the local service registry.</summary>
        /// <typeparam name="T">Type of the service instance.</typeparam>
        /// <param name="serviceName">Name under which the service is registered.</param>
        /// <param name="serviceInstance">Instance that handles incoming calls.</param>
        public async Task RegisterServiceAsync<T>(string serviceName, T serviceInstance)
        {
            await _serviceRegistry.RegisterServiceAsync(serviceName, serviceInstance);
        }

        /// <summary>Removes a service from the local service registry.</summary>
        /// <param name="serviceName">Name of the service to remove.</param>
        public async Task UnregisterServiceAsync(string serviceName)
        {
            await _serviceRegistry.UnregisterServiceAsync(serviceName);
        }

        /// <summary>
        /// Forwards connection-manager events to this client's events and reacts to
        /// heartbeat timeouts for this client's own process id.
        /// </summary>
        private void InitializeEventHandlers()
        {
            _connectionManager.ProcessConnected += (sender, info) => ProcessConnected?.Invoke(this, info);
            _connectionManager.ProcessDisconnected += (sender, info) => ProcessDisconnected?.Invoke(this, info);

            _heartbeatManager.ProcessTimeout += async (sender, processId) =>
            {
                if (processId != _processId)
                {
                    return;
                }

                IsConnected = false;

                // Do not start reconnecting once the client has been disposed.
                if (_configuration.EnableAutoReconnect && !_disposed)
                {
                    await TryReconnectAsync();
                }
            };
        }

        /// <summary>
        /// Background loop that broadcasts a heartbeat message and then waits for the
        /// configured interval, for as long as the client is connected and not disposed.
        /// The loop ends on the first failure.
        /// </summary>
        private async Task SendHeartbeats()
        {
            while (IsConnected && !_disposed)
            {
                try
                {
                    var heartbeatMessage = new IPCMessage
                    {
                        Type = MessageType.Heartbeat,
                        SourceProcessId = _processId
                    };

                    await _connectionManager.BroadcastMessageAsync(heartbeatMessage);
                    await Task.Delay(_configuration.HeartbeatInterval);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"发送心跳失败: {ex.Message}");
                    break;
                }
            }
        }

        /// <summary>
        /// Attempts to reconnect up to the configured maximum number of attempts, waiting
        /// the configured retry delay before each attempt. Targets the address of the last
        /// successful <see cref="ConnectAsync"/> call, falling back to "default" when none
        /// was recorded. Stops early once the client is disposed.
        /// </summary>
        private async Task TryReconnectAsync()
        {
            var address = _serverAddress ?? DefaultServerAddress;

            for (int attempt = 0; attempt < _configuration.MaxRetryAttempts && !_disposed; attempt++)
            {
                try
                {
                    await Task.Delay(_configuration.RetryDelay);

                    // Dispose may have run during the delay; the managers are unusable then.
                    if (_disposed)
                    {
                        return;
                    }

                    if (await ConnectAsync(address))
                    {
                        break;
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"重连尝试 {attempt + 1} 失败: {ex.Message}");
                }
            }
        }

        /// <summary>
        /// Disconnects (waiting at most five seconds) and disposes the connection and
        /// heartbeat managers. Safe to call more than once; errors during disconnect are
        /// written to the console instead of being thrown.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            // Set first so the heartbeat and reconnect loops stop.
            _disposed = true;

            try
            {
                DisconnectAsync().Wait(TimeSpan.FromSeconds(5));
            }
            catch (Exception ex)
            {
                Console.WriteLine($"释放资源时发生错误: {ex.Message}");
            }

            _connectionManager?.Dispose();
            _heartbeatManager?.Dispose();
        }
    }
}