diff --git a/storage/innobase/btr/btr0btr.cc b/storage/innobase/btr/btr0btr.cc index 97bc20c3287..c143e5fe08a 100644 --- a/storage/innobase/btr/btr0btr.cc +++ b/storage/innobase/btr/btr0btr.cc @@ -1182,6 +1182,8 @@ bool btr_page_reorganize_low(bool recovery, ulint z_level, page_cur_t *cursor, ulint pos; bool log_compressed; + /* Cleanout the scn of the block if needed */ + scn_mgr->batch_write_scn(block, mtr); #ifndef UNIV_HOTBACKUP ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table)); #endif /* !UNIV_HOTBACKUP */ @@ -2434,6 +2436,7 @@ func_start: btr_page_create(new_block, new_page_zip, cursor->index, btr_page_get_level(page), mtr); + ut_ad(new_block->is_cursor_empty()); /* 3. Calculate the first record on the upper half-page, and the first record (move_limit) on original page which ends up on the upper half */ @@ -2504,6 +2507,9 @@ func_start: has segment header and already modified in most of cases.*/ } + ut_ad(block->is_cursor_empty()); + ut_ad(new_block->is_cursor_empty()); + /* 5. Move then the records to the new page */ if (direction == FSP_DOWN) { /* fputs("Split left\n", stderr); */ @@ -3045,6 +3051,8 @@ bool btr_compress(btr_cur_t *cursor, bool adjust, mtr_t *mtr) { page = btr_cur_get_page(cursor); index = cursor->index; + scn_mgr->batch_write_scn(block, mtr); + btr_assert_not_corrupted(block, index); #ifdef UNIV_DEBUG diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc index 1e8c4d6f037..8f4ffa8d6f2 100644 --- a/storage/innobase/btr/btr0cur.cc +++ b/storage/innobase/btr/btr0cur.cc @@ -3014,6 +3014,13 @@ dberr_t btr_cur_pessimistic_insert( } } + buf_block_t *block = btr_cur_get_block(cursor); + if (index->is_clustered() && !index->table->is_intrinsic()) { + scn_mgr->batch_write_scn(block, mtr); + } + + ut_ad(block->is_cursor_empty()); + if (dict_index_get_page(index) == btr_cur_get_block(cursor)->page.id.page_no()) { /* The page is the root page */ @@ -3115,6 +3122,11 @@ dberr_t btr_cur_pessimistic_insert( } } + if (thr) { + scn_mgr->set_scn(thr_get_trx(thr)->id, mtr, + btr_cur_get_rec(cursor), index, offsets); + } + /* Append the info about the update in the undo log */ return (trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr, index, @@ -4313,6 +4325,8 @@ dberr_t btr_cur_del_mark_set_clust_rec( return (err); } + scn_mgr->set_scn(thr_get_trx(thr)->id, mtr, rec, index, offsets); + err = trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr, index, entry, nullptr, 0, rec, offsets, &roll_ptr); @@ -4645,6 +4659,10 @@ bool btr_cur_pessimistic_delete(dberr_t *err, bool has_reserved_extents, Scoped_heap heap_ptr(1024, UT_LOCATION_HERE); mem_heap_t *heap = heap_ptr.get(); + if (index->is_clustered()) { + scn_mgr->batch_write_scn(block, mtr); + } + rec = btr_cur_get_rec(cursor); #ifdef UNIV_ZIP_DEBUG page_zip_des_t *page_zip = buf_block_get_page_zip(block); diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 1ce9a566721..754c42e5078 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -763,6 +763,7 @@ static void buf_block_init( block->page.m_version = 0; block->modify_clock = 0; + block->clear_cursor_no_mutex(); ut_d(block->page.file_page_was_freed = false); @@ -1047,6 +1048,8 @@ static buf_chunk_t *buf_chunk_init( block = chunk->blocks; for (i = chunk->size; i--;) { + block->alloc_scn_recs(); + buf_block_init(buf_pool, block, frame); UNIV_MEM_INVALID(block->frame, UNIV_PAGE_SIZE); @@ -1397,6 +1400,7 @@ static void buf_pool_free_instance(buf_pool_t *buf_pool) 
{ buf_block_t *block = chunk->blocks; for (ulint i = chunk->size; i--; block++) { + block->free_scn_recs(); mutex_free(&block->mutex); rw_lock_free(&block->lock); @@ -1628,6 +1632,7 @@ static bool buf_page_realloc(buf_pool_t *buf_pool, buf_block_t *block) { ut_ad(new_block->page.in_page_hash); + block->clear_cursor_no_mutex(); buf_block_modify_clock_inc(block); memset(block->frame + FIL_PAGE_OFFSET, 0xff, 4); memset(block->frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, 0xff, 4); @@ -2383,6 +2388,8 @@ withdraw_retry: buf_block_t *block = chunk->blocks; for (ulint j = chunk->size; j--; block++) { + /* Can't hold block mutex, otherwise break lock order */ + block->free_scn_recs_no_mutex(); mutex_free(&block->mutex); rw_lock_free(&block->lock); @@ -3374,6 +3381,7 @@ static inline void buf_block_init_low( block->ahi.index = nullptr; block->made_dirty_with_no_latch = false; + block->clear_cursor_no_mutex(); block->n_hash_helps = 0; block->ahi.recommended_prefix_info = {0, 1, true}; ut_a(block->page.get_space() != nullptr); @@ -3613,6 +3621,10 @@ struct Buf_fetch { mtr_t *m_mtr{}; /** Mark page as dirty even if page is being pinned without any latch. */ bool m_dirty_with_no_latch{}; + /** Allow reading a freed block and return null */ + bool m_allow_freed{}; + /** True if reading a freed block */ + bool m_freed_flag {}; /** Number of retries before giving up. */ size_t m_retries{}; /** Buffer pool to fetch from. */ @@ -3649,6 +3661,11 @@ dberr_t Buf_fetch_normal::get(buf_block_t *&block) noexcept { if (block != nullptr) { if (block->page.was_stale()) { + if (m_allow_freed) { + rw_lock_s_unlock(m_hash_lock); + return (DB_NOT_FOUND); + } + if (!buf_page_free_stale(m_buf_pool, &block->page, m_hash_lock)) { /* The page is during IO and can't be released. We wait some to not go into loop that would consume CPU. This is not something that will be @@ -3669,6 +3686,10 @@ dberr_t Buf_fetch_normal::get(buf_block_t *&block) noexcept { /* Page not in buf_pool: needs to be read from file */ read_page(); + + if (m_freed_flag) { + return (DB_NOT_FOUND); + } } return DB_SUCCESS; @@ -3700,6 +3721,11 @@ dberr_t Buf_fetch_other::get(buf_block_t *&block) noexcept { if (block != nullptr) { /* Here we have MDL latches making the stale status to not change. */ if (block->page.was_stale()) { + if (m_allow_freed) { + rw_lock_s_unlock(m_hash_lock); + return (DB_NOT_FOUND); + } + if (!buf_page_free_stale(m_buf_pool, &block->page, m_hash_lock)) { /* The page is during IO and can't be released. We wait some to not go into loop that would consume CPU. 
This is not something that will be @@ -4050,6 +4076,9 @@ void Buf_fetch::read_page() { buf_read_ahead_random(m_page_id, m_page_size, ibuf_inside(m_mtr)); } m_retries = 0; + } else if (m_allow_freed) { + m_freed_flag = true; + return; } else if (m_retries < BUF_PAGE_READ_MAX_RETRIES) { ++m_retries; @@ -4231,7 +4260,7 @@ buf_block_t *Buf_fetch::single_page() { if (static_cast(this)->get(block) == DB_NOT_FOUND) { return (nullptr); } - ut_a(!block->page.was_stale()); + ut_a(!block->page.was_stale() || m_allow_freed); if (is_optimistic()) { const auto bpage = &block->page; @@ -4309,7 +4338,7 @@ buf_block_t *Buf_fetch::single_page() { } #endif /* UNIV_DEBUG */ - ut_ad(is_possibly_freed() || !block->page.file_page_was_freed); + ut_ad(is_possibly_freed() || m_allow_freed || !block->page.file_page_was_freed); /* Check if this is the first access to the page */ const auto access_time = buf_page_is_accessed(&block->page); @@ -4367,7 +4396,7 @@ buf_block_t *Buf_fetch::single_page() { ut_ad(!rw_lock_own(m_hash_lock, RW_LOCK_X)); ut_ad(!rw_lock_own(m_hash_lock, RW_LOCK_S)); - ut_a(!block->page.was_stale()); + ut_a(!block->page.was_stale() || m_allow_freed); return (block); } @@ -4376,7 +4405,7 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch) { + bool dirty_with_no_latch, bool allow_freed) { #ifdef UNIV_DEBUG ut_ad(mtr->is_active()); @@ -4421,6 +4450,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + fetch.m_freed_flag = false; return (fetch.single_page()); @@ -4434,6 +4465,8 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, fetch.m_line = location.line; fetch.m_mtr = mtr; fetch.m_dirty_with_no_latch = dirty_with_no_latch; + fetch.m_allow_freed = allow_freed; + fetch.m_freed_flag = false; return (fetch.single_page()); } @@ -4962,6 +4995,7 @@ buf_page_t *buf_page_init_for_read(ulint mode, const page_id_t &page_id, ut_d(bpage->in_flush_list = false); ut_d(bpage->in_free_list = false); ut_d(bpage->in_LRU_list = false); + ut_d(bpage->in_scn_cleanout = false); ut_d(bpage->in_page_hash = true); diff --git a/storage/innobase/ddl/ddl0ddl.cc b/storage/innobase/ddl/ddl0ddl.cc index d83102e67dd..7e76abe649e 100644 --- a/storage/innobase/ddl/ddl0ddl.cc +++ b/storage/innobase/ddl/ddl0ddl.cc @@ -253,6 +253,8 @@ dict_index_t *create_index(trx_t *trx, dict_table_t *table, return nullptr; } + trx->add_scn_index(table->id, index->id); + if (dict_index_is_spatial(index)) { index->fill_srid_value(index_def->m_srid, index_def->m_srid_is_valid); } diff --git a/storage/innobase/dict/dict0crea.cc b/storage/innobase/dict/dict0crea.cc index 6a85731fd0f..67f673f8cb5 100644 --- a/storage/innobase/dict/dict0crea.cc +++ b/storage/innobase/dict/dict0crea.cc @@ -381,6 +381,9 @@ void dict_build_index_def(const dict_table_t *table, /*!< in: table */ /* Note that the index was created by this transaction. */ index->trx_id = trx->id; + + /* Init to zero, and will be set while committing the transaction */ + index->trx_scn = 0; } /** Creates an index tree for the index if it is not a member of a cluster. 
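Reviewer sketch (not part of the patch): the hooks above appear to implement a delayed cleanout. A reader that finds only a raw trx id on a clustered-index record parks the record position together with the resolved (trx_id, scn) pair on the block, and batch_write_scn() drains those parked entries under a mini-transaction before the page is reorganized, split, merged, or rewritten, so no stale positions survive a page change. A rough sketch of that drain step, using the buf_block_t helpers and the ScnCleanoutRecs type introduced later in this patch; write_scn_at() is a hypothetical stand-in for the private SCN writer, and the real batch_write_scn() body is not shown in this diff:

  void drain_block_sketch(buf_block_t *block, mtr_t *mtr) {
    ScnCleanoutRecs recs;
    /* Take ownership of the parked (position -> (trx_id, scn)) entries. */
    block->copy_scn_and_free(recs);
    for (const auto &e : recs) {
      byte *pos = e.first;             /* position inside the page where the SCN goes */
      trx_id_t id = e.second.first;    /* transaction id the reader saw there */
      trx_id_t scn = e.second.second;  /* commit SCN resolved by the reader */
      write_scn_at(mtr, pos, id, scn); /* hypothetical redo-logged write */
    }
  }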
diff --git a/storage/innobase/dict/dict0dd.cc b/storage/innobase/dict/dict0dd.cc index 13fada010a3..b53bb75927a 100644 --- a/storage/innobase/dict/dict0dd.cc +++ b/storage/innobase/dict/dict0dd.cc @@ -5213,6 +5213,18 @@ dict_table_t *dd_open_table_one(dd::cache::Dictionary_client *client, index->id = id; index->trx_id = trx_id; + trx_id_t scn = scn_mgr->get_scn_fast(trx_id); + if (scn == TRX_ID_MAX) { + /* still active */ + } else if (scn == 0) { + //FIXME: Also persist index scn to DD, otherwise it may + //get lost after reloading + ut_a(scn_mgr->startup_scn() > 0); + scn = scn_mgr->startup_scn(); + } + + index->trx_scn = scn; + /** Look up the spatial reference system in the dictionary. Since this may cause a table open to read the dictionary tables, it must be done while not holding diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc index af3b90fb342..eb2c7bd3fb6 100644 --- a/storage/innobase/dict/dict0dict.cc +++ b/storage/innobase/dict/dict0dict.cc @@ -2487,6 +2487,7 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, } new_index->n_total_fields = new_index->n_def; new_index->trx_id = index->trx_id; + new_index->trx_scn = index->trx_scn; new_index->set_committed(index->is_committed()); new_index->allow_duplicates = index->allow_duplicates; new_index->nulls_equal = index->nulls_equal; @@ -2640,6 +2641,44 @@ dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index, return (DB_SUCCESS); } +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index, NULL if does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id) { + /** Get table object by id */ + mutex_enter(&dict_sys->mutex); + dict_table_t *table = nullptr; + + HASH_SEARCH(id_hash, dict_sys->table_id_hash, ut::hash_uint64(table_id), + dict_table_t *, table, + ut_ad(table->cached), table->id == table_id); + + if (table == nullptr) { + mutex_exit(&dict_sys->mutex); + return nullptr; + } + + table->acquire(); + + mutex_exit(&dict_sys->mutex); + + /* Get the index object by id */ + dict_index_t *index = table->first_index(); + while (index != nullptr) { + if (index->id == index_id) { + /* Table will be released by caller */ + return index; + } + + index = index->next(); + } + + table->release(); + + return nullptr; +} + /** Removes an index from the dictionary cache. */ static void dict_index_remove_from_cache_low( dict_table_t *table, /*!< in/out: table */ diff --git a/storage/innobase/dict/dict0mem.cc b/storage/innobase/dict/dict0mem.cc index 611ef80e89b..257b4e94d75 100644 --- a/storage/innobase/dict/dict0mem.cc +++ b/storage/innobase/dict/dict0mem.cc @@ -612,10 +612,12 @@ bool dict_index_t::is_usable(const trx_t *trx) const { return false; } +// ut_ad(trx_scn > 0 || trx_id == 0 || table->is_intrinsic() || table->is_temporary()); + /* Check if the specified transaction can see this index. 
*/ - return (table->is_temporary() || trx_id == 0 || + return (table->is_temporary() || trx_id == 0 || trx->id == trx_id || !MVCC::is_view_active(trx->read_view) || - trx->read_view->changes_visible(trx_id, table->name)); + (trx_scn > 0 && trx->read_view->sees_version(trx_scn))); } #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 2ad58f17f25..0222cea4081 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -748,6 +748,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(sync_thread_mutex, 0, 0, PSI_DOCUMENT_ME), #endif /* UNIV_DEBUG */ PSI_MUTEX_KEY(trx_undo_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_scn_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(temp_pool_manager_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -768,6 +769,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { PSI_MUTEX_KEY(trx_sys_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_shard_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(trx_sys_serialisation_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(trx_sys_mvcc_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(zip_pad_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(master_key_id_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(sync_array_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -1296,6 +1298,9 @@ static SHOW_VAR innodb_status_variables[] = { {"undo_tablespaces_active", (char *)&export_vars.innodb_undo_tablespaces_active, SHOW_LONG, SHOW_SCOPE_GLOBAL}, + {"scn_cleanout_records", + (char *)&export_vars.innodb_scn_cleanout_records, SHOW_LONG, + SHOW_SCOPE_GLOBAL}, #ifdef UNIV_DEBUG {"purge_trx_id_age", (char *)&export_vars.innodb_purge_trx_id_age, SHOW_LONG, SHOW_SCOPE_GLOBAL}, @@ -23394,6 +23399,11 @@ char **thd_innodb_interpreter(THD *thd) { } #endif /* UNIV_DEBUG */ +static MYSQL_SYSVAR_UINT(cleanout_threads, srv_cleanout_threads, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Number of threads to cleanout SCN", + NULL, NULL, 8, 1, 256, 0); + static SYS_VAR *innobase_system_variables[] = { MYSQL_SYSVAR(api_trx_level), MYSQL_SYSVAR(api_bk_commit_interval), @@ -23614,6 +23624,7 @@ static SYS_VAR *innobase_system_variables[] = { #endif /* UNIV_DEBUG */ MYSQL_SYSVAR(parallel_read_threads), MYSQL_SYSVAR(segment_reserve_factor), + MYSQL_SYSVAR(cleanout_threads), nullptr}; mysql_declare_plugin(innobase){ diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc index 23af83f7df8..f0b8a4d558e 100644 --- a/storage/innobase/handler/handler0alter.cc +++ b/storage/innobase/handler/handler0alter.cc @@ -3267,9 +3267,9 @@ static void online_retry_drop_dict_indexes(dict_table_t *table, bool locked) { /* Since the table has been modified, table->def_trx_id should be adjusted like ddl::drop_indexes(). However, this function may be called before the DDL transaction starts, so it is impossible to - get current DDL transaction ID. Thus advancing def_trx_id by 1 to + get current DDL transaction ID. Thus advancing def_trx_id by 2 to simply inform other threads about this change. 
*/ - ++table->def_trx_id; + table->def_trx_id += 2; reset_column_ord_part(table); } diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index a0dfad5c871..27ea2815e1a 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -433,7 +433,14 @@ buf_block_t *buf_page_get_gen(const page_id_t &page_id, const page_size_t &page_size, ulint rw_latch, buf_block_t *guess, Page_fetch mode, ut::Location location, mtr_t *mtr, - bool dirty_with_no_latch = false); + bool dirty_with_no_latch = false, bool allow_freed = false); + +inline buf_block_t *buf_page_get_allow_freed(const page_id_t &id, const page_size_t &size, + ulint latch, ut::Location location, + mtr_t *mtr) { + return buf_page_get_gen(id, size, latch, nullptr, Page_fetch::NORMAL, + location, mtr, false, true); +} /** NOTE! The following macros should be used instead of buf_page_get_gen, to improve debugging. Only values RW_S_LATCH and RW_X_LATCH are allowed @@ -1186,7 +1193,8 @@ class buf_page_t { in_free_list(other.in_free_list), in_LRU_list(other.in_LRU_list), in_page_hash(other.in_page_hash), - in_zip_hash(other.in_zip_hash) + in_zip_hash(other.in_zip_hash), + in_scn_cleanout(other.in_scn_cleanout) #endif /* UNIV_DEBUG */ #endif /* !UNIV_HOTBACKUP */ { @@ -1711,6 +1719,9 @@ class buf_page_t { /** true if in buf_pool->zip_hash */ bool in_zip_hash; + + /** true while the SCN is being written back to the page */ + bool in_scn_cleanout; #endif /* UNIV_DEBUG */ #endif /* !UNIV_HOTBACKUP */ @@ -1743,6 +1754,11 @@ struct alignas(alignof(uint64_t)) btr_search_prefix_info_t { } }; +#include <unordered_map> +#include <utility> + +typedef std::unordered_map<byte *, std::pair<trx_id_t, trx_id_t>> ScnCleanoutRecs; + /** The buffer control block structure */ struct buf_block_t { /** @name General fields */ @@ -1911,6 +1927,63 @@ struct buf_block_t { new mutex in InnoDB-5.1 to relieve contention on the buffer pool mutex */ BPageMutex mutex; + /** The set of record positions whose SCN needs to be written back + to the record with redo log; entries are added while reading records */ + ScnCleanoutRecs* m_scn_recs; + + /** Clear the set */ + void clear_cursor_no_mutex() { + m_scn_recs->clear(); + } + + /** Clear the set */ + void clear_cursor() { + mutex_enter(&mutex); + clear_cursor_no_mutex(); + mutex_exit(&mutex); + } + + /** Add a record to the set */ + void add_scn_cursor(byte *pos, trx_id_t id, trx_id_t scn) { + mutex_enter(&mutex); + m_scn_recs->insert(std::make_pair(pos, std::make_pair(id, scn))); + mutex_exit(&mutex); + } + + /** Copy the records out and empty the set */ + void copy_scn_and_free(ScnCleanoutRecs &recs) { + mutex_enter(&mutex); + m_scn_recs->swap(recs); + m_scn_recs->clear(); + mutex_exit(&mutex); + } + + /** Check if the cursor cache is empty */ + bool is_cursor_empty() { + mutex_enter(&mutex); + bool ret = m_scn_recs->empty(); + mutex_exit(&mutex); + return ret; + } + + void alloc_scn_recs() { + m_scn_recs = new ScnCleanoutRecs(); + } + + /** Free object */ + void free_scn_recs() { + mutex_enter(&mutex); + free_scn_recs_no_mutex(); + mutex_exit(&mutex); + } + + void free_scn_recs_no_mutex() { + if (m_scn_recs != nullptr) { + delete m_scn_recs; + m_scn_recs = nullptr; + } + } + /** Get the modified clock (version) value. @param[in] single_threaded Thread can only be written to or read by a single thread @@ -1934,6 +2007,10 @@ struct buf_block_t { @return page number of the current buffer block. */ page_no_t get_page_no() const { return (page.id.page_no()); } + /** Get the space id of the current buffer block. + @return space id of the current buffer block.
*/ + space_id_t get_space_id() const { return (page.id.space()); } + /** Get the next page number of the current buffer block. @return next page number of the current buffer block. */ page_no_t get_next_page_no() const { diff --git a/storage/innobase/include/buf0buf.ic b/storage/innobase/include/buf0buf.ic index fcbd41645d9..181f6de536f 100644 --- a/storage/innobase/include/buf0buf.ic +++ b/storage/innobase/include/buf0buf.ic @@ -748,8 +748,9 @@ inline void buf_page_t::set_oldest_lsn(lsn_t lsn) noexcept { mutex_own(buf_page_get_mutex(this))); /* To modify the page we should have matching MDL latches, that will make the result of stale guaranteed to be current. */ - ut_ad(lsn == 0 || !was_stale()); + ut_ad(lsn == 0 || !was_stale() || in_scn_cleanout); oldest_modification = lsn; + ut_d(in_scn_cleanout = false); } /** Increments the bufferfix count. diff --git a/storage/innobase/include/buf0types.h b/storage/innobase/include/buf0types.h index 6479cf53f50..a14d25c3781 100644 --- a/storage/innobase/include/buf0types.h +++ b/storage/innobase/include/buf0types.h @@ -234,6 +234,16 @@ class page_id_t { page_id_t(space_id_t space, page_no_t page_no) : m_space(space), m_page_no(page_no) {} + page_id_t(uint64_t compact_value) { + m_space = (compact_value >> 32); + m_page_no = (compact_value & 0xFFFFFFFF); + } + + /** Compact two uint32_t into one uint64_t */ + uint64_t compact_value() const { + return (((uint64_t)m_space) << 32 | m_page_no); + } + /** Retrieve the tablespace id. @return tablespace id */ inline space_id_t space() const { return (m_space); } diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h index 8e88ab30fdf..4f5312bed20 100644 --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -1645,6 +1645,12 @@ static inline void dict_allocate_mem_intrinsic_cache(dict_index_t *index); @param[in] table_id table id */ bool dict_table_is_system(table_id_t table_id); +/** Get index object by table id and index id +@param[in] table_id table id +@param[in] index_id index id +@return index, NULL if does not exist */ +dict_index_t *dict_index_get_by_id(table_id_t table_id, space_index_t index_id); + /** Change the table_id of SYS_* tables if they have been created after an earlier upgrade. This will update the table_id by adding DICT_MAX_DD_TABLES */ diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h index 5a9e444714e..4c65c1401c3 100644 --- a/storage/innobase/include/dict0mem.h +++ b/storage/innobase/include/dict0mem.h @@ -1234,6 +1234,9 @@ struct dict_index_t { when InnoDB was started up */ trx_id_t trx_id; + /** SCN of related trx_id */ + trx_id_t trx_scn; + /** Information about state of compression failures and successes */ zip_pad_info_t zip_pad; diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h index bd5fbad6638..a3a23b0c34a 100644 --- a/storage/innobase/include/lock0lock.h +++ b/storage/innobase/include/lock0lock.h @@ -596,6 +596,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view); /*!< in: consistent read view */ /** Checks that a non-clustered index record is seen in a consistent read. 
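Reviewer sketch (not part of the patch): page_id_t::compact_value() above packs the tablespace id into the high 32 bits and the page number into the low 32 bits, so a page can be queued for cleanout as a single uint64_t (the CleanoutWorker::add_page(compact_page_id) path later in this patch). A quick round-trip under that assumption:

  page_id_t id(7 /* space */, 42 /* page_no */);
  uint64_t key = id.compact_value();   /* (7ULL << 32) | 42 */
  page_id_t restored(key);             /* restored.space() == 7, restored.page_no() == 42 */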
diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h index 8bc8d50c6fc..732c9e4359d 100644 --- a/storage/innobase/include/mtr0mtr.h +++ b/storage/innobase/include/mtr0mtr.h @@ -189,6 +189,11 @@ struct mtr_t { /** true if the mini-transaction might have modified buffer pool pages */ bool m_modifications; +#ifdef UNIV_DEBUG + /* true if the mtr modifies scn */ + bool m_modify_scn; +#endif + /** true if mtr is forced to NO_LOG mode because redo logging is disabled globally. In this case, mtr increments the global counter at ::start and must decrement it back at ::commit. */ @@ -382,6 +387,15 @@ struct mtr_t { /** Assure that there are no slots that are latching any resources. Only buffer fixing a page is allowed. */ void check_is_not_latching() const; + + void set_modify_scn(bool modify_scn) { + m_impl.m_modify_scn = modify_scn; + } + + bool modify_scn() const { + return m_impl.m_modify_scn; + } + #endif /* UNIV_DEBUG */ /** Start a mini-transaction. diff --git a/storage/innobase/include/read0read.h b/storage/innobase/include/read0read.h index e7693c93b8b..23bf9303626 100644 --- a/storage/innobase/include/read0read.h +++ b/storage/innobase/include/read0read.h @@ -40,6 +40,8 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0types.h" #include "univ.i" +#define MAX_SNAPSHOT_SIZE 256 + /** The MVCC read view manager */ class MVCC { public: @@ -55,7 +57,7 @@ class MVCC { @param view View owned by this class created for the caller. Must be freed by calling view_close() @param trx Transaction instance of caller */ - void view_open(ReadView *&view, trx_t *trx); + void view_open(ReadView *&view, trx_t *trx, bool is_shared = false); /** Close a view created by the above function. @@ -73,7 +75,7 @@ class MVCC { /** @return the number of active views */ - ulint size() const; + ulint size(); /** @return true if the view is active and valid */ @@ -94,17 +96,19 @@ class MVCC { view->creator_trx_id(id); } + void get_oldest_version(ReadView *purge_view); + private: /** Validates a read view list. */ - bool validate() const; + bool validate(uint64_t slot) const; /** Find a free view from the active list, if none found then allocate a new view. This function will also attempt to move delete marked views from the active list to the freed list. @return a view to use */ - inline ReadView *get_view(); + inline ReadView *get_view(uint64_t slot); /** Get the oldest view in the system. It will also move the delete @@ -118,14 +122,27 @@ class MVCC { MVCC &operator=(const MVCC &); private: + + void enter(uint64_t slot); + + void exit(uint64_t slot); + + uint64_t get_slot() { + return (m_slot_index++) % MAX_SNAPSHOT_SIZE; + } + + std::atomic m_slot_index; + typedef UT_LIST_BASE_NODE_T(ReadView, m_view_list) view_list_t; /** Free views ready for reuse. 
*/ - view_list_t m_free; + view_list_t m_free[MAX_SNAPSHOT_SIZE]; /** Active and closed views, the closed views will have the creator trx id set to TRX_ID_MAX */ - view_list_t m_views; + view_list_t m_views[MAX_SNAPSHOT_SIZE]; + + ib_mutex_t m_mutexs[MAX_SNAPSHOT_SIZE]; }; #endif /* read0read_h */ diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h index 4eaf1732aab..589a801c55e 100644 --- a/storage/innobase/include/read0types.h +++ b/storage/innobase/include/read0types.h @@ -34,6 +34,7 @@ this program; if not, write to the Free Software Foundation, Inc., #ifndef read0types_h #define read0types_h +#include #include #include "dict0mem.h" @@ -42,6 +43,8 @@ this program; if not, write to the Free Software Foundation, Inc., // Friend declaration class MVCC; +typedef std::set trx_ids_set_t; + /** Read view lists the trx ids of those transactions for which a consistent read should not see the modifications to the database. */ @@ -156,36 +159,42 @@ class ReadView { @param[in] name table name */ static void check_trx_id_sanity(trx_id_t id, const table_name_t &name); - /** Check whether the changes by id are visible. - @param[in] id transaction id to check against the view - @param[in] name table name - @return whether the view sees the modifications of id. */ - [[nodiscard]] bool changes_visible(trx_id_t id, - const table_name_t &name) const { - ut_ad(id > 0); - - if (id < m_up_limit_id || id == m_creator_trx_id) { - return (true); - } - - check_trx_id_sanity(id, name); + /** + @param id transaction to check + @return true if view sees transaction id */ + bool sees(trx_id_t id) const { return (id < m_up_limit_id); } - if (id >= m_low_limit_id) { - return (false); + /** Check whether the changes on record are visible. + @param[in] index index object + @param[in] block the block that contains record + @param[in] rec clust record + @param[in] offsets offset of the record + @return whether the view sees */ + bool changes_visible( + const dict_index_t *index, buf_block_t *block, + const rec_t *rec, const ulint *offsets); - } else if (m_ids.empty()) { - return (true); + /** + @param scn scn to check + @return true if view sees transaction scn */ + bool sees_version(trx_id_t scn) const { + if (scn == TRX_ID_MAX) return false; + if (m_committing_scns.find(scn) != m_committing_scns.end()) { + /* Being committed while opening read view, always not visible */ + return false; } - const ids_t::value_type *p = m_ids.data(); - - return (!std::binary_search(p, p + m_ids.size(), id)); + return (m_version > scn); } /** - @param id transaction to check - @return true if view sees transaction id */ - bool sees(trx_id_t id) const { return (id < m_up_limit_id); } + @return version number of the view */ + trx_id_t version() { return m_version; } + + /** Store trx pointer which create this read view */ + void set_trx(trx_t *trx) { + m_trx = trx; + } /** Mark the view as closed */ @@ -198,6 +207,14 @@ class ReadView { @return true if the view is closed */ bool is_closed() const { return (m_closed); } + uint64_t get_slot() { + return m_slot; + } + + void set_slot(uint64_t slot) { + m_slot = slot; + } + /** Write the limits to the file. @param file file to write to */ @@ -217,6 +234,10 @@ class ReadView { ut_d(m_view_low_limit_no = m_low_limit_no); m_low_limit_no = trx_no; } + + if (m_low_limit_no < m_version) { + m_version = m_low_limit_no; + } } /** @@ -255,14 +276,8 @@ class ReadView { inline void prepare(trx_id_t id); /** - Copy state from another view. 
Must call copy_complete() to finish. - @param other view to copy from */ - inline void copy_prepare(const ReadView &other); - - /** - Complete the copy, insert the creator transaction id into the - m_trx_ids too and adjust the m_up_limit_id *, if required */ - inline void copy_complete(); + Copy state (the version and low limit id) from another view. */ + inline void copy_prepare(trx_id_t version, trx_id_t low_id); /** Set the creator transaction id, existing id must be 0 */ @@ -296,6 +311,12 @@ class ReadView { was taken */ ids_t m_ids; + /** Set of SCNs that are being committed but not yet finished */ + trx_ids_set_t m_committing_scns; + + /** Set of IDs that are being committed but not yet finished */ + trx_ids_set_t m_committing_ids; + /** The view does not need to see the undo logs for transactions whose transaction number is strictly smaller (<) than this value: they can be removed in purge if not needed by other views */ @@ -309,9 +330,22 @@ class ReadView { trx_id_t m_view_low_limit_no; #endif /* UNIV_DEBUG */ + /** The transaction that opens this read view */ + trx_t *m_trx; + + /** Version of the snapshot */ + trx_id_t m_version; + + /** The slot index within the MVCC view arrays */ + uint64_t m_slot; + /** AC-NL-RO transaction view that has been "closed". */ bool m_closed; + /** Read view is shared by multiple threads, such as + a parallel SELECT COUNT(*) */ + bool m_shared; + typedef UT_LIST_NODE_T(ReadView) node_t; /** List of read views in trx_sys */ @@ -319,4 +353,390 @@ class ReadView { node_t m_view_list; }; +/** A simple spin read-write lock implementation */ +const int32_t MAX_LOCK_WORD = 163840; // max supported concurrency +class SpinLock { +public: + SpinLock() { + m_lock_word = MAX_LOCK_WORD; + } + + ~SpinLock() {} + + bool try_read_lock() { + if (m_lock_word.load(std::memory_order_relaxed) <= 0) { + /* Someone is modifying, can't read */ + return false; + } + + int32_t old_val = m_lock_word.fetch_sub(1); + if (old_val <= 0) { + m_lock_word.fetch_add(1); + + return false; + } + + return true; + } + + void read_lock() { + while (!try_read_lock()) { + ut_delay(1); + } + } + + void read_unlock() { + m_lock_word.fetch_add(1); + } + + bool try_write_lock() { + if (m_lock_word.load(std::memory_order_relaxed) != MAX_LOCK_WORD) { + /* Someone is reading, can't modify */ + return false; + } + + if (m_lock_word.fetch_sub(MAX_LOCK_WORD) != MAX_LOCK_WORD) { + m_lock_word.fetch_add(MAX_LOCK_WORD); + return false; + } + + return true; + } + + void write_lock() { + m_lock_word.fetch_sub(MAX_LOCK_WORD); + + while (m_lock_word.load() != 0) { + m_lock_word.fetch_add(MAX_LOCK_WORD); + ut_delay(1); + m_lock_word.fetch_sub(MAX_LOCK_WORD); + } + } + + void write_unlock() { + m_lock_word.fetch_add(MAX_LOCK_WORD); + } +private: + std::atomic<int32_t> m_lock_word; +}; + +#define SCN_MAP_MAX_SIZE (1 * 1024 * 1024) + +#define LF_ARRAY_MAX_SIZE 16384 + +/** Multiple-producer, single-consumer lock-free array */ +class LF_Array { + public: + LF_Array(uint64_t size) { + m_size = size; + m_array = new std::atomic<uint64_t>[m_size]; + m_free_index = 0; + m_consume_index = 0; + + init(); + } + + ~LF_Array() { + delete [] m_array; + } + + void init() { + for (uint64_t i = 0; i < m_size; i++) { + m_array[i] = 0; + } + } + + /* Multiple-producer */ + bool add(uint64_t value) { + int count = 0; + while (count++ < 10) { + uint64_t free_index = m_free_index.load(); + uint64_t consume_index = m_consume_index.load(); + + if (free_index - consume_index == m_size) { + /* array is full, return directly */ + return false; + } + + if
(!m_free_index.compare_exchange_weak(free_index, free_index + 1)) { + continue; + } + + m_array[free_index % m_size] = value; + + return true; + } + + return false; + } + + /* Single-consumer, + returns the value, or 0 if empty */ + uint64_t get() { + uint64_t consume_index = m_consume_index.load(); + if (consume_index == m_free_index.load()) { + /* empty */ + return 0; + } + + auto idx = consume_index % m_size; + uint64_t value = m_array[idx].load(); + if (value == 0) { + /* Being added by producer, try again later */ + return 0; + } + + /* reset to zero */ + m_array[idx] = 0; + + m_consume_index++; + + return value; + } + + private: + uint64_t m_size; + + std::atomic<uint64_t> *m_array; + + std::atomic<uint64_t> m_consume_index; + + std::atomic<uint64_t> m_free_index; +}; + +/** A map used to store the mapping of trx id to scn. */ +class Scn_Map { + public: + Scn_Map(); + ~Scn_Map(); + + struct Elem { + public: + Elem() { + m_id = 0; + m_scn = 0; + new (&m_lock) SpinLock(); + } + + bool store(trx_id_t id, trx_id_t scn) { + if (!m_lock.try_write_lock()) { + return false; + } + + /* Now safe to store */ + m_id = id; + m_scn = scn; + + m_lock.write_unlock(); + + return true; + } + + trx_id_t read(trx_id_t id) { + trx_id_t ret = 0; + if (m_id != id) { + /* quick checking */ + return ret; + } + + if (!m_lock.try_read_lock()) { + return ret; + } + + if (id != m_id) { + m_lock.read_unlock(); + return ret; + } + + ret = m_scn; + + m_lock.read_unlock(); + return ret; + } + + SpinLock m_lock; + trx_id_t m_id; + trx_id_t m_scn; + }; + + inline bool store(trx_id_t id, trx_id_t scn) { + return m_elems[(id/2) % m_size].store(id, scn); + } + + inline trx_id_t read(trx_id_t id) { + return m_elems[(id/2) % m_size].read(id); + } + +private: + struct Elem* m_elems; + uint64_t m_size; +}; + +/** Handler of SCN Manager */ +class SCN_Mgr { +public: + + /** Constructor */ + SCN_Mgr(); + + /** Destructor */ + ~SCN_Mgr(); + + class CleanoutWorker { + public: + CleanoutWorker(); + ~CleanoutWorker(); + + void add_page(uint64_t compact_page_id); + + void take_pages(PageSets &pages); + + uint64_t reset() { + return os_event_reset(m_event); + } + + void wait(uint64_t sig_count) { + os_event_wait_time_low(m_event, std::chrono::seconds{1}, sig_count); + } + + void wakeup() { + os_event_set(m_event); + } + + private: + LF_Array* m_pages; + + ib_mutex_t m_mutex; + + os_event_t m_event; + }; + + void init(); + + trx_id_t startup_scn() { + return m_startup_scn; + } + + /** + @return true if it's an SCN number */ + static bool is_scn(trx_id_t id) { + return (id % 2 != 0); + } + + /** Store the scn of the transaction for fast lookup + @param[in] id transaction id + @param[in] scn transaction no assigned while committing + @return true if success */ + bool store_scn(trx_id_t id, trx_id_t scn) { + return m_scn_map->store(id, scn); + } + + /** Quickly look up the scn of the given transaction id + @param[in] id transaction id + @return TRX_ID_MAX if still active, or 0 if not found, or scn value */ + trx_id_t get_scn_fast(trx_id_t id, trx_id_t *version = nullptr); + + /** Get the SCN of the given transaction id + @param[in] id transaction id + @param[in] index index object where roll_ptr resides + @param[in] roll_ptr rollback pointer of clust record */ + trx_id_t get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr, trx_id_t *version = nullptr); + + /** Write SCN to clust record + @param[in] current_id trx id of current session + @param[in] mtr mini transaction + @param[in] rec target record + @param[in] index index object + @param[in] offsets offset array for the record + */ + 
void set_scn(trx_id_t current_id, mtr_t *mtr, rec_t *rec, dict_index_t *index, const ulint *offsets); + + /** Add cursor to background set which needs to be cleanout + @param[in] trx the transaction that reads scn or null + @param[in] block the block object where record resides in + @param[in] pos the start position where scn should be written to + @param[in] id transaction id + @param[in] scn scn of id */ + void add_scn_cursor(trx_t *trx, buf_block_t *block, byte *pos, trx_id_t id, trx_id_t scn); + + /** Get offset where scn is stored + @param[in] index index object + @param[in] offsets offset array of the clust record + @return offset where scn is stored */ + ulint scn_offset(const dict_index_t *index, const ulint *offsets); + + /** Start background threads */ + void start(); + + /** Stop background threads */ + void stop(); + + /** Write scn back to one block + @param[in] block the block object to be cleanout + @param[in] mtr mini transaction */ + void batch_write_scn(buf_block_t *block, mtr_t *mtr); + + /** Background thread for writing back SCN */ + void cleanout_task(); + + /* Periodically generate safe up limit id for taking snapshot */ + void view_task(); + + /**@return min active transaction id. This is not + an accurate number */ + trx_id_t min_active_id() { + return m_min_active_id.load(std::memory_order_relaxed); + } + + uint64_t cleanout_records() { + return m_cleanout_records.load(); + } +private: + + /** Set scn to specified position + @param[in] mtr mini transaction + @param[in] ptr position where scn is written to + @param[in] id transaction id + @param[in] scn scn number of transaction */ + void set_scn(mtr_t *mtr, byte *ptr, trx_id_t id, trx_id_t scn); + + /** Storing trx id->scn mapping */ + Scn_Map *m_scn_map; + + /** Storing trx id->scn mapping to avoid duplicate + looking up */ + Scn_Map *m_random_map; + + CleanoutWorker **m_cleanout_workers; + + /** Number of threads for cleanout */ + uint32_t m_max_cleanout_threads; + + /** up transaction id on startup */ + trx_id_t m_startup_id; + + /** SCN number taken on startup */ + trx_id_t m_startup_scn; + + /** thread event */ + os_event_t m_view_event; + + /** Min active transaction id */ + std::atomic m_min_active_id; + + /** Flag to tell if background threads should stop or not */ + std::atomic m_abort; + + /** True if cleanout thread is active */ + std::atomic m_cleanout_threads; + + /** True if thread is active */ + std::atomic m_view_active; + + /** Counter of cleanout records */ + std::atomic m_cleanout_records; +}; + +extern SCN_Mgr *scn_mgr; + #endif diff --git a/storage/innobase/include/row0pread.h b/storage/innobase/include/row0pread.h index 6cecee578e2..c1667d56411 100644 --- a/storage/innobase/include/row0pread.h +++ b/storage/innobase/include/row0pread.h @@ -606,7 +606,7 @@ class Parallel_reader::Scan_ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. */ - [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets, + [[nodiscard]] bool check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr); /** Create an execution context for a range and add it to @@ -726,9 +726,9 @@ class Parallel_reader::Ctx { built from the undo log. @param[in,out] mtr Mini-transaction covering the read. @return true if row is visible to the transaction. 
*/ - bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, + bool is_rec_visible(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - return m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + return m_scan_ctx->check_visibility(block, rec, offsets, heap, mtr); } private: diff --git a/storage/innobase/include/row0purge.h b/storage/innobase/include/row0purge.h index b8fe1e7440e..58a5b59541d 100644 --- a/storage/innobase/include/row0purge.h +++ b/storage/innobase/include/row0purge.h @@ -101,6 +101,9 @@ struct purge_node_t { /** Trx that created this undo record */ trx_id_t modifier_trx_id; + + /** Trx no of modifier_trx_id */ + trx_id_t modifier_trx_no; }; using Recs = std::list>; diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index b6bf10810e0..331260f427f 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -777,6 +777,8 @@ extern bool srv_cmp_per_index_enabled; extern bool srv_redo_log; +extern uint32_t srv_cleanout_threads; + /** Status variables to be passed to MySQL */ extern struct export_var_t export_vars; @@ -1212,6 +1214,7 @@ struct export_var_t { the dba created explicitly. */ ulint innodb_undo_tablespaces_active; /*!< number of active undo tablespaces */ + ulint innodb_scn_cleanout_records; /*!< number of records that have scn written back */ #ifdef UNIV_DEBUG ulint innodb_purge_trx_id_age; /*!< rw_max_trx_no - purged trx_no */ ulint innodb_purge_view_trx_id_age; /*!< rw_max_trx_no diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h index 62ef8af2245..aad699ddfb5 100644 --- a/storage/innobase/include/sync0sync.h +++ b/storage/innobase/include/sync0sync.h @@ -155,6 +155,7 @@ extern mysql_pfs_key_t srv_monitor_file_mutex_key; extern mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ extern mysql_pfs_key_t trx_undo_mutex_key; +extern mysql_pfs_key_t trx_scn_mutex_key; extern mysql_pfs_key_t trx_mutex_key; extern mysql_pfs_key_t trx_pool_mutex_key; extern mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -165,6 +166,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t trx_sys_mutex_key; extern mysql_pfs_key_t trx_sys_shard_mutex_key; extern mysql_pfs_key_t trx_sys_serialisation_mutex_key; +extern mysql_pfs_key_t trx_sys_mvcc_mutex_key; extern mysql_pfs_key_t srv_sys_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/include/sync0types.h b/storage/innobase/include/sync0types.h index c7804f99618..51b97775753 100644 --- a/storage/innobase/include/sync0types.h +++ b/storage/innobase/include/sync0types.h @@ -414,6 +414,7 @@ enum latch_id_t { LATCH_ID_SRV_MONITOR_FILE, LATCH_ID_SYNC_THREAD, LATCH_ID_TRX_UNDO, + LATCH_ID_TRX_SCN, LATCH_ID_TRX_POOL, LATCH_ID_TRX_POOL_MANAGER, LATCH_ID_TEMP_POOL_MANAGER, @@ -421,6 +422,7 @@ enum latch_id_t { LATCH_ID_TRX_SYS, LATCH_ID_TRX_SYS_SHARD, LATCH_ID_TRX_SYS_SERIALISATION, + LATCH_ID_TRX_SYS_MVCC, LATCH_ID_SRV_SYS, LATCH_ID_SRV_SYS_TASKS, LATCH_ID_PAGE_ZIP_STAT_PER_INDEX, diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 002b4b9e54f..e2a185b414f 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -81,6 +81,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr); /*!< in: mtr */ @@ -134,6 +135,9 @@ struct purge_iter_t { /** The transaction that created the undo log record, the Modifier trx id */ trx_id_t modifier_trx_id; + + /** Commit no of transaction with modifier_trx_id */ + trx_id_t modifier_trx_no; }; /* Namespace to hold all the related functions and variables needed @@ -1017,6 +1021,9 @@ struct trx_purge_t { /** The purge will not remove undo logs which are >= this view (purge view) */ ReadView view; + /** The scn of purge system before which undo can be purged */ + std::atomic version; + /** Count of total tasks submitted to the task queue */ ulint n_submitted; diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h index 96d5e0f43c0..03ddcd5e408 100644 --- a/storage/innobase/include/trx0rec.h +++ b/storage/innobase/include/trx0rec.h @@ -58,6 +58,11 @@ static inline trx_undo_rec_t *trx_undo_rec_copy(const page_t *undo_page, uint32_t undo_offset, mem_heap_t *heap); +/** Get start position of undo record +@param[in] undo_rec undo log record +@return start position of undo record */ +static inline ulint trx_undo_start_offset(const trx_undo_rec_t *undo_rec); + /** Reads the undo log record type. @return record type */ static inline ulint trx_undo_rec_get_type( @@ -318,6 +323,8 @@ constexpr uint32_t TRX_UNDO_UPD_EXTERN = 128; constexpr uint32_t TRX_UNDO_INSERT_OP = 1; constexpr uint32_t TRX_UNDO_MODIFY_OP = 2; +#define TRX_UNDO_NEW_VERSION_TAG 0x00 + /** The type and compilation info flag in the undo record for update. For easier understanding let the 8 bits be numbered as 7, 6, 5, 4, 3, 2, 1, 0. */ @@ -372,6 +379,24 @@ const byte *trx_undo_rec_get_pars( table_id_t *table_id, /*!< out: table id */ type_cmpl_t &type_cmpl); /*!< out: type compilation info. */ +bool trx_undo_rec_get_hdr( + trx_id_t id, + trx_undo_rec_t *undo_rec, + page_no_t &undo_hdr_no, + uint32_t &offset); + +trx_id_t trx_undo_hdr_get_scn( + trx_id_t trx_id, + page_id_t &page_id, + uint32_t offset, + mtr_t *mtr, + page_t *undo_page); + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id); + /** Get the max free space of undo log by assuming it's a fresh new page and the free space doesn't count for the undo log header too. */ size_t trx_undo_max_free_space(); diff --git a/storage/innobase/include/trx0rec.ic b/storage/innobase/include/trx0rec.ic index 87a43ce4443..4d85a12144a 100644 --- a/storage/innobase/include/trx0rec.ic +++ b/storage/innobase/include/trx0rec.ic @@ -32,12 +32,21 @@ this program; if not, write to the Free Software Foundation, Inc., *******************************************************/ #ifndef UNIV_HOTBACKUP +static inline ulint trx_undo_start_offset(const trx_undo_rec_t *undo_rec) { + const byte *ptr = undo_rec + 2; + if (*ptr == TRX_UNDO_NEW_VERSION_TAG) { + return 11; + } else { + return 2; + } +} + /** Reads from an undo log record the record type. @return record type */ static inline ulint trx_undo_rec_get_type( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - return (mach_read_from_1(undo_rec + 2) & (TRX_UNDO_CMPL_INFO_MULT - 1)); + return (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) & (TRX_UNDO_CMPL_INFO_MULT - 1)); } /** Reads from an undo log record the record compiler info. 
@@ -45,7 +54,7 @@ static inline ulint trx_undo_rec_get_type( static inline ulint trx_undo_rec_get_cmpl_info( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - return (mach_read_from_1(undo_rec + 2) / TRX_UNDO_CMPL_INFO_MULT); + return (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) / TRX_UNDO_CMPL_INFO_MULT); } /** Returns true if an undo log record contains an extern storage field. @@ -53,7 +62,7 @@ static inline ulint trx_undo_rec_get_cmpl_info( static inline bool trx_undo_rec_get_extern_storage( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - if (mach_read_from_1(undo_rec + 2) & TRX_UNDO_UPD_EXTERN) { + if (mach_read_from_1(undo_rec + trx_undo_start_offset(undo_rec)) & TRX_UNDO_UPD_EXTERN) { return true; } @@ -65,7 +74,7 @@ static inline bool trx_undo_rec_get_extern_storage( static inline undo_no_t trx_undo_rec_get_undo_no( const trx_undo_rec_t *undo_rec) /*!< in: undo log record */ { - const byte *ptr = undo_rec + 2; + const byte *ptr = undo_rec + trx_undo_start_offset(undo_rec); uint8_t type_cmpl = mach_read_from_1(ptr); const bool blob_undo = type_cmpl & TRX_UNDO_MODIFY_BLOB; @@ -73,9 +82,9 @@ static inline undo_no_t trx_undo_rec_get_undo_no( if (blob_undo) { /* The next record offset takes 2 bytes + 1 byte for type_cmpl flag + 1 byte for the new flag. Total 4 bytes.*/ - ptr = undo_rec + 4; + ptr += 2; } else { - ptr = undo_rec + 3; + ptr += 1; } return (mach_u64_read_much_compressed(ptr)); } diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index cf91a7919bb..0d806537943 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -138,11 +138,6 @@ the trx_sys_serialisation_mutex must be acquired. @return new, allocated trx no */ inline trx_id_t trx_sys_allocate_trx_no(); -/** Retrieves a next value that will be allocated if trx_sys_allocate_trx_id() -or trx_sys_allocate_trx_id_trx_no() was called. -@return the next trx->id or trx->no that will be allocated */ -inline trx_id_t trx_sys_get_next_trx_id_or_no(); - #ifdef UNIV_DEBUG /* Flag to control TRX_RSEG_N_SLOTS behavior debugging. */ extern uint trx_rseg_n_slots_debug; @@ -492,7 +487,9 @@ struct trx_sys_t { trx_sys_t::serialisation_mutex. Note: it might be in parallel used for both trx->id and trx->no assignments (for different trx_t objects). */ - std::atomic next_trx_id_or_no; + std::atomic next_trx_id; + + std::atomic next_trx_scn; /** @} */ @@ -578,6 +575,22 @@ struct trx_sys_t { }, loc); } + + trx_id_t get_next_trx_id() { + return next_trx_id.fetch_add(2); + } + + trx_id_t get_max_trx_id() { + return next_trx_id.load(); + } + + trx_id_t get_next_trx_scn() { + return next_trx_scn.fetch_add(2); + } + + trx_id_t get_max_trx_scn() { + return next_trx_scn.load(); + } }; #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/include/trx0sys.ic b/storage/innobase/include/trx0sys.ic index d23e51b28c5..b80c3dc6690 100644 --- a/storage/innobase/include/trx0sys.ic +++ b/storage/innobase/include/trx0sys.ic @@ -210,7 +210,11 @@ static inline trx_t *trx_rw_is_active(trx_id_t trx_id, bool do_ref_count) { was set to larger than trx_id, so we decide to return nullptr, even though, if we were to repeat the call in just a moment we might get a different result if the trx_sys_rw_trx_add() for the trx_id happens meanwhile. 
*/ - ut_ad(trx_id < trx_sys_get_next_trx_id_or_no()); + if (SCN_Mgr::is_scn(trx_id)) { + return nullptr; + } + + ut_ad(trx_id < trx_sys->get_max_trx_id()); auto &shard = trx_sys->get_shard_by_trx_id(trx_id); if (trx_id < shard.active_rw_trxs.peek().min_id()) { return nullptr; @@ -233,12 +237,10 @@ inline trx_id_t trx_sys_get_trx_id_write_margin() { /** Allocates a new transaction id or transaction number. @return new, allocated trx id or trx no */ -inline trx_id_t trx_sys_allocate_trx_id_or_no() { +inline void trx_sys_flush_trx_id_or_no(trx_id_t id) { ut_ad(trx_sys_mutex_own() || trx_sys_serialisation_mutex_own()); - trx_id_t trx_id = trx_sys->next_trx_id_or_no.fetch_add(1); - - if (trx_id % trx_sys_get_trx_id_write_margin() == 0) { + if (id % trx_sys_get_trx_id_write_margin() == 0) { /* Reserve the next range of trx_id values. This thread has acquired either the trx_sys_mutex or the trx_sys_serialisation_mutex. @@ -252,17 +254,20 @@ inline trx_id_t trx_sys_allocate_trx_id_or_no() { trx_sys_write_max_trx_id(); } - return trx_id; } inline trx_id_t trx_sys_allocate_trx_id() { ut_ad(trx_sys_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t id = trx_sys->get_next_trx_id(); + trx_sys_flush_trx_id_or_no(id); + return id; } inline trx_id_t trx_sys_allocate_trx_no() { ut_ad(trx_sys_serialisation_mutex_own()); - return trx_sys_allocate_trx_id_or_no(); + trx_id_t scn = trx_sys->get_next_trx_scn(); + trx_sys_flush_trx_id_or_no(scn + 1); + return scn; } /** Reads trx->no up to which all transactions have been serialised. @@ -271,10 +276,6 @@ static inline trx_id_t trx_get_serialisation_min_trx_no(void) { return (trx_sys->serialisation_min_trx_no.load()); } -inline trx_id_t trx_sys_get_next_trx_id_or_no() { - return trx_sys->next_trx_id_or_no.load(); -} - /** Determine if there are incomplete transactions in the system. @return whether incomplete transactions need rollback */ static inline bool trx_sys_need_rollback() { diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 1d4958f7ab3..ab2ccafebf5 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -55,6 +55,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "read0read.h" #include "sql/handler.h" // Xa_state_list #include "srv0srv.h" +#include +#include +#include /* std::vector to store the trx id & table id of tables that needs to be * rollbacked. We take SHARED MDL on these tables inside @@ -211,7 +214,7 @@ void trx_mark_sql_stat_end(trx_t *trx); /*!< in: trx handle */ /** Assigns a read view for a consistent read query. All the consistent reads within the same transaction will get the same read view, which is created when this function is first called for a new started transaction. */ -ReadView *trx_assign_read_view(trx_t *trx); /*!< in: active transaction */ +ReadView *trx_assign_read_view(trx_t *trx, bool is_shared = false); /*!< in: active transaction */ /** @return the transaction's read view or NULL if one not assigned. */ static inline ReadView *trx_get_read_view(trx_t *trx); @@ -681,6 +684,9 @@ enum trx_rseg_type_t { TRX_RSEG_TYPE_NOREDO /*!< non-redo rollback segment. 
*/ }; +typedef std::vector<std::pair<table_id_t, space_index_t>> SCNIndexIds; +#define TRX_CLEAN_PAGE_SET_THRESHOLD 64 + struct trx_t { enum isolation_level_t { @@ -714,6 +720,9 @@ struct trx_t { `lock`, which are protected by lock_sys latches) */ mutable TrxMutex mutex; + /** Mutex protecting allocation and assignment of the trx scn */ + TrxMutex scn_mutex; + /* Note: in_depth was split from in_innodb for fixing a RO performance issue. Acquiring the trx_t::mutex for each row costs ~3% in performance. It is not required for correctness. @@ -1111,6 +1120,13 @@ struct trx_t { #endif /* UNIV_DEBUG */ ulint magic_n; + /** Set of index ids whose SCN number still needs to be set */ + SCNIndexIds scn_indexs; + + void add_scn_index(table_id_t table_id, space_index_t index_id) { + scn_indexs.push_back(std::make_pair(table_id, index_id)); + } + bool is_read_uncommitted() const { return (isolation_level == READ_UNCOMMITTED); } diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index dbb41f9c128..ac98733ec33 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -44,6 +44,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include #include #include +#include <unordered_set> /** printf(3) format used for printing DB_TRX_ID and other system fields */ #define TRX_ID_FMT IB_ID_FMT @@ -602,5 +603,7 @@ struct TrxVersion { uint64_t m_version; }; +typedef std::unordered_set<uint64_t> PageSets; + typedef std::vector> hit_list_t; #endif /* trx0types_h */ diff --git a/storage/innobase/include/trx0undo.h b/storage/innobase/include/trx0undo.h index 2cd648a12e1..0820e04f44e 100644 --- a/storage/innobase/include/trx0undo.h +++ b/storage/innobase/include/trx0undo.h @@ -152,6 +152,7 @@ trx_undo_rec_t *trx_undo_get_next_rec( @param[in,out] mtr Mini-transaction @return undo log record, the page latched, NULL if none */ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id, + trx_id_t *modifier_trx_no, space_id_t space, const page_size_t &page_size, page_no_t page_no, ulint offset, @@ -227,7 +228,8 @@ ulint trx_undo_lists_init( @return undo log segment header page, x-latched */ page_t *trx_undo_set_state_at_finish( trx_undo_t *undo, /*!< in: undo log memory copy */ - mtr_t *mtr); /*!< in: mtr */ + mtr_t *mtr, /*!< in: mtr */ + bool is_temp = false); /*!< in: true if it's tmp undo */ /** Set the state of the undo log segment at a XA PREPARE or XA ROLLBACK. @param[in,out] trx Transaction @@ -257,7 +259,7 @@ skip updating it. @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr, page_t *undo_page, bool update_rseg_history_len, - + bool is_insert, ulint n_added_logs, mtr_t *mtr); /** Frees an insert undo log after a transaction commit or rollback. @param[in,out] undo_ptr @@ -388,6 +390,7 @@ struct trx_undo_t { field */ trx_id_t trx_id; /*!< id of the trx assigned to the undo log */ + trx_id_t trx_no; /*!< commit no of trx with trx_id */ XID xid; /*!< X/Open XA transaction identification */ ulint flag; /*!< flag for current transaction XID and GTID.
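Reviewer sketch (not part of the patch): with the counter split in trx0sys.h above, trx ids and SCNs are drawn from two sequences that both advance by 2, and SCN_Mgr::is_scn() tells them apart by parity (odd means SCN). The even/odd seeding of next_trx_id and next_trx_scn is an assumption here; it is not visible in these hunks. Roughly:

  trx_id_t id = trx_sys->get_next_trx_id();    /* even sequence: next_trx_id += 2 */
  trx_id_t scn = trx_sys->get_next_trx_scn();  /* odd sequence: next_trx_scn += 2 */
  ut_a(!SCN_Mgr::is_scn(id));
  ut_a(SCN_Mgr::is_scn(scn));
  scn_mgr->store_scn(id, scn);                 /* Scn_Map buckets by (id / 2) % size */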
diff --git a/storage/innobase/include/trx0undo.ic b/storage/innobase/include/trx0undo.ic index 05bb932c4d4..1c91836009e 100644 --- a/storage/innobase/include/trx0undo.ic +++ b/storage/innobase/include/trx0undo.ic @@ -139,7 +139,12 @@ static inline page_t *trx_undo_page_get_s_latched(const page_id_t &page_id, const page_size_t &page_size, mtr_t *mtr) { buf_block_t *block = - buf_page_get(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); + buf_page_get_allow_freed(page_id, page_size, RW_S_LATCH, UT_LOCATION_HERE, mtr); + + if (block == nullptr) { + return nullptr; + } + buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE); return (buf_block_get_frame(block)); diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 451f39c5c6a..6266662092c 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -214,7 +214,13 @@ bool lock_check_trx_id_sanity(trx_id_t trx_id, const rec_t *rec, const dict_index_t *index, const ulint *offsets) { ut_ad(rec_offs_validate(rec, index, offsets)); - trx_id_t next_trx_id = trx_sys_get_next_trx_id_or_no(); + trx_id_t next_trx_id; + if (SCN_Mgr::is_scn(trx_id)) { + next_trx_id = trx_sys->get_max_trx_scn(); + } else { + next_trx_id = trx_sys->get_max_trx_id(); + } + bool is_ok = trx_id < next_trx_id; if (!is_ok) { @@ -232,6 +238,7 @@ bool lock_clust_rec_cons_read_sees( passed over by a read cursor */ dict_index_t *index, /*!< in: clustered index */ const ulint *offsets, /*!< in: rec_get_offsets(rec, index) */ + btr_pcur_t *pcur, /*!< in: cursor of rec, or NULL */ ReadView *view) /*!< in: consistent read view */ { ut_ad(index->is_clustered()); @@ -247,12 +254,7 @@ bool lock_clust_rec_cons_read_sees( return (true); } - /* NOTE that we call this function while holding the search - system latch. */ - - trx_id_t trx_id = row_get_rec_trx_id(rec, index, offsets); - - return (view->changes_visible(trx_id, index->table->name)); + return (view->changes_visible(index, pcur != nullptr ? pcur->get_block() : nullptr, rec, offsets)); } /** Checks that a non-clustered index record is seen in a consistent read. 
@@ -4641,7 +4643,7 @@ void lock_print_info_summary(FILE *file) { file); fprintf(file, "Trx id counter " TRX_ID_FMT "\n", - trx_sys_get_next_trx_id_or_no()); + trx_sys->get_max_trx_id()); fprintf(file, "Purge done for trx's n:o < " TRX_ID_FMT " undo n:o < " TRX_ID_FMT diff --git a/storage/innobase/mtr/mtr0log.cc b/storage/innobase/mtr/mtr0log.cc index 51d9d75e25d..02d38375e29 100644 --- a/storage/innobase/mtr/mtr0log.cc +++ b/storage/innobase/mtr/mtr0log.cc @@ -383,10 +383,6 @@ const byte *mlog_parse_string( ulint offset; ulint len; - ut_a(!page || !page_zip || - (fil_page_get_type(page) != FIL_PAGE_INDEX && - fil_page_get_type(page) != FIL_PAGE_RTREE)); - if (end_ptr < ptr + 4) { return (nullptr); } @@ -406,6 +402,10 @@ const byte *mlog_parse_string( return (nullptr); } + ut_a(!page || !page_zip || len == 6 || + (fil_page_get_type(page) != FIL_PAGE_INDEX && + fil_page_get_type(page) != FIL_PAGE_RTREE)); + if (page) { if (page_zip) { memcpy(((page_zip_des_t *)page_zip)->data + offset, ptr, len); diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc index e5811ee28e9..05a9859a2b7 100644 --- a/storage/innobase/mtr/mtr0mtr.cc +++ b/storage/innobase/mtr/mtr0mtr.cc @@ -224,6 +224,10 @@ bool mtr_t::conflicts_with(const mtr_t *mtr2) const { Mtr_memo_contains check(mtr2, MTR_MEMO_MODIFY); Iterate iterator(check); + if (mtr2->modify_scn()) { + return false; + } + bool conflict = !m_impl.m_memo.for_each_block_in_reverse(iterator); if (conflict) { print_memos(std::cout); @@ -598,6 +602,7 @@ void mtr_t::start(bool sync) { reusing MTR without committing or destructing it. */ ut_a(res.second); m_restart_count++; + m_impl.m_modify_scn = false; #endif /* UNIV_DEBUG */ } diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 01f379b96fe..e095a8aa3c2 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -2381,7 +2381,9 @@ void page_cur_delete_rec( part of the buffer pool. */ if (mtr != nullptr) { - buf_block_modify_clock_inc(page_cur_get_block(cursor)); + buf_block_t *block = page_cur_get_block(cursor); + buf_block_modify_clock_inc(block); + block->clear_cursor(); } /* 2. Find the next and the previous record. Note that the cursor is diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc index 60f64c0a7e2..bb18715c0d0 100644 --- a/storage/innobase/page/page0page.cc +++ b/storage/innobase/page/page0page.cc @@ -317,6 +317,8 @@ page_t *page_create_low(buf_block_t *block, ulint comp, page_type_t page_type) { buf_block_modify_clock_inc(block); + block->clear_cursor(); + page = buf_block_get_frame(block); ut_ad(page_type == FIL_PAGE_INDEX || page_type == FIL_PAGE_RTREE || @@ -432,6 +434,7 @@ void page_create_empty(buf_block_t *block, dict_index_t *index, mtr_t *mtr) { page_zip_des_t *page_zip = buf_block_get_page_zip(block); ut_ad(fil_page_index_page_check(page)); + block->clear_cursor(); /* Multiple transactions cannot simultaneously operate on the same temp-table in parallel. 
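The block->clear_cursor() calls added above drop any SCN cleanout cursors still attached to a page before its records are recreated or deleted, presumably so that a later batch write never follows a pointer into a record that no longer exists. The container those cursors are collected in (consumed by SCN_Mgr::batch_write_scn() further down) is not defined in this diff; judging from how batch_write_scn() reads it, its element shape is roughly the following, shown here only as an assumed sketch.

#include <cstdint>
#include <utility>
#include <vector>

using byte = unsigned char;  // stand-ins for the InnoDB typedefs
using trx_id_t = uint64_t;

// Assumed element layout, matching how SCN_Mgr::batch_write_scn() consumes it:
//   first         -> pointer to the 6-byte DB_TRX_ID field inside the page frame
//   second.first  -> transaction id currently stored in that field
//   second.second -> commit SCN that should replace it during cleanout
using ScnCleanoutRecs =
    std::vector<std::pair<byte *, std::pair<trx_id_t, trx_id_t>>>;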
@@ -941,6 +944,7 @@ void page_delete_rec_list_end( frame modify clock */ buf_block_modify_clock_inc(block); + block->clear_cursor(); page_delete_rec_list_write_log(rec, index, MLOG_LIST_END_DELETE, mtr); @@ -2174,7 +2178,7 @@ bool page_validate(const page_t *page, dict_index_t *index, trx_id_t max_trx_id = page_get_max_trx_id(page); /* This will be 0 during recv_apply_hashed_log_recs(), because the transaction system has not been initialized yet */ - trx_id_t sys_next_trx_id_or_no = trx_sys_get_next_trx_id_or_no(); + trx_id_t sys_next_trx_id_or_no = trx_sys->get_max_trx_id(); if (max_trx_id == 0 || (sys_next_trx_id_or_no != 0 && max_trx_id >= sys_next_trx_id_or_no)) { diff --git a/storage/innobase/page/page0zip.cc b/storage/innobase/page/page0zip.cc index 49b0252fe7a..94fc799c01f 100644 --- a/storage/innobase/page/page0zip.cc +++ b/storage/innobase/page/page0zip.cc @@ -2124,7 +2124,9 @@ void page_zip_write_trx_id_and_roll_ptr(page_zip_des_t *page_zip, byte *rec, rec_get_nth_field(nullptr, rec, offsets, trx_id_col + 1, &len)); ut_ad(len == DATA_ROLL_PTR_LEN); #if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG - ut_a(!memcmp(storage, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)); + ut_a(!memcmp(storage + DATA_TRX_ID_LEN, + field + DATA_TRX_ID_LEN, + DATA_ROLL_PTR_LEN)); #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */ mach_write_to_6(field, trx_id); mach_write_to_7(field + DATA_TRX_ID_LEN, roll_ptr); diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index a918b11b06f..b5a7943da9e 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -36,6 +36,9 @@ this program; if not, write to the Free Software Foundation, Inc., #include "srv0srv.h" #include "trx0sys.h" +#include "trx0rec.h" +#include "trx0rseg.h" +#include "row0row.h" /* ------------------------------------------------------------------------------- @@ -182,6 +185,7 @@ will mark their views as closed but not actually free their views. /** Minimum number of elements to reserve in ReadView::ids_t */ static const ulint MIN_TRX_IDS = 32; +SCN_Mgr *scn_mgr = nullptr; #ifdef UNIV_DEBUG /** Functor to validate the view list. */ @@ -200,12 +204,12 @@ struct ViewCheck { /** Validates a read view list. 
*/ -bool MVCC::validate() const { +bool MVCC::validate(uint64_t slot) const { ViewCheck check; ut_ad(trx_sys_mutex_own()); - ut_list_map(m_views, check); + ut_list_map(m_views[slot], check); return (true); } @@ -317,7 +321,11 @@ ReadView::ReadView() m_up_limit_id(), m_creator_trx_id(), m_ids(), - m_low_limit_no() { + m_low_limit_no(), + m_trx(nullptr), + m_version(), + m_slot(), + m_shared() { ut_d(::memset(&m_view_list, 0x0, sizeof(m_view_list))); ut_d(m_view_low_limit_no = 0); } @@ -330,22 +338,111 @@ ReadView::~ReadView() { /** Constructor @param size Number of views to pre-allocate */ -MVCC::MVCC(ulint size) : m_free(), m_views() { - for (ulint i = 0; i < size; ++i) { - ReadView *view = ut::new_withkey<ReadView>(UT_NEW_THIS_FILE_PSI_KEY); +MVCC::MVCC(ulint size) { - UT_LIST_ADD_FIRST(m_free, view); + m_slot_index = 0; + + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + UT_LIST_INIT(m_free[slot]); + UT_LIST_INIT(m_views[slot]); + + for (ulint i = 0; i < size; ++i) { + ReadView *view = ut::new_withkey<ReadView>(UT_NEW_THIS_FILE_PSI_KEY); + UT_LIST_ADD_FIRST(m_free[slot], view); + } + + mutex_create(LATCH_ID_TRX_SYS_MVCC, &(m_mutexs[slot])); } } MVCC::~MVCC() { - while (ReadView *view = UT_LIST_GET_FIRST(m_free)) { - UT_LIST_REMOVE(m_free, view); + for (auto slot = 0; slot < MAX_SNAPSHOT_SIZE; slot++) { + while (ReadView *view = UT_LIST_GET_FIRST(m_free[slot])) { + UT_LIST_REMOVE(m_free[slot], view); + + ut::delete_(view); + } + + ut_a(UT_LIST_GET_LEN(m_views[slot]) == 0); + + mutex_destroy(&(m_mutexs[slot])); + } +} + +bool ReadView::changes_visible( + const dict_index_t *index, + buf_block_t *block, + const rec_t *rec, + const ulint *offsets) { + + ut_a(index->is_clustered()); + ut_a(scn_mgr != nullptr); - ut::delete_(view); + if (index->table->is_temporary()) { + return true; } - ut_a(UT_LIST_GET_LEN(m_views) == 0); + /* Get transaction id from record */ + ulint offset = scn_mgr->scn_offset(index, offsets); + trx_id_t id = mach_read_from_6(rec + offset); + + /* If it is already an SCN, compare directly */ + if (SCN_Mgr::is_scn(id)) { + return sees_version(id); + } + + /* Trx itself */ + if (id == m_creator_trx_id) { + return true; + } + + if (id < m_up_limit_id) { + return true; + } + + if (id >= m_low_limit_id) { + return false; + } + + if (m_committing_ids.find(id) != m_committing_ids.end()) { + /* Not visible to current view */ + return false; + } + + trx_id_t committing_version = 0; + /* Get SCN from undo log */ + trx_id_t scn = scn_mgr->get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets), &committing_version); + + if (!m_shared && committing_version != 0 && committing_version < m_version) { + /* Consider the following scenario: + - active trx: get trx->no = 5 + - open read view: version = 7 + - before committing trx completely: not visible + - after committing trx: visible because it's deregistered + and scn is written to undo (5 < 7) + + Problem: the consistent read would be broken, so we must + record such scn/id pairs */ + ut_a(scn == TRX_ID_MAX); + m_committing_ids.insert(id); + m_committing_scns.insert(committing_version); + + ut_a(committing_version >= m_low_limit_no); + } + + if (scn == TRX_ID_MAX) { + /* Still active */ + return false; + } + + ut_a(scn > 0); + + if (block != nullptr && !srv_read_only_mode) { + /* Attach record to block */ + scn_mgr->add_scn_cursor(m_trx, block, const_cast<byte *>(rec) + offset, id, scn); + } + + return (sees_version(scn)); } /** @@ -445,26 +542,43 @@ point in time are seen in the view.
@param id Creator transaction id */ void ReadView::prepare(trx_id_t id) { - ut_ad(trx_sys_mutex_own()); m_creator_trx_id = id; m_low_limit_no = trx_get_serialisation_min_trx_no(); - m_low_limit_id = trx_sys_get_next_trx_id_or_no(); + m_up_limit_id = scn_mgr->min_active_id(); + m_version = trx_sys->get_max_trx_scn(); + m_low_limit_id = trx_sys->get_max_trx_id(); - ut_a(m_low_limit_no <= m_low_limit_id); - - if (!trx_sys->rw_trx_ids.empty()) { - copy_trx_ids(trx_sys->rw_trx_ids); - } else { - m_ids.clear(); + if (m_low_limit_no > m_version) { + m_low_limit_no = m_version; } - /* The first active transaction has the smallest id. */ - m_up_limit_id = !m_ids.empty() ? m_ids.front() : m_low_limit_id; + m_committing_scns.clear(); + m_committing_ids.clear(); + + if (m_shared) { + /* This is a read view shared by multiple threads such as + select count(*), so we must prepare m_committing_scns and + m_committing_ids before hand */ + trx_sys_serialisation_mutex_enter(); + for (auto trx = UT_LIST_GET_FIRST(trx_sys->serialisation_list); + trx != nullptr; trx = UT_LIST_GET_NEXT(no_list, trx)) { + ut_a(trx->no != TRX_ID_MAX); + if (trx->no >= m_version) { + break; + } + + if (trx->id == 0) { + continue; + } - ut_a(m_up_limit_id <= m_low_limit_id); + m_committing_scns.insert(trx->no); + m_committing_ids.insert(trx->id); + } + trx_sys_serialisation_mutex_exit(); + } ut_d(m_view_low_limit_no = m_low_limit_no); m_closed = false; @@ -475,14 +589,13 @@ Find a free view from the active list, if none found then allocate a new view. @return a view to use */ -ReadView *MVCC::get_view() { - ut_ad(trx_sys_mutex_own()); +ReadView *MVCC::get_view(uint64_t slot) { ReadView *view; - if (UT_LIST_GET_LEN(m_free) > 0) { - view = UT_LIST_GET_FIRST(m_free); - UT_LIST_REMOVE(m_free, view); + if (UT_LIST_GET_LEN(m_free[slot]) > 0) { + view = UT_LIST_GET_FIRST(m_free[slot]); + UT_LIST_REMOVE(m_free[slot], view); } else { view = ut::new_withkey(UT_NEW_THIS_FILE_PSI_KEY); @@ -498,7 +611,7 @@ ReadView *MVCC::get_view() { @param view View owned by this class created for the caller. 
Must be freed by calling view_close() @param trx Transaction instance of caller */ -void MVCC::view_open(ReadView *&view, trx_t *trx) { +void MVCC::view_open(ReadView *&view, trx_t *trx, bool is_shared) { ut_ad(!srv_read_only_mode); /** If no new RW transaction has been started since the last view @@ -521,7 +634,9 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { if (trx_is_autocommit_non_locking(trx) && view->empty()) { view->m_closed = false; - if (view->m_low_limit_id == trx_sys_get_next_trx_id_or_no()) { + if (view->m_low_limit_id == trx_sys->get_max_trx_id() && + view->m_version == trx_sys->get_max_trx_scn()) { + view->m_shared = is_shared; return; } else { view->m_closed = true; @@ -529,93 +644,87 @@ void MVCC::view_open(ReadView *&view, trx_t *trx) { } } - trx_sys_mutex_enter(); + uint64_t slot; + if (view != nullptr) { + slot = view->get_slot(); + } else { + slot = get_slot(); + } + + enter(slot); if (view != nullptr) { - UT_LIST_REMOVE(m_views, view); + UT_LIST_REMOVE(m_views[slot], view); } else { - view = get_view(); + view = get_view(slot); } if (view != nullptr) { + view->m_shared = is_shared; + view->prepare(trx->id); - UT_LIST_ADD_FIRST(m_views, view); + UT_LIST_ADD_FIRST(m_views[slot], view); ut_ad(!view->is_closed()); - ut_ad(validate()); + view->set_slot(slot); } - trx_sys_mutex_exit(); + exit(slot); +} + +void MVCC::enter(uint64_t slot) { + mutex_enter(&(m_mutexs[slot])); +} + +void MVCC::exit(uint64_t slot) { + mutex_exit(&(m_mutexs[slot])); } + /** Get the oldest (active) view in the system. @return oldest view if found or NULL */ +void MVCC::get_oldest_version(ReadView *purge_view) { + trx_id_t purge_version = std::min(purge_view->version(), purge_view->low_limit_no()); + trx_id_t purge_low_id = purge_view->low_limit_id(); + + for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (ReadView* view = UT_LIST_GET_LAST(m_views[i]); view != nullptr; + view = UT_LIST_GET_PREV(m_view_list, view)) { + if (view->is_closed()) { + continue; + } -ReadView *MVCC::get_oldest_view() const { - ReadView *view; + trx_id_t min_no = std::min(view->low_limit_no(), view->version()); + if (min_no < purge_version) { + purge_version = min_no; + } - ut_ad(trx_sys_mutex_own()); + if (view->low_limit_id() < purge_low_id) { + purge_low_id = view->low_limit_id(); + } - for (view = UT_LIST_GET_LAST(m_views); view != nullptr; - view = UT_LIST_GET_PREV(m_view_list, view)) { - if (!view->is_closed()) { break; } - } - - return (view); -} -/** -Copy state from another view. Must call copy_complete() to finish. -@param other view to copy from */ - -void ReadView::copy_prepare(const ReadView &other) { - ut_ad(&other != this); - - if (!other.m_ids.empty()) { - const ids_t::value_type *p = other.m_ids.data(); - - m_ids.assign(p, p + other.m_ids.size()); - } else { - m_ids.clear(); + exit(i); } - m_up_limit_id = other.m_up_limit_id; - - m_low_limit_no = other.m_low_limit_no; - - ut_d(m_view_low_limit_no = other.m_view_low_limit_no); - - m_low_limit_id = other.m_low_limit_id; - - m_creator_trx_id = other.m_creator_trx_id; + purge_view->copy_prepare(purge_version, purge_low_id); } /** -Complete the copy, insert the creator transaction id into the -m_ids too and adjust the m_up_limit_id, if required */ - -void ReadView::copy_complete() { - ut_ad(!trx_sys_mutex_own()); - - if (m_creator_trx_id > 0) { - m_ids.insert(m_creator_trx_id); - } - - if (!m_ids.empty()) { - /* The last active transaction has the smallest id. 
*/ - m_up_limit_id = std::min(m_ids.front(), m_up_limit_id); - } - - ut_ad(m_up_limit_id <= m_low_limit_id); +@param other view to copy from */ - /* We added the creator transaction ID to the m_ids. */ - m_creator_trx_id = 0; +void ReadView::copy_prepare(trx_id_t version, trx_id_t low_id) { + m_up_limit_id = 0; + m_low_limit_no = version; + m_version = version; + m_low_limit_id = low_id; } /** Clones the oldest view and stores it in view. No need to @@ -625,22 +734,11 @@ m_free list. This function is called by Purge to determine whether it should purge the delete marked record or not. @param view Preallocated view, owned by the caller */ void MVCC::clone_oldest_view(ReadView *view) { - trx_sys_mutex_enter(); - ReadView *oldest_view = get_oldest_view(); + view->prepare(0); - if (oldest_view == nullptr) { - view->prepare(0); - - trx_sys_mutex_exit(); - - } else { - view->copy_prepare(*oldest_view); + get_oldest_version(view); - trx_sys_mutex_exit(); - - view->copy_complete(); - } /* Update view to block purging transaction till GTID is persisted. */ auto &gtid_persistor = clone_sys->get_gtid_persistor(); auto gtid_oldest_trxno = gtid_persistor.get_oldest_trx_no(); @@ -650,19 +748,19 @@ void MVCC::clone_oldest_view(ReadView *view) { /** @return the number of active views */ -ulint MVCC::size() const { - trx_sys_mutex_enter(); - +ulint MVCC::size() { ulint size = 0; - for (const ReadView *view : m_views) { - if (!view->is_closed()) { - ++size; + for (auto i = 0; i < MAX_SNAPSHOT_SIZE; i++) { + enter(i); + for (const ReadView *view : m_views[i]) { + if (!view->is_closed()) { + ++size; + } } + exit(i); } - trx_sys_mutex_exit(); - return (size); } @@ -688,14 +786,386 @@ void MVCC::view_close(ReadView *&view, bool own_mutex) { view = reinterpret_cast<ReadView *>(p | 0x1); } else { view = reinterpret_cast<ReadView *>(p & ~1); + auto slot = view->get_slot(); + enter(slot); view->close(); - UT_LIST_REMOVE(m_views, view); - UT_LIST_ADD_LAST(m_free, view); + UT_LIST_REMOVE(m_views[slot], view); + UT_LIST_ADD_LAST(m_free[slot], view); - ut_ad(validate()); + exit(slot); + ut_ad(validate(slot)); view = nullptr; } } + +Scn_Map::Scn_Map() { + m_size = SCN_MAP_MAX_SIZE; + m_elems = new Elem[m_size]; + ut_a(m_elems != nullptr); +} + +Scn_Map::~Scn_Map() { + delete[] m_elems; +} + +SCN_Mgr::CleanoutWorker::CleanoutWorker() { + m_pages = new LF_Array(LF_ARRAY_MAX_SIZE); + m_event = os_event_create(); + os_event_reset(m_event); +} + +SCN_Mgr::CleanoutWorker::~CleanoutWorker() { + os_event_destroy(m_event); + delete m_pages; +} + +void SCN_Mgr::CleanoutWorker::add_page(uint64_t compact_page_id) { + m_pages->add(compact_page_id); +} + +void SCN_Mgr::CleanoutWorker::take_pages(PageSets &pages) { + pages.clear(); + + uint64_t val = 0; + while ((val = m_pages->get()) != 0) { + pages.insert(val); + } +} + +SCN_Mgr::SCN_Mgr() { + m_scn_map = new Scn_Map(); + m_random_map = new Scn_Map(); + m_startup_id = 0; + m_startup_scn = 0; + m_abort = false; + m_cleanout_threads = 0; + m_view_active = false; + m_max_cleanout_threads = srv_cleanout_threads; + + m_cleanout_workers = new CleanoutWorker* [m_max_cleanout_threads]; + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + m_cleanout_workers[i] = new CleanoutWorker(); + } + + m_view_event = os_event_create(); + os_event_reset(m_view_event); +} + +SCN_Mgr::~SCN_Mgr() { + delete m_scn_map; + delete m_random_map; + + ut_a(m_cleanout_threads.load() == 0); + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + delete m_cleanout_workers[i]; + } + + delete [] m_cleanout_workers; + 
os_event_destroy(m_view_event); +} + +/** Init scn/id on startup */ +void SCN_Mgr::init() { + m_startup_scn = trx_sys->get_max_trx_scn() - 2; + + trx_sys_mutex_enter(); + if (trx_sys->rw_trx_ids.empty()) { + m_startup_id = trx_sys->get_max_trx_id(); + } else { + m_startup_id = trx_sys->rw_trx_ids.front(); + } + + trx_sys_mutex_exit(); +} + +trx_id_t SCN_Mgr::get_scn_fast(trx_id_t id, trx_id_t *committing_version) { + trx_id_t scn; + if (id < m_startup_id) { + /* Too old transaction */ + scn = m_startup_scn; + } else { + scn = m_scn_map->read(id); + if (scn == 0) { + scn = m_random_map->read(id); + } + + if (scn == 0) { + /* Check if transaction is still active */ + trx_t *trx = trx_rw_is_active(id, true); + if (trx != nullptr) { + /* transaction is still active */ + if (committing_version == nullptr) { + trx_release_reference(trx); + return TRX_ID_MAX; + } + + /* Get scn from trx if trx->no is set */ + mutex_enter(&trx->scn_mutex); + trx_id_t version = trx->no; + mutex_exit(&trx->scn_mutex); + + trx_release_reference(trx); + + if (version != TRX_ID_MAX) { + *committing_version = version; + } + + return TRX_ID_MAX; + } + } + } + + return scn; +} + +trx_id_t SCN_Mgr::get_scn(trx_id_t id, const dict_index_t *index, roll_ptr_t roll_ptr, trx_id_t *committing_version) { + if (index->table->is_temporary()) { + return TRX_ID_MAX; + } + + trx_id_t scn = get_scn_fast(id, committing_version); + + if (scn == TRX_ID_MAX) { + /* Transaction is still active */ + return TRX_ID_MAX; + } + + if (scn != 0) { + return scn; + } + + /* Slow path */ + scn = trx_undo_get_scn(index, roll_ptr, id); + if (scn > 0) { + m_random_map->store(id, scn); + } + + ut_a(scn < trx_sys->get_max_trx_scn()); + + if (scn == 0) { + return TRX_ID_MAX; + } + + return scn; +} + +ulint SCN_Mgr::scn_offset(const dict_index_t *index, const ulint *offsets) { + ulint offset = index->trx_id_offset; + + if (!offset) { + offset = row_get_trx_id_offset(index, offsets); + } + + return offset; +} + +void SCN_Mgr::set_scn(mtr_t *mtr, byte *ptr, trx_id_t id, trx_id_t scn) { + trx_id_t stored_id = mach_read_from_6(ptr); + + if (is_scn(stored_id)) { + if (stored_id > scn) { + /* Never revert back to smaller scn */ + return; + } + } else if (stored_id != id) { + /* don't match */ + return; + } + + mach_write_to_6(ptr, scn); + m_cleanout_records++; +} + +void SCN_Mgr::set_scn( + trx_id_t current_id, + mtr_t *mtr, + rec_t *rec, + dict_index_t *index, + const ulint *offsets) { + + if (index->table->is_temporary()) { + /* No need to set scn for temp table */ + return; + } + + ulint offset = scn_offset(index, offsets); + + /* Read id */ + trx_id_t id = mach_read_from_6(rec + offset); + if (is_scn(id) || id == current_id) { + /* Already be filled with scn, do nothing */ + return; + } + + trx_id_t scn = get_scn(id, index, row_get_rec_roll_ptr(rec, index, offsets)); + + if (scn == 0 || scn == TRX_ID_MAX) { + return; + } + + set_scn(mtr, rec + offset, id, scn); +} + +void SCN_Mgr::add_scn_cursor(trx_t *trx, buf_block_t *block, byte *pos, trx_id_t id, trx_id_t scn) { + + if (fsp_is_system_temporary(block->get_space_id())) { + return; + } + + block->add_scn_cursor(pos, id, scn); + + /* Add page number to the set */ + page_id_t page_id{block->get_space_id(), block->get_page_no()}; + uint64_t compact_id = page_id.compact_value(); + uint32_t slot = page_id.hash() % m_max_cleanout_threads; + + m_cleanout_workers[slot]->add_page(compact_id); +} + +void SCN_Mgr::view_task() { + m_view_active = true; + + while (!m_abort.load()) { + uint64_t sig_counter = 
os_event_reset(m_view_event); + + trx_sys_mutex_enter(); + if (!trx_sys->rw_trx_ids.empty()) { + m_min_active_id = trx_sys->rw_trx_ids.front(); + } else { + m_min_active_id = trx_sys->get_max_trx_id(); + } + trx_sys_mutex_exit(); + + os_event_wait_time_low(m_view_event, std::chrono::seconds{1}, sig_counter); + } + + m_view_active = false; +} + +void SCN_Mgr::batch_write_scn(buf_block_t *block, mtr_t *mtr) { + ScnCleanoutRecs recs; + recs.clear(); + block->copy_scn_and_free(recs); + + if (recs.empty()) { + return; + } + + for (auto & rec : recs) { + set_scn(mtr, rec.first, rec.second.first, rec.second.second); + } +} + +void SCN_Mgr::cleanout_task() { + + uint32_t slot = m_cleanout_threads.fetch_add(1); + + ib::info() << "Cleanout thread id " << slot << " starts..."; + + while (!m_abort.load()) { + uint64_t sig_counter = m_cleanout_workers[slot]->reset(); + + PageSets pages; + pages.clear(); + + m_cleanout_workers[slot]->take_pages(pages); + + if (!pages.empty()) { + /* Process pages that need to be modified */ + for ( auto &val : pages) { + page_id_t id(val); + //TBD: OPTIMIZE IT + uint32_t flags = fil_space_get_flags(id.space()); + if (flags == UINT32_UNDEFINED) { + continue; + } + page_size_t page_size(flags); + + mtr_t mtr; + buf_block_t *block = nullptr; + mtr_start(&mtr); + + mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO); + block = buf_page_get_gen(id, page_size, RW_X_LATCH, nullptr, + Page_fetch::IF_IN_POOL, UT_LOCATION_HERE, &mtr, false, true); + + if (block != nullptr) { + if (!block->page.was_stale()) { + ut_d(block->page.in_scn_cleanout = true); + batch_write_scn(block, &mtr); + } else { + block->clear_cursor(); + } + } + + mtr_commit(&mtr); + } + } + + m_cleanout_workers[slot]->wait(sig_counter); + } + + ib::info() << "Cleanout thread id " << slot << " exit"; + + m_cleanout_threads--; +} + +void run_cleanout_task() { + scn_mgr->cleanout_task(); +} + +void run_view_task() { + scn_mgr->view_task(); +} + +void SCN_Mgr::start() { + m_abort = false; + + std::vector ths; + ths.clear(); + + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + ths.push_back(std::thread(run_cleanout_task)); + } + + for (auto itr = ths.begin(); itr != ths.end(); itr++) { + itr->detach(); + } + + std::thread th2(run_view_task); + th2.detach(); + + while (m_cleanout_threads != m_max_cleanout_threads) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } + + while (!m_view_active.load()) { + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + +void SCN_Mgr::stop() { + m_abort = true; + + while (m_cleanout_threads.load() > 0 || m_view_active.load()) { + os_event_set(m_view_event); + + if (m_cleanout_threads.load() > 0) { + for (uint32_t i = 0; i < m_max_cleanout_threads; i++) { + m_cleanout_workers[i]->wakeup(); + } + } + + std::this_thread::sleep_for( + std::chrono::microseconds(100)); + } +} + diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc index 1bbc5239d44..417223848cd 100644 --- a/storage/innobase/row/row0log.cc +++ b/storage/innobase/row/row0log.cc @@ -1897,7 +1897,10 @@ flag_ok: &len) == rec_trx_id + DATA_TRX_ID_LEN); ut_ad(len == DATA_ROLL_PTR_LEN); - if (memcmp(mrec_trx_id, rec_trx_id, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + //FIXME + if (memcmp(mrec_trx_id + DATA_TRX_ID_LEN, + rec_trx_id + DATA_TRX_ID_LEN, + DATA_ROLL_PTR_LEN)) { /* The ROW_T_DELETE was logged for a different PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR. 
This is possible if a ROW_T_INSERT was skipped @@ -2155,7 +2158,7 @@ flag_ok: /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what was buffered. */ ulint len; - const void *rec_trx_id = rec_get_nth_field( + const byte *rec_trx_id = rec_get_nth_field( nullptr, pcur.get_rec(), cur_offsets, index->n_uniq, &len); ut_ad(len == DATA_TRX_ID_LEN); ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len == DATA_TRX_ID_LEN); @@ -2165,8 +2168,9 @@ flag_ok: static_cast( dtuple_get_nth_field(old_pk, index->n_uniq)->data) == dtuple_get_nth_field(old_pk, index->n_uniq + 1)->data); - if (memcmp(rec_trx_id, dtuple_get_nth_field(old_pk, index->n_uniq)->data, - DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + if (memcmp(rec_trx_id + DATA_TRX_ID_LEN, + static_cast(dtuple_get_nth_field(old_pk, index->n_uniq)->data) + DATA_TRX_ID_LEN, + DATA_ROLL_PTR_LEN)) { /* The ROW_T_UPDATE was logged for a different DB_TRX_ID,DB_ROLL_PTR. This is possible if an earlier ROW_T_INSERT or ROW_T_UPDATE was diverted diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index 47159170461..5485e63b595 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -2955,6 +2955,7 @@ dberr_t row_create_index_for_mysql( goto error_handling; } + trx->add_scn_index(table->id, index->id); } else { dict_build_index_def(table, index, trx); #ifdef UNIV_DEBUG @@ -4570,7 +4571,7 @@ dberr_t row_scan_index_for_mysql(row_prebuilt_t *prebuilt, dict_index_t *index, /* No INSERT INTO ... SELECT and non-locking selects only. */ trx_start_if_not_started_xa(prebuilt->trx, false, UT_LOCATION_HERE); - trx_assign_read_view(prebuilt->trx); + trx_assign_read_view(prebuilt->trx, true); auto trx = prebuilt->trx; diff --git a/storage/innobase/row/row0pread-histogram.cc b/storage/innobase/row/row0pread-histogram.cc index c7f812ae7bc..c22b10809dc 100644 --- a/storage/innobase/row/row0pread-histogram.cc +++ b/storage/innobase/row/row0pread-histogram.cc @@ -337,7 +337,7 @@ dberr_t Histogram_sampler::process_non_leaf_rec( offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, &heap); - if (ctx->is_rec_visible(rec, offsets, heap, &mtr)) { + if (ctx->is_rec_visible(page_cur_get_block(&cur), rec, offsets, heap, &mtr)) { err = sample_rec(ctx, rec, offsets, index, prebuilt); if (err != DB_SUCCESS) { diff --git a/storage/innobase/row/row0pread.cc b/storage/innobase/row/row0pread.cc index 1b5766614f6..71973f87238 100644 --- a/storage/innobase/row/row0pread.cc +++ b/storage/innobase/row/row0pread.cc @@ -463,11 +463,11 @@ dberr_t PCursor::move_to_next_block(dict_index_t *index) { return err; } -bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, +bool Parallel_reader::Scan_ctx::check_visibility(buf_block_t *block, const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr) { - const auto table_name = m_config.m_index->table->name; +// const auto table_name = m_config.m_index->table->name; ut_ad(!m_trx || m_trx->read_view == nullptr || MVCC::is_view_active(m_trx->read_view)); @@ -478,16 +478,8 @@ bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec, auto view = m_trx->read_view; if (m_config.m_index->is_clustered()) { - trx_id_t rec_trx_id; - - if (m_config.m_index->trx_id_offset > 0) { - rec_trx_id = trx_read_trx_id(rec + m_config.m_index->trx_id_offset); - } else { - rec_trx_id = row_get_rec_trx_id(rec, m_config.m_index, offsets); - } - if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && - !view->changes_visible(rec_trx_id, table_name)) { + 
!view->changes_visible(m_config.m_index, block, rec, offsets)) { rec_t *old_vers; row_vers_build_for_consistent_read(rec, mtr, m_config.m_index, &offsets, @@ -742,7 +734,7 @@ dberr_t Parallel_reader::Ctx::traverse_recs(PCursor *pcursor, mtr_t *mtr) { bool skip{}; if (page_is_leaf(cur->block->frame)) { - skip = !m_scan_ctx->check_visibility(rec, offsets, heap, mtr); + skip = !m_scan_ctx->check_visibility(cur->block, rec, offsets, heap, mtr); } if (!skip) { diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc index 84d3f2055d9..1b4ee1bb5c8 100644 --- a/storage/innobase/row/row0sel.cc +++ b/storage/innobase/row/row0sel.cc @@ -883,7 +883,7 @@ static inline bool row_sel_test_other_conds( old_vers = nullptr; - if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, + if (!lock_clust_rec_cons_read_sees(clust_rec, index, offsets, &plan->clust_pcur, node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, clust_rec, &offsets, @@ -1389,7 +1389,7 @@ static ulint row_sel_try_search_shortcut( UT_LOCATION_HERE, &heap); if (index->is_clustered()) { - if (!lock_clust_rec_cons_read_sees(rec, index, offsets, node->read_view)) { + if (!lock_clust_rec_cons_read_sees(rec, index, offsets, &(plan->pcur), node->read_view)) { ret = SEL_RETRY; goto func_exit; } @@ -1752,6 +1752,7 @@ skip_lock: if (index->is_clustered()) { if (!lock_clust_rec_cons_read_sees(rec, index, offsets, + &(plan->pcur), node->read_view)) { err = row_sel_build_prev_vers(node->read_view, index, rec, &offsets, &heap, &plan->old_vers_heap, &old_vers, @@ -3291,6 +3292,7 @@ non-clustered index. Does the necessary locking. if (trx->isolation_level > TRX_ISO_READ_UNCOMMITTED && !lock_clust_rec_cons_read_sees(clust_rec, clust_index, *offsets, + prebuilt->clust_pcur, trx_get_read_view(trx))) { if (clust_rec != cached_clust_rec) { /* The following call returns 'offsets' associated with 'old_vers' */ @@ -3748,7 +3750,7 @@ static ulint row_sel_try_search_shortcut_for_mysql( *offsets = rec_get_offsets(rec, index, *offsets, ULINT_UNDEFINED, UT_LOCATION_HERE, heap); - if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, + if (!lock_clust_rec_cons_read_sees(rec, index, *offsets, pcur, trx_get_read_view(trx))) { return (SEL_RETRY); } @@ -5311,7 +5313,7 @@ rec_loop: by skipping this lookup */ if (srv_force_recovery < 5 && - !lock_clust_rec_cons_read_sees(rec, index, offsets, + !lock_clust_rec_cons_read_sees(rec, index, offsets, pcur, trx_get_read_view(trx))) { rec_t *old_vers; /* The following call returns 'offsets' associated with 'old_vers' */ diff --git a/storage/innobase/row/row0vers.cc b/storage/innobase/row/row0vers.cc index 1573601d488..6ace3120c71 100644 --- a/storage/innobase/row/row0vers.cc +++ b/storage/innobase/row/row0vers.cc @@ -587,7 +587,11 @@ bool row_vers_must_preserve_del_marked(trx_id_t trx_id, mtr_s_lock(&purge_sys->latch, mtr, UT_LOCATION_HERE); - return (!purge_sys->view.changes_visible(trx_id, name)); + if (!SCN_Mgr::is_scn(trx_id)) { + return true; + } + + return (!purge_sys->view.sees_version(trx_id)); } /** Check whether all non-virtual columns in a index entries match @@ -1273,7 +1277,7 @@ dberr_t row_vers_build_for_consistent_read( lob_undo->reset(); } - ut_ad(!view->changes_visible(trx_id, index->table->name)); + ut_ad(!SCN_Mgr::is_scn(trx_id) || !view->sees_version(trx_id)); ut_ad(!vrow || !(*vrow)); @@ -1315,9 +1319,7 @@ dberr_t row_vers_build_for_consistent_read( ut_a(!rec_offs_any_null_extern(index, prev_version, *offsets)); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - 
trx_id = row_get_rec_trx_id(prev_version, index, *offsets); - - if (view->changes_visible(trx_id, index->table->name)) { + if (view->changes_visible(index, nullptr, prev_version, *offsets)) { /* The view already sees this version: we can copy it to in_heap and return */ diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index a614e8b6450..1abe17d25de 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -654,6 +654,8 @@ static ulint srv_log_writes_and_flush = 0; #endif /* !UNIV_HOTBACKUP */ +uint32_t srv_cleanout_threads = 8; + /* Interval in seconds at which various tasks are performed by the master thread when server is active. In order to balance the workload, we should try to keep intervals such that they are not multiple of @@ -1733,6 +1735,10 @@ void srv_export_innodb_status(void) { } undo::spaces->s_unlock(); + if (scn_mgr != nullptr) { + export_vars.innodb_scn_cleanout_records = scn_mgr->cleanout_records(); + } + #ifdef UNIV_DEBUG rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); trx_id_t done_trx_no = purge_sys->done.trx_no; diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 9fb947494e6..c4cf78ec0c9 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -1779,6 +1779,8 @@ dberr_t srv_start(bool create_new_db) { trx_sys_create(); lock_sys_create(srv_lock_table_size); + scn_mgr = new SCN_Mgr(); + /* Create i/o-handler threads: */ os_aio_start_threads(); @@ -2065,6 +2067,8 @@ dberr_t srv_start(bool create_new_db) { auto *dict_metadata = recv_recovery_from_checkpoint_finish(false); ut_a(dict_metadata != nullptr); + scn_mgr->start(); + /* We need to save the dynamic metadata collected from redo log to DD buffer table here. This is to make sure that the dynamic metadata is not lost by any future checkpoint. Since DD and data dictionary in memory @@ -2994,6 +2998,10 @@ void srv_shutdown() { buffer pool to disk. */ dict_persist_to_dd_table_buffer(); + if (scn_mgr != nullptr) { + scn_mgr->stop(); + } + /* The steps 1-4 is the real InnoDB shutdown. All before was to stop activity which could produce new changes. All after is just cleaning up (freeing memory). 
*/ @@ -3058,6 +3066,11 @@ void srv_shutdown() { lock_sys_close(); trx_pool_close(); + if (scn_mgr != nullptr) { + delete scn_mgr; + scn_mgr = nullptr; + } + dict_close(); dict_persist_close(); undo_spaces_deinit(); diff --git a/storage/innobase/sync/sync0debug.cc b/storage/innobase/sync/sync0debug.cc index 4f0ee43fe14..c5d3307d654 100644 --- a/storage/innobase/sync/sync0debug.cc +++ b/storage/innobase/sync/sync0debug.cc @@ -1359,6 +1359,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_UNDO, SYNC_TRX_UNDO, trx_undo_mutex_key); + LATCH_ADD_MUTEX(TRX_SCN, SYNC_NO_ORDER_CHECK, trx_scn_mutex_key); + LATCH_ADD_MUTEX(TRX_POOL, SYNC_POOL, trx_pool_mutex_key); LATCH_ADD_MUTEX(TRX_POOL_MANAGER, SYNC_POOL_MANAGER, @@ -1384,6 +1386,8 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(TRX_SYS_SERIALISATION, SYNC_TRX_SYS_SERIALISATION, trx_sys_serialisation_mutex_key); + LATCH_ADD_MUTEX(TRX_SYS_MVCC, SYNC_NO_ORDER_CHECK, trx_sys_mvcc_mutex_key); + LATCH_ADD_MUTEX(SRV_SYS, SYNC_THREADS, srv_sys_mutex_key); LATCH_ADD_MUTEX(SRV_SYS_TASKS, SYNC_ANY_LATCH, srv_threads_mutex_key); diff --git a/storage/innobase/sync/sync0sync.cc b/storage/innobase/sync/sync0sync.cc index 064ed631abc..7f82dc115ba 100644 --- a/storage/innobase/sync/sync0sync.cc +++ b/storage/innobase/sync/sync0sync.cc @@ -122,6 +122,7 @@ mysql_pfs_key_t srv_monitor_file_mutex_key; mysql_pfs_key_t sync_thread_mutex_key; #endif /* UNIV_DEBUG */ mysql_pfs_key_t trx_undo_mutex_key; +mysql_pfs_key_t trx_scn_mutex_key; mysql_pfs_key_t trx_mutex_key; mysql_pfs_key_t trx_pool_mutex_key; mysql_pfs_key_t trx_pool_manager_mutex_key; @@ -132,6 +133,7 @@ mysql_pfs_key_t lock_wait_mutex_key; mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_shard_mutex_key; mysql_pfs_key_t trx_sys_serialisation_mutex_key; +mysql_pfs_key_t trx_sys_mvcc_mutex_key; mysql_pfs_key_t srv_sys_mutex_key; mysql_pfs_key_t srv_threads_mutex_key; #ifndef PFS_SKIP_EVENT_MUTEX diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index 2b0a777f3af..4dc5b1eee89 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -175,7 +175,8 @@ const page_size_t TrxUndoRsegsIterator::set_next() { const page_size_t page_size(m_purge_sys->rseg->page_size); - ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no); + ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no || + purge_sys->iter.trx_no == purge_sys->rseg->last_trx_no + 2); m_purge_sys->iter.trx_no = m_purge_sys->rseg->last_trx_no; m_purge_sys->hdr_offset = m_purge_sys->rseg->last_offset; @@ -262,6 +263,8 @@ void trx_purge_sys_initialize(uint32_t n_purge_threads, trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + purge_sys->rseg_iter = ut::new_withkey( UT_NEW_THIS_FILE_PSI_KEY, purge_sys); } @@ -319,6 +322,7 @@ void trx_purge_add_update_undo_to_history( bool update_rseg_history_len, /*!< in: if true: update rseg history len else skip updating it. 
*/ + bool is_insert, /*!< in: true if it's insert undo */ ulint n_added_logs, /*!< in: number of logs added */ mtr_t *mtr) /*!< in: mtr */ { @@ -327,7 +331,14 @@ void trx_purge_add_update_undo_to_history( trx_rsegf_t *rseg_header; trx_ulogf_t *undo_header; - undo = undo_ptr->update_undo; + if (is_insert) { + undo = undo_ptr->insert_undo; + } else { + undo = undo_ptr->update_undo; + } + + ut_a(undo != nullptr); + rseg = undo->rseg; rseg_header = trx_rsegf_get(undo->rseg->space_id, undo->rseg->page_no, @@ -1708,7 +1719,7 @@ static void trx_purge_rseg_get_next_history_log( ut_a(rseg->last_page_no != FIL_NULL); - purge_sys->iter.trx_no = rseg->last_trx_no + 1; + purge_sys->iter.trx_no = rseg->last_trx_no + 2; purge_sys->iter.undo_no = 0; purge_sys->iter.undo_rseg_space = SPACE_UNKNOWN; purge_sys->next_stored = false; @@ -1780,6 +1791,11 @@ static void trx_purge_rseg_get_next_history_log( rseg->latch(); + if (purge_sys->iter.trx_no == trx_no + 2) { + //FIXME: ASSERT the page type is TRX_UNDO_INSERT + purge_sys->iter.trx_no = trx_no; + } + rseg->last_page_no = prev_log_addr.page; rseg->last_offset = prev_log_addr.boffset; rseg->last_trx_no = trx_no; @@ -1812,6 +1828,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, uint64_t undo_no; space_id_t undo_rseg_space; trx_id_t modifier_trx_id; + trx_id_t modifier_trx_no; purge_sys->hdr_offset = purge_sys->rseg->last_offset; page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no; @@ -1823,7 +1840,7 @@ static void trx_purge_read_undo_rec(trx_purge_t *purge_sys, mtr_start(&mtr); undo_rec = trx_undo_get_first_rec( - &modifier_trx_id, purge_sys->rseg->space_id, page_size, + &modifier_trx_id, &modifier_trx_no, purge_sys->rseg->space_id, page_size, purge_sys->hdr_page_no, purge_sys->hdr_offset, RW_S_LATCH, &mtr); if (undo_rec != nullptr) { @@ -1889,7 +1906,7 @@ static trx_undo_rec_t *trx_purge_get_next_rec( mtr_t mtr; ut_ad(purge_sys->next_stored); - ut_ad(purge_sys->iter.trx_no < purge_sys->view.low_limit_no()); + ut_ad(purge_sys->iter.trx_no < purge_sys->version.load()); space = purge_sys->rseg->space_id; page_no = purge_sys->page_no; @@ -2212,7 +2229,7 @@ void Purge_groups_t::distribute_if_needed() { } } - if (purge_sys->iter.trx_no >= purge_sys->view.low_limit_no()) { + if (purge_sys->iter.trx_no >= purge_sys->version.load()) { return nullptr; } @@ -2407,6 +2424,8 @@ ulint trx_purge(ulint n_purge_threads, /*!< in: number of purge tasks trx_sys->mvcc->clone_oldest_view(&purge_sys->view); + purge_sys->version = purge_sys->view.version(); + rw_lock_x_unlock(&purge_sys->latch); #ifdef UNIV_DEBUG diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 4f9c74a644a..2192e384fbe 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -481,6 +481,7 @@ static bool trx_undo_report_insert_virtual(page_t *undo_page, /** Reports in the undo log of an insert of a clustered index record. 
@return offset of the inserted entry on the page if succeed, 0 if fail */ static ulint trx_undo_page_report_insert( + trx_undo_t *undo, /*!< in: undo log hdr object */ page_t *undo_page, /*!< in: undo log page */ trx_t *trx, /*!< in: transaction */ dict_index_t *index, /*!< in: clustered index */ @@ -502,7 +503,7 @@ static ulint trx_undo_page_report_insert( ut_ad(first_free <= UNIV_PAGE_SIZE); - if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11) { + if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11 + 9) { /* Not enough space for writing the general parameters */ return (0); @@ -511,6 +512,18 @@ static ulint trx_undo_page_report_insert( /* Reserve 2 bytes for the pointer to the next undo log record */ ptr += 2; + /* Indicate it's new version, that undo log also stores header offset */ + *ptr++ = (byte)TRX_UNDO_NEW_VERSION_TAG; + /* Store low-two bytes of trx id for verification */ + mach_write_to_2(ptr, (0xFFFF & trx->id)); + ptr += 2; + /* Write page no of header offset */ + mach_write_to_4(ptr, undo->hdr_page_no); + ptr += 4; + /* Write header offset */ + mach_write_to_2(ptr, undo->hdr_offset); + ptr += 2; + /* Store first some general parameters to the undo log */ *ptr++ = TRX_UNDO_INSERT_REC; ptr += mach_u64_write_much_compressed(ptr, trx->undo_no); @@ -549,6 +562,173 @@ static ulint trx_undo_page_report_insert( return (trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr)); } +bool trx_undo_rec_get_hdr( + trx_id_t id, + trx_undo_rec_t *undo_rec, + page_no_t &undo_hdr_no, + uint32_t &offset) { + const byte *ptr; + + ptr = undo_rec + 2; + + if (*ptr != TRX_UNDO_NEW_VERSION_TAG) { + /* old version */ + return false; + } + + ptr += 1; + uint16_t low_id = mach_read_from_2(ptr); + if (low_id != (id & 0xFFFF)) { + /* Possiblely purged */ + return false; + } + + ptr += 2; + + undo_hdr_no = mach_read_from_4(ptr); + ptr += 4; + offset = mach_read_from_2(ptr); + + return true; +} + +static inline trx_id_t trx_purge_get_version() { + trx_id_t purge_version = purge_sys->version.load(); + ut_a(purge_version > 2); + ut_a(SCN_Mgr::is_scn(purge_version)); + + return (purge_version - 2); +} + +static inline bool trx_scn_sanity_check(trx_id_t trx_id, trx_id_t trx_scn) { + if (!SCN_Mgr::is_scn(trx_scn)) { + return false; + } + + if (trx_scn >= trx_sys->get_max_trx_scn()) { + return false; + } + + if (trx_scn < scn_mgr->startup_scn()) { + return false; + } + + return true; +} + +trx_id_t trx_undo_hdr_get_scn(trx_id_t trx_id, page_id_t &page_id, uint32_t offset, mtr_t *mtr, page_t *undo_page) { + + trx_id_t purge_version = trx_purge_get_version(); + + if (undo_page == nullptr) { + page_no_t space_size = fil_space_get_size(page_id.space()); + if (space_size <= page_id.page_no()) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(page_id.space(), &found); + ut_ad(found); + + undo_page = trx_undo_page_get_s_latched(page_id, page_size, mtr); + + if (undo_page == nullptr) { + return purge_version; + } + } + + /* Get undo header */ + trx_ulogf_t *undo_header = undo_page + offset; + trx_id_t modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID); + if (modifier_trx_id != trx_id) { + /* Possiblely purged */ + return purge_version; + } + + trx_id_t scn = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO); + + if (!trx_scn_sanity_check(trx_id, scn)) { + /* Fail to pass sanity checking */ + return purge_version; + } + + return scn; +} + +trx_id_t trx_undo_get_scn( + const dict_index_t *index, + roll_ptr_t roll_ptr, + trx_id_t id) { + trx_id_t 
purge_version = trx_purge_get_version(); + + /* Decode rollback pointer */ + bool is_insert; + ulint rseg_id; + space_id_t space_id; + page_no_t page_no; + ulint offset; + trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no, &offset); + space_id = trx_rseg_id_to_space_id(rseg_id, false); + + page_no_t space_size = fil_space_get_size(space_id); + /* out of range, it must be purged */ + if (page_no >= space_size) { + return purge_version; + } + + bool found; + const page_size_t &page_size = fil_space_get_page_size(space_id, &found); + ut_ad(found); + + /* Get the page */ + mtr_t mtr; + mtr_start(&mtr); + page_t *undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no), + page_size, &mtr); + + if (undo_page == nullptr) { + /* invalid page */ + mtr_commit(&mtr); + return purge_version; + } + + /* Get record */ + trx_undo_rec_t *undo_rec = (trx_undo_rec_t *)(undo_page + offset); + + page_no_t undo_hdr_no; + uint32_t undo_hdr_offset; + + if (!trx_undo_rec_get_hdr(id, undo_rec, undo_hdr_no, undo_hdr_offset) + || undo_hdr_no >= space_size) { + /* older version or invalid undo log */ + mtr_commit(&mtr); + return purge_version; + } + + if (trx_undo_rec_get_table_id(undo_rec) != index->table->id) { + /* Wrong undo log, possiblely purged */ + mtr_commit(&mtr); + return purge_version; + } + + page_id_t undo_hdr_id = {space_id, undo_hdr_no}; + trx_id_t scn; + + if (undo_hdr_no == page_no) { + /* No need to read and lock page again */ + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, undo_page); + } else { + mtr_commit(&mtr); + mtr_start(&mtr); + + scn = trx_undo_hdr_get_scn(id, undo_hdr_id, undo_hdr_offset, &mtr, nullptr); + } + + mtr_commit(&mtr); + + return scn; +} + /** Reads from an undo log record the general parameters. @return remaining part of undo log record after reading these values */ const byte *trx_undo_rec_get_pars( @@ -565,7 +745,7 @@ const byte *trx_undo_rec_get_pars( { const byte *ptr; - ptr = undo_rec + 2; + ptr = undo_rec + trx_undo_start_offset(undo_rec); ptr = type_cmpl.read(ptr); *updated_extern = type_cmpl.is_lob_updated(); @@ -592,7 +772,7 @@ const byte *trx_undo_rec_get_pars( @param[in] undo_rec Undo log record @return the table ID */ table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *undo_rec) { - const byte *ptr = undo_rec + 2; + const byte *ptr = undo_rec + trx_undo_start_offset(undo_rec); uint8_t type_cmpl = mach_read_from_1(ptr); const bool blob_undo = type_cmpl & TRX_UNDO_MODIFY_BLOB; @@ -602,9 +782,9 @@ table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *undo_rec) { type_cmpl flag + 1 byte for the new flag. Total 4 bytes. The new flag is currently unused and is available for future use. 
*/ - ptr = undo_rec + 4; + ptr += 2; } else { - ptr = undo_rec + 3; + ptr++; } /* Skip the UNDO number */ @@ -1153,6 +1333,7 @@ static byte *trx_undo_report_blob_update(page_t *undo_page, dict_index_t *index, succeed, 0 if fail */ static ulint trx_undo_page_report_modify( /*========================*/ + trx_undo_t *undo, /*!< in: undo log object */ page_t *undo_page, /*!< in: undo log page */ trx_t *trx, /*!< in: transaction */ dict_index_t *index, /*!< in: clustered index where update or @@ -1204,7 +1385,7 @@ static ulint trx_undo_page_report_modify( ut_ad(first_free <= UNIV_PAGE_SIZE); - if (trx_undo_left(undo_page, ptr) < 50) { + if (trx_undo_left(undo_page, ptr) < 50 + 9) { /* NOTE: the value 50 must be big enough so that the general fields written below fit on the undo log page */ @@ -1214,6 +1395,18 @@ static ulint trx_undo_page_report_modify( /* Reserve 2 bytes for the pointer to the next undo log record */ ptr += 2; + /* Indicate it's new version, that undo log also stores header offset */ + *ptr++ = (byte)TRX_UNDO_NEW_VERSION_TAG; + /* Store low-two bytes of trx id for verification */ + mach_write_to_2(ptr, (0xFFFF & trx->id)); + ptr += 2; + /* Write page no of header offset */ + mach_write_to_4(ptr, undo->hdr_page_no); + ptr += 4; + /* Write header offset */ + mach_write_to_2(ptr, undo->hdr_offset); + ptr += 2; + /* Store first some general parameters to the undo log */ if (!update) { @@ -1256,6 +1449,7 @@ static ulint trx_undo_page_report_modify( ut_ad(flen == DATA_TRX_ID_LEN); trx_id = trx_read_trx_id(field); + ut_a(SCN_Mgr::is_scn(trx_id) || trx_id == trx->id || index->table->is_temporary()); /* If it is an update of a delete marked record, then we are allowed to ignore blob prefixes if the delete marking was done @@ -2251,13 +2445,13 @@ dberr_t trx_undo_report_row_operation( switch (op_type) { case TRX_UNDO_INSERT_OP: - offset = trx_undo_page_report_insert(undo_page, trx, index, clust_entry, + offset = trx_undo_page_report_insert(undo, undo_page, trx, index, clust_entry, &mtr); break; default: ut_ad(op_type == TRX_UNDO_MODIFY_OP); offset = - trx_undo_page_report_modify(undo_page, trx, index, rec, offsets, + trx_undo_page_report_modify(undo, undo_page, trx, index, rec, offsets, update, cmpl_info, clust_entry, &mtr); } @@ -2391,8 +2585,11 @@ err_exit: mtr_start(&mtr); - undo_page = trx_undo_page_get_s_latched(page_id_t(space_id, page_no), - page_size, &mtr); + { + buf_block_t *block = buf_page_get(page_id_t(space_id, page_no), + page_size, RW_S_LATCH, UT_LOCATION_HERE, &mtr); + undo_page = buf_block_get_frame(block); + } undo_rec = trx_undo_rec_copy(undo_page, static_cast(offset), heap); @@ -2423,7 +2620,7 @@ err_exit: rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_history = purge_sys->view.changes_visible(trx_id, name); + missing_history = purge_sys->view.sees_version(trx_id); if (!missing_history) { *undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap, is_temp); } @@ -2479,6 +2676,10 @@ bool trx_undo_prev_version_build( rec_trx_id = row_get_rec_trx_id(rec, index, offsets); + if (!SCN_Mgr::is_scn(rec_trx_id)) { + rec_trx_id = scn_mgr->get_scn(rec_trx_id, index, roll_ptr); + } + /* REDO rollback segments are used only for non-temporary objects. For temporary objects NON-REDO rollback segments are used. 
*/ bool is_temp = index->table->is_temporary(); @@ -2510,6 +2711,17 @@ bool trx_undo_prev_version_build( ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr, &info_bits); + trx_id_t scn = trx_id; + if (!SCN_Mgr::is_scn(trx_id)) { + /* read from undo header */ + scn = scn_mgr->get_scn(trx_id, index, roll_ptr); + if (scn != TRX_ID_MAX) { + trx_id = scn; + } + } + + ut_a(SCN_Mgr::is_scn(trx_id) || scn == TRX_ID_MAX); + /* (a) If a clustered index record version is such that the trx id stamp in it is bigger than purge_sys->view, then the BLOBs in that version are known to exist (the purge has not @@ -2562,8 +2774,7 @@ bool trx_undo_prev_version_build( rw_lock_s_lock(&purge_sys->latch, UT_LOCATION_HERE); - missing_extern = - purge_sys->view.changes_visible(trx_id, index->table->name); + missing_extern = purge_sys->view.sees_version(trx_id); rw_lock_s_unlock(&purge_sys->latch); diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc index b021780266d..d284e03fae2 100644 --- a/storage/innobase/trx/trx0sys.cc +++ b/storage/innobase/trx/trx0sys.cc @@ -68,7 +68,12 @@ void ReadView::check_trx_id_sanity(trx_id_t id, const table_name_t &name) { return; } - if (id >= trx_sys_get_next_trx_id_or_no()) { + if (SCN_Mgr::is_scn(id)) { + //FIXME + return; + } + + if (id >= trx_sys->get_max_trx_id()) { ib::warn(ER_IB_MSG_1196) << "A transaction id" << " in a record of table " << name << " is newer than the" @@ -129,7 +134,8 @@ void trx_sys_write_max_trx_id(void) { sys_header = trx_sysf_get(&mtr); - const trx_id_t max_trx_id = trx_sys->next_trx_id_or_no.load(); + const trx_id_t max_trx_id = std::max(trx_sys->get_max_trx_id(), + trx_sys->get_max_trx_scn()); mlog_write_ull(sys_header + TRX_SYS_TRX_ID_STORE, max_trx_id, &mtr); @@ -155,7 +161,7 @@ trx_id_t trx_sys_oldest_trx_no() { auto trx = UT_LIST_GET_FIRST(trx_sys->serialisation_list); return (trx->no); } - return trx_sys_get_next_trx_id_or_no(); + return trx_sys->get_max_trx_scn(); } void trx_sys_get_binlog_prepared(std::vector &trx_ids) { @@ -495,18 +501,22 @@ purge_pq_t *trx_sys_init_at_db_start(void) { - and one that has acquired the trx_sys_serialisation_mutex. If you decreased the factor 2, the test innodb.max_trx_id should fail. */ - trx_sys->next_trx_id_or_no.store(max_trx_id + + trx_sys->next_trx_id.store(max_trx_id + 2 * trx_sys_get_trx_id_write_margin()); - trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_id_or_no.load()); - mtr.commit(); + if (trx_sys->next_trx_id % 2 != 0) { + trx_sys->next_trx_id++; + } + trx_sys->next_trx_scn = trx_sys->next_trx_id.load() + 1; + + trx_sys->serialisation_min_trx_no.store(trx_sys->next_trx_scn.load()); #ifdef UNIV_DEBUG /* max_trx_id is the next transaction ID to assign. Initialize maximum transaction number to one less if all transactions are already purged. */ if (trx_sys->rw_max_trx_no == 0) { - trx_sys->rw_max_trx_no = trx_sys_get_next_trx_id_or_no() - 1; + trx_sys->rw_max_trx_no = trx_sys->get_max_trx_scn() - 2; } #endif /* UNIV_DEBUG */ @@ -518,6 +528,8 @@ purge_pq_t *trx_sys_init_at_db_start(void) { trx_lists_init_at_db_start(); + scn_mgr->init(); + /* This mutex is not strictly required, it is here only to satisfy the debug code (assertions). We are still running in single threaded bootstrap mode. 
*/ @@ -546,7 +558,7 @@ purge_pq_t *trx_sys_init_at_db_start(void) { << rows_to_undo << unit << " row operations to undo"; ib::info(ER_IB_MSG_1199) - << "Trx id counter is " << trx_sys_get_next_trx_id_or_no(); + << "Trx id counter is " << trx_sys->get_max_trx_id(); } trx_sys->found_prepared_trx = trx_sys->n_prepared_trx > 0; diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 1040f21d3b8..83fd952ec7b 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -242,6 +242,8 @@ static void trx_init(trx_t *trx) { trx->flush_observer = nullptr; + trx->scn_indexs.clear(); + ++trx->version; } @@ -259,6 +261,8 @@ struct TrxFactory { the constructors of the trx_t members. */ new (trx) trx_t(); + new (&trx->scn_indexs) SCNIndexIds(); + trx_init(trx); trx->state.store(TRX_STATE_NOT_STARTED, std::memory_order_relaxed); @@ -278,6 +282,7 @@ struct TrxFactory { mutex_create(LATCH_ID_TRX, &trx->mutex); mutex_create(LATCH_ID_TRX_UNDO, &trx->undo_mutex); + mutex_create(LATCH_ID_TRX_SCN, &trx->scn_mutex); lock_trx_alloc_locks(trx); } @@ -309,6 +314,7 @@ struct TrxFactory { mutex_free(&trx->mutex); mutex_free(&trx->undo_mutex); + mutex_free(&trx->scn_mutex); trx->mod_tables.~trx_mod_tables_t(); @@ -331,6 +337,8 @@ struct TrxFactory { trx->lock.rec_pool.~lock_pool_t(); trx->lock.table_pool.~lock_pool_t(); + + trx->scn_indexs.~SCNIndexIds(); } /** Enforce any invariants here, this is called before the transaction @@ -502,6 +510,8 @@ static void trx_free(trx_t *&trx) { trx->mod_tables.clear(); + trx->scn_indexs.clear(); + ut_ad(trx->read_view == nullptr); ut_ad(trx->is_dd_trx == false); ut_ad(trx->in_depth == 0); @@ -872,14 +882,11 @@ static trx_t *trx_resurrect_insert( std::memory_order_relaxed); } - /* We give a dummy value for the trx no; this should have no - relevance since purge is not interested in committed - transaction numbers, unless they are in the history - list, in which case it looks the number from the disk based - undo log structure */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -977,10 +984,11 @@ static void trx_resurrect_update( if (undo->state != TRX_UNDO_ACTIVE) { trx_resurrect_update_in_prepared_state(trx, undo); - /* We give a dummy value for the trx number */ - - trx->no = trx->id; - + if (undo->trx_no > 0) { + trx->no = undo->trx_no; + } else { + trx->no = TRX_ID_MAX; + } } else { trx->state.store(TRX_STATE_ACTIVE, std::memory_order_relaxed); @@ -1463,7 +1471,9 @@ which case still the trx->no is assigned. static inline bool trx_add_to_serialisation_list(trx_t *trx) { trx_sys_serialisation_mutex_enter(); + mutex_enter(&trx->scn_mutex); trx->no = trx_sys_allocate_trx_no(); + mutex_exit(&trx->scn_mutex); /* Update the latest transaction number. */ ut_d(trx_sys->rw_max_trx_no = trx->no); @@ -1496,7 +1506,7 @@ static inline void trx_erase_from_serialisation_list_low(trx_t *trx) { UT_LIST_GET_FIRST(trx_sys->serialisation_list)->no); } else { - trx_sys->serialisation_min_trx_no.store(trx_sys_get_next_trx_id_or_no()); + trx_sys->serialisation_min_trx_no.store(trx_sys->get_max_trx_scn()); } } @@ -1593,20 +1603,15 @@ static bool trx_write_serialisation_history( temp_mtr.set_log_mode(MTR_LOG_NO_REDO); } - /* If transaction involves insert then truncate undo logs. 
*/ - if (trx->rsegs.m_redo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); - } - if (trx->rsegs.m_noredo.insert_undo != nullptr) { - trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr); + trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr, true); } bool serialised = false; /* If transaction involves update then add rollback segments to purge queue. */ - if (trx->rsegs.m_redo.update_undo != nullptr || + if (trx_is_redo_rseg_updated(trx) || trx->rsegs.m_noredo.update_undo != nullptr) { /* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue @@ -1614,7 +1619,7 @@ static bool trx_write_serialisation_history( rollback segments. */ trx_undo_ptr_t *redo_rseg_undo_ptr = - trx->rsegs.m_redo.update_undo != nullptr ? &trx->rsegs.m_redo : nullptr; + trx_is_redo_rseg_updated(trx) ? &trx->rsegs.m_redo : nullptr; trx_undo_ptr_t *temp_rseg_undo_ptr = trx->rsegs.m_noredo.update_undo != nullptr ? &trx->rsegs.m_noredo @@ -1624,11 +1629,13 @@ static bool trx_write_serialisation_history( serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr, temp_rseg_undo_ptr); + ulint added_log = 0; /* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */ if (trx->rsegs.m_redo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish(trx->rsegs.m_redo.update_undo, mtr); @@ -1637,28 +1644,40 @@ static bool trx_write_serialisation_history( non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. */ - bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr || trx->rsegs.m_redo.insert_undo != nullptr); /* Set flag if GTID information need to persist. */ auto undo_ptr = &trx->rsegs.m_redo; trx_undo_gtid_set(trx, undo_ptr->update_undo, false); - trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, - (update_rseg_len ? 1 : 0), mtr); + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, false, + (update_rseg_len ? added_log : 0), mtr); } DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE();); + if (trx->rsegs.m_redo.insert_undo != nullptr) { + page_t *undo_hdr_page; + added_log++; + + undo_hdr_page = + trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr); + + bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr); + auto undo_ptr = &trx->rsegs.m_redo; + trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len, true, + (update_rseg_len ? added_log : 0), mtr); + } + if (trx->rsegs.m_noredo.update_undo != nullptr) { page_t *undo_hdr_page; + added_log++; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_noredo.update_undo, &temp_mtr); - ulint n_added_logs = (redo_rseg_undo_ptr != nullptr) ? 
-
-      trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true,
-                              n_added_logs, &temp_mtr);
+      trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true, false,
+                              added_log, &temp_mtr);
     }
   }
 
@@ -1818,6 +1837,36 @@ static void trx_update_mod_tables_timestamp(trx_t *trx) /*!< in: transaction */
 
   trx->mod_tables.clear();
 }
 
+static void trx_update_index_scn(trx_t *trx) {
+  if (trx->scn_indexs.empty()) {
+    return;
+  }
+
+  trx_id_t scn = trx->no;
+  if (scn == TRX_ID_MAX) {
+    scn = trx_sys->get_next_trx_scn();
+  }
+
+  for (auto &item : trx->scn_indexs) {
+    dict_index_t *index = dict_index_get_by_id(item.first, item.second);
+
+    if (index != nullptr) {
+      if (unlikely(index->trx_id > trx->id)) {
+        trx_id_t curr_scn = scn_mgr->get_scn_fast(index->trx_id);
+
+        if (curr_scn != TRX_ID_MAX) {
+          scn = std::max(curr_scn, scn);
+        }
+      }
+
+      index->trx_scn = scn;
+      index->table->release();
+    }
+  }
+
+  trx->scn_indexs.clear();
+}
+
 /** Erase the transaction from running transaction lists and serialization
 list. Active RW transaction list of a MVCC snapshot(ReadView::prepare)
@@ -1844,6 +1893,7 @@ static void trx_erase_lists(trx_t *trx) {
       trx_sys->mvcc->view_close(trx->read_view, true);
     }
   }
 
+  DEBUG_SYNC_C("after_trx_erase_lists");
 }
 
@@ -1866,6 +1916,10 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) {
     gtid_persistor.get_gtid_info(trx, gtid_desc);
   }
 
+  if (trx->id > 0) {
+    trx_update_index_scn(trx);
+  }
+
   if (trx_sys_latch_is_needed) {
     trx_sys_mutex_enter();
   }
@@ -1947,6 +2001,9 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialised) {
     trx_erase_from_serialisation_list_low(trx);
 
     trx_sys_serialisation_mutex_exit();
+
+    ut_a(trx->no != TRX_ID_MAX);
+    scn_mgr->store_scn(trx->id, trx->no);
   }
 
   lock_trx_release_locks(trx);
@@ -2028,10 +2085,6 @@ written */
   gtid_persistor.set_persist_gtid(trx, false);
 
   if (mtr != nullptr) {
-    if (trx->rsegs.m_redo.insert_undo != nullptr) {
-      trx_undo_insert_cleanup(&trx->rsegs.m_redo, false);
-    }
-
     if (trx->rsegs.m_noredo.insert_undo != nullptr) {
       trx_undo_insert_cleanup(&trx->rsegs.m_noredo, true);
     }
@@ -2309,7 +2362,7 @@ void trx_cleanup_at_db_startup(trx_t *trx) /*!< in: transaction */
 within the same transaction will get the same read view, which is created
 when this function is first called for a new started transaction.
 @return consistent read view */
-ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */
+ReadView *trx_assign_read_view(trx_t *trx, bool is_shared) /*!< in/out: active transaction */
 {
   ut_ad(trx_can_be_handled_by_current_thread_or_is_hp_victim(trx));
   ut_ad(trx->state.load(std::memory_order_relaxed) == TRX_STATE_ACTIVE);
@@ -2319,7 +2372,8 @@ ReadView *trx_assign_read_view(trx_t *trx) /*!< in/out: active transaction */
     return (nullptr);
 
   } else if (!MVCC::is_view_active(trx->read_view)) {
-    trx_sys->mvcc->view_open(trx->read_view, trx);
+    trx_sys->mvcc->view_open(trx->read_view, trx, is_shared);
+    trx->read_view->set_trx(trx);
   }
 
   return (trx->read_view);
diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc
index fd96065b94f..0d3f02d1d2c 100644
--- a/storage/innobase/trx/trx0undo.cc
+++ b/storage/innobase/trx/trx0undo.cc
@@ -294,12 +294,14 @@ trx_undo_rec_t *trx_undo_get_next_rec(
 @param[in,out] mtr Mini-transaction
 @return undo log record, the page latched, NULL if none */
 trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id,
+                                       trx_id_t *modifier_trx_no,
                                        space_id_t space,
                                        const page_size_t &page_size,
                                        page_no_t page_no, ulint offset,
                                        ulint mode, mtr_t *mtr) {
   page_t *undo_page;
   trx_undo_rec_t *rec;
+  ulint type;
 
   const page_id_t page_id(space, page_no);
 
@@ -312,6 +314,15 @@ trx_undo_rec_t *trx_undo_get_first_rec(trx_id_t *modifier_trx_id,
   if (modifier_trx_id != nullptr) {
     trx_ulogf_t *undo_header = undo_page + offset;
     *modifier_trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
+    ut_a(modifier_trx_no != nullptr);
+    *modifier_trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO);
+    ut_a(SCN_Mgr::is_scn(*modifier_trx_no));
+
+    /* For insert undo, return null */
+    type = mtr_read_ulint(undo_page + TRX_UNDO_PAGE_HDR, MLOG_2BYTES, mtr);
+    if (type == TRX_UNDO_INSERT) {
+      return nullptr;
+    }
   }
 
   rec = trx_undo_page_get_first_rec(undo_page, page_no, offset);
@@ -1198,7 +1209,7 @@ loop:
     mtr.set_log_mode(MTR_LOG_NO_REDO);
   }
 
-  rec = trx_undo_get_first_rec(nullptr, rseg->space_id, rseg->page_size,
+  rec = trx_undo_get_first_rec(nullptr, nullptr, rseg->space_id, rseg->page_size,
                                hdr_page_no, hdr_offset, RW_X_LATCH, &mtr);
   if (rec == nullptr) {
     /* Already empty */
@@ -1292,6 +1303,7 @@ static trx_undo_t *trx_undo_mem_init(
   ulint type;
   ulint state;
   trx_id_t trx_id;
+  trx_id_t trx_no;
   ulint offset;
   fil_addr_t last_addr;
   page_t *last_page;
@@ -1316,6 +1328,8 @@ static trx_undo_t *trx_undo_mem_init(
 
   trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
 
+  trx_no = mach_read_from_8(undo_header + TRX_UNDO_TRX_NO);
+
   auto flag = mtr_read_ulint(undo_header + TRX_UNDO_FLAGS, MLOG_1BYTE, mtr);
 
   bool xid_exists = ((flag & TRX_UNDO_FLAG_XID) != 0);
@@ -1333,6 +1347,8 @@ static trx_undo_t *trx_undo_mem_init(
   undo->dict_operation =
       mtr_read_ulint(undo_header + TRX_UNDO_DICT_TRANS, MLOG_1BYTE, mtr);
 
+  undo->trx_no = trx_no;
+
   undo->flag = flag;
 
   undo->m_gtid_storage = trx_undo_t::Gtid_storage::NONE;
@@ -1640,8 +1656,9 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type,
   auto undo_page = trx_undo_page_get(page_id_t(undo->space, undo->hdr_page_no),
                                      undo->page_size, mtr);
 
-  ulint offset;
+  ulint offset = trx_undo_header_create(undo_page, trx_id, mtr);
 
+#if 0
   if (type == TRX_UNDO_INSERT) {
     offset = trx_undo_insert_header_reuse(undo_page, trx_id, mtr);
     gtid_storage = trx_undo_t::Gtid_storage::NONE;
@@ -1652,6 +1669,7 @@ static trx_undo_t *trx_undo_reuse_cached(trx_rseg_t *rseg, ulint type,
     offset = trx_undo_header_create(undo_page,
                                     trx_id, mtr);
   }
+#endif
 
   trx_undo_header_add_space_for_xid(undo_page, undo_page + offset, mtr,
                                     gtid_storage);
@@ -1798,7 +1816,8 @@ func_exit:
 @return undo log segment header page, x-latched */
 page_t *trx_undo_set_state_at_finish(
     trx_undo_t *undo, /*!< in: undo log memory copy */
-    mtr_t *mtr)       /*!< in: mtr */
+    mtr_t *mtr,       /*!< in: mtr */
+    bool is_temp)     /*!< in: true if it's tmp undo */
 {
   trx_usegf_t *seg_hdr;
   trx_upagef_t *page_hdr;
@@ -1816,8 +1835,7 @@ page_t *trx_undo_set_state_at_finish(
   if (undo->size == 1 && mach_read_from_2(page_hdr + TRX_UNDO_PAGE_FREE) < TRX_UNDO_PAGE_REUSE_LIMIT) {
     state = TRX_UNDO_CACHED;
-
-  } else if (undo->type == TRX_UNDO_INSERT) {
+  } else if (is_temp && (undo->type == TRX_UNDO_INSERT)) {
     state = TRX_UNDO_TO_FREE;
   } else {
     state = TRX_UNDO_TO_PURGE;
   }
@@ -1921,25 +1939,38 @@ skip updating it.
 @param[in] mtr Mini-transaction */
 void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr,
                              page_t *undo_page, bool update_rseg_history_len,
-
+                             bool is_insert,
                              ulint n_added_logs, mtr_t *mtr) {
   trx_rseg_t *rseg;
   trx_undo_t *undo;
 
-  undo = undo_ptr->update_undo;
+  if (is_insert) {
+    undo = undo_ptr->insert_undo;
+  } else {
+    undo = undo_ptr->update_undo;
+  }
+
   rseg = undo_ptr->rseg;
 
   ut_ad(mutex_own(&(rseg->mutex)));
 
   trx_purge_add_update_undo_to_history(
-      trx, undo_ptr, undo_page, update_rseg_history_len, n_added_logs, mtr);
+      trx, undo_ptr, undo_page, update_rseg_history_len, is_insert, n_added_logs, mtr);
 
-  UT_LIST_REMOVE(rseg->update_undo_list, undo);
-
-  undo_ptr->update_undo = nullptr;
+  if (is_insert) {
+    UT_LIST_REMOVE(rseg->insert_undo_list, undo);
+    undo_ptr->insert_undo = nullptr;
+  } else {
+    UT_LIST_REMOVE(rseg->update_undo_list, undo);
+    undo_ptr->update_undo = nullptr;
+  }
 
   if (undo->state == TRX_UNDO_CACHED) {
-    UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo);
+    if (is_insert) {
+      UT_LIST_ADD_FIRST(rseg->insert_undo_cached, undo);
+    } else {
+      UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo);
+    }
 
     MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);
   } else {