=== modified file 'sql/ha_ndbcluster.cc' --- sql/ha_ndbcluster.cc 2010-05-12 11:56:28 +0000 +++ sql/ha_ndbcluster.cc 2010-05-21 13:41:59 +0000 @@ -3407,6 +3407,7 @@ inline void ha_ndbcluster::eventSetAnyValue(THD *thd, NdbOperation::OperationOptions *options) { + options->anyValue= 0; if (unlikely(m_slow_path)) { /* @@ -3418,15 +3419,34 @@ ha_ndbcluster::eventSetAnyValue(THD *thd Thd_ndb *thd_ndb= get_thd_ndb(thd); if (thd->slave_thread) { + /* + Slave-thread, server_id comes from the Binlog event and + in the case of server-id-bits < 31 it may be a composite + so we set it as-is when applying. + In future it may be useful to support *not* mapping composite + AnyValues to/from Binlogged server-ids + */ options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE; options->anyValue=thd->server_id; } else if (thd_ndb->trans_options & TNTO_NO_LOGGING) { options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE; - options->anyValue=NDB_ANYVALUE_FOR_NOLOGGING; + ndbcluster_anyvalue_set_nologging(options->anyValue); } } +#ifndef DBUG_OFF + /* + MySQLD will set the user-portion of AnyValue (if any) to all 1s + This tests code filtering ServerIds on the value of server-id-bits. 
+ */ + const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA"); + if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N') + { + options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE; + dbug_ndbcluster_anyvalue_set_userbits(options->anyValue); + } +#endif } bool ha_ndbcluster::isManualBinlogExec(THD *thd) === modified file 'sql/ha_ndbcluster_binlog.cc' --- sql/ha_ndbcluster_binlog.cc 2010-05-04 13:16:50 +0000 +++ sql/ha_ndbcluster_binlog.cc 2010-05-21 13:41:59 +0000 @@ -45,6 +45,9 @@ extern my_bool opt_ndb_log_updated_only; extern my_bool ndb_log_binlog_index; +extern uint opt_server_id_bits; +Uint32 serverIdMask = 0; /* Inited from server_id_bits below */ + /* defines for cluster replication table names */ @@ -2022,7 +2025,7 @@ int ndbcluster_log_schema_op(THD *thd, r|= op->setValue(SCHEMA_TYPE_I, log_type); DBUG_ASSERT(r == 0); /* any value */ - Uint32 anyValue; + Uint32 anyValue= 0; if (! thd->slave_thread) { /* Schema change originating from this MySQLD, check SQL_LOG_BIN @@ -2031,24 +2034,43 @@ int ndbcluster_log_schema_op(THD *thd, if (thd->options & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */ { DBUG_PRINT("info", ("Schema event for binlogging")); - anyValue = 0; + ndbcluster_anyvalue_set_normal(anyValue); } else { DBUG_PRINT("info", ("Schema event not for binlogging")); - anyValue = NDB_ANYVALUE_FOR_NOLOGGING; + ndbcluster_anyvalue_set_nologging(anyValue); } } else { - /* Slave applying replicated schema event - * Pass original applier's serverId in AnyValue - */ + /* + Slave propagating replicated schema event in ndb_schema + In case replicated serverId is composite + (server-id-bits < 31) we copy it into the + AnyValue as-is + This is for 'future', as currently Schema operations + do not have composite AnyValues. + In future it may be useful to support *not* mapping composite + AnyValues to/from Binlogged server-ids. 
+ */ DBUG_PRINT("info", ("Replicated schema event with original server id %d", thd->server_id)); anyValue = thd->server_id; } +#ifndef DBUG_OFF + /* + MySQLD will set the user-portion of AnyValue (if any) to all 1s + This tests code filtering ServerIds on the value of server-id-bits. + */ + const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA"); + if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N') + { + dbug_ndbcluster_anyvalue_set_userbits(anyValue); + } +#endif + r|= op->setAnyValue(anyValue); DBUG_ASSERT(r == 0); #if 0 @@ -2298,38 +2320,52 @@ static void ndb_binlog_query(THD *thd, C /* any_value == 0 means local cluster sourced change that * should be logged */ - if (schema->any_value != 0) + if (ndbcluster_anyvalue_is_reserved(schema->any_value)) { - if (schema->any_value & NDB_ANYVALUE_RESERVED) + /* Originating SQL node did not want this query logged */ + if (!ndbcluster_anyvalue_is_nologging(schema->any_value)) + sql_print_warning("NDB: unknown value for binlog signalling 0x%X, " + "query not logged", + schema->any_value); + return; + } + + Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value); + /* + Start with serverId as received AnyValue, in case it's a composite + (server_id_bits < 31). + This is for 'future', as currently schema ops do not have composite + AnyValues. + In future it may be useful to support *not* mapping composite + AnyValues to/from Binlogged server-ids. + */ + Uint32 loggedServerId = schema->any_value; + + if (queryServerId) + { + /* + AnyValue has non-zero serverId, must be a query applied by a slave + mysqld. + TODO : Assert that we are running in the Binlog injector thread? + */ + if (! 
g_ndb_log_slave_updates) { - /* Originating SQL node did not want this query logged */ - if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING) - sql_print_warning("NDB: unknown value for binlog signalling 0x%X, " - "query not logged", - schema->any_value); + /* This MySQLD does not log slave updates */ return; } - else - { - /* AnyValue is set to non-zero serverId, must be a query applied - * by a slave mysqld. - * TODO : Assert that we are running in the Binlog injector thread? - */ - if (! g_ndb_log_slave_updates) - { - /* This MySQLD does not log slave updates */ - return; - } - } + } + else + { + /* No ServerId associated with this query, mark it as ours */ + ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id); } uint32 thd_server_id_save= thd->server_id; DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id)); char *thd_db_save= thd->db; - if (schema->any_value == 0) - thd->server_id= ::server_id; - else - thd->server_id= schema->any_value; + + thd->server_id = loggedServerId; + thd->db= schema->db; int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, @@ -3241,13 +3277,24 @@ int ndbcluster_binlog_start() if (::server_id == 0) { - sql_print_warning("NDB: server id set to zero will cause any other mysqld " - "with bin log to log with wrong server id"); + sql_print_warning("NDB: server id set to zero - changes logged to " + "bin log with server id zero will be logged with " + "another server id by slave mysqlds"); } - else if (::server_id & 0x1 << 31) + + /* + Create serverId mask now - the variable is const. + Mask is set for serverId (lowest) bits + */ + serverIdMask = (opt_server_id_bits == 32)? 
+ ~ Uint32(0): + (Uint32(1) << opt_server_id_bits) - 1; /* unsigned shift: int 1 << 31 overflows */ + + if ((::server_id & 0x1 << 31) || // Reserved bit + !ndbcluster_anyvalue_is_serverid_in_range(::server_id)) // server_id_bits { - sql_print_error("NDB: server id's with high bit set is reserved for internal " - "purposes"); + sql_print_error("NDB: server id provided is too large to be represented in " + "opt_server_id_bits or is reserved"); DBUG_RETURN(-1); } @@ -5151,17 +5198,20 @@ ndb_binlog_thread_handle_data_event(Ndb return 0; } - uint32 originating_server_id= pOp->getAnyValue(); - if (originating_server_id == 0) - originating_server_id= ::server_id; - else if (originating_server_id & NDB_ANYVALUE_RESERVED) + uint32 anyValue= pOp->getAnyValue(); + if (ndbcluster_anyvalue_is_reserved(anyValue)) { - if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING) + if (!ndbcluster_anyvalue_is_nologging(anyValue)) sql_print_warning("NDB: unknown value for binlog signalling 0x%X, " "event not logged", - originating_server_id); + anyValue); return 0; } + + uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue); + + if (originating_server_id == 0) + originating_server_id= ::server_id; else if (!g_ndb_log_slave_updates) { /* @@ -5171,6 +5221,16 @@ ndb_binlog_thread_handle_data_event(Ndb return 0; } + /* + Start with logged_server_id as AnyValue in case it's a composite + (server_id_bits < 31). This way any user-values are passed-through + to the Binlog in the high bits of the event's Server Id. + In future it may be useful to support *not* mapping composite + AnyValues to/from Binlogged server-ids. 
+ */ + uint32 logged_server_id= anyValue; + ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id); + DBUG_ASSERT(trans.good()); DBUG_ASSERT(table != 0); @@ -5220,7 +5280,7 @@ ndb_binlog_thread_handle_data_event(Ndb DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]); - IF_DBUG(int ret=) trans.write_row(originating_server_id, + IF_DBUG(int ret=) trans.write_row(logged_server_id, injector::transaction::table(table, TRUE), &b, n_fields, table->record[0]); @@ -5261,7 +5321,7 @@ ndb_binlog_thread_handle_data_event(Ndb } ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]); DBUG_EXECUTE("info", print_records(table, table->record[n]);); - IF_DBUG(int ret =) trans.delete_row(originating_server_id, + IF_DBUG(int ret =) trans.delete_row(logged_server_id, injector::transaction::table(table, TRUE), &b, n_fields, table->record[n]); @@ -5293,7 +5353,7 @@ ndb_binlog_thread_handle_data_event(Ndb since table has a primary key, we can do a write using only after values */ - IF_DBUG(int ret =) trans.write_row(originating_server_id, + IF_DBUG(int ret =) trans.write_row(logged_server_id, injector::transaction::table(table, TRUE), &b, n_fields, table->record[0]);// after values DBUG_ASSERT(ret == 0); @@ -5315,7 +5375,7 @@ ndb_binlog_thread_handle_data_event(Ndb } ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]); DBUG_EXECUTE("info", print_records(table, table->record[1]);); - IF_DBUG(int ret =) trans.update_row(originating_server_id, + IF_DBUG(int ret =) trans.update_row(logged_server_id, injector::transaction::table(table, TRUE), &b, n_fields, @@ -6440,4 +6500,88 @@ ndbcluster_show_status_binlog(THD* thd, DBUG_RETURN(FALSE); } +/* + AnyValue carries ServerId or Reserved codes + Bits from opt_server_id_bits to 30 may carry other data + so we ignore them. 
+ + 332 21 10 0 + 10987654321098765432109876543210 + roooooooooooooooooooooooosssssss + + r = Reserved bit indicates whether + bits 0-6+ have ServerId (0) or + some special reserved code (1). + o = Optional bits, depending on value + of server-id-bits will be + serverid bits or user-specific + data + s = Serverid bits or reserved codes + At least 7 bits will be available + for serverid or reserved codes + +*/ + +#define NDB_ANYVALUE_RESERVED_BIT 0x80000000 +#define NDB_ANYVALUE_RESERVED_MASK 0x8000007f + +#define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f + + +bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue) +{ + return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0); +} + +bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue) +{ + return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) == + NDB_ANYVALUE_NOLOGGING_CODE); +} + +void ndbcluster_anyvalue_set_nologging(Uint32& anyValue) +{ + anyValue |= NDB_ANYVALUE_NOLOGGING_CODE; +} + +void ndbcluster_anyvalue_set_normal(Uint32& anyValue) +{ + /* Clear reserved bit and serverid bits */ + anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT); + anyValue &= ~(serverIdMask); +} + +bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId) +{ + return ((serverId & ~serverIdMask) == 0); +} + +Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue) +{ + assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) ); + + return (anyValue & serverIdMask); +} + +void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId) +{ + assert(! 
(anyValue & NDB_ANYVALUE_RESERVED_BIT) ); + anyValue &= ~(serverIdMask); + anyValue |= (serverId & serverIdMask); +} + +#ifndef DBUG_OFF +void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue) +{ + /* + Set userData part of AnyValue (if there is one) to + all 1s to test that it is ignored + */ + const Uint32 userDataMask = ~(serverIdMask | + NDB_ANYVALUE_RESERVED_BIT); + + anyValue |= userDataMask; +} +#endif + #endif === modified file 'sql/ha_ndbcluster_binlog.h' --- sql/ha_ndbcluster_binlog.h 2010-05-04 13:16:50 +0000 +++ sql/ha_ndbcluster_binlog.h 2010-05-21 13:41:59 +0000 @@ -32,10 +32,6 @@ extern ulong ndb_extra_logging; #define NDB_INVALID_SCHEMA_OBJECT 241 -/* server id's with high bit set is reservered */ -#define NDB_ANYVALUE_FOR_NOLOGGING 0xFFFFFFFF -#define NDB_ANYVALUE_RESERVED 0x80000000 - extern handlerton *ndbcluster_hton; class Ndb_event_data @@ -348,3 +344,16 @@ private: THD *m_thd; int m_lock; }; + + +bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue); +bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue); +void ndbcluster_anyvalue_set_nologging(Uint32& anyValue); +bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId); +void ndbcluster_anyvalue_set_normal(Uint32& anyValue); +Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue); +void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId); + +#ifndef DBUG_OFF +void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue); +#endif === modified file 'sql/log_event.cc' --- sql/log_event.cc 2010-04-28 13:54:56 +0000 +++ sql/log_event.cc 2010-05-21 13:41:02 +0000 @@ -818,11 +818,14 @@ Log_event::do_shall_skip(Relay_log_info { DBUG_PRINT("info", ("ev->server_id=%lu, ::server_id=%lu," " rli->replicate_same_server_id=%d," - " rli->slave_skip_counter=%d", + " rli->slave_skip_counter=%d," + " rli->server_id_mask=%lx", (ulong) server_id, (ulong) ::server_id, rli->replicate_same_server_id, - rli->slave_skip_counter)); - if ((server_id == ::server_id && 
!rli->replicate_same_server_id) || + rli->slave_skip_counter, + rli->mi->server_id_mask)); + ulong masked_server_id = server_id & rli->mi->server_id_mask; + if ((masked_server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group())) return EVENT_SKIP_IGNORE; else if (rli->slave_skip_counter > 0) === modified file 'sql/mysqld.cc' --- sql/mysqld.cc 2010-05-04 14:34:54 +0000 +++ sql/mysqld.cc 2010-05-21 13:41:02 +0000 @@ -473,6 +473,8 @@ extern TYPELIB ndb_distribution_typelib; extern const char *opt_ndb_distribution; extern enum ndb_distribution opt_ndb_distribution_id; #endif +uint opt_server_id_bits= 0; + my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_sync_frm, opt_allow_suspicious_udfs; my_bool opt_secure_auth= 0; @@ -3921,6 +3923,15 @@ using --replicate-same-server-id in conj server."); unireg_abort(1); } + ulong server_id_mask = (opt_server_id_bits == 32)? + ~ ulong(0) : (ulong(1) << opt_server_id_bits) -1; /* shift as ulong: int 1 << 31 overflows */ + if (server_id != (server_id & server_id_mask)) + { + sql_print_error("\ +server-id configured is too large to represent with \ +server-id-bits configured."); + unireg_abort(1); + } #endif if (opt_bin_log) @@ -6377,6 +6388,12 @@ master-ssl", (uchar**) &opt_ndb_cluster_connection_pool, 0, GET_ULONG, REQUIRED_ARG, 1, 1, 63, 0, 0, 0}, #endif + {"server-id-bits", 255, + "Set number of significant bits in ServerId", + (uchar**) &opt_server_id_bits, + (uchar**) &opt_server_id_bits, + /* Default + Max 32 bits, minimum 7 bits */ + 0, GET_UINT, REQUIRED_ARG, 32, 7, 32, 0, 0, 0}, {"new", 'n', "Use very new possible 'unsafe' functions.", (uchar**) &global_system_variables.new_mode, (uchar**) &max_system_variables.new_mode, === modified file 'sql/rpl_mi.cc' --- sql/rpl_mi.cc 2010-05-06 12:28:22 +0000 +++ sql/rpl_mi.cc 2010-05-21 13:41:02 +0000 @@ -37,7 +37,7 @@ Master_info::Master_info() ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), heartbeat_period(0), received_heartbeats(0), inited(0), 
abort_slave(0), slave_running(0), slave_run_id(0), - master_id(0) + master_id(0), server_id_mask(~0) { host[0] = 0; user[0] = 0; password[0] = 0; bind_addr[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -142,6 +142,12 @@ void init_master_info_with_options(Maste strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1); /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; + + /* Init serverId mask from server-id-bits option */ + mi->server_id_mask = (opt_server_id_bits == 32)? + ~ ulong(0) : + (ulong(1) << opt_server_id_bits) -1; /* shift as ulong, not int */ + DBUG_VOID_RETURN; } @@ -384,6 +390,11 @@ file '%s')", fname); mi->ssl= (my_bool) ssl; mi->ssl_verify_server_cert= ssl_verify_server_cert; mi->heartbeat_period= master_heartbeat_period; + + /* Init serverId mask from server-id-bits option */ + mi->server_id_mask = (opt_server_id_bits == 32)? + ~ ulong(0) : + (ulong(1) << opt_server_id_bits) -1; /* shift as ulong, not int */ } DBUG_PRINT("master_info",("log_file_name: %s position: %ld", mi->master_log_name, === modified file 'sql/rpl_mi.h' --- sql/rpl_mi.h 2009-09-24 11:05:08 +0000 +++ sql/rpl_mi.h 2010-05-21 13:41:02 +0000 @@ -111,6 +111,7 @@ class Master_info : public Slave_reporti */ long clock_diff_with_master; uint64 master_epoch; + ulong server_id_mask; }; void init_master_info_with_options(Master_info* mi); === modified file 'sql/set_var.cc' --- sql/set_var.cc 2010-05-04 14:34:54 +0000 +++ sql/set_var.cc 2010-05-21 13:41:02 +0000 @@ -78,6 +78,9 @@ extern my_bool opt_ndb_log_updated_only; extern my_bool opt_ndb_log_empty_epochs; #endif +extern uint opt_server_id_bits; + + extern CHARSET_INFO *character_set_filesystem; @@ -760,6 +763,10 @@ sys_ndb_optimized_node_selection(&vars, &SV::ndb_optimized_node_selection); #endif //WITH_NDBCLUSTER_STORAGE_ENGINE +static sys_var_const +sys_server_id_bits(&vars, "server_id_bits", OPT_GLOBAL, SHOW_INT, + (uchar*) &opt_server_id_bits); + /* Time/date/datetime formats */ static sys_var_thd_date_time_format 
sys_time_format(&vars, "time_format", === modified file 'sql/slave.h' --- sql/slave.h 2010-03-12 10:36:52 +0000 +++ sql/slave.h 2010-05-21 13:41:02 +0000 @@ -223,6 +223,8 @@ extern char *report_host, *report_passwo extern my_bool master_ssl; extern char *master_ssl_ca, *master_ssl_capath, *master_ssl_cert; extern char *master_ssl_cipher, *master_ssl_key; + +extern uint opt_server_id_bits; extern I_List threads;