commit 23c7899e5c81d2080c8feb193823893de1381491 from: Nikita Zheleztsov via: Serge Petrenko <35663196+sergepetrenko@users.noreply.github.com> date: Fri Aug 16 10:51:40 2024 UTC engine: introduce stubs for checkpoint FETCH_SNAPSHOT This commit introduces engine stubs that enable a new method of fetching snapshots for anonymous replicas. Instead of using the traditional read-view join approach, this update allows fetching the snapshot from files. Note that file snapshot fetching is only available in Tarantool EE. Checkpoint fetching is controlled via the IPROTO_IS_CHECKPOINT_JOIN, IPROTO_CHECKPOINT_VCLOCK and IPROTO_CHECKPOINT_LSN fields. If IPROTO_IS_CHECKPOINT_JOIN is set to true, the join is done from files (.snap for memtx, .run for vinyl); if false, it is done from a read view. Checkpoint join allows the client to continue from the place where it stopped if snapshot fetching fails. This avoids rebootstrapping an anonymous client. To resume, the client specifies CHECKPOINT_VCLOCK, which tells the server which checkpoint file to continue the join from; the client receives this vclock at the beginning of the join. Specifying CHECKPOINT_LSN allows the client to continue from a given position in the checkpoint: the server sends all data >= CHECKPOINT_LSN. If CHECKPOINT_VCLOCK is not specified, fetching is done from the latest available checkpoint. If CHECKPOINT_LSN is not specified, fetching starts from the beginning of the snapshot. So, specifying only IS_CHECKPOINT_JOIN triggers fetching the latest checkpoint from files. 
Needed for tarantool/tarantool-ee#741 NO_DOC=ee NO_TEST=ee NO_CHANGELOG=ee (cherry picked from commit 2fca5c133b512ab40a5b42603fe27dd7e7db8bae) commit - 9434531b431839b099ee65997f8800126d9b458d commit + 23c7899e5c81d2080c8feb193823893de1381491 blob - cd8690c1e9f1f4d1ce4ff589f576ec9b1e4323e1 blob + ac94ee024a8eb7468d5755f05c268c0daab945fe --- src/box/applier.cc +++ src/box/applier.cc @@ -870,8 +870,14 @@ applier_fetch_snapshot(struct applier *applier) struct iostream *io = &applier->io; struct xrow_header row; + struct vclock vclock; + vclock_create(&vclock); struct fetch_snapshot_request req = { .version_id = tarantool_version_id(), + /* Applier doesn't support checkpoint join. */ + .is_checkpoint_join = false, + .checkpoint_vclock = vclock, + .checkpoint_lsn = 0, }; RegionGuard region_guard(&fiber()->gc); xrow_encode_fetch_snapshot(&row, &req); blob - 58c68b67a0b77e9d99f587d53609ee2ff7859d85 blob + 008a0a1131cab69eaf51488eaeba68b4b3fb96dc --- src/box/box.cc +++ src/box/box.cc @@ -3944,11 +3944,21 @@ box_process_fetch_snapshot(struct iostream *io, "wal_mode = 'none'"); } - say_info("sending current read-view to replica at %s", sio_socketname(io->fd)); + say_info("sending read-view to replica at %s", sio_socketname(io->fd)); + /* Used for checkpoint initial join. */ + struct checkpoint_cursor cursor; + struct checkpoint_cursor *cursor_ptr = NULL; + if (req.is_checkpoint_join) { + memset(&cursor, 0, sizeof(cursor)); + cursor.vclock = &req.checkpoint_vclock; + cursor.start_lsn = req.checkpoint_lsn; + cursor_ptr = &cursor; + } /* Send the snapshot data to the instance. */ struct vclock start_vclock; - relay_initial_join(io, header->sync, &start_vclock, req.version_id); + relay_initial_join(io, header->sync, &start_vclock, req.version_id, + cursor_ptr); say_info("read-view sent."); /* Remember master's vclock after the last request */ @@ -4182,7 +4192,8 @@ box_process_join(struct iostream *io, const struct xro * Initial stream: feed replica with dirty data from engines. 
*/ struct vclock start_vclock; - relay_initial_join(io, header->sync, &start_vclock, req.version_id); + relay_initial_join(io, header->sync, &start_vclock, req.version_id, + NULL); say_info("initial data sent."); /** blob - 0e8a2ad7e4df64689dacef76b63c90c3113921e6 blob + f521c19c31508330d24cfcfbf215568feb08fe19 --- src/box/engine.h +++ src/box/engine.h @@ -304,6 +304,18 @@ struct engine_read_view { const struct engine_read_view_vtab *vtab; /** Link in read_view::engines. */ struct rlist link; +}; + +/** + * Cursor used during checkpoint initial join. Shared between engines. + */ +struct checkpoint_cursor { + /** Signature of the checkpoint to take data from. */ + struct vclock *vclock; + /** Checkpoint lsn to start from. */ + int64_t start_lsn; + /** Counter, shared between engines */ + int64_t lsn_counter; }; struct engine_join_ctx { @@ -311,6 +323,8 @@ struct engine_join_ctx { struct vclock *vclock; /** Whether sending JOIN_META stage is required. */ bool send_meta; + /** Checkpoint join cursor. */ + struct checkpoint_cursor *cursor; /** Array of engine join contexts, one per each engine. */ void **data; }; blob - 58c6516638a6eb7b05fc8ef24ee0f05bae1c6603 blob + 16edd280802e0af82efa420efea563277c6c8778 --- src/box/iproto_constants.h +++ src/box/iproto_constants.h @@ -194,6 +194,21 @@ extern const char *iproto_flag_bit_strs[]; * authentication method. */ \ _(AUTH_TYPE, 0x5b, MP_STR) \ + /** + * Flag indicating whether checkpoint join should be done. + */ \ + _(IS_CHECKPOINT_JOIN, 0x62, MP_BOOL) \ + /** + * Shows the signature of the checkpoint to read from. + * Requires CHECKPOINT_JOIN to be true. + */ \ + _(CHECKPOINT_VCLOCK, 0x63, MP_MAP) \ + /** + * Shows the lsn to start sending from. Server sends all rows + * >= IPROTO_CHECKPOINT_LSN. Requires CHECKPOINT_JOIN to be + * true and CHECKPOINT_VCLOCK to be set. + */ \ + _(CHECKPOINT_LSN, 0x64, MP_UINT) \ #define IPROTO_KEY_MEMBER(s, v, ...) 
IPROTO_ ## s = v, blob - 1522705a63c6a771b44673b7227e27fa75c8d032 blob + b38b19a0dbcb346350e8331273e3a562b249b0fd --- src/box/memtx_engine.cc +++ src/box/memtx_engine.cc @@ -1248,8 +1248,44 @@ send_join_meta(struct xstream *stream, const struct ra xrow_encode_synchro(&row, body, synchro_req); row.replica_id = synchro_req->replica_id; xstream_write(stream, &row); +} + +#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR) +#include "memtx_checkpoint_join.cc" +#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */ + +static int +memtx_engine_prepare_checkpoint_join(struct engine *engine, + struct engine_join_ctx *ctx) +{ + (void)engine; + (void)ctx; + diag_set(ClientError, ER_UNSUPPORTED, "Community edition", + "checkpoint join"); + return -1; } +static int +memtx_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx, + struct xstream *stream) +{ + (void)engine; + (void)ctx; + (void)stream; + unreachable(); + return -1; +} + +static void +memtx_engine_complete_checkpoint_join(struct engine *engine, + struct engine_join_ctx *ctx) +{ + (void)engine; + (void)ctx; +} + +#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */ + /** Space filter for replica join. 
*/ static bool memtx_join_space_filter(struct space *space, void *arg) @@ -1263,6 +1299,9 @@ memtx_join_space_filter(struct space *space, void *arg static int memtx_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg) { + if (arg->cursor != NULL) + return memtx_engine_prepare_checkpoint_join(engine, arg); + struct memtx_join_ctx *ctx = (struct memtx_join_ctx *)malloc(sizeof(*ctx)); if (ctx == NULL) { @@ -1343,6 +1382,9 @@ static int memtx_engine_join(struct engine *engine, struct engine_join_ctx *arg, struct xstream *stream) { + if (arg->cursor != NULL) + return memtx_engine_checkpoint_join(engine, arg, stream); + struct memtx_join_ctx *ctx = (struct memtx_join_ctx *)arg->data[engine->id]; ctx->stream = stream; @@ -1401,6 +1443,9 @@ memtx_engine_join(struct engine *engine, struct engine static void memtx_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg) { + if (arg->cursor != NULL) + return memtx_engine_complete_checkpoint_join(engine, arg); + struct memtx_join_ctx *ctx = (struct memtx_join_ctx *)arg->data[engine->id]; read_view_close(&ctx->rv); blob - 43e9000aec58d921dfc12038908594f83dfaaf8b blob + a9943711e566b8d975d6778a9512d1593aca8494 --- src/box/relay.cc +++ src/box/relay.cc @@ -458,7 +458,8 @@ relay_cord_init(struct relay *relay) void relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock, - uint32_t replica_version_id) + uint32_t replica_version_id, + struct checkpoint_cursor *cursor) { struct relay *relay = relay_new(NULL); if (relay == NULL) @@ -480,6 +481,7 @@ relay_initial_join(struct iostream *io, uint64_t sync, */ ctx.send_meta = replica_version_id > 0; ctx.vclock = vclock; + ctx.cursor = cursor; engine_prepare_join_xc(&ctx); auto join_guard = make_scoped_guard([&] { engine_complete_join(&ctx); blob - ead0d8062358325f37e0a8d24b6d1ac42f8a9d6b blob + af100c9119bb445aa400229660b7b4725be5c97c --- src/box/relay.h +++ src/box/relay.h @@ -42,6 +42,7 @@ struct relay; struct replica; struct 
tt_uuid; struct vclock; +struct checkpoint_cursor; enum relay_state { /** @@ -126,10 +127,12 @@ relay_push_raft(struct relay *relay, const struct raft * @param sync sync from incoming JOIN request * @param vclock[out] vclock of the read view sent to the replica * @param replica_version_id peer's version + * @param cursor cursor for checkpoint join, if NULL - read-view join. */ void relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock, - uint32_t replica_version_id); + uint32_t replica_version_id, + struct checkpoint_cursor *cursor); /** * Send final JOIN rows to the replica. blob - 64f619b1daa31201b8841e7983291bf829a4022a blob + 2c563ce481c39de39cb6f5e4e4707db51825d162 --- src/box/vinyl.c +++ src/box/vinyl.c @@ -3030,7 +3030,42 @@ struct vy_join_ctx { /** Read view at the time when the initial join started. */ struct vy_read_view *rv; }; + +#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR) +#include "vinyl_checkpoint_join.c" +#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */ + +static int +vinyl_engine_prepare_checkpoint_join(struct engine *engine, + struct engine_join_ctx *ctx) +{ + (void)engine; + (void)ctx; + unreachable(); + return -1; +} + +static int +vinyl_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx, + struct xstream *stream) +{ + (void)engine; + (void)ctx; + (void)stream; + unreachable(); + return -1; +} +static void +vinyl_engine_complete_checkpoint_join(struct engine *engine, + struct engine_join_ctx *ctx) +{ + (void)engine; + (void)ctx; +} + +#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */ + static int vy_join_add_space(struct space *space, void *arg) { @@ -3065,6 +3100,9 @@ vy_join_add_space(struct space *space, void *arg) static int vinyl_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg) { + if (arg->cursor != NULL) + return vinyl_engine_prepare_checkpoint_join(engine, arg); + struct vy_env *env = vy_env(engine); struct vy_join_ctx *ctx = malloc(sizeof(*ctx)); if (ctx == NULL) { 
@@ -3106,6 +3144,9 @@ static int vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg, struct xstream *stream) { + if (arg->cursor != NULL) + return vinyl_engine_checkpoint_join(engine, arg, stream); + int loops = 0; struct vy_join_ctx *ctx = arg->data[engine->id]; struct vy_join_entry *join_entry; @@ -3131,6 +3172,9 @@ vinyl_engine_join(struct engine *engine, struct engine static void vinyl_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg) { + if (arg->cursor != NULL) + return vinyl_engine_complete_checkpoint_join(engine, arg); + struct vy_env *env = vy_env(engine); struct vy_join_ctx *ctx = arg->data[engine->id]; struct vy_join_entry *entry, *next; blob - bfa5c4bbb692155fc3219198974a60005bc7d815 blob + eddf80c9657b1541129740c388456f95e8f441d2 --- src/box/xrow.c +++ src/box/xrow.c @@ -2025,6 +2025,12 @@ struct replication_request { uint32_t *version_id; /** IPROTO_REPLICA_ANON. */ bool *is_anon; + /** IPROTO_IS_CHECKPOINT_JOIN. */ + bool *is_checkpoint_join; + /** IPROTO_CHECKPOINT_VCLOCK. */ + struct vclock *checkpoint_vclock; + /** IPROTO_CHECKPOINT_LSN. */ + uint64_t *checkpoint_lsn; }; /** Encode a replication request template. 
*/ @@ -2188,8 +2194,38 @@ id_filter_decode_err: xrow_on_decode_err(row, ER_INVA if (val >= VCLOCK_MAX) goto id_filter_decode_err; *req->id_filter |= 1 << val; + } + break; + case IPROTO_IS_CHECKPOINT_JOIN: + if (req->is_checkpoint_join == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + xrow_on_decode_err( + row, ER_INVALID_MSGPACK, + "invalid IS_CHECKPOINT_JOIN"); + return -1; } + *req->is_checkpoint_join = mp_decode_bool(&d); break; + case IPROTO_CHECKPOINT_VCLOCK: + if (req->checkpoint_vclock == NULL) + goto skip; + if (mp_decode_vclock(&d, req->checkpoint_vclock) != 0) { + xrow_on_decode_err(row, ER_INVALID_MSGPACK, + "invalid CHECKPOINT_VCLOCK"); + return -1; + } + break; + case IPROTO_CHECKPOINT_LSN: + if (req->checkpoint_lsn == NULL) + goto skip; + if (mp_typeof(*d) != MP_UINT) { + xrow_on_decode_err(row, ER_INVALID_MSGPACK, + "invalid CHECKPOINT_LSN"); + return -1; + } + *req->checkpoint_lsn = mp_decode_uint(&d); + break; default: skip: mp_next(&d); /* value */ } @@ -2294,7 +2330,16 @@ xrow_decode_fetch_snapshot(const struct xrow_header *r memset(req, 0, sizeof(*req)); struct replication_request base_req = { .version_id = &req->version_id, + .is_checkpoint_join = &req->is_checkpoint_join, + .checkpoint_vclock = &req->checkpoint_vclock, + .checkpoint_lsn = &req->checkpoint_lsn, }; + /* + * Vclock must be cleared, as it sets -1 signature, which cannot be + * done by memset above. This is done in order to distinguish not + * initialized vclock from the zero one. + */ + vclock_clear(&req->checkpoint_vclock); return xrow_decode_replication_request(row, &base_req); } blob - 19c86b9a1e148e44c82e73a074b3075f739e2578 blob + cf20865faa5b5c344da19dca99e6f3d27e731cb7 --- src/box/xrow.h +++ src/box/xrow.h @@ -614,6 +614,12 @@ xrow_decode_join(const struct xrow_header *row, struct struct fetch_snapshot_request { /** Replica's version. */ uint32_t version_id; + /** Flag indicating whether checkpoint join should be done. 
*/ + bool is_checkpoint_join; + /** Checkpoint's vclock, signature of the snapshot. */ + struct vclock checkpoint_vclock; + /** Checkpoint's lsn, the row number to start from. */ + uint64_t checkpoint_lsn; }; /** Encode FETCH_SNAPSHOT request. */ blob - 5d4cde0ab0bad91ee10e1c54173ee9fbdaa7ba3a blob + dfdb9129d6ec47244ae4ed0baf417f4864758339 --- src/trivia/config.h.cmake +++ src/trivia/config.h.cmake @@ -288,6 +288,7 @@ #cmakedefine ENABLE_READ_VIEW 1 #cmakedefine ENABLE_SECURITY 1 #cmakedefine ENABLE_COMPRESS_MODULE 1 +#cmakedefine ENABLE_FETCH_SNAPSHOT_CURSOR 1 #cmakedefine EXPORT_LIBCURL_SYMBOLS 1 blob - a300fc05e5b0cd02d05d101d473d6b7d5ec777d1 blob + 1c94a177118fb3e08e192fdeaf0970f22ebda9db --- test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua +++ test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua @@ -80,6 +80,9 @@ local reference_table = { TXN_ISOLATION = 0x59, VCLOCK_SYNC = 0x5a, AUTH_TYPE = 0x5b, + IS_CHECKPOINT_JOIN = 0x62, + CHECKPOINT_VCLOCK = 0x63, + CHECKPOINT_LSN = 0x64, }, -- `iproto_metadata_key` enumeration.