"""Populate the benchmark MySQL database with random device messages.

The script reads the device ids / IMEIs of one tenant, truncates the
``device_message`` and ``sent_messages`` tables, and then fills both tables
with random rows using a pool of worker threads (one connection per worker).
"""
import contextlib
import datetime
import random
from concurrent.futures import ThreadPoolExecutor

import mysql.connector

START_DATE = datetime.datetime(year=2024, month=5, day=1)
# Lower this (e.g. to 1000) for a quick smoke run.
TOTAL_ROWS = 5_000_000
COMMIT_EVERY = 5000
THREADS = 20
# Integer division: any remainder of TOTAL_ROWS / THREADS is not generated.
records_per_thread = TOTAL_ROWS // THREADS

# Connection parameters shared by the setup step and every worker.
DB_CONFIG = {
    "host": "benchmark-db-mysql",
    "user": "benchmark",
    "password": "benchmark",
    "database": "benchmark",
}

# Parameterized statements: values are bound by the driver, not formatted
# into the SQL text.
INSERT_DEVICE_MESSAGE = (
    "INSERT INTO device_message (message, IMEI, received_time) "
    "VALUES ('abc', %s, %s)"
)
INSERT_SENT_MESSAGE = (
    "INSERT INTO sent_messages "
    "(message_type, message, device_id, message_timestamp) "
    "VALUES (0, 'abc', %s, %s)"
)

start_time = datetime.datetime.now()


def random_date(start, end):
    """Return a random datetime between `start` and `end` (inclusive).

    The result is `start` plus a whole number of seconds, so it has
    one-second granularity.
    """
    span_seconds = int((end - start).total_seconds())
    return start + datetime.timedelta(seconds=random.randint(0, span_seconds))


print("Generator started")

get_device_ids_imeis = "select id, imei from device_tenant_id where tenant_id='t146989263'"

device_ids = []
imeis = []
deviceid_imei_map = {}

# Load the devices and reset the target tables; `closing` guarantees the
# cursor and connection are released even if one of the statements fails.
with contextlib.closing(mysql.connector.connect(**DB_CONFIG)) as connection:
    with contextlib.closing(connection.cursor()) as cursor:
        cursor.execute(get_device_ids_imeis)
        for row in cursor.fetchall():
            device_ids.append(row[0])
            imeis.append(row[1])
            deviceid_imei_map[row[0]] = row[1]

        cursor.execute("truncate table device_message")
        cursor.execute("truncate table sent_messages")


def insert_batch(worker_id, total_rows):
    """Insert `total_rows` random message pairs using a dedicated connection.

    For each row a random device is picked from the module-level
    `device_ids`, and one record is written to `device_message` and one to
    `sent_messages` with the same random timestamp taken from the 30 days
    following `START_DATE`. The transaction is committed every
    `COMMIT_EVERY` rows and once more at the end for any remaining rows.

    Any exception is caught and printed so that one failing worker does not
    propagate out of its thread; the connection is closed in every case.

    Args:
        worker_id: Identifier used only in progress messages.
        total_rows: Number of row pairs this worker should insert.
    """
    if not device_ids:
        print(f"worker_id: {worker_id}, no devices loaded, nothing to insert")
        return

    end_date = START_DATE + datetime.timedelta(days=30)
    try:
        with contextlib.closing(mysql.connector.connect(**DB_CONFIG)) as conn:
            with contextlib.closing(conn.cursor()) as cur:
                pending = 0
                for total_inserted in range(1, total_rows + 1):
                    # Pick from the actual device list (not the query text).
                    device_id = random.choice(device_ids)
                    imei = deviceid_imei_map[device_id]
                    msg_time = random_date(START_DATE, end_date)

                    cur.execute(INSERT_DEVICE_MESSAGE, (imei, msg_time))
                    cur.execute(INSERT_SENT_MESSAGE, (device_id, msg_time))

                    pending += 1
                    if pending == COMMIT_EVERY:
                        print(f"worker_id: {worker_id}, total inserted: {total_inserted} , time since started: {datetime.datetime.now() - start_time}")
                        conn.commit()
                        pending = 0

                # Commit the tail that did not fill a whole batch; without
                # this the rows would be discarded when the connection closes.
                if pending:
                    conn.commit()
        print(f"woker_id: {worker_id} finished")
    except Exception as e:
        print(f"got exception {e}")


with ThreadPoolExecutor(max_workers=THREADS) as executor:
    for i in range(THREADS):
        print(f"starting woker {i}")
        executor.submit(insert_batch, i, records_per_thread)