259 lines
10 KiB
C#
259 lines
10 KiB
C#
using IpcLibrary.Connection;
|
||
using IpcLibrary.Core;
|
||
using IpcLibrary.Core.Exceptions;
|
||
using IpcLibrary.Serialization;
|
||
using IpcLibrary.Services;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace IpcLibrary.Server {
|
||
/// <summary>
/// IPC server implementation.
/// </summary>
|
||
public class IPCServer : IIPCServer {
|
||
/// <summary>Connection manager used to send, broadcast and disconnect, and to list connected processes.</summary>
private readonly NamedPipeConnectionManager _connectionManager;

/// <summary>Registry that services are registered with and unregistered from.</summary>
private readonly IServiceRegistry _serviceRegistry;

/// <summary>Heartbeat monitor; raises a timeout event that leads to the process being disconnected.</summary>
private readonly HeartbeatManager _heartbeatManager;

/// <summary>Router used to send requests and to handle incoming messages.</summary>
private readonly MessageRouter _messageRouter;

/// <summary>Configuration supplied to the constructor, or a default instance.</summary>
private readonly IPCConfiguration _configuration;

/// <summary>Identifier placed in <c>SourceProcessId</c> of every message this server sends.</summary>
private readonly string _serverId;

/// <summary>Set once <see cref="Dispose"/> has run, so that a second call is a no-op.</summary>
private bool _disposed;

/// <summary>True between a successful start and a successful stop.</summary>
public bool IsRunning { get; private set; }

/// <summary>The processes currently reported by the connection manager.</summary>
public IReadOnlyList<ProcessInfo> ConnectedProcesses {
    get { return _connectionManager.GetAllProcesses(); }
}

/// <summary>UTC timestamp assigned when the server object is constructed.</summary>
public DateTime StartTime { get; internal set; }

/// <summary>Raised after the connection manager reports a newly connected process.</summary>
public event EventHandler<ProcessInfo> ProcessConnected;

/// <summary>Raised after the connection manager reports a disconnected process.</summary>
public event EventHandler<ProcessInfo> ProcessDisconnected;

/// <summary>Raised after a non-heartbeat incoming message has been routed.</summary>
public event EventHandler<IPCMessage> MessageReceived;
|
||
|
||
/// <summary>
/// Creates a server, builds its collaborators (connection manager, service registry,
/// heartbeat manager, message router) and subscribes to their events.
/// </summary>
/// <param name="configuration">
/// Settings to use. When null, a default <see cref="IPCConfiguration"/> is created.
/// </param>
public IPCServer(IPCConfiguration configuration = null) {
    if (configuration == null) {
        configuration = new IPCConfiguration();
    }

    _configuration = configuration;

    // Unique per server object: process id plus a random GUID without dashes.
    _serverId = $"Server_{Environment.ProcessId}_{Guid.NewGuid():N}";

    // The pipe name is fixed here; it does not come from the configuration.
    _connectionManager = new NamedPipeConnectionManager(
        new CompositeMessageSerializer(),
        _configuration,
        "IPCPipe");
    _serviceRegistry = new ServiceRegistry();
    _heartbeatManager = new HeartbeatManager();

    // Method-call messages are dispatched against the service registry.
    _messageRouter = new MessageRouter(new MethodCallHandler(_serviceRegistry));

    InitializeEventHandlers();
    StartTime = DateTime.UtcNow;
}
|
||
|
||
/// <summary>
/// Starts the named-pipe server and marks this instance as running.
/// Does nothing when the server is already running.
/// </summary>
/// <param name="address">
/// Address shown in the start-up log line. It is not passed to the connection manager,
/// whose pipe name is fixed in the constructor.
/// </param>
/// <returns>A task that completes once the connection manager has started.</returns>
/// <exception cref="ConnectionException">
/// Any failure during start-up; the original exception is kept as the inner exception.
/// </exception>
public async Task StartAsync(string address) {
    if (IsRunning) {
        return;
    }

    try {
        // Apply heartbeat timing before the pipe server is started, in case clients
        // connect (and monitoring begins) as soon as it is up. A failure here also
        // leaves the server cleanly un-started.
        if (_configuration.EnableHeartbeat) {
            _heartbeatManager.SetHeartbeatInterval(_configuration.HeartbeatInterval);
            _heartbeatManager.SetTimeoutThreshold(_configuration.ConnectionTimeout);
        }

        await _connectionManager.StartServerAsync();
        IsRunning = true;

        // Logged once, and only after the server really is up.
        Console.WriteLine($"IPC服务器已启动,地址: {address}");
    } catch (Exception ex) {
        throw new ConnectionException($"启动服务器失败: {ex.Message}", ex);
    }
}
|
||
|
||
/// <summary>
/// Broadcasts a "ServerShutdown" notification, disconnects every connected process
/// and marks the server as stopped. Does nothing when the server is not running.
/// </summary>
/// <returns>A task that completes when all processes have been disconnected.</returns>
/// <exception cref="ConnectionException">
/// Any failure while notifying or disconnecting; <see cref="IsRunning"/> is left
/// unchanged in that case.
/// </exception>
public async Task StopAsync() {
    if (!IsRunning) {
        return;
    }

    try {
        // Let every connected process know the server is about to go away.
        var farewell = new IPCMessage {
            Type = MessageType.Notification,
            Method = "ServerShutdown",
            SourceProcessId = _serverId
        };
        await _connectionManager.BroadcastMessageAsync(farewell);

        // Disconnect from a copy of the list rather than the live one.
        var snapshot = new List<ProcessInfo>(ConnectedProcesses);
        for (var index = 0; index < snapshot.Count; index++) {
            await _connectionManager.DisconnectFromProcessAsync(snapshot[index].ProcessId);
        }

        IsRunning = false;
        Console.WriteLine("IPC服务器已停止");
    } catch (Exception failure) {
        throw new ConnectionException($"停止服务器失败: {failure.Message}", failure);
    }
}
|
||
|
||
/// <summary>
/// Calls a method on a service hosted by another process and returns the result as
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Expected result type.</typeparam>
/// <param name="targetProcessId">Identifier of the process that hosts the service.</param>
/// <param name="serviceName">Name of the remote service.</param>
/// <param name="methodName">Name of the method to invoke.</param>
/// <param name="parameters">Arguments forwarded to the remote method.</param>
/// <returns>
/// The result when it already is a <typeparamref name="T"/>; otherwise the result converted
/// to <typeparamref name="T"/> when that is a primitive or <see cref="decimal"/> type
/// (optionally nullable) and the value converts without loss; otherwise
/// <c>default(T)</c>.
/// </returns>
public async Task<T> CallMethodAsync<T>(string targetProcessId, string serviceName, string methodName, params object[] parameters) {
    var result = await CallMethodAsync(targetProcessId, serviceName, methodName, typeof(T), parameters);

    if (result is T typedResult) {
        return typedResult;
    }

    // A value that travelled through a serializer may come back as a different
    // primitive type than requested (for example a boxed long where T is int).
    // The plain type test above would discard such a value and yield default(T).
    if (result is IConvertible) {
        var targetType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
        if (targetType.IsPrimitive || targetType == typeof(decimal)) {
            try {
                return (T)Convert.ChangeType(result, targetType, System.Globalization.CultureInfo.InvariantCulture);
            } catch (Exception ex) when (ex is InvalidCastException || ex is FormatException || ex is OverflowException) {
                // Not representable as T: fall through to default(T), which is what
                // callers received for a mismatched result before this conversion existed.
            }
        }
    }

    return default(T);
}
|
||
|
||
/// <summary>
/// Sends a "CallMethod" request to another process and returns the result carried by
/// the response.
/// </summary>
/// <param name="targetProcessId">Identifier of the process that hosts the service.</param>
/// <param name="serviceName">Name of the remote service.</param>
/// <param name="methodName">Name of the method to invoke.</param>
/// <param name="returnType">
/// Expected result type. NOTE(review): this value is not used by the body below;
/// it is neither sent with the request nor applied to the result.
/// </param>
/// <param name="parameters">Arguments forwarded to the remote method; may be null.</param>
/// <returns>
/// The <c>Result</c> of the wrapped <see cref="MethodCallResponse"/> when the response
/// carries one, otherwise the raw result of the response message.
/// </returns>
/// <exception cref="InvalidOperationException">The server is not running.</exception>
/// <exception cref="MethodCallException">
/// The response is an error message, the wrapped response reports failure, or any
/// exception that is not an <see cref="IPCException"/> occurred while calling.
/// </exception>
public async Task<object> CallMethodAsync(string targetProcessId, string serviceName, string methodName, Type returnType, params object[] parameters) {
    if (!IsRunning) {
        throw new InvalidOperationException("服务器未运行");
    }

    try {
        var request = new MethodCallRequest {
            ServiceName = serviceName,
            MethodName = methodName,
            Parameters = parameters,
            // One entry per argument; a null argument has no runtime type, so its entry is null.
            ParameterTypes = parameters != null
                ? Array.ConvertAll(parameters, p => p?.GetType())
                : Array.Empty<Type>()
        };

        var message = new IPCMessage {
            Type = MessageType.Request,
            Method = "CallMethod",
            Parameters = new object[] { request },
            SourceProcessId = _serverId,
            TargetProcessId = targetProcessId
        };

        var response = await _messageRouter.SendRequestAsync(
            message,
            msg => _connectionManager.SendMessageAsync(targetProcessId, msg),
            _configuration.MethodCallTimeout);

        if (response.Type == MessageType.Error) {
            throw new MethodCallException(response.Error);
        }

        if (response.Result is MethodCallResponse methodResponse) {
            if (!methodResponse.IsSuccess) {
                throw new MethodCallException("方法调用失败", methodResponse.Exception);
            }

            return methodResponse.Result;
        }

        return response.Result;
    } catch (Exception ex) when (!(ex is IPCException)) {
        // IPC exceptions pass through unchanged; everything else is wrapped.
        throw new MethodCallException($"调用方法失败: {ex.Message}", ex);
    }
}
|
||
|
||
/// <summary>
/// Sends a notification message to a single process.
/// </summary>
/// <param name="targetProcessId">Identifier of the receiving process.</param>
/// <param name="method">Value placed in the message's <c>Method</c> field.</param>
/// <param name="parameters">Values placed in the message's <c>Parameters</c> field.</param>
/// <returns>A task that completes when the connection manager has sent the message.</returns>
/// <exception cref="InvalidOperationException">The server is not running.</exception>
public async Task SendNotificationAsync(string targetProcessId, string method, params object[] parameters) {
    if (!IsRunning) {
        throw new InvalidOperationException("服务器未运行");
    }

    var notification = new IPCMessage {
        Type = MessageType.Notification,
        Method = method,
        Parameters = parameters,
        SourceProcessId = _serverId,
        TargetProcessId = targetProcessId
    };

    await _connectionManager.SendMessageAsync(targetProcessId, notification);
}
|
||
|
||
/// <summary>
/// Sends a notification message to every connected process.
/// </summary>
/// <param name="method">Value placed in the message's <c>Method</c> field.</param>
/// <param name="parameters">Values placed in the message's <c>Parameters</c> field.</param>
/// <returns>A task that completes when the connection manager has broadcast the message.</returns>
/// <exception cref="InvalidOperationException">The server is not running.</exception>
public async Task BroadcastNotificationAsync(string method, params object[] parameters) {
    if (!IsRunning) {
        throw new InvalidOperationException("服务器未运行");
    }

    // No TargetProcessId: the message is not addressed to one particular process.
    var notification = new IPCMessage {
        Type = MessageType.Notification,
        Method = method,
        Parameters = parameters,
        SourceProcessId = _serverId
    };

    await _connectionManager.BroadcastMessageAsync(notification);
}
|
||
|
||
/// <summary>
/// Registers a service instance under the given name in the service registry.
/// </summary>
/// <typeparam name="T">Type of the service instance.</typeparam>
/// <param name="serviceName">Name the service is registered under.</param>
/// <param name="serviceInstance">Instance that is registered.</param>
/// <returns>A task that completes when the registry has finished registering.</returns>
public async Task RegisterServiceAsync<T>(string serviceName, T serviceInstance)
    => await _serviceRegistry.RegisterServiceAsync(serviceName, serviceInstance);
|
||
|
||
/// <summary>
/// Removes the service registered under the given name from the service registry.
/// </summary>
/// <param name="serviceName">Name of the service to remove.</param>
/// <returns>A task that completes when the registry has finished unregistering.</returns>
public async Task UnregisterServiceAsync(string serviceName)
    => await _serviceRegistry.UnregisterServiceAsync(serviceName);
|
||
|
||
/// <summary>
/// Subscribes to the connection manager's connect/disconnect events and to the
/// heartbeat manager's timeout event.
/// </summary>
/// <remarks>
/// NOTE(review): nothing in this method routes incoming messages to
/// <c>HandleIncomingMessage</c>; confirm where that method is meant to be wired up.
/// </remarks>
private void InitializeEventHandlers() {
    _connectionManager.ProcessConnected += OnProcessConnected;
    _connectionManager.ProcessDisconnected += OnProcessDisconnected;

    _heartbeatManager.ProcessTimeout += async (sender, processId) => {
        Console.WriteLine($"进程 {processId} 心跳超时,断开连接");

        // This lambda is an async event handler. If the event's delegate returns void,
        // nobody can await it, so an exception escaping here would be unobserved and
        // could take the process down. Catch and log instead.
        try {
            await _connectionManager.DisconnectFromProcessAsync(processId);
        } catch (Exception ex) {
            Console.WriteLine($"断开超时进程 {processId} 时发生错误: {ex.Message}");
        }
    };
}
|
||
|
||
/// <summary>
/// Handles the connection manager's connect event: logs the process, starts
/// heartbeat monitoring when heartbeats are enabled, then raises
/// <see cref="ProcessConnected"/>.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="processInfo">The process that connected.</param>
private void OnProcessConnected(object sender, ProcessInfo processInfo) {
    Console.WriteLine($"进程已连接: {processInfo.ProcessId}");

    var heartbeatEnabled = _configuration.EnableHeartbeat;
    if (heartbeatEnabled) {
        _heartbeatManager.StartMonitoring(processInfo.ProcessId);
    }

    // Copy the delegate first so the null check and the call see the same value.
    var subscribers = ProcessConnected;
    if (subscribers != null) {
        subscribers(this, processInfo);
    }
}
|
||
|
||
/// <summary>
/// Handles the connection manager's disconnect event: logs the process, stops its
/// heartbeat monitoring, then raises <see cref="ProcessDisconnected"/>.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="processInfo">The process that disconnected.</param>
private void OnProcessDisconnected(object sender, ProcessInfo processInfo) {
    Console.WriteLine($"进程已断开: {processInfo.ProcessId}");

    // Monitoring is stopped regardless of whether heartbeats are enabled.
    _heartbeatManager.StopMonitoring(processInfo.ProcessId);

    // Copy the delegate first so the null check and the call see the same value.
    var subscribers = ProcessDisconnected;
    if (subscribers != null) {
        subscribers(this, processInfo);
    }
}
|
||
|
||
/// <summary>
/// Processes one incoming message: heartbeat messages refresh the sender's heartbeat;
/// all other messages are routed, any response is sent back to the sender, and
/// <see cref="MessageReceived"/> is raised.
/// </summary>
/// <param name="message">The message to process.</param>
/// <returns>A task that completes when the message has been processed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
private async Task HandleIncomingMessage(IPCMessage message) {
    // Fail clearly up front; the error path below also reads fields of the message.
    if (message == null) {
        throw new ArgumentNullException(nameof(message));
    }

    try {
        // Heartbeats only refresh the sender's timestamp; they are not routed and do
        // not raise MessageReceived.
        if (_configuration.EnableHeartbeat && message.Type == MessageType.Heartbeat) {
            _heartbeatManager.UpdateHeartbeat(message.SourceProcessId);
            return;
        }

        var response = await _messageRouter.HandleMessageAsync(message);
        if (response != null) {
            await _connectionManager.SendMessageAsync(message.SourceProcessId, response);
        }

        MessageReceived?.Invoke(this, message);
    } catch (Exception ex) {
        Console.WriteLine($"处理消息时发生错误: {ex.Message}");

        // Reply with an error that carries the id of the message that failed.
        var errorResponse = new IPCMessage {
            Id = message.Id,
            Type = MessageType.Error,
            Error = ex.Message,
            SourceProcessId = _serverId,
            TargetProcessId = message.SourceProcessId
        };

        // Sending the error can fail too (the failure may have been the connection
        // itself). Log it rather than letting a second exception escape the handler.
        try {
            await _connectionManager.SendMessageAsync(message.SourceProcessId, errorResponse);
        } catch (Exception sendEx) {
            Console.WriteLine($"发送错误响应失败: {sendEx.Message}");
        }
    }
}
|
||
|
||
/// <summary>
/// Stops the server if it is running (waiting at most ten seconds) and disposes the
/// connection manager and the heartbeat manager. Repeated calls do nothing.
/// Failures while stopping are logged, not thrown.
/// </summary>
public void Dispose() {
    if (_disposed) {
        return;
    }

    _disposed = true;

    if (IsRunning) {
        try {
            // Run the asynchronous stop on the thread pool. Blocking on StopAsync()
            // directly can deadlock when the caller has a synchronization context,
            // because the continuations would need the thread that is blocked here.
            var stopped = Task.Run(() => StopAsync()).Wait(TimeSpan.FromSeconds(10));
            if (!stopped) {
                Console.WriteLine("释放资源时发生错误: 停止服务器超时");
            }
        } catch (Exception ex) {
            // Wait() wraps failures in an AggregateException; log the real cause.
            var cause = (ex as AggregateException)?.InnerException ?? ex;
            Console.WriteLine($"释放资源时发生错误: {cause.Message}");
        }
    }

    _connectionManager?.Dispose();
    _heartbeatManager?.Dispose();

    // A disposed server must not report itself as running, even if stopping failed.
    IsRunning = false;

    GC.SuppressFinalize(this);
}
|
||
}
|
||
}
|