package POC;

import java.nio.ByteBuffer;

import com.mysql.ndbjtie.ndbapi.Ndb;
import com.mysql.ndbjtie.ndbapi.NdbDictionary;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.Event;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.EventConst;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
import com.mysql.ndbjtie.ndbapi.NdbEventOperation;
import com.mysql.ndbjtie.ndbapi.NdbRecAttr;
import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;

/**
 * Proof-of-concept listener for NDB table events.
 *
 * <p>Connects to the cluster at {@code connectionString}, (re)creates an event
 * named {@code EVENT_NAME} covering every column of {@code tableName},
 * subscribes to it and then polls forever, printing the type of each received
 * event followed by the name and size of every non-NULL column attribute.
 */
public class Atomizer {
	private static final String EVENT_NAME = "event";
	private static final boolean MERGE_EVENTS = false;
	private static final String connectionString = "localhost:1186";
	private static final String tableName = "simples";
	/** Capacity, in bytes, of the receive buffer allocated for each column. */
	private static final int VALUE_BUFFER_SIZE = 1000;
	
	/** Attribute handles returned by {@code getValue}, one per event column. */
	private static NdbRecAttr[] recAttrs;
	/** Names of the columns of the watched table, filled in by {@link #createEvent}. */
	private static String[] eventColumnNames;
	/**
	 * Receive buffers handed to {@code getValue}, one per event column. Kept in
	 * a static field so the direct buffers stay strongly reachable for as long
	 * as the event operation may reference them.
	 */
	private static ByteBuffer[] valueBuffers;
	
	/**
	 * Defines an event covering all columns and all table event types of the
	 * given table. If an event with the same name already exists it is dropped
	 * and created again. As a side effect, {@code eventColumnNames} and
	 * {@code recAttrs} are sized to the table's column count.
	 *
	 * @param ndb            initialised Ndb object used to reach the dictionary
	 * @param eventName      name of the event to create
	 * @param eventTableName name of the table the event is attached to
	 * @param mergeEvents    value forwarded to {@code Event.mergeEvents}
	 * @return {@code true} only if the event exists in the dictionary with the
	 *         new definition when this method returns; {@code false} if any
	 *         dictionary call failed (the error message is printed)
	 */
	private static boolean createEvent(
		Ndb ndb, 
		String eventName, 
		String eventTableName, 
		boolean mergeEvents) {
		
		Dictionary dictionary = ndb.getDictionary();
		if (dictionary == null) {
			System.out.println(ndb.getNdbError().message());
			return false;
		}
		TableConst table = dictionary.getTable(eventTableName);
		if (table == null) {
			System.out.println(dictionary.getNdbError().message());
			return false;
		}
		
		int columnCount = table.getNoOfColumns();
		eventColumnNames = new String[columnCount];
		recAttrs = new NdbRecAttr[columnCount];
		for (int i = 0; i < columnCount; i++) {
			eventColumnNames[i] = table.getColumn(i).getName();
		}
		
		Event event = Event.create(eventName, table);
		event.addTableEvent(EventConst.TableEvent.TE_ALL);
		for (String columnName : eventColumnNames) {
			event.addEventColumn(columnName);
		}
		event.mergeEvents(mergeEvents);
		
		if (dictionary.createEvent(event) == 0) {
			return true;
		}
		
		if (dictionary.getNdbError().classification() != NdbErrorConst.Classification.SchemaObjectExists) {
			System.out.println(dictionary.getNdbError().message());
			return false;
		}
		
		// An event with this name is already registered: replace it so the
		// definition matches the current table layout. A failure in either
		// step must be reported to the caller instead of claiming success.
		System.out.println("Event exists.");
		if (dictionary.dropEvent(eventName, 0) != 0) {
			System.out.println(dictionary.getNdbError().message());
			return false;
		}
		if (dictionary.createEvent(event) != 0) {
			System.out.println(dictionary.getNdbError().message());
			return false;
		}
		return true;
	}

	/** Prints a one-line label for the given table event type. */
	private static void printEventType(int eventType) {
		switch (eventType) {
			case NdbDictionary.EventConst.TableEvent.TE_INSERT:
				System.out.println("INSERT");
				break;
			case NdbDictionary.EventConst.TableEvent.TE_DELETE:
				System.out.println("DELETE");
				break;
			case NdbDictionary.EventConst.TableEvent.TE_UPDATE:
				System.out.println("UPDATE");
				break;
			default:
				System.out.println("Invalid event.");
				break;
		}
	}

	/** Prints the name and size in bytes of every attribute that is not NULL. */
	private static void printNonNullColumns() {
		for (int i = 0; i < eventColumnNames.length; i++) {
			NdbRecAttr recAttr = recAttrs[i];
			if (recAttr.isNULL() == 0) {
				System.out.println(eventColumnNames[i]);
				System.out.println(recAttr.get_size_in_bytes());
			}
		}
	}

	/**
	 * Entry point: connects, subscribes to the table event and polls until a
	 * polling error occurs, then drops the subscription and the event and
	 * releases the native objects. Setup failures print a message and exit
	 * with status 1.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		System.loadLibrary("ndbclient");
		System.loadLibrary("libmysql");
		
		Ndb_cluster_connection clusterConnection = Ndb_cluster_connection.create(connectionString);
		if (clusterConnection.connect(5, 3, 1) != 0) {
			System.out.println("Cluster connect failed.");
			System.exit(1);
		}
		
		if (clusterConnection.wait_until_ready(30, 30) != 0) {
			System.out.println("Cluster not ready.");
			System.exit(1);
		}
		
		Ndb ndb = Ndb.create(clusterConnection, "clusterdb", "def");
		if (ndb.init(1024) != 0) {
			System.out.println(ndb.getNdbError().message());
			System.exit(1);
		}
		if (!createEvent(ndb, EVENT_NAME, tableName, MERGE_EVENTS))
			System.exit(1);
		
		NdbEventOperation eventOp = ndb.createEventOperation(EVENT_NAME);
		if (eventOp == null) {
			System.out.println(ndb.getNdbError().message());
			System.exit(1);
		}
		eventOp.mergeEvents(MERGE_EVENTS);
		
		// Give every column its own destination buffer; sharing a single
		// buffer would make all attributes point at the same memory.
		// NOTE(review): VALUE_BUFFER_SIZE is a fixed guess carried over from
		// the original code - confirm it covers the widest column.
		valueBuffers = new ByteBuffer[recAttrs.length];
		for (int i = 0; i < recAttrs.length; i++) {
			valueBuffers[i] = ByteBuffer.allocateDirect(VALUE_BUFFER_SIZE);
			recAttrs[i] = eventOp.getValue(eventColumnNames[i], valueBuffers[i]);
			if (recAttrs[i] == null) {
				System.out.println(ndb.getNdbError().message());
				System.exit(1);
			}
		}
		
		if (eventOp.execute() != 0) {
			System.out.println(ndb.getNdbError().message());
			System.exit(1);
		}
		
		// Output slot required by pollEvents; its content is not used here.
		long[] latestGci = new long[] {0};
		while (true) {
			int result = ndb.pollEvents(1000, latestGci);
			
			if (result < 0) {
				System.out.println("Polling error. " + result);
				break;
			}
			if (result > 0) {
				NdbEventOperation received;
				while ((received = ndb.nextEvent()) != null) {
					printEventType(received.getEventType());
					// Report the attributes once per event, while they still
					// describe the event that was just fetched.
					printNonNullColumns();
				}
			} 
		}
		
		ndb.dropEventOperation(eventOp);
		Dictionary dictionary = ndb.getDictionary();
		if (dictionary == null)
			System.out.println(ndb.getNdbError().message());
		else
			dictionary.dropEvent(EVENT_NAME, 0);

		Ndb.delete(ndb);
		Ndb_cluster_connection.delete(clusterConnection);
		
	}
}
