commit 2d525a4562d7dc9e62901ed6e671f10901071f63 Author: Justin Tolmer Date: Mon Oct 29 16:26:15 2012 -0700 Add single stress test which hits the DBUG problem diff --git a/mysql-test/include/wait_for_slave_to_sync_with_master.inc b/mysql-test/include/wait_for_slave_to_sync_with_master.inc new file mode 100644 index 0000000..bae56fb --- /dev/null +++ b/mysql-test/include/wait_for_slave_to_sync_with_master.inc @@ -0,0 +1,27 @@ +# +# Similar to sync_slave_with_master but supports a configurable timeout +# let $slave_sync_timeout = 300; +# source include/wait_for_slave_to_sync_with_master; +# + +connection slave; +stop slave; +start slave; + +connection master; + +let $file = query_get_value(SHOW MASTER STATUS, File, 1); +let $pos = query_get_value(SHOW MASTER STATUS, Position, 1); + +connection slave; +--disable_result_log +--disable_query_log +eval select master_pos_wait("$file", $pos, $slave_sync_timeout); +--enable_result_log +--enable_query_log + +connection master; +sync_slave_with_master; + +connection master; + diff --git a/mysql-test/mysql-test-run.pl b/mysql-test/mysql-test-run.pl index 16f7103..68d7f8e 100755 --- a/mysql-test/mysql-test-run.pl +++ b/mysql-test/mysql-test-run.pl @@ -163,7 +163,7 @@ our $opt_vs_config = $ENV{'MTR_VS_CONFIG'}; # If you add a new suite, please check TEST_DIRS in Makefile.am. # -my $DEFAULT_SUITES= "main,sys_vars,binlog,federated,rpl,innodb,innodb_fts,perfschema,funcs_1,opt_trace"; +my $DEFAULT_SUITES= "main,sys_vars,binlog,federated,rpl,innodb,innodb_fts,perfschema,funcs_1,opt_trace,innodb_stress"; my $opt_suites; our $opt_verbose= 0; # Verbose output, enable with --verbose diff --git a/mysql-test/suite/innodb_stress/include/innodb_stress.inc b/mysql-test/suite/innodb_stress/include/innodb_stress.inc new file mode 100644 index 0000000..847ca9a --- /dev/null +++ b/mysql-test/suite/innodb_stress/include/innodb_stress.inc @@ -0,0 +1,111 @@ +# Populate a table with 1000 records. Allow the replica to sync with the master. 
+# Run concurrent threads that run OLTP transactions on master. +# Kill the master database server at random points. +# Check the table against the replica. +# Reinvoke the threads. + +# create the directory for temporary log files. +--exec mkdir -p $MYSQL_TMP_DIR/load_generator + +--connection master + +# since this test generates lot of errors in log, suppress checking errors +call mtr.add_suppression(".*"); + +--sync_slave_with_master + +--connection master +--let $pid_file = `SELECT @@pid_file` +--let $crash_num = 0 +--let $master_host = 127.0.0.1 +--let $table = test +--let $user = root + +while ($num_crashes) +{ + connection master; + exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect; + if ($crash_num) + { + let $num_records = 0; # do not populate the table except for the first run. + } + + if ($use_blob) + { + let $exec = +python suite/innodb_stress/t/load_generator.py $pid_file $kill_db_after +$num_records $num_workers $num_transactions $user $master_host $MASTER_MYPORT +$table 1 $max_rows $MYSQL_TMP_DIR/load_generator; + } + if (!$use_blob) + { + let $exec = +python suite/innodb_stress/t/load_generator.py $pid_file $kill_db_after +$num_records $num_workers $num_transactions $user $master_host $MASTER_MYPORT +$table 0 $max_rows $MYSQL_TMP_DIR/load_generator; + } + + exec $exec; + + if ($do_crash) + { + --echo Wait for reconnect + enable_reconnect; + # Call script that will poll the server waiting for it to be back online again + source include/wait_until_connected_again.inc; + connection slave; + source include/wait_until_connected_again.inc; + connection master; + } + + --echo Checksum master + let $master_checksum = query_get_value(CHECKSUM TABLE t1, Checksum, 1); + + # if sync_slave_with_master had a configurable timeout this would not be needed + let $slave_sync_timeout = 7200; + --source include/wait_for_slave_to_sync_with_master.inc + + connection slave; + --echo Checksum slave + let $slave_checksum=query_get_value(CHECKSUM TABLE t1, 
Checksum, 1); + let $not_same = `SELECT $master_checksum-$slave_checksum`; + if ($not_same) + { + let $msg = +The checksums of table t1 for master and slave do not match for $crash_num th +crash. This may happen if there is a corrupt recovery log or a bug in crash +recovery. You can take a look at the logs in $MYSQL_TMP_DIR/load_generator to see the +queries issued before the crash.; + echo $msg; + + connection master; + eval select * into outfile '$MYSQLTEST_VARDIR/tmp/master_all' from t1 order by id; + eval select id into outfile '$MYSQLTEST_VARDIR/tmp/master_id' from t1 order by id; + show master status; + + connection slave; + eval select * into outfile '$MYSQLTEST_VARDIR/tmp/slave_all' from t1 order by id; + eval select id into outfile '$MYSQLTEST_VARDIR/tmp/slave_id' from t1 order by id; + show slave status; + + die; + } + dec $num_crashes; + inc $crash_num; +} + +# final cleanup +--connection master + +DROP TABLE t1; + +# if sync_slave_with_master had a configurable timeout this would not be needed +let $slave_sync_timeout = 7200; +--source include/wait_for_slave_to_sync_with_master.inc + +--connection slave +stop slave; + +connection master; + +# --exec rm -rf $MYSQL_TMP_DIR/load_generator diff --git a/mysql-test/suite/innodb_stress/r/innodb_bigstress_crash_nocompress.result b/mysql-test/suite/innodb_stress/r/innodb_bigstress_crash_nocompress.result new file mode 100644 index 0000000..f4f90f0 --- /dev/null +++ b/mysql-test/suite/innodb_stress/r/innodb_bigstress_crash_nocompress.result @@ -0,0 +1,28 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master.info repository is not secure and is therefore not recommended. Please see the MySQL Manual for more about this issue and possible alternatives. 
+[connection master] +DROP TABLE IF EXISTS t1; +CREATE TABLE t1(id INT AUTO_INCREMENT PRIMARY KEY, +msg_prefix VARCHAR(255), +msg VARCHAR(255), +msg_length int, +msg_checksum varchar(128), +KEY msg_i(msg_prefix)) +ENGINE=INNODB; +call mtr.add_suppression(".*"); +Wait for reconnect +Checksum master +stop slave; +start slave; +Checksum slave +Wait for reconnect +Checksum master +stop slave; +start slave; +Checksum slave +DROP TABLE t1; +stop slave; +start slave; +stop slave; diff --git a/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-master.opt b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-master.opt new file mode 100644 index 0000000..5fe698a --- /dev/null +++ b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-master.opt @@ -0,0 +1,7 @@ +--binlog-do-db=test +--innodb-file-per-table +--innodb_file_format='Barracuda' +--sync_binlog=10 +--innodb_flush_log_at_trx_commit=2 +--force-restart +--innodb_buffer_pool_size=512M diff --git a/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-slave.opt b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-slave.opt new file mode 100644 index 0000000..5a897f0 --- /dev/null +++ b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress-slave.opt @@ -0,0 +1,4 @@ +--innodb_file_format='Barracuda' +--innodb_flush_log_at_trx_commit=2 +--innodb_buffer_pool_size=512M +--log_slave_updates=0 diff --git a/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress.test b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress.test new file mode 100644 index 0000000..d67d18e --- /dev/null +++ b/mysql-test/suite/innodb_stress/t/innodb_bigstress_crash_nocompress.test @@ -0,0 +1,33 @@ +# stress tests: blobs=no, crash=yes, compress=no + +# Don't test this under valgrind, memory leaks will occur +--source include/not_valgrind.inc +--source include/have_innodb.inc +--source include/master-slave.inc +--source include/big_test.inc 
+ +--disable_warnings +DROP TABLE IF EXISTS t1; +--enable_warnings + +# create the actual table +CREATE TABLE t1(id INT AUTO_INCREMENT PRIMARY KEY, + msg_prefix VARCHAR(255), + msg VARCHAR(255), + msg_length int, + msg_checksum varchar(128), + KEY msg_i(msg_prefix)) +ENGINE=INNODB; + +let $use_blob=0; +let $do_crash=1; +let $do_compress=0; + +--let $num_crashes = 2 +--let $num_workers = 20 +--let $num_transactions = 0 +--let $kill_db_after = 90 +--let $num_records = 1000 +--let $max_rows = 4096 + +--source suite/innodb_stress/include/innodb_stress.inc diff --git a/mysql-test/suite/innodb_stress/t/load_generator.py b/mysql-test/suite/innodb_stress/t/load_generator.py new file mode 100644 index 0000000..00c1c5e --- /dev/null +++ b/mysql-test/suite/innodb_stress/t/load_generator.py @@ -0,0 +1,259 @@ +import cStringIO +import hashlib +import MySQLdb +import os +import random +import signal +import sys +import threading +import time +import string + +CHARS = string.letters + string.digits + +def sha1(x): + return hashlib.sha1(str(x)).hexdigest() + +def get_msg(do_blob): + if do_blob: + blob_length = random.randint(1, 24000) + else: + blob_length = random.randint(1, 255) + + if random.randint(1, 2) == 1: + # blob that cannot be compressed (well, compresses to 85% of original size) + return ''.join([random.choice(CHARS) for x in xrange(blob_length)]) + else: + # blob that can be compressed + return random.choice(CHARS) * blob_length + +def populate_table(con, num_records_before, do_blob, log): + cur = con.cursor() + stmt = None + + try: + for i in xrange(num_records_before): + msg = get_msg(do_blob) + # print >> log, "length is %d, complen is %d" % (len(msg), len(zlib.compress(msg, 6))) + stmt = """ +INSERT INTO t1(id,msg_prefix,msg,msg_length,msg_checksum) VALUES (NULL,'%s','%s',%d,'%s') +""" % (msg[0:255], msg, len(msg), sha1(msg)) + cur.execute(stmt) + + con.commit() + return True + + except MySQLdb.Error, e: + print >> log, "cannot insert (%s) for statement (%s)" % 
(e, stmt) + return False + +def get_update(msg, idx): + return """ +UPDATE t1 SET msg_prefix='%s',msg='%s',msg_length=%d,msg_checksum='%s' WHERE id=%d""" % ( +msg[0:255], msg, len(msg), sha1(msg), idx) + +def get_insert_on_dup(msg, idx): + return """ +INSERT INTO t1 (msg_prefix,msg,msg_length,msg_checksum,id) VALUES ('%s','%s',%d,'%s',%d) +ON DUPLICATE KEY UPDATE +msg_prefix=VALUES(msg_prefix), +msg=VALUES(msg), +msg_length=VALUES(msg_length), +msg_checksum=VALUES(msg_checksum), +id=VALUES(id)""" % ( +msg[0:255], msg, len(msg), sha1(msg), idx) + +def get_insert(msg, idx): + return """ +INSERT INTO t1 (msg_prefix,msg,msg_length,msg_checksum,id) VALUES ('%s','%s',%d,'%s',%d)""" % ( +msg[0:255], msg, len(msg), sha1(msg), idx) + +def get_insert_null(msg): + return """ +INSERT INTO t1 (msg_prefix,msg,msg_length,msg_checksum,id) VALUES ('%s','%s',%d,'%s',NULL)""" % ( +msg[0:255], msg, len(msg), sha1(msg)) + +class Worker(threading.Thread): + global LG_TMP_DIR + + def __init__(self, num_xactions, xid, con, server_pid, do_blob, max_id): + threading.Thread.__init__(self) + self.do_blob = do_blob + self.xid = xid + con.autocommit(False) + self.con = con + self.num_xactions = num_xactions + cur = self.con.cursor() + self.rand = random.Random() + self.rand.seed(xid * server_pid) + self.loop_num = 0 + self.max_id = max_id + self.num_inserts = 0 + self.num_deletes = 0 + self.num_updates = 0 + self.time_spent = 0 + self.log = open('/%s/worker%02d.log' % (LG_TMP_DIR, self.xid), 'a') +# print "num_inserts=%d,num_updates=%d,num_deletes=%d,time_spent=%d" %\ +#(self.num_inserts, self.num_updates, self.num_deletes, self.time_spent) + self.start() + + def finish(self): + print >> self.log, "loop_num:%d, total time: %.2f s" % ( + self.loop_num, time.time() - self.start_time + self.time_spent) + self.log.close() + + def validate_msg(self, msg_prefix, msg, msg_length, msg_checksum, idx): + + prefix_match = msg_prefix == msg[0:255] + + checksum = sha1(msg) + checksum_match = checksum == 
msg_checksum + + len_match = len(msg) == msg_length + + if not prefix_match or not checksum_match or not len_match: + errmsg = "id(%d), length(%s,%d,%d), checksum(%s,%s,%s) prefix(%s,%s,%s)" % ( + idx, + len_match, len(msg), msg_length, + checksum_match, checksum, msg_checksum, + prefix_match, msg_prefix, msg[0:255]) + print >> self.log, errmsg + + cursor = self.con.cursor() + cursor.execute("INSERT INTO errors VALUES('%s')" % errmsg) + cursor.execute("COMMIT") + raise Exception('validate_msg failed') + else: + print >> self.log, "Validated for length(%d) and id(%d)" % (msg_length, idx) + + def run(self): + try: + self.runme() + print >> self.log, "ok, with do_blob %s" % self.do_blob + except Exception, e: + + try: + cursor = self.con.cursor() + cursor.execute("INSERT INTO errors VALUES('%s')" % e) + cursor.execute("COMMIT") + except MySQLdb.Error, e2: + print >> self.log, "caught while inserting error (%s)" % e2 + + print >> self.log, "caught (%s)" % e + finally: + self.finish() + + def runme(self): + self.start_time = time.time() + cur = self.con.cursor() + print >> self.log, "thread %d started, run from %d to %d" % ( + self.xid, self.loop_num, self.num_xactions) + + while not self.num_xactions or (self.loop_num < self.num_xactions): + idx = self.rand.randint(0, self.max_id) + insert_or_update = self.rand.randint(0, 3) + self.loop_num += 1 + try: + stmt = None + + msg = get_msg(self.do_blob) + + cur.execute("SELECT msg_prefix,msg,msg_length,msg_checksum FROM t1 WHERE id=%d" % idx) + res = cur.fetchone() + if res: + self.validate_msg(res[0], res[1], res[2], res[3], idx) + + if insert_or_update: + if res: + if self.rand.randint(0, 1): + stmt = get_update(msg, idx) + else: + stmt = get_insert_on_dup(msg, idx) + self.num_updates += 1 + else: + r = self.rand.randint(0, 2) + if r == 0: + stmt = get_insert(msg, idx) + elif r == 1: + stmt = get_insert_on_dup(msg, idx) + else: + stmt = get_insert_null(msg) + self.num_inserts += 1 + else: + stmt = "DELETE FROM t1 WHERE 
id=%d" % idx + self.num_deletes += 1 + + query_result = cur.execute(stmt) + if (self.loop_num % 100) == 0: + print >> self.log, "Thread %d loop_num %d: result %d: %s" % (self.xid, + self.loop_num, query_result, + stmt) + + # 30% commit, 10% rollback, 60% don't end the trx + r = self.rand.randint(1,10) + if r < 4: + self.con.commit() + elif r == 4: + self.con.rollback() + + except MySQLdb.Error, e: + if e.args[0] == 2006: # server is killed + print >> self.log, "mysqld down, transaction %d" % self.xid + return + else: + print >> self.log, "mysql error for stmt(%s) %s" % (stmt, e) + + try: + self.con.commit() + except Exception, e: + print >> self.log, "commit error %s" % e + +if __name__ == '__main__': + global LG_TMP_DIR + + pid_file = sys.argv[1] + kill_db_after = int(sys.argv[2]) + num_records_before = int(sys.argv[3]) + num_workers = int(sys.argv[4]) + num_xactions_per_worker = int(sys.argv[5]) + user = sys.argv[6] + host = sys.argv[7] + port = int(sys.argv[8]) + db = sys.argv[9] + do_blob = int(sys.argv[10]) + max_id = int(sys.argv[11]) + LG_TMP_DIR = sys.argv[12] + workers = [] + server_pid = int(open(pid_file).read()) + log = open('/%s/main.log' % LG_TMP_DIR, 'a') + +# print "kill_db_after = ",kill_db_after," num_records_before = ", \ +#num_records_before, " num_workers= ",num_workers, "num_xactions_per_worker =",\ +#num_xactions_per_worker, "user = ",user, "host =", host,"port = ",port,\ +#" db = ", db, " server_pid = ", server_pid + + if num_records_before: + print >> log, "populate table do_blob is %d" % do_blob + con = MySQLdb.connect(user=user, host=host, port=port, db=db) + if not populate_table(con, num_records_before, do_blob, log): + sys.exit(1) + con.close() + + print >> log, "start %d threads" % num_workers + for i in xrange(num_workers): + worker = Worker(num_xactions_per_worker, i, + MySQLdb.connect(user=user, host=host, port=port, db=db), + server_pid, do_blob, max_id) + workers.append(worker) + + if kill_db_after: + print >> log, "kill mysqld" 
+ time.sleep(kill_db_after) + os.kill(server_pid, signal.SIGKILL) + + print >> log, "wait for threads" + for w in workers: + w.join() + + print >> log, "all threads done" +