using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using WIDESEAWCS_RedisService.Connection;
using WIDESEAWCS_RedisService.Options;

namespace WIDESEAWCS_RedisService.PubSub
{
    /// <summary>
    /// Redis-backed implementation of <see cref="IMessageQueueService"/> offering
    /// publish/subscribe on channels and a simple list-based queue.
    /// All channel names and queue keys are prefixed with <c>RedisOptions.KeyPrefix</c>.
    /// </summary>
    public class RedisMessageQueueService : IMessageQueueService
    {
        /// <summary>Delay between two pop attempts while <see cref="Dequeue"/> waits for a message.</summary>
        private static readonly TimeSpan DequeuePollInterval = TimeSpan.FromMilliseconds(50);

        private readonly IRedisConnectionManager _connectionManager;
        private readonly RedisOptions _options;
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="connectionManager">Provides the Redis subscriber and database used by every operation.</param>
        /// <param name="options">Redis options; only <c>KeyPrefix</c> is read by this class.</param>
        /// <param name="logger">Logger used to report failing subscription handlers.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public RedisMessageQueueService(
            IRedisConnectionManager connectionManager,
            IOptions<RedisOptions> options,
            ILogger logger)
        {
            _connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
            _options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>Builds the Redis key of a queue: <c>{KeyPrefix}queue:{key}</c>.</summary>
        private string BuildKey(string key) => $"{_options.KeyPrefix}queue:{key}";

        /// <summary>Builds the literal (non-pattern) Redis channel <c>{KeyPrefix}{channel}</c>.</summary>
        private RedisChannel BuildChannel(string channel) =>
            RedisChannel.Literal($"{_options.KeyPrefix}{channel}");

        /// <summary>
        /// Publishes <paramref name="message"/> on the prefixed channel.
        /// </summary>
        /// <param name="channel">Channel name without the key prefix.</param>
        /// <param name="message">Payload to publish.</param>
        /// <returns>The value returned by the subscriber's <c>Publish</c> call.</returns>
        public long Publish(string channel, string message)
        {
            var sub = _connectionManager.GetSubscriber();
            return sub.Publish(BuildChannel(channel), message);
        }

        /// <summary>
        /// Registers <paramref name="handler"/> for messages on the prefixed channel.
        /// </summary>
        /// <param name="channel">Channel name without the key prefix.</param>
        /// <param name="handler">
        /// Callback receiving the channel and the message.
        /// NOTE(review): the element types are assumed to be (string channel, string message);
        /// confirm against the IMessageQueueService declaration.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is <c>null</c>.</exception>
        public void Subscribe(string channel, Action<string, string> handler)
        {
            // Fail at registration time instead of on the first delivered message.
            if (handler is null)
            {
                throw new ArgumentNullException(nameof(handler));
            }

            var sub = _connectionManager.GetSubscriber();
            sub.Subscribe(BuildChannel(channel), (ch, msg) =>
            {
                try
                {
                    handler(ch!, msg!);
                }
                catch (Exception ex)
                {
                    // Record the failure here so it is visible in the application log
                    // however the invoker of this callback treats the exception.
                    _logger.LogError(ex, "Redis subscription handler failed for channel {Channel}", channel);
                    throw;
                }
            });
        }

        /// <summary>
        /// Removes the subscription(s) on the prefixed channel.
        /// </summary>
        /// <param name="channel">Channel name without the key prefix.</param>
        public void Unsubscribe(string channel)
        {
            var sub = _connectionManager.GetSubscriber();
            sub.Unsubscribe(BuildChannel(channel));
        }

        /// <summary>
        /// Pushes <paramref name="message"/> onto the left end of the queue list.
        /// </summary>
        /// <param name="queueName">Queue name without prefix.</param>
        /// <param name="message">Payload to enqueue.</param>
        /// <returns>The value returned by <c>ListLeftPush</c>.</returns>
        public long Enqueue(string queueName, string message)
        {
            return _connectionManager.GetDatabase().ListLeftPush(BuildKey(queueName), message);
        }

        /// <summary>
        /// Pops one message from the right end of the queue list, so that together with
        /// <see cref="Enqueue"/> the list is consumed in first-in, first-out order.
        /// </summary>
        /// <param name="queueName">Queue name without prefix.</param>
        /// <param name="timeout">
        /// Maximum time to wait for a message when the queue is empty. <c>null</c> or a
        /// non-positive value means a single, immediate attempt.
        /// </param>
        /// <returns>
        /// The popped message, or <c>null</c> when no message was obtained in time.
        /// A popped value that is empty is also reported as <c>null</c>.
        /// </returns>
        public string? Dequeue(string queueName, TimeSpan? timeout = null)
        {
            var db = _connectionManager.GetDatabase();
            var key = BuildKey(queueName);

            var val = db.ListRightPop(key);

            // ListRightPop does not block, so the requested wait is emulated by
            // re-trying at a fixed interval until a value arrives or time runs out.
            if (val.IsNull && timeout is { } wait && wait > TimeSpan.Zero)
            {
                var elapsed = System.Diagnostics.Stopwatch.StartNew();
                while (val.IsNull)
                {
                    var remaining = wait - elapsed.Elapsed;
                    if (remaining <= TimeSpan.Zero)
                    {
                        break;
                    }

                    // Never sleep past the deadline.
                    System.Threading.Thread.Sleep(
                        remaining < DequeuePollInterval ? remaining : DequeuePollInterval);
                    val = db.ListRightPop(key);
                }
            }

            return val.IsNullOrEmpty ? null : val.ToString();
        }

        /// <summary>
        /// Returns the current number of entries in the queue list.
        /// </summary>
        /// <param name="queueName">Queue name without prefix.</param>
        public long GetQueueLength(string queueName)
        {
            return _connectionManager.GetDatabase().ListLength(BuildKey(queueName));
        }
    }
}