commit - 9434531b431839b099ee65997f8800126d9b458d
commit + 23c7899e5c81d2080c8feb193823893de1381491
blob - cd8690c1e9f1f4d1ce4ff589f576ec9b1e4323e1
blob + ac94ee024a8eb7468d5755f05c268c0daab945fe
--- src/box/applier.cc
+++ src/box/applier.cc
struct iostream *io = &applier->io;
struct xrow_header row;
+ /*
+ * Applier doesn't support checkpoint join, so it requests a plain
+ * read-view join: checkpoint_lsn is zero-initialized by the aggregate
+ * initializer and the checkpoint vclock is set up by vclock_create().
+ */
struct fetch_snapshot_request req = {
.version_id = tarantool_version_id(),
+ .is_checkpoint_join = false,
};
+ vclock_create(&req.checkpoint_vclock);
RegionGuard region_guard(&fiber()->gc);
xrow_encode_fetch_snapshot(&row, &req);
blob - 58c68b67a0b77e9d99f587d53609ee2ff7859d85
blob + 008a0a1131cab69eaf51488eaeba68b4b3fb96dc
--- src/box/box.cc
+++ src/box/box.cc
"wal_mode = 'none'");
}
- say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
+ say_info("sending read-view to replica at %s", sio_socketname(io->fd));
+ /*
+ * A non-NULL cursor makes relay_initial_join() perform a checkpoint
+ * initial join from the checkpoint with the requested vclock,
+ * starting at the requested lsn. NULL means a read-view join.
+ */
+ struct checkpoint_cursor join_cursor;
+ memset(&join_cursor, 0, sizeof(join_cursor));
+ join_cursor.vclock = &req.checkpoint_vclock;
+ join_cursor.start_lsn = req.checkpoint_lsn;
+ struct checkpoint_cursor *cursor =
+ req.is_checkpoint_join ? &join_cursor : NULL;
/* Send the snapshot data to the instance. */
struct vclock start_vclock;
- relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+ relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+ cursor);
say_info("read-view sent.");
/* Remember master's vclock after the last request */
* Initial stream: feed replica with dirty data from engines.
*/
struct vclock start_vclock;
- relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+ /* Regular JOIN: NULL cursor requests a read-view join. */
+ relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+ NULL);
say_info("initial data sent.");
/**
blob - 0e8a2ad7e4df64689dacef76b63c90c3113921e6
blob + f521c19c31508330d24cfcfbf215568feb08fe19
--- src/box/engine.h
+++ src/box/engine.h
const struct engine_read_view_vtab *vtab;
/** Link in read_view::engines. */
struct rlist link;
+};
+
+/**
+ * Cursor used during checkpoint initial join. A single instance is
+ * shared by all engines taking part in the join via
+ * engine_join_ctx::cursor.
+ */
+struct checkpoint_cursor {
+ /** Signature of the checkpoint to take data from. */
+ struct vclock *vclock;
+ /** Checkpoint lsn to start from. */
+ int64_t start_lsn;
+ /**
+ * Counter shared between engines. NOTE(review): presumably the
+ * number of checkpoint rows processed so far, compared against
+ * start_lsn; confirm against the engine implementations.
+ */
+ int64_t lsn_counter;
};
struct engine_join_ctx {
struct vclock *vclock;
/** Whether sending JOIN_META stage is required. */
bool send_meta;
+ /**
+ * Checkpoint join cursor. NULL means a regular read-view join;
+ * otherwise the memtx and vinyl join callbacks dispatch to their
+ * checkpoint join implementations.
+ */
+ struct checkpoint_cursor *cursor;
/** Array of engine join contexts, one per each engine. */
void **data;
};
blob - 58c6516638a6eb7b05fc8ef24ee0f05bae1c6603
blob + 16edd280802e0af82efa420efea563277c6c8778
--- src/box/iproto_constants.h
+++ src/box/iproto_constants.h
* authentication method.
*/ \
_(AUTH_TYPE, 0x5b, MP_STR) \
+ /**
+ * Flag indicating whether checkpoint join should be done
+ * (FETCH_SNAPSHOT request).
+ */ \
+ _(IS_CHECKPOINT_JOIN, 0x62, MP_BOOL) \
+ /**
+ * Shows the signature of the checkpoint to read from.
+ * Requires IS_CHECKPOINT_JOIN to be true.
+ */ \
+ _(CHECKPOINT_VCLOCK, 0x63, MP_MAP) \
+ /**
+ * Shows the lsn to start sending from. Server sends all rows
+ * >= IPROTO_CHECKPOINT_LSN. Requires IS_CHECKPOINT_JOIN to be
+ * true and CHECKPOINT_VCLOCK to be set.
+ */ \
+ _(CHECKPOINT_LSN, 0x64, MP_UINT) \
#define IPROTO_KEY_MEMBER(s, v, ...) IPROTO_ ## s = v,
blob - 1522705a63c6a771b44673b7227e27fa75c8d032
blob + b38b19a0dbcb346350e8331273e3a562b249b0fd
--- src/box/memtx_engine.cc
+++ src/box/memtx_engine.cc
xrow_encode_synchro(&row, body, synchro_req);
row.replica_id = synchro_req->replica_id;
xstream_write(stream, &row);
+}
+
+/*
+ * Checkpoint join implementation is compiled in only when
+ * ENABLE_FETCH_SNAPSHOT_CURSOR is defined. Otherwise the stubs below
+ * reject checkpoint join with ER_UNSUPPORTED.
+ */
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "memtx_checkpoint_join.cc"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+/** Stub: checkpoint join is unavailable, fail with ER_UNSUPPORTED. */
+static int
+memtx_engine_prepare_checkpoint_join(struct engine *engine,
+ struct engine_join_ctx *ctx)
+{
+ (void)engine;
+ (void)ctx;
+ diag_set(ClientError, ER_UNSUPPORTED, "Community edition",
+ "checkpoint join");
+ return -1;
}
+
+/** Stub: must not be reached, the prepare stub always fails. */
+static int
+memtx_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+ struct xstream *stream)
+{
+ (void)engine;
+ (void)ctx;
+ (void)stream;
+ unreachable();
+ return -1;
+}
+
+/** Stub: the prepare stub allocates nothing, so nothing to release. */
+static void
+memtx_engine_complete_checkpoint_join(struct engine *engine,
+ struct engine_join_ctx *ctx)
+{
+ (void)engine;
+ (void)ctx;
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
/** Space filter for replica join. */
static bool
memtx_join_space_filter(struct space *space, void *arg)
static int
memtx_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
{
+ bool is_checkpoint_join = arg->cursor != NULL;
+ if (is_checkpoint_join)
+ return memtx_engine_prepare_checkpoint_join(engine, arg);
+
struct memtx_join_ctx *ctx =
(struct memtx_join_ctx *)malloc(sizeof(*ctx));
if (ctx == NULL) {
memtx_engine_join(struct engine *engine, struct engine_join_ctx *arg,
struct xstream *stream)
{
+ bool is_checkpoint_join = arg->cursor != NULL;
+ if (is_checkpoint_join)
+ return memtx_engine_checkpoint_join(engine, arg, stream);
+
struct memtx_join_ctx *ctx =
(struct memtx_join_ctx *)arg->data[engine->id];
ctx->stream = stream;
static void
memtx_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
{
+ bool is_checkpoint_join = arg->cursor != NULL;
+ if (is_checkpoint_join) {
+ memtx_engine_complete_checkpoint_join(engine, arg);
+ return;
+ }
+
struct memtx_join_ctx *ctx =
(struct memtx_join_ctx *)arg->data[engine->id];
read_view_close(&ctx->rv);
blob - 43e9000aec58d921dfc12038908594f83dfaaf8b
blob + a9943711e566b8d975d6778a9512d1593aca8494
--- src/box/relay.cc
+++ src/box/relay.cc
void
relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
- uint32_t replica_version_id)
+ uint32_t replica_version_id,
+ struct checkpoint_cursor *cursor)
{
struct relay *relay = relay_new(NULL);
if (relay == NULL)
*/
ctx.send_meta = replica_version_id > 0;
ctx.vclock = vclock;
+ /* NULL selects a read-view join, non-NULL a checkpoint join. */
+ ctx.cursor = cursor;
engine_prepare_join_xc(&ctx);
auto join_guard = make_scoped_guard([&] {
engine_complete_join(&ctx);
blob - ead0d8062358325f37e0a8d24b6d1ac42f8a9d6b
blob + af100c9119bb445aa400229660b7b4725be5c97c
--- src/box/relay.h
+++ src/box/relay.h
struct replica;
struct tt_uuid;
struct vclock;
+struct checkpoint_cursor;
enum relay_state {
/**
* @param sync sync from incoming JOIN request
* @param vclock[out] vclock of the read view sent to the replica
* @param replica_version_id peer's version
+ * @param cursor cursor for checkpoint join; if NULL, a read-view join
+ * is done. Forwarded to the engines as engine_join_ctx::cursor.
*/
void
relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
- uint32_t replica_version_id);
+ uint32_t replica_version_id,
+ struct checkpoint_cursor *cursor);
/**
* Send final JOIN rows to the replica.
blob - 64f619b1daa31201b8841e7983291bf829a4022a
blob + 2c563ce481c39de39cb6f5e4e4707db51825d162
--- src/box/vinyl.c
+++ src/box/vinyl.c
/** Read view at the time when the initial join started. */
struct vy_read_view *rv;
};
+
+/*
+ * Checkpoint join implementation is compiled in only when
+ * ENABLE_FETCH_SNAPSHOT_CURSOR is defined. Otherwise the stubs below
+ * are used.
+ */
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "vinyl_checkpoint_join.c"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+/**
+ * Stub: checkpoint join is unavailable in this build. Fail with
+ * ER_UNSUPPORTED, as the memtx stub does, so that the request is
+ * rejected cleanly regardless of the order engines are prepared in.
+ */
+static int
+vinyl_engine_prepare_checkpoint_join(struct engine *engine,
+ struct engine_join_ctx *ctx)
+{
+ (void)engine;
+ (void)ctx;
+ diag_set(ClientError, ER_UNSUPPORTED, "Community edition",
+ "checkpoint join");
+ return -1;
+}
+
+/** Stub: must not be reached, the prepare stub always fails. */
+static int
+vinyl_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+ struct xstream *stream)
+{
+ (void)engine;
+ (void)ctx;
+ (void)stream;
+ unreachable();
+ return -1;
+}
+
+/** Stub: the prepare stub allocates nothing, so nothing to release. */
+static void
+vinyl_engine_complete_checkpoint_join(struct engine *engine,
+ struct engine_join_ctx *ctx)
+{
+ (void)engine;
+ (void)ctx;
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
static int
vy_join_add_space(struct space *space, void *arg)
{
static int
vinyl_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
{
+ /* A checkpoint cursor selects the checkpoint join path. */
+ if (arg->cursor != NULL)
+ return vinyl_engine_prepare_checkpoint_join(engine, arg);
+
struct vy_env *env = vy_env(engine);
struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
if (ctx == NULL) {
vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg,
struct xstream *stream)
{
+ /* A checkpoint cursor selects the checkpoint join path. */
+ if (arg->cursor != NULL)
+ return vinyl_engine_checkpoint_join(engine, arg, stream);
+
int loops = 0;
struct vy_join_ctx *ctx = arg->data[engine->id];
struct vy_join_entry *join_entry;
static void
vinyl_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
{
+ /*
+ * Call and return separately: ISO C forbids `return` with an
+ * expression in a function returning void.
+ */
+ if (arg->cursor != NULL) {
+ vinyl_engine_complete_checkpoint_join(engine, arg);
+ return;
+ }
+
struct vy_env *env = vy_env(engine);
struct vy_join_ctx *ctx = arg->data[engine->id];
struct vy_join_entry *entry, *next;
blob - bfa5c4bbb692155fc3219198974a60005bc7d815
blob + eddf80c9657b1541129740c388456f95e8f441d2
--- src/box/xrow.c
+++ src/box/xrow.c
uint32_t *version_id;
/** IPROTO_REPLICA_ANON. */
bool *is_anon;
+ /** IPROTO_IS_CHECKPOINT_JOIN. The key is skipped if NULL. */
+ bool *is_checkpoint_join;
+ /** IPROTO_CHECKPOINT_VCLOCK. The key is skipped if NULL. */
+ struct vclock *checkpoint_vclock;
+ /** IPROTO_CHECKPOINT_LSN. The key is skipped if NULL. */
+ uint64_t *checkpoint_lsn;
};
/** Encode a replication request template. */
if (val >= VCLOCK_MAX)
goto id_filter_decode_err;
*req->id_filter |= 1 << val;
+ }
+ break;
+ case IPROTO_IS_CHECKPOINT_JOIN:
+ if (req->is_checkpoint_join == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_BOOL) {
+ xrow_on_decode_err(
+ row, ER_INVALID_MSGPACK,
+ "invalid IS_CHECKPOINT_JOIN");
+ return -1;
}
+ *req->is_checkpoint_join = mp_decode_bool(&d);
break;
+ case IPROTO_CHECKPOINT_VCLOCK:
+ if (req->checkpoint_vclock == NULL)
+ goto skip;
+ if (mp_decode_vclock(&d, req->checkpoint_vclock) != 0) {
+ xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+ "invalid CHECKPOINT_VCLOCK");
+ return -1;
+ }
+ break;
+ case IPROTO_CHECKPOINT_LSN:
+ if (req->checkpoint_lsn == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_UINT) {
+ xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+ "invalid CHECKPOINT_LSN");
+ return -1;
+ }
+ *req->checkpoint_lsn = mp_decode_uint(&d);
+ break;
default: skip:
mp_next(&d); /* value */
}
memset(req, 0, sizeof(*req));
struct replication_request base_req = {
.version_id = &req->version_id,
+ .is_checkpoint_join = &req->is_checkpoint_join,
+ .checkpoint_vclock = &req->checkpoint_vclock,
+ .checkpoint_lsn = &req->checkpoint_lsn,
};
+ /*
+ * Clear the checkpoint vclock so that its signature is -1. The
+ * memset() above would leave the signature at 0, and then a request
+ * without IPROTO_CHECKPOINT_VCLOCK could not be told apart from one
+ * carrying an explicitly sent zero vclock.
+ */
+ vclock_clear(&req->checkpoint_vclock);
return xrow_decode_replication_request(row, &base_req);
}
blob - 19c86b9a1e148e44c82e73a074b3075f739e2578
blob + cf20865faa5b5c344da19dca99e6f3d27e731cb7
--- src/box/xrow.h
+++ src/box/xrow.h
struct fetch_snapshot_request {
/** Replica's version. */
uint32_t version_id;
+ /** Flag indicating whether checkpoint join should be done. */
+ bool is_checkpoint_join;
+ /**
+ * Checkpoint's vclock, signature of the snapshot. When decoded
+ * from a request without IPROTO_CHECKPOINT_VCLOCK, it is left
+ * cleared (signature -1).
+ */
+ struct vclock checkpoint_vclock;
+ /**
+ * Checkpoint's lsn, the row number to start from. Only meaningful
+ * together with is_checkpoint_join.
+ */
+ uint64_t checkpoint_lsn;
};
/** Encode FETCH_SNAPSHOT request. */
blob - 5d4cde0ab0bad91ee10e1c54173ee9fbdaa7ba3a
blob + dfdb9129d6ec47244ae4ed0baf417f4864758339
--- src/trivia/config.h.cmake
+++ src/trivia/config.h.cmake
#cmakedefine ENABLE_READ_VIEW 1
#cmakedefine ENABLE_SECURITY 1
#cmakedefine ENABLE_COMPRESS_MODULE 1
+/* Enables the memtx/vinyl checkpoint join (FETCH_SNAPSHOT cursor). */
+#cmakedefine ENABLE_FETCH_SNAPSHOT_CURSOR 1
#cmakedefine EXPORT_LIBCURL_SYMBOLS 1
blob - a300fc05e5b0cd02d05d101d473d6b7d5ec777d1
blob + 1c94a177118fb3e08e192fdeaf0970f22ebda9db
--- test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
+++ test/box-luatest/gh_7894_export_iproto_constants_and_features_test.lua
TXN_ISOLATION = 0x59,
VCLOCK_SYNC = 0x5a,
AUTH_TYPE = 0x5b,
+ -- Checkpoint join keys of the FETCH_SNAPSHOT request.
+ IS_CHECKPOINT_JOIN = 0x62,
+ CHECKPOINT_VCLOCK = 0x63,
+ CHECKPOINT_LSN = 0x64,
},
-- `iproto_metadata_key` enumeration.