SQLite

Check-in [41d03d883c]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add the MemPage.xParseCell method and provide various implementations (variations on the former btreeParseCellPtr()) depending on the page type.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 41d03d883c4f7ca279eb9dd679f3ab81c8d957d9
User & Date: drh 2015-06-19 17:19:34.228
Context
2015-06-19
20:31
Performance improvements in btreeParseCell() by inlining the varint decoder. (check-in: 172a864d14 user: drh tags: trunk)
18:24
Performance improvements in btreeParseCell() by inlining the varint decoder. (check-in: faab0ed928 user: drh tags: btree-opt)
17:19
Add the MemPage.xParseCell method and provide various implementations (variations on the former btreeParseCellPtr()) depending on the page type. (check-in: 41d03d883c user: drh tags: trunk)
15:07
Make cellSizePtr() a method on the MemPage object, with alternative implementations depending on the page type. This results is a small performance improvement and size reduction. (check-in: 02f7e9d7d7 user: drh tags: trunk)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/btree.c.
976
977
978
979
980
981
982









































983

984

985



986
987
















988
989
990
991
992
993
994
995
996
997


998
999
1000
1001

1002


1003



1004
1005
1006
1007
1008
1009
1010















1011

1012

1013



1014


1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063



1064
1065
1066
1067
1068
1069
1070
1071
1072

1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
      iCell--;
    }
  }
  return findCell(pPage, iCell);
}

/*









































** Parse a cell content block and fill in the CellInfo structure.  There

** are two versions of this function.  btreeParseCell() takes a 

** cell index as the second argument and btreeParseCellPtr() 



** takes a pointer to the body of the cell as its second argument.
*/
















static void btreeParseCellPtr(
  MemPage *pPage,         /* Page containing the cell */
  u8 *pCell,              /* Pointer to the cell text. */
  CellInfo *pInfo         /* Fill in this structure */
){
  u8 *pIter;              /* For scanning through pCell */
  u32 nPayload;           /* Number of bytes of cell payload */

  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  assert( pPage->leaf==0 || pPage->leaf==1 );


  if( pPage->intKeyLeaf ){
    assert( pPage->childPtrSize==0 );
    pIter = pCell + getVarint32(pCell, nPayload);
    pIter += getVarint(pIter, (u64*)&pInfo->nKey);

  }else if( pPage->noPayload ){


    assert( pPage->childPtrSize==4 );



    pInfo->nSize = 4 + getVarint(&pCell[4], (u64*)&pInfo->nKey);
    pInfo->nPayload = 0;
    pInfo->nLocal = 0;
    pInfo->iOverflow = 0;
    pInfo->pPayload = 0;
    return;
  }else{















    pIter = pCell + pPage->childPtrSize;

    pIter += getVarint32(pIter, nPayload);

    pInfo->nKey = nPayload;



  }


  pInfo->nPayload = nPayload;
  pInfo->pPayload = pIter;
  testcase( nPayload==pPage->maxLocal );
  testcase( nPayload==pPage->maxLocal+1 );
  if( nPayload<=pPage->maxLocal ){
    /* This is the (easy) common case where the entire payload fits
    ** on the local page.  No overflow is required.
    */
    pInfo->nSize = nPayload + (u16)(pIter - pCell);
    if( pInfo->nSize<4 ) pInfo->nSize = 4;
    pInfo->nLocal = (u16)nPayload;
    pInfo->iOverflow = 0;
  }else{
    /* If the payload will not fit completely on the local page, we have
    ** to decide how much to store locally and how much to spill onto
    ** overflow pages.  The strategy is to minimize the amount of unused
    ** space on overflow pages while keeping the amount of local storage
    ** in between minLocal and maxLocal.
    **
    ** Warning:  changing the way overflow payload is distributed in any
    ** way will result in an incompatible file format.
    */
    int minLocal;  /* Minimum amount of payload held locally */
    int maxLocal;  /* Maximum amount of payload held locally */
    int surplus;   /* Overflow payload available for local storage */

    minLocal = pPage->minLocal;
    maxLocal = pPage->maxLocal;
    surplus = minLocal + (nPayload - minLocal)%(pPage->pBt->usableSize - 4);
    testcase( surplus==maxLocal );
    testcase( surplus==maxLocal+1 );
    if( surplus <= maxLocal ){
      pInfo->nLocal = (u16)surplus;
    }else{
      pInfo->nLocal = (u16)minLocal;
    }
    pInfo->iOverflow = (u16)(&pInfo->pPayload[pInfo->nLocal] - pCell);
    pInfo->nSize = pInfo->iOverflow + 4;
  }
}
static void btreeParseCell(
  MemPage *pPage,         /* Page containing the cell */
  int iCell,              /* The cell index.  First cell is 0 */
  CellInfo *pInfo         /* Fill in this structure */
){
  btreeParseCellPtr(pPage, findCell(pPage, iCell), pInfo);
}

/*



** Compute the total number of bytes that a Cell needs in the cell
** data area of the btree-page.  The return number includes the cell
** data header and the local payload, but not any overflow page or
** the space used by the cell pointer.
**
** The first implementation, cellSizePtr(), handles pages that contain
** payload, which is to say all index pages and left table pages.  The
** second cellSizePtrNoPayload() implemention is a high-speed version
** for pages that contain no payload - internal table pages.

*/
static u16 cellSizePtr(MemPage *pPage, u8 *pCell){
  u8 *pIter = pCell + pPage->childPtrSize; /* For looping over bytes of pCell */
  u8 *pEnd;                                /* End mark for a varint */
  u32 nSize;                               /* Size value to return */

#ifdef SQLITE_DEBUG
  /* The value returned by this function should always be the same as
  ** the (CellInfo.nSize) value found by doing a full parse of the
  ** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
  ** this function verifies that this invariant is not violated. */
  CellInfo debuginfo;
  btreeParseCellPtr(pPage, pCell, &debuginfo);
#endif

  assert( pPage->noPayload==0 );
  nSize = *pIter;
  if( nSize>=0x80 ){
    pEnd = &pIter[9];
    nSize &= 0x7f;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
|
>
|
>
>
>
|

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










>
>
|
|
|
|
>
|
>
>
|
>
>
>
|
|
|

<
<

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
|
>
|
>
>
>

>
>













<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<







|



>
>
>





<
<
|
<
>












|







976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077


1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117












1118












1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137


1138

1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
      iCell--;
    }
  }
  return findCell(pPage, iCell);
}

/*
** This is common tail processing for btreeParseCellPtr() and
** btreeParseCellPtrIndex() for the case when the cell does not fit entirely
** on a single B-tree page.  Make necessary adjustments to the CellInfo
** structure.
*/
static SQLITE_NOINLINE void btreeParseCellAdjustSizeForOverflow(
  MemPage *pPage,         /* Page containing the cell */
  u8 *pCell,              /* Pointer to the cell text. */
  CellInfo *pInfo         /* Fill in this structure */
){
  /* If the payload will not fit completely on the local page, we have
  ** to decide how much to store locally and how much to spill onto
  ** overflow pages.  The strategy is to minimize the amount of unused
  ** space on overflow pages while keeping the amount of local storage
  ** in between minLocal and maxLocal.
  **
  ** Warning:  changing the way overflow payload is distributed in any
  ** way will result in an incompatible file format.
  */
  int minLocal;  /* Minimum amount of payload held locally */
  int maxLocal;  /* Maximum amount of payload held locally */
  int surplus;   /* Overflow payload available for local storage */

  minLocal = pPage->minLocal;
  maxLocal = pPage->maxLocal;
  surplus = minLocal + (pInfo->nPayload - minLocal)%(pPage->pBt->usableSize-4);
  testcase( surplus==maxLocal );
  testcase( surplus==maxLocal+1 );
  if( surplus <= maxLocal ){
    pInfo->nLocal = (u16)surplus;
  }else{
    pInfo->nLocal = (u16)minLocal;
  }
  pInfo->iOverflow = (u16)(&pInfo->pPayload[pInfo->nLocal] - pCell);
  pInfo->nSize = pInfo->iOverflow + 4;
}

/*
** The following routines are implementations of the MemPage.xParseCell()
** method.
**
** Parse a cell content block and fill in the CellInfo structure.
**
** btreeParseCellPtr()        =>   table btree leaf nodes
** btreeParseCellNoPayload()  =>   table btree internal nodes
** btreeParseCellPtrIndex()   =>   index btree nodes
**
** There is also a wrapper function btreeParseCell() that works for
** all MemPage types and that references the cell by index rather than
** by pointer.
*/
static void btreeParseCellPtrNoPayload(
  MemPage *pPage,         /* Page containing the cell */
  u8 *pCell,              /* Pointer to the cell text. */
  CellInfo *pInfo         /* Fill in this structure */
){
  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  assert( pPage->leaf==0 );
  assert( pPage->noPayload );
  assert( pPage->childPtrSize==4 );
  pInfo->nSize = 4 + getVarint(&pCell[4], (u64*)&pInfo->nKey);
  pInfo->nPayload = 0;
  pInfo->nLocal = 0;
  pInfo->iOverflow = 0;
  pInfo->pPayload = 0;
  return;
}
static void btreeParseCellPtr(
  MemPage *pPage,         /* Page containing the cell */
  u8 *pCell,              /* Pointer to the cell text. */
  CellInfo *pInfo         /* Fill in this structure */
){
  u8 *pIter;              /* For scanning through pCell */
  u32 nPayload;           /* Number of bytes of cell payload */

  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  assert( pPage->leaf==0 || pPage->leaf==1 );
  assert( pPage->intKeyLeaf || pPage->noPayload );
  assert( pPage->noPayload==0 );
  assert( pPage->intKeyLeaf );
  assert( pPage->childPtrSize==0 );
  pIter = pCell + getVarint32(pCell, nPayload);
  pIter += getVarint(pIter, (u64*)&pInfo->nKey);
  pInfo->nPayload = nPayload;
  pInfo->pPayload = pIter;
  testcase( nPayload==pPage->maxLocal );
  testcase( nPayload==pPage->maxLocal+1 );
  if( nPayload<=pPage->maxLocal ){
    /* This is the (easy) common case where the entire payload fits
    ** on the local page.  No overflow is required.
    */
    pInfo->nSize = nPayload + (u16)(pIter - pCell);
    if( pInfo->nSize<4 ) pInfo->nSize = 4;
    pInfo->nLocal = (u16)nPayload;
    pInfo->iOverflow = 0;


  }else{
    btreeParseCellAdjustSizeForOverflow(pPage, pCell, pInfo);
  }
}
static void btreeParseCellPtrIndex(
  MemPage *pPage,         /* Page containing the cell */
  u8 *pCell,              /* Pointer to the cell text. */
  CellInfo *pInfo         /* Fill in this structure */
){
  u8 *pIter;              /* For scanning through pCell */
  u32 nPayload;           /* Number of bytes of cell payload */

  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  assert( pPage->leaf==0 || pPage->leaf==1 );
  assert( pPage->intKeyLeaf==0 );
  assert( pPage->noPayload==0 );
  pIter = pCell + pPage->childPtrSize;
  nPayload = *pIter;
  if( nPayload>=0x80 ){
    u8 *pEnd = &pIter[9];
    nPayload &= 0x7f;
    do{
      nPayload = (nPayload<<7) | (*++pIter & 0x7f);
    }while( *(pIter)>=0x80 && pIter<pEnd );
  }
  pIter++;
  pInfo->nKey = nPayload;
  pInfo->nPayload = nPayload;
  pInfo->pPayload = pIter;
  testcase( nPayload==pPage->maxLocal );
  testcase( nPayload==pPage->maxLocal+1 );
  if( nPayload<=pPage->maxLocal ){
    /* This is the (easy) common case where the entire payload fits
    ** on the local page.  No overflow is required.
    */
    pInfo->nSize = nPayload + (u16)(pIter - pCell);
    if( pInfo->nSize<4 ) pInfo->nSize = 4;
    pInfo->nLocal = (u16)nPayload;
    pInfo->iOverflow = 0;
  }else{












    btreeParseCellAdjustSizeForOverflow(pPage, pCell, pInfo);












  }
}
static void btreeParseCell(
  MemPage *pPage,         /* Page containing the cell */
  int iCell,              /* The cell index.  First cell is 0 */
  CellInfo *pInfo         /* Fill in this structure */
){
  pPage->xParseCell(pPage, findCell(pPage, iCell), pInfo);
}

/*
** The following routines are implementations of the MemPage.xCellSize
** method.
**
** Compute the total number of bytes that a Cell needs in the cell
** data area of the btree-page.  The return number includes the cell
** data header and the local payload, but not any overflow page or
** the space used by the cell pointer.
**


** cellSizePtrNoPayload()    =>   table internal nodes

** cellSizePtr()             =>   all index nodes & table leaf nodes
*/
static u16 cellSizePtr(MemPage *pPage, u8 *pCell){
  u8 *pIter = pCell + pPage->childPtrSize; /* For looping over bytes of pCell */
  u8 *pEnd;                                /* End mark for a varint */
  u32 nSize;                               /* Size value to return */

#ifdef SQLITE_DEBUG
  /* The value returned by this function should always be the same as
  ** the (CellInfo.nSize) value found by doing a full parse of the
  ** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
  ** this function verifies that this invariant is not violated. */
  CellInfo debuginfo;
  pPage->xParseCell(pPage, pCell, &debuginfo);
#endif

  assert( pPage->noPayload==0 );
  nSize = *pIter;
  if( nSize>=0x80 ){
    pEnd = &pIter[9];
    nSize &= 0x7f;
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140

#ifdef SQLITE_DEBUG
  /* The value returned by this function should always be the same as
  ** the (CellInfo.nSize) value found by doing a full parse of the
  ** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
  ** this function verifies that this invariant is not violated. */
  CellInfo debuginfo;
  btreeParseCellPtr(pPage, pCell, &debuginfo);
#endif

  assert( pPage->childPtrSize==4 );
  pEnd = pIter + 9;
  while( (*pIter++)&0x80 && pIter<pEnd );
  assert( debuginfo.nSize==(u16)(pIter - pCell) || CORRUPT_DB );
  return (u16)(pIter - pCell);







|







1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207

#ifdef SQLITE_DEBUG
  /* The value returned by this function should always be the same as
  ** the (CellInfo.nSize) value found by doing a full parse of the
  ** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
  ** this function verifies that this invariant is not violated. */
  CellInfo debuginfo;
  pPage->xParseCell(pPage, pCell, &debuginfo);
#endif

  assert( pPage->childPtrSize==4 );
  pEnd = pIter + 9;
  while( (*pIter++)&0x80 && pIter<pEnd );
  assert( debuginfo.nSize==(u16)(pIter - pCell) || CORRUPT_DB );
  return (u16)(pIter - pCell);
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
** to an overflow page, insert an entry into the pointer-map
** for the overflow page.
*/
static void ptrmapPutOvflPtr(MemPage *pPage, u8 *pCell, int *pRC){
  CellInfo info;
  if( *pRC ) return;
  assert( pCell!=0 );
  btreeParseCellPtr(pPage, pCell, &info);
  if( info.iOverflow ){
    Pgno ovfl = get4byte(&pCell[info.iOverflow]);
    ptrmapPut(pPage->pBt, ovfl, PTRMAP_OVERFLOW1, pPage->pgno, pRC);
  }
}
#endif








|







1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
** to an overflow page, insert an entry into the pointer-map
** for the overflow page.
*/
static void ptrmapPutOvflPtr(MemPage *pPage, u8 *pCell, int *pRC){
  CellInfo info;
  if( *pRC ) return;
  assert( pCell!=0 );
  pPage->xParseCell(pPage, pCell, &info);
  if( info.iOverflow ){
    Pgno ovfl = get4byte(&pCell[info.iOverflow]);
    ptrmapPut(pPage->pBt, ovfl, PTRMAP_OVERFLOW1, pPage->pgno, pRC);
  }
}
#endif

1536
1537
1538
1539
1540
1541
1542

1543
1544
1545
1546

1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559

1560
1561
1562
1563
1564
1565
1566
    /* EVIDENCE-OF: R-20501-61796 A value of 13 means the page is a leaf
    ** table b-tree page. */
    assert( (PTF_LEAFDATA|PTF_INTKEY|PTF_LEAF)==13 );
    pPage->intKey = 1;
    if( pPage->leaf ){
      pPage->intKeyLeaf = 1;
      pPage->noPayload = 0;

    }else{
      pPage->intKeyLeaf = 0;
      pPage->noPayload = 1;
      pPage->xCellSize = cellSizePtrNoPayload;

    }
    pPage->maxLocal = pBt->maxLeaf;
    pPage->minLocal = pBt->minLeaf;
  }else if( flagByte==PTF_ZERODATA ){
    /* EVIDENCE-OF: R-27225-53936 A value of 2 means the page is an interior
    ** index b-tree page. */
    assert( (PTF_ZERODATA)==2 );
    /* EVIDENCE-OF: R-16571-11615 A value of 10 means the page is a leaf
    ** index b-tree page. */
    assert( (PTF_ZERODATA|PTF_LEAF)==10 );
    pPage->intKey = 0;
    pPage->intKeyLeaf = 0;
    pPage->noPayload = 0;

    pPage->maxLocal = pBt->maxLocal;
    pPage->minLocal = pBt->minLocal;
  }else{
    /* EVIDENCE-OF: R-47608-56469 Any other value for the b-tree page type is
    ** an error. */
    return SQLITE_CORRUPT_BKPT;
  }







>




>













>







1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
    /* EVIDENCE-OF: R-20501-61796 A value of 13 means the page is a leaf
    ** table b-tree page. */
    assert( (PTF_LEAFDATA|PTF_INTKEY|PTF_LEAF)==13 );
    pPage->intKey = 1;
    if( pPage->leaf ){
      pPage->intKeyLeaf = 1;
      pPage->noPayload = 0;
      pPage->xParseCell = btreeParseCellPtr;
    }else{
      pPage->intKeyLeaf = 0;
      pPage->noPayload = 1;
      pPage->xCellSize = cellSizePtrNoPayload;
      pPage->xParseCell = btreeParseCellPtrNoPayload;
    }
    pPage->maxLocal = pBt->maxLeaf;
    pPage->minLocal = pBt->minLeaf;
  }else if( flagByte==PTF_ZERODATA ){
    /* EVIDENCE-OF: R-27225-53936 A value of 2 means the page is an interior
    ** index b-tree page. */
    assert( (PTF_ZERODATA)==2 );
    /* EVIDENCE-OF: R-16571-11615 A value of 10 means the page is a leaf
    ** index b-tree page. */
    assert( (PTF_ZERODATA|PTF_LEAF)==10 );
    pPage->intKey = 0;
    pPage->intKeyLeaf = 0;
    pPage->noPayload = 0;
    pPage->xParseCell = btreeParseCellPtrIndex;
    pPage->maxLocal = pBt->maxLocal;
    pPage->minLocal = pBt->minLocal;
  }else{
    /* EVIDENCE-OF: R-47608-56469 Any other value for the b-tree page type is
    ** an error. */
    return SQLITE_CORRUPT_BKPT;
  }
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
    if( rc ) return rc;
    nCell = pPage->nCell;

    for(i=0; i<nCell; i++){
      u8 *pCell = findCell(pPage, i);
      if( eType==PTRMAP_OVERFLOW1 ){
        CellInfo info;
        btreeParseCellPtr(pPage, pCell, &info);
        if( info.iOverflow
         && pCell+info.iOverflow+3<=pPage->aData+pPage->maskPage
         && iFrom==get4byte(&pCell[info.iOverflow])
        ){
          put4byte(&pCell[info.iOverflow], iTo);
          break;
        }







|







3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
    if( rc ) return rc;
    nCell = pPage->nCell;

    for(i=0; i<nCell; i++){
      u8 *pCell = findCell(pPage, i);
      if( eType==PTRMAP_OVERFLOW1 ){
        CellInfo info;
        pPage->xParseCell(pPage, pCell, &info);
        if( info.iOverflow
         && pCell+info.iOverflow+3<=pPage->aData+pPage->maskPage
         && iFrom==get4byte(&pCell[info.iOverflow])
        ){
          put4byte(&pCell[info.iOverflow], iTo);
          break;
        }
4988
4989
4990
4991
4992
4993
4994
4995
4996
4997
4998
4999
5000
5001
5002
          **
          ** If the record is corrupt, the xRecordCompare routine may read
          ** up to two varints past the end of the buffer. An extra 18 
          ** bytes of padding is allocated at the end of the buffer in
          ** case this happens.  */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
          btreeParseCellPtr(pPage, pCellBody, &pCur->info);
          nCell = (int)pCur->info.nKey;
          testcase( nCell<0 );   /* True if key size is 2^32 or more */
          testcase( nCell==0 );  /* Invalid key size:  0x80 0x80 0x00 */
          testcase( nCell==1 );  /* Invalid key size:  0x80 0x80 0x01 */
          testcase( nCell==2 );  /* Minimum legal index key size */
          if( nCell<2 ){
            rc = SQLITE_CORRUPT_BKPT;







|







5058
5059
5060
5061
5062
5063
5064
5065
5066
5067
5068
5069
5070
5071
5072
          **
          ** If the record is corrupt, the xRecordCompare routine may read
          ** up to two varints past the end of the buffer. An extra 18 
          ** bytes of padding is allocated at the end of the buffer in
          ** case this happens.  */
          void *pCellKey;
          u8 * const pCellBody = pCell - pPage->childPtrSize;
          pPage->xParseCell(pPage, pCellBody, &pCur->info);
          nCell = (int)pCur->info.nKey;
          testcase( nCell<0 );   /* True if key size is 2^32 or more */
          testcase( nCell==0 );  /* Invalid key size:  0x80 0x80 0x00 */
          testcase( nCell==1 );  /* Invalid key size:  0x80 0x80 0x01 */
          testcase( nCell==2 );  /* Minimum legal index key size */
          if( nCell<2 ){
            rc = SQLITE_CORRUPT_BKPT;
5777
5778
5779
5780
5781
5782
5783
5784
5785
5786
5787
5788
5789
5790
5791
  CellInfo info;
  Pgno ovflPgno;
  int rc;
  int nOvfl;
  u32 ovflPageSize;

  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  btreeParseCellPtr(pPage, pCell, &info);
  *pnSize = info.nSize;
  if( info.iOverflow==0 ){
    return SQLITE_OK;  /* No overflow pages. Return without doing anything */
  }
  if( pCell+info.iOverflow+3 > pPage->aData+pPage->maskPage ){
    return SQLITE_CORRUPT_BKPT;  /* Cell extends past end of page */
  }







|







5847
5848
5849
5850
5851
5852
5853
5854
5855
5856
5857
5858
5859
5860
5861
  CellInfo info;
  Pgno ovflPgno;
  int rc;
  int nOvfl;
  u32 ovflPageSize;

  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
  pPage->xParseCell(pPage, pCell, &info);
  *pnSize = info.nSize;
  if( info.iOverflow==0 ){
    return SQLITE_OK;  /* No overflow pages. Return without doing anything */
  }
  if( pCell+info.iOverflow+3 > pPage->aData+pPage->maskPage ){
    return SQLITE_CORRUPT_BKPT;  /* Cell extends past end of page */
  }
5931
5932
5933
5934
5935
5936
5937
5938
5939
5940
5941
5942
5943
5944
5945
  **
  ** Use a call to btreeParseCellPtr() to verify that the values above
  ** were computed correctly.
  */
#if SQLITE_DEBUG
  {
    CellInfo info;
    btreeParseCellPtr(pPage, pCell, &info);
    assert( nHeader=(int)(info.pPayload - pCell) );
    assert( info.nKey==nKey );
    assert( *pnSize == info.nSize );
    assert( spaceLeft == info.nLocal );
    assert( pPrior == &pCell[info.iOverflow] );
  }
#endif







|







6001
6002
6003
6004
6005
6006
6007
6008
6009
6010
6011
6012
6013
6014
6015
  **
  ** Use a call to btreeParseCellPtr() to verify that the values above
  ** were computed correctly.
  */
#if SQLITE_DEBUG
  {
    CellInfo info;
    pPage->xParseCell(pPage, pCell, &info);
    assert( nHeader=(int)(info.pPayload - pCell) );
    assert( info.nKey==nKey );
    assert( *pnSize == info.nSize );
    assert( spaceLeft == info.nLocal );
    assert( pPrior == &pCell[info.iOverflow] );
  }
#endif
6580
6581
6582
6583
6584
6585
6586
6587
6588
6589
6590
6591
6592
6593
6594
    assert( pPage->isInit );

    for(j=0; j<pPage->nCell; j++){
      CellInfo info;
      u8 *z;
     
      z = findCell(pPage, j);
      btreeParseCellPtr(pPage, z, &info);
      if( info.iOverflow ){
        Pgno ovfl = get4byte(&z[info.iOverflow]);
        ptrmapGet(pBt, ovfl, &e, &n);
        assert( n==pPage->pgno && e==PTRMAP_OVERFLOW1 );
      }
      if( !pPage->leaf ){
        Pgno child = get4byte(z);







|







6650
6651
6652
6653
6654
6655
6656
6657
6658
6659
6660
6661
6662
6663
6664
    assert( pPage->isInit );

    for(j=0; j<pPage->nCell; j++){
      CellInfo info;
      u8 *z;
     
      z = findCell(pPage, j);
      pPage->xParseCell(pPage, z, &info);
      if( info.iOverflow ){
        Pgno ovfl = get4byte(&z[info.iOverflow]);
        ptrmapGet(pBt, ovfl, &e, &n);
        assert( n==pPage->pgno && e==PTRMAP_OVERFLOW1 );
      }
      if( !pPage->leaf ){
        Pgno child = get4byte(z);
7211
7212
7213
7214
7215
7216
7217
7218
7219
7220
7221
7222
7223
7224
7225
      /* If the tree is a leaf-data tree, and the siblings are leaves, 
      ** then there is no divider cell in apCell[]. Instead, the divider 
      ** cell consists of the integer key for the right-most cell of 
      ** the sibling-page assembled above only.
      */
      CellInfo info;
      j--;
      btreeParseCellPtr(pNew, apCell[j], &info);
      pCell = pTemp;
      sz = 4 + putVarint(&pCell[4], info.nKey);
      pTemp = 0;
    }else{
      pCell -= 4;
      /* Obscure case for non-leaf-data trees: If the cell at pCell was
      ** previously stored on a leaf node, and its reported size was 4







|







7281
7282
7283
7284
7285
7286
7287
7288
7289
7290
7291
7292
7293
7294
7295
      /* If the tree is a leaf-data tree, and the siblings are leaves, 
      ** then there is no divider cell in apCell[]. Instead, the divider 
      ** cell consists of the integer key for the right-most cell of 
      ** the sibling-page assembled above only.
      */
      CellInfo info;
      j--;
      pNew->xParseCell(pNew, apCell[j], &info);
      pCell = pTemp;
      sz = 4 + putVarint(&pCell[4], info.nKey);
      pTemp = 0;
    }else{
      pCell -= 4;
      /* Obscure case for non-leaf-data trees: If the cell at pCell was
      ** previously stored on a leaf node, and its reported size was 4
8711
8712
8713
8714
8715
8716
8717
8718
8719
8720
8721
8722
8723
8724
8725

    /* Check payload overflow pages
    */
    pCheck->zPfx = "On tree page %d cell %d: ";
    pCheck->v1 = iPage;
    pCheck->v2 = i;
    pCell = findCell(pPage,i);
    btreeParseCellPtr(pPage, pCell, &info);
    sz = info.nPayload;
    /* For intKey pages, check that the keys are in order.
    */
    if( pPage->intKey ){
      if( i==0 ){
        nMinKey = nMaxKey = info.nKey;
      }else if( info.nKey <= nMaxKey ){







|







8781
8782
8783
8784
8785
8786
8787
8788
8789
8790
8791
8792
8793
8794
8795

    /* Check payload overflow pages
    */
    pCheck->zPfx = "On tree page %d cell %d: ";
    pCheck->v1 = iPage;
    pCheck->v2 = i;
    pCell = findCell(pPage,i);
    pPage->xParseCell(pPage, pCell, &info);
    sz = info.nPayload;
    /* For intKey pages, check that the keys are in order.
    */
    if( pPage->intKey ){
      if( i==0 ){
        nMinKey = nMaxKey = info.nKey;
      }else if( info.nKey <= nMaxKey ){
Changes to src/btreeInt.h.
227
228
229
230
231
232
233

234
235
236
237
238
239
240
** small cells will be rare, but they are possible.
*/
#define MX_CELL(pBt) ((pBt->pageSize-8)/6)

/* Forward declarations */
typedef struct MemPage MemPage;
typedef struct BtLock BtLock;


/*
** This is a magic string that appears at the beginning of every
** SQLite database in order to identify the file as a real database.
**
** You can change this value at compile-time by specifying a
** -DSQLITE_FILE_HEADER="..." on the compiler command-line.  The







>







227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
** small cells will be rare, but they are possible.
*/
#define MX_CELL(pBt) ((pBt->pageSize-8)/6)

/* Forward declarations */
typedef struct MemPage MemPage;
typedef struct BtLock BtLock;
typedef struct CellInfo CellInfo;

/*
** This is a magic string that appears at the beginning of every
** SQLite database in order to identify the file as a real database.
**
** You can change this value at compile-time by specifying a
** -DSQLITE_FILE_HEADER="..." on the compiler command-line.  The
291
292
293
294
295
296
297
298

299
300
301
302
303
304
305
                       ** non-overflow cell */
  u8 *apOvfl[5];       /* Pointers to the body of overflow cells */
  BtShared *pBt;       /* Pointer to BtShared that this page is part of */
  u8 *aData;           /* Pointer to disk image of the page data */
  u8 *aDataEnd;        /* One byte past the end of usable data */
  u8 *aCellIdx;        /* The cell index area */
  DbPage *pDbPage;     /* Pager page handle */
  u16 (*xCellSize)(MemPage*,u8*);  /* cellSizePtr method */

  Pgno pgno;           /* Page number for this page */
};

/*
** The in-memory image of a disk page has the auxiliary information appended
** to the end.  EXTRA_SIZE is the number of bytes of space needed to hold
** that extra information.







|
>







292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
                       ** non-overflow cell */
  u8 *apOvfl[5];       /* Pointers to the body of overflow cells */
  BtShared *pBt;       /* Pointer to BtShared that this page is part of */
  u8 *aData;           /* Pointer to disk image of the page data */
  u8 *aDataEnd;        /* One byte past the end of usable data */
  u8 *aCellIdx;        /* The cell index area */
  DbPage *pDbPage;     /* Pager page handle */
  u16 (*xCellSize)(MemPage*,u8*);             /* cellSizePtr method */
  void (*xParseCell)(MemPage*,u8*,CellInfo*); /* btreeParseCell method */
  Pgno pgno;           /* Page number for this page */
};

/*
** The in-memory image of a disk page has the auxiliary information appended
** to the end.  EXTRA_SIZE is the number of bytes of space needed to hold
** that extra information.
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
#define BTS_PENDING          0x0040   /* Waiting for read-locks to clear */

/*
** An instance of the following structure is used to hold information
** about a cell.  The parseCellPtr() function fills in this structure
** based on information extract from the raw disk page.
*/
typedef struct CellInfo CellInfo;
struct CellInfo {
  i64 nKey;      /* The key for INTKEY tables, or nPayload otherwise */
  u8 *pPayload;  /* Pointer to the start of payload */
  u32 nPayload;  /* Bytes of payload */
  u16 nLocal;    /* Amount of payload held locally, not on overflow */
  u16 iOverflow; /* Offset to overflow page number.  Zero if no overflow */
  u16 nSize;     /* Size of the cell content on the main b-tree page */







<







459
460
461
462
463
464
465

466
467
468
469
470
471
472
#define BTS_PENDING          0x0040   /* Waiting for read-locks to clear */

/*
** An instance of the following structure is used to hold information
** about a cell.  The parseCellPtr() function fills in this structure
** based on information extract from the raw disk page.
*/

struct CellInfo {
  i64 nKey;      /* The key for INTKEY tables, or nPayload otherwise */
  u8 *pPayload;  /* Pointer to the start of payload */
  u32 nPayload;  /* Bytes of payload */
  u16 nLocal;    /* Amount of payload held locally, not on overflow */
  u16 iOverflow; /* Offset to overflow page number.  Zero if no overflow */
  u16 nSize;     /* Size of the cell content on the main b-tree page */