using Microsoft.AspNetCore.SignalR;
|
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Logging;
|
using System;
|
using System.Collections.Concurrent;
|
using System.Collections.Generic;
|
using System.Linq;
|
using System.Threading;
|
using System.Threading.Tasks;
|
using WIDESEA_Core.BaseRepository;
|
using WIDESEA_IStockService;
|
using WIDESEA_Model.Models;
|
using WIDESEA_WMSServer.Hubs;
|
|
namespace WIDESEA_WMSServer.BackgroundServices
|
{
|
/// <summary>
|
/// 库存监控后台服务。
|
/// 启动时初始化一次全量快照,后续仅按更新时间增量检查受影响货位,并通过 SignalR 推送变化。
|
/// </summary>
|
public class StockMonitorBackgroundService : BackgroundService
|
{
|
private const int MonitorIntervalMs = 3000;
|
|
private readonly ILogger<StockMonitorBackgroundService> _logger;
|
private readonly IHubContext<StockHub> _hubContext;
|
private readonly IServiceProvider _serviceProvider;
|
|
/// <summary>
|
/// 货位状态快照:key = LocationId。
|
/// </summary>
|
private readonly ConcurrentDictionary<int, LocationSnapshot> _lastLocationSnapshots = new();
|
|
/// <summary>
|
/// 库存所在货位映射:key = StockId, value = LocationId。
|
/// 用于识别库存从旧货位移动到新货位时,两边都需要推送刷新。
|
/// </summary>
|
private readonly ConcurrentDictionary<int, int> _lastStockLocationMap = new();
|
|
/// <summary>
|
/// 上次增量检查时间戳。
|
/// 分别跟踪货位、库存主表、库存明细,避免每次全表扫描。
|
/// </summary>
|
private DateTime _lastLocationCheckTime = DateTime.MinValue;
|
private DateTime _lastStockCheckTime = DateTime.MinValue;
|
private DateTime _lastDetailCheckTime = DateTime.MinValue;
|
|
public StockMonitorBackgroundService(
|
ILogger<StockMonitorBackgroundService> logger,
|
IHubContext<StockHub> hubContext,
|
IServiceProvider serviceProvider)
|
{
|
_logger = logger;
|
_hubContext = hubContext;
|
_serviceProvider = serviceProvider;
|
}
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
{
|
_logger.LogInformation("库存监控后台服务已启动");
|
|
// 等待应用初始化完成,避免启动阶段与其他初始化任务争抢数据库资源。
|
await Task.Delay(5000, stoppingToken);
|
await InitializeSnapshotsAsync();
|
|
while (!stoppingToken.IsCancellationRequested)
|
{
|
try
|
{
|
await CheckChangesAsync();
|
}
|
catch (Exception ex)
|
{
|
_logger.LogError(ex, "检查库存变更时发生异常");
|
}
|
|
await Task.Delay(MonitorIntervalMs, stoppingToken);
|
}
|
|
_logger.LogInformation("库存监控后台服务已停止");
|
}
|
|
/// <summary>
|
/// 初始化全量快照。仅在服务启动时执行一次,后续走增量检查。
|
/// </summary>
|
private async Task InitializeSnapshotsAsync()
|
{
|
using var scope = _serviceProvider.CreateScope();
|
var stockService = scope.ServiceProvider.GetRequiredService<IStockInfoService>();
|
var locationRepo = scope.ServiceProvider.GetRequiredService<IRepository<Dt_LocationInfo>>();
|
var initializedAt = DateTime.Now;
|
|
var allLocations = await locationRepo.Db.Queryable<Dt_LocationInfo>()
|
.Where(x => x.LocationStatus != 99)
|
.ToListAsync();
|
|
var allStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
|
.Includes(x => x.Details)
|
.Where(x => x.LocationId != 0)
|
.ToListAsync();
|
|
var stockDict = allStocks
|
.Where(x => x.LocationId > 0)
|
.GroupBy(x => x.LocationId)
|
.ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
|
|
foreach (var location in allLocations)
|
{
|
stockDict.TryGetValue(location.Id, out var stock);
|
_lastLocationSnapshots[location.Id] = BuildLocationSnapshot(location, stock);
|
}
|
|
foreach (var stock in allStocks.Where(x => x.LocationId > 0))
|
{
|
_lastStockLocationMap[stock.Id] = stock.LocationId;
|
}
|
|
_lastLocationCheckTime = initializedAt;
|
_lastStockCheckTime = initializedAt;
|
_lastDetailCheckTime = initializedAt;
|
|
_logger.LogInformation("库存监控快照初始化完成,货位数={LocationCount},库存数={StockCount}", allLocations.Count, allStocks.Count);
|
}
|
|
/// <summary>
|
/// 增量检查货位和库存变化。
|
/// 只查询上次检查之后发生变化的货位、库存和明细,再回查受影响货位的当前完整数据。
|
/// </summary>
|
private async Task CheckChangesAsync()
|
{
|
using var scope = _serviceProvider.CreateScope();
|
var stockService = scope.ServiceProvider.GetRequiredService<IStockInfoService>();
|
var locationRepo = scope.ServiceProvider.GetRequiredService<IRepository<Dt_LocationInfo>>();
|
var checkStartedAt = DateTime.Now;
|
|
var changedLocationIds = await locationRepo.Db.Queryable<Dt_LocationInfo>()
|
.Where(x => x.LocationStatus != 99 && (x.ModifyDate ?? x.CreateDate) > _lastLocationCheckTime)
|
.Select(x => x.Id)
|
.ToListAsync();
|
|
var changedStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
|
.Where(x => (x.ModifyDate ?? x.CreateDate) > _lastStockCheckTime)
|
.Select(x => new StockLocationChange
|
{
|
StockId = x.Id,
|
LocationId = x.LocationId
|
})
|
.ToListAsync();
|
|
var changedDetailStockIds = await stockService.Repository.Db.Queryable<Dt_StockInfoDetail>()
|
.Where(x => (x.ModifyDate ?? x.CreateDate) > _lastDetailCheckTime)
|
.Select(x => x.StockId)
|
.Distinct()
|
.ToListAsync();
|
|
var affectedLocationIds = new HashSet<int>(changedLocationIds);
|
|
foreach (var stock in changedStocks)
|
{
|
var previousLocationId = _lastStockLocationMap.TryGetValue(stock.StockId, out var oldLocationId)
|
? oldLocationId
|
: 0;
|
|
if (previousLocationId > 0 && previousLocationId != stock.LocationId)
|
{
|
affectedLocationIds.Add(previousLocationId);
|
}
|
|
if (stock.LocationId > 0)
|
{
|
affectedLocationIds.Add(stock.LocationId);
|
_lastStockLocationMap[stock.StockId] = stock.LocationId;
|
}
|
else
|
{
|
_lastStockLocationMap.TryRemove(stock.StockId, out _);
|
}
|
}
|
|
if (changedDetailStockIds.Count > 0)
|
{
|
var detailAffectedLocationIds = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
|
.Where(x => changedDetailStockIds.Contains(x.Id) && x.LocationId != 0)
|
.Select(x => x.LocationId)
|
.Distinct()
|
.ToListAsync();
|
|
foreach (var locationId in detailAffectedLocationIds)
|
{
|
affectedLocationIds.Add(locationId);
|
}
|
}
|
|
if (affectedLocationIds.Count == 0)
|
{
|
UpdateCheckTimes(checkStartedAt);
|
return;
|
}
|
|
var affectedLocations = await locationRepo.Db.Queryable<Dt_LocationInfo>()
|
.Where(x => affectedLocationIds.Contains(x.Id) && x.LocationStatus != 99)
|
.ToListAsync();
|
|
var locationDict = affectedLocations.ToDictionary(x => x.Id, x => x);
|
|
var affectedStocks = await stockService.Repository.Db.Queryable<Dt_StockInfo>()
|
.Includes(x => x.Details)
|
.Where(x => affectedLocationIds.Contains(x.LocationId))
|
.ToListAsync();
|
|
var stockDict = affectedStocks
|
.Where(x => x.LocationId > 0)
|
.GroupBy(x => x.LocationId)
|
.ToDictionary(x => x.Key, x => x.OrderByDescending(item => item.ModifyDate ?? item.CreateDate).First());
|
|
foreach (var locationId in affectedLocationIds)
|
{
|
if (!locationDict.TryGetValue(locationId, out var location))
|
{
|
_lastLocationSnapshots.TryRemove(locationId, out _);
|
continue;
|
}
|
|
stockDict.TryGetValue(locationId, out var stock);
|
var snapshot = BuildLocationSnapshot(location, stock);
|
|
if (HasSnapshotChanged(locationId, snapshot))
|
{
|
var update = new StockUpdateDTO
|
{
|
LocationId = snapshot.LocationId,
|
WarehouseId = snapshot.WarehouseId,
|
PalletCode = snapshot.PalletCode,
|
StockQuantity = snapshot.StockQuantity,
|
StockStatus = snapshot.StockStatus,
|
LocationStatus = snapshot.LocationStatus,
|
OutboundDate = snapshot.OutboundDate,
|
Details = BuildDetailDtos(stock?.Details?.ToList())
|
};
|
|
await _hubContext.Clients.All.SendAsync("StockUpdated", update);
|
_logger.LogDebug(
|
"增量库存变更推送,LocationId={LocationId},LocationStatus={LocationStatus},StockStatus={StockStatus},Quantity={Quantity}",
|
snapshot.LocationId,
|
snapshot.LocationStatus,
|
snapshot.StockStatus,
|
snapshot.OutboundDate,
|
snapshot.StockQuantity);
|
}
|
|
_lastLocationSnapshots[locationId] = snapshot;
|
}
|
|
UpdateCheckTimes(checkStartedAt);
|
}
|
|
/// <summary>
|
/// 构建货位快照,用于判断货位是否需要推送更新。
|
/// </summary>
|
private LocationSnapshot BuildLocationSnapshot(Dt_LocationInfo location, Dt_StockInfo stock)
|
{
|
float totalQuantity = 0;
|
string detailsHash = string.Empty;
|
|
if (stock?.Details != null && stock.Details.Any())
|
{
|
totalQuantity = stock.Details.Sum(d => d.StockQuantity);
|
detailsHash = GenerateDetailsHash(stock.Details.ToList());
|
}
|
|
return new LocationSnapshot
|
{
|
LocationId = location.Id,
|
WarehouseId = location.WarehouseId,
|
LocationCode = location.LocationCode,
|
LocationStatus = location.LocationStatus,
|
PalletCode = stock?.PalletCode,
|
StockStatus = stock?.StockStatus ?? 0,
|
StockQuantity = totalQuantity,
|
DetailsHash = detailsHash,
|
OutboundDate = stock?.OutboundDate ?? default
|
};
|
}
|
|
/// <summary>
|
/// 对比快照变化,仅在关键字段变化时触发推送。
|
/// </summary>
|
private bool HasSnapshotChanged(int locationId, LocationSnapshot snapshot)
|
{
|
if (!_lastLocationSnapshots.TryGetValue(locationId, out var lastSnapshot))
|
{
|
return true;
|
}
|
|
return lastSnapshot.LocationStatus != snapshot.LocationStatus ||
|
lastSnapshot.StockStatus != snapshot.StockStatus ||
|
lastSnapshot.PalletCode != snapshot.PalletCode ||
|
Math.Abs(lastSnapshot.StockQuantity - snapshot.StockQuantity) > 0.001f ||
|
lastSnapshot.DetailsHash != snapshot.DetailsHash;
|
}
|
|
/// <summary>
|
/// 生成库存明细哈希,确保明细内容变化时能触发前端刷新。
|
/// </summary>
|
private string GenerateDetailsHash(List<Dt_StockInfoDetail> details)
|
{
|
if (details == null || !details.Any())
|
{
|
return string.Empty;
|
}
|
|
var hashString = string.Join(
|
"|",
|
details
|
.OrderBy(d => d.Id)
|
.Select(d => $"{d.Id}:{d.MaterielCode}:{d.BatchNo}:{d.StockQuantity}:{d.SerialNumber}:{d.InboundOrderRowNo}:{d.Status}"));
|
|
return hashString.GetHashCode().ToString();
|
}
|
|
/// <summary>
|
/// 构建推送给前端的明细数据。
|
/// </summary>
|
private List<StockDetailUpdateDTO> BuildDetailDtos(List<Dt_StockInfoDetail> details)
|
{
|
if (details == null || !details.Any())
|
{
|
return new List<StockDetailUpdateDTO>();
|
}
|
|
return details.Select(d => new StockDetailUpdateDTO
|
{
|
Id = d.Id,
|
MaterielCode = d.MaterielCode,
|
MaterielName = d.MaterielName,
|
BatchNo = d.BatchNo,
|
StockQuantity = d.StockQuantity,
|
Unit = d.Unit,
|
Status = d.Status,
|
SerialNumber = d.SerialNumber,
|
InboundOrderRowNo = d.InboundOrderRowNo
|
}).ToList();
|
}
|
|
/// <summary>
|
/// 更新各类增量检查时间。
|
/// </summary>
|
private void UpdateCheckTimes(DateTime checkStartedAt)
|
{
|
_lastLocationCheckTime = checkStartedAt;
|
_lastStockCheckTime = checkStartedAt;
|
_lastDetailCheckTime = checkStartedAt;
|
}
|
|
/// <summary>
|
/// 货位快照。
|
/// </summary>
|
private class LocationSnapshot
|
{
|
public int LocationId { get; set; }
|
public int WarehouseId { get; set; }
|
public string LocationCode { get; set; }
|
public int LocationStatus { get; set; }
|
public string PalletCode { get; set; }
|
public int StockStatus { get; set; }
|
public float StockQuantity { get; set; }
|
public string DetailsHash { get; set; }
|
public DateTime OutboundDate { get; set; }
|
}
|
|
/// <summary>
|
/// 库存变更定位信息。
|
/// </summary>
|
private class StockLocationChange
|
{
|
public int StockId { get; set; }
|
public int LocationId { get; set; }
|
}
|
}
|
}
|