Semi-sync master: wait on a per-transaction condition variable.

Each TranxNode now carries its own mysql_cond_t and an n_waiters count,
replacing the single COND_binlog_send_. commitTrx() waits on the node
returned by find_active_tranx_node() for its binlog position;
reportReplyBinlog() wakes only the nodes up to the ACKed position via
signal_waiting_sessions_up_to(), and the last waiter on a node removes
it with clear_active_tranx_nodes(), whose scan now stops at the first
node that still has waiters.

NOTE(review): switch_off() and resetMaster() call
clear_active_tranx_nodes(NULL, 0) while sessions may still be blocked in
mysql_cond_timedwait() on a node's cond. Confirm that path cannot free a
node (and destroy its cond via free_block) that a waiter touches again
(n_waiters--) after it wakes up.

diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h index 6f7cd45..2080b30 100644 --- a/plugin/semisync/semisync_master.h +++ b/plugin/semisync/semisync_master.h @@ -30,7 +30,9 @@ extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; struct TranxNode { char log_name_[FN_REFLEN]; - my_off_t log_pos_; + my_off_t log_pos_; + mysql_cond_t cond; + int n_waiters; struct TranxNode *next_; /* the next node in the sorted list */ struct TranxNode *hash_next_; /* the next node during hash collision */ }; @@ -128,6 +130,7 @@ public: trx_node->log_pos_= 0; trx_node->next_= 0; trx_node->hash_next_= 0; + trx_node->n_waiters= 0; return trx_node; } @@ -246,6 +249,12 @@ private: /* New Block is always the current_block */ current_block= block; ++block_num; + + for (int i=0; i< BLOCK_TRANX_NODES; i++) + mysql_cond_init(key_ss_cond_COND_binlog_send_, + &current_block->nodes[i].cond, + NULL); + return 0; } return 1; @@ -257,6 +266,8 @@ private: */ void free_block(Block *block) { + for (int i=0; i< BLOCK_TRANX_NODES; i++) + mysql_cond_destroy(&block->nodes[i].cond); my_free(block); --block_num; } @@ -330,6 +341,11 @@ private: } public: + int signal_waiting_sessions_all(); + int signal_waiting_sessions_up_to(const char *log_file_name, + my_off_t log_file_pos); + TranxNode* find_active_tranx_node(const char *log_file_name, + my_off_t log_file_pos); ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); ~ActiveTranx(); @@ -376,11 +392,6 @@ class ReplSemiSyncMaster /* True when initObject has been called */ bool init_done_; - /* This cond variable is signaled when enough binlog has been sent to slave, - * so that a waiting trx can return the 'ok' to the client for a commit. - */ - mysql_cond_t COND_binlog_send_; - /* Mutex that protects the following state variables and the active * transaction list.
* Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are @@ -434,8 +445,6 @@ class ReplSemiSyncMaster void lock(); void unlock(); - void cond_broadcast(); - int cond_timewait(struct timespec *wait_time); /* Is semi-sync replication on? */ bool is_on() { diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc index d84c558..16bab13 100644 --- a/plugin/semisync/semisync_master.cc +++ b/plugin/semisync/semisync_master.cc @@ -224,6 +224,54 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, return (entry != NULL); } +int ActiveTranx::signal_waiting_sessions_all() +{ + const char *kWho = "ActiveTranx::signal_waiting_sessions_all"; + function_enter(kWho); + for (TranxNode* entry= trx_front_; entry; entry=entry->next_) + mysql_cond_broadcast(&entry->cond); + + return function_exit(kWho, 0); +} + +int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to"; + function_enter(kWho); + + TranxNode* entry= trx_front_; + /* The list may be empty: test entry before every dereference. */ + while (entry && + ActiveTranx::compare(entry->log_name_, entry->log_pos_, + log_file_name, log_file_pos) <= 0) + { + mysql_cond_broadcast(&entry->cond); + entry= entry->next_; + } + + return function_exit(kWho, (entry != NULL)); +} + +TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::find_active_tranx_node"; + function_enter(kWho); + + TranxNode* entry= trx_front_; + + while (entry) + { + if (ActiveTranx::compare(log_file_name, log_file_pos, entry->log_name_, + entry->log_pos_) <= 0) + break; + entry= entry->next_; + } + function_exit(kWho, 0); + return entry; +} + int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, my_off_t log_file_pos) { @@ -238,7 +286,8 @@ int
ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, while (new_front) { - if (compare(new_front, log_file_name, log_file_pos) > 0) + if (compare(new_front, log_file_name, log_file_pos) > 0 || + new_front->n_waiters > 0) break; new_front = new_front->next_; } @@ -365,8 +414,6 @@ int ReplSemiSyncMaster::initObject() /* Mutex initialization can only be done after MY_INIT(). */ mysql_mutex_init(key_ss_mutex_LOCK_binlog_, &LOCK_binlog_, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_ss_cond_COND_binlog_send_, - &COND_binlog_send_, NULL); if (rpl_semi_sync_master_enabled) result = enableMaster(); @@ -442,7 +489,6 @@ ReplSemiSyncMaster::~ReplSemiSyncMaster() if (init_done_) { mysql_mutex_destroy(&LOCK_binlog_); - mysql_cond_destroy(&COND_binlog_send_); } delete active_tranxs_; @@ -458,22 +504,6 @@ void ReplSemiSyncMaster::unlock() mysql_mutex_unlock(&LOCK_binlog_); } -void ReplSemiSyncMaster::cond_broadcast() -{ - mysql_cond_broadcast(&COND_binlog_send_); -} - -int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time) -{ - const char *kWho = "ReplSemiSyncMaster::cond_timewait()"; - int wait_res; - - function_enter(kWho); - wait_res= mysql_cond_timedwait(&COND_binlog_send_, - &LOCK_binlog_, wait_time); - return function_exit(kWho, wait_res); -} - void ReplSemiSyncMaster::add_slave() { lock(); @@ -579,10 +609,6 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, reply_file_pos_ = log_file_pos; reply_file_name_inited_ = true; - /* Remove all active transaction nodes before this point.
*/ - assert(active_tranxs_ != NULL); - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); - if (trace_level_ & kTraceDetail) { if(!skipped_event) @@ -612,16 +638,14 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, } l_end: - unlock(); if (can_release_threads) { if (trace_level_ & kTraceDetail) sql_print_information("%s: signal all waiting threads.", kWho); - - cond_broadcast(); + active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_); } - + unlock(); return function_exit(kWho, 0); } @@ -648,8 +672,18 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, /* Acquire the mutex. */ lock(); + TranxNode* entry= NULL; + mysql_cond_t* thd_cond= NULL; + if (active_tranxs_) + { + entry= + active_tranxs_->find_active_tranx_node(trx_wait_binlog_name, + trx_wait_binlog_pos); + if (entry) + thd_cond= &entry->cond; + } /* This must be called after acquired the lock */ - THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_, + THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_, & stage_waiting_for_semi_sync_ack_from_slave, & old_stage); @@ -751,7 +785,11 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, kWho, wait_timeout_, wait_file_name_, (unsigned long)wait_file_pos_); - wait_result = cond_timewait(&abstime); + /* wait for the position to be ACK'ed back */ + assert(entry); + entry->n_waiters++; + wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime); + entry->n_waiters--; rpl_semi_sync_master_wait_sessions--; if (wait_result != 0) @@ -790,14 +828,12 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, } } - /* - At this point, the binlog file and position of this transaction - must have been removed from ActiveTranx.
*/ - assert(!getMasterEnabled() || - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, - trx_wait_binlog_pos)); - l_end: + /* Last waiter removes the TranxNode */ + if (is_on() && active_tranxs_ && entry && entry->n_waiters == 0) + active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name, + trx_wait_binlog_pos); + +l_end: /* Update the status counter. */ if (is_on()) rpl_semi_sync_master_yes_transactions++; @@ -838,15 +874,17 @@ int ReplSemiSyncMaster::switch_off() function_enter(kWho); state_ = false; - /* Clear the active transaction list. */ - assert(active_tranxs_ != NULL); - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); - rpl_semi_sync_master_off_times++; wait_file_name_inited_ = false; reply_file_name_inited_ = false; sql_print_information("Semi-sync replication switched OFF."); - cond_broadcast(); /* wake up all waiting threads */ + + /* signal waiting sessions */ + assert(active_tranxs_ != NULL); + active_tranxs_->signal_waiting_sessions_all(); + + /* Clear the active transaction list. */ + result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); return function_exit(kWho, result); } @@ -1234,6 +1272,7 @@ int ReplSemiSyncMaster::resetMaster() rpl_semi_sync_master_trx_wait_time = 0; rpl_semi_sync_master_net_wait_num = 0; rpl_semi_sync_master_net_wait_time = 0; + active_tranxs_->clear_active_tranx_nodes(NULL, 0); unlock();