diff --git a/mysql-8.0.31/storage/innobase/btr/btr0btr.cc b/mysql-8.0.31/storage/innobase/btr/btr0btr.cc index 05a7b9a..9904e1f 100644 --- a/mysql-8.0.31/storage/innobase/btr/btr0btr.cc +++ b/mysql-8.0.31/storage/innobase/btr/btr0btr.cc @@ -1155,6 +1155,8 @@ bool btr_page_reorganize_low(bool recovery, ulint z_level, page_cur_t *cursor, ulint pos; bool log_compressed; + /* Cleanout the scn of the block if needed */ + scn_mgr->cleanout_block(current_trx(), block, mtr); #ifndef UNIV_HOTBACKUP ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table)); #endif /* !UNIV_HOTBACKUP */ @@ -2407,6 +2409,7 @@ func_start: btr_page_create(new_block, new_page_zip, cursor->index, btr_page_get_level(page), mtr); + ut_ad(new_block->is_cursor_empty()); /* 3. Calculate the first record on the upper half-page, and the first record (move_limit) on original page which ends up on the upper half */ @@ -2477,6 +2480,9 @@ func_start: has segment header and already modified in most of cases.*/ } + ut_ad(block->is_cursor_empty()); + ut_ad(new_block->is_cursor_empty()); + /* 5. 
Move then the records to the new page */ if (direction == FSP_DOWN) { /* fputs("Split left\n", stderr); */ @@ -2992,6 +2998,8 @@ bool btr_compress(btr_cur_t *cursor, bool adjust, mtr_t *mtr) { page = btr_cur_get_page(cursor); index = cursor->index; + scn_mgr->cleanout_block(current_trx(), block, mtr); + btr_assert_not_corrupted(block, index); #ifdef UNIV_DEBUG diff --git a/mysql-8.0.31/storage/innobase/btr/btr0cur.cc b/mysql-8.0.31/storage/innobase/btr/btr0cur.cc index 1e8e68c..6685b79 100644 --- a/mysql-8.0.31/storage/innobase/btr/btr0cur.cc +++ b/mysql-8.0.31/storage/innobase/btr/btr0cur.cc @@ -2836,6 +2836,10 @@ dberr_t btr_cur_optimistic_insert( *rec = page_cur_tuple_insert(page_cursor, entry, index, offsets, heap, mtr); + + if (*rec && thr) { + scn_mgr->add_instant_cursor(thr_get_trx(thr), block, *rec, index, *offsets); + } } reorg = page_cursor_rec != page_cur_get_rec(page_cursor); @@ -3015,6 +3019,15 @@ dberr_t btr_cur_pessimistic_insert( } } + bool record_scn = (index->is_clustered() && !index->table->is_intrinsic()); + + buf_block_t *block = btr_cur_get_block(cursor); + if (record_scn) { + scn_mgr->cleanout_block((thr != nullptr ? thr_get_trx(thr) : nullptr), block, mtr); + } + + ut_ad(block->is_cursor_empty()); + if (dict_index_get_page(index) == btr_cur_get_block(cursor)->page.id.page_no()) { /* The page is the root page */ @@ -3027,6 +3040,11 @@ dberr_t btr_cur_pessimistic_insert( return DB_OUT_OF_FILE_SPACE; } + if (record_scn && thr) { + scn_mgr->add_instant_cursor(thr_get_trx(thr), btr_cur_get_block(cursor), + *rec, index, *offsets); + } + ut_ad(page_rec_get_next(btr_cur_get_rec(cursor)) == *rec || dict_index_is_spatial(index)); @@ -3116,6 +3134,17 @@ dberr_t btr_cur_pessimistic_insert( } } + { + trx_t *trx = nullptr; + if (thr == nullptr) { + trx = current_trx(); + } else { + trx = thr_get_trx(thr); + } + + scn_mgr->set_scn(trx != nullptr ? 
trx->id : 0, mtr, btr_cur_get_rec(cursor), index, offsets); + } + /* Append the info about the update in the undo log */ return (trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr, index, @@ -3424,6 +3453,10 @@ dberr_t btr_cur_update_in_place(ulint flags, btr_cur_t *cursor, ulint *offsets, btr_cur_update_in_place_log(flags, rec, index, update, trx_id, roll_ptr, mtr); + if (thr) { + scn_mgr->add_instant_cursor(thr_get_trx(thr), block, rec, index, offsets); + } + if (was_delete_marked && !rec_get_deleted_flag(rec, page_is_comp(buf_block_get_frame(block)))) { /* The new updated record owns its possible externally @@ -3659,6 +3692,10 @@ dberr_t btr_cur_optimistic_update(ulint flags, btr_cur_t *cursor, rec = btr_cur_insert_if_possible(cursor, new_entry, offsets, heap, mtr); ut_a(rec); /* <- We calculated above the insert would fit */ + if (thr) { + scn_mgr->add_instant_cursor(thr_get_trx(thr), block, rec, index, *offsets); + } + /* Restore the old explicit lock state on the record */ if (!dict_table_is_locking_disabled(index->table)) { lock_rec_restore_from_page_infimum(block, rec, block); @@ -4274,6 +4311,8 @@ dberr_t btr_cur_del_mark_set_clust_rec( return (err); } + scn_mgr->set_scn(thr_get_trx(thr)->id, mtr, rec, index, offsets); + err = trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr, index, entry, nullptr, 0, rec, offsets, &roll_ptr); @@ -4606,6 +4645,10 @@ bool btr_cur_pessimistic_delete(dberr_t *err, bool has_reserved_extents, Scoped_heap heap_ptr(1024, UT_LOCATION_HERE); mem_heap_t *heap = heap_ptr.get(); + if (index->is_clustered()) { + scn_mgr->cleanout_block(nullptr, block, mtr); + } + rec = btr_cur_get_rec(cursor); #ifdef UNIV_ZIP_DEBUG page_zip_des_t *page_zip = buf_block_get_page_zip(block); diff --git a/mysql-8.0.31/storage/innobase/buf/buf0buf.cc b/mysql-8.0.31/storage/innobase/buf/buf0buf.cc index 267e1e6..3b91c2d 100644 --- a/mysql-8.0.31/storage/innobase/buf/buf0buf.cc +++ b/mysql-8.0.31/storage/innobase/buf/buf0buf.cc @@ -762,6 
+762,9 @@ static void buf_block_init( block->page.m_version = 0; block->modify_clock = 0; + block->layout_clock = 0; + block->lazy_recs->clear(); + block->instant_recs->clear(); ut_d(block->page.file_page_was_freed = false); @@ -1046,6 +1049,9 @@ static buf_chunk_t *buf_chunk_init( block = chunk->blocks; for (i = chunk->size; i--;) { + block->lazy_recs = new LazyCleanoutRecs(); + block->instant_recs = new InstantCleanoutRecs(); + buf_block_init(buf_pool, block, frame); UNIV_MEM_INVALID(block->frame, UNIV_PAGE_SIZE); @@ -1401,6 +1407,9 @@ static void buf_pool_free_instance(buf_pool_t *buf_pool) { mutex_free(&block->mutex); rw_lock_free(&block->lock); + delete block->lazy_recs; + delete block->instant_recs; + ut_d(rw_lock_free(&block->debug_latch)); } @@ -1616,6 +1625,7 @@ static bool buf_page_realloc(buf_pool_t *buf_pool, buf_block_t *block) { ut_ad(new_block->page.in_page_hash); + block->clear_cursor(); buf_block_modify_clock_inc(block); memset(block->frame + FIL_PAGE_OFFSET, 0xff, 4); memset(block->frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, 0xff, 4); @@ -2398,6 +2408,8 @@ withdraw_retry: for (ulint j = chunk->size; j--; block++) { mutex_free(&block->mutex); rw_lock_free(&block->lock); + delete block->lazy_recs; + delete block->instant_recs; ut_d(rw_lock_free(&block->debug_latch)); } @@ -3352,6 +3364,8 @@ static inline void buf_block_init_low( block->index = nullptr; block->made_dirty_with_no_latch = false; + block->lazy_recs->clear(); + block->instant_recs->clear(); block->n_hash_helps = 0; block->n_fields = 1; block->n_bytes = 0; @@ -3589,6 +3603,10 @@ struct Buf_fetch { mtr_t *m_mtr{}; /** Mark page as dirty even if page is being pinned without any latch. */ bool m_dirty_with_no_latch{}; + /** Allow reading a freed block and return null */ + bool m_allow_freed{}; + /** True if reading a freed block */ + bool m_freed_flag {}; /** Number of retries before giving up. */ size_t m_retries{}; /** Buffer pool to fetch from. 
*/ @@ -3645,6 +3663,10 @@ dberr_t Buf_fetch_normal::get(buf_block_t *&block) noexcept { /* Page not in buf_pool: needs to be read from file */ read_page(); + + if (m_freed_flag) { + return (DB_NOT_FOUND); + } } return DB_SUCCESS; @@ -4044,6 +4066,9 @@ void Buf_fetch::read_page() { buf_read_ahead_random(m_page_id, m_page_size, ibuf_inside(m_mtr)); } m_retries = 0; + } else if (m_allow_freed) { + m_freed_flag = true; + return; } else if (m_retries < BUF_PAGE_READ_MAX_RETRIES) { ++m_retries; @@ -4298,7 +4323,7 @@ buf_block_t *Buf_fetch::single_page() { #endif /* UNIV_DEBUG */ ut_ad(m_mode == Page_fetch::POSSIBLY_FREED || - !block->page.file_page_was_freed); + !block->page.file_page_was_freed || m_allow_freed); /* Check if this is the first access to the page */ const auto access_time = buf_page_is_accessed(&block->page); @@ -4366,7 +4391,7 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch) { + bool dirty_with_no_latch, bool allow_freed) { #ifdef UNIV_DEBUG ut_ad(mtr->is_active()); @@ -4410,6 +4435,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + fetch.m_freed_flag = false; return (fetch.single_page()); @@ -4423,6 +4450,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + fetch.m_freed_flag = false; return (fetch.single_page()); } diff --git a/mysql-8.0.31/storage/innobase/ddl/ddl0ddl.cc b/mysql-8.0.31/storage/innobase/ddl/ddl0ddl.cc index 246cc29..d8e85d2 100644 --- a/mysql-8.0.31/storage/innobase/ddl/ddl0ddl.cc +++ b/mysql-8.0.31/storage/innobase/ddl/ddl0ddl.cc @@ -252,6 +252,8 @@ dict_index_t *create_index(trx_t *trx, 
dict_table_t *table, return nullptr; } + trx->add_scn_index(table->id, index->id); + if (dict_index_is_spatial(index)) { index->fill_srid_value(index_def->m_srid, index_def->m_srid_is_valid); } diff --git a/mysql-8.0.31/storage/innobase/dict/dict0crea.cc b/mysql-8.0.31/storage/innobase/dict/dict0crea.cc index 94852c3..0ccf6e1 100644 --- a/mysql-8.0.31/storage/innobase/dict/dict0crea.cc +++ b/mysql-8.0.31/storage/innobase/dict/dict0crea.cc @@ -380,6 +380,9 @@ void dict_build_index_def(const dict_table_t *table, /*!< in: table */ /* Note that the index was created by this transaction. */ index->trx_id = trx->id; + + /* Init to zero, and will be set while committing the transaction */ + index->trx_scn = 0; } /** Creates an index tree for the index if it is not a member of a cluster. diff --git a/mysql-8.0.31/storage/innobase/dict/dict0dd.cc b/mysql-8.0.31/storage/innobase/dict/dict0dd.cc index 1eab513..e36cfee 100644 --- a/mysql-8.0.31/storage/innobase/dict/dict0dd.cc +++ b/mysql-8.0.31/storage/innobase/dict/dict0dd.cc @@ -5031,6 +5031,11 @@ dict_table_t *dd_open_table_one(dd::cache::Dictionary_client *client, index->id = id; index->trx_id = trx_id; + /* Take safe scn FROM purge system. It should be fine as the index is firstly + loaded and should be visible to all sessions */ + index->trx_scn = purge_sys->version.load(); + ut_a(index->trx_scn > 0); + /** Look up the spatial reference system in the dictionary. 
Since this may cause a table open to read the dictionary tables, it must be done while not holding diff --git a/mysql-8.0.31/storage/innobase/dict/dict0dict.cc b/mysql-8.0.31/storage/innobase/dict/dict0dict.cc index 9cb899c..5faad38 100644 --- a/mysql-8.0.31/storage/innobase/dict/dict0dict.cc +++ b/mysql-8.0.31/storage/innobase/dict/dict0dict.cc @@ -2481,6 +2481,7 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, } new_index->n_total_fields = new_index->n_def; new_index->trx_id = index->trx_id; + new_index->trx_scn = index->trx_scn; new_index->set_committed(index->is_committed()); new_index->allow_duplicates = index->allow_duplicates; new_index->nulls_equal = index->nulls_equal; @@ -2634,6 +2635,44 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, return (DB_SUCCESS); } +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index whose table has been acquired via table->acquire() (the caller must release the table), or NULL if the table or the index does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id) { + /* Look up the cached table by id while holding dict_sys->mutex */ + mutex_enter(&dict_sys->mutex); + dict_table_t *table = nullptr; + + HASH_SEARCH(id_hash, dict_sys->table_id_hash, ut::hash_uint64(table_id), + dict_table_t *, table, + ut_ad(table->cached), table->id == table_id); + + if (table == nullptr) { + mutex_exit(&dict_sys->mutex); + return nullptr; + } + + table->acquire(); + + mutex_exit(&dict_sys->mutex); + + /* Walk the table's index list; dict_sys->mutex is already released here, the table stays acquired */ + dict_index_t *index = table->first_index(); + while (index != nullptr) { + if (index->id == index_id) { + /* Table will be released by caller */ + return index; + } + + index = index->next(); + } + + table->release(); + + return nullptr; +} + /** Removes an index from the dictionary cache. 
*/ static void dict_index_remove_from_cache_low( dict_table_t *table, /*!< in/out: table */ diff --git a/mysql-8.0.31/storage/innobase/dict/dict0mem.cc b/mysql-8.0.31/storage/innobase/dict/dict0mem.cc index 0a2e184..1f16fe6 100644 --- a/mysql-8.0.31/storage/innobase/dict/dict0mem.cc +++ b/mysql-8.0.31/storage/innobase/dict/dict0mem.cc @@ -611,10 +611,12 @@ bool dict_index_t::is_usable(const trx_t *trx) const { return false; } + ut_ad(trx_scn > 0 || trx_id == 0 || table->is_intrinsic()); + /* Check if the specified transaction can see this index. */ return (table->is_temporary() || trx_id == 0 || !MVCC::is_view_active(trx->read_view) || - trx->read_view->changes_visible(trx_id, table->name)); + (trx_scn > 0 && trx->read_view->sees_version(trx_scn))); } #endif /* !UNIV_HOTBACKUP */ diff --git a/mysql-8.0.31/storage/innobase/handler/ha_innodb.cc b/mysql-8.0.31/storage/innobase/handler/ha_innodb.cc index f71797d..9c510a0 100644 --- a/mysql-8.0.31/storage/innobase/handler/ha_innodb.cc +++ b/mysql-8.0.31/storage/innobase/handler/ha_innodb.cc @@ -772,6 +772,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(trx_sys_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_shard_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_serialisation_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_sys_mvcc_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(zip_pad_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(master_key_id_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(sync_array_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -1297,6 +1298,9 @@ static SHOW_VAR innodb_status_variables[] = { {"undo_tablespaces_active", (char *)&export_vars.innodb_undo_tablespaces_active, SHOW_LONG, SHOW_SCOPE_GLOBAL}, + {"scn_cleanout_records", + (char *)&export_vars.innodb_scn_cleanout_records, SHOW_LONG, + SHOW_SCOPE_GLOBAL}, #ifdef UNIV_DEBUG {"purge_trx_id_age", (char *)&export_vars.innodb_purge_trx_id_age, SHOW_LONG, SHOW_SCOPE_GLOBAL}, @@ -2840,6 +2844,14 @@ trx_t *check_trx_exists(THD *thd) /*!< in: 
user thread handle */ return (trx); } +trx_t* current_trx() { + THD *thd = current_thd; + if (likely(thd != nullptr) && innodb_hton_ptr->slot != HA_SLOT_UNDEF) { + return thd_to_trx(thd); + } + return nullptr; +} + /** InnoDB transaction object that is currently associated with THD is replaced with that of the 2nd argument. The previous value is returned through the 3rd argument's buffer, unless it's NULL. When @@ -23130,6 +23142,17 @@ char **thd_innodb_interpreter(THD *thd) { } #endif /* UNIV_DEBUG */ +static MYSQL_SYSVAR_BOOL(trx_cache_cleanout_page, + opt_trx_cache_cleanout_page, + PLUGIN_VAR_RQCMDARG, + "Cache affected pages that need to write back scn", + NULL, NULL, false); + +static MYSQL_SYSVAR_UINT(cleanout_threads, srv_cleanout_threads, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Number of threads to cleanout SCN", + NULL, NULL, 8, 1, 256, 0); + static SYS_VAR *innobase_system_variables[] = { MYSQL_SYSVAR(api_trx_level), MYSQL_SYSVAR(api_bk_commit_interval), @@ -23350,6 +23373,8 @@ static SYS_VAR *innobase_system_variables[] = { #endif /* UNIV_DEBUG */ MYSQL_SYSVAR(parallel_read_threads), MYSQL_SYSVAR(segment_reserve_factor), + MYSQL_SYSVAR(cleanout_threads), + MYSQL_SYSVAR(trx_cache_cleanout_page), nullptr}; mysql_declare_plugin(innobase){ diff --git a/mysql-8.0.31/storage/innobase/handler/handler0alter.cc b/mysql-8.0.31/storage/innobase/handler/handler0alter.cc index 89c21f8..d01c7c5 100644 --- a/mysql-8.0.31/storage/innobase/handler/handler0alter.cc +++ b/mysql-8.0.31/storage/innobase/handler/handler0alter.cc @@ -3213,9 +3213,9 @@ static void online_retry_drop_dict_indexes(dict_table_t *table, bool locked) { /* Since the table has been modified, table->def_trx_id should be adjusted like ddl::drop_indexes(). However, this function may be called before the DDL transaction starts, so it is impossible to - get current DDL transaction ID. Thus advancing def_trx_id by 1 to + get current DDL transaction ID. 
Thus advancing def_trx_id by 2 to simply inform other threads about this change. */ - ++table->def_trx_id; + table->def_trx_id += 2; reset_column_ord_part(table); } diff --git a/mysql-8.0.31/storage/innobase/include/buf0buf.h b/mysql-8.0.31/storage/innobase/include/buf0buf.h index fe53f10..5160396 100644 --- a/mysql-8.0.31/storage/innobase/include/buf0buf.h +++ b/mysql-8.0.31/storage/innobase/include/buf0buf.h @@ -417,7 +417,14 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch = false); + bool dirty_with_no_latch = false, bool allow_freed = false); + +inline buf_block_t *buf_page_get_allow_freed(const page_id_t &id, const page_size_t &size, + ulint latch, ut::Location location, + mtr_t *mtr) { + return buf_page_get_gen(id, size, latch, nullptr, Page_fetch::NORMAL, + location, mtr, false, true); +} /** NOTE! The following macros should be used instead of buf_page_get_gen, to improve debugging. Only values RW_S_LATCH and RW_X_LATCH are allowed @@ -1659,6 +1666,20 @@ class buf_page_t { #endif /* !UNIV_HOTBACKUP */ }; +#include +#include + +typedef std::unordered_map> LazyCleanoutRecs; + +struct UndoHdrPos { + trx_id_t id; + space_id_t undo_space; + page_no_t undo_page_no; + ulint undo_hdr_offset; +}; + +typedef std::unordered_map InstantCleanoutRecs; + /** The buffer control block structure */ struct buf_block_t { @@ -1803,6 +1824,10 @@ struct buf_block_t { or (3) the block must belong to an intrinsic table */ uint64_t modify_clock; + /** Clock counter increases when any record position on the page is + changed. 
*/ + uint64_t layout_clock; + /** @} */ /** mutex protecting this block: state (also protected by the buffer @@ -1810,6 +1835,45 @@ struct buf_block_t { new mutex in InnoDB-5.1 to relieve contention on the buffer pool mutex */ BPageMutex mutex; + /** The record positions that need the SCN written back + * into the record with redo log, added while reading records */ + LazyCleanoutRecs* lazy_recs; + + /** The records that need the SCN written back after the transaction + * is committed, added while a DML modifies the record */ + InstantCleanoutRecs* instant_recs; + + /** Clear both cleanout record maps and advance layout_clock, all under the block mutex */ + void clear_cursor() { + mutex_enter(&mutex); + lazy_recs->clear(); + instant_recs->clear(); + layout_clock++; + mutex_exit(&mutex); + } + + /** Add a (position -> (trx id, scn)) entry to lazy_recs under the block mutex; insert() keeps an existing entry for the same position */ + void add_lazy_cursor(byte *pos, trx_id_t id, trx_id_t scn) { + mutex_enter(&mutex); + lazy_recs->insert(std::make_pair(pos, std::make_pair(id, scn))); + mutex_exit(&mutex); + } + + /** Add a (position -> undo header position) entry to instant_recs under the block mutex; insert() keeps an existing entry for the same position */ + void add_instant_cursor(byte *pos, struct UndoHdrPos &undo_hdr) { + mutex_enter(&mutex); + instant_recs->insert(std::make_pair(pos, undo_hdr)); + mutex_exit(&mutex); + } + + /** Check under the block mutex whether both instant_recs and lazy_recs are empty */ + bool is_cursor_empty() { + mutex_enter(&mutex); + bool ret = (instant_recs->empty() && lazy_recs->empty()); + mutex_exit(&mutex); + return ret; + } + /** Get the modified clock (version) value. @param[in] single_threaded Thread can only be written to or read by a single thread @@ -1833,6 +1897,10 @@ struct buf_block_t { @return page number of the current buffer block. */ page_no_t get_page_no() const { return (page.id.page_no()); } + /** Get the space id of the current buffer block. + @return space id of the current buffer block. */ + space_id_t get_space_id() const { return (page.id.space()); } + /** Get the next page number of the current buffer block. @return next page number of the current buffer block. 
*/ page_no_t get_next_page_no() const { diff --git a/mysql-8.0.31/storage/innobase/include/buf0types.h b/mysql-8.0.31/storage/innobase/include/buf0types.h index 12d9d2f..c48e382 100644 --- a/mysql-8.0.31/storage/innobase/include/buf0types.h +++ b/mysql-8.0.31/storage/innobase/include/buf0types.h @@ -233,6 +233,16 @@ class page_id_t { page_id_t(space_id_t space, page_no_t page_no) : m_space(space), m_page_no(page_no) {} + page_id_t(uint64_t compact_value) { /* Unpack a value produced by compact_value(): high 32 bits are the space id, low 32 bits the page number */ + m_space = (compact_value >> 32); + m_page_no = (compact_value & 0xFFFFFFFF); + } + + /** Pack the space id (high 32 bits) and the page number (low 32 bits) into one uint64_t */ + uint64_t compact_value() const { + return (((uint64_t)m_space) << 32 | m_page_no); + } + /** Retrieve the tablespace id. @return tablespace id */ inline space_id_t space() const { return (m_space); } diff --git a/mysql-8.0.31/storage/innobase/include/dict0dict.h b/mysql-8.0.31/storage/innobase/include/dict0dict.h index 80e6933..251f199 100644 --- a/mysql-8.0.31/storage/innobase/include/dict0dict.h +++ b/mysql-8.0.31/storage/innobase/include/dict0dict.h @@ -1666,6 +1666,12 @@ table or not. @return true if successful, false otherwise */ bool dict_sys_table_id_build(); +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index whose table has been acquired (the caller must release the table), or NULL if the table or the index does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id); + /** Change the table_id of SYS_* tables if they have been created after an earlier upgrade. 
This will update the table_id by adding DICT_MAX_DD_TABLES */ diff --git a/mysql-8.0.31/storage/innobase/include/dict0mem.h b/mysql-8.0.31/storage/innobase/include/dict0mem.h index bf581fc..e6392d8 100644 --- a/mysql-8.0.31/storage/innobase/include/dict0mem.h +++ b/mysql-8.0.31/storage/innobase/include/dict0mem.h @@ -1209,6 +1209,9 @@ struct dict_index_t { when InnoDB was started up */ trx_id_t trx_id; + /** SCN of related trx_id */ + trx_id_t trx_scn; + /** Information about state of compression failures and successes */ zip_pad_info_t zip_pad; diff --git a/mysql-8.0.31/storage/innobase/include/lock0lock.h b/mysql-8.0.31/storage/innobase/include/lock0lock.h index 2661a4a..bff954e 100644 --- a/mysql-8.0.31/storage/innobase/include/lock0lock.h +++ b/mysql-8.0.31/storage/innobase/include/lock0lock.h @@ -595,6 +595,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view); /*!< in: consistent read view */ /** Checks that a non-clustered index record is seen in a consistent read. diff --git a/mysql-8.0.31/storage/innobase/include/read0read.h b/mysql-8.0.31/storage/innobase/include/read0read.h index 9fdcaef..4d556f7 100644 --- a/mysql-8.0.31/storage/innobase/include/read0read.h +++ b/mysql-8.0.31/storage/innobase/include/read0read.h @@ -39,6 +39,8 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0types.h" #include "univ.i" +#define MAX_SNAPSHOT_SIZE 128 + /** The MVCC read view manager */ class MVCC { public: @@ -62,12 +64,6 @@ class MVCC { @param own_mutex true if caller owns trx_sys_t::mutex */ void view_close(ReadView *&view, bool own_mutex); - /** - Release a view that is inactive but not closed. Caller must own - the trx_sys_t::mutex. 
- @param view View to release */ - void view_release(ReadView *&view); - /** Clones the oldest view and stores it in view. No need to call view_close(). The caller owns the view that is passed in. It will also move the closed views from the m_views list to the @@ -78,7 +74,7 @@ class MVCC { /** @return the number of active views */ - ulint size() const; + ulint size(); /** @return true if the view is active and valid */ @@ -102,21 +98,20 @@ class MVCC { private: /** Validates a read view list. */ - bool validate() const; + bool validate(uint64_t slot) const; /** Find a free view from the active list, if none found then allocate a new view. This function will also attempt to move delete marked views from the active list to the freed list. @return a view to use */ - inline ReadView *get_view(); + inline ReadView *get_view(uint64_t slot); /** Get the oldest view in the system. It will also move the delete marked read views from the views list to the freed list. @return oldest view if found or NULL */ - inline ReadView *get_oldest_view() const; - ReadView *get_view_created_by_trx_id(trx_id_t trx_id) const; + inline trx_id_t get_oldest_version() ; private: // Prevent copying @@ -124,14 +119,27 @@ class MVCC { MVCC &operator=(const MVCC &); private: + + void enter(uint64_t slot); + + void exit(uint64_t slot); + + uint64_t get_slot() { + return (m_slot_index++) % MAX_SNAPSHOT_SIZE; + } + + std::atomic m_slot_index; + typedef UT_LIST_BASE_NODE_T(ReadView, m_view_list) view_list_t; /** Free views ready for reuse. 
*/ - view_list_t m_free; + view_list_t m_free[MAX_SNAPSHOT_SIZE]; /** Active and closed views, the closed views will have the creator trx id set to TRX_ID_MAX */ - view_list_t m_views; + view_list_t m_views[MAX_SNAPSHOT_SIZE]; + + ib_mutex_t m_mutexs[MAX_SNAPSHOT_SIZE]; }; #endif /* read0read_h */ diff --git a/mysql-8.0.31/storage/innobase/include/read0types.h b/mysql-8.0.31/storage/innobase/include/read0types.h index 6563a21..648635a 100644 --- a/mysql-8.0.31/storage/innobase/include/read0types.h +++ b/mysql-8.0.31/storage/innobase/include/read0types.h @@ -155,36 +155,37 @@ class ReadView { @param[in] name table name */ static void check_trx_id_sanity(trx_id_t id, const table_name_t &name); - /** Check whether the changes by id are visible. - @param[in] id transaction id to check against the view - @param[in] name table name - @return whether the view sees the modifications of id. */ - [[nodiscard]] bool changes_visible(trx_id_t id, - const table_name_t &name) const { - ut_ad(id > 0); - - if (id < m_up_limit_id || id == m_creator_trx_id) { - return (true); - } - - check_trx_id_sanity(id, name); - - if (id >= m_low_limit_id) { - return (false); - - } else if (m_ids.empty()) { - return (true); - } + /** + @param id transaction to check + @return true if view sees transaction id */ + bool sees(trx_id_t id) const { return (id < m_up_limit_id); } - const ids_t::value_type *p = m_ids.data(); + /** Check whether the changes on record are visible. 
+ @param[in] index index object + @param[in] block the block that contains record + @param[in] rec clust record + @param[in] offsets offset of the record + @return whether the view sees the changes on the record */ + bool changes_visible( + const dict_index_t *index, buf_block_t *block, + const rec_t *rec, const ulint *offsets); - return (!std::binary_search(p, p + m_ids.size(), id)); + /** + @param scn scn to check; TRX_ID_MAX is never visible + @return true if view sees transaction scn, i.e. the view version is greater than scn */ + bool sees_version(trx_id_t scn) const { + if (scn == TRX_ID_MAX) return false; + return (m_version > scn); } /** - @param id transaction to check - @return true if view sees transaction id */ - bool sees(trx_id_t id) const { return (id < m_up_limit_id); } + @return version number of the view */ + trx_id_t version() { return m_version; } + + /** Store the trx pointer that creates this read view */ + void set_trx(trx_t *trx) { + m_trx = trx; + } /** Mark the view as closed */ @@ -197,6 +198,14 @@ class ReadView { @return true if the view is closed */ bool is_closed() const { return (m_closed); } + uint64_t get_slot() { + return m_slot; + } + + void set_slot(uint64_t slot) { + m_slot = slot; + } + /** Write the limits to the file. @param file file to write to */ @@ -216,6 +225,10 @@ class ReadView { ut_d(m_view_low_limit_no = m_low_limit_no); m_low_limit_no = trx_no; } + + if (m_low_limit_no < m_version) { + m_version = m_low_limit_no; + } } /** @@ -254,14 +267,8 @@ class ReadView { inline void prepare(trx_id_t id); /** - Copy state from another view. Must call copy_complete() to finish. - @param other view to copy from */ - inline void copy_prepare(const ReadView &other); - - /** - Complete the copy, insert the creator transaction id into the - m_trx_ids too and adjust the m_up_limit_id *, if required */ - inline void copy_complete(); + Copy state for the given version. NOTE(review): copy_complete() is removed by this patch, so no finishing call exists any more - confirm. 
*/ + inline void copy_prepare(trx_id_t version); /** Set the creator transaction id, existing id must be 0 */ @@ -308,6 +315,15 @@ class ReadView { trx_id_t m_view_low_limit_no; #endif /* UNIV_DEBUG */ + /** The transaction that opens this read view */ + trx_t *m_trx; + + /** Version of the snapshot */ + trx_id_t m_version; + + /** The array index of mvcc */ + uint64_t m_slot; + /** AC-NL-RO transaction view that has been "closed". */ bool m_closed; @@ -318,4 +334,410 @@ class ReadView { node_t m_view_list; }; +/** A simple spin read-write lock implementation. The lock word starts at MAX_LOCK_WORD: each reader subtracts 1 and a writer subtracts MAX_LOCK_WORD, so the word is 0 while a writer holds the lock alone and negative while a writer has subtracted MAX_LOCK_WORD with readers (or another writer) still counted. */ +const int32_t MAX_LOCK_WORD = 163840; //max concurrency +class SpinLock { +public: + SpinLock() { + m_lock_word = MAX_LOCK_WORD; + } + + ~SpinLock() {}; + + bool try_read_lock() { + if (m_lock_word.load(std::memory_order_relaxed) <= 0) { + /* Someone is modifying, can't read */ + return false; + } + + int32_t old_val = m_lock_word.fetch_sub(1); /* must be signed: an unsigned copy of a negative lock word would pass the check below and admit a reader against a writer */ + if (old_val <= 0) { + m_lock_word.fetch_add(1); + + return false; + } + + return true; + } + + void read_lock() { + while (!try_read_lock()) { + ut_delay(1); + } + } + + void read_unlock() { + m_lock_word.fetch_add(1); + } + + bool try_write_lock() { + if (m_lock_word.load(std::memory_order_relaxed) != MAX_LOCK_WORD) { + /* Someone is reading, can't modify */ + return false; + } + + if (m_lock_word.fetch_sub(MAX_LOCK_WORD) != MAX_LOCK_WORD) { + m_lock_word.fetch_add(MAX_LOCK_WORD); + return false; + } + + return true; + } + + void write_lock() { /* NOTE(review): two concurrent callers would both subtract MAX_LOCK_WORD and then both wait for 0 forever - confirm callers serialize blocking writers */ + m_lock_word.fetch_sub(MAX_LOCK_WORD); + + while (m_lock_word.load() != 0) { + ut_delay(1); + } + } + + void write_unlock() { + m_lock_word.fetch_add(MAX_LOCK_WORD); + } +private: + std::atomic<int32_t> m_lock_word; +}; + +#define SCN_MAP_MAX_SIZE (1 * 1024 * 1024) + +#define LF_ARRAY_MAX_SIZE 16384 + +/** Multiple producer-one consumer lock free array */ +class LF_Array { + public: + LF_Array(uint64_t size) { + m_size = size; + m_array = new std::atomic<uint64_t> [m_size]; + m_free_index = 0; + m_consume_index = 0; + + init(); + } + + 
~LF_Array() { + delete [] m_array; + } + + void init() { + for (uint64_t i = 0; i < m_size; i++) { + m_array[i] = 0; + } + } + + /* Multiple-producer. Returns false if value is 0, if the array is full, or after 10 failed attempts to claim a slot. */ + bool add(uint64_t value) { + int count = 0; + while (value != 0 && count++ < 10) { /* 0 marks an empty slot for get(), so it can never be queued */ + uint64_t free_index = m_free_index.load(); + uint64_t consume_index = m_consume_index.load(); + + if (free_index - consume_index == m_size) { + /* array is full, return directly */ + return false; + } + + if (!m_free_index.compare_exchange_weak(free_index, free_index + 1)) { + continue; + } + + m_array[free_index % m_size] = value; + + return true; + } + + return false; + } + + /* One-consumer, + return value, or 0 if empty or if the head slot is still being written by a producer */ + uint64_t get() { + uint64_t consume_index = m_consume_index.load(); + if (consume_index == m_free_index.load()) { + /*empty */ + return 0; + } + + auto idx = consume_index % m_size; + uint64_t value = m_array[idx].load(); + if (value == 0) { + /* Being added by producer, try again later */ + return 0; + } + + /* take value and reset it to zero; strong CAS so the single consumer never sees a spurious failure */ + if (!m_array[idx].compare_exchange_strong(value, 0)) { + return 0; + } + + m_consume_index++; + + return value; + } + + private: + uint64_t m_size; + + std::atomic<uint64_t> *m_array; + + std::atomic<uint64_t> m_consume_index; + + std::atomic<uint64_t> m_free_index; +}; + +/** A map used to store mapping of trx id to scn. 
*/ +class Scn_Map { + public: + Scn_Map(); + ~Scn_Map(); + + struct Elem { + public: + Elem() { + m_id = 0; + m_scn = 0; + new (&m_lock) SpinLock(); + } + + bool store(trx_id_t id, trx_id_t scn) { + if (!m_lock.try_write_lock()) { + return false; + } + + /* Now safe to store */ + m_id = id; + m_scn = scn; + + m_lock.write_unlock(); + + return true; + } + + trx_id_t read(trx_id_t id) { + trx_id_t ret = 0; + if (m_id != id) { + /* Unlocked fast-path check; m_id is re-checked under the read lock below. NOTE(review): this plain read of m_id can race with store() - confirm that is acceptable */ + return ret; + } + + if (!m_lock.try_read_lock()) { + return ret; + } + + if (id != m_id) { + m_lock.read_unlock(); + return ret; + } + + ret = m_scn; + + m_lock.read_unlock(); + return ret; + } + + SpinLock m_lock; + trx_id_t m_id; + trx_id_t m_scn; + }; + + inline bool store(trx_id_t id, trx_id_t scn) { + return m_elems[(id/2) % m_size].store(id, scn); + } + + inline trx_id_t read(trx_id_t id) { + return m_elems[(id/2) % m_size].read(id); + } + +private: + struct Elem* m_elems; + uint64_t m_size; +}; + +/** Handler of SCN Manager */ +class SCN_Mgr { +public: + + /** Constructor */ + SCN_Mgr(); + + /** Destructor */ + ~SCN_Mgr(); + + class CleanoutWorker { + public: + CleanoutWorker(); + ~CleanoutWorker(); + + void add_page(uint64_t compact_page_id); + + void add_pages(PageSets &pages); + + void take_pages(PageSets &pages); + + uint64_t reset() { + return os_event_reset(m_event); + } + + void wait(uint64_t sig_count) { + os_event_wait_time_low(m_event, std::chrono::seconds{1}, sig_count); + } + + void wakeup() { + os_event_set(m_event); + } + + private: + LF_Array* m_pages; + + ib_mutex_t m_mutex; + + os_event_t m_event; + }; + + void init(); + + /** + @return true if it's SCN number, i.e. the value is odd */ + static bool is_scn(trx_id_t id) { + return (id % 2 != 0); + } + + /** Store scn of the transaction for fast lookup + @param[in] id transaction id + @param[in] scn transaction no while committing + @return true if stored, false if the map slot could not be write-locked */ + bool store_scn(trx_id_t id, trx_id_t scn) { + return m_scn_map->store(id, scn); + } + + /** Quickly lookup scn 
of relative transaction id + @param[in] id transaction id + @return TRX_ID_MAX if still active, or 0 if not found, or scn value */ + trx_id_t get_scn_fast(trx_id_t id); + + /** Get SCN with relative transaction id + @param[in] id transaction id + @param[in] index index object where roll_ptr resides on + @param[in] roll_ptr rollback pointer of clust record */ + trx_id_t get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr); + + /** Write SCN to clust record + @param[in] current_id trx id of current session + @param[in] mtr mini transaction + @param[in] rec target record + @param[in] index index object + @param[in] offsets offset array for the record + */ + void set_scn(trx_id_t current_id, mtr_t *mtr, rec_t *rec, dict_index_t *index, const ulint *offsets); + + /** Add cursor to background set which needs to be cleanout + @param[in] trx the transaction that reads scn or null + @param[in] block the block object where record resides in + @param[in] pos the start position where scn should be written to + @param[in] id transaction id + @param[in] scn scn of id */ + void add_lazy_cursor(trx_t *trx, buf_block_t *block, byte *pos, trx_id_t id, trx_id_t scn); + + /** Add cursor to block cache + @param[in] trx the transaction that reads scn or null + @param[in] block the block object where record resides in + @param[in] rec clust record + @param[in] index clust index + @param[in] offsets offset array for the record. 
*/ + void add_instant_cursor(trx_t *trx, buf_block_t *block, rec_t *rec, dict_index_t *index, const ulint *offsets); + + /** Move records from instant cache to lazy cache if possible + @param[in] block the block object + @param[in] page_id page id of block + @param[in] page_size page size of block + @param[in] mtr mini transaction + @return true if new cursors are added and need to be processed */ + bool instant_to_lazy(buf_block_t *block, page_id_t &page_id, page_size_t &page_size, mtr_t *mtr); + + /** Add pages that need to be lazily cleanout and collected by transaction. + @param[in] trx the transaction that reads scn */ + void add_pages(trx_t *trx); + + /** Get offset where scn is stored + @param[in] index index object + @param[in] offsets offset array of the clust record + @return offset where scn is stored */ + ulint scn_offset(const dict_index_t *index, const ulint *offsets); + + /** Start background threads */ + void start(); + + /** Stop background threads */ + void stop(); + + /** Cleanout records on one block + @param[in] trx the transaction that invokes this function + @param[in] block the block object to be cleanout + @param[in] mtr mini transaction + @param[in] clear_all True if clear all cached cursor on block */ + void cleanout_block(trx_t *trx, buf_block_t *block, mtr_t *mtr, bool clear_all = true); + + /** Background thread for writing back SCN */ + void cleanout_task(); + + /* Periodically generate safe up limit id for taking snapshot */ + void view_task(); + + /**@return min active transaction id.
This is not + an accurate number */ + trx_id_t min_active_id() { + return m_min_active_id.load(std::memory_order_relaxed); + } + + uint64_t cleanout_records() { + return m_cleanout_records.load(); + } +private: + + /** Set scn to specified position + @param[in] mtr mini transaction + @param[in] ptr position where scn is written to + @param[in] id transaction id + @param[in] scn scn number of transaction */ + void set_scn(mtr_t *mtr, byte *ptr, trx_id_t id, trx_id_t scn); + + /** Storing trx id->scn mapping */ + Scn_Map *m_scn_map; + + /** Storing trx id->scn mapping to avoid duplicate + looking up */ + Scn_Map *m_random_map; + + CleanoutWorker **m_cleanout_workers; + + /** Number of threads for cleanout */ + uint32_t m_max_cleanout_threads; + + /** up transaction id on startup */ + trx_id_t m_startup_id; + + /** SCN number taken on startup */ + trx_id_t m_startup_scn; + + /** thread event */ + os_event_t m_view_event; + + /** Min active transaction id */ + std::atomic m_min_active_id; + + /** Flag to tell if background threads should stop or not */ + std::atomic m_abort; + + /** True if cleanout thread is active */ + std::atomic m_cleanout_threads; + + /** True if thread is active */ + std::atomic m_view_active; + + /** Counter of cleanout records */ + std::atomic m_cleanout_records; +}; + +extern SCN_Mgr *scn_mgr; + #endif diff --git a/mysql-8.0.31/storage/innobase/include/row0pread.h b/mysql-8.0.31/storage/innobase/include/row0pread.h index 3529c0b..2e7cff1 100644 --- a/mysql-8.0.31/storage/innobase/include/row0pread.h +++ b/mysql-8.0.31/storage/innobase/include/row0pread.h @@ -605,7 +605,7 @@ class Parallel_reader::Scan_ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. 
*/ - [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets, + [[nodiscard]] bool check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr); /** Create an execution context for a range and add it to @@ -725,9 +725,9 @@ class Parallel_reader::Ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. */ - bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, + bool is_rec_visible(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - return m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + return m_scan_ctx->check_visibility(block, rec, offsets, heap, mtr); } private: diff --git a/mysql-8.0.31/storage/innobase/include/row0purge.h b/mysql-8.0.31/storage/innobase/include/row0purge.h index 00a4757..08e5bd4 100644 --- a/mysql-8.0.31/storage/innobase/include/row0purge.h +++ b/mysql-8.0.31/storage/innobase/include/row0purge.h @@ -100,6 +100,9 @@ struct purge_node_t { /** Trx that created this undo record */ trx_id_t modifier_trx_id; + + /** Trx no of modifier_trx_id */ + trx_id_t modifier_trx_no; }; using Recs = std::list>; diff --git a/mysql-8.0.31/storage/innobase/include/srv0srv.h b/mysql-8.0.31/storage/innobase/include/srv0srv.h index e712399..3fe8274 100644 --- a/mysql-8.0.31/storage/innobase/include/srv0srv.h +++ b/mysql-8.0.31/storage/innobase/include/srv0srv.h @@ -761,6 +761,9 @@ extern bool srv_cmp_per_index_enabled; extern bool srv_redo_log; +extern uint32_t srv_cleanout_threads; +extern bool opt_trx_cache_cleanout_page; + /** Status variables to be passed to MySQL */ extern struct export_var_t export_vars; @@ -1198,6 +1201,7 @@ struct export_var_t { the dba created explicitly. 
*/ ulint innodb_undo_tablespaces_active; /*!< number of active undo tablespaces */ + ulint innodb_scn_cleanout_records; /*!< number of records that have scn written back */ #ifdef UNIV_DEBUG ulint innodb_purge_trx_id_age; /*!< rw_max_trx_no - purged trx_no */ ulint innodb_purge_view_trx_id_age; /*!< rw_max_trx_no diff --git a/mysql-8.0.31/storage/innobase/include/sync0sync.h b/mysql-8.0.31/storage/innobase/include/sync0sync.h index e4c34cc..388bb6d 100644 --- a/mysql-8.0.31/storage/innobase/include/sync0sync.h +++ b/mysql-8.0.31/storage/innobase/include/sync0sync.h @@ -163,6 +163,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t trx_sys_mutex_key; extern mysql_pfs_key_t trx_sys_shard_mutex_key; extern mysql_pfs_key_t trx_sys_serialisation_mutex_key; +extern mysql_pfs_key_t trx_sys_mvcc_mutex_key; extern mysql_pfs_key_t srv_sys_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/mysql-8.0.31/storage/innobase/include/sync0types.h b/mysql-8.0.31/storage/innobase/include/sync0types.h index 0b4ff55..35bf74c 100644 --- a/mysql-8.0.31/storage/innobase/include/sync0types.h +++ b/mysql-8.0.31/storage/innobase/include/sync0types.h @@ -415,6 +415,7 @@ enum latch_id_t { LATCH_ID_TRX_SYS, LATCH_ID_TRX_SYS_SHARD, LATCH_ID_TRX_SYS_SERIALISATION, + LATCH_ID_TRX_SYS_MVCC, LATCH_ID_SRV_SYS, LATCH_ID_SRV_SYS_TASKS, LATCH_ID_PAGE_ZIP_STAT_PER_INDEX, diff --git a/mysql-8.0.31/storage/innobase/include/trx0purge.h b/mysql-8.0.31/storage/innobase/include/trx0purge.h index 7ad46b5..75e3159 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0purge.h +++ b/mysql-8.0.31/storage/innobase/include/trx0purge.h @@ -81,6 +81,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr); /*!< in: mtr */ @@ -134,6 +135,8 @@ struct purge_iter_t { /** The transaction that created the undo log record, the Modifier trx id */ trx_id_t modifier_trx_id; + + trx_id_t modifier_trx_no; }; /* Namespace to hold all the related functions and variables needed @@ -1017,6 +1020,9 @@ struct trx_purge_t { /** The purge will not remove undo logs which are >= this view (purge view) */ ReadView view; + /** The scn of purge system before which undo can be purged */ + std::atomic version; + /** true if view is active */ bool view_active; diff --git a/mysql-8.0.31/storage/innobase/include/trx0rec.h b/mysql-8.0.31/storage/innobase/include/trx0rec.h index 3405fff..448f6b6 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0rec.h +++ b/mysql-8.0.31/storage/innobase/include/trx0rec.h @@ -57,6 +57,11 @@ static inline trx_undo_rec_t *trx_undo_rec_copy(const page_t *undo_page, uint32_t undo_offset, mem_heap_t *heap); +/** Get start position of undo record +@param[in] undo_rec undo log record +@return start position of undo record */ +static inline ulint trx_undo_start_offset(const trx_undo_rec_t *undo_rec); + /** Reads the undo log record type. @return record type */ static inline ulint trx_undo_rec_get_type( @@ -317,6 +322,8 @@ constexpr uint32_t TRX_UNDO_UPD_EXTERN = 128; constexpr uint32_t TRX_UNDO_INSERT_OP = 1; constexpr uint32_t TRX_UNDO_MODIFY_OP = 2; +#define TRX_UNDO_NEW_VERSION_TAG 0x00 + /** The type and compilation info flag in the undo record for update. For easier understanding let the 8 bits be numbered as 7, 6, 5, 4, 3, 2, 1, 0. */ @@ -371,6 +378,24 @@ byte *trx_undo_rec_get_pars( table_id_t *table_id, /*!< out: table id */ type_cmpl_t &type_cmpl); /*!< out: type compilation info. 
*/ +bool trx_undo_rec_get_hdr( + trx_id_t id, + trx_undo_rec_t *undo_rec, + page_no_t &undo_hdr_no, + uint32_t &offset); + +trx_id_t trx_undo_hdr_get_scn( + trx_id_t trx_id, + page_id_t &page_id, + uint32_t offset, + mtr_t *mtr, + page_t *undo_page); + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id); + /** Get the max free space of undo log by assuming it's a fresh new page and the free space doesn't count for the undo log header too. */ size_t trx_undo_max_free_space(); diff --git a/mysql-8.0.31/storage/innobase/include/trx0rec.ic b/mysql-8.0.31/storage/innobase/include/trx0rec.ic index 138f865..1008819 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0rec.ic +++ b/mysql-8.0.31/storage/innobase/include/trx0rec.ic @@ -31,12 +31,21 @@ this program; if not, write to the Free Software Foundation, Inc., *******************************************************/ #ifndef UNIV_HOTBACKUP +static inline ulint trx_undo_start_offset(const trx_undo_rec_t *undo_rec) { + const byte *ptr = undo_rec + 2; + if (*ptr == TRX_UNDO_NEW_VERSION_TAG) { + return 11; + } else { + return 2; + } +} + /** Reads from an undo log record the record type. @return record type */ static inline ulint trx_undo_rec_get_type( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - return (mach_read_from_1(undo_rec + 2) & (TRX_UNDO_CMPL_INFO_MULT - 1)); + return (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) & (TRX_UNDO_CMPL_INFO_MULT - 1)); } /** Reads from an undo log record the record compiler info. @@ -44,7 +53,7 @@ static inline ulint trx_undo_rec_get_type( static inline ulint trx_undo_rec_get_cmpl_info( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - return (mach_read_from_1(undo_rec + 2) / TRX_UNDO_CMPL_INFO_MULT); + return (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) / TRX_UNDO_CMPL_INFO_MULT); } /** Returns true if an undo log record contains an extern storage field. 
@@ -52,7 +61,7 @@ static inline ulint trx_undo_rec_get_cmpl_info( static inline bool trx_undo_rec_get_extern_storage( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - if (mach_read_from_1(undo_rec + 2) & TRX_UNDO_UPD_EXTERN) { + if (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) & TRX_UNDO_UPD_EXTERN) { return true; } @@ -64,7 +73,7 @@ static inline bool trx_undo_rec_get_extern_storage( static inline undo_no_t trx_undo_rec_get_undo_no( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - const byte *ptr = undo_rec + 2; + const byte *ptr = undo_rec + trx_undo_start_offset(undo_rec); uint8_t type_cmpl = mach_read_from_1(ptr); const bool blob_undo = type_cmpl & TRX_UNDO_MODIFY_BLOB; @@ -72,9 +81,9 @@ static inline undo_no_t trx_undo_rec_get_undo_no( if (blob_undo) { /* The next record offset takes 2 bytes + 1 byte for type_cmpl flag + 1 byte for the new flag. Total 4 bytes.*/ - ptr = undo_rec + 4; + ptr += 2; } else { - ptr = undo_rec + 3; + ptr += 1; } return (mach_u64_read_much_compressed(ptr)); } diff --git a/mysql-8.0.31/storage/innobase/include/trx0sys.h b/mysql-8.0.31/storage/innobase/include/trx0sys.h index 33b287a..2586148 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0sys.h +++ b/mysql-8.0.31/storage/innobase/include/trx0sys.h @@ -62,6 +62,8 @@ class ReadView; /** The transaction system */ extern trx_sys_t *trx_sys; +trx_t* current_trx(); + /** Checks if a page address is the trx sys header page. @param[in] page_id page id @return true if trx sys header page */ @@ -137,11 +139,6 @@ the trx_sys_serialisation_mutex must be acquired. @return new, allocated trx no */ inline trx_id_t trx_sys_allocate_trx_no(); -/** Retrieves a next value that will be allocated if trx_sys_allocate_trx_id() -or trx_sys_allocate_trx_id_trx_no() was called. 
-@return the next trx->id or trx->no that will be allocated */ -inline trx_id_t trx_sys_get_next_trx_id_or_no(); - #ifdef UNIV_DEBUG /* Flag to control TRX_RSEG_N_SLOTS behavior debugging. */ extern uint trx_rseg_n_slots_debug; @@ -491,7 +488,9 @@ struct trx_sys_t { trx_sys_t::serialisation_mutex. Note: it might be in parallel used for both trx->id and trx->no assignments (for different trx_t objects). */ - std::atomic next_trx_id_or_no; + std::atomic next_trx_id; + + std::atomic next_trx_scn; /** @} */ @@ -577,6 +576,22 @@ struct trx_sys_t { }, loc); } + + trx_id_t get_next_trx_id() { + return next_trx_id.fetch_add(2); + } + + trx_id_t get_max_trx_id() { + return next_trx_id.load(); + } + + trx_id_t get_next_trx_scn() { + return next_trx_scn.fetch_add(2); + } + + trx_id_t get_max_trx_scn() { + return next_trx_scn.load(); + } }; #endif /* !UNIV_HOTBACKUP */ diff --git a/mysql-8.0.31/storage/innobase/include/trx0sys.ic b/mysql-8.0.31/storage/innobase/include/trx0sys.ic index ab1a619..d8d8f4b 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0sys.ic +++ b/mysql-8.0.31/storage/innobase/include/trx0sys.ic @@ -209,7 +209,7 @@ static inline trx_t *trx_rw_is_active(trx_id_t trx_id, bool do_ref_count) { was set to larger than trx_id, so we decide to return nullptr, even though, if we were to repeat the call in just a moment we might get a different result if the trx_sys_rw_trx_add() for the trx_id happens meanwhile. */ - ut_ad(trx_id < trx_sys_get_next_trx_id_or_no()); + ut_ad(trx_id < trx_sys->get_max_trx_id()); auto &shard = trx_sys->get_shard_by_trx_id(trx_id); if (trx_id < shard.active_rw_trxs.peek().min_id()) { return nullptr; @@ -232,12 +232,10 @@ inline trx_id_t trx_sys_get_trx_id_write_margin() { /** Allocates a new transaction id or transaction number. 
@return new, allocated trx id or trx no */ -inline trx_id_t trx_sys_allocate_trx_id_or_no() { +inline void trx_sys_flush_trx_id_or_no(trx_id_t id) { ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); - trx_id_t trx_id = trx_sys->next_trx_id_or_no.fetch_add(1); - - if (trx_id % trx_sys_get_trx_id_write_margin() == 0) { + if (id % trx_sys_get_trx_id_write_margin() == 0) { /* Reserve the next range of trx_id values. This thread has acquired either the trx_sys_mutex or the trx_sys_serialisation_mutex. @@ -251,17 +249,20 @@ inline trx_id_t trx_sys_allocate_trx_id_or_no() { trx_sys_write_max_trx_id(); } - return trx_id; } inline trx_id_t trx_sys_allocate_trx_id() { ut_ad(trx_sys_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t id = trx_sys->get_next_trx_id(); + trx_sys_flush_trx_id_or_no(id); + return id; } inline trx_id_t trx_sys_allocate_trx_no() { ut_ad(trx_sys_serialisation_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t scn = trx_sys->get_next_trx_scn(); + trx_sys_flush_trx_id_or_no(scn + 1); + return scn; } /** Reads trx->no up to which all transactions have been serialised. @@ -270,10 +271,6 @@ static inline trx_id_t trx_get_serialisation_min_trx_no(void) { return (trx_sys->serialisation_min_trx_no.load()); } -inline trx_id_t trx_sys_get_next_trx_id_or_no() { - return trx_sys->next_trx_id_or_no.load(); -} - /** Determine if there are incomplete transactions in the system. 
@return whether incomplete transactions need rollback */ static inline bool trx_sys_need_rollback() { diff --git a/mysql-8.0.31/storage/innobase/include/trx0trx.h b/mysql-8.0.31/storage/innobase/include/trx0trx.h index 8190ea4..974878b 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0trx.h +++ b/mysql-8.0.31/storage/innobase/include/trx0trx.h @@ -54,6 +54,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0read.h" #include "sql/handler.h" // Xa_state_list #include "srv0srv.h" +#include +#include +#include // Forward declaration struct mtr_t; @@ -677,6 +680,9 @@ enum trx_rseg_type_t { TRX_RSEG_TYPE_NOREDO /*!< non-redo rollback segment. */ }; +typedef std::vector> SCNIndexIds; +#define TRX_CLEAN_PAGE_SET_THRESHOLD 64 + struct trx_t { enum isolation_level_t { @@ -1107,6 +1113,34 @@ struct trx_t { #endif /* UNIV_DEBUG */ ulint magic_n; + /** Index ids set that need to set its SCN number */ + SCNIndexIds scn_indexs; + + /** The pages that have SCN to be written back */ + PageSets scn_pages; + + /** True if it caches page id in scn_pages, and push to + background thread while exceeding threshold or transaction commits */ + bool cache_scn_page; + + uint32_t add_scn_page(uint64_t compact_id) { + scn_pages.insert(compact_id); + return scn_pages.size(); + } + + void remove_scn_page(uint64_t compact_id) { + scn_pages.erase(compact_id); + } + + void reset_scn_pages() { + scn_pages.clear(); + } + + void add_scn_index(table_id_t table_id, space_index_t index_id) { + scn_indexs.push_back(std::make_pair(table_id, index_id)); + } + + bool is_read_uncommitted() const { return (isolation_level == READ_UNCOMMITTED); } diff --git a/mysql-8.0.31/storage/innobase/include/trx0types.h b/mysql-8.0.31/storage/innobase/include/trx0types.h index a2c0d99..a12ed29 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0types.h +++ b/mysql-8.0.31/storage/innobase/include/trx0types.h @@ -43,6 +43,7 @@ this program; if not, write to the Free Software
Foundation, Inc., #include #include #include +#include /** printf(3) format used for printing DB_TRX_ID and other system fields */ #define TRX_ID_FMT IB_ID_FMT @@ -601,5 +602,7 @@ struct TrxVersion { uint64_t m_version; }; +typedef std::unordered_set PageSets; + typedef std::vector> hit_list_t; #endif /* trx0types_h */ diff --git a/mysql-8.0.31/storage/innobase/include/trx0undo.h b/mysql-8.0.31/storage/innobase/include/trx0undo.h index 3e5e269..c767985 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0undo.h +++ b/mysql-8.0.31/storage/innobase/include/trx0undo.h @@ -151,6 +151,7 @@ trx_undo_rec_t *trx_undo_get_next_rec( @param[in,out] mtr Mini-transaction @return undo log record, the page latched, NULL if none */ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, + trx_id_t *modifier_trx_no, space_id_t space, const page_size_t &page_size, page_no_t page_no, ulint offset, @@ -256,7 +257,7 @@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr); /** Frees an insert undo log after a transaction commit or rollback. diff --git a/mysql-8.0.31/storage/innobase/include/trx0undo.ic b/mysql-8.0.31/storage/innobase/include/trx0undo.ic index 0d7ed9f..a57903b 100644 --- a/mysql-8.0.31/storage/innobase/include/trx0undo.ic +++ b/mysql-8.0.31/storage/innobase/include/trx0undo.ic @@ -138,10 +138,10 @@ static inline page_t *trx_undo_page_get_s_latched(const page_id_t &page_id, const page_size_t &page_size, mtr_t *mtr) { buf_block_t *block = - buf_page_get(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); + buf_page_get_allow_freed(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE); - return (buf_block_get_frame(block)); + return ((block != nullptr) ? 
buf_block_get_frame(block) : nullptr); } /** Returns the start offset of the undo log records of the specified undo diff --git a/mysql-8.0.31/storage/innobase/lock/lock0lock.cc b/mysql-8.0.31/storage/innobase/lock/lock0lock.cc index d002914..5a6d9b3 100644 --- a/mysql-8.0.31/storage/innobase/lock/lock0lock.cc +++ b/mysql-8.0.31/storage/innobase/lock/lock0lock.cc @@ -213,7 +213,7 @@ bool lock_check_trx_id_sanity(trx_id_t trx_id, const rec_t *rec, const dict_index_t *index, const ulint *offsets) { ut_ad(rec_offs_validate(rec, index, offsets)); - trx_id_t next_trx_id = trx_sys_get_next_trx_id_or_no(); + trx_id_t next_trx_id = trx_sys->get_max_trx_id(); bool is_ok = trx_id < next_trx_id; if (!is_ok) { @@ -231,6 +231,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view) /*!< in: consistent read view */ { ut_ad(index->is_clustered()); @@ -246,12 +247,7 @@ bool lock_clust_rec_cons_read_sees( return (true); } - /* NOTE that we call this function while holding the search - system latch. */ - - trx_id_t trx_id = row_get_rec_trx_id(rec, index, offsets); - - return (view->changes_visible(trx_id, index->table->name)); + return (view->changes_visible(index, pcur != nullptr ? pcur->get_block() : nullptr, rec, offsets)); } /** Checks that a non-clustered index record is seen in a consistent read. 
@@ -4645,7 +4641,7 @@ void lock_print_info_summary(FILE *file) { file); fprintf(file, "Trx id counter " TRX_ID_FMT "\n", - trx_sys_get_next_trx_id_or_no()); + trx_sys->get_max_trx_id()); fprintf(file, "Purge done for trx's n:o < " TRX_ID_FMT " undo n:o < " TRX_ID_FMT diff --git a/mysql-8.0.31/storage/innobase/page/page0cur.cc b/mysql-8.0.31/storage/innobase/page/page0cur.cc index 3f30cac..cf0d6f1 100644 --- a/mysql-8.0.31/storage/innobase/page/page0cur.cc +++ b/mysql-8.0.31/storage/innobase/page/page0cur.cc @@ -2380,7 +2380,9 @@ void page_cur_delete_rec( part of the buffer pool. */ if (mtr != nullptr) { - buf_block_modify_clock_inc(page_cur_get_block(cursor)); + buf_block_t *block = page_cur_get_block(cursor); + buf_block_modify_clock_inc(block); + block->clear_cursor(); } /* 2. Find the next and the previous record. Note that the cursor is diff --git a/mysql-8.0.31/storage/innobase/page/page0page.cc b/mysql-8.0.31/storage/innobase/page/page0page.cc index a578467..ad7d19d 100644 --- a/mysql-8.0.31/storage/innobase/page/page0page.cc +++ b/mysql-8.0.31/storage/innobase/page/page0page.cc @@ -317,6 +317,8 @@ static page_t *page_create_low(buf_block_t *block, ulint comp, buf_block_modify_clock_inc(block); + block->clear_cursor(); + page = buf_block_get_frame(block); ut_ad(page_type == FIL_PAGE_INDEX || page_type == FIL_PAGE_RTREE || @@ -432,6 +434,7 @@ void page_create_empty(buf_block_t *block, dict_index_t *index, mtr_t *mtr) { page_zip_des_t *page_zip = buf_block_get_page_zip(block); ut_ad(fil_page_index_page_check(page)); + block->clear_cursor(); /* Multiple transactions cannot simultaneously operate on the same temp-table in parallel. 
@@ -941,6 +944,7 @@ void page_delete_rec_list_end( frame modify clock */ buf_block_modify_clock_inc(block); + block->clear_cursor(); page_delete_rec_list_write_log(rec, index, MLOG_LIST_END_DELETE, mtr); @@ -2170,7 +2174,7 @@ bool page_validate( trx_id_t max_trx_id = page_get_max_trx_id(page); /* This will be 0 during recv_apply_hashed_log_recs(), because the transaction system has not been initialized yet */ - trx_id_t sys_next_trx_id_or_no = trx_sys_get_next_trx_id_or_no(); + trx_id_t sys_next_trx_id_or_no = trx_sys->get_max_trx_id(); if (max_trx_id == 0 || (sys_next_trx_id_or_no != 0 && max_trx_id >= sys_next_trx_id_or_no)) { diff --git a/mysql-8.0.31/storage/innobase/read/read0read.cc b/mysql-8.0.31/storage/innobase/read/read0read.cc index 1b2cf0c..f0f0e1f 100644 --- a/mysql-8.0.31/storage/innobase/read/read0read.cc +++ b/mysql-8.0.31/storage/innobase/read/read0read.cc @@ -35,6 +35,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "srv0srv.h" #include "trx0sys.h" +#include "trx0rec.h" +#include "trx0rseg.h" +#include "row0row.h" /* ------------------------------------------------------------------------------- @@ -181,6 +184,7 @@ will mark their views as closed but not actually free their views. /** Minimum number of elements to reserve in ReadView::ids_t */ static const ulint MIN_TRX_IDS = 32; +SCN_Mgr *scn_mgr = nullptr; #ifdef UNIV_DEBUG /** Functor to validate the view list. */ @@ -199,12 +203,12 @@ struct ViewCheck { /** Validates a read view list. 
*/ -bool MVCC::validate() const { +bool MVCC::validate(uint64_t slot) const { ViewCheck check; ut_ad(trx_sys_mutex_own()); - ut_list_map(m_views, check); + ut_list_map(m_views[slot], check); return (true); } @@ -316,7 +320,10 @@ ReadView::ReadView() m_up_limit_id(), m_creator_trx_id(), m_ids(), - m_low_limit_no() { + m_low_limit_no(), + m_trx(nullptr), + m_version(), + m_slot() { ut_d(::memset(&m_view_list, 0x0, sizeof(m_view_list))); ut_d(m_view_low_limit_no = 0); } @@ -329,22 +336,84 @@ ReadView::~ReadView() { /** Constructor @param size Number of views to pre-allocate */ -MVCC::MVCC(ulint size) : m_free(), m_views() { - for (ulint i = 0; i < size; ++i) { - ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); +MVCC::MVCC(ulint size) { - UT_LIST_ADD_FIRST(m_free, view); + m_slot_index = 0; + + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + UT_LIST_INIT(m_free[slot]); + UT_LIST_INIT(m_views[slot]); + + for (ulint i = 0; i < size; ++i) { + ReadView *view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); + UT_LIST_ADD_FIRST(m_free[slot], view); + } + + mutex_create(LATCH_ID_TRX_SYS_MVCC, &(m_mutexs[slot])); } } MVCC::~MVCC() { - while (ReadView *view = UT_LIST_GET_FIRST(m_free)) { - UT_LIST_REMOVE(m_free, view); + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + while (ReadView *view = UT_LIST_GET_FIRST(m_free[slot])) { + UT_LIST_REMOVE(m_free[slot], view); + + ut::delete_(view); + } + + ut_a(UT_LIST_GET_LEN(m_views[slot]) == 0); + + mutex_destroy(&(m_mutexs[slot])); + } +} + +bool ReadView::changes_visible( + const dict_index_t *index, + buf_block_t *block, + const rec_t *rec, + const ulint *offsets) { + + ut_a(index->is_clustered()); + ut_a(scn_mgr != nullptr); + + /* Get transaction id from record */ + ulint offset = scn_mgr->scn_offset(index, offsets); + trx_id_t id = mach_read_from_6(rec + offset); + + /* If it's scn, direct compare*/ + if (SCN_Mgr::is_scn(id)) { + return sees_version(id); + } + + /* Trx itself */ + if (id == 
m_creator_trx_id) { + return true; + } + + if (id < m_up_limit_id) { + return true; + } + + if (id >= m_low_limit_id) { + return false; + } + + /* Get SCN from undo log */ + trx_id_t scn = scn_mgr->get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets)); + + if (scn == TRX_ID_MAX) { + /* Still active */ + return false; + } + + ut_a(scn > 0); - ut::delete_(view); + if (block != nullptr) { + /* Attach record to block */ + scn_mgr->add_lazy_cursor(m_trx, block, const_cast(rec) + offset, id, scn); } - ut_a(UT_LIST_GET_LEN(m_views) == 0); + return (sees_version(scn)); } /** @@ -444,27 +513,19 @@ point in time are seen in the view. @param id Creator transaction id */ void ReadView::prepare(trx_id_t id) { - ut_ad(trx_sys_mutex_own()); m_creator_trx_id = id; m_low_limit_no = trx_get_serialisation_min_trx_no(); - m_low_limit_id = trx_sys_get_next_trx_id_or_no(); - - ut_a(m_low_limit_no <= m_low_limit_id); + m_up_limit_id = scn_mgr->min_active_id(); + m_version = trx_sys->get_max_trx_scn(); + m_low_limit_id = trx_sys->get_max_trx_id(); - if (!trx_sys->rw_trx_ids.empty()) { - copy_trx_ids(trx_sys->rw_trx_ids); - } else { - m_ids.clear(); + if (m_low_limit_no > m_version) { + m_low_limit_no = m_version; } - /* The first active transaction has the smallest id. */ - m_up_limit_id = !m_ids.empty() ? m_ids.front() : m_low_limit_id; - - ut_a(m_up_limit_id <= m_low_limit_id); - ut_d(m_view_low_limit_no = m_low_limit_no); m_closed = false; } @@ -474,14 +535,13 @@ Find a free view from the active list, if none found then allocate a new view.
@return a view to use */ -ReadView *MVCC::get_view() { - ut_ad(trx_sys_mutex_own()); +ReadView *MVCC::get_view(uint64_t slot) { ReadView *view; - if (UT_LIST_GET_LEN(m_free) > 0) { - view = UT_LIST_GET_FIRST(m_free); - UT_LIST_REMOVE(m_free, view); + if (UT_LIST_GET_LEN(m_free[slot]) > 0) { + view = UT_LIST_GET_FIRST(m_free[slot]); + UT_LIST_REMOVE(m_free[slot], view); } else { view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); @@ -493,34 +553,6 @@ ReadView *MVCC::get_view() { return (view); } -/** -Release a view that is inactive but not closed. Caller must own -the trx_sys_t::mutex. -@param view View to release */ -void MVCC::view_release(ReadView *&view) { - ut_ad(!srv_read_only_mode); - ut_ad(trx_sys_mutex_own()); - - uintptr_t p = reinterpret_cast(view); - - ut_a(p & 0x1); - - view = reinterpret_cast(p & ~1); - - ut_ad(view->m_closed); - - /** RW transactions should not free their views here. Their views - should freed using view_close_view() */ - - ut_ad(view->m_creator_trx_id == 0); - - UT_LIST_REMOVE(m_views, view); - - UT_LIST_ADD_LAST(m_free, view); - - view = nullptr; -} - /** Allocate and create a view. @param view View owned by this class created for the caller. 
Must be freed by calling view_close() @@ -548,7 +580,8 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { if (trx_is_autocommit_non_locking(trx) && view->empty()) { view->m_closed = false; - if (view->m_low_limit_id == trx_sys_get_next_trx_id_or_no()) { + if (view->m_low_limit_id == trx_sys->get_max_trx_id() && + view->m_version == trx_sys->get_max_trx_scn()) { return; } else { view->m_closed = true; @@ -556,111 +589,86 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { } } - trx_sys_mutex_enter(); + uint64_t slot; + if (view != nullptr) { + slot = view->get_slot(); + } else { + slot = get_slot(); + } + + enter(slot); if (view != nullptr) { - UT_LIST_REMOVE(m_views, view); + UT_LIST_REMOVE(m_views[slot], view); } else { - view = get_view(); + view = get_view(slot); } if (view != nullptr) { view->prepare(trx->id); - UT_LIST_ADD_FIRST(m_views, view); + UT_LIST_ADD_FIRST(m_views[slot], view); ut_ad(!view->is_closed()); - ut_ad(validate()); + view->set_slot(slot); } - trx_sys_mutex_exit(); + exit(slot); } -ReadView *MVCC::get_view_created_by_trx_id(trx_id_t trx_id) const { - ReadView *view; - - ut_ad(trx_sys_mutex_own()); - - for (view = UT_LIST_GET_LAST(m_views); view != nullptr; - view = UT_LIST_GET_PREV(m_view_list, view)) { - if (view->is_closed()) { - continue; - } - - if (view->m_creator_trx_id == trx_id) { - break; - } - } +void MVCC::enter(uint64_t slot) { + mutex_enter(&(m_mutexs[slot])); +} - return (view); +void MVCC::exit(uint64_t slot) { + mutex_exit(&(m_mutexs[slot])); } /** Get the oldest (active) view in the system. 
@return oldest view if found or NULL */ -ReadView *MVCC::get_oldest_view() const { - ReadView *view; +/** Compute the smallest version that an open view still needs. Slots are visited one by one, each under its own mutex; within a slot the list is walked from its tail and only the first view that is not closed is used, contributing min(low_limit_no(), version()). @return the minimum over all slots, or TRX_ID_MAX if no slot has an open view. NOTE(review): the older comment just above still describes returning a view or NULL. */ trx_id_t MVCC::get_oldest_version() { + trx_id_t purge_version = TRX_ID_MAX; - ut_ad(trx_sys_mutex_own()); + for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (ReadView* view = UT_LIST_GET_LAST(m_views[i]); view != nullptr; + view = UT_LIST_GET_PREV(m_view_list, view)) { + if (view->is_closed()) { + continue; + } + + trx_id_t min_no = std::min(view->low_limit_no(), view->version()); + if (min_no < purge_version) { + purge_version = min_no; + } - for (view = UT_LIST_GET_LAST(m_views); view != nullptr; - view = UT_LIST_GET_PREV(m_view_list, view)) { - if (!view->is_closed()) { break; } + + exit(i); } - return (view); + return purge_version; } /** Set up this view from a single version number: m_low_limit_no and m_version take the value of version, m_up_limit_id becomes 0 and m_low_limit_id becomes trx_sys->get_max_trx_id(). @param version version to adopt */ -void ReadView::copy_prepare(const ReadView &other) { - ut_ad(&other != this); - - if (!other.m_ids.empty()) { - const ids_t::value_type *p = other.m_ids.data(); - - m_ids.assign(p, p + other.m_ids.size()); - } else { - m_ids.clear(); - } - - m_up_limit_id = other.m_up_limit_id; +void ReadView::copy_prepare(trx_id_t version) { - m_low_limit_no = other.m_low_limit_no; + m_up_limit_id = 0; - ut_d(m_view_low_limit_no = other.m_view_low_limit_no); + m_low_limit_no = version; - m_low_limit_id = other.m_low_limit_id; + m_version = version; - m_creator_trx_id = other.m_creator_trx_id; -} + m_low_limit_id = trx_sys->get_max_trx_id(); -/** -Complete the copy, insert the creator transaction id into the -m_ids too and adjust the m_up_limit_id, if required */ - -void ReadView::copy_complete() { - ut_ad(!trx_sys_mutex_own()); - - if (m_creator_trx_id > 0) { - m_ids.insert(m_creator_trx_id); - } - - if (!m_ids.empty()) { - /* The last active transaction has the smallest id.
*/ - m_up_limit_id = std::min(m_ids.front(), m_up_limit_id); - } - - ut_ad(m_up_limit_id <= m_low_limit_id); - - /* We added the creator transaction ID to the m_ids. */ m_creator_trx_id = 0; } @@ -671,21 +679,13 @@ m_free list. This function is called by Purge to determine whether it should purge the delete marked record or not. @param view Preallocated view, owned by the caller */ void MVCC::clone_oldest_view(ReadView *view) { - trx_sys_mutex_enter(); - ReadView *oldest_view = get_oldest_view(); + trx_id_t version = get_oldest_version(); - if (oldest_view == nullptr) { + /* TRX_ID_MAX is the value get_oldest_version() starts from, i.e. no open view lowered it. */ if (version == TRX_ID_MAX) { view->prepare(0); - - trx_sys_mutex_exit(); - } else { - view->copy_prepare(*oldest_view); - - trx_sys_mutex_exit(); - - view->copy_complete(); + view->copy_prepare(version); } /* Update view to block purging transaction till GTID is persisted. */ auto >id_persistor = clone_sys->get_gtid_persistor(); @@ -696,19 +696,19 @@ void MVCC::clone_oldest_view(ReadView *view) { /** @return the number of active views */ -ulint MVCC::size() const { - trx_sys_mutex_enter(); - +ulint MVCC::size() { ulint size = 0; - for (const ReadView *view : m_views) { - if (!view->is_closed()) { - ++size; + /* Slots are locked one at a time, so the total is not taken under one lock covering all slots. */ for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (const ReadView *view : m_views[i]) { + if (!view->is_closed()) { + ++size; + } } + exit(i); } - trx_sys_mutex_exit(); - return (size); } @@ -734,14 +734,559 @@ void MVCC::view_close(ReadView *&view, bool own_mutex) { view = reinterpret_cast(p | 0x1); } else { view = reinterpret_cast(p & ~1); + /* The view is moved from m_views to m_free of its own slot under that slot mutex. */ auto slot = view->get_slot(); + enter(slot); view->close(); - UT_LIST_REMOVE(m_views, view); - UT_LIST_ADD_LAST(m_free, view); + UT_LIST_REMOVE(m_views[slot], view); + UT_LIST_ADD_LAST(m_free[slot], view); - ut_ad(validate()); + exit(slot); + /* NOTE(review): validate(slot) runs after the slot mutex has been released; confirm that it does its own locking. */ ut_ad(validate(slot)); view = nullptr; } } +/** Allocate the element array with SCN_MAP_MAX_SIZE entries. */ +Scn_Map::Scn_Map() { + m_size = SCN_MAP_MAX_SIZE; + m_elems = new Elem[m_size]; + ut_a(m_elems != nullptr); +} +/** Release the element array. */ +Scn_Map::~Scn_Map() { + delete[]
m_elems; +} +/** Create the page queue (constructed with LF_ARRAY_MAX_SIZE) and the event of one cleanout worker; the event starts out reset. */ +SCN_Mgr::CleanoutWorker::CleanoutWorker() { + m_pages = new LF_Array(LF_ARRAY_MAX_SIZE); + m_event = os_event_create(); + os_event_reset(m_event); +} +/** Destroy the event and free the page queue. */ +SCN_Mgr::CleanoutWorker::~CleanoutWorker() { + os_event_destroy(m_event); + delete m_pages; +} +/** Queue one page for this worker. The result of m_pages->add() is ignored here, so a page that cannot be added is dropped without notice. @param compact_page_id page id in compact form */ +void SCN_Mgr::CleanoutWorker::add_page(uint64_t compact_page_id) { + m_pages->add(compact_page_id); +} +/** Queue a set of pages, stopping at the first one that cannot be added. @param pages pages to queue; not modified */ +void SCN_Mgr::CleanoutWorker::add_pages(PageSets &pages) { + if (pages.empty()) { + return; + } + + for (auto &val : pages) { + if(!m_pages->add(val)) { + /* The array is full, or the add failed because of + heavy contention between threads. */ + break; + } + } +} +/** Drain the queue into pages. @param pages out: cleared first, then filled with every value returned by m_pages->get() until it returns 0 */ +void SCN_Mgr::CleanoutWorker::take_pages(PageSets &pages) { + pages.clear(); + + uint64_t val = 0; + while ((val = m_pages->get()) != 0) { + pages.insert(val); + } +} +/** Allocate both scn maps, one CleanoutWorker per configured cleanout thread (srv_cleanout_threads) and the view event. No thread is started here. */ +SCN_Mgr::SCN_Mgr() { + m_scn_map = new Scn_Map(); + m_random_map = new Scn_Map(); + m_startup_id = 0; + m_startup_scn = 0; + m_abort = false; + m_cleanout_threads = 0; + m_view_active = false; + m_max_cleanout_threads = srv_cleanout_threads; + + m_cleanout_workers = new CleanoutWorker* [m_max_cleanout_threads]; + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + m_cleanout_workers[i] = new CleanoutWorker(); + } + + m_view_event = os_event_create(); + os_event_reset(m_view_event); +} +/** Free what the constructor allocated. Asserts that no cleanout thread is still counted in m_cleanout_threads. */ +SCN_Mgr::~SCN_Mgr() { + delete m_scn_map; + delete m_random_map; + + ut_a(m_cleanout_threads.load() == 0); + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + delete m_cleanout_workers[i]; + } + + delete [] m_cleanout_workers; + + os_event_destroy(m_view_event); +} + +/** Init scn/id on startup: m_startup_scn is the current max trx scn minus 2; m_startup_id is the first entry of trx_sys->rw_trx_ids, or the max trx id when that list is empty. */ +void SCN_Mgr::init() { + m_startup_scn = trx_sys->get_max_trx_scn() - 2; + + trx_sys_mutex_enter(); + if (trx_sys->rw_trx_ids.empty()) { + m_startup_id = trx_sys->get_max_trx_id(); + } else { + m_startup_id = trx_sys->rw_trx_ids.front(); + } + + trx_sys_mutex_exit(); +} +/** Look up the scn of a transaction without reading the undo log. @param id transaction id @return m_startup_scn if id is below m_startup_id; otherwise the value found in m_scn_map or, failing that, m_random_map; TRX_ID_MAX if neither map has it and the transaction is still active; 0 if neither map has it and it is not active */ +trx_id_t SCN_Mgr::get_scn_fast(trx_id_t id) { + trx_id_t scn; + if (id < m_startup_id) { + /* Too old
transaction */ + scn = m_startup_scn; + } else { + scn = m_scn_map->read(id); + if (scn == 0) { + scn = m_random_map->read(id); + } + + if (scn == 0) { + /* Check if transaction is still active */ + if (trx_rw_is_active(id, false) != nullptr) { + scn = TRX_ID_MAX; + } + } + } + + return scn; +} +/** Return the scn of a transaction, falling back to the undo log when get_scn_fast() returns 0. A non-zero result of the undo lookup is cached in m_random_map. @param id transaction id @param index index passed on to trx_undo_get_scn() @param roll_ptr roll pointer passed on to trx_undo_get_scn() @return the scn, or TRX_ID_MAX when the transaction is still active or the undo lookup returns 0 */ +trx_id_t SCN_Mgr::get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr) { + trx_id_t scn = get_scn_fast(id); + + if (scn == TRX_ID_MAX) { + /* Transaction is still active */ + return TRX_ID_MAX; + } + + if (scn != 0) { + return scn; + } + + /* Slow path */ + scn = trx_undo_get_scn(index, roll_ptr, id); + if (scn > 0) { + m_random_map->store(id, scn); + } + + ut_a(scn < trx_sys->get_max_trx_scn()); + + if (scn == 0) { + return TRX_ID_MAX; + } + + return scn; +} +/** @return offset of the trx id field inside a record: index->trx_id_offset when it is non-zero, otherwise the value computed by row_get_trx_id_offset() */ +ulint SCN_Mgr::scn_offset(const dict_index_t *index, const ulint *offsets) { + ulint offset = index->trx_id_offset; + + if (!offset) { + offset = row_get_trx_id_offset(index, offsets); + } + + return offset; +} +/** Overwrite the 6-byte id field at ptr with scn and count the write in m_cleanout_records. Nothing is written when the field already holds an scn larger than scn, or holds an id that is not an scn and differs from id. NOTE(review): the redo-log call in the body is commented out, so mtr is not used by this overload; confirm that this is intended. */ +void SCN_Mgr::set_scn(mtr_t *mtr, byte *ptr, trx_id_t id, trx_id_t scn) { + trx_id_t stored_id = mach_read_from_6(ptr); + + if (is_scn(stored_id)) { + if (stored_id > scn) { + /* Never revert back to smaller scn */ + return; + } + } else if (stored_id != id) { + /* The stored id does not match the expected id */ + return; + } + + mach_write_to_6(ptr, scn); +// mlog_log_string(ptr, 6, mtr); + m_cleanout_records++; +} +/** Replace the trx id stored in rec by its scn when that scn can be determined. Records that already carry an scn or whose id equals current_id are skipped, as are records for which get_scn() returns 0 or TRX_ID_MAX. */ +void SCN_Mgr::set_scn( + trx_id_t current_id, + mtr_t *mtr, + rec_t *rec, + dict_index_t *index, + const ulint *offsets) { + + ulint offset = scn_offset(index, offsets); + + /* Read id */ + trx_id_t id = mach_read_from_6(rec + offset); + if (is_scn(id) || id == current_id) { + /* Already filled with an scn, or written by current_id: do nothing */ + return; + } + + trx_id_t scn = get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets)); + + if (scn == 0 || scn == TRX_ID_MAX) { + return; + } + + set_scn(mtr, rec + offset, id, scn); +} +/** Hand every page cached in trx->scn_pages to a cleanout worker selected by the hash of the page id, then reset the cached set. NOTE(review): m_max_cleanout_threads is used as a modulus; confirm that srv_cleanout_threads can never be 0. */ +void SCN_Mgr::add_pages(trx_t *trx) { + if (trx->scn_pages.empty()) { + return; + } + + for
(auto compact_val : trx->scn_pages) { + page_id_t id(compact_val); + m_cleanout_workers[id.hash() % m_max_cleanout_threads ]->add_page(compact_val); + } + + /* Note: we do not guarantee that all pages are added successfully, but + that is fine because lazy cleanout will fill in the scn + at some later time. */ + trx->reset_scn_pages(); +} +/** Register the trx id field of rec on the block as an instant cleanout entry, together with the position of the undo log header of trx (update undo if present, else insert undo of the redo rollback segment). Does nothing unless trx->cache_scn_page is set, the index is clustered and not the ibuf index, the page is a leaf, and the tablespace is not the system temporary one; also does nothing when trx has neither undo log. When the value returned by trx->add_scn_page() reaches TRX_CLEAN_PAGE_SET_THRESHOLD the cached pages are passed on through add_pages(). */ +void SCN_Mgr::add_instant_cursor(trx_t *trx, buf_block_t *block, rec_t *rec, dict_index_t *index, const ulint *offsets) { + ut_a(trx != nullptr); + + if (!trx->cache_scn_page + || !index->is_clustered() + || fsp_is_system_temporary(block->get_space_id()) + || dict_index_is_ibuf(index) + || !page_is_leaf(buf_block_get_frame(block))) { + /* do nothing */ + return; + } + + ut_a(trx->id > 0); + trx_undo_ptr_t *redo_rseg_undo_ptr = &trx->rsegs.m_redo; + trx_undo_t *undo = nullptr; + + undo = redo_rseg_undo_ptr->update_undo; + if (undo == nullptr) { + undo = redo_rseg_undo_ptr->insert_undo; + } + + if (undo == nullptr) { + return; + } + + ulint offset = scn_offset(index, offsets); + + struct UndoHdrPos hdr = {trx->id, undo->space, undo->hdr_page_no, undo->hdr_offset}; + + block->add_instant_cursor(rec + offset, hdr); + + page_id_t id{block->get_space_id(), block->get_page_no()}; + uint64_t compact_id = id.compact_value(); + if (trx->add_scn_page(compact_id) + >= TRX_CLEAN_PAGE_SET_THRESHOLD) { + add_pages(trx); + } +} +/** Register (pos, id, scn) on the block as a lazy cleanout entry and schedule the page for cleanout: directly on the worker selected by the page id hash when trx is null or does not cache pages, otherwise through the page set of trx. Blocks of the system temporary tablespace are ignored. */ +void SCN_Mgr::add_lazy_cursor(trx_t *trx, buf_block_t *block, byte *pos, trx_id_t id, trx_id_t scn) { + + if (fsp_is_system_temporary(block->get_space_id())) { + return; + } + + block->add_lazy_cursor(pos, id, scn); + + /* Add page number to the set */ + page_id_t page_id{block->get_space_id(), block->get_page_no()}; + uint64_t compact_id = page_id.compact_value(); + uint32_t slot = page_id.hash() % m_max_cleanout_threads; + + if (trx == nullptr || !trx->cache_scn_page) { + m_cleanout_workers[slot]->add_page(compact_id); + } else { + if (trx->add_scn_page(compact_id) >= TRX_CLEAN_PAGE_SET_THRESHOLD) { + add_pages(trx); + } +
} +} +/** Body of the view thread. Until m_abort is set it refreshes m_min_active_id under the trx_sys mutex (first entry of rw_trx_ids, or the max trx id when the list is empty) and then waits on m_view_event for at most one second. m_view_active is true while the loop is running. */ +void SCN_Mgr::view_task() { + m_view_active = true; + + while (!m_abort.load()) { + uint64_t sig_counter = os_event_reset(m_view_event); + + trx_sys_mutex_enter(); + if (!trx_sys->rw_trx_ids.empty()) { + m_min_active_id = trx_sys->rw_trx_ids.front(); + } else { + m_min_active_id = trx_sys->get_max_trx_id(); + } + trx_sys_mutex_exit(); + + os_event_wait_time_low(m_view_event, std::chrono::seconds{1}, sig_counter); + } + + m_view_active = false; +} +/** Apply every lazy cleanout entry of the block through set_scn(), then clear the lazy entries, or all cursors of the block when clear_all is true. When trx is not null the page is also removed from the page set cached on trx. */ +void SCN_Mgr::cleanout_block(trx_t *trx, buf_block_t *block, mtr_t *mtr, bool clear_all) { + /* Now the block is X locked, and it's safe to access cleanout array + without mutex. */ + for (auto & rec : *block->lazy_recs) { + set_scn(mtr, rec.first, rec.second.first, rec.second.second); + } + + if (clear_all) { + block->clear_cursor(); + } else { + block->lazy_recs->clear(); + } + + if (trx) { + uint64_t compact_val = block->get_page_id().compact_value(); + trx->remove_scn_page(compact_val); + } +} +/** Try to resolve the scn of the instant cleanout entries of block and move the resolved ones to its lazy entries. The instant entries are detached from the block, mtr is committed and restarted while the scns are looked up, and the block is fetched again before the results are merged back. @return true if at least one entry was resolved and the fetch returned the same block with an unchanged layout_clock, false otherwise */ +bool SCN_Mgr::instant_to_lazy(buf_block_t *block, page_id_t &page_id, page_size_t &page_size, mtr_t *mtr) { + ut_a(!block->instant_recs->empty()); + + InstantCleanoutRecs* instant_recs = block->instant_recs; + block->instant_recs = new InstantCleanoutRecs(); + block->instant_recs->clear(); + + uint64_t layout_clock = block->layout_clock; + mtr_commit(mtr); + mtr_start(mtr); + + std::unordered_map scn_map; + InstantCleanoutRecs* incomplete_recs = new InstantCleanoutRecs(); + LazyCleanoutRecs* complete_recs = new LazyCleanoutRecs(); + + scn_map.clear(); + incomplete_recs->clear(); + complete_recs->clear(); + + for (auto &item : *instant_recs) { + auto itr = scn_map.find(item.second.id); + if (itr != scn_map.end()) { + /* The scn of this transaction was already resolved in this pass */ + complete_recs->insert(std::make_pair(item.first, std::make_pair(item.second.id, itr->second))); + continue; + } + + trx_id_t id = item.second.id; + trx_id_t scn = get_scn_fast(id); + if (scn == TRX_ID_MAX) { + /* Transaction is still active */ +
incomplete_recs->insert(std::make_pair(item.first, item.second)); + continue; + } + + if (scn == 0) { + /* Not found by get_scn_fast(): read the scn from the undo log header recorded with the entry. NOTE(review): the local page_id declared below shadows the page_id parameter, which is used again further down to fetch the block. */ mtr_t local_mtr; + mtr_start(&local_mtr); + page_id_t page_id(item.second.undo_space, item.second.undo_page_no); + scn = trx_undo_hdr_get_scn(id, page_id, + (uint32_t)(item.second.undo_hdr_offset), &local_mtr, nullptr); + mtr_commit(&local_mtr); + } + + ut_a(scn != 0); + + scn_map.insert(std::make_pair(item.second.id, scn)); + + complete_recs->insert(std::make_pair(item.first, std::make_pair(id, scn))); + } + + bool has_completed_recs = (!complete_recs->empty()); + + /* Re-acquire the block */ + buf_block_t *get_block = buf_page_get_gen(page_id, page_size, RW_X_LATCH, nullptr, + Page_fetch::IF_IN_POOL, UT_LOCATION_HERE, mtr, false, true); + + /* Give up when the fetch did not return the same block or its layout_clock changed; every detached entry is discarded in that case. */ if (get_block != block + || get_block->layout_clock != layout_clock) { + delete instant_recs; + delete incomplete_recs; + delete complete_recs; + return false; + } + + /* Merge to lazy_recs map */ + if (!complete_recs->empty()) { + if (block->lazy_recs->empty()) { + delete block->lazy_recs; + block->lazy_recs = complete_recs; + } else { + for (auto &item : *complete_recs) { + if (block->lazy_recs->find(item.first) == block->lazy_recs->end()) { + block->lazy_recs->insert(item); + } + } + + delete complete_recs; + } + } else { + delete complete_recs; + } + + /* Entries of still active transactions go back to instant_recs; entries already present on the block are kept as they are. */ if (!incomplete_recs->empty()) { + if (block->instant_recs->empty()) { + delete block->instant_recs; + block->instant_recs = incomplete_recs; + } else { + for (auto &item : *incomplete_recs) { + if (block->instant_recs->find(item.first) == block->instant_recs->end()) { + block->instant_recs->insert(item); + } + } + + delete incomplete_recs; + } + } else { + delete incomplete_recs; + } + + delete instant_recs; + + return has_completed_recs; +} +/** Body of one cleanout thread. The thread takes the previous value of m_cleanout_threads as its worker slot and, until m_abort is set, drains the pages queued on that worker, cleans each page returned by the IF_IN_POOL fetch, queues again the pages that still have unresolved instant entries, and then waits on the worker. The counter is decremented when the thread leaves. */ +void SCN_Mgr::cleanout_task() { + + uint32_t slot = m_cleanout_threads.fetch_add(1); + + ib::info() << "Cleanout thread id " << slot << " starts..."; + + while (!m_abort.load()) { + uint64_t sig_counter =
m_cleanout_workers[slot]->reset(); + + PageSets pages; + pages.clear(); + + PageSets incomplete_page; + incomplete_page.clear(); + + m_cleanout_workers[slot]->take_pages(pages); + + if (!pages.empty()) { + /* Process pages that need to be modified */ + for ( auto &val : pages) { + page_id_t id(val); + //TBD: OPTIMIZE IT + uint32_t flags = fil_space_get_flags(id.space()); + if (flags == UINT32_UNDEFINED) { + continue; + } + page_size_t page_size(flags); + + mtr_t mtr; + buf_block_t *block = nullptr; + mtr_start(&mtr); + block = buf_page_get_gen(id, page_size, RW_X_LATCH, nullptr, + Page_fetch::IF_IN_POOL, UT_LOCATION_HERE, &mtr, false, true); + + if (block == nullptr) { + mtr_commit(&mtr); + continue; + } + +loop: + /* Apply the lazy entries first; repeat for as long as instant_to_lazy() turns more instant entries into lazy ones. */ cleanout_block(nullptr, block, &mtr, false); + + if (block->instant_recs->empty()) { + mtr_commit(&mtr); + continue; + } + + if (instant_to_lazy(block, id, page_size, &mtr)) { + goto loop; + } + + /* Nothing more could be resolved now: remember the page so that it is queued again below. */ mtr_commit(&mtr); + incomplete_page.insert(val); + } + } + + if (!incomplete_page.empty()) { + m_cleanout_workers[slot]->add_pages(incomplete_page); + } + + m_cleanout_workers[slot]->wait(sig_counter); + } + + ib::info() << "Cleanout thread id " << slot << " exit"; + + m_cleanout_threads--; +} +/** Thread entry point: runs SCN_Mgr::cleanout_task() on the global scn_mgr. */ +void run_cleanout_task() { + scn_mgr->cleanout_task(); +} +/** Thread entry point: runs SCN_Mgr::view_task() on the global scn_mgr. */ +void run_view_task() { + scn_mgr->view_task(); +} +/** Clear m_abort, start m_max_cleanout_threads detached cleanout threads and one detached view thread, then poll every 100 microseconds until m_cleanout_threads equals m_max_cleanout_threads and m_view_active is set. */ +void SCN_Mgr::start() { + m_abort = false; + + std::vector ths; + ths.clear(); + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + ths.push_back(std::thread(run_cleanout_task)); + } + + for (auto itr = ths.begin(); itr != ths.end(); itr++) { + itr->detach(); + } + + std::thread th2(run_view_task); + th2.detach(); + + while (m_cleanout_threads != m_max_cleanout_threads) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } + + while (!m_view_active.load()) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} +/** Set m_abort, then keep signalling the view event and waking the workers until no cleanout thread is counted any more and the view task is no longer active. */ +void SCN_Mgr::stop() { + m_abort = true; + + while (m_cleanout_threads.load() > 0 ||
m_view_active.load()) { + os_event_set(m_view_event); + + if (m_cleanout_threads.load() > 0) { + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + m_cleanout_workers[i]->wakeup(); + } + } + + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + diff --git a/mysql-8.0.31/storage/innobase/row/row0mysql.cc b/mysql-8.0.31/storage/innobase/row/row0mysql.cc index d5d918a..2e35a60 100644 --- a/mysql-8.0.31/storage/innobase/row/row0mysql.cc +++ b/mysql-8.0.31/storage/innobase/row/row0mysql.cc @@ -2961,6 +2961,7 @@ dberr_t row_create_index_for_mysql( goto error_handling; } + trx->add_scn_index(table->id, index->id); } else { dict_build_index_def(table, index, trx); #ifdef UNIV_DEBUG diff --git a/mysql-8.0.31/storage/innobase/row/row0pread-histogram.cc b/mysql-8.0.31/storage/innobase/row/row0pread-histogram.cc index 7238c08..8ab941b 100644 --- a/mysql-8.0.31/storage/innobase/row/row0pread-histogram.cc +++ b/mysql-8.0.31/storage/innobase/row/row0pread-histogram.cc @@ -335,7 +335,7 @@ dberr_t Histogram_sampler::process_non_leaf_rec( offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, &heap); - if (ctx->is_rec_visible(rec, offsets, heap, &mtr)) { + if (ctx->is_rec_visible(page_cur_get_block(&cur), rec, offsets, heap, &mtr)) { err = sample_rec(ctx, rec, offsets, index, prebuilt); if (err != DB_SUCCESS) { diff --git a/mysql-8.0.31/storage/innobase/row/row0pread.cc b/mysql-8.0.31/storage/innobase/row/row0pread.cc index 6056afe..3750cb0 100644 --- a/mysql-8.0.31/storage/innobase/row/row0pread.cc +++ b/mysql-8.0.31/storage/innobase/row/row0pread.cc @@ -97,6 +97,11 @@ Parallel_reader::~Parallel_reader() { ut::delete_(thread_ctx); } } + + trx_t *trx = current_trx(); + if (trx != nullptr) { + trx->cache_scn_page = opt_trx_cache_cleanout_page; + } } size_t Parallel_reader::available_threads(size_t n_required, @@ -195,6 +200,13 @@ Parallel_reader::Parallel_reader(size_t max_threads) m_event = os_event_create(); m_sig_count = 
os_event_reset(m_event); + + trx_t *trx = current_trx(); + if (trx != nullptr) { + trx->cache_scn_page = false; + trx->reset_scn_pages(); + } + } Parallel_reader::Scan_ctx::Scan_ctx(Parallel_reader *reader, size_t id, @@ -462,11 +474,11 @@ dberr_t PCursor::move_to_next_block(dict_index_t *index) { return err; } -bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, +bool Parallel_reader::Scan_ctx::check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - const auto table_name = m_config.m_index->table->name; +// const auto table_name = m_config.m_index->table->name; ut_ad(!m_trx || m_trx->read_view == nullptr || MVCC::is_view_active(m_trx->read_view)); @@ -477,16 +489,8 @@ bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, auto view = m_trx->read_view; if (m_config.m_index->is_clustered()) { - trx_id_t rec_trx_id; - - if (m_config.m_index->trx_id_offset > 0) { - rec_trx_id = trx_read_trx_id(rec + m_config.m_index->trx_id_offset); - } else { - rec_trx_id = row_get_rec_trx_id(rec, m_config.m_index, offsets); - } - if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && - !view->changes_visible(rec_trx_id, table_name)) { + !view->changes_visible(m_config.m_index, block, rec, offsets)) { rec_t *old_vers; row_vers_build_for_consistent_read(rec, mtr, m_config.m_index, &offsets, @@ -741,7 +745,7 @@ dberr_t Parallel_reader::Ctx::traverse_recs(PCursor *pcursor, mtr_t *mtr) { bool skip{}; if (page_is_leaf(cur->block->frame)) { - skip = !m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + skip = !m_scan_ctx->check_visibility(cur->block, rec, offsets, heap, mtr); } if (!skip) { diff --git a/mysql-8.0.31/storage/innobase/row/row0sel.cc b/mysql-8.0.31/storage/innobase/row/row0sel.cc index c1c82b5..561ad4d 100644 --- a/mysql-8.0.31/storage/innobase/row/row0sel.cc +++ b/mysql-8.0.31/storage/innobase/row/row0sel.cc @@ -882,7 +882,7 @@ static inline bool row_sel_test_other_conds( old_vers = 
nullptr; - if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, + if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, &plan->clust_pcur, node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, clust_rec, &offsets, @@ -1388,7 +1388,7 @@ static ulint row_sel_try_search_shortcut( UT_LOCATION_HERE, &heap); if (index->is_clustered()) { - if (!lock_clust_rec_cons_read_sees(rec, index, offsets, node->read_view)) { + if (!lock_clust_rec_cons_read_sees(rec, index, offsets, &(plan->pcur), node->read_view)) { ret = SEL_RETRY; goto func_exit; } @@ -1751,6 +1751,7 @@ skip_lock: if (index->is_clustered()) { if (!lock_clust_rec_cons_read_sees(rec, index, offsets, + &(plan->pcur), node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, rec, &offsets, &heap, &plan->old_vers_heap, &old_vers, @@ -3268,6 +3269,7 @@ non-clustered index. Does the necessary locking. if (trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && !lock_clust_rec_cons_read_sees(clust_rec, clust_index, *offsets, + prebuilt->clust_pcur, trx_get_read_view(trx))) { if (clust_rec != cached_clust_rec) { /* The following call returns 'offsets' associated with 'old_vers' */ @@ -3712,7 +3714,7 @@ static ulint row_sel_try_search_shortcut_for_mysql( *offsets = rec_get_offsets(rec, index, *offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, heap); - if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, + if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, pcur, trx_get_read_view(trx))) { return (SEL_RETRY); } @@ -5275,7 +5277,7 @@ rec_loop: by skipping this lookup */ if (srv_force_recovery < 5 && - !lock_clust_rec_cons_read_sees(rec, index, offsets, + !lock_clust_rec_cons_read_sees(rec, index, offsets, pcur, trx_get_read_view(trx))) { rec_t *old_vers; /* The following call returns 'offsets' associated with 'old_vers' */ diff --git a/mysql-8.0.31/storage/innobase/row/row0vers.cc b/mysql-8.0.31/storage/innobase/row/row0vers.cc index 41608b2..4ad615a 100644 --- 
a/mysql-8.0.31/storage/innobase/row/row0vers.cc +++ b/mysql-8.0.31/storage/innobase/row/row0vers.cc @@ -586,7 +586,11 @@ bool row_vers_must_preserve_del_marked(trx_id_t trx_id, mtr_s_lock(&purge_sys->latch, mtr, UT_LOCATION_HERE); - return (!purge_sys->view.changes_visible(trx_id, name)); + if (!SCN_Mgr::is_scn(trx_id)) { + return true; + } + + return (!purge_sys->view.sees_version(trx_id)); } /** Check whether all non-virtual columns in a index entries match @@ -1272,7 +1276,7 @@ dberr_t row_vers_build_for_consistent_read( lob_undo->reset(); } - ut_ad(!view->changes_visible(trx_id, index->table->name)); + ut_ad(!(SCN_Mgr::is_scn(trx_id) && view->sees_version(trx_id))); ut_ad(!vrow || !(*vrow)); @@ -1314,9 +1318,7 @@ dberr_t row_vers_build_for_consistent_read( ut_a(!rec_offs_any_null_extern(index, prev_version, *offsets)); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - trx_id = row_get_rec_trx_id(prev_version, index, *offsets); - - if (view->changes_visible(trx_id, index->table->name)) { + if (view->changes_visible(index, nullptr, prev_version, *offsets)) { /* The view already sees this version: we can copy it to in_heap and return */ diff --git a/mysql-8.0.31/storage/innobase/srv/srv0srv.cc b/mysql-8.0.31/storage/innobase/srv/srv0srv.cc index 20910d5..5cf77ba 100644 --- a/mysql-8.0.31/storage/innobase/srv/srv0srv.cc +++ b/mysql-8.0.31/storage/innobase/srv/srv0srv.cc @@ -650,6 +650,9 @@ static ulint srv_log_writes_and_flush = 0; #endif /* !UNIV_HOTBACKUP */ +uint32_t srv_cleanout_threads = 8; +bool opt_trx_cache_cleanout_page = false; + /* Interval in seconds at which various tasks are performed by the master thread when server is active. 
In order to balance the workload, we should try to keep intervals such that they are not multiple of @@ -1722,6 +1725,10 @@ void srv_export_innodb_status(void) { } undo::spaces->s_unlock(); + if (scn_mgr != nullptr) { + export_vars.innodb_scn_cleanout_records = scn_mgr->cleanout_records(); + } + #ifdef UNIV_DEBUG rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); trx_id_t done_trx_no = purge_sys->done.trx_no; diff --git a/mysql-8.0.31/storage/innobase/srv/srv0start.cc b/mysql-8.0.31/storage/innobase/srv/srv0start.cc index 429ac32..f3e2b22 100644 --- a/mysql-8.0.31/storage/innobase/srv/srv0start.cc +++ b/mysql-8.0.31/storage/innobase/srv/srv0start.cc @@ -1783,6 +1783,8 @@ dberr_t srv_start(bool create_new_db) { trx_sys_create(); lock_sys_create(srv_lock_table_size); + scn_mgr = new SCN_Mgr(); + /* Create i/o-handler threads: */ /* For read only mode, we don't need ibuf and log I/O thread. @@ -2101,6 +2103,8 @@ dberr_t srv_start(bool create_new_db) { srv_dict_metadata = recv_recovery_from_checkpoint_finish(false); + scn_mgr->start(); + if (recv_sys->is_cloned_db && srv_dict_metadata != nullptr) { ut::delete_(srv_dict_metadata); srv_dict_metadata = nullptr; @@ -3176,6 +3180,10 @@ void srv_shutdown() { buffer pool to disk. */ dict_persist_to_dd_table_buffer(); + if (scn_mgr != nullptr) { + scn_mgr->stop(); + } + /* The steps 1-4 is the real InnoDB shutdown. All before was to stop activity which could produce new changes. All after is just cleaning up (freeing memory). 
*/ @@ -3240,6 +3248,11 @@ void srv_shutdown() { lock_sys_close(); trx_pool_close(); + if (scn_mgr != nullptr) { + delete scn_mgr; + scn_mgr = nullptr; + } + dict_close(); dict_persist_close(); btr_search_sys_free(); diff --git a/mysql-8.0.31/storage/innobase/sync/sync0debug.cc b/mysql-8.0.31/storage/innobase/sync/sync0debug.cc index 20b400f..3936094 100644 --- a/mysql-8.0.31/storage/innobase/sync/sync0debug.cc +++ b/mysql-8.0.31/storage/innobase/sync/sync0debug.cc @@ -1387,6 +1387,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_SYS_SERIALISATION, SYNC_TRX_SYS_SERIALISATION, trx_sys_serialisation_mutex_key); + LATCH_ADD_MUTEX(TRX_SYS_MVCC, SYNC_NO_ORDER_CHECK, trx_sys_mvcc_mutex_key); + LATCH_ADD_MUTEX(SRV_SYS, SYNC_THREADS, srv_sys_mutex_key); LATCH_ADD_MUTEX(SRV_SYS_TASKS, SYNC_ANY_LATCH, srv_threads_mutex_key); diff --git a/mysql-8.0.31/storage/innobase/sync/sync0sync.cc b/mysql-8.0.31/storage/innobase/sync/sync0sync.cc index 70b9664..ceb8f43 100644 --- a/mysql-8.0.31/storage/innobase/sync/sync0sync.cc +++ b/mysql-8.0.31/storage/innobase/sync/sync0sync.cc @@ -130,6 +130,7 @@ mysql_pfs_key_t lock_wait_mutex_key; mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_shard_mutex_key; mysql_pfs_key_t trx_sys_serialisation_mutex_key; +mysql_pfs_key_t trx_sys_mvcc_mutex_key; mysql_pfs_key_t srv_sys_mutex_key; mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/mysql-8.0.31/storage/innobase/trx/trx0purge.cc b/mysql-8.0.31/storage/innobase/trx/trx0purge.cc index 73dbf2b..95096ee 100644 --- a/mysql-8.0.31/storage/innobase/trx/trx0purge.cc +++ b/mysql-8.0.31/storage/innobase/trx/trx0purge.cc @@ -262,6 +262,8 @@ void trx_purge_sys_initialize(uint32_t n_purge_threads, trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + purge_sys->view_active = true; purge_sys->rseg_iter = ut::new_withkey( @@ -320,6 +322,7 @@ void trx_purge_add_update_undo_to_history( page_t 
*undo_page, /*!< in: update undo log header page, x-latched */ bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. */ + bool is_insert, /*!< in: true: operate on undo_ptr->insert_undo, false: operate on undo_ptr->update_undo */ ulint n_added_logs, /*!< in: number of logs added */ @@ -330,7 +333,14 @@ void trx_purge_add_update_undo_to_history( trx_rsegf_t *rseg_header; trx_ulogf_t *undo_header; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + + ut_a(undo != nullptr); + rseg = undo->rseg; rseg_header = trx_rsegf_get(undo->rseg->space_id, undo->rseg->page_no, @@ -1711,7 +1721,7 @@ static void trx_purge_rseg_get_next_history_log( ut_a(rseg->last_page_no != FIL_NULL); - purge_sys->iter.trx_no = rseg->last_trx_no + 1; + purge_sys->iter.trx_no = rseg->last_trx_no + 2; purge_sys->iter.undo_no = 0; purge_sys->iter.undo_rseg_space = SPACE_UNKNOWN; purge_sys->next_stored = false; @@ -1783,6 +1793,11 @@ static void trx_purge_rseg_get_next_history_log( rseg->latch(); + if (purge_sys->iter.trx_no == trx_no + 2) { + //FIXME: ASSERT the page type is TRX_UNDO_INSERT + purge_sys->iter.trx_no = trx_no; + } + rseg->last_page_no = prev_log_addr.page; rseg->last_offset = prev_log_addr.boffset; rseg->last_trx_no = trx_no; @@ -1815,6 +1830,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, uint64_t undo_no; space_id_t undo_rseg_space; trx_id_t modifier_trx_id; + trx_id_t modifier_trx_no; purge_sys->hdr_offset = purge_sys->rseg->last_offset; page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no; @@ -1826,7 +1842,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, mtr_start(&mtr); undo_rec = trx_undo_get_first_rec( - &modifier_trx_id, purge_sys->rseg->space_id, page_size, + &modifier_trx_id, &modifier_trx_no, purge_sys->rseg->space_id, page_size, purge_sys->hdr_page_no, purge_sys->hdr_offset, RW_S_LATCH, &mtr); if (undo_rec != nullptr) { @@ -1892,7 +1908,7 @@ static trx_undo_rec_t *trx_purge_get_next_rec( mtr_t mtr;
ut_ad(purge_sys->next_stored); - ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no()); + ut_ad(purge_sys->iter.trx_no < purge_sys->version.load()); space = purge_sys->rseg->space_id; page_no = purge_sys->page_no; @@ -2215,7 +2231,7 @@ void Purge_groups_t::distribute_if_needed() { } } - if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) { + if (purge_sys->iter.trx_no >= purge_sys->version.load()) { return nullptr; } @@ -2412,6 +2428,8 @@ ulint trx_purge(ulint n_purge_threads, /*!< in: number of purge tasks trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + purge_sys->view_active = true; rw_lock_x_unlock(&purge_sys->latch); diff --git a/mysql-8.0.31/storage/innobase/trx/trx0rec.cc b/mysql-8.0.31/storage/innobase/trx/trx0rec.cc index 60e4e41..81eb588 100644 --- a/mysql-8.0.31/storage/innobase/trx/trx0rec.cc +++ b/mysql-8.0.31/storage/innobase/trx/trx0rec.cc @@ -476,6 +476,7 @@ static bool trx_undo_report_insert_virtual(page_t *undo_page, /** Reports in the undo log of an insert of a clustered index record. 
@return offset of the inserted entry on the page if succeed, 0 if fail */ static ulint trx_undo_page_report_insert( + trx_undo_t *undo, /*!< in: undo log hdr object */ page_t *undo_page, /*!< in: undo log page */ trx_t *trx, /*!< in: transaction */ dict_index_t *index, /*!< in: clustered index */ @@ -497,7 +498,7 @@ static ulint trx_undo_page_report_insert( ut_ad(first_free <= UNIV_PAGE_SIZE); - if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11) { + if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11 + 9) { /* Not enough space for writing the general parameters */ return (0); @@ -506,6 +507,18 @@ static ulint trx_undo_page_report_insert( /* Reserve 2 bytes for the pointer to the next undo log record */ ptr += 2; + /* Tag the record as the new version, in which the undo record also stores the undo header page no and offset */ + *ptr++ = (byte)TRX_UNDO_NEW_VERSION_TAG; + /* Store the low bits of trx id in a 2-byte field for verification (NOTE: the 0xFF mask keeps only the low byte) */ + mach_write_to_2(ptr, (0xFF & trx->id)); + ptr += 2; + /* Write page no of the undo header */ + mach_write_to_4(ptr, undo->hdr_page_no); + ptr += 4; + /* Write header offset */ + mach_write_to_2(ptr, undo->hdr_offset); + ptr += 2; + /* Store first some general parameters to the undo log */ *ptr++ = TRX_UNDO_INSERT_REC; ptr += mach_u64_write_much_compressed(ptr, trx->undo_no); @@ -544,6 +557,152 @@ static ulint trx_undo_page_report_insert( return (trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr)); } +bool trx_undo_rec_get_hdr( + trx_id_t id, + trx_undo_rec_t *undo_rec, + page_no_t &undo_hdr_no, + uint32_t &offset) { + const byte *ptr; + + ptr = undo_rec + 2; + + if (*ptr != TRX_UNDO_NEW_VERSION_TAG) { + /* old version */ + return false; + } + + ptr += 1; + uint16_t low_id = mach_read_from_2(ptr); + if (low_id != (id & 0xFF)) { + /* Possibly purged */ + return false; + } + + ptr += 2; + + undo_hdr_no = mach_read_from_4(ptr); + ptr += 4; + offset = mach_read_from_2(ptr); + + return true; +} + +static inline trx_id_t trx_purge_get_version() { + trx_id_t purge_version = 
purge_sys->version.load(); + ut_a(purge_version > 2); + ut_a(SCN_Mgr::is_scn(purge_version)); + + return (purge_version - 2); +} + +trx_id_t trx_undo_hdr_get_scn(trx_id_t trx_id, page_id_t &page_id, uint32_t offset, mtr_t *mtr, page_t *undo_page) { + + trx_id_t purge_version = trx_purge_get_version(); + + if (undo_page == nullptr) { + page_no_t space_size = fil_space_get_size(page_id.space()); + if (space_size <= page_id.page_no()) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(page_id.space(), &found); + ut_ad(found); + + undo_page = trx_undo_page_get_s_latched(page_id, page_size, mtr); + + if (undo_page == nullptr) { + return purge_version; + } + } + + /* Get undo header */ + trx_ulogf_t *undo_header = undo_page + offset; + trx_id_t modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + if (modifier_trx_id != trx_id) { + /* Possibly purged */ + return purge_version; + } + + trx_id_t scn = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + + return scn; +} + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id) { + trx_id_t purge_version = trx_purge_get_version(); + + /* Decode rollback pointer */ + bool is_insert; + ulint rseg_id; + space_id_t space_id; + page_no_t page_no; + ulint offset; + trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no, &offset); + space_id = trx_rseg_id_to_space_id(rseg_id, false); + + page_no_t space_size = fil_space_get_size(space_id); + /* out of range, it must be purged */ + if (page_no >= space_size) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(space_id, &found); + ut_ad(found); + + /* Get the page */ + mtr_t mtr; + mtr_start(&mtr); + page_t *undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no), + page_size, &mtr); + + if (undo_page == nullptr) { + /* invalid page */ + mtr_commit(&mtr); + return purge_version; + } + + /* Get record */ + 
trx_undo_rec_t *undo_rec = (trx_undo_rec_t *)(undo_page + offset); + + page_no_t undo_hdr_no; + uint32_t undo_hdr_offset; + + if (!trx_undo_rec_get_hdr(id, undo_rec, undo_hdr_no, undo_hdr_offset) + || undo_hdr_no >= space_size) { + /* older version or invalid undo log */ + mtr_commit(&mtr); + return purge_version; + } + + if (trx_undo_rec_get_table_id(undo_rec) != index->table->id) { + /* Wrong undo log, possibly purged */ + mtr_commit(&mtr); + return purge_version; + } + + page_id_t undo_hdr_id = {space_id, undo_hdr_no}; + trx_id_t scn; + + if (undo_hdr_no == page_no) { + /* No need to read and lock page again */ + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, undo_page); + } else { + mtr_commit(&mtr); + mtr_start(&mtr); + + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, nullptr); + } + + mtr_commit(&mtr); + + return scn; +} + /** Reads from an undo log record the general parameters. @return remaining part of undo log record after reading these values */ byte *trx_undo_rec_get_pars( @@ -560,7 +719,7 @@ byte *trx_undo_rec_get_pars( { const byte *ptr; - ptr = undo_rec + 2; + ptr = undo_rec + trx_undo_start_offset(undo_rec); ptr = type_cmpl.read(ptr); *updated_extern = type_cmpl.is_lob_updated(); @@ -587,7 +746,7 @@ byte *trx_undo_rec_get_pars( @param[in] undo_rec Undo log record @return the table ID */ table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *undo_rec) { - const byte *ptr = undo_rec + 2; + const byte *ptr = undo_rec + trx_undo_start_offset(undo_rec); uint8_t type_cmpl = mach_read_from_1(ptr); const bool blob_undo = type_cmpl & TRX_UNDO_MODIFY_BLOB; @@ -597,9 +756,9 @@ table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *undo_rec) { type_cmpl flag + 1 byte for the new flag. Total 4 bytes. The new flag is currently unused and is available for future use. 
*/ - ptr = undo_rec + 4; + ptr += 2; } else { - ptr = undo_rec + 3; + ptr++; } /* Skip the UNDO number */ @@ -1148,6 +1307,7 @@ static byte *trx_undo_report_blob_update(page_t *undo_page, dict_index_t *index, succeed, 0 if fail */ static ulint trx_undo_page_report_modify( /*========================*/ + trx_undo_t *undo, /*!< in: undo log object */ page_t *undo_page, /*!< in: undo log page */ trx_t *trx, /*!< in: transaction */ dict_index_t *index, /*!< in: clustered index where update or @@ -1199,7 +1359,7 @@ static ulint trx_undo_page_report_modify( ut_ad(first_free <= UNIV_PAGE_SIZE); - if (trx_undo_left(undo_page, ptr) < 50) { + if (trx_undo_left(undo_page, ptr) < 50 + 9) { /* NOTE: the value 50 must be big enough so that the general fields written below fit on the undo log page */ @@ -1209,6 +1369,18 @@ static ulint trx_undo_page_report_modify( /* Reserve 2 bytes for the pointer to the next undo log record */ ptr += 2; + /* Tag the record as the new version, in which the undo record also stores the undo header page no and offset */ + *ptr++ = (byte)TRX_UNDO_NEW_VERSION_TAG; + /* Store the low bits of trx id in a 2-byte field for verification (NOTE: the 0xFF mask keeps only the low byte) */ + mach_write_to_2(ptr, (0xFF & trx->id)); + ptr += 2; + /* Write page no of the undo header */ + mach_write_to_4(ptr, undo->hdr_page_no); + ptr += 4; + /* Write header offset */ + mach_write_to_2(ptr, undo->hdr_offset); + ptr += 2; + /* Store first some general parameters to the undo log */ if (!update) { @@ -1251,6 +1423,7 @@ static ulint trx_undo_page_report_modify( ut_ad(flen == DATA_TRX_ID_LEN); trx_id = trx_read_trx_id(field); + ut_a(SCN_Mgr::is_scn(trx_id) || trx_id == trx->id); /* If it is an update of a delete marked record, then we are allowed to ignore blob prefixes if the delete marking was done @@ -2244,13 +2417,13 @@ dberr_t trx_undo_report_row_operation( switch (op_type) { case TRX_UNDO_INSERT_OP: - offset = trx_undo_page_report_insert(undo_page, trx, index, clust_entry, + offset = trx_undo_page_report_insert(undo, undo_page, trx, index, clust_entry, &mtr); break; 
default: ut_ad(op_type == TRX_UNDO_MODIFY_OP); offset = - trx_undo_page_report_modify(undo_page, trx, index, rec, offsets, + trx_undo_page_report_modify(undo, undo_page, trx, index, rec, offsets, update, cmpl_info, clust_entry, &mtr); } @@ -2416,7 +2589,7 @@ err_exit: rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_history = purge_sys->view.changes_visible(trx_id, name); + missing_history = purge_sys->view.sees_version(trx_id); if (!missing_history) { *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp); } @@ -2472,6 +2645,10 @@ bool trx_undo_prev_version_build( rec_trx_id = row_get_rec_trx_id(rec, index, offsets); + if (!SCN_Mgr::is_scn(rec_trx_id)) { + rec_trx_id = scn_mgr->get_scn(rec_trx_id, index, roll_ptr); + } + /* REDO rollback segments are used only for non-temporary objects. For temporary objects NON-REDO rollback segments are used. */ bool is_temp = index->table->is_temporary(); @@ -2503,6 +2680,17 @@ bool trx_undo_prev_version_build( ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr, &info_bits); + trx_id_t scn = trx_id; + if (!SCN_Mgr::is_scn(trx_id)) { + /* read from undo header */ + scn = scn_mgr->get_scn(trx_id, index, roll_ptr); + if (scn != TRX_ID_MAX) { + trx_id = scn; + } + } + + ut_a(SCN_Mgr::is_scn(trx_id) || scn == TRX_ID_MAX); + /* (a) If a clustered index record version is such that the trx id stamp in it is bigger than purge_sys->view, then the BLOBs in that version are known to exist (the purge has not @@ -2555,8 +2743,7 @@ bool trx_undo_prev_version_build( rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_extern = - purge_sys->view.changes_visible(trx_id, index->table->name); + missing_extern = purge_sys->view.sees_version(trx_id); rw_lock_s_unlock(&purge_sys->latch); diff --git a/mysql-8.0.31/storage/innobase/trx/trx0sys.cc b/mysql-8.0.31/storage/innobase/trx/trx0sys.cc index 7a507cd..382e765 100644 --- a/mysql-8.0.31/storage/innobase/trx/trx0sys.cc +++ 
b/mysql-8.0.31/storage/innobase/trx/trx0sys.cc @@ -67,7 +67,12 @@ void ReadView::check_trx_id_sanity(trx_id_t id, const table_name_t &name) { return; } - if (id >= trx_sys_get_next_trx_id_or_no()) { + if (SCN_Mgr::is_scn(id)) { + //FIXME + return; + } + + if (id >= trx_sys->get_max_trx_id()) { ib::warn(ER_IB_MSG_1196) << "A transaction id" << " in a record of table " << name << " is newer than the" @@ -128,7 +133,8 @@ void trx_sys_write_max_trx_id(void) { sys_header = trx_sysf_get(&mtr); - const trx_id_t max_trx_id = trx_sys->next_trx_id_or_no.load(); + const trx_id_t max_trx_id = std::max(trx_sys->get_max_trx_id(), + trx_sys->get_max_trx_scn()); mlog_write_ull(sys_header + TRX_SYS_TRX_ID_STORE, max_trx_id, &mtr); @@ -154,7 +160,7 @@ trx_id_t trx_sys_oldest_trx_no() { auto trx = UT_LIST_GET_FIRST(trx_sys->serialisation_list); return (trx->no); } - return trx_sys_get_next_trx_id_or_no(); + return trx_sys->get_max_trx_scn(); } void trx_sys_get_binlog_prepared(std::vector &trx_ids) { @@ -494,18 +500,22 @@ purge_pq_t *trx_sys_init_at_db_start(void) { - and one that has acquired the trx_sys_serialisation_mutex. If you decreased the factor 2, the test innodb.max_trx_id should fail. */ - trx_sys->next_trx_id_or_no.store(max_trx_id + + trx_sys->next_trx_id.store(max_trx_id + 2 * trx_sys_get_trx_id_write_margin()); - trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_id_or_no.load()); - mtr.commit(); + if (trx_sys->next_trx_id % 2 != 0) { + trx_sys->next_trx_id++; + } + trx_sys->next_trx_scn = trx_sys->next_trx_id.load() + 1; + + trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_scn.load()); #ifdef UNIV_DEBUG /* max_trx_id is the next transaction ID to assign. Initialize maximum transaction number to one less if all transactions are already purged. 
*/ if (trx_sys->rw_max_trx_no == 0) { - trx_sys->rw_max_trx_no = trx_sys_get_next_trx_id_or_no() - 1; + trx_sys->rw_max_trx_no = trx_sys->get_max_trx_scn() - 2; } #endif /* UNIV_DEBUG */ @@ -517,6 +527,8 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_lists_init_at_db_start(); + scn_mgr->init(); + /* This mutex is not strictly required, it is here only to satisfy the debug code (assertions). We are still running in single threaded bootstrap mode. */ @@ -545,7 +557,7 @@ purge_pq_t *trx_sys_init_at_db_start(void) { << rows_to_undo << unit << " row operations to undo"; ib::info(ER_IB_MSG_1199) - << "Trx id counter is " << trx_sys_get_next_trx_id_or_no(); + << "Trx id counter is " << trx_sys->get_max_trx_id(); } trx_sys->found_prepared_trx = trx_sys->n_prepared_trx > 0; diff --git a/mysql-8.0.31/storage/innobase/trx/trx0trx.cc b/mysql-8.0.31/storage/innobase/trx/trx0trx.cc index 3a5bfc4..da84129 100644 --- a/mysql-8.0.31/storage/innobase/trx/trx0trx.cc +++ b/mysql-8.0.31/storage/innobase/trx/trx0trx.cc @@ -236,6 +236,10 @@ static void trx_init(trx_t *trx) { trx->flush_observer = nullptr; + trx->scn_indexs.clear(); + trx->reset_scn_pages(); + trx->cache_scn_page = opt_trx_cache_cleanout_page; + ++trx->version; } @@ -253,6 +257,9 @@ struct TrxFactory { the constructors of the trx_t members. 
*/ new (trx) trx_t(); + new (&trx->scn_indexs) SCNIndexIds(); + new (&trx->scn_pages) PageSets(); + trx_init(trx); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); @@ -322,6 +329,9 @@ struct TrxFactory { trx->lock.rec_pool.~lock_pool_t(); trx->lock.table_pool.~lock_pool_t(); + + trx->scn_indexs.~SCNIndexIds(); + trx->scn_pages.~PageSets(); } /** Enforce any invariants here, this is called before the transaction @@ -493,6 +503,9 @@ static void trx_free(trx_t *&trx) { trx->mod_tables.clear(); + trx->scn_indexs.clear(); + trx->reset_scn_pages(); + ut_ad(trx->read_view == nullptr); ut_ad(trx->is_dd_trx == false); @@ -1444,7 +1457,7 @@ static inline void trx_erase_from_serialisation_list_low(trx_t *trx) { UT_LIST_GET_FIRST(trx_sys->serialisation_list)->no); } else { - trx_sys->serialisation_min_trx_no.store(trx_sys_get_next_trx_id_or_no()); + trx_sys->serialisation_min_trx_no.store(trx_sys->get_max_trx_scn()); } } @@ -1541,11 +1554,6 @@ static bool trx_write_serialisation_history( temp_mtr.set_log_mode(MTR_LOG_NO_REDO); } - /* If transaction involves insert then truncate undo logs. */ - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); - } - if (trx->rsegs.m_noredo.insert_undo != nullptr) { trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr); } @@ -1554,7 +1562,7 @@ static bool trx_write_serialisation_history( /* If transaction involves update then add rollback segments to purge queue. */ - if (trx->rsegs.m_redo.update_undo != nullptr || + if (trx_is_redo_rseg_updated(trx) || trx->rsegs.m_noredo.update_undo != nullptr) { /* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue @@ -1562,7 +1570,7 @@ static bool trx_write_serialisation_history( rollback segments. */ trx_undo_ptr_t *redo_rseg_undo_ptr = - trx->rsegs.m_redo.update_undo != nullptr ? &trx->rsegs.m_redo : nullptr; + trx_is_redo_rseg_updated(trx) ? 
&trx->rsegs.m_redo : nullptr; trx_undo_ptr_t *temp_rseg_undo_ptr = trx->rsegs.m_noredo.update_undo != nullptr ? &trx->rsegs.m_noredo @@ -1572,11 +1580,13 @@ static bool trx_write_serialisation_history( serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr, temp_rseg_undo_ptr); + ulint added_log = 0; /* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */ if (trx->rsegs.m_redo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish(trx->rsegs.m_redo.update_undo, mtr); @@ -1585,28 +1595,40 @@ static bool trx_write_serialisation_history( non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. */ - bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr || trx->rsegs.m_redo.insert_undo != nullptr); /* Set flag if GTID information need to persist. */ auto undo_ptr = &trx->rsegs.m_redo; trx_undo_gtid_set(trx, undo_ptr->update_undo, false); - trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, - (update_rseg_len ? 1 : 0), mtr); + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, false, + (update_rseg_len ? added_log : 0), mtr); } DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE();); + if (trx->rsegs.m_redo.insert_undo != nullptr) { + page_t *undo_hdr_page; + added_log++; + + undo_hdr_page = + trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); + + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + auto undo_ptr = &trx->rsegs.m_redo; + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, true, + (update_rseg_len ? 
added_log : 0), mtr); + } + if (trx->rsegs.m_noredo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_noredo.update_undo, &temp_mtr); - ulint n_added_logs = (redo_rseg_undo_ptr != nullptr) ? 2 : 1; - - trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, - n_added_logs, &temp_mtr); + trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, false, + added_log, &temp_mtr); } } @@ -1791,6 +1813,26 @@ static void trx_erase_lists(trx_t *trx) { trx_sys->mvcc->view_close(trx->read_view, true); } } + + if (!trx->scn_indexs.empty()) { + trx_id_t scn = trx->no; + if (scn == TRX_ID_MAX) { + scn = trx_sys->get_next_trx_scn(); + } + + for (auto &item : trx->scn_indexs) { + dict_index_t *index = dict_index_get_by_id(item.first, item.second); + + if (index != nullptr) { + ut_a(index->trx_id == trx->id); + index->trx_scn = scn; + index->table->release(); + } + } + + trx->scn_indexs.clear(); + } + DEBUG_SYNC_C("after_trx_erase_lists"); } @@ -1894,6 +1936,9 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) { trx_erase_from_serialisation_list_low(trx); trx_sys_serialisation_mutex_exit(); + + ut_a(trx->no != TRX_ID_MAX); + scn_mgr->store_scn(trx->id, trx->no); } lock_trx_release_locks(trx); @@ -1975,9 +2020,7 @@ written */ gtid_persistor.set_persist_gtid(trx, false); if (mtr != nullptr) { - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_insert_cleanup(&trx->rsegs.m_redo, false); - } + scn_mgr->add_pages(trx); if (trx->rsegs.m_noredo.insert_undo != nullptr) { trx_undo_insert_cleanup(&trx->rsegs.m_noredo, true); @@ -2267,6 +2310,7 @@ ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */ } else if (!MVCC::is_view_active(trx->read_view)) { trx_sys->mvcc->view_open(trx->read_view, trx); + trx->read_view->set_trx(trx); } return (trx->read_view); diff --git a/mysql-8.0.31/storage/innobase/trx/trx0undo.cc 
b/mysql-8.0.31/storage/innobase/trx/trx0undo.cc index 0147a5a..7a38740 100644 --- a/mysql-8.0.31/storage/innobase/trx/trx0undo.cc +++ b/mysql-8.0.31/storage/innobase/trx/trx0undo.cc @@ -293,12 +293,14 @@ trx_undo_rec_t *trx_undo_get_next_rec( @param[in,out] mtr Mini-transaction @return undo log record, the page latched, NULL if none */ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, + trx_id_t *modifier_trx_no, space_id_t space, const page_size_t &page_size, page_no_t page_no, ulint offset, ulint mode, mtr_t *mtr) { page_t *undo_page; trx_undo_rec_t *rec; + ulint type; const page_id_t page_id(space, page_no); @@ -311,6 +313,15 @@ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, if (modifier_trx_id != nullptr) { trx_ulogf_t *undo_header = undo_page + offset; *modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + ut_a(modifier_trx_no != nullptr); + *modifier_trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + ut_a(SCN_Mgr::is_scn(*modifier_trx_no)); + + /* For insert undo, return null */ + type = mtr_read_ulint(undo_page + TRX_UNDO_PAGE_HDR, MLOG_2BYTES, mtr); + if (type == TRX_UNDO_INSERT) { + return nullptr; + } } rec = trx_undo_page_get_first_rec(undo_page, page_no, offset); @@ -1197,7 +1208,7 @@ loop: mtr.set_log_mode(MTR_LOG_NO_REDO); } - rec = trx_undo_get_first_rec(nullptr, rseg->space_id, rseg->page_size, + rec = trx_undo_get_first_rec(nullptr, nullptr, rseg->space_id, rseg->page_size, hdr_page_no, hdr_offset, RW_X_LATCH, &mtr); if (rec == nullptr) { /* Already empty */ @@ -1639,8 +1650,9 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type, auto undo_page = trx_undo_page_get(page_id_t(undo->space, undo->hdr_page_no), undo->page_size, mtr); - ulint offset; + ulint offset = trx_undo_header_create(undo_page, trx_id, mtr); +#if 0 if (type == TRX_UNDO_INSERT) { offset = trx_undo_insert_header_reuse(undo_page, trx_id, mtr); gtid_storage = trx_undo_t::Gtid_storage::NONE; @@ -1651,6 
+1663,7 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type, offset = trx_undo_header_create(undo_page, trx_id, mtr); } +#endif trx_undo_header_add_space_for_xid(undo_page, undo_page + offset, mtr, gtid_storage); @@ -1815,9 +1828,6 @@ page_t *trx_undo_set_state_at_finish( if (undo->size == 1 && mach_read_from_2(page_hdr + TRX_UNDO_PAGE_FREE) < TRX_UNDO_PAGE_REUSE_LIMIT) { state = TRX_UNDO_CACHED; - - } else if (undo->type == TRX_UNDO_INSERT) { - state = TRX_UNDO_TO_FREE; } else { state = TRX_UNDO_TO_PURGE; } @@ -1920,25 +1930,38 @@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr) { trx_rseg_t *rseg; trx_undo_t *undo; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + rseg = undo_ptr->rseg; ut_ad(mutex_own(&(rseg->mutex))); trx_purge_add_update_undo_to_history( - trx, undo_ptr, undo_page, update_rseg_history_len, n_added_logs, mtr); - - UT_LIST_REMOVE(rseg->update_undo_list, undo); + trx, undo_ptr, undo_page, update_rseg_history_len, is_insert, n_added_logs, mtr); - undo_ptr->update_undo = nullptr; + if (is_insert) { + UT_LIST_REMOVE(rseg->insert_undo_list, undo); + undo_ptr->insert_undo = nullptr; + } else { + UT_LIST_REMOVE(rseg->update_undo_list, undo); + undo_ptr->update_undo = nullptr; + } if (undo->state == TRX_UNDO_CACHED) { - UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + if (is_insert) { + UT_LIST_ADD_FIRST(rseg->insert_undo_cached, undo); + } else { + UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo); + } MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED); } else {