Commit Diff


commit - dc01cc3710db3686a900ab2e2fed2cd837b4fcac
commit + 494e2f3ed4564d1e4e0378152e78b1fb8ca802d6
blob - 71178ecb1a2b070125d571550b54f6087a9c12ca
blob + dbc6dd4994e95c129e4decb28e05259caae2ceaf
--- test/entrypoint/srv-tarantool.lua
+++ test/entrypoint/srv-tarantool.lua
@@ -1,30 +1,24 @@
 #!/usr/bin/env tarantool
 
-local workdir = os.getenv('TARANTOOL_WORKDIR')
-local listen = os.getenv('TARANTOOL_LISTEN')
-local qsync_quorum = os.getenv('TARANTOOL_QUORUM')
-local qsync_peers = os.getenv('TARANTOOL_PEERS')
+local json = require('json')
 
-box.cfg({
-    feedback_enabled = false,
-    listen = listen,
-    log_level = 6,
-    memtx_memory = 1024 * 1024 * 1024,
-    net_msg_max = 2 * 1024,
-    work_dir = workdir,
-    iproto_threads = 2,
-})
-
-if qsync_peers and qsync_quorum then
-    box.cfg.election_mode = 'candidate'
-    box.cfg.election_timeout = 0.5
-    box.cfg.memtx_use_mvcc_engine = true
-    box.cfg.replication_synchro_quorum = qsync_quorum
-    box.cfg.replication_synchro_timeout = 0.2
-    box.cfg.replication = { qsync_peers }
-    box.cfg.replication_timeout = 1
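+-- The test harness (test/test-helpers/server.lua) passes per-instance box.cfg
+-- options as a JSON-encoded TARANTOOL_BOX_CFG environment variable.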
+local cfg = os.getenv('TARANTOOL_BOX_CFG')
+if cfg == nil then
+    cfg = "{}"
 end
+local res = json.decode(cfg)
+assert(type(res) == 'table')
 
+res.work_dir = os.getenv('TARANTOOL_WORKDIR')
+res.listen = os.getenv('TARANTOOL_LISTEN')
+res.feedback_enabled = false
+res.log_level = 6
+res.memtx_memory = 1024 * 1024 * 1024
+res.net_msg_max = 2 * 1024
+res.iproto_threads = 2
+
+box.cfg(res)
+
 local function bootstrap()
     local space = box.schema.space.create('register_space')
     space:format({
@@ -44,6 +38,7 @@ local function bootstrap()
     box.schema.user.grant('guest', 'read,write', 'space', '_index')
     box.schema.user.grant('guest', 'write', 'space', '_schema')
     box.schema.user.grant('guest', 'write', 'space', '_space')
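+    -- Grant the 'super' role to guest (idempotent via if_not_exists), so the
+    -- tests can perform any operation over iproto.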
+    box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true})
 end
 
 box.once('molly', bootstrap)
@@ -127,3 +122,5 @@ function withdraw_multitable(space_name_source, space_
 
     return true
 end
+
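+-- Readiness flag: the test harness polls _G.ready via net.box to detect that
+-- the instance has finished configuration (see Server:wait_for_readiness()).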
+_G.ready = true
blob - /dev/null
blob + 2fa67a4af18e971da593e11ec0d15236cc9b369b (mode 644)
--- /dev/null
+++ test/tarantool/qsync_test.lua
@@ -0,0 +1,90 @@
+local t = require('luatest')
+local log = require('log')
+local Cluster = require('test.test-helpers.cluster')
+local server = require('test.test-helpers.server')
+local json = require('json')
+
+local pg = t.group('quorum_master', {
+    { engine = 'memtx' },
+    { engine = 'vinyl' }
+})
+
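+-- Every test case gets a fresh three-master full-mesh cluster; the group is
+-- parameterized by storage engine.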
+pg.before_each(function(cg)
+    local engine = cg.params.engine
+    cg.cluster = Cluster:new({})
+
+    cg.box_cfg = {
+        replication = {
+            server.build_instance_uri('master_quorum1');
+            server.build_instance_uri('master_quorum2');
+            server.build_instance_uri('master_quorum3');
+        };
+        election_timeout = 0.5,
+        memtx_use_mvcc_engine = true,
+        replication_connect_quorum = 0;
+        replication_synchro_timeout = 0.2,
+        replication_timeout = 0.1;
+    }
+
+    cg.master_quorum1 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum1',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+    })
+
+    cg.master_quorum2 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum2',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+    })
+
+    cg.master_quorum3 = cg.cluster:build_server(
+        {
+            alias = 'master_quorum3',
+            engine = engine,
+            box_cfg = cg.box_cfg,
+    })
+
+    pcall(log.cfg, {level = 6})
+end)
+
+pg.after_each(function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+pg.before_test('test_qsync_basic', function(cg)
+    cg.cluster:add_server(cg.master_quorum1)
+    cg.cluster:add_server(cg.master_quorum2)
+    cg.cluster:add_server(cg.master_quorum3)
+    cg.cluster:start()
+    local bootstrap_function = function()
+        box.schema.space.create('test', {
+            engine = os.getenv('TARANTOOL_ENGINE')
+        })
+        box.space.test:create_index('primary')
+    end
+    cg.cluster:exec_on_leader(bootstrap_function)
+end)
+
+pg.after_test('test_qsync_basic', function(cg)
+    cg.cluster:drop({
+        cg.master_quorum1,
+        cg.master_quorum2,
+        cg.master_quorum3,
+    })
+end)
+
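+-- Scenario: turn replication off on the first master, insert a row there,
+-- turn replication back on and wait until the second master reaches the
+-- first master's vclock.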
+pg.test_qsync_basic = function(cg)
+    local repl = json.encode({replication = cg.box_cfg.replication})
+    cg.master_quorum1:eval('box.cfg{replication = ""}')
+    t.assert_equals(cg.master_quorum1:eval('return box.space.test:insert{1}'), {1})
+    -- Restore the replication configuration saved above.
+    cg.master_quorum1:eval(("box.cfg(require('json').decode(%q))"):format(repl))
+    cg.master_quorum2:wait_vclock_of(cg.master_quorum1)
+    --t.assert_equals(cg.master_quorum2:eval('return box.space.test:select()'), {{1}})
+end
blob - 43e9722e56cd5743b13ef9685255b748a057f4cf
blob + 79c2f7d6cc63b3afd43e3f8b7d526f1deb9eceba
--- test/test-helpers/cluster.lua
+++ test/test-helpers/cluster.lua
@@ -1,10 +1,13 @@
+local checks = require('checks')
 local fio = require('fio')
-local Server = require('test.luatest_helpers.server')
+local luatest = require('luatest')
 
-local root = os.environ()['SOURCEDIR'] or '.'
+local Server = require('test.test-helpers.server')
 
 local Cluster = {}
 
+local ROOT = os.environ()['SOURCEDIR'] or '.'
+
 function Cluster:new(object)
     self:inherit(object)
     object:initialize()
@@ -45,7 +48,7 @@ function Cluster:drop()
 end
 
 function Cluster:get_index(server)
-    local index = nil
+    local index
     for i, v in ipairs(self.servers) do
         if (v.id == server) then
           index = i
@@ -92,8 +95,9 @@ function Cluster:start(opts)
 end
 
 function Cluster:build_server(server_config, instance_file)
-    instance_file = instance_file or 'default.lua'
-    server_config.command = fio.pathjoin(root, 'test/instances/', instance_file)
+    instance_file = instance_file or 'srv-tarantool.lua'
+    server_config = table.deepcopy(server_config)
+    server_config.command = fio.pathjoin(ROOT, 'test/entrypoint/', instance_file)
     assert(server_config.alias, 'Either replicaset.alias or server.alias must be given')
     local server = Server:new(server_config)
     table.insert(self.built_servers, server)
@@ -126,4 +130,28 @@ function Cluster:exec_on_leader(bootstrap_function)
     return leader:exec(bootstrap_function)
 end
 
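+-- Waits (with retries) until every server reports a 'follow' upstream status
+-- for every other server, i.e. the replication mesh is fully established.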
+function Cluster:wait_fullmesh(params)
+    checks('table', {timeout = '?number', delay = '?number'})
+    if not params then params = {} end
+    local config = {timeout = params.timeout or 30, delay = params.delay or 0.1}
+
+    luatest.helpers.retrying(config, function(cluster)
+        for _, server1 in ipairs(cluster.servers) do
+            for _, server2 in ipairs(cluster.servers) do
+                if server1 ~= server2 then
+                    local server1_id = server1:exec(function()
+                        return box.info.id
+                    end)
+                    local server2_id = server2:exec(function()
+                        return box.info.id
+                    end)
+                    if server1_id ~= server2_id then
+                        server1:assert_follows_upstream(server2_id)
+                    end
+                end
+            end
+        end
+    end, self)
+end
+
 return Cluster
blob - /dev/null
blob + 1ff7536a8f4f05f9478d685628c97950feb739a7 (mode 644)
--- /dev/null
+++ test/test-helpers/server.lua
@@ -0,0 +1,336 @@
+local clock = require('clock')
+local digest = require('digest')
+local ffi = require('ffi')
+local fiber = require('fiber')
+local fio = require('fio')
+local fun = require('fun')
+local json = require('json')
+local errno = require('errno')
+local log = require('log')
+local yaml = require('yaml')
+
+local checks = require('checks')
+local luatest = require('luatest')
+
+ffi.cdef([[
+    int kill(pid_t pid, int sig);
+]])
+
+local Server = luatest.Server:inherit({})
+
+local WAIT_TIMEOUT = 60
+local WAIT_DELAY = 0.1
+
+-- Differences from luatest.Server:
+--
+-- * 'alias' is mandatory.
+-- * 'command' is optional; the Cluster helper defaults it to
+--   test/entrypoint/srv-tarantool.lua.
+-- * 'datadir' is optional, specifies a directory: if specified, the directory's
+--   contents will be recursively copied into 'workdir' during initialization.
+-- * 'workdir' is optional, determined by 'alias'.
+-- * The new 'box_cfg' parameter.
+-- * 'engine' is optional, passed to the instance as the TARANTOOL_ENGINE
+--   environment variable for parameterized tests.
+Server.constructor_checks = fun.chain(Server.constructor_checks, {
+    alias = 'string',
+    command = '?string',
+    datadir = '?string',
+    workdir = '?string',
+    box_cfg = '?table',
+    engine = '?string',
+}):tomap()
+
+Server.socketdir = fio.abspath(os.getenv('VARDIR') or '/tmp/t')
+
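+-- Builds the unix socket URI of an instance from its alias; used both for
+-- net.box connections and for replication sources in tests.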
+function Server.build_instance_uri(alias)
+    return ('%s/%s.iproto'):format(Server.socketdir, alias)
+end
+
+function Server:initialize()
+    if self.id == nil then
+        local random = digest.urandom(9)
+        self.id = digest.base64_encode(random, {urlsafe = true})
+    end
+    if self.workdir == nil then
+        self.workdir = ('%s/%s-%s'):format(self.socketdir, self.alias, self.id)
+        fio.rmtree(self.workdir)
+        fio.mktree(self.workdir)
+    end
+    if self.datadir ~= nil then
+        local ok, err = fio.copytree(self.datadir, self.workdir)
+        if not ok then
+            error(string.format('Failed to copy directory: %s', err))
+        end
+        self.datadir = nil
+    end
+    if self.net_box_port == nil and self.net_box_uri == nil then
+        self.net_box_uri = self.build_instance_uri(self.alias)
+        fio.mktree(self.socketdir)
+    end
+
+    -- AFAIU, the inner getmetatable() returns our helpers.Server
+    -- class, the outer one returns luatest.Server class.
+    getmetatable(getmetatable(self)).initialize(self)
+end
+
+--- Generates environment to run process with.
+-- The result is merged into os.environ().
+-- @return map
+function Server:build_env()
+    local res = getmetatable(getmetatable(self)).build_env(self)
+    if self.box_cfg ~= nil then
+        res.TARANTOOL_BOX_CFG = json.encode(self.box_cfg)
+    end
+    res.TARANTOOL_ENGINE = self.engine
+    return res
+end
+
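+-- Generic polling helper: repeatedly calls func(...) until it returns a
+-- truthy value or WAIT_TIMEOUT seconds pass.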
+local function wait_cond(cond_name, server, func, ...)
+    local alias = server.alias
+    local id = server.id
+    local pid = server.process.pid
+
+    local deadline = clock.time() + WAIT_TIMEOUT
+    while true do
+        if func(...) then
+            return
+        end
+        if clock.time() > deadline then
+            error(('Waiting for "%s" on server %s-%s (PID %d) timed out')
+                  :format(cond_name, alias, id, pid))
+        end
+        fiber.sleep(WAIT_DELAY)
+    end
+end
+
+function Server:wait_for_readiness()
+    return wait_cond('readiness', self, function()
+        local ok, is_ready = pcall(function()
+            self:connect_net_box()
+            return self.net_box:eval('return _G.ready') == true
+        end)
+        return ok and is_ready
+    end)
+end
+
+function Server:wait_election_leader()
+    -- Include read-only property too because if an instance is a leader, it
+    -- does not mean it finished the synchro queue ownership transition. It is
+    -- read-only until that happens. But in tests usually the leader is needed
+    -- as a writable node.
+    return wait_cond('election leader', self, self.exec, self, function()
+        return box.info.election.state == 'leader' and not box.info.ro
+    end)
+end
+
+function Server:wait_election_leader_found()
+    return wait_cond('election leader is found', self, self.exec, self,
+                     function() return box.info.election.leader ~= 0 end)
+end
+
+function Server:wait_election_term(term)
+    return wait_cond('election term', self, self.exec, self, function(term)
+        return box.info.election.term >= term
+    end, {term})
+end
+
+function Server:wait_synchro_queue_term(term)
+    return wait_cond('synchro queue term', self, self.exec, self, function(term)
+        return box.info.synchro.queue.term >= term
+    end, {term})
+end
+
+-- Unlike the original luatest.Server method, this one waits until the
+-- server is ready to accept requests (see wait_for_readiness()).
+function Server:start(opts)
+    checks('table', {
+        wait_for_readiness = '?boolean',
+    })
+    getmetatable(getmetatable(self)).start(self)
+
+    -- The option is true by default.
+    local wait_for_readiness = true
+    if opts ~= nil and opts.wait_for_readiness ~= nil then
+        wait_for_readiness = opts.wait_for_readiness
+    end
+
+    if wait_for_readiness then
+        self:wait_for_readiness()
+    end
+end
+
+function Server:instance_id()
+    -- Cache the value the first time it is obtained.
+    if self.instance_id_value then
+        return self.instance_id_value
+    end
+    local id = self:exec(function() return box.info.id end)
+    -- But do not cache 0 - it is an anon instance, its ID might change.
+    if id ~= 0 then
+        self.instance_id_value = id
+    end
+    return id
+end
+
+function Server:instance_uuid()
+    -- Cache the value the first time it is obtained.
+    if self.instance_uuid_value then
+        return self.instance_uuid_value
+    end
+    local uuid = self:exec(function() return box.info.uuid end)
+    self.instance_uuid_value = uuid
+    return uuid
+end
+
+function Server:election_term()
+    return self:exec(function() return box.info.election.term end)
+end
+
+function Server:synchro_queue_term()
+    return self:exec(function() return box.info.synchro.queue.term end)
+end
+
+-- TODO: Add the 'wait_for_readiness' parameter for the restart()
+-- method.
+
+-- Unlike the original luatest.Server method, this one waits until the
+-- server process has actually stopped.
+function Server:stop()
+    local alias = self.alias
+    local id = self.id
+    if self.process then
+        local pid = self.process.pid
+        getmetatable(getmetatable(self)).stop(self)
+
+        local deadline = clock.time() + WAIT_TIMEOUT
+        while true do
+            if ffi.C.kill(pid, 0) ~= 0 then
+                break
+            end
+            if clock.time() > deadline then
+                error(('Stopping server %s-%s (PID %d) timed out'):format(
+                    alias, id, pid))
+            end
+            fiber.sleep(WAIT_DELAY)
+        end
+    end
+end
+
+function Server:cleanup()
+    fio.rmtree(self.workdir)
+    self.instance_id_value = nil
+    self.instance_uuid_value = nil
+end
+
+function Server:drop()
+    self:stop()
+    self:cleanup()
+end
+
+-- A copy of test_run:grep_log.
+function Server:grep_log(what, bytes, opts)
+    local opts = opts or {}
+    local noreset = opts.noreset or false
+    -- If the instance has crashed, pass the log path via opts.filename.
+    local filename = opts.filename or self:eval('return box.cfg.log')
+    local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
+
+    local function fail(msg)
+        local err = errno.strerror()
+        if file ~= nil then
+            file:close()
+        end
+        error(string.format("%s: %s: %s", msg, filename, err))
+    end
+
+    if file == nil then
+        fail("Failed to open log file")
+    end
+    io.flush() -- attempt to flush stdout == log fd
+    local filesize = file:seek(0, 'SEEK_END')
+    if filesize == nil then
+        fail("Failed to get log file size")
+    end
+    local bytes = bytes or 65536 -- don't read whole log - it can be huge
+    bytes = bytes > filesize and filesize or bytes
+    if file:seek(-bytes, 'SEEK_END') == nil then
+        fail("Failed to seek log file")
+    end
+    local found, buf
+    repeat -- read file in chunks
+        local s = file:read(2048)
+        if s == nil then
+            fail("Failed to read log file")
+        end
+        local pos = 1
+        repeat -- split read string in lines
+            local endpos = string.find(s, '\n', pos)
+            endpos = endpos and endpos - 1 -- strip terminating \n
+            local line = string.sub(s, pos, endpos)
+            if endpos == nil and s ~= '' then
+                -- line doesn't end with \n or eof, append it to buffer
+                -- to be checked on next iteration
+                buf = buf or {}
+                table.insert(buf, line)
+            else
+                if buf ~= nil then -- prepend line with buffered data
+                    table.insert(buf, line)
+                    line = table.concat(buf)
+                    buf = nil
+                end
+                if string.match(line, "Starting instance") and not noreset then
+                    found = nil -- server was restarted, reset search
+                else
+                    found = string.match(line, what) or found
+                end
+            end
+            pos = endpos and endpos + 2 -- jump to char after \n
+        until pos == nil
+    until s == ''
+    file:close()
+    return found
+end
+
+function Server:assert_follows_upstream(server_id)
+    local status = self:exec(function(id)
+        return box.info.replication[id].upstream.status
+    end, {server_id})
+    luatest.assert_equals(status, 'follow',
+        ('%s: server does not follow upstream'):format(self.alias))
+end
+
+function Server:get_vclock()
+    return self:exec(function() return box.info.vclock end)
+end
+
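+-- Blocks until this server's vclock is component-wise greater than or equal
+-- to to_vclock.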
+function Server:wait_vclock(to_vclock)
+    while true do
+        local vclock = self:get_vclock()
+        local ok = true
+
+        for server_id, to_lsn in pairs(to_vclock) do
+            local lsn = vclock[server_id]
+            if lsn == nil or lsn < to_lsn then
+                ok = false
+                break
+            end
+        end
+
+        if ok then
+            return
+        end
+
+        log.info("wait vclock: %s to %s",
+            yaml.encode(vclock), yaml.encode(to_vclock))
+        fiber.sleep(0.001)
+    end
+end
+
+function Server:wait_vclock_of(other_server)
+    local vclock = other_server:get_vclock()
+    -- First component is for local changes.
+    vclock[0] = nil
+    return self:wait_vclock(vclock)
+end
+
+return Server