SQLite4
Check-in [6003e7dcc2]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Save sub-tree root page numbers, instead of block numbers, in the meta-tree.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 6003e7dcc2485c89d7e6ff12e8953a8655fb9e1b
User & Date: dan 2014-01-03 19:36:12
Context
2014-01-06
20:30
Add merging code. check-in: ad91883237 user: dan tags: trunk
2014-01-03
19:36
Save sub-tree root page numbers, instead of block numbers, in the meta-tree. check-in: 6003e7dcc2 user: dan tags: trunk
2014-01-02
18:53
Changes to FiCursor object to support reading merged blocks. check-in: 27248a1ebc user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btInt.h.

   141    141   
   142    142   /*
   143    143   ** Query for the database page size. Requires an open read transaction.
   144    144   */
   145    145   int sqlite4BtPagerPagesize(BtPager*);
   146    146   
   147    147   /* 
   148         -** Query for the db header values. Requires an open read transaction.
          148  +** Query for the db header values. Requires an open read transaction or
          149  +** an active checkpoint.
   149    150   */
   150    151   BtDbHdr *sqlite4BtPagerDbhdr(BtPager*);
   151    152   
          153  +/*
          154  +** Used by checkpointers to specify the header to use during a checkpoint.
          155  +*/
          156  +void sqlite4BtPagerSetDbhdr(BtPager *, BtDbHdr *);
          157  +
   152    158   /*
   153    159   ** Read, write and trim existing database pages.
   154    160   */
   155    161   int sqlite4BtPageGet(BtPager*, u32 pgno, BtPage **ppPage);
   156    162   int sqlite4BtPageTrimPgno(BtPager*, u32 pgno);
   157    163   int sqlite4BtPageWrite(BtPage*);
   158    164   int sqlite4BtPageTrim(BtPage*);

Changes to src/bt_log.c.

  1824   1824       u32 *aPgno = 0;               /* Array of page numbers to checkpoint */
  1825   1825       int nPgno;                    /* Number of entries in aPgno[] */
  1826   1826       int i;                        /* Used to loop through aPgno[] */
  1827   1827       u8 *aBuf;                     /* Buffer to load page data into */
  1828   1828       u32 iFirstRead;               /* First frame not checkpointed */
  1829   1829   
  1830   1830       rc = btLogSnapshot(pLog, &pLog->snapshot);
         1831  +    sqlite4BtPagerSetDbhdr((BtPager*)pLock, &pLog->snapshot.dbhdr);
  1831   1832       pgsz = pLog->snapshot.dbhdr.pgsz;
  1832   1833   
  1833   1834       if( rc==SQLITE4_OK ){
  1834   1835         /* Allocate space to load log data into */
  1835   1836         aBuf = sqlite4_malloc(pLock->pEnv, pgsz);
  1836   1837         if( aBuf==0 ) rc = btErrorBkpt(SQLITE4_NOMEM);
  1837   1838       }
................................................................................
  1925   1926           pVfs->xShmBarrier(pLog->pFd);
  1926   1927         }
  1927   1928       }
  1928   1929   
  1929   1930       /* Free buffers and drop the checkpointer lock */
  1930   1931       sqlite4_free(pLock->pEnv, aBuf);
  1931   1932       sqlite4_free(pLock->pEnv, aPgno);
  1932         -    sqlite4BtLockCkptUnlock(pLog->pLock);
         1933  +    sqlite4BtLockCkptUnlock(pLock);
         1934  +    sqlite4BtPagerSetDbhdr((BtPager*)pLock, 0);
  1933   1935     }
         1936  +
  1934   1937     return rc;
  1935   1938   }
  1936   1939   
  1937   1940   #if 0
  1938   1941   /*
  1939   1942   ** Return the database page size in bytes.
  1940   1943   */

Changes to src/bt_main.c.

    52     52   /*
    53     53   ** Values that make up the bt_cursor.flags mask.
    54     54   **
    55     55   ** CSR_NEXT_OK, CSR_PREV_OK:
    56     56   **   These are only used by fast-insert cursors. The CSR_NEXT_OK flag is
    57     57   **   set if xNext() may be safely called on the cursor. CSR_PREV_OK is
    58     58   **   true if xPrev() is Ok.
           59  +**
           60  +** CSR_VISIT_DEL:
           61  +**   If this flag is set, do not skip over delete keys that occur in the
           62  +**   merged cursor output. This is used by checkpoint merges.
    59     63   */
    60     64   #define CSR_TYPE_BT   0x0001
    61     65   #define CSR_TYPE_FAST 0x0002
    62     66   #define CSR_NEXT_OK   0x0004
    63     67   #define CSR_PREV_OK   0x0008
           68  +#define CSR_VISIT_DEL 0x0010
    64     69   
    65     70   #define IsBtCsr(pCsr) (((pCsr)->flags & CSR_TYPE_BT)!=0)
    66     71   
    67     72   /* 
    68     73   ** Base class for both cursor types (BtCursor and FiCursor).
    69     74   */
    70     75   struct bt_cursor {
................................................................................
   533    538   
   534    539     if( pgno==pHdr->iSRoot ){
   535    540       int i;
   536    541       BtSchedule s;
   537    542       sqlite4BtBufAppendf(pBuf, "(schedule page) ");
   538    543       memcpy(&s, aData, sizeof(s));
   539    544   
   540         -    sqlite4BtBufAppendf(pBuf, "bBusy=%d ", (int)s.bBusy);
   541         -    sqlite4BtBufAppendf(pBuf, "iAge=%d ", (int)s.iAge);
   542         -    sqlite4BtBufAppendf(pBuf, "iMinLevel=%d ", (int)s.iMinLevel);
   543         -    sqlite4BtBufAppendf(pBuf, "iMaxLevel=%d ", (int)s.iMaxLevel);
   544         -    sqlite4BtBufAppendf(pBuf, "iOutLevel=%d ", (int)s.iOutLevel);
   545         -    sqlite4BtBufAppendf(pBuf, "blocks=(");
          545  +    sqlite4BtBufAppendf(pBuf, "  bBusy=%d\n", (int)s.bBusy);
          546  +    sqlite4BtBufAppendf(pBuf, "  iAge=%d\n", (int)s.iAge);
          547  +    sqlite4BtBufAppendf(pBuf, "  iMinLevel=%d\n", (int)s.iMinLevel);
          548  +    sqlite4BtBufAppendf(pBuf, "  iMaxLevel=%d\n", (int)s.iMaxLevel);
          549  +    sqlite4BtBufAppendf(pBuf, "  iOutLevel=%d\n", (int)s.iOutLevel);
          550  +    sqlite4BtBufAppendf(pBuf, "  blocks=(");
   546    551       for(i=0; s.aBlock[i] && i<array_size(s.aBlock); i++){
   547    552         sqlite4BtBufAppendf(pBuf, "%s%d", i==0 ? "" : " ", (int)s.aBlock[i]);
   548    553       }
   549         -    sqlite4BtBufAppendf(pBuf, ") ");
          554  +    sqlite4BtBufAppendf(pBuf, ")\n");
   550    555   
   551         -    sqlite4BtBufAppendf(pBuf, "iNextPg=%d ", (int)s.iNextPg);
   552         -    sqlite4BtBufAppendf(pBuf, "iNextCell=%d ", (int)s.iNextCell);
   553         -    sqlite4BtBufAppendf(pBuf, "nUsed=%d ", (int)s.nUsed);
   554         -    sqlite4BtBufAppendf(pBuf, "iFreeList=%d", (int)s.iFreeList);
          556  +    sqlite4BtBufAppendf(pBuf, "  iNextPg=%d\n", (int)s.iNextPg);
          557  +    sqlite4BtBufAppendf(pBuf, "  iNextCell=%d\n", (int)s.iNextCell);
          558  +    sqlite4BtBufAppendf(pBuf, "  nUsed=%d\n", (int)s.nUsed);
          559  +    sqlite4BtBufAppendf(pBuf, "  iFreeList=%d\n", (int)s.iFreeList);
   555    560     }else{
   556    561       sqlite4BtBufAppendf(pBuf, "nCell=%d ", nCell);
   557    562       sqlite4BtBufAppendf(pBuf, "iFree=%d ", (int)btFreeOffset(aData, nData));
   558    563       sqlite4BtBufAppendf(pBuf, "flags=%d ", (int)btFlags(aData));
   559    564       if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
   560    565         sqlite4BtBufAppendf(pBuf, "rchild=%d ", (int)btGetU32(&aData[1]));
   561    566       }
................................................................................
   599    604             /* Interpret the meta-tree entry */
   600    605             if( nKey==sizeof(aSummaryKey) && 0==memcmp(pKey, aSummaryKey, nKey) ){
   601    606               sqlite4BtBufAppendf(pBuf, "  [summary]");
   602    607             }else{
   603    608               u32 iAge = btGetU32(&pKey[0]);
   604    609               u32 iLevel = ~btGetU32(&pKey[4]);
   605    610               u32 iBlk = btGetU32(pVal);
   606         -            sqlite4BtBufAppendf(pBuf, "  [age=%d level=%d block=%d]", 
          611  +            sqlite4BtBufAppendf(pBuf, "  [age=%d level=%d root=%d]", 
   607    612                   (int)iAge, (int)iLevel, (int)iBlk
   608    613                   );
   609    614             }
   610    615           }
   611    616         }
   612    617         sqlite4BtBufAppendf(pBuf, "\n");
   613    618       }
................................................................................
  1002   1007         rc = btErrorBkpt(SQLITE4_NOMEM);
  1003   1008       }
  1004   1009     }
  1005   1010   
  1006   1011     return rc;
  1007   1012   }
  1008   1013   
  1009         -static u32 btBlockToRoot(BtDbHdr *pHdr, u32 iBlk){
         1014  +/*
         1015  +** Return the page number of the first page on block iBlk.
         1016  +*/
         1017  +static u32 btFirstOfBlock(BtDbHdr *pHdr, u32 iBlk){
  1010   1018     assert( iBlk>0 );
  1011   1019     return (iBlk - 1) * (pHdr->blksz / pHdr->pgsz) + 1;
  1012   1020   }
  1013   1021   
  1014   1022   /*
  1015   1023   ** Return true if the cell that the argument cursor currently points to
  1016   1024   ** is a delete marker.
................................................................................
  1337   1345           rc = SQLITE4_NOTFOUND;
  1338   1346         }else{
  1339   1347           rc = btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);
  1340   1348         }
  1341   1349       }
  1342   1350       if( rc==SQLITE4_OK ){
  1343   1351         BtDbHdr *pHdr = sqlite4BtPagerDbhdr(pCsr->base.pDb->pPager);
  1344         -      pSub->csr.iRoot = btBlockToRoot(pHdr, sqlite4BtGetU32((const u8*)pV));
         1352  +      pSub->csr.iRoot = sqlite4BtGetU32((const u8*)pV);
  1345   1353         rc = btCsrEnd(&pSub->csr, !bNext);
  1346   1354       }
  1347   1355     }
  1348   1356   
  1349   1357     return rc;
  1350   1358   }
  1351   1359   
................................................................................
  1423   1431     while( p->iLvl<(int)iMin ){
  1424   1432       p->iAge++;
  1425   1433       if( p->iAge>=(p->nSum)/6 ) return 1;
  1426   1434       btReadSummary(p->aSum, p->iAge, &iMin, &nLevel, 0);
  1427   1435       p->iLvl = (int)iMin + (int)nLevel - 1;
  1428   1436     }
  1429   1437   
         1438  +  assert( p->iSub<p->nSub );
  1430   1439     return 0;
  1431   1440   }
  1432   1441   
  1433   1442   static int fiLevelIterInit(bt_db *db, FiLevelIter *p){
  1434   1443     int rc;                         /* Return code */
  1435   1444   
  1436   1445     memset(p, 0, sizeof(FiLevelIter));
................................................................................
  1450   1459   
  1451   1460     return rc;
  1452   1461   }
  1453   1462   
  1454   1463   static void fiLevelIterCleanup(FiLevelIter *p){
  1455   1464     btCsrReset(&p->csr, 1);
  1456   1465   }
         1466  +
         1467  +/*
         1468  +** Format values iAge and iLvl into an 8 byte prefix as used in the
         1469  +** meta-tree.
         1470  +*/
         1471  +static void fiFormatPrefix(u8 *aPrefix, u32 iAge, u32 iLvl){
         1472  +  btPutU32(&aPrefix[0], iAge);
         1473  +  btPutU32(&aPrefix[4], ~(u32)iLvl);
         1474  +}
  1457   1475   
  1458   1476   /*
  1459   1477   ** Seek a fast-insert cursor.
  1460   1478   */
  1461   1479   static int fiCsrSeek(FiCursor *pCsr, const void *pK, int nK, int eSeek){
  1462   1480     u8 aPrefix[8];
  1463   1481     int rc = SQLITE4_NOTFOUND;      /* Return code */
................................................................................
  1473   1491       /* Initialize the iterator used to skip through database levels */
  1474   1492       rc = fiLevelIterInit(db, &iter);
  1475   1493       if( rc!=SQLITE4_OK ) return rc;
  1476   1494   
  1477   1495       if( eSeek==BT_SEEK_EQ ){
  1478   1496         FiSubCursor *pSub;
  1479   1497         BtCursor *pM;
  1480         -      int iAge;
  1481   1498   
  1482   1499         /* A BT_SEEK_EQ is a special case. There is no need to set up a cursor
  1483   1500         ** that can be advanced (in either direction) in this case. All that
  1484   1501         ** is required is to search each level in order for the requested key 
  1485   1502         ** (or a corresponding delete marker). Once a match is found, there
  1486   1503         ** is no need to search any further. As a result, only a single
  1487   1504         ** sub-cursor is required.  */
................................................................................
  1488   1505         rc = fiCsrAllocateSubs(db, pCsr, 1);
  1489   1506         pSub = pCsr->aSub;
  1490   1507         pM = &pSub->mcsr;
  1491   1508   
  1492   1509         btCsrSetup(db, pHdr->iMRoot, pM);
  1493   1510         while( 0==fiLevelIterNext(&iter) ){
  1494   1511   
  1495         -        u16 iMin;
  1496         -        u16 nLevel;
  1497         -        u16 iMerge;
  1498         -        int iLvl;
  1499         -
  1500         -        btPutU32(aPrefix, iter.iAge);
  1501         -        btPutU32(&aPrefix[4], ~(u32)iter.iLvl);
         1512  +        fiFormatPrefix(aPrefix, iter.iAge, iter.iLvl);
  1502   1513           rc = btCsrSeek(pM, aPrefix, pK, nK, BT_SEEK_LE, BT_CSRSEEK_SEEK);
  1503   1514   
  1504   1515           if( rc==SQLITE4_NOTFOUND ){
  1505   1516             /* All keys in this level are greater than pK/nK. */
  1506   1517             /* no-op */
  1507   1518           }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
  1508   1519             const void *pV;
  1509   1520             int nV;
  1510         -          int iBlk;
         1521  +          u32 iRoot;
  1511   1522             sqlite4BtCsrData(&pM->base, 0, 4, &pV, &nV);
  1512         -          iBlk = sqlite4BtGetU32((const u8*)pV);
         1523  +          iRoot = sqlite4BtGetU32((const u8*)pV);
  1513   1524             btCsrReset(&pSub->csr, 1);
  1514         -          btCsrSetup(db, btBlockToRoot(pHdr, iBlk), &pSub->csr);
         1525  +          btCsrSetup(db, iRoot, &pSub->csr);
  1515   1526   
  1516   1527             rc = btCsrSeek(&pSub->csr, 0, pK, nK, BT_SEEK_EQ, BT_CSRSEEK_SEEK);
  1517   1528             assert( rc!=SQLITE4_INEXACT );
  1518   1529             if( rc!=SQLITE4_NOTFOUND ){
  1519   1530               /* A hit on the requested key or an error has occurred. Either
  1520   1531                ** way, break out of the loop. If this is a hit, set iBt to
  1521   1532                ** zero so that the BtCsrKey() and BtCsrData() routines know
................................................................................
  1557   1568   
  1558   1569           if( rc==SQLITE4_NOTFOUND ){
  1559   1570             /* No keys to visit in this level */
  1560   1571             rc = SQLITE4_OK;
  1561   1572           }else if( rc==SQLITE4_OK || rc==SQLITE4_INEXACT ){
  1562   1573             const void *pV;
  1563   1574             int nV;
  1564         -          int iBlk;
         1575  +          u32 iRoot;
  1565   1576             sqlite4BtCsrData(&pM->base, 0, 4, &pV, &nV);
  1566         -          iBlk = sqlite4BtGetU32((const u8*)pV);
         1577  +          iRoot = sqlite4BtGetU32((const u8*)pV);
  1567   1578             btCsrReset(&pSub->csr, 1);
  1568         -          btCsrSetup(db, btBlockToRoot(pHdr, iBlk), &pSub->csr);
         1579  +          btCsrSetup(db, iRoot, &pSub->csr);
  1569   1580   
  1570   1581             rc = btCsrSeek(&pSub->csr, 0, pK, nK, eSeek, BT_CSRSEEK_SEEK);
  1571   1582             if( rc==SQLITE4_OK ) bMatch = 1;
  1572   1583             if( rc==SQLITE4_INEXACT ) bHit = 1;
  1573   1584             if( rc==SQLITE4_INEXACT || rc==SQLITE4_NOTFOUND ) rc = SQLITE4_OK;
  1574   1585   
  1575   1586           }else{
................................................................................
  1630   1641         }
  1631   1642       }
  1632   1643       if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
  1633   1644   
  1634   1645       if( rc==SQLITE4_OK ){
  1635   1646         const void *pV;
  1636   1647         int nV;
  1637         -      int iBlk;
         1648  +      int iRoot;
  1638   1649         btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);
  1639         -      iBlk = sqlite4BtGetU32((const u8*)pV);
         1650  +      iRoot = sqlite4BtGetU32((const u8*)pV);
  1640   1651         btCsrReset(&pSub->csr, 1);
  1641         -      btCsrSetup(db, btBlockToRoot(pHdr, iBlk), &pSub->csr);
         1652  +      btCsrSetup(db, iRoot, &pSub->csr);
  1642   1653         rc = btCsrEnd(&pSub->csr, bLast);
  1643   1654       }
  1644   1655     }
  1645   1656     fiLevelIterCleanup(&iter);
  1646   1657   
  1647   1658     if( rc==SQLITE4_OK ){
  1648   1659       pCsr->base.flags &= ~(CSR_NEXT_OK | CSR_PREV_OK);
................................................................................
  2163   2174   */
  2164   2175   static int btAllocateNonOverflow(bt_db *db, BtPage **ppPg){
  2165   2176     int rc;
  2166   2177     if( db->bFastInsertOp ){
  2167   2178       BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
  2168   2179       u32 iPg;
  2169   2180   
  2170         -    iPg = pHdr->nSubPg + btBlockToRoot(pHdr, pHdr->iSubBlock);
         2181  +    iPg = pHdr->nSubPg + btFirstOfBlock(pHdr, pHdr->iSubBlock);
  2171   2182       pHdr->nSubPg++;
  2172   2183       rc = sqlite4BtPageGet(db->pPager, iPg, ppPg);
  2173   2184       if( rc==SQLITE4_OK ){
  2174   2185         rc = sqlite4BtPageWrite(*ppPg);
  2175   2186         if( rc!=SQLITE4_OK ){
  2176   2187           sqlite4BtPageRelease(*ppPg);
  2177   2188         }
................................................................................
  3314   3325             rc = btInsertAndBalance(&csr, 1, &kv);
  3315   3326   
  3316   3327             /* Unless this is a block-full error, break out of the loop */
  3317   3328             if( rc!=BT_BLOCKFULL ) break;
  3318   3329             assert( iRoot==0 );
  3319   3330   
  3320   3331             /* Try to schedule a merge operation */
  3321         -#if 0
  3322   3332             rc = btScheduleMerge(db);
  3323         -#endif
  3324         -          rc = SQLITE4_OK;
  3325   3333   
  3326   3334             if( rc==SQLITE4_OK ){
  3327   3335               rc = btFastInsertRoot(db, pHdr, &iRootPg);
  3328   3336             }
  3329   3337             if( rc==SQLITE4_OK ){
  3330   3338               btCsrReset(&csr, 1);
  3331   3339               btCsrSetup(db, iRootPg, &csr);
................................................................................
  3571   3579           /* Find the input age and maximum level */
  3572   3580           btReadSummary(aSum, iBestAge, &iMin, &nLevel, &iMerge);
  3573   3581           *piMinLevel = (u32)iMin;
  3574   3582           *piMaxLevel = (u32)(iMin + nLevel - 1);
  3575   3583           *piAge = iBestAge;
  3576   3584   
  3577   3585           /* Find the output level */
  3578         -        btReadSummary(aSum, iBestAge+1, &iMin, &nLevel, &iMerge);
         3586  +        btReadSummary(aNew, iBestAge+1, &iMin, &nLevel, &iMerge);
  3579   3587           *piOutLevel = iMin + nLevel;
  3580   3588   
  3581   3589           /* Update the summary record for the output segment. */
  3582   3590           btWriteSummary(aNew, iBestAge+1, iMin, nLevel+1, iMerge);
  3583   3591   
  3584   3592           /* Write the updated summary record back to the db. */
  3585   3593           rc = btReplaceEntry(
................................................................................
  3690   3698   
  3691   3699         /* The key for the new entry consists of the concatenation of two 
  3692   3700         ** 32-bit big-endian integers - the <age> and <level-no>. The age
  3693   3701         ** of the new segment is 0. The level number is one greater than the
  3694   3702         ** level number of the previous segment.  */
  3695   3703         btPutU32(&aKey[0], 0);
  3696   3704         btPutU32(&aKey[4], ~iLevel);
  3697         -      btPutU32(&aVal[0], iSubBlock);
         3705  +      btPutU32(&aVal[0], btFirstOfBlock(pHdr, iSubBlock));
  3698   3706         rc = btReplaceEntry(db, pHdr->iMRoot, aKey, 8, aVal, 4);
  3699   3707       }
  3700   3708     }
  3701   3709   
  3702   3710     if( rc==SQLITE4_OK ){
  3703         -    *piRoot = btBlockToRoot(pHdr, pHdr->iSubBlock);
         3711  +    *piRoot = btFirstOfBlock(pHdr, pHdr->iSubBlock);
  3704   3712     }
  3705   3713     db->bFastInsertOp = 1;
  3706   3714     return rc;
  3707   3715   }
  3708   3716   
  3709   3717   /*
  3710   3718   ** Set up a fast-insert cursor to read the input data for a merge operation.
................................................................................
  3711   3719   */
  3712   3720   static int fiSetupMergeCsr(
  3713   3721     bt_db *db,                      /* Database handle */
  3714   3722     BtDbHdr *pHdr,                  /* Current database header values */
  3715   3723     BtSchedule *p,                  /* Description of merge operation */
  3716   3724     FiCursor *pCsr                  /* Populate this object before returning */
  3717   3725   ){
  3718         -  int iBt;                        /* Used to loop through component cursors */
         3726  +  int iSub;                       /* Used to loop through component cursors */
  3719   3727     int rc;                         /* Return code */
  3720   3728   
  3721   3729     memset(pCsr, 0, sizeof(FiCursor));
  3722         -  pCsr->base.flags = CSR_TYPE_FAST;
         3730  +  pCsr->base.flags = CSR_TYPE_FAST | CSR_NEXT_OK | CSR_VISIT_DEL;
  3723   3731     pCsr->base.pDb = db;
  3724         -
  3725   3732     rc = fiCsrAllocateSubs(db, pCsr, (p->iMaxLevel - p->iMinLevel) + 1);
  3726   3733     assert( rc==SQLITE4_OK || pCsr->nBt==0 );
  3727         -  for(iBt=0; iBt<pCsr->nBt && rc==SQLITE4_OK; iBt++){
         3734  +
         3735  +  /* Initialize each sub-cursor */
         3736  +  for(iSub=0; iSub<pCsr->nBt && rc==SQLITE4_OK; iSub++){
         3737  +    u32 iLvl = p->iMaxLevel - iSub;
         3738  +    FiSubCursor *pSub = &pCsr->aSub[iSub];
         3739  +    BtCursor *pM = &pSub->mcsr;
         3740  +    const void *pKey = 0; int nKey = 0;
         3741  +
         3742  +    /* Seek the meta-tree cursor to the first entry (smallest keys) for the
         3743  +    ** current level. If an earlier merge operation completely emptied the
         3744  +    ** level, the sought entry may not exist at all.  */
         3745  +    fiFormatPrefix(pSub->aPrefix, p->iAge, iLvl);
         3746  +    btCsrSetup(db, pHdr->iMRoot, pM);
         3747  +    rc = btCsrSeek(pM, 0, pSub->aPrefix, sizeof(pSub->aPrefix), BT_SEEK_GE, 0);
         3748  +
         3749  +    if( rc==SQLITE4_INEXACT ){
         3750  +      const int nPrefix = sizeof(pSub->aPrefix);
         3751  +      rc = btCsrKey(pM, &pKey, &nKey);
         3752  +      if( rc==SQLITE4_OK ){
         3753  +        if( nKey<nPrefix || memcmp(pKey, pSub->aPrefix, nPrefix) ){
         3754  +          /* Level is completely empty. Nothing to do for this level. */
         3755  +          rc = SQLITE4_NOTFOUND;
         3756  +        }else{
         3757  +          nKey -= nPrefix;
         3758  +          pKey = (const void*)(((const u8*)pKey) + nPrefix);
         3759  +        }
         3760  +      }
         3761  +    }
         3762  +
         3763  +    /* Assuming the process above found a block, set up the block cursor and
         3764  +    ** seek it to the first (smallest) valid key.  */
         3765  +    if( rc==SQLITE4_OK ){
         3766  +      const void *pVal = 0; int nVal = 0;
         3767  +      rc = btCsrData(pM, 0, 4, &pVal, &nVal);
         3768  +      if( rc==SQLITE4_OK ){
         3769  +        u32 iRoot = sqlite4BtGetU32((const u8*)pVal);
         3770  +        btCsrSetup(db, iRoot, &pSub->csr);
         3771  +        rc = btCsrSeek(&pSub->csr, 0, pKey, nKey, BT_SEEK_GE, 0);
         3772  +        if( rc==SQLITE4_INEXACT ) rc = SQLITE4_OK;
         3773  +        if( rc==SQLITE4_NOTFOUND ) rc = btErrorBkpt(SQLITE4_CORRUPT);
         3774  +      }
         3775  +    }
         3776  +  }
         3777  +
         3778  +  if( rc==SQLITE4_OK ){
         3779  +    rc = fiCsrSetCurrent(pCsr);
  3728   3780     }
  3729   3781   
  3730   3782     return rc;
  3731   3783   }
  3732   3784   
  3733   3785   /*
  3734   3786   ** This is called by a checkpointer to handle a schedule page.
  3735   3787   */
  3736   3788   int sqlite4BtMerge(bt_db *db, BtDbHdr *pHdr, u8 *aSched){
  3737   3789     BtSchedule s;                   /* Deserialized schedule object */
  3738         -  FiCursor fcsr;                  /* FiCursor used to read input */
  3739         -  int rc;                         /* Return code */
         3790  +  int rc = SQLITE4_OK;            /* Return code */
  3740   3791   
  3741   3792     /* Set up the input cursor. */
  3742   3793     btReadSchedule(db, aSched, &s);
  3743         -  assert( s.bBusy );
  3744         -  rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
         3794  +  if( s.bBusy ){
         3795  +    FiCursor fcsr;                /* FiCursor used to read input */
         3796  +    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
  3745   3797   
  3746         -  /* The following loop runs once for each key copied from the input to
  3747         -  ** the output segments. It terminates either when the input is exhausted
  3748         -  ** or when all available output blocks are full.  */
  3749         -  while( rc==SQLITE4_OK ){
  3750         -    rc = fiCsrStep(&fcsr);
         3798  +    /* The following loop runs once for each key copied from the input to
         3799  +    ** the output segments. It terminates either when the input is exhausted
         3800  +    ** or when all available output blocks are full.  */
         3801  +    while( rc==SQLITE4_OK ){
         3802  +      rc = fiCsrStep(&fcsr);
         3803  +    }
         3804  +
         3805  +    /* Assuming no error has occurred, update the serialized BtSchedule
         3806  +    ** structure stored in buffer aSched[]. The caller will write this
         3807  +    ** buffer to the database file as page (pHdr->iSRoot).  */
         3808  +    if( rc==SQLITE4_OK || rc==SQLITE4_NOTFOUND ){
         3809  +      rc = SQLITE4_OK;
         3810  +      btWriteSchedule(aSched, &s, &rc);
         3811  +    }
         3812  +
         3813  +    fiCsrReset(&fcsr);
  3751   3814     }
  3752         -
  3753         -  /* Assuming no error has occurred, update the serialized BtSchedule
  3754         -  ** structure stored in buffer aSched[]. The caller will write this
  3755         -  ** buffer to the database file as page (pHdr->iSRoot).  */
  3756         -  if( rc==SQLITE4_OK || rc==SQLITE4_NOTFOUND ){
  3757         -    rc = SQLITE4_OK;
  3758         -    btWriteSchedule(aSched, &s, &rc);
  3759         -  }
  3760         -
  3761         -  fiCsrReset(&fcsr);
  3762   3815     return rc;
  3763   3816   }
  3764   3817   
  3765   3818   /*
  3766   3819   ** Insert a new key/value pair or replace an existing one.
  3767   3820   **
  3768   3821   ** This function may modify either the b-tree or fast-insert-tree, depending

Changes to src/bt_pager.c.

   760    760   
   761    761   /* 
   762    762   ** Query for the db header values. Requires an open read transaction.
   763    763   */
   764    764   BtDbHdr *sqlite4BtPagerDbhdr(BtPager *p){
   765    765     return p->pHdr;
   766    766   }
          767  +
          768  +void sqlite4BtPagerSetDbhdr(BtPager *p, BtDbHdr *pHdr){
          769  +  assert( p->pHdr==0 || pHdr==0 );
          770  +  p->pHdr = pHdr;
          771  +}
   767    772   
   768    773   /*
   769    774   ** Request a reference to page pgno of the database.
   770    775   */
   771    776   int sqlite4BtPageGet(BtPager *p, u32 pgno, BtPage **ppPg){
   772    777     int rc = SQLITE4_OK;            /* Return code */
   773    778     BtPage *pRet;                   /* Returned page handle */