commit - f886d9069653e47f83d7164e802326cf99cc9e26
commit + 803cfffc3f36bd570fcd836d87ef179b486f526c
blob - /dev/null
blob + 36699bd478ce44d567365f0148a043f69d65c7f7 (mode 644)
--- /dev/null
+++ changelogs/unreleased/xlog-reader-show-unknown-header-fields.md
+## bugfix/lua
+
+* Fixed the xlog reader Lua module to show unknown row header fields. Before
+ this change the xlog reader silently skipped them.
blob - 971a26afedf9aa4c7a86009797b09f6f15564544
blob + 493f98b8f9e9c96b7a16a8a0bc8d8600e6e2626a
--- src/box/lua/xlog.c
+++ src/box/lua/xlog.c
#include <box/error.h>
#include <box/xlog.h>
-#include <box/xrow.h>
#include <box/iproto_constants.h>
#include <box/tuple.h>
#include <box/lua/tuple.h>
luaL_pushresult(&b);
}
+/**
+ * Helper function for lbox_xlog_parse_body that parses one key value pair
+ * and adds it to the result table. The MsgPack data must be checked.
+ */
static void
-lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg, const char *end)
+lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg)
{
- if (mp_typeof(**beg) != MP_UINT)
- luaL_error(L, "Broken type of body key");
+ if (mp_typeof(**beg) != MP_UINT) {
+ /* Invalid key type - ignore. */
+ mp_next(beg);
+ mp_next(beg);
+ return;
+ }
uint32_t v = mp_decode_uint(beg);
if (iproto_type_is_dml(type) && iproto_key_name(v)) {
lbox_xlog_pushkey(L, iproto_key_name(v));
/*
* Push Lua objects
*/
- const char *tmp = *beg;
- if (mp_check(&tmp, end) != 0) {
- lua_pushstring(L, "<invalid msgpack>");
- } else {
- luamp_decode(L, luaL_msgpack_default, beg);
- }
+ luamp_decode(L, luaL_msgpack_default, beg);
}
lua_settable(L, -3);
}
-static int
-lbox_xlog_parse_body(struct lua_State *L, int type, const char *ptr, size_t len)
+/**
+ * Parses a request body and pushes it to the Lua stack.
+ * The MsgPack data must be checked and represent a map.
+ */
+static void
+lbox_xlog_parse_body(struct lua_State *L, int type, const char **beg)
{
- const char **beg = &ptr;
- const char *end = ptr + len;
- if (mp_typeof(**beg) != MP_MAP)
- return -1;
+ lua_newtable(L);
uint32_t size = mp_decode_map(beg);
- uint32_t i;
- for (i = 0; i < size && *beg < end; i++)
- lbox_xlog_parse_body_kv(L, type, beg, end);
- if (i != size)
- say_warn("warning: decoded %u values from"
- " MP_MAP, %u expected", i, size);
+ for (uint32_t i = 0; i < size; i++)
+ lbox_xlog_parse_body_kv(L, type, beg);
+}
+
+/**
+ * Parses a row and pushes it along with its LSN to the Lua stack.
+ * On success returns the number of values pushed (> 0). On EOF returns 0.
+ */
+static int
+lbox_xlog_parse_row(struct lua_State *L, const char **pos, const char *end)
+{
+ int top = lua_gettop(L);
+ const char *tmp = *pos;
+ if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP)
+ goto bad_row;
+ /*
+ * Sic: The nrec argument of lua_createtable is chosen so that the
+ * output looks pretty when encoded in YAML.
+ */
+ lua_createtable(L, 0, 8);
+ lua_pushliteral(L, "HEADER");
+ lua_createtable(L, 0, 8);
+ uint64_t type = 0;
+ uint64_t tsn = 0;
+ uint64_t lsn = 0;
+ bool has_tsn = false;
+ bool is_commit = false;
+ uint32_t size = mp_decode_map(pos);
+ for (uint32_t i = 0; i < size; i++) {
+ if (mp_typeof(**pos) != MP_UINT) {
+ /* Invalid key type - ignore. */
+ mp_next(pos);
+ mp_next(pos);
+ continue;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ const char *key_name = iproto_key_name(key);
+ if (key < IPROTO_KEY_MAX &&
+ mp_typeof(**pos) != iproto_key_type[key]) {
+ /* Bad value type - dump as is. */
+ goto dump;
+ }
+ switch (key) {
+ case IPROTO_REQUEST_TYPE: {
+ type = mp_decode_uint(pos);
+ lua_pushliteral(L, "type");
+ const char *type_name = iproto_type_name(type);
+ if (type_name != NULL)
+ lua_pushstring(L, type_name);
+ else
+ luaL_pushuint64(L, type);
+ lua_settable(L, -3);
+ continue;
+ }
+ case IPROTO_FLAGS: {
+ /* We're only interested in the commit flag. */
+ uint64_t flags = mp_decode_uint(pos);
+ if ((flags & IPROTO_FLAG_COMMIT) != 0)
+ is_commit = true;
+ continue;
+ }
+ case IPROTO_TSN:
+ /*
+ * TSN is encoded as diff so we dump it after we finish
+ * parsing the header.
+ */
+ tsn = mp_decode_uint(pos);
+ has_tsn = true;
+ continue;
+ case IPROTO_LSN:
+ /* Remember LSN to calculate TSN later. */
+ tmp = *pos;
+ lsn = mp_decode_uint(&tmp);
+ break;
+ default:
+ break;
+ }
+dump:
+ if (key_name != NULL)
+ lbox_xlog_pushkey(L, key_name);
+ else
+ luaL_pushuint64(L, key);
+ luamp_decode(L, luaL_msgpack_default, pos);
+ lua_settable(L, -3);
+ }
+ /* The commit flag isn't set for single-statement transactions. */
+ if (!has_tsn)
+ is_commit = true;
+ tsn = lsn - tsn;
+ /* Show TSN and commit flag only for multi-statement transactions. */
+ if (tsn != lsn || !is_commit) {
+ lua_pushliteral(L, "tsn");
+ luaL_pushuint64(L, tsn);
+ lua_settable(L, -3);
+ }
+ if (is_commit && tsn != lsn) {
+ lua_pushliteral(L, "commit");
+ lua_pushboolean(L, true);
+ lua_settable(L, -3);
+ }
+ lua_settable(L, -3); /* HEADER */
+ if (*pos < end && type != IPROTO_NOP) {
+ tmp = *pos;
+ if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP)
+ goto bad_row;
+ lua_pushliteral(L, "BODY");
+ lbox_xlog_parse_body(L, type, pos);
+ lua_settable(L, -3); /* BODY */
+ }
+ luaL_pushuint64(L, lsn);
+ lua_insert(L, -2);
+ return 2;
+bad_row:
+ /* Silently assume EOF on bad row. */
+ lua_settop(L, top);
return 0;
}
{
struct xlog_cursor *cur = lbox_checkcursor(L, 1, "xlog:pairs()");
- struct xrow_header row;
int rc = 0;
/* skip all bad read requests */
while (true) {
- rc = xlog_cursor_next_row(cur, &row);
- if (rc == 0)
- break;
- if (rc < 0) {
- struct error *e = diag_last_error(diag_get());
- if (e->type != &type_XlogError)
- luaT_error(L);
+ const char **data;
+ const char *end;
+ rc = xlog_cursor_next_row_raw(cur, &data, &end);
+ assert(rc >= 0);
+ if (rc == 0) {
+ rc = lbox_xlog_parse_row(L, data, end);
+ if (rc > 0)
+ return rc;
}
while ((rc = xlog_cursor_next_tx(cur)) < 0) {
struct error *e = diag_last_error(diag_get());
if (rc == 1)
break;
}
- if (rc == 1)
- return 0; /* EOF */
- assert(rc == 0);
-
- lua_pushinteger(L, row.lsn);
- lua_createtable(L, 0, 8);
- lua_pushstring(L, "HEADER");
-
- lua_createtable(L, 0, 8);
- lua_pushstring(L, iproto_key_name(IPROTO_REQUEST_TYPE));
- const char *typename = iproto_type_name(row.type);
- if (typename != NULL) {
- lua_pushstring(L, typename);
- } else {
- lua_pushnumber(L, row.type); /* unknown key */
- }
- lua_settable(L, -3); /* type */
- if (row.sync != 0) {
- lbox_xlog_pushkey(L, iproto_key_name(IPROTO_SYNC));
- lua_pushinteger(L, row.sync);
- lua_settable(L, -3); /* sync */
- }
- if (row.lsn != 0) {
- lbox_xlog_pushkey(L, iproto_key_name(IPROTO_LSN));
- lua_pushinteger(L, row.lsn);
- lua_settable(L, -3); /* lsn */
- }
- if (row.replica_id != 0) {
- lbox_xlog_pushkey(L, iproto_key_name(IPROTO_REPLICA_ID));
- lua_pushinteger(L, row.replica_id);
- lua_settable(L, -3); /* replica_id */
- }
- if (row.group_id != 0) {
- lbox_xlog_pushkey(L, iproto_key_name(IPROTO_GROUP_ID));
- lua_pushinteger(L, row.group_id);
- lua_settable(L, -3); /* group_id */
- }
- if (row.tm != 0) {
- lbox_xlog_pushkey(L, iproto_key_name(IPROTO_TIMESTAMP));
- lua_pushnumber(L, row.tm);
- lua_settable(L, -3); /* timestamp */
- }
- if (row.tsn != row.lsn || !row.is_commit) {
- lua_pushstring(L, "tsn");
- lua_pushnumber(L, row.tsn);
- lua_settable(L, -3); /* transaction identifier */
- }
- if (row.is_commit && row.tsn != row.lsn) {
- lua_pushstring(L, "commit");
- lua_pushboolean(L, true);
- /*
- * is_commit, set for last row in multi-statement
- * transaction
- */
- lua_settable(L, -3);
- }
-
- lua_settable(L, -3); /* HEADER */
-
- if (row.bodycnt > 0) {
- assert(row.bodycnt == 1);
- lua_pushstring(L, "BODY");
- lua_newtable(L);
- lbox_xlog_parse_body(L, row.type, row.body[0].iov_base,
- row.body[0].iov_len);
- lua_settable(L, -3); /* BODY */
- }
- return 2;
+ return 0;
}
/* }}} */
blob - 08ce2c76360cdf7c9fddec74a02b1ee01f6fd7dc
blob + e70688e37b0eff6d0f09a9122095a7a1d1d5d3fe
--- src/box/xlog.c
+++ src/box/xlog.c
ibuf_reset(&tx_cursor->rows);
return -1;
}
+
+ return 0;
+}
+int
+xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *tx_cursor,
+ const char ***data, const char **end)
+{
+ if (ibuf_used(&tx_cursor->rows) == 0)
+ return 1;
+ *data = (const char **)&tx_cursor->rows.rpos;
+ *end = (const char *)tx_cursor->rows.wpos;
return 0;
}
{
int rc;
assert(xlog_cursor_is_open(i));
-
+ if (i->state == XLOG_CURSOR_TX) {
+ i->state = XLOG_CURSOR_ACTIVE;
+ xlog_tx_cursor_destroy(&i->tx_cursor);
+ }
/* load at least magic to check eof */
rc = xlog_cursor_ensure(i, sizeof(log_magic_t));
if (rc < 0)
assert(xlog_cursor_is_open(cursor));
if (cursor->state != XLOG_CURSOR_TX)
return 1;
- int rc = xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow);
- if (rc != 0) {
- cursor->state = XLOG_CURSOR_ACTIVE;
- xlog_tx_cursor_destroy(&cursor->tx_cursor);
- }
- return rc;
+ return xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow);
}
int
+xlog_cursor_next_row_raw(struct xlog_cursor *cursor,
+ const char ***data, const char **end)
+{
+ assert(xlog_cursor_is_open(cursor));
+ if (cursor->state != XLOG_CURSOR_TX)
+ return 1;
+ return xlog_tx_cursor_next_row_raw(&cursor->tx_cursor, data, end);
+}
+
+int
xlog_cursor_next(struct xlog_cursor *cursor,
struct xrow_header *xrow, bool force_recovery)
{
blob - 1d7f564aba755de68c554483118be1d57aef19d0
blob + 21e644d1585f793ddd9f4f10b2df02904bab01e3
--- src/box/xlog.h
+++ src/box/xlog.h
* Fetch next xrow from xlog tx cursor
*
* @retval 0 for Ok
+ * @retval 1 if current tx is done
* @retval -1 for error
*/
int
xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_cursor, struct xrow_header *xrow);
/**
+ * Fetch next xrow from current xlog tx cursor.
+ *
+ * This function is similar to xlog_tx_cursor_next_row() except it doesn't
+ * parse the xrow nor does it advance the data pointer.
+ *
+ * @param cursor cursor
+ * @param[out] data pointer to the position in the internal buffer where
+ * the next xrow is stored
+ * @param[out] end end of the buffer
+ *
+ * @retval 0 for Ok
+ * @retval 1 if current tx is done
+ */
+int
+xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *cursor,
+ const char ***data, const char **end);
+
+/**
* Return current tx cursor position
*
* @param tx_cursor tx_cursor
xlog_cursor_next_row(struct xlog_cursor *cursor, struct xrow_header *xrow);
/**
+ * Fetch next xrow from current xlog tx.
+ *
+ * This function is similar to xlog_cursor_next_row() except it doesn't
+ * parse the xrow nor does it advance the data pointer.
+ *
+ * @param cursor cursor
+ * @param[out] data pointer to the position in the internal buffer where
+ * the next xrow is stored
+ * @param[out] end end of the buffer
+ *
+ * @retval 0 for Ok
+ * @retval 1 if current tx is done
+ */
+int
+xlog_cursor_next_row_raw(struct xlog_cursor *cursor,
+ const char ***data, const char **end);
+
+/**
* Fetch next row from cursor, ignores xlog tx boundary,
* open a next one tx if current is done.
*
blob - 87fbe72e09cfefeb4fe4e22d8a415eeb0110288b
blob + 3bfb01d5a133cdfc7f2fae8aa7787986f342fc33
--- src/box/xrow.c
+++ src/box/xrow.c
/* Header */
char *d = data + 1; /* Skip 1 byte for MP_MAP */
int map_size = 0;
+
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, {
+ d = mp_encode_bool(d, true);
+ d = mp_encode_uint(d, 1);
+ map_size++;
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, {
+ d = mp_encode_uint(d, IPROTO_KEY);
+ d = mp_encode_uint(d, 1);
+ map_size++;
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, {
+ d = mp_encode_uint(d, 666);
+ d = mp_encode_uint(d, 1);
+ map_size++;
+ });
+
+ uint32_t type = header->type;
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, {
+ type = 777;
+ });
if (true) {
d = mp_encode_uint(d, IPROTO_REQUEST_TYPE);
- d = mp_encode_uint(d, header->type);
+ d = mp_encode_uint(d, type);
map_size++;
}
}
assert(d <= data + XROW_HEADER_LEN_MAX);
mp_encode_map(data, map_size);
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_HEADER, {
+ mp_encode_array(data, 0);
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, {
+ *data = 0xc1;
+ });
out->iov_len = d - (char *) out->iov_base;
out++;
}
char *pos = begin + 1; /* skip 1 byte for MP_MAP */
int map_size = 0;
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, {
+ pos = mp_encode_bool(pos, true);
+ pos = mp_encode_uint(pos, 2);
+ map_size++;
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, {
+ pos = mp_encode_uint(pos, IPROTO_KEY);
+ pos = mp_encode_uint(pos, 2);
+ map_size++;
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, {
+ pos = mp_encode_uint(pos, 666);
+ pos = mp_encode_uint(pos, 2);
+ map_size++;
+ });
if (request->space_id) {
pos = mp_encode_uint(pos, IPROTO_SPACE_ID);
pos = mp_encode_uint(pos, request->space_id);
assert(pos <= begin + len);
mp_encode_map(begin, map_size);
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_BODY, {
+ mp_encode_array(begin, 0);
+ });
+ ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, {
+ *begin = 0xc1;
+ });
iov[0].iov_base = begin;
iov[0].iov_len = pos - begin;
blob - 6c7d2736e3ac15cccb8d09be3906de223e870f67
blob + 1c177a42f76989f012a4c20bfb371f6a2c7ce1a3
--- src/lib/core/errinj.h
+++ src/lib/core/errinj.h
_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_XLOG_RENAME_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_INVALID_BODY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_INVALID_HEADER, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_INVALID_KEY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_INVALID_VALUE, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, ERRINJ_BOOL, {.bparam = false}) \
ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
blob - 8dccaf6d04e63b42c7be309b7cbd74baadfa659b
blob + bdb99a438f1a0ef0b802cfc4781617674b896d02
--- test/box/errinj.result
+++ test/box/errinj.result
- ERRINJ_XLOG_META: false
- ERRINJ_XLOG_READ: -1
- ERRINJ_XLOG_RENAME_DELAY: false
+ - ERRINJ_XLOG_WRITE_CORRUPTED_BODY: false
+ - ERRINJ_XLOG_WRITE_CORRUPTED_HEADER: false
+ - ERRINJ_XLOG_WRITE_INVALID_BODY: false
+ - ERRINJ_XLOG_WRITE_INVALID_HEADER: false
+ - ERRINJ_XLOG_WRITE_INVALID_KEY: false
+ - ERRINJ_XLOG_WRITE_INVALID_VALUE: false
+ - ERRINJ_XLOG_WRITE_UNKNOWN_KEY: false
+ - ERRINJ_XLOG_WRITE_UNKNOWN_TYPE: false
...
errinj.set("some-injection", true)
---
blob - /dev/null
blob + fcf0d61ce976360de1a3d319f0a113991fd9f603 (mode 644)
--- /dev/null
+++ test/box-luatest/xlog_reader_test.lua
+local fio = require('fio')
+local server = require('luatest.server')
+local t = require('luatest')
+local xlog = require('xlog')
+
+local g = t.group()
+
+local IPROTO_SPACE_ID = 16
+local IPROTO_TUPLE = 33
+
+g.before_all(function(cg)
+ cg.server = server:new()
+ cg.server:start()
+end)
+
+g.after_all(function(cg)
+ cg.server:drop()
+end)
+
+g.after_test('test_bad_xlog', function(cg)
+ cg.server:exec(function()
+ if box.space.test then
+ box.space.test:drop()
+ end
+ box.snapshot()
+ end)
+end)
+
+g.test_bad_xlog = function(cg)
+ t.tarantool.skip_if_not_debug()
+ local info = cg.server:exec(function()
+ local s = box.schema.create_space('test')
+ s:create_index('primary', {parts = {1, 'string'}})
+ box.snapshot()
+ local lsn = box.info.lsn
+ local function test(errinj)
+ box.error.injection.set(errinj, true)
+ local ok = pcall(s.insert, s, {errinj})
+ box.error.injection.set(errinj, false)
+ t.assert(ok)
+ end
+ box.space.test:insert({'BEGIN'})
+ test('ERRINJ_XLOG_WRITE_CORRUPTED_HEADER')
+ test('ERRINJ_XLOG_WRITE_INVALID_HEADER')
+ test('ERRINJ_XLOG_WRITE_CORRUPTED_BODY')
+ test('ERRINJ_XLOG_WRITE_INVALID_BODY')
+ test('ERRINJ_XLOG_WRITE_INVALID_KEY')
+ test('ERRINJ_XLOG_WRITE_INVALID_VALUE')
+ test('ERRINJ_XLOG_WRITE_UNKNOWN_KEY')
+ test('ERRINJ_XLOG_WRITE_UNKNOWN_TYPE')
+ box.space.test:insert({'END'})
+ return {
+ replica_id = box.info.id,
+ lsn = lsn,
+ space_id = box.space.test.id
+ }
+ end)
+ local path = fio.pathjoin(cg.server.workdir,
+ string.format('%020d.xlog', info.lsn))
+ local result = {}
+ for _, row in xlog.pairs(path) do
+ if type(row.HEADER) == 'table' then
+ if type(row.HEADER.timestamp) == 'number' then
+ row.HEADER.timestamp = '<timestamp>'
+ end
+ end
+ table.insert(result, row)
+ end
+ t.assert_equals(result, {
+ {
+ HEADER = {
+ type = 'INSERT',
+ replica_id = info.replica_id,
+ lsn = info.lsn + 1,
+ timestamp = '<timestamp>',
+ },
+ BODY = {
+ space_id = info.space_id,
+ tuple = {'BEGIN'},
+ },
+ },
+ -- Row with a corrupted header is skipped.
+ -- Row with an invalid header is skipped.
+ -- Row with a corrupted body is skipped.
+ -- Row with an invalid body is skipped.
+ {
+ -- Invalid keys are ignored.
+ HEADER = {
+ type = 'INSERT',
+ replica_id = info.replica_id,
+ lsn = info.lsn + 6,
+ timestamp = '<timestamp>',
+ },
+ BODY = {
+ space_id = info.space_id,
+ tuple = {'ERRINJ_XLOG_WRITE_INVALID_KEY'},
+ },
+ },
+ {
+ -- Invalid values are dumped as is.
+ HEADER = {
+ type = 'INSERT',
+ replica_id = info.replica_id,
+ lsn = info.lsn + 7,
+ timestamp = '<timestamp>',
+ key = 1,
+ },
+ BODY = {
+ space_id = info.space_id,
+ tuple = {'ERRINJ_XLOG_WRITE_INVALID_VALUE'},
+ key = 2,
+ },
+ },
+ {
+ -- Unknown keys are dumped as is.
+ HEADER = {
+ type = 'INSERT',
+ replica_id = info.replica_id,
+ lsn = info.lsn + 8,
+ timestamp = '<timestamp>',
+ [666] = 1,
+ },
+ BODY = {
+ space_id = info.space_id,
+ tuple = {'ERRINJ_XLOG_WRITE_UNKNOWN_KEY'},
+ [666] = 2,
+ },
+ },
+ {
+ -- Unknown type is dumped as is.
+ HEADER = {
+ type = 777,
+ replica_id = info.replica_id,
+ lsn = info.lsn + 9,
+ timestamp = '<timestamp>',
+ },
+ BODY = {
+ [IPROTO_SPACE_ID] = info.space_id,
+ [IPROTO_TUPLE] = {'ERRINJ_XLOG_WRITE_UNKNOWN_TYPE'},
+ },
+ },
+ {
+ HEADER = {
+ type = 'INSERT',
+ replica_id = info.replica_id,
+ lsn = info.lsn + 10,
+ timestamp = '<timestamp>',
+ },
+ BODY = {
+ space_id = info.space_id,
+ tuple = {'END'},
+ },
+ },
+ })
+end