--- a/sql/sql_lex.h 2011-11-20 19:13:54.000000000 +0800 +++ b/sql/sql_lex.h 2012-02-02 14:57:39.000000000 +0800 @@ -221,6 +221,7 @@ char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher; char *relay_log_name; ulong relay_log_pos; + char *sign; // Multi-Master By P.Linux } LEX_MASTER_INFO; --- a/sql/sql_yacc.yy 2011-11-20 19:13:54.000000000 +0800 +++ b/sql/sql_yacc.yy 2012-02-03 17:16:23.000000000 +0800 @@ -1737,6 +1737,17 @@ } master_defs {} + /* Multi-Master By P.Linux */ + | CHANGE MASTER_SYM TEXT_STRING_sys TO_SYM + { + LEX *lex = Lex; + lex->sql_command = SQLCOM_CHANGE_MASTER; + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $3.str; + } + master_defs + {} + /* End */ ; master_defs: @@ -6281,6 +6292,48 @@ lex->sql_command = SQLCOM_SLAVE_STOP; lex->type = 0; } + /* Multi-Master By P.Linux */ + | START_SYM SLAVE TEXT_STRING_sys slave_thread_opts + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_SLAVE_START; + lex->type = 0; + /* We'll use mi structure for UNTIL options */ + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $3.str; + /* If you change this code don't forget to update SLAVE START too */ + } + slave_until + {} + | STOP_SYM SLAVE TEXT_STRING_sys slave_thread_opts + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_SLAVE_STOP; + lex->type = 0; + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $3.str; + /* If you change this code don't forget to update SLAVE STOP too */ + } + | SLAVE TEXT_STRING_sys START_SYM slave_thread_opts + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_SLAVE_START; + lex->type = 0; + /* We'll use mi structure for UNTIL options */ + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $2.str; + } + slave_until + {} + | SLAVE TEXT_STRING_sys STOP_SYM slave_thread_opts + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_SLAVE_STOP; + lex->type = 0; + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $2.str; + } + /* End */ ; start: @@ -10388,11 +10441,29 @@ { Lex->sql_command = SQLCOM_SHOW_SLAVE_STAT; } + /* Multi-Master By P.Linux */ + | SLAVE TEXT_STRING_sys STATUS_SYM // Multi-Master By P.Linux + { + LEX *lex = Lex; + lex->sql_command = SQLCOM_SHOW_SLAVE_STAT; + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $2.str; + } + /* End */ /* SHOW SLAVE STATUS NOLOCK */ | SLAVE STATUS_SYM NOLOCK_SYM { Lex->sql_command = SQLCOM_SHOW_SLAVE_NOLOCK_STAT; //SQLCOM_SHOW_SLAVE_NOLOCK_STAT; } + /* Multi-Master By P.Linux */ + | SLAVE TEXT_STRING_sys STATUS_SYM NOLOCK_SYM + { + LEX *lex = Lex; + lex->sql_command = SQLCOM_SHOW_SLAVE_NOLOCK_STAT; //SQLCOM_SHOW_SLAVE_NOLOCK_STAT; + bzero((char*) &lex->mi, sizeof(lex->mi)); + lex->mi.sign = $2.str; + } + /* End */ | CLIENT_STATS_SYM wild_and_where { LEX *lex= Lex; --- a/sql/sql_repl.cc 2011-11-20 19:13:54.000000000 +0800 +++ b/sql/sql_repl.cc 2012-02-13 19:56:32.000000000 +0800 @@ -1185,8 +1185,34 @@ thd_proc_info(thd, "Changing master"); LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + /* Multi-Master By P.Linux */ + char buf_master_info_file[FN_REFLEN]; + char buf_relay_log_info_file[FN_REFLEN]; + if (lex_mi->sign) // if use CHANGE MASTER 'xxx' TO + { + strmake(mi->sign, lex_mi->sign, sizeof(mi->sign)-1); + concat_signed_file_name(buf_master_info_file, + master_info_file, ".", mi->sign); + concat_signed_file_name(buf_relay_log_info_file, + relay_log_info_file, ".", mi->sign); + /* if new Master_info not in HASH, add it */ + if (!master_info_index->get_master_info_from_hash(mi->sign)) + { + if (master_info_index->add_master_info_to_hash(mi, TRUE)) + exit(1); + } + sql_print_information("Sign:%s, Master_info:%s, Relay_info:%s", + mi->sign, buf_master_info_file, buf_relay_log_info_file); + } + else // if use CHANGE MASTER TO + { + strncpy(buf_master_info_file, master_info_file, sizeof(buf_master_info_file)-1); + strncpy(buf_relay_log_info_file, relay_log_info_file, sizeof(buf_relay_log_info_file)-1); + } + /* End */ // TODO: see if needs re-write - if (init_master_info(mi, master_info_file, relay_log_info_file, 0, + //if (init_master_info(mi, master_info_file, relay_log_info_file, 0, + if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, 0, // Multi-Master By P.Linux thread_mask)) { my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); --- a/sql/sql_parse.cc 2011-11-20 19:13:54.000000000 +0800 +++ b/sql/sql_parse.cc 2012-02-10 17:07:57.000000000 +0800 @@ -18,6 +18,7 @@ #define MYSQL_LEX 1 #include "mysql_priv.h" #include "sql_repl.h" +#include "rpl_mi.h" #include "rpl_filter.h" #include "repl_failsafe.h" #include @@ -2618,7 +2619,22 @@ if (check_global_access(thd, SUPER_ACL)) goto error; pthread_mutex_lock(&LOCK_active_mi); + /* Multi-Master By P.Linux */ + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + Master_info *mi= 0; + if (lex_mi->sign != NULL) // if use CHANGE MASTER 'xxx' TO + { + mi= master_info_index->get_master_info_from_hash(lex_mi->sign); + if (mi == NULL) + mi= new Master_info; + res= change_master(thd,mi); + } + else // if use CHANGE MASTER TO + { res = change_master(thd,active_mi); + } + /* End */ + //res = change_master(thd,active_mi); pthread_mutex_unlock(&LOCK_active_mi); break; } @@ -2633,6 +2649,25 @@ { pthread_mutex_lock(&LOCK_active_mi); } + /* Multi-Master By P.Linux */ + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + Master_info *mi= 0; + if (lex_mi->sign != NULL) // if use SHOW SLAVE 'xxx' STATUS + { + mi= master_info_index->get_master_info_from_hash(lex_mi->sign); + if (mi != NULL) + { + res = show_master_info(thd, mi); + } + else + { + push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO)); + my_ok(thd); + } + } + else // if use SHOW SLAVE STATUS + { if (active_mi != NULL) { res = show_master_info(thd, active_mi); @@ -2643,6 +2678,18 @@ WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO)); my_ok(thd); } + } + /* End */ + /*if (active_mi != NULL) + { + res = show_master_info(thd, active_mi); + } + else + { + push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO)); + my_ok(thd); + }*/ if(do_lock) { pthread_mutex_unlock(&LOCK_active_mi); @@ -3035,7 +3082,26 @@ case SQLCOM_SLAVE_START: { pthread_mutex_lock(&LOCK_active_mi); + /* Multi-Master By P.Linux */ + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + Master_info *mi= 0; + if (lex_mi->sign != NULL) // if use START SLAVE 'xxx' + { + mi= master_info_index->get_master_info_from_hash(lex_mi->sign); + if (mi == NULL) + { + my_message(ER_BAD_SLAVE, ER(ER_BAD_SLAVE), MYF(0)); + pthread_mutex_unlock(&LOCK_active_mi); + break; + } + start_slave(thd,mi,1); + } + else // if use START SLAVE + { start_slave(thd,active_mi,1 /* net report*/); + } + /* End */ + //start_slave(thd,active_mi,1 /* net report*/); pthread_mutex_unlock(&LOCK_active_mi); break; } @@ -3061,7 +3127,26 @@ } { pthread_mutex_lock(&LOCK_active_mi); + /* Multi-Master By P.Linux */ + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + Master_info *mi= 0; + if (lex_mi->sign != NULL) // if use START SLAVE 'xxx' + { + mi= master_info_index->get_master_info_from_hash(lex_mi->sign); + if (mi == NULL) + { + my_message(ER_BAD_SLAVE, ER(ER_BAD_SLAVE), MYF(0)); + pthread_mutex_unlock(&LOCK_active_mi); + break; + } + stop_slave(thd,mi,1); + } + else + { stop_slave(thd,active_mi,1/* net report*/); + } + /* End */ + //stop_slave(thd,active_mi,1/* net report*/); pthread_mutex_unlock(&LOCK_active_mi); break; } --- a/sql/rpl_rli.cc 2011-11-20 19:13:53.000000000 +0800 +++ b/sql/rpl_rli.cc 2012-02-13 10:29:39.000000000 +0800 @@ -180,11 +180,29 @@ "use '--relay-log=%s' to avoid this problem.", ln); name_warning_sent= 1; } + /* Multi-Master By P.Linux */ + Master_info* mi= rli->mi; + char *buf_relay_logname= 0; + char *buf_relaylog_index_name= 0; + if (mi->sign != NULL) + { + buf_relay_logname= (char *)my_malloc(FN_REFLEN, MYF(MY_FAE)); + ln= concat_signed_file_name(buf_relay_logname, ln, "-", mi->sign); + + if (opt_relaylog_index_name) + { + buf_relaylog_index_name= (char *)my_malloc(FN_REFLEN, MYF(MY_FAE)); + concat_signed_file_name(buf_relaylog_index_name, + opt_relaylog_index_name, "-", mi->sign); + } + } + /* End */ /* note, that if open() fails, we'll still have index file open but a destructor will take care of that */ - if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) || + //if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) || + if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || // Multi-Master By P.Linux rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0, (max_relay_log_size ? max_relay_log_size : max_binlog_size), 1, TRUE)) --- a/sql/rpl_mi.h 2011-11-20 19:13:53.000000000 +0800 +++ b/sql/rpl_mi.h 2012-02-14 10:14:41.000000000 +0800 @@ -68,6 +68,9 @@ char host[HOSTNAME_LENGTH+1]; char user[USERNAME_LENGTH+1]; char password[MAX_PASSWORD_LENGTH+1]; + /* P.Linux */ + char sign[FN_REFLEN]; + /* End*/ my_bool ssl; // enables use of SSL connection if true char ssl_ca[FN_REFLEN], ssl_capath[FN_REFLEN], ssl_cert[FN_REFLEN]; char ssl_cipher[FN_REFLEN], ssl_key[FN_REFLEN]; @@ -113,5 +116,63 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache, bool need_lock_relay_log); + +/* Multi-Master By P.Linux */ +class MASTER_INFO_INDEX +{ + private: + IO_CACHE index_file; + char index_file_name[FN_REFLEN]; + HASH master_info_hash; + + public: + MASTER_INFO_INDEX(); + ~MASTER_INFO_INDEX(); + + bool init_all_master_info(); + bool write_master_sign_to_index_file(const char *sign); + + /* Add a Master_info class to Hash Table */ + bool add_master_info_to_hash(Master_info *mi, bool write_to_file) + { + bool res= my_hash_insert(&master_info_hash, (uchar*) mi); + + if (!res) + { + sql_print_information("[Multi-Master] Add new Master_info '%s' To Hash table.", mi->sign); + if (write_to_file) + write_master_sign_to_index_file(mi->sign); + } + else + { + sql_print_error("[Multi-Master] Create new Master_info '%s' Failed!", mi->sign); + } + return res; + } + + /* Remove a Master_info class From Hash Table */ + bool remove_master_info_from_hash(const char *sign) + { + Master_info* mi= get_master_info_from_hash(sign); + if (mi) + return my_hash_delete(&master_info_hash, (uchar*) mi); + return TRUE; + } + + /* Get a Master_info class via mi->sign */ + Master_info* get_master_info_from_hash(const char *sign) + { + Master_info* mi= (Master_info*) hash_search(&master_info_hash, (uchar*) sign, strlen(sign)); + return mi; + } +}; + +char *concat_signed_file_name(char *res_file_name ,const char *info_file, + const char *separator, const char *sign); + +uchar *get_key_master_info(Master_info *mi, size_t *length, + my_bool not_used __attribute__((unused))); +void free_key_master_info(Master_info *mi); +/* End */ #endif /* HAVE_REPLICATION */ #endif /* RPL_MI_H */ --- a/sql/rpl_mi.cc 2011-11-20 19:13:53.000000000 +0800 +++ b/sql/rpl_mi.cc 2012-02-14 10:16:17.000000000 +0800 @@ -429,5 +429,200 @@ DBUG_VOID_RETURN; } +/* Multi-Master By P.Linux */ +uchar *get_key_master_info(Master_info *mi, size_t *length, + my_bool not_used __attribute__((unused))) +{ + *length = strlen(mi->sign); + return (uchar*)mi->sign; +} + +void free_key_master_info(Master_info *mi) +{ + terminate_slave_threads(mi,SLAVE_FORCE_ALL); + delete mi; + DBUG_VOID_RETURN; +} + +char *concat_signed_file_name(char *res_file_name ,const char *info_file, + const char *separator, const char *sign) +{ + if (!res_file_name || !info_file || + !info_file || !separator || !sign) + { + return NULL; + } + + strcpy(res_file_name, info_file); + strncat(res_file_name, separator, sizeof(res_file_name)-1); + strncat(res_file_name, sign, sizeof(res_file_name)-1); + + return res_file_name; +} + +MASTER_INFO_INDEX::MASTER_INFO_INDEX() +{ + index_file_name[0] = 0; + bzero((char*) &index_file, sizeof(index_file)); + + /* Create Master_info Index File */ + File index_file_nr= -1; + DBUG_ASSERT(!my_b_inited(&index_file)); + + fn_format(index_file_name, master_info_file, mysql_data_home, + ".index", MY_UNPACK_FILENAME | MY_APPEND_EXT); + + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_BINARY , + MYF(MY_WME))) < 0 || + my_sync(index_file_nr, MYF(MY_WME)) || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, READ_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + if (index_file_nr>= 0) + my_close(index_file_nr,MYF(0)); + sql_print_error("[Multi-Master] Create Master Indo Index '%s' Error", index_file_name); + exit(1); + } + sql_print_information("[Multi-Master] Created Master Info Index '%s'", index_file_name); + + /* Initialize Master_info Hash Table */ + if (hash_init(&master_info_hash, system_charset_info, + MAX_REPLICATION_THREAD, 0, 0, + (hash_get_key)get_key_master_info, + (hash_free_key)free_key_master_info, 1)) + { + sql_print_error("[Multi-Master] Initializing Master_info hash table failed."); + exit(1); + } else + { + sql_print_information("[Multi-Master] Initialized Master_info hash table."); + } +} + +MASTER_INFO_INDEX::~MASTER_INFO_INDEX() +{ + hash_free(&master_info_hash); +} + +/* Load All Master_info from master.info.index File + * RETURN: + * 0 - All Success + * 1 - All Fail + * 2 - Some Success, Some Fail +*/ +bool MASTER_INFO_INDEX::init_all_master_info() +{ + int thread_mask; + int err_num= 0, succ_num= 0; // The number of success read Master_info + char sign[FN_REFLEN]; + DBUG_ENTER("init_all_master_info"); + + if (access(index_file_name,F_OK)) // if master.info.index not exist + DBUG_RETURN(1); + + reinit_io_cache(&index_file, READ_CACHE, 0L,0,0); + strcpy(sign, "test"); + while(!init_strvar_from_file(sign, sizeof(sign), + &index_file, NULL)) + { + Master_info *mi = new Master_info; + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + + strmake(mi->sign, sign, sizeof(sign)-1); + + char buf_master_info_file[FN_REFLEN]; + char buf_relay_log_info_file[FN_REFLEN]; + concat_signed_file_name(buf_master_info_file, + master_info_file, ".", mi->sign); + concat_signed_file_name(buf_relay_log_info_file, + relay_log_info_file, ".", mi->sign); + sql_print_information("[Multi-Master] Reading Master_info:'%s', Relay_info:'%s' ...", + buf_master_info_file, buf_relay_log_info_file); + + if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, + 0, thread_mask)) + { + err_num+= 1; + sql_print_error("[Multi-Master] Initialized Master_info from '%s' fail!", + buf_master_info_file); + unlock_slave_threads(mi); + delete mi; + continue; + } + else // if read Master_info success add it to HASH + { + sql_print_information("[Multi-Master] Initialized Master_info from '%s' success!", + buf_master_info_file); + if (!master_info_index->get_master_info_from_hash(mi->sign)) // Master_info not in HASH + { + if (master_info_index->add_master_info_to_hash(mi, FALSE)) + exit(1); + succ_num+= 1; + unlock_slave_threads(mi); + } + else // Master_info already in HASH + { + sql_print_error("[Multi-Master] Duplicate Master_info sign: '%s'", + mi->sign); + unlock_slave_threads(mi); + delete mi; + continue; + } + if (!opt_skip_slave_start) + { + if (start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + mi, + buf_master_info_file, + buf_relay_log_info_file, + SLAVE_IO | SLAVE_SQL)) + { + sql_print_error("[Multi-Master] Failed to create slave '%s' threads", mi->sign); + unlock_slave_threads(mi); + continue; + } + sql_print_information("[Multi-Master] Start Replication '%s' Success!", mi->sign); + unlock_slave_threads(mi); + } + } + } + if (!err_num) // No Error on read Master_info + { + sql_print_information("[Multi-Master] Read all Master_info Success!"); + DBUG_RETURN(0); + } + else if (!succ_num) // Have some Error and some Success + { + sql_print_warning("[Multi-Master] Read Some Master_info Error!"); + DBUG_RETURN(2); + } + else // All Success + { + sql_print_error("[Multi-Master] Read all Master_info Failed!"); + DBUG_RETURN(1); + } +} + +/* Write new master.info to master.info.index File */ +bool MASTER_INFO_INDEX::write_master_sign_to_index_file(const char *sign) +{ + DBUG_ASSERT(my_b_inited(&index_file) != 0); + reinit_io_cache(&index_file, WRITE_CACHE, + my_b_filelength(&index_file), 0, 0); + + if (my_b_write(&index_file, (uchar*) sign, strlen(sign)) || + my_b_write(&index_file, (uchar*) "\n", 1) || + flush_io_cache(&index_file) || + my_sync(index_file.file, MYF(MY_WME))) + { + sql_print_error("[Multi-Master] Write new Master_info '%s' to index file failed!", sign); + exit(1); + } +} +/* End */ #endif /* HAVE_REPLICATION */ --- a/sql/slave.h 2011-11-20 19:13:53.000000000 +0800 +++ b/sql/slave.h 2012-02-09 17:58:48.000000000 +0800 @@ -34,11 +34,14 @@ #define SLAVE_NET_TIMEOUT 3600 #define MAX_SLAVE_ERROR 2000 - +/* Multi-Master By P.Linux */ +#define MAX_REPLICATION_THREAD 64 +/* End */ // Forward declarations class Relay_log_info; class Master_info; +class MASTER_INFO_INDEX; // P.Linux /***************************************************************************** @@ -198,6 +201,7 @@ pthread_handler_t handle_slave_sql(void *arg); extern bool volatile abort_loop; extern Master_info main_mi, *active_mi; /* active_mi for multi-master */ +extern MASTER_INFO_INDEX *master_info_index; // Multi-Master By P.Linux extern LIST master_list; extern my_bool replicate_same_server_id; --- a/sql/slave.cc 2011-11-20 19:13:53.000000000 +0800 +++ b/sql/slave.cc 2012-02-14 10:04:05.000000000 +0800 @@ -59,6 +59,7 @@ char* slave_load_tmpdir = 0; Master_info *active_mi= 0; +MASTER_INFO_INDEX *master_info_index; // Multi-Master By P.Linux my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; @@ -235,6 +236,12 @@ for multi-master */ active_mi= new Master_info; + master_info_index= new MASTER_INFO_INDEX; // Multi-Master By P.Linux + + /* Multi-Master By P.Linux */ + if (master_info_index->init_all_master_info()) + goto err; + /* End */ /* If --slave-skip-errors=... was not used, the string value for the @@ -703,6 +710,10 @@ */ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); } + /* Multi-Master By P.Linux */ + if (master_info_index) + delete master_info_index; + /* End */ pthread_mutex_unlock(&LOCK_active_mi); DBUG_VOID_RETURN; }