From aa3a7993a761271d65eabed7f3a30484533cd8de Mon Sep 17 00:00:00 2001 From: willhan Date: Wed, 5 Jun 2019 13:50:51 +0800 Subject: [PATCH] Control the Maximum speed(KB/s) to read binlog from master --- include/mysql.h.pp | 1 + include/mysql_com.h | 1 + include/sql_common.h | 3 ++ mysql-test/r/mysqld--help-notwin.result | 4 ++ .../suite/perfschema/r/show_sanity.result | 2 + mysql-test/suite/sys_vars/r/all_vars.result | 2 + sql-common/client.c | 16 +++++- sql/net_serv.cc | 14 ++++- sql/rpl_slave.cc | 54 +++++++++++++++++-- sql/rpl_slave.h | 1 + sql/sys_vars.cc | 6 +++ 11 files changed, 98 insertions(+), 6 deletions(-) diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 7db03d5dd59..139bc382fe6 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -136,6 +136,7 @@ const unsigned char *packet, size_t len); my_bool net_write_packet(NET *net, const unsigned char *packet, size_t length); unsigned long my_net_read(NET *net); +unsigned long my_net_read_packet_reallen(NET *net, unsigned long *reallen); struct rand_struct { unsigned long seed1,seed2,max_value; double max_value_dbl; diff --git a/include/mysql_com.h b/include/mysql_com.h index 9ce72fbaa19..999f63c6adb 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -501,6 +501,7 @@ my_bool net_write_command(NET *net,unsigned char command, const unsigned char *packet, size_t len); my_bool net_write_packet(NET *net, const unsigned char *packet, size_t length); unsigned long my_net_read(NET *net); +unsigned long my_net_read_packet_reallen(NET *net, unsigned long *reallen); #ifdef MY_GLOBAL_INCLUDED void my_net_set_write_timeout(NET *net, uint timeout); diff --git a/include/sql_common.h b/include/sql_common.h index 2fe33dac26e..f32d5e5b267 100644 --- a/include/sql_common.h +++ b/include/sql_common.h @@ -173,8 +173,11 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command, const unsigned char *arg, size_t arg_length, my_bool skip_check, MYSQL_STMT *stmt); unsigned long cli_safe_read(MYSQL *mysql, my_bool *is_data_packet); +unsigned long cli_safe_read_reallen(MYSQL *mysql, my_bool *is_data_packet, unsigned long *reallen); unsigned long cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok, my_bool *is_data_packet); +unsigned long cli_safe_read_reallen_with_ok(MYSQL *mysql, my_bool parse_ok, + my_bool *is_data_packet, unsigned long *reallen); void net_clear_error(NET *net); void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net); void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate, diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index a87c4634d28..f61a7ab80d0 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -919,6 +919,9 @@ The following options may be given as the first argument: optimization of a query, index range scan will not be considered for this query. A value of 0 means range optimizer does not have any cap on memory. + --read-binlog-speed-limit=# + Maximum speed(KB/s) to read binlog from master (0 = no + limit) --read-buffer-size=# Each thread that does a sequential scan allocates a buffer of this size for each table it scans. If you do @@ -1540,6 +1543,7 @@ query-cache-wlock-invalidate FALSE query-prealloc-size 8192 range-alloc-block-size 4096 range-optimizer-max-mem-size 8388608 +read-binlog-speed-limit 0 read-buffer-size 131072 read-only FALSE read-rnd-buffer-size 262144 diff --git a/mysql-test/suite/perfschema/r/show_sanity.result b/mysql-test/suite/perfschema/r/show_sanity.result index 71bc92a2c2f..91084e15e63 100644 --- a/mysql-test/suite/perfschema/r/show_sanity.result +++ b/mysql-test/suite/perfschema/r/show_sanity.result @@ -414,6 +414,7 @@ SHOW_MODE SOURCE VARIABLE_NAME 5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED 5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS 5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG +5.6 I_S.SESSION_VARIABLES READ_BINLOG_SPEED_LIMIT 5.6 I_S.SESSION_VARIABLES TLS_VERSION ================================================================================ @@ -442,6 +443,7 @@ SHOW_MODE SOURCE VARIABLE_NAME 5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED 5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS 5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG +5.6 I_S.SESSION_VARIABLES READ_BINLOG_SPEED_LIMIT 5.6 I_S.SESSION_VARIABLES TLS_VERSION ================================================================================ diff --git a/mysql-test/suite/sys_vars/r/all_vars.result b/mysql-test/suite/sys_vars/r/all_vars.result index 5ddc39a3594..067bf99af43 100644 --- a/mysql-test/suite/sys_vars/r/all_vars.result +++ b/mysql-test/suite/sys_vars/r/all_vars.result @@ -17,6 +17,8 @@ DISABLED_STORAGE_ENGINES DISABLED_STORAGE_ENGINES KEYRING_OPERATIONS KEYRING_OPERATIONS +READ_BINLOG_SPEED_LIMIT +READ_BINLOG_SPEED_LIMIT TLS_VERSION TLS_VERSION drop table t1; diff --git a/sql-common/client.c b/sql-common/client.c index ad34f480fbd..4f1513ac515 100644 --- a/sql-common/client.c +++ b/sql-common/client.c @@ -1048,6 +1048,14 @@ void read_ok_ex(MYSQL *mysql, ulong length) ulong cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok, my_bool *is_data_packet) +{ + ulong reallen = 0; + return cli_safe_read_reallen_with_ok(mysql, parse_ok, is_data_packet, &reallen); +} +ulong +cli_safe_read_reallen_with_ok(MYSQL *mysql, my_bool parse_ok, + my_bool *is_data_packet, + ulong *reallen) { NET *net= &mysql->net; ulong len=0; @@ -1058,7 +1066,8 @@ cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok, *is_data_packet= FALSE; if (net->vio != 0) - len=my_net_read(net); + len=my_net_read_packet_reallen(net, reallen); + if (len == packet_error || len == 0) { @@ -1193,6 +1202,11 @@ ulong cli_safe_read(MYSQL *mysql, my_bool *is_data_packet) { return cli_safe_read_with_ok(mysql, 0, is_data_packet); } +ulong cli_safe_read_reallen(MYSQL *mysql, my_bool *is_data_packet, ulong *reallen) +{ + return cli_safe_read_reallen_with_ok(mysql, 0, is_data_packet, reallen); +} + void free_rows(MYSQL_DATA *cur) diff --git a/sql/net_serv.cc b/sql/net_serv.cc index a6bc8570f2e..98d7908f98b 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -887,11 +887,19 @@ static size_t net_read_packet(NET *net, size_t *complen) ulong my_net_read(NET *net) +{ + ulong reallen = 0; + return my_net_read_packet_reallen(net, &reallen); +} + +ulong +my_net_read_packet_reallen(NET *net, ulong* reallen) { size_t len, complen; MYSQL_NET_READ_START(); + *reallen = 0; #ifdef HAVE_COMPRESS if (!net->compress) { @@ -914,7 +922,10 @@ my_net_read(NET *net) } net->read_pos = net->buff + net->where_b; if (len != packet_error) - net->read_pos[len]=0; /* Safeguard for mysql_use_result */ + { + net->read_pos[len] = 0; /* Safeguard for mysql_use_result */ + *reallen = len; + } MYSQL_NET_READ_DONE(0, len); return static_cast(len); #ifdef HAVE_COMPRESS @@ -1020,6 +1031,7 @@ my_net_read(NET *net) return packet_error; } buf_length+= complen; + *reallen+= packet_len; } net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE; diff --git a/sql/rpl_slave.cc b/sql/rpl_slave.cc index 985abc09e69..f2d2f8da917 100644 --- a/sql/rpl_slave.cc +++ b/sql/rpl_slave.cc @@ -79,6 +79,7 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; char* slave_load_tmpdir = 0; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; +ulonglong opt_read_binlog_speed_limit = 0; const char *relay_log_index= 0; const char *relay_log_basename= 0; @@ -4463,13 +4464,16 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, try a reconnect. We do not want to print anything to the error log in this case because this a anormal event in an idle server. + network_read_len get the real network read length in VIO, especially + using compressed protocol RETURN VALUES 'packet_error' Error number Length of packet */ -static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) +static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, + ulong* network_read_len) { ulong len; DBUG_ENTER("read_event"); @@ -4484,7 +4488,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) DBUG_RETURN(packet_error); #endif - len= cli_safe_read(mysql, NULL); + len= cli_safe_read_reallen(mysql, NULL, network_read_len); if (len == packet_error || (long) len < 1) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) @@ -5761,9 +5765,11 @@ requesting master dump") || const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); + ulonglong lastchecktime = my_micro_time(); + ulonglong tokenamount = opt_read_binlog_speed_limit * 1024; while (!io_slave_killed(thd,mi)) { - ulong event_len; + ulong event_len, network_read_len = 0; /* We say "waiting" because read_event() will wait if there's nothing to read. But if there's something to read, it will not wait. The @@ -5771,7 +5777,7 @@ requesting master dump") || we're in fact receiving nothing. */ THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); - event_len= read_event(mysql, mi, &suppress_warnings); + event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len); if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ reading event")) goto err; @@ -5834,6 +5840,46 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } + /* Control the binlog read speed of master + when read_binlog_speed_limit is non-zero + */ + ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024; + if (speed_limit_in_bytes) + { + /* Prevent the tokenamount become a large value, + for example, the IO thread doesn't work for a long time + */ + if (tokenamount > speed_limit_in_bytes * 2) + { + lastchecktime = my_micro_time(); + tokenamount = speed_limit_in_bytes * 2; + } + + do + { + ulonglong currenttime = my_micro_time(); + tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000 * 1000); + lastchecktime = currenttime; + if (tokenamount < network_read_len) + { + ulonglong micro_time = 1000 * 1000 * (network_read_len - tokenamount) / speed_limit_in_bytes; + ulonglong second_time = micro_time / (1000 * 1000); + micro_time = micro_time % (1000 * 1000); + + // at least sleep 1000 micro second + my_sleep(micro_time > 1000 ? micro_time : 1000); + + /* + If it sleep more than one second, + it should use slave_sleep() to avoid the STOP SLAVE hang. + */ + if (second_time) + slave_sleep(thd, second_time, io_slave_killed, mi); + } + } while (tokenamount < network_read_len); + tokenamount -= network_read_len; + } + /* XXX: 'synced' should be updated by queue_event to indicate whether event has been synced to disk */ bool synced= 0; diff --git a/sql/rpl_slave.h b/sql/rpl_slave.h index 42dfde18f3f..4091eeac81f 100644 --- a/sql/rpl_slave.h +++ b/sql/rpl_slave.h @@ -267,6 +267,7 @@ extern my_bool opt_skip_slave_start, opt_reckless_slave; extern my_bool opt_log_slave_updates; extern char *opt_slave_skip_errors; extern ulonglong relay_log_space_limit; +extern ulonglong opt_read_binlog_speed_limit; extern const char *relay_log_index; extern const char *relay_log_basename; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 5e5b55cc39f..9cf54412ef1 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -5316,6 +5316,12 @@ static Sys_var_ulonglong Sys_relay_log_space_limit( READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1)); +static Sys_var_ulonglong Sys_read_binlog_speed_limit( + "read_binlog_speed_limit", "Maximum speed(KB/s) to read binlog from" + " master (0 = no limit)", + GLOBAL_VAR(opt_read_binlog_speed_limit), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1)); + static Sys_var_uint Sys_sync_relaylog_period( "sync_relay_log", "Synchronously flush relay log to disk after " "every #th event. Use 0 to disable synchronous flushing",