/ Check-in [57089374]
Login
SQLite training in Houston TX on 2019-11-05 (details)
Part of the 2019 Tcl Conference

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Further changes to sqlite3VdbeRecordCompare().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | experimental
Files: files | file ages | folders
SHA1: 570893740067a7caa952f259fa078cdf67017d71
User & Date: dan 2014-02-27 20:44:18
Context
2014-02-27
20:52
Merge in latest trunk changes. check-in: 8f30b095 user: dan tags: experimental
20:44
Further changes to sqlite3VdbeRecordCompare(). check-in: 57089374 user: dan tags: experimental
2014-02-25
21:01
Attempt to speed up sqlite3VdbeRecordCompare() by various means. This code is in an interim state. check-in: 85206e0b user: dan tags: experimental
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

  4543   4543     BtCursor *pCur,          /* The cursor to be moved */
  4544   4544     UnpackedRecord *pIdxKey, /* Unpacked index key */
  4545   4545     i64 intKey,              /* The table key */
  4546   4546     int biasRight,           /* If true, bias the search to the high end */
  4547   4547     int *pRes                /* Write search results here */
  4548   4548   ){
  4549   4549     int rc;
  4550         -  int (*xRecordCompare)(int, const void*, UnpackedRecord*);
         4550  +  RecordCompare xRecordCompare;
  4551   4551   
  4552   4552     assert( cursorHoldsMutex(pCur) );
  4553   4553     assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
  4554   4554     assert( pRes );
  4555   4555     assert( (pIdxKey==0)==(pCur->pKeyInfo==0) );
  4556   4556   
  4557   4557     /* If the cursor is already positioned at the point we are trying
................................................................................
  4567   4567         *pRes = -1;
  4568   4568         return SQLITE_OK;
  4569   4569       }
  4570   4570     }
  4571   4571   
  4572   4572     if( pIdxKey ){
  4573   4573       xRecordCompare = sqlite3VdbeFindCompare(pIdxKey);
  4574         -    assert( pIdxKey->default_rc==1 || pIdxKey->default_rc==0 || pIdxKey->default_rc==-1);
         4574  +    assert( pIdxKey->default_rc==1 
         4575  +         || pIdxKey->default_rc==0 
         4576  +         || pIdxKey->default_rc==-1
         4577  +    );
  4575   4578     }
  4576   4579   
  4577   4580     rc = moveToRoot(pCur);
  4578   4581     if( rc ){
  4579   4582       return rc;
  4580   4583     }
  4581   4584     assert( pCur->pgnoRoot==0 || pCur->apPage[pCur->iPage] );
................................................................................
  4654   4657           */
  4655   4658           nCell = pCell[0];
  4656   4659           if( nCell<=pPage->max1bytePayload ){
  4657   4660             /* This branch runs if the record-size field of the cell is a
  4658   4661             ** single byte varint and the record fits entirely on the main
  4659   4662             ** b-tree page.  */
  4660   4663             testcase( pCell+nCell+1==pPage->aDataEnd );
  4661         -          c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey);
         4664  +          c = xRecordCompare(nCell, (void*)&pCell[1], pCell[1], 1, pIdxKey);
  4662   4665           }else if( !(pCell[1] & 0x80) 
  4663   4666             && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
  4664   4667           ){
  4665   4668             /* The record-size field is a 2 byte varint and the record 
  4666   4669             ** fits entirely on the main b-tree page.  */
  4667   4670             testcase( pCell+nCell+2==pPage->aDataEnd );
  4668         -          c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey);
         4671  +          c = xRecordCompare(nCell, (void*)&pCell[2], pCell[2], 1, pIdxKey);
  4669   4672           }else{
  4670   4673             /* The record flows over onto one or more overflow pages. In
  4671   4674             ** this case the whole cell needs to be parsed, a buffer allocated
  4672   4675             ** and accessPayload() used to retrieve the record into the
  4673   4676             ** buffer before VdbeRecordCompare() can be called. */
  4674   4677             void *pCellKey;
  4675   4678             u8 * const pCellBody = pCell - pPage->childPtrSize;
................................................................................
  4682   4685             }
  4683   4686             pCur->aiIdx[pCur->iPage] = (u16)idx;
  4684   4687             rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
  4685   4688             if( rc ){
  4686   4689               sqlite3_free(pCellKey);
  4687   4690               goto moveto_finish;
  4688   4691             }
  4689         -          c = xRecordCompare(nCell, pCellKey, pIdxKey);
         4692  +          c = xRecordCompare(nCell, pCellKey, ((u8*)pCellKey)[0], 1, pIdxKey);
  4690   4693             sqlite3_free(pCellKey);
  4691   4694           }
  4692   4695           if( c<0 ){
  4693   4696             lwr = idx+1;
  4694   4697           }else if( c>0 ){
  4695   4698             upr = idx-1;
  4696   4699           }else{

Changes to src/sqliteInt.h.

  1588   1588   ** into its constituent fields.
  1589   1589   */
  1590   1590   struct UnpackedRecord {
  1591   1591     KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  1592   1592     u16 nField;         /* Number of entries in apMem[] */
  1593   1593     char default_rc;    /* Comparison result if keys are equal */
  1594   1594     Mem *aMem;          /* Values */
         1595  +  int r1;
         1596  +  int r2;
  1595   1597   };
  1596   1598   
  1597   1599   
  1598   1600   /*
  1599   1601   ** Each SQL index is represented in memory by an
  1600   1602   ** instance of the following structure.
  1601   1603   **

Changes to src/vdbe.h.

   210    210     char *sqlite3VdbeExpandSql(Vdbe*, const char*);
   211    211   #endif
   212    212   
   213    213   void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
   214    214   int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
   215    215   UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);
   216    216   
   217         -typedef int (*RecordCompare)(int,const void*, UnpackedRecord*);
   218         -RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *);
          217  +typedef int (*RecordCompare)(int,const void*,int,u32,UnpackedRecord*);
          218  +RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*);
          219  +RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo*);
   219    220   
   220    221   #ifndef SQLITE_OMIT_TRIGGER
   221    222   void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
   222    223   #endif
   223    224   
   224    225   /* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
   225    226   ** each VDBE opcode.

Changes to src/vdbeaux.c.

  3119   3119       pMem++;
  3120   3120       u++;
  3121   3121     }
  3122   3122     assert( u<=pKeyInfo->nField + 1 );
  3123   3123     p->nField = u;
  3124   3124   }
  3125   3125   
  3126         -static int vdbeRecordCompareString(
  3127         -  int nKey1, const void *pKey1, /* Left key */
  3128         -  UnpackedRecord *pPKey2        /* Right key */
  3129         -){
  3130         -  const u8 *aKey1 = (const u8*)pKey1;
  3131         -  int szHdr;
  3132         -  int serial_type;
  3133         -  int res;
  3134         -
  3135         -  szHdr = aKey1[0];
  3136         -  getVarint32(&aKey1[1], serial_type);
  3137         -
  3138         -  if( serial_type<12 ){
  3139         -    res = -1;      /* (pKey1/nKey1) is a number or a null */
  3140         -  }else if( !(serial_type & 0x01) ){ 
  3141         -    res = +1;      /* (pKey1/nKey1) is a blob */
  3142         -  }else{
  3143         -    int nCmp;
  3144         -    int nStr;
  3145         -    aKey1 = &aKey1[szHdr];
  3146         -
  3147         -    nStr = (serial_type-12) / 2;
  3148         -    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
  3149         -    nCmp = MIN( pPKey2->aMem[0].n, nStr );
  3150         -    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);
  3151         -
  3152         -    if( res==0 ){
  3153         -      res = nStr - pPKey2->aMem[0].n;
  3154         -      if( res==0 ) res = pPKey2->default_rc;
  3155         -    }
  3156         -  }
  3157         -
  3158         -  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
  3159         -       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
  3160         -       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
  3161         -  );
  3162         -  return res;
  3163         -}
  3164         -
  3165         -static int vdbeRecordCompareInt(
         3126  +/*
         3127  +** This function compares the two table rows or index records
         3128  +** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
         3129  +** or positive integer if key1 is less than, equal to or 
         3130  +** greater than key2.  The {nKey1, pKey1} key must be a blob
         3131  +** created by th OP_MakeRecord opcode of the VDBE.  The pPKey2
         3132  +** key must be a parsed key such as obtained from
         3133  +** sqlite3VdbeParseRecord.
         3134  +**
         3135  +** Key1 and Key2 do not have to contain the same number of fields.
         3136  +** The key with fewer fields is usually compares less than the 
         3137  +** longer key.  However if the UNPACKED_INCRKEY flags in pPKey2 is set
         3138  +** and the common prefixes are equal, then key1 is less than key2.
         3139  +** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are
         3140  +** equal, then the keys are considered to be equal and
         3141  +** the parts beyond the common prefix are ignored.
         3142  +*/
         3143  +static int vdbeRecordComparePrev(
  3166   3144     int nKey1, const void *pKey1, /* Left key */
  3167   3145     UnpackedRecord *pPKey2        /* Right key */
  3168   3146   ){
  3169         -  const u8 *aKey1 = (const u8*)pKey1;
  3170         -  int szHdr;
  3171         -  int serial_type;
  3172         -  int res;
  3173         -
  3174         -  szHdr = aKey1[0];
  3175         -  getVarint32(&aKey1[1], serial_type);
  3176         -
  3177         -  if( serial_type==0 ){
  3178         -    res = -1;      /* NULL values are smaller than integers */
  3179         -  }else if( serial_type>=12 ){
  3180         -    res = +1;      /* text/blob values are greater */
  3181         -  }else{
  3182         -    Mem mem;
  3183         -    sqlite3VdbeSerialGet(&aKey1[szHdr], serial_type, &mem);
  3184         -    if( mem.flags & MEM_Int ){
  3185         -      i64 v = pPKey2->aMem[0].u.i;
  3186         -      if( v>mem.u.i ){
  3187         -        res = -1;
  3188         -      }else if( v<mem.u.i ){
  3189         -        res = +1;
  3190         -      }else{
  3191         -        res = pPKey2->default_rc;
         3147  +  u32 d1;            /* Offset into aKey[] of next data element */
         3148  +  u32 idx1;          /* Offset into aKey[] of next header element */
         3149  +  u32 szHdr1;        /* Number of bytes in header */
         3150  +  int i = 0;
         3151  +  int rc = 0;
         3152  +  const unsigned char *aKey1 = (const unsigned char *)pKey1;
         3153  +  KeyInfo *pKeyInfo;
         3154  +  Mem mem1;
         3155  +
         3156  +  pKeyInfo = pPKey2->pKeyInfo;
         3157  +  mem1.enc = pKeyInfo->enc;
         3158  +  mem1.db = pKeyInfo->db;
         3159  +  /* mem1.flags = 0;  // Will be initialized by sqlite3VdbeSerialGet() */
         3160  +  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
         3161  +
         3162  +  /* Compilers may complain that mem1.u.i is potentially uninitialized.
         3163  +  ** We could initialize it, as shown here, to silence those complaints.
         3164  +  ** But in fact, mem1.u.i will never actually be used uninitialized, and doing 
         3165  +  ** the unnecessary initialization has a measurable negative performance
         3166  +  ** impact, since this routine is a very high runner.  And so, we choose
         3167  +  ** to ignore the compiler warnings and leave this variable uninitialized.
         3168  +  */
         3169  +  /*  mem1.u.i = 0;  // not needed, here to silence compiler warning */
         3170  +  
         3171  +  idx1 = getVarint32(aKey1, szHdr1);
         3172  +  d1 = szHdr1;
         3173  +  assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB );
         3174  +  assert( pKeyInfo->aSortOrder!=0 );
         3175  +  assert( pKeyInfo->nField>0 );
         3176  +  assert( idx1<=szHdr1 || CORRUPT_DB );
         3177  +  do{
         3178  +    u32 serial_type1;
         3179  +
         3180  +    /* Read the serial types for the next element in each key. */
         3181  +    idx1 += getVarint32( aKey1+idx1, serial_type1 );
         3182  +
         3183  +    /* Verify that there is enough key space remaining to avoid
         3184  +    ** a buffer overread.  The "d1+serial_type1+2" subexpression will
         3185  +    ** always be greater than or equal to the amount of required key space.
         3186  +    ** Use that approximation to avoid the more expensive call to
         3187  +    ** sqlite3VdbeSerialTypeLen() in the common case.
         3188  +    */
         3189  +    if( d1+serial_type1+2>(u32)nKey1
         3190  +     && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 
         3191  +    ){
         3192  +      break;
         3193  +    }
         3194  +
         3195  +    /* Extract the values to be compared.
         3196  +    */
         3197  +    d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1);
         3198  +
         3199  +    /* Do the comparison
         3200  +    */
         3201  +    rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]);
         3202  +    if( rc!=0 ){
         3203  +      assert( mem1.zMalloc==0 );  /* See comment below */
         3204  +      if( pKeyInfo->aSortOrder[i] ){
         3205  +        rc = -rc;  /* Invert the result for DESC sort order. */
  3192   3206         }
  3193         -    }else{
  3194         -      double v = (double)pPKey2->aMem[0].u.i;
  3195         -      if( v>mem.r ){
  3196         -        res = -1;
  3197         -      }else if( v<mem.r ){
  3198         -        res = +1;
  3199         -      }else{
  3200         -        res = pPKey2->default_rc;
  3201         -      }
         3207  +      return rc;
  3202   3208       }
  3203         -  }
         3209  +    i++;
         3210  +  }while( idx1<szHdr1 && i<pPKey2->nField );
  3204   3211   
  3205         -  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
  3206         -       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
  3207         -       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
  3208         -  );
  3209         -  return res;
         3212  +  /* No memory allocation is ever used on mem1.  Prove this using
         3213  +  ** the following assert().  If the assert() fails, it indicates a
         3214  +  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
         3215  +  */
         3216  +  assert( mem1.zMalloc==0 );
         3217  +
         3218  +  /* rc==0 here means that one of the keys ran out of fields and
         3219  +  ** all the fields up to that point were equal. Return the the default_rc
         3220  +  ** value.  */
         3221  +  return pPKey2->default_rc;
  3210   3222   }
  3211   3223   
  3212   3224   static int vdbeCompareMemString(
  3213   3225     const Mem *pMem1, 
  3214   3226     const Mem *pMem2, 
  3215   3227     const CollSeq *pColl
  3216   3228   ){
................................................................................
  3328   3340     if( rc==0 ){
  3329   3341       rc = pMem1->n - pMem2->n;
  3330   3342     }
  3331   3343     return rc;
  3332   3344   }
  3333   3345   
  3334   3346   
  3335         -/*
  3336         -** This function compares the two table rows or index records
  3337         -** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
  3338         -** or positive integer if key1 is less than, equal to or 
  3339         -** greater than key2.  The {nKey1, pKey1} key must be a blob
  3340         -** created by th OP_MakeRecord opcode of the VDBE.  The pPKey2
  3341         -** key must be a parsed key such as obtained from
  3342         -** sqlite3VdbeParseRecord.
  3343         -**
  3344         -** Key1 and Key2 do not have to contain the same number of fields.
  3345         -** The key with fewer fields is usually compares less than the 
  3346         -** longer key.  However if the UNPACKED_INCRKEY flags in pPKey2 is set
  3347         -** and the common prefixes are equal, then key1 is less than key2.
  3348         -** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are
  3349         -** equal, then the keys are considered to be equal and
  3350         -** the parts beyond the common prefix are ignored.
  3351         -*/
  3352         -static int vdbeRecordComparePrev(
         3347  +static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
         3348  +  switch( serial_type ){
         3349  +    case 1:
         3350  +      return (char)aKey[0];
         3351  +    case 2:
         3352  +      return ((char)aKey[0] << 8) | aKey[1];
         3353  +    case 3:
         3354  +      return ((char)aKey[0] << 16) | (aKey[1] << 8) | aKey[2];
         3355  +    case 4:
         3356  +      return ((char)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3];
         3357  +
         3358  +    case 5: {
         3359  +      i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
         3360  +      u32 lsw = (aKey[4] << 8) | aKey[5];
         3361  +      return (i64)( msw << 16 | (u64)lsw );
         3362  +    }
         3363  +
         3364  +    case 6: {
         3365  +      i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
         3366  +      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
         3367  +      return (i64)( msw << 32 | (u64)lsw );
         3368  +    }
         3369  +  }
         3370  +
         3371  +  return (serial_type - 8);
         3372  +}
         3373  +
         3374  +static int vdbeRecordCompare(
  3353   3375     int nKey1, const void *pKey1, /* Left key */
  3354         -  UnpackedRecord *pPKey2        /* Right key */
         3376  +  int szHdr1,                   /* Size of record header in bytes */
         3377  +  u32 idx1,                     /* Offset of first type in header */
         3378  +  UnpackedRecord *const pPKey2  /* Right key */
  3355   3379   ){
  3356         -  u32 d1;            /* Offset into aKey[] of next data element */
  3357         -  u32 idx1;          /* Offset into aKey[] of next header element */
  3358         -  u32 szHdr1;        /* Number of bytes in header */
         3380  +  u32 d1 = szHdr1;   /* Offset into aKey[] of next data element */
  3359   3381     int i = 0;
  3360   3382     int rc = 0;
  3361         -  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  3362         -  KeyInfo *pKeyInfo;
  3363         -  Mem mem1;
  3364         -
  3365         -  pKeyInfo = pPKey2->pKeyInfo;
  3366         -  mem1.enc = pKeyInfo->enc;
  3367         -  mem1.db = pKeyInfo->db;
  3368         -  /* mem1.flags = 0;  // Will be initialized by sqlite3VdbeSerialGet() */
  3369         -  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  3370         -
  3371         -  /* Compilers may complain that mem1.u.i is potentially uninitialized.
  3372         -  ** We could initialize it, as shown here, to silence those complaints.
  3373         -  ** But in fact, mem1.u.i will never actually be used uninitialized, and doing 
  3374         -  ** the unnecessary initialization has a measurable negative performance
  3375         -  ** impact, since this routine is a very high runner.  And so, we choose
  3376         -  ** to ignore the compiler warnings and leave this variable uninitialized.
  3377         -  */
  3378         -  /*  mem1.u.i = 0;  // not needed, here to silence compiler warning */
  3379         -  
  3380         -  idx1 = getVarint32(aKey1, szHdr1);
  3381         -  d1 = szHdr1;
  3382         -  assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB );
  3383         -  assert( pKeyInfo->aSortOrder!=0 );
  3384         -  assert( pKeyInfo->nField>0 );
  3385         -  assert( idx1<=szHdr1 || CORRUPT_DB );
  3386         -  do{
  3387         -    u32 serial_type1;
  3388         -
  3389         -    /* Read the serial types for the next element in each key. */
  3390         -    idx1 += getVarint32( aKey1+idx1, serial_type1 );
  3391         -
  3392         -    /* Verify that there is enough key space remaining to avoid
  3393         -    ** a buffer overread.  The "d1+serial_type1+2" subexpression will
  3394         -    ** always be greater than or equal to the amount of required key space.
  3395         -    ** Use that approximation to avoid the more expensive call to
  3396         -    ** sqlite3VdbeSerialTypeLen() in the common case.
  3397         -    */
  3398         -    if( d1+serial_type1+2>(u32)nKey1
  3399         -     && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 
  3400         -    ){
  3401         -      break;
  3402         -    }
  3403         -
  3404         -    /* Extract the values to be compared.
  3405         -    */
  3406         -    d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1);
  3407         -
  3408         -    /* Do the comparison
  3409         -    */
  3410         -    rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]);
  3411         -    if( rc!=0 ){
  3412         -      assert( mem1.zMalloc==0 );  /* See comment below */
  3413         -      if( pKeyInfo->aSortOrder[i] ){
  3414         -        rc = -rc;  /* Invert the result for DESC sort order. */
  3415         -      }
  3416         -      return rc;
  3417         -    }
  3418         -    i++;
  3419         -  }while( idx1<szHdr1 && i<pPKey2->nField );
  3420         -
  3421         -  /* No memory allocation is ever used on mem1.  Prove this using
  3422         -  ** the following assert().  If the assert() fails, it indicates a
  3423         -  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  3424         -  */
  3425         -  assert( mem1.zMalloc==0 );
  3426         -
  3427         -  /* rc==0 here means that one of the keys ran out of fields and
  3428         -  ** all the fields up to that point were equal. Return the the default_rc
  3429         -  ** value.  */
  3430         -  return pPKey2->default_rc;
  3431         -}
  3432         - 
  3433         -
  3434         -int sqlite3VdbeRecordCompare(
  3435         -  int nKey1, const void *pKey1, /* Left key */
  3436         -  UnpackedRecord *pPKey2        /* Right key */
  3437         -){
  3438         -  u32 d1;            /* Offset into aKey[] of next data element */
  3439         -  u32 idx1;          /* Offset into aKey[] of next header element */
  3440         -  u32 szHdr1;        /* Number of bytes in header */
  3441         -  int i = 0;
  3442         -  int rc = 0;
         3383  +  Mem *pRhs = pPKey2->aMem;
  3443   3384     KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  3444   3385     const unsigned char *aKey1 = (const unsigned char *)pKey1;
  3445   3386     Mem mem1;
  3446   3387   
  3447   3388   #ifdef SQLITE_DEBUG
  3448   3389     int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
  3449   3390     static int nCall = 0;
  3450   3391     nCall++;
  3451   3392   #endif
         3393  +
         3394  +  /* If idx==0, then the caller has already determined that the first two
         3395  +  ** elements in the keys are equal. Fix the various stack variables so
         3396  +  ** that this routine begins comparing at the second field. */
         3397  +  if( idx1==0 ){
         3398  +    u32 s1;
         3399  +    assert( sqlite3VarintLen(szHdr1)==1 );
         3400  +    idx1 = 1 + getVarint32(&aKey1[1], s1);
         3401  +    d1 += sqlite3VdbeSerialTypeLen(s1);
         3402  +    i = 1;
         3403  +    pRhs++;
         3404  +  }
  3452   3405   
  3453   3406     VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  3454         -  
  3455         -  idx1 = getVarint32(aKey1, szHdr1);
  3456         -  d1 = szHdr1;
  3457   3407     assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
  3458   3408          || CORRUPT_DB );
  3459   3409     assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  3460   3410     assert( pPKey2->pKeyInfo->nField>0 );
  3461   3411     assert( idx1<=szHdr1 || CORRUPT_DB );
  3462   3412     do{
  3463         -    Mem *pRhs = &pPKey2->aMem[i];
  3464   3413       u32 serial_type;
  3465   3414   
  3466   3415       /* RHS is an integer */
  3467   3416       if( pRhs->flags & MEM_Int ){
  3468   3417         serial_type = aKey1[idx1];
  3469   3418         if( serial_type>=12 ){
  3470   3419           rc = +1;
  3471   3420         }else if( serial_type==0 ){
  3472   3421           rc = -1;
  3473         -      }else{
         3422  +      }else if( serial_type==7 ){
         3423  +        double rhs = (double)pRhs->u.i;
  3474   3424           sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1);
  3475         -        if( serial_type==7 ){
  3476         -          double rhs = (double)pRhs->u.i;
  3477         -          if( mem1.r<rhs ){
  3478         -            rc = -1;
  3479         -          }else if( mem1.r>rhs ){
  3480         -            rc = +1;
  3481         -          }
  3482         -        }else{
  3483         -          i64 rhs = pRhs->u.i;
  3484         -          if( mem1.u.i<rhs ){
  3485         -            rc = -1;
  3486         -          }else if( mem1.u.i>rhs ){
  3487         -            rc = +1;
  3488         -          }
         3425  +        if( mem1.r<rhs ){
         3426  +          rc = -1;
         3427  +        }else if( mem1.r>rhs ){
         3428  +          rc = +1;
         3429  +        }
         3430  +      }else{
         3431  +        i64 lhs = vdbeRecordDecodeInt(serial_type, &aKey1[d1]);
         3432  +        i64 rhs = pRhs->u.i;
         3433  +        if( lhs<rhs ){
         3434  +          rc = -1;
         3435  +        }else if( lhs>rhs ){
         3436  +          rc = +1;
  3489   3437           }
  3490   3438         }
  3491   3439       }
  3492   3440   
  3493   3441       /* RHS is real */
  3494   3442       else if( pRhs->flags & MEM_Real ){
  3495   3443         serial_type = aKey1[idx1];
................................................................................
  3575   3523   #endif
  3576   3524         }
  3577   3525         assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );
  3578   3526         return rc;
  3579   3527       }
  3580   3528   
  3581   3529       i++;
         3530  +    pRhs++;
  3582   3531       d1 += sqlite3VdbeSerialTypeLen(serial_type);
  3583   3532       idx1 += sqlite3VarintLen(serial_type);
  3584   3533     }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );
  3585   3534   
  3586   3535     /* No memory allocation is ever used on mem1.  Prove this using
  3587   3536     ** the following assert().  If the assert() fails, it indicates a
  3588   3537     ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
................................................................................
  3591   3540   
  3592   3541     /* rc==0 here means that one or both of the keys ran out of fields and
  3593   3542     ** all the fields up to that point were equal. Return the the default_rc
  3594   3543     ** value.  */
  3595   3544     assert( pPKey2->default_rc==expected );
  3596   3545     return pPKey2->default_rc;
  3597   3546   }
         3547  +
         3548  +static int vdbeRecordCompareInt(
         3549  +  int nKey1, const void *pKey1, /* Left key */
         3550  +  int szHdr,
         3551  +  u32 idx1,
         3552  +  UnpackedRecord *pPKey2        /* Right key */
         3553  +){
         3554  +  const u8 *aKey = &((const u8*)pKey1)[szHdr];
         3555  +  int serial_type = ((const u8*)pKey1)[1];
         3556  +  int res;
         3557  +  i64 v = pPKey2->aMem[0].u.i;
         3558  +  i64 lhs;
         3559  +
         3560  +  switch( serial_type ){
         3561  +    case 1:
         3562  +      lhs = (char)(aKey[0]);
         3563  +      break;
         3564  +    case 2:
         3565  +      lhs = 256*(signed char)aKey[0] + aKey[1];
         3566  +      break;
         3567  +    case 3:
         3568  +      lhs = 65536*(char)aKey[0] | (aKey[1]<<8) | aKey[2];
         3569  +      break;
         3570  +    case 4:
         3571  +      lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);
         3572  +      break;
         3573  +
         3574  +    case 5: {
         3575  +      i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
         3576  +      u32 lsw = (aKey[4] << 8) | aKey[5];
         3577  +      lhs = (i64)( msw << 16 | (u64)lsw );
         3578  +      break;
         3579  +    }
         3580  +
         3581  +    case 6: {
         3582  +      i64 msw = ((char)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3];
         3583  +      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
         3584  +      lhs = (i64)( msw << 32 | (u64)lsw );
         3585  +      break;
         3586  +    }
         3587  +
         3588  +    case 8: 
         3589  +      lhs = 0;
         3590  +      break;
         3591  +
         3592  +    case 9:
         3593  +      lhs = 1;
         3594  +      break;
         3595  +
         3596  +    default:
         3597  +      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);
         3598  +  }
         3599  +
         3600  +  if( v>lhs ){
         3601  +    res = pPKey2->r1;
         3602  +  }else if( v<lhs ){
         3603  +    res = pPKey2->r2;
         3604  +  }else if( pPKey2->nField>1 ){
         3605  +    res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
         3606  +  }else{
         3607  +    res = pPKey2->default_rc;
         3608  +  }
         3609  +
         3610  +  assert( (res==0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)==0)
         3611  +       || (res<0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)<0)
         3612  +       || (res>0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)>0)
         3613  +       || CORRUPT_DB
         3614  +  );
         3615  +  return res;
         3616  +}
         3617  +
         3618  +static int vdbeRecordCompareString(
         3619  +  int nKey1, const void *pKey1, /* Left key */
         3620  +  int szHdr,
         3621  +  u32 idx1,
         3622  +  UnpackedRecord *pPKey2        /* Right key */
         3623  +){
         3624  +  const u8 *aKey1 = (const u8*)pKey1;
         3625  +  int serial_type;
         3626  +  int res;
         3627  +
         3628  +  getVarint32(&aKey1[1], serial_type);
         3629  +
         3630  +  if( serial_type<12 ){
         3631  +    res = pPKey2->r1;      /* (pKey1/nKey1) is a number or a null */
         3632  +  }else if( !(serial_type & 0x01) ){ 
         3633  +    res = pPKey2->r2;      /* (pKey1/nKey1) is a blob */
         3634  +  }else{
         3635  +    int nCmp;
         3636  +    int nStr;
         3637  +    aKey1 = &aKey1[szHdr];
         3638  +
         3639  +    nStr = (serial_type-12) / 2;
         3640  +    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
         3641  +    nCmp = MIN( pPKey2->aMem[0].n, nStr );
         3642  +    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);
         3643  +
         3644  +    if( res==0 ){
         3645  +      res = nStr - pPKey2->aMem[0].n;
         3646  +      if( res==0 ){
         3647  +        if( pPKey2->nField>1 ){
         3648  +          res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
         3649  +        }else{
         3650  +          res = pPKey2->default_rc;
         3651  +        }
         3652  +      }else if( res>0 ){
         3653  +        res = pPKey2->r2;
         3654  +      }else{
         3655  +        res = pPKey2->r1;
         3656  +      }
         3657  +    }else if( res>0 ){
         3658  +      res = pPKey2->r2;
         3659  +    }else{
         3660  +      res = pPKey2->r1;
         3661  +    }
         3662  +  }
         3663  +
         3664  +  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
         3665  +       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
         3666  +       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
         3667  +       || CORRUPT_DB
         3668  +  );
         3669  +  return res;
         3670  +}
         3671  +
         3672  +
         3673  +int vdbeRecordCompareLargeHeader(
         3674  +  int nKey1, const void *pKey1, /* Left key */
         3675  +  int dummy1, u32 dummy2,       /* Unused arguments */
         3676  +  UnpackedRecord *pPKey2        /* Right key */
         3677  +){
         3678  +  int szHdr;
         3679  +  u32 idx1;
         3680  +  idx1 = getVarint32(((u8*)pKey1), szHdr);
         3681  +  return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2);
         3682  +}
  3598   3683   
  3599   3684   RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  3600         -  if( p->nField==1 && p->pKeyInfo->aSortOrder[0]==0 ){
         3685  +  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField) > 10 ){
         3686  +    return vdbeRecordCompareLargeHeader;
         3687  +  }else{
  3601   3688       int flags = p->aMem[0].flags;
         3689  +    if( p->pKeyInfo->aSortOrder[0] ){
         3690  +      p->r1 = 1;
         3691  +      p->r2 = -1;
         3692  +    }else{
         3693  +      p->r1 = -1;
         3694  +      p->r2 = 1;
         3695  +    }
  3602   3696       if( (flags & MEM_Int) ){
  3603   3697         return vdbeRecordCompareInt;
  3604         -    }else if( (p->aMem[0].flags&(MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
  3605         -            && p->pKeyInfo->aColl[0]==0 
         3698  +    }
         3699  +    if( (flags & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
         3700  +        && p->pKeyInfo->aColl[0]==0 
  3606   3701       ){
  3607   3702         return vdbeRecordCompareString;
  3608   3703       }
  3609   3704     }
  3610         -  return sqlite3VdbeRecordCompare;
         3705  +
         3706  +  return vdbeRecordCompare;
         3707  +}
         3708  +
         3709  +RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo *pKeyInfo){
         3710  +  if( (pKeyInfo->nField + pKeyInfo->nXField) > 10 ){
         3711  +    return vdbeRecordCompareLargeHeader;
         3712  +  }
         3713  +  return vdbeRecordCompare;
  3611   3714   }
  3612   3715   
         3716  +int sqlite3VdbeRecordCompare(
         3717  +  int nKey1, const void *pKey1, /* Left key */
         3718  +  UnpackedRecord *pPKey2        /* Right key */
         3719  +){
         3720  +  int szHdr;
         3721  +  u32 idx1;
         3722  +
         3723  +  idx1 = getVarint32(((u8*)pKey1), szHdr);
         3724  +  return vdbeRecordCompare(nKey1, pKey1, szHdr, idx1, pPKey2);
         3725  +}
  3613   3726   
  3614   3727   /*
  3615   3728   ** pCur points at an index entry created using the OP_MakeRecord opcode.
  3616   3729   ** Read the rowid (the last field in the record) and store it in *rowid.
  3617   3730   ** Return SQLITE_OK if everything works, or an error code otherwise.
  3618   3731   **
  3619   3732   ** pCur might be pointing to text obtained from a corrupt database file.

Changes to src/vdbesort.c.

   101    101     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   102    102     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   103    103     VdbeSorterIter *aIter;          /* Array of iterators to merge */
   104    104     int *aTree;                     /* Current state of incremental merge */
   105    105     sqlite3_file *pTemp1;           /* PMA file 1 */
   106    106     SorterRecord *pRecord;          /* Head of in-memory record list */
   107    107     UnpackedRecord *pUnpacked;      /* Used to unpack keys */
          108  +  RecordCompare xRecordCompare;   /* Record compare function */
   108    109   };
   109    110   
   110    111   /*
   111    112   ** The following type is an iterator for a PMA. It caches the current key in 
   112    113   ** variables nKey/aKey. If the iterator is at EOF, pFile==0.
   113    114   */
   114    115   struct VdbeSorterIter {
................................................................................
   408    409           *pRes = -1;
   409    410           return;
   410    411         }
   411    412       }
   412    413       assert( r2->default_rc==0 );
   413    414     }
   414    415   
          416  +#if 0
   415    417     *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
          418  +#endif
          419  +  *pRes = pSorter->xRecordCompare(nKey1, pKey1, *((u8*)pKey1), 1, r2);
   416    420   }
   417    421   
   418    422   /*
   419    423   ** This function is called to compare two iterator keys when merging 
   420    424   ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
   421    425   ** value to recalculate.
   422    426   */
................................................................................
   484    488     if( !sqlite3TempInMemory(db) ){
   485    489       pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   486    490       pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
   487    491       mxCache = db->aDb[0].pSchema->cache_size;
   488    492       if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
   489    493       pSorter->mxPmaSize = mxCache * pgsz;
   490    494     }
          495  +  pSorter->xRecordCompare = sqlite3VdbeFindSorterCompare(pCsr->pKeyInfo);
   491    496   
   492    497     return SQLITE_OK;
   493    498   }
   494    499   
   495    500   /*
   496    501   ** Free the list of sorted records starting at pRecord.
   497    502   */