# 用串行的调度模拟并发,线程用于处理阻塞情况 import re import os import pymysql # MySQL 数据库连接参数 mysql_config = { 'host': '127.0.0.1', 'user': 'root', 'database': 'test0', 'password': '', 'port': 3308 } table_name = 'tWepKvSE' create_sql = """CREATE TABLE tWepKvSE (ID INT AUTO_INCREMENT UNIQUE, VAL INT, c0 BIGINT UNIQUE KEY NULL);""" data_sql = [ "INSERT INTO tWepKvSE (VAL, c0) VALUES (44854, NULL);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (50744, -2134275173);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (39591, -1008914785);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (58114, -351361077);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (32424, -235958547);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (81150, -211170753);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (54994, 47626112);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (41409, 69331423);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (15114, 397883104);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (27305, 515927826);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (32383, 689941044);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (83580, 737030094);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (88740, 813918762);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (92769, 964530977);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (19406, 1144228744);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (67933, 1369607908);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (93422, 1794799621);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (72374, 1938044707);", "INSERT INTO tWepKvSE (VAL, c0) VALUES (17350, 1982217668);" ] schedule = [1,2,1,2,1,2,1,2] Txn1 = [ """BEGIN;""", """SELECT *, ID AS read_id, VAL AS read_value, CURRENT_TIMESTAMP(6) AS read_timestamp, CONNECTION_ID() AS session_id FROM tWepKvSE WHERE (NOT (0)) OR (c0 >= -1726137651 AND c0 <= -671629208) ORDER BY tWepKvSE.c0;""", """UPDATE tWepKvSE SET VAL = (ROUND(RAND() * 100000)), c0 = LEAST(889401561, 1323017564) WHERE (CAST(-1139874939 AS CHAR)) IN (CAST(0 AS CHAR)) OR (c0 = 1938044707);""", """COMMIT;""" ] Txn2 = [ """BEGIN;""", """SELECT *, ID AS read_id, VAL AS read_value, CURRENT_TIMESTAMP(6) AS read_timestamp, CONNECTION_ID() AS session_id FROM tWepKvSE WHERE (0) AND (0) OR (c0 = 1938044707);""", """DELETE FROM tWepKvSE WHERE FALSE OR (c0 >= -1726137651 AND c0 <= -671629208);""", """COMMIT;""" ] def create_table(): try: conn = pymysql.connect(**mysql_config) cur = conn.cursor() cur.execute("DROP TABLE IF EXISTS " + table_name) cur.execute(create_sql) cur.close() conn.commit() conn.close() except Exception as e: error_message = f"Create Error: {e}" print(error_message) return error_message def execute_data_sql(): for sql in data_sql: try: conn = pymysql.connect(**mysql_config) cur = conn.cursor() cur.execute(sql) cur.close() conn.commit() conn.close() except Exception as e: error_message = f"Error: {e}" print("Init:" + error_message) print(sql) def execute_transactions(): transactions = [Txn1, Txn2] indices = [0] * len(transactions) connections = [pymysql.connect(**mysql_config) for _ in transactions] execution_results = [] read_results = [] error_messages = [] def execute_transaction(transaction, idx, conn): cursor = conn.cursor() try: if idx < len(transaction): stmt = transaction[idx] try: cursor.execute(stmt) result = cursor.fetchall() if result: read_results.append((stmt, result)) execution_results.append((stmt, result)) print((stmt, result)) except Exception as err: error_message = f"Error executing statement: {stmt}, Error: {err}" execution_results.append((stmt, error_message)) print((stmt, error_message)) error_messages.append(error_message) return False else: message = f"Transaction {transaction} has no statement at index {idx}" print(message) error_messages.append(message) return False except Exception as e: print(f"Unexpected error: {e}") return False finally: cursor.close() return True try: for conn in connections: conn.begin() conn.cursor().execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE") for s in schedule: transaction_idx = s - 1 if transaction_idx < len(transactions): conn = connections[transaction_idx] success = execute_transaction(transactions[transaction_idx], indices[transaction_idx], conn) indices[transaction_idx] += 1 for conn in connections: conn.commit() except Exception as e: print(f"Transaction execution failed: {e}") for conn in connections: conn.commit() finally: for conn in connections: conn.close() return execution_results, error_messages, read_results create_table() execute_data_sql() execute_transactions()