SQLite4
Check-in [ef14a93b8b]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
SHA1 Hash: ef14a93b8bdf5884ba6fd9f984ac14af9860d711
Date: 2014-01-09 15:37:43
User: dan
Comment: Further bug fixes for merge routines.
Tags And Properties
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/bt_main.c

712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730


731
732
733
734
735
736
737
...
761
762
763
764
765
766
767

768
769
770
771

772


773


774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
....
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
....
1169
1170
1171
1172
1173
1174
1175









1176
1177
1178
1179
1180
1181
1182
....
1449
1450
1451
1452
1453
1454
1455
1456



1457
1458
1459
1460
1461

1462
1463
1464
1465
1466
1467
1468
....
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
....
1617
1618
1619
1620
1621
1622
1623

1624
1625
1626
1627
1628
1629
1630
....
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
....
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
....
1749
1750
1751
1752
1753
1754
1755


1756
1757
1758
1759
1760
1761
1762
....
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
....
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
....
3784
3785
3786
3787
3788
3789
3790

3791
3792

3793
3794
3795
3796
3797
3798
3799
3800
....
3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939

3940
3941
3942
3943
3944
3945
3946
....
4127
4128
4129
4130
4131
4132
4133



4134
4135
4136
4137
4138
4139
4140
....
4434
4435
4436
4437
4438
4439
4440

4441
4442
4443
4444
4445
4446
4447
....
4449
4450
4451
4452
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
/*
** This function is really just a big assert() statement. It contributes 
** nothing to the operation of the library.
**
** The assert() fails if the summary-record is not consistent with the
** actual contents of the meta-tree.
*/
static void assert_summary_ok(bt_db *db){
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  BtCursor csr;                   /* Cursor used to load summary record */
  BtCursor mcsr;                  /* Cursor used to scan meta-tree */
  const u8 *aSum; int nSum;       /* Current summary record */
  int i;
  int rc;

  struct AgeData {
    int iMinLevel;                /* Smallest level of this age seen */
    int iMaxLevel;                /* Largest level of this age seen */
  } aData[32];



  for(i=0; i<array_size(aData); i++){
    aData[i].iMinLevel = -1;
    aData[i].iMaxLevel = -1;
  }

  rc = fiLoadSummary(db, &csr, &aSum, &nSum);
................................................................................
    }
  }
  assert( rc==SQLITE4_OK );

  for(i=0; i<array_size(aData); i++){
    u16 iMin = 0; 
    u16 nLevel = 0;

    if( i<(nSum/6) ){
      btReadSummary(aSum, i, &iMin, &nLevel, 0);
    }
    if( aData[i].iMinLevel>=0 ){

      assert( (int)iMin==aData[i].iMinLevel );


      assert( (int)nLevel==(aData[i].iMaxLevel - aData[i].iMinLevel + 1) );


    }else{
      assert( iMin==0 && nLevel==0 );
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
}
#else
# define assert_summary_ok(x) 
#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
  return (iBlk - 1) * (pHdr->blksz / pHdr->pgsz) + 1;
}

/*
** Return true if the cell that the argument cursor currently points to
** is a delete marker.
*/
static int fiCsrIsDelete(BtCursor *pCsr){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->base.pDb->pPager);
  int bRet;                       /* Return value */
  u8 *aData;
  u8 *pCell;
  int n;

  aData = sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
................................................................................
    pCell += n;
    pCell += sqlite4BtVarintGet32(pCell, &n);
    bRet = (n==1);
  }

  return bRet;
}










/*
** Return an integer representing the result of (K1 - K2).
*/
static int btKeyCompare(
  const void *pKey1, int nKey1, 
  const void *pKey2, int nKey2
................................................................................
){
  int rc;

  rc = btCsrStep(&pSub->csr, bNext);
  if( rc==SQLITE4_NOTFOUND ){
    const void *pV;
    int nV;
#if 1



    rc = btCsrKey(&pSub->mcsr, &pV, &nV);
    assert( rc==SQLITE4_OK && memcmp(pV, pSub->aPrefix, 8)==0 );
#endif
    rc = btCsrStep(&pSub->mcsr, bNext);


    if( rc==SQLITE4_OK ){
      rc = btCsrKey(&pSub->mcsr, &pV, &nV);
    }
    if( rc==SQLITE4_OK ){
      if( nV<sizeof(pSub->aPrefix) 
       || memcmp(pSub->aPrefix, pV, sizeof(pSub->aPrefix)) 
      ){
................................................................................
      }
    }

    /* Figure out a new current bt cursor */
    if( rc==SQLITE4_OK ){
      rc = fiCsrSetCurrent(pCsr);
    }
  }while( rc==SQLITE4_OK && fiCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr) );

#ifndef NDEBUG
  if( rc==SQLITE4_OK ){
    sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
    assert( btKeyCompare(buf.p, buf.n, pKey, nKey) < 0 );
  }
  sqlite4_buffer_clear(&buf);
#endif

  return rc;
}

................................................................................
static int fiCsrSeek(FiCursor *pCsr, const void *pK, int nK, int eSeek){
  u8 aPrefix[8];
  int rc = SQLITE4_NOTFOUND;      /* Return code */
  bt_db *db = pCsr->base.pDb;     /* Database handle */
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

  assert( eSeek==BT_SEEK_LE || eSeek==BT_SEEK_EQ || eSeek==BT_SEEK_GE );

  fiCsrReset(pCsr);

  if( pHdr->iMRoot ){
    FiLevelIter iter;

    /* Initialize the iterator used to skip through database levels */
    rc = fiLevelIterInit(db, &iter);
................................................................................
          if( rc!=SQLITE4_NOTFOUND ){
            /* A hit on the requested key or an error has occurred. Either
            ** way, break out of the loop. If this is a hit, set iBt to
            ** zero so that the BtCsrKey() and BtCsrData() routines know
            ** to return data from the first (only) sub-cursor. */
            assert( pCsr->iBt<0 );
            if( rc==SQLITE4_OK ){
              if( 0==fiCsrIsDelete(&pSub->csr) ){
                pCsr->iBt = 0;
              }else{
                rc = SQLITE4_NOTFOUND;
              }
            }
            break;
          }
................................................................................
      }
      assert( rc!=SQLITE4_OK || iter.iSub==iter.nSub );

      if( rc==SQLITE4_OK ){
        pCsr->base.flags |= (eSeek==BT_SEEK_GE ? CSR_NEXT_OK : CSR_PREV_OK);
        rc = fiCsrSetCurrent(pCsr);
        if( rc==SQLITE4_OK ){
          BtCursor *pCurrent = &pCsr->aSub[pCsr->iBt].csr;
          if( fiCsrIsDelete(pCurrent) ){
            rc = fiCsrStep(pCsr);
            if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;
          }else if( bMatch==0 ){
            rc = (bHit ? SQLITE4_INEXACT : SQLITE4_NOTFOUND);
          }
        }
      }
................................................................................
}

static int fiCsrEnd(FiCursor *pCsr, int bLast){
  bt_db *db = pCsr->base.pDb;
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  FiLevelIter iter;         /* Used to iterate through all f-tree levels */
  int rc;                   /* Return code */



  rc = fiLevelIterInit(db, &iter);
  if( rc==SQLITE4_OK ){
    rc = fiCsrAllocateSubs(db, pCsr, iter.nSub);
  }

  while( rc==SQLITE4_OK && 0==fiLevelIterNext(&iter) ){
................................................................................
  }
  fiLevelIterCleanup(&iter);

  if( rc==SQLITE4_OK ){
    pCsr->base.flags &= ~(CSR_NEXT_OK | CSR_PREV_OK);
    pCsr->base.flags |= (bLast ? CSR_PREV_OK : CSR_NEXT_OK);
    rc = fiCsrSetCurrent(pCsr);
    if( rc==SQLITE4_OK && fiCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr) ){
      rc = fiCsrStep(pCsr);
    }
  }

  return rc;
}

................................................................................
        }
      }
    }

    if( rc==SQLITE4_NOTFOUND && iBestAge>=0 ){
      u8 *aNew;
      int nByte = nSum;
      if( iAge+1>=(nSum/3) ) nByte += 6;

      rc = SQLITE4_OK;
      aNew = (u8*)btMalloc(db, nByte, &rc);
      if( rc==SQLITE4_OK ){
        BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

        /* Create a copy of the summary record */
................................................................................
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;
  

  fprintf(stderr, "BEFORE!\n");
  btPrintMetaTree(db->pPager, 1, pHdr);

  assert_summary_ok(db);

  memset(&csr, 0, sizeof(csr));
  memset(&mcsr, 0, sizeof(mcsr));
  btCsrSetup(db, pHdr->iMRoot, &mcsr);
  sqlite4_buffer_init(&buf, 0);
  
  if( p->iNextPg ){
................................................................................
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
  sqlite4_buffer_clear(&buf);

#if 1
  if( rc==SQLITE4_OK ){
    btPrintMetaTree(db->pPager, 1, pHdr);
    assert_summary_ok(db);
  }
#endif

  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................
      if( rc==SQLITE4_OK ){
        u32 iRoot = sqlite4BtGetU32((const u8*)pVal);
        btCsrSetup(db, iRoot, &pSub->csr);
        rc = btCsrSeek(&pSub->csr, 0, pKey, nKey, BT_SEEK_GE, 0);
        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
        if( rc==SQLITE4_NOTFOUND ) rc = btErrorBkpt(SQLITE4_CORRUPT);
      }



    }
  }

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }

................................................................................
  /* Set up the input cursor. */
  btReadSchedule(db, aSched, &s);
  if( s.eBusy==BT_SCHEDULE_BUSY ){
    FiCursor fcsr;                /* FiCursor used to read input */
    FiWriter writer;              /* FiWriter used to write output */

    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);

    fiWriterInit(db, &s, &writer, &rc);

    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){
      const void *pCell;          /* Cell to copy to output */
................................................................................

      /* Read the current cell from the input and push it to the output. */
      fiCsrCell(&fcsr, &pCell, &nCell);
      rc = fiWriterAdd(&writer, pCell, nCell);
      if( rc==BT_BLOCKFULL ){
        rc = fiWriterFlushAll(&writer);
        fiWriterInit(db, &s, &writer, &rc);
      }
      if( rc==SQLITE4_OK ){
        rc = fiCsrStep(&fcsr);
      }
    }

    /* Assuming no error has occurred, update the serialized BtSchedule
    ** structure stored in buffer aSched[]. The caller will write this
    ** buffer to the database file as page (pHdr->iSRoot).  */







|











>
>







 







>

|


>
|
>
>
|
>
>









|







 







|







 







>
>
>
>
>
>
>
>
>







 







<
>
>
>
|
|

<

>







 







|




|







 







>







 







|







 







<
|







 







>
>







 







|







 







|







 







>


>
|







 







|


<


>







 







>
>
>







 







>







 







<
|







712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
...
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
....
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
....
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
....
1466
1467
1468
1469
1470
1471
1472

1473
1474
1475
1476
1477
1478

1479
1480
1481
1482
1483
1484
1485
1486
1487
....
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
....
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
....
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
....
1747
1748
1749
1750
1751
1752
1753

1754
1755
1756
1757
1758
1759
1760
1761
....
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
....
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
....
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
....
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
....
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959

3960
3961
3962
3963
3964
3965
3966
3967
3968
3969
....
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165
4166
....
4460
4461
4462
4463
4464
4465
4466
4467
4468
4469
4470
4471
4472
4473
4474
....
4476
4477
4478
4479
4480
4481
4482

4483
4484
4485
4486
4487
4488
4489
4490
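/*
** This function is really just a big assert() statement. It contributes 
** nothing to the operation of the library.
**
** If parameter crc is anything other than SQLITE4_OK, this function
** returns immediately without checking anything. Otherwise, the assert()
** fails if the summary-record is not consistent with the actual contents
** of the meta-tree.
*/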
static void assert_summary_ok(bt_db *db, int crc){
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  BtCursor csr;                   /* Cursor used to load summary record */
  BtCursor mcsr;                  /* Cursor used to scan meta-tree */
  const u8 *aSum; int nSum;       /* Current summary record */
  int i;
  int rc;

  struct AgeData {
    int iMinLevel;                /* Smallest level of this age seen */
    int iMaxLevel;                /* Largest level of this age seen */
  } aData[32];

  if( crc!=SQLITE4_OK ) return;

  for(i=0; i<array_size(aData); i++){
    aData[i].iMinLevel = -1;
    aData[i].iMaxLevel = -1;
  }

  rc = fiLoadSummary(db, &csr, &aSum, &nSum);
................................................................................
    }
  }
  assert( rc==SQLITE4_OK );

  for(i=0; i<array_size(aData); i++){
    u16 iMin = 0; 
    u16 nLevel = 0;
    u16 iMerge = 0;
    if( i<(nSum/6) ){
      btReadSummary(aSum, i, &iMin, &nLevel, &iMerge);
    }
    if( aData[i].iMinLevel>=0 ){
      int nLevelExpect = aData[i].iMaxLevel - aData[i].iMinLevel + 1;
      assert( (int)iMin==aData[i].iMinLevel 
           || (iMerge!=0 && (int)iMin<aData[i].iMinLevel)
      );
      assert( (int)nLevel==nLevelExpect
           || (iMerge!=0 && nLevel>nLevelExpect)
      );
    }else{
      assert( iMin==0 && nLevel==0 );
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
}
#else
# define assert_summary_ok(x, rc) 
#endif


/*
** This function compares the key passed via parameters pK and nK to the
** key that cursor pCsr currently points to.
**
................................................................................
  return (iBlk - 1) * (pHdr->blksz / pHdr->pgsz) + 1;
}

/*
** Return true if the cell that the argument cursor currently points to
** is a delete marker.
*/
static int btCsrIsDelete(BtCursor *pCsr){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->base.pDb->pPager);
  int bRet;                       /* Return value */
  u8 *aData;
  u8 *pCell;
  int n;

  aData = sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
................................................................................
    pCell += n;
    pCell += sqlite4BtVarintGet32(pCell, &n);
    bRet = (n==1);
  }

  return bRet;
}

/*
** Return the result of btCsrIsDelete() for the sub-cursor that FiCursor
** pCsr currently uses (pCsr->aSub[pCsr->iBt].csr) - i.e. non-zero if the
** current cell is a delete marker.
**
** If the CSR_VISIT_DEL flag is set in pCsr->base.flags, the check is
** skipped and 0 is always returned. NOTE(review): presumably this lets
** callers that set CSR_VISIT_DEL visit delete markers instead of stepping
** over them - confirm against the definition of CSR_VISIT_DEL.
*/
static int fiCsrIsDelete(FiCursor *pCsr){
  if( pCsr->base.flags & CSR_VISIT_DEL ){
    return 0;
  }
  return btCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr);
}

/*
** Return an integer representing the result of (K1 - K2).
*/
static int btKeyCompare(
  const void *pKey1, int nKey1, 
  const void *pKey2, int nKey2
................................................................................
){
  int rc;

  rc = btCsrStep(&pSub->csr, bNext);
  if( rc==SQLITE4_NOTFOUND ){
    const void *pV;
    int nV;


#ifndef NDEBUG
    const void *pTmp; int nTmp;
    rc = btCsrKey(&pSub->mcsr, &pTmp, &nTmp);
    assert( rc==SQLITE4_OK && memcmp(pTmp, pSub->aPrefix, 8)==0 );
#endif


    rc = btCsrStep(&pSub->mcsr, bNext);
    if( rc==SQLITE4_OK ){
      rc = btCsrKey(&pSub->mcsr, &pV, &nV);
    }
    if( rc==SQLITE4_OK ){
      if( nV<sizeof(pSub->aPrefix) 
       || memcmp(pSub->aPrefix, pV, sizeof(pSub->aPrefix)) 
      ){
................................................................................
      }
    }

    /* Figure out a new current bt cursor */
    if( rc==SQLITE4_OK ){
      rc = fiCsrSetCurrent(pCsr);
    }
  }while( rc==SQLITE4_OK && fiCsrIsDelete(pCsr) );

#ifndef NDEBUG
  if( rc==SQLITE4_OK ){
    sqlite4BtCsrKey(&pCsr->base, &pKey, &nKey);
    assert( btKeyCompare(buf.p, buf.n, pKey, nKey) * (bNext?1:-1) < 0 );
  }
  sqlite4_buffer_clear(&buf);
#endif

  return rc;
}

................................................................................
static int fiCsrSeek(FiCursor *pCsr, const void *pK, int nK, int eSeek){
  u8 aPrefix[8];
  int rc = SQLITE4_NOTFOUND;      /* Return code */
  bt_db *db = pCsr->base.pDb;     /* Database handle */
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

  assert( eSeek==BT_SEEK_LE || eSeek==BT_SEEK_EQ || eSeek==BT_SEEK_GE );
  assert( (pCsr->base.flags & CSR_VISIT_DEL)==0 || eSeek==BT_SEEK_GE );
  fiCsrReset(pCsr);

  if( pHdr->iMRoot ){
    FiLevelIter iter;

    /* Initialize the iterator used to skip through database levels */
    rc = fiLevelIterInit(db, &iter);
................................................................................
          if( rc!=SQLITE4_NOTFOUND ){
            /* A hit on the requested key or an error has occurred. Either
            ** way, break out of the loop. If this is a hit, set iBt to
            ** zero so that the BtCsrKey() and BtCsrData() routines know
            ** to return data from the first (only) sub-cursor. */
            assert( pCsr->iBt<0 );
            if( rc==SQLITE4_OK ){
              if( 0==btCsrIsDelete(&pSub->csr) ){
                pCsr->iBt = 0;
              }else{
                rc = SQLITE4_NOTFOUND;
              }
            }
            break;
          }
................................................................................
      }
      assert( rc!=SQLITE4_OK || iter.iSub==iter.nSub );

      if( rc==SQLITE4_OK ){
        pCsr->base.flags |= (eSeek==BT_SEEK_GE ? CSR_NEXT_OK : CSR_PREV_OK);
        rc = fiCsrSetCurrent(pCsr);
        if( rc==SQLITE4_OK ){

          if( fiCsrIsDelete(pCsr) ){
            rc = fiCsrStep(pCsr);
            if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;
          }else if( bMatch==0 ){
            rc = (bHit ? SQLITE4_INEXACT : SQLITE4_NOTFOUND);
          }
        }
      }
................................................................................
}

static int fiCsrEnd(FiCursor *pCsr, int bLast){
  bt_db *db = pCsr->base.pDb;
  BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  FiLevelIter iter;         /* Used to iterate through all f-tree levels */
  int rc;                   /* Return code */

  assert( (pCsr->base.flags & CSR_VISIT_DEL)==0 );

  rc = fiLevelIterInit(db, &iter);
  if( rc==SQLITE4_OK ){
    rc = fiCsrAllocateSubs(db, pCsr, iter.nSub);
  }

  while( rc==SQLITE4_OK && 0==fiLevelIterNext(&iter) ){
................................................................................
  }
  fiLevelIterCleanup(&iter);

  if( rc==SQLITE4_OK ){
    pCsr->base.flags &= ~(CSR_NEXT_OK | CSR_PREV_OK);
    pCsr->base.flags |= (bLast ? CSR_PREV_OK : CSR_NEXT_OK);
    rc = fiCsrSetCurrent(pCsr);
    if( rc==SQLITE4_OK && btCsrIsDelete(&pCsr->aSub[pCsr->iBt].csr) ){
      rc = fiCsrStep(pCsr);
    }
  }

  return rc;
}

................................................................................
        }
      }
    }

    if( rc==SQLITE4_NOTFOUND && iBestAge>=0 ){
      u8 *aNew;
      int nByte = nSum;
      if( iBestAge+1>=(nSum/6) ) nByte += 6;

      rc = SQLITE4_OK;
      aNew = (u8*)btMalloc(db, nByte, &rc);
      if( rc==SQLITE4_OK ){
        BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);

        /* Create a copy of the summary record */
................................................................................
  const void *pKey = 0;
  const u8 *aSum; int nSum;       /* Summary value */
  int nKey = 0;
  sqlite4_buffer buf;
  u32 iLvl;
  int iBlk;
  
#if 0
  fprintf(stderr, "BEFORE!\n");
  btPrintMetaTree(db->pPager, 1, pHdr);
#endif
  assert_summary_ok(db, SQLITE4_OK);

  memset(&csr, 0, sizeof(csr));
  memset(&mcsr, 0, sizeof(mcsr));
  btCsrSetup(db, pHdr->iMRoot, &mcsr);
  sqlite4_buffer_init(&buf, 0);
  
  if( p->iNextPg ){
................................................................................
    }
  }

  btCsrReset(&csr, 1);
  btCsrReset(&mcsr, 1);
  sqlite4_buffer_clear(&buf);

#if 0
  if( rc==SQLITE4_OK ){
    btPrintMetaTree(db->pPager, 1, pHdr);

  }
#endif
  assert_summary_ok(db, SQLITE4_OK);
  return rc;
}

/*
** If possible, schedule a merge operation. 
**
** The merge operation is selected based on the following criteria:
................................................................................
      if( rc==SQLITE4_OK ){
        u32 iRoot = sqlite4BtGetU32((const u8*)pVal);
        btCsrSetup(db, iRoot, &pSub->csr);
        rc = btCsrSeek(&pSub->csr, 0, pKey, nKey, BT_SEEK_GE, 0);
        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
        if( rc==SQLITE4_NOTFOUND ) rc = btErrorBkpt(SQLITE4_CORRUPT);
      }
    }else if( rc==SQLITE4_NOTFOUND ){
      assert( pSub->csr.nPg==0 );
      rc = SQLITE4_OK;
    }
  }

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }

................................................................................
  /* Set up the input cursor. */
  btReadSchedule(db, aSched, &s);
  if( s.eBusy==BT_SCHEDULE_BUSY ){
    FiCursor fcsr;                /* FiCursor used to read input */
    FiWriter writer;              /* FiWriter used to write output */

    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
    assert( rc!=SQLITE4_NOTFOUND );
    fiWriterInit(db, &s, &writer, &rc);

    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){
      const void *pCell;          /* Cell to copy to output */
................................................................................

      /* Read the current cell from the input and push it to the output. */
      fiCsrCell(&fcsr, &pCell, &nCell);
      rc = fiWriterAdd(&writer, pCell, nCell);
      if( rc==BT_BLOCKFULL ){
        rc = fiWriterFlushAll(&writer);
        fiWriterInit(db, &s, &writer, &rc);

      }else if( rc==SQLITE4_OK ){
        rc = fiCsrStep(&fcsr);
      }
    }

    /* Assuming no error has occurred, update the serialized BtSchedule
    ** structure stored in buffer aSched[]. The caller will write this
    ** buffer to the database file as page (pHdr->iSRoot).  */