using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Options;
|
using StackExchange.Redis;
|
using WIDESEAWCS_RedisService.Connection;
|
using WIDESEAWCS_RedisService.Options;
|
|
namespace WIDESEAWCS_RedisService.PubSub
|
{
|
public class RedisMessageQueueService : IMessageQueueService
|
{
|
private readonly IRedisConnectionManager _connectionManager;
|
private readonly RedisOptions _options;
|
private readonly ILogger<RedisMessageQueueService> _logger;
|
|
public RedisMessageQueueService(
|
IRedisConnectionManager connectionManager,
|
IOptions<RedisOptions> options,
|
ILogger<RedisMessageQueueService> logger)
|
{
|
_connectionManager = connectionManager;
|
_options = options.Value;
|
_logger = logger;
|
}
|
|
private string BuildKey(string key) => $"{_options.KeyPrefix}queue:{key}";
|
|
public long Publish(string channel, string message)
|
{
|
var sub = _connectionManager.GetSubscriber();
|
return sub.Publish(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"), message);
|
}
|
|
public void Subscribe(string channel, Action<string, string> handler)
|
{
|
var sub = _connectionManager.GetSubscriber();
|
sub.Subscribe(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"), (ch, msg) =>
|
{
|
handler(ch!, msg!);
|
});
|
}
|
|
public void Unsubscribe(string channel)
|
{
|
var sub = _connectionManager.GetSubscriber();
|
sub.Unsubscribe(RedisChannel.Literal($"{_options.KeyPrefix}{channel}"));
|
}
|
|
public long Enqueue(string queueName, string message)
|
{
|
return _connectionManager.GetDatabase().ListLeftPush(BuildKey(queueName), message);
|
}
|
|
public string? Dequeue(string queueName, TimeSpan? timeout = null)
|
{
|
var val = _connectionManager.GetDatabase().ListRightPop(BuildKey(queueName));
|
return val.IsNullOrEmpty ? null : val.ToString();
|
}
|
|
public long GetQueueLength(string queueName)
|
{
|
return _connectionManager.GetDatabase().ListLength(BuildKey(queueName));
|
}
|
}
|
}
|