Commit Diff


commit - f886d9069653e47f83d7164e802326cf99cc9e26
commit + 803cfffc3f36bd570fcd836d87ef179b486f526c
blob - /dev/null
blob + 36699bd478ce44d567365f0148a043f69d65c7f7 (mode 644)
--- /dev/null
+++ changelogs/unreleased/xlog-reader-show-unknown-header-fields.md
@@ -0,0 +1,4 @@
+## bugfix/lua
+
+* Fixed the xlog reader Lua module to show unknown row header fields. Before
+  this change the xlog reader silently skipped them.
blob - 971a26afedf9aa4c7a86009797b09f6f15564544
blob + 493f98b8f9e9c96b7a16a8a0bc8d8600e6e2626a
--- src/box/lua/xlog.c
+++ src/box/lua/xlog.c
@@ -37,7 +37,6 @@
 
 #include <box/error.h>
 #include <box/xlog.h>
-#include <box/xrow.h>
 #include <box/iproto_constants.h>
 #include <box/tuple.h>
 #include <box/lua/tuple.h>
@@ -89,11 +88,19 @@ lbox_xlog_pushkey(lua_State *L, const char *key)
 	luaL_pushresult(&b);
 }
 
+/**
+ * Helper function for lbox_xlog_parse_body that parses one key value pair
+ * and adds it to the result table. The MsgPack data must be checked.
+ */
 static void
-lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg, const char *end)
+lbox_xlog_parse_body_kv(struct lua_State *L, int type, const char **beg)
 {
-	if (mp_typeof(**beg) != MP_UINT)
-		luaL_error(L, "Broken type of body key");
+	if (mp_typeof(**beg) != MP_UINT) {
+		/* Invalid key type - ignore. */
+		mp_next(beg);
+		mp_next(beg);
+		return;
+	}
 	uint32_t v = mp_decode_uint(beg);
 	if (iproto_type_is_dml(type) && iproto_key_name(v)) {
 		lbox_xlog_pushkey(L, iproto_key_name(v));
@@ -123,30 +130,135 @@ lbox_xlog_parse_body_kv(struct lua_State *L, int type,
 		/*
 		 * Push Lua objects
 		 */
-		const char *tmp = *beg;
-		if (mp_check(&tmp, end) != 0) {
-			lua_pushstring(L, "<invalid msgpack>");
-		} else {
-			luamp_decode(L, luaL_msgpack_default, beg);
-		}
+		luamp_decode(L, luaL_msgpack_default, beg);
 	}
 	lua_settable(L, -3);
 }
 
-static int
-lbox_xlog_parse_body(struct lua_State *L, int type, const char *ptr, size_t len)
+/**
+ * Parses a request body and pushes it to the Lua stack.
+ * The MsgPack data must be checked and represent a map.
+ */
+static void
+lbox_xlog_parse_body(struct lua_State *L, int type, const char **beg)
 {
-	const char **beg = &ptr;
-	const char *end = ptr + len;
-	if (mp_typeof(**beg) != MP_MAP)
-		return -1;
+	lua_newtable(L);
 	uint32_t size = mp_decode_map(beg);
-	uint32_t i;
-	for (i = 0; i < size && *beg < end; i++)
-		lbox_xlog_parse_body_kv(L, type, beg, end);
-	if (i != size)
-		say_warn("warning: decoded %u values from"
-			 " MP_MAP, %u expected", i, size);
+	for (uint32_t i = 0; i < size; i++)
+		lbox_xlog_parse_body_kv(L, type, beg);
+}
+
+/**
+ * Parses a row and pushes it along with its LSN to the Lua stack.
+ * On success returns the number of values pushed (> 0). On EOF returns 0.
+ */
+static int
+lbox_xlog_parse_row(struct lua_State *L, const char **pos, const char *end)
+{
+	int top = lua_gettop(L);
+	const char *tmp = *pos;
+	if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP)
+		goto bad_row;
+	/*
+	 * Sic: The nrec argument of lua_createtable is chosen so that the
+	 * output looks pretty when encoded in YAML.
+	 */
+	lua_createtable(L, 0, 8);
+	lua_pushliteral(L, "HEADER");
+	lua_createtable(L, 0, 8);
+	uint64_t type = 0;
+	uint64_t tsn = 0;
+	uint64_t lsn = 0;
+	bool has_tsn = false;
+	bool is_commit = false;
+	uint32_t size = mp_decode_map(pos);
+	for (uint32_t i = 0; i < size; i++) {
+		if (mp_typeof(**pos) != MP_UINT) {
+			/* Invalid key type - ignore. */
+			mp_next(pos);
+			mp_next(pos);
+			continue;
+		}
+		uint64_t key = mp_decode_uint(pos);
+		const char *key_name = iproto_key_name(key);
+		if (key < IPROTO_KEY_MAX &&
+		    mp_typeof(**pos) != iproto_key_type[key]) {
+			/* Bad value type - dump as is. */
+			goto dump;
+		}
+		switch (key) {
+		case IPROTO_REQUEST_TYPE: {
+			type = mp_decode_uint(pos);
+			lua_pushliteral(L, "type");
+			const char *type_name = iproto_type_name(type);
+			if (type_name != NULL)
+				lua_pushstring(L, type_name);
+			else
+				luaL_pushuint64(L, type);
+			lua_settable(L, -3);
+			continue;
+		}
+		case IPROTO_FLAGS: {
+			/* We're only interested in the commit flag. */
+			uint64_t flags = mp_decode_uint(pos);
+			if ((flags & IPROTO_FLAG_COMMIT) != 0)
+				is_commit = true;
+			continue;
+		}
+		case IPROTO_TSN:
+			/*
+			 * TSN is encoded as diff so we dump it after we finish
+			 * parsing the header.
+			 */
+			tsn = mp_decode_uint(pos);
+			has_tsn = true;
+			continue;
+		case IPROTO_LSN:
+			/* Remember LSN to calculate TSN later. */
+			tmp = *pos;
+			lsn = mp_decode_uint(&tmp);
+			break;
+		default:
+			break;
+		}
+dump:
+		if (key_name != NULL)
+			lbox_xlog_pushkey(L, key_name);
+		else
+			luaL_pushuint64(L, key);
+		luamp_decode(L, luaL_msgpack_default, pos);
+		lua_settable(L, -3);
+	}
+	/* The commit flag isn't set for single-statement transactions. */
+	if (!has_tsn)
+		is_commit = true;
+	tsn = lsn - tsn;
+	/* Show TSN and commit flag only for multi-statement transactions. */
+	if (tsn != lsn || !is_commit) {
+		lua_pushliteral(L, "tsn");
+		luaL_pushuint64(L, tsn);
+		lua_settable(L, -3);
+	}
+	if (is_commit && tsn != lsn) {
+		lua_pushliteral(L, "commit");
+		lua_pushboolean(L, true);
+		lua_settable(L, -3);
+	}
+	lua_settable(L, -3); /* HEADER */
+	if (*pos < end && type != IPROTO_NOP) {
+		tmp = *pos;
+		if (mp_check(&tmp, end) != 0 || mp_typeof(**pos) != MP_MAP)
+			goto bad_row;
+		lua_pushliteral(L, "BODY");
+		lbox_xlog_parse_body(L, type, pos);
+		lua_settable(L, -3); /* BODY */
+	}
+	luaL_pushuint64(L, lsn);
+	lua_insert(L, -2);
+	return 2;
+bad_row:
+	/* Silently assume EOF on bad row. */
+	lua_settop(L, top);
 	return 0;
 }
 
@@ -155,17 +267,17 @@ lbox_xlog_parser_iterate(struct lua_State *L)
 {
 	struct xlog_cursor *cur = lbox_checkcursor(L, 1, "xlog:pairs()");
 
-	struct xrow_header row;
 	int rc = 0;
 	/* skip all bad read requests */
 	while (true) {
-		rc = xlog_cursor_next_row(cur, &row);
-		if (rc == 0)
-			break;
-		if (rc < 0) {
-			struct error *e = diag_last_error(diag_get());
-			if (e->type != &type_XlogError)
-				luaT_error(L);
+		const char **data;
+		const char *end;
+		rc = xlog_cursor_next_row_raw(cur, &data, &end);
+		assert(rc >= 0);
+		if (rc == 0) {
+			rc = lbox_xlog_parse_row(L, data, end);
+			if (rc > 0)
+				return rc;
 		}
 		while ((rc = xlog_cursor_next_tx(cur)) < 0) {
 			struct error *e = diag_last_error(diag_get());
@@ -179,74 +291,7 @@ lbox_xlog_parser_iterate(struct lua_State *L)
 		if (rc == 1)
 			break;
 	}
-	if (rc == 1)
-		return 0; /* EOF */
-	assert(rc == 0);
-
-	lua_pushinteger(L, row.lsn);
-	lua_createtable(L, 0, 8);
-	lua_pushstring(L, "HEADER");
-
-	lua_createtable(L, 0, 8);
-	lua_pushstring(L, iproto_key_name(IPROTO_REQUEST_TYPE));
-	const char *typename = iproto_type_name(row.type);
-	if (typename != NULL) {
-		lua_pushstring(L, typename);
-	} else {
-		lua_pushnumber(L, row.type); /* unknown key */
-	}
-	lua_settable(L, -3); /* type */
-	if (row.sync != 0) {
-		lbox_xlog_pushkey(L, iproto_key_name(IPROTO_SYNC));
-		lua_pushinteger(L, row.sync);
-		lua_settable(L, -3); /* sync */
-	}
-	if (row.lsn != 0) {
-		lbox_xlog_pushkey(L, iproto_key_name(IPROTO_LSN));
-		lua_pushinteger(L, row.lsn);
-		lua_settable(L, -3); /* lsn */
-	}
-	if (row.replica_id != 0) {
-		lbox_xlog_pushkey(L, iproto_key_name(IPROTO_REPLICA_ID));
-		lua_pushinteger(L, row.replica_id);
-		lua_settable(L, -3); /* replica_id */
-	}
-	if (row.group_id != 0) {
-		lbox_xlog_pushkey(L, iproto_key_name(IPROTO_GROUP_ID));
-		lua_pushinteger(L, row.group_id);
-		lua_settable(L, -3); /* group_id */
-	}
-	if (row.tm != 0) {
-		lbox_xlog_pushkey(L, iproto_key_name(IPROTO_TIMESTAMP));
-		lua_pushnumber(L, row.tm);
-		lua_settable(L, -3); /* timestamp */
-	}
-	if (row.tsn != row.lsn || !row.is_commit) {
-		lua_pushstring(L, "tsn");
-		lua_pushnumber(L, row.tsn);
-		lua_settable(L, -3); /* transaction identifier */
-	}
-	if (row.is_commit && row.tsn != row.lsn) {
-		lua_pushstring(L, "commit");
-		lua_pushboolean(L, true);
-		/*
-		 * is_commit, set for last row in multi-statement
-		 * transaction
-		 */
-		lua_settable(L, -3);
-	}
-
-	lua_settable(L, -3); /* HEADER */
-
-	if (row.bodycnt > 0) {
-		assert(row.bodycnt == 1);
-		lua_pushstring(L, "BODY");
-		lua_newtable(L);
-		lbox_xlog_parse_body(L, row.type, row.body[0].iov_base,
-				     row.body[0].iov_len);
-		lua_settable(L, -3);  /* BODY */
-	}
-	return 2;
+	return 0;
 }
 
 /* }}} */
blob - 08ce2c76360cdf7c9fddec74a02b1ee01f6fd7dc
blob + e70688e37b0eff6d0f09a9122095a7a1d1d5d3fe
--- src/box/xlog.c
+++ src/box/xlog.c
@@ -1823,7 +1823,18 @@ xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_curs
 		ibuf_reset(&tx_cursor->rows);
 		return -1;
 	}
+
+	return 0;
+}
 
+int
+xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *tx_cursor,
+			    const char ***data, const char **end)
+{
+	if (ibuf_used(&tx_cursor->rows) == 0)
+		return 1;
+	*data = (const char **)&tx_cursor->rows.rpos;
+	*end = (const char *)tx_cursor->rows.wpos;
 	return 0;
 }
 
@@ -1867,7 +1878,10 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
 {
 	int rc;
 	assert(xlog_cursor_is_open(i));
-
+	if (i->state == XLOG_CURSOR_TX) {
+		i->state = XLOG_CURSOR_ACTIVE;
+		xlog_tx_cursor_destroy(&i->tx_cursor);
+	}
 	/* load at least magic to check eof */
 	rc = xlog_cursor_ensure(i, sizeof(log_magic_t));
 	if (rc < 0)
@@ -1920,15 +1934,20 @@ xlog_cursor_next_row(struct xlog_cursor *cursor, struc
 	assert(xlog_cursor_is_open(cursor));
 	if (cursor->state != XLOG_CURSOR_TX)
 		return 1;
-	int rc = xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow);
-	if (rc != 0) {
-		cursor->state = XLOG_CURSOR_ACTIVE;
-		xlog_tx_cursor_destroy(&cursor->tx_cursor);
-	}
-	return rc;
+	return xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow);
 }
 
 int
+xlog_cursor_next_row_raw(struct xlog_cursor *cursor,
+			 const char ***data, const char **end)
+{
+	assert(xlog_cursor_is_open(cursor));
+	if (cursor->state != XLOG_CURSOR_TX)
+		return 1;
+	return xlog_tx_cursor_next_row_raw(&cursor->tx_cursor, data, end);
+}
+
+int
 xlog_cursor_next(struct xlog_cursor *cursor,
 		 struct xrow_header *xrow, bool force_recovery)
 {
blob - 1d7f564aba755de68c554483118be1d57aef19d0
blob + 21e644d1585f793ddd9f4f10b2df02904bab01e3
--- src/box/xlog.h
+++ src/box/xlog.h
@@ -595,12 +595,31 @@ xlog_tx_cursor_destroy(struct xlog_tx_cursor *tx_curso
  * Fetch next xrow from xlog tx cursor
  *
  * @retval 0 for Ok
+ * @retval 1 if current tx is done
  * @retval -1 for error
  */
 int
 xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_cursor, struct xrow_header *xrow);
 
 /**
+ * Fetch next xrow from current xlog tx cursor.
+ *
+ * This function is similar to xlog_tx_cursor_next_row() except it doesn't
+ * parse the xrow nor does it advance the data pointer.
+ *
+ * @param cursor cursor
+ * @param[out] data pointer to the position in the internal buffer where
+ *                  the next xrow is stored
+ * @param[out] end end of the buffer
+ *
+ * @retval 0 for Ok
+ * @retval 1 if current tx is done
+ */
+int
+xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *cursor,
+			    const char ***data, const char **end);
+
+/**
  * Return current tx cursor position
  *
  * @param tx_cursor tx_cursor
@@ -754,6 +773,24 @@ int
 xlog_cursor_next_row(struct xlog_cursor *cursor, struct xrow_header *xrow);
 
 /**
+ * Fetch next xrow from current xlog tx.
+ *
+ * This function is similar to xlog_cursor_next_row() except it doesn't
+ * parse the xrow nor does it advance the data pointer.
+ *
+ * @param cursor cursor
+ * @param[out] data pointer to the position in the internal buffer where
+ *                  the next xrow is stored
+ * @param[out] end end of the buffer
+ *
+ * @retval 0 for Ok
+ * @retval 1 if current tx is done
+ */
+int
+xlog_cursor_next_row_raw(struct xlog_cursor *cursor,
+			 const char ***data, const char **end);
+
+/**
  * Fetch next row from cursor, ignores xlog tx boundary,
  * open a next one tx if current is done.
  *
blob - 87fbe72e09cfefeb4fe4e22d8a415eeb0110288b
blob + 3bfb01d5a133cdfc7f2fae8aa7787986f342fc33
--- src/box/xrow.c
+++ src/box/xrow.c
@@ -264,9 +264,30 @@ xrow_header_encode(const struct xrow_header *header, u
 	/* Header */
 	char *d = data + 1; /* Skip 1 byte for MP_MAP */
 	int map_size = 0;
+
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, {
+		d = mp_encode_bool(d, true);
+		d = mp_encode_uint(d, 1);
+		map_size++;
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, {
+		d = mp_encode_uint(d, IPROTO_KEY);
+		d = mp_encode_uint(d, 1);
+		map_size++;
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, {
+		d = mp_encode_uint(d, 666);
+		d = mp_encode_uint(d, 1);
+		map_size++;
+	});
+
+	uint32_t type = header->type;
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, {
+		type = 777;
+	});
 	if (true) {
 		d = mp_encode_uint(d, IPROTO_REQUEST_TYPE);
-		d = mp_encode_uint(d, header->type);
+		d = mp_encode_uint(d, type);
 		map_size++;
 	}
 
@@ -343,6 +364,12 @@ xrow_header_encode(const struct xrow_header *header, u
 	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_HEADER, {
+		mp_encode_array(data, 0);
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, {
+		*data = 0xc1;
+	});
 	out->iov_len = d - (char *) out->iov_base;
 	out++;
 
@@ -942,6 +969,21 @@ xrow_encode_dml(const struct request *request, struct 
 	}
 	char *pos = begin + 1;     /* skip 1 byte for MP_MAP */
 	int map_size = 0;
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_KEY, {
+		pos = mp_encode_bool(pos, true);
+		pos = mp_encode_uint(pos, 2);
+		map_size++;
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_VALUE, {
+		pos = mp_encode_uint(pos, IPROTO_KEY);
+		pos = mp_encode_uint(pos, 2);
+		map_size++;
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, {
+		pos = mp_encode_uint(pos, 666);
+		pos = mp_encode_uint(pos, 2);
+		map_size++;
+	});
 	if (request->space_id) {
 		pos = mp_encode_uint(pos, IPROTO_SPACE_ID);
 		pos = mp_encode_uint(pos, request->space_id);
@@ -987,6 +1029,12 @@ xrow_encode_dml(const struct request *request, struct 
 
 	assert(pos <= begin + len);
 	mp_encode_map(begin, map_size);
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_INVALID_BODY, {
+		mp_encode_array(begin, 0);
+	});
+	ERROR_INJECT(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, {
+		*begin = 0xc1;
+	});
 	iov[0].iov_base = begin;
 	iov[0].iov_len = pos - begin;
 
blob - 6c7d2736e3ac15cccb8d09be3906de223e870f67
blob + 1c177a42f76989f012a4c20bfb371f6a2c7ce1a3
--- src/lib/core/errinj.h
+++ src/lib/core/errinj.h
@@ -185,6 +185,14 @@ struct errinj {
 	_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_XLOG_RENAME_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_CORRUPTED_BODY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_CORRUPTED_HEADER, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_INVALID_BODY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_INVALID_HEADER, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_INVALID_KEY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_INVALID_VALUE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_UNKNOWN_KEY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_XLOG_WRITE_UNKNOWN_TYPE, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
blob - 8dccaf6d04e63b42c7be309b7cbd74baadfa659b
blob + bdb99a438f1a0ef0b802cfc4781617674b896d02
--- test/box/errinj.result
+++ test/box/errinj.result
@@ -159,6 +159,14 @@ evals
   - ERRINJ_XLOG_META: false
   - ERRINJ_XLOG_READ: -1
   - ERRINJ_XLOG_RENAME_DELAY: false
+  - ERRINJ_XLOG_WRITE_CORRUPTED_BODY: false
+  - ERRINJ_XLOG_WRITE_CORRUPTED_HEADER: false
+  - ERRINJ_XLOG_WRITE_INVALID_BODY: false
+  - ERRINJ_XLOG_WRITE_INVALID_HEADER: false
+  - ERRINJ_XLOG_WRITE_INVALID_KEY: false
+  - ERRINJ_XLOG_WRITE_INVALID_VALUE: false
+  - ERRINJ_XLOG_WRITE_UNKNOWN_KEY: false
+  - ERRINJ_XLOG_WRITE_UNKNOWN_TYPE: false
 ...
 errinj.set("some-injection", true)
 ---
blob - /dev/null
blob + fcf0d61ce976360de1a3d319f0a113991fd9f603 (mode 644)
--- /dev/null
+++ test/box-luatest/xlog_reader_test.lua
@@ -0,0 +1,155 @@
+local fio = require('fio')
+local server = require('luatest.server')
+local t = require('luatest')
+local xlog = require('xlog')
+
+local g = t.group()
+
+local IPROTO_SPACE_ID = 16
+local IPROTO_TUPLE = 33
+
+g.before_all(function(cg)
+    cg.server = server:new()
+    cg.server:start()
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+g.after_test('test_bad_xlog', function(cg)
+    cg.server:exec(function()
+        if box.space.test then
+            box.space.test:drop()
+        end
+        box.snapshot()
+    end)
+end)
+
+g.test_bad_xlog = function(cg)
+    t.tarantool.skip_if_not_debug()
+    local info = cg.server:exec(function()
+        local s = box.schema.create_space('test')
+        s:create_index('primary', {parts = {1, 'string'}})
+        box.snapshot()
+        local lsn = box.info.lsn
+        local function test(errinj)
+            box.error.injection.set(errinj, true)
+            local ok = pcall(s.insert, s, {errinj})
+            box.error.injection.set(errinj, false)
+            t.assert(ok)
+        end
+        box.space.test:insert({'BEGIN'})
+        test('ERRINJ_XLOG_WRITE_CORRUPTED_HEADER')
+        test('ERRINJ_XLOG_WRITE_INVALID_HEADER')
+        test('ERRINJ_XLOG_WRITE_CORRUPTED_BODY')
+        test('ERRINJ_XLOG_WRITE_INVALID_BODY')
+        test('ERRINJ_XLOG_WRITE_INVALID_KEY')
+        test('ERRINJ_XLOG_WRITE_INVALID_VALUE')
+        test('ERRINJ_XLOG_WRITE_UNKNOWN_KEY')
+        test('ERRINJ_XLOG_WRITE_UNKNOWN_TYPE')
+        box.space.test:insert({'END'})
+        return {
+            replica_id = box.info.id,
+            lsn = lsn,
+            space_id = box.space.test.id
+        }
+    end)
+    local path = fio.pathjoin(cg.server.workdir,
+                              string.format('%020d.xlog', info.lsn))
+    local result = {}
+    for _, row in xlog.pairs(path) do
+        if type(row.HEADER) == 'table' then
+            if type(row.HEADER.timestamp) == 'number' then
+                row.HEADER.timestamp = '<timestamp>'
+            end
+        end
+        table.insert(result, row)
+    end
+    t.assert_equals(result, {
+        {
+            HEADER = {
+                type = 'INSERT',
+                replica_id = info.replica_id,
+                lsn = info.lsn + 1,
+                timestamp = '<timestamp>',
+            },
+            BODY = {
+                space_id = info.space_id,
+                tuple = {'BEGIN'},
+            },
+        },
+        -- Row with a corrupted header is skipped.
+        -- Row with an invalid header is skipped.
+        -- Row with a corrupted body is skipped.
+        -- Row with an invalid body is skipped.
+        {
+            -- Invalid keys are ignored.
+            HEADER = {
+                type = 'INSERT',
+                replica_id = info.replica_id,
+                lsn = info.lsn + 6,
+                timestamp = '<timestamp>',
+            },
+            BODY = {
+                space_id = info.space_id,
+                tuple = {'ERRINJ_XLOG_WRITE_INVALID_KEY'},
+            },
+        },
+        {
+            -- Invalid values are dumped as is.
+            HEADER = {
+                type = 'INSERT',
+                replica_id = info.replica_id,
+                lsn = info.lsn + 7,
+                timestamp = '<timestamp>',
+                key = 1,
+            },
+            BODY = {
+                space_id = info.space_id,
+                tuple = {'ERRINJ_XLOG_WRITE_INVALID_VALUE'},
+                key = 2,
+            },
+        },
+        {
+            -- Unknown keys are dumped as is.
+            HEADER = {
+                type = 'INSERT',
+                replica_id = info.replica_id,
+                lsn = info.lsn + 8,
+                timestamp = '<timestamp>',
+                [666] = 1,
+            },
+            BODY = {
+                space_id = info.space_id,
+                tuple = {'ERRINJ_XLOG_WRITE_UNKNOWN_KEY'},
+                [666] = 2,
+            },
+        },
+        {
+            -- Unknown type is dumped as is.
+            HEADER = {
+                type = 777,
+                replica_id = info.replica_id,
+                lsn = info.lsn + 9,
+                timestamp = '<timestamp>',
+            },
+            BODY = {
+                [IPROTO_SPACE_ID] = info.space_id,
+                [IPROTO_TUPLE] = {'ERRINJ_XLOG_WRITE_UNKNOWN_TYPE'},
+            },
+        },
+        {
+            HEADER = {
+                type = 'INSERT',
+                replica_id = info.replica_id,
+                lsn = info.lsn + 10,
+                timestamp = '<timestamp>',
+            },
+            BODY = {
+                space_id = info.space_id,
+                tuple = {'END'},
+            },
+        },
+    })
+end