commit 0bf75387bc249028d3d5d4004d3410fab51b1b55 Author: wenfeng.zhang Date: Sun Jan 19 13:54:04 2020 +0800 fix xa commit bug for trx below: XA START '1'; INSERT INTO t1 VALUES(1); XA END '1'; XA COMMIT '1'; In replication mode, when master crash in XA COMMIT, it may write binlog and send to slave, while still not commit in engine. Cause slave receive the binlog, it will apply the event and commit success. When crash server restart, the trx will not auto commit. diff --git a/sql/binlog.cc b/sql/binlog.cc index 3aff8b69..06720b35 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -91,6 +91,35 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all); static int binlog_xa_commit(handlerton *hton, XID *xid); static int binlog_xa_rollback(handlerton *hton, XID *xid); static void exec_binlog_error_action_abort(const char* err_string); +static inline uint char_val(char X) { + return (uint)(X >= '0' && X <= '9' + ? X - '0' + : X >= 'A' && X <= 'Z' ? X - 'A' + 10 : X - 'a' + 10); +} + +// the format in binlog is: XA COMMIT X'31',X'32',1 +static XID get_xid_from_xa_commit_rollback(std::string query_sql) { + XID xid; + size_t gtrid_begin_pos = query_sql.find('\'', 0); + size_t gtrid_end_pos = query_sql.find('\'', gtrid_begin_pos + 1); + std::string gtrid; + for (size_t i = 0; i < gtrid_end_pos - gtrid_begin_pos - 1; i += 2) { + gtrid += (char)(char_val(query_sql[gtrid_begin_pos + 1 + i]) * 16 + + char_val(query_sql[gtrid_begin_pos + 1 + i + 1])); + } + size_t bqual_begin_pos = query_sql.find('\'', gtrid_end_pos + 1); + size_t bqual_end_pos = query_sql.find('\'', bqual_begin_pos + 1); + std::string bqual; + for (size_t i = 0; i < bqual_end_pos - bqual_begin_pos - 1; i += 2) { + bqual += (char)(char_val(query_sql[bqual_begin_pos + 1 + i]) * 16 + + char_val(query_sql[bqual_begin_pos + 1 + i + 1])); + } + size_t format_id_pos = query_sql.find(',', bqual_end_pos + 1); + long format_id = atol(query_sql.c_str() + format_id_pos + 1); + + xid.set(format_id, gtrid.c_str(), gtrid.size(), bqual.c_str(), bqual.size()); + return xid; +} /** Helper class to hold a mutex for the duration of the @@ -9824,6 +9853,8 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, Log_event *ev; HASH xids; MEM_ROOT mem_root; + std::set xa_prepared; + std::map xa_committed; /* The flag is used for handling the case that a transaction is partially written to the binlog. @@ -9844,7 +9875,8 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, && ev->is_valid()) { if (ev->get_type_code() == binary_log::QUERY_EVENT && - !strcmp(((Query_log_event*)ev)->query, "BEGIN")) + (!strcmp(((Query_log_event *)ev)->query, "BEGIN") || + !strncmp(((Query_log_event *)ev)->query, "XA START", 8))) in_transaction= TRUE; if (ev->get_type_code() == binary_log::QUERY_EVENT && @@ -9862,8 +9894,39 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, sizeof(xev->xid)); if (!x || my_hash_insert(&xids, x)) goto err2; + } else if (ev->get_type_code() == binary_log::XA_PREPARE_LOG_EVENT) { + DBUG_ASSERT(in_transaction == TRUE); + in_transaction = FALSE; + // TDSQL: one phase commits use XA PREPARE event as end, but they are not + // prepared but ended. + XA_prepare_log_event *xev = (XA_prepare_log_event *)ev; + XID xid = xev->get_xid(); + if (!xev->is_one_phase()) { + xa_prepared.insert(xid); + xa_committed.erase(xid); + } else { + xa_committed[xid] = true; + xa_prepared.erase(xid); + } + } else if (ev->get_type_code() == binary_log::QUERY_EVENT && + !strncmp(((Query_log_event *)ev)->query, "XA COMMIT", 9)) { + DBUG_ASSERT(in_transaction == FALSE); + in_transaction = FALSE; + + // the format in binlog is: XA COMMIT X'31',X'32',1 + // find gtrid and bqual from the single-quote is ok + XID xid = get_xid_from_xa_commit_rollback(((Query_log_event *)ev)->query); + xa_committed[xid] = true; + xa_prepared.erase(xid); + } else if (ev->get_type_code() == binary_log::QUERY_EVENT && + !strncmp(((Query_log_event *)ev)->query, "XA ROLLBACK", 11)) { + DBUG_ASSERT(in_transaction == FALSE); + in_transaction = FALSE; + + XID xid = get_xid_from_xa_commit_rollback(((Query_log_event *)ev)->query); + xa_committed[xid] = false; + xa_prepared.erase(xid); } - /* Recorded valid position for the crashed binlog file which did not contain incorrect events. The following @@ -9912,7 +9975,7 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, will result in an assert. (Production builds would be safe since ha_recover returns right away if total_ha_2pc <= opt_log_bin.) */ - if (total_ha_2pc > 1 && ha_recover(&xids)) + if (total_ha_2pc > 1 && ha_recover(&xids, &xa_committed)) goto err2; free_root(&mem_root, MYF(0)); diff --git a/sql/handler.h b/sql/handler.h index 92e8f80a..59c27bde 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -35,6 +35,7 @@ #include "mysql/psi/psi.h" #include +#include #include class Alter_info; @@ -4102,7 +4103,7 @@ int ha_prepare(THD *thd); there should be no prepared transactions in this case. */ -int ha_recover(HASH *commit_list); +int ha_recover(HASH *commit_list, std::map *xa_commtted = NULL); /* transactions: interface to low-level handlerton functions. These are diff --git a/sql/log_event.h b/sql/log_event.h index 00a56118..dc7a8b5a 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -2076,6 +2076,17 @@ public: my_xid.bqual_length == 0); xid= NULL; } + + bool is_one_phase() const { return one_phase; } + XID get_xid() { + XID xid; + xid.set_format_id(my_xid.formatID); + xid.set_gtrid_length(my_xid.gtrid_length); + xid.set_bqual_length(my_xid.bqual_length); + xid.set_data(my_xid.data, my_xid.gtrid_length + my_xid.bqual_length); + return xid; + } + Log_event_type get_type_code() { return binary_log::XA_PREPARE_LOG_EVENT; } size_t get_data_size() { diff --git a/sql/xa.cc b/sql/xa.cc index 24249589..2cda3a19 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -64,6 +64,23 @@ static void ha_commit_or_rollback_by_xid(THD *thd, XID *xid, bool commit) MYSQL_STORAGE_ENGINE_PLUGIN, xid); } +bool operator<(const xid_t &xs1, const xid_t &xs2) { + if (xs1.get_format_id() < xs2.get_format_id()) + return true; + if (xs1.get_gtrid_length() < xs2.get_gtrid_length()) { + return true; + } else if (xs1.get_gtrid_length() > xs2.get_gtrid_length()) { + return false; + } + if (xs1.get_bqual_length() < xs2.get_bqual_length()) { + return true; + } else if (xs1.get_bqual_length() > xs2.get_bqual_length()) { + return false; + } + return strncmp(xs1.get_data(), xs2.get_data(), + xs1.get_gtrid_length() + xs1.get_bqual_length()) == -1; +} +bool operator==(const xid_t &xs1, const xid_t &xs2) { return xs1.eq(&xs2); } struct xarecover_st { @@ -71,6 +88,7 @@ struct xarecover_st XID *list; HASH *commit_list; bool dry_run; + std::map *binlog_xa_committed; }; @@ -92,11 +110,20 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, my_xid x= info->list[i].get_my_xid(); if (!x) // not "mine" - that is generated by external TM { -#ifndef DBUG_OFF - char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str - XID *xid= info->list + i; - sql_print_information("ignore xid %s", xid->xid_to_str(buf)); -#endif + const XID &target_xid = info->list[i]; + if (info->binlog_xa_committed) { + std::map::const_iterator commit_it = + info->binlog_xa_committed->find(target_xid); + if (commit_it != info->binlog_xa_committed->end()) { + bool is_commit = commit_it->second; + if (is_commit) { + hton->commit_by_xid(hton, info->list + i); + } else { + hton->rollback_by_xid(hton, info->list + i); + } + continue; + } + } transaction_cache_insert_recovery(info->list + i); info->found_foreign_xids++; continue; @@ -135,9 +162,7 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, return false; } - -int ha_recover(HASH *commit_list) -{ +int ha_recover(HASH *commit_list, std::map *xa_commtted) { struct xarecover_st info; DBUG_ENTER("ha_recover"); info.found_foreign_xids= info.found_my_xids= 0; @@ -145,6 +170,7 @@ int ha_recover(HASH *commit_list) info.dry_run= (info.commit_list == 0 && tc_heuristic_recover == TC_HEURISTIC_NOT_USED); info.list= NULL; + info.binlog_xa_committed = xa_commtted; /* commit_list and tc_heuristic_recover cannot be set both */ DBUG_ASSERT(info.commit_list == 0 || @@ -215,7 +241,6 @@ int ha_recover(HASH *commit_list) DBUG_RETURN(0); } - bool xa_trans_force_rollback(THD *thd) { /* diff --git a/sql/xa.h b/sql/xa.h index ccb117c0..9f7a018a 100644 --- a/sql/xa.h +++ b/sql/xa.h @@ -404,6 +404,8 @@ private: friend class XID_STATE; } XID; +extern bool operator<(const xid_t &xs1, const xid_t &xs2); +extern bool operator==(const xid_t &xs1, const xid_t &xs2); class XID_STATE {