using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using WIDESEA_Core.BaseRepository;
using WIDESEA_IStockService;
using WIDESEA_Model.Models;
using WIDESEA_WMSServer.Hubs;
namespace WIDESEA_WMSServer.BackgroundServices
{
///
/// 库存监控后台服务。
/// 启动时初始化一次全量快照,后续仅按更新时间增量检查受影响货位,并通过 SignalR 推送变化。
///
public class StockMonitorBackgroundService : BackgroundService
{
private const int MonitorIntervalMs = 3000;
private readonly ILogger _logger;
private readonly IHubContext _hubContext;
private readonly IServiceProvider _serviceProvider;
///
/// 货位状态快照:key = LocationId。
///
private readonly ConcurrentDictionary _lastLocationSnapshots = new();
///
/// 库存所在货位映射:key = StockId, value = LocationId。
/// 用于识别库存从旧货位移动到新货位时,两边都需要推送刷新。
///
private readonly ConcurrentDictionary _lastStockLocationMap = new();
///
/// 上次增量检查时间戳。
/// 分别跟踪货位、库存主表、库存明细,避免每次全表扫描。
///
private DateTime _lastLocationCheckTime = DateTime.MinValue;
private DateTime _lastStockCheckTime = DateTime.MinValue;
private DateTime _lastDetailCheckTime = DateTime.MinValue;
public StockMonitorBackgroundService(
ILogger logger,
IHubContext hubContext,
IServiceProvider serviceProvider)
{
_logger = logger;
_hubContext = hubContext;
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("库存监控后台服务已启动");
// 等待应用初始化完成,避免启动阶段与其他初始化任务争抢数据库资源。
await Task.Delay(5000, stoppingToken);
await InitializeSnapshotsAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
await CheckChangesAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "检查库存变更时发生异常");
}
await Task.Delay(MonitorIntervalMs, stoppingToken);
}
_logger.LogInformation("库存监控后台服务已停止");
}
///
/// 初始化全量快照。仅在服务启动时执行一次,后续走增量检查。
///
private async Task InitializeSnapshotsAsync()
{
using var scope = _serviceProvider.CreateScope();
var stockService = scope.ServiceProvider.GetRequiredService();
var locationRepo = scope.ServiceProvider.GetRequiredService>();
var initializedAt = DateTime.Now;
var allLocations = await locationRepo.Db.Queryable()
.Where(x => x.LocationStatus != 99)
.ToListAsync();
var allStocks = await stockService.Repository.Db.Queryable()
.Includes(x => x.Details)
.Where(x => x.LocationId != 0)
.ToListAsync();
var stockDict = allStocks
.Where(x => x.LocationId > 0)
.GroupBy(x => x.LocationId)
.ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
foreach (var location in allLocations)
{
stockDict.TryGetValue(location.Id, out var stock);
_lastLocationSnapshots[location.Id] = BuildLocationSnapshot(location, stock);
}
foreach (var stock in allStocks.Where(x => x.LocationId > 0))
{
_lastStockLocationMap[stock.Id] = stock.LocationId;
}
_lastLocationCheckTime = initializedAt;
_lastStockCheckTime = initializedAt;
_lastDetailCheckTime = initializedAt;
_logger.LogInformation("库存监控快照初始化完成,货位数={LocationCount},库存数={StockCount}", allLocations.Count, allStocks.Count);
}
///
/// 增量检查货位和库存变化。
/// 只查询上次检查之后发生变化的货位、库存和明细,再回查受影响货位的当前完整数据。
///
private async Task CheckChangesAsync()
{
using var scope = _serviceProvider.CreateScope();
var stockService = scope.ServiceProvider.GetRequiredService();
var locationRepo = scope.ServiceProvider.GetRequiredService>();
var checkStartedAt = DateTime.Now;
var changedLocationIds = await locationRepo.Db.Queryable()
.Where(x => x.LocationStatus != 99 && (x.ModifyDate ?? x.CreateDate) > _lastLocationCheckTime)
.Select(x => x.Id)
.ToListAsync();
var changedStocks = await stockService.Repository.Db.Queryable()
.Where(x => (x.ModifyDate ?? x.CreateDate) > _lastStockCheckTime)
.Select(x => new StockLocationChange
{
StockId = x.Id,
LocationId = x.LocationId
})
.ToListAsync();
var changedDetailStockIds = await stockService.Repository.Db.Queryable()
.Where(x => (x.ModifyDate ?? x.CreateDate) > _lastDetailCheckTime)
.Select(x => x.StockId)
.Distinct()
.ToListAsync();
var affectedLocationIds = new HashSet(changedLocationIds);
foreach (var stock in changedStocks)
{
var previousLocationId = _lastStockLocationMap.TryGetValue(stock.StockId, out var oldLocationId)
? oldLocationId
: 0;
if (previousLocationId > 0 && previousLocationId != stock.LocationId)
{
affectedLocationIds.Add(previousLocationId);
}
if (stock.LocationId > 0)
{
affectedLocationIds.Add(stock.LocationId);
_lastStockLocationMap[stock.StockId] = stock.LocationId;
}
else
{
_lastStockLocationMap.TryRemove(stock.StockId, out _);
}
}
if (changedDetailStockIds.Count > 0)
{
var detailAffectedLocationIds = await stockService.Repository.Db.Queryable()
.Where(x => changedDetailStockIds.Contains(x.Id) && x.LocationId != 0)
.Select(x => x.LocationId)
.Distinct()
.ToListAsync();
foreach (var locationId in detailAffectedLocationIds)
{
affectedLocationIds.Add(locationId);
}
}
if (affectedLocationIds.Count == 0)
{
UpdateCheckTimes(checkStartedAt);
return;
}
var affectedLocations = await locationRepo.Db.Queryable()
.Where(x => affectedLocationIds.Contains(x.Id) && x.LocationStatus != 99)
.ToListAsync();
var locationDict = affectedLocations.ToDictionary(x => x.Id, x => x);
var affectedStocks = await stockService.Repository.Db.Queryable()
.Includes(x => x.Details)
.Where(x => affectedLocationIds.Contains(x.LocationId))
.ToListAsync();
var stockDict = affectedStocks
.Where(x => x.LocationId > 0)
.GroupBy(x => x.LocationId)
.ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
foreach (var locationId in affectedLocationIds)
{
if (!locationDict.TryGetValue(locationId, out var location))
{
_lastLocationSnapshots.TryRemove(locationId, out _);
continue;
}
stockDict.TryGetValue(locationId, out var stock);
var snapshot = BuildLocationSnapshot(location, stock);
if (HasSnapshotChanged(locationId, snapshot))
{
var update = new StockUpdateDTO
{
LocationId = snapshot.LocationId,
WarehouseId = snapshot.WarehouseId,
PalletCode = snapshot.PalletCode,
StockQuantity = snapshot.StockQuantity,
StockStatus = snapshot.StockStatus,
LocationStatus = snapshot.LocationStatus,
OutboundDate = snapshot.OutboundDate,
Details = BuildDetailDtos(stock?.Details?.ToList())
};
await _hubContext.Clients.All.SendAsync("StockUpdated", update);
_logger.LogDebug(
"增量库存变更推送,LocationId={LocationId},LocationStatus={LocationStatus},StockStatus={StockStatus},Quantity={Quantity}",
snapshot.LocationId,
snapshot.LocationStatus,
snapshot.StockStatus,
snapshot.OutboundDate,
snapshot.StockQuantity);
}
_lastLocationSnapshots[locationId] = snapshot;
}
UpdateCheckTimes(checkStartedAt);
}
///
/// 构建货位快照,用于判断货位是否需要推送更新。
///
private LocationSnapshot BuildLocationSnapshot(Dt_LocationInfo location, Dt_StockInfo stock)
{
float totalQuantity = 0;
string detailsHash = string.Empty;
if (stock?.Details != null && stock.Details.Any())
{
totalQuantity = stock.Details.Sum(d => d.StockQuantity);
detailsHash = GenerateDetailsHash(stock.Details.ToList());
}
return new LocationSnapshot
{
LocationId = location.Id,
WarehouseId = location.WarehouseId,
LocationCode = location.LocationCode,
LocationStatus = location.LocationStatus,
PalletCode = stock?.PalletCode,
StockStatus = stock?.StockStatus ?? 0,
StockQuantity = totalQuantity,
DetailsHash = detailsHash,
OutboundDate = stock?.OutboundDate ?? default
};
}
///
/// 对比快照变化,仅在关键字段变化时触发推送。
///
private bool HasSnapshotChanged(int locationId, LocationSnapshot snapshot)
{
if (!_lastLocationSnapshots.TryGetValue(locationId, out var lastSnapshot))
{
return true;
}
return lastSnapshot.LocationStatus != snapshot.LocationStatus ||
lastSnapshot.StockStatus != snapshot.StockStatus ||
lastSnapshot.PalletCode != snapshot.PalletCode ||
Math.Abs(lastSnapshot.StockQuantity - snapshot.StockQuantity) > 0.001f ||
lastSnapshot.DetailsHash != snapshot.DetailsHash;
}
///
/// 生成库存明细哈希,确保明细内容变化时能触发前端刷新。
///
private string GenerateDetailsHash(List details)
{
if (details == null || !details.Any())
{
return string.Empty;
}
var hashString = string.Join(
"|",
details
.OrderBy(d => d.Id)
.Select(d => $"{d.Id}:{d.MaterielCode}:{d.BatchNo}:{d.StockQuantity}:{d.SerialNumber}:{d.InboundOrderRowNo}:{d.Status}"));
return hashString.GetHashCode().ToString();
}
///
/// 构建推送给前端的明细数据。
///
private List BuildDetailDtos(List details)
{
if (details == null || !details.Any())
{
return new List();
}
return details.Select(d => new StockDetailUpdateDTO
{
Id = d.Id,
MaterielCode = d.MaterielCode,
MaterielName = d.MaterielName,
BatchNo = d.BatchNo,
StockQuantity = d.StockQuantity,
Unit = d.Unit,
Status = d.Status,
SerialNumber = d.SerialNumber,
InboundOrderRowNo = d.InboundOrderRowNo
}).ToList();
}
///
/// 更新各类增量检查时间。
///
private void UpdateCheckTimes(DateTime checkStartedAt)
{
_lastLocationCheckTime = checkStartedAt;
_lastStockCheckTime = checkStartedAt;
_lastDetailCheckTime = checkStartedAt;
}
///
/// 货位快照。
///
private class LocationSnapshot
{
public int LocationId { get; set; }
public int WarehouseId { get; set; }
public string LocationCode { get; set; }
public int LocationStatus { get; set; }
public string PalletCode { get; set; }
public int StockStatus { get; set; }
public float StockQuantity { get; set; }
public string DetailsHash { get; set; }
public DateTime OutboundDate { get; set; }
}
///
/// 库存变更定位信息。
///
private class StockLocationChange
{
public int StockId { get; set; }
public int LocationId { get; set; }
}
}
}