=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-11-11 11:40:42 +0000
+++ storage/ndb/src/ndbapi/NdbOperationExec.cpp	2009-03-13 16:19:00 +0000
@@ -623,6 +623,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
   const char *key_row= m_key_row;
   const NdbRecord *attr_rec= m_attribute_record;
   const char *updRow;
+  const bool isScanTakeover= (key_rec == NULL);
 
   TcKeyReq *tcKeyReq= CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
   Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId);
@@ -630,7 +631,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
   remain= TcKeyReq::MaxKeyInfo;
 
   /* Fill in keyinfo (in TCKEYREQ signal, spilling into KEYINFO signals). */
-  if (!key_rec)
+  if (isScanTakeover)
   {
     /* This means that key_row contains the KEYINFO20 data. */
     /* i.e. lock takeover */
@@ -869,18 +870,72 @@ NdbOperation::buildSignalsNdbRecord(Uint
 
     if (likely(!(col->flags & (NdbRecord::IsBlob|NdbRecord::IsMysqldBitfield))))
     {
-      if (col->is_null(updRow))
-        length= 0;
-      else if (!col->get_var_length(updRow, length))
+      if (( ! (col->flags & NdbRecord::IsKey)) ||
+          ( isScanTakeover ) )
+      {
+        /* Normal path where we get data from the attr row
+         * Always get ATTRINFO data from the attr row for ScanTakeover
+         * Update as there's no key row
+         * This allows scan-takeover update to update pk within
+         * collation rules
+         */
+        if (col->is_null(updRow))
+          length= 0;
+        else if (!col->get_var_length(updRow, length))
+        {
+          /* Hm, corrupt varchar length. */
+          setErrorCodeAbort(4209);
+          return -1;
+        }
+        data= &updRow[col->offset];
+      }
+      else
       {
-        /* Hm, corrupt varchar length. */
-        setErrorCodeAbort(4209);
-        return -1;
+        /* For Insert/Write where user provides PK columns,
+         * take them from the key record row to avoid sending different
+         * values in KeyInfo and AttrInfo
+         * Need the correct Attr struct from the key
+         * record
+         */
+        assert(key_rec != 0); /* Not scan takeover */
+        assert(key_rec->m_attrId_indexes_length > attrId);
+        int keyColIdx= key_rec->m_attrId_indexes[attrId];
+        assert(keyColIdx != -1);
+        col= &key_rec->columns[keyColIdx];
+        assert(col->attrId == attrId);
+        assert(col->flags & NdbRecord::IsKey);
+
+        /* Now get the data and length from the key row
+         * Any issues with key nullness should've been
+         * caught above
+         */
+        assert(!col->is_null(key_row));
+        length= 0;
+
+        bool len_ok;
+
+        if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
+        {
+          /* Used to support special varchar format for mysqld keys.
+           * Ideally we'd avoid doing this shrink twice...
+           */
+          len_ok= col->shrink_varchar(key_row, length, buf);
+          data= buf;
+        }
+        else
+        {
+          len_ok= col->get_var_length(key_row, length);
+          data= &key_row[col->offset];
+        }
+
+        /* Should have 'seen' any length issues when generating keyinfo above */
+        assert(len_ok);
      }
-      data= &updRow[col->offset];
     }
     else
     {
+      /* Blob or MySQLD bitfield handling */
+      assert(! (col->flags & NdbRecord::IsKey));
       if (likely(col->flags & NdbRecord::IsMysqldBitfield))
       {
         /* Mysqld format bitfield. */
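The following sketch (not part of the patch) illustrates the ambiguity the NdbOperationExec.cpp change resolves: an NdbRecord insert can be given separate key and attribute rows, and before the fix a primary key column could be sent with one value in KeyInfo (from the key row) and another in AttrInfo (from the attribute row). The table name "MYTAB", the Unsigned PK at attrId 0 and the data column at attrId 1 are assumptions made for the example; error handling is reduced to early returns.

#include <NdbApi.hpp>
#include <string.h>

int pk_ambiguity_example(Ndb* ndb)
{
  const NdbDictionary::Table* tab= ndb->getDictionary()->getTable("MYTAB");
  if (tab == NULL) return -1;
  const NdbRecord* rec= tab->getDefaultRecord();
  const Uint32 rowLen= NdbDictionary::getRecordRowLength(rec);

  char* keyRow= new char[rowLen];
  char* attrRow= new char[rowLen];
  memset(keyRow, 0, rowLen);
  memset(attrRow, 0, rowLen);

  /* The PK column (attrId 0) is given *different* values in the two rows */
  *(Uint32*)NdbDictionary::getValuePtr(rec, keyRow, 0)= 1;
  *(Uint32*)NdbDictionary::getValuePtr(rec, attrRow, 0)= 2;  /* ignored after the fix */
  *(Uint32*)NdbDictionary::getValuePtr(rec, attrRow, 1)= 42; /* ordinary data column */

  NdbTransaction* trans= ndb->startTransaction();
  if (trans == NULL) { delete[] keyRow; delete[] attrRow; return -1; }

  /* With the change above, the PK value carried in AttrInfo is taken from
   * keyRow, so KeyInfo and AttrInfo agree and the stored PK is 1, not 2. */
  const NdbOperation* op= trans->insertTuple(rec, keyRow, rec, attrRow);
  int ret= (op != NULL) ? trans->execute(NdbTransaction::Commit) : -1;

  trans->close();
  delete[] keyRow;
  delete[] attrRow;
  return ret;
}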
=== modified file 'storage/ndb/test/ndbapi/testNdbApi.cpp'
--- storage/ndb/test/ndbapi/testNdbApi.cpp	2008-11-08 21:06:51 +0000
+++ storage/ndb/test/ndbapi/testNdbApi.cpp	2009-03-13 16:00:04 +0000
@@ -22,6 +22,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #define MAX_NDB_OBJECTS 32678
@@ -1982,6 +1983,598 @@ simpleReadAbortOnError(NDBT_Context* ctx
 }
 
+
+int
+testNdbRecordPkAmbiguity(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* NdbRecord Insert and Write can take 2 record and row ptrs
+   * In all cases, the AttrInfo sent to TC for PK columns
+   * should be the same as the KeyInfo sent to TC to avoid
+   * inconsistency
+   * Approach :
+   *   1) Use Insert/Write to insert tuple with different
+   *      values for pks in attr row
+   *   2) Read back all data, including PKs
+   *   3) Verify all values.
+   */
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab= ctx->getTab();
+  const NdbRecord* tabRec= pTab->getDefaultRecord();
+  const Uint32 sizeOfTabRec= NdbDictionary::getRecordRowLength(tabRec);
+  char keyRowBuf[ NDB_MAX_TUPLE_SIZE_IN_WORDS << 2 ];
+  char attrRowBuf[ NDB_MAX_TUPLE_SIZE_IN_WORDS << 2 ];
+
+  HugoCalculator calc(*pTab);
+
+  const int numRecords= 100;
+
+  for (int optype=0; optype < 2; optype++)
+  {
+    /* First, let's calculate the correct Hugo values for this row */
+
+    for (int record=0; record < numRecords; record++)
+    {
+      int updates= 0;
+      for (int col=0; col < pTab->getNoOfColumns(); col++)
+      {
+        char* valPtr= NdbDictionary::getValuePtr(tabRec,
+                                                 keyRowBuf,
+                                                 col);
+        CHECK(valPtr != NULL);
+
+        int len= pTab->getColumn(col)->getSizeInBytes();
+        Uint32 real_len;
+        bool isNull= (calc.calcValue(record, col, updates, valPtr,
+                                     len, &real_len) == NULL);
+        if (pTab->getColumn(col)->getNullable())
+        {
+          NdbDictionary::setNull(tabRec,
+                                 keyRowBuf,
+                                 col,
+                                 isNull);
+        }
+      }
+
+      /* Now copy the values to the Attr record */
+      memcpy(attrRowBuf, keyRowBuf, sizeOfTabRec);
+
+      Uint32 mippleAttempts= 3;
+
+      while (memcmp(keyRowBuf, attrRowBuf, sizeOfTabRec) == 0)
+      {
+        /* Now doctor the PK values in the Attr record */
+        for (int col=0; col < pTab->getNoOfColumns(); col++)
+        {
+          if (pTab->getColumn(col)->getPrimaryKey())
+          {
+            char* valPtr= NdbDictionary::getValuePtr(tabRec,
+                                                     attrRowBuf,
+                                                     col);
+            CHECK(valPtr != NULL);
+
+            int len= pTab->getColumn(col)->getSizeInBytes();
+            Uint32 real_len;
+            /* We use the PK value for some other record */
+            int badRecord= record + (rand() % 1000);
+            bool isNull= (calc.calcValue(badRecord, col, updates, valPtr,
+                                         len, &real_len) == NULL);
+            CHECK(! isNull);
+          }
+        }
+
+        /* Can try to get variance only a limited number of times */
+        CHECK(mippleAttempts-- != 0);
+      }
+
+      /* Ok, now have key and attr records with different values for
+       * PK cols, let's try to insert
+       */
+      NdbTransaction* trans=pNdb->startTransaction();
+      CHECK(trans != 0);
+
+      const NdbOperation* op= NULL;
+      if (optype == 0)
+      {
+        // ndbout << "Using insertTuple" << endl;
+        op= trans->insertTuple(tabRec,
+                               keyRowBuf,
+                               tabRec,
+                               attrRowBuf);
+      }
+      else
+      {
+        // ndbout << "Using writeTuple" << endl;
+        op= trans->writeTuple(tabRec,
+                              keyRowBuf,
+                              tabRec,
+                              attrRowBuf);
+      }
+      CHECK(op != 0);
+
+      CHECK(trans->execute(Commit) == 0);
+      trans->close();
+
+      /* Now read back */
+      memset(attrRowBuf, 0, sizeOfTabRec);
+
+      Uint32 pkVal= 0;
+      pkVal= *(Uint32*) NdbDictionary::getValuePtr(tabRec,
+                                                   keyRowBuf,
+                                                   0);
+
+      trans= pNdb->startTransaction();
+      op= trans->readTuple(tabRec,
+                           keyRowBuf,
+                           tabRec,
+                           attrRowBuf);
+      CHECK(op != 0);
+      CHECK(trans->execute(Commit) == 0);
+      CHECK(trans->getNdbError().code == 0);
+      trans->close();
+
+      /* Verify the values read back */
+      for (int col=0; col < pTab->getNoOfColumns(); col++)
+      {
+        const char* valPtr= NdbDictionary::getValuePtr(tabRec,
+                                                       attrRowBuf,
+                                                       col);
+        CHECK(valPtr != NULL);
+
+        char calcBuff[ NDB_MAX_TUPLE_SIZE_IN_WORDS << 2 ];
+        int len= pTab->getColumn(col)->getSizeInBytes();
+        Uint32 real_len;
+        bool isNull= (calc.calcValue(record, col, updates, calcBuff,
+                                     len, &real_len) == NULL);
+        bool colIsNullable= pTab->getColumn(col)->getNullable();
+        if (isNull)
+        {
+          CHECK(colIsNullable);
+          if (!NdbDictionary::isNull(tabRec,
+                                     attrRowBuf,
+                                     col))
+          {
+            ndbout << "Error, col " << col
+                   << " (pk=" << pTab->getColumn(col)->getPrimaryKey()
+                   << ") should be Null, but is not" << endl;
+            return NDBT_FAILED;
+          }
+        }
+        else
+        {
+          if (colIsNullable)
+          {
+            if (NdbDictionary::isNull(tabRec,
+                                      attrRowBuf,
+                                      col))
+            {
+              ndbout << "Error, col " << col
+                     << " (pk=" << pTab->getColumn(col)->getPrimaryKey()
+                     << ") should be non-Null but is null" << endl;
+              return NDBT_FAILED;
+            };
+          }
+
+          /* Compare actual data read back */
+          if( memcmp(calcBuff, valPtr, real_len) != 0 )
+          {
+            ndbout << "Error, col " << col
+                   << " (pk=" << pTab->getColumn(col)->getPrimaryKey()
+                   << ") should be equal, but isn't for record "
+                   << record << endl;
+            ndbout << "Expected :";
+            for (Uint32 i=0; i < real_len; i++)
+            {
+              ndbout_c("%x ", calcBuff[i]);
+            }
+            ndbout << endl << "Received :";
+            for (Uint32 i=0; i < real_len; i++)
+            {
+              ndbout_c("%x ", valPtr[i]);
+            }
+            ndbout << endl;
+
+            return NDBT_FAILED;
+          }
+        }
+      }
+
+      /* Now delete the tuple */
+      trans= pNdb->startTransaction();
+      op= trans->deleteTuple(tabRec,
+                             keyRowBuf,
+                             tabRec);
+      CHECK(op != 0);
+      CHECK(trans->execute(Commit) == 0);
+
+      trans->close();
+    }
+  }
+
+  return NDBT_OK;
+
+}
+
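The test above populates its key and attribute rows through NdbDictionary's NdbRecord accessors. A compact sketch of that per-column pattern is shown below, not part of the patch; the attrId, value and nullability arguments are caller-supplied assumptions, and the column is assumed to hold a Uint32.

#include <NdbApi.hpp>
#include <string.h>

void fill_uint_column(const NdbRecord* rec, char* rowBuf,
                      Uint32 attrId, Uint32 value, bool isNull, bool nullable)
{
  /* getValuePtr() locates the column's storage inside the row buffer */
  char* valPtr= NdbDictionary::getValuePtr(rec, rowBuf, attrId);
  if (valPtr != NULL)
    memcpy(valPtr, &value, sizeof(value));

  /* The null indicator lives in the row too; only nullable columns have one */
  if (nullable)
    NdbDictionary::setNull(rec, rowBuf, attrId, isNull);
}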
+int
+testNdbRecordPKUpdate(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* In general, we should be able to update primary key
+   * values.  We cannot *change* them, but for cases where
+   * a collation maps several discrete values to a single
+   * normalised value, it should be possible to modify
+   * the discrete value of the key, as the normalised
+   * key value is unchanged.
+   * Rather than testing with such a collation here, we
+   * cop out and test for errors with a 'null' change.
+   */
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab= ctx->getTab();
+  const NdbRecord* tabRec= pTab->getDefaultRecord();
+  char rowBuf[ NDB_MAX_TUPLE_SIZE_IN_WORDS << 2 ];
+  char badKeyRowBuf[ NDB_MAX_TUPLE_SIZE_IN_WORDS << 2 ];
+
+  HugoCalculator calc(*pTab);
+
+  const int numRecords= 100;
+
+  /* First, let's calculate the correct Hugo values for this row */
+  for (int record=0; record < numRecords; record++)
+  {
+    int updates= 0;
+    for (int col=0; col < pTab->getNoOfColumns(); col++)
+    {
+      char* valPtr= NdbDictionary::getValuePtr(tabRec,
+                                               rowBuf,
+                                               col);
+      CHECK(valPtr != NULL);
+
+      int len= pTab->getColumn(col)->getSizeInBytes();
+      Uint32 real_len;
+      bool isNull= (calc.calcValue(record, col, updates, valPtr,
+                                   len, &real_len) == NULL);
+      if (pTab->getColumn(col)->getNullable())
+      {
+        NdbDictionary::setNull(tabRec,
+                               rowBuf,
+                               col,
+                               isNull);
+      }
+    }
+
+    /* Create similar row, but with different id col (different
+     * PK from p.o.v. of PK column update
+     */
+    memcpy(badKeyRowBuf, rowBuf, NDB_MAX_TUPLE_SIZE_IN_WORDS << 2);
+    for (int col=0; col < pTab->getNoOfColumns(); col++)
+    {
+      if (calc.isIdCol(col))
+      {
+        char* valPtr= NdbDictionary::getValuePtr(tabRec,
+                                                 badKeyRowBuf,
+                                                 col);
+        Uint32 badId= record+333;
+        memcpy(valPtr, &badId, sizeof(badId));
+      }
+    }
+
+    NdbTransaction* trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+
+    const NdbOperation* op= trans->insertTuple(tabRec,
+                                               rowBuf);
+    CHECK(op != 0);
+
+    CHECK(trans->execute(Commit) == 0);
+    trans->close();
+
+    /* Now update the PK columns */
+    trans= pNdb->startTransaction();
+    op= trans->updateTuple(tabRec,
+                           rowBuf,
+                           tabRec,
+                           rowBuf);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    CHECK(trans->getNdbError().code == 0);
+    trans->close();
+
+    /* Now update PK with scan takeover op */
+    trans= pNdb->startTransaction();
+
+    NdbScanOperation* scanOp=trans->scanTable(tabRec,
+                                              NdbOperation::LM_Exclusive);
+    CHECK(scanOp != 0);
+
+    CHECK(trans->execute(NoCommit) == 0);
+
+    /* Now update PK with lock takeover op */
+    const char* rowPtr;
+    CHECK(scanOp->nextResult(&rowPtr, true, true) == 0);
+
+    op= scanOp->updateCurrentTuple(trans,
+                                   tabRec,
+                                   rowBuf);
+    CHECK(op != NULL);
+
+    CHECK(trans->execute(Commit) == 0);
+
+    trans->close();
+
+    /* Now attempt bad PK update with lock takeover op
+     * This is interesting as NDBAPI normally takes the
+     * value of PK columns in an update from the key
+     * row - so it's not possible to pass a 'different'
+     * value (except when collations are used).
+     * Scan Takeover update takes the PK values from the
+     * attribute record and so different values can
+     * be supplied.
+     * Here we check that different values result in the
+     * kernel complaining.
+     */
+    trans= pNdb->startTransaction();
+
+    scanOp=trans->scanTable(tabRec,
+                            NdbOperation::LM_Exclusive);
+    CHECK(scanOp != 0);
+
+    CHECK(trans->execute(NoCommit) == 0);
+
+    /* Now update PK with lock takeover op */
+    CHECK(scanOp->nextResult(&rowPtr, true, true) == 0);
+
+    op= scanOp->updateCurrentTuple(trans,
+                                   tabRec,
+                                   badKeyRowBuf);
+    CHECK(op != NULL);
+
+    CHECK(trans->execute(Commit) == -1);
+    CHECK(trans->getNdbError().code == 897);
+
+    trans->close();
+
+    /* Now delete the tuple */
+    trans= pNdb->startTransaction();
+    op= trans->deleteTuple(tabRec,
+                           rowBuf,
+                           tabRec);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+
+    trans->close();
+  }
+
+  return NDBT_OK;
+
+}
+
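A minimal sketch of the scan-takeover update path exercised above, not part of the patch. It assumes 'rec' is the table's default NdbRecord and 'newRow' is a fully populated row buffer, and reduces error handling to early returns. On this path the operation has no key record, so (after the NdbOperationExec.cpp change) AttrInfo, including PK columns, is built from the attribute row.

#include <NdbApi.hpp>

int scan_takeover_update_one(Ndb* ndb, const NdbRecord* rec, const char* newRow)
{
  NdbTransaction* trans= ndb->startTransaction();
  if (trans == NULL) return -1;

  /* Exclusive lock so the scanned row can be updated by takeover */
  NdbScanOperation* scanOp= trans->scanTable(rec, NdbOperation::LM_Exclusive);
  if (scanOp == NULL || trans->execute(NdbTransaction::NoCommit) != 0)
  { trans->close(); return -1; }

  const char* rowPtr;
  if (scanOp->nextResult(&rowPtr, true, true) != 0)
  { trans->close(); return -1; }

  /* Takeover update: all values (and, under a case-insensitive collation,
   * even a re-cased PK) come from newRow. */
  if (scanOp->updateCurrentTuple(trans, rec, newRow) == NULL)
  { trans->close(); return -1; }

  int ret= trans->execute(NdbTransaction::Commit);
  trans->close();
  return ret;
}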
+static
+BaseString getKeyVal(int record, bool upper)
+{
+  /* Create VARCHAR format key with upper or
+   * lower case leading char
+   */
+  BaseString keyData;
+  char c= 'a' + (record % ('z' - 'a'));
+
+  keyData.appfmt("%cblahblah%d", c, record);
+
+  if (upper)
+    keyData.ndb_toupper();
+
+  BaseString varCharKey;
+  varCharKey.appfmt("%c%s", keyData.length(), keyData.c_str());
+
+  return varCharKey;
+}
+
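getKeyVal() builds the row-format value for a short Varchar column: a one-byte length prefix followed by the characters, which is what its appfmt("%c%s", ...) call produces. The sketch below (not part of the patch) shows the same encoding written directly into an NdbRecord row buffer; it assumes attrId refers to a Varchar column of declared length <= 255.

#include <NdbApi.hpp>
#include <string.h>

void set_varchar_value(const NdbRecord* rec, char* rowBuf,
                       Uint32 attrId, const char* str)
{
  char* valPtr= NdbDictionary::getValuePtr(rec, rowBuf, attrId);
  size_t len= strlen(str);            /* caller must keep this <= 255 */
  valPtr[0]= (char)len;               /* 1-byte length prefix */
  memcpy(valPtr + 1, str, len);       /* then the characters, no terminator */
}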
+int
+testNdbRecordCICharPKUpdate(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* Test a change to a CHAR primary key with a case insensitive
+   * collation.
+   */
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab= ctx->getTab();
+
+  /* Run as a 'T1' testcase - do nothing for other tables */
+  if (strcmp(pTab->getName(), "T1") != 0)
+    return NDBT_OK;
+
+  CHARSET_INFO* charset= NULL;
+  const char* csname="latin1_general_ci";
+  charset= get_charset_by_name(csname, MYF(0));
+
+  if (charset == NULL)
+  {
+    ndbout << "Couldn't get charset " << csname << endl;
+    return NDBT_FAILED;
+  }
+
+  /* Create table with required schema */
+  NdbDictionary::Table tab;
+  tab.setName("TAB_CICHARPKUPD");
+
+  NdbDictionary::Column pk;
+  pk.setName("PK");
+  pk.setType(NdbDictionary::Column::Varchar);
+  pk.setLength(20);
+  pk.setNullable(false);
+  pk.setPrimaryKey(true);
+  pk.setCharset(charset);
+  tab.addColumn(pk);
+
+  NdbDictionary::Column data;
+  data.setName("DATA");
+  data.setType(NdbDictionary::Column::Unsigned);
+  data.setNullable(false);
+  data.setPrimaryKey(false);
+  tab.addColumn(data);
+
+  pNdb->getDictionary()->dropTable(tab.getName());
+  if(pNdb->getDictionary()->createTable(tab) != 0)
+  {
+    ndbout << "Create table failed with error : "
+           << pNdb->getDictionary()->getNdbError().code
+           << pNdb->getDictionary()->getNdbError().message
+           << endl;
+    return NDBT_FAILED;
+  }
+
+  ndbout << (NDBT_Table&)tab << endl;
+
+  pTab= pNdb->getDictionary()->getTable(tab.getName());
+
+  const NdbRecord* tabRec= pTab->getDefaultRecord();
+  const Uint32 rowLen= NDB_MAX_TUPLE_SIZE_IN_WORDS << 2;
+  char ucRowBuf[ rowLen ];
+  char lcRowBuf[ rowLen ];
+  char readBuf[ rowLen ];
+  char* ucPkPtr= NdbDictionary::getValuePtr(tabRec,
+                                            ucRowBuf,
+                                            0);
+  Uint32* ucDataPtr= (Uint32*) NdbDictionary::getValuePtr(tabRec,
+                                                          ucRowBuf,
+                                                          1);
+  char* lcPkPtr= NdbDictionary::getValuePtr(tabRec,
+                                            lcRowBuf,
+                                            0);
+  Uint32* lcDataPtr= (Uint32*) NdbDictionary::getValuePtr(tabRec,
+                                                          lcRowBuf,
+                                                          1);
+
+  char* readPkPtr= NdbDictionary::getValuePtr(tabRec,
+                                              readBuf,
+                                              0);
+  Uint32* readDataPtr= (Uint32*) NdbDictionary::getValuePtr(tabRec,
+                                                            readBuf,
+                                                            1);
+
+
+  const int numRecords= 100;
+  BaseString upperKey;
+  BaseString lowerKey;
+
+  for (int record=0; record < numRecords; record++)
+  {
+    upperKey.assign(getKeyVal(record, true).c_str());
+    lowerKey.assign(getKeyVal(record, false).c_str());
+
+    memcpy(ucPkPtr, upperKey.c_str(), upperKey.length());
+    memcpy(lcPkPtr, lowerKey.c_str(), lowerKey.length());
+    memcpy(ucDataPtr, &record, sizeof(record));
+    memcpy(lcDataPtr, &record, sizeof(record));
+
+    /* Insert with upper case */
+    NdbTransaction* trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+
+    const NdbOperation* op= trans->insertTuple(tabRec,
+                                               ucRowBuf);
+    CHECK(op != 0);
+
+    int rc= trans->execute(Commit);
+    if (rc != 0)
+      ndbout << "Error " << trans->getNdbError().message << endl;
+    CHECK(rc == 0);
+    trans->close();
+
+    /* Read with upper case */
+    trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+    op= trans->readTuple(tabRec,
+                         ucRowBuf,
+                         tabRec,
+                         readBuf);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    trans->close();
+
+    /* Check key and data read */
+    CHECK(memcmp(ucPkPtr, readPkPtr, ucPkPtr[0]) == 0);
+    CHECK(memcmp(ucDataPtr, readDataPtr, sizeof(int)) == 0);
+
+    memset(readBuf, 0, NDB_MAX_TUPLE_SIZE_IN_WORDS << 2);
+
+    /* Read with lower case */
+    trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+    op= trans->readTuple(tabRec,
+                         lcRowBuf,
+                         tabRec,
+                         readBuf);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    trans->close();
+
+    /* Check key and data read */
+    CHECK(memcmp(ucPkPtr, readPkPtr, ucPkPtr[0]) == 0);
+    CHECK(memcmp(ucDataPtr, readDataPtr, sizeof(int)) == 0);
+
+    memset(readBuf, 0, NDB_MAX_TUPLE_SIZE_IN_WORDS << 2);
+
+    /* Now update just the PK column to lower case */
+    trans= pNdb->startTransaction();
+    unsigned char mask[1];
+    mask[0]= 1;
+    op= trans->updateTuple(tabRec,
+                           lcRowBuf,
+                           tabRec,
+                           lcRowBuf,
+                           mask);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    CHECK(trans->getNdbError().code == 0);
+    trans->close();
+
+    /* Now check that we can read with the upper case key */
+    memset(readBuf, 0, NDB_MAX_TUPLE_SIZE_IN_WORDS << 2);
+
+    trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+    op= trans->readTuple(tabRec,
+                         ucRowBuf,
+                         tabRec,
+                         readBuf);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    trans->close();
+
+    /* Check key and data read */
+    CHECK(memcmp(lcPkPtr, readPkPtr, lcPkPtr[0]) == 0);
+    CHECK(memcmp(lcDataPtr, readDataPtr, sizeof(int)) == 0);
+
+    /* Now check that we can read with the lower case key */
+    memset(readBuf, 0, NDB_MAX_TUPLE_SIZE_IN_WORDS << 2);
+
+    trans=pNdb->startTransaction();
+    CHECK(trans != 0);
+    op= trans->readTuple(tabRec,
+                         lcRowBuf,
+                         tabRec,
+                         readBuf);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+    trans->close();
+
+    /* Check key and data read */
+    CHECK(memcmp(lcPkPtr, readPkPtr, lcPkPtr[0]) == 0);
+    CHECK(memcmp(lcDataPtr, readDataPtr, sizeof(int)) == 0);
+
+
+    /* Now delete the tuple */
+    trans= pNdb->startTransaction();
+    op= trans->deleteTuple(tabRec,
+                           ucRowBuf,
+                           tabRec);
+    CHECK(op != 0);
+    CHECK(trans->execute(Commit) == 0);
+
+    trans->close();
+  }
+
+  return NDBT_OK;
+
+}
+
+
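The CI test above rewrites only the PK column by passing a column mask to updateTuple(). A sketch of that masked-update pattern (not part of the patch) is shown here; 'rec' and 'row' are assumed to be an already prepared NdbRecord and row buffer, and the mask is a bitmap over attribute ids where a set bit n means "include column n in the update".

#include <NdbApi.hpp>

int update_pk_column_only(Ndb* ndb, const NdbRecord* rec, const char* row)
{
  unsigned char mask[1];
  mask[0]= 1;                       /* only attrId 0 is written */

  NdbTransaction* trans= ndb->startTransaction();
  if (trans == NULL) return -1;

  /* The key is taken from 'row' via the key record; AttrInfo is limited
   * to the masked columns of the attribute record. */
  const NdbOperation* op= trans->updateTuple(rec, row, rec, row, mask);
  int ret= (op != NULL) ? trans->execute(NdbTransaction::Commit) : -1;
  trans->close();
  return ret;
}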
 NDBT_TESTSUITE(testNdbApi);
 TESTCASE("MaxNdb",
          "Create Ndb objects until no more can be created\n"){
@@ -2102,6 +2695,18 @@ TESTCASE("SimpleReadAbortOnError",
          "Test behaviour of Simple reads with Abort On Error"){
   INITIALIZER(simpleReadAbortOnError);
 }
+TESTCASE("NdbRecordPKAmbiguity",
+         "Test behaviour of NdbRecord insert with ambig. pk values"){
+  INITIALIZER(testNdbRecordPkAmbiguity);
+}
+TESTCASE("NdbRecordPKUpdate",
+         "Verify that primary key columns can be updated"){
+  INITIALIZER(testNdbRecordPKUpdate);
+}
+TESTCASE("NdbRecordCICharPKUpdate",
+         "Verify that a case-insensitive char pk column can be updated"){
+  INITIALIZER(testNdbRecordCICharPKUpdate);
+}
 NDBT_TESTSUITE_END(testNdbApi);
 
 int main(int argc, const char** argv){
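For reference, the collation property NdbRecordCICharPKUpdate depends on can be checked in isolation: under latin1_general_ci the upper- and lower-case forms of a key compare equal, so they normalise to the same stored PK value. The sketch below is not part of the patch; it assumes the mysys/charset headers (my_sys.h, m_ctype.h) are available, as they must be for the get_charset_by_name()/MYF() calls used in the test, and my_strcasecmp is the standard charset-aware comparison from m_ctype.h.

#include <my_global.h>
#include <my_sys.h>
#include <m_ctype.h>

bool ci_keys_collide()
{
  CHARSET_INFO* cs= get_charset_by_name("latin1_general_ci", MYF(0));
  if (cs == NULL)
    return false;
  /* 0 means "equal under this collation" */
  return my_strcasecmp(cs, "Ablahblah0", "ABLAHBLAH0") == 0;
}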
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- storage/ndb/test/run-test/daily-basic-tests.txt	2009-03-02 09:39:57 +0000
+++ storage/ndb/test/run-test/daily-basic-tests.txt	2009-03-13 16:17:30 +0000
@@ -777,6 +777,18 @@
 cmd: testNdbApi
 args: -n SimpleReadAbortOnError T1 T6 T15
 
 max-time: 500
+cmd: testNdbApi
+args: -n NdbRecordPKAmbiguity T1 T6 T15
+
+max-time: 500
+cmd: testNdbApi
+args: -n NdbRecordPKUpdate T1 T6 T15
+
+max-time: 500
+cmd: testNdbApi
+args: -n NdbRecordCICharPKUpdate T1
+
+max-time: 500
 cmd: testInterpreter
 args: T1
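Not part of the patch: a minimal skeleton of the registration and scheduling pattern the three new cases follow. The names 'exampleNdbRecordCheck' and "ExampleNdbRecordCheck" are hypothetical and used only for illustration; the function would live in testNdbApi.cpp, which already provides the NDBT includes it needs.

int
exampleNdbRecordCheck(NDBT_Context* ctx, NDBT_Step* step)
{
  Ndb* pNdb = GETNDB(step);                          /* Ndb for this step */
  const NdbDictionary::Table* pTab= ctx->getTab();   /* table under test  */
  const NdbRecord* tabRec= pTab->getDefaultRecord(); /* default NdbRecord */
  return (pNdb != NULL && tabRec != NULL) ? NDBT_OK : NDBT_FAILED;
}

/* Registered between NDBT_TESTSUITE(testNdbApi) and NDBT_TESTSUITE_END:
 *
 *   TESTCASE("ExampleNdbRecordCheck",
 *            "Hypothetical example of a registered check"){
 *     INITIALIZER(exampleNdbRecordCheck);
 *   }
 *
 * and scheduled in daily-basic-tests.txt with an entry of the form:
 *
 *   max-time: 500
 *   cmd: testNdbApi
 *   args: -n ExampleNdbRecordCheck T1
 */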