Commit Diff


commit - 4de3d0d620adcb566119f55bc6722eef624310f1
commit + 9434531b431839b099ee65997f8800126d9b458d
blob - 75a6f975a9e757fc0cf61dc7375dae2faa01e078
blob + 1522705a63c6a771b44673b7227e27fa75c8d032
--- src/box/memtx_engine.cc
+++ src/box/memtx_engine.cc
@@ -1218,7 +1218,14 @@ send_join_header(struct xstream *stream, const struct 
 	struct xrow_header row;
 	/* Encoding replication request uses fiber()->gc region. */
 	RegionGuard region_guard(&fiber()->gc);
-	xrow_encode_vclock_ignore0(&row, vclock);
+	/*
+	 * The vclock is encoded with its 0th component: in case of a
+	 * checkpoint join it is the vclock of the checkpoint, and without
+	 * the 0th component its signature would not be correct.
+	 * The client sends this vclock in IPROTO_CURSOR when it wants
+	 * to continue fetching from the same checkpoint.
+	 */
+	xrow_encode_vclock(&row, vclock);
 	xstream_write(stream, &row);
 }
 
blob - 24e99f214bbbb0279c281faa1785767a42e4c5b3
blob + bfa5c4bbb692155fc3219198974a60005bc7d815
--- src/box/xrow.c
+++ src/box/xrow.c
@@ -64,15 +64,22 @@ mp_sizeof_vclock_ignore0(const struct vclock *vclock)
 					     mp_sizeof_uint(UINT64_MAX));
 }
 
+static inline uint32_t
+mp_sizeof_vclock(const struct vclock *vclock)
+{
+	uint32_t size = vclock_size(vclock);
+	return mp_sizeof_map(size) + size * (mp_sizeof_uint(UINT32_MAX) +
+					     mp_sizeof_uint(UINT64_MAX));
+}
+
 static inline char *
-mp_encode_vclock_ignore0(char *data, const struct vclock *vclock)
+mp_encode_vclock_impl(char *data, const struct vclock *vclock, bool ignore0)
 {
-	data = mp_encode_map(data, vclock_size_ignore0(vclock));
 	struct vclock_iterator it;
 	vclock_iterator_init(&it, vclock);
 	struct vclock_c replica;
 	replica = vclock_iterator_next(&it);
-	if (replica.id == 0)
+	if (replica.id == 0 && ignore0)
 		replica = vclock_iterator_next(&it);
 	for ( ; replica.id < VCLOCK_MAX; replica = vclock_iterator_next(&it)) {
 		data = mp_encode_uint(data, replica.id);
@@ -81,8 +88,22 @@ mp_encode_vclock_ignore0(char *data, const struct vclo
 	return data;
 }
 
+static inline char *
+mp_encode_vclock_ignore0(char *data, const struct vclock *vclock)
+{
+	data = mp_encode_map(data, vclock_size_ignore0(vclock));
+	return mp_encode_vclock_impl(data, vclock, true);
+}
+
+static inline char *
+mp_encode_vclock(char *data, const struct vclock *vclock)
+{
+	data = mp_encode_map(data, vclock_size(vclock));
+	return mp_encode_vclock_impl(data, vclock, false);
+}
+
 static int
-mp_decode_vclock_ignore0(const char **data, struct vclock *vclock)
+mp_decode_vclock(const char **data, struct vclock *vclock)
 {
 	vclock_create(vclock);
 	if (mp_typeof(**data) != MP_MAP)
@@ -95,13 +116,18 @@ mp_decode_vclock_ignore0(const char **data, struct vcl
 		if (mp_typeof(**data) != MP_UINT)
 			return -1;
 		int64_t lsn = mp_decode_uint(data);
-		/*
-		 * Skip vclock[0] coming from the remote
-		 * instances.
-		 */
-		if (lsn > 0 && id != 0)
+		if (lsn > 0)
 			vclock_follow(vclock, id, lsn);
 	}
+	return 0;
+}
+
+static int
+mp_decode_vclock_ignore0(const char **data, struct vclock *vclock)
+{
+	if (mp_decode_vclock(data, vclock) != 0)
+		return -1;
+	vclock_reset(vclock, 0, 0);
 	return 0;
 }
 
@@ -1991,6 +2017,8 @@ struct replication_request {
 	struct tt_uuid *instance_uuid;
 	/** IPROTO_VCLOCK. */
 	struct vclock *vclock_ignore0;
+	/** IPROTO_VCLOCK, including the 0th component. */
+	struct vclock *vclock;
 	/** IPROTO_ID_FILTER. */
 	uint32_t *id_filter;
 	/** IPROTO_SERVER_VERSION. */
@@ -2007,8 +2035,14 @@ xrow_encode_replication_request(struct xrow_header *ro
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX;
-	if (req->vclock_ignore0 != NULL)
+	if (req->vclock_ignore0 != NULL) {
 		size += mp_sizeof_vclock_ignore0(req->vclock_ignore0);
+		assert(req->vclock == NULL);
+	}
+	if (req->vclock != NULL) {
+		size += mp_sizeof_vclock(req->vclock);
+		assert(req->vclock_ignore0 == NULL);
+	}
 	char *buf = xregion_alloc(&fiber()->gc, size);
 	/* Skip one byte for future map header. */
 	char *data = buf + 1;
@@ -2027,6 +2061,11 @@ xrow_encode_replication_request(struct xrow_header *ro
 		++map_size;
 		data = mp_encode_uint(data, IPROTO_VCLOCK);
 		data = mp_encode_vclock_ignore0(data, req->vclock_ignore0);
+	}
+	if (req->vclock != NULL) {
+		++map_size;
+		data = mp_encode_uint(data, IPROTO_VCLOCK);
+		data = mp_encode_vclock(data, req->vclock);
 	}
 	if (req->version_id != NULL) {
 		++map_size;
@@ -2432,6 +2471,15 @@ xrow_encode_vclock_ignore0(struct xrow_header *row, co
 {
 	const struct replication_request base_req = {
 		.vclock_ignore0 = (struct vclock *)vclock,
+	};
+	xrow_encode_replication_request(row, &base_req, IPROTO_OK);
+}
+
+void
+xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock)
+{
+	const struct replication_request base_req = {
+		.vclock = (struct vclock *)vclock,
 	};
 	xrow_encode_replication_request(row, &base_req, IPROTO_OK);
 }
blob - 18a1d4d3969fc01765fe058c54b790cd5d9c5417
blob + 19c86b9a1e148e44c82e73a074b3075f739e2578
--- src/box/xrow.h
+++ src/box/xrow.h
@@ -675,6 +675,10 @@ xrow_decode_applier_heartbeat(const struct xrow_header
 void
 xrow_encode_vclock_ignore0(struct xrow_header *row,
 			   const struct vclock *vclock);
+
+/** Encode vclock including 0th component. */
+void
+xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
 
 /** Decode vclock ignoring 0th component. */
 int