diff --git a/sql/binlog.cc b/sql/binlog.cc
index ddead6a..afc1d56 100644
--- a/sql/binlog.cc
+++ b/sql/binlog.cc
@@ -1668,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
 
 
 bool
-Stage_manager::Mutex_queue::append(THD *first)
+Stage_manager::Mutex_queue::append(THD *first, int *slot)
 {
   DBUG_ENTER("Stage_manager::Mutex_queue::append");
   lock();
@@ -1688,6 +1688,29 @@ Stage_manager::Mutex_queue::append(THD *first)
     the queue as well.
   */
 
+  if (empty)
+  {
+    DBUG_ASSERT(m_first == first);
+
+    /*
+      A new leader that has no slot yet takes the next one round-robin.
+      cond_index is unsigned and wrapped explicitly, so the slot always
+      stays inside [0, MAX_STAGE_COND) and the counter cannot overflow.
+    */
+    if (first->stage_cond_id == UNDEF_COND_SLOT)
+    {
+      cond_index= (cond_index + 1) % MAX_STAGE_COND;
+      first->stage_cond_id= static_cast<int>(cond_index);
+    }
+  }
+
+  DBUG_ASSERT(m_first &&
+              (m_first->stage_cond_id != UNDEF_COND_SLOT));
+
+  /* A follower waits on the condition slot of the current stage's
+     leader, i.e. of the queue head. */
+  *slot= m_first->stage_cond_id;
+
   while (first->next_to_commit)
   {
     count++;
@@ -1742,7 +1765,11 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
   // If the queue was empty: we're the leader for this batch
   DBUG_PRINT("debug", ("Enqueue 0x%llx to queue for stage %d",
                        (ulonglong) thd, stage));
-  bool leader= m_queue[stage].append(thd);
+  int slot= UNDEF_COND_SLOT;
+  bool leader= m_queue[stage].append(thd, &slot);
+
+  if (leader)
+    thd->stage_leader= true;
 
 #ifdef HAVE_REPLICATION
   if (stage == FLUSH_STAGE && has_commit_order_manager(thd))
@@ -1773,8 +1800,8 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
   */
   if (!leader)
   {
-    mysql_mutex_lock(&m_lock_done);
 #ifndef DBUG_OFF
+    mysql_mutex_lock(&m_lock_preempt);
     /*
       Leader can be awaiting all-clear to preempt follower's execution.
       With setting the status the follower ensures it won't execute anything
@@ -1783,10 +1810,26 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
     thd->get_transaction()->m_flags.ready_preempt= 1;
     if (leader_await_preempt_status)
       mysql_cond_signal(&m_cond_preempt);
+    mysql_mutex_unlock(&m_lock_preempt);
 #endif
+
+    /* Wait on the condition slot of this stage's leader (see append()). */
+    mutex_enter_slot(slot);
     while (thd->get_transaction()->m_flags.pending)
-      mysql_cond_wait(&m_cond_done, &m_lock_done);
-    mysql_mutex_unlock(&m_lock_done);
+      enter_cond_slot(slot);
+    mutex_exit_slot(slot);
+
+    /*
+      A thread that led an earlier stage has its own followers waiting on
+      its slot, while signal_done() only broadcasts the slot of the queue
+      head it is given. Pass the wakeup on to them.
+    */
+    if (thd->stage_leader)
+    {
+      mutex_enter_slot(thd->stage_cond_id);
+      mutex_exit_slot(thd->stage_cond_id);
+      cond_signal_slot(thd->stage_cond_id);
+    }
   }
   return leader;
 }
@@ -1841,11 +1884,37 @@ time_t Stage_manager::wait_count_or_timeout(ulong count, time_t usec, StageID st
 
 void Stage_manager::signal_done(THD *queue)
 {
-  mysql_mutex_lock(&m_lock_done);
-  for (THD *thd= queue ; thd ; thd = thd->next_to_commit)
-    thd->get_transaction()->m_flags.pending= false;
-  mysql_mutex_unlock(&m_lock_done);
-  mysql_cond_broadcast(&m_cond_done);
+  /* Nothing to wake up for an empty queue. */
+  if (queue == NULL)
+    return;
+
+  /*
+    Followers wait on the slot of the leader at the head of their queue
+    (see append()). Read it up front: once the 'pending' flags below are
+    cleared, the queued threads may carry on and must not be touched.
+  */
+  const int slot= queue->stage_cond_id;
+  DBUG_ASSERT(slot >= 0 && slot < MAX_STAGE_COND);
+
+  THD* node= queue;
+  THD* next_node= NULL;
+
+  while(node)
+  {
+    /* Fetch the successor before releasing this node's thread. */
+    next_node= node->next_to_commit;
+    node->get_transaction()->m_flags.pending= false;
+    node= next_node;
+  }
+
+  /*
+    Taking and releasing the slot mutex orders the flag updates above
+    with any follower that has checked 'pending' but not yet started to
+    wait, so the broadcast below cannot be missed.
+  */
+  mutex_enter_slot(slot);
+  mutex_exit_slot(slot);
+  cond_signal_slot(slot);
 }
 
 #ifndef DBUG_OFF
@@ -1853,14 +1922,14 @@ void Stage_manager::clear_preempt_status(THD *head)
 {
   DBUG_ASSERT(head);
 
-  mysql_mutex_lock(&m_lock_done);
+  mysql_mutex_lock(&m_lock_preempt);
   while(!head->get_transaction()->m_flags.ready_preempt)
   {
     leader_await_preempt_status= true;
-    mysql_cond_wait(&m_cond_preempt, &m_lock_done);
+    mysql_cond_wait(&m_cond_preempt, &m_lock_preempt);
   }
   leader_await_preempt_status= false;
-  mysql_mutex_unlock(&m_lock_done);
+  mysql_mutex_unlock(&m_lock_preempt);
 }
 #endif
 
@@ -8404,6 +8473,9 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
   thd->get_transaction()->m_flags.xid_written= false;
   thd->get_transaction()->m_flags.commit_low= !skip_commit;
   thd->get_transaction()->m_flags.run_hooks= !skip_commit;
+  /* Start every commit with no stage leadership and no condition slot. */
+  thd->stage_leader= false;
+  thd->stage_cond_id= UNDEF_COND_SLOT;
 #ifndef DBUG_OFF
   /*
     The group commit Leader may have to wait for follower whose transaction
diff --git a/sql/binlog.h b/sql/binlog.h
index 7eb1a88..7bc1ff4 100644
--- a/sql/binlog.h
+++ b/sql/binlog.h
@@ -32,6 +32,11 @@ class Log_event;
 class Gtid_set;
 struct Gtid;
 
+/* Number of mutex/condition pairs that group commit followers wait on. */
+#define MAX_STAGE_COND 128
+/* THD::stage_cond_id value meaning "no condition slot assigned". */
+#define UNDEF_COND_SLOT (-1)
+
 /**
   Logical timestamp generator for logical timestamping binlog transactions.
   A transaction is associated with two sequence numbers see
@@ -102,7 +107,7 @@ public:
     }
 
     /** Append a linked list of threads to the queue */
-    bool append(THD *first);
+    bool append(THD *first, int *slot);
 
     /**
       Fetch the entire queue for a stage.
@@ -138,7 +143,8 @@ public:
 
     /** size of the queue */
     int32 m_size;
-
+    /* Last Stage_manager condition slot handed out; only written by append(). */
+    uint cond_index;
     /** Lock for protecting the queue. */
     mysql_mutex_t m_lock;
   } __attribute__((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
@@ -172,9 +178,13 @@ public:
 #endif
               )
   {
-    mysql_mutex_init(key_LOCK_done, &m_lock_done, MY_MUTEX_INIT_FAST);
-    mysql_cond_init(key_COND_done, &m_cond_done);
+    for (int i= 0; i < MAX_STAGE_COND; i++)
+    {
+      mysql_mutex_init(key_LOCK_done, &m_lock_done[i], MY_MUTEX_INIT_FAST);
+      mysql_cond_init(key_COND_done, &m_cond_done[i]);
+    }
 #ifndef DBUG_OFF
+    mysql_mutex_init(key_LOCK_done, &m_lock_preempt, MY_MUTEX_INIT_FAST);
     /* reuse key_COND_done 'cos a new PSI object would be wasteful in !DBUG_OFF */
     mysql_cond_init(key_COND_done, &m_cond_preempt);
 #endif
@@ -199,8 +209,16 @@ public:
   {
     for (size_t i = 0 ; i < STAGE_COUNTER ; ++i)
       m_queue[i].deinit();
-    mysql_cond_destroy(&m_cond_done);
-    mysql_mutex_destroy(&m_lock_done);
+
+    for (int i= 0; i < MAX_STAGE_COND; i++)
+    {
+      mysql_cond_destroy(&m_cond_done[i]);
+      mysql_mutex_destroy(&m_lock_done[i]);
+    }
+#ifndef DBUG_OFF
+    mysql_cond_destroy(&m_cond_preempt);
+    mysql_mutex_destroy(&m_lock_preempt);
+#endif
   }
 
   /**
@@ -271,6 +289,31 @@ public:
 
   void signal_done(THD *queue);
 
+  /*
+    Helpers for the per-slot mutex/condition pairs in m_lock_done and
+    m_cond_done; 'slot' must be in [0, MAX_STAGE_COND). enter_cond_slot()
+    needs the slot mutex held; cond_signal_slot() broadcasts to all waiters.
+  */
+  void mutex_enter_slot(int slot)
+  {
+    mysql_mutex_lock(&(m_lock_done[slot]));
+  }
+
+  void mutex_exit_slot(int slot)
+  {
+    mysql_mutex_unlock(&(m_lock_done[slot]));
+  }
+
+  void enter_cond_slot(int slot)
+  {
+    mysql_cond_wait(&(m_cond_done[slot]), &(m_lock_done[slot]));
+  }
+
+  void cond_signal_slot(int slot)
+  {
+    mysql_cond_broadcast(&(m_cond_done[slot]));
+  }
+
 private:
   /**
     Queues for sessions.
@@ -282,13 +325,15 @@ private:
   Mutex_queue m_queue[STAGE_COUNTER];
 
   /** Condition variable to indicate that the commit was processed */
-  mysql_cond_t m_cond_done;
+  mysql_cond_t m_cond_done[MAX_STAGE_COND];
 
   /** Mutex used for the condition variable above */
-  mysql_mutex_t m_lock_done;
+  mysql_mutex_t m_lock_done[MAX_STAGE_COND];
 #ifndef DBUG_OFF
   /** Flag is set by Leader when it starts waiting for follower's all-clear */
   bool leader_await_preempt_status;
+  /** Mutex for leader_await_preempt_status and m_cond_preempt */
+  mysql_mutex_t m_lock_preempt;
 
   /** Condition variable to indicate a follower started waiting for commit */
   mysql_cond_t m_cond_preempt;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 9d497d8..74460ef 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2467,6 +2467,11 @@ public:
    */
   THD *next_to_commit;
 
+  /* Set when this THD leads a group commit stage; reset per ordered_commit(). */
+  bool stage_leader;
+  /* Stage_manager condition slot of this THD as a leader, or UNDEF_COND_SLOT. */
+  int stage_cond_id;
+
   /**
      Functions to set and get transaction position.