| | |
| | | using Microsoft.Extensions.Logging; |
| | | using Newtonsoft.Json; |
| | | using WIDESEAWCS_Common; |
| | | using WIDESEAWCS_Core.Caches; |
| | | using WIDESEAWCS_QuartzJob; |
| | | using WIDESEAWCS_Core.LogHelper; |
| | | using WIDESEAWCS_ITaskInfoRepository; |
| | | using WIDESEAWCS_Model.Models; |
| | | |
| | | namespace WIDESEAWCS_Tasks |
| | | { |
| | | /// <summary> |
| | | /// 机械手状态管理器 - 负责RobotSocketState的安全更新和克隆 |
| | | /// 机械手状态管理器 - 负责 RobotSocketState 的线程安全更新和克隆 |
| | | /// </summary> |
| | | /// <remarks> |
| | | /// 核心功能是通过 IRobotStateRepository 管理数据库中的机械手状态。 |
| | | /// 提供乐观并发控制,通过 RowVersion 防止并发更新时的数据覆盖问题。 |
| | | /// </remarks> |
| | | public class RobotStateManager |
| | | { |
| | | private readonly ICacheService _cache; |
| | | /// <summary> |
| | | /// 仓储服务实例,用于读写数据库中的状态数据 |
| | | /// </summary> |
| | | private readonly IRobotStateRepository _repository; |
| | | |
| | | public RobotStateManager(ICacheService cache) |
| | | /// <summary> |
| | | /// 日志记录器 |
| | | /// </summary> |
| | | private readonly ILogger _logger; |
| | | |
| | | /// <summary> |
| | | /// 构造函数 |
| | | /// </summary> |
| | | /// <param name="repository">仓储服务实例</param> |
| | | /// <param name="logger">日志记录器</param> |
| | | public RobotStateManager(IRobotStateRepository repository, ILogger logger) |
| | | { |
| | | _cache = cache; |
| | | _repository = repository; |
| | | _logger = logger; |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 安全更新RobotSocketState缓存,防止并发覆盖 |
| | | /// 安全更新 RobotSocketState 缓存,防止并发覆盖 |
| | | /// </summary> |
| | | /// <param name="ipAddress">设备IP地址</param> |
| | | /// <param name="updateAction">更新状态的委托(传入当前状态,返回修改后的新状态)</param> |
| | | /// <returns>是否更新成功</returns> |
| | | /// <remarks> |
| | | /// 使用乐观并发模式:先读取当前 RowVersion,执行更新时检查版本是否一致。 |
| | | /// 如果 RowVersion 不匹配(说明有其他线程已更新),则更新失败返回 false。 |
| | | /// </remarks> |
| | | /// <param name="ipAddress">设备 IP 地址</param> |
| | | /// <param name="updateAction">更新状态的委托函数,传入当前状态副本,返回修改后的新状态</param> |
| | | /// <returns>是否更新成功;false 表示版本冲突或状态不存在</returns> |
| | | public bool TryUpdateStateSafely(string ipAddress, Func<RobotSocketState, RobotSocketState> updateAction) |
| | | { |
| | | var cacheKey = GetCacheKey(ipAddress); |
| | | var currentState = _cache.Get<RobotSocketState>(cacheKey); |
| | | // 从数据库获取当前存储的状态 |
| | | var currentEntity = _repository.GetByIp(ipAddress); |
| | | |
| | | if (currentState == null) |
| | | if (currentEntity == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | // 使用当前存储的版本号作为期望版本 |
| | | var expectedVersion = currentState.Version; |
| | | // 记录当前存储的 RowVersion,作为更新时的期望版本 |
| | | var expectedRowVersion = currentEntity.RowVersion; |
| | | |
| | | // 创建状态副本进行修改(避免修改原对象引用) |
| | | var stateCopy = CloneState(currentState); |
| | | // 创建状态的深拷贝副本(使用 JSON 序列化实现) |
| | | var stateCopy = CloneState(_repository.ToSocketState(currentEntity)); |
| | | |
| | | // 执行调用者提供的更新逻辑,传入副本状态,获取新的状态对象 |
| | | var newState = updateAction(stateCopy); |
| | | newState.Version = DateTime.UtcNow.Ticks; |
| | | |
| | | return _cache.TrySafeUpdate( |
| | | cacheKey, |
| | | newState, |
| | | expectedVersion, |
| | | s => s.Version |
| | | ); |
| | | // 将新状态转换为数据库实体 |
| | | var newEntity = _repository.ToEntity(newState); |
| | | newEntity.RowVersion = Array.Empty<byte>(); // SqlSugar 会自动管理 |
| | | newEntity.Id = currentEntity.Id; |
| | | |
| | | // 调用仓储的安全更新方法,传入期望 RowVersion |
| | | // 如果 RowVersion 不一致(已被其他线程更新),则更新失败 |
| | | return _repository.TryUpdate(ipAddress, newEntity, expectedRowVersion); |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 安全更新RobotSocketState缓存(简单版本) |
| | | /// 安全更新 RobotSocketState 的重载版本(直接传入新状态) |
| | | /// </summary> |
| | | /// <param name="ipAddress">设备IP地址</param> |
| | | /// <param name="newState">新状态(会被更新Version字段)</param> |
| | | /// <returns>是否更新成功</returns> |
| | | /// <remarks> |
| | | /// 与上一个重载的区别:此方法直接接收完整的新状态对象,而不是更新委托。 |
| | | /// 如果数据库中不存在该设备的状态,则创建新记录。 |
| | | /// </remarks> |
| | | /// <param name="ipAddress">设备 IP 地址</param> |
| | | /// <param name="newState">新状态对象</param> |
| | | /// <returns>是否更新成功;新建设置为 true</returns> |
| | | public bool TryUpdateStateSafely(string ipAddress, RobotSocketState newState) |
| | | { |
| | | var cacheKey = GetCacheKey(ipAddress); |
| | | var currentState = _cache.Get<RobotSocketState>(cacheKey); |
| | | // 从数据库获取当前存储的状态 |
| | | var currentEntity = _repository.GetByIp(ipAddress); |
| | | |
| | | if (currentState == null) |
| | | // 如果当前不存在该设备的状态,创建新记录 |
| | | if (currentEntity == null) |
| | | { |
| | | // 当前不存在,直接添加 |
| | | newState.Version = DateTime.UtcNow.Ticks; |
| | | _cache.AddObject(cacheKey, newState); |
| | | var entity = _repository.ToEntity(newState); |
| | | entity.CreateTime = DateTime.Now; |
| | | entity.UpdateTime = DateTime.Now; |
| | | _repository.GetOrCreate(newState.IPAddress, newState.RobotCrane ?? new RobotCraneDevice()); |
| | | _logger.LogDebug("TryUpdateStateSafely:创建新状态,IP: {IpAddress}", ipAddress); |
| | | QuartzLogger.Debug($"创建新状态,IP: {ipAddress}", ipAddress); |
| | | return true; |
| | | } |
| | | |
| | | // 使用当前存储的版本号作为期望版本 |
| | | var expectedVersion = currentState.Version; |
| | | // 当前存在状态,记录期望 RowVersion 用于乐观锁检查 |
| | | var expectedRowVersion = currentEntity.RowVersion; |
| | | |
| | | // 更新新状态的版本号为最新时间戳 |
| | | newState.Version = DateTime.UtcNow.Ticks; |
| | | // 将新状态转换为数据库实体 |
| | | var newEntity = _repository.ToEntity(newState); |
| | | newEntity.Id = currentEntity.Id; |
| | | newEntity.RowVersion = Array.Empty<byte>(); |
| | | |
| | | return _cache.TrySafeUpdate( |
| | | cacheKey, |
| | | newState, |
| | | expectedVersion, |
| | | s => s.Version |
| | | ); |
| | | // 尝试安全更新,如果版本冲突则返回 false |
| | | bool success = _repository.TryUpdate(ipAddress, newEntity, expectedRowVersion); |
| | | |
| | | if (!success) |
| | | { |
| | | _logger.LogWarning("TryUpdateStateSafely:版本冲突,更新失败,IP: {IpAddress},期望版本字节长度: {ExpectedLength}", ipAddress, expectedRowVersion.Length); |
| | | QuartzLogger.Warn($"版本冲突,更新失败,IP: {ipAddress}", ipAddress); |
| | | } |
| | | |
| | | return success; |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 克隆RobotSocketState对象(创建深拷贝) |
| | | /// 克隆 RobotSocketState 对象(深拷贝) |
| | | /// </summary> |
| | | /// <remarks> |
| | | /// 使用 JSON 序列化/反序列化实现深拷贝。 |
| | | /// 这样可以确保新对象与原对象完全独立,修改新对象不会影响原对象。 |
| | | /// </remarks> |
| | | /// <param name="source">源状态对象</param> |
| | | /// <returns>新的状态对象,是源对象的深拷贝</returns> |
| | | public RobotSocketState CloneState(RobotSocketState source) |
| | | { |
| | | // 使用序列化/反序列化进行深拷贝 |
| | | var json = JsonConvert.SerializeObject(source); |
| | | return JsonConvert.DeserializeObject<RobotSocketState>(json) ?? new RobotSocketState { IPAddress = source.IPAddress }; |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 获取Redis缓存键 |
| | | /// 从数据库获取机械手状态 |
| | | /// </summary> |
| | | public static string GetCacheKey(string ipAddress) |
| | | { |
| | | return $"{RedisPrefix.Code}:{RedisName.SocketDevices}:{ipAddress}"; |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 从缓存获取状态 |
| | | /// </summary> |
| | | /// <param name="ipAddress">设备 IP 地址</param> |
| | | /// <returns>如果存在则返回状态对象,否则返回 null</returns> |
| | | public RobotSocketState? GetState(string ipAddress) |
| | | { |
| | | return _cache.Get<RobotSocketState>(GetCacheKey(ipAddress)); |
| | | var entity = _repository.GetByIp(ipAddress); |
| | | return entity != null ? _repository.ToSocketState(entity) : null; |
| | | } |
| | | |
| | | /// <summary> |
| | | /// 获取或创建状态 |
| | | /// 获取或创建机械手状态 |
| | | /// </summary> |
| | | /// <remarks> |
| | | /// 如果数据库中已存在该设备的状态,直接返回。 |
| | | /// 如果不存在,则创建新的状态记录并返回。 |
| | | /// </remarks> |
| | | /// <param name="ipAddress">设备 IP 地址</param> |
| | | /// <param name="robotCrane">机器人设备信息,用于初始化新状态</param> |
| | | /// <returns>该设备的状态对象</returns> |
| | | public RobotSocketState GetOrCreateState(string ipAddress, RobotCraneDevice robotCrane) |
| | | { |
| | | return _cache.GetOrAdd(GetCacheKey(ipAddress), _ => new RobotSocketState |
| | | { |
| | | IPAddress = ipAddress, |
| | | RobotCrane = robotCrane |
| | | }); |
| | | var entity = _repository.GetOrCreate(ipAddress, robotCrane); |
| | | return _repository.ToSocketState(entity); |
| | | } |
| | | } |
| | | } |