diff --git a/storage/innobase/ddl/ddl0builder.cc b/storage/innobase/ddl/ddl0builder.cc index 5e61568d89f..27f0f88ab9b 100644 --- a/storage/innobase/ddl/ddl0builder.cc +++ b/storage/innobase/ddl/ddl0builder.cc @@ -389,6 +389,22 @@ void Merge_cursor::clear_eof() noexcept { } } +void Merge_cursor::reset_for_next_range() noexcept { + while (!m_pq.empty()) { + m_pq.pop(); + } + m_err = DB_END_OF_INDEX; + for (auto cursor : m_cursors) { + cursor->m_err = DB_END_OF_INDEX; + } +} + +void Merge_cursor::stage_inc() noexcept { + if (m_stage != nullptr) { + m_stage->inc(1); + } +} + dberr_t Merge_cursor::open() noexcept { ut_a(m_pq.empty()); ut_a(!m_cursors.empty()); diff --git a/storage/innobase/ddl/ddl0merge.cc b/storage/innobase/ddl/ddl0merge.cc index 26cdf2fd264..185d5e624e6 100644 --- a/storage/innobase/ddl/ddl0merge.cc +++ b/storage/innobase/ddl/ddl0merge.cc @@ -50,6 +50,7 @@ std::ostream &operator<<(std::ostream &out, /** Cursor for merging blocks from the same file. */ struct Merge_file_sort::Cursor : private ut::Non_copyable { + friend class Merge_file_sort; /** Constructor. @param[in,out] builder Index builder instance. @param[in] file File to iterate over. @@ -403,27 +404,68 @@ Merge_file_sort::Range Merge_file_sort::next_range( dberr_t Merge_file_sort::merge_rows(Cursor &cursor, Output_file &output_file) noexcept { dberr_t err; - ulint *offsets{}; - const mrec_t *mrec{}; - while ((err = cursor.fetch(mrec, offsets)) == DB_SUCCESS) { - /* If we are simply appending from a single partion then enable duplicate - key checking for the write phase. */ - auto dup = cursor.size() == 0 ? m_merge_ctx->m_dup : nullptr; + const auto &file_readers = cursor.m_cursor.file_readers(); + ut_a(file_readers.size() == 2); - err = output_file.write(mrec, offsets, dup); + const auto file_reader0 = file_readers[0]; + const auto file_reader1 = file_readers[1]; + File_reader *file_reader = nullptr; - if (unlikely(err != DB_SUCCESS)) { + const auto dup = m_merge_ctx->m_dup; + + /* Merge two partitions */ + while (!file_reader0->eof() && !file_reader1->eof()) { + auto cmp = cmp_rec_rec_simple( + file_reader0->m_mrec, file_reader1->m_mrec, + &(file_reader0->m_offsets[0]), &(file_reader1->m_offsets[0]), + file_reader0->m_index, dup != nullptr ? dup->m_table : nullptr); + + /* Check for duplicates. */ + if (cmp == 0 && dup != nullptr) { + dup->report(file_reader0->m_mrec, &(file_reader0->m_offsets[0])); + } + + /* PFS stage monitoring */ + cursor.m_cursor.stage_inc(); + + file_reader = cmp <= 0 ? file_reader0 : file_reader1; + + err = output_file.write(file_reader->m_mrec, &(file_reader->m_offsets[0]), + nullptr); + if (err != DB_SUCCESS) { + return err; + } + err = file_reader->next(); + + if (err != DB_SUCCESS) { break; } + } - err = cursor.next(); + file_reader = file_reader0->eof() ? file_reader1 : file_reader0; - if (unlikely(err != DB_SUCCESS)) { + /* Process the remaining rows */ + while (!file_reader->eof()) { + /* PFS stage monitoring */ + cursor.m_cursor.stage_inc(); + + /* Check for duplicates when we are appending from a single partition */ + err = output_file.write(file_reader->m_mrec, &(file_reader->m_offsets[0]), + dup); + if (err != DB_SUCCESS) { + return err; + } + + err = file_reader->next(); + if (err != DB_SUCCESS) { break; } } + /* Clear the Merge_cursor to pass the subsequent checks in clear_eof() */ + cursor.m_cursor.reset_for_next_range(); + return err; } diff --git a/storage/innobase/include/ddl0impl-builder.h b/storage/innobase/include/ddl0impl-builder.h index 4240c837722..156daa7cca3 100644 --- a/storage/innobase/include/ddl0impl-builder.h +++ b/storage/innobase/include/ddl0impl-builder.h @@ -541,6 +541,12 @@ struct Merge_cursor : public Load_cursor { return m_cursors.size(); } + /** Clear m_pq, called at the end of merge_rows_fast() */ + void reset_for_next_range() noexcept; + + /* PFS stage monitoring */ + void stage_inc() noexcept; + private: /** @return the current cursor at the head of the queue. */ [[nodiscard]] File_cursor *pop() noexcept; diff --git a/storage/innobase/include/ddl0impl-file-reader.h b/storage/innobase/include/ddl0impl-file-reader.h index d8032933fe1..340986747c2 100644 --- a/storage/innobase/include/ddl0impl-file-reader.h +++ b/storage/innobase/include/ddl0impl-file-reader.h @@ -37,6 +37,7 @@ namespace ddl { // Forward declaration struct File_cursor; +struct Merge_file_sort; /** Read rows from the temporary file. */ struct File_reader : private ut::Non_copyable { @@ -150,6 +151,7 @@ struct File_reader : private ut::Non_copyable { uint64_t m_n_rows_read{}; friend File_cursor; + friend Merge_file_sort; }; } // namespace ddl