diff --git a/sql/mdl.cc b/sql/mdl.cc index 846b4fd6..36ffd2ce 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -53,8 +53,13 @@ #include "mysqld_error.h" #include "prealloced_array.h" #include "sql/debug_sync.h" +#include "sql/mysqld_thd_manager.h" +#include "sql/sql_class.h" +#include "sql/system_variables.h" #include "sql/thr_malloc.h" +#include + extern MYSQL_PLUGIN_IMPORT CHARSET_INFO *system_charset_info; static PSI_memory_key key_memory_MDL_context_acquire_locks; @@ -3349,6 +3354,49 @@ void MDL_lock::object_lock_notify_conflicting_locks(MDL_context *ctx, } } +/** + Length of the brief retry window (in seconds) +*/ +static constexpr int MDL_KILL_BLOCKERS_GRACE_SECONDS = 10; + +namespace { + +class Collect_user_thread_ids : public Do_THD_Impl { + public: + Collect_user_thread_ids(my_thread_id self_id, + std::vector *out) + : m_self_id(self_id), m_out(out) {} + + void operator()(THD *thd) override { + if (thd->is_system_thread()) return; + if (thd->thread_id() == m_self_id) return; + m_out->push_back(thd->thread_id()); + } + + private: + const my_thread_id m_self_id; + std::vector *const m_out; +}; +} // namespace + +// Collect user thread ids +static void snapshot_user_thread_ids(my_thread_id self_id, + std::vector *thds) { + Collect_user_thread_ids collector(self_id, thds); + Global_THD_manager::get_instance()->do_for_all_thd(&collector); +} + +static void kill_user_threads(const std::vector &targets) { + Global_THD_manager *thd_manager = Global_THD_manager::get_instance(); + for (my_thread_id id : targets) { + Find_thd_with_id finder(id); + THD_ptr target_thd = thd_manager->find_thd(&finder); + if (!target_thd) continue; + if (target_thd->killed == THD::KILL_CONNECTION) continue; + target_thd->awake(THD::KILL_CONNECTION); + } +} + /** Acquire one lock with waiting for conflicting locks to go away if needed. @@ -3539,6 +3587,29 @@ bool MDL_context::acquire_lock(MDL_request *mdl_request, mdl_request->key.get_wait_state_name()); } + /** + kill all user threads + */ + if (wait_status == MDL_wait::TIMEOUT) { + THD *requestor_thd = get_thd(); + if (requestor_thd != nullptr && + requestor_thd->variables.force_ddl_execution && + m_owner->is_executing_ddl()) { + std::vector thds; + snapshot_user_thread_ids(requestor_thd->thread_id(), &thds); + + if (!thds.empty()) { + kill_user_threads(thds); + + struct timespec abs_grace; + set_timespec(&abs_grace, MDL_KILL_BLOCKERS_GRACE_SECONDS); + m_wait.reset_status(); + wait_status = m_wait.timed_wait(m_owner, &abs_grace, true, + mdl_request->key.get_wait_state_name()); + } + } + } + done_waiting_for(); #ifdef HAVE_PSI_METADATA_INTERFACE diff --git a/sql/mdl.h b/sql/mdl.h index 6e57431f..27986390 100644 --- a/sql/mdl.h +++ b/sql/mdl.h @@ -185,6 +185,11 @@ class MDL_context_owner { of PRNG for the MDL_context. */ virtual uint get_rand_seed() const = 0; + + /** + Weather the owner is currently executing a DDL statement + */ + virtual bool is_executing_ddl() const { return false; } }; /** diff --git a/sql/sql_class.h b/sql/sql_class.h index 0462c6ef..0155f81b 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2806,7 +2806,28 @@ class THD : public MDL_context_owner, */ bool m_is_fatal_error; + /** + Set to true during executing DDL. + */ + bool m_is_executing_ddl{false}; + public: + class DDL_executing_scope { + public: + explicit DDL_executing_scope(THD *thd) + : m_thd(thd), m_prev_value(thd->m_is_executing_ddl) { + m_thd->m_is_executing_ddl = true; + } + ~DDL_executing_scope() { m_thd->m_is_executing_ddl = m_prev_value; } + + DDL_executing_scope(const DDL_executing_scope &) = delete; + DDL_executing_scope &operator=(const DDL_executing_scope &) = delete; + + private: + THD *const m_thd; + const bool m_prev_value; + }; + /** Set by a storage engine to request the entire transaction (that possibly spans multiple engines) to @@ -3135,6 +3156,9 @@ class THD : public MDL_context_owner, } int is_killed() const final { return killed; } + + bool is_executing_ddl() const override { return m_is_executing_ddl; } + bool might_have_commit_order_waiters() const final { /* We need to return if this thread can have any commit order waiters diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index b27ac336..f9dcee1b 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3031,6 +3031,11 @@ bool remove_automatic_sp_privileges(THD *thd, enum_sp_type sp_type, int mysql_execute_command(THD *thd, bool first_level) { int res = false; LEX *const lex = thd->lex; + + std::optional ddl_executing_scope; + if (sql_command_flags[lex->sql_command] & CF_POTENTIAL_ATOMIC_DDL) { + ddl_executing_scope.emplace(thd); + } /* first Query_block (have special meaning for many of non-SELECTcommands) */ Query_block *const query_block = lex->query_block; /* first table of first Query_block */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 8ef0199c..ffc1a7cd 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2381,6 +2381,12 @@ static Sys_var_ulong Sys_lock_wait_timeout( HINT_UPDATEABLE SESSION_VAR(lock_wait_timeout), CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, LONG_TIMEOUT), DEFAULT(LONG_TIMEOUT), BLOCK_SIZE(1)); +static Sys_var_bool Sys_force_ddl_execution( + "force_ddl_execution", + "If ON, a DDL statement that times out waiting for a metadata lock " + "will kill other users' sessions(connections).", + SESSION_VAR(force_ddl_execution), CMD_LINE(OPT_ARG), DEFAULT(false)); + #ifdef HAVE_MLOCKALL static Sys_var_bool Sys_locked_in_memory( "locked_in_memory", "Whether mysqld was locked in memory with --memlock", diff --git a/sql/system_variables.h b/sql/system_variables.h index 2271656a..96d598db 100644 --- a/sql/system_variables.h +++ b/sql/system_variables.h @@ -241,6 +241,7 @@ struct System_variables { ulonglong histogram_generation_max_mem_size; ulong join_buff_size; ulong lock_wait_timeout; + bool force_ddl_execution; ulong max_allowed_packet; ulong max_error_count; ulong max_length_for_sort_data; ///< Unused.