Skip to content

workers are now dynamically changable #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 110 additions & 32 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,57 @@ local function _table2tuple ( qformat )
return dostring(fun)
end

local function run_worker(space, xq, worker_num)
local worker = {
space = space;
xq = xq;
num = worker_num;
}
worker.fname = space.name .. '.xq.wrk' .. tostring(worker_num)
if package.reload then
worker.generation = package.reload.count
worker.fname = worker.fname .. '.' .. package.reload.count
end
xq.workers.registered[worker.num] = worker
worker.fiber = fiber.create(function(space, xq, worker_num, worker)
fiber.name(string.sub(worker.fname,1,32))
repeat fiber.sleep(0.001) until space.xq
if worker.xq.ready then xq.ready:get() end
log.info("I am worker %s", worker_num)

while box.space[space.name] and space.xq == xq and xq.workers.registered[worker.num] do
if xq.workers.handler then
local task = space:take(1)
if task then
local key = xq:getkey(task)
local r,e = pcall(xq.workers.handler, task)

if not r then
log.error("Worker for {%s} has error: %s", key, e)
else
if xq.taken[ key ] then
space:ack(task)
end
end

if xq.taken[ key ] then
log.error("Worker for {%s} not released task", key)
space:release(task)
end
end
end
fiber.sleep(1)
end
log.info("worker %s ended", worker_num)
end, space, xq, worker_num, worker)

return setmetatable(worker, {
__serialize = function(self)
return ("[worker#%s] for %s"):format(self.num, self.space.name)
end;
})
end

local methods = {}

function M.upgrade(space,opts,depth)
Expand Down Expand Up @@ -594,44 +645,20 @@ function M.upgrade(space,opts,depth)
end

if opts.worker then
local workers = opts.workers or 1
local worker = opts.worker
for i = 1,workers do
fiber.create(function(space,xq,i)
local fname = space.name .. '.xq.wrk' .. tostring(i)
if package.reload then fname = fname .. '.' .. package.reload.count end
fiber.name(string.sub(fname,1,32))
repeat fiber.sleep(0.001) until space.xq
if xq.ready then xq.ready:get() end
log.info("I am worker %s",i)
while box.space[space.name] and space.xq == xq do
local task = space:take(1)
if task then
local key = xq:getkey(task)
local r,e = pcall(worker,task)
if not r then
log.error("Worker for {%s} has error: %s", key, e)
else
if xq.taken[ key ] then
space:ack(task)
end
end
if xq.taken[ key ] then
log.error("Worker for {%s} not released task", key)
space:release(task)
end
end
fiber.sleep(1)
end
log.info("worker %s ended", i)
end,space,self,i)
self.workers = {
count = opts.workers or 1;
handler = opts.worker;
registered = {};
}
for i = 1, self.workers.count do
run_worker(space, self, i)
end
end

if have_runat then
self.runat_chan = fiber.channel(0)
self.runat = fiber.create(function(space,xq,runat_index)
local fname = space.name .. '.xq'
local fname = space.name .. '.xq.runat'
if package.reload then fname = fname .. '.' .. package.reload.count end
fiber.name(string.sub(fname,1,32))
repeat fiber.sleep(0.001) until space.xq
Expand Down Expand Up @@ -1315,6 +1342,57 @@ function methods:stats(pretty)
return stats
end

function methods:set_worker(handler)
local xq = self.xq
if not xq.workers then
xq.workers = {
count = 1;
handler = handler;
registered = {};
}
else
xq.workers.handler = handler
end

for i = 1, xq.workers.count do
if not xq.workers.registered[i] then
run_worker(self, xq, i)
end
end

return self
end

function methods:set_workers_cnt(cnt)
assert(type(cnt) == 'number', "worker count must be a number")

local xq = self.xq
local old_cnt = xq.workers and xq.workers.count or 0
if not xq.workers then
xq.workers = {
handler = nil;
count = cnt;
registered = {};
}
else
xq.workers.count = cnt
end

if old_cnt > xq.workers.count then
for i = xq.workers.count + 1, old_cnt do
xq.workers.registered[i] = nil
end
elseif old_cnt < xq.workers.count then
for i = old_cnt, xq.workers.count do
if i ~= 0 then
run_worker(self, xq, i)
end
end
end

return self
end

setmetatable(M,{
__call = function(M, space, opts)
M.upgrade(space,opts,1)
Expand Down