=== modified file 'plugin/semisync/semisync_master_plugin.cc' --- plugin/semisync/semisync_master_plugin.cc revid:venkatesh.duggirala@oracle.com-20140125113609-gkot2r5wlzfeb9mg +++ plugin/semisync/semisync_master_plugin.cc 2014-02-12 03:57:31 +0000 @@ -76,13 +76,18 @@ { /* One more semi-sync slave */ repl_semisync.add_slave(); - + /* Tell server it will observe the transmission.*/ + param->set_observe_flag(); + /* Let's assume this semi-sync slave has already received all binlog events before the filename and position it requests. */ repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); } + else + param->set_dont_observe_flag(); + sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", semi_sync_slave ? "semi-sync" : "asynchronous", param->server_id, log_file, (unsigned long)log_pos); === modified file 'sql/replication.h' --- sql/replication.h revid:venkatesh.duggirala@oracle.com-20140125113609-gkot2r5wlzfeb9mg +++ sql/replication.h 2014-02-12 03:57:31 +0000 @@ -1,4 +1,4 @@ -/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -139,6 +139,23 @@ typedef struct Binlog_transmit_param { uint32 server_id; uint32 flags; + /* Let us keep 1-16 as output flags and 17-32 as input flags */ + static const uint32 F_OBSERVE= 1; + static const uint32 F_DONT_OBSERVE= 2; + + void set_observe_flag() { flags|= F_OBSERVE; } + void set_dont_observe_flag() { flags|= F_DONT_OBSERVE; } + /** + If F_OBSERVE is set by any plugin, then it should observe binlog + transmission, even F_DONT_OBSERVE is set by some plugins. + + If both F_OBSERVE and F_DONT_OBSERVE are not set, then it is an old + plugin. In this case, it should always observe binlog transmission. + */ + bool should_observe() + { + return (flags & F_OBSERVE) || !(flags & F_DONT_OBSERVE); + } } Binlog_transmit_param; /** === modified file 'sql/rpl_handler.cc' --- sql/rpl_handler.cc revid:venkatesh.duggirala@oracle.com-20140125113609-gkot2r5wlzfeb9mg +++ sql/rpl_handler.cc 2014-02-12 03:57:31 +0000 @@ -1,4 +1,4 @@ -/* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -264,13 +264,15 @@ #ifdef HAVE_REPLICATION int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, const char *log_file, - my_off_t log_pos) + my_off_t log_pos, + bool *observe_transmission) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + *observe_transmission= param.should_observe(); return ret; } @@ -279,6 +281,8 @@ Binlog_transmit_param param; param.flags= flags; + DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE();); + int ret= 0; FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); return ret; @@ -299,6 +303,8 @@ param.flags= flags; param.server_id= thd->server_id; + DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE();); + int ret= 0; read_lock(); Observer_info_iterator iter= observer_info_iter(); @@ -344,6 +350,8 @@ Binlog_transmit_param param; param.flags= flags; + DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE();); + int ret= 0; FOREACH_OBSERVER(ret, before_send_event, thd, (¶m, (uchar *)packet->c_ptr(), @@ -360,6 +368,8 @@ Binlog_transmit_param param; param.flags= flags; + DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE();); + int ret= 0; FOREACH_OBSERVER(ret, after_send_event, thd, (¶m, packet->c_ptr(), packet->length(), === modified file 'sql/rpl_handler.h' --- sql/rpl_handler.h revid:venkatesh.duggirala@oracle.com-20140125113609-gkot2r5wlzfeb9mg +++ sql/rpl_handler.h 2014-02-12 03:57:31 +0000 @@ -1,4 +1,4 @@ -/* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -215,7 +215,8 @@ typedef Binlog_transmit_observer Observer; int transmit_start(THD *thd, ushort flags, - const char *log_file, my_off_t log_pos); + const char *log_file, my_off_t log_pos, + bool *observe_transmission); int transmit_stop(THD *thd, ushort flags); int reserve_header(THD *thd, ushort flags, String *packet); int before_send_event(THD *thd, ushort flags, === modified file 'sql/rpl_master.cc' --- sql/rpl_master.cc revid:venkatesh.duggirala@oracle.com-20140125113609-gkot2r5wlzfeb9mg +++ sql/rpl_master.cc 2014-02-12 03:57:31 +0000 @@ -1,4 +1,4 @@ -/* Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2010, 2014, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -422,7 +422,8 @@ should be called before store the event data to the packet buffer. */ static int reset_transmit_packet(THD *thd, ushort flags, - ulong *ev_offset, const char **errmsg) + ulong *ev_offset, const char **errmsg, + bool observe_transmission) { int ret= 0; String *packet= &thd->packet; @@ -431,7 +432,8 @@ packet->length(0); packet->set("\0", 1, &my_charset_bin); - if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + if (observe_transmission && + RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) { *errmsg= "Failed to run hook 'reserve_header'"; my_errno= ER_UNKNOWN_ERROR; @@ -653,7 +655,8 @@ static int send_last_skip_group_heartbeat(THD *thd, NET* net, String *packet, const struct event_coordinates *last_skip_coord, ulong *ev_offset, - uint8 checksum_alg_arg, const char **errmsg) + uint8 checksum_alg_arg, const char **errmsg, + bool observe_transmission) { DBUG_ENTER("send_last_skip_group_heartbeat"); String save_packet; @@ -662,7 +665,7 @@ /* Save the current read packet */ save_packet.swap(*packet); - if (reset_transmit_packet(thd, 0, ev_offset, errmsg)) + if (reset_transmit_packet(thd, 0, ev_offset, errmsg, observe_transmission)) DBUG_RETURN(-1); /* Send heart beat event to the slave to update slave threads coordinates */ @@ -903,6 +906,8 @@ /* Coordinates of the last skip group */ LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE}, *p_last_skip_coord= &last_skip_coord_buf; + bool observe_transmission= false; + const uint32 NO_FLAG= 0; if (heartbeat_period != LL(0)) { @@ -912,7 +917,8 @@ if (log_warnings > 1) sql_print_information("Start binlog_dump to master_thread_id(%lu) slave_server(%u), pos(%s, %lu)", thd->thread_id, thd->server_id, log_ident, (ulong)pos); - if (RUN_HOOK(binlog_transmit, transmit_start, (thd, 0/*flags*/, log_ident, pos))) + if (RUN_HOOK(binlog_transmit, transmit_start, + (thd, NO_FLAG, log_ident, pos, &observe_transmission))) { errmsg= "Failed to run hook 'transmit_start'"; my_errno= ER_UNKNOWN_ERROR; @@ -992,7 +998,8 @@ } /* reset transmit packet for the fake rotate event below */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; /* @@ -1054,7 +1061,8 @@ { /* reset transmit packet for the event read from binary log file */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; /* @@ -1151,7 +1159,8 @@ /* reset the transmit packet for the event read from binary log file */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; DBUG_EXECUTE_IF("semi_sync_3-way_deadlock", { @@ -1308,8 +1317,9 @@ event_type, searching_first_gtid, skip_group, log_file_name, my_b_tell(&log))); pos = my_b_tell(&log); - if (RUN_HOOK(binlog_transmit, before_send_event, - (thd, 0/*flags*/, packet, log_file_name, pos))) + if (observe_transmission && + RUN_HOOK(binlog_transmit, before_send_event, + (thd, NO_FLAG, packet, log_file_name, pos))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "run 'before_send_event' hook failed"; @@ -1339,7 +1349,7 @@ if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord, &ev_offset, current_checksum_alg, - &errmsg)) + &errmsg, observe_transmission)) { GOTO_ERR; } @@ -1375,8 +1385,9 @@ } } - if (RUN_HOOK(binlog_transmit, after_send_event, (thd, 0/*flags*/, packet, - log_file_name, skip_group ? pos : 0))) + if (observe_transmission && + RUN_HOOK(binlog_transmit, after_send_event, + (thd, NO_FLAG, packet, log_file_name, skip_group ? pos : 0))) { errmsg= "Failed to run hook 'after_send_event'"; my_errno= ER_UNKNOWN_ERROR; @@ -1384,7 +1395,8 @@ } /* reset transmit packet for next loop */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; } @@ -1441,7 +1453,8 @@ /* reset the transmit packet for the event read from binary log file */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; /* @@ -1485,7 +1498,8 @@ Suicide on failure, since if it happens the entire purpose of the test is comprimised. */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg) || + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission) || send_heartbeat_event(net, packet, p_coord, current_checksum_alg)) DBUG_SUICIDE(); }); @@ -1520,7 +1534,8 @@ { if (send_last_skip_group_heartbeat(thd, net, packet, p_coord, &ev_offset, - current_checksum_alg, &errmsg)) + current_checksum_alg, &errmsg, + observe_transmission)) { thd->EXIT_COND(&old_stage); GOTO_ERR; @@ -1542,7 +1557,8 @@ } #endif /* reset transmit packet for the heartbeat event */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) { thd->EXIT_COND(&old_stage); GOTO_ERR; @@ -1648,16 +1664,18 @@ /* If the last group was skipped, send a HB event */ if (last_skip_group && send_last_skip_group_heartbeat(thd, net, packet, - p_last_skip_coord, &ev_offset, - current_checksum_alg, &errmsg)) + p_last_skip_coord, &ev_offset, + current_checksum_alg, &errmsg, + observe_transmission)) { GOTO_ERR; } THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave); pos = my_b_tell(&log); - if (RUN_HOOK(binlog_transmit, before_send_event, - (thd, 0/*flags*/, packet, log_file_name, pos))) + if (observe_transmission && + RUN_HOOK(binlog_transmit, before_send_event, + (thd, NO_FLAG, packet, log_file_name, pos))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "run 'before_send_event' hook failed"; @@ -1684,8 +1702,10 @@ if(!goto_next_binlog) { - if (RUN_HOOK(binlog_transmit, after_send_event, (thd, 0/*flags*/, packet, - log_file_name, skip_group ? pos : 0))) + if (observe_transmission && + RUN_HOOK(binlog_transmit, after_send_event, + (thd, NO_FLAG, packet, log_file_name, + skip_group ? pos : 0))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "Failed to run hook 'after_send_event'"; @@ -1721,7 +1741,8 @@ mysql_file_close(file, MYF(MY_WME)); /* reset transmit packet for the possible fake rotate event */ - if (reset_transmit_packet(thd, 0/*flags*/, &ev_offset, &errmsg)) + if (reset_transmit_packet(thd, NO_FLAG, &ev_offset, &errmsg, + observe_transmission)) GOTO_ERR; /* @@ -1757,7 +1778,7 @@ end_io_cache(&log); mysql_file_close(file, MYF(MY_WME)); - (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, 0/*flags*/)); + (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, NO_FLAG)); my_eof(thd); THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); mysql_mutex_lock(&LOCK_thread_count); @@ -1789,7 +1810,7 @@ error_text[sizeof(error_text) - 1]= '\0'; } end_io_cache(&log); - (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, 0/*flags*/)); + (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, NO_FLAG)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through