/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package com.euroccp.tps.threads; import com.euroccp.tps.common.logging.TPSMessages; import com.euroccp.tps.entities.facade.FixTradeFacade; import com.euroccp.tps.entities.facade.InstructionsFacade; import com.euroccp.tps.entities.facade.PositionsFacade; import com.euroccp.tps.entities.mapping.FixTradeData; import com.euroccp.tps.main.TPS; import static com.euroccp.tps.statics.TPSStatics.tpsLogger; import com.mysql.clusterj.ClusterJDatastoreException; import com.mysql.clusterj.ClusterJUserException; import com.mysql.clusterj.Session; import com.mysql.clusterj.SessionFactory; import java.math.BigDecimal; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; import org.ffpojo.FFPojoHelper; /** * * @author Administrator */ public class MySQLClusterJ implements Runnable { private final int threadNumber; private final Random rand = new Random(); private FFPojoHelper ffpojo; private Session session; private FixTradeFacade fixTradeFacade; private PositionsFacade positionsFacade; private InstructionsFacade instructionsFacade; public MySQLClusterJ(Session session, int threadNumber) { this.session = session; this.threadNumber = threadNumber; } @Override public void run() { Thread.currentThread().setName("TPS-Worker-" + threadNumber); processCommand(); } private void processCommand() { try { ffpojo = FFPojoHelper.getInstance(); fixTradeFacade = new FixTradeFacade(); positionsFacade = new PositionsFacade(); instructionsFacade = new InstructionsFacade(); //Started successfully tpsLogger.info(TPSMessages.getMessage("TPS0008", threadNumber)); long start_time = System.currentTimeMillis(); for (Integer key : TPS.testData.keySet()) { String fixTrade = TPS.testData.remove(key); if (fixTrade != null) { int attempt = 1; while (attempt <= 2) { try { session.currentTransaction().begin(); FixTradeData trade = ffpojo.createFromText(FixTradeData.class, fixTrade); trade.setLastPx("1"); trade.setId(trade.getMsgSeqNum()); //Add record fixTradeFacade.insertRecordClusterJ(session, trade); //Positions positionsFacade.updateRecordClusterJ(session, rand.nextInt((100 - 1) + 1) + 1, new BigDecimal(trade.getLastPx())); //Instructions instructionsFacade.updateRecordClusterJ(session, rand.nextInt((10 - 1) + 1) + 1, new BigDecimal(trade.getLastPx())); session.currentTransaction().commit(); break; } catch (ClusterJDatastoreException ex) { if (session.currentTransaction().isActive()) { session.currentTransaction().rollback(); } if (attempt == 2) { tpsLogger.error(ex.getMessage(), ex); } attempt++; } /*catch (ClusterJUserException ex) { tpsLogger.error(ex.getMessage(), ex); session.currentTransaction().rollback(); attempt++; }*/ catch (Exception ex) { tpsLogger.error(ex.getMessage(), ex); if (session.currentTransaction().isActive()) { session.currentTransaction().rollback(); } //attempt++; break; } } } } String message = "It took:" + (System.currentTimeMillis() - start_time) + " ms"; System.out.println(message); tpsLogger.info(message); tpsLogger.info(TPSMessages.getMessage("TPS0009", threadNumber)); } catch (Exception ex) { Logger.getLogger(MySQLClusterJ.class.getName()).log(Level.SEVERE, null, ex); } } }