diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 1ce9a566721..a10dacdcbb3 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -3613,6 +3613,10 @@ struct Buf_fetch { mtr_t *m_mtr{}; /** Mark page as dirty even if page is being pinned without any latch. */ bool m_dirty_with_no_latch{}; + /** Allow reading a freed block and return null */ + bool m_allow_freed{}; + /** True if reading a freed block */ + bool m_freed_flag {}; /** Number of retries before giving up. */ size_t m_retries{}; /** Buffer pool to fetch from. */ @@ -3649,6 +3653,11 @@ dberr_t Buf_fetch_normal::get(buf_block_t *&block) noexcept { if (block != nullptr) { if (block->page.was_stale()) { + if (m_allow_freed) { + rw_lock_s_unlock(m_hash_lock); + return (DB_NOT_FOUND); + } + if (!buf_page_free_stale(m_buf_pool, &block->page, m_hash_lock)) { /* The page is during IO and can't be released. We wait some to not go into loop that would consume CPU. This is not something that will be @@ -3669,6 +3678,10 @@ dberr_t Buf_fetch_normal::get(buf_block_t *&block) noexcept { /* Page not in buf_pool: needs to be read from file */ read_page(); + + if (m_freed_flag) { + return (DB_NOT_FOUND); + } } return DB_SUCCESS; @@ -3700,6 +3713,11 @@ dberr_t Buf_fetch_other::get(buf_block_t *&block) noexcept { if (block != nullptr) { /* Here we have MDL latches making the stale status to not change. */ if (block->page.was_stale()) { + if (m_allow_freed) { + rw_lock_s_unlock(m_hash_lock); + return (DB_NOT_FOUND); + } + if (!buf_page_free_stale(m_buf_pool, &block->page, m_hash_lock)) { /* The page is during IO and can't be released. We wait some to not go into loop that would consume CPU. 
This is not something that will be @@ -4050,6 +4068,9 @@ void Buf_fetch::read_page() { buf_read_ahead_random(m_page_id, m_page_size, ibuf_inside(m_mtr)); } m_retries = 0; + } else if (m_allow_freed) { + m_freed_flag = true; + return; } else if (m_retries < BUF_PAGE_READ_MAX_RETRIES) { ++m_retries; @@ -4231,7 +4252,7 @@ buf_block_t *Buf_fetch::single_page() { if (static_cast(this)->get(block) == DB_NOT_FOUND) { return (nullptr); } - ut_a(!block->page.was_stale()); + ut_a(!block->page.was_stale() || m_allow_freed); if (is_optimistic()) { const auto bpage = &block->page; @@ -4309,7 +4330,7 @@ buf_block_t *Buf_fetch::single_page() { } #endif /* UNIV_DEBUG */ - ut_ad(is_possibly_freed() || !block->page.file_page_was_freed); + ut_ad(is_possibly_freed() || m_allow_freed || !block->page.file_page_was_freed); /* Check if this is the first access to the page */ const auto access_time = buf_page_is_accessed(&block->page); @@ -4367,7 +4388,7 @@ buf_block_t *Buf_fetch::single_page() { ut_ad(!rw_lock_own(m_hash_lock, RW_LOCK_X)); ut_ad(!rw_lock_own(m_hash_lock, RW_LOCK_S)); - ut_a(!block->page.was_stale()); + ut_a(!block->page.was_stale() || m_allow_freed); return (block); } @@ -4376,7 +4397,7 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch) { + bool dirty_with_no_latch, bool allow_freed) { #ifdef UNIV_DEBUG ut_ad(mtr->is_active()); @@ -4421,6 +4442,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + fetch.m_freed_flag = false; return (fetch.single_page()); @@ -4434,6 +4457,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + 
fetch.m_freed_flag = false; return (fetch.single_page()); } diff --git a/storage/innobase/clone/clone0repl.cc b/storage/innobase/clone/clone0repl.cc index 97736155e44..fbb91d75a2f 100644 --- a/storage/innobase/clone/clone0repl.cc +++ b/storage/innobase/clone/clone0repl.cc @@ -42,6 +42,71 @@ this program; if not, write to the Free Software Foundation, Inc., /* To get current session thread default THD */ THD *thd_get_current_thd(); +static std::atomic<uint64_t> s_add_index {0}; +Clone_persist_gtid::Clone_persist_gtid() { + m_event = os_event_create(); + /* No background is created yet. */ + m_thread_active.store(false); + m_gtid_trx_no.store(0); + m_flush_number.store(0); + m_explicit_request.store(false); + m_active_number.store(m_flush_number.load() + 1); + /* We accept GTID even before the background service is started. This + is needed because we add GTIDs from undo log during recovery. */ + m_active.store(true); + m_num_gtid_mem.store(0); + m_flush_in_progress.store(false); + m_close_thread.store(false); + + m_lock = static_cast<rw_lock_t *>( + ut::malloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*m_lock))); + rw_lock_create(PFS_NOT_INSTRUMENTED, m_lock, LATCH_ID_CLONE_REPL_LOCK); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + mutex_create(LATCH_ID_CLONE_REPL_MUTEX, &(m_mutexs[i])); + } +} + +/** Destructor: stop gtid thread */ +Clone_persist_gtid::~Clone_persist_gtid() { + ut_ad(!m_thread_active.load()); + stop(); + os_event_destroy(m_event); + + rw_lock_free(m_lock); + ut::free(m_lock); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + mutex_free(&(m_mutexs[i])); + } +} + +void Clone_persist_gtid::s_lock() { + rw_lock_s_lock(m_lock, UT_LOCATION_HERE); +} + +void Clone_persist_gtid::s_unlock() { + rw_lock_s_unlock(m_lock); +} + +void Clone_persist_gtid::x_lock() { + rw_lock_x_lock(m_lock, UT_LOCATION_HERE); +} + +void Clone_persist_gtid::x_unlock() { + rw_lock_x_unlock(m_lock); +} + +void Clone_persist_gtid::lock_shard(uint64_t idx) { + ut_a(idx < 
GTID_INFO_LIST_MAX_SLOT); + mutex_enter(&(m_mutexs[idx])); +} + +void Clone_persist_gtid::unlock_shard(uint64_t idx) { + ut_a(idx < GTID_INFO_LIST_MAX_SLOT); + mutex_exit(&(m_mutexs[idx])); +} + void Clone_persist_gtid::add(const Gtid_desc &gtid_desc) { /* Check if valid descriptor. */ if (!gtid_desc.m_is_set) { @@ -51,26 +116,32 @@ void Clone_persist_gtid::add(const Gtid_desc &gtid_desc) { if (!is_active() || gtid_table_persistor == nullptr) { return; } - ut_ad(trx_sys_serialisation_mutex_own()); + s_lock(); /* If too many GTIDs are accumulated, wait for all to get flushed. Ignore timeout and loop to avoid possible hang. The insert should already be slowed down by the wait here. */ if (check_max_gtid_threshold() && is_thread_active()) { - trx_sys_serialisation_mutex_exit(); + s_unlock(); wait_flush(false, false, nullptr); - trx_sys_serialisation_mutex_enter(); + s_lock(); } - ut_ad(trx_sys_serialisation_mutex_own()); + uint64_t idx = (s_add_index.fetch_add(1)) % GTID_INFO_LIST_MAX_SLOT; + lock_shard(idx); + /* Get active GTID list */ - auto &current_gtids = get_active_list(); + auto &current_gtids = get_active_list(idx); /* Add input GTID to the set */ current_gtids.push_back(gtid_desc); + /* Atomic increment. */ int current_value = ++m_num_gtid_mem; + unlock_shard(idx); + s_unlock(); + /* Wake up background if GTIDs crossed threshold. */ if (current_value == s_gtid_threshold) { os_event_set(m_event); @@ -78,9 +149,7 @@ void Clone_persist_gtid::add(const Gtid_desc &gtid_desc) { DBUG_EXECUTE_IF("dont_compress_gtid_table", { /* For predictable outcome of mtr test we flush the GTID immediately. 
*/ - trx_sys_serialisation_mutex_exit(); wait_flush(false, false, nullptr); - trx_sys_serialisation_mutex_enter(); }); } @@ -417,17 +486,12 @@ bool Clone_persist_gtid::debug_skip_write(bool compression) { } int Clone_persist_gtid::write_to_table(uint64_t flush_list_number, + uint64_t idx, Gtid_set &table_gtid_set, - Tsid_map &tsid_map) { + Gtid_set &write_gtid_set) { int err = 0; - Gtid_set write_gtid_set(&tsid_map, nullptr); - - /* Allocate some intervals from stack */ - static const int PREALLOCATED_INTERVAL_COUNT = 64; - Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT]; - write_gtid_set.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv); - auto &flush_list = get_list(flush_list_number); + auto &flush_list = get_list(flush_list_number, idx); /* Extract GTIDs from flush list. */ for (auto &gtid_desc : flush_list) { auto status = RETURN_STATUS_UNREPORTED_ERROR; @@ -458,26 +522,9 @@ int Clone_persist_gtid::write_to_table(uint64_t flush_list_number, return (0); } - bool is_recovery = !m_thread_active.load(); - if (is_recovery) { - /* During recovery, eliminate GTIDs already in gtid_executed table. */ - write_gtid_set.remove_gtid_set(&table_gtid_set); - table_gtid_set.add_gtid_set(&write_gtid_set); - } else { - /* Handle concurrent write by other threads when binlog is enabled. */ - gtid_state->update_prev_gtids(&write_gtid_set); - } - - /* Write GTIDs to table. */ - if (!write_gtid_set.is_empty()) { - ++m_compression_counter; - err = gtid_table_persistor->save(&write_gtid_set, false); - } - /* Clear flush list and return */ flush_list.clear(); ut_ad((m_flush_number + 1) == flush_list_number); - m_flush_number.store(flush_list_number); return (err); } @@ -513,7 +560,7 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { bool explicit_request = m_explicit_request.load(); - trx_sys_serialisation_mutex_enter(); + x_lock(); /* Get oldest transaction number that is yet to be committed. Any transaction with lower transaction number is committed and is added to GTID list. 
*/ auto oldest_trx_no = trx_sys_oldest_trx_no(); @@ -524,8 +571,34 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { /* Switch active list and get the previous list to write to disk table. */ auto flush_list_number = switch_active_list(); /* Exit trx mutex during write to table. */ - trx_sys_serialisation_mutex_exit(); - err = write_to_table(flush_list_number, table_gtid_set, tsid_map); + x_unlock(); + Gtid_set write_gtid_set(&tsid_map, nullptr); + /* Allocate some intervals from stack */ + static const int PREALLOCATED_INTERVAL_COUNT = 64; + Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT]; + write_gtid_set.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + err = write_to_table(flush_list_number, i, table_gtid_set, write_gtid_set); + } + + bool is_recovery = !m_thread_active.load(); + if (is_recovery) { + /* During recovery, eliminate GTIDs already in gtid_executed table. */ + write_gtid_set.remove_gtid_set(&table_gtid_set); + table_gtid_set.add_gtid_set(&write_gtid_set); + } else { + /* Handle concurrent write by other threads when binlog is enabled. */ + gtid_state->update_prev_gtids(&write_gtid_set); + } + + /* Write GTIDs to table. */ + if (!write_gtid_set.is_empty()) { + ++m_compression_counter; + err = gtid_table_persistor->save(&write_gtid_set, false); + } + + m_flush_number.store(flush_list_number); m_flush_in_progress.store(false); /* Compress always after recovery, if GTIDs are added. */ if (!m_thread_active.load()) { @@ -533,7 +606,7 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { ib::info(ER_IB_CLONE_GTID_PERSIST) << "GTID compression after recovery. "; } } else { - trx_sys_serialisation_mutex_exit(); + x_unlock(); } if (is_recovery) { @@ -577,7 +650,6 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { } bool Clone_persist_gtid::check_max_gtid_threshold() { - ut_ad(trx_sys_serialisation_mutex_own()); /* Allow only one GTID to flush at a time. 
*/ DBUG_EXECUTE_IF("dont_compress_gtid_table", { return m_num_gtid_mem.load() > 0; }); diff --git a/storage/innobase/ddl/ddl0ddl.cc b/storage/innobase/ddl/ddl0ddl.cc index d83102e67dd..7e76abe649e 100644 --- a/storage/innobase/ddl/ddl0ddl.cc +++ b/storage/innobase/ddl/ddl0ddl.cc @@ -253,6 +253,8 @@ dict_index_t *create_index(trx_t *trx, dict_table_t *table, return nullptr; } + trx->add_scn_index(table->id, index->id); + if (dict_index_is_spatial(index)) { index->fill_srid_value(index_def->m_srid, index_def->m_srid_is_valid); } diff --git a/storage/innobase/dict/dict0crea.cc b/storage/innobase/dict/dict0crea.cc index 6a85731fd0f..67f673f8cb5 100644 --- a/storage/innobase/dict/dict0crea.cc +++ b/storage/innobase/dict/dict0crea.cc @@ -381,6 +381,9 @@ void dict_build_index_def(const dict_table_t *table, /*!< in: table */ /* Note that the index was created by this transaction. */ index->trx_id = trx->id; + + /* Init to zero, and will be set while committing the transaction */ + index->trx_scn = 0; } /** Creates an index tree for the index if it is not a member of a cluster. diff --git a/storage/innobase/dict/dict0dd.cc b/storage/innobase/dict/dict0dd.cc index 622ccd56bfa..62cc5aeba9a 100644 --- a/storage/innobase/dict/dict0dd.cc +++ b/storage/innobase/dict/dict0dd.cc @@ -5213,6 +5213,18 @@ dict_table_t *dd_open_table_one(dd::cache::Dictionary_client *client, index->id = id; index->trx_id = trx_id; + trx_id_t scn = scn_mgr->get_scn_fast(trx_id); + if (scn == TRX_ID_MAX) { + /* still active */ + } else if (scn == 0) { + //FIXME: Also persist index scn to DD, otherwise it may + //get lost after reloading + ut_a(scn_mgr->startup_scn() > 0); + scn = scn_mgr->startup_scn(); + } + + index->trx_scn = scn; + /** Look up the spatial reference system in the dictionary. 
Since this may cause a table open to read the dictionary tables, it must be done while not holding diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc index ff66dc32792..ad68239fd10 100644 --- a/storage/innobase/dict/dict0dict.cc +++ b/storage/innobase/dict/dict0dict.cc @@ -2491,6 +2491,7 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, } new_index->n_total_fields = new_index->n_def; new_index->trx_id = index->trx_id; + new_index->trx_scn = index->trx_scn; new_index->set_committed(index->is_committed()); new_index->allow_duplicates = index->allow_duplicates; new_index->nulls_equal = index->nulls_equal; @@ -2644,6 +2645,44 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, return (DB_SUCCESS); } +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index, NULL if does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id) { + /** Get table object by id */ + mutex_enter(&dict_sys->mutex); + dict_table_t *table = nullptr; + + HASH_SEARCH(id_hash, dict_sys->table_id_hash, ut::hash_uint64(table_id), + dict_table_t *, table, + ut_ad(table->cached), table->id == table_id); + + if (table == nullptr) { + mutex_exit(&dict_sys->mutex); + return nullptr; + } + + table->acquire(); + + mutex_exit(&dict_sys->mutex); + + /* Get the index object by id */ + dict_index_t *index = table->first_index(); + while (index != nullptr) { + if (index->id == index_id) { + /* Table will be released by caller */ + return index; + } + + index = index->next(); + } + + table->release(); + + return nullptr; +} + /** Removes an index from the dictionary cache. 
*/ static void dict_index_remove_from_cache_low( dict_table_t *table, /*!< in/out: table */ diff --git a/storage/innobase/dict/dict0mem.cc b/storage/innobase/dict/dict0mem.cc index 611ef80e89b..257b4e94d75 100644 --- a/storage/innobase/dict/dict0mem.cc +++ b/storage/innobase/dict/dict0mem.cc @@ -612,10 +612,12 @@ bool dict_index_t::is_usable(const trx_t *trx) const { return false; } +// ut_ad(trx_scn > 0 || trx_id == 0 || table->is_intrinsic() || table->is_temporary()); + /* Check if the specified transaction can see this index. */ - return (table->is_temporary() || trx_id == 0 || + return (table->is_temporary() || trx_id == 0 || trx->id == trx_id || !MVCC::is_view_active(trx->read_view) || - trx->read_view->changes_visible(trx_id, table->name)); + (trx_scn > 0 && trx->read_view->sees_version(trx_scn))); } #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index cf964ee2f4b..aae79541a30 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -749,6 +749,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(sync_thread_mutex, 0, 0, PSI_DOCUMENT_ME), #endif /* UNIV_DEBUG */ PSI_MUTEX_KEY(trx_undo_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_scn_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(temp_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -769,6 +770,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(trx_sys_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_shard_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_serialisation_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_sys_mvcc_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(zip_pad_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(master_key_id_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(sync_array_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -19165,11 +19167,7 @@ 
int ha_innobase::external_lock(THD *thd, /*!< in: handle to the user thread */ } else if (trx->isolation_level <= TRX_ISO_READ_COMMITTED && MVCC::is_view_active(trx->read_view)) { - mutex_enter(&trx_sys->mutex); - trx_sys->mvcc->view_close(trx->read_view, true); - - mutex_exit(&trx_sys->mutex); } } @@ -19760,12 +19758,7 @@ THR_LOCK_DATA **ha_innobase::store_lock( MVCC::is_view_active(trx->read_view)) { /* At low transaction isolation levels we let each consistent read set its own snapshot */ - - mutex_enter(&trx_sys->mutex); - trx_sys->mvcc->view_close(trx->read_view, true); - - mutex_exit(&trx_sys->mutex); } } diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc index 6b77bdc4281..2d12363b585 100644 --- a/storage/innobase/handler/handler0alter.cc +++ b/storage/innobase/handler/handler0alter.cc @@ -3267,9 +3267,9 @@ static void online_retry_drop_dict_indexes(dict_table_t *table, bool locked) { /* Since the table has been modified, table->def_trx_id should be adjusted like ddl::drop_indexes(). However, this function may be called before the DDL transaction starts, so it is impossible to - get current DDL transaction ID. Thus advancing def_trx_id by 1 to + get current DDL transaction ID. Thus advancing def_trx_id by 2 to simply inform other threads about this change. 
*/ - ++table->def_trx_id; + table->def_trx_id += 2; reset_column_ord_part(table); } diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index a0dfad5c871..8ea133ea648 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -433,7 +433,14 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch = false); + bool dirty_with_no_latch = false, bool allow_freed = false); + +inline buf_block_t *buf_page_get_allow_freed(const page_id_t &id, const page_size_t &size, + ulint latch, ut::Location location, + mtr_t *mtr) { + return buf_page_get_gen(id, size, latch, nullptr, Page_fetch::NORMAL, + location, mtr, false, true); +} /** NOTE! The following macros should be used instead of buf_page_get_gen, to improve debugging. Only values RW_S_LATCH and RW_X_LATCH are allowed diff --git a/storage/innobase/include/clone0repl.h b/storage/innobase/include/clone0repl.h index f9a745125f3..def2a57e760 100644 --- a/storage/innobase/include/clone0repl.h +++ b/storage/innobase/include/clone0repl.h @@ -54,6 +54,7 @@ using Gtid_info = std::array; struct Gtid_desc; +#define GTID_INFO_LIST_MAX_SLOT 256 /** List of GTIDs */ using Gtid_info_list = std::vector; @@ -71,28 +72,10 @@ struct Gtid_desc { class Clone_persist_gtid { public: /** Constructor: start gtid thread */ - Clone_persist_gtid() { - m_event = os_event_create(); - /* No background is created yet. */ - m_thread_active.store(false); - m_gtid_trx_no.store(0); - m_flush_number.store(0); - m_explicit_request.store(false); - m_active_number.store(m_flush_number.load() + 1); - /* We accept GTID even before the background service is started. This - is needed because we add GTIDs from undo log during recovery. 
*/ - m_active.store(true); - m_num_gtid_mem.store(0); - m_flush_in_progress.store(false); - m_close_thread.store(false); - } + Clone_persist_gtid(); /** Destructor: stop gtid thread */ - ~Clone_persist_gtid() { - ut_ad(!m_thread_active.load()); - stop(); - os_event_destroy(m_event); - } + ~Clone_persist_gtid(); /** Start GTID persistence and background thread. @return true, if successful. */ @@ -199,6 +182,18 @@ class Clone_persist_gtid { Clone_persist_gtid &operator=(Clone_persist_gtid const &) = delete; private: + void x_lock(); + + void x_unlock(); + + void s_lock(); + + void s_unlock(); + + void lock_shard(uint64_t idx); + + void unlock_shard(uint64_t idx); + /** Check if GTID needs to persist at XA prepare. @param[in] thd session THD @param[in,out] trx current innnodb transaction @@ -234,17 +229,16 @@ class Clone_persist_gtid { bool early_timeout, Clone_Alert_Func cbk); /** @return current active GTID list */ - Gtid_info_list &get_active_list() { - ut_ad(trx_sys_serialisation_mutex_own()); - return (get_list(m_active_number)); + Gtid_info_list &get_active_list(uint64_t idx) { + return (get_list(m_active_number, idx)); } /** @return GTID list by number. @param[in] list_number list number @return GTID list reference. */ - Gtid_info_list &get_list(uint64_t list_number) { + Gtid_info_list &get_list(uint64_t list_number, uint64_t idx) { int list_index = (list_number & static_cast(1)); - return (m_gtids[list_index]); + return (m_gtids[list_index][idx]); } /** Check if we need to skip write or compression based on debug variables. @@ -256,7 +250,7 @@ class Clone_persist_gtid { @param[in] compress request compression of GTID table @return flush list number to track and wait for flush to complete. */ uint64_t request_immediate_flush(bool compress) { - trx_sys_serialisation_mutex_enter(); + x_lock(); /* We want to flush all GTIDs. */ uint64_t request_number = m_active_number.load(); /* If no GTIDs added to active, wait for previous index. 
*/ @@ -265,7 +259,7 @@ class Clone_persist_gtid { --request_number; } m_flush_request_number = request_number; - trx_sys_serialisation_mutex_exit(); + x_unlock(); if (compress) { m_explicit_request.store(true); @@ -292,16 +286,11 @@ class Clone_persist_gtid { /** Switch active GTID list. */ uint64_t switch_active_list() { /* Switch active list under transaction system mutex. */ - ut_ad(trx_sys_serialisation_mutex_own()); uint64_t flush_number = m_active_number; ++m_active_number; m_compression_gtid_counter += m_num_gtid_mem; m_num_gtid_mem.store(0); -#ifdef UNIV_DEBUG - /* The new active list must have no elements. */ - auto &active_list = get_active_list(); - ut_ad(active_list.size() == 0); -#endif + return (flush_number); } @@ -310,8 +299,8 @@ class Clone_persist_gtid { @param[in,out] table_gtid_set GTIDs in table during recovery @param[in,out] tsid_map TSID map for GTIDs @return mysql error code. */ - int write_to_table(uint64_t flush_list_number, Gtid_set &table_gtid_set, - Tsid_map &tsid_map); + int write_to_table(uint64_t flush_list_number, uint64_t idx, Gtid_set &table_gtid_set, + Gtid_set &write_gtid_set); /** Update transaction number up to which GTIDs are flushed to table. @param[in] new_gtid_trx_no GTID transaction number */ @@ -343,7 +332,7 @@ class Clone_persist_gtid { /** Two lists of GTID. One of them is active where running transactions add their GTIDs. Other list is used to persist them to table from time to time. */ - Gtid_info_list m_gtids[2]; + Gtid_info_list m_gtids[2][GTID_INFO_LIST_MAX_SLOT]; /** Number of the current GTID list. 
Increased when list is switched */ std::atomic m_active_number; @@ -384,6 +373,10 @@ class Clone_persist_gtid { /** true, if GTID persistence is active.*/ std::atomic m_active; + + ib_mutex_t m_mutexs[GTID_INFO_LIST_MAX_SLOT]; + + rw_lock_t* m_lock; }; #endif /* CLONE_REPL_INCLUDE */ diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h index 00aac445b4c..e5144d6d967 100644 --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -1649,6 +1649,12 @@ static inline void dict_allocate_mem_intrinsic_cache(dict_index_t *index); @param[in] table_id table id */ bool dict_table_is_system(table_id_t table_id); +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index, NULL if does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id); + /** Change the table_id of SYS_* tables if they have been created after an earlier upgrade. This will update the table_id by adding DICT_MAX_DD_TABLES */ diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h index 8d62f7801f6..116c0505a5c 100644 --- a/storage/innobase/include/dict0mem.h +++ b/storage/innobase/include/dict0mem.h @@ -1229,6 +1229,9 @@ struct dict_index_t { when InnoDB was started up */ trx_id_t trx_id; + /** SCN of related trx_id */ + trx_id_t trx_scn; + /** Information about state of compression failures and successes */ zip_pad_info_t zip_pad; diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h index 484582ea38f..5de08da91d8 100644 --- a/storage/innobase/include/lock0lock.h +++ b/storage/innobase/include/lock0lock.h @@ -597,6 +597,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view); /*!< 
in: consistent read view */ /** Checks that a non-clustered index record is seen in a consistent read. diff --git a/storage/innobase/include/read0read.h b/storage/innobase/include/read0read.h index e7693c93b8b..dbb0f341b3c 100644 --- a/storage/innobase/include/read0read.h +++ b/storage/innobase/include/read0read.h @@ -40,6 +40,8 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0types.h" #include "univ.i" +#define MAX_SNAPSHOT_SIZE 256 + /** The MVCC read view manager */ class MVCC { public: @@ -55,7 +57,7 @@ class MVCC { @param view View owned by this class created for the caller. Must be freed by calling view_close() @param trx Transaction instance of caller */ - void view_open(ReadView *&view, trx_t *trx); + void view_open(ReadView *&view, trx_t *trx, bool is_shared = false); /** Close a view created by the above function. @@ -73,7 +75,7 @@ class MVCC { /** @return the number of active views */ - ulint size() const; + ulint size(); /** @return true if the view is active and valid */ @@ -94,17 +96,20 @@ class MVCC { view->creator_trx_id(id); } + /** Get oldest scn for Purge system */ + void get_oldest_version(ReadView *purge_view); + private: /** Validates a read view list. */ - bool validate() const; + bool validate(uint64_t slot) const; /** Find a free view from the active list, if none found then allocate a new view. This function will also attempt to move delete marked views from the active list to the freed list. @return a view to use */ - inline ReadView *get_view(); + inline ReadView *get_view(uint64_t slot); /** Get the oldest view in the system. It will also move the delete @@ -118,14 +123,26 @@ class MVCC { MVCC &operator=(const MVCC &); private: + void enter(uint64_t slot); + + void exit(uint64_t slot); + + uint64_t get_slot() { + return (m_slot_index++) % MAX_SNAPSHOT_SIZE; + } + + std::atomic<uint64_t> m_slot_index; + + typedef UT_LIST_BASE_NODE_T(ReadView, m_view_list) view_list_t; /** Free views ready for reuse. 
*/ - view_list_t m_free; + view_list_t m_free[MAX_SNAPSHOT_SIZE]; /** Active and closed views, the closed views will have the creator trx id set to TRX_ID_MAX */ - view_list_t m_views; + view_list_t m_views[MAX_SNAPSHOT_SIZE]; + + ib_mutex_t m_mutexs[MAX_SNAPSHOT_SIZE]; }; #endif /* read0read_h */ diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h index 4eaf1732aab..e75e39215ab 100644 --- a/storage/innobase/include/read0types.h +++ b/storage/innobase/include/read0types.h @@ -34,14 +34,17 @@ this program; if not, write to the Free Software Foundation, Inc., #ifndef read0types_h #define read0types_h +#include <set> #include <algorithm> #include "dict0mem.h" #include "trx0types.h" - +#include "ut0seq_lock.h" // Friend declaration class MVCC; +typedef std::set<trx_id_t> trx_ids_set_t; /** Read view lists the trx ids of those transactions for which a consistent read should not see the modifications to the database. */ @@ -156,36 +159,43 @@ class ReadView { @param[in] name table name */ static void check_trx_id_sanity(trx_id_t id, const table_name_t &name); - /** Check whether the changes by id are visible. - @param[in] id transaction id to check against the view - @param[in] name table name - @return whether the view sees the modifications of id. */ - [[nodiscard]] bool changes_visible(trx_id_t id, - const table_name_t &name) const { - ut_ad(id > 0); - - if (id < m_up_limit_id || id == m_creator_trx_id) { - return (true); - } + /** + @param id transaction to check + @return true if view sees transaction id */ + bool sees(trx_id_t id) const { return (id < m_up_limit_id); } - check_trx_id_sanity(id, name); + trx_id_t up_limit_id() { return m_up_limit_id; } - if (id >= m_low_limit_id) { - return (false); + /** Check whether the changes on record are visible. 
+ @param[in] index index object + @param[in] rec clust record + @param[in] offsets offset of the record + @return whether the view sees */ + bool changes_visible( + const dict_index_t *index, + const rec_t *rec, const ulint *offsets); - } else if (m_ids.empty()) { - return (true); + /** + @param scn scn to check + @return true if view sees transaction scn */ + bool sees_version(trx_id_t scn) const { + if (scn == TRX_ID_MAX) return false; + if (m_committing_scns.find(scn) != m_committing_scns.end()) { + /* Being committed while opening read view, always not visible */ + return false; } - const ids_t::value_type *p = m_ids.data(); - - return (!std::binary_search(p, p + m_ids.size(), id)); + return (m_version > scn); } /** - @param id transaction to check - @return true if view sees transaction id */ - bool sees(trx_id_t id) const { return (id < m_up_limit_id); } + @return version number of the view */ + trx_id_t version() { return m_version; } + + /** Store trx pointer which create this read view */ + void set_trx(trx_t *trx) { + m_trx = trx; + } /** Mark the view as closed */ @@ -198,6 +208,14 @@ class ReadView { @return true if the view is closed */ bool is_closed() const { return (m_closed); } + uint64_t get_slot() { + return m_slot; + } + + void set_slot(uint64_t slot) { + m_slot = slot; + } + /** Write the limits to the file. @param file file to write to */ @@ -217,6 +235,10 @@ class ReadView { ut_d(m_view_low_limit_no = m_low_limit_no); m_low_limit_no = trx_no; } + + if (m_low_limit_no < m_version) { + m_version = m_low_limit_no; + } } /** @@ -255,14 +277,8 @@ class ReadView { inline void prepare(trx_id_t id); /** - Copy state from another view. Must call copy_complete() to finish. - @param other view to copy from */ - inline void copy_prepare(const ReadView &other); - - /** - Complete the copy, insert the creator transaction id into the - m_trx_ids too and adjust the m_up_limit_id *, if required */ - inline void copy_complete(); + Copy state from another view. 
Must call copy_complete() to finish. */ + inline void copy_prepare(trx_id_t version, trx_id_t low_id, trx_id_t up_id); /** Set the creator transaction id, existing id must be 0 */ @@ -296,6 +312,14 @@ was taken */ ids_t m_ids; + /** SCN set that are being committed but not finished yet */ + trx_ids_set_t m_committing_scns; + + /** IDs set that are being committed but not finished yet */ + trx_ids_set_t m_invisible_ids; + + trx_ids_set_t m_long_running_ids; + /** The view does not need to see the undo logs for transactions whose transaction number is strictly smaller (<) than this value: they can be removed in purge if not needed by other views */ @@ -309,9 +333,22 @@ trx_id_t m_view_low_limit_no; #endif /* UNIV_DEBUG */ + /** The transaction that opens this read view */ + trx_t *m_trx; + + /** Version of the snapshot */ + trx_id_t m_version; + + /** The array index of mvcc */ + uint64_t m_slot; + /** AC-NL-RO transaction view that has been "closed". */ bool m_closed; + /** Read view is shared by multiple threads such as + SELECT COUNT(*) */ + bool m_shared; + typedef UT_LIST_NODE_T(ReadView) node_t; /** List of read views in trx_sys */ @@ -319,4 +356,163 @@ node_t m_view_list; }; +#define SCN_MAP_MAX_SIZE (1 * 1024 * 1024) +struct trx_data_t { + trx_id_t id {0}; + trx_id_t scn{0}; +}; + +typedef ut::Seq_lock<trx_data_t> Trx_seq_with_lock; + +/** A map used to store mapping of trx id to scn. 
 */ +class Scn_Map { + public: + Scn_Map(); + ~Scn_Map(); + + inline void store(trx_id_t id, trx_id_t scn) { + Trx_seq_with_lock &seq = m_datas[(id/2) % m_size]; + seq.locking_write([&](trx_data_t &data) { + data.id = id; + data.scn = scn; + }); + } + + inline trx_id_t read(trx_id_t id) { + Trx_seq_with_lock &seq = m_datas[(id/2) % m_size]; + trx_data_t data; + + data = seq.read([](const trx_data_t &stored_data) { + return trx_data_t{stored_data.id, stored_data.scn}; + }); + + if (id == data.id) { + return data.scn; + } else { + return 0; + } + } + + private: + Trx_seq_with_lock *m_datas; + uint64_t m_size; +}; + +/** Handler of SCN Manager */ +class SCN_Mgr { +public: + /** Constructor */ + SCN_Mgr(); + + /** Destructor */ + ~SCN_Mgr(); + + void init(); + + trx_id_t startup_scn() { + return m_startup_scn; + } + + /** + @return true if it's SCN number */ + static bool is_scn(trx_id_t id) { + return ((id & 1) != 0); + } + + /** Store scn of the transaction for fast lookup + @param[in] id transaction id + @param[in] scn transaction no while committing.
*/ + void store_scn(trx_id_t id, trx_id_t scn) { + m_scn_map->store(id, scn); + } + + /** Quickly lookup scn of relative transaction id + @param[in] id transaction id + @return TRX_ID_MAX if still active, or 0 if not found, or scn value */ + trx_id_t get_scn_fast(trx_id_t id, trx_id_t *version = nullptr); + + /** Get SCN with relative transaction id + @param[in] id transaction id + @param[in] index index object where roll_ptr resides on + @param[in] roll_ptr rollback pointer of clust record */ + trx_id_t get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr, trx_id_t *version = nullptr); + + /** Get offset where scn is stored + @param[in] index index object + @param[in] offsets offset array of the clust record + @return offset where scn is stored */ + ulint scn_offset(const dict_index_t *index, const ulint *offsets); + + /** Start background threads */ + void start(); + + /** Stop background threads */ + void stop(); + + /* Periodically generate safe up limit id for taking snapshot */ + void view_task(); + + /**@return min active transaction id. 
This is not + an accurate number */ + trx_id_t min_active_id() { + return m_min_active_id.load(std::memory_order_relaxed); + } + + void take_up_ids(trx_id_t &up_id, trx_ids_set_t &slow_ids); + +private: + void x_lock() { + rw_lock_x_lock(m_lock, UT_LOCATION_HERE); + } + + void x_unlock() { + rw_lock_x_unlock(m_lock); + } + + void s_lock() { + rw_lock_s_lock(m_lock, UT_LOCATION_HERE); + } + + void s_unlock() { + rw_lock_s_unlock(m_lock); + } + + /** Storing trx id->scn mapping */ + Scn_Map *m_scn_map; + + /** Storing trx id->scn mapping to avoid duplicate + looking up */ + Scn_Map *m_random_map; + + /** up transaction id on startup */ + trx_id_t m_startup_id; + + /** SCN number taken on startup */ + trx_id_t m_startup_scn; + + /* Transaction ids that may be running too long, while + calculating m_min_active_id, these transactions will be ignored */ + trx_ids_set_t m_long_running_ids; + + /** thread event */ + os_event_t m_view_event; + + std::atomic m_has_slow_ids; + + /** Min active transaction id */ + std::atomic m_min_active_id; + + std::atomic m_fast_min_active_id; + + /** Flag to tell if background threads should stop or not */ + std::atomic m_abort; + + /** True if thread is active */ + std::atomic m_view_active; + + rw_lock_t* m_lock; +}; + +extern SCN_Mgr *scn_mgr; + #endif diff --git a/storage/innobase/include/row0pread.h b/storage/innobase/include/row0pread.h index 6cecee578e2..c1667d56411 100644 --- a/storage/innobase/include/row0pread.h +++ b/storage/innobase/include/row0pread.h @@ -606,7 +606,7 @@ class Parallel_reader::Scan_ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. 
*/ - [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets, + [[nodiscard]] bool check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr); /** Create an execution context for a range and add it to @@ -726,9 +726,9 @@ class Parallel_reader::Ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. */ - bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, + bool is_rec_visible(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - return m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + return m_scan_ctx->check_visibility(block, rec, offsets, heap, mtr); } private: diff --git a/storage/innobase/include/row0purge.h b/storage/innobase/include/row0purge.h index b8fe1e7440e..58a5b59541d 100644 --- a/storage/innobase/include/row0purge.h +++ b/storage/innobase/include/row0purge.h @@ -101,6 +101,9 @@ struct purge_node_t { /** Trx that created this undo record */ trx_id_t modifier_trx_id; + + /** Trx no of modifier_trx_id */ + trx_id_t modifier_trx_no; }; using Recs = std::list>; diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h index 2052199296b..fb7967e8acc 100644 --- a/storage/innobase/include/sync0sync.h +++ b/storage/innobase/include/sync0sync.h @@ -156,6 +156,7 @@ extern mysql_pfs_key_t srv_monitor_file_mutex_key; extern mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ extern mysql_pfs_key_t trx_undo_mutex_key; +extern mysql_pfs_key_t trx_scn_mutex_key; extern mysql_pfs_key_t trx_mutex_key; extern mysql_pfs_key_t trx_pool_mutex_key; extern mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -166,6 +167,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t trx_sys_mutex_key; extern mysql_pfs_key_t trx_sys_shard_mutex_key; extern mysql_pfs_key_t trx_sys_serialisation_mutex_key; +extern mysql_pfs_key_t 
trx_sys_mvcc_mutex_key; extern mysql_pfs_key_t srv_sys_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/include/sync0types.h b/storage/innobase/include/sync0types.h index c891effe436..d0a8d4c9144 100644 --- a/storage/innobase/include/sync0types.h +++ b/storage/innobase/include/sync0types.h @@ -417,6 +417,7 @@ enum latch_id_t { LATCH_ID_SRV_MONITOR_FILE, LATCH_ID_SYNC_THREAD, LATCH_ID_TRX_UNDO, + LATCH_ID_TRX_SCN, LATCH_ID_TRX_POOL, LATCH_ID_TRX_POOL_MANAGER, LATCH_ID_TEMP_POOL_MANAGER, @@ -424,6 +425,7 @@ enum latch_id_t { LATCH_ID_TRX_SYS, LATCH_ID_TRX_SYS_SHARD, LATCH_ID_TRX_SYS_SERIALISATION, + LATCH_ID_TRX_SYS_MVCC, LATCH_ID_SRV_SYS, LATCH_ID_SRV_SYS_TASKS, LATCH_ID_PAGE_ZIP_STAT_PER_INDEX, @@ -465,6 +467,9 @@ enum latch_id_t { LATCH_ID_DBLR, LATCH_ID_REDO_LOG_ARCHIVE_ADMIN_MUTEX, LATCH_ID_REDO_LOG_ARCHIVE_QUEUE_MUTEX, + LATCH_ID_CLONE_REPL_LOCK, + LATCH_ID_CLONE_REPL_MUTEX, + LATCH_ID_SCN_MGR_LOCK, LATCH_ID_TEST_MUTEX, LATCH_ID_MAX = LATCH_ID_TEST_MUTEX }; diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 002b4b9e54f..523b9c11990 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -81,6 +81,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr); /*!< in: mtr */ @@ -134,6 +135,9 @@ struct purge_iter_t { /** The transaction that created the undo log record, the Modifier trx id */ trx_id_t modifier_trx_id; + + /** Commit no of transaction with modifier_trx_id */ + trx_id_t modifier_trx_no; }; /* Namespace to hold all the related functions and variables needed @@ -1017,6 +1021,11 @@ struct trx_purge_t { /** The purge will not remove undo logs which are >= this view (purge view) */ ReadView view; + /** The scn of purge system before which undo can be purged */ + std::atomic version; + + std::atomic min_up_id; + /** Count of total tasks submitted to the task queue */ ulint n_submitted; diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h index 96d5e0f43c0..d73fe6f27e6 100644 --- a/storage/innobase/include/trx0rec.h +++ b/storage/innobase/include/trx0rec.h @@ -318,6 +318,15 @@ constexpr uint32_t TRX_UNDO_UPD_EXTERN = 128; constexpr uint32_t TRX_UNDO_INSERT_OP = 1; constexpr uint32_t TRX_UNDO_MODIFY_OP = 2; +/* store + - trx id(8 bytes) + why not 6 bytes ? because it need to be logged + and have no log type for 6 bytes), + - hdr page no (4 bytes), + - log hdr offset(2 bytes) +at end of page */ +#define TRX_UNDO_PAGE_RESERVE_SIZE 14 + /** The type and compilation info flag in the undo record for update. For easier understanding let the 8 bits be numbered as 7, 6, 5, 4, 3, 2, 1, 0. */ @@ -372,6 +381,24 @@ const byte *trx_undo_rec_get_pars( table_id_t *table_id, /*!< out: table id */ type_cmpl_t &type_cmpl); /*!< out: type compilation info. 
*/ +bool trx_undo_rec_get_hdr( + trx_id_t id, + trx_undo_rec_t *undo_rec, + page_no_t &undo_hdr_no, + uint32_t &offset); + +trx_id_t trx_undo_hdr_get_scn( + trx_id_t trx_id, + page_id_t &page_id, + uint32_t offset, + mtr_t *mtr, + page_t *undo_page); + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id); + /** Get the max free space of undo log by assuming it's a fresh new page and the free space doesn't count for the undo log header too. */ size_t trx_undo_max_free_space(); diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index cf91a7919bb..cf6a08d6496 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -138,11 +138,6 @@ the trx_sys_serialisation_mutex must be acquired. @return new, allocated trx no */ inline trx_id_t trx_sys_allocate_trx_no(); -/** Retrieves a next value that will be allocated if trx_sys_allocate_trx_id() -or trx_sys_allocate_trx_id_trx_no() was called. -@return the next trx->id or trx->no that will be allocated */ -inline trx_id_t trx_sys_get_next_trx_id_or_no(); - #ifdef UNIV_DEBUG /* Flag to control TRX_RSEG_N_SLOTS behavior debugging. */ extern uint trx_rseg_n_slots_debug; @@ -218,7 +213,7 @@ void trx_sys_close(void); /** Determine if there are incomplete transactions in the system. @return whether incomplete transactions need rollback */ -static inline bool trx_sys_need_rollback(); +bool trx_sys_need_rollback(); /** Reads number of recovered transactions which have state equal to TRX_STATE_ACTIVE (so are not prepared transactions). @@ -243,12 +238,6 @@ static inline void trx_sys_rw_trx_add(trx_t *trx); #endif /* !UNIV_HOTBACKUP */ -#ifdef UNIV_DEBUG -/** Validate the trx_sys_t::rw_trx_list. - @return true if the list is valid */ -bool trx_sys_validate_trx_list(); -#endif /* UNIV_DEBUG */ - /** Initialize trx_sys_undo_spaces, called once during srv_start(). 
*/ void trx_sys_undo_spaces_init(); @@ -366,7 +355,7 @@ class Space_Ids : public std::vector> { }; /** Number of shards created for transactions. */ -constexpr size_t TRX_SHARDS_N = 256; +constexpr size_t TRX_SHARDS_N = 512; /** Computes shard number for a given trx_id. @param[in] trx_id trx_id for which shard_no should be computed @@ -377,6 +366,32 @@ inline size_t trx_get_shard_no(trx_id_t trx_id) { } #ifndef UNIV_HOTBACKUP +struct lock_t; +struct dict_table_t; +struct trx_i_s_cache_t; + +class Trx_commit_serialisation_list { + public: + void init_list() { + UT_LIST_INIT(serialisation_list); + } + + /** Add trx_t to serialisation_list */ + void add_list(trx_t &trx); + + /** Remove trx_t from serialisation_list */ + void remove_list(trx_t &trx); + + /** Collect transaction id and no from serialisation_list */ + void collect_commit_ids(trx_ids_set_t &id_set, trx_ids_set_t &no_set, trx_id_t limit); + + trx_id_t min_id() const { return m_min_id.load();} + + UT_LIST_BASE_NODE_T(trx_t, no_list) serialisation_list; + + std::atomic m_min_id{0}; +}; + class Trx_by_id_with_min { struct Trx_track_hash { size_t operator()(const trx_id_t &key) const { @@ -395,9 +410,16 @@ class Trx_by_id_with_min { @see trx_rw_is_active for details.*/ std::atomic m_min_id{0}; + trx_ids_set_t m_long_running_ids; + + trx_id_t m_fast_min_id; + public: + By_id const &by_id() const { return m_by_id; } trx_id_t min_id() const { return m_min_id.load(); } + trx_t *get(const XID *xid); + trx_t *get(trx_id_t trx_id) const { const auto it = m_by_id.find(trx_id); trx_t *trx = it == m_by_id.end() ? 
 nullptr : it->second; @@ -411,15 +433,140 @@ class Trx_by_id_with_min { void insert(trx_t &trx) { const trx_id_t trx_id = trx.id; ut_ad(0 == m_by_id.count(trx_id)); + trx.start_rw_time = std::chrono::steady_clock::now(); m_by_id.emplace(trx_id, &trx); if (m_by_id.size() == 1 || trx_id < m_min_id.load(std::memory_order_relaxed)) { m_min_id.store(trx_id, std::memory_order_release); + if (trx_id > m_fast_min_id) { + m_fast_min_id = trx_id; + } + } + } + + /** Collect prepared transaction ids */ + void collect_prepared_ids(std::vector &trx_ids); + + /** Free prepared transactions on shutdown */ + void free_prepared(); + + /** Count of rows for crash recovery */ + uint64_t recovered_rows() { + uint64_t count = 0; + for (auto item : m_by_id) { + const trx_t *trx = item.second; + ut_ad(trx->is_recovered); + if (trx_state_eq(trx, TRX_STATE_ACTIVE)) { + count += trx->undo_no; + } + } + + return count; + } + + /** Recover prepared transactions and add to list */ + void recover_prepared(XA_recover_txn *txn_list, MEM_ROOT *mem_root, ulint &index, ulint limit); + + /** Recover xa transactions */ + void recover_tc(Xa_state_list &xa_list); + + /** Count of active transactions */ + uint64_t size() { return m_by_id.size(); } + + /** Count of recovered active transactions */ + size_t recovered_active_count(); + + /** Fetch data of transaction into cache */ + bool fetch_data(trx_i_s_cache_t *cache); +#ifdef UNIV_DEBUG + void validate_table_lock(); + + const lock_t *table_locks_lookup(const dict_table_t *table); + + bool holds_expl_lock(ulint precise_mode, const buf_block_t *block, ulint heap_no, const trx_t *impl_trx); +#endif + + /** Remove locks on table */ + ulint recovered_trx_record_locks(dict_table_t *table); + + /** Collect recovered transaction ids */ + void collect_recovered_ids(std::vector &ids); + + /** Collect transaction ids of internal transactions */ + void collect_internel_ids(std::vector &ids) { + for (auto item : m_by_id) { + const trx_t *trx = item.second; + if
(trx->mysql_thd == nullptr) { + ut_a(trx->id > 0); + ids.push_back(trx->id); + } } } + + void get_min_with_slow(trx_id_t &real_min, trx_id_t &fast_min, trx_ids_set_t &slow_sets, trx_id_t limit) { + if (m_by_id.empty()) { + /* Will be ignored by caller */ + real_min = 0; + fast_min = 0; + + return; + } + + real_min = m_min_id.load(std::memory_order_relaxed); + ut_ad(m_fast_min_id != 0); + ut_ad(m_by_id.count(real_min) == 1); + + size_t long_ids_size = m_long_running_ids.size(); + size_t total_size = m_by_id.size(); + ut_ad(long_ids_size <= total_size); + + auto time_now = std::chrono::steady_clock::now(); + while (long_ids_size < total_size) { + auto itr = m_by_id.find(m_fast_min_id); + if (itr == m_by_id.end()) { + m_fast_min_id += TRX_SHARDS_N; + continue; + } + + trx_t *trx = itr->second; + ut_ad(trx != nullptr); + + //Not in heavy workload + if (limit < trx->id + 1024) { + break; + } + + if (time_now - trx->start_rw_time > std::chrono::seconds{10}) { + /* Find a long running transaction */ + m_long_running_ids.insert(m_fast_min_id); + long_ids_size++; + trx->in_long_set = true; + + m_fast_min_id += TRX_SHARDS_N; + } else { + break; + } + } + + slow_sets = m_long_running_ids; + if (long_ids_size == total_size) { + /* All transactions in set are slow, ignore it by setting to zero */ + fast_min = 0; + } else { + fast_min = m_fast_min_id; + } + } + void erase(trx_id_t trx_id) { ut_ad(1 == m_by_id.count(trx_id)); - m_by_id.erase(trx_id); + auto itr = m_by_id.find(trx_id); + if (itr->second->in_long_set) { + m_long_running_ids.erase(trx_id); + itr->second->in_long_set = false; + } + + m_by_id.erase(itr); + if (m_min_id.load(std::memory_order_relaxed) == trx_id) { // We want at most 1 release store, so we use a local variable for the // loop. 
@@ -436,6 +583,9 @@ class Trx_by_id_with_min { } } m_min_id.store(new_min, std::memory_order_release); + if (new_min > m_fast_min_id) { + m_fast_min_id = new_min; + } } } }; @@ -447,6 +597,9 @@ struct Trx_shard { Use latch_and_execute() interface to access other members. */ ut::Cacheline_padded> active_rw_trxs; + + ut::Cacheline_padded> + commit_rw_trxs; }; /** The transaction system central memory data structure. */ @@ -492,7 +645,11 @@ struct trx_sys_t { trx_sys_t::serialisation_mutex. Note: it might be in parallel used for both trx->id and trx->no assignments (for different trx_t objects). */ - std::atomic next_trx_id_or_no; + std::atomic next_trx_id; + + std::atomic next_trx_id_version; + + std::atomic next_trx_scn; /** @} */ @@ -530,12 +687,6 @@ struct trx_sys_t { /** Mutex protecting most fields in this structure (the default one). */ TrxSysMutex mutex; - char pad5[ut::INNODB_CACHE_LINE_SIZE]; - - /** List of active and committed in memory read-write transactions, sorted - on trx id, biggest first. Recovered transactions are always on this list. */ - UT_LIST_BASE_NODE_T(trx_t, trx_list) rw_trx_list; - char pad6[ut::INNODB_CACHE_LINE_SIZE]; /** List of transactions created for MySQL. All user transactions are @@ -545,19 +696,13 @@ struct trx_sys_t { been started in InnoDB. */ UT_LIST_BASE_NODE_T(trx_t, mysql_trx_list) mysql_trx_list; - /** Array of Read write transaction IDs for MVCC snapshot. A ReadView would - take a snapshot of these transactions whose changes are not visible to it. - We should remove transactions from the list before committing in memory and - releasing locks to ensure right order of removal and consistent snapshot. */ - trx_ids_t rw_trx_ids; - char pad7[ut::INNODB_CACHE_LINE_SIZE]; /** Mapping from transaction id to transaction instance. */ Trx_shard shards[TRX_SHARDS_N]; /** Number of transactions currently in the XA PREPARED state. */ - ulint n_prepared_trx; + std::atomic n_prepared_trx; /** True if XA PREPARED trxs are found. 
*/ bool found_prepared_trx; @@ -578,6 +723,22 @@ struct trx_sys_t { }, loc); } + + trx_id_t get_next_trx_id() { + return next_trx_id.fetch_add(2); + } + + trx_id_t get_max_trx_id() { + return next_trx_id.load(); + } + + trx_id_t get_next_trx_scn() { + return next_trx_scn.fetch_add(2); + } + + trx_id_t get_max_trx_scn() { + return next_trx_scn.load(); + } }; #endif /* !UNIV_HOTBACKUP */ @@ -606,22 +767,8 @@ static inline void trx_sys_mutex_exit() { trx_sys->mutex.exit(); } /** Test if trx_sys->mutex is owned. */ static inline bool trx_sys_mutex_own() { return trx_sys->mutex.is_owned(); } -/** Test if trx_sys->serialisation_mutex is owned. */ -static inline bool trx_sys_serialisation_mutex_own() { - return trx_sys->serialisation_mutex.is_owned(); -} #endif -/** Acquire the trx_sys->serialisation_mutex. */ -static inline void trx_sys_serialisation_mutex_enter() { - mutex_enter(&trx_sys->serialisation_mutex); -} - -/** Release the trx_sys->serialisation_mutex. */ -static inline void trx_sys_serialisation_mutex_exit() { - trx_sys->serialisation_mutex.exit(); -} - #endif /* !UNIV_HOTBACKUP */ #include "trx0sys.ic" diff --git a/storage/innobase/include/trx0sys.ic b/storage/innobase/include/trx0sys.ic index d23e51b28c5..248f52fcc09 100644 --- a/storage/innobase/include/trx0sys.ic +++ b/storage/innobase/include/trx0sys.ic @@ -210,7 +210,11 @@ static inline trx_t *trx_rw_is_active(trx_id_t trx_id, bool do_ref_count) { was set to larger than trx_id, so we decide to return nullptr, even though, if we were to repeat the call in just a moment we might get a different result if the trx_sys_rw_trx_add() for the trx_id happens meanwhile. 
*/ - ut_ad(trx_id < trx_sys_get_next_trx_id_or_no()); + if (SCN_Mgr::is_scn(trx_id)) { + return nullptr; + } + + ut_ad(trx_id < trx_sys->get_max_trx_id()); auto &shard = trx_sys->get_shard_by_trx_id(trx_id); if (trx_id < shard.active_rw_trxs.peek().min_id()) { return nullptr; @@ -233,12 +237,8 @@ inline trx_id_t trx_sys_get_trx_id_write_margin() { /** Allocates a new transaction id or transaction number. @return new, allocated trx id or trx no */ -inline trx_id_t trx_sys_allocate_trx_id_or_no() { - ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); - - trx_id_t trx_id = trx_sys->next_trx_id_or_no.fetch_add(1); - - if (trx_id % trx_sys_get_trx_id_write_margin() == 0) { +inline void trx_sys_flush_trx_id_or_no(trx_id_t id) { + if (id % trx_sys_get_trx_id_write_margin() == 0) { /* Reserve the next range of trx_id values. This thread has acquired either the trx_sys_mutex or the trx_sys_serialisation_mutex. @@ -252,17 +252,18 @@ inline trx_id_t trx_sys_allocate_trx_id_or_no() { trx_sys_write_max_trx_id(); } - return trx_id; } inline trx_id_t trx_sys_allocate_trx_id() { - ut_ad(trx_sys_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t id = trx_sys->get_next_trx_id(); + trx_sys_flush_trx_id_or_no(id); + return id; } inline trx_id_t trx_sys_allocate_trx_no() { - ut_ad(trx_sys_serialisation_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t scn = trx_sys->get_next_trx_scn(); + trx_sys_flush_trx_id_or_no(scn + 1); + return scn; } /** Reads trx->no up to which all transactions have been serialised. @@ -271,26 +272,6 @@ static inline trx_id_t trx_get_serialisation_min_trx_no(void) { return (trx_sys->serialisation_min_trx_no.load()); } -inline trx_id_t trx_sys_get_next_trx_id_or_no() { - return trx_sys->next_trx_id_or_no.load(); -} - -/** Determine if there are incomplete transactions in the system. 
-@return whether incomplete transactions need rollback */ -static inline bool trx_sys_need_rollback() { - ulint n_trx; - - trx_sys_mutex_enter(); - - n_trx = UT_LIST_GET_LEN(trx_sys->rw_trx_list); - ut_ad(n_trx >= trx_sys->n_prepared_trx); - n_trx -= trx_sys->n_prepared_trx; - - trx_sys_mutex_exit(); - - return (n_trx > 0); -} - static inline void trx_sys_rw_trx_add(trx_t *trx) { const trx_id_t trx_id = trx->id; ut_ad(trx_id != 0); diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 8edbdb09e1e..4be2a4ddb8f 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -55,6 +55,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0read.h" #include "sql/handler.h" // Xa_state_list #include "srv0srv.h" +#include +#include +#include /* std::vector to store the trx id & table id of tables that needs to be * rollbacked. We take SHARED MDL on these tables inside @@ -211,7 +214,7 @@ void trx_mark_sql_stat_end(trx_t *trx); /*!< in: trx handle */ /** Assigns a read view for a consistent read query. All the consistent reads within the same transaction will get the same read view, which is created when this function is first called for a new started transaction. */ -ReadView *trx_assign_read_view(trx_t *trx); /*!< in: active transaction */ +ReadView *trx_assign_read_view(trx_t *trx, bool is_shared = false); /*!< in: active transaction */ /** @return the transaction's read view or NULL if one not assigned. */ static inline ReadView *trx_get_read_view(trx_t *trx); @@ -272,7 +275,6 @@ static inline void trx_set_dict_operation(trx_t *trx, enum trx_dict_op_t op); /** Determines if a transaction is in the given state. The caller must hold trx_sys->mutex, or it must be the thread that is serving a running transaction. -A running RW transaction must be in trx_sys->rw_trx_list. @param[in] trx Transaction. @param[in] state State. 
@return true if trx->state == state */ @@ -672,6 +674,7 @@ enum trx_rseg_type_t { TRX_RSEG_TYPE_NOREDO /*!< non-redo rollback segment. */ }; +typedef std::vector> SCNIndexIds; struct trx_t { enum isolation_level_t { @@ -705,6 +708,9 @@ struct trx_t { `lock`, which are protected by lock_sys latches) */ mutable TrxMutex mutex; + /** Mutex protecting allocating && assign trx scn */ + TrxMutex scn_mutex; + /* Note: in_depth was split from in_innodb for fixing a RO performance issue. Acquiring the trx_t::mutex for each row costs ~3% in performance. It is not required for correctness. @@ -783,13 +789,13 @@ struct trx_t { list. During this switch we assign it a rollback segment. When a transaction is NOT_STARTED, it can be in_mysql_trx_list if - it is a user transaction. It cannot be in rw_trx_list. + it is a user transaction. It cannot be in trx shard. - ACTIVE->PREPARED->COMMITTED is only possible when trx->in_rw_trx_list. + ACTIVE->PREPARED->COMMITTED is only possible when trx is in trx shard. The transition ACTIVE->PREPARED is protected by trx_sys->mutex. ACTIVE->COMMITTED is possible when the transaction is in - rw_trx_list. + trx shard. Transitions to COMMITTED are protected by trx->mutex. @@ -828,9 +834,6 @@ struct trx_t { Set to true when srv_is_being_started for recovered transactions. Set to false without any protection in trx_init (where no other thread should access this object anyway). - Can be read safely when holding trx_sys->mutex and trx belongs to rw_trx_list, - as trx_init can not be called until trx leaves rw_trx_list which requires the - trx_sys->mutex. */ bool is_recovered; @@ -904,6 +907,7 @@ struct trx_t { change, and must flush */ bool in_truncate; /* This trx is doing truncation */ + bool in_long_set; /* Fields protected by the srv_conc_mutex. 
 */ bool declared_to_be_inside_innodb; /*!< this is true if we have declared @@ -926,6 +930,8 @@ struct trx_t { std::chrono::system_clock::time_point{}}; static_assert(decltype(start_time)::is_always_lock_free); + std::chrono::steady_clock::time_point start_rw_time; + lsn_t commit_lsn; /*!< lsn at the time of the commit */ /*------------------------------*/ @@ -950,11 +956,6 @@ struct trx_t { statement uses, except those in consistent read */ /*------------------------------*/ -#ifdef UNIV_DEBUG - /** True iff in trx_sys->rw_trx_list */ - bool in_rw_trx_list; - -#endif /* UNIV_DEBUG */ UT_LIST_NODE_T(trx_t) mysql_trx_list; /*!< list of transactions created for MySQL; protected by trx_sys->mutex */ @@ -1102,6 +1103,13 @@ struct trx_t { #endif /* UNIV_DEBUG */ ulint magic_n; + /** Index ids set that need to set its SCN number */ + SCNIndexIds scn_indexs; + + void add_scn_index(table_id_t table_id, space_index_t index_id) { + scn_indexs.push_back(std::make_pair(table_id, index_id)); + } + bool is_read_uncommitted() const { return (isolation_level == READ_UNCOMMITTED); } @@ -1168,14 +1176,6 @@ static inline void check_trx_state(const trx_t *t) { ut_error; } -/** -Assert that the transaction is in the trx_sys_t::rw_trx_list */ -static inline void assert_trx_in_rw_list(const trx_t *t) { - ut_ad(!t->read_only); - ut_ad(t->in_rw_trx_list == !(t->read_only || !t->rsegs.m_redo.rseg)); - check_trx_state(t); -} - /** Check if transaction is free so that it can be re-initialized. @param t transaction handle */ static inline void assert_trx_is_free(const trx_t *t) { @@ -1198,14 +1198,13 @@ static inline void assert_trx_is_inactive(const trx_t *t) { #ifdef UNIV_DEBUG /** Assert that an autocommit non-locking select cannot be in the - rw_trx_list and that it is a read-only transaction. + trx shard and that it is a read-only transaction. The transaction must be in the mysql_trx_list.
*/ static inline void assert_trx_nonlocking_or_in_list(const trx_t *t) { if (trx_is_autocommit_non_locking(t)) { trx_state_t t_state = t->state; ut_ad(t->read_only); ut_ad(!t->is_recovered); - ut_ad(!t->in_rw_trx_list); ut_ad(t->in_mysql_trx_list); ut_ad(t_state == TRX_STATE_NOT_STARTED || t_state == TRX_STATE_FORCED_ROLLBACK || t_state == TRX_STATE_ACTIVE); @@ -1215,7 +1214,7 @@ static inline void assert_trx_nonlocking_or_in_list(const trx_t *t) { } #else /* UNIV_DEBUG */ /** Assert that an autocommit non-locking select cannot be in the - rw_trx_list and that it is a read-only transaction. + trx shard and that it is a read-only transaction. The transaction must be in the mysql_trx_list. */ #define assert_trx_nonlocking_or_in_list(trx) ((void)0) #endif /* UNIV_DEBUG */ diff --git a/storage/innobase/include/trx0trx.ic b/storage/innobase/include/trx0trx.ic index 675dc81ca40..7d309a28552 100644 --- a/storage/innobase/include/trx0trx.ic +++ b/storage/innobase/include/trx0trx.ic @@ -59,8 +59,6 @@ static inline bool trx_state_eq(const trx_t *trx, trx_state_t state) { ut_a(state == TRX_STATE_NOT_STARTED || state == TRX_STATE_FORCED_ROLLBACK); - ut_ad(!trx->in_rw_trx_list); - return true; } ut_error; diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index e90650cafca..2e9036fbbb2 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -44,6 +44,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include #include #include +#include /** printf(3) format used for printing DB_TRX_ID and other system fields */ #define TRX_ID_FMT IB_ID_FMT diff --git a/storage/innobase/include/trx0undo.h b/storage/innobase/include/trx0undo.h index 2cd648a12e1..0820e04f44e 100644 --- a/storage/innobase/include/trx0undo.h +++ b/storage/innobase/include/trx0undo.h @@ -152,6 +152,7 @@ trx_undo_rec_t *trx_undo_get_next_rec( @param[in,out] mtr Mini-transaction @return undo log record, the page latched, 
NULL if none */ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, + trx_id_t *modifier_trx_no, space_id_t space, const page_size_t &page_size, page_no_t page_no, ulint offset, @@ -227,7 +228,8 @@ ulint trx_undo_lists_init( @return undo log segment header page, x-latched */ page_t *trx_undo_set_state_at_finish( trx_undo_t *undo, /*!< in: undo log memory copy */ - mtr_t *mtr); /*!< in: mtr */ + mtr_t *mtr, /*!< in: mtr */ + bool is_temp = false); /*!< in: true if it's tmp undo */ /** Set the state of the undo log segment at a XA PREPARE or XA ROLLBACK. @param[in,out] trx Transaction @@ -257,7 +259,7 @@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr); /** Frees an insert undo log after a transaction commit or rollback. @@ -388,6 +390,7 @@ struct trx_undo_t { field */ trx_id_t trx_id; /*!< id of the trx assigned to the undo log */ + trx_id_t trx_no; /*!< commit no of trx with trx_id */ XID xid; /*!< X/Open XA transaction identification */ ulint flag; /*!< flag for current transaction XID and GTID. 
diff --git a/storage/innobase/include/trx0undo.ic b/storage/innobase/include/trx0undo.ic index 05bb932c4d4..1c91836009e 100644 --- a/storage/innobase/include/trx0undo.ic +++ b/storage/innobase/include/trx0undo.ic @@ -139,7 +139,12 @@ static inline page_t *trx_undo_page_get_s_latched(const page_id_t &page_id, const page_size_t &page_size, mtr_t *mtr) { buf_block_t *block = - buf_page_get(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); + buf_page_get_allow_freed(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); + + if (block == nullptr) { + return nullptr; + } + buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE); return (buf_block_get_frame(block)); diff --git a/storage/innobase/include/ut0guarded.h b/storage/innobase/include/ut0guarded.h index 231f32b668a..63dd9a17b62 100644 --- a/storage/innobase/include/ut0guarded.h +++ b/storage/innobase/include/ut0guarded.h @@ -54,6 +54,11 @@ class Guarded { return std::forward(f)(inner); } + template + auto execute_no_latch(F &&f) { + return std::forward(f)(inner); + } + const Inner &peek() const { return inner; } }; } // namespace ut diff --git a/storage/innobase/include/ut0seq_lock.h b/storage/innobase/include/ut0seq_lock.h index 0a1fc4436c5..9c2ca5806ba 100644 --- a/storage/innobase/include/ut0seq_lock.h +++ b/storage/innobase/include/ut0seq_lock.h @@ -73,6 +73,24 @@ class Seq_lock : private Non_copyable { op(m_value); m_seq.store(old + 2, std::memory_order_release); } + + template + void locking_write(Op &&op) { + auto old = m_seq.load(std::memory_order_relaxed); + while (true) { + if (old & 1) { + std::this_thread::yield(); + old = m_seq.load(std::memory_order_relaxed); + } else if (m_seq.compare_exchange_weak(old, old + 1)) { + break; + } + } + + std::atomic_thread_fence(std::memory_order_release); + op(m_value); + m_seq.store(old + 2, std::memory_order_release); + } + /* Reads the value of the stored value of type T using operation op(). The op() can use memory_order_relaxed loads. 
The op() can't assume the data stored inside T is logically consistent. Calls to this method don't need to be diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 309dec0cee4..d1c4133284a 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -220,7 +220,13 @@ bool lock_check_trx_id_sanity(trx_id_t trx_id, const rec_t *rec, const dict_index_t *index, const ulint *offsets) { ut_ad(rec_offs_validate(rec, index, offsets)); - trx_id_t next_trx_id = trx_sys_get_next_trx_id_or_no(); + trx_id_t next_trx_id; + if (SCN_Mgr::is_scn(trx_id)) { + next_trx_id = trx_sys->get_max_trx_scn(); + } else { + next_trx_id = trx_sys->get_max_trx_id(); + } + bool is_ok = trx_id < next_trx_id; if (!is_ok) { @@ -238,6 +244,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view) /*!< in: consistent read view */ { ut_ad(index->is_clustered()); @@ -253,12 +260,7 @@ bool lock_clust_rec_cons_read_sees( return (true); } - /* NOTE that we call this function while holding the search - system latch. */ - - trx_id_t trx_id = row_get_rec_trx_id(rec, index, offsets); - - return (view->changes_visible(trx_id, index->table->name)); + return (view->changes_visible(index, rec, offsets)); } /** Checks that a non-clustered index record is seen in a consistent read. @@ -964,34 +966,25 @@ The difficulties to keep in mind here: the seen trx_id is still active or not */ static bool can_older_trx_be_still_active(trx_id_t max_old_active_id) { - if (mutex_enter_nowait(&trx_sys->mutex) != 0) { - ut_ad(!trx_sys_mutex_own()); - /* The mutex is currently locked by somebody else. 
Instead of wasting time - on spinning and waiting to acquire it, we loop over the shards and check if - any of them contains a value in the range (-infinity,max_old_active_id]. - NOTE: Do not be tempted to "cache" the minimum, until you also enforce that - transactions are inserted to shards in a monotone order! - Current implementation heavily depends on the property that even if we put - a trx with smaller id to any structure later, it could not have modified a - row the caller saw earlier. */ - static_assert(TRX_SHARDS_N < 1000, "The loop should be short"); - for (auto &shard : trx_sys->shards) { - if (shard.active_rw_trxs.peek().min_id() <= max_old_active_id) { - return true; - } + ut_ad(!trx_sys_mutex_own()); + /* The mutex is currently locked by somebody else. Instead of wasting time + on spinning and waiting to acquire it, we loop over the shards and check if + any of them contains a value in the range (-infinity,max_old_active_id]. + NOTE: Do not be tempted to "cache" the minimum, until you also enforce that + transactions are inserted to shards in a monotone order! + Current implementation heavily depends on the property that even if we put + a trx with smaller id to any structure later, it could not have modified a + row the caller saw earlier. 
*/ + static_assert(TRX_SHARDS_N < 1000, "The loop should be short"); + for (auto &shard : trx_sys->shards) { + trx_id_t min_id = shard.active_rw_trxs.peek().min_id(); + if (min_id == 0 || SCN_Mgr::is_scn(min_id)) continue; + if (min_id <= max_old_active_id) { + return true; } - return false; - } - ut_ad(trx_sys_mutex_own()); - const trx_t *trx = UT_LIST_GET_LAST(trx_sys->rw_trx_list); - if (trx == nullptr) { - trx_sys_mutex_exit(); - return false; } - assert_trx_in_rw_list(trx); - const trx_id_t min_active_now_id = trx->id; - trx_sys_mutex_exit(); - return min_active_now_id <= max_old_active_id; + + return false; } /** Checks if some transaction has an implicit x-lock on a record in a secondary @@ -1041,6 +1034,23 @@ static trx_t *lock_sec_rec_some_has_impl(const rec_t *rec, dict_index_t *index, } #ifdef UNIV_DEBUG +bool Trx_by_id_with_min::holds_expl_lock( + ulint precise_mode, const buf_block_t *block, ulint heap_no, const trx_t *impl_trx) { + + for (auto item : m_by_id) { + const trx_t* trx = item.second; + const lock_t *expl_lock = + lock_rec_has_expl(precise_mode, block, heap_no, trx); + if (expl_lock && expl_lock->trx != impl_trx) { + /* An explicit lock is held by trx other than + the trx holding the implicit lock. */ + return true; + } + } + + return false; +} + /** Checks if some transaction, other than given trx_id, has an explicit lock on the given rec, in the given precise_mode. @param[in] precise_mode LOCK_S or LOCK_X possibly ORed to LOCK_GAP or @@ -1067,21 +1077,19 @@ static bool lock_rec_other_trx_holds_expl(ulint precise_mode, const trx_t *trx, the transaction was not committed yet. 
*/ if (trx_t *impl_trx = trx_rw_is_active(trx->id, false)) { ulint heap_no = page_rec_get_heap_no(rec); - mutex_enter(&trx_sys->mutex); - for (auto t : trx_sys->rw_trx_list) { - const lock_t *expl_lock = - lock_rec_has_expl(precise_mode, block, heap_no, t); + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + holds = + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.holds_expl_lock(precise_mode, block, heap_no, impl_trx); + }, UT_LOCATION_HERE); - if (expl_lock && expl_lock->trx != impl_trx) { - /* An explicit lock is held by trx other than - the trx holding the implicit lock. */ - holds = true; + if (holds) { break; } } - - mutex_exit(&trx_sys->mutex); } return (holds); @@ -4248,29 +4256,14 @@ static void lock_remove_all_on_table_for_trx( trx_mutex_exit(trx); } -/** Remove any explicit record locks held by recovering transactions on - the table. - @return number of recovered transactions examined */ -static ulint lock_remove_recovered_trx_record_locks( - dict_table_t *table) /*!< in: check if there are any locks - held on records in this table or on the - table itself */ -{ - ut_a(table != nullptr); - /* We need exclusive lock_sys latch, as we are about to iterate over locks - held by multiple transactions while they might be operating. */ - ut_ad(locksys::owns_exclusive_global_latch()); - +ulint Trx_by_id_with_min::recovered_trx_record_locks(dict_table_t *table) { ulint n_recovered_trx = 0; - - mutex_enter(&trx_sys->mutex); - - for (trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - + for (auto item : m_by_id) { + trx_t *trx = item.second; if (!trx->is_recovered) { continue; } + /* We need trx->mutex to iterate over trx->lock.trx_lock and it is needed by lock_table_remove_low() but we haven't acquired it yet. 
*/ ut_ad(!trx_mutex_own(trx)); @@ -4305,7 +4298,32 @@ static ulint lock_remove_recovered_trx_record_locks( ++n_recovered_trx; } - mutex_exit(&trx_sys->mutex); + return n_recovered_trx; +} + +/** Remove any explicit record locks held by recovering transactions on + the table. + @return number of recovered transactions examined */ +static ulint lock_remove_recovered_trx_record_locks( + dict_table_t *table) /*!< in: check if there are any locks + held on records in this table or on the + table itself */ +{ + ut_a(table != nullptr); + /* We need exclusive lock_sys latch, as we are about to iterate over locks + held by multiple transactions while they might be operating. */ + ut_ad(locksys::owns_exclusive_global_latch()); + + ulint n_recovered_trx = 0; + + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + n_recovered_trx += + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_trx_record_locks(table); + }, UT_LOCATION_HERE); + } return (n_recovered_trx); } @@ -4503,7 +4521,7 @@ void lock_print_info_summary(FILE *file) { file); fprintf(file, "Trx id counter " TRX_ID_FMT "\n", - trx_sys_get_next_trx_id_or_no()); + trx_sys->get_max_trx_id()); fprintf(file, "Purge done for trx's n:o < " TRX_ID_FMT " undo n:o < " TRX_ID_FMT @@ -4609,64 +4627,6 @@ class TrxLockIterator { ulint m_index; }; -/** This iterates over RW trx_sys lists only. We need to keep -track where the iterator was up to and we do that using an ordinal value. */ - -class TrxListIterator { - public: - TrxListIterator() : m_index() { - /* We iterate over the RW trx list only. */ - - m_trx_list = &trx_sys->rw_trx_list; - } - - /** Get the current transaction whose ordinality is m_index. 
- @return current transaction or 0 */ - - const trx_t *current() { return (reposition()); } - - /** Advance the transaction current ordinal value and reset the - transaction lock ordinal value */ - - void next() { - ++m_index; - m_lock_iter.rewind(); - } - - TrxLockIterator &lock_iter() { return (m_lock_iter); } - - private: - /** Reposition the "cursor" on the current transaction. If it - is the first time then the "cursor" will be positioned on the - first transaction. - - @return transaction instance or 0 */ - const trx_t *reposition() const { - ulint i = 0; - - /* Make the transaction at the ordinal value of m_index - the current transaction. ie. reposition/restore */ - - for (auto trx : *m_trx_list) { - if (i++ == m_index) { - return trx; - } - check_trx_state(trx); - } - - return nullptr; - } - - /** Ordinal value of the transaction in the current transaction list */ - ulint m_index; - - /** Current transaction list */ - decltype(trx_sys->rw_trx_list) *m_trx_list; - - /** For iterating over a transaction's locks */ - TrxLockIterator m_lock_iter; -}; - /** Prints transaction lock wait and MVCC state. @param[in,out] file file where to print @param[in] trx transaction */ @@ -4818,64 +4778,62 @@ void lock_print_info_all_transactions(FILE *file) { fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n"); - mutex_enter(&trx_sys->mutex); + std::vector active_ids; + active_ids.clear(); + + trx_sys_mutex_enter(); /* First print info on non-active transactions */ /* NOTE: information of auto-commit non-locking read-only transactions will be omitted here. The information will be available from INFORMATION_SCHEMA.INNODB_TRX. 
*/ + for (auto trx : trx_sys->mysql_trx_list) { + /* We require exclusive access to lock_sys */ + ut_ad(locksys::owns_exclusive_global_latch()); + ut_ad(trx->in_mysql_trx_list); - PrintNotStarted print_not_started(file); - ut_list_map(trx_sys->mysql_trx_list, print_not_started); - - const trx_t *trx; - TrxListIterator trx_iter; - const trx_t *prev_trx = nullptr; - - /* Control whether a block should be fetched from the buffer pool. */ - bool load_block = true; - bool monitor = srv_print_innodb_lock_monitor; - - while ((trx = trx_iter.current()) != nullptr) { - check_trx_state(trx); - - if (trx != prev_trx) { - lock_trx_print_wait_and_mvcc_state(file, trx); - prev_trx = trx; - - /* The transaction that read in the page is no - longer the one that read the page in. We need to - force a page read. */ - load_block = true; + /* See state transitions and locking rules in trx0trx.h */ + trx_mutex_enter(trx); + if (trx_state_eq(trx, TRX_STATE_NOT_STARTED)) { + fputs("---", file); + trx_print_latched(file, trx, 600); } - /* If we need to print the locked record contents then we - need to fetch the containing block from the buffer pool. */ - if (monitor) { - /* Print the locks owned by the current transaction. */ - TrxLockIterator &lock_iter = trx_iter.lock_iter(); - - if (!lock_trx_print_locks(file, trx, lock_iter, load_block)) { - /* Resync trx_iter, the trx_sys->mutex and exclusive global latch were - temporarily released. A page was successfully read in. We need to print - its contents on the next call to lock_trx_print_locks(). On the next - call to lock_trx_print_locks() we should simply print the contents of - the page just read in.*/ - load_block = false; - - continue; - } + //TBD: Is it thread safe ? + trx_id_t id = trx->id; + if (id > 0) { + active_ids.push_back(id); } + trx_mutex_exit(trx); + } - load_block = true; + trx_sys_mutex_exit(); - /* All record lock details were printed without fetching - a page from disk, or we didn't need to print the detail. 
*/ - trx_iter.next(); + /* Collect internal transaction ids */ + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_internel_ids(active_ids); + }, + UT_LOCATION_HERE); } - mutex_exit(&trx_sys->mutex); + bool monitor = srv_print_innodb_lock_monitor; + for (auto id : active_ids) { + trx_sys->latch_and_execute_with_active_trx(id, [&](trx_t *trx) { + if (trx == nullptr) return; + check_trx_state(trx); + lock_trx_print_wait_and_mvcc_state(file, trx); + + if (monitor) { + TrxLockIterator lock_iter; + //FIXME: load block is disabled + lock_trx_print_locks(file, trx, lock_iter, false); + } + }, UT_LOCATION_HERE); + } } #ifdef UNIV_DEBUG @@ -4887,13 +4845,8 @@ static bool lock_table_queue_validate( { /* We actually hold exclusive latch here, but we require just the shard */ ut_ad(locksys::owns_table_shard(*table)); - ut_ad(trx_sys_mutex_own()); for (auto lock : table->locks) { - /* lock->trx->state cannot change to NOT_STARTED until transaction released - its table locks and that is prevented here by the locksys shard's mutex. */ - ut_ad(trx_assert_started(lock->trx)); - if (!lock_get_wait(lock)) { ut_a(!lock_table_other_has_incompatible(lock->trx, 0, table, lock_get_mode(lock))); @@ -5074,13 +5027,9 @@ static void lock_rec_validate_page( mutex_exit(&trx_sys->mutex); } -/** Validates the table locks.
*/ -static void lock_validate_table_locks() { - /* We need exclusive access to lock_sys to iterate over trxs' locks */ - ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); - - for (const trx_t *trx : trx_sys->rw_trx_list) { +void Trx_by_id_with_min::validate_table_lock() { + for (auto item : m_by_id) { + const trx_t *trx = item.second; check_trx_state(trx); for (const lock_t *lock : trx->lock.trx_locks) { @@ -5091,6 +5040,21 @@ static void lock_validate_table_locks() { } } +/** Validates the table locks. */ +static void lock_validate_table_locks() { + /* We need exclusive access to lock_sys to iterate over trxs' locks */ + ut_ad(locksys::owns_exclusive_global_latch()); + + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.validate_table_lock(); + }, + UT_LOCATION_HERE); + } +} + /** Validate a record lock's block */ static void lock_rec_block_validate(const page_id_t &page_id) { /* The lock and the block that it is referring to may be freed at @@ -5128,7 +5092,6 @@ bool lock_validate() { /* lock_validate_table_locks() needs exclusive global latch, and we will inspect record locks from all shards */ locksys::Global_exclusive_latch_guard guard{UT_LOCATION_HERE}; - mutex_enter(&trx_sys->mutex); lock_validate_table_locks(); @@ -5141,8 +5104,6 @@ bool lock_validate() { pages.emplace(lock->rec_lock.page_id); return false; }); - - mutex_exit(&trx_sys->mutex); } std::for_each(pages.cbegin(), pages.cend(), lock_rec_block_validate); @@ -5930,7 +5891,6 @@ void lock_trx_release_locks(trx_t *trx) /*!< in/out: transaction */ check_trx_state(trx); ut_ad(trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)); - ut_ad(!trx->in_rw_trx_list); if (trx_is_referenced(trx)) { while (trx_is_referenced(trx)) { @@ -6003,20 +5963,9 @@ bool lock_cancel_if_waiting_and_release(const TrxVersion trx_version) { } #ifdef UNIV_DEBUG 
-/** Scans all locks of all transactions in the rw_trx_list searching for any -lock (table or rec) against the table. -@param[in] table the table for which we perform the search -@return lock if found */ -static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { - ut_a(table != nullptr); - /* We are going to iterate over multiple transactions, so even though we know - which table we are looking for we can not narrow required latch to just the - shard which contains the table, because accessing trx->lock.trx_locks would be - unsafe */ - ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); - - for (auto trx : trx_sys->rw_trx_list) { +const lock_t * Trx_by_id_with_min::table_locks_lookup(const dict_table_t *table) { + for (auto item : m_by_id) { + trx_t *trx = item.second; check_trx_state(trx); for (auto lock : trx->lock.trx_locks) { @@ -6034,6 +5983,35 @@ static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { } } + return nullptr; +} + +/** Scans all locks of all transactions in the rw_trx_list searching for any +lock (table or rec) against the table. 
+@param[in] table the table for which we perform the search +@return lock if found */ +static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { + ut_a(table != nullptr); + /* We are going to iterate over multiple transactions, so even though we know + which table we are looking for we can not narrow required latch to just the + shard which contains the table, because accessing trx->lock.trx_locks would be + unsafe */ + ut_ad(locksys::owns_exclusive_global_latch()); + + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + const lock_t* lock = + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.table_locks_lookup(table); + }, + UT_LOCATION_HERE); + + if (lock != nullptr) { + return lock; + } + } + return (nullptr); } #endif /* UNIV_DEBUG */ diff --git a/storage/innobase/mtr/mtr0log.cc b/storage/innobase/mtr/mtr0log.cc index ddedbd1bf06..0eb2b532883 100644 --- a/storage/innobase/mtr/mtr0log.cc +++ b/storage/innobase/mtr/mtr0log.cc @@ -383,10 +383,6 @@ const byte *mlog_parse_string( ulint offset; ulint len; - ut_a(!page || !page_zip || - (fil_page_get_type(page) != FIL_PAGE_INDEX && - fil_page_get_type(page) != FIL_PAGE_RTREE)); - if (end_ptr < ptr + 4) { return (nullptr); } @@ -406,6 +402,10 @@ const byte *mlog_parse_string( return (nullptr); } + ut_a(!page || !page_zip || len == 6 || + (fil_page_get_type(page) != FIL_PAGE_INDEX && + fil_page_get_type(page) != FIL_PAGE_RTREE)); + if (page) { if (page_zip) { memcpy(((page_zip_des_t *)page_zip)->data + offset, ptr, len); diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 01f379b96fe..8d33c3d76ae 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -2381,7 +2381,8 @@ void page_cur_delete_rec( part of the buffer pool. 
*/ if (mtr != nullptr) { - buf_block_modify_clock_inc(page_cur_get_block(cursor)); + buf_block_t *block = page_cur_get_block(cursor); + buf_block_modify_clock_inc(block); } /* 2. Find the next and the previous record. Note that the cursor is diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc index 60f64c0a7e2..05241cb3e50 100644 --- a/storage/innobase/page/page0page.cc +++ b/storage/innobase/page/page0page.cc @@ -2174,7 +2174,7 @@ bool page_validate(const page_t *page, dict_index_t *index, trx_id_t max_trx_id = page_get_max_trx_id(page); /* This will be 0 during recv_apply_hashed_log_recs(), because the transaction system has not been initialized yet */ - trx_id_t sys_next_trx_id_or_no = trx_sys_get_next_trx_id_or_no(); + trx_id_t sys_next_trx_id_or_no = trx_sys->get_max_trx_id(); if (max_trx_id == 0 || (sys_next_trx_id_or_no != 0 && max_trx_id >= sys_next_trx_id_or_no)) { diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index a918b11b06f..b677c6f0807 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -36,6 +36,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "srv0srv.h" #include "trx0sys.h" +#include "trx0rec.h" +#include "trx0rseg.h" +#include "row0row.h" /* ------------------------------------------------------------------------------- @@ -182,6 +185,7 @@ will mark their views as closed but not actually free their views. /** Minimum number of elements to reserve in ReadView::ids_t */ static const ulint MIN_TRX_IDS = 32; +SCN_Mgr *scn_mgr = nullptr; #ifdef UNIV_DEBUG /** Functor to validate the view list. */ @@ -200,12 +204,10 @@ struct ViewCheck { /** Validates a read view list. 
*/ -bool MVCC::validate() const { +bool MVCC::validate(uint64_t slot) const { ViewCheck check; - ut_ad(trx_sys_mutex_own()); - - ut_list_map(m_views, check); + ut_list_map(m_views[slot], check); return (true); } @@ -317,9 +319,14 @@ ReadView::ReadView() m_up_limit_id(), m_creator_trx_id(), m_ids(), - m_low_limit_no() { + m_low_limit_no(), + m_trx(nullptr), + m_version(), + m_slot(MAX_SNAPSHOT_SIZE), + m_shared() { ut_d(::memset(&m_view_list, 0x0, sizeof(m_view_list))); ut_d(m_view_low_limit_no = 0); + m_long_running_ids.clear(); } /** @@ -330,22 +337,120 @@ ReadView::~ReadView() { /** Constructor @param size Number of views to pre-allocate */ -MVCC::MVCC(ulint size) : m_free(), m_views() { - for (ulint i = 0; i < size; ++i) { - ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); +MVCC::MVCC(ulint size) { + + m_slot_index = 0; + + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + UT_LIST_INIT(m_free[slot]); + UT_LIST_INIT(m_views[slot]); - UT_LIST_ADD_FIRST(m_free, view); + for (ulint i = 0; i < size; ++i) { + ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); + UT_LIST_ADD_FIRST(m_free[slot], view); + } + + mutex_create(LATCH_ID_TRX_SYS_MVCC, &(m_mutexs[slot])); } } MVCC::~MVCC() { - while (ReadView *view = UT_LIST_GET_FIRST(m_free)) { - UT_LIST_REMOVE(m_free, view); + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + while (ReadView *view = UT_LIST_GET_FIRST(m_free[slot])) { + UT_LIST_REMOVE(m_free[slot], view); - ut::delete_(view); + ut::delete_(view); + } + + ut_a(UT_LIST_GET_LEN(m_views[slot]) == 0); + + mutex_destroy(&(m_mutexs[slot])); } +} - ut_a(UT_LIST_GET_LEN(m_views) == 0); +bool ReadView::changes_visible( + const dict_index_t *index, + const rec_t *rec, + const ulint *offsets) { + + ut_a(index->is_clustered()); + ut_a(scn_mgr != nullptr); + + if (index->table->is_temporary()) { + return true; + } + + /* Get transaction id from record */ + ulint offset = scn_mgr->scn_offset(index, offsets); + trx_id_t id = 
mach_read_from_6(rec + offset); + + /* If it's scn, direct compare*/ + ut_a(!SCN_Mgr::is_scn(id)); + + /* Trx itself */ + if (id == m_creator_trx_id) { + return true; + } + + if (id < m_up_limit_id) { + if (m_long_running_ids.empty() + || id < *(m_long_running_ids.begin()) + || id > *(m_long_running_ids.rbegin())) { + return true; + } + + if (m_long_running_ids.find(id) != m_long_running_ids.end()) { + /* slow transaction check if it's active */ + if (m_invisible_ids.find(id) != m_invisible_ids.end()) { + return false; + } + + if (trx_rw_is_active(id, false)) { + m_invisible_ids.insert(id); + return false; + } + } else { + return true; + } + } + + if (id >= m_low_limit_id) { + return false; + } + + if (m_invisible_ids.find(id) != m_invisible_ids.end()) { + /* Not visible to current view */ + return false; + } + + trx_id_t committing_version = 0; + /* Get SCN from undo log */ + trx_id_t scn = scn_mgr->get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets), &committing_version); + + if (!m_shared && committing_version != 0 && committing_version < m_version) { + /* Consider such scenario: + - active trx: get trx->no = 5 + - open read view: version = 7 + - before committing trx completely: not visible + - after committing trx: visible because it's deregistered + and scn is written to undo (5 < 7) + + Problem: consistent read is broken, so we must + record such kind of scn and id */ + ut_a(scn == TRX_ID_MAX); + m_invisible_ids.insert(id); + m_committing_scns.insert(committing_version); + + ut_a(committing_version >= m_low_limit_no); + } else if (scn == TRX_ID_MAX) { + /* Still active, add id to invisible set */ + m_invisible_ids.insert(id); + return false; + } + + ut_a(scn > 0); + + return (sees_version(scn)); } /** @@ -445,44 +550,58 @@ point in time are seen in the view. 
@param id Creator transaction id */ void ReadView::prepare(trx_id_t id) { - ut_ad(trx_sys_mutex_own()); m_creator_trx_id = id; - m_low_limit_no = trx_get_serialisation_min_trx_no(); + m_long_running_ids.clear(); - m_low_limit_id = trx_sys_get_next_trx_id_or_no(); - - ut_a(m_low_limit_no <= m_low_limit_id); - - if (!trx_sys->rw_trx_ids.empty()) { - copy_trx_ids(trx_sys->rw_trx_ids); + if (m_slot != MAX_SNAPSHOT_SIZE) { + //user session + m_low_limit_no = trx_get_serialisation_min_trx_no(); + scn_mgr->take_up_ids(m_up_limit_id, m_long_running_ids); } else { - m_ids.clear(); + m_low_limit_no = trx_sys_oldest_trx_no(); + m_up_limit_id = scn_mgr->min_active_id(); } - /* The first active transaction has the smallest id. */ - m_up_limit_id = !m_ids.empty() ? m_ids.front() : m_low_limit_id; + m_version = trx_sys->get_max_trx_scn(); + m_low_limit_id = trx_sys->get_max_trx_id(); - ut_a(m_up_limit_id <= m_low_limit_id); + if (m_low_limit_no > m_version) { + m_low_limit_no = m_version; + } + + m_committing_scns.clear(); + m_invisible_ids.clear(); + + if (m_shared || m_slot == MAX_SNAPSHOT_SIZE) { + /* This is a read view shared by multiple threads such as + select count(*), so we must prepare m_committing_scns and + m_invisible_ids before hand */ + for (ulint i = 1; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.collect_commit_ids(m_invisible_ids, m_committing_scns, m_version); + }, UT_LOCATION_HERE); + } + } ut_d(m_view_low_limit_no = m_low_limit_no); m_closed = false; } - /** Find a free view from the active list, if none found then allocate a new view. 
@return a view to use */ -ReadView *MVCC::get_view() { - ut_ad(trx_sys_mutex_own()); +ReadView *MVCC::get_view(uint64_t slot) { ReadView *view; - if (UT_LIST_GET_LEN(m_free) > 0) { - view = UT_LIST_GET_FIRST(m_free); - UT_LIST_REMOVE(m_free, view); + if (UT_LIST_GET_LEN(m_free[slot]) > 0) { + view = UT_LIST_GET_FIRST(m_free[slot]); + UT_LIST_REMOVE(m_free[slot], view); } else { view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); @@ -498,7 +617,7 @@ ReadView *MVCC::get_view() { @param view View owned by this class created for the caller. Must be freed by calling view_close() @param trx Transaction instance of caller */ -void MVCC::view_open(ReadView *&view, trx_t *trx) { +void MVCC::view_open(ReadView *&view, trx_t *trx, bool is_shared) { ut_ad(!srv_read_only_mode); /** If no new RW transaction has been started since the last view @@ -521,7 +640,9 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { if (trx_is_autocommit_non_locking(trx) && view->empty()) { view->m_closed = false; - if (view->m_low_limit_id == trx_sys_get_next_trx_id_or_no()) { + if (view->m_low_limit_id == trx_sys->get_max_trx_id() && + view->m_version == trx_sys->get_max_trx_scn()) { + view->m_shared = is_shared; return; } else { view->m_closed = true; @@ -529,93 +650,94 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { } } - trx_sys_mutex_enter(); + uint64_t slot; + if (view != nullptr) { + slot = view->get_slot(); + } else { + slot = get_slot(); + } + + enter(slot); if (view != nullptr) { - UT_LIST_REMOVE(m_views, view); + UT_LIST_REMOVE(m_views[slot], view); } else { - view = get_view(); + view = get_view(slot); } if (view != nullptr) { + view->m_shared = is_shared; + view->prepare(trx->id); - UT_LIST_ADD_FIRST(m_views, view); + UT_LIST_ADD_FIRST(m_views[slot], view); ut_ad(!view->is_closed()); - ut_ad(validate()); + view->set_slot(slot); } - trx_sys_mutex_exit(); + exit(slot); } +void MVCC::enter(uint64_t slot) { + mutex_enter(&(m_mutexs[slot])); +} + +void MVCC::exit(uint64_t slot) { + 
mutex_exit(&(m_mutexs[slot])); +} + + /** Get the oldest (active) view in the system. @return oldest view if found or NULL */ +void MVCC::get_oldest_version(ReadView *purge_view) { + trx_id_t purge_version = std::min(purge_view->version(), purge_view->low_limit_no()); + trx_id_t purge_low_id = purge_view->low_limit_id(); + trx_id_t purge_up_limit_id = purge_view->up_limit_id(); + + for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (ReadView* view = UT_LIST_GET_LAST(m_views[i]); view != nullptr; + view = UT_LIST_GET_PREV(m_view_list, view)) { + if (view->is_closed()) { + continue; + } -ReadView *MVCC::get_oldest_view() const { - ReadView *view; + trx_id_t min_no = std::min(view->low_limit_no(), view->version()); + if (min_no < purge_version) { + purge_version = min_no; + } - ut_ad(trx_sys_mutex_own()); + if (view->low_limit_id() < purge_low_id) { + purge_low_id = view->low_limit_id(); + } + + if (view->up_limit_id() < purge_up_limit_id) { + purge_up_limit_id = view->up_limit_id(); + } - for (view = UT_LIST_GET_LAST(m_views); view != nullptr; - view = UT_LIST_GET_PREV(m_view_list, view)) { - if (!view->is_closed()) { break; } - } - return (view); -} - -/** -Copy state from another view. Must call copy_complete() to finish. 
-@param other view to copy from */ - -void ReadView::copy_prepare(const ReadView &other) { - ut_ad(&other != this); - - if (!other.m_ids.empty()) { - const ids_t::value_type *p = other.m_ids.data(); - - m_ids.assign(p, p + other.m_ids.size()); - } else { - m_ids.clear(); + exit(i); } - m_up_limit_id = other.m_up_limit_id; - - m_low_limit_no = other.m_low_limit_no; - - ut_d(m_view_low_limit_no = other.m_view_low_limit_no); - - m_low_limit_id = other.m_low_limit_id; - - m_creator_trx_id = other.m_creator_trx_id; + purge_view->copy_prepare(purge_version, purge_low_id, purge_up_limit_id); } /** -Complete the copy, insert the creator transaction id into the -m_ids too and adjust the m_up_limit_id, if required */ - -void ReadView::copy_complete() { - ut_ad(!trx_sys_mutex_own()); - - if (m_creator_trx_id > 0) { - m_ids.insert(m_creator_trx_id); - } - - if (!m_ids.empty()) { - /* The last active transaction has the smallest id. */ - m_up_limit_id = std::min(m_ids.front(), m_up_limit_id); - } +@param other view to copy from */ - ut_ad(m_up_limit_id <= m_low_limit_id); +void ReadView::copy_prepare(trx_id_t version, trx_id_t low_id, trx_id_t up_id) { + ut_a(up_id <= low_id); - /* We added the creator transaction ID to the m_ids. */ - m_creator_trx_id = 0; + m_up_limit_id = up_id; + m_low_limit_no = version; + m_version = version; + m_low_limit_id = low_id; } /** Clones the oldest view and stores it in view. No need to @@ -625,22 +747,11 @@ m_free list. This function is called by Purge to determine whether it should purge the delete marked record or not. 
@param view Preallocated view, owned by the caller */ void MVCC::clone_oldest_view(ReadView *view) { - trx_sys_mutex_enter(); - - ReadView *oldest_view = get_oldest_view(); - - if (oldest_view == nullptr) { - view->prepare(0); - - trx_sys_mutex_exit(); - } else { - view->copy_prepare(*oldest_view); + view->prepare(0); - trx_sys_mutex_exit(); + get_oldest_version(view); - view->copy_complete(); - } /* Update view to block purging transaction till GTID is persisted. */ auto >id_persistor = clone_sys->get_gtid_persistor(); auto gtid_oldest_trxno = gtid_persistor.get_oldest_trx_no(); @@ -650,19 +761,19 @@ void MVCC::clone_oldest_view(ReadView *view) { /** @return the number of active views */ -ulint MVCC::size() const { - trx_sys_mutex_enter(); - +ulint MVCC::size() { ulint size = 0; - for (const ReadView *view : m_views) { - if (!view->is_closed()) { - ++size; + for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (const ReadView *view : m_views[i]) { + if (!view->is_closed()) { + ++size; + } } + exit(i); } - trx_sys_mutex_exit(); - return (size); } @@ -688,14 +799,319 @@ void MVCC::view_close(ReadView *&view, bool own_mutex) { view = reinterpret_cast(p | 0x1); } else { view = reinterpret_cast(p & ~1); + auto slot = view->get_slot(); + enter(slot); view->close(); - UT_LIST_REMOVE(m_views, view); - UT_LIST_ADD_LAST(m_free, view); + UT_LIST_REMOVE(m_views[slot], view); + UT_LIST_ADD_LAST(m_free[slot], view); + + ut_ad(validate(slot)); - ut_ad(validate()); + exit(slot); view = nullptr; } } + +Scn_Map::Scn_Map() { + m_size = SCN_MAP_MAX_SIZE; + m_datas = new Trx_seq_with_lock[m_size]; + ut_a(m_datas != nullptr); +} + +Scn_Map::~Scn_Map() { + delete[] m_datas; +} + +SCN_Mgr::SCN_Mgr() { + m_scn_map = new Scn_Map(); + m_random_map = new Scn_Map(); + m_startup_id = 0; + m_startup_scn = 0; + m_abort = false; + m_view_active = false; + m_min_active_id = 0; + m_fast_min_active_id = 0; + m_long_running_ids.clear(); + m_has_slow_ids = false; + m_view_event = 
os_event_create(); + os_event_reset(m_view_event); + m_lock = static_cast( + ut::malloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*m_lock))); + rw_lock_create(PFS_NOT_INSTRUMENTED, m_lock, LATCH_ID_SCN_MGR_LOCK); +} + +SCN_Mgr::~SCN_Mgr() { + delete m_scn_map; + delete m_random_map; + os_event_destroy(m_view_event); + rw_lock_free(m_lock); + ut::free(m_lock); +} + +/** Init scn/id on startup */ +void SCN_Mgr::init() { + m_startup_scn = trx_sys->get_max_trx_scn() - 2; + + trx_id_t current_min_id = trx_sys->get_max_trx_id(); + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_id = shard.active_rw_trxs.execute_no_latch( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return (trx_by_id_with_min.min_id());}); + + if (current_min_id > min_id && min_id != 0) { + current_min_id = min_id; + } + } + + m_startup_id = current_min_id; + + m_min_active_id = m_startup_id; + + m_fast_min_active_id = m_startup_id; + + purge_sys->min_up_id = m_startup_id; +} + +trx_id_t SCN_Mgr::get_scn_fast(trx_id_t id, trx_id_t *committing_version) { + trx_id_t scn; + if (id < purge_sys->min_up_id.load(std::memory_order_relaxed) || id == 0) { + /* Too old transaction, and it should be visible to all + sessions, so it's safe to give it a fake SCN */ + scn = m_startup_scn; + } else { + scn = m_scn_map->read(id); + if (scn == 0) { + scn = m_random_map->read(id); + } + + if (scn == 0) { + /* Check if transaction is still active */ + trx_t *trx = trx_rw_is_active(id, true); + if (trx != nullptr) { + /* transaction is still active */ + if (committing_version == nullptr) { + trx_release_reference(trx); + return TRX_ID_MAX; + } + + /* Get scn from trx if trx->no is set */ + mutex_enter(&trx->scn_mutex); + trx_id_t version = trx->no; + mutex_exit(&trx->scn_mutex); + + trx_release_reference(trx); + + if (version != TRX_ID_MAX) { + *committing_version = version; + } + + return TRX_ID_MAX; + } + } + } + + return scn; +} + +trx_id_t SCN_Mgr::get_scn(trx_id_t 
id, const dict_index_t *index, roll_ptr_t roll_ptr, trx_id_t *committing_version) { + if (index->table->is_temporary()) { + ib::info() << "temp for id " << id; + return TRX_ID_MAX; + } + + trx_id_t scn = get_scn_fast(id, committing_version); + + if (scn == TRX_ID_MAX) { + /* Transaction is still active */ + return TRX_ID_MAX; + } + + if (scn != 0) { + return scn; + } + + /* Slow path */ + scn = trx_undo_get_scn(index, roll_ptr, id); + if (scn > 0) { + m_random_map->store(id, scn); + } + + ut_a(scn < trx_sys->get_max_trx_scn()); + + if (scn == 0) { + return TRX_ID_MAX; + } + + return scn; +} + +ulint SCN_Mgr::scn_offset(const dict_index_t *index, const ulint *offsets) { + ulint offset = index->trx_id_offset; + + if (!offset) { + offset = row_get_trx_id_offset(index, offsets); + } + + return offset; +} + +void SCN_Mgr::take_up_ids(trx_id_t &up_id, trx_ids_set_t &slow_set) { + if (!m_has_slow_ids.load(std::memory_order_relaxed)) { + up_id = m_min_active_id.load(); + return; + } + + s_lock(); + if (m_has_slow_ids.load()) { + ut_ad(!m_long_running_ids.empty()); + slow_set = m_long_running_ids; + up_id = m_fast_min_active_id; + } + + s_unlock(); +} + +void SCN_Mgr::view_task() { + m_view_active = true; + + while (!m_abort.load()) { + uint64_t sig_counter = os_event_reset(m_view_event); + trx_id_t limit_id = trx_sys->get_max_trx_id(); + trx_id_t current_min_id = limit_id; + trx_id_t current_fast_min_id = limit_id; + + while (trx_sys->next_trx_id.load() + != trx_sys->next_trx_id_version.load()) { + ut_delay(1); + } + + trx_ids_set_t slow_set; + slow_set.clear(); + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_id = 0; + trx_id_t fast_min_id = 0; + trx_ids_set_t shard_slow_set; + shard_slow_set.clear(); + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.get_min_with_slow(min_id, fast_min_id, shard_slow_set, limit_id);}, UT_LOCATION_HERE); + + if (current_min_id > 
min_id && min_id != 0) { + current_min_id = min_id; + } + + if (current_fast_min_id > fast_min_id && fast_min_id != 0) { + current_fast_min_id = fast_min_id; + } + + for (auto id : shard_slow_set) { + slow_set.insert(id); + } + } + + /* Erase ids larger than m_fast_min_active_id from the set, if any */ + auto itr = slow_set.upper_bound(current_fast_min_id); + if (itr != slow_set.end()) { + slow_set.erase(itr, slow_set.end()); + } + + ib::info() << "current min id " << current_min_id << " fast min id " << current_fast_min_id; + if (!slow_set.empty()) { + fprintf(stderr, "slow ids: "); + for (auto id : slow_set) { + fprintf(stderr, "%lu ", id); + } + fprintf(stderr, "\n"); + } + + if (slow_set.empty()) { + m_has_slow_ids = false; + if (!m_long_running_ids.empty()) { + x_lock(); + m_long_running_ids.clear(); + x_unlock(); + } + } else { + bool is_set_changed = false; + if (m_long_running_ids.size() == slow_set.size()) { + /* This is the only thread that changes m_long_running_ids, + so it's safe to iterate without lock */ + for (auto id : m_long_running_ids) { + if (slow_set.find(id) == slow_set.end()) { + is_set_changed = true; + break; + } + } + } else { + is_set_changed = true; + } + + if (is_set_changed) { + x_lock(); + m_long_running_ids.swap(slow_set); + m_fast_min_active_id = current_fast_min_id; + x_unlock(); + m_has_slow_ids = true; + } else { + ut_ad(m_has_slow_ids.load()); + } + } + + m_min_active_id = current_min_id; + m_fast_min_active_id = current_fast_min_id; + ut_ad(m_fast_min_active_id.load() >= m_min_active_id.load()); + + trx_id_t current_min_no = trx_sys->get_max_trx_scn(); + for (ulint i = 1; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_no = shard.commit_rw_trxs.execute_no_latch( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + return (trx_list_with_min.min_id());}); + + if (current_min_no > min_no && min_no != 0) { + ut_a(SCN_Mgr::is_scn(min_no)); + current_min_no = min_no; + } + } + + if 
(trx_sys->serialisation_min_trx_no < current_min_no) { + trx_sys->serialisation_min_trx_no = current_min_no; + srv_purge_wakeup(); + } + + trx_sys->serialisation_min_trx_no = current_min_no; + + os_event_wait_time_low(m_view_event, std::chrono::seconds{1}, sig_counter); + } + + m_view_active = false; +} + +void run_view_task() { + scn_mgr->view_task(); +} + +void SCN_Mgr::start() { + m_abort = false; + std::thread th(run_view_task); + th.detach(); + while (!m_view_active.load()) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + +void SCN_Mgr::stop() { + m_abort = true; + while (m_view_active.load()) { + os_event_set(m_view_event); + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index 5cf4ff3bf5c..9d8e2a4b13a 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -2955,6 +2955,7 @@ dberr_t row_create_index_for_mysql( goto error_handling; } + trx->add_scn_index(table->id, index->id); } else { dict_build_index_def(table, index, trx); #ifdef UNIV_DEBUG @@ -4570,7 +4571,7 @@ dberr_t row_scan_index_for_mysql(row_prebuilt_t *prebuilt, dict_index_t *index, /* No INSERT INTO ... SELECT and non-locking selects only. 
*/ trx_start_if_not_started_xa(prebuilt->trx, false, UT_LOCATION_HERE); - trx_assign_read_view(prebuilt->trx); + trx_assign_read_view(prebuilt->trx, true); auto trx = prebuilt->trx; diff --git a/storage/innobase/row/row0pread-histogram.cc b/storage/innobase/row/row0pread-histogram.cc index c7f812ae7bc..c22b10809dc 100644 --- a/storage/innobase/row/row0pread-histogram.cc +++ b/storage/innobase/row/row0pread-histogram.cc @@ -337,7 +337,7 @@ dberr_t Histogram_sampler::process_non_leaf_rec( offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, &heap); - if (ctx->is_rec_visible(rec, offsets, heap, &mtr)) { + if (ctx->is_rec_visible(page_cur_get_block(&cur), rec, offsets, heap, &mtr)) { err = sample_rec(ctx, rec, offsets, index, prebuilt); if (err != DB_SUCCESS) { diff --git a/storage/innobase/row/row0pread.cc b/storage/innobase/row/row0pread.cc index 1b5766614f6..7fff7c5dec6 100644 --- a/storage/innobase/row/row0pread.cc +++ b/storage/innobase/row/row0pread.cc @@ -463,11 +463,11 @@ dberr_t PCursor::move_to_next_block(dict_index_t *index) { return err; } -bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, +bool Parallel_reader::Scan_ctx::check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - const auto table_name = m_config.m_index->table->name; +// const auto table_name = m_config.m_index->table->name; ut_ad(!m_trx || m_trx->read_view == nullptr || MVCC::is_view_active(m_trx->read_view)); @@ -478,16 +478,8 @@ bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, auto view = m_trx->read_view; if (m_config.m_index->is_clustered()) { - trx_id_t rec_trx_id; - - if (m_config.m_index->trx_id_offset > 0) { - rec_trx_id = trx_read_trx_id(rec + m_config.m_index->trx_id_offset); - } else { - rec_trx_id = row_get_rec_trx_id(rec, m_config.m_index, offsets); - } - if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && - !view->changes_visible(rec_trx_id, table_name)) { + 
!view->changes_visible(m_config.m_index, rec, offsets)) { rec_t *old_vers; row_vers_build_for_consistent_read(rec, mtr, m_config.m_index, &offsets, @@ -742,7 +734,7 @@ dberr_t Parallel_reader::Ctx::traverse_recs(PCursor *pcursor, mtr_t *mtr) { bool skip{}; if (page_is_leaf(cur->block->frame)) { - skip = !m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + skip = !m_scan_ctx->check_visibility(cur->block, rec, offsets, heap, mtr); } if (!skip) { diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc index ec9b535cf1d..580ac8bf36c 100644 --- a/storage/innobase/row/row0sel.cc +++ b/storage/innobase/row/row0sel.cc @@ -883,7 +883,7 @@ static inline bool row_sel_test_other_conds( old_vers = nullptr; - if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, + if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, &plan->clust_pcur, node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, clust_rec, &offsets, @@ -1389,7 +1389,7 @@ static ulint row_sel_try_search_shortcut( UT_LOCATION_HERE, &heap); if (index->is_clustered()) { - if (!lock_clust_rec_cons_read_sees(rec, index, offsets, node->read_view)) { + if (!lock_clust_rec_cons_read_sees(rec, index, offsets, &(plan->pcur), node->read_view)) { ret = SEL_RETRY; goto func_exit; } @@ -1752,6 +1752,7 @@ skip_lock: if (index->is_clustered()) { if (!lock_clust_rec_cons_read_sees(rec, index, offsets, + &(plan->pcur), node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, rec, &offsets, &heap, &plan->old_vers_heap, &old_vers, @@ -3293,6 +3294,7 @@ non-clustered index. Does the necessary locking. 
if (trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && !lock_clust_rec_cons_read_sees(clust_rec, clust_index, *offsets, + prebuilt->clust_pcur, trx_get_read_view(trx))) { if (clust_rec != cached_clust_rec) { /* The following call returns 'offsets' associated with 'old_vers' */ @@ -3750,7 +3752,7 @@ static ulint row_sel_try_search_shortcut_for_mysql( *offsets = rec_get_offsets(rec, index, *offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, heap); - if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, + if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, pcur, trx_get_read_view(trx))) { return (SEL_RETRY); } @@ -5323,7 +5325,7 @@ rec_loop: by skipping this lookup */ if (srv_force_recovery < 5 && - !lock_clust_rec_cons_read_sees(rec, index, offsets, + !lock_clust_rec_cons_read_sees(rec, index, offsets, pcur, trx_get_read_view(trx))) { rec_t *old_vers; /* The following call returns 'offsets' associated with 'old_vers' */ diff --git a/storage/innobase/row/row0vers.cc b/storage/innobase/row/row0vers.cc index 1573601d488..d1e21ec4558 100644 --- a/storage/innobase/row/row0vers.cc +++ b/storage/innobase/row/row0vers.cc @@ -587,7 +587,11 @@ bool row_vers_must_preserve_del_marked(trx_id_t trx_id, mtr_s_lock(&purge_sys->latch, mtr, UT_LOCATION_HERE); - return (!purge_sys->view.changes_visible(trx_id, name)); + if (!SCN_Mgr::is_scn(trx_id)) { + return true; + } + + return (!purge_sys->view.sees_version(trx_id)); } /** Check whether all non-virtual columns in a index entries match @@ -1254,7 +1258,6 @@ dberr_t row_vers_build_for_consistent_read( DBUG_TRACE; const rec_t *version; rec_t *prev_version; - trx_id_t trx_id; mem_heap_t *heap = nullptr; byte *buf; dberr_t err; @@ -1266,14 +1269,16 @@ dberr_t row_vers_build_for_consistent_read( ut_ad(rec_offs_validate(rec, index, *offsets)); - trx_id = row_get_rec_trx_id(rec, index, *offsets); +#ifdef UNIV_DEBUG + trx_id_t trx_id = row_get_rec_trx_id(rec, index, *offsets); +#endif /* Reset the collected LOB undo information. 
*/ if (lob_undo != nullptr) { lob_undo->reset(); } - ut_ad(!view->changes_visible(trx_id, index->table->name)); + ut_ad(!SCN_Mgr::is_scn(trx_id) || !view->sees_version(trx_id)); ut_ad(!vrow || !(*vrow)); @@ -1315,9 +1320,7 @@ dberr_t row_vers_build_for_consistent_read( ut_a(!rec_offs_any_null_extern(index, prev_version, *offsets)); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - trx_id = row_get_rec_trx_id(prev_version, index, *offsets); - - if (view->changes_visible(trx_id, index->table->name)) { + if (view->changes_visible(index, prev_version, *offsets)) { /* The view already sees this version: we can copy it to in_heap and return */ diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index fb852a6fcd8..9ce6277208b 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -1747,10 +1747,8 @@ void srv_export_innodb_status(void) { rw_lock_s_unlock(&purge_sys->latch); - trx_sys_serialisation_mutex_enter(); /* Maximum transaction number added to history list for purge. */ trx_id_t max_trx_no = trx_sys->rw_max_trx_no; - trx_sys_serialisation_mutex_exit(); if (done_trx_no == 0 || max_trx_no < done_trx_no) { export_vars.innodb_purge_trx_id_age = 0; diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 93f0a552a46..9fb75202a7a 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -1779,6 +1779,8 @@ dberr_t srv_start(bool create_new_db) { trx_sys_create(); lock_sys_create(srv_lock_table_size); + scn_mgr = new SCN_Mgr(); + /* Create i/o-handler threads: */ os_aio_start_threads(); @@ -2065,6 +2067,8 @@ dberr_t srv_start(bool create_new_db) { auto *dict_metadata = recv_recovery_from_checkpoint_finish(false); ut_a(dict_metadata != nullptr); + scn_mgr->start(); + /* We need to save the dynamic metadata collected from redo log to DD buffer table here. This is to make sure that the dynamic metadata is not lost by any future checkpoint. 
Since DD and data dictionary in memory @@ -2994,6 +2998,10 @@ void srv_shutdown() { buffer pool to disk. */ dict_persist_to_dd_table_buffer(); + if (scn_mgr != nullptr) { + scn_mgr->stop(); + } + /* The steps 1-4 is the real InnoDB shutdown. All before was to stop activity which could produce new changes. All after is just cleaning up (freeing memory). */ @@ -3058,6 +3066,11 @@ void srv_shutdown() { lock_sys_close(); trx_pool_close(); + if (scn_mgr != nullptr) { + delete scn_mgr; + scn_mgr = nullptr; + } + dict_close(); dict_persist_close(); undo_spaces_deinit(); diff --git a/storage/innobase/sync/sync0debug.cc b/storage/innobase/sync/sync0debug.cc index 77f1af2fe54..a13f174e91d 100644 --- a/storage/innobase/sync/sync0debug.cc +++ b/storage/innobase/sync/sync0debug.cc @@ -1363,6 +1363,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_UNDO, SYNC_TRX_UNDO, trx_undo_mutex_key); + LATCH_ADD_MUTEX(TRX_SCN, SYNC_NO_ORDER_CHECK, trx_scn_mutex_key); + LATCH_ADD_MUTEX(TRX_POOL, SYNC_POOL, trx_pool_mutex_key); LATCH_ADD_MUTEX(TRX_POOL_MANAGER, SYNC_POOL_MANAGER, @@ -1388,6 +1390,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_SYS_SERIALISATION, SYNC_TRX_SYS_SERIALISATION, trx_sys_serialisation_mutex_key); + LATCH_ADD_MUTEX(TRX_SYS_MVCC, SYNC_NO_ORDER_CHECK, trx_sys_mvcc_mutex_key); + LATCH_ADD_MUTEX(SRV_SYS, SYNC_THREADS, srv_sys_mutex_key); LATCH_ADD_MUTEX(SRV_SYS_TASKS, SYNC_ANY_LATCH, srv_threads_mutex_key); @@ -1430,6 +1434,9 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_RWLOCK(BUF_BLOCK_LOCK, SYNC_LEVEL_VARYING, PFS_NOT_INSTRUMENTED); #endif /* !PFS_SKIP_BUFFER_MUTEX_RWLOCK */ + LATCH_ADD_RWLOCK(CLONE_REPL_LOCK, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); + + LATCH_ADD_RWLOCK(SCN_MGR_LOCK, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); #ifdef UNIV_DEBUG LATCH_ADD_RWLOCK(BUF_BLOCK_DEBUG, SYNC_NO_ORDER_CHECK, buf_block_debug_latch_key); @@ -1494,6 +1501,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { 
LATCH_ADD_MUTEX(DBLWR, SYNC_DBLWR, dblwr_mutex_key); + LATCH_ADD_MUTEX(CLONE_REPL_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); + LATCH_ADD_MUTEX(TEST_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); latch_id_t id = LATCH_ID_NONE; diff --git a/storage/innobase/sync/sync0sync.cc b/storage/innobase/sync/sync0sync.cc index f14e5b309e2..54e46763110 100644 --- a/storage/innobase/sync/sync0sync.cc +++ b/storage/innobase/sync/sync0sync.cc @@ -123,6 +123,7 @@ mysql_pfs_key_t srv_monitor_file_mutex_key; mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ mysql_pfs_key_t trx_undo_mutex_key; +mysql_pfs_key_t trx_scn_mutex_key; mysql_pfs_key_t trx_mutex_key; mysql_pfs_key_t trx_pool_mutex_key; mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -133,6 +134,7 @@ mysql_pfs_key_t lock_wait_mutex_key; mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_shard_mutex_key; mysql_pfs_key_t trx_sys_serialisation_mutex_key; +mysql_pfs_key_t trx_sys_mvcc_mutex_key; mysql_pfs_key_t srv_sys_mutex_key; mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/trx/trx0i_s.cc b/storage/innobase/trx/trx0i_s.cc index 8fde00c3d61..6bc15eeacd9 100644 --- a/storage/innobase/trx/trx0i_s.cc +++ b/storage/innobase/trx/trx0i_s.cc @@ -709,6 +709,62 @@ static void trx_i_s_cache_clear( ha_storage_empty(&cache->storage); } +bool fetch_trx_data_into_cache(trx_i_s_cache_t *cache, trx_t* trx, bool read_only) { + i_s_trx_row_t *trx_row; + + trx_mutex_enter(trx); + + /* Note: Read only transactions that modify temporary + tables have a transaction ID. + + Note: auto-commit non-locking read-only transactions + can have trx->state set from NOT_STARTED to ACTIVE and + then from ACTIVE to NOT_STARTED with neither trx_sys->mutex + nor trx->mutex acquired. However, as long as these transactions + are members of mysql_trx_list they are not freed. 
For such + transactions "trx_was_started(trx)" might be considered random, + but whatever is its result, the code below handles that well + (transaction won't release locks until its trx->mutex is acquired). + + Note: locking read-only transactions can have trx->state set from + NOT_STARTED to ACTIVE with neither trx_sys->mutex nor trx->mutex + acquired. However, such transactions need to be marked as COMMITTED + before trx->state is set to NOT_STARTED and that is protected by the + trx->mutex. Therefore the assertion assert_trx_nonlocking_or_in_list() + should hold few lines below (note: the name of the assertion is wrong, + because it actually checks if the transaction is autocommit nonlocking, + whereas its name suggests that it only checks if the trx is nonlocking). */ + if (!trx_was_started(trx) || + (read_only && trx->id != 0 && !trx->read_only)) { + trx_mutex_exit(trx); + return true; + } + + assert_trx_nonlocking_or_in_list(trx); + + trx_row = reinterpret_cast( + table_cache_create_empty_row(&cache->innodb_trx, cache)); + + /* memory could not be allocated */ + if (trx_row == nullptr) { + cache->is_truncated = true; + trx_mutex_exit(trx); + return false; + } + + if (!fill_trx_row(trx_row, trx, cache)) { + /* memory could not be allocated */ + --cache->innodb_trx.rows_used; + cache->is_truncated = true; + trx_mutex_exit(trx); + return false; + } + + trx_mutex_exit(trx); + + return true; +} + /** Fetches the data needed to fill the 3 INFORMATION SCHEMA tables into the table cache buffer. Cache must be locked for write. 
@param[in,out] cache the cache @@ -720,73 +776,38 @@ static void fetch_data_into_cache_low(trx_i_s_cache_t *cache, /* We are going to iterate over many different shards of lock_sys so we need exclusive access */ ut_ad(locksys::owns_exclusive_global_latch()); - constexpr bool rw_trx_list = - std::is_samerw_trx_list)>::value; - - static_assert( - rw_trx_list || - std::is_samemysql_trx_list)>::value, - "only rw_trx_list and mysql_trx_list are supported"); /* Iterate over the transaction list and add each one to innodb_trx's cache. We also add all locks that are relevant to each transaction into innodb_locks' and innodb_lock_waits' caches. */ - for (auto trx : *trx_list) { - i_s_trx_row_t *trx_row; - - trx_mutex_enter(trx); - - /* Note: Read only transactions that modify temporary - tables have a transaction ID. - - Note: auto-commit non-locking read-only transactions - can have trx->state set from NOT_STARTED to ACTIVE and - then from ACTIVE to NOT_STARTED with neither trx_sys->mutex - nor trx->mutex acquired. However, as long as these transactions - are members of mysql_trx_list they are not freed. For such - transactions "trx_was_started(trx)" might be considered random, - but whatever is its result, the code below handles that well - (transaction won't release locks until its trx->mutex is acquired). - - Note: locking read-only transactions can have trx->state set from - NOT_STARTED to ACTIVE with neither trx_sys->mutex nor trx->mutex - acquired. However, such transactions need to be marked as COMMITTED - before trx->state is set to NOT_STARTED and that is protected by the - trx->mutex. Therefore the assertion assert_trx_nonlocking_or_in_list() - should hold few lines below (note: the name of the assertion is wrong, - because it actually checks if the transaction is autocommit nonlocking, - whereas its name suggests that it only checks if the trx is nonlocking). 
*/ - if (!trx_was_started(trx) || - (!rw_trx_list && trx->id != 0 && !trx->read_only)) { - trx_mutex_exit(trx); - continue; - } - - assert_trx_nonlocking_or_in_list(trx); - - ut_ad(trx->in_rw_trx_list == rw_trx_list); - - trx_row = reinterpret_cast( - table_cache_create_empty_row(&cache->innodb_trx, cache)); - - /* memory could not be allocated */ - if (trx_row == nullptr) { - cache->is_truncated = true; - trx_mutex_exit(trx); + if (!fetch_trx_data_into_cache(cache, trx, true)) { return; } + } +} - if (!fill_trx_row(trx_row, trx, cache)) { - /* memory could not be allocated */ - --cache->innodb_trx.rows_used; - cache->is_truncated = true; - trx_mutex_exit(trx); - return; +bool Trx_by_id_with_min::fetch_data(trx_i_s_cache_t *cache) { + for (auto item : m_by_id) { + trx_t *trx = item.second; + if (!fetch_trx_data_into_cache(cache, trx, false)) { + return false; } + } - trx_mutex_exit(trx); + return true; +} + +static void fetch_data_into_cache_from_rw_shard(trx_i_s_cache_t *cache) { + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + if (!shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.fetch_data(cache); + }, UT_LOCATION_HERE)) { + break; + } } } @@ -797,16 +818,17 @@ static void fetch_data_into_cache(trx_i_s_cache_t *cache) /*!< in/out: cache */ /* We are going to iterate over many different shards of lock_sys so we need exclusive access */ ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); trx_i_s_cache_clear(cache); /* Capture the state of the read-write transactions. This includes internal transactions too. 
They are not on mysql_trx_list */ - fetch_data_into_cache_low(cache, &trx_sys->rw_trx_list); + fetch_data_into_cache_from_rw_shard(cache); + trx_sys_mutex_enter(); /* Capture the state of the read-only active transactions */ fetch_data_into_cache_low(cache, &trx_sys->mysql_trx_list); + trx_sys_mutex_exit(); cache->is_truncated = false; } @@ -825,11 +847,7 @@ int trx_i_s_possibly_fetch_data_into_cache( /* We need to read trx_sys and record/table lock queues */ locksys::Global_exclusive_latch_guard guard{UT_LOCATION_HERE}; - trx_sys_mutex_enter(); - fetch_data_into_cache(cache); - - trx_sys_mutex_exit(); } return (0); diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index cbbac9eb59f..5a5c15fee14 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -176,7 +176,8 @@ const page_size_t TrxUndoRsegsIterator::set_next() { const page_size_t page_size(m_purge_sys->rseg->page_size); - ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no); + ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no || + purge_sys->iter.trx_no == purge_sys->rseg->last_trx_no + 2); m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no; m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset; @@ -263,6 +264,10 @@ void trx_purge_sys_initialize(uint32_t n_purge_threads, trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + + purge_sys->min_up_id = purge_sys->view.up_limit_id(); + purge_sys->rseg_iter = ut::new_withkey( UT_NEW_THIS_FILE_PSI_KEY, purge_sys); } @@ -320,6 +325,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr) /*!< in: mtr */ { @@ -328,7 +334,14 @@ void trx_purge_add_update_undo_to_history( trx_rsegf_t *rseg_header; trx_ulogf_t *undo_header; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + + ut_a(undo != nullptr); + rseg = undo->rseg; rseg_header = trx_rsegf_get(undo->rseg->space_id, undo->rseg->page_no, @@ -1709,7 +1722,7 @@ static void trx_purge_rseg_get_next_history_log( ut_a(rseg->last_page_no != FIL_NULL); - purge_sys->iter.trx_no = rseg->last_trx_no + 1; + purge_sys->iter.trx_no = rseg->last_trx_no + 2; purge_sys->iter.undo_no = 0; purge_sys->iter.undo_rseg_space = SPACE_UNKNOWN; purge_sys->next_stored = false; @@ -1781,6 +1794,11 @@ static void trx_purge_rseg_get_next_history_log( rseg->latch(); + if (purge_sys->iter.trx_no == trx_no + 2) { + //FIXME: ASSERT the page type is TRX_UNDO_INSERT + purge_sys->iter.trx_no = trx_no; + } + rseg->last_page_no = prev_log_addr.page; rseg->last_offset = prev_log_addr.boffset; rseg->last_trx_no = trx_no; @@ -1813,6 +1831,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, uint64_t undo_no; space_id_t undo_rseg_space; trx_id_t modifier_trx_id; + trx_id_t modifier_trx_no; purge_sys->hdr_offset = purge_sys->rseg->last_offset; page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no; @@ -1824,7 +1843,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, mtr_start(&mtr); undo_rec = trx_undo_get_first_rec( - &modifier_trx_id, purge_sys->rseg->space_id, page_size, + &modifier_trx_id, &modifier_trx_no, purge_sys->rseg->space_id, page_size, purge_sys->hdr_page_no, purge_sys->hdr_offset, RW_S_LATCH, &mtr); if (undo_rec != nullptr) { @@ -1890,7 +1909,7 @@ static trx_undo_rec_t *trx_purge_get_next_rec( mtr_t mtr; ut_ad(purge_sys->next_stored); - ut_ad(purge_sys->iter.trx_no < 
purge_sys->view.low_limit_no()); + ut_ad(purge_sys->iter.trx_no < purge_sys->version.load()); space = purge_sys->rseg->space_id; page_no = purge_sys->page_no; @@ -2210,7 +2229,7 @@ void Purge_groups_t::distribute_if_needed() { } } - if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) { + if (purge_sys->iter.trx_no >= purge_sys->version.load()) { return nullptr; } @@ -2405,6 +2424,10 @@ ulint trx_purge(ulint n_purge_threads, /*!< in: number of purge tasks trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + + purge_sys->min_up_id = purge_sys->view.up_limit_id(); + rw_lock_x_unlock(&purge_sys->latch); #ifdef UNIV_DEBUG diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 4f9c74a644a..d4447ffb8a3 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -150,12 +150,12 @@ static inline ulint trx_undo_left(const page_t *page, /*!< in: undo log page */ #ifdef UNIV_DEBUG ut_ad(ptr >= page); size_t diff = ptr - page; - size_t max_free = UNIV_PAGE_SIZE - 10 - FIL_PAGE_DATA_END; + size_t max_free = UNIV_PAGE_SIZE - 10 - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; ut_ad(diff < UNIV_PAGE_SIZE); ut_ad(diff <= max_free); #endif /* UNIV_DEBUG */ - return (UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END); + return (UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE); } size_t trx_undo_max_free_space() { @@ -165,7 +165,7 @@ size_t trx_undo_max_free_space() { UNIV_PAGE_SIZE - 290. */ size_t free_space = UNIV_PAGE_SIZE - (TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE + - TRX_UNDO_LOG_XA_HDR_SIZE + FIL_PAGE_DATA_END + 10); + TRX_UNDO_LOG_XA_HDR_SIZE + FIL_PAGE_DATA_END + 10 + TRX_UNDO_PAGE_RESERVE_SIZE); /* Undo number, table id, undo log type and pointer to next. 
Also refer to the beginning of trx_undo_page_report_insert() */ @@ -549,6 +549,194 @@ static ulint trx_undo_page_report_insert( return (trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr)); } +static inline trx_id_t trx_purge_get_version() { + trx_id_t purge_version = purge_sys->version.load(); + ut_a(purge_version > 2); + ut_a(SCN_Mgr::is_scn(purge_version)); + + return (purge_version - 2); +} + +static inline bool trx_scn_sanity_check(trx_id_t trx_id, trx_id_t trx_scn) { + if (!SCN_Mgr::is_scn(trx_scn)) { + return false; + } + + if (trx_scn >= trx_sys->get_max_trx_scn()) { + return false; + } + + if (trx_scn < scn_mgr->startup_scn()) { + return false; + } + + return true; +} + +trx_id_t trx_undo_hdr_get_scn(trx_id_t trx_id, page_id_t &page_id, uint32_t offset, mtr_t *mtr, page_t *undo_page) { + + trx_id_t purge_version = trx_purge_get_version(); + + if (undo_page == nullptr) { + page_no_t space_size = fil_space_get_size(page_id.space()); + if (space_size <= page_id.page_no()) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(page_id.space(), &found); + ut_ad(found); + + undo_page = trx_undo_page_get_s_latched(page_id, page_size, mtr); + + if (undo_page == nullptr || + mach_read_from_2(undo_page + FIL_PAGE_TYPE) != FIL_PAGE_UNDO_LOG) { + /* invalid page */ + return purge_version; + } + } + + /* Get undo header */ + trx_ulogf_t *undo_header = undo_page + offset; + trx_id_t modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + if (modifier_trx_id != trx_id) { + /* Possibly purged */ + return purge_version; + } + + trx_id_t scn = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + if (!trx_scn_sanity_check(trx_id, scn)) { + /* Failed to pass the sanity check */ + return purge_version; + } + + return scn; +} + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id) { + trx_id_t purge_version = trx_purge_get_version(); + + if (id < purge_sys->min_up_id.load()) 
{ + //always visible to all open views, give it + //a fake scn + return purge_version; + } + + /* Decode rollback pointer */ + bool is_insert; + ulint rseg_id; + space_id_t space_id; + page_no_t page_no; + ulint offset; + trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no, &offset); + space_id = trx_rseg_id_to_space_id(rseg_id, false); + + page_no_t space_size = fil_space_get_size(space_id); + /* out of range, it must be purged */ + if (page_no >= space_size) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(space_id, &found); + ut_ad(found); + + /* Get the page */ + mtr_t mtr; + mtr_start(&mtr); + page_t *undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no), + page_size, &mtr); + + if (undo_page == nullptr || + mach_read_from_2(undo_page + FIL_PAGE_TYPE) != FIL_PAGE_UNDO_LOG) { + /* invalid page */ + mtr_commit(&mtr); + return purge_version; + } + + /* Get record */ + trx_undo_rec_t *undo_rec = (trx_undo_rec_t *)(undo_page + offset); + if (trx_undo_rec_get_table_id(undo_rec) != index->table->id) { + /* Wrong undo log, possibly purged */ + mtr_commit(&mtr); + return purge_version; + } + + /** Check if it's in first page chain */ + trx_upagef_t *page_hdr = undo_page + TRX_UNDO_PAGE_HDR; + ulint undo_start_offset = mach_read_from_2(page_hdr + TRX_UNDO_PAGE_START); + page_no_t undo_hdr_no; + uint32_t undo_hdr_offset; + + if (undo_start_offset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) { + /* invalid page */ + mtr_commit(&mtr); + return purge_version; + } else if (undo_start_offset == TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) { + /* This page is not the first one in chain, read the page footer */ + ulint page_end_offset = UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; + byte* page_end = undo_page + page_end_offset; + trx_id_t trx_id = mach_read_from_8(page_end); + undo_hdr_no = mach_read_from_4(page_end + 8); + undo_hdr_offset = mach_read_from_2(page_end + 8 + 4); + 
+ if (trx_id != id + || undo_hdr_no == 0 || undo_hdr_no >= space_size + || undo_hdr_offset < (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) || undo_hdr_offset >= page_end_offset) { + /* invalid page */ + mtr_commit(&mtr); + return purge_version; + } + } else { + /* The undo page is first one in chain, find out the offset of undo header */ + undo_hdr_no = page_no; + + /** validate undo start offset */ + trx_usegf_t *seg_hdr = undo_page + TRX_UNDO_SEG_HDR; + ulint last_log_offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG); + if (last_log_offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_LOG_OLD_HDR_SIZE) { + /* Invalid offset */ + mtr_commit(&mtr); + return purge_version; + } + + /* Find out the right offset */ + while (last_log_offset > offset) { + ulint prev_log_offset = mach_read_from_2(undo_page + last_log_offset + TRX_UNDO_PREV_LOG); + if (prev_log_offset >= last_log_offset + || (last_log_offset - prev_log_offset < TRX_UNDO_LOG_OLD_HDR_SIZE)) { + /* Invalid offset */ + mtr_commit(&mtr); + return purge_version; + } + + last_log_offset = prev_log_offset; + } + + undo_hdr_offset = last_log_offset; + } + + page_id_t undo_hdr_id = {space_id, undo_hdr_no}; + trx_id_t scn; + + if (undo_hdr_no == page_no) { + /* No need to read and lock page again */ + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, undo_page); + } else { + mtr_commit(&mtr); + mtr_start(&mtr); + + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, nullptr); + } + + mtr_commit(&mtr); + + return scn; +} + /** Reads from an undo log record the general parameters. 
@return remaining part of undo log record after reading these values */ const byte *trx_undo_rec_get_pars( @@ -1153,6 +1341,7 @@ static byte *trx_undo_report_blob_update(page_t *undo_page, dict_index_t *index, succeed, 0 if fail */ static ulint trx_undo_page_report_modify( /*========================*/ + trx_undo_t *undo, /*!< in: undo log object */ page_t *undo_page, /*!< in: undo log page */ trx_t *trx, /*!< in: transaction */ dict_index_t *index, /*!< in: clustered index where update or @@ -1256,6 +1445,7 @@ static ulint trx_undo_page_report_modify( ut_ad(flen == DATA_TRX_ID_LEN); trx_id = trx_read_trx_id(field); +//ut_a(SCN_Mgr::is_scn(trx_id) || trx_id == trx->id || index->table->is_temporary()); /* If it is an update of a delete marked record, then we are allowed to ignore blob prefixes if the delete marking was done @@ -2085,7 +2275,7 @@ static bool trx_undo_erase_page_end( first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE); memset(undo_page + first_free, 0xff, - (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) - first_free); + (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE) - first_free); mlog_write_initial_log_record(undo_page, MLOG_UNDO_ERASE_END, mtr); return (first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE); @@ -2257,7 +2447,7 @@ dberr_t trx_undo_report_row_operation( default: ut_ad(op_type == TRX_UNDO_MODIFY_OP); offset = - trx_undo_page_report_modify(undo_page, trx, index, rec, offsets, + trx_undo_page_report_modify(undo, undo_page, trx, index, rec, offsets, update, cmpl_info, clust_entry, &mtr); } @@ -2391,8 +2581,11 @@ err_exit: mtr_start(&mtr); - undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no), - page_size, &mtr); + { + buf_block_t *block = buf_page_get(page_id_t(space_id, page_no), + page_size, RW_S_LATCH, UT_LOCATION_HERE, &mtr); + undo_page = buf_block_get_frame(block); + } undo_rec = trx_undo_rec_copy(undo_page, static_cast(offset), heap); @@ -2423,7 +2616,7 @@ err_exit: 
rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_history = purge_sys->view.changes_visible(trx_id, name); + missing_history = purge_sys->view.sees_version(trx_id); if (!missing_history) { *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp); } @@ -2479,6 +2672,10 @@ bool trx_undo_prev_version_build( rec_trx_id = row_get_rec_trx_id(rec, index, offsets); + /* We didn't write back scn to data or undo */ + ut_ad(!SCN_Mgr::is_scn(rec_trx_id)); + + rec_trx_id = scn_mgr->get_scn(rec_trx_id, index, roll_ptr); /* REDO rollback segments are used only for non-temporary objects. For temporary objects NON-REDO rollback segments are used. */ bool is_temp = index->table->is_temporary(); @@ -2539,6 +2736,13 @@ bool trx_undo_prev_version_build( type_cmpl); ut_a(ptr); + trx_id_t scn = scn_mgr->get_scn(trx_id, index, roll_ptr); + if (scn != TRX_ID_MAX) { + trx_id = scn; + } + + ut_a(SCN_Mgr::is_scn(trx_id) || scn == TRX_ID_MAX); + if (row_upd_changes_field_size_or_external(index, offsets, update)) { /* We should confirm the existence of disowned external data, if the previous version record is delete marked. 
If the trx_id @@ -2562,8 +2766,7 @@ bool trx_undo_prev_version_build( rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_extern = - purge_sys->view.changes_visible(trx_id, index->table->name); + missing_extern = purge_sys->view.sees_version(trx_id); rw_lock_s_unlock(&purge_sys->latch); diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index f6acd099445..aeed4391c7a 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -649,9 +649,6 @@ static bool trx_rollback_or_clean_resurrected( bool all) /*!< in: false=roll back dictionary transactions; true=roll back all non-PREPARED transactions */ { - ut_ad(trx_sys_mutex_own()); - ut_ad(trx->in_rw_trx_list); - /* Generally, an HA transaction with is_recovered && state==TRX_STATE_PREPARED can be committed or rolled back by a client who knows its XID at any time. To prove that no such state transition is possible while our thread operates, @@ -678,7 +675,6 @@ static bool trx_rollback_or_clean_resurrected( switch (state) { case TRX_STATE_COMMITTED_IN_MEMORY: - trx_sys_mutex_exit(); ib::info(ER_IB_MSG_1188) << "Cleaning up trx with id " << trx_get_id_for_print(trx); @@ -688,7 +684,6 @@ static bool trx_rollback_or_clean_resurrected( return true; case TRX_STATE_ACTIVE: if (all || trx->ddl_operation) { - trx_sys_mutex_exit(); trx_rollback_active(trx); trx_free_for_background(trx); ut_ad(!trx->is_recovered); @@ -705,6 +700,16 @@ static bool trx_rollback_or_clean_resurrected( ut_error; } +void Trx_by_id_with_min::collect_recovered_ids(std::vector &ids) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; + ut_a(trx->id > 0); + if (trx->is_recovered) { + ids.push_back(trx->id); + } + } +} + /** Rollback or clean up any incomplete transactions which were encountered in crash recovery. If the transaction already was committed, then we clean up a possible insert undo log. 
If the @@ -716,7 +721,6 @@ void trx_rollback_or_clean_recovered( ut_ad(!srv_read_only_mode); ut_a(srv_force_recovery < SRV_FORCE_NO_TRX_UNDO); - ut_ad(!all || trx_sys_need_rollback()); if (all) { ib::info(ER_IB_MSG_1189) << "Starting in background the rollback" @@ -730,13 +734,19 @@ void trx_rollback_or_clean_recovered( /* Loop over the transaction list as long as there are recovered transactions to clean up or recover. */ + std::vector recovered_ids; + recovered_ids.clear(); + + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_recovered_ids(recovered_ids); + }, + UT_LOCATION_HERE); + } - trx_sys_mutex_enter(); - for (bool need_one_more_scan = true; need_one_more_scan;) { - need_one_more_scan = false; - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - + for (auto id : recovered_ids) { /* In case of slow shutdown, we have to wait for the background thread (trx_recovery_rollback) which is doing the rollbacks of recovered transactions. Note that it can add undo to purge. @@ -751,25 +761,19 @@ void trx_rollback_or_clean_recovered( state == SRV_SHUTDOWN_EXIT_THREADS; })); - trx_sys_mutex_exit(); - if (all) { ib::info(ER_IB_MSG_TRX_RECOVERY_ROLLBACK_NOT_COMPLETED); } return; } - /* If this function does a cleanup or rollback - then it will release the trx_sys->mutex, therefore - we need to reacquire it before retrying the loop. 
*/ - if (trx_rollback_or_clean_resurrected(trx, all)) { - trx_sys_mutex_enter(); - need_one_more_scan = true; - break; - } - } + trx_t* trx = trx_rw_is_active(id, false); + if (trx == nullptr) continue; + + trx_rollback_or_clean_resurrected(trx, all); + + continue; } - trx_sys_mutex_exit(); if (all) { ib::info(ER_IB_MSG_TRX_RECOVERY_ROLLBACK_COMPLETED); diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc index b021780266d..ab53c861970 100644 --- a/storage/innobase/trx/trx0sys.cc +++ b/storage/innobase/trx/trx0sys.cc @@ -68,7 +68,12 @@ void ReadView::check_trx_id_sanity(trx_id_t id, const table_name_t &name) { return; } - if (id >= trx_sys_get_next_trx_id_or_no()) { + if (SCN_Mgr::is_scn(id)) { + //FIXME + return; + } + + if (id >= trx_sys->get_max_trx_id()) { ib::warn(ER_IB_MSG_1196) << "A transaction id" << " in a record of table " << name << " is newer than the" @@ -112,26 +117,29 @@ void trx_sys_write_max_trx_id(void) { acquiring the x-lock and it will again read the newest max_trx_id, and possibly re-write it. 
*/ - ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); + static trx_id_t recorded_max_trx_id = 0; if (!srv_read_only_mode) { DBUG_EXECUTE_IF( "trx_sys_write_max_trx_id__all_blocked", while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); }); -#ifdef UNIV_DEBUG - if (trx_sys_serialisation_mutex_own()) { - DEBUG_SYNC_C("trx_sys_write_max_trx_id__ser"); - } -#endif /* UNIV_DEBUG */ - mtr_start(&mtr); + trx_id_t max_trx_id = std::max(trx_sys->get_max_trx_id(), + trx_sys->get_max_trx_scn()); - sys_header = trx_sysf_get(&mtr); + if (max_trx_id < recorded_max_trx_id) { + mtr_commit(&mtr); + return; + } - const trx_id_t max_trx_id = trx_sys->next_trx_id_or_no.load(); + sys_header = trx_sysf_get(&mtr); - mlog_write_ull(sys_header + TRX_SYS_TRX_ID_STORE, max_trx_id, &mtr); + trx_id_t read_max_id = mach_read_from_8(sys_header + TRX_SYS_TRX_ID_STORE); + if (max_trx_id > read_max_id) { + mlog_write_ull(sys_header + TRX_SYS_TRX_ID_STORE, max_trx_id, &mtr); + recorded_max_trx_id = max_trx_id; + } mtr_commit(&mtr); } @@ -149,30 +157,65 @@ void trx_sys_persist_gtid_num(trx_id_t gtid_trx_no) { } trx_id_t trx_sys_oldest_trx_no() { - ut_ad(trx_sys_serialisation_mutex_own()); - /* Get the oldest transaction from serialisation list. 
*/ - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) > 0) { - auto trx = UT_LIST_GET_FIRST(trx_sys->serialisation_list); - return (trx->no); + trx_id_t current_min_no = trx_sys->get_max_trx_scn(); + for (ulint i = 1; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_no = shard.commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + return (trx_list_with_min.min_id());}, UT_LOCATION_HERE); + + if (current_min_no > min_no && min_no != 0) { + ut_a(SCN_Mgr::is_scn(min_no)); + current_min_no = min_no; + } + } + + return current_min_no; +} + +static uint64_t trx_sys_rw_trx_count() { + uint64_t total = 0; + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + total += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.size(); + }, + UT_LOCATION_HERE); + } + + return total; +} + +bool trx_sys_need_rollback() { + uint64_t prepared_trx = trx_sys->n_prepared_trx.load(); + uint64_t n_trx = trx_sys_rw_trx_count(); + return (n_trx > prepared_trx); +} + +void Trx_by_id_with_min::collect_prepared_ids(std::vector &trx_ids) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; + if (trx_state_eq(trx, TRX_STATE_PREPARED) && trx_is_mysql_xa(trx)) { + trx_ids.push_back(trx->id); + } } - return trx_sys_get_next_trx_id_or_no(); } void trx_sys_get_binlog_prepared(std::vector &trx_ids) { - trx_sys_mutex_enter(); - /* Exit fast if no prepared transaction. */ if (trx_sys->n_prepared_trx == 0) { - trx_sys_mutex_exit(); return; } + /* Check and find binary log prepared transaction. 
*/ - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - if (trx_state_eq(trx, TRX_STATE_PREPARED) && trx_is_mysql_xa(trx)) { - trx_ids.push_back(trx->id); - } + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_prepared_ids(trx_ids); + }, + UT_LOCATION_HERE); } - trx_sys_mutex_exit(); } /** Read binary log positions from buffer passed. @@ -495,18 +538,24 @@ purge_pq_t *trx_sys_init_at_db_start(void) { - and one that has acquired the trx_sys_serialisation_mutex. If you decreased the factor 2, the test innodb.max_trx_id should fail. */ - trx_sys->next_trx_id_or_no.store(max_trx_id + + trx_sys->next_trx_id.store(max_trx_id + 2 * trx_sys_get_trx_id_write_margin()); - trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_id_or_no.load()); - mtr.commit(); + if (trx_sys->next_trx_id % 2 != 0) { + trx_sys->next_trx_id++; + } + trx_sys->next_trx_scn = trx_sys->next_trx_id.load() + 1; + + trx_sys->next_trx_id_version = trx_sys->next_trx_id.load(); + + trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_scn.load()); #ifdef UNIV_DEBUG /* max_trx_id is the next transaction ID to assign. Initialize maximum transaction number to one less if all transactions are already purged. */ if (trx_sys->rw_max_trx_no == 0) { - trx_sys->rw_max_trx_no = trx_sys_get_next_trx_id_or_no() - 1; + trx_sys->rw_max_trx_no = trx_sys->get_max_trx_scn() - 2; } #endif /* UNIV_DEBUG */ @@ -518,41 +567,38 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_lists_init_at_db_start(); + scn_mgr->init(); + /* This mutex is not strictly required, it is here only to satisfy the debug code (assertions). We are still running in single threaded bootstrap mode. 
*/ + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + rows_to_undo += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_rows(); + }, + UT_LOCATION_HERE); + } - trx_sys_mutex_enter(); - - if (UT_LIST_GET_LEN(trx_sys->rw_trx_list) > 0) { - for (auto trx : trx_sys->rw_trx_list) { - ut_ad(trx->is_recovered); - assert_trx_in_rw_list(trx); - - if (trx_state_eq(trx, TRX_STATE_ACTIVE)) { - rows_to_undo += trx->undo_no; - } - } - + if (rows_to_undo > 0) { if (rows_to_undo > 1000000000) { unit = "M"; rows_to_undo = rows_to_undo / 1000000; } ib::info(ER_IB_MSG_1198) - << UT_LIST_GET_LEN(trx_sys->rw_trx_list) + << trx_sys_rw_trx_count() << " transaction(s) which must be rolled back or" " cleaned up in total " << rows_to_undo << unit << " row operations to undo"; ib::info(ER_IB_MSG_1199) - << "Trx id counter is " << trx_sys_get_next_trx_id_or_no(); + << "Trx id counter is " << trx_sys->get_max_trx_id(); } trx_sys->found_prepared_trx = trx_sys->n_prepared_trx > 0; - trx_sys_mutex_exit(); - return (purge_queue); } @@ -564,10 +610,7 @@ void trx_sys_create(void) { ut::zalloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*trx_sys))); mutex_create(LATCH_ID_TRX_SYS, &trx_sys->mutex); - mutex_create(LATCH_ID_TRX_SYS_SERIALISATION, &trx_sys->serialisation_mutex); - UT_LIST_INIT(trx_sys->serialisation_list); - UT_LIST_INIT(trx_sys->rw_trx_list); UT_LIST_INIT(trx_sys->mysql_trx_list); trx_sys->mvcc = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY, 1024); @@ -576,11 +619,12 @@ void trx_sys_create(void) { ut_d(trx_sys->rw_max_trx_no = 0); - new (&trx_sys->rw_trx_ids) - trx_ids_t(ut::allocator(mem_key_trx_sys_t_rw_trx_ids)); - for (auto &shard : trx_sys->shards) { new (&shard) Trx_shard{}; + shard.commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.init_list(); + }, UT_LOCATION_HERE); } new (&trx_sys->rsegs) Rsegs(); @@ -601,6 
+645,12 @@ void trx_sys_create_sys_pages(void) { mtr_commit(&mtr); } +void Trx_by_id_with_min::free_prepared() { + while (!m_by_id.empty()) { + trx_free_prepared_or_active_recovered(m_by_id.begin()->second); + } +} + /********************************************************************* Shutdown/Close the transaction system. */ void trx_sys_close(void) { @@ -629,8 +679,13 @@ void trx_sys_close(void) { shutdown). Free all of them. */ ut_d(trx_sys_after_background_threads_shutdown_validate()); - while (auto trx = UT_LIST_GET_FIRST(trx_sys->rw_trx_list)) { - trx_free_prepared_or_active_recovered(trx); + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + /* On shutdown there's no concurrent session, so it's + ok to not latch */ + shard.active_rw_trxs.execute_no_latch( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.free_prepared();}); } /* There can't be any active transactions. */ @@ -640,20 +695,16 @@ void trx_sys_close(void) { ut::delete_(trx_sys->mvcc); - ut_a(UT_LIST_GET_LEN(trx_sys->rw_trx_list) == 0); + ut_a(trx_sys_rw_trx_count() == 0); ut_a(UT_LIST_GET_LEN(trx_sys->mysql_trx_list) == 0); - ut_a(UT_LIST_GET_LEN(trx_sys->serialisation_list) == 0); for (auto &shard : trx_sys->shards) { shard.~Trx_shard(); } /* We used placement new to create this mutex. Call the destructor. 
*/ - mutex_free(&trx_sys->serialisation_mutex); mutex_free(&trx_sys->mutex); - trx_sys->rw_trx_ids.~trx_ids_t(); - ut::free(trx_sys); trx_sys = nullptr; @@ -709,7 +760,7 @@ void trx_sys_after_pre_dd_shutdown_validate() { } trx_sys_mutex_enter(); - ut_a(UT_LIST_GET_LEN(trx_sys->rw_trx_list) == + ut_a(trx_sys_rw_trx_count() == trx_sys->n_prepared_trx + active_recovered_trxs); trx_sys_mutex_exit(); } @@ -720,37 +771,31 @@ void trx_sys_after_background_threads_shutdown_validate() { ut_a(UT_LIST_GET_LEN(trx_sys->mysql_trx_list) == 0); } -size_t trx_sys_recovered_active_trxs_count() { +size_t Trx_by_id_with_min::recovered_active_count() { size_t total_trx = 0; - trx_sys_mutex_enter(); - /* Recovered transactions are never citizens of mysql_trx_list, - so it's enough to check rw_trx_list. */ - for (auto trx : trx_sys->rw_trx_list) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; if (trx_state_eq(trx, TRX_STATE_ACTIVE) && trx->is_recovered) { total_trx++; } } - trx_sys_mutex_exit(); - return (total_trx); -} -#ifdef UNIV_DEBUG -/** Validate the trx_sys_t::rw_trx_list. - @return true if the list is valid. 
*/ -bool trx_sys_validate_trx_list() { - ut_ad(trx_sys_mutex_own()); - - const trx_t *prev_trx = nullptr; + return total_trx; +} - for (auto trx : trx_sys->rw_trx_list) { - check_trx_state(trx); - ut_a(prev_trx == nullptr || prev_trx->id > trx->id); - prev_trx = trx; +size_t trx_sys_recovered_active_trxs_count() { + size_t total_trx = 0; + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + total_trx += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_active_count(); + }, + UT_LOCATION_HERE); } - return (true); + return (total_trx); } -#endif /* UNIV_DEBUG */ #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 6fd4f54d2bb..92f7bbc3ada 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -242,7 +242,11 @@ static void trx_init(trx_t *trx) { trx->flush_observer = nullptr; + trx->scn_indexs.clear(); + ++trx->version; + + trx->in_long_set = false; } trx_guid_t::trx_guid_t(const trx_t &trx) @@ -263,6 +267,8 @@ struct TrxFactory { the constructors of the trx_t members. 
*/ new (trx) trx_t(); + new (&trx->scn_indexs) SCNIndexIds(); + trx_init(trx); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); @@ -282,6 +288,7 @@ struct TrxFactory { mutex_create(LATCH_ID_TRX, &trx->mutex); mutex_create(LATCH_ID_TRX_UNDO, &trx->undo_mutex); + mutex_create(LATCH_ID_TRX_SCN, &trx->scn_mutex); lock_trx_alloc_locks(trx); } @@ -290,7 +297,6 @@ struct TrxFactory { @param trx the transaction for which to release resources */ static void destroy(trx_t *trx) { ut_a(trx->magic_n == TRX_MAGIC_N); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); ut_a(trx->lock.wait_lock == nullptr); @@ -313,6 +319,7 @@ struct TrxFactory { mutex_free(&trx->mutex); mutex_free(&trx->undo_mutex); + mutex_free(&trx->scn_mutex); trx->mod_tables.~trx_mod_tables_t(); @@ -335,6 +342,8 @@ struct TrxFactory { trx->lock.rec_pool.~lock_pool_t(); trx->lock.table_pool.~lock_pool_t(); + + trx->scn_indexs.~SCNIndexIds(); } /** Enforce any invariants here, this is called before the transaction @@ -353,7 +362,6 @@ struct TrxFactory { ut_ad(trx->mysql_thd == nullptr); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); ut_a(trx->lock.wait_thr == nullptr); @@ -506,6 +514,8 @@ static void trx_free(trx_t *&trx) { trx->mod_tables.clear(); + trx->scn_indexs.clear(); + ut_ad(trx->read_view == nullptr); ut_ad(trx->is_dd_trx == false); ut_ad(trx->in_depth == 0); @@ -617,12 +627,10 @@ void trx_free_prepared_or_active_recovered(trx_t *trx) { XA ROLLBACK. Usually the field is cleared during rollback or commit, here we have to do it ourselves as we neither rollback nor commit, just "free" it. 
*/ ut_ad(!trx->will_lock || trx_state_eq(trx, TRX_STATE_PREPARED)); - assert_trx_in_rw_list(trx); trx_release_impl_and_expl_locks(trx, false); trx_undo_free_trx_with_prepared_or_active_logs(trx, was_prepared); - ut_ad(!trx->in_rw_trx_list); ut_a(!trx->read_only); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); @@ -652,8 +660,6 @@ inline void trx_disconnect_from_mysql(trx_t *trx, bool prepared) { trx_sys->mvcc->view_close(trx->read_view, true); } - ut_ad(trx_sys_validate_trx_list()); - if (prepared) { ut_ad(trx_state_eq(trx, TRX_STATE_PREPARED)); @@ -876,14 +882,11 @@ static trx_t *trx_resurrect_insert( std::memory_order_relaxed); } - /* We give a dummy value for the trx no; this should have no - relevance since purge is not interested in committed - transaction numbers, unless they are in the history - list, in which case it looks the number from the disk based - undo log structure */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -981,10 +984,11 @@ static void trx_resurrect_update( if (undo->state != TRX_UNDO_ACTIVE) { trx_resurrect_update_in_prepared_state(trx, undo); - /* We give a dummy value for the trx number */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -1055,28 +1059,6 @@ static void trx_resurrect(trx_rseg_t *rseg) { } } -/** Adds the transaction to trx_sys->rw_trx_list -Requires trx_sys->mutex, unless called in the single threaded startup code. 
-@param[in] trx The transaction assumed to not be in the rw_trx_list yet -*/ -static inline void trx_add_to_rw_trx_list(trx_t *trx) { - ut_ad(srv_is_being_started || trx_sys_mutex_own()); - ut_ad(!trx->in_rw_trx_list); - UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, trx); - ut_d(trx->in_rw_trx_list = true); -} - -/** Removes the transaction from trx_sys->rw_trx_list. -Requires trx_sys->mutex, unless called in the single threaded startup code. -@param[in] trx The transaction assumed to be in the rw_trx_list -*/ -static inline void trx_remove_from_rw_trx_list(trx_t *trx) { - ut_ad(srv_is_being_started || trx_sys_mutex_own()); - ut_ad(trx->in_rw_trx_list); - UT_LIST_REMOVE(trx_sys->rw_trx_list, trx); - ut_d(trx->in_rw_trx_list = false); -} - /** Creates trx objects for transactions and initializes the trx list of trx_sys at database start. Rollback segments and undo log lists must already exist when this function is called, because the lists of @@ -1113,16 +1095,6 @@ void trx_lists_init_at_db_start(void) { }, UT_LOCATION_HERE); } - std::sort(trxs.begin(), trxs.end(), - [&](trx_t *a, trx_t *b) { return a->id < b->id; }); - - for (trx_t *trx : trxs) { - if (trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE || - trx->state.load(std::memory_order_relaxed) == TRX_STATE_PREPARED) { - trx_sys->rw_trx_ids.push_back(trx->id); - } - trx_add_to_rw_trx_list(trx); - } } /** Get next redo rollback segment in round-robin fashion. @@ -1285,15 +1257,11 @@ void trx_assign_rseg_temp(trx_t *trx) { srv_read_only_mode ? nullptr : get_next_temp_rseg(); if (trx->id == 0) { - trx_sys_mutex_enter(); - trx->id = trx_sys_allocate_trx_id(); - trx_sys->rw_trx_ids.push_back(trx->id); - - trx_sys_mutex_exit(); - trx_sys_rw_trx_add(trx); + + trx_sys->next_trx_id_version.fetch_add(2); } } @@ -1382,8 +1350,6 @@ static void trx_start_low( change must be protected by the trx_sys->mutex, so that lock_print_info_all_transactions() will have a consistent view. 
*/ - ut_ad(!trx->in_rw_trx_list); - /* We tend to over assert and that complicates the code somewhat. e.g., the transaction state can be set earlier but we are forced to set it under the protection of the trx_sys_t::mutex because some @@ -1403,25 +1369,17 @@ static void trx_start_low( updates a temporary table */ DEBUG_SYNC_C("trx_sys_before_assign_id"); - trx_sys_mutex_enter(); - trx->id = trx_sys_allocate_trx_id(); - trx_sys->rw_trx_ids.push_back(trx->id); - ut_ad(trx->rsegs.m_redo.rseg != nullptr || srv_read_only_mode || srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO); - trx_add_to_rw_trx_list(trx); - trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); - ut_ad(trx_sys_validate_trx_list()); - - trx_sys_mutex_exit(); - trx_sys_rw_trx_add(trx); + trx_sys->next_trx_id_version.fetch_add(2); + } else { trx->id = 0; @@ -1431,20 +1389,16 @@ static void trx_start_low( to write to the temporary table. */ if (read_write) { - trx_sys_mutex_enter(); - ut_ad(!srv_read_only_mode); trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); trx->id = trx_sys_allocate_trx_id(); - trx_sys->rw_trx_ids.push_back(trx->id); - - trx_sys_mutex_exit(); - trx_sys_rw_trx_add(trx); + trx_sys->next_trx_id_version.fetch_add(2); + } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); } @@ -1459,51 +1413,73 @@ static void trx_start_low( MONITOR_INC(MONITOR_TRX_ACTIVE); } +void Trx_commit_serialisation_list::add_list(trx_t &trx) { + ut_a(trx.id > 0); + + mutex_enter(&(trx.scn_mutex)); + trx.no = trx_sys_allocate_trx_no(); + mutex_exit(&(trx.scn_mutex)); + + //add to list + UT_LIST_ADD_LAST(serialisation_list, &trx); + if (UT_LIST_GET_LEN(serialisation_list) == 1) { + m_min_id = trx.no; + } +} + +void Trx_commit_serialisation_list::remove_list(trx_t &trx) { + ut_a(trx.id > 0); + ut_a(trx.no != TRX_ID_MAX); + + UT_LIST_REMOVE(serialisation_list, &trx); + if (UT_LIST_GET_LEN(serialisation_list) > 0) { + m_min_id = UT_LIST_GET_FIRST(serialisation_list)->no; + } else { + 
m_min_id = 0; + } +} + +void Trx_commit_serialisation_list::collect_commit_ids( + trx_ids_set_t &id_set, trx_ids_set_t &no_set, trx_id_t limit) { + for (auto trx = UT_LIST_GET_FIRST(serialisation_list); + trx != nullptr; trx = UT_LIST_GET_NEXT(no_list, trx)) { + ut_a(trx->no != 0); + if (trx->no >= limit) { + break; + } + + if (trx->id == 0) continue; + + id_set.insert(trx->id); + no_set.insert(trx->no); + } +} + /** Assigns the trx->no and add the transaction to the serialisation_list. Skips adding to the serialisation_list if the transaction is read-only, in which case still the trx->no is assigned. @param[in,out] trx the modified transaction @return true if added to the serialisation_list (non read-only trx) */ static inline bool trx_add_to_serialisation_list(trx_t *trx) { - trx_sys_serialisation_mutex_enter(); - - trx->no = trx_sys_allocate_trx_no(); - - /* Update the latest transaction number. */ - ut_d(trx_sys->rw_max_trx_no = trx->no); - if (trx->read_only) { - trx_sys_serialisation_mutex_exit(); + trx->no = trx_sys_allocate_trx_no(); return false; } - UT_LIST_ADD_LAST(trx_sys->serialisation_list, trx); + /* Update the latest transaction number. */ + //FIXME + ut_d(trx_sys->rw_max_trx_no = trx->no); - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) == 1) { - trx_sys->serialisation_min_trx_no.store(trx->no); - } + const auto trx_shard_no = trx_get_shard_no(trx->id + 1); + trx_sys->shards[trx_shard_no].commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.add_list(*trx); + }, + UT_LOCATION_HERE); - trx_sys_serialisation_mutex_exit(); return true; } -/** Erases transaction from the serialisation_list. Caller must have -acquired trx_sys->serialisation_mutex prior to calling this function. 
-@param[in,out] trx the transaction to erase */ -static inline void trx_erase_from_serialisation_list_low(trx_t *trx) { - ut_ad(trx_sys_serialisation_mutex_own()); - - UT_LIST_REMOVE(trx_sys->serialisation_list, trx); - - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) > 0) { - trx_sys->serialisation_min_trx_no.store( - UT_LIST_GET_FIRST(trx_sys->serialisation_list)->no); - - } else { - trx_sys->serialisation_min_trx_no.store(trx_sys_get_next_trx_id_or_no()); - } -} - /** Set the transaction serialisation number. @return true if the transaction number was added to the serialisation_list. */ static bool trx_serialisation_number_get( @@ -1597,20 +1573,15 @@ static bool trx_write_serialisation_history( temp_mtr.set_log_mode(MTR_LOG_NO_REDO); } - /* If transaction involves insert then truncate undo logs. */ - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); - } - if (trx->rsegs.m_noredo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr); + trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr, true); } bool serialised = false; /* If transaction involves update then add rollback segments to purge queue. */ - if (trx->rsegs.m_redo.update_undo != nullptr || + if (trx_is_redo_rseg_updated(trx) || trx->rsegs.m_noredo.update_undo != nullptr) { /* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue @@ -1618,7 +1589,7 @@ static bool trx_write_serialisation_history( rollback segments. */ trx_undo_ptr_t *redo_rseg_undo_ptr = - trx->rsegs.m_redo.update_undo != nullptr ? &trx->rsegs.m_redo : nullptr; + trx_is_redo_rseg_updated(trx) ? &trx->rsegs.m_redo : nullptr; trx_undo_ptr_t *temp_rseg_undo_ptr = trx->rsegs.m_noredo.update_undo != nullptr ? 
&trx->rsegs.m_noredo @@ -1628,11 +1599,13 @@ static bool trx_write_serialisation_history( serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr, temp_rseg_undo_ptr); + ulint added_log = 0; /* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */ if (trx->rsegs.m_redo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish(trx->rsegs.m_redo.update_undo, mtr); @@ -1641,28 +1614,40 @@ static bool trx_write_serialisation_history( non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. */ - bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr || trx->rsegs.m_redo.insert_undo != nullptr); /* Set flag if GTID information need to persist. */ auto undo_ptr = &trx->rsegs.m_redo; trx_undo_gtid_set(trx, undo_ptr->update_undo, false); - trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, - (update_rseg_len ? 1 : 0), mtr); + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, false, + (update_rseg_len ? added_log : 0), mtr); } DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE();); + if (trx->rsegs.m_redo.insert_undo != nullptr) { + page_t *undo_hdr_page; + added_log++; + + undo_hdr_page = + trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); + + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + auto undo_ptr = &trx->rsegs.m_redo; + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, true, + (update_rseg_len ? 
added_log : 0), mtr); + } + if (trx->rsegs.m_noredo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_noredo.update_undo, &temp_mtr); - ulint n_added_logs = (redo_rseg_undo_ptr != nullptr) ? 2 : 1; - - trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, - n_added_logs, &temp_mtr); + trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, false, + added_log, &temp_mtr); } } @@ -1822,33 +1807,34 @@ static void trx_update_mod_tables_timestamp(trx_t *trx) /*!< in: transaction */ trx->mod_tables.clear(); } -/** -Erase the transaction from running transaction lists and serialization -list. Active RW transaction list of a MVCC snapshot(ReadView::prepare) -won't include this transaction after this call. All implicit locks are -also released by this call as trx is removed from rw_trx_list. -@param[in] trx Transaction to erase, must have an ID > 0 */ -static void trx_erase_lists(trx_t *trx) { - ut_ad(trx->id > 0); - ut_ad(trx_sys_mutex_own()); +static void trx_update_index_scn(trx_t *trx) { + if (trx->scn_indexs.empty()) { + return; + } - trx_ids_t::iterator it = std::lower_bound(trx_sys->rw_trx_ids.begin(), - trx_sys->rw_trx_ids.end(), trx->id); + trx_id_t scn = trx->no; + if (scn == TRX_ID_MAX) { + scn = trx_sys->get_next_trx_scn(); + } - ut_ad(*it == trx->id); - trx_sys->rw_trx_ids.erase(it); + for (auto &item : trx->scn_indexs) { + dict_index_t *index = dict_index_get_by_id(item.first, item.second); - if (trx->read_only || trx->rsegs.m_redo.rseg == nullptr) { - ut_ad(!trx->in_rw_trx_list); - } else { - trx_remove_from_rw_trx_list(trx); - ut_ad(trx_sys_validate_trx_list()); + if (index != nullptr) { + if (unlikely(index->trx_id > trx->id)) { + trx_id_t curr_scn = scn_mgr->get_scn_fast(index->trx_id); - if (trx->read_view != nullptr) { - trx_sys->mvcc->view_close(trx->read_view, true); + if (curr_scn != TRX_ID_MAX) { + scn = std::max(curr_scn, scn); + } + } + + 
index->trx_scn = scn; + index->table->release(); } } - DEBUG_SYNC_C("after_trx_erase_lists"); + + trx->scn_indexs.clear(); } static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { @@ -1856,9 +1842,6 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || trx_state_eq(trx, TRX_STATE_PREPARED)); - bool trx_sys_latch_is_needed = - (trx->id > 0) || trx_state_eq(trx, TRX_STATE_PREPARED); - /* Check and get GTID to be persisted. Do it outside mutex. It must be done before trx->state is changed to TRX_STATE_COMMITTED_IN_MEMORY, because the gtid_persistor.get_gtid_info() calls gtid_persistor.has_gtid() which checks @@ -1870,15 +1853,12 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { gtid_persistor.get_gtid_info(trx, gtid_desc); } - if (trx_sys_latch_is_needed) { - trx_sys_mutex_enter(); + if (trx->id > 0) { + trx_update_index_scn(trx); } - if (trx->id > 0) { - /* For consistent snapshot, we need to remove current - transaction from running transaction id list for mvcc - before doing commit and releasing locks. */ - trx_erase_lists(trx); + if (trx->id > 0 && trx->read_view != nullptr) { + trx_sys->mvcc->view_close(trx->read_view, true); } if (trx_state_eq(trx, TRX_STATE_PREPARED)) { @@ -1886,10 +1866,6 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { --trx_sys->n_prepared_trx; } - if (trx_sys_latch_is_needed) { - trx_sys_mutex_exit(); - } - auto state_transition = [&]() { trx_mutex_enter(trx); /* Please consider this particular point in time as the moment the trx's @@ -1933,8 +1909,6 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { become purged (because trx->no would no longer protect them). */ if (serialised) { - trx_sys_serialisation_mutex_enter(); - /* Add GTID to be persisted to disk table. It must be done ... 1.After the transaction is marked committed in undo. 
Otherwise GTID might get committed before the transaction commit on disk. @@ -1948,9 +1922,16 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { gtid_persistor.add(gtid_desc); } - trx_erase_from_serialisation_list_low(trx); + const auto trx_shard_no = trx_get_shard_no(trx->id + 1); + trx_sys->shards[trx_shard_no].commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.remove_list(*trx); + }, UT_LOCATION_HERE); - trx_sys_serialisation_mutex_exit(); + ut_a(trx->no != TRX_ID_MAX); + scn_mgr->store_scn(trx->id, trx->no); + + DEBUG_SYNC_C("after_trx_erase_lists"); } lock_trx_release_locks(trx); @@ -1976,7 +1957,6 @@ written */ ut_ad(trx->read_only); ut_a(!trx->is_recovered); ut_ad(trx->rsegs.m_redo.rseg == nullptr); - ut_ad(!trx->in_rw_trx_list); /* Note: We are asserting without holding the locksys latch. But that is OK because this transaction is not waiting and cannot @@ -2032,10 +2012,6 @@ written */ gtid_persistor.set_persist_gtid(trx, false); if (mtr != nullptr) { - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_insert_cleanup(&trx->rsegs.m_redo, false); - } - if (trx->rsegs.m_noredo.insert_undo != nullptr) { trx_undo_insert_cleanup(&trx->rsegs.m_noredo, true); } @@ -2296,7 +2272,6 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ trx_sys_mutex_enter(); ut_a(!trx->read_only); - trx_remove_from_rw_trx_list(trx); trx_sys_mutex_exit(); @@ -2304,7 +2279,6 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ that it no longer is in the trx_list. Recovered transactions are never placed in the mysql_trx_list. 
*/ ut_ad(trx->is_recovered); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); } @@ -2313,7 +2287,7 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ within the same transaction will get the same read view, which is created when this function is first called for a new started transaction. @return consistent read view */ -ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */ +ReadView *trx_assign_read_view(trx_t *trx, bool is_shared) /*!< in/out: active transaction */ { ut_ad(trx_can_be_handled_by_current_thread_or_is_hp_victim(trx)); ut_ad(trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE); @@ -2323,7 +2297,8 @@ ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */ return (nullptr); } else if (!MVCC::is_view_active(trx->read_view)) { - trx_sys->mvcc->view_open(trx->read_view, trx); + trx_sys->mvcc->view_open(trx->read_view, trx, is_shared); + trx->read_view->set_trx(trx); } return (trx->read_view); @@ -2559,8 +2534,6 @@ void trx_print_low(FILE *f, bool newline; const char *op_info; - ut_ad(trx_sys_mutex_own()); - fprintf(f, "TRANSACTION " TRX_ID_FMT, trx_get_id_for_print(trx)); const auto trx_state = trx->state.load(std::memory_order_relaxed); @@ -2660,7 +2633,6 @@ void trx_print_latched(FILE *f, const trx_t *trx, ulint max_query_len) { /* We need exclusive access to lock_sys for lock_number_of_rows_locked(), and accessing trx->lock fields without trx->mutex.*/ ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); trx_print_low(f, trx, max_query_len, lock_number_of_rows_locked(&trx->lock), UT_LIST_GET_LEN(trx->lock.trx_locks), @@ -3095,10 +3067,7 @@ static void trx_set_prepared_in_tc(trx_t *trx) { /* Add GTID to be persisted to disk table, if needed. */ if (gtid_desc.m_is_set) { - /* The gtid_persistor.add() might release and re-acquire the mutex. 
*/ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } /* Reset after successfully adding GTID to in memory table. */ @@ -3187,36 +3156,22 @@ static bool get_info_about_prepared_transaction(XA_recover_txn *txn_list, return false; } -/** This function is used to find number of prepared transactions and - their transaction objects for a recovery. - @return number of prepared transactions stored in xid_list */ -int trx_recover_for_mysql( - XA_recover_txn *txn_list, /*!< in/out: prepared transactions */ - ulint len, /*!< in: number of slots in xid_list */ - MEM_ROOT *mem_root) /*!< in: memory for table names */ -{ - ulint count = 0; - - ut_ad(txn_list); - ut_ad(len); - - /* We should set those transactions which are in the prepared state - to the xid_list */ - - trx_sys_mutex_enter(); - - for (const trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +void Trx_by_id_with_min::recover_prepared( + XA_recover_txn *txn_list, + MEM_ROOT *mem_root, + ulint &index, + ulint limit) { + for (auto item : m_by_id) { + const trx_t *trx = item.second; /* The state of a read-write transaction cannot change from or to NOT_STARTED while we are holding the trx_sys->mutex. It may change to PREPARED, but not if trx->is_recovered. */ if (trx_state_eq(trx, TRX_STATE_PREPARED)) { - if (get_info_about_prepared_transaction(&txn_list[count], trx, mem_root)) + if (get_info_about_prepared_transaction(&txn_list[index], trx, mem_root)) break; - if (count == 0) { + if (index == 0) { ib::info(ER_IB_MSG_1207) << "Starting recovery for" " XA transactions..."; } @@ -3227,15 +3182,41 @@ int trx_recover_for_mysql( ib::info(ER_IB_MSG_1209) << "Transaction contains changes to " << trx->undo_no << " rows"; - count++; + index++; - if (count == len) { + if (index == limit) { break; } } } +} - trx_sys_mutex_exit(); +/** This function is used to find number of prepared transactions and + their transaction objects for a recovery. 
+ @return number of prepared transactions stored in xid_list */ +int trx_recover_for_mysql( + XA_recover_txn *txn_list, /*!< in/out: prepared transactions */ + ulint len, /*!< in: number of slots in xid_list */ + MEM_ROOT *mem_root) /*!< in: memory for table names */ +{ + ulint count = 0; + + ut_ad(txn_list); + ut_ad(len); + + /* We should set those transactions which are in the prepared state + to the xid_list */ + + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.recover_prepared(txn_list, mem_root, count, len);}, UT_LOCATION_HERE); + + if (count == len) { + break; + } + } if (count > 0) { ib::info(ER_IB_MSG_1210) << count @@ -3246,19 +3227,13 @@ int trx_recover_for_mysql( return (int(count)); } -int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { - /* We should set those transactions which are in the prepared state - to the xid_list */ - - trx_sys_mutex_enter(); - - for (trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +void Trx_by_id_with_min::recover_tc(Xa_state_list &xa_list) { + for (auto item : m_by_id) { /* The state of a read-write transaction cannot change from or to NOT_STARTED while we are holding the trx_sys->mutex. It may change to PREPARED, but not if trx->is_recovered. 
*/ + trx_t *trx = item.second; if (trx_state_eq(trx, TRX_STATE_PREPARED)) { if (trx_is_prepared_in_tc(trx)) { /* We found the transaction in 2nd phase of prepare, add to XA @@ -3270,31 +3245,29 @@ int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { } } } +} - trx_sys_mutex_exit(); +int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { + /* We should set those transactions which are in the prepared state + to the xid_list */ + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.recover_tc(xa_list);}, UT_LOCATION_HERE); + } return 0; } -/** This function is used to find one X/Open XA distributed transaction - which is in the prepared state - @return trx on match, the trx->xid will be invalidated; - */ -[[nodiscard]] static trx_t *trx_get_trx_by_xid_low( - const XID *xid) /*!< in: X/Open XA transaction - identifier */ -{ - ut_ad(trx_sys_mutex_own()); - - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +trx_t* Trx_by_id_with_min::get(const XID *xid) { + for (auto item: m_by_id) { + trx_t *trx = item.second; /* Most of the time server layer takes care of synchronizing access to a XID from several connections, but when disconnecting there is a short period in which server allows a new connection to pick up XID still processed by old connection at InnoDB layer. To synchronize with trx_disconnect_from_mysql(), we use trx->mysql_thd under protection of trx_sys->mutex. */ - if (trx->mysql_thd == nullptr && trx_state_eq(trx, TRX_STATE_PREPARED) && xid->eq(trx->xid)) { /* Invalidate the XID, so that subsequent calls @@ -3314,13 +3287,16 @@ trx_t *trx_get_trx_by_xid(const XID *xid) { return (nullptr); } - trx_sys_mutex_enter(); - - /* Recovered/Resurrected transactions are always only on the - trx_sys_t::rw_trx_list. 
*/ - trx = trx_get_trx_by_xid_low(xid); + for (ulint i = 0; i < TRX_SHARDS_N; i+=2) { + Trx_shard &shard = trx_sys->shards[i]; + trx = shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.get(xid);}, UT_LOCATION_HERE); - trx_sys_mutex_exit(); + if (trx != nullptr) { + break; + } + } return (trx); } @@ -3419,7 +3395,6 @@ void trx_start_internal_read_only_low(trx_t *trx) { void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ { ut_ad(trx->rsegs.m_redo.rseg == nullptr); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx_is_autocommit_non_locking(trx)); ut_ad(!trx->read_only); ut_ad(trx_can_be_handled_by_current_thread_or_is_hp_victim(trx)); @@ -3441,22 +3416,17 @@ void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ DEBUG_SYNC_C("trx_sys_before_assign_id"); - trx_sys_mutex_enter(); - ut_ad(trx->id == 0); trx->id = trx_sys_allocate_trx_id(); - trx_sys->rw_trx_ids.push_back(trx->id); - /* So that we can see our own changes. 
*/ if (MVCC::is_view_active(trx->read_view)) { MVCC::set_view_creator_trx_id(trx->read_view, trx->id); } - trx_add_to_rw_trx_list(trx); - - trx_sys_mutex_exit(); trx_sys_rw_trx_add(trx); + + trx_sys->next_trx_id_version.fetch_add(2); } void trx_kill_blocking(trx_t *trx) { diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc index 5bc422daeb5..3536a4827a3 100644 --- a/storage/innobase/trx/trx0undo.cc +++ b/storage/innobase/trx/trx0undo.cc @@ -294,12 +294,14 @@ trx_undo_rec_t *trx_undo_get_next_rec( @param[in,out] mtr Mini-transaction @return undo log record, the page latched, NULL if none */ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, + trx_id_t *modifier_trx_no, space_id_t space, const page_size_t &page_size, page_no_t page_no, ulint offset, ulint mode, mtr_t *mtr) { page_t *undo_page; trx_undo_rec_t *rec; + ulint type; const page_id_t page_id(space, page_no); @@ -312,6 +314,15 @@ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, if (modifier_trx_id != nullptr) { trx_ulogf_t *undo_header = undo_page + offset; *modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + ut_a(modifier_trx_no != nullptr); + *modifier_trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + ut_a(SCN_Mgr::is_scn(*modifier_trx_no)); + + /* For insert undo, return null */ + type = mtr_read_ulint(undo_page + TRX_UNDO_PAGE_HDR, MLOG_2BYTES, mtr); + if (type == TRX_UNDO_INSERT) { + return nullptr; + } } rec = trx_undo_page_get_first_rec(undo_page, page_no, offset); @@ -545,6 +556,7 @@ static ulint trx_undo_header_create( mach_write_to_2(log_hdr + TRX_UNDO_DEL_MARKS, true); mach_write_to_8(log_hdr + TRX_UNDO_TRX_ID, trx_id); + mach_write_to_8(log_hdr + TRX_UNDO_TRX_NO, 0); mach_write_to_2(log_hdr + TRX_UNDO_LOG_START, new_free); mach_write_to_1(log_hdr + TRX_UNDO_FLAGS, 0); @@ -688,11 +700,7 @@ void trx_undo_gtid_read_and_persist(trx_ulogf_t *undo_header) { /* Mark GTID valid. 
*/ gtid_desc.m_is_set = true; - /* No concurrency is involved during recovery but satisfy - the interface requirement. */ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } if ((flag & TRX_UNDO_FLAG_GTID) == 0) { @@ -709,11 +717,7 @@ void trx_undo_gtid_read_and_persist(trx_ulogf_t *undo_header) { /* Mark GTID valid. */ gtid_desc.m_is_set = true; - /* No concurrency is involved during recovery but satisfy - the interface requirement. */ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } void trx_undo_gtid_write(trx_t *trx, trx_ulogf_t *undo_header, trx_undo_t *undo, @@ -904,6 +908,7 @@ static ulint trx_undo_insert_header_reuse( log_hdr = undo_page + free; mach_write_to_8(log_hdr + TRX_UNDO_TRX_ID, trx_id); + mach_write_to_8(log_hdr + TRX_UNDO_TRX_NO, 0); mach_write_to_2(log_hdr + TRX_UNDO_LOG_START, new_free); mach_write_to_1(log_hdr + TRX_UNDO_FLAGS, 0); @@ -972,6 +977,15 @@ buf_block_t *trx_undo_add_page( flst_add_last(header_page + TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST, new_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr); + + /* Store transaction id and hdr information in added page footer */ + byte* page_end = new_page + UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; + mlog_write_ull(page_end, trx->id, mtr); + page_end += 8; + mlog_write_ulint(page_end, undo->hdr_page_no, MLOG_4BYTES, mtr); + page_end += 4; + mlog_write_ulint(page_end, undo->hdr_offset, MLOG_2BYTES, mtr); + undo->size++; rseg->incr_curr_size(); @@ -1198,7 +1212,7 @@ loop: mtr.set_log_mode(MTR_LOG_NO_REDO); } - rec = trx_undo_get_first_rec(nullptr, rseg->space_id, rseg->page_size, + rec = trx_undo_get_first_rec(nullptr, nullptr, rseg->space_id, rseg->page_size, hdr_page_no, hdr_offset, RW_X_LATCH, &mtr); if (rec == nullptr) { /* Already empty */ @@ -1292,6 +1306,7 @@ static trx_undo_t *trx_undo_mem_init( ulint type; ulint state; trx_id_t trx_id; + trx_id_t
trx_no; ulint offset; fil_addr_t last_addr; page_t *last_page; @@ -1316,6 +1331,8 @@ static trx_undo_t *trx_undo_mem_init( trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + auto flag = mtr_read_ulint(undo_header + TRX_UNDO_FLAGS, MLOG_1BYTE, mtr); bool xid_exists = ((flag & TRX_UNDO_FLAG_XID) != 0); @@ -1333,6 +1350,8 @@ static trx_undo_t *trx_undo_mem_init( undo->dict_operation = mtr_read_ulint(undo_header + TRX_UNDO_DICT_TRANS, MLOG_1BYTE, mtr); + undo->trx_no = trx_no; + undo->flag = flag; undo->m_gtid_storage = trx_undo_t::Gtid_storage::NONE; @@ -1640,8 +1659,9 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type, auto undo_page = trx_undo_page_get(page_id_t(undo->space, undo->hdr_page_no), undo->page_size, mtr); - ulint offset; + ulint offset = trx_undo_header_create(undo_page, trx_id, mtr); +#if 0 if (type == TRX_UNDO_INSERT) { offset = trx_undo_insert_header_reuse(undo_page, trx_id, mtr); gtid_storage = trx_undo_t::Gtid_storage::NONE; @@ -1652,6 +1672,7 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type, offset = trx_undo_header_create(undo_page, trx_id, mtr); } +#endif trx_undo_header_add_space_for_xid(undo_page, undo_page + offset, mtr, gtid_storage); @@ -1798,7 +1819,8 @@ func_exit: @return undo log segment header page, x-latched */ page_t *trx_undo_set_state_at_finish( trx_undo_t *undo, /*!< in: undo log memory copy */ - mtr_t *mtr) /*!< in: mtr */ + mtr_t *mtr, /*!< in: mtr */ + bool is_temp) /*!< in: true if it's tmp undo */ { trx_usegf_t *seg_hdr; trx_upagef_t *page_hdr; @@ -1816,8 +1838,7 @@ page_t *trx_undo_set_state_at_finish( if (undo->size == 1 && mach_read_from_2(page_hdr + TRX_UNDO_PAGE_FREE) < TRX_UNDO_PAGE_REUSE_LIMIT) { state = TRX_UNDO_CACHED; - - } else if (undo->type == TRX_UNDO_INSERT) { + } else if (is_temp && (undo->type == TRX_UNDO_INSERT)) { state = TRX_UNDO_TO_FREE; } else { state = TRX_UNDO_TO_PURGE; @@ -1921,25 +1942,38 
@@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr) { trx_rseg_t *rseg; trx_undo_t *undo; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + rseg = undo_ptr->rseg; ut_ad(mutex_own(&(rseg->mutex))); trx_purge_add_update_undo_to_history( - trx, undo_ptr, undo_page, update_rseg_history_len, n_added_logs, mtr); + trx, undo_ptr, undo_page, update_rseg_history_len, is_insert, n_added_logs, mtr); - UT_LIST_REMOVE(rseg->update_undo_list, undo); - - undo_ptr->update_undo = nullptr; + if (is_insert) { + UT_LIST_REMOVE(rseg->insert_undo_list, undo); + undo_ptr->insert_undo = nullptr; + } else { + UT_LIST_REMOVE(rseg->update_undo_list, undo); + undo_ptr->update_undo = nullptr; + } if (undo->state == TRX_UNDO_CACHED) { - UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + if (is_insert) { + UT_LIST_ADD_FIRST(rseg->insert_undo_cached, undo); + } else { + UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + } MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED); } else {