commit 9434531b431839b099ee65997f8800126d9b458d from: Nikita Zheleztsov via: Serge Petrenko <35663196+sergepetrenko@users.noreply.github.com> date: Fri Aug 16 10:51:40 2024 UTC engine: send vclock with 0th component during join This commit makes the engine send the vclock, including its 0th component, during join. This is needed for checkpoint FETCH SNAPSHOT. Currently, engine join functions are invoked only from relay_initial_join, which is called during JOIN or FETCH SNAPSHOT. They respond with the vclock of the read view that is going to be sent. The following commit will introduce checkpoint FETCH SNAPSHOT, which responds with the vclock of the checkpoint being sent. Such a vclock may include the 0th component, and it is crucial to send it to the client: in case of a connection failure, the client will send us the same vclock, and we will have to use its signature to figure out which checkpoint the client wants. So, we have to send and receive the 0th component of the vclock during FETCH_SNAPSHOT. This commit also introduces decoding of vclocks without ignoring the 0th component, since it will be used in the following commit too. Needed for tarantool/tarantool-ee#741 NO_DOC=internal NO_TEST=ee NO_CHANGELOG=internal (cherry picked from commit 56058393cab1fafb3bee32e8275dd1964a1a35d7) commit - 4de3d0d620adcb566119f55bc6722eef624310f1 commit + 9434531b431839b099ee65997f8800126d9b458d blob - 75a6f975a9e757fc0cf61dc7375dae2faa01e078 blob + 1522705a63c6a771b44673b7227e27fa75c8d032 --- src/box/memtx_engine.cc +++ src/box/memtx_engine.cc @@ -1218,7 +1218,14 @@ send_join_header(struct xstream *stream, const struct struct xrow_header row; /* Encoding replication request uses fiber()->gc region. */ RegionGuard region_guard(&fiber()->gc); - xrow_encode_vclock_ignore0(&row, vclock); + /* + * Vclock is encoded with 0th component, as in case of checkpoint + * join it corresponds to the vclock of the checkpoint, where 0th + * component is essential, as otherwise signature won't be correct.
+ * Client sends this vclock in IPROTO_CURSOR, when he wants to + * continue fetching from the same checkpoint. + */ + xrow_encode_vclock(&row, vclock); xstream_write(stream, &row); } blob - 24e99f214bbbb0279c281faa1785767a42e4c5b3 blob + bfa5c4bbb692155fc3219198974a60005bc7d815 --- src/box/xrow.c +++ src/box/xrow.c @@ -64,15 +64,22 @@ mp_sizeof_vclock_ignore0(const struct vclock *vclock) mp_sizeof_uint(UINT64_MAX)); } +static inline uint32_t +mp_sizeof_vclock(const struct vclock *vclock) +{ + uint32_t size = vclock_size(vclock); + return mp_sizeof_map(size) + size * (mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(UINT64_MAX)); +} + static inline char * -mp_encode_vclock_ignore0(char *data, const struct vclock *vclock) +mp_encode_vclock_impl(char *data, const struct vclock *vclock, bool ignore0) { - data = mp_encode_map(data, vclock_size_ignore0(vclock)); struct vclock_iterator it; vclock_iterator_init(&it, vclock); struct vclock_c replica; replica = vclock_iterator_next(&it); - if (replica.id == 0) + if (replica.id == 0 && ignore0) replica = vclock_iterator_next(&it); for ( ; replica.id < VCLOCK_MAX; replica = vclock_iterator_next(&it)) { data = mp_encode_uint(data, replica.id); @@ -81,8 +88,22 @@ mp_encode_vclock_ignore0(char *data, const struct vclo return data; } +static inline char * +mp_encode_vclock_ignore0(char *data, const struct vclock *vclock) +{ + data = mp_encode_map(data, vclock_size_ignore0(vclock)); + return mp_encode_vclock_impl(data, vclock, true); +} + +static inline char * +mp_encode_vclock(char *data, const struct vclock *vclock) +{ + data = mp_encode_map(data, vclock_size(vclock)); + return mp_encode_vclock_impl(data, vclock, false); +} + static int -mp_decode_vclock_ignore0(const char **data, struct vclock *vclock) +mp_decode_vclock(const char **data, struct vclock *vclock) { vclock_create(vclock); if (mp_typeof(**data) != MP_MAP) @@ -95,13 +116,18 @@ mp_decode_vclock_ignore0(const char **data, struct vcl if (mp_typeof(**data) != MP_UINT) 
return -1; int64_t lsn = mp_decode_uint(data); - /* - * Skip vclock[0] coming from the remote - * instances. - */ - if (lsn > 0 && id != 0) + if (lsn > 0) vclock_follow(vclock, id, lsn); } + return 0; +} + +static int +mp_decode_vclock_ignore0(const char **data, struct vclock *vclock) +{ + if (mp_decode_vclock(data, vclock) != 0) + return -1; + vclock_reset(vclock, 0, 0); return 0; } @@ -1991,6 +2017,8 @@ struct replication_request { struct tt_uuid *instance_uuid; /** IPROTO_VCLOCK. */ struct vclock *vclock_ignore0; + /** IPROTO_VCLOCK. */ + struct vclock *vclock; /** IPROTO_ID_FILTER. */ uint32_t *id_filter; /** IPROTO_SERVER_VERSION. */ @@ -2007,8 +2035,14 @@ xrow_encode_replication_request(struct xrow_header *ro { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX; - if (req->vclock_ignore0 != NULL) + if (req->vclock_ignore0 != NULL) { size += mp_sizeof_vclock_ignore0(req->vclock_ignore0); + assert(req->vclock == NULL); + } + if (req->vclock != NULL) { + size += mp_sizeof_vclock(req->vclock); + assert(req->vclock_ignore0 == NULL); + } char *buf = xregion_alloc(&fiber()->gc, size); /* Skip one byte for future map header. 
*/ char *data = buf + 1; @@ -2027,6 +2061,11 @@ xrow_encode_replication_request(struct xrow_header *ro ++map_size; data = mp_encode_uint(data, IPROTO_VCLOCK); data = mp_encode_vclock_ignore0(data, req->vclock_ignore0); + } + if (req->vclock != NULL) { + ++map_size; + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_vclock(data, req->vclock); } if (req->version_id != NULL) { ++map_size; @@ -2432,6 +2471,15 @@ xrow_encode_vclock_ignore0(struct xrow_header *row, co { const struct replication_request base_req = { .vclock_ignore0 = (struct vclock *)vclock, + }; + xrow_encode_replication_request(row, &base_req, IPROTO_OK); +} + +void +xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock) +{ + const struct replication_request base_req = { + .vclock = (struct vclock *)vclock, }; xrow_encode_replication_request(row, &base_req, IPROTO_OK); } blob - 18a1d4d3969fc01765fe058c54b790cd5d9c5417 blob + 19c86b9a1e148e44c82e73a074b3075f739e2578 --- src/box/xrow.h +++ src/box/xrow.h @@ -675,6 +675,10 @@ xrow_decode_applier_heartbeat(const struct xrow_header void xrow_encode_vclock_ignore0(struct xrow_header *row, const struct vclock *vclock); + +/** Encode vclock including 0th component. */ +void +xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); /** Decode vclock ignoring 0th component. */ int