=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h' --- storage/ndb/include/kernel/GlobalSignalNumbers.h 2009-05-27 15:21:45 +0000 +++ storage/ndb/include/kernel/GlobalSignalNumbers.h 2009-08-19 15:12:12 +0000 @@ -816,8 +816,9 @@ extern const GlobalSignalNumber NO_OF_SI #define GSN_ALTER_TABLE_REP 606 #define GSN_API_BROADCAST_REP 607 -#define GSN_608 -#define GSN_609 + +#define GSN_SYNC_THREAD_REQ 608 +#define GSN_SYNC_THREAD_CONF 609 #define GSN_610 #define GSN_611 === modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp' --- storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2009-08-18 06:59:17 +0000 +++ storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2009-08-19 15:17:33 +0000 @@ -755,6 +755,7 @@ private: void execDIH_SCAN_TAB_COMPLETE_REP(Signal*); void execGCP_SAVEREF(Signal *); void execGCP_TCFINISHED(Signal *); + void execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err); void execREAD_NODESCONF(Signal *); void execNDB_STTOR(Signal *); void execDICTSTARTCONF(Signal *); === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2009-08-18 13:44:17 +0000 +++ storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2009-08-19 15:25:10 +0000 @@ -9437,8 +9437,25 @@ void Dbdih::execGCP_TCFINISHED(Signal* s #endif ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT); + + /** + * Make sure that each LQH gets scheduled, so that they don't get out of sync + * wrt to SUB_GCP_COMPLETE_REP + */ + Callback cb; + cb.m_callbackData = 10; + cb.m_callbackFunction = safe_cast(&Dbdih::execGCP_TCFINISHED_sync_conf); + Uint32 blocks[] = { DBLQH, 0 }; + synchronize_threads_for_blocks(signal, blocks, cb); +}//Dbdih::execGCP_TCFINISHED() + +void +Dbdih::execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err) +{ + ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT); + m_micro_gcp.m_state = MicroGcp::M_GCP_COMMITTED; - retRef = m_micro_gcp.m_master_ref; + Uint32 retRef = m_micro_gcp.m_master_ref; GCPNodeFinished* conf2 = (GCPNodeFinished*)signal->getDataPtrSend(); conf2->nodeId = cownNodeId; @@ -9447,7 +9464,7 @@ void Dbdih::execGCP_TCFINISHED(Signal* s conf2->gci_lo = (Uint32)(m_micro_gcp.m_old_gci & 0xFFFFFFFF); sendSignal(retRef, GSN_GCP_NODEFINISH, signal, GCPNodeFinished::SignalLength, JBB); -}//Dbdih::execGCP_TCFINISHED() +} void Dbdih::execSUB_GCP_COMPLETE_REP(Signal* signal) === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp' --- storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-05-27 15:21:45 +0000 +++ storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-08-19 15:15:52 +0000 @@ -158,6 +158,10 @@ SimulatedBlock::initCommon() count = 5; this->getParam("ActiveCounters", &count); c_counterMgr.setSize(count); + + count = 5; + this->getParam("ActiveThreadSync", &count); + c_syncThreadPool.setSize(count); } SimulatedBlock::~SimulatedBlock() @@ -208,6 +212,8 @@ SimulatedBlock::installSimulatedBlockFun a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP; a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED; a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF; + a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ; + a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF; } void @@ -1918,7 +1924,6 @@ SimulatedBlock::execCALLBACK_CONF(Signal Uint32 senderRef = conf->senderRef; ndbrequire(m_callbackTableAddr != 0); - const CallbackTable& ct = *m_callbackTableAddr; const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex); CallbackFunction function = ce.m_function; @@ -3339,3 +3344,66 @@ SimulatedBlock::debugOutTag(char *buf, i return buf; } #endif + +void +SimulatedBlock::synchronize_threads_for_blocks(Signal * signal, + const Uint32 blocks[], + const Callback & cb, + JobBufferLevel prio) +{ +#ifndef NDBD_MULTITHREADED + Callback copy = cb; + execute(signal, copy, 0); +#else + ljam(); + Uint32 ref[32]; // max threads + Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(), + ref, NDB_ARRAY_SIZE(ref)); + if (cnt == 0) + { + ljam(); + Callback copy = cb; + execute(signal, copy, 0); + return; + } + + Ptr ptr; + ndbrequire(c_syncThreadPool.seize(ptr)); + ptr.p->m_cnt = cnt; + ptr.p->m_callback = cb; + + signal->theData[0] = reference(); + signal->theData[1] = ptr.i; + signal->theData[2] = Uint32(prio); + for (Uint32 i = 0; itheData[0]; + Uint32 prio = signal->theData[2]; + sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(), + JobBufferLevel(prio)); +} + +void +SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal) +{ + ljamEntry(); + Ptr ptr; + c_syncThreadPool.getPtr(ptr, signal->theData[1]); + if (ptr.p->m_cnt == 1) + { + ljam(); + execute(signal, ptr.p->m_callback, 0); + c_syncThreadPool.release(ptr); + return; + } + ptr.p->m_cnt --; +} === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-05-27 15:21:45 +0000 +++ storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-08-19 15:16:30 +0000 @@ -141,6 +141,14 @@ protected: void initCommon(); public: + typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, + Uint32 callbackData, + Uint32 returnCode); + struct Callback { + CallbackFunction m_callbackFunction; + Uint32 m_callbackData; + }; + /** * */ @@ -189,15 +197,26 @@ public: static Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId); static Uint32 getInstanceFromKey(Uint32 instanceKey); // local use only -public: - typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, - Uint32 callbackData, - Uint32 returnCode); - struct Callback { - CallbackFunction m_callbackFunction; - Uint32 m_callbackData; - }; + /** + * This method will make sure that when callback in called each + * thread running an instance any of the threads in blocks[] + * will have executed a signal + */ + void synchronize_threads_for_blocks(Signal*, const Uint32 blocks[], + const Callback&, JobBufferLevel = JBB); +private: + struct SyncThreadRecord + { + Callback m_callback; + Uint32 m_cnt; + Uint32 nextPool; + }; + ArrayPool c_syncThreadPool; + void execSYNC_THREAD_REQ(Signal*); + void execSYNC_THREAD_CONF(Signal*); + +public: virtual const char* get_filename(Uint32 fd) const { return "";} protected: static Callback TheEmptyCallback; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- storage/ndb/src/kernel/vm/mt.cpp 2009-06-06 12:20:07 +0000 +++ storage/ndb/src/kernel/vm/mt.cpp 2009-08-19 15:12:12 +0000 @@ -3995,6 +3995,37 @@ lookup_lock(const void * ptr) return 0; } +Uint32 +mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId, + Uint32 dst[], Uint32 len) +{ + Uint32 cnt = 0; + Bitmask<(MAX_THREADS+31)/32> mask; + mask.set(threadId); + for (Uint32 i = 0; blocks[i] != 0; i++) + { + Uint32 block = blocks[i]; + /** + * Find each thread that has instance of block + */ + assert(block == blockToMain(block)); + Uint32 index = block - MIN_BLOCK_NO; + for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++) + { + Uint32 thr_no = thr_map[index][instance].thr_no; + if (thr_no == thr_map_entry::NULL_THR_NO) + break; + + if (mask.get(thr_no)) + continue; + + mask.set(thr_no); + require(cnt < len); + dst[cnt++] = numberToRef(block, instance, 0); + } + } + return cnt; +} /** * Global data === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- storage/ndb/src/kernel/vm/mt.hpp 2008-11-15 15:43:59 +0000 +++ storage/ndb/src/kernel/vm/mt.hpp 2009-08-19 15:12:12 +0000 @@ -56,4 +56,16 @@ SendStatus mt_send_remote(Uint32 self, c void mt_section_lock(); void mt_section_unlock(); +/** + * Get list of BlockReferences so that + * each thread holding an instance of any block in blocks[] get "covered" + * (excluding ownThreadId + * + * eg. calling it with DBLQH, will return a block-reference to *a* block + * in each of the threads that has an DBLQH instance + */ +Uint32 mt_get_thread_references_for_blocks(const Uint32 blocks[], + Uint32 ownThreadId, + Uint32 dst[], Uint32 len); + #endif