/ Check-in [3861e853]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the vdbeRecordCompareLargeHeader function. Fix some other details.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | experimental
Files: files | file ages | folders
SHA1:3861e853105cb8da344c7eebd2e455622b26395e
User & Date: dan 2014-03-01 19:44:56
Context
2014-03-01
19:45
Merge trunk changes. check-in: 9c1747b5 user: dan tags: experimental
19:44
Remove the vdbeRecordCompareLargeHeader function. Fix some other details. check-in: 3861e853 user: dan tags: experimental
2014-02-28
18:39
Update some test cases that deal with corrupt databases. check-in: 3a09f560 user: dan tags: experimental
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to src/btree.c.

4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
....
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
        */
        nCell = pCell[0];
        if( nCell<=pPage->max1bytePayload ){
          /* This branch runs if the record-size field of the cell is a
          ** single byte varint and the record fits entirely on the main
          ** b-tree page.  */
          testcase( pCell+nCell+1==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[1], pCell[1], 1, pIdxKey);
        }else if( !(pCell[1] & 0x80) 
          && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
        ){
          /* The record-size field is a 2 byte varint and the record 
          ** fits entirely on the main b-tree page.  */
          testcase( pCell+nCell+2==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[2], pCell[2], 1, pIdxKey);
        }else{
          /* The record flows over onto one or more overflow pages. In
          ** this case the whole cell needs to be parsed, a buffer allocated
          ** and accessPayload() used to retrieve the record into the
          ** buffer before VdbeRecordCompare() can be called. */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
................................................................................
          }
          pCur->aiIdx[pCur->iPage] = (u16)idx;
          rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
          if( rc ){
            sqlite3_free(pCellKey);
            goto moveto_finish;
          }
          c = xRecordCompare(nCell, pCellKey, ((u8*)pCellKey)[0], 1, pIdxKey);
          sqlite3_free(pCellKey);
        }
        if( c<0 ){
          lwr = idx+1;
        }else if( c>0 ){
          upr = idx-1;
        }else{







|






|







 







|







4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
....
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
        */
        nCell = pCell[0];
        if( nCell<=pPage->max1bytePayload ){
          /* This branch runs if the record-size field of the cell is a
          ** single byte varint and the record fits entirely on the main
          ** b-tree page.  */
          testcase( pCell+nCell+1==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey, 0);
        }else if( !(pCell[1] & 0x80) 
          && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
        ){
          /* The record-size field is a 2 byte varint and the record 
          ** fits entirely on the main b-tree page.  */
          testcase( pCell+nCell+2==pPage->aDataEnd );
          c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey, 0);
        }else{
          /* The record flows over onto one or more overflow pages. In
          ** this case the whole cell needs to be parsed, a buffer allocated
          ** and accessPayload() used to retrieve the record into the
          ** buffer before VdbeRecordCompare() can be called. */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
................................................................................
          }
          pCur->aiIdx[pCur->iPage] = (u16)idx;
          rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
          if( rc ){
            sqlite3_free(pCellKey);
            goto moveto_finish;
          }
          c = xRecordCompare(nCell, pCellKey, pIdxKey, 0);
          sqlite3_free(pCellKey);
        }
        if( c<0 ){
          lwr = idx+1;
        }else if( c>0 ){
          upr = idx-1;
        }else{

Changes to src/sqliteInt.h.

1582
1583
1584
1585
1586
1587
1588



1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
** Records are used to store the content of a table row and to store
** the key of an index.  A blob encoding of a record is created by
** the OP_MakeRecord opcode of the VDBE and is disassembled by the
** OP_Column opcode.
**
** This structure holds a record that has already been disassembled
** into its constituent fields.



*/
struct UnpackedRecord {
  KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  u16 nField;         /* Number of entries in aMem[] */
  char default_rc;    /* Comparison result if keys are equal */
  Mem *aMem;          /* Values */
  int r1;             /* Returned by the optimized comparison functions when
                      ** the serialized (left) key sorts before this record */
  int r2;             /* Returned by the optimized comparison functions when
                      ** the serialized (left) key sorts after this record */
};


/*
** Each SQL index is represented in memory by an
** instance of the following structure.
**







>
>
>






|
|







1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
** Records are used to store the content of a table row and to store
** the key of an index.  A blob encoding of a record is created by
** the OP_MakeRecord opcode of the VDBE and is disassembled by the
** OP_Column opcode.
**
** This structure holds a record that has already been disassembled
** into its constituent fields.
**
** The r1 and r2 member variables are only used by the optimized comparison
** functions vdbeRecordCompareInt() and vdbeRecordCompareString().
*/
struct UnpackedRecord {
  KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  u16 nField;         /* Number of entries in aMem[] */
  char default_rc;    /* Comparison result if keys are equal */
  Mem *aMem;          /* Values */
  /* In the two comments below "lhs" is the serialized key handed to the
  ** comparison function and "rhs" is this unpacked record. */
  int r1;             /* Value to return if (lhs < rhs) */
  int r2;             /* Value to return if (lhs > rhs) */
};


/*
** Each SQL index is represented in memory by an
** instance of the following structure.
**

Changes to src/vdbe.h.

207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
sqlite3_value *sqlite3VdbeGetBoundValue(Vdbe*, int, u8);
void sqlite3VdbeSetVarmask(Vdbe*, int);
#ifndef SQLITE_OMIT_TRACE
  char *sqlite3VdbeExpandSql(Vdbe*, const char*);
#endif

void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);

typedef int (*RecordCompare)(int,const void*,int,u32,UnpackedRecord*);
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*);
RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo*);

#ifndef SQLITE_OMIT_TRIGGER
void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
#endif

/* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
** each VDBE opcode.







|


|

<







207
208
209
210
211
212
213
214
215
216
217
218

219
220
221
222
223
224
225
sqlite3_value *sqlite3VdbeGetBoundValue(Vdbe*, int, u8);
void sqlite3VdbeSetVarmask(Vdbe*, int);
#ifndef SQLITE_OMIT_TRACE
  char *sqlite3VdbeExpandSql(Vdbe*, const char*);
#endif

void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*,int);
UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);

typedef int (*RecordCompare)(int,const void*,UnpackedRecord*,int);
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*);


#ifndef SQLITE_OMIT_TRIGGER
void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
#endif

/* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
** each VDBE opcode.

Changes to src/vdbeaux.c.

3119
3120
3121
3122
3123
3124
3125

3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
....
3216
3217
3218
3219
3220
3221
3222

3223






3224
3225
3226
3227
3228
3229
3230
....
3340
3341
3342
3343
3344
3345
3346







3347

3348

3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
















3374
3375
3376
3377
3378

3379
3380
3381



3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397

3398
3399
3400

3401
3402
3403




3404
3405
3406
3407
3408
3409
3410
....
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525





3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543

3544

3545
3546
3547






3548
3549
3550
3551
3552

3553
3554
3555
3556
3557
3558
3559
3560
3561

3562
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631






3632
3633
3634
3635
3636

3637
3638
3639
3640
3641

3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
....
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
....
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
....
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
    pMem++;
    u++;
  }
  assert( u<=pKeyInfo->nField + 1 );
  p->nField = u;
}


/*
** This function compares the two table rows or index records
** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
** or positive integer if key1 is less than, equal to or 
** greater than key2.  The {nKey1, pKey1} key must be a blob
** created by the OP_MakeRecord opcode of the VDBE.  The pPKey2
** key must be a parsed key such as obtained from
** sqlite3VdbeParseRecord.
**
** Key1 and Key2 do not have to contain the same number of fields.
** The key with fewer fields usually compares less than the 
** longer key.  However if the UNPACKED_INCRKEY flags in pPKey2 is set
** and the common prefixes are equal, then key1 is less than key2.
** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are
** equal, then the keys are considered to be equal and
** the parts beyond the common prefix are ignored.
*/
static int vdbeRecordComparePrev(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  u32 d1;            /* Offset into aKey[] of next data element */
  u32 idx1;          /* Offset into aKey[] of next header element */
  u32 szHdr1;        /* Number of bytes in header */
  int i = 0;
................................................................................
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */
  return pPKey2->default_rc;
}








static int vdbeCompareMemString(
  const Mem *pMem1, 
  const Mem *pMem2, 
  const CollSeq *pColl
){
  if( pMem1->enc==pColl->enc ){
    /* The strings are already in the correct encoding.  Call the
................................................................................
  if( rc==0 ){
    rc = pMem1->n - pMem2->n;
  }
  return rc;
}









/*
** Decode and return the integer stored at aKey[] according to serial_type.
**
** Serial types 1 through 6 are big-endian two's-complement integers of
** 1, 2, 3, 4, 6 and 8 bytes respectively.  For any other serial_type the
** value (serial_type - 8) is returned, which yields 0 for serial type 8
** and 1 for serial type 9.
**
** Sign extension is done through "signed char" and by multiplication
** rather than through plain "char" and a left shift.  The signedness of
** plain char is implementation-defined (so negative values would decode
** incorrectly where char is unsigned) and left-shifting a negative value
** is undefined behaviour.
*/
static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
  switch( serial_type ){
    case 1:
      return (signed char)aKey[0];
    case 2:
      return 256*(signed char)aKey[0] | aKey[1];
    case 3:
      return 65536*(signed char)aKey[0] | (aKey[1]<<8) | aKey[2];
    case 4:
      return (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);

    case 5: {
      /* Upper 32 bits (sign-carrying) followed by the lower 16 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = (aKey[4] << 8) | aKey[5];
      return msw*65536 + lsw;
    }

    case 6: {
      /* Upper 32 bits (sign-carrying) followed by the lower 32 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
      return msw*((i64)1<<32) + lsw;
    }
  }

  return (serial_type - 8);
}

















static int vdbeRecordCompare(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr1,                   /* Size of record header in bytes */
  u32 idx1,                     /* Offset of first type in header */
  UnpackedRecord *const pPKey2  /* Right key */

){
  u32 d1 = szHdr1;   /* Offset into aKey[] of next data element */
  int i = 0;



  int rc = 0;
  Mem *pRhs = pPKey2->aMem;
  KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  Mem mem1;

#ifdef SQLITE_DEBUG
  int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
  static int nCall = 0;
  nCall++;
#endif

  /* If idx==0, then the caller has already determined that the first two
  ** elements in the keys are equal. Fix the various stack variables so
  ** that this routine begins comparing at the second field. */
  if( idx1==0 ){

    u32 s1;
    assert( sqlite3VarintLen(szHdr1)==1 );
    idx1 = 1 + getVarint32(&aKey1[1], s1);

    d1 += sqlite3VdbeSerialTypeLen(s1);
    i = 1;
    pRhs++;




  }

  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
       || CORRUPT_DB );
  assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  assert( pPKey2->pKeyInfo->nField>0 );
................................................................................
    /* RHS is null */
    else{
      serial_type = aKey1[idx1];
      rc = (serial_type!=0);
    }

    if( rc!=0 ){
      assert( mem1.zMalloc==0 );  /* See comment below */
      if( pKeyInfo->aSortOrder[i] ){
        rc = -rc;
#if 0
        assert( (rc>0 && (rc^(int)0x80000000)<0) 
             || (rc<0 && (rc^(int)0x80000000)>0) );
        assert( sizeof(int)==4 );
        rc ^= (int)0x80000000;    /* similar in spirit to: "rc = -rc;" */
        assert( rc!=0 );
#endif
      }
      assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );





      return rc;
    }

    i++;
    pRhs++;
    d1 += sqlite3VdbeSerialTypeLen(serial_type);
    idx1 += sqlite3VarintLen(serial_type);
  }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  */
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one or both of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */

  assert( pPKey2->default_rc==expected );

  return pPKey2->default_rc;
}







/*
** Compare the serialized key {nKey1, pKey1} against pPKey2 for the case
** where the first field of pPKey2 is an integer (pPKey2->aMem[0].u.i is
** read unconditionally).  The serial type of the first field of pKey1 is
** read from the single byte at offset 1, and its data from offset szHdr.
**
** If the first field of pKey1 is not stored as an integer, the comparison
** is handed to vdbeRecordCompare().  Otherwise:
**
**   * pPKey2->r1 is returned if the integer in pPKey2 is the larger,
**   * pPKey2->r2 is returned if the integer in pPKey2 is the smaller,
**   * if the integers are equal and pPKey2 has more than one field, the
**     remaining fields are compared by vdbeRecordCompare() (called with
**     idx1==0 so that it skips the first field),
**   * otherwise pPKey2->default_rc is returned.
**
** The idx1 parameter is not used by this function.
**
** Sign extension uses "signed char" and multiplication instead of plain
** "char" and left shifts: the signedness of plain char is implementation
** defined and left-shifting a negative value is undefined behaviour.
*/
static int vdbeRecordCompareInt(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr,                    /* Size of record header in bytes */
  u32 idx1,                     /* Unused */
  UnpackedRecord *pPKey2        /* Right key */
){
  const u8 *aKey = &((const u8*)pKey1)[szHdr];
  int serial_type = ((const u8*)pKey1)[1];
  int res;
  i64 v = pPKey2->aMem[0].u.i;
  i64 lhs;

  switch( serial_type ){
    case 1:
      lhs = (signed char)(aKey[0]);
      break;
    case 2:
      lhs = 256*(signed char)aKey[0] + aKey[1];
      break;
    case 3:
      lhs = 65536*(signed char)aKey[0] | (aKey[1]<<8) | aKey[2];
      break;
    case 4:
      lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);
      break;

    case 5: {
      /* Upper 32 bits (sign-carrying) followed by the lower 16 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = (aKey[4] << 8) | aKey[5];
      lhs = msw*65536 + lsw;
      break;
    }

    case 6: {
      /* Upper 32 bits (sign-carrying) followed by the lower 32 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
      lhs = msw*((i64)1<<32) + lsw;
      break;
    }

    case 8: 
      lhs = 0;
      break;

    case 9:
      lhs = 1;
      break;

    /* This case could be removed without changing the results of running
    ** this code. Including it causes gcc to generate a faster switch 
    ** statement (since the range of switch targets now starts at zero and
    ** is contiguous) but does not cause any duplicate code to be generated
    ** (as gcc is clever enough to combine the two like cases). Other 
    ** compilers might be similar.  */ 
    case 0: case 7:
      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);

    default:
      return vdbeRecordCompare(nKey1, pKey1, szHdr, 1, pPKey2);
  }

  if( v>lhs ){
    res = pPKey2->r1;
  }else if( v<lhs ){
    res = pPKey2->r2;
  }else if( pPKey2->nField>1 ){
    /* The first fields of the two keys are equal. Compare the trailing 
    ** fields.  */
    res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
  }else{
    /* The first fields of the two keys are equal and there are no trailing
    ** fields. Return pPKey2->default_rc in this case. */
    res = pPKey2->default_rc;
  }

  /* Cross-check the sign of the result against the reference comparison */
  assert( (res==0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)==0)
       || (res<0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)<0)
       || (res>0 && vdbeRecordComparePrev(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return res;
}







static int vdbeRecordCompareString(
  int nKey1, const void *pKey1, /* Left key */
  int szHdr,
  u32 idx1,
  UnpackedRecord *pPKey2        /* Right key */

){
  const u8 *aKey1 = (const u8*)pKey1;
  int serial_type;
  int res;


  getVarint32(&aKey1[1], serial_type);

  if( serial_type<12 ){
    res = pPKey2->r1;      /* (pKey1/nKey1) is a number or a null */
  }else if( !(serial_type & 0x01) ){ 
    res = pPKey2->r2;      /* (pKey1/nKey1) is a blob */
  }else{
    int nCmp;
    int nStr;
    aKey1 = &aKey1[szHdr];

    nStr = (serial_type-12) / 2;
    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
    nCmp = MIN( pPKey2->aMem[0].n, nStr );
    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);

    if( res==0 ){
      res = nStr - pPKey2->aMem[0].n;
      if( res==0 ){
        if( pPKey2->nField>1 ){
          res = vdbeRecordCompare(nKey1, pKey1, szHdr, 0, pPKey2);
        }else{
          res = pPKey2->default_rc;
        }
      }else if( res>0 ){
        res = pPKey2->r2;
      }else{
        res = pPKey2->r1;
................................................................................
    }else if( res>0 ){
      res = pPKey2->r2;
    }else{
      res = pPKey2->r1;
    }
  }

  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return res;
}


/*
** RecordCompare-compatible wrapper around vdbeRecordCompare().  The two
** header arguments supplied by the caller are ignored; the header size
** and the offset of the first serial type are instead decoded from the
** varint at the start of pKey1.
*/
int vdbeRecordCompareLargeHeader(
  int nKey1, const void *pKey1, /* Left key */
  int dummy1, u32 dummy2,       /* Unused arguments */
  UnpackedRecord *pPKey2        /* Right key */
){
  int nHdr;                     /* Header size read from the record */
  u32 iFirstType = getVarint32(((u8*)pKey1), nHdr);

  return vdbeRecordCompare(nKey1, pKey1, nHdr, iFirstType, pPKey2);
}

RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField) > 10 ){
    return vdbeRecordCompareLargeHeader;
  }else{
    int flags = p->aMem[0].flags;
    if( p->pKeyInfo->aSortOrder[0] ){
      p->r1 = 1;
      p->r2 = -1;
    }else{
      p->r1 = -1;
      p->r2 = 1;
................................................................................
    if( (flags & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
        && p->pKeyInfo->aColl[0]==0 
    ){
      return vdbeRecordCompareString;
    }
  }

  return vdbeRecordCompare;
}

/*
** Select the comparison function for keys described by pKeyInfo.  Keys
** with more than 10 fields in total (nField + nXField) get the
** large-header variant; all others get vdbeRecordCompare().
*/
RecordCompare sqlite3VdbeFindSorterCompare(KeyInfo *pKeyInfo){
  return (pKeyInfo->nField + pKeyInfo->nXField)<=10
       ? vdbeRecordCompare
       : vdbeRecordCompareLargeHeader;
}

/*
** Compare the serialized key {nKey1, pKey1} with the unpacked key pPKey2.
** The header-size varint at the start of pKey1 is decoded here and the
** comparison itself is delegated to vdbeRecordCompare().
*/
int sqlite3VdbeRecordCompare(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  int nHdr;                     /* Header size read from the record */
  u32 iFirstType = getVarint32(((u8*)pKey1), nHdr);

  return vdbeRecordCompare(nKey1, pKey1, nHdr, iFirstType, pPKey2);
}

/*
** pCur points at an index entry created using the OP_MakeRecord opcode.
** Read the rowid (the last field in the record) and store it in *rowid.
** Return SQLITE_OK if everything works, or an error code otherwise.
**
................................................................................
    return SQLITE_CORRUPT_BKPT;
  }
  memset(&m, 0, sizeof(m));
  rc = sqlite3VdbeMemFromBtree(pC->pCursor, 0, (u32)nCellKey, 1, &m);
  if( rc ){
    return rc;
  }
  *res = sqlite3VdbeRecordCompare(m.n, m.z, pUnpacked);
  sqlite3VdbeMemRelease(&m);
  return SQLITE_OK;
}

/*
** This routine sets the value to be returned by subsequent calls to
** sqlite3_changes() on the database handle 'db'. 







>

|
<
|
|
|
|
|
<
<
<
<
<
<
<
<

|







 







>

>
>
>
>
>
>







 







>
>
>
>
>
>
>

>

>








<





<










>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
<
<
|
>

|
<
>
>
>
|
|




<
<
<
<
<
<
|
|

<
>

<

>
|


>
>
>
>







 







<


<
<
<
<
<
<
<

<
>
>
>
>
>











|
<





>
|
>



>
>
>
>
>
>


<
<
|
>

|





|

>












<






<






<



<











|


|









|






|
|
|





>
>
>
>
>
>


<
<
|
>





>









|




|





|







 







|
|
|





|
|
|
|
|
|
|
|
|
|
|
|
<
<
<
<







 







<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<







 







|







3119
3120
3121
3122
3123
3124
3125
3126
3127
3128

3129
3130
3131
3132
3133








3134
3135
3136
3137
3138
3139
3140
3141
3142
....
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
....
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364

3365
3366
3367
3368
3369

3370
3371
3372
3373
3374
3375
3376
3377
3378
3379
3380
3381
3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397


3398
3399
3400
3401

3402
3403
3404
3405
3406
3407
3408
3409
3410






3411
3412
3413

3414
3415

3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
....
3528
3529
3530
3531
3532
3533
3534

3535
3536







3537

3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
3549
3550
3551
3552
3553
3554

3555
3556
3557
3558
3559
3560
3561
3562
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573


3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597

3598
3599
3600
3601
3602
3603

3604
3605
3606
3607
3608
3609

3610
3611
3612

3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659


3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
....
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
3723




3724
3725
3726
3727
3728
3729
3730
....
3735
3736
3737
3738
3739
3740
3741










3742








3743
3744
3745
3746
3747
3748
3749
....
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
    pMem++;
    u++;
  }
  assert( u<=pKeyInfo->nField + 1 );
  p->nField = u;
}

#if SQLITE_DEBUG
/*
** This function compares two index or table record keys in the same way

** as the sqlite3VdbeRecordCompare() routine. Unlike VdbeRecordCompare(),
** this function deserializes and compares values using the
** sqlite3VdbeSerialGet() and sqlite3MemCompare() functions. It is used
** in assert() statements to ensure that the optimized code in
** sqlite3VdbeRecordCompare() returns results consistent with these two
** primitives.








*/
static int vdbeRecordCompareDebug(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2        /* Right key */
){
  u32 d1;            /* Offset into aKey[] of next data element */
  u32 idx1;          /* Offset into aKey[] of next header element */
  u32 szHdr1;        /* Number of bytes in header */
  int i = 0;
................................................................................
  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */
  return pPKey2->default_rc;
}
#endif

/*
** Both *pMem1 and *pMem2 contain string values. Compare the two values
** using the collation sequence pColl. As usual, return a negative , zero
** or positive value if *pMem1 is less than, equal to or greater than 
** *pMem2, respectively. Similar in spirit to "rc = (*pMem1) - (*pMem2);".
*/
static int vdbeCompareMemString(
  const Mem *pMem1,
  const Mem *pMem2,
  const CollSeq *pColl
){
  if( pMem1->enc==pColl->enc ){
    /* The strings are already in the correct encoding.  Call the
................................................................................
  if( rc==0 ){
    rc = pMem1->n - pMem2->n;
  }
  return rc;
}


/*
** The first argument passed to this function is a serial-type that
** corresponds to an integer - all values between 1 and 9 inclusive 
** except 7. The second points to a buffer containing an integer value
** serialized according to serial_type. This function deserializes
** and returns the value.
**
** Serial types 1 through 6 are big-endian two's-complement integers of
** 1, 2, 3, 4, 6 and 8 bytes respectively.  Serial types 8 and 9 carry no
** data and decode to the constants 0 and 1.
**
** Sign extension is done through "signed char" and by multiplication
** rather than through plain "char" and a left shift.  The signedness of
** plain char is implementation-defined (so negative values would decode
** incorrectly where char is unsigned) and left-shifting a negative value
** is undefined behaviour.
*/
static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
  assert( CORRUPT_DB || (serial_type>=1 && serial_type<=9 && serial_type!=7) );
  switch( serial_type ){
    /* Serial type 0 is excluded by the assert() above unless CORRUPT_DB is
    ** set; it simply shares the code for serial type 1. */
    case 0:
    case 1:
      return (signed char)aKey[0];
    case 2:
      return 256*(signed char)aKey[0] | aKey[1];
    case 3:
      return 65536*(signed char)aKey[0] | (aKey[1]<<8) | aKey[2];
    case 4:
      return (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);

    case 5: {
      /* Upper 32 bits (sign-carrying) followed by the lower 16 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = (aKey[4] << 8) | aKey[5];
      return msw*65536 + lsw;
    }

    case 6: {
      /* Upper 32 bits (sign-carrying) followed by the lower 32 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
      return msw*((i64)1<<32) + lsw;
    }
  }

  /* Serial types 8 and 9: the constants 0 and 1 */
  return (serial_type - 8);
}

/*
** This function compares the two table rows or index records
** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
** or positive integer if key1 is less than, equal to or 
** greater than key2.  The {nKey1, pKey1} key must be a blob
** created by the OP_MakeRecord opcode of the VDBE.  The pPKey2
** key must be a parsed key such as obtained from
** sqlite3VdbeParseRecord.
**
** If argument bSkip is non-zero, it is assumed that the caller has already
** determined that the first fields of the keys are equal.
**
** Key1 and Key2 do not have to contain the same number of fields. If all 
** fields that appear in both keys are equal, then pPKey2->default_rc is 
** returned.
*/
int sqlite3VdbeRecordCompare(
  int nKey1, const void *pKey1,   /* Left key */


  UnpackedRecord *const pPKey2,   /* Right key */
  int bSkip                       /* If true, skip the first field */
){
  u32 d1;                         /* Offset into aKey[] of next data element */

  int i;                          /* Index of next field to compare */
  int szHdr1;                     /* Size of record header in bytes */
  u32 idx1;                       /* Offset of first type in header */
  int rc = 0;                     /* Return value */
  Mem *pRhs = pPKey2->aMem;       /* Next field of pPKey2 to compare */
  KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
  const unsigned char *aKey1 = (const unsigned char *)pKey1;
  Mem mem1;







  /* If bSkip is true, then the caller has already determined that the first
  ** two elements in the keys are equal. Fix the various stack variables so
  ** that this routine begins comparing at the second field. */

  if( bSkip ){
    u32 s1;

    idx1 = 1 + getVarint32(&aKey1[1], s1);
    szHdr1 = aKey1[0];
    d1 = szHdr1 + sqlite3VdbeSerialTypeLen(s1);
    i = 1;
    pRhs++;
  }else{
    idx1 = getVarint32(aKey1, szHdr1);
    d1 = szHdr1;
    i = 0;
  }

  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
  assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
       || CORRUPT_DB );
  assert( pPKey2->pKeyInfo->aSortOrder!=0 );
  assert( pPKey2->pKeyInfo->nField>0 );
................................................................................
    /* RHS is null */
    else{
      serial_type = aKey1[idx1];
      rc = (serial_type!=0);
    }

    if( rc!=0 ){

      if( pKeyInfo->aSortOrder[i] ){
        rc = -rc;







      }

      assert( CORRUPT_DB 
          || (rc<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
          || (rc>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
      );
      assert( mem1.zMalloc==0 );  /* See comment below */
      return rc;
    }

    i++;
    pRhs++;
    d1 += sqlite3VdbeSerialTypeLen(serial_type);
    idx1 += sqlite3VarintLen(serial_type);
  }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );

  /* No memory allocation is ever used on mem1.  Prove this using
  ** the following assert().  If the assert() fails, it indicates a
  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).  */

  assert( mem1.zMalloc==0 );

  /* rc==0 here means that one or both of the keys ran out of fields and
  ** all the fields up to that point were equal. Return the default_rc
  ** value.  */
  assert( CORRUPT_DB 
       || pPKey2->default_rc==vdbeRecordCompareDebug(nKey1, pKey1, pPKey2) 
  );
  return pPKey2->default_rc;
}

/*
** This function is an optimized version of sqlite3VdbeRecordCompare() 
** that assumes (a) the first field of pPKey2 is an integer, and (b) the 
** size-of-header varint at the start of (pKey1/nKey1) fits in a single
** byte (i.e. is less than 128).
**
** If the first field of pKey1 is not stored as an integer, the comparison
** is handed to sqlite3VdbeRecordCompare().  Otherwise:
**
**   * pPKey2->r1 is returned if the integer in pPKey2 is the larger,
**   * pPKey2->r2 is returned if the integer in pPKey2 is the smaller,
**   * if the integers are equal and pPKey2 has more than one field, the
**     remaining fields are compared by sqlite3VdbeRecordCompare() with
**     bSkip set,
**   * otherwise pPKey2->default_rc is returned.
**
** Sign extension uses "signed char" and multiplication instead of plain
** "char" and left shifts: the signedness of plain char is implementation
** defined and left-shifting a negative value is undefined behaviour.
*/
static int vdbeRecordCompareInt(
  int nKey1, const void *pKey1, /* Left key */
  UnpackedRecord *pPKey2,       /* Right key */
  int bSkip                     /* Ignored */
){
  /* The first byte of the record is the header size, so the data for the
  ** first field begins at that offset.  The second byte is the serial
  ** type of the first field. */
  const u8 *aKey = &((const u8*)pKey1)[*(const u8*)pKey1];
  int serial_type = ((const u8*)pKey1)[1];
  int res;
  i64 v = pPKey2->aMem[0].u.i;
  i64 lhs;

  assert( bSkip==0 );

  switch( serial_type ){
    case 1:
      lhs = (signed char)(aKey[0]);
      break;
    case 2:
      lhs = 256*(signed char)aKey[0] + aKey[1];
      break;
    case 3:
      lhs = 65536*(signed char)aKey[0] | (aKey[1]<<8) | aKey[2];
      break;
    case 4:
      lhs = (int)(((u32)aKey[0]<<24) | (aKey[1]<<16) | (aKey[2]<<8)| aKey[3]);
      break;

    case 5: {
      /* Upper 32 bits (sign-carrying) followed by the lower 16 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = (aKey[4] << 8) | aKey[5];
      lhs = msw*65536 + lsw;
      break;
    }

    case 6: {
      /* Upper 32 bits (sign-carrying) followed by the lower 32 bits */
      i64 msw = (int)(((u32)aKey[0]<<24)|(aKey[1]<<16)|(aKey[2]<<8)|aKey[3]);
      u32 lsw = ((unsigned)aKey[4]<<24)|(aKey[5]<<16)|(aKey[6]<<8)|aKey[7];
      lhs = msw*((i64)1<<32) + lsw;
      break;
    }

    case 8: 
      lhs = 0;
      break;

    case 9:
      lhs = 1;
      break;

    /* This case could be removed without changing the results of running
    ** this code. Including it causes gcc to generate a faster switch 
    ** statement (since the range of switch targets now starts at zero and
    ** is contiguous) but does not cause any duplicate code to be generated
    ** (as gcc is clever enough to combine the two like cases). Other 
    ** compilers might be similar.  */ 
    case 0: case 7:
      return sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 0);

    default:
      return sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 0);
  }

  if( v>lhs ){
    res = pPKey2->r1;
  }else if( v<lhs ){
    res = pPKey2->r2;
  }else if( pPKey2->nField>1 ){
    /* The first fields of the two keys are equal. Compare the trailing 
    ** fields.  */
    res = sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 1);
  }else{
    /* The first fields of the two keys are equal and there are no trailing
    ** fields. Return pPKey2->default_rc in this case. */
    res = pPKey2->default_rc;
  }

  /* Cross-check the sign of the result against the reference comparison */
  assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
       || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
       || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return res;
}

/*
** This function is an optimized version of sqlite3VdbeRecordCompare() 
** that assumes (a) the first field of pPKey2 is a string, (b) the first
** field uses the collation sequence BINARY, and (c) the size-of-header
** varint at the start of (pKey1/nKey1) fits in a single byte.
*/
static int vdbeRecordCompareString(
  int nKey1, const void *pKey1, /* Left key */


  UnpackedRecord *pPKey2,       /* Right key */
  int bSkip
){
  const u8 *aKey1 = (const u8*)pKey1;
  int serial_type;
  int res;

  assert( bSkip==0 );
  getVarint32(&aKey1[1], serial_type);

  if( serial_type<12 ){
    res = pPKey2->r1;      /* (pKey1/nKey1) is a number or a null */
  }else if( !(serial_type & 0x01) ){ 
    res = pPKey2->r2;      /* (pKey1/nKey1) is a blob */
  }else{
    int nCmp;
    int nStr;
    int szHdr = aKey1[0];

    nStr = (serial_type-12) / 2;
    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
    nCmp = MIN( pPKey2->aMem[0].n, nStr );
    res = memcmp(&aKey1[szHdr], pPKey2->aMem[0].z, nCmp);

    if( res==0 ){
      res = nStr - pPKey2->aMem[0].n;
      if( res==0 ){
        if( pPKey2->nField>1 ){
          res = sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2, 1);
        }else{
          res = pPKey2->default_rc;
        }
      }else if( res>0 ){
        res = pPKey2->r2;
      }else{
        res = pPKey2->r1;
................................................................................
    }else if( res>0 ){
      res = pPKey2->r2;
    }else{
      res = pPKey2->r1;
    }
  }

  assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
       || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
       || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
       || CORRUPT_DB
  );
  return res;
}

/*
** Return a pointer to an sqlite3VdbeRecordCompare() compatible function
** suitable for comparing serialized records to the unpacked record passed
** as the only argument.
*/
RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
  /* As the varints that make up a record header are all 5 bytes in size
  ** or less, if the binary keys being compared have 25 or fewer fields 
  ** then it is guaranteed that the varint at the start of every record 
  ** (the record-header size in bytes) fits in a single byte. If this
  ** is not the case, then sqlite3VdbeRecordCompare() must be used.  */
  if( (p->pKeyInfo->nField + p->pKeyInfo->nXField)<=25 ){




    int flags = p->aMem[0].flags;
    if( p->pKeyInfo->aSortOrder[0] ){
      p->r1 = 1;
      p->r2 = -1;
    }else{
      p->r1 = -1;
      p->r2 = 1;
................................................................................
    if( (flags & (MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
        && p->pKeyInfo->aColl[0]==0 
    ){
      return vdbeRecordCompareString;
    }
  }











  return sqlite3VdbeRecordCompare;








}

/*
** pCur points at an index entry created using the OP_MakeRecord opcode.
** Read the rowid (the last field in the record) and store it in *rowid.
** Return SQLITE_OK if everything works, or an error code otherwise.
**
................................................................................
    return SQLITE_CORRUPT_BKPT;
  }
  memset(&m, 0, sizeof(m));
  rc = sqlite3VdbeMemFromBtree(pC->pCursor, 0, (u32)nCellKey, 1, &m);
  if( rc ){
    return rc;
  }
  *res = sqlite3VdbeRecordCompare(m.n, m.z, pUnpacked, 0);
  sqlite3VdbeMemRelease(&m);
  return SQLITE_OK;
}

/*
** This routine sets the value to be returned by subsequent calls to
** sqlite3_changes() on the database handle 'db'. 

Changes to src/vdbesort.c.

101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
...
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
...
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  VdbeSorterIter *aIter;          /* Array of iterators to merge */
  int *aTree;                     /* Current state of incremental merge */
  sqlite3_file *pTemp1;           /* PMA file 1 */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  UnpackedRecord *pUnpacked;      /* Used to unpack keys */
  RecordCompare xRecordCompare;   /* Record compare function */
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {
................................................................................
        *pRes = -1;
        return;
      }
    }
    assert( r2->default_rc==0 );
  }

#if 0
  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
#endif
  *pRes = pSorter->xRecordCompare(nKey1, pKey1, *((u8*)pKey1), 1, r2);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/
................................................................................
  if( !sqlite3TempInMemory(db) ){
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
    pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
    mxCache = db->aDb[0].pSchema->cache_size;
    if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
    pSorter->mxPmaSize = mxCache * pgsz;
  }
  pSorter->xRecordCompare = sqlite3VdbeFindSorterCompare(pCsr->pKeyInfo);

  return SQLITE_OK;
}

/*
** Free the list of sorted records starting at pRecord.
*/







<







 







<
|
<
<







 







<







101
102
103
104
105
106
107

108
109
110
111
112
113
114
...
408
409
410
411
412
413
414

415


416
417
418
419
420
421
422
...
484
485
486
487
488
489
490

491
492
493
494
495
496
497
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  VdbeSorterIter *aIter;          /* Array of iterators to merge */
  int *aTree;                     /* Current state of incremental merge */
  sqlite3_file *pTemp1;           /* PMA file 1 */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  UnpackedRecord *pUnpacked;      /* Used to unpack keys */

};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {
................................................................................
        *pRes = -1;
        return;
      }
    }
    assert( r2->default_rc==0 );
  }


  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);


}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/
................................................................................
  if( !sqlite3TempInMemory(db) ){
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
    pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
    mxCache = db->aDb[0].pSchema->cache_size;
    if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
    pSorter->mxPmaSize = mxCache * pgsz;
  }


  return SQLITE_OK;
}

/*
** Free the list of sorted records starting at pRecord.
*/

Changes to src/where.c.

1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
....
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
#endif
  assert( pRec!=0 );
  iCol = pRec->nField - 1;
  assert( pIdx->nSample>0 );
  assert( pRec->nField>0 && iCol<pIdx->nSampleCol );
  do{
    iTest = (iMin+i)/2;
    res = sqlite3VdbeRecordCompare(aSample[iTest].n, aSample[iTest].p, pRec);
    if( res<0 ){
      iMin = iTest+1;
    }else{
      i = iTest;
    }
  }while( res && iMin<i );

................................................................................
#ifdef SQLITE_DEBUG
  /* The following assert statements check that the binary search code
  ** above found the right answer. This block serves no purpose other
  ** than to invoke the asserts.  */
  if( res==0 ){
    /* If (res==0) is true, then sample $i must be equal to pRec */
    assert( i<pIdx->nSample );
    assert( 0==sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec)
         || pParse->db->mallocFailed );
  }else{
    /* Otherwise, pRec must be smaller than sample $i and larger than
    ** sample ($i-1).  */
    assert( i==pIdx->nSample 
         || sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec)>0
         || pParse->db->mallocFailed );
    assert( i==0
         || sqlite3VdbeRecordCompare(aSample[i-1].n, aSample[i-1].p, pRec)<0
         || pParse->db->mallocFailed );
  }
#endif /* ifdef SQLITE_DEBUG */

  /* At this point, aSample[i] is the first sample that is greater than
  ** or equal to pVal.  Or if i==pIdx->nSample, then all samples are less
  ** than pVal.  If aSample[i]==pVal, then res==0.







|







 







|





|


|







1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
....
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
#endif
  assert( pRec!=0 );
  iCol = pRec->nField - 1;
  assert( pIdx->nSample>0 );
  assert( pRec->nField>0 && iCol<pIdx->nSampleCol );
  do{
    iTest = (iMin+i)/2;
    res = sqlite3VdbeRecordCompare(aSample[iTest].n, aSample[iTest].p, pRec, 0);
    if( res<0 ){
      iMin = iTest+1;
    }else{
      i = iTest;
    }
  }while( res && iMin<i );

................................................................................
#ifdef SQLITE_DEBUG
  /* The following assert statements check that the binary search code
  ** above found the right answer. This block serves no purpose other
  ** than to invoke the asserts.  */
  if( res==0 ){
    /* If (res==0) is true, then sample $i must be equal to pRec */
    assert( i<pIdx->nSample );
    assert( 0==sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec, 0)
         || pParse->db->mallocFailed );
  }else{
    /* Otherwise, pRec must be smaller than sample $i and larger than
    ** sample ($i-1).  */
    assert( i==pIdx->nSample 
         || sqlite3VdbeRecordCompare(aSample[i].n, aSample[i].p, pRec, 0)>0
         || pParse->db->mallocFailed );
    assert( i==0
         || sqlite3VdbeRecordCompare(aSample[i-1].n, aSample[i-1].p, pRec, 0)<0
         || pParse->db->mallocFailed );
  }
#endif /* ifdef SQLITE_DEBUG */

  /* At this point, aSample[i] is the first sample that is greater than
  ** or equal to pVal.  Or if i==pIdx->nSample, then all samples are less
  ** than pVal.  If aSample[i]==pVal, then res==0.