using System.Net; using System.Net.Sockets; using System.Text; using System.Collections.Concurrent; var options = ClientOptions.FromArgs(args); var cts = new CancellationTokenSource(); var clients = new ConcurrentDictionary(); Console.WriteLine($"准备启动 {options.ClientCount} 个客户端,连接到 {options.ServerHost}:{options.ServerPort}"); // 启动所有客户端连接任务 var connectTasks = Enumerable.Range(0, options.ClientCount) .Select(i => RunClientAsync(i + 1, options, clients, cts.Token)) .ToArray(); // 等待所有客户端连接完成 await Task.WhenAll(connectTasks); // 启动发送循环(与监听任务并行运行) await SendLoopAsync(clients, cts); static async Task RunClientAsync(int clientId, ClientOptions options, ConcurrentDictionary clients, CancellationToken cancellationToken) { var localPort = options.StartLocalPort + clientId - 1; // 验证端口范围 if (localPort > 65535) { Console.WriteLine($"错误: 客户端#{clientId} 的本地端口 {localPort} 超出有效范围"); return; } // 首次连接 var client = await ConnectAsync(clientId, options, localPort, cancellationToken); if (client == null) { return; } clients[clientId] = client; // 启动带重连功能的监听任务(后台运行,不阻塞) _ = Task.Run(() => RunWithReconnectAsync(clientId, options, localPort, clients, cancellationToken), cancellationToken); // 发送初始化消息 await InitializeClientAsync(client, clientId); // 立即返回,让主线程继续执行 SendLoopAsync } static async Task ConnectAsync(int clientId, ClientOptions options, int localPort, CancellationToken cancellationToken) { try { var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); // 允许重用本地端口,解决重连时端口被占用的问题 client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); client.Bind(new IPEndPoint(IPAddress.Any, localPort)); await client.ConnectAsync(options.ServerHost, options.ServerPort); Console.WriteLine($"客户端#{clientId} 已连接,本地端口: {localPort}"); return client; } catch (Exception ex) { Console.WriteLine($"客户端#{clientId} 连接失败: {ex.Message}"); return null; } } static async Task RunWithReconnectAsync(int clientId, ClientOptions options, int localPort, ConcurrentDictionary clients, CancellationToken cancellationToken) { var reconnectCount = 0; while (!cancellationToken.IsCancellationRequested) { Socket? client = null; try { // 获取当前连接的 Socket(如果存在) if (clients.TryGetValue(clientId, out client)) { // 监听消息,直到连接断开 // ListenServerMessagesAsync 的 finally 会自动清理断开的 Socket await ListenServerMessagesAsync(client, clientId, cancellationToken, clients); } // 连接已断开,进入重连逻辑 reconnectCount++; // 检查是否启用重连 if (!options.EnableReconnect) { Console.WriteLine($"客户端#{clientId} 重连已禁用,退出"); return; } // 检查重连次数限制 if (options.MaxReconnectAttempts > 0 && reconnectCount > options.MaxReconnectAttempts) { Console.WriteLine($"客户端#{clientId} 已达到最大重连次数 ({options.MaxReconnectAttempts}),退出"); return; } // 等待重连间隔 Console.WriteLine($"客户端#{clientId} 等待 {options.ReconnectIntervalMs / 1000} 秒后重连... (第 {reconnectCount} 次)"); await Task.Delay(options.ReconnectIntervalMs, cancellationToken); // 尝试重新连接 client = await ConnectAsync(clientId, options, localPort, cancellationToken); if (client == null) { continue; } // 将新 Socket 加入字典 clients[clientId] = client; // 重新初始化 await InitializeClientAsync(client, clientId); Console.WriteLine($"客户端#{clientId} 重连成功,已重新初始化"); } catch (OperationCanceledException) { // 正常取消 return; } catch (Exception ex) { Console.WriteLine($"客户端#{clientId} 运行异常: {ex.Message}"); clients.TryRemove(clientId, out _); try { client?.Shutdown(SocketShutdown.Both); } catch { } try { client?.Dispose(); } catch { } } } } static async Task InitializeClientAsync(Socket client, int clientId) { await Task.Delay(5000); await WriteMessageAsync(client, clientId, "Homed"); await Task.Delay(500); await WriteMessageAsync(client, clientId, "Runmode,2"); await Task.Delay(500); await WriteMessageAsync(client, clientId, "Armobject,0"); await Task.Delay(500); await WriteMessageAsync(client, clientId, "Controlmode,1"); } static async Task WriteMessageAsync(Socket client, int clientId, string message) { var framedMessage = $"{message}"; var sendBuffer = Encoding.UTF8.GetBytes(framedMessage); await client.SendAsync(sendBuffer, SocketFlags.None); Console.WriteLine($"客户端#{clientId} 发送: {framedMessage}"); } static async Task ReadMessageAsync(Socket client, int clientId) { var receiveBuffer = new byte[1024]; var length = await client.ReceiveAsync(receiveBuffer, SocketFlags.None); if (length <= 0) { Console.WriteLine($"客户端#{clientId} 未接收到数据"); return; } var response = Encoding.UTF8.GetString(receiveBuffer, 0, length); if (!response.StartsWith("") || !response.EndsWith("")) { Console.WriteLine($"客户端#{clientId} 收到无效消息(缺少消息头或消息尾),已忽略"); return; } var content = response["".Length..^"".Length]; Console.WriteLine($"客户端#{clientId} 收到: {content}"); } static async Task ListenServerMessagesAsync(Socket client, int clientId, CancellationToken cancellationToken, ConcurrentDictionary clients) { var receiveBuffer = new byte[1024]; var cache = new StringBuilder(); try { while (!cancellationToken.IsCancellationRequested) { int length; try { length = await client.ReceiveAsync(receiveBuffer, SocketFlags.None, cancellationToken); } catch (SocketException) { Console.WriteLine($"客户端#{clientId} 与服务端断开连接"); return; } catch (ObjectDisposedException) { Console.WriteLine($"客户端#{clientId} Socket 已被释放"); return; } if (length <= 0) { Console.WriteLine($"客户端#{clientId} 与服务端断开连接"); return; } cache.Append(Encoding.UTF8.GetString(receiveBuffer, 0, length)); await TryReadFramedMessages(cache, clientId, client); } } catch (OperationCanceledException) { // 正常取消,不需要记录错误 } catch (Exception ex) { Console.WriteLine($"客户端#{clientId} 接收消息异常: {ex.Message}"); } finally { // 连接断开时,从字典中移除这个 Socket(如果还是当前的话) clients.TryRemove(clientId, out var removed); if (removed == client) { try { client.Shutdown(SocketShutdown.Both); } catch { } try { client.Dispose(); } catch { } } } } static async Task TryReadFramedMessages(StringBuilder cache, int clientId, Socket client) { const string start = ""; const string end = ""; while (true) { var text = cache.ToString(); var startIndex = text.IndexOf(start, StringComparison.Ordinal); if (startIndex < 0) { // 没有找到 START 标记,保留缓存等待更多数据 // 但如果缓存太大,清空以防内存泄漏 if (cache.Length > 10240) // 10KB 阈值 { cache.Clear(); } return; } var endIndex = text.IndexOf(end, startIndex + start.Length, StringComparison.Ordinal); if (endIndex < 0) { // 移除 START 之前的无效数据 if (startIndex > 0) { cache.Remove(0, startIndex); } return; } var contentStart = startIndex + start.Length; var contentLength = endIndex - contentStart; var content = text.Substring(contentStart, contentLength); Console.WriteLine($"客户端#{clientId} 收到: {content}"); cache.Remove(0, endIndex + end.Length); if (content == "Pickbattery,1") { await WriteMessageAsync(client, clientId, "Running"); await Task.Delay(1000); await WriteMessageAsync(client, clientId, "Picking"); //await WriteMessageAsync(client, clientId, "PickFinished,1,2,3,4"); } else if (content == "Putbattery,1") { await WriteMessageAsync(client, clientId, "Running"); await Task.Delay(500); await WriteMessageAsync(client, clientId, "Puting"); await Task.Delay(5000); //await WriteMessageAsync(client, clientId, "PutFinished,1,2,3,4"); } } } static async Task SendLoopAsync(ConcurrentDictionary clients, CancellationTokenSource cts) { Console.WriteLine("输入格式: 客户端编号:消息,例如 1:Status,输入 exit 退出"); while (!cts.IsCancellationRequested) { var line = Console.ReadLine(); if (string.IsNullOrWhiteSpace(line)) { continue; } if (line.Equals("exit", StringComparison.OrdinalIgnoreCase)) { cts.Cancel(); foreach (var socket in clients.Values) { try { socket.Shutdown(SocketShutdown.Both); socket.Dispose(); } catch { } } return; } var index = line.IndexOf(':'); if (index <= 0 || index == line.Length - 1) { Console.WriteLine("输入不正确,请使用: 客户端编号:消息"); continue; } var idText = line[..index]; var message = line[(index + 1)..]; if (!int.TryParse(idText, out var clientId) || !clients.TryGetValue(clientId, out var client)) { Console.WriteLine($"客户端#{idText} 不存在或未连接"); continue; } try { await WriteMessageAsync(client, clientId, message); } catch (ObjectDisposedException) { Console.WriteLine($"客户端#{clientId} 发送失败: Socket 已关闭(可能正在重连中)"); clients.TryRemove(clientId, out _); } catch (SocketException ex) { Console.WriteLine($"客户端#{clientId} 发送失败: {ex.Message}"); clients.TryRemove(clientId, out _); } catch (Exception ex) { Console.WriteLine($"客户端#{clientId} 发送失败: {ex.GetType().Name} - {ex.Message}"); } } } file sealed class ClientOptions { public int ClientCount { get; init; } = 1; public string ServerHost { get; init; } = "127.0.0.1"; public int ServerPort { get; init; } = 2000; public int StartLocalPort { get; init; } = 62312; public bool EnableReconnect { get; init; } = true; public int ReconnectIntervalMs { get; init; } = 5000; public int MaxReconnectAttempts { get; init; } = -1; // -1 表示无限重试 public static ClientOptions FromArgs(string[] args) { var map = args .Select(v => v.Split('=', 2, StringSplitOptions.TrimEntries)) .Where(parts => parts.Length == 2) .ToDictionary(parts => parts[0].TrimStart('-', '/').ToLowerInvariant(), parts => parts[1]); return new ClientOptions { ClientCount = GetInt(map, "count", 1), ServerHost = GetString(map, "host", "127.0.0.1"), ServerPort = GetInt(map, "serverport", 2000), StartLocalPort = GetInt(map, "localport", 62312), EnableReconnect = GetBool(map, "reconnect", true), ReconnectIntervalMs = GetInt(map, "reconnectinterval", 5000), MaxReconnectAttempts = GetInt(map, "maxreconnect", -1) }; } private static bool GetBool(Dictionary map, string key, bool defaultValue) => map.TryGetValue(key, out var value) ? (bool.TryParse(value, out var result) ? result : defaultValue) : defaultValue; private static int GetInt(Dictionary map, string key, int defaultValue) => map.TryGetValue(key, out var value) && int.TryParse(value, out var number) ? number : defaultValue; private static string GetString(Dictionary map, string key, string defaultValue) => map.TryGetValue(key, out var value) && !string.IsNullOrWhiteSpace(value) ? value : defaultValue; }