#include #include #include #include #include #include #include #include #include #include #include #include #ifdef WIN32 # define WIN32_LEAN_AND_MEAN # include # define noexcept #else # include # include # include #endif #define CATCH(x, line) catch(const std::exception& e) \ { \ std::cout << (x) << ":" << (line) << ": " << e.what() << std::endl; \ } \ catch(...) \ { \ std::cout << (x) << ":" << (line) << ": Unhandled exception catched." << std::endl; \ } #define ASSERT_NOT_NULL(x) if ((x) == nullptr) throw benchExcept(myNdb->getNdbError().message, (__LINE__)) #define THROW_NDB_ERROR(x) throw benchExcept((x)->getNdbError().message, (__LINE__)); class benchExcept : public std::exception { public: benchExcept(const std::string& m, int l = -1) noexcept : message(m), line(l) { } benchExcept(const char* m, int l = -1) noexcept : message(m), line(l) { } benchExcept(const benchExcept& e, int l = -1) noexcept : message(e.message), line(l) { } benchExcept& operator=(const benchExcept& e) noexcept { if(this != &e) { message = e.message; } } virtual ~benchExcept(){} virtual const char* what() const noexcept { if (line != -1) { return (std::string("line: ") + std::to_string(line) + ": " + message).c_str(); } return message.c_str(); } private: std::string message; int line; }; unsigned char date[4] = { 0x58, 0xdd, 0x12, 0xba }; unsigned char blob[34] = { 34, 0, 0xDE, 0xAD, 0xC0, 0xDE, 0xDE, 0xAD, 0xC0, 0xDE, 0xDE, 0xAD, 0xC0, 0xDE, 0xDE, 0xAD, 0xC0, 0xDE }; unsigned char blob1[34] = { 34, 0, 0xBA, 0xD, 0xC0, 0xDE, 0xBA, 0xD, 0xC0, 0xDE, 0xBA, 0xD, 0xC0, 0xDE, 0xBA, 0xD, 0xC0, 0xDE }; unsigned long long ReserveBalance(unsigned long long id, double& count, Ndb* myNdb, const NdbDictionary::Table* SE_Entities, const NdbDictionary::Table* ST_Locks, const NdbDictionary::Table* BL_TmpValues, const NdbDictionary::Table* BL_Values, const NdbDictionary::Table* BL_PeriodicAttrs, const NdbDictionary::Table* BL_TDs, const unsigned long long* ids) { int force = 0; NdbOperation* myOperation = nullptr; NdbTransaction* myTransaction = nullptr; NdbRecAttr* myRecAttr = nullptr; bool isCommit = (id != 0); auto lock_id = 0ULL; myTransaction = myNdb->startTransaction(); ASSERT_NOT_NULL(myTransaction); // ReserveBalance executed(NoCommit) operations: // Insert(PrimaryKeyAccess) operation on ST_Locks table if (myNdb->getAutoIncrementValue("ST_Locks", lock_id, 1000) == -1) THROW_NDB_ERROR(myNdb); myOperation = myTransaction->getNdbOperation(ST_Locks); ASSERT_NOT_NULL(myOperation); if (myOperation->insertTuple() == -1) THROW_NDB_ERROR(myOperation); if (myOperation->equal("id", lock_id) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->setValue("executeDate", (const char*)&date[0], 4) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->setValue("clientRequestId", lock_id) == -1) THROW_NDB_ERROR(myOperation); if (myTransaction->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); // NoCommit - Insert(PrimaryKeyAccess) operation on SE_Entities table // FullCommit - Read(PrimaryKeyAccess) operation on SE_Entities table with LM_Exclusive(Read with exclusive lock) lock mode if (!isCommit) { if (myNdb->getAutoIncrementValue("SE_Entities", id, 1000) == -1) THROW_NDB_ERROR(myNdb); } myOperation = myTransaction->getNdbOperation(SE_Entities); ASSERT_NOT_NULL(myOperation); if (!isCommit) { if (myOperation->insertTuple() == -1) THROW_NDB_ERROR(myOperation); if (myOperation->equal("id", id) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->setValue("creationDate", (const char*)&date[0], 4) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->setValue("reservations", (const char*)&blob[0], 34) == -1) THROW_NDB_ERROR(myOperation); } else { if (myOperation->readTuple(NdbOperation::LM_Exclusive) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->equal("id", id) == -1) THROW_NDB_ERROR(myOperation); myRecAttr = myOperation->getValue("creationDate", NULL); ASSERT_NOT_NULL(myRecAttr); count += myRecAttr->int64_value(); } if (myTransaction->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); // ReserveBalance executed(NoCommit) operations: if (myTransaction->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); if (isCommit) { // ReserveBalance executed(NoCommit) operations: if (myTransaction->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); } myOperation = myTransaction->getNdbOperation(SE_Entities); ASSERT_NOT_NULL(myOperation); if (!isCommit) { if (myOperation->updateTuple() == -1) THROW_NDB_ERROR(myOperation); if (myOperation->equal("id", id) == -1) THROW_NDB_ERROR(myOperation); if (myOperation->setValue("reservations", (const char*)&blob1[0], 34) == -1) THROW_NDB_ERROR(myOperation); } else { if (myOperation->deleteTuple() == -1) THROW_NDB_ERROR(myOperation); if (myOperation->equal("id", id) == -1) THROW_NDB_ERROR(myOperation); } if (myTransaction->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); if (myTransaction->execute(NdbTransaction::Commit, NdbOperation::AbortOnError, force) == -1) THROW_NDB_ERROR(myTransaction); myNdb->closeTransaction(myTransaction); return id; } void execute(int iters, Ndb* myNdb) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); double count = 0.0; const NdbDictionary::Table* SE_Entities = myNdb->getDictionary()->getTable("SE_Entities"); const NdbDictionary::Table* ST_Locks = myNdb->getDictionary()->getTable("ST_Locks"); const NdbDictionary::Table* BL_TmpValues = myNdb->getDictionary()->getTable("BL_TmpValues"); const NdbDictionary::Table* BL_Values = myNdb->getDictionary()->getTable("BL_Values"); const NdbDictionary::Table* BL_PeriodicAttrs = myNdb->getDictionary()->getTable("BL_PeriodicAttrs"); const NdbDictionary::Table* BL_TDs = myNdb->getDictionary()->getTable("BL_TDs"); try { for (int i = 0; i < iters; ++i) { auto id1 = std::rand() % 200 + 4ULL; auto id2 = std::rand() % 200 + 4ULL; auto id3 = std::rand() % 200 + 4ULL; if (id1 == 23 || id1 == 64 || id1 == 65 || id1 == 106 || id1 == 157 || id1 == 168 || id1 == 179) { ++++id1; } if (id2 == 23 || id2 == 64 || id2 == 65 || id2 == 106 || id2 == 157 || id2 == 168 || id2 == 179) { ++++id2; } if (id3 == 23 || id3 == 64 || id3 == 65 || id3 == 106 || id3 == 157 || id3 == 168 || id3 == 179) { ++++id3; } unsigned long long ids[3] = { id1, id2, id3 }; auto id = ReserveBalance(0ULL, count, myNdb, SE_Entities, ST_Locks, BL_TmpValues, BL_Values, BL_PeriodicAttrs, BL_TDs, ids); ReserveBalance(id, count, myNdb, SE_Entities, ST_Locks, BL_TmpValues, BL_Values, BL_PeriodicAttrs, BL_TDs, ids); } fprintf(stderr, "count: %f\n", count); } CATCH("execute", __LINE__) delete myNdb; } class core { public: void LetsGo() { try { std::srand(std::time(0)); for (auto i = 0; i < args.ThreadsCount; ++i) { auto thread = new std::thread([=]{execute(args.IterationsCount, GetNdb());}); SetThreadAffinity(i, thread->native_handle()); threads.push_back(thread); } for (auto t : threads) { t->join(); } CleanTreads(); } CATCH("core::LetsGo", __LINE__) } public: core() : connection(nullptr) { ndb_init(); } ~core() { try { delete connection; } CATCH("core::~core:", __LINE__) try { ndb_end(0); } CATCH("core::~core:", __LINE__) } bool Prepare(int argc, char** argv) { if (ParseArgs(argc, argv)) { try { connection = new Ndb_cluster_connection(args.ConnectString.c_str()); connection->set_name("bs_bench"); if (args.ConnectionCpu != -1) { if (connection->set_recv_thread_cpu((unsigned short*)&args.ConnectionCpu, 1) == -1) { throw benchExcept(std::string("Error setting NDB receiver thread affinity: " + std::string(connection->get_latest_error_msg())).c_str()); } } if (connection->connect(5,3,1)) { std::cout << "Connect to cluster management server failed." << std::endl; throw benchExcept(std::string("Connect to cluster management server failed." + std::string(connection->get_latest_error_msg())).c_str()); } if (connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; throw benchExcept(std::string("Cluster was not ready within 30 secs." + std::string(connection->get_latest_error_msg())).c_str()); } return true; } CATCH("core::Prepare", __LINE__) } return false; } private: Ndb* GetNdb() const { Ndb* myNdb = new Ndb(connection, args.Catalog.c_str()); if (myNdb->init(1024) == -1) THROW_NDB_ERROR(myNdb); return myNdb; } void CleanTreads() { try { for (auto t : threads) { delete t; } threads.clear(); } CATCH("core::CleanTreads", __LINE__) } void SetThreadAffinity(int i, std::thread::native_handle_type native_handle) const { if (args.ThreadsCpuList.empty()) return; unsigned long long cpuMask = 1ULL << args.ThreadsCpuList[i % args.ThreadsCpuList.size()]; if (cpuMask == 0) return; bool err = false; #ifdef WIN32 DWORD_PTR mask = cpuMask; err = ::SetThreadAffinityMask(native_handle, mask) == 0; #else cpu_set_t cpuset; CPU_ZERO(&cpuset); int core_id = 0; while (cpuMask != 0) { if ((cpuMask&(1ULL << core_id)) != 0) { int num_cores = sysconf(_SC_NPROCESSORS_ONLN); if (core_id >= num_cores) // core_id = 0, 1, ... n-1 if system has n cores { throw(std::runtime_error("Error setting thread affinity. CPU number is greater than number of CPUs.")); } cpuMask &= !(1ULL << core_id); CPU_SET(core_id, &cpuset); } ++core_id; } err = pthread_setaffinity_np(native_handle, sizeof(cpu_set_t), &cpuset) != 0; #endif if (err) { throw(std::runtime_error("Error setting thread affinity")); } } void csv2vec(const std::string& csv, std::vector& vec) { static const std::string comma(","); auto start = 0U; auto end = csv.find(comma); while (end != std::string::npos) { vec.push_back(atoi(csv.substr(start, end - start).c_str())); start = end + comma.length(); end = csv.find(comma, start); } vec.push_back(atoi(csv.substr(start, end).c_str())); /* GCC 4.8.1 not implemented regex :( static const std::regex rx(","); for(std::sregex_token_iterator first(std::begin(csv), std::end(csv), rx, -1), last; first != last; ++first) { vec.push_back(atoi(first->str().c_str())); } */ } bool ParseArgs(int argc, char** argv) { if (argc < 5) { PrintUsage(); return false; } args.ConnectString = argv[1]; args.Catalog = argv[2]; args.IterationsCount = atoi(argv[3]); args.ThreadsCount = atoi(argv[4]); if (argc > 5) { args.ConnectionCpu = atoi(argv[5]); } else { args.ConnectionCpu = -1; } if (argc > 6) { csv2vec(argv[6], args.ThreadsCpuList); } args.ForceSend = argc > 7 ? atoi(argv[7]) : 0; args.ForceSend = args.ForceSend < 0 || args.ForceSend > 2 ? 0 : args.ForceSend; return true; } void PrintUsage() const { std::cout << "USAGE" << std::endl; std::cout << " bs_bench [ndb_cpu] [threads_cpus] [force_send]" << std::endl << std::endl; std::cout << "ARGUMENTS" << std::endl; std::cout << " MySQL Cluster management node connect string (e.g. 127.0.0.1:1186)" << std::endl; std::cout << " MySQL Cluster catalog name" << std::endl; std::cout << " count of measured requests" << std::endl; std::cout << " count of parallel running threads (each thread performs requests)" << std::endl; std::cout << " [ndb_cpu] csv-list of cpu numbers (e.g. 0,1,2), if set then affinity for ndb client recv threads will be set accordingly" << std::endl; std::cout << " [threads_cpus] csv-list of cpu numbers (e.g. 0,1,2), if set then affinity for test threads will be set accordingly" << std::endl; std::cout << " [force_send] force_send parameter value for execute transaction method (0-2), default 0" << std::endl; std::cout << "EXAMPLE" << std::endl; std::cout << " bs_bench 127.0.0.1:1186 BS 1000 6 1 0,1 2,3,4,5,6,7 2" << std::endl; } public: struct { int IterationsCount; int ThreadsCount; int ForceSend; std::string ConnectString; std::string Catalog; unsigned short ConnectionCpu; std::vector ThreadsCpuList; } args; private: Ndb_cluster_connection* connection; std::vector threads; }; int main(int argc, char** argv) { core c; if(c.Prepare(argc, argv)) { c.LetsGo(); } return 0; }