using IpcLibrary.Core;
using IpcLibrary.Core.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IpcLibrary.Connection {
/// <summary>
/// Connection manager built on named pipes. An instance either accepts incoming
/// clients on the configured pipe (<see cref="StartServerAsync"/>) or connects out
/// to that pipe as a client (<see cref="ConnectToProcessAsync"/>).
/// </summary>
public class NamedPipeConnectionManager : IConnectionManager, IDisposable {
    // Live connections keyed by process id.
    private readonly ConcurrentDictionary<string, ProcessConnection> _connections;
    private readonly IMessageSerializer _serializer;
    private readonly IPCConfiguration _configuration;
    private readonly string _serverPipeName;
    private readonly CancellationTokenSource _cancellationTokenSource;
    // Most recently created server-side pipe instance (set by the accept loop).
    private NamedPipeServerStream _serverPipe;
    private bool _isServer;
    // Volatile: written by Dispose and read from the accept/reconnect continuations.
    private volatile bool _disposed;

    /// <summary>Raised after a connection has been registered and started.</summary>
    public event EventHandler<ProcessInfo> ProcessConnected;

    /// <summary>Raised after a connection has been removed from the manager.</summary>
    public event EventHandler<ProcessInfo> ProcessDisconnected;

    /// <summary>
    /// Creates a manager bound to the given pipe name.
    /// </summary>
    /// <param name="serializer">Serializer handed to every <c>ProcessConnection</c>.</param>
    /// <param name="configuration">Timeouts, retry and concurrency settings.</param>
    /// <param name="pipeName">Name of the pipe to listen on or connect to.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public NamedPipeConnectionManager(IMessageSerializer serializer, IPCConfiguration configuration, string pipeName) {
        _connections = new ConcurrentDictionary<string, ProcessConnection>();
        _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        _serverPipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName));
        _cancellationTokenSource = new CancellationTokenSource();
    }

    /// <summary>
    /// Switches the manager into server mode and runs the accept loop.
    /// A second call returns immediately.
    /// </summary>
    /// <remarks>
    /// The returned task awaits the accept loop itself, so it completes only once the
    /// loop ends (after <see cref="Dispose"/> cancels it).
    /// NOTE(review): callers that only want to "start" the server should not await
    /// this task to completion - confirm the intended contract with IConnectionManager.
    /// </remarks>
    public async Task StartServerAsync() {
        if (_isServer) {
            return;
        }

        _isServer = true;
        await Task.Run(() => ListenForConnections(_cancellationTokenSource.Token));
    }

    /// <summary>
    /// Connects as a client to the configured pipe and registers the connection
    /// under <paramref name="processId"/>.
    /// </summary>
    /// <param name="processId">Key under which the connection is tracked.</param>
    /// <param name="address">Currently unused; the pipe name given to the constructor is always used.</param>
    /// <returns>
    /// <c>true</c> when a connection for <paramref name="processId"/> already exists or was
    /// established; <c>false</c> when another caller registered the same id concurrently.
    /// </returns>
    /// <exception cref="ConnectionException">Any failure while connecting; the original exception is the inner exception.</exception>
    public async Task<bool> ConnectToProcessAsync(string processId, string address) {
        NamedPipeClientStream clientPipe = null;
        ProcessConnection connection = null;
        var started = false;
        try {
            if (_connections.ContainsKey(processId)) {
                return true;
            }

            clientPipe = new NamedPipeClientStream(".", _serverPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            await clientPipe.ConnectAsync(_configuration.ConnectionTimeout, _cancellationTokenSource.Token);

            connection = new ProcessConnection(processId, clientPipe, _serializer);
            connection.MessageReceived += OnConnectionMessageReceived;
            connection.Disconnected += OnConnectionDisconnected;

            if (!_connections.TryAdd(processId, connection)) {
                // Another caller registered this id first: release our duplicate pipe
                // instead of leaking it.
                await DisconnectQuietlyAsync(connection);
                clientPipe.Dispose();
                return false;
            }

            await connection.StartAsync(_cancellationTokenSource.Token);
            started = true;

            RaiseProcessConnected(processId);
            return true;
        } catch (Exception ex) {
            // Only tear down when the connection never became usable; a failure in a
            // ProcessConnected subscriber must not kill a working connection.
            if (!started) {
                if (connection != null) {
                    TryRemoveExact(processId, connection);
                    await DisconnectQuietlyAsync(connection);
                }
                clientPipe?.Dispose();
            }
            throw new ConnectionException($"连接到进程 {processId} 失败: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Removes and disconnects the connection registered under <paramref name="processId"/>,
    /// then raises <see cref="ProcessDisconnected"/>. Does nothing for unknown ids.
    /// </summary>
    /// <param name="processId">Key of the connection to close.</param>
    public async Task DisconnectFromProcessAsync(string processId) {
        if (!_connections.TryRemove(processId, out var connection)) {
            return;
        }

        // Detach first so an explicit disconnect neither raises ProcessDisconnected
        // twice nor triggers the auto-reconnect path in OnConnectionDisconnected.
        DetachHandlers(connection);
        await connection.DisconnectAsync();
        RaiseProcessDisconnected(processId);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to a single connected process.
    /// </summary>
    /// <returns>The result of the connection's send, or <c>false</c> when the target is not connected.</returns>
    public async Task<bool> SendMessageAsync(string targetProcessId, IPCMessage message) {
        if (_connections.TryGetValue(targetProcessId, out var connection)) {
            return await connection.SendMessageAsync(message);
        }
        return false;
    }

    /// <summary>
    /// Sends <paramref name="message"/> to every currently registered connection and
    /// waits for all sends to finish.
    /// </summary>
    public async Task BroadcastMessageAsync(IPCMessage message) {
        var tasks = _connections.Values.Select(conn => conn.SendMessageAsync(message));
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Returns the info reported by the connection registered under
    /// <paramref name="processId"/>, or <c>null</c> when there is none.
    /// </summary>
    public ProcessInfo GetProcessInfo(string processId) {
        if (_connections.TryGetValue(processId, out var connection)) {
            return connection.GetProcessInfo();
        }
        return null;
    }

    /// <summary>Returns a snapshot of the info for all registered connections.</summary>
    public IReadOnlyList<ProcessInfo> GetAllProcesses() {
        return _connections.Values.Select(conn => conn.GetProcessInfo()).ToList();
    }

    /// <summary>
    /// Accept loop: creates a server pipe instance, waits for a client, wraps it in a
    /// <c>ProcessConnection</c> keyed by a fresh GUID and starts it. Runs until
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    private async Task ListenForConnections(CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            NamedPipeServerStream pendingPipe = null;
            ProcessConnection connection = null;
            var started = false;
            try {
                pendingPipe = new NamedPipeServerStream(_serverPipeName, PipeDirection.InOut,
                    _configuration.MaxConcurrentConnections, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
                _serverPipe = pendingPipe;
                await pendingPipe.WaitForConnectionAsync(cancellationToken);

                var processId = Guid.NewGuid().ToString();
                connection = new ProcessConnection(processId, pendingPipe, _serializer);
                connection.MessageReceived += OnConnectionMessageReceived;
                connection.Disconnected += OnConnectionDisconnected;

                if (_connections.TryAdd(processId, connection)) {
                    await connection.StartAsync(cancellationToken);
                    started = true;
                    RaiseProcessConnected(processId);
                }
            } catch (OperationCanceledException) {
                break;
            } catch (Exception ex) {
                if (cancellationToken.IsCancellationRequested) {
                    break;
                }

                // Log the error and keep listening.
                Console.WriteLine($"监听连接时发生错误: {ex.Message}");
                try {
                    await Task.Delay(1000, cancellationToken);
                } catch (OperationCanceledException) {
                    // Cancelled during the back-off: leave the loop quietly instead of
                    // letting the cancellation escape from inside the catch block.
                    break;
                }
            } finally {
                // A pipe that never turned into a started connection belongs to this
                // loop iteration and must be released here.
                if (!started) {
                    if (connection != null) {
                        TryRemoveExact(connection.ProcessId, connection);
                        await DisconnectQuietlyAsync(connection);
                    }
                    pendingPipe?.Dispose();
                }
            }
        }
    }

    /// <summary>Message hook for connections; messages are handled by the upper layer.</summary>
    private void OnConnectionMessageReceived(object sender, IPCMessage message) {
        // Messages are processed by the upper layer.
    }

    /// <summary>
    /// Handles a connection reporting that it dropped: unregisters it, raises
    /// <see cref="ProcessDisconnected"/> and, when enabled, tries to reconnect.
    /// </summary>
    private async void OnConnectionDisconnected(object sender, EventArgs e) {
        // async void: nothing can observe an exception thrown here, so everything is
        // wrapped to avoid taking the process down from an event callback.
        try {
            var connection = sender as ProcessConnection;
            if (connection == null) {
                return;
            }

            var processId = connection.ProcessId;
            // Remove only this exact instance; if it is no longer registered the
            // disconnect was already handled (or the id now maps to a newer connection).
            if (!TryRemoveExact(processId, connection)) {
                return;
            }

            DetachHandlers(connection);
            RaiseProcessDisconnected(processId);

            // Auto-reconnect if enabled. Skipped in server mode: reconnecting would
            // open a client to this manager's own pipe under a random GUID id.
            if (_configuration.EnableAutoReconnect && !_isServer && !_disposed) {
                await TryReconnectAsync(processId);
            }
        } catch (Exception ex) {
            Console.WriteLine($"处理连接断开时发生错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Retries <see cref="ConnectToProcessAsync"/> up to the configured number of
    /// attempts, waiting the configured delay before each one. Stops early on
    /// success or once the manager is disposed.
    /// </summary>
    private async Task TryReconnectAsync(string processId) {
        for (int attempt = 0; attempt < _configuration.MaxRetryAttempts; attempt++) {
            if (_disposed) {
                return;
            }

            try {
                await Task.Delay(_configuration.RetryDelay, _cancellationTokenSource.Token);
                if (await ConnectToProcessAsync(processId, _serverPipeName)) {
                    return;
                }
            } catch (OperationCanceledException) {
                // The manager was disposed while waiting.
                return;
            } catch (ObjectDisposedException) {
                // The token source is already gone: the manager was disposed.
                return;
            } catch (Exception ex) {
                Console.WriteLine($"重连尝试 {attempt + 1} 失败: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Cancels the accept loop and pending operations and closes every registered
    /// connection. Safe to call more than once.
    /// </summary>
    public void Dispose() {
        if (_disposed) {
            return;
        }

        _disposed = true;
        _cancellationTokenSource.Cancel();

        // Dispose is synchronous, so connections are closed in the background rather
        // than blocking on async code; DisconnectQuietlyAsync never throws.
        foreach (var entry in _connections.ToArray()) {
            if (TryRemoveExact(entry.Key, entry.Value)) {
                _ = DisconnectQuietlyAsync(entry.Value);
            }
        }

        _cancellationTokenSource.Dispose();
    }

    /// <summary>Removes the entry only if <paramref name="processId"/> still maps to this exact connection.</summary>
    private bool TryRemoveExact(string processId, ProcessConnection connection) {
        var entries = (ICollection<KeyValuePair<string, ProcessConnection>>)_connections;
        return entries.Remove(new KeyValuePair<string, ProcessConnection>(processId, connection));
    }

    /// <summary>Unsubscribes this manager's handlers from a connection.</summary>
    private void DetachHandlers(ProcessConnection connection) {
        connection.MessageReceived -= OnConnectionMessageReceived;
        connection.Disconnected -= OnConnectionDisconnected;
    }

    /// <summary>Detaches handlers and disconnects, logging instead of throwing on failure.</summary>
    private async Task DisconnectQuietlyAsync(ProcessConnection connection) {
        DetachHandlers(connection);
        try {
            await connection.DisconnectAsync();
        } catch (Exception ex) {
            Console.WriteLine($"断开连接 {connection.ProcessId} 时发生错误: {ex.Message}");
        }
    }

    /// <summary>Raises <see cref="ProcessConnected"/> for the given process id.</summary>
    private void RaiseProcessConnected(string processId) {
        var processInfo = new ProcessInfo {
            ProcessId = processId,
            Status = ProcessStatus.Connected,
            LastHeartbeat = DateTime.UtcNow
        };
        ProcessConnected?.Invoke(this, processInfo);
    }

    /// <summary>Raises <see cref="ProcessDisconnected"/> for the given process id.</summary>
    private void RaiseProcessDisconnected(string processId) {
        var processInfo = new ProcessInfo {
            ProcessId = processId,
            Status = ProcessStatus.Disconnected,
            LastHeartbeat = DateTime.UtcNow
        };
        ProcessDisconnected?.Invoke(this, processInfo);
    }
}
}