commit db27af79da9866f0e65965671e4750f9c0cd7f38 from: Georgiy Belyanin via: Alexander Turenko date: Wed Sep 25 11:54:10 2024 UTC connpool: make filter() check connections in parallel The connpool `filter()` method used to check instances one by one in a single fiber. Thus, the maximum evaluation time was `number of instances * timeout`. This patch makes the checks run in parallel, bounding the evaluation time by `timeout`. @TarantoolBot document Title: Experimental.connpool: `filter()` now works faster Now `connpool.filter()` checks the status of instances in parallel, which makes the function faster, especially when some instances are unavailable. Closes #10286 commit - 955537b57c2aade58b7ca42501a9bbe50dd91f26 commit + db27af79da9866f0e65965671e4750f9c0cd7f38 blob - /dev/null blob + 566f39d1c1f304ff9d880926322ceecaf6e2434f (mode 644) --- /dev/null +++ changelogs/unreleased/gh-10286-faster-connpool-filter.md @@ -0,0 +1,4 @@ +## feature/connpool + +- `connpool.filter()` now works faster, especially in case there are + unavailable instances (gh-10286).
blob - 6c97aa3548e4e91a05be347497609e4f2cee8ad2 blob + d81e3ee5a1e0e99580017c14542668f008bebc4c --- src/box/lua/connpool.lua +++ src/box/lua/connpool.lua @@ -2,6 +2,7 @@ local fiber = require('fiber') local clock = require('clock') local config = require('config') local checks = require('checks') +local fun = require('fun') local netbox = require('net.box') local WATCHER_DELAY = 0.1 @@ -21,7 +22,12 @@ local function is_connection_valid(conn, opts) return true end +local connection_mode_update_cond = nil local function connect(instance_name, opts) + if not connection_mode_update_cond then + connection_mode_update_cond = fiber.cond() + end + checks('string', { connect_timeout = '?number', wait_connected = '?boolean', @@ -58,6 +64,7 @@ local function connect(instance_name, opts) conn.mode = mode local function watch_status(_key, value) conn._mode = value.is_ro and 'ro' or 'rw' + connection_mode_update_cond:broadcast() end conn:watch('box.status', watch_status) end @@ -71,24 +78,60 @@ local function connect(instance_name, opts) return conn end +-- Checks whether the candidate has an active connection and its +-- mode is already known. +local function is_candidate_connected(candidate) + local conn = connections[candidate] + return conn and conn.state == 'active' and conn:mode() ~= nil +end + +-- Checks whether the candidate has responded with success or +-- with an error. +local function is_candidate_checked(candidate) + local conn = connections[candidate] + + return not conn or + is_candidate_connected(candidate) or + conn.state == 'error' or + conn.state == 'closed' +end + +-- Starts connecting to all the candidates at once and waits (up to +-- WATCHER_TIMEOUT) until each of them is checked. Returns the names +-- of the candidates that are connected and have a known mode. local function connect_to_candidates(candidates) + -- connect() is not called for an empty list, so the mode update + -- condition variable may not be created yet. + if next(candidates) == nil then + return {} + end + local delay = WATCHER_DELAY - local conn_opts = {connect_timeout = WATCHER_TIMEOUT} - local connected_candidates = {} + local connect_deadline = clock.monotonic() + WATCHER_TIMEOUT + for _, instance_name in pairs(candidates) do - local time_connect_end = clock.monotonic() + WATCHER_TIMEOUT - local ok, conn = pcall(connect, instance_name, conn_opts) - -- If state is not 'active', conn:mode() cannot become not nil.
- if ok and conn.state == 'active' then - while conn:mode() == nil and clock.monotonic() < time_connect_end do - fiber.sleep(delay) - end - if conn:mode() ~= nil then - table.insert(connected_candidates, instance_name) - else - conn:close() - end + pcall(connect, instance_name, { + wait_connected = false, + connect_timeout = WATCHER_TIMEOUT + }) + end + + assert(connection_mode_update_cond ~= nil) + + local connected_candidates = {} + while clock.monotonic() < connect_deadline do + connected_candidates = fun.iter(candidates) + :filter(is_candidate_connected) + :totable() + + local all_checked = fun.iter(candidates) + :all(is_candidate_checked) + + if all_checked then + return connected_candidates end + + connection_mode_update_cond:wait(delay) end return connected_candidates end blob - db435826f7c6023a0a17648546b1fc1bebfc4676 blob + 00fd5b8f22d1b3cd5ae81e543a7bb28129c21511 --- test/config-luatest/rpc_test.lua +++ test/config-luatest/rpc_test.lua @@ -2,6 +2,7 @@ local t = require('luatest') local fun = require('fun') local treegen = require('luatest.treegen') local server = require('luatest.server') +local socket = require('socket') local helpers = require('test.config-luatest.helpers') local g = helpers.group() @@ -13,8 +14,26 @@ end local function skip_if_no_vshard() t.skip_if(not has_vshard, 'Module "vshard-ee/vshard" is not available') +end + +local function start_stub_servers(g, dir, instances) + g.stub_servers = g.stub_servers or {} + for _, instance in ipairs(instances) do + local uri = ('%s/%s.iproto'):format(dir, instance) + local s = socket.tcp_server('unix/', uri, function() + require('fiber').sleep(11000) + end) + t.assert(s) + g.stub_servers[instance] = s + end end +local function stop_stub_servers(g) + for _, stub_server in pairs(g.stub_servers or {}) do + stub_server:close() + end +end + g.test_connect = function(g) local dir = treegen.prepare_directory({}, {}) local config = [[ @@ -418,7 +437,89 @@ g.test_filter_mode = function(g) g.server_3:exec(check)
g.server_4:exec(check) end + +g.test_filter_works_in_parallel = function(g) + local dir = treegen.prepare_directory({}, {}) + local config = [[ + credentials: + users: + guest: + roles: [super] + myuser: + password: "secret" + roles: [replication] + privileges: + - permissions: [execute] + universe: true + + iproto: + listen: + - uri: 'unix/:./{{ instance_name }}.iproto' + advertise: + peer: + login: 'myuser' + + groups: + group-001: + replicasets: + replicaset-001: + instances: + instance-001: + database: + mode: rw + replicaset-002: + instances: + instance-002: {} + instance-003: {} + instance-004: {} + instance-005: {} + ]] + treegen.write_file(dir, 'config.yaml', config) + local opts = { + env = {LUA_PATH = os.environ()['LUA_PATH']}, + config_file = 'config.yaml', + chdir = dir, + } + g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap()) + + g.server_1:start({wait_until_ready = false}) + start_stub_servers(g, dir, { + 'instance-002', + 'instance-003', + 'instance-004', + 'instance-005' + }) + + g.server_1:wait_until_ready() + + local function check_filter() + local connpool = require('experimental.connpool') + local clock = require('clock') + + -- The connection timeout for filter() is hardcoded in + -- connpool and is 10 seconds. + local CONNECT_TIMEOUT = 10 + + -- If the connection pool tries to connect in parallel, the + -- execution time is bounded by CONNECT_TIMEOUT plus some + -- small overhead. + -- + -- Make sure we're trying to access the instances in parallel.
+ local opts = { mode = 'ro' } + local timestamp_before_filter = clock.monotonic() + t.assert_equals(connpool.filter(opts), {}) + local elapsed_time = clock.monotonic() - timestamp_before_filter + t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2) + t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2) + end + + g.server_1:exec(check_filter) +end +g.after_test('test_filter_works_in_parallel', function(g) + stop_stub_servers(g) +end) + g.test_call = function(g) local dir = treegen.prepare_directory({}, {}) local config = [[ @@ -779,4 +880,100 @@ g.test_call_mode = function(g) g.server_2:exec(check) g.server_3:exec(check) g.server_4:exec(check) +end + +g.test_call_works_in_parallel = function(g) + local dir = treegen.prepare_directory({}, {}) + local config = [[ + credentials: + users: + guest: + roles: [super] + myuser: + password: "secret" + roles: [replication] + privileges: + - permissions: [execute] + universe: true + + roles: [one] + + iproto: + listen: + - uri: 'unix/:./{{ instance_name }}.iproto' + advertise: + peer: + login: 'myuser' + + groups: + group-001: + replicasets: + replicaset-001: + instances: + instance-001: + database: + mode: rw + replicaset-002: + instances: + instance-002: {} + instance-003: {} + instance-004: {} + instance-005: {} + ]] + treegen.write_file(dir, 'config.yaml', config) + + local role = string.dump(function() + local function f1() + return box.info.name + end + + rawset(_G, 'f1', f1) + + return { + stop = function() end, + apply = function() end, + validate = function() end, + } + end) + treegen.write_file(dir, 'one.lua', role) + + local opts = { + env = {LUA_PATH = os.environ()['LUA_PATH']}, + config_file = 'config.yaml', + chdir = dir, + } + g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap()) + + g.server_1:start({wait_until_ready = false}) + start_stub_servers(g, dir, { + 'instance-002', + 'instance-003', + 'instance-004', + 'instance-005' + }) + + g.server_1:wait_until_ready() + + local function
check_call() + local connpool = require('experimental.connpool') + local clock = require('clock') + + -- The connection timeout for filter() is hardcoded in + -- connpool and is 10 seconds. + local CONNECT_TIMEOUT = 10 + + -- call() internally uses filter(). Ensure it tries to + -- access instances in parallel. + local opts = { mode = 'prefer_ro' } + local timestamp_before_call = clock.monotonic() + t.assert_equals(connpool.call('f1', nil, opts), 'instance-001') + local elapsed_time = clock.monotonic() - timestamp_before_call + t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2) + t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2) + end + + g.server_1:exec(check_call) end +g.after_test('test_call_works_in_parallel', function(g) + stop_stub_servers(g) +end)