=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-10-15 12:36:53 +0000
+++ storage/ndb/include/kernel/GlobalSignalNumbers.h	2010-02-08 11:34:20 +0000
@@ -191,7 +191,7 @@ extern const GlobalSignalNumber NO_OF_SI
 /* 120 not unused */
 #define GSN_ROUTE_ORD                   121
 #define GSN_NODE_VERSION_REP            122
-/* 123 unused */
+/* 123 not unused */
 /* 124 unused */
 #define GSN_CHECK_LCP_STOP              125
 #define GSN_CLOSE_COMCONF               126 /* local */
@@ -673,6 +673,7 @@ extern const GlobalSignalNumber NO_OF_SI
 #define GSN_TCINDXNEXTCONF              525
 #define GSN_TCINDXNEXREF                526
 #define GSN_FIRE_TRIG_ORD               527
+#define GSN_FIRE_TRIG_ORD_L             123 /* local from TUP to SUMA */
 
 /**
  * These are used only by kernel

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- storage/ndb/src/common/transporter/TCP_Transporter.cpp	2009-12-15 16:00:47 +0000
+++ storage/ndb/src/common/transporter/TCP_Transporter.cpp	2010-02-08 20:53:22 +0000
@@ -76,9 +76,11 @@ setIf(int& ref, Uint32 val, Uint32 def)
 static
 Uint32 overload_limit(const TransporterConfiguration* conf)
 {
-  return (conf->tcp.tcpOverloadLimit ?
-          conf->tcp.tcpOverloadLimit :
-          conf->tcp.sendBufferSize*4/5);
+  Uint32 u = (conf->tcp.tcpOverloadLimit ?
+              conf->tcp.tcpOverloadLimit :
+              conf->tcp.sendBufferSize*4/5);
+  printf("overload_limit: %u\n", u);
+  return u;
 }
 
 

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- storage/ndb/src/common/transporter/TransporterRegistry.cpp	2009-10-21 19:08:32 +0000
+++ storage/ndb/src/common/transporter/TransporterRegistry.cpp	2010-02-08 20:10:47 +0000
@@ -189,6 +189,8 @@ TransporterRegistry::allocate_send_buffe
 
   m_send_buffer_memory =
     new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
+  ndbout_c("allocated %llu bytes for sendbuffer",
+           Uint64(UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)));
   if (m_send_buffer_memory == NULL)
   {
     ndbout << "Unable to allocate "

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-02-04 21:15:23 +0000
+++ storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-02-08 20:34:54 +0000
@@ -13752,8 +13752,10 @@ void
 Dblqh::execSUB_GCP_COMPLETE_REP(Signal* signal)
 {
   jamEntry();
-  sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal,
-             signal->getLength(), JBB);
+  Uint32 len = signal->getLength();
+  /* Flush TUP's buffered SUMA triggers ahead of the GCP-complete report */
+  EXECUTE_DIRECT(DBTUP, GSN_SUB_GCP_COMPLETE_REP, signal, len);
+  sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal, len, JBB);
 }
 
 /* ------------------------------------------------------------------------- */

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2010-01-13 13:28:03 +0000
+++ storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2010-02-08 20:34:54 +0000
@@ -1777,6 +1777,7 @@ private:
   void execFSREMOVECONF(Signal*);
 
   void execDBINFO_SCANREQ(Signal*);
+  void execSUB_GCP_COMPLETE_REP(Signal*);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
@@ -2611,15 +2612,6 @@ private:
                       Uint32 noOfAttributes,
                       Uint32* inBuffer);
 
-  void sendFireTrigOrd(Signal* signal,
-                       KeyReqStruct *req_struct,
-                       Operationrec * regOperPtr,
-                       TupTriggerData* trigPtr,
-                       Uint32 fragmentId,
-                       Uint32 noPrimKeySignals,
-                       Uint32 noBeforeSignals,
-                       Uint32 noAfterSignals);
-
   bool primaryKey(Tablerec* const, Uint32);
 
   // these set terrorCode and return non-zero on error
@@ -2658,6 +2650,22 @@ private:
   void removeTuxEntries(Signal* signal,
                         Tablerec* regTabPtr);
 
+  /**
+   * Buffer trigger data destined for SUMA on a page instead of sending
+   *   one long signal per trigger; the page is passed to SUMA with
+   *   GSN_FIRE_TRIG_ORD_L by flush_ndbmtd_suma_buffer
+   */
+  void ndbmtd_buffer_suma_trigger(Signal* signal, Uint32 len,
+                                  LinearSectionPtr ptr[]);
+  void flush_ndbmtd_suma_buffer(Signal*);
+
+  struct SumaTriggerBuffer
+  {
+    SumaTriggerBuffer() { m_pageId = RNIL; m_freeWords = 0;}
+    Uint32 m_pageId;    // page being filled, RNIL when none
+    Uint32 m_freeWords; // unused words left at the end of that page
+  } m_suma_trigger_buffer;
+
 // *****************************************************************
 // Error Handling routines.
 // *****************************************************************

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2010-01-13 13:28:03 +0000
+++ storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2010-02-08 20:34:54 +0000
@@ -125,6 +125,7 @@ Dbtup::Dbtup(Block_context& ctx, Uint32
   addRecSignal(GSN_FSREMOVECONF, &Dbtup::execFSREMOVECONF, true);
 
   addRecSignal(GSN_DROP_FRAG_REQ, &Dbtup::execDROP_FRAG_REQ);
+  addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Dbtup::execSUB_GCP_COMPLETE_REP);
 
   fragoperrec = 0;
   fragrecord = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2009-12-14 22:14:34 +0000
+++ storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2010-02-08 20:34:54 +0000
@@ -939,6 +939,7 @@ out:
     jam();
     return;
   }
+
 //--------------------------------------------------------------------
 // Now all data for this trigger has been read. It is now time to send
 // the trigger information consisting of two or three sets of TRIG_
@@ -1090,7 +1091,7 @@ out:
     if (executeDirect)
     {
       jam();
-      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+      EXECUTE_DIRECT(refToMain(ref),
                      GSN_FIRE_TRIG_ORD,
                      signal,
                      FireTrigOrd::SignalLengthSuma);
@@ -1106,8 +1107,17 @@ out:
       ptr[1].sz = noBeforeWords;
       ptr[2].p = afterBuffer;
       ptr[2].sz = noAfterWords;
-      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
-                 signal, FireTrigOrd::SignalLengthSuma, JBB, ptr, 3);
+      if (refToMain(ref) == SUMA && (refToInstance(ref) != instance()))
+      {
+        jam();
+        ndbmtd_buffer_suma_trigger(signal, FireTrigOrd::SignalLengthSuma, ptr);
+      }
+      else
+      {
+        jam();
+        sendSignal(ref, GSN_FIRE_TRIG_ORD,
+                   signal, FireTrigOrd::SignalLengthSuma, JBB, ptr, 3);
+      }
     }
     break;
   case (TriggerType::SUBSCRIPTION):
@@ -1119,7 +1129,7 @@ out:
     if (executeDirect)
     {
       jam();
-      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+      EXECUTE_DIRECT(refToMain(ref),
                      GSN_FIRE_TRIG_ORD,
                      signal,
                      FireTrigOrd::SignalWithGCILength);
@@ -1137,7 +1147,7 @@ out:
       ptr[1].sz = noBeforeWords;
       ptr[2].p = afterBuffer;
       ptr[2].sz = noAfterWords;
-      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
+      sendSignal(ref, GSN_FIRE_TRIG_ORD,
                  signal, FireTrigOrd::SignalWithGCILength, JBB, ptr, 3);
     }
     break;
@@ -1370,17 +1380,6 @@ void Dbtup::sendTrigAttrInfo(Signal* sig
   } while (dataLen != dataIndex);
 }
 
-void Dbtup::sendFireTrigOrd(Signal* signal,
-                            KeyReqStruct *req_struct,
-                            Operationrec * const regOperPtr,
-                            TupTriggerData* const trigPtr,
-                            Uint32 fragmentId,
-                            Uint32 noPrimKeyWords,
-                            Uint32 noBeforeValueWords,
-                            Uint32 noAfterValueWords)
-{
-}
-
 /*
  * Ordered index triggers.
  *
@@ -1583,3 +1582,135 @@ Dbtup::removeTuxEntries(Signal* signal,
     triggerList.next(triggerPtr);
   }
 }
+
+/**
+ * Append one FIRE_TRIG_ORD (signal data and its 3 sections) to the page
+ *   being filled for SUMA, flushing and allocating pages as needed.
+ *
+ * Each entry on the page is laid out as
+ *   [ tot, len, sec[0].sz, sec[1].sz, sec[2].sz, signal data, sections ]
+ *   where tot is the size in words of the whole entry.
+ *
+ * An entry that can not be buffered (no page could be allocated or the
+ *   entry is larger than a page) is sent as an ordinary long signal
+ *   instead of being dropped.
+ */
+void
+Dbtup::ndbmtd_buffer_suma_trigger(Signal * signal,
+                                  Uint32 len,
+                                  LinearSectionPtr sec[3])
+{
+  jam();
+  const Uint32 hdr = 5; // tot, len and the three section sizes
+  Uint32 tot = len + hdr;
+  for (Uint32 i = 0; i<3; i++)
+    tot += sec[i].sz;
+
+  Uint32 * ptr = 0;
+  Uint32 free = m_suma_trigger_buffer.m_freeWords;
+  Uint32 pageId = m_suma_trigger_buffer.m_pageId;
+  if (free < tot)
+  {
+    jam();
+    if (pageId != RNIL)
+    {
+      flush_ndbmtd_suma_buffer(signal);
+    }
+    ndbassert(m_suma_trigger_buffer.m_pageId == RNIL);
+    if (likely(tot <= GLOBAL_PAGE_SIZE_WORDS))
+    {
+      jam();
+      void * vptr = m_ctx.m_mm.alloc_page(RT_DBTUP_PAGE,
+                                          &m_suma_trigger_buffer.m_pageId,
+                                          Ndbd_mem_manager::NDB_ZONE_ANY);
+      ptr = reinterpret_cast<Uint32*>(vptr);
+      free = GLOBAL_PAGE_SIZE_WORDS - tot;
+    }
+  }
+  else
+  {
+    jam();
+    ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId));
+    ptr += (GLOBAL_PAGE_SIZE_WORDS - free); // skip the words already used
+    free -= tot;
+  }
+
+  if (likely(ptr != 0))
+  {
+    * ptr++ = tot;
+    * ptr++ = len;
+    * ptr++ = sec[0].sz;
+    * ptr++ = sec[1].sz;
+    * ptr++ = sec[2].sz;
+    memcpy(ptr, signal->getDataPtrSend(), 4 * len);
+    ptr += len;
+    for (Uint32 i = 0; i<3; i++)
+    {
+      memcpy(ptr, sec[i].p, 4 * sec[i].sz);
+      ptr += sec[i].sz;
+    }
+
+    m_suma_trigger_buffer.m_freeWords = free;
+    if (free < (len + hdr))
+    {
+      // not even an entry with empty sections fits, pass the page on
+      flush_ndbmtd_suma_buffer(signal);
+    }
+  }
+  else
+  {
+    jam();
+    /**
+     * Nothing is buffered here (a previous page was flushed above), so
+     *   sending directly does not reorder this trigger with earlier ones.
+     * NOTE(review): assumes the receiver is SUMA_REF, like
+     *   flush_ndbmtd_suma_buffer does - confirm against the caller's ref
+     */
+    m_suma_trigger_buffer.m_pageId = RNIL;
+    m_suma_trigger_buffer.m_freeWords = 0;
+    sendSignal(SUMA_REF, GSN_FIRE_TRIG_ORD, signal, len, JBB, sec, 3);
+  }
+}
+
+/**
+ * Pass the page being filled (if any) on to SUMA with
+ *   GSN_FIRE_TRIG_ORD_L and start over with an empty buffer.
+ *
+ * theData[0..1] of the signal are used for the send and then restored,
+ *   so the caller can continue to use the signal afterwards.
+ */
+void
+Dbtup::flush_ndbmtd_suma_buffer(Signal* signal)
+{
+  jam();
+
+  Uint32 pageId = m_suma_trigger_buffer.m_pageId;
+  Uint32 free = m_suma_trigger_buffer.m_freeWords;
+
+  if (pageId != RNIL)
+  {
+    jam();
+    Uint32 save[2];
+    save[0] = signal->theData[0];
+    save[1] = signal->theData[1];
+    signal->theData[0] = pageId;
+    signal->theData[1] = GLOBAL_PAGE_SIZE_WORDS - free; // words used
+    sendSignal(SUMA_REF, GSN_FIRE_TRIG_ORD_L, signal, 2, JBB);
+
+    signal->theData[0] = save[0];
+    signal->theData[1] = save[1];
+  }
+
+  m_suma_trigger_buffer.m_pageId = RNIL;
+  m_suma_trigger_buffer.m_freeWords = 0;
+}
+
+/**
+ * Executed by LQH before it reports the GCP completion to SUMA
+ */
+void
+Dbtup::execSUB_GCP_COMPLETE_REP(Signal* signal)
+{
+  jamEntry();
+  flush_ndbmtd_suma_buffer(signal);
+}

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- storage/ndb/src/kernel/blocks/suma/Suma.cpp	2010-01-26 14:03:52 +0000
+++ storage/ndb/src/kernel/blocks/suma/Suma.cpp	2010-02-08 20:34:54 +0000
@@ -3971,6 +3971,7 @@ reformat(Signal* signal, LinearSectionPt
     noOfAttrs++;
     dataLen += len;
 
+    assert(sz_1 >= (1 + len));
     sz_1 -= (1 + len);
   }
   assert(sz_1 == 0);
@@ -3984,6 +3985,66 @@ reformat(Signal* signal, LinearSectionPt
   return sz_2 > 0 ? 3 : 2;
 }
 
+/**
+ * Pass entire pages with SUMA-trigger-data from
+ *   TUP to SUMA to avoid extensive LongSignalMessage buffer contention
+ *
+ * theData[0] is the page, theData[1] the number of words used on it.
+ *   Each entry on the page is
+ *   [ msglen, siglen, sec0len, sec1len, sec2len, signal data, sections ]
+ *   as written by Dbtup::ndbmtd_buffer_suma_trigger.
+ *   The page is released once all entries have been executed.
+ */
+void
+Suma::execFIRE_TRIG_ORD_L(Signal* signal)
+{
+  jamEntry();
+
+  ndbassert(signal->getNoOfSections() == 0);
+  Uint32 pageId = signal->theData[0];
+  Uint32 len = signal->theData[1];
+  Uint32 * ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId));
+  while (len)
+  {
+    Uint32 msglen  = * ptr++;
+    Uint32 siglen  = * ptr++;
+    Uint32 sec0len = * ptr++;
+    Uint32 sec1len = * ptr++;
+    Uint32 sec2len = * ptr++;
+
+    /**
+     * Validate the entry before anything is copied.
+     *   msglen >= 5 also guarantees that the loop makes progress
+     */
+    ndbrequire(msglen >= 5 && len >= msglen);
+    ndbrequire(siglen + sec0len + sec1len + sec2len == msglen - 5);
+
+    /**
+     * Copy value directly into local buffers
+     *   (sec0 followed by sec2 in f_buffer, sec1 in b_buffer)
+     */
+    Uint32 trigId = ((FireTrigOrd*)ptr)->getTriggerId();
+    memcpy(signal->theData, ptr, 4 * siglen); // signal
+    ptr += siglen;
+    memcpy(f_buffer, ptr, 4*sec0len);
+    ptr += sec0len;
+    memcpy(b_buffer, ptr, 4*sec1len);
+    ptr += sec1len;
+    memcpy(f_buffer + sec0len, ptr, 4*sec2len);
+    ptr += sec2len;
+
+    f_trigBufferSize = sec0len + sec2len;
+    b_trigBufferSize = sec1len;
+    f_bufferLock = trigId;
+    b_bufferLock = trigId;
+
+    execFIRE_TRIG_ORD(signal);
+    len -= msglen;
+  }
+
+  m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId);
+}
+
 void
 Suma::execFIRE_TRIG_ORD(Signal* signal)
 {
@@ -4004,7 +4065,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
   c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
 
   ndbassert(gci > m_last_complete_gci);
-  
+
   if (signal->getNoOfSections())
   {
     jam();
@@ -4054,7 +4115,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
 
     LinearSectionPtr ptr[3];
     const Uint32 nptr= reformat(signal, ptr,
-                                f_buffer, sz, b_buffer, b_trigBufferSize);
+                                f_buffer, f_trigBufferSize,
+                                b_buffer, b_trigBufferSize);
     Uint32 ptrLen= 0;
     for(Uint32 i =0; i < nptr; i++)
       ptrLen+= ptr[i].sz;

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- storage/ndb/src/kernel/blocks/suma/Suma.hpp	2010-01-26 14:03:52 +0000
+++ storage/ndb/src/kernel/blocks/suma/Suma.hpp	2010-02-08 20:34:54 +0000
@@ -87,6 +87,7 @@ public:
    */
   void execTRIG_ATTRINFO(Signal* signal);
   void execFIRE_TRIG_ORD(Signal* signal);
+  void execFIRE_TRIG_ORD_L(Signal* signal);
   void execSUB_GCP_COMPLETE_REP(Signal* signal);
 
   /**

=== modified file 'storage/ndb/src/kernel/blocks/suma/SumaInit.cpp'
--- storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2009-05-27 15:21:45 +0000
+++ storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2010-02-08 20:34:54 +0000
@@ -115,6 +115,7 @@ Suma::Suma(Block_context& ctx) :
    */
   addRecSignal(GSN_TRIG_ATTRINFO, &Suma::execTRIG_ATTRINFO);
   addRecSignal(GSN_FIRE_TRIG_ORD, &Suma::execFIRE_TRIG_ORD);
+  addRecSignal(GSN_FIRE_TRIG_ORD_L, &Suma::execFIRE_TRIG_ORD_L);
 
   addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &Suma::execCREATE_TRIG_IMPL_REF);
   addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &Suma::execCREATE_TRIG_IMPL_CONF);

=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- storage/ndb/src/kernel/ndbd.cpp	2010-01-26 14:09:45 +0000
+++ storage/ndb/src/kernel/ndbd.cpp	2010-02-08 20:21:44 +0000
@@ -197,8 +197,17 @@ init_global_memory_manager(EmulatorData
   Uint32 sbpages = 0;
   if (globalTransporterRegistry.get_using_default_send_buffer() == false)
   {
-    Uint64 mem = globalTransporterRegistry.get_total_max_send_buffer();
-    sbpages = Uint32((mem + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE);
+    /* keep the size in 64 bits, as before: ndb_mgm_get_int_parameter only
+       yields 32 bits but %llu below needs a 64 bit argument */
+    Uint32 cfgmem = 0;
+    ndb_mgm_get_int_parameter(p, CFG_TOTAL_SEND_BUFFER_MEMORY, &cfgmem);
+    Uint64 sbmem = cfgmem;
+    if (sbmem == 0)
+    {
+      sbmem = globalTransporterRegistry.get_total_max_send_buffer();
+    }
+    ndbout_c("-- allocated %llu bytes send buffer", sbmem);
+    sbpages = Uint32((sbmem + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE);
     Resource_limit rl;
     rl.m_min = sbpages;
     rl.m_max = sbpages;

=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- storage/ndb/src/kernel/vm/ArrayPool.hpp	2009-11-12 22:29:56 +0000
+++ storage/ndb/src/kernel/vm/ArrayPool.hpp	2010-02-08 08:58:37 +0000
@@ -83,7 +83,7 @@ public:
   }
 
   inline void decNoFree(Uint32 cnt) {
-    assert(noOfFree > cnt);
+    assert(noOfFree >= cnt);
     noOfFree -= cnt;
     updateFreeMin();
   }