Commit Diff


commit - c2c87816ff11447d2e5861eb509eaf6d0da6de1c
commit + d615f3f74f9c9a36029b39e01dc6e9a1495442d5
blob - /dev/null
blob + 4bc8f67d21b5f1be7c193ec2a9d55a21046d633c (mode 644)
--- /dev/null
+++ changelogs/unreleased/gh-9917-speed-up-transaction-queue-processing.md
@@ -0,0 +1,3 @@
+## bugfix/limbo
+
+* Optimized synchronous transaction queue processing (gh-9917).
blob - 46157af1fc9bf3d149c710d3633ab1696ae84658
blob + 8a9945bf5dacfd2ce96f1efdedbe446602d5c1d1
--- src/box/txn_limbo.c
+++ src/box/txn_limbo.c
@@ -53,6 +53,7 @@ txn_limbo_create(struct txn_limbo *limbo)
 	limbo->promote_greatest_term = 0;
 	latch_create(&limbo->promote_latch);
 	limbo->confirmed_lsn = 0;
+	limbo->entry_to_confirm = NULL;
 	limbo->rollback_count = 0;
 	limbo->is_in_rollback = false;
 	limbo->svp_confirmed_lsn = -1;
@@ -161,13 +162,17 @@ txn_limbo_append(struct txn_limbo *limbo, uint32_t id,
 	size_t size;
 	struct txn_limbo_entry *e = region_alloc_object(&txn->region,
 							typeof(*e), &size);
+	if (limbo->entry_to_confirm == NULL &&
+	    txn_has_flag(txn, TXN_WAIT_ACK)) {
+		limbo->entry_to_confirm = e;
+		limbo->ack_count = 0;
+	}
 	if (e == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc_object", "e");
 		return NULL;
 	}
 	e->txn = txn;
 	e->lsn = -1;
-	e->ack_count = 0;
 	e->is_commit = false;
 	e->is_rollback = false;
 	rlist_add_tail_entry(&limbo->queue, e, in_queue);
@@ -200,6 +205,8 @@ void
 txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	entry->is_rollback = true;
+	if (entry == limbo->entry_to_confirm)
+		limbo->entry_to_confirm = NULL;
 	/*
 	 * The simple rule about rollback/commit order applies
 	 * here as well: commit always in the order of WAL write,
@@ -228,6 +235,9 @@ txn_limbo_assign_remote_lsn(struct txn_limbo *limbo,
 	entry->lsn = lsn;
 }
 
+static void
+txn_limbo_confirm(struct txn_limbo *limbo);
+
 void
 txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
 			   struct txn_limbo_entry *entry, int64_t lsn)
@@ -238,19 +248,8 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
 	assert(lsn > 0);
 
 	entry->lsn = lsn;
-	/*
-	 * The entry just got its LSN after a WAL write. It could
-	 * happen that this LSN was already ACKed by some
-	 * replicas. Update the ACK counter to take them into
-	 * account.
-	 */
-	struct vclock_iterator iter;
-	vclock_iterator_init(&iter, &limbo->vclock);
-	int ack_count = 0;
-	vclock_foreach(&iter, vc)
-		ack_count += vc.lsn >= lsn;
-	assert(ack_count >= entry->ack_count);
-	entry->ack_count = ack_count;
+	if (entry == limbo->entry_to_confirm)
+		limbo->ack_count = vclock_count_ge(&limbo->vclock, entry->lsn);
 }
 
 void
@@ -593,6 +592,7 @@ txn_limbo_read_promote(struct txn_limbo *limbo, uint32
 	limbo->owner_id = replica_id;
 	limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock,
 					  replica_id);
+	limbo->entry_to_confirm = NULL;
 	box_update_ro_summary();
 }
 
@@ -629,6 +629,56 @@ txn_limbo_read_demote(struct txn_limbo *limbo, int64_t
 	return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn);
 }
 
+/**
+ * If entry_to_confirm has gathered a quorum of ACKs, write and read a CONFIRM
+ * for the last sync entry covered by the quorum and move entry_to_confirm on.
+ */
+static void
+txn_limbo_confirm(struct txn_limbo *limbo)
+{
+	assert(limbo->owner_id == instance_id);
+	if (limbo->is_in_rollback)
+		return;
+	if (limbo->entry_to_confirm == NULL ||
+	    limbo->entry_to_confirm->lsn == -1)
+		return;
+	if (limbo->ack_count < replication_synchro_quorum)
+		return;
+	int32_t k = (int32_t)vclock_size(&limbo->vclock)
+		    - replication_synchro_quorum;
+	/*
+	 * ack_count >= replication_synchro_quorum implies
+	 * vclock_size(&limbo->vclock) >= replication_synchro_quorum, so k >= 0.
+	 */
+	assert(k >= 0);
+	int64_t confirm_lsn = vclock_nth_element(&limbo->vclock, k);
+	assert(confirm_lsn >= limbo->entry_to_confirm->lsn);
+	struct txn_limbo_entry *e = limbo->entry_to_confirm;
+	limbo->entry_to_confirm = NULL;
+	int64_t max_assigned_lsn = -1;
+	for (; !rlist_entry_is_head(e, &limbo->queue, in_queue);
+	       e = rlist_next_entry(e, in_queue)) {
+		if (!txn_has_flag(e->txn, TXN_WAIT_ACK))
+			continue;
+		if (e->lsn == -1 || e->lsn > confirm_lsn) {
+			limbo->entry_to_confirm = e;
+			/*
+			 * A quorum may already be gathered while ack_count is
+			 * 0. Assigning the lsn recounts ack_count; CONFIRM is
+			 * then written by a later txn_limbo_confirm() call.
+			 */
+			limbo->ack_count = (e->lsn == -1) ? 0 :
+				vclock_count_ge(&limbo->vclock, e->lsn);
+			break;
+		} else {
+			max_assigned_lsn = e->lsn;
+		}
+	}
+	assert(max_assigned_lsn != -1);
+	txn_limbo_write_confirm(limbo, max_assigned_lsn);
+	txn_limbo_read_confirm(limbo, max_assigned_lsn);
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -651,9 +701,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replic
 		return;
 	assert(limbo->owner_id != REPLICA_ID_NIL);
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
+	assert(lsn >= prev_lsn);
 	/*
 	 * One of the reasons why can happen - the remote instance is not
-	 * read-only and wrote something under its own insance_id. For qsync
+	 * read-only and wrote something under its own instance_id. For qsync
 	 * that most likely means that the remote instance decided to take over
 	 * the limbo ownership, and the current node is going to become a
 	 * replica very soon.
@@ -661,31 +712,15 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replic
 	if (lsn == prev_lsn)
 		return;
 	vclock_follow(&limbo->vclock, replica_id, lsn);
-	struct txn_limbo_entry *e;
-	int64_t confirm_lsn = -1;
-	rlist_foreach_entry(e, &limbo->queue, in_queue) {
-		assert(e->ack_count <= VCLOCK_MAX);
-		if (e->lsn > lsn)
-			break;
-		/*
-		 * Sync transactions need to collect acks. Async
-		 * transactions are automatically committed right
-		 * after all the previous sync transactions are.
-		 */
-		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			continue;
-		} else if (e->lsn <= prev_lsn) {
-			continue;
-		} else if (++e->ack_count < replication_synchro_quorum) {
-			continue;
-		} else {
-			confirm_lsn = e->lsn;
-		}
+
+	if (limbo->entry_to_confirm != NULL &&
+	    limbo->entry_to_confirm->lsn != -1) {
+		if (limbo->entry_to_confirm->lsn <= prev_lsn ||
+		    lsn < limbo->entry_to_confirm->lsn)
+			return;
+		if (++limbo->ack_count >= replication_synchro_quorum)
+			txn_limbo_confirm(limbo);
 	}
-	if (confirm_lsn == -1 || confirm_lsn <= limbo->confirmed_lsn)
-		return;
-	txn_limbo_write_confirm(limbo, confirm_lsn);
-	txn_limbo_read_confirm(limbo, confirm_lsn);
 }
 
 /**
@@ -1225,23 +1260,9 @@ txn_limbo_on_parameters_change(struct txn_limbo *limbo
 {
 	if (rlist_empty(&limbo->queue) || txn_limbo_is_frozen(limbo))
 		return;
-	struct txn_limbo_entry *e;
-	int64_t confirm_lsn = -1;
-	rlist_foreach_entry(e, &limbo->queue, in_queue) {
-		assert(e->ack_count <= VCLOCK_MAX);
-		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			continue;
-		} else if (e->ack_count < replication_synchro_quorum) {
-			continue;
-		} else {
-			confirm_lsn = e->lsn;
-			assert(confirm_lsn > 0);
-		}
-	}
-	if (confirm_lsn > limbo->confirmed_lsn && !limbo->is_in_rollback) {
-		txn_limbo_write_confirm(limbo, confirm_lsn);
-		txn_limbo_read_confirm(limbo, confirm_lsn);
-	}
+	/* The replication_synchro_quorum value may have changed. */
+	if (limbo->owner_id == instance_id)
+		txn_limbo_confirm(limbo);
 	/*
 	 * Wakeup all the others - timed out will rollback. Also
 	 * there can be non-transactional waiters, such as CONFIRM
blob - ce93c9d10c100476addf324a474d81ae07423085
blob + 0da59d176b00c26b38980f4dca79d8f7a0fde815
--- src/box/txn_limbo.h
+++ src/box/txn_limbo.h
@@ -58,11 +58,6 @@ struct txn_limbo_entry {
 	 */
 	int64_t lsn;
 	/**
-	 * Number of ACKs. Or in other words - how many replicas
-	 * confirmed receipt of the transaction.
-	 */
-	int ack_count;
-	/**
 	 * Result flags. Only one of them can be true. But both
 	 * can be false if the transaction is still waiting for
 	 * its resolution.
@@ -166,6 +161,19 @@ struct txn_limbo {
 	 * illegal.
 	 */
 	int64_t confirmed_lsn;
+	/**
+	 * The first unconfirmed synchronous transaction in the current term,
+	 * or NULL if there is none. Only meaningful when the current instance
+	 * owns the limbo; reset to NULL on PROMOTE/DEMOTE.
+	 */
+	struct txn_limbo_entry *entry_to_confirm;
+	/**
+	 * Number of ACKs of the first unconfirmed synchronous transaction
+	 * (entry_to_confirm->txn). Valid only when entry_to_confirm is not
+	 * NULL and has a local lsn assigned; otherwise the value is
+	 * meaningless.
+	 */
+	int ack_count;
 	/**
 	 * Total number of performed rollbacks. It used as a guard
 	 * to do some actions assuming all limbo transactions will