/ Check-in [3861e853]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the vdbeRecordCompareLargeHeader function. Fix some other details.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | experimental
Files: files | file ages | folders
SHA1:3861e853105cb8da344c7eebd2e455622b26395e
User & Date: dan 2014-03-01 19:44:56
Context
2014-03-01
19:45
Merge trunk changes. check-in: 9c1747b5 user: dan tags: experimental
19:44
Remove the vdbeRecordCompareLargeHeader function. Fix some other details. check-in: 3861e853 user: dan tags: experimental
2014-02-28
18:39
Update some test cases that deal with corrupt databases. check-in: 3a09f560 user: dan tags: experimental
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

  4657   4657           */
  4658   4658           nCell = pCell[0];
  4659   4659           if( nCell<=pPage->max1bytePayload ){
  4660   4660             /* This branch runs if the record-size field of the cell is a
  4661   4661             ** single byte varint and the record fits entirely on the main
  4662   4662             ** b-tree page.  */
  4663   4663             testcase( pCell+nCell+1==pPage->aDataEnd );
  4664         -          c = xRecordCompare(nCell, (void*)&pCell[1], pCell[1], 1, pIdxKey);
         4664  +          c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey, 0);
  4665   4665           }else if( !(pCell[1] & 0x80) 
  4666   4666             && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
  4667   4667           ){
  4668   4668             /* The record-size field is a 2 byte varint and the record 
  4669   4669             ** fits entirely on the main b-tree page.  */
  4670   4670             testcase( pCell+nCell+2==pPage->aDataEnd );
  4671         -          c = xRecordCompare(nCell, (void*)&pCell[2], pCell[2], 1, pIdxKey);
         4671  +          c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey, 0);
  4672   4672           }else{
  4673   4673             /* The record flows over onto one or more overflow pages. In
  4674   4674             ** this case the whole cell needs to be parsed, a buffer allocated
  4675   4675             ** and accessPayload() used to retrieve the record into the
  4676   4676             ** buffer before VdbeRecordCompare() can be called. */
  4677   4677             void *pCellKey;
  4678   4678             u8 * const pCellBody = pCell - pPage->childPtrSize;
................................................................................
  4685   4685             }
  4686   4686             pCur->aiIdx[pCur->iPage] = (u16)idx;
  4687   4687             rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
  4688   4688             if( rc ){
  4689   4689               sqlite3_free(pCellKey);
  4690   4690               goto moveto_finish;
  4691   4691             }
  4692         -          c = xRecordCompare(nCell, pCellKey, ((u8*)pCellKey)[0], 1, pIdxKey);
         4692  +          c = xRecordCompare(nCell, pCellKey, pIdxKey, 0);
  4693   4693             sqlite3_free(pCellKey);
  4694   4694           }
  4695   4695           if( c<0 ){
  4696   4696             lwr = idx+1;
  4697   4697           }else if( c>0 ){
  4698   4698             upr = idx-1;
  4699   4699           }else{

Changes to src/sqliteInt.h.

  1582   1582   ** Records are used to store the content of a table row and to store
  1583   1583   ** the key of an index.  A blob encoding of a record is created by
  1584   1584   ** the OP_MakeRecord opcode of the VDBE and is disassembled by the
  1585   1585   ** OP_Column opcode.
  1586   1586   **
  1587   1587   ** This structure holds a record that has already been disassembled
  1588   1588   ** into its constituent fields.
         1589  +**
         1590  +** The r1 and r2 member variables are only used by the optimized comparison
         1591  +** functions vdbeRecordCompareInt() and vdbeRecordCompareString().
  1589   1592   */
  1590   1593   struct UnpackedRecord {
  1591   1594     KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  1592   1595     u16 nField;         /* Number of entries in apMem[] */
  1593   1596     char default_rc;    /* Comparison result if keys are equal */
  1594   1597     Mem *aMem;          /* Values */
  1595         -  int r1;
  1596         -  int r2;
         1598  +  int r1;             /* Value to return if (lhs > rhs) */
         1599  +  int r2;             /* Value to return if (rhs < lhs) */
  1597   1600   };
  1598   1601   
  1599   1602   
  1600   1603   /*
  1601   1604   ** Each SQL index is represented in memory by an
  1602   1605   ** instance of the following structure.
  1603   1606   **

Changes to src/vdbe.h.

   207    207   sqlite3_value *sqlite3VdbeGetBoundValue(Vdbe*, int, u8);
   208    208   void sqlite3VdbeSetVarmask(Vdbe*, int);
   209    209   #ifndef SQLITE_OMIT_TRACE
   210    210     char *sqlite3VdbeExpandSql(Vdbe*, const char*);
   211    211   #endif
   212    212   
   213    213   void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
   214         -int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
          214  +int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*,int);
   215    215   UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);
   216    216   
   217         -typedef int (*RecordCompare)(int,const void*,int,u32,UnpackedRecord*);
          217  +typedef int (*RecordCompare)(int,const void*,UnpackedRecord*,int);
   218    218   RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*);
   219         -RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo*);
   220    219   
   221    220   #ifndef SQLITE_OMIT_TRIGGER
   222    221   void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
   223    222   #endif
   224    223   
   225    224   /* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
   226    225   ** each VDBE opcode.

Changes to src/vdbeaux.c.

  3119   3119       pMem++;
  3120   3120       u++;
  3121   3121     }
  3122   3122     assert( u<=pKeyInfo->nField + 1 );
  3123   3123     p->nField = u;
  3124   3124   }
  3125   3125   
         3126  +#if SQLITE_DEBUG
  3126   3127   /*
  3127         -** This function compares the two table rows or index records
  3128         -** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
  3129         -** or positive integer if key1 is less than, equal to or 
  3130         -** greater than key2.  The {nKey1, pKey1} key must be a blob
  3131         -** created by th OP_MakeRecord opcode of the VDBE.  The pPKey2
  3132         -** key must be a parsed key such as obtained from
  3133         -** sqlite3VdbeParseRecord.
  3134         -**
  3135         -** Key1 and Key2 do not have to contain the same number of fields.
  3136         -** The key with fewer fields is usually compares less than the 
  3137         -** longer key.  However if the UNPACKED_INCRKEY flags in pPKey2 is set
  3138         -** and the common prefixes are equal, then key1 is less than key2.
  3139         -** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are
  3140         -** equal, then the keys are considered to be equal and
  3141         -** the parts beyond the common prefix are ignored.
         3128  +** This function compares two index or table record keys in the same way
         3129  +** as the sqlite3VdbeRecordCompare() routine. Unlike VdbeRecordCompare(),
         3130  +** this function deserializes and compares values using the
         3131  +** sqlite3VdbeSerialGet() and sqlite3MemCompare() functions. It is used
         3132  +** in assert() statements to ensure that the optimized code in
         3133  +** sqlite3VdbeRecordCompare() returns results with these two primitives.
  3142   3134   */
  3143         -static int vdbeRecordComparePrev(
         3135  +static int vdbeRecordCompareDebug(
  3144   3136     int nKey1, const void *pKey1, /* Left key */
  3145   3137     UnpackedRecord *pPKey2        /* Right key */
  3146   3138   ){
  3147   3139     u32 d1;            /* Offset into aKey[] of next data element */
  3148   3140     u32 idx1;          /* Offset into aKey[] of next header element */
  3149   3141     u32 szHdr1;        /* Number of bytes in header */
  3150   3142     int i = 0;
................................................................................
  3216   3208     assert( mem1.zMalloc==0 );
  3217   3209   
  3218   3210     /* rc==0 here means that one of the keys ran out of fields and
  3219   3211     ** all the fields up to that point were equal. Return the the default_rc
  3220   3212     ** value.  */
  3221   3213     return pPKey2->default_rc;
  3222   3214   }
         3215  +#endif
  3223   3216   
         3217  +/*
         3218  +** Both *pMem1 and *pMem2 contain string values. Compare the two values
         3219  +** using the collation sequence pColl. As usual, return a negative , zero
         3220  +** or positive value if *pMem1 is less than, equal to or greater than 
         3221  +** *pMem2, respectively. Similar in spirit to "rc = (*pMem1) - (*pMem2);".
         3222  +*/
  3224   3223   static int vdbeCompareMemString(
  3225         -  const Mem *pMem1, 
  3226         -  const Mem *pMem2, 
         3224  +  const Mem *pMem1,
         3225  +  const Mem *pMem2,
  3227   3226     const CollSeq *pColl
  3228   3227   ){
  3229   3228     if( pMem1->enc==pColl->enc ){
  3230   3229       /* The strings are already in the correct encoding.  Call the
  3231   3230        ** comparison function directly */
  3232   3231       return pColl->xCmp(pColl->pUser,pMem1->n,pMem1->z,pMem2->n,pMem2->z);
  3233   3232     }else{
................................................................................
  3340   3339     if( rc==0 ){
  3341   3340       rc = pMem1->n - pMem2->n;
  3342   3341     }
  3343   3342     return rc;
  3344   3343   }
  3345   3344   
  3346   3345   
         3346  +/*
         3347  +** The first argument passed to this function is a serial-type that
         3348  +** corresponds to an integer - all values between 1 and 9 inclusive 
         3349  +** except 7. The second points to a buffer containing an integer value
         3350  +** serialized according to serial_type. This function deserializes
         3351  +** and returns the value.
         3352  +*/
  3347   3353   static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
         3354  +  assert( CORRUPT_DB || (serial_type>=1 && serial_type<=9 && serial_type!=7) );
  3348   3355     switch( serial_type ){
         3356  +    case 0:
  3349   3357       case 1:
  3350   3358         return (char)aKey[0];
  3351   3359       case 2:
  3352   3360         return ((char)aKey[0] << 8) | aKey[1];
  3353   3361       case 3:
  3354   3362         return ((char)aKey[0] << 16) | (aKey[1] << 8) | aKey[2];
  3355   3363       case 4:
  3356   3364         return ((char)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3];
  3357         -
  3358   3365       case 5: {
  3359   3366         i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
  3360   3367         u32 lsw = (aKey[4] << 8) | aKey[5];
  3361   3368         return (i64)( msw << 16 | (u64)lsw );
  3362   3369       }
  3363         -
  3364   3370       case 6: {
  3365   3371         i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
  3366   3372         u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
  3367   3373         return (i64)( msw << 32 | (u64)lsw );
  3368   3374       }
  3369   3375     }
  3370   3376   
  3371   3377     return (serial_type - 8);
  3372   3378   }
  3373   3379   
  3374         -static int vdbeRecordCompare(
  3375         -  int nKey1, const void *pKey1, /* Left key */
  3376         -  int szHdr1,                   /* Size of record header in bytes */
  3377         -  u32 idx1,                     /* Offset of first type in header */
  3378         -  UnpackedRecord *const pPKey2  /* Right key */
         3380  +/*
         3381  +** This function compares the two table rows or index records
         3382  +** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
         3383  +** or positive integer if key1 is less than, equal to or 
         3384  +** greater than key2.  The {nKey1, pKey1} key must be a blob
         3385  +** created by th OP_MakeRecord opcode of the VDBE.  The pPKey2
         3386  +** key must be a parsed key such as obtained from
         3387  +** sqlite3VdbeParseRecord.
         3388  +**
         3389  +** If argument bSkip is non-zero, it is assumed that the caller has already
         3390  +** determined that the first fields of the keys are equal.
         3391  +**
         3392  +** Key1 and Key2 do not have to contain the same number of fields. If all 
         3393  +** fields that appear in both keys are equal, then pPKey2->default_rc is 
         3394  +** returned.
         3395  +*/
         3396  +int sqlite3VdbeRecordCompare(
         3397  +  int nKey1, const void *pKey1,   /* Left key */
         3398  +  UnpackedRecord *const pPKey2,   /* Right key */
         3399  +  int bSkip                       /* If true, skip the first field */
  3379   3400   ){
  3380         -  u32 d1 = szHdr1;   /* Offset into aKey[] of next data element */
  3381         -  int i = 0;
  3382         -  int rc = 0;
  3383         -  Mem *pRhs = pPKey2->aMem;
         3401  +  u32 d1;                         /* Offset into aKey[] of next data element */
         3402  +  int i;                          /* Index of next field to compare */
         3403  +  int szHdr1;                     /* Size of record header in bytes */
         3404  +  u32 idx1;                       /* Offset of first type in header */
         3405  +  int rc = 0;                     /* Return value */
         3406  +  Mem *pRhs = pPKey2->aMem;       /* Next field of pPKey2 to compare */
  3384   3407     KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  3385   3408     const unsigned char *aKey1 = (const unsigned char *)pKey1;
  3386   3409     Mem mem1;
  3387   3410   
  3388         -#ifdef SQLITE_DEBUG
  3389         -  int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
  3390         -  static int nCall = 0;
  3391         -  nCall++;
  3392         -#endif
  3393         -
  3394         -  /* If idx==0, then the caller has already determined that the first two
  3395         -  ** elements in the keys are equal. Fix the various stack variables so
         3411  +  /* If bSkip is true, then the caller has already determined that the first
         3412  +  ** two elements in the keys are equal. Fix the various stack variables so
  3396   3413     ** that this routine begins comparing at the second field. */
  3397         -  if( idx1==0 ){
         3414  +  if( bSkip ){
  3398   3415       u32 s1;
  3399         -    assert( sqlite3VarintLen(szHdr1)==1 );
  3400   3416       idx1 = 1 + getVarint32(&aKey1[1], s1);
  3401         -    d1 += sqlite3VdbeSerialTypeLen(s1);
         3417  +    szHdr1 = aKey1[0];
         3418  +    d1 = szHdr1 + sqlite3VdbeSerialTypeLen(s1);
  3402   3419       i = 1;
  3403   3420       pRhs++;
         3421  +  }else{
         3422  +    idx1 = getVarint32(aKey1, szHdr1);
         3423  +    d1 = szHdr1;
         3424  +    i = 0;
  3404   3425     }
  3405   3426   
  3406   3427     VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  3407   3428     assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
  3408   3429          || CORRUPT_DB );
  3409   3430     assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  3410   3431     assert( pPKey2->pKeyInfo->nField>0 );
................................................................................
  3507   3528       /* RHS is null */
  3508   3529       else{
  3509   3530         serial_type = aKey1[idx1];
  3510   3531         rc = (serial_type!=0);
  3511   3532       }
  3512   3533   
  3513   3534       if( rc!=0 ){
  3514         -      assert( mem1.zMalloc==0 );  /* See comment below */
  3515   3535         if( pKeyInfo->aSortOrder[i] ){
  3516   3536           rc = -rc;
  3517         -#if 0
  3518         -        assert( (rc>0 && (rc^(int)0x80000000)<0) 
  3519         -             || (rc<0 && (rc^(int)0x80000000)>0) );
  3520         -        assert( sizeof(int)==4 );
  3521         -        rc ^= (int)0x80000000;    /* similar in spirit to: "rc = -rc;" */
  3522         -        assert( rc!=0 );
  3523         -#endif
  3524   3537         }
  3525         -      assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );
         3538  +      assert( CORRUPT_DB 
         3539  +          || (rc<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
         3540  +          || (rc>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
         3541  +      );
         3542  +      assert( mem1.zMalloc==0 );  /* See comment below */
  3526   3543         return rc;
  3527   3544       }
  3528   3545   
  3529   3546       i++;
  3530   3547       pRhs++;
  3531   3548       d1 += sqlite3VdbeSerialTypeLen(serial_type);
  3532   3549       idx1 += sqlite3VarintLen(serial_type);
  3533   3550     }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );
  3534   3551   
  3535   3552     /* No memory allocation is ever used on mem1.  Prove this using
  3536   3553     ** the following assert().  If the assert() fails, it indicates a
  3537         -  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  3538         -  */
         3554  +  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).  */
  3539   3555     assert( mem1.zMalloc==0 );
  3540   3556   
  3541   3557     /* rc==0 here means that one or both of the keys ran out of fields and
  3542   3558     ** all the fields up to that point were equal. Return the the default_rc
  3543   3559     ** value.  */
  3544         -  assert( pPKey2->default_rc==expected );
         3560  +  assert( CORRUPT_DB 
         3561  +       || pPKey2->default_rc==vdbeRecordCompareDebug(nKey1, pKey1, pPKey2) 
         3562  +  );
  3545   3563     return pPKey2->default_rc;
  3546   3564   }
  3547   3565   
         3566  +/*
         3567  +** This function is an optimized version of sqlite3VdbeRecordCompare() 
         3568  +** that (a) the first field of pPKey2 is an integer, and (b) the 
         3569  +** size-of-header varint at the start of (pKey1/nKey1) fits in a single
         3570  +** byte (i.e. is less than 128).
         3571  +*/
  3548   3572   static int vdbeRecordCompareInt(
  3549   3573     int nKey1, const void *pKey1, /* Left key */
  3550         -  int szHdr,
  3551         -  u32 idx1,
  3552         -  UnpackedRecord *pPKey2        /* Right key */
         3574  +  UnpackedRecord *pPKey2,       /* Right key */
         3575  +  int bSkip                     /* Ignored */
  3553   3576   ){
  3554         -  const u8 *aKey = &((const u8*)pKey1)[szHdr];
         3577  +  const u8 *aKey = &((const u8*)pKey1)[*(const u8*)pKey1];
  3555   3578     int serial_type = ((const u8*)pKey1)[1];
  3556   3579     int res;
  3557   3580     i64 v = pPKey2->aMem[0].u.i;
  3558   3581     i64 lhs;
         3582  +
         3583  +  assert( bSkip==0 );
  3559   3584   
  3560   3585     switch( serial_type ){
  3561         -
  3562   3586       case 1:
  3563   3587         lhs = (char)(aKey[0]);
  3564   3588         break;
  3565   3589       case 2:
  3566   3590         lhs = 256*(signed char)aKey[0] + aKey[1];
  3567   3591         break;
  3568   3592       case 3:
  3569   3593         lhs = 65536*(char)aKey[0] | (aKey[1]<<8) | aKey[2];
  3570   3594         break;
  3571   3595       case 4:
  3572   3596         lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);
  3573   3597         break;
  3574         -
  3575   3598       case 5: {
  3576   3599         i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
  3577   3600         u32 lsw = (aKey[4] << 8) | aKey[5];
  3578   3601         lhs = (i64)( msw << 16 | (u64)lsw );
  3579   3602         break;
  3580   3603       }
  3581         -
  3582   3604       case 6: {
  3583   3605         i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
  3584   3606         u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
  3585   3607         lhs = (i64)( msw << 32 | (u64)lsw );
  3586   3608         break;
  3587   3609       }
  3588         -
  3589   3610       case 8: 
  3590   3611         lhs = 0;
  3591   3612         break;
  3592         -
  3593   3613       case 9:
  3594   3614         lhs = 1;
  3595   3615         break;
  3596   3616   
  3597   3617       /* This case could be removed without changing the results of running
  3598   3618       ** this code. Including it causes gcc to generate a faster switch 
  3599   3619       ** statement (since the range of switch targets now starts at zero and
  3600   3620       ** is contiguous) but does not cause any duplicate code to be generated
  3601   3621       ** (as gcc is clever enough to combine the two like cases). Other 
  3602   3622       ** compilers might be similar.  */ 
  3603   3623       case 0: case 7:
  3604         -      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);
         3624  +      return sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 0);
  3605   3625   
  3606   3626       default:
  3607         -      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);
         3627  +      return sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 0);
  3608   3628     }
  3609   3629   
  3610   3630     if( v>lhs ){
  3611   3631       res = pPKey2->r1;
  3612   3632     }else if( v<lhs ){
  3613   3633       res = pPKey2->r2;
  3614   3634     }else if( pPKey2->nField>1 ){
  3615   3635       /* The first fields of the two keys are equal. Compare the trailing 
  3616   3636       ** fields.  */
  3617         -    res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
         3637  +    res = sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 1);
  3618   3638     }else{
  3619   3639       /* The first fields of the two keys are equal and there are no trailing
  3620   3640       ** fields. Return pPKey2->default_rc in this case. */
  3621   3641       res = pPKey2->default_rc;
  3622   3642     }
  3623   3643   
  3624         -  assert( (res==0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)==0)
  3625         -       || (res<0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)<0)
  3626         -       || (res>0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)>0)
         3644  +  assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
         3645  +       || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
         3646  +       || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
  3627   3647          || CORRUPT_DB
  3628   3648     );
  3629   3649     return res;
  3630   3650   }
  3631   3651   
         3652  +/*
         3653  +** This function is an optimized version of sqlite3VdbeRecordCompare() 
         3654  +** that (a) the first field of pPKey2 is a string, that (b) the first field
         3655  +** uses the collation sequence BINARY and (c) that the size-of-header varint 
         3656  +** at the start of (pKey1/nKey1) fits in a single byte.
         3657  +*/
  3632   3658   static int vdbeRecordCompareString(
  3633   3659     int nKey1, const void *pKey1, /* Left key */
  3634         -  int szHdr,
  3635         -  u32 idx1,
  3636         -  UnpackedRecord *pPKey2        /* Right key */
         3660  +  UnpackedRecord *pPKey2,       /* Right key */
         3661  +  int bSkip
  3637   3662   ){
  3638   3663     const u8 *aKey1 = (const u8*)pKey1;
  3639   3664     int serial_type;
  3640   3665     int res;
  3641   3666   
         3667  +  assert( bSkip==0 );
  3642   3668     getVarint32(&aKey1[1], serial_type);
  3643   3669   
  3644   3670     if( serial_type<12 ){
  3645   3671       res = pPKey2->r1;      /* (pKey1/nKey1) is a number or a null */
  3646   3672     }else if( !(serial_type & 0x01) ){ 
  3647   3673       res = pPKey2->r2;      /* (pKey1/nKey1) is a blob */
  3648   3674     }else{
  3649   3675       int nCmp;
  3650   3676       int nStr;
  3651         -    aKey1 = &aKey1[szHdr];
         3677  +    int szHdr = aKey1[0];
  3652   3678   
  3653   3679       nStr = (serial_type-12) / 2;
  3654   3680       if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
  3655   3681       nCmp = MIN( pPKey2->aMem[0].n, nStr );
  3656         -    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);
         3682  +    res = memcmp(&aKey1[szHdr], pPKey2->aMem[0].z, nCmp);
  3657   3683   
  3658   3684       if( res==0 ){
  3659   3685         res = nStr - pPKey2->aMem[0].n;
  3660   3686         if( res==0 ){
  3661   3687           if( pPKey2->nField>1 ){
  3662         -          res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
         3688  +          res = sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 1);
  3663   3689           }else{
  3664   3690             res = pPKey2->default_rc;
  3665   3691           }
  3666   3692         }else if( res>0 ){
  3667   3693           res = pPKey2->r2;
  3668   3694         }else{
  3669   3695           res = pPKey2->r1;
................................................................................
  3671   3697       }else if( res>0 ){
  3672   3698         res = pPKey2->r2;
  3673   3699       }else{
  3674   3700         res = pPKey2->r1;
  3675   3701       }
  3676   3702     }
  3677   3703   
  3678         -  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
  3679         -       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
  3680         -       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
         3704  +  assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
         3705  +       || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
         3706  +       || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
  3681   3707          || CORRUPT_DB
  3682   3708     );
  3683   3709     return res;
  3684   3710   }
  3685   3711   
  3686         -
  3687         -int vdbeRecordCompareLargeHeader(
  3688         -  int nKey1, const void *pKey1, /* Left key */
  3689         -  int dummy1, u32 dummy2,       /* Unused arguments */
  3690         -  UnpackedRecord *pPKey2        /* Right key */
  3691         -){
  3692         -  int szHdr;
  3693         -  u32 idx1;
  3694         -  idx1 = getVarint32(((u8*)pKey1), szHdr);
  3695         -  return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2);
  3696         -}
  3697         -
         3712  +/*
         3713  +** Return a pointer to an sqlite3VdbeRecordCompare() compatible function
         3714  +** suitable for comparing serialized records to the unpacked record passed
         3715  +** as the only argument.
         3716  +*/
  3698   3717   RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  3699         -  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField) > 10 ){
  3700         -    return vdbeRecordCompareLargeHeader;
  3701         -  }else{
         3718  +  /* As the varints that make up a record header are all 5 bytes in size
         3719  +  ** or less, if the binary keys being compared have 25 or fewer fields 
         3720  +  ** then it is guaranteed that the varint at the start of every record 
         3721  +  ** (the record-header size in bytes) fits in a single byte. If this
         3722  +  ** is not the case, then sqlite3VdbeRecordCompare() must be used.  */
         3723  +  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField)<=25 ){
  3702   3724       int flags = p->aMem[0].flags;
  3703   3725       if( p->pKeyInfo->aSortOrder[0] ){
  3704   3726         p->r1 = 1;
  3705   3727         p->r2 = -1;
  3706   3728       }else{
  3707   3729         p->r1 = -1;
  3708   3730         p->r2 = 1;
................................................................................
  3713   3735       if( (flags & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
  3714   3736           && p->pKeyInfo->aColl[0]==0 
  3715   3737       ){
  3716   3738         return vdbeRecordCompareString;
  3717   3739       }
  3718   3740     }
  3719   3741   
  3720         -  return vdbeRecordCompare;
  3721         -}
  3722         -
  3723         -RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo *pKeyInfo){
  3724         -  if( (pKeyInfo->nField + pKeyInfo->nXField) > 10 ){
  3725         -    return vdbeRecordCompareLargeHeader;
  3726         -  }
  3727         -  return vdbeRecordCompare;
  3728         -}
  3729         -
  3730         -int sqlite3VdbeRecordCompare(
  3731         -  int nKey1, const void *pKey1, /* Left key */
  3732         -  UnpackedRecord *pPKey2        /* Right key */
  3733         -){
  3734         -  int szHdr;
  3735         -  u32 idx1;
  3736         -
  3737         -  idx1 = getVarint32(((u8*)pKey1), szHdr);
  3738         -  return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2);
         3742  +  return sqlite3VdbeRecordCompare;
  3739   3743   }
  3740   3744   
  3741   3745   /*
  3742   3746   ** pCur points at an index entry created using the OP_MakeRecord opcode.
  3743   3747   ** Read the rowid (the last field in the record) and store it in *rowid.
  3744   3748   ** Return SQLITE_OK if everything works, or an error code otherwise.
  3745   3749   **
................................................................................
  3846   3850       return SQLITE_CORRUPT_BKPT;
  3847   3851     }
  3848   3852     memset(&m, 0, sizeof(m));
  3849   3853     rc = sqlite3VdbeMemFromBtree(pC->pCursor, 0, (u32)nCellKey, 1, &m);
  3850   3854     if( rc ){
  3851   3855       return rc;
  3852   3856     }
  3853         -  *res = sqlite3VdbeRecordCompare(m.n, m.z, pUnpacked);
         3857  +  *res = sqlite3VdbeRecordCompare(m.n, m.z, pUnpacked, 0);
  3854   3858     sqlite3VdbeMemRelease(&m);
  3855   3859     return SQLITE_OK;
  3856   3860   }
  3857   3861   
  3858   3862   /*
  3859   3863   ** This routine sets the value to be returned by subsequent calls to
  3860   3864   ** sqlite3_changes() on the database handle 'db'. 

Changes to src/vdbesort.c.

   101    101     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   102    102     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   103    103     VdbeSorterIter *aIter;          /* Array of iterators to merge */
   104    104     int *aTree;                     /* Current state of incremental merge */
   105    105     sqlite3_file *pTemp1;           /* PMA file 1 */
   106    106     SorterRecord *pRecord;          /* Head of in-memory record list */
   107    107     UnpackedRecord *pUnpacked;      /* Used to unpack keys */
   108         -  RecordCompare xRecordCompare;   /* Record compare function */
   109    108   };
   110    109   
   111    110   /*
   112    111   ** The following type is an iterator for a PMA. It caches the current key in 
   113    112   ** variables nKey/aKey. If the iterator is at EOF, pFile==0.
   114    113   */
   115    114   struct VdbeSorterIter {
................................................................................
   409    408           *pRes = -1;
   410    409           return;
   411    410         }
   412    411       }
   413    412       assert( r2->default_rc==0 );
   414    413     }
   415    414   
   416         -#if 0
   417         -  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
   418         -#endif
   419         -  *pRes = pSorter->xRecordCompare(nKey1, pKey1, *((u8*)pKey1), 1, r2);
          415  +  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
   420    416   }
   421    417   
   422    418   /*
   423    419   ** This function is called to compare two iterator keys when merging 
   424    420   ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
   425    421   ** value to recalculate.
   426    422   */
................................................................................
   488    484     if( !sqlite3TempInMemory(db) ){
   489    485       pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   490    486       pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
   491    487       mxCache = db->aDb[0].pSchema->cache_size;
   492    488       if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
   493    489       pSorter->mxPmaSize = mxCache * pgsz;
   494    490     }
   495         -  pSorter->xRecordCompare = sqlite3VdbeFindSorterCompare(pCsr->pKeyInfo);
   496    491   
   497    492     return SQLITE_OK;
   498    493   }
   499    494   
   500    495   /*
   501    496   ** Free the list of sorted records starting at pRecord.
   502    497   */

Changes to src/where.c.

  1909   1909   #endif
  1910   1910     assert( pRec!=0 );
  1911   1911     iCol = pRec->nField - 1;
  1912   1912     assert( pIdx->nSample>0 );
  1913   1913     assert( pRec->nField>0 && iCol<pIdx->nSampleCol );
  1914   1914     do{
  1915   1915       iTest = (iMin+i)/2;
  1916         -    res = sqlite3VdbeRecordCompare(aSample[iTest].n, aSample[iTest].p, pRec);
         1916  +    res = sqlite3VdbeRecordCompare(aSample[iTest].n, aSample[iTest].p, pRec, 0);
  1917   1917       if( res<0 ){
  1918   1918         iMin = iTest+1;
  1919   1919       }else{
  1920   1920         i = iTest;
  1921   1921       }
  1922   1922     }while( res && iMin<i );
  1923   1923   
................................................................................
  1924   1924   #ifdef SQLITE_DEBUG
  1925   1925     /* The following assert statements check that the binary search code
  1926   1926     ** above found the right answer. This block serves no purpose other
  1927   1927     ** than to invoke the asserts.  */
  1928   1928     if( res==0 ){
  1929   1929       /* If (res==0) is true, then sample $i must be equal to pRec */
  1930   1930       assert( i<pIdx->nSample );
  1931         -    assert( 0==sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec)
         1931  +    assert( 0==sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec, 0)
  1932   1932            || pParse->db->mallocFailed );
  1933   1933     }else{
  1934   1934       /* Otherwise, pRec must be smaller than sample $i and larger than
  1935   1935       ** sample ($i-1).  */
  1936   1936       assert( i==pIdx->nSample 
  1937         -         || sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec)>0
         1937  +         || sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec, 0)>0
  1938   1938            || pParse->db->mallocFailed );
  1939   1939       assert( i==0
  1940         -         || sqlite3VdbeRecordCompare(aSample[i-1].n, aSample[i-1].p, pRec)<0
         1940  +         || sqlite3VdbeRecordCompare(aSample[i-1].n, aSample[i-1].p, pRec, 0)<0
  1941   1941            || pParse->db->mallocFailed );
  1942   1942     }
  1943   1943   #endif /* ifdef SQLITE_DEBUG */
  1944   1944   
  1945   1945     /* At this point, aSample[i] is the first sample that is greater than
  1946   1946     ** or equal to pVal.  Or if i==pIdx->nSample, then all samples are less
  1947   1947     ** than pVal.  If aSample[i]==pVal, then res==0.