commit 494e2f3ed4564d1e4e0378152e78b1fb8ca802d6 from: Sergey Bronnikov date: Wed Jun 01 19:01:41 2022 UTC add possibility to run tests on cluster commit - dc01cc3710db3686a900ab2e2fed2cd837b4fcac commit + 494e2f3ed4564d1e4e0378152e78b1fb8ca802d6 blob - 71178ecb1a2b070125d571550b54f6087a9c12ca blob + dbc6dd4994e95c129e4decb28e05259caae2ceaf --- test/entrypoint/srv-tarantool.lua +++ test/entrypoint/srv-tarantool.lua @@ -1,30 +1,24 @@ #!/usr/bin/env tarantool -local workdir = os.getenv('TARANTOOL_WORKDIR') -local listen = os.getenv('TARANTOOL_LISTEN') -local qsync_quorum = os.getenv('TARANTOOL_QUORUM') -local qsync_peers = os.getenv('TARANTOOL_PEERS') +local json = require('json') -box.cfg({ - feedback_enabled = false, - listen = listen, - log_level = 6, - memtx_memory = 1024 * 1024 * 1024, - net_msg_max = 2 * 1024, - work_dir = workdir, - iproto_threads = 2, -}) - -if qsync_peers and qsync_quorum then - box.cfg.election_mode = 'candidate' - box.cfg.election_timeout = 0.5 - box.cfg.memtx_use_mvcc_engine = true - box.cfg.replication_synchro_quorum = qsync_quorum - box.cfg.replication_synchro_timeout = 0.2 - box.cfg.replication = { qsync_peers } - box.cfg.replication_timeout = 1 +local cfg = os.getenv('TARANTOOL_BOX_CFG') +if cfg == nil then + cfg = "{}" end +local res = json.decode(cfg) +assert(type(res) == 'table') +res.work_dir = os.getenv('TARANTOOL_WORKDIR') +res.listen = os.getenv('TARANTOOL_LISTEN') +res.feedback_enabled = false +res.log_level = 6 +res.memtx_memory = 1024 * 1024 * 1024 +res.net_msg_max = 2 * 1024 +res.iproto_threads = 2 + +box.cfg(res) + local function bootstrap() local space = box.schema.space.create('register_space') space:format({ @@ -44,6 +38,7 @@ local function bootstrap() box.schema.user.grant('guest', 'read,write', 'space', '_index') box.schema.user.grant('guest', 'write', 'space', '_schema') box.schema.user.grant('guest', 'write', 'space', '_space') + box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true}) end box.once('molly', bootstrap) @@ -127,3 +122,5 @@ function withdraw_multitable(space_name_source, space_ return true end + +_G.ready = true blob - /dev/null blob + 2fa67a4af18e971da593e11ec0d15236cc9b369b (mode 644) --- /dev/null +++ test/tarantool/qsync_test.lua @@ -0,0 +1,90 @@ +local t = require('luatest') +local log = require('log') +local Cluster = require('test.test-helpers.cluster') +local server = require('test.test-helpers.server') +local json = require('json') + +local pg = t.group('quorum_master', { + { engine = 'memtx' }, + { engine = 'vinyl' } +}) + +pg.before_each(function(cg) + local engine = cg.params.engine + cg.cluster = Cluster:new({}) + + cg.box_cfg = { + replication = { + server.build_instance_uri('master_quorum1'); + server.build_instance_uri('master_quorum2'); + server.build_instance_uri('master_quorum3'); + }; + election_timeout = 0.5, + memtx_use_mvcc_engine = true, + replication_connect_quorum = 0; + replication_synchro_timeout = 0.2, + replication_timeout = 0.1; + } + + cg.master_quorum1 = cg.cluster:build_server( + { + alias = 'master_quorum1', + engine = engine, + box_cfg = cg.box_cfg, + }) + + cg.master_quorum2 = cg.cluster:build_server( + { + alias = 'master_quorum2', + engine = engine, + box_cfg = cg.box_cfg, + }) + + cg.master_quorum3 = cg.cluster:build_server( + { + alias = 'master_quorum3', + engine = engine, + box_cfg = cg.box_cfg, + }) + + + pcall(log.cfg, {level = 6}) + +end) + +pg.after_each(function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + +pg.before_test('test_qsync_basic', 
function(cg) + cg.cluster:add_server(cg.master_quorum1) + cg.cluster:add_server(cg.master_quorum2) + cg.cluster:add_server(cg.master_quorum3) + cg.cluster:start() + local bootstrap_function = function() + box.schema.space.create('test', { + engine = os.getenv('TARANTOOL_ENGINE') + }) + box.space.test:create_index('primary') + end + cg.cluster:exec_on_leader(bootstrap_function) + +end) + +pg.after_test('test_qsync_basic', function(cg) + cg.cluster:drop({ + cg.master_quorum1, + cg.master_quorum2, + cg.master_quorum3, + }) +end) + +pg.test_qsync_basic = function(cg) + local repl = json.encode({replication = cg.box_cfg.replication}) + cg.master_quorum1:eval('box.cfg{replication = ""}') + t.assert_equals(cg.master_quorum1:eval('return box.space.test:insert{1}'), {1}) + cg.master_quorum1:eval(('box.cfg{replication = %s}'):format(repl.replication)) + cg.master_quorum2:wait_vclock_of(cg.master_quorum1) + --t.assert_equals(cg.master_quorum2:eval('return box.space.test:select()'), {{1}}) +end blob - 43e9722e56cd5743b13ef9685255b748a057f4cf blob + 79c2f7d6cc63b3afd43e3f8b7d526f1deb9eceba --- test/test-helpers/cluster.lua +++ test/test-helpers/cluster.lua @@ -1,10 +1,13 @@ +local checks = require('checks') local fio = require('fio') -local Server = require('test.luatest_helpers.server') +local luatest = require('luatest') -local root = os.environ()['SOURCEDIR'] or '.' +local Server = require('test.test-helpers.server') local Cluster = {} +local ROOT = os.environ()['SOURCEDIR'] or '.' + function Cluster:new(object) self:inherit(object) object:initialize() @@ -45,7 +48,7 @@ function Cluster:drop() end function Cluster:get_index(server) - local index = nil + local index for i, v in ipairs(self.servers) do if (v.id == server) then index = i @@ -92,8 +95,9 @@ function Cluster:start(opts) end function Cluster:build_server(server_config, instance_file) - instance_file = instance_file or 'default.lua' - server_config.command = fio.pathjoin(root, 'test/instances/', instance_file) + instance_file = instance_file or 'srv-tarantool.lua' + server_config = table.deepcopy(server_config) + server_config.command = fio.pathjoin(ROOT, 'test/entrypoint/', instance_file) assert(server_config.alias, 'Either replicaset.alias or server.alias must be given') local server = Server:new(server_config) table.insert(self.built_servers, server) @@ -126,4 +130,28 @@ function Cluster:exec_on_leader(bootstrap_function) return leader:exec(bootstrap_function) end +function Cluster:wait_fullmesh(params) + checks('table', {timeout = '?number', delay = '?number'}) + if not params then params = {} end + local config = {timeout = params.timeout or 30, delay = params.delay or 0.1} + + luatest.helpers.retrying(config, function(cluster) + for _, server1 in ipairs(cluster.servers) do + for _, server2 in ipairs(cluster.servers) do + if server1 ~= server2 then + local server1_id = server1:exec(function() + return box.info.id + end) + local server2_id = server2:exec(function() + return box.info.id + end) + if server1_id ~= server2_id then + server1:assert_follows_upstream(server2_id) + end + end + end + end + end, self) +end + return Cluster blob - /dev/null blob + 1ff7536a8f4f05f9478d685628c97950feb739a7 (mode 644) --- /dev/null +++ test/test-helpers/server.lua @@ -0,0 +1,336 @@ +local clock = require('clock') +local digest = require('digest') +local ffi = require('ffi') +local fiber = require('fiber') +local fio = require('fio') +local fun = require('fun') +local json = require('json') +local errno = require('errno') +local log = require('log') 
+local yaml = require('yaml') + +local checks = require('checks') +local luatest = require('luatest') + +ffi.cdef([[ + int kill(pid_t pid, int sig); +]]) + +local Server = luatest.Server:inherit({}) + +local WAIT_TIMEOUT = 60 +local WAIT_DELAY = 0.1 + +-- Differences from luatest.Server: +-- +-- * 'alias' is mandatory. +-- * 'command' is optional, assumed test/instances/default.lua by +-- default. +-- * 'datadir' is optional, specifies a directory: if specified, the directory's +-- contents will be recursively copied into 'workdir' during initialization. +-- * 'workdir' is optional, determined by 'alias'. +-- * The new 'box_cfg' parameter. +-- * engine - provides engine for parameterized tests +Server.constructor_checks = fun.chain(Server.constructor_checks, { + alias = 'string', + command = '?string', + datadir = '?string', + workdir = '?string', + box_cfg = '?table', + engine = '?string', +}):tomap() + +Server.socketdir = fio.abspath(os.getenv('VARDIR') or '/tmp/t') + +function Server.build_instance_uri(alias) + return ('%s/%s.iproto'):format(Server.socketdir, alias) +end + +function Server:initialize() + if self.id == nil then + local random = digest.urandom(9) + self.id = digest.base64_encode(random, {urlsafe = true}) + end + if self.workdir == nil then + self.workdir = ('%s/%s-%s'):format(self.socketdir, self.alias, self.id) + fio.rmtree(self.workdir) + fio.mktree(self.workdir) + end + if self.datadir ~= nil then + local ok, err = fio.copytree(self.datadir, self.workdir) + if not ok then + error(string.format('Failed to copy directory: %s', err)) + end + self.datadir = nil + end + if self.net_box_port == nil and self.net_box_uri == nil then + self.net_box_uri = self.build_instance_uri(self.alias) + fio.mktree(self.socketdir) + end + + -- AFAIU, the inner getmetatable() returns our helpers.Server + -- class, the outer one returns luatest.Server class. + getmetatable(getmetatable(self)).initialize(self) +end + +--- Generates environment to run process with. +-- The result is merged into os.environ(). +-- @return map +function Server:build_env() + local res = getmetatable(getmetatable(self)).build_env(self) + if self.box_cfg ~= nil then + res.TARANTOOL_BOX_CFG = json.encode(self.box_cfg) + end + res.TARANTOOL_ENGINE = self.engine + return res +end + +local function wait_cond(cond_name, server, func, ...) + local alias = server.alias + local id = server.id + local pid = server.process.pid + + local deadline = clock.time() + WAIT_TIMEOUT + while true do + if func(...) then + return + end + if clock.time() > deadline then + error(('Waiting for "%s" on server %s-%s (PID %d) timed out') + :format(cond_name, alias, id, pid)) + end + fiber.sleep(WAIT_DELAY) + end +end + +function Server:wait_for_readiness() + return wait_cond('readiness', self, function() + local ok, is_ready = pcall(function() + self:connect_net_box() + return self.net_box:eval('return _G.ready') == true + end) + return ok and is_ready + end) +end + +function Server:wait_election_leader() + -- Include read-only property too because if an instance is a leader, it + -- does not mean it finished the synchro queue ownership transition. It is + -- read-only until that happens. But in tests usually the leader is needed + -- as a writable node. 
+ return wait_cond('election leader', self, self.exec, self, function() + return box.info.election.state == 'leader' and not box.info.ro + end) +end + +function Server:wait_election_leader_found() + return wait_cond('election leader is found', self, self.exec, self, + function() return box.info.election.leader ~= 0 end) +end + +function Server:wait_election_term(term) + return wait_cond('election term', self, self.exec, self, function(term) + return box.info.election.term >= term + end, {term}) +end + +function Server:wait_synchro_queue_term(term) + return wait_cond('synchro queue term', self, self.exec, self, function(term) + return box.info.synchro.queue.term >= term + end, {term}) +end + +-- Unlike the original luatest.Server function it waits for +-- starting the server. +function Server:start(opts) + checks('table', { + wait_for_readiness = '?boolean', + }) + getmetatable(getmetatable(self)).start(self) + + -- The option is true by default. + local wait_for_readiness = true + if opts ~= nil and opts.wait_for_readiness ~= nil then + wait_for_readiness = opts.wait_for_readiness + end + + if wait_for_readiness then + self:wait_for_readiness() + end +end + +function Server:instance_id() + -- Cache the value when found it first time. + if self.instance_id_value then + return self.instance_id_value + end + local id = self:exec(function() return box.info.id end) + -- But do not cache 0 - it is an anon instance, its ID might change. + if id ~= 0 then + self.instance_id_value = id + end + return id +end + +function Server:instance_uuid() + -- Cache the value when found it first time. + if self.instance_uuid_value then + return self.instance_uuid_value + end + local uuid = self:exec(function() return box.info.uuid end) + self.instance_uuid_value = uuid + return uuid +end + +function Server:election_term() + return self:exec(function() return box.info.election.term end) +end + +function Server:synchro_queue_term() + return self:exec(function() return box.info.synchro.queue.term end) +end + +-- TODO: Add the 'wait_for_readiness' parameter for the restart() +-- method. + +-- Unlike the original luatest.Server function it waits until +-- the server will stop. +function Server:stop() + local alias = self.alias + local id = self.id + if self.process then + local pid = self.process.pid + getmetatable(getmetatable(self)).stop(self) + + local deadline = clock.time() + WAIT_TIMEOUT + while true do + if ffi.C.kill(pid, 0) ~= 0 then + break + end + if clock.time() > deadline then + error(('Stopping of server %s-%s (PID %d) was timed out'):format( + alias, id, pid)) + end + fiber.sleep(WAIT_DELAY) + end + end +end + +function Server:cleanup() + fio.rmtree(self.workdir) + self.instance_id_value = nil + self.instance_uuid_value = nil +end + +function Server:drop() + self:stop() + self:cleanup() +end + +-- A copy of test_run:grep_log. 
+function Server:grep_log(what, bytes, opts) + local opts = opts or {} + local noreset = opts.noreset or false + -- if instance has crashed provide filename to use grep_log + local filename = opts.filename or self:eval('return box.cfg.log') + local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'}) + + local function fail(msg) + local err = errno.strerror() + if file ~= nil then + file:close() + end + error(string.format("%s: %s: %s", msg, filename, err)) + end + + if file == nil then + fail("Failed to open log file") + end + io.flush() -- attempt to flush stdout == log fd + local filesize = file:seek(0, 'SEEK_END') + if filesize == nil then + fail("Failed to get log file size") + end + local bytes = bytes or 65536 -- don't read whole log - it can be huge + bytes = bytes > filesize and filesize or bytes + if file:seek(-bytes, 'SEEK_END') == nil then + fail("Failed to seek log file") + end + local found, buf + repeat -- read file in chunks + local s = file:read(2048) + if s == nil then + fail("Failed to read log file") + end + local pos = 1 + repeat -- split read string in lines + local endpos = string.find(s, '\n', pos) + endpos = endpos and endpos - 1 -- strip terminating \n + local line = string.sub(s, pos, endpos) + if endpos == nil and s ~= '' then + -- line doesn't end with \n or eof, append it to buffer + -- to be checked on next iteration + buf = buf or {} + table.insert(buf, line) + else + if buf ~= nil then -- prepend line with buffered data + table.insert(buf, line) + line = table.concat(buf) + buf = nil + end + if string.match(line, "Starting instance") and not noreset then + found = nil -- server was restarted, reset search + else + found = string.match(line, what) or found + end + end + pos = endpos and endpos + 2 -- jump to char after \n + until pos == nil + until s == '' + file:close() + return found +end + +function Server:assert_follows_upstream(server_id) + local status = self:exec(function(id) + return box.info.replication[id].upstream.status + end, {server_id}) + luatest.assert_equals(status, 'follow', + ('%s: server does not follow upstream'):format(self.alias)) +end + +function Server:get_vclock() + return self:exec(function() return box.info.vclock end) +end + +function Server:wait_vclock(to_vclock) + while true do + local vclock = self:get_vclock() + local ok = true + + for server_id, to_lsn in pairs(to_vclock) do + local lsn = vclock[server_id] + if lsn == nil or lsn < to_lsn then + ok = false + break + end + end + + if ok then + return + end + + log.info("wait vclock: %s to %s", + yaml.encode(vclock), yaml.encode(to_vclock)) + fiber.sleep(0.001) + end +end + +function Server:wait_vclock_of(other_server) + local vclock = other_server:get_vclock() + -- First component is for local changes. + vclock[0] = nil + return self:wait_vclock(vclock) +end + +return Server