diff --git a/libbinlogevents/include/statement_events.h b/libbinlogevents/include/statement_events.h
index 47b976c..92be891 100644
--- a/libbinlogevents/include/statement_events.h
+++ b/libbinlogevents/include/statement_events.h
@@ -552,7 +552,7 @@ public:
   bool flags2_inited;
   bool sql_mode_inited;
   bool charset_inited;
-
+  char m_release_query_buf;// How ~Query_event() releases query: 0: not at all; 1: free(); 2: delete[].
   uint32_t flags2;
   /* In connections sql_mode is 32 bits now but will be 64 bits soon */
   uint64_t sql_mode;
@@ -653,8 +653,35 @@ public:
   Query_event(Log_event_type type_arg= QUERY_EVENT);
   virtual ~Query_event()
   {
+    /*
+      Release the query buffer only if ownership was handed over through
+      set_release_query(); in the default mode (0) the buffer is not freed
+      here.
+    */
+    if (query)
+    {
+      if (m_release_query_buf == 1)
+        free(const_cast<char *>(query));
+      else if (m_release_query_buf == 2)
+        delete [] (const_cast<char *>(query));
+
+      query= NULL;
+    }
   }
+  /**
+    Select how the destructor releases this->query.
+
+    @param how set to 1 to free(this->query);
+               set to 2 to delete[] this->query;
+               any other value is ignored and leaves the current mode
+               unchanged.
+  */
+  void set_release_query(char how)
+  {
+    if (how == 1 || how == 2)
+      m_release_query_buf= how;
+  }

 #ifndef HAVE_MYSYS
   void print_event_info(std::ostream& info);
   void print_long_info(std::ostream& info);
diff --git a/libbinlogevents/src/statement_events.cpp b/libbinlogevents/src/statement_events.cpp
index 9c01370..0f4e7c0 100644
--- a/libbinlogevents/src/statement_events.cpp
+++ b/libbinlogevents/src/statement_events.cpp
@@ -33,6 +33,7 @@ Query_event::Query_event(Log_event_type type_arg)
   query(0), db(0), user(0), user_len(0), host(0), host_len(0), db_len(0),
   q_len(0)
 {
+  m_release_query_buf= 0;
 }

 /**
@@ -64,6 +65,7 @@ Query_event::Query_event(const char* query_arg, const char* catalog_arg,
   master_data_written(0), explicit_defaults_ts(TERNARY_UNSET),
   mts_accessed_dbs(0)
 {
+  m_release_query_buf= 0;
 }

 /**
@@ -130,6 +132,7 @@ Query_event::Query_event(const char* buf, unsigned int event_len,


   query_data_written= 0;
+  m_release_query_buf= 0;

   common_header_len= description_event->common_header_len;
   post_header_len= description_event->post_header_len[event_type - 1];
diff --git a/sql/rpl_slave.cc b/sql/rpl_slave.cc
index 
a56a9c4..3df5708 100644
--- a/sql/rpl_slave.cc
+++ b/sql/rpl_slave.cc
@@ -4998,6 +5017,28 @@ apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli)
                          SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK);
 }

+
+/**
+  Format the group relay log and group master log coordinates of rli into
+  pos_info, for use in log messages.
+
+  @param rli            relay log info whose group positions are reported
+  @param[out] pos_info  overwritten with the formatted text
+  @return reference to pos_info
+*/
+static inline std::string &rpl_pos_info(Relay_log_info *rli, std::string &pos_info)
+{
+  char buf[2048];
+  snprintf(buf, sizeof(buf), "Current rli relay log pos: %s : %llu; master binlog pos: %s : %llu",
+           rli->get_group_relay_log_name(),
+           rli->get_group_relay_log_pos(),
+           rli->get_group_master_log_name(),
+           rli->get_group_master_log_pos());
+  pos_info= buf;
+  return pos_info;
+}
+
+
 /**
   Let the worker applying the current group to rollback and gracefully
   finish its work before.
@@ -5052,7 +5101,163 @@ static bool coord_handle_partial_binlogged_transaction(Relay_log_info *rli,
     mysql_mutex_lock(&rli->data_lock);
   }

+  /*
+    Look at the XA state of the transaction on the last assigned worker's
+    THD, or on thd when no worker has been assigned.
+  */
+  THD *worker_thd= (rli->last_assigned_worker ?
+                    rli->last_assigned_worker->info_thd : thd);
+  XID_STATE *xid_state= worker_thd->get_transaction()->xid_state();
+  const XID_STATE::xa_states xa_state= xid_state->get_state();
+  std::string xid_state_str;
+  LogDebug("In coord_handle_partial_binlogged_transaction, slave worker thd: %p, xid_state: %s",
+           worker_thd, xid_state->to_string(xid_state_str).c_str());
+  if (xid_state->get_xa_type() == XID_STATE::XA_EXTERNAL ||
+      xa_state == XID_STATE::XA_ACTIVE || xa_state == XID_STATE::XA_IDLE)
+  {
+    /*
+      The transaction is an XA one (started by XA START, or in the XA ACTIVE
+      or XA IDLE state): instead of a plain ROLLBACK, inject XA END and/or
+      XA ROLLBACK for its xid, depending on the XA state it has reached.
+    */
+    DBUG_PRINT("info",("Injecting QUERY(XA END and/or XA ROLLBACK) to rollback worker"));
+    LogDebug("Injecting QUERY(XA END and/or XA ROLLBACK) to rollback worker, %s",
+             rpl_pos_info(rli, pos_info).c_str());
+    switch (xa_state)
+    {
+    case XID_STATE::XA_ACTIVE:
+    {
+      /*
+        NOTE(review), also applies to the XA_IDLE case below:
+        - only the gtrid part of the xid is used, and it is placed between
+          the quotes as is; confirm that xids with a bqual part or
+          containing quote characters cannot reach this point;
+        - the out-of-memory return happens before
+          apply_event_and_update_pos(), i.e. apparently with rli->data_lock
+          still held, unlike the return taken when applying fails; confirm
+          what the caller expects.
+      */
+      const char *xid_ptr= xid_state->get_xid()->get_data();
+      const size_t xid_len= xid_state->get_xid()->get_gtrid_length();
+      std::string xa_end_cmd= "XA END ''";
+      xa_end_cmd.insert(xa_end_cmd.length() - 1, xid_ptr, xid_len);
+      const size_t qbuflen= xa_end_cmd.length() + 1;
+      /* Non-throwing new, so that the NULL check below is meaningful. */
+      char *qbuf= new (std::nothrow) char[qbuflen];
+      if (qbuf == NULL)
+      {
+        LogError("Injecting QUERY(XA END) to rollback worker failed: Not enough memory when allocating %lu bytes.",
+                 static_cast<unsigned long>(qbuflen));
+        DBUG_RETURN(true);
+      }
+
+      strncpy(qbuf, xa_end_cmd.c_str(), qbuflen);
+      Log_event *xa_end_event= new Query_log_event(thd,
+                                                   qbuf,
+                                                   qbuflen - 1,// not even an extra char is allowed
+                                                   true, /* using_trans */
+                                                   false, /* immediate */
+                                                   true, /* suppress_use */
+                                                   0, /* error */
+                                                   true /* ignore_command */);
+      ((Query_log_event*) xa_end_event)->db= "";
+      xa_end_event->common_header->data_written= 0;
+      /*
+        Slave must never execute more gtids than master with respect to
+        master's uuid, otherwise ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER is returned.
+      */
+      xa_end_event->server_id= ev->server_id;
+      ((Query_log_event *)xa_end_event)->set_release_query(2); // event now delete[]s qbuf
+
+      /*
+        We must be careful to avoid SQL thread increasing its position
+        farther than the event that triggered this QUERY(XA END).
+      */
+      xa_end_event->common_header->log_pos= ev->common_header->log_pos;
+      xa_end_event->future_event_relay_log_pos= ev->future_event_relay_log_pos;
+      LogDebug("Enqueuing %s", qbuf);
+
+      if (apply_event_and_update_pos(&xa_end_event, thd, rli) !=
+          SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK)
+      {
+        delete xa_end_event;
+        DBUG_RETURN(true);
+      }
+
+      LogDebug("Enqueued %s", xa_end_cmd.c_str());
+      mysql_mutex_lock(&rli->data_lock);
+
+      // FALL THROUGH
+    }
+    case XID_STATE::XA_IDLE:
+    {
+      const char *xid_ptr= xid_state->get_xid()->get_data();
+      const size_t xid_len= xid_state->get_xid()->get_gtrid_length();
+      std::string xa_rb_cmd= "XA ROLLBACK ''";
+      xa_rb_cmd.insert(xa_rb_cmd.length() - 1, xid_ptr, xid_len);
+      const size_t qbuflen= xa_rb_cmd.length() + 1;
+      /* Non-throwing new, so that the NULL check below is meaningful. */
+      char *qbuf= new (std::nothrow) char[qbuflen];
+      if (qbuf == NULL)
+      {
+        LogError("Injecting QUERY(XA ROLLBACK) to rollback worker failed: Not enough memory when allocating %lu bytes.",
+                 static_cast<unsigned long>(qbuflen));
+        DBUG_RETURN(true);
+      }
+
+      strncpy(qbuf, xa_rb_cmd.c_str(), qbuflen);
+      Log_event *xa_rb_event= new Query_log_event(thd,
+                                                  qbuf,
+                                                  qbuflen - 1,// not even an extra char is allowed
+                                                  true, /* using_trans */
+                                                  false, /* immediate */
+                                                  true, /* suppress_use */
+                                                  0, /* error */
+                                                  true /* ignore_command */);
+      ((Query_log_event*) xa_rb_event)->db= "";
+      xa_rb_event->common_header->data_written= 0;
+      /*
+        Slave must never execute more gtids than master with respect to
+        master's uuid, otherwise ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER is returned.
+        NOTE(review): XA COMMIT and XA ROLLBACK are independent binlog event
+        groups which have their own gtids; confirm that reusing the
+        triggering event's server_id below is intended for them.
+      */
+      xa_rb_event->server_id= ev->server_id;
+      ((Query_log_event *)xa_rb_event)->set_release_query(2); // event now delete[]s qbuf
+      /*
+        We must be careful to avoid SQL thread increasing its position
+        farther than the event that triggered this QUERY(XA ROLLBACK).
+      */
+      xa_rb_event->common_header->log_pos= ev->common_header->log_pos;
+      xa_rb_event->future_event_relay_log_pos= ev->future_event_relay_log_pos;
+
+      LogDebug("Enqueuing %s", qbuf);
+      if (apply_event_and_update_pos(&xa_rb_event, thd, rli) !=
+          SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK)
+      {
+        delete xa_rb_event;
+        DBUG_RETURN(true);
+      }
+
+      LogDebug("Enqueued %s", xa_rb_cmd.c_str());
+      mysql_mutex_lock(&rli->data_lock);
+      break;
+    }
+    default:
+      /*
+        If the xa trx is already prepared, then it ended as a binlog event group
+        already, so should not be re-executed; in other states(NOTR,ROLLBACKED),
+        it can't be rolled back.
+      */
+      break;
+    }
+
+    DBUG_RETURN(false);
+  }
   DBUG_PRINT("info",("Injecting QUERY(ROLLBACK) to rollback worker"));
+  LogDebug("Injecting QUERY(ROLLBACK) to rollback worker, %s",
+           rpl_pos_info(rli, pos_info).c_str());
   Log_event *rollback_event= new Query_log_event(thd,
                                                  STRING_WITH_LEN("ROLLBACK"),
                                                  true, /* using_trans */
diff --git a/sql/xa.cc b/sql/xa.cc
index 3615d34..a2fc7cb 100644
--- a/sql/xa.cc
+++ b/sql/xa.cc
@@ -620,13 +620,13 @@ bool Sql_cmd_xa_start::trans_xa_start(THD *thd)
     MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi,
                               (const void *)xid_state->get_xid(),
                               (int)xid_state->get_state());
+    xid_state->set_xa_type(XID_STATE::XA_EXTERNAL); // started by 'xa start'.
     if (transaction_cache_insert(m_xid, thd->get_transaction()))
     {
       xid_state->reset();
       trans_rollback(thd);
     }
   }
-
   DBUG_RETURN(thd->is_error() ||
               !xid_state->has_state(XID_STATE::XA_ACTIVE));
 }
diff --git a/sql/xa.h b/sql/xa.h
index 683bc98..11f5c49 100644
--- a/sql/xa.h
+++ b/sql/xa.h
@@ -409,7 +409,7 @@ class XID_STATE
 {
 public:
   enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
-
+  enum xa_types {XA_INTERNAL, XA_EXTERNAL};
   /**
      Transaction identifier.
      For now, this is only used to catch duplicated external xids.
@@ -432,15 +432,41 @@ private:
     Checked and reset at XA-commit/rollback.
   */
   bool m_is_binlogged;
-
+  xa_types xa_type;// XA_INTERNAL: not an xa txn branch; XA_EXTERNAL: started by 'xa start'.
 public:
   XID_STATE() : xa_state(XA_NOTR), in_recovery(false), rm_error(0),
-    m_is_binlogged(false)
+    m_is_binlogged(false), xa_type(XA_INTERNAL)
   { m_xid.null(); }

+  void set_xa_type(xa_types t)
+  {
+    xa_type= t;
+  }
+
+  xa_types get_xa_type() const
+  { return xa_type; }
+
+  // all strings must be shorter than 16 bytes; never returns NULL.
+  const char *get_xa_type_str() const
+  {
+    const char *res= "unknown";
+    switch(xa_type)
+    {
+    case XID_STATE::XA_INTERNAL:
+      res= "internal";
+      break;
+    case XID_STATE::XA_EXTERNAL:
+      res= "external";
+      break;
+    default:
+      break;
+    }
+    return res;
+  }
+
   void set_state(xa_states state)
   { xa_state= state; }