diff --git a/client/mysqltest.cc b/client/mysqltest.cc index 7e4d673608f..5b7a780f302 100644 --- a/client/mysqltest.cc +++ b/client/mysqltest.cc @@ -6425,10 +6425,6 @@ static void do_connect(struct st_command *command) { opt_protocol = MYSQL_PROTOCOL_PIPE; } - if (opt_compress || con_compress) { - enable_async_client = false; - } - if (opt_protocol) { mysql_options(&con_slot->mysql, MYSQL_OPT_PROTOCOL, (char *)&opt_protocol); /* @@ -9230,10 +9226,6 @@ int main(int argc, char **argv) { #endif SSL_SET_OPTIONS(&con->mysql); - if (opt_compress) { - enable_async_client = false; - } - #if defined(_WIN32) if (shared_memory_base_name) mysql_options(&con->mysql, MYSQL_SHARED_MEMORY_BASE_NAME, diff --git a/include/mysql.h.pp b/include/mysql.h.pp index e34437e6fc9..7d1262669ac 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -153,13 +153,17 @@ typedef struct NET { size_t async_bytes_wanted; enum net_async_read_packet_state async_packet_read_state; size_t async_packet_length; + size_t async_packet_uncompressed_length; + size_t multi_packet_offset; unsigned char *async_write_headers; struct iovec *async_write_vector; size_t async_write_vector_size; size_t async_write_vector_current; - unsigned char - inline_async_write_header[4 + 3 + 1 + 1]; + unsigned char inline_async_write_header[4 + 3 + + 4 + 1 + 1]; struct iovec inline_async_write_vector[3]; + unsigned char **compressed_write_buffers; + size_t compressed_buffers_size; unsigned long async_multipacket_read_saved_whereb; unsigned long async_multipacket_read_total_len; bool async_multipacket_read_started; diff --git a/include/mysql_com.h b/include/mysql_com.h index aba33742e5d..2b96eb94e4e 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -925,6 +925,8 @@ typedef struct NET { enum net_async_read_packet_state async_packet_read_state; /* Size of the packet we're currently reading */ size_t async_packet_length; + size_t async_packet_uncompressed_length; + size_t multi_packet_offset; /** Headers and vector
for our async writes; see net_serv.c for @@ -934,9 +936,11 @@ typedef struct NET { struct iovec *async_write_vector; size_t async_write_vector_size; size_t async_write_vector_current; - unsigned char - inline_async_write_header[NET_HEADER_SIZE + COMP_HEADER_SIZE + 1 + 1]; + unsigned char inline_async_write_header[NET_HEADER_SIZE + COMP_HEADER_SIZE + + NET_HEADER_SIZE + 1 + 1]; struct iovec inline_async_write_vector[3]; + unsigned char **compressed_write_buffers; + size_t compressed_buffers_size; /* State for reading responses that are larger than MAX_PACKET_LENGTH */ unsigned long async_multipacket_read_saved_whereb; diff --git a/sql-common/net_serv.cc b/sql-common/net_serv.cc index 7968fcca48f..5f668602d62 100644 --- a/sql-common/net_serv.cc +++ b/sql-common/net_serv.cc @@ -88,6 +88,7 @@ extern void thd_increment_bytes_received(size_t length); #endif static bool net_write_buff(NET *, const uchar *, size_t); +static uchar *compress_packet(NET *net, const uchar *packet, size_t *length); static void reset_packet_write_state(NET *net) { DBUG_ENTER(__func__); @@ -107,6 +108,20 @@ static void reset_packet_write_state(NET *net) { net->async_write_vector_size = 0; net->async_write_vector_current = 0; + + if (net->compressed_write_buffers) { + // Free the compressed payloads produced by compress_packet(); headers + // live in their own buffer and are not tracked here.
+ // compressed_buffers_size counts only the slots actually filled, so a + // partially built write (error path) is released correctly as well. + for (size_t i = 0; i < net->compressed_buffers_size; ++i) { + my_free(net->compressed_write_buffers[i]); + } + my_free(net->compressed_write_buffers); + net->compressed_write_buffers = nullptr; + net->compressed_buffers_size = 0; + } + DBUG_VOID_RETURN; } @@ -139,6 +154,9 @@ bool my_net_init(NET *net, Vio *vio) { net->async_packet_read_state = NET_ASYNC_PACKET_READ_IDLE; net->async_write_vector = nullptr; net->async_write_headers = nullptr; + net->multi_packet_offset = 0; + net->compressed_write_buffers = nullptr; + net->compressed_buffers_size = 0; if (vio) { /* For perl DBI/DBD. */ @@ -436,6 +454,10 @@ static int begin_packet_write_state(NET *net, uchar command, const uchar *optional_prefix, size_t prefix_len) { DBUG_ENTER(__func__); + size_t header_len = NET_HEADER_SIZE; + if (net->compress) { + header_len += NET_HEADER_SIZE + COMP_HEADER_SIZE; + } size_t total_len = packet_len + prefix_len; bool include_command = (command < COM_END); if (include_command) { @@ -446,6 +468,8 @@ static int begin_packet_write_state(NET *net, uchar command, struct iovec *vec; uchar *headers; + uchar **compressed_buffers = nullptr; + if (total_len < MAX_PACKET_LENGTH) { // Most writes hit this case, ie, less than MAX_PACKET_LENGTH of querytext. vec = net->inline_async_write_vector; @@ -459,20 +483,34 @@ static int begin_packet_write_state(NET *net, uchar command, DBUG_RETURN(0); } - headers = (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, - packet_count * (NET_HEADER_SIZE + 1), - MYF(MY_ZEROFILL)); + // Extra byte to header_len for command. + headers = + (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, + packet_count * (header_len + 1), MYF(MY_ZEROFILL)); if (!headers) { my_free(vec); DBUG_RETURN(0); } } - // Regardless of where vec and headers come from, these are what we - // feed to writev and populate below.
+ // Regardless of where vec and headers come from, these are what we feed to + // writev and populate below. net->async_write_vector = vec; net->async_write_headers = headers; + if (net->compress) { + // Will need to hand compress and manage at most 1 buffer per packet + compressed_buffers = + (uchar **)my_malloc(key_memory_NET_compress_packet, + sizeof(uchar *) * packet_count, MYF(MY_ZEROFILL)); + if (!compressed_buffers) { + reset_packet_write_state(net); + DBUG_RETURN(0); + } + } + + net->compressed_write_buffers = compressed_buffers; + // We sneak the command into the first header, so the special casing // below about packet_num == 0 relates to that. This lets us avoid // an extra allocation and copying the input buffers again. @@ -483,16 +521,35 @@ static int begin_packet_write_state(NET *net, uchar command, // amount of data, so that one actually might consume *three* iovec // entries. for (size_t packet_num = 0; packet_num < packet_count; ++packet_num) { - // First packet, our header. - uchar *buf = headers + packet_num * NET_HEADER_SIZE; - if (packet_num > 0) { - // First packet stole one extra byte from the header buffer for - // the command number, so account for it here. - ++buf; - } - size_t header_len = NET_HEADER_SIZE; + // The first iovec contains the headers only and command if it is + // provided. + uchar *buf = headers + packet_num * (header_len + 1); size_t bytes_queued = 0; + (*vec).iov_base = buf; + (*vec).iov_len = header_len; + + // If using compression, add the compression header. Usually, we would + // rely on compress_packet to add compression headers, but here we assume + // that headers do not compress well due to their short length and send + // them as is by constructing our own packet and incrementing + // compress_pkt_nr manually. + // + // We don't compress the headers together with the payload because that + // would mean extra memcpy's to concatenate the buffers to pass into + // compress_packet.
+ if (net->compress) { + size_t comp_len = NET_HEADER_SIZE; + if (packet_num == 0) { + comp_len += prefix_len + (include_command ? 1 : 0); + } + int3store(buf, comp_len); + buf[3] = (uchar)net->compress_pkt_nr++; + // Uncompressed length 0 means "sent raw"; set it, don't assume zero-fill. + int3store(buf + NET_HEADER_SIZE, 0); + buf += NET_HEADER_SIZE + COMP_HEADER_SIZE; + } + size_t packet_size = min(MAX_PACKET_LENGTH, total_len); int3store(buf, packet_size); buf[3] = (uchar)net->pkt_nr++; @@ -502,25 +559,23 @@ static int begin_packet_write_state(NET *net, uchar command, // separate one-byte entry in our iovec. if (packet_num == 0 && include_command) { buf[4] = command; - ++header_len; + (*vec).iov_len++; // Our command byte counts against the packet size. ++bytes_queued; } - (*vec).iov_base = buf; - (*vec).iov_len = header_len; ++vec; - // Second packet, our optional prefix (if any). + // Second iovec (if any), our optional prefix. if (packet_num == 0 && optional_prefix != NULL) { (*vec).iov_base = (void *)optional_prefix; (*vec).iov_len = prefix_len; - ++vec; bytes_queued += prefix_len; + ++vec; } - // Final packet, the payload itself. Send however many bytes from - // packet we have left, and advance our packet pointer. + // Final iovec, the payload itself. Send however many bytes from packet we + // have left, and advance our packet pointer. size_t remaining_bytes = packet_size - bytes_queued; (*vec).iov_base = (void *)packet; (*vec).iov_len = remaining_bytes; @@ -530,6 +585,38 @@ static int begin_packet_write_state(NET *net, uchar command, packet += remaining_bytes; total_len -= bytes_queued; + // If we have a payload to compress, then compress_packet will + // add compression headers for us. This is what we have at this point where + // each line is an iovec.
+ // | len |cpn|uncompress len| len | pn | command | + // | 4 + prefix + command | 0| 0| total_len = command + prefix + payload | 0 | COM_* | + // + // | prefix | + // | ... | + // + // | payload | + // | ... | + // + // We want to transform into this: + // | len |cpn|uncompress len| len | pn | command | + // | 4 + prefix + command | 0| 0| total_len = command + prefix + payload | 0 | COM_* | + // + // | prefix | + // | ... | + // + // | len |cpn|uncompress len| compressed payload | + // | len(compressed payload) | 1| len(payload)| compress(payload) | + if (net->compress && remaining_bytes) { + (*vec).iov_base = + compress_packet(net, (uchar *)(*vec).iov_base, &(*vec).iov_len); + if (!(*vec).iov_base) { + reset_packet_write_state(net); + DBUG_RETURN(0); + } + compressed_buffers[net->compressed_buffers_size++] = + (uchar *)(*vec).iov_base; + } + ++vec; // Make sure we sent entire packets. @@ -544,6 +631,12 @@ static int begin_packet_write_state(NET *net, uchar command, net->async_write_vector_size = (vec - net->async_write_vector); net->async_write_vector_current = 0; + // This is needed because the packet reading code in net_read_packet_header + // uses pkt_nr for verification.
+ if (net->compress) { + net->pkt_nr = net->compress_pkt_nr; + } + DBUG_RETURN(1); } @@ -1380,7 +1473,11 @@ static net_async_status net_read_packet_header_nonblock(NET *net, bool *err_ptr) { DBUG_ENTER(__func__); uchar pkt_nr; - if (net_read_data_nonblocking(net, NET_HEADER_SIZE, err_ptr) == + size_t bytes_wanted = NET_HEADER_SIZE; + if (net->compress) { + bytes_wanted += COMP_HEADER_SIZE; + } + if (net_read_data_nonblocking(net, bytes_wanted, err_ptr) == NET_ASYNC_NOT_READY) { DBUG_RETURN(NET_ASYNC_NOT_READY); } @@ -1388,7 +1485,7 @@ static net_async_status net_read_packet_header_nonblock(NET *net, DBUG_RETURN(NET_ASYNC_COMPLETE); } - DBUG_DUMP("packet_header", net->buff + net->where_b, NET_HEADER_SIZE); + DBUG_DUMP("packet_header", net->buff + net->where_b, bytes_wanted); pkt_nr = net->buff[net->where_b + 3]; @@ -1447,10 +1544,27 @@ static net_async_status net_read_packet_nonblocking(NET *net, ulong *ret, net->async_packet_length = uint3korr(net->buff + net->where_b); DBUG_PRINT("info", ("async packet len: %lu", net->async_packet_length)); + // If this is using the compressed protocol, the next 3 bytes contain + // the length of the uncompressed contents of the payload. + if (net->compress) { + // The following uint3korr() may read 4 bytes, so make sure we don't + // read unallocated or uninitialized memory. The right-hand expression + // must match the size of the buffer allocated in net_realloc(). + DBUG_ASSERT(net->where_b + NET_HEADER_SIZE + sizeof(uint32) <= + net->max_packet + NET_HEADER_SIZE + COMP_HEADER_SIZE + 1); + + net->async_packet_uncompressed_length = + uint3korr(net->buff + net->where_b + NET_HEADER_SIZE); + } else { + net->async_packet_uncompressed_length = 0; + } + /* End of big multi-packet.
*/ if (!net->async_packet_length) goto end; - pkt_data_len = max(net->async_packet_length, *complen) + net->where_b; + pkt_data_len = + max(net->async_packet_length, net->async_packet_uncompressed_length) + + net->where_b; /* Expand packet buffer if necessary. */ if ((pkt_data_len >= net->max_packet) && net_realloc(net, pkt_data_len)) @@ -1483,6 +1597,20 @@ end: net->read_pos[*ret] = 0; net->reading_or_writing = 0; + + if (net->compress) { + *complen = net->async_packet_uncompressed_length; + if (my_uncompress(net->buff + net->where_b, net->async_packet_length, + complen)) { + net->error = 2; // caller will close socket + net->last_errno = ER_NET_UNCOMPRESS_ERROR; +#ifdef MYSQL_SERVER + my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0)); +#endif + *ret = *complen = packet_error; + DBUG_RETURN(NET_ASYNC_COMPLETE); + } + } DBUG_RETURN(NET_ASYNC_COMPLETE); error: @@ -1554,9 +1682,141 @@ error: return packet_error; } +/* + * NET FIELDS + * net->buff the head of the buffer + * net->buf_length buff + buf_length is the end of the buffered data + * net->remain_in_buf the bytes in [buf_length - remain_in_buf, buf_length) + * are buffered but not yet returned to the caller + * first_packet_offset Points to the header of the packet to be returned + * curr_packet_offset In multipackets, points to the next header that will + * be erased. + * + * To return Packet 2 + ---------------------------------------------- + |H1|P1|H2|P2|H3|P3|...
+ ---------------------------------------------- + ^ ^ ^ ^ + ^ ^ ^ net->where_b (used for network writes) + ^ ^ ^ net->buf_length (end of readable bytes) + ^ ^ ^ + ^ ^ net->read_pos (Return stripped packet) + ^ ^ + ^ first_packet_offset + ^ + net->buff +*/ +net_async_status my_net_read_compressed_nonblocking(NET *net, ulong *len_ptr, + ulong *complen_ptr) { + ulong curr_packet_offset; + ulong first_packet_offset; + uint read_length, multi_byte_packet = 0; + + if (net->remain_in_buf) { + first_packet_offset = curr_packet_offset = + (net->buf_length - net->remain_in_buf); + net->buff[curr_packet_offset] = net->save_char; + } else { + net->buf_length = first_packet_offset = curr_packet_offset = 0; + } + + for (;;) { + if (net->buf_length - curr_packet_offset >= NET_HEADER_SIZE) { + read_length = uint3korr(net->buff + curr_packet_offset); + if (!read_length) { + curr_packet_offset += NET_HEADER_SIZE; + break; + } + if (read_length + NET_HEADER_SIZE <= + net->buf_length - curr_packet_offset) { + // Strip headers from subsequent packets in multi-packets + if (multi_byte_packet) { + memmove(net->buff + curr_packet_offset, + net->buff + curr_packet_offset + NET_HEADER_SIZE, + net->buf_length - curr_packet_offset - NET_HEADER_SIZE); + + // curr_packet_offset is updated below + net->multi_packet_offset += read_length; + net->buf_length -= NET_HEADER_SIZE; + net->remain_in_buf -= NET_HEADER_SIZE; + } + + if (read_length < MAX_PACKET_LENGTH) { + // Subtract multi byte packet to account for stripped headers + curr_packet_offset += + read_length + NET_HEADER_SIZE - multi_byte_packet; + multi_byte_packet = 0; + break; + } else { + if (!net->multi_packet_offset) { + net->multi_packet_offset = read_length + NET_HEADER_SIZE; + } + curr_packet_offset = first_packet_offset + net->multi_packet_offset; + + multi_byte_packet = NET_HEADER_SIZE; + } + continue; + } + } + + // If we reach here, we need to read from the network. + // Start by clearing the front of the buffer.
+ if (first_packet_offset) { + memmove(net->buff, net->buff + first_packet_offset, + net->buf_length - first_packet_offset); + net->buf_length -= first_packet_offset; + curr_packet_offset -= first_packet_offset; + first_packet_offset = 0; + } + + net->where_b = net->buf_length; + if (net_read_packet_nonblocking(net, len_ptr, complen_ptr) == + NET_ASYNC_NOT_READY) { + net->save_char = net->buff[first_packet_offset]; + *len_ptr = 0; + *complen_ptr = 0; + return NET_ASYNC_NOT_READY; + } + + if (*len_ptr == packet_error) { + return NET_ASYNC_COMPLETE; + } + + *len_ptr = *complen_ptr; + + net->buf_length += *complen_ptr; + net->remain_in_buf += *complen_ptr; + } + + net->multi_packet_offset = 0; + net->read_pos = net->buff + first_packet_offset + NET_HEADER_SIZE; + + net->remain_in_buf = (ulong)(net->buf_length - curr_packet_offset); + size_t len = ((ulong)(curr_packet_offset - first_packet_offset) - + NET_HEADER_SIZE - multi_byte_packet); + if (net->remain_in_buf) { + // If multi_byte_packet is non-zero then there is a zero length + // packet at read_pos[len]. Adding the size of one header + // reads the correct byte that will later be replaced. Guarded + // to avoid buffer overflow: if remain_in_buf is 0 the saved char + // won't be restored anyway. + net->save_char = net->read_pos[len + multi_byte_packet]; + } + net->read_pos[len] = 0; // Safeguard for mysql_use_result + + *len_ptr = *complen_ptr = len; + return NET_ASYNC_COMPLETE; +} + net_async_status my_net_read_nonblocking(NET *net, ulong *len_ptr) { size_t len, complen; - if (!net->compress) { + if (net->compress) { + if (my_net_read_compressed_nonblocking(net, &len, &complen) == + NET_ASYNC_NOT_READY) { + return NET_ASYNC_NOT_READY; + } + *len_ptr = len; + } else { if (net->async_multipacket_read_started == false) { net->async_multipacket_read_started = true; net->async_multipacket_read_saved_whereb = net->where_b;