SQLite4
Check-in [885387b919]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
SHA1 Hash:885387b9190a323b8c8dc1887945ecd06f3d624b
Date: 2014-01-08 20:29:52
User: dan
Comment:Fill in more merging code. Fix many bugs.
Tags And Properties
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/bt_main.c

201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
...
531
532
533
534
535
536
537
538
539
540
541
542
543
544

545
546
547
548
549
550
551
552
...
679
680
681
682
683
684
685

































































































686
687
688
689
690
691
692
....
1388
1389
1390
1391
1392
1393
1394







1395
1396
1397
1398
1399
1400
1401
....
1425
1426
1427
1428
1429
1430
1431








1432
1433
1434
1435
1436
1437
1438
....
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
....
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
....
3632
3633
3634
3635
3636
3637
3638

3639
3640
3641
3642
3643
3644
3645

3646
3647
3648
3649
3650
3651
3652
....
3668
3669
3670
3671
3672
3673
3674




3675
3676
3677
3678
3679
3680
3681
....
3683
3684
3685
3686
3687
3688
3689




3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704


3705
3706
3707
3708

3709
3710
3711


3712
3713
3714
3715
3716
3717
3718




3719




3720
3721
3722
3723
3724
3725
3726
3727
....
3752
3753
3754
3755
3756
3757
3758



3759
3760

3761


3762
3763
3764


3765
3766
3767
3768
3769
3770
3771
3772
3773











3774
3775
3776
3777
3778
3779
3780
3781
3782
3783







3784
3785
3786
3787
3788
3789
3790
....
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
....
4066
4067
4068
4069
4070
4071
4072


4073
4074
4075
4076
4077
4078
4079
....
4081
4082
4083
4084
4085
4086
4087

4088
4089
4090
4091
4092
4093
4094
....
4106
4107
4108
4109
4110
4111
4112




4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
}
#define btPutU32(x,y) sqlite4BtPutU32(x,y)

/*
** Allocate a new database handle.
*/
int sqlite4BtNew(sqlite4_env *pEnv, int nExtra, bt_db **ppDb){
  static const int MIN_MERGE = 4;
  static const int SCHEDULE_ALLOC = 1;

  bt_db *db = 0;                  /* New database object */
  BtPager *pPager = 0;            /* Pager object for this database */
  int nReq;                       /* Bytes of space required for bt_db object */
  int rc;                         /* Return code */

  nReq = sizeof(bt_db);
................................................................................
static void btPageToAscii(
  u32 pgno,                       /* Page number */
  int bAscii,                     /* True to print keys and values as ASCII */
  BtPager *pPager,                /* Pager object (or NULL) */
  u8 *aData, int nData,           /* Buffer containing page data */
  sqlite4_buffer *pBuf            /* Output buffer */
){
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(pPager);
  int i;
  int nCell = (int)btCellCount(aData, nData);
  u8 flags = btFlags(aData);      /* Page flags */

  sqlite4BtBufAppendf(pBuf, "Page %d: ", pgno);


  if( pgno==pHdr->iSRoot ){
    int i;
    BtSchedule s;
    sqlite4BtBufAppendf(pBuf, "(schedule page) ");
    btReadSchedule(0, aData, &s);

    sqlite4BtBufAppendf(pBuf, "  eBusy=(%s)\n",
        s.eBusy==BT_SCHEDULE_EMPTY ? "empty" :
................................................................................
  return 0;
}

/*
** Debugging aid: dump page pPg to stderr using printPage(). The page
** number and data pointer are taken from the page handle; the page size
** handed to printPage() is fixed at 1024 bytes. Always returns 0.
*/
int printPgToStderr(BtPage *pPg){
  enum { DUMP_PGSZ = 1024 };      /* Page size passed to printPage() */

  printPage(
      stderr, sqlite4BtPagePgno(pPg), sqlite4BtPageData(pPg), DUMP_PGSZ
  );
  return 0;
}

































































































#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
** direction.
*/
static int fiCsrStep(FiCursor *pCsr){
  int rc = SQLITE4_OK;
  int bNext = (0!=(pCsr->base.flags & CSR_NEXT_OK));
  const void *pKey; int nKey;     /* Current key that cursor points to */
  int i;








  assert( pCsr->base.flags & (CSR_NEXT_OK | CSR_PREV_OK) );
  assert( pCsr->iBt>=0 );

  do{
    /* Load the current key in to pKey/nKey. Then advance all sub-cursors 
    ** that share a key with the current sub-cursor. */
................................................................................
    }

    /* Figure out a new current bt cursor */
    if( rc==SQLITE4_OK ){
      rc = fiCsrSetCurrent(pCsr);
    }
  }while( rc==SQLITE4_OK && fiCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr) );









  return rc;
}

typedef struct FiLevelIter FiLevelIter;
struct FiLevelIter {
  /* Used internally */
................................................................................
};

static int fiLevelIterNext(FiLevelIter *p){
  u16 iMin, nLevel;

  p->iSub++;
  p->iLvl--;
  btReadSummary(p->aSum, 0, &iMin, &nLevel, 0);
  while( p->iLvl<(int)iMin ){
    p->iAge++;
    if( p->iAge>=(p->nSum)/6 ) return 1;
    btReadSummary(p->aSum, p->iAge, &iMin, &nLevel, 0);
    p->iLvl = (int)iMin + (int)nLevel - 1;
  }

................................................................................
          btCsrReset(&pSub->csr, 1);
          btCsrSetup(db, iRoot, &pSub->csr);

          rc = btCsrSeek(&pSub->csr, 0, pK, nK, BT_SEEK_EQ, BT_CSRSEEK_SEEK);
          assert( rc!=SQLITE4_INEXACT );
          if( rc!=SQLITE4_NOTFOUND ){
            /* A hit on the requested key or an error has occurred. Either
             ** way, break out of the loop. If this is a hit, set iBt to
             ** zero so that the BtCsrKey() and BtCsrData() routines know
             ** to return data from the first (only) sub-cursor. */
            assert( pCsr->iBt<0 );
            if( rc==SQLITE4_OK ){
              if( 0==fiCsrIsDelete(&pSub->csr) ){
                pCsr->iBt = 0;
              }else{
                rc = SQLITE4_NOTFOUND;
              }
................................................................................
        *piMaxLevel = (u32)(iMin + nLevel - 1);
        *piAge = iBestAge;

        /* Find the output level */
        btReadSummary(aNew, iBestAge+1, &iMin, &nLevel, &iMerge);
        *piOutLevel = iMin + nLevel;


        /* Update the summary record for the output segment. */
        btWriteSummary(aNew, iBestAge+1, iMin, nLevel+1, iMerge);

        /* Write the updated summary record back to the db. */
        rc = btReplaceEntry(
             db, pHdr->iMRoot, aSummaryKey, sizeof(aSummaryKey), aNew, nByte
        );

      }
    }
  }

  btCsrReset(&csr, 1);
  return rc;
}
................................................................................
  BtCursor mcsr;                  /* Cursor for reading the meta-tree */
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;





  memset(&csr, 0, sizeof(csr));
  memset(&mcsr, 0, sizeof(mcsr));
  btCsrSetup(db, pHdr->iMRoot, &mcsr);
  sqlite4_buffer_init(&buf, 0);
  
  if( p->iNextPg ){
................................................................................
    rc = btCsrEnd(&csr, 0);
    if( rc==SQLITE4_OK ){
      csr.aiCell[0] = p->iNextCell;
      rc = btCsrKey(&csr, &pKey, &nKey);
    }
  }





  for(iLvl=p->iMinLevel; iLvl<=p->iMaxLevel; iLvl++){
    u8 aPrefix[8];
    u32 iRoot = 0;
    btPutU32(&aPrefix[0], p->iAge);
    btPutU32(&aPrefix[4], ~iLvl);

    rc = btCsrSeek(&mcsr, 0, aPrefix, sizeof(aPrefix), BT_SEEK_GE, 0);
    if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
    while( rc==SQLITE4_OK && iRoot==0 ){
      const u8 *pMKey; int nMKey;
      rc = btCsrKey(&mcsr, (const void **)&pMKey, &nMKey);

      if( rc==SQLITE4_OK ){
        if( nMKey<sizeof(aPrefix) || memcmp(aPrefix, pMKey, sizeof(aPrefix)) ){
          rc = SQLITE4_NOTFOUND;


        }
      }
      if( rc==SQLITE4_OK ){
        if( pKey ){

          int res = btKeyCompare(pMKey + 8, nMKey - 8, pKey, nKey);
          if( res<=0 ){
            const void *pData; int nData;


            btCsrData(&mcsr, 0, 4, &pData, &nData);
            iRoot = btGetU32(pData);
          }
        }
        rc = sqlite4BtDelete(&mcsr.base);
      }
    }









    if( rc==SQLITE4_OK && iRoot ){
      int n = sizeof(aPrefix) + nKey;
      rc = sqlite4_buffer_resize(&buf, n);
      if( rc==SQLITE4_OK ){
        u8 aData[4];
        u8 *a = (u8*)buf.p;
        memcpy(a, aPrefix, sizeof(aPrefix));
        memcpy(&a[sizeof(aPrefix)], pKey, nKey);
................................................................................
      btPutU32(&a[4], ~p->iOutLevel);
      memcpy(&a[8], pKey, nKey);
      btPutU32(aData, p->aRoot[iBlk]);
      rc = btReplaceEntry(db, pHdr->iMRoot, a, nKey+8, aData, sizeof(aData));
    }
  }




  btCsrReset(&csr, 1);
  rc = fiLoadSummary(db, &csr, &aSum, &nSum);

  if( rc==SQLITE4_OK ){


    u16 iMinLevel = 0;
    u16 nLevel = 0;
    u16 iMergeLevel = 0;


    if( nSum>(6*(p->iAge+1)) ){
      btReadSummary(aSum, p->iAge+1, &iMinLevel, &nLevel, &iMergeLevel);
    }
    if( (iMinLevel+nLevel)>p->iOutLevel ){
      rc = sqlite4_buffer_resize(&buf, MAX(nSum, (p->iOutLevel+1)*6));
      if( rc==SQLITE4_OK ){
        nLevel = p->iOutLevel - iMinLevel + 1;
        memcpy(buf.p, aSum, nSum);
        btWriteSummary((u8*)buf.p, p->iAge+1, iMinLevel, nLevel, iMergeLevel);











        rc = btReplaceEntry(
             db, pHdr->iMRoot, aSummaryKey, sizeof(aSummaryKey), buf.p, buf.n
        );
      }
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
  sqlite4_buffer_clear(&buf);







  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................
  /* Check if the schedule page is busy. If so, no new merge may be 
  ** scheduled. If the schedule page is not busy, call btFindMerge() to
  ** figure out which levels should be scheduled for merge.  */
  if( rc==SQLITE4_OK ){
    aData = sqlite4BtPageData(pPg);
    
    switch( btGetU32(aData) ){
      case 555:
      case BT_SCHEDULE_BUSY:
        rc = SQLITE4_NOTFOUND;
        break;

      case BT_SCHEDULE_DONE: {
        BtSchedule s;
        rc = btReadSchedule(db, aData, &s);
        if( rc==SQLITE4_OK ){
          rc = btIntegrateMerge(db, &s);
        }
        if( rc==SQLITE4_OK ){
          s.eBusy = 555;
          btWriteSchedulePage(pPg, &s, &rc);
          rc = SQLITE4_NOTFOUND;
        }
        break;
      }

      default: /* BT_SCHEDULE_EMPTY */
        break;
    }
................................................................................
**
** No page number is assigned at this point.
*/
static int fiWriterAlloc(FiWriter *p, int bLeaf, FiWriterPg *pPg){
  int rc = SQLITE4_OK;

  assert( pPg->bAllocated==0 );


  pPg->bAllocated = 1;
  p->nAlloc++;
  pPg->nCell = 0;

  if( pPg->aBuf==0 ){
    rc = btNewBuffer(p->db, &pPg->aBuf);
  }
................................................................................
    if( bLeaf ){
      pPg->iFree = 1;
      pPg->aBuf[0] = 0x00;
    }else{
      pPg->iFree = 5;
      pPg->aBuf[0] = BT_PGFLAGS_INTERNAL;
    }

  }
  return rc;
}

static int fiWriterPush(FiWriter *p, const u8 *pKey, int nKey){
  int rc = SQLITE4_OK;
  int i;                          /* Iterator variable */
................................................................................
    /* Check if the key will fit on page FiWriter.aHier[iIns]. If so,
    ** break out of the loop. */
    int nFree;
    pPg = &p->aHier[iIns];
    nFree = p->pgsz - (pPg->iFree + 6 + pPg->nCell*2);
    if( nFree>=nByte ) break;
  }




  if( iIns==p->nHier ){
    p->nHier = iIns+1;
    rc = fiWriterAlloc(p, 0, &p->aHier[iIns]);
  }

  if( rc==SQLITE4_OK && iIns==nMaxAlloc ){
    rc = BT_BLOCKFULL;
  }

  for(i=0; rc==SQLITE4_OK && i<iIns; i++){
    pPg = &p->aHier[i];
    if( i!=0 ){
      btPutU32(&pPg->aBuf[1], pgno);
    }
    rc = fiWriterFlush(p, pPg, &pgno);
    if( rc==SQLITE4_OK ){







|
|







 







|






>
|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>







 







|







 







|
|
|







 







>







>







 







>
>
>
>







 







>
>
>
>








|






>
>




>

|
<
>
>
|
|
<



|
>
>
>
>
|
>
>
>
>
|







 







>
>
>
|
|
>

>
>
|
|
|
>
>
|
|
|
|
<
<

<

>
>
>
>
>
>
>
>
>
>
>
|
|
|
<






>
>
>
>
>
>
>







 







<











|

<







 







>
>







 







>







 







>
>
>
>
|




<
<
<
<







201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
...
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
...
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
....
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
....
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
....
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
....
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
....
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
....
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
....
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836

3837
3838
3839
3840

3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
....
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910


3911

3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926

3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
....
3975
3976
3977
3978
3979
3980
3981

3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994

3995
3996
3997
3998
3999
4000
4001
....
4220
4221
4222
4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
4233
4234
4235
....
4237
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249
4250
4251
....
4263
4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278




4279
4280
4281
4282
4283
4284
4285
}
#define btPutU32(x,y) sqlite4BtPutU32(x,y)

/*
** Allocate a new database handle.
*/
int sqlite4BtNew(sqlite4_env *pEnv, int nExtra, bt_db **ppDb){
  static const int MIN_MERGE = 2;
  static const int SCHEDULE_ALLOC = 4;

  bt_db *db = 0;                  /* New database object */
  BtPager *pPager = 0;            /* Pager object for this database */
  int nReq;                       /* Bytes of space required for bt_db object */
  int rc;                         /* Return code */

  nReq = sizeof(bt_db);
................................................................................
static void btPageToAscii(
  u32 pgno,                       /* Page number */
  int bAscii,                     /* True to print keys and values as ASCII */
  BtPager *pPager,                /* Pager object (or NULL) */
  u8 *aData, int nData,           /* Buffer containing page data */
  sqlite4_buffer *pBuf            /* Output buffer */
){
  BtDbHdr *pHdr = 0;
  int i;
  int nCell = (int)btCellCount(aData, nData);
  u8 flags = btFlags(aData);      /* Page flags */

  sqlite4BtBufAppendf(pBuf, "Page %d: ", pgno);

  if( pPager ) pHdr = sqlite4BtPagerDbhdr(pPager);
  if( pHdr && pgno==pHdr->iSRoot ){
    int i;
    BtSchedule s;
    sqlite4BtBufAppendf(pBuf, "(schedule page) ");
    btReadSchedule(0, aData, &s);

    sqlite4BtBufAppendf(pBuf, "  eBusy=(%s)\n",
        s.eBusy==BT_SCHEDULE_EMPTY ? "empty" :
................................................................................
  return 0;
}

/*
** Debugging aid: dump page pPg to stderr using printPage(). The page
** number and data pointer are taken from the page handle; the page size
** handed to printPage() is fixed at 1024 bytes. Always returns 0.
*/
int printPgToStderr(BtPage *pPg){
  enum { DUMP_PGSZ = 1024 };      /* Page size passed to printPage() */

  printPage(
      stderr, sqlite4BtPagePgno(pPg), sqlite4BtPageData(pPg), DUMP_PGSZ
  );
  return 0;
}

/*
** Debugging aid: render the page with page number pHdr->iMRoot (the root
** page used for the meta-tree elsewhere in this file) as text, using
** btPageToAscii(), and write the result to stderr.
**
** Parameter bAscii is passed through unchanged to btPageToAscii(). The
** page size passed is pHdr->pgsz. If no page is handed back by the pager,
** nothing is printed.
*/
static void btPrintMetaTree(BtPager *pPager, int bAscii, BtDbHdr *pHdr){
  u8 *aData;
  int nData;
  sqlite4_buffer buf;
  BtPage *pPg = 0;

  /* pPg was initialized to 0 above. If it is still 0 after this call then
  ** no page reference was obtained, so there is nothing to print and
  ** nothing to release. Bail out before touching the page or the buffer. */
  sqlite4BtPageGet(pPager, pHdr->iMRoot, &pPg);
  if( pPg==0 ) return;

  aData = sqlite4BtPageData(pPg);
  nData = pHdr->pgsz;
  sqlite4_buffer_init(&buf, 0);
  btPageToAscii(pHdr->iMRoot, bAscii, pPager, aData, nData, &buf);

  /* Append a single 0x00 byte so that buf.p can be printed with "%s". */
  sqlite4_buffer_append(&buf, "", 1);

  fprintf(stderr, "%s", (char*)buf.p);
  sqlite4_buffer_clear(&buf);
  sqlite4BtPageRelease(pPg);
}
#endif

static int btCsrEnd(BtCursor *pCsr, int bLast);
static int btCsrStep(BtCursor *pCsr, int bNext);
static int btCsrKey(BtCursor *pCsr, const void **ppK, int *pnK);

#ifndef NDEBUG
/*
** This function is really just a big assert() statement. It contributes
** nothing to the operation of the library.
**
** The assert() fails if the summary-record is not consistent with the
** actual contents of the meta-tree.
**
** The check works as follows:
**
**   1. The summary record is loaded with fiLoadSummary().
**
**   2. The tree rooted at pHdr->iMRoot is scanned, starting at the position
**      established by btCsrEnd(&mcsr, 0) and advancing with
**      btCsrStep(&mcsr, 1). The scan stops when the key equal to
**      aSummaryKey[] is encountered. For every other key, the first four
**      bytes are decoded as an age and the bitwise complement of the next
**      four bytes as a level. The smallest and largest level seen for each
**      age are recorded in aData[].
**
**   3. For each of the array_size(aData) ages, the values recorded in step
**      2 are compared against entry i of the summary record (entries are
**      counted as nSum/6). If any key was seen for the age, the summary's
**      minimum level and level count must match. Otherwise both must be 0.
*/
static void assert_summary_ok(bt_db *db){
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  BtCursor csr;                   /* Cursor used to load summary record */
  BtCursor mcsr;                  /* Cursor used to scan meta-tree */
  const u8 *aSum; int nSum;       /* Current summary record */
  int i;
  int rc;

  struct AgeData {
    int iMinLevel;                /* Smallest level of this age seen */
    int iMaxLevel;                /* Largest level of this age seen */
  } aData[32];

  /* A value of -1 means "no key of this age has been seen yet". */
  for(i=0; i<array_size(aData); i++){
    aData[i].iMinLevel = -1;
    aData[i].iMaxLevel = -1;
  }

  rc = fiLoadSummary(db, &csr, &aSum, &nSum);
  assert( rc==SQLITE4_OK );

  btCsrSetup(db, pHdr->iMRoot, &mcsr);
  for(rc=btCsrEnd(&mcsr, 0); rc==SQLITE4_OK; rc=btCsrStep(&mcsr, 1)){
    const u8 *pKey = 0; int nKey = 0;
    u32 iAge;
    u32 iLevel;
    rc = btCsrKey(&mcsr, (const void **)&pKey, &nKey);
    assert( rc==SQLITE4_OK );

    /* Stop scanning once the summary record itself is reached. */
    if( nKey==array_size(aSummaryKey) && memcmp(aSummaryKey, pKey, nKey)==0 ){
      break;
    }
    assert( nKey>=8 );

    iAge = btGetU32(&pKey[0]);
    iLevel = ~btGetU32(&pKey[4]);

    /* iAge comes straight from the key and is used as an index into the
    ** fixed-size aData[] array below. Fail the assert rather than read or
    ** write past the end of the array. */
    assert( iAge<array_size(aData) );

    if( aData[iAge].iMinLevel<0 || aData[iAge].iMinLevel>iLevel ){
      aData[iAge].iMinLevel = (int)iLevel;
    }
    if( aData[iAge].iMaxLevel<0 || aData[iAge].iMaxLevel<iLevel ){
      aData[iAge].iMaxLevel = (int)iLevel;
    }
  }

  /* The loop above must have exited via the "break" (rc still SQLITE4_OK),
  ** not because a cursor operation returned something else. */
  assert( rc==SQLITE4_OK );

  for(i=0; i<array_size(aData); i++){
    u16 iMin = 0;
    u16 nLevel = 0;

    /* Ages with no entry in the summary record are treated as having
    ** iMin==0 and nLevel==0. */
    if( i<(nSum/6) ){
      btReadSummary(aSum, i, &iMin, &nLevel, 0);
    }
    if( aData[i].iMinLevel>=0 ){
      assert( (int)iMin==aData[i].iMinLevel );
      assert( (int)nLevel==(aData[i].iMaxLevel - aData[i].iMinLevel + 1) );
    }else{
      assert( iMin==0 && nLevel==0 );
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
}
#else
# define assert_summary_ok(x) 
#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
** direction.
*/
static int fiCsrStep(FiCursor *pCsr){
  int rc = SQLITE4_OK;
  int bNext = (0!=(pCsr->base.flags & CSR_NEXT_OK));
  const void *pKey; int nKey;     /* Current key that cursor points to */
  int i;

#ifndef NDEBUG
  sqlite4_buffer buf;
  sqlite4_buffer_init(&buf, 0);
  sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
  sqlite4_buffer_set(&buf, pKey, nKey);
#endif

  assert( pCsr->base.flags & (CSR_NEXT_OK | CSR_PREV_OK) );
  assert( pCsr->iBt>=0 );

  do{
    /* Load the current key in to pKey/nKey. Then advance all sub-cursors 
    ** that share a key with the current sub-cursor. */
................................................................................
    }

    /* Figure out a new current bt cursor */
    if( rc==SQLITE4_OK ){
      rc = fiCsrSetCurrent(pCsr);
    }
  }while( rc==SQLITE4_OK && fiCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr) );

#ifndef NDEBUG
  if( rc==SQLITE4_OK ){
    sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
    assert( btKeyCompare(buf.p, buf.n, pKey, nKey) < 0 );
  }
  sqlite4_buffer_clear(&buf);
#endif

  return rc;
}

typedef struct FiLevelIter FiLevelIter;
struct FiLevelIter {
  /* Used internally */
................................................................................
};

static int fiLevelIterNext(FiLevelIter *p){
  u16 iMin, nLevel;

  p->iSub++;
  p->iLvl--;
  btReadSummary(p->aSum, p->iAge, &iMin, &nLevel, 0);
  while( p->iLvl<(int)iMin ){
    p->iAge++;
    if( p->iAge>=(p->nSum)/6 ) return 1;
    btReadSummary(p->aSum, p->iAge, &iMin, &nLevel, 0);
    p->iLvl = (int)iMin + (int)nLevel - 1;
  }

................................................................................
          btCsrReset(&pSub->csr, 1);
          btCsrSetup(db, iRoot, &pSub->csr);

          rc = btCsrSeek(&pSub->csr, 0, pK, nK, BT_SEEK_EQ, BT_CSRSEEK_SEEK);
          assert( rc!=SQLITE4_INEXACT );
          if( rc!=SQLITE4_NOTFOUND ){
            /* A hit on the requested key or an error has occurred. Either
            ** way, break out of the loop. If this is a hit, set iBt to
            ** zero so that the BtCsrKey() and BtCsrData() routines know
            ** to return data from the first (only) sub-cursor. */
            assert( pCsr->iBt<0 );
            if( rc==SQLITE4_OK ){
              if( 0==fiCsrIsDelete(&pSub->csr) ){
                pCsr->iBt = 0;
              }else{
                rc = SQLITE4_NOTFOUND;
              }
................................................................................
        *piMaxLevel = (u32)(iMin + nLevel - 1);
        *piAge = iBestAge;

        /* Find the output level */
        btReadSummary(aNew, iBestAge+1, &iMin, &nLevel, &iMerge);
        *piOutLevel = iMin + nLevel;

#if 0
        /* Update the summary record for the output segment. */
        btWriteSummary(aNew, iBestAge+1, iMin, nLevel+1, iMerge);

        /* Write the updated summary record back to the db. */
        rc = btReplaceEntry(
             db, pHdr->iMRoot, aSummaryKey, sizeof(aSummaryKey), aNew, nByte
        );
#endif
      }
    }
  }

  btCsrReset(&csr, 1);
  return rc;
}
................................................................................
  BtCursor mcsr;                  /* Cursor for reading the meta-tree */
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;
  
  fprintf(stderr, "BEFORE!\n");
  btPrintMetaTree(db->pPager, 1, pHdr);
  assert_summary_ok(db);

  memset(&csr, 0, sizeof(csr));
  memset(&mcsr, 0, sizeof(mcsr));
  btCsrSetup(db, pHdr->iMRoot, &mcsr);
  sqlite4_buffer_init(&buf, 0);
  
  if( p->iNextPg ){
................................................................................
    rc = btCsrEnd(&csr, 0);
    if( rc==SQLITE4_OK ){
      csr.aiCell[0] = p->iNextCell;
      rc = btCsrKey(&csr, &pKey, &nKey);
    }
  }

  /* The following loop iterates through each of the input levels. Each
  ** level is either removed from the database completely (if the merge
  ** completed) or else modified so that it contains no keys smaller
  ** than (pKey/nKey).  */ 
  for(iLvl=p->iMinLevel; iLvl<=p->iMaxLevel; iLvl++){
    u8 aPrefix[8];
    u32 iRoot = 0;
    btPutU32(&aPrefix[0], p->iAge);
    btPutU32(&aPrefix[4], ~iLvl);

    rc = btCsrSeek(&mcsr, 0, aPrefix, sizeof(aPrefix), BT_SEEK_GE, 0);
    if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
    while( rc==SQLITE4_OK ){
      const u8 *pMKey; int nMKey;
      rc = btCsrKey(&mcsr, (const void **)&pMKey, &nMKey);

      if( rc==SQLITE4_OK ){
        if( nMKey<sizeof(aPrefix) || memcmp(aPrefix, pMKey, sizeof(aPrefix)) ){
          rc = SQLITE4_NOTFOUND;
        }else{
          rc = SQLITE4_OK;
        }
      }
      if( rc==SQLITE4_OK ){
        if( pKey ){
          const void *pData; int nData;
          int res = btKeyCompare(pMKey + 8, nMKey - 8, pKey, nKey);
          if( res>=0 ){

            break;
          }
          btCsrData(&mcsr, 0, 4, &pData, &nData);
          iRoot = btGetU32(pData);

        }
        rc = sqlite4BtDelete(&mcsr.base);
      }

      if( rc==SQLITE4_OK ){
        /* rc = btCsrStep(&mcsr, 1); */
        rc = btCsrSeek(&mcsr, 0, aPrefix, sizeof(aPrefix), BT_SEEK_GE, 0);
        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
      }
    }

    if( rc==SQLITE4_NOTFOUND ){
      rc = SQLITE4_OK;
    }else if( rc==SQLITE4_OK && iRoot ){
      int n = sizeof(aPrefix) + nKey;
      rc = sqlite4_buffer_resize(&buf, n);
      if( rc==SQLITE4_OK ){
        u8 aData[4];
        u8 *a = (u8*)buf.p;
        memcpy(a, aPrefix, sizeof(aPrefix));
        memcpy(&a[sizeof(aPrefix)], pKey, nKey);
................................................................................
      btPutU32(&a[4], ~p->iOutLevel);
      memcpy(&a[8], pKey, nKey);
      btPutU32(aData, p->aRoot[iBlk]);
      rc = btReplaceEntry(db, pHdr->iMRoot, a, nKey+8, aData, sizeof(aData));
    }
  }

  /* Update the summary record with the outcome of the merge operation.
  */
  if( rc==SQLITE4_OK ){
    btCsrReset(&csr, 1);
    rc = fiLoadSummary(db, &csr, &aSum, &nSum);
  }
  if( rc==SQLITE4_OK ){
    rc = sqlite4_buffer_resize(&buf, MAX(nSum, (p->iAge+1+1)*6));
    if( rc==SQLITE4_OK ){
      u16 iMinLevel = 0;
      u16 nLevel = 0;
      u16 iMergeLevel = 0;

      memcpy(buf.p, aSum, nSum);
      if( nSum>(6*(p->iAge+1)) ){
        btReadSummary(aSum, p->iAge+1, &iMinLevel, &nLevel, &iMergeLevel);
      }
      if( (iMinLevel+nLevel)>=p->iOutLevel ){


        nLevel = p->iOutLevel - iMinLevel + 1;

        btWriteSummary((u8*)buf.p, p->iAge+1, iMinLevel, nLevel, iMergeLevel);
      }

      btReadSummary(aSum, p->iAge, &iMinLevel, &nLevel, &iMergeLevel);
      if( p->iNextPg==0 ){
        u16 nNewLevel = nLevel - (1 + p->iMaxLevel - p->iMinLevel);
        iMinLevel = (nNewLevel==0 ? 0 : p->iMaxLevel+1);
        btWriteSummary((u8*)buf.p, p->iAge, iMinLevel, nNewLevel, 0);
      }else{
        btWriteSummary((u8*)buf.p, p->iAge, iMinLevel, nLevel, p->iMaxLevel);
      }

      rc = btReplaceEntry(
          db, pHdr->iMRoot, aSummaryKey, sizeof(aSummaryKey), buf.p, buf.n
      );

    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
  sqlite4_buffer_clear(&buf);

#if 1
  if( rc==SQLITE4_OK ){
    btPrintMetaTree(db->pPager, 1, pHdr);
    assert_summary_ok(db);
  }
#endif
  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................
  /* Check if the schedule page is busy. If so, no new merge may be 
  ** scheduled. If the schedule page is not busy, call btFindMerge() to
  ** figure out which levels should be scheduled for merge.  */
  if( rc==SQLITE4_OK ){
    aData = sqlite4BtPageData(pPg);
    
    switch( btGetU32(aData) ){

      case BT_SCHEDULE_BUSY:
        rc = SQLITE4_NOTFOUND;
        break;

      case BT_SCHEDULE_DONE: {
        BtSchedule s;
        rc = btReadSchedule(db, aData, &s);
        if( rc==SQLITE4_OK ){
          rc = btIntegrateMerge(db, &s);
        }
        if( rc==SQLITE4_OK ){
          s.eBusy = BT_SCHEDULE_EMPTY;
          btWriteSchedulePage(pPg, &s, &rc);

        }
        break;
      }

      default: /* BT_SCHEDULE_EMPTY */
        break;
    }
................................................................................
**
** No page number is assigned at this point.
*/
static int fiWriterAlloc(FiWriter *p, int bLeaf, FiWriterPg *pPg){
  int rc = SQLITE4_OK;

  assert( pPg->bAllocated==0 );
  assert( p->nAlloc<p->nPgPerBlk );

  pPg->bAllocated = 1;
  p->nAlloc++;
  pPg->nCell = 0;

  if( pPg->aBuf==0 ){
    rc = btNewBuffer(p->db, &pPg->aBuf);
  }
................................................................................
    if( bLeaf ){
      pPg->iFree = 1;
      pPg->aBuf[0] = 0x00;
    }else{
      pPg->iFree = 5;
      pPg->aBuf[0] = BT_PGFLAGS_INTERNAL;
    }
    btPutU16(&pPg->aBuf[p->pgsz-2], 0);
  }
  return rc;
}

static int fiWriterPush(FiWriter *p, const u8 *pKey, int nKey){
  int rc = SQLITE4_OK;
  int i;                          /* Iterator variable */
................................................................................
    /* Check if the key will fit on page FiWriter.aHier[iIns]. If so,
    ** break out of the loop. */
    int nFree;
    pPg = &p->aHier[iIns];
    nFree = p->pgsz - (pPg->iFree + 6 + pPg->nCell*2);
    if( nFree>=nByte ) break;
  }

  if( (iIns + (iIns==p->nHier))>=nMaxAlloc ){
    rc = BT_BLOCKFULL;
  }
  if( rc==SQLITE4_OK && iIns==p->nHier ){
    p->nHier = iIns+1;
    rc = fiWriterAlloc(p, 0, &p->aHier[iIns]);
  }





  for(i=0; rc==SQLITE4_OK && i<iIns; i++){
    pPg = &p->aHier[i];
    if( i!=0 ){
      btPutU32(&pPg->aBuf[1], pgno);
    }
    rc = fiWriterFlush(p, pPg, &pgno);
    if( rc==SQLITE4_OK ){