/ Check-in [85206e0b]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Attempt to speed up sqlite3VdbeRecordCompare() by various means. This code is in an interim state.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | experimental
Files: files | file ages | folders
SHA1: 85206e0bbac29adab52bef795f6d1479f2ae2c0e
User & Date: dan 2014-02-25 21:01:25
Context
2014-02-27
20:44
Further changes to sqlite3VdbeRecordCompare(). check-in: 57089374 user: dan tags: experimental
2014-02-25
21:01
Attempt to speed up sqlite3VdbeRecordCompare() by various means. This code is in an interim state. check-in: 85206e0b user: dan tags: experimental
18:12
Also adjust the order of files in the amalgamation to ensure that _FILE_OFFSET_BITS is defined before any #include, for QNX. check-in: 23001a85 user: drh tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Show Whitespace Changes Patch

Changes to src/btree.c.

  4543   4543     BtCursor *pCur,          /* The cursor to be moved */
  4544   4544     UnpackedRecord *pIdxKey, /* Unpacked index key */
  4545   4545     i64 intKey,              /* The table key */
  4546   4546     int biasRight,           /* If true, bias the search to the high end */
  4547   4547     int *pRes                /* Write search results here */
  4548   4548   ){
  4549   4549     int rc;
         4550  +  int (*xRecordCompare)(int, const void*, UnpackedRecord*);
  4550   4551   
  4551   4552     assert( cursorHoldsMutex(pCur) );
  4552   4553     assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
  4553   4554     assert( pRes );
  4554   4555     assert( (pIdxKey==0)==(pCur->pKeyInfo==0) );
  4555   4556   
  4556   4557     /* If the cursor is already positioned at the point we are trying
................................................................................
  4563   4564         return SQLITE_OK;
  4564   4565       }
  4565   4566       if( pCur->atLast && pCur->info.nKey<intKey ){
  4566   4567         *pRes = -1;
  4567   4568         return SQLITE_OK;
  4568   4569       }
  4569   4570     }
         4571  +
         4572  +  if( pIdxKey ){
         4573  +    xRecordCompare = sqlite3VdbeFindCompare(pIdxKey);
         4574  +    assert( pIdxKey->default_rc==1 || pIdxKey->default_rc==0 || pIdxKey->default_rc==-1);
         4575  +  }
  4570   4576   
  4571   4577     rc = moveToRoot(pCur);
  4572   4578     if( rc ){
  4573   4579       return rc;
  4574   4580     }
  4575   4581     assert( pCur->pgnoRoot==0 || pCur->apPage[pCur->iPage] );
  4576   4582     assert( pCur->pgnoRoot==0 || pCur->apPage[pCur->iPage]->isInit );
................................................................................
  4648   4654           */
  4649   4655           nCell = pCell[0];
  4650   4656           if( nCell<=pPage->max1bytePayload ){
  4651   4657             /* This branch runs if the record-size field of the cell is a
  4652   4658             ** single byte varint and the record fits entirely on the main
  4653   4659             ** b-tree page.  */
  4654   4660             testcase( pCell+nCell+1==pPage->aDataEnd );
  4655         -          c = sqlite3VdbeRecordCompare(nCell, (void*)&pCell[1], pIdxKey);
         4661  +          c = xRecordCompare(nCell, (void*)&pCell[1], pIdxKey);
  4656   4662           }else if( !(pCell[1] & 0x80) 
  4657   4663             && (nCell = ((nCell&0x7f)<<7) + pCell[1])<=pPage->maxLocal
  4658   4664           ){
  4659   4665             /* The record-size field is a 2 byte varint and the record 
  4660   4666             ** fits entirely on the main b-tree page.  */
  4661   4667             testcase( pCell+nCell+2==pPage->aDataEnd );
  4662         -          c = sqlite3VdbeRecordCompare(nCell, (void*)&pCell[2], pIdxKey);
         4668  +          c = xRecordCompare(nCell, (void*)&pCell[2], pIdxKey);
  4663   4669           }else{
  4664   4670             /* The record flows over onto one or more overflow pages. In
  4665   4671             ** this case the whole cell needs to be parsed, a buffer allocated
  4666   4672             ** and accessPayload() used to retrieve the record into the
  4667   4673             ** buffer before VdbeRecordCompare() can be called. */
  4668   4674             void *pCellKey;
  4669   4675             u8 * const pCellBody = pCell - pPage->childPtrSize;
................................................................................
  4676   4682             }
  4677   4683             pCur->aiIdx[pCur->iPage] = (u16)idx;
  4678   4684             rc = accessPayload(pCur, 0, nCell, (unsigned char*)pCellKey, 0);
  4679   4685             if( rc ){
  4680   4686               sqlite3_free(pCellKey);
  4681   4687               goto moveto_finish;
  4682   4688             }
  4683         -          c = sqlite3VdbeRecordCompare(nCell, pCellKey, pIdxKey);
         4689  +          c = xRecordCompare(nCell, pCellKey, pIdxKey);
  4684   4690             sqlite3_free(pCellKey);
  4685   4691           }
  4686   4692           if( c<0 ){
  4687   4693             lwr = idx+1;
  4688   4694           }else if( c>0 ){
  4689   4695             upr = idx-1;
  4690   4696           }else{

Changes to src/sqliteInt.h.

  1586   1586   **
  1587   1587   ** This structure holds a record that has already been disassembled
  1588   1588   ** into its constituent fields.
  1589   1589   */
  1590   1590   struct UnpackedRecord {
  1591   1591     KeyInfo *pKeyInfo;  /* Collation and sort-order information */
  1592   1592     u16 nField;         /* Number of entries in apMem[] */
  1593         -  u8 flags;           /* Boolean settings.  UNPACKED_... below */
         1593  +  char default_rc;    /* Comparison result if keys are equal */
  1594   1594     Mem *aMem;          /* Values */
  1595   1595   };
  1596   1596   
  1597         -/*
  1598         -** Allowed values of UnpackedRecord.flags
  1599         -*/
  1600         -#define UNPACKED_INCRKEY       0x01  /* Make this key an epsilon larger */
  1601         -#define UNPACKED_PREFIX_MATCH  0x02  /* A prefix match is considered OK */
  1602   1597   
  1603   1598   /*
  1604   1599   ** Each SQL index is represented in memory by an
  1605   1600   ** instance of the following structure.
  1606   1601   **
  1607   1602   ** The columns of the table that are to be indexed are described
  1608   1603   ** by the aiColumn[] field of this structure.  For example, suppose

Changes to src/vdbe.c.

  3551   3551       assert( pOp->p4type==P4_INT32 );
  3552   3552       assert( nField>0 );
  3553   3553       r.pKeyInfo = pC->pKeyInfo;
  3554   3554       r.nField = (u16)nField;
  3555   3555   
  3556   3556       /* The next line of code computes as follows, only faster:
  3557   3557       **   if( oc==OP_SeekGT || oc==OP_SeekLE ){
  3558         -    **     r.flags = UNPACKED_INCRKEY;
         3558  +    **     r.default_rc = -1;
  3559   3559       **   }else{
  3560         -    **     r.flags = 0;
         3560  +    **     r.default_rc = +1;
  3561   3561       **   }
  3562   3562       */
  3563         -    r.flags = (u8)(UNPACKED_INCRKEY * (1 & (oc - OP_SeekLT)));
  3564         -    assert( oc!=OP_SeekGT || r.flags==UNPACKED_INCRKEY );
  3565         -    assert( oc!=OP_SeekLE || r.flags==UNPACKED_INCRKEY );
  3566         -    assert( oc!=OP_SeekGE || r.flags==0 );
  3567         -    assert( oc!=OP_SeekLT || r.flags==0 );
         3563  +    r.default_rc = ((1 & (oc - OP_SeekLT)) ? -1 : +1);
         3564  +    assert( oc!=OP_SeekGT || r.default_rc==-1 );
         3565  +    assert( oc!=OP_SeekLE || r.default_rc==-1 );
         3566  +    assert( oc!=OP_SeekGE || r.default_rc==+1 );
         3567  +    assert( oc!=OP_SeekLT || r.default_rc==+1 );
  3568   3568   
  3569   3569       r.aMem = &aMem[pOp->p3];
  3570   3570   #ifdef SQLITE_DEBUG
  3571   3571       { int i; for(i=0; i<r.nField; i++) assert( memIsValid(&r.aMem[i]) ); }
  3572   3572   #endif
  3573   3573       ExpandBlob(r.aMem);
  3574   3574       rc = sqlite3BtreeMovetoUnpacked(pC->pCursor, &r, 0, 0, &res);
................................................................................
  3718   3718       for(ii=0; ii<r.nField; ii++){
  3719   3719         assert( memIsValid(&r.aMem[ii]) );
  3720   3720         ExpandBlob(&r.aMem[ii]);
  3721   3721   #ifdef SQLITE_DEBUG
  3722   3722         if( ii ) REGISTER_TRACE(pOp->p3+ii, &r.aMem[ii]);
  3723   3723   #endif
  3724   3724       }
  3725         -    r.flags = UNPACKED_PREFIX_MATCH;
  3726   3725       pIdxKey = &r;
  3727   3726     }else{
  3728   3727       pIdxKey = sqlite3VdbeAllocUnpackedRecord(
  3729   3728           pC->pKeyInfo, aTempRec, sizeof(aTempRec), &pFree
  3730   3729       ); 
  3731   3730       if( pIdxKey==0 ) goto no_mem;
  3732   3731       assert( pIn3->flags & MEM_Blob );
  3733   3732       assert( (pIn3->flags & MEM_Zero)==0 );  /* zeroblobs already expanded */
  3734   3733       sqlite3VdbeRecordUnpack(pC->pKeyInfo, pIn3->n, pIn3->z, pIdxKey);
  3735         -    pIdxKey->flags |= UNPACKED_PREFIX_MATCH;
  3736   3734     }
         3735  +  pIdxKey->default_rc = 0;
  3737   3736     if( pOp->opcode==OP_NoConflict ){
  3738   3737       /* For the OP_NoConflict opcode, take the jump if any of the
  3739   3738       ** input fields are NULL, since any key with a NULL will not
  3740   3739       ** conflict */
  3741   3740       for(ii=0; ii<r.nField; ii++){
  3742   3741         if( r.aMem[ii].flags & MEM_Null ){
  3743   3742           pc = pOp->p2 - 1; VdbeBranchTaken(1,2);
................................................................................
  4618   4617     pC = p->apCsr[pOp->p1];
  4619   4618     assert( pC!=0 );
  4620   4619     pCrsr = pC->pCursor;
  4621   4620     assert( pCrsr!=0 );
  4622   4621     assert( pOp->p5==0 );
  4623   4622     r.pKeyInfo = pC->pKeyInfo;
  4624   4623     r.nField = (u16)pOp->p3;
  4625         -  r.flags = UNPACKED_PREFIX_MATCH;
         4624  +  r.default_rc = 0;
  4626   4625     r.aMem = &aMem[pOp->p2];
  4627   4626   #ifdef SQLITE_DEBUG
  4628   4627     { int i; for(i=0; i<r.nField; i++) assert( memIsValid(&r.aMem[i]) ); }
  4629   4628   #endif
  4630   4629     rc = sqlite3BtreeMovetoUnpacked(pCrsr, &r, 0, 0, &res);
  4631   4630     if( rc==SQLITE_OK && res==0 ){
  4632   4631       rc = sqlite3BtreeDelete(pCrsr);
................................................................................
  4732   4731     assert( pC->deferredMoveto==0 );
  4733   4732     assert( pOp->p5==0 || pOp->p5==1 );
  4734   4733     assert( pOp->p4type==P4_INT32 );
  4735   4734     r.pKeyInfo = pC->pKeyInfo;
  4736   4735     r.nField = (u16)pOp->p4.i;
  4737   4736     if( pOp->opcode<OP_IdxLT ){
  4738   4737       assert( pOp->opcode==OP_IdxLE || pOp->opcode==OP_IdxGT );
  4739         -    r.flags = UNPACKED_INCRKEY | UNPACKED_PREFIX_MATCH;
         4738  +    r.default_rc = -1;
  4740   4739     }else{
  4741   4740       assert( pOp->opcode==OP_IdxGE || pOp->opcode==OP_IdxLT );
  4742         -    r.flags = UNPACKED_PREFIX_MATCH;
         4741  +    r.default_rc = 0;
  4743   4742     }
  4744   4743     r.aMem = &aMem[pOp->p3];
  4745   4744   #ifdef SQLITE_DEBUG
  4746   4745     { int i; for(i=0; i<r.nField; i++) assert( memIsValid(&r.aMem[i]) ); }
  4747   4746   #endif
  4748   4747     res = 0;  /* Not needed.  Only used to silence a warning. */
  4749   4748     rc = sqlite3VdbeIdxKeyCompare(pC, &r, &res);

Changes to src/vdbe.h.

   209    209   #ifndef SQLITE_OMIT_TRACE
   210    210     char *sqlite3VdbeExpandSql(Vdbe*, const char*);
   211    211   #endif
   212    212   
   213    213   void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*);
   214    214   int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*);
   215    215   UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **);
          216  +
          217  +typedef int (*RecordCompare)(int,const void*, UnpackedRecord*);
          218  +RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *);
   216    219   
   217    220   #ifndef SQLITE_OMIT_TRIGGER
   218    221   void sqlite3VdbeLinkSubProgram(Vdbe *, SubProgram *);
   219    222   #endif
   220    223   
   221    224   /* Use SQLITE_ENABLE_COMMENTS to enable generation of extra comments on
   222    225   ** each VDBE opcode.

Changes to src/vdbeaux.c.

  3098   3098     const unsigned char *aKey = (const unsigned char *)pKey;
  3099   3099     int d; 
  3100   3100     u32 idx;                        /* Offset in aKey[] to read from */
  3101   3101     u16 u;                          /* Unsigned loop counter */
  3102   3102     u32 szHdr;
  3103   3103     Mem *pMem = p->aMem;
  3104   3104   
  3105         -  p->flags = 0;
         3105  +  p->default_rc = 0;
  3106   3106     assert( EIGHT_BYTE_ALIGNMENT(pMem) );
  3107   3107     idx = getVarint32(aKey, szHdr);
  3108   3108     d = szHdr;
  3109   3109     u = 0;
  3110   3110     while( idx<szHdr && u<p->nField && d<=nKey ){
  3111   3111       u32 serial_type;
  3112   3112   
................................................................................
  3118   3118       d += sqlite3VdbeSerialGet(&aKey[d], serial_type, pMem);
  3119   3119       pMem++;
  3120   3120       u++;
  3121   3121     }
  3122   3122     assert( u<=pKeyInfo->nField + 1 );
  3123   3123     p->nField = u;
  3124   3124   }
         3125  +
         3126  +static int vdbeRecordCompareString(
         3127  +  int nKey1, const void *pKey1, /* Left key */
         3128  +  UnpackedRecord *pPKey2        /* Right key */
         3129  +){
         3130  +  const u8 *aKey1 = (const u8*)pKey1;
         3131  +  int szHdr;
         3132  +  int serial_type;
         3133  +  int res;
         3134  +
         3135  +  szHdr = aKey1[0];
         3136  +  getVarint32(&aKey1[1], serial_type);
         3137  +
         3138  +  if( serial_type<12 ){
         3139  +    res = -1;      /* (pKey1/nKey1) is a number or a null */
         3140  +  }else if( !(serial_type & 0x01) ){ 
         3141  +    res = +1;      /* (pKey1/nKey1) is a blob */
         3142  +  }else{
         3143  +    int nCmp;
         3144  +    int nStr;
         3145  +    aKey1 = &aKey1[szHdr];
         3146  +
         3147  +    nStr = (serial_type-12) / 2;
         3148  +    if( (szHdr + nStr) > nKey1 ) return 0;    /* Corruption */
         3149  +    nCmp = MIN( pPKey2->aMem[0].n, nStr );
         3150  +    res = memcmp(aKey1, pPKey2->aMem[0].z, nCmp);
         3151  +
         3152  +    if( res==0 ){
         3153  +      res = nStr - pPKey2->aMem[0].n;
         3154  +      if( res==0 ) res = pPKey2->default_rc;
         3155  +    }
         3156  +  }
         3157  +
         3158  +  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
         3159  +       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
         3160  +       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
         3161  +  );
         3162  +  return res;
         3163  +}
         3164  +
         3165  +static int vdbeRecordCompareInt(
         3166  +  int nKey1, const void *pKey1, /* Left key */
         3167  +  UnpackedRecord *pPKey2        /* Right key */
         3168  +){
         3169  +  const u8 *aKey1 = (const u8*)pKey1;
         3170  +  int szHdr;
         3171  +  int serial_type;
         3172  +  int res;
         3173  +
         3174  +  szHdr = aKey1[0];
         3175  +  getVarint32(&aKey1[1], serial_type);
         3176  +
         3177  +  if( serial_type==0 ){
         3178  +    res = -1;      /* NULL values are smaller than integers */
         3179  +  }else if( serial_type>=12 ){
         3180  +    res = +1;      /* text/blob values are greater */
         3181  +  }else{
         3182  +    Mem mem;
         3183  +    sqlite3VdbeSerialGet(&aKey1[szHdr], serial_type, &mem);
         3184  +    if( mem.flags & MEM_Int ){
         3185  +      i64 v = pPKey2->aMem[0].u.i;
         3186  +      if( v>mem.u.i ){
         3187  +        res = -1;
         3188  +      }else if( v<mem.u.i ){
         3189  +        res = +1;
         3190  +      }else{
         3191  +        res = pPKey2->default_rc;
         3192  +      }
         3193  +    }else{
         3194  +      double v = (double)pPKey2->aMem[0].u.i;
         3195  +      if( v>mem.r ){
         3196  +        res = -1;
         3197  +      }else if( v<mem.r ){
         3198  +        res = +1;
         3199  +      }else{
         3200  +        res = pPKey2->default_rc;
         3201  +      }
         3202  +    }
         3203  +  }
         3204  +
         3205  +  assert( (res==0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)==0)
         3206  +       || (res<0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)<0)
         3207  +       || (res>0 && sqlite3VdbeRecordCompare(nKey1, pKey1, pPKey2)>0)
         3208  +  );
         3209  +  return res;
         3210  +}
         3211  +
         3212  +static int vdbeCompareMemString(
         3213  +  const Mem *pMem1, 
         3214  +  const Mem *pMem2, 
         3215  +  const CollSeq *pColl
         3216  +){
         3217  +  if( pMem1->enc==pColl->enc ){
         3218  +    /* The strings are already in the correct encoding.  Call the
         3219  +     ** comparison function directly */
         3220  +    return pColl->xCmp(pColl->pUser,pMem1->n,pMem1->z,pMem2->n,pMem2->z);
         3221  +  }else{
         3222  +    int rc;
         3223  +    const void *v1, *v2;
         3224  +    int n1, n2;
         3225  +    Mem c1;
         3226  +    Mem c2;
         3227  +    memset(&c1, 0, sizeof(c1));
         3228  +    memset(&c2, 0, sizeof(c2));
         3229  +    sqlite3VdbeMemShallowCopy(&c1, pMem1, MEM_Ephem);
         3230  +    sqlite3VdbeMemShallowCopy(&c2, pMem2, MEM_Ephem);
         3231  +    v1 = sqlite3ValueText((sqlite3_value*)&c1, pColl->enc);
         3232  +    n1 = v1==0 ? 0 : c1.n;
         3233  +    v2 = sqlite3ValueText((sqlite3_value*)&c2, pColl->enc);
         3234  +    n2 = v2==0 ? 0 : c2.n;
         3235  +    rc = pColl->xCmp(pColl->pUser, n1, v1, n2, v2);
         3236  +    sqlite3VdbeMemRelease(&c1);
         3237  +    sqlite3VdbeMemRelease(&c2);
         3238  +    return rc;
         3239  +  }
         3240  +}
         3241  +
         3242  +/*
         3243  +** Compare the values contained by the two memory cells, returning
         3244  +** negative, zero or positive if pMem1 is less than, equal to, or greater
         3245  +** than pMem2. Sorting order is NULL's first, followed by numbers (integers
         3246  +** and reals) sorted numerically, followed by text ordered by the collating
         3247  +** sequence pColl and finally blob's ordered by memcmp().
         3248  +**
         3249  +** Two NULL values are considered equal by this function.
         3250  +*/
         3251  +int sqlite3MemCompare(const Mem *pMem1, const Mem *pMem2, const CollSeq *pColl){
         3252  +  int rc;
         3253  +  int f1, f2;
         3254  +  int combined_flags;
         3255  +
         3256  +  f1 = pMem1->flags;
         3257  +  f2 = pMem2->flags;
         3258  +  combined_flags = f1|f2;
         3259  +  assert( (combined_flags & MEM_RowSet)==0 );
         3260  + 
         3261  +  /* If one value is NULL, it is less than the other. If both values
         3262  +  ** are NULL, return 0.
         3263  +  */
         3264  +  if( combined_flags&MEM_Null ){
         3265  +    return (f2&MEM_Null) - (f1&MEM_Null);
         3266  +  }
         3267  +
         3268  +  /* If one value is a number and the other is not, the number is less.
         3269  +  ** If both are numbers, compare as reals if one is a real, or as integers
         3270  +  ** if both values are integers.
         3271  +  */
         3272  +  if( combined_flags&(MEM_Int|MEM_Real) ){
         3273  +    double r1, r2;
         3274  +    if( (f1 & f2 & MEM_Int)!=0 ){
         3275  +      if( pMem1->u.i < pMem2->u.i ) return -1;
         3276  +      if( pMem1->u.i > pMem2->u.i ) return 1;
         3277  +      return 0;
         3278  +    }
         3279  +    if( (f1&MEM_Real)!=0 ){
         3280  +      r1 = pMem1->r;
         3281  +    }else if( (f1&MEM_Int)!=0 ){
         3282  +      r1 = (double)pMem1->u.i;
         3283  +    }else{
         3284  +      return 1;
         3285  +    }
         3286  +    if( (f2&MEM_Real)!=0 ){
         3287  +      r2 = pMem2->r;
         3288  +    }else if( (f2&MEM_Int)!=0 ){
         3289  +      r2 = (double)pMem2->u.i;
         3290  +    }else{
         3291  +      return -1;
         3292  +    }
         3293  +    if( r1<r2 ) return -1;
         3294  +    if( r1>r2 ) return 1;
         3295  +    return 0;
         3296  +  }
         3297  +
         3298  +  /* If one value is a string and the other is a blob, the string is less.
         3299  +  ** If both are strings, compare using the collating functions.
         3300  +  */
         3301  +  if( combined_flags&MEM_Str ){
         3302  +    if( (f1 & MEM_Str)==0 ){
         3303  +      return 1;
         3304  +    }
         3305  +    if( (f2 & MEM_Str)==0 ){
         3306  +      return -1;
         3307  +    }
         3308  +
         3309  +    assert( pMem1->enc==pMem2->enc );
         3310  +    assert( pMem1->enc==SQLITE_UTF8 || 
         3311  +            pMem1->enc==SQLITE_UTF16LE || pMem1->enc==SQLITE_UTF16BE );
         3312  +
         3313  +    /* The collation sequence must be defined at this point, even if
         3314  +    ** the user deletes the collation sequence after the vdbe program is
         3315  +    ** compiled (this was not always the case).
         3316  +    */
         3317  +    assert( !pColl || pColl->xCmp );
         3318  +
         3319  +    if( pColl ){
         3320  +      return vdbeCompareMemString(pMem1, pMem2, pColl);
         3321  +    }
         3322  +    /* If a NULL pointer was passed as the collate function, fall through
         3323  +    ** to the blob case and use memcmp().  */
         3324  +  }
         3325  + 
         3326  +  /* Both values must be blobs.  Compare using memcmp().  */
         3327  +  rc = memcmp(pMem1->z, pMem2->z, (pMem1->n>pMem2->n)?pMem2->n:pMem1->n);
         3328  +  if( rc==0 ){
         3329  +    rc = pMem1->n - pMem2->n;
         3330  +  }
         3331  +  return rc;
         3332  +}
         3333  +
  3125   3334   
  3126   3335   /*
  3127   3336   ** This function compares the two table rows or index records
  3128   3337   ** specified by {nKey1, pKey1} and pPKey2.  It returns a negative, zero
  3129   3338   ** or positive integer if key1 is less than, equal to or 
  3130   3339   ** greater than key2.  The {nKey1, pKey1} key must be a blob
  3131   3340   ** created by th OP_MakeRecord opcode of the VDBE.  The pPKey2
................................................................................
  3136   3345   ** The key with fewer fields is usually compares less than the 
  3137   3346   ** longer key.  However if the UNPACKED_INCRKEY flags in pPKey2 is set
  3138   3347   ** and the common prefixes are equal, then key1 is less than key2.
  3139   3348   ** Or if the UNPACKED_MATCH_PREFIX flag is set and the prefixes are
  3140   3349   ** equal, then the keys are considered to be equal and
  3141   3350   ** the parts beyond the common prefix are ignored.
  3142   3351   */
  3143         -int sqlite3VdbeRecordCompare(
         3352  +static int vdbeRecordComparePrev(
  3144   3353     int nKey1, const void *pKey1, /* Left key */
  3145   3354     UnpackedRecord *pPKey2        /* Right key */
  3146   3355   ){
  3147   3356     u32 d1;            /* Offset into aKey[] of next data element */
  3148   3357     u32 idx1;          /* Offset into aKey[] of next header element */
  3149   3358     u32 szHdr1;        /* Number of bytes in header */
  3150   3359     int i = 0;
................................................................................
  3212   3421     /* No memory allocation is ever used on mem1.  Prove this using
  3213   3422     ** the following assert().  If the assert() fails, it indicates a
  3214   3423     ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
  3215   3424     */
  3216   3425     assert( mem1.zMalloc==0 );
  3217   3426   
  3218   3427     /* rc==0 here means that one of the keys ran out of fields and
  3219         -  ** all the fields up to that point were equal. If the UNPACKED_INCRKEY
  3220         -  ** flag is set, then break the tie by treating key2 as larger.
  3221         -  ** If the UPACKED_PREFIX_MATCH flag is set, then keys with common prefixes
  3222         -  ** are considered to be equal.  Otherwise, the longer key is the 
  3223         -  ** larger.  As it happens, the pPKey2 will always be the longer
  3224         -  ** if there is a difference.
  3225         -  */
  3226         -  assert( rc==0 );
  3227         -  if( pPKey2->flags & UNPACKED_INCRKEY ){
         3428  +  ** all the fields up to that point were equal. Return the the default_rc
         3429  +  ** value.  */
         3430  +  return pPKey2->default_rc;
         3431  +}
         3432  + 
         3433  +
         3434  +int sqlite3VdbeRecordCompare(
         3435  +  int nKey1, const void *pKey1, /* Left key */
         3436  +  UnpackedRecord *pPKey2        /* Right key */
         3437  +){
         3438  +  u32 d1;            /* Offset into aKey[] of next data element */
         3439  +  u32 idx1;          /* Offset into aKey[] of next header element */
         3440  +  u32 szHdr1;        /* Number of bytes in header */
         3441  +  int i = 0;
         3442  +  int rc = 0;
         3443  +  KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
         3444  +  const unsigned char *aKey1 = (const unsigned char *)pKey1;
         3445  +  Mem mem1;
         3446  +
         3447  +#ifdef SQLITE_DEBUG
         3448  +  int expected = vdbeRecordComparePrev(nKey1, pKey1, pPKey2);
         3449  +  static int nCall = 0;
         3450  +  nCall++;
         3451  +#endif
         3452  +
         3453  +  VVA_ONLY( mem1.zMalloc = 0; ) /* Only needed by assert() statements */
         3454  +  
         3455  +  idx1 = getVarint32(aKey1, szHdr1);
         3456  +  d1 = szHdr1;
         3457  +  assert( pPKey2->pKeyInfo->nField+pPKey2->pKeyInfo->nXField>=pPKey2->nField 
         3458  +       || CORRUPT_DB );
         3459  +  assert( pPKey2->pKeyInfo->aSortOrder!=0 );
         3460  +  assert( pPKey2->pKeyInfo->nField>0 );
         3461  +  assert( idx1<=szHdr1 || CORRUPT_DB );
         3462  +  do{
         3463  +    Mem *pRhs = &pPKey2->aMem[i];
         3464  +    u32 serial_type;
         3465  +
         3466  +    /* RHS is an integer */
         3467  +    if( pRhs->flags & MEM_Int ){
         3468  +      serial_type = aKey1[idx1];
         3469  +      if( serial_type>=12 ){
         3470  +        rc = +1;
         3471  +      }else if( serial_type==0 ){
         3472  +        rc = -1;
         3473  +      }else{
         3474  +        sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1);
         3475  +        if( serial_type==7 ){
         3476  +          double rhs = (double)pRhs->u.i;
         3477  +          if( mem1.r<rhs ){
         3478  +            rc = -1;
         3479  +          }else if( mem1.r>rhs ){
         3480  +            rc = +1;
         3481  +          }
         3482  +        }else{
         3483  +          i64 rhs = pRhs->u.i;
         3484  +          if( mem1.u.i<rhs ){
         3485  +            rc = -1;
         3486  +          }else if( mem1.u.i>rhs ){
         3487  +            rc = +1;
         3488  +          }
         3489  +        }
         3490  +      }
         3491  +    }
         3492  +
         3493  +    /* RHS is real */
         3494  +    else if( pRhs->flags & MEM_Real ){
         3495  +      serial_type = aKey1[idx1];
         3496  +      if( serial_type>=12 ){
         3497  +        rc = +1;
         3498  +      }else if( serial_type==0 ){
         3499  +        rc = -1;
         3500  +      }else{
         3501  +        double rhs = pRhs->r;
         3502  +        double lhs;
         3503  +        sqlite3VdbeSerialGet(&aKey1[d1], serial_type, &mem1);
         3504  +        if( serial_type==7 ){
         3505  +          lhs = mem1.r;
         3506  +        }else{
         3507  +          lhs = mem1.u.i;
         3508  +        }
         3509  +        if( lhs<rhs ){
         3510  +          rc = -1;
         3511  +        }else if( lhs>rhs ){
         3512  +          rc = +1;
         3513  +        }
         3514  +      }
         3515  +    }
         3516  +
         3517  +    /* RHS is a string */
         3518  +    else if( pRhs->flags & MEM_Str ){
         3519  +      getVarint32(&aKey1[idx1], serial_type);
         3520  +      if( serial_type<12 ){
         3521  +        rc = -1;
         3522  +      }else if( !(serial_type & 0x01) ){
         3523  +        rc = +1;
         3524  +      }else{
         3525  +        mem1.n = (serial_type - 12) / 2;
         3526  +        if( (d1+mem1.n) > nKey1 ){
         3527  +          rc = 1;                /* Corruption */
         3528  +        }else if( pKeyInfo->aColl[i] ){
         3529  +          mem1.enc = pKeyInfo->enc;
         3530  +          mem1.db = pKeyInfo->db;
         3531  +          mem1.flags = MEM_Str;
         3532  +          mem1.z = &aKey1[d1];
         3533  +          rc = vdbeCompareMemString(&mem1, pRhs, pKeyInfo->aColl[i]);
         3534  +        }else{
         3535  +          int nCmp = MIN(mem1.n, pRhs->n);
         3536  +          rc = memcmp(&aKey1[d1], pRhs->z, nCmp);
         3537  +          if( rc==0 ) rc = mem1.n - pRhs->n; 
         3538  +        }
         3539  +      }
         3540  +    }
         3541  +
         3542  +    /* RHS is a blob */
         3543  +    else if( pRhs->flags & MEM_Blob ){
         3544  +      getVarint32(&aKey1[idx1], serial_type);
         3545  +      if( serial_type<12 || (serial_type & 0x01) ){
  3228   3546       rc = -1;
  3229         -  }else if( pPKey2->flags & UNPACKED_PREFIX_MATCH ){
  3230         -    /* Leave rc==0 */
  3231         -  }else if( idx1<szHdr1 ){
  3232         -    rc = 1;
         3547  +      }else{
         3548  +        int nStr = (serial_type - 12) / 2;
         3549  +        if( (d1+nStr) > nKey1 ){
         3550  +          rc = 1;                /* Corruption */
         3551  +        }else{
         3552  +          int nCmp = MIN(nStr, pRhs->n);
         3553  +          rc = memcmp(&aKey1[d1], pRhs->z, nCmp);
         3554  +          if( rc==0 ) rc = nStr - pRhs->n;
  3233   3555     }
         3556  +      }
         3557  +    }
         3558  +
         3559  +    /* RHS is null */
         3560  +    else{
         3561  +      serial_type = aKey1[idx1];
         3562  +      rc = (serial_type!=0);
         3563  +    }
         3564  +
         3565  +    if( rc!=0 ){
         3566  +      assert( mem1.zMalloc==0 );  /* See comment below */
         3567  +      if( pKeyInfo->aSortOrder[i] ){
         3568  +        rc = -rc;
         3569  +#if 0
         3570  +        assert( (rc>0 && (rc^(int)0x80000000)<0) 
         3571  +             || (rc<0 && (rc^(int)0x80000000)>0) );
         3572  +        assert( sizeof(int)==4 );
         3573  +        rc ^= (int)0x80000000;    /* similar in spirit to: "rc = -rc;" */
         3574  +        assert( rc!=0 );
         3575  +#endif
         3576  +      }
         3577  +      assert( (rc<0 && expected<0) || (rc>0 && expected>0) || CORRUPT_DB );
  3234   3578     return rc;
  3235   3579   }
         3580  +
         3581  +    i++;
         3582  +    d1 += sqlite3VdbeSerialTypeLen(serial_type);
         3583  +    idx1 += sqlite3VarintLen(serial_type);
         3584  +  }while( idx1<szHdr1 && i<pPKey2->nField && d1<=nKey1 );
         3585  +
         3586  +  /* No memory allocation is ever used on mem1.  Prove this using
         3587  +  ** the following assert().  If the assert() fails, it indicates a
         3588  +  ** memory leak and a need to call sqlite3VdbeMemRelease(&mem1).
         3589  +  */
         3590  +  assert( mem1.zMalloc==0 );
         3591  +
         3592  +  /* rc==0 here means that one or both of the keys ran out of fields and
         3593  +  ** all the fields up to that point were equal. Return the the default_rc
         3594  +  ** value.  */
         3595  +  assert( pPKey2->default_rc==expected );
         3596  +  return pPKey2->default_rc;
         3597  +}
         3598  +
         3599  +RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *p){
         3600  +  if( p->nField==1 && p->pKeyInfo->aSortOrder[0]==0 ){
         3601  +    int flags = p->aMem[0].flags;
         3602  +    if( (flags & MEM_Int) ){
         3603  +      return vdbeRecordCompareInt;
         3604  +    }else if( (p->aMem[0].flags&(MEM_Int|MEM_Real|MEM_Null|MEM_Blob))==0 
         3605  +            && p->pKeyInfo->aColl[0]==0 
         3606  +    ){
         3607  +      return vdbeRecordCompareString;
         3608  +    }
         3609  +  }
         3610  +  return sqlite3VdbeRecordCompare;
         3611  +}
  3236   3612    
  3237   3613   
  3238   3614   /*
  3239   3615   ** pCur points at an index entry created using the OP_MakeRecord opcode.
  3240   3616   ** Read the rowid (the last field in the record) and store it in *rowid.
  3241   3617   ** Return SQLITE_OK if everything works, or an error code otherwise.
  3242   3618   **
................................................................................
  3343   3719       return SQLITE_CORRUPT_BKPT;
  3344   3720     }
  3345   3721     memset(&m, 0, sizeof(m));
  3346   3722     rc = sqlite3VdbeMemFromBtree(pC->pCursor, 0, (u32)nCellKey, 1, &m);
  3347   3723     if( rc ){
  3348   3724       return rc;
  3349   3725     }
  3350         -  assert( pUnpacked->flags & UNPACKED_PREFIX_MATCH );
  3351   3726     *res = sqlite3VdbeRecordCompare(m.n, m.z, pUnpacked);
  3352   3727     sqlite3VdbeMemRelease(&m);
  3353   3728     return SQLITE_OK;
  3354   3729   }
  3355   3730   
  3356   3731   /*
  3357   3732   ** This routine sets the value to be returned by subsequent calls to

Changes to src/vdbemem.c.

   750    750     if( nByte>iLimit ){
   751    751       return SQLITE_TOOBIG;
   752    752     }
   753    753   
   754    754     return SQLITE_OK;
   755    755   }
   756    756   
   757         -/*
   758         -** Compare the values contained by the two memory cells, returning
   759         -** negative, zero or positive if pMem1 is less than, equal to, or greater
   760         -** than pMem2. Sorting order is NULL's first, followed by numbers (integers
   761         -** and reals) sorted numerically, followed by text ordered by the collating
   762         -** sequence pColl and finally blob's ordered by memcmp().
   763         -**
   764         -** Two NULL values are considered equal by this function.
   765         -*/
   766         -int sqlite3MemCompare(const Mem *pMem1, const Mem *pMem2, const CollSeq *pColl){
   767         -  int rc;
   768         -  int f1, f2;
   769         -  int combined_flags;
   770         -
   771         -  f1 = pMem1->flags;
   772         -  f2 = pMem2->flags;
   773         -  combined_flags = f1|f2;
   774         -  assert( (combined_flags & MEM_RowSet)==0 );
   775         - 
   776         -  /* If one value is NULL, it is less than the other. If both values
   777         -  ** are NULL, return 0.
   778         -  */
   779         -  if( combined_flags&MEM_Null ){
   780         -    return (f2&MEM_Null) - (f1&MEM_Null);
   781         -  }
   782         -
   783         -  /* If one value is a number and the other is not, the number is less.
   784         -  ** If both are numbers, compare as reals if one is a real, or as integers
   785         -  ** if both values are integers.
   786         -  */
   787         -  if( combined_flags&(MEM_Int|MEM_Real) ){
   788         -    double r1, r2;
   789         -    if( (f1 & f2 & MEM_Int)!=0 ){
   790         -      if( pMem1->u.i < pMem2->u.i ) return -1;
   791         -      if( pMem1->u.i > pMem2->u.i ) return 1;
   792         -      return 0;
   793         -    }
   794         -    if( (f1&MEM_Real)!=0 ){
   795         -      r1 = pMem1->r;
   796         -    }else if( (f1&MEM_Int)!=0 ){
   797         -      r1 = (double)pMem1->u.i;
   798         -    }else{
   799         -      return 1;
   800         -    }
   801         -    if( (f2&MEM_Real)!=0 ){
   802         -      r2 = pMem2->r;
   803         -    }else if( (f2&MEM_Int)!=0 ){
   804         -      r2 = (double)pMem2->u.i;
   805         -    }else{
   806         -      return -1;
   807         -    }
   808         -    if( r1<r2 ) return -1;
   809         -    if( r1>r2 ) return 1;
   810         -    return 0;
   811         -  }
   812         -
   813         -  /* If one value is a string and the other is a blob, the string is less.
   814         -  ** If both are strings, compare using the collating functions.
   815         -  */
   816         -  if( combined_flags&MEM_Str ){
   817         -    if( (f1 & MEM_Str)==0 ){
   818         -      return 1;
   819         -    }
   820         -    if( (f2 & MEM_Str)==0 ){
   821         -      return -1;
   822         -    }
   823         -
   824         -    assert( pMem1->enc==pMem2->enc );
   825         -    assert( pMem1->enc==SQLITE_UTF8 || 
   826         -            pMem1->enc==SQLITE_UTF16LE || pMem1->enc==SQLITE_UTF16BE );
   827         -
   828         -    /* The collation sequence must be defined at this point, even if
   829         -    ** the user deletes the collation sequence after the vdbe program is
   830         -    ** compiled (this was not always the case).
   831         -    */
   832         -    assert( !pColl || pColl->xCmp );
   833         -
   834         -    if( pColl ){
   835         -      if( pMem1->enc==pColl->enc ){
   836         -        /* The strings are already in the correct encoding.  Call the
   837         -        ** comparison function directly */
   838         -        return pColl->xCmp(pColl->pUser,pMem1->n,pMem1->z,pMem2->n,pMem2->z);
   839         -      }else{
   840         -        const void *v1, *v2;
   841         -        int n1, n2;
   842         -        Mem c1;
   843         -        Mem c2;
   844         -        memset(&c1, 0, sizeof(c1));
   845         -        memset(&c2, 0, sizeof(c2));
   846         -        sqlite3VdbeMemShallowCopy(&c1, pMem1, MEM_Ephem);
   847         -        sqlite3VdbeMemShallowCopy(&c2, pMem2, MEM_Ephem);
   848         -        v1 = sqlite3ValueText((sqlite3_value*)&c1, pColl->enc);
   849         -        n1 = v1==0 ? 0 : c1.n;
   850         -        v2 = sqlite3ValueText((sqlite3_value*)&c2, pColl->enc);
   851         -        n2 = v2==0 ? 0 : c2.n;
   852         -        rc = pColl->xCmp(pColl->pUser, n1, v1, n2, v2);
   853         -        sqlite3VdbeMemRelease(&c1);
   854         -        sqlite3VdbeMemRelease(&c2);
   855         -        return rc;
   856         -      }
   857         -    }
   858         -    /* If a NULL pointer was passed as the collate function, fall through
   859         -    ** to the blob case and use memcmp().  */
   860         -  }
   861         - 
   862         -  /* Both values must be blobs.  Compare using memcmp().  */
   863         -  rc = memcmp(pMem1->z, pMem2->z, (pMem1->n>pMem2->n)?pMem2->n:pMem1->n);
   864         -  if( rc==0 ){
   865         -    rc = pMem1->n - pMem2->n;
   866         -  }
   867         -  return rc;
   868         -}
   869         -
   870    757   /*
   871    758   ** Move data out of a btree key or data field and into a Mem structure.
   872    759   ** The data or key is taken from the entry that pCur is currently pointing
   873    760   ** to.  offset and amt determine what portion of the data or key to retrieve.
   874    761   ** key is true to get the key or false to get data.  The result is written
   875    762   ** into the pMem element.
   876    763   **

Changes to src/vdbesort.c.

   405    405       assert( r2->nField>0 );
   406    406       for(i=0; i<r2->nField; i++){
   407    407         if( r2->aMem[i].flags & MEM_Null ){
   408    408           *pRes = -1;
   409    409           return;
   410    410         }
   411    411       }
   412         -    r2->flags |= UNPACKED_PREFIX_MATCH;
          412  +    assert( r2->default_rc==0 );
   413    413     }
   414    414   
   415    415     *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
   416    416   }
   417    417   
   418    418   /*
   419    419   ** This function is called to compare two iterator keys when merging 

Changes to test/pragma.test.

  1571   1571   do_test pragma-20.8 {
  1572   1572     catchsql {PRAGMA data_store_directory}
  1573   1573   } {0 {}}
  1574   1574   
  1575   1575   forcedelete data_dir
  1576   1576   } ;# endif windows
  1577   1577   
         1578  +database_may_be_corrupt
         1579  +
  1578   1580   do_test 21.1 {
  1579   1581     # Create a corrupt database in testerr.db. And a non-corrupt at test.db.
  1580   1582     #
  1581   1583     db close
  1582   1584     forcedelete test.db
  1583   1585     sqlite3 db test.db
  1584   1586     execsql { 
................................................................................
  1676   1678       CREATE TABLE t2(x, y INTEGER REFERENCES t1);
  1677   1679     }
  1678   1680     db2 eval {
  1679   1681       PRAGMA foreign_key_list(t2);
  1680   1682     }
  1681   1683   } {0 0 t1 y {} {NO ACTION} {NO ACTION} NONE}
  1682   1684   
         1685  +database_never_corrupt
  1683   1686   finish_test