using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using WIDESEAWCS_RedisService.Connection;
using WIDESEAWCS_RedisService.Options;

namespace WIDESEAWCS_RedisService.DelayQueue
{
    /// <summary>
    /// Delay queue backed by a Redis sorted set. Each message is a sorted-set member whose
    /// score is the UTC time, in Unix milliseconds, at which the message becomes due.
    /// </summary>
    public class RedisDelayQueueService : IDelayQueueService
    {
        private readonly IRedisConnectionManager _connectionManager;
        private readonly RedisOptions _options;
        private readonly ILogger<RedisDelayQueueService> _logger;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="connectionManager">Provides the Redis database used for every operation.</param>
        /// <param name="options">Redis options; <c>KeyPrefix</c> is prepended to every queue key.</param>
        /// <param name="logger">Logger instance (stored, not used by the current implementation).</param>
        public RedisDelayQueueService(
            IRedisConnectionManager connectionManager,
            IOptions<RedisOptions> options,
            ILogger<RedisDelayQueueService> logger)
        {
            _connectionManager = connectionManager;
            _options = options.Value;
            _logger = logger;
        }

        /// <summary>
        /// Builds the Redis key for a queue: <c>{KeyPrefix}delay:{queueName}</c>.
        /// </summary>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or empty.</exception>
        private string BuildKey(string queueName)
        {
            // A null/empty name would silently collapse every such caller onto one shared key.
            if (string.IsNullOrEmpty(queueName))
            {
                throw new ArgumentException("Queue name must not be null or empty.", nameof(queueName));
            }

            return $"{_options.KeyPrefix}delay:{queueName}";
        }

        /// <summary>
        /// Schedules <paramref name="message"/> to become due after <paramref name="delay"/>.
        /// </summary>
        /// <param name="queueName">Logical queue name.</param>
        /// <param name="message">Message payload; used as the sorted-set member.</param>
        /// <param name="delay">Time from now (UTC) until the message is due.</param>
        /// <returns>
        /// The result of the sorted-set add. Per the StackExchange.Redis documentation this is
        /// <c>true</c> when the member was newly added and <c>false</c> when it already existed
        /// (in which case its score, i.e. its due time, is still updated).
        /// </returns>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or empty.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public bool Enqueue(string queueName, string message, TimeSpan delay)
        {
            if (message is null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            var redisKey = BuildKey(queueName);
            var dueAtMs = DateTimeOffset.UtcNow.Add(delay).ToUnixTimeMilliseconds();

            return _connectionManager.GetDatabase().SortedSetAdd(redisKey, message, dueAtMs);
        }

        /// <summary>
        /// Removes and returns messages whose due time has been reached.
        /// </summary>
        /// <param name="queueName">Logical queue name.</param>
        /// <param name="count">
        /// Maximum number of due entries to fetch in this call. The value is passed to Redis
        /// unchanged as the range limit.
        /// </param>
        /// <returns>
        /// The messages this caller successfully removed from the set. This may be fewer than
        /// were fetched when another consumer removed the same entries first.
        /// </returns>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or empty.</exception>
        public List<string> DequeueDue(string queueName, int count = 10)
        {
            var redisKey = BuildKey(queueName);
            var db = _connectionManager.GetDatabase();
            var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

            var dueEntries = db.SortedSetRangeByScore(redisKey, 0, nowMs, take: count);
            var claimed = new List<string>(dueEntries.Length);

            foreach (var entry in dueEntries)
            {
                // Fetch and remove are separate commands, so the remove result decides ownership:
                // only the caller whose remove succeeds gets to return the message.
                if (db.SortedSetRemove(redisKey, entry))
                {
                    claimed.Add(entry.ToString());
                }
            }

            return claimed;
        }

        /// <summary>
        /// Removes a specific message from the queue regardless of its due time.
        /// </summary>
        /// <param name="queueName">Logical queue name.</param>
        /// <param name="message">Message payload to remove.</param>
        /// <returns><c>true</c> if the sorted-set remove reported the member as removed.</returns>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or empty.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public bool Remove(string queueName, string message)
        {
            if (message is null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            return _connectionManager.GetDatabase().SortedSetRemove(BuildKey(queueName), message);
        }

        /// <summary>
        /// Returns the number of messages currently in the queue, due or not.
        /// </summary>
        /// <param name="queueName">Logical queue name.</param>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or empty.</exception>
        public long GetQueueLength(string queueName)
        {
            return _connectionManager.GetDatabase().SortedSetLength(BuildKey(queueName));
        }
    }
}