Skip to content

Commit

Permalink
api: fix INIT state stuck
Browse files Browse the repository at this point in the history
Sometimes, instance could enter the queue initialization
while still not running (for example, left in the orphan mode).
This resulted in "lazy start". But Tarantool does not call
`box.cfg {}` after leaving orphan mode, so queue could stuck in the
`INIT` state.

Now if the instance is read-only,  separate fiber is watching for
updates of its mode.

Note that this fix works only for Tarantool versions >= 2.10.0.
This is because of used watchers.

Closes #226
  • Loading branch information
DerekBum authored and oleg-jukovec committed Apr 15, 2024
1 parent 5f2b145 commit aa7c092
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 14 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.

## [1.3.3] - 2023-09-13

### Fixed
Expand Down
58 changes: 44 additions & 14 deletions queue/init.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
local fiber = require('fiber')

local abstract = require('queue.abstract')
local queue_state = require('queue.abstract.queue_state')
local qc = require('queue.compat')
local queue = nil

-- load all core drivers
Expand All @@ -11,6 +14,10 @@ local core_drivers = {
limfifottl = require('queue.abstract.driver.limfifottl')
}

-- since:
-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4
local watchers_supported = qc.check_version({2, 10, 0})

local function register_driver(driver_name, tube_ctr)
if type(tube_ctr.create_space) ~= 'function' or
type(tube_ctr.new) ~= 'function' then
Expand Down Expand Up @@ -60,7 +67,20 @@ queue = setmetatable({
local orig_cfg = nil
local orig_call = nil

local wrapper_impl
local wrapper_impl, handle_instance_mode

local function rw_waiter()
fiber.name('queue instance rw waiter')
local wait_cond = fiber.cond()
local w = box.watch('box.status', function(_, new_status)
if new_status.is_ro == false then
wait_cond:signal()
end
end)
wait_cond:wait()
w:unregister()
handle_instance_mode()
end

local function cfg_wrapper(...)
box.cfg = orig_cfg
Expand All @@ -79,24 +99,22 @@ local function wrap_box_cfg()
orig_cfg = box.cfg
box.cfg = cfg_wrapper
elseif type(box.cfg) == 'table' then
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
if box.info.ro_reason == 'config' or not watchers_supported then
-- box.cfg after the first box.cfg call.
-- The another call could switch the mode.
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
else
-- Wait for the rw state.
fiber.new(rw_waiter)
end
else
error('The box.cfg type is unexpected: ' .. type(box.cfg))
end
end

function wrapper_impl(...)
local result = { pcall(box.cfg,...) }
if result[1] then
table.remove(result, 1)
else
wrap_box_cfg()
error(result[2])
end

function handle_instance_mode()
if box.info.ro == false then
local abstract = require 'queue.abstract'
for name, val in pairs(abstract) do
Expand All @@ -113,6 +131,18 @@ function wrapper_impl(...)
-- with read_only = false
wrap_box_cfg()
end
end

function wrapper_impl(...)
local result = { pcall(box.cfg,...) }
if result[1] then
table.remove(result, 1)
else
wrap_box_cfg()
error(result[2])
end

handle_instance_mode()
return unpack(result)
end

Expand Down
97 changes: 97 additions & 0 deletions t/230-orphan-not-stalling-init.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env tarantool

local test = require('tap').test('')
local queue = require('queue')
local tnt = require('t.tnt')
local fio = require('fio')
local fiber = require('fiber')

rawset(_G, 'queue', require('queue'))

local qc = require('queue.compat')
if not qc.check_version({2, 10, 0}) then
require('log').info('Tests skipped, tarantool version < 2.10.0 ' ..
'does not support the lazy init')
return
end

local snapdir_optname = qc.snapdir_optname
local logger_optname = qc.logger_optname

test:plan(1)

test:test('Check orphan mode not stalling queue', function(test)
test:plan(4)
local engine = os.getenv('ENGINE') or 'memtx'
tnt.cluster.cfg{}

local dir_replica = fio.tempdir()
local cmd_replica = {
arg[-1],
'-e',
[[
box.cfg {
replication = {
'replicator:[email protected]:3399',
'replicator:[email protected]:3398',
},
listen = '127.0.0.1:3396',
wal_dir = ']] .. dir_replica .. '\'' ..
',' .. snapdir_optname() .. ' = \'' .. dir_replica .. '\'' ..
',' .. logger_optname() .. ' = \'' ..
fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' ..
'}'
}

replica = require('popen').new(cmd_replica, {
stdin = 'devnull',
stdout = 'devnull',
stderr = 'devnull',
})

local attempts = 0
-- Wait for replica to connect.
while box.info.replication[3] == nil or
box.info.replication[3].downstream.status ~= 'follow' do

attempts = attempts + 1
if attempts == 30 then
error('wait for replica connection')
end
fiber.sleep(0.1)
end

local conn = require('net.box').connect('127.0.0.1:3396')

conn:eval([[
box.cfg{
replication = {
'replicator:[email protected]:3399',
'replicator:[email protected]:3398',
'replicator:[email protected]:3396',
},
listen = '127.0.0.1:3397',
replication_connect_quorum = 4,
}
]])

conn:eval('rawset(_G, "queue", require("queue"))')

test:is(conn:call('queue.state'), 'INIT', 'check queue state')
test:is(conn:call('box.info').ro, true, 'check read only')
test:is(conn:call('box.info').ro_reason, 'orphan', 'check ro reason')

conn:eval('box.cfg{replication_connect_quorum = 2}')

local attempts = 0
while conn:call('queue.state') ~= 'RUNNING' and attempts < 50 do
fiber.sleep(0.1)
attempts = attempts + 1
end
test:is(conn:call('queue.state'), 'RUNNING', 'check queue state after orphan')
end)

rawset(_G, 'queue', nil)
tnt.finish()
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :

0 comments on commit aa7c092

Please sign in to comment.