=== modified file 'storage/ndb/include/kernel/Interpreter.hpp' --- storage/ndb/include/kernel/Interpreter.hpp 2008-02-19 15:00:29 +0000 +++ storage/ndb/include/kernel/Interpreter.hpp 2008-12-01 14:48:46 +0000 @@ -89,18 +89,17 @@ public: /** * Branch string * - * i = Instruction - 5 Bits ( 0 - 5 ) max 63 - * a = Attribute id - * l = Length of string - * b = Branch offset - * t = branch type + * i = Instruction - 5 Bits ( 0 - 5 ) max 63 + * a = Attribute id - 16 bits + * l = Length of string (bytes) - 16 bits + * b = Branch offset (words) - 16 bits + * t = branch type - 4 bits * d = Array length diff * v = Varchar flag - * p = No-blank-padding flag for char compare * * 1111111111222222222233 * 01234567890123456789012345678901 - * iiiiii ddvtttpbbbbbbbbbbbbbbbb + * iiiiii ddvttttbbbbbbbbbbbbbbbb * aaaaaaaaaaaaaaaallllllllllllllll * -string.... - */ @@ -117,17 +116,21 @@ public: GT = 4, GE = 5, LIKE = 6, - NOT_LIKE = 7 + NOT_LIKE = 7, + AND_EQ_MASK = 8, + AND_NE_MASK = 9, + AND_EQ_ZERO = 10, + AND_NE_ZERO = 11 }; + // TODO : Remove other 2 unused parameters. static Uint32 BranchCol(BinaryCondition cond, - Uint32 arrayLengthDiff, Uint32 varchar, bool nopad); + Uint32 arrayLengthDiff, Uint32 varchar); static Uint32 BranchCol_2(Uint32 AttrId); static Uint32 BranchCol_2(Uint32 AttrId, Uint32 Len); static Uint32 getBinaryCondition(Uint32 op1); static Uint32 getArrayLengthDiff(Uint32 op1); static Uint32 isVarchar(Uint32 op1); - static Uint32 isNopad(Uint32 op1); static Uint32 getBranchCol_AttrId(Uint32 op2); static Uint32 getBranchCol_Len(Uint32 op2); @@ -216,15 +219,14 @@ inline Uint32 Interpreter::BranchCol(BinaryCondition cond, Uint32 arrayLengthDiff, - Uint32 varchar, bool nopad){ - //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u nopad=%d", - //cond, arrayLengthDiff, varchar, nopad); + Uint32 varchar){ + //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u", + //cond, arrayLengthDiff, varchar); return BRANCH_ATTR_OP_ARG + (arrayLengthDiff << 9) + (varchar << 11) + - (cond << 12) + - (nopad << 15); + (cond << 12); } inline @@ -242,7 +244,7 @@ Interpreter::BranchCol_2(Uint32 AttrId){ inline Uint32 Interpreter::getBinaryCondition(Uint32 op){ - return (op >> 12) & 0x7; + return (op >> 12) & 0xf; } inline @@ -259,12 +261,6 @@ Interpreter::isVarchar(Uint32 op){ inline Uint32 -Interpreter::isNopad(Uint32 op){ - return (op >> 15) & 1; -} - -inline -Uint32 Interpreter::getBranchCol_AttrId(Uint32 op){ return (op >> 16) & 0xFFFF; } === modified file 'storage/ndb/include/ndbapi/NdbInterpretedCode.hpp' --- storage/ndb/include/ndbapi/NdbInterpretedCode.hpp 2008-04-07 09:52:25 +0000 +++ storage/ndb/include/ndbapi/NdbInterpretedCode.hpp 2008-12-01 14:48:46 +0000 @@ -225,27 +225,30 @@ public: * Space required Buffer Request message * branch_col_*_null 2 words 2 words * branch_col_* 2 words + 2 words + - * len bytes len bytes + * type length type length * rounded to rounded to * nearest word nearest word * + * Only significant words stored/ + * sent for VAR* types + * * @param val ptr to const value to compare against - * @param len length in bytes of const value + * @param unused unnecessary * @param attrId column to compare * @param Label Program label to jump to if condition is true * @return 0 if successful, -1 otherwise. */ - int branch_col_eq(const void * val, Uint32 len, Uint32 attrId, + int branch_col_eq(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); - int branch_col_ne(const void * val, Uint32 len, Uint32 attrId, + int branch_col_ne(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); - int branch_col_lt(const void * val, Uint32 len, Uint32 attrId, + int branch_col_lt(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); - int branch_col_le(const void * val, Uint32 len, Uint32 attrId, + int branch_col_le(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); - int branch_col_gt(const void * val, Uint32 len, Uint32 attrId, + int branch_col_gt(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); - int branch_col_ge(const void * val, Uint32 len, Uint32 attrId, + int branch_col_ge(const void * val, Uint32 unused, Uint32 attrId, Uint32 Label); int branch_col_eq_null(Uint32 attrId, Uint32 Label); int branch_col_ne_null(Uint32 attrId, Uint32 Label); @@ -280,13 +283,60 @@ public: * @param attrId column to compare * @param Label Program label to jump to if condition is true * @return 0 if successful, -1 otherwise. - + * */ int branch_col_like(const void * val, Uint32 len, Uint32 attrId, Uint32 Label); int branch_col_notlike(const void * val, Uint32 len, Uint32 attrId, Uint32 Label); + /* Table based bitwise logical conditional operations + * -------------------------------------------------- + * These instructions are used to branch based on the + * result of logical AND between Bit type column data + * and a bitmask pattern. + * + * These instructions require that the table being operated + * upon was supplied when the NdbInterpretedCode object was + * constructed. + * + * The mask value should be the same size as the bit column + * being compared. + * Bitfields are passed in/out of NdbApi as 32-bit words + * with bits set from lsb to msb. + * The platform's endianness controls which byte contains + * the ls bits. + * x86= first(0th) byte. Sparc/PPC= last(3rd byte) + * + * To set bit n of a bitmask to 1 from a Uint32* mask : + * mask[n >> 5] |= (1 << (n & 31)) + * + * if (BitWiseAnd(ValueOf(attrId), *mask) <*mask/0>) + * goto Label; + * + * Space required Buffer Request message + * branch_col_and_mask_eq_mask/ + * branch_col_and_mask_ne_mask/ + * branch_col_and_mask_eq_zero/ + * branch_col_and_mask_ne_zero + * 2 words + 2 words + + * column width column width + * rounded to rounded to + * nearest word nearest word + * + * @param mask ptr to const mask to use + * @param unused unnecessary + * @param attrId column to compare + * @param Label Program label to jump to if condition is true + * @return 0 if successful, -1 otherwise. + * + */ + int branch_col_and_mask_eq_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label); + int branch_col_and_mask_ne_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label); + int branch_col_and_mask_eq_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label); + int branch_col_and_mask_ne_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label); + + /* Program results * --------------- * These instructions indicate to the interpreter that processing === modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp' --- storage/ndb/include/ndbapi/NdbOperation.hpp 2008-11-08 21:22:57 +0000 +++ storage/ndb/include/ndbapi/NdbOperation.hpp 2008-12-01 14:48:46 +0000 @@ -734,7 +734,12 @@ public: int branch_col_ge(Uint32 ColId, const void * val, Uint32 len, bool nopad, Uint32 Label); /** - * The argument is always plain char, even if the field is varchar + * LIKE/NOTLIKE wildcard comparisons + * These instructions support SQL-style % and _ wildcards for + * (VAR)CHAR/BINARY columns only + * + * The argument is always plain char format, even if the field + * is varchar * (changed in 5.0.22). * * @note For Scans and NdbRecord operations, use the @@ -744,7 +749,40 @@ public: bool nopad, Uint32 Label); int branch_col_notlike(Uint32 ColId, const void *, Uint32 len, bool nopad, Uint32 Label); - + + /** + * Bitwise logical comparisons + * + * These comparison types are only supported for the Bitfield + * type + * They can be used to test for bit patterns in bitfield columns + * The value passed is a bitmask which is bitwise-ANDed with the + * column data. + * Bitfields are passed in/out of NdbApi as 32-bit words with + * bits set from lsb to msb. + * The platform's endianness controls which byte contains the ls + * bits. + * x86= first(0th) byte. Sparc/PPC= last (3rd byte) + * + * To set bit n of a bitmask to 1 from a Uint32* mask : + * mask[n >> 5] |= (1 << (n & 31)) + * + * The branch can be taken in 4 cases : + * - Column data AND Mask == Mask (all masked bits are set in data) + * - Column data AND Mask != Mask (not all masked bits are set in data) + * - Column data AND Mask == 0 (No masked bits are set in data) + * - Column data AND Mask != 0 (Some masked bits are set in data) + * + */ + int branch_col_and_mask_eq_mask(Uint32 ColId, const void *, Uint32 len, + bool nopad, Uint32 Label); + int branch_col_and_mask_ne_mask(Uint32 ColId, const void *, Uint32 len, + bool nopad, Uint32 Label); + int branch_col_and_mask_eq_zero(Uint32 ColId, const void *, Uint32 len, + bool nopad, Uint32 Label); + int branch_col_and_mask_ne_zero(Uint32 ColId, const void *, Uint32 len, + bool nopad, Uint32 Label); + /** * Interpreted program instruction: Exit with Ok * @@ -1249,7 +1287,7 @@ protected: int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest); int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource); int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32); - int branch_col(Uint32 type, Uint32, const void *, Uint32, bool, Uint32 Label); + int branch_col(Uint32 type, Uint32, const void *, Uint32, Uint32 Label); int branch_col_null(Uint32 type, Uint32 col, Uint32 Label); NdbBlob *linkInBlobHandle(NdbTransaction *aCon, const NdbColumnImpl *column, === modified file 'storage/ndb/include/ndbapi/NdbScanFilter.hpp' --- storage/ndb/include/ndbapi/NdbScanFilter.hpp 2008-04-07 09:56:30 +0000 +++ storage/ndb/include/ndbapi/NdbScanFilter.hpp 2008-12-01 14:48:46 +0000 @@ -80,14 +80,18 @@ public: enum BinaryCondition { - COND_LE = 0, ///< lower bound - COND_LT = 1, ///< lower bound, strict - COND_GE = 2, ///< upper bound - COND_GT = 3, ///< upper bound, strict - COND_EQ = 4, ///< equality - COND_NE = 5, ///< not equal - COND_LIKE = 6, ///< like - COND_NOT_LIKE = 7 ///< not like + COND_LE = 0, ///< lower bound + COND_LT = 1, ///< lower bound, strict + COND_GE = 2, ///< upper bound + COND_GT = 3, ///< upper bound, strict + COND_EQ = 4, ///< equality + COND_NE = 5, ///< not equal + COND_LIKE = 6, ///< like + COND_NOT_LIKE = 7, ///< not like + COND_AND_EQ_MASK = 8, ///< (bit & mask) == mask + COND_AND_NE_MASK = 9, ///< (bit & mask) != mask (incl. NULL) + COND_AND_EQ_ZERO = 10, ///< (bit & mask) == 0 + COND_AND_NE_ZERO = 11, ///< (bit & mask) != 0 (incl. NULL) }; /** @@ -130,6 +134,9 @@ public: * documentation for NdbOperation::equal(). * For BinaryConditions LIKE and NOT_LIKE, the value pointed to by val * should NOT include initial length bytes. + * For LIKE and NOT_LIKE, the % and ? wildcards are supported. + * For bitmask operations, see the bitmask format information against + * the branch_col_and_mask_eq_mask instruction in NdbInterpretedCode.hpp * * ®return 0 if successful, -1 otherwise */ === modified file 'storage/ndb/include/util/NdbSqlUtil.hpp' --- storage/ndb/include/util/NdbSqlUtil.hpp 2006-12-23 19:20:40 +0000 +++ storage/ndb/include/util/NdbSqlUtil.hpp 2008-12-01 14:48:46 +0000 @@ -52,6 +52,15 @@ public: */ typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2); + /** + * Prototype for mask comparisons. Defined for bit type. + * + * If common portion of data AND Mask is equal to mask + * return 0, else return 1. + * If cmpZero, compare data AND Mask to zero. + */ + typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero); + enum CmpResult { CmpLess = -1, CmpEqual = 0, @@ -96,6 +105,7 @@ public: Enum m_typeId; // redundant Cmp* m_cmp; // comparison method Like* m_like; // "like" comparison method + AndMask* m_mask; // Mask comparison method }; /** @@ -179,6 +189,8 @@ private: static Like likeVarbinary; static Like likeLongvarchar; static Like likeLongvarbinary; + // + static AndMask maskBit; }; #endif === modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp' --- storage/ndb/src/common/util/NdbSqlUtil.cpp 2007-02-23 11:28:34 +0000 +++ storage/ndb/src/common/util/NdbSqlUtil.cpp 2008-12-01 14:48:46 +0000 @@ -26,156 +26,187 @@ NdbSqlUtil::m_typeList[] = { { // 0 Type::Undefined, NULL, + NULL, NULL }, { // 1 Type::Tinyint, cmpTinyint, + NULL, NULL }, { // 2 Type::Tinyunsigned, cmpTinyunsigned, + NULL, NULL }, { // 3 Type::Smallint, cmpSmallint, + NULL, NULL }, { // 4 Type::Smallunsigned, cmpSmallunsigned, + NULL, NULL }, { // 5 Type::Mediumint, cmpMediumint, + NULL, NULL }, { // 6 Type::Mediumunsigned, cmpMediumunsigned, + NULL, NULL }, { // 7 Type::Int, cmpInt, + NULL, NULL }, { // 8 Type::Unsigned, cmpUnsigned, + NULL, NULL }, { // 9 Type::Bigint, cmpBigint, + NULL, NULL }, { // 10 Type::Bigunsigned, cmpBigunsigned, + NULL, NULL }, { // 11 Type::Float, cmpFloat, + NULL, NULL }, { // 12 Type::Double, cmpDouble, + NULL, NULL }, { // 13 Type::Olddecimal, cmpOlddecimal, + NULL, NULL }, { // 14 Type::Char, cmpChar, - likeChar + likeChar, + NULL }, { // 15 Type::Varchar, cmpVarchar, - likeVarchar + likeVarchar, + NULL }, { // 16 Type::Binary, cmpBinary, - likeBinary + likeBinary, + NULL }, { // 17 Type::Varbinary, cmpVarbinary, - likeVarbinary + likeVarbinary, + NULL }, { // 18 Type::Datetime, cmpDatetime, + NULL, NULL }, { // 19 Type::Date, cmpDate, + NULL, NULL }, { // 20 Type::Blob, NULL, + NULL, NULL }, { // 21 Type::Text, NULL, + NULL, NULL }, { // 22 Type::Bit, cmpBit, - NULL + NULL, + maskBit }, { // 23 Type::Longvarchar, cmpLongvarchar, - likeLongvarchar + likeLongvarchar, + NULL }, { // 24 Type::Longvarbinary, cmpLongvarbinary, - likeLongvarbinary + likeLongvarbinary, + NULL }, { // 25 Type::Time, cmpTime, + NULL, NULL }, { // 26 Type::Year, cmpYear, + NULL, NULL }, { // 27 Type::Timestamp, cmpTimestamp, + NULL, NULL }, { // 28 Type::Olddecimalunsigned, cmpOlddecimalunsigned, + NULL, NULL }, { // 29 Type::Decimal, cmpDecimal, + NULL, NULL }, { // 30 Type::Decimalunsigned, cmpDecimalunsigned, + NULL, NULL } }; @@ -680,9 +711,54 @@ NdbSqlUtil::cmpText(const void* info, co int NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - Uint32 n = (n1 < n2) ? n1 : n2; - int ret = memcmp(p1, p2, n); - return ret; + /* Bitfields are stored as 32-bit words + * This means that a byte-by-byte comparison will not work on all platforms + * We do a word-wise comparison of the significant bytes. + * It is assumed that insignificant bits (but not bytes) are zeroed in the + * passed values. + */ + const Uint32 bytes= MIN(n1, n2); + Uint32 words= (bytes + 3) >> 2; + + /* Don't expect either value to be length zero */ + assert(words); + + /* Check ptr alignment */ + if (unlikely(((((UintPtr)p1) & 3) != 0) || + ((((UintPtr)p2) & 3) != 0))) + { + Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ]; + Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ]; + memcpy(copyP1, p1, words << 2); + memcpy(copyP2, p2, words << 2); + + return cmpBit(info, copyP1, bytes, copyP2, bytes, full); + } + + const Uint32* wp1= (const Uint32*) p1; + const Uint32* wp2= (const Uint32*) p2; + while (--words) + { + if (*wp1 < *wp2) + return -1; + if (*(wp1++) > *(wp2++)) + return 1; + } + + /* For the last word, we mask out any insignificant bytes */ + const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant + const Uint32 mask= sigBytes? + (1 << (sigBytes *8)) -1 : + ~0; + const Uint32 lastWord1= *wp1 & mask; + const Uint32 lastWord2= *wp2 & mask; + + if (lastWord1 < lastWord2) + return -1; + if (lastWord1 > lastWord2) + return 1; + + return 0; } @@ -874,6 +950,85 @@ NdbSqlUtil::likeLongvarbinary(const void return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2); } + +// mask Functions + +int +NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero) +{ + /* Bitfields are stored in word oriented form, so we must compare them in that + * style as well + * It is assumed that insignificant bits (but not bytes) in the passed values + * are zeroed + */ + const Uint32 bytes = MIN(dataLen, maskLen); + Uint32 words = (bytes + 3) >> 2; + + /* Don't expect either value to be length zero */ + assert(words); + + /* Check ptr alignment */ + if (unlikely(((((UintPtr)data) & 3) != 0) || + ((((UintPtr)mask) & 3) != 0))) + { + Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ]; + Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ]; + memcpy(copydata, data, words << 2); + memcpy(copymask, mask, words << 2); + + return maskBit(data, bytes, mask, bytes, cmpZero); + } + + const Uint32* wdata= (const Uint32*) data; + const Uint32* wmask= (const Uint32*) mask; + + if (cmpZero) + { + while (--words) + { + if ((*(wdata++) & *(wmask++)) != 0) + return 1; + } + + /* For the last word, we mask out any insignificant bytes */ + const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant + const Uint32 comparisonMask= sigBytes? + (1 << (sigBytes *8)) -1 : + ~0; + const Uint32 lastDataWord= *wdata & comparisonMask; + const Uint32 lastMaskWord= *wmask & comparisonMask; + + if ((lastDataWord & lastMaskWord) != 0) + return 1; + + return 0; + } + else + { + while (--words) + { + if ((*(wdata++) & *wmask) != *wmask) + return 1; + + wmask++; + } + + /* For the last word, we mask out any insignificant bytes */ + const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant + const Uint32 comparisonMask= sigBytes? + (1 << (sigBytes *8)) -1 : + ~0; + const Uint32 lastDataWord= *wdata & comparisonMask; + const Uint32 lastMaskWord= *wmask & comparisonMask; + + if ((lastDataWord & lastMaskWord) != lastMaskWord) + return 1; + + return 0; + } +}; + + // check charset uint === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp' --- storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-08-26 14:12:34 +0000 +++ storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-12-01 14:48:46 +0000 @@ -2491,12 +2491,23 @@ int Dbtup::interpreterNextLab(Signal* si const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1]; // fixed length in 5.0 Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1); + + if (typeId == NDB_TYPE_BIT) + { + /* Size in bytes for bit fields can be incorrect due to + * rounding down + */ + Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1) + + 7) / 8; + attrLen= bitFieldAttrLen; + } bool r1_null = ah.isNULL(); bool r2_null = argLen == 0; int res1; - if (cond != Interpreter::LIKE && - cond != Interpreter::NOT_LIKE) { + if (cond <= Interpreter::GE) + { + /* Inequality - EQ, NE, LT, LE, GT, GE */ if (r1_null || r2_null) { // NULL==NULL and NULLgetStringType()) { /* Fixed size type */ + if (col->getType() == NDB_TYPE_BIT) + { + /* We want to zero out insignificant bits in the + * last word of a bit type + */ + Uint32 bitLen= col->getLength(); + Uint32 lastWordBits= bitLen & 0x1F; + if (lastWordBits) + lastWordMask= (1 << lastWordBits) -1; + } sigLen= sendLen= col->m_attrSize * col->m_arraySize; } else @@ -548,7 +559,7 @@ NdbInterpretedCode::branch_col(Uint32 br val = tempData; } - if (add_branch(Interpreter::BranchCol(c, 0, 0, false), Label) != 0) + if (add_branch(Interpreter::BranchCol(c, 0, 0), Label) != 0) DBUG_RETURN(-1); if (add1(Interpreter::BranchCol_2(attrId, sendLen)) != 0) @@ -556,7 +567,8 @@ NdbInterpretedCode::branch_col(Uint32 br /* Get value byte length rounded up to nearest 32-bit word */ Uint32 len2 = Interpreter::mod4(sendLen); - if(len2 == sendLen){ + if((len2 == sendLen) && + (lastWordMask == (Uint32)~0)){ /* Whole number of 32-bit words */ DBUG_RETURN(addN((Uint32*)val, len2 >> 2)); } else { @@ -571,7 +583,7 @@ NdbInterpretedCode::branch_col(Uint32 br char* p = (char*)&tmp; p[i] = ((char*)val)[len2+i]; } - DBUG_RETURN(add1(tmp)); + DBUG_RETURN(add1((tmp & lastWordMask))); } } @@ -581,7 +593,7 @@ NdbInterpretedCode::branch_col_eq(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::EQ, attrId, val, len, Label); + return branch_col(Interpreter::EQ, attrId, val, 0, Label); } int @@ -590,7 +602,7 @@ NdbInterpretedCode::branch_col_ne(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::NE, attrId, val, len, Label); + return branch_col(Interpreter::NE, attrId, val, 0, Label); } int @@ -599,7 +611,7 @@ NdbInterpretedCode::branch_col_lt(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::LT, attrId, val, len, Label); + return branch_col(Interpreter::LT, attrId, val, 0, Label); } int @@ -608,7 +620,7 @@ NdbInterpretedCode::branch_col_le(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::LE, attrId, val, len, Label); + return branch_col(Interpreter::LE, attrId, val, 0, Label); } int @@ -617,7 +629,7 @@ NdbInterpretedCode::branch_col_gt(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::GT, attrId, val, len, Label); + return branch_col(Interpreter::GT, attrId, val, 0, Label); } int @@ -626,7 +638,7 @@ NdbInterpretedCode::branch_col_ge(const Uint32 attrId, Uint32 Label) { - return branch_col(Interpreter::GE, attrId, val, len, Label); + return branch_col(Interpreter::GE, attrId, val, 0, Label); } int @@ -648,6 +660,42 @@ NdbInterpretedCode::branch_col_notlike(c } int +NdbInterpretedCode::branch_col_and_mask_eq_mask(const void * mask, + Uint32 len, + Uint32 attrId, + Uint32 label) +{ + return branch_col(Interpreter::AND_EQ_MASK, attrId, mask, 0, Label); +} + +int +NdbInterpretedCode::branch_col_and_mask_ne_mask(const void * mask, + Uint32 len, + Uint32 attrId, + Uint32 label) +{ + return branch_col(Interpreter::AND_NE_MASK, attrId, mask, 0, Label); +} + +int +NdbInterpretedCode::branch_col_and_mask_eq_zero(const void * mask, + Uint32 len, + Uint32 attrId, + Uint32 label) +{ + return branch_col(Interpreter::AND_EQ_ZERO, attrId, mask, 0, Label); +} + +int +NdbInterpretedCode::branch_col_and_mask_ne_zero(const void * mask, + Uint32 len, + Uint32 attrId, + Uint32 label) +{ + return branch_col(Interpreter::AND_NE_ZERO, attrId, mask, 0, Label); +} + +int NdbInterpretedCode::interpret_exit_ok() { return add1(Interpreter::EXIT_OK); === modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp' --- storage/ndb/src/ndbapi/NdbOperationInt.cpp 2008-11-11 11:47:00 +0000 +++ storage/ndb/src/ndbapi/NdbOperationInt.cpp 2008-12-01 14:48:46 +0000 @@ -1067,7 +1067,7 @@ NdbOperation::insertCall(Uint32 aCall) int NdbOperation::branch_col(Uint32 type, Uint32 ColId, const void * val, Uint32 len, - bool nopad, Uint32 Label){ + Uint32 Label){ DBUG_ENTER("NdbOperation::branch_col"); DBUG_PRINT("enter", ("type: %u col:%u val: 0x%lx len: %u label: %u", @@ -1089,6 +1089,7 @@ NdbOperation::branch_col(Uint32 type, Uint32 sigLen= len; Uint32 sendLen= len; + Uint32 lastWordMask= ~0; if (val == NULL) sigLen= sendLen = 0; @@ -1096,6 +1097,16 @@ NdbOperation::branch_col(Uint32 type, if (! col->getStringType()) { /* Fixed size type */ + if (col->getType() == NDB_TYPE_BIT) + { + /* We want to zero out insignificant bits in the + * last word of a bit type + */ + Uint32 bitLen= col->getLength(); + Uint32 lastWordBits= bitLen & 0x1F; + if (lastWordBits) + lastWordMask= (1 << lastWordBits) -1; + } sigLen= sendLen= col->m_attrSize * col->m_arraySize; } else @@ -1130,7 +1141,7 @@ NdbOperation::branch_col(Uint32 type, val = tempData; } - if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1) + if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0)) == -1) DBUG_RETURN(-1); if (insertBranch(Label) == -1) @@ -1140,7 +1151,8 @@ NdbOperation::branch_col(Uint32 type, DBUG_RETURN(-1); Uint32 len2 = Interpreter::mod4(sendLen); - if(len2 == sendLen){ + if((len2 == sendLen) && + (lastWordMask == (Uint32)~0)){ insertATTRINFOloop((Uint32*)val, len2 >> 2); } else { len2 -= 4; @@ -1150,7 +1162,7 @@ NdbOperation::branch_col(Uint32 type, char* p = (char*)&tmp; p[i] = ((char*)val)[len2+i]; } - insertATTRINFO(tmp); + insertATTRINFO(tmp & lastWordMask); } theErrorLine++; @@ -1162,7 +1174,7 @@ NdbOperation::branch_col_eq(Uint32 ColId bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label); + return branch_col(Interpreter::EQ, ColId, val, len, Label); } int @@ -1170,28 +1182,28 @@ NdbOperation::branch_col_ne(Uint32 ColId bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::NE, ColId, val, len, nopad, Label); + return branch_col(Interpreter::NE, ColId, val, len, Label); } int NdbOperation::branch_col_lt(Uint32 ColId, const void * val, Uint32 len, bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::LT, ColId, val, len, nopad, Label); + return branch_col(Interpreter::LT, ColId, val, len, Label); } int NdbOperation::branch_col_le(Uint32 ColId, const void * val, Uint32 len, bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::LE, ColId, val, len, nopad, Label); + return branch_col(Interpreter::LE, ColId, val, len, Label); } int NdbOperation::branch_col_gt(Uint32 ColId, const void * val, Uint32 len, bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::GT, ColId, val, len, nopad, Label); + return branch_col(Interpreter::GT, ColId, val, len, Label); } int @@ -1199,7 +1211,7 @@ NdbOperation::branch_col_ge(Uint32 ColId bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::GE, ColId, val, len, nopad, Label); + return branch_col(Interpreter::GE, ColId, val, len, Label); } int @@ -1207,7 +1219,7 @@ NdbOperation::branch_col_like(Uint32 Col bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label); + return branch_col(Interpreter::LIKE, ColId, val, len, Label); } int @@ -1215,7 +1227,39 @@ NdbOperation::branch_col_notlike(Uint32 bool nopad, Uint32 Label){ INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len, nopad, Label)); - return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label); + return branch_col(Interpreter::NOT_LIKE, ColId, val, len, Label); +} + +int +NdbOperation::branch_col_and_mask_eq_mask(Uint32 ColId, const void * mask, + Uint32 len, bool nopad, Uint32 Label){ + INT_DEBUG(("branch_col_and_mask_eq_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len, + nopad, Label)); + return branch_col(Interpreter::AND_EQ_MASK, ColId, mask, len, Label); +} + +int +NdbOperation::branch_col_and_mask_ne_mask(Uint32 ColId, const void * mask, + Uint32 len, bool nopad, Uint32 Label){ + INT_DEBUG(("branch_col_and_mask_ne_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len, + nopad, Label)); + return branch_col(Interpreter::AND_NE_MASK, ColId, mask, len, Label); +} + +int +NdbOperation::branch_col_and_mask_eq_zero(Uint32 ColId, const void * mask, + Uint32 len, bool nopad, Uint32 Label){ + INT_DEBUG(("branch_col_and_mask_eq_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len, + nopad, Label)); + return branch_col(Interpreter::AND_EQ_ZERO, ColId, mask, len, Label); +} + +int +NdbOperation::branch_col_and_mask_ne_zero(Uint32 ColId, const void * mask, + Uint32 len, bool nopad, Uint32 Label){ + INT_DEBUG(("branch_col_and_mask_ne_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len, + nopad, Label)); + return branch_col(Interpreter::AND_NE_ZERO, ColId, mask, len, Label); } int === modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp' --- storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-08-06 15:33:19 +0000 +++ storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-12-01 14:48:46 +0000 @@ -586,6 +586,42 @@ static const tab3 table3[] = { &NdbInterpretedCode::branch_col_notlike, &NdbInterpretedCode::branch_col_like, &NdbInterpretedCode::branch_col_notlike } } + + /** + * AND EQ MASK + */ + ,{ { 0, + &NdbInterpretedCode::branch_col_and_mask_ne_mask, + &NdbInterpretedCode::branch_col_and_mask_eq_mask, + &NdbInterpretedCode::branch_col_and_mask_ne_mask, + &NdbInterpretedCode::branch_col_and_mask_eq_mask } } + + /** + * AND NE MASK + */ + ,{ { 0, + &NdbInterpretedCode::branch_col_and_mask_eq_mask, + &NdbInterpretedCode::branch_col_and_mask_ne_mask, + &NdbInterpretedCode::branch_col_and_mask_eq_mask, + &NdbInterpretedCode::branch_col_and_mask_ne_mask } } + + /** + * AND EQ ZERO + */ + ,{ { 0, + &NdbInterpretedCode::branch_col_and_mask_ne_zero, + &NdbInterpretedCode::branch_col_and_mask_eq_zero, + &NdbInterpretedCode::branch_col_and_mask_ne_zero, + &NdbInterpretedCode::branch_col_and_mask_eq_zero } } + + /** + * AND NE ZERO + */ + ,{ { 0, + &NdbInterpretedCode::branch_col_and_mask_eq_zero, + &NdbInterpretedCode::branch_col_and_mask_ne_zero, + &NdbInterpretedCode::branch_col_and_mask_eq_zero, + &NdbInterpretedCode::branch_col_and_mask_ne_zero } } }; const int tab3_sz = sizeof(table3)/sizeof(table3[0]); @@ -664,6 +700,14 @@ NdbScanFilter::cmp(BinaryCondition cond, return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len); case COND_NOT_LIKE: return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len); + case COND_AND_EQ_MASK: + return m_impl.cond_col_const(Interpreter::AND_EQ_MASK, ColId, val, len); + case COND_AND_NE_MASK: + return m_impl.cond_col_const(Interpreter::AND_NE_MASK, ColId, val, len); + case COND_AND_EQ_ZERO: + return m_impl.cond_col_const(Interpreter::AND_EQ_ZERO, ColId, val, len); + case COND_AND_NE_ZERO: + return m_impl.cond_col_const(Interpreter::AND_NE_ZERO, ColId, val, len); } return -1; } === modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp' --- storage/ndb/test/ndbapi/testScanFilter.cpp 2008-11-10 11:41:44 +0000 +++ storage/ndb/test/ndbapi/testScanFilter.cpp 2008-12-01 14:48:46 +0000 @@ -88,6 +88,49 @@ static const NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs); +static +const +NDBT_Attribute MYTAB2Attribs[] = { + NDBT_Attribute("id", NdbDictionary::Column::Unsigned, 1, true), + // _pk _nullable + NDBT_Attribute("1bitnn", NdbDictionary::Column::Bit, 1, false, false), + NDBT_Attribute("1bitnu", NdbDictionary::Column::Bit, 1, false, true), + NDBT_Attribute("2bitnn", NdbDictionary::Column::Bit, 2, false, false), + NDBT_Attribute("2bitnu", NdbDictionary::Column::Bit, 2, false, true), + NDBT_Attribute("7bitnn", NdbDictionary::Column::Bit, 7, false, false), + NDBT_Attribute("7bitnu", NdbDictionary::Column::Bit, 7, false, true), + NDBT_Attribute("8bitnn", NdbDictionary::Column::Bit, 8, false, false), + NDBT_Attribute("8bitnu", NdbDictionary::Column::Bit, 8, false, true), + NDBT_Attribute("15bitnn", NdbDictionary::Column::Bit, 15, false, false), + NDBT_Attribute("15bitnu", NdbDictionary::Column::Bit, 15, false, true), + NDBT_Attribute("31bitnn", NdbDictionary::Column::Bit, 31, false, false), + NDBT_Attribute("31bitnu", NdbDictionary::Column::Bit, 31, false, true), + NDBT_Attribute("32bitnn", NdbDictionary::Column::Bit, 32, false, false), + NDBT_Attribute("32bitnu", NdbDictionary::Column::Bit, 32, false, true), + NDBT_Attribute("33bitnn", NdbDictionary::Column::Bit, 33, false, false), + NDBT_Attribute("33bitnu", NdbDictionary::Column::Bit, 33, false, true), + NDBT_Attribute("63bitnn", NdbDictionary::Column::Bit, 63, false, false), + NDBT_Attribute("63bitnu", NdbDictionary::Column::Bit, 63, false, true), + NDBT_Attribute("64bitnn", NdbDictionary::Column::Bit, 64, false, false), + NDBT_Attribute("64bitnu", NdbDictionary::Column::Bit, 64, false, true), + NDBT_Attribute("65bitnn", NdbDictionary::Column::Bit, 65, false, false), + NDBT_Attribute("65bitnu", NdbDictionary::Column::Bit, 65, false, true), + NDBT_Attribute("127bitnn", NdbDictionary::Column::Bit, 127, false, false), + NDBT_Attribute("127bitnu", NdbDictionary::Column::Bit, 127, false, true), + NDBT_Attribute("513bitnn", NdbDictionary::Column::Bit, 513, false, false), + NDBT_Attribute("513bitnu", NdbDictionary::Column::Bit, 513, false, true) +}; + +static const char* TABLE2_NAME= "MyTab2"; + +static +const +NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs); + +static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute); +static const int MAX_BIT_WIDTH= 513; +/* One extra row for all bits == 0 */ +static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1; int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp, bool existsOk, NDBT_CreateTableHook f) @@ -1012,6 +1055,588 @@ int runScanFilterConstructorFail(NDBT_Co return NDBT_OK; } +bool getBit(const Uint32* bitMap, + int bitNum) +{ + return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0); +} + +void setBit(Uint32* bitMap, + int bitNum) +{ + bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31)); +} + +enum TestConditions { + COND_LE = 0, + COND_LT = 1, + COND_GE = 2, + COND_GT = 3, + COND_EQ = 4, + COND_NE = 5, + COND_NULL = 6, + COND_NOTNULL = 7, + COND_AND_EQ_MASK = 8, + COND_AND_NE_MASK = 9, + COND_AND_EQ_ZERO = 10, + COND_AND_NE_ZERO = 11 +}; + +int getExpectedBitsSet(int rowId, + int colBitWidth) +{ + /* returns value from -1 to colBitWidth -1 + * -1 == no bits set + * 0 == bit 0 set + * 1 == bit 0 + bit 1 set + * ... + */ + return (rowId % (colBitWidth + 1)) -1; +} + +bool isNullValue(int rowId, + int colBitWidth) +{ + /* Occasionally we'll have a Null column */ + return (((rowId + colBitWidth) % 13) == 0); +} + +void getBitfieldVariants(int bitNum, int& offset, bool& invert) +{ + offset= 0; + invert= false; + if ((bitNum % 5) == 3) + { + /* Invert the mask */ + invert= true; + } + if ((bitNum % 7) == 6) + { + /* Shift the mask */ + offset= (bitNum / 2); + } +}; + + +bool isRowExpected(int rowId, + TestConditions cond, + int colBitWidth, + int bitsSetInScanFilter, + bool isNullable, + const Uint32* maskBuff) +{ + if (isNullable && isNullValue(rowId, colBitWidth)) + { + switch (cond) { + case COND_LE: + return true; // null < any value + case COND_LT: + return true; // null < any value + case COND_GE: + return false; // null < any value + case COND_GT: + return false; // null < any value + case COND_EQ: + return false; // null != any value + case COND_NE: + return true; // null != any value + case COND_NULL: + return true; // null is null + case COND_NOTNULL: + return false; + case COND_AND_EQ_MASK: + return false; // NULL AND MASK != MASK + case COND_AND_NE_MASK: + return true; // NULL AND MASK != MASK + case COND_AND_EQ_ZERO: + return false; // NULL AND MASK != 0 + case COND_AND_NE_ZERO: + return true; // NULL AND MASK != 0 + default: + printf("isRowExpected given bad condition : %u\n", + cond); + return false; + } + } + else + { + /* Not a null value */ + int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth); + + int offset= 0; + bool invert= false; + + getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert); + + switch (cond) { + case COND_LE: + return expectedBitsSet <= bitsSetInScanFilter; + case COND_LT: + return expectedBitsSet < bitsSetInScanFilter; + case COND_GE: + return expectedBitsSet >= bitsSetInScanFilter; + case COND_GT: + return expectedBitsSet > bitsSetInScanFilter; + case COND_EQ: + return expectedBitsSet == bitsSetInScanFilter; + case COND_NE: + return expectedBitsSet != bitsSetInScanFilter; + case COND_NULL: + return false; + case COND_NOTNULL: + return true; + case COND_AND_EQ_MASK: + case COND_AND_NE_MASK: + { + bool result= true; + /* Compare data AND mask to the mask buff */ + for (int idx=0; idx < colBitWidth; idx++) + { + bool bitVal= (expectedBitsSet >= 0)? + (expectedBitsSet >= idx): false; + bool maskVal= getBit(maskBuff, idx); + if ((bitVal & maskVal) != maskVal) + { + /* Difference to mask, know result */ + result= false; + break; + } + } + + /* Invert result for NE condition */ + return (result ^ (cond == COND_AND_NE_MASK)); + } + case COND_AND_EQ_ZERO: + case COND_AND_NE_ZERO: + { + bool result= true; + /* Compare data AND mask to zero */ + for (int idx=0; idx < colBitWidth; idx++) + { + bool bitVal= (expectedBitsSet >= 0)? + (expectedBitsSet >= idx): false; + bool maskVal= getBit(maskBuff, idx); + if ((bitVal & maskVal) != 0) + { + /* Difference to 0, know result */ + result= false; + break; + } + } + /* Invert result for NE condition */ + return (result ^ (cond == COND_AND_NE_ZERO)); + } + default: + printf("isRowExpected given bad condition : %u\n", + cond); + return false; + } + } +} + +int insertBitRows(Ndb* pNdb) +{ + const NdbDictionary::Dictionary* myDict= pNdb->getDictionary(); + const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME); + if(myTable == NULL) + APIERROR(myDict->getNdbError()); + + for (int i=0; i< TOTAL_ROWS; i++) + { + NdbTransaction* myTrans = pNdb->startTransaction(); + if (myTrans == NULL) + APIERROR(pNdb->getNdbError()); + + NdbOperation* insertOp= myTrans->getNdbOperation(myTable); + if (insertOp == NULL) + APIERROR(pNdb->getNdbError()); + + const int buffSize= (MAX_BIT_WIDTH + 31) / 32; + Uint32 buff[ buffSize ]; + + if (insertOp->insertTuple() != 0) + APIERROR(insertOp->getNdbError()); + + if (insertOp->equal((Uint32)0, i) != 0) // Set id column + APIERROR(insertOp->getNdbError()); + + for (int col=1; col < myTable->getNoOfColumns(); col++) + { + const NdbDictionary::Column* c= myTable->getColumn(col); + int colBitWidth= c->getLength(); + bool isNullable= c->getNullable(); + + if (isNullable && + isNullValue(i, colBitWidth)) + { + /* Set column value to NULL */ + if (insertOp->setValue(col, (char*) NULL) != 0) + APIERROR(insertOp->getNdbError()); + } + else + { + /* Set lowest bits in this column */ + memset(buff, 0, (4 * buffSize)); + + int bitsToSet= getExpectedBitsSet(i, colBitWidth); + + if (bitsToSet >= 0) + { + for (int idx=0; idx <= bitsToSet; idx++) + setBit(buff, idx); + } + + if (insertOp->setValue(col, (char *)buff) != 0) + APIERROR(insertOp->getNdbError()); + } + } + + if (myTrans->execute(NdbTransaction::Commit) != 0) + { + APIERROR(myTrans->getNdbError()); + } + myTrans->close(); + } + + printf("Inserted %u rows\n", TOTAL_ROWS); + + return NDBT_OK; +} + +int verifyBitData(Ndb* pNdb) +{ + const NdbDictionary::Dictionary* myDict= pNdb->getDictionary(); + const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME); + if(myTable == NULL) + APIERROR(myDict->getNdbError()); + + NdbTransaction* myTrans= pNdb->startTransaction(); + if (myTrans == NULL) + APIERROR(pNdb->getNdbError()); + + NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable); + if (scanOp == NULL) + APIERROR(pNdb->getNdbError()); + + if (scanOp->readTuples() != 0) + APIERROR(scanOp->getNdbError()); + + NdbRecAttr* results[ NUM_COLS ]; + + for (int col=0; col< NUM_COLS; col++) + { + if ((results[col]= scanOp->getValue(col)) == NULL) + APIERROR(scanOp->getNdbError()); + } + + if (myTrans->execute(NdbTransaction::NoCommit) != 0) + APIERROR(myTrans->getNdbError()); + + for (int row=0; row < TOTAL_ROWS; row++) + { + if (scanOp->nextResult() != 0) + APIERROR(scanOp->getNdbError()); + + int rowId= results[0]->int32_value(); + + for (int col=1; col < NUM_COLS; col++) + { + const NdbDictionary::Column* c= myTable->getColumn(col); + bool isNullable= c->getNullable(); + int colBitWidth= c->getLength(); + + if (isNullable && + isNullValue(rowId, colBitWidth)) + { + if (!results[ col ] ->isNULL()) + { + printf("Mismatch at result %d row %d, column %d, expected NULL\n", + row, rowId, col); + myTrans->close(); + return NDBT_FAILED; + } + } + else + { + /* Non null value, check it */ + int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth); + + const Uint32* val= (const Uint32 *) results[ col ]->aRef(); + + for (int bitNum=0; bitNum < colBitWidth; bitNum++) + { + bool expectClear= (bitNum > expectedSetBits); + bool isClear= ! getBit(val, bitNum); + if (expectClear != isClear) + { + printf("Mismatch at result %d row %d, column %d, bit %d" + " expected %d \n", + row, rowId, col, bitNum, (expectClear)?0:1); + myTrans->close(); + return NDBT_FAILED; + } + } + } + } + } + + if (scanOp->nextResult() != 1) + { + printf("Too many rows returned\n"); + return NDBT_FAILED; + } + + if (myTrans->execute(NdbTransaction::Commit) != 0) + APIERROR(myTrans->getNdbError()); + + myTrans->close(); + + printf("Verified data for %u rows\n", + TOTAL_ROWS); + + return NDBT_OK; +} + +int verifyBitScanFilter(Ndb* pNdb) +{ + const NdbDictionary::Dictionary* myDict= pNdb->getDictionary(); + const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME); + if(myTable == NULL) + APIERROR(myDict->getNdbError()); + + /* Perform scan with scanfilter for : + * - each column in the table + * - each supported comparison type + * - each potentially set bit in the column + */ + int scanCount= 0; + for (int col=1; col < NUM_COLS; col++) + { + const NdbDictionary::Column* c= myTable->getColumn(col); + const int bitWidth= c->getLength(); + printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n", + (c->getNullable())?"Nullable":"Non-null", + col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1))); + for (int comp=0; comp <= COND_AND_NE_ZERO; comp++) + { + for (int bitNum=0; bitNum <= bitWidth; bitNum++) + { + /* Define scan */ + NdbTransaction* myTrans= pNdb->startTransaction(); + if (myTrans == NULL) + APIERROR(pNdb->getNdbError()); + + NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable); + if (scanOp == NULL) + APIERROR(pNdb->getNdbError()); + + if (scanOp->readTuples() != 0) + APIERROR(scanOp->getNdbError()); + + NdbRecAttr* ra; + if ((ra= scanOp->getValue((Uint32)0)) == NULL) + APIERROR(scanOp->getNdbError()); + + /* Define ScanFilter */ + const int buffSize= (MAX_BIT_WIDTH + 31)/32; + Uint32 buff[ buffSize ]; + memset(buff, 0, (4 * buffSize)); + + /* Define constant value, with some variants for bitwise operators */ + bool invert= false; + int offset= 0; + + switch (comp) { + case COND_AND_EQ_MASK: + case COND_AND_NE_MASK: + case COND_AND_EQ_ZERO: + case COND_AND_NE_ZERO: + getBitfieldVariants(bitNum, offset, invert); + default: + ; + } + + /* Set lower bitNum -1 bits + * If bitNum == 0, set none + */ + int bitsSetInFilter= bitNum-1; + + if ((bitsSetInFilter >= 0) || + invert) + { + for (int idx=0; idx <= (32 * buffSize); idx++) + { + if ((idx >= offset) && + (idx <= (offset + bitsSetInFilter))) + { + if (!invert) + setBit(buff, idx); + } + else + { + if (invert) + setBit(buff, idx); + } + } + } + + + + NdbScanFilter sf(scanOp); + + if (sf.begin(NdbScanFilter::AND) != 0) + APIERROR(sf.getNdbError()); + + if ((comp != COND_NULL) && + (comp != COND_NOTNULL)) + { + /* Operator with a constant */ + if (sf.cmp((NdbScanFilter::BinaryCondition)comp, + col, + (char*)buff) != 0) + APIERROR(sf.getNdbError()); + } + else + { + switch (comp) { + case COND_NULL: + if (sf.isnull(col) != 0) + APIERROR(sf.getNdbError()); + break; + case COND_NOTNULL: + if (sf.isnotnull(col) != 0) + APIERROR(sf.getNdbError()); + break; + default: + printf("Condition %u not supported\n", comp); + return NDBT_FAILED; + } + } + + if (sf.end() != 0) + APIERROR(sf.getNdbError()); + + /* Calculate expected number of rows in result */ + const NdbDictionary::Column* c= myTable->getColumn(col); + int colBitWidth= c->getLength(); + bool isNullable= c->getNullable(); + + // printf("Determining expected rows\n"); + int expectedResultCount= 0; + for (int i=0; i< TOTAL_ROWS; i++) + { + if (isRowExpected(i, + (TestConditions)comp, + colBitWidth, + bitsSetInFilter, + isNullable, + buff)) + expectedResultCount++; + } + + + /* Execute */ + if (myTrans->execute(NdbTransaction::NoCommit) != 0) + APIERROR(myTrans->getNdbError()); + + + /* Process results to ensure we got the expected rows back */ + int rc= 0; + int count= 0; + int matchCount= 0; + // printf("Checking rows returned\n"); + while ((rc= scanOp->nextResult()) == 0) + { + int rowId= ra->int32_value(); + count++; + /* + * Check that this row was expected + */ + if (isRowExpected(rowId, + (TestConditions)comp, + colBitWidth, + bitsSetInFilter, + isNullable, + buff)) + { + matchCount++; + } + else + { + printf("Col=%u Comp=%u BitNum=%u row=%u : " + "Got row %u back which I did not expect\n", + col, comp, bitNum, count, rowId); + myTrans->close(); + return NDBT_FAILED; + } + } + + if (rc != 1) + { + printf("Col=%u Comp=%u BitNum=%u :" + "nextResult failure %d\n", col, comp, bitNum, rc); + APIERROR(myTrans->getNdbError()); + } + + //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n", + // col, comp, bitNum, expectedResultCount, matchCount); + + /* Check that we didn't miss any expected rows */ + if (matchCount != expectedResultCount) + { + printf("Col=%u Comp=%u BitNum=%u :" + "Mismatch between expected(%u) and received(%u) result counts\n", + col, comp, bitNum, expectedResultCount, matchCount); + myTrans->close(); + return NDBT_FAILED; + } + + if (myTrans->execute(NdbTransaction::Commit) != 0) + APIERROR(myTrans->getNdbError()); + + myTrans->close(); + + scanCount++; + + } // for bitNum + } // for comparison + } // for column + + printf("Verified %u scans with bitfield ScanFilter conditions\n", + scanCount); + + return NDBT_OK; +} + + +int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step) +{ + /* Create table */ + Ndb *pNdb = GETNDB(step); + pNdb->getDictionary()->dropTable(MYTAB2.getName()); + int ret = createTable(pNdb, &MYTAB2, false, true, 0); + if(ret) + return ret; + + /* Populate with data */ + if (insertBitRows(pNdb) != NDBT_OK) + return NDBT_FAILED; + + /* Initial data check via scan */ + if (verifyBitData(pNdb) != NDBT_OK) + return NDBT_FAILED; + + /* Verify Bit ScanFilter correctness */ + if (verifyBitScanFilter(pNdb) != NDBT_OK) + return NDBT_FAILED; + + /* Drop table */ + pNdb->getDictionary()->dropTable(MYTAB2.getName()); + + return NDBT_OK; +} + + NDBT_TESTSUITE(testScanFilter); TESTCASE(TEST_NAME, "Scan table TABLE_NAME for the records which accord with \ @@ -1025,6 +1650,12 @@ TESTCASE(TEST_NAME, FINALIZER(runDropTables); } +TESTCASE("TestScanFilterBit", + "Test ScanFilter with bitfield columns") +{ + INITIALIZER(runTestScanFilterBit); +} + NDBT_TESTSUITE_END(testScanFilter); @@ -1038,5 +1669,5 @@ int main(int argc, const char** argv) return NDBT_ProgramExit(NDBT_FAILED); } - return testScanFilter.executeOneCtx(con, &MYTAB1, TEST_NAME); + return testScanFilter.execute(argc, argv); } === modified file 'storage/ndb/test/run-test/daily-basic-tests.txt' --- storage/ndb/test/run-test/daily-basic-tests.txt 2008-11-08 21:22:57 +0000 +++ storage/ndb/test/run-test/daily-basic-tests.txt 2008-12-01 14:48:46 +0000 @@ -1181,7 +1181,7 @@ args -n bug37672 T1 #EOF 2008-07-04 max-time: 500 -cmd: testScanFilter +cmd: testScanFilter T1 args: #EOF 2008-07-09