commit - 4c7d8281502b1a26c59ffc9a87e082e7e8826932
commit + 9fcbbb3e7d5e9f5a876ee27a7bf93303321e26b2
blob - df48b47962d2abf98328d13178341a10d4f2e482
blob + 6ca4cca94b25ee2a129698feeb1d29567b03f6bc
--- src/box/applier.cc
+++ src/box/applier.cc
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
struct xrow_header *last_row;
+ last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
/*
* In a full mesh topology, the same set of changes
&replicaset.applier.order_latch);
latch_lock(latch);
if (vclock_get(&replicaset.applier.vclock,
- first_row->replica_id) >= first_row->lsn) {
+ last_row->replica_id) >= last_row->lsn) {
latch_unlock(latch);
return 0;
+ } else if (vclock_get(&replicaset.applier.vclock,
+ first_row->replica_id) >= first_row->lsn) {
+ /*
+ * We've received part of the tx from an old
+ * instance that is not aware of tx boundaries.
+ * Skip the rows that have already been applied.
+ */
+ struct xrow_header *tmp;
+ while (true) {
+ tmp = &stailq_first_entry(rows,
+ struct applier_tx_row,
+ next)->row;
+ if (tmp->lsn <= vclock_get(&replicaset.applier.vclock,
+ tmp->replica_id)) {
+ stailq_shift(rows);
+ } else {
+ break;
+ }
+ }
}
/**
* instances, which send every single tx row as a separate
* transaction.
*/
- last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
last_row->lsn);
latch_unlock(latch);