Commit Diff


commit - 9434531b431839b099ee65997f8800126d9b458d
commit + 23c7899e5c81d2080c8feb193823893de1381491
blob - cd8690c1e9f1f4d1ce4ff589f576ec9b1e4323e1
blob + ac94ee024a8eb7468d5755f05c268c0daab945fe
--- src/box/applier.cc
+++ src/box/applier.cc
@@ -870,8 +870,14 @@ applier_fetch_snapshot(struct applier *applier)
 	struct iostream *io = &applier->io;
 	struct xrow_header row;
 
+	struct vclock vclock;
+	vclock_create(&vclock);
 	struct fetch_snapshot_request req = {
 		.version_id = tarantool_version_id(),
+		/* Applier doesn't support checkpoint join. */
+		.is_checkpoint_join = false,
+		.checkpoint_vclock = vclock,
+		.checkpoint_lsn = 0,
 	};
 	RegionGuard region_guard(&fiber()->gc);
 	xrow_encode_fetch_snapshot(&row, &req);
blob - 58c68b67a0b77e9d99f587d53609ee2ff7859d85
blob + 008a0a1131cab69eaf51488eaeba68b4b3fb96dc
--- src/box/box.cc
+++ src/box/box.cc
@@ -3944,11 +3944,21 @@ box_process_fetch_snapshot(struct iostream *io,
 			  "wal_mode = 'none'");
 	}
 
-	say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
+	say_info("sending read-view to replica at %s", sio_socketname(io->fd));
+	/* Used for checkpoint initial join. */
+	struct checkpoint_cursor cursor;
+	struct checkpoint_cursor *cursor_ptr = NULL;
+	if (req.is_checkpoint_join) {
+		memset(&cursor, 0, sizeof(cursor));
+		cursor.vclock = &req.checkpoint_vclock;
+		cursor.start_lsn = req.checkpoint_lsn;
+		cursor_ptr = &cursor;
+	}
 
 	/* Send the snapshot data to the instance. */
 	struct vclock start_vclock;
-	relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+	relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+			   cursor_ptr);
 	say_info("read-view sent.");
 
 	/* Remember master's vclock after the last request */
@@ -4182,7 +4192,8 @@ box_process_join(struct iostream *io, const struct xro
 	 * Initial stream: feed replica with dirty data from engines.
 	 */
 	struct vclock start_vclock;
-	relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+	relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+			   NULL);
 	say_info("initial data sent.");
 
 	/**
blob - 0e8a2ad7e4df64689dacef76b63c90c3113921e6
blob + f521c19c31508330d24cfcfbf215568feb08fe19
--- src/box/engine.h
+++ src/box/engine.h
@@ -304,6 +304,18 @@ struct engine_read_view {
 	const struct engine_read_view_vtab *vtab;
 	/** Link in read_view::engines. */
 	struct rlist link;
+};
+
+/**
+ * Cursor used during checkpoint initial join. Shared between engines.
+ */
+struct checkpoint_cursor {
+	/** Signature of the checkpoint to take data from. */
+	struct vclock *vclock;
+	/** Checkpoint lsn to start from. */
+	int64_t start_lsn;
+	/** Counter, shared between engines */
+	int64_t lsn_counter;
 };
 
 struct engine_join_ctx {
@@ -311,6 +323,8 @@ struct engine_join_ctx {
 	struct vclock *vclock;
 	/** Whether sending JOIN_META stage is required. */
 	bool send_meta;
+	/** Checkpoint join cursor. */
+	struct checkpoint_cursor *cursor;
 	/** Array of engine join contexts, one per each engine. */
 	void **data;
 };
blob - 58c6516638a6eb7b05fc8ef24ee0f05bae1c6603
blob + 16edd280802e0af82efa420efea563277c6c8778
--- src/box/iproto_constants.h
+++ src/box/iproto_constants.h
@@ -194,6 +194,21 @@ extern const char *iproto_flag_bit_strs[];
 	 * authentication method.
 	 */								\
 	_(AUTH_TYPE, 0x5b, MP_STR)					\
+	 /**
+	  * Flag indicating whether checkpoint join should be done.
+	  */								\
+	 _(IS_CHECKPOINT_JOIN, 0x62, MP_BOOL)				\
+	 /**
+	  * Shows the signature of the checkpoint to read from.
+	  * Requires CHECKPOINT_JOIN to be true.
+	  */								\
+	 _(CHECKPOINT_VCLOCK, 0x63, MP_MAP)				\
+	 /**
+	  * Shows the lsn to start sending from. Server sends all rows
+	  * >= IPROTO_CHECKPOINT_LSN. Requires CHECKPOINT_JOIN to be
+	  * true and CHECKPOINT_VCLOCK to be set.
+	  */								\
+	 _(CHECKPOINT_LSN, 0x64, MP_UINT)				\
 
 #define IPROTO_KEY_MEMBER(s, v, ...) IPROTO_ ## s = v,
 
blob - 1522705a63c6a771b44673b7227e27fa75c8d032
blob + b38b19a0dbcb346350e8331273e3a562b249b0fd
--- src/box/memtx_engine.cc
+++ src/box/memtx_engine.cc
@@ -1248,8 +1248,44 @@ send_join_meta(struct xstream *stream, const struct ra
 	xrow_encode_synchro(&row, body, synchro_req);
 	row.replica_id = synchro_req->replica_id;
 	xstream_write(stream, &row);
+}
+
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "memtx_checkpoint_join.cc"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+static int
+memtx_engine_prepare_checkpoint_join(struct engine *engine,
+				     struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+	diag_set(ClientError, ER_UNSUPPORTED, "Community edition",
+		 "checkpoint join");
+	return -1;
 }
 
+static int
+memtx_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+			     struct xstream *stream)
+{
+	(void)engine;
+	(void)ctx;
+	(void)stream;
+	unreachable();
+	return -1;
+}
+
+static void
+memtx_engine_complete_checkpoint_join(struct engine *engine,
+				      struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
 /** Space filter for replica join. */
 static bool
 memtx_join_space_filter(struct space *space, void *arg)
@@ -1263,6 +1299,9 @@ memtx_join_space_filter(struct space *space, void *arg
 static int
 memtx_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_prepare_checkpoint_join(engine, arg);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)malloc(sizeof(*ctx));
 	if (ctx == NULL) {
@@ -1343,6 +1382,9 @@ static int
 memtx_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 		  struct xstream *stream)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_checkpoint_join(engine, arg, stream);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)arg->data[engine->id];
 	ctx->stream = stream;
@@ -1401,6 +1443,9 @@ memtx_engine_join(struct engine *engine, struct engine
 static void
 memtx_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_complete_checkpoint_join(engine, arg);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)arg->data[engine->id];
 	read_view_close(&ctx->rv);
blob - 43e9000aec58d921dfc12038908594f83dfaaf8b
blob + a9943711e566b8d975d6778a9512d1593aca8494
--- src/box/relay.cc
+++ src/box/relay.cc
@@ -458,7 +458,8 @@ relay_cord_init(struct relay *relay)
 
 void
 relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
-		   uint32_t replica_version_id)
+		   uint32_t replica_version_id,
+		   struct checkpoint_cursor *cursor)
 {
 	struct relay *relay = relay_new(NULL);
 	if (relay == NULL)
@@ -480,6 +481,7 @@ relay_initial_join(struct iostream *io, uint64_t sync,
 	 */
 	ctx.send_meta = replica_version_id > 0;
 	ctx.vclock = vclock;
+	ctx.cursor = cursor;
 	engine_prepare_join_xc(&ctx);
 	auto join_guard = make_scoped_guard([&] {
 		engine_complete_join(&ctx);
blob - ead0d8062358325f37e0a8d24b6d1ac42f8a9d6b
blob + af100c9119bb445aa400229660b7b4725be5c97c
--- src/box/relay.h
+++ src/box/relay.h
@@ -42,6 +42,7 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct checkpoint_cursor;
 
 enum relay_state {
 	/**
@@ -126,10 +127,12 @@ relay_push_raft(struct relay *relay, const struct raft
  * @param sync      sync from incoming JOIN request
  * @param vclock[out] vclock of the read view sent to the replica
  * @param replica_version_id peer's version
+ * @param cursor    cursor for checkpoint join, if NULL - read-view join.
  */
 void
 relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
-		   uint32_t replica_version_id);
+		   uint32_t replica_version_id,
+		   struct checkpoint_cursor *cursor);
 
 /**
  * Send final JOIN rows to the replica.
blob - 64f619b1daa31201b8841e7983291bf829a4022a
blob + 2c563ce481c39de39cb6f5e4e4707db51825d162
--- src/box/vinyl.c
+++ src/box/vinyl.c
@@ -3030,7 +3030,42 @@ struct vy_join_ctx {
 	/** Read view at the time when the initial join started. */
 	struct vy_read_view *rv;
 };
+
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "vinyl_checkpoint_join.c"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+static int
+vinyl_engine_prepare_checkpoint_join(struct engine *engine,
+				     struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+	unreachable();
+	return -1;
+}
+
+static int
+vinyl_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+			     struct xstream *stream)
+{
+	(void)engine;
+	(void)ctx;
+	(void)stream;
+	unreachable();
+	return -1;
+}
 
+static void
+vinyl_engine_complete_checkpoint_join(struct engine *engine,
+				      struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
 static int
 vy_join_add_space(struct space *space, void *arg)
 {
@@ -3065,6 +3100,9 @@ vy_join_add_space(struct space *space, void *arg)
 static int
 vinyl_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_prepare_checkpoint_join(engine, arg);
+
 	struct vy_env *env = vy_env(engine);
 	struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
 	if (ctx == NULL) {
@@ -3106,6 +3144,9 @@ static int
 vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 		  struct xstream *stream)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_checkpoint_join(engine, arg, stream);
+
 	int loops = 0;
 	struct vy_join_ctx *ctx = arg->data[engine->id];
 	struct vy_join_entry *join_entry;
@@ -3131,6 +3172,9 @@ vinyl_engine_join(struct engine *engine, struct engine
 static void
 vinyl_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_complete_checkpoint_join(engine, arg);
+
 	struct vy_env *env = vy_env(engine);
 	struct vy_join_ctx *ctx = arg->data[engine->id];
 	struct vy_join_entry *entry, *next;
blob - bfa5c4bbb692155fc3219198974a60005bc7d815
blob + eddf80c9657b1541129740c388456f95e8f441d2
--- src/box/xrow.c
+++ src/box/xrow.c
@@ -2025,6 +2025,12 @@ struct replication_request {
 	uint32_t *version_id;
 	/** IPROTO_REPLICA_ANON. */
 	bool *is_anon;
+	/** IPROTO_IS_CHECKPOINT_JOIN. */
+	bool *is_checkpoint_join;
+	/** IPROTO_CHECKPOINT_VCLOCK. */
+	struct vclock *checkpoint_vclock;
+	/** IPROTO_CHECKPOINT_LSN. */
+	uint64_t *checkpoint_lsn;
 };
 
 /** Encode a replication request template. */
@@ -2188,8 +2194,38 @@ id_filter_decode_err:		xrow_on_decode_err(row, ER_INVA
 				if (val >= VCLOCK_MAX)
 					goto id_filter_decode_err;
 				*req->id_filter |= 1 << val;
+			}
+			break;
+		case IPROTO_IS_CHECKPOINT_JOIN:
+			if (req->is_checkpoint_join == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_BOOL) {
+				xrow_on_decode_err(
+					row, ER_INVALID_MSGPACK,
+					"invalid IS_CHECKPOINT_JOIN");
+				return -1;
 			}
+			*req->is_checkpoint_join = mp_decode_bool(&d);
 			break;
+		case IPROTO_CHECKPOINT_VCLOCK:
+			if (req->checkpoint_vclock == NULL)
+				goto skip;
+			if (mp_decode_vclock(&d, req->checkpoint_vclock) != 0) {
+				xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+						   "invalid CHECKPOINT_VCLOCK");
+				return -1;
+			}
+			break;
+		case IPROTO_CHECKPOINT_LSN:
+			if (req->checkpoint_lsn == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_UINT) {
+				xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+						   "invalid CHECKPOINT_LSN");
+				return -1;
+			}
+			*req->checkpoint_lsn = mp_decode_uint(&d);
+			break;
 		default: skip:
 			mp_next(&d); /* value */
 		}
@@ -2294,7 +2330,16 @@ xrow_decode_fetch_snapshot(const struct xrow_header *r
 	memset(req, 0, sizeof(*req));
 	struct replication_request base_req = {
 		.version_id = &req->version_id,
+		.is_checkpoint_join = &req->is_checkpoint_join,
+		.checkpoint_vclock = &req->checkpoint_vclock,
+		.checkpoint_lsn = &req->checkpoint_lsn,
 	};
+	/*
+	 * Vclock must be cleared, as it sets -1 signature, which cannot be
+	 * done by memset above. This is done in order to distinguish not
+	 * initialized vclock from the zero one.
+	 */
+	vclock_clear(&req->checkpoint_vclock);
 	return xrow_decode_replication_request(row, &base_req);
 }
 
blob - 19c86b9a1e148e44c82e73a074b3075f739e2578
blob + cf20865faa5b5c344da19dca99e6f3d27e731cb7
--- src/box/xrow.h
+++ src/box/xrow.h
@@ -614,6 +614,12 @@ xrow_decode_join(const struct xrow_header *row, struct
 struct fetch_snapshot_request {
 	/** Replica's version. */
 	uint32_t version_id;
+	/** Flag indicating whether checkpoint join should be done. */
+	bool is_checkpoint_join;
+	/** Checkpoint's vclock, signature of the snapshot. */
+	struct vclock checkpoint_vclock;
+	/** Checkpoint's lsn, the row number to start from. */
+	uint64_t checkpoint_lsn;
 };
 
 /** Encode FETCH_SNAPSHOT request. */
blob - 5d4cde0ab0bad91ee10e1c54173ee9fbdaa7ba3a
blob + dfdb9129d6ec47244ae4ed0baf417f4864758339
--- src/trivia/config.h.cmake
+++ src/trivia/config.h.cmake
@@ -288,6 +288,7 @@
 #cmakedefine ENABLE_READ_VIEW 1
 #cmakedefine ENABLE_SECURITY 1
 #cmakedefine ENABLE_COMPRESS_MODULE 1
+#cmakedefine ENABLE_FETCH_SNAPSHOT_CURSOR 1
 
 #cmakedefine EXPORT_LIBCURL_SYMBOLS 1
 
blob - a300fc05e5b0cd02d05d101d473d6b7d5ec777d1
blob + 1c94a177118fb3e08e192fdeaf0970f22ebda9db
--- test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
+++ test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
@@ -80,6 +80,9 @@ local reference_table = {
         TXN_ISOLATION = 0x59,
         VCLOCK_SYNC = 0x5a,
         AUTH_TYPE = 0x5b,
+        IS_CHECKPOINT_JOIN = 0x62,
+        CHECKPOINT_VCLOCK = 0x63,
+        CHECKPOINT_LSN = 0x64,
     },
 
     -- `iproto_metadata_key` enumeration.