commit - 955537b57c2aade58b7ca42501a9bbe50dd91f26
commit + db27af79da9866f0e65965671e4750f9c0cd7f38
blob - /dev/null
blob + 566f39d1c1f304ff9d880926322ceecaf6e2434f (mode 644)
--- /dev/null
+++ changelogs/unreleased/gh-10286-faster-connpool-filter.md
+## feature/connpool
+
+- `connpool.filter()` now works faster, especially when some
+  instances are unavailable (gh-10286).
blob - 6c97aa3548e4e91a05be347497609e4f2cee8ad2
blob + d81e3ee5a1e0e99580017c14542668f008bebc4c
--- src/box/lua/connpool.lua
+++ src/box/lua/connpool.lua
local clock = require('clock')
local config = require('config')
local checks = require('checks')
+local fun = require('fun')
local netbox = require('net.box')
local WATCHER_DELAY = 0.1
return true
end
+local connection_mode_update_cond = nil
local function connect(instance_name, opts)
+ if not connection_mode_update_cond then
+ connection_mode_update_cond = fiber.cond()
+ end
+
checks('string', {
connect_timeout = '?number',
wait_connected = '?boolean',
conn.mode = mode
local function watch_status(_key, value)
conn._mode = value.is_ro and 'ro' or 'rw'
+ connection_mode_update_cond:broadcast()
end
conn:watch('box.status', watch_status)
end
return conn
end
+-- Tells whether the `connections` table holds an entry for the
+-- candidate that is in the 'active' state and already reports a
+-- non-nil mode.
+--
+-- The result is falsy when there is no connection object, when its
+-- state is not 'active' or when conn:mode() is still nil.
+local function is_candidate_connected(candidate)
+    local conn = connections[candidate]
+    if not conn then
+        return conn
+    end
+    if conn.state ~= 'active' then
+        return false
+    end
+    return conn:mode() ~= nil
+end
+
+-- Tells whether the outcome of connecting to the candidate is
+-- already known, be it a success or a failure.
+local function is_candidate_checked(candidate)
+    local conn = connections[candidate]
+
+    -- No connection object at all: there is nothing to wait for.
+    if not conn then
+        return true
+    end
+
+    -- The instance has responded successfully.
+    if is_candidate_connected(candidate) then
+        return true
+    end
+
+    -- Otherwise the candidate is checked only once the connection
+    -- has ended up in one of the failure states.
+    local state = conn.state
+    return state == 'error' or state == 'closed'
+end
+
+-- Connects to the given candidates and returns the list of those
+-- that are connected and report a mode.
+--
+-- All the connections are initiated first, without waiting for
+-- them to be established. After that the candidates are polled
+-- until each of them is either connected or failed, or until
+-- WATCHER_TIMEOUT seconds pass since the start of the call.
+--
+-- @param candidates a table with instance names as values
+-- @return a list of the candidates for which
+--         is_candidate_connected() holds
local function connect_to_candidates(candidates)
local delay = WATCHER_DELAY
- local conn_opts = {connect_timeout = WATCHER_TIMEOUT}
- local connected_candidates = {}
+ local connect_deadline = clock.monotonic() + WATCHER_TIMEOUT
+
for _, instance_name in pairs(candidates) do
- local time_connect_end = clock.monotonic() + WATCHER_TIMEOUT
- local ok, conn = pcall(connect, instance_name, conn_opts)
- -- If state is not 'active', conn:mode() cannot become not nil.
- if ok and conn.state == 'active' then
- while conn:mode() == nil and clock.monotonic() < time_connect_end do
- fiber.sleep(delay)
- end
- if conn:mode() ~= nil then
- table.insert(connected_candidates, instance_name)
- else
- conn:close()
- end
+ -- Errors raised by connect() are swallowed by pcall(): the
+ -- state of each candidate is examined below anyway.
+ pcall(connect, instance_name, {
+ wait_connected = false,
+ connect_timeout = WATCHER_TIMEOUT
+ })
+ end
+
+ -- connection_mode_update_cond is created lazily by the first
+ -- connect() call. With no candidates connect() may have never
+ -- been called, so the assertion below would fail. There is
+ -- nothing to wait for in this case.
+ if next(candidates) == nil then
+ return {}
+ end
+
+ assert(connection_mode_update_cond ~= nil)
+
+ local connected_candidates = {}
+ while clock.monotonic() < connect_deadline do
+ connected_candidates = fun.iter(candidates)
+ :filter(is_candidate_connected)
+ :totable()
+
+ local all_checked = fun.iter(candidates)
+ :all(is_candidate_checked)
+
+ if all_checked then
+ return connected_candidates
end
+
+ -- The condition is broadcast when a connection reports its
+ -- mode. The delay bounds the wait, so other state changes
+ -- and the deadline are noticed as well.
+ connection_mode_update_cond:wait(delay)
end
return connected_candidates
end
blob - db435826f7c6023a0a17648546b1fc1bebfc4676
blob + 00fd5b8f22d1b3cd5ae81e543a7bb28129c21511
--- test/config-luatest/rpc_test.lua
+++ test/config-luatest/rpc_test.lua
local fun = require('fun')
local treegen = require('luatest.treegen')
local server = require('luatest.server')
+local socket = require('socket')
local helpers = require('test.config-luatest.helpers')
local g = helpers.group()
local function skip_if_no_vshard()
t.skip_if(not has_vshard, 'Module "vshard-ee/vshard" is not available')
+end
+
+-- Starts one stub server per given instance name, listening on the
+-- unix socket `<dir>/<instance>.iproto`, and registers the servers
+-- in g.stub_servers keyed by the instance name.
+--
+-- The connection handler only sleeps and writes nothing to the
+-- client.
+local function start_stub_servers(g, dir, instances)
+    local function hang()
+        require('fiber').sleep(11000)
+    end
+
+    local stubs = g.stub_servers or {}
+    g.stub_servers = stubs
+    for _, instance in ipairs(instances) do
+        local uri = string.format('%s/%s.iproto', dir, instance)
+        local s = socket.tcp_server('unix/', uri, hang)
+        t.assert(s)
+        stubs[instance] = s
+    end
end
+-- Closes all the stub servers registered in g.stub_servers and
+-- forgets them.
+--
+-- It is safe to call the function when no stub server has been
+-- started and to call it more than once.
+local function stop_stub_servers(g)
+    -- The after-test hook may run when the test has failed before
+    -- start_stub_servers() was reached.
+    local stubs = g.stub_servers
+    if stubs == nil then
+        return
+    end
+
+    -- Drop the registry, so already closed servers are not closed
+    -- again by a later call.
+    g.stub_servers = nil
+    for _, stub in pairs(stubs) do
+        stub:close()
+    end
+end
+
g.test_connect = function(g)
local dir = treegen.prepare_directory({}, {})
local config = [[
g.server_3:exec(check)
g.server_4:exec(check)
end
+
+-- Verify that connpool.filter() accesses the instances in parallel.
+--
+-- Only instance-001 is a real instance. The other four are stub
+-- servers that accept a connection and then only sleep.
+g.test_filter_works_in_parallel = function(g)
+ local dir = treegen.prepare_directory({}, {})
+ local config = [[
+ credentials:
+ users:
+ guest:
+ roles: [super]
+ myuser:
+ password: "secret"
+ roles: [replication]
+ privileges:
+ - permissions: [execute]
+ universe: true
+
+ iproto:
+ listen:
+ - uri: 'unix/:./{{ instance_name }}.iproto'
+ advertise:
+ peer:
+ login: 'myuser'
+
+ groups:
+ group-001:
+ replicasets:
+ replicaset-001:
+ instances:
+ instance-001:
+ database:
+ mode: rw
+ replicaset-002:
+ instances:
+ instance-002: {}
+ instance-003: {}
+ instance-004: {}
+ instance-005: {}
+ ]]
+ treegen.write_file(dir, 'config.yaml', config)
+ local opts = {
+ env = {LUA_PATH = os.environ()['LUA_PATH']},
+ config_file = 'config.yaml',
+ chdir = dir,
+ }
+ g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
+
+ g.server_1:start({wait_until_ready = false})
+ start_stub_servers(g, dir, {
+ 'instance-002',
+ 'instance-003',
+ 'instance-004',
+ 'instance-005'
+ })
+
+ g.server_1:wait_until_ready()
+
+ local function check_filter()
+ local connpool = require('experimental.connpool')
+ local clock = require('clock')
+
+ -- The connection timeout for filter() is hardcoded in
+ -- connpool and equals 10 seconds.
+ local CONNECT_TIMEOUT = 10
+
+ -- If the connection pool tries to connect in parallel, the
+ -- execution time is bounded with CONNECT_TIMEOUT plus some
+ -- small overhead.
+ --
+ -- Make sure we're trying to access the instances in parallel.
+ local opts = { mode = 'ro' }
+ local timestamp_before_filter = clock.monotonic()
+ t.assert_equals(connpool.filter(opts), {})
+ local elapsed_time = clock.monotonic() - timestamp_before_filter
+ t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2)
+ t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2)
+ end
+
+ g.server_1:exec(check_filter)
+end
+g.after_test('test_filter_works_in_parallel', function(g)
+ stop_stub_servers(g)
+end)
+
g.test_call = function(g)
local dir = treegen.prepare_directory({}, {})
local config = [[
g.server_2:exec(check)
g.server_3:exec(check)
g.server_4:exec(check)
+end
+
+-- Verify that connpool.call() accesses the instances in parallel.
+--
+-- Only instance-001 is a real instance. It has the role 'one',
+-- which defines the global function f1(). The other four
+-- instances are stub servers that accept a connection and then
+-- only sleep.
+g.test_call_works_in_parallel = function(g)
+ local dir = treegen.prepare_directory({}, {})
+ local config = [[
+ credentials:
+ users:
+ guest:
+ roles: [super]
+ myuser:
+ password: "secret"
+ roles: [replication]
+ privileges:
+ - permissions: [execute]
+ universe: true
+
+ roles: [one]
+
+ iproto:
+ listen:
+ - uri: 'unix/:./{{ instance_name }}.iproto'
+ advertise:
+ peer:
+ login: 'myuser'
+
+ groups:
+ group-001:
+ replicasets:
+ replicaset-001:
+ instances:
+ instance-001:
+ database:
+ mode: rw
+ replicaset-002:
+ instances:
+ instance-002: {}
+ instance-003: {}
+ instance-004: {}
+ instance-005: {}
+ ]]
+ treegen.write_file(dir, 'config.yaml', config)
+
+ local role = string.dump(function()
+ local function f1()
+ return box.info.name
+ end
+
+ rawset(_G, 'f1', f1)
+
+ return {
+ stop = function() end,
+ apply = function() end,
+ validate = function() end,
+ }
+ end)
+ treegen.write_file(dir, 'one.lua', role)
+
+ local opts = {
+ env = {LUA_PATH = os.environ()['LUA_PATH']},
+ config_file = 'config.yaml',
+ chdir = dir,
+ }
+ g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
+
+ g.server_1:start({wait_until_ready = false})
+ start_stub_servers(g, dir, {
+ 'instance-002',
+ 'instance-003',
+ 'instance-004',
+ 'instance-005'
+ })
+
+ g.server_1:wait_until_ready()
+
+ local function check_call()
+ local connpool = require('experimental.connpool')
+ local clock = require('clock')
+
+ -- The connection timeout for filter() is hardcoded in
+ -- connpool and equals 10 seconds.
+ local CONNECT_TIMEOUT = 10
+
+ -- call() internally uses filter(). Ensure it tries to
+ -- access the instances in parallel.
+ local opts = { mode = 'prefer_ro' }
+ local timestamp_before_call = clock.monotonic()
+ t.assert_equals(connpool.call('f1', nil, opts), 'instance-001')
+ local elapsed_time = clock.monotonic() - timestamp_before_call
+ t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2)
+ t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2)
+ end
+
+ g.server_1:exec(check_call)
end
+g.after_test('test_call_works_in_parallel', function(g)
+ stop_stub_servers(g)
+end)