package com.richards.connector; import java.io.File; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import javax.sql.rowset.serial.SerialBlob; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; public class PreInsert { private Connector connector; private Statement responseStatement; private PreparedStatement requestStatement; private Connection connection; private String insertSQL = "insert into request (requestTimeStamp, clientSeqID, systemSeqID, sendingTimeStamp, clientID, supplierID, clientAddress, destinationAddress,msgProp1, msgProp2, msgProp3, msgProp4, msgProp5, msgProp6, msgProp7, msgProp8,retryAttempt, msg)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; private int insertCount = 0; private RequestInsertThread requestInsertThread; private Thread requestGenerateThread; private Thread responseGenerateThread; private ResponseInsertThread responseInsertThread; private static Logger logger = Logger.getLogger(PreInsert.class); public PreInsert() throws Exception { connector = Connector.getConnector(); connection = connector.getConnection(); connection.setAutoCommit(false); requestStatement = connection.prepareStatement(insertSQL); responseStatement = connection.createStatement(); requestInsertThread = new RequestInsertThread(10000); requestGenerateThread = new Thread(requestInsertThread); requestGenerateThread.start(); responseInsertThread = new ResponseInsertThread(10000); responseGenerateThread = new Thread(responseInsertThread); responseGenerateThread.start(); } public void saveRequest(long requestTimeStamp, int clientSeqID, int systemSeqID, long sendingTimeStamp, int clientID, int supplierID, String clientAddress, String destinationAddress, byte msgProp1, byte msgProp2, byte msgProp3, byte msgProp4, byte msgProp5, byte msgProp6, String msgProp7, byte msgProp8, byte[] msg) { try { requestStatement.setTimestamp(1, new Timestamp(requestTimeStamp)); requestStatement.setInt(2, clientSeqID); requestStatement.setInt(3, systemSeqID); requestStatement.setTimestamp(4, new Timestamp(sendingTimeStamp)); requestStatement.setInt(5, clientID); requestStatement.setInt(6, supplierID); requestStatement.setString(7, clientAddress); requestStatement.setString(8, destinationAddress); requestStatement.setByte(9, msgProp1); requestStatement.setByte(10, msgProp2); requestStatement.setByte(11, msgProp3); requestStatement.setByte(12, msgProp4); requestStatement.setByte(13, msgProp5); requestStatement.setByte(14, msgProp6); requestStatement.setString(15, msgProp7); requestStatement.setByte(16, msgProp8); requestStatement.setInt(17, 0); requestStatement.setBlob(18, new SerialBlob(msg)); requestStatement.addBatch(); logger.info("Added request message to the batch successfully"); if (++insertCount > connector.getBatchSize()) { saveBatch(); } } catch (SQLException e) { logger.fatal("Exception while executing " + insertSQL + " values " + requestTimeStamp + ", " + clientSeqID + ", " + systemSeqID + ", " + sendingTimeStamp + ", " + clientID + ", " + supplierID + ", " + clientAddress + ", " + destinationAddress + ", " + msgProp1 + ", " + msgProp2 + ", " + msgProp3 + ", " + msgProp4 + ", " + msgProp5 + ", " + msgProp6 + ", " + msgProp7 + ", " + msgProp8); } } public synchronized void saveBatch() { if (insertCount > 0) { int batchCount = 0; try { batchCount = requestStatement.executeBatch().length; batchCount = responseStatement.executeBatch().length; } catch (SQLException e) { logger.fatal( "Exception while executing batch of requestStatement", e); } try { connection.commit(); } catch (SQLException e) { logger .fatal("Exception while commiting using the statement", e); } if (batchCount > 0) { logger.info("Batch Insert Count:" + batchCount); } insertCount = 0; } } private class RequestInsertThread implements Runnable { private int recordCount; private boolean loop = true; public void setLoop(boolean loop) { this.loop = loop; } public RequestInsertThread(int requestCount) { this.recordCount = requestCount; } @Override public void run() { byte[] array = "qwertyuioplkjhgfdsazxcvbnm1123456677880poiuytiheqjehkqwheqhwehqkesdmnfbsfbksjdfhsdsfjsp" .getBytes(); for (int i = 0; i < recordCount && loop; i++) { saveRequest(System.currentTimeMillis(), i, i + 1, System .currentTimeMillis(), 2, 3, "indddd", " ktmmmm", (byte) 0x1, (byte) 0x01, (byte) 0x01, (byte) 0x01, (byte) 0x01, (byte) 0x01, "12345", (byte) 0x01, array); } saveBatch(); logger.info("Completed RequestInsertThread"); } } private class ResponseInsertThread implements Runnable { private int recordCount; private boolean loop = true; public void setLoop(boolean loop) { this.loop = loop; } public ResponseInsertThread(int recordCount) { super(); this.recordCount = recordCount; } @Override public void run() { for (int i = 0; i < recordCount && loop; i++) { saveResponse(2, i, 4, System.currentTimeMillis(), "msg" + i, 200); } saveBatch(); logger.info("Completed ResponseInsertThread"); } } public void saveResponse(int clientID, int systemSeqID, int supplierID, long responseTimeStamp, String msgID, int status) { String query = null; try { query = "insert into response(clientID,systemSeqID,supplierID,responseTimeStamp,msgID,status) values(" + clientID + "," + systemSeqID + "," + supplierID + ",'" + new Timestamp(responseTimeStamp) + "'" + ",'" + msgID + "'" + "," + status + ")"; responseStatement.addBatch(query); logger.info("Added the response query to batch successfully"); if (++insertCount > connector.getBatchSize()) { saveBatch(); } } catch (SQLException e) { logger.fatal( "Exception while adding the response query into the batch. Query:" + query, e); } } public void closeResources() { if (requestInsertThread != null) { requestInsertThread.setLoop(false); requestInsertThread = null; } if (requestGenerateThread != null) { requestGenerateThread = null; } if (responseInsertThread != null) { responseInsertThread.setLoop(false); responseInsertThread = null; } if (responseGenerateThread != null) { responseGenerateThread = null; } if (connection != null) { try { connection.commit(); connection.close(); } catch (SQLException e) { System.out.println("Exception while closing the connection"); } finally { connection = null; } } } public static void main(String[] args) { DOMConfigurator.configure("." + File.separator + "config" + File.separator + "log4j.xml"); PreInsert insert = null; Runtime.getRuntime().addShutdownHook(new Shutdownhook(insert)); try { insert = new PreInsert(); } catch (Exception e) { logger.info("Exception while initializing the insert threads", e); } try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { // TODO Auto-generated catch block logger.info("Exception while sleeping in the main thread", e); } } public static class Shutdownhook extends Thread { PreInsert insert = null; public Shutdownhook(PreInsert insert) { this.insert = insert; } public void run() { insert.closeResources(); logger.info("Closed resources in shutdownhook"); } } }