diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/configure.in clean_dolphin_rt/configure.in --- mysql-5.1.12/configure.in 2006-10-16 15:33:28 -04:00 +++ clean_dolphin_rt/configure.in 2006-11-11 15:34:51 -05:00 @@ -1838,6 +1838,8 @@ pthread_attr_getstacksize pthread_attr_setprio pthread_attr_setschedparam \ pthread_attr_setstacksize pthread_condattr_create pthread_getsequence_np \ pthread_key_delete pthread_rwlock_rdlock pthread_setprio \ + pthread_self sched_get_priority_min sched_get_priority_max \ + sched_setaffinity processor_bind sched_setscheduler \ pthread_setprio_np pthread_setschedparam pthread_sigmask readlink \ realpath rename rint rwlock_init setupterm \ shmget shmat shmdt shmctl sigaction sigemptyset sigaddset \ @@ -1859,6 +1861,47 @@ *) AC_CHECK_FUNCS(clock_gettime) ;; esac +#checking for Linux Scheduling Support +AC_MSG_CHECKING(for Linux scheduling support) +AC_TRY_LINK( + [#include + #include + #include + _syscall0(pid_t,gettid)], + [const struct sched_param *p= (const struct sched_param*)0; + pid_t tid = gettid(); + int ret = sched_setaffinity(tid, 0, p);], + AC_MSG_RESULT(yes) + AC_DEFINE(HAVE_LINUX_SCHEDULING, [1], [Linux scheduling function]), + AC_MSG_RESULT(no)) + +#checking for Locking CPU support +AC_MSG_CHECKING(for Linux affinity support) +AC_TRY_LINK( + [#include + #include + #include + _syscall0(pid_t,gettid)], + [unsigned long *mask = (unsigned long *)0; + pid_t tid = gettid(); + int ret = sched_setaffinity(tid, sizeof(unsigned long), mask);], + AC_MSG_RESULT(yes) + AC_DEFINE(HAVE_LINUX_AFFINITY, [1], [Linux affinity function]), + AC_MSG_RESULT(no)) + +AC_MSG_CHECKING(for Solaris affinity support) +AC_TRY_LINK( + [#include + #include + #include + #include ], + [processor_id_t *pid = (processor_id_t *)0; + processor_id_t cpu_id = (processor_id_t)0; + id_t tid = _lw_self(); + int ret = processor_bind(P_LWPID, tid, cpu_id, bind);], + AC_MSG_RESULT(yes) + AC_DEFINE(HAVE_SOLARIS_AFFINITY, [1], [Solaris affinity function]), + AC_MSG_RESULT(no)) # isinf() could be a function or a macro (HPUX) AC_MSG_CHECKING(for isinf with ) diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp clean_dolphin_rt/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp --- mysql-5.1.12/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2006-05-18 02:50:46 -04:00 +++ clean_dolphin_rt/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2006-10-25 04:47:41 -04:00 @@ -72,6 +72,9 @@ NdbfsDumpAllFiles = 401, NdbfsDumpOpenFiles = 402, NdbfsDumpIdleFiles = 403, + CmvmiRealtimeScheduler = 503, + CmvmiExecuteLockCPU = 504, + CmvmiMaintLockCPU = 505, // 1222-1225 DICT LqhDumpAllDefinedTabs = 1332, LqhDumpNoLogPages = 1333, diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/include/mgmapi/mgmapi_config_parameters.h clean_dolphin_rt/storage/ndb/include/mgmapi/mgmapi_config_parameters.h --- mysql-5.1.12/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2006-08-31 18:09:09 -04:00 +++ clean_dolphin_rt/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2006-10-25 04:47:41 -04:00 @@ -96,6 +96,10 @@ #define CFG_DB_CHECKPOINT_SPEED 164 #define CFG_DB_CHECKPOINT_SPEED_SR 165 +#define CFG_DB_REALTIME_SCHEDULER 170 +#define CFG_DB_EXECUTE_LOCK_CPU 171 +#define CFG_DB_MAINT_LOCK_CPU 172 + #define CFG_DB_SGA 198 /* super pool mem */ #define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */ diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/include/portlib/NdbThread.h clean_dolphin_rt/storage/ndb/include/portlib/NdbThread.h --- mysql-5.1.12/storage/ndb/include/portlib/NdbThread.h 2005-07-26 07:42:55 -04:00 +++ clean_dolphin_rt/storage/ndb/include/portlib/NdbThread.h 2006-11-11 15:34:51 -05:00 @@ -35,6 +35,46 @@ typedef void* NDB_THREAD_ARG; typedef size_t NDB_THREAD_STACKSIZE; +#ifdef HAVE_LINUX_AFFINITY + typedef pid_t NDB_TID_TYPE; +#else +#ifdef HAVE_SOLARIS_AFFINITY + typedef id_t NDB_TID_TYPE; +#else + typedef int NDB_TID_TYPE; +#endif +#endif + +#ifdef HAVE_LINUX_SCHEDULING + typedef pid_t NDB_THAND_TYPE; +#else +#ifdef HAVE_PTHREAD_SELF + typedef pthread_t NDB_THAND_TYPE; +#else + typedef int NDB_THAND_TYPE; +#endif +#endif + +/** + * Three functions that are used in conjunctions with SocketClient + * threads and SocketServer threads. In the NDB kernel they are + * used to set real-time properties and lock threads to CPU's. In + * the NDB API they are dummy subroutines. + * add_thread_id is used before starting run method in thread. + * remove_thread_id is called immediately after returning from + * run-method in thread. + * fill_thread_object is called before calling add_thread_id to + * prepare parameters. + */ +void* +add_thread_id(void *param); + +void* +remove_thread_id(void *param); + +void +fill_thread_object(void *param, uint *len, bool server); + struct NdbThread; /* @@ -60,6 +100,18 @@ const char* p_thread_name, NDB_THREAD_PRIO thread_prio); +struct NdbThread* NdbThread_CreateWithFunc(NDB_THREAD_FUNC *p_thread_func, + NDB_THREAD_ARG *p_thread_arg, + const NDB_THREAD_STACKSIZE thread_stack_size, + const char* p_thread_name, + NDB_THREAD_PRIO thread_prio, + NDB_THREAD_FUNC *start_func, + NDB_THREAD_ARG start_obj, + size_t start_obj_len, + NDB_THREAD_FUNC *end_func, + NDB_THREAD_ARG end_obj, + size_t end_obj_len); + /** * Destroy a thread * Deallocates memory for thread @@ -93,6 +145,26 @@ */ int NdbThread_SetConcurrencyLevel(int level); +/** + * Get thread id + */ +NDB_TID_TYPE Ndb_getThreadId(); + +/** + * Get thread handle + */ +NDB_THAND_TYPE Ndb_getThreadHandle(); + +/** + * Set Scheduler for pid + */ +int Ndb_SetScheduler(NDB_THAND_TYPE threadHandle, bool rt_prio, + bool high_prio); + +/** + * Lock Thread to CPU + */ +int Ndb_LockCPU(NDB_TID_TYPE threadId, Uint32 cpu_id); #ifdef __cplusplus } diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/include/transporter/TransporterCallback.hpp clean_dolphin_rt/storage/ndb/include/transporter/TransporterCallback.hpp --- mysql-5.1.12/storage/ndb/include/transporter/TransporterCallback.hpp 2005-11-04 15:09:58 -05:00 +++ clean_dolphin_rt/storage/ndb/include/transporter/TransporterCallback.hpp 2006-10-28 03:35:33 -04:00 @@ -34,7 +34,7 @@ /** * Call back functions */ - + /** * The execute function */ diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/common/portlib/NdbThread.c clean_dolphin_rt/storage/ndb/src/common/portlib/NdbThread.c --- mysql-5.1.12/storage/ndb/src/common/portlib/NdbThread.c 2005-11-23 06:32:24 -05:00 +++ clean_dolphin_rt/storage/ndb/src/common/portlib/NdbThread.c 2006-11-12 22:04:02 -05:00 @@ -20,6 +20,21 @@ #include #include +#ifdef HAVE_LINUX_AFFINITY +#include +#include +#include + +_syscall0(pid_t,gettid) +#endif + +#ifdef HAVE_SOLARIS_AFFINITY +#include +#include +#include +#include +#endif + #define MAX_THREAD_NAME 16 /*#define USE_PTHREAD_EXTRAS*/ @@ -34,9 +49,13 @@ char thread_name[MAX_THREAD_NAME]; NDB_THREAD_FUNC * func; void * object; + NDB_THREAD_FUNC *start_func; + NDB_THREAD_FUNC *end_func; + bool same_start_end_object; + char start_object[128]; + char end_object[128]; }; - #ifdef NDB_SHM_TRANSPORTER void NdbThread_set_shm_sigmask(my_bool block) { @@ -79,7 +98,16 @@ { void *ret; struct NdbThread * ss = (struct NdbThread *)_ss; + if (ss->start_func) + (*ss->start_func)(ss->start_object); ret= (* ss->func)(ss->object); + if (ss->end_func) + { + if (ss->same_start_end_object) + (*ss->end_func)(ss->start_object); + else + (*ss->end_func)(ss->end_object); + } DBUG_POP(); NdbThread_Exit(ret); } @@ -95,6 +123,23 @@ const char* p_thread_name, NDB_THREAD_PRIO thread_prio) { + return NdbThread_CreateWithFunc(p_thread_func, p_thread_arg, _thread_stack_size, + p_thread_name, thread_prio, + NULL, NULL, 0, NULL, NULL, 0); +} + +struct NdbThread* NdbThread_CreateWithFunc(NDB_THREAD_FUNC *p_thread_func, + NDB_THREAD_ARG *p_thread_arg, + const NDB_THREAD_STACKSIZE _thread_stack_size, + const char* p_thread_name, + NDB_THREAD_PRIO thread_prio, + NDB_THREAD_FUNC *start_func, + NDB_THREAD_ARG start_obj, + size_t start_obj_len, + NDB_THREAD_FUNC *end_func, + NDB_THREAD_ARG end_obj, + size_t end_obj_len) +{ struct NdbThread* tmpThread; int result; pthread_attr_t thread_attr; @@ -115,6 +160,15 @@ strnmov(tmpThread->thread_name,p_thread_name,sizeof(tmpThread->thread_name)); + tmpThread->start_func = start_func; + memcpy(tmpThread->start_object, start_obj, start_obj_len); + tmpThread->end_func = end_func; + memcpy(tmpThread->end_object, end_obj, end_obj_len); + if (start_obj == end_obj) + tmpThread->same_start_end_object = TRUE; + else + tmpThread->same_start_end_object = FALSE; + pthread_attr_init(&thread_attr); #ifdef PTHREAD_STACK_MIN if (thread_stack_size < PTHREAD_STACK_MIN) @@ -191,3 +245,193 @@ return 0; #endif } + +NDB_TID_TYPE +Ndb_getThreadId() +{ +#ifdef HAVE_LINUX_AFFINITY + pid_t tid = gettid(); + if (tid == (pid_t)-1) + { + /* + This extra check is from suggestion by Kristian Nielsen + to handle cases when running binaries on LinuxThreads + compiled with NPTL threads + */ + tid = getpid(); + } + return tid; +#else +#ifdef HAVE_SOLARIS_AFFINITY + id_t tid; + tid = _lw_self(); + return tid; +#else + return 0; +#endif +#endif +} + +NDB_THAND_TYPE +Ndb_getThreadHandle() +{ +#ifdef HAVE_LINUX_SCHEDULING + pid_t tid = gettid(); + if (tid == (pid_t)-1) + tid = getpid(); + return tid; +#else +#ifdef HAVE_PTHREAD_SELF + return pthread_self(); +#else + return 0; +#endif +#endif +} + +static int +get_max_prio(int policy) +{ + int max_prio; +#ifdef HAVE_SCHED_GET_PRIORITY_MAX + max_prio = sched_get_priority_max(policy); +#else + max_prio = 90; +#endif + return max_prio; +} + +static int +get_min_prio(int policy) +{ + int min_prio; +#ifdef HAVE_SCHED_GET_PRIORITY_MIN + min_prio = sched_get_priority_min(policy); +#else + min_prio = 1; +#endif + return min_prio; +} + +static int +get_prio(bool rt_prio, bool high_prio, int policy) +{ + int prio, min_prio, max_prio; + if (!rt_prio) + return 0; + max_prio = get_max_prio(policy); + min_prio = get_min_prio(policy); + if (high_prio) + prio = min_prio + 3; + else + prio = min_prio + 1; + if (prio < min_prio) + prio = min_prio; + return prio; +} + +int +Ndb_SetScheduler(NDB_THAND_TYPE threadHandle, bool rt_prio, + bool high_prio) +{ + int policy, prio, error_no= 0; +#ifdef HAVE_LINUX_SCHEDULING + int ret; + struct sched_param loc_sched_param; + if (rt_prio) + { + policy = SCHED_RR; + prio = get_prio(rt_prio, high_prio, policy); + } + else + { + policy = SCHED_OTHER; + prio = 0; + } + memset((char*)&loc_sched_param, sizeof(loc_sched_param), 0); + loc_sched_param.sched_priority = prio; + ret= sched_setscheduler(threadHandle, policy, &loc_sched_param); + if (ret) + error_no= errno; +#else +#ifdef HAVE_PTHREAD_SET_SCHEDPARAM + /* + This variant is POSIX compliant so should be useful on most + Operating Systems supporting real-time scheduling. + */ + int ret; + struct sched_param loc_sched_param; + if (rt_prio) + { + policy = SCHED_RR; + prio = get_prio(rt_prio, high_prio, policy); + } + else + { + policy = SCHED_OTHER; + prio = 0; + } + memset((char*)&loc_sched_param, sizeof(loc_sched_param), 0); + loc_sched_param.sched_priority = prio; + ret= pthread_setschedparam(threadHandle, policy, &loc_sched_param); + if (ret) + error_no= errno; +#endif +#endif + return error_no; +} + +int +Ndb_LockCPU(NDB_TID_TYPE threadId, Uint32 cpu_id) +{ + int error_no= 0; +#ifdef HAVE_LINUX_AFFINITY + /* + On recent Linux versions the ability to set processor + affinity is available through the sched_setaffinity call. + In Linux this is possible to do on thread level so we can + lock execution thread to one CPU and the rest of the threads + to another CPU. + + By combining Real-time Scheduling and Locking to CPU we can + achieve more or less a realtime system for NDB Cluster. + */ + int ret; + ulong bitmask = 0; + uint no_bits = sizeof(ulong)*8; + if (cpu_id >= no_bits) + return 0; /* cpu_id out of range for this to work */ + bitmask = (1 << cpu_id); + ret= sched_setaffinity(threadId, sizeof(ulong), + (const cpu_set_t *)&bitmask); + if (ret) + error_no= errno; +#else +#ifdef HAVE_SOLARIS_AFFINITY + /* + Solaris have a number of versions to lock threads to CPU's. + We'll use the processor_bind interface since we only work + with single threads and bind those to CPU's. + A bit unclear as whether the id returned by pthread_self + is the LWP id. + */ + int ret; + ret= processor_bind(P_LWPID, threadId, cpu_id, NULL); + if (ret) + error_no= errno; +#else +#ifdef WIN32 + /* + Windows can currently as far I found out only lock processes + to CPU's, thus it cannot be used to support the desirable + feture here. So we ignore the call on Windows for the moment. + */ + return error_no; +#else + return error_no; +#endif +#endif +#endif + return error_no; +} + diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/common/transporter/TransporterRegistry.cpp clean_dolphin_rt/storage/ndb/src/common/transporter/TransporterRegistry.cpp --- mysql-5.1.12/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2006-09-08 02:37:11 -04:00 +++ clean_dolphin_rt/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2006-10-28 03:35:33 -04:00 @@ -1349,12 +1349,22 @@ bool TransporterRegistry::start_clients() { + char thread_object[128]; + uint len; + m_run_start_clients_thread= true; - m_start_clients_thread= NdbThread_Create(run_start_clients_C, + fill_thread_object((void*)thread_object, &len, FALSE); + m_start_clients_thread= NdbThread_CreateWithFunc(run_start_clients_C, (void**)this, 32768, "ndb_start_clients", - NDB_THREAD_PRIO_LOW); + NDB_THREAD_PRIO_LOW, + add_thread_id, + thread_object, + len, + remove_thread_id, + thread_object, + len); if (m_start_clients_thread == 0) { m_run_start_clients_thread= false; return false; diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/common/util/SocketServer.cpp clean_dolphin_rt/storage/ndb/src/common/util/SocketServer.cpp --- mysql-5.1.12/storage/ndb/src/common/util/SocketServer.cpp 2006-06-08 09:55:09 -04:00 +++ clean_dolphin_rt/storage/ndb/src/common/util/SocketServer.cpp 2006-10-28 03:35:33 -04:00 @@ -213,14 +213,25 @@ } void -SocketServer::startServer(){ +SocketServer::startServer() +{ + char thread_object[128]; + uint len; + m_threadLock.lock(); if(m_thread == 0 && m_stopThread == false){ - m_thread = NdbThread_Create(socketServerThread_C, + fill_thread_object((void*)thread_object, &len, TRUE); + m_thread = NdbThread_CreateWithFunc(socketServerThread_C, (void**)this, 32768, "NdbSockServ", - NDB_THREAD_PRIO_LOW); + NDB_THREAD_PRIO_LOW, + add_thread_id, + thread_object, + len, + remove_thread_id, + thread_object, + len); } m_threadLock.unlock(); } diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp clean_dolphin_rt/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp --- mysql-5.1.12/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2006-06-09 06:40:01 -04:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2006-11-12 22:04:02 -05:00 @@ -364,11 +364,7 @@ Uint32 theStartPhase = signal->theData[1]; jamEntry(); - if (theStartPhase == 1){ - jam(); - sendSTTORRY(signal); - return; - } else if (theStartPhase == 3) { + if (theStartPhase == 3) { jam(); globalData.activateSendPacked = 1; sendSTTORRY(signal); @@ -386,6 +382,7 @@ jam(); if(m_ctx.m_config.lockPagesInMainMemory()){ + ndbout << "Attempting to Lock Pages in Main Memory" << endl; int res = NdbMem_MemLockAll(); if(res != 0){ g_eventLogger.warning("Failed to memlock pages"); @@ -1147,7 +1144,22 @@ } return; } - + + if (arg == DumpStateOrd::CmvmiRealtimeScheduler) + { + bool realtime_on = signal->theData[1]; + globalEmulatorData.theConfiguration->realtimeScheduler(realtime_on); + } + if (arg == DumpStateOrd::CmvmiExecuteLockCPU) + { + Uint32 cpu_id = signal->theData[1]; + globalEmulatorData.theConfiguration->executeLockCPU(cpu_id); + } + if (arg == DumpStateOrd::CmvmiMaintLockCPU) + { + Uint32 cpu_id = signal->theData[1]; + globalEmulatorData.theConfiguration->maintLockCPU(cpu_id); + } if (arg == DumpStateOrd::CmvmiSetRestartOnErrorInsert) { if(signal->getLength() == 1) diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp clean_dolphin_rt/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp --- mysql-5.1.12/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2006-08-31 18:09:09 -04:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2006-10-28 03:35:33 -04:00 @@ -27,6 +27,7 @@ #include #include #include +#include // use this to test broken pread code //#define HAVE_BROKEN_PREAD @@ -110,6 +111,7 @@ const NDB_THREAD_STACKSIZE stackSize = 8192; char buf[16]; + struct ThreadContainer container; numAsyncFiles++; BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles); @@ -117,11 +119,19 @@ theStartConditionPtr = NdbCondition_Create(); NdbMutex_Lock(theStartMutexPtr); theStartFlag = false; - theThreadPtr = NdbThread_Create(runAsyncFile, + container.conf = globalEmulatorData.theConfiguration; + container.type = NdbfsThread; + theThreadPtr = NdbThread_CreateWithFunc(runAsyncFile, (void**)this, stackSize, (char*)&buf, - NDB_THREAD_PRIO_MEAN); + NDB_THREAD_PRIO_MEAN, + add_thread_id, + &container, + sizeof(container), + remove_thread_id, + &container, + sizeof(container)); if (theThreadPtr == 0) ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, "","Could not allocate file system thread"); diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/main.cpp clean_dolphin_rt/storage/ndb/src/kernel/main.cpp --- mysql-5.1.12/storage/ndb/src/kernel/main.cpp 2006-06-15 08:27:40 -04:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/main.cpp 2006-10-28 03:35:33 -04:00 @@ -459,20 +459,19 @@ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Connection to mgmd terminated before setup was complete", "StopOnError missing"); - if (!globalTransporterRegistry.start_clients()){ ndbout_c("globalTransporterRegistry.start_clients() failed"); exit(-1); } - globalEmulatorData.theWatchDog->doStart(); - globalEmulatorData.m_socket_server->startServer(); // theConfig->closeConfiguration(); - - globalEmulatorData.theThreadConfig->ipControlLoop(); - + { + Uint32 inx = globalEmulatorData.theConfiguration->addThreadId(MainThread); + globalEmulatorData.theThreadConfig->ipControlLoop(); + globalEmulatorData.theConfiguration->removeThreadId(inx); + } NdbShutdown(NST_Normal); return NRT_Default; diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/vm/Configuration.cpp clean_dolphin_rt/storage/ndb/src/kernel/vm/Configuration.cpp --- mysql-5.1.12/storage/ndb/src/kernel/vm/Configuration.cpp 2006-10-16 15:33:30 -04:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/vm/Configuration.cpp 2006-11-10 08:26:03 -05:00 @@ -38,6 +38,7 @@ #include "pc.hpp" #include #include +#include extern "C" { void ndbSetOwnVersion(); @@ -204,7 +205,14 @@ _initialStart = true; g_start_type |= (1 << NodeState::ST_INITIAL_START); } - + + threadIdMutex = NdbMutex_Create(); + if (!threadIdMutex) + { + ndbout_c("Failed to create threadIdMutex"); + exit(-1); + } + initThreadArray(); return true; } @@ -444,6 +452,21 @@ "TimeBetweenWatchDogCheck missing"); } + if(iter.get(CFG_DB_REALTIME_SCHEDULER, &_realtimeScheduler)){ + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", + "RealtimeScheduler missing"); + } + + if(iter.get(CFG_DB_EXECUTE_LOCK_CPU, &_executeLockCPU)){ + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", + "LockExecuteThreadToCPU missing"); + } + + if(iter.get(CFG_DB_MAINT_LOCK_CPU, &_maintLockCPU)){ + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", + "LockMaintThreadsToCPU missing"); + } + /** * Get paths */ @@ -485,6 +508,51 @@ return _lockPagesInMainMemory; } +bool +Configuration::realtimeScheduler() const +{ + return (bool)_realtimeScheduler; +} + +void +Configuration::realtimeScheduler(bool realtime_on) +{ + bool old_value = (bool)_realtimeScheduler; + _realtimeScheduler = (Uint32)realtime_on; + if (old_value != realtime_on) + setAllRealtimeScheduler(); +} + +Uint32 +Configuration::executeLockCPU() const +{ + return _executeLockCPU; +} + +void +Configuration::executeLockCPU(Uint32 value) +{ + Uint32 old_value = _executeLockCPU; + _executeLockCPU = value; + if (value != old_value) + setAllLockCPU(TRUE); +} + +Uint32 +Configuration::maintLockCPU() const +{ + return _maintLockCPU; +} + +void +Configuration::maintLockCPU(Uint32 value) +{ + Uint32 old_value = _maintLockCPU; + _maintLockCPU = value; + if (value != old_value) + setAllLockCPU(FALSE); +} + int Configuration::timeBetweenWatchDogCheck() const { return _timeBetweenWatchDogCheck; @@ -893,3 +961,186 @@ Configuration::setInitialStart(bool val){ _initialStart = val; } + +void +Configuration::setAllRealtimeScheduler() +{ + Uint32 i; + for (i = 0; i < threadInfo.size(); i++) + { + if (threadInfo[i].type != NotInUse) + { + if (setRealtimeScheduler(threadInfo[i].threadHandle, + threadInfo[i].type, + FALSE)) + return; + } + } +} + +void +Configuration::setAllLockCPU(bool exec_thread) +{ + Uint32 i; + for (i = 0; i < threadInfo.size(); i++) + { + if (threadInfo[i].type != NotInUse) + { + if (setLockCPU(threadInfo[i].threadId, + threadInfo[i].type, + exec_thread, + FALSE)) + return; + } + } +} + +int +Configuration::setRealtimeScheduler(NDB_THAND_TYPE threadHandle, + enum ThreadTypes type, + bool init) +{ + /* + We ignore thread characteristics on platforms where we cannot + determine the thread id. + */ + if (!threadHandle) + return 0; + if (!init || _realtimeScheduler) + { + int error_no; + ndbout << "Set scheduler " << _realtimeScheduler; + ndbout << " threadHandle = " << (int)threadHandle << endl; + if ((error_no = Ndb_SetScheduler(threadHandle, _realtimeScheduler, + (type != MainThread)))) + { + ndbout << "Set scheduler failed with error_no = " << error_no << endl; + ;//Warning, no permission to set scheduler + return 1; + } + } + return 0; +} + +int +Configuration::setLockCPU(NDB_TID_TYPE threadId, + enum ThreadTypes type, + bool exec_thread, + bool init) +{ + Uint32 cpu_id; + /* + We ignore thread characteristics on platforms where we cannot + determine the thread id. + We only set new lock CPU characteristics for the threads for which + it has changed + */ + if (!threadId) + return 0; + if ((exec_thread && type != MainThread) || + (!exec_thread && type == MainThread)) + return 0; + if (type == MainThread) + cpu_id = _executeLockCPU; + else + cpu_id = _maintLockCPU; + if (!init || + cpu_id != NO_LOCK_CPU) + { + int error_no; + ndbout << "Lock threadId = " << threadId; + ndbout << " to CPU id = " << cpu_id << endl; + if ((error_no = Ndb_LockCPU(threadId, cpu_id))) + { + ndbout << "Failed to lock CPU, error_no = " << error_no << endl; + ;//Warning, no permission to lock thread to CPU + return 1; + } + } + return 0; +} + +void fill_thread_object(void *param, uint *len, bool server) +{ + struct ThreadContainer container; + + memset((char*)&container, sizeof(container), 0); + container.conf = globalEmulatorData.theConfiguration; + container.type = server ? SocketServerThread : SocketClientThread; + memcpy((char*)param, (char*)&container, sizeof(container)); + *len = sizeof(container); +} + +void* +add_thread_id(void *param) +{ + struct ThreadContainer container; + + memcpy((char*)&container, param, sizeof(struct ThreadContainer)); + container.index = container.conf->addThreadId(container.type); + memcpy(param, (char*)&container, sizeof(struct ThreadContainer)); + return NULL; +} + +void* +remove_thread_id(void *param) +{ + struct ThreadContainer container; + + memcpy((char*)&container, param, sizeof(struct ThreadContainer)); + container.conf->removeThreadId(container.index); + return NULL; +} + +Uint32 Configuration::addThreadId(enum ThreadTypes type) +{ + NDB_TID_TYPE threadId; + NDB_THAND_TYPE threadHandle; + Uint32 i, cpu_id; + NdbMutex_Lock(threadIdMutex); + for (i = 0; i < threadInfo.size(); i++) + { + if (threadInfo[i].type == NotInUse) + break; + } + if (i == threadInfo.size()) + { + struct ThreadInfo tmp; + threadInfo.push_back(tmp); + } + threadHandle = Ndb_getThreadHandle(); + threadInfo[i].threadHandle = threadHandle; + threadId = Ndb_getThreadId(); + threadInfo[i].threadId = threadId; + threadInfo[i].type = type; + NdbMutex_Unlock(threadIdMutex); + setRealtimeScheduler(threadHandle, type, TRUE); + setLockCPU(threadId, type, (type == MainThread), TRUE); + return i; +} + +void +Configuration::removeThreadId(Uint32 index) +{ + NdbMutex_Lock(threadIdMutex); + threadInfo[index].threadId = 0; + threadInfo[index].threadHandle = 0; + threadInfo[index].type = NotInUse; + NdbMutex_Unlock(threadIdMutex); +} + +void +Configuration::initThreadArray() +{ + NdbMutex_Lock(threadIdMutex); + for (Uint32 i = 0; i < threadInfo.size(); i++) + { + threadInfo[i].threadId = 0; + threadInfo[i].threadHandle = 0; + threadInfo[i].type = NotInUse; + } + NdbMutex_Unlock(threadIdMutex); +} + +template class Vector; + diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/vm/Configuration.hpp clean_dolphin_rt/storage/ndb/src/kernel/vm/Configuration.hpp --- mysql-5.1.12/storage/ndb/src/kernel/vm/Configuration.hpp 2006-02-09 05:34:38 -05:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/vm/Configuration.hpp 2006-11-10 08:26:03 -05:00 @@ -20,6 +20,37 @@ #include #include #include +#include +#include + +enum ThreadTypes +{ + WatchDogThread = 1, + SocketServerThread = 2, + SocketClientThread = 3, + NdbfsThread = 3, + MainThread = 4, + NotInUse = 5 +}; + +#define MAX_NDB_THREADS 256 +#define NO_LOCK_CPU 65535 + +struct ThreadInfo +{ + NDB_TID_TYPE threadId; + NDB_THAND_TYPE threadHandle; + enum ThreadTypes type; +}; + +class Configuration; + +struct ThreadContainer +{ + Configuration *conf; + enum ThreadTypes type; + Uint32 index; +}; class ConfigRetriever; @@ -38,7 +69,29 @@ void closeConfiguration(bool end_session= true); bool lockPagesInMainMemory() const; - + + bool realtimeScheduler() const; + void realtimeScheduler(bool realtime_on); + + Uint32 executeLockCPU() const; + void executeLockCPU(Uint32 value); + + Uint32 maintLockCPU() const; + void maintLockCPU(Uint32 value); + + void setAllRealtimeScheduler(); + void setAllLockCPU(bool exec_thread); + int setLockCPU(NDB_TID_TYPE threadId, + enum ThreadTypes type, + bool exec_thread, + bool init); + int setRealtimeScheduler(NDB_THAND_TYPE threadHandle, + enum ThreadTypes type, + bool init); + Uint32 addThreadId(enum ThreadTypes type); + void removeThreadId(Uint32 index); + void initThreadArray(); + int timeBetweenWatchDogCheck() const ; void timeBetweenWatchDogCheck(int value); @@ -85,6 +138,12 @@ Uint32 _maxErrorLogs; Uint32 _lockPagesInMainMemory; Uint32 _timeBetweenWatchDogCheck; + Uint32 _realtimeScheduler; + Uint32 _executeLockCPU; + Uint32 _maintLockCPU; + + Vector threadInfo; + NdbMutex *threadIdMutex; ndb_mgm_configuration * m_ownConfig; ndb_mgm_configuration * m_clusterConfig; diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/kernel/vm/WatchDog.cpp clean_dolphin_rt/storage/ndb/src/kernel/vm/WatchDog.cpp --- mysql-5.1.12/storage/ndb/src/kernel/vm/WatchDog.cpp 2005-10-06 19:06:21 -04:00 +++ clean_dolphin_rt/storage/ndb/src/kernel/vm/WatchDog.cpp 2006-10-28 03:35:33 -04:00 @@ -23,6 +23,7 @@ #include #include #include +#include extern "C" void* @@ -50,13 +51,23 @@ } void -WatchDog::doStart(){ +WatchDog::doStart() +{ + struct ThreadContainer container; theStop = false; - theThreadPtr = NdbThread_Create(runWatchDog, + container.conf = globalEmulatorData.theConfiguration; + container.type = WatchDogThread; + theThreadPtr = NdbThread_CreateWithFunc(runWatchDog, (void**)this, 32768, "ndb_watchdog", - NDB_THREAD_PRIO_HIGH); + NDB_THREAD_PRIO_HIGH, + add_thread_id, + &container, + sizeof(container), + remove_thread_id, + &container, + sizeof(container)); } void diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/mgmsrv/ConfigInfo.cpp clean_dolphin_rt/storage/ndb/src/mgmsrv/ConfigInfo.cpp --- mysql-5.1.12/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2006-10-16 15:33:30 -04:00 +++ clean_dolphin_rt/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2006-10-25 04:47:41 -04:00 @@ -558,6 +558,42 @@ STR_VALUE(MAX_INT_RNIL) }, { + CFG_DB_EXECUTE_LOCK_CPU, + "LockExecuteThreadToCPU", + DB_TOKEN, + "CPU ID indicating which CPU will run the execution thread", + ConfigInfo::CI_USED, + true, + ConfigInfo::CI_INT, + "65535", + "0", + "65535" }, + + { + CFG_DB_MAINT_LOCK_CPU, + "LockMaintThreadsToCPU", + DB_TOKEN, + "CPU ID indicating which CPU will run the maintenance threads", + ConfigInfo::CI_USED, + true, + ConfigInfo::CI_INT, + "65535", + "0", + "65535" }, + + { + CFG_DB_REALTIME_SCHEDULER, + "RealtimeScheduler", + DB_TOKEN, + "If yes, then NDB Cluster threads will be scheduled as real-time threads", + ConfigInfo::CI_USED, + true, + ConfigInfo::CI_BOOL, + "false", + "false", + "true" }, + + { CFG_DB_MEMLOCK, "LockPagesInMainMemory", DB_TOKEN, diff -Nur --exclude=RCS --exclude=CVS --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet mysql-5.1.12/storage/ndb/src/ndbapi/TransporterFacade.cpp clean_dolphin_rt/storage/ndb/src/ndbapi/TransporterFacade.cpp --- mysql-5.1.12/storage/ndb/src/ndbapi/TransporterFacade.cpp 2006-08-31 18:09:10 -04:00 +++ clean_dolphin_rt/storage/ndb/src/ndbapi/TransporterFacade.cpp 2006-10-28 03:35:33 -04:00 @@ -61,6 +61,21 @@ * Call back functions *****************************************************************************/ +void* add_thread_id(void *param) +{ + return NULL; +} + +void *remove_thread_id(void *param) +{ + return NULL; +} + +void fill_thread_object(void *param, uint *len, bool server) +{ + *len = 0; +} + void reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode, const char *info)