Commit Diff


commit - e5834e43350becf3186a0428bec07535a44111de
commit + f2cbc886970451628b335b9b8216be25a0e54c3a
blob - d1276472bc9171a90e0d47184582145bc174203f
blob + 3a1735637b9702ddd6d0f5733b1e8be3637d4ca2
--- src/box/CMakeLists.txt
+++ src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/net_box.lua)
 lua_source(lua_sources lua/upgrade.lua)
 lua_source(lua_sources lua/console.lua)
 lua_source(lua_sources lua/xlog.lua)
+lua_source(lua_sources lua/ctl.lua)
 set(bin_sources)
 bin_source(bin_sources bootstrap.snap bootstrap.h)
 
blob - /dev/null
blob + d2991a4ee1c3c9e812bc3974bed900efb3b43786 (mode 644)
--- /dev/null
+++ src/box/lua/ctl.lua
@@ -0,0 +1,56 @@
+-- ctl.lua (internal file)
+
+-- check whether the given replica is dead
+local function is_dead(replica)
+    -- self
+    if replica.uuid == box.info.uuid then
+        return false
+    end
+    -- neither applier nor relay
+    if replica.upstream == nil and replica.downstream == nil then
+        return true
+    end
+    -- disconnected or stopped relay
+    if replica.upstream == nil and replica.downstream ~= nil then
+        if replica.downstream.status == 'disconnected' or replica.downstream.status == 'stopped' then
+            return true
+        else
+            return false
+        end
+    end
+    -- in all other cases the replica is alive
+    return false
+end
+
+-- return a list of UUIDs of replicas suspected to be dead
+function replicaset_list_inactive()
+    local inactive_list = {}
+    local replicaset = box.info.replication
+    for i, replica in pairs(replicaset) do
+        -- collect replicas that look dead
+        if is_dead(replica) then
+            table.insert(inactive_list, replica.uuid)
+        end
+    end
+    return inactive_list
+end
+
+-- remove the replica with the given uuid from the _cluster system space
+function replica_prune(uuid)
+    if uuid == nil then
+        error("Usage: replica_prune([uuid])")
+    end
+    box.space._cluster.index.uuid:delete{uuid}
+end
+
+-- delete every replica listed in a table of UUIDs, obtained from
+-- replicaset_list_inactive() or supplied by the admin
+function replicaset_purge(uuid_table)
+    if uuid_table == nil then
+        error("Usage: replicaset_purge([uuid_table])")
+    end
+    for i in pairs(uuid_table) do
+        print("Deleting replica with uuid " .. i .. " " .. uuid_table[i])
+        replica_prune(uuid_table[i])
+    end
+end
blob - ccb4c6a4650ff72606a6f80fb9bc370d746415d3
blob + 0339ea92eb707ddffe01c7e7fc4e31f38eb1a67e
--- src/box/lua/init.c
+++ src/box/lua/init.c
@@ -63,6 +63,7 @@
 extern char session_lua[],
 	tuple_lua[],
 	schema_lua[],
+	ctl_lua[],
 	load_cfg_lua[],
 	xlog_lua[],
 	checkpoint_daemon_lua[],
@@ -82,6 +83,7 @@ static const char *lua_sources[] = {
 	"box/console", console_lua,
 	"box/load_cfg", load_cfg_lua,
 	"box/xlog", xlog_lua,
+	"box/ctl", ctl_lua,
 	NULL
 };
 
blob - /dev/null
blob + f79fe6e02def1d164564a49545b7542d9a046655 (mode 644)
--- /dev/null
+++ test/replication/trim.lua
@@ -0,0 +1,32 @@
+#!/usr/bin/env tarantool
+
+-- get instance id from filename (trim1.lua => 1)
+local INSTANCE_ID = string.match(arg[0], "%d")
+local SOCKET_DIR = require('fio').cwd()
+local TIMEOUT = tonumber(arg[1])
+local CON_TIMEOUT = arg[2] and tonumber(arg[2]) or 30.0
+
+local function instance_uri(instance_id)
+    --return 'localhost:'..(3310 + instance_id)
+    return SOCKET_DIR..'/trim'..instance_id..'.sock';
+end
+
+require('console').listen(os.getenv('ADMIN'))
+
+box.cfg({
+    listen = instance_uri(INSTANCE_ID);
+    replication_timeout = TIMEOUT;
+    replication_connect_timeout = CON_TIMEOUT;
+    replication = {
+        instance_uri(1);
+        instance_uri(2);
+        instance_uri(3);
+    };
+})
+
+box.once("bootstrap", function()
+    local test_run = require('test_run').new()
+    box.schema.user.grant("guest", 'replication')
+    box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+    box.space.test:create_index('primary')
+end)
blob - /dev/null
blob + f85b3db97c837af50da089ca85cda4a08007255a (mode 644)
--- /dev/null
+++ test/replication/trim.result
@@ -0,0 +1,244 @@
+test_run = require('test_run').new()
+---
+...
+SERVERS = {'trim1', 'trim2', 'trim3'}
+---
+...
+-- Deploy cluster
+test_run:create_cluster(SERVERS, "replication", {args="0.1"})
+---
+...
+test_run:wait_fullmesh(SERVERS)
+---
+...
+test_run:cmd('switch trim1')
+---
+- true
+...
+len = box.space._cluster:len()
+---
+...
+-- basic calls and argument validation
+replicaset_list_inactive()
+---
+- []
+...
+replica_prune()
+---
+- error: 'builtin/box/ctl.lua:41: Usage: replica_prune([uuid])'
+...
+replicaset_purge()
+---
+- error: 'builtin/box/ctl.lua:50: Usage: replicaset_purge([uuid_table])'
+...
+-- create zombies by restarting replicas
+test_run:cmd('switch trim1')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+old_trim2 = test_run:get_param('trim2', 'id')
+---
+...
+old_trim3 = test_run:get_param('trim3', 'id')
+---
+...
+len = box.space._cluster:len()
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd('stop server trim2')
+---
+- true
+...
+test_run:cmd('cleanup server trim2')
+---
+- true
+...
+test_run:cmd('start server trim2')
+---
+- true
+...
+test_run:cmd('stop server trim3')
+---
+- true
+...
+test_run:cmd('cleanup server trim3')
+---
+- true
+...
+test_run:cmd('start server trim3')
+---
+- true
+...
+test_run:cmd('switch trim1')
+---
+- true
+...
+replicaset_list_inactive() ~= nil
+---
+- true
+...
+box.space._cluster:len() == len + #replicaset_list_inactive()
+---
+- true
+...
+-- check that we list and prune only dead replicas
+trim2 = test_run:get_param('trim2', 'id')
+---
+...
+trim3 = test_run:get_param('trim3', 'id')
+---
+...
+while box.info.replication[trim2[1]].upstream.status == 'follow' do fiber.sleep(0.01) end
+---
+...
+while box.info.replication[trim3[1]].upstream.status == 'follow' do fiber.sleep(0.01) end
+---
+...
+box.info.replication[trim2[1]].downstream.status == nil
+---
+- true
+...
+box.info.replication[trim3[1]].downstream.status == nil
+---
+- true
+...
+box.info.replication[old_trim2[1]].upstream == nil
+---
+- true
+...
+box.info.replication[old_trim3[1]].upstream == nil
+---
+- true
+...
+box.info.replication[old_trim2[1]].downstream.status == 'stopped'
+---
+- true
+...
+box.info.replication[old_trim3[1]].downstream.status == 'stopped'
+---
+- true
+...
+--
+#replicaset_list_inactive() == 2
+---
+- true
+...
+replicaset_purge(replicaset_list_inactive())
+---
+...
+#replicaset_list_inactive() == 0
+---
+- true
+...
+box.space._cluster:len() == len
+---
+- true
+...
+box.info.replication[trim2[1]] ~= nil
+---
+- true
+...
+box.info.replication[trim3[1]] ~= nil
+---
+- true
+...
+box.info.replication[trim2[1]].downstream.status == nil
+---
+- true
+...
+box.info.replication[trim3[1]].downstream.status == nil
+---
+- true
+...
+box.info.replication[old_trim2[1]] == nil
+---
+- true
+...
+box.info.replication[old_trim3[1]] == nil
+---
+- true
+...
+-- neither applier nor relay
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd('stop server trim2')
+---
+- true
+...
+test_run:cmd('cleanup server trim2')
+---
+- true
+...
+test_run:cmd('start server trim2')
+---
+- true
+...
+test_run:cmd('stop server trim3')
+---
+- true
+...
+test_run:cmd('cleanup server trim3')
+---
+- true
+...
+test_run:cmd('start server trim3')
+---
+- true
+...
+test_run:cmd('stop server trim1')
+---
+- true
+...
+test_run:cmd('cleanup server trim1')
+---
+- true
+...
+test_run:cmd('start server trim1')
+---
+- true
+...
+test_run:cmd('switch trim1')
+---
+- true
+...
+box.ctl.wait_rw(10)
+---
+- error: timed out
+...
+inactive = replicaset_list_inactive()
+---
+...
+-- prune given replica
+replica_prune(inactive[1])
+---
+- error: Can't modify data because this instance is in read-only mode.
+...
+#replicaset_list_inactive() ~= #inactive
+---
+- false
+...
+replicaset_purge(replicaset_list_inactive())
+---
+- error: Can't modify data because this instance is in read-only mode.
+...
+box.space._cluster:len() == 3
+---
+- false
+...
+-- Cleanup
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:drop_cluster(SERVERS)
+---
+...
blob - /dev/null
blob + 2c467e313f1de8835364c119470d3c8c5098ec91 (mode 644)
--- /dev/null
+++ test/replication/trim.test.lua
@@ -0,0 +1,90 @@
+test_run = require('test_run').new()
+
+SERVERS = {'trim1', 'trim2', 'trim3'}
+
+-- Deploy cluster
+test_run:create_cluster(SERVERS, "replication", {args="0.1"})
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd('switch trim1')
+len = box.space._cluster:len()
+
+-- basic calls and argument validation
+replicaset_list_inactive()
+replica_prune()
+replicaset_purge()
+
+-- create zombies by restarting replicas
+test_run:cmd('switch trim1')
+fiber = require('fiber')
+old_trim2 = test_run:get_param('trim2', 'id')
+old_trim3 = test_run:get_param('trim3', 'id')
+
+len = box.space._cluster:len()
+test_run:cmd('switch default')
+test_run:cmd('stop server trim2')
+test_run:cmd('cleanup server trim2')
+test_run:cmd('start server trim2')
+test_run:cmd('stop server trim3')
+test_run:cmd('cleanup server trim3')
+test_run:cmd('start server trim3')
+test_run:cmd('switch trim1')
+
+replicaset_list_inactive() ~= nil
+box.space._cluster:len() == len + #replicaset_list_inactive()
+
+-- check that we list and prune only dead replicas
+trim2 = test_run:get_param('trim2', 'id')
+trim3 = test_run:get_param('trim3', 'id')
+
+while box.info.replication[trim2[1]].upstream.status == 'follow' do fiber.sleep(0.01) end
+while box.info.replication[trim3[1]].upstream.status == 'follow' do fiber.sleep(0.01) end
+box.info.replication[trim2[1]].downstream.status == nil
+box.info.replication[trim3[1]].downstream.status == nil
+
+box.info.replication[old_trim2[1]].upstream == nil
+box.info.replication[old_trim3[1]].upstream == nil
+box.info.replication[old_trim2[1]].downstream.status == 'stopped'
+box.info.replication[old_trim3[1]].downstream.status == 'stopped'
+--
+#replicaset_list_inactive() == 2
+replicaset_purge(replicaset_list_inactive())
+#replicaset_list_inactive() == 0
+box.space._cluster:len() == len
+
+box.info.replication[trim2[1]] ~= nil
+box.info.replication[trim3[1]] ~= nil
+box.info.replication[trim2[1]].downstream.status == nil
+box.info.replication[trim3[1]].downstream.status == nil
+
+box.info.replication[old_trim2[1]] == nil
+box.info.replication[old_trim3[1]] == nil
+
+
+-- neither applier nor relay
+
+test_run:cmd('switch default')
+test_run:cmd('stop server trim2')
+test_run:cmd('cleanup server trim2')
+test_run:cmd('start server trim2')
+test_run:cmd('stop server trim3')
+test_run:cmd('cleanup server trim3')
+test_run:cmd('start server trim3')
+test_run:cmd('stop server trim1')
+test_run:cmd('cleanup server trim1')
+test_run:cmd('start server trim1')
+test_run:cmd('switch trim1')
+
+box.ctl.wait_rw(10)
+
+inactive = replicaset_list_inactive()
+
+-- prune given replica
+replica_prune(inactive[1])
+#replicaset_list_inactive() ~= #inactive
+replicaset_purge(replicaset_list_inactive())
+box.space._cluster:len() == 3
+
+-- Cleanup
+test_run:cmd('switch default')
+test_run:drop_cluster(SERVERS)
blob - /dev/null
blob + 14e98bd68fae33b7b53b3c0912c78e32cc4ac797 (mode 120000)
--- /dev/null
+++ test/replication/trim1.lua
@@ -0,0 +1 @@
+trim.lua
\ No newline at end of file
blob - /dev/null
blob + 14e98bd68fae33b7b53b3c0912c78e32cc4ac797 (mode 120000)
--- /dev/null
+++ test/replication/trim2.lua
@@ -0,0 +1 @@
+trim.lua
\ No newline at end of file
blob - /dev/null
blob + 14e98bd68fae33b7b53b3c0912c78e32cc4ac797 (mode 120000)
--- /dev/null
+++ test/replication/trim3.lua
@@ -0,0 +1 @@
+trim.lua
\ No newline at end of file
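
A minimal usage sketch of the new helpers (assumptions: run from the admin
console of a writable instance with this patch applied; the local variable
'stale' is introduced here only for illustration):

    -- collect UUIDs of replicas that look dead
    local stale = replicaset_list_inactive()
    -- prune a single stale replica by uuid, if any was found
    if stale[1] ~= nil then
        replica_prune(stale[1])
    end
    -- or purge every suspect in one call
    replicaset_purge(replicaset_list_inactive())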