#include #include #include #include #include #include #include #include #include #include #include "ndbapi/NdbApi.hpp" #define PRINT_ERROR(code,msg) \ std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << std::endl #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(1); } Ndb_cluster_connection* cluster_connection; std::mutex coutMutex; struct Session { long long id; unsigned int creationDate; unsigned int dateTo; char reservations[13840]; }; std::thread::id firstThreadId; void ReadSession(NdbTransaction* mainTransaction, long long id, Session& session) { NdbTransaction* transaction = nullptr; if (std::this_thread::get_id()==firstThreadId) { transaction = mainTransaction; } else { transaction = mainTransaction->getNdb()->startTransaction(); if (transaction == nullptr) APIERROR(mainTransaction->getNdb()->getNdbError()); } NdbOperation* myOperation = transaction->getNdbOperation("SE_Entities"); if (myOperation == NULL) APIERROR(transaction->getNdbError()); if (myOperation->readTuple(NdbOperation::LM_Read)==-1) APIERROR(myOperation->getNdbError()); if (myOperation->equal("id", id) == -1) APIERROR(myOperation->getNdbError()); NdbRecAttr* nextAttr = myOperation->getValue("creationDate", (char*)&session.creationDate); if (nextAttr == NULL) APIERROR(myOperation->getNdbError()); nextAttr = myOperation->getValue("dateTo", (char*)&session.dateTo); if (nextAttr == NULL) APIERROR(myOperation->getNdbError()); nextAttr = myOperation->getValue("reservations", session.reservations); if (nextAttr == NULL) APIERROR(myOperation->getNdbError()); if (transaction->execute(NdbTransaction::NoCommit) == -1) APIERROR(transaction->getNdbError()); session.id = id; if (transaction != mainTransaction) { transaction->close(); } } void Test(const char* schemaName) { std::unique_ptr myNdb(new Ndb(cluster_connection, schemaName)); if (myNdb->init() == -1) { APIERROR(myNdb->getNdbError()); exit(1); } NdbDictionary::Dictionary* myDict = myNdb->getDictionary(); const NdbDictionary::Table *myTable = myDict->getTable("SE_Entities"); if (myTable == nullptr) APIERROR(myDict->getNdbError()); myTable = myDict->getTable("SE_ExpiredEntities"); if (myTable == nullptr) APIERROR(myDict->getNdbError()); auto t_start = std::chrono::high_resolution_clock::now(); auto succReads = 0U, totalReads = 0U; std::chrono::duration < double > execDuration; NdbTransaction *myTransaction = myNdb->startTransaction(); if (myTransaction == nullptr) APIERROR(myNdb->getNdbError()); totalReads++; NdbScanOperation* myOperation = myTransaction->getNdbIndexScanOperation("PRIMARY", "SE_ExpiredEntities"); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); if (myOperation->readTuples(NdbOperation::LM_CommittedRead) == -1) APIERROR(myTransaction->getNdbError()); long long sessionId; NdbRecAttr* nextAttr = myOperation->getValue("sessionId", (char*)&sessionId); if (nextAttr == NULL) APIERROR(myOperation->getNdbError()); if (myTransaction->execute(NdbTransaction::NoCommit) == -1) APIERROR(myTransaction->getNdbError()); if (myTransaction->getNdbError().classification == NdbError::NoDataFound) { myNdb->closeTransaction(myTransaction); return; } std::vector sessions; while (myOperation->nextResult(true)==0) { succReads++; /*sessions.emplace_back(); ReadSession(myTransaction, sessionId, sessions.back());*/ Session tmpSesssion; ReadSession(myTransaction,sessionId,tmpSesssion); std::cout << tmpSesssion.id << "\t" << tmpSesssion.dateTo << std::endl; } myNdb->closeTransaction(myTransaction); auto duration = std::chrono::duration(std::chrono::high_resolution_clock::now() - t_start).count(); std::lock_guard guard(coutMutex); std::cout << "Finished test in " << duration << " sec." << "Total " << totalReads << " iterations, " << succReads << " successful reads. It's " << totalReads / duration << " iterations per sec. NdbTransaction::exec method duration is " << execDuration.count() << std::endl; for (auto& session : sessions) { std::cout << session.id << "\t" << session.dateTo << std::endl; } } int main(int argc, char* argv[]) { if (argc < 4) { std::cout << "Usage: ndb_ty " << std::endl; return 0; } srand(time(nullptr)); ndb_init(); cluster_connection = new Ndb_cluster_connection(argv[1]); if (cluster_connection->connect(5, 3, 1)) { std::cout << "Connect to cluster management server failed.\n"; exit(1); } if (cluster_connection->wait_until_ready(30, 30)) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(1); } Ndb* myNdb = new Ndb(cluster_connection, argv[2]); if (myNdb->init() == -1) { APIERROR(myNdb->getNdbError()); exit(1); } std::cout << "Connected to " << argv[1] << ", schema " << argv[2] << std::endl; std::vector threads; std::cout << "Starting test" << std::endl; for (int i = 0; i < atoi(argv[3]); i++) { threads.emplace_back([&]() { Test(argv[2]); }); if (i == 0) { firstThreadId = threads.back().get_id(); } } for (auto& thread : threads) { thread.join(); } ndb_end(0); return 0; }