#include <mysql.h>
#include <NdbApi.hpp>
// Used for cout
#include <stdio.h>
#include <iostream.h>
#include <pthread.h>
#include <sys/time.h>

namespace std {} using namespace std;

/**
 * Block the calling thread for roughly the given number of milliseconds.
 *
 * The wait is implemented as a select() call that watches no file
 * descriptors, so only its timeout is in effect.
 *
 * @param milliseconds  time to sleep, in milliseconds (expected >= 0)
 */
static void
milliSleep(int milliseconds){
  struct timeval sleeptime;
  sleeptime.tv_sec = milliseconds / 1000;
  // Convert the leftover milliseconds to microseconds (1 ms == 1000 us).
  // For non-negative input this keeps tv_usec below one second.
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}

/*
 * Error-reporting helpers.
 *
 * PRINT_ERROR(code, msg) is an expression that writes the current file
 * name, line number, the given code and the given message to cout.
 *
 * MYSQLERROR(mysql) prints mysql_errno()/mysql_error() for the given MYSQL
 * object and terminates the process with exit(-1).
 *
 * APIERROR(error) prints the .code and .message members of the given error
 * object and terminates the process with exit(-1).
 *
 * Arguments are parenthesized so that compound expressions expand safely,
 * and the statement macros are wrapped in do { } while (0) so that they
 * behave as a single statement (e.g. in front of an else branch).
 */
#define PRINT_ERROR(code,msg) \
  cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << endl
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)

// Forward declarations of the file-local helpers defined after main().

// Issues CREATE TABLE IF NOT EXISTS for "mytablename" on the given connection.
static void create_table(MYSQL &mysql);
// Writes the initial rows of "mytablename" through the NDB API.
static void populate(Ndb * myNdb);
// pthread entry point; arg is interpreted as an Ndb*.
static void* _thread_run(void* arg);

/**
 * Program entry point.
 *
 * Connects to the cluster management server and to a MySQL server, makes
 * sure database test_db_1 and table mytablename exist, populates the table
 * and then runs _thread_run() concurrently in two threads, each thread
 * using its own Ndb object.
 *
 * @return 0 on success; any failure terminates the process via exit(-1).
 */
int main()
{
  // ndb_init must be called first
  ndb_init();

  // connect to mysql server and cluster and run application
  {
    // Object representing the cluster
    Ndb_cluster_connection cluster_connection;

    // Connect to cluster management server (ndb_mgmd)
    if (cluster_connection.connect(4 /* retries               */,
                                   5 /* delay between retries */,
                                   1 /* verbose               */))
    {
      cout << "Cluster management server was not ready within 30 secs.\n";
      exit(-1);
    }

    // Optionally connect and wait for the storage nodes (ndbd's)
    if (cluster_connection.wait_until_ready(30,0) < 0)
    {
      cout << "Cluster was not ready within 30 secs.\n";
      exit(-1);
    }

    // connect to mysql server
    MYSQL mysql;
    if ( !mysql_init(&mysql) ) {
      cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
                             3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    // The result of CREATE DATABASE is deliberately not checked; the
    // following USE statement is what verifies that the database is usable.
    mysql_query(&mysql, "CREATE DATABASE test_db_1");
    if (mysql_query(&mysql, "USE test_db_1") != 0) MYSQLERROR(mysql);
    create_table(mysql);

    // Connect to database via NdbApi (handle used by thread 1)
    Ndb myNdb( &cluster_connection, "test_db_1" );
    if (myNdb.init()) APIERROR(myNdb.getNdbError());

    // Second, independent Ndb object (handle used by thread 2)
    Ndb myNdb2( &cluster_connection, "test_db_1" );
    // Report the error of the object that actually failed to initialise.
    if (myNdb2.init()) APIERROR(myNdb2.getNdbError());

    populate(&myNdb);

    pthread_t thread1;
    int retCode = pthread_create(&thread1, NULL, _thread_run, &myNdb);
    if (retCode != 0)
    {
        cout << "Failed to create a thread 1. Error code " << retCode << endl;
        exit(-1);
    }

    pthread_t thread2;
    retCode = pthread_create(&thread2, NULL, _thread_run, &myNdb2);
    if (retCode != 0)
    {
        cout << "Failed to create a thread 2. Error code " << retCode << endl;
        exit(-1);
    }

    // Wait until both threads finish; their return values are not used.
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    cout << "Both threads finished" << endl;

    // Release the MySQL connection opened above.
    mysql_close(&mysql);
  }

  ndb_end(0);

  cout << "\nTo drop created table use:\n"
       << "echo \"drop table mytablename\" | mysql test_db_1 -u root\n";

  return 0;
}

/*
 * Issue a CREATE TABLE IF NOT EXISTS statement for table "mytablename"
 * (columns ATTR1 as primary key and ATTR2, ENGINE=NDB) on the given
 * connection. A failing query is reported through MYSQLERROR, which
 * terminates the process.
 */
static void create_table(MYSQL &mysql)
{
  const char *ddl =
    "CREATE TABLE IF NOT EXISTS"
    "  mytablename"
    "    (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,"
    "     ATTR2 INT UNSIGNED NOT NULL)"
    "  ENGINE=NDB";

  const int rc = mysql_query(&mysql, ddl);
  if (rc != 0)
  {
    MYSQLERROR(mysql);
  }
}

/*
 * Fill table "mytablename" with 1000 rows, ATTR1 = 0..999 and ATTR2 = 0.
 * Every row is written with writeTuple() in a transaction of its own that
 * is committed immediately. Any NDB API failure is reported through
 * APIERROR, which terminates the process.
 */
void populate(Ndb * myNdb)
{
  const NdbDictionary::Dictionary* dict = myNdb->getDictionary();
  const NdbDictionary::Table *table = dict->getTable("mytablename");
  if (!table)
    APIERROR(dict->getNdbError());

  cout << "Populating the table" << endl;

  int key = 0;
  while (key < 1000)
  {
    // One transaction per row
    NdbTransaction *trans = myNdb->startTransaction();
    if (!trans)
      APIERROR(myNdb->getNdbError());

    NdbOperation *writeOp = trans->getNdbOperation(table);
    if (!writeOp)
      APIERROR(trans->getNdbError());

    // Define the row: key column first, then the value column
    writeOp->writeTuple();
    writeOp->equal("ATTR1", key);
    writeOp->setValue("ATTR2", 0);

    const int rc = trans->execute(NdbTransaction::Commit);
    if (rc == -1)
      APIERROR(trans->getNdbError());

    myNdb->closeTransaction(trans);
    ++key;
  }
}

void* _thread_run(void* arg)
{
  Ndb * myNdb = (Ndb*)arg;

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("mytablename");
  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());
  int attr2ColumnId = myTable->getColumn("ATTR2")->getColumnNo();
  const NdbDictionary::Index *myIndex= myDict->getIndex("PRIMARY", "mytablename");
  if (myIndex == NULL) 
    APIERROR(myDict->getNdbError());

  for (int i = 0; i < 100; i++)
  {
    NdbTransaction *myTransaction= myNdb->startTransaction();
    if (myTransaction == NULL)
      APIERROR(myNdb->getNdbError());

    NdbIndexScanOperation *myScanOp = myTransaction->getNdbIndexScanOperation(myIndex);
    if (myScanOp == NULL)
      APIERROR(myTransaction->getNdbError());

    if( myScanOp->readTuples(NdbOperation::LM_Exclusive, NdbScanOperation::SF_OrderBy) == -1)
      APIERROR(myScanOp->getNdbError());

    // Set the filter
    /*
    NdbScanFilter filter(myScanOp);
    if (filter.begin() == -1)
      APIERROR(myScanOp->getNdbError());
    Uint32 zero = 0;
    if (filter.cmp(NdbScanFilter::COND_EQ, attr2ColumnId, &zero, sizeof(zero)) < 0)
      APIERROR(myScanOp->getNdbError());
    if (filter.end() == -1)
      APIERROR(myScanOp->getNdbError());
    */

    NdbRecAttr *    	myRecAttr[2];
    myRecAttr[0] = myScanOp->getValue("ATTR1");
    myRecAttr[1] = myScanOp->getValue("ATTR2");
    if(myRecAttr[0] == NULL || myRecAttr[1] == NULL)
        APIERROR(myScanOp->getNdbError());

    // Start scan
    cout << "Thread: " << pthread_self() << " - starting scan" << endl;
    if(myTransaction->execute(NdbTransaction::NoCommit) != 0)
    {
      NdbError err = myTransaction->getNdbError();
      if(err.status == NdbError::TemporaryError)
      {
        cout << "Thread: " << pthread_self() << " - " << myTransaction->getNdbError().message << endl;
	myNdb->closeTransaction(myTransaction);
	milliSleep(50);
	continue;
      }
      APIERROR(err);
    }

    // Do nextResult only once
    if (myScanOp->nextResult(true) < 0)
      APIERROR(myScanOp->getNdbError());

    // Update only ONE record using another transaction
    NdbTransaction* updateTrans = myNdb->startTransaction();
    if (updateTrans == NULL)
      APIERROR(myNdb->getNdbError());

    NdbOperation *updateOp = myScanOp->updateCurrentTuple(updateTrans);
    if (updateOp == 0)
      APIERROR(myScanOp->getNdbError());

    updateOp->setValue(attr2ColumnId, (Uint32)1);

    // Execute the update transaction
    cout << "Thread: " << pthread_self() << " - updating record " << myRecAttr[0]->u_32_value() << endl;
    if (updateTrans->execute(NdbTransaction::Commit) == -1)
      APIERROR(updateTrans->getNdbError());

    // Close the update transaction
    myNdb->closeTransaction(updateTrans);

    struct timeval startTime;
    gettimeofday(&startTime, NULL);
    cout << "Thread: " << pthread_self() << " - closing the scan" << endl;
    myScanOp->close(true, true);
    struct timeval endTime;
    gettimeofday(&endTime, NULL);
    int period = (endTime.tv_sec - startTime.tv_sec) * 1000 + (endTime.tv_usec - startTime.tv_usec) / 1000;
    cout << "Thread: " << pthread_self() << " - the scan is closed in " << period << " ms" << endl;

    // Close the scan transaction
    
    //cout << "Thread: " << pthread_self() << " - closing scan transaction" << endl;
    myNdb->closeTransaction(myTransaction);
  }
  return 0;
}
