using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using System.Buffers; using System.Net.WebSockets; using System.Text; using System.Threading; public class WebSocketClientService : IWebSocketClientService, IDisposable { private readonly ILogger _logger; private readonly string _serverUrl; private readonly CancellationTokenSource _cts = new(); private readonly SemaphoreSlim _connectionLock = new(1, 1); private ClientWebSocket _webSocket = new(); private Timer _heartbeatTimer; private DateTime _lastPongReceived = DateTime.MinValue; private const int HeartbeatInterval = 30000; // 30秒 private const int PongTimeout = 60000; // 60秒 public event EventHandler MessageReceived; public bool IsConnected => _webSocket?.State == WebSocketState.Open; public WebSocketClientService(ILogger logger) { _logger = logger; _serverUrl ="ws://localhost:5050"; } public async Task ConnectAsync() { await _connectionLock.WaitAsync(); try { if (IsConnected) { _logger.LogWarning("WebSocket 已连接"); return; } // 清理旧连接 //await SafeDisconnectAsync(); _webSocket = new ClientWebSocket(); _lastPongReceived = DateTime.UtcNow; try { await _webSocket.ConnectAsync(new Uri(_serverUrl), _cts.Token); _logger.LogInformation("成功连接到 {ServerUrl}", _serverUrl); // 启动心跳和接收消息 StartHeartbeat(); _ = ReceiveMessagesAsync(); } catch (Exception ex) { _logger.LogError(ex, "连接失败"); await SafeDisconnectAsync(); throw; } } finally { _connectionLock.Release(); } } private void StartHeartbeat() { _heartbeatTimer?.Dispose(); _heartbeatTimer = new Timer( async _ => await SendHeartbeatAsync(), null, HeartbeatInterval, HeartbeatInterval); } private async Task SendHeartbeatAsync() { if (!IsConnected) return; try { // 检查PONG响应超时 if ((DateTime.UtcNow - _lastPongReceived).TotalMilliseconds > PongTimeout) { _logger.LogWarning("PONG响应超时,重新连接..."); await ReconnectAsync(); return; } // 发送PING await SendAsync("PONG"); _logger.LogDebug("发送心跳检测"); } catch (Exception ex) { _logger.LogError(ex, "发送心跳失败"); await ReconnectAsync(); } } private async Task ReceiveMessagesAsync() { var buffer = ArrayPool.Shared.Rent(4096); try { while (IsConnected && !_cts.Token.IsCancellationRequested) { try { var result = await _webSocket.ReceiveAsync( new ArraySegment(buffer), _cts.Token); var message = Encoding.UTF8.GetString(buffer, 0, result.Count); if (result.MessageType == WebSocketMessageType.Close) { await DisconnectAsync(); break; } if (result.MessageType == WebSocketMessageType.Text) { MessageReceived?.Invoke(this, message); _logger.LogDebug("收到消息: {Message}", message); } // 可添加对二进制消息的处理 else if (result.MessageType == WebSocketMessageType.Binary) { _logger.LogWarning("收到二进制消息,忽略"); } // 如果是最后一片消息且是心跳响应 if (result.EndOfMessage && message == "PONG") { _lastPongReceived = DateTime.UtcNow; _logger.LogDebug("收到心跳响应"); } } catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) { _logger.LogWarning("连接提前关闭"); await DisconnectAsync(); break; } catch (OperationCanceledException) { _logger.LogDebug("接收消息操作已取消"); break; } catch (Exception ex) { _logger.LogError(ex, "接收消息错误"); await ReconnectAsync(); break; } } } finally { ArrayPool.Shared.Return(buffer); } } public async Task SendAsync(string message) { if (_webSocket == null || _webSocket.State != WebSocketState.Open) { _logger.LogWarning("WebSocket 未连接,尝试重新连接"); await ReconnectAsync(); if (_webSocket == null || _webSocket.State != WebSocketState.Open) { _logger.LogError("无法重新连接到 WebSocket"); return; } } var buffer = Encoding.UTF8.GetBytes(message); var segment = new ArraySegment(buffer); try { await _webSocket.SendAsync(segment, WebSocketMessageType.Text, true, _cts.Token); _logger.LogDebug("消息发送成功: {Message}", message); } catch (WebSocketException ex) { _logger.LogError(ex, "发送消息时 WebSocket 异常"); await DisconnectAsync(); } catch (OperationCanceledException) { _logger.LogDebug("发送消息操作已取消"); } catch (Exception ex) { _logger.LogError(ex, "发送消息时发生未知错误"); await ReconnectAsync(); } } public async Task DisconnectAsync() { if (_webSocket != null) { try { if (_webSocket.State == WebSocketState.Open) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed normally", CancellationToken.None); } } catch (Exception ex) { _logger.LogError(ex, "关闭WebSocket连接时出错"); } finally { _webSocket.Dispose(); _webSocket = null; } } } private async Task SafeDisconnectAsync() { try { _heartbeatTimer?.Dispose(); _heartbeatTimer = null; if (_webSocket?.State == WebSocketState.Open) { await _webSocket.CloseAsync( WebSocketCloseStatus.NormalClosure, "正常关闭", CancellationToken.None); } } catch (Exception ex) { _logger.LogWarning(ex, "关闭连接时出错"); } finally { _webSocket?.Dispose(); _webSocket = new ClientWebSocket(); _logger.LogInformation("连接已关闭"); } } private async Task ReconnectAsync() { await DisconnectAsync(); int retryCount = 0; const int maxRetries = 5; const int delayMilliseconds = 1000; while (retryCount < maxRetries && !_cts.Token.IsCancellationRequested) { try { await ConnectAsync(); // 假设 ConnectAsync 是你的连接方法 _logger.LogInformation("重新连接成功"); return; } catch (Exception ex) { _logger.LogError(ex, "重新连接失败"); retryCount++; if (retryCount < maxRetries) { await Task.Delay(delayMilliseconds, _cts.Token); } } } _logger.LogError("达到最大重试次数,无法重新连接"); } public void Dispose() { _cts.Cancel(); _heartbeatTimer?.Dispose(); _webSocket?.Dispose(); _connectionLock.Dispose(); _cts.Dispose(); GC.SuppressFinalize(this); } }