From 1a03448f4d11881b66a9b70cdf239c85d7b394dd Mon Sep 17 00:00:00 2001
From: Yibo Cai
Date: Tue, 24 Sep 2019 10:00:53 +0000
Subject: [PATCH 2/3] rwlock: refine lock->lock_word with C++11 atomics

lock_word is the core of the rwlock implementation. It is a signed
integer reflecting the current locking status (S, SX, X).

The most important changes in this patch are to the locking and
unlocking code: locking (rw_lock_lock_word_decr) now uses acquire
order, and unlocking (rw_lock_lock_word_incr) uses release order.

Unlocking is followed by a check of lock->waiters to signal waiting
threads. Incrementing lock->lock_word must happen before checking
lock->waiters, but after changing unlocking to release order this
ordering is no longer guaranteed. An explicit acquire fence is
therefore added before loading lock->waiters to fix the problem.

Another improvement comes from removing the "volatile" qualifier,
which eliminates some unnecessary memory accesses.

Change-Id: I5ddd1fa504c3f9672d46d207ea394a756260af82
---
Note: two standalone C++ sketches illustrating the ordering protocol
are appended after the patch.

 storage/innobase/include/sync0rw.h  |  2 +-
 storage/innobase/include/sync0rw.ic | 95 ++++++++++++++++++++++---------------
 storage/innobase/sync/sync0arr.cc   |  8 ++--
 storage/innobase/sync/sync0rw.cc    | 48 +++++++++++--------
 4 files changed, 89 insertions(+), 64 deletions(-)

diff --git a/storage/innobase/include/sync0rw.h b/storage/innobase/include/sync0rw.h
index f333487..e5db35b 100644
--- a/storage/innobase/include/sync0rw.h
+++ b/storage/innobase/include/sync0rw.h
@@ -558,7 +558,7 @@ struct rw_lock_t
 #endif /* UNIV_DEBUG */
 {
   /** Holds the state of the lock. */
-  volatile lint lock_word;
+  os_atomic_t lock_word;

   /** 1: there are waiters */
   volatile ulint waiters;
diff --git a/storage/innobase/include/sync0rw.ic b/storage/innobase/include/sync0rw.ic
index eb1740b..0be1204 100644
--- a/storage/innobase/include/sync0rw.ic
+++ b/storage/innobase/include/sync0rw.ic
@@ -105,7 +105,7 @@ void rw_lock_reset_waiter_flag(rw_lock_t *lock) /*!< in/out: rw-lock */
 UNIV_INLINE
 ulint rw_lock_get_writer(const rw_lock_t *lock) /*!< in: rw-lock */
 {
-  lint lock_word = lock->lock_word;
+  lint lock_word = lock->lock_word.load(order_relaxed);

   ut_ad(lock_word <= X_LOCK_DECR);
   if (lock_word > X_LOCK_HALF_DECR) {
@@ -132,7 +132,7 @@ ulint rw_lock_get_writer(const rw_lock_t *lock) /*!< in: rw-lock */
 UNIV_INLINE
 ulint rw_lock_get_reader_count(const rw_lock_t *lock) /*!< in: rw-lock */
 {
-  lint lock_word = lock->lock_word;
+  lint lock_word = lock->lock_word.load(order_relaxed);

   ut_ad(lock_word <= X_LOCK_DECR);
   if (lock_word > X_LOCK_HALF_DECR) {
@@ -169,7 +169,7 @@ ib_mutex_t *rw_lock_get_mutex(rw_lock_t *lock) { return (&(lock->mutex)); }
 UNIV_INLINE
 ulint rw_lock_get_x_lock_count(const rw_lock_t *lock) /*!< in: rw-lock */
 {
-  lint lock_copy = lock->lock_word;
+  lint lock_copy = lock->lock_word.load(order_relaxed);

   ut_ad(lock_copy <= X_LOCK_DECR);
   if (lock_copy == 0 || lock_copy == -X_LOCK_HALF_DECR) {
@@ -198,7 +198,7 @@ UNIV_INLINE
 ulint rw_lock_get_sx_lock_count(const rw_lock_t *lock) /*!< in: rw-lock */
 {
 #ifdef UNIV_DEBUG
-  lint lock_copy = lock->lock_word;
+  lint lock_copy = lock->lock_word.load(order_relaxed);

   ut_ad(lock_copy <= X_LOCK_DECR);

@@ -230,21 +230,23 @@ bool rw_lock_lock_word_decr(rw_lock_t *lock, /*!< in/out: rw-lock */
 #ifdef INNODB_RW_LOCKS_USE_ATOMICS
   lint local_lock_word;

-  os_rmb;
-  local_lock_word = lock->lock_word;
+  local_lock_word = lock->lock_word.load(order_acquire);
   while (local_lock_word > threshold) {
-    if (os_compare_and_swap_lint(&lock->lock_word, local_lock_word,
-                                 local_lock_word - amount)) {
+    if (lock->lock_word.compare_exchange_weak(local_lock_word,
+                                              local_lock_word - amount,
+                                              order_acquire)) {
       return (true);
     }
-    local_lock_word = lock->lock_word;
   }
   return (false);
 #else /* INNODB_RW_LOCKS_USE_ATOMICS */
   bool success = false;
+  lint local_lock_word;
+
   mutex_enter(&(lock->mutex));
-  if (lock->lock_word > threshold) {
-    lock->lock_word -= amount;
+  local_lock_word = lock->lock_word.load(order_relaxed);
+  if (local_lock_word > threshold) {
+    lock->lock_word.store(local_lock_word - amount);
     success = true;
   }
   mutex_exit(&(lock->mutex));
@@ -259,14 +261,14 @@ lint rw_lock_lock_word_incr(rw_lock_t *lock, /*!< in/out: rw-lock */
                             ulint amount) /*!< in: amount of increment */
 {
 #ifdef INNODB_RW_LOCKS_USE_ATOMICS
-  return (os_atomic_increment_lint(&lock->lock_word, amount));
+  return lock->lock_word.fetch_add(amount, order_release) + amount;
 #else /* INNODB_RW_LOCKS_USE_ATOMICS */
   lint local_lock_word;

   mutex_enter(&(lock->mutex));

-  lock->lock_word += amount;
-  local_lock_word = lock->lock_word;
+  local_lock_word = lock->lock_word.load(order_relaxed) + amount;
+  lock->lock_word.store(local_lock_word, order_relaxed);

   mutex_exit(&(lock->mutex));

@@ -382,13 +384,16 @@ ibool rw_lock_x_lock_func_nowait(
   ibool success;

 #ifdef INNODB_RW_LOCKS_USE_ATOMICS
-  success = os_compare_and_swap_lint(&lock->lock_word, X_LOCK_DECR, 0);
+  lint x_lock_decr = X_LOCK_DECR;
+
+  success = lock->lock_word.compare_exchange_strong(x_lock_decr, 0,
+                                                    order_acquire);
 #else

   success = FALSE;
   mutex_enter(&(lock->mutex));
-  if (lock->lock_word == X_LOCK_DECR) {
-    lock->lock_word = 0;
+  if (lock->lock_word.load(order_relaxed) == X_LOCK_DECR) {
+    lock->lock_word.store(0, order_relaxed);
     success = TRUE;
   }
   mutex_exit(&(lock->mutex));
@@ -402,19 +407,22 @@ ibool rw_lock_x_lock_func_nowait(
     /* Relock: this lock_word modification is safe since no other threads
     can modify (lock, unlock, or reserve) lock_word while there is an
     exclusive writer and this is the writer thread. */
-    if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+    lint lock_word = lock->lock_word.load(order_relaxed);
+
+    if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
       /* There are 1 x-locks */
-      lock->lock_word -= X_LOCK_DECR;
-    } else if (lock->lock_word <= -X_LOCK_DECR) {
+      lock_word -= X_LOCK_DECR;
+    } else if (lock_word <= -X_LOCK_DECR) {
       /* There are 2 or more x-locks */
-      lock->lock_word--;
+      lock_word--;
     } else {
       /* Failure */
       return (FALSE);
     }
+    lock->lock_word.store(lock_word, order_relaxed);

     /* Watch for too many recursive locks */
-    ut_ad(lock->lock_word < 0);
+    ut_ad(lock_word < 0);
   } else {
     /* Failure */
@@ -440,14 +448,16 @@ void rw_lock_s_unlock_func(
 #endif /* UNIV_DEBUG */
     rw_lock_t *lock) /*!< in/out: rw-lock */
 {
-  ut_ad(lock->lock_word > -X_LOCK_DECR);
-  ut_ad(lock->lock_word != 0);
-  ut_ad(lock->lock_word < X_LOCK_DECR);
+  lint lock_word = lock->lock_word.load(order_relaxed);
+
+  ut_ad(lock_word > -X_LOCK_DECR);
+  ut_ad(lock_word != 0);
+  ut_ad(lock_word < X_LOCK_DECR);

   ut_d(rw_lock_remove_debug_info(lock, pass, RW_LOCK_S));

   /* Increment lock_word to indicate 1 less reader */
-  lint lock_word = rw_lock_lock_word_incr(lock, 1);
+  lock_word = rw_lock_lock_word_incr(lock, 1);
   if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
     /* wait_ex waiter exists. It may not be asleep, but we signal anyway.
     We do not wake other waiters, because they can't
@@ -468,8 +478,10 @@ void rw_lock_x_unlock_func(
 #endif /* UNIV_DEBUG */
     rw_lock_t *lock) /*!< in/out: rw-lock */
 {
-  ut_ad(lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR ||
-        lock->lock_word <= -X_LOCK_DECR);
+  lint lock_word = lock->lock_word.load(order_relaxed);
+
+  ut_ad(lock_word == 0 || lock_word == -X_LOCK_HALF_DECR ||
+        lock_word <= -X_LOCK_DECR);

   /* lock->recursive flag also indicates if lock->writer_thread is
   valid or stale. If we are the last of the recursive callers
@@ -477,14 +489,15 @@ void rw_lock_x_unlock_func(
   lock->writer_thread is now stale.
   Note that since we still hold the x-lock we can safely read the
   lock_word. */
-  if (lock->lock_word == 0) {
+  if (lock_word == 0) {
     /* Last caller in a possible recursive chain. */
     lock->recursive.store(FALSE, order_relaxed);
   }

   ut_d(rw_lock_remove_debug_info(lock, pass, RW_LOCK_X));

-  if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+  lock_word = lock->lock_word.load(order_relaxed);
+  if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
     /* There is 1 x-lock */
     /* atomic increment is needed, because it is last */
     if (rw_lock_lock_word_incr(lock, X_LOCK_DECR) <= 0) {
@@ -496,19 +509,20 @@ void rw_lock_x_unlock_func(
       We need to signal read/write waiters.
       We do not need to signal wait_ex waiters, since they cannot
       exist when there is a writer. */
+      std::atomic_thread_fence(order_acquire);
       if (lock->waiters) {
         rw_lock_reset_waiter_flag(lock);
         os_event_set(lock->event);
         sync_array_object_signalled();
       }
-  } else if (lock->lock_word == -X_LOCK_DECR ||
-             lock->lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
+  } else if (lock_word == -X_LOCK_DECR ||
+             lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
     /* There are 2 x-locks */
-    lock->lock_word += X_LOCK_DECR;
+    lock->lock_word.store(lock_word + X_LOCK_DECR, order_relaxed);
   } else {
     /* There are more than 2 x-locks. */
-    ut_ad(lock->lock_word < -X_LOCK_DECR);
-    lock->lock_word += 1;
+    ut_ad(lock_word < -X_LOCK_DECR);
+    lock->lock_word.store(lock_word + 1, order_relaxed);
   }

   ut_ad(rw_lock_validate(lock));
@@ -531,8 +545,10 @@ void rw_lock_sx_unlock_func(
   ut_d(rw_lock_remove_debug_info(lock, pass, RW_LOCK_SX));

   if (lock->sx_recursive == 0) {
+    lint lock_word = lock->lock_word.load(order_relaxed);
+
     /* Last caller in a possible recursive chain. */
-    if (lock->lock_word > 0) {
+    if (lock_word > 0) {
       lock->recursive.store(FALSE, order_relaxed);
       UNIV_MEM_INVALID(&lock->writer_thread,
                        sizeof lock->writer_thread);
@@ -543,6 +559,7 @@ void rw_lock_sx_unlock_func(
       waiters. We do not need to signal wait_ex
       waiters, since they cannot exist when there is an sx-lock holder.
       */
+      std::atomic_thread_fence(order_acquire);
       if (lock->waiters) {
         rw_lock_reset_waiter_flag(lock);
         os_event_set(lock->event);
@@ -550,9 +567,9 @@ void rw_lock_sx_unlock_func(
       }
     }
   } else {
     /* still has x-lock */
-    ut_ad(lock->lock_word == -X_LOCK_HALF_DECR ||
-          lock->lock_word <= -(X_LOCK_DECR + X_LOCK_HALF_DECR));
-    lock->lock_word += X_LOCK_HALF_DECR;
+    ut_ad(lock_word == -X_LOCK_HALF_DECR ||
+          lock_word <= -(X_LOCK_DECR + X_LOCK_HALF_DECR));
+    lock->lock_word.store(lock_word + X_LOCK_HALF_DECR, order_relaxed);
   }
 }
diff --git a/storage/innobase/sync/sync0arr.cc b/storage/innobase/sync/sync0arr.cc
index ca95cdb..e096e2f 100644
--- a/storage/innobase/sync/sync0arr.cc
+++ b/storage/innobase/sync/sync0arr.cc
@@ -545,7 +545,7 @@ static void sync_array_cell_print(FILE *file, /*!< in: file where to print */
             "Last time read locked in file %s line %lu\n"
             "Last time write locked in file %s line %lu\n",
             rw_lock_get_reader_count(rwlock), rwlock->waiters,
-            static_cast<ulong>(rwlock->lock_word),
+            static_cast<ulong>(rwlock->lock_word.load(order_relaxed)),
             innobase_basename(rwlock->last_s_file_name),
             static_cast<ulong>(rwlock->last_s_line), rwlock->last_x_file_name,
             static_cast<ulong>(rwlock->last_x_line));
@@ -897,7 +897,7 @@ static bool sync_arr_cell_can_wake_up(
       lock = cell->latch.lock;

       os_rmb;
-      if (lock->lock_word > X_LOCK_HALF_DECR) {
+      if (lock->lock_word.load(order_relaxed) > X_LOCK_HALF_DECR) {
         /* Either unlocked or only read locked. */

         return (true);
@@ -911,7 +911,7 @@ static bool sync_arr_cell_can_wake_up(
       /* lock_word == 0 means all readers or sx have left */

       os_rmb;
-      if (lock->lock_word == 0) {
+      if (lock->lock_word.load(order_relaxed) == 0) {
         return (true);
       }
       break;
@@ -922,7 +922,7 @@ static bool sync_arr_cell_can_wake_up(
       /* lock_word > 0 means no writer or reserved writer */

       os_rmb;
-      if (lock->lock_word > 0) {
+      if (lock->lock_word.load(order_relaxed) > 0) {
         return (true);
       }
     }
diff --git a/storage/innobase/sync/sync0rw.cc b/storage/innobase/sync/sync0rw.cc
index 899be12..4b50541 100644
--- a/storage/innobase/sync/sync0rw.cc
+++ b/storage/innobase/sync/sync0rw.cc
@@ -221,7 +221,7 @@ void rw_lock_create_func(
 #endif
 #endif /* INNODB_RW_LOCKS_USE_ATOMICS */

-  lock->lock_word = X_LOCK_DECR;
+  lock->lock_word.store(X_LOCK_DECR, order_relaxed);
   lock->waiters = 0;

   /* We set this value to signify that lock->writer_thread
@@ -279,7 +279,7 @@ void rw_lock_free_func(rw_lock_t *lock) /*!< in/out: rw-lock */
 {
   os_rmb;
   ut_ad(rw_lock_validate(lock));
-  ut_a(lock->lock_word == X_LOCK_DECR);
+  ut_a(lock->lock_word.load(order_relaxed) == X_LOCK_DECR);

   mutex_enter(&rw_lock_list_mutex);

@@ -325,7 +325,8 @@ lock_loop:

   /* Spin waiting for the writer field to become free */
   os_rmb;
-  while (i < srv_n_spin_wait_rounds && lock->lock_word <= 0) {
+  while (i < srv_n_spin_wait_rounds &&
+         lock->lock_word.load(order_relaxed) <= 0) {
     if (srv_spin_wait_delay) {
       ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
     }
@@ -430,9 +431,9 @@ void rw_lock_x_lock_wait_func(
   uint64_t count_os_wait = 0;

   os_rmb;
-  ut_ad(lock->lock_word <= threshold);
+  ut_ad(lock->lock_word.load(order_relaxed) <= threshold);

-  while (lock->lock_word < threshold) {
+  while (lock->lock_word.load(order_relaxed) < threshold) {
     if (srv_spin_wait_delay) {
       ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
     }
@@ -454,7 +455,7 @@ void rw_lock_x_lock_wait_func(
       i = 0;

       /* Check lock_word to ensure wake-up isn't missed.*/
-      if (lock->lock_word < threshold) {
+      if (lock->lock_word.load(order_relaxed) < threshold) {
        ++count_os_wait;

         /* Add debug info as it is needed to detect possible
@@ -540,12 +541,15 @@ ibool rw_lock_x_lock_low(
     } else {
       /* At least one X lock by this thread already
       exists. Add another. */
-      if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
-        lock->lock_word -= X_LOCK_DECR;
+      lint lock_word = lock->lock_word.load(order_relaxed);
+
+      if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
+        lock_word -= X_LOCK_DECR;
       } else {
-        ut_ad(lock->lock_word <= -X_LOCK_DECR);
-        --lock->lock_word;
+        ut_ad(lock_word <= -X_LOCK_DECR);
+        --lock_word;
       }
+      lock->lock_word.store(lock_word, order_relaxed);
     }
     locked = true;
   }
@@ -618,11 +622,12 @@ ibool rw_lock_sx_lock_low(
       condition above therefore we must be the only thread working
       on this lock and it is safe to read and write to the
       lock_word. */
+      lint lock_word = lock->lock_word.load(order_relaxed);

-      ut_ad((lock->lock_word == 0) ||
-            ((lock->lock_word <= -X_LOCK_DECR) &&
-             (lock->lock_word > -(X_LOCK_DECR + X_LOCK_HALF_DECR))));
-      lock->lock_word -= X_LOCK_HALF_DECR;
+      ut_ad((lock_word == 0) ||
+            ((lock_word <= -X_LOCK_DECR) &&
+             (lock_word > -(X_LOCK_DECR + X_LOCK_HALF_DECR))));
+      lock->lock_word.store(lock_word - X_LOCK_HALF_DECR, order_relaxed);
     }
     locked = true;
   }
@@ -686,7 +691,8 @@ lock_loop:

   /* Spin waiting for the lock_word to become free */
   os_rmb;
-  while (i < srv_n_spin_wait_rounds && lock->lock_word <= X_LOCK_HALF_DECR) {
+  while (i < srv_n_spin_wait_rounds &&
+         lock->lock_word.load(order_relaxed) <= X_LOCK_HALF_DECR) {
     if (srv_spin_wait_delay) {
       ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
     }
@@ -780,7 +786,8 @@ lock_loop:

   /* Spin waiting for the lock_word to become free */
   os_rmb;
-  while (i < srv_n_spin_wait_rounds && lock->lock_word <= X_LOCK_HALF_DECR) {
+  while (i < srv_n_spin_wait_rounds &&
+         lock->lock_word.load(order_relaxed) <= X_LOCK_HALF_DECR) {
     if (srv_spin_wait_delay) {
       ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
     }
@@ -844,7 +851,7 @@ bool rw_lock_validate(const rw_lock_t *lock) /*!< in: rw-lock */
   ut_ad(lock);

   waiters = rw_lock_get_waiters(lock);
-  lock_word = lock->lock_word;
+  lock_word = lock->lock_word.load(order_relaxed);

   ut_ad(lock->magic_n == RW_LOCK_MAGIC_N);
   ut_ad(waiters == 0 || waiters == 1);
@@ -905,10 +912,11 @@ void rw_lock_add_debug_info(
   /* Recursive x while holding SX
   (lock_type == RW_LOCK_X && lock_word == -X_LOCK_HALF_DECR)
   is treated as not-relock (new lock). */
+  lint lock_word = lock->lock_word.load(order_relaxed);

-  if ((lock_type == RW_LOCK_X && lock->lock_word < -X_LOCK_HALF_DECR) ||
+  if ((lock_type == RW_LOCK_X && lock_word < -X_LOCK_HALF_DECR) ||
       (lock_type == RW_LOCK_SX &&
-       (lock->lock_word < 0 || lock->sx_recursive == 1))) {
+       (lock_word < 0 || lock->sx_recursive == 1))) {
     sync_check_lock_validate(lock);
     sync_check_lock_granted(lock);
   } else {
@@ -1072,7 +1080,7 @@ void rw_lock_list_print_info(FILE *file) /*!< in: file where to print */
     mutex_enter(&lock->mutex);
 #endif /* INNODB_RW_LOCKS_USE_ATOMICS */

-    if (lock->lock_word != X_LOCK_DECR) {
+    if (lock->lock_word.load(order_relaxed) != X_LOCK_DECR) {
       fprintf(file, "RW-LOCK: %p ", (void *)lock);

       if (rw_lock_get_waiters(lock)) {
-- 
2.7.4
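
Sketch 1. A minimal standalone illustration (not InnoDB code) of the
acquire/release protocol this patch applies to lock->lock_word. The names
lock_word, LOCK_FREE, lock_word_decr, and lock_word_incr are simplified
stand-ins for the patched rw_lock functions, and std::memory_order_* is
used directly instead of the order_* aliases assumed to come from patch
1/3.

#include <atomic>
#include <cstdio>

static constexpr long LOCK_FREE = 0x20000000;  // stands in for X_LOCK_DECR

static std::atomic<long> lock_word{LOCK_FREE};
static std::atomic<int> waiters{0};

// Locking, as in rw_lock_lock_word_decr: a CAS loop with acquire order.
// On failure, compare_exchange_weak writes the current value back into
// 'local', which is why the patch drops the explicit reload in the loop.
static bool lock_word_decr(long amount, long threshold) {
  long local = lock_word.load(std::memory_order_acquire);
  while (local > threshold) {
    if (lock_word.compare_exchange_weak(local, local - amount,
                                        std::memory_order_acquire)) {
      return true;  // critical-section reads cannot be hoisted above this CAS
    }
  }
  return false;
}

// Unlocking, as in rw_lock_lock_word_incr followed by the waiters check:
// a release increment, then the explicit acquire fence the patch adds so
// that the increment is ordered before the load of the waiters flag.
static void lock_word_incr(long amount) {
  lock_word.fetch_add(amount, std::memory_order_release);
  std::atomic_thread_fence(std::memory_order_acquire);
  if (waiters.load(std::memory_order_relaxed) != 0) {
    std::puts("would wake waiters here (os_event_set in InnoDB)");
  }
}

int main() {
  if (lock_word_decr(1, 0)) {  // s-lock: decrement by 1 while word > 0
    lock_word_incr(1);         // s-unlock
  }
  return 0;
}

Using acquire on both the initial load and the CAS keeps the loop correct
even when the weak CAS fails spuriously and retries with the refreshed
expected value.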
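Sketch 2. The return-value convention behind the new
rw_lock_lock_word_incr: std::atomic's fetch_add returns the value *before*
the addition, while the os_atomic_increment_lint call it replaces returned
the incremented value, hence the trailing "+ amount" in the patch. A
self-contained check:

#include <atomic>
#include <cassert>

int main() {
  std::atomic<long> word{100};
  const long amount = 5;

  // fetch_add returns the old value; adding 'amount' back yields the new
  // value, which is what callers of rw_lock_lock_word_incr expect.
  long new_value = word.fetch_add(amount, std::memory_order_release) + amount;

  assert(new_value == 105);                             // old value + amount
  assert(word.load(std::memory_order_relaxed) == 105);  // stored result
  return 0;
}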