SQLite4
Check-in [4431ab2769]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Still further progress on the same.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 4431ab2769f8140feb8f4477cfaea882711d87fc
User & Date: dan 2013-09-28 15:07:54
Context
2013-10-07
20:43
Further progress on b-tree module. check-in: 51c2c9358d user: dan tags: trunk
2013-09-28
15:07
Still further progress on the same. check-in: 4431ab2769 user: dan tags: trunk
11:23
Fixes for b-tree balancing routines. Still incomplete. check-in: 9e8d7525d8 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/bt_main.c.

   277    277   static void printPage(FILE *f, u32 pgno, u8 *aData, int nData){
   278    278     int i;
   279    279     int nCell = (int)btCellCount(aData, nData);
   280    280     fprintf(f, "Page %d: ", pgno);
   281    281     fprintf(f, "nCell=%d ", nCell);
   282    282     fprintf(f, "iFree=%d ", (int)btFreeOffset(aData, nData));
   283    283     fprintf(f, "flags=%d ", (int)btFlags(aData, nData));
          284  +  if( btFlags(aData, nData) & BT_PGFLAGS_INTERNAL ){
          285  +    fprintf(f, "rchild=%d ", (int)btGetU32(&aData[1]));
          286  +  }
   284    287     fprintf(f, "cell-offsets=(");
   285    288     for(i=0; i<nCell; i++){
   286    289       u8 *ptr = btCellPtrFind(aData, nData, i);
   287    290       fprintf(f, "%s%d", i==0?"":" ", (int)btGetU16(ptr));
   288    291     }
   289    292     fprintf(f, ")\n");
          293  +
          294  +
   290    295   }
   291    296   #endif
   292    297   
   293    298   
   294    299   /*
   295    300   ** This function compares the key passed via parameters pK and nK to the
   296    301   ** key stored as part of cell iCell on the database page stored in buffer
................................................................................
   600    605   
   601    606   static u8 *btCellFindSize(u8 *aData, int nData, int iCell, int *pnByte){
   602    607     int nKey;
   603    608     u8 *pCell;
   604    609     u8 *p;
   605    610   
   606    611     p = pCell = btCellFind(aData, nData, iCell);
   607         -
   608    612     p += sqlite4BtVarintGet32(p, &nKey);
   609    613     p += nKey;
   610         -  p += sqlite4BtVarintGet32(p, &nKey);
   611         -  p += nKey;
          614  +  if( 0==(aData[0] & BT_PGFLAGS_INTERNAL) ){
          615  +    p += sqlite4BtVarintGet32(p, &nKey);
          616  +    p += nKey;
          617  +  }else{
          618  +    p += 4;
          619  +  }
   612    620   
   613    621     *pnByte = (p - pCell);
   614    622     return pCell;
   615    623   }
   616    624   
   617    625   /*
   618    626   ** Allocate a new page buffer.
................................................................................
   668    676     int i;                          /* Used to iterate through cells */
   669    677     int bLeaf;                      /* True if pPg is a leaf page */
   670    678     int nHdr;                       /* Bytes in header of this page */
   671    679   
   672    680     if( btNewBuffer(pDb, &aTmp) ) return SQLITE4_NOMEM;
   673    681   
   674    682     aData = sqlite4BtPageData(pPg);
   675         -  assert( (btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL)==0 );
   676    683     nCell = btCellCount(aData, pgsz);
   677    684   
   678    685     bLeaf = 0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL);
   679    686     nHdr = bLeaf ? 1 : 5;
   680    687   
   681    688     /* Set header bytes of new page */
   682    689     memcpy(aTmp, aData, nHdr);
................................................................................
   746    753       i += 4;
   747    754     }
   748    755   
   749    756     assert( i==btKVCellSize(pKV, pgsz) );
   750    757     return i;
   751    758   }
   752    759   
   753         -static int btGatherSiblings(bt_cursor *pCsr, BtPage **apPg, int *pnPg){
          760  +typedef struct BalanceCtx BalanceCtx;
          761  +struct BalanceCtx {
          762  +  int pgsz;                       /* Database page size */
          763  +  int bLeaf;                      /* True if we are rebalancing leaf data */
          764  +  bt_cursor *pCsr;                /* Cursor identifying where to insert pKV */
          765  +  int nKV;                        /* Number of KV pairs */
          766  +  KeyValue *apKV;                 /* New KV pairs being inserted */
          767  +
          768  +  /* Populated by btGatherSiblings */
          769  +  int nIn;                        /* Number of sibling pages */
          770  +  BtPage *apPg[5];                /* Array of sibling pages */
          771  +
          772  +  /* Array populated by btBalanceMeasure */
          773  +  int *anCellSz;
          774  +  
          775  +  int anOut[5];                   /* Cell counts for output pages */
          776  +
          777  +  /* Variables used by btBalanceOutput */
          778  +  int nOut;                       /* Number of output pages */
          779  +  int iOut;                       /* Current output page */
          780  +  u8 *apOut[5];                   /* Buffers to assemble output in */
          781  +};
          782  +
          783  +static int btGatherSiblings(BalanceCtx *p){
          784  +  bt_cursor *pCsr = p->pCsr;
   754    785     bt_db * const pDb = pCsr->pDb; 
   755    786     const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
   756    787   
   757    788     int rc = SQLITE4_OK;
   758    789     int nCell;                      /* Number of cells in parent page */
   759    790     u8 *aParent;                    /* Buffer of parent page */
   760    791     int iChild;                     /* Index of child page */
................................................................................
   772    803     }else{
   773    804       nSib = 3;
   774    805     }
   775    806   
   776    807     if( iChild==0 ){
   777    808       iSib = 0;
   778    809     }else if( iChild==nCell ){
   779         -    iSib = nCell-2;
          810  +    iSib = nCell-(nSib-1);
   780    811     }else{
   781    812       iSib = iChild-1;
   782    813     }
   783    814   
   784    815     for(i=0; i<nSib && rc==SQLITE4_OK; i++){
   785    816       u32 pgno = btChildPgno(aParent, pgsz, iSib+i);
   786         -    rc = sqlite4BtPageGet(pDb->pPager, pgno, &apPg[i]);
          817  +    rc = sqlite4BtPageGet(pDb->pPager, pgno, &p->apPg[i]);
   787    818     }
   788         -  *pnPg = nSib;
          819  +  p->nIn = nSib;
   789    820   
   790    821     pCsr->aiCell[pCsr->nPg-2] = iSib;
   791    822     return rc;
   792    823   }
   793    824   
   794    825   static int btSetChildPgno(bt_db *pDb, BtPage *pPg, int iChild, u32 pgno){
   795    826     const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
................................................................................
   809    840         btPutU32(pCell, pgno);
   810    841       }
   811    842     }
   812    843   
   813    844     return rc;
   814    845   }
   815    846   
   816         -/* Called recursively by btInsertAndRebalance(). todo: Fix this! */
   817         -static int btModifyPage(bt_cursor *, int, int, KeyValue *);
   818         -
   819         -#if 0
   820         -/*
   821         -** An instance of this structure describes all the cells that will be
   822         -** redistributed by a single btRebalance() operation.
   823         -*/
   824         -struct BtRBIter {
   825         -  /* Input parameters */
   826         -  int nKV;                        /* Number of KV pairs to add */
   827         -  int iKVSib;                     /* Sibling to add KV pairs to */
   828         -  int iKVCell;                    /* Cell offset at which to insert KV pairs */
   829         -  int nSib;                       /* Number of sibling pages */
   830         -  /* Output parameters */
   831         -  int iSib;                       /* Sibling page to read cell from (or -ve) */
   832         -  int iCell;                      /* Index of cell to read (or -ve) */
   833         -};
   834         -static void btIterFirst(BtRBIter *p){
   835         -  p->iSib = 0;
   836         -  p->iCell = 0;
   837         -  if( p->iKVSib==0 && p->iKVCell==0 ){
   838         -    p->iSib = -1;
   839         -  }
   840         -}
   841         -static void btIterNext(BtRBIter *p){
   842         -  p->iCell++;
   843         -  /* Just finished the KV pairs */
   844         -  if( p->iSib<0 && p->iCell==p->nKV ){
   845         -    p->iSib = p->iKVSib;
   846         -    p->iCell = p->iKVCell;
   847         -  }
   848         -  if( p->iSib && p->iCell==p->nKV ){
   849         -    p->iSib = p->iKVSib;
   850         -    p->iCell = p->iKVCell;
   851         -  }
   852         -}
   853         -
   854         -static void btIterValid(BtRBIter *p){
   855         -  return (p->iSib>=0 || p->iCell>=0);
   856         -}
   857         -#endif
   858         -
   859         -
   860         -typedef struct BalanceCtx BalanceCtx;
   861         -struct BalanceCtx {
   862         -  int pgsz;                       /* Database page size */
   863         -  int bLeaf;                      /* True if we are rebalancing leaf data */
   864         -  bt_cursor *pCsr;                /* Cursor identifying where to insert pKV */
   865         -  KeyValue *pKV;                  /* New KV pair being inserted */
   866         -  int nPg;                        /* Number of sibling pages */
   867         -  BtPage **apPg;                  /* Array of sibling pages */
   868         -  
   869         -  int *anOut;                     /* Cell counts for output pages */
   870         -
   871         -  /* Variables used by btBalanceOutput */
   872         -  int nOut;                       /* Number of output pages */
   873         -  int iOut;                       /* Current output page */
   874         -  u8 **apOut;                     /* Buffers to assemble output in */
   875         -
   876         -
   877         -};
          847  +/* Called recursively by btBalance(). todo: Fix this! */
          848  +static int btInsertAndBalance(bt_cursor *, int, KeyValue *);
          849  +static int btDeleteFromPage(bt_cursor *, int);
          850  +
          851  +static int btBalanceMeasure(
          852  +  BalanceCtx *p,                  /* Description of balance operation */
          853  +  int iCell,                      /* Cell number in this iteration */
          854  +  u8 *pCell, int nByte,           /* Binary cell */
          855  +  KeyValue *pKV                   /* Key-value cell */
          856  +){
          857  +  if( pCell ){
          858  +    p->anCellSz[iCell] = nByte;
          859  +  }else{
          860  +    p->anCellSz[iCell] = btKVCellSize(pKV, p->pgsz);
          861  +  }
          862  +  return SQLITE4_OK;
          863  +}
   878    864   
   879    865   static int btBalanceOutput(
   880    866     BalanceCtx *p,                  /* Description of balance operation */
   881    867     int iCell,                      /* Cell number in this iteration */
   882    868     u8 *pCell, int nByte,           /* Binary cell to copy to output */
   883    869     KeyValue *pKV                   /* Key-value cell to write to output */
   884    870   ){
................................................................................
   921    907     BalanceCtx *p,
   922    908     int (*xVisit)(BalanceCtx*, int, u8*, int, KeyValue*)
   923    909   ){
   924    910     const int pgsz = sqlite4BtPagerPagesize(p->pCsr->pDb->pPager);
   925    911     int rc = SQLITE4_OK;            /* Return code */
   926    912     int iPg;                        /* Current page in apPg[] */
   927    913     int iCall = 0;
          914  +  int i;                          /* Used to iterate through KV pairs */
   928    915   
   929    916     BtPage *pIns = p->pCsr->apPage[p->pCsr->nPg-1];
   930    917     int iIns = p->pCsr->aiCell[p->pCsr->nPg-1];
   931    918   
   932         -  for(iPg=0; iPg<p->nPg && rc==SQLITE4_OK; iPg++){
          919  +  for(iPg=0; iPg<p->nIn && rc==SQLITE4_OK; iPg++){
   933    920       BtPage *pPg;                  /* Current page */
   934    921       u8 *aData;                    /* Page data */
   935    922       int nCell;                    /* Number of cells on page pPg */
   936    923       int iCell;                    /* Current cell in pPg */
   937    924   
   938    925       pPg = p->apPg[iPg];
   939    926       aData = sqlite4BtPageData(pPg);
................................................................................
   940    927       nCell = btCellCount(aData, pgsz);
   941    928   
   942    929       for(iCell=0; iCell<nCell && rc==SQLITE4_OK; iCell++){
   943    930         int nByte;
   944    931         u8 *pCell;
   945    932   
   946    933         if( pPg==pIns && iCell==iIns ){
   947         -        rc = xVisit(p, iCall++, 0, 0, p->pKV);
   948         -        if( rc!=SQLITE4_OK ) break;
          934  +        for(i=0; i<p->nKV; i++){
          935  +          rc = xVisit(p, iCall++, 0, 0, &p->apKV[i]);
          936  +          if( rc!=SQLITE4_OK ) break;
          937  +        }
   949    938         }
   950    939   
   951    940         pCell = btCellFindSize(aData, pgsz, iCell, &nByte);
   952    941         rc = xVisit(p, iCall++, pCell, nByte, 0);
   953    942       }
   954    943   
   955    944       if( pPg==pIns && iCell==nCell && rc==SQLITE4_OK ){
   956         -      rc = xVisit(p, iCall++, 0, 0, p->pKV);
          945  +      for(i=0; i<p->nKV; i++){
          946  +        rc = xVisit(p, iCall++, 0, 0, &p->apKV[i]);
          947  +      }
   957    948       }
   958    949     }
   959    950   
   960    951     return rc;
   961    952   }
   962    953   
   963    954   
   964         -int btInsertAndRebalance(bt_cursor *pCsr, KeyValue *pKV){
          955  +int btBalance(bt_cursor *pCsr, int nKV, KeyValue *apKV){
   965    956     bt_db * const pDb = pCsr->pDb; 
   966    957     const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
   967    958     const int nSpacePerPage = (pgsz - 1 - 6);
   968    959   
   969         -  int nIn = 0;                    /* Number of input pages */
   970    960     int iPg;                        /* Used to iterate through pages */
   971    961     int iCell;                      /* Used to iterate through cells */
   972    962     int nCell = 0;                  /* Total number of cells to redistribute */
   973    963     int *anCellSz;                  /* Array containing size in bytes of cells */
   974         -  int iIns;                       /* Index of new cell */
   975         -  int nOut;                       /* Number of output pages */
   976    964   
   977         -  BtPage *apPg[5];                /* Input/output pages */
   978         -  int anOut[5];                   /* Cell counts for output pages */
   979         -  u8 *apOut[5];                   /* Buffers for assembly of output pages */
   980    965     KeyValue aPCell[5];             /* Cells to push into the parent page */
   981    966     BtPage *pPar;                   /* Parent page */
   982    967     int iSib;                       /* Index of left-most sibling */
   983    968   
   984    969     int nTotal;                     /* Total bytes of content to distribute */
   985    970     int rc = SQLITE4_OK;            /* Return code */
   986    971   
   987    972     BalanceCtx ctx;
          973  +  memset(&ctx, 0, sizeof(ctx));
          974  +  ctx.pCsr = pCsr;
          975  +  ctx.nKV = nKV;
          976  +  ctx.apKV = apKV;
          977  +  ctx.pgsz = pgsz;
          978  +  ctx.bLeaf = 1; /* todo */
   988    979   
   989         -  memset(apOut, 0, sizeof(apOut));
   990         -  memset(apPg, 0, sizeof(apPg));
   991    980     memset(aPCell, 0, sizeof(aPCell));
   992    981   
   993    982     /* Gather the sibling pages from which cells will be redistributed into
   994         -  ** the apPg[] array.  */
          983  +  ** the ctx.apPg[] array.  */
   995    984     assert( pCsr->nPg>1 );
   996         -  rc = btGatherSiblings(pCsr, apPg, &nIn);
          985  +  rc = btGatherSiblings(&ctx);
   997    986     if( rc!=SQLITE4_OK ) goto rebalance_out;
   998    987     pPar = pCsr->apPage[pCsr->nPg-2];
   999    988     iSib = pCsr->aiCell[pCsr->nPg-2];
  1000    989   
  1001         -  /* Count the cells on the input pages. This loop also sets iNew. */
  1002         -  for(iPg=0; iPg<nIn; iPg++){
  1003         -    u8 *aData = sqlite4BtPageData(apPg[iPg]);
  1004         -    if( apPg[iPg]==pCsr->apPage[pCsr->nPg-1] ){
  1005         -      iIns = (nCell + pCsr->aiCell[pCsr->nPg-1]);
  1006         -      nCell++;
  1007         -    }
          990  +  /* Count the number of input cells */
          991  +  nCell = 1;
          992  +  for(iPg=0; iPg<ctx.nIn; iPg++){
          993  +    u8 *aData = sqlite4BtPageData(ctx.apPg[iPg]);
  1008    994       nCell += btCellCount(aData, pgsz);
  1009    995     }
  1010    996   
  1011    997     /* Allocate and populate the anCellSz[] array */
  1012         -  anCellSz = (int*)sqlite4_malloc(pDb->pEnv, sizeof(int) * nCell);
          998  +  anCellSz = ctx.anCellSz = (int*)sqlite4_malloc(pDb->pEnv, sizeof(int)*nCell);
  1013    999     if( anCellSz==0 ){
  1014   1000       rc = btErrorBkpt(SQLITE4_NOMEM);
  1015   1001       goto rebalance_out;
  1016   1002     }
  1017         -  iCell = 0;
  1018         -  for(iPg=0; iPg<nIn; iPg++){
  1019         -    u8 *aData = sqlite4BtPageData(apPg[iPg]);
  1020         -    int nPgCell;                  /* Number of cells on page apPg[iPg] */
  1021         -    int iPgCell;                  /* Index of cell under analysis in page */
         1003  +  rc = btBalanceVisitCells(&ctx, btBalanceMeasure);
  1022   1004   
  1023         -    nPgCell = btCellCount(aData, pgsz);
  1024         -    for(iPgCell=0; iPgCell<nPgCell; iPgCell++){
  1025         -      if( iCell==iIns ) iCell++;
  1026         -      btCellFindSize(aData, pgsz, iCell, &anCellSz[iCell]);
  1027         -      iCell++;
  1028         -    }
  1029         -  }
  1030         -  anCellSz[iIns] = btKVCellSize(pKV, pgsz);
  1031         -
  1032         -  /* Now figure out the number of output pages. Set nOut to this value. */
         1005  +  /* Now figure out the number of output pages. Set ctx.nOut to this value. */
  1033   1006     iCell = 0;
  1034   1007     for(iPg=0; iCell<nCell; iPg++){
  1035   1008       int nByte = 0;                /* Number of bytes of content on page */
  1036         -    assert( iPg<array_size(anOut) );
         1009  +    assert( iPg<array_size(ctx.anOut) );
  1037   1010       for(/* noop */; iCell<nCell; iCell++){
  1038   1011         nByte += (anCellSz[iCell] + 2);
  1039   1012         if( nByte>nSpacePerPage ) break;
  1040   1013       }
  1041         -    anOut[iPg] = iCell;
         1014  +    ctx.anOut[iPg] = iCell;
  1042   1015     }
  1043         -  nOut = iPg;
  1044         -  assert( anOut[nOut-1]==nCell );
         1016  +  ctx.nOut = iPg;
         1017  +  assert( ctx.anOut[ctx.nOut-1]==nCell );
  1045   1018   
  1046   1019     /* Calculate the total size of all cells. */
  1047   1020     nTotal = 0;
  1048   1021     for(iCell=0; iCell<nCell; iCell++) nTotal += (anCellSz[iCell] + 2);
  1049   1022   
  1050   1023     /* The loop in the previous block populated the anOut[] array in such a
  1051         -  ** way as to make the (nOut-1) leftmost pages completely full but leave
  1052         -  ** the rightmost page partially empty. This block redistributes cells
  1053         -  ** a bit more evenly. This block may reduce one or more of the values in 
  1054         -  ** the anOut[] array, but will not increase any. No values are reduced
  1055         -  ** to values lower than 1.  */
         1024  +  ** way as to make the (ctx.nOut-1) leftmost pages completely full but 
         1025  +  ** leave the rightmost page partially empty. This block redistributes 
         1026  +  ** cells a bit more evenly. This block may reduce one or more of the 
         1027  +  ** values in the anOut[] array, but will not increase any. No values 
         1028  +  ** are reduced to values lower than 1.  */
  1056   1029     iCell = nCell;
  1057         -  for(iPg=(nOut-2); iPg>=0; iPg--){
         1030  +  for(iPg=(ctx.nOut-2); iPg>=0; iPg--){
  1058   1031       int nByte = 0;                /* Number of bytes of content on page */
  1059   1032       int nGoal = nTotal / (iPg + 2);
  1060   1033   
  1061         -    for(/* noop */; iCell>0 && ((nByte<nGoal) || iCell>anOut[iPg]); iCell--){
         1034  +    for( ; iCell>0 && ((nByte<nGoal) || iCell>ctx.anOut[iPg]); iCell--){
  1062   1035         int nThis = (anCellSz[iCell-1] + 2);
  1063   1036         if( (nThis + nByte)>nSpacePerPage ) break;
  1064   1037         nByte += nThis;
  1065   1038       }
  1066         -    assert( iCell<=anOut[iPg] );
  1067         -    anOut[iPg] = iCell;
         1039  +    assert( iCell<=ctx.anOut[iPg] );
         1040  +    ctx.anOut[iPg] = iCell;
  1068   1041       nTotal = nByte;
  1069   1042     }
  1070   1043   
  1071   1044   #ifndef NDEBUG
  1072   1045     {
  1073   1046       int iDbg;
  1074         -    fprintf(stderr, "btInsertAndRebalance(): nIn=%d anIn[] = ", nIn);
  1075         -    for(iDbg=0; iDbg<nIn; iDbg++){
  1076         -      u8 *aData = sqlite4BtPageData(apPg[iDbg]);
         1047  +    fprintf(stderr, "btBalance(): nIn=%d anIn[] = ", ctx.nIn);
         1048  +    for(iDbg=0; iDbg<ctx.nIn; iDbg++){
         1049  +      u8 *aData = sqlite4BtPageData(ctx.apPg[iDbg]);
  1077   1050         fprintf(stderr, "%d ", btCellCount(aData, pgsz));
  1078   1051       }
  1079         -    fprintf(stderr, " ->  nOut=%d anOut[] = ", nOut);
  1080         -    for(iDbg=0; iDbg<nOut; iDbg++){
  1081         -      fprintf(stderr, "%d ", anOut[iDbg]);
         1052  +    fprintf(stderr, " ->  nOut=%d anOut[] = ", ctx.nOut);
         1053  +    for(iDbg=0; iDbg<ctx.nOut; iDbg++){
         1054  +      fprintf(stderr, "%d ", ctx.anOut[iDbg]);
  1082   1055       }
  1083   1056       fprintf(stderr, "\n");
  1084   1057     }
  1085   1058   #endif
  1086   1059   
  1087   1060     /* Allocate buffers for the output leaves */
  1088         -  for(iPg=0; iPg<nOut; iPg++){
  1089         -    rc = btNewBuffer(pDb, &apOut[iPg]);
         1061  +  for(iPg=0; iPg<ctx.nOut; iPg++){
         1062  +    rc = btNewBuffer(pDb, &ctx.apOut[iPg]);
  1090   1063       if( rc!=SQLITE4_OK ) goto rebalance_out;
  1091         -    memset(apOut[iPg] + pgsz-6, 0, 6);
         1064  +    memset(ctx.apOut[iPg] + pgsz-6, 0, 6);
  1092   1065     }
  1093   1066   
  1094   1067   
  1095         -  memset(&ctx, 0, sizeof(ctx));
  1096         -  ctx.pCsr = pCsr;
  1097         -  ctx.pKV = pKV;
  1098         -  ctx.nPg = nIn;
  1099         -  ctx.apPg = apPg;
  1100         -  ctx.pgsz = pgsz;
  1101         -  ctx.bLeaf = 1; /* todo */
  1102         -
  1103         -  ctx.anOut = anOut;
  1104         -  ctx.nOut = nOut;
  1105         -  ctx.apOut = apOut;
  1106   1068     rc = btBalanceVisitCells(&ctx, btBalanceOutput);
  1107   1069   
  1108         -#if 0
  1109         -  /* Populate buffers for the output leaves */
  1110         -  iPgOut = 0;                     /* Index of current output page */
  1111         -  iPgIn = 0;                      /* Index of current input page */
  1112         -  iPgInFirst = 0;
  1113         -  for(iCell=0; iCell<nCell; iCell++){
  1114         -    int iOff;                     /* Output page offset */
  1115         -    u8 *aOut;                     /* Output page buffer */
  1116         -
  1117         -    if( iCell==anOut[iPg] ) iPg++;
  1118         -    aOut = apOut[iPg];
  1119         -    iOff = btFreeOffset(aOut, pgsz);
  1120         -
  1121         -    if( iCell==iIns ){
  1122         -      iOff += btKVCellWrite(pKV, pgsz, &aOut[iOff]);
  1123         -    }else{
  1124         -      u8 *aIn = sqlite4BtPageData(apPg[iPgIn]);
  1125         -      int iPgCell = iCell - iPgInFirst;
  1126         -      if( iPgCell>=btCellCount(aIn, pgsz) ){
  1127         -        iPgIn++;
  1128         -        iPgInFirst = iCell;
  1129         -        iPgCell = 0;
  1130         -        aIn = sqlite4BtPageData(apPg[iPgIn]);
  1131         -      }
  1132         -      memcpy(&aOut[iOff], btCellFind(aIn, pgsz, iPgCell), anCellSz[iCell]);
  1133         -      iOff += anCellSz[iCell];
  1134         -    }
  1135         -    btPutU16(&aOut[pgsz-6], iOff);
  1136         -  }
  1137         -  for(iPg=0; iPg<nOut; iPg++){
  1138         -    u8 *aData = apOut[iPg];
  1139         -    btPutU16(&aData[pgsz-2], anOut[iPg] - (iPg==0 ? 0 : anOut[iPg-1]));
  1140         -  }
  1141         -#endif
  1142         -
  1143   1070     /* Clobber the old pages with the new buffers */
  1144         -  for(iPg=0; iPg<nOut; iPg++){
  1145         -    if( iPg>=nIn ){
  1146         -      rc = sqlite4BtPageAllocate(pDb->pPager, &apPg[iPg]);
         1071  +  for(iPg=0; iPg<ctx.nOut; iPg++){
         1072  +    if( iPg>=ctx.nIn ){
         1073  +      rc = sqlite4BtPageAllocate(pDb->pPager, &ctx.apPg[iPg]);
  1147   1074         if( rc!=SQLITE4_OK ) goto rebalance_out;
  1148   1075       }
  1149         -    btSetBuffer(pDb, apPg[iPg], apOut[iPg]);
  1150         -    apOut[iPg] = 0;
         1076  +    btSetBuffer(pDb, ctx.apPg[iPg], ctx.apOut[iPg]);
         1077  +    ctx.apOut[iPg] = 0;
  1151   1078     }
  1152   1079   
  1153   1080   #ifndef NDEBUG
  1154   1081     {
  1155   1082       int iDbg;
  1156         -    for(iDbg=0; iDbg<nOut; iDbg++){
  1157         -      u8 *aData = sqlite4BtPageData(apPg[iDbg]);
  1158         -      printPage(stderr, sqlite4BtPagePgno(apPg[iDbg]), aData, pgsz);
         1083  +    for(iDbg=0; iDbg<ctx.nOut; iDbg++){
         1084  +      u8 *aData = sqlite4BtPageData(ctx.apPg[iDbg]);
         1085  +      printPage(stderr, sqlite4BtPagePgno(ctx.apPg[iDbg]), aData, pgsz);
  1159   1086       }
  1160   1087     }
  1161   1088   #endif
  1162   1089   
  1163   1090     /* The leaves are written. Now gather the keys and page numbers to
  1164   1091     ** push up into the parent page.  */ 
  1165         -  for(iPg=0; iPg<(nOut-1); iPg++){
  1166         -    u8 *aData = sqlite4BtPageData(apPg[iPg]);
         1092  +  for(iPg=0; iPg<(ctx.nOut-1); iPg++){
         1093  +    u8 *aData = sqlite4BtPageData(ctx.apPg[iPg]);
  1167   1094       u8 *pCell;
  1168   1095   
  1169   1096       pCell = btCellFind(aData, pgsz, btCellCount(aData, pgsz)-1);
  1170         -    aPCell[iPg].pgno = sqlite4BtPagePgno(apPg[iPg]);
         1097  +    aPCell[iPg].pgno = sqlite4BtPagePgno(ctx.apPg[iPg]);
  1171   1098       pCell += sqlite4BtVarintGet32(pCell, &aPCell[iPg].nK);
  1172   1099       aPCell[iPg].pK = pCell;
  1173   1100     }
  1174   1101   
  1175         -  assert( nIn==1 && nOut==2 );
  1176         -  rc = btSetChildPgno(pDb, pPar, iSib+nIn-1, sqlite4BtPagePgno(apPg[nOut-1]));
  1177         -  if( rc!=SQLITE4_OK ) goto rebalance_out;
         1102  +  rc = btSetChildPgno(
         1103  +      pDb, pPar, iSib+ctx.nIn-1, sqlite4BtPagePgno(ctx.apPg[ctx.nOut-1])
         1104  +  );
         1105  +  if( rc==SQLITE4_OK ){
         1106  +    pCsr->nPg--;
         1107  +    rc = btDeleteFromPage(pCsr, ctx.nIn-1);
         1108  +  }
         1109  +  if( rc==SQLITE4_OK ){
         1110  +    rc = btInsertAndBalance(pCsr, ctx.nOut-1, aPCell);
         1111  +  }
  1178   1112   
  1179         -  pCsr->nPg--;
  1180         -  rc = btModifyPage(pCsr, nIn-1, nOut-1, aPCell);
  1181         -  if( rc!=SQLITE4_OK ) goto rebalance_out;
         1113  +#ifndef NDEBUG
         1114  +  {
         1115  +    u8 *aData = sqlite4BtPageData(pPar);
         1116  +    printPage(stderr, sqlite4BtPagePgno(pPar), aData, pgsz);
         1117  +  }
         1118  +#endif
  1182   1119   
  1183   1120    rebalance_out:
  1184         -  for(iPg=0; iPg<nIn; iPg++){
  1185         -    sqlite4BtPageRelease(apPg[iPg]);
         1121  +  for(iPg=0; iPg<ctx.nIn; iPg++){
         1122  +    sqlite4BtPageRelease(ctx.apPg[iPg]);
  1186   1123     }
  1187   1124     return rc;
  1188   1125   }
  1189   1126   
  1190   1127   static int btExtendTree(bt_cursor *pCsr){
  1191   1128     bt_db * const pDb = pCsr->pDb;
  1192   1129     const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
................................................................................
  1230   1167   **       deleted from the page.
  1231   1168   **
  1232   1169   **     * nKV entries are inserted in their place.
  1233   1170   **
  1234   1171   ** The tree balancing routine is called if this causes the page to
  1235   1172   ** become either overfull or to contain no entries at all.
  1236   1173   */
  1237         -static int btModifyPage(
         1174  +static int btInsertAndBalance(
  1238   1175     bt_cursor *pCsr,                /* Cursor identifying page to modify */
  1239         -  int nDel,                       /* Number of entries to delete from page */
  1240   1176     int nKV,                        /* Number of entries in apKV */
  1241   1177     KeyValue *apKV                  /* New cells to insert into the page */
  1242   1178   ){
  1243   1179     int rc = SQLITE4_OK;
  1244   1180     const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  1245   1181     u8 *aData;                      /* Page buffer */
  1246   1182     int nCell;                      /* Number of cells on this page already */
  1247   1183     int nFree;                      /* Contiguous free space on this page */
  1248         -  int nReq;                       /* Space required for type (a) cell */
         1184  +  int nReq = 0;                   /* Space required for type (a) cells */
  1249   1185     int iCell;                      /* Position to insert new key */
  1250   1186     int iWrite;                     /* Byte offset at which to write new cell */
         1187  +  int i;
  1251   1188     BtPage *pLeaf;
  1252         -  KeyValue *pKV;
  1253         -
  1254         -  assert( nDel==0 && nKV==1 );
  1255         -  pKV = apKV;
  1256   1189   
  1257   1190     /* Bytes of space required on the current page. */
  1258         -  nReq = btKVCellSize(pKV, pgsz) + 2;
         1191  +  for(i=0; i<nKV; i++){
         1192  +    nReq += btKVCellSize(&apKV[i], pgsz) + 2;
         1193  +  }
  1259   1194   
  1260   1195     iCell = pCsr->aiCell[pCsr->nPg-1];
  1261   1196     assert( pCsr->nPg>0 );
  1262   1197     pLeaf = pCsr->apPage[pCsr->nPg-1];
  1263   1198     aData = (u8*)sqlite4BtPageData(pLeaf);
  1264   1199   
  1265   1200     nCell = btCellCount(aData, pgsz);
................................................................................
  1282   1217       }
  1283   1218   
  1284   1219       iWrite = btFreeOffset(aData, pgsz);
  1285   1220       nFree = btFreeContiguous(aData, pgsz);
  1286   1221     }
  1287   1222   
  1288   1223     if( nFree>=nReq ){
  1289         -    /* The new entry will fit on the leaf page. So in this case all there
         1224  +    /* The new entry will fit on the page. So in this case all there
  1290   1225       ** is to do is update this single page. The easy case. */
  1291   1226       rc = sqlite4BtPageWrite(pLeaf);
  1292   1227       if( rc==SQLITE4_OK ){
  1293   1228         aData = sqlite4BtPageData(pLeaf);
  1294   1229   
  1295         -      /* Update the cell pointer array */
         1230  +      /* Make space within the cell pointer array */
  1296   1231         if( iCell!=nCell ){
  1297   1232           u8 *aFrom = btCellPtrFind(aData, pgsz, nCell-1);
  1298   1233           u8 *aTo = btCellPtrFind(aData, pgsz, nCell);
  1299   1234           memmove(aTo, aFrom, (nCell-iCell) * 2);
  1300   1235         }
  1301         -      btPutU16(btCellPtrFind(aData, pgsz, iCell), iWrite);
  1302   1236   
  1303         -      /* Increase cell count */
  1304         -      btPutU16(&aData[pgsz-2], nCell+1);
         1237  +      for(i=0; i<nKV; i++){
         1238  +        /* Write the cell pointer */
         1239  +        btPutU16(btCellPtrFind(aData, pgsz, iCell+i), iWrite);
  1305   1240         
  1306         -      /* Write the cell itself */
  1307         -      iWrite += btKVCellWrite(pKV, pgsz, &aData[iWrite]);
         1241  +        /* Write the cell itself */
         1242  +        iWrite += btKVCellWrite(&apKV[i], pgsz, &aData[iWrite]);
         1243  +      }
  1308   1244   
  1309   1245         /* Set the new total free space */
  1310   1246         if( nCell==0 ){
  1311   1247           btPutU16(&aData[pgsz-4], nFree - nReq);
  1312   1248         }else{
  1313   1249           btPutU16(&aData[pgsz-4], btFreeSpace(aData, pgsz) - nReq);
  1314   1250         }
  1315   1251   
         1252  +      /* Increase cell count */
         1253  +      btPutU16(&aData[pgsz-2], nCell+nKV);
         1254  +
  1316   1255         /* Set the offset to the block of empty space */
  1317   1256         btPutU16(&aData[pgsz-6], iWrite);
  1318   1257       }
         1258  +
  1319   1259     }else{
  1320   1260       /* The new entry will not fit on the leaf page. Entries will have
  1321   1261       ** to be shuffled between existing leaves and new leaves may need
  1322   1262       ** to be added to make space for it. */
  1323   1263       if( pCsr->nPg==1 ){
  1324   1264         rc = btExtendTree(pCsr);
  1325   1265       }
  1326   1266       if( rc==SQLITE4_OK ){
  1327         -      rc = btInsertAndRebalance(pCsr, pKV);
         1267  +      rc = btBalance(pCsr, nKV, apKV);
  1328   1268       }
  1329   1269     }
  1330   1270   
  1331   1271     return rc;
  1332   1272   }
  1333   1273   
  1334         -static int btRemoveFromLeaf(bt_cursor *pCsr){
         1274  +static int btDeleteFromPage(bt_cursor *pCsr, int nDel){
  1335   1275     const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  1336   1276     int rc = SQLITE4_OK;            /* Return code */
  1337   1277     BtPage *pLeaf;                  /* Leaf page */
  1338   1278   
  1339   1279     pLeaf = pCsr->apPage[pCsr->nPg-1];
  1340   1280     rc = sqlite4BtPageWrite(pLeaf);
  1341   1281     if( rc==SQLITE4_OK ){
         1282  +    int i;                        /* Used to iterate through cells to delete */
  1342   1283       u8 *aData;                    /* Page buffer */
  1343   1284       int nCell;                    /* Number of cells initially on this page */
  1344   1285       int iDel;                     /* Index of cell to delete */
  1345         -    int nByte;                    /* Size of cell to delete in bytes */
  1346         -
         1286  +    int nFreed = 0;               /* Total bytes of space freed */
  1347   1287   
  1348   1288       iDel = pCsr->aiCell[pCsr->nPg-1];
  1349   1289       aData = (u8*)sqlite4BtPageData(pLeaf);
  1350   1290       nCell = btCellCount(aData, pgsz);
  1351         -    btCellFindSize(aData, pgsz, iDel, &nByte);
         1291  +
         1292  +    for(i=iDel; i<(iDel+nDel); i++){
         1293  +      int nByte;
         1294  +      btCellFindSize(aData, pgsz, i, &nByte);
         1295  +      nFreed += nByte + 2;
         1296  +    }
  1352   1297   
  1353         -    if( iDel<(nCell-1) ){
  1354         -      u8 *aTo = btCellPtrFind(aData, pgsz, nCell-2);
         1298  +    if( (iDel+nDel)<nCell ){
         1299  +      u8 *aTo = btCellPtrFind(aData, pgsz, nCell-1-nDel);
  1355   1300         u8 *aFrom = btCellPtrFind(aData, pgsz, nCell-1);
  1356         -      memmove(aTo, aFrom, 2*(nCell-iDel-1));
         1301  +      memmove(aTo, aFrom, 2*(nCell-(iDel+nDel)));
  1357   1302       }
  1358   1303   
  1359   1304       /* Decrease cell count */
  1360         -    btPutU16(&aData[pgsz-2], nCell-1);
         1305  +    btPutU16(&aData[pgsz-2], nCell-nDel);
  1361   1306   
  1362   1307       /* Increase total free space */
  1363         -    btPutU16(&aData[pgsz-4], btFreeSpace(aData, pgsz) + nByte + 2);
         1308  +    btPutU16(&aData[pgsz-4], btFreeSpace(aData, pgsz) + nFreed);
  1364   1309     }
  1365   1310     
  1366   1311     return rc;
  1367   1312   }
  1368   1313   
  1369   1314   /*
  1370   1315   ** Insert a new key/value pair or replace an existing one.
................................................................................
  1388   1333   
  1389   1334     if( rc==SQLITE4_NOTFOUND || rc==SQLITE4_INEXACT ){
  1390   1335       /* Insert the new KV pair into the current leaf. */
  1391   1336       KeyValue kv;
  1392   1337       kv.pgno = 0;
  1393   1338       kv.pK = pK; kv.nK = nK;
  1394   1339       kv.pV = pV; kv.nV = nV;
  1395         -    rc = btModifyPage(&csr, 0, 1, &kv);
         1340  +    rc = btInsertAndBalance(&csr, 1, &kv);
  1396   1341     }
  1397   1342   
  1398   1343     return rc;
  1399   1344   }
  1400   1345   
  1401   1346   int sqlite4BtDelete(bt_cursor *pCsr){
  1402         -  return btRemoveFromLeaf(pCsr);
         1347  +  return btDeleteFromPage(pCsr, 1);
  1403   1348   }
  1404   1349   
  1405   1350   int sqlite4BtSetCookie(bt_db *db, unsigned int iVal){
  1406   1351     return sqlite4BtPagerSetCookie(db->pPager, iVal);
  1407   1352   }
  1408   1353   
  1409   1354   int sqlite4BtGetCookie(bt_db *db, unsigned int *piVal){
  1410   1355     return sqlite4BtPagerGetCookie(db->pPager, piVal);
  1411   1356   }
  1412   1357   

Changes to test/simple3.test.

    83     83     SELECT a, length(b) FROM t1 
    84     84   } {1 200  3 200  4 200  5 200}
    85     85   
    86     86   do_execsql_test 2.4 {
    87     87     INSERT INTO t1 VALUES(6, $val);
    88     88   }
    89     89   
    90         -breakpoint
    91     90   do_execsql_test 2.5 { 
    92     91     SELECT a, length(b) FROM t1 
    93     92   } {1 200  3 200  4 200  5 200  6 200}
           93  +
           94  +#--------------------------------------------------------------------------
           95  +do_test 3.0 {
           96  +  catch { db close }
           97  +  forcedelete test.db
           98  +  sqlite4 db file:test.db?kv=bt
           99  +} {}
          100  +
          101  +do_execsql_test 3.1 {
          102  +  CREATE TABLE t1(a PRIMARY KEY, b);
          103  +}
          104  +
          105  +for {set i 0} {$i < 100} {incr i} {
          106  +if {$i==6} breakpoint
          107  +  lappend rows $i
          108  +  do_execsql_test 3.2.$i {
          109  +    INSERT INTO t1 VALUES($i, randomblob(200));
          110  +    SELECT a FROM t1 ORDER BY a;
          111  +  } $rows
          112  +}
          113  +
    94    114   
    95    115   finish_test
    96    116