SQLite

Check-in [5708937400]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Further changes to sqlite3VdbeRecordCompare().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | experimental
Files: files | file ages | folders
SHA1: 570893740067a7caa952f259fa078cdf67017d71
User & Date: dan 2014-02-27 20:44:18.479
Context
2014-02-27
20:52
Merge in latest trunk changes. (check-in: 8f30b09518 user: dan tags: experimental)
20:44
Further changes to sqlite3VdbeRecordCompare(). (check-in: 5708937400 user: dan tags: experimental)
2014-02-25
21:01
Attempt to speed up sqlite3VdbeRecordCompare() by various means. This code is in an interim state. (check-in: 85206e0bba user: dan tags: experimental)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/btree.c.
4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
  BtCursor *pCur,          /* The cursor to be moved */
  UnpackedRecord *pIdxKey, /* Unpacked index key */
  i64 intKey,              /* The table key */
  int biasRight,           /* If true, bias the search to the high end */
  int *pRes                /* Write search results here */
){
  int rc;
  int (*xRecordCompare)(int, const void*, UnpackedRecord*);

  assert( cursorHoldsMutex(pCur) );
  assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
  assert( pRes );
  assert( (pIdxKey==0)==(pCur->pKeyInfo==0) );

  /* If the cursor is already positioned at the point we are trying







|







4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
  BtCursor *pCur,          /* The cursor to be moved */
  UnpackedRecord *pIdxKey, /* Unpacked index key */
  i64 intKey,              /* The table key */
  int biasRight,           /* If true, bias the search to the high end */
  int *pRes                /* Write search results here */
){
  int rc;
  RecordCompare xRecordCompare;

  assert( cursorHoldsMutex(pCur) );
  assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
  assert( pRes );
  assert( (pIdxKey==0)==(pCur->pKeyInfo==0) );

  /* If the cursor is already positioned at the point we are trying
4567
4568
4569
4570
4571
4572
4573
4574



4575
4576
4577
4578
4579
4580
4581
      *pRes = -1;
      return SQLITE_OK;
    }
  }

  if( pIdxKey ){
    xRecordCompare = sqlite3VdbeFindCompare(pIdxKey);
    assert( pIdxKey->default_rc==1 || pIdxKey->default_rc==0 || pIdxKey->default_rc==-1);



  }

  rc = moveToRoot(pCur);
  if( rc ){
    return rc;
  }
  assert( pCur->pgnoRoot==0 || pCur->apPage[pCur->iPage] );







|
>
>
>







4567
4568
4569
4570
4571
4572
4573
4574
4575
4576
4577
4578
4579
4580
4581
4582
4583
4584
      *pRes = -1;
      return SQLITE_OK;
    }
  }

  if( pIdxKey ){
    xRecordCompare = sqlite3VdbeFindCompare(pIdxKey);
    assert( pIdxKey->default_rc==1 
         || pIdxKey->default_rc==0 
         || pIdxKey->default_rc==-1
    );
  }

  rc = moveToRoot(pCur);
  if( rc ){
    return rc;
  }
  assert( pCur->pgnoRoot==0 || pCur->apPage[pCur->iPage] );
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
        */
        nCell = pCell[0];
        if( nCell<=pPage->max1bytePayload ){
          /* This branch runs if the record-size field of the cell is a
          ** single byte varint and the record fits entirely on the main
          ** b-tree page.  */
          testcase( pCell+nCell+1==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey);
        }else if( !(pCell[1] & 0x80) 
          && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
        ){
          /* The record-size field is a 2 byte varint and the record 
          ** fits entirely on the main b-tree page.  */
          testcase( pCell+nCell+2==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey);
        }else{
          /* The record flows over onto one or more overflow pages. In
          ** this case the whole cell needs to be parsed, a buffer allocated
          ** and accessPayload() used to retrieve the record into the
          ** buffer before VdbeRecordCompare() can be called. */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
          btreeParseCellPtr(pPage, pCellBody, &pCur->info);
          nCell = (int)pCur->info.nKey;
          pCellKey = sqlite3Malloc( nCell );
          if( pCellKey==0 ){
            rc = SQLITE_NOMEM;
            goto moveto_finish;
          }
          pCur->aiIdx[pCur->iPage] = (u16)idx;
          rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
          if( rc ){
            sqlite3_free(pCellKey);
            goto moveto_finish;
          }
          c = xRecordCompare(nCell, pCellKey, pIdxKey);
          sqlite3_free(pCellKey);
        }
        if( c<0 ){
          lwr = idx+1;
        }else if( c>0 ){
          upr = idx-1;
        }else{







|






|




















|







4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
        */
        nCell = pCell[0];
        if( nCell<=pPage->max1bytePayload ){
          /* This branch runs if the record-size field of the cell is a
          ** single byte varint and the record fits entirely on the main
          ** b-tree page.  */
          testcase( pCell+nCell+1==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[1], pCell[1], 1, pIdxKey);
        }else if( !(pCell[1] & 0x80) 
          && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
        ){
          /* The record-size field is a 2 byte varint and the record 
          ** fits entirely on the main b-tree page.  */
          testcase( pCell+nCell+2==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[2], pCell[2], 1, pIdxKey);
        }else{
          /* The record flows over onto one or more overflow pages. In
          ** this case the whole cell needs to be parsed, a buffer allocated
          ** and accessPayload() used to retrieve the record into the
          ** buffer before VdbeRecordCompare() can be called. */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
          btreeParseCellPtr(pPage, pCellBody, &pCur->info);
          nCell = (int)pCur->info.nKey;
          pCellKey = sqlite3Malloc( nCell );
          if( pCellKey==0 ){
            rc = SQLITE_NOMEM;
            goto moveto_finish;
          }
          pCur->aiIdx[pCur->iPage] = (u16)idx;
          rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
          if( rc ){
            sqlite3_free(pCellKey);
            goto moveto_finish;
          }
          c = xRecordCompare(nCell, pCellKey, ((u8*)pCellKey)[0], 1, pIdxKey);
          sqlite3_free(pCellKey);
        }
        if( c<0 ){
          lwr = idx+1;
        }else if( c>0 ){
          upr = idx-1;
        }else{
Changes to src/sqliteInt.h.
1588
1589
1590
1591
1592
1593
1594


1595
1596
1597
1598
1599
1600
1601
** into its constituent fields.
*/
struct UnpackedRecord {
  KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  u16 nField;         /* Number of entries in aMem[] */
  char default_rc;    /* Comparison result if keys are equal */
  Mem *aMem;          /* Values */


};


/*
** Each SQL index is represented in memory by an
** instance of the following structure.
**







>
>







1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
** into its constituent fields.
*/
struct UnpackedRecord {
  KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  u16 nField;         /* Number of entries in aMem[] */
  char default_rc;    /* Comparison result if keys are equal */
  Mem *aMem;          /* Values */
  int r1;             /* Returned by vdbeRecordCompareInt() when the serialized
                      ** (left) key's first field is less than aMem[0] */
  int r2;             /* Returned by vdbeRecordCompareInt() when the serialized
                      ** (left) key's first field is greater than aMem[0] */
};


/*
** Each SQL index is represented in memory by an
** instance of the following structure.
**
Changes to src/vdbe.h.
210
211
212
213
214
215
216
217
218

219
220
221
222
223
224
225
  char *sqlite3VdbeExpandSql(Vdbe*, const char*);
#endif

void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);

typedef int (*RecordCompare)(int,const void*, UnpackedRecord*);
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *);


#ifndef SQLITE_OMIT_TRIGGER
void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
#endif

/* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
** each VDBE opcode.







|
|
>







210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
  char *sqlite3VdbeExpandSql(Vdbe*, const char*);
#endif

void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);

typedef int (*RecordCompare)(int,const void*,int,u32,UnpackedRecord*);
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*);
RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo*);

#ifndef SQLITE_OMIT_TRIGGER
void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
#endif

/* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
** each VDBE opcode.
Changes to src/vdbeaux.c.
3119
3120
3121
3122
3123
3124
3125

















3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154



3155
3156
3157

3158
3159
3160
3161
3162



3163



3164
3165
3166
3167
3168
3169
3170

3171
3172
3173
3174

3175
3176

3177



3178
3179
3180
3181
3182
3183
3184
3185
3186
3187

3188
3189
3190
3191



3192
3193

3194
3195
3196
3197
3198
3199
3200

3201

3202


3203





3204
3205
3206
3207
3208



3209
3210
3211
3212
3213
3214
3215
3216
    pMem++;
    u++;
  }
  assert( u<=pKeyInfo->nField + 1 );
  p->nField = u;
}


















/*
** Specialized comparator: compare only the first field of the serialized
** record {nKey1, pKey1} against pPKey2->aMem[0], treating aMem[0] as a
** byte string that is compared with memcmp().
**
** Result:
**   -1  if the first field of key1 has a serial type below 12
**       (a number or a NULL),
**   +1  if it has an even serial type of 12 or more (a blob),
**   otherwise the memcmp() result over the shorter of the two strings,
**   then the length difference, and finally pPKey2->default_rc when the
**   strings are identical.
**   0 is returned if the header plus the string would extend past nKey1
**   (treated as corruption).
**
** NOTE(review): the header size is taken from aKey1[0] and the serial type
** is read starting at aKey1[1], which presumes the header-size varint
** occupies a single byte - confirm against the callers.
*/
static int vdbeRecordCompareString(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  const u8 *aKey1 = (const u8*)pKey1;
  int szHdr;
  int serial_type;
  int res;

  szHdr = aKey1[0];
  getVarint32(&aKey1[1], serial_type);

  if( serial_type<12 ){
    res = -1;      /* (pKey1/nKey1) is a number or a null */
  }else if( !(serial_type & 0x01) ){ 
    res = +1;      /* (pKey1/nKey1) is a blob */
  }else{
    int nCmp;
    int nStr;
    aKey1 = &aKey1[szHdr];      /* Advance to the first byte of field data */

    nStr = (serial_type-12) / 2;              /* Length of the stored text */
    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
    nCmp = MIN( pPKey2->aMem[0].n, nStr );
    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);

    if( res==0 ){
      /* Common prefix is identical: the shorter string sorts first */
      res = nStr - pPKey2->aMem[0].n;
      if( res==0 ) res = pPKey2->default_rc;



    }
  }


  /* Cross-check the sign of the result against the general comparator */
  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
  );
  return res;



}




/*
** Specialized comparator: compare only the first field of the serialized
** record {nKey1, pKey1} against the integer pPKey2->aMem[0].u.i.
**
** Result:
**   -1  if the first field of key1 is NULL (serial type 0),
**   +1  if it is text or a blob (serial type 12 or more),
**   otherwise the field is decoded with sqlite3VdbeSerialGet() and compared
**   numerically (as i64 when the decoded value is an integer, as double
**   otherwise).  pPKey2->default_rc is returned when the values are equal.
**
** NOTE(review): the header size is taken from aKey1[0] and the serial type
** is read starting at aKey1[1], which presumes the header-size varint
** occupies a single byte - confirm against the callers.
*/
static int vdbeRecordCompareInt(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  const u8 *aKey1 = (const u8*)pKey1;
  int szHdr;

  int serial_type;
  int res;

  szHdr = aKey1[0];

  getVarint32(&aKey1[1], serial_type);


  if( serial_type==0 ){



    res = -1;      /* NULL values are smaller than integers */
  }else if( serial_type>=12 ){
    res = +1;      /* text/blob values are greater */
  }else{
    Mem mem;
    sqlite3VdbeSerialGet(&aKey1[szHdr], serial_type, &mem);
    if( mem.flags & MEM_Int ){
      /* Both sides are integers: v is the right-hand (unpacked) value */
      i64 v = pPKey2->aMem[0].u.i;
      if( v>mem.u.i ){
        res = -1;

      }else if( v<mem.u.i ){
        res = +1;
      }else{
        res = pPKey2->default_rc;



      }
    }else{

      /* Stored value is not an integer: compare as doubles */
      double v = (double)pPKey2->aMem[0].u.i;
      if( v>mem.r ){
        res = -1;
      }else if( v<mem.r ){
        res = +1;
      }else{
        res = pPKey2->default_rc;

      }

    }


  }






  /* Cross-check the sign of the result against the general comparator */
  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
  );



  return res;
}

static int vdbeCompareMemString(
  const Mem *pMem1, 
  const Mem *pMem2, 
  const CollSeq *pColl
){







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|



<
<
<
<
|
<
<
|
<
<
<
|
<
<
|
|
|
<
<
|
|

<
|
|
>
>
>
|
<
|
>
|
|
|
<
<
>
>
>
|
>
>
>
|
<
<
<
<
|
|
>
|
<

<
>
|

>
|
>
>
>
|
|
<
<
<
|
<
<
|
|
>
|
<
<
<
>
>
>
|
|
>
|
|
<
|
<
<
|
>

>

>
>
|
>
>
>
>
>

<
<
<
<
>
>
>
|







3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146




3147


3148



3149


3150
3151
3152


3153
3154
3155

3156
3157
3158
3159
3160
3161

3162
3163
3164
3165
3166


3167
3168
3169
3170
3171
3172
3173
3174




3175
3176
3177
3178

3179

3180
3181
3182
3183
3184
3185
3186
3187
3188
3189



3190


3191
3192
3193
3194



3195
3196
3197
3198
3199
3200
3201
3202

3203


3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217




3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
    pMem++;
    u++;
  }
  assert( u<=pKeyInfo->nField + 1 );
  p->nField = u;
}

/*
** This function compares the two table rows or index records
** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
** or positive integer if key1 is less than, equal to or 
** greater than key2.  The {nKey1, pKey1} key must be a blob
** created by the OP_MakeRecord opcode of the VDBE.  The pPKey2
** key must be a parsed key such as obtained from
** sqlite3VdbeParseRecord.
**
** Key1 and Key2 do not have to contain the same number of fields.
** Fields are compared pairwise from left to right and the result for
** the first pair that differs is returned (negated when the matching
** pKeyInfo->aSortOrder[] entry is non-zero).  If the fields run out, or
** the remaining bytes of key1 are too short to hold the next field,
** while every field compared so far was equal, the result is
** pPKey2->default_rc.
**
** NOTE(review): this comment used to describe UNPACKED_INCRKEY and
** UNPACKED_MATCH_PREFIX flags; the code below consults only default_rc -
** confirm that those flags no longer exist.
**
** Debug builds (SQLITE_DEBUG) use the result of this routine as the
** expected value when checking the rewritten comparison code.
*/
static int vdbeRecordComparePrev(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){




  u32 d1;            /* Offset into aKey[] of next data element */


  u32 idx1;          /* Offset into aKey[] of next header element */



  u32 szHdr1;        /* Number of bytes in header */


  int i = 0;         /* Index of the field currently being compared */
  int rc = 0;        /* Result of the most recent field comparison */
  const unsigned char *aKey1 = (const unsigned char *)pKey1;


  KeyInfo *pKeyInfo;
  Mem mem1;          /* Holds each decoded field of key1 in turn */


  pKeyInfo = pPKey2->pKeyInfo;
  mem1.enc = pKeyInfo->enc;
  mem1.db = pKeyInfo->db;
  /* mem1.flags = 0;  // Will be initialized by sqlite3VdbeSerialGet() */
  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */


  /* Compilers may complain that mem1.u.i is potentially uninitialized.
  ** We could initialize it, as shown here, to silence those complaints.
  ** But in fact, mem1.u.i will never actually be used uninitialized, and doing 
  ** the unnecessary initialization has a measurable negative performance
  ** impact, since this routine is a very high runner.  And so, we choose
  ** to ignore the compiler warnings and leave this variable uninitialized.
  */
  /*  mem1.u.i = 0;  // not needed, here to silence compiler warning */
  
  /* Read the header size; the first field's data begins right after it */
  idx1 = getVarint32(aKey1, szHdr1);
  d1 = szHdr1;
  assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB );
  assert( pKeyInfo->aSortOrder!=0 );




  assert( pKeyInfo->nField>0 );
  assert( idx1<=szHdr1 || CORRUPT_DB );
  do{
    u32 serial_type1;



    /* Read the serial types for the next element in each key. */
    idx1 += getVarint32( aKey1+idx1, serial_type1 );

    /* Verify that there is enough key space remaining to avoid
    ** a buffer overread.  The "d1+serial_type1+2" subexpression will
    ** always be greater than or equal to the amount of required key space.
    ** Use that approximation to avoid the more expensive call to
    ** sqlite3VdbeSerialTypeLen() in the common case.
    */
    if( d1+serial_type1+2>(u32)nKey1



     && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 


    ){
      break;
    }




    /* Extract the values to be compared.
    */
    d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1);

    /* Do the comparison
    */
    rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]);
    if( rc!=0 ){

      assert( mem1.zMalloc==0 );  /* See comment below */


      if( pKeyInfo->aSortOrder[i] ){
        rc = -rc;  /* Invert the result for DESC sort order. */
      }
      return rc;
    }
    i++;
  }while( idx1<szHdr1 && i<pPKey2->nField );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  */
  assert( mem1.zMalloc==0 );





  /* rc==0 here means that one of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */
  return pPKey2->default_rc;
}

static int vdbeCompareMemString(
  const Mem *pMem1, 
  const Mem *pMem2, 
  const CollSeq *pColl
){
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359

3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
3374
3375
3376
3377
3378

3379
3380
3381
3382
3383
3384
3385
3386

3387

3388
3389
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435


3436
3437
3438
3439
3440
3441
3442

3443
3444
3445
3446
3447
3448
3449
3450
3451
3452





3453




3454
3455
3456

3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472
3473
3474
3475
3476

3477
3478
3479
3480
3481
3482

3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
  if( rc==0 ){
    rc = pMem1->n - pMem2->n;
  }
  return rc;
}


/*
** This function compares the two table rows or index records
** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
** or positive integer if key1 is less than, equal to or 
** greater than key2.  The {nKey1, pKey1} key must be a blob
** created by the OP_MakeRecord opcode of the VDBE.  The pPKey2
** key must be a parsed key such as obtained from
** sqlite3VdbeParseRecord.
**
** Key1 and Key2 do not have to contain the same number of fields.
** Fields are compared pairwise from left to right and the result for
** the first pair that differs is returned (negated when the matching
** pKeyInfo->aSortOrder[] entry is non-zero).  If the fields run out, or
** the remaining bytes of key1 are too short to hold the next field,
** while every field compared so far was equal, the result is
** pPKey2->default_rc.
**
** NOTE(review): this comment used to describe UNPACKED_INCRKEY and
** UNPACKED_MATCH_PREFIX flags; the code below consults only default_rc -
** confirm that those flags no longer exist.
**
** Debug builds (SQLITE_DEBUG) use the result of this routine as the
** expected value when checking the rewritten comparison code.
*/
static int vdbeRecordComparePrev(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  u32 d1;            /* Offset into aKey[] of next data element */
  u32 idx1;          /* Offset into aKey[] of next header element */
  u32 szHdr1;        /* Number of bytes in header */
  int i = 0;         /* Index of the field currently being compared */

  int rc = 0;        /* Result of the most recent field comparison */
  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  KeyInfo *pKeyInfo;
  Mem mem1;          /* Holds each decoded field of key1 in turn */

  pKeyInfo = pPKey2->pKeyInfo;
  mem1.enc = pKeyInfo->enc;
  mem1.db = pKeyInfo->db;
  /* mem1.flags = 0;  // Will be initialized by sqlite3VdbeSerialGet() */
  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */

  /* Compilers may complain that mem1.u.i is potentially uninitialized.
  ** We could initialize it, as shown here, to silence those complaints.
  ** But in fact, mem1.u.i will never actually be used uninitialized, and doing 
  ** the unnecessary initialization has a measurable negative performance
  ** impact, since this routine is a very high runner.  And so, we choose
  ** to ignore the compiler warnings and leave this variable uninitialized.
  */
  /*  mem1.u.i = 0;  // not needed, here to silence compiler warning */

  
  /* Read the header size; the first field's data begins right after it */
  idx1 = getVarint32(aKey1, szHdr1);
  d1 = szHdr1;
  assert( pKeyInfo->nField+pKeyInfo->nXField>=pPKey2->nField || CORRUPT_DB );
  assert( pKeyInfo->aSortOrder!=0 );
  assert( pKeyInfo->nField>0 );
  assert( idx1<=szHdr1 || CORRUPT_DB );
  do{

    u32 serial_type1;


    /* Read the serial types for the next element in each key. */
    idx1 += getVarint32( aKey1+idx1, serial_type1 );

    /* Verify that there is enough key space remaining to avoid
    ** a buffer overread.  The "d1+serial_type1+2" subexpression will
    ** always be greater than or equal to the amount of required key space.
    ** Use that approximation to avoid the more expensive call to
    ** sqlite3VdbeSerialTypeLen() in the common case.
    */
    if( d1+serial_type1+2>(u32)nKey1
     && d1+sqlite3VdbeSerialTypeLen(serial_type1)>(u32)nKey1 
    ){
      break;
    }

    /* Extract the values to be compared.
    */
    d1 += sqlite3VdbeSerialGet(&aKey1[d1], serial_type1, &mem1);

    /* Do the comparison
    */
    rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i], pKeyInfo->aColl[i]);
    if( rc!=0 ){
      assert( mem1.zMalloc==0 );  /* See comment below */
      if( pKeyInfo->aSortOrder[i] ){
        rc = -rc;  /* Invert the result for DESC sort order. */
      }
      return rc;
    }
    i++;
  }while( idx1<szHdr1 && i<pPKey2->nField );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  */
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */
  return pPKey2->default_rc;
}
 

int sqlite3VdbeRecordCompare(
  int nKey1, const void *pKey1, /* Left key */


  UnpackedRecord *pPKey2        /* Right key */
){
  u32 d1;            /* Offset into aKey[] of next data element */
  u32 idx1;          /* Offset into aKey[] of next header element */
  u32 szHdr1;        /* Number of bytes in header */
  int i = 0;
  int rc = 0;

  KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  Mem mem1;

#ifdef SQLITE_DEBUG
  int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
  static int nCall = 0;
  nCall++;
#endif






  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */




  
  idx1 = getVarint32(aKey1, szHdr1);
  d1 = szHdr1;

  assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
       || CORRUPT_DB );
  assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  assert( pPKey2->pKeyInfo->nField>0 );
  assert( idx1<=szHdr1 || CORRUPT_DB );
  do{
    Mem *pRhs = &pPKey2->aMem[i];
    u32 serial_type;

    /* RHS is an integer */
    if( pRhs->flags & MEM_Int ){
      serial_type = aKey1[idx1];
      if( serial_type>=12 ){
        rc = +1;
      }else if( serial_type==0 ){
        rc = -1;
      }else{
        sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1);
        if( serial_type==7 ){
          double rhs = (double)pRhs->u.i;

          if( mem1.r<rhs ){
            rc = -1;
          }else if( mem1.r>rhs ){
            rc = +1;
          }
        }else{

          i64 rhs = pRhs->u.i;
          if( mem1.u.i<rhs ){
            rc = -1;
          }else if( mem1.u.i>rhs ){
            rc = +1;
          }
        }
      }
    }

    /* RHS is real */
    else if( pRhs->flags & MEM_Real ){
      serial_type = aKey1[idx1];







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
|
>
|
<
|
|
|
<
<
<
<
<
|
<
<
<
<
<
<
<
<
>
|
<
|
<
<
<
<
<
>
|
>
|
<
<

<
<
<
<
|
<
<
<
<
<
<
|
<
<
<
|
<
<
<
<
<
<
<
<
|

<
<
|
<
<
<
<
<

<
<
<
|

|
<
|

>
>
|

|
<
<


>










>
>
>
>
>
|
>
>
>
>
|
|
<
>






<









<
<
|
|
>
|
|
|
|
|
|
>
|
|
|
|
|
<







3340
3341
3342
3343
3344
3345
3346

















3347


3348



3349
3350
3351

3352
3353
3354





3355








3356
3357

3358





3359
3360
3361
3362


3363




3364






3365



3366








3367
3368


3369





3370



3371
3372
3373

3374
3375
3376
3377
3378
3379
3380


3381
3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405

3406
3407
3408
3409
3410
3411
3412

3413
3414
3415
3416
3417
3418
3419
3420
3421


3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436

3437
3438
3439
3440
3441
3442
3443
  if( rc==0 ){
    rc = pMem1->n - pMem2->n;
  }
  return rc;
}



















/*
** Decode the big-endian two's-complement integer stored at aKey[] and
** return it as a signed 64-bit value.  serial_type selects the width:
**
**     1 -> 1 byte      4 -> 4 bytes
**     2 -> 2 bytes     5 -> 6 bytes
**     3 -> 3 bytes     6 -> 8 bytes
**
** For any other serial_type the result is (serial_type - 8), which gives
** 0 for type 8 and 1 for type 9.  The caller is expected to pass only
** integer serial types.
**
** The most significant byte is sign-extended through an explicit
** "signed char" cast and scaled by multiplication.  Plain "char" may be
** unsigned (it is on several ARM and PowerPC ABIs), in which case negative
** 1, 2 and 3 byte values would decode as positive, and left-shifting a
** negative value is undefined behaviour in C.
*/
static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
  switch( serial_type ){
    case 1:
      return (signed char)aKey[0];
    case 2:
      return 256*(signed char)aKey[0] + aKey[1];
    case 3:
      return 65536*(signed char)aKey[0] + (aKey[1]<<8) + aKey[2];
    case 4:
      return 16777216*(signed char)aKey[0]
           + (aKey[1]<<16) + (aKey[2]<<8) + aKey[3];
    case 5: {
      /* 48-bit value: signed upper 32 bits, unsigned lower 16 bits */
      i64 msw = 16777216*(signed char)aKey[0]
              + (aKey[1]<<16) + (aKey[2]<<8) + aKey[3];
      u32 lsw = (aKey[4] << 8) | aKey[5];
      return msw*65536 + (i64)lsw;
    }
    case 6: {
      /* 64-bit value: signed upper 32 bits, unsigned lower 32 bits */
      i64 msw = 16777216*(signed char)aKey[0]
              + (aKey[1]<<16) + (aKey[2]<<8) + aKey[3];
      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
      return msw*((i64)1<<32) + (i64)lsw;
    }
  }

  /* Serial types 8 and 9 are the constants 0 and 1 */
  return (serial_type - 8);
}


static int vdbeRecordCompare(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr1,                   /* Size of record header in bytes */
  u32 idx1,                     /* Offset of first type in header */
  UnpackedRecord *const pPKey2  /* Right key */
){
  u32 d1 = szHdr1;   /* Offset into aKey[] of next data element */


  int i = 0;
  int rc = 0;
  Mem *pRhs = pPKey2->aMem;
  KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  Mem mem1;

#ifdef SQLITE_DEBUG
  int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
  static int nCall = 0;
  nCall++;
#endif

  /* If idx==0, then the caller has already determined that the first two
  ** elements in the keys are equal. Fix the various stack variables so
  ** that this routine begins comparing at the second field. */
  if( idx1==0 ){
    u32 s1;
    assert( sqlite3VarintLen(szHdr1)==1 );
    idx1 = 1 + getVarint32(&aKey1[1], s1);
    d1 += sqlite3VdbeSerialTypeLen(s1);
    i = 1;
    pRhs++;
  }


  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
       || CORRUPT_DB );
  assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  assert( pPKey2->pKeyInfo->nField>0 );
  assert( idx1<=szHdr1 || CORRUPT_DB );
  do{

    u32 serial_type;

    /* RHS is an integer */
    if( pRhs->flags & MEM_Int ){
      serial_type = aKey1[idx1];
      if( serial_type>=12 ){
        rc = +1;
      }else if( serial_type==0 ){
        rc = -1;


      }else if( serial_type==7 ){
        double rhs = (double)pRhs->u.i;
        sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1);
        if( mem1.r<rhs ){
          rc = -1;
        }else if( mem1.r>rhs ){
          rc = +1;
        }
      }else{
        i64 lhs = vdbeRecordDecodeInt(serial_type, &aKey1[d1]);
        i64 rhs = pRhs->u.i;
        if( lhs<rhs ){
          rc = -1;
        }else if( lhs>rhs ){
          rc = +1;

        }
      }
    }

    /* RHS is real */
    else if( pRhs->flags & MEM_Real ){
      serial_type = aKey1[idx1];
3575
3576
3577
3578
3579
3580
3581

3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598








































































































































3599
3600


3601







3602
3603

3604
3605
3606
3607
3608
3609











3610





3611


3612
3613
3614
3615
3616
3617
3618
3619
#endif
      }
      assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );
      return rc;
    }

    i++;

    d1 += sqlite3VdbeSerialTypeLen(serial_type);
    idx1 += sqlite3VarintLen(serial_type);
  }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  */
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one or both of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the the default_rc
  ** value.  */
  assert( pPKey2->default_rc==expected );
  return pPKey2->default_rc;
}









































































































































/*
** Return a pointer to the comparison function best suited to the unpacked
** key p.
**
** A specialized routine is returned only when p has exactly one field and
** that field sorts in ascending order (aSortOrder[0]==0):
**
**   * vdbeRecordCompareInt     if the field has the MEM_Int flag set;
**   * vdbeRecordCompareString  if the field has none of the MEM_Int,
**                              MEM_Real, MEM_Null or MEM_Blob flags set and
**                              no collating sequence (aColl[0]==0).
**
** In every other case the general-purpose sqlite3VdbeRecordCompare() is
** returned.
*/
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  if( p->nField==1 && p->pKeyInfo->aSortOrder[0]==0 ){


    int flags = p->aMem[0].flags;







    if( (flags & MEM_Int) ){
      return vdbeRecordCompareInt;

    }else if( (p->aMem[0].flags&(MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
            && p->pKeyInfo->aColl[0]==0 
    ){
      return vdbeRecordCompareString;
    }
  }











  /* No fast path applies: use the general comparator */
  return sqlite3VdbeRecordCompare;





}




/*
** pCur points at an index entry created using the OP_MakeRecord opcode.
** Read the rowid (the last field in the record) and store it in *rowid.
** Return SQLITE_OK if everything works, or an error code otherwise.
**
** pCur might be pointing to text obtained from a corrupt database file.







>

















>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

|
>
>

>
>
>
>
>
>
>


>
|
|




>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
|
>
>
|







3523
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
3549
3550
3551
3552
3553
3554
3555
3556
3557
3558
3559
3560
3561
3562
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
#endif
      }
      assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );
      return rc;
    }

    i++;
    pRhs++;
    d1 += sqlite3VdbeSerialTypeLen(serial_type);
    idx1 += sqlite3VarintLen(serial_type);
  }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  */
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one or both of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the the default_rc
  ** value.  */
  assert( pPKey2->default_rc==expected );
  return pPKey2->default_rc;
}

/*
** Specialized record comparison used when the first field of the
** right-hand key is an integer (read from pPKey2->aMem[0].u.i).
**
** The serial type of the first field of the left-hand record is taken
** from the single byte at offset 1 of pKey1, and the field value is
** decoded from the bytes starting at offset szHdr. Serial types 1..6 are
** big-endian twos-complement integers of 1, 2, 3, 4, 6 and 8 bytes;
** types 8 and 9 are the constants 0 and 1. Any other serial type is
** handed to vdbeRecordCompare() with idx1==1.
**
** Return pPKey2->r1 if the right-hand integer is larger and pPKey2->r2
** if it is smaller. If the two integers are equal, return
** pPKey2->default_rc when pPKey2 has a single field, otherwise the result
** of vdbeRecordCompare() invoked with idx1==0 (presumably a request to
** resume at the second field - confirm against vdbeRecordCompare()).
**
** The idx1 parameter is not used by this function; it is present so that
** the signature matches the other comparison routines.
**
** Sign extension is always done through "signed char". Plain "char" may
** be an unsigned type, in which case negative keys would be decoded as
** large positive values. The wider integers are assembled without
** left-shifting a negative value, which is undefined behaviour in C.
*/
static int vdbeRecordCompareInt(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr,
  u32 idx1,
  UnpackedRecord *pPKey2        /* Right key */
){
  const u8 *aKey = &((const u8*)pKey1)[szHdr];
  int serial_type = ((const u8*)pKey1)[1];
  int res;
  i64 v = pPKey2->aMem[0].u.i;
  i64 lhs;

  switch( serial_type ){
    case 1:      /* 1-byte signed integer */
      lhs = (signed char)aKey[0];
      break;
    case 2:      /* 2-byte signed integer */
      lhs = 256*(signed char)aKey[0] + aKey[1];
      break;
    case 3:      /* 3-byte signed integer */
      lhs = 65536*(signed char)aKey[0] + ((aKey[1]<<8) | aKey[2]);
      break;
    case 4:      /* 4-byte signed integer */
      lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);
      break;

    case 5: {    /* 6-byte signed integer */
      /* The upper two bytes carry the sign; the lower four are unsigned. */
      i64 msw = 256*(signed char)aKey[0] + aKey[1];
      u32 lsw = ((u32)aKey[2]<<24) | (aKey[3]<<16) | (aKey[4]<<8) | aKey[5];
      lhs = msw*(((i64)1)<<32) + lsw;
      break;
    }

    case 6: {    /* 8-byte signed integer */
      /* Assemble as unsigned, then reinterpret as a signed value. */
      u64 msw = ((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8) | aKey[3];
      u64 lsw = ((u32)aKey[4]<<24) | (aKey[5]<<16) | (aKey[6]<<8) | aKey[7];
      lhs = (i64)((msw<<32) | lsw);
      break;
    }

    case 8:      /* Integer constant 0 */
      lhs = 0;
      break;

    case 9:      /* Integer constant 1 */
      lhs = 1;
      break;

    default:
      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);
  }

  if( v>lhs ){
    res = pPKey2->r1;
  }else if( v<lhs ){
    res = pPKey2->r2;
  }else if( pPKey2->nField>1 ){
    res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
  }else{
    res = pPKey2->default_rc;
  }

  /* Cross-check the sign of the result against vdbeRecordComparePrev() */
  assert( (res==0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)==0)
       || (res<0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)<0)
       || (res>0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return res;
}

/*
** Specialized record comparison used when the first field of the
** right-hand key is the string pPKey2->aMem[0] (bytes at .z, length .n)
** and the two strings can be compared with memcmp().
**
** The serial type of the first field of the left-hand record is read as
** a varint at offset 1 of pKey1:
**
**   * serial type < 12 (a number or a NULL):   return pPKey2->r1
**   * even serial type >= 12 (a blob):         return pPKey2->r2
**   * odd serial type >= 12 (a string):        compare the bytes
**
** For a string, the common prefix is compared with memcmp() and ties are
** broken on length. If the strings are identical, the result is
** pPKey2->default_rc for a single-field key, otherwise whatever
** vdbeRecordCompare() returns when invoked with idx1==0.
**
** If the string would extend past the end of the record (header size
** plus string length exceeds nKey1), 0 is returned immediately.
**
** The idx1 parameter is not used by this function.
*/
static int vdbeRecordCompareString(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr,
  u32 idx1,
  UnpackedRecord *pPKey2        /* Right key */
){
  const u8 *aRec = (const u8*)pKey1;
  int eType;                    /* Serial type of first field of pKey1 */
  int rc;

  getVarint32(&aRec[1], eType);

  if( eType<12 ){
    /* (pKey1/nKey1) is a number or a null */
    rc = pPKey2->r1;
  }else if( (eType & 0x01)==0 ){
    /* (pKey1/nKey1) is a blob */
    rc = pPKey2->r2;
  }else{
    int nLhs = (eType-12) / 2;  /* Bytes of string data in pKey1 */
    int nRhs;                   /* Bytes of string data in pPKey2 */
    int nCommon;                /* Bytes to compare with memcmp() */

    if( (szHdr + nLhs) > nKey1 ) return 0;    /* Corruption */

    nRhs = pPKey2->aMem[0].n;
    nCommon = MIN( nRhs, nLhs );
    rc = memcmp(&aRec[szHdr], pPKey2->aMem[0].z, nCommon);

    /* Identical prefixes: the longer string is the larger one. */
    if( rc==0 ) rc = nLhs - nRhs;

    if( rc>0 ){
      rc = pPKey2->r2;
    }else if( rc<0 ){
      rc = pPKey2->r1;
    }else if( pPKey2->nField>1 ){
      rc = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
    }else{
      rc = pPKey2->default_rc;
    }
  }

  /* Cross-check the sign of the result against the generic comparison */
  assert( (rc==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
       || (rc<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
       || (rc>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return rc;
}


/*
** Comparison routine that ignores the header-size and header-offset
** arguments supplied by the caller (dummy1 and dummy2). Instead, the
** header size is decoded from the varint at the start of pKey1, and both
** that size and the value returned by getVarint32() are forwarded to
** vdbeRecordCompare().
*/
int vdbeRecordCompareLargeHeader(
  int nKey1, const void *pKey1, /* Left key */
  int dummy1, u32 dummy2,       /* Unused arguments */
  UnpackedRecord *pPKey2        /* Right key */
){
  int nHdr;                     /* Header size decoded from pKey1 */
  u32 iType = getVarint32(((u8*)pKey1), nHdr);
  return vdbeRecordCompare(nKey1, pKey1, nHdr, iType, pPKey2);
}

/*
** Return the comparison routine to use with unpacked record p.
**
** If the key has more than 10 fields in total (nField+nXField),
** vdbeRecordCompareLargeHeader is returned and p is left unmodified.
**
** Otherwise p->r1 and p->r2 are set from the sort order of the first
** field: (1,-1) if aSortOrder[0] is non-zero, (-1,1) if it is zero. Then:
**
**   * if the first value has the MEM_Int flag set, return
**     vdbeRecordCompareInt;
**   * if it has none of MEM_Int, MEM_Real, MEM_Null or MEM_Blob set and
**     the first field has no collation sequence (aColl[0]==0), return
**     vdbeRecordCompareString;
**   * in all other cases return vdbeRecordCompare.
*/
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  int mFirst;                   /* Flags of the first value in p */

  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField) > 10 ){
    return vdbeRecordCompareLargeHeader;
  }

  mFirst = p->aMem[0].flags;
  p->r1 = p->pKeyInfo->aSortOrder[0] ? 1 : -1;
  p->r2 = p->pKeyInfo->aSortOrder[0] ? -1 : 1;

  if( (mFirst & MEM_Int)!=0 ){
    return vdbeRecordCompareInt;
  }
  if( (mFirst & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0
   && p->pKeyInfo->aColl[0]==0
  ){
    return vdbeRecordCompareString;
  }
  return vdbeRecordCompare;
}

/*
** Return the comparison routine for keys described by pKeyInfo:
** vdbeRecordCompareLargeHeader if the key has more than 10 fields in
** total (nField+nXField), or vdbeRecordCompare otherwise.
*/
RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo *pKeyInfo){
  int nTotal = pKeyInfo->nField + pKeyInfo->nXField;
  return nTotal>10 ? vdbeRecordCompareLargeHeader : vdbeRecordCompare;
}

/*
** Compare the serialized record (pKey1/nKey1) against the unpacked
** record pPKey2. The header size is decoded from the varint at the start
** of pKey1; that size and the value returned by getVarint32() are then
** passed to vdbeRecordCompare(), whose result is returned.
*/
int sqlite3VdbeRecordCompare(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  int nHdr;                     /* Header size decoded from pKey1 */
  u32 iType = getVarint32(((u8*)pKey1), nHdr);
  return vdbeRecordCompare(nKey1, pKey1, nHdr, iType, pPKey2);
}

/*
** pCur points at an index entry created using the OP_MakeRecord opcode.
** Read the rowid (the last field in the record) and store it in *rowid.
** Return SQLITE_OK if everything works, or an error code otherwise.
**
** pCur might be pointing to text obtained from a corrupt database file.
Changes to src/vdbesort.c.
101
102
103
104
105
106
107

108
109
110
111
112
113
114
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  VdbeSorterIter *aIter;          /* Array of iterators to merge */
  int *aTree;                     /* Current state of incremental merge */
  sqlite3_file *pTemp1;           /* PMA file 1 */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  UnpackedRecord *pUnpacked;      /* Used to unpack keys */

};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {







>







101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  VdbeSorterIter *aIter;          /* Array of iterators to merge */
  int *aTree;                     /* Current state of incremental merge */
  sqlite3_file *pTemp1;           /* PMA file 1 */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  UnpackedRecord *pUnpacked;      /* Used to unpack keys */
  RecordCompare xRecordCompare;   /* Record compare function */
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {
408
409
410
411
412
413
414

415


416
417
418
419
420
421
422
        *pRes = -1;
        return;
      }
    }
    assert( r2->default_rc==0 );
  }


  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);


}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/







>

>
>







409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
        *pRes = -1;
        return;
      }
    }
    assert( r2->default_rc==0 );
  }

#if 0
  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
#endif
  *pRes = pSorter->xRecordCompare(nKey1, pKey1, *((u8*)pKey1), 1, r2);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/
484
485
486
487
488
489
490

491
492
493
494
495
496
497
  if( !sqlite3TempInMemory(db) ){
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
    pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
    mxCache = db->aDb[0].pSchema->cache_size;
    if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
    pSorter->mxPmaSize = mxCache * pgsz;
  }


  return SQLITE_OK;
}

/*
** Free the list of sorted records starting at pRecord.
*/







>







488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
  if( !sqlite3TempInMemory(db) ){
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
    pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
    mxCache = db->aDb[0].pSchema->cache_size;
    if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
    pSorter->mxPmaSize = mxCache * pgsz;
  }
  pSorter->xRecordCompare = sqlite3VdbeFindSorterCompare(pCsr->pKeyInfo);

  return SQLITE_OK;
}

/*
** Free the list of sorted records starting at pRecord.
*/