diff --git a/mysql-test/collections/disabled.def b/mysql-test/collections/disabled.def index eedccca4c60..907ce95deef 100644 --- a/mysql-test/collections/disabled.def +++ b/mysql-test/collections/disabled.def @@ -23,6 +23,9 @@ audit_null.audit_plugin_bugs : BUG#28080637 Test fails consistently binlog.binlog_mysqlbinlog_rewrite_db @windows : BUG#26717205 Requires a debug client binary and fails consistently. binlog_gtid.binlog_xa_select_gtid_executed_explicitly_crash : Bug#28588717 Fails both on FreeBSD and other platforms +# clone suite tests +clone.local_xa : BUG#00000000 : cross-engine consistent clone does not block XA commits + # func1 suite tests funcs_1.is_basics_mixed @darwin : BUG#25882809 INCORRECT RESULT WHEN USING SUBQUERY ON TOP OF VIEW. funcs_1.is_basics_mixed @windows : BUG#25882809 INCORRECT RESULT WHEN USING SUBQUERY ON TOP OF VIEW. diff --git a/plugin/clone/CMakeLists.txt b/plugin/clone/CMakeLists.txt index db479f6e581..2937cd947cd 100644 --- a/plugin/clone/CMakeLists.txt +++ b/plugin/clone/CMakeLists.txt @@ -29,6 +29,7 @@ ADD_DEFINITIONS(-DLOG_COMPONENT_TAG="Clone") MYSQL_ADD_PLUGIN(clone src/clone_plugin.cc src/clone_client.cc + src/clone_common.cc src/clone_server.cc src/clone_status.cc src/clone_local.cc diff --git a/plugin/clone/include/clone_client.h b/plugin/clone/include/clone_client.h index c100dbfa933..4dfd2a2f104 100644 --- a/plugin/clone/include/clone_client.h +++ b/plugin/clone/include/clone_client.h @@ -30,6 +30,7 @@ Clone Plugin: Client Interface #define CLONE_CLIENT_H #include "plugin/clone/include/clone.h" +#include "plugin/clone/include/clone_common.h" #include "plugin/clone/include/clone_hton.h" #include "plugin/clone/include/clone_status.h" @@ -784,7 +785,7 @@ class Client { }; /** Clone client interface to handle callback from Storage Engine */ -class Client_Cbk : public Ha_clone_cbk { +class Client_Cbk : public Ha_clone_common_cbk { public: /** Construct Callback. Set clone client object. 
@param[in] clone clone client object */ diff --git a/plugin/clone/include/clone_common.h b/plugin/clone/include/clone_common.h new file mode 100644 index 00000000000..7761016151a --- /dev/null +++ b/plugin/clone/include/clone_common.h @@ -0,0 +1,31 @@ +/* + Copyright (C) 2022-2023 Laurynas Biveinis + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CLONE_COMMON_H +#define CLONE_COMMON_H + +#include "sql/handler.h" + +namespace myclone { + +class Ha_clone_common_cbk : public Ha_clone_cbk { + public: + [[nodiscard]] int synchronize_engines() override; +}; + +} // namespace myclone + +#endif // CLONE_COMMON_H diff --git a/plugin/clone/include/clone_local.h b/plugin/clone/include/clone_local.h index 2a6268bfce4..874597fe16e 100644 --- a/plugin/clone/include/clone_local.h +++ b/plugin/clone/include/clone_local.h @@ -31,6 +31,7 @@ Clone Plugin: Local clone interface #include "plugin/clone/include/clone.h" #include "plugin/clone/include/clone_client.h" +#include "plugin/clone/include/clone_common.h" #include "plugin/clone/include/clone_hton.h" #include "plugin/clone/include/clone_server.h" @@ -78,7 +79,7 @@ class Local { }; /** Clone Local interface to handle callback from Storage Engines */ -class Local_Callback : public Ha_clone_cbk { +class Local_Callback : public Ha_clone_common_cbk { public: /** Construct Callback. Set clone local object. 
@param[in] clone clone local object */ diff --git a/plugin/clone/include/clone_server.h b/plugin/clone/include/clone_server.h index 45e471ef475..e26f3286f1e 100644 --- a/plugin/clone/include/clone_server.h +++ b/plugin/clone/include/clone_server.h @@ -30,6 +30,7 @@ Clone Plugin: Server interface #define CLONE_SERVER_H #include "plugin/clone/include/clone.h" +#include "plugin/clone/include/clone_common.h" #include "plugin/clone/include/clone_hton.h" #include "plugin/clone/include/clone_os.h" @@ -244,7 +245,7 @@ class Server { }; /** Clone server interface to handle callback from Storage Engine */ -class Server_Cbk : public Ha_clone_cbk { +class Server_Cbk : public Ha_clone_common_cbk { public: /** Construct Callback. Set clone server object. @param[in] clone clone server object */ diff --git a/plugin/clone/src/clone_common.cc b/plugin/clone/src/clone_common.cc new file mode 100644 index 00000000000..702c6232183 --- /dev/null +++ b/plugin/clone/src/clone_common.cc @@ -0,0 +1,100 @@ +/* + Copyright (C) 2022, 2023, Laurynas Biveinis + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "plugin/clone/include/clone_common.h" + +#include +#include + +#include "lex_string.h" +#include "my_dbug.h" +#include "my_inttypes.h" +#include "plugin/clone/include/clone_hton.h" +#include "plugin/clone/include/clone_status.h" +#include "sql/handler.h" +#include "sql/json_dom.h" +#include "sql/sql_plugin_ref.h" +#include "storage/perfschema/table_log_status.h" + +namespace myclone { + +// Perform the cross-engine synchronization: execute a +// performance_schema.log_status query, and call set_log_stop for each storage +// engine with its part of STORAGE_ENGINES column JSON object from that query. +int Ha_clone_common_cbk::synchronize_engines() { + const auto &all_locators = get_all_locators(); + + std::unique_ptr table{static_cast( + table_log_status::create(&table_log_status::m_share))}; + + auto err = table->rnd_init(true); + assert(err == 0); + + err = table->rnd_next(); + if (err != 0) return err; + + auto &log_status_row = table->get_row(); + const auto &se_positions = log_status_row.w_storage_engines; + assert(se_positions.is_dom()); + + const auto *const json_dom = se_positions.get_dom(); + // get_dom above returns only a const pointer, correctly. Json_wrapper only + // takes a non-const pointer. Let's trust it will not modify the passed value. + const Json_wrapper json_for_str{const_cast(json_dom), true}; + String json_str_buf; + // This may fail and return true, in which case there's nothing reasonable to + // do, so try to print what's in the buffer anyway. + const auto json_format_failed [[maybe_unused]] = + json_for_str.to_string(&json_str_buf, true, __PRETTY_FUNCTION__); + assert(!json_format_failed); + // Size reverse-engineered from ER_CLONE_SERVER_TRACE used by log_error. 
No + // way to track it automatically, but very unlikely it would silently shrink. + char msg_buf[512]; + snprintf(msg_buf, sizeof(msg_buf), "engine positions: %.*s", + static_cast(json_str_buf.length()), json_str_buf.ptr()); + log_error(nullptr, false, 0, msg_buf); + + assert(json_dom->json_type() == enum_json_type::J_OBJECT); + + const auto *const json_obj = static_cast(json_dom); + for (const auto &json_se_pos : *json_obj) { + const auto &se_name = json_se_pos.first; + const LEX_CSTRING lex_c_se_name{.str = se_name.c_str(), + .length = se_name.length()}; + auto *const plugin_ref = ha_resolve_by_name_raw(nullptr, lex_c_se_name); + + auto *const hton = plugin_data(plugin_ref); + + if (hton->clone_interface.clone_set_log_stop) { + // O(n^2) but n == 2 + const auto loc_itr = std::find_if( + all_locators.begin(), all_locators.cend(), + [hton](const Locator &loc) { return loc.m_hton == hton; }); + assert(loc_itr != all_locators.cend()); + + (hton->clone_interface.clone_set_log_stop)( + loc_itr->m_loc, loc_itr->m_loc_len, *json_se_pos.second); + } + + plugin_unlock(nullptr, plugin_ref); + } + + log_status_row.cleanup(); + + return 0; +} + +} // namespace myclone diff --git a/plugin/clone/src/clone_hton.cc b/plugin/clone/src/clone_hton.cc index cd52ab4c456..0a2b68f9bc6 100644 --- a/plugin/clone/src/clone_hton.cc +++ b/plugin/clone/src/clone_hton.cc @@ -105,6 +105,27 @@ static bool run_hton_clone_begin(THD *thd, plugin_ref plugin, void *arg) { return (false); } +// Make InnoDB first in the storage locator and task vectors - to drive the +// cross-engine synchronization & to wait for reconnects +static void make_innodb_first(Storage_Vector &clone_loc_vec, + Task_Vector &task_vec) { + auto innodb_loc_itr = + std::find_if(clone_loc_vec.begin(), clone_loc_vec.end(), + [](const myclone::Locator &loc) { + return loc.m_hton->db_type == DB_TYPE_INNODB; + }); + assert(innodb_loc_itr != clone_loc_vec.end()); + if (innodb_loc_itr != clone_loc_vec.begin()) { + 
std::iter_swap(innodb_loc_itr, clone_loc_vec.begin()); + if (!task_vec.empty()) { + assert(task_vec.size() == clone_loc_vec.size()); + const auto index = innodb_loc_itr - clone_loc_vec.begin(); + std::swap(task_vec[index], task_vec[0]); + } + } + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); +} + int hton_clone_begin(THD *thd, Storage_Vector &clone_loc_vec, Task_Vector &task_vec, Ha_clone_type clone_type, Ha_clone_mode clone_mode) { @@ -124,9 +145,13 @@ int hton_clone_begin(THD *thd, Storage_Vector &clone_loc_vec, plugin_foreach(thd, run_hton_clone_begin, MYSQL_STORAGE_ENGINE_PLUGIN, &clone_args); + make_innodb_first(clone_loc_vec, task_vec); + return (clone_args.m_err); } + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); + for (auto &loc_iter : clone_loc_vec) { uint32_t task_id = 0; @@ -162,8 +187,11 @@ int hton_clone_begin(THD *thd, Storage_Vector &clone_loc_vec, int hton_clone_copy(THD *thd, Storage_Vector &clone_loc_vec, Task_Vector &task_vec, Ha_clone_cbk *clone_cbk) { - uint index = 0; + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); + + clone_cbk->set_all_locators(&clone_loc_vec); + uint index = 0; for (auto &loc_iter : clone_loc_vec) { assert(index < task_vec.size()); clone_cbk->set_loc_index(index); @@ -173,16 +201,20 @@ int hton_clone_copy(THD *thd, Storage_Vector &clone_loc_vec, task_vec[index], clone_cbk); if (err != 0) { + clone_cbk->reset_all_locators(); return (err); } index++; } + clone_cbk->reset_all_locators(); return (0); } int hton_clone_end(THD *thd, Storage_Vector &clone_loc_vec, Task_Vector &task_vec, int in_err) { + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); + uint index = 0; for (auto &loc_iter : clone_loc_vec) { @@ -252,9 +284,13 @@ int hton_clone_apply_begin(THD *thd, const char *clone_data_dir, plugin_foreach(thd, run_hton_clone_apply_begin, MYSQL_STORAGE_ENGINE_PLUGIN, &clone_args); + make_innodb_first(clone_loc_vec, task_vec); + return (clone_args.m_err); } + 
assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); + uint32_t loop_index = 0; for (auto &loc_iter : clone_loc_vec) { @@ -296,6 +332,7 @@ int hton_clone_apply_begin(THD *thd, const char *clone_data_dir, int hton_clone_apply_error(THD *thd, Storage_Vector &clone_loc_vec, Task_Vector &task_vec, int in_err) { + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); assert(in_err != 0); uint index = 0; @@ -316,6 +353,8 @@ int hton_clone_apply_error(THD *thd, Storage_Vector &clone_loc_vec, int hton_clone_apply_end(THD *thd, Storage_Vector &clone_loc_vec, Task_Vector &task_vec, int in_err) { + assert(clone_loc_vec[0].m_hton->db_type == DB_TYPE_INNODB); + uint index = 0; for (auto &loc_iter : clone_loc_vec) { /* Task vector could be empty if we are exiting immediately diff --git a/sql/handler.h b/sql/handler.h index 50419116e4f..7283c757683 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1041,6 +1041,13 @@ struct Ha_clone_file { }; }; +namespace myclone { + +struct Locator; +using Storage_Vector = std::vector; + +} // namespace myclone + /* Abstract callback interface to stream data back to the caller. */ class Ha_clone_cbk { protected: @@ -1086,6 +1093,10 @@ class Ha_clone_cbk { @param[in] estimate_delta how many bytes to add to the clone size estimate */ virtual void add_to_data_size_estimate(std::uint64_t estimate_delta) = 0; + /** Callback to synchronize all the transactional storage engines for + consistent clone. */ + [[nodiscard]] virtual int synchronize_engines() = 0; + /** virtual destructor. 
*/ virtual ~Ha_clone_cbk() = default; @@ -1197,6 +1213,22 @@ class Ha_clone_cbk { return (m_flag & HA_CLONE_STATE_CHANGE); } + void set_all_locators(myclone::Storage_Vector *new_all_locators) noexcept { + assert(all_locators == nullptr); + all_locators = new_all_locators; + } + + void reset_all_locators() noexcept { + assert(all_locators != nullptr); + all_locators = nullptr; + } + + protected: + [[nodiscard]] const myclone::Storage_Vector &get_all_locators() + const noexcept { + return *all_locators; + } + private: /** Handlerton for the SE */ handlerton *m_hton; @@ -1222,6 +1254,10 @@ class Ha_clone_cbk { /** Estimated bytes to be transferred. */ uint64_t m_state_estimate; + /** Current clone storage vector, used by cross-engine synchronization. + nullptr when not in the cross-engine synchronization code path. */ + myclone::Storage_Vector *all_locators{nullptr}; + /** Flag storing data related options */ int m_flag; @@ -2085,6 +2121,19 @@ using Clone_ack_t = int (*)(handlerton *hton, THD *thd, const uchar *loc, uint loc_len, uint task_id, int in_err, Ha_clone_cbk *cbk); +/** Use the result of performance_schema.log_status query to set the clone data +end point, such as log archiving stop LSN. The cloned instance will contain data +up to this point inclusively, and nothing after. This implements cross-engine +consistency for clone. +@param[in] loc locator +@param[in] loc_len locator length in bytes +@param[in] log_stop_pos The part of the JSON document from + performance_schema.log_status query result set + STORAGE_ENGINES column that corresponds to this + SE. */ +using Clone_set_log_stop_t = void (*)(const uchar *loc, uint loc_len, + const Json_dom &log_stop_pos); + /** End copy from source database @param[in] hton handlerton for SE @param[in] thd server thread handle @@ -2141,6 +2207,7 @@ struct Clone_interface_t { /* Interfaces to copy data. 
*/ Clone_begin_t clone_begin; Clone_copy_t clone_copy; + Clone_set_log_stop_t clone_set_log_stop; Clone_ack_t clone_ack; Clone_end_t clone_end; diff --git a/storage/innobase/arch/arch0log.cc b/storage/innobase/arch/arch0log.cc index d38fc789a29..a7f39565238 100644 --- a/storage/innobase/arch/arch0log.cc +++ b/storage/innobase/arch/arch0log.cc @@ -253,6 +253,7 @@ is attached to current group. @param[out] start_lsn start lsn for client @param[out] header redo log header @param[in] is_durable if client needs durable archiving +@param[in] stop_lsn archiving target stop LSN @return error code */ int Arch_Log_Sys::start(Arch_Group *&group, lsn_t &start_lsn, byte *header, bool is_durable) { @@ -262,6 +263,8 @@ int Arch_Log_Sys::start(Arch_Group *&group, lsn_t &start_lsn, byte *header, log_request_checkpoint(*log_sys, true); + ut_ad(get_stop_lsn_master_thread() == LSN_MAX); + arch_mutex_enter(); if (m_state == ARCH_STATE_READ_ONLY) { @@ -437,21 +440,74 @@ int Arch_Log_Sys::stop(Arch_Group *group, lsn_t &stop_lsn, byte *log_blk, int err = 0; blk_len = 0; stop_lsn = m_archived_lsn.load(); + auto se_sync_stop_lsn = get_stop_lsn_master_thread(); + ut_ad(se_sync_stop_lsn != LSN_MAX || log_blk == nullptr); + ut_ad(stop_lsn <= se_sync_stop_lsn); if (log_blk != nullptr) { - /* Get the current LSN and trailer block. */ - log_buffer_get_last_block(*log_sys, stop_lsn, log_blk, blk_len); - - DBUG_EXECUTE_IF("clone_arch_log_stop_file_end", - group->adjust_end_lsn(stop_lsn, blk_len);); + DBUG_EXECUTE_IF("clone_arch_log_stop_file_end", { + // This desyncs storage engines - use in InnoDB-only tests + group->adjust_end_lsn(stop_lsn, blk_len); + set_stop_lsn(stop_lsn); + se_sync_stop_lsn = stop_lsn; + }); /* Will throw error, if shutdown. We still continue with detach but return the error. 
*/ - err = wait_archive_complete(stop_lsn); + err = wait_archive_complete(se_sync_stop_lsn); + if (!err) { + stop_lsn = se_sync_stop_lsn; + const auto last_log_seg_start_lsn = + ut_uint64_align_down(stop_lsn, OS_FILE_LOG_BLOCK_SIZE); + if (last_log_seg_start_lsn < stop_lsn) { + ut_ad(stop_lsn <= last_log_seg_start_lsn + OS_FILE_LOG_BLOCK_SIZE); + + // In the older InnoDB-only clone implementation log archiving could + // only stop at the end of the log. If that fell in the middle of a redo + // log block, that block was present in the in-memory log buffer. + // + // With clone synchronized across storage engines, the log archiving + // will stop at an LSN not greater than the redo log end LSN, which may + // still be in the middle of a block. Depending on how much this LSN is + // behind the current LSN, the needed block might be on disk, in memory, + // or be in the middle of a move from memory to disk. Handle all those cases + // uniformly by forcing the write of this block, and reading it from the + // disk like any older block. + const auto needs_log_write = + log_sys->write_lsn.load(std::memory_order_acquire) < stop_lsn; + if (needs_log_write) log_write_up_to(*log_sys, stop_lsn, false); + + const auto last_log_seg_end_lsn = + last_log_seg_start_lsn + OS_FILE_LOG_BLOCK_SIZE; + byte last_log_seg_buf[OS_FILE_LOG_BLOCK_SIZE]; + recv_read_log_seg(*log_sys, &last_log_seg_buf[0], + last_log_seg_start_lsn, last_log_seg_end_lsn, true); + + // Roughly mimicking log_buffer_get_last_block, but without setting the + // fields that should be already set as we read the actual valid block + // from disk. + const auto data_len = stop_lsn % OS_FILE_LOG_BLOCK_SIZE; + ut_ad(data_len >= LOG_BLOCK_HDR_SIZE); + std::memcpy(log_blk, last_log_seg_buf, data_len); + // Our redo block copy must not include any records past the log + // archiving stop LSN.
+ std::memset(log_blk + data_len, 0x00, + OS_FILE_LOG_BLOCK_SIZE - data_len); + log_block_set_data_len(log_blk, data_len); + ut_ad(log_block_get_first_rec_group(log_blk) <= data_len); + log_block_store_checksum(log_blk); + blk_len = OS_FILE_LOG_BLOCK_SIZE; + } + } } arch_mutex_enter(); + // Relaxed store will be ordered by the arch_mutex release. This is fine as + // all the reads from any non-master threads later will be after arch_mutex + // acquisition. + m_stop_lsn.store(LSN_MAX, std::memory_order_relaxed); + if (m_state == ARCH_STATE_READ_ONLY) { arch_mutex_exit(); return 0; @@ -539,6 +595,7 @@ Arch_State Arch_Log_Sys::check_set_state(bool is_abort, lsn_t *archived_lsn, if (*archived_lsn != LSN_MAX) { /* Update system archived LSN from input */ ut_ad(*archived_lsn >= m_archived_lsn.load()); + ut_ad(*archived_lsn <= get_stop_lsn_any_thread()); m_archived_lsn.store(*archived_lsn); } else { /* If input is not initialized, @@ -549,9 +606,20 @@ Arch_State Arch_Log_Sys::check_set_state(bool is_abort, lsn_t *archived_lsn, lsn_t lsn_diff; /* Check redo log data ready to archive. */ - ut_ad(log_sys->write_lsn.load() >= m_archived_lsn.load()); - - lsn_diff = log_sys->write_lsn.load() - m_archived_lsn.load(); + { +#ifdef UNIV_DEBUG + const auto local_lsn = log_get_lsn(*log_sys); +#endif + const auto local_write_lsn = log_sys->write_lsn.load(); + const auto local_archived_lsn = m_archived_lsn.load(); + ut_ad(local_write_lsn >= local_archived_lsn); + const auto stop_lsn = get_stop_lsn_any_thread(); + ut_ad(stop_lsn >= local_archived_lsn); + ut_ad(stop_lsn <= local_lsn || stop_lsn == LSN_MAX || + DBUG_EVALUATE_IF("clone_arch_log_stop_file_end", true, false)); + + lsn_diff = std::min(stop_lsn, local_write_lsn) - local_archived_lsn; + } lsn_diff = ut_uint64_align_down(lsn_diff, OS_FILE_LOG_BLOCK_SIZE); @@ -734,6 +802,8 @@ We need to wait till current log sys LSN during archive stop. 
@param[in] target_lsn target archive LSN to wait for @return error code */ int Arch_Log_Sys::wait_archive_complete(lsn_t target_lsn) { + ut_ad(target_lsn != LSN_MAX); + target_lsn = ut_uint64_align_down(target_lsn, OS_FILE_LOG_BLOCK_SIZE); /* Check and wait for archiver thread if needed. */ @@ -842,8 +912,19 @@ bool Arch_Log_Sys::archive(bool init, Arch_File_Ctx *curr_ctx, lsn_t *arch_lsn, if (curr_state == ARCH_STATE_ACTIVE) { /* Adjust archiver length to no go beyond file end. */ - DBUG_EXECUTE_IF("clone_arch_log_stop_file_end", - m_current_group->adjust_copy_length(*arch_lsn, arch_len);); + DBUG_EXECUTE_IF( + "clone_arch_log_stop_file_end", + // This desyncs storage engines - use in InnoDB-only tests + if (get_stop_lsn_any_thread() == LSN_MAX) { + // Get the end LSN of the current log file and set that as + // the stop LSN. + lsn_t file_stop_lsn; + uint32_t blk_len; + m_current_group->adjust_end_lsn(file_stop_lsn, blk_len); + set_stop_lsn(file_stop_lsn); + } + // This desyncs storage engines - use in InnoDB-only tests + m_current_group->adjust_copy_length(*arch_lsn, arch_len);); /* Simulate archive error. 
*/ DBUG_EXECUTE_IF("clone_redo_no_archive", arch_len = 0;); diff --git a/storage/innobase/clone/clone0api.cc b/storage/innobase/clone/clone0api.cc index 58dd998535b..f284de5e006 100644 --- a/storage/innobase/clone/clone0api.cc +++ b/storage/innobase/clone/clone0api.cc @@ -37,6 +37,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include "os0thread-create.h" #include "sql/clone_handler.h" +#include "sql/json_dom.h" #include "sql/mysqld.h" #include "sql/sql_backup_lock.h" #include "sql/sql_class.h" @@ -544,6 +545,22 @@ int innodb_clone_copy(handlerton *hton, THD *thd, const byte *loc, uint loc_len, return (err); } +void innodb_clone_set_log_stop(const uchar *loc, uint loc_len, + const Json_dom &log_stop_pos) { + ut_ad(log_stop_pos.json_type() == enum_json_type::J_OBJECT); + const auto &json_obj = static_cast(log_stop_pos); + + const auto &lsn_json = *json_obj.get(log_status_lsn_key); + + ut_ad(lsn_json.json_type() == enum_json_type::J_INT); + const auto &lsn_json_int = static_cast(lsn_json); + const lsn_t log_stop_lsn = lsn_json_int.value(); + + auto *const clone_hdl = clone_sys->get_clone_by_index(loc, loc_len); + auto &snapshot = clone_hdl->get_active_snapshot(); + snapshot.set_stop_lsn(log_stop_lsn); +} + int innodb_clone_ack(handlerton *hton, THD *thd, const byte *loc, uint loc_len, uint task_id, int in_err, Ha_clone_cbk *cbk) { cbk->set_hton(hton); @@ -2772,3 +2789,6 @@ Clone_Sys::Wait_stage::~Wait_stage() { thd->set_proc_info(m_saved_info); } } + +// Make it header-only constexpr once in C++20 +const std::string log_status_lsn_key{"LSN"}; diff --git a/storage/innobase/clone/clone0apply.cc b/storage/innobase/clone/clone0apply.cc index 5ba04efc147..7428e1da2d0 100644 --- a/storage/innobase/clone/clone0apply.cc +++ b/storage/innobase/clone/clone0apply.cc @@ -607,7 +609,7 @@ int Clone_Handle::apply_state_metadata(Clone_Task *task, err = fix_all_renamed(task); if (err == 0) { - err = move_to_next_state(task, nullptr, &state_desc); + err = 
move_to_next_state(task, callback, &state_desc); } #ifdef UNIV_DEBUG diff --git a/storage/innobase/clone/clone0clone.cc b/storage/innobase/clone/clone0clone.cc index 7db22d5dd0e..93f53975e92 100644 --- a/storage/innobase/clone/clone0clone.cc +++ b/storage/innobase/clone/clone0clone.cc @@ -1769,7 +1781,8 @@ int Clone_Task_Manager::finish_state(Clone_Task *task) { int Clone_Task_Manager::change_state(Clone_Task *task, Clone_Desc_State *state_desc, Snapshot_State new_state, - Clone_Alert_Func cbk, uint &num_wait) { + Clone_Alert_Func cbk, + Ha_clone_cbk *clone_cbk, uint &num_wait) { mutex_enter(&m_state_mutex); num_wait = 0; @@ -1824,10 +1837,9 @@ int Clone_Task_Manager::change_state(Clone_Task *task, << "Clone Apply State Change : Number of tasks = " << m_num_tasks; } - err = m_clone_snapshot->change_state(state_desc, m_next_state, - task->m_current_buffer, - task->m_buffer_alloc_len, cbk); - + err = m_clone_snapshot->change_state( + state_desc, m_next_state, task->m_current_buffer, + task->m_buffer_alloc_len, cbk, clone_cbk); if (err != 0) { return (err); } @@ -2151,8 +2163,8 @@ int Clone_Handle::move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback, /* Move to new state */ uint num_wait = 0; - auto err = m_clone_task_manager.change_state(task, state_desc, next_state, - alert_callback, num_wait); + auto err = m_clone_task_manager.change_state( + task, state_desc, next_state, alert_callback, callback, num_wait); /* Need to wait for all other tasks to move over, if any. */ if (num_wait > 0) { @@ -2162,8 +2174,9 @@ int Clone_Handle::move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback, [&](bool alert, bool &result) { /* For multi threaded clone, master task does the state change. 
*/ if (task->m_is_master) { - err = m_clone_task_manager.change_state( - task, state_desc, next_state, alert_callback, num_wait); + err = m_clone_task_manager.change_state(task, state_desc, + next_state, alert_callback, + callback, num_wait); } else { err = m_clone_task_manager.check_state(task, next_state, false, 0, num_wait); diff --git a/storage/innobase/clone/clone0copy.cc b/storage/innobase/clone/clone0copy.cc index 255091899ad..47a0846938c 100644 --- a/storage/innobase/clone/clone0copy.cc +++ b/storage/innobase/clone/clone0copy.cc @@ -447,10 +466,15 @@ int Clone_Snapshot::wait_for_binlog_prepared_trx() { } int Clone_Snapshot::init_redo_copy(Snapshot_State new_state, - Clone_Alert_Func cbk) { + Clone_Alert_Func cbk, + Ha_clone_cbk *clone_cbk) { ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY); ut_ad(m_snapshot_type != HA_CLONE_BLOCKING); + /* This code is disabled due to clone cross-engine synchronization happening + through a performance_schema.LOG_STATUS query, see the synchronize_engines + call below. */ +#if 0 /* Block external XA operations. XA prepare commit and rollback operations are first logged to binlog and added to global gtid_executed before doing operation in SE. Without blocking, we might persist such GTIDs from global @@ -485,6 +509,10 @@ int Clone_Snapshot::init_redo_copy(Snapshot_State new_state, redo log during recovery. */ dict_persist_t::Enable_immediate dyn_metadata_guard(dict_persist); +#else + int binlog_error = 0; +#endif + /* Use it only for local clone. For remote clone, donor session is different from the sessions created within mtr test case. */ DEBUG_SYNC_C("clone_donor_after_saving_dynamic_metadata"); @@ -492,10 +520,16 @@ int Clone_Snapshot::init_redo_copy(Snapshot_State new_state, /* Start transition to next state. */ State_transit transit_guard(this, new_state); + const auto sync_error = clone_cbk->synchronize_engines(); + /* Stop redo archiving even on error.
*/ auto redo_error = m_redo_ctx.stop(m_redo_trailer, m_redo_trailer_size, m_redo_trailer_offset); + if (sync_error != 0) { + return sync_error; + } + if (binlog_error != 0) { return binlog_error; /* purecov: inspected */ } diff --git a/storage/innobase/clone/clone0snapshot.cc b/storage/innobase/clone/clone0snapshot.cc index de508bcac2e..f6548612ffe 100644 --- a/storage/innobase/clone/clone0snapshot.cc +++ b/storage/innobase/clone/clone0snapshot.cc @@ -555,7 +574,8 @@ uint32_t Clone_Snapshot::get_blocks_per_chunk() const { int Clone_Snapshot::change_state(Clone_Desc_State *state_desc, Snapshot_State new_state, byte *temp_buffer, - uint temp_buffer_len, Clone_Alert_Func cbk) { + uint temp_buffer_len, Clone_Alert_Func cbk, + Ha_clone_cbk *clone_cbk) { ut_ad(m_snapshot_state != CLONE_SNAPSHOT_NONE); int err = 0; @@ -592,10 +612,10 @@ int Clone_Snapshot::change_state(Clone_Desc_State *state_desc, DEBUG_SYNC_C("clone_start_redo_archiving"); break; case CLONE_SNAPSHOT_REDO_COPY: ib::info(ER_IB_CLONE_OPERATION) << "Clone State BEGIN REDO COPY"; - err = init_redo_copy(new_state, cbk); + err = init_redo_copy(new_state, cbk, clone_cbk); break; diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 2705bd34011..53e398b80e5 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -5282,6 +5282,7 @@ static int innodb_init(void *p) { innobase_hton->clone_interface.clone_capability = innodb_clone_get_capability; innobase_hton->clone_interface.clone_begin = innodb_clone_begin; + innobase_hton->clone_interface.clone_set_log_stop = innodb_clone_set_log_stop; innobase_hton->clone_interface.clone_copy = innodb_clone_copy; innobase_hton->clone_interface.clone_ack = innodb_clone_ack; innobase_hton->clone_interface.clone_end = innodb_clone_end; @@ -19036,7 +19038,7 @@ static bool innobase_collect_hton_log_info(handlerton *hton, Json_dom *json) { Json_int json_lsn(lsn); Json_int 
json_lsn_checkpoint(lsn_checkpoint); - ret_val = json_innodb.add_clone("LSN", &json_lsn); + ret_val = json_innodb.add_clone(log_status_lsn_key, &json_lsn); if (!ret_val) ret_val = json_innodb.add_clone("LSN_checkpoint", &json_lsn_checkpoint); if (!ret_val) ret_val = json_engines->add_clone("InnoDB", &json_innodb); diff --git a/storage/innobase/include/arch0arch.h b/storage/innobase/include/arch0arch.h index 1f243f04b03..10df8442572 100644 --- a/storage/innobase/include/arch0arch.h +++ b/storage/innobase/include/arch0arch.h @@ -1344,6 +1344,24 @@ class Arch_Log_Sys { /** Release redo log archiver mutex */ void arch_mutex_exit() { mutex_exit(&m_mutex); } + lsn_t get_stop_lsn_any_thread() const noexcept { + return m_stop_lsn.load(std::memory_order_acquire); + } + + lsn_t get_stop_lsn_master_thread() const noexcept { + // Not entirely correct for debug builds - m_stop_lsn may be updated by the + // log archiver thread in "clone_arch_log_stop_file_end" debug injection. + // Let's ignore it until it's an actual issue there. + return m_stop_lsn.load(std::memory_order_relaxed); + } + + void set_stop_lsn(lsn_t stop_lsn) noexcept { + ut_ad(get_stop_lsn_any_thread() == LSN_MAX || + DBUG_EVALUATE_IF("clone_arch_log_stop_file_end", true, false)); + + m_stop_lsn.store(stop_lsn, std::memory_order_release); + } + /** Disable copy construction */ Arch_Log_Sys(Arch_Log_Sys const &) = delete; @@ -1396,6 +1414,8 @@ class Arch_Log_Sys { mutexes. Same is true for #m_archived_lsn. */ Arch_State m_state; + atomic_lsn_t m_stop_lsn{LSN_MAX}; + /** System has archived log up to this LSN */ atomic_lsn_t m_archived_lsn; diff --git a/storage/innobase/include/arch0log.h b/storage/innobase/include/arch0log.h index d73e8ef949d..08892807a54 100644 --- a/storage/innobase/include/arch0log.h +++ b/storage/innobase/include/arch0log.h @@ -82,6 +82,12 @@ class Log_Arch_Client_Ctx { /** Release archived data so that system can purge it */ void release(); + /** Set log archiving target stop LSN. 
+ @param[in] stop_lsn target stop LSN */ + void set_stop_lsn(lsn_t stop_lsn) { + arch_log_sys->set_stop_lsn(stop_lsn); + } + private: /** Archiver client state */ Arch_Client_State m_state; diff --git a/storage/innobase/include/clone0api.h b/storage/innobase/include/clone0api.h index 6150f4d29cc..c869d568886 100644 --- a/storage/innobase/include/clone0api.h +++ b/storage/innobase/include/clone0api.h @@ -36,6 +36,8 @@ this program; if not, write to the Free Software Foundation, Inc., #ifndef UNIV_HOTBACKUP #include "sql/handler.h" +class Json_dom; + /** Get capability flags for clone operation @param[out] flags capability flag */ void innodb_clone_get_capability(Ha_clone_flagset &flags); @@ -64,6 +66,13 @@ int innodb_clone_begin(handlerton *hton, THD *thd, const byte *&loc, int innodb_clone_copy(handlerton *hton, THD *thd, const byte *loc, uint loc_len, uint task_id, Ha_clone_cbk *cbk); +/** Set log position to stop cloning for InnoDB +@param[in] loc locator +@param[in] loc_len locator length in bytes +@param[in] log_stop_pos log position to stop cloning in JSON */ +void innodb_clone_set_log_stop(const uchar *loc, uint loc_len, + const Json_dom &log_stop_pos); + /** Acknowledge data to source database @param[in] hton handlerton for SE @param[in] thd server thread handle @@ -265,4 +274,6 @@ class Clone_notify { int m_error; }; +extern const std::string log_status_lsn_key; + #endif /* CLONE_API_INCLUDE */ diff --git a/storage/innobase/include/clone0clone.h b/storage/innobase/include/clone0clone.h index 7a5a959c519..5dcf55c06a6 100644 --- a/storage/innobase/include/clone0clone.h +++ b/storage/innobase/include/clone0clone.h @@ -358,11 +358,12 @@ class Clone_Task_Manager { @param[in] state_desc descriptor for next state @param[in] new_state next state to move to @param[in] cbk alert callback for long wait + @param[in] clone_cbk clone callback @param[out] num_wait unfinished tasks in current state @return error code */ int change_state(Clone_Task *task, Clone_Desc_State 
*state_desc, Snapshot_State new_state, Clone_Alert_Func cbk, - uint &num_wait); + Ha_clone_cbk *clone_cbk, uint &num_wait); /** Check if state transition is over and all tasks moved to next state @param[in] task requesting task @@ -755,6 +756,11 @@ class Clone_Handle { void close_master_file(); #endif /* UNIV_DEBUG */ + /** @return active snapshot; the task manager's snapshot must not be null */ + auto &get_active_snapshot() noexcept { + return *m_clone_task_manager.get_snapshot(); + } + private: /** Check if enough space is there to clone. @param[in] task current task diff --git a/storage/innobase/include/clone0snapshot.h b/storage/innobase/include/clone0snapshot.h index 0ae31c21225..f7118e180b3 100644 --- a/storage/innobase/include/clone0snapshot.h +++ b/storage/innobase/include/clone0snapshot.h @@ -442,7 +442,7 @@ class Clone_Snapshot { @return error code */ int change_state(Clone_Desc_State *state_desc, Snapshot_State new_state, byte *temp_buffer, uint temp_buffer_len, - Clone_Alert_Func cbk); + Clone_Alert_Func cbk, Ha_clone_cbk *clone_cbk); /** Add file metadata entry at destination @param[in] file_meta file metadata from donor @@ -575,6 +575,12 @@ class Clone_Snapshot { @param[in,out] block_num current, next block */ void skip_deleted_blocks(uint32_t chunk_num, uint32_t &block_num); + /** Set clone stop target LSN. + @param[in] stop_lsn archiving target stop LSN */ + void set_stop_lsn(lsn_t stop_lsn) noexcept { + m_redo_ctx.set_stop_lsn(stop_lsn); + } + private: /** Allow DDL file operation after 64 pages.
*/ const static uint32_t S_MAX_PAGES_PIN = 64; @@ -772,7 +778,9 @@ class Clone_Snapshot { @param[in] new_state state to move for apply @param[in] cbk alert callback for long wait + @param[in] clone_cbk clone callback @return error code */ - int init_redo_copy(Snapshot_State new_state, Clone_Alert_Func cbk); + int init_redo_copy(Snapshot_State new_state, Clone_Alert_Func cbk, + Ha_clone_cbk *clone_cbk); /** Initialize state while applying cloned data @param[in] state_desc snapshot state descriptor diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h index 4cb91b39a89..b25d9fc0486 100644 --- a/storage/innobase/include/log0recv.h +++ b/storage/innobase/include/log0recv.h @@ -238,6 +238,15 @@ void recv_sys_init(ulint max_mem); @return LSN after data addition */ lsn_t recv_calc_lsn_on_data_add(lsn_t lsn, uint64_t len); +/** Reads a specified log segment to a buffer. +@param[in,out] log redo log +@param[in,out] buf buffer where to read +@param[in] start_lsn read area start +@param[in] end_lsn read area end +@param[in] online whether this read is for a running server */ +void recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn, lsn_t end_lsn, + bool online = false); + /** Empties the hash table of stored log records, applying them to appropriate pages. @param[in,out] log Redo log diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index 88f15431d45..a75df830952 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -213,14 +213,6 @@ static bool recv_writer_is_active() { #ifndef UNIV_HOTBACKUP -/** Reads a specified log segment to a buffer. -@param[in,out] log redo log -@param[in,out] buf buffer where to read -@param[in] start_lsn read area start -@param[in] end_lsn read area end */ -static void recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn, - lsn_t end_lsn); - /** Initialize crash recovery environment.
Can be called if recv_needed_recovery == false @return DB_SUCCESS for success, others for errors */ @@ -3683,15 +3675,18 @@ bool meb_read_log_encryption(IORequest &encryption_request, @param[in,out] log redo log @param[in,out] buf buffer where to read @param[in] start_lsn read area start -@param[in] end_lsn read area end */ -static void recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn, - lsn_t end_lsn) { - log_background_threads_inactive_validate(log); +@param[in] end_lsn read area end +@param[in] online whether this read is for a running server */ +void recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn, + lsn_t end_lsn, bool online) { + if (!online) log_background_threads_inactive_validate(log); do { lsn_t source_offset; + if (online) log_writer_mutex_enter(log); source_offset = log_files_real_offset_for_lsn(log, start_lsn); + if (online) log_writer_mutex_exit(log); ut_a(end_lsn - start_lsn <= ULINT_MAX); diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index d1869614365..c0a367eea59 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -2581,10 +2581,17 @@ files_checked: srv_dict_metadata = recv_recovery_from_checkpoint_finish(*log_sys, false); + // Disabled for clone cross-engine synchronization: metadata updates are + // redo-logged, and the clone takes only those up to the clone LSN, so any + // metadata past that point is inconsistent. Hence keep the metadata that + // was reconstructed from the redo log. This remains correct when there is + // no second storage engine. +#if 0 if (recv_sys->is_cloned_db && srv_dict_metadata != nullptr) { ut::delete_(srv_dict_metadata); srv_dict_metadata = nullptr; } +#endif /* We need to save the dynamic metadata collected from redo log to DD buffer table here.
This is to make sure that the dynamic metadata is not @@ -2592,7 +2599,13 @@ files_checked: objects are not fully initialized at this point, the usual mechanism to persist dynamic metadata at checkpoint wouldn't work. */ - if (srv_dict_metadata != nullptr && !srv_dict_metadata->empty()) { + // Skip this for any cloned DB, as needed for cross-engine consistent clone: + // redo log writes risk failing for lack of redo log space, and we already + // have the metadata from redo log application. If we crash before the first + // checkpoint, we'd read it again from the redo, otherwise the first + // checkpoint will write it to its table. + if (!recv_sys->is_cloned_db && srv_dict_metadata != nullptr && + !srv_dict_metadata->empty()) { /* Open this table in case srv_dict_metadata should be applied to this table before checkpoint. And because DD is not fully up yet, the table can be opened by internal APIs. */ diff --git a/storage/perfschema/table_log_status.h b/storage/perfschema/table_log_status.h index 85aedeff957..9c78da43b44 100644 --- a/storage/perfschema/table_log_status.h +++ b/storage/perfschema/table_log_status.h @@ -90,6 +90,11 @@ class table_log_status : public PFS_engine_table { int rnd_next() override; int rnd_pos(const void *pos) override; + + // Row accessor, added for the MyRocks clone cross-engine sync + st_row_log_status &get_row() noexcept { + return m_row; + } }; #endif