Commit Diff


commit - 5ebbed7711d442e9b5ea21703c4aa5ba613a1a5d
commit + 1f75231a0c0ce062e08660e0bc51485f7086412c
blob - 4c550c6b3c65aed1d27fb8b4a0692c67f06863c6
blob + 813fca94158c75f59e835cc1724037b4cca3ea39
--- src/box/relay.cc
+++ src/box/relay.cc
@@ -552,7 +552,7 @@ relay_final_join(struct replica *replica, struct iostr
 	/*
 	 * Save the first vclock as 'received'. Because it was really received.
 	 */
-	vclock_copy(&relay->last_recv_ack.vclock, start_vclock);
+	vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
 	relay->r = recovery_new(wal_dir(), false, start_vclock);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
@@ -780,6 +780,7 @@ relay_reader_f(va_list ap)
 		while (!fiber_is_cancelled()) {
 			FiberGCChecker gc_check;
 			struct xrow_header xrow;
+			ERROR_INJECT_YIELD(ERRINJ_RELAY_READ_ACK_DELAY);
 			coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow,
 					replication_disconnect_timeout());
 			xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack);
@@ -1113,11 +1114,10 @@ relay_subscribe(struct replica *replica, struct iostre
 	/*
 	 * Save the first vclock as 'received'. Because it was really received.
 	 */
-	vclock_copy(&relay->last_recv_ack.vclock, start_vclock);
+	vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
 	relay->r = recovery_new(wal_dir(), false, start_vclock);
-	vclock_copy(&relay->tx.vclock, start_vclock);
+	vclock_copy_ignore0(&relay->tx.vclock, start_vclock);
 	relay->version_id = replica_version_id;
-
 	relay->id_filter |= replica_id_filter;
 
 	struct cord cord;
blob - b277159754d8c89f62f8f8954e912b375b621226
blob + 7783106975df0e69de0ce685f99cc7390a997aec
--- src/lib/core/errinj.h
+++ src/lib/core/errinj.h
@@ -121,6 +121,7 @@ struct errinj {
 	_(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_WAL_START_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_RELAY_READ_ACK_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_SIGILL_MAIN_THREAD, ERRINJ_BOOL, {.bparam = false}) \
blob - 6034259b3f0837ec63c0ecaf278a4f6cdb8305e3
blob + 85aa011beecdd272ae6ca412dfd2d367928930b5
--- src/lib/vclock/vclock.h
+++ src/lib/vclock/vclock.h
@@ -202,6 +202,13 @@ vclock_copy(struct vclock *dst, const struct vclock *s
 			 sizeof(*dst->lsn) * max_pos);
 }
 
+static inline void
+vclock_copy_ignore0(struct vclock *dst, const struct vclock *src)
+{
+	vclock_copy(dst, src);
+	vclock_reset(dst, 0, 0);
+}
+
 static inline uint32_t
 vclock_size(const struct vclock *vclock)
 {
blob - e31d2317c7673df2da67de3730df123555ab3c2d
blob + df5a3f06f253d294158288a44d00a2a1b86f5d9a
--- test/box/errinj.result
+++ test/box/errinj.result
@@ -90,6 +90,7 @@ evals
   - ERRINJ_RELAY_FINAL_JOIN: false
   - ERRINJ_RELAY_FINAL_SLEEP: false
   - ERRINJ_RELAY_FROM_TX_DELAY: false
+  - ERRINJ_RELAY_READ_ACK_DELAY: false
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
   - ERRINJ_RELAY_SEND_DELAY: false
   - ERRINJ_RELAY_TIMEOUT: 0
blob - /dev/null
blob + c796b0d968286cc1964fbe45265d52aea75c07dd (mode 644)
--- /dev/null
+++ test/replication-luatest/gh_10047_local_vclock_downstream_test.lua
@@ -0,0 +1,99 @@
+local t = require('luatest')
+local server = require('luatest.server')
+local replica_set = require('luatest.replica_set')
+
+local g = t.group('gh_10047')
+local wait_timeout = 10
+
+g.before_all(function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.replica_set = replica_set:new({})
+    cg.replication = {
+        server.build_listen_uri('server1', cg.replica_set.id),
+    }
+    cg.server1 = cg.replica_set:build_and_add_server{
+        alias = 'server1',
+        box_cfg = {
+            replication_timeout = 0.1,
+        },
+    }
+    cg.server2 = cg.replica_set:build_and_add_server{
+        alias = 'server2',
+        box_cfg = {
+            replication_timeout = 0.1,
+            replication = server.build_listen_uri('server1', cg.replica_set.id),
+        },
+    }
+    cg.replica_set:start()
+    cg.server1:exec(function()
+        local s1 = box.schema.create_space('test')
+        s1:create_index('pk')
+        local s2 = box.schema.create_space('test_loc', {is_local = true})
+        s2:create_index('pk')
+    end)
+end)
+
+g.after_all(function(cg)
+    cg.replica_set:drop()
+end)
+
+--
+-- gh-10047: relay used to save local vclock[0] as the last received ACK from
+-- the replica that had just subscribed and hadn't sent any real ACKs. When a
+-- ACK was received, it didn't have vclock[0] and it looked like vclock[0] went
+-- backwards. That broke an assert in relay.
+--
+g.test_downstream_vclock_no_local = function(cg)
+    -- Make sure there is a local row on master and vclock isn't empty.
+    cg.server1:exec(function()
+        box.space.test:replace{1}
+        box.space.test_loc:replace{1}
+    end)
+    cg.server2:wait_for_vclock_of(cg.server1)
+    local server2_id = cg.server2:get_instance_id()
+    cg.server2:stop()
+    cg.server1:exec(function()
+        -- On restart the replica's ACKs are not received for a while. Need to
+        -- catch the moment when the subscribe vclock appears in
+        -- info.replication and it isn't yet overridden by a real ACK.
+        box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", true)
+        -- While the replica is away, the master moves a bit. This makes the
+        -- replica's vclock smaller so that it would receive actual data after
+        -- subscribe and send a real ACK (not just an empty heartbeat).
+        box.space.test:replace{2}
+    end)
+    cg.server2:start()
+    cg.server1:exec(function(id, timeout)
+        local fiber = require('fiber')
+        -- Wait until subscribe is done.
+        t.helpers.retrying({timeout = timeout}, function()
+            local info = box.info.replication[id]
+            if info and info.downstream and info.downstream.vclock and
+               next(info.downstream.vclock) then
+                t.assert_equals(info.downstream.vclock[0], nil)
+                return
+            end
+            error("No subscribe from the replica")
+        end)
+        -- When subscribe has just finished, the relay has the subscribe vclock
+        -- saved as last-ACK. But the crash was triggered when before-last-ACK
+        -- was >= last-ACK. The last-ACK becomes the before-last-ACK when the
+        -- relay exchanges status messages with the TX thread.
+        --
+        -- Hence need to wait until the TX thread gets notified by the relay
+        -- about anything.
+        t.helpers.retrying({timeout = timeout}, function()
+            local ack_period = box.cfg.replication_timeout
+            fiber.sleep(ack_period)
+            local info = box.info.replication[id]
+            if info and info.downstream and info.downstream.idle and
+               info.downstream.idle < ack_period then
+                return
+            end
+            error("No report to TX thread")
+        end)
+        -- Receive a real ACK. It must be >= subscribe vclock or master dies.
+        box.error.injection.set("ERRINJ_RELAY_READ_ACK_DELAY", false)
+    end, {server2_id, wait_timeout})
+    cg.server2:wait_for_vclock_of(cg.server1)
+end