diff --git a/sql/binlog.cc b/sql/binlog.cc index 478aff5..71f6abf 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -77,6 +77,7 @@ const char *log_bin_basename= 0; extern uint g_tdsql_mode; extern my_bool g_reliable_relaylog; MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period, WRITE_CACHE); +int collect_xa_prepared(std::set&xa_prepared); static int binlog_init(void *p); static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event); @@ -9451,6 +9452,8 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, Log_event *ev; HASH xids; MEM_ROOT mem_root; + std::set xa_prepared; + /* The flag is used for handling the case that a transaction is partially written to the binlog. @@ -9471,7 +9474,8 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, { // TDSQL TODO: handle XA transactions. if (ev->get_type_code() == binary_log::QUERY_EVENT && - !strcmp(((Query_log_event*)ev)->query, "BEGIN")) + (!strcmp(((Query_log_event*)ev)->query, "BEGIN") || + !strncmp(((Query_log_event*)ev)->query, "XA START", 8))) in_transaction= TRUE; if (ev->get_type_code() == binary_log::QUERY_EVENT && @@ -9490,6 +9494,13 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, if (!x || my_hash_insert(&xids, x)) goto err2; } + else if (ev->get_type_code() == binary_log::XA_PREPARE_LOG_EVENT) + { + DBUG_ASSERT(in_transaction == TRUE); + in_transaction= FALSE; + XA_prepare_log_event *xev=(XA_prepare_log_event *)ev; + xa_prepared.insert(xev->get_xid_str()); + } /* Recorded valid position for the crashed binlog file @@ -9533,13 +9544,16 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, delete ev; } + if (collect_xa_prepared(xa_prepared) < 0) + goto err2; + /* Call ha_recover if and only if there is a registered engine that does 2PC, otherwise in DBUG builds calling ha_recover directly will result in an assert. (Production builds would be safe since ha_recover returns right away if total_ha_2pc <= opt_log_bin.) */ - if (total_ha_2pc > 1 && ha_recover(&xids)) + if (total_ha_2pc > 1 && ha_recover(&xids, &xa_prepared)) goto err2; free_root(&mem_root, MYF(0)); diff --git a/sql/handler.h b/sql/handler.h index 71aa39b..0f0bb33 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -36,6 +36,7 @@ #include #include +#include class Alter_info; class SE_cost_constants; // see opt_costconstants.h @@ -4294,7 +4295,7 @@ int ha_prepare(THD *thd); there should be no prepared transactions in this case. */ -int ha_recover(HASH *commit_list); +int ha_recover(HASH *commit_list, const std::set *xa_prepared=NULL); /* transactions: interface to low-level handlerton functions. These are diff --git a/sql/log_event.h b/sql/log_event.h index 5552b0e..9f3216f 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -2040,6 +2040,12 @@ public: { return xid_bufs_size + my_xid.gtrid_length + my_xid.bqual_length; } + + std::string get_xid_str() const + { + std::string xid_str(my_xid.data, my_xid.gtrid_length); + return xid_str; + } #ifdef MYSQL_SERVER bool write(IO_CACHE* file); #else diff --git a/sql/xa.cc b/sql/xa.cc index 1692721..bf78587 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -38,6 +38,9 @@ static HASH transaction_cache; extern ulong xa_commit_lock_wait_timeout; extern uint g_tdsql_mode; + +static bool serialize_xa_prepared(std::set&xa_prepared); + static inline ulong commit_lock_wait_timeout(THD *thd) { return (g_tdsql_mode ? xa_commit_lock_wait_timeout : thd->variables.lock_wait_timeout); @@ -74,9 +77,18 @@ static void ha_commit_or_rollback_by_xid(THD *thd, XID *xid, bool commit) struct xarecover_st { + typedef std::set xa_prepared_set_t; + xarecover_st(const xa_prepared_set_t *xap) : binlog_xa_prepared(xap){} int len, found_foreign_xids, found_my_xids; XID *list; HASH *commit_list; + // prepared xa txn branches that are found from xa_prepared.info as well as + // last binlog file, all other prepared xa txns found from engines than these + // should be rolled back. + const xa_prepared_set_t *binlog_xa_prepared; + // prepared xa txn branches that are found from storage engines(e.g. innodb, etc) + // during recovery. They should be stored into xa_prepared.info + xa_prepared_set_t *binlog_xa_prepared_engine; bool dry_run; }; @@ -99,14 +111,22 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, my_xid x= info->list[i].get_my_xid(); if (!x) // not "mine" - that is generated by external TM { -#ifndef DBUG_OFF - char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str - XID *xid= info->list + i; - sql_print_information("ignore xid %s", xid->xid_to_str(buf)); -#endif - transaction_cache_insert_recovery(info->list + i); - info->found_foreign_xids++; - continue; + const XID &target_xid= info->list[i]; + std::string xid_data(target_xid.get_data(), target_xid.get_gtrid_length()); + + if (info->binlog_xa_prepared && info->binlog_xa_prepared->find(xid_data) == info->binlog_xa_prepared->end()) + { + hton->rollback_by_xid(hton, info->list + i); + sql_print_information("Rolled back XA txn branch (%s) which is only prepared in engine but not in binlog.", xid_data.c_str()); + continue; + } + else + { + transaction_cache_insert_recovery(info->list + i); + info->found_foreign_xids++; + info->binlog_xa_prepared_engine->insert(xid_data); + continue; + } } if (info->dry_run) { @@ -143,10 +163,12 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, } -int ha_recover(HASH *commit_list) +int ha_recover(HASH *commit_list, const xarecover_st::xa_prepared_set_t* xa_prepared) { - struct xarecover_st info; + struct xarecover_st info(xa_prepared); + xarecover_st::xa_prepared_set_t engine_prepared; DBUG_ENTER("ha_recover"); + info.binlog_xa_prepared_engine= &engine_prepared; info.found_foreign_xids= info.found_my_xids= 0; info.commit_list= commit_list; info.dry_run= (info.commit_list == 0 && @@ -201,8 +223,13 @@ int ha_recover(HASH *commit_list) plugin_foreach(NULL, xarecover_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &info); - + my_free(info.list); + + // Update the xa_prepared.info file, even if engine_prepared is empty. + if (serialize_xa_prepared(engine_prepared)) + DBUG_RETURN(1); + if (info.found_foreign_xids) sql_print_warning("Found %d prepared XA transactions", info.found_foreign_xids); @@ -1423,3 +1450,94 @@ std::string &XID_STATE::to_string(std::string &str) const str= buf; return str; } + + +/* + @retval the number of xa-gtids inserted into xa_prepared. or -1 on error. + errors including duplicate xa-gtids found or io error. +*/ +int collect_xa_prepared(std::set&xa_prepared) +{ + char full_path[FN_REFLEN]; + char xid[512]; + const char *fn= "xa_prepared.info"; + int ret= 0; + + if (normalize_binlog_name(full_path, fn, false)) + return -1; + + FILE *fp= fopen(full_path, "r"); + // It's OK if the file doesn't exist, it will be created in serialize_xa_prepared. + if (!fp && errno != ENOENT) + { + sql_print_error(ER(ER_CANT_OPEN_FILE), full_path, errno, strerror(errno)); + return -1; + } + + while (!feof(fp)) + { + xid[0]= '\0'; + if (!fgets(xid, 512, fp)) + { + if (!feof(fp)) + { + ret= -1; + goto err; + } + break; + } + + std::pair::iterator, bool> iret= + xa_prepared.insert(std::string(xid, strlen(xid) - 1));// excluding the '\n'. + + if (iret.second) + { + ret++; + } + else + { + sql_print_error("%s : %s", ER(ER_XAER_DUPID), xid); + ret= -1; + goto err; + } + } +err: + fclose(fp); + return ret; +} + + +static bool serialize_xa_prepared(std::set&xa_prepared) +{ + char full_path[FN_REFLEN]; + const char *fn= "xa_prepared.info"; + + if (normalize_binlog_name(full_path, fn, false)) + return true; + + FILE *fp= fopen(full_path, "w+"); + + if (!fp) + { + sql_print_error(ER(ER_CANT_OPEN_FILE), full_path, errno, strerror(errno)); + return true; + } + + bool ret= false; + + for (std::set::iterator i= xa_prepared.begin(); + i != xa_prepared.end(); ++i) + { + if (fprintf(fp, "%s\n", (*i).c_str()) < 0) + { + ret= true; + sql_print_error(ER(ER_IO_WRITE_ERROR), errno, strerror(errno), full_path); + goto err; + } + } +err: + fflush(fp); + fsync(fileno(fp)); + fclose(fp); + return ret; +}