using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Logging;
|
using System.Buffers;
|
using System.Net.WebSockets;
|
using System.Text;
|
using System.Threading;
|
|
public class WebSocketClientService : IWebSocketClientService, IDisposable
|
{
|
private readonly ILogger<WebSocketClientService> _logger;
|
private readonly string _serverUrl;
|
private readonly CancellationTokenSource _cts = new();
|
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
|
private ClientWebSocket _webSocket = new();
|
private Timer _heartbeatTimer;
|
private DateTime _lastPongReceived = DateTime.MinValue;
|
private const int HeartbeatInterval = 30000; // 30秒
|
private const int PongTimeout = 60000; // 60秒
|
|
public event EventHandler<string> MessageReceived;
|
public bool IsConnected => _webSocket?.State == WebSocketState.Open;
|
|
public WebSocketClientService(ILogger<WebSocketClientService> logger)
|
{
|
_logger = logger;
|
_serverUrl ="ws://localhost:5050";
|
}
|
|
public async Task ConnectAsync()
|
{
|
await _connectionLock.WaitAsync();
|
try
|
{
|
if (IsConnected)
|
{
|
_logger.LogWarning("WebSocket 已连接");
|
return;
|
}
|
|
// 清理旧连接
|
//await SafeDisconnectAsync();
|
|
_webSocket = new ClientWebSocket();
|
_lastPongReceived = DateTime.UtcNow;
|
|
try
|
{
|
await _webSocket.ConnectAsync(new Uri(_serverUrl), _cts.Token);
|
_logger.LogInformation("成功连接到 {ServerUrl}", _serverUrl);
|
|
// 启动心跳和接收消息
|
StartHeartbeat();
|
_ = ReceiveMessagesAsync();
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "连接失败");
|
await SafeDisconnectAsync();
|
throw;
|
}
|
}
|
finally
|
{
|
_connectionLock.Release();
|
}
|
}
|
|
private void StartHeartbeat()
|
{
|
_heartbeatTimer?.Dispose();
|
_heartbeatTimer = new Timer(
|
async _ => await SendHeartbeatAsync(),
|
null,
|
HeartbeatInterval,
|
HeartbeatInterval);
|
}
|
|
private async Task SendHeartbeatAsync()
|
{
|
if (!IsConnected) return;
|
|
try
|
{
|
// 检查PONG响应超时
|
if ((DateTime.UtcNow - _lastPongReceived).TotalMilliseconds > PongTimeout)
|
{
|
_logger.LogWarning("PONG响应超时,重新连接...");
|
await ReconnectAsync();
|
return;
|
}
|
|
// 发送PING
|
await SendAsync("PONG");
|
_logger.LogDebug("发送心跳检测");
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "发送心跳失败");
|
await ReconnectAsync();
|
}
|
}
|
|
private async Task ReceiveMessagesAsync()
|
{
|
var buffer = ArrayPool<byte>.Shared.Rent(4096);
|
try
|
{
|
while (IsConnected && !_cts.Token.IsCancellationRequested)
|
{
|
try
|
{
|
var result = await _webSocket.ReceiveAsync(
|
new ArraySegment<byte>(buffer),
|
_cts.Token);
|
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
|
if (result.MessageType == WebSocketMessageType.Close)
|
{
|
await DisconnectAsync();
|
break;
|
}
|
|
if (result.MessageType == WebSocketMessageType.Text)
|
{
|
|
MessageReceived?.Invoke(this, message);
|
_logger.LogDebug("收到消息: {Message}", message);
|
}
|
// 可添加对二进制消息的处理
|
else if (result.MessageType == WebSocketMessageType.Binary)
|
{
|
_logger.LogWarning("收到二进制消息,忽略");
|
}
|
|
// 如果是最后一片消息且是心跳响应
|
if (result.EndOfMessage && message == "PONG")
|
{
|
_lastPongReceived = DateTime.UtcNow;
|
_logger.LogDebug("收到心跳响应");
|
}
|
}
|
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
|
{
|
_logger.LogWarning("连接提前关闭");
|
await DisconnectAsync();
|
break;
|
}
|
catch (OperationCanceledException)
|
{
|
_logger.LogDebug("接收消息操作已取消");
|
break;
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "接收消息错误");
|
await ReconnectAsync();
|
break;
|
}
|
}
|
}
|
finally
|
{
|
ArrayPool<byte>.Shared.Return(buffer);
|
}
|
}
|
|
public async Task SendAsync(string message)
|
{
|
if (_webSocket == null || _webSocket.State != WebSocketState.Open)
|
{
|
_logger.LogWarning("WebSocket 未连接,尝试重新连接");
|
await ReconnectAsync();
|
if (_webSocket == null || _webSocket.State != WebSocketState.Open)
|
{
|
_logger.LogError("无法重新连接到 WebSocket");
|
return;
|
}
|
}
|
|
var buffer = Encoding.UTF8.GetBytes(message);
|
var segment = new ArraySegment<byte>(buffer);
|
|
try
|
{
|
await _webSocket.SendAsync(segment, WebSocketMessageType.Text, true, _cts.Token);
|
_logger.LogDebug("消息发送成功: {Message}", message);
|
}
|
catch (WebSocketException ex)
|
{
|
_logger.LogError(ex, "发送消息时 WebSocket 异常");
|
await DisconnectAsync();
|
}
|
catch (OperationCanceledException)
|
{
|
_logger.LogDebug("发送消息操作已取消");
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "发送消息时发生未知错误");
|
await ReconnectAsync();
|
}
|
}
|
|
public async Task DisconnectAsync()
|
{
|
if (_webSocket != null)
|
{
|
try
|
{
|
if (_webSocket.State == WebSocketState.Open)
|
{
|
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed normally", CancellationToken.None);
|
}
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "关闭WebSocket连接时出错");
|
}
|
finally
|
{
|
_webSocket.Dispose();
|
_webSocket = null;
|
}
|
}
|
}
|
|
private async Task SafeDisconnectAsync()
|
{
|
try
|
{
|
_heartbeatTimer?.Dispose();
|
_heartbeatTimer = null;
|
|
if (_webSocket?.State == WebSocketState.Open)
|
{
|
await _webSocket.CloseAsync(
|
WebSocketCloseStatus.NormalClosure,
|
"正常关闭",
|
CancellationToken.None);
|
}
|
}
|
catch (Exception ex)
|
{
|
_logger.LogWarning(ex, "关闭连接时出错");
|
}
|
finally
|
{
|
_webSocket?.Dispose();
|
_webSocket = new ClientWebSocket();
|
_logger.LogInformation("连接已关闭");
|
}
|
}
|
|
private async Task ReconnectAsync()
|
{
|
await DisconnectAsync();
|
|
int retryCount = 0;
|
const int maxRetries = 5;
|
const int delayMilliseconds = 1000;
|
|
while (retryCount < maxRetries && !_cts.Token.IsCancellationRequested)
|
{
|
try
|
{
|
await ConnectAsync(); // 假设 ConnectAsync 是你的连接方法
|
_logger.LogInformation("重新连接成功");
|
return;
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "重新连接失败");
|
retryCount++;
|
if (retryCount < maxRetries)
|
{
|
await Task.Delay(delayMilliseconds, _cts.Token);
|
}
|
}
|
}
|
|
_logger.LogError("达到最大重试次数,无法重新连接");
|
}
|
|
|
public void Dispose()
|
{
|
_cts.Cancel();
|
_heartbeatTimer?.Dispose();
|
_webSocket?.Dispose();
|
_connectionLock.Dispose();
|
_cts.Dispose();
|
GC.SuppressFinalize(this);
|
}
|
}
|