--- tmp50/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2005-04-19 13:07:21.000000000 +0200 +++ mysql-5.0/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2005-04-14 13:45:39.000000000 +0200 @@ -2033,9 +2033,7 @@ /* --------------------------------------------------------------------------------- */ Uint32 Dbacc::placeReadInLockQueue(Signal* signal) { - tgnptMainOpPtr = queOperPtr; - getNoParallelTransaction(signal); - if (tgnptNrTransaction == 1) { + if (getNoParallelTransaction(queOperPtr.p) == 1) { if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) { /* --------------------------------------------------------------------------------- */ @@ -2118,9 +2116,7 @@ checkOnlyReadEntry(signal); return; }//if - tgnptMainOpPtr = readWriteOpPtr; - getNoParallelTransaction(signal); - if (tgnptNrTransaction == 1) { + if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { jam(); /* --------------------------------------------------------------------------------- */ /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. 
IF THIS IS OUR */ @@ -2201,24 +2197,23 @@ /* --------------------------------------------------------------------------------- */ /* GET_NO_PARALLEL_TRANSACTION */ /* --------------------------------------------------------------------------------- */ -void Dbacc::getNoParallelTransaction(Signal* signal) +Uint32 +Dbacc::getNoParallelTransaction(const Operationrec * op) { - OperationrecPtr tnptOpPtr; - - tgnptNrTransaction = 1; - tnptOpPtr.i = tgnptMainOpPtr.p->nextParallelQue; - while ((tnptOpPtr.i != RNIL) && - (tgnptNrTransaction == 1)) { + OperationrecPtr tmp; + + tmp.i= op->nextParallelQue; + Uint32 transId[2] = { op->transId1, op->transId2 }; + while (tmp.i != RNIL) + { jam(); - ptrCheckGuard(tnptOpPtr, coprecsize, operationrec); - if ((tnptOpPtr.p->transId1 == tgnptMainOpPtr.p->transId1) && - (tnptOpPtr.p->transId2 == tgnptMainOpPtr.p->transId2)) { - tnptOpPtr.i = tnptOpPtr.p->nextParallelQue; - } else { - jam(); - tgnptNrTransaction++; - }//if - }//while + ptrCheckGuard(tmp, coprecsize, operationrec); + if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1]) + tmp.i = tmp.p->nextParallelQue; + else + return 2; + } + return 1; }//Dbacc::getNoParallelTransaction() void Dbacc::moveLastParallelQueue(Signal* signal) @@ -2259,9 +2254,7 @@ /* --------------------------------------------------------------------------------- */ Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) { - tgnptMainOpPtr = queOperPtr; - getNoParallelTransaction(signal); - if (!((tgnptNrTransaction == 1) && + if (!((getNoParallelTransaction(queOperPtr.p) == 1) && (queOperPtr.p->transId1 == operationRecPtr.p->transId1) && (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) { jam(); @@ -2312,9 +2305,7 @@ }//if readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - tgnptMainOpPtr = readWriteOpPtr; - getNoParallelTransaction(signal); - if (tgnptNrTransaction == 1) { + if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { 
/* --------------------------------------------------------------------------------- */ /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ @@ -4449,9 +4440,154 @@ ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; }//if - }//if + + /** + * Check possible lock upgrade + * 1) Find lock owner + * 2) Count transactions in parallel que + * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner) + * upgrade next serial + */ + if(operationRecPtr.p->lockMode) + { + jam(); + /** + * Committing a non shared operation can't lead to lock upgrade + */ + return; + } + + OperationrecPtr lock_owner; + lock_owner.i = operationRecPtr.p->prevParallelQue; + ptrCheckGuard(lock_owner, coprecsize, operationrec); + Uint32 transid[2] = { lock_owner.p->transId1, + lock_owner.p->transId2 }; + + + while(lock_owner.p->prevParallelQue != RNIL) + { + lock_owner.i = lock_owner.p->prevParallelQue; + ptrCheckGuard(lock_owner, coprecsize, operationrec); + + if(lock_owner.p->transId1 != transid[0] || + lock_owner.p->transId2 != transid[1]) + { + jam(); + /** + * If more than 1 trans in lock queue -> no lock upgrade + */ + return; + } + } + + check_lock_upgrade(signal, lock_owner, operationRecPtr); + } }//Dbacc::commitOperation() +void +Dbacc::check_lock_upgrade(Signal* signal, + OperationrecPtr lock_owner, + OperationrecPtr release_op) +{ + if((lock_owner.p->transId1 == release_op.p->transId1 && + lock_owner.p->transId2 == release_op.p->transId2) || + release_op.p->lockMode || + lock_owner.p->nextSerialQue == RNIL) + { + jam(); + /** + * No lock upgrade if same trans or lock owner has no serial queue + * or releasing non shared op + */ + return; + } + + OperationrecPtr next; + next.i = lock_owner.p->nextSerialQue; + ptrCheckGuard(next, coprecsize, operationrec); + + if(lock_owner.p->transId1 != next.p->transId1 || + lock_owner.p->transId2 
!= next.p->transId2) + { + jam(); + /** + * No lock upgrade if !same trans in serial queue + */ + return; + } + + if (getNoParallelTransaction(lock_owner.p) > 1) + { + jam(); + /** + * No lock upgrade if more than 1 transaction in parallel queue + */ + return; + } + + if (getNoParallelTransaction(next.p) > 1) + { + jam(); + /** + * No lock upgrade if more than 1 transaction in next's parallel queue + */ + return; + } + + OperationrecPtr tmp; + tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; + if(tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + ndbassert(tmp.p->prevSerialQue == next.i); + tmp.p->prevSerialQue = lock_owner.i; + } + next.p->nextSerialQue = next.p->prevSerialQue = RNIL; + + // Find end of parallel queue + tmp = lock_owner; + while(tmp.p->nextParallelQue != RNIL) + { + jam(); + tmp.i = tmp.p->nextParallelQue; + ptrCheckGuard(tmp, coprecsize, operationrec); + } + + next.p->prevParallelQue = tmp.i; + tmp.p->nextParallelQue = next.i; + + OperationrecPtr save = operationRecPtr; + Uint32 lockMode = lock_owner.p->lockMode; + + Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads + Uint32 ThashValue = lock_owner.p->hashValue; + Uint32 localdata[2]; + localdata[0] = lock_owner.p->localdata[0]; + localdata[1] = lock_owner.p->localdata[1]; + do { + next.p->elementIsDisappeared = TelementIsDisappeared; + next.p->hashValue = ThashValue; + next.p->localdata[0] = localdata[0]; + next.p->localdata[1] = localdata[1]; + + operationRecPtr = next; + next.p->lockMode = lockMode; + TelementIsDisappeared = executeNextOperation(signal); + if (next.p->nextParallelQue != RNIL) + { + jam(); + next.i = next.p->nextParallelQue; + ptrCheckGuard(next, coprecsize, operationrec); + } else { + jam(); + break; + }//if + } while (1); + + operationRecPtr = save; + +} + /* ------------------------------------------------------------------------- */ /* RELEASELOCK */ /* RESETS LOCK OF AN ELEMENT. 
*/ @@ -4488,6 +4624,8 @@ ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i; }//if + + check_lock_upgrade(signal, copyInOperPtr, operationRecPtr); /* --------------------------------------------------------------------------------- */ /* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */ /* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */ --- tmp50/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2005-04-19 13:07:21.000000000 +0200 +++ mysql-5.0/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2005-04-14 13:32:41.000000000 +0200 @@ -984,7 +984,7 @@ Uint32 placeReadInLockQueue(Signal* signal); void placeSerialQueueRead(Signal* signal); void checkOnlyReadEntry(Signal* signal); - void getNoParallelTransaction(Signal* signal); + Uint32 getNoParallelTransaction(const Operationrec*); void moveLastParallelQueue(Signal* signal); void moveLastParallelQueueWrite(Signal* signal); Uint32 placeWriteInLockQueue(Signal* signal); @@ -1045,6 +1045,8 @@ Uint32 executeNextOperation(Signal* signal); void releaselock(Signal* signal); void takeOutFragWaitQue(Signal* signal); + void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, + OperationrecPtr release_op); void allocOverflowPage(Signal* signal); bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId); void insertLockOwnersList(Signal* signal, const OperationrecPtr&); @@ -1206,7 +1208,6 @@ OperationrecPtr mlpqOperPtr; OperationrecPtr queOperPtr; OperationrecPtr readWriteOpPtr; - OperationrecPtr tgnptMainOpPtr; Uint32 cfreeopRec; Uint32 coprecsize; /* --------------------------------------------------------------------------------- */ @@ -1412,7 +1413,6 @@ Uint32 turlIndex; Uint32 tlfrTmp1; Uint32 tlfrTmp2; - Uint32 tgnptNrTransaction; Uint32 tscanTrid1; Uint32 tscanTrid2;