Commit Diff


commit - 4c7d8281502b1a26c59ffc9a87e082e7e8826932
commit + 9fcbbb3e7d5e9f5a876ee27a7bf93303321e26b2
blob - df48b47962d2abf98328d13178341a10d4f2e482
blob + 6ca4cca94b25ee2a129698feeb1d29567b03f6bc
--- src/box/applier.cc
+++ src/box/applier.cc
@@ -737,6 +737,7 @@ applier_apply_tx(struct stailq *rows)
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
+	last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
 	struct replica *replica = replica_by_id(first_row->replica_id);
 	/*
 	 * In a full mesh topology, the same set of changes
@@ -748,9 +749,28 @@ applier_apply_tx(struct stailq *rows)
 			       &replicaset.applier.order_latch);
 	latch_lock(latch);
 	if (vclock_get(&replicaset.applier.vclock,
-		       first_row->replica_id) >= first_row->lsn) {
+		       last_row->replica_id) >= last_row->lsn) {
 		latch_unlock(latch);
 		return 0;
+	} else if (vclock_get(&replicaset.applier.vclock,
+			      first_row->replica_id) >= first_row->lsn) {
+		/*
+		 * We've received part of the tx from an old
+		 * instance that is unaware of tx boundaries.
+		 * Skip the already applied part.
+		 */
+		struct xrow_header *tmp;
+		while (true) {
+			tmp = &stailq_first_entry(rows,
+						  struct applier_tx_row,
+						  next)->row;
+			if (tmp->lsn <= vclock_get(&replicaset.applier.vclock,
+						   tmp->replica_id)) {
+				stailq_shift(rows);
+			} else {
+				break;
+			}
+		}
 	}
 
 	/**
@@ -835,7 +855,6 @@ applier_apply_tx(struct stailq *rows)
 	 * instances, which send every single tx row as a separate
 	 * transaction.
 	 */
-	last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
 	latch_unlock(latch);