Commit Diff


commit - ebafd68409ef6a9ac9c7fd1ef47a308f24136c1d
commit + 7383266879c3c3842c6ff8105b66ae079cb70b45
blob - 52a0c135140c54a4d1ff3a93dff1045e4b24653f
blob + ba0d1f76e01b951246783a56dafb84adc00fc1c8
--- src/box/iproto.cc
+++ src/box/iproto.cc
@@ -945,7 +945,7 @@ iproto_connection_close(struct iproto_connection *con)
 		 * parsed data is processed.  It's important this
 		 * is done only once.
 		 */
-		con->p_ibuf->wpos -= con->parse_size;
+		ibuf_discard(con->p_ibuf, con->parse_size);
 		mh_int_t node;
 		mh_foreach(con->streams, node) {
 			struct iproto_stream *stream = (struct iproto_stream *)
@@ -1052,16 +1052,16 @@ iproto_connection_input_buffer(struct iproto_connectio
 	}
 
 	xibuf_reserve(new_ibuf, to_read + con->parse_size);
-	/*
-	 * Discard unparsed data in the old buffer, otherwise it
-	 * won't be recycled when all parsed requests are processed.
-	 */
-	old_ibuf->wpos -= con->parse_size;
 	if (con->parse_size != 0) {
 		/* Move the cached request prefix to the new buffer. */
-		memcpy(new_ibuf->rpos, old_ibuf->wpos, con->parse_size);
-		new_ibuf->wpos += con->parse_size;
+		void *wpos = ibuf_alloc(new_ibuf, con->parse_size);
+		memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size);
 		/*
+		 * Discard unparsed data in the old buffer, otherwise it
+		 * won't be recycled when all parsed requests are processed.
+		 */
+		ibuf_discard(old_ibuf, con->parse_size);
+		/*
 		 * We made ibuf idle. If obuf was already idle it
 		 * makes the both ibuf and obuf idle, time to trim
 		 * them.
blob - bbe8ca3d02ac6ba3b67d013e5553f0d8f16531e7
blob + 99c5b31b1a3d997fa58f9f355c6536077d47a990
--- src/box/lua/merger.c
+++ src/box/lua/merger.c
@@ -115,8 +115,8 @@ decode_header(struct ibuf *buf, size_t *len_p)
 static void
 encode_header(struct ibuf *output_buffer, uint32_t result_len)
 {
-	ibuf_reserve(output_buffer, mp_sizeof_array(result_len));
-	output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len);
+	void *wpos = xibuf_alloc(output_buffer, mp_sizeof_array(result_len));
+	mp_encode_array(wpos, result_len);
 }
 
 /**
@@ -596,10 +596,10 @@ luaL_merge_source_buffer_next(struct merge_source *bas
 		return -1;
 	}
 	--source->remaining_tuple_count;
-	source->buf->rpos = (char *) tuple_end;
 	if (format == NULL)
 		format = tuple_format_runtime;
 	struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+	ibuf_consume_before(source->buf, tuple_end);
 	if (tuple == NULL)
 		return -1;
 
@@ -1140,9 +1140,8 @@ encode_result_buffer(struct lua_State *L, struct merge
 	       merge_source_next(source, NULL, &tuple)) == 0 &&
 	       tuple != NULL) {
 		uint32_t bsize = tuple_bsize(tuple);
-		ibuf_reserve(output_buffer, bsize);
-		memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
-		output_buffer->wpos += bsize;
+		void *wpos = xibuf_alloc(output_buffer, bsize);
+		memcpy(wpos, tuple_data(tuple), bsize);
 		result_len_offset += bsize;
 		++result_len;
 
blob - 7cba024bfd6c14cbefa0fa3fd94bb602ec29f6d9
blob + 8e14945356591a0702036a41e323b84540f26df4
--- src/box/lua/net_box.c
+++ src/box/lua/net_box.c
@@ -270,6 +270,8 @@ struct netbox_transport {
 	struct ibuf send_buf;
 	/** Connection receive buffer. */
 	struct ibuf recv_buf;
+	/** Size of the last received message. */
+	size_t last_msg_size;
 	/** Signalled when send_buf becomes empty. */
 	struct fiber_cond on_send_buf_empty;
 	/** Next request id. */
@@ -523,6 +525,7 @@ netbox_transport_create(struct netbox_transport *trans
 	iostream_clear(&transport->io);
 	ibuf_create(&transport->send_buf, &cord()->slabc, NETBOX_READAHEAD);
 	ibuf_create(&transport->recv_buf, &cord()->slabc, NETBOX_READAHEAD);
+	transport->last_msg_size = 0;
 	fiber_cond_create(&transport->on_send_buf_empty);
 	transport->next_sync = 1;
 	transport->requests = mh_i64ptr_new();
@@ -580,6 +583,7 @@ netbox_transport_set_error(struct netbox_transport *tr
 	/* Reset buffers. */
 	ibuf_reinit(&transport->send_buf);
 	ibuf_reinit(&transport->recv_buf);
+	transport->last_msg_size = 0;
 	fiber_cond_broadcast(&transport->on_send_buf_empty);
 	/* Complete requests and clean up the hash. */
 	struct mh_i64ptr_t *h = transport->requests;
@@ -1142,7 +1146,7 @@ netbox_transport_communicate(struct netbox_transport *
 						"Peer closed");
 				return -1;
 			} if (rc > 0) {
-				recv_buf->wpos += rc;
+				VERIFY(ibuf_alloc(recv_buf, rc) != NULL);
 			} else if (rc == IOSTREAM_ERROR) {
 				goto io_error;
 			} else {
@@ -1156,7 +1160,7 @@ netbox_transport_communicate(struct netbox_transport *
 			ssize_t rc = iostream_write(io, send_buf->rpos,
 						    ibuf_used(send_buf));
 			if (rc >= 0) {
-				send_buf->rpos += rc;
+				ibuf_consume(send_buf, rc);
 				if (ibuf_used(send_buf) == 0)
 					fiber_cond_broadcast(on_send_buf_empty);
 			} else if (rc == IOSTREAM_ERROR) {
@@ -1193,6 +1197,7 @@ static int
 netbox_transport_send_and_recv(struct netbox_transport *transport,
 			       struct xrow_header *hdr)
 {
+	ibuf_consume(&transport->recv_buf, transport->last_msg_size);
 	while (true) {
 		size_t required;
 		size_t data_len = ibuf_used(&transport->recv_buf);
@@ -1212,10 +1217,11 @@ netbox_transport_send_and_recv(struct netbox_transport
 			required = size + len;
 			if (data_len >= required) {
 				const char *body_end = rpos + len;
-				transport->recv_buf.rpos = (char *)body_end;
-				return xrow_header_decode(
-					hdr, &rpos, body_end,
-					/*end_is_exact=*/true);
+				int rc = xrow_header_decode(
+						hdr, &rpos, body_end,
+						/*end_is_exact=*/true);
+				transport->last_msg_size = body_end - bufpos;
+				return rc;
 			}
 		}
 		if (netbox_transport_communicate(transport, required) != 0)
blob - 646cad25f0a01fe27ac5992e6441393cb1a6bf15
blob + 06644b3825490ab2773a732c2567afc5db78ca7d
--- src/box/lua/schema.lua
+++ src/box/lua/schema.lua
@@ -1941,7 +1941,7 @@ local function iterator_pos_set(index, pos, ibuf)
         iterator_pos_end[0] = iterator_pos[0] + #pos
         return true
     else
-        ibuf.rpos = ibuf.wpos
+        ibuf:consume(ibuf.wpos - ibuf.rpos)
         local tuple, tuple_end = tuple_encode(ibuf, pos)
         return builtin.box_index_tuple_position(
                 index.space_id, index.id, tuple, tuple_end,
blob - a38a4f84ef509e3ede17629d103e69b6209077e0
blob + 584fe32ab1a69a8b177e49bcd12cdc583f5b4066
--- src/box/lua/tuple.lua
+++ src/box/lua/tuple.lua
@@ -304,9 +304,7 @@ end
 local function tuple_to_msgpack(buf, tuple)
     assert(ffi.istype(tuple_t, tuple))
     local bsize = builtin.box_tuple_bsize(tuple)
-    buf:reserve(bsize)
-    builtin.box_tuple_to_buf(tuple, buf.wpos, bsize)
-    buf.wpos = buf.wpos + bsize
+    builtin.box_tuple_to_buf(tuple, buf:alloc(bsize), bsize)
 end
 
 local function tuple_bsize(tuple)
blob - 025346e149bfdf33357c4381d04ab95fdc3c30c4
blob + 14880200d1feeb1abd2210802505c7a1801abb97
--- src/httpc.c
+++ src/httpc.c
@@ -68,7 +68,7 @@ curl_easy_io_read_cb(char *buffer, size_t size, size_t
 
 	size_t read_len = ibuf_len < buffer_size ? ibuf_len : buffer_size;
 	memcpy(buffer, req->send.rpos, read_len);
-	req->send.rpos += read_len;
+	ibuf_consume(&req->send, read_len);
 
 	fiber_cond_broadcast(&req->io_send_cond);
 	return read_len;
@@ -541,7 +541,7 @@ httpc_request_io_read(struct httpc_request *req, char 
 		if (copied == ibuf_len)
 			ibuf_reset(&req->io_recv);
 		else
-			req->io_recv.rpos += copied;
+			ibuf_consume(&req->io_recv, copied);
 	}
 
 	if (copied < len && recv_len > 0) {
blob - 0d90a483c06d7d73aa6ee93a123c087651fcf702
blob + 110cc700c5b598bf722f3354f7113266bf2602d7
--- src/lib/core/coio_buf.h
+++ src/lib/core/coio_buf.h
@@ -50,7 +50,7 @@ coio_bread(struct iostream *io, struct ibuf *buf, size
 	ssize_t n = coio_read_ahead(io, buf->wpos, sz, ibuf_unused(buf));
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -68,7 +68,7 @@ coio_bread_timeout(struct iostream *io, struct ibuf *b
 			                    timeout);
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -80,7 +80,7 @@ coio_breadn(struct iostream *io, struct ibuf *buf, siz
 	ssize_t n = coio_readn_ahead(io, buf->wpos, sz, ibuf_unused(buf));
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -99,7 +99,7 @@ coio_breadn_timeout(struct iostream *io, struct ibuf *
 			                     timeout);
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
blob - e60dc7d04bb59c42cc71ca83f816d2da8e097ca2
blob + da7d49a18e8714a8252bcd2e014dda5b89953b89
--- src/lib/core/prbuf.c
+++ src/lib/core/prbuf.c
@@ -427,6 +427,7 @@ prbuf_reader_wrap(struct prbuf_reader *reader)
 	ibuf_reset(&reader->buf);
 	reader->pos = reader->data_begin;
 	reader->read_pos = reader->pos;
+	reader->last_read_size = 0;
 }
 
 int
@@ -445,6 +446,8 @@ prbuf_reader_next(struct prbuf_reader *reader,
 		return 0;
 	}
 
+	/* Consume the record data of the previous read. */
+	ibuf_consume(&reader->buf, reader->last_read_size);
 	/* Check if we hit end of buffer and need to wrap around. */
 	if (reader->data_end - reader->pos < (off_t)record_size_overhead)
 		prbuf_reader_wrap(reader);
@@ -472,14 +475,14 @@ prbuf_reader_next(struct prbuf_reader *reader,
 	}
 
 	/* Read record data. */
-	reader->buf.rpos += record_size_overhead;
+	ibuf_consume(&reader->buf, record_size_overhead);
 	if (prbuf_reader_ensure(reader, sz) != 0)
 		return -1;
 
 	entry->ptr = reader->buf.rpos;
 	entry->size = sz;
+	reader->last_read_size = sz;
 	reader->pos += full_sz;
-	reader->buf.rpos += sz;
 	reader->unread_size -= full_sz;
 	return 0;
 }
blob - ab5303bb0d9a9412bdad072a9a93f116ded2805a
blob + c06426984ddfb3ad87641eee86184d420091427f
--- src/lib/core/prbuf.h
+++ src/lib/core/prbuf.h
@@ -95,6 +95,10 @@ struct prbuf_reader {
 	 */
 	size_t unread_size;
 	/**
+	 * Number of bytes in the last record read by client.
+	 */
+	size_t last_read_size;
+	/**
 	 * File offset of the beginning of the data area.
 	 * Data area is the area after header till the end of the buffer.
 	 */
@@ -122,6 +126,9 @@ prbuf_reader_create(struct prbuf_reader *reader, int f
  * Read the next record into entry argument. If there are no more records
  * then returned record will be a terminator (EOF, ptr == NULL && size == 0).
  *
+ * Pointer to data on successful read is valid until next call to this
+ * function.
+ *
  * After EOF the function can be called again and will return EOF.
  *
  * After failure reader is invalid and can only be closed.
blob - c34298a3dc4d5fffc920b6ca3485ec0c425abd19
blob + 7d6ad9952415bfe858edafbe74c9ffa63c92d969
--- src/lua/buffer.lua
+++ src/lua/buffer.lua
@@ -152,6 +152,13 @@ local function ibuf_read(buf, size)
     local rpos = buf.rpos
     buf.rpos = rpos + size
     return rpos
+end
+
+local function ibuf_consume(buf, size)
+    checkibuf(buf, 'consume')
+    checksize(buf, size)
+    utils.poison_memory_region(buf.rpos, size);
+    buf.rpos = buf.rpos + size
 end
 
 local function ibuf_serialize(buf)
@@ -168,6 +175,7 @@ local ibuf_methods = {
 
     checksize = ibuf_checksize;
     read = ibuf_read;
+    consume = ibuf_consume;
     __serialize = ibuf_serialize;
 
     size = ibuf_used;
blob - e694f1fc1db4e7d48e2a740fce11ce91258b24b3
blob + a8f058bf6acb41ec0df033d9c96cec42845b90a7
--- src/lua/httpc.lua
+++ src/lua/httpc.lua
@@ -424,7 +424,7 @@ local function io_read(self, opts, timeout)
     local len = check(self, chunk, delimiter)
     if len ~= nil then
         local data = ffi.string(rbuf.rpos, len)
-        rbuf.rpos = rbuf.rpos + len
+        rbuf:consume(len)
         return data
     end
 
@@ -438,15 +438,15 @@ local function io_read(self, opts, timeout)
             self._errno = nil
             local len = rbuf:size()
             local data = ffi.string(rbuf.rpos, len)
-            rbuf.rpos = rbuf.rpos + len
+            rbuf:consume(len)
             return data
         else
-            rbuf.wpos = rbuf.wpos + res
+            rbuf:alloc(res)
             local len = check(self, chunk, delimiter)
             if len ~= nil then
                 self._errno = nil
                 local data = ffi.string(rbuf.rpos, len)
-                rbuf.rpos = rbuf.rpos + len
+                rbuf:consume(len)
                 return data
             end
         end
blob - 28581480497bde2d590a782e3f2de014239e099b
blob + 2b6b59759d73c467827582b73f23a07fa8ee952d
--- src/lua/socket.lua
+++ src/lua/socket.lua
@@ -705,7 +705,7 @@ local function read(self, limit, timeout, check, ...)
     if len ~= nil then
         self._errno = nil
         local data = ffi.string(rbuf.rpos, len)
-        rbuf.rpos = rbuf.rpos + len
+        rbuf:consume(len)
         return data
     end
 
@@ -720,15 +720,15 @@ local function read(self, limit, timeout, check, ...)
             self._errno = nil
             local len = rbuf:size()
             local data = ffi.string(rbuf.rpos, len)
-            rbuf.rpos = rbuf.rpos + len
+            rbuf:consume(len)
             return data
         elseif res ~= nil then
-            rbuf.wpos = rbuf.wpos + res
+            rbuf:alloc(res)
             local len = check(self, limit, ...)
             if len ~= nil then
                 self._errno = nil
                 local data = ffi.string(rbuf.rpos, len)
-                rbuf.rpos = rbuf.rpos + len
+                rbuf:consume(len)
                 return data
             end
         elseif not errno_is_transient[self._errno] then