using IpcLibrary.Core;
using System.IO.Pipes;

namespace IpcLibrary.Connection
{
    /// <summary>
    /// A single process connection. Wraps one <see cref="PipeStream"/> and exchanges
    /// length-prefixed messages over it: a 4-byte length (as produced by
    /// <see cref="BitConverter.GetBytes(int)"/>) followed by the serialized payload.
    /// </summary>
    internal class ProcessConnection : IDisposable
    {
        /// <summary>Size in bytes of the length prefix written before every payload.</summary>
        private const int LengthPrefixSize = sizeof(int);

        /// <summary>Largest payload the receive loop accepts; a larger announced length ends the loop.</summary>
        private const int MaxMessageSize = 1024 * 1024;

        private readonly string _processId;
        private readonly PipeStream _pipe;
        private readonly IMessageSerializer _serializer;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly SemaphoreSlim _sendSemaphore;

        // Links the caller's token with the internal one; kept in a field so Dispose can release it.
        private CancellationTokenSource _linkedTokenSource;

        // 0 = still connected, 1 = disconnect already performed (guards the Disconnected event).
        private int _disconnected;

        // Read from the receive loop and from callers on other threads.
        private volatile bool _disposed;

        /// <summary>Identifier supplied at construction time.</summary>
        public string ProcessId => _processId;

        /// <summary>Whether the underlying pipe currently reports itself as connected.</summary>
        public bool IsConnected => _pipe.IsConnected;

        /// <summary>Raised by the receive loop for every complete message read from the pipe.</summary>
        public event EventHandler<IPCMessage> MessageReceived;

        /// <summary>Raised once, the first time the connection is disconnected (not raised by <see cref="Dispose"/>).</summary>
        public event EventHandler Disconnected;

        /// <summary>
        /// Creates a connection over an existing pipe.
        /// </summary>
        /// <param name="processId">Identifier reported through <see cref="ProcessId"/>.</param>
        /// <param name="pipe">Pipe used for all reads and writes; closed on disconnect and on dispose.</param>
        /// <param name="serializer">Serializer used to convert messages to and from bytes.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public ProcessConnection(string processId, PipeStream pipe, IMessageSerializer serializer)
        {
            _processId = processId ?? throw new ArgumentNullException(nameof(processId));
            _pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
            _cancellationTokenSource = new CancellationTokenSource();
            _sendSemaphore = new SemaphoreSlim(1, 1);
        }

        /// <summary>
        /// Starts the background receive loop. The returned task completes as soon as the loop
        /// has been scheduled; it does not wait for the loop to finish.
        /// </summary>
        /// <param name="cancellationToken">Cancelling this token stops the receive loop.</param>
        /// <returns>
        /// A completed task, or a faulted task if the connection has already been disposed.
        /// Calling this again after a successful start is a no-op, because two readers on one
        /// pipe would corrupt the message framing.
        /// </returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            if (_disposed)
            {
                return Task.FromException(new ObjectDisposedException(nameof(ProcessConnection)));
            }

            try
            {
                var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(
                    cancellationToken, _cancellationTokenSource.Token);

                // Only the first caller installs its linked source and starts the loop.
                if (Interlocked.CompareExchange(ref _linkedTokenSource, linkedSource, null) != null)
                {
                    linkedSource.Dispose();
                    return Task.CompletedTask;
                }

                var combinedToken = linkedSource.Token;

                // Intentionally not awaited: the loop runs for the lifetime of the connection.
                _ = Task.Run(() => ReceiveMessages(combinedToken), combinedToken);
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                // Surface failures through the task, as an async method would.
                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> and writes it to the pipe as a length-prefixed frame.
        /// Concurrent callers are serialized so frames never interleave.
        /// </summary>
        /// <param name="message">Message to send.</param>
        /// <returns>
        /// <c>true</c> if the frame was written and flushed; <c>false</c> if the connection is not
        /// connected, is disposed, or serialization/writing threw (the error is written to the console).
        /// </returns>
        public async Task<bool> SendMessageAsync(IPCMessage message)
        {
            if (!IsConnected || _disposed)
            {
                return false;
            }

            await _sendSemaphore.WaitAsync().ConfigureAwait(false);
            try
            {
                var payload = _serializer.Serialize(message);
                var lengthPrefix = BitConverter.GetBytes(payload.Length);

                await _pipe.WriteAsync(lengthPrefix, 0, lengthPrefix.Length).ConfigureAwait(false);
                await _pipe.WriteAsync(payload, 0, payload.Length).ConfigureAwait(false);
                await _pipe.FlushAsync().ConfigureAwait(false);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"发送消息失败: {ex.Message}");
                return false;
            }
            finally
            {
                _sendSemaphore.Release();
            }
        }

        /// <summary>
        /// Cancels the receive loop, closes the pipe and raises <see cref="Disconnected"/>.
        /// Only the first call has any effect; later calls (including the one made by the
        /// receive loop when it exits) return immediately. Does nothing after <see cref="Dispose"/>.
        /// </summary>
        /// <returns>A completed task, or a faulted task if a <see cref="Disconnected"/> handler threw.</returns>
        public Task DisconnectAsync()
        {
            try
            {
                DisconnectCore();
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                // Surface failures through the task, as an async method would.
                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Builds a snapshot describing this connection.
        /// </summary>
        /// <returns>
        /// A <see cref="ProcessInfo"/> carrying the process id, a status derived from
        /// <see cref="IsConnected"/>, and <c>LastHeartbeat</c> set to the current UTC time
        /// at the moment of the call.
        /// </returns>
        public ProcessInfo GetProcessInfo()
        {
            return new ProcessInfo
            {
                ProcessId = _processId,
                Status = IsConnected ? ProcessStatus.Connected : ProcessStatus.Disconnected,
                LastHeartbeat = DateTime.UtcNow
            };
        }

        /// <summary>
        /// Cancels the receive loop and releases the pipe and the cancellation sources.
        /// Safe to call more than once. Does not raise <see cref="Disconnected"/>.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            _cancellationTokenSource.Cancel();
            ClosePipe();

            Interlocked.Exchange(ref _linkedTokenSource, null)?.Dispose();
            _cancellationTokenSource.Dispose();

            // _sendSemaphore is deliberately left undisposed: a send still in flight calls
            // Release() in its finally block, which would throw on a disposed semaphore.
        }

        /// <summary>Performs the one-time disconnect work behind <see cref="DisconnectAsync"/>.</summary>
        private void DisconnectCore()
        {
            if (_disposed)
            {
                return;
            }

            // Ensures the pipe is closed and Disconnected is raised exactly once.
            if (Interlocked.Exchange(ref _disconnected, 1) != 0)
            {
                return;
            }

            try
            {
                _cancellationTokenSource.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // Dispose ran concurrently; cancellation has already been requested there.
            }

            ClosePipe();
            Disconnected?.Invoke(this, EventArgs.Empty);
        }

        /// <summary>Closes the pipe, logging (rather than propagating) any failure.</summary>
        private void ClosePipe()
        {
            try
            {
                _pipe.Close();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"关闭管道时发生错误: {ex.Message}");
            }
        }

        /// <summary>
        /// Receive loop: reads length-prefixed frames until cancellation, end of stream,
        /// an invalid length, or an error, then disconnects.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the loop.</param>
        private async Task ReceiveMessages(CancellationToken cancellationToken)
        {
            var lengthBuffer = new byte[LengthPrefixSize];

            while (!cancellationToken.IsCancellationRequested && IsConnected)
            {
                try
                {
                    // Read the message length. A single read may return fewer than 4 bytes,
                    // so keep reading until the prefix is complete or the stream ends.
                    if (!await ReadExactlyAsync(lengthBuffer, LengthPrefixSize, cancellationToken).ConfigureAwait(false))
                    {
                        break;
                    }

                    var messageLength = BitConverter.ToInt32(lengthBuffer, 0);

                    // Enforce the maximum message size; an invalid length means the framing is lost.
                    if (messageLength <= 0 || messageLength > MaxMessageSize)
                    {
                        break;
                    }

                    // Read the message body.
                    var messageBuffer = new byte[messageLength];
                    if (!await ReadExactlyAsync(messageBuffer, messageLength, cancellationToken).ConfigureAwait(false))
                    {
                        break;
                    }

                    // NOTE(review): if IMessageSerializer.Deserialize is generic, an explicit type
                    // argument (presumably IPCMessage) is required here - confirm against the interface.
                    var message = _serializer.Deserialize(messageBuffer);
                    MessageReceived?.Invoke(this, message);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"接收消息时发生错误: {ex.Message}");
                    break;
                }
            }

            await DisconnectAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Fills the first <paramref name="count"/> bytes of <paramref name="buffer"/> from the pipe.
        /// </summary>
        /// <param name="buffer">Destination buffer, at least <paramref name="count"/> bytes long.</param>
        /// <param name="count">Number of bytes to read.</param>
        /// <param name="cancellationToken">Token passed to each underlying read.</param>
        /// <returns><c>true</c> if all bytes were read; <c>false</c> if the stream ended first.</returns>
        private async Task<bool> ReadExactlyAsync(byte[] buffer, int count, CancellationToken cancellationToken)
        {
            var offset = 0;
            while (offset < count)
            {
                var read = await _pipe.ReadAsync(buffer, offset, count - offset, cancellationToken).ConfigureAwait(false);
                if (read == 0)
                {
                    return false;
                }

                offset += read;
            }

            return true;
        }
    }
}