/ Check-in [fec849dc]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Attempt to further reduce memcpy() in balance_nonroot().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | defrag-opt
Files: files | file ages | folders
SHA1: fec849dcca3aead2bc2d4ecffeda750684d32fb0
User & Date: dan 2014-10-11 20:00:24
Context
2014-10-13
18:03
Further work on balance_nonroot(). check-in: 6594f9b4 user: dan tags: defrag-opt
2014-10-11
20:00
Attempt to further reduce memcpy() in balance_nonroot(). check-in: fec849dc user: dan tags: defrag-opt
2014-10-09
19:35
Change the balance_nonroot() routine to reduce the amount of memcpy work that takes place. This is a work in progress. check-in: 29304499 user: dan tags: defrag-opt
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

  5976   5976     pPage->nFree -= (nCell*2 + nUsable - cellbody);
  5977   5977     pPage->nCell = (u16)nCell;
  5978   5978   }
  5979   5979   
  5980   5980   
  5981   5981   static void rebuildPage(
  5982   5982     MemPage *pPg,                   /* Edit this page */
  5983         -  int nRemove,                    /* Cells to remove from start of page */
  5984   5983     int nCell,                      /* Final number of cells on page */
  5985         -  u8 **apCell,                    /* Array of nCell final cells */
  5986         -  u16 *szCell                     /* Array of nCell cell sizes */
         5984  +  u8 **apCell,                    /* Array of cells */
         5985  +  u16 *szCell                     /* Array of cell sizes */
  5987   5986   ){
  5988   5987     const int hdr = pPg->hdrOffset;          /* Offset of header on pPg */
  5989   5988     u8 * const aData = pPg->aData;           /* Pointer to data for pPg */
  5990   5989     const int usableSize = pPg->pBt->usableSize;
  5991   5990     u8 * const pEnd = &aData[usableSize];
  5992   5991     int i;
  5993   5992     u8 *pCellptr = pPg->aCellIdx;
................................................................................
  6015   6014     pPg->nOverflow = 0;
  6016   6015   
  6017   6016     put2byte(&aData[hdr+1], 0);
  6018   6017     put2byte(&aData[hdr+3], pPg->nCell);
  6019   6018     put2byte(&aData[hdr+5], pData - aData);
  6020   6019     aData[hdr+7] = 0x00;
  6021   6020   }
         6021  +
         6022  +static void editPage(
         6023  +  MemPage *pPg,                   /* Edit this page */
         6024  +  int iOld,                       /* Index of first cell currently on page */
         6025  +  int iNew,                       /* Index of new first cell on page */
         6026  +  int nNew,                       /* Final number of cells on page */
         6027  +  u8 **apCell,                    /* Array of cells */
         6028  +  u16 *szCell                     /* Array of cell sizes */
         6029  +){
         6030  +
         6031  +  if( 1 ){
         6032  +    u8 * const aData = pPg->aData;
         6033  +    const int hdr = pPg->hdrOffset;
         6034  +    u8 *pBegin = &pPg->aCellIdx[nNew * 2];
         6035  +    int nFree = pPg->nFree;       /* Free bytes on pPg */
         6036  +    int nCell = pPg->nCell;       /* Cells stored on pPg */
         6037  +    u8 *pData;
         6038  +    u8 *pCellptr;
         6039  +    int i;
         6040  +    int iOldEnd = iOld + pPg->nCell + pPg->nOverflow;
         6041  +
         6042  +#ifdef SQLITE_DEBUG
         6043  +    u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager);
         6044  +    memcpy(pTmp, aData, pPg->pBt->usableSize);
         6045  +#endif
         6046  +
         6047  +    /* Remove cells from the start and end of the page */
         6048  +    if( iOld<iNew ){
         6049  +      int nShift = 0;
         6050  +      for(i=iOld; i<iNew; i++){
         6051  +        if( apCell[i]>aData && apCell[i]<&aData[pPg->pBt->usableSize] ){
         6052  +          freeSpace(pPg, apCell[i] - aData, szCell[i]);
         6053  +          nFree += szCell[i] + 2;
         6054  +          nShift++;
         6055  +        }
         6056  +      }
         6057  +      nCell -= nShift;
         6058  +      memmove(pPg->aCellIdx, &pPg->aCellIdx[nShift*2], nCell*2);
         6059  +    }
         6060  +    for(i=iNew+nNew; i<iOldEnd; i++){
         6061  +      if( apCell[i]>aData && apCell[i]<&aData[pPg->pBt->usableSize] ){
         6062  +        freeSpace(pPg, apCell[i] - aData, szCell[i]);
         6063  +        nFree += szCell[i] + 2;
         6064  +        nCell--;
         6065  +      }
         6066  +    }
         6067  +    pData = &aData[get2byte(&aData[hdr+5])];
         6068  +    if( pData<pBegin ) goto editpage_fail;
         6069  +
         6070  +    /* Add cells to the start of the page */
         6071  +    if( iNew<iOld ){
         6072  +      pCellptr = pPg->aCellIdx;
         6073  +      memmove(&pCellptr[(iOld-iNew)*2], pCellptr, nCell*2);
         6074  +      for(i=iNew; i<iOld; i++){
         6075  +        pData -= szCell[i];
         6076  +        if( pData<pBegin ) goto editpage_fail;
         6077  +        memcpy(pData, apCell[i], szCell[i]);
         6078  +        put2byte(pCellptr, (pData - aData));
         6079  +        pCellptr += 2;
         6080  +        nFree -= (szCell[i] + 2);
         6081  +        nCell++;
         6082  +      }
         6083  +    }
         6084  +
         6085  +    /* Add any overflow cells */
         6086  +    for(i=0; i<pPg->nOverflow; i++){
         6087  +      int iCell = (iOld + pPg->aiOvfl[i]) - iNew;
         6088  +      if( iCell>=0 && iCell<nNew ){
         6089  +        u8 *pCellptr = &pPg->aCellIdx[iCell * 2];
         6090  +        int sz = szCell[iCell+iNew];
         6091  +        memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2);
         6092  +        pData -= sz;
         6093  +        if( pData<pBegin ) goto editpage_fail;
         6094  +        memcpy(pData, apCell[iCell+iNew], sz);
         6095  +        put2byte(pCellptr, (pData - aData));
         6096  +        nFree -= (sz + 2);
         6097  +        nCell++;
         6098  +      }
         6099  +    }
         6100  +
         6101  +    /* Append cells to the end of the page */
         6102  +    pCellptr = &pPg->aCellIdx[nCell*2];
         6103  +    for(i=iNew+nCell; i<(iNew+nNew); i++){
         6104  +      pData -= szCell[i];
         6105  +      if( pData<pBegin ) goto editpage_fail;
         6106  +      memcpy(pData, apCell[i], szCell[i]);
         6107  +      put2byte(pCellptr, (pData - aData));
         6108  +      pCellptr += 2;
         6109  +      nFree -= (szCell[i] + 2);
         6110  +    }
         6111  +
         6112  +    pPg->nFree = nFree;
         6113  +    pPg->nCell = nNew;
         6114  +    pPg->nOverflow = 0;
         6115  +
         6116  +    put2byte(&aData[hdr+3], pPg->nCell);
         6117  +    put2byte(&aData[hdr+5], pData - aData);
         6118  +
         6119  +#ifdef SQLITE_DEBUG
         6120  +    for(i=0; i<nNew; i++){
         6121  +      u8 *pCell = apCell[i+iNew];
         6122  +      int iOff = get2byte(&pPg->aCellIdx[i*2]);
         6123  +      if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){
         6124  +        pCell = &pTmp[pCell - aData];
         6125  +      }
         6126  +      assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) );
         6127  +    }
         6128  +#endif
         6129  +
         6130  +#if 0
         6131  +  printf("EDIT\n");
         6132  +#endif
         6133  +
         6134  +    return;
         6135  +  }
         6136  +
         6137  + editpage_fail:
         6138  +#if 0
         6139  +  printf("REBUILD\n");
         6140  +#endif
         6141  +  /* Unable to edit this page. Rebuild it from scratch instead. */
         6142  +  rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]);
         6143  +}
  6022   6144   
  6023   6145   /*
  6024   6146   ** The following parameters determine how many adjacent pages get involved
  6025   6147   ** in a balancing operation.  NN is the number of neighbors on either side
  6026   6148   ** of the page that participate in the balancing operation.  NB is the
  6027   6149   ** total number of pages that participate, including the target page and
  6028   6150   ** NN neighbors on either side.
................................................................................
  6308   6430     int iOvflSpace = 0;          /* First unused byte of aOvflSpace[] */
  6309   6431     int szScratch;               /* Size of scratch memory requested */
  6310   6432     MemPage *apOld[NB];          /* pPage and up to two siblings */
  6311   6433     MemPage *apNew[NB+2];        /* pPage and up to NB siblings after balancing */
  6312   6434     u8 *pRight;                  /* Location in parent of right-sibling pointer */
  6313   6435     u8 *apDiv[NB-1];             /* Divider cells in pParent */
  6314   6436     int cntNew[NB+2];            /* Index in aCell[] of cell after i-th page */
         6437  +  int cntOld[NB+2];            /* Old index in aCell[] after i-th page */
  6315   6438     int szNew[NB+2];             /* Combined size of cells place on i-th page */
  6316   6439     u8 **apCell = 0;             /* All cells begin balanced */
  6317   6440     u16 *szCell;                 /* Local size of all cells in apCell[] */
  6318   6441     u8 *aSpace1;                 /* Space for copies of dividers cells */
  6319   6442     Pgno pgno;                   /* Temp var to store a page number in */
  6320   6443   
  6321   6444     int aShiftLeft[NB+2];
................................................................................
  6483   6606         for(j=0; j<limit; j++){
  6484   6607           assert( nCell<nMaxCells );
  6485   6608           apCell[nCell] = findCellv2(aData, maskPage, cellOffset, j);
  6486   6609           szCell[nCell] = cellSizePtr(pOld, apCell[nCell]);
  6487   6610           nCell++;
  6488   6611         }
  6489   6612       }       
         6613  +    cntOld[i] = nCell;
  6490   6614       if( i<nOld-1 && !leafData){
  6491   6615         u16 sz = (u16)szNew[i];
  6492   6616         u8 *pTemp;
  6493   6617         assert( nCell<nMaxCells );
  6494   6618         szCell[nCell] = sz;
  6495   6619         pTemp = &aSpace1[iSpace1];
  6496   6620         iSpace1 += sz;
................................................................................
  6620   6744       }else{
  6621   6745         assert( i>0 );
  6622   6746         rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0);
  6623   6747         if( rc ) goto balance_cleanup;
  6624   6748         zeroPage(pNew, pageFlags);
  6625   6749         apNew[i] = pNew;
  6626   6750         nNew++;
         6751  +      cntOld[i] = nCell;
  6627   6752   
  6628   6753         /* Set the pointer-map entry for the new sibling page. */
  6629   6754         if( ISAUTOVACUUM ){
  6630   6755           ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno, &rc);
  6631   6756           if( rc!=SQLITE_OK ){
  6632   6757             goto balance_cleanup;
  6633   6758           }
................................................................................
  6689   6814     for(i=0; i<nNew; i++){
  6690   6815       /* At this point, "j" is the apCell[] index of the first cell currently
  6691   6816       ** stored on page apNew[i]. Or, if apNew[i] was not one of the original 
  6692   6817       ** sibling pages, "j" should be set to nCell. Variable iFirst is set
  6693   6818       ** to the apCell[] index of the first cell that will appear on the
  6694   6819       ** page following this balancing operation.  */
  6695   6820       int iFirst = (i==0 ? 0 : cntNew[i-1] + !leafData);     /* new first cell */
         6821  +
         6822  +#if 0
         6823  +    MemPage *pNew = apNew[i];
         6824  +    int iCell;
         6825  +    int nCta = 0;
         6826  +    int nFree;
         6827  +
         6828  +    printf("REBUILD %d: %d@%d -> %d@%d", apNew[i]->pgno,
         6829  +          pNew->nCell+pNew->nOverflow, j,
         6830  +          cntNew[i] - iFirst, iFirst
         6831  +    );
         6832  +    for(iCell=iFirst; iCell<cntNew[i]; iCell++){
         6833  +      if( apCell[iCell]<pNew->aData 
         6834  +       || apCell[iCell]>=&pNew->aData[pBt->usableSize] 
         6835  +      ){
         6836  +        nCta += szCell[iCell];
         6837  +      }
         6838  +    }
         6839  +    nFree = get2byte(&pNew->aData[pNew->hdrOffset+5]);
         6840  +    nFree -= (pNew->cellOffset + (cntNew[i] - iFirst) * 2);
         6841  +    printf(" cta=%d free=%d\n", nCta, nFree);
         6842  +    if( i==(nNew-1) ){
         6843  +      printf("-----\n");
         6844  +      fflush(stdout);
         6845  +    }
         6846  +#endif
         6847  +
  6696   6848       assert( i<nOld || j==nCell );
  6697   6849       aShiftLeft[i] = j - iFirst;
  6698   6850       j += apNew[i]->nCell + apNew[i]->nOverflow;
  6699   6851       aShiftRight[i] = cntNew[i] - j;
  6700   6852       assert( i!=nOld-1 || j==nCell );
  6701   6853       if( j<nCell ) j += !leafData;
  6702   6854     }
................................................................................
  6831   6983     assert( aShiftRight[nNew-1]>=0 && aShiftLeft[0]==0 );
  6832   6984     for(i=0; i<nNew*2; i++){
  6833   6985       int iPg = (i>=nNew ? i-nNew : nNew-1-i);
  6834   6986       if( abDone[iPg]==0 
  6835   6987        && (aShiftLeft[iPg]>=0 || abDone[iPg-1])
  6836   6988        && (aShiftRight[iPg]>=0 || abDone[iPg+1])
  6837   6989       ){
  6838         -      MemPage *pNew = apNew[iPg];
  6839         -      int iLeft = ((iPg==0) ? 0 : cntNew[iPg-1] + !leafData);
  6840         -      rebuildPage(pNew, 
  6841         -          aShiftLeft[iPg] < 0 ? (aShiftLeft[iPg]*-1) : 0,
  6842         -          cntNew[iPg] - iLeft,
  6843         -          &apCell[iLeft],
  6844         -          &szCell[iLeft]
  6845         -      );
         6990  +      int iNew;
         6991  +      int iOld;
         6992  +      int nNewCell;
         6993  +
         6994  +      if( iPg==0 ){
         6995  +        iNew = iOld = 0;
         6996  +        nNewCell = cntNew[0];
         6997  +      }else{
         6998  +        iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : nCell;
         6999  +        iNew = cntNew[iPg-1] + !leafData;
         7000  +        nNewCell = cntNew[iPg] - iNew;
         7001  +      }
         7002  +
         7003  +      editPage(apNew[iPg], iOld, iNew, nNewCell, apCell, szCell);
  6846   7004         abDone[iPg] = 1;
  6847         -      assert( pNew->nOverflow==0 );
  6848         -      assert( pNew->nCell==(cntNew[iPg] - (iPg==0?0:cntNew[iPg-1]+!leafData)) );
         7005  +      assert( apNew[iPg]->nOverflow==0 );
         7006  +      assert( apNew[iPg]->nCell==nNewCell );
  6849   7007       }
  6850   7008     }
  6851   7009     assert( memcmp(abDone, "\01\01\01\01\01", nNew)==0 );
  6852   7010   
  6853   7011     assert( nOld>0 );
  6854   7012     assert( nNew>0 );
  6855   7013   
................................................................................
  6865   7023       ** for which the pointer is stored within the content being copied.
  6866   7024       **
  6867   7025       ** The second assert below verifies that the child page is defragmented
  6868   7026       ** (it must be, as it was just reconstructed using assemblePage()). This
  6869   7027       ** is important if the parent page happens to be page 1 of the database
  6870   7028       ** image.  */
  6871   7029       assert( nNew==1 );
         7030  +    defragmentPage(apNew[0]);
  6872   7031       assert( apNew[0]->nFree == 
  6873   7032           (get2byte(&apNew[0]->aData[5])-apNew[0]->cellOffset-apNew[0]->nCell*2) 
  6874   7033       );
  6875   7034       copyNodeContent(apNew[0], pParent, &rc);
  6876   7035       freePage(apNew[0], &rc);
  6877   7036     }else if( ISAUTOVACUUM && !leafCorrection ){
  6878   7037       /* Fix the pointer map entries associated with the right-child of each