using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using WIDESEAWCS_RedisService.Connection;
using WIDESEAWCS_RedisService.Options;

namespace WIDESEAWCS_RedisService.Stream
{
    /// <summary>
    /// Redis Streams implementation of <see cref="IStreamProcessingService"/>.
    /// Every stream key is namespaced as <c>{KeyPrefix}stream:{key}</c> before it is sent to Redis.
    /// </summary>
    public class RedisStreamProcessingService : IStreamProcessingService
    {
        private readonly IRedisConnectionManager _connectionManager;
        private readonly RedisOptions _options;
        private readonly ILogger<RedisStreamProcessingService> _logger;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="connectionManager">Provides the Redis database used for every command.</param>
        /// <param name="options">Redis options; <c>KeyPrefix</c> is read when building stream keys.</param>
        /// <param name="logger">Logger used to report an already-existing consumer group.</param>
        public RedisStreamProcessingService(
            IRedisConnectionManager connectionManager,
            IOptions<RedisOptions> options,
            ILogger<RedisStreamProcessingService> logger)
        {
            _connectionManager = connectionManager;
            _options = options.Value;
            _logger = logger;
        }

        /// <summary>
        /// Builds the physical Redis key for a logical stream name.
        /// </summary>
        private string BuildKey(string key) => $"{_options.KeyPrefix}stream:{key}";

        /// <summary>
        /// Appends one message to the stream.
        /// </summary>
        /// <param name="streamKey">Logical stream name (the key prefix is applied internally).</param>
        /// <param name="fields">Field/value pairs that make up the message body.</param>
        /// <returns>The ID of the added entry, as a string.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="fields"/> is null.</exception>
        public string AddMessage(string streamKey, Dictionary<string, string> fields)
        {
            if (fields is null)
            {
                throw new ArgumentNullException(nameof(fields));
            }

            var entries = fields.Select(f => new NameValueEntry(f.Key, f.Value)).ToArray();
            var id = _connectionManager.GetDatabase().StreamAdd(BuildKey(streamKey), entries);
            return id.ToString();
        }

        /// <summary>
        /// Creates a consumer group on the stream.
        /// </summary>
        /// <param name="streamKey">Logical stream name.</param>
        /// <param name="groupName">Name of the consumer group to create.</param>
        /// <param name="startId">Position the group starts reading from; defaults to <c>"$"</c>.</param>
        /// <returns>
        /// The result reported by the client, or <c>true</c> when the server answers with a
        /// BUSYGROUP error (the group already exists).
        /// </returns>
        public bool CreateConsumerGroup(string streamKey, string groupName, string startId = "$")
        {
            try
            {
                // NOTE(review): the trailing 'true' is presumably the client's createStream flag
                // (create the stream if it does not exist yet) - confirm against the library version in use.
                return _connectionManager.GetDatabase()
                    .StreamCreateConsumerGroup(BuildKey(streamKey), groupName, startId, true);
            }
            catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
            {
                // An existing group is not treated as a failure; report it and carry on.
                _logger.LogWarning("消费者组 {Group} 已存在", groupName);
                return true;
            }
        }

        /// <summary>
        /// Reads messages for a consumer of a group, using the special position <c>"&gt;"</c>.
        /// </summary>
        /// <param name="streamKey">Logical stream name.</param>
        /// <param name="groupName">Consumer group to read through.</param>
        /// <param name="consumerName">Name of the consumer inside the group.</param>
        /// <param name="count">Maximum number of entries requested; defaults to 10.</param>
        /// <returns>The entries read; an empty list when there is nothing to return.</returns>
        public List<StreamEntry> ReadGroup(string streamKey, string groupName, string consumerName, int count = 10)
        {
            var entries = _connectionManager.GetDatabase()
                .StreamReadGroup(BuildKey(streamKey), groupName, consumerName, ">", count);

            var result = new List<StreamEntry>();
            if (entries is null)
            {
                return result;
            }

            foreach (var entry in entries)
            {
                // Copy through the indexer rather than ToDictionary: Redis permits a field name to
                // repeat inside one entry, and a duplicate key must not fail the whole batch.
                // The last value for a repeated name wins.
                var fields = new Dictionary<string, string>();
                foreach (var pair in entry.Values)
                {
                    fields[pair.Name.ToString()] = pair.Value.ToString();
                }

                result.Add(new StreamEntry
                {
                    Id = entry.Id.ToString(),
                    Fields = fields
                });
            }

            return result;
        }

        /// <summary>
        /// Acknowledges messages for a consumer group.
        /// </summary>
        /// <param name="streamKey">Logical stream name.</param>
        /// <param name="groupName">Consumer group that owns the pending messages.</param>
        /// <param name="messageIds">IDs of the messages to acknowledge.</param>
        /// <returns>
        /// The count reported by Redis, or 0 when no IDs are supplied (no command is sent in that case).
        /// </returns>
        public long Acknowledge(string streamKey, string groupName, params string[] messageIds)
        {
            // With 'params' a caller can legitimately pass zero IDs; an acknowledge with no IDs
            // is not a valid command, so there is simply nothing to do.
            if (messageIds is null || messageIds.Length == 0)
            {
                return 0;
            }

            var ids = messageIds.Select(id => new RedisValue(id)).ToArray();
            return _connectionManager.GetDatabase().StreamAcknowledge(BuildKey(streamKey), groupName, ids);
        }

        /// <summary>
        /// Returns the number of entries in the stream.
        /// </summary>
        /// <param name="streamKey">Logical stream name.</param>
        public long GetStreamLength(string streamKey)
        {
            return _connectionManager.GetDatabase().StreamLength(BuildKey(streamKey));
        }

        /// <summary>
        /// Trims the stream to the given maximum length.
        /// </summary>
        /// <param name="streamKey">Logical stream name.</param>
        /// <param name="maxLength">Maximum number of entries to keep.</param>
        /// <returns>The value returned by the client's trim call.</returns>
        public long TrimStream(string streamKey, int maxLength)
        {
            return _connectionManager.GetDatabase().StreamTrim(BuildKey(streamKey), maxLength);
        }
    }
}