diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 40178ce..da66d45 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -1740,6 +1740,10 @@ int my_b_flush_io_cache(IO_CACHE *info, DBUG_ENTER("my_b_flush_io_cache"); DBUG_PRINT("enter", ("cache: 0x%lx", (long) info)); + DBUG_EXECUTE_IF("simulate_error_during_flush_cache_to_file", + { + DBUG_RETURN(TRUE); + }); if (!append_cache) need_append_buffer_lock= 0; diff --git a/sql/binlog.cc b/sql/binlog.cc index 719716c..f55c173 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -736,6 +736,17 @@ public: return stmt_cache.is_binlog_empty() && trx_cache.is_binlog_empty(); } + /* + clear stmt_cache and trx_cache if they are not empty + */ + void reset() + { + if (!stmt_cache.is_binlog_empty()) + stmt_cache.reset(); + if (!trx_cache.is_binlog_empty()) + trx_cache.reset(); + } + #ifndef DBUG_OFF bool dbug_any_finalized() const { return stmt_cache.dbug_is_finalized() || trx_cache.dbug_is_finalized(); @@ -5928,7 +5939,8 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data) if (rand() % 3 == 0) { write_error=1; - goto err; + thd->commit_error= THD::CE_FLUSH_ERROR; + DBUG_RETURN(0); } };); @@ -6670,24 +6682,27 @@ MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) #ifndef DBUG_OFF stage_manager.clear_preempt_status(head); #endif - if (head->commit_error == THD::CE_NONE) + /* + Flush/Sync error should be ignored and continue + to commit phase. And thd->commit_error cannot be + COMMIT_ERROR at this moment. + */ + DBUG_ASSERT(head->commit_error != THD::CE_COMMIT_ERROR); + excursion.try_to_attach_to(head); + bool all= head->transaction.flags.real_commit; + if (head->transaction.flags.commit_low) { - excursion.try_to_attach_to(head); - bool all= head->transaction.flags.real_commit; - if (head->transaction.flags.commit_low) - { - /* head is parked to have exited append() */ - DBUG_ASSERT(head->transaction.flags.ready_preempt); - /* - storage engine commit - */ - if (ha_commit_low(head, all, false)) - head->commit_error= THD::CE_COMMIT_ERROR; - } - DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s", - head->commit_error, - YESNO(head->transaction.flags.pending))); + /* head is parked to have exited append() */ + DBUG_ASSERT(head->transaction.flags.ready_preempt); + /* + storage engine commit + */ + if (ha_commit_low(head, all, false)) + head->commit_error= THD::CE_COMMIT_ERROR; } + DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s", + head->commit_error, + YESNO(head->transaction.flags.pending))); /* Decrement the prepared XID counter after storage engine commit. We also need decrement the prepared XID when encountering a @@ -6713,7 +6728,7 @@ MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) for (THD *head= first; head; head= head->next_to_commit) { if (head->transaction.flags.run_hooks && - head->commit_error == THD::CE_NONE) + head->commit_error != THD::CE_COMMIT_ERROR) { /* @@ -6809,38 +6824,11 @@ MYSQL_BIN_LOG::change_stage(THD *thd, int MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var) { - if (DBUG_EVALUATE_IF("simulate_error_during_flush_cache_to_file", 1, - flush_io_cache(&log_file))) + if (flush_io_cache(&log_file)) { - if (binlog_error_action == ABORT_SERVER) - { - THD *thd= current_thd; - /* - On fatal error when code enters here we should forcefully clear the - previous errors so that a new critical error message can be pushed - to the client side. - */ - thd->clear_error(); - my_error(ER_BINLOG_LOGGING_IMPOSSIBLE, MYF(0), "An error occured during " - "flushing cache to file. 'binlog_error_action' is set to " - "'ABORT_SERVER'. Hence aborting the server"); - sql_print_error("An error occured during flushing cache to file. " - "'binlog_error_action' is set to 'ABORT_SERVER'. " - "Hence aborting the server"); - thd->protocol->end_statement(); - _exit(EXIT_FAILURE); - } - else - { - sql_print_error("An error occured during flushing cache to file. " - "'binlog_error_action' is set to 'IGNORE_ERROR'. " - "Hence turning logging off for the whole duration " - "of the MySQL server process. To turn it on " - "again: fix the cause, shutdown the MySQL " - "server and restart it."); - close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); - return ER_ERROR_ON_WRITE; - } + THD *thd= current_thd; + thd->commit_error= THD::CE_FLUSH_ERROR; + return ER_ERROR_ON_WRITE; } *end_pos_var= my_b_tell(&log_file); return 0; @@ -6858,8 +6846,13 @@ MYSQL_BIN_LOG::sync_binlog_file(bool force) if (force || (sync_period && ++sync_counter >= sync_period)) { sync_counter= 0; - if (mysql_file_sync(log_file.file, MYF(MY_WME))) + if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1, + mysql_file_sync(log_file.file, MYF(MY_WME)))) + { + THD *thd= current_thd; + thd->commit_error= THD::CE_SYNC_ERROR; return std::make_pair(true, synced); + } synced= true; } return std::make_pair(false, synced); @@ -6889,14 +6882,25 @@ MYSQL_BIN_LOG::sync_binlog_file(bool force) int MYSQL_BIN_LOG::finish_commit(THD *thd) { + /* + In some unlikely situations, it can happen that binary + log is closed before the thread flushes it's cache. + In that case, clear the caches before doing commit. + */ + if (unlikely(!is_open())) + { + binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd); + if (cache_mngr) + cache_mngr->reset(); + } if (thd->transaction.flags.commit_low) { const bool all= thd->transaction.flags.real_commit; /* storage engine commit */ - if (thd->commit_error == THD::CE_NONE && - ha_commit_low(thd, all, false)) + DBUG_ASSERT(thd->commit_error != THD::CE_COMMIT_ERROR); + if (ha_commit_low(thd, all, false)) thd->commit_error= THD::CE_COMMIT_ERROR; /* Decrement the prepared XID counter after storage engine commit @@ -6910,7 +6914,7 @@ MYSQL_BIN_LOG::finish_commit(THD *thd) if and be the only after_commit invocation left in the code. */ - if ((thd->commit_error == THD::CE_NONE) && thd->transaction.flags.run_hooks) + if ((thd->commit_error != THD::CE_COMMIT_ERROR ) && thd->transaction.flags.run_hooks) { (void) RUN_HOOK(transaction, after_commit, (thd, all)); thd->transaction.flags.run_hooks= false; @@ -6931,10 +6935,80 @@ MYSQL_BIN_LOG::finish_commit(THD *thd) DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized()); DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", thd->thread_id, thd->commit_error)); - return thd->commit_error; + /* + flush or sync errors are handled by the leader of the group + (using binlog_error_action). Hence treat only COMMIT_ERRORs as errors. + */ + return (thd->commit_error == THD::CE_COMMIT_ERROR); } - +/** + Helper function to handle flush or sync stage errors. + If binlog_error_action= ABORT_SERVER, server will be aborted + after reporting the error to the client. + If binlog_error_action= IGNORE_ERROR, binlog will be closed + for the life time of the server. close() call is protected + with LOCK_log to avoid any parallel operations on binary log. + + @param thd Thread object that faced flush/sync error + @param need_lock_log + > Indicates true if LOCk_log is needed before closing + binlog (happens when we are handling sync error) + > Indicates false if LOCK_log is already acquired + by the thread (happens when we are handling flush + error) + + @return void +*/ +void MYSQL_BIN_LOG::handle_binlog_flush_or_sync_error(THD *thd, + bool need_lock_log) +{ + char errmsg[MYSQL_ERRMSG_SIZE]; + sprintf(errmsg, "An error occurred during %s stage of the commit. " + "'binlog_error_action' is set to '%s'. ", + thd->commit_error== THD::CE_FLUSH_ERROR ? "flush" : "sync", + binlog_error_action == ABORT_SERVER ? "ABORT_SERVER" : "IGNORE_ERROR"); + if (binlog_error_action == ABORT_SERVER) + { + sprintf(errmsg, "%s. Hence aborting the server.", errmsg); + /* + On fatal error when code enters here we should forcefully clear the + previous errors so that a new critical error message can be pushed + to the client side. + */ + thd->clear_error(); + my_error(ER_BINLOG_LOGGING_IMPOSSIBLE, MYF(0), errmsg); + sql_print_error("%s", errmsg); + thd->protocol->end_statement(); + _exit(EXIT_FAILURE); + } + else + { + DEBUG_SYNC(thd, "before_binlog_closed_due_to_error"); + if (need_lock_log) + mysql_mutex_lock(&LOCK_log); + else + mysql_mutex_assert_owner(&LOCK_log); + /* + It can happen that other group leader encountered + error and already closed the binary log. So print + error only if it is in open state. But we should + call close() always just in case if the previous + close did not close index file. + */ + if (is_open()) + { + sql_print_error("%s. Hence turning logging off for the whole duration " + "of the MySQL server process. To turn it on again: fix " + "the cause, shutdown the MySQL server and restart it.", + errmsg); + } + close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); + if (need_lock_log) + mysql_mutex_unlock(&LOCK_log); + DEBUG_SYNC(thd, "after_binlog_closed_due_to_error"); + } +} /** Flush and commit the transaction. @@ -6987,7 +7061,7 @@ MYSQL_BIN_LOG::finish_commit(THD *thd) int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) { DBUG_ENTER("MYSQL_BIN_LOG::ordered_commit"); - int flush_error= 0; + int flush_error= 0, sync_error= 0; my_off_t total_bytes= 0; bool do_rotate= false; @@ -7036,6 +7110,7 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) anything more since it is possible that a thread entered and appointed itself leader for the flush phase. */ + DEBUG_SYNC(thd, "waiting_to_enter_flush_stage"); if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log)) { DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", @@ -7043,10 +7118,26 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) DBUG_RETURN(finish_commit(thd)); } - THD *wait_queue= NULL; - flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue); - + THD *wait_queue= NULL, *final_queue= NULL; + mysql_mutex_t *leave_mutex_before_commit_stage= NULL; my_off_t flush_end_pos= 0; + bool need_LOCK_log; + if (unlikely(!is_open())) + { + final_queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE); + leave_mutex_before_commit_stage= &LOCK_log; + /* + binary log is closed, flush stage and sync stage should be + ignored. Binlog cache should be cleared, but instead of doing + it here, do that work in 'finish_commit' function so that + leader and followers thread caches will be cleared. + */ + goto commit_stage; + } + DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage"); + flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, + &wait_queue); + if (flush_error == 0 && total_bytes > 0) flush_error= flush_cache_to_file(&flush_end_pos); @@ -7071,10 +7162,18 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); } + if (flush_error) + { + /* + Handle flush error (if any) after leader finishes it's flush stage. + */ + handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */); + } + /* Stage #2: Syncing binary log file to disk */ - bool need_LOCK_log= (get_sync_period() == 1); + need_LOCK_log= (get_sync_period() == 1); /* LOCK_log is not released when sync_binlog is 1. It guarantees that the @@ -7087,17 +7186,17 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) thd->thread_id, thd->commit_error)); DBUG_RETURN(finish_commit(thd)); } - THD *final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE); + final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE); if (flush_error == 0 && total_bytes > 0) { DEBUG_SYNC(thd, "before_sync_binlog_file"); std::pair result= sync_binlog_file(false); - flush_error= result.first; + sync_error= result.first; } if (need_LOCK_log) mysql_mutex_unlock(&LOCK_log); - + leave_mutex_before_commit_stage= &LOCK_sync; /* Stage #3: Commit all transactions in order. @@ -7107,10 +7206,18 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) Howver, since we are keeping the lock from the previous stage, we need to unlock it if we skip the stage. */ - if (opt_binlog_order_commits) +commit_stage: + /* + We are delaying the handling of sync error until + all locks are released but we should not enter into + commit stage if binlog_error_action is ABORT_SERVER. + */ + if (opt_binlog_order_commits && + (sync_error == 0 || binlog_error_action != ABORT_SERVER)) { if (change_stage(thd, Stage_manager::COMMIT_STAGE, - final_queue, &LOCK_sync, &LOCK_commit)) + final_queue, leave_mutex_before_commit_stage, + &LOCK_commit)) { DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", thd->thread_id, thd->commit_error)); @@ -7128,8 +7235,14 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) process_after_commit_stage_queue(thd, commit_queue); final_queue= commit_queue; } - else - mysql_mutex_unlock(&LOCK_sync); + else if (leave_mutex_before_commit_stage) + mysql_mutex_unlock(leave_mutex_before_commit_stage); + + /* + Handle sync error after we release all locks in order to avoid deadlocks + */ + if (sync_error) + handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */); /* Commit done so signal all waiting threads */ stage_manager.signal_done(final_queue); @@ -7158,6 +7271,10 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) DEBUG_SYNC(thd, "ready_to_do_rotation"); bool check_purge= false; mysql_mutex_lock(&LOCK_log); + /* + If rotate fails then depends on binlog_error_action variable + appropriate action will be taken inside rotate call. + */ int error= rotate(false, &check_purge); mysql_mutex_unlock(&LOCK_log); @@ -7166,7 +7283,11 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) else if (check_purge) purge(); } - DBUG_RETURN(thd->commit_error); + /* + flush or sync errors are handled above (using binlog_error_action). + Hence treat only COMMIT_ERRORs as errors. + */ + DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR); } diff --git a/sql/binlog.h b/sql/binlog.h index b08234a..4ed6db2 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -518,6 +518,7 @@ private: int process_flush_stage_queue(my_off_t *total_bytes_var, bool *rotate_var, THD **out_queue_var); int ordered_commit(THD *thd, bool all, bool skip_commit = false); + void handle_binlog_flush_or_sync_error(THD *thd, bool need_lock_log); public: int open_binlog(const char *opt_name); void close(); diff --git a/sql/sql_class.h b/sql/sql_class.h index edd6af0..c1a42ea 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3001,6 +3001,7 @@ public: { CE_NONE= 0, CE_FLUSH_ERROR, + CE_SYNC_ERROR, CE_COMMIT_ERROR, CE_ERROR_COUNT } commit_error;