commit 8a25d170ea897d5aa2f1a81e1cbe8292596a6a6d from: Vladimir Davydov via: Vladimir Davydov date: Fri Jun 30 07:18:31 2023 UTC lua/xlog: don't ignore unknown header fields The xlog reader Lua module uses xlog_cursor_next_row, which decodes the row header with xrow_header_decode. The latter silently ignores any unknown fields, which complicates catching bugs when garbage is written to a row header by mistake, for example, see #8783. Let's parse a row header without using xrow_header_decode in the xlog reader module, like we parse a row body, and output all unknown/invalid keys as is. To do that, we have to extend the xlog cursor API with the new method xlog_cursor_next_row_raw that returns a pointer to the position in the tx buffer where the next xrow is stored without advancing it. To avoid a memory leak in case the caller fails to parse an xrow returned by this function, we also have to move the call to xlog_tx_cursor_destroy from xlog_cursor_next_row to xlog_cursor_next_tx. While we are at it, - Don't raise an error if a key type encountered in a row body is invalid (not an integer). Instead, silently ignore such keys. - Remove the useless body MsgPack validity check because we already check it after decoding the header. - Add error-injection-based tests to check all the corner cases. NO_DOC=bug fix commit - f058cee74eca494e8fca8e3aae6b94ce8105165b commit + 8a25d170ea897d5aa2f1a81e1cbe8292596a6a6d blob - /dev/null blob + 36699bd478ce44d567365f0148a043f69d65c7f7 (mode 644) --- /dev/null +++ changelogs/unreleased/xlog-reader-show-unknown-header-fields.md @@ -0,0 +1,4 @@ +## bugfix/lua + +* Fixed the xlog reader Lua module to show unknown row header fields. Before + this change the xlog reader silently skipped them. 
blob - 3234d011a2f06e74143e59bd15aa90a4e3aeed50 blob + 43a6880f927e0e2fdc21cf0647dd39c8b0451734 --- src/box/lua/xlog.c +++ src/box/lua/xlog.c @@ -39,7 +39,6 @@ #include #include -#include #include #include #include @@ -90,11 +89,19 @@ lbox_xlog_pushkey(lua_State *L, const char *key) luaL_pushresult(&b); } +/** + * Helper function for lbox_xlog_parse_body that parses one key value pair + * and adds it to the result table. The MsgPack data must be checked. + */ static void -lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg, const char *end) +lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg) { - if (mp_typeof(**beg) != MP_UINT) - luaL_error(L, "Broken type of body key"); + if (mp_typeof(**beg) != MP_UINT) { + /* Invalid key type - ignore. */ + mp_next(beg); + mp_next(beg); + return; + } uint32_t v = mp_decode_uint(beg); if (iproto_type_is_dml(type) && iproto_key_name(v)) { /* @@ -129,30 +136,135 @@ lbox_xlog_parse_body_kv(struct lua_State *L, int type, /* * Push Lua objects */ - const char *tmp = *beg; - if (mp_check(&tmp, end) != 0) { - lua_pushstring(L, ""); - } else { - luamp_decode(L, luaL_msgpack_default, beg); - } + luamp_decode(L, luaL_msgpack_default, beg); } lua_settable(L, -3); } -static int -lbox_xlog_parse_body(struct lua_State *L, int type, const char *ptr, size_t len) +/** + * Parses a request body and pushes it to the Lua stack. + * The MsgPack data must be checked and represent a map. 
+ */ +static void +lbox_xlog_parse_body(struct lua_State *L, int type, const char **beg) { - const char **beg = &ptr; - const char *end = ptr + len; - if (mp_typeof(**beg) != MP_MAP) - return -1; + lua_newtable(L); uint32_t size = mp_decode_map(beg); - uint32_t i; - for (i = 0; i < size && *beg < end; i++) - lbox_xlog_parse_body_kv(L, type, beg, end); - if (i != size) - say_warn("warning: decoded %u values from" - " MP_MAP, %u expected", i, size); + for (uint32_t i = 0; i < size; i++) + lbox_xlog_parse_body_kv(L, type, beg); +} + +/** + * Parses a row and pushes it along with its LSN to the Lua stack. + * On success returns the number of values pushed (> 0). On EOF returns 0. + */ +static int +lbox_xlog_parse_row(struct lua_State *L, const char **pos, const char *end) +{ + int top = lua_gettop(L); + const char *tmp = *pos; + if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP) + goto bad_row; + /* + * Sic: The nrec argument of lua_createtable is chosen so that the + * output looks pretty when encoded in YAML. + */ + lua_createtable(L, 0, 8); + lua_pushliteral(L, "HEADER"); + lua_createtable(L, 0, 8); + uint64_t type = 0; + uint64_t tsn = 0; + uint64_t lsn = 0; + bool has_tsn = false; + bool is_commit = false; + uint32_t size = mp_decode_map(pos); + for (uint32_t i = 0; i < size; i++) { + if (mp_typeof(**pos) != MP_UINT) { + /* Invalid key type - ignore. */ + mp_next(pos); + mp_next(pos); + continue; + } + uint64_t key = mp_decode_uint(pos); + const char *key_name = iproto_key_name(key); + if (key < iproto_key_MAX && + mp_typeof(**pos) != iproto_key_type[key]) { + /* Bad value type - dump as is. 
*/ + goto dump; + } + switch (key) { + case IPROTO_REQUEST_TYPE: { + type = mp_decode_uint(pos); + lua_pushliteral(L, "type"); + const char *type_name = iproto_type_name(type); + if (type_name != NULL) + lua_pushstring(L, type_name); + else + luaL_pushuint64(L, type); + lua_settable(L, -3); + continue; + } + case IPROTO_FLAGS: { + /* We're only interested in the commit flag. */ + uint64_t flags = mp_decode_uint(pos); + if ((flags & IPROTO_FLAG_COMMIT) != 0) + is_commit = true; + continue; + } + case IPROTO_TSN: + /* + * TSN is encoded as diff so we dump it after we finish + * parsing the header. + */ + tsn = mp_decode_uint(pos); + has_tsn = true; + continue; + case IPROTO_LSN: + /* Remember LSN to calculate TSN later. */ + tmp = *pos; + lsn = mp_decode_uint(&tmp); + break; + default: + break; + } +dump: + if (key_name != NULL) + lbox_xlog_pushkey(L, key_name); + else + luaL_pushuint64(L, key); + luamp_decode(L, luaL_msgpack_default, pos); + lua_settable(L, -3); + } + /* The commit flag isn't set for single-statement transactions. */ + if (!has_tsn) + is_commit = true; + tsn = lsn - tsn; + /* Show TSN and commit flag only for multi-statement transactions. */ + if (tsn != lsn || !is_commit) { + lua_pushliteral(L, "tsn"); + luaL_pushuint64(L, tsn); + lua_settable(L, -3); + } + if (is_commit && tsn != lsn) { + lua_pushliteral(L, "commit"); + lua_pushboolean(L, true); + lua_settable(L, -3); + } + lua_settable(L, -3); /* HEADER */ + if (*pos < end && type != IPROTO_NOP) { + tmp = *pos; + if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP) + goto bad_row; + lua_pushliteral(L, "BODY"); + lbox_xlog_parse_body(L, type, pos); + lua_settable(L, -3); /* BODY */ + } + luaL_pushuint64(L, lsn); + lua_insert(L, -2); + return 2; +bad_row: + /* Silently assume EOF on bad row. 
*/ + lua_settop(L, top); return 0; } @@ -161,17 +273,17 @@ lbox_xlog_parser_iterate(struct lua_State *L) { struct xlog_cursor *cur = lbox_checkcursor(L, 1, "xlog:pairs()"); - struct xrow_header row; int rc = 0; /* skip all bad read requests */ while (true) { - rc = xlog_cursor_next_row(cur, &row); - if (rc == 0) - break; - if (rc < 0) { - struct error *e = diag_last_error(diag_get()); - if (e->type != &type_XlogError) - luaT_error(L); + const char **data; + const char *end; + rc = xlog_cursor_next_row_raw(cur, &data, &end); + assert(rc >= 0); + if (rc == 0) { + rc = lbox_xlog_parse_row(L, data, end); + if (rc > 0) + return rc; } while ((rc = xlog_cursor_next_tx(cur)) < 0) { struct error *e = diag_last_error(diag_get()); @@ -185,74 +297,7 @@ lbox_xlog_parser_iterate(struct lua_State *L) if (rc == 1) break; } - if (rc == 1) - return 0; /* EOF */ - assert(rc == 0); - - lua_pushinteger(L, row.lsn); - lua_createtable(L, 0, 8); - lua_pushstring(L, "HEADER"); - - lua_createtable(L, 0, 8); - lua_pushstring(L, "type"); - const char *typename = iproto_type_name(row.type); - if (typename != NULL) { - lua_pushstring(L, typename); - } else { - lua_pushnumber(L, row.type); /* unknown key */ - } - lua_settable(L, -3); /* type */ - if (row.sync != 0) { - lbox_xlog_pushkey(L, iproto_key_name(IPROTO_SYNC)); - lua_pushinteger(L, row.sync); - lua_settable(L, -3); /* sync */ - } - if (row.lsn != 0) { - lbox_xlog_pushkey(L, iproto_key_name(IPROTO_LSN)); - lua_pushinteger(L, row.lsn); - lua_settable(L, -3); /* lsn */ - } - if (row.replica_id != 0) { - lbox_xlog_pushkey(L, iproto_key_name(IPROTO_REPLICA_ID)); - lua_pushinteger(L, row.replica_id); - lua_settable(L, -3); /* replica_id */ - } - if (row.group_id != 0) { - lbox_xlog_pushkey(L, iproto_key_name(IPROTO_GROUP_ID)); - lua_pushinteger(L, row.group_id); - lua_settable(L, -3); /* group_id */ - } - if (row.tm != 0) { - lbox_xlog_pushkey(L, iproto_key_name(IPROTO_TIMESTAMP)); - lua_pushnumber(L, row.tm); - lua_settable(L, -3); /* 
timestamp */ - } - if (row.tsn != row.lsn || !row.is_commit) { - lua_pushstring(L, "tsn"); - lua_pushnumber(L, row.tsn); - lua_settable(L, -3); /* transaction identifier */ - } - if (row.is_commit && row.tsn != row.lsn) { - lua_pushstring(L, "commit"); - lua_pushboolean(L, true); - /* - * is_commit, set for last row in multi-statement - * transaction - */ - lua_settable(L, -3); - } - - lua_settable(L, -3); /* HEADER */ - - if (row.bodycnt > 0) { - assert(row.bodycnt == 1); - lua_pushstring(L, "BODY"); - lua_newtable(L); - lbox_xlog_parse_body(L, row.type, row.body[0].iov_base, - row.body[0].iov_len); - lua_settable(L, -3); /* BODY */ - } - return 2; + return 0; } /* }}} */ blob - fc088d20cdde9490dddfe973e4855a2ec951136c blob + 6efd210db9b8c27e7e324d9f0c1dfee6e1474fd5 --- src/box/xlog.c +++ src/box/xlog.c @@ -1864,7 +1864,18 @@ xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_curs ibuf_reset(&tx_cursor->rows); return -1; } + + return 0; +} +int +xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *tx_cursor, + const char ***data, const char **end) +{ + if (ibuf_used(&tx_cursor->rows) == 0) + return 1; + *data = (const char **)&tx_cursor->rows.rpos; + *end = (const char *)tx_cursor->rows.wpos; return 0; } @@ -1908,7 +1919,10 @@ xlog_cursor_next_tx(struct xlog_cursor *i) { int rc; assert(xlog_cursor_is_open(i)); - + if (i->state == XLOG_CURSOR_TX) { + i->state = XLOG_CURSOR_ACTIVE; + xlog_tx_cursor_destroy(&i->tx_cursor); + } /* load at least magic to check eof */ rc = xlog_cursor_ensure(i, sizeof(log_magic_t)); if (rc < 0) @@ -1961,15 +1975,20 @@ xlog_cursor_next_row(struct xlog_cursor *cursor, struc assert(xlog_cursor_is_open(cursor)); if (cursor->state != XLOG_CURSOR_TX) return 1; - int rc = xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow); - if (rc != 0) { - cursor->state = XLOG_CURSOR_ACTIVE; - xlog_tx_cursor_destroy(&cursor->tx_cursor); - } - return rc; + return xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow); } int +xlog_cursor_next_row_raw(struct xlog_cursor 
*cursor, + const char ***data, const char **end) +{ + assert(xlog_cursor_is_open(cursor)); + if (cursor->state != XLOG_CURSOR_TX) + return 1; + return xlog_tx_cursor_next_row_raw(&cursor->tx_cursor, data, end); +} + +int xlog_cursor_next(struct xlog_cursor *cursor, struct xrow_header *xrow, bool force_recovery) { blob - 1cc2934611a9a4cdeb9fd2e5730b613b888a1959 blob + fbfcbd838dd142adce893e661c7c4b95be348e9f --- src/box/xlog.h +++ src/box/xlog.h @@ -595,12 +595,31 @@ xlog_tx_cursor_destroy(struct xlog_tx_cursor *tx_curso * Fetch next xrow from xlog tx cursor * * @retval 0 for Ok + * @retval 1 if current tx is done * @retval -1 for error */ int xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_cursor, struct xrow_header *xrow); /** + * Fetch next xrow from current xlog tx cursor. + * + * This function is similar to xlog_tx_cursor_next_row() except it doesn't + * parse the xrow nor does it advance the data pointer. + * + * @param cursor cursor + * @param[out] data pointer to the position in the internal buffer where + * the next xrow is stored + * @param[out] end end of the buffer + * + * @retval 0 for Ok + * @retval 1 if current tx is done + */ +int +xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *cursor, + const char ***data, const char **end); + +/** * Return current tx cursor position * * @param tx_cursor tx_cursor @@ -758,6 +777,24 @@ int xlog_cursor_next_row(struct xlog_cursor *cursor, struct xrow_header *xrow); /** + * Fetch next xrow from current xlog tx. + * + * This function is similar to xlog_cursor_next_row() except it doesn't + * parse the xrow nor does it advance the data pointer. 
+ * + * @param cursor cursor + * @param[out] data pointer to the position in the internal buffer where + * the next xrow is stored + * @param[out] end end of the buffer + * + * @retval 0 for Ok + * @retval 1 if current tx is done + */ +int +xlog_cursor_next_row_raw(struct xlog_cursor *cursor, + const char ***data, const char **end); + +/** * Fetch next row from cursor, ignores xlog tx boundary, * open a next one tx if current is done. * blob - 65469b7355198a35e22845d98ee84dc7fa703186 blob + 140dc7b365b0e06e287773553086811bfb255770 --- src/box/xrow.c +++ src/box/xrow.c @@ -280,9 +280,30 @@ xrow_header_encode(const struct xrow_header *header, u /* Header */ char *d = data + 1; /* Skip 1 byte for MP_MAP */ int map_size = 0; + + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, { + d = mp_encode_bool(d, true); + d = mp_encode_uint(d, 1); + map_size++; + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, { + d = mp_encode_uint(d, IPROTO_KEY); + d = mp_encode_uint(d, 1); + map_size++; + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, { + d = mp_encode_uint(d, 666); + d = mp_encode_uint(d, 1); + map_size++; + }); + + uint32_t type = header->type; + ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, { + type = 777; + }); if (true) { d = mp_encode_uint(d, IPROTO_REQUEST_TYPE); - d = mp_encode_uint(d, header->type); + d = mp_encode_uint(d, type); map_size++; } @@ -359,6 +380,12 @@ xrow_header_encode(const struct xrow_header *header, u } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_HEADER, { + mp_encode_array(data, 0); + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, { + *data = 0xc1; + }); out->iov_len = d - (char *) out->iov_base; out++; @@ -1091,6 +1118,21 @@ xrow_encode_dml(const struct request *request, struct char *begin = xregion_alloc(region, len); char *pos = begin + 1; /* skip 1 byte for MP_MAP */ int map_size = 0; + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, { + pos = mp_encode_bool(pos, true); + 
pos = mp_encode_uint(pos, 2); + map_size++; + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, { + pos = mp_encode_uint(pos, IPROTO_KEY); + pos = mp_encode_uint(pos, 2); + map_size++; + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, { + pos = mp_encode_uint(pos, 666); + pos = mp_encode_uint(pos, 2); + map_size++; + }); if (request->space_id) { pos = mp_encode_uint(pos, IPROTO_SPACE_ID); pos = mp_encode_uint(pos, request->space_id); @@ -1150,6 +1192,12 @@ xrow_encode_dml(const struct request *request, struct assert(pos <= begin + len); mp_encode_map(begin, map_size); + ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_BODY, { + mp_encode_array(begin, 0); + }); + ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, { + *begin = 0xc1; + }); iov[0].iov_base = begin; iov[0].iov_len = pos - begin; *iovcnt = 1; blob - f54174b9889dbaf7aa6e817b7017a61427fe9f28 blob + 6921bfe50126fed47156739ef3028d95bb2b4c1e --- src/lib/core/errinj.h +++ src/lib/core/errinj.h @@ -187,6 +187,14 @@ struct errinj { _(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \ _(ERRINJ_XLOG_RENAME_DELAY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_INVALID_BODY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_INVALID_HEADER, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_INVALID_KEY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_INVALID_VALUE, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, ERRINJ_BOOL, {.bparam = false}) \ ENUM0(errinj_id, ERRINJ_LIST); extern struct errinj errinjs[]; blob - f0bab0577999f8b0f8c1b43e7caff68902ecd9f8 blob + 4335468707e5d88808860b67a0c0b99b0b41faa5 --- test/box/errinj.result +++ test/box/errinj.result @@ -161,6 +161,14 @@ evals - ERRINJ_XLOG_META: false - 
ERRINJ_XLOG_READ: -1 - ERRINJ_XLOG_RENAME_DELAY: false + - ERRINJ_XLOG_WRITE_CORRUPTED_BODY: false + - ERRINJ_XLOG_WRITE_CORRUPTED_HEADER: false + - ERRINJ_XLOG_WRITE_INVALID_BODY: false + - ERRINJ_XLOG_WRITE_INVALID_HEADER: false + - ERRINJ_XLOG_WRITE_INVALID_KEY: false + - ERRINJ_XLOG_WRITE_INVALID_VALUE: false + - ERRINJ_XLOG_WRITE_UNKNOWN_KEY: false + - ERRINJ_XLOG_WRITE_UNKNOWN_TYPE: false ... errinj.set("some-injection", true) --- blob - /dev/null blob + bc6ea35322b9289febc8500e0ba83541eb0bda41 (mode 644) --- /dev/null +++ test/box-luatest/xlog_reader_test.lua @@ -0,0 +1,152 @@ +local fio = require('fio') +local server = require('luatest.server') +local t = require('luatest') +local xlog = require('xlog') + +local g = t.group() + +g.before_all(function(cg) + cg.server = server:new() + cg.server:start() +end) + +g.after_all(function(cg) + cg.server:drop() +end) + +g.after_test('test_bad_xlog', function(cg) + cg.server:exec(function() + if box.space.test then + box.space.test:drop() + end + box.snapshot() + end) +end) + +g.test_bad_xlog = function(cg) + t.tarantool.skip_if_not_debug() + local info = cg.server:exec(function() + local s = box.schema.create_space('test') + s:create_index('primary', {parts = {1, 'string'}}) + box.snapshot() + local lsn = box.info.lsn + local function test(errinj) + box.error.injection.set(errinj, true) + local ok = pcall(s.insert, s, {errinj}) + box.error.injection.set(errinj, false) + t.assert(ok) + end + box.space.test:insert({'BEGIN'}) + test('ERRINJ_XLOG_WRITE_CORRUPTED_HEADER') + test('ERRINJ_XLOG_WRITE_INVALID_HEADER') + test('ERRINJ_XLOG_WRITE_CORRUPTED_BODY') + test('ERRINJ_XLOG_WRITE_INVALID_BODY') + test('ERRINJ_XLOG_WRITE_INVALID_KEY') + test('ERRINJ_XLOG_WRITE_INVALID_VALUE') + test('ERRINJ_XLOG_WRITE_UNKNOWN_KEY') + test('ERRINJ_XLOG_WRITE_UNKNOWN_TYPE') + box.space.test:insert({'END'}) + return { + replica_id = box.info.id, + lsn = lsn, + space_id = box.space.test.id + } + end) + local path = 
fio.pathjoin(cg.server.workdir, + string.format('%020d.xlog', info.lsn)) + local result = {} + for _, row in xlog.pairs(path) do + if type(row.HEADER) == 'table' then + if type(row.HEADER.timestamp) == 'number' then + row.HEADER.timestamp = '' + end + end + table.insert(result, row) + end + t.assert_equals(result, { + { + HEADER = { + type = 'INSERT', + replica_id = info.replica_id, + lsn = info.lsn + 1, + timestamp = '', + }, + BODY = { + space_id = info.space_id, + tuple = {'BEGIN'}, + }, + }, + -- Row with a corrupted header is skipped. + -- Row with an invalid header is skipped. + -- Row with a corrupted body is skipped. + -- Row with an invalid body is skipped. + { + -- Invalid keys are ignored. + HEADER = { + type = 'INSERT', + replica_id = info.replica_id, + lsn = info.lsn + 6, + timestamp = '', + }, + BODY = { + space_id = info.space_id, + tuple = {'ERRINJ_XLOG_WRITE_INVALID_KEY'}, + }, + }, + { + -- Invalid values are dumped as is. + HEADER = { + type = 'INSERT', + replica_id = info.replica_id, + lsn = info.lsn + 7, + timestamp = '', + key = 1, + }, + BODY = { + space_id = info.space_id, + tuple = {'ERRINJ_XLOG_WRITE_INVALID_VALUE'}, + key = 2, + }, + }, + { + -- Unknown keys are dumped as is. + HEADER = { + type = 'INSERT', + replica_id = info.replica_id, + lsn = info.lsn + 8, + timestamp = '', + [666] = 1, + }, + BODY = { + space_id = info.space_id, + tuple = {'ERRINJ_XLOG_WRITE_UNKNOWN_KEY'}, + [666] = 2, + }, + }, + { + -- Unknown type is dumped as is. + HEADER = { + type = 777, + replica_id = info.replica_id, + lsn = info.lsn + 9, + timestamp = '', + }, + BODY = { + [box.iproto.key.SPACE_ID] = info.space_id, + [box.iproto.key.TUPLE] = {'ERRINJ_XLOG_WRITE_UNKNOWN_TYPE'}, + }, + }, + { + HEADER = { + type = 'INSERT', + replica_id = info.replica_id, + lsn = info.lsn + 10, + timestamp = '', + }, + BODY = { + space_id = info.space_id, + tuple = {'END'}, + }, + }, + }) +end