using System.Net;
|
using System.Net.Sockets;
|
using System.Text;
|
using System.Collections.Concurrent;
|
|
// Program entry: parse command-line settings, bring up every simulated client,
// then hand control to the interactive console loop.
ClientOptions settings = ClientOptions.FromArgs(args);
CancellationTokenSource shutdown = new();
ConcurrentDictionary<int, Socket> activeSockets = new();

Console.WriteLine($"准备启动 {settings.ClientCount} 个客户端,连接到 {settings.ServerHost}:{settings.ServerPort}");

// Kick off one start-up task per client; client ids are 1-based.
var startupTasks = new List<Task>();
foreach (var clientId in Enumerable.Range(1, settings.ClientCount))
{
    startupTasks.Add(RunClientAsync(clientId, settings, activeSockets, shutdown.Token));
}

// Wait until every client has finished its start-up task.
await Task.WhenAll(startupTasks);

// Run the console send loop; the listener tasks started by RunClientAsync keep running alongside it.
await SendLoopAsync(activeSockets, shutdown);
|
|
// Starts one simulated client.
//
// The local port is StartLocalPort + clientId - 1. After a successful connect the
// socket is published in `clients`, a background listen/reconnect task is started,
// and the initialization commands are sent before this method returns.
//
// Returns without throwing when the port is invalid, when the connection cannot be
// established, or when the initial handshake fails on a socket error.
static async Task RunClientAsync(int clientId, ClientOptions options, ConcurrentDictionary<int, Socket> clients, CancellationToken cancellationToken)
{
    var localPort = options.StartLocalPort + clientId - 1;

    // Reject ports outside the range IPEndPoint accepts (0..65535).
    if (localPort is < IPEndPoint.MinPort or > IPEndPoint.MaxPort)
    {
        Console.WriteLine($"错误: 客户端#{clientId} 的本地端口 {localPort} 超出有效范围");
        return;
    }

    // First connection attempt.
    var client = await ConnectAsync(clientId, options, localPort, cancellationToken);
    if (client is null)
    {
        return;
    }

    clients[clientId] = client;

    // Listen (with reconnect) in the background so this method does not block on it.
    _ = Task.Run(() => RunWithReconnectAsync(clientId, options, localPort, clients, cancellationToken), cancellationToken);

    try
    {
        // Send the initialization commands.
        await InitializeClientAsync(client, clientId);
    }
    catch (Exception ex) when (ex is SocketException or ObjectDisposedException)
    {
        // The connection can drop (and the listener can dispose the socket) while the
        // handshake is still in progress. Letting that escape would fault the caller's
        // Task.WhenAll and take down every other client, so log it and return.
        Console.WriteLine($"客户端#{clientId} 初始化失败: {ex.Message}");
    }
}
|
|
// Creates a TCP socket bound to IPAddress.Any:localPort and connects it to
// options.ServerHost:options.ServerPort.
//
// Returns the connected socket, or null when binding or connecting fails or when the
// attempt is cancelled. It does not throw; on failure the partially set-up socket is
// disposed so the local port is released.
static async Task<Socket?> ConnectAsync(int clientId, ClientOptions options, int localPort, CancellationToken cancellationToken)
{
    Socket? client = null;
    try
    {
        client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Allow the local port to be reused so that a reconnect is not blocked by the
        // previous connection still occupying it.
        client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
        client.Bind(new IPEndPoint(IPAddress.Any, localPort));

        await client.ConnectAsync(options.ServerHost, options.ServerPort, cancellationToken);

        Console.WriteLine($"客户端#{clientId} 已连接,本地端口: {localPort}");
        return client;
    }
    catch (OperationCanceledException)
    {
        // Shutdown was requested while connecting: not an error, just clean up.
        client?.Dispose();
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"客户端#{clientId} 连接失败: {ex.Message}");
        client?.Dispose();
        return null;
    }
}
|
|
// Background loop for one client: listens on the socket currently registered for
// `clientId` and, once that connection ends, reconnects according to `options`.
//
// The loop ends when cancellation is requested, when reconnecting is disabled, or
// when MaxReconnectAttempts (if greater than zero) has been exceeded. The attempt
// counter is never reset, so the limit applies to the whole lifetime of the loop.
static async Task RunWithReconnectAsync(int clientId, ClientOptions options, int localPort, ConcurrentDictionary<int, Socket> clients, CancellationToken cancellationToken)
{
    var reconnectCount = 0;

    while (!cancellationToken.IsCancellationRequested)
    {
        Socket? client = null;
        try
        {
            // Listen on the currently registered socket (if any) until it disconnects.
            // ListenServerMessagesAsync cleans up the socket in its own finally block.
            if (clients.TryGetValue(clientId, out client))
            {
                await ListenServerMessagesAsync(client, clientId, cancellationToken, clients);
            }

            // The listener also returns on cancellation; do not report a reconnect then.
            if (cancellationToken.IsCancellationRequested)
            {
                return;
            }

            // The connection is gone: enter the reconnect logic.
            reconnectCount++;

            if (!options.EnableReconnect)
            {
                Console.WriteLine($"客户端#{clientId} 重连已禁用,退出");
                return;
            }

            if (options.MaxReconnectAttempts > 0 && reconnectCount > options.MaxReconnectAttempts)
            {
                Console.WriteLine($"客户端#{clientId} 已达到最大重连次数 ({options.MaxReconnectAttempts}),退出");
                return;
            }

            // Clamp negative intervals: Task.Delay throws for values below -1 and waits
            // forever for -1, neither of which is a usable reconnect interval.
            var delayMs = Math.Max(0, options.ReconnectIntervalMs);

            // Floating-point division so sub-second intervals are not shown as "0".
            Console.WriteLine($"客户端#{clientId} 等待 {delayMs / 1000.0:0.###} 秒后重连... (第 {reconnectCount} 次)");
            await Task.Delay(delayMs, cancellationToken);

            // Try to reconnect; on failure go around the loop for the next attempt.
            client = await ConnectAsync(clientId, options, localPort, cancellationToken);
            if (client is null)
            {
                continue;
            }

            // Publish the new socket, then repeat the initialization handshake.
            clients[clientId] = client;
            await InitializeClientAsync(client, clientId);
            Console.WriteLine($"客户端#{clientId} 重连成功,已重新初始化");
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
            return;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"客户端#{clientId} 运行异常: {ex.Message}");
            clients.TryRemove(clientId, out _);

            // Best-effort cleanup; the socket may already be closed or disposed.
            try { client?.Shutdown(SocketShutdown.Both); } catch { }
            try { client?.Dispose(); } catch { }
        }
    }
}
|
|
// Sends the fixed initialization command sequence to the server, pausing before
// each command (5 s before the first, 0.5 s before each of the others).
static async Task InitializeClientAsync(Socket client, int clientId)
{
    (int DelayBeforeMs, string Command)[] handshake =
    {
        (5000, "Homed"),
        (500, "Runmode,2"),
        (500, "Armobject,0"),
        (500, "Controlmode,1"),
    };

    foreach (var (delayBeforeMs, command) in handshake)
    {
        await Task.Delay(delayBeforeMs);
        await WriteMessageAsync(client, clientId, command);
    }
}
|
|
// Wraps `message` in the <START>…<END> frame, sends it UTF-8 encoded over `client`,
// and logs the framed text to the console once the send has completed.
static async Task WriteMessageAsync(Socket client, int clientId, string message)
{
    string frame = "<START>" + message + "<END>";
    byte[] payload = Encoding.UTF8.GetBytes(frame);

    await client.SendAsync(payload, SocketFlags.None);

    Console.WriteLine($"客户端#{clientId} 发送: {frame}");
}
|
|
// Performs a single receive (up to 1024 bytes) on `client` and logs the payload if
// the received text is exactly one <START>…<END> frame.
//
// A receive that yields no bytes, or text that does not both start with <START> and
// end with <END>, is reported on the console and otherwise ignored. This helper does
// no buffering across receives, so a frame split over several reads is treated as
// invalid.
static async Task ReadMessageAsync(Socket client, int clientId)
{
    const string start = "<START>";
    const string end = "<END>";

    var receiveBuffer = new byte[1024];
    var length = await client.ReceiveAsync(receiveBuffer, SocketFlags.None);
    if (length <= 0)
    {
        Console.WriteLine($"客户端#{clientId} 未接收到数据");
        return;
    }

    var response = Encoding.UTF8.GetString(receiveBuffer, 0, length);

    // The markers are protocol tokens, so compare ordinally instead of using the
    // current culture's linguistic rules.
    if (!response.StartsWith(start, StringComparison.Ordinal) || !response.EndsWith(end, StringComparison.Ordinal))
    {
        Console.WriteLine($"客户端#{clientId} 收到无效消息(缺少消息头或消息尾),已忽略");
        return;
    }

    var content = response[start.Length..^end.Length];
    Console.WriteLine($"客户端#{clientId} 收到: {content}");
}
|
|
// Receives data from `client` until the connection ends or cancellation is requested,
// feeding the decoded text to TryReadFramedMessages after every read.
//
// This method does not throw: socket errors, disposal and cancellation all end the
// loop. On exit the socket is always shut down and disposed, and its entry in
// `clients` is removed only if that entry still refers to this socket.
static async Task ListenServerMessagesAsync(Socket client, int clientId, CancellationToken cancellationToken, ConcurrentDictionary<int, Socket> clients)
{
    var receiveBuffer = new byte[1024];
    var cache = new StringBuilder();

    // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 character that
    // is split across two reads; decoding each chunk independently would corrupt it.
    var decoder = Encoding.UTF8.GetDecoder();
    var charBuffer = new char[Encoding.UTF8.GetMaxCharCount(receiveBuffer.Length)];

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            int length;
            try
            {
                length = await client.ReceiveAsync(receiveBuffer, SocketFlags.None, cancellationToken);
            }
            catch (SocketException)
            {
                Console.WriteLine($"客户端#{clientId} 与服务端断开连接");
                return;
            }
            catch (ObjectDisposedException)
            {
                Console.WriteLine($"客户端#{clientId} Socket 已被释放");
                return;
            }

            // Zero bytes means the remote side closed the connection.
            if (length <= 0)
            {
                Console.WriteLine($"客户端#{clientId} 与服务端断开连接");
                return;
            }

            var charCount = decoder.GetChars(receiveBuffer, 0, length, charBuffer, 0);
            cache.Append(charBuffer, 0, charCount);
            await TryReadFramedMessages(cache, clientId, client);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal cancellation; nothing to report.
    }
    catch (Exception ex)
    {
        Console.WriteLine($"客户端#{clientId} 接收消息异常: {ex.Message}");
    }
    finally
    {
        // Remove the dictionary entry only if it still maps to this socket, so a newer
        // socket registered under the same id is never removed by mistake.
        clients.TryRemove(new KeyValuePair<int, Socket>(clientId, client));

        // This listener is finished with the socket either way, so always release it,
        // even when another code path already removed the dictionary entry.
        try
        {
            client.Shutdown(SocketShutdown.Both);
        }
        catch (Exception ex) when (ex is SocketException or ObjectDisposedException)
        {
            // Best effort: the socket may already be disconnected or disposed.
        }

        client.Dispose();
    }
}
|
|
// Extracts every complete <START>…<END> frame currently held in `cache`, logs its
// content and sends the simulated replies for the commands recognised below.
// An incomplete trailing frame stays in `cache` for the next call.
static async Task TryReadFramedMessages(StringBuilder cache, int clientId, Socket client)
{
    const string frameHead = "<START>";
    const string frameTail = "<END>";
    const int markerlessLimit = 10240; // 10 KB

    for (;;)
    {
        string snapshot = cache.ToString();

        int headAt = snapshot.IndexOf(frameHead, StringComparison.Ordinal);
        if (headAt < 0)
        {
            // No start marker yet: keep what we have and wait for more data, unless
            // the buffer has grown past the limit, in which case drop it entirely.
            if (cache.Length > markerlessLimit)
            {
                cache.Clear();
            }

            return;
        }

        int bodyAt = headAt + frameHead.Length;
        int tailAt = snapshot.IndexOf(frameTail, bodyAt, StringComparison.Ordinal);
        if (tailAt < 0)
        {
            // Frame not finished yet: discard anything in front of the start marker.
            if (headAt > 0)
            {
                cache.Remove(0, headAt);
            }

            return;
        }

        string content = snapshot[bodyAt..tailAt];
        Console.WriteLine($"客户端#{clientId} 收到: {content}");

        // Consume the frame (and any leading junk) before replying.
        cache.Remove(0, tailAt + frameTail.Length);

        switch (content)
        {
            case "Pickbattery,1":
                await WriteMessageAsync(client, clientId, "Running");
                await Task.Delay(1000);
                await WriteMessageAsync(client, clientId, "Picking");
                // The "PickFinished,1,2,3,4" reply is currently disabled.
                break;

            case "Putbattery,1":
                await WriteMessageAsync(client, clientId, "Running");
                await Task.Delay(500);
                await WriteMessageAsync(client, clientId, "Puting");
                await Task.Delay(5000);
                // The "PutFinished,1,2,3,4" reply is currently disabled.
                break;
        }
    }
}
|
|
// Interactive console loop. Each input line has the form "<clientId>:<message>" and
// is sent, framed, over that client's socket. The line "exit" cancels `cts`, closes
// every registered socket and returns.
//
// If standard input is closed, the loop stops reading commands and simply waits for
// cancellation, so the clients keep running without a busy loop.
static async Task SendLoopAsync(ConcurrentDictionary<int, Socket> clients, CancellationTokenSource cts)
{
    Console.WriteLine("输入格式: 客户端编号:消息,例如 1:Status,输入 exit 退出");

    while (!cts.IsCancellationRequested)
    {
        var line = Console.ReadLine();

        if (line is null)
        {
            // ReadLine returns null once stdin reaches end-of-stream. Looping again
            // would return null immediately every time and spin the CPU.
            Console.WriteLine("标准输入已关闭,停止读取命令");
            try
            {
                await Task.Delay(Timeout.Infinite, cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Cancellation requested elsewhere; fall through and return.
            }

            return;
        }

        if (string.IsNullOrWhiteSpace(line))
        {
            continue;
        }

        if (line.Trim().Equals("exit", StringComparison.OrdinalIgnoreCase))
        {
            cts.Cancel();
            foreach (var socket in clients.Values)
            {
                try
                {
                    socket.Shutdown(SocketShutdown.Both);
                }
                catch (Exception ex) when (ex is SocketException or ObjectDisposedException)
                {
                    // Best effort: the socket may already be disconnected or disposed.
                }

                // Dispose separately so a failed Shutdown cannot skip it.
                socket.Dispose();
            }

            return;
        }

        // Split on the first ':'; both the id and the message must be non-empty.
        var index = line.IndexOf(':');
        if (index <= 0 || index == line.Length - 1)
        {
            Console.WriteLine("输入不正确,请使用: 客户端编号:消息");
            continue;
        }

        var idText = line[..index];
        var message = line[(index + 1)..];
        if (!int.TryParse(idText, out var clientId) || !clients.TryGetValue(clientId, out var client))
        {
            Console.WriteLine($"客户端#{idText} 不存在或未连接");
            continue;
        }

        try
        {
            await WriteMessageAsync(client, clientId, message);
        }
        catch (ObjectDisposedException)
        {
            Console.WriteLine($"客户端#{clientId} 发送失败: Socket 已关闭(可能正在重连中)");

            // Remove only the socket that failed; a newer socket registered under the
            // same id in the meantime must stay in the dictionary.
            clients.TryRemove(new KeyValuePair<int, Socket>(clientId, client));
        }
        catch (SocketException ex)
        {
            Console.WriteLine($"客户端#{clientId} 发送失败: {ex.Message}");
            clients.TryRemove(new KeyValuePair<int, Socket>(clientId, client));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"客户端#{clientId} 发送失败: {ex.GetType().Name} - {ex.Message}");
        }
    }
}
|
|
/// <summary>
/// Settings for the client simulator, populated from <c>key=value</c> command-line arguments.
/// </summary>
file sealed class ClientOptions
{
    /// <summary>Number of clients to start (argument <c>count</c>).</summary>
    public int ClientCount { get; init; } = 1;

    /// <summary>Server host to connect to (argument <c>host</c>).</summary>
    public string ServerHost { get; init; } = "127.0.0.1";

    /// <summary>Server TCP port (argument <c>serverport</c>).</summary>
    public int ServerPort { get; init; } = 2000;

    /// <summary>Local port of the first client (argument <c>localport</c>).</summary>
    public int StartLocalPort { get; init; } = 62312;

    /// <summary>Whether a dropped connection is re-established (argument <c>reconnect</c>).</summary>
    public bool EnableReconnect { get; init; } = true;

    /// <summary>Delay between reconnect attempts, in milliseconds (argument <c>reconnectinterval</c>).</summary>
    public int ReconnectIntervalMs { get; init; } = 5000;

    /// <summary>Maximum number of reconnect attempts; -1 means retry without limit (argument <c>maxreconnect</c>).</summary>
    public int MaxReconnectAttempts { get; init; } = -1;

    /// <summary>
    /// Builds the options from arguments of the form <c>key=value</c>. Leading '-' and '/'
    /// characters on the key are stripped and keys are matched case-insensitively.
    /// Arguments without '=' are ignored, a missing or unparsable value falls back to the
    /// default, and when a key is given more than once the last occurrence wins.
    /// </summary>
    /// <param name="args">Raw command-line arguments.</param>
    /// <returns>The parsed options.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="args"/> is null.</exception>
    public static ClientOptions FromArgs(string[] args)
    {
        ArgumentNullException.ThrowIfNull(args);

        // Filled with the indexer rather than ToDictionary so that a repeated key
        // overwrites the earlier value instead of throwing ArgumentException.
        var map = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (var arg in args)
        {
            var parts = arg.Split('=', 2, StringSplitOptions.TrimEntries);
            if (parts.Length != 2)
            {
                continue;
            }

            map[parts[0].TrimStart('-', '/')] = parts[1];
        }

        return new ClientOptions
        {
            ClientCount = GetInt(map, "count", 1),
            ServerHost = GetString(map, "host", "127.0.0.1"),
            ServerPort = GetInt(map, "serverport", 2000),
            StartLocalPort = GetInt(map, "localport", 62312),
            EnableReconnect = GetBool(map, "reconnect", true),
            ReconnectIntervalMs = GetInt(map, "reconnectinterval", 5000),
            MaxReconnectAttempts = GetInt(map, "maxreconnect", -1)
        };
    }

    // Returns the boolean stored under `key`, or `defaultValue` when it is absent or not a valid boolean.
    private static bool GetBool(Dictionary<string, string> map, string key, bool defaultValue)
        => map.TryGetValue(key, out var value) && bool.TryParse(value, out var result)
            ? result
            : defaultValue;

    // Returns the integer stored under `key`, or `defaultValue` when it is absent or not a valid integer.
    // Parsed with the invariant culture so "-1" is read the same way on every machine.
    private static int GetInt(Dictionary<string, string> map, string key, int defaultValue)
        => map.TryGetValue(key, out var value)
           && int.TryParse(value, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out var number)
            ? number
            : defaultValue;

    // Returns the text stored under `key`, or `defaultValue` when it is absent or blank.
    private static string GetString(Dictionary<string, string> map, string key, string defaultValue)
        => map.TryGetValue(key, out var value) && !string.IsNullOrWhiteSpace(value) ? value : defaultValue;
}
|