commit - 5ebbed7711d442e9b5ea21703c4aa5ba613a1a5d
commit + 1f75231a0c0ce062e08660e0bc51485f7086412c
blob - 4c550c6b3c65aed1d27fb8b4a0692c67f06863c6
blob + 813fca94158c75f59e835cc1724037b4cca3ea39
--- src/box/relay.cc
+++ src/box/relay.cc
/*
* Save the first vclock as 'received'. Because it was really received.
*/
- vclock_copy(&relay->last_recv_ack.vclock, start_vclock);
+ vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
relay->r = recovery_new(wal_dir(), false, start_vclock);
vclock_copy(&relay->stop_vclock, stop_vclock);
while (!fiber_is_cancelled()) {
FiberGCChecker gc_check;
struct xrow_header xrow;
+ ERROR_INJECT_YIELD(ERRINJ_RELAY_READ_ACK_DELAY);
coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow,
replication_disconnect_timeout());
xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack);
/*
* Save the first vclock as 'received'. Because it was really received.
*/
- vclock_copy(&relay->last_recv_ack.vclock, start_vclock);
+ vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
relay->r = recovery_new(wal_dir(), false, start_vclock);
- vclock_copy(&relay->tx.vclock, start_vclock);
+ vclock_copy_ignore0(&relay->tx.vclock, start_vclock);
relay->version_id = replica_version_id;
-
relay->id_filter |= replica_id_filter;
struct cord cord;
blob - b277159754d8c89f62f8f8954e912b375b621226
blob + 7783106975df0e69de0ce685f99cc7390a997aec
--- src/lib/core/errinj.h
+++ src/lib/core/errinj.h
_(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
_(ERRINJ_RELAY_WAL_START_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_RELAY_READ_ACK_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_SIGILL_MAIN_THREAD, ERRINJ_BOOL, {.bparam = false}) \
blob - 6034259b3f0837ec63c0ecaf278a4f6cdb8305e3
blob + 85aa011beecdd272ae6ca412dfd2d367928930b5
--- src/lib/vclock/vclock.h
+++ src/lib/vclock/vclock.h
sizeof(*dst->lsn) * max_pos);
}
+/**
+ * Copy @a src to @a dst like vclock_copy(), but reset component 0
+ * afterwards. Component 0 counts local (non-replicated) rows --
+ * NOTE(review): presumed from the gh-10047 test which pairs it with
+ * an is_local space, confirm -- and must not leak into vclocks that
+ * get compared against ACKs received from remote replicas.
+ */
+static inline void
+vclock_copy_ignore0(struct vclock *dst, const struct vclock *src)
+{
+	vclock_copy(dst, src);
+	vclock_reset(dst, 0, 0);
+}
+
static inline uint32_t
vclock_size(const struct vclock *vclock)
{
blob - e31d2317c7673df2da67de3730df123555ab3c2d
blob + df5a3f06f253d294158288a44d00a2a1b86f5d9a
--- test/box/errinj.result
+++ test/box/errinj.result
- ERRINJ_RELAY_FINAL_JOIN: false
- ERRINJ_RELAY_FINAL_SLEEP: false
- ERRINJ_RELAY_FROM_TX_DELAY: false
+ - ERRINJ_RELAY_READ_ACK_DELAY: false
- ERRINJ_RELAY_REPORT_INTERVAL: 0
- ERRINJ_RELAY_SEND_DELAY: false
- ERRINJ_RELAY_TIMEOUT: 0
blob - /dev/null
blob + c796b0d968286cc1964fbe45265d52aea75c07dd (mode 644)
--- /dev/null
+++ test/replication-luatest/gh_10047_local_vclock_downstream_test.lua
+local t = require('luatest')
+local server = require('luatest.server')
+local replica_set = require('luatest.replica_set')
+
+local g = t.group('gh_10047')
+local wait_timeout = 10
+
+g.before_all(function(cg)
+ t.tarantool.skip_if_not_debug()
+ cg.replica_set = replica_set:new({})
+ cg.replication = {
+ server.build_listen_uri('server1', cg.replica_set.id),
+ }
+ cg.server1 = cg.replica_set:build_and_add_server{
+ alias = 'server1',
+ box_cfg = {
+ replication_timeout = 0.1,
+ },
+ }
+ cg.server2 = cg.replica_set:build_and_add_server{
+ alias = 'server2',
+ box_cfg = {
+ replication_timeout = 0.1,
+ replication = server.build_listen_uri('server1', cg.replica_set.id),
+ },
+ }
+ cg.replica_set:start()
+ cg.server1:exec(function()
+ local s1 = box.schema.create_space('test')
+ s1:create_index('pk')
+ local s2 = box.schema.create_space('test_loc', {is_local = true})
+ s2:create_index('pk')
+ end)
+end)
+
+g.after_all(function(cg)
+ cg.replica_set:drop()
+end)
+
+--
+-- gh-10047: the relay used to save the local vclock[0] as the last received
+-- ACK from a replica that had just subscribed and hadn't sent any real ACKs
+-- yet. When a real ACK was received, it didn't have vclock[0], which made it
+-- look like vclock[0] went backwards. That broke an assert in the relay.
+--
+g.test_downstream_vclock_no_local = function(cg)
+ -- Make sure there is a local row on master and vclock isn't empty.
+ cg.server1:exec(function()
+ box.space.test:replace{1}
+ box.space.test_loc:replace{1}
+ end)
+ cg.server2:wait_for_vclock_of(cg.server1)
+ local server2_id = cg.server2:get_instance_id()
+ cg.server2:stop()
+ cg.server1:exec(function()
+ -- On restart the replica's ACKs are not received for a while. Need to
+ -- catch the moment when the subscribe vclock appears in
+ -- info.replication and it isn't yet overridden by a real ACK.
+ box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", true)
+        -- While the replica is away, the master moves a bit. This makes the
+        -- replica's vclock smaller, so that after subscribe it will receive
+        -- actual data and send a real ACK (not just an empty heartbeat).
+ box.space.test:replace{2}
+ end)
+ cg.server2:start()
+ cg.server1:exec(function(id, timeout)
+ local fiber = require('fiber')
+ -- Wait until subscribe is done.
+ t.helpers.retrying({timeout = timeout}, function()
+ local info = box.info.replication[id]
+ if info and info.downstream and info.downstream.vclock and
+ next(info.downstream.vclock) then
+ t.assert_equals(info.downstream.vclock[0], nil)
+ return
+ end
+ error("No subscribe from the replica")
+ end)
+        -- When subscribe has just finished, the relay has the subscribe
+        -- vclock saved as the last ACK. But the crash was triggered when the
+        -- before-last ACK was >= the last ACK, and the last ACK becomes the
+        -- before-last one when the relay exchanges status messages with the
+        -- TX thread.
+        --
+        -- Hence we need to wait until the TX thread gets notified by the
+        -- relay about anything.
+ t.helpers.retrying({timeout = timeout}, function()
+ local ack_period = box.cfg.replication_timeout
+ fiber.sleep(ack_period)
+ local info = box.info.replication[id]
+ if info and info.downstream and info.downstream.idle and
+ info.downstream.idle < ack_period then
+ return
+ end
+ error("No report to TX thread")
+ end)
+ -- Receive a real ACK. It must be >= subscribe vclock or master dies.
+ box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", false)
+ end, {server2_id, wait_timeout})
+ cg.server2:wait_for_vclock_of(cg.server1)
+end