using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace WIDESEAWCS_S7Simulator.Server.Services;

/// <summary>
/// Operations for starting, stopping, inspecting and sending through named robot TCP client instances.
/// </summary>
public interface IRobotClientManager
{
    /// <summary>Creates and starts a client instance, then returns the status of all instances.</summary>
    Task<RobotServerCollectionStatusResponse> StartAsync(RobotServerStartRequest request, CancellationToken cancellationToken = default);

    /// <summary>Stops one instance, or every instance when <paramref name="serverId"/> is null or blank.</summary>
    Task StopAsync(string? serverId = null);

    /// <summary>Returns a snapshot of all instances ordered by ServerId.</summary>
    Task<RobotServerCollectionStatusResponse> GetStatusAsync();

    /// <summary>Sends a message through the given instance; only <c>clientId == 1</c> is accepted.</summary>
    Task SendToClientAsync(string serverId, int clientId, string message);

    /// <summary>Sends a message through the given instance.</summary>
    Task SendToAllAsync(string serverId, string message);

    /// <summary>Clears the received and sent message logs of the given instance.</summary>
    Task ClearReceivedMessagesAsync(string serverId);
}

/// <summary>
/// Multi-instance manager for robot TCP clients.
/// Each ServerId maps to one <see cref="TcpClient"/> that actively connects to the target server
/// and reconnects after the connection drops.
/// </summary>
public sealed class RobotClientManager : IRobotClientManager, IDisposable
{
    // NOTE(review): generic type arguments appear to have been stripped from this file;
    // the logger may originally have been ILogger<RobotClientManager> — confirm against DI registration.
    private readonly ILogger _logger;

    // Instances keyed by trimmed ServerId, compared case-insensitively.
    private readonly ConcurrentDictionary<string, RobotClientRuntime> _clients = new(StringComparer.OrdinalIgnoreCase);

    private bool _disposed;

    public RobotClientManager(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Validates the request, registers a new client instance and starts its connection loop.
    /// </summary>
    /// <param name="request">Instance id, remote endpoint and fixed local port.</param>
    /// <param name="cancellationToken">Linked into the instance's own cancellation source.</param>
    /// <returns>The status of all instances after the start.</returns>
    /// <exception cref="ArgumentException">A request field is missing or out of range.</exception>
    /// <exception cref="InvalidOperationException">
    /// The instance already exists, could not be registered, or the local port cannot be bound.
    /// </exception>
    public async Task<RobotServerCollectionStatusResponse> StartAsync(RobotServerStartRequest request, CancellationToken cancellationToken = default)
    {
        ValidateStartRequest(request);

        var key = request.ServerId.Trim();

        // Check for a duplicate before probing the port: an already-connected instance holds
        // its local port, and the caller should be told the instance exists, not that the port is busy.
        if (_clients.ContainsKey(key))
        {
            throw new InvalidOperationException($"客户端实例 '{key}' 已存在");
        }

        EnsureLocalPortAvailable(request.LocalPort);

        var runtime = new RobotClientRuntime
        {
            ServerId = key,
            RemoteIp = request.ListenIp,
            RemotePort = request.ListenPort,
            LocalPort = request.LocalPort,
            Cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken),
            Connected = false
        };

        runtime.ConnectionLoopTask = Task.Run(() => ConnectionLoopAsync(runtime), runtime.Cancellation.Token);

        // Another caller may have registered the same key since the check above.
        if (!_clients.TryAdd(key, runtime))
        {
            await StopRuntimeAsync(runtime);
            throw new InvalidOperationException($"客户端实例 '{key}' 创建失败");
        }

        _logger.LogInformation(
            "机械手客户端已启动: {ServerId} 本地:{LocalPort} -> 远端:{Ip}:{Port}",
            key,
            request.LocalPort,
            request.ListenIp,
            request.ListenPort);

        return await GetStatusAsync();
    }

    /// <summary>
    /// Stops and removes one instance, or all instances when <paramref name="serverId"/> is null or blank.
    /// Unknown ids are ignored.
    /// </summary>
    public async Task StopAsync(string? serverId = null)
    {
        if (string.IsNullOrWhiteSpace(serverId))
        {
            // Remove entries one by one so that every runtime taken out of the dictionary is
            // also stopped; a snapshot followed by Clear() could drop an instance added in between.
            var stopping = new List<Task>();
            foreach (var key in _clients.Keys)
            {
                if (_clients.TryRemove(key, out var removed))
                {
                    stopping.Add(StopRuntimeAsync(removed));
                }
            }

            await Task.WhenAll(stopping);
            return;
        }

        if (_clients.TryRemove(serverId.Trim(), out var runtime))
        {
            await StopRuntimeAsync(runtime);
        }
    }

    /// <summary>
    /// Builds a status snapshot of all registered instances, ordered by ServerId (case-insensitive).
    /// </summary>
    public Task<RobotServerCollectionStatusResponse> GetStatusAsync()
    {
        var servers = _clients.Values
            .OrderBy(x => x.ServerId, StringComparer.OrdinalIgnoreCase)
            .Select(ToStatusItem)
            .ToArray();

        return Task.FromResult(new RobotServerCollectionStatusResponse
        {
            RunningServerCount = servers.Count(x => x.Running),
            Servers = servers
        });
    }

    /// <summary>
    /// Sends a message through the given instance. Each instance has a single connection,
    /// so only <paramref name="clientId"/> 1 is accepted.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="serverId"/> is null or blank.</exception>
    /// <exception cref="InvalidOperationException">
    /// The instance does not exist, is not connected, or <paramref name="clientId"/> is not 1.
    /// </exception>
    public async Task SendToClientAsync(string serverId, int clientId, string message)
    {
        var runtime = GetClientOrThrow(serverId);
        if (clientId != 1)
        {
            throw new InvalidOperationException($"客户端实例 '{runtime.ServerId}' 仅支持 ClientId=1");
        }

        await SendFrameAsync(runtime, message, CancellationToken.None);
    }

    /// <summary>
    /// Sends a message through the given instance's connection.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="serverId"/> is null or blank.</exception>
    /// <exception cref="InvalidOperationException">The instance does not exist or is not connected.</exception>
    public async Task SendToAllAsync(string serverId, string message)
    {
        var runtime = GetClientOrThrow(serverId);
        await SendFrameAsync(runtime, message, CancellationToken.None);
    }

    /// <summary>
    /// Clears both the received and the sent message logs of the given instance.
    /// </summary>
    public Task ClearReceivedMessagesAsync(string serverId)
    {
        var runtime = GetClientOrThrow(serverId);

        while (runtime.ReceivedMessages.TryDequeue(out _))
        {
        }

        while (runtime.SentMessages.TryDequeue(out _))
        {
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Looks up an instance by (trimmed) ServerId or throws when the id is blank or unknown.
    /// </summary>
    private RobotClientRuntime GetClientOrThrow(string serverId)
    {
        if (string.IsNullOrWhiteSpace(serverId))
        {
            throw new ArgumentException("ServerId 不能为空");
        }

        if (!_clients.TryGetValue(serverId.Trim(), out var runtime))
        {
            throw new InvalidOperationException($"客户端实例 '{serverId}' 不存在");
        }

        return runtime;
    }

    /// <summary>
    /// Connection guard loop: connects, runs the receive loop until the connection ends,
    /// waits two seconds and reconnects, until the instance is cancelled.
    /// </summary>
    private async Task ConnectionLoopAsync(RobotClientRuntime runtime)
    {
        var token = runtime.Cancellation.Token;

        while (!token.IsCancellationRequested)
        {
            if (!runtime.Connected)
            {
                try
                {
                    await ConnectOnceAsync(runtime, token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (Exception ex)
                {
                    runtime.LastError = ex.Message;
                    _logger.LogWarning(ex, "[{ServerId}] 连接失败,将在 2 秒后重试", runtime.ServerId);
                    await DelayForReconnect(token);
                    continue;
                }
            }

            var tcpClient = runtime.TcpClient;
            if (tcpClient is null)
            {
                await DelayForReconnect(token);
                continue;
            }

            // Returns when the peer closes, an error occurs, or the instance is cancelled.
            await ReceiveLoopAsync(runtime, tcpClient);
            await DelayForReconnect(token);
        }
    }

    /// <summary>
    /// Performs a single connection attempt and, on success, updates the runtime connection state.
    /// The socket is disposed if binding or connecting fails.
    /// </summary>
    private async Task ConnectOnceAsync(RobotClientRuntime runtime, CancellationToken token)
    {
        var tcpClient = new TcpClient(AddressFamily.InterNetwork);
        try
        {
            // Every (re)connect binds the same fixed local port.
            tcpClient.Client.Bind(new IPEndPoint(IPAddress.Any, runtime.LocalPort));
            await tcpClient.ConnectAsync(runtime.RemoteIp, runtime.RemotePort, token);
        }
        catch
        {
            tcpClient.Dispose();
            throw;
        }

        runtime.TcpClient = tcpClient;
        runtime.Connected = true;
        runtime.ConnectedAt = DateTimeOffset.Now;
        runtime.LastError = null;

        _logger.LogInformation(
            "[{ServerId}] 已连接 本地:{LocalPort} -> 远端:{Ip}:{Port}",
            runtime.ServerId,
            runtime.LocalPort,
            runtime.RemoteIp,
            runtime.RemotePort);
    }

    /// <summary>
    /// Receive loop for one connection. Decoded text is accumulated and handed to
    /// <see cref="TryReadMessages"/>, which accepts both framed and plain-text input.
    /// Always marks the instance disconnected and closes the socket on exit.
    /// </summary>
    private async Task ReceiveLoopAsync(RobotClientRuntime runtime, TcpClient tcpClient)
    {
        var token = runtime.Cancellation.Token;
        var buffer = new byte[2048];
        var cache = new StringBuilder();

        // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 character that was
        // split across two reads; decoding each chunk independently would corrupt such characters.
        var decoder = Encoding.UTF8.GetDecoder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];

        try
        {
            var stream = tcpClient.GetStream();
            while (!token.IsCancellationRequested)
            {
                var length = await stream.ReadAsync(buffer, 0, buffer.Length, token);
                if (length <= 0)
                {
                    // Zero bytes read: the remote side closed the connection.
                    break;
                }

                runtime.LastReceivedAt = DateTimeOffset.Now;

                var charCount = decoder.GetChars(buffer, 0, length, chars, 0);
                cache.Append(chars, 0, charCount);
                TryReadMessages(runtime, cache);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal stop.
        }
        catch (Exception ex)
        {
            runtime.LastError = ex.Message;
            _logger.LogWarning(ex, "[{ServerId}] 客户端收包异常", runtime.ServerId);
        }
        finally
        {
            runtime.Connected = false;
            await CloseRuntimeSocketAsync(runtime);
            _logger.LogWarning("[{ServerId}] 客户端连接已断开,准备自动重连", runtime.ServerId);
        }
    }

    /// <summary>
    /// Extracts every complete frame (text between the start and end markers) from the cache and
    /// records it. Text that does not match the frame protocol is recorded as a whole so it is not lost.
    /// </summary>
    private void TryReadMessages(RobotClientRuntime runtime, StringBuilder cache)
    {
        // NOTE(review): both markers (and the literal passed to Contains below) are empty strings
        // in this view, which looks like lost delimiter text. With empty markers the guard below
        // always returns without recording anything — confirm the real delimiter values.
        const string start = "";
        const string end = "";

        while (true)
        {
            var text = cache.ToString();
            var startIndex = text.IndexOf(start, StringComparison.Ordinal);
            var endIndex = text.IndexOf(end, StringComparison.Ordinal);

            // No complete frame: a marker is missing or the end marker is not after the start marker.
            if (startIndex < 0 || endIndex < 0 || endIndex <= startIndex)
            {
                // Short buffered text that does not contain the checked literal is recorded as one
                // plain-text message.
                if (cache.Length > 0 && cache.Length < 2048 && !text.Contains("", StringComparison.Ordinal))
                {
                    AppendReceived(runtime, text.Trim());
                    cache.Clear();
                }

                // Drop an oversized cache so an unterminated frame cannot grow without bound.
                if (cache.Length > 10240)
                {
                    cache.Clear();
                }

                return;
            }

            var contentStart = startIndex + start.Length;
            var contentLength = endIndex - contentStart;
            var content = text.Substring(contentStart, contentLength);

            // Consume everything up to and including the end marker (leading text is discarded).
            cache.Remove(0, endIndex + end.Length);
            AppendReceived(runtime, content);
        }
    }

    /// <summary>
    /// Records a non-blank received message, keeping at most 500 entries in the log.
    /// </summary>
    private void AppendReceived(RobotClientRuntime runtime, string message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return;
        }

        runtime.LastReceivedMessage = message;
        runtime.ReceivedMessages.Enqueue(new RobotServerReceivedMessageItem
        {
            ReceivedAt = DateTimeOffset.Now,
            ClientId = 1,
            RemoteEndPoint = $"{runtime.RemoteIp}:{runtime.RemotePort}",
            Message = message
        });

        while (runtime.ReceivedMessages.Count > 500)
        {
            runtime.ReceivedMessages.TryDequeue(out _);
        }

        _logger.LogInformation("[{ServerId}] 收到: {Message}", runtime.ServerId, message);
    }

    /// <summary>
    /// Encodes the message as UTF-8, writes it to the connection under the send lock and records it
    /// in the sent log (at most 500 entries).
    /// </summary>
    /// <exception cref="InvalidOperationException">The instance is not connected.</exception>
    private async Task SendFrameAsync(RobotClientRuntime runtime, string message, CancellationToken token)
    {
        if (!runtime.Connected)
        {
            throw new InvalidOperationException($"客户端实例 '{runtime.ServerId}' 未连接");
        }

        // NOTE(review): the frame literal carries no delimiters in this view; they were probably
        // lost together with the markers in TryReadMessages — confirm before relying on this.
        var frame = $"{message}";
        var bytes = Encoding.UTF8.GetBytes(frame);

        await runtime.SendLock.WaitAsync(token);
        try
        {
            // Re-check under the lock: the socket may have been closed while waiting.
            if (runtime.TcpClient == null)
            {
                throw new InvalidOperationException($"客户端实例 '{runtime.ServerId}' 未连接");
            }

            var stream = runtime.TcpClient.GetStream();
            await stream.WriteAsync(bytes, 0, bytes.Length, token);
            await stream.FlushAsync(token);

            runtime.LastSentAt = DateTimeOffset.Now;
            runtime.SentMessages.Enqueue(new RobotServerSentMessageItem
            {
                SentAt = DateTimeOffset.Now,
                ClientId = 1,
                RemoteEndPoint = $"{runtime.RemoteIp}:{runtime.RemotePort}",
                Message = message
            });

            while (runtime.SentMessages.Count > 500)
            {
                runtime.SentMessages.TryDequeue(out _);
            }

            _logger.LogInformation("[{ServerId}] 发送: {Frame}", runtime.ServerId, frame);
        }
        finally
        {
            runtime.SendLock.Release();
        }
    }

    /// <summary>
    /// Disposes the instance's socket (if any) under the send lock and marks it disconnected.
    /// </summary>
    private static async Task CloseRuntimeSocketAsync(RobotClientRuntime runtime)
    {
        await runtime.SendLock.WaitAsync();
        try
        {
            var client = runtime.TcpClient;
            runtime.TcpClient = null;

            if (client is not null)
            {
                try
                {
                    // Dispose also closes the connection.
                    client.Dispose();
                }
                catch
                {
                    // Best-effort teardown: a failure while closing must not block reconnect or stop.
                }
            }

            runtime.Connected = false;
        }
        finally
        {
            runtime.SendLock.Release();
        }
    }

    /// <summary>
    /// Cancels the instance, waits for its connection loop to finish, closes the socket and
    /// disposes the cancellation source. Failures during cancel/await are ignored.
    /// </summary>
    private async Task StopRuntimeAsync(RobotClientRuntime runtime)
    {
        try
        {
            runtime.Cancellation.Cancel();
        }
        catch
        {
            // Best-effort: continue with the shutdown even if a cancellation callback throws.
        }

        if (runtime.ConnectionLoopTask != null)
        {
            try
            {
                await runtime.ConnectionLoopTask;
            }
            catch
            {
                // The loop's outcome (including cancellation) is irrelevant once stopping.
            }
        }

        await CloseRuntimeSocketAsync(runtime);
        runtime.Cancellation.Dispose();

        _logger.LogInformation("机械手客户端已停止: {ServerId}", runtime.ServerId);
    }

    /// <summary>
    /// Maps a runtime to its status DTO, including the 200 most recent received and sent messages
    /// (newest first).
    /// </summary>
    private static RobotServerStatusItem ToStatusItem(RobotClientRuntime runtime)
    {
        var clients = new[]
        {
            new RobotServerClientStatusItem
            {
                ClientId = 1,
                RemoteEndPoint = $"{runtime.RemoteIp}:{runtime.RemotePort}",
                Connected = runtime.Connected,
                ConnectedAt = runtime.ConnectedAt,
                LastReceivedAt = runtime.LastReceivedAt,
                LastSentAt = runtime.LastSentAt,
                LastReceivedMessage = runtime.LastReceivedMessage,
                LastError = runtime.LastError
            }
        };

        var receivedMessages = runtime.ReceivedMessages
            .Reverse()
            .Take(200)
            .ToArray();

        var sentMessages = runtime.SentMessages
            .Reverse()
            .Take(200)
            .ToArray();

        return new RobotServerStatusItem
        {
            ServerId = runtime.ServerId,
            // "Running" means the instance has not been cancelled, regardless of connection state.
            Running = !runtime.Cancellation.IsCancellationRequested,
            ListenIp = runtime.RemoteIp,
            ListenPort = runtime.RemotePort,
            LocalPort = runtime.LocalPort,
            ConnectedCount = runtime.Connected ? 1 : 0,
            Clients = clients,
            ReceivedMessages = receivedMessages,
            SentMessages = sentMessages
        };
    }

    /// <summary>
    /// Validates the start request fields.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="ArgumentException">A field is blank or a port is outside 1-65535.</exception>
    private static void ValidateStartRequest(RobotServerStartRequest request)
    {
        ArgumentNullException.ThrowIfNull(request);

        if (string.IsNullOrWhiteSpace(request.ServerId))
        {
            throw new ArgumentException("ServerId 不能为空");
        }

        if (string.IsNullOrWhiteSpace(request.ListenIp))
        {
            throw new ArgumentException("目标服务端地址不能为空");
        }

        if (request.ListenPort <= 0 || request.ListenPort > 65535)
        {
            throw new ArgumentException("目标服务端端口必须在 1-65535 范围内");
        }

        if (request.LocalPort <= 0 || request.LocalPort > 65535)
        {
            throw new ArgumentException("客户端本地端口必须在 1-65535 范围内");
        }
    }

    /// <summary>
    /// Checks before start that the local port can be bound, so the instance does not enter a
    /// pointless reconnect loop. The probe socket is always released.
    /// </summary>
    /// <exception cref="InvalidOperationException">The port cannot be bound.</exception>
    private static void EnsureLocalPortAvailable(int localPort)
    {
        Socket? probe = null;
        try
        {
            probe = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            probe.Bind(new IPEndPoint(IPAddress.Any, localPort));
        }
        catch (SocketException ex)
        {
            throw new InvalidOperationException($"本地端口 {localPort} 已被占用或不可用", ex);
        }
        finally
        {
            try
            {
                probe?.Dispose();
            }
            catch
            {
                // Best-effort release of the probe socket.
            }
        }
    }

    /// <summary>
    /// Waits two seconds before the next reconnect attempt; returns early (without throwing) on cancellation.
    /// </summary>
    private static async Task DelayForReconnect(CancellationToken token)
    {
        try
        {
            await Task.Delay(TimeSpan.FromSeconds(2), token);
        }
        catch (OperationCanceledException)
        {
        }
    }

    /// <summary>
    /// Stops all instances. Blocks until every instance has shut down; subsequent calls are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        StopAsync().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Mutable per-instance state: endpoint settings, the current socket, the connection loop task
    /// and the message logs.
    /// </summary>
    private sealed class RobotClientRuntime
    {
        public string ServerId { get; set; } = string.Empty;
        public string RemoteIp { get; set; } = string.Empty;
        public int RemotePort { get; set; }
        public int LocalPort { get; set; }
        public TcpClient? TcpClient { get; set; }
        public CancellationTokenSource Cancellation { get; set; } = default!;
        public Task? ConnectionLoopTask { get; set; }

        // Serializes writes and socket teardown.
        public SemaphoreSlim SendLock { get; } = new(1, 1);

        public bool Connected { get; set; }
        public DateTimeOffset? ConnectedAt { get; set; }
        public DateTimeOffset? LastReceivedAt { get; set; }
        public DateTimeOffset? LastSentAt { get; set; }
        public string? LastReceivedMessage { get; set; }
        public string? LastError { get; set; }
        public ConcurrentQueue<RobotServerReceivedMessageItem> ReceivedMessages { get; } = new();
        public ConcurrentQueue<RobotServerSentMessageItem> SentMessages { get; } = new();
    }
}

/// <summary>
/// Request to start a client instance. <c>ListenIp</c>/<c>ListenPort</c> are used as the remote
/// endpoint to connect to; <c>LocalPort</c> is the local port the client binds.
/// </summary>
public sealed class RobotServerStartRequest
{
    public string ServerId { get; set; } = "default";
    public string ListenIp { get; set; } = "127.0.0.1";
    public int ListenPort { get; set; } = 2000;
    public int LocalPort { get; set; } = 2001;
}

/// <summary>
/// Request to send a message through an instance, optionally addressed to a client id.
/// </summary>
public sealed class RobotServerSendRequest
{
    public string ServerId { get; set; } = string.Empty;
    public int? ClientId { get; set; }
    public string Message { get; set; } = string.Empty;
}

/// <summary>
/// Status of all instances plus the number of instances that are running.
/// </summary>
public sealed class RobotServerCollectionStatusResponse
{
    public int RunningServerCount { get; set; }
    public IReadOnlyList<RobotServerStatusItem> Servers { get; set; } = Array.Empty<RobotServerStatusItem>();
}

/// <summary>
/// Status of a single instance: endpoint settings, connection state and recent message logs.
/// </summary>
public sealed class RobotServerStatusItem
{
    public string ServerId { get; set; } = string.Empty;
    public bool Running { get; set; }
    public string ListenIp { get; set; } = string.Empty;
    public int ListenPort { get; set; }
    public int LocalPort { get; set; }
    public int ConnectedCount { get; set; }
    public IReadOnlyList<RobotServerClientStatusItem> Clients { get; set; } = Array.Empty<RobotServerClientStatusItem>();
    public IReadOnlyList<RobotServerReceivedMessageItem> ReceivedMessages { get; set; } = Array.Empty<RobotServerReceivedMessageItem>();
    public IReadOnlyList<RobotServerSentMessageItem> SentMessages { get; set; } = Array.Empty<RobotServerSentMessageItem>();
}

/// <summary>
/// Connection details of one client connection within an instance.
/// </summary>
public sealed class RobotServerClientStatusItem
{
    public int ClientId { get; set; }
    public string? RemoteEndPoint { get; set; }
    public bool Connected { get; set; }
    public DateTimeOffset? ConnectedAt { get; set; }
    public DateTimeOffset? LastReceivedAt { get; set; }
    public DateTimeOffset? LastSentAt { get; set; }
    public string? LastReceivedMessage { get; set; }
    public string? LastError { get; set; }
}

/// <summary>
/// One entry of the received-message log.
/// </summary>
public sealed class RobotServerReceivedMessageItem
{
    public DateTimeOffset ReceivedAt { get; set; }
    public int ClientId { get; set; }
    public string? RemoteEndPoint { get; set; }
    public string Message { get; set; } = string.Empty;
}

/// <summary>
/// One entry of the sent-message log.
/// </summary>
public sealed class RobotServerSentMessageItem
{
    public DateTimeOffset SentAt { get; set; }
    public int ClientId { get; set; }
    public string? RemoteEndPoint { get; set; }
    public string Message { get; set; } = string.Empty;
}