/ Check-in [43844537]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid computing cell sizes in balance_nonroot() until they are really needed. This gives an overall 1.7% performance gain for about 1000 extra bytes of code space.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | btree-opt2
Files: files | file ages | folders
SHA1: 43844537e8a372953386663f8177202901ba7566
User & Date: drh 2015-06-23 02:37:30
Context
2015-06-23
13:02
Merge the compound SELECT operator fix from trunk. check-in: a7be554f user: drh tags: btree-opt2
02:37
Avoid computing cell sizes in balance_nonroot() until they are really needed. This gives an overall 1.7% performance gain for about 1000 extra bytes of code space. check-in: 43844537 user: drh tags: btree-opt2
2015-06-22
20:02
Change the way that balance_nonroot() partitions cells between the sibling pages such that a scan of the cell size array is not required. check-in: 16872871 user: drh tags: btree-opt2
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

  6278   6278         ** the entry for the overflow page into the pointer map.
  6279   6279         */
  6280   6280         ptrmapPutOvflPtr(pPage, pCell, pRC);
  6281   6281       }
  6282   6282   #endif
  6283   6283     }
  6284   6284   }
         6285  +
         6286  +/*
         6287  +** A CellArray object contains a cache of pointers and sizes for a
         6288  +** consecutive sequence of cells that might be held multiple pages.
         6289  +*/
         6290  +typedef struct CellArray CellArray;
         6291  +struct CellArray {
         6292  +  int nCell;              /* Number of cells in apCell[] */
         6293  +  MemPage *pRef;          /* Reference page */
         6294  +  u8 **apCell;            /* All cells begin balanced */
         6295  +  u16 *szCell;            /* Local size of all cells in apCell[] */
         6296  +};
         6297  +
         6298  +/*
         6299  +** Make sure the cell sizes at idx, idx+1, ..., idx+N-1 have been
         6300  +** computed.
         6301  +*/
         6302  +static void populateCellCache(CellArray *p, int idx, int N){
         6303  +  assert( idx>=0 && idx+N<=p->nCell );
         6304  +  while( N>0 ){
         6305  +    assert( p->apCell[idx]!=0 );
         6306  +    if( p->szCell[idx]==0 ){
         6307  +      p->szCell[idx] = p->pRef->xCellSize(p->pRef, p->apCell[idx]);
         6308  +    }else{
         6309  +      assert( CORRUPT_DB ||
         6310  +              p->szCell[idx]==p->pRef->xCellSize(p->pRef, p->apCell[idx]) );
         6311  +    }
         6312  +    idx++;
         6313  +    N--;
         6314  +  }
         6315  +}
         6316  +
         6317  +/*
         6318  +** Return the size of the Nth element of the cell array
         6319  +*/
         6320  +static SQLITE_NOINLINE u16 computeCellSize(CellArray *p, int N){
         6321  +  assert( N>=0 && N<p->nCell );
         6322  +  assert( p->szCell[N]==0 );
         6323  +  p->szCell[N] = p->pRef->xCellSize(p->pRef, p->apCell[N]);
         6324  +  return p->szCell[N];
         6325  +}
         6326  +static u16 cachedCellSize(CellArray *p, int N){
         6327  +  assert( N>=0 && N<p->nCell );
         6328  +  if( p->szCell[N] ) return p->szCell[N];
         6329  +  return computeCellSize(p, N);
         6330  +}
  6285   6331   
  6286   6332   /*
  6287   6333   ** Array apCell[] contains pointers to nCell b-tree page cells. The 
  6288   6334   ** szCell[] array contains the size in bytes of each cell. This function
  6289   6335   ** replaces the current contents of page pPg with the contents of the cell
  6290   6336   ** array.
  6291   6337   **
................................................................................
  6458   6504   ** responsibility of the caller to set it correctly.
  6459   6505   */
  6460   6506   static int editPage(
  6461   6507     MemPage *pPg,                   /* Edit this page */
  6462   6508     int iOld,                       /* Index of first cell currently on page */
  6463   6509     int iNew,                       /* Index of new first cell on page */
  6464   6510     int nNew,                       /* Final number of cells on page */
  6465         -  u8 **apCell,                    /* Array of cells */
  6466         -  u16 *szCell                     /* Array of cell sizes */
         6511  +  CellArray *pCArray              /* Array of cells and sizes */
  6467   6512   ){
  6468   6513     u8 * const aData = pPg->aData;
  6469   6514     const int hdr = pPg->hdrOffset;
  6470   6515     u8 *pBegin = &pPg->aCellIdx[nNew * 2];
  6471   6516     int nCell = pPg->nCell;       /* Cells stored on pPg */
  6472   6517     u8 *pData;
  6473   6518     u8 *pCellptr;
................................................................................
  6478   6523   #ifdef SQLITE_DEBUG
  6479   6524     u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager);
  6480   6525     memcpy(pTmp, aData, pPg->pBt->usableSize);
  6481   6526   #endif
  6482   6527   
  6483   6528     /* Remove cells from the start and end of the page */
  6484   6529     if( iOld<iNew ){
  6485         -    int nShift = pageFreeArray(
  6486         -        pPg, iNew-iOld, &apCell[iOld], &szCell[iOld]
         6530  +    int nShift;
         6531  +    populateCellCache(pCArray, iOld, iNew-iOld);
         6532  +    nShift = pageFreeArray(
         6533  +        pPg, iNew-iOld, &pCArray->apCell[iOld], &pCArray->szCell[iOld]
  6487   6534       );
  6488   6535       memmove(pPg->aCellIdx, &pPg->aCellIdx[nShift*2], nCell*2);
  6489   6536       nCell -= nShift;
  6490   6537     }
  6491   6538     if( iNewEnd < iOldEnd ){
         6539  +    populateCellCache(pCArray, iNewEnd, iOldEnd-iNewEnd);
  6492   6540       nCell -= pageFreeArray(
  6493         -        pPg, iOldEnd-iNewEnd, &apCell[iNewEnd], &szCell[iNewEnd]
         6541  +        pPg, iOldEnd-iNewEnd,
         6542  +        &pCArray->apCell[iNewEnd], &pCArray->szCell[iNewEnd]
  6494   6543       );
  6495   6544     }
  6496   6545   
  6497   6546     pData = &aData[get2byteNotZero(&aData[hdr+5])];
  6498   6547     if( pData<pBegin ) goto editpage_fail;
  6499   6548   
  6500   6549     /* Add cells to the start of the page */
  6501   6550     if( iNew<iOld ){
  6502   6551       int nAdd = MIN(nNew,iOld-iNew);
  6503   6552       assert( (iOld-iNew)<nNew || nCell==0 || CORRUPT_DB );
  6504   6553       pCellptr = pPg->aCellIdx;
  6505   6554       memmove(&pCellptr[nAdd*2], pCellptr, nCell*2);
         6555  +    populateCellCache(pCArray, iNew, nAdd);
  6506   6556       if( pageInsertArray(
  6507   6557             pPg, pBegin, &pData, pCellptr,
  6508         -          nAdd, &apCell[iNew], &szCell[iNew]
         6558  +          nAdd, &pCArray->apCell[iNew], &pCArray->szCell[iNew]
  6509   6559       ) ) goto editpage_fail;
  6510   6560       nCell += nAdd;
  6511   6561     }
  6512   6562   
  6513   6563     /* Add any overflow cells */
  6514   6564     for(i=0; i<pPg->nOverflow; i++){
  6515   6565       int iCell = (iOld + pPg->aiOvfl[i]) - iNew;
  6516   6566       if( iCell>=0 && iCell<nNew ){
  6517   6567         pCellptr = &pPg->aCellIdx[iCell * 2];
  6518   6568         memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2);
  6519   6569         nCell++;
         6570  +      (void)cachedCellSize(pCArray, iCell + iNew);
  6520   6571         if( pageInsertArray(
  6521   6572               pPg, pBegin, &pData, pCellptr,
  6522         -            1, &apCell[iCell + iNew], &szCell[iCell + iNew]
         6573  +            1, &pCArray->apCell[iCell + iNew], &pCArray->szCell[iCell + iNew]
  6523   6574         ) ) goto editpage_fail;
  6524   6575       }
  6525   6576     }
  6526   6577   
  6527   6578     /* Append cells to the end of the page */
  6528   6579     pCellptr = &pPg->aCellIdx[nCell*2];
         6580  +  populateCellCache(pCArray, iNew+nCell, nNew-nCell);
  6529   6581     if( pageInsertArray(
  6530   6582           pPg, pBegin, &pData, pCellptr,
  6531         -        nNew-nCell, &apCell[iNew+nCell], &szCell[iNew+nCell]
         6583  +        nNew-nCell, &pCArray->apCell[iNew+nCell], &pCArray->szCell[iNew+nCell]
  6532   6584     ) ) goto editpage_fail;
  6533   6585   
  6534   6586     pPg->nCell = nNew;
  6535   6587     pPg->nOverflow = 0;
  6536   6588   
  6537   6589     put2byte(&aData[hdr+3], pPg->nCell);
  6538   6590     put2byte(&aData[hdr+5], pData - aData);
  6539   6591   
  6540   6592   #ifdef SQLITE_DEBUG
  6541   6593     for(i=0; i<nNew && !CORRUPT_DB; i++){
  6542         -    u8 *pCell = apCell[i+iNew];
         6594  +    u8 *pCell = pCArray->apCell[i+iNew];
  6543   6595       int iOff = get2byte(&pPg->aCellIdx[i*2]);
  6544   6596       if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){
  6545   6597         pCell = &pTmp[pCell - aData];
  6546   6598       }
  6547         -    assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) );
         6599  +    assert( 0==memcmp(pCell, &aData[iOff],
         6600  +            pCArray->pRef->xCellSize(pCArray->pRef, pCArray->apCell[i+iNew])) );
  6548   6601     }
  6549   6602   #endif
  6550   6603   
  6551   6604     return SQLITE_OK;
  6552   6605    editpage_fail:
  6553   6606     /* Unable to edit this page. Rebuild it from scratch instead. */
  6554         -  return rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]);
         6607  +  populateCellCache(pCArray, iNew, nNew);
         6608  +  return rebuildPage(pPg, nNew, &pCArray->apCell[iNew], &pCArray->szCell[iNew]);
  6555   6609   }
  6556   6610   
  6557   6611   /*
  6558   6612   ** The following parameters determine how many adjacent pages get involved
  6559   6613   ** in a balancing operation.  NN is the number of neighbors on either side
  6560   6614   ** of the page that participate in the balancing operation.  NB is the
  6561   6615   ** total number of pages that participate, including the target page and
................................................................................
  6824   6878     MemPage *pParent,               /* Parent page of siblings being balanced */
  6825   6879     int iParentIdx,                 /* Index of "the page" in pParent */
  6826   6880     u8 *aOvflSpace,                 /* page-size bytes of space for parent ovfl */
  6827   6881     int isRoot,                     /* True if pParent is a root-page */
  6828   6882     int bBulk                       /* True if this call is part of a bulk load */
  6829   6883   ){
  6830   6884     BtShared *pBt;               /* The whole database */
  6831         -  int nCell = 0;               /* Number of cells in apCell[] */
  6832   6885     int nMaxCells = 0;           /* Allocated size of apCell, szCell, aFrom. */
  6833   6886     int nNew = 0;                /* Number of pages in apNew[] */
  6834   6887     int nOld;                    /* Number of pages in apOld[] */
  6835   6888     int i, j, k;                 /* Loop counters */
  6836   6889     int nxDiv;                   /* Next divider slot in pParent->aCell[] */
  6837   6890     int rc = SQLITE_OK;          /* The return code */
  6838   6891     u16 leafCorrection;          /* 4 if pPage is a leaf.  0 if not */
................................................................................
  6842   6895     int iSpace1 = 0;             /* First unused byte of aSpace1[] */
  6843   6896     int iOvflSpace = 0;          /* First unused byte of aOvflSpace[] */
  6844   6897     int szScratch;               /* Size of scratch memory requested */
  6845   6898     MemPage *apOld[NB];          /* pPage and up to two siblings */
  6846   6899     MemPage *apNew[NB+2];        /* pPage and up to NB siblings after balancing */
  6847   6900     u8 *pRight;                  /* Location in parent of right-sibling pointer */
  6848   6901     u8 *apDiv[NB-1];             /* Divider cells in pParent */
  6849         -  int cntNew[NB+2];            /* Index in aCell[] of cell after i-th page */
  6850         -  int cntOld[NB+2];            /* Old index in aCell[] after i-th page */
         6902  +  int cntNew[NB+2];            /* Index in b.paCell[] of cell after i-th page */
         6903  +  int cntOld[NB+2];            /* Old index in b.apCell[] */
  6851   6904     int szNew[NB+2];             /* Combined size of cells placed on i-th page */
  6852         -  u8 **apCell = 0;             /* All cells begin balanced */
  6853         -  u16 *szCell;                 /* Local size of all cells in apCell[] */
  6854   6905     u8 *aSpace1;                 /* Space for copies of dividers cells */
  6855   6906     Pgno pgno;                   /* Temp var to store a page number in */
  6856   6907     u8 abDone[NB+2];             /* True after i'th new page is populated */
  6857   6908     Pgno aPgno[NB+2];            /* Page numbers of new pages before shuffling */
  6858   6909     Pgno aPgOrder[NB+2];         /* Copy of aPgno[] used for sorting pages */
  6859   6910     u16 aPgFlags[NB+2];          /* flags field of new pages before shuffling */
         6911  +  CellArray b;                  /* Parsed information on cells being balanced */
  6860   6912   
  6861   6913     memset(abDone, 0, sizeof(abDone));
         6914  +  b.nCell = 0;
         6915  +  b.apCell = 0;
  6862   6916     pBt = pParent->pBt;
  6863   6917     assert( sqlite3_mutex_held(pBt->mutex) );
  6864   6918     assert( sqlite3PagerIswriteable(pParent->pDbPage) );
  6865   6919   
  6866   6920   #if 0
  6867   6921     TRACE(("BALANCE: begin page %d child of %d\n", pPage->pgno, pParent->pgno));
  6868   6922   #endif
................................................................................
  6963   7017     ** alignment */
  6964   7018     nMaxCells = (nMaxCells + 3)&~3;
  6965   7019   
  6966   7020     /*
  6967   7021     ** Allocate space for memory structures
  6968   7022     */
  6969   7023     szScratch =
  6970         -       nMaxCells*sizeof(u8*)                       /* apCell */
  6971         -     + nMaxCells*sizeof(u16)                       /* szCell */
         7024  +       nMaxCells*sizeof(u8*)                       /* b.apCell */
         7025  +     + nMaxCells*sizeof(u16)                       /* b.szCell */
  6972   7026        + pBt->pageSize;                              /* aSpace1 */
  6973   7027   
  6974   7028     /* EVIDENCE-OF: R-28375-38319 SQLite will never request a scratch buffer
  6975   7029     ** that is more than 6 times the database page size. */
  6976   7030     assert( szScratch<=6*(int)pBt->pageSize );
  6977         -  apCell = sqlite3ScratchMalloc( szScratch ); 
  6978         -  if( apCell==0 ){
         7031  +  b.apCell = sqlite3ScratchMalloc( szScratch ); 
         7032  +  if( b.apCell==0 ){
  6979   7033       rc = SQLITE_NOMEM;
  6980   7034       goto balance_cleanup;
  6981   7035     }
  6982         -  szCell = (u16*)&apCell[nMaxCells];
  6983         -  aSpace1 = (u8*)&szCell[nMaxCells];
         7036  +  b.szCell = (u16*)&b.apCell[nMaxCells];
         7037  +  aSpace1 = (u8*)&b.szCell[nMaxCells];
  6984   7038     assert( EIGHT_BYTE_ALIGNMENT(aSpace1) );
  6985   7039   
  6986   7040     /*
  6987   7041     ** Load pointers to all cells on sibling pages and the divider cells
  6988         -  ** into the local apCell[] array.  Make copies of the divider cells
         7042  +  ** into the local b.apCell[] array.  Make copies of the divider cells
  6989   7043     ** into space obtained from aSpace1[]. The divider cells have already
  6990   7044     ** been removed from pParent.
  6991   7045     **
  6992   7046     ** If the siblings are on leaf pages, then the child pointers of the
  6993   7047     ** divider cells are stripped from the cells before they are copied
  6994         -  ** into aSpace1[].  In this way, all cells in apCell[] are without
         7048  +  ** into aSpace1[].  In this way, all cells in b.apCell[] are without
  6995   7049     ** child pointers.  If siblings are not leaves, then all cell in
  6996         -  ** apCell[] include child pointers.  Either way, all cells in apCell[]
         7050  +  ** b.apCell[] include child pointers.  Either way, all cells in b.apCell[]
  6997   7051     ** are alike.
  6998   7052     **
  6999   7053     ** leafCorrection:  4 if pPage is a leaf.  0 if pPage is not a leaf.
  7000   7054     **       leafData:  1 if pPage holds key+data and pParent holds only keys.
  7001   7055     */
  7002         -  leafCorrection = apOld[0]->leaf*4;
  7003         -  leafData = apOld[0]->intKeyLeaf;
         7056  +  b.pRef = apOld[0];
         7057  +  leafCorrection = b.pRef->leaf*4;
         7058  +  leafData = b.pRef->intKeyLeaf;
  7004   7059     for(i=0; i<nOld; i++){
  7005   7060       int limit;
  7006   7061       MemPage *pOld = apOld[i];
  7007   7062   
  7008   7063       /* Verify that all sibling pages are of the same "type" (table-leaf,
  7009   7064       ** table-interior, index-leaf, or index-interior).
  7010   7065       */
  7011   7066       if( pOld->aData[0]!=apOld[0]->aData[0] ){
  7012   7067         rc = SQLITE_CORRUPT_BKPT;
  7013   7068         goto balance_cleanup;
  7014   7069       }
  7015   7070   
  7016   7071       limit = pOld->nCell+pOld->nOverflow;
         7072  +    memset(&b.szCell[b.nCell], 0, sizeof(b.szCell[0])*limit);
  7017   7073       if( pOld->nOverflow>0 ){
  7018   7074         for(j=0; j<limit; j++){
  7019         -        assert( nCell<nMaxCells );
  7020         -        apCell[nCell] = findOverflowCell(pOld, j);
  7021         -        szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]);
  7022         -        nCell++;
         7075  +        assert( b.nCell<nMaxCells );
         7076  +        b.apCell[b.nCell] = findOverflowCell(pOld, j);
         7077  +        b.nCell++;
  7023   7078         }
  7024   7079       }else{
  7025   7080         u8 *aData = pOld->aData;
  7026   7081         u16 maskPage = pOld->maskPage;
  7027   7082         u16 cellOffset = pOld->cellOffset;
  7028   7083         for(j=0; j<limit; j++){
  7029         -        assert( nCell<nMaxCells );
  7030         -        apCell[nCell] = findCellv2(aData, maskPage, cellOffset, j);
  7031         -        szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]);
  7032         -        nCell++;
         7084  +        assert( b.nCell<nMaxCells );
         7085  +        b.apCell[b.nCell] = findCellv2(aData, maskPage, cellOffset, j);
         7086  +        b.nCell++;
  7033   7087         }
  7034         -    }       
  7035         -    cntOld[i] = nCell;
         7088  +    }
         7089  +    cntOld[i] = b.nCell;
  7036   7090       if( i<nOld-1 && !leafData){
  7037   7091         u16 sz = (u16)szNew[i];
  7038   7092         u8 *pTemp;
  7039         -      assert( nCell<nMaxCells );
  7040         -      szCell[nCell] = sz;
         7093  +      assert( b.nCell<nMaxCells );
         7094  +      b.szCell[b.nCell] = sz;
  7041   7095         pTemp = &aSpace1[iSpace1];
  7042   7096         iSpace1 += sz;
  7043   7097         assert( sz<=pBt->maxLocal+23 );
  7044   7098         assert( iSpace1 <= (int)pBt->pageSize );
  7045   7099         memcpy(pTemp, apDiv[i], sz);
  7046         -      apCell[nCell] = pTemp+leafCorrection;
         7100  +      b.apCell[b.nCell] = pTemp+leafCorrection;
  7047   7101         assert( leafCorrection==0 || leafCorrection==4 );
  7048         -      szCell[nCell] = szCell[nCell] - leafCorrection;
         7102  +      b.szCell[b.nCell] = b.szCell[b.nCell] - leafCorrection;
  7049   7103         if( !pOld->leaf ){
  7050   7104           assert( leafCorrection==0 );
  7051   7105           assert( pOld->hdrOffset==0 );
  7052   7106           /* The right pointer of the child page pOld becomes the left
  7053   7107           ** pointer of the divider cell */
  7054         -        memcpy(apCell[nCell], &pOld->aData[8], 4);
         7108  +        memcpy(b.apCell[b.nCell], &pOld->aData[8], 4);
  7055   7109         }else{
  7056   7110           assert( leafCorrection==4 );
  7057         -        while( szCell[nCell]<4 ){
         7111  +        while( b.szCell[b.nCell]<4 ){
  7058   7112             /* Do not allow any cells smaller than 4 bytes. If a smaller cell
  7059   7113             ** does exist, pad it with 0x00 bytes. */
  7060         -          assert( szCell[nCell]==3 || CORRUPT_DB );
  7061         -          assert( apCell[nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB );
         7114  +          assert( b.szCell[b.nCell]==3 || CORRUPT_DB );
         7115  +          assert( b.apCell[b.nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB );
  7062   7116             aSpace1[iSpace1++] = 0x00;
  7063         -          szCell[nCell]++;
         7117  +          b.szCell[b.nCell]++;
  7064   7118           }
  7065   7119         }
  7066         -      nCell++;
         7120  +      b.nCell++;
  7067   7121       }
  7068   7122     }
  7069   7123   
  7070   7124     /*
  7071         -  ** Figure out the number of pages needed to hold all nCell cells.
         7125  +  ** Figure out the number of pages needed to hold all b.nCell cells.
  7072   7126     ** Store this number in "k".  Also compute szNew[] which is the total
  7073   7127     ** size of all cells on the i-th page and cntNew[] which is the index
  7074         -  ** in apCell[] of the cell that divides page i from page i+1.  
  7075         -  ** cntNew[k] should equal nCell.
         7128  +  ** in b.apCell[] of the cell that divides page i from page i+1.  
         7129  +  ** cntNew[k] should equal b.nCell.
  7076   7130     **
  7077   7131     ** Values computed by this block:
  7078   7132     **
  7079   7133     **           k: The total number of sibling pages
  7080   7134     **    szNew[i]: Spaced used on the i-th sibling page.
  7081         -  **   cntNew[i]: Index in apCell[] and szCell[] for the first cell to
         7135  +  **   cntNew[i]: Index in b.apCell[] and b.szCell[] for the first cell to
  7082   7136     **              the right of the i-th sibling page.
  7083   7137     ** usableSpace: Number of bytes of space available on each sibling.
  7084   7138     ** 
  7085   7139     */
  7086   7140     usableSpace = pBt->usableSize - 12 + leafCorrection;
  7087   7141     for(i=0; i<nOld; i++){
  7088   7142       MemPage *p = apOld[i];
................................................................................
  7097   7151     for(i=0; i<k; i++){
  7098   7152       int sz;
  7099   7153       while( szNew[i]>usableSpace ){
  7100   7154         if( i+1>=k ){
  7101   7155           k = i+2;
  7102   7156           if( k>NB+2 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; }
  7103   7157           szNew[k-1] = 0;
  7104         -        cntNew[k-1] = nCell;
         7158  +        cntNew[k-1] = b.nCell;
  7105   7159         }
  7106         -      sz = 2+szCell[cntNew[i]-1];
         7160  +      sz = 2 + cachedCellSize(&b, cntNew[i]-1);
  7107   7161         szNew[i] -= sz;
  7108   7162         if( !leafData ){
  7109         -        sz = cntNew[i]<nCell ? 2+szCell[cntNew[i]] : 0;
         7163  +        if( cntNew[i]<b.nCell ){
         7164  +          sz = 2 + cachedCellSize(&b, cntNew[i]);
         7165  +        }else{
         7166  +          sz = 0;
         7167  +        }
  7110   7168         }
  7111   7169         szNew[i+1] += sz;
  7112   7170         cntNew[i]--;
  7113   7171       }
  7114         -    while( cntNew[i]<nCell ){
  7115         -      sz = 2+szCell[cntNew[i]];
         7172  +    while( cntNew[i]<b.nCell ){
         7173  +      sz = 2 + cachedCellSize(&b, cntNew[i]);
  7116   7174         if( szNew[i]+sz>usableSpace ) break;
  7117   7175         szNew[i] += sz;
  7118   7176         cntNew[i]++;
  7119   7177         if( !leafData ){
  7120         -        sz = cntNew[i]<nCell ? 2+szCell[cntNew[i]] : 0;
         7178  +        if( cntNew[i]<b.nCell ){
         7179  +          sz = 2 + cachedCellSize(&b, cntNew[i]);
         7180  +        }else{
         7181  +          sz = 0;
         7182  +        }
  7121   7183         }
  7122   7184         szNew[i+1] -= sz;
  7123   7185       }
  7124         -    if( cntNew[i]>=nCell ){
         7186  +    if( cntNew[i]>=b.nCell ){
  7125   7187         k = i+1;
  7126   7188       }else if( cntNew[i] - (i>0 ? cntNew[i-1] : 0) <= 0 ){
  7127   7189         rc = SQLITE_CORRUPT_BKPT;
  7128   7190         goto balance_cleanup;
  7129   7191       }
  7130   7192     }
  7131   7193   
................................................................................
  7142   7204     */
  7143   7205     for(i=k-1; i>0; i--){
  7144   7206       int szRight = szNew[i];  /* Size of sibling on the right */
  7145   7207       int szLeft = szNew[i-1]; /* Size of sibling on the left */
  7146   7208       int r;              /* Index of right-most cell in left sibling */
  7147   7209       int d;              /* Index of first cell to the left of right sibling */
  7148   7210   
  7149         -    r = cntNew[i-1] - 1;
  7150         -    d = r + 1 - leafData;
  7151         -    assert( d<nMaxCells );
  7152         -    assert( r<nMaxCells );
  7153         -    while( szRight==0 
  7154         -       || (!bBulk && szRight+szCell[d]+2<=szLeft-(szCell[r]+2)) 
  7155         -    ){
  7156         -      szRight += szCell[d] + 2;
  7157         -      szLeft -= szCell[r] + 2;
         7211  +    while(1){
         7212  +      r = cntNew[i-1] - 1;
         7213  +      d = r + 1 - leafData;
         7214  +      assert( d<nMaxCells );
         7215  +      assert( r<nMaxCells );
         7216  +      (void)cachedCellSize(&b, d);
         7217  +      (void)cachedCellSize(&b, r);
         7218  +      if( szRight!=0
         7219  +       && (bBulk || szRight+b.szCell[d]+2 > szLeft-(b.szCell[r]+2)) ){
         7220  +        break;
         7221  +      }
         7222  +      szRight += b.szCell[d] + 2;
         7223  +      szLeft -= b.szCell[r] + 2;
  7158   7224         cntNew[i-1]--;
  7159   7225         if( cntNew[i-1] <= 0 ){
  7160   7226           rc = SQLITE_CORRUPT_BKPT;
  7161   7227           goto balance_cleanup;
  7162   7228         }
  7163         -      r = cntNew[i-1] - 1;
  7164         -      d = r + 1 - leafData;
  7165   7229       }
  7166   7230       szNew[i] = szRight;
  7167   7231       szNew[i-1] = szLeft;
  7168   7232     }
  7169   7233   
  7170   7234     /* Sanity check:  For a non-corrupt database file one of the follwing
  7171   7235     ** must be true:
................................................................................
  7196   7260       }else{
  7197   7261         assert( i>0 );
  7198   7262         rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0);
  7199   7263         if( rc ) goto balance_cleanup;
  7200   7264         zeroPage(pNew, pageFlags);
  7201   7265         apNew[i] = pNew;
  7202   7266         nNew++;
  7203         -      cntOld[i] = nCell;
         7267  +      cntOld[i] = b.nCell;
  7204   7268   
  7205   7269         /* Set the pointer-map entry for the new sibling page. */
  7206   7270         if( ISAUTOVACUUM ){
  7207   7271           ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno, &rc);
  7208   7272           if( rc!=SQLITE_OK ){
  7209   7273             goto balance_cleanup;
  7210   7274           }
................................................................................
  7301   7365       MemPage *pNew = apNew[0];
  7302   7366       u8 *aOld = pNew->aData;
  7303   7367       int cntOldNext = pNew->nCell + pNew->nOverflow;
  7304   7368       int usableSize = pBt->usableSize;
  7305   7369       int iNew = 0;
  7306   7370       int iOld = 0;
  7307   7371   
  7308         -    for(i=0; i<nCell; i++){
  7309         -      u8 *pCell = apCell[i];
         7372  +    for(i=0; i<b.nCell; i++){
         7373  +      u8 *pCell = b.apCell[i];
  7310   7374         if( i==cntOldNext ){
  7311   7375           MemPage *pOld = (++iOld)<nNew ? apNew[iOld] : apOld[iOld];
  7312   7376           cntOldNext += pOld->nCell + pOld->nOverflow + !leafData;
  7313   7377           aOld = pOld->aData;
  7314   7378         }
  7315   7379         if( i==cntNew[iNew] ){
  7316   7380           pNew = apNew[++iNew];
................................................................................
  7328   7392          || pCell<aOld
  7329   7393          || pCell>=&aOld[usableSize]
  7330   7394         ){
  7331   7395           if( !leafCorrection ){
  7332   7396             ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc);
  7333   7397             if( rc ) goto balance_cleanup;
  7334   7398           }
  7335         -        if( szCell[i]>pNew->minLocal ){
         7399  +        if( cachedCellSize(&b,i)>pNew->minLocal ){
  7336   7400             ptrmapPutOvflPtr(pNew, pCell, &rc);
  7337   7401             if( rc ) goto balance_cleanup;
  7338   7402           }
  7339   7403         }
  7340   7404       }
  7341   7405     }
  7342   7406   
................................................................................
  7345   7409       u8 *pCell;
  7346   7410       u8 *pTemp;
  7347   7411       int sz;
  7348   7412       MemPage *pNew = apNew[i];
  7349   7413       j = cntNew[i];
  7350   7414   
  7351   7415       assert( j<nMaxCells );
  7352         -    pCell = apCell[j];
  7353         -    sz = szCell[j] + leafCorrection;
         7416  +    assert( b.apCell[j]!=0 );
         7417  +    pCell = b.apCell[j];
         7418  +    sz = b.szCell[j] + leafCorrection;
  7354   7419       pTemp = &aOvflSpace[iOvflSpace];
  7355   7420       if( !pNew->leaf ){
  7356   7421         memcpy(&pNew->aData[8], pCell, 4);
  7357   7422       }else if( leafData ){
  7358   7423         /* If the tree is a leaf-data tree, and the siblings are leaves, 
  7359         -      ** then there is no divider cell in apCell[]. Instead, the divider 
         7424  +      ** then there is no divider cell in b.apCell[]. Instead, the divider 
  7360   7425         ** cell consists of the integer key for the right-most cell of 
  7361   7426         ** the sibling-page assembled above only.
  7362   7427         */
  7363   7428         CellInfo info;
  7364   7429         j--;
  7365         -      pNew->xParseCell(pNew, apCell[j], &info);
         7430  +      pNew->xParseCell(pNew, b.apCell[j], &info);
  7366   7431         pCell = pTemp;
  7367   7432         sz = 4 + putVarint(&pCell[4], info.nKey);
  7368   7433         pTemp = 0;
  7369   7434       }else{
  7370   7435         pCell -= 4;
  7371   7436         /* Obscure case for non-leaf-data trees: If the cell at pCell was
  7372   7437         ** previously stored on a leaf node, and its reported size was 4
................................................................................
  7375   7440         ** any cell). But it is important to pass the correct size to 
  7376   7441         ** insertCell(), so reparse the cell now.
  7377   7442         **
  7378   7443         ** Note that this can never happen in an SQLite data file, as all
  7379   7444         ** cells are at least 4 bytes. It only happens in b-trees used
  7380   7445         ** to evaluate "IN (SELECT ...)" and similar clauses.
  7381   7446         */
  7382         -      if( szCell[j]==4 ){
         7447  +      if( b.szCell[j]==4 ){
  7383   7448           assert(leafCorrection==4);
  7384   7449           sz = pParent->xCellSize(pParent, pCell);
  7385   7450         }
  7386   7451       }
  7387   7452       iOvflSpace += sz;
  7388   7453       assert( sz<=pBt->maxLocal+23 );
  7389   7454       assert( iOvflSpace <= (int)pBt->pageSize );
................................................................................
  7433   7498         ** only after iPg+1 has already been updated. */
  7434   7499         assert( cntNew[iPg]>=cntOld[iPg] || abDone[iPg+1] );
  7435   7500   
  7436   7501         if( iPg==0 ){
  7437   7502           iNew = iOld = 0;
  7438   7503           nNewCell = cntNew[0];
  7439   7504         }else{
  7440         -        iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : nCell;
         7505  +        iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : b.nCell;
  7441   7506           iNew = cntNew[iPg-1] + !leafData;
  7442   7507           nNewCell = cntNew[iPg] - iNew;
  7443   7508         }
  7444   7509   
  7445         -      rc = editPage(apNew[iPg], iOld, iNew, nNewCell, apCell, szCell);
         7510  +      rc = editPage(apNew[iPg], iOld, iNew, nNewCell, &b);
  7446   7511         if( rc ) goto balance_cleanup;
  7447   7512         abDone[iPg]++;
  7448   7513         apNew[iPg]->nFree = usableSpace-szNew[iPg];
  7449   7514         assert( apNew[iPg]->nOverflow==0 );
  7450   7515         assert( apNew[iPg]->nCell==nNewCell );
  7451   7516       }
  7452   7517     }
................................................................................
  7490   7555         u32 key = get4byte(&apNew[i]->aData[8]);
  7491   7556         ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc);
  7492   7557       }
  7493   7558     }
  7494   7559   
  7495   7560     assert( pParent->isInit );
  7496   7561     TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n",
  7497         -          nOld, nNew, nCell));
         7562  +          nOld, nNew, b.nCell));
  7498   7563   
  7499   7564     /* Free any old pages that were not reused as new pages.
  7500   7565     */
  7501   7566     for(i=nNew; i<nOld; i++){
  7502   7567       freePage(apOld[i], &rc);
  7503   7568     }
  7504   7569   
................................................................................
  7513   7578     }
  7514   7579   #endif
  7515   7580   
  7516   7581     /*
  7517   7582     ** Cleanup before returning.
  7518   7583     */
  7519   7584   balance_cleanup:
  7520         -  sqlite3ScratchFree(apCell);
         7585  +  sqlite3ScratchFree(b.apCell);
  7521   7586     for(i=0; i<nOld; i++){
  7522   7587       releasePage(apOld[i]);
  7523   7588     }
  7524   7589     for(i=0; i<nNew; i++){
  7525   7590       releasePage(apNew[i]);
  7526   7591     }
  7527   7592