commit - dc01cc3710db3686a900ab2e2fed2cd837b4fcac
commit + 494e2f3ed4564d1e4e0378152e78b1fb8ca802d6
blob - 71178ecb1a2b070125d571550b54f6087a9c12ca
blob + dbc6dd4994e95c129e4decb28e05259caae2ceaf
--- test/entrypoint/srv-tarantool.lua
+++ test/entrypoint/srv-tarantool.lua
#!/usr/bin/env tarantool
-local workdir = os.getenv('TARANTOOL_WORKDIR')
-local listen = os.getenv('TARANTOOL_LISTEN')
-local qsync_quorum = os.getenv('TARANTOOL_QUORUM')
-local qsync_peers = os.getenv('TARANTOOL_PEERS')
+local json = require('json')
-box.cfg({
- feedback_enabled = false,
- listen = listen,
- log_level = 6,
- memtx_memory = 1024 * 1024 * 1024,
- net_msg_max = 2 * 1024,
- work_dir = workdir,
- iproto_threads = 2,
-})
-
-if qsync_peers and qsync_quorum then
- box.cfg.election_mode = 'candidate'
- box.cfg.election_timeout = 0.5
- box.cfg.memtx_use_mvcc_engine = true
- box.cfg.replication_synchro_quorum = qsync_quorum
- box.cfg.replication_synchro_timeout = 0.2
- box.cfg.replication = { qsync_peers }
- box.cfg.replication_timeout = 1
+-- Base box.cfg comes from the environment as a JSON object (set by the
+-- server helper's build_env()); fall back to an empty config.
+local cfg = os.getenv('TARANTOOL_BOX_CFG')
+if cfg == nil then
+    cfg = "{}"
end
+local res = json.decode(cfg)
+assert(type(res) == 'table')
+-- Fixed harness settings below override whatever the caller passed in.
+res.work_dir = os.getenv('TARANTOOL_WORKDIR')
+res.listen = os.getenv('TARANTOOL_LISTEN')
+res.feedback_enabled = false
+res.log_level = 6
+res.memtx_memory = 1024 * 1024 * 1024
+res.net_msg_max = 2 * 1024
+res.iproto_threads = 2
+
+box.cfg(res)
+
+
local function bootstrap()
local space = box.schema.space.create('register_space')
space:format({
box.schema.user.grant('guest', 'read,write', 'space', '_index')
box.schema.user.grant('guest', 'write', 'space', '_schema')
box.schema.user.grant('guest', 'write', 'space', '_space')
+ box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true})
end
box.once('molly', bootstrap)
return true
end
+
+_G.ready = true
blob - /dev/null
blob + 2fa67a4af18e971da593e11ec0d15236cc9b369b (mode 644)
--- /dev/null
+++ test/tarantool/qsync_test.lua
+local t = require('luatest')
+local log = require('log')
+local Cluster = require('test.test-helpers.cluster')
+local server = require('test.test-helpers.server')
+local json = require('json')
+
+local pg = t.group('quorum_master', {
+ { engine = 'memtx' },
+ { engine = 'vinyl' }
+})
+
+-- Build (but do not register or start) a three-node full-mesh replica
+-- set: every node lists all three instance URIs in box.cfg.replication.
+-- Registration happens later in before_test.
+pg.before_each(function(cg)
+    local engine = cg.params.engine
+    cg.cluster = Cluster:new({})
+
+    -- One shared config table is safe here: Cluster:build_server()
+    -- deep-copies the server config before use.
+    cg.box_cfg = {
+        replication = {
+            server.build_instance_uri('master_quorum1');
+            server.build_instance_uri('master_quorum2');
+            server.build_instance_uri('master_quorum3');
+        };
+        election_timeout = 0.5,
+        memtx_use_mvcc_engine = true,
+        replication_connect_quorum = 0;
+        replication_synchro_timeout = 0.2,
+        replication_timeout = 0.1;
+    }
+
+    cg.master_quorum1 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum1',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+        })
+
+    cg.master_quorum2 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum2',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+        })
+
+    cg.master_quorum3 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum3',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+        })
+
+
+    -- Verbose logging for the harness itself; pcall() keeps a log.cfg
+    -- failure (e.g. unsupported option) from failing the test run.
+    pcall(log.cfg, {level = 6})
+
+end)
+
+pg.after_each(function(cg)
+    -- NOTE(review): servers is cleared *before* drop(); if Cluster:drop()
+    -- iterates self.servers to stop instances, they would be leaked --
+    -- verify against test-helpers/cluster.lua.
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+-- Register and start all three masters, then create the 'test' space
+-- (with the parameterized engine) on the elected leader.
+pg.before_test('test_qsync_basic', function(cg)
+    cg.cluster:add_server(cg.master_quorum1)
+    cg.cluster:add_server(cg.master_quorum2)
+    cg.cluster:add_server(cg.master_quorum3)
+    cg.cluster:start()
+    local bootstrap_function = function()
+        -- Runs remotely on the leader; TARANTOOL_ENGINE is exported by
+        -- the server helper's build_env() from the 'engine' option.
+        box.schema.space.create('test', {
+            engine = os.getenv('TARANTOOL_ENGINE')
+        })
+        box.space.test:create_index('primary')
+    end
+    cg.cluster:exec_on_leader(bootstrap_function)
+
+end)
+
+-- Stop and clean up all three instances after the test.
+pg.after_test('test_qsync_basic', function(cg)
+    cg.cluster:drop({
+        cg.master_quorum1,
+        cg.master_quorum2,
+        cg.master_quorum3,
+    })
+end)
+
+-- Detach master_quorum1 from replication, insert a row while detached,
+-- re-attach it, and wait until master_quorum2 catches up.
+pg.test_qsync_basic = function(cg)
+    local repl = cg.box_cfg.replication
+    -- Detach the node from the replica set.
+    cg.master_quorum1:eval('box.cfg{replication = ""}')
+    t.assert_equals(cg.master_quorum1:eval('return box.space.test:insert{1}'), {1})
+    -- Re-attach.  The old code did json.encode({replication = ...}) and
+    -- then indexed the *string* result (repl.replication -> nil), so
+    -- format() received nil and raised; splicing JSON into Lua source
+    -- would also be invalid syntax.  Pass the table via exec() args.
+    cg.master_quorum1:exec(function(uris)
+        box.cfg{replication = uris}
+    end, {repl})
+    cg.master_quorum2:wait_vclock_of(cg.master_quorum1)
+    --t.assert_equals(cg.master_quorum2:eval('return box.space.test:select()'), {{1}})
+end
blob - 43e9722e56cd5743b13ef9685255b748a057f4cf
blob + 79c2f7d6cc63b3afd43e3f8b7d526f1deb9eceba
--- test/test-helpers/cluster.lua
+++ test/test-helpers/cluster.lua
+local checks = require('checks')
local fio = require('fio')
-local Server = require('test.luatest_helpers.server')
+local luatest = require('luatest')
-local root = os.environ()['SOURCEDIR'] or '.'
+local Server = require('test.test-helpers.server')
local Cluster = {}
+local ROOT = os.environ()['SOURCEDIR'] or '.'
+
function Cluster:new(object)
self:inherit(object)
object:initialize()
end
function Cluster:get_index(server)
- local index = nil
+ local index
for i, v in ipairs(self.servers) do
if (v.id == server) then
index = i
end
function Cluster:build_server(server_config, instance_file)
- instance_file = instance_file or 'default.lua'
- server_config.command = fio.pathjoin(root, 'test/instances/', instance_file)
+ instance_file = instance_file or 'srv-tarantool.lua'
+ server_config = table.deepcopy(server_config)
+ server_config.command = fio.pathjoin(ROOT, 'test/entrypoint/', instance_file)
assert(server_config.alias, 'Either replicaset.alias or server.alias must be given')
local server = Server:new(server_config)
table.insert(self.built_servers, server)
return leader:exec(bootstrap_function)
end
+-- Wait (default timeout 30 s, polling every 0.1 s) until every server
+-- reports an upstream in 'follow' state for every other server.
+-- @tparam ?table params optional {timeout = ?number, delay = ?number}
+function Cluster:wait_fullmesh(params)
+    checks('table', {timeout = '?number', delay = '?number'})
+    -- NOTE(review): if checks() rejects a nil second argument, this
+    -- fallback is unreachable -- confirm against the checks library.
+    if not params then params = {} end
+    local config = {timeout = params.timeout or 30, delay = params.delay or 0.1}
+
+    luatest.helpers.retrying(config, function(cluster)
+        for _, server1 in ipairs(cluster.servers) do
+            for _, server2 in ipairs(cluster.servers) do
+                if server1 ~= server2 then
+                    local server1_id = server1:exec(function()
+                        return box.info.id
+                    end)
+                    local server2_id = server2:exec(function()
+                        return box.info.id
+                    end)
+                    -- Guard against two helper objects pointing at the
+                    -- same instance (equal box.info.id).
+                    if server1_id ~= server2_id then
+                        server1:assert_follows_upstream(server2_id)
+                    end
+                end
+            end
+        end
+    end, self)
+end
+
return Cluster
blob - /dev/null
blob + 1ff7536a8f4f05f9478d685628c97950feb739a7 (mode 644)
--- /dev/null
+++ test/test-helpers/server.lua
+local clock = require('clock')
+local digest = require('digest')
+local ffi = require('ffi')
+local fiber = require('fiber')
+local fio = require('fio')
+local fun = require('fun')
+local json = require('json')
+local errno = require('errno')
+local log = require('log')
+local yaml = require('yaml')
+
+local checks = require('checks')
+local luatest = require('luatest')
+
+ffi.cdef([[
+ int kill(pid_t pid, int sig);
+]])
+
+local Server = luatest.Server:inherit({})
+
+local WAIT_TIMEOUT = 60
+local WAIT_DELAY = 0.1
+
+-- Differences from luatest.Server:
+--
+-- * 'alias' is mandatory.
+-- * 'command' is optional, assumed test/instances/default.lua by
+-- default.
+-- * 'datadir' is optional, specifies a directory: if specified, the directory's
+-- contents will be recursively copied into 'workdir' during initialization.
+-- * 'workdir' is optional, determined by 'alias'.
+-- * The new 'box_cfg' parameter.
+-- * engine - provides engine for parameterized tests
+Server.constructor_checks = fun.chain(Server.constructor_checks, {
+ alias = 'string',
+ command = '?string',
+ datadir = '?string',
+ workdir = '?string',
+ box_cfg = '?table',
+ engine = '?string',
+}):tomap()
+
+Server.socketdir = fio.abspath(os.getenv('VARDIR') or '/tmp/t')
+
+-- Build the unix-socket iproto URI for an instance alias under socketdir.
+function Server.build_instance_uri(alias)
+    return ('%s/%s.iproto'):format(Server.socketdir, alias)
+end
+
+-- Fill in id, workdir, optional datadir contents and the net_box URI,
+-- then delegate to luatest.Server's initialize().
+function Server:initialize()
+    -- Random URL-safe id keeps workdirs of same-alias servers distinct.
+    if self.id == nil then
+        local random = digest.urandom(9)
+        self.id = digest.base64_encode(random, {urlsafe = true})
+    end
+    -- Workdir is derived from alias+id and recreated from scratch.
+    if self.workdir == nil then
+        self.workdir = ('%s/%s-%s'):format(self.socketdir, self.alias, self.id)
+        fio.rmtree(self.workdir)
+        fio.mktree(self.workdir)
+    end
+    -- Pre-populate the workdir from a fixture directory, if given.
+    if self.datadir ~= nil then
+        local ok, err = fio.copytree(self.datadir, self.workdir)
+        if not ok then
+            error(string.format('Failed to copy directory: %s', err))
+        end
+        self.datadir = nil
+    end
+    if self.net_box_port == nil and self.net_box_uri == nil then
+        self.net_box_uri = self.build_instance_uri(self.alias)
+        fio.mktree(self.socketdir)
+    end
+
+    -- AFAIU, the inner getmetatable() returns our helpers.Server
+    -- class, the outer one returns luatest.Server class.
+    getmetatable(getmetatable(self)).initialize(self)
+end
+
+--- Generates environment to run process with.
+-- The result is merged into os.environ().
+-- @return map
+--- Generates environment to run process with.
+-- The result is merged into os.environ().
+-- box_cfg is serialized to JSON into TARANTOOL_BOX_CFG (decoded by the
+-- srv-tarantool.lua entrypoint); engine is exported as TARANTOOL_ENGINE.
+-- @return map
+function Server:build_env()
+    local res = getmetatable(getmetatable(self)).build_env(self)
+    if self.box_cfg ~= nil then
+        res.TARANTOOL_BOX_CFG = json.encode(self.box_cfg)
+    end
+    res.TARANTOOL_ENGINE = self.engine
+    return res
+end
+
+-- Poll func(...) until it returns a truthy value or WAIT_TIMEOUT (60 s)
+-- expires, sleeping WAIT_DELAY (0.1 s) between attempts.  cond_name and
+-- the server's alias/id/PID only decorate the timeout error message.
+-- Raises on timeout.
+local function wait_cond(cond_name, server, func, ...)
+    local alias = server.alias
+    local id = server.id
+    local pid = server.process.pid
+
+    local deadline = clock.time() + WAIT_TIMEOUT
+    while true do
+        if func(...) then
+            return
+        end
+        if clock.time() > deadline then
+            error(('Waiting for "%s" on server %s-%s (PID %d) timed out')
+                :format(cond_name, alias, id, pid))
+        end
+        fiber.sleep(WAIT_DELAY)
+    end
+end
+
+-- Block until the instance accepts net.box connections and its
+-- entrypoint has set _G.ready = true; raises after WAIT_TIMEOUT.
+function Server:wait_for_readiness()
+    return wait_cond('readiness', self, function()
+        -- pcall() absorbs connection errors while the server boots.
+        local ok, is_ready = pcall(function()
+            self:connect_net_box()
+            return self.net_box:eval('return _G.ready') == true
+        end)
+        return ok and is_ready
+    end)
+end
+
+-- Wait until this instance becomes a writable Raft leader.
+function Server:wait_election_leader()
+    -- Include read-only property too because if an instance is a leader, it
+    -- does not mean it finished the synchro queue ownership transition. It is
+    -- read-only until that happens. But in tests usually the leader is needed
+    -- as a writable node.
+    return wait_cond('election leader', self, self.exec, self, function()
+        return box.info.election.state == 'leader' and not box.info.ro
+    end)
+end
+
+-- Wait until this instance sees *some* elected leader (id ~= 0).
+function Server:wait_election_leader_found()
+    return wait_cond('election leader is found', self, self.exec, self,
+                     function() return box.info.election.leader ~= 0 end)
+end
+
+-- Wait until the instance's Raft term reaches at least `term`.
+function Server:wait_election_term(term)
+    return wait_cond('election term', self, self.exec, self, function(term)
+        return box.info.election.term >= term
+    end, {term})
+end
+
+-- Wait until the synchro queue's term reaches at least `term`.
+function Server:wait_synchro_queue_term(term)
+    return wait_cond('synchro queue term', self, self.exec, self, function(term)
+        return box.info.synchro.queue.term >= term
+    end, {term})
+end
+
+-- Unlike the original luatest.Server function it waits for
+-- starting the server.
+-- Unlike the original luatest.Server function it waits for
+-- starting the server.
+-- @tparam ?table opts {wait_for_readiness = ?boolean} (default true)
+function Server:start(opts)
+    checks('table', {
+        wait_for_readiness = '?boolean',
+    })
+    getmetatable(getmetatable(self)).start(self)
+
+    -- The option is true by default.
+    local wait_for_readiness = true
+    if opts ~= nil and opts.wait_for_readiness ~= nil then
+        wait_for_readiness = opts.wait_for_readiness
+    end
+
+    if wait_for_readiness then
+        self:wait_for_readiness()
+    end
+end
+
+-- Return box.info.id of the instance, caching non-zero values.
+function Server:instance_id()
+    -- Cache the value when found it first time.
+    if self.instance_id_value then
+        return self.instance_id_value
+    end
+    local id = self:exec(function() return box.info.id end)
+    -- But do not cache 0 - it is an anon instance, its ID might change.
+    if id ~= 0 then
+        self.instance_id_value = id
+    end
+    return id
+end
+
+-- Return box.info.uuid of the instance, cached after the first call.
+function Server:instance_uuid()
+    -- Cache the value when found it first time.
+    if self.instance_uuid_value then
+        return self.instance_uuid_value
+    end
+    local uuid = self:exec(function() return box.info.uuid end)
+    self.instance_uuid_value = uuid
+    return uuid
+end
+
+-- Current Raft term as seen by the instance (not cached).
+function Server:election_term()
+    return self:exec(function() return box.info.election.term end)
+end
+
+-- Current synchro queue term as seen by the instance (not cached).
+function Server:synchro_queue_term()
+    return self:exec(function() return box.info.synchro.queue.term end)
+end
+
+-- TODO: Add the 'wait_for_readiness' parameter for the restart()
+-- method.
+
+-- Unlike the original luatest.Server function it waits until
+-- the server will stop.
+-- Unlike the original luatest.Server function it waits until
+-- the server will stop.
+-- No-op if the process was never started.  Raises on timeout.
+function Server:stop()
+    local alias = self.alias
+    local id = self.id
+    if self.process then
+        local pid = self.process.pid
+        getmetatable(getmetatable(self)).stop(self)
+
+        -- Poll with kill(pid, 0): a non-zero result means the process
+        -- is gone (or no longer signalable), i.e. it has exited.
+        local deadline = clock.time() + WAIT_TIMEOUT
+        while true do
+            if ffi.C.kill(pid, 0) ~= 0 then
+                break
+            end
+            if clock.time() > deadline then
+                error(('Stopping of server %s-%s (PID %d) was timed out'):format(
+                    alias, id, pid))
+            end
+            fiber.sleep(WAIT_DELAY)
+        end
+    end
+end
+
+-- Remove the workdir and forget cached instance id/uuid.
+function Server:cleanup()
+    fio.rmtree(self.workdir)
+    self.instance_id_value = nil
+    self.instance_uuid_value = nil
+end
+
+-- Stop the instance and wipe its on-disk state.
+function Server:drop()
+    self:stop()
+    self:cleanup()
+end
+
+-- A copy of test_run:grep_log.
+function Server:grep_log(what, bytes, opts)
+ local opts = opts or {}
+ local noreset = opts.noreset or false
+ -- if instance has crashed provide filename to use grep_log
+ local filename = opts.filename or self:eval('return box.cfg.log')
+ local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
+
+ local function fail(msg)
+ local err = errno.strerror()
+ if file ~= nil then
+ file:close()
+ end
+ error(string.format("%s: %s: %s", msg, filename, err))
+ end
+
+ if file == nil then
+ fail("Failed to open log file")
+ end
+ io.flush() -- attempt to flush stdout == log fd
+ local filesize = file:seek(0, 'SEEK_END')
+ if filesize == nil then
+ fail("Failed to get log file size")
+ end
+ local bytes = bytes or 65536 -- don't read whole log - it can be huge
+ bytes = bytes > filesize and filesize or bytes
+ if file:seek(-bytes, 'SEEK_END') == nil then
+ fail("Failed to seek log file")
+ end
+ local found, buf
+ repeat -- read file in chunks
+ local s = file:read(2048)
+ if s == nil then
+ fail("Failed to read log file")
+ end
+ local pos = 1
+ repeat -- split read string in lines
+ local endpos = string.find(s, '\n', pos)
+ endpos = endpos and endpos - 1 -- strip terminating \n
+ local line = string.sub(s, pos, endpos)
+ if endpos == nil and s ~= '' then
+ -- line doesn't end with \n or eof, append it to buffer
+ -- to be checked on next iteration
+ buf = buf or {}
+ table.insert(buf, line)
+ else
+ if buf ~= nil then -- prepend line with buffered data
+ table.insert(buf, line)
+ line = table.concat(buf)
+ buf = nil
+ end
+ if string.match(line, "Starting instance") and not noreset then
+ found = nil -- server was restarted, reset search
+ else
+ found = string.match(line, what) or found
+ end
+ end
+ pos = endpos and endpos + 2 -- jump to char after \n
+ until pos == nil
+ until s == ''
+ file:close()
+ return found
+end
+
+-- Assert that this instance's upstream from replica `server_id` is in
+-- the 'follow' state.
+function Server:assert_follows_upstream(server_id)
+    local status = self:exec(function(id)
+        return box.info.replication[id].upstream.status
+    end, {server_id})
+    luatest.assert_equals(status, 'follow',
+        ('%s: server does not follow upstream'):format(self.alias))
+end
+
+-- Return the instance's current vclock (map: server id -> lsn).
+function Server:get_vclock()
+    return self:exec(function() return box.info.vclock end)
+end
+
+-- Busy-wait (0.001 s naps, no timeout) until this instance's vclock
+-- reaches `to_vclock` component-wise.
+function Server:wait_vclock(to_vclock)
+    while true do
+        local vclock = self:get_vclock()
+        local ok = true
+
+        for server_id, to_lsn in pairs(to_vclock) do
+            local lsn = vclock[server_id]
+            if lsn == nil or lsn < to_lsn then
+                ok = false
+                break
+            end
+        end
+
+        if ok then
+            return
+        end
+
+        log.info("wait vclock: %s to %s",
+                 yaml.encode(vclock), yaml.encode(to_vclock))
+        fiber.sleep(0.001)
+    end
+end
+
+-- Wait until this instance has replicated everything `other_server`
+-- currently has (its local-only component 0 excluded).
+function Server:wait_vclock_of(other_server)
+    local vclock = other_server:get_vclock()
+    -- First component is for local changes.
+    vclock[0] = nil
+    return self:wait_vclock(vclock)
+end
+
+return Server