wanshenmean
5 天以前 682413a01effa5ae936e418fecdfd72f670d09ab
Code/WMS/WIDESEA_WMSServer/WIDESEA_WMSServer/BackgroundServices/StockMonitorBackgroundService.cs
@@ -1,8 +1,10 @@
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -14,20 +16,35 @@
namespace WIDESEA_WMSServer.BackgroundServices
{
    /// <summary>
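    /// Stock monitoring background service.
    /// Builds one full snapshot at startup; afterwards it only checks, by update time, the locations affected since the previous check and pushes the changes through SignalR.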
    /// </summary>
    public class StockMonitorBackgroundService : BackgroundService
    {
        private const int MonitorIntervalMs = 3000;
        private readonly ILogger<StockMonitorBackgroundService> _logger;
        private readonly IHubContext<StockHub> _hubContext;
        private readonly IServiceProvider _serviceProvider;
        // 货位状态快照:key = LocationId
        private ConcurrentDictionary<int, LocationSnapshot> _lastLocationSnapshots = new();
        /// <summary>
        /// 货位状态快照:key = LocationId。
        /// </summary>
        private readonly ConcurrentDictionary<int, LocationSnapshot> _lastLocationSnapshots = new();
        // 监控间隔(毫秒)
        private const int MonitorIntervalMs = 3000;
        /// <summary>
        /// 库存所在货位映射:key = StockId, value = LocationId。
        /// 用于识别库存从旧货位移动到新货位时,两边都需要推送刷新。
        /// </summary>
        private readonly ConcurrentDictionary<int, int> _lastStockLocationMap = new();
        /// <summary>
        /// 上次增量检查时间戳。
        /// 分别跟踪货位、库存主表、库存明细,避免每次全表扫描。
        /// </summary>
        private DateTime _lastLocationCheckTime = DateTime.MinValue;
        private DateTime _lastStockCheckTime = DateTime.MinValue;
        private DateTime _lastDetailCheckTime = DateTime.MinValue;
        public StockMonitorBackgroundService(
            ILogger<StockMonitorBackgroundService> logger,
@@ -43,8 +60,9 @@
        {
            _logger.LogInformation("库存监控后台服务已启动");
            // Wait for application initialization to finish so this service does not compete with other startup tasks for database resources.
            await Task.Delay(5000, stoppingToken);
            await InitializeSnapshotsAsync();
            while (!stoppingToken.IsCancellationRequested)
            {
@@ -54,7 +72,7 @@
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "检查数据变化时发生错误");
                    _logger.LogError(ex, "检查库存变更时发生异常");
                }
                await Task.Delay(MonitorIntervalMs, stoppingToken);
@@ -64,110 +82,252 @@
        }
        /// <summary>
        /// 检查货位和库存变化
        /// 初始化全量快照。仅在服务启动时执行一次,后续走增量检查。
        /// </summary>
        private async Task InitializeSnapshotsAsync()
        {
            using var scope = _serviceProvider.CreateScope();
            var stockService = scope.ServiceProvider.GetRequiredService<IStockInfoService>();
            var locationRepo = scope.ServiceProvider.GetRequiredService<IRepository<Dt_LocationInfo>>();
            var initializedAt = DateTime.Now;
            var allLocations = await locationRepo.Db.Queryable<Dt_LocationInfo>()
                .Where(x => x.LocationStatus != 99)
                .ToListAsync();
            var allStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
                .Includes(x => x.Details)
                .Where(x => x.LocationId != 0)
                .ToListAsync();
            var stockDict = allStocks
                .Where(x => x.LocationId > 0)
                .GroupBy(x => x.LocationId)
                .ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
            foreach (var location in allLocations)
            {
                stockDict.TryGetValue(location.Id, out var stock);
                _lastLocationSnapshots[location.Id] = BuildLocationSnapshot(location, stock);
            }
            foreach (var stock in allStocks.Where(x => x.LocationId > 0))
            {
                _lastStockLocationMap[stock.Id] = stock.LocationId;
            }
            _lastLocationCheckTime = initializedAt;
            _lastStockCheckTime = initializedAt;
            _lastDetailCheckTime = initializedAt;
            _logger.LogInformation("库存监控快照初始化完成,货位数={LocationCount},库存数={StockCount}", allLocations.Count, allStocks.Count);
        }
        /// <summary>
        /// 增量检查货位和库存变化。
        /// 只查询上次检查之后发生变化的货位、库存和明细,再回查受影响货位的当前完整数据。
        /// </summary>
        private async Task CheckChangesAsync()
        {
            using var scope = _serviceProvider.CreateScope();
            var stockService = scope.ServiceProvider.GetRequiredService<IStockInfoService>();
            var locationRepo = scope.ServiceProvider.GetRequiredService<IRepository<Dt_LocationInfo>>();
            var checkStartedAt = DateTime.Now;
            // 1. 获取所有货位数据
            var allLocations = await locationRepo.QueryDataAsync(x => x.LocationStatus != 99); // 排除禁用的货位
            // 2. 获取所有库存数据(包含明细)
            var allStockData = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
                .Includes(x => x.Details)
            var changedLocationIds = await locationRepo.Db.Queryable<Dt_LocationInfo>()
                .Where(x => x.LocationStatus != 99 && (x.ModifyDate ?? x.CreateDate) > _lastLocationCheckTime)
                .Select(x => x.Id)
                .ToListAsync();
            // 构建库存字典:LocationId -> StockInfo
            var stockDict = allStockData
                .Where(s => s.LocationId > 0)
                .ToDictionary(s => s.LocationId, s => s);
            // 构建当前货位快照字典
            var currentSnapshots = new ConcurrentDictionary<int, LocationSnapshot>();
            foreach (var location in allLocations)
            {
                // 获取该货位的库存信息
                stockDict.TryGetValue(location.Id, out var stock);
                // 计算库存数量
                float totalQuantity = 0;
                string detailsHash = string.Empty;
                if (stock?.Details != null && stock.Details.Any())
            var changedStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
                .Where(x => (x.ModifyDate ?? x.CreateDate) > _lastStockCheckTime)
                .Select(x => new StockLocationChange
                {
                    totalQuantity = stock.Details.Sum(d => d.StockQuantity);
                    detailsHash = GenerateDetailsHash(stock.Details.ToList());
                    StockId = x.Id,
                    LocationId = x.LocationId
                })
                .ToListAsync();
            var changedDetailStockIds = await stockService.Repository.Db.Queryable<Dt_StockInfoDetail>()
                .Where(x => (x.ModifyDate ?? x.CreateDate) > _lastDetailCheckTime)
                .Select(x => x.StockId)
                .Distinct()
                .ToListAsync();
            var affectedLocationIds = new HashSet<int>(changedLocationIds);
            foreach (var stock in changedStocks)
            {
                var previousLocationId = _lastStockLocationMap.TryGetValue(stock.StockId, out var oldLocationId)
                    ? oldLocationId
                    : 0;
                if (previousLocationId > 0 && previousLocationId != stock.LocationId)
                {
                    affectedLocationIds.Add(previousLocationId);
                }
                var snapshot = new LocationSnapshot
                if (stock.LocationId > 0)
                {
                    LocationId = location.Id,
                    WarehouseId = location.WarehouseId,
                    LocationCode = location.LocationCode,
                    LocationStatus = location.LocationStatus,
                    PalletCode = stock?.PalletCode,
                    StockStatus = stock?.StockStatus ?? 0,
                    StockQuantity = totalQuantity,
                    DetailsHash = detailsHash
                };
                currentSnapshots.TryAdd(location.Id, snapshot);
                // 检查是否有变化
                if (_lastLocationSnapshots.TryGetValue(location.Id, out var lastSnapshot))
                    affectedLocationIds.Add(stock.LocationId);
                    _lastStockLocationMap[stock.StockId] = stock.LocationId;
                }
                else
                {
                    // 检测变化:货位状态、库存状态、数量、明细变化
                    if (lastSnapshot.LocationStatus != snapshot.LocationStatus ||
                        lastSnapshot.StockStatus != snapshot.StockStatus ||
                        lastSnapshot.PalletCode != snapshot.PalletCode ||
                        Math.Abs(lastSnapshot.StockQuantity - snapshot.StockQuantity) > 0.001f ||
                        lastSnapshot.DetailsHash != snapshot.DetailsHash)
                    {
                        // 构建更新DTO
                        var update = new StockUpdateDTO
                        {
                            LocationId = snapshot.LocationId,
                            WarehouseId = snapshot.WarehouseId,
                            PalletCode = snapshot.PalletCode,
                            StockQuantity = snapshot.StockQuantity,
                            StockStatus = snapshot.StockStatus,
                            LocationStatus = snapshot.LocationStatus,
                            Details = BuildDetailDtos(stock?.Details?.ToList())
                        };
                        await _hubContext.Clients.All.SendAsync("StockUpdated", update);
                        _logger.LogDebug("数据变化推送: LocationId={LocationId}, LocStatus={LocStatus}, StockStatus={StockStatus}, Quantity={Quantity}",
                            snapshot.LocationId, snapshot.LocationStatus, snapshot.StockStatus, snapshot.StockQuantity);
                    }
                    _lastStockLocationMap.TryRemove(stock.StockId, out _);
                }
            }
            // 更新快照数据
            _lastLocationSnapshots = currentSnapshots;
            if (changedDetailStockIds.Count > 0)
            {
                var detailAffectedLocationIds = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
                    .Where(x => changedDetailStockIds.Contains(x.Id) && x.LocationId != 0)
                    .Select(x => x.LocationId)
                    .Distinct()
                    .ToListAsync();
                foreach (var locationId in detailAffectedLocationIds)
                {
                    affectedLocationIds.Add(locationId);
                }
            }
            if (affectedLocationIds.Count == 0)
            {
                UpdateCheckTimes(checkStartedAt);
                return;
            }
            var affectedLocations = await locationRepo.Db.Queryable<Dt_LocationInfo>()
                .Where(x => affectedLocationIds.Contains(x.Id) && x.LocationStatus != 99)
                .ToListAsync();
            var locationDict = affectedLocations.ToDictionary(x => x.Id, x => x);
            var affectedStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
                .Includes(x => x.Details)
                .Where(x => affectedLocationIds.Contains(x.LocationId))
                .ToListAsync();
            var stockDict = affectedStocks
                .Where(x => x.LocationId > 0)
                .GroupBy(x => x.LocationId)
                .ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
            foreach (var locationId in affectedLocationIds)
            {
                if (!locationDict.TryGetValue(locationId, out var location))
                {
                    _lastLocationSnapshots.TryRemove(locationId, out _);
                    continue;
                }
                stockDict.TryGetValue(locationId, out var stock);
                var snapshot = BuildLocationSnapshot(location, stock);
                if (HasSnapshotChanged(locationId, snapshot))
                {
                    var update = new StockUpdateDTO
                    {
                        LocationId = snapshot.LocationId,
                        WarehouseId = snapshot.WarehouseId,
                        PalletCode = snapshot.PalletCode,
                        StockQuantity = snapshot.StockQuantity,
                        StockStatus = snapshot.StockStatus,
                        LocationStatus = snapshot.LocationStatus,
                        OutboundDate = snapshot.OutboundDate,
                        Details = BuildDetailDtos(stock?.Details?.ToList())
                    };
                    await _hubContext.Clients.All.SendAsync("StockUpdated", update);
                    _logger.LogDebug(
                        "增量库存变更推送,LocationId={LocationId},LocationStatus={LocationStatus},StockStatus={StockStatus},Quantity={Quantity}",
                        snapshot.LocationId,
                        snapshot.LocationStatus,
                        snapshot.StockStatus,
                        snapshot.OutboundDate,
                        snapshot.StockQuantity);
                }
                _lastLocationSnapshots[locationId] = snapshot;
            }
            UpdateCheckTimes(checkStartedAt);
        }
        /// <summary>
        /// 生成明细数据哈希
        /// 构建货位快照,用于判断货位是否需要推送更新。
        /// </summary>
        private LocationSnapshot BuildLocationSnapshot(Dt_LocationInfo location, Dt_StockInfo stock)
        {
            float totalQuantity = 0;
            string detailsHash = string.Empty;
            if (stock?.Details != null && stock.Details.Any())
            {
                totalQuantity = stock.Details.Sum(d => d.StockQuantity);
                detailsHash = GenerateDetailsHash(stock.Details.ToList());
            }
            return new LocationSnapshot
            {
                LocationId = location.Id,
                WarehouseId = location.WarehouseId,
                LocationCode = location.LocationCode,
                LocationStatus = location.LocationStatus,
                PalletCode = stock?.PalletCode,
                StockStatus = stock?.StockStatus ?? 0,
                StockQuantity = totalQuantity,
                DetailsHash = detailsHash,
                OutboundDate = stock?.OutboundDate ?? default
            };
        }
        /// <summary>
        /// 对比快照变化,仅在关键字段变化时触发推送。
        /// </summary>
        private bool HasSnapshotChanged(int locationId, LocationSnapshot snapshot)
        {
            if (!_lastLocationSnapshots.TryGetValue(locationId, out var lastSnapshot))
            {
                return true;
            }
            return lastSnapshot.LocationStatus != snapshot.LocationStatus ||
                   lastSnapshot.StockStatus != snapshot.StockStatus ||
                   lastSnapshot.PalletCode != snapshot.PalletCode ||
                   Math.Abs(lastSnapshot.StockQuantity - snapshot.StockQuantity) > 0.001f ||
                   lastSnapshot.DetailsHash != snapshot.DetailsHash;
        }
        /// <summary>
        /// 生成库存明细哈希,确保明细内容变化时能触发前端刷新。
        /// </summary>
        private string GenerateDetailsHash(List<Dt_StockInfoDetail> details)
        {
            if (details == null || !details.Any()) return string.Empty;
            if (details == null || !details.Any())
            {
                return string.Empty;
            }
            var hashString = string.Join("|", details
                .OrderBy(d => d.Id)
                .Select(d => $"{d.Id}:{d.MaterielCode}:{d.BatchNo}:{d.StockQuantity}"));
            var hashString = string.Join(
                "|",
                details
                    .OrderBy(d => d.Id)
                    .Select(d => $"{d.Id}:{d.MaterielCode}:{d.BatchNo}:{d.StockQuantity}:{d.SerialNumber}:{d.InboundOrderRowNo}:{d.Status}"));
            return hashString.GetHashCode().ToString();
        }
        /// <summary>
        /// 构建明细DTO列表
        /// 构建推送给前端的明细数据。
        /// </summary>
        private List<StockDetailUpdateDTO> BuildDetailDtos(List<Dt_StockInfoDetail> details)
        {
            if (details == null || !details.Any()) return new List<StockDetailUpdateDTO>();
            if (details == null || !details.Any())
            {
                return new List<StockDetailUpdateDTO>();
            }
            return details.Select(d => new StockDetailUpdateDTO
            {
@@ -177,12 +337,24 @@
                BatchNo = d.BatchNo,
                StockQuantity = d.StockQuantity,
                Unit = d.Unit,
                Status = d.Status
                Status = d.Status,
                SerialNumber = d.SerialNumber,
                InboundOrderRowNo = d.InboundOrderRowNo
            }).ToList();
        }
        /// <summary>
        /// 货位快照
        /// 更新各类增量检查时间。
        /// </summary>
        private void UpdateCheckTimes(DateTime checkStartedAt)
        {
            _lastLocationCheckTime = checkStartedAt;
            _lastStockCheckTime = checkStartedAt;
            _lastDetailCheckTime = checkStartedAt;
        }
        /// <summary>
        /// 货位快照。
        /// </summary>
        private class LocationSnapshot
        {
@@ -194,6 +366,16 @@
            public int StockStatus { get; set; }
            public float StockQuantity { get; set; }
            public string DetailsHash { get; set; }
            public DateTime OutboundDate { get; set; }
        }
        /// <summary>
        /// Locating information for a changed stock record: which stock changed and the
        /// location it currently references. Filled by the projection in the incremental check.
        /// </summary>
        private class StockLocationChange
        {
            // Id of the changed stock record (projected from Dt_StockInfo.Id).
            public int StockId { get; set; }
            // Id of the location the stock record references (projected from Dt_StockInfo.LocationId).
            public int LocationId { get; set; }
        }
    }
}