diff --git a/README.md b/README.md index 6bf4fa7..124d4e1 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,6 @@ FreeRedis is a redis client based on .NET, supports .NET Core 2.1+, .NET Framewo English | 中文

- - 🌈 RedisClient Keep all method names consistent with redis-cli @@ -224,6 +223,28 @@ foreach (var keys in cli.Scan("*", 10, null)) } ``` +### 🍡DelayQueue + +```c# +var delayQueue = cli.DelayQueue("TestDelayQueue"); + +//Add queue +delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5)); +delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10)); +delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15)); +delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20)); +delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25)); +delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15")); + +//Consumption queue +await delayQueue.DequeueAsync(s => +{ + output.WriteLine($"{DateTime.Now}:{s}"); + + return Task.CompletedTask; +}); +``` + ## 👯 Contributors diff --git a/README.zh-CN.md b/README.zh-CN.md index b04b792..ab7b469 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -223,6 +223,27 @@ foreach (var keys in cli.Scan("*", 10, null)) } ``` +### 🍡DelayQueue (延时队列) + +```c# +var delayQueue = cli.DelayQueue("TestDelayQueue"); + +//添加队列 +delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5)); +delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10)); +delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15)); +delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20)); +delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25)); +delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15")); + +//消费延时队列 +await delayQueue.DequeueAsync(s => +{ + output.WriteLine($"{DateTime.Now}:{s}"); + + return Task.CompletedTask; +}); +``` ## 👯 Contributors (贡献者) diff --git a/src/FreeRedis/FreeRedis.xml b/src/FreeRedis/FreeRedis.xml index 7b1127e..03530d5 100644 --- a/src/FreeRedis/FreeRedis.xml +++ b/src/FreeRedis/FreeRedis.xml @@ -1587,6 +1587,13 @@ Timeout milliseconds The command returns the number of replicas reached by all the writes performed in the context of the current connection. + + + 延时队列 + + 延时队列Key + + 开启分布式锁,若超时返回null @@ -2990,5 +2997,41 @@ redis version >=6.2: Added the GT and LT options. + + + 延时队列 + + + + + 写入延时队列 + + 队列值:值不可重复 + 延迟执行时间 + + + + + 写入延时队列 + + 队列值:值不可重复 + 延迟执行时间 + + + + + 消费延时队列,多个消费端不会重复 + + 消费委托 + 轮询队列时长,默认400毫秒,值越小越准确 + + + + 消费延时队列,多个消费端不会重复 + + 消费委托 + 轮询队列时长,默认400毫秒,值越小越准确 + + diff --git a/src/FreeRedis/RedisClient/Modules/DelayQueue.cs b/src/FreeRedis/RedisClient/Modules/DelayQueue.cs new file mode 100644 index 0000000..7a3371f --- /dev/null +++ b/src/FreeRedis/RedisClient/Modules/DelayQueue.cs @@ -0,0 +1,162 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace FreeRedis +{ + partial class RedisClient + { + /// + /// 延时队列 + /// + /// 延时队列Key + /// + public DelayQueue DelayQueue(string queueKey) => new DelayQueue(this, queueKey); + } + + /// + /// 延时队列 + /// + public class DelayQueue + { + private readonly RedisClient _redisClient = null; + + private readonly string _queueKey; + + + public DelayQueue(RedisClient redisClient, string queueKey) + { + _redisClient = redisClient; + _queueKey = queueKey; + } + + + /// + /// 写入延时队列 + /// + /// 队列值:值不可重复 + /// 延迟执行时间 + /// + public bool Enqueue(string value, TimeSpan delay) + { + var time = DateTime.UtcNow.Add(delay); + long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond; + + var res = _redisClient.ZAdd(_queueKey, timestamp, value); + return res > 0; + } + + /// + /// 写入延时队列 + /// + /// 队列值:值不可重复 + /// 延迟执行时间 + /// + public bool Enqueue(string value, DateTime delay) + { + var time = TimeZoneInfo.ConvertTimeToUtc(delay); + long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond; + var res = _redisClient.ZAdd(_queueKey, timestamp, value); + return res > 0; + } + + /// + /// 消费延时队列,多个消费端不会重复 + /// + /// 消费委托 + /// 轮询队列时长,默认400毫秒,值越小越准确 + public void Dequeue(Action action, int choke = 400) + { + Thread thread = new Thread(() => + { + while (true) + { + try + { + //阻塞节省CPU + Thread.Sleep(choke); + var res = InternalDequeue(); + if (!string.IsNullOrWhiteSpace(res)) + action.Invoke(res); + } + catch + { + // ignored + } + } + }); + + thread.Start(); + } + +#if isasync + + /// + /// 消费延时队列,多个消费端不会重复 + /// + /// 消费委托 + /// 轮询队列时长,默认400毫秒,值越小越准确 + /// + public System.Threading.Tasks.Task DequeueAsync(Func action, + int choke = 400, CancellationToken? token = null) + { + return Task.Factory.StartNew(async () => + { + while (true) + { + try + { + if (token != null && token.Value.IsCancellationRequested) + { + break; + } + + //阻塞节省CPU + await System.Threading.Tasks.Task.Delay(choke); + var res = InternalDequeue(); + if (!string.IsNullOrWhiteSpace(res)) + await action.Invoke(res); + } + catch + { + // ignored + } + } + }, TaskCreationOptions.LongRunning); + } + +#endif + + //取队列任务 + private string InternalDequeue() + { + long timestamp = (DateTime.UtcNow.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond; + + //lua脚本保持原子性 + var script = @" + local zkey = KEYS[1] + local score = ARGV[1] + local zrange = redis.call('zrangebyscore',zkey,0,score,'LIMIT',0,1) + if next(zrange) ~= nil and #zrange > 0 + then + local rmnum = redis.call('zrem',zkey,unpack(zrange)) + if(rmnum > 0) + then + return zrange + end + else + return {} + end + "; + + if (_redisClient.Eval(script, new[] { _queueKey }, timestamp) is object[] eval && eval.Any()) + { + var item = eval[0].ToString() ?? string.Empty; + return item; + } + + return default; + } + } +} \ No newline at end of file diff --git a/test/Unit/FreeRedis.Tests/RedisClientTests/DelayQueueTest.cs b/test/Unit/FreeRedis.Tests/RedisClientTests/DelayQueueTest.cs new file mode 100644 index 0000000..fa922f0 --- /dev/null +++ b/test/Unit/FreeRedis.Tests/RedisClientTests/DelayQueueTest.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace FreeRedis.Tests.RedisClientTests +{ + public class DelayQueueTest(ITestOutputHelper output) + { + static readonly RedisClient _client = new RedisClient("127.0.0.1:6379,password=123"); + + [Fact] + public async Task Test() + { + var delayQueue = _client.DelayQueue("TestDelayQueue"); + + //添加队列 + delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5)); + delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10)); + delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15)); + delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20)); + delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25)); + delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15")); + + + //消费延时队列 + await delayQueue.DequeueAsync(s => + { + output.WriteLine($"{DateTime.Now}:{s}"); + + return Task.CompletedTask; + }); + } + } +} \ No newline at end of file