Commit Diff


commit - 955537b57c2aade58b7ca42501a9bbe50dd91f26
commit + db27af79da9866f0e65965671e4750f9c0cd7f38
blob - /dev/null
blob + 566f39d1c1f304ff9d880926322ceecaf6e2434f (mode 644)
--- /dev/null
+++ changelogs/unreleased/gh-10286-faster-connpool-filter.md
@@ -0,0 +1,4 @@
+## feature/connpool
+
+- `connpool.filter()` is now faster, especially when some instances are
+  unavailable (gh-10286).
blob - 6c97aa3548e4e91a05be347497609e4f2cee8ad2
blob + d81e3ee5a1e0e99580017c14542668f008bebc4c
--- src/box/lua/connpool.lua
+++ src/box/lua/connpool.lua
@@ -2,6 +2,7 @@ local fiber = require('fiber')
 local clock = require('clock')
 local config = require('config')
 local checks = require('checks')
+local fun = require('fun')
 local netbox = require('net.box')
 
 local WATCHER_DELAY = 0.1
@@ -21,7 +22,12 @@ local function is_connection_valid(conn, opts)
     return true
 end
 
+local connection_mode_update_cond = nil
 local function connect(instance_name, opts)
+    if not connection_mode_update_cond then
+        connection_mode_update_cond = fiber.cond()
+    end
+
     checks('string', {
         connect_timeout = '?number',
         wait_connected = '?boolean',
@@ -58,6 +64,7 @@ local function connect(instance_name, opts)
         conn.mode = mode
         local function watch_status(_key, value)
             conn._mode = value.is_ro and 'ro' or 'rw'
+            connection_mode_update_cond:broadcast()
         end
         conn:watch('box.status', watch_status)
     end
@@ -71,24 +78,49 @@ local function connect(instance_name, opts)
     return conn
 end
 
+local function is_candidate_connected(candidate) -- active and mode is known
+    local conn = connections[candidate] -- nil (falsy result) if no such entry
+    return conn and conn.state == 'active' and conn:mode() ~= nil
+end
+
+-- Tells whether the attempt to reach the candidate is over: it
+-- has either succeeded or ended up with an error.
+local function is_candidate_checked(candidate)
+    local conn = connections[candidate]
+    if not conn then -- no connection object: nothing to wait for
+        return true
+    end
+    local failed = conn.state == 'error' or conn.state == 'closed'
+    return failed or is_candidate_connected(candidate)
+end
+
 local function connect_to_candidates(candidates)
     local delay = WATCHER_DELAY
-    local conn_opts = {connect_timeout = WATCHER_TIMEOUT}
-    local connected_candidates = {}
+    local connect_deadline = clock.monotonic() + WATCHER_TIMEOUT
+    -- Start all the connections without waiting; connect() errors are ignored.
     for _, instance_name in pairs(candidates) do
-        local time_connect_end = clock.monotonic() + WATCHER_TIMEOUT
-        local ok, conn = pcall(connect, instance_name, conn_opts)
-        -- If state is not 'active', conn:mode() cannot become not nil.
-        if ok and conn.state == 'active' then
-            while conn:mode() == nil and clock.monotonic() < time_connect_end do
-                fiber.sleep(delay)
-            end
-            if conn:mode() ~= nil then
-                table.insert(connected_candidates, instance_name)
-            else
-                conn:close()
-            end
+        pcall(connect, instance_name, {
+            wait_connected = false,
+            connect_timeout = WATCHER_TIMEOUT
+        })
+    end
+
+    -- connect() creates the condition lazily; do the same for an empty list.
+    connection_mode_update_cond = connection_mode_update_cond or fiber.cond()
+    local connected_candidates = {}
+    while clock.monotonic() < connect_deadline do
+        connected_candidates = fun.iter(candidates)
+            :filter(is_candidate_connected)
+            :totable()
+
+        local all_checked = fun.iter(candidates)
+            :all(is_candidate_checked)
+
+        if all_checked then
+            return connected_candidates
         end
+        -- Woken by watch_status() on a mode update, or after `delay` at most.
+        connection_mode_update_cond:wait(delay)
     end
     return connected_candidates
 end
blob - db435826f7c6023a0a17648546b1fc1bebfc4676
blob + 00fd5b8f22d1b3cd5ae81e543a7bb28129c21511
--- test/config-luatest/rpc_test.lua
+++ test/config-luatest/rpc_test.lua
@@ -2,6 +2,7 @@ local t = require('luatest')
 local fun = require('fun')
 local treegen = require('luatest.treegen')
 local server = require('luatest.server')
+local socket = require('socket')
 local helpers = require('test.config-luatest.helpers')
 
 local g = helpers.group()
@@ -13,8 +14,26 @@ end
 
 local function skip_if_no_vshard()
     t.skip_if(not has_vshard, 'Module "vshard-ee/vshard" is not available')
+end
+
+local function start_stub_servers(g, dir, instances)
+    g.stub_servers = g.stub_servers or {}
+    -- Stubs listen on the instances' sockets; the handler never replies.
+    local function hang() require('fiber').sleep(11000) end
+    for _, name in ipairs(instances) do
+        local uri = string.format('%s/%s.iproto', dir, name)
+        local listener = socket.tcp_server('unix/', uri, hang)
+        t.assert(listener)
+        g.stub_servers[name] = listener
+    end
 end
 
+local function stop_stub_servers(g)
+    -- g.stub_servers is unset if the test failed before creating the stubs.
+    for _, stub in pairs(g.stub_servers or {}) do stub:close() end
+    -- Forget the closed sockets so a later hook does not close them again.
+    g.stub_servers = nil
+
 g.test_connect = function(g)
     local dir = treegen.prepare_directory({}, {})
     local config = [[
@@ -418,7 +437,89 @@ g.test_filter_mode = function(g)
     g.server_3:exec(check)
     g.server_4:exec(check)
 end
+
+g.test_filter_works_in_parallel = function(g)
+    local dir = treegen.prepare_directory({}, {})
+    local config = [[
+    credentials:
+      users:
+        guest:
+          roles: [super]
+        myuser:
+          password: "secret"
+          roles: [replication]
+          privileges:
+          - permissions: [execute]
+            universe: true
+
+    iproto:
+      listen:
+        - uri: 'unix/:./{{ instance_name }}.iproto'
+      advertise:
+        peer:
+          login: 'myuser'
+
+    groups:
+      group-001:
+        replicasets:
+          replicaset-001:
+            instances:
+              instance-001:
+                database:
+                  mode: rw
+          replicaset-002:
+            instances:
+              instance-002: {}
+              instance-003: {}
+              instance-004: {}
+              instance-005: {}
+    ]]
+    treegen.write_file(dir, 'config.yaml', config)
 
+    local opts = {
+        env = {LUA_PATH = os.environ()['LUA_PATH']},
+        config_file = 'config.yaml',
+        chdir = dir,
+    }
+    g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
+    -- Stubs listen on the sockets of instance-002..005, which never start.
+    g.server_1:start({wait_until_ready = false})
+    start_stub_servers(g, dir, {
+        'instance-002',
+        'instance-003',
+        'instance-004',
+        'instance-005'
+    })
+
+    g.server_1:wait_until_ready()
+
+    local function check_filter()
+        local connpool = require('experimental.connpool')
+        local clock = require('clock')
+
+        -- The connection timeout of filter() is hardcoded in
+        -- connpool and is equal to 10 seconds.
+        local CONNECT_TIMEOUT = 10
+
+        -- The stubs accept connections, but never reply, so filter()
+        -- cannot finish before the timeout expires. If the instances
+        -- are polled in parallel, the execution time is bounded by
+        -- a single CONNECT_TIMEOUT plus some small overhead, whatever
+        -- the number of unavailable instances is.
+        local opts = { mode = 'ro' }
+        local timestamp_before_filter = clock.monotonic()
+        t.assert_equals(connpool.filter(opts), {})
+        local elapsed_time = clock.monotonic() - timestamp_before_filter
+        t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2)
+        t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2)
+    end
+
+    g.server_1:exec(check_filter)
+end
+-- The hook is called with the group object, which is all the helper
+-- needs, so the helper itself is registered as the callback.
+g.after_test('test_filter_works_in_parallel', stop_stub_servers)
+
 g.test_call = function(g)
     local dir = treegen.prepare_directory({}, {})
     local config = [[
@@ -779,4 +880,100 @@ g.test_call_mode = function(g)
     g.server_2:exec(check)
     g.server_3:exec(check)
     g.server_4:exec(check)
+end
+
+g.test_call_works_in_parallel = function(g)
+    local dir = treegen.prepare_directory({}, {})
+    local config = [[
+    credentials:
+      users:
+        guest:
+          roles: [super]
+        myuser:
+          password: "secret"
+          roles: [replication]
+          privileges:
+          - permissions: [execute]
+            universe: true
+
+    roles: [one]
+
+    iproto:
+      listen:
+        - uri: 'unix/:./{{ instance_name }}.iproto'
+      advertise:
+        peer:
+          login: 'myuser'
+
+    groups:
+      group-001:
+        replicasets:
+          replicaset-001:
+            instances:
+              instance-001:
+                database:
+                  mode: rw
+          replicaset-002:
+            instances:
+              instance-002: {}
+              instance-003: {}
+              instance-004: {}
+              instance-005: {}
+    ]]
+    treegen.write_file(dir, 'config.yaml', config)
+
+    local role = string.dump(function()
+        local function f1()
+            return box.info.name
+        end
+
+        rawset(_G, 'f1', f1)
+
+        return {
+            stop = function() end,
+            apply = function() end,
+            validate = function() end,
+        }
+    end)
+    treegen.write_file(dir, 'one.lua', role)
+
+    local opts = {
+        env = {LUA_PATH = os.environ()['LUA_PATH']},
+        config_file = 'config.yaml',
+        chdir = dir,
+    }
+    g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
+    -- Stubs listen on the sockets of instance-002..005, which never start.
+    g.server_1:start({wait_until_ready = false})
+    start_stub_servers(g, dir, {
+        'instance-002',
+        'instance-003',
+        'instance-004',
+        'instance-005'
+    })
+
+    g.server_1:wait_until_ready()
+
+    local function check_call()
+        local connpool = require('experimental.connpool')
+        local clock = require('clock')
+
+        -- The connection timeout of filter() is hardcoded in
+        -- connpool and is equal to 10 seconds.
+        local CONNECT_TIMEOUT = 10
+
+        -- call() internally uses filter(). Ensure it tries to
+        -- access the instances in parallel.
+        local opts = { mode = 'prefer_ro' }
+        local timestamp_before_call = clock.monotonic()
+        t.assert_equals(connpool.call('f1', nil, opts), 'instance-001')
+        local elapsed_time = clock.monotonic() - timestamp_before_call
+        t.assert_gt(elapsed_time, CONNECT_TIMEOUT / 2)
+        t.assert_lt(elapsed_time, CONNECT_TIMEOUT * 3 / 2)
+    end
+
+    g.server_1:exec(check_call)
 end
+-- The hook is called with the group object, which is all the helper
+-- needs, so the helper itself is registered as the callback.
+g.after_test('test_call_works_in_parallel', stop_stub_servers)