133 lines
4.9 KiB
C#
133 lines
4.9 KiB
C#
using IpcLibrary.Core;
|
|
using System.IO.Pipes;
|
|
|
|
namespace IpcLibrary.Connection {
|
|
/// <summary>
/// A single connection to a peer process over a pipe.
/// </summary>
/// <remarks>
/// Every message is framed as a 4-byte length prefix (the bytes produced by
/// <see cref="BitConverter.GetBytes(int)"/>) followed by the serialized payload.
/// The connection closes the pipe it was given when it disconnects or is disposed.
/// </remarks>
internal class ProcessConnection : IDisposable {
    /// <summary>Size in bytes of the length prefix that precedes every payload.</summary>
    private const int LengthPrefixSize = sizeof(int);

    /// <summary>Largest payload, in bytes, that the receive loop accepts (1 MiB).</summary>
    private const int MaxMessageSize = 1024 * 1024;

    private readonly string _processId;
    private readonly PipeStream _pipe;
    private readonly IMessageSerializer _serializer;
    private readonly CancellationTokenSource _cancellationTokenSource;

    // Serializes writers so the prefix and payload of two messages never interleave.
    // Deliberately not disposed: SemaphoreSlim only owns a native handle once
    // AvailableWaitHandle has been read (never done here), and disposing it would make
    // Release() throw in a sender that is still in flight while Dispose() runs.
    private readonly SemaphoreSlim _sendSemaphore;

    // Links the caller's token with the internal one; created by StartAsync and
    // released when the receive loop ends or the connection is disposed.
    private CancellationTokenSource _linkedTokenSource;

    // 0/1 flags manipulated with Interlocked because they are touched from the
    // receive loop, from senders and from whoever disposes the connection.
    private int _started;
    private int _disconnected;
    private int _disposed;

    /// <summary>Identifier of the peer process, as supplied to the constructor.</summary>
    public string ProcessId => _processId;

    /// <summary>Whether the underlying pipe currently reports itself as connected.</summary>
    public bool IsConnected => _pipe.IsConnected;

    /// <summary>Raised by the receive loop for every complete message that was deserialized.</summary>
    public event EventHandler<IPCMessage> MessageReceived;

    /// <summary>Raised once when the connection is torn down through <see cref="DisconnectAsync"/>.</summary>
    public event EventHandler Disconnected;

    private bool IsDisposed => Volatile.Read(ref _disposed) != 0;

    /// <summary>
    /// Creates a connection around an already established pipe.
    /// </summary>
    /// <param name="processId">Identifier of the peer process.</param>
    /// <param name="pipe">Pipe used for both sending and receiving.</param>
    /// <param name="serializer">Serializer used to convert messages to and from bytes.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public ProcessConnection(string processId, PipeStream pipe, IMessageSerializer serializer) {
        _processId = processId ?? throw new ArgumentNullException(nameof(processId));
        _pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
        _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
        _cancellationTokenSource = new CancellationTokenSource();
        _sendSemaphore = new SemaphoreSlim(1, 1);
    }

    /// <summary>
    /// Starts the background receive loop. The returned task completes as soon as the
    /// loop has been scheduled; it does not wait for the loop to finish.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the receive loop when cancelled.</param>
    /// <returns>
    /// A completed task, or a faulted task carrying <see cref="ObjectDisposedException"/>
    /// when the connection has already been disposed.
    /// </returns>
    public Task StartAsync(CancellationToken cancellationToken) {
        if (IsDisposed)
            return Task.FromException(new ObjectDisposedException(nameof(ProcessConnection)));

        // Nothing would run with an already cancelled token; leave the connection
        // startable so the caller can retry with a live token.
        if (cancellationToken.IsCancellationRequested)
            return Task.CompletedTask;

        // A second loop on the same pipe would steal bytes from the first and corrupt
        // the framing, so repeated calls are a no-op.
        if (Interlocked.CompareExchange(ref _started, 1, 0) != 0)
            return Task.CompletedTask;

        try {
            var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(
                cancellationToken, _cancellationTokenSource.Token);
            _linkedTokenSource = linkedSource;

            var combinedToken = linkedSource.Token;

            // Task.Run keeps the caller from being blocked by the first read, which can
            // complete synchronously on pipes that were not opened for asynchronous I/O.
            _ = Task.Run(() => ReceiveMessages(combinedToken), combinedToken);
            return Task.CompletedTask;
        } catch (ObjectDisposedException ex) {
            // Lost a race with Dispose(); surface it through the task like an async method would.
            return Task.FromException(ex);
        }
    }

    /// <summary>
    /// Serializes <paramref name="message"/> and writes it to the pipe as a
    /// length-prefixed frame. Concurrent callers are serialized.
    /// </summary>
    /// <param name="message">Message to send.</param>
    /// <returns>
    /// <c>true</c> when the frame was written and flushed; <c>false</c> when the
    /// connection is not usable or when serialization or the write failed.
    /// </returns>
    public async Task<bool> SendMessageAsync(IPCMessage message) {
        if (!IsConnected || IsDisposed)
            return false;

        await _sendSemaphore.WaitAsync().ConfigureAwait(false);
        try {
            var data = _serializer.Serialize(message);
            var lengthBytes = BitConverter.GetBytes(data.Length);

            await _pipe.WriteAsync(lengthBytes, 0, lengthBytes.Length).ConfigureAwait(false);
            await _pipe.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
            await _pipe.FlushAsync().ConfigureAwait(false);

            return true;
        } catch (Exception ex) {
            Console.WriteLine($"发送消息失败: {ex.Message}");
            return false;
        } finally {
            _sendSemaphore.Release();
        }
    }

    /// <summary>
    /// Stops the receive loop, closes the pipe and raises <see cref="Disconnected"/>.
    /// Only the first call has any effect; calls made after <see cref="Dispose"/> are ignored.
    /// </summary>
    /// <returns>
    /// A completed task, or a faulted task when cancellation callbacks or a
    /// <see cref="Disconnected"/> handler threw.
    /// </returns>
    public Task DisconnectAsync() {
        try {
            DisconnectCore();
            return Task.CompletedTask;
        } catch (Exception ex) {
            // Report failures through the task, as the previous async implementation did.
            return Task.FromException(ex);
        }
    }

    /// <summary>
    /// Builds a snapshot describing this connection.
    /// </summary>
    /// <returns>
    /// A <see cref="ProcessInfo"/> holding the process id and the current connection status.
    /// </returns>
    public ProcessInfo GetProcessInfo() {
        return new ProcessInfo {
            ProcessId = _processId,
            Status = IsConnected ? ProcessStatus.Connected : ProcessStatus.Disconnected,
            // NOTE(review): this is the time of the call, not of the last message
            // received from the peer; no heartbeat is tracked by this class.
            LastHeartbeat = DateTime.UtcNow
        };
    }

    /// <summary>
    /// Performs the one-time disconnect work behind <see cref="DisconnectAsync"/>.
    /// </summary>
    private void DisconnectCore() {
        if (IsDisposed)
            return;

        // The receive loop always disconnects when it exits, so an explicit call plus
        // the loop exit must not close the pipe or raise the event twice.
        if (Interlocked.Exchange(ref _disconnected, 1) != 0)
            return;

        try {
            _cancellationTokenSource.Cancel();
        } catch (ObjectDisposedException) {
            // Dispose() ran concurrently and has already cancelled the source.
        }

        try {
            _pipe.Close();
        } catch (Exception ex) {
            Console.WriteLine($"关闭管道时发生错误: {ex.Message}");
        }

        Disconnected?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>
    /// Receive loop: reads length-prefixed frames until cancellation, end of stream,
    /// an invalid length or an error, then disconnects.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    private async Task ReceiveMessages(CancellationToken cancellationToken) {
        var lengthBuffer = new byte[LengthPrefixSize];

        try {
            while (!cancellationToken.IsCancellationRequested && IsConnected) {
                try {
                    // Read the message length. A single read may return fewer than four
                    // bytes, so keep reading until the prefix is complete.
                    if (!await ReadFullyAsync(lengthBuffer, LengthPrefixSize, cancellationToken).ConfigureAwait(false))
                        break;

                    var messageLength = BitConverter.ToInt32(lengthBuffer, 0);
                    if (messageLength <= 0 || messageLength > MaxMessageSize) // cap the message size
                        break;

                    // Read the message body.
                    var messageBuffer = new byte[messageLength];
                    if (!await ReadFullyAsync(messageBuffer, messageLength, cancellationToken).ConfigureAwait(false))
                        break;

                    var message = _serializer.Deserialize<IPCMessage>(messageBuffer);
                    MessageReceived?.Invoke(this, message);
                } catch (OperationCanceledException) {
                    break;
                } catch (Exception) when (cancellationToken.IsCancellationRequested) {
                    // The pipe was closed underneath a pending read during shutdown.
                    break;
                } catch (Exception ex) {
                    Console.WriteLine($"接收消息时发生错误: {ex.Message}");
                    break;
                }
            }

            await DisconnectAsync().ConfigureAwait(false);
        } finally {
            ReleaseLinkedTokenSource();
        }
    }

    /// <summary>
    /// Reads exactly <paramref name="count"/> bytes from the pipe into the start of
    /// <paramref name="buffer"/>.
    /// </summary>
    /// <returns><c>false</c> when the stream ended before <paramref name="count"/> bytes arrived.</returns>
    private async Task<bool> ReadFullyAsync(byte[] buffer, int count, CancellationToken cancellationToken) {
        var offset = 0;
        while (offset < count) {
            var read = await _pipe.ReadAsync(buffer, offset, count - offset, cancellationToken).ConfigureAwait(false);
            if (read == 0)
                return false;

            offset += read;
        }

        return true;
    }

    /// <summary>Disposes the linked token source created by <see cref="StartAsync"/>, at most once.</summary>
    private void ReleaseLinkedTokenSource() {
        Interlocked.Exchange(ref _linkedTokenSource, null)?.Dispose();
    }

    /// <summary>
    /// Cancels the receive loop and releases the pipe and the cancellation sources.
    /// Does not raise <see cref="Disconnected"/>. Safe to call more than once.
    /// </summary>
    public void Dispose() {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        try {
            _cancellationTokenSource.Cancel();
        } finally {
            _cancellationTokenSource.Dispose();
            ReleaseLinkedTokenSource();

            // Closing the pipe also unblocks a pending read that could not be cancelled.
            _pipe.Dispose();
        }
    }
}
|
|
} |