wanshenmean
2026-03-17 94ad631d316da04c46266ddb1fc6e63e6f8f2fae
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using WIDESEAWCS_RedisService.Connection;
using WIDESEAWCS_RedisService.Options;
 
namespace WIDESEAWCS_RedisService.PubSub
{
    public class RedisMessageQueueService : IMessageQueueService
    {
        private readonly IRedisConnectionManager _connectionManager;
        private readonly RedisOptions _options;
        private readonly ILogger<RedisMessageQueueService> _logger;
 
        public RedisMessageQueueService(
            IRedisConnectionManager connectionManager,
            IOptions<RedisOptions> options,
            ILogger<RedisMessageQueueService> logger)
        {
            _connectionManager = connectionManager;
            _options = options.Value;
            _logger = logger;
        }
 
        private string BuildKey(string key) => $"{_options.KeyPrefix}queue:{key}";
 
        public long Publish(string channel, string message)
        {
            var sub = _connectionManager.GetSubscriber();
            return sub.Publish(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"), message);
        }
 
        public void Subscribe(string channel, Action<string, string> handler)
        {
            var sub = _connectionManager.GetSubscriber();
            sub.Subscribe(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"), (ch, msg) =>
            {
                handler(ch!, msg!);
            });
        }
 
        public void Unsubscribe(string channel)
        {
            var sub = _connectionManager.GetSubscriber();
            sub.Unsubscribe(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"));
        }
 
        public long Enqueue(string queueName, string message)
        {
            return _connectionManager.GetDatabase().ListLeftPush(BuildKey(queueName), message);
        }
 
        public string? Dequeue(string queueName, TimeSpan? timeout = null)
        {
            var val = _connectionManager.GetDatabase().ListRightPop(BuildKey(queueName));
            return val.IsNullOrEmpty ? null : val.ToString();
        }
 
        public long GetQueueLength(string queueName)
        {
            return _connectionManager.GetDatabase().ListLength(BuildKey(queueName));
        }
    }
}