=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp' --- storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-09-02 17:24:52 +0000 +++ storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-10-12 10:11:23 +0000 @@ -51,6 +51,7 @@ #include #include +#include #define ZREPORT_MEMORY_USAGE 1000 @@ -2678,6 +2679,31 @@ Cmvmi::execTESTSIG(Signal* signal){ return; } + /** + * Testing Api fragmented signal send/receive + */ + if (testType == 40) + { + /* Fragmented signal sent from Api, we'll check it and return it */ + Uint32 expectedVal = 0; + for (Uint32 s = 0; s < handle.m_cnt; s++) + { + SectionReader sr(handle.m_ptr[s].i, getSectionSegmentPool()); + Uint32 received; + while (sr.getWord(&received)) + { + ndbrequire(received == expectedVal ++); + } + } + + /* Now return it back to the Api, no callback, so framework + * can time-slice the send + */ + sendFragmentedSignal(ref, GSN_TESTSIG, signal, signal->length(), JBB, &handle); + + return; + } + if(signal->getSendersBlockRef() == ref){ /** * Signal from API (not via NodeReceiverGroup) === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-09 10:48:14 +0000 +++ storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-10-12 10:11:23 +0000 @@ -1191,9 +1191,11 @@ TransporterFacade::sendFragmentedSignal( /* This section fits whole, move onto next */ this_chunk_sz+= remaining_sec_sz; i++; + continue; } else { + assert(this_chunk_sz <= CHUNK_SZ); /* This section doesn't fit, truncate it */ unsigned send_sz= CHUNK_SZ - this_chunk_sz; if (i != start_i) @@ -1205,19 +1207,34 @@ TransporterFacade::sendFragmentedSignal( * The final piece does not need to be a multiple of * NDB_SECTION_SEGMENT_SZ * - * Note that this can push this_chunk_sz above CHUNK_SZ - * Should probably round-down, but need to be careful of - * 'can't fit any' cases. 
Instead, CHUNK_SZ is defined - * with some slack below MAX_SENT_MESSAGE_BYTESIZE + * We round down the available send space to the nearest whole + * number of segments. + * If there's not enough space for one segment, then we round up + * to one segment. This can make us send more than CHUNK_SZ, which + * is ok as it's defined as less than the maximum message length. */ - send_sz= - NDB_SECTION_SEGMENT_SZ - *((send_sz+NDB_SECTION_SEGMENT_SZ-1) - /NDB_SECTION_SEGMENT_SZ); - if (send_sz > remaining_sec_sz) - send_sz= remaining_sec_sz; + send_sz = (send_sz / NDB_SECTION_SEGMENT_SZ) * + NDB_SECTION_SEGMENT_SZ; /* Round down */ + send_sz = MAX(send_sz, NDB_SECTION_SEGMENT_SZ); /* At least one */ + send_sz = MIN(send_sz, remaining_sec_sz); /* Only actual data */ + + /* If we've squeezed the last bit of data in, jump out of + * here to send the last fragment. + * Otherwise, send what we've collected so far. + */ + if ((send_sz == remaining_sec_sz) && /* All sent */ + (i == secs - 1)) /* No more sections */ + { + this_chunk_sz+= remaining_sec_sz; + i++; + continue; + } } + /* At this point, there must be data to send in a further signal */ + assert((send_sz < remaining_sec_sz) || + (i < secs - 1)); + /* Modify tmp generic section ptr to describe truncated * section */ @@ -1256,9 +1273,6 @@ TransporterFacade::sendFragmentedSignal( tmp_signal.readSignalNumber() == GSN_API_REGREQ); } } - // setup variables for next signal - start_i= i; - this_chunk_sz= 0; assert(remaining_sec_sz >= send_sz); Uint32 remaining= remaining_sec_sz - send_sz; tmp_ptr[i].sz= remaining; @@ -1271,6 +1285,10 @@ TransporterFacade::sendFragmentedSignal( if (remaining == 0) /* This section's done, move onto the next */ i++; + + // setup variables for next signal + start_i= i; + this_chunk_sz= 0; } } === modified file 'storage/ndb/test/ndbapi/Makefile.am' --- storage/ndb/test/ndbapi/Makefile.am 2011-09-13 09:10:52 +0000 +++ storage/ndb/test/ndbapi/Makefile.am 2011-10-13 09:53:33 +0000 @@ -112,6 +112,8 @@ 
testMgmd_CXXFLAGS = -I$(top_srcdir)/stor testSingleUserMode_SOURCES = testSingleUserMode.cpp testNativeDefault_SOURCES = testNativeDefault.cpp testNdbApi_SOURCES = testNdbApi.cpp +testNdbApi_CXXFLAGS = -I$(top_srcdir)/storage/ndb/src/ndbapi \ + -I$(top_srcdir)/storage/ndb/include/transporter testNodeRestart_SOURCES = testNodeRestart.cpp testUpgrade_SOURCES = testUpgrade.cpp testUpgrade_LDADD = $(LDADD) $(top_srcdir)/libmysql_r/libmysqlclient_r.la === modified file 'storage/ndb/test/ndbapi/testNdbApi.cpp' --- storage/ndb/test/ndbapi/testNdbApi.cpp 2011-09-29 06:48:39 +0000 +++ storage/ndb/test/ndbapi/testNdbApi.cpp 2011-10-13 11:18:05 +0000 @@ -25,6 +25,8 @@ #include #include #include +#include +#include #define MAX_NDB_OBJECTS 32678 @@ -4972,6 +4974,635 @@ int runNdbClusterConnectionConnect(NDBT_ return NDBT_OK; } +/* Testing fragmented signal send/receive */ + +/* + SectionStore + + Abstraction of long section storage api. + Used by FragmentAssembler to assemble received long sections +*/ +class SectionStore +{ +public: + virtual ~SectionStore() {}; + virtual int appendToSection(Uint32 secId, LinearSectionPtr ptr) = 0; +}; + +/* + Basic Section Store + + Naive implementation using malloc. Real usage might use something better. 
+*/ +class BasicSectionStore : public SectionStore +{ +public: + BasicSectionStore() + { + init(); + }; + + ~BasicSectionStore() + { + freeStorage(); + }; + + void init() + { + ptrs[0].p = NULL; + ptrs[0].sz = 0; + + ptrs[2] = ptrs[1] = ptrs[0]; + } + + void freeStorage() + { + free(ptrs[0].p); + free(ptrs[1].p); + free(ptrs[2].p); + } + + virtual int appendToSection(Uint32 secId, LinearSectionPtr ptr) + { + /* Potentially expensive re-alloc + copy */ + assert(secId < 3); + + Uint32 existingSz = ptrs[secId].sz; + Uint32* existingBuff = ptrs[secId].p; + + Uint32 newSize = existingSz + ptr.sz; + Uint32* newBuff = (Uint32*) realloc(existingBuff, newSize * 4); + + if (!newBuff) + return -1; + + memcpy(newBuff + existingSz, ptr.p, ptr.sz * 4); + + ptrs[secId].p = newBuff; + ptrs[secId].sz = existingSz + ptr.sz; + + return 0; + } + + LinearSectionPtr ptrs[3]; +}; + + + +/* + FragmentAssembler + + Used to assemble sections from multiple fragment signals, and + produce a 'normal' signal. + + Requires a SectionStore implementation to accumulate the section + fragments + + Might be useful generic utility, or not. + + Usage : + FragmentAssembler fa(ss); + while (!fa.isComplete()) + { + sig = waitSignal(); + fa.handleSignal(sig, sections); + } + + fa.getSignalHeader(); + fa.getSignalBody(); + fa.getSectionStore(); .. 
+ +*/ +class FragmentAssembler +{ +public: + enum AssemblyError + { + NoError = 0, + FragmentSequence = 1, + FragmentSource = 2, + FragmentIdentity = 3, + SectionAppend = 4 + }; + + FragmentAssembler(SectionStore* _secStore): + secsReceived(0), + secStore(_secStore), + complete(false), + fragId(0), + sourceNode(0), + error(NoError) + {} + + int handleSignal(const SignalHeader* sigHead, + const Uint32* sigBody, + LinearSectionPtr* sections) + { + Uint32 sigLen = sigHead->theLength; + + if (fragId == 0) + { + switch (sigHead->m_fragmentInfo) + { + case 0: + { + /* Not fragmented, pass through */ + sh = *sigHead; + memcpy(signalBody, sigBody, sigLen * 4); + Uint32 numSecs = sigHead->m_noOfSections; + for (Uint32 i=0; i<numSecs; i++) + { + if (secStore->appendToSection(i, sections[i]) != 0) + { + error = SectionAppend; + return -1; + } + } + complete = true; + break; + } + case 1: + { + /* Start of fragmented signal */ + Uint32 incomingFragId; + Uint32 incomingSourceNode; + Uint32 numSecsInFragment; + + if (handleFragmentSections(sigHead, sigBody, sections, + &incomingFragId, &incomingSourceNode, + &numSecsInFragment) != 0) + return -1; + + assert(incomingFragId != 0); + fragId = incomingFragId; + sourceNode = incomingSourceNode; + assert(numSecsInFragment > 0); + + break; + } + default: + { + /* Error, out of sequence fragment */ + error = FragmentSequence; + return -1; + break; + } + } + } + else + { + /* FragId != 0 */ + switch (sigHead->m_fragmentInfo) + { + case 0: + case 1: + { + /* Error, out of sequence fragment */ + error = FragmentSequence; + return -1; + } + case 2: + /* Fall through */ + case 3: + { + /* Body fragment */ + Uint32 incomingFragId; + Uint32 incomingSourceNode; + Uint32 numSecsInFragment; + + if (handleFragmentSections(sigHead, sigBody, sections, + &incomingFragId, &incomingSourceNode, + &numSecsInFragment) != 0) + return -1; + + if (incomingSourceNode != sourceNode) + { + /* Error in source node */ + error = FragmentSource; + return -1; + } + if (incomingFragId != fragId) + { + 
error = FragmentIdentity; + return -1; + } + + if (sigHead->m_fragmentInfo == 3) + { + /* Final fragment, contains actual signal body */ + memcpy(signalBody, + sigBody, + sigLen * 4); + sh = *sigHead; + sh.theLength = sigLen - (numSecsInFragment + 1); + sh.m_noOfSections = + ((secsReceived & 4)? 1 : 0) + + ((secsReceived & 2)? 1 : 0) + + ((secsReceived & 1)? 1 : 0); + sh.m_fragmentInfo = 0; + + complete=true; + } + break; + } + default: + { + /* Bad fragmentinfo field */ + error = FragmentSequence; + return -1; + } + } + } + + return 0; + } + + int handleSignal(NdbApiSignal* signal, + LinearSectionPtr* sections) + { + return handleSignal(signal, signal->getDataPtr(), sections); + } + + bool isComplete() + { + return complete; + } + + /* Valid if isComplete() */ + SignalHeader getSignalHeader() + { + return sh; + } + + /* Valid if isComplete() */ + Uint32* getSignalBody() + { + return signalBody; + } + + /* Valid if isComplete() */ + Uint32 getSourceNode() + { + return sourceNode; + } + + SectionStore* getSectionStore() + { + return secStore; + } + + AssemblyError getError() const + { + return error; + } + +private: + int handleFragmentSections(const SignalHeader* sigHead, + const Uint32* sigBody, + LinearSectionPtr* sections, + Uint32* incomingFragId, + Uint32* incomingSourceNode, + Uint32* numSecsInFragment) + { + Uint32 sigLen = sigHead->theLength; + + *numSecsInFragment = sigHead->m_noOfSections; + assert(sigLen >= (1 + *numSecsInFragment)); + + *incomingFragId = sigBody[sigLen - 1]; + *incomingSourceNode = refToNode(sigHead->theSendersBlockRef); + const Uint32* secIds = &sigBody[sigLen - (*numSecsInFragment) - 1]; + + for (Uint32 i=0; i < *numSecsInFragment; i++) + { + secsReceived |= (1 << secIds[i]); /* one bit per section id, read back via & 4/2/1 on the final fragment */ + + if (secStore->appendToSection(secIds[i], sections[i]) != 0) + { + error = SectionAppend; + return -1; + } + } + + return 0; + } + + Uint32 secsReceived; + SectionStore* secStore; + bool complete; + Uint32 fragId; + Uint32 sourceNode; + SignalHeader sh; + 
Uint32 signalBody[NdbApiSignal::MaxSignalWords]; + AssemblyError error; +}; + +static const Uint32 MAX_SEND_BYTES=32768; /* Align with TransporterDefinitions.hpp */ +static const Uint32 MAX_SEND_WORDS=MAX_SEND_BYTES/4; +static const Uint32 SEGMENT_WORDS= 60; /* Align with SSPool etc */ +static const Uint32 SEGMENT_BYTES = SEGMENT_WORDS * 4; +//static const Uint32 MAX_SEGS_PER_SEND=64; /* 6.3 */ +static const Uint32 MAX_SEGS_PER_SEND = (MAX_SEND_BYTES / SEGMENT_BYTES) - 2; /* Align with TransporterFacade.cpp */ +static const Uint32 MAX_WORDS_PER_SEND = MAX_SEGS_PER_SEND * SEGMENT_WORDS; +static const Uint32 HALF_MAX_WORDS_PER_SEND = MAX_WORDS_PER_SEND / 2; +static const Uint32 THIRD_MAX_WORDS_PER_SEND = MAX_WORDS_PER_SEND / 3; +static const Uint32 MEDIUM_SIZE = 5000; + +/* Most problems occurred with sections lengths around the boundary + * of the max amount sent - MAX_WORDS_PER_SEND, so we define interesting + * sizes so that we test behavior around these boundaries + */ +static Uint32 interestingSizes[] = +{ + 0, + 1, + MEDIUM_SIZE, + THIRD_MAX_WORDS_PER_SEND -1, + THIRD_MAX_WORDS_PER_SEND, + THIRD_MAX_WORDS_PER_SEND +1, + HALF_MAX_WORDS_PER_SEND -1, + HALF_MAX_WORDS_PER_SEND, + HALF_MAX_WORDS_PER_SEND + 1, + MAX_WORDS_PER_SEND -1, + MAX_WORDS_PER_SEND, + MAX_WORDS_PER_SEND + 1, + (2* MAX_SEND_WORDS) + 1, + 1234 /* Random */ +}; + + +/* + FragSignalChecker + + Class for testing fragmented signal send + receive +*/ +class FragSignalChecker +{ +public: + + Uint32* buffer; + + FragSignalChecker() + { + buffer= NULL; + init(); + } + + ~FragSignalChecker() + { + free(buffer); + } + + void init() + { + buffer = (Uint32*) malloc(getBufferSize()); + + if (buffer) + { + /* Init to a known pattern */ + for (Uint32 i = 0; i < (getBufferSize()/4); i++) + { + buffer[i] = i; + } + } + } + + static Uint32 getNumInterestingSizes() + { + return sizeof(interestingSizes) / sizeof(Uint32); + } + + static Uint32 getNumIterationsRequired() + { + /* To get combinatorial coverage, need 
each of 3 + * sections with each of the interesting sizes + */ + Uint32 numSizes = getNumInterestingSizes(); + return numSizes * numSizes * numSizes; + } + + static Uint32 getSecSz(Uint32 secNum, Uint32 iter) + { + assert(secNum < 3); + Uint32 numSizes = getNumInterestingSizes(); + Uint32 divisor = (secNum == 0 ? 1 : + secNum == 1 ? numSizes : + numSizes * numSizes); + /* offset ensures only end sections are 0 length */ + Uint32 index = (iter / divisor) % numSizes; + if ((index == 0) && (iter >= (divisor * numSizes))) + index = 1; /* Avoid lower numbered section being empty */ + Uint32 value = interestingSizes[index]; + if(value == 1234) + { + value = 1 + (rand() % (2* MAX_WORDS_PER_SEND)); + } + return value; + } + + static Uint32 getBufferSize() + { + const Uint32 MaxSectionWords = (2 * MAX_SEND_WORDS) + 1; + const Uint32 MaxTotalSectionsWords = MaxSectionWords * 3; + return MaxTotalSectionsWords * 4; + } + + int sendRequest(SignalSender* ss, + Uint32* sizes) + { + /* + * We want to try out various interactions between the + * 3 sections and the length of the data sent + * - All fit in one 'chunk' + * - None fit in one 'chunk' + * - Each ends on a chunk boundary + * + * Max send size is ~ 32kB + * Segment size is 60 words / 240 bytes + * -> 136 segments / chunk + * -> 134 segments / chunk 'normally' sent + * -> 32160 bytes + */ + g_err << "Sending " + << sizes[0] + << " " << sizes[1] + << " " << sizes[2] + << endl; + + const Uint32 numSections = + (sizes[0] ? 1 : 0) + + (sizes[1] ? 1 : 0) + + (sizes[2] ? 
1 : 0); + const Uint32 testType = 40; + const Uint32 fragmentLength = 1; + const Uint32 print = 1; + const Uint32 len = 5 + numSections; + SimpleSignal request(false); + + Uint32* signalBody = request.getDataPtrSend(); + signalBody[0] = ss->getOwnRef(); + signalBody[1] = testType; + signalBody[2] = fragmentLength; + signalBody[3] = print; + signalBody[4] = 0; /* Return count */ + signalBody[5] = sizes[0]; + signalBody[6] = sizes[1]; + signalBody[7] = sizes[2]; + + + request.ptr[0].sz = sizes[0]; + request.ptr[0].p = &buffer[0]; + request.ptr[1].sz = sizes[1]; + request.ptr[1].p = &buffer[sizes[0]]; + request.ptr[2].sz = sizes[2]; + request.ptr[2].p = &buffer[sizes[0] + sizes[1]]; + + request.header.m_noOfSections= numSections; + + int rc = 0; + ss->lock(); + rc = ss->sendFragmentedSignal(ss->get_an_alive_node(), + request, + CMVMI, + GSN_TESTSIG, + len); + ss->unlock(); + + if (rc != 0) + { + g_err << "Error sending signal" << endl; + return rc; + } + + return 0; + } + + int waitResponse(SignalSender* ss, + Uint32* expectedSz) + { + /* Here we need to wait for all of the signals which + * comprise a fragmented send, and check that + * the data is as expected + */ + BasicSectionStore bss; + FragmentAssembler fa(&bss); + + while(true) + { + ss->lock(); + SimpleSignal* response = ss->waitFor(10000); + ss->unlock(); + + if (!response) + { + g_err << "Timed out waiting for response" << endl; + return -1; + } + + //response->print(); + + if (response->header.theVerId_signalNumber == GSN_TESTSIG) + { + if (fa.handleSignal(&response->header, + response->getDataPtr(), + response->ptr) != 0) + { + g_err << "Error assembling fragmented signal." 
+ << " Error is " + << (Uint32) fa.getError() + << endl; + return -1; + } + + if (fa.isComplete()) + { + Uint32 expectedWord = 0; + for (Uint32 i=0; i < 3; i++) + { + if (bss.ptrs[i].sz != expectedSz[i]) + { + g_err << "Wrong size for section : " + << i + << " expected " << expectedSz[i] + << " but received " << bss.ptrs[i].sz + << endl; + return -1; + } + + for (Uint32 d=0; d < expectedSz[i]; d++) + { + if (bss.ptrs[i].p[d] != expectedWord) + { + g_err << "Bad data in section " + << i + << " at word number " + << d + << ". Expected " + << expectedWord + << " but found " + << bss.ptrs[i].p[d] + << endl; + return -1; + } + expectedWord++; + } + } + + break; + } + + } + } + + return 0; + } + + int runTest(SignalSender* ss) + { + for (Uint32 iter=0; + iter < getNumIterationsRequired(); + iter++) + { + int rc; + Uint32 sizes[3]; + sizes[0] = getSecSz(0, iter); + sizes[1] = getSecSz(1, iter); + sizes[2] = getSecSz(2, iter); + + /* Build request, including sections */ + rc = sendRequest(ss, sizes); + if (rc != 0) + { + g_err << "Failed sending request on iteration " << iter + << " with rc " << rc << endl; + return NDBT_FAILED; + } + + /* Wait for response */ + rc = waitResponse(ss, sizes); + if (rc != 0) + { + g_err << "Failed waiting for response on iteration " << iter + << " with rc " << rc << endl; + return NDBT_FAILED; + } + } + + return NDBT_OK; + } +}; + + +int testFragmentedSend(NDBT_Context* ctx, NDBT_Step* step){ + Ndb* pNdb= GETNDB(step); + Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection(); + SignalSender ss(conn); + FragSignalChecker fsc; + + return fsc.runTest(&ss); +} + + NDBT_TESTSUITE(testNdbApi); TESTCASE("MaxNdb", @@ -5245,6 +5876,10 @@ TESTCASE("NdbClusterConnectSR", STEPS(runNdbClusterConnect, MAX_NODES); STEP(runRestarts); // Note after runNdbClusterConnect or else counting wrong } +TESTCASE("TestFragmentedSend", + "Test fragmented send behaviour"){ + INITIALIZER(testFragmentedSend); +} NDBT_TESTSUITE_END(testNdbApi); int main(int 
argc, const char** argv){ === modified file 'storage/ndb/test/run-test/daily-devel-tests.txt' --- storage/ndb/test/run-test/daily-devel-tests.txt 2011-10-05 13:18:31 +0000 +++ storage/ndb/test/run-test/daily-devel-tests.txt 2011-10-13 09:56:39 +0000 @@ -73,3 +73,8 @@ max-time: 1800 cmd: testDict args: -n SchemaTrans -l 1 +# Fragmented signal send +max-time: 1800 +cmd: testNdbApi +args: -n TestFragmentedSend T1 +