Skip to content

Commit

Permalink
- fix: DelayQueue 精确到毫秒 #217
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Dec 27, 2024
1 parent 02afb80 commit 082de4a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/FreeRedis/FreeRedis.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 11 additions & 13 deletions src/FreeRedis/RedisClient/Modules/DelayQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public DelayQueue(RedisClient redisClient, string queueKey)
public bool Enqueue(string value, TimeSpan delay)
{
var time = DateTime.UtcNow.Add(delay);
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerMillisecond;

var res = _redisClient.ZAdd(_queueKey, timestamp, value);
return res > 0;
Expand All @@ -56,7 +56,7 @@ public bool Enqueue(string value, TimeSpan delay)
public bool Enqueue(string value, DateTime delay)
{
var time = TimeZoneInfo.ConvertTimeToUtc(delay);
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;
long timestamp = (time.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerMillisecond;
var res = _redisClient.ZAdd(_queueKey, timestamp, value);
return res > 0;
}
Expand All @@ -66,14 +66,17 @@ public bool Enqueue(string value, DateTime delay)
/// </summary>
/// <param name="action">消费委托</param>
/// <param name="choke">轮询队列时长,默认400毫秒,值越小越准确</param>
public void Dequeue(Action<string> action, int choke = 400)
public void Dequeue(Action<string> action, int choke = 400, CancellationToken? token = null)
{
Thread thread = new Thread(() =>
{
while (true)
{
try
{
if (token != null && token.Value.IsCancellationRequested)
break;

//阻塞节省CPU
Thread.Sleep(choke);
var res = InternalDequeue();
Expand All @@ -98,8 +101,7 @@ public void Dequeue(Action<string> action, int choke = 400)
/// <param name="action">消费委托</param>
/// <param name="choke">轮询队列时长,默认400毫秒,值越小越准确</param>
/// <param name="token"></param>
public System.Threading.Tasks.Task DequeueAsync(Func<string, System.Threading.Tasks.Task> action,
int choke = 400, CancellationToken? token = null)
public Task DequeueAsync(Func<string, Task> action, int choke = 400, CancellationToken? token = null)
{
return Task.Factory.StartNew(async () =>
{
Expand All @@ -108,12 +110,10 @@ public System.Threading.Tasks.Task DequeueAsync(Func<string, System.Threading.Ta
try
{
if (token != null && token.Value.IsCancellationRequested)
{
break;
}

//阻塞节省CPU
await System.Threading.Tasks.Task.Delay(choke);
await Task.Delay(choke);
var res = InternalDequeue();
if (!string.IsNullOrWhiteSpace(res))
await action.Invoke(res);
Expand All @@ -131,16 +131,14 @@ public System.Threading.Tasks.Task DequeueAsync(Func<string, System.Threading.Ta
//取队列任务
private string InternalDequeue()
{
long timestamp = (DateTime.UtcNow.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerSecond;
long timestamp = (DateTime.UtcNow.Ticks - new DateTime(1970, 1, 1).Ticks) / TimeSpan.TicksPerMillisecond;

//lua脚本保持原子性
var script = @"
local zkey = KEYS[1]
local score = ARGV[1]
local zrange = redis.call('zrangebyscore',zkey,0,score,'LIMIT',0,1)
local zrange = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'LIMIT',0,1)
if next(zrange) ~= nil and #zrange > 0
then
local rmnum = redis.call('zrem',zkey,unpack(zrange))
local rmnum = redis.call('zrem',KEYS[1],unpack(zrange))
if(rmnum > 0)
then
return zrange
Expand Down

0 comments on commit 082de4a

Please sign in to comment.