#include #include // Used for cout #include #include #include namespace std {} using namespace std; static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } #define PRINT_ERROR(code,msg) \ cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } static void create_table(MYSQL &mysql); static void populate(Ndb * myNdb); static void* _thread_run(void* arg); int main() { // ndb_init must be called first ndb_init(); // connect to mysql server and cluster and run application { // Object representing the cluster Ndb_cluster_connection cluster_connection; // Connect to cluster management server (ndb_mgmd) if (cluster_connection.connect(4 /* retries */, 5 /* delay between retries */, 1 /* verbose */)) { cout << "Cluster management server was not ready within 30 secs.\n"; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } // connect to mysql server MYSQL mysql; if ( !mysql_init(&mysql) ) { cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE test_db_1"); if (mysql_query(&mysql, "USE test_db_1") != 0) MYSQLERROR(mysql); create_table(mysql); // Connect to database via NdbApi Ndb myNdb( &cluster_connection, "test_db_1" ); if (myNdb.init()) APIERROR(myNdb.getNdbError()); // Connect to database via NdbApi Ndb myNdb2( &cluster_connection, "test_db_1" ); if (myNdb2.init()) APIERROR(myNdb.getNdbError()); populate(&myNdb); pthread_t thread1; int retCode = pthread_create(&thread1, NULL, _thread_run, &myNdb); if (retCode != 0) { cout << "Failed to create a thread 1. Error code " << retCode << endl; exit(-1); } pthread_t thread2; retCode = pthread_create(&thread2, NULL, _thread_run, &myNdb2); if (retCode != 0) { cout << "Failed to create a thread 2. Error code " << retCode << endl; exit(-1); } // Wait until both threads finish void* value_ptr; pthread_join(thread1, &value_ptr); pthread_join(thread2, &value_ptr); cout << "Both threads finished" << endl; } ndb_end(0); cout << "\nTo drop created table use:\n" << "echo \"drop table mytablename\" | mysql test_db_1 -u root\n"; return 0; } /********************************************************* * Create a table named mytablename if it does not exist * *********************************************************/ static void create_table(MYSQL &mysql) { if (mysql_query(&mysql, "CREATE TABLE IF NOT EXISTS" " mytablename" " (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY," " ATTR2 INT UNSIGNED NOT NULL)" " ENGINE=NDB")) MYSQLERROR(mysql); } void populate(Ndb * myNdb) { const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("mytablename"); if (myTable == NULL) APIERROR(myDict->getNdbError()); // Populate table cout << "Populating the table" << endl; for (int i = 0; i < 1000; i++) { NdbTransaction *myTransaction= myNdb->startTransaction(); if (myTransaction == NULL) APIERROR(myNdb->getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->writeTuple(); myOperation->equal("ATTR1", i); myOperation->setValue("ATTR2", 0); if (myTransaction->execute(NdbTransaction::Commit) == -1) APIERROR(myTransaction->getNdbError()); myNdb->closeTransaction(myTransaction); } } void* _thread_run(void* arg) { Ndb * myNdb = (Ndb*)arg; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("mytablename"); if (myTable == NULL) APIERROR(myDict->getNdbError()); int attr2ColumnId = myTable->getColumn("ATTR2")->getColumnNo(); const NdbDictionary::Index *myIndex= myDict->getIndex("PRIMARY", "mytablename"); if (myIndex == NULL) APIERROR(myDict->getNdbError()); for (int i = 0; i < 100; i++) { NdbTransaction *myTransaction= myNdb->startTransaction(); if (myTransaction == NULL) APIERROR(myNdb->getNdbError()); NdbIndexScanOperation *myScanOp = myTransaction->getNdbIndexScanOperation(myIndex); if (myScanOp == NULL) APIERROR(myTransaction->getNdbError()); if( myScanOp->readTuples(NdbOperation::LM_Exclusive, NdbScanOperation::SF_OrderBy) == -1) APIERROR(myScanOp->getNdbError()); // Set the filter /* NdbScanFilter filter(myScanOp); if (filter.begin() == -1) APIERROR(myScanOp->getNdbError()); Uint32 zero = 0; if (filter.cmp(NdbScanFilter::COND_EQ, attr2ColumnId, &zero, sizeof(zero)) < 0) APIERROR(myScanOp->getNdbError()); if (filter.end() == -1) APIERROR(myScanOp->getNdbError()); */ NdbRecAttr * myRecAttr[2]; myRecAttr[0] = myScanOp->getValue("ATTR1"); myRecAttr[1] = myScanOp->getValue("ATTR2"); if(myRecAttr[0] == NULL || myRecAttr[1] == NULL) APIERROR(myScanOp->getNdbError()); // Start scan if(myTransaction->execute(NdbTransaction::NoCommit) != 0) { NdbError err = myTransaction->getNdbError(); if(err.status == NdbError::TemporaryError) { cout << "Thread: " << pthread_self() << " - " << myTransaction->getNdbError().message << endl; myNdb->closeTransaction(myTransaction); milliSleep(50); continue; } APIERROR(err); } // Process the whole result batch if (myScanOp->nextResult(true) < 0) APIERROR(myScanOp->getNdbError()); // Update all retrieved records using another transaction NdbTransaction* updateTrans = myNdb->startTransaction(); if (updateTrans == NULL) APIERROR(myNdb->getNdbError()); Uint32 firstRecord = 1000; Uint32 lastRecord = 0; do { if (firstRecord > myRecAttr[0]->u_32_value()) firstRecord = myRecAttr[0]->u_32_value(); if (lastRecord < myRecAttr[0]->u_32_value()) lastRecord = myRecAttr[0]->u_32_value(); NdbOperation *updateOp = myScanOp->updateCurrentTuple(updateTrans); if (updateOp == 0) APIERROR(myScanOp->getNdbError()); updateOp->setValue(attr2ColumnId, (Uint32)(myRecAttr[1]->u_32_value() + 1)); } while (myScanOp->nextResult(false) == 0); // Execute the update transaction cout << "Thread: " << pthread_self() << " - updating records " << firstRecord << " through " << lastRecord << endl; if (updateTrans->execute(NdbTransaction::Commit) == -1) APIERROR(updateTrans->getNdbError()); // Close the update transaction myNdb->closeTransaction(updateTrans); myScanOp->close(true, true); // Close the scan transaction myNdb->closeTransaction(myTransaction); } return 0; }