From 5852bd38b79d44fad4c6c14d174643adb4c93ea9 Mon Sep 17 00:00:00 2001 From: lilinhua Date: Wed, 27 Nov 2024 16:55:33 +0800 Subject: [PATCH] feat: support flashback Problem: -------- Old master may have extra transactions, if add the old master as slave to new master, data will be inconsistent. Solution: -------- flashback to the gtid that new master and old master has same data, extra binlog will be truncated, and extra gtid will be rollbacked. --- include/my_sqlcommand.h | 2 + include/mysql/plugin_audit.h.pp | 2 + libbinlogevents/include/rows_event.h | 1 + libbinlogevents/src/rows_event.cpp | 2 + mysql-test/r/information_schema_keywords.result | 1 + mysql-test/r/mysqld--help-notwin.result | 2 +- .../binlog_gtid/r/binlog_gtid_flashback.result | 142 +++++ .../binlog_gtid/t/binlog_gtid_flashback-master.opt | 2 + .../suite/binlog_gtid/t/binlog_gtid_flashback.test | 214 +++++++ share/messages_to_clients.txt | 6 + share/messages_to_error_log.txt | 27 + sql/binlog.cc | 63 ++ sql/binlog.h | 2 + sql/lex.h | 1 + sql/log_event.cc | 599 +++++++++++++++++- sql/log_event.h | 22 +- sql/mysqld.cc | 21 +- sql/mysqld.h | 1 + sql/parse_tree_nodes.cc | 7 + sql/parse_tree_nodes.h | 11 + sql/rpl_gtid.h | 31 +- sql/rpl_gtid_execution.cc | 14 +- sql/rpl_gtid_persist.cc | 61 ++ sql/rpl_gtid_persist.h | 4 + sql/rpl_gtid_state.cc | 71 ++- sql/rpl_replica.cc | 671 +++++++++++++++++++++ sql/rpl_replica.h | 48 ++ sql/rpl_rli.cc | 1 + sql/rpl_rli.h | 3 + sql/rpl_tblmap.cc | 85 +++ sql/rpl_tblmap.h | 50 ++ sql/sp.cc | 1 + sql/sql_binlog.cc | 2 +- sql/sql_binlog.h | 4 +- sql/sql_lex.h | 1 + sql/sql_parse.cc | 21 +- sql/sql_show.cc | 4 + sql/sql_show.h | 5 + sql/sql_yacc.yy | 32 + storage/perfschema/pfs.cc | 2 - 40 files changed, 2174 insertions(+), 65 deletions(-) create mode 100644 mysql-test/suite/binlog_gtid/r/binlog_gtid_flashback.result create mode 100644 mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback-master.opt create mode 100644 mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback.test diff --git a/include/my_sqlcommand.h b/include/my_sqlcommand.h index 384ef99..816f2f2 100644 --- a/include/my_sqlcommand.h +++ b/include/my_sqlcommand.h @@ -202,6 +202,8 @@ enum enum_sql_command { SQLCOM_RESTART_SERVER, SQLCOM_CREATE_SRS, SQLCOM_DROP_SRS, + SQLCOM_FLASH_BACK, + SQLCOM_SHOW_FLASHBACK_STATUS, /* This should be the last !!! */ SQLCOM_END }; diff --git a/include/mysql/plugin_audit.h.pp b/include/mysql/plugin_audit.h.pp index 3738ce4..07194d6 100644 --- a/include/mysql/plugin_audit.h.pp +++ b/include/mysql/plugin_audit.h.pp @@ -336,6 +336,8 @@ enum enum_sql_command { SQLCOM_RESTART_SERVER, SQLCOM_CREATE_SRS, SQLCOM_DROP_SRS, + SQLCOM_FLASH_BACK, + SQLCOM_SHOW_FLASHBACK_STATUS, SQLCOM_END }; #include "plugin_audit_message_types.h" diff --git a/libbinlogevents/include/rows_event.h b/libbinlogevents/include/rows_event.h index 437cfeb..3624008 100644 --- a/libbinlogevents/include/rows_event.h +++ b/libbinlogevents/include/rows_event.h @@ -954,6 +954,7 @@ class Rows_event : public Binary_log_event { std::vector columns_before_image; std::vector columns_after_image; std::vector row; + unsigned long long m_rows_before_size; /* The length before row */ public: class Extra_row_info { diff --git a/libbinlogevents/src/rows_event.cpp b/libbinlogevents/src/rows_event.cpp index 4f3e66d..9cf0ce4 100644 --- a/libbinlogevents/src/rows_event.cpp +++ b/libbinlogevents/src/rows_event.cpp @@ -472,6 +472,8 @@ Rows_event::Rows_event(const char *buf, const Format_description_event *fde) } else columns_after_image = columns_before_image; + m_rows_before_size = reader().position(); + data_size = READER_CALL(available_to_read); READER_TRY_CALL(assign, &row, data_size); // JAG: TODO: Investigate and comment here about the need of this extra byte diff --git a/mysql-test/r/information_schema_keywords.result b/mysql-test/r/information_schema_keywords.result index 5503565..f03c2b6 100644 --- a/mysql-test/r/information_schema_keywords.result +++ b/mysql-test/r/information_schema_keywords.result @@ -200,6 +200,7 @@ FINISH 0 FIRST 0 FIRST_VALUE 1 FIXED 0 +FLASHBACK 0 FLOAT 1 FLOAT4 1 FLOAT8 1 diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index ef10f24..5690515 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -1918,7 +1918,7 @@ performance-schema-max-socket-classes 10 performance-schema-max-socket-instances -1 performance-schema-max-sql-text-length 1024 performance-schema-max-stage-classes 175 -performance-schema-max-statement-classes 219 +performance-schema-max-statement-classes 221 performance-schema-max-statement-stack 10 performance-schema-max-table-handles -1 performance-schema-max-table-instances -1 diff --git a/mysql-test/suite/binlog_gtid/r/binlog_gtid_flashback.result b/mysql-test/suite/binlog_gtid/r/binlog_gtid_flashback.result new file mode 100644 index 0000000..4622c95 --- /dev/null +++ b/mysql-test/suite/binlog_gtid/r/binlog_gtid_flashback.result @@ -0,0 +1,142 @@ +include/gtid_utils.inc +CREATE TABLE t1 (a INT, b INT) ENGINE = InnoDB; +INSERT INTO t1 VALUES (1, 500); +INSERT INTO t1 VALUES (2, 501); +INSERT INTO t1 VALUES (3, 502); +INSERT INTO t1 VALUES (4, 503); +RESET MASTER; +INSERT INTO t1 VALUES (5, 504); +delete from t1 where a = 2; +update t1 set b=b+1 where a = 3; +select *from t1; +a b +1 500 +3 503 +4 503 +5 504 +include/assert.inc [committed gno 3 on master] +flashback to 'MASTER_UUID:1'; +ERROR HY000: flash back access denied, read_only should set to true +show flashback status; +start_gtid transaction_rollback_counts binlog_name start_time end_time +select *from t1; +a b +1 500 +3 503 +4 503 +5 504 +set global read_only=1; +flashback to 'MASTER_UUID:1'; +use test; +include/assert.inc [flash back to 1 on master] +show flashback status; +start_gtid transaction_rollback_counts binlog_name start_time end_time + 2 +select *from t1; +a b +1 500 +3 502 +4 503 +5 504 +2 501 +RESET MASTER; +update t1 set b=b+1 where a = 3; +update t1 set b=b+1 where a = 3; +alter table t1 add c int; +insert into t1 values(6,505,1001); +select *from t1; +a b c +1 500 NULL +3 504 NULL +4 503 NULL +5 504 NULL +2 501 NULL +6 505 1001 +include/assert.inc [committed gno2 4 on master] +flashback to 'MASTER_UUID:2'; +ERROR HY000: flash back error, find unsupported event in last binlog file +show flashback status; +start_gtid transaction_rollback_counts binlog_name start_time end_time + 2 +select *from t1; +a b c +1 500 NULL +3 504 NULL +4 503 NULL +5 504 NULL +2 501 NULL +6 505 1001 +CREATE TABLE t2 (a INT, b json, c blob) ENGINE = InnoDB; +INSERT INTO t2 VALUES (1, '{ "key_a": 1, "key_b": 2, "key_c": 3 }', repeat('blob', 2)); +INSERT INTO t2 VALUES (2, '{ "key_d": 4, "key_e": 5, "key_f": 6 }', repeat('text', 2)); +INSERT INTO t2 VALUES (3, '{ "key_h": 7, "key_i": 8, "key_j": 9 }', repeat('good', 2)); +RESET MASTER; +begin; +update t2 set b='{ "key_u": 10, "key_v": 11, "key_w": 12 }' where a = 2; +delete from t2 where a = 1; +commit; +select *from t2; +a b c +2 {"key_u": 10, "key_v": 11, "key_w": 12} texttext +3 {"key_h": 7, "key_i": 8, "key_j": 9} goodgood +begin; +INSERT INTO t2 VALUES (4, '{ "key_l": 17, "key_m": 18, "key_n": 19 }', repeat('ksql', 2)); +update t2 set b='{ "key_u": 10, "key_v": 11, "key_w": 12 }' where a = 3; +commit; +begin; +delete from t2 where a = 4; +commit; +select *from t2; +a b c +2 {"key_u": 10, "key_v": 11, "key_w": 12} texttext +3 {"key_u": 10, "key_v": 11, "key_w": 12} goodgood +include/assert.inc [committed gno3 3 on master] +flashback to 'MASTER_UUID:1'; +use test; +include/assert.inc [flash back to 1 on master] +show flashback status; +start_gtid transaction_rollback_counts binlog_name start_time end_time + 2 +select *from t2; +a b c +2 {"key_u": 10, "key_v": 11, "key_w": 12} texttext +3 {"key_h": 7, "key_i": 8, "key_j": 9} goodgood +CREATE TABLE t3 (a INT) ENGINE = InnoDB; +RESET MASTER; +INSERT INTO t3 VALUES (1); +INSERT INTO t3 VALUES (2); +INSERT INTO t3 VALUES (3); +select *from t3; +a +1 +2 +3 +flashback to 'MASTER_UUID:1'; +ERROR HY000: Lost connection to MySQL server during query +# restart: --read-only=1 +select *from t3; +a +1 +2 +3 +CREATE TABLE t4 (a INT) ENGINE = InnoDB; +RESET MASTER; +INSERT INTO t4 VALUES (1); +INSERT INTO t4 VALUES (2); +FLUSH LOGS; +INSERT INTO t4 VALUES (3); +INSERT INTO t4 VALUES (4); +include/assert.inc [committed gno5 4 on master] +flashback to 'MASTER_UUID:1'; +ERROR HY000: flash back error, start gtid is invalid +show flashback status; +start_gtid transaction_rollback_counts binlog_name start_time end_time +select *from t4; +a +1 +2 +3 +4 +SET GLOBAL read_only= 0; +DROP TABLE test.t1, test.t2, test.t3, test.t4; +include/gtid_utils_end.inc diff --git a/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback-master.opt b/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback-master.opt new file mode 100644 index 0000000..9e69dff --- /dev/null +++ b/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback-master.opt @@ -0,0 +1,2 @@ +--default-storage-engine=innodb +--gtid-mode=ON diff --git a/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback.test b/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback.test new file mode 100644 index 0000000..031c0b7 --- /dev/null +++ b/mysql-test/suite/binlog_gtid/t/binlog_gtid_flashback.test @@ -0,0 +1,214 @@ +--source include/have_log_bin.inc +--source include/have_debug.inc +--source include/gtid_utils.inc +--source include/have_binlog_format_row.inc + +connect(conn1,localhost,root,,); +CREATE TABLE t1 (a INT, b INT) ENGINE = InnoDB; +--let $saved_value= `SELECT @@GLOBAL.read_only` + +--let $master_uuid= `SELECT @@SERVER_UUID` +--let $gno= 0 + +INSERT INTO t1 VALUES (1, 500); +INSERT INTO t1 VALUES (2, 501); +INSERT INTO t1 VALUES (3, 502); +INSERT INTO t1 VALUES (4, 503); +# Clean gtid_executed so that test can execute after other tests +RESET MASTER; + +INSERT INTO t1 VALUES (5, 504); +--inc $gno +delete from t1 where a = 2; +--inc $gno +update t1 set b=b+1 where a = 3; +--inc $gno + +select *from t1; + +--let $assert_text= committed gno $gno on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1-$gno") +--source include/assert.inc + +#----------------------------------------------------------------------------- +# +# 1. flashback need read_only=1 +# +--replace_result $master_uuid MASTER_UUID +--error ER_FLASHBACK_ACCESS_DENIED_ERROR +--eval flashback to '$master_uuid:1' +--replace_column 1 3 4 5 +show flashback status; +select *from t1; + + +#----------------------------------------------------------------------------- +# +# 2. flashback auto commit dml +# +set global read_only=1; +--replace_result $master_uuid MASTER_UUID +--eval flashback to '$master_uuid:1' +use test; +--let $assert_text= flash back to 1 on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1") +--source include/assert.inc + +--replace_column 1 3 4 5 +show flashback status; +select *from t1; + +#----------------------------------------------------------------------------- +# +# 3. flashback ddl +# +RESET MASTER; +--let $gno2= 0 +update t1 set b=b+1 where a = 3; +--inc $gno2 +update t1 set b=b+1 where a = 3; +--inc $gno2 +alter table t1 add c int; +--inc $gno2 +insert into t1 values(6,505,1001); +--inc $gno2 + +select *from t1; + +--let $assert_text= committed gno2 $gno2 on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1-$gno2") +--source include/assert.inc + +--replace_result $master_uuid MASTER_UUID +--error ER_FLASH_BACK_ERROR +--eval flashback to '$master_uuid:2' + +--replace_column 1 3 4 5 +show flashback status; +select *from t1; + +#----------------------------------------------------------------------------- +# +# 4. flashback multi-statements transaction dml and json field +# +CREATE TABLE t2 (a INT, b json, c blob) ENGINE = InnoDB; + +INSERT INTO t2 VALUES (1, '{ "key_a": 1, "key_b": 2, "key_c": 3 }', repeat('blob', 2)); +INSERT INTO t2 VALUES (2, '{ "key_d": 4, "key_e": 5, "key_f": 6 }', repeat('text', 2)); +INSERT INTO t2 VALUES (3, '{ "key_h": 7, "key_i": 8, "key_j": 9 }', repeat('good', 2)); + +RESET MASTER; + +--let $gno3= 0 +begin; +update t2 set b='{ "key_u": 10, "key_v": 11, "key_w": 12 }' where a = 2; +delete from t2 where a = 1; +commit; +--inc $gno3 + +select *from t2; + +begin; +INSERT INTO t2 VALUES (4, '{ "key_l": 17, "key_m": 18, "key_n": 19 }', repeat('ksql', 2)); +update t2 set b='{ "key_u": 10, "key_v": 11, "key_w": 12 }' where a = 3; +commit; +--inc $gno3 + +begin; +delete from t2 where a = 4; +commit; +--inc $gno3 + +select *from t2; +--let $assert_text= committed gno3 $gno3 on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1-$gno3") +--source include/assert.inc + +--replace_result $master_uuid MASTER_UUID +--eval flashback to '$master_uuid:1' +use test; +--let $assert_text= flash back to 1 on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1") +--source include/assert.inc + +--replace_column 1 3 4 5 +show flashback status; +select *from t2; + +#----------------------------------------------------------------------------- +# +# 5. flashback crash safe +# +CREATE TABLE t3 (a INT) ENGINE = InnoDB; +RESET MASTER; +--let $gno4= 0 +INSERT INTO t3 VALUES (1); +--inc $gno4 +INSERT INTO t3 VALUES (2); +--inc $gno4 +INSERT INTO t3 VALUES (3); +--inc $gno4 + +select *from t3; + +--disable_query_log +set session debug="+d,flashback_crash_before_commit"; +--enable_query_log + +# Write file to make mysql-test-run.pl wait for the server to stop +--exec echo "wait" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect + +--replace_result $master_uuid MASTER_UUID +--error 2013 +--eval flashback to '$master_uuid:1' +# Call script that will poll the server waiting for it to disappear +--source include/wait_until_disconnected.inc + +--let $_expect_file_name= $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +--let $restart_parameters= restart: --read-only=1 +--source include/start_mysqld.inc + +--disable_query_log +set session debug="-d,flashback_crash_before_commit"; +--enable_query_log + +select *from t3; + +#----------------------------------------------------------------------------- +# +# 6. flashback input a gtid not in last binlogfile +# +CREATE TABLE t4 (a INT) ENGINE = InnoDB; +RESET MASTER; +--let $gno5= 0 +INSERT INTO t4 VALUES (1); +--inc $gno5 +INSERT INTO t4 VALUES (2); +--inc $gno5 + +FLUSH LOGS; + +INSERT INTO t4 VALUES (3); +--inc $gno5 + +INSERT INTO t4 VALUES (4); +--inc $gno5 + +--let $assert_text= committed gno5 $gno5 on master +--let $assert_cond= GTID_IS_EQUAL(@@GLOBAL.GTID_EXECUTED, "$master_uuid:1-$gno5") +--source include/assert.inc + +--replace_result $master_uuid MASTER_UUID +--error ER_FLASH_BACK_ERROR +--eval flashback to '$master_uuid:1' + +--replace_column 1 3 4 5 +show flashback status; + +select *from t4; + + +--eval SET GLOBAL read_only= $saved_value +DROP TABLE test.t1, test.t2, test.t3, test.t4; + +--source include/gtid_utils_end.inc diff --git a/share/messages_to_clients.txt b/share/messages_to_clients.txt index fcfb78f..ed9d075 100644 --- a/share/messages_to_clients.txt +++ b/share/messages_to_clients.txt @@ -9944,6 +9944,12 @@ ER_INNODB_INSTANT_ADD_NOT_SUPPORTED_MAX_FIELDS ER_CANT_SET_PERSISTED eng "Failed to set persisted options." +ER_FLASH_BACK_ERROR + eng "flash back error, %s" + +ER_FLASHBACK_ACCESS_DENIED_ERROR + eng "flash back access denied, %s" + # # End of 8.0 error messages (server-to-client). # Do NOT add messages intended for the error log above! diff --git a/share/messages_to_error_log.txt b/share/messages_to_error_log.txt index c593f51..a2c2668 100644 --- a/share/messages_to_error_log.txt +++ b/share/messages_to_error_log.txt @@ -12072,6 +12072,33 @@ ER_IB_INDEX_LOADER_DONE ER_IB_INDEX_BUILDER_DONE eng "Builder::finish(): Completed building index=%s of table=%s, err=%zu." +ER_FLASH_BACK_ROW_LENGTH_ERROR + eng "row length: %d error, can not flashback" + +ER_FLASH_BACK_OUT_OF_MEMORY + eng "out of memory, can not flashback" + +ER_FLASH_BACK_DATA_INCONSISTENT + eng "Server shutdown where flash back, data is inconsistent, set read_only=on and restart server" + +ER_FLASH_BACK_CHANGE_EVENT_FAIL + eng "event type: %d change_to_flashback_event failed" + +ER_FLASH_BACK_MSG + eng "%s" + +ER_FLASH_BACK_PARSE_LAST_BINLOG_FILE + eng "total events need to flash back: %d" + +ER_FLASH_BACK_BINLOG_CANT_DELETE_FILE + eng "Failed to delete flashback backup file '%s'" + +ER_FLASH_BACK_FAIL_MSG + eng "flash back end failed, start_gtid: %s, %s" + +ER_FLASH_BACK_SUCCESS_MSG + eng "flash back end success, start_gtid: %s, and rollback %ld transactions" + # DO NOT add server-to-client messages here; # they go in messages_to_clients.txt # in the same directory as this file. diff --git a/sql/binlog.cc b/sql/binlog.cc index bec6a1f..12e1a46 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -134,6 +134,7 @@ #include "sql/xa/sql_cmd_xa.h" // Sql_cmd_xa_* #include "sql_partition.h" #include "thr_lock.h" +#include class Item; @@ -5698,6 +5699,18 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool delete_only) { goto err; } } + + /* delete flashback file if exists */ + char backup_file_name[1024]; + strcpy(backup_file_name, linfo.log_file_name); + sprintf(backup_file_name, "%s%s", linfo.log_file_name, "_flashback"); + std::ifstream is_exist(backup_file_name, std::ios::binary); + if (is_exist) { + if (my_delete_allow_opened(backup_file_name, MYF(0)) != 0) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_BINLOG_CANT_DELETE_FILE, linfo.log_file_name); + } + } + if (find_next_log(&linfo, false /*need_lock_index=false*/)) break; } @@ -6277,6 +6290,17 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space, goto err; } } + + /* delete flashback file if exists */ + char backup_file_name[1024]; + strcpy(backup_file_name, log_info.log_file_name); + sprintf(backup_file_name, "%s%s", log_info.log_file_name, "_flashback"); + std::ifstream is_exist(backup_file_name, std::ios::binary); + if (is_exist) { + if (my_delete_allow_opened(backup_file_name, MYF(0)) != 0) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_BINLOG_CANT_DELETE_FILE, backup_file_name); + } + } } } } @@ -7935,6 +7959,45 @@ err: return error; } +int backup_binlog_file_before_truncate(char *log_name) { + char backup_file_name[FN_REFLEN]; + strcpy(backup_file_name, log_name); + sprintf(backup_file_name, "%s%s", log_name, "_flashback"); + std::ifstream src(log_name, std::ios::binary); + std::ifstream is_exist(backup_file_name, std::ios::binary); + if (is_exist) { + if (my_delete_allow_opened(backup_file_name, MYF(0)) != 0) { + return 1; + } + } + std::ofstream dst(backup_file_name, std::ios::binary); + dst << src.rdbuf(); + return 0; +} + +bool MYSQL_BIN_LOG::truncate_binlog_file(my_off_t valid_pos, char *log_name) { + if (backup_binlog_file_before_truncate(log_name)) { + return true; + } + + bool dont_care = false; + my_off_t binlog_size = m_binlog_file->position(); + + /* Change binlog file size to valid_pos */ + if (valid_pos < binlog_size) { + if (m_binlog_file->truncate(valid_pos)) { + LogErr(ERROR_LEVEL, ER_BINLOG_CANT_TRIM_CRASHED_BINLOG); + return true; + } + } + + if (rotate(true, &dont_care)) { + return true; + } + + return false; +} + /** Truncate the active relay log file in the specified position. diff --git a/sql/binlog.h b/sql/binlog.h index f0df701..84bee1d 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -50,6 +50,7 @@ #include "sql/tc_log.h" // TC_LOG #include "sql/transaction_info.h" // Transaction_ctx #include "thr_mutex.h" +#include "sql/binlog_reader.h" class Format_description_log_event; class Gtid_monitoring_info; @@ -677,6 +678,7 @@ class MYSQL_BIN_LOG : public TC_LOG { void close() override; enum_result commit(THD *thd, bool all) override; int rollback(THD *thd, bool all) override; + bool truncate_binlog_file(my_off_t valid_pos, char *log_name); bool truncate_relaylog_file(Master_info *mi, my_off_t valid_pos); int prepare(THD *thd, bool all) override; #if defined(MYSQL_SERVER) diff --git a/sql/lex.h b/sql/lex.h index 7761761..d130732 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -267,6 +267,7 @@ static const SYMBOL symbols[] = { {SYM("FIRST", FIRST_SYM)}, {SYM("FIRST_VALUE", FIRST_VALUE_SYM)}, {SYM("FIXED", FIXED_SYM)}, + {SYM("FLASHBACK", FLASHBACK_SYM)}, {SYM("FLOAT", FLOAT_SYM)}, {SYM("FLOAT4", FLOAT_SYM)}, {SYM("FLOAT8", DOUBLE_SYM)}, diff --git a/sql/log_event.cc b/sql/log_event.cc index 073825f..c3e2ec6 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -82,10 +82,10 @@ #ifndef MYSQL_SERVER #include "client/mysqlbinlog.h" +#endif #include "sql-common/json_binary.h" #include "sql-common/json_dom.h" // Json_wrapper #include "sql/json_diff.h" // enum_json_diff_operation -#endif #ifdef MYSQL_SERVER @@ -428,10 +428,12 @@ static bool set_thd_db(THD *thd, const char *db, size_t db_len) { new_db.str = db; /* This function is called by a slave thread. */ - assert(thd->rli_slave); + assert(thd->rli_slave || (thd->rli_fake && thd->rli_fake->is_flashback)); - Rpl_filter *rpl_filter = thd->rli_slave->rpl_filter; - new_db.str = rpl_filter->get_rewrite_db(new_db.str, &new_db.length); + if (thd->rli_slave) { + Rpl_filter *rpl_filter = thd->rli_slave->rpl_filter; + new_db.str = rpl_filter->get_rewrite_db(new_db.str, &new_db.length); + } if (lower_case_table_names) { /* lcase_db_buf != new_db.str means that lcase_db_buf is rewritten. */ @@ -1449,6 +1451,7 @@ void Log_event::print_header(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info, my_b_write(file, reinterpret_cast("# "), 2); } } +#endif // ifndef MYSQL_SERVER /** Auxiliary function that sets up a conversion table for m_b_write_quoted. @@ -1585,7 +1588,6 @@ static void my_b_write_sint32_and_uint32(IO_CACHE *file, int32 si, uint32 ui) { if (si < 0) my_b_printf(file, " (%u)", ui); } -#ifndef MYSQL_SERVER static const char *json_diff_operation_name(enum_json_diff_operation op, int last_path_char) { switch (op) { @@ -1785,7 +1787,6 @@ static const char *print_json_diff(IO_CACHE *out, const uchar *data, return nullptr; } -#endif // ifndef MYSQL_SERVER /** Print a packed value of the given SQL type into IO cache @@ -2152,7 +2153,7 @@ static size_t log_event_print_value(IO_CACHE *file, const uchar *ptr, uint type, @retval - number of bytes scanned. */ - +#ifndef MYSQL_SERVER size_t Rows_log_event::print_verbose_one_row( IO_CACHE *file, table_def *td, PRINT_EVENT_INFO *print_event_info, MY_BITMAP *cols_bitmap, const uchar *value, const uchar *prefix, @@ -2238,6 +2239,7 @@ size_t Rows_log_event::print_verbose_one_row( } return value - value0; } +#endif /** Print a row event into IO cache in human readable form (in SQL format) @@ -2245,6 +2247,7 @@ size_t Rows_log_event::print_verbose_one_row( @param[in] file IO cache @param[in] print_event_info Print parameters */ +#ifndef MYSQL_SERVER void Rows_log_event::print_verbose(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info) { // Quoted length of the identifier can be twice the original length @@ -2463,7 +2466,6 @@ void Log_event::print_timestamp(IO_CACHE *file, time_t *ts) const { res->tm_mon + 1, res->tm_mday, res->tm_hour, res->tm_min, res->tm_sec); } - #endif /* !MYSQL_SERVER */ #if defined(MYSQL_SERVER) @@ -4782,9 +4784,13 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, Parser_state parser_state; if (!parser_state.init(thd, thd->query().str, thd->query().length)) { parser_state.m_input.m_has_digest = true; - assert(thd->m_digest == nullptr); + if (!(thd->rli_fake && thd->rli_fake->is_flashback)) { + assert(thd->m_digest == nullptr); + } thd->m_digest = &thd->m_digest_state; - assert(thd->m_statement_psi == nullptr); + if (!(thd->rli_fake && thd->rli_fake->is_flashback)) { + assert(thd->m_statement_psi == nullptr); + } thd->m_statement_psi = MYSQL_START_STATEMENT( &thd->m_statement_state, stmt_info_rpl.m_key, thd->db().str, thd->db().length, thd->charset(), nullptr); @@ -5078,11 +5084,13 @@ end: don't suffer from these assignments to 0 as DROP TEMPORARY TABLE uses the db.table syntax. */ - thd->set_catalog(NULL_CSTR); - thd->set_db(NULL_CSTR); /* will free the current database */ - thd->reset_query(); - thd->lex->sql_command = SQLCOM_END; - DBUG_PRINT("info", ("end: query= 0")); + if (!(thd->rli_fake && thd->rli_fake->is_flashback)) { + thd->set_catalog(NULL_CSTR); + thd->set_db(NULL_CSTR); /* will free the current database */ + thd->reset_query(); + thd->lex->sql_command = SQLCOM_END; + DBUG_PRINT("info", ("end: query= 0")); + } /* Mark the statement completed. */ MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); @@ -14321,3 +14329,564 @@ std::pair extract_log_event_basic_info( uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F; return std::make_pair(false, event_info); } + +#ifdef MYSQL_SERVER +/** + Print a packed value of the given SQL type into IO cache + + @param[in] file IO cache + @param[in] ptr Pointer to string + @param[in] type Column type + @param[in] meta Column meta information + @param[out] typestr SQL type string buffer (for verbose output) + @param[in] typestr_length Size of typestr + @param[in] col_name Column name + @param[in] is_partial True if this is a JSON column that will be + read in partial format, false otherwise. + + @retval 0 on error + @retval number of bytes scanned from ptr for non-NULL fields, or + another positive number for NULL fields +*/ +static size_t log_event_get_value(IO_CACHE *file, const uchar *ptr, uint type, + uint meta, char *typestr, + size_t typestr_length, char *col_name, + bool is_partial) { + uint32 length = 0; + + if (type == MYSQL_TYPE_STRING) { + if (meta >= 256) { + uint byte0 = meta >> 8; + uint byte1 = meta & 0xFF; + + if ((byte0 & 0x30) != 0x30) { + /* a long CHAR() field: see #37426 */ + length = byte1 | (((byte0 & 0x30) ^ 0x30) << 4); + type = byte0 | 0x30; + } else + length = meta & 0xFF; + } else + length = meta; + } + + switch (type) { + case MYSQL_TYPE_LONG: { + snprintf(typestr, typestr_length, "INT"); + if (!ptr) return my_b_printf(file, "NULL"); + int32 si = sint4korr(ptr); + uint32 ui = uint4korr(ptr); + my_b_write_sint32_and_uint32(file, si, ui); + return 4; + } + + case MYSQL_TYPE_TINY: { + snprintf(typestr, typestr_length, "TINYINT"); + if (!ptr) return my_b_printf(file, "NULL"); + my_b_write_sint32_and_uint32(file, (int)(signed char)*ptr, + (uint)(unsigned char)*ptr); + return 1; + } + + case MYSQL_TYPE_SHORT: { + snprintf(typestr, typestr_length, "SHORTINT"); + if (!ptr) return my_b_printf(file, "NULL"); + int32 si = (int32)sint2korr(ptr); + uint32 ui = (uint32)uint2korr(ptr); + my_b_write_sint32_and_uint32(file, si, ui); + return 2; + } + + case MYSQL_TYPE_INT24: { + snprintf(typestr, typestr_length, "MEDIUMINT"); + if (!ptr) return my_b_printf(file, "NULL"); + int32 si = sint3korr(ptr); + uint32 ui = uint3korr(ptr); + my_b_write_sint32_and_uint32(file, si, ui); + return 3; + } + + case MYSQL_TYPE_LONGLONG: { + snprintf(typestr, typestr_length, "LONGINT"); + if (!ptr) return my_b_printf(file, "NULL"); + char tmp[64]; + longlong si = sint8korr(ptr); + longlong10_to_str(si, tmp, -10); + my_b_printf(file, "%s", tmp); + if (si < 0) { + ulonglong ui = uint8korr(ptr); + longlong10_to_str((longlong)ui, tmp, 10); + my_b_printf(file, " (%s)", tmp); + } + return 8; + } + + case MYSQL_TYPE_NEWDECIMAL: { + uint precision = meta >> 8; + uint decimals = meta & 0xFF; + snprintf(typestr, typestr_length, "DECIMAL(%d,%d)", precision, decimals); + if (!ptr) return my_b_printf(file, "NULL"); + uint bin_size = my_decimal_get_binary_size(precision, decimals); + my_decimal dec; + binary2my_decimal(E_DEC_FATAL_ERROR, pointer_cast(ptr), + &dec, precision, decimals); + char buff[DECIMAL_MAX_STR_LENGTH + 1]; + int len = sizeof(buff); + decimal2string(&dec, buff, &len); + my_b_printf(file, "%s", buff); + return bin_size; + } + + case MYSQL_TYPE_FLOAT: { + snprintf(typestr, typestr_length, "FLOAT"); + if (!ptr) return my_b_printf(file, "NULL"); + float fl = float4get(ptr); + char tmp[320]; + sprintf(tmp, "%-20g", (double)fl); + my_b_printf(file, "%s", tmp); /* my_b_printf doesn't support %-20g */ + return 4; + } + + case MYSQL_TYPE_DOUBLE: { + strcpy(typestr, "DOUBLE"); + if (!ptr) return my_b_printf(file, "NULL"); + double dbl = float8get(ptr); + char tmp[320]; + sprintf(tmp, "%-.20g", dbl); /* my_b_printf doesn't support %-20g */ + my_b_printf(file, "%s", tmp); + return 8; + } + + case MYSQL_TYPE_BIT: { + /* Meta-data: bit_len, bytes_in_rec, 2 bytes */ + uint nbits = ((meta >> 8) * 8) + (meta & 0xFF); + snprintf(typestr, typestr_length, "BIT(%d)", nbits); + if (!ptr) return my_b_printf(file, "NULL"); + length = (nbits + 7) / 8; + my_b_write_bit(file, ptr, nbits); + return length; + } + + case MYSQL_TYPE_TIMESTAMP: { + snprintf(typestr, typestr_length, "TIMESTAMP"); + if (!ptr) return my_b_printf(file, "NULL"); + uint32 i32 = uint4korr(ptr); + my_b_printf(file, "%d", i32); + return 4; + } + + case MYSQL_TYPE_TIMESTAMP2: { + snprintf(typestr, typestr_length, "TIMESTAMP(%d)", meta); + if (!ptr) return my_b_printf(file, "NULL"); + char buf[MAX_DATE_STRING_REP_LENGTH]; + my_timeval tm; + my_timestamp_from_binary(&tm, ptr, meta); + int buflen = my_timeval_to_str(&tm, buf, meta); + my_b_write(file, pointer_cast(buf), buflen); + return my_timestamp_binary_length(meta); + } + + case MYSQL_TYPE_DATETIME: { + snprintf(typestr, typestr_length, "DATETIME"); + if (!ptr) return my_b_printf(file, "NULL"); + size_t d, t; + uint64 i64 = uint8korr(ptr); /* YYYYMMDDhhmmss */ + d = static_cast(i64 / 1000000); + t = i64 % 1000000; + my_b_printf(file, "%04d-%02d-%02d %02d:%02d:%02d", + static_cast(d / 10000), + static_cast(d % 10000) / 100, static_cast(d % 100), + static_cast(t / 10000), + static_cast(t % 10000) / 100, static_cast(t % 100)); + return 8; + } + + case MYSQL_TYPE_DATETIME2: { + snprintf(typestr, typestr_length, "DATETIME(%d)", meta); + if (!ptr) return my_b_printf(file, "NULL"); + char buf[MAX_DATE_STRING_REP_LENGTH]; + MYSQL_TIME ltime; + longlong packed = my_datetime_packed_from_binary(ptr, meta); + TIME_from_longlong_datetime_packed(<ime, packed); + int buflen = my_datetime_to_str(ltime, buf, meta); + my_b_write_quoted(file, (uchar *)buf, buflen); + return my_datetime_binary_length(meta); + } + + case MYSQL_TYPE_TIME: { + snprintf(typestr, typestr_length, "TIME"); + if (!ptr) return my_b_printf(file, "NULL"); + uint32 i32 = uint3korr(ptr); + my_b_printf(file, "'%02d:%02d:%02d'", i32 / 10000, (i32 % 10000) / 100, + i32 % 100); + return 3; + } + + case MYSQL_TYPE_TIME2: { + snprintf(typestr, typestr_length, "TIME(%d)", meta); + if (!ptr) return my_b_printf(file, "NULL"); + char buf[MAX_DATE_STRING_REP_LENGTH]; + MYSQL_TIME ltime; + longlong packed = my_time_packed_from_binary(ptr, meta); + TIME_from_longlong_time_packed(<ime, packed); + int buflen = my_time_to_str(ltime, buf, meta); + my_b_write_quoted(file, (uchar *)buf, buflen); + return my_time_binary_length(meta); + } + + case MYSQL_TYPE_NEWDATE: { + snprintf(typestr, typestr_length, "DATE"); + if (!ptr) return my_b_printf(file, "NULL"); + uint32 tmp = uint3korr(ptr); + int part; + char buf[11]; + char *pos = &buf[10]; // start from '\0' to the beginning + + /* Copied from field.cc */ + *pos-- = 0; // End NULL + part = (int)(tmp & 31); + *pos-- = (char)('0' + part % 10); + *pos-- = (char)('0' + part / 10); + *pos-- = ':'; + part = (int)(tmp >> 5 & 15); + *pos-- = (char)('0' + part % 10); + *pos-- = (char)('0' + part / 10); + *pos-- = ':'; + part = (int)(tmp >> 9); + *pos-- = (char)('0' + part % 10); + part /= 10; + *pos-- = (char)('0' + part % 10); + part /= 10; + *pos-- = (char)('0' + part % 10); + part /= 10; + *pos = (char)('0' + part); + my_b_printf(file, "'%s'", buf); + return 3; + } + + case MYSQL_TYPE_YEAR: { + snprintf(typestr, typestr_length, "YEAR"); + if (!ptr) return my_b_printf(file, "NULL"); + uint32 i32 = *ptr; + my_b_printf(file, "%04d", i32 + 1900); + return 1; + } + + case MYSQL_TYPE_ENUM: + switch (meta & 0xFF) { + case 1: + snprintf(typestr, typestr_length, "ENUM(1 byte)"); + if (!ptr) return my_b_printf(file, "NULL"); + my_b_printf(file, "%d", (int)*ptr); + return 1; + case 2: { + snprintf(typestr, typestr_length, "ENUM(2 bytes)"); + if (!ptr) return my_b_printf(file, "NULL"); + int32 i32 = uint2korr(ptr); + my_b_printf(file, "%d", i32); + return 2; + } + default: + my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF); + return 0; + } + break; + + case MYSQL_TYPE_SET: + snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF); + if (!ptr) return my_b_printf(file, "NULL"); + my_b_write_bit(file, ptr, (meta & 0xFF) * 8); + return meta & 0xFF; + + case MYSQL_TYPE_BLOB: + switch (meta) { + case 1: + snprintf(typestr, typestr_length, "TINYBLOB/TINYTEXT"); + if (!ptr) return my_b_printf(file, "NULL"); + length = *ptr; + my_b_write_quoted(file, ptr + 1, length); + return length + 1; + case 2: + snprintf(typestr, typestr_length, "BLOB/TEXT"); + if (!ptr) return my_b_printf(file, "NULL"); + length = uint2korr(ptr); + my_b_write_quoted(file, ptr + 2, length); + return length + 2; + case 3: + snprintf(typestr, typestr_length, "MEDIUMBLOB/MEDIUMTEXT"); + if (!ptr) return my_b_printf(file, "NULL"); + length = uint3korr(ptr); + my_b_write_quoted(file, ptr + 3, length); + return length + 3; + case 4: + snprintf(typestr, typestr_length, "LONGBLOB/LONGTEXT"); + if (!ptr) return my_b_printf(file, "NULL"); + length = uint4korr(ptr); + my_b_write_quoted(file, ptr + 4, length); + return length + 4; + default: + my_b_printf(file, "!! Unknown BLOB packlen=%d", length); + return 0; + } + + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + length = meta; + snprintf(typestr, typestr_length, "VARSTRING(%d)", length); + if (!ptr) return my_b_printf(file, "NULL"); + return my_b_write_quoted_with_length(file, ptr, length); + + case MYSQL_TYPE_STRING: + snprintf(typestr, typestr_length, "STRING(%d)", length); + if (!ptr) return my_b_printf(file, "NULL"); + return my_b_write_quoted_with_length(file, ptr, length); + + case MYSQL_TYPE_JSON: { + snprintf(typestr, typestr_length, "JSON"); + if (!ptr) return my_b_printf(file, "NULL"); + length = uint4korr(ptr); + ptr += 4; + if (is_partial) { + const char *error = print_json_diff(file, ptr, length, col_name); + if (error != nullptr) + my_b_printf(file, "Error %s while printing JSON diff\n", error); + } else { + json_binary::Value value = + json_binary::parse_binary((const char *)ptr, length); + if (value.type() == json_binary::Value::ERROR) { + if (my_b_printf( + file, + "Invalid JSON\n")) /* purecov: inspected */ // corrupted + // event + return 0; /* purecov: inspected */ // error writing output + } else { + Json_wrapper wrapper(value); + StringBuffer s; + if (json_wrapper_to_string(file, &s, &wrapper, true)) + my_b_printf(file, "Failed to format JSON object as string.\n"); + /* purecov: inspected */ // OOM + } + } + return length + meta; + } + case MYSQL_TYPE_BOOL: + case MYSQL_TYPE_INVALID: + default: { + char tmp[5]; + snprintf(tmp, sizeof(tmp), "%04x", meta); + my_b_printf(file, + "!! Don't know how to handle column type=%d meta=%d (%s)\n", + type, meta, tmp); + } break; + } + *typestr = 0; + return 0; +} + +size_t Rows_log_event::calculate_length(table_def *td, + MY_BITMAP *cols_bitmap, const uchar *value, + enum_row_image_type row_image_type) { + const uchar *value0 = value; + char typestr[64] = ""; + + // Read value_options if this is AI for PARTIAL_UPDATE_ROWS_EVENT + ulonglong value_options = 0; + Bit_reader partial_bits; + if (get_type_code() == binary_log::PARTIAL_UPDATE_ROWS_EVENT && + row_image_type == enum_row_image_type::UPDATE_AI) { + size_t length = m_rows_end - value; + if (net_field_length_checked(&value, &length, &value_options)) { + return 0; + } + if ((value_options & PARTIAL_JSON_UPDATES) != 0) { + partial_bits.set_ptr(value); + value += (td->json_column_count() + 7) / 8; + } + } + + /* + Metadata bytes which gives the information about nullabity of + master columns. Master writes one bit for each column in the + image. + */ + Bit_reader null_bits(value); + value += (bitmap_bits_set(cols_bitmap) + 7) / 8; + + for (size_t i = 0; i < td->size(); i++) { + /* + Note: need to read partial bit before reading cols_bitmap, since + the partial_bits bitmap has a bit for every JSON column + regardless of whether it is included in the bitmap or not. + */ + bool is_partial = (value_options & PARTIAL_JSON_UPDATES) != 0 && + row_image_type == enum_row_image_type::UPDATE_AI && + td->type(i) == MYSQL_TYPE_JSON && partial_bits.get(); + + if (bitmap_is_set(cols_bitmap, i) == 0) continue; + + bool is_null = null_bits.get(); + + if (!is_null) { + size_t fsize = + td->calc_field_size((uint)i, pointer_cast(value)); + if (fsize > (size_t)(m_rows_end - value)) { + return 0; + } + } + char col_name[256]; + sprintf(col_name, "@%lu", (unsigned long)i + 1); + IO_CACHE tmp_cache; + open_cached_file(&tmp_cache, nullptr, nullptr, 0, MYF(MY_WME | MY_NABP)); + size_t size = log_event_get_value( + &tmp_cache, is_null ? nullptr : value, td->type(i), td->field_metadata(i), + typestr, sizeof(typestr), col_name, is_partial); + if (!size) return 0; + + if (!is_null) value += size; + } + return value - value0; +} + +int Rows_log_event::change_to_flashback_event_helper( + table_mapping_flashback &flash_back_table_map, + uchar *rows_buff, Log_event_type ev_type) { + Table_map_log_event *map; + table_def *td; + std::vector rows_arr; + uchar *swap_buff1 = nullptr; + uchar *swap_buff2 = nullptr; + uchar *rows_pos = rows_buff + m_rows_before_size; + int ret = 0; + enum_row_image_type row_image_type = + get_general_type_code() == binary_log::WRITE_ROWS_EVENT + ? enum_row_image_type::WRITE_AI + : get_general_type_code() == binary_log::DELETE_ROWS_EVENT + ? enum_row_image_type::DELETE_BI + : enum_row_image_type::UPDATE_BI; + + if (!(map = flash_back_table_map.get_table(m_table_id)) || + !(td = map->create_table_def())) { + return 1; + } + + if (((get_general_type_code() == binary_log::WRITE_ROWS_EVENT) && + (m_rows_buf == m_rows_end))) + goto end; + for (uchar *value = m_rows_buf; value < m_rows_end;) { + uchar *start_pos = value; + size_t length1 = 0; + if (!(length1 = calculate_length(td, &m_cols, value, row_image_type))) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_ROW_LENGTH_ERROR, length1); + ret = 1; + goto end; + } + value += length1; + swap_buff1 = (uchar *)my_malloc(key_memory_log_event, length1, MYF(0)); + if (!swap_buff1) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_OUT_OF_MEMORY); + goto end; + } + + memcpy(swap_buff1, start_pos, length1); + size_t length2 = 0; + if (ev_type == binary_log::UPDATE_ROWS_EVENT || + ev_type == binary_log::UPDATE_ROWS_EVENT_V1) { + if (!(length2 = calculate_length( + td, &m_cols, value, + enum_row_image_type::UPDATE_BI))) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_ROW_LENGTH_ERROR, length2); + ret = 1; + goto end; + } + value += length2; + swap_buff2 = (uchar *)my_malloc(key_memory_log_event, length2, MYF(0)); + if (!swap_buff2) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_OUT_OF_MEMORY); + ret = 1; + goto end; + } + memcpy(swap_buff2, start_pos + length1, length2); // WHERE part + } + if (ev_type == binary_log::UPDATE_ROWS_EVENT || + ev_type == binary_log::UPDATE_ROWS_EVENT_V1) { + + memcpy(start_pos, swap_buff2, length2); + memcpy(start_pos + length2, swap_buff1, length1); + } + + my_free(swap_buff1); + if (ev_type == binary_log::UPDATE_ROWS_EVENT || + ev_type == binary_log::UPDATE_ROWS_EVENT_V1) + my_free(swap_buff2); + + LEX_STRING one_row; + one_row.length = length1 + length2; + one_row.str = + (char *)my_malloc(key_memory_log_event, one_row.length, MYF(0)); + memcpy(one_row.str, start_pos, one_row.length); + if (one_row.str == NULL) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_OUT_OF_MEMORY); + ret = 1; + return ret; + } else { + rows_arr.push_back(one_row); + } + } + + /* Copying rows from the end to the begining into event */ + for (uint i = rows_arr.size(); i > 0; --i) { + LEX_STRING *one_row = &rows_arr[i - 1]; + memcpy(rows_pos, (uchar *)one_row->str, one_row->length); + rows_pos += one_row->length; + my_free(one_row->str); + } +end: + delete td; + return ret; +} + +void Log_event::set_table_map(table_mapping_flashback &flash_back_table_map) { + uchar *ptr = (uchar *)temp_buf; + Log_event_type ev_type = (Log_event_type)ptr[EVENT_TYPE_OFFSET]; + assert(ev_type == binary_log::TABLE_MAP_EVENT); + + enum_binlog_checksum_alg ev_checksum_alg = common_footer->checksum_alg; + binary_log::Format_description_event fd_evt = + Format_description_event(BINLOG_VERSION, server_version); + fd_evt.footer()->checksum_alg = ev_checksum_alg; + + Table_map_log_event *map = new Table_map_log_event((const char *)ptr, &fd_evt); + flash_back_table_map.set_table(map->get_table_id(), map); +} + +/* Flashback the given event */ +int Log_event::change_to_flashback_event(table_mapping_flashback &flash_back_table_map, Rows_log_event **new_event) { + uchar *ptr = (uchar *)temp_buf; + int ret = 0; + enum_binlog_checksum_alg ev_checksum_alg = common_footer->checksum_alg; + binary_log::Format_description_event fd_evt = + Format_description_event(BINLOG_VERSION, server_version); + fd_evt.footer()->checksum_alg = ev_checksum_alg; + + Log_event_type ev_type = (Log_event_type)ptr[EVENT_TYPE_OFFSET]; + switch (ev_type) { + case binary_log::WRITE_ROWS_EVENT: + ptr[EVENT_TYPE_OFFSET] = binary_log::DELETE_ROWS_EVENT; + *new_event = new Delete_rows_log_event((const char *)ptr, &fd_evt); + ret = (*new_event)->change_to_flashback_event_helper(flash_back_table_map, ptr, ev_type); + break; + case binary_log::DELETE_ROWS_EVENT: + ptr[EVENT_TYPE_OFFSET] = binary_log::WRITE_ROWS_EVENT; + *new_event = new Write_rows_log_event((const char *)ptr, &fd_evt); + ret = (*new_event)->change_to_flashback_event_helper(flash_back_table_map, ptr, ev_type); + break; + case binary_log::UPDATE_ROWS_EVENT: + *new_event = new Update_rows_log_event((const char *)ptr, &fd_evt); + ret = (*new_event)->change_to_flashback_event_helper(flash_back_table_map, ptr, ev_type); + break; + default: + ret = 1; + break; + } + return ret; +} +#endif diff --git a/sql/log_event.h b/sql/log_event.h index 4255213..a0e6eef 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -69,6 +69,7 @@ #include "sql/thr_malloc.h" #include "sql_string.h" #include "typelib.h" // TYPELIB +#include "sql/rpl_tblmap.h" class THD; class Table_id; @@ -88,10 +89,6 @@ class Basic_ostream; #include "sql/xa.h" #endif -#ifndef MYSQL_SERVER -#include "sql/rpl_tblmap.h" // table_mapping -#endif - #include #include #include @@ -390,7 +387,6 @@ class List; class Relay_log_info; class Gtid_log_event; -#ifndef MYSQL_SERVER enum enum_base64_output_mode { BASE64_OUTPUT_NEVER = 0, BASE64_OUTPUT_AUTO = 1, @@ -509,7 +505,6 @@ struct PRINT_EVENT_INFO { */ uint32_t immediate_server_version; }; -#endif /* A specific to the database-scheduled MTS type. @@ -748,6 +743,9 @@ class Log_event { int net_send(Protocol *protocol, const char *log_name, my_off_t pos); + void set_table_map(table_mapping_flashback &flash_back_table_map); + int change_to_flashback_event(table_mapping_flashback &flash_back_table_map, Rows_log_event **new_event); + /** Stores a string representation of this event in the Protocol. This is used by SHOW BINLOG EVENTS. @@ -2326,12 +2324,12 @@ class Table_map_log_event : public binary_log::Table_map_event, ~Table_map_log_event() override; -#ifndef MYSQL_SERVER table_def *create_table_def() { assert(m_colcnt > 0); return new table_def(m_coltype, m_colcnt, m_field_metadata, m_field_metadata_size, m_null_bits, m_flags); } +#ifndef MYSQL_SERVER static bool rewrite_db_in_buffer(char **buf, ulong *event_len, const Format_description_event &fde); #endif @@ -2641,6 +2639,15 @@ class Rows_log_event : public virtual binary_log::Rows_event, public Log_event { int pack_info(Protocol *protocol) override; #endif +#ifdef MYSQL_SERVER + int change_to_flashback_event_helper(table_mapping_flashback &flash_back_table_map, + uchar *rows_buff, Log_event_type ev_type); + + size_t calculate_length(table_def *td, + MY_BITMAP *cols_bitmap, const uchar *value, + enum_row_image_type row_image_type); +#endif + #ifndef MYSQL_SERVER void print_verbose(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info); size_t print_verbose_one_row(IO_CACHE *file, table_def *td, @@ -2655,7 +2662,6 @@ class Rows_log_event : public virtual binary_log::Rows_event, public Log_event { return do_add_row_data(data, length); } #endif - /* Member functions to implement superclass interface */ size_t get_data_size() override; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index dda66f3..4b90754 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -4469,6 +4469,13 @@ SHOW_VAR com_status_vars[] = { {"xa_start", (char *)offsetof(System_status_var, com_stat[(uint)SQLCOM_XA_START]), SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, + {"show_flashback_status", + (char *)offsetof(System_status_var, + com_stat[(uint)SQLCOM_SHOW_FLASHBACK_STATUS]), + SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, + {"flash_back", + (char *)offsetof(System_status_var, com_stat[(uint)SQLCOM_FLASH_BACK]), + SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, {NullS, NullS, SHOW_LONG, SHOW_SCOPE_ALL}}; LEX_CSTRING sql_statement_names[(uint)SQLCOM_END + 1]; @@ -6295,6 +6302,14 @@ static int init_server_components() { LogErr(ERROR_LEVEL, ER_RPL_CANT_MAKE_PATHS, (int)FN_REFLEN, (int)FN_LEN); unireg_abort(MYSQLD_ABORT_EXIT); } + + if (flashback_logging_is_active()) { + LogErr(ERROR_LEVEL, ER_FLASH_BACK_DATA_INCONSISTENT); + if (!read_only) { + unireg_abort(MYSQLD_ABORT_EXIT); + } + } + } DBUG_PRINT("debug", @@ -6924,6 +6939,8 @@ static int init_server_components() { init_icu_data_directory(); #endif // MYSQL_ICU_DATADIR + flash_back_ins.init(); + return 0; } @@ -11959,6 +11976,7 @@ PSI_file_key key_file_relaylog_index; PSI_file_key key_file_relaylog_index_cache; PSI_file_key key_file_sdi; PSI_file_key key_file_hash_join; +PSI_file_key key_file_flashback; /* clang-format off */ static PSI_file_info all_server_files[]= @@ -11991,7 +12009,8 @@ static PSI_file_info all_server_files[]= { &key_file_trn, "trigger", 0, 0, PSI_DOCUMENT_ME}, { &key_file_init, "init", 0, 0, PSI_DOCUMENT_ME}, { &key_file_sdi, "SDI", 0, 0, PSI_DOCUMENT_ME}, - { &key_file_hash_join, "hash_join", 0, 0, PSI_DOCUMENT_ME} + { &key_file_hash_join, "hash_join", 0, 0, PSI_DOCUMENT_ME}, + { &key_file_flashback, "flashback", 0, 0, PSI_DOCUMENT_ME} }; /* clang-format on */ #endif /* HAVE_PSI_INTERFACE */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 30d91a2..bdd6d6d 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -537,6 +537,7 @@ extern PSI_file_key key_file_relaylog_index; extern PSI_file_key key_file_relaylog_index_cache; extern PSI_file_key key_file_sdi; extern PSI_file_key key_file_hash_join; +extern PSI_file_key key_file_flashback; extern PSI_socket_key key_socket_tcpip; extern PSI_socket_key key_socket_unix; diff --git a/sql/parse_tree_nodes.cc b/sql/parse_tree_nodes.cc index e3fec91..e40ed43 100644 --- a/sql/parse_tree_nodes.cc +++ b/sql/parse_tree_nodes.cc @@ -4773,3 +4773,10 @@ PT_base_index_option *make_index_secondary_engine_attribute(MEM_ROOT *mem_root, return false; }); } + +Sql_cmd *PT_show_flashback_status::make_cmd(THD *thd) { + LEX *lex = thd->lex; + lex->sql_command = m_sql_command; + + return &m_sql_cmd; +} diff --git a/sql/parse_tree_nodes.h b/sql/parse_tree_nodes.h index 2c2d7f3..4441be5 100644 --- a/sql/parse_tree_nodes.h +++ b/sql/parse_tree_nodes.h @@ -5217,4 +5217,15 @@ PT_column_attr_base *make_column_secondary_engine_attribute(MEM_ROOT *, PT_base_index_option *make_index_engine_attribute(MEM_ROOT *, LEX_CSTRING); PT_base_index_option *make_index_secondary_engine_attribute(MEM_ROOT *, LEX_CSTRING); + +class PT_show_flashback_status final : public PT_show_base { + public: + PT_show_flashback_status(const POS &pos) + : PT_show_base(pos, SQLCOM_SHOW_FLASHBACK_STATUS) {} + + Sql_cmd *make_cmd(THD *thd) override; + + private: + Sql_cmd_show_flashback_status m_sql_cmd; +}; #endif /* PARSE_TREE_NODES_INCLUDED */ diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index cab7c98..f483119 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -2831,34 +2831,10 @@ class Gtid_state { that gtid_mode!=ON before calling this function, or else the gtid_mode could have changed to ON by a concurrent SET GTID_MODE.) */ - void acquire_anonymous_ownership() { - DBUG_TRACE; - sid_lock->assert_some_lock(); - assert(global_gtid_mode.get() != Gtid_mode::ON); -#ifndef NDEBUG - int32 new_value = -#endif - ++atomic_anonymous_gtid_count; - DBUG_PRINT("info", - ("atomic_anonymous_gtid_count increased to %d", new_value)); - assert(new_value >= 1); - return; - } + void acquire_anonymous_ownership(THD *thd); /// Release anonymous ownership. - void release_anonymous_ownership() { - DBUG_TRACE; - sid_lock->assert_some_lock(); - assert(global_gtid_mode.get() != Gtid_mode::ON); -#ifndef NDEBUG - int32 new_value = -#endif - --atomic_anonymous_gtid_count; - DBUG_PRINT("info", - ("atomic_anonymous_gtid_count decreased to %d", new_value)); - assert(new_value >= 0); - return; - } + void release_anonymous_ownership(THD *thd); /// Return the number of clients that hold anonymous ownership. int32 get_anonymous_ownership_count() { return atomic_anonymous_gtid_count; } @@ -3297,6 +3273,9 @@ class Gtid_state { -1 Error */ int save_gtids_of_last_binlog_into_table(); + + int rollback_gtid(THD *thd, Gtid_set &flashback_gtid_set); + /** Fetch gtids from gtid_executed table and store them into gtid_executed set. diff --git a/sql/rpl_gtid_execution.cc b/sql/rpl_gtid_execution.cc index 0e12aa4..16eebf1 100644 --- a/sql/rpl_gtid_execution.cc +++ b/sql/rpl_gtid_execution.cc @@ -81,11 +81,13 @@ bool set_gtid_next(THD *thd, const Gtid_specification &spec) { break; case ANONYMOUS_GTID: - if (global_gtid_mode.get() == Gtid_mode::ON) { - my_error(ER_CANT_SET_GTID_NEXT_TO_ANONYMOUS_WHEN_GTID_MODE_IS_ON, - MYF(0)); - goto err; - } + if (thd->rli_fake == nullptr || thd->rli_fake->is_flashback == false) { + if (global_gtid_mode.get() == Gtid_mode::ON) { + my_error(ER_CANT_SET_GTID_NEXT_TO_ANONYMOUS_WHEN_GTID_MODE_IS_ON, + MYF(0)); + goto err; + } + } /* The 'has_gtid_consistency_violation' must not be set, @@ -95,7 +97,7 @@ bool set_gtid_next(THD *thd, const Gtid_specification &spec) { thd->variables.gtid_next.set_anonymous(); thd->owned_gtid.sidno = THD::OWNED_SIDNO_ANONYMOUS; thd->owned_gtid.gno = 0; - gtid_state->acquire_anonymous_ownership(); + gtid_state->acquire_anonymous_ownership(thd); break; case ASSIGNED_GTID: diff --git a/sql/rpl_gtid_persist.cc b/sql/rpl_gtid_persist.cc index 37f35a4..bf1e1cf 100644 --- a/sql/rpl_gtid_persist.cc +++ b/sql/rpl_gtid_persist.cc @@ -336,6 +336,67 @@ end: return 0; } +/* Delete rows from gtid_executed table for flashback */ +int Gtid_table_persistor::flashback_delete_row(THD *thd, + string sid, rpl_gno flashback_gno_start) { + int err = 0; + TABLE *table = nullptr; + string cur_sid; + rpl_gno cur_gno_start = 0; + rpl_gno cur_gno_end = 0; + rpl_gno gno_end = 0; + Gtid_table_access_context table_access_ctx; + + mysql_mutex_lock(&LOCK_reset_gtid_table); + if (table_access_ctx.init(&thd, &table, true)) { + err = 1; + goto end; + } + + if ((err = table->file->ha_index_init(0, true))) { + table->file->print_error(err, MYF(0)); + goto end; + } + + /* Read each row by the PK(sid, gno_start) in increasing order. */ + err = table->file->ha_index_first(table->record[0]); + while (!err) { + get_gtid_interval(table, cur_sid, cur_gno_start, cur_gno_end); + if (sid == cur_sid) { + if (flashback_gno_start - 1 < cur_gno_start) { + err = table->file->ha_delete_row(table->record[0]); + err = table->file->ha_index_first(table->record[0]); + } else if (flashback_gno_start - 1 < cur_gno_end) { + gno_end = flashback_gno_start - 1; + table->file->ha_index_end(); + err = update_row(table, sid.c_str(), cur_gno_start, gno_end); + if (!err) { + err = table->file->ha_index_init(0, true); + if (!err) { + err = table->file->ha_index_first(table->record[0]); + } + } + } else { + err = table->file->ha_index_next(table->record[0]); + } + } else { + err = table->file->ha_index_next(table->record[0]); + } + } + + table->file->ha_index_end(); + + if (err == HA_ERR_END_OF_FILE) { + err = 0; + } + +end: + table_access_ctx.deinit(thd, table, 0 != err, true); + mysql_mutex_unlock(&LOCK_reset_gtid_table); + return err; +} + + int Gtid_table_persistor::save(THD *thd, const Gtid *gtid) { DBUG_TRACE; int error = 0; diff --git a/sql/rpl_gtid_persist.h b/sql/rpl_gtid_persist.h index 123b00d..cd511d5 100644 --- a/sql/rpl_gtid_persist.h +++ b/sql/rpl_gtid_persist.h @@ -164,6 +164,9 @@ class Gtid_table_persistor { */ int reset(THD *thd); + /* Delete rows from gtid_executed table for flashback */ + int flashback_delete_row(THD *thd, std::string sid, rpl_gno flashback_gno_start); + /** Fetch gtids from gtid_executed table and store them into gtid_executed set. @@ -333,6 +336,7 @@ class Gtid_table_persistor { @param table Reference to a table object. @retval Return the encoded gtid text. */ + std::string encode_gtid_text(TABLE *table); /** Get gtid interval from the the current row of the table. diff --git a/sql/rpl_gtid_state.cc b/sql/rpl_gtid_state.cc index 080858e..dc3c58a 100644 --- a/sql/rpl_gtid_state.cc +++ b/sql/rpl_gtid_state.cc @@ -47,6 +47,7 @@ #include "sql/sql_error.h" #include "sql/system_variables.h" #include "sql/thr_malloc.h" +#include "sql/rpl_rli.h" class Table_ref; @@ -213,6 +214,40 @@ void Gtid_state::update_on_rollback(THD *thd) { update_gtids_impl(thd, false); } +void Gtid_state::acquire_anonymous_ownership(THD *thd) +{ + DBUG_TRACE; + sid_lock->assert_some_lock(); + if (thd->rli_fake == nullptr || thd->rli_fake->is_flashback == false) { + assert(global_gtid_mode.get() != Gtid_mode::ON); + } +#ifndef NDEBUG + int32 new_value = +#endif + ++atomic_anonymous_gtid_count; + DBUG_PRINT("info", + ("atomic_anonymous_gtid_count increased to %d", new_value)); + assert(new_value >= 1); + return; +} + +/// Release anonymous ownership. +void Gtid_state::release_anonymous_ownership(THD *thd) { + DBUG_TRACE; + sid_lock->assert_some_lock(); + if (thd->rli_fake == nullptr || thd->rli_fake->is_flashback == false) { + assert(global_gtid_mode.get() != Gtid_mode::ON); + } +#ifndef NDEBUG + int32 new_value = +#endif + --atomic_anonymous_gtid_count; + DBUG_PRINT("info", + ("atomic_anonymous_gtid_count decreased to %d", new_value)); + assert(new_value >= 0); + return; +} + void Gtid_state::update_gtids_impl(THD *thd, bool is_commit) { DBUG_TRACE; @@ -532,7 +567,7 @@ enum_return_status Gtid_state::generate_automatic_gtid( // using an anonymous transaction. thd->owned_gtid.sidno = THD::OWNED_SIDNO_ANONYMOUS; thd->owned_gtid.gno = 0; - acquire_anonymous_ownership(); + acquire_anonymous_ownership(thd); thd->owned_gtid.dbug_print( nullptr, "set owned_gtid (anonymous) in generate_automatic_gtid"); } @@ -938,7 +973,7 @@ void Gtid_state::update_gtids_impl_own_anonymous(THD *thd, bool *more_trx) { } } if (!(*more_trx && thd->variables.gtid_next.type == ANONYMOUS_GTID)) { - release_anonymous_ownership(); + release_anonymous_ownership(thd); thd->clear_owned_gtids(); } } @@ -968,3 +1003,35 @@ error: BINLOG_ERROR(("Out of memory."), (ER_OUT_OF_RESOURCES, MYF(0))); RETURN_REPORTED_ERROR; } + +/* rollback gtid for flashback */ +int Gtid_state::rollback_gtid(THD *thd, + Gtid_set &flashback_gtid_set) { + int err = 0; + char buf[binary_log::Uuid::TEXT_LENGTH + 1]; + + global_sid_lock->wrlock(); + Gtid_set::Gtid_iterator git(&flashback_gtid_set); + Gtid g = git.get(); + rpl_sid sid = global_sid_map->sidno_to_sid(g.sidno); + sid.to_string(buf); + rpl_gno start_gno = g.gno; + + while (g.sidno != 0) { + if (start_gno > g.gno) { + start_gno = g.gno; + } + lock_sidno(g.sidno); + next_free_gno = start_gno; + executed_gtids._remove_gtid(g); + previous_gtids_logged._remove_gtid(g); + unlock_sidno(g.sidno); + git.next(); + g = git.get(); + } + + err = gtid_table_persistor->flashback_delete_row(thd, buf, start_gno); + global_sid_lock->unlock(); + + return err; +} diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index bf4a2fb..64c8f8d 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -146,6 +146,7 @@ #include "sql/rpl_trx_boundary_parser.h" #include "sql/rpl_utility.h" #include "sql/sql_backup_lock.h" // is_instance_backup_locked +#include "sql/sql_binlog.h" #include "sql/sql_class.h" // THD #include "sql/sql_const.h" #include "sql/sql_error.h" @@ -160,6 +161,10 @@ #include "sql_common.h" // end_server #include "sql_string.h" #include "typelib.h" + +#include "sql/binlog_ostream.h" +#include "sql/tztime.h" + #ifndef NDEBUG #include "rpl_debug_points.h" #endif @@ -238,6 +243,10 @@ int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static thread_local Master_info *RPL_MASTER_INFO = nullptr; +uint flashback_success_counter = 0; +Flash_back flash_back_ins; +Flash_back_monitor fb_monitor; + /** Encapsulates the messages and thread stages used for a specific call to try_to_reconnect. Different Reconnect_messages objects may be @@ -11324,3 +11333,665 @@ static bool check_replica_configuration_errors(Master_info *mi, } return false; } + +/** + Exit status for flashback functions. +*/ +enum Flashback_error_code { + OK_CONTINUE = 0, //no error occured + OPEN_BINLOG_INDEX_FAILED = 1, + FIND_LOG_POS_FAILED = 2, + PARSE_START_GTID_FAILED = 3, + OPEN_LASTEST_BINLOG_FAILED = 4, + CHANGE_TO_FLASHBACK_EVENT_FAILED = 5, + FIND_UNSUPPORTED_EVENT_IN_BINLOG = 6, + APPLY_EVENT_FAILED = 7, + ROLLBACK_GTID_FAILED = 8, + TRUNCATED_BINLOG_FAILED = 9, + CREATE_FLASHBACK_START_LOGGING_FAILED = 10, + INVALID_START_GTID = 11, + DELETE_FLASHBACK_START_LOGGING_FAILED = 12, + CHECK_EVENT_TYPE_FAILED = 13, +}; + +const char * convert_errorcode_to_string(Flashback_error_code &ret) { + switch(ret){ + case OPEN_BINLOG_INDEX_FAILED: + return "open binlog log index file failed"; + + case FIND_LOG_POS_FAILED: + return "could not find first log file name in binary log index file"; + + case PARSE_START_GTID_FAILED: + return "parse start gtid failed"; + + case OPEN_LASTEST_BINLOG_FAILED: + return "open last binlog log file failed"; + + case CHANGE_TO_FLASHBACK_EVENT_FAILED: + return "change to flash back event failed"; + + case FIND_UNSUPPORTED_EVENT_IN_BINLOG: + return "find unsupported event in last binlog file"; + + case APPLY_EVENT_FAILED: + return "apply flash back event failed"; + + case ROLLBACK_GTID_FAILED: + return "rollback gtid failed"; + + case TRUNCATED_BINLOG_FAILED: + return "truncated failed"; + + case CREATE_FLASHBACK_START_LOGGING_FAILED: + return "create flash back start logging failed"; + + case INVALID_START_GTID: + return "start gtid is invalid"; + + case DELETE_FLASHBACK_START_LOGGING_FAILED: + return "delete flash back start logging failed."; + + case CHECK_EVENT_TYPE_FAILED: + return "check event type failed"; + + default: + break; + } + return "Unknown error"; +} + +void Flash_back::init() { + flashback_remove_gtid_set = new Gtid_set(global_sid_map); + mysql_mutex_init(0, &LOCK_flash_back, nullptr); + begin_events = nullptr; + commit_events = nullptr; + valid_pos = 0; + first_gtid_seen = false; + first_gtid_exceed = false; +} + +void Flash_back::reset() { + events_in_stmt.clear(); + m_events.clear(); + flash_back_table_map.clear_tables(); + + valid_pos = 0; + first_gtid_seen = false; + first_gtid_exceed = false; + + if (flashback_remove_gtid_set != nullptr) { + delete flashback_remove_gtid_set; + flashback_remove_gtid_set = nullptr; + } + if (begin_events !=nullptr ) { + delete begin_events; + begin_events = nullptr; + } + if (commit_events !=nullptr ) { + delete commit_events; + commit_events = nullptr; + } + + unlock(); +} + +/** + Update flashback monitor if flashback succeed. + + @param[in] src The start gtid + @param[in] log_name the flashback binlog name + @param[in] trx_counter numbers of trx are flashbacked + @param[in] begin_time start time of flashback + @param[in] end_time end time of flashback +*/ +void Flash_back_monitor::update_monitor(char* src, char* log_name, + uint trx_counter, ulonglong begin_time, ulonglong end_time) { + strcpy(start_gtid_for_monitor, src); + strcpy(binlog_log_name, log_name); + flashback_transaction_counter = trx_counter; + flashback_begin_time = begin_time; + flashback_end_time = end_time; +} + +bool exceed_gtid(Gtid_specification &spec, + rpl_sidno sidno, rpl_gno gno) { + if (spec.gtid.sidno == sidno && spec.gtid.gno < gno) { + return true; + } else { + return false; + } +} + +static bool shall_skip_gtids(const Log_event *ev, + uint &flashback_transaction_counter, + Gtid_specification &flash_back_gtid, + Binlog_file_reader &binlog_file_reader) { + if (flash_back_ins.first_gtid_exceed) { + return true; + } + + bool filtered = false; + static bool flash_back_filter_on_gtids = false; + + switch (ev->get_type_code()) { + case binary_log::GTID_LOG_EVENT:{ + Gtid_log_event *gtid_ev = + const_cast(down_cast(ev)); + bool is_exceed = exceed_gtid(flash_back_gtid, gtid_ev->get_sidno(true), gtid_ev->get_gno()); + if (is_exceed) { + Gtid gtid = {0, 0}; + gtid.sidno = gtid_ev->get_sidno(true); + flash_back_ins.flashback_remove_gtid_set->get_sid_map()->get_sid_lock()->rdlock(); + gtid.gno = gtid_ev->get_gno(); + flash_back_ins.flashback_remove_gtid_set->ensure_sidno(gtid.sidno); + flash_back_ins.flashback_remove_gtid_set->_add_gtid(gtid); + flash_back_ins.flashback_remove_gtid_set->get_sid_map()->get_sid_lock()->unlock(); + } + + if (!flash_back_ins.first_gtid_seen) { + flash_back_ins.first_gtid_seen = true; + if (is_exceed) { + flash_back_ins.first_gtid_exceed = true; + return true; + } + } + + filtered = filtered || !is_exceed; + flash_back_filter_on_gtids = filtered; + } break; + case binary_log::XID_EVENT: + filtered = flash_back_filter_on_gtids; + flash_back_filter_on_gtids = false; + if (filtered) { + flash_back_ins.valid_pos = binlog_file_reader.position(); + } else { + flashback_transaction_counter++; + } + break; + case binary_log::FORMAT_DESCRIPTION_EVENT: + case binary_log::PREVIOUS_GTIDS_LOG_EVENT: + filtered = false; + break; + default: + filtered = flash_back_filter_on_gtids; + break; + } + + return filtered; +} + +/** + Open last binlog and flashback event according to start gtid + + @param[in] thd + @param[in] start_gtid start gtid of flashback + @param[in] rli dummy relay log info for apply event + @param[in,out] log_name last binlog name + @param[in,out] flashback_transaction_counter numbers of trx flashbacked + @param[in,out] binlog_file_reader + @return Flashback_error_code OK_CONTINUE if no error +*/ +Flashback_error_code open_last_binlog_file_and_get_event(THD *thd, + char *start_gtid, Relay_log_info *rli, + char (&log_name)[FN_REFLEN], + uint &flashback_transaction_counter, + Binlog_file_reader &binlog_file_reader) { + LOG_INFO log_info; + Flashback_error_code retval = OK_CONTINUE; + IO_CACHE *index_file; + Log_event *ev = nullptr; + bool stmt_end = false; + bool first_begin_seen = false; + bool first_commit_seen = false; + int error = 0; + std::vector events_in_stmt; + Gtid_specification flash_back_gtid; + + mysql_bin_log.lock_index(); + index_file = mysql_bin_log.get_index_file(); + if (!my_b_inited(index_file)) { + /* There was a failure to open the index file, can't open the binlog */ + mysql_bin_log.unlock_index(); + retval = OPEN_BINLOG_INDEX_FAILED; + goto err; + } + mysql_bin_log.unlock_index(); + + if ((error = mysql_bin_log.find_log_pos(&log_info, NullS, true /*need_lock_index=true*/))) { + if (error != LOG_INFO_EOF) { + LogErr(ERROR_LEVEL, ER_BINLOG_CANT_FIND_LOG_IN_INDEX, error); + } + retval = FIND_LOG_POS_FAILED; + goto err; + } + + /* parse start_gtid to flash_back_gtid */ + if (start_gtid != nullptr) { + global_sid_lock->rdlock(); + if (flash_back_gtid.parse(global_sid_map, start_gtid) != RETURN_STATUS_OK) { + global_sid_lock->unlock(); + retval = PARSE_START_GTID_FAILED; + goto err; + } + global_sid_lock->unlock(); + } else { + retval = PARSE_START_GTID_FAILED; + goto err; + } + + do { + strmake(log_name, log_info.log_file_name, sizeof(log_name) - 1); + } while (!(error = mysql_bin_log.find_next_log(&log_info, true /*need_lock_index=true*/))); + + if (error != LOG_INFO_EOF) { + LogErr(ERROR_LEVEL, ER_BINLOG_CANT_FIND_LOG_IN_INDEX, error); + retval = OPEN_LASTEST_BINLOG_FAILED; + goto err; + } + + if (binlog_file_reader.open(log_name)) { + LogErr(ERROR_LEVEL, ER_BINLOG_FILE_OPEN_FAILED, + binlog_file_reader.get_error_str()); + retval = OPEN_LASTEST_BINLOG_FAILED; + goto err; + } + + if (!rli) { + /* when trying to create an rli from a client, there is no channel*/ + if ((rli = Rpl_info_factory::create_rli(INFO_REPOSITORY_DUMMY, false, + (const char *)"", true))) { + thd->rli_fake = rli; + rli->info_thd = thd; + } + } + + while ((ev = binlog_file_reader.read_event_object()) != nullptr) { + if (flash_back_ins.first_gtid_exceed) { + retval = INVALID_START_GTID; + goto err; + } + + if (shall_skip_gtids(ev, flashback_transaction_counter, flash_back_gtid, binlog_file_reader)) { + delete ev; + continue; + } + + switch (ev->get_type_code()) { + case binary_log::TABLE_MAP_EVENT:{ + if (check_event_type(ev->get_type_code(), rli)) { + retval = CHECK_EVENT_TYPE_FAILED; + goto err; + } + Rows_log_event *new_ev = (Rows_log_event *)ev; + new_ev->set_table_map(flash_back_ins.flash_back_table_map); + events_in_stmt.clear(); + events_in_stmt.push_back(new_ev); + break; + } + case binary_log::WRITE_ROWS_EVENT: + case binary_log::UPDATE_ROWS_EVENT: + case binary_log::DELETE_ROWS_EVENT:{ + if (check_event_type(ev->get_type_code(), rli)) { + retval = CHECK_EVENT_TYPE_FAILED; + goto err; + } + Rows_log_event *new_ev = (Rows_log_event *)ev; + if (new_ev->get_flags(Rows_log_event::STMT_END_F)) { + stmt_end = true; + if (events_in_stmt.size() >= 1) { + new_ev->clear_flags(Rows_log_event::STMT_END_F); + } + } else if (events_in_stmt.size() == 0) { + new_ev->set_flags(Rows_log_event::STMT_END_F); + } + events_in_stmt.push_back(new_ev); + if (stmt_end) { + size_t len = events_in_stmt.size(); + if (len >= 2) { + Log_event *e = nullptr; + Rows_log_event *new_event2 = nullptr; + std::vector events_in_stmt_temp(events_in_stmt); + for (uint i = len - 1; i >= 1; i--) { + events_in_stmt[i] = events_in_stmt_temp[len-i]; + } + for (uint i = 1; i< len; i++) { + e = events_in_stmt[i]; + int ret = e->change_to_flashback_event(flash_back_ins.flash_back_table_map, &new_event2); + if (ret) { + retval = CHANGE_TO_FLASHBACK_EVENT_FAILED; + goto err; + } else { + delete e; + events_in_stmt[i] = new_event2; + } + } + } + flash_back_ins.m_events.push_back(events_in_stmt); + } + stmt_end = false; + break; + } + case binary_log::QUERY_EVENT:{ + Query_log_event *new_ev = (Query_log_event *)ev; + if (strcmp("BEGIN", new_ev->query) != 0) { + retval = FIND_UNSUPPORTED_EVENT_IN_BINLOG; + goto err; + } else if (!first_begin_seen) { + flash_back_ins.begin_events = new_ev; + first_begin_seen = true; + } + break; + } + case binary_log::FORMAT_DESCRIPTION_EVENT: + if (check_event_type(ev->get_type_code(), rli)) { + retval = CHECK_EVENT_TYPE_FAILED; + goto err; + } + break; + case binary_log::PREVIOUS_GTIDS_LOG_EVENT: + case binary_log::ROWS_QUERY_LOG_EVENT: + case binary_log::GTID_LOG_EVENT: + break; + case binary_log::XID_EVENT:{ + Xid_log_event *new_ev = (Xid_log_event*)ev; + if (!first_commit_seen) { + flash_back_ins.commit_events = new_ev; + first_commit_seen = true; + } + break; + } + default:{ + retval = FIND_UNSUPPORTED_EVENT_IN_BINLOG; + goto err; + } + } + } + + std::reverse(flash_back_ins.m_events.begin(), flash_back_ins.m_events.end()); + retval = OK_CONTINUE; + +err: + return retval; +} + +bool get_flashback_log_name(char *name) { + bool error = false; + char log_dirpart[FN_REFLEN]; + size_t log_dirpart_len; + char buff[FN_REFLEN]; + const char *opt_name = log_bin_basename; + if (opt_name) { + dirname_part(log_dirpart, opt_name, &log_dirpart_len); + } + if (log_dirpart_len > 0) { + if (fn_format(buff, "flashback_temp_log", log_dirpart, "", + MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH)) == nullptr) { + error = true; + goto end; + } + } + + if (buff) { + size_t length = strlen(buff); + + if (length && buff[length - 1] == '\n') { + buff[length - 1] = 0; + length--; + if (length && buff[length - 1] == '\r') { + buff[length - 1] = 0; + length--; + } + } + if (!length) { + error = true; + goto end; + } + strmake(name, buff, length); + } + +end: + return error; +} + +int flashback_start_logging() { + char full_log_name[FN_REFLEN]; + full_log_name[0] = 0; + if (get_flashback_log_name(full_log_name)) { + return 1; + } + + if (mysql_file_create(key_file_flashback, full_log_name, CREATE_MODE, + O_WRONLY | O_EXCL | O_NOFOLLOW, MYF(MY_WME)) < 0) { + return 1; + } + return 0; +} + +int flashback_done_logging() { + char full_log_name[FN_REFLEN]; + int fd = 0; + full_log_name[0] = 0; + if (get_flashback_log_name(full_log_name)) { + return 1; + } + if ((fd = mysql_file_open(key_file_flashback, full_log_name, O_RDWR, MYF(0))) < 0) { + return 1; + } else { + mysql_file_close(fd, MYF(0)); + mysql_file_delete(key_file_flashback, full_log_name, MYF(MY_WME)); + } + return 0; +} + +bool flashback_logging_is_active() { + char full_log_name[FN_REFLEN]; + int fd = 0; + full_log_name[0] = 0; + if (get_flashback_log_name(full_log_name)) { + return false; + } + if ((fd = mysql_file_open(key_file_flashback, full_log_name, O_RDWR, MYF(0))) < 0) { + return false; + } + return true; +} + +/** +flashback to gtid. +@return false if flashback succeed. +*/ +bool flash_back_to_gtid(THD *thd, char *start_gtid) { + char log_name[FN_REFLEN]; + char start_gtid_temp[256]; + strcpy(start_gtid_temp, start_gtid); + uint flashback_transaction_counter = 0; + ulonglong flashback_begin_time = 0; + ulonglong flashback_end_time = 0; + Binlog_file_reader binlog_file_reader(opt_source_verify_checksum); + + flash_back_ins.lock(); + if (flash_back_ins.flashback_remove_gtid_set == nullptr) { + flash_back_ins.flashback_remove_gtid_set = new Gtid_set(global_sid_map); + } + + ulonglong saved_options = thd->variables.option_bits; + auto saved_gtid_type = thd->variables.gtid_next.type; + + Relay_log_info *rli = thd->rli_fake; + if (!rli) { + /* when trying to create an rli from a client, there is no channel*/ + if ((rli = Rpl_info_factory::create_rli(INFO_REPOSITORY_DUMMY, false, + (const char *)"", true))) { + thd->rli_fake = rli; + rli->info_thd = thd; + } + } + rli->is_flashback = true; + flashback_begin_time = my_micro_time(); + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "flash back start"); + + mysql_mutex_lock(mysql_bin_log.get_log_lock()); + + Flashback_error_code ret = open_last_binlog_file_and_get_event(thd, start_gtid_temp, + rli, log_name, flashback_transaction_counter, binlog_file_reader); + binlog_file_reader.close(); + + if (ret != OK_CONTINUE) { + goto finish; + } else if(flash_back_ins.m_events.size() > 0) { + std::vector>::iterator it = flash_back_ins.m_events.begin(); + std::vector tempVect; + + /* turn off binlog */ + thd->variables.option_bits &= ~OPTION_BIN_LOG; + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_PARSE_LAST_BINLOG_FILE, flash_back_ins.m_events.size()); + + /* apply begin event */ + flash_back_ins.begin_events->thd = thd; + if (flash_back_ins.begin_events->apply_event(rli)) { + ret = APPLY_EVENT_FAILED; + goto finish; + } + + /* all dml event was applied in on transaction */ + for (; it != flash_back_ins.m_events.end(); it++) { + tempVect = *it; + for (std::vector::iterator it1 = tempVect.begin(); it1 != tempVect.end(); it1++) { + Log_event *ev = *it1; + ev->thd = thd; + int res = ev->apply_event(rli); + delete ev; + if (res) { + ret = APPLY_EVENT_FAILED; + goto finish; + } + } + } + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "apply flash back events finished"); + + DBUG_EXECUTE_IF("flashback_crash_before_commit", { + DBUG_SUICIDE(); + }); + + /* create flash log file */ + if (flashback_start_logging()) { + ret = CREATE_FLASHBACK_START_LOGGING_FAILED; + goto finish; + } + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "flash back start logging"); + + /* apply commit event */ + flash_back_ins.commit_events->thd = thd; + if (flash_back_ins.commit_events->apply_event(rli)) { + ret = APPLY_EVENT_FAILED; + goto finish; + } + + /* rollback gtid */ + if (flash_back_ins.valid_pos > 0) { + if (gtid_state->rollback_gtid(thd, *(flash_back_ins.flashback_remove_gtid_set))) { + ret = ROLLBACK_GTID_FAILED; + goto finish; + } + } else { + assert(flash_back_ins.flashback_remove_gtid_set->is_empty()); + } + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "flash back rollback gtid finished"); + + /* truncate binlog file */ + if (flash_back_ins.valid_pos > 0) { + if (mysql_bin_log.truncate_binlog_file(flash_back_ins.valid_pos, log_name)) { + ret = TRUNCATED_BINLOG_FAILED; + goto finish; + } + } + + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "flash back truncate binlog file finished"); + } else { + ret = INVALID_START_GTID; + } + +finish: + thd->variables.option_bits = saved_options; + thd->variables.gtid_next.type = saved_gtid_type; + + /* remove flashback log */ + if (flashback_logging_is_active()) { + if (flashback_done_logging()) { + ret = DELETE_FLASHBACK_START_LOGGING_FAILED; + } else { + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_MSG, "flash back done logging"); + } + } + + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + + flash_back_ins.reset(); + + if (ret != OK_CONTINUE) { + const char *error_msg = convert_errorcode_to_string(ret); + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_FAIL_MSG, start_gtid_temp, error_msg); + my_error(ER_FLASH_BACK_ERROR, MYF(0), error_msg); + return true; + } else { + flashback_success_counter++; + flashback_end_time = my_micro_time(); + /* update flashback monitor */ + fb_monitor.update_monitor(start_gtid_temp, + log_name, flashback_transaction_counter, flashback_begin_time, flashback_end_time); + LogErr(INFORMATION_LEVEL, ER_FLASH_BACK_SUCCESS_MSG, start_gtid_temp, flashback_transaction_counter); + my_ok(thd); + } + + return false; +} + +/** +show flashback status +@return false if succeed. +*/ +bool show_flashback_status(THD *thd) { + Protocol *protocol = thd->get_protocol(); + mem_root_deque field_list(thd->mem_root); + DBUG_TRACE; + + field_list.push_back(new Item_empty_string("start_gtid", NAME_CHAR_LEN)); + field_list.push_back(new Item_return_int("transaction_rollback_counts", 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_empty_string("binlog_name", NAME_CHAR_LEN)); + field_list.push_back(new Item_temporal(MYSQL_TYPE_TIMESTAMP, Name_string("start_time", sizeof("start_time")-1),0,0)); + field_list.push_back(new Item_temporal(MYSQL_TYPE_TIMESTAMP, Name_string("end_time", sizeof("end_time")-1),0,0)); + + if (thd->send_result_metadata(field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) { + return true; + } + + if (flashback_success_counter > 0) { + protocol->start_row(); + protocol->store(fb_monitor.get_start_gtid(), system_charset_info); + protocol->store(fb_monitor.get_transaction_counter()); + protocol->store(fb_monitor.get_log_name(), system_charset_info); + MYSQL_TIME timestamp_start; + struct timeval start_time; + my_micro_time_to_timeval(fb_monitor.get_begin_time(), &start_time); + my_tz_SYSTEM->gmt_sec_to_TIME(×tamp_start, (my_time_t)start_time.tv_sec); + protocol->store_datetime(timestamp_start, 2); + MYSQL_TIME timestamp_end; + struct timeval end_time; + my_micro_time_to_timeval(fb_monitor.get_end_time(), &end_time); + my_tz_SYSTEM->gmt_sec_to_TIME(×tamp_end, (my_time_t)end_time.tv_sec); + protocol->store_datetime(timestamp_end, 2); + if (protocol->end_row()) { + return true; + } + } + + my_eof(thd); + return false; +} diff --git a/sql/rpl_replica.h b/sql/rpl_replica.h index 8f22b7e..b9b3110 100644 --- a/sql/rpl_replica.h +++ b/sql/rpl_replica.h @@ -39,6 +39,10 @@ #include "sql/changestreams/apply/constants.h" #include "sql/current_thd.h" #include "sql/debug_sync.h" +#include "my_io.h" +#include "sql/log_event.h" +#include "sql/rpl_tblmap.h" +#include "sql/rpl_gtid.h" class Master_info; class Relay_log_info; @@ -670,7 +674,51 @@ bool is_network_error(uint errorno); int init_replica_thread(THD *thd, SLAVE_THD_TYPE thd_type); + +class Flash_back { +public: + void init(); + void reset(); + void lock() { mysql_mutex_lock(&LOCK_flash_back); } + void unlock() { mysql_mutex_unlock(&LOCK_flash_back); } + + /* used for store all flashback event */ + std::vector> m_events; + std::vector events_in_stmt; + table_mapping_flashback flash_back_table_map; + Log_event *begin_events{nullptr}; + Log_event *commit_events{nullptr}; + my_off_t valid_pos = 0; + Gtid_set *flashback_remove_gtid_set{nullptr}; + mysql_mutex_t LOCK_flash_back; + bool first_gtid_seen{false}; + bool first_gtid_exceed{false}; +}; + +class Flash_back_monitor { +public: + void update_monitor(char* src, char* log_name, uint trx_counter, ulonglong begin_time, ulonglong end_time); + char* get_start_gtid() { return start_gtid_for_monitor; } + char* get_log_name() { return binlog_log_name; } + uint get_transaction_counter() { return flashback_transaction_counter; } + ulonglong get_begin_time() { return flashback_begin_time; } + ulonglong get_end_time() { return flashback_end_time; } + +private: + char start_gtid_for_monitor[256]{0}; + char binlog_log_name[FN_REFLEN]{0}; + uint flashback_transaction_counter{0}; + ulonglong flashback_begin_time{0}; + ulonglong flashback_end_time{0}; +}; + +bool flash_back_to_gtid(THD *thd, char *start_gtid); /** @} (end of group Replication) */ + +bool show_flashback_status(THD *thd); +bool flashback_logging_is_active(); + +extern Flash_back flash_back_ins; #endif diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 81a6c1b..5991d70 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -126,6 +126,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, #endif param_id, param_channel), replicate_same_server_id(::replicate_same_server_id), + is_flashback(false), relay_log(&sync_relaylog_period, true), is_relay_log_recovery(is_slave_recovery), save_temporary_tables(nullptr), diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index bf954b9..9c2828a 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -323,6 +323,9 @@ class Relay_log_info : public Rpl_info { */ bool replicate_same_server_id; + /* flashback ignore gtid mode in apply_event */ + bool is_flashback; + /* Protected with internal locks. Must get data_lock when resetting the logs. diff --git a/sql/rpl_tblmap.cc b/sql/rpl_tblmap.cc index d5a0a52..8b2516e 100644 --- a/sql/rpl_tblmap.cc +++ b/sql/rpl_tblmap.cc @@ -151,3 +151,88 @@ void table_mapping::clear_tables() { } m_table_ids.clear(); } + +table_mapping_flashback::table_mapping_flashback() + : m_mem_root(table_psi_key, TABLE_ID_HASH_SIZE * sizeof(entry)), + m_free(nullptr), + m_table_ids(table_psi_key) {} + +table_mapping_flashback::~table_mapping_flashback() { + clear_tables(); +} + +Table_map_log_event *table_mapping_flashback::get_table(ulonglong table_id) { + DBUG_TRACE; + DBUG_PRINT("enter", ("table_id: %llu", table_id)); + auto it = m_table_ids.find(table_id); + if (it != m_table_ids.end()) { + entry *e = it->second; + return e->table; + } + + DBUG_PRINT("info", ("tid %llu is not mapped!", table_id)); + return nullptr; +} + +/* + Called when we are out of table id entries. Creates TABLE_ID_CHUNK + new entries, chain them and attach them at the head of the list of free + (free for use) entries. +*/ +int table_mapping_flashback::expand() { + entry *tmp = new (&m_mem_root) entry[TABLE_ID_CHUNK]; + if (tmp == nullptr) return ERR_MEMORY_ALLOCATION; // Memory allocation failed + + /* Find the end of this fresh new array of free entries */ + entry *e_end = tmp + TABLE_ID_CHUNK - 1; + for (entry *e = tmp; e < e_end; e++) e->next = e + 1; + e_end->next = m_free; + m_free = tmp; + return 0; +} + +int table_mapping_flashback::set_table(ulonglong table_id, Table_map_log_event *table) { + DBUG_TRACE; + entry *e; + auto it = m_table_ids.find(table_id); + if (it == m_table_ids.end()) { + if (m_free == nullptr && expand()) + return ERR_MEMORY_ALLOCATION; // Memory allocation failed + e = m_free; + m_free = m_free->next; + } else { + e = it->second; + m_table_ids.erase(table_id); + } + e->table_id = table_id; + e->table = table; + m_table_ids.emplace(table_id, e); + + return 0; // All OK +} + +int table_mapping_flashback::remove_table(ulonglong table_id) { + auto it = m_table_ids.find(table_id); + if (it != m_table_ids.end()) { + /* we add this entry to the chain of free (free for use) entries */ + it->second->next = m_free; + m_free = it->second; + m_table_ids.erase(it); + return 0; // All OK + } + return 1; // No table to remove +} + +/* + Puts all entries into the list of free-for-use entries (does not free any + memory), and empties the hash. +*/ +void table_mapping_flashback::clear_tables() { + DBUG_TRACE; + for (const auto &key_and_value : m_table_ids) { + entry *e = key_and_value.second; + e->next = m_free; + m_free = e; + } + m_table_ids.clear(); +} diff --git a/sql/rpl_tblmap.h b/sql/rpl_tblmap.h index 5bbb2b0..0146e2e 100644 --- a/sql/rpl_tblmap.h +++ b/sql/rpl_tblmap.h @@ -31,6 +31,8 @@ /* Forward declarations */ #ifdef MYSQL_SERVER +class Table_map_log_event; + struct TABLE; typedef TABLE Mapped_table; @@ -96,4 +98,52 @@ class table_mapping { malloc_unordered_map m_table_ids; }; +class table_mapping_flashback { + private: + MEM_ROOT m_mem_root; + + public: + enum enum_error { + ERR_NO_ERROR = 0, + ERR_LIMIT_EXCEEDED, + ERR_MEMORY_ALLOCATION + }; + + table_mapping_flashback(); + ~table_mapping_flashback(); + + Table_map_log_event *get_table(ulonglong table_id); + + int set_table(ulonglong table_id, Table_map_log_event *table); + int remove_table(ulonglong table_id); + void clear_tables(); + ulong count() const { return static_cast(m_table_ids.size()); } + + private: + struct entry { + ulonglong table_id; + union { + Table_map_log_event *table; + entry *next; + }; + }; + + int expand(); + + /* + Head of the list of free entries; "free" in the sense that it's an + allocated entry free for use, NOT in the sense that it's freed + memory. + */ + entry *m_free; + + /* + Map from table ids (numbers) to Mapped_table objects. + + No destructor for entries passed here, as the entries are allocated in a + MEM_ROOT (freed as a whole in the destructor), they cannot be freed one by + one. + */ + malloc_unordered_map m_table_ids; +}; #endif diff --git a/sql/sp.cc b/sql/sp.cc index 06fc56b..2c08f74 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -2303,6 +2303,7 @@ uint sp_get_flags_for_command(LEX *lex) { case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_WARNS: case SQLCOM_REPAIR: + case SQLCOM_SHOW_FLASHBACK_STATUS: flags = sp_head::MULTI_RESULTS; break; /* diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index e7ace58..4f82d76 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -55,7 +55,7 @@ @retval 0 if the event type is ok. @retval 1 if the event type is not ok. */ -static int check_event_type(int type, Relay_log_info *rli) { +int check_event_type(int type, Relay_log_info *rli) { Format_description_log_event *fd_event = rli->get_rli_description_event(); switch (type) { diff --git a/sql/sql_binlog.h b/sql/sql_binlog.h index 6245565..69c7e75 100644 --- a/sql/sql_binlog.h +++ b/sql/sql_binlog.h @@ -22,9 +22,9 @@ #ifndef SQL_BINLOG_INCLUDED #define SQL_BINLOG_INCLUDED - +#include "sql/rpl_rli.h" class THD; void mysql_client_binlog_statement(THD *thd); - +int check_event_type(int type, Relay_log_info *rli); #endif /* SQL_BINLOG_INCLUDED */ diff --git a/sql/sql_lex.h b/sql/sql_lex.h index e1b210f..83ac36a 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -3736,6 +3736,7 @@ struct LEX : public Query_tables_list { char *help_arg; char *to_log; /* For PURGE MASTER LOGS TO */ const char *x509_subject, *x509_issuer, *ssl_cipher; + char *flash_back_to_gtid; /* For FLASHBACK TO GTID */ // Widcard from SHOW ... LIKE statements. String *wild; Query_result *result; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 063c473..5db0113 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -648,6 +648,7 @@ void init_sql_command_flags() { sql_command_flags[SQLCOM_SHOW_PROCESSLIST] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_GRANTS] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_CREATE_DB] = CF_STATUS_COMMAND; + sql_command_flags[SQLCOM_SHOW_FLASHBACK_STATUS] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_CREATE] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_MASTER_STAT] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_SLAVE_STAT] = CF_STATUS_COMMAND; @@ -796,6 +797,7 @@ void init_sql_command_flags() { sql_command_flags[SQLCOM_ALTER_TABLESPACE] |= CF_AUTO_COMMIT_TRANS; sql_command_flags[SQLCOM_CREATE_SRS] |= CF_AUTO_COMMIT_TRANS; sql_command_flags[SQLCOM_DROP_SRS] |= CF_AUTO_COMMIT_TRANS; + sql_command_flags[SQLCOM_FLASH_BACK] |= CF_AUTO_COMMIT_TRANS; /* The following statements can deal with temporary tables, @@ -906,6 +908,7 @@ void init_sql_command_flags() { sql_command_flags[SQLCOM_IMPORT] |= CF_DISALLOW_IN_RO_TRANS; sql_command_flags[SQLCOM_CREATE_SRS] |= CF_DISALLOW_IN_RO_TRANS; sql_command_flags[SQLCOM_DROP_SRS] |= CF_DISALLOW_IN_RO_TRANS; + sql_command_flags[SQLCOM_FLASH_BACK] |= CF_DISALLOW_IN_RO_TRANS; /* Mark statements that are allowed to be executed by the plugins. @@ -1057,7 +1060,7 @@ void init_sql_command_flags() { sql_command_flags[SQLCOM_END] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_CREATE_SRS] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_DROP_SRS] |= CF_ALLOW_PROTOCOL_PLUGIN; - + sql_command_flags[SQLCOM_FLASH_BACK] |= CF_ALLOW_PROTOCOL_PLUGIN; /* Mark DDL statements which require that auto-commit mode to be temporarily turned off. See sqlcom_needs_autocommit_off() for more details. @@ -4652,6 +4655,7 @@ int mysql_execute_command(THD *thd, bool first_level) { case SQLCOM_SHOW_ERRORS: case SQLCOM_SHOW_EVENTS: case SQLCOM_SHOW_FIELDS: + case SQLCOM_SHOW_FLASHBACK_STATUS: case SQLCOM_SHOW_FUNC_CODE: case SQLCOM_SHOW_GRANTS: case SQLCOM_SHOW_KEYS: @@ -4689,6 +4693,21 @@ int mysql_execute_command(THD *thd, bool first_level) { break; } + case SQLCOM_FLASH_BACK: { + if (global_gtid_mode.get() != Gtid_mode::ON) { + my_error(ER_FLASHBACK_ACCESS_DENIED_ERROR, MYF(0), "gtid_mode should set to on"); + goto error; + } + + if (!read_only) { + my_error(ER_FLASHBACK_ACCESS_DENIED_ERROR, MYF(0), "read_only should set to true"); + goto error; + } + + /* flashback to gtid */ + res = flash_back_to_gtid(thd, lex->flash_back_to_gtid); + break; + } case SQLCOM_ALTER_USER: { LEX_USER *user, *tmp_user; bool changing_own_password = false; diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 730cb0c..b25ad38 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -139,6 +139,7 @@ #include "sql_string.h" #include "template_utils.h" #include "thr_lock.h" +#include "sql/rpl_replica.h" /* @see dynamic_privileges_table.cc */ bool iterate_all_dynamic_privileges(THD *thd, @@ -646,6 +647,9 @@ bool Sql_cmd_show_replica_status::execute_inner(THD *thd) { return show_slave_status_cmd(thd); } +bool Sql_cmd_show_flashback_status::execute_inner(THD *thd) { + return show_flashback_status(thd); +} /** Try acquire high priority share metadata lock on a table (with optional wait for conflicting locks to go away). diff --git a/sql/sql_show.h b/sql/sql_show.h index 8edb859..eca5d7f 100644 --- a/sql/sql_show.h +++ b/sql/sql_show.h @@ -597,4 +597,9 @@ class Sql_cmd_show_warnings : public Sql_cmd_show_noplan { } }; +class Sql_cmd_show_flashback_status : public Sql_cmd_show_noplan { + public: + Sql_cmd_show_flashback_status() : Sql_cmd_show_noplan(SQLCOM_SHOW_FLASHBACK_STATUS) {} + bool execute_inner(THD *thd) override; +}; #endif /* SQL_SHOW_H */ diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index c01d1eb..40ce11f 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1386,6 +1386,11 @@ void warn_about_deprecated_binary(THD *thd) %token GENERATE_SYM 1203 /* MYSQL */ /* + Tokens for flashback +*/ +%token FLASHBACK_SYM 1204 /* MYSQL */ + +/* Precedence rules used to resolve the ambiguity when using keywords as idents in the case e.g.: @@ -1877,6 +1882,7 @@ void warn_about_deprecated_binary(THD *thd) drop_srs_stmt explain_stmt explainable_stmt + flashback_stmt handler_stmt insert_stmt keycache_stmt @@ -1912,6 +1918,7 @@ void warn_about_deprecated_binary(THD *thd) show_engines_stmt show_errors_stmt show_events_stmt + show_flashback_stmt show_function_code_stmt show_function_status_stmt show_grants_stmt @@ -2355,6 +2362,7 @@ simple_statement: | drop_view_stmt { $$= nullptr; } | execute { $$= nullptr; } | explain_stmt + | flashback_stmt { $$= nullptr; } | flush { $$= nullptr; } | get_diagnostics { $$= nullptr; } | group_replication { $$= nullptr; } @@ -2408,6 +2416,7 @@ simple_statement: | show_engines_stmt | show_errors_stmt | show_events_stmt + | show_flashback_stmt | show_function_code_stmt | show_function_status_stmt | show_grants_stmt @@ -13887,6 +13896,13 @@ show_procedure_code_stmt: } ; +show_flashback_stmt: + SHOW FLASHBACK_SYM STATUS_SYM + { + $$ = NEW_PTN PT_show_flashback_status(@$); + } + ; + show_function_code_stmt: SHOW FUNCTION_SYM CODE_SYM sp_name { @@ -14306,6 +14322,22 @@ purge_option: } ; +/* flash back */ +flashback_stmt: + FLASHBACK_SYM flashback_option + { + LEX *lex= Lex; + lex->sql_command = SQLCOM_FLASH_BACK; + } + ; + +flashback_option: + TO_SYM TEXT_STRING_sys + { + Lex->flash_back_to_gtid = $2.str; + } + ; + /* kill threads */ kill: diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc index 661bb32..a4754d8 100644 --- a/storage/perfschema/pfs.cc +++ b/storage/perfschema/pfs.cc @@ -4826,8 +4826,6 @@ PSI_idle_locker *pfs_start_idle_wait_v1(PSI_idle_locker_state *state, state->m_thread = reinterpret_cast(pfs_thread); flags = STATE_FLAG_THREAD; - assert(pfs_thread->m_events_statements_count == 0); - if (global_idle_class.m_timed) { timer_start = get_idle_timer(); state->m_timer_start = timer_start; --