using IpcLibrary.Core;
using IpcLibrary.Core.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IpcLibrary.Connection
{
    /// <summary>
    /// Connection manager built on named pipes.
    /// It can act as a server (accepting pipe clients in a loop) and as a client
    /// (connecting to the configured pipe), and tracks every live
    /// <see cref="ProcessConnection"/> by process id.
    /// </summary>
    public class NamedPipeConnectionManager : IConnectionManager, IDisposable
    {
        // Live connections keyed by process id.
        private readonly ConcurrentDictionary<string, ProcessConnection> _connections;
        private readonly IMessageSerializer _serializer;
        private readonly IPCConfiguration _configuration;
        private readonly string _serverPipeName;
        private readonly CancellationTokenSource _cancellationTokenSource;

        // Most recently created server-side pipe (the one the listen loop is, or was last, waiting on).
        private NamedPipeServerStream _serverPipe;
        private bool _isServer;
        private bool _disposed;

        /// <summary>Raised after a connection has been registered and started.</summary>
        public event EventHandler<ProcessInfo> ProcessConnected;

        /// <summary>Raised after a connection has been removed from the manager.</summary>
        public event EventHandler<ProcessInfo> ProcessDisconnected;

        /// <summary>
        /// Creates a manager bound to a single pipe name.
        /// </summary>
        /// <param name="serializer">Serializer handed to every <see cref="ProcessConnection"/>.</param>
        /// <param name="configuration">Timeouts, retry and concurrency settings.</param>
        /// <param name="pipeName">Name of the pipe used both for listening and for connecting.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public NamedPipeConnectionManager(IMessageSerializer serializer, IPCConfiguration configuration, string pipeName)
        {
            _connections = new ConcurrentDictionary<string, ProcessConnection>();
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
            _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
            _serverPipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName));
            _cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Starts the accept loop. Calling it again after the first call is a no-op.
        /// </summary>
        /// <returns>
        /// A task that completes only when the accept loop ends, i.e. after the manager's
        /// cancellation token has been cancelled (see <see cref="Dispose"/>).
        /// </returns>
        public async Task StartServerAsync()
        {
            if (_isServer)
            {
                return;
            }

            _isServer = true;

            // Read the token up front so a Dispose racing with the thread-pool hop
            // cannot make the lambda touch a disposed CancellationTokenSource.
            var token = _cancellationTokenSource.Token;
            await Task.Run(() => ListenForConnections(token));
        }

        /// <summary>
        /// Connects to the configured pipe as a client and registers the connection under
        /// <paramref name="processId"/>.
        /// </summary>
        /// <param name="processId">Key under which the connection is tracked.</param>
        /// <param name="address">Not used by this implementation; the pipe name given to the constructor is used.</param>
        /// <returns>
        /// <c>true</c> if a connection for <paramref name="processId"/> already existed or was
        /// established; <c>false</c> if another caller registered the same id concurrently.
        /// </returns>
        /// <exception cref="ConnectionException">Any failure while connecting; the original exception is the inner exception.</exception>
        public async Task<bool> ConnectToProcessAsync(string processId, string address)
        {
            NamedPipeClientStream clientPipe = null;
            ProcessConnection connection = null;
            var started = false;

            try
            {
                if (_connections.ContainsKey(processId))
                {
                    return true;
                }

                clientPipe = new NamedPipeClientStream(".", _serverPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
                await clientPipe.ConnectAsync(_configuration.ConnectionTimeout, _cancellationTokenSource.Token);

                connection = new ProcessConnection(processId, clientPipe, _serializer);
                connection.MessageReceived += OnConnectionMessageReceived;
                connection.Disconnected += OnConnectionDisconnected;

                if (!_connections.TryAdd(processId, connection))
                {
                    // Someone else registered this id first: release our unused pipe.
                    AbandonConnection(processId, connection, clientPipe);
                    return false;
                }

                await connection.StartAsync(_cancellationTokenSource.Token);
                started = true;

                ProcessConnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Connected));
                return true;
            }
            catch (Exception ex)
            {
                // Only tear down when the connection never became operational; a failure
                // in a ProcessConnected subscriber must not kill a working connection.
                if (!started)
                {
                    AbandonConnection(processId, connection, clientPipe);
                }

                throw new ConnectionException($"连接到进程 {processId} 失败: {ex.Message}", ex);
            }
        }

        /// <summary>
        /// Removes and disconnects the connection registered for <paramref name="processId"/>,
        /// then raises <see cref="ProcessDisconnected"/>. Does nothing if the id is unknown.
        /// </summary>
        /// <param name="processId">Id of the connection to close.</param>
        public async Task DisconnectFromProcessAsync(string processId)
        {
            if (!_connections.TryRemove(processId, out var connection))
            {
                return;
            }

            // Detach first: should DisconnectAsync raise Disconnected, the handler would
            // otherwise publish a second ProcessDisconnected and start an auto-reconnect
            // for a connection that was closed on purpose.
            DetachHandlers(connection);

            await connection.DisconnectAsync();

            ProcessDisconnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Disconnected));
        }

        /// <summary>
        /// Sends <paramref name="message"/> to one registered process.
        /// </summary>
        /// <param name="targetProcessId">Id of the receiving connection.</param>
        /// <param name="message">Message to send.</param>
        /// <returns>The connection's send result, or <c>false</c> when the id is not registered.</returns>
        public async Task<bool> SendMessageAsync(string targetProcessId, IPCMessage message)
        {
            if (_connections.TryGetValue(targetProcessId, out var connection))
            {
                return await connection.SendMessageAsync(message);
            }

            return false;
        }

        /// <summary>
        /// Sends <paramref name="message"/> to every registered connection and waits for all sends.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public async Task BroadcastMessageAsync(IPCMessage message)
        {
            var sends = _connections.Values.Select(conn => conn.SendMessageAsync(message));
            await Task.WhenAll(sends);
        }

        /// <summary>
        /// Returns the info reported by the connection registered for <paramref name="processId"/>.
        /// </summary>
        /// <param name="processId">Id to look up.</param>
        /// <returns>The connection's <see cref="ProcessInfo"/>, or <c>null</c> when the id is not registered.</returns>
        public ProcessInfo GetProcessInfo(string processId)
        {
            if (_connections.TryGetValue(processId, out var connection))
            {
                return connection.GetProcessInfo();
            }

            return null;
        }

        /// <summary>
        /// Returns a snapshot with the info of every registered connection.
        /// </summary>
        public IReadOnlyList<ProcessInfo> GetAllProcesses()
        {
            return _connections.Values.Select(conn => conn.GetProcessInfo()).ToList();
        }

        /// <summary>
        /// Accept loop: creates a server pipe, waits for a client, wraps it in a
        /// <see cref="ProcessConnection"/> registered under a fresh GUID, and repeats until
        /// <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        private async Task ListenForConnections(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                NamedPipeServerStream pipe = null;
                ProcessConnection connection = null;
                string processId = null;
                var started = false;

                try
                {
                    pipe = new NamedPipeServerStream(
                        _serverPipeName,
                        PipeDirection.InOut,
                        _configuration.MaxConcurrentConnections,
                        PipeTransmissionMode.Byte,
                        PipeOptions.Asynchronous);
                    _serverPipe = pipe;

                    await pipe.WaitForConnectionAsync(cancellationToken);

                    processId = Guid.NewGuid().ToString();
                    connection = new ProcessConnection(processId, pipe, _serializer);
                    connection.MessageReceived += OnConnectionMessageReceived;
                    connection.Disconnected += OnConnectionDisconnected;

                    if (_connections.TryAdd(processId, connection))
                    {
                        await connection.StartAsync(cancellationToken);
                        started = true;

                        ProcessConnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Connected));
                    }
                    else
                    {
                        // Could not register: do not keep an untracked pipe open.
                        AbandonConnection(processId, connection, pipe);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Shutdown: release the pipe that was still waiting or starting.
                    if (!started)
                    {
                        AbandonConnection(processId, connection, pipe);
                    }

                    break;
                }
                catch (Exception ex)
                {
                    if (!started)
                    {
                        AbandonConnection(processId, connection, pipe);
                    }

                    // Log the error and keep listening.
                    Console.WriteLine($"监听连接时发生错误: {ex.Message}");

                    try
                    {
                        await Task.Delay(1000, cancellationToken);
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled during the back-off: end the loop quietly instead of
                        // letting the cancellation escape and fault StartServerAsync.
                        break;
                    }
                }
            }
        }

        /// <summary>
        /// Handler for messages coming from a connection. Intentionally empty:
        /// messages are handled by the upper layer.
        /// </summary>
        private void OnConnectionMessageReceived(object sender, IPCMessage message)
        {
            // Messages are handled by the upper layer.
        }

        /// <summary>
        /// Handler for a connection's Disconnected event: unregisters the connection, raises
        /// <see cref="ProcessDisconnected"/> and, when auto-reconnect is enabled and the manager
        /// is not disposed, tries to reconnect.
        /// </summary>
        /// <remarks>
        /// This is an <c>async void</c> event handler, so an exception thrown by a
        /// <see cref="ProcessDisconnected"/> subscriber is not observable by any caller.
        /// </remarks>
        private async void OnConnectionDisconnected(object sender, EventArgs e)
        {
            if (!(sender is ProcessConnection connection))
            {
                return;
            }

            // The connection is dead; drop our handlers so it does not keep referencing the manager.
            DetachHandlers(connection);

            var processId = connection.ProcessId;
            _connections.TryRemove(processId, out _);

            ProcessDisconnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Disconnected));

            // Auto-reconnect, if enabled.
            if (_configuration.EnableAutoReconnect && !_disposed)
            {
                await TryReconnectAsync(processId);
            }
        }

        /// <summary>
        /// Tries to reconnect up to <c>MaxRetryAttempts</c> times, waiting <c>RetryDelay</c>
        /// before each attempt. Failures are logged and the next attempt is made.
        /// </summary>
        private async Task TryReconnectAsync(string processId)
        {
            for (int attempt = 0; attempt < _configuration.MaxRetryAttempts; attempt++)
            {
                // Once disposed every attempt would fail on the disposed token source.
                if (_disposed)
                {
                    return;
                }

                try
                {
                    await Task.Delay(_configuration.RetryDelay);

                    if (_disposed)
                    {
                        return;
                    }

                    if (await ConnectToProcessAsync(processId, _serverPipeName))
                    {
                        break;
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"重连尝试 {attempt + 1} 失败: {ex.Message}");
                }
            }
        }

        /// <summary>
        /// Cancels the manager's token (which ends the accept loop) and disposes the token source.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                _cancellationTokenSource.Cancel();
            }
            finally
            {
                // Dispose the source even if a cancellation callback threw.
                _cancellationTokenSource.Dispose();
            }
        }

        // Builds the event payload for a status change stamped with the current UTC time.
        private static ProcessInfo CreateProcessInfo(string processId, ProcessStatus status)
        {
            return new ProcessInfo
            {
                ProcessId = processId,
                Status = status,
                LastHeartbeat = DateTime.UtcNow
            };
        }

        // Unsubscribes the manager's handlers from a connection.
        private void DetachHandlers(ProcessConnection connection)
        {
            connection.MessageReceived -= OnConnectionMessageReceived;
            connection.Disconnected -= OnConnectionDisconnected;
        }

        // Releases a connection that never became operational: detaches handlers, removes the
        // dictionary entry only if it still maps to this exact connection, and disposes the pipe.
        private void AbandonConnection(string processId, ProcessConnection connection, IDisposable pipe)
        {
            if (connection != null)
            {
                DetachHandlers(connection);

                if (processId != null)
                {
                    // Key+value removal so a different connection registered under the same id is untouched.
                    ((ICollection<KeyValuePair<string, ProcessConnection>>)_connections)
                        .Remove(new KeyValuePair<string, ProcessConnection>(processId, connection));
                }
            }

            pipe?.Dispose();
        }
    }
}