using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Options;
|
using StackExchange.Redis;
|
using WIDESEAWCS_RedisService.Connection;
|
using WIDESEAWCS_RedisService.Options;
|
|
namespace WIDESEAWCS_RedisService.DelayQueue
|
{
|
public class RedisDelayQueueService : IDelayQueueService
|
{
|
private readonly IRedisConnectionManager _connectionManager;
|
private readonly RedisOptions _options;
|
private readonly ILogger<RedisDelayQueueService> _logger;
|
|
public RedisDelayQueueService(
|
IRedisConnectionManager connectionManager,
|
IOptions<RedisOptions> options,
|
ILogger<RedisDelayQueueService> logger)
|
{
|
_connectionManager = connectionManager;
|
_options = options.Value;
|
_logger = logger;
|
}
|
|
private string BuildKey(string key) => $"{_options.KeyPrefix}delay:{key}";
|
|
public bool Enqueue(string queueName, string message, TimeSpan delay)
|
{
|
var score = DateTimeOffset.UtcNow.Add(delay).ToUnixTimeMilliseconds();
|
return _connectionManager.GetDatabase().SortedSetAdd(BuildKey(queueName), message, score);
|
}
|
|
public List<string> DequeueDue(string queueName, int count = 10)
|
{
|
var db = _connectionManager.GetDatabase();
|
var fullKey = BuildKey(queueName);
|
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
|
var entries = db.SortedSetRangeByScore(fullKey, 0, now, take: count);
|
var result = new List<string>();
|
|
foreach (var entry in entries)
|
{
|
if (db.SortedSetRemove(fullKey, entry))
|
result.Add(entry.ToString());
|
}
|
return result;
|
}
|
|
public bool Remove(string queueName, string message)
|
{
|
return _connectionManager.GetDatabase().SortedSetRemove(BuildKey(queueName), message);
|
}
|
|
public long GetQueueLength(string queueName)
|
{
|
return _connectionManager.GetDatabase().SortedSetLength(BuildKey(queueName));
|
}
|
}
|
}
|