using Microsoft.Extensions.Caching.Memory;
|
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Options;
|
using StackExchange.Redis;
|
using System.Collections.Concurrent;
|
using WIDESEAWCS_RedisService.Connection;
|
using WIDESEAWCS_RedisService.Options;
|
using WIDESEAWCS_RedisService.Serialization;
|
|
namespace WIDESEAWCS_RedisService.Cache
|
{
|
/// <summary>
/// Background service that periodically copies Redis string values into the
/// in-process memory cache (L1) so both layers stay consistent.
/// </summary>
/// <remarks>
/// Each pass scans Redis for keys starting with the configured key prefix, writes their
/// values into <see cref="IMemoryCache"/> under the same key, and evicts memory entries
/// for keys that this service previously synchronized but that no longer exist in Redis.
/// <see cref="IMemoryCache"/> cannot be enumerated, so only keys this service has
/// written itself are eligible for eviction.
/// </remarks>
public class CacheSyncBackgroundService : BackgroundService
{
    /// <summary>Pause between checks while waiting for the Redis connection to come up.</summary>
    private static readonly TimeSpan ConnectionRetryDelay = TimeSpan.FromSeconds(5);

    /// <summary>Memory-cache lifetime applied to keys that carry no TTL in Redis.</summary>
    private static readonly TimeSpan DefaultEntryLifetime = TimeSpan.FromMinutes(5);

    private readonly IRedisConnectionManager _connectionManager;
    private readonly IMemoryCache _memoryCache;
    private readonly IRedisSerializer _serializer;
    private readonly RedisOptions _options;
    private readonly ILogger<CacheSyncBackgroundService> _logger;

    // Keys this service has copied into the memory cache; the value is unused (set semantics).
    private readonly ConcurrentDictionary<string, bool> _trackedKeys;

    /// <summary>
    /// Creates the synchronization service.
    /// </summary>
    /// <param name="connectionManager">Provides the Redis database/server handles and connection state.</param>
    /// <param name="memoryCache">The L1 cache that receives the Redis values.</param>
    /// <param name="serializer">Serializer dependency (stored, not used by the synchronization itself).</param>
    /// <param name="options">Redis options; the value is read once at construction.</param>
    /// <param name="logger">Logger for progress and failures.</param>
    public CacheSyncBackgroundService(
        IRedisConnectionManager connectionManager,
        IMemoryCache memoryCache,
        IRedisSerializer serializer,
        IOptions<RedisOptions> options,
        ILogger<CacheSyncBackgroundService> logger)
    {
        _connectionManager = connectionManager;
        _memoryCache = memoryCache;
        _serializer = serializer;
        _options = options.Value;
        _logger = logger;
        _trackedKeys = new ConcurrentDictionary<string, bool>();
    }

    /// <summary>
    /// Runs the synchronization loop: waits for Redis, performs one full pass, then repeats
    /// the pass every configured interval until <paramref name="stoppingToken"/> is cancelled.
    /// Does nothing when the L1 cache or automatic synchronization is disabled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host on shutdown.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Without an L1 cache there is nothing to synchronize into.
        if (!_options.EnableL1Cache)
        {
            _logger.LogInformation("L1缓存未启用,后台同步服务不会运行");
            return;
        }

        // Automatic synchronization is switched off explicitly.
        if (!_options.EnableAutoSync)
        {
            _logger.LogInformation("Redis自动同步未启用,后台同步服务不会运行");
            return;
        }

        _logger.LogInformation("Redis缓存同步服务已启动,同步间隔: {Interval}秒", _options.SyncIntervalSeconds);

        // A zero interval would spin the loop and a negative one makes Task.Delay throw on
        // every iteration, so never wait less than one second between passes.
        var syncInterval = TimeSpan.FromSeconds(Math.Max(1, _options.SyncIntervalSeconds));

        try
        {
            // Hand control back to the host before doing real work so that application
            // startup is not held up by the first synchronization pass.
            await Task.Yield();

            // Wait until the Redis connection is ready.
            while (!_connectionManager.IsConnected)
            {
                _logger.LogWarning("等待Redis连接就绪...");
                await Task.Delay(ConnectionRetryDelay, stoppingToken);
            }

            stoppingToken.ThrowIfCancellationRequested();

            // One full pass right after startup.
            await SyncCacheAsync(stoppingToken);

            // Periodic passes.
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(syncInterval, stoppingToken);

                    if (_connectionManager.IsConnected)
                    {
                        await SyncCacheAsync(stoppingToken);
                    }
                    else
                    {
                        _logger.LogWarning("Redis未连接,跳过本次同步");
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Host shutdown: leave the loop.
                    break;
                }
                catch (Exception ex)
                {
                    // Anything else (including a cancellation unrelated to shutdown) is
                    // logged and the loop keeps running.
                    _logger.LogError(ex, "缓存同步过程中发生错误");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested while waiting for Redis or during the first pass.
        }

        _logger.LogInformation("Redis缓存同步服务已停止");
    }

    /// <summary>
    /// Performs a single synchronization pass from Redis into the memory cache.
    /// </summary>
    /// <param name="stoppingToken">Cancels the key scan and stops the per-key loop.</param>
    /// <remarks>
    /// Failures are logged and swallowed so that one bad pass does not stop the service;
    /// only a cancellation caused by <paramref name="stoppingToken"/> is rethrown.
    /// </remarks>
    private async Task SyncCacheAsync(CancellationToken stoppingToken)
    {
        try
        {
            var db = _connectionManager.GetDatabase();
            var server = _connectionManager.GetServer();
            var keyPrefix = _options.KeyPrefix;

            // Collect every Redis key with the configured prefix. A set removes duplicates
            // and gives O(1) membership tests for the eviction step below.
            var redisKeys = new HashSet<string>(StringComparer.Ordinal);
            await foreach (var redisKey in server
                .KeysAsync(pattern: $"{keyPrefix}*", pageSize: _options.SyncBatchSize)
                .WithCancellation(stoppingToken))
            {
                redisKeys.Add(redisKey.ToString());
            }

            if (redisKeys.Count == 0)
            {
                // Deliberately no early return: entries synchronized earlier must still
                // be evicted when Redis no longer holds any matching key.
                _logger.LogDebug("Redis中没有找到匹配前缀 {Prefix} 的key", keyPrefix);
            }

            int syncedCount = 0;
            int skippedCount = 0;
            int removedCount = 0;

            // 1. Copy the Redis values into the memory cache.
            foreach (var keyStr in redisKeys)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                try
                {
                    // Skip excluded prefixes (e.g. frequently changing data such as device state).
                    if (IsExcluded(keyStr))
                    {
                        _logger.LogTrace("跳过排除的key: {Key}", keyStr);
                        skippedCount++;
                        continue;
                    }

                    _trackedKeys[keyStr] = true;

                    // Value and remaining TTL are fetched together in one round trip, so
                    // they cannot disagree the way two separate reads could.
                    var entry = await db.StringGetWithExpiryAsync(keyStr);
                    if (entry.Value.IsNullOrEmpty)
                    {
                        // Nothing to mirror; drop any stale copy from an earlier pass.
                        _memoryCache.Remove(keyStr);
                        continue;
                    }

                    // Mirror the Redis TTL exactly. Truncating to whole seconds would turn a
                    // sub-second TTL into zero, which MemoryCacheEntryOptions rejects.
                    var ttl = entry.Expiry;
                    var lifetime = ttl.HasValue && ttl.Value > TimeSpan.Zero
                        ? ttl.Value
                        : DefaultEntryLifetime;

                    var entryOptions = new MemoryCacheEntryOptions
                    {
                        AbsoluteExpirationRelativeToNow = lifetime
                    };

                    _memoryCache.Set(keyStr, entry.Value.ToString(), entryOptions);
                    syncedCount++;
                }
                catch (Exception ex)
                {
                    // A single failing key must not abort the whole pass.
                    _logger.LogWarning(ex, "同步key {Key} 时发生错误", keyStr);
                }
            }

            // 2. Evict memory entries whose key no longer exists in Redis.
            // ConcurrentDictionary.Keys is a snapshot, so removing while iterating is safe.
            foreach (var trackedKey in _trackedKeys.Keys)
            {
                if (redisKeys.Contains(trackedKey))
                {
                    continue;
                }

                _memoryCache.Remove(trackedKey);
                _trackedKeys.TryRemove(trackedKey, out _);
                removedCount++;
            }

            _logger.LogInformation(
                "缓存同步完成: 同步 {SyncedCount} 个, 跳过 {SkippedCount} 个, 清理 {RemovedCount} 个, 总计 {TotalCount} 个key",
                syncedCount, skippedCount, removedCount, redisKeys.Count);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown in the middle of the key scan: let the caller end the loop. Eviction
            // is skipped on purpose because the key set is incomplete.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "缓存同步时发生错误");
        }
    }

    /// <summary>
    /// Returns whether <paramref name="key"/> starts with one of the configured exclusion
    /// prefixes. A missing prefix list or a null entry is treated as "no exclusion".
    /// </summary>
    private bool IsExcluded(string key)
    {
        var excludedPrefixes = _options.SyncExcludePrefixes;
        return excludedPrefixes != null
            && excludedPrefixes.Any(prefix => prefix != null && key.StartsWith(prefix, StringComparison.Ordinal));
    }

    /// <summary>
    /// Returns a snapshot of the synchronization settings and current state.
    /// </summary>
    /// <returns>A new <see cref="CacheSyncStatistics"/> instance.</returns>
    public CacheSyncStatistics GetStatistics()
    {
        return new CacheSyncStatistics
        {
            IsEnabled = _options.EnableAutoSync && _options.EnableL1Cache,
            SyncIntervalSeconds = _options.SyncIntervalSeconds,
            TrackedKeysCount = _trackedKeys.Count,
            IsRedisConnected = _connectionManager.IsConnected
        };
    }
}
|
|
/// <summary>
/// Snapshot of the cache synchronization settings and state, as produced by
/// <c>CacheSyncBackgroundService.GetStatistics</c>.
/// </summary>
public class CacheSyncStatistics
{
    /// <summary>True when both automatic synchronization and the L1 cache are enabled.</summary>
    public bool IsEnabled { get; set; }

    /// <summary>Configured number of seconds between synchronization passes.</summary>
    public int SyncIntervalSeconds { get; set; }

    /// <summary>Number of keys the synchronization service is currently tracking.</summary>
    public int TrackedKeysCount { get; set; }

    /// <summary>Whether the Redis connection was reported as connected when the snapshot was taken.</summary>
    public bool IsRedisConnected { get; set; }
}
|
}
|