Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Improve the integrity-check assert that ensures all blocks in the file are accounted for. Roll any in-memory free-list records into the end of the main segment when executing lsm_work(nmerge=1).
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: e5edba9caa5b8cd58c2e761eb528b240878e0dce
User & Date: dan 2013-01-24 16:45:59.754
Context
2013-01-25
20:46
Fix a problem in the range-delete code. check-in: 96badcb933 user: dan tags: block-redirects
2013-01-24
16:45
Improve the integrity-check assert that ensures all blocks in the file are accounted for. Roll any in-memory free-list records into the end of the main segment when executing lsm_work(nmerge=1). check-in: e5edba9caa user: dan tags: block-redirects
2013-01-23
18:13
Keep the contents of Segment structures up to date at all times, so that none of the page numbers contained within are subject to redirection. check-in: a89abc2117 user: dan tags: block-redirects
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest9.c.
61
62
63
64
65
66
67

68
69
70
71
72

73
74

75
76
77
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92

    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);

    if( db ){
      int nDone;
      fprintf(stderr, "lsm_work() start...\n");

      do {
        nDone = 0;
        rc = lsm_work(db, 1, (1<<30), &nDone);
      }while( rc==0 && nDone>0 );
      fprintf(stderr, "lsm_work() done...\n");

    }


    iData += (nRecOn3*2);
    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);

    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);

    /* If Datatest4.bReopen is true, close and reopen the database */
    if( p->bReopen ){
      testReopen(&pDb, &rc);
      if( rc==0 ) db = tdb_lsm(pDb);
    }


    /* Update the progress dots... */
    testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot);
  }

  testClose(&pDb);
  testClose(&pControl);







>





>


>











>







61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);

    if( db ){
      int nDone;
      fprintf(stderr, "lsm_work() start...\n");
      fflush(stderr);
      do {
        nDone = 0;
        rc = lsm_work(db, 1, (1<<30), &nDone);
      }while( rc==0 && nDone>0 );
      fprintf(stderr, "lsm_work() done...\n");
      fflush(stderr);
    }

if( i+1<p->nRepeat ){
    iData += (nRecOn3*2);
    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);

    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);

    /* If Datatest4.bReopen is true, close and reopen the database */
    if( p->bReopen ){
      testReopen(&pDb, &rc);
      if( rc==0 ) db = tdb_lsm(pDb);
    }
}

    /* Update the progress dots... */
    testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot);
  }

  testClose(&pDb);
  testClose(&pControl);
Changes to lsm-test/lsmtest_tdb3.c.
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 autoflush=16384 "
    "max_freelist=2 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 







|







986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 autoflush=16384 "
    "max_freelist=4 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 
Changes to src/lsm_file.c.
1447
1448
1449
1450
1451
1452
1453




1454
1455
1456





1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
      iRet = iPg;
    }
  }
  return iRet;
}

#ifndef NDEBUG




static int fsPageRedirects(FileSystem *pFS, Segment *p, Pgno iPg){
  return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
}





static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
  return (
      fsPageRedirects(pFS, p, p->iFirst)
   || fsPageRedirects(pFS, p, p->iRoot)
   || fsPageRedirects(pFS, p, p->iLastPg)
  );
}
#endif

/*
** Argument aPgno is an array of nPgno page numbers. All pages belong to
** the segment pRun. This function gobbles from the start of the run to the
** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is







>
>
>
>



>
>
>
>
>

|



|







1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
      iRet = iPg;
    }
  }
  return iRet;
}

#ifndef NDEBUG
/*
** Return true if page iPg, which is a part of segment p, lies on
** a redirected block. 
*/
static int fsPageRedirects(FileSystem *pFS, Segment *p, Pgno iPg){
  return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
}

/*
** Return true if the second argument is not NULL and any of the first
** last or root pages lie on a redirected block. 
*/
static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
  return (p && (
      fsPageRedirects(pFS, p, p->iFirst)
   || fsPageRedirects(pFS, p, p->iRoot)
   || fsPageRedirects(pFS, p, p->iLastPg)
  ));
}
#endif

/*
** Argument aPgno is an array of nPgno page numbers. All pages belong to
** the segment pRun. This function gobbles from the start of the run to the
** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
2641
2642
2643
2644
2645
2646
2647
2648

2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668








2669
2670
2671
2672
2673
2674
2675
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      Redirect *pRedir = pSeg->pRedirect;
      int rc;
      int iBlk;                   /* Current block (during iteration) */
      int iLastBlk;               /* Last real block of segment */

      int bLastIsLastOnBlock;     /* True iLast is the last on its block */

      assert( 0==fsSegmentRedirects(pFS, pSeg) );
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
      assert( iBlk>0 );

      /* If the first page of this run is also the first page of its first
      ** block, set the flag to indicate that the first page of iBlk is 
      ** in use.  */
      if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst ){
        assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
        aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
      }

      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;









        /* Unless the sorted run finishes before the last page on this block, 
        ** the last page of this block is also in use.  */
        if( iBlk!=iLastBlk || bLastIsLastOnBlock ){
          assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 );
          aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG;
        }







|
>



|





<
<
<
<
<
<
<




>
>
>
>
>
>
>
>







2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667







2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      Redirect *pRedir = pSeg->pRedirect;
      int rc;
      int iBlk;                   /* Current block (during iteration) */
      int iLastBlk;               /* Last block of segment */
      int iFirstBlk;              /* First block of segment */
      int bLastIsLastOnBlock;     /* True iLast is the last on its block */

      assert( 0==fsSegmentRedirects(pFS, pSeg) );
      iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
      assert( iBlk>0 );









      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;

        /* If the first page of this block is also part of the segment,
        ** set the flag to indicate that the first page of iBlk is in use.  
        */
        if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst || iBlk!=iFirstBlk ){
          assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
          aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
        }

        /* Unless the sorted run finishes before the last page on this block, 
        ** the last page of this block is also in use.  */
        if( iBlk!=iLastBlk || bLastIsLastOnBlock ){
          assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 );
          aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG;
        }
Changes to src/lsm_shared.c.
161
162
163
164
165
166
167
168

169
170
171
172
173
174
175
  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
  if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
  p->nBlock--;
  return 0;
}

static int dbTruncate(lsm_db *pDb, i64 iInUse){
  int rc;

  int i;
  DbTruncateCtx ctx;

  assert( pDb->pWorker );
  ctx.nBlock = pDb->pWorker->nBlock;
  ctx.iInUse = iInUse;








|
>







161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
  if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
  p->nBlock--;
  return 0;
}

static int dbTruncate(lsm_db *pDb, i64 iInUse){
  int rc = LSM_OK;
#if 0
  int i;
  DbTruncateCtx ctx;

  assert( pDb->pWorker );
  ctx.nBlock = pDb->pWorker->nBlock;
  ctx.iInUse = iInUse;

184
185
186
187
188
189
190

191
192
193
194
195
196
197
      lsmLogMessage(pDb, 0, 
          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
      );
    }
#endif
    pDb->pWorker->nBlock = ctx.nBlock;
  }

  return rc;
}


/*
** This function is called during database shutdown (when the number of
** connections drops from one to zero). It truncates the database file







>







185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
      lsmLogMessage(pDb, 0, 
          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
      );
    }
#endif
    pDb->pWorker->nBlock = ctx.nBlock;
  }
#endif
  return rc;
}


/*
** This function is called during database shutdown (when the number of
** connections drops from one to zero). It truncates the database file
Changes to src/lsm_sorted.c.
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
  return rc;
}


static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc;

  assert( pCsr->aTree==0 );

  rc = multiCursorAllocTree(pCsr);
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bRev);
    }
  }







<
<







3835
3836
3837
3838
3839
3840
3841


3842
3843
3844
3845
3846
3847
3848
  return rc;
}


static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc;



  rc = multiCursorAllocTree(pCsr);
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bRev);
    }
  }
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052

  iVal = pCsr->aTree[1];
  mergeRangeDeletes(pCsr, &iVal, &eType);

  if( eType!=0 ){
    if( pMW->aGobble ){
      int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
      if( iGobble<pCsr->nPtr ){
        SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
        if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
          pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
        }
      }
    }








|







4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050

  iVal = pCsr->aTree[1];
  mergeRangeDeletes(pCsr, &iVal, &eType);

  if( eType!=0 ){
    if( pMW->aGobble ){
      int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
      if( iGobble<pCsr->nPtr && iGobble>=0 ){
        SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
        if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
          pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
        }
      }
    }

4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);

  /* If the cursor is at EOF, the merge is finished. Release all page
  ** references currently held by the merge worker and inform the 
  ** FileSystem object that no further pages will be appended to either 
  ** the main or separators array. 
  */
  if( rc==LSM_OK && !lsmMCursorValid(pMW->pCsr) ){

    mergeWorkerShutdown(pMW, &rc);
    if( pSeg->iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, pSeg);
    }

#ifdef LSM_DEBUG_EXPENSIVE
    if( rc==LSM_OK ){
#if 0
      rc = assertBtreeOk(pDb, pSeg);
      if( pMW->pCsr->pBtCsr ){
        Segment *pNext = &pMW->pLevel->pNext->lhs;
        rc = assertPointersOk(pDb, pSeg, pNext, 0);
      }
#endif
    }
#endif
  }
  return rc;
}

static int mergeWorkerDone(MergeWorker *pMW){
  return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr);
}








<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







4066
4067
4068
4069
4070
4071
4072
























4073
4074
4075
4076
4077
4078
4079
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);

























  return rc;
}

static int mergeWorkerDone(MergeWorker *pMW){
  return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr);
}

4199
4200
4201
4202
4203
4204
4205



4206
4207
4208
4209
4210
4211
4212
4213
4214
4215

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }
    assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );




    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->flags &= ~LEVEL_INCOMPLETE;
    if( eTree==TREE_NONE ){
      pNew->flags |= LEVEL_FREELIST_ONLY;
    }
    pNew->pMerge = 0;
  }








>
>
>
|

<







4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184

4185
4186
4187
4188
4189
4190
4191

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }
    assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
    mergeWorkerShutdown(&mergeworker, &rc);
    if( rc==LSM_OK && pNew->lhs.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
    }
    nWrite = mergeworker.nWork;

    pNew->flags &= ~LEVEL_INCOMPLETE;
    if( eTree==TREE_NONE ){
      pNew->flags |= LEVEL_FREELIST_ONLY;
    }
    pNew->pMerge = 0;
  }

4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
4233
4234
4235
4236
4237
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));







|







4199
4200
4201
4202
4203
4204
4205
4206
4207
4208
4209
4210
4211
4212
4213
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if 1
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }

#if 0
  if( rc==LSM_OK ){
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "move-block");
  }
#endif
  return rc;
}








|







4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }

#if 1
  if( rc==LSM_OK ){
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "move-block");
  }
#endif
  return rc;
}

4678
4679
4680
4681
4682
4683
4684


4685
4686
4687


4688
4689
4690

4691
4692
4693
4694

4695
4696















4697
4698
4699

4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717




4718



















4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761






4762
4763
4764
4765
4766
4767
4768

4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
        rc = sortedMoveBlock(pDb, &nDone);
      }
      nRemaining -= nDone;

      /* Could not find any work to do. Finished. */
      if( nDone==0 ) break;
    }else{


      MergeWorker mergeworker;    /* State used to work on the level merge */

      assert( pDb->bIncrMerge==0 );


      pDb->bIncrMerge = 1;
      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
      assert( mergeworker.nWork==0 );

      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){

        rc = mergeWorkerStep(&mergeworker);
      }















      nRemaining -= LSM_MAX(mergeworker.nWork, 1);
      pDb->bIncrMerge = 0;


      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
      */
      if( rc==LSM_OK ){
        if( mergeWorkerDone(&mergeworker)==0 ){
          int i;
          for(i=0; i<pLevel->nRight; i++){
            SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }




        }else if( pLevel->lhs.iFirst==0 ){



















          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 

          Level *pTop;          /* Top level of worker snapshot */
          Level **pp;           /* Read/write iterator for Level.pNext list */
          int i;
          assert( pLevel->pNext==0 );

          /* Remove the level from the worker snapshot. */
          pTop = lsmDbSnapshotLevel(pWorker);
          for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
          *pp = pLevel->pNext;
          lsmDbSnapshotSetLevel(pWorker, pTop);

          /* Free the Level structure. */
          lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs);
          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
          }
          sortedFreeLevel(pDb->pEnv, pLevel);
        }else{
          int i;

          /* Free the separators of the next level, if required. */
          if( pLevel->pMerge->nInput > pLevel->nRight ){
            assert( pLevel->pNext->lhs.iRoot );
            pLevel->pNext->lhs.iRoot = 0;
          }

          /* Free the right-hand-side of pLevel */
          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
          }
          lsmFree(pDb->pEnv, pLevel->aRhs);
          pLevel->nRight = 0;
          pLevel->aRhs = 0;

          /* Free the Merge object */
          lsmFree(pDb->pEnv, pLevel->pMerge);
          pLevel->pMerge = 0;






        }
      }

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);

      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than







>
>



>
>



>


|

>

|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

<

>
|
<
<
|
|
|
<
<










>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
<
|
|
|
|

|
|
|
|
|

|
<
<
<
<
|
|
<

|
|
|
|
|

|
<
<
<
|
|
|

|
|
|
>
>
>
>
>
>







>


|







4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694

4695
4696
4697


4698
4699
4700


4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739

4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751




4752
4753

4754
4755
4756
4757
4758
4759
4760
4761



4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
        rc = sortedMoveBlock(pDb, &nDone);
      }
      nRemaining -= nDone;

      /* Could not find any work to do. Finished. */
      if( nDone==0 ) break;
    }else{
      int bSave = 0;
      Freelist freelist = {0, 0, 0};
      MergeWorker mergeworker;    /* State used to work on the level merge */

      assert( pDb->bIncrMerge==0 );
      assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );

      pDb->bIncrMerge = 1;
      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
      assert( mergeworker.nWork==0 );
      
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
      ){
        int eType = rtTopic(mergeworker.pCsr->eType);
        rc = mergeWorkerStep(&mergeworker);

        if( rc==LSM_OK 
         && nMerge==1 
         && mergeworker.pLevel==pWorker->pLevel
         && eType==0
         && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
        ){
          assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
          rc = multiCursorVisitFreelist(mergeworker.pCsr);
          if( rc==LSM_OK ){
            rc = multiCursorSetupTree(mergeworker.pCsr, 0);
            pDb->pFreelist = &freelist;
            pDb->bUseFreelist = 1;
          }
        }
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);


      if( rc==LSM_OK ){
        /* Check if the merge operation is completely finished. If not,


        ** gobble up (declare eligible for recycling) any pages from rhs
        ** segments for which the content has been completely merged into 
        ** the lhs of the level.  */


        if( mergeWorkerDone(&mergeworker)==0 ){
          int i;
          for(i=0; i<pLevel->nRight; i++){
            SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }
        }else{
          int i;
          int bEmpty;
          mergeWorkerShutdown(&mergeworker, &rc);
          bEmpty = (pLevel->lhs.iFirst==0);

          if( bEmpty==0 && rc==LSM_OK ){
            rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
          }

          if( pDb->bUseFreelist ){
            Freelist *p = &pDb->pWorker->freelist;
            lsmFree(pDb->pEnv, p->aEntry);
            memcpy(p, &freelist, sizeof(freelist));
            pDb->bUseFreelist = 0;
            pDb->pFreelist = 0;
            bSave = 1;
          }

          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
          }

          if( bEmpty ){
            /* If the new level is completely empty, remove it from the 
            ** database snapshot. This can only happen if all input keys were
            ** annihilated. Since keys are only annihilated if the new level
            ** is the last in the linked list (contains the most ancient of
            ** database content), this guarantees that pLevel->pNext==0.  */ 

            Level *pTop;          /* Top level of worker snapshot */
            Level **pp;           /* Read/write iterator for Level.pNext list */

            assert( pLevel->pNext==0 );

            /* Remove the level from the worker snapshot. */
            pTop = lsmDbSnapshotLevel(pWorker);
            for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
            *pp = pLevel->pNext;
            lsmDbSnapshotSetLevel(pWorker, pTop);

            /* Free the Level structure. */




            sortedFreeLevel(pDb->pEnv, pLevel);
          }else{


            /* Free the separators of the next level, if required. */
            if( pLevel->pMerge->nInput > pLevel->nRight ){
              assert( pLevel->pNext->lhs.iRoot );
              pLevel->pNext->lhs.iRoot = 0;
            }

            /* Zero the right-hand-side of pLevel */



            lsmFree(pDb->pEnv, pLevel->aRhs);
            pLevel->nRight = 0;
            pLevel->aRhs = 0;

            /* Free the Merge object */
            lsmFree(pDb->pEnv, pLevel->pMerge);
            pLevel->pMerge = 0;
          }

          if( bSave && rc==LSM_OK ){
            pDb->bIncrMerge = 0;
            rc = lsmSaveWorker(pDb, 0);
          }
        }
      }

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      pDb->bIncrMerge = 0;
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 1
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
5560
5561
5562
5563
5564
5565
5566
5567
5568
5569
5570
5571
5572
5573
5574
    static int nCall = 0;
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

    /* if( nCall>639 ) bKeys = 1; */

    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  







|







5574
5575
5576
5577
5578
5579
5580
5581
5582
5583
5584
5585
5586
5587
5588
    static int nCall = 0;
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

    if( nCall==275 || nCall==276 ) bKeys = 1;

    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  
5631
5632
5633
5634
5635
5636
5637


5638
5639
5640
5641
5642
5643
5644
        sortedDumpSegment(pDb, &pLevel->lhs, bVals);
        for(i=0; i<pLevel->nRight; i++){
          sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
        }
      }
    }
  }


}

void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pNext;
  Level *p;

  for(p=pLevel; p; p=pNext){







>
>







5645
5646
5647
5648
5649
5650
5651
5652
5653
5654
5655
5656
5657
5658
5659
5660
        sortedDumpSegment(pDb, &pLevel->lhs, bVals);
        for(i=0; i<pLevel->nRight; i++){
          sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
        }
      }
    }
  }

  assert( lsmFsIntegrityCheck(pDb) );
}

void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pNext;
  Level *p;

  for(p=pLevel; p; p=pNext){