199 lines
8.1 KiB
C#
199 lines
8.1 KiB
C#
using IpcLibrary.Core;
|
|
using IpcLibrary.Core.Exceptions;
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.IO.Pipes;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace IpcLibrary.Connection {
|
|
/// <summary>
/// Connection manager that exchanges IPC messages over named pipes.
/// </summary>
/// <remarks>
/// An instance can accept inbound connections (<see cref="StartServerAsync"/>) or dial the pipe
/// itself (<see cref="ConnectToProcessAsync"/>); both directions use the pipe name passed to the
/// constructor. Live connections are tracked by process id in a concurrent dictionary.
/// </remarks>
public class NamedPipeConnectionManager : IConnectionManager, IDisposable {
    /// <summary>Pause, in milliseconds, before the accept loop retries after an unexpected error.</summary>
    private const int ListenerRetryDelayMilliseconds = 1000;

    private readonly ConcurrentDictionary<string, ProcessConnection> _connections;
    private readonly IMessageSerializer _serializer;
    private readonly IPCConfiguration _configuration;
    private readonly string _serverPipeName;
    private readonly CancellationTokenSource _cancellationTokenSource;

    // Most recently created server pipe instance; kept so Dispose can close a pending listener.
    private NamedPipeServerStream _serverPipe;
    private bool _isServer;
    private bool _disposed;

    /// <summary>Raised after a connection has been registered and started.</summary>
    public event EventHandler<ProcessInfo> ProcessConnected;

    /// <summary>Raised after a registered connection has been removed from the manager.</summary>
    public event EventHandler<ProcessInfo> ProcessDisconnected;

    /// <summary>
    /// Creates a manager bound to the given pipe name.
    /// </summary>
    /// <param name="serializer">Serializer handed to every <c>ProcessConnection</c>.</param>
    /// <param name="configuration">Timeouts, retry and connection-limit settings.</param>
    /// <param name="pipeName">Name of the pipe used both for listening and for dialing.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public NamedPipeConnectionManager(IMessageSerializer serializer, IPCConfiguration configuration, string pipeName) {
        _connections = new ConcurrentDictionary<string, ProcessConnection>();
        _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        _serverPipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName));
        _cancellationTokenSource = new CancellationTokenSource();
    }

    /// <summary>
    /// Starts accepting inbound pipe connections. Subsequent calls return immediately.
    /// </summary>
    /// <returns>
    /// A task that completes only when the accept loop ends, i.e. after the manager is disposed.
    /// Callers that need to continue while listening should not await it directly.
    /// </returns>
    public async Task StartServerAsync() {
        if (_isServer) {
            return;
        }

        _isServer = true;
        await Task.Run(() => ListenForConnections(_cancellationTokenSource.Token));
    }

    /// <summary>
    /// Dials the manager's pipe and registers the resulting connection under <paramref name="processId"/>.
    /// </summary>
    /// <param name="processId">Key under which the connection is tracked.</param>
    /// <param name="address">Currently unused: the pipe name given to the constructor is always dialed.</param>
    /// <returns>
    /// <c>true</c> if a connection for <paramref name="processId"/> already existed or was established;
    /// <c>false</c> if another caller registered the same id concurrently.
    /// </returns>
    /// <exception cref="ConnectionException">Wraps any failure, including timeout and cancellation.</exception>
    public async Task<bool> ConnectToProcessAsync(string processId, string address) {
        try {
            if (_connections.ContainsKey(processId)) {
                return true;
            }

            var cancellationToken = _cancellationTokenSource.Token;
            var clientPipe = new NamedPipeClientStream(".", _serverPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            var registered = false;
            try {
                await clientPipe.ConnectAsync(_configuration.ConnectionTimeout, cancellationToken);

                var connection = new ProcessConnection(processId, clientPipe, _serializer);
                registered = await RegisterAndStartAsync(processId, connection, true, cancellationToken);
            } finally {
                // The pipe is only owned by a connection once that connection is registered and
                // running; on every other path it must be closed here or the handle leaks.
                if (!registered) {
                    clientPipe.Dispose();
                }
            }

            if (!registered) {
                return false;
            }

            ProcessConnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Connected));
            return true;
        } catch (Exception ex) {
            throw new ConnectionException($"连接到进程 {processId} 失败: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Removes and closes the connection registered for <paramref name="processId"/>, then raises
    /// <see cref="ProcessDisconnected"/>. Does nothing when no such connection exists.
    /// </summary>
    /// <param name="processId">Id of the connection to close.</param>
    public async Task DisconnectFromProcessAsync(string processId) {
        if (!_connections.TryRemove(processId, out var connection)) {
            return;
        }

        // Detach first: this is a deliberate disconnect, so the connection's own Disconnected
        // notification must not raise a second ProcessDisconnected or start an auto-reconnect.
        DetachHandlers(connection);
        await connection.DisconnectAsync();

        ProcessDisconnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Disconnected));
    }

    /// <summary>
    /// Sends <paramref name="message"/> to a single registered process.
    /// </summary>
    /// <param name="targetProcessId">Id of the destination connection.</param>
    /// <param name="message">Message to send.</param>
    /// <returns>The connection's send result, or <c>false</c> when the id is not registered.</returns>
    public async Task<bool> SendMessageAsync(string targetProcessId, IPCMessage message) {
        if (!_connections.TryGetValue(targetProcessId, out var connection)) {
            return false;
        }

        return await connection.SendMessageAsync(message);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to every registered connection and waits for all sends.
    /// </summary>
    /// <param name="message">Message to send.</param>
    public async Task BroadcastMessageAsync(IPCMessage message) {
        var pendingSends = _connections.Values
            .Select(connection => connection.SendMessageAsync(message))
            .ToArray();

        await Task.WhenAll(pendingSends);
    }

    /// <summary>
    /// Looks up the process information of a registered connection.
    /// </summary>
    /// <param name="processId">Id of the connection.</param>
    /// <returns>The connection's process info, or <c>null</c> when the id is not registered.</returns>
    public ProcessInfo GetProcessInfo(string processId) {
        return _connections.TryGetValue(processId, out var connection)
            ? connection.GetProcessInfo()
            : null;
    }

    /// <summary>
    /// Returns a snapshot of the process information of all registered connections.
    /// </summary>
    public IReadOnlyList<ProcessInfo> GetAllProcesses() {
        return _connections.Values
            .Select(connection => connection.GetProcessInfo())
            .ToList();
    }

    /// <summary>
    /// Accept loop: creates a server pipe instance, waits for a client, registers the connection
    /// under a freshly generated GUID and repeats until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    private async Task ListenForConnections(CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            NamedPipeServerStream pipe = null;
            var handedOff = false;
            try {
                pipe = new NamedPipeServerStream(_serverPipeName, PipeDirection.InOut,
                    _configuration.MaxConcurrentConnections, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
                _serverPipe = pipe;

                await pipe.WaitForConnectionAsync(cancellationToken);

                var processId = Guid.NewGuid().ToString();
                var connection = new ProcessConnection(processId, pipe, _serializer);

                if (await RegisterAndStartAsync(processId, connection, false, cancellationToken)) {
                    // From here on the running connection owns the pipe.
                    handedOff = true;
                    ProcessConnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Connected));
                }
            } catch (OperationCanceledException) {
                break;
            } catch (Exception ex) {
                // Log the error and keep listening.
                Console.WriteLine($"监听连接时发生错误: {ex.Message}");
                try {
                    await Task.Delay(ListenerRetryDelayMilliseconds, cancellationToken);
                } catch (OperationCanceledException) {
                    // A cancellation thrown from inside a catch block is not seen by the sibling
                    // catch clause above, so it has to be handled here to end the loop cleanly.
                    break;
                }
            } finally {
                // Close instances that never became a running connection (cancelled wait,
                // failed start, failed registration) instead of leaking the handle.
                if (!handedOff && pipe != null) {
                    pipe.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="connection"/>, adds it to the registry and starts it.
    /// Everything is rolled back if the id is already taken or the start fails.
    /// </summary>
    /// <param name="processId">Key to register the connection under.</param>
    /// <param name="connection">Connection to register.</param>
    /// <param name="dialed"><c>true</c> for connections this manager initiated; only those auto-reconnect.</param>
    /// <param name="cancellationToken">Token passed to the connection's start routine.</param>
    /// <returns><c>true</c> when registered and started; <c>false</c> when the id was already registered.</returns>
    private async Task<bool> RegisterAndStartAsync(string processId, ProcessConnection connection, bool dialed, CancellationToken cancellationToken) {
        AttachHandlers(connection, dialed);

        if (!_connections.TryAdd(processId, connection)) {
            DetachHandlers(connection);
            return false;
        }

        try {
            await connection.StartAsync(cancellationToken);
        } catch {
            RemoveIfCurrent(processId, connection);
            DetachHandlers(connection);
            throw;
        }

        return true;
    }

    /// <summary>Wires the manager's handlers to a connection's events.</summary>
    private void AttachHandlers(ProcessConnection connection, bool dialed) {
        connection.MessageReceived += OnConnectionMessageReceived;
        if (dialed) {
            connection.Disconnected += OnConnectionDisconnected;
        } else {
            connection.Disconnected += OnInboundConnectionDisconnected;
        }
    }

    /// <summary>Removes the manager's handlers from a connection; removing an absent handler is a no-op.</summary>
    private void DetachHandlers(ProcessConnection connection) {
        connection.MessageReceived -= OnConnectionMessageReceived;
        connection.Disconnected -= OnConnectionDisconnected;
        connection.Disconnected -= OnInboundConnectionDisconnected;
    }

    /// <summary>
    /// Atomically removes the registry entry for <paramref name="processId"/> only if it still
    /// maps to <paramref name="connection"/>, so a newer connection reusing the id is left alone.
    /// </summary>
    /// <returns><c>true</c> if the entry was removed.</returns>
    private bool RemoveIfCurrent(string processId, ProcessConnection connection) {
        var entries = (ICollection<KeyValuePair<string, ProcessConnection>>)_connections;
        return entries.Remove(new KeyValuePair<string, ProcessConnection>(processId, connection));
    }

    /// <summary>Builds the event payload for a status change, stamped with the current UTC time.</summary>
    private static ProcessInfo CreateProcessInfo(string processId, ProcessStatus status) {
        return new ProcessInfo {
            ProcessId = processId,
            Status = status,
            LastHeartbeat = DateTime.UtcNow
        };
    }

    /// <summary>
    /// Handler for messages received on any connection; intentionally empty.
    /// </summary>
    private void OnConnectionMessageReceived(object sender, IPCMessage message) {
        // Messages are handled by an upper layer.
    }

    /// <summary>
    /// Handles the loss of a connection this manager dialed; may trigger an automatic reconnect.
    /// </summary>
    private async void OnConnectionDisconnected(object sender, EventArgs e) {
        await HandleDisconnectedAsync(sender as ProcessConnection, true);
    }

    /// <summary>
    /// Handles the loss of a connection accepted by the listener. No reconnect is attempted:
    /// the remote side dialed in, and dialing the shared pipe name would reach this manager itself.
    /// </summary>
    private async void OnInboundConnectionDisconnected(object sender, EventArgs e) {
        await HandleDisconnectedAsync(sender as ProcessConnection, false);
    }

    /// <summary>
    /// Unregisters a lost connection, raises <see cref="ProcessDisconnected"/> and optionally reconnects.
    /// Never throws, because its callers are <c>async void</c> event handlers.
    /// </summary>
    /// <param name="connection">The connection that reported the disconnect; ignored when <c>null</c>.</param>
    /// <param name="allowReconnect">Whether auto-reconnect may be attempted for this connection.</param>
    private async Task HandleDisconnectedAsync(ProcessConnection connection, bool allowReconnect) {
        if (connection == null) {
            return;
        }

        try {
            var processId = connection.ProcessId;
            DetachHandlers(connection);

            // If the entry is already gone (explicit disconnect, failed start) the removal was
            // reported elsewhere; skip to avoid a duplicate event and an unwanted reconnect.
            if (!RemoveIfCurrent(processId, connection)) {
                return;
            }

            ProcessDisconnected?.Invoke(this, CreateProcessInfo(processId, ProcessStatus.Disconnected));

            // Reconnect automatically when enabled.
            if (allowReconnect && _configuration.EnableAutoReconnect && !_disposed) {
                await TryReconnectAsync(processId);
            }
        } catch (Exception ex) {
            // An exception escaping an async void handler would terminate the process.
            Console.WriteLine($"处理连接断开事件时发生错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Tries to re-establish the connection for <paramref name="processId"/>, waiting the configured
    /// retry delay before each of at most <c>MaxRetryAttempts</c> attempts. Stops early on success
    /// or once the manager is disposed.
    /// </summary>
    /// <param name="processId">Id of the connection to re-establish.</param>
    private async Task TryReconnectAsync(string processId) {
        for (int attempt = 0; attempt < _configuration.MaxRetryAttempts; attempt++) {
            if (_disposed) {
                return;
            }

            try {
                await Task.Delay(_configuration.RetryDelay, _cancellationTokenSource.Token);

                if (await ConnectToProcessAsync(processId, _serverPipeName)) {
                    break;
                }
            } catch (OperationCanceledException) {
                // Manager is shutting down.
                return;
            } catch (ObjectDisposedException) {
                // The token source was disposed between the check above and its use.
                return;
            } catch (Exception ex) {
                Console.WriteLine($"重连尝试 {attempt + 1} 失败: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Stops the accept loop, closes all connections and releases the pipe and token source.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose() {
        if (_disposed) {
            return;
        }

        _disposed = true;

        try {
            _cancellationTokenSource.Cancel();
        } catch (AggregateException ex) {
            // Cancel rethrows exceptions from registered callbacks; Dispose must not throw.
            Console.WriteLine($"取消后台操作时发生错误: {ex.Message}");
        }

        foreach (var entry in _connections) {
            if (!_connections.TryRemove(entry.Key, out var connection)) {
                continue;
            }

            DetachHandlers(connection);
            try {
                // Dispose cannot await; the disconnect is started and left to finish on its own.
                _ = connection.DisconnectAsync();
            } catch (Exception ex) {
                Console.WriteLine($"断开连接 {entry.Key} 时发生错误: {ex.Message}");
            }
        }

        _serverPipe?.Dispose();
        _cancellationTokenSource.Dispose();
    }
}
|
|
|
|
}
|