import mysql.connector import time import threading import sys ver = sys.argv[1] connect_opts = { 'autocommit': False, 'host': 'mysql57-'+ver, 'user': 'root', 'password': 'r00t'} def insert(cursor, uuid, key, val, delay=0, commit_cnx=None): query = ('INSERT INTO `test_table` ' '(`id`,`uuid`,`key`,`value`,`cnt`) ' 'VALUES (NULL, %s, %s, %s, 1) ' 'ON DUPLICATE KEY UPDATE ' '`value`=VALUES(`value`),`cnt`=`cnt`+1') if delay > 0: time.sleep(delay) print('{}: insert ({}, {})'.format(uuid, key, val)) cursor.execute(query, (uuid, key, val)) if commit_cnx: commit_cnx.commit() def initialize(): cnx = mysql.connector.connect(**connect_opts) cursor = cnx.cursor() cursor.execute( 'CREATE DATABASE IF NOT EXISTS test_db ' 'DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin') cursor.execute('USE test_db') cursor.execute( 'CREATE TABLE IF NOT EXISTS `test_table` (' ' `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,' ' `uuid` varchar(255) COLLATE utf8mb4_bin NOT NULL,' ' `key` varchar(255) COLLATE utf8mb4_bin NOT NULL,' ' `value` mediumblob NOT NULL,' ' `cnt` int(10) unsigned NOT NULL DEFAULT 0,' ' PRIMARY KEY (`id`),' ' UNIQUE KEY `i1` (`uuid`,`key`),' ' KEY `i2` (`key`,`uuid`)' ') ENGINE=InnoDB AUTO_INCREMENT=1 ' 'DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ' 'ROW_FORMAT=DYNAMIC') cursor.execute('TRUNCATE test_table') insert(cursor, 'foo', 'key', 'val') insert(cursor, 'bar', 'key', 'val') cnx.commit() cursor.close() cnx.close() initialize() # test cnx_a = mysql.connector.connect(**connect_opts) cursor_a = cnx_a.cursor() cnx_b = mysql.connector.connect(**connect_opts) cursor_b = cnx_b.cursor() cursor_a.execute('USE test_db') cursor_b.execute('USE test_db') lock = ('SELECT * FROM `test_table` ' 'WHERE `uuid` = %s FOR UPDATE') cursor_a.execute(lock, ('foo', )) for r in cursor_a: print('foo: {}'.format(r)) cursor_b.execute(lock, ('bar', )) for r in cursor_b: print('bar: {}'.format(r)) insert(cursor_b, 'bar', 'key', 'val') t_a = threading.Thread(target=insert, args=(cursor_a, 'foo', 'key', 'val', 0, cnx_a)) t_b = threading.Thread(target=insert, args=(cursor_b, 'bar', 'key2', 'val', 1, cnx_b)) t_a.start() t_b.start() t_a.join() t_b.join() cursor_a.close() cursor_b.close() cnx_a.close() cnx_b.close()