diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc index 5d6d5ae..8cc01a0 100644 --- a/storage/innobase/fts/fts0fts.cc +++ b/storage/innobase/fts/fts0fts.cc @@ -260,16 +260,18 @@ static const char* fts_config_table_insert_values_sql = "INSERT INTO \"%s\" VALUES ('" FTS_TABLE_STATE "', '0');\n"; -/****************************************************************//** -Run SYNC on the table, i.e., write out data from the cache to the +/** Run SYNC on the table, i.e., write out data from the cache to the FTS auxiliary INDEX table and clear the cache at the end. -@return DB_SUCCESS if all OK */ +@param[in,out] sync sync state +@param[in] unlock_cache whether unlock cache lock when write node +@pram[in] wait whether wait when a sync is in progress +@return DB_SUCCESS if all OK */ static dberr_t fts_sync( -/*=====*/ - fts_sync_t* sync) /*!< in: sync state */ - __attribute__((nonnull)); + fts_sync_t* sync, + bool unlock_cache, + bool wait); /****************************************************************//** Release all resources help by the words rb tree e.g., the node ilist. */ @@ -653,6 +655,7 @@ fts_cache_create( mem_heap_zalloc(heap, sizeof(fts_sync_t))); cache->sync->table = table; + cache->sync->event = os_event_create(); /* Create the index cache vector that will hold the inverted indexes. 
*/ cache->indexes = ib_vector_create( @@ -1207,6 +1210,7 @@ fts_cache_destroy( mutex_free(&cache->optimize_lock); mutex_free(&cache->deleted_lock); mutex_free(&cache->doc_id_lock); + os_event_free(cache->sync->event); if (cache->stopword_info.cached_stopword) { rbt_free(cache->stopword_info.cached_stopword); @@ -1435,7 +1439,7 @@ fts_cache_add_doc( ib_vector_last(word->nodes)); } - if (fts_node == NULL + if (fts_node == NULL || fts_node->synced || fts_node->ilist_size > FTS_ILIST_MAX_SIZE || doc_id < fts_node->last_doc_id) { @@ -3553,16 +3557,28 @@ fts_add_doc_by_id( get_doc->index_cache, doc_id, doc.tokens); + bool need_sync = false; + if ((cache->total_size > fts_max_cache_size / 5 + || fts_need_sync) + && !cache->sync->in_progress) { + need_sync = true; + } + rw_lock_x_unlock(&table->fts->cache->lock); DBUG_EXECUTE_IF( "fts_instrument_sync", - fts_sync(cache->sync); + fts_optimize_request_sync_table(table); + os_event_wait(cache->sync->event); + ); + + DBUG_EXECUTE_IF( + "fts_instrument_sync_debug", + fts_sync(cache->sync, true, true); ); - if (cache->total_size > fts_max_cache_size - || fts_need_sync) { - fts_sync(cache->sync); + if (need_sync) { + fts_optimize_request_sync_table(table); } mtr_start(&mtr); @@ -3933,16 +3949,17 @@ fts_sync_add_deleted_cache( return(error); } -/*********************************************************************//** -Write the words and ilist to disk. +/** Write the words and ilist to disk. 
+@param[in,out] trx transaction +@param[in] index_cache index cache +@param[in] unlock_cache whether unlock cache when write node @return DB_SUCCESS if all went well else error code */ static __attribute__((nonnull, warn_unused_result)) dberr_t fts_sync_write_words( -/*=================*/ - trx_t* trx, /*!< in: transaction */ - fts_index_cache_t* - index_cache) /*!< in: index cache */ + trx_t* trx, + fts_index_cache_t* index_cache, + bool unlock_cache) { fts_table_t fts_table; ulint n_nodes = 0; @@ -3950,8 +3967,8 @@ fts_sync_write_words( const ib_rbt_node_t* rbt_node; dberr_t error = DB_SUCCESS; ibool print_error = FALSE; -#ifdef FTS_DOC_STATS_DEBUG dict_table_t* table = index_cache->index->table; +#ifdef FTS_DOC_STATS_DEBUG ulint n_new_words = 0; #endif /* FTS_DOC_STATS_DEBUG */ @@ -3964,7 +3981,7 @@ fts_sync_write_words( since we want to free the memory used during caching. */ for (rbt_node = rbt_first(index_cache->words); rbt_node; - rbt_node = rbt_first(index_cache->words)) { + rbt_node = rbt_next(index_cache->words, rbt_node)) { ulint i; ulint selected; @@ -3997,27 +4014,43 @@ fts_sync_write_words( } #endif /* FTS_DOC_STATS_DEBUG */ - n_nodes += ib_vector_size(word->nodes); - - /* We iterate over all the nodes even if there was an error, - this is to free the memory of the fts_node_t elements. */ + /* We iterate over all the nodes even if there was an error */ for (i = 0; i < ib_vector_size(word->nodes); ++i) { fts_node_t* fts_node = static_cast( ib_vector_get(word->nodes, i)); + if (fts_node->synced) { + continue; + } else { + fts_node->synced = true; + } + + /*FIXME: we need to handle the error properly. 
*/ if (error == DB_SUCCESS) { + if (unlock_cache) { + rw_lock_x_unlock( + &table->fts->cache->lock); + } error = fts_write_node( trx, &index_cache->ins_graph[selected], &fts_table, &word->text, fts_node); - } - ut_free(fts_node->ilist); - fts_node->ilist = NULL; + DEBUG_SYNC_C("fts_write_node"); + DBUG_EXECUTE_IF("fts_write_node_crash", + DBUG_SUICIDE();); + + if (unlock_cache) { + rw_lock_x_lock( + &table->fts->cache->lock); + } + } } + n_nodes += ib_vector_size(word->nodes); + if (error != DB_SUCCESS && !print_error) { ut_print_timestamp(stderr); fprintf(stderr, " InnoDB: Error (%s) writing " @@ -4026,9 +4059,6 @@ fts_sync_write_words( print_error = TRUE; } - - /* NOTE: We are responsible for free'ing the node */ - ut_free(rbt_remove_node(index_cache->words, rbt_node)); } #ifdef FTS_DOC_STATS_DEBUG @@ -4329,7 +4359,7 @@ fts_sync_index( ut_ad(rbt_validate(index_cache->words)); - error = fts_sync_write_words(trx, index_cache); + error = fts_sync_write_words(sync->trx, index_cache, sync->unlock_cache); #ifdef FTS_DOC_STATS_DEBUG /* FTS_RESOLVE: the word counter info in auxiliary table "DOC_ID" @@ -4345,6 +4375,36 @@ fts_sync_index( return(error); } +/** Check if index cache has been synced completely +@param[in,out] sync sync state +@param[in,out] index_cache index cache +@return true if index is synced, otherwise false. */ +static +bool +fts_sync_index_check( + fts_sync_t* sync, + fts_index_cache_t* index_cache) +{ + const ib_rbt_node_t* rbt_node; + + for (rbt_node = rbt_first(index_cache->words); + rbt_node != NULL; + rbt_node = rbt_next(index_cache->words, rbt_node)) { + + fts_tokenizer_word_t* word; + word = rbt_value(fts_tokenizer_word_t, rbt_node); + + fts_node_t* fts_node; + fts_node = static_cast(ib_vector_last(word->nodes)); + + if (!fts_node->synced) { + return(false); + } + } + + return(true); +} + /*********************************************************************//** Commit the SYNC, change state of processed doc ids etc. 
@return DB_SUCCESS if all OK */ @@ -4421,21 +4481,53 @@ fts_sync_rollback( trx_t* trx = sync->trx; fts_cache_t* cache = sync->table->fts->cache; + for (ulint i = 0; i < ib_vector_size(cache->indexes); ++i) { + ulint j; + fts_index_cache_t* index_cache; + + index_cache = static_cast( + ib_vector_get(cache->indexes, i)); + + for (j = 0; fts_index_selector[j].value; ++j) { + + if (index_cache->ins_graph[j] != NULL) { + + fts_que_graph_free_check_lock( + NULL, index_cache, + index_cache->ins_graph[j]); + + index_cache->ins_graph[j] = NULL; + } + + if (index_cache->sel_graph[j] != NULL) { + + fts_que_graph_free_check_lock( + NULL, index_cache, + index_cache->sel_graph[j]); + + index_cache->sel_graph[j] = NULL; + } + } + } + rw_lock_x_unlock(&cache->lock); fts_sql_rollback(trx); trx_free_for_background(trx); } -/****************************************************************//** -Run SYNC on the table, i.e., write out data from the cache to the +/** Run SYNC on the table, i.e., write out data from the cache to the FTS auxiliary INDEX table and clear the cache at the end. +@param[in,out] sync sync state +@param[in] unlock_cache whether unlock cache lock when write node +@pram[in] wait whether wait when a sync is in progress @return DB_SUCCESS if all OK */ static dberr_t fts_sync( -/*=====*/ - fts_sync_t* sync) /*!< in: sync state */ + fts_sync_t* sync, + bool unlock_cache, + bool wait) { ulint i; dberr_t error = DB_SUCCESS; @@ -4443,8 +4535,34 @@ fts_sync( rw_lock_x_lock(&cache->lock); + /* Check if cache is being synced. + Note: we release cache lock in fts_sync_write_words() to + avoid long wait for the lock by other threads. 
*/ + while (sync->in_progress) { + rw_lock_x_unlock(&cache->lock); + + if (wait) { + os_event_wait(sync->event); + } else { + return(DB_SUCCESS); + } + + rw_lock_x_lock(&cache->lock); + } + + sync->unlock_cache = unlock_cache; + sync->in_progress = true; + fts_sync_begin(sync); +begin_sync: + if (cache->total_size > fts_max_cache_size) { + /* Avoid the case: sync never finish when + insert/update keeps comming. */ + ut_ad(sync->unlock_cache); + sync->unlock_cache = false; + } + for (i = 0; i < ib_vector_size(cache->indexes); ++i) { fts_index_cache_t* index_cache; @@ -4459,21 +4577,43 @@ fts_sync( if (error != DB_SUCCESS && !sync->interrupted) { - break; + goto end_sync; } } DBUG_EXECUTE_IF("fts_instrument_sync_interrupted", sync->interrupted = true; error = DB_INTERRUPTED; + goto end_sync; ); + /* Make sure all the caches are synced. */ + for (i = 0; i < ib_vector_size(cache->indexes); ++i) { + fts_index_cache_t* index_cache; + + index_cache = static_cast( + ib_vector_get(cache->indexes, i)); + + if (index_cache->index->to_be_dropped + || fts_sync_index_check(sync, index_cache)) { + continue; + } + + goto begin_sync; + } + +end_sync: if (error == DB_SUCCESS && !sync->interrupted) { error = fts_sync_commit(sync); } else { fts_sync_rollback(sync); } + rw_lock_x_lock(&cache->lock); + sync->in_progress = false; + os_event_set(sync->event); + rw_lock_x_unlock(&cache->lock); + /* We need to check whether an optimize is required, for that we make copies of the two variables that control the trigger. These variables can change behind our back and we don't want to hold the @@ -4488,21 +4628,25 @@ fts_sync( return(error); } -/****************************************************************//** -Run SYNC on the table, i.e., write out data from the cache to the -FTS auxiliary INDEX table and clear the cache at the end. */ +/** Run SYNC on the table, i.e., write out data from the cache to the +FTS auxiliary INDEX table and clear the cache at the end. 
+@param[in,out]	table		fts table
+@param[in]	unlock_cache	whether unlock cache when write node
+@param[in]	wait		whether wait for existing sync to finish
+@return DB_SUCCESS on success, error code on failure. */
 UNIV_INTERN
 dberr_t
 fts_sync_table(
-/*===========*/
-	dict_table_t*	table)		/*!< in: table */
+	dict_table_t*	table,
+	bool		unlock_cache,
+	bool		wait)
 {
 	dberr_t	err = DB_SUCCESS;
 
 	ut_ad(table->fts);
 
 	if (!dict_table_is_discarded(table) && table->fts->cache) {
-		err = fts_sync(table->fts->cache->sync);
+		err = fts_sync(table->fts->cache->sync, unlock_cache, wait);
 	}
 
 	return(err);
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 2a0aa4d..3f42610 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -87,6 +87,7 @@ enum fts_msg_type_t {
 
 	FTS_MSG_DEL_TABLE,		/*!< Remove a table from the optimize
 					threads work queue */
+	FTS_MSG_SYNC_TABLE		/*!< Sync fts cache of a table */
 };
 
 /** Compressed list of words that have been read from FTS INDEX
@@ -2652,6 +2653,43 @@ fts_optimize_remove_table(
 	os_event_free(event);
 }
 
+/** Queue a request for the FTS optimize thread to sync a table's cache.
+The request is dropped when the optimize work queue does not exist or
+fts_opt_start_shutdown is set.
+@param[in]	table	table to sync */
+UNIV_INTERN
+void
+fts_optimize_request_sync_table(
+	dict_table_t*	table)
+{
+	fts_msg_t*	msg;
+	table_id_t*	table_id;
+
+	/* if the optimize system not yet initialized, return */
+	if (!fts_optimize_wq) {
+		return;
+	}
+
+	/* FTS optimizer thread is already exited */
+	if (fts_opt_start_shutdown) {
+		ib_logf(IB_LOG_LEVEL_INFO,
+			"Try to sync table %s after FTS optimize"
+			" thread exiting.", table->name);
+		return;
+	}
+
+	msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
+
+	/* The optimize thread reads the table id through msg->ptr, so the
+	id has to be stored in the message before it is queued. */
+	table_id = static_cast<table_id_t*>(
+		mem_heap_alloc(msg->heap, sizeof(table_id_t)));
+	*table_id = table->id;
+	msg->ptr = table_id;
+
+	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
+}
+
 /**********************************************************************//**
 Find the slot for a particular table.
 @return slot if found else NULL. */
@@ -2932,6 +2970,28 @@ fts_optimize_need_sync(
 }
 #endif
 
+/** Sync fts cache of a table.
+Opens the table by id so that a table dropped in the meantime is skipped;
+the sync releases the cache lock while writing nodes and does not wait for
+a sync that is already in progress.
+@param[in]	table_id	table id */
+void
+fts_optimize_sync_table(
+	table_id_t	table_id)
+{
+	dict_table_t*	table = NULL;
+
+	table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
+
+	if (table) {
+		if (dict_table_has_fts_index(table) && table->fts->cache) {
+			fts_sync_table(table, true, false);
+		}
+
+		dict_table_close(table, FALSE, FALSE);
+	}
+}
+
 /**********************************************************************//**
 Optimize all FTS tables.
 @return Dummy return */
@@ -3053,6 +3113,11 @@ fts_optimize_thread(
 					((fts_msg_del_t*) msg->ptr)->event);
 				break;
 
+			case FTS_MSG_SYNC_TABLE:
+				fts_optimize_sync_table(
+					*static_cast<table_id_t*>(msg->ptr));
+				break;
+
 			default:
 				ut_error;
 			}
@@ -3079,26 +3144,7 @@ fts_optimize_thread(
 				ib_vector_get(tables, i));
 
 			if (slot->state != FTS_STATE_EMPTY) {
-				dict_table_t*	table = NULL;
-
-				/*slot->table may be freed, so we try to open
-				table by slot->table_id.*/
-				table = dict_table_open_on_id(
-					slot->table_id, FALSE,
-					DICT_TABLE_OP_NORMAL);
-
-				if (table) {
-
-					if (dict_table_has_fts_index(table)) {
-						fts_sync_table(table);
-					}
-
-					if (table->fts) {
-						fts_free(table);
-					}
-
-					dict_table_close(table, FALSE, FALSE);
-				}
+				fts_optimize_sync_table(slot->table_id);
 			}
 		}
 	}
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 53a60bb..6bd4924 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -11363,7 +11363,7 @@ ha_innobase::optimize(
 	if (innodb_optimize_fulltext_only) {
 		if (prebuilt->table->fts && prebuilt->table->fts->cache
 		    && !dict_table_is_discarded(prebuilt->table)) {
-			fts_sync_table(prebuilt->table);
+			fts_sync_table(prebuilt->table, false, true);
 			fts_optimize_table(prebuilt->table);
 		}
 		return(HA_ADMIN_OK);
diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h
index d54ed28..59f8eb0 100644
--- 
a/storage/innobase/include/fts0fts.h +++ b/storage/innobase/include/fts0fts.h @@ -724,6 +724,13 @@ fts_optimize_remove_table( /*======================*/ dict_table_t* table); /*!< in: table to remove */ +/** Send sync fts cache for the table. +@param[in] table table to sync */ +UNIV_INTERN +void +fts_optimize_request_sync_table( + dict_table_t* table); + /**********************************************************************//** Signal the optimize thread to prepare for shutdown. */ UNIV_INTERN @@ -826,15 +833,18 @@ fts_drop_index_split_tables( dict_index_t* index) /*!< in: fts instance */ __attribute__((nonnull, warn_unused_result)); -/****************************************************************//** -Run SYNC on the table, i.e., write out data from the cache to the -FTS auxiliary INDEX table and clear the cache at the end. */ +/** Run SYNC on the table, i.e., write out data from the cache to the +FTS auxiliary INDEX table and clear the cache at the end. +@param[in,out] table fts table +@param[in] unlock_cache whether unlock cache when write node +@param[in] wait whether wait for existing sync to finish +@return DB_SUCCESS on success, error code on failure. 
*/ UNIV_INTERN dberr_t fts_sync_table( -/*===========*/ - dict_table_t* table) /*!< in: table */ - __attribute__((nonnull)); + dict_table_t* table, + bool unlock_cache, + bool wait); /****************************************************************//** Free the query graph but check whether dict_sys->mutex is already diff --git a/storage/innobase/include/fts0types.h b/storage/innobase/include/fts0types.h index 6467742..a53b491 100644 --- a/storage/innobase/include/fts0types.h +++ b/storage/innobase/include/fts0types.h @@ -122,7 +122,11 @@ struct fts_sync_t { doc_id_t max_doc_id; /*!< The doc id at which the cache was noted as being full, we use this to set the upper_limit field */ - ib_time_t start_time; /*!< SYNC start time */ + ib_time_t start_time; /*!< SYNC start time */ + bool in_progress; /*!< flag whether sync is in progress.*/ + bool unlock_cache; /*!< flag whether unlock cache when + write fts node */ + os_event_t event; /*!< sync finish event */ }; /** The cache for the FTS system. It is a memory-based inverted index @@ -165,7 +169,6 @@ struct fts_cache_t { objects, they are recreated after a SYNC is completed */ - ib_alloc_t* self_heap; /*!< This heap is the heap out of which an instance of the cache itself was created. Objects created using @@ -212,6 +215,7 @@ struct fts_node_t { ulint ilist_size_alloc; /*!< Allocated size of ilist in bytes */ + bool synced; /*!< flag whether the node is synced */ }; /** A tokenizer word. Contains information about one word. */ diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index e9be0b1..ccfa15e 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -1986,7 +1986,8 @@ wait_again: if (max_doc_id && err == DB_SUCCESS) { /* Sync fts cache for other fts indexes to keep all fts indexes consistent in sync_doc_id. 
*/ - err = fts_sync_table(const_cast(new_table)); + err = fts_sync_table(const_cast(new_table), + false, true); if (err == DB_SUCCESS) { fts_update_next_doc_id(