using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Options;
|
using StackExchange.Redis;
|
using WIDESEAWCS_RedisService.Connection;
|
using WIDESEAWCS_RedisService.Options;
|
|
namespace WIDESEAWCS_RedisService.Stream
|
{
|
public class RedisStreamProcessingService : IStreamProcessingService
|
{
|
private readonly IRedisConnectionManager _connectionManager;
|
private readonly RedisOptions _options;
|
private readonly ILogger<RedisStreamProcessingService> _logger;
|
|
public RedisStreamProcessingService(
|
IRedisConnectionManager connectionManager,
|
IOptions<RedisOptions> options,
|
ILogger<RedisStreamProcessingService> logger)
|
{
|
_connectionManager = connectionManager;
|
_options = options.Value;
|
_logger = logger;
|
}
|
|
private string BuildKey(string key) => $"{_options.KeyPrefix}stream:{key}";
|
|
public string AddMessage(string streamKey, Dictionary<string, string> fields)
|
{
|
var entries = fields.Select(f => new NameValueEntry(f.Key, f.Value)).ToArray();
|
var id = _connectionManager.GetDatabase().StreamAdd(BuildKey(streamKey), entries);
|
return id.ToString();
|
}
|
|
public bool CreateConsumerGroup(string streamKey, string groupName, string startId = "$")
|
{
|
try
|
{
|
return _connectionManager.GetDatabase()
|
.StreamCreateConsumerGroup(BuildKey(streamKey), groupName, startId, true);
|
}
|
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
|
{
|
_logger.LogWarning("消费者组 {Group} 已存在", groupName);
|
return true;
|
}
|
}
|
|
public List<StreamEntry> ReadGroup(string streamKey, string groupName, string consumerName, int count = 10)
|
{
|
var entries = _connectionManager.GetDatabase()
|
.StreamReadGroup(BuildKey(streamKey), groupName, consumerName, ">", count);
|
|
return entries?.Select(e => new StreamEntry
|
{
|
Id = e.Id.ToString(),
|
Fields = e.Values.ToDictionary(v => v.Name.ToString(), v => v.Value.ToString())
|
}).ToList() ?? new List<StreamEntry>();
|
}
|
|
public long Acknowledge(string streamKey, string groupName, params string[] messageIds)
|
{
|
var ids = messageIds.Select(id => new RedisValue(id)).ToArray();
|
return _connectionManager.GetDatabase().StreamAcknowledge(BuildKey(streamKey), groupName, ids);
|
}
|
|
public long GetStreamLength(string streamKey)
|
{
|
return _connectionManager.GetDatabase().StreamLength(BuildKey(streamKey));
|
}
|
|
public long TrimStream(string streamKey, int maxLength)
|
{
|
return _connectionManager.GetDatabase().StreamTrim(BuildKey(streamKey), maxLength);
|
}
|
}
|
}
|