#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
// Used for cout
#include <iostream.h>
#include <stdio.h>

namespace std {} using namespace std;

/**
 * Helper sleep function
 */
static void
milliSleep(int milliseconds){
  struct timeval sleeptime;
  sleeptime.tv_sec = milliseconds / 1000;
  sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
  select(0, 0, 0, 0, &sleeptime);
}


/**
 * Helper sleep function
 */
#define PRINT_ERROR(code,msg) \
  cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << endl
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

struct Car 
{
  /**
   * Note memset, so that entire char-fields are cleared
   *   as all 20 bytes are significant (as type is char)
   */
  Car() { memset(this, 0, sizeof(* this)); }
  
  unsigned int reg_no;
  char brand[20];
  char color[20];
};

/**
 * Function to create table
 */
int create_table(MYSQL &mysql) 
{
  while (mysql_query(&mysql, 
		  "CREATE TABLE"
		  "  garage"
		  "    (REG_NO INT UNSIGNED NOT NULL,"
		  "     BRAND CHAR(20) NOT NULL,"
		  "     COLOR CHAR(20) NOT NULL,"
		  "     PRIMARY KEY (REG_NO))"
		  "  ENGINE=NDB"))
  {
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);
    cout << "MySQL Cluster already has example table: garage. "
	      << "Dropping it..." << endl; 
    /**************
     * Drop table *
     **************/
    if (mysql_query(&mysql, "DROP TABLE garage"))
      MYSQLERROR(mysql);
  }
  return 1;
}

int read_one(Ndb * myNdb)
{
  // Read one un-existing record
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int check;
  NdbError              err;
  NdbTransaction	*myTrans;
  NdbOperation	        *myOp;
  /* Result of reading attribute values, three columns:
     REG_NO, BRAND, and COLOR
   */
  NdbRecAttr *    	myRecAttr[3];   

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("garage");

  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());

  /**
   * Loop as long as :
   *  retryMax not reached
   *  failed operations due to TEMPORARY erros
   *
   * Exit loop;
   *  retyrMax reached
   *  Permanent error (return -1)
   */
  while (true)
  {

    if (retryAttempt >= retryMax)
    {
      cout << "ERROR: has retried this operation " << retryAttempt 
		<< " times, failing!" << endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      const NdbError err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
	milliSleep(50);
	retryAttempt++;
	continue;
      }
      cout << err.message << endl;
      return -1;
    }
    /*
     * Define an operation. 
     * NDBAPI.
     */
    myOp = myTrans->getNdbOperation(myTable);
    if (myOp == NULL) 
    {
      cout << myTrans->getNdbError().message << endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Read without lock, without being placed in lock queue
     */
    if( myOp->readTuple(NdbOperation::LM_CommittedRead) == -1)
    {
      cout << myTrans->getNdbError().message << endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Set read condition
     */
    int column_reg_no = myTable->getColumn("REG_NO")->getColumnNo();
    if (myOp->equal(column_reg_no, 100))
    {
      cout << myOp->getNdbError().message << endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Define storage for fetched attributes.
     * E.g., the resulting attributes of executing
     * myOp->getValue("REG_NO") is placed in myRecAttr[0].
     * No data exists in myRecAttr until transaction has commited!
     */
    myRecAttr[0] = myOp->getValue("REG_NO");
    myRecAttr[1] = myOp->getValue("BRAND");
    myRecAttr[2] = myOp->getValue("COLOR");
    if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) 
    {
	cout << myTrans->getNdbError().message << endl;
	myNdb->closeTransaction(myTrans);
	return -1;
    }

    /**
     * Start reading
     */     
    if(myTrans->execute(NdbTransaction::NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
	cout << myTrans->getNdbError().message << endl;
	myNdb->closeTransaction(myTrans);
	milliSleep(50);
	continue;
      }
      cout << err.code << endl;
      cout << myTrans->getNdbError().code << endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    cout << "myTrans->execute returned 0" << endl;
    
    /**
     * print  REG_NO unsigned int
     */
    cout << "REG_NO=" << myRecAttr[0]->u_32_value() << endl;

    /**
     * print  BRAND character string
     */
    cout << "BRAND=" << myRecAttr[1]->aRef() << endl;

    /**
     * print  COLOR character string
     */
    cout << "COLOR=" << myRecAttr[2]->aRef() << endl;

    myNdb->closeTransaction(myTrans);
    return 1;
  }
  return -1;
}


int main()
{
  ndb_init();
  MYSQL mysql;

  /**************************************************************
   * Connect to mysql server and create table                   *
   **************************************************************/
  {
    if ( !mysql_init(&mysql) ) {
      cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
			     3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    mysql_query(&mysql, "CREATE DATABASE test_db");
    if (mysql_query(&mysql, "USE test_db") != 0) MYSQLERROR(mysql);

    create_table(mysql);
  }

  /**************************************************************
   * Connect to ndb cluster                                     *
   **************************************************************/

  Ndb_cluster_connection cluster_connection;
  if (cluster_connection.connect(4, 5, 1))
  {
    cout << "Unable to connect to cluster within 30 secs." << endl;
    exit(-1);
  }
  // Optionally connect and wait for the storage nodes (ndbd's)
  if (cluster_connection.wait_until_ready(30,0) < 0)
  {
    cout << "Cluster was not ready within 30 secs.\n";
    exit(-1);
  }

  Ndb myNdb(&cluster_connection,"test_db");
  if (myNdb.init(1024) == -1) {      // Set max 1024  parallel transactions
    APIERROR(myNdb.getNdbError());
    exit(-1);
  }

  /*******************************************
   * Check table definition                  *
   *******************************************/
  const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
  const NdbDictionary::Table *t= myDict->getTable("garage");

  Car car;
  if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
      t->getColumn("BRAND")->getLength() != sizeof(car.brand))
  {
    cout << "Wrong table definition" << endl;
    exit(-1);
  }

  if(read_one(&myNdb) > 0)
    cout << "read_one: Success!" << endl  << endl;
  
  return 0;
}
