SQLite4
Check-in [eec16b0f2f]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add tests for block-redirects to lsmtest.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: eec16b0f2fb6312f0538c6d15c9533fe42ec6717
User & Date: dan 2013-01-21 19:50:21
Context
2013-01-22
20:07
Several block-redirect related bugfixes. check-in: a56a334333 user: dan tags: block-redirects
2013-01-21
19:50
Add tests for block-redirects to lsmtest. check-in: eec16b0f2f user: dan tags: block-redirects
16:53
If a free-list-only segment is generated while a merge of the top-level segment is underway, add the new segment to the merge inputs immediately. Also, if auto-checkpoints are enabled, schedule a checkpoint after each block is moved within an lsm_work(nmerge=1) call. check-in: 89b4286682 user: dan tags: block-redirects
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to lsm-test/README.

    24     24                 the system recovers and other clients proceed unaffected if
    25     25                 a process fails in the middle of a write transaction.
    26     26   
    27     27                 The difference from lsmtest2.c is that this file tests
    28     28                 live-recovery (recovery from a failure that occurs while other
    29     29                 clients are still running) whereas lsmtest2.c tests recovery
    30     30                 from a system or power failure.
           31  +
           32  +  lsmtest9.c: More data tests. These focus on testing that calling
           33  +              lsm_work(nMerge=1) to compact the database does not corrupt it.
           34  +              In other words, that databases containing block-redirects
           35  +              can be read and written.
    31     36   
    32     37   
    33     38   
    34     39   

Changes to lsm-test/lsmtest.h.

   215    215   void test_data_1(const char *, const char *, int *pRc);
   216    216   void test_data_2(const char *, const char *, int *pRc);
   217    217   void test_data_3(const char *, const char *, int *pRc);
   218    218   void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
   219    219   void testCaseProgress(int, int, int, int *);
   220    220   int testCaseNDot(void);
   221    221   
          222  +void testCompareDb(Datasource *, int, int, TestDb *, TestDb *, int *);
          223  +int testControlDb(TestDb **ppDb);
          224  +
   222    225   typedef struct CksumDb CksumDb;
   223    226   CksumDb *testCksumArrayNew(Datasource *, int, int, int);
   224    227   char *testCksumArrayGet(CksumDb *, int);
   225    228   void testCksumArrayFree(CksumDb *);
   226    229   void testCaseStart(int *pRc, char *zFmt, ...);
   227    230   void testCaseFinish(int rc);
   228    231   void testCaseSkip(void);
................................................................................
   230    233   
   231    234   #define TEST_CKSUM_BYTES 29
   232    235   int testCksumDatabase(TestDb *pDb, char *zOut);
   233    236   int testCountDatabase(TestDb *pDb);
   234    237   void testCompareInt(int, int, int *);
   235    238   void testCompareStr(const char *z1, const char *z2, int *pRc);
   236    239   
          240  +/* lsmtest9.c */
          241  +void test_data_4(const char *, const char *, int *pRc);
          242  +
   237    243   
   238    244   /*
   239    245   ** Similar to the Tcl_GetIndexFromObjStruct() Tcl library function.
   240    246   */
   241    247   #define testArgSelect(w,x,y,z) testArgSelectX(w,x,sizeof(w[0]),y,z)
   242    248   int testArgSelectX(void *, const char *, int, const char *, int *);
   243    249   
   244    250   #ifdef __cplusplus
   245    251   }  /* End of the 'extern "C"' block */
   246    252   #endif
   247    253   
   248    254   #endif

Changes to lsm-test/lsmtest1.c.

    88     88     zRet = testMallocPrintf("data.%s.%s.%d.%d", 
    89     89         zSystem, zData, pTest->nRow, pTest->nVerify
    90     90     );
    91     91     testFree(zData);
    92     92     return zRet;
    93     93   }
    94     94   
    95         -static int testControlDb(TestDb **ppDb){
           95  +int testControlDb(TestDb **ppDb){
    96     96   #ifdef HAVE_KYOTOCABINET
    97     97     return tdb_open("kyotocabinet", "tmp.db", 1, ppDb);
    98     98   #else
    99     99     return tdb_open("sqlite3", ":memory:", 1, ppDb);
   100    100   #endif
   101    101   }
   102    102   
................................................................................
   344    344       if( testCaseBegin(pRc, zPattern, "%s", zName) ){
   345    345         doDataTest1(zSystem, &aTest[i], pRc);
   346    346       }
   347    347       testFree(zName);
   348    348     }
   349    349   }
   350    350   
   351         -static void testCompareDb(
          351  +void testCompareDb(
   352    352     Datasource *pData,
   353    353     int nData,
   354    354     int iSeed,
   355    355     TestDb *pControl,
   356    356     TestDb *pDb,
   357    357     int *pRc
   358    358   ){

Added lsm-test/lsmtest9.c.

            1  +
            2  +#include "lsmtest.h"
            3  +
            4  +#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE
            5  +#define DATA_RANDOM     TEST_DATASOURCE_RANDOM
            6  +
            7  +typedef struct Datatest4 Datatest4;
            8  +
            9  +/*
           10  +** Test overview:
           11  +**
           12  +**   1. Insert (Datatest4.nRec) records into a database.
           13  +**
           14  +**   2. Repeat (Datatest4.nRepeat) times:
           15  +**
           16  +**      2a. Delete 2/3 of the records in the database.
           17  +**
           18  +**      2b. Run lsm_work(nMerge=1).
           19  +**
           20  +**      2c. Insert as many records as were deleted in 2a.
           21  +**
           22  +**      2d. Check database content is as expected.
           23  +**
           24  +**      2e. If (Datatest4.bReopen) is true, close and reopen the database.
           25  +*/
           26  +struct Datatest4 {
           27  +  /* Datasource definition */
           28  +  DatasourceDefn defn;
           29  +
           30  +  int nRec;
           31  +  int nRepeat;
           32  +  int bReopen;
           33  +};
           34  +
           35  +static void doDataTest4(
           36  +  const char *zSystem,            /* Database system to test */
           37  +  Datatest4 *p,                   /* Structure containing test parameters */
           38  +  int *pRc                        /* OUT: Error code */
           39  +){
           40  +  lsm_db *db = 0;
           41  +  TestDb *pDb;
           42  +  TestDb *pControl;
           43  +  Datasource *pData;
           44  +  int i;
           45  +  int rc = 0;
           46  +  int iDot = 0;
           47  +
           48  +  int nRecOn3 = (p->nRec / 3);
           49  +  int iData = 0;
           50  +
           51  +  /* Start the test case, open a database and allocate the datasource. */
           52  +  rc = testControlDb(&pControl);
           53  +  pDb = testOpen(zSystem, 1, &rc);
           54  +  pData = testDatasourceNew(&p->defn);
           55  +  if( rc==0 ) db = tdb_lsm(pDb);
           56  +
           57  +  testWriteDatasourceRange(pControl, pData, iData, nRecOn3*3, &rc);
           58  +  testWriteDatasourceRange(pDb,      pData, iData, nRecOn3*3, &rc);
           59  +
           60  +  for(i=0; rc==0 && i<p->nRepeat; i++){
           61  +
           62  +    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
           63  +    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);
           64  +
           65  +    if( db ){
           66  +      int nDone;
           67  +      do {
           68  +        nDone = 0;
           69  +        rc = lsm_work(db, 1, 100000, &nDone);
           70  +      }while( rc==0 && nDone>0 );
           71  +    }
           72  +
           73  +    iData += (nRecOn3*2);
           74  +    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
           75  +    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);
           76  +
           77  +    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);
           78  +
           79  +    /* If Datatest4.bReopen is true, close and reopen the database */
           80  +    if( p->bReopen ){
           81  +      testReopen(&pDb, &rc);
           82  +      if( rc==0 ) db = tdb_lsm(pDb);
           83  +    }
           84  +
           85  +    /* Update the progress dots... */
           86  +    testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot);
           87  +  }
           88  +
           89  +  testClose(&pDb);
           90  +  testClose(&pControl);
           91  +  testDatasourceFree(pData);
           92  +  testCaseFinish(rc);
           93  +  *pRc = rc;
           94  +}
           95  +
           96  +static char *getName4(const char *zSystem, Datatest4 *pTest){
           97  +  char *zRet;
           98  +  char *zData;
           99  +  zData = testDatasourceName(&pTest->defn);
          100  +  zRet = testMallocPrintf("data4.%s.%s.%d.%d.%d", 
          101  +      zSystem, zData, pTest->nRec, pTest->nRepeat, pTest->bReopen
          102  +  );
          103  +  testFree(zData);
          104  +  return zRet;
          105  +}
          106  +
          107  +void test_data_4(
          108  +  const char *zSystem,            /* Database system name */
          109  +  const char *zPattern,           /* Run test cases that match this pattern */
          110  +  int *pRc                        /* IN/OUT: Error code */
          111  +){
          112  +  Datatest4 aTest[] = {
          113  +      /* defn,                                 nRec, nRepeat, bReopen */
          114  +    { {DATA_RANDOM,     20,25,     100,200}, 10000,      10,       0   },
          115  +    { {DATA_RANDOM,     20,25,     100,200}, 10000,      10,       1   },
          116  +  };
          117  +
          118  +  int i;
          119  +
          120  +  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
          121  +    char *zName = getName4(zSystem, &aTest[i]);
          122  +    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
          123  +      doDataTest4(zSystem, &aTest[i], pRc);
          124  +    }
          125  +    testFree(zName);
          126  +  }
          127  +}
          128  +
          129  +
          130  +

Changes to lsm-test/lsmtest_main.c.

   459    459   
   460    460     for(j=0; tdb_system_name(j); j++){
   461    461       rc = 0;
   462    462   
   463    463       test_data_1(tdb_system_name(j), zPattern, &rc);
   464    464       test_data_2(tdb_system_name(j), zPattern, &rc);
   465    465       test_data_3(tdb_system_name(j), zPattern, &rc);
          466  +    test_data_4(tdb_system_name(j), zPattern, &rc);
   466    467       test_rollback(tdb_system_name(j), zPattern, &rc);
   467    468       test_mc(tdb_system_name(j), zPattern, &rc);
   468    469       test_mt(tdb_system_name(j), zPattern, &rc);
   469    470   
   470    471       if( rc ) nFail++;
   471    472     }
   472    473   

Changes to main.mk.

   290    290   EXTHDR += \
   291    291     $(TOP)/ext/icu/sqliteicu.h
   292    292   
   293    293   LSMTESTSRC = $(TOP)/lsm-test/lsmtest1.c $(TOP)/lsm-test/lsmtest2.c           \
   294    294                $(TOP)/lsm-test/lsmtest3.c $(TOP)/lsm-test/lsmtest4.c           \
   295    295                $(TOP)/lsm-test/lsmtest5.c $(TOP)/lsm-test/lsmtest6.c           \
   296    296                $(TOP)/lsm-test/lsmtest7.c $(TOP)/lsm-test/lsmtest8.c           \
          297  +             $(TOP)/lsm-test/lsmtest9.c                                      \
   297    298                $(TOP)/lsm-test/lsmtest_datasource.c \
   298    299                $(TOP)/lsm-test/lsmtest_func.c $(TOP)/lsm-test/lsmtest_io.c     \
   299    300                $(TOP)/lsm-test/lsmtest_main.c $(TOP)/lsm-test/lsmtest_mem.c    \
   300    301                $(TOP)/lsm-test/lsmtest_tdb.c $(TOP)/lsm-test/lsmtest_tdb3.c    \
   301    302                $(TOP)/lsm-test/lsmtest_util.c 
   302    303   
   303    304   LSMTESTHDR = $(TOP)/lsm-test/lsmtest.h $(TOP)/lsm-test/lsmtest_tdb.h

Changes to src/lsmInt.h.

   720    720   void lsmEnvShmUnmap(lsm_env *, lsm_file *, int);
   721    721   
   722    722   void lsmEnvSleep(lsm_env *, int);
   723    723   
   724    724   int lsmFsReadSyncedId(lsm_db *db, int, i64 *piVal);
   725    725   
   726    726   int lsmFsSegmentContainsPg(FileSystem *pFS, Segment *, Pgno, int *);
          727  +Pgno lsmFsRedirectPage(FileSystem *, Redirect *, Pgno);
   727    728   
   728    729   /*
   729    730   ** End of functions from "lsm_file.c".
   730    731   **************************************************************************/
   731    732   
   732    733   /* 
   733    734   ** Functions from file "lsm_sorted.c".

Changes to src/lsm_file.c.

   887    887       for(i=0; i<p->n; i++){
   888    888         if( iBlk==p->a[i].iFrom ) return p->a[i].iTo;
   889    889       }
   890    890     }
   891    891     return iBlk;
   892    892   }
   893    893   
   894         -static Pgno fsRedirectPage(FileSystem *pFS, Redirect *pRedir, Pgno iPg){
          894  +Pgno lsmFsRedirectPage(FileSystem *pFS, Redirect *pRedir, Pgno iPg){
   895    895     Pgno iReal = iPg;
   896    896   
   897    897     if( pRedir ){
   898    898       const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
   899    899       int iBlk = fsPageToBlock(pFS, iPg);
   900    900       int i;
   901    901       for(i=0; i<pRedir->n; i++){
................................................................................
  1440   1440           rc = fsBlockNext(pFS, pDel, iBlk, &iNext);
  1441   1441         }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){
  1442   1442           break;
  1443   1443         }
  1444   1444         rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
  1445   1445         iBlk = iNext;
  1446   1446       }
         1447  +
         1448  +    if( pDel->pRedirect ){
         1449  +      assert( pDel->pRedirect==&pSnapshot->redirect );
         1450  +      pSnapshot->redirect.n = 0;
         1451  +    }
  1447   1452   
  1448   1453       if( bZero ) memset(pDel, 0, sizeof(Segment));
  1449   1454     }
  1450   1455     return LSM_OK;
  1451   1456   }
  1452   1457   
  1453   1458   static Pgno firstOnBlock(FileSystem *pFS, int iBlk, Pgno *aPgno, int nPgno){
................................................................................
  1611   1616           assert( pPg->flags & PAGE_HASPREV );
  1612   1617           iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
  1613   1618         }else{
  1614   1619           iPg--;
  1615   1620         }
  1616   1621       }else{
  1617   1622         if( pRun ){
  1618         -        Pgno iLast = fsRedirectPage(pFS, pRedir, pRun->iLastPg);
         1623  +        Pgno iLast = lsmFsRedirectPage(pFS, pRedir, pRun->iLastPg);
  1619   1624           if( iPg==iLast ){
  1620   1625             *ppNext = 0;
  1621   1626             return LSM_OK;
  1622   1627           }
  1623   1628         }
  1624   1629   
  1625   1630         if( fsIsLast(pFS, iPg) ){
................................................................................
  2610   2615         Pgno iFirst;                /* First page of segment (post-redirection) */
  2611   2616         int iBlk;                   /* Current block (during iteration) */
  2612   2617         int iLastBlk;               /* Last real block of segment */
  2613   2618         int bLastIsLastOnBlock;     /* True iLast is the last on its block */
  2614   2619   
  2615   2620         iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst));
  2616   2621         iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg));
  2617         -      iLast = fsRedirectPage(pFS, pRedir, pSeg->iLastPg);
  2618         -      iFirst = fsRedirectPage(pFS, pRedir, pSeg->iFirst);
         2622  +      iLast = lsmFsRedirectPage(pFS, pRedir, pSeg->iLastPg);
         2623  +      iFirst = lsmFsRedirectPage(pFS, pRedir, pSeg->iFirst);
  2619   2624   
  2620   2625         bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast);
  2621   2626         assert( iBlk>0 );
  2622   2627   
  2623   2628         /* If the first page of this run is also the first page of its first
  2624   2629         ** block, set the flag to indicate that the first page of iBlk is 
  2625   2630         ** in use.  */

Changes to src/lsm_sorted.c.

   833    833     int rc = LSM_OK;
   834    834   
   835    835     if( p->iPg ){
   836    836       lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
   837    837       int iCell;                    /* Current cell number on leaf page */
   838    838       Pgno iLeaf;                   /* Page number of current leaf page */
   839    839       int nDepth;                   /* Depth of b-tree structure */
          840  +    Segment *pSeg = pCsr->pSeg;
   840    841   
   841    842       /* Decode the MergeInput structure */
   842    843       iLeaf = p->iPg;
   843    844       nDepth = (p->iCell & 0x00FF);
   844    845       iCell = (p->iCell >> 8) - 1;
   845    846   
   846    847       /* Allocate the BtreeCursor.aPg[] array */
................................................................................
   849    850   
   850    851       /* Populate the last entry of the aPg[] array */
   851    852       if( rc==LSM_OK ){
   852    853         Page **pp = &pCsr->aPg[nDepth-1].pPage;
   853    854         pCsr->iPg = nDepth-1;
   854    855         pCsr->nDepth = nDepth;
   855    856         pCsr->aPg[pCsr->iPg].iCell = iCell;
   856         -      rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLeaf, pp);
          857  +      rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
   857    858       }
   858    859   
   859    860       /* Populate any other aPg[] array entries */
   860    861       if( rc==LSM_OK && nDepth>1 ){
   861    862         Blob blob = {0,0,0};
   862    863         void *pSeek;
   863    864         int nSeek;
   864    865         int iTopicSeek;
   865    866         int iPg = 0;
   866         -      int iLoad = pCsr->pSeg->iRoot;
          867  +      int iLoad = pSeg->iRoot;
   867    868         Page *pPg = pCsr->aPg[nDepth-1].pPage;
   868    869    
   869    870         if( pageObjGetNRec(pPg)==0 ){
   870    871           /* This can happen when pPg is the right-most leaf in the b-tree.
   871    872           ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
   872    873           ** greater than any real key.  */
   873    874           assert( iCell==-1 );
   874    875           iTopicSeek = 1000;
   875    876           pSeek = 0;
   876    877           nSeek = 0;
   877    878         }else{
   878    879           Pgno dummy;
   879         -        rc = pageGetBtreeKey(pCsr->pSeg, pPg,
          880  +        rc = pageGetBtreeKey(pSeg, pPg,
   880    881               0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
   881    882           );
   882    883         }
   883    884   
   884    885         do {
   885    886           Page *pPg;
   886         -        rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pPg);
          887  +        rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg);
   887    888           assert( rc==LSM_OK || pPg==0 );
   888    889           if( rc==LSM_OK ){
   889    890             u8 *aData;                  /* Buffer containing page data */
   890    891             int nData;                  /* Size of aData[] in bytes */
   891    892             int iMin;
   892    893             int iMax;
   893    894             int iCell;
................................................................................
   904    905               int iTry = (iMin+iMax)/2;
   905    906               void *pKey; int nKey;         /* Key for cell iTry */
   906    907               int iTopic;                   /* Topic for key pKeyT/nKeyT */
   907    908               Pgno iPtr;                    /* Pointer for cell iTry */
   908    909               int res;                      /* (pSeek - pKeyT) */
   909    910   
   910    911               rc = pageGetBtreeKey(
   911         -                pCsr->pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
          912  +                pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
   912    913               );
   913    914               if( rc!=LSM_OK ) break;
   914    915   
   915    916               res = sortedKeyCompare(
   916    917                   xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
   917    918               );
   918    919               assert( res!=0 );
................................................................................
   925    926                 iMin = iTry+1;
   926    927               }
   927    928             }
   928    929   
   929    930             pCsr->aPg[iPg].pPage = pPg;
   930    931             pCsr->aPg[iPg].iCell = iCell;
   931    932             iPg++;
   932         -          assert( iPg!=nDepth-1 || iLoad==iLeaf );
          933  +          assert( iPg!=nDepth-1 
          934  +               || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
          935  +          );
   933    936           }
   934    937         }while( rc==LSM_OK && iPg<(nDepth-1) );
   935    938         sortedBlobFree(&blob);
   936    939       }
   937    940   
   938    941       /* Load the current key and pointer */
   939    942       if( rc==LSM_OK ){
................................................................................
   947    950         if( pBtreePg->iCell<0 ){
   948    951           Pgno dummy;
   949    952           int i;
   950    953           for(i=pCsr->iPg-1; i>=0; i--){
   951    954             if( pCsr->aPg[i].iCell>0 ) break;
   952    955           }
   953    956           assert( i>=0 );
   954         -        rc = pageGetBtreeKey(pCsr->pSeg,
          957  +        rc = pageGetBtreeKey(pSeg,
   955    958               pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
   956    959               &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
   957    960           );
   958    961           pCsr->eType |= LSM_SEPARATOR;
   959    962   
   960    963         }else{
   961    964           rc = btreeCursorLoadKey(pCsr);
................................................................................
  4942   4945       if( rc==LSM_OK ){
  4943   4946         rc = sortedNewFreelistOnly(pDb);
  4944   4947       }
  4945   4948       nRem -= nPg;
  4946   4949       if( nPg ) bDirty = 1;
  4947   4950     }
  4948   4951   
  4949         -  if( rc==LSM_OK && bDirty ){
  4950         -    lsmFinishWork(pDb, 0, &rc);
  4951         -  }else{
  4952         -    int rcdummy = LSM_BUSY;
  4953         -    lsmFinishWork(pDb, 0, &rcdummy);
  4954         -  }
  4955         -  assert( pDb->pWorker==0 );
  4956         -
  4957   4952     if( rc==LSM_OK ){
  4958   4953       *pnWrite = (nMax - nRem);
  4959   4954       *pbCkpt = (bCkpt && nRem<=0);
  4960         -    if( nMerge==1 && pDb->nAutockpt>0 && bDirty
         4955  +    if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
  4961   4956        && pWorker->pLevel 
  4962   4957        && pWorker->pLevel->nRight==0 
  4963   4958        && pWorker->pLevel->pNext==0 
  4964   4959       ){
  4965   4960         *pbCkpt = 1;
  4966   4961       }
         4962  +  }
         4963  +
         4964  +  if( rc==LSM_OK && bDirty ){
         4965  +    lsmFinishWork(pDb, 0, &rc);
  4967   4966     }else{
         4967  +    int rcdummy = LSM_BUSY;
         4968  +    lsmFinishWork(pDb, 0, &rcdummy);
  4968   4969       *pnWrite = 0;
  4969   4970       *pbCkpt = 0;
  4970   4971     }
  4971         -
         4972  +  assert( pDb->pWorker==0 );
  4972   4973     return rc;
  4973   4974   }
  4974   4975   
  4975   4976   static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  4976   4977     int rc;
  4977   4978     int nWrite = 0;
  4978   4979     int bCkpt = 0;