commit - 7fa6f1a5a0b48cc7616e72d5b3d8a38b0311b1dc
commit + 803a77b2da6aedde7e333a5480f2a66eae623910
blob - /dev/null
blob + 4bc8f67d21b5f1be7c193ec2a9d55a21046d633c (mode 644)
--- /dev/null
+++ changelogs/unreleased/gh-9917-speed-up-transaction-queue-processing.md
+## bugfix/limbo
+
+* Optimized synchronous transaction queue processing (gh-9917).
blob - bc6de2cb5acce780ca822573144adfd33cecb6f0
blob + fb2a4376cb4efa47538fd3e64d4cb73d9a165516
--- src/box/txn_limbo.c
+++ src/box/txn_limbo.c
limbo->promote_greatest_term = 0;
latch_create(&limbo->promote_latch);
limbo->confirmed_lsn = 0;
+ limbo->entry_to_confirm = NULL;
limbo->rollback_count = 0;
limbo->is_in_rollback = false;
limbo->svp_confirmed_lsn = -1;
size_t size;
struct txn_limbo_entry *e = region_alloc_object(&txn->region,
typeof(*e), &size);
+ if (limbo->entry_to_confirm == NULL &&
+ txn_has_flag(txn, TXN_WAIT_ACK)) {
+ limbo->entry_to_confirm = e;
+ limbo->ack_count = 0;
+ }
if (e == NULL) {
diag_set(OutOfMemory, size, "region_alloc_object", "e");
return NULL;
}
e->txn = txn;
e->lsn = -1;
- e->ack_count = 0;
e->is_commit = false;
e->is_rollback = false;
e->insertion_time = fiber_clock();
txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
{
entry->is_rollback = true;
+ if (entry == limbo->entry_to_confirm)
+ limbo->entry_to_confirm = NULL;
/*
* The simple rule about rollback/commit order applies
* here as well: commit always in the order of WAL write,
entry->lsn = lsn;
}
+static void
+txn_limbo_confirm(struct txn_limbo *limbo);
+
void
txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
struct txn_limbo_entry *entry, int64_t lsn)
assert(lsn > 0);
entry->lsn = lsn;
- /*
- * The entry just got its LSN after a WAL write. It could
- * happen that this LSN was already ACKed by some
- * replicas. Update the ACK counter to take them into
- * account.
- */
- struct vclock_iterator iter;
- vclock_iterator_init(&iter, &limbo->vclock);
- int ack_count = 0;
- vclock_foreach(&iter, vc)
- ack_count += vc.lsn >= lsn;
- assert(ack_count >= entry->ack_count);
- entry->ack_count = ack_count;
+ if (entry == limbo->entry_to_confirm)
+ limbo->ack_count = vclock_count_ge(&limbo->vclock, entry->lsn);
}
void
limbo->owner_id = replica_id;
limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock,
replica_id);
+ limbo->entry_to_confirm = NULL;
box_update_ro_summary();
}
return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn);
}
+/**
+ * Check that some synchronous transactions have gathered quorum and
+ * write a confirmation entry of the last confirmed transaction.
+ */
+static void
+txn_limbo_confirm(struct txn_limbo *limbo)
+{
+ assert(limbo->owner_id == instance_id);
+ if (limbo->is_in_rollback)
+ return;
+ if (limbo->entry_to_confirm == NULL ||
+ limbo->entry_to_confirm->lsn == -1)
+ return;
+ if (limbo->ack_count < replication_synchro_quorum)
+ return;
+ int32_t k = (int32_t)vclock_size(&limbo->vclock)
+ - replication_synchro_quorum;
+ /**
+ * limbo->ack_count >= replication_synchro_quorum =>
+ * vclock_size(&limbo->vclock) >= replication_synchro_quorum
+ */
+ assert(k >= 0);
+ int64_t confirm_lsn = vclock_nth_element(&limbo->vclock, k);
+ assert(confirm_lsn >= limbo->entry_to_confirm->lsn);
+ struct txn_limbo_entry *e = limbo->entry_to_confirm;
+ limbo->entry_to_confirm = NULL;
+ int64_t max_assigned_lsn = -1;
+ for (; !rlist_entry_is_head(e, &limbo->queue, in_queue);
+ e = rlist_next_entry(e, in_queue)) {
+ if (!txn_has_flag(e->txn, TXN_WAIT_ACK))
+ continue;
+ if (e->lsn == -1 || e->lsn > confirm_lsn) {
+ limbo->entry_to_confirm = e;
+ /**
+ * It may be that a quorum has been gathered, but
+ * ack_count = 0. It's ok. CONFIRM will be written as
+ * soon as the lsn is assigned to the transaction.
+ */
+ limbo->ack_count = (e->lsn == -1) ? 0 :
+ vclock_count_ge(&limbo->vclock, e->lsn);
+ break;
+ } else {
+ max_assigned_lsn = e->lsn;
+ }
+ }
+ assert(max_assigned_lsn != -1);
+ txn_limbo_write_confirm(limbo, max_assigned_lsn);
+ txn_limbo_read_confirm(limbo, max_assigned_lsn);
+}
+
void
txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
{
return;
assert(limbo->owner_id != REPLICA_ID_NIL);
int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
+ assert(lsn >= prev_lsn);
/*
* One of the reasons why can happen - the remote instance is not
- * read-only and wrote something under its own insance_id. For qsync
+ * read-only and wrote something under its own instance_id. For qsync
* that most likely means that the remote instance decided to take over
* the limbo ownership, and the current node is going to become a
* replica very soon.
if (lsn == prev_lsn)
return;
vclock_follow(&limbo->vclock, replica_id, lsn);
- struct txn_limbo_entry *e;
- int64_t confirm_lsn = -1;
- rlist_foreach_entry(e, &limbo->queue, in_queue) {
- assert(e->ack_count <= VCLOCK_MAX);
- if (e->lsn > lsn)
- break;
- /*
- * Sync transactions need to collect acks. Async
- * transactions are automatically committed right
- * after all the previous sync transactions are.
- */
- if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
- continue;
- } else if (e->lsn <= prev_lsn) {
- continue;
- } else if (++e->ack_count < replication_synchro_quorum) {
- continue;
- } else {
- confirm_lsn = e->lsn;
- }
+
+ if (limbo->entry_to_confirm != NULL &&
+ limbo->entry_to_confirm->lsn != -1) {
+ if (limbo->entry_to_confirm->lsn <= prev_lsn ||
+ lsn < limbo->entry_to_confirm->lsn)
+ return;
+ if (++limbo->ack_count >= replication_synchro_quorum)
+ txn_limbo_confirm(limbo);
}
- if (confirm_lsn == -1 || confirm_lsn <= limbo->confirmed_lsn)
- return;
- txn_limbo_write_confirm(limbo, confirm_lsn);
- txn_limbo_read_confirm(limbo, confirm_lsn);
}
/**
{
if (rlist_empty(&limbo->queue) || txn_limbo_is_frozen(limbo))
return;
- struct txn_limbo_entry *e;
- int64_t confirm_lsn = -1;
- rlist_foreach_entry(e, &limbo->queue, in_queue) {
- assert(e->ack_count <= VCLOCK_MAX);
- if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
- continue;
- } else if (e->ack_count < replication_synchro_quorum) {
- continue;
- } else {
- confirm_lsn = e->lsn;
- assert(confirm_lsn > 0);
- }
- }
- if (confirm_lsn > limbo->confirmed_lsn && !limbo->is_in_rollback) {
- txn_limbo_write_confirm(limbo, confirm_lsn);
- txn_limbo_read_confirm(limbo, confirm_lsn);
- }
+ /* The replication_synchro_quorum value may have changed. */
+ if (limbo->owner_id == instance_id)
+ txn_limbo_confirm(limbo);
/*
* Wakeup all the others - timed out will rollback. Also
* there can be non-transactional waiters, such as CONFIRM
blob - d2774d166dc009f91777f1c6b502fea631fa5080
blob + 6f434c71216b6c6bc0b1e920ab8fb3eaa72c5b5a
--- src/box/txn_limbo.h
+++ src/box/txn_limbo.h
*/
int64_t lsn;
/**
- * Number of ACKs. Or in other words - how many replicas
- * confirmed receipt of the transaction.
- */
- int ack_count;
- /**
* Result flags. Only one of them can be true. But both
* can be false if the transaction is still waiting for
* its resolution.
* illegal.
*/
int64_t confirmed_lsn;
+ /**
+ * The first unconfirmed synchronous transaction in the current term.
+ * Is NULL if there is no such transaction, or if the current instance
+ * does not own limbo.
+ */
+ struct txn_limbo_entry *entry_to_confirm;
+ /**
+ * Number of ACKs of the first unconfirmed synchronous transaction
+ * (entry_to_confirm->txn). Contains the actual value only for a
+ * non-NULL entry_to_confirm with a local lsn assigned. Otherwise
+ * it may contain any trash.
+ */
+ int ack_count;
/**
* Total number of performed rollbacks. It used as a guard
* to do some actions assuming all limbo transactions will