commit 7383266879c3c3842c6ff8105b66ae079cb70b45 from: Nikolay Shirokovskiy via: Vladimir Davydov date: Mon Oct 23 09:53:58 2023 UTC misc: use ibuf API to discard/allocate/consume The API functions additionally poison related data in ASAN build. Follow-up #7327 NO_TEST=refactoring NO_CHANGELOG=refactoring NO_DOC=refactoring commit - ebafd68409ef6a9ac9c7fd1ef47a308f24136c1d commit + 7383266879c3c3842c6ff8105b66ae079cb70b45 blob - 52a0c135140c54a4d1ff3a93dff1045e4b24653f blob + ba0d1f76e01b951246783a56dafb84adc00fc1c8 --- src/box/iproto.cc +++ src/box/iproto.cc @@ -945,7 +945,7 @@ iproto_connection_close(struct iproto_connection *con) * parsed data is processed. It's important this * is done only once. */ - con->p_ibuf->wpos -= con->parse_size; + ibuf_discard(con->p_ibuf, con->parse_size); mh_int_t node; mh_foreach(con->streams, node) { struct iproto_stream *stream = (struct iproto_stream *) @@ -1052,16 +1052,16 @@ iproto_connection_input_buffer(struct iproto_connectio } xibuf_reserve(new_ibuf, to_read + con->parse_size); - /* - * Discard unparsed data in the old buffer, otherwise it - * won't be recycled when all parsed requests are processed. - */ - old_ibuf->wpos -= con->parse_size; if (con->parse_size != 0) { /* Move the cached request prefix to the new buffer. */ - memcpy(new_ibuf->rpos, old_ibuf->wpos, con->parse_size); - new_ibuf->wpos += con->parse_size; + void *wpos = ibuf_alloc(new_ibuf, con->parse_size); + memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size); /* + * Discard unparsed data in the old buffer, otherwise it + * won't be recycled when all parsed requests are processed. + */ + ibuf_discard(old_ibuf, con->parse_size); + /* * We made ibuf idle. If obuf was already idle it * makes the both ibuf and obuf idle, time to trim * them. blob - bbe8ca3d02ac6ba3b67d013e5553f0d8f16531e7 blob + 99c5b31b1a3d997fa58f9f355c6536077d47a990 --- src/box/lua/merger.c +++ src/box/lua/merger.c @@ -115,8 +115,8 @@ decode_header(struct ibuf *buf, size_t *len_p) static void encode_header(struct ibuf *output_buffer, uint32_t result_len) { - ibuf_reserve(output_buffer, mp_sizeof_array(result_len)); - output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len); + void *wpos = xibuf_alloc(output_buffer, mp_sizeof_array(result_len)); + mp_encode_array(wpos, result_len); } /** @@ -596,10 +596,10 @@ luaL_merge_source_buffer_next(struct merge_source *bas return -1; } --source->remaining_tuple_count; - source->buf->rpos = (char *) tuple_end; if (format == NULL) format = tuple_format_runtime; struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end); + ibuf_consume_before(source->buf, tuple_end); if (tuple == NULL) return -1; @@ -1140,9 +1140,8 @@ encode_result_buffer(struct lua_State *L, struct merge merge_source_next(source, NULL, &tuple)) == 0 && tuple != NULL) { uint32_t bsize = tuple_bsize(tuple); - ibuf_reserve(output_buffer, bsize); - memcpy(output_buffer->wpos, tuple_data(tuple), bsize); - output_buffer->wpos += bsize; + void *wpos = xibuf_alloc(output_buffer, bsize); + memcpy(wpos, tuple_data(tuple), bsize); result_len_offset += bsize; ++result_len; blob - 7cba024bfd6c14cbefa0fa3fd94bb602ec29f6d9 blob + 8e14945356591a0702036a41e323b84540f26df4 --- src/box/lua/net_box.c +++ src/box/lua/net_box.c @@ -270,6 +270,8 @@ struct netbox_transport { struct ibuf send_buf; /** Connection receive buffer. */ struct ibuf recv_buf; + /** Size of the last received message. */ + size_t last_msg_size; /** Signalled when send_buf becomes empty. */ struct fiber_cond on_send_buf_empty; /** Next request id. */ @@ -523,6 +525,7 @@ netbox_transport_create(struct netbox_transport *trans iostream_clear(&transport->io); ibuf_create(&transport->send_buf, &cord()->slabc, NETBOX_READAHEAD); ibuf_create(&transport->recv_buf, &cord()->slabc, NETBOX_READAHEAD); + transport->last_msg_size = 0; fiber_cond_create(&transport->on_send_buf_empty); transport->next_sync = 1; transport->requests = mh_i64ptr_new(); @@ -580,6 +583,7 @@ netbox_transport_set_error(struct netbox_transport *tr /* Reset buffers. */ ibuf_reinit(&transport->send_buf); ibuf_reinit(&transport->recv_buf); + transport->last_msg_size = 0; fiber_cond_broadcast(&transport->on_send_buf_empty); /* Complete requests and clean up the hash. */ struct mh_i64ptr_t *h = transport->requests; @@ -1142,7 +1146,7 @@ netbox_transport_communicate(struct netbox_transport * "Peer closed"); return -1; } if (rc > 0) { - recv_buf->wpos += rc; + VERIFY(ibuf_alloc(recv_buf, rc) != NULL); } else if (rc == IOSTREAM_ERROR) { goto io_error; } else { @@ -1156,7 +1160,7 @@ netbox_transport_communicate(struct netbox_transport * ssize_t rc = iostream_write(io, send_buf->rpos, ibuf_used(send_buf)); if (rc >= 0) { - send_buf->rpos += rc; + ibuf_consume(send_buf, rc); if (ibuf_used(send_buf) == 0) fiber_cond_broadcast(on_send_buf_empty); } else if (rc == IOSTREAM_ERROR) { @@ -1193,6 +1197,7 @@ static int netbox_transport_send_and_recv(struct netbox_transport *transport, struct xrow_header *hdr) { + ibuf_consume(&transport->recv_buf, transport->last_msg_size); while (true) { size_t required; size_t data_len = ibuf_used(&transport->recv_buf); @@ -1212,10 +1217,11 @@ netbox_transport_send_and_recv(struct netbox_transport required = size + len; if (data_len >= required) { const char *body_end = rpos + len; - transport->recv_buf.rpos = (char *)body_end; - return xrow_header_decode( - hdr, &rpos, body_end, - /*end_is_exact=*/true); + int rc = xrow_header_decode( + hdr, &rpos, body_end, + /*end_is_exact=*/true); + transport->last_msg_size = body_end - bufpos; + return rc; } } if (netbox_transport_communicate(transport, required) != 0) blob - 646cad25f0a01fe27ac5992e6441393cb1a6bf15 blob + 06644b3825490ab2773a732c2567afc5db78ca7d --- src/box/lua/schema.lua +++ src/box/lua/schema.lua @@ -1941,7 +1941,7 @@ local function iterator_pos_set(index, pos, ibuf) iterator_pos_end[0] = iterator_pos[0] + #pos return true else - ibuf.rpos = ibuf.wpos + ibuf:consume(ibuf.wpos - ibuf.rpos) local tuple, tuple_end = tuple_encode(ibuf, pos) return builtin.box_index_tuple_position( index.space_id, index.id, tuple, tuple_end, blob - a38a4f84ef509e3ede17629d103e69b6209077e0 blob + 584fe32ab1a69a8b177e49bcd12cdc583f5b4066 --- src/box/lua/tuple.lua +++ src/box/lua/tuple.lua @@ -304,9 +304,7 @@ end local function tuple_to_msgpack(buf, tuple) assert(ffi.istype(tuple_t, tuple)) local bsize = builtin.box_tuple_bsize(tuple) - buf:reserve(bsize) - builtin.box_tuple_to_buf(tuple, buf.wpos, bsize) - buf.wpos = buf.wpos + bsize + builtin.box_tuple_to_buf(tuple, buf:alloc(bsize), bsize) end local function tuple_bsize(tuple) blob - 025346e149bfdf33357c4381d04ab95fdc3c30c4 blob + 14880200d1feeb1abd2210802505c7a1801abb97 --- src/httpc.c +++ src/httpc.c @@ -68,7 +68,7 @@ curl_easy_io_read_cb(char *buffer, size_t size, size_t size_t read_len = ibuf_len < buffer_size ? ibuf_len : buffer_size; memcpy(buffer, req->send.rpos, read_len); - req->send.rpos += read_len; + ibuf_consume(&req->send, read_len); fiber_cond_broadcast(&req->io_send_cond); return read_len; @@ -541,7 +541,7 @@ httpc_request_io_read(struct httpc_request *req, char if (copied == ibuf_len) ibuf_reset(&req->io_recv); else - req->io_recv.rpos += copied; + ibuf_consume(&req->io_recv, copied); } if (copied < len && recv_len > 0) { blob - 0d90a483c06d7d73aa6ee93a123c087651fcf702 blob + 110cc700c5b598bf722f3354f7113266bf2602d7 --- src/lib/core/coio_buf.h +++ src/lib/core/coio_buf.h @@ -50,7 +50,7 @@ coio_bread(struct iostream *io, struct ibuf *buf, size ssize_t n = coio_read_ahead(io, buf->wpos, sz, ibuf_unused(buf)); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -68,7 +68,7 @@ coio_bread_timeout(struct iostream *io, struct ibuf *b timeout); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -80,7 +80,7 @@ coio_breadn(struct iostream *io, struct ibuf *buf, siz ssize_t n = coio_readn_ahead(io, buf->wpos, sz, ibuf_unused(buf)); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -99,7 +99,7 @@ coio_breadn_timeout(struct iostream *io, struct ibuf * timeout); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } blob - e60dc7d04bb59c42cc71ca83f816d2da8e097ca2 blob + da7d49a18e8714a8252bcd2e014dda5b89953b89 --- src/lib/core/prbuf.c +++ src/lib/core/prbuf.c @@ -427,6 +427,7 @@ prbuf_reader_wrap(struct prbuf_reader *reader) ibuf_reset(&reader->buf); reader->pos = reader->data_begin; reader->read_pos = reader->pos; + reader->last_read_size = 0; } int @@ -445,6 +446,8 @@ prbuf_reader_next(struct prbuf_reader *reader, return 0; } + /* Consume the record data of the previous read. */ + ibuf_consume(&reader->buf, reader->last_read_size); /* Check if we hit end of buffer and need to wrap around. */ if (reader->data_end - reader->pos < (off_t)record_size_overhead) prbuf_reader_wrap(reader); @@ -472,14 +475,14 @@ prbuf_reader_next(struct prbuf_reader *reader, } /* Read record data. */ - reader->buf.rpos += record_size_overhead; + ibuf_consume(&reader->buf, record_size_overhead); if (prbuf_reader_ensure(reader, sz) != 0) return -1; entry->ptr = reader->buf.rpos; entry->size = sz; + reader->last_read_size = sz; reader->pos += full_sz; - reader->buf.rpos += sz; reader->unread_size -= full_sz; return 0; } blob - ab5303bb0d9a9412bdad072a9a93f116ded2805a blob + c06426984ddfb3ad87641eee86184d420091427f --- src/lib/core/prbuf.h +++ src/lib/core/prbuf.h @@ -95,6 +95,10 @@ struct prbuf_reader { */ size_t unread_size; /** + * Number of bytes in the last record read by client. + */ + size_t last_read_size; + /** * File offset of the beginning of the data area. * Data area is the area after header till the end of the buffer. */ @@ -122,6 +126,9 @@ prbuf_reader_create(struct prbuf_reader *reader, int f * Read the next record into entry argument. If there are no more records * then returned record will be a terminator (EOF, ptr == NULL && size == 0). * + * Pointer to data on successful read is valid until next call to this + * function. + * * After EOF the function can be called again and will return EOF. * * After failure reader is invalid and can only be closed. blob - c34298a3dc4d5fffc920b6ca3485ec0c425abd19 blob + 7d6ad9952415bfe858edafbe74c9ffa63c92d969 --- src/lua/buffer.lua +++ src/lua/buffer.lua @@ -152,6 +152,13 @@ local function ibuf_read(buf, size) local rpos = buf.rpos buf.rpos = rpos + size return rpos +end + +local function ibuf_consume(buf, size) + checkibuf(buf, 'consume') + checksize(buf, size) + utils.poison_memory_region(buf.rpos, size); + buf.rpos = buf.rpos + size end local function ibuf_serialize(buf) @@ -168,6 +175,7 @@ local ibuf_methods = { checksize = ibuf_checksize; read = ibuf_read; + consume = ibuf_consume; __serialize = ibuf_serialize; size = ibuf_used; blob - e694f1fc1db4e7d48e2a740fce11ce91258b24b3 blob + a8f058bf6acb41ec0df033d9c96cec42845b90a7 --- src/lua/httpc.lua +++ src/lua/httpc.lua @@ -424,7 +424,7 @@ local function io_read(self, opts, timeout) local len = check(self, chunk, delimiter) if len ~= nil then local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end @@ -438,15 +438,15 @@ local function io_read(self, opts, timeout) self._errno = nil local len = rbuf:size() local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data else - rbuf.wpos = rbuf.wpos + res + rbuf:alloc(res) local len = check(self, chunk, delimiter) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end end blob - 28581480497bde2d590a782e3f2de014239e099b blob + 2b6b59759d73c467827582b73f23a07fa8ee952d --- src/lua/socket.lua +++ src/lua/socket.lua @@ -705,7 +705,7 @@ local function read(self, limit, timeout, check, ...) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end @@ -720,15 +720,15 @@ local function read(self, limit, timeout, check, ...) self._errno = nil local len = rbuf:size() local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data elseif res ~= nil then - rbuf.wpos = rbuf.wpos + res + rbuf:alloc(res) local len = check(self, limit, ...) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end elseif not errno_is_transient[self._errno] then