commit e319c21ca3520de83de9faec1b722dc5da5d776f from: Astronomax via: Serge Petrenko <35663196+sergepetrenko@users.noreply.github.com> date: Fri Oct 04 15:41:15 2024 UTC limbo: introduce limits on synchro queue Two new fields were added to the structure: the `size` counter and the `max_size` limit (both in bytes). A corresponding configuration parameter was also added: `replication.synchro_queue_max_size`. The counter is increased on every enqueued `txn_limbo_entry`, and decreased once an entry leaves the `txn_limbo.queue`. Also, the `approx_len` field has been added to the `txn_limbo_entry` structure, so that at the time of adding/removing an entry to/from the queue, we have access to the size of the corresponding entry in the journal. This limitation only applies to the master queue. Once the size of the master queue reaches the maximum value, txn_limbo rejects incoming requests until some of the transactions in the queue have a quorum of confirmations and there is free space. This limitation does not apply during the recovery process, because otherwise tarantool might fail while processing the xlog files, if the limbo queue size exceeds `replication.synchro_queue_max_size`, and the user would have to pick up the correct value of the `replication.synchro_queue_max_size` option in order to recover from their xlogs. The size limit isn't strict, i.e. if there's at least one free byte, the whole entry fits and no rejection is involved. Part of #7486 NO_CHANGELOG=Will be added in another commit @TarantoolBot document Title: new configuration option: 'replication.synchro_queue_max_size' Product: Tarantool Since: 3.3 Root document: https://www.tarantool.io/en/doc/latest/reference/configuration/configuration_reference/ `replication.synchro_queue_max_size` puts a limit on the total size of transactions in the master synchronous queue. `replication.synchro_queue_max_size` is measured in number of bytes to be written (0 means unlimited, which was the default behaviour before). 
This option affects only the behavior of the master, and defaults to 16 megabytes. Now that `replication.synchro_queue_max_size` is set on the master node, tarantool will discard new transactions that try to queue after the limit is reached. If a transaction had to be discarded, user will get an error message "The synchronous transaction queue is full". This limitation does not apply during the recovery process. The current synchro queue size can be known using `box.info.synchro.queue.size`: ```lua tarantool> box.info.synchro --- - queue: owner: 1 size: 60 busy: false len: 1 term: 2 quorum: 2 ... ``` [box-info-synchro] https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_info/synchro/ commit - 78cfc5ef4cf43bf9d53dc51d0f843bee350609fd commit + e319c21ca3520de83de9faec1b722dc5da5d776f blob - 0af296552e68405251b54df889d74cba6cc80b51 blob + 9cc3f22eef7989f845abf381268a693babadda1f --- src/box/box.cc +++ src/box/box.cc @@ -1643,6 +1643,22 @@ box_check_wal_queue_max_size(void) if (size < 0) { diag_set(ClientError, ER_CFG, "wal_queue_max_size", "wal_queue_max_size must be >= 0"); + } + /* Unlimited. */ + if (size == 0) + size = INT64_MAX; + return size; +} + +/** Check replication_synchro_queue_max_size option validity. */ +static int64_t +box_check_replication_synchro_queue_max_size(void) +{ + int64_t size = cfg_geti64("replication_synchro_queue_max_size"); + if (size < 0) { + diag_set(ClientError, ER_CFG, + "replication_synchro_queue_max_size", + "replication_synchro_queue_max_size must be >= 0"); } /* Unlimited. 
*/ if (size == 0) @@ -1974,6 +1990,8 @@ box_check_config(void) box_check_wal_mode(cfg_gets("wal_mode")); if (box_check_wal_queue_max_size() < 0) diag_raise(); + if (box_check_replication_synchro_queue_max_size() < 0) + diag_raise(); if (box_check_wal_cleanup_delay() < 0) diag_raise(); if (box_check_wal_retention_period() < 0) @@ -3240,6 +3258,22 @@ box_set_wal_queue_max_size(void) if (size < 0) return -1; wal_set_queue_max_size(size); + return 0; +} + +int +box_set_replication_synchro_queue_max_size(void) +{ + int64_t size = box_check_replication_synchro_queue_max_size(); + if (size < 0) + return -1; + if (size != INT64_MAX && recovery_state != FINISHED_RECOVERY) { + say_info("The option replication_synchro_queue_max_size will " + "actually take effect after the recovery is finished"); + txn_limbo_set_max_size(&txn_limbo, INT64_MAX); + return 0; + } + txn_limbo_set_max_size(&txn_limbo, size); return 0; } @@ -5161,6 +5195,8 @@ bootstrap_master(void) if (bootstrap_strategy == BOOTSTRAP_STRATEGY_AUTO) check_bootstrap_unanimity(); engine_bootstrap_xc(); + if (box_set_replication_synchro_queue_max_size() != 0) + diag_raise(); uint32_t replica_id = 1; box_insert_replica_record(replica_id, &INSTANCE_UUID, @@ -5254,6 +5290,8 @@ bootstrap_from_master(struct replica *master) } /* Finalize the new replica */ engine_end_recovery_xc(); + if (box_set_replication_synchro_queue_max_size() != 0) + diag_raise(); /* Switch applier to initial state */ applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY); @@ -5657,6 +5695,8 @@ box_cfg_xc(void) if (box_set_replication_synchro_quorum() != 0) diag_raise(); if (box_set_replication_synchro_timeout() != 0) + diag_raise(); + if (box_set_replication_synchro_queue_max_size() != 0) diag_raise(); box_set_replication_sync_timeout(); if (box_check_instance_name(cfg_instance_name) != 0) blob - fff2494a5146d11e0fc542ae7e31cf3d47d079d4 blob + d24f051e2452345b107b0833d850f40a6205f701 --- src/box/box.h +++ src/box/box.h @@ -357,6 +357,7 @@ 
void box_set_checkpoint_count(void); void box_set_checkpoint_interval(void); void box_set_checkpoint_wal_threshold(void); int box_set_wal_queue_max_size(void); +int box_set_replication_synchro_queue_max_size(void); int box_set_wal_cleanup_delay(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); blob - 1b9c50c406f4f0fd23df6d128086875ea6e56e40 blob + a78e575f671bd5ee8ff960d32fa03c717bd40877 --- src/box/errcode.h +++ src/box/errcode.h @@ -435,6 +435,7 @@ struct errcode_record { _(ER_INVALID_VCLOCK, 288, "Invalid vclock", "value", STRING) \ _(ER_ARROW_IPC_ENCODE, 289, "Failed to encode Arrow IPC data", "method", STRING, "details", STRING) \ _(ER_ARROW_IPC_DECODE, 290, "Failed to decode Arrow IPC data", "method", STRING, "details", STRING) \ + _(ER_SYNC_QUEUE_FULL, 291, "The synchronous transaction queue is full") \ TEST_ERROR_CODES(_) /** This one should be last. */ /* blob - 80410de331d12f190282b369508b7eb478483a57 blob + 8a3f0ca6e04a228a513349fb7e4c435f6e60d7d2 --- src/box/lua/cfg.cc +++ src/box/lua/cfg.cc @@ -178,6 +178,14 @@ lbox_cfg_set_wal_queue_max_size(struct lua_State *L) } static int +lbox_cfg_set_replication_synchro_queue_max_size(struct lua_State *L) +{ + if (box_set_replication_synchro_queue_max_size() != 0) + luaT_error(L); + return 0; +} + +static int lbox_cfg_set_wal_cleanup_delay(struct lua_State *L) { if (box_set_wal_cleanup_delay() < 0) @@ -505,6 +513,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, {"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size}, + {"cfg_set_replication_synchro_queue_max_size", lbox_cfg_set_replication_synchro_queue_max_size}, {"cfg_set_wal_cleanup_delay", lbox_cfg_set_wal_cleanup_delay}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, blob - 424164ad1262b6f139b7b3ccec6ee8472add975c blob + 
24ce034371205db858d92df3d7b6705086c17a46 --- src/box/lua/config/instance_config.lua +++ src/box/lua/config/instance_config.lua @@ -1465,6 +1465,11 @@ return schema.new('instance_config', schema.record({ box_cfg = 'replication_synchro_timeout', default = 5, }), + synchro_queue_max_size = schema.scalar({ + type = 'integer', + box_cfg = 'replication_synchro_queue_max_size', + default = 16 * 1024 * 1024, + }), connect_timeout = schema.scalar({ type = 'number', box_cfg = 'replication_connect_timeout', blob - 583fa5d6b848191e40ef40368b4c86be9e4b7ab0 blob + 289c86134439093acf37a97534a77a0efe839822 --- src/box/lua/info.c +++ src/box/lua/info.c @@ -715,9 +715,11 @@ lbox_info_synchro(struct lua_State *L) /* Queue information. */ struct txn_limbo *queue = &txn_limbo; - lua_createtable(L, 0, 3); + lua_createtable(L, 0, 7); lua_pushnumber(L, queue->len); lua_setfield(L, -2, "len"); + lua_pushnumber(L, queue->size); + lua_setfield(L, -2, "size"); lua_pushnumber(L, queue->owner_id); lua_setfield(L, -2, "owner"); lua_pushboolean(L, latch_is_locked(&queue->promote_latch)); blob - b5f6c0c658c75b61ad5ba1f492b8bfb9fd92056b blob + 69f5d271b7caf6b3960502c6dd9bc4bd716fcdad --- src/box/lua/load_cfg.lua +++ src/box/lua/load_cfg.lua @@ -186,6 +186,7 @@ local default_cfg = { replication_sync_timeout = 0, replication_synchro_quorum = "N / 2 + 1", replication_synchro_timeout = 5, + replication_synchro_queue_max_size = 16 * 1024 * 1024, replication_connect_timeout = 30, replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, @@ -387,6 +388,7 @@ local template_cfg = { replication_sync_timeout = 'number', replication_synchro_quorum = 'string, number', replication_synchro_timeout = 'number', + replication_synchro_queue_max_size = 'number', replication_connect_timeout = 'number', replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', @@ -527,6 +529,8 @@ local dynamic_cfg = { replication_sync_timeout = private.cfg_set_replication_sync_timeout, 
replication_synchro_quorum = private.cfg_set_replication_synchro_quorum, replication_synchro_timeout = private.cfg_set_replication_synchro_timeout, + replication_synchro_queue_max_size = + private.cfg_set_replication_synchro_queue_max_size, replication_skip_conflict = private.cfg_set_replication_skip_conflict, replication_anon = private.cfg_set_replication_anon, bootstrap_strategy = private.cfg_set_bootstrap_strategy, @@ -643,6 +647,7 @@ local dynamic_cfg_order = { replication_sync_timeout = 150, replication_synchro_quorum = 150, replication_synchro_timeout = 150, + replication_synchro_queue_max_size = 150, replication_connect_timeout = 150, replication_connect_quorum = 150, -- Apply bootstrap_strategy before replication, but after @@ -690,6 +695,7 @@ local dynamic_cfg_skip_at_load = { replication_sync_timeout = true, replication_synchro_quorum = true, replication_synchro_timeout = true, + replication_synchro_queue_max_size = true, replication_skip_conflict = true, replication_anon = true, bootstrap_strategy = true, blob - 63fab94049c4fa866e62b48f1c5d77808f782c2b blob + 760fd47270cb7ce504b0a0b4eaeb9651963e2713 --- src/box/txn.c +++ src/box/txn.c @@ -1148,12 +1148,18 @@ txn_commit_nop(struct txn *txn) static int txn_add_limbo_entry(struct txn *txn, const struct journal_entry *req) { + uint32_t origin_id = req->rows[0]->replica_id; + if (origin_id == 0 && txn_limbo.size >= txn_limbo.max_size) { + diag_set(ClientError, ER_SYNC_QUEUE_FULL); + return -1; + } + /* * Remote rows, if any, come before local rows, so check for originating * instance id in the first row. 
*/ - uint32_t origin_id = req->rows[0]->replica_id; - txn->limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); + txn->limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn, + req->approx_len); if (txn->limbo_entry == NULL) return -1; return 0; blob - 2f3d9ba1243411db071504c4aef7810cdfe01855 blob + a86da8e94437c76d53b1df41b81db24192a4b4a9 --- src/box/txn_limbo.c +++ src/box/txn_limbo.c @@ -60,8 +60,16 @@ txn_limbo_create(struct txn_limbo *limbo) limbo->is_frozen_until_promotion = true; limbo->do_validate = false; limbo->confirm_lag = 0; + limbo->max_size = 0; + limbo->size = 0; } +void +txn_limbo_set_max_size(struct txn_limbo *limbo, int64_t size) +{ + limbo->max_size = size; +} + static inline void txn_limbo_destroy(struct txn_limbo *limbo) { @@ -132,10 +140,31 @@ txn_limbo_last_synchro_entry(struct txn_limbo *limbo) return entry; } return NULL; +} + +/** Increase queue size on a new write request. */ +static inline void +txn_limbo_on_append(struct txn_limbo *limbo, + const struct txn_limbo_entry *entry) +{ + limbo->size += entry->approx_len; + limbo->len++; +} + +/** Decrease queue size once write request is complete. 
*/ +static inline void +txn_limbo_on_remove(struct txn_limbo *limbo, + const struct txn_limbo_entry *entry) +{ + limbo->size -= entry->approx_len; + assert(limbo->size >= 0); + limbo->len--; + assert(limbo->len >= 0); } struct txn_limbo_entry * -txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn, + size_t approx_len) { assert(txn_has_flag(txn, TXN_WAIT_SYNC)); assert(limbo == &txn_limbo); @@ -181,13 +210,14 @@ txn_limbo_append(struct txn_limbo *limbo, uint32_t id, return NULL; } e->txn = txn; + e->approx_len = approx_len; e->lsn = -1; e->ack_count = 0; e->is_commit = false; e->is_rollback = false; e->insertion_time = fiber_clock(); rlist_add_tail_entry(&limbo->queue, e, in_queue); - limbo->len++; + txn_limbo_on_append(limbo, e); return e; } @@ -197,7 +227,7 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_l assert(!rlist_empty(&entry->in_queue)); assert(txn_limbo_first_entry(limbo) == entry); rlist_del_entry(entry, in_queue); - limbo->len--; + txn_limbo_on_remove(limbo, entry); } static inline void @@ -208,7 +238,7 @@ txn_limbo_pop(struct txn_limbo *limbo, struct txn_limb assert(entry->is_rollback); rlist_del_entry(entry, in_queue); - limbo->len--; + txn_limbo_on_remove(limbo, entry); ++limbo->rollback_count; } blob - bc879af4ba6bd1e096e1b523141a5e195ae3b98c blob + a4f9d994ed64cf4294a5d217947b2802bdde4c82 --- src/box/txn_limbo.h +++ src/box/txn_limbo.h @@ -53,6 +53,10 @@ struct txn_limbo_entry { /** Transaction, waiting for a quorum. */ struct txn *txn; /** + * Approximate size of this request when encoded. + */ + size_t approx_len; + /** * LSN of the transaction by the originator's vclock * component. May be -1 in case the transaction is not * written to WAL yet. @@ -238,6 +242,10 @@ struct txn_limbo { * quorum. */ double confirm_lag; + /** Maximal size of entries enqueued in txn_limbo.queue (in bytes). 
*/ + int64_t max_size; + /** Current approximate size of txn_limbo.queue. */ + int64_t size; }; /** @@ -302,7 +310,8 @@ txn_limbo_last_synchro_entry(struct txn_limbo *limbo); * The limbo entry is allocated on the transaction's region. */ struct txn_limbo_entry * -txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn); +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn, + size_t approx_len); /** Remove the entry from the limbo, mark as rolled back. */ void @@ -494,6 +503,10 @@ txn_limbo_init(); void txn_limbo_free(); +/** Set maximal limbo size in bytes. */ +void +txn_limbo_set_max_size(struct txn_limbo *limbo, int64_t size); + #if defined(__cplusplus) } #endif /* defined(__cplusplus) */ blob - cd312a9f421ada73410a8be1e1c4ffaeb922d4d4 blob + 1edcec158e1def5dbc535ebd86fbc21a20bb88ba --- test/box/admin.result +++ test/box/admin.result @@ -112,6 +112,8 @@ cfg_filter(box.cfg) - 10 - - replication_sync_timeout - + - - replication_synchro_queue_max_size + - 16777216 - - replication_synchro_quorum - N / 2 + 1 - - replication_synchro_timeout blob - 32889a017b5bc9ed9c21b4c8b1fd87fea12ec601 blob + 99ef35df2e72ada337782b37a08799d530961e2d --- test/box/cfg.result +++ test/box/cfg.result @@ -100,6 +100,8 @@ cfg_filter(box.cfg) | - 10 | - - replication_sync_timeout | - + | - - replication_synchro_queue_max_size + | - 16777216 | - - replication_synchro_quorum | - N / 2 + 1 | - - replication_synchro_timeout @@ -250,6 +252,8 @@ cfg_filter(box.cfg) | - 10 | - - replication_sync_timeout | - + | - - replication_synchro_queue_max_size + | - 16777216 | - - replication_synchro_quorum | - N / 2 + 1 | - - replication_synchro_timeout blob - 231aef1e721f0e4dd68fcb50b7e8714663f78d84 blob + ef1a3196bd3fc3d3ce2747f2852ee83253a6cb23 --- test/box/error.result +++ test/box/error.result @@ -501,6 +501,7 @@ t; | 288: box.error.INVALID_VCLOCK | 289: box.error.ARROW_IPC_ENCODE | 290: box.error.ARROW_IPC_DECODE + | 291: box.error.SYNC_QUEUE_FULL | ... 
test_run:cmd("setopt delimiter ''"); blob - 231dd7f58b3424d24dee001453537b5600e6a18c blob + 9b67c1bf24f10734e8a4add03223a064661b98dc --- test/box-tap/cfg.test.lua +++ test/box-tap/cfg.test.lua @@ -6,7 +6,7 @@ local socket = require('socket') local fio = require('fio') local uuid = require('uuid') local msgpack = require('msgpack') -test:plan(111) +test:plan(112) -------------------------------------------------------------------------------- -- Invalid values @@ -53,6 +53,7 @@ invalid('memtx_sort_threads', 'all') invalid('memtx_sort_threads', -1) invalid('memtx_sort_threads', 0) invalid('memtx_sort_threads', 257) +invalid('replication_synchro_queue_max_size', -1) local function invalid_combinations(name, val) local status, result = pcall(box.cfg, val) blob - 2647e25da603d5a8c88f4ec8df57ddd5097885ab blob + ef5f7af6698172578d61a6416bfae3014f3ae7f5 --- test/config-luatest/cluster_config_schema_test.lua +++ test/config-luatest/cluster_config_schema_test.lua @@ -228,6 +228,7 @@ g.test_defaults = function() threads = 1, timeout = 1, synchro_timeout = 5, + synchro_queue_max_size = 16777216, connect_timeout = 30, sync_timeout = box.NULL, sync_lag = 10, blob - a38acf720257d7f5caeb1c4bba261f9ccfdc22d5 blob + 16335e849cc806ac3e656e3451d0caf190a96910 --- test/config-luatest/instance_config_schema_test.lua +++ test/config-luatest/instance_config_schema_test.lua @@ -1122,6 +1122,7 @@ g.test_replication = function() threads = 1, timeout = 1, synchro_timeout = 1, + synchro_queue_max_size = 1, connect_timeout = 1, sync_timeout = 1, sync_lag = 1, @@ -1143,6 +1144,7 @@ g.test_replication = function() threads = 1, timeout = 1, synchro_timeout = 5, + synchro_queue_max_size = 16777216, connect_timeout = 30, sync_timeout = box.NULL, sync_lag = 10, blob - /dev/null blob + 96b4d699c2e6afdb7d47cf6bd81ad3357c940914 (mode 644) --- /dev/null +++ test/replication-luatest/gh_7486_replication_synchro_queue_max_size_test.lua @@ -0,0 +1,156 @@ +local t = require('luatest') +local server = 
require('luatest.server') + +local g = t.group('replication_synchro_queue_max_size') +-- +-- gh-7486: introduce `replication_synchro_queue_max_size`. +-- +local wait_timeout = 10 + +local function server_wait_synchro_queue_len_is_equal(server, expected) + server:exec(function(expected, wait_timeout) + t.helpers.retrying({timeout = wait_timeout}, function(expected) + t.assert_equals(box.info.synchro.queue.len, expected) + end, expected) + end, {expected, wait_timeout}) +end + +g.before_all(function(cg) + cg.box_cfg = { + replication_synchro_queue_max_size = 1, + replication_synchro_quorum = 1, + replication_synchro_timeout = 100000, + replication_timeout = 0.1, + election_fencing_mode = 'off', + } + cg.master = server:new({ + alias = 'master', + box_cfg = cg.box_cfg, + }) + cg.master:start() + cg.master:exec(function() + box.ctl.promote() + box.schema.space.create('test', {is_sync = true}) + box.space.test:create_index('pk') + end) +end) + +g.after_all(function(cg) + cg.master:drop() +end) + +g.after_each(function(cg) + cg.master:exec(function() + box.space.test:truncate() + end) +end) + +g.test_master_synchro_queue_limited = function(cg) + cg.master:exec(function() + box.cfg{ replication_synchro_quorum = 2, } + rawset(_G, "f", require('fiber') + .new(box.space.test.insert, box.space.test, {1})) + _G.f:set_joinable(true) + end) + server_wait_synchro_queue_len_is_equal(cg.master, 1) + cg.master:exec(function() + t.assert_error_msg_content_equals( + 'The synchronous transaction queue is full', + box.space.test.insert, box.space.test, {2}) + box.cfg{ replication_synchro_quorum = 1, } + local ok, _ = _G.f:join() + t.assert(ok) + end, {wait_timeout}) +end + +g.test_max_size_update_dynamically = function(cg) + cg.master:exec(function() + box.cfg{ + replication_synchro_queue_max_size = 0, + replication_synchro_quorum = 2, + } + rawset(_G, 'fibers_storage', {}) + rawset(_G, 'num_fibers', 3) + for i = 1, _G.num_fibers do + _G.fibers_storage[i] = require('fiber') + 
.new(box.space.test.insert, box.space.test, {i}) + _G.fibers_storage[i]:set_joinable(true) + end + end) + server_wait_synchro_queue_len_is_equal(cg.master, 3) + cg.master:exec(function() + box.cfg{ replication_synchro_queue_max_size = 1, } + t.assert_equals(box.info.synchro.queue.len, 3) + t.assert_error_msg_content_equals( + 'The synchronous transaction queue is full', + box.space.test.insert, box.space.test, {0}) + box.cfg{ replication_synchro_quorum = 1, } + for i = 1, _G.num_fibers do + local ok, _ = _G.fibers_storage[i]:join() + t.assert(ok) + end + t.assert_equals(box.info.synchro.queue.len, 0) + box.space.test:insert{0} + end) +end + +g.test_recovery_with_small_max_size = function(cg) + t.tarantool.skip_if_not_debug() + + cg.master:exec(function(wait_timeout) + box.cfg{ + replication_synchro_queue_max_size = 0, + replication_synchro_quorum = 2, + } + local lsn = box.info.lsn + 1000 + for key = 1, 1000 do + require('fiber').create(function() + box.space.test:insert({key}) + end) + end + t.helpers.retrying({timeout = wait_timeout}, + function() t.assert(box.info.lsn >= lsn) end) + end, {wait_timeout}) + server_wait_synchro_queue_len_is_equal(cg.master, 1000) + local box_cfg = table.copy(cg.box_cfg) + box_cfg.replication_synchro_queue_max_size = 1 + box_cfg.replication_synchro_timeout = 0.001 + cg.master:restart({ + box_cfg = box_cfg, + env = { + TARANTOOL_RUN_BEFORE_BOX_CFG = + "box.error.injection.set('ERRINJ_WAL_DELAY', true)" + } + }) + server_wait_synchro_queue_len_is_equal(cg.master, 1000) + cg.master:exec(function() + box.error.injection.set('ERRINJ_WAL_DELAY', false) + box.ctl.promote() + t.assert_equals(box.space.test:len(), 1000) + box.cfg{replication_synchro_timeout = 100000} + end) +end + +g.test_size_is_updated_correctly_after_commit = function(cg) + cg.master:exec(function() + box.space.test:insert{1} + box.space.test:insert{2} + end) +end + +g.test_size_is_updated_correctly_after_rollback = function(cg) + cg.master:exec(function() + box.cfg{ + 
replication_synchro_quorum = 2, + replication_synchro_timeout = 0.001, + } + t.assert_error_msg_content_equals( + 'Quorum collection for a synchronous transaction is timed out', + box.space.test.insert, box.space.test, {1}) + box.cfg{ + replication_synchro_quorum = 1, + replication_synchro_timeout = 100000, + } + box.space.test:insert{2} + end) +end blob - 5ba569459b7d6027b75980d60ef145dd497acc92 blob + 384329b1549c47536e8ea7192a8a3d3fda3711d7 --- test/unit/snap_quorum_delay.cc +++ test/unit/snap_quorum_delay.cc @@ -106,8 +106,8 @@ txn_process_func(va_list ap) * Instead, we push the transaction to the limbo manually * and call txn_commit (or another) later. */ - struct txn_limbo_entry *entry = txn_limbo_append(&txn_limbo, - instance_id, txn); + struct txn_limbo_entry *entry = txn_limbo_append( + &txn_limbo, instance_id, txn, 0); /* * The trigger is used to verify that the transaction has been * completed.