diff --git a/storage/innobase/clone/clone0repl.cc b/storage/innobase/clone/clone0repl.cc index 97736155e44..ba6c24f9387 100644 --- a/storage/innobase/clone/clone0repl.cc +++ b/storage/innobase/clone/clone0repl.cc @@ -42,6 +42,45 @@ this program; if not, write to the Free Software Foundation, Inc., /* To get current session thread default THD */ THD *thd_get_current_thd(); +static std::atomic s_add_index {0}; +Clone_persist_gtid::Clone_persist_gtid() { + m_event = os_event_create(); + /* No background is created yet. */ + m_thread_active.store(false); + m_gtid_trx_no.store(0); + m_flush_number.store(0); + m_explicit_request.store(false); + m_active_number.store(m_flush_number.load() + 1); + /* We accept GTID even before the background service is started. This + is needed because we add GTIDs from undo log during recovery. */ + m_active.store(true); + m_num_gtid_mem.store(0); + m_flush_in_progress.store(false); + m_close_thread.store(false); + + m_lock = static_cast( + ut::malloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*m_lock))); + rw_lock_create(PFS_NOT_INSTRUMENTED, m_lock, LATCH_ID_CLONE_REPL_LOCK); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + mutex_create(LATCH_ID_CLONE_REPL_MUTEX, &(m_mutexs[i])); + } +} + +/** Destructor: stop gtid thread */ +Clone_persist_gtid::~Clone_persist_gtid() { + ut_ad(!m_thread_active.load()); + stop(); + os_event_destroy(m_event); + + rw_lock_free(m_lock); + ut::free(m_lock); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + mutex_free(&(m_mutexs[i])); + } +} + void Clone_persist_gtid::add(const Gtid_desc >id_desc) { /* Check if valid descriptor. */ if (!gtid_desc.m_is_set) { @@ -51,26 +90,32 @@ void Clone_persist_gtid::add(const Gtid_desc >id_desc) { if (!is_active() || gtid_table_persistor == nullptr) { return; } - ut_ad(trx_sys_serialisation_mutex_own()); + s_lock(); /* If too many GTIDs are accumulated, wait for all to get flushed. Ignore timeout and loop to avoid possible hang. 
The insert should already be slowed down by the wait here. */ if (check_max_gtid_threshold() && is_thread_active()) { - trx_sys_serialisation_mutex_exit(); + s_unlock(); wait_flush(false, false, nullptr); - trx_sys_serialisation_mutex_enter(); + s_lock(); } - ut_ad(trx_sys_serialisation_mutex_own()); + uint64_t idx = (s_add_index.fetch_add(1)) % GTID_INFO_LIST_MAX_SLOT; + lock_shard(idx); + /* Get active GTID list */ - auto ¤t_gtids = get_active_list(); + auto ¤t_gtids = get_active_list(idx); /* Add input GTID to the set */ current_gtids.push_back(gtid_desc); + /* Atomic increment. */ int current_value = ++m_num_gtid_mem; + unlock_shard(idx); + s_unlock(); + /* Wake up background if GTIDs crossed threshold. */ if (current_value == s_gtid_threshold) { os_event_set(m_event); @@ -78,9 +123,7 @@ void Clone_persist_gtid::add(const Gtid_desc >id_desc) { DBUG_EXECUTE_IF("dont_compress_gtid_table", { /* For predictable outcome of mtr test we flush the GTID immediately. */ - trx_sys_serialisation_mutex_exit(); wait_flush(false, false, nullptr); - trx_sys_serialisation_mutex_enter(); }); } @@ -417,17 +460,12 @@ bool Clone_persist_gtid::debug_skip_write(bool compression) { } int Clone_persist_gtid::write_to_table(uint64_t flush_list_number, + uint64_t idx, Gtid_set &table_gtid_set, - Tsid_map &tsid_map) { + Gtid_set &write_gtid_set) { int err = 0; - Gtid_set write_gtid_set(&tsid_map, nullptr); - /* Allocate some intervals from stack */ - static const int PREALLOCATED_INTERVAL_COUNT = 64; - Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT]; - write_gtid_set.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv); - - auto &flush_list = get_list(flush_list_number); + auto &flush_list = get_list(flush_list_number, idx); /* Extract GTIDs from flush list. 
*/ for (auto >id_desc : flush_list) { auto status = RETURN_STATUS_UNREPORTED_ERROR; @@ -454,30 +492,12 @@ int Clone_persist_gtid::write_to_table(uint64_t flush_list_number, if (debug_skip_write(false)) { flush_list.clear(); ut_ad((m_flush_number + 1) == flush_list_number); - m_flush_number.store(flush_list_number); return (0); } - bool is_recovery = !m_thread_active.load(); - if (is_recovery) { - /* During recovery, eliminate GTIDs already in gtid_executed table. */ - write_gtid_set.remove_gtid_set(&table_gtid_set); - table_gtid_set.add_gtid_set(&write_gtid_set); - } else { - /* Handle concurrent write by other threads when binlog is enabled. */ - gtid_state->update_prev_gtids(&write_gtid_set); - } - - /* Write GTIDs to table. */ - if (!write_gtid_set.is_empty()) { - ++m_compression_counter; - err = gtid_table_persistor->save(&write_gtid_set, false); - } - /* Clear flush list and return */ flush_list.clear(); ut_ad((m_flush_number + 1) == flush_list_number); - m_flush_number.store(flush_list_number); return (err); } @@ -513,7 +533,7 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { bool explicit_request = m_explicit_request.load(); - trx_sys_serialisation_mutex_enter(); + x_lock(); /* Get oldest transaction number that is yet to be committed. Any transaction with lower transaction number is committed and is added to GTID list. */ auto oldest_trx_no = trx_sys_oldest_trx_no(); @@ -524,8 +544,36 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { /* Switch active list and get the previous list to write to disk table. */ auto flush_list_number = switch_active_list(); /* Exit trx mutex during write to table. 
*/ - trx_sys_serialisation_mutex_exit(); - err = write_to_table(flush_list_number, table_gtid_set, tsid_map); + x_unlock(); + Gtid_set write_gtid_set(&tsid_map, nullptr); + /* Allocate some intervals from stack */ + static const int PREALLOCATED_INTERVAL_COUNT = 64; + Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT]; + write_gtid_set.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv); + + for (uint64_t i = 0; i < GTID_INFO_LIST_MAX_SLOT; i++) { + err = write_to_table(flush_list_number, i, table_gtid_set, write_gtid_set); + } + + m_flush_number.store(flush_list_number); + + bool is_recovery = !m_thread_active.load(); + if (is_recovery) { + /* During recovery, eliminate GTIDs already in gtid_executed table. */ + write_gtid_set.remove_gtid_set(&table_gtid_set); + table_gtid_set.add_gtid_set(&write_gtid_set); + } else { + /* Handle concurrent write by other threads when binlog is enabled. */ + gtid_state->update_prev_gtids(&write_gtid_set); + } + + /* Write GTIDs to table. */ + if (!write_gtid_set.is_empty()) { + ++m_compression_counter; + err = gtid_table_persistor->save(&write_gtid_set, false); + } + + m_flush_number.store(flush_list_number); m_flush_in_progress.store(false); /* Compress always after recovery, if GTIDs are added. */ if (!m_thread_active.load()) { @@ -533,7 +581,7 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { ib::info(ER_IB_CLONE_GTID_PERSIST) << "GTID compression after recovery. "; } } else { - trx_sys_serialisation_mutex_exit(); + x_unlock(); } if (is_recovery) { @@ -577,7 +625,6 @@ void Clone_persist_gtid::flush_gtids(THD *thd) { } bool Clone_persist_gtid::check_max_gtid_threshold() { - ut_ad(trx_sys_serialisation_mutex_own()); /* Allow only one GTID to flush at a time. 
*/ DBUG_EXECUTE_IF("dont_compress_gtid_table", { return m_num_gtid_mem.load() > 0; }); diff --git a/storage/innobase/dict/dict0mem.cc b/storage/innobase/dict/dict0mem.cc index 611ef80e89b..cda4da0e98d 100644 --- a/storage/innobase/dict/dict0mem.cc +++ b/storage/innobase/dict/dict0mem.cc @@ -615,7 +615,7 @@ bool dict_index_t::is_usable(const trx_t *trx) const { /* Check if the specified transaction can see this index. */ return (table->is_temporary() || trx_id == 0 || !MVCC::is_view_active(trx->read_view) || - trx->read_view->changes_visible(trx_id, table->name)); + trx->read_view->is_index_visible(trx_id, table->name)); } #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index cf964ee2f4b..f1b7c48e931 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -749,6 +749,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(sync_thread_mutex, 0, 0, PSI_DOCUMENT_ME), #endif /* UNIV_DEBUG */ PSI_MUTEX_KEY(trx_undo_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_scn_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(temp_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -769,6 +770,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(trx_sys_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_shard_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_serialisation_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_sys_mvcc_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(zip_pad_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(master_key_id_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(sync_array_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -19165,11 +19167,7 @@ int ha_innobase::external_lock(THD *thd, /*!< in: handle to the user thread */ } else if (trx->isolation_level <= TRX_ISO_READ_COMMITTED && MVCC::is_view_active(trx->read_view)) { - 
mutex_enter(&trx_sys->mutex); - trx_sys->mvcc->view_close(trx->read_view, true); - - mutex_exit(&trx_sys->mutex); } } @@ -19760,12 +19758,7 @@ THR_LOCK_DATA **ha_innobase::store_lock( MVCC::is_view_active(trx->read_view)) { /* At low transaction isolation levels we let each consistent read set its own snapshot */ - - mutex_enter(&trx_sys->mutex); - trx_sys->mvcc->view_close(trx->read_view, true); - - mutex_exit(&trx_sys->mutex); } } @@ -23370,6 +23363,11 @@ static MYSQL_SYSVAR_BOOL( "Print all DDl logs to MySQL error log (off by default)", nullptr, nullptr, false); +static MYSQL_SYSVAR_BOOL( + mvcc_use_scn, srv_mvcc_use_scn, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY, + "Use SCN for MVCC or not", nullptr, nullptr, true); + #ifdef UNIV_DEBUG static MYSQL_SYSVAR_UINT(trx_rseg_n_slots_debug, trx_rseg_n_slots_debug, PLUGIN_VAR_RQCMDARG, @@ -23668,6 +23666,7 @@ static SYS_VAR *innobase_system_variables[] = { MYSQL_SYSVAR(redo_log_archive_dirs), MYSQL_SYSVAR(redo_log_encrypt), MYSQL_SYSVAR(print_ddl_logs), + MYSQL_SYSVAR(mvcc_use_scn), #ifdef UNIV_DEBUG MYSQL_SYSVAR(trx_rseg_n_slots_debug), MYSQL_SYSVAR(limit_optimistic_insert_debug), diff --git a/storage/innobase/include/clone0repl.h b/storage/innobase/include/clone0repl.h index f9a745125f3..7fb9eee0b30 100644 --- a/storage/innobase/include/clone0repl.h +++ b/storage/innobase/include/clone0repl.h @@ -54,6 +54,7 @@ using Gtid_info = std::array; struct Gtid_desc; +#define GTID_INFO_LIST_MAX_SLOT 256 /** List of GTIDs */ using Gtid_info_list = std::vector; @@ -71,28 +72,10 @@ struct Gtid_desc { class Clone_persist_gtid { public: /** Constructor: start gtid thread */ - Clone_persist_gtid() { - m_event = os_event_create(); - /* No background is created yet. */ - m_thread_active.store(false); - m_gtid_trx_no.store(0); - m_flush_number.store(0); - m_explicit_request.store(false); - m_active_number.store(m_flush_number.load() + 1); - /* We accept GTID even before the background service is started. 
This - is needed because we add GTIDs from undo log during recovery. */ - m_active.store(true); - m_num_gtid_mem.store(0); - m_flush_in_progress.store(false); - m_close_thread.store(false); - } + Clone_persist_gtid(); /** Destructor: stop gtid thread */ - ~Clone_persist_gtid() { - ut_ad(!m_thread_active.load()); - stop(); - os_event_destroy(m_event); - } + ~Clone_persist_gtid(); /** Start GTID persistence and background thread. @return true, if successful. */ @@ -199,6 +182,32 @@ class Clone_persist_gtid { Clone_persist_gtid &operator=(Clone_persist_gtid const &) = delete; private: + void x_lock() { + rw_lock_x_lock(m_lock, UT_LOCATION_HERE); + } + + void x_unlock() { + rw_lock_x_unlock(m_lock); + } + + void s_lock() { + rw_lock_s_lock(m_lock, UT_LOCATION_HERE); + } + + void s_unlock() { + rw_lock_s_unlock(m_lock); + } + + void lock_shard(uint64_t idx) { + ut_a(idx < GTID_INFO_LIST_MAX_SLOT); + mutex_enter(&(m_mutexs[idx])); + } + + void unlock_shard(uint64_t idx) { + ut_a(idx < GTID_INFO_LIST_MAX_SLOT); + mutex_exit(&(m_mutexs[idx])); + } + /** Check if GTID needs to persist at XA prepare. @param[in] thd session THD @param[in,out] trx current innnodb transaction @@ -234,17 +243,16 @@ class Clone_persist_gtid { bool early_timeout, Clone_Alert_Func cbk); /** @return current active GTID list */ - Gtid_info_list &get_active_list() { - ut_ad(trx_sys_serialisation_mutex_own()); - return (get_list(m_active_number)); + Gtid_info_list &get_active_list(uint64_t idx) { + return (get_list(m_active_number, idx)); } /** @return GTID list by number. @param[in] list_number list number @return GTID list reference. */ - Gtid_info_list &get_list(uint64_t list_number) { + Gtid_info_list &get_list(uint64_t list_number, uint64_t idx) { int list_index = (list_number & static_cast(1)); - return (m_gtids[list_index]); + return (m_gtids[list_index][idx]); } /** Check if we need to skip write or compression based on debug variables. 
@@ -256,7 +264,7 @@ class Clone_persist_gtid { @param[in] compress request compression of GTID table @return flush list number to track and wait for flush to complete. */ uint64_t request_immediate_flush(bool compress) { - trx_sys_serialisation_mutex_enter(); + x_lock(); /* We want to flush all GTIDs. */ uint64_t request_number = m_active_number.load(); /* If no GTIDs added to active, wait for previous index. */ @@ -265,7 +273,7 @@ class Clone_persist_gtid { --request_number; } m_flush_request_number = request_number; - trx_sys_serialisation_mutex_exit(); + x_unlock(); if (compress) { m_explicit_request.store(true); @@ -292,16 +300,11 @@ class Clone_persist_gtid { /** Switch active GTID list. */ uint64_t switch_active_list() { /* Switch active list under transaction system mutex. */ - ut_ad(trx_sys_serialisation_mutex_own()); uint64_t flush_number = m_active_number; ++m_active_number; m_compression_gtid_counter += m_num_gtid_mem; m_num_gtid_mem.store(0); -#ifdef UNIV_DEBUG - /* The new active list must have no elements. */ - auto &active_list = get_active_list(); - ut_ad(active_list.size() == 0); -#endif + return (flush_number); } @@ -310,8 +313,8 @@ class Clone_persist_gtid { @param[in,out] table_gtid_set GTIDs in table during recovery @param[in,out] tsid_map TSID map for GTIDs @return mysql error code. */ - int write_to_table(uint64_t flush_list_number, Gtid_set &table_gtid_set, - Tsid_map &tsid_map); + int write_to_table(uint64_t flush_list_number, uint64_t idx, Gtid_set &table_gtid_set, + Gtid_set &write_gtid_set); /** Update transaction number up to which GTIDs are flushed to table. @param[in] new_gtid_trx_no GTID transaction number */ @@ -343,7 +346,7 @@ class Clone_persist_gtid { /** Two lists of GTID. One of them is active where running transactions add their GTIDs. Other list is used to persist them to table from time to time. */ - Gtid_info_list m_gtids[2]; + Gtid_info_list m_gtids[2][GTID_INFO_LIST_MAX_SLOT]; /** Number of the current GTID list. 
Increased when list is switched */ std::atomic m_active_number; @@ -384,6 +387,10 @@ class Clone_persist_gtid { /** true, if GTID persistence is active.*/ std::atomic m_active; + + ib_mutex_t m_mutexs[GTID_INFO_LIST_MAX_SLOT]; + + rw_lock_t* m_lock; }; #endif /* CLONE_REPL_INCLUDE */ diff --git a/storage/innobase/include/read0read.h b/storage/innobase/include/read0read.h index e7693c93b8b..ff45e83074a 100644 --- a/storage/innobase/include/read0read.h +++ b/storage/innobase/include/read0read.h @@ -40,6 +40,8 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0types.h" #include "univ.i" +#define MAX_SNAPSHOT_SIZE 256 + /** The MVCC read view manager */ class MVCC { public: @@ -54,8 +56,10 @@ class MVCC { /** Allocate and create a view. @param view View owned by this class created for the caller. Must be freed by calling view_close() - @param trx Transaction instance of caller */ - void view_open(ReadView *&view, trx_t *trx); + @param trx Transaction instance of caller + @param is_shared shared read view by select count(*) + */ + void view_open(ReadView *&view, trx_t *trx, bool is_shared = false); /** Close a view created by the above function. @@ -73,7 +77,7 @@ class MVCC { /** @return the number of active views */ - ulint size() const; + ulint size(); /** @return true if the view is active and valid */ @@ -94,17 +98,20 @@ class MVCC { view->creator_trx_id(id); } + /** Get oldest scn for Purge system */ + void get_oldest_version(ReadView *purge_view); + private: /** Validates a read view list. */ - bool validate() const; + bool validate(uint64_t slot) const; /** Find a free view from the active list, if none found then allocate a new view. This function will also attempt to move delete marked views from the active list to the freed list. @return a view to use */ - inline ReadView *get_view(); + inline ReadView *get_view(uint64_t slot); /** Get the oldest view in the system. 
It will also move the delete @@ -118,14 +125,34 @@ class MVCC { MVCC &operator=(const MVCC &); private: + void enter(uint64_t slot) { + mutex_enter(&(m_mutexs[slot])); + } + + void exit(uint64_t slot) { + mutex_exit(&(m_mutexs[slot])); + } + + uint64_t get_slot() { + return (m_slot_index++) % m_slot_size; + } + + /** Index for calculating slot number */ + std::atomic m_slot_index; + typedef UT_LIST_BASE_NODE_T(ReadView, m_view_list) view_list_t; /** Free views ready for reuse. */ - view_list_t m_free; + view_list_t* m_free; /** Active and closed views, the closed views will have the creator trx id set to TRX_ID_MAX */ - view_list_t m_views; + view_list_t* m_views; + + ib_mutex_t* m_mutexs; + + /* Size of view list and mutex array */ + uint64_t m_slot_size; }; #endif /* read0read_h */ diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h index 4eaf1732aab..d50b8e21b61 100644 --- a/storage/innobase/include/read0types.h +++ b/storage/innobase/include/read0types.h @@ -34,14 +34,17 @@ this program; if not, write to the Free Software Foundation, Inc., #ifndef read0types_h #define read0types_h +#include #include #include "dict0mem.h" #include "trx0types.h" - +#include "ut0seq_lock.h" // Friend declaration class MVCC; +typedef std::set trx_ids_set_t; + /** Read view lists the trx ids of those transactions for which a consistent read should not see the modifications to the database. */ @@ -187,6 +190,49 @@ class ReadView { @return true if view sees transaction id */ bool sees(trx_id_t id) const { return (id < m_up_limit_id); } + trx_id_t up_limit_id() { return m_up_limit_id; } + + trx_id_t real_up_limit_id() { return m_real_up_limit_id; } + + /** Check whether the changes on record are visible. 
+ @param[in] index index object + @param[in] rec clust record + @param[in] offsets offset of the record + @return whether the view sees */ + bool changes_visible( + const dict_index_t *index, + const rec_t *rec, const ulint *offsets); + + bool changes_visible( + trx_id_t id, + const dict_index_t *index, + roll_ptr_t roll_ptr); + + bool is_index_visible(trx_id_t index_trx_id, const table_name_t &name); + + /** + @param id trx id + @param scn scn to check + @return true if view sees transaction scn */ + bool sees_version(trx_id_t id, trx_id_t scn) const { + if (scn == TRX_ID_MAX) return false; + if (m_invisible_ids.find(id) != m_invisible_ids.end()) { + /* Being committed while opening read view, always not visible */ + return false; + } + + return (m_version > scn); + } + + /** + @return version number of the view */ + trx_id_t version() { return m_version; } + + /** Store trx pointer which create this read view */ + void set_trx(trx_t *trx) { + m_trx = trx; + } + /** Mark the view as closed */ void close() { @@ -198,6 +244,14 @@ class ReadView { @return true if the view is closed */ bool is_closed() const { return (m_closed); } + uint64_t get_slot() { + return m_slot; + } + + void set_slot(uint64_t slot) { + m_slot = slot; + } + /** Write the limits to the file. @param file file to write to */ @@ -217,6 +271,10 @@ class ReadView { ut_d(m_view_low_limit_no = m_low_limit_no); m_low_limit_no = trx_no; } + + if (m_low_limit_no < m_version) { + m_version = m_low_limit_no; + } } /** @@ -254,6 +312,12 @@ class ReadView { @param id Creator transaction id */ inline void prepare(trx_id_t id); + /** Create snapshot with traditional way */ + inline void prepare_old(trx_id_t id); + + /** Create snapshot with SCN */ + inline void prepare_new(trx_id_t id); + /** Copy state from another view. Must call copy_complete() to finish. 
@param other view to copy from */ @@ -264,6 +328,10 @@ class ReadView { m_trx_ids too and adjust the m_up_limit_id *, if required */ inline void copy_complete(); + /** + Copy state from another view. Must call copy_complete() to finish. */ + inline void copy_prepare(trx_id_t version, trx_id_t low_id, trx_id_t up_id); + /** Set the creator transaction id, existing id must be 0 */ void creator_trx_id(trx_id_t id) { @@ -288,6 +356,9 @@ class ReadView { low water mark". */ trx_id_t m_up_limit_id; + /** Real up_limit_id considering long running transactions */ + trx_id_t m_real_up_limit_id; + /** trx id of creating transaction, set to TRX_ID_MAX for free views. */ trx_id_t m_creator_trx_id; @@ -296,6 +367,12 @@ class ReadView { was taken */ ids_t m_ids; + /** IDs set that are not visible to current session */ + std::unordered_set m_invisible_ids; + + /** IDs set when there's long running transactions */ + trx_ids_set_t m_long_running_ids; + /** The view does not need to see the undo logs for transactions whose transaction number is strictly smaller (<) than this value: they can be removed in purge if not needed by other views */ @@ -309,9 +386,22 @@ class ReadView { trx_id_t m_view_low_limit_no; #endif /* UNIV_DEBUG */ + /** The transaction that opens this read view */ + trx_t *m_trx; + + /** Version of the snapshot */ + trx_id_t m_version; + + /** The array index of mvcc */ + uint64_t m_slot; + /** AC-NL-RO transaction view that has been "closed". */ bool m_closed; + /** Read view is shared by multiple threads such as + SELECT COUNT(*) */ + bool m_shared; + typedef UT_LIST_NODE_T(ReadView) node_t; /** List of read views in trx_sys */ @@ -319,4 +409,162 @@ class ReadView { node_t m_view_list; }; +#define SCN_MAP_MAX_SIZE (1 * 1024 * 1024) +struct trx_data_t { + trx_id_t id {0}; + trx_id_t scn{0}; +}; + +typedef ut::Seq_lock Trx_seq_with_lock; + +/** A map used to store mapping of trx id to scn. 
*/ +class Scn_Map { + public: + Scn_Map(); + ~Scn_Map(); + + inline void store(trx_id_t id, trx_id_t scn) { + Trx_seq_with_lock &seq = m_datas[(id/2) % m_size]; + seq.locking_write([&](trx_data_t &data) { + data.id = id; + data.scn = scn; + }); + } + + inline trx_id_t read(trx_id_t id) { + Trx_seq_with_lock &seq = m_datas[(id/2) % m_size]; + trx_data_t data; + + data = seq.read([](const trx_data_t &stored_data) { + return trx_data_t{stored_data.id, stored_data.scn}; + }); + + if (id == data.id) { + return data.scn; + } else { + return 0; + } + } + + private: + Trx_seq_with_lock *m_datas; + uint64_t m_size; +}; + +/** Handler of SCN Manager */ +class SCN_Mgr { +public: + /** Constructer */ + SCN_Mgr(); + + /** Destructor */ + ~SCN_Mgr(); + + void init(); + + trx_id_t startup_scn() { + return m_startup_scn; + } + + /** Store scn of the transaction for fast lookup + @param[in] id transaction id + @param[in] scn transaction no while committing. */ + void store_scn(trx_id_t id, trx_id_t scn) { + m_scn_map->store(id, scn); + } + + /** Quickly lookup scn of relative transaction id + @param[in] id transaction id + @return TRX_ID_MAX if still active, or 0 if not found, or scn value */ + trx_id_t get_scn_fast(trx_id_t id, trx_id_t *version = nullptr); + + /** Get SCN with relative transaction id + @param[in] id transaction id + @param[in] index index object where roll_ptr resides on + @param[in] roll_ptr rollback pointer of clust record */ + trx_id_t get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr, trx_id_t *version = nullptr); + + /** Get offset where scn is stored + @param[in] index index object + @param[in] offsets offset array of the clust record + @return offset where scn is stored */ + ulint scn_offset(const dict_index_t *index, const ulint *offsets); + + /** Start background threads */ + void start(); + + /** Stop background threads */ + void stop(); + + /* Periodically generate safe up limit id for taking snapshot */ + void view_task(); + + 
/**@return min active transaction id. This is not + an accurate number */ + trx_id_t min_active_id() { + return m_min_active_id.load(std::memory_order_relaxed); + } + + void take_up_ids(trx_id_t &up_id, trx_id_t &real_id, trx_ids_set_t &slow_ids); + +private: + + void set_view_id(trx_id_t limit_id); + + void set_view_no(trx_id_t limit_id); + + void x_lock() { + rw_lock_x_lock(m_lock, UT_LOCATION_HERE); + } + + void x_unlock() { + rw_lock_x_unlock(m_lock); + } + + void s_lock() { + rw_lock_s_lock(m_lock, UT_LOCATION_HERE); + } + + void s_unlock() { + rw_lock_s_unlock(m_lock); + } + + /** Storing trx id->scn mapping */ + Scn_Map *m_scn_map; + + /** Storing trx id->scn mapping to avoid duplicate + looking up */ + Scn_Map *m_random_map; + + /** up transaction id on startup */ + trx_id_t m_startup_id; + + /** SCN number taken on startup */ + trx_id_t m_startup_scn; + + /* Transaction ids that may be running too long, while + calculating m_min_active_id, these transactions will be ignored */ + trx_ids_set_t m_long_running_ids; + + /** thread event */ + os_event_t m_view_event; + + std::atomic m_has_slow_ids; + + /** Min active transaction id */ + std::atomic m_min_active_id; + + std::atomic m_fast_min_active_id; + + /** Flag to tell if background threads should stop or not */ + std::atomic m_abort; + + /** True if thread is active */ + std::atomic m_view_active; + + rw_lock_t* m_lock; +}; + +extern SCN_Mgr *scn_mgr; + #endif diff --git a/storage/innobase/include/row0purge.h b/storage/innobase/include/row0purge.h index b8fe1e7440e..58a5b59541d 100644 --- a/storage/innobase/include/row0purge.h +++ b/storage/innobase/include/row0purge.h @@ -101,6 +101,9 @@ struct purge_node_t { /** Trx that created this undo record */ trx_id_t modifier_trx_id; + + /** Trx no of modifier_trx_id */ + trx_id_t modifier_trx_no; }; using Recs = std::list>; diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index b6bf10810e0..e3c98534852 100644 --- 
a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -777,6 +777,7 @@ extern bool srv_cmp_per_index_enabled; extern bool srv_redo_log; +extern bool srv_mvcc_use_scn; /** Status variables to be passed to MySQL */ extern struct export_var_t export_vars; diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h index 2052199296b..fb7967e8acc 100644 --- a/storage/innobase/include/sync0sync.h +++ b/storage/innobase/include/sync0sync.h @@ -156,6 +156,7 @@ extern mysql_pfs_key_t srv_monitor_file_mutex_key; extern mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ extern mysql_pfs_key_t trx_undo_mutex_key; +extern mysql_pfs_key_t trx_scn_mutex_key; extern mysql_pfs_key_t trx_mutex_key; extern mysql_pfs_key_t trx_pool_mutex_key; extern mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -166,6 +167,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t trx_sys_mutex_key; extern mysql_pfs_key_t trx_sys_shard_mutex_key; extern mysql_pfs_key_t trx_sys_serialisation_mutex_key; +extern mysql_pfs_key_t trx_sys_mvcc_mutex_key; extern mysql_pfs_key_t srv_sys_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/include/sync0types.h b/storage/innobase/include/sync0types.h index c891effe436..d0a8d4c9144 100644 --- a/storage/innobase/include/sync0types.h +++ b/storage/innobase/include/sync0types.h @@ -417,6 +417,7 @@ enum latch_id_t { LATCH_ID_SRV_MONITOR_FILE, LATCH_ID_SYNC_THREAD, LATCH_ID_TRX_UNDO, + LATCH_ID_TRX_SCN, LATCH_ID_TRX_POOL, LATCH_ID_TRX_POOL_MANAGER, LATCH_ID_TEMP_POOL_MANAGER, @@ -424,6 +425,7 @@ enum latch_id_t { LATCH_ID_TRX_SYS, LATCH_ID_TRX_SYS_SHARD, LATCH_ID_TRX_SYS_SERIALISATION, + LATCH_ID_TRX_SYS_MVCC, LATCH_ID_SRV_SYS, LATCH_ID_SRV_SYS_TASKS, LATCH_ID_PAGE_ZIP_STAT_PER_INDEX, @@ -465,6 +467,9 @@ enum latch_id_t { LATCH_ID_DBLR, LATCH_ID_REDO_LOG_ARCHIVE_ADMIN_MUTEX, LATCH_ID_REDO_LOG_ARCHIVE_QUEUE_MUTEX, + 
LATCH_ID_CLONE_REPL_LOCK, + LATCH_ID_CLONE_REPL_MUTEX, + LATCH_ID_SCN_MGR_LOCK, LATCH_ID_TEST_MUTEX, LATCH_ID_MAX = LATCH_ID_TEST_MUTEX }; diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 002b4b9e54f..f2ddb0b7646 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -73,7 +73,7 @@ void trx_purge_sys_close(void); /************************************************************************ Adds the update undo log as the first log in the history list. Removes the update undo log segment from the rseg slot if it is too big for reuse. */ -void trx_purge_add_update_undo_to_history( +void trx_purge_add_undo_to_history( trx_t *trx, /*!< in: transaction */ trx_undo_ptr_t *undo_ptr, /*!< in: update undo log. */ page_t *undo_page, /*!< in: update undo log header page, @@ -81,6 +81,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr); /*!< in: mtr */ @@ -134,6 +135,9 @@ struct purge_iter_t { /** The transaction that created the undo log record, the Modifier trx id */ trx_id_t modifier_trx_id; + + /** Commit no of transaction with modifier_trx_id */ + trx_id_t modifier_trx_no; }; /* Namespace to hold all the related functions and variables needed @@ -1017,6 +1021,11 @@ struct trx_purge_t { /** The purge will not remove undo logs which are >= this view (purge view) */ ReadView view; + /** The scn of purge system before which undo can be purged */ + std::atomic version; + + std::atomic min_up_id; + /** Count of total tasks submitted to the task queue */ ulint n_submitted; diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h index 96d5e0f43c0..a004175d829 100644 --- a/storage/innobase/include/trx0rec.h +++ b/storage/innobase/include/trx0rec.h @@ -318,6 +318,15 @@ constexpr uint32_t TRX_UNDO_UPD_EXTERN = 128; constexpr uint32_t TRX_UNDO_INSERT_OP = 1; constexpr uint32_t TRX_UNDO_MODIFY_OP = 2; +/* store + - trx id(8 bytes) + why not 6 bytes ? because it need to be logged + and have no log type for 6 bytes), + - hdr page no (4 bytes), + - log hdr offset(2 bytes) +at end of page */ +#define TRX_UNDO_PAGE_RESERVE_SIZE 14 + /** The type and compilation info flag in the undo record for update. For easier understanding let the 8 bits be numbered as 7, 6, 5, 4, 3, 2, 1, 0. */ @@ -372,6 +381,18 @@ const byte *trx_undo_rec_get_pars( table_id_t *table_id, /*!< out: table id */ type_cmpl_t &type_cmpl); /*!< out: type compilation info. 
*/ +trx_id_t trx_undo_hdr_get_scn( + trx_id_t trx_id, + page_id_t &page_id, + uint32_t offset, + mtr_t *mtr, + page_t *undo_page); + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id); + /** Get the max free space of undo log by assuming it's a fresh new page and the free space doesn't count for the undo log header too. */ size_t trx_undo_max_free_space(); diff --git a/storage/innobase/include/trx0rseg.h b/storage/innobase/include/trx0rseg.h index 280e3979cff..92b3cb5fb55 100644 --- a/storage/innobase/include/trx0rseg.h +++ b/storage/innobase/include/trx0rseg.h @@ -175,6 +175,8 @@ created so the next page created here should by FSP_FSEG_DIR_PAGE_NUM. @param[in] mtr mtr */ void trx_rseg_array_create(space_id_t space_id, mtr_t *mtr); +void trx_update_max_trx_id_startup(trx_id_t new_id); + /** Sets the page number of the nth rollback segment slot in the independent undo tablespace. @param[in] rsegs_header rollback segment array page header @@ -225,6 +227,11 @@ list. It is always increasing number over lifetime starting from zero. The size is 8 bytes. */ #define TRX_RSEG_MAX_TRX_NO TRX_RSEG_SLOT_END + +/* Use to track max transaction id */ +#define TRX_RSEG_MAX_TRX_ID (TRX_RSEG_MAX_TRX_NO + 8) + +#define TRX_RSEG_INSERT_HISTORY (TRX_RSEG_MAX_TRX_ID + 8) /*-------------------------------------------------------------*/ /** The offset of the Rollback Segment Directory header on an RSEG_ARRAY diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index cf91a7919bb..c9e6401f38f 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -218,7 +218,7 @@ void trx_sys_close(void); /** Determine if there are incomplete transactions in the system. 
@return whether incomplete transactions need rollback */ -static inline bool trx_sys_need_rollback(); +bool trx_sys_need_rollback(); /** Reads number of recovered transactions which have state equal to TRX_STATE_ACTIVE (so are not prepared transactions). @@ -243,12 +243,6 @@ static inline void trx_sys_rw_trx_add(trx_t *trx); #endif /* !UNIV_HOTBACKUP */ -#ifdef UNIV_DEBUG -/** Validate the trx_sys_t::rw_trx_list. - @return true if the list is valid */ -bool trx_sys_validate_trx_list(); -#endif /* UNIV_DEBUG */ - /** Initialize trx_sys_undo_spaces, called once during srv_start(). */ void trx_sys_undo_spaces_init(); @@ -377,6 +371,33 @@ inline size_t trx_get_shard_no(trx_id_t trx_id) { } #ifndef UNIV_HOTBACKUP +struct lock_t; +struct dict_table_t; +struct trx_i_s_cache_t; + +class Trx_commit_serialisation_list { + public: + + /** Init the base node of list */ + void init_list() { + UT_LIST_INIT(serialisation_list); + } + + /** Add trx_t to serialisation_list */ + void add_list(trx_t &trx); + + /** Remove trx_t from serialisation_list */ + void remove_list(trx_t &trx); + + /*@return Min trx no on list */ + trx_id_t min_id() const { return m_min_id.load();} + + UT_LIST_BASE_NODE_T(trx_t, no_list) serialisation_list; + + /** Store min trx no of list */ + std::atomic m_min_id{0}; +}; + class Trx_by_id_with_min { struct Trx_track_hash { size_t operator()(const trx_id_t &key) const { @@ -395,9 +416,23 @@ class Trx_by_id_with_min { @see trx_rw_is_active for details.*/ std::atomic m_min_id{0}; + std::unordered_set m_long_running_ids; + + trx_id_t m_fast_min_id {0}; + public: + + /** Collect all active transaction ids */ + void collect_ids(std::unordered_set &id_set) { + for (auto item : m_by_id) { + id_set.insert(item.first); + } + } + By_id const &by_id() const { return m_by_id; } trx_id_t min_id() const { return m_min_id.load(); } + trx_t *get(const XID *xid); + trx_t *get(trx_id_t trx_id) const { const auto it = m_by_id.find(trx_id); trx_t *trx = it == m_by_id.end() ? 
nullptr : it->second; @@ -411,15 +446,138 @@ class Trx_by_id_with_min { void insert(trx_t &trx) { const trx_id_t trx_id = trx.id; ut_ad(0 == m_by_id.count(trx_id)); + trx.start_rw_time = std::chrono::steady_clock::now(); m_by_id.emplace(trx_id, &trx); if (m_by_id.size() == 1 || trx_id < m_min_id.load(std::memory_order_relaxed)) { m_min_id.store(trx_id, std::memory_order_release); + if (trx_id > m_fast_min_id || m_long_running_ids.empty()) { + m_fast_min_id = trx_id; + } + } + } + + /** Collect prepared transaction ids */ + void collect_prepared_ids(std::vector &trx_ids); + + /** Free prepared transactions on shutdown */ + void free_prepared(); + + /** Count of row for crash recovery */ + uint64_t recovered_rows() { + uint64_t count = 0; + for (auto item : m_by_id) { + const trx_t *trx = item.second; + ut_ad(trx->is_recovered); + if (trx_state_eq(trx, TRX_STATE_ACTIVE)) { + count += trx->undo_no; + } + } + + return count; + } + + /** Recover prepared transactions and add to list */ + void recover_prepared(XA_recover_txn *txn_list, MEM_ROOT *mem_root, ulint &index, ulint limit); + + /** Recover xa transactions */ + void recover_tc(Xa_state_list &xa_list); + + /** Count of active transactions */ + uint64_t size() { return m_by_id.size(); } + + /** Count of recovered active transactions */ + size_t recovered_active_count(); + + /** Fetch data of transaction into cache */ + bool fetch_data(trx_i_s_cache_t *cache); +#ifdef UNIV_DEBUG + void validate_table_lock(); + + const lock_t *table_locks_lookup(const dict_table_t *table); + + bool holds_expl_lock(ulint precise_mode, const buf_block_t *block, ulint heap_no, const trx_t *impl_trx); +#endif + + /** Remove locks on table */ + ulint recovered_trx_record_locks(dict_table_t *table); + + /** Collect recovered transaction ids */ + void collect_recovered_ids(std::vector &ids); + + /** Collect transaction ids of internel transactrion */ + void collect_internel_ids(std::vector &ids) { + for (auto item : m_by_id) { + const trx_t 
*trx = item.second; + if (trx->mysql_thd == nullptr) { + ut_a(trx->id > 0); + ids.push_back(trx->id); + } + } + } + + void get_min_with_slow( + trx_id_t &real_min, trx_id_t &fast_min, + std::unordered_set &slow_sets, trx_id_t limit) { + if (m_by_id.empty()) { + /* Will be ignored by caller */ + real_min = 0; + fast_min = 0; + + return; + } + + real_min = m_min_id.load(std::memory_order_relaxed); + ut_ad(m_fast_min_id != 0); + ut_ad(m_by_id.count(real_min) == 1); + + size_t long_ids_size = m_long_running_ids.size(); + size_t total_size = m_by_id.size(); + ut_ad(long_ids_size <= total_size); + + auto time_now = std::chrono::steady_clock::now(); + while (long_ids_size < total_size) { + auto itr = m_by_id.find(m_fast_min_id); + if (itr == m_by_id.end()) { + m_fast_min_id += TRX_SHARDS_N; + continue; + } + + trx_t *trx = itr->second; + ut_ad(trx != nullptr); + + //Not in heavy workload + if (limit < trx->id + 1024) { + break; + } + + if (time_now - trx->start_rw_time > std::chrono::seconds{10}) { + /* Find a long running transaction */ + m_long_running_ids.insert(m_fast_min_id); + long_ids_size++; + + m_fast_min_id += TRX_SHARDS_N; + } else { + break; + } + } + + slow_sets = m_long_running_ids; + if (long_ids_size == total_size) { + /* All transactions in set are slow, ignore it by setting to zero */ + fast_min = 0; + } else { + fast_min = m_fast_min_id; } } + void erase(trx_id_t trx_id) { ut_ad(1 == m_by_id.count(trx_id)); + + m_long_running_ids.erase(trx_id); + m_by_id.erase(trx_id); + if (m_min_id.load(std::memory_order_relaxed) == trx_id) { // We want at most 1 release store, so we use a local variable for the // loop. 
@@ -436,6 +594,9 @@ class Trx_by_id_with_min { } } m_min_id.store(new_min, std::memory_order_release); + if (new_min > m_fast_min_id || m_long_running_ids.empty()) { + m_fast_min_id = new_min; + } } } }; @@ -449,6 +610,11 @@ struct Trx_shard { active_rw_trxs; }; +struct Trx_commit_shard { + ut::Cacheline_padded> + commit_rw_trxs; +}; + /** The transaction system central memory data structure. */ struct trx_sys_t { /* Members protected by neither trx_sys_t::mutex nor serialisation_mutex. */ @@ -494,6 +660,15 @@ struct trx_sys_t { trx_t objects). */ std::atomic next_trx_id_or_no; + /** Used to sync with next_trx_id_or_no. While it + equals to next_trx_id_or_no, it can be sure that there's + no transaction on the way to adding to trx_shard */ + std::atomic next_trx_id_version; + + + /** resurrected max transaction id from undo seg on startup */ + std::atomic resurrect_max_trx_id; + /** @} */ /* Members protected by serialisation_mutex. */ @@ -530,12 +705,6 @@ struct trx_sys_t { /** Mutex protecting most fields in this structure (the default one). */ TrxSysMutex mutex; - char pad5[ut::INNODB_CACHE_LINE_SIZE]; - - /** List of active and committed in memory read-write transactions, sorted - on trx id, biggest first. Recovered transactions are always on this list. */ - UT_LIST_BASE_NODE_T(trx_t, trx_list) rw_trx_list; - char pad6[ut::INNODB_CACHE_LINE_SIZE]; /** List of transactions created for MySQL. All user transactions are @@ -556,8 +725,11 @@ struct trx_sys_t { /** Mapping from transaction id to transaction instance. */ Trx_shard shards[TRX_SHARDS_N]; + /** Sharded list for storing transactions being committed */ + Trx_commit_shard commit_shards[TRX_SHARDS_N]; + /** Number of transactions currently in the XA PREPARED state. */ - ulint n_prepared_trx; + std::atomic n_prepared_trx; /** True if XA PREPARED trxs are found. 
*/ bool found_prepared_trx; @@ -606,22 +778,8 @@ static inline void trx_sys_mutex_exit() { trx_sys->mutex.exit(); } /** Test if trx_sys->mutex is owned. */ static inline bool trx_sys_mutex_own() { return trx_sys->mutex.is_owned(); } -/** Test if trx_sys->serialisation_mutex is owned. */ -static inline bool trx_sys_serialisation_mutex_own() { - return trx_sys->serialisation_mutex.is_owned(); -} #endif -/** Acquire the trx_sys->serialisation_mutex. */ -static inline void trx_sys_serialisation_mutex_enter() { - mutex_enter(&trx_sys->serialisation_mutex); -} - -/** Release the trx_sys->serialisation_mutex. */ -static inline void trx_sys_serialisation_mutex_exit() { - trx_sys->serialisation_mutex.exit(); -} - #endif /* !UNIV_HOTBACKUP */ #include "trx0sys.ic" diff --git a/storage/innobase/include/trx0sys.ic b/storage/innobase/include/trx0sys.ic index d23e51b28c5..57f2d517e12 100644 --- a/storage/innobase/include/trx0sys.ic +++ b/storage/innobase/include/trx0sys.ic @@ -231,38 +231,12 @@ inline trx_id_t trx_sys_get_trx_id_write_margin() { return TRX_SYS_TRX_ID_WRITE_MARGIN; } -/** Allocates a new transaction id or transaction number. -@return new, allocated trx id or trx no */ -inline trx_id_t trx_sys_allocate_trx_id_or_no() { - ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); - - trx_id_t trx_id = trx_sys->next_trx_id_or_no.fetch_add(1); - - if (trx_id % trx_sys_get_trx_id_write_margin() == 0) { - /* Reserve the next range of trx_id values. This thread has acquired - either the trx_sys_mutex or the trx_sys_serialisation_mutex. - - Therefore at least one of these two mutexes, is latched and it stays - latched until the call to trx_sys_write_max_trx_id() is finished. - - Meanwhile other threads could be acquiring the other of these two mutexes, - reserving more and more trx_id values, until TRX_SYS_TRX_ID_WRITE_MARGIN - next values are reserved, when another trx_sys_write_max_trx_id() would - be called. 
*/ - - trx_sys_write_max_trx_id(); - } - return trx_id; -} - inline trx_id_t trx_sys_allocate_trx_id() { - ut_ad(trx_sys_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + return trx_sys->next_trx_id_or_no.fetch_add(1); } inline trx_id_t trx_sys_allocate_trx_no() { - ut_ad(trx_sys_serialisation_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + return trx_sys->next_trx_id_or_no.fetch_add(1); } /** Reads trx->no up to which all transactions have been serialised. @@ -275,22 +249,6 @@ inline trx_id_t trx_sys_get_next_trx_id_or_no() { return trx_sys->next_trx_id_or_no.load(); } -/** Determine if there are incomplete transactions in the system. -@return whether incomplete transactions need rollback */ -static inline bool trx_sys_need_rollback() { - ulint n_trx; - - trx_sys_mutex_enter(); - - n_trx = UT_LIST_GET_LEN(trx_sys->rw_trx_list); - ut_ad(n_trx >= trx_sys->n_prepared_trx); - n_trx -= trx_sys->n_prepared_trx; - - trx_sys_mutex_exit(); - - return (n_trx > 0); -} - static inline void trx_sys_rw_trx_add(trx_t *trx) { const trx_id_t trx_id = trx->id; ut_ad(trx_id != 0); diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 8edbdb09e1e..280a74a62e7 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -55,6 +55,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0read.h" #include "sql/handler.h" // Xa_state_list #include "srv0srv.h" +#include +#include +#include /* std::vector to store the trx id & table id of tables that needs to be * rollbacked. We take SHARED MDL on these tables inside @@ -211,7 +214,7 @@ void trx_mark_sql_stat_end(trx_t *trx); /*!< in: trx handle */ /** Assigns a read view for a consistent read query. All the consistent reads within the same transaction will get the same read view, which is created when this function is first called for a new started transaction. 
*/ -ReadView *trx_assign_read_view(trx_t *trx); /*!< in: active transaction */ +ReadView *trx_assign_read_view(trx_t *trx, bool is_shared = false); /*!< in: active transaction */ /** @return the transaction's read view or NULL if one not assigned. */ static inline ReadView *trx_get_read_view(trx_t *trx); @@ -272,7 +275,6 @@ static inline void trx_set_dict_operation(trx_t *trx, enum trx_dict_op_t op); /** Determines if a transaction is in the given state. The caller must hold trx_sys->mutex, or it must be the thread that is serving a running transaction. -A running RW transaction must be in trx_sys->rw_trx_list. @param[in] trx Transaction. @param[in] state State. @return true if trx->state == state */ @@ -705,6 +707,9 @@ struct trx_t { `lock`, which are protected by lock_sys latches) */ mutable TrxMutex mutex; + /** Mutex protecting allocating && assign trx scn */ + TrxMutex scn_mutex; + /* Note: in_depth was split from in_innodb for fixing a RO performance issue. Acquiring the trx_t::mutex for each row costs ~3% in performance. It is not required for correctness. @@ -783,13 +788,13 @@ struct trx_t { list. During this switch we assign it a rollback segment. When a transaction is NOT_STARTED, it can be in_mysql_trx_list if - it is a user transaction. It cannot be in rw_trx_list. + it is a user transaction. It cannot be in trx shard. - ACTIVE->PREPARED->COMMITTED is only possible when trx->in_rw_trx_list. + ACTIVE->PREPARED->COMMITTED is only possible when trx is in trx shard. The transition ACTIVE->PREPARED is protected by trx_sys->mutex. ACTIVE->COMMITTED is possible when the transaction is in - rw_trx_list. + trx shard. Transitions to COMMITTED are protected by trx->mutex. @@ -828,9 +833,6 @@ struct trx_t { Set to true when srv_is_being_started for recovered transactions. Set to false without any protection in trx_init (where no other thread should access this object anyway). 
- Can be read safely when holding trx_sys->mutex and trx belongs to rw_trx_list, - as trx_init can not be called until trx leaves rw_trx_list which requires the - trx_sys->mutex. */ bool is_recovered; @@ -926,6 +928,8 @@ struct trx_t { std::chrono::system_clock::time_point{}}; static_assert(decltype(start_time)::is_always_lock_free); + std::chrono::steady_clock::time_point start_rw_time; + lsn_t commit_lsn; /*!< lsn at the time of the commit */ /*------------------------------*/ @@ -950,11 +954,6 @@ struct trx_t { statement uses, except those in consistent read */ /*------------------------------*/ -#ifdef UNIV_DEBUG - /** True iff in trx_sys->rw_trx_list */ - bool in_rw_trx_list; - -#endif /* UNIV_DEBUG */ UT_LIST_NODE_T(trx_t) mysql_trx_list; /*!< list of transactions created for MySQL; protected by trx_sys->mutex */ @@ -1168,14 +1167,6 @@ static inline void check_trx_state(const trx_t *t) { ut_error; } -/** -Assert that the transaction is in the trx_sys_t::rw_trx_list */ -static inline void assert_trx_in_rw_list(const trx_t *t) { - ut_ad(!t->read_only); - ut_ad(t->in_rw_trx_list == !(t->read_only || !t->rsegs.m_redo.rseg)); - check_trx_state(t); -} - /** Check if transaction is free so that it can be re-initialized. @param t transaction handle */ static inline void assert_trx_is_free(const trx_t *t) { @@ -1198,14 +1189,13 @@ static inline void assert_trx_is_inactive(const trx_t *t) { #ifdef UNIV_DEBUG /** Assert that an autocommit non-locking select cannot be in the - rw_trx_list and that it is a read-only transaction. + trx shard and that it is a read-only transaction. The transaction must be in the mysql_trx_list. 
*/ static inline void assert_trx_nonlocking_or_in_list(const trx_t *t) { if (trx_is_autocommit_non_locking(t)) { trx_state_t t_state = t->state; ut_ad(t->read_only); ut_ad(!t->is_recovered); - ut_ad(!t->in_rw_trx_list); ut_ad(t->in_mysql_trx_list); ut_ad(t_state == TRX_STATE_NOT_STARTED || t_state == TRX_STATE_FORCED_ROLLBACK || t_state == TRX_STATE_ACTIVE); @@ -1215,7 +1205,7 @@ static inline void assert_trx_nonlocking_or_in_list(const trx_t *t) { } #else /* UNIV_DEBUG */ /** Assert that an autocommit non-locking select cannot be in the - rw_trx_list and that it is a read-only transaction. + trx shard and that it is a read-only transaction. The transaction must be in the mysql_trx_list. */ #define assert_trx_nonlocking_or_in_list(trx) ((void)0) #endif /* UNIV_DEBUG */ diff --git a/storage/innobase/include/trx0trx.ic b/storage/innobase/include/trx0trx.ic index 675dc81ca40..7d309a28552 100644 --- a/storage/innobase/include/trx0trx.ic +++ b/storage/innobase/include/trx0trx.ic @@ -59,8 +59,6 @@ static inline bool trx_state_eq(const trx_t *trx, trx_state_t state) { ut_a(state == TRX_STATE_NOT_STARTED || state == TRX_STATE_FORCED_ROLLBACK); - ut_ad(!trx->in_rw_trx_list); - return true; } ut_error; diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index e90650cafca..2e9036fbbb2 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -44,6 +44,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include #include #include +#include /** printf(3) format used for printing DB_TRX_ID and other system fields */ #define TRX_ID_FMT IB_ID_FMT diff --git a/storage/innobase/include/trx0undo.h b/storage/innobase/include/trx0undo.h index 2cd648a12e1..863a0c93b11 100644 --- a/storage/innobase/include/trx0undo.h +++ b/storage/innobase/include/trx0undo.h @@ -227,7 +227,8 @@ ulint trx_undo_lists_init( @return undo log segment header page, x-latched */ page_t *trx_undo_set_state_at_finish( 
trx_undo_t *undo, /*!< in: undo log memory copy */ - mtr_t *mtr); /*!< in: mtr */ + mtr_t *mtr, /*!< in: mtr */ + bool is_temp = false); /*!< in: true if it's tmp undo */ /** Set the state of the undo log segment at a XA PREPARE or XA ROLLBACK. @param[in,out] trx Transaction @@ -257,7 +258,7 @@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr); /** Frees an insert undo log after a transaction commit or rollback. @@ -388,6 +389,7 @@ struct trx_undo_t { field */ trx_id_t trx_id; /*!< id of the trx assigned to the undo log */ + trx_id_t trx_no; /*!< commit no of trx with trx_id */ XID xid; /*!< X/Open XA transaction identification */ ulint flag; /*!< flag for current transaction XID and GTID. diff --git a/storage/innobase/include/ut0guarded.h b/storage/innobase/include/ut0guarded.h index 231f32b668a..63dd9a17b62 100644 --- a/storage/innobase/include/ut0guarded.h +++ b/storage/innobase/include/ut0guarded.h @@ -54,6 +54,11 @@ class Guarded { return std::forward(f)(inner); } + template + auto execute_no_latch(F &&f) { + return std::forward(f)(inner); + } + const Inner &peek() const { return inner; } }; } // namespace ut diff --git a/storage/innobase/include/ut0seq_lock.h b/storage/innobase/include/ut0seq_lock.h index 0a1fc4436c5..9c2ca5806ba 100644 --- a/storage/innobase/include/ut0seq_lock.h +++ b/storage/innobase/include/ut0seq_lock.h @@ -73,6 +73,24 @@ class Seq_lock : private Non_copyable { op(m_value); m_seq.store(old + 2, std::memory_order_release); } + + template + void locking_write(Op &&op) { + auto old = m_seq.load(std::memory_order_relaxed); + while (true) { + if (old & 1) { + std::this_thread::yield(); + old = m_seq.load(std::memory_order_relaxed); + } else if (m_seq.compare_exchange_weak(old, old + 1)) { + break; + } + } + + std::atomic_thread_fence(std::memory_order_release); + 
op(m_value); + m_seq.store(old + 2, std::memory_order_release); + } + /* Reads the value of the stored value of type T using operation op(). The op() can use memory_order_relaxed loads. The op() can't assume the data stored inside T is logically consistent. Calls to this method don't need to be diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 309dec0cee4..3b5f97ea90a 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -253,12 +253,7 @@ bool lock_clust_rec_cons_read_sees( return (true); } - /* NOTE that we call this function while holding the search - system latch. */ - - trx_id_t trx_id = row_get_rec_trx_id(rec, index, offsets); - - return (view->changes_visible(trx_id, index->table->name)); + return (view->changes_visible(index, rec, offsets)); } /** Checks that a non-clustered index record is seen in a consistent read. @@ -964,34 +959,25 @@ The difficulties to keep in mind here: the seen trx_id is still active or not */ static bool can_older_trx_be_still_active(trx_id_t max_old_active_id) { - if (mutex_enter_nowait(&trx_sys->mutex) != 0) { - ut_ad(!trx_sys_mutex_own()); - /* The mutex is currently locked by somebody else. Instead of wasting time - on spinning and waiting to acquire it, we loop over the shards and check if - any of them contains a value in the range (-infinity,max_old_active_id]. - NOTE: Do not be tempted to "cache" the minimum, until you also enforce that - transactions are inserted to shards in a monotone order! - Current implementation heavily depends on the property that even if we put - a trx with smaller id to any structure later, it could not have modified a - row the caller saw earlier. 
*/ - static_assert(TRX_SHARDS_N < 1000, "The loop should be short"); - for (auto &shard : trx_sys->shards) { - if (shard.active_rw_trxs.peek().min_id() <= max_old_active_id) { - return true; - } + ut_ad(!trx_sys_mutex_own()); + /* The mutex is currently locked by somebody else. Instead of wasting time + on spinning and waiting to acquire it, we loop over the shards and check if + any of them contains a value in the range (-infinity,max_old_active_id]. + NOTE: Do not be tempted to "cache" the minimum, until you also enforce that + transactions are inserted to shards in a monotone order! + Current implementation heavily depends on the property that even if we put + a trx with smaller id to any structure later, it could not have modified a + row the caller saw earlier. */ + static_assert(TRX_SHARDS_N < 1000, "The loop should be short"); + for (auto &shard : trx_sys->shards) { + trx_id_t min_id = shard.active_rw_trxs.peek().min_id(); + if (min_id == 0) continue; + if (min_id <= max_old_active_id) { + return true; } - return false; } - ut_ad(trx_sys_mutex_own()); - const trx_t *trx = UT_LIST_GET_LAST(trx_sys->rw_trx_list); - if (trx == nullptr) { - trx_sys_mutex_exit(); - return false; - } - assert_trx_in_rw_list(trx); - const trx_id_t min_active_now_id = trx->id; - trx_sys_mutex_exit(); - return min_active_now_id <= max_old_active_id; + + return false; } /** Checks if some transaction has an implicit x-lock on a record in a secondary @@ -1041,6 +1027,23 @@ static trx_t *lock_sec_rec_some_has_impl(const rec_t *rec, dict_index_t *index, } #ifdef UNIV_DEBUG +bool Trx_by_id_with_min::holds_expl_lock( + ulint precise_mode, const buf_block_t *block, ulint heap_no, const trx_t *impl_trx) { + + for (auto item : m_by_id) { + const trx_t* trx = item.second; + const lock_t *expl_lock = + lock_rec_has_expl(precise_mode, block, heap_no, trx); + if (expl_lock && expl_lock->trx != impl_trx) { + /* An explicit lock is held by trx other than + the trx holding the implicit lock. 
*/ + return true; + } + } + + return false; +} + /** Checks if some transaction, other than given trx_id, has an explicit lock on the given rec, in the given precise_mode. @param[in] precise_mode LOCK_S or LOCK_X possibly ORed to LOCK_GAP or @@ -1067,21 +1070,19 @@ static bool lock_rec_other_trx_holds_expl(ulint precise_mode, const trx_t *trx, the transaction was not committed yet. */ if (trx_t *impl_trx = trx_rw_is_active(trx->id, false)) { ulint heap_no = page_rec_get_heap_no(rec); - mutex_enter(&trx_sys->mutex); - for (auto t : trx_sys->rw_trx_list) { - const lock_t *expl_lock = - lock_rec_has_expl(precise_mode, block, heap_no, t); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + holds = + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.holds_expl_lock(precise_mode, block, heap_no, impl_trx); + }, UT_LOCATION_HERE); - if (expl_lock && expl_lock->trx != impl_trx) { - /* An explicit lock is held by trx other than - the trx holding the implicit lock. */ - holds = true; + if (holds) { break; } } - - mutex_exit(&trx_sys->mutex); } return (holds); @@ -4248,26 +4249,10 @@ static void lock_remove_all_on_table_for_trx( trx_mutex_exit(trx); } -/** Remove any explicit record locks held by recovering transactions on - the table. - @return number of recovered transactions examined */ -static ulint lock_remove_recovered_trx_record_locks( - dict_table_t *table) /*!< in: check if there are any locks - held on records in this table or on the - table itself */ -{ - ut_a(table != nullptr); - /* We need exclusive lock_sys latch, as we are about to iterate over locks - held by multiple transactions while they might be operating. 
*/ - ut_ad(locksys::owns_exclusive_global_latch()); - +ulint Trx_by_id_with_min::recovered_trx_record_locks(dict_table_t *table) { ulint n_recovered_trx = 0; - - mutex_enter(&trx_sys->mutex); - - for (trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - + for (auto item : m_by_id) { + trx_t *trx = item.second; if (!trx->is_recovered) { continue; } @@ -4305,7 +4290,32 @@ static ulint lock_remove_recovered_trx_record_locks( ++n_recovered_trx; } - mutex_exit(&trx_sys->mutex); + return n_recovered_trx; +} + +/** Remove any explicit record locks held by recovering transactions on + the table. + @return number of recovered transactions examined */ +static ulint lock_remove_recovered_trx_record_locks( + dict_table_t *table) /*!< in: check if there are any locks + held on records in this table or on the + table itself */ +{ + ut_a(table != nullptr); + /* We need exclusive lock_sys latch, as we are about to iterate over locks + held by multiple transactions while they might be operating. */ + ut_ad(locksys::owns_exclusive_global_latch()); + + ulint n_recovered_trx = 0; + + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + n_recovered_trx += + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_trx_record_locks(table); + }, UT_LOCATION_HERE); + } return (n_recovered_trx); } @@ -4609,64 +4619,6 @@ class TrxLockIterator { ulint m_index; }; -/** This iterates over RW trx_sys lists only. We need to keep -track where the iterator was up to and we do that using an ordinal value. */ - -class TrxListIterator { - public: - TrxListIterator() : m_index() { - /* We iterate over the RW trx list only. */ - - m_trx_list = &trx_sys->rw_trx_list; - } - - /** Get the current transaction whose ordinality is m_index. 
- @return current transaction or 0 */ - - const trx_t *current() { return (reposition()); } - - /** Advance the transaction current ordinal value and reset the - transaction lock ordinal value */ - - void next() { - ++m_index; - m_lock_iter.rewind(); - } - - TrxLockIterator &lock_iter() { return (m_lock_iter); } - - private: - /** Reposition the "cursor" on the current transaction. If it - is the first time then the "cursor" will be positioned on the - first transaction. - - @return transaction instance or 0 */ - const trx_t *reposition() const { - ulint i = 0; - - /* Make the transaction at the ordinal value of m_index - the current transaction. ie. reposition/restore */ - - for (auto trx : *m_trx_list) { - if (i++ == m_index) { - return trx; - } - check_trx_state(trx); - } - - return nullptr; - } - - /** Ordinal value of the transaction in the current transaction list */ - ulint m_index; - - /** Current transaction list */ - decltype(trx_sys->rw_trx_list) *m_trx_list; - - /** For iterating over a transaction's locks */ - TrxLockIterator m_lock_iter; -}; - /** Prints transaction lock wait and MVCC state. @param[in,out] file file where to print @param[in] trx transaction */ @@ -4818,64 +4770,62 @@ void lock_print_info_all_transactions(FILE *file) { fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n"); - mutex_enter(&trx_sys->mutex); + std::vector active_ids; + active_ids.clear(); + + trx_sys_mutex_enter(); /* First print info on non-active transactions */ /* NOTE: information of auto-commit non-locking read-only transactions will be omitted here. The information will be available from INFORMATION_SCHEMA.INNODB_TRX. 
*/ + for (auto trx : trx_sys->mysql_trx_list) { + /* We require exclusive access to lock_sys */ + ut_ad(locksys::owns_exclusive_global_latch()); + ut_ad(trx->in_mysql_trx_list); - PrintNotStarted print_not_started(file); - ut_list_map(trx_sys->mysql_trx_list, print_not_started); - - const trx_t *trx; - TrxListIterator trx_iter; - const trx_t *prev_trx = nullptr; - - /* Control whether a block should be fetched from the buffer pool. */ - bool load_block = true; - bool monitor = srv_print_innodb_lock_monitor; - - while ((trx = trx_iter.current()) != nullptr) { - check_trx_state(trx); - - if (trx != prev_trx) { - lock_trx_print_wait_and_mvcc_state(file, trx); - prev_trx = trx; - - /* The transaction that read in the page is no - longer the one that read the page in. We need to - force a page read. */ - load_block = true; + /* See state transitions and locking rules in trx0trx.h */ + trx_mutex_enter(trx); + if (trx_state_eq(trx, TRX_STATE_NOT_STARTED)) { + fputs("---", file); + trx_print_latched(file, trx, 600); } - /* If we need to print the locked record contents then we - need to fetch the containing block from the buffer pool. */ - if (monitor) { - /* Print the locks owned by the current transaction. */ - TrxLockIterator &lock_iter = trx_iter.lock_iter(); - - if (!lock_trx_print_locks(file, trx, lock_iter, load_block)) { - /* Resync trx_iter, the trx_sys->mutex and exclusive global latch were - temporarily released. A page was successfully read in. We need to print - its contents on the next call to lock_trx_print_locks(). On the next - call to lock_trx_print_locks() we should simply print the contents of - the page just read in.*/ - load_block = false; - - continue; - } + //TBD: Is it thread safe ? + trx_id_t id = trx->id; + if (id > 0) { + active_ids.push_back(id); } + trx_mutex_exit(trx); + } - load_block = true; + trx_sys_mutex_exit(); - /* All record lock details were printed without fetching - a page from disk, or we didn't need to print the detail. 
*/ - trx_iter.next(); + /* Collect internel transaction ids*/ + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_internel_ids(active_ids); + }, + UT_LOCATION_HERE); } - mutex_exit(&trx_sys->mutex); + bool monitor = srv_print_innodb_lock_monitor; + for (auto id : active_ids) { + trx_sys->latch_and_execute_with_active_trx(id, [&](trx_t *trx) { + if (trx == nullptr) return; + check_trx_state(trx); + lock_trx_print_wait_and_mvcc_state(file, trx); + + if (monitor) { + TrxLockIterator lock_iter; + //FIXME: load block is disabled + lock_trx_print_locks(file, trx, lock_iter, false); + } + }, UT_LOCATION_HERE); + } } #ifdef UNIV_DEBUG @@ -4887,13 +4837,8 @@ static bool lock_table_queue_validate( { /* We actually hold exclusive latch here, but we require just the shard */ ut_ad(locksys::owns_table_shard(*table)); - ut_ad(trx_sys_mutex_own()); for (auto lock : table->locks) { - /* lock->trx->state cannot change to NOT_STARTED until transaction released - its table locks and that is prevented here by the locksys shard's mutex. */ - ut_ad(trx_assert_started(lock->trx)); - if (!lock_get_wait(lock)) { ut_a(!lock_table_other_has_incompatible(lock->trx, 0, table, lock_get_mode(lock))); @@ -5074,13 +5019,9 @@ static void lock_rec_validate_page( mutex_exit(&trx_sys->mutex); } -/** Validates the table locks. */ -static void lock_validate_table_locks() { - /* We need exclusive access to lock_sys to iterate over trxs' locks */ - ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); - - for (const trx_t *trx : trx_sys->rw_trx_list) { +void Trx_by_id_with_min::validate_table_lock() { + for (auto item : m_by_id) { + const trx_t *trx = item.second; check_trx_state(trx); for (const lock_t *lock : trx->lock.trx_locks) { @@ -5091,6 +5032,21 @@ static void lock_validate_table_locks() { } } +/** Validates the table locks. 
*/ +static void lock_validate_table_locks() { + /* We need exclusive access to lock_sys to iterate over trxs' locks */ + ut_ad(locksys::owns_exclusive_global_latch()); + + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.validate_table_lock(); + }, + UT_LOCATION_HERE); + } +} + /** Validate a record lock's block */ static void lock_rec_block_validate(const page_id_t &page_id) { /* The lock and the block that it is referring to may be freed at @@ -5128,7 +5084,6 @@ bool lock_validate() { /* lock_validate_table_locks() needs exclusive global latch, and we will inspect record locks from all shards */ locksys::Global_exclusive_latch_guard guard{UT_LOCATION_HERE}; - mutex_enter(&trx_sys->mutex); lock_validate_table_locks(); @@ -5141,8 +5096,6 @@ bool lock_validate() { pages.emplace(lock->rec_lock.page_id); return false; }); - - mutex_exit(&trx_sys->mutex); } std::for_each(pages.cbegin(), pages.cend(), lock_rec_block_validate); @@ -5930,7 +5883,6 @@ void lock_trx_release_locks(trx_t *trx) /*!< in/out: transaction */ check_trx_state(trx); ut_ad(trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)); - ut_ad(!trx->in_rw_trx_list); if (trx_is_referenced(trx)) { while (trx_is_referenced(trx)) { @@ -6003,20 +5955,9 @@ bool lock_cancel_if_waiting_and_release(const TrxVersion trx_version) { } #ifdef UNIV_DEBUG -/** Scans all locks of all transactions in the rw_trx_list searching for any -lock (table or rec) against the table. 
-@param[in] table the table for which we perform the search -@return lock if found */ -static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { - ut_a(table != nullptr); - /* We are going to iterate over multiple transactions, so even though we know - which table we are looking for we can not narrow required latch to just the - shard which contains the table, because accessing trx->lock.trx_locks would be - unsafe */ - ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); - - for (auto trx : trx_sys->rw_trx_list) { +const lock_t * Trx_by_id_with_min::table_locks_lookup(const dict_table_t *table) { + for (auto item : m_by_id) { + trx_t *trx = item.second; check_trx_state(trx); for (auto lock : trx->lock.trx_locks) { @@ -6034,6 +5975,35 @@ static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { } } + return nullptr; +} + +/** Scans all locks of all transactions in the rw_trx_list searching for any +lock (table or rec) against the table. 
+@param[in] table the table for which we perform the search +@return lock if found */ +static const lock_t *lock_table_locks_lookup(const dict_table_t *table) { + ut_a(table != nullptr); + /* We are going to iterate over multiple transactions, so even though we know + which table we are looking for we can not narrow required latch to just the + shard which contains the table, because accessing trx->lock.trx_locks would be + unsafe */ + ut_ad(locksys::owns_exclusive_global_latch()); + + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + const lock_t* lock = + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.table_locks_lookup(table); + }, + UT_LOCATION_HERE); + + if (lock != nullptr) { + return lock; + } + } + return (nullptr); } #endif /* UNIV_DEBUG */ diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index a918b11b06f..c07ffe660a5 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -36,6 +36,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "srv0srv.h" #include "trx0sys.h" +#include "trx0rec.h" +#include "trx0rseg.h" +#include "row0row.h" /* ------------------------------------------------------------------------------- @@ -182,6 +185,7 @@ will mark their views as closed but not actually free their views. /** Minimum number of elements to reserve in ReadView::ids_t */ static const ulint MIN_TRX_IDS = 32; +SCN_Mgr *scn_mgr = nullptr; #ifdef UNIV_DEBUG /** Functor to validate the view list. */ @@ -200,12 +204,10 @@ struct ViewCheck { /** Validates a read view list. 
*/ -bool MVCC::validate() const { +bool MVCC::validate(uint64_t slot) const { ViewCheck check; - ut_ad(trx_sys_mutex_own()); - - ut_list_map(m_views, check); + ut_list_map(m_views[slot], check); return (true); } @@ -315,11 +317,17 @@ ReadView constructor */ ReadView::ReadView() : m_low_limit_id(), m_up_limit_id(), + m_real_up_limit_id(), m_creator_trx_id(), m_ids(), - m_low_limit_no() { + m_low_limit_no(), + m_trx(nullptr), + m_version(), + m_slot(MAX_SNAPSHOT_SIZE), + m_shared() { ut_d(::memset(&m_view_list, 0x0, sizeof(m_view_list))); ut_d(m_view_low_limit_no = 0); + m_long_running_ids.clear(); } /** @@ -330,29 +338,182 @@ ReadView::~ReadView() { /** Constructor @param size Number of views to pre-allocate */ -MVCC::MVCC(ulint size) : m_free(), m_views() { - for (ulint i = 0; i < size; ++i) { - ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); +MVCC::MVCC(ulint size) { - UT_LIST_ADD_FIRST(m_free, view); + m_slot_index = 0; + if (srv_mvcc_use_scn) { + m_slot_size = MAX_SNAPSHOT_SIZE; + } else { + m_slot_size = 1; + } + + m_free = new view_list_t[m_slot_size]; + m_views = new view_list_t[m_slot_size]; + m_mutexs = new ib_mutex_t[m_slot_size]; + + for (uint64_t slot = 0; slot < m_slot_size; slot++) { + UT_LIST_INIT(m_free[slot]); + UT_LIST_INIT(m_views[slot]); + + for (ulint i = 0; i < size; ++i) { + ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); + UT_LIST_ADD_FIRST(m_free[slot], view); + } + + mutex_create(LATCH_ID_TRX_SYS_MVCC, &(m_mutexs[slot])); } } MVCC::~MVCC() { - while (ReadView *view = UT_LIST_GET_FIRST(m_free)) { - UT_LIST_REMOVE(m_free, view); + for (uint64_t slot = 0; slot < m_slot_size; slot++) { + while (ReadView *view = UT_LIST_GET_FIRST(m_free[slot])) { + UT_LIST_REMOVE(m_free[slot], view); + + ut::delete_(view); + } + + ut_a(UT_LIST_GET_LEN(m_views[slot]) == 0); + + mutex_destroy(&(m_mutexs[slot])); + } + + delete [] m_free; + delete [] m_views; + delete [] m_mutexs; +} + +bool ReadView::is_index_visible(trx_id_t index_trx_id, 
const table_name_t &name) { + if (!srv_mvcc_use_scn || m_shared) { + return changes_visible(index_trx_id, name); + } + + if (index_trx_id == m_creator_trx_id) { + return true; + } + + if (m_invisible_ids.find(index_trx_id) != m_invisible_ids.end()) { + return false; + } + + if (index_trx_id < m_up_limit_id) { + if (m_long_running_ids.empty() + || index_trx_id < *(m_long_running_ids.begin()) + || index_trx_id > *(m_long_running_ids.rbegin())) { + return true; + } + } + + if (trx_rw_is_active(index_trx_id, false)) { + m_invisible_ids.insert(index_trx_id); + return false; + } + + return true; +} + +bool ReadView::changes_visible( + trx_id_t id, + const dict_index_t *index, + roll_ptr_t roll_ptr) { + if (srv_mvcc_use_scn) { + return sees_version(id, scn_mgr->get_scn(id, index, roll_ptr)); + } else { + /* Traditional MVCC */ + return changes_visible(id, index->table->name); + } +} + +bool ReadView::changes_visible( + const dict_index_t *index, + const rec_t *rec, + const ulint *offsets) { + + ut_a(index->is_clustered()); + ut_a(scn_mgr != nullptr); - ut::delete_(view); + if (index->table->is_temporary()) { + return true; } - ut_a(UT_LIST_GET_LEN(m_views) == 0); + /* Get transaction id from record */ + ulint offset = scn_mgr->scn_offset(index, offsets); + trx_id_t id = mach_read_from_6(rec + offset); + + if (!srv_mvcc_use_scn || m_shared) { + return changes_visible(id, index->table->name); + } + + /* Trx itself */ + if (id == m_creator_trx_id) { + return true; + } + + if (id < m_up_limit_id) { + if (m_long_running_ids.empty() + || id < *(m_long_running_ids.begin()) + || id > *(m_long_running_ids.rbegin())) { + return true; + } + + if (m_long_running_ids.find(id) != m_long_running_ids.end()) { + /* slow transaction check if it's active */ + if (m_invisible_ids.find(id) != m_invisible_ids.end()) { + return false; + } + + if (trx_rw_is_active(id, false)) { + m_invisible_ids.insert(id); + return false; + } + } else { + return true; + } + } + + if (id >= m_low_limit_id) { + 
return false; + } + + if (m_invisible_ids.find(id) != m_invisible_ids.end()) { + /* Not visible to current view */ + return false; + } + + trx_id_t committing_version = 0; + /* Get SCN from undo log */ + trx_id_t scn = scn_mgr->get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets), &committing_version); + + if (committing_version != 0 && committing_version < m_version) { + /* Consider such scenario: + - active trx: get trx->no = 5 + - open read view: version = 7 + - before committing trx completely: not visible + - after committing trx: visible because it's deregistered + and scn is written to undo (5 < 7) + + Problem: consistent read is broken, so we must + record such kind of scn and id */ + ut_a(scn == TRX_ID_MAX); + m_invisible_ids.insert(id); + + ut_a(committing_version >= m_low_limit_no); + } + + if (scn == TRX_ID_MAX) { + /* Still active, add id to invisible set */ + m_invisible_ids.insert(id); + return false; + } + + ut_a(scn > 0); + + return (sees_version(id, scn)); } /** Copy the transaction ids from the source vector */ void ReadView::copy_trx_ids(const trx_ids_t &trx_ids) { - ut_ad(trx_sys_mutex_own()); ulint size = trx_ids.size(); @@ -439,18 +600,13 @@ void ReadView::copy_trx_ids(const trx_ids_t &trx_ids) { #endif /* UNIV_DEBUG */ } -/** -Opens a read view where exactly the transactions serialized before this -point in time are seen in the view. 
-@param id Creator transaction id */ +void ReadView::prepare_old(trx_id_t id) { + ut_ad(!trx_sys_mutex_own()); -void ReadView::prepare(trx_id_t id) { - ut_ad(trx_sys_mutex_own()); + trx_sys_mutex_enter(); m_creator_trx_id = id; - m_low_limit_no = trx_get_serialisation_min_trx_no(); - m_low_limit_id = trx_sys_get_next_trx_id_or_no(); ut_a(m_low_limit_no <= m_low_limit_id); @@ -467,7 +623,102 @@ void ReadView::prepare(trx_id_t id) { ut_a(m_up_limit_id <= m_low_limit_id); ut_d(m_view_low_limit_no = m_low_limit_no); + m_closed = false; + + trx_sys_mutex_exit(); +} + +/** +Opens a read view where exactly the transactions serialized before this +point in time are seen in the view. +@param id Creator transaction id */ + +void ReadView::prepare_new(trx_id_t id) { + + m_creator_trx_id = id; + + m_long_running_ids.clear(); + + if (m_slot != MAX_SNAPSHOT_SIZE) { + //user session + scn_mgr->take_up_ids(m_up_limit_id, m_real_up_limit_id, m_long_running_ids); + } else { + m_up_limit_id = scn_mgr->min_active_id(); + } + + m_version = trx_sys_get_next_trx_id_or_no(); + m_low_limit_id = m_version; + + if (m_low_limit_no > m_version) { + m_low_limit_no = m_version; + } + + m_invisible_ids.clear(); + + if (m_shared) { + while (trx_sys_get_next_trx_id_or_no() + != trx_sys->next_trx_id_version.load()) { + ut_delay(1); + } + } + + if (m_shared || m_slot == MAX_SNAPSHOT_SIZE) { + /* This is a read view shared by multiple threads such as + select count(*), so we must prepare m_invisible_ids before hand */ + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_list_with_min) { + trx_list_with_min.collect_ids(m_invisible_ids); + }, UT_LOCATION_HERE); + } + } + + if (m_shared) { + if (!m_invisible_ids.empty()) { + trx_ids_t ids; + ids.clear(); + for (auto id : m_invisible_ids) { + ids.push_back(id); + } + + std::sort(ids.begin(), ids.end()); + auto itr = std::lower_bound(ids.begin(), 
ids.end(), m_low_limit_id); + if (itr != ids.end()) { + ids.erase(itr, ids.end()); + } + + if (!ids.empty()) { + copy_trx_ids(ids); + } else { + m_ids.clear(); + } + + m_up_limit_id = !m_ids.empty() ? m_ids.front() : m_low_limit_id; + } else { + m_up_limit_id = m_low_limit_id; + m_ids.clear(); + } + } + + ut_d(m_view_low_limit_no = m_low_limit_no); + m_closed = false; +} + +void ReadView::prepare(trx_id_t id) { + if (m_slot != MAX_SNAPSHOT_SIZE) { + //user session + m_low_limit_no = trx_get_serialisation_min_trx_no(); + } else { + m_low_limit_no = trx_sys_oldest_trx_no(); + } + + if (srv_mvcc_use_scn) { + prepare_new(id); + } else { + prepare_old(id); + } } /** @@ -475,14 +726,13 @@ Find a free view from the active list, if none found then allocate a new view. @return a view to use */ -ReadView *MVCC::get_view() { - ut_ad(trx_sys_mutex_own()); +ReadView *MVCC::get_view(uint64_t slot) { ReadView *view; - if (UT_LIST_GET_LEN(m_free) > 0) { - view = UT_LIST_GET_FIRST(m_free); - UT_LIST_REMOVE(m_free, view); + if (UT_LIST_GET_LEN(m_free[slot]) > 0) { + view = UT_LIST_GET_FIRST(m_free[slot]); + UT_LIST_REMOVE(m_free[slot], view); } else { view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); @@ -498,7 +748,7 @@ ReadView *MVCC::get_view() { @param view View owned by this class created for the caller. 
Must be freed by calling view_close() @param trx Transaction instance of caller */ -void MVCC::view_open(ReadView *&view, trx_t *trx) { +void MVCC::view_open(ReadView *&view, trx_t *trx, bool is_shared) { ut_ad(!srv_read_only_mode); /** If no new RW transaction has been started since the last view @@ -521,7 +771,8 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { if (trx_is_autocommit_non_locking(trx) && view->empty()) { view->m_closed = false; - if (view->m_low_limit_id == trx_sys_get_next_trx_id_or_no()) { + if (view->m_low_limit_id == trx_sys_get_next_trx_id_or_no() + && (view->m_shared == is_shared)) { return; } else { view->m_closed = true; @@ -529,38 +780,96 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { } } - trx_sys_mutex_enter(); + uint64_t slot; + if (view != nullptr) { + slot = view->get_slot(); + } else { + slot = get_slot(); + } + + enter(slot); if (view != nullptr) { - UT_LIST_REMOVE(m_views, view); + UT_LIST_REMOVE(m_views[slot], view); } else { - view = get_view(); + view = get_view(slot); } if (view != nullptr) { + view->set_slot(slot); + + view->m_shared = is_shared; + view->prepare(trx->id); - UT_LIST_ADD_FIRST(m_views, view); + UT_LIST_ADD_FIRST(m_views[slot], view); ut_ad(!view->is_closed()); - - ut_ad(validate()); } - trx_sys_mutex_exit(); + exit(slot); } /** Get the oldest (active) view in the system. 
@return oldest view if found or NULL */ +void MVCC::get_oldest_version(ReadView *purge_view) { + trx_id_t purge_version = std::min(purge_view->version(), purge_view->low_limit_no()); + trx_id_t purge_low_id = purge_view->low_limit_id(); + trx_id_t purge_up_limit_id = purge_view->up_limit_id(); + + for (uint64_t i = 0; i < m_slot_size; i++) { + enter(i); + for (ReadView* view = UT_LIST_GET_LAST(m_views[i]); view != nullptr; + view = UT_LIST_GET_PREV(m_view_list, view)) { + if (view->is_closed()) { + continue; + } + + trx_id_t min_no = std::min(view->low_limit_no(), view->version()); + if (min_no < purge_version) { + purge_version = min_no; + } + + if (view->low_limit_id() < purge_low_id) { + purge_low_id = view->low_limit_id(); + } + + if (view->real_up_limit_id() < purge_up_limit_id) { + purge_up_limit_id = view->real_up_limit_id(); + } + + break; + } + + exit(i); + } + purge_view->copy_prepare(purge_version, purge_low_id, purge_up_limit_id); +} + +/** +@param other view to copy from */ + +void ReadView::copy_prepare(trx_id_t version, trx_id_t low_id, trx_id_t up_id) { + ut_a(up_id <= low_id); + + m_up_limit_id = up_id; + m_low_limit_no = version; + m_version = version; + m_low_limit_id = low_id; +} + +/** +Get the oldest (active) view in the system. +@return oldest view if found or NULL */ ReadView *MVCC::get_oldest_view() const { ReadView *view; + ut_a(!srv_mvcc_use_scn); + ut_a(m_slot_size == 1); - ut_ad(trx_sys_mutex_own()); - - for (view = UT_LIST_GET_LAST(m_views); view != nullptr; + for (view = UT_LIST_GET_LAST(m_views[0]); view != nullptr; view = UT_LIST_GET_PREV(m_view_list, view)) { if (!view->is_closed()) { break; @@ -625,22 +934,24 @@ m_free list. This function is called by Purge to determine whether it should purge the delete marked record or not. 
@param view Preallocated view, owned by the caller */ void MVCC::clone_oldest_view(ReadView *view) { - trx_sys_mutex_enter(); - ReadView *oldest_view = get_oldest_view(); - - if (oldest_view == nullptr) { + if (srv_mvcc_use_scn) { view->prepare(0); - - trx_sys_mutex_exit(); - + get_oldest_version(view); } else { - view->copy_prepare(*oldest_view); - - trx_sys_mutex_exit(); + ut_a(m_slot_size == 1); + enter(0); + ReadView *oldest_view = get_oldest_view(); + if (oldest_view != nullptr) { + view->copy_prepare(*oldest_view); + view->copy_complete(); + } else { + view->prepare(0); + } - view->copy_complete(); + exit(0); } + /* Update view to block purging transaction till GTID is persisted. */ auto &gtid_persistor = clone_sys->get_gtid_persistor(); auto gtid_oldest_trxno = gtid_persistor.get_oldest_trx_no(); @@ -650,19 +961,19 @@ void MVCC::clone_oldest_view(ReadView *view) { /** @return the number of active views */ -ulint MVCC::size() const { - trx_sys_mutex_enter(); - +ulint MVCC::size() { ulint size = 0; - for (const ReadView *view : m_views) { - if (!view->is_closed()) { - ++size; + for (uint64_t i = 0; i < m_slot_size; i++) { + enter(i); + for (const ReadView *view : m_views[i]) { + if (!view->is_closed()) { + ++size; + } } + exit(i); } - trx_sys_mutex_exit(); - return (size); } @@ -674,6 +985,8 @@ Close a view created by the above function. void MVCC::view_close(ReadView *&view, bool own_mutex) { uintptr_t p = reinterpret_cast<uintptr_t>(view); + ut_ad(!trx_sys_mutex_own()); + /* Note: The assumption here is that AC-NL-RO transactions will call this function with own_mutex == false. 
*/ if (!own_mutex) { @@ -688,14 +1001,335 @@ void MVCC::view_close(ReadView *&view, bool own_mutex) { view = reinterpret_cast(p | 0x1); } else { view = reinterpret_cast(p & ~1); + auto slot = view->get_slot(); + enter(slot); view->close(); - UT_LIST_REMOVE(m_views, view); - UT_LIST_ADD_LAST(m_free, view); + UT_LIST_REMOVE(m_views[slot], view); + UT_LIST_ADD_LAST(m_free[slot], view); + + ut_ad(validate(slot)); - ut_ad(validate()); + exit(slot); view = nullptr; } } + +Scn_Map::Scn_Map() { + m_size = SCN_MAP_MAX_SIZE; + m_datas = new Trx_seq_with_lock[m_size]; + ut_a(m_datas != nullptr); +} + +Scn_Map::~Scn_Map() { + delete[] m_datas; +} + +SCN_Mgr::SCN_Mgr() { + m_scn_map = new Scn_Map(); + m_random_map = new Scn_Map(); + m_startup_id = 0; + m_startup_scn = 0; + m_abort = false; + m_view_active = false; + m_min_active_id = 0; + m_fast_min_active_id = 0; + m_long_running_ids.clear(); + m_has_slow_ids = false; + m_view_event = os_event_create(); + os_event_reset(m_view_event); + m_lock = static_cast( + ut::malloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*m_lock))); + rw_lock_create(PFS_NOT_INSTRUMENTED, m_lock, LATCH_ID_SCN_MGR_LOCK); +} + +SCN_Mgr::~SCN_Mgr() { + delete m_scn_map; + delete m_random_map; + os_event_destroy(m_view_event); + rw_lock_free(m_lock); + ut::free(m_lock); +} + +/** Init scn/id on startup */ +void SCN_Mgr::init() { + m_startup_scn = trx_sys->next_trx_id_or_no.load(); + + trx_id_t current_min_id = trx_sys->next_trx_id_or_no.load(); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_id = shard.active_rw_trxs.execute_no_latch( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return (trx_by_id_with_min.min_id());}); + + if (current_min_id > min_id && min_id != 0) { + current_min_id = min_id; + } + } + + m_startup_id = current_min_id; + + m_min_active_id = m_startup_id; + + m_fast_min_active_id = m_startup_id; + + m_startup_scn = current_min_id; + if (m_startup_scn > 0) m_startup_scn--; + + 
purge_sys->min_up_id = m_startup_id; +} + +trx_id_t SCN_Mgr::get_scn_fast(trx_id_t id, trx_id_t *committing_version) { + trx_id_t scn; + if (id < purge_sys->min_up_id.load(std::memory_order_relaxed) || id == 0) { + /* Too old transaction, and it should be visible to all + sessions, so it's safe to give it a fake SCN */ + scn = purge_sys->version.load(std::memory_order_relaxed) - 1; + } else { + scn = m_scn_map->read(id); + if (scn == 0) { + scn = m_random_map->read(id); + } + + if (scn == 0) { + /* Check if transaction is still active */ + trx_t *trx = trx_rw_is_active(id, true); + if (trx != nullptr) { + /* transaction is still active */ + if (committing_version == nullptr) { + trx_release_reference(trx); + return TRX_ID_MAX; + } + + /* Get scn from trx if trx->no is set */ + mutex_enter(&trx->scn_mutex); + trx_id_t version = trx->no; + mutex_exit(&trx->scn_mutex); + + trx_release_reference(trx); + + if (version != TRX_ID_MAX) { + *committing_version = version; + } + + return TRX_ID_MAX; + } + } + } + + return scn; +} + +trx_id_t SCN_Mgr::get_scn(trx_id_t id, + const dict_index_t *index, + roll_ptr_t roll_ptr, trx_id_t *committing_version) { + if (index->table->is_temporary()) { + return TRX_ID_MAX; + } + + trx_id_t scn = get_scn_fast(id, committing_version); + + if (scn == TRX_ID_MAX) { + /* Transaction is still active */ + return TRX_ID_MAX; + } + + if (scn != 0) { + return scn; + } + + /* Slow path */ + scn = trx_undo_get_scn(index, roll_ptr, id); + if (scn > 0) { + m_random_map->store(id, scn); + } + + ut_a(scn < trx_sys->next_trx_id_or_no.load()); + + if (scn == 0) { + return TRX_ID_MAX; + } + + return scn; +} + +ulint SCN_Mgr::scn_offset(const dict_index_t *index, const ulint *offsets) { + ulint offset = index->trx_id_offset; + + if (!offset) { + offset = row_get_trx_id_offset(index, offsets); + } + + return offset; +} + +void SCN_Mgr::take_up_ids(trx_id_t &up_id, trx_id_t &real_id, trx_ids_set_t &slow_set) { + real_id = m_min_active_id.load(); + + if 
(!m_has_slow_ids.load(std::memory_order_relaxed)) { + up_id = real_id; + return; + } + + s_lock(); + if (m_has_slow_ids.load()) { + ut_ad(!m_long_running_ids.empty()); + slow_set = m_long_running_ids; + up_id = m_fast_min_active_id; + } else { + up_id = real_id; + } + + s_unlock(); +} + +void SCN_Mgr::set_view_id(trx_id_t limit_id) { + trx_id_t current_min_id = limit_id; + trx_id_t current_fast_min_id = limit_id; + + trx_ids_set_t slow_set; + slow_set.clear(); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + trx_id_t min_id = 0; + trx_id_t fast_min_id = 0; + std::unordered_set<trx_id_t> shard_slow_set; + shard_slow_set.clear(); + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.get_min_with_slow(min_id, fast_min_id, shard_slow_set, limit_id);}, UT_LOCATION_HERE); + + if (current_min_id > min_id && min_id != 0) { + current_min_id = min_id; + } + + if (current_fast_min_id > fast_min_id && fast_min_id != 0) { + current_fast_min_id = fast_min_id; + } + + for (auto id : shard_slow_set) { + slow_set.insert(id); + } + } + + /* Erase ids larger than m_fast_min_active_id from the set, if any */ + auto itr = slow_set.upper_bound(current_fast_min_id); + if (itr != slow_set.end()) { + slow_set.erase(itr, slow_set.end()); + } +#ifdef UNIV_DEBUG + ib::info() << "current min id " << current_min_id << " fast min id " << current_fast_min_id; + if (!slow_set.empty()) { + fprintf(stderr, "slow ids: "); + for (auto id : slow_set) { + fprintf(stderr, "%lu ", id); + } + fprintf(stderr, "\n"); + } +#endif + + if (slow_set.empty()) { + m_has_slow_ids = false; + if (!m_long_running_ids.empty()) { + x_lock(); + m_long_running_ids.clear(); + x_unlock(); + } + } else { + bool is_set_changed = false; + if (m_long_running_ids.size() == slow_set.size()) { + /* This is the only thread that changes m_long_running_ids, + so it's safe to iterate without lock */ + for (auto id : m_long_running_ids) { + if 
(slow_set.find(id) == slow_set.end()) { + is_set_changed = true; + break; + } + } + } else { + is_set_changed = true; + } + + if (is_set_changed) { + x_lock(); + m_long_running_ids.swap(slow_set); + m_fast_min_active_id = current_fast_min_id; + x_unlock(); + m_has_slow_ids = true; + } else { + ut_ad(m_has_slow_ids.load()); + } + } + + m_min_active_id = current_min_id; + m_fast_min_active_id = current_fast_min_id; + ut_ad(m_fast_min_active_id.load() >= m_min_active_id.load()); +} + +void SCN_Mgr::set_view_no(trx_id_t limit_id) { + trx_id_t current_min_no = limit_id; + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_commit_shard &shard = trx_sys->commit_shards[i]; + trx_id_t min_no = shard.commit_rw_trxs.execute_no_latch( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + return (trx_list_with_min.min_id());}); + + if (current_min_no > min_no && min_no != 0) { + current_min_no = min_no; + } + } + + if (trx_sys->serialisation_min_trx_no < current_min_no) { + trx_sys->serialisation_min_trx_no = current_min_no; + srv_purge_wakeup(); + } +} + +void SCN_Mgr::view_task() { + m_view_active = true; + + while (!m_abort.load()) { + uint64_t sig_counter = os_event_reset(m_view_event); + trx_id_t limit_id = trx_sys_get_next_trx_id_or_no(); + while (trx_sys_get_next_trx_id_or_no() + != trx_sys->next_trx_id_version.load()) { + ut_delay(1); + } + + if (srv_mvcc_use_scn) { + set_view_id(limit_id); + } + + set_view_no(limit_id); + + os_event_wait_time_low(m_view_event, std::chrono::seconds{1}, sig_counter); + } + + m_view_active = false; +} + +void run_view_task() { + scn_mgr->view_task(); +} + +void SCN_Mgr::start() { + m_abort = false; + std::thread th(run_view_task); + th.detach(); + while (!m_view_active.load()) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + +void SCN_Mgr::stop() { + m_abort = true; + while (m_view_active.load()) { + os_event_set(m_view_event); + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + diff 
--git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index 5cf4ff3bf5c..bc623a4ad59 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -4570,7 +4570,7 @@ dberr_t row_scan_index_for_mysql(row_prebuilt_t *prebuilt, dict_index_t *index, /* No INSERT INTO ... SELECT and non-locking selects only. */ trx_start_if_not_started_xa(prebuilt->trx, false, UT_LOCATION_HERE); - trx_assign_read_view(prebuilt->trx); + trx_assign_read_view(prebuilt->trx, true); auto trx = prebuilt->trx; diff --git a/storage/innobase/row/row0pread.cc b/storage/innobase/row/row0pread.cc index 1b5766614f6..83effeb21b5 100644 --- a/storage/innobase/row/row0pread.cc +++ b/storage/innobase/row/row0pread.cc @@ -467,8 +467,6 @@ bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - const auto table_name = m_config.m_index->table->name; - ut_ad(!m_trx || m_trx->read_view == nullptr || MVCC::is_view_active(m_trx->read_view)); @@ -478,16 +476,8 @@ bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, auto view = m_trx->read_view; if (m_config.m_index->is_clustered()) { - trx_id_t rec_trx_id; - - if (m_config.m_index->trx_id_offset > 0) { - rec_trx_id = trx_read_trx_id(rec + m_config.m_index->trx_id_offset); - } else { - rec_trx_id = row_get_rec_trx_id(rec, m_config.m_index, offsets); - } - if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && - !view->changes_visible(rec_trx_id, table_name)) { + !view->changes_visible(m_config.m_index, rec, offsets)) { rec_t *old_vers; row_vers_build_for_consistent_read(rec, mtr, m_config.m_index, &offsets, diff --git a/storage/innobase/row/row0vers.cc b/storage/innobase/row/row0vers.cc index 1573601d488..fb680279eec 100644 --- a/storage/innobase/row/row0vers.cc +++ b/storage/innobase/row/row0vers.cc @@ -587,6 +587,11 @@ bool row_vers_must_preserve_del_marked(trx_id_t trx_id, mtr_s_lock(&purge_sys->latch, mtr, UT_LOCATION_HERE); + 
if (srv_mvcc_use_scn) { + //FIXME + return true; + } + return (!purge_sys->view.changes_visible(trx_id, name)); } @@ -1254,7 +1259,6 @@ dberr_t row_vers_build_for_consistent_read( DBUG_TRACE; const rec_t *version; rec_t *prev_version; - trx_id_t trx_id; mem_heap_t *heap = nullptr; byte *buf; dberr_t err; @@ -1266,14 +1270,12 @@ dberr_t row_vers_build_for_consistent_read( ut_ad(rec_offs_validate(rec, index, *offsets)); - trx_id = row_get_rec_trx_id(rec, index, *offsets); - /* Reset the collected LOB undo information. */ if (lob_undo != nullptr) { lob_undo->reset(); } - ut_ad(!view->changes_visible(trx_id, index->table->name)); + ut_ad(!view->changes_visible(index, rec, *offsets)); ut_ad(!vrow || !(*vrow)); @@ -1315,9 +1317,7 @@ dberr_t row_vers_build_for_consistent_read( ut_a(!rec_offs_any_null_extern(index, prev_version, *offsets)); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - trx_id = row_get_rec_trx_id(prev_version, index, *offsets); - - if (view->changes_visible(trx_id, index->table->name)) { + if (view->changes_visible(index, prev_version, *offsets)) { /* The view already sees this version: we can copy it to in_heap and return */ diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index fb852a6fcd8..2c7aa46c506 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -553,6 +553,9 @@ bool srv_cmp_per_index_enabled = false; /** If innodb redo logging is enabled. */ bool srv_redo_log = true; +/* If using scn for MVCC */ +bool srv_mvcc_use_scn = true; + /** The value of the configuration parameter innodb_fast_shutdown, controlling the InnoDB shutdown. @@ -1747,10 +1750,8 @@ void srv_export_innodb_status(void) { rw_lock_s_unlock(&purge_sys->latch); - trx_sys_serialisation_mutex_enter(); /* Maximum transaction number added to history list for purge. 
*/ trx_id_t max_trx_no = trx_sys->rw_max_trx_no; - trx_sys_serialisation_mutex_exit(); if (done_trx_no == 0 || max_trx_no < done_trx_no) { export_vars.innodb_purge_trx_id_age = 0; diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 93f0a552a46..8dc4df25ff8 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -1779,6 +1779,8 @@ dberr_t srv_start(bool create_new_db) { trx_sys_create(); lock_sys_create(srv_lock_table_size); + scn_mgr = new SCN_Mgr(); + /* Create i/o-handler threads: */ os_aio_start_threads(); @@ -2166,6 +2168,8 @@ dberr_t srv_start(bool create_new_db) { therefore requires that the trx_sys is inited. */ purge_queue = trx_sys_init_at_db_start(); + scn_mgr->start(); + if (srv_is_upgrade_mode) { if (!purge_queue->empty()) { ib::info(ER_IB_MSG_1144); @@ -2951,6 +2955,9 @@ void srv_shutdown() { ut_a(!srv_is_being_started); + /* Persist max trx id on shutdown */ + trx_sys_write_max_trx_id(); + /* Ensure threads below have been stopped. */ const auto threads_stopped_before_shutdown = { std::cref(srv_threads.m_purge_coordinator), @@ -2994,6 +3001,10 @@ void srv_shutdown() { buffer pool to disk. */ dict_persist_to_dd_table_buffer(); + if (scn_mgr != nullptr) { + scn_mgr->stop(); + } + /* The steps 1-4 is the real InnoDB shutdown. All before was to stop activity which could produce new changes. All after is just cleaning up (freeing memory). 
*/ @@ -3058,6 +3069,11 @@ void srv_shutdown() { lock_sys_close(); trx_pool_close(); + if (scn_mgr != nullptr) { + delete scn_mgr; + scn_mgr = nullptr; + } + dict_close(); dict_persist_close(); undo_spaces_deinit(); diff --git a/storage/innobase/sync/sync0debug.cc b/storage/innobase/sync/sync0debug.cc index 77f1af2fe54..a13f174e91d 100644 --- a/storage/innobase/sync/sync0debug.cc +++ b/storage/innobase/sync/sync0debug.cc @@ -1363,6 +1363,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_UNDO, SYNC_TRX_UNDO, trx_undo_mutex_key); + LATCH_ADD_MUTEX(TRX_SCN, SYNC_NO_ORDER_CHECK, trx_scn_mutex_key); + LATCH_ADD_MUTEX(TRX_POOL, SYNC_POOL, trx_pool_mutex_key); LATCH_ADD_MUTEX(TRX_POOL_MANAGER, SYNC_POOL_MANAGER, @@ -1388,6 +1390,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_SYS_SERIALISATION, SYNC_TRX_SYS_SERIALISATION, trx_sys_serialisation_mutex_key); + LATCH_ADD_MUTEX(TRX_SYS_MVCC, SYNC_NO_ORDER_CHECK, trx_sys_mvcc_mutex_key); + LATCH_ADD_MUTEX(SRV_SYS, SYNC_THREADS, srv_sys_mutex_key); LATCH_ADD_MUTEX(SRV_SYS_TASKS, SYNC_ANY_LATCH, srv_threads_mutex_key); @@ -1430,6 +1434,9 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_RWLOCK(BUF_BLOCK_LOCK, SYNC_LEVEL_VARYING, PFS_NOT_INSTRUMENTED); #endif /* !PFS_SKIP_BUFFER_MUTEX_RWLOCK */ + LATCH_ADD_RWLOCK(CLONE_REPL_LOCK, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); + + LATCH_ADD_RWLOCK(SCN_MGR_LOCK, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); #ifdef UNIV_DEBUG LATCH_ADD_RWLOCK(BUF_BLOCK_DEBUG, SYNC_NO_ORDER_CHECK, buf_block_debug_latch_key); @@ -1494,6 +1501,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(DBLWR, SYNC_DBLWR, dblwr_mutex_key); + LATCH_ADD_MUTEX(CLONE_REPL_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); + LATCH_ADD_MUTEX(TEST_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); latch_id_t id = LATCH_ID_NONE; diff --git a/storage/innobase/sync/sync0sync.cc b/storage/innobase/sync/sync0sync.cc index f14e5b309e2..54e46763110 
100644 --- a/storage/innobase/sync/sync0sync.cc +++ b/storage/innobase/sync/sync0sync.cc @@ -123,6 +123,7 @@ mysql_pfs_key_t srv_monitor_file_mutex_key; mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ mysql_pfs_key_t trx_undo_mutex_key; +mysql_pfs_key_t trx_scn_mutex_key; mysql_pfs_key_t trx_mutex_key; mysql_pfs_key_t trx_pool_mutex_key; mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -133,6 +134,7 @@ mysql_pfs_key_t lock_wait_mutex_key; mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_shard_mutex_key; mysql_pfs_key_t trx_sys_serialisation_mutex_key; +mysql_pfs_key_t trx_sys_mvcc_mutex_key; mysql_pfs_key_t srv_sys_mutex_key; mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/trx/trx0i_s.cc b/storage/innobase/trx/trx0i_s.cc index 8fde00c3d61..e0d0f494df2 100644 --- a/storage/innobase/trx/trx0i_s.cc +++ b/storage/innobase/trx/trx0i_s.cc @@ -709,6 +709,62 @@ static void trx_i_s_cache_clear( ha_storage_empty(&cache->storage); } +bool fetch_trx_data_into_cache(trx_i_s_cache_t *cache, trx_t* trx, bool read_only) { + i_s_trx_row_t *trx_row; + + trx_mutex_enter(trx); + + /* Note: Read only transactions that modify temporary + tables have a transaction ID. + + Note: auto-commit non-locking read-only transactions + can have trx->state set from NOT_STARTED to ACTIVE and + then from ACTIVE to NOT_STARTED with neither trx_sys->mutex + nor trx->mutex acquired. However, as long as these transactions + are members of mysql_trx_list they are not freed. For such + transactions "trx_was_started(trx)" might be considered random, + but whatever is its result, the code below handles that well + (transaction won't release locks until its trx->mutex is acquired). + + Note: locking read-only transactions can have trx->state set from + NOT_STARTED to ACTIVE with neither trx_sys->mutex nor trx->mutex + acquired. 
However, such transactions need to be marked as COMMITTED + before trx->state is set to NOT_STARTED and that is protected by the + trx->mutex. Therefore the assertion assert_trx_nonlocking_or_in_list() + should hold few lines below (note: the name of the assertion is wrong, + because it actually checks if the transaction is autocommit nonlocking, + whereas its name suggests that it only checks if the trx is nonlocking). */ + if (!trx_was_started(trx) || + (read_only && trx->id != 0 && !trx->read_only)) { + trx_mutex_exit(trx); + return true; + } + + assert_trx_nonlocking_or_in_list(trx); + + trx_row = reinterpret_cast( + table_cache_create_empty_row(&cache->innodb_trx, cache)); + + /* memory could not be allocated */ + if (trx_row == nullptr) { + cache->is_truncated = true; + trx_mutex_exit(trx); + return false; + } + + if (!fill_trx_row(trx_row, trx, cache)) { + /* memory could not be allocated */ + --cache->innodb_trx.rows_used; + cache->is_truncated = true; + trx_mutex_exit(trx); + return false; + } + + trx_mutex_exit(trx); + + return true; +} + /** Fetches the data needed to fill the 3 INFORMATION SCHEMA tables into the table cache buffer. Cache must be locked for write. @param[in,out] cache the cache @@ -720,73 +776,38 @@ static void fetch_data_into_cache_low(trx_i_s_cache_t *cache, /* We are going to iterate over many different shards of lock_sys so we need exclusive access */ ut_ad(locksys::owns_exclusive_global_latch()); - constexpr bool rw_trx_list = - std::is_samerw_trx_list)>::value; - - static_assert( - rw_trx_list || - std::is_samemysql_trx_list)>::value, - "only rw_trx_list and mysql_trx_list are supported"); /* Iterate over the transaction list and add each one to innodb_trx's cache. We also add all locks that are relevant to each transaction into innodb_locks' and innodb_lock_waits' caches. 
*/ - for (auto trx : *trx_list) { - i_s_trx_row_t *trx_row; - - trx_mutex_enter(trx); - - /* Note: Read only transactions that modify temporary - tables have a transaction ID. - - Note: auto-commit non-locking read-only transactions - can have trx->state set from NOT_STARTED to ACTIVE and - then from ACTIVE to NOT_STARTED with neither trx_sys->mutex - nor trx->mutex acquired. However, as long as these transactions - are members of mysql_trx_list they are not freed. For such - transactions "trx_was_started(trx)" might be considered random, - but whatever is its result, the code below handles that well - (transaction won't release locks until its trx->mutex is acquired). - - Note: locking read-only transactions can have trx->state set from - NOT_STARTED to ACTIVE with neither trx_sys->mutex nor trx->mutex - acquired. However, such transactions need to be marked as COMMITTED - before trx->state is set to NOT_STARTED and that is protected by the - trx->mutex. Therefore the assertion assert_trx_nonlocking_or_in_list() - should hold few lines below (note: the name of the assertion is wrong, - because it actually checks if the transaction is autocommit nonlocking, - whereas its name suggests that it only checks if the trx is nonlocking). 
*/ - if (!trx_was_started(trx) || - (!rw_trx_list && trx->id != 0 && !trx->read_only)) { - trx_mutex_exit(trx); - continue; - } - - assert_trx_nonlocking_or_in_list(trx); - - ut_ad(trx->in_rw_trx_list == rw_trx_list); - - trx_row = reinterpret_cast( - table_cache_create_empty_row(&cache->innodb_trx, cache)); - - /* memory could not be allocated */ - if (trx_row == nullptr) { - cache->is_truncated = true; - trx_mutex_exit(trx); + if (!fetch_trx_data_into_cache(cache, trx, true)) { return; } + } +} - if (!fill_trx_row(trx_row, trx, cache)) { - /* memory could not be allocated */ - --cache->innodb_trx.rows_used; - cache->is_truncated = true; - trx_mutex_exit(trx); - return; +bool Trx_by_id_with_min::fetch_data(trx_i_s_cache_t *cache) { + for (auto item : m_by_id) { + trx_t *trx = item.second; + if (!fetch_trx_data_into_cache(cache, trx, false)) { + return false; } + } - trx_mutex_exit(trx); + return true; +} + +static void fetch_data_into_cache_from_rw_shard(trx_i_s_cache_t *cache) { + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + if (!shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.fetch_data(cache); + }, UT_LOCATION_HERE)) { + break; + } } } @@ -797,16 +818,17 @@ static void fetch_data_into_cache(trx_i_s_cache_t *cache) /*!< in/out: cache */ /* We are going to iterate over many different shards of lock_sys so we need exclusive access */ ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); trx_i_s_cache_clear(cache); /* Capture the state of the read-write transactions. This includes internal transactions too. 
They are not on mysql_trx_list */ - fetch_data_into_cache_low(cache, &trx_sys->rw_trx_list); + fetch_data_into_cache_from_rw_shard(cache); + trx_sys_mutex_enter(); /* Capture the state of the read-only active transactions */ fetch_data_into_cache_low(cache, &trx_sys->mysql_trx_list); + trx_sys_mutex_exit(); cache->is_truncated = false; } @@ -825,11 +847,7 @@ int trx_i_s_possibly_fetch_data_into_cache( /* We need to read trx_sys and record/table lock queues */ locksys::Global_exclusive_latch_guard guard{UT_LOCATION_HERE}; - trx_sys_mutex_enter(); - fetch_data_into_cache(cache); - - trx_sys_mutex_exit(); } return (0); diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index cbbac9eb59f..68f9e33614a 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -263,6 +263,10 @@ void trx_purge_sys_initialize(uint32_t n_purge_threads, trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + + purge_sys->min_up_id = purge_sys->view.up_limit_id(); + purge_sys->rseg_iter = ut::new_withkey( UT_NEW_THIS_FILE_PSI_KEY, purge_sys); } @@ -312,7 +316,7 @@ void trx_purge_sys_close() { /** Adds the update undo log as the first log in the history list. Removes the update undo log segment from the rseg slot if it is too big for reuse. */ -void trx_purge_add_update_undo_to_history( +void trx_purge_add_undo_to_history( trx_t *trx, /*!< in: transaction */ trx_undo_ptr_t *undo_ptr, /*!< in/out: update undo log. */ page_t *undo_page, /*!< in: update undo log header page, @@ -320,6 +324,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr) /*!< in: mtr */ { @@ -328,7 +333,14 @@ void trx_purge_add_update_undo_to_history( trx_rsegf_t *rseg_header; trx_ulogf_t *undo_header; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + + ut_a(undo != nullptr); + rseg = undo->rseg; rseg_header = trx_rsegf_get(undo->rseg->space_id, undo->rseg->page_no, @@ -362,8 +374,14 @@ void trx_purge_add_update_undo_to_history( } /* Add the log as the first in the history list */ - flst_add_first(rseg_header + TRX_RSEG_HISTORY, - undo_header + TRX_UNDO_HISTORY_NODE, mtr); + if (is_insert) { + ut_a(srv_mvcc_use_scn); + flst_add_first(rseg_header + TRX_RSEG_INSERT_HISTORY, + undo_header + TRX_UNDO_HISTORY_NODE, mtr); + } else { + flst_add_first(rseg_header + TRX_RSEG_HISTORY, + undo_header + TRX_UNDO_HISTORY_NODE, mtr); + } if (update_rseg_history_len) { trx_sys->rseg_history_len.fetch_add(n_added_logs); @@ -379,6 +397,9 @@ void trx_purge_add_update_undo_to_history( /* Write the trx number to the undo log header */ mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr); + mlog_write_ull(rseg_header + TRX_RSEG_MAX_TRX_ID, + trx_sys_get_next_trx_id_or_no(), mtr); + /* Write information about delete markings to the undo log header */ if (!undo->del_marks) { @@ -388,7 +409,7 @@ void trx_purge_add_update_undo_to_history( /* Write GTID information if there. */ trx_undo_gtid_write(trx, undo_header, undo, mtr, false); - if (rseg->last_page_no == FIL_NULL) { + if (!is_insert && rseg->last_page_no == FIL_NULL) { rseg->last_page_no = undo->hdr_page_no; rseg->last_offset = undo->hdr_offset; rseg->last_trx_no = trx->no; @@ -401,9 +422,14 @@ void trx_purge_add_update_undo_to_history( @param[in] log_hdr Undo log segment header @param[in,out] mtr Mini-transaction. 
*/ static void trx_purge_remove_log_hdr(trx_rsegf_t *rseg_hdr, - trx_ulogf_t *log_hdr, mtr_t *mtr) { - flst_remove(rseg_hdr + TRX_RSEG_HISTORY, log_hdr + TRX_UNDO_HISTORY_NODE, - mtr); + trx_ulogf_t *log_hdr, mtr_t *mtr, bool is_insert) { + if (is_insert) { + flst_remove(rseg_hdr + TRX_RSEG_INSERT_HISTORY, log_hdr + TRX_UNDO_HISTORY_NODE, + mtr); + } else { + flst_remove(rseg_hdr + TRX_RSEG_HISTORY, log_hdr + TRX_UNDO_HISTORY_NODE, + mtr); + } trx_sys->rseg_history_len.fetch_sub(1); } @@ -414,7 +440,7 @@ Removes the rseg hdr from the history list. @param[in] hdr_addr file address of log_hdr @param[in] noredo skip redo logging. */ static void trx_purge_free_segment(trx_rseg_t *rseg, fil_addr_t hdr_addr, - bool noredo) { + bool noredo, bool is_insert) { mtr_t mtr; trx_rsegf_t *rseg_hdr; trx_ulogf_t *log_hdr; @@ -475,7 +501,7 @@ static void trx_purge_free_segment(trx_rseg_t *rseg, fil_addr_t hdr_addr, history list: otherwise, in case of a database crash, the segment could become inaccessible garbage in the file space. 
*/ - trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr); + trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr, is_insert); do { /* Here we assume that a file segment with just the header @@ -523,10 +549,27 @@ static void trx_purge_truncate_rseg_history( rseg_hdr = trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr); + bool is_insert_history = false; + hdr_addr = trx_purge_get_log_from_hist( flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr)); loop: if (hdr_addr.page == FIL_NULL) { + if (!is_insert_history + && !is_temp + && flst_get_len(rseg_hdr + TRX_RSEG_INSERT_HISTORY) > 0) { + is_insert_history = true; + mtr_commit(&mtr); + rseg->unlatch(); + mtr_start(&mtr); + rseg->latch(); + rseg_hdr = + trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr); + hdr_addr = trx_purge_get_log_from_hist( + flst_get_last(rseg_hdr + TRX_RSEG_INSERT_HISTORY, &mtr)); + goto loop; + } + rseg->unlatch(); mtr_commit(&mtr); @@ -550,6 +593,11 @@ loop: limit->undo_no); } + if (!is_insert_history && !is_temp) { + hdr_addr.page = FIL_NULL; + goto loop; + } + rseg->unlatch(); mtr_commit(&mtr); @@ -570,12 +618,12 @@ loop: /* calls the trx_purge_remove_log_hdr() inside trx_purge_free_segment(). */ - trx_purge_free_segment(rseg, hdr_addr, is_temp); + trx_purge_free_segment(rseg, hdr_addr, is_temp, is_insert_history); } else { /* Remove the log hdr from the rseg history. 
*/ - trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr); + trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr, is_insert_history); rseg->unlatch(); mtr_commit(&mtr); @@ -1568,6 +1616,9 @@ static bool trx_purge_truncate_marked_undo() { ut_d(undo::inject_crash("ib_undo_trunc_before_done_logging")); + /* Persist current max id before truncating undo space */ + trx_sys_write_max_trx_id(); + undo::spaces->x_lock(); undo::done_logging(space_num); undo::spaces->x_unlock(); @@ -1777,6 +1828,14 @@ static void trx_purge_rseg_get_next_history_log( auto del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS); + auto type = mtr_read_ulint(page_align(log_hdr) + TRX_UNDO_PAGE_HDR, MLOG_2BYTES, &mtr); + + if (purge_sys->iter.trx_no == trx_no + 1) { + /* same trx no has been processed before */ + purge_sys->iter.trx_no = trx_no; + ut_a(type == TRX_UNDO_INSERT); + } + mtr_commit(&mtr); rseg->latch(); @@ -1890,7 +1949,14 @@ static trx_undo_rec_t *trx_purge_get_next_rec( mtr_t mtr; ut_ad(purge_sys->next_stored); - ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no()); + +#ifdef UNIV_DEBUG + if (srv_mvcc_use_scn) { + ut_ad(purge_sys->iter.trx_no < purge_sys->version.load()); + } else { + ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no()); + } +#endif space = purge_sys->rseg->space_id; page_no = purge_sys->page_no; @@ -2210,7 +2276,7 @@ void Purge_groups_t::distribute_if_needed() { } } - if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) { + if (purge_sys->iter.trx_no >= purge_sys->version.load()) { return nullptr; } @@ -2405,6 +2471,14 @@ ulint trx_purge(ulint n_purge_threads, /*!< in: number of purge tasks trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + if (srv_mvcc_use_scn) { + purge_sys->version = purge_sys->view.version(); + } else { + purge_sys->version = purge_sys->view.low_limit_no(); + } + + purge_sys->min_up_id = purge_sys->view.up_limit_id(); + rw_lock_x_unlock(&purge_sys->latch); #ifdef UNIV_DEBUG diff --git 
a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 4f9c74a644a..da0fb46c6f0 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -150,12 +150,12 @@ static inline ulint trx_undo_left(const page_t *page, /*!< in: undo log page */ #ifdef UNIV_DEBUG ut_ad(ptr >= page); size_t diff = ptr - page; - size_t max_free = UNIV_PAGE_SIZE - 10 - FIL_PAGE_DATA_END; + size_t max_free = UNIV_PAGE_SIZE - 10 - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; ut_ad(diff < UNIV_PAGE_SIZE); ut_ad(diff <= max_free); #endif /* UNIV_DEBUG */ - return (UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END); + return (UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE); } size_t trx_undo_max_free_space() { @@ -165,7 +165,7 @@ size_t trx_undo_max_free_space() { UNIV_PAGE_SIZE - 290. */ size_t free_space = UNIV_PAGE_SIZE - (TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE + - TRX_UNDO_LOG_XA_HDR_SIZE + FIL_PAGE_DATA_END + 10); + TRX_UNDO_LOG_XA_HDR_SIZE + FIL_PAGE_DATA_END + 10 + TRX_UNDO_PAGE_RESERVE_SIZE); /* Undo number, table id, undo log type and pointer to next. 
Also refer to the beginning of trx_undo_page_report_insert() */ @@ -549,6 +549,203 @@ static ulint trx_undo_page_report_insert( return (trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr)); } +static inline trx_id_t trx_purge_get_version() { + trx_id_t purge_version = purge_sys->version.load(); + ut_a(purge_version > 1); + + return (purge_version - 1); +} + +static inline bool trx_scn_sanity_check(trx_id_t trx_id, trx_id_t trx_scn) { + trx_id_t max_trx_id = trx_sys_get_next_trx_id_or_no(); + if (trx_scn == 0) { return false; } + + return (trx_id < max_trx_id && trx_scn < max_trx_id); +} + +trx_id_t trx_undo_hdr_get_scn(trx_id_t trx_id, page_id_t &page_id, uint32_t offset, mtr_t *mtr, page_t *undo_page) { + + trx_id_t purge_version = trx_purge_get_version(); + + if (undo_page == nullptr) { + page_no_t space_size = fil_space_get_size(page_id.space()); + if (space_size <= page_id.page_no()) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(page_id.space(), &found); + ut_ad(found); + + undo_page = trx_undo_page_get_s_latched(page_id, page_size, mtr); + + if (undo_page == nullptr || + mach_read_from_2(undo_page + FIL_PAGE_TYPE) != FIL_PAGE_UNDO_LOG) { + /* invalid page */ + return purge_version; + } + } + + /* Get undo header */ + trx_ulogf_t *undo_header = undo_page + offset; + trx_id_t modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + if (modifier_trx_id != trx_id) { + /* Possibly purged */ + return purge_version; + } + + trx_id_t scn = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + if (!trx_scn_sanity_check(trx_id, scn)) { + /* Failed to pass sanity checking */ + return purge_version; + } + + return scn; +} + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id) { + trx_id_t purge_version = trx_purge_get_version(); + + if (id < purge_sys->min_up_id.load()) { + //always visible to all open views, give it + //a fake scn + return purge_version; + } + 
+ /* Decode rollback pointer */ + bool is_insert; + ulint rseg_id; + space_id_t space_id; + page_no_t page_no; + ulint offset; + trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no, &offset); + space_id = trx_rseg_id_to_space_id(rseg_id, false); + + /* Acquire space to avoid being truncated */ + fil_space_t *space = fil_space_acquire(space_id); + if (space == nullptr) { + return purge_version; + } + + page_no_t space_size = fil_space_get_size(space_id); + /* out of range, it must be purged */ + if (page_no >= space_size) { + fil_space_release(space); + return purge_version; + } + + const page_size_t &page_size = page_size_t(space->flags); + + /* Get the page */ + mtr_t mtr; + mtr_start(&mtr); + buf_block_t *block = + buf_page_get_gen(page_id_t(space_id, page_no), page_size, RW_S_LATCH, nullptr, + Page_fetch::POSSIBLY_FREED, UT_LOCATION_HERE, &mtr); + page_t *undo_page = nullptr; + if (block != nullptr) { + undo_page = buf_block_get_frame(block); + } + + if (undo_page == nullptr + || mach_read_from_2(undo_page + FIL_PAGE_TYPE) != FIL_PAGE_UNDO_LOG) { + /* invalid page */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } + + /* Get record */ + trx_undo_rec_t *undo_rec = (trx_undo_rec_t *)(undo_page + offset); + if (trx_undo_rec_get_table_id(undo_rec) != index->table->id) { + /* Wrong undo log, possibly purged */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } + + /** Check if it's in first page chain */ + trx_upagef_t *page_hdr = undo_page + TRX_UNDO_PAGE_HDR; + ulint undo_start_offset = mach_read_from_2(page_hdr + TRX_UNDO_PAGE_START); + page_no_t undo_hdr_no; + uint32_t undo_hdr_offset; + + if (undo_start_offset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) { + /* invalid page */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } else if (undo_start_offset == TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) { + /* This page is not the first one in the chain, read the page footer */ + 
ulint page_end_offset = UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; + byte* page_end = undo_page + page_end_offset; + trx_id_t trx_id = mach_read_from_8(page_end); + undo_hdr_no = mach_read_from_4(page_end + 8); + undo_hdr_offset = mach_read_from_2(page_end + 8 + 4); + + if (trx_id != id + || undo_hdr_no == 0 || undo_hdr_no >= space_size + || undo_hdr_offset < (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE) + || undo_hdr_offset >= page_end_offset) { + /* invalid page */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } + } else { + /* The undo page is first one in chain, find out the offset of undo header */ + undo_hdr_no = page_no; + + /** validate undo start offset */ + trx_usegf_t *seg_hdr = undo_page + TRX_UNDO_SEG_HDR; + ulint last_log_offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG); + if (last_log_offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_LOG_OLD_HDR_SIZE) { + /* Invalid offset */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } + + /* Find out the right offset */ + while (last_log_offset > offset) { + ulint prev_log_offset = mach_read_from_2(undo_page + last_log_offset + TRX_UNDO_PREV_LOG); + if (prev_log_offset >= last_log_offset + || (last_log_offset - prev_log_offset < TRX_UNDO_LOG_OLD_HDR_SIZE)) { + /* Invalid offset */ + mtr_commit(&mtr); + fil_space_release(space); + return purge_version; + } + + last_log_offset = prev_log_offset; + } + + undo_hdr_offset = last_log_offset; + } + + page_id_t undo_hdr_id = {space_id, undo_hdr_no}; + trx_id_t scn; + + if (undo_hdr_no == page_no) { + /* No need to read and lock page again */ + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, undo_page); + } else { + mtr_commit(&mtr); + mtr_start(&mtr); + + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, nullptr); + } + + mtr_commit(&mtr); + + fil_space_release(space); + + return scn; +} + /** Reads from an undo log record the general parameters. 
@return remaining part of undo log record after reading these values */ const byte *trx_undo_rec_get_pars( @@ -2085,7 +2282,7 @@ static bool trx_undo_erase_page_end( first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE); memset(undo_page + first_free, 0xff, - (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) - first_free); + (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE) - first_free); mlog_write_initial_log_record(undo_page, MLOG_UNDO_ERASE_END, mtr); return (first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE); @@ -2417,13 +2614,13 @@ err_exit: [[nodiscard]] static bool trx_undo_get_undo_rec(roll_ptr_t roll_ptr, trx_id_t trx_id, mem_heap_t *heap, bool is_temp, - const table_name_t &name, + const dict_index_t *index, trx_undo_rec_t **undo_rec) { bool missing_history; rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_history = purge_sys->view.changes_visible(trx_id, name); + missing_history = purge_sys->view.changes_visible(trx_id, index, roll_ptr); if (!missing_history) { *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp); } @@ -2486,7 +2683,7 @@ bool trx_undo_prev_version_build( ut_ad(!index->table->skip_alter_undo); if (trx_undo_get_undo_rec(roll_ptr, rec_trx_id, heap, is_temp, - index->table->name, &undo_rec)) { + index, &undo_rec)) { if (v_status & TRX_UNDO_PREV_IN_PURGE) { /* We are fetching the record being purged */ undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp); @@ -2562,8 +2759,7 @@ bool trx_undo_prev_version_build( rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_extern = - purge_sys->view.changes_visible(trx_id, index->table->name); + missing_extern = purge_sys->view.changes_visible(trx_id, index, roll_ptr); rw_lock_s_unlock(&purge_sys->latch); diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index f6acd099445..b6a05027436 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -649,9 +649,6 @@ 
static bool trx_rollback_or_clean_resurrected( bool all) /*!< in: false=roll back dictionary transactions; true=roll back all non-PREPARED transactions */ { - ut_ad(trx_sys_mutex_own()); - ut_ad(trx->in_rw_trx_list); - /* Generally, an HA transaction with is_recovered && state==TRX_STATE_PREPARED can be committed or rolled back by a client who knows its XID at any time. To prove that no such state transition is possible while our thread operates, @@ -678,7 +675,6 @@ static bool trx_rollback_or_clean_resurrected( switch (state) { case TRX_STATE_COMMITTED_IN_MEMORY: - trx_sys_mutex_exit(); ib::info(ER_IB_MSG_1188) << "Cleaning up trx with id " << trx_get_id_for_print(trx); @@ -688,7 +684,6 @@ static bool trx_rollback_or_clean_resurrected( return true; case TRX_STATE_ACTIVE: if (all || trx->ddl_operation) { - trx_sys_mutex_exit(); trx_rollback_active(trx); trx_free_for_background(trx); ut_ad(!trx->is_recovered); @@ -705,6 +700,17 @@ static bool trx_rollback_or_clean_resurrected( ut_error; } +/** Collect transaction ids recovered */ +void Trx_by_id_with_min::collect_recovered_ids(std::vector &ids) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; + ut_a(trx->id > 0); + if (trx->is_recovered) { + ids.push_back(trx->id); + } + } +} + /** Rollback or clean up any incomplete transactions which were encountered in crash recovery. If the transaction already was committed, then we clean up a possible insert undo log. If the @@ -716,7 +722,6 @@ void trx_rollback_or_clean_recovered( ut_ad(!srv_read_only_mode); ut_a(srv_force_recovery < SRV_FORCE_NO_TRX_UNDO); - ut_ad(!all || trx_sys_need_rollback()); if (all) { ib::info(ER_IB_MSG_1189) << "Starting in background the rollback" @@ -730,13 +735,19 @@ void trx_rollback_or_clean_recovered( /* Loop over the transaction list as long as there are recovered transactions to clean up or recover. 
*/ + std::vector recovered_ids; + recovered_ids.clear(); + + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_recovered_ids(recovered_ids); + }, + UT_LOCATION_HERE); + } - trx_sys_mutex_enter(); - for (bool need_one_more_scan = true; need_one_more_scan;) { - need_one_more_scan = false; - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - + for (auto id : recovered_ids) { /* In case of slow shutdown, we have to wait for the background thread (trx_recovery_rollback) which is doing the rollbacks of recovered transactions. Note that it can add undo to purge. @@ -751,25 +762,19 @@ void trx_rollback_or_clean_recovered( state == SRV_SHUTDOWN_EXIT_THREADS; })); - trx_sys_mutex_exit(); - if (all) { ib::info(ER_IB_MSG_TRX_RECOVERY_ROLLBACK_NOT_COMPLETED); } return; } - /* If this function does a cleanup or rollback - then it will release the trx_sys->mutex, therefore - we need to reacquire it before retrying the loop. */ - if (trx_rollback_or_clean_resurrected(trx, all)) { - trx_sys_mutex_enter(); - need_one_more_scan = true; - break; - } - } + trx_t* trx = trx_rw_is_active(id, false); + if (trx == nullptr) continue; + + trx_rollback_or_clean_resurrected(trx, all); + + continue; } - trx_sys_mutex_exit(); if (all) { ib::info(ER_IB_MSG_TRX_RECOVERY_ROLLBACK_COMPLETED); diff --git a/storage/innobase/trx/trx0rseg.cc b/storage/innobase/trx/trx0rseg.cc index 0070a54cbec..f217ef43746 100644 --- a/storage/innobase/trx/trx0rseg.cc +++ b/storage/innobase/trx/trx0rseg.cc @@ -100,6 +100,10 @@ page_no_t trx_rseg_header_create(space_id_t space_id, /* Initialize maximum transaction number. 
*/ mlog_write_ull(rsegf + TRX_RSEG_MAX_TRX_NO, 0, mtr); + mlog_write_ull(rsegf + TRX_RSEG_MAX_TRX_ID, 0, mtr); + + flst_init(rsegf + TRX_RSEG_INSERT_HISTORY, mtr); + if (space_id == TRX_SYS_SPACE) { /* All rollback segments in the system tablespace need to be found in the TRX_SYS page in the rseg_id slot. @@ -246,6 +250,29 @@ static trx_rseg_t *trx_rseg_mem_initialize(ulint id, space_id_t space_id, return rseg; } +/* In case it's upgraded from older version, we need to +create history list for insert undo */ +static void trx_rseg_create_insert_history_if_needed( + trx_rseg_t *rseg, trx_rsegf_t *rseg_header, mtr_t *mtr) { + if (fsp_is_system_temporary(rseg->space_id)) { + return; + } + + flst_base_node_t* insert_base_node = rseg_header + TRX_RSEG_INSERT_HISTORY; + fil_faddr_t* faddr = insert_base_node + FLST_LAST; + auto page = mach_read_ulint(faddr + FIL_ADDR_PAGE, MLOG_4BYTES); + /* If newly created rseg it should be page no or FIL_NULL, but never be page zero */ + if (page == 0) { + flst_init(insert_base_node, mtr); + } +} + +void trx_update_max_trx_id_startup(trx_id_t new_id) { + trx_id_t old_id = trx_sys->resurrect_max_trx_id.load(); + while (old_id < new_id + && !trx_sys->resurrect_max_trx_id.compare_exchange_weak(old_id, new_id)); +} + static trx_rseg_t *trx_rseg_physical_initialize(trx_rseg_t *rseg, purge_pq_t *purge_queue, trx_id_t gtid_trx_no, @@ -264,8 +291,17 @@ static trx_rseg_t *trx_rseg_physical_initialize(trx_rseg_t *rseg, mtr_read_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr) + 1 + sum_of_undo_sizes); + auto rseg_max_trx_id = mach_read_from_8(rseg_header + TRX_RSEG_MAX_TRX_ID); + + trx_update_max_trx_id_startup(rseg_max_trx_id); + + trx_rseg_create_insert_history_if_needed(rseg, rseg_header, mtr); + auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY); + auto insert_len = flst_get_len(rseg_header + TRX_RSEG_INSERT_HISTORY); + trx_sys->rseg_history_len.fetch_add(insert_len); + if (len > 0) { trx_sys->rseg_history_len.fetch_add(len); 
@@ -403,6 +439,15 @@ trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id, auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY); + auto rseg_max_trx_id = mach_read_from_8(rseg_header + TRX_RSEG_MAX_TRX_ID); + + trx_update_max_trx_id_startup(rseg_max_trx_id); + + trx_rseg_create_insert_history_if_needed(rseg, rseg_header, mtr); + + auto insert_len = flst_get_len(rseg_header + TRX_RSEG_INSERT_HISTORY); + trx_sys->rseg_history_len.fetch_add(insert_len); + if (len > 0) { trx_sys->rseg_history_len += len; diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc index b021780266d..4e35c58c831 100644 --- a/storage/innobase/trx/trx0sys.cc +++ b/storage/innobase/trx/trx0sys.cc @@ -112,19 +112,11 @@ void trx_sys_write_max_trx_id(void) { acquiring the x-lock and it will again read the newest max_trx_id, and possibly re-write it. */ - ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); - if (!srv_read_only_mode) { DBUG_EXECUTE_IF( "trx_sys_write_max_trx_id__all_blocked", while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); }); -#ifdef UNIV_DEBUG - if (trx_sys_serialisation_mutex_own()) { - DEBUG_SYNC_C("trx_sys_write_max_trx_id__ser"); - } -#endif /* UNIV_DEBUG */ - mtr_start(&mtr); sys_header = trx_sysf_get(&mtr); @@ -134,6 +126,8 @@ void trx_sys_write_max_trx_id(void) { mlog_write_ull(sys_header + TRX_SYS_TRX_ID_STORE, max_trx_id, &mtr); mtr_commit(&mtr); + + fprintf(stderr, "InnoDB: write max transaction id %lu to system header\n", max_trx_id); } } @@ -149,30 +143,64 @@ void trx_sys_persist_gtid_num(trx_id_t gtid_trx_no) { } trx_id_t trx_sys_oldest_trx_no() { - ut_ad(trx_sys_serialisation_mutex_own()); - /* Get the oldest transaction from serialisation list. 
*/ - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) > 0) { - auto trx = UT_LIST_GET_FIRST(trx_sys->serialisation_list); - return (trx->no); + trx_id_t current_min_no = trx_sys->next_trx_id_or_no.load(); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_commit_shard &shard = trx_sys->commit_shards[i]; + trx_id_t min_no = shard.commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + return (trx_list_with_min.min_id());}, UT_LOCATION_HERE); + + if (current_min_no > min_no && min_no != 0) { + current_min_no = min_no; + } + } + + return current_min_no; +} + +static uint64_t trx_sys_rw_trx_count() { + uint64_t total = 0; + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + total += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.size(); + }, + UT_LOCATION_HERE); + } + + return total; +} + +bool trx_sys_need_rollback() { + uint64_t prepared_trx = trx_sys->n_prepared_trx.load(); + uint64_t n_trx = trx_sys_rw_trx_count(); + return (n_trx > prepared_trx); +} + +void Trx_by_id_with_min::collect_prepared_ids(std::vector &trx_ids) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; + if (trx_state_eq(trx, TRX_STATE_PREPARED) && trx_is_mysql_xa(trx)) { + trx_ids.push_back(trx->id); + } } - return trx_sys_get_next_trx_id_or_no(); } void trx_sys_get_binlog_prepared(std::vector &trx_ids) { - trx_sys_mutex_enter(); - /* Exit fast if no prepared transaction. */ if (trx_sys->n_prepared_trx == 0) { - trx_sys_mutex_exit(); return; } + /* Check and find binary log prepared transaction. 
*/ - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - if (trx_state_eq(trx, TRX_STATE_PREPARED) && trx_is_mysql_xa(trx)) { - trx_ids.push_back(trx->id); - } + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.collect_prepared_ids(trx_ids); + }, + UT_LOCATION_HERE); } - trx_sys_mutex_exit(); } /** Read binary log positions from buffer passed. @@ -498,10 +526,16 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_sys->next_trx_id_or_no.store(max_trx_id + 2 * trx_sys_get_trx_id_write_margin()); + trx_sys->next_trx_id_or_no = + std::max(trx_sys->next_trx_id_or_no.load(), + trx_sys->resurrect_max_trx_id.load()); + trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_id_or_no.load()); mtr.commit(); + trx_sys->next_trx_id_version = trx_sys->next_trx_id_or_no.load(); + #ifdef UNIV_DEBUG /* max_trx_id is the next transaction ID to assign. Initialize maximum transaction number to one less if all transactions are already purged. */ @@ -518,29 +552,28 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_lists_init_at_db_start(); + scn_mgr->init(); + /* This mutex is not strictly required, it is here only to satisfy the debug code (assertions). We are still running in single threaded bootstrap mode. 
*/ + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + rows_to_undo += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_rows(); + }, + UT_LOCATION_HERE); + } - trx_sys_mutex_enter(); - - if (UT_LIST_GET_LEN(trx_sys->rw_trx_list) > 0) { - for (auto trx : trx_sys->rw_trx_list) { - ut_ad(trx->is_recovered); - assert_trx_in_rw_list(trx); - - if (trx_state_eq(trx, TRX_STATE_ACTIVE)) { - rows_to_undo += trx->undo_no; - } - } - + if (rows_to_undo > 0) { if (rows_to_undo > 1000000000) { unit = "M"; rows_to_undo = rows_to_undo / 1000000; } ib::info(ER_IB_MSG_1198) - << UT_LIST_GET_LEN(trx_sys->rw_trx_list) + << trx_sys_rw_trx_count() << " transaction(s) which must be rolled back or" " cleaned up in total " << rows_to_undo << unit << " row operations to undo"; @@ -551,8 +584,6 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_sys->found_prepared_trx = trx_sys->n_prepared_trx > 0; - trx_sys_mutex_exit(); - return (purge_queue); } @@ -564,16 +595,15 @@ void trx_sys_create(void) { ut::zalloc_withkey(UT_NEW_THIS_FILE_PSI_KEY, sizeof(*trx_sys))); mutex_create(LATCH_ID_TRX_SYS, &trx_sys->mutex); - mutex_create(LATCH_ID_TRX_SYS_SERIALISATION, &trx_sys->serialisation_mutex); - UT_LIST_INIT(trx_sys->serialisation_list); - UT_LIST_INIT(trx_sys->rw_trx_list); UT_LIST_INIT(trx_sys->mysql_trx_list); trx_sys->mvcc = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY, 1024); trx_sys->serialisation_min_trx_no.store(0); + trx_sys->resurrect_max_trx_id = 0; + ut_d(trx_sys->rw_max_trx_no = 0); new (&trx_sys->rw_trx_ids) @@ -583,6 +613,14 @@ void trx_sys_create(void) { new (&shard) Trx_shard{}; } + for (auto &shard : trx_sys->commit_shards) { + new (&shard) Trx_commit_shard{}; + shard.commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.init_list(); + }, UT_LOCATION_HERE); + } + new (&trx_sys->rsegs) Rsegs(); 
trx_sys->rsegs.set_empty(); @@ -601,6 +639,12 @@ void trx_sys_create_sys_pages(void) { mtr_commit(&mtr); } +void Trx_by_id_with_min::free_prepared() { + while (!m_by_id.empty()) { + trx_free_prepared_or_active_recovered(m_by_id.begin()->second); + } +} + /********************************************************************* Shutdown/Close the transaction system. */ void trx_sys_close(void) { @@ -629,8 +673,13 @@ void trx_sys_close(void) { shutdown). Free all of them. */ ut_d(trx_sys_after_background_threads_shutdown_validate()); - while (auto trx = UT_LIST_GET_FIRST(trx_sys->rw_trx_list)) { - trx_free_prepared_or_active_recovered(trx); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + /* On shutdown there's no concurrent session, so it's + ok to not latch */ + shard.active_rw_trxs.execute_no_latch( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.free_prepared();}); } /* There can't be any active transactions. */ @@ -640,16 +689,18 @@ void trx_sys_close(void) { ut::delete_(trx_sys->mvcc); - ut_a(UT_LIST_GET_LEN(trx_sys->rw_trx_list) == 0); + ut_a(trx_sys_rw_trx_count() == 0); ut_a(UT_LIST_GET_LEN(trx_sys->mysql_trx_list) == 0); - ut_a(UT_LIST_GET_LEN(trx_sys->serialisation_list) == 0); for (auto &shard : trx_sys->shards) { shard.~Trx_shard(); } + for (auto &shard : trx_sys->commit_shards) { + shard.~Trx_commit_shard(); + } + /* We used placement new to create this mutex. Call the destructor. 
*/ - mutex_free(&trx_sys->serialisation_mutex); mutex_free(&trx_sys->mutex); trx_sys->rw_trx_ids.~trx_ids_t(); @@ -709,7 +760,7 @@ void trx_sys_after_pre_dd_shutdown_validate() { } trx_sys_mutex_enter(); - ut_a(UT_LIST_GET_LEN(trx_sys->rw_trx_list) == + ut_a(trx_sys_rw_trx_count() == trx_sys->n_prepared_trx + active_recovered_trxs); trx_sys_mutex_exit(); } @@ -720,37 +771,31 @@ void trx_sys_after_background_threads_shutdown_validate() { ut_a(UT_LIST_GET_LEN(trx_sys->mysql_trx_list) == 0); } -size_t trx_sys_recovered_active_trxs_count() { +size_t Trx_by_id_with_min::recovered_active_count() { size_t total_trx = 0; - trx_sys_mutex_enter(); - /* Recovered transactions are never citizens of mysql_trx_list, - so it's enough to check rw_trx_list. */ - for (auto trx : trx_sys->rw_trx_list) { + for (auto item : m_by_id) { + const trx_t* trx = item.second; if (trx_state_eq(trx, TRX_STATE_ACTIVE) && trx->is_recovered) { total_trx++; } } - trx_sys_mutex_exit(); - return (total_trx); -} - -#ifdef UNIV_DEBUG -/** Validate the trx_sys_t::rw_trx_list. - @return true if the list is valid. 
*/ -bool trx_sys_validate_trx_list() { - ut_ad(trx_sys_mutex_own()); - const trx_t *prev_trx = nullptr; + return total_trx; +} - for (auto trx : trx_sys->rw_trx_list) { - check_trx_state(trx); - ut_a(prev_trx == nullptr || prev_trx->id > trx->id); - prev_trx = trx; +size_t trx_sys_recovered_active_trxs_count() { + size_t total_trx = 0; + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + total_trx += shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.recovered_active_count(); + }, + UT_LOCATION_HERE); } - return (true); + return (total_trx); } -#endif /* UNIV_DEBUG */ #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 6fd4f54d2bb..6276fcd88cb 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -282,6 +282,7 @@ struct TrxFactory { mutex_create(LATCH_ID_TRX, &trx->mutex); mutex_create(LATCH_ID_TRX_UNDO, &trx->undo_mutex); + mutex_create(LATCH_ID_TRX_SCN, &trx->scn_mutex); lock_trx_alloc_locks(trx); } @@ -290,7 +291,6 @@ struct TrxFactory { @param trx the transaction for which to release resources */ static void destroy(trx_t *trx) { ut_a(trx->magic_n == TRX_MAGIC_N); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); ut_a(trx->lock.wait_lock == nullptr); @@ -313,6 +313,7 @@ struct TrxFactory { mutex_free(&trx->mutex); mutex_free(&trx->undo_mutex); + mutex_free(&trx->scn_mutex); trx->mod_tables.~trx_mod_tables_t(); @@ -353,7 +354,6 @@ struct TrxFactory { ut_ad(trx->mysql_thd == nullptr); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); ut_a(trx->lock.wait_thr == nullptr); @@ -617,12 +617,10 @@ void trx_free_prepared_or_active_recovered(trx_t *trx) { XA ROLLBACK. Usually the field is cleared during rollback or commit, here we have to do it ourselves as we neither rollback nor commit, just "free" it. 
*/ ut_ad(!trx->will_lock || trx_state_eq(trx, TRX_STATE_PREPARED)); - assert_trx_in_rw_list(trx); trx_release_impl_and_expl_locks(trx, false); trx_undo_free_trx_with_prepared_or_active_logs(trx, was_prepared); - ut_ad(!trx->in_rw_trx_list); ut_a(!trx->read_only); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); @@ -641,6 +639,11 @@ void trx_free_prepared_or_active_recovered(trx_t *trx) { linked to another session in future. */ inline void trx_disconnect_from_mysql(trx_t *trx, bool prepared) { + + if (trx->read_view != nullptr) { + trx_sys->mvcc->view_close(trx->read_view, true); + } + trx_sys_mutex_enter(); ut_ad(trx->in_mysql_trx_list); @@ -648,12 +651,6 @@ inline void trx_disconnect_from_mysql(trx_t *trx, bool prepared) { UT_LIST_REMOVE(trx_sys->mysql_trx_list, trx); - if (trx->read_view != nullptr) { - trx_sys->mvcc->view_close(trx->read_view, true); - } - - ut_ad(trx_sys_validate_trx_list()); - if (prepared) { ut_ad(trx_state_eq(trx, TRX_STATE_PREPARED)); @@ -876,14 +873,11 @@ static trx_t *trx_resurrect_insert( std::memory_order_relaxed); } - /* We give a dummy value for the trx no; this should have no - relevance since purge is not interested in committed - transaction numbers, unless they are in the history - list, in which case it looks the number from the disk based - undo log structure */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -981,10 +975,11 @@ static void trx_resurrect_update( if (undo->state != TRX_UNDO_ACTIVE) { trx_resurrect_update_in_prepared_state(trx, undo); - /* We give a dummy value for the trx number */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -1055,28 +1050,6 @@ static void trx_resurrect(trx_rseg_t *rseg) { } } -/** Adds the 
transaction to trx_sys->rw_trx_list -Requires trx_sys->mutex, unless called in the single threaded startup code. -@param[in] trx The transaction assumed to not be in the rw_trx_list yet -*/ -static inline void trx_add_to_rw_trx_list(trx_t *trx) { - ut_ad(srv_is_being_started || trx_sys_mutex_own()); - ut_ad(!trx->in_rw_trx_list); - UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, trx); - ut_d(trx->in_rw_trx_list = true); -} - -/** Removes the transaction from trx_sys->rw_trx_list. -Requires trx_sys->mutex, unless called in the single threaded startup code. -@param[in] trx The transaction assumed to be in the rw_trx_list -*/ -static inline void trx_remove_from_rw_trx_list(trx_t *trx) { - ut_ad(srv_is_being_started || trx_sys_mutex_own()); - ut_ad(trx->in_rw_trx_list); - UT_LIST_REMOVE(trx_sys->rw_trx_list, trx); - ut_d(trx->in_rw_trx_list = false); -} - /** Creates trx objects for transactions and initializes the trx list of trx_sys at database start. Rollback segments and undo log lists must already exist when this function is called, because the lists of @@ -1113,15 +1086,17 @@ void trx_lists_init_at_db_start(void) { }, UT_LOCATION_HERE); } - std::sort(trxs.begin(), trxs.end(), - [&](trx_t *a, trx_t *b) { return a->id < b->id; }); - for (trx_t *trx : trxs) { - if (trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE || - trx->state.load(std::memory_order_relaxed) == TRX_STATE_PREPARED) { - trx_sys->rw_trx_ids.push_back(trx->id); + if (!srv_mvcc_use_scn) { + std::sort(trxs.begin(), trxs.end(), + [&](trx_t *a, trx_t *b) { return a->id < b->id; }); + + for (trx_t *trx : trxs) { + if (trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE || + trx->state.load(std::memory_order_relaxed) == TRX_STATE_PREPARED) { + trx_sys->rw_trx_ids.push_back(trx->id); + } } - trx_add_to_rw_trx_list(trx); } } @@ -1275,6 +1250,17 @@ void trx_assign_rseg_durable(trx_t *trx) { trx->rsegs.m_redo.rseg = srv_read_only_mode ? 
nullptr : get_next_redo_rseg(); } +static inline void trx_sys_assign_trx_id(trx_t *trx) { + if (srv_mvcc_use_scn) { + trx->id = trx_sys_allocate_trx_id(); + } else { + trx_sys_mutex_enter(); + trx->id = trx_sys_allocate_trx_id(); + trx_sys->rw_trx_ids.push_back(trx->id); + trx_sys_mutex_exit(); + } +} + /** Assign a temp-tablespace bound rollback-segment to a transaction. @param[in,out] trx transaction that involves write to temp-table. */ void trx_assign_rseg_temp(trx_t *trx) { @@ -1285,15 +1271,11 @@ void trx_assign_rseg_temp(trx_t *trx) { srv_read_only_mode ? nullptr : get_next_temp_rseg(); if (trx->id == 0) { - trx_sys_mutex_enter(); - - trx->id = trx_sys_allocate_trx_id(); - - trx_sys->rw_trx_ids.push_back(trx->id); - - trx_sys_mutex_exit(); + trx_sys_assign_trx_id(trx); trx_sys_rw_trx_add(trx); + + trx_sys->next_trx_id_version++; } } @@ -1382,8 +1364,6 @@ static void trx_start_low( change must be protected by the trx_sys->mutex, so that lock_print_info_all_transactions() will have a consistent view. */ - ut_ad(!trx->in_rw_trx_list); - /* We tend to over assert and that complicates the code somewhat. e.g., the transaction state can be set earlier but we are forced to set it under the protection of the trx_sys_t::mutex because some @@ -1403,25 +1383,17 @@ static void trx_start_low( updates a temporary table */ DEBUG_SYNC_C("trx_sys_before_assign_id"); - trx_sys_mutex_enter(); - - trx->id = trx_sys_allocate_trx_id(); - - trx_sys->rw_trx_ids.push_back(trx->id); + trx_sys_assign_trx_id(trx); ut_ad(trx->rsegs.m_redo.rseg != nullptr || srv_read_only_mode || srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO); - trx_add_to_rw_trx_list(trx); - trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); - ut_ad(trx_sys_validate_trx_list()); - - trx_sys_mutex_exit(); - trx_sys_rw_trx_add(trx); + trx_sys->next_trx_id_version++; + } else { trx->id = 0; @@ -1431,20 +1403,16 @@ static void trx_start_low( to write to the temporary table. 
*/ if (read_write) { - trx_sys_mutex_enter(); - ut_ad(!srv_read_only_mode); trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); - trx->id = trx_sys_allocate_trx_id(); - - trx_sys->rw_trx_ids.push_back(trx->id); - - trx_sys_mutex_exit(); + trx_sys_assign_trx_id(trx); trx_sys_rw_trx_add(trx); + trx_sys->next_trx_id_version++; + } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); } @@ -1459,51 +1427,62 @@ static void trx_start_low( MONITOR_INC(MONITOR_TRX_ACTIVE); } +void Trx_commit_serialisation_list::add_list(trx_t &trx) { + ut_a(trx.id > 0); + + mutex_enter(&(trx.scn_mutex)); + trx.no = trx_sys_allocate_trx_no(); + trx_sys->next_trx_id_version++; + mutex_exit(&(trx.scn_mutex)); + + //add to list + UT_LIST_ADD_LAST(serialisation_list, &trx); + if (UT_LIST_GET_LEN(serialisation_list) == 1) { + m_min_id = trx.no; + } +} + +void Trx_commit_serialisation_list::remove_list(trx_t &trx) { + ut_a(trx.id > 0); + ut_a(trx.no != TRX_ID_MAX); + + UT_LIST_REMOVE(serialisation_list, &trx); + if (UT_LIST_GET_LEN(serialisation_list) > 0) { + m_min_id = UT_LIST_GET_FIRST(serialisation_list)->no; + } else { + m_min_id = 0; + } +} + /** Assigns the trx->no and add the transaction to the serialisation_list. Skips adding to the serialisation_list if the transaction is read-only, in which case still the trx->no is assigned. @param[in,out] trx the modified transaction @return true if added to the serialisation_list (non read-only trx) */ static inline bool trx_add_to_serialisation_list(trx_t *trx) { - trx_sys_serialisation_mutex_enter(); - - trx->no = trx_sys_allocate_trx_no(); - - /* Update the latest transaction number. 
*/ - ut_d(trx_sys->rw_max_trx_no = trx->no); - if (trx->read_only) { - trx_sys_serialisation_mutex_exit(); + trx->no = trx_sys_allocate_trx_no(); + trx_sys->next_trx_id_version++; return false; } - UT_LIST_ADD_LAST(trx_sys->serialisation_list, trx); + const auto trx_shard_no = trx_get_shard_no(trx->id + 1); + trx_sys->commit_shards[trx_shard_no].commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.add_list(*trx); + }, + UT_LOCATION_HERE); - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) == 1) { - trx_sys->serialisation_min_trx_no.store(trx->no); - } +#ifdef UNIV_DEBUG + /* Update the latest transaction number. */ + if (trx->rsegs.m_redo.update_undo != nullptr + || trx->rsegs.m_noredo.update_undo != nullptr) + ut_d(trx_sys->rw_max_trx_no = std::max(trx_sys->rw_max_trx_no, trx->no)); +#endif - trx_sys_serialisation_mutex_exit(); return true; } -/** Erases transaction from the serialisation_list. Caller must have -acquired trx_sys->serialisation_mutex prior to calling this function. -@param[in,out] trx the transaction to erase */ -static inline void trx_erase_from_serialisation_list_low(trx_t *trx) { - ut_ad(trx_sys_serialisation_mutex_own()); - - UT_LIST_REMOVE(trx_sys->serialisation_list, trx); - - if (UT_LIST_GET_LEN(trx_sys->serialisation_list) > 0) { - trx_sys->serialisation_min_trx_no.store( - UT_LIST_GET_FIRST(trx_sys->serialisation_list)->no); - - } else { - trx_sys->serialisation_min_trx_no.store(trx_sys_get_next_trx_id_or_no()); - } -} - /** Set the transaction serialisation number. @return true if the transaction number was added to the serialisation_list. */ static bool trx_serialisation_number_get( @@ -1598,19 +1577,19 @@ static bool trx_write_serialisation_history( } /* If transaction involves insert then truncate undo logs. 
*/ - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); + if (!srv_mvcc_use_scn && trx->rsegs.m_redo.insert_undo != nullptr) { + trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr, false); } if (trx->rsegs.m_noredo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr); + trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr, true); } bool serialised = false; /* If transaction involves update then add rollback segments to purge queue. */ - if (trx->rsegs.m_redo.update_undo != nullptr || + if (trx_is_redo_rseg_updated(trx) || trx->rsegs.m_noredo.update_undo != nullptr) { /* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue @@ -1625,14 +1604,18 @@ static bool trx_write_serialisation_history( : nullptr; /* Will set trx->no and will add rseg to purge queue. */ - serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr, - temp_rseg_undo_ptr); + if (srv_mvcc_use_scn || redo_rseg_undo_ptr != nullptr || temp_rseg_undo_ptr != nullptr) { + serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr, + temp_rseg_undo_ptr); + } + ulint added_log = 0; /* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */ if (trx->rsegs.m_redo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish(trx->rsegs.m_redo.update_undo, mtr); @@ -1641,28 +1624,45 @@ static bool trx_write_serialisation_history( non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. 
*/ - bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + bool update_rseg_len = false; + if (srv_mvcc_use_scn) { + update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr || trx->rsegs.m_redo.insert_undo != nullptr); + } else { + update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + } /* Set flag if GTID information need to persist. */ auto undo_ptr = &trx->rsegs.m_redo; trx_undo_gtid_set(trx, undo_ptr->update_undo, false); - trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, - (update_rseg_len ? 1 : 0), mtr); + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, false, + (update_rseg_len ? added_log : 0), mtr); } DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE();); + if (trx->rsegs.m_redo.insert_undo != nullptr && srv_mvcc_use_scn) { + page_t *undo_hdr_page; + added_log++; + + undo_hdr_page = + trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); + + bool update_rseg_len = (trx->rsegs.m_noredo.update_undo == nullptr); + auto undo_ptr = &trx->rsegs.m_redo; + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, true, + (update_rseg_len ? added_log : 0), mtr); + } + if (trx->rsegs.m_noredo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_noredo.update_undo, &temp_mtr); - ulint n_added_logs = (redo_rseg_undo_ptr != nullptr) ? 2 : 1; - - trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, - n_added_logs, &temp_mtr); + trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, false, + added_log, &temp_mtr); } } @@ -1822,43 +1822,11 @@ static void trx_update_mod_tables_timestamp(trx_t *trx) /*!< in: transaction */ trx->mod_tables.clear(); } -/** -Erase the transaction from running transaction lists and serialization -list. Active RW transaction list of a MVCC snapshot(ReadView::prepare) -won't include this transaction after this call. 
All implicit locks are -also released by this call as trx is removed from rw_trx_list. -@param[in] trx Transaction to erase, must have an ID > 0 */ -static void trx_erase_lists(trx_t *trx) { - ut_ad(trx->id > 0); - ut_ad(trx_sys_mutex_own()); - - trx_ids_t::iterator it = std::lower_bound(trx_sys->rw_trx_ids.begin(), - trx_sys->rw_trx_ids.end(), trx->id); - - ut_ad(*it == trx->id); - trx_sys->rw_trx_ids.erase(it); - - if (trx->read_only || trx->rsegs.m_redo.rseg == nullptr) { - ut_ad(!trx->in_rw_trx_list); - } else { - trx_remove_from_rw_trx_list(trx); - ut_ad(trx_sys_validate_trx_list()); - - if (trx->read_view != nullptr) { - trx_sys->mvcc->view_close(trx->read_view, true); - } - } - DEBUG_SYNC_C("after_trx_erase_lists"); -} - static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { check_trx_state(trx); ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || trx_state_eq(trx, TRX_STATE_PREPARED)); - bool trx_sys_latch_is_needed = - (trx->id > 0) || trx_state_eq(trx, TRX_STATE_PREPARED); - /* Check and get GTID to be persisted. Do it outside mutex. It must be done before trx->state is changed to TRX_STATE_COMMITTED_IN_MEMORY, because the gtid_persistor.get_gtid_info() calls gtid_persistor.has_gtid() which checks @@ -1870,15 +1838,19 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { gtid_persistor.get_gtid_info(trx, gtid_desc); } - if (trx_sys_latch_is_needed) { - trx_sys_mutex_enter(); - } - if (trx->id > 0) { - /* For consistent snapshot, we need to remove current - transaction from running transaction id list for mvcc - before doing commit and releasing locks. 
*/ - trx_erase_lists(trx); + if (trx->read_view != nullptr) { + trx_sys->mvcc->view_close(trx->read_view, true); + } + + if (!srv_mvcc_use_scn) { + trx_sys_mutex_enter(); + trx_ids_t::iterator it = std::lower_bound(trx_sys->rw_trx_ids.begin(), + trx_sys->rw_trx_ids.end(), trx->id); + ut_ad(*it == trx->id); + trx_sys->rw_trx_ids.erase(it); + trx_sys_mutex_exit(); + } } if (trx_state_eq(trx, TRX_STATE_PREPARED)) { @@ -1886,10 +1858,6 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { --trx_sys->n_prepared_trx; } - if (trx_sys_latch_is_needed) { - trx_sys_mutex_exit(); - } - auto state_transition = [&]() { trx_mutex_enter(trx); /* Please consider this particular point in time as the moment the trx's @@ -1933,8 +1901,6 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { become purged (because trx->no would no longer protect them). */ if (serialised) { - trx_sys_serialisation_mutex_enter(); - /* Add GTID to be persisted to disk table. It must be done ... 1.After the transaction is marked committed in undo. Otherwise GTID might get committed before the transaction commit on disk. 
@@ -1948,9 +1914,18 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { gtid_persistor.add(gtid_desc); } - trx_erase_from_serialisation_list_low(trx); + const auto trx_shard_no = trx_get_shard_no(trx->id + 1); + trx_sys->commit_shards[trx_shard_no].commit_rw_trxs.latch_and_execute( + [&](Trx_commit_serialisation_list &trx_list_with_min) { + trx_list_with_min.remove_list(*trx); + }, UT_LOCATION_HERE); - trx_sys_serialisation_mutex_exit(); + if (srv_mvcc_use_scn) { + ut_a(trx->no != TRX_ID_MAX); + scn_mgr->store_scn(trx->id, trx->no); + } + + DEBUG_SYNC_C("after_trx_erase_lists"); } lock_trx_release_locks(trx); @@ -1976,7 +1951,6 @@ written */ ut_ad(trx->read_only); ut_a(!trx->is_recovered); ut_ad(trx->rsegs.m_redo.rseg == nullptr); - ut_ad(!trx->in_rw_trx_list); /* Note: We are asserting without holding the locksys latch. But that is OK because this transaction is not waiting and cannot @@ -2032,7 +2006,7 @@ written */ gtid_persistor.set_persist_gtid(trx, false); if (mtr != nullptr) { - if (trx->rsegs.m_redo.insert_undo != nullptr) { + if (!srv_mvcc_use_scn && trx->rsegs.m_redo.insert_undo != nullptr) { trx_undo_insert_cleanup(&trx->rsegs.m_redo, false); } @@ -2296,7 +2270,6 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ trx_sys_mutex_enter(); ut_a(!trx->read_only); - trx_remove_from_rw_trx_list(trx); trx_sys_mutex_exit(); @@ -2304,7 +2277,6 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ that it no longer is in the trx_list. Recovered transactions are never placed in the mysql_trx_list. */ ut_ad(trx->is_recovered); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); } @@ -2313,7 +2285,7 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */ within the same transaction will get the same read view, which is created when this function is first called for a new started transaction. 
@return consistent read view */ -ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */ +ReadView *trx_assign_read_view(trx_t *trx, bool is_shared) /*!< in/out: active transaction */ { ut_ad(trx_can_be_handled_by_current_thread_or_is_hp_victim(trx)); ut_ad(trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE); @@ -2323,7 +2295,8 @@ ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */ return (nullptr); } else if (!MVCC::is_view_active(trx->read_view)) { - trx_sys->mvcc->view_open(trx->read_view, trx); + trx_sys->mvcc->view_open(trx->read_view, trx, is_shared); + trx->read_view->set_trx(trx); } return (trx->read_view); @@ -2559,8 +2532,6 @@ void trx_print_low(FILE *f, bool newline; const char *op_info; - ut_ad(trx_sys_mutex_own()); - fprintf(f, "TRANSACTION " TRX_ID_FMT, trx_get_id_for_print(trx)); const auto trx_state = trx->state.load(std::memory_order_relaxed); @@ -2660,7 +2631,6 @@ void trx_print_latched(FILE *f, const trx_t *trx, ulint max_query_len) { /* We need exclusive access to lock_sys for lock_number_of_rows_locked(), and accessing trx->lock fields without trx->mutex.*/ ut_ad(locksys::owns_exclusive_global_latch()); - ut_ad(trx_sys_mutex_own()); trx_print_low(f, trx, max_query_len, lock_number_of_rows_locked(&trx->lock), UT_LIST_GET_LEN(trx->lock.trx_locks), @@ -3095,10 +3065,7 @@ static void trx_set_prepared_in_tc(trx_t *trx) { /* Add GTID to be persisted to disk table, if needed. */ if (gtid_desc.m_is_set) { - /* The gtid_persistor.add() might release and re-acquire the mutex. */ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } /* Reset after successfully adding GTID to in memory table. */ @@ -3187,36 +3154,22 @@ static bool get_info_about_prepared_transaction(XA_recover_txn *txn_list, return false; } -/** This function is used to find number of prepared transactions and - their transaction objects for a recovery. 
- @return number of prepared transactions stored in xid_list */ -int trx_recover_for_mysql( - XA_recover_txn *txn_list, /*!< in/out: prepared transactions */ - ulint len, /*!< in: number of slots in xid_list */ - MEM_ROOT *mem_root) /*!< in: memory for table names */ -{ - ulint count = 0; - - ut_ad(txn_list); - ut_ad(len); - - /* We should set those transactions which are in the prepared state - to the xid_list */ - - trx_sys_mutex_enter(); - - for (const trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +void Trx_by_id_with_min::recover_prepared( + XA_recover_txn *txn_list, + MEM_ROOT *mem_root, + ulint &index, + ulint limit) { + for (auto item : m_by_id) { + const trx_t *trx = item.second; /* The state of a read-write transaction cannot change from or to NOT_STARTED while we are holding the trx_sys->mutex. It may change to PREPARED, but not if trx->is_recovered. */ if (trx_state_eq(trx, TRX_STATE_PREPARED)) { - if (get_info_about_prepared_transaction(&txn_list[count], trx, mem_root)) + if (get_info_about_prepared_transaction(&txn_list[index], trx, mem_root)) break; - if (count == 0) { + if (index == 0) { ib::info(ER_IB_MSG_1207) << "Starting recovery for" " XA transactions..."; } @@ -3227,15 +3180,41 @@ int trx_recover_for_mysql( ib::info(ER_IB_MSG_1209) << "Transaction contains changes to " << trx->undo_no << " rows"; - count++; + index++; - if (count == len) { + if (index == limit) { break; } } } +} - trx_sys_mutex_exit(); +/** This function is used to find number of prepared transactions and + their transaction objects for a recovery. 
+ @return number of prepared transactions stored in xid_list */ +int trx_recover_for_mysql( + XA_recover_txn *txn_list, /*!< in/out: prepared transactions */ + ulint len, /*!< in: number of slots in xid_list */ + MEM_ROOT *mem_root) /*!< in: memory for table names */ +{ + ulint count = 0; + + ut_ad(txn_list); + ut_ad(len); + + /* We should set those transactions which are in the prepared state + to the xid_list */ + + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.recover_prepared(txn_list, mem_root, count, len);}, UT_LOCATION_HERE); + + if (count == len) { + break; + } + } if (count > 0) { ib::info(ER_IB_MSG_1210) << count @@ -3246,19 +3225,13 @@ int trx_recover_for_mysql( return (int(count)); } -int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { - /* We should set those transactions which are in the prepared state - to the xid_list */ - - trx_sys_mutex_enter(); - - for (trx_t *trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +void Trx_by_id_with_min::recover_tc(Xa_state_list &xa_list) { + for (auto item : m_by_id) { /* The state of a read-write transaction cannot change from or to NOT_STARTED while we are holding the trx_sys->mutex. It may change to PREPARED, but not if trx->is_recovered. 
*/ + trx_t *trx = item.second; if (trx_state_eq(trx, TRX_STATE_PREPARED)) { if (trx_is_prepared_in_tc(trx)) { /* We found the transaction in 2nd phase of prepare, add to XA @@ -3270,31 +3243,29 @@ int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { } } } +} - trx_sys_mutex_exit(); +int trx_recover_tc_for_mysql(Xa_state_list &xa_list) { + /* We should set those transactions which are in the prepared state + to the xid_list */ + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + trx_by_id_with_min.recover_tc(xa_list);}, UT_LOCATION_HERE); + } return 0; } -/** This function is used to find one X/Open XA distributed transaction - which is in the prepared state - @return trx on match, the trx->xid will be invalidated; - */ -[[nodiscard]] static trx_t *trx_get_trx_by_xid_low( - const XID *xid) /*!< in: X/Open XA transaction - identifier */ -{ - ut_ad(trx_sys_mutex_own()); - - for (auto trx : trx_sys->rw_trx_list) { - assert_trx_in_rw_list(trx); - +trx_t* Trx_by_id_with_min::get(const XID *xid) { + for (auto item: m_by_id) { + trx_t *trx = item.second; /* Most of the time server layer takes care of synchronizing access to a XID from several connections, but when disconnecting there is a short period in which server allows a new connection to pick up XID still processed by old connection at InnoDB layer. To synchronize with trx_disconnect_from_mysql(), we use trx->mysql_thd under protection of trx_sys->mutex. */ - if (trx->mysql_thd == nullptr && trx_state_eq(trx, TRX_STATE_PREPARED) && xid->eq(trx->xid)) { /* Invalidate the XID, so that subsequent calls @@ -3314,13 +3285,16 @@ trx_t *trx_get_trx_by_xid(const XID *xid) { return (nullptr); } - trx_sys_mutex_enter(); - - /* Recovered/Resurrected transactions are always only on the - trx_sys_t::rw_trx_list. 
*/ - trx = trx_get_trx_by_xid_low(xid); + for (ulint i = 0; i < TRX_SHARDS_N; i++) { + Trx_shard &shard = trx_sys->shards[i]; + trx = shard.active_rw_trxs.latch_and_execute( + [&](Trx_by_id_with_min &trx_by_id_with_min) { + return trx_by_id_with_min.get(xid);}, UT_LOCATION_HERE); - trx_sys_mutex_exit(); + if (trx != nullptr) { + break; + } + } return (trx); } @@ -3419,7 +3393,6 @@ void trx_start_internal_read_only_low(trx_t *trx) { void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ { ut_ad(trx->rsegs.m_redo.rseg == nullptr); - ut_ad(!trx->in_rw_trx_list); ut_ad(!trx_is_autocommit_non_locking(trx)); ut_ad(!trx->read_only); ut_ad(trx_can_be_handled_by_current_thread_or_is_hp_victim(trx)); @@ -3441,22 +3414,17 @@ void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ DEBUG_SYNC_C("trx_sys_before_assign_id"); - trx_sys_mutex_enter(); - ut_ad(trx->id == 0); - trx->id = trx_sys_allocate_trx_id(); - - trx_sys->rw_trx_ids.push_back(trx->id); + trx_sys_assign_trx_id(trx); /* So that we can see our own changes. */ if (MVCC::is_view_active(trx->read_view)) { MVCC::set_view_creator_trx_id(trx->read_view, trx->id); } - trx_add_to_rw_trx_list(trx); - - trx_sys_mutex_exit(); trx_sys_rw_trx_add(trx); + + trx_sys->next_trx_id_version++; } void trx_kill_blocking(trx_t *trx) { diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc index 5bc422daeb5..06f335c0b40 100644 --- a/storage/innobase/trx/trx0undo.cc +++ b/storage/innobase/trx/trx0undo.cc @@ -545,6 +545,7 @@ static ulint trx_undo_header_create( mach_write_to_2(log_hdr + TRX_UNDO_DEL_MARKS, true); mach_write_to_8(log_hdr + TRX_UNDO_TRX_ID, trx_id); + mach_write_to_8(log_hdr + TRX_UNDO_TRX_NO, 0); mach_write_to_2(log_hdr + TRX_UNDO_LOG_START, new_free); mach_write_to_1(log_hdr + TRX_UNDO_FLAGS, 0); @@ -688,11 +689,7 @@ void trx_undo_gtid_read_and_persist(trx_ulogf_t *undo_header) { /* Mark GTID valid. 
*/ gtid_desc.m_is_set = true; - /* No concurrency is involved during recovery but satisfy - the interface requirement. */ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } if ((flag & TRX_UNDO_FLAG_GTID) == 0) { @@ -709,11 +706,7 @@ void trx_undo_gtid_read_and_persist(trx_ulogf_t *undo_header) { /* Mark GTID valid. */ gtid_desc.m_is_set = true; - /* No concurrency is involved during recovery but satisfy - the interface requirement. */ - trx_sys_serialisation_mutex_enter(); gtid_persistor.add(gtid_desc); - trx_sys_serialisation_mutex_exit(); } void trx_undo_gtid_write(trx_t *trx, trx_ulogf_t *undo_header, trx_undo_t *undo, @@ -904,6 +897,7 @@ static ulint trx_undo_insert_header_reuse( log_hdr = undo_page + free; mach_write_to_8(log_hdr + TRX_UNDO_TRX_ID, trx_id); + mach_write_to_8(log_hdr + TRX_UNDO_TRX_NO, 0); mach_write_to_2(log_hdr + TRX_UNDO_LOG_START, new_free); mach_write_to_1(log_hdr + TRX_UNDO_FLAGS, 0); @@ -972,6 +966,15 @@ buf_block_t *trx_undo_add_page( flst_add_last(header_page + TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST, new_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr); + + /* Stroe transaction id and hdr inforation in added page footer */ + byte* page_end = new_page + UNIV_PAGE_SIZE - FIL_PAGE_DATA_END - TRX_UNDO_PAGE_RESERVE_SIZE; + mlog_write_ull(page_end, trx->id, mtr); + page_end += 8; + mlog_write_ulint(page_end, undo->hdr_page_no, MLOG_4BYTES, mtr); + page_end += 4; + mlog_write_ulint(page_end, undo->hdr_offset, MLOG_2BYTES, mtr); + undo->size++; rseg->incr_curr_size(); @@ -1292,6 +1295,7 @@ static trx_undo_t *trx_undo_mem_init( ulint type; ulint state; trx_id_t trx_id; + trx_id_t trx_no; ulint offset; fil_addr_t last_addr; page_t *last_page; @@ -1316,6 +1320,10 @@ static trx_undo_t *trx_undo_mem_init( trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + trx_update_max_trx_id_startup(trx_id + 1); + + trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + auto flag = 
mtr_read_ulint(undo_header + TRX_UNDO_FLAGS, MLOG_1BYTE, mtr); bool xid_exists = ((flag & TRX_UNDO_FLAG_XID) != 0); @@ -1333,6 +1341,8 @@ static trx_undo_t *trx_undo_mem_init( undo->dict_operation = mtr_read_ulint(undo_header + TRX_UNDO_DICT_TRANS, MLOG_1BYTE, mtr); + undo->trx_no = trx_no; + undo->flag = flag; undo->m_gtid_storage = trx_undo_t::Gtid_storage::NONE; @@ -1640,15 +1650,15 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type, auto undo_page = trx_undo_page_get(page_id_t(undo->space, undo->hdr_page_no), undo->page_size, mtr); - ulint offset; - if (type == TRX_UNDO_INSERT) { + ulint offset; + if (type == TRX_UNDO_INSERT && !srv_mvcc_use_scn) { offset = trx_undo_insert_header_reuse(undo_page, trx_id, mtr); gtid_storage = trx_undo_t::Gtid_storage::NONE; } else { ut_a(mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE) == - TRX_UNDO_UPDATE); + TRX_UNDO_UPDATE || srv_mvcc_use_scn); offset = trx_undo_header_create(undo_page, trx_id, mtr); } @@ -1798,7 +1808,8 @@ func_exit: @return undo log segment header page, x-latched */ page_t *trx_undo_set_state_at_finish( trx_undo_t *undo, /*!< in: undo log memory copy */ - mtr_t *mtr) /*!< in: mtr */ + mtr_t *mtr, /*!< in: mtr */ + bool is_temp) /*!< in: true if it's tmp undo */ { trx_usegf_t *seg_hdr; trx_upagef_t *page_hdr; @@ -1816,8 +1827,7 @@ page_t *trx_undo_set_state_at_finish( if (undo->size == 1 && mach_read_from_2(page_hdr + TRX_UNDO_PAGE_FREE) < TRX_UNDO_PAGE_REUSE_LIMIT) { state = TRX_UNDO_CACHED; - - } else if (undo->type == TRX_UNDO_INSERT) { + } else if ((is_temp || !srv_mvcc_use_scn) && (undo->type == TRX_UNDO_INSERT)) { state = TRX_UNDO_TO_FREE; } else { state = TRX_UNDO_TO_PURGE; @@ -1921,25 +1931,38 @@ skip updating it. 
@param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr) { trx_rseg_t *rseg; trx_undo_t *undo; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + rseg = undo_ptr->rseg; ut_ad(mutex_own(&(rseg->mutex))); - trx_purge_add_update_undo_to_history( - trx, undo_ptr, undo_page, update_rseg_history_len, n_added_logs, mtr); + trx_purge_add_undo_to_history( + trx, undo_ptr, undo_page, update_rseg_history_len, is_insert, n_added_logs, mtr); - UT_LIST_REMOVE(rseg->update_undo_list, undo); - - undo_ptr->update_undo = nullptr; + if (is_insert) { + UT_LIST_REMOVE(rseg->insert_undo_list, undo); + undo_ptr->insert_undo = nullptr; + } else { + UT_LIST_REMOVE(rseg->update_undo_list, undo); + undo_ptr->update_undo = nullptr; + } if (undo->state == TRX_UNDO_CACHED) { - UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + if (is_insert) { + UT_LIST_ADD_FIRST(rseg->insert_undo_cached, undo); + } else { + UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + } MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED); } else {