From 3971115038ebbf66752abb37e0fd25e021dc88a6 Mon Sep 17 00:00:00 2001 From: willhan Date: Wed, 5 Jun 2019 20:33:44 +0800 Subject: [PATCH] Compressed binary log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We add new event types to support compress the binlog as follow: QUERY_COMPRESSED_EVENT, WRITE_ROWS_COMPRESSED_EVENT_V1, UPDATE_ROWS_COMPRESSED_EVENT_V1, DELETE_POWS_COMPRESSED_EVENT_V1, WRITE_ROWS_COMPRESSED_EVENT, UPDATE_ROWS_COMPRESSED_EVENT, DELETE_POWS_COMPRESSED_EVENT We introduce two option for this feature: "log_bin_compress " and "log_bin_compress_min_len", the former is a switch of whether the binlog should be compressed and the latter is the minimum length of sql statement(in statement mode) or record(in row mode) that can be compressed. In master, it can be described by the code: if binlog_format == statement { if log_bin_compress == true and query_len >= log_bin_compress_min_len create a Query_compressed_log_event; else create a Query_log_event; } if binlog_format == row { if log_bin_compress == true and record_len >= log_bin_compress_min_len create a Write_rows_compressed_log_event(when INSERT) else create a Write_log_event(when INSERT); } And in slave, the compressed binlog events would be converted to the uncompressed form in IO thread, such as QUERY_COMPRESSED_EVENT convert to QUERY_EVENT. So the SQL and worker threads can be stay unchanged. And the sql thread will be executed as fast as executing uncompressed binlog. Now, we use zlib as compressed algrithm. And the compressed record is Compressed Record(3 parts) Record Header: 1 Byte 0 Bit: Always 1, mean compressed; 1-3 Bit: Reversed, compressed algorithm。0 means zlib, It's always 0 by now. 4-7 Bit: Bytes of "Record Original Length" Record Original Length: 1-4 Bytes Compressed Buf: the real compressed part. The feture can reduce 42% ~ 70% binlog capacity in our production environment. --- client/mysqlbinlog.cc | 14 +- libbinlogevents/include/binlog_event.h | 9 + libbinlogevents/src/control_events.cpp | 13 + libbinlogevents/src/rows_event.cpp | 4 +- mysql-test/r/flush2.result | 4 + .../r/mysqlbinlog_row_compressed.result | 424 +++++++++++++ .../r/mysqlbinlog_stmt_compressed.result | 186 ++++++ mysql-test/r/mysqld--help-notwin.result | 6 + .../binlog/r/binlog_variables_log_bin.result | 4 + .../r/binlog_variables_log_bin_index.result | 4 + .../suite/perfschema/r/show_sanity.result | 4 + .../suite/rpl/r/rpl_binlog_compress.result | 70 +++ ...rpl_multi_source_corrupt_repository.result | 36 +- .../suite/rpl/t/rpl_binlog_compress.test | 61 ++ mysql-test/suite/sys_vars/r/all_vars.result | 4 + mysql-test/t/mysqlbinlog_row_compressed.test | 37 ++ mysql-test/t/mysqlbinlog_stmt_compressed.test | 39 ++ sql/binlog.cc | 65 +- sql/binlog.h | 6 + sql/log_event.cc | 566 +++++++++++++++++- sql/log_event.h | 104 ++++ sql/mysqld.cc | 2 + sql/mysqld.h | 2 + sql/rpl_slave.cc | 48 +- sql/rpl_trx_boundary_parser.cc | 7 + sql/share/errmsg-utf8.txt | 2 + sql/sql_binlog.cc | 6 + sql/sys_vars.cc | 11 + 28 files changed, 1695 insertions(+), 43 deletions(-) create mode 100644 mysql-test/r/mysqlbinlog_row_compressed.result create mode 100644 mysql-test/r/mysqlbinlog_stmt_compressed.result create mode 100644 mysql-test/suite/rpl/r/rpl_binlog_compress.result create mode 100644 mysql-test/suite/rpl/t/rpl_binlog_compress.test create mode 100644 mysql-test/t/mysqlbinlog_row_compressed.test create mode 100644 mysql-test/t/mysqlbinlog_stmt_compressed.test diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 98015b1865f..8247328789a 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -1535,9 +1535,15 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, case binary_log::WRITE_ROWS_EVENT: case binary_log::DELETE_ROWS_EVENT: case binary_log::UPDATE_ROWS_EVENT: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: case binary_log::WRITE_ROWS_EVENT_V1: case binary_log::UPDATE_ROWS_EVENT_V1: case binary_log::DELETE_ROWS_EVENT_V1: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: case binary_log::PRE_GA_WRITE_ROWS_EVENT: case binary_log::PRE_GA_DELETE_ROWS_EVENT: case binary_log::PRE_GA_UPDATE_ROWS_EVENT: @@ -1547,9 +1553,15 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, if (ev_type == binary_log::WRITE_ROWS_EVENT || ev_type == binary_log::DELETE_ROWS_EVENT || ev_type == binary_log::UPDATE_ROWS_EVENT || + ev_type == binary_log::WRITE_ROWS_COMPRESSED_EVENT || + ev_type == binary_log::DELETE_ROWS_COMPRESSED_EVENT || + ev_type == binary_log::UPDATE_ROWS_COMPRESSED_EVENT || ev_type == binary_log::WRITE_ROWS_EVENT_V1 || ev_type == binary_log::DELETE_ROWS_EVENT_V1 || - ev_type == binary_log::UPDATE_ROWS_EVENT_V1) + ev_type == binary_log::UPDATE_ROWS_EVENT_V1 || + ev_type == binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1 || + ev_type == binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1 || + ev_type == binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1) { Rows_log_event *new_ev= (Rows_log_event*) ev; if (new_ev->get_flags(Rows_log_event::STMT_END_F)) diff --git a/libbinlogevents/include/binlog_event.h b/libbinlogevents/include/binlog_event.h index bb4219f14cd..e59485531cc 100644 --- a/libbinlogevents/include/binlog_event.h +++ b/libbinlogevents/include/binlog_event.h @@ -326,6 +326,15 @@ enum Log_event_type /* Prepared XA transaction terminal event similar to Xid */ XA_PREPARE_LOG_EVENT= 38, + + /* Compressed binlog event. */ + QUERY_COMPRESSED_EVENT = 50, + WRITE_ROWS_COMPRESSED_EVENT_V1 = 51, + UPDATE_ROWS_COMPRESSED_EVENT_V1 = 52, + DELETE_ROWS_COMPRESSED_EVENT_V1 = 53, + WRITE_ROWS_COMPRESSED_EVENT = 54, + UPDATE_ROWS_COMPRESSED_EVENT = 55, + DELETE_ROWS_COMPRESSED_EVENT = 56, /** Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers diff --git a/libbinlogevents/src/control_events.cpp b/libbinlogevents/src/control_events.cpp index cbd7169c453..fa564e33de8 100644 --- a/libbinlogevents/src/control_events.cpp +++ b/libbinlogevents/src/control_events.cpp @@ -165,6 +165,19 @@ Format_description_event::Format_description_event(uint8_t binlog_ver, */ post_header_len.insert(post_header_len.begin(), server_event_header_length, server_event_header_length + number_of_event_types); + /* + Each time add a new type event, if the event are not continuous, + you must set the event head len explicit + */ + for (int i = PREVIOUS_GTIDS_LOG_EVENT; i < QUERY_COMPRESSED_EVENT; i++) + post_header_len[i] = 0; + post_header_len[QUERY_COMPRESSED_EVENT - 1] = QUERY_HEADER_LEN; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2; // Sanity-check that all post header lengths are initialized. #ifndef DBUG_OFF for (int i= 0; i < number_of_event_types; i++) diff --git a/libbinlogevents/src/rows_event.cpp b/libbinlogevents/src/rows_event.cpp index 168652dfbb7..b30b65a540b 100644 --- a/libbinlogevents/src/rows_event.cpp +++ b/libbinlogevents/src/rows_event.cpp @@ -289,7 +289,9 @@ Rows_event::Rows_event(const char *buf, unsigned int event_len, columns_after_image= columns_before_image; if ((event_type == UPDATE_ROWS_EVENT) || - (event_type == UPDATE_ROWS_EVENT_V1)) + (event_type == UPDATE_ROWS_COMPRESSED_EVENT) || + (event_type == UPDATE_ROWS_EVENT_V1) || + (event_type == UPDATE_ROWS_COMPRESSED_EVENT_V1)) { columns_after_image.reserve((m_width + 7) / 8); columns_after_image.clear(); diff --git a/mysql-test/r/flush2.result b/mysql-test/r/flush2.result index 412e8363c42..67a48753317 100644 --- a/mysql-test/r/flush2.result +++ b/mysql-test/r/flush2.result @@ -4,6 +4,8 @@ show variables like 'log_bin%'; Variable_name Value log_bin OFF log_bin_basename +log_bin_compress OFF +log_bin_compress_min_len 256 log_bin_index log_bin_trust_function_creators ON log_bin_use_v1_row_events OFF @@ -22,6 +24,8 @@ show variables like 'log_bin%'; Variable_name Value log_bin OFF log_bin_basename +log_bin_compress OFF +log_bin_compress_min_len 256 log_bin_index log_bin_trust_function_creators ON log_bin_use_v1_row_events OFF diff --git a/mysql-test/r/mysqlbinlog_row_compressed.result b/mysql-test/r/mysqlbinlog_row_compressed.result new file mode 100644 index 00000000000..41b008c6d91 --- /dev/null +++ b/mysql-test/r/mysqlbinlog_row_compressed.result @@ -0,0 +1,424 @@ +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; +FLUSH BINARY LOGS; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at # +# # server id # end_log_pos # CRC32 # Start: binlog v #, server v # created # at startup +ROLLBACK/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Previous-GTIDs +# [empty] +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +use `test`/*!*/; +SET TIMESTAMP=#/*!*/; +SET @@session.pseudo_thread_id=#/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=1436549152/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t1` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Write_compressed_rows: table id # flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t1` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Write_compressed_rows: table id # flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t1` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Write_compressed_rows: table id # flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* MEDIUMINT meta=0 nullable=1 is_null=1 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t1` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Write_compressed_rows: table id # flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=0 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t2` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Write_compressed_rows: table id # flags: STMT_END_F +### INSERT INTO `test`.`t2` +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### INSERT INTO `test`.`t2` +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### INSERT INTO `test`.`t2` +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* INT meta=0 nullable=1 is_null=1 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### INSERT INTO `test`.`t2` +### SET +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=0 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t2` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Update_compressed_rows: table id # flags: STMT_END_F +### UPDATE `test`.`t2` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### UPDATE `test`.`t2` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### UPDATE `test`.`t2` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* INT meta=0 nullable=1 is_null=1 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t1` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Delete_compressed_rows: table id # flags: STMT_END_F +### DELETE FROM `test`.`t1` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* MEDIUMINT meta=0 nullable=1 is_null=1 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=0 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=yes +/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Table_map: `test`.`t2` mapped to number # +# at # +# # server id # end_log_pos # CRC32 # Delete_compressed_rows: table id # flags: STMT_END_F +### DELETE FROM `test`.`t2` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=0 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Rotate to master-bin.000002 pos: 4 +SET @@SESSION.GTID_NEXT= '#' /* added by mysqlbinlog */ /*!*/; +DELIMITER ; +# End of log file +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/; +DROP TABLE t1,t2; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/r/mysqlbinlog_stmt_compressed.result b/mysql-test/r/mysqlbinlog_stmt_compressed.result new file mode 100644 index 00000000000..86d35b77e7c --- /dev/null +++ b/mysql-test/r/mysqlbinlog_stmt_compressed.result @@ -0,0 +1,186 @@ +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +set binlog_format=statement; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; +FLUSH BINARY LOGS; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at # +# # server id # end_log_pos # CRC32 # Start: binlog v #, server v # created # at startup +ROLLBACK/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Previous-GTIDs +# [empty] +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +use `test`/*!*/; +SET TIMESTAMP=#/*!*/; +SET @@session.pseudo_thread_id=#/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=1436549152/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "") +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL) +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A") +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A") +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +INSERT INTO t2 SELECT * FROM t1 +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +DELETE FROM t1 +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Anonymous_GTID last_committed=# sequence_number=# rbr_only=no +SET @@SESSION.GTID_NEXT= '#'/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +BEGIN +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Query_compressed thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=#/*!*/; +DELETE FROM t2 +/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Xid = # +COMMIT/*!*/; +# at # +# # server id # end_log_pos # CRC32 # Rotate to master-bin.000002 pos: 4 +SET @@SESSION.GTID_NEXT= '#' /* added by mysqlbinlog */ /*!*/; +DELIMITER ; +# End of log file +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/; +DROP TABLE t1,t2; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index a87c4634d28..1b6a3052338 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -406,6 +406,10 @@ The following options may be given as the first argument: strongly recommended to avoid replication problems if server's hostname changes) argument should be the chosen location for the binary log files. + --log-bin-compress Whether the binary log can be compressed + --log-bin-compress-min-len[=#] + Minimum length of sql statement(in statement mode) or + record(in row mode)that can be compressed. --log-bin-index=name File that holds the names for binary log files. --log-bin-trust-function-creators @@ -1381,6 +1385,8 @@ lc-time-names en_US local-infile TRUE lock-wait-timeout 31536000 log-bin (No default value) +log-bin-compress FALSE +log-bin-compress-min-len 256 log-bin-index (No default value) log-bin-trust-function-creators FALSE log-bin-use-v1-row-events FALSE diff --git a/mysql-test/suite/binlog/r/binlog_variables_log_bin.result b/mysql-test/suite/binlog/r/binlog_variables_log_bin.result index fbfd1cc2cd3..4ea240b3964 100644 --- a/mysql-test/suite/binlog/r/binlog_variables_log_bin.result +++ b/mysql-test/suite/binlog/r/binlog_variables_log_bin.result @@ -3,6 +3,10 @@ Variable_name log_bin Value ON Variable_name log_bin_basename Value MYSQLTEST_VARDIR/mysqld.1/data/other +Variable_name log_bin_compress +Value OFF +Variable_name log_bin_compress_min_len +Value 256 Variable_name log_bin_index Value MYSQLTEST_VARDIR/mysqld.1/data/other.index Variable_name log_bin_trust_function_creators diff --git a/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result b/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result index 368bc560e64..98a20e66f48 100644 --- a/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result +++ b/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result @@ -3,6 +3,10 @@ Variable_name log_bin Value ON Variable_name log_bin_basename Value MYSQLTEST_VARDIR/mysqld.1/data/other +Variable_name log_bin_compress +Value OFF +Variable_name log_bin_compress_min_len +Value 256 Variable_name log_bin_index Value MYSQLTEST_VARDIR/tmp/something.index Variable_name log_bin_trust_function_creators diff --git a/mysql-test/suite/perfschema/r/show_sanity.result b/mysql-test/suite/perfschema/r/show_sanity.result index 71bc92a2c2f..fcc531ab73e 100644 --- a/mysql-test/suite/perfschema/r/show_sanity.result +++ b/mysql-test/suite/perfschema/r/show_sanity.result @@ -413,6 +413,8 @@ SHOW_MODE SOURCE VARIABLE_NAME 5.6 I_S.SESSION_VARIABLES INNODB_DEADLOCK_DETECT 5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED 5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS +5.6 I_S.SESSION_VARIABLES LOG_BIN_COMPRESS +5.6 I_S.SESSION_VARIABLES LOG_BIN_COMPRESS_MIN_LEN 5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG 5.6 I_S.SESSION_VARIABLES TLS_VERSION @@ -441,6 +443,8 @@ SHOW_MODE SOURCE VARIABLE_NAME 5.6 I_S.SESSION_VARIABLES INNODB_DEADLOCK_DETECT 5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED 5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS +5.6 I_S.SESSION_VARIABLES LOG_BIN_COMPRESS +5.6 I_S.SESSION_VARIABLES LOG_BIN_COMPRESS_MIN_LEN 5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG 5.6 I_S.SESSION_VARIABLES TLS_VERSION diff --git a/mysql-test/suite/rpl/r/rpl_binlog_compress.result b/mysql-test/suite/rpl/r/rpl_binlog_compress.result new file mode 100644 index 00000000000..f6630315d06 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_binlog_compress.result @@ -0,0 +1,70 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +set @old_log_bin_compress=@@log_bin_compress; +set @old_log_bin_compress_min_len=@@log_bin_compress_min_len; +set @old_binlog_format=@@binlog_format; +set @old_binlog_row_image=@@binlog_row_image; +set global log_bin_compress=on; +set global log_bin_compress_min_len=10; +drop table if exists t1; +Warnings: +Note 1051 Unknown table 'test.t1' +CREATE TABLE t1 (pr_id int(10) unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, pr_page int(11) NOT NULL, pr_type varbinary(60) NOT NULL, test int, UNIQUE KEY pr_pagetype (pr_page,pr_type)) ENGINE=myisam AUTO_INCREMENT=136; +set binlog_format=statement; +insert into t1 (pr_page, pr_type, test) values(1,"one",0),(2,"two",0); +replace into t1 (pr_page, pr_type,test) values(1,"one",2); +update t1 set test=test+1 where pr_page > 1; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +set binlog_format=row; +insert into t1 (pr_page, pr_type, test) values(3,"three",0),(4,"four",4),(5, "five", 0); +replace into t1 (pr_page, pr_type,test) values(3,"one",2); +update t1 set test=test+1 where pr_page > 3; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +142 3 one 2 +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +142 3 one 2 +set binlog_row_image=minimal; +insert into t1 (pr_page, pr_type, test) values(6,"six",0),(7,"seven",7),(8, "eight", 0); +replace into t1 (pr_page, pr_type,test) values(6,"six",2); +update t1 set test=test+1 where pr_page > 6; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +144 7 seven 8 +142 3 one 2 +146 6 six 2 +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +144 7 seven 8 +142 3 one 2 +146 6 six 2 +drop table t1; +set global log_bin_compress=@old_log_bin_compress; +set global log_bin_compress_min_len=@old_log_bin_compress_min_len; +set binlog_format=@old_binlog_format; +set binlog_row_image=@old_binlog_row_image; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_multi_source_corrupt_repository.result b/mysql-test/suite/rpl/r/rpl_multi_source_corrupt_repository.result index 0139c06b8db..35b5452657a 100644 --- a/mysql-test/suite/rpl/r/rpl_multi_source_corrupt_repository.result +++ b/mysql-test/suite/rpl/r/rpl_multi_source_corrupt_repository.result @@ -95,10 +95,10 @@ FLUSH RELAY LOGS; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel '' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel '' and position 825.] [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel '' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel '' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel ''. FLUSH RELAY LOGS FOR CHANNEL ''; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel ''. @@ -138,10 +138,10 @@ RESET SLAVE ALL FOR CHANNEL ''; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_2' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_2' and position 825.] [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_2' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_2' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_2'. FLUSH RELAY LOGS FOR CHANNEL 'channel_2'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_2'. @@ -185,10 +185,10 @@ RESET SLAVE ALL FOR CHANNEL 'channel_2'; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_3' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_3' and position 825.] [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_3' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_3' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_3'. FLUSH RELAY LOGS FOR CHANNEL 'channel_3'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_3'. @@ -316,10 +316,10 @@ FLUSH RELAY LOGS; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel '' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel '' and position 825.] [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel '' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel '' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel ''. FLUSH RELAY LOGS FOR CHANNEL ''; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel ''. @@ -363,10 +363,10 @@ RESET SLAVE ALL FOR CHANNEL ''; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_2' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_2' and position 825.] [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_2' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_2' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_2'. FLUSH RELAY LOGS FOR CHANNEL 'channel_2'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_2'. @@ -406,10 +406,10 @@ RESET SLAVE ALL FOR CHANNEL 'channel_2'; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_3' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_3' and position 825.] [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_3' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_3' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_3'. FLUSH RELAY LOGS FOR CHANNEL 'channel_3'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_3'. @@ -537,10 +537,10 @@ FLUSH RELAY LOGS; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel '' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel '' and position 825.] [connection server_1] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel '' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel '' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel ''. FLUSH RELAY LOGS FOR CHANNEL ''; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel ''. @@ -584,10 +584,10 @@ RESET SLAVE ALL FOR CHANNEL ''; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_2' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return 0 for channel 'channel_2' and position 825.] [connection server_2] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_2' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return -1 for channel 'channel_2' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_2'. FLUSH RELAY LOGS FOR CHANNEL 'channel_2'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_2'. @@ -631,10 +631,10 @@ RESET SLAVE ALL FOR CHANNEL 'channel_2'; ===== Executing MASTER_POS_WAIT() and WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS(). [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_3' and position 807.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_3' and position 825.] [connection server_3] [connection server_4] -include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_3' and position 808.] +include/assert.inc [MASTER_POS_WAIT should return NULL for channel 'channel_3' and position 826.] ===== Executing FLUSH RELAY LOGS FOR CHANNEL on channel 'channel_3'. FLUSH RELAY LOGS FOR CHANNEL 'channel_3'; ===== Executing SHOW RELAYLOG EVENTS FOR CHANNEL on channel 'channel_3'. diff --git a/mysql-test/suite/rpl/t/rpl_binlog_compress.test b/mysql-test/suite/rpl/t/rpl_binlog_compress.test new file mode 100644 index 00000000000..dc1ee3c07a7 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_binlog_compress.test @@ -0,0 +1,61 @@ +# +# Test of compressed binlog with replication +# +--source include/master-slave.inc +--source include/have_binlog_format_mixed_or_row.inc + +set @old_log_bin_compress=@@log_bin_compress; +set @old_log_bin_compress_min_len=@@log_bin_compress_min_len; +set @old_binlog_format=@@binlog_format; +set @old_binlog_row_image=@@binlog_row_image; + +set global log_bin_compress=on; +set global log_bin_compress_min_len=10; + +drop table if exists t1; +CREATE TABLE t1 (pr_id int(10) unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, pr_page int(11) NOT NULL, pr_type varbinary(60) NOT NULL, test int, UNIQUE KEY pr_pagetype (pr_page,pr_type)) ENGINE=myisam AUTO_INCREMENT=136; + +set binlog_format=statement; +insert into t1 (pr_page, pr_type, test) values(1,"one",0),(2,"two",0); +replace into t1 (pr_page, pr_type,test) values(1,"one",2); +update t1 set test=test+1 where pr_page > 1; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; + + +set binlog_format=row; +insert into t1 (pr_page, pr_type, test) values(3,"three",0),(4,"four",4),(5, "five", 0); +replace into t1 (pr_page, pr_type,test) values(3,"one",2); +update t1 set test=test+1 where pr_page > 3; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; + + +set binlog_row_image=minimal; +insert into t1 (pr_page, pr_type, test) values(6,"six",0),(7,"seven",7),(8, "eight", 0); +replace into t1 (pr_page, pr_type,test) values(6,"six",2); +update t1 set test=test+1 where pr_page > 6; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; +drop table t1; + +set global log_bin_compress=@old_log_bin_compress; +set global log_bin_compress_min_len=@old_log_bin_compress_min_len; +set binlog_format=@old_binlog_format; +set binlog_row_image=@old_binlog_row_image; +--source include/rpl_end.inc diff --git a/mysql-test/suite/sys_vars/r/all_vars.result b/mysql-test/suite/sys_vars/r/all_vars.result index 5ddc39a3594..1f1a37298e9 100644 --- a/mysql-test/suite/sys_vars/r/all_vars.result +++ b/mysql-test/suite/sys_vars/r/all_vars.result @@ -17,6 +17,10 @@ DISABLED_STORAGE_ENGINES DISABLED_STORAGE_ENGINES KEYRING_OPERATIONS KEYRING_OPERATIONS +LOG_BIN_COMPRESS +LOG_BIN_COMPRESS +LOG_BIN_COMPRESS_MIN_LEN +LOG_BIN_COMPRESS_MIN_LEN TLS_VERSION TLS_VERSION drop table t1; diff --git a/mysql-test/t/mysqlbinlog_row_compressed.test b/mysql-test/t/mysqlbinlog_row_compressed.test new file mode 100644 index 00000000000..c0952834416 --- /dev/null +++ b/mysql-test/t/mysqlbinlog_row_compressed.test @@ -0,0 +1,37 @@ +# +# Test for compressed row event +# + +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc + +# +# +# mysqlbinlog: comprssed row event +# +# + +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; + +--let $binlog = query_get_value(SHOW MASTER STATUS, File, 1) +--let $datadir = `SELECT @@datadir` + +FLUSH BINARY LOGS; +--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR +--replace_regex /TIMESTAMP=[0-9]*/TIMESTAMP=#/ /#[0-9]*[ ]*[0-9]*:[0-9]*:[0-9]* server id [0-9]*/# # server id #/ /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /last_committed=[0-9]*/last_committed=#/ /sequence_number=[0-9]*/sequence_number=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #/ /Start: binlog v [0-9]*/Start: binlog v #/ /created [0-9]*[ ]*[0-9]*:[0-9]*:[0-9]* at startup/created # at startup/ /(@[0-9]*=[0-9-]*[.][0-9]{1,3})[0-9e+-]*[^ ]*[ ]*(.*(FLOAT|DOUBLE).*[*].)/\1... \2/ /SET @@SESSION.GTID_NEXT= '.*'/SET @@SESSION.GTID_NEXT= '#'/ /CRC32 0x[0-9a-f]{8}/CRC32 #/ /# [a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}:/# #:/ +--exec $MYSQL_BINLOG --verbose --verbose --base64-output=DECODE-ROWS $datadir/$binlog + +DROP TABLE t1,t2; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/t/mysqlbinlog_stmt_compressed.test b/mysql-test/t/mysqlbinlog_stmt_compressed.test new file mode 100644 index 00000000000..2e5cc64cc71 --- /dev/null +++ b/mysql-test/t/mysqlbinlog_stmt_compressed.test @@ -0,0 +1,39 @@ +# +# Test for compressed query event +# +--source include/have_log_bin.inc +#--source include/have_binlog_format_statement.inc + +# +# +# mysqlbinlog: comprssed query event +# +# + +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +set binlog_format=statement; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; + +--let $binlog = query_get_value(SHOW MASTER STATUS, File, 1) +--let $datadir = `SELECT @@datadir` + +FLUSH BINARY LOGS; +--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR +--replace_regex /TIMESTAMP=[0-9]*/TIMESTAMP=#/ /#[0-9]*[ ]*[0-9]*:[0-9]*:[0-9]* server id [0-9]*/# # server id #/ /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /last_committed=[0-9]*/last_committed=#/ /sequence_number=[0-9]*/sequence_number=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #/ /Start: binlog v [0-9]*/Start: binlog v #/ /created [0-9]*[ ]*[0-9]*:[0-9]*:[0-9]* at startup/created # at startup/ /(@[0-9]*=[0-9-]*[.][0-9]{1,3})[0-9e+-]*[^ ]*[ ]*(.*(FLOAT|DOUBLE).*[*].)/\1... \2/ /SET @@SESSION.GTID_NEXT= '.*'/SET @@SESSION.GTID_NEXT= '#'/ /CRC32 0x[0-9a-f]{8}/CRC32 #/ /# [a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}:/# #:/ +--exec $MYSQL_BINLOG --verbose --verbose --base64-output=DECODE-ROWS $datadir/$binlog + + + +DROP TABLE t1,t2; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/sql/binlog.cc b/sql/binlog.cc index 3aff8b69432..274e0aecb0e 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -9843,11 +9843,13 @@ int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle, while ((ev= Log_event::read_log_event(log, 0, fdle, TRUE)) && ev->is_valid()) { - if (ev->get_type_code() == binary_log::QUERY_EVENT && + if ((ev->get_type_code() == binary_log::QUERY_EVENT || + ev->get_type_code() == binary_log::QUERY_COMPRESSED_EVENT) && !strcmp(((Query_log_event*)ev)->query, "BEGIN")) in_transaction= TRUE; - if (ev->get_type_code() == binary_log::QUERY_EVENT && + if ((ev->get_type_code() == binary_log::QUERY_EVENT || + ev->get_type_code() == binary_log::QUERY_COMPRESSED_EVENT) && !strcmp(((Query_log_event*)ev)->query, "COMMIT")) { DBUG_ASSERT(in_transaction == TRUE); @@ -11591,10 +11593,15 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, size_t const len= pack_row(table, table->write_set, row_data, record); - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, len, is_trans, - static_cast(0), - extra_row_info); + Rows_log_event* ev; + if (binlog_should_compress(len)) + ev= binlog_prepare_pending_rows_event(table, server_id, len, is_trans, + static_cast(0), + extra_row_info); + else + ev= binlog_prepare_pending_rows_event(table, server_id, len, is_trans, + static_cast(0), + extra_row_info); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -11644,11 +11651,17 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, DBUG_DUMP("before_row", before_row, before_size); DBUG_DUMP("after_row", after_row, after_size); - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, - before_size + after_size, is_trans, - static_cast(0), - extra_row_info); + Rows_log_event* ev; + if (binlog_should_compress(before_size + after_size)) + ev= binlog_prepare_pending_rows_event(table, server_id, + before_size + after_size, is_trans, + static_cast(0), + extra_row_info); + else + ev= binlog_prepare_pending_rows_event(table, server_id, + before_size + after_size, is_trans, + static_cast(0), + extra_row_info); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -11699,10 +11712,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8); size_t const len= pack_row(table, table->read_set, row_data, record); - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, len, is_trans, - static_cast(0), - extra_row_info); + Rows_log_event* ev; + if (binlog_should_compress(len)) + ev= binlog_prepare_pending_rows_event(table, server_id, len, is_trans, + static_cast(0), + extra_row_info); + else + ev= binlog_prepare_pending_rows_event(table, server_id, len, is_trans, + static_cast(0), + extra_row_info); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -12139,15 +12157,26 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, const char *query_arg, flush the pending rows event if necessary. */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + int error = 0; + if (binlog_should_compress(query_len)) + { + Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error = mysql_bin_log.write_event(&qinfo); + } + else + { + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error = mysql_bin_log.write_event(&qinfo); + } + /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query log event is written to the binary log, we pretend that no table maps were written. */ - int error= mysql_bin_log.write_event(&qinfo); binlog_table_maps= 0; DBUG_RETURN(error); } diff --git a/sql/binlog.h b/sql/binlog.h index 8cd1c0f559c..f5a15643c92 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -1062,4 +1062,10 @@ inline bool normalize_binlog_name(char *to, const char *from, bool is_relay_log) end: DBUG_RETURN(error); } + +inline bool binlog_should_compress(ulong len) +{ + return opt_bin_log_compress && len >= opt_bin_log_compress_min_len; +} + #endif /* BINLOG_H_INCLUDED */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 32ce3b9a9c1..5667fed9baa 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -562,6 +562,270 @@ char *str_to_hex(char *to, const char *from, size_t len) return to; // pointer to end 0 of 'to' } + +#define BINLOG_COMPRESSED_HEADER_LEN 1 +#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4 +/* + Compressed Record + Record Header: 1 Byte + 0 Bit: Always 1, mean compressed; + 1-3 Bit: Reversed, compressed algorithm. Always 0, means zlib + 4-7 Bit: Bytes of "Record Original Length" + Record Original Length: 1-4 Bytes + Compressed Buf: +*/ + +/* + Get the length of compress content. +*/ +size_t binlog_get_compress_len(size_t len) +{ + /* 5 for the begin content, 1 reserved for a '\0'*/ + return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES) + + compressBound(len) + 1); +} + +/* + Compress buf from 'src' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst', which + can be got by binlog_get_uncompress_len, is enough to hold + the content uncompressed. + 2) The 'comlen' should stored the length of 'dst', and it will + be set as the size of compressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_compress(const char *src, char *dst, size_t len, size_t *comlen) +{ + uchar lenlen; + if (len & 0xff000000) + { + dst[1] = uchar(len >> 24); + dst[2] = uchar(len >> 16); + dst[3] = uchar(len >> 8); + dst[4] = uchar(len); + lenlen = 4; + } + else if (len & 0x00FF0000) + { + dst[1] = uchar(len >> 16); + dst[2] = uchar(len >> 8); + dst[3] = uchar(len); + lenlen = 3; + } + else if (len & 0x0000ff00) + { + dst[1] = uchar(len >> 8); + dst[2] = uchar(len); + lenlen = 2; + } + else + { + dst[1] = uchar(len); + lenlen = 1; + } + dst[0] = 0x80 | (lenlen & 0x07); + + uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1; + if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, + (const Bytef *)src, (uLongf)len) != Z_OK) + { + return 1; + } + *comlen = (size_t)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen; + return 0; +} + +/* + Convert a query_compressed_log_event to query_log_event + from 'src' to 'dst'(malloced inside), the size after compress + stored in 'newlen'. + + @Warning: + 1)The caller should call my_free to release 'dst'. + + return zero if successful, others otherwise. +*/ + +int query_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, + const char *src, + char **dst, + ulong *newlen) +{ + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + + uint8 common_header_len = description_event->common_header_len; + uint8 post_header_len = description_event->post_header_len[binary_log::QUERY_COMPRESSED_EVENT - 1]; + + tmp += common_header_len; + + uint db_len = (uint)tmp[Query_log_event::Q_DB_LEN_OFFSET]; + uint16 status_vars_len = uint2korr(tmp + Query_log_event::Q_STATUS_VARS_LEN_OFFSET); + + tmp += post_header_len + status_vars_len + db_len + 1; + + size_t un_len = binlog_get_uncompress_len(tmp); + *newlen = (tmp - src) + un_len; + if (contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + *dst = (char *)my_malloc(key_memory_log_event, *newlen, MYF(MY_FAE)); + if (!*dst) + { + return 1; + } + + /* copy the head*/ + memcpy(*dst, src, tmp - src); + if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len)) + { + my_free(*dst); + return 1; + } + + (*dst)[EVENT_TYPE_OFFSET] = binary_log::QUERY_EVENT; + int4store(*dst + EVENT_LEN_OFFSET, *newlen); + if (contain_checksum) + { + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len)); + } + return 0; +} + +int Row_log_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, + const char *src, + char **dst, + ulong *newlen) +{ + Log_event_type type = (Log_event_type)src[EVENT_TYPE_OFFSET]; + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + + uint8 common_header_len = description_event->common_header_len; + uint8 post_header_len = description_event->post_header_len[type - 1]; + + tmp += common_header_len + description_event->ROWS_HEADER_LEN_V1; + if (post_header_len == description_event->ROWS_HEADER_LEN_V2) + { + /* + Have variable length header, check length, + which includes length bytes + */ + uint16 var_header_len = uint2korr(tmp); + assert(var_header_len >= 2); + + /* skip over var-len header, extracting 'chunks' */ + tmp += var_header_len; + type = (Log_event_type)(type - binary_log::WRITE_ROWS_COMPRESSED_EVENT + binary_log::WRITE_ROWS_EVENT); + } + else + { + type = (Log_event_type)(type - binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1 + binary_log::WRITE_ROWS_EVENT_V1); + } + + + ulong m_width = net_field_length((uchar **)&tmp); + tmp += (m_width + 7) / 8; + + if (type == binary_log::UPDATE_ROWS_EVENT_V1 || type == binary_log::UPDATE_ROWS_EVENT) + { + tmp += (m_width + 7) / 8; + } + + size_t un_len = binlog_get_uncompress_len(tmp); + *newlen = (tmp - src) + un_len; + if (contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + *dst = (char *)my_malloc(key_memory_log_event, *newlen, MYF(MY_FAE)); + if (!*dst) + { + return 1; + } + + /* copy the head*/ + memcpy(*dst, src, tmp - src); + if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len)) + { + my_free(*dst); + return 1; + } + + (*dst)[EVENT_TYPE_OFFSET] = type; + int4store(*dst + EVENT_LEN_OFFSET, *newlen); + if (contain_checksum) + { + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len)); + } + return 0; +} + +/* + Get the length of uncompress content. +*/ + +size_t binlog_get_uncompress_len(const char *buf) +{ + size_t lenlen = buf[0] & 0x07; + size_t len = 0; + switch (lenlen) + { + case 1: + len = uchar(buf[1]); + break; + case 2: + len = uchar(buf[1]) << 8 | uchar(buf[2]); + break; + case 3: + len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]); + break; + case 4: + len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | + uchar(buf[3]) << 8 | uchar(buf[4]); + break; + default: + DBUG_ASSERT(lenlen >= 1 && lenlen <= 4); + break; + } + return len; +} + +/* + Uncompress the content in 'src' with length of 'len' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst' (which + can be got by statement_get_uncompress_len) is enough to hold + the content uncompressed. + 2) The 'newlen' should stored the length of 'dst', and it will + be set as the size of uncompressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_uncompress(const char *src, char *dst, size_t len, size_t *newlen) +{ + if ((src[0] & 0x80) == 0) + { + return 1; + } + + size_t lenlen = src[0] & 0x07; + uLongf buflen = *newlen; + if (uncompress((Bytef *)dst, &buflen, (const Bytef*)src + 1 + lenlen, len) != Z_OK) + { + return 1; + } + + *newlen = (size_t)buflen; + return 0; +} + #ifndef MYSQL_CLIENT /** @@ -682,6 +946,13 @@ const char* Log_event::get_type_str(Log_event_type type) case binary_log::TRANSACTION_CONTEXT_EVENT: return "Transaction_context"; case binary_log::VIEW_CHANGE_EVENT: return "View_change"; case binary_log::XA_PREPARE_LOG_EVENT: return "XA_prepare"; + case binary_log::QUERY_COMPRESSED_EVENT: return "Query_compressed"; + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1"; + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1"; + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1"; + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed"; + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed"; + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed"; default: return "Unknown"; /* impossible */ } } @@ -1623,6 +1894,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ev = new Query_log_event(buf, event_len, description_event, binary_log::QUERY_EVENT); break; + case binary_log::QUERY_COMPRESSED_EVENT: + ev = new Query_compressed_log_event(buf, event_len, description_event, binary_log::QUERY_COMPRESSED_EVENT); + break; case binary_log::LOAD_EVENT: case binary_log::NEW_LOAD_EVENT: #ifndef DBUG_OFF @@ -1733,6 +2007,18 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case binary_log::VIEW_CHANGE_EVENT: ev = new View_change_log_event(buf, event_len, description_event); break; + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + ev = new Write_rows_compressed_log_event(buf, event_len, description_event); + break; + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: + ev = new Update_rows_compressed_log_event(buf, event_len, description_event); + break; + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: + ev = new Delete_rows_compressed_log_event(buf, event_len, description_event); + break; #endif case binary_log::XA_PREPARE_LOG_EVENT: ev= new XA_prepare_log_event(buf, description_event); @@ -2685,6 +2971,27 @@ void Log_event::print_base64(IO_CACHE* file, &fd_evt); break; } + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + { + ev = new Write_rows_compressed_log_event((const char*)ptr, size, + glob_description_event); + break; + } + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: + { + ev = new Update_rows_compressed_log_event((const char*)ptr, size, + glob_description_event); + break; + } + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: + { + ev = new Delete_rows_compressed_log_event((const char*)ptr, size, + glob_description_event); + break; + } default: break; } @@ -2766,6 +3073,7 @@ bool Log_event::contains_partition_info(bool end_group_sets_max_dbs) break; case binary_log::QUERY_EVENT: + case binary_log::QUERY_COMPRESSED_EVENT: { Query_log_event *qev= static_cast(this); if ((ends_group() && end_group_sets_max_dbs) || @@ -3883,6 +4191,23 @@ bool Query_log_event::write(IO_CACHE* file) write_footer(file)) ? 1 : 0; } +bool Query_compressed_log_event::write(IO_CACHE* file) +{ + const char *query_tmp = query; + size_t q_len_tmp = q_len; + bool ret = true; + q_len = binlog_get_compress_len(q_len); + query = (char *)my_malloc(key_memory_log_event, q_len, MYF(MY_FAE)); + if (query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len)) + { + ret = Query_log_event::write(file); + } + my_free((void *)query); + query = query_tmp; + q_len = q_len_tmp; + return ret; +} + /** The simplest constructor that could possibly work. This is used for @@ -4157,6 +4482,33 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu", (ulong) flags2, (ulonglong) sql_mode)); } + +Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, + const char* query_arg, + ulong query_length, + bool using_trans, + bool direct, + bool suppress_use, + int errcode) + : binary_log::Query_event(query_arg, + thd_arg->catalog().str, + thd_arg->db().str, + query_length, + thd_arg->thread_id(), + thd_arg->variables.sql_mode, + thd_arg->variables.auto_increment_increment, + thd_arg->variables.auto_increment_offset, + thd_arg->variables.lc_time_names->number, + (ulonglong)thd_arg->table_map_for_update, + errcode, + thd_arg->db().str ? strlen(thd_arg->db().str) : 0, + thd_arg->catalog().str ? strlen(thd_arg->catalog().str) : 0), + Query_log_event(thd_arg, query_arg, query_length, using_trans, + direct, suppress_use, errcode), + query_buf(0) +{ + common_header->type_code = binary_log::QUERY_COMPRESSED_EVENT; +} #endif /* MYSQL_CLIENT */ @@ -4223,6 +4575,31 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, DBUG_VOID_RETURN; } +Query_compressed_log_event::Query_compressed_log_event(const char *buf, + uint event_len, + const Format_description_log_event + *description_event, + Log_event_type event_type) + :binary_log::Query_event(buf, event_len, description_event, event_type), + Query_log_event(buf, event_len, description_event, event_type), + query_buf(NULL) +{ + if (query) + { + size_t un_len = binlog_get_uncompress_len(query); + query_buf = (Log_event_header::Byte*)my_malloc(key_memory_log_event, un_len + 1, MYF(MY_WME)); //reserve one byte for '\0' + if (query_buf && !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len)) + { + query_buf[un_len] = 0; + query = (const char *)query_buf; + q_len = un_len; + } + else + { + query = 0; + } + } +} #ifdef MYSQL_CLIENT /** @@ -9307,7 +9684,9 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, m_cols_ai.bitmap= m_cols.bitmap; //See explanation below while setting is_valid. if ((m_type == binary_log::UPDATE_ROWS_EVENT) || - (m_type == binary_log::UPDATE_ROWS_EVENT_V1)) + (m_type == binary_log::UPDATE_ROWS_COMPRESSED_EVENT) || + (m_type == binary_log::UPDATE_ROWS_EVENT_V1) || + (m_type == binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1)) { /* if bitmap_init fails, is_valid will be set to false*/ if (likely(!bitmap_init(&m_cols_ai, @@ -9365,6 +9744,30 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, DBUG_VOID_RETURN; } +void Rows_log_event::uncompress_buf() +{ + size_t un_len = binlog_get_uncompress_len((char *)m_rows_buf); + uchar *new_buf = (uchar*)my_malloc(key_memory_log_event, un_len, MYF(MY_WME)); + if (new_buf) + { + if (!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, m_rows_cur - m_rows_buf, &un_len)) + { + m_rows_buf = new_buf; +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + m_curr_row = m_rows_buf; +#endif + m_rows_end = m_rows_buf + un_len; + m_rows_cur = m_rows_end; + return; + } + else + { + my_free(new_buf); + } + } + m_cols.bitmap = 0; // catch it in is_valid +} + Rows_log_event::~Rows_log_event() { if (m_cols.bitmap) @@ -9388,7 +9791,7 @@ size_t Rows_log_event::get_data_size() (m_rows_cur - m_rows_buf);); int data_size= 0; - bool is_v2_event= common_header->type_code > binary_log::DELETE_ROWS_EVENT_V1; + bool is_v2_event= LOG_EVENT_IS_ROW_V2(common_header->type_code); if (is_v2_event) { data_size= Binary_log_event::ROWS_HEADER_LEN_V2 + @@ -11622,6 +12025,27 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) return res; } + +bool Rows_log_event::write_compressed(IO_CACHE *file) +{ + uchar *m_rows_buf_tmp = m_rows_buf; + uchar *m_rows_cur_tmp = m_rows_cur; + bool ret = true; + size_t comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); + m_rows_buf = (uchar *)my_malloc(key_memory_log_event, comlen, MYF(MY_FAE)); + if (m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, + (char *)m_rows_buf, + m_rows_cur_tmp - m_rows_buf_tmp, + &comlen)) + { + m_rows_cur = comlen + m_rows_buf; + ret = Log_event::write(file); + } + my_free(m_rows_buf); + m_rows_buf = m_rows_buf_tmp; + m_rows_cur = m_rows_cur_tmp; + return ret; +} #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) @@ -12254,6 +12678,26 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, { common_header->type_code= m_type; } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event(THD *thd_arg, + TABLE *tbl_arg, + const Table_id& tid_arg, + bool is_transactional, + const uchar* extra_row_info) + : binary_log::Rows_event(m_type), + Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional, + extra_row_info) +{ + m_type = log_bin_use_v1_row_events ? + binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1 : + binary_log::WRITE_ROWS_COMPRESSED_EVENT; + common_header->type_code = m_type; +} + +bool Write_rows_compressed_log_event::write(IO_CACHE *file) +{ + return Rows_log_event::write_compressed(file); +} #endif /* @@ -12269,6 +12713,16 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, { DBUG_ASSERT(header()->type_code == m_type); } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) + : binary_log::Rows_event(buf, event_len, description_event), + Write_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12788,6 +13242,23 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) {DBUG_SET("+d,simulate_my_b_fill_error");}); Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } + +void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + if (!Row_log_event_uncompress(glob_description_event, + common_footer->checksum_alg, temp_buf, &new_buf, &len)) + { + my_free(temp_buf); + temp_buf = new_buf; + Rows_log_event::print_helper(file, print_event_info, "Write_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress write_compressed_rows failed\n"); + } +} #endif /************************************************************************** @@ -12813,6 +13284,24 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, { common_header->type_code= m_type; } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(THD *thd_arg, + TABLE *tbl_arg, + const Table_id& tid_arg, + bool is_transactional, + const uchar* extra_row_info) + :binary_log::Rows_event(m_type), + Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional, extra_row_info) +{ + m_type = log_bin_use_v1_row_events ? binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1 : + binary_log::DELETE_ROWS_COMPRESSED_EVENT; + common_header->type_code = m_type; +} + +bool Delete_rows_compressed_log_event::write(IO_CACHE *file) +{ + return Rows_log_event::write_compressed(file); +} #endif /* #if !defined(MYSQL_CLIENT) */ /* @@ -12828,6 +13317,16 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, { DBUG_ASSERT(header()->type_code == m_type); } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) + : binary_log::Rows_event(buf, event_len, description_event), + Delete_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12886,6 +13385,23 @@ void Delete_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } + +void Delete_rows_compressed_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + if (!Row_log_event_uncompress(glob_description_event, common_footer->checksum_alg, temp_buf, &new_buf, &len)) + { + my_free(temp_buf); + temp_buf = new_buf; + Rows_log_event::print_helper(file, print_event_info, "Delete_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress delete_compressed_rows failed\n"); + } +} #endif @@ -12914,6 +13430,25 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, is_valid_param= true; } +Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, + TABLE *tbl_arg, + const Table_id& tid, + bool is_transactional, + const uchar* extra_row_info) + : binary_log::Rows_event(m_type), + Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional, extra_row_info) +{ + m_type = log_bin_use_v1_row_events ? binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1 : + binary_log::UPDATE_ROWS_COMPRESSED_EVENT; + common_header->type_code = m_type; +} + +bool Update_rows_compressed_log_event::write(IO_CACHE* file) +{ + return Rows_log_event::write_compressed(file); +} + + void Update_rows_log_event::init(MY_BITMAP const *cols) { /* if bitmap_init fails, caught in is_valid() */ @@ -12958,6 +13493,16 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, is_valid_param= true; DBUG_ASSERT(header()->type_code== m_type); } + +Update_rows_compressed_log_event::Update_rows_compressed_log_event(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) + : binary_log::Rows_event(buf, event_len, description_event), + Update_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -13046,6 +13591,23 @@ void Update_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } + +void Update_rows_compressed_log_event::print(FILE *file, + PRINT_EVENT_INFO *print_event_info) +{ + char *new_buf; + ulong len; + if (!Row_log_event_uncompress(glob_description_event, common_footer->checksum_alg, temp_buf, &new_buf, &len)) + { + my_free(temp_buf); + temp_buf = new_buf; + Rows_log_event::print_helper(file, print_event_info, "Update_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress update_compressed_rows failed\n"); + } +} #endif diff --git a/sql/log_event.h b/sql/log_event.h index 00a56118f6b..ca31fe89283 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -321,6 +321,12 @@ struct sql_ex_info #endif #undef EXPECTED_OPTIONS /* You shouldn't use this one */ +#define LOG_EVENT_IS_ROW_V2(type) \ + ((type >= binary_log::WRITE_ROWS_EVENT && \ + type <= binary_log::DELETE_ROWS_EVENT) || \ + (type >= binary_log::WRITE_ROWS_COMPRESSED_EVENT && \ + type <= binary_log::DELETE_ROWS_COMPRESSED_EVENT)) + /** Maximum value of binlog logical timestamp. */ @@ -1107,6 +1113,7 @@ class Log_event { DBUG_ASSERT(ends_group() || get_type_code() == binary_log::QUERY_EVENT || + get_type_code() == binary_log::QUERY_COMPRESSED_EVENT || get_type_code() == binary_log::EXEC_LOAD_EVENT || get_type_code() == binary_log::EXECUTE_LOAD_QUERY_EVENT); common_header->flags |= LOG_EVENT_MTS_ISOLATE_F; @@ -1540,6 +1547,28 @@ class Query_log_event: public virtual binary_log::Query_event, public Log_event } }; +class Query_compressed_log_event : public Query_log_event +{ +protected: + Log_event_header::Byte* query_buf; +public: + Query_compressed_log_event(const char* buf, uint event_len, + const Format_description_log_event *description_event, + Log_event_type event_type); + ~Query_compressed_log_event() + { + if (query_buf) + my_free(query_buf); + } + Log_event_type get_type_code() { return binary_log::QUERY_COMPRESSED_EVENT; } +#ifdef MYSQL_SERVER + Query_compressed_log_event(THD* thd_arg, const char* query_arg, + ulong query_length, + bool using_trans, bool direct, + bool suppress_use, int error); + virtual bool write(IO_CACHE* file); +#endif +}; /** @class Load_log_event @@ -3029,6 +3058,7 @@ class Rows_log_event : public virtual binary_log::Rows_event, public Log_event #ifdef MYSQL_SERVER virtual bool write_data_header(IO_CACHE *file); virtual bool write_data_body(IO_CACHE *file); + virtual bool write_compressed(IO_CACHE *file); virtual const char *get_db() { return m_table->s->db.str; } #endif @@ -3049,6 +3079,7 @@ class Rows_log_event : public virtual binary_log::Rows_event, public Log_event #endif Rows_log_event(const char *row_data, uint event_len, const Format_description_event *description_event); + void uncompress_buf(); #ifdef MYSQL_CLIENT void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); @@ -3448,6 +3479,25 @@ class Write_rows_log_event : public Rows_log_event, #endif }; +class Write_rows_compressed_log_event : public Write_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Write_rows_compressed_log_event(THD*, TABLE*, const Table_id& table_id, + bool is_transactional, + const uchar* extra_row_info); + virtual bool write(IO_CACHE* file); +#endif +#ifdef HAVE_REPLICATION + Write_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + /** @class Update_rows_log_event @@ -3548,6 +3598,25 @@ class Update_rows_log_event : public Rows_log_event, #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ }; +class Update_rows_compressed_log_event : public Update_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Update_rows_compressed_log_event(THD*, TABLE*, const Table_id& table_id, + bool is_transactional, + const uchar* extra_row_info); + virtual bool write(IO_CACHE* file); +#endif +#ifdef HAVE_REPLICATION + Update_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + /** @class Delete_rows_log_event @@ -3642,6 +3711,26 @@ class Delete_rows_log_event : public Rows_log_event, #endif }; +class Delete_rows_compressed_log_event : public Delete_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Delete_rows_compressed_log_event(THD*, TABLE*, const Table_id& table_id, + bool is_transactional, + const uchar* extra_row_info); + virtual bool write(IO_CACHE* file); +#endif +#ifdef HAVE_REPLICATION + Delete_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: + virtual Log_event_type get_type_code() { return binary_log::DELETE_ROWS_COMPRESSED_EVENT; } +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + #include "log_event_old.h" @@ -4494,6 +4583,21 @@ size_t my_strmov_quoted_identifier_helper(int q, char *buffer, const char* identifier, size_t length); +int binlog_buf_compress(const char *src, char *dst, size_t len, + size_t *comlen); +int binlog_buf_uncompress(const char *src, char *dst, size_t len, + size_t *newlen); +size_t binlog_get_compress_len(size_t len); +size_t binlog_get_uncompress_len(const char *buf); + +int query_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, const char *src, char **dst, ulong *newlen); + +int Row_log_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, + const char *src, char **dst, ulong *newlen); + + /** @} (end of group Replication) */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 4cbc748a80a..61a991b8594 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -304,6 +304,8 @@ LEX_STRING opt_init_connect, opt_init_slave; /* Global variables */ bool opt_bin_log, opt_ignore_builtin_innodb= 0; +bool opt_bin_log_compress; +uint opt_bin_log_compress_min_len; bool opt_general_log, opt_slow_log, opt_general_log_raw; ulonglong log_output_options; my_bool opt_log_queries_not_using_indexes= 0; diff --git a/sql/mysqld.h b/sql/mysqld.h index 3dea6ae4db0..3e1a0722585 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -120,6 +120,8 @@ extern CHARSET_INFO *character_set_filesystem; extern MY_BITMAP temp_pool; extern bool opt_large_files, server_id_supplied; extern bool opt_update_log, opt_bin_log; +extern bool opt_bin_log_compress; +extern uint opt_bin_log_compress_min_len; extern my_bool opt_log_slave_updates; extern my_bool opt_log_unsafe_statements; extern bool opt_general_log, opt_slow_log, opt_general_log_raw; diff --git a/sql/rpl_slave.cc b/sql/rpl_slave.cc index 985abc09e69..ad185f09644 100644 --- a/sql/rpl_slave.cc +++ b/sql/rpl_slave.cc @@ -5233,7 +5233,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", if ((ev->get_type_code() == binary_log::XID_EVENT) || - ((ev->get_type_code() == binary_log::QUERY_EVENT) && + (((ev->get_type_code() == binary_log::QUERY_EVENT) || + (ev->get_type_code() == binary_log::QUERY_COMPRESSED_EVENT)) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->get_transaction()->cannot_safely_rollback( @@ -5926,7 +5927,8 @@ ignore_log_space_limit=%d", thd->killed= THD::KILLED_NO_VALUE; ); DBUG_EXECUTE_IF("stop_io_after_reading_query_log_event", - if (event_buf[EVENT_TYPE_OFFSET] == binary_log::QUERY_EVENT) + if (event_buf[EVENT_TYPE_OFFSET] == binary_log::QUERY_EVENT || + event_buf[EVENT_TYPE_OFFSET] == binary_log::QUERY_COMPRESSED_EVENT) thd->killed= THD::KILLED_NO_VALUE; ); DBUG_EXECUTE_IF("stop_io_after_reading_user_var_log_event", @@ -5942,7 +5944,8 @@ ignore_log_space_limit=%d", thd->killed= THD::KILLED_NO_VALUE; ); DBUG_EXECUTE_IF("stop_io_after_reading_write_rows_log_event", - if (event_buf[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) + if (event_buf[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT || + event_buf[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_COMPRESSED_EVENT) thd->killed= THD::KILLED_NO_VALUE; ); DBUG_EXECUTE_IF("stop_io_after_reading_unknown_event", @@ -8072,6 +8075,7 @@ bool queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); ulong s_id; int lock_count= 0; + bool compressed_event = FALSE; /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -8536,6 +8540,42 @@ bool queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + case binary_log::QUERY_COMPRESSED_EVENT: + { + inc_pos = event_len; + if (query_event_uncompress(mi->get_mi_description_event(), checksum_alg, buf, (char **)&buf, &event_len)) + { + char errbuf[1024]; + char llbuf[22]; + sprintf(errbuf, "master log_pos: %s", llstr(mi->get_master_log_pos(), llbuf)); + mi->report(ERROR_LEVEL, ER_BINLOG_UNCOMPRESS_ERROR, + ER(ER_BINLOG_UNCOMPRESS_ERROR), errbuf); + goto err; + } + compressed_event = true; + } + break; + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: + { + inc_pos = event_len; + if (Row_log_event_uncompress(mi->get_mi_description_event(), checksum_alg, buf, (char **)&buf, &event_len)) + { + char errbuf[1024]; + char llbuf[22]; + sprintf(errbuf, "master log_pos: %s", llstr(mi->get_master_log_pos(), llbuf)); + mi->report(ERROR_LEVEL, ER_BINLOG_UNCOMPRESS_ERROR, + ER(ER_BINLOG_UNCOMPRESS_ERROR), errbuf); + goto err; + } + compressed_event = true; + } + break; + case binary_log::ANONYMOUS_GTID_LOG_EVENT: /* This cannot normally happen, because the master has a check that @@ -8728,6 +8768,8 @@ bool queue_event(Master_info* mi,const char* buf, ulong event_len) if (lock_count >= 2) mysql_mutex_unlock(log_lock); DBUG_PRINT("info", ("error: %d", error)); + if (compressed_event) + my_free((void *)buf); DBUG_RETURN(error); } diff --git a/sql/rpl_trx_boundary_parser.cc b/sql/rpl_trx_boundary_parser.cc index cb634ae3ed2..7e2806952eb 100644 --- a/sql/rpl_trx_boundary_parser.cc +++ b/sql/rpl_trx_boundary_parser.cc @@ -126,6 +126,7 @@ Transaction_boundary_parser::get_event_boundary_type( ROLLBACK and the rest. */ case binary_log::QUERY_EVENT: + case binary_log::QUERY_COMPRESSED_EVENT: { char *query= NULL; size_t qlen= 0; @@ -200,9 +201,15 @@ Transaction_boundary_parser::get_event_boundary_type( case binary_log::WRITE_ROWS_EVENT: case binary_log::UPDATE_ROWS_EVENT: case binary_log::DELETE_ROWS_EVENT: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: case binary_log::WRITE_ROWS_EVENT_V1: case binary_log::UPDATE_ROWS_EVENT_V1: case binary_log::DELETE_ROWS_EVENT_V1: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: case binary_log::PRE_GA_WRITE_ROWS_EVENT: case binary_log::PRE_GA_DELETE_ROWS_EVENT: case binary_log::PRE_GA_UPDATE_ROWS_EVENT: diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 33b8850b1cd..048f318f7c4 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7774,6 +7774,8 @@ ER_XA_REPLICATION_FILTERS ER_CANT_OPEN_ERROR_LOG eng "Could not open file '%s' for error logging%s%s" +ER_BINLOG_UNCOMPRESS_ERROR + eng "Uncompress the compressed binlog failed" # # End of 5.7 error messages. # diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index e2ece7a5a8c..5c7b17a7968 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -68,9 +68,15 @@ static int check_event_type(int type, Relay_log_info *rli) case binary_log::WRITE_ROWS_EVENT: case binary_log::UPDATE_ROWS_EVENT: case binary_log::DELETE_ROWS_EVENT: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT: case binary_log::WRITE_ROWS_EVENT_V1: case binary_log::UPDATE_ROWS_EVENT_V1: case binary_log::DELETE_ROWS_EVENT_V1: + case binary_log::WRITE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::UPDATE_ROWS_COMPRESSED_EVENT_V1: + case binary_log::DELETE_ROWS_COMPRESSED_EVENT_V1: case binary_log::PRE_GA_WRITE_ROWS_EVENT: case binary_log::PRE_GA_UPDATE_ROWS_EVENT: case binary_log::PRE_GA_DELETE_ROWS_EVENT: diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 5e5b55cc39f..0cd9e2810e8 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1834,6 +1834,17 @@ static Sys_var_mybool Sys_log_bin( "log_bin", "Whether the binary log is enabled", READ_ONLY GLOBAL_VAR(opt_bin_log), NO_CMD_LINE, DEFAULT(FALSE)); +static Sys_var_mybool Sys_log_bin_compress( + "log_bin_compress", "Whether the binary log can be compressed", + GLOBAL_VAR(opt_bin_log_compress), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_uint Sys_log_bin_compress_min_len( + "log_bin_compress_min_len", + "Minimum length of sql statement(in statement mode) or record(in row mode)" + "that can be compressed.", + GLOBAL_VAR(opt_bin_log_compress_min_len), + CMD_LINE(OPT_ARG), VALID_RANGE(10, 1024), DEFAULT(256), BLOCK_SIZE(1)); + static bool transaction_write_set_check(sys_var *self, THD *thd, set_var *var) { #ifdef HAVE_REPLICATION