commit ccab33084acf61eb553067d08beafba02625a8fc Author: GAO Xiaoxin Date: Tue Oct 6 01:09:33 2020 +0800 Issue #101056 Ack_receiver may lose the slave semi-sync ack due to net timeout Description: On a semi-sync master, the Ack_receiver loops over all semi-sync slaves to receive each slave's ack response packet and invoke reportReplyPacket. But if the Ack_receiver reads only part of a slave's ack packet before a net timeout, it can never receive that ack packet correctly afterwards. The reasons are: 1. Before my_net_read is invoked to read a client ack packet, "net_clear(&net, false);" is invoked, which clears the state of any previously partially read packet. 2. All slaves share one net.buff, so net_clear must be done before the next slave's ack packet is read. 3. my_net_read is not designed to handle partially received packets. On each call, it uses net->buff to receive the packet header and then uses net->buff again to receive the packet body. Each receive stores data starting at the beginning of net->buff, which means net->buff is effectively reset by every net_read_raw_loop call. So when a packet is only partially read, the received part is always discarded and the remainder cannot be received correctly. For semi-sync, the Ack_receiver may then report "packet out of order", "magic number error", or "larger than max_allowed_packet" when an ack packet is only partially received. How to repeat: Run the test case in the attached patch. Because ack packets are only partially received, the master does not get enough ack responses from the slaves, and the semi-sync timeout occurs. Suggested fix: To fix this bug: 1. use a separate net buffer for each semi-sync slave; 2.
store the partial-read state in the slave, and use this state to restore the NET object before invoking my_net_read. Add the following attributes to struct Slave: unsigned char *buff, *buff_end, *write_pos, *read_pos; unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; bool has_read_head; ulong body_len_nead; ulong has_read_len; uchar pkt_nr; uchar compress_pkt_nr; These attributes provide a separate net buffer for each slave and keep the partial-packet state. For struct NET, add the following attributes: bool need_handle_partial_read; bool partial_read_head_once; bool partial_read_body_once; bool has_read_head; ulong body_len_nead; ulong has_read_len; Each time, the slave's saved state is used to restore the NET before invoking my_net_read, and after the packet is handled, the current NET attributes are saved back into the corresponding slave. my_net_read then computes the remaining part of the packet from the NET state and returns the complete packet to reportReplyPacket. diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 88a6aba..80ac858 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -151,6 +151,12 @@ typedef struct NET { char last_error[512]; char sqlstate[5 + 1]; void *extension; + bool need_handle_partial_read; + bool partial_read_head_once; + bool partial_read_body_once; + bool has_read_head; + ulong body_len_nead; + ulong has_read_len; } NET; enum mysql_enum_shutdown_level { SHUTDOWN_DEFAULT = 0, diff --git a/include/mysql_com.h b/include/mysql_com.h index 0aea32e..c734cab 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -881,6 +881,12 @@ typedef struct NET { to maintain the server internal instrumentation for the connection.
*/ void *extension; + bool need_handle_partial_read; + bool partial_read_head_once; + bool partial_read_body_once; + bool has_read_head; + ulong body_len_nead; + ulong has_read_len; } NET; #define packet_error (~(unsigned long)0) diff --git a/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-master.opt b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-master.opt new file mode 100644 index 0000000..c92c4da --- /dev/null +++ b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-master.opt @@ -0,0 +1,2 @@ +$SEMISYNC_PLUGIN_OPT --skip-ssl +--force-restart diff --git a/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-slave.opt b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-slave.opt new file mode 100644 index 0000000..58029d2 --- /dev/null +++ b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial-slave.opt @@ -0,0 +1 @@ +$SEMISYNC_PLUGIN_OPT diff --git a/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.cnf b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.cnf new file mode 100644 index 0000000..e9c3f85 --- /dev/null +++ b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.cnf @@ -0,0 +1,23 @@ +!include ../my.cnf + +# Set innodb-write-io-threads and innodb-read-io-threads to 2, +# instead of the default value 4, so that the aio-max-nr limit +# is not exceeded due to the increased number of concurrent mysqld +# instances when MTR runs rpl tests with parallel 4 or more. 
+ +[mysqld.1] +server_id=1 + +[mysqld.2] +server_id=2 + +[mysqld.3] +server_id=3 +loose-innodb_write_io_threads= 2 +loose-innodb_read_io_threads= 2 + + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port +SERVER_MYSOCK_3= @mysqld.3.socket + diff --git a/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.test b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.test new file mode 100644 index 0000000..58d3d76 --- /dev/null +++ b/mysql-test/suite/rpl_nogtid/t/rpl_semi_sync_read_ack_partial.test @@ -0,0 +1,78 @@ +################################################################################ +# Semisync: test semi sync ack read partial +# +################################################################################ +--source include/not_group_replication_plugin.inc +--source include/have_debug.inc + +--let rpl_topology=1->2, 1->3 +--source include/rpl_init.inc +CREATE TABLE t1(c1 INT); +--source include/rpl_sync.inc + +--let $rpl_connection_name= server_1 +--source include/rpl_connection.inc +--source include/install_semisync_master.inc +--let $rpl_connection_name= server_2 +--source include/rpl_connection.inc +--source include/install_semisync_slave.inc +--let $rpl_connection_name= server_3 +--source include/rpl_connection.inc +--source include/install_semisync_slave.inc + +--let $rpl_connection_name= server_1 +--source include/rpl_connection.inc +--source include/wait_for_semisync_master_status_on.inc + +# It is for coverage test to cover the debug code. +SET GLOBAL rpl_semi_sync_master_trace_level= 255; + +--echo #################################################################### +--echo # Test Case: test read ack packet head partial +--echo #################################################################### +--source include/save_semisync_yesno_tx.inc + +SET @save_debug=@@global.debug; +SET GLOBAL DEBUG='d,semi_sync_ack_parital_read_head'; +# The statement will timeout, just make a smaller wait time. 
+SET GLOBAL rpl_semi_sync_master_timeout=3000; +# Wait all slave, so if the ack receive fail the semi-sync wait will timeout +SET GLOBAL rpl_semi_sync_master_wait_for_slave_count = 2; + +INSERT INTO t1 VALUES(1); + +--let $semi_sync_yes_tx_increment= 1 +--source include/assert_semisync_yesno_tx_increment.inc + +# do a normal insert +INSERT INTO t1 VALUES(2); + +--echo #################################################################### +--echo # Test Case: test read ack packet body partial +--echo #################################################################### +--source include/save_semisync_yesno_tx.inc + +SET GLOBAL DEBUG='d,semi_sync_ack_parital_read_body'; +INSERT INTO t1 VALUES(3); + +--let $semi_sync_yes_tx_increment= 1 +--source include/assert_semisync_yesno_tx_increment.inc + +SET GLOBAL DEBUG=''; + +--let $rpl_connection_name= server_2 +--source include/rpl_connection.inc +--source include/uninstall_semisync_slave.inc +--let $rpl_connection_name= server_3 +--source include/rpl_connection.inc +--source include/uninstall_semisync_slave.inc +--let $rpl_connection_name= server_1 +--source include/rpl_connection.inc +--source include/uninstall_semisync_master.inc + +--let $rpl_connection_name= server_1 +--source include/rpl_connection.inc +DROP TABLE t1; +CALL mtr.add_suppression(".* Timeout waiting for reply of binlog .*"); +CALL mtr.add_suppression(".* Failed to allocate memory for ack_array .*"); +--source include/rpl_end.inc diff --git a/plugin/semisync/semisync_master_ack_receiver.cc b/plugin/semisync/semisync_master_ack_receiver.cc index 678470a..a8aadef 100644 --- a/plugin/semisync/semisync_master_ack_receiver.cc +++ b/plugin/semisync/semisync_master_ack_receiver.cc @@ -141,6 +141,16 @@ bool Ack_receiver::add_slave(THD *thd) { slave.thread_id = thd->thread_id(); slave.server_id = thd->server_id; slave.compress_ctx.algorithm = enum_compression_algorithm::MYSQL_UNCOMPRESSED; + slave.buff = slave.net_buff; + slave.buff_end = slave.net_buff + 
REPLY_MESSAGE_MAX_LENGTH; + slave.write_pos = slave.buff; + slave.read_pos = slave.buff; + slave.has_read_head = false; + slave.body_len_nead = 0; + slave.has_read_len = 0; + slave.pkt_nr = 0; + slave.compress_pkt_nr = 0; + char *cmp_algorithm_name = thd->get_protocol()->get_compression_algorithm(); if (cmp_algorithm_name != nullptr) { enum enum_compression_algorithm algorithm = @@ -238,6 +248,46 @@ static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) { net->buff = buff; net->buff_end = buff + buff_len; net->read_pos = net->buff; +#ifndef DBUG_OFF + net->partial_read_head_once = false; + net->partial_read_body_once = false; +#endif + net->need_handle_partial_read = false; + net->has_read_head = false; + net->body_len_nead = 0; + net->has_read_len = 0; +} + +// Number of bytes of a slave's saved ack buffer that fit into net->buff. +static size_t saved_ack_len(const NET *net) { + size_t len = static_cast<size_t>(net->buff_end - net->buff); + return len < REPLY_MESSAGE_MAX_LENGTH ? len : REPLY_MESSAGE_MAX_LENGTH; +} + +// Restore a slave's partial-read state into the shared NET so my_net_read +// resumes where the previous read of this slave stopped. The saved bytes are +// copied into net->buff instead of pointing net->buff into the Slave: Slave +// objects are copied by value (e.g. listener.get_slave_obj) and live in a +// std::vector, so a pointer into a Slave must not be used once m_mutex is released. +static void set_net_attr_by_slave(NET *net, Slave *slave_obj) { + memcpy(net->buff, slave_obj->net_buff, saved_ack_len(net)); + net->read_pos = net->buff; + net->write_pos = net->buff; + net->has_read_head = slave_obj->has_read_head; + net->body_len_nead = slave_obj->body_len_nead; + net->has_read_len = slave_obj->has_read_len; + net->compress_pkt_nr = slave_obj->compress_pkt_nr; + net->pkt_nr = slave_obj->pkt_nr; +} + +// Save the NET's partial-read state, including the bytes received so far. +static void set_slave_attr_by_net(NET *net, Slave *slave_obj) { + memcpy(slave_obj->net_buff, net->buff, saved_ack_len(net)); + slave_obj->has_read_head = net->has_read_head; + slave_obj->body_len_nead = net->body_len_nead; + slave_obj->has_read_len = net->has_read_len; + slave_obj->compress_pkt_nr = net->compress_pkt_nr; + slave_obj->pkt_nr = net->pkt_nr; } void
Ack_receiver::run() { @@ -255,11 +305,15 @@ void Ack_receiver::run() { server_extn.m_after_header = nullptr; server_extn.compress_ctx.algorithm = MYSQL_UNCOMPRESSED; net.extension = &server_extn; + net.need_handle_partial_read = true; mysql_mutex_lock(&m_mutex); m_slaves_changed = true; mysql_mutex_unlock(&m_mutex); + bool has_test_semi_sync_ack_parital_read_head MY_ATTRIBUTE((unused)) = false; + bool has_test_semi_sync_ack_partial_read_body MY_ATTRIBUTE((unused)) = false; + while (true) { int ret; @@ -290,6 +344,19 @@ void Ack_receiver::run() { continue; } + DBUG_EXECUTE_IF("semi_sync_ack_parital_read_head", + if (!has_test_semi_sync_ack_parital_read_head) { + net.partial_read_head_once = true; + has_test_semi_sync_ack_parital_read_head = true; + } + ); + DBUG_EXECUTE_IF("semi_sync_ack_parital_read_body", + if (!has_test_semi_sync_ack_partial_read_body) { + net.partial_read_body_once = true; + has_test_semi_sync_ack_partial_read_body = true; + } + ); + set_stage_info(stage_reading_semi_sync_ack); i = 0; while (i < listener.number_of_slave_sockets() && m_status == ST_UP) { @@ -297,6 +364,13 @@ void Ack_receiver::run() { Slave slave_obj = listener.get_slave_obj(i); ulong len; net.vio = slave_obj.vio; + // Resume this slave's partial read; start clean if it is no longer tracked. + mysql_mutex_lock(&m_mutex); + Slave *slave_real = get_slave_safe(i, slave_obj.server_id); + if (slave_real) set_net_attr_by_slave(&net, slave_real); + else net_clear(&net, false); + mysql_mutex_unlock(&m_mutex); + /* Set compress flag.
This is needed to support Slave_compress_protocol flag enabled Slaves @@ -309,15 +383,21 @@ void Ack_receiver::run() { (server_extension->compress_ctx.algorithm == MYSQL_ZSTD); do { - net_clear(&net, false); - len = my_net_read(&net); - if (likely(len != packet_error)) + if (likely(len != packet_error)) { repl_semisync->reportReplyPacket(slave_obj.server_id, net.read_pos, len); + net_clear(&net, false); + } else if (net.last_errno == ER_NET_READ_ERROR) listener.clear_socket_info(i); } while (net.vio->has_data(net.vio) && m_status == ST_UP); + mysql_mutex_lock(&m_mutex); + slave_real = get_slave_safe(i, slave_obj.server_id); + if (slave_real) + set_slave_attr_by_net(&net, slave_real); + mysql_mutex_unlock(&m_mutex); + } i++; } diff --git a/plugin/semisync/semisync_master_ack_receiver.h b/plugin/semisync/semisync_master_ack_receiver.h index fd385be..c596619 100644 --- a/plugin/semisync/semisync_master_ack_receiver.h +++ b/plugin/semisync/semisync_master_ack_receiver.h @@ -41,6 +41,14 @@ struct Slave { bool is_leaving; my_socket sock_fd() const { return vio->mysql_socket.fd; } + // Partial ack-read state, saved and restored around my_net_read() calls. + unsigned char *buff, *buff_end, *write_pos, *read_pos; + unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + bool has_read_head; + ulong body_len_nead; + ulong has_read_len; + uchar pkt_nr; + uchar compress_pkt_nr; }; typedef std::vector Slave_vector; @@ -109,6 +117,18 @@ class Ack_receiver : public ReplSemiSyncBase { return false; } + // Caller must hold m_mutex. Returns nullptr if server_id is not in m_slaves. + Slave *get_slave_safe(uint pos, uint server_id) { + // Fast path: listener position pos normally mirrors m_slaves[pos]; verify it. + if (!m_slaves_changed && pos < m_slaves.size() && + m_slaves[pos].server_id == server_id) + return &m_slaves[pos]; + for (uint i = 0; i < m_slaves.size(); i++) { + if (m_slaves[i].server_id == server_id) return &m_slaves[i]; + } + return nullptr; + } + private: enum status { ST_UP, ST_DOWN, ST_STOPPING }; uint8 m_status; diff --git a/sql-common/net_serv.cc b/sql-common/net_serv.cc index 29669f4..b397386 100644 --- a/sql-common/net_serv.cc +++ b/sql-common/net_serv.cc @@ -151,6 +151,13 @@ bool my_net_init(NET *net, Vio *vio) {
ext->compress_ctx.algorithm = enum_compression_algorithm::MYSQL_UNCOMPRESSED; net->extension = ext; #endif + net->partial_read_head_once = false; + net->partial_read_body_once = false; + net->need_handle_partial_read = false; + net->has_read_head = false; + net->body_len_nead = 0; + net->has_read_len = 0; + if (vio) { /* For perl DBI/DBD. */ net->fd = vio_fd(vio); @@ -239,6 +246,10 @@ void net_clear(NET *net, bool check_buffer MY_ATTRIBUTE((unused))) { /* Ready for new command */ net->pkt_nr = net->compress_pkt_nr = 0; net->write_pos = net->buff; + net->has_read_head = false; + net->body_len_nead = 0; + net->has_read_len = 0; + if (net->need_handle_partial_read) net->error = 0; } /** Flush write_buffer if not empty. */ @@ -1216,6 +1227,20 @@ static bool net_read_raw_loop(NET *net, size_t count) { unsigned int retry_count = 0; uchar *buf = net->buff + net->where_b; + if (net->need_handle_partial_read) { + count -= net->has_read_len; + buf += net->has_read_len; + } + size_t need_len = count; +#ifndef DBUG_OFF +#ifdef MYSQL_SERVER + if (net->has_read_head && net->partial_read_body_once) { + count = count - 4; // leave the last 4 body bytes unread to simulate a partial-read timeout + } +#endif +#endif + + while (count) { size_t recvcnt = vio_read(net->vio, buf, count); @@ -1239,6 +1264,21 @@ static bool net_read_raw_loop(NET *net, size_t count) { thd_increment_bytes_received(recvcnt); #endif } + if (net->need_handle_partial_read) + net->has_read_len += need_len - count; + +#ifndef DBUG_OFF +#ifdef MYSQL_SERVER + if (net->has_read_head && net->partial_read_body_once) { + net->partial_read_body_once = false; + net->last_errno = ER_NET_READ_INTERRUPTED; + net->error = 2; + net->has_read_len = need_len - 4; + net->has_read_head = true; + return true; + } +#endif +#endif /* On failure, propagate the error code.
*/ if (count) { @@ -1287,6 +1327,13 @@ static bool net_read_packet_header(NET *net) { size_t count = NET_HEADER_SIZE; bool rc; +#ifndef DBUG_OFF +#ifdef MYSQL_SERVER + if (net->partial_read_head_once) { + --count; //not read the whole head size to simulate partial packet read timeout exception + } +#endif +#endif if (net->compress) count += COMP_HEADER_SIZE; #ifdef MYSQL_SERVER @@ -1309,7 +1356,18 @@ static bool net_read_packet_header(NET *net) { } if (rc) return true; - +#ifndef DBUG_OFF +#ifdef MYSQL_SERVER + if (net->partial_read_head_once) { + net->partial_read_head_once = false; + net->last_errno = ER_NET_READ_INTERRUPTED; + net->error = 2; + net->has_read_len = NET_HEADER_SIZE - 1; + net->has_read_head = false; + return true; + } +#endif +#endif DBUG_DUMP("packet_header", net->buff + net->where_b, NET_HEADER_SIZE); pkt_nr = net->buff[net->where_b + 3]; @@ -1589,6 +1647,12 @@ static size_t net_read_packet(NET *net, size_t *complen) { net->reading_or_writing = 1; + if (net->need_handle_partial_read && net->has_read_head) { + pkt_len = net->body_len_nead; + if (net_read_raw_loop(net, pkt_len)) goto error; + goto end; + } + /* Retrieve packet length and number. */ if (net_read_packet_header(net)) goto error; @@ -1621,6 +1685,11 @@ static size_t net_read_packet(NET *net, size_t *complen) { if ((pkt_data_len >= net->max_packet) && net_realloc(net, pkt_data_len)) goto error; + if (net->need_handle_partial_read) { + net->has_read_head = true; + net->body_len_nead = pkt_len; + net->has_read_len = 0; + } /* Read the packet data (payload). */ if (net_read_raw_loop(net, pkt_len)) goto error;