commit 706b3a582268f0afa9056277266f78723c8e6512 Author: GAO Xiaoxin Date: Thu Feb 27 00:19:33 2020 +0800 Bug #94803 rpl sql_thread may broken due to XAER_RMFAIL error for unfinished xa transaction Description: If the slave mysql generate a partial relay log with a unfinished xa transaction, the sql_thread of this slave will broken due to the following error: " Error 'XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state' on query. Default database: ''. Query: 'ROLLBACK'". How to repeat: 1. do a big xa transaction in master mysql, we name this transaction as trx1 2. shutdown the slave mysql before it finishes writing the relay log of trx1, so the relay log of trx1 is partial 3. restart slave mysql without auto-start slave, and start the sql_thread only. 4. use show slave status\G to check the slave error. Suggested fix: 1. In coord_handle_partial_binlogged_transaction, we should check it is a external xa transaction 2. if yes, check the xid_state 3. if xid_state is XA_ACTIVE, generate and apply the Query_log_event "XA END", then generate and apply the Query_log_event "XA ROLLBACK" 4. if xid_state is XA_IDLE, generate and apply the Query_log_event "XA ROLLBACK" For xa rollback event execution, the xid_state should be checked again after execution ha_rollback_low: 1. if the xid_state is XA_ACTIVE, we should invoke gtid_state->update_on_rollback to avoid the unexpected modification to executed_gtids 2. if the xid_state is XA_IDEL, we invoke gtid_state->update_on_commit diff --git a/sql/binlog.cc b/sql/binlog.cc index b0fe898..6ee0e69 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -2607,8 +2607,14 @@ end: { error= ha_rollback_low(thd, all); /* Successful XA-rollback commits the new gtid_state */ - if (!error && !thd->is_error()) - gtid_state->update_on_commit(thd); + if (!error && !thd->is_error()) { + bool is_xa_rollback= thd->get_transaction()->xid_state()->has_state(XID_STATE::XA_PREPARED); + if (is_xa_rollback) { + gtid_state->update_on_commit(thd); + } else { + gtid_state->update_on_rollback(thd); + } + } } /* When a statement errors out on auto-commit mode it is rollback diff --git a/sql/rpl_slave.cc b/sql/rpl_slave.cc index 12b571e..64be358 100644 --- a/sql/rpl_slave.cc +++ b/sql/rpl_slave.cc @@ -5011,6 +5011,33 @@ apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli) SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK); } +Log_event *get_xa_rollback_event(THD *thd, const char *sql_buf, ulong data_len) +{ + char *buf= 0; + if (!(buf = (char*) my_malloc(key_memory_log_event, + data_len + 1, MYF(MY_WME)))) + { + return NULL; + } + buf[data_len] = 0; + memcpy(buf, sql_buf, data_len); + + Log_event *rollback_event= new Query_log_event(thd, + buf, + data_len, + true, /* using_trans */ + false, /* immediate */ + true, /* suppress_use */ + 0, /* error */ + true /* ignore_command */); + + rollback_event->register_temp_buf(buf); + ((Query_log_event*) rollback_event)->db= ""; + rollback_event->common_header->data_written= 0; + + return rollback_event; +} + /** Let the worker applying the current group to rollback and gracefully finish its work before. @@ -5065,6 +5092,61 @@ static bool coord_handle_partial_binlogged_transaction(Relay_log_info *rli, mysql_mutex_lock(&rli->data_lock); } + THD *worker_thd= (rli->last_assigned_worker ? + rli->last_assigned_worker->info_thd : thd); + XID_STATE *xid_state= worker_thd->get_transaction()->xid_state(); + if (!xid_state->get_xid()->get_my_xid()) { //It is an external xa transaction + char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str + + std::string xa_end_sql("XA END "); + xa_end_sql.append(xid_state->get_xid()->xid_to_str(buf)); + std::string xa_rollback_sql("XA ROLLBACK "); + xa_rollback_sql.append(xid_state->get_xid()->xid_to_str(buf)); + + if (xid_state->has_state(XID_STATE::XA_ACTIVE)) { + DBUG_PRINT("info",("Injecting QUERY(XA END) to rollback worker")); + Log_event *end_event = get_xa_rollback_event(thd, xa_end_sql.c_str(), xa_end_sql.length()); + if (!end_event) + DBUG_RETURN(true); + end_event->server_id= ev->server_id; + /* + We must be careful to avoid SQL thread increasing its position + farther than the event that triggered this QUERY(ROLLBACK). + */ + end_event->common_header->log_pos= ev->common_header->log_pos; + end_event->future_event_relay_log_pos= ev->future_event_relay_log_pos; + + if (apply_event_and_update_pos(&end_event, thd, rli) != + SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK) + { + delete end_event; + DBUG_RETURN(true); + } + } + if (xid_state->has_state(XID_STATE::XA_ACTIVE) || xid_state->has_state(XID_STATE::XA_IDLE)) { + DBUG_PRINT("info",("Injecting QUERY(XA ROLLBACK) to rollback worker")); + Log_event *rollback_event = get_xa_rollback_event(thd, xa_rollback_sql.c_str(), xa_rollback_sql.length()); + if (!rollback_event) + DBUG_RETURN(true); + rollback_event->server_id= ev->server_id; + /* + We must be careful to avoid SQL thread increasing its position + farther than the event that triggered this QUERY(ROLLBACK). + */ + rollback_event->common_header->log_pos= ev->common_header->log_pos; + rollback_event->future_event_relay_log_pos= ev->future_event_relay_log_pos; + + if (apply_event_and_update_pos(&rollback_event, thd, rli) != + SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK) + { + delete rollback_event; + DBUG_RETURN(true); + } + mysql_mutex_lock(&rli->data_lock); + DBUG_RETURN(false); + } + } + DBUG_PRINT("info",("Injecting QUERY(ROLLBACK) to rollback worker")); Log_event *rollback_event= new Query_log_event(thd, STRING_WITH_LEN("ROLLBACK"), @@ -5947,6 +6029,13 @@ ignore_log_space_limit=%d", DBUG_EXECUTE_IF("stop_io_after_queuing_event", thd->killed= THD::KILLED_NO_VALUE; ); + DBUG_EXECUTE_IF("stop_io_after_reading_delete_rows_log_event", + if (event_buf[EVENT_TYPE_OFFSET] == binary_log::DELETE_ROWS_EVENT) { + thd->killed= THD::KILLED_NO_VALUE; + continue; + } + ); + /* After event is flushed to relay log file, memory used by thread's mem_root is not required any more.