diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 386dce6..0e49eb7 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -279,6 +279,8 @@ static bool innodb_optimize_fulltext_only = FALSE;
 
 static char *innodb_version_str = (char *)INNODB_VERSION_STR;
 static Innodb_data_lock_inspector innodb_data_lock_inspector;
+/** Value of innodb_rseg_init_threads; copied to srv_rseg_init_threads. */
+static ulong innobase_rseg_init_threads = 1;
 
 /** Note we cannot use rec_format_enum because we do not allow
 COMPRESSED row format for innodb_default_row_format option. */
@@ -750,6 +752,7 @@ static PSI_thread_info all_innodb_threads[] = {
     PSI_KEY(fts_parallel_tokenization_thread, 0, 0, PSI_DOCUMENT_ME),
     PSI_KEY(srv_ts_alter_encrypt_thread, 0, 0, PSI_DOCUMENT_ME),
     PSI_KEY(parallel_read_thread, 0, 0, PSI_DOCUMENT_ME),
+    PSI_KEY(parallel_rseg_init_thread, 0, 0, PSI_DOCUMENT_ME),
     PSI_KEY(meb::redo_log_archive_consumer_thread, 0, 0, PSI_DOCUMENT_ME)};
 #endif /* UNIV_PFS_THREAD */
 
@@ -4453,6 +4456,8 @@ static int innodb_init_params() {
 
   srv_buf_pool_size = srv_buf_pool_curr_size;
 
+  srv_rseg_init_threads = static_cast<ulint>(innobase_rseg_init_threads);
+
   innodb_log_checksums_func_update(srv_log_checksums);
 
 #ifdef HAVE_LINUX_LARGE_PAGES
@@ -22031,6 +22036,12 @@ static MYSQL_SYSVAR_ENUM(
     " NULLS_UNEQUAL and NULLS_IGNORED",
     nullptr, nullptr, SRV_STATS_NULLS_EQUAL, &innodb_stats_method_typelib);
 
+static MYSQL_SYSVAR_ULONG(
+    rseg_init_threads, innobase_rseg_init_threads,
+    PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
+    "Init undo log segments in parallel mode using assigned threads", nullptr,
+    nullptr, 1, 1, 128, 0);
+
 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
 static MYSQL_SYSVAR_UINT(
     change_buffering_debug, ibuf_debug, PLUGIN_VAR_RQCMDARG,
@@ -22415,6 +22426,7 @@ static SYS_VAR *innobase_system_variables[] = {
     MYSQL_SYSVAR(compression_failure_threshold_pct),
     MYSQL_SYSVAR(compression_pad_pct_max),
     MYSQL_SYSVAR(default_row_format),
+    MYSQL_SYSVAR(rseg_init_threads),
     MYSQL_SYSVAR(redo_log_archive_dirs),
     MYSQL_SYSVAR(redo_log_encrypt),
     MYSQL_SYSVAR(print_ddl_logs),
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 6868ff5..1bb2b82 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -354,6 +354,9 @@ extern FILE *srv_misc_tmpfile;
 /* Server parameters which are read from the initfile */
 
 extern char *srv_data_home;
+
+/** Number of threads used to read the rollback segment headers at startup. */
+extern ulint srv_rseg_init_threads;
 
 /** Number of pages per doublewrite thread/segment */
 extern ulong srv_dblwr_pages;
@@ -786,6 +789,7 @@ extern mysql_pfs_key_t srv_worker_thread_key;
 extern mysql_pfs_key_t trx_recovery_rollback_thread_key;
 extern mysql_pfs_key_t srv_ts_alter_encrypt_thread_key;
 extern mysql_pfs_key_t parallel_read_thread_key;
+extern mysql_pfs_key_t parallel_rseg_init_thread_key;
 #endif /* UNIV_PFS_THREAD */
 #endif /* !UNIV_HOTBACKUP */
 
diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h
index cf963f6..19abca1 100644
--- a/storage/innobase/include/trx0purge.h
+++ b/storage/innobase/include/trx0purge.h
@@ -1051,6 +1051,22 @@ struct trx_purge_t {
 
   /** Set of all THDs allocated by the purge system. */
   ut::unordered_set<THD *> thds;
+
+  /** Rollback segments whose header pages are still to be read by the
+  parallel rseg init threads at startup. Protected by pq_mutex while
+  those threads run. */
+  std::vector<trx_rseg_t *> rsegs_queue;
 };
 
+/** Allocate purge_sys and initialize the members that do not depend on
+trx_sys. Must be called before trx_sys_init_at_db_start(), which may use
+purge_sys->pq_mutex and purge_sys->rsegs_queue. */
+void trx_purge_sys_mem_create();
+
+/** Finish the initialization of purge_sys once trx_sys is initialized.
+@param[in] n_purge_threads number of purge threads, must be > 0
+@param[in] purge_queue queue of rsegs to purge; purge_sys takes
+ownership of it */
+void trx_purge_sys_initialize(ulint n_purge_threads, purge_pq_t *purge_queue);
+
 /** Choose the rollback segment with the smallest trx_no. */
diff --git a/storage/innobase/include/trx0rseg.h b/storage/innobase/include/trx0rseg.h
index 60164d6..4a5f204 100644
--- a/storage/innobase/include/trx0rseg.h
+++ b/storage/innobase/include/trx0rseg.h
@@ -130,6 +130,11 @@ They require an upgrade of undo tablespaces and that cannot happen with
 active undo logs.
 @param[in] purge_queue queue of rsegs to purge */
 void trx_rsegs_init(purge_pq_t *purge_queue);
+
+/** Create the same in-memory rollback segment objects as trx_rsegs_init(),
+but read the rollback segment header pages with up to srv_rseg_init_threads
+threads in parallel.
+@param[in] purge_queue queue of rsegs to purge */
+void trx_rsegs_parallel_init(purge_pq_t *purge_queue);
 
 /** Create and initialize a rollback segment object.
 Some of the values for the fields are read from the segment header page.
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 54ed05d..940346e 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -133,6 +133,10 @@ char *srv_doublewrite_dir = NULL;
 deliminated by ';', i.e the FIL_PATH_SEPARATOR. */
 char *srv_innodb_directories = nullptr;
 
+/** Number of threads used to read the rollback segment headers at startup;
+values greater than 1 select the parallel initialization. */
+ulint srv_rseg_init_threads = 1;
+
 /** Undo tablespace directories. This can be multiple paths
 separated by ';' and can also be absolute paths. */
 char *srv_undo_dir = nullptr;
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index b5c08f6..ad66e08 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -199,6 +199,7 @@ mysql_pfs_key_t srv_purge_thread_key;
 mysql_pfs_key_t srv_worker_thread_key;
 mysql_pfs_key_t trx_recovery_rollback_thread_key;
 mysql_pfs_key_t srv_ts_alter_encrypt_thread_key;
+mysql_pfs_key_t parallel_rseg_init_thread_key;
 #endif /* UNIV_PFS_THREAD */
 
 #ifdef HAVE_PSI_STAGE_INTERFACE
@@ -2387,12 +2388,17 @@ files_checked:
     after the double write buffers haves been created. */
     trx_sys_create_sys_pages();
 
+    /* purge_sys must be allocated before trx_sys_init_at_db_start(),
+    which may initialize the rollback segments in parallel using
+    purge_sys->pq_mutex and purge_sys->rsegs_queue. */
+    trx_purge_sys_mem_create();
+
     purge_queue = trx_sys_init_at_db_start();
 
     /* The purge system needs to create the purge view and
     therefore requires that the trx_sys is inited. */
 
-    trx_purge_sys_create(srv_threads.m_purge_workers_n, purge_queue);
+    trx_purge_sys_initialize(srv_threads.m_purge_workers_n, purge_queue);
 
     err = dict_create();
 
@@ -2676,6 +2682,11 @@ files_checked:
       return (srv_init_abort(err));
     }
 
+    /* purge_sys must be allocated before trx_sys_init_at_db_start(),
+    which may initialize the rollback segments in parallel using
+    purge_sys->pq_mutex and purge_sys->rsegs_queue. */
+    trx_purge_sys_mem_create();
+
     purge_queue = trx_sys_init_at_db_start();
 
     if (srv_is_upgrade_mode) {
@@ -2698,7 +2709,7 @@ files_checked:
     /* The purge system needs to create the purge view and
     therefore requires that the trx_sys and trx lists were
     initialized in trx_sys_init_at_db_start(). */
-    trx_purge_sys_create(srv_threads.m_purge_workers_n, purge_queue);
+    trx_purge_sys_initialize(srv_threads.m_purge_workers_n, purge_queue);
   }
 
   /* Open temp-tablespace and keep it open until shutdown. */
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
index 27a0fa0..24b4476 100644
--- a/storage/innobase/trx/trx0purge.cc
+++ b/storage/innobase/trx/trx0purge.cc
@@ -203,6 +203,71 @@ static que_t *trx_purge_graph_build(trx_t *trx, ulint n_purge_threads) {
   return (fork);
 }
 
+/** Allocate purge_sys and initialize the members that do not depend on
+trx_sys: state, event, iterators, THD set, latches and heap. Must be called
+before trx_sys_init_at_db_start(), which may use purge_sys->pq_mutex and
+purge_sys->rsegs_queue to initialize the rollback segments in parallel. */
+void trx_purge_sys_mem_create() {
+  purge_sys = static_cast<trx_purge_t *>(ut_zalloc_nokey(sizeof(*purge_sys)));
+
+  purge_sys->state = PURGE_STATE_INIT;
+  purge_sys->event = os_event_create();
+
+  /* purge_sys comes from ut_zalloc_nokey(), so every non-trivial member
+  has to be constructed in place. */
+  new (&purge_sys->iter) purge_iter_t;
+  new (&purge_sys->limit) purge_iter_t;
+  new (&purge_sys->undo_trunc) undo::Truncate;
+  new (&purge_sys->thds) ut::unordered_set<THD *>;
+  new (&purge_sys->rsegs_queue) std::vector<trx_rseg_t *>;
+#ifdef UNIV_DEBUG
+  new (&purge_sys->done) purge_iter_t;
+#endif /* UNIV_DEBUG */
+
+  rw_lock_create(trx_purge_latch_key, &purge_sys->latch, SYNC_PURGE_LATCH);
+
+  mutex_create(LATCH_ID_PURGE_SYS_PQ, &purge_sys->pq_mutex);
+
+  purge_sys->heap = mem_heap_create(8 * 1024);
+}
+
+/** Finish the initialization of purge_sys once trx_sys is initialized:
+open the purge session and transaction, build the purge query graph and
+clone the oldest read view.
+@param[in] n_purge_threads number of purge threads, must be > 0
+@param[in] purge_queue queue of rsegs to purge; purge_sys takes ownership */
+void trx_purge_sys_initialize(ulint n_purge_threads, purge_pq_t *purge_queue) {
+  /* Take ownership of purge_queue, we are responsible for freeing it. */
+  purge_sys->purge_queue = purge_queue;
+
+  ut_a(n_purge_threads > 0);
+
+  purge_sys->sess = sess_open();
+
+  purge_sys->trx = purge_sys->sess->trx;
+
+  ut_a(purge_sys->trx->sess == purge_sys->sess);
+
+  /* A purge transaction is not a real transaction, we use a transaction
+  here only because the query threads code requires it. It is otherwise
+  quite unnecessary. We should get rid of it eventually. */
+  purge_sys->trx->id = 0;
+  purge_sys->trx->start_time = ut_time();
+  purge_sys->trx->state = TRX_STATE_ACTIVE;
+  purge_sys->trx->op_info = "purge trx";
+  purge_sys->trx->purge_sys_trx = true;
+
+  purge_sys->query = trx_purge_graph_build(purge_sys->trx, n_purge_threads);
+
+  new (&purge_sys->view) ReadView();
+
+  trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
+
+  purge_sys->view_active = true;
+
+  purge_sys->rseg_iter = UT_NEW_NOKEY(TrxUndoRsegsIterator(purge_sys));
+}
+
 void trx_purge_sys_create(ulint n_purge_threads, purge_pq_t *purge_queue) {
   purge_sys = static_cast<trx_purge_t *>(ut_zalloc_nokey(sizeof(*purge_sys)));
 
diff --git a/storage/innobase/trx/trx0rseg.cc b/storage/innobase/trx/trx0rseg.cc
index bcdbc42..a952084 100644
--- a/storage/innobase/trx/trx0rseg.cc
+++ b/storage/innobase/trx/trx0rseg.cc
@@ -44,6 +44,9 @@ this program; if not, write to the Free Software Foundation, Inc.,
 #include "trx0purge.h"
 #include "trx0undo.h"
 
+/** Number of parallel rseg init threads that have not finished yet. */
+static ulint active_rseg_init_threads = 0;
+
 /** Creates a rollback segment header.
 This function is called only when a new rollback segment is created in
 the database.
@@ -217,10 +219,17 @@ static void trx_rseg_persist_gtid(trx_rseg_t *rseg, trx_id_t gtid_trx_no) {
   }
 }
 
-trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id,
-                                page_no_t page_no, const page_size_t &page_size,
-                                trx_id_t gtid_trx_no, purge_pq_t *purge_queue,
-                                mtr_t *mtr) {
+/** Allocate a rollback segment object and initialize the fields that do
+not need the rollback segment header page. The remaining fields are set
+by trx_rseg_physical_initialize().
+@param[in] id rollback segment id
+@param[in] space_id tablespace id of the rollback segment
+@param[in] page_no page number of the rollback segment header
+@param[in] page_size page size
+@return rollback segment object */
+static trx_rseg_t *trx_rseg_mem_initialize(ulint id, space_id_t space_id,
+                                           page_no_t page_no,
+                                           const page_size_t &page_size) {
   auto rseg = static_cast<trx_rseg_t *>(ut_zalloc_nokey(sizeof(trx_rseg_t)));
 
   rseg->id = id;
@@ -242,7 +251,28 @@ trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id,
   UT_LIST_INIT(rseg->insert_undo_list, &trx_undo_t::undo_list);
   UT_LIST_INIT(rseg->insert_undo_cached, &trx_undo_t::undo_list);
 
-  auto rseg_header = trx_rsegf_get_new(space_id, page_no, page_size, mtr);
+  return (rseg);
+}
+
+/** Read the rollback segment header page and initialize the remaining
+fields of an rseg created by trx_rseg_mem_initialize(). If the history
+list is not empty the rseg is pushed to purge_queue; the push is done
+under purge_sys->pq_mutex because trx_rseg_init_thread() calls this
+function from several threads at once.
+NOTE(review): confirm that trx_undo_lists_init() and
+trx_rseg_persist_gtid() are safe to call concurrently.
+@param[in,out] rseg rollback segment object
+@param[in,out] purge_queue queue of rsegs to purge
+@param[in] gtid_trx_no GTID transaction number, passed to
+trx_rseg_persist_gtid()
+@param[in,out] mtr mini-transaction
+@return rseg */
+static trx_rseg_t *trx_rseg_physical_initialize(trx_rseg_t *rseg,
+                                                purge_pq_t *purge_queue,
+                                                trx_id_t gtid_trx_no,
+                                                mtr_t *mtr) {
+  auto rseg_header = trx_rsegf_get_new(rseg->space_id, rseg->page_no,
+                                       rseg->page_size, mtr);
 
   rseg->max_size =
       mtr_read_ulint(rseg_header + TRX_RSEG_MAX_SIZE, MLOG_4BYTES, mtr);
@@ -258,7 +288,9 @@ trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id,
   auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY);
 
   if (len > 0) {
-    trx_sys->rseg_history_len += len;
+    /* NOTE(review): this assumes rseg_history_len is a plain ulint; if it is
+    a std::atomic in this tree, "+= len" is already atomic and should stay. */
+    os_atomic_increment_ulint(&trx_sys->rseg_history_len, len);
 
     /* Extract GTID from history and send to GTID persister. */
     trx_rseg_persist_gtid(rseg, gtid_trx_no);
@@ -279,26 +311,32 @@ trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id,
 #ifdef UNIV_DEBUG
     /* Update last transaction number during recovery. */
-    if (rseg->last_trx_no > trx_sys->rw_max_trx_no) {
-      trx_sys->rw_max_trx_no = rseg->last_trx_no;
-    }
+    /* Compare and update under pq_mutex: several rseg init threads can
+    run this concurrently. */
+    mutex_enter(&purge_sys->pq_mutex);
+    if (rseg->last_trx_no > trx_sys->rw_max_trx_no) {
+      trx_sys->rw_max_trx_no = rseg->last_trx_no;
+    }
+    mutex_exit(&purge_sys->pq_mutex);
 #endif /* UNIV_DEBUG */
 
     rseg->last_del_marks =
         mtr_read_ulint(undo_log_hdr + TRX_UNDO_DEL_MARKS, MLOG_2BYTES, mtr);
 
     TrxUndoRsegs elem(rseg->last_trx_no);
     elem.push_back(rseg);
 
     if (rseg->last_page_no != FIL_NULL) {
       /* The only time an rseg is added that has existing
-      undo is when the server is being started. So no
-      mutex is needed here. */
+      undo is when the server is being started. The rseg init
+      threads run concurrently, so the push needs pq_mutex. */
       ut_ad(srv_is_being_started);
 
-      ut_ad(space_id == TRX_SYS_SPACE ||
-            (srv_is_upgrade_mode != undo::is_reserved(space_id)));
+      ut_ad(rseg->space_id == TRX_SYS_SPACE ||
+            (srv_is_upgrade_mode != undo::is_reserved(rseg->space_id)));
 
+      mutex_enter(&purge_sys->pq_mutex);
       purge_queue->push(elem);
+      mutex_exit(&purge_sys->pq_mutex);
     }
   } else {
     rseg->last_page_no = FIL_NULL;
@@ -325,6 +363,281 @@ page_no_t trx_rseg_get_page_no(space_id_t space_id, ulint rseg_id) {
   return (page_no);
 }
 
+/** Thread body of the parallel rollback segment initialization. Takes
+rsegs from purge_sys->rsegs_queue until the queue is empty and reads the
+header page of each one with trx_rseg_physical_initialize().
+@param[in,out] purge_queue queue of rsegs to purge */
+static void trx_rseg_init_thread(purge_pq_t *purge_queue) {
+  auto &gtid_persistor = clone_sys->get_gtid_persistor();
+
+  for (;;) {
+    mutex_enter(&purge_sys->pq_mutex);
+
+    if (purge_sys->rsegs_queue.empty()) {
+      mutex_exit(&purge_sys->pq_mutex);
+      break;
+    }
+
+    trx_rseg_t *rseg = purge_sys->rsegs_queue.back();
+    purge_sys->rsegs_queue.pop_back();
+
+    mutex_exit(&purge_sys->pq_mutex);
+
+    mtr_t mtr;
+
+    mtr.start();
+    trx_rseg_physical_initialize(rseg, purge_queue,
+                                 gtid_persistor.get_oldest_trx_no(), &mtr);
+    mtr.commit();
+
+    /* An initialized rseg always counts at least its header page. */
+    ut_a(rseg->get_curr_size() > 0);
+  }
+
+  /* Tell trx_rsegs_init_end() that this thread is done (assumes the
+  os_atomic_* operations are full memory barriers). */
+  os_atomic_decrement_ulint(&active_rseg_init_threads, 1);
+}
+
+trx_rseg_t *trx_rseg_mem_create(ulint id, space_id_t space_id,
+                                page_no_t page_no, const page_size_t &page_size,
+                                trx_id_t gtid_trx_no, purge_pq_t *purge_queue,
+                                mtr_t *mtr) {
+  auto rseg = static_cast<trx_rseg_t *>(ut_zalloc_nokey(sizeof(trx_rseg_t)));
+
+  rseg->id = id;
+  rseg->space_id = space_id;
+  rseg->page_size.copy_from(page_size);
+  rseg->page_no = page_no;
+  rseg->trx_ref_count = 0;
+
+  if (fsp_is_system_temporary(space_id)) {
+    mutex_create(LATCH_ID_TEMP_SPACE_RSEG, &rseg->mutex);
+  } else if (fsp_is_undo_tablespace(space_id)) {
+    mutex_create(LATCH_ID_UNDO_SPACE_RSEG, &rseg->mutex);
+  } else {
+    mutex_create(LATCH_ID_TRX_SYS_RSEG, &rseg->mutex);
+  }
+
+  UT_LIST_INIT(rseg->update_undo_list, &trx_undo_t::undo_list);
+  UT_LIST_INIT(rseg->update_undo_cached, &trx_undo_t::undo_list);
+  UT_LIST_INIT(rseg->insert_undo_list, &trx_undo_t::undo_list);
+  UT_LIST_INIT(rseg->insert_undo_cached, &trx_undo_t::undo_list);
+
+  auto rseg_header = trx_rsegf_get_new(space_id, page_no, page_size, mtr);
+
+  rseg->max_size =
+      mtr_read_ulint(rseg_header + TRX_RSEG_MAX_SIZE, MLOG_4BYTES, mtr);
+
+  /* Initialize the undo log lists according to the rseg header */
+
+  auto sum_of_undo_sizes = trx_undo_lists_init(rseg);
+
+  rseg->set_curr_size(
+      mtr_read_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr) +
+      1 + sum_of_undo_sizes);
+
+  auto len = flst_get_len(rseg_header + TRX_RSEG_HISTORY);
+
+  if (len > 0) {
+    trx_sys->rseg_history_len += len;
+
+    /* Extract GTID from history and send to GTID persister. */
+    trx_rseg_persist_gtid(rseg, gtid_trx_no);
+
+    auto node_addr = trx_purge_get_log_from_hist(
+        flst_get_last(rseg_header + TRX_RSEG_HISTORY, mtr));
+
+    rseg->last_page_no = node_addr.page;
+    rseg->last_offset = node_addr.boffset;
+
+    auto undo_log_hdr =
+        trx_undo_page_get(page_id_t(rseg->space_id, node_addr.page),
+                          rseg->page_size, mtr) +
+        node_addr.boffset;
+
+    rseg->last_trx_no = mach_read_from_8(undo_log_hdr + TRX_UNDO_TRX_NO);
+
+#ifdef UNIV_DEBUG
+    /* Update last transaction number during recovery. */
+    if (rseg->last_trx_no > trx_sys->rw_max_trx_no) {
+      trx_sys->rw_max_trx_no = rseg->last_trx_no;
+    }
+#endif /* UNIV_DEBUG */
+
+    rseg->last_del_marks =
+        mtr_read_ulint(undo_log_hdr + TRX_UNDO_DEL_MARKS, MLOG_2BYTES, mtr);
+
+    TrxUndoRsegs elem(rseg->last_trx_no);
+    elem.push_back(rseg);
+
+    if (rseg->last_page_no != FIL_NULL) {
+      /* The only time an rseg is added that has existing
+      undo is when the server is being started. So no
+      mutex is needed here. */
+      ut_ad(srv_is_being_started);
+
+      ut_ad(space_id == TRX_SYS_SPACE ||
+            (srv_is_upgrade_mode != undo::is_reserved(space_id)));
+
+      purge_queue->push(elem);
+    }
+  } else {
+    rseg->last_page_no = FIL_NULL;
+  }
+
+  return (rseg);
+}
+
+/** First phase of the parallel rollback segment initialization. Read each
+rollback segment slot in the TRX_SYS page and the RSEG_ARRAY page of each
+undo tablespace and create the trx_rseg_t objects without reading their
+header pages. Every object is queued in purge_sys->rsegs_queue so that
+trx_rseg_init_thread() can finish it. This runs before any init thread is
+started. */
+static void trx_rsegs_init_start() {
+  trx_sys->rseg_history_len = 0;
+
+  mtr_t mtr;
+
+  /* Get GTID transaction number from SYS */
+  mtr.start();
+  trx_sysf_t *gtid_sys_header = trx_sysf_get(&mtr);
+  auto page = gtid_sys_header - TRX_SYS;
+  auto gtid_trx_no = mach_read_from_8(page + TRX_SYS_TRX_NUM_GTID);
+
+  mtr.commit();
+
+  auto &gtid_persistor = clone_sys->get_gtid_persistor();
+  gtid_persistor.set_oldest_trx_no_recovery(gtid_trx_no);
+
+  for (ulint slot = 0; slot < TRX_SYS_N_RSEGS; slot++) {
+    mtr.start();
+    trx_sysf_t *sys_header = trx_sysf_get(&mtr);
+
+    page_no_t page_no = trx_sysf_rseg_get_page_no(sys_header, slot, &mtr);
+
+    if (page_no != FIL_NULL) {
+      space_id_t space_id = trx_sysf_rseg_get_space(sys_header, slot, &mtr);
+
+      if (!undo::is_active_truncate_log_present(undo::id2num(space_id))) {
+        /* Create the trx_rseg_t object; its header page is read later by
+        trx_rseg_init_thread(). Note that all tablespaces with rollback
+        segments use univ_page_size. (system, temp & undo) */
+        trx_rseg_t *rseg = trx_rseg_mem_initialize(slot, space_id, page_no,
+                                                   univ_page_size);
+
+        ut_a(rseg->id == slot);
+
+        purge_sys->rsegs_queue.push_back(rseg);
+
+        trx_sys->rsegs.push_back(rseg);
+      }
+    }
+    mtr.commit();
+  }
+
+  undo::spaces->s_lock();
+  for (auto undo_space : undo::spaces->m_spaces) {
+    undo_space->rsegs()->x_lock();
+
+    for (ulint slot = 0; slot < FSP_MAX_ROLLBACK_SEGMENTS; slot++) {
+      page_no_t page_no = trx_rseg_get_page_no(undo_space->id(), slot);
+
+      /* There are no gaps in an RSEG_ARRAY page. New rsegs
+      are added sequentially and never deleted until the
+      undo tablespace is truncated.*/
+      if (page_no == FIL_NULL) {
+        break;
+      }
+
+      /* Create the trx_rseg_t object; its header page is read later by
+      trx_rseg_init_thread(). */
+      trx_rseg_t *rseg = trx_rseg_mem_initialize(slot, undo_space->id(),
+                                                 page_no, univ_page_size);
+
+      ut_a(rseg->id == slot);
+
+      undo_space->rsegs()->push_back(rseg);
+      purge_sys->rsegs_queue.push_back(rseg);
+    }
+    undo_space->rsegs()->x_unlock();
+  }
+  undo::spaces->s_unlock();
+}
+
+/** Second phase of the parallel rollback segment initialization. Wait
+until every trx_rseg_init_thread() has finished, then mark the explicit
+undo tablespaces that have no undo logs as empty. */
+static void trx_rsegs_init_end() {
+  while (os_atomic_increment_ulint(&active_rseg_init_threads, 0) != 0) {
+    os_thread_sleep(100);
+  }
+
+  /* The purge queue is filled only by the init threads, so whether an undo
+  tablespace has undo logs can be decided only now. An rseg was pushed to
+  the purge queue by trx_rseg_physical_initialize() exactly when its
+  last_page_no is not FIL_NULL. If there are no undo logs in an explicit
+  undo tablespace at startup, mark it empty so that it will not be used
+  until the state recorded in the DD can be applied in
+  apply_dd_undo_state(). */
+  undo::spaces->s_lock();
+  for (auto undo_space : undo::spaces->m_spaces) {
+    if (!undo_space->is_explicit() || undo_space->is_empty()) {
+      continue;
+    }
+
+    bool has_undo_logs = false;
+
+    undo_space->rsegs()->x_lock();
+    for (auto rseg : *undo_space->rsegs()) {
+      if (rseg->last_page_no != FIL_NULL) {
+        has_undo_logs = true;
+        break;
+      }
+    }
+    undo_space->rsegs()->x_unlock();
+
+    if (!has_undo_logs) {
+      undo_space->set_empty();
+    }
+  }
+  undo::spaces->s_unlock();
+
+  /* The init threads drained the queue; release its memory as well. */
+  ut_a(purge_sys->rsegs_queue.empty());
+  purge_sys->rsegs_queue.shrink_to_fit();
+}
+
+/** Create the in-memory rollback segment objects at startup, reading the
+rollback segment header pages with up to srv_rseg_init_threads threads.
+@param[in,out] purge_queue queue of rsegs to purge */
+void trx_rsegs_parallel_init(purge_pq_t *purge_queue) {
+  purge_sys->rsegs_queue.clear();
+
+  trx_rsegs_init_start();
+
+  /* Never start more threads than there are rsegs to initialize. */
+  ulint n_threads = srv_rseg_init_threads;
+  if (n_threads > purge_sys->rsegs_queue.size()) {
+    n_threads = purge_sys->rsegs_queue.size();
+  }
+
+  /* Set before any thread is started so that a thread finishing early
+  cannot bring the counter to zero while others are still starting. */
+  active_rseg_init_threads = n_threads;
+
+  for (ulint i = 0; i < n_threads; i++) {
+    auto thread = os_thread_create(parallel_rseg_init_thread_key,
+                                   trx_rseg_init_thread, purge_queue);
+    thread.start();
+  }
+
+  /* Run even if no thread was started: the undo tablespace emptiness
+  check in trx_rsegs_init_end() must always be done. */
+  trx_rsegs_init_end();
+}
+
 /** Read each rollback segment slot in the TRX_SYS page and the RSEG_ARRAY
 page of each undo tablespace. Create trx_rseg_t objects for all rollback
 segments found. This runs at database startup and initializes the in-memory
diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc
index 19dc9c5..dcc8da3 100644
--- a/storage/innobase/trx/trx0sys.cc
+++ b/storage/innobase/trx/trx0sys.cc
@@ -426,7 +426,11 @@ purge_pq_t *trx_sys_init_at_db_start(void) {
     /* Create the memory objects for all the rollback segments
     referred to in the TRX_SYS page or any undo tablespace
     RSEG_ARRAY page. */
-    trx_rsegs_init(purge_queue);
+    if (srv_rseg_init_threads > 1) {
+      trx_rsegs_parallel_init(purge_queue);
+    } else {
+      trx_rsegs_init(purge_queue);
+    }
   }
 
   /* VERY important: after the database is started, max_trx_id value is