Index: src/btree.c ================================================================== --- src/btree.c +++ src/btree.c @@ -4545,11 +4545,11 @@ i64 intKey, /* The table key */ int biasRight, /* If true, bias the search to the high end */ int *pRes /* Write search results here */ ){ int rc; - int (*xRecordCompare)(int, const void*, UnpackedRecord*); + RecordCompare xRecordCompare; assert( cursorHoldsMutex(pCur) ); assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) ); assert( pRes ); assert( (pIdxKey==0)==(pCur->pKeyInfo==0) ); @@ -4569,11 +4569,14 @@ } } if( pIdxKey ){ xRecordCompare = sqlite3VdbeFindCompare(pIdxKey); - assert( pIdxKey->default_rc==1 || pIdxKey->default_rc==0 || pIdxKey->default_rc==-1); + assert( pIdxKey->default_rc==1 + || pIdxKey->default_rc==0 + || pIdxKey->default_rc==-1 + ); } rc = moveToRoot(pCur); if( rc ){ return rc; @@ -4656,18 +4659,18 @@ if( nCell<=pPage->max1bytePayload ){ /* This branch runs if the record-size field of the cell is a ** single byte varint and the record fits entirely on the main ** b-tree page. */ testcase( pCell+nCell+1==pPage->aDataEnd ); - c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey); + c = xRecordCompare(nCell, (void*)&pCell[1], pCell[1], 1, pIdxKey); }else if( !(pCell[1] & 0x80) && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal ){ /* The record-size field is a 2 byte varint and the record ** fits entirely on the main b-tree page. */ testcase( pCell+nCell+2==pPage->aDataEnd ); - c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey); + c = xRecordCompare(nCell, (void*)&pCell[2], pCell[2], 1, pIdxKey); }else{ /* The record flows over onto one or more overflow pages. In ** this case the whole cell needs to be parsed, a buffer allocated ** and accessPayload() used to retrieve the record into the ** buffer before VdbeRecordCompare() can be called. */ @@ -4684,11 +4687,11 @@ rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0); if( rc ){ sqlite3_free(pCellKey); goto moveto_finish; } - c = xRecordCompare(nCell, pCellKey, pIdxKey); + c = xRecordCompare(nCell, pCellKey, ((u8*)pCellKey)[0], 1, pIdxKey); sqlite3_free(pCellKey); } if( c<0 ){ lwr = idx+1; }else if( c>0 ){ Index: src/sqliteInt.h ================================================================== --- src/sqliteInt.h +++ src/sqliteInt.h @@ -1590,10 +1590,12 @@ struct UnpackedRecord { KeyInfo *pKeyInfo; /* Collation and sort-order information */ u16 nField; /* Number of entries in apMem[] */ char default_rc; /* Comparison result if keys are equal */ Mem *aMem; /* Values */ + int r1; + int r2; }; /* ** Each SQL index is represented in memory by an Index: src/vdbe.h ================================================================== --- src/vdbe.h +++ src/vdbe.h @@ -212,12 +212,13 @@ void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*); int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*); UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **); -typedef int (*RecordCompare)(int,const void*, UnpackedRecord*); -RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *); +typedef int (*RecordCompare)(int,const void*,int,u32,UnpackedRecord*); +RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*); +RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo*); #ifndef SQLITE_OMIT_TRIGGER void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *); #endif Index: src/vdbeaux.c ================================================================== --- src/vdbeaux.c +++ src/vdbeaux.c @@ -3121,94 +3121,106 @@ } assert( u<=pKeyInfo->nField + 1 ); p->nField = u; } -static int vdbeRecordCompareString( - int nKey1, const void *pKey1, /* Left key */ - UnpackedRecord *pPKey2 /* Right key */ -){ - const u8 *aKey1 = (const u8*)pKey1; - int szHdr; - int serial_type; - int res; - - szHdr = aKey1[0]; - getVarint32(&aKey1[1], serial_type); - - if( serial_type<12 ){ - res = -1; /* (pKey1/nKey1) is a number or a null */ - }else if( !(serial_type & 0x01) ){ - res = +1; /* (pKey1/nKey1) is a blob */ - }else{ - int nCmp; - int nStr; - aKey1 = &aKey1[szHdr]; - - nStr = (serial_type-12) / 2; - if( (szHdr + nStr) > nKey1 ) return 0; /* Corruption */ - nCmp = MIN( pPKey2->aMem[0].n, nStr ); - res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp); - - if( res==0 ){ - res = nStr - pPKey2->aMem[0].n; - if( res==0 ) res = pPKey2->default_rc; - } - } - - assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0) - || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0) - || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0) - ); - return res; -} - -static int vdbeRecordCompareInt( - int nKey1, const void *pKey1, /* Left key */ - UnpackedRecord *pPKey2 /* Right key */ -){ - const u8 *aKey1 = (const u8*)pKey1; - int szHdr; - int serial_type; - int res; - - szHdr = aKey1[0]; - getVarint32(&aKey1[1], serial_type); - - if( serial_type==0 ){ - res = -1; /* NULL values are smaller than integers */ - }else if( serial_type>=12 ){ - res = +1; /* text/blob values are greater */ - }else{ - Mem mem; - sqlite3VdbeSerialGet(&aKey1[szHdr], serial_type, &mem); - if( mem.flags & MEM_Int ){ - i64 v = pPKey2->aMem[0].u.i; - if( v>mem.u.i ){ - res = -1; - }else if( vdefault_rc; - } - }else{ - double v = (double)pPKey2->aMem[0].u.i; - if( v>mem.r ){ - res = -1; - }else if( vdefault_rc; - } - } - } - - assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0) - || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0) - || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0) - ); - return res; +/* +** This function compares the two table rows or index records +** specified by {nKey1, pKey1} and pPKey2. It returns a negative, zero +** or positive integer if key1 is less than, equal to or +** greater than key2. The {nKey1, pKey1} key must be a blob +** created by th OP_MakeRecord opcode of the VDBE. The pPKey2 +** key must be a parsed key such as obtained from +** sqlite3VdbeParseRecord. +** +** Key1 and Key2 do not have to contain the same number of fields. +** The key with fewer fields is usually compares less than the +** longer key. However if the UNPACKED_INCRKEY flags in pPKey2 is set +** and the common prefixes are equal, then key1 is less than key2. +** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are +** equal, then the keys are considered to be equal and +** the parts beyond the common prefix are ignored. +*/ +static int vdbeRecordComparePrev( + int nKey1, const void *pKey1, /* Left key */ + UnpackedRecord *pPKey2 /* Right key */ +){ + u32 d1; /* Offset into aKey[] of next data element */ + u32 idx1; /* Offset into aKey[] of next header element */ + u32 szHdr1; /* Number of bytes in header */ + int i = 0; + int rc = 0; + const unsigned char *aKey1 = (const unsigned char *)pKey1; + KeyInfo *pKeyInfo; + Mem mem1; + + pKeyInfo = pPKey2->pKeyInfo; + mem1.enc = pKeyInfo->enc; + mem1.db = pKeyInfo->db; + /* mem1.flags = 0; // Will be initialized by sqlite3VdbeSerialGet() */ + VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */ + + /* Compilers may complain that mem1.u.i is potentially uninitialized. + ** We could initialize it, as shown here, to silence those complaints. + ** But in fact, mem1.u.i will never actually be used uninitialized, and doing + ** the unnecessary initialization has a measurable negative performance + ** impact, since this routine is a very high runner. And so, we choose + ** to ignore the compiler warnings and leave this variable uninitialized. + */ + /* mem1.u.i = 0; // not needed, here to silence compiler warning */ + + idx1 = getVarint32(aKey1, szHdr1); + d1 = szHdr1; + assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB ); + assert( pKeyInfo->aSortOrder!=0 ); + assert( pKeyInfo->nField>0 ); + assert( idx1<=szHdr1 || CORRUPT_DB ); + do{ + u32 serial_type1; + + /* Read the serial types for the next element in each key. */ + idx1 += getVarint32( aKey1+idx1, serial_type1 ); + + /* Verify that there is enough key space remaining to avoid + ** a buffer overread. The "d1+serial_type1+2" subexpression will + ** always be greater than or equal to the amount of required key space. + ** Use that approximation to avoid the more expensive call to + ** sqlite3VdbeSerialTypeLen() in the common case. + */ + if( d1+serial_type1+2>(u32)nKey1 + && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 + ){ + break; + } + + /* Extract the values to be compared. + */ + d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1); + + /* Do the comparison + */ + rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]); + if( rc!=0 ){ + assert( mem1.zMalloc==0 ); /* See comment below */ + if( pKeyInfo->aSortOrder[i] ){ + rc = -rc; /* Invert the result for DESC sort order. */ + } + return rc; + } + i++; + }while( idx1nField ); + + /* No memory allocation is ever used on mem1. Prove this using + ** the following assert(). If the assert() fails, it indicates a + ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1). + */ + assert( mem1.zMalloc==0 ); + + /* rc==0 here means that one of the keys ran out of fields and + ** all the fields up to that point were equal. Return the the default_rc + ** value. */ + return pPKey2->default_rc; } static int vdbeCompareMemString( const Mem *pMem1, const Mem *pMem2, @@ -3330,164 +3342,100 @@ } return rc; } -/* -** This function compares the two table rows or index records -** specified by {nKey1, pKey1} and pPKey2. It returns a negative, zero -** or positive integer if key1 is less than, equal to or -** greater than key2. The {nKey1, pKey1} key must be a blob -** created by th OP_MakeRecord opcode of the VDBE. The pPKey2 -** key must be a parsed key such as obtained from -** sqlite3VdbeParseRecord. -** -** Key1 and Key2 do not have to contain the same number of fields. -** The key with fewer fields is usually compares less than the -** longer key. However if the UNPACKED_INCRKEY flags in pPKey2 is set -** and the common prefixes are equal, then key1 is less than key2. -** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are -** equal, then the keys are considered to be equal and -** the parts beyond the common prefix are ignored. -*/ -static int vdbeRecordComparePrev( - int nKey1, const void *pKey1, /* Left key */ - UnpackedRecord *pPKey2 /* Right key */ -){ - u32 d1; /* Offset into aKey[] of next data element */ - u32 idx1; /* Offset into aKey[] of next header element */ - u32 szHdr1; /* Number of bytes in header */ - int i = 0; - int rc = 0; - const unsigned char *aKey1 = (const unsigned char *)pKey1; - KeyInfo *pKeyInfo; - Mem mem1; - - pKeyInfo = pPKey2->pKeyInfo; - mem1.enc = pKeyInfo->enc; - mem1.db = pKeyInfo->db; - /* mem1.flags = 0; // Will be initialized by sqlite3VdbeSerialGet() */ - VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */ - - /* Compilers may complain that mem1.u.i is potentially uninitialized. - ** We could initialize it, as shown here, to silence those complaints. - ** But in fact, mem1.u.i will never actually be used uninitialized, and doing - ** the unnecessary initialization has a measurable negative performance - ** impact, since this routine is a very high runner. And so, we choose - ** to ignore the compiler warnings and leave this variable uninitialized. - */ - /* mem1.u.i = 0; // not needed, here to silence compiler warning */ - - idx1 = getVarint32(aKey1, szHdr1); - d1 = szHdr1; - assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB ); - assert( pKeyInfo->aSortOrder!=0 ); - assert( pKeyInfo->nField>0 ); - assert( idx1<=szHdr1 || CORRUPT_DB ); - do{ - u32 serial_type1; - - /* Read the serial types for the next element in each key. */ - idx1 += getVarint32( aKey1+idx1, serial_type1 ); - - /* Verify that there is enough key space remaining to avoid - ** a buffer overread. The "d1+serial_type1+2" subexpression will - ** always be greater than or equal to the amount of required key space. - ** Use that approximation to avoid the more expensive call to - ** sqlite3VdbeSerialTypeLen() in the common case. - */ - if( d1+serial_type1+2>(u32)nKey1 - && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 - ){ - break; - } - - /* Extract the values to be compared. - */ - d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1); - - /* Do the comparison - */ - rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]); - if( rc!=0 ){ - assert( mem1.zMalloc==0 ); /* See comment below */ - if( pKeyInfo->aSortOrder[i] ){ - rc = -rc; /* Invert the result for DESC sort order. */ - } - return rc; - } - i++; - }while( idx1nField ); - - /* No memory allocation is ever used on mem1. Prove this using - ** the following assert(). If the assert() fails, it indicates a - ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1). - */ - assert( mem1.zMalloc==0 ); - - /* rc==0 here means that one of the keys ran out of fields and - ** all the fields up to that point were equal. Return the the default_rc - ** value. */ - return pPKey2->default_rc; -} - - -int sqlite3VdbeRecordCompare( - int nKey1, const void *pKey1, /* Left key */ - UnpackedRecord *pPKey2 /* Right key */ -){ - u32 d1; /* Offset into aKey[] of next data element */ - u32 idx1; /* Offset into aKey[] of next header element */ - u32 szHdr1; /* Number of bytes in header */ - int i = 0; - int rc = 0; +static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){ + switch( serial_type ){ + case 1: + return (char)aKey[0]; + case 2: + return ((char)aKey[0] << 8) | aKey[1]; + case 3: + return ((char)aKey[0] << 16) | (aKey[1] << 8) | aKey[2]; + case 4: + return ((char)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]; + + case 5: { + i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]; + u32 lsw = (aKey[4] << 8) | aKey[5]; + return (i64)( msw << 16 | (u64)lsw ); + } + + case 6: { + i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]; + u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7]; + return (i64)( msw << 32 | (u64)lsw ); + } + } + + return (serial_type - 8); +} + +static int vdbeRecordCompare( + int nKey1, const void *pKey1, /* Left key */ + int szHdr1, /* Size of record header in bytes */ + u32 idx1, /* Offset of first type in header */ + UnpackedRecord *const pPKey2 /* Right key */ +){ + u32 d1 = szHdr1; /* Offset into aKey[] of next data element */ + int i = 0; + int rc = 0; + Mem *pRhs = pPKey2->aMem; KeyInfo *pKeyInfo = pPKey2->pKeyInfo; const unsigned char *aKey1 = (const unsigned char *)pKey1; Mem mem1; #ifdef SQLITE_DEBUG int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2); static int nCall = 0; nCall++; #endif + + /* If idx==0, then the caller has already determined that the first two + ** elements in the keys are equal. Fix the various stack variables so + ** that this routine begins comparing at the second field. */ + if( idx1==0 ){ + u32 s1; + assert( sqlite3VarintLen(szHdr1)==1 ); + idx1 = 1 + getVarint32(&aKey1[1], s1); + d1 += sqlite3VdbeSerialTypeLen(s1); + i = 1; + pRhs++; + } VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */ - - idx1 = getVarint32(aKey1, szHdr1); - d1 = szHdr1; assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB ); assert( pPKey2->pKeyInfo->aSortOrder!=0 ); assert( pPKey2->pKeyInfo->nField>0 ); assert( idx1<=szHdr1 || CORRUPT_DB ); do{ - Mem *pRhs = &pPKey2->aMem[i]; u32 serial_type; /* RHS is an integer */ if( pRhs->flags & MEM_Int ){ serial_type = aKey1[idx1]; if( serial_type>=12 ){ rc = +1; }else if( serial_type==0 ){ rc = -1; - }else{ + }else if( serial_type==7 ){ + double rhs = (double)pRhs->u.i; sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1); - if( serial_type==7 ){ - double rhs = (double)pRhs->u.i; - if( mem1.rrhs ){ - rc = +1; - } - }else{ - i64 rhs = pRhs->u.i; - if( mem1.u.irhs ){ - rc = +1; - } + if( mem1.rrhs ){ + rc = +1; + } + }else{ + i64 lhs = vdbeRecordDecodeInt(serial_type, &aKey1[d1]); + i64 rhs = pRhs->u.i; + if( lhsrhs ){ + rc = +1; } } } /* RHS is real */ @@ -3577,10 +3525,11 @@ assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB ); return rc; } i++; + pRhs++; d1 += sqlite3VdbeSerialTypeLen(serial_type); idx1 += sqlite3VarintLen(serial_type); }while( idx1nField && d1<=nKey1 ); /* No memory allocation is ever used on mem1. Prove this using @@ -3593,25 +3542,189 @@ ** all the fields up to that point were equal. Return the the default_rc ** value. */ assert( pPKey2->default_rc==expected ); return pPKey2->default_rc; } + +static int vdbeRecordCompareInt( + int nKey1, const void *pKey1, /* Left key */ + int szHdr, + u32 idx1, + UnpackedRecord *pPKey2 /* Right key */ +){ + const u8 *aKey = &((const u8*)pKey1)[szHdr]; + int serial_type = ((const u8*)pKey1)[1]; + int res; + i64 v = pPKey2->aMem[0].u.i; + i64 lhs; + + switch( serial_type ){ + case 1: + lhs = (char)(aKey[0]); + break; + case 2: + lhs = 256*(signed char)aKey[0] + aKey[1]; + break; + case 3: + lhs = 65536*(char)aKey[0] | (aKey[1]<<8) | aKey[2]; + break; + case 4: + lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]); + break; + + case 5: { + i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]; + u32 lsw = (aKey[4] << 8) | aKey[5]; + lhs = (i64)( msw << 16 | (u64)lsw ); + break; + } + + case 6: { + i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]; + u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7]; + lhs = (i64)( msw << 32 | (u64)lsw ); + break; + } + + case 8: + lhs = 0; + break; + + case 9: + lhs = 1; + break; + + default: + return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2); + } + + if( v>lhs ){ + res = pPKey2->r1; + }else if( vr2; + }else if( pPKey2->nField>1 ){ + res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2); + }else{ + res = pPKey2->default_rc; + } + + assert( (res==0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)==0) + || (res<0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)<0) + || (res>0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)>0) + || CORRUPT_DB + ); + return res; +} + +static int vdbeRecordCompareString( + int nKey1, const void *pKey1, /* Left key */ + int szHdr, + u32 idx1, + UnpackedRecord *pPKey2 /* Right key */ +){ + const u8 *aKey1 = (const u8*)pKey1; + int serial_type; + int res; + + getVarint32(&aKey1[1], serial_type); + + if( serial_type<12 ){ + res = pPKey2->r1; /* (pKey1/nKey1) is a number or a null */ + }else if( !(serial_type & 0x01) ){ + res = pPKey2->r2; /* (pKey1/nKey1) is a blob */ + }else{ + int nCmp; + int nStr; + aKey1 = &aKey1[szHdr]; + + nStr = (serial_type-12) / 2; + if( (szHdr + nStr) > nKey1 ) return 0; /* Corruption */ + nCmp = MIN( pPKey2->aMem[0].n, nStr ); + res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp); + + if( res==0 ){ + res = nStr - pPKey2->aMem[0].n; + if( res==0 ){ + if( pPKey2->nField>1 ){ + res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2); + }else{ + res = pPKey2->default_rc; + } + }else if( res>0 ){ + res = pPKey2->r2; + }else{ + res = pPKey2->r1; + } + }else if( res>0 ){ + res = pPKey2->r2; + }else{ + res = pPKey2->r1; + } + } + + assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0) + || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0) + || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0) + || CORRUPT_DB + ); + return res; +} + + +int vdbeRecordCompareLargeHeader( + int nKey1, const void *pKey1, /* Left key */ + int dummy1, u32 dummy2, /* Unused arguments */ + UnpackedRecord *pPKey2 /* Right key */ +){ + int szHdr; + u32 idx1; + idx1 = getVarint32(((u8*)pKey1), szHdr); + return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2); +} RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){ - if( p->nField==1 && p->pKeyInfo->aSortOrder[0]==0 ){ + if( (p->pKeyInfo->nField + p->pKeyInfo->nXField) > 10 ){ + return vdbeRecordCompareLargeHeader; + }else{ int flags = p->aMem[0].flags; + if( p->pKeyInfo->aSortOrder[0] ){ + p->r1 = 1; + p->r2 = -1; + }else{ + p->r1 = -1; + p->r2 = 1; + } if( (flags & MEM_Int) ){ return vdbeRecordCompareInt; - }else if( (p->aMem[0].flags&(MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 - && p->pKeyInfo->aColl[0]==0 + } + if( (flags & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 + && p->pKeyInfo->aColl[0]==0 ){ return vdbeRecordCompareString; } } - return sqlite3VdbeRecordCompare; + + return vdbeRecordCompare; +} + +RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo *pKeyInfo){ + if( (pKeyInfo->nField + pKeyInfo->nXField) > 10 ){ + return vdbeRecordCompareLargeHeader; + } + return vdbeRecordCompare; } +int sqlite3VdbeRecordCompare( + int nKey1, const void *pKey1, /* Left key */ + UnpackedRecord *pPKey2 /* Right key */ +){ + int szHdr; + u32 idx1; + + idx1 = getVarint32(((u8*)pKey1), szHdr); + return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2); +} /* ** pCur points at an index entry created using the OP_MakeRecord opcode. ** Read the rowid (the last field in the record) and store it in *rowid. ** Return SQLITE_OK if everything works, or an error code otherwise. Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -103,10 +103,11 @@ VdbeSorterIter *aIter; /* Array of iterators to merge */ int *aTree; /* Current state of incremental merge */ sqlite3_file *pTemp1; /* PMA file 1 */ SorterRecord *pRecord; /* Head of in-memory record list */ UnpackedRecord *pUnpacked; /* Used to unpack keys */ + RecordCompare xRecordCompare; /* Record compare function */ }; /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. @@ -410,11 +411,14 @@ } } assert( r2->default_rc==0 ); } +#if 0 *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2); +#endif + *pRes = pSorter->xRecordCompare(nKey1, pKey1, *((u8*)pKey1), 1, r2); } /* ** This function is called to compare two iterator keys when merging ** multiple b-tree segments. Parameter iOut is the index of the aTree[] @@ -486,10 +490,11 @@ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; mxCache = db->aDb[0].pSchema->cache_size; if( mxCachemxPmaSize = mxCache * pgsz; } + pSorter->xRecordCompare = sqlite3VdbeFindSorterCompare(pCsr->pKeyInfo); return SQLITE_OK; } /*