commit - ebafd68409ef6a9ac9c7fd1ef47a308f24136c1d
commit + 7383266879c3c3842c6ff8105b66ae079cb70b45
blob - 52a0c135140c54a4d1ff3a93dff1045e4b24653f
blob + ba0d1f76e01b951246783a56dafb84adc00fc1c8
--- src/box/iproto.cc
+++ src/box/iproto.cc
* parsed data is processed. It's important this
* is done only once.
*/
- con->p_ibuf->wpos -= con->parse_size;
+ ibuf_discard(con->p_ibuf, con->parse_size);
mh_int_t node;
mh_foreach(con->streams, node) {
struct iproto_stream *stream = (struct iproto_stream *)
}
xibuf_reserve(new_ibuf, to_read + con->parse_size);
- /*
- * Discard unparsed data in the old buffer, otherwise it
- * won't be recycled when all parsed requests are processed.
- */
- old_ibuf->wpos -= con->parse_size;
if (con->parse_size != 0) {
/* Move the cached request prefix to the new buffer. */
- memcpy(new_ibuf->rpos, old_ibuf->wpos, con->parse_size);
- new_ibuf->wpos += con->parse_size;
+ void *wpos = ibuf_alloc(new_ibuf, con->parse_size);
+ memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size);
/*
+ * Discard unparsed data in the old buffer, otherwise it
+ * won't be recycled when all parsed requests are processed.
+ */
+ ibuf_discard(old_ibuf, con->parse_size);
+ /*
* We made ibuf idle. If obuf was already idle it
* makes the both ibuf and obuf idle, time to trim
* them.
blob - bbe8ca3d02ac6ba3b67d013e5553f0d8f16531e7
blob + 99c5b31b1a3d997fa58f9f355c6536077d47a990
--- src/box/lua/merger.c
+++ src/box/lua/merger.c
static void
encode_header(struct ibuf *output_buffer, uint32_t result_len)
{
- ibuf_reserve(output_buffer, mp_sizeof_array(result_len));
- output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len);
+ void *wpos = xibuf_alloc(output_buffer, mp_sizeof_array(result_len));
+ mp_encode_array(wpos, result_len);
}
/**
return -1;
}
--source->remaining_tuple_count;
- source->buf->rpos = (char *) tuple_end;
if (format == NULL)
format = tuple_format_runtime;
struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+ ibuf_consume_before(source->buf, tuple_end);
if (tuple == NULL)
return -1;
merge_source_next(source, NULL, &tuple)) == 0 &&
tuple != NULL) {
uint32_t bsize = tuple_bsize(tuple);
- ibuf_reserve(output_buffer, bsize);
- memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
- output_buffer->wpos += bsize;
+ void *wpos = xibuf_alloc(output_buffer, bsize);
+ memcpy(wpos, tuple_data(tuple), bsize);
result_len_offset += bsize;
++result_len;
blob - 7cba024bfd6c14cbefa0fa3fd94bb602ec29f6d9
blob + 8e14945356591a0702036a41e323b84540f26df4
--- src/box/lua/net_box.c
+++ src/box/lua/net_box.c
struct ibuf send_buf;
/** Connection receive buffer. */
struct ibuf recv_buf;
+ /** Size of the last received message. */
+ size_t last_msg_size;
/** Signalled when send_buf becomes empty. */
struct fiber_cond on_send_buf_empty;
/** Next request id. */
iostream_clear(&transport->io);
ibuf_create(&transport->send_buf, &cord()->slabc, NETBOX_READAHEAD);
ibuf_create(&transport->recv_buf, &cord()->slabc, NETBOX_READAHEAD);
+ transport->last_msg_size = 0;
fiber_cond_create(&transport->on_send_buf_empty);
transport->next_sync = 1;
transport->requests = mh_i64ptr_new();
/* Reset buffers. */
ibuf_reinit(&transport->send_buf);
ibuf_reinit(&transport->recv_buf);
+ transport->last_msg_size = 0;
fiber_cond_broadcast(&transport->on_send_buf_empty);
/* Complete requests and clean up the hash. */
struct mh_i64ptr_t *h = transport->requests;
"Peer closed");
return -1;
} if (rc > 0) {
- recv_buf->wpos += rc;
+ VERIFY(ibuf_alloc(recv_buf, rc) != NULL);
} else if (rc == IOSTREAM_ERROR) {
goto io_error;
} else {
ssize_t rc = iostream_write(io, send_buf->rpos,
ibuf_used(send_buf));
if (rc >= 0) {
- send_buf->rpos += rc;
+ ibuf_consume(send_buf, rc);
if (ibuf_used(send_buf) == 0)
fiber_cond_broadcast(on_send_buf_empty);
} else if (rc == IOSTREAM_ERROR) {
netbox_transport_send_and_recv(struct netbox_transport *transport,
struct xrow_header *hdr)
{
+ ibuf_consume(&transport->recv_buf, transport->last_msg_size);
while (true) {
size_t required;
size_t data_len = ibuf_used(&transport->recv_buf);
required = size + len;
if (data_len >= required) {
const char *body_end = rpos + len;
- transport->recv_buf.rpos = (char *)body_end;
- return xrow_header_decode(
- hdr, &rpos, body_end,
- /*end_is_exact=*/true);
+ int rc = xrow_header_decode(
+ hdr, &rpos, body_end,
+ /*end_is_exact=*/true);
+ transport->last_msg_size = body_end - bufpos;
+ return rc;
}
}
if (netbox_transport_communicate(transport, required) != 0)
blob - 646cad25f0a01fe27ac5992e6441393cb1a6bf15
blob + 06644b3825490ab2773a732c2567afc5db78ca7d
--- src/box/lua/schema.lua
+++ src/box/lua/schema.lua
iterator_pos_end[0] = iterator_pos[0] + #pos
return true
else
- ibuf.rpos = ibuf.wpos
+ ibuf:consume(ibuf.wpos - ibuf.rpos)
local tuple, tuple_end = tuple_encode(ibuf, pos)
return builtin.box_index_tuple_position(
index.space_id, index.id, tuple, tuple_end,
blob - a38a4f84ef509e3ede17629d103e69b6209077e0
blob + 584fe32ab1a69a8b177e49bcd12cdc583f5b4066
--- src/box/lua/tuple.lua
+++ src/box/lua/tuple.lua
local function tuple_to_msgpack(buf, tuple)
assert(ffi.istype(tuple_t, tuple))
local bsize = builtin.box_tuple_bsize(tuple)
- buf:reserve(bsize)
- builtin.box_tuple_to_buf(tuple, buf.wpos, bsize)
- buf.wpos = buf.wpos + bsize
+ builtin.box_tuple_to_buf(tuple, buf:alloc(bsize), bsize)
end
local function tuple_bsize(tuple)
blob - 025346e149bfdf33357c4381d04ab95fdc3c30c4
blob + 14880200d1feeb1abd2210802505c7a1801abb97
--- src/httpc.c
+++ src/httpc.c
size_t read_len = ibuf_len < buffer_size ? ibuf_len : buffer_size;
memcpy(buffer, req->send.rpos, read_len);
- req->send.rpos += read_len;
+ ibuf_consume(&req->send, read_len);
fiber_cond_broadcast(&req->io_send_cond);
return read_len;
if (copied == ibuf_len)
ibuf_reset(&req->io_recv);
else
- req->io_recv.rpos += copied;
+ ibuf_consume(&req->io_recv, copied);
}
if (copied < len && recv_len > 0) {
blob - 0d90a483c06d7d73aa6ee93a123c087651fcf702
blob + 110cc700c5b598bf722f3354f7113266bf2602d7
--- src/lib/core/coio_buf.h
+++ src/lib/core/coio_buf.h
ssize_t n = coio_read_ahead(io, buf->wpos, sz, ibuf_unused(buf));
if (n < 0)
diag_raise();
- buf->wpos += n;
+ VERIFY(ibuf_alloc(buf, n) != NULL);
return n;
}
timeout);
if (n < 0)
diag_raise();
- buf->wpos += n;
+ VERIFY(ibuf_alloc(buf, n) != NULL);
return n;
}
ssize_t n = coio_readn_ahead(io, buf->wpos, sz, ibuf_unused(buf));
if (n < 0)
diag_raise();
- buf->wpos += n;
+ VERIFY(ibuf_alloc(buf, n) != NULL);
return n;
}
timeout);
if (n < 0)
diag_raise();
- buf->wpos += n;
+ VERIFY(ibuf_alloc(buf, n) != NULL);
return n;
}
blob - e60dc7d04bb59c42cc71ca83f816d2da8e097ca2
blob + da7d49a18e8714a8252bcd2e014dda5b89953b89
--- src/lib/core/prbuf.c
+++ src/lib/core/prbuf.c
ibuf_reset(&reader->buf);
reader->pos = reader->data_begin;
reader->read_pos = reader->pos;
+ reader->last_read_size = 0;
}
int
return 0;
}
+ /* Consume the record data of the previous read. */
+ ibuf_consume(&reader->buf, reader->last_read_size);
/* Check if we hit end of buffer and need to wrap around. */
if (reader->data_end - reader->pos < (off_t)record_size_overhead)
prbuf_reader_wrap(reader);
}
/* Read record data. */
- reader->buf.rpos += record_size_overhead;
+ ibuf_consume(&reader->buf, record_size_overhead);
if (prbuf_reader_ensure(reader, sz) != 0)
return -1;
entry->ptr = reader->buf.rpos;
entry->size = sz;
+ reader->last_read_size = sz;
reader->pos += full_sz;
- reader->buf.rpos += sz;
reader->unread_size -= full_sz;
return 0;
}
blob - ab5303bb0d9a9412bdad072a9a93f116ded2805a
blob + c06426984ddfb3ad87641eee86184d420091427f
--- src/lib/core/prbuf.h
+++ src/lib/core/prbuf.h
*/
size_t unread_size;
/**
+ * Number of bytes in the last record read by the client.
+ */
+ size_t last_read_size;
+ /**
* File offset of the beginning of the data area.
* Data area is the area after header till the end of the buffer.
*/
* Read the next record into entry argument. If there are no more records
* then returned record will be a terminator (EOF, ptr == NULL && size == 0).
*
+ * On a successful read, the pointer to the record data remains valid
+ * only until the next call to this function.
+ *
* After EOF the function can be called again and will return EOF.
*
* After failure reader is invalid and can only be closed.
blob - c34298a3dc4d5fffc920b6ca3485ec0c425abd19
blob + 7d6ad9952415bfe858edafbe74c9ffa63c92d969
--- src/lua/buffer.lua
+++ src/lua/buffer.lua
local rpos = buf.rpos
buf.rpos = rpos + size
return rpos
+end
+
+local function ibuf_consume(buf, size)
+ checkibuf(buf, 'consume')
+ checksize(buf, size)
+ utils.poison_memory_region(buf.rpos, size);
+ buf.rpos = buf.rpos + size
end
local function ibuf_serialize(buf)
checksize = ibuf_checksize;
read = ibuf_read;
+ consume = ibuf_consume;
__serialize = ibuf_serialize;
size = ibuf_used;
blob - e694f1fc1db4e7d48e2a740fce11ce91258b24b3
blob + a8f058bf6acb41ec0df033d9c96cec42845b90a7
--- src/lua/httpc.lua
+++ src/lua/httpc.lua
local len = check(self, chunk, delimiter)
if len ~= nil then
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
end
self._errno = nil
local len = rbuf:size()
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
else
- rbuf.wpos = rbuf.wpos + res
+ rbuf:alloc(res)
local len = check(self, chunk, delimiter)
if len ~= nil then
self._errno = nil
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
end
end
blob - 28581480497bde2d590a782e3f2de014239e099b
blob + 2b6b59759d73c467827582b73f23a07fa8ee952d
--- src/lua/socket.lua
+++ src/lua/socket.lua
if len ~= nil then
self._errno = nil
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
end
self._errno = nil
local len = rbuf:size()
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
elseif res ~= nil then
- rbuf.wpos = rbuf.wpos + res
+ rbuf:alloc(res)
local len = check(self, limit, ...)
if len ~= nil then
self._errno = nil
local data = ffi.string(rbuf.rpos, len)
- rbuf.rpos = rbuf.rpos + len
+ rbuf:consume(len)
return data
end
elseif not errno_is_transient[self._errno] then