SQLite4
Check-in [9f83998312]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
SHA1 Hash:9f8399831277b31dc3dc471e6c0eac4e88d1ce63
Date: 2014-01-09 20:36:11
User: dan
Comment:Fix even more bugs in merging.
Tags And Properties
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/bt_main.c

119
120
121
122
123
124
125



126
127
128
129
130
131
132
...
698
699
700
701
702
703
704
705
706

707
708
709





710






























711
712
713
714
715
716
717
...
783
784
785
786
787
788
789























790
791

792
793
794
795
796
797
798
....
1447
1448
1449
1450
1451
1452
1453














1454
1455
1456
1457
1458
1459
1460
....
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487

1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
....
1513
1514
1515
1516
1517
1518
1519

1520
1521
1522
1523
1524
1525
1526
....
1557
1558
1559
1560
1561
1562
1563

1564
1565
1566
1567
1568
1569
1570
....
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
....
1649
1650
1651
1652
1653
1654
1655


1656
1657
1658
1659
1660
1661
1662
....
1663
1664
1665
1666
1667
1668
1669
1670
1671




1672
1673
1674
1675
1676
1677
1678
....
1701
1702
1703
1704
1705
1706
1707


1708
1709
1710
1711
1712
1713
1714
....
1717
1718
1719
1720
1721
1722
1723
1724




1725
1726
1727
1728


1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739







1740
1741
1742
1743


1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
....
1834
1835
1836
1837
1838
1839
1840

1841

1842
1843
1844
1845
1846
1847
1848
....
2630
2631
2632
2633
2634
2635
2636

2637
2638
2639
2640
2641
2642
2643
....
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
....
2987
2988
2989
2990
2991
2992
2993

2994
2995
2996
2997
2998
2999
3000
....
3174
3175
3176
3177
3178
3179
3180

3181
3182
3183
3184
3185
3186
3187
3188
....
3191
3192
3193
3194
3195
3196
3197

3198
3199
3200
3201
3202
3203
3204
....
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
....
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
....
3804
3805
3806
3807
3808
3809
3810






3811
3812
3813
3814
3815
3816
3817
....
3956
3957
3958
3959
3960
3961
3962

3963
3964
3965
3966
3967
3968
3969
....
4130
4131
4132
4133
4134
4135
4136

4137
4138
4139
4140
4141
4142
4143
....
4152
4153
4154
4155
4156
4157
4158

4159
4160
4161
4162
4163
4164
4165
....
4461
4462
4463
4464
4465
4466
4467

4468
4469
4470
4471
4472
4473
4474
....
4753
4754
4755
4756
4757
4758
4759
4760




/*
** TODO: Rearrange things so these are not required!
*/
static int fiLoadSummary(bt_db *, BtCursor *, const u8 **, int *);
static void btReadSummary(const u8 *, int, u16 *, u16 *, u16 *);
static int btCsrData(BtCursor *, int, int, const void **, int *);
static int btReadSchedule(bt_db *, u8 *, BtSchedule *);




/* 
** Meta-table summary key.
*/
static const u8 aSummaryKey[] = {0xFF, 0xFF, 0xFF, 0xFF};

#ifndef btErrorBkpt
................................................................................
  btPageToAscii(pHdr->iMRoot, bAscii, pPager, aData, nData, &buf);
  sqlite4_buffer_append(&buf, "", 1);

  fprintf(stderr, "%s", (char*)buf.p);
  sqlite4_buffer_clear(&buf);
  sqlite4BtPageRelease(pPg);
}
#endif


static int btCsrEnd(BtCursor *pCsr, int bLast);
static int btCsrStep(BtCursor *pCsr, int bNext);
static int btCsrKey(BtCursor *pCsr, const void **ppK, int *pnK);




































#ifndef NDEBUG
/*
** This function is really just a big assert() statement. It contributes 
** nothing to the operation of the library.
**
** The assert() fails if the summary-record is not consistent with the
** actual contents of the meta-tree.
................................................................................
      assert( iMin==0 && nLevel==0 );
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
}























#else
# define assert_summary_ok(x, rc) 

#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
    }
  }

  if( iMin<0 && rc==SQLITE4_OK ) rc = SQLITE4_NOTFOUND;
  pCsr->iBt = iMin;
  return rc;
}















/*
** Return SQLITE4_OK if the cursor is successfully stepped, or 
** SQLITE4_NOTFOUND if an EOF is encountered.
**
** If an error occurs (e.g. an IO error or OOM condition), return the
** relevant error code.
................................................................................
#endif

    rc = btCsrStep(&pSub->mcsr, bNext);
    if( rc==SQLITE4_OK ){
      rc = btCsrKey(&pSub->mcsr, &pV, &nV);
    }
    if( rc==SQLITE4_OK ){
      if( nV<sizeof(pSub->aPrefix) 
       || memcmp(pSub->aPrefix, pV, sizeof(pSub->aPrefix)) 
      ){

        rc = SQLITE4_NOTFOUND;
      }else{
        rc = btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);
      }
    }
    if( rc==SQLITE4_OK ){
      pSub->csr.iRoot = sqlite4BtGetU32((const u8*)pV);
      rc = btCsrEnd(&pSub->csr, !bNext);
    }
  }

................................................................................
#ifndef NDEBUG
  sqlite4_buffer buf;
  sqlite4_buffer_init(&buf, 0);
  sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
  sqlite4_buffer_set(&buf, pKey, nKey);
#endif


  assert( pCsr->base.flags & (CSR_NEXT_OK | CSR_PREV_OK) );
  assert( pCsr->iBt>=0 );

  do{
    /* Load the current key in to pKey/nKey. Then advance all sub-cursors 
    ** that share a key with the current sub-cursor. */
    rc = sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
................................................................................
#ifndef NDEBUG
  if( rc==SQLITE4_OK ){
    sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
    assert( btKeyCompare(buf.p, buf.n, pKey, nKey) * (bNext?1:-1) < 0 );
  }
  sqlite4_buffer_clear(&buf);
#endif


  return rc;
}

typedef struct FiLevelIter FiLevelIter;
struct FiLevelIter {
  /* Used internally */
................................................................................
  btPutU32(&aPrefix[4], ~(u32)iLvl);
}

/*
** Seek a fast-insert cursor.
*/
static int fiCsrSeek(FiCursor *pCsr, const void *pK, int nK, int eSeek){
  u8 aPrefix[8];
  int rc = SQLITE4_NOTFOUND;      /* Return code */
  bt_db *db = pCsr->base.pDb;     /* Database handle */
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

  assert( eSeek==BT_SEEK_LE || eSeek==BT_SEEK_EQ || eSeek==BT_SEEK_GE );
  assert( (pCsr->base.flags & CSR_VISIT_DEL)==0 || eSeek==BT_SEEK_GE );
  fiCsrReset(pCsr);
................................................................................
    /* Initialize the iterator used to skip through database levels */
    rc = fiLevelIterInit(db, &iter);
    if( rc!=SQLITE4_OK ) return rc;

    if( eSeek==BT_SEEK_EQ ){
      FiSubCursor *pSub;
      BtCursor *pM;



      /* A BT_SEEK_EQ is a special case. There is no need to set up a cursor
      ** that can be advanced (in either direction) in this case. All that
      ** is required is to search each level in order for the requested key 
      ** (or a corresponding delete marker). Once a match is found, there
      ** is no need to search any further. As a result, only a single
      ** sub-cursor is required.  */
................................................................................
      rc = fiCsrAllocateSubs(db, pCsr, 1);
      pSub = pCsr->aSub;
      pM = &pSub->mcsr;

      btCsrSetup(db, pHdr->iMRoot, pM);
      while( 0==fiLevelIterNext(&iter) ){

        fiFormatPrefix(aPrefix, iter.iAge, iter.iLvl);
        rc = btCsrSeek(pM, aPrefix, pK, nK, BT_SEEK_LE, BT_CSRSEEK_SEEK);





        if( rc==SQLITE4_NOTFOUND ){
          /* All keys in this level are greater than pK/nK. */
          /* no-op */
        }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
          const void *pV;
          int nV;
................................................................................
          }
        }
      }
    }else{
      int bMatch = 0;           /* Found an exact match */
      int bHit = 0;             /* Found at least one entry */



      /* Allocate required sub-cursors. */
      if( rc==SQLITE4_OK ){
        rc = fiCsrAllocateSubs(db, pCsr, iter.nSub);
      }

      /* This loop runs once for each sub-cursor */
      while( rc==SQLITE4_OK && 0==fiLevelIterNext(&iter) ){
................................................................................
        btCsrSetup(db, pHdr->iMRoot, pM);

        btPutU32(&pSub->aPrefix[0], (u32)iter.iAge);
        btPutU32(&pSub->aPrefix[4], ~(u32)iter.iLvl);

        rc = btCsrSeek(pM, pSub->aPrefix, pK, nK, BT_SEEK_LE, BT_CSRSEEK_SEEK);
        if( rc==SQLITE4_NOTFOUND && eSeek==BT_SEEK_GE ){
          rc = btCsrEnd(pM, 0);




        }

        if( rc==SQLITE4_NOTFOUND ){
          /* No keys to visit in this level */


          rc = SQLITE4_OK;
        }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
          const void *pV;
          int nV;
          u32 iRoot;
          sqlite4BtCsrData(&pM->base, 0, 4, &pV, &nV);
          iRoot = sqlite4BtGetU32((const u8*)pV);
          btCsrReset(&pSub->csr, 1);
          btCsrSetup(db, iRoot, &pSub->csr);

          rc = btCsrSeek(&pSub->csr, 0, pK, nK, eSeek, BT_CSRSEEK_SEEK);







          if( rc==SQLITE4_OK ) bMatch = 1;
          if( rc==SQLITE4_INEXACT ) bHit = 1;
          if( rc==SQLITE4_INEXACT || rc==SQLITE4_NOTFOUND ) rc = SQLITE4_OK;



        }else{
          /* An error */
        }
      }
      assert( rc!=SQLITE4_OK || iter.iSub==iter.nSub );

      if( rc==SQLITE4_OK ){
        pCsr->base.flags |= (eSeek==BT_SEEK_GE ? CSR_NEXT_OK : CSR_PREV_OK);
        rc = fiCsrSetCurrent(pCsr);
        if( rc==SQLITE4_OK ){
          if( fiCsrIsDelete(pCsr) ){
            rc = fiCsrStep(pCsr);
            if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;
          }else if( bMatch==0 ){
            rc = (bHit ? SQLITE4_INEXACT : SQLITE4_NOTFOUND);
................................................................................
){
  int rc;
  btCheckPageRefs(pBase->pDb);
  if( IsBtCsr(pBase) ){
    BtCursor *pCsr = (BtCursor*)pBase;
    rc = btCsrSeek(pCsr, 0, pK, nK, eSeek, BT_CSRSEEK_SEEK);
  }else{

    rc = fiCsrSeek((FiCursor*)pBase, pK, nK, eSeek);

  }
  btCheckPageRefs(pBase->pDb);
  return rc;
}

/*
** Position cursor pCsr to point to the smallest key in the database.
................................................................................
  return rc;
}

typedef struct BalanceCtx BalanceCtx;
struct BalanceCtx {
  int pgsz;                       /* Database page size */
  int bLeaf;                      /* True if we are rebalancing leaf data */

  BtCursor *pCsr;                 /* Cursor identifying where to insert pKV */
  int nKV;                        /* Number of KV pairs */
  KeyValue *apKV;                 /* New KV pairs being inserted */

  /* Populated by btGatherSiblings */
  int nIn;                        /* Number of sibling pages */
  BtPage *apPg[5];                /* Array of sibling pages */
................................................................................
    btPutU16(&aOut[p->pgsz-6], iOff);

    if( (iCell+1)==p->anOut[p->iOut] ){
      /* That was the last cell for this page. Fill in the rest of the 
      ** output page footer and the flags byte at the start of the page.  */
      int nFree;                    /* Free space remaining on output page */
      nFree = p->pgsz - iOff - (6 + 2*(nCell+1));
      aOut[0] = (p->bLeaf ? 0 : BT_PGFLAGS_INTERNAL);
      btPutU16(&aOut[p->pgsz-4], nFree);

      /* If the siblings are leaf pages, increment BalanceCtx.iOut here.
      ** for internal nodes, it will be incremented by the next call to
      ** this function, after a divider cell is pushed into the parent 
      ** node.  */
      p->iOut += p->bLeaf;
................................................................................
  BalanceCtx ctx;
  memset(&ctx, 0, sizeof(ctx));
  ctx.pCsr = pCsr;
  ctx.nKV = nKV;
  ctx.apKV = apKV;
  ctx.pgsz = pgsz;
  ctx.bLeaf = bLeaf;


  memset(anByteOut, 0, sizeof(anByteOut));

  /* Gather the sibling pages from which cells will be redistributed into
  ** the ctx.apPg[] array.  */
  assert( bLeaf==0 || bLeaf==1 );
  assert( pCsr->nPg>1 );
................................................................................
  btFreeBuffer(pDb, ctx.pTmp);
  sqlite4_free(pDb->pEnv, ctx.anCellSz);
  return rc;
}

static int btExtendTree(BtCursor *pCsr){
  bt_db * const pDb = pCsr->base.pDb;

  const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
  int rc;                         /* Return code */
  BtPage *pNew;                   /* New (and only) child of root page */
  BtPage *pRoot = pCsr->apPage[0];

  assert( pCsr->nPg==1 );

  rc = sqlite4BtPageWrite(pRoot);
................................................................................
  }
  if( rc==SQLITE4_OK ){
    u8 *aRoot = sqlite4BtPageData(pRoot);
    u8 *aData = sqlite4BtPageData(pNew);

    memcpy(aData, aRoot, pgsz);
    aRoot[0] = BT_PGFLAGS_INTERNAL;

    btPutU32(&aRoot[1], sqlite4BtPagePgno(pNew));
    btPutU16(&aRoot[pgsz-2], 0);
    btPutU16(&aRoot[pgsz-4], 5);
    btPutU16(&aRoot[pgsz-6], pgsz - 5 - 6);

    pCsr->nPg = 2;
    pCsr->aiCell[1] = pCsr->aiCell[0];
................................................................................
      u8 *aNew;
      int nByte = nSum;
      if( iBestAge+1>=(nSum/6) ) nByte += 6;

      rc = SQLITE4_OK;
      aNew = (u8*)btMalloc(db, nByte, &rc);
      if( rc==SQLITE4_OK ){
        BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

        /* Create a copy of the summary record */
        memcpy(aNew, aSum, nSum);
        if( nByte>nSum ) btWriteSummary(aNew, iBestAge+1, 0, 0, 0);

        /* Find the input age and maximum level */
        btReadSummary(aSum, iBestAge, &iMin, &nLevel, &iMerge);
................................................................................
        *piMinLevel = (u32)iMin;
        *piMaxLevel = (u32)(iMin + nLevel - 1);
        *piAge = iBestAge;

        /* Find the output level */
        btReadSummary(aNew, iBestAge+1, &iMin, &nLevel, &iMerge);
        *piOutLevel = iMin + nLevel;

#if 0
        /* Update the summary record for the output segment. */
        btWriteSummary(aNew, iBestAge+1, iMin, nLevel+1, iMerge);

        /* Write the updated summary record back to the db. */
        rc = btReplaceEntry(
             db, pHdr->iMRoot, aSummaryKey, sizeof(aSummaryKey), aNew, nByte
        );
#endif
      }
    }
  }

  btCsrReset(&csr, 1);
  return rc;
}
................................................................................
  BtCursor mcsr;                  /* Cursor for reading the meta-tree */
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;






  
#if 0
  fprintf(stderr, "BEFORE!\n");
  btPrintMetaTree(db->pPager, 1, pHdr);
#endif
  assert_summary_ok(db, SQLITE4_OK);

................................................................................

#if 0
  if( rc==SQLITE4_OK ){
    btPrintMetaTree(db->pPager, 1, pHdr);
  }
#endif
  assert_summary_ok(db, SQLITE4_OK);

  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................

    if( rc==SQLITE4_INEXACT ){
      const int nPrefix = sizeof(pSub->aPrefix);
      rc = btCsrKey(pM, &pKey, &nKey);
      if( rc==SQLITE4_OK ){
        if( nKey<nPrefix || memcmp(pKey, pSub->aPrefix, nPrefix) ){
          /* Level is completely empty. Nothing to do for this level. */

          rc = SQLITE4_NOTFOUND;
        }else{
          nKey -= nPrefix;
          pKey = (const void*)(((const u8*)pKey) + nPrefix);
        }
      }
    }
................................................................................
        btCsrSetup(db, iRoot, &pSub->csr);
        rc = btCsrSeek(&pSub->csr, 0, pKey, nKey, BT_SEEK_GE, 0);
        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
        if( rc==SQLITE4_NOTFOUND ) rc = btErrorBkpt(SQLITE4_CORRUPT);
      }
    }else if( rc==SQLITE4_NOTFOUND ){
      assert( pSub->csr.nPg==0 );

      rc = SQLITE4_OK;
    }
  }

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }
................................................................................
  btReadSchedule(db, aSched, &s);
  if( s.eBusy==BT_SCHEDULE_BUSY ){
    FiCursor fcsr;                /* FiCursor used to read input */
    FiWriter writer;              /* FiWriter used to write output */

    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
    assert( rc!=SQLITE4_NOTFOUND );

    fiWriterInit(db, &s, &writer, &rc);

    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){
      const void *pCell;          /* Cell to copy to output */
................................................................................
      db->bFastInsertOp = 1;
      break;
    }
  }

  return rc;
}












>
>
>







 







<

>
|
|
|
>
>
>
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
<
<
>
|
<
|
<







 







>







 







>







 







<







 







>
>







 







|
|
>
>
>
>







 







>
>







 







|
>
>
>
>




>
>











>
>
>
>
>
>
>
|
|
<
|
>
>







<







 







>
|
>







 







>







 







|







 







>







 







>
|







 







>







 







<







 







<
<
<
<
<
<
<
<
<
<







 







>
>
>
>
>
>







 







>







 







>







 







>







 







>







 








>
>
>
>
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
...
701
702
703
704
705
706
707

708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
...
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
....
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
....
1554
1555
1556
1557
1558
1559
1560
1561


1562
1563

1564

1565
1566
1567
1568
1569
1570
1571
....
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
....
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
....
1705
1706
1707
1708
1709
1710
1711

1712
1713
1714
1715
1716
1717
1718
....
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
....
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
....
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
....
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836

1837
1838
1839
1840
1841
1842
1843
1844
1845
1846

1847
1848
1849
1850
1851
1852
1853
....
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
....
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
....
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
....
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
....
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
....
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
....
3851
3852
3853
3854
3855
3856
3857

3858
3859
3860
3861
3862
3863
3864
....
3865
3866
3867
3868
3869
3870
3871










3872
3873
3874
3875
3876
3877
3878
....
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
....
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
....
4227
4228
4229
4230
4231
4232
4233
4234
4235
4236
4237
4238
4239
4240
4241
....
4250
4251
4252
4253
4254
4255
4256
4257
4258
4259
4260
4261
4262
4263
4264
....
4560
4561
4562
4563
4564
4565
4566
4567
4568
4569
4570
4571
4572
4573
4574
....
4853
4854
4855
4856
4857
4858
4859
4860
4861
4862
4863
4864
/*
** TODO: Rearrange things so these are not required!
*/
static int fiLoadSummary(bt_db *, BtCursor *, const u8 **, int *);
static void btReadSummary(const u8 *, int, u16 *, u16 *, u16 *);
static int btCsrData(BtCursor *, int, int, const void **, int *);
static int btReadSchedule(bt_db *, u8 *, BtSchedule *);
static int btCsrEnd(BtCursor *pCsr, int bLast);
static int btCsrStep(BtCursor *pCsr, int bNext);
static int btCsrKey(BtCursor *pCsr, const void **ppK, int *pnK);

/* 
** Meta-table summary key.
*/
static const u8 aSummaryKey[] = {0xFF, 0xFF, 0xFF, 0xFF};

#ifndef btErrorBkpt
................................................................................
  btPageToAscii(pHdr->iMRoot, bAscii, pPager, aData, nData, &buf);
  sqlite4_buffer_append(&buf, "", 1);

  fprintf(stderr, "%s", (char*)buf.p);
  sqlite4_buffer_clear(&buf);
  sqlite4BtPageRelease(pPg);
}



/*
** Debugging aid. Append a description of the current position of cursor
** pCsr to buffer pBuf: the literal text "EOF" if the cursor currently
** holds no pages (nPg==0), otherwise the cursor's current key as
** formatted by btBufferAppendBlob().
*/
static void btDumpCsr(sqlite4_buffer *pBuf, BtCursor *pCsr){
  const void *pK = 0;             /* Current key of pCsr */
  int nK = 0;                     /* Size of pK in bytes */
  int rcKey;                      /* Return code from btCsrKey() */

  assert( pCsr->nPg>=0 );

  /* A cursor with no pages is not pointing at any entry. */
  if( pCsr->nPg==0 ){
    sqlite4BtBufAppendf(pBuf, "EOF");
    return;
  }

  rcKey = btCsrKey(pCsr, &pK, &nK);
  assert( rcKey==SQLITE4_OK );
  btBufferAppendBlob(pBuf, 0, (u8*)pK, nK);
}

/*
** Debugging aid. Write a description of fast-insert cursor pCsr to stderr.
**
** For each of the pCsr->nBt sub-cursors three lines are written, each
** prefixed with the sub-cursor index: the contents of the sub-cursor's
** aPrefix[] array, the position of its "mcsr" cursor and the position of
** its "csr" cursor. Cursor positions are formatted by btDumpCsr().
*/
static void fiDumpCsr(FiCursor *pCsr){
  int iBt;                        /* Index of current sub-cursor */
  sqlite4_buffer buf;             /* Accumulates the text to print */

  sqlite4_buffer_init(&buf, 0);
  for(iBt=0; iBt<pCsr->nBt; iBt++){
    FiSubCursor *pSub = &pCsr->aSub[iBt];

    sqlite4BtBufAppendf(&buf, "%d prefix: ", iBt);
    btBufferAppendBlob(&buf, 0, pSub->aPrefix, sizeof(pSub->aPrefix));

    sqlite4BtBufAppendf(&buf, "\n%d mcsr  : ", iBt);
    btDumpCsr(&buf, &pSub->mcsr);

    sqlite4BtBufAppendf(&buf, "\n%d csr   : ", iBt);
    btDumpCsr(&buf, &pSub->csr);
    sqlite4BtBufAppendf(&buf, "\n");
  }

  /* The append helpers used above are not known to NUL-terminate the
  ** buffer (other dump code in this file appends an explicit terminator
  ** before printing), so write exactly buf.n bytes instead of treating
  ** buf.p as a C string. If nothing was accumulated (no sub-cursors) skip
  ** the call altogether - buf.p is presumably still unset in that case. */
  if( buf.n>0 ){
    fprintf(stderr, "%.*s", (int)buf.n, (char*)buf.p);
  }
  sqlite4_buffer_clear(&buf);
}
#endif

#ifndef NDEBUG
/*
** This function is really just a big assert() statement. It contributes 
** nothing to the operation of the library.
**
** The assert() fails if the summary-record is not consistent with the
** actual contents of the meta-tree.
................................................................................
      assert( iMin==0 && nLevel==0 );
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
}

/*
** Like assert_summary_ok(), this function consists only of assert()
** statements and early returns.
**
** Parameter crc is the return code of the cursor operation just performed
** on p. Nothing is checked unless crc is one of SQLITE4_OK,
** SQLITE4_NOTFOUND or SQLITE4_INEXACT, and at least one of the
** CSR_NEXT_OK or CSR_PREV_OK flags is set on the cursor.
**
** Otherwise, for each sub-cursor, assert that either:
**
**   * both the mcsr and csr cursors hold at least one page, and the
**     current mcsr key is at least 8 bytes in size and begins with the
**     first 8 bytes of the sub-cursor's aPrefix[] array, or
**
**   * the mcsr cursor holds no pages, and neither does the csr cursor.
*/
static void assert_ficursor_ok(FiCursor *p, int crc){
  int bCheck;                     /* True if crc is a code worth checking */
  int i;                          /* Index of current sub-cursor */

  bCheck = (crc==SQLITE4_OK || crc==SQLITE4_NOTFOUND || crc==SQLITE4_INEXACT);
  if( !bCheck ) return;
  if( !(p->base.flags & (CSR_NEXT_OK|CSR_PREV_OK)) ) return;

  for(i=0; i<p->nBt; i++){
    FiSubCursor *pSub = &p->aSub[i];
    const void *pMetaKey = 0;     /* Current key of pSub->mcsr */
    int nMetaKey = 0;             /* Size of pMetaKey in bytes */

    if( pSub->mcsr.nPg<=0 ){
      /* Meta-tree cursor is not positioned: data cursor must not be. */
      assert( pSub->csr.nPg==0 );
      continue;
    }

    assert( pSub->csr.nPg>0 );
    btCsrKey(&pSub->mcsr, &pMetaKey, &nMetaKey);
    assert( nMetaKey>=8 && 0==memcmp(pSub->aPrefix, pMetaKey, 8) );
  }
}

#else
# define assert_summary_ok(x, rc) 
# define assert_ficursor_ok(p, rc)
#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
    }
  }

  if( iMin<0 && rc==SQLITE4_OK ) rc = SQLITE4_NOTFOUND;
  pCsr->iBt = iMin;
  return rc;
}

/*
** Check that the key the pSub->mcsr cursor currently points to begins
** with the sub-cursor's prefix (pSub->aPrefix).
**
** If btCsrKey() fails, its error code is returned and the cursor is left
** untouched. If the key is shorter than the prefix, or does not start
** with it, pSub->mcsr is reset and SQLITE4_NOTFOUND is returned.
** Otherwise SQLITE4_OK is returned.
*/
static int fiSubCsrCheckPrefix(FiSubCursor *pSub){
  const void *pKey = 0;           /* Current key of pSub->mcsr */
  int nKey = 0;                   /* Size of pKey in bytes */
  int rc;                         /* Return code */

  rc = btCsrKey(&pSub->mcsr, &pKey, &nKey);
  if( rc!=SQLITE4_OK ) return rc;

  if( nKey>=(int)sizeof(pSub->aPrefix)
   && 0==memcmp(pKey, pSub->aPrefix, sizeof(pSub->aPrefix))
  ){
    /* Key carries the expected prefix. */
    return rc;
  }

  btCsrReset(&pSub->mcsr, 0);
  return SQLITE4_NOTFOUND;
}

/*
** Return SQLITE4_OK if the cursor is successfully stepped, or 
** SQLITE4_NOTFOUND if an EOF is encountered.
**
** If an error occurs (e.g. an IO error or OOM condition), return the
** relevant error code.
................................................................................
#endif

    rc = btCsrStep(&pSub->mcsr, bNext);
    if( rc==SQLITE4_OK ){
      rc = btCsrKey(&pSub->mcsr, &pV, &nV);
    }
    if( rc==SQLITE4_OK ){
      rc = fiSubCsrCheckPrefix(pSub);


    }
    if( rc==SQLITE4_OK ){

      rc = btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);

    }
    if( rc==SQLITE4_OK ){
      pSub->csr.iRoot = sqlite4BtGetU32((const u8*)pV);
      rc = btCsrEnd(&pSub->csr, !bNext);
    }
  }

................................................................................
#ifndef NDEBUG
  sqlite4_buffer buf;
  sqlite4_buffer_init(&buf, 0);
  sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
  sqlite4_buffer_set(&buf, pKey, nKey);
#endif

  assert_ficursor_ok(pCsr, rc);
  assert( pCsr->base.flags & (CSR_NEXT_OK | CSR_PREV_OK) );
  assert( pCsr->iBt>=0 );

  do{
    /* Load the current key in to pKey/nKey. Then advance all sub-cursors 
    ** that share a key with the current sub-cursor. */
    rc = sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
................................................................................
#ifndef NDEBUG
  if( rc==SQLITE4_OK ){
    sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
    assert( btKeyCompare(buf.p, buf.n, pKey, nKey) * (bNext?1:-1) < 0 );
  }
  sqlite4_buffer_clear(&buf);
#endif
  assert_ficursor_ok(pCsr, rc);

  return rc;
}

typedef struct FiLevelIter FiLevelIter;
struct FiLevelIter {
  /* Used internally */
................................................................................
  btPutU32(&aPrefix[4], ~(u32)iLvl);
}

/*
** Seek a fast-insert cursor.
*/
static int fiCsrSeek(FiCursor *pCsr, const void *pK, int nK, int eSeek){

  int rc = SQLITE4_NOTFOUND;      /* Return code */
  bt_db *db = pCsr->base.pDb;     /* Database handle */
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

  assert( eSeek==BT_SEEK_LE || eSeek==BT_SEEK_EQ || eSeek==BT_SEEK_GE );
  assert( (pCsr->base.flags & CSR_VISIT_DEL)==0 || eSeek==BT_SEEK_GE );
  fiCsrReset(pCsr);
................................................................................
    /* Initialize the iterator used to skip through database levels */
    rc = fiLevelIterInit(db, &iter);
    if( rc!=SQLITE4_OK ) return rc;

    if( eSeek==BT_SEEK_EQ ){
      FiSubCursor *pSub;
      BtCursor *pM;

      pCsr->base.flags &= ~(CSR_NEXT_OK | CSR_PREV_OK);

      /* A BT_SEEK_EQ is a special case. There is no need to set up a cursor
      ** that can be advanced (in either direction) in this case. All that
      ** is required is to search each level in order for the requested key 
      ** (or a corresponding delete marker). Once a match is found, there
      ** is no need to search any further. As a result, only a single
      ** sub-cursor is required.  */
................................................................................
      rc = fiCsrAllocateSubs(db, pCsr, 1);
      pSub = pCsr->aSub;
      pM = &pSub->mcsr;

      btCsrSetup(db, pHdr->iMRoot, pM);
      while( 0==fiLevelIterNext(&iter) ){

        fiFormatPrefix(pSub->aPrefix, iter.iAge, iter.iLvl);
        rc = btCsrSeek(pM, pSub->aPrefix, pK, nK, BT_SEEK_LE, BT_CSRSEEK_SEEK);

        if( rc==SQLITE4_INEXACT ){
          rc = fiSubCsrCheckPrefix(pSub);
        }

        if( rc==SQLITE4_NOTFOUND ){
          /* All keys in this level are greater than pK/nK. */
          /* no-op */
        }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
          const void *pV;
          int nV;
................................................................................
          }
        }
      }
    }else{
      int bMatch = 0;           /* Found an exact match */
      int bHit = 0;             /* Found at least one entry */

      pCsr->base.flags |= (eSeek==BT_SEEK_GE ? CSR_NEXT_OK : CSR_PREV_OK);

      /* Allocate required sub-cursors. */
      if( rc==SQLITE4_OK ){
        rc = fiCsrAllocateSubs(db, pCsr, iter.nSub);
      }

      /* This loop runs once for each sub-cursor */
      while( rc==SQLITE4_OK && 0==fiLevelIterNext(&iter) ){
................................................................................
        btCsrSetup(db, pHdr->iMRoot, pM);

        btPutU32(&pSub->aPrefix[0], (u32)iter.iAge);
        btPutU32(&pSub->aPrefix[4], ~(u32)iter.iLvl);

        rc = btCsrSeek(pM, pSub->aPrefix, pK, nK, BT_SEEK_LE, BT_CSRSEEK_SEEK);
        if( rc==SQLITE4_NOTFOUND && eSeek==BT_SEEK_GE ){
          rc = btCsrSeek(pM, pSub->aPrefix, 0, 0, BT_SEEK_GE, BT_CSRSEEK_SEEK);
        }

        if( rc==SQLITE4_INEXACT ){
          rc = fiSubCsrCheckPrefix(pSub);
        }

        if( rc==SQLITE4_NOTFOUND ){
          /* No keys to visit in this level */
          assert( pSub->mcsr.nPg==0 );
          assert( pSub->csr.nPg==0 );
          rc = SQLITE4_OK;
        }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
          const void *pV;
          int nV;
          u32 iRoot;
          sqlite4BtCsrData(&pM->base, 0, 4, &pV, &nV);
          iRoot = sqlite4BtGetU32((const u8*)pV);
          btCsrReset(&pSub->csr, 1);
          btCsrSetup(db, iRoot, &pSub->csr);

          rc = btCsrSeek(&pSub->csr, 0, pK, nK, eSeek, BT_CSRSEEK_SEEK);
          if( rc==SQLITE4_NOTFOUND ){
            if( eSeek==BT_SEEK_LE ){
              rc = fiSubCsrStep(pCsr, pSub, 0);
            }else{
              btCsrReset(pM, 0);
            }
          }else{
            if( rc==SQLITE4_OK ) bMatch = 1;
            if( rc==SQLITE4_INEXACT ) bHit = 1;

          }

          if( rc==SQLITE4_INEXACT || rc==SQLITE4_NOTFOUND ) rc = SQLITE4_OK;
        }else{
          /* An error */
        }
      }
      assert( rc!=SQLITE4_OK || iter.iSub==iter.nSub );

      if( rc==SQLITE4_OK ){

        rc = fiCsrSetCurrent(pCsr);
        if( rc==SQLITE4_OK ){
          if( fiCsrIsDelete(pCsr) ){
            rc = fiCsrStep(pCsr);
            if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;
          }else if( bMatch==0 ){
            rc = (bHit ? SQLITE4_INEXACT : SQLITE4_NOTFOUND);
................................................................................
){
  int rc;
  btCheckPageRefs(pBase->pDb);
  if( IsBtCsr(pBase) ){
    BtCursor *pCsr = (BtCursor*)pBase;
    rc = btCsrSeek(pCsr, 0, pK, nK, eSeek, BT_CSRSEEK_SEEK);
  }else{
    FiCursor *pCsr = (FiCursor*)pBase;
    rc = fiCsrSeek(pCsr, pK, nK, eSeek);
    assert_ficursor_ok(pCsr, rc);
  }
  btCheckPageRefs(pBase->pDb);
  return rc;
}

/*
** Position cursor pCsr to point to the smallest key in the database.
................................................................................
  return rc;
}

typedef struct BalanceCtx BalanceCtx;
struct BalanceCtx {
  int pgsz;                       /* Database page size */
  int bLeaf;                      /* True if we are rebalancing leaf data */
  u8 flags;                       /* Flags byte for new sibling pages */
  BtCursor *pCsr;                 /* Cursor identifying where to insert pKV */
  int nKV;                        /* Number of KV pairs */
  KeyValue *apKV;                 /* New KV pairs being inserted */

  /* Populated by btGatherSiblings */
  int nIn;                        /* Number of sibling pages */
  BtPage *apPg[5];                /* Array of sibling pages */
................................................................................
    btPutU16(&aOut[p->pgsz-6], iOff);

    if( (iCell+1)==p->anOut[p->iOut] ){
      /* That was the last cell for this page. Fill in the rest of the 
      ** output page footer and the flags byte at the start of the page.  */
      int nFree;                    /* Free space remaining on output page */
      nFree = p->pgsz - iOff - (6 + 2*(nCell+1));
      aOut[0] = p->flags;
      btPutU16(&aOut[p->pgsz-4], nFree);

      /* If the siblings are leaf pages, increment BalanceCtx.iOut here.
      ** For internal nodes, it will be incremented by the next call to
      ** this function, after a divider cell is pushed into the parent 
      ** node.  */
      p->iOut += p->bLeaf;
................................................................................
  BalanceCtx ctx;
  memset(&ctx, 0, sizeof(ctx));
  ctx.pCsr = pCsr;
  ctx.nKV = nKV;
  ctx.apKV = apKV;
  ctx.pgsz = pgsz;
  ctx.bLeaf = bLeaf;
  ctx.flags = *(u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);

  memset(anByteOut, 0, sizeof(anByteOut));

  /* Gather the sibling pages from which cells will be redistributed into
  ** the ctx.apPg[] array.  */
  assert( bLeaf==0 || bLeaf==1 );
  assert( pCsr->nPg>1 );
................................................................................
  btFreeBuffer(pDb, ctx.pTmp);
  sqlite4_free(pDb->pEnv, ctx.anCellSz);
  return rc;
}

static int btExtendTree(BtCursor *pCsr){
  bt_db * const pDb = pCsr->base.pDb;
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(pDb->pPager);
  const int pgsz = pHdr->pgsz;
  int rc;                         /* Return code */
  BtPage *pNew;                   /* New (and only) child of root page */
  BtPage *pRoot = pCsr->apPage[0];

  assert( pCsr->nPg==1 );

  rc = sqlite4BtPageWrite(pRoot);
................................................................................
  }
  if( rc==SQLITE4_OK ){
    u8 *aRoot = sqlite4BtPageData(pRoot);
    u8 *aData = sqlite4BtPageData(pNew);

    memcpy(aData, aRoot, pgsz);
    aRoot[0] = BT_PGFLAGS_INTERNAL;
    if( pHdr->iMRoot==pCsr->iRoot ) aRoot[0] |= BT_PGFLAGS_METATREE;
    btPutU32(&aRoot[1], sqlite4BtPagePgno(pNew));
    btPutU16(&aRoot[pgsz-2], 0);
    btPutU16(&aRoot[pgsz-4], 5);
    btPutU16(&aRoot[pgsz-6], pgsz - 5 - 6);

    pCsr->nPg = 2;
    pCsr->aiCell[1] = pCsr->aiCell[0];
................................................................................
      u8 *aNew;
      int nByte = nSum;
      if( iBestAge+1>=(nSum/6) ) nByte += 6;

      rc = SQLITE4_OK;
      aNew = (u8*)btMalloc(db, nByte, &rc);
      if( rc==SQLITE4_OK ){


        /* Create a copy of the summary record */
        memcpy(aNew, aSum, nSum);
        if( nByte>nSum ) btWriteSummary(aNew, iBestAge+1, 0, 0, 0);

        /* Find the input age and maximum level */
        btReadSummary(aSum, iBestAge, &iMin, &nLevel, &iMerge);
................................................................................
        *piMinLevel = (u32)iMin;
        *piMaxLevel = (u32)(iMin + nLevel - 1);
        *piAge = iBestAge;

        /* Find the output level */
        btReadSummary(aNew, iBestAge+1, &iMin, &nLevel, &iMerge);
        *piOutLevel = iMin + nLevel;










      }
    }
  }

  btCsrReset(&csr, 1);
  return rc;
}
................................................................................
  BtCursor mcsr;                  /* Cursor for reading the meta-tree */
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;

  /* Save the value of the fast-insert flag. It will be restored before
  ** this function returns. Leaving it set here interferes with page 
  ** allocation if the meta-tree needs to be extended.  */
  const int bFastInsertOp = db->bFastInsertOp;
  db->bFastInsertOp = 0;
  
#if 0
  fprintf(stderr, "BEFORE!\n");
  btPrintMetaTree(db->pPager, 1, pHdr);
#endif
  assert_summary_ok(db, SQLITE4_OK);

................................................................................

#if 0
  if( rc==SQLITE4_OK ){
    btPrintMetaTree(db->pPager, 1, pHdr);
  }
#endif
  assert_summary_ok(db, SQLITE4_OK);
  db->bFastInsertOp = bFastInsertOp;
  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................

    if( rc==SQLITE4_INEXACT ){
      const int nPrefix = sizeof(pSub->aPrefix);
      rc = btCsrKey(pM, &pKey, &nKey);
      if( rc==SQLITE4_OK ){
        if( nKey<nPrefix || memcmp(pKey, pSub->aPrefix, nPrefix) ){
          /* Level is completely empty. Nothing to do for this level. */
          btCsrReset(pM, 0);
          rc = SQLITE4_NOTFOUND;
        }else{
          nKey -= nPrefix;
          pKey = (const void*)(((const u8*)pKey) + nPrefix);
        }
      }
    }
................................................................................
        btCsrSetup(db, iRoot, &pSub->csr);
        rc = btCsrSeek(&pSub->csr, 0, pKey, nKey, BT_SEEK_GE, 0);
        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
        if( rc==SQLITE4_NOTFOUND ) rc = btErrorBkpt(SQLITE4_CORRUPT);
      }
    }else if( rc==SQLITE4_NOTFOUND ){
      assert( pSub->csr.nPg==0 );
      assert( pSub->mcsr.nPg==0 );
      rc = SQLITE4_OK;
    }
  }

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }
................................................................................
  btReadSchedule(db, aSched, &s);
  if( s.eBusy==BT_SCHEDULE_BUSY ){
    FiCursor fcsr;                /* FiCursor used to read input */
    FiWriter writer;              /* FiWriter used to write output */

    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
    assert( rc!=SQLITE4_NOTFOUND );
    assert_ficursor_ok(&fcsr, rc);
    fiWriterInit(db, &s, &writer, &rc);

    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){
      const void *pCell;          /* Cell to copy to output */
................................................................................
      db->bFastInsertOp = 1;
      break;
    }
  }

  return rc;
}