#include #include #include // Used for cout #include #include namespace std {} using namespace std; /** * Helper sleep function */ static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } /** * Helper sleep function */ #define PRINT_ERROR(code,msg) \ cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } struct Car { /** * Note memset, so that entire char-fields are cleared * as all 20 bytes are significant (as type is char) */ Car() { memset(this, 0, sizeof(* this)); } unsigned int reg_no; char brand[20]; char color[20]; }; /** * Function to create table */ int create_table(MYSQL &mysql) { while (mysql_query(&mysql, "CREATE TABLE" " garage" " (REG_NO INT UNSIGNED NOT NULL," " BRAND CHAR(20) NOT NULL," " COLOR CHAR(20) NOT NULL," " PRIMARY KEY (REG_NO))" " ENGINE=NDB")) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); cout << "MySQL Cluster already has example table: garage. " << "Dropping it..." << endl; /************** * Drop table * **************/ if (mysql_query(&mysql, "DROP TABLE garage")) MYSQLERROR(mysql); } return 1; } int populate(Ndb * myNdb) { int i; Car cars[150]; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("garage"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Fifty blue mercedes */ for (i = 0; i < 50; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "Mercedes"); sprintf(cars[i].color, "Blue"); } /** * Fifty black bmw */ for (i = 50; i < 100; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "BMW"); sprintf(cars[i].color, "Black"); } /** * Fifty pink toyotas */ for (i = 100; i < 150; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "Toyota"); sprintf(cars[i].color, "Pink"); } for (i = 0; i < 150; i++) { NdbTransaction* myTrans = myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); myNdbOperation->insertTuple(); myNdbOperation->equal("REG_NO", cars[i].reg_no); myNdbOperation->setValue("BRAND", cars[i].brand); myNdbOperation->setValue("COLOR", cars[i].color); int check = myTrans->execute(NdbTransaction::Commit); myTrans->close(); if (check == -1) return false; } } int scan_print(Ndb * myNdb) { // Scan all records exclusive and update // them one by one int retryAttempt = 0; const int retryMax = 10; int fetchedRows = 0; int check; NdbError err; NdbTransaction *myTrans; NdbIndexScanOperation *myScanOp; /* Result of reading attribute value, three columns: REG_NO, BRAND, and COLOR */ NdbRecAttr * myRecAttr[3]; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("garage"); if (myTable == NULL) APIERROR(myDict->getNdbError()); const NdbDictionary::Index *myIndex = myDict->getIndex("PRIMARY", "garage"); if (myIndex == NULL) APIERROR(myDict->getNdbError()); /** * Loop as long as : * retryMax not reached * failed operations due to TEMPORARY erros * * Exit loop; * retyrMax reached * Permanent error (return -1) */ while (true) { if (retryAttempt >= retryMax) { cout << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; return -1; } myTrans = myNdb->startTransaction(); if (myTrans == NULL) { const NdbError err = myNdb->getNdbError(); if (err.status == NdbError::TemporaryError) { milliSleep(50); retryAttempt++; continue; } cout << err.message << endl; return -1; } /* * Define a scan operation. * NDBAPI. */ myScanOp = myTrans->getNdbIndexScanOperation(myIndex); if (myScanOp == NULL) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Read without locks, without being placed in lock queue */ if( myScanOp->readTuples(NdbOperation::LM_Read, 1, 0, true) == -1) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Define storage for fetched attributes. * E.g., the resulting attributes of executing * myOp->getValue("REG_NO") is placed in myRecAttr[0]. * No data exists in myRecAttr until transaction has commited! */ myRecAttr[0] = myScanOp->getValue("REG_NO"); myRecAttr[1] = myScanOp->getValue("BRAND"); myRecAttr[2] = myScanOp->getValue("COLOR"); if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) { cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); return -1; } /** * Start scan (NoCommit since we are only reading at this stage); */ if(myTrans->execute(NdbTransaction::NoCommit) != 0){ err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ cout << myTrans->getNdbError().message << endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } cout << err.code << endl; cout << myTrans->getNdbError().code << endl; myNdb->closeTransaction(myTrans); return -1; } /** * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ while((check = myScanOp->nextResult(true)) == 0){ int realBatchSize = 0; do { fetchedRows++; realBatchSize++; /** * print REG_NO unsigned int */ //cout << myRecAttr[0]->u_32_value() << "\t"; /** * print BRAND character string */ //cout << myRecAttr[1]->aRef() << "\t"; /** * print COLOR character string */ //cout << myRecAttr[2]->aRef() << endl; /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ } while((check = myScanOp->nextResult(false)) == 0); cout << "Real Batch Size is " << realBatchSize << endl; } myNdb->closeTransaction(myTrans); return 1; } return -1; } int main() { ndb_init(); MYSQL mysql; /************************************************************** * Connect to mysql server and create table * **************************************************************/ { if ( !mysql_init(&mysql) ) { cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE test_db"); if (mysql_query(&mysql, "USE test_db") != 0) MYSQLERROR(mysql); create_table(mysql); } /************************************************************** * Connect to ndb cluster * **************************************************************/ Ndb_cluster_connection cluster_connection; if (cluster_connection.connect(4, 5, 1)) { cout << "Unable to connect to cluster within 30 secs." << endl; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb myNdb(&cluster_connection,"test_db"); if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb.getNdbError()); exit(-1); } /******************************************* * Check table definition * *******************************************/ int column_color; { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *t= myDict->getTable("garage"); Car car; if (t->getColumn("COLOR")->getLength() != sizeof(car.color) || t->getColumn("BRAND")->getLength() != sizeof(car.brand)) { cout << "Wrong table definition" << endl; exit(-1); } column_color= t->getColumn("COLOR")->getColumnNo(); } if(populate(&myNdb) > 0) cout << "populate: Success!" << endl; if(scan_print(&myNdb) > 0) cout << "scan_print: Success!" << endl << endl; return 0; }