diff --git a/sql/binlog.cc b/sql/binlog.cc index cf68a8d..f9c8dc0 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -1773,7 +1773,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) bool -Stage_manager::Mutex_queue::append(THD *first) +Stage_manager::Mutex_queue::append(THD *first, int *slot) { DBUG_ENTER("Stage_manager::Mutex_queue::append"); lock(); @@ -1784,6 +1784,28 @@ Stage_manager::Mutex_queue::append(THD *first) int32 count= 1; bool empty= (m_first == NULL); *m_last= first; + + if (empty) + { + DBUG_ASSERT(m_first == first); + if (first->stage_cond_id == UNDEF_COND_SLOT) + { + if (unlikely(cond_index < 0)) + { + /* adjust to zero */ + cond_index= 0; + } + + first->stage_cond_id= ((cond_index++)%MAX_STAGE_COND); + } + } + else + { + first->prev_to_commit= m_stage_last; + } + + *slot= m_first->stage_cond_id; + DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx", (ulonglong) m_first, (ulonglong) &m_first, (ulonglong) m_last)); @@ -1800,6 +1822,7 @@ Stage_manager::Mutex_queue::append(THD *first) } my_atomic_add32(&m_size, count); + m_stage_last= first; m_last= &first->next_to_commit; DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx", (ulonglong) m_first, (ulonglong) &m_first, @@ -1810,44 +1833,17 @@ Stage_manager::Mutex_queue::append(THD *first) DBUG_RETURN(empty); } - -std::pair -Stage_manager::Mutex_queue::pop_front() -{ - DBUG_ENTER("Stage_manager::Mutex_queue::pop_front"); - lock(); - THD *result= m_first; - bool more= true; - /* - We do not set next_to_commit to NULL here since this is only used - in the flush stage. We will have to call fetch_queue last here, - and will then "cut" the linked list by setting the end of that - queue to NULL. 
- */ - if (result) - m_first= result->next_to_commit; - if (m_first == NULL) - { - more= false; - m_last = &m_first; - } - DBUG_ASSERT(my_atomic_load32(&m_size) > 0); - my_atomic_add32(&m_size, -1); - DBUG_ASSERT(m_first || m_last == &m_first); - unlock(); - DBUG_PRINT("return", ("result: 0x%llx, more: %s", - (ulonglong) result, YESNO(more))); - DBUG_RETURN(std::make_pair(more, result)); -} - - bool Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex) { // If the queue was empty: we're the leader for this batch DBUG_PRINT("debug", ("Enqueue 0x%llx to queue for stage %d", (ulonglong) thd, stage)); - bool leader= m_queue[stage].append(thd); + + int slot= UNDEF_COND_SLOT; + bool leader= m_queue[stage].append(thd, &slot); + + DBUG_ASSERT(slot != UNDEF_COND_SLOT); #ifdef HAVE_REPLICATION if (stage == FLUSH_STAGE && has_commit_order_manager(thd)) @@ -1866,6 +1862,9 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex) if (stage_mutex) mysql_mutex_unlock(stage_mutex); + if (leader) + thd->stage_leader= true; + #ifndef DBUG_OFF if (stage == Stage_manager::SYNC_STAGE) DEBUG_SYNC(thd, "bgc_after_enrolling_for_sync_stage"); @@ -1874,12 +1873,13 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex) /* If the queue was not empty, we're a follower and wait for the leader to process the queue. If we were holding a mutex, we have + to release it before going to sleep. */ if (!leader) { - mysql_mutex_lock(&m_lock_done); #ifndef DBUG_OFF + mysql_mutex_lock(&m_lock_preempt); /* Leader can be awaiting all-clear to preempt follower's execution. 
With setting the status the follower ensures it won't execute anything @@ -1888,10 +1888,19 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex) thd->get_transaction()->m_flags.ready_preempt= 1; if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt); + mysql_mutex_unlock(&m_lock_preempt); #endif + mutex_enter_slot(slot); while (thd->get_transaction()->m_flags.pending) - mysql_cond_wait(&m_cond_done, &m_lock_done); - mysql_mutex_unlock(&m_lock_done); + enter_cond_slot(slot); + mutex_exit_slot(slot); + + if (thd->stage_leader) + { + mutex_enter_slot(thd->stage_cond_id); + cond_signal_slot(thd->stage_cond_id); + mutex_exit_slot(thd->stage_cond_id); + } } return leader; } @@ -1905,8 +1914,10 @@ THD *Stage_manager::Mutex_queue::fetch_and_empty() (ulonglong) m_first, (ulonglong) &m_first, (ulonglong) m_last)); THD *result= m_first; + if (result) result->prev_to_commit= m_stage_last; /* m_first is NULL when the queue is empty; link head to tail only for a non-empty queue */ m_first= NULL; m_last= &m_first; + m_stage_last= NULL; DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx", (ulonglong) m_first, (ulonglong) &m_first, (ulonglong) m_last)); @@ -1946,11 +1957,24 @@ time_t Stage_manager::wait_count_or_timeout(ulong count, time_t usec, StageID st void Stage_manager::signal_done(THD *queue) { - mysql_mutex_lock(&m_lock_done); - for (THD *thd= queue ; thd ; thd = thd->next_to_commit) - thd->get_transaction()->m_flags.pending= false; - mysql_mutex_unlock(&m_lock_done); - mysql_cond_broadcast(&m_cond_done); + THD* node= queue->prev_to_commit; + THD* prev_node= NULL; + + while(node) + { + prev_node= node->prev_to_commit; + barrier(); + node->get_transaction()->m_flags.pending= false; + + if (node == queue) + break; + + node= prev_node; + } + + mutex_enter_slot(queue->stage_cond_id); + cond_signal_slot(queue->stage_cond_id); + mutex_exit_slot(queue->stage_cond_id); } #ifndef DBUG_OFF @@ -1958,14 +1982,14 @@ void Stage_manager::clear_preempt_status(THD *head) { DBUG_ASSERT(head); - mysql_mutex_lock(&m_lock_done); +
mysql_mutex_lock(&m_lock_preempt); while(!head->get_transaction()->m_flags.ready_preempt) { leader_await_preempt_status= true; - mysql_cond_wait(&m_cond_preempt, &m_lock_done); + mysql_cond_wait(&m_cond_preempt, &m_lock_preempt); } leader_await_preempt_status= false; - mysql_mutex_unlock(&m_lock_done); + mysql_mutex_unlock(&m_lock_preempt); } #endif @@ -8717,6 +8741,10 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) thd->get_transaction()->m_flags.xid_written= false; thd->get_transaction()->m_flags.commit_low= !skip_commit; thd->get_transaction()->m_flags.run_hooks= !skip_commit; + thd->stage_leader= false; + thd->stage_cond_id= UNDEF_COND_SLOT; + thd->prev_to_commit= NULL; + #ifndef DBUG_OFF /* The group commit Leader may have to wait for follower whose transaction diff --git a/sql/binlog.h b/sql/binlog.h index 8f76d36..dd454e5 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -37,6 +37,10 @@ struct Gtid; typedef int64 query_id_t; +#define barrier() __asm__ __volatile__("" ::: "memory") +#define MAX_STAGE_COND 128 +#define UNDEF_COND_SLOT (-1) + /** Logical timestamp generator for logical timestamping binlog transactions. A transaction is associated with two sequence numbers see @@ -86,7 +90,8 @@ public: friend class Stage_manager; public: Mutex_queue() - : m_first(NULL), m_last(&m_first), m_size(0) + : m_first(NULL), m_last(&m_first), + m_stage_last(NULL), m_size(0), cond_index(0) { } @@ -107,7 +112,7 @@ public: } /** Append a linked list of threads to the queue */ - bool append(THD *first); + bool append(THD *first, int *slot); /** Fetch the entire queue for a stage. @@ -116,8 +121,6 @@ public: */ THD *fetch_and_empty(); - std::pair pop_front(); - inline int32 get_size() { return my_atomic_load32(&m_size); @@ -141,9 +144,15 @@ public: */ THD **m_last; + /** The last THD object of the queue; NULL when the queue is empty */ + THD *m_stage_last; + /** size of the queue */ int32 m_size; + /** Counter from which append() derives a leader's stage_cond_id slot (taken modulo MAX_STAGE_COND); starts at 0 */ + int cond_index; + /** Lock for protecting the queue.
*/ mysql_mutex_t m_lock; } __attribute__((aligned(CPU_LEVEL1_DCACHE_LINESIZE))); @@ -177,9 +186,13 @@ public: #endif ) { - mysql_mutex_init(key_LOCK_done, &m_lock_done, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_done, &m_cond_done); + for(int i= 0; i< MAX_STAGE_COND; i++) + { + mysql_mutex_init(key_LOCK_done, &m_lock_done[i], MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_done, &m_cond_done[i]); + } #ifndef DBUG_OFF + mysql_mutex_init(key_LOCK_done, &m_lock_preempt, MY_MUTEX_INIT_FAST); /* reuse key_COND_done 'cos a new PSI object would be wasteful in !DBUG_OFF */ mysql_cond_init(key_COND_done, &m_cond_preempt); #endif @@ -204,8 +217,17 @@ public: { for (size_t i = 0 ; i < STAGE_COUNTER ; ++i) m_queue[i].deinit(); - mysql_cond_destroy(&m_cond_done); - mysql_mutex_destroy(&m_lock_done); + + for(int i= 0; i< MAX_STAGE_COND; i++) + { + mysql_cond_destroy(&m_cond_done[i]); + mysql_mutex_destroy(&m_lock_done[i]); + } + +#ifndef DBUG_OFF + mysql_cond_destroy(&m_cond_preempt); + mysql_mutex_destroy(&m_lock_preempt); +#endif } /** @@ -232,11 +254,6 @@ public: */ bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex); - std::pair pop_front(StageID stage) - { - return m_queue[stage].pop_front(); - } - #ifndef DBUG_OFF /** The method ensures the follower's execution path can be preempted @@ -276,6 +293,26 @@ public: void signal_done(THD *queue); + void mutex_enter_slot(int slot) + { + mysql_mutex_lock(&(m_lock_done[slot])); + } + + void mutex_exit_slot(int slot) + { + mysql_mutex_unlock(&(m_lock_done[slot])); + } + + void enter_cond_slot(int slot) + { + mysql_cond_wait(&(m_cond_done[slot]), &(m_lock_done[slot])); + } + + void cond_signal_slot(int slot) + { + mysql_cond_broadcast(&(m_cond_done[slot])); + } + private: /** Queues for sessions. 
@@ -287,11 +324,13 @@ private: Mutex_queue m_queue[STAGE_COUNTER]; /** Condition variable to indicate that the commit was processed */ - mysql_cond_t m_cond_done; + mysql_cond_t m_cond_done[MAX_STAGE_COND]; /** Mutex used for the condition variable above */ - mysql_mutex_t m_lock_done; + mysql_mutex_t m_lock_done[MAX_STAGE_COND]; #ifndef DBUG_OFF + mysql_mutex_t m_lock_preempt; + /** Flag is set by Leader when it starts waiting for follower's all-clear */ bool leader_await_preempt_status; diff --git a/sql/sql_class.h b/sql/sql_class.h index d815a1c..0e6d7d1 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2576,6 +2576,15 @@ public: */ THD *next_to_commit; + /* The previous node of commit queue for binary log group commit */ + THD *prev_to_commit; + + /* Stage leader of group commit if true*/ + bool stage_leader; + + /* If this thread is a leader, then allocate a mysql_cond_t for it*/ + int stage_cond_id; + /** The member is served for marking a query that CREATEs or ALTERs a table declared with a TIMESTAMP column as dependent on