using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using WIDESEA_Core.BaseRepository; using WIDESEA_IStockService; using WIDESEA_Model.Models; using WIDESEA_WMSServer.Hubs; namespace WIDESEA_WMSServer.BackgroundServices { /// /// 库存监控后台服务。 /// 启动时初始化一次全量快照,后续仅按更新时间增量检查受影响货位,并通过 SignalR 推送变化。 /// public class StockMonitorBackgroundService : BackgroundService { private const int MonitorIntervalMs = 3000; private readonly ILogger _logger; private readonly IHubContext _hubContext; private readonly IServiceProvider _serviceProvider; /// /// 货位状态快照:key = LocationId。 /// private readonly ConcurrentDictionary _lastLocationSnapshots = new(); /// /// 库存所在货位映射:key = StockId, value = LocationId。 /// 用于识别库存从旧货位移动到新货位时,两边都需要推送刷新。 /// private readonly ConcurrentDictionary _lastStockLocationMap = new(); /// /// 上次增量检查时间戳。 /// 分别跟踪货位、库存主表、库存明细,避免每次全表扫描。 /// private DateTime _lastLocationCheckTime = DateTime.MinValue; private DateTime _lastStockCheckTime = DateTime.MinValue; private DateTime _lastDetailCheckTime = DateTime.MinValue; public StockMonitorBackgroundService( ILogger logger, IHubContext hubContext, IServiceProvider serviceProvider) { _logger = logger; _hubContext = hubContext; _serviceProvider = serviceProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("库存监控后台服务已启动"); // 等待应用初始化完成,避免启动阶段与其他初始化任务争抢数据库资源。 await Task.Delay(5000, stoppingToken); await InitializeSnapshotsAsync(); while (!stoppingToken.IsCancellationRequested) { try { await CheckChangesAsync(); } catch (Exception ex) { _logger.LogError(ex, "检查库存变更时发生异常"); } await Task.Delay(MonitorIntervalMs, stoppingToken); } _logger.LogInformation("库存监控后台服务已停止"); } /// /// 初始化全量快照。仅在服务启动时执行一次,后续走增量检查。 /// private async Task InitializeSnapshotsAsync() { using var scope = _serviceProvider.CreateScope(); var stockService = scope.ServiceProvider.GetRequiredService(); var locationRepo = scope.ServiceProvider.GetRequiredService>(); var initializedAt = DateTime.Now; var allLocations = await locationRepo.Db.Queryable() .Where(x => x.LocationStatus != 99) .ToListAsync(); var allStocks = await stockService.Repository.Db.Queryable() .Includes(x => x.Details) .Where(x => x.LocationId != 0) .ToListAsync(); var stockDict = allStocks .Where(x => x.LocationId > 0) .GroupBy(x => x.LocationId) .ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First()); foreach (var location in allLocations) { stockDict.TryGetValue(location.Id, out var stock); _lastLocationSnapshots[location.Id] = BuildLocationSnapshot(location, stock); } foreach (var stock in allStocks.Where(x => x.LocationId > 0)) { _lastStockLocationMap[stock.Id] = stock.LocationId; } _lastLocationCheckTime = initializedAt; _lastStockCheckTime = initializedAt; _lastDetailCheckTime = initializedAt; _logger.LogInformation("库存监控快照初始化完成,货位数={LocationCount},库存数={StockCount}", allLocations.Count, allStocks.Count); } /// /// 增量检查货位和库存变化。 /// 只查询上次检查之后发生变化的货位、库存和明细,再回查受影响货位的当前完整数据。 /// private async Task CheckChangesAsync() { using var scope = _serviceProvider.CreateScope(); var stockService = scope.ServiceProvider.GetRequiredService(); var locationRepo = scope.ServiceProvider.GetRequiredService>(); var checkStartedAt = DateTime.Now; var changedLocationIds = await locationRepo.Db.Queryable() .Where(x => x.LocationStatus != 99 && (x.ModifyDate ?? x.CreateDate) > _lastLocationCheckTime) .Select(x => x.Id) .ToListAsync(); var changedStocks = await stockService.Repository.Db.Queryable() .Where(x => (x.ModifyDate ?? x.CreateDate) > _lastStockCheckTime) .Select(x => new StockLocationChange { StockId = x.Id, LocationId = x.LocationId }) .ToListAsync(); var changedDetailStockIds = await stockService.Repository.Db.Queryable() .Where(x => (x.ModifyDate ?? x.CreateDate) > _lastDetailCheckTime) .Select(x => x.StockId) .Distinct() .ToListAsync(); var affectedLocationIds = new HashSet(changedLocationIds); foreach (var stock in changedStocks) { var previousLocationId = _lastStockLocationMap.TryGetValue(stock.StockId, out var oldLocationId) ? oldLocationId : 0; if (previousLocationId > 0 && previousLocationId != stock.LocationId) { affectedLocationIds.Add(previousLocationId); } if (stock.LocationId > 0) { affectedLocationIds.Add(stock.LocationId); _lastStockLocationMap[stock.StockId] = stock.LocationId; } else { _lastStockLocationMap.TryRemove(stock.StockId, out _); } } if (changedDetailStockIds.Count > 0) { var detailAffectedLocationIds = await stockService.Repository.Db.Queryable() .Where(x => changedDetailStockIds.Contains(x.Id) && x.LocationId != 0) .Select(x => x.LocationId) .Distinct() .ToListAsync(); foreach (var locationId in detailAffectedLocationIds) { affectedLocationIds.Add(locationId); } } if (affectedLocationIds.Count == 0) { UpdateCheckTimes(checkStartedAt); return; } var affectedLocations = await locationRepo.Db.Queryable() .Where(x => affectedLocationIds.Contains(x.Id) && x.LocationStatus != 99) .ToListAsync(); var locationDict = affectedLocations.ToDictionary(x => x.Id, x => x); var affectedStocks = await stockService.Repository.Db.Queryable() .Includes(x => x.Details) .Where(x => affectedLocationIds.Contains(x.LocationId)) .ToListAsync(); var stockDict = affectedStocks .Where(x => x.LocationId > 0) .GroupBy(x => x.LocationId) .ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First()); foreach (var locationId in affectedLocationIds) { if (!locationDict.TryGetValue(locationId, out var location)) { _lastLocationSnapshots.TryRemove(locationId, out _); continue; } stockDict.TryGetValue(locationId, out var stock); var snapshot = BuildLocationSnapshot(location, stock); if (HasSnapshotChanged(locationId, snapshot)) { var update = new StockUpdateDTO { LocationId = snapshot.LocationId, WarehouseId = snapshot.WarehouseId, PalletCode = snapshot.PalletCode, StockQuantity = snapshot.StockQuantity, StockStatus = snapshot.StockStatus, LocationStatus = snapshot.LocationStatus, OutboundDate = snapshot.OutboundDate, Details = BuildDetailDtos(stock?.Details?.ToList()) }; await _hubContext.Clients.All.SendAsync("StockUpdated", update); _logger.LogDebug( "增量库存变更推送,LocationId={LocationId},LocationStatus={LocationStatus},StockStatus={StockStatus},Quantity={Quantity}", snapshot.LocationId, snapshot.LocationStatus, snapshot.StockStatus, snapshot.OutboundDate, snapshot.StockQuantity); } _lastLocationSnapshots[locationId] = snapshot; } UpdateCheckTimes(checkStartedAt); } /// /// 构建货位快照,用于判断货位是否需要推送更新。 /// private LocationSnapshot BuildLocationSnapshot(Dt_LocationInfo location, Dt_StockInfo stock) { float totalQuantity = 0; string detailsHash = string.Empty; if (stock?.Details != null && stock.Details.Any()) { totalQuantity = stock.Details.Sum(d => d.StockQuantity); detailsHash = GenerateDetailsHash(stock.Details.ToList()); } return new LocationSnapshot { LocationId = location.Id, WarehouseId = location.WarehouseId, LocationCode = location.LocationCode, LocationStatus = location.LocationStatus, PalletCode = stock?.PalletCode, StockStatus = stock?.StockStatus ?? 0, StockQuantity = totalQuantity, DetailsHash = detailsHash, OutboundDate = stock?.OutboundDate ?? default }; } /// /// 对比快照变化,仅在关键字段变化时触发推送。 /// private bool HasSnapshotChanged(int locationId, LocationSnapshot snapshot) { if (!_lastLocationSnapshots.TryGetValue(locationId, out var lastSnapshot)) { return true; } return lastSnapshot.LocationStatus != snapshot.LocationStatus || lastSnapshot.StockStatus != snapshot.StockStatus || lastSnapshot.PalletCode != snapshot.PalletCode || Math.Abs(lastSnapshot.StockQuantity - snapshot.StockQuantity) > 0.001f || lastSnapshot.DetailsHash != snapshot.DetailsHash; } /// /// 生成库存明细哈希,确保明细内容变化时能触发前端刷新。 /// private string GenerateDetailsHash(List details) { if (details == null || !details.Any()) { return string.Empty; } var hashString = string.Join( "|", details .OrderBy(d => d.Id) .Select(d => $"{d.Id}:{d.MaterielCode}:{d.BatchNo}:{d.StockQuantity}:{d.SerialNumber}:{d.InboundOrderRowNo}:{d.Status}")); return hashString.GetHashCode().ToString(); } /// /// 构建推送给前端的明细数据。 /// private List BuildDetailDtos(List details) { if (details == null || !details.Any()) { return new List(); } return details.Select(d => new StockDetailUpdateDTO { Id = d.Id, MaterielCode = d.MaterielCode, MaterielName = d.MaterielName, BatchNo = d.BatchNo, StockQuantity = d.StockQuantity, Unit = d.Unit, Status = d.Status, SerialNumber = d.SerialNumber, InboundOrderRowNo = d.InboundOrderRowNo }).ToList(); } /// /// 更新各类增量检查时间。 /// private void UpdateCheckTimes(DateTime checkStartedAt) { _lastLocationCheckTime = checkStartedAt; _lastStockCheckTime = checkStartedAt; _lastDetailCheckTime = checkStartedAt; } /// /// 货位快照。 /// private class LocationSnapshot { public int LocationId { get; set; } public int WarehouseId { get; set; } public string LocationCode { get; set; } public int LocationStatus { get; set; } public string PalletCode { get; set; } public int StockStatus { get; set; } public float StockQuantity { get; set; } public string DetailsHash { get; set; } public DateTime OutboundDate { get; set; } } /// /// 库存变更定位信息。 /// private class StockLocationChange { public int StockId { get; set; } public int LocationId { get; set; } } } }