--- a/sql/item_func.cc 2008-12-03 01:30:22.000000000 +0900 +++ b/sql/item_func.cc 2009-01-23 17:52:17.000000000 +0900 @@ -3760,7 +3760,6 @@ #define extra_size sizeof(double) - static user_var_entry *get_variable(HASH *hash, LEX_STRING &name, bool create_if_not_exists) { @@ -3805,6 +3804,13 @@ return entry; } +#ifdef HAVE_NDB_BINLOG +user_var_entry *get_user_var_entry(HASH *hash, LEX_STRING &name, + bool create_if_not_exists) +{ + return get_variable(hash, name, create_if_not_exists); +} +#endif bool Item_func_set_user_var::set_entry(THD *thd, bool create_if_not_exists) { --- a/sql/mysql_priv.h 2008-12-03 01:30:24.000000000 +0900 +++ b/sql/mysql_priv.h 2009-01-21 23:09:20.000000000 +0900 @@ -585,6 +585,7 @@ /* BINLOG_DUMP options */ #define BINLOG_DUMP_NON_BLOCK 1 +#define BINLOG_EPOCH_IN_PACKET 2 /* sql_show.cc:show_log_files() */ #define SHOW_LOG_STATUS_FREE "FREE" --- a/sql/slave.cc 2008-12-03 01:30:26.000000000 +0900 +++ b/sql/slave.cc 2009-02-27 07:33:04.000000000 +0900 @@ -1672,6 +1672,95 @@ DBUG_RETURN(0); } +#ifdef HAVE_NDB_BINLOG +#include "ha_ndbcluster_tables.h" +extern user_var_entry *get_user_var_entry(HASH *hash, LEX_STRING &name, + bool create_if_not_exists); + +/* + get the latest epoch from mysql.ndb_appy_status table. + */ +ulonglong get_epoch_from_binlog_table() +{ + char the_sql[] = "SELECT MAX(epoch) INTO @latest FROM mysql.ndb_apply_status;"; + ulonglong binlog_epoch; + const char* found_semicolon= NULL; + LEX_STRING s_latest; + user_var_entry *var_latest; + int error= 0; + DBUG_ENTER("get_epoch_from_binlog_table"); + + THD *thd = current_thd; + if (!thd) + { + sql_print_error("Can't get an existing THD instance."); + error= 1; + goto err; + } + ulong save_thd_query_length= thd->query_length; + char *save_thd_query= thd->query; + ulong save_thread_id= thd->variables.pseudo_thread_id; + struct system_status_var save_thd_status_var= thd->status_var; + THD_TRANS save_thd_transaction_all= thd->transaction.all; + THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt; + ulonglong save_thd_options= thd->options; + DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options)); + NET save_thd_net= thd->net; + + bzero((char*) &thd->net, sizeof(NET)); + thd->variables.pseudo_thread_id= thread_id; + thd->transaction.stmt.modified_non_trans_table= FALSE; + + /* + execute the query + */ + thd->query_length= strlen(the_sql); + thd->query= the_sql; + thd->transaction.stmt.modified_non_trans_table= FALSE; + DBUG_PRINT("query", ("%s", thd->query)); + mysql_parse(thd, thd->query, thd->query_length, &found_semicolon); + + /* + pickup values from thd->user_var. + */ + my_bool is_null_value; + lex_string_set(&s_latest, "latest"); + if(!(var_latest=get_user_var_entry(&thd->user_vars, s_latest, FALSE))) { + sql_print_error("Can't retrieve a user variable from an internal THD."); + error= 1; + goto err; + } + binlog_epoch= var_latest->val_int(&is_null_value); + if(is_null_value) { // for safety + sql_print_error("Can't retrieve a user variable from an internal THD."); + error =1; + goto err; + } + +err: + /* + The following code comes from run_query(). + */ + close_thread_tables(thd); + if (!thd->main_da.is_error()) + { + thd->main_da.reset_diagnostics_area(); + } + thd->options= save_thd_options; + thd->query_length= save_thd_query_length; + thd->query= save_thd_query; + thd->variables.pseudo_thread_id= save_thread_id; + thd->status_var= save_thd_status_var; + thd->transaction.all= save_thd_transaction_all; + thd->transaction.stmt= save_thd_transaction_stmt; + thd->net= save_thd_net; + + if(error) + DBUG_RETURN(0); + + DBUG_RETURN(binlog_epoch); +} +#endif static int request_dump(MYSQL* mysql, Master_info* mi, bool *suppress_warnings) @@ -1680,16 +1769,37 @@ int len; int binlog_flags = 0; // for now char* logname = mi->master_log_name; +#ifdef HAVE_NDB_BINLOG + ulonglong binlog_epoch; +#endif DBUG_ENTER("request_dump"); *suppress_warnings= FALSE; - + // TODO if big log files: Change next to int8store() + len = (uint) strlen(logname); int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); int4store(buf + 6, server_id); - len = (uint) strlen(logname); memcpy(buf + 10, logname,len); +#ifdef HAVE_NDB_BINLOG + if(len == 0 && (mi->master_log_pos == 0 || mi->master_log_pos == 4)) { + binlog_epoch = get_epoch_from_binlog_table(); + if(binlog_epoch) { + binlog_flags |= BINLOG_EPOCH_IN_PACKET; + logname = "\0EPCH"; + len = 6; // strlen("EPOCH"); + int2store(buf + 4, binlog_flags); + memcpy(buf + 10, logname, len); + int8store(buf + 16, binlog_epoch); + len += 8; // ulonglong + + char epoch_str[32]; + sprintf(&epoch_str[0], "%llu", (ulonglong)binlog_epoch); + sql_print_information("COM_BINLOG_DUMP packet is adjusted according to epoch (%s).", epoch_str); + } + } +#endif if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) { /* --- a/sql/sql_class.h 2008-12-03 01:30:27.000000000 +0900 +++ b/sql/sql_class.h 2009-01-25 07:41:20.000000000 +0900 @@ -1001,7 +1001,8 @@ SYSTEM_THREAD_SLAVE_SQL= 4, SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8, SYSTEM_THREAD_EVENT_SCHEDULER= 16, - SYSTEM_THREAD_EVENT_WORKER= 32 + SYSTEM_THREAD_EVENT_WORKER= 32, + SYSTEM_THREAD_TEMPORARY = 64 }; inline char const * --- a/sql/sql_parse.cc 2008-12-03 01:30:28.000000000 +0900 +++ b/sql/sql_parse.cc 2009-01-26 22:26:54.000000000 +0900 @@ -1369,6 +1369,9 @@ ulong pos; ushort flags; uint32 slave_server_id; +#ifdef HAVE_NDB_BINLOG + ulonglong epoch; +#endif status_var_increment(thd->status_var.com_other); thd->enable_slow_log= opt_log_slow_admin_statements; @@ -1385,7 +1388,27 @@ general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10, (long) pos); + +#ifdef HAVE_NDB_BINLOG + if(flags & BINLOG_EPOCH_IN_PACKET && + !strncmp(packet+11, "EPCH", 5) && + (epoch = uint8korr(packet + 16))) { + char *binlog_name = (char*)thd->calloc(FN_REFLEN); + ulonglong tmp_pos; + if(get_binlog_pos_from_epoch(binlog_name, &tmp_pos, epoch)) { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + const char* errmsg= "Failed to retrieve a binlog name and position from epoch."; + my_message(my_errno, errmsg, MYF(0)); + break; + } + pos = (ulong)tmp_pos; + mysql_binlog_send(thd, binlog_name, (my_off_t) pos, flags); + } else { + mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags); + } +#else mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags); +#endif unregister_slave(thd,1,1); /* fake COM_QUIT -- if we get here, the thread needs to terminate */ error = TRUE; --- a/sql/sql_repl.cc 2008-12-03 01:30:29.000000000 +0900 +++ b/sql/sql_repl.cc 2009-02-27 07:32:09.000000000 +0900 @@ -1931,6 +1931,131 @@ return 0; } +#ifdef HAVE_NDB_BINLOG +extern user_var_entry *get_user_var_entry(HASH *hash, LEX_STRING &name, + bool create_if_not_exists); + +int get_binlog_pos_from_epoch(char *binlog_name, ulonglong *pos, ulonglong epoch) +{ + char the_sql[] = "SELECT SUBSTRING_INDEX(File, '/', -1), Position\ + INTO @file, @pos\ + FROM mysql.ndb_binlog_index\ + WHERE epoch >= @epoch\ + ORDER BY epoch ASC LIMIT 1;"; + LEX_STRING s_file, s_pos, s_epoch; + user_var_entry *var_file, *var_pos, *var_epoch; + int error= 0; + const char* found_semicolon= NULL; + DBUG_ENTER("get_binlog_pos_from_epoch"); + + THD *thd = current_thd; + if (!thd) + { + sql_print_error("Can't get an existing THD instance."); + error= 1; + goto err; + } + ulong save_thd_query_length= thd->query_length; + char *save_thd_query= thd->query; + ulong save_thread_id= thd->variables.pseudo_thread_id; + struct system_status_var save_thd_status_var= thd->status_var; + THD_TRANS save_thd_transaction_all= thd->transaction.all; + THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt; + ulonglong save_thd_options= thd->options; + DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options)); + NET save_thd_net= thd->net; + + bzero((char*) &thd->net, sizeof(NET)); + thd->variables.pseudo_thread_id= thread_id; + thd->transaction.stmt.modified_non_trans_table= FALSE; + + /* + populate @epoch + */ + lex_string_set(&s_epoch, "epoch"); + if(!(var_epoch=get_user_var_entry(&thd->user_vars, s_epoch, TRUE))) { + sql_print_error("Can't assign a user variable for an internal THD."); + error= 1; + goto err; + } + var_epoch->type= INT_RESULT; + var_epoch->length= sizeof(longlong); + var_epoch->value= (char*)var_epoch + ALIGN_SIZE(sizeof(user_var_entry));; + memcpy(var_epoch->value, (char*)&epoch, 8); + + /* + execute the query + */ + thd->query_length= strlen(the_sql); + thd->query= the_sql; + DBUG_PRINT("query", ("%s", thd->query)); + mysql_parse(thd, thd->query, thd->query_length, &found_semicolon); + + /* + pickup values from thd->user_var. + */ + my_bool is_null_value; + lex_string_set(&s_pos, "pos"); + if(!(var_pos=get_user_var_entry(&thd->user_vars, s_pos, FALSE))) { + sql_print_error("Can't retrieve a user variable (pos) from an internal THD."); + error= 1; + goto err; + } + ulonglong tmp_pos= var_pos->val_int(&is_null_value); + if(is_null_value) { // for safety + sql_print_error("Can't retrieve a user variable (pos) from an internal THD."); + error =1; + goto err; + } + + lex_string_set(&s_file, "file"); + if(!(var_file=get_user_var_entry(&thd->user_vars, s_file, FALSE))) { + sql_print_error("Can't retrieve a user variable (file) from an internal THD."); + error= 1; + goto err; + } + String* str= new (thd->mem_root) String(); + str= var_file->val_str(&is_null_value, str, 0); + if(is_null_value) { // for safety + sql_print_error("Can't retrieve a user variable (file) from an internal THD."); + error =1; + goto err; + } + if(str->length() > FN_REFLEN) { + sql_print_error("Binlog filename is too long."); + error =1; + goto err; + } + + *pos= tmp_pos; + strnmov(binlog_name, str->c_ptr(), FN_REFLEN); + + sql_print_information("binlog name is changed to %s with position %d according to epoch.", + binlog_name, (ulong)*pos); + /* + necessary cleanups + */ +err: + /* + The following code comes from run_query(). + */ + close_thread_tables(thd); + if (!thd->main_da.is_error()) + { + thd->main_da.reset_diagnostics_area(); + } + thd->options= save_thd_options; + thd->query_length= save_thd_query_length; + thd->query= save_thd_query; + thd->variables.pseudo_thread_id= save_thread_id; + thd->status_var= save_thd_status_var; + thd->transaction.all= save_thd_transaction_all; + thd->transaction.stmt= save_thd_transaction_stmt; + thd->net= save_thd_net; + DBUG_RETURN(error); +} +#endif + #endif /* HAVE_REPLICATION */ --- a/sql/sql_repl.h 2008-12-03 01:30:29.000000000 +0900 +++ b/sql/sql_repl.h 2009-01-22 18:32:55.000000000 +0900 @@ -63,5 +63,9 @@ int log_loaded_block(IO_CACHE* file); int init_replication_sys_vars(); +#ifdef HAVE_NDB_BINLOG +int get_binlog_pos_from_epoch(char *binlog_name, ulonglong *pos, ulonglong epoch); +#endif + #endif /* HAVE_REPLICATION */