From 49dc63b114cc398c4988d490560a1591b77964ea Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Sat, 9 Mar 2024 08:53:04 +0100 Subject: [PATCH] fix(queue): finish will not return until the item is done (#169) done meaning: it was handled. Previously, it would return once the last item was popped, (but not yet handled) fixes: #168 --- docs/index.html | 2 ++ docs/reference.html | 5 ++++ src/copas/queue.lua | 33 +++++++++++++++++++++++++- tests/queue.lua | 58 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 96 insertions(+), 2 deletions(-) diff --git a/docs/index.html b/docs/index.html index 0c96c8d..d6af96f 100644 --- a/docs/index.html +++ b/docs/index.html @@ -104,6 +104,8 @@

History

Copas 4.7.0 [15/Jan/2023]
diff --git a/docs/reference.html b/docs/reference.html index 2fd3b78..843416e 100644 --- a/docs/reference.html +++ b/docs/reference.html @@ -530,6 +530,11 @@

Non-blocking data exchange and timer/sleep functions

Finishes a queue. Calls queue:stop() and then waits for the queue to run empty (and be destroyed) before returning.

+

When using "workers" via queue:add_worker() + then this method will return after the worker has finished processing the popped item. + When using your own threads and calling queue:pop(), then this method will + return after the last item has been popped, but not necessarily also processed.

+

The timeout defaults to 10 seconds (the default timeout value for a lock), math.huge can be used to wait forever.

diff --git a/src/copas/queue.lua b/src/copas/queue.lua index 01cd53c..ff79892 100644 --- a/src/copas/queue.lua +++ b/src/copas/queue.lua @@ -1,4 +1,5 @@ local copas = require "copas" +local gettime = require("socket").gettime local Sema = copas.semaphore local Lock = copas.lock @@ -30,6 +31,7 @@ function Queue.new(opts) self.workers = setmetatable({}, { __mode = "k" }) self.stopping = false self.worker_id = 0 + self.exit_semaphore = Sema.new(10^9) return self end @@ -105,6 +107,8 @@ end -- destroyed on a timeout. function Queue:finish(timeout, no_destroy_on_timeout) self:stop() + timeout = timeout or self.lock.timeout + local endtime = gettime() + timeout local _, err = self.lock:get(timeout) -- the lock never gets released, only destroyed, so we have to check the error string if err == "timeout" then @@ -113,7 +117,31 @@ function Queue:finish(timeout, no_destroy_on_timeout) end return nil, err end - return true + + -- if we get here, the lock was destroyed, so the queue is empty, now wait for all workers to exit + if not next(self.workers) then + -- all workers already exited, we're done + return true + end + + -- multiple threads can call this "finish" method, so we must check exiting workers + -- one by one. + while true do + local _, err = self.exit_semaphore:take(1, math.max(0, endtime - gettime())) + if err == "destroyed" then + return true -- someone else destroyed/finished it, so we're done + end + if err == "timeout" then + if not no_destroy_on_timeout then + self:destroy() + end + return nil, "timeout" + end + if not next(self.workers) then + self.exit_semaphore:destroy() + return true -- all workers exited, we're done + end + end end @@ -170,6 +198,9 @@ function Queue:add_worker(worker) worker(item) -- TODO: wrap in errorhandling end self.workers[coro] = nil + if self.exit_semaphore then + self.exit_semaphore:give(1) + end end) self.workers[coro] = true diff --git a/tests/queue.lua b/tests/queue.lua index 2e6d71f..ef0b9e9 100644 --- a/tests/queue.lua +++ b/tests/queue.lua @@ -134,5 +134,61 @@ copas.loop(function() copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore q:stop() -- this should exit the idle workers and exit the copas loop end) - print("test 4 success!") + + +-- finish a queue while workers are idle +copas.loop(function() + local q = Queue:new() + q:add_worker(function() end) + copas.pause(0.5) -- to activate the worker, which will now be blocked on the q semaphore + q:finish() -- this should exit the idle workers and exit the copas loop +end) +print("test 5 success!") + + +-- finish doesn't return until all workers are done (finished handling the last queue item) +local result = {} +local passed = true +copas.loop(function() + local q = Queue:new() + q:push(1) + q:push(2) + q:push(3) + for i = 1,2 do -- add 2 workers + q:add_worker(function(n) + table.insert(result, "start item " .. n) + copas.pause(0.5) + table.insert(result, "end item " .. n) + end) + end + -- local s = now() + table.insert(result, "start queue") + copas.pause(0.75) + table.insert(result, "start finish") + local ok, err = q:finish() + table.insert(result, "finished "..tostring(ok).." "..tostring(err)) + copas.pause(1) + local expected = { + "start queue", + "start item 1", + "start item 2", + "end item 1", + "start item 3", + "end item 2", + "start finish", + "end item 3", + "finished true nil", + } + for i = 1, math.max(#result, #expected) do + if result[i] ~= expected[i] then + for n = 1, math.max(#result, #expected) do + print(n, result[n], expected[n], result[n] == expected[n] and "" or " <--- failed") + end + passed = false + break + end + end +end) +assert(passed, "test 6 failed!") +print("test 6 success!")