Skip to content

Commit

Permalink
Merge pull request #192 from d4ilys/master
Browse files Browse the repository at this point in the history
增加延时队列功能
  • Loading branch information
2881099 authored Jul 2, 2024
2 parents 6832e8b + a8c154c commit c1d3d47
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 1 deletion.
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ FreeRedis is a redis client based on .NET, supports .NET Core 2.1+, .NET Framewo
<span>English</span> |
<a href="README.zh-CN.md">中文</a>
</p>

</div>

- 🌈 RedisClient Keep all method names consistent with redis-cli
Expand Down Expand Up @@ -224,6 +223,28 @@ foreach (var keys in cli.Scan("*", 10, null))
}
```

### 🍡DelayQueue

```c#
var delayQueue = cli.DelayQueue("TestDelayQueue");

//Add queue
delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5));
delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10));
delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15));
delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20));
delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25));
delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15"));

//Consumption queue
await delayQueue.DequeueAsync(s =>
{
output.WriteLine($"{DateTime.Now}:{s}");

return Task.CompletedTask;
});
```

## 👯 Contributors

<a href="https://github.com/2881099/FreeRedis/graphs/contributors">
Expand Down
21 changes: 21 additions & 0 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,27 @@ foreach (var keys in cli.Scan("*", 10, null))
}
```

### 🍡DelayQueue (延时队列)

```c#
var delayQueue = cli.DelayQueue("TestDelayQueue");

//添加队列
delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5));
delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10));
delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15));
delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20));
delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25));
delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15"));

//消费延时队列
await delayQueue.DequeueAsync(s =>
{
output.WriteLine($"{DateTime.Now}:{s}");

return Task.CompletedTask;
});
```

## 👯 Contributors (贡献者)

Expand Down
43 changes: 43 additions & 0 deletions src/FreeRedis/FreeRedis.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 162 additions & 0 deletions src/FreeRedis/RedisClient/Modules/DelayQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace FreeRedis
{
partial class RedisClient
{
/// <summary>
/// 延时队列
/// </summary>
/// <param name="queueKey">延时队列Key</param>
/// <returns></returns>
public DelayQueue DelayQueue(string queueKey) => new DelayQueue(this, queueKey);
}

/// <summary>
/// 延时队列
/// </summary>
public class DelayQueue
{
private readonly RedisClient _redisClient = null;

private readonly string _queueKey;


public DelayQueue(RedisClient redisClient, string queueKey)
{
_redisClient = redisClient;
_queueKey = queueKey;
}


/// <summary>
/// 写入延时队列
/// </summary>
/// <param name="value">队列值:值不可重复</param>
/// <param name="delay">延迟执行时间</param>
/// <returns></returns>
public bool Enqueue(string value, TimeSpan delay)
{
var time = DateTime.UtcNow.Add(delay);
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;

var res = _redisClient.ZAdd(_queueKey, timestamp, value);
return res > 0;
}

/// <summary>
/// 写入延时队列
/// </summary>
/// <param name="value">队列值:值不可重复</param>
/// <param name="delay">延迟执行时间</param>
/// <returns></returns>
public bool Enqueue(string value, DateTime delay)
{
var time = TimeZoneInfo.ConvertTimeToUtc(delay);
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;
var res = _redisClient.ZAdd(_queueKey, timestamp, value);
return res > 0;
}

/// <summary>
/// 消费延时队列,多个消费端不会重复
/// </summary>
/// <param name="action">消费委托</param>
/// <param name="choke">轮询队列时长,默认400毫秒,值越小越准确</param>
public void Dequeue(Action<string> action, int choke = 400)
{
Thread thread = new Thread(() =>
{
while (true)
{
try
{
//阻塞节省CPU
Thread.Sleep(choke);
var res = InternalDequeue();
if (!string.IsNullOrWhiteSpace(res))
action.Invoke(res);
}
catch
{
// ignored
}
}
});

thread.Start();
}

#if isasync

/// <summary>
/// 消费延时队列,多个消费端不会重复
/// </summary>
/// <param name="action">消费委托</param>
/// <param name="choke">轮询队列时长,默认400毫秒,值越小越准确</param>
/// <param name="token"></param>
public System.Threading.Tasks.Task DequeueAsync(Func<string, System.Threading.Tasks.Task> action,
int choke = 400, CancellationToken? token = null)
{
return Task.Factory.StartNew(async () =>
{
while (true)
{
try
{
if (token != null && token.Value.IsCancellationRequested)
{
break;
}

//阻塞节省CPU
await System.Threading.Tasks.Task.Delay(choke);
var res = InternalDequeue();
if (!string.IsNullOrWhiteSpace(res))
await action.Invoke(res);
}
catch
{
// ignored
}
}
}, TaskCreationOptions.LongRunning);
}

#endif

//取队列任务
private string InternalDequeue()
{
long timestamp = (DateTime.UtcNow.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;

//lua脚本保持原子性
var script = @"
local zkey = KEYS[1]
local score = ARGV[1]
local zrange = redis.call('zrangebyscore',zkey,0,score,'LIMIT',0,1)
if next(zrange) ~= nil and #zrange > 0
then
local rmnum = redis.call('zrem',zkey,unpack(zrange))
if(rmnum > 0)
then
return zrange
end
else
return {}
end
";

if (_redisClient.Eval(script, new[] { _queueKey }, timestamp) is object[] eval && eval.Any())
{
var item = eval[0].ToString() ?? string.Empty;
return item;
}

return default;
}
}
}
39 changes: 39 additions & 0 deletions test/Unit/FreeRedis.Tests/RedisClientTests/DelayQueueTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace FreeRedis.Tests.RedisClientTests
{
public class DelayQueueTest(ITestOutputHelper output)
{
static readonly RedisClient _client = new RedisClient("127.0.0.1:6379,password=123");

[Fact]
public async Task Test()
{
var delayQueue = _client.DelayQueue("TestDelayQueue");

//添加队列
delayQueue.Enqueue($"Execute in 5 seconds.", TimeSpan.FromSeconds(5));
delayQueue.Enqueue($"Execute in 10 seconds.", DateTime.Now.AddSeconds(10));
delayQueue.Enqueue($"Execute in 15 seconds.", DateTime.Now.AddSeconds(15));
delayQueue.Enqueue($"Execute in 20 seconds.", TimeSpan.FromSeconds(20));
delayQueue.Enqueue($"Execute in 25 seconds.", DateTime.Now.AddSeconds(25));
delayQueue.Enqueue($"Execute in 2024-07-02 14:30:15", DateTime.Parse("2024-07-02 14:30:15"));


//消费延时队列
await delayQueue.DequeueAsync(s =>
{
output.WriteLine($"{DateTime.Now}{s}");

return Task.CompletedTask;
});
}
}
}

0 comments on commit c1d3d47

Please sign in to comment.