using IpcLibrary.Core;
using System.IO.Pipes;
namespace IpcLibrary.Connection {
///
/// 单个进程连接
///
internal class ProcessConnection : IDisposable {
private readonly string _processId;
private readonly PipeStream _pipe;
private readonly IMessageSerializer _serializer;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly SemaphoreSlim _sendSemaphore;
private bool _disposed;
public string ProcessId => _processId;
public bool IsConnected => _pipe?.IsConnected == true;
public event EventHandler MessageReceived;
public event EventHandler Disconnected;
public ProcessConnection(string processId, PipeStream pipe, IMessageSerializer serializer) {
_processId = processId ?? throw new ArgumentNullException(nameof(processId));
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_cancellationTokenSource = new CancellationTokenSource();
_sendSemaphore = new SemaphoreSlim(1, 1);
}
public async Task StartAsync(CancellationToken cancellationToken) {
var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken, _cancellationTokenSource.Token).Token;
Task.Run(() => ReceiveMessages(combinedToken), combinedToken);
}
public async Task SendMessageAsync(IPCMessage message) {
if (!IsConnected || _disposed)
return false;
await _sendSemaphore.WaitAsync();
try {
var data = _serializer.Serialize(message);
var lengthBytes = BitConverter.GetBytes(data.Length);
await _pipe.WriteAsync(lengthBytes, 0, lengthBytes.Length);
await _pipe.WriteAsync(data, 0, data.Length);
await _pipe.FlushAsync();
return true;
} catch (Exception ex) {
Console.WriteLine($"发送消息失败: {ex.Message}");
return false;
} finally {
_sendSemaphore.Release();
}
}
public async Task DisconnectAsync() {
if (_disposed)
return;
_cancellationTokenSource.Cancel();
try {
_pipe?.Close();
} catch (Exception ex) {
Console.WriteLine($"关闭管道时发生错误: {ex.Message}");
}
Disconnected?.Invoke(this, EventArgs.Empty);
}
public ProcessInfo GetProcessInfo() {
return new ProcessInfo {
ProcessId = _processId,
Status = IsConnected ? ProcessStatus.Connected : ProcessStatus.Disconnected,
LastHeartbeat = DateTime.UtcNow
};
}
private async Task ReceiveMessages(CancellationToken cancellationToken) {
var lengthBuffer = new byte[4];
while (!cancellationToken.IsCancellationRequested && IsConnected) {
try {
// 读取消息长度
var bytesRead = await _pipe.ReadAsync(lengthBuffer, 0, 4, cancellationToken);
if (bytesRead != 4)
break;
var messageLength = BitConverter.ToInt32(lengthBuffer, 0);
if (messageLength <= 0 || messageLength > 1024 * 1024) // 限制最大消息大小
break;
// 读取消息内容
var messageBuffer = new byte[messageLength];
var totalBytesRead = 0;
while (totalBytesRead < messageLength && !cancellationToken.IsCancellationRequested) {
var bytes = await _pipe.ReadAsync(messageBuffer, totalBytesRead,
messageLength - totalBytesRead, cancellationToken);
if (bytes == 0)
break;
totalBytesRead += bytes;
}
if (totalBytesRead == messageLength) {
var message = _serializer.Deserialize(messageBuffer);
MessageReceived?.Invoke(this, message);
}
} catch (OperationCanceledException) {
break;
} catch (Exception ex) {
Console.WriteLine($"接收消息时发生错误: {ex.Message}");
break;
}
}
await DisconnectAsync();
}
public void Dispose() {
if (_disposed) return;
_disposed = true;
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
}
}
}