#include #include #include // Used for cout #include #include namespace std {} using namespace std; /** * Helper sleep function */ static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } /** * Helper sleep function */ #define PRINT_ERROR(code,msg) \ cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } struct Car { /** * Note memset, so that entire char-fields are cleared * as all 20 bytes are significant (as type is char) */ Car() { memset(this, 0, sizeof(* this)); } unsigned int reg_no; char brand[20]; char color[20]; }; /** * Function to create table */ int create_table(MYSQL &mysql) { while (mysql_query(&mysql, "CREATE TABLE" " garage" " (REG_NO INT UNSIGNED NOT NULL," " BRAND CHAR(20) NOT NULL," " COLOR CHAR(20) NOT NULL," " PRIMARY KEY (REG_NO))" " ENGINE=NDB")) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); cout << "MySQL Cluster already has example table: garage. " << "Dropping it..." << endl; /************** * Drop table * **************/ if (mysql_query(&mysql, "DROP TABLE garage")) MYSQLERROR(mysql); } return 1; } int read_one(Ndb * myNdb) { // Read one un-existing record int retryAttempt = 0; const int retryMax = 10; int check; NdbError err; NdbTransaction *myTrans; NdbOperation *myOp; /* Result of reading attribute values, three columns: REG_NO, BRAND, and COLOR */ NdbRecAttr * myRecAttr[3]; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("garage"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Loop as long as : * retryMax not reached * failed operations due to TEMPORARY erros * * Exit loop; * retyrMax reached * Permanent error (return -1) */ while (true) { if (retryAttempt >= retryMax) { cout << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return -1; } myTrans = myNdb->startTransaction(); if (myTrans == NULL) { const NdbError err = myNdb->getNdbError(); if (err.status == NdbError::TemporaryError) { milliSleep(50); retryAttempt++; continue; } cout << err.message << endl; return -1; } /* * Define an operation. * NDBAPI. */ myOp = myTrans->getNdbOperation(myTable); if (myOp == NULL) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Read without lock, without being placed in lock queue */ if( myOp->readTuple(NdbOperation::LM_CommittedRead) == -1) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Set read condition */ int column_reg_no = myTable->getColumn("REG_NO")->getColumnNo(); if (myOp->equal(column_reg_no, 100)) { cout << myOp->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Define storage for fetched attributes. * E.g., the resulting attributes of executing * myOp->getValue("REG_NO") is placed in myRecAttr[0]. * No data exists in myRecAttr until transaction has commited! */ myRecAttr[0] = myOp->getValue("REG_NO"); myRecAttr[1] = myOp->getValue("BRAND"); myRecAttr[2] = myOp->getValue("COLOR"); if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Start reading */ if(myTrans->execute(NdbTransaction::NoCommit) != 0){ err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } cout << err.code << endl; cout << myTrans->getNdbError().code << endl; myNdb->closeTransaction(myTrans); return -1; } cout << "myTrans->execute returned 0" << endl; /** * print REG_NO unsigned int */ cout << "REG_NO=" << myRecAttr[0]->u_32_value() << endl; /** * print BRAND character string */ cout << "BRAND=" << myRecAttr[1]->aRef() << endl; /** * print COLOR character string */ cout << "COLOR=" << myRecAttr[2]->aRef() << endl; myNdb->closeTransaction(myTrans); return 1; } return -1; } int main() { ndb_init(); MYSQL mysql; /************************************************************** * Connect to mysql server and create table * **************************************************************/ { if ( !mysql_init(&mysql) ) { cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE test_db"); if (mysql_query(&mysql, "USE test_db") != 0) MYSQLERROR(mysql); create_table(mysql); } /************************************************************** * Connect to ndb cluster * **************************************************************/ Ndb_cluster_connection cluster_connection; if (cluster_connection.connect(4, 5, 1)) { cout << "Unable to connect to cluster within 30 secs." << endl; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb myNdb(&cluster_connection,"test_db"); if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb.getNdbError()); exit(-1); } /******************************************* * Check table definition * *******************************************/ const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *t= myDict->getTable("garage"); Car car; if (t->getColumn("COLOR")->getLength() != sizeof(car.color) || t->getColumn("BRAND")->getLength() != sizeof(car.brand)) { cout << "Wrong table definition" << endl; exit(-1); } if(read_one(&myNdb) > 0) cout << "read_one: Success!" << endl << endl; return 0; }