wanshenmean
3 天以前 b690250002ee04f4309e6a90fd16fbfd9bd959e2
Code/WCS/WIDESEAWCS_Server/WIDESEAWCS_Tasks/RobotJob/RobotStateManager.cs
@@ -1,7 +1,6 @@
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using Newtonsoft.Json;
using WIDESEAWCS_Common;
using WIDESEAWCS_Core.LogHelper;
using Serilog;
using WIDESEAWCS_ITaskInfoRepository;
using WIDESEAWCS_Model.Models;
@@ -12,7 +11,8 @@
    /// </summary>
    /// <remarks>
    /// 核心功能是通过 IRobotStateRepository 管理数据库中的机械手状态。
    /// 提供乐观并发控制,通过 RowVersion 防止并发更新时的数据覆盖问题。
    /// 提供乐观并发控制,通过 Version 字段防止并发更新时的数据覆盖问题。
    /// 同时提供基于 SemaphoreSlim 的互斥锁,确保消息处理与 Job 执行不会并发操作同一设备状态。
    /// </remarks>
    public class RobotStateManager
    {
@@ -27,6 +27,15 @@
        private readonly ILogger _logger;
        /// <summary>
        /// 每个设备的异步互斥锁字典,用于 Job 执行与消息处理之间的互斥
        /// </summary>
        /// <remarks>
        /// Key 为设备 IP 地址,Value 为该设备专属的 SemaphoreSlim(1,1)。
        /// 确保同一设备的 Job 轮询和 TCP 消息处理不会同时操作状态。
        /// </remarks>
        private readonly ConcurrentDictionary<string, SemaphoreSlim> _robotLocks = new();
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="repository">仓储服务实例</param>
@@ -38,11 +47,25 @@
        }
        /// <summary>
        /// 安全更新 RobotSocketState 缓存,防止并发覆盖
        /// 获取或创建指定设备的异步互斥锁
        /// </summary>
        /// <remarks>
        /// 使用乐观并发模式:先读取当前 RowVersion,执行更新时检查版本是否一致。
        /// 如果 RowVersion 不匹配(说明有其他线程已更新),则更新失败返回 false。
        /// 使用 ConcurrentDictionary 确保每个设备 IP 对应唯一的 SemaphoreSlim(1,1)。
        /// 该锁用于 Job 执行与 TCP 消息处理之间的互斥,防止并发操作同一设备状态。
        /// </remarks>
        /// <param name="ipAddress">设备 IP 地址</param>
        /// <returns>该设备的信号量实例</returns>
        public SemaphoreSlim GetOrCreateLock(string ipAddress)
        {
            return _robotLocks.GetOrAdd(ipAddress, _ => new SemaphoreSlim(1, 1));
        }
        /// <summary>
        /// 安全更新 RobotSocketState,防止并发覆盖
        /// </summary>
        /// <remarks>
        /// 使用乐观并发模式:先读取当前 Version,执行更新时检查版本是否一致。
        /// 如果 Version 不匹配(说明有其他线程已更新),则更新失败返回 false。
        /// </remarks>
        /// <param name="ipAddress">设备 IP 地址</param>
        /// <param name="updateAction">更新状态的委托函数,传入当前状态副本,返回修改后的新状态</param>
@@ -57,8 +80,8 @@
                return false;
            }
            // 记录当前存储的 RowVersion,作为更新时的期望版本
            var expectedRowVersion = currentEntity.RowVersion;
            // 记录当前存储的 Version,作为更新时的期望版本
            var expectedVersion = currentEntity.Version;
            // 创建状态的深拷贝副本(使用 JSON 序列化实现)
            var stateCopy = CloneState(_repository.ToSocketState(currentEntity));
@@ -68,12 +91,12 @@
            // 将新状态转换为数据库实体
            var newEntity = _repository.ToEntity(newState);
            newEntity.RowVersion = Array.Empty<byte>(); // SqlSugar 会自动管理
            newEntity.Id = currentEntity.Id;
            newEntity.Version = expectedVersion + 1; // 版本自增
            // 调用仓储的安全更新方法,传入期望 RowVersion
            // 如果 RowVersion 不一致(已被其他线程更新),则更新失败
            return _repository.TryUpdate(ipAddress, newEntity, expectedRowVersion);
            // 调用仓储的安全更新方法,传入期望 Version
            // 如果 Version 不一致(已被其他线程更新),则更新失败
            return _repository.TryUpdate(ipAddress, newEntity, expectedVersion);
        }
        /// <summary>
@@ -94,30 +117,25 @@
            // 如果当前不存在该设备的状态,创建新记录
            if (currentEntity == null)
            {
                var entity = _repository.ToEntity(newState);
                entity.CreateTime = DateTime.Now;
                entity.UpdateTime = DateTime.Now;
                _repository.GetOrCreate(newState.IPAddress, newState.RobotCrane ?? new RobotCraneDevice());
                _logger.LogDebug("TryUpdateStateSafely:创建新状态,IP: {IpAddress}", ipAddress);
                QuartzLogger.Debug($"创建新状态,IP: {ipAddress}", ipAddress);
                QuartzLogHelper.LogDebug(_logger, $"创建新状态,IP: {ipAddress}", ipAddress);
                return true;
            }
            // 当前存在状态,记录期望 RowVersion 用于乐观锁检查
            var expectedRowVersion = currentEntity.RowVersion;
            // 当前存在状态,记录期望 Version 用于乐观锁检查
            var expectedVersion = currentEntity.Version;
            // 将新状态转换为数据库实体
            var newEntity = _repository.ToEntity(newState);
            newEntity.Id = currentEntity.Id;
            newEntity.RowVersion = Array.Empty<byte>();
            newEntity.Version = expectedVersion + 1; // 版本自增
            // 尝试安全更新,如果版本冲突则返回 false
            bool success = _repository.TryUpdate(ipAddress, newEntity, expectedRowVersion);
            bool success = _repository.TryUpdate(ipAddress, newEntity, expectedVersion);
            if (!success)
            {
                _logger.LogWarning("TryUpdateStateSafely:版本冲突,更新失败,IP: {IpAddress},期望版本字节长度: {ExpectedLength}", ipAddress, expectedRowVersion.Length);
                QuartzLogger.Warn($"版本冲突,更新失败,IP: {ipAddress}", ipAddress);
                QuartzLogHelper.LogWarn(_logger, $"TryUpdateStateSafely:版本冲突,更新失败,IP: {ipAddress},期望版本: {expectedVersion}", ipAddress);
            }
            return success;
@@ -165,4 +183,4 @@
            return _repository.ToSocketState(entity);
        }
    }
}
}