Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Keep the contents of Segment structures up to date at all times, so that none of the page numbers contained within are subject to redirection.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: a89abc21170fc3926c7d6071cbe7b0f2cf0f5b5f
User & Date: dan 2013-01-23 18:13:49.601
Context
2013-01-24
16:45
Improve the integrity-check assert that ensures all blocks in the file are accounted for. Roll any in-memory free-list records into the end of the main segment when executing lsm_work(nmerge=1). check-in: e5edba9caa user: dan tags: block-redirects
2013-01-23
18:13
Keep the contents of Segment structures up to date at all times, so that none of the page numbers contained within are subject to redirection. check-in: a89abc2117 user: dan tags: block-redirects
2013-01-22
20:07
Several block-redirect related bugfixes. check-in: a56a334333 user: dan tags: block-redirects
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_func.c.
42
43
44
45
46
47
48





49
50
51
52
53
54
55
  if( rc!=LSM_OK ){
    testPrintError("lsm_open(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }else{





      rc = lsm_work(pDb, nMerge, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }
  if( rc==LSM_OK ){







>
>
>
>
>







42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
  if( rc!=LSM_OK ){
    testPrintError("lsm_open(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }else{
      int n = -1;
      lsm_config(pDb, LSM_CONFIG_BLOCK_SIZE, &n);
      n = n*2;
      lsm_config(pDb, LSM_CONFIG_AUTOCHECKPOINT, &n);

      rc = lsm_work(pDb, nMerge, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }
  if( rc==LSM_OK ){
Changes to src/lsm_file.c.
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
  Page *p;
  int iHash;
  int rc = LSM_OK;

  /* In most cases iReal is the same as iPg. Except, if pRedirect is not
  ** NULL, and the block containing iPg has been redirected, then iReal
  ** is the page number after redirection.  */
  Pgno iReal = iPg;
  if( pRedirect ){
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    int iBlk = fsPageToBlock(pFS, iPg);
    int i;
    for(i=0; i<pRedirect->n; i++){
      int iFrom = pRedirect->a[i].iFrom;
      if( iFrom>iBlk ) break;
      if( iFrom==iBlk ){
        int iTo = pRedirect->a[i].iTo;
        iReal = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock;
        if( iTo==1 ){
          iReal += (fsFirstPageOnBlock(pFS, 1)-1);
        }
        break;
      }
    }
  }

  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
  assert( iReal>=fsFirstPageOnBlock(pFS, 1) );
  *ppPg = 0;

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  if( pFS->bUseMmap ){







|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







1185
1186
1187
1188
1189
1190
1191
1192

















1193
1194
1195
1196
1197
1198
1199
  Page *p;
  int iHash;
  int rc = LSM_OK;

  /* In most cases iReal is the same as iPg. Except, if pRedirect is not
  ** NULL, and the block containing iPg has been redirected, then iReal
  ** is the page number after redirection.  */
  Pgno iReal = lsmFsRedirectPage(pFS, pRedirect, iPg);


















  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
  assert( iReal>=fsFirstPageOnBlock(pFS, 1) );
  *ppPg = 0;

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  if( pFS->bUseMmap ){
1463
1464
1465
1466
1467
1468
1469













1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487



1488
1489
1490
1491
1492
1493
1494
    if( fsPageToBlock(pFS, iPg)==iBlk && (iRet==0 || iPg<iRet) ){
      iRet = iPg;
    }
  }
  return iRet;
}














/*
** Argument aPgno is an array of nPgno page numbers. All pages belong to
** the segment pRun. This function gobbles from the start of the run to the
** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
** the new first page of the run).
*/
void lsmFsGobble(
  lsm_db *pDb,
  Segment *pRun, 
  Pgno *aPgno,
  int nPgno
){
  int rc = LSM_OK;
  FileSystem *pFS = pDb->pFS;
  Snapshot *pSnapshot = pDb->pWorker;
  int iBlk;

  assert( pRun->nSize>0 );



  iBlk = fsPageToBlock(pFS, pRun->iFirst);
  pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));

  while( rc==LSM_OK ){
    int iNext = 0;
    Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
    if( iFirst ){







>
>
>
>
>
>
>
>
>
>
>
>
>


















>
>
>







1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
    if( fsPageToBlock(pFS, iPg)==iBlk && (iRet==0 || iPg<iRet) ){
      iRet = iPg;
    }
  }
  return iRet;
}

#ifndef NDEBUG
static int fsPageRedirects(FileSystem *pFS, Segment *p, Pgno iPg){
  return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
}
static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
  return (
      fsPageRedirects(pFS, p, p->iFirst)
   || fsPageRedirects(pFS, p, p->iRoot)
   || fsPageRedirects(pFS, p, p->iLastPg)
  );
}
#endif

/*
** Argument aPgno is an array of nPgno page numbers. All pages belong to
** the segment pRun. This function gobbles from the start of the run to the
** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
** the new first page of the run).
*/
void lsmFsGobble(
  lsm_db *pDb,
  Segment *pRun, 
  Pgno *aPgno,
  int nPgno
){
  int rc = LSM_OK;
  FileSystem *pFS = pDb->pFS;
  Snapshot *pSnapshot = pDb->pWorker;
  int iBlk;

  assert( pRun->nSize>0 );
  assert( 0==fsSegmentRedirects(pFS, pRun) );
  assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );

  iBlk = fsPageToBlock(pFS, pRun->iFirst);
  pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));

  while( rc==LSM_OK ){
    int iNext = 0;
    Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
    if( iFirst ){
1575
1576
1577
1578
1579
1580
1581

1582
1583
1584
1585
1586
1587
1588
** caller using lsmFsPageRelease().
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  int rc = LSM_OK;
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;


  if( pFS->pCompress ){
    int nSpace = pPg->nCompress + 2*3;

    /* TODO: Fix redirects */
    assert( 0 );

    do {







>







1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
** caller using lsmFsPageRelease().
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  int rc = LSM_OK;
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  assert( 0==fsSegmentRedirects(pFS, pRun) );
  if( pFS->pCompress ){
    int nSpace = pPg->nCompress + 2*3;

    /* TODO: Fix redirects */
    assert( 0 );

    do {
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
        assert( pPg->flags & PAGE_HASPREV );
        iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
      }else{
        iPg--;
      }
    }else{
      if( pRun ){
        Pgno iLast = lsmFsRedirectPage(pFS, pRedir, pRun->iLastPg);
        if( iPg==iLast ){
          *ppNext = 0;
          return LSM_OK;
        }
      }

      if( fsIsLast(pFS, iPg) ){
        int iBlk = fsRedirectBlock(







<
|







1616
1617
1618
1619
1620
1621
1622

1623
1624
1625
1626
1627
1628
1629
1630
        assert( pPg->flags & PAGE_HASPREV );
        iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
      }else{
        iPg--;
      }
    }else{
      if( pRun ){

        if( iPg==pRun->iLastPg ){
          *ppNext = 0;
          return LSM_OK;
        }
      }

      if( fsIsLast(pFS, iPg) ){
        int iBlk = fsRedirectBlock(
1920
1921
1922
1923
1924
1925
1926














1927
1928
1929
1930
1931
1932

1933
1934
1935
1936



1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949


1950











1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963





1964
1965
1966
1967
1968
1969
1970

/*
** Return true if page is currently writable.
*/
int lsmFsPageWritable(Page *pPg){
  return (pPg->flags & PAGE_DIRTY) ? 1 : 0;
}















/*
** Copy the contents of block iFrom to block iTo. 
**
** It is safe to assume that there are no outstanding references to pages 
** on block iTo. And that block iFrom is not currently being written.

*/
int lsmFsMoveBlock(FileSystem *pFS, int iTo, int iFrom){
  Snapshot *p = pFS->pDb->pWorker;
  int rc = LSM_OK;



  
  assert( iTo!=1 );
  assert( iFrom>iTo );

  if( pFS->bUseMmap ){
    fsGrowMapping(pFS, (i64)iFrom * pFS->nBlocksize, &rc);
    if( rc==LSM_OK ){
      u8 *aMap = (u8 *)(pFS->pMap);
      i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize;
      i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize;
      memcpy(&aMap[iToOff], &aMap[iFromOff], pFS->nBlocksize);
    }
  }else{


    assert( 0 );











  }

  /* Update append-point list if necessary */
  if( rc==LSM_OK ){
    int i;
    for(i=0; i<LSM_APPLIST_SZ; i++){
      if( fsPageToBlock(pFS, p->aiAppend[i])==iFrom ){
        const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
        p->aiAppend[i] -= (i64)(iFrom-iTo) * nPagePerBlock;
      }
    }
  }






  return rc;
}

/*
** Append raw data to a segment. This function is only used in compressed
** database mode.
*/







>
>
>
>
>
>
>
>
>
>
>
>
>
>





|
>

|


>
>
>








<
<



>
>
|
>
>
>
>
>
>
>
>
>
>
>













>
>
>
>
>







1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961


1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003

/*
** Return true if page is currently writable.
*/
int lsmFsPageWritable(Page *pPg){
  return (pPg->flags & PAGE_DIRTY) ? 1 : 0;
}

static void fsMovePage(
  FileSystem *pFS, 
  Segment *pSeg, 
  int iTo, 
  int iFrom, 
  Pgno *piPg
){
  Pgno iPg = *piPg;
  if( iFrom==fsPageToBlock(pFS, iPg) ){
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    *piPg = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock;
  }
}

/*
** Copy the contents of block iFrom to block iTo. 
**
** It is safe to assume that there are no outstanding references to pages 
** on block iTo. And that block iFrom is not currently being written. In
** other words, the data can be read and written directly.
*/
int lsmFsMoveBlock(FileSystem *pFS, Segment *pSeg, int iTo, int iFrom){
  Snapshot *p = pFS->pDb->pWorker;
  int rc = LSM_OK;

  i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize;
  i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize;
  
  assert( iTo!=1 );
  assert( iFrom>iTo );

  if( pFS->bUseMmap ){
    fsGrowMapping(pFS, (i64)iFrom * pFS->nBlocksize, &rc);
    if( rc==LSM_OK ){
      u8 *aMap = (u8 *)(pFS->pMap);


      memcpy(&aMap[iToOff], &aMap[iFromOff], pFS->nBlocksize);
    }
  }else{
    int nSz = pFS->nPagesize;
    u8 *aData = (u8 *)lsmMallocRc(pFS->pEnv, nSz, &rc);
    if( rc==LSM_OK ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
      int i;
      for(i=0; rc==LSM_OK && i<nPagePerBlock; i++){
        i64 iOff = iFromOff + i*nSz;
        rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
        if( rc==LSM_OK ){
          iOff = iToOff + i*nSz;
          rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
        }
      }
    }
  }

  /* Update append-point list if necessary */
  if( rc==LSM_OK ){
    int i;
    for(i=0; i<LSM_APPLIST_SZ; i++){
      if( fsPageToBlock(pFS, p->aiAppend[i])==iFrom ){
        const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
        p->aiAppend[i] -= (i64)(iFrom-iTo) * nPagePerBlock;
      }
    }
  }

  /* Update the Segment structure itself */
  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iFirst);
  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iLastPg);
  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iRoot);

  return rc;
}

/*
** Append raw data to a segment. This function is only used in compressed
** database mode.
*/
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619

2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      Redirect *pRedir = pSeg->pRedirect;
      int rc;
      Pgno iLast;                 /* Last page of segment (post-redirection) */
      Pgno iFirst;                /* First page of segment (post-redirection) */
      int iBlk;                   /* Current block (during iteration) */
      int iLastBlk;               /* Last real block of segment */
      int bLastIsLastOnBlock;     /* True iLast is the last on its block */


      iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst));
      iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg));
      iLast = lsmFsRedirectPage(pFS, pRedir, pSeg->iLastPg);
      iFirst = lsmFsRedirectPage(pFS, pRedir, pSeg->iFirst);

      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast);
      assert( iBlk>0 );

      /* If the first page of this run is also the first page of its first
      ** block, set the flag to indicate that the first page of iBlk is 
      ** in use.  */
      if( fsFirstPageOnBlock(pFS, iBlk)==iFirst ){
        assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
        aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
      }

      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;







<
<




>
|
|
<
<

|





|







2640
2641
2642
2643
2644
2645
2646


2647
2648
2649
2650
2651
2652
2653


2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      Redirect *pRedir = pSeg->pRedirect;
      int rc;


      int iBlk;                   /* Current block (during iteration) */
      int iLastBlk;               /* Last real block of segment */
      int bLastIsLastOnBlock;     /* True iLast is the last on its block */

      assert( 0==fsSegmentRedirects(pFS, pSeg) );
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);



      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
      assert( iBlk>0 );

      /* If the first page of this run is also the first page of its first
      ** block, set the flag to indicate that the first page of iBlk is 
      ** in use.  */
      if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst ){
        assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
        aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
      }

      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;
Changes to src/lsm_sorted.c.
2252
2253
2254
2255
2256
2257
2258




















2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
  for(i=0; i<nRhs; i++){
    pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
    pCsr->aPtr[i].pLevel = pLvl;
  }

  return LSM_OK;
}





















static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  Level *pLvl;
  int nPtr = 0;
  int iPtr = 0;
  int rc = LSM_OK;

  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
    /* If the LEVEL_INCOMPLETE flag is set, then this function is being
    ** called (indirectly) from within a sortedNewToplevel() call to
    ** construct pLvl. In this case ignore pLvl - this cursor is going to
    ** be used to retrieve a freelist entry from the LSM, and the partially
    ** complete level may confuse it.  */
    if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
    nPtr += (1 + pLvl->nRight);
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
  if( rc==LSM_OK ) pCsr->nPtr = nPtr;

  for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){
    int i;
    if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
    pCsr->aPtr[iPtr].pLevel = pLvl;
    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
    iPtr++;
    for(i=0; i<pLvl->nRight; i++){
      pCsr->aPtr[iPtr].pLevel = pLvl;
      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
      iPtr++;
    }

    if( pLvl->nRight && pLvl->pSplitKey==0 ){
      sortedSplitkey(pCsr->pDb, pLvl, &rc);
    }
  }

  return rc;
}

static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



















<

|
<
|
<
<
<
<
<
<
<
<
<
<
|







2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297

2298
2299

2300










2301
2302
2303
2304
2305
2306
2307
2308
  for(i=0; i<nRhs; i++){
    pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
    pCsr->aPtr[i].pLevel = pLvl;
  }

  return LSM_OK;
}

static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
  if( *pRc==LSM_OK ){
    int iPtr = pCsr->nPtr;
    int i;
    pCsr->aPtr[iPtr].pLevel = pLvl;
    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
    iPtr++;
    for(i=0; i<pLvl->nRight; i++){
      pCsr->aPtr[iPtr].pLevel = pLvl;
      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
      iPtr++;
    }

    if( pLvl->nRight && pLvl->pSplitKey==0 ){
      sortedSplitkey(pCsr->pDb, pLvl, pRc);
    }
    pCsr->nPtr = iPtr;
  }
}

static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  Level *pLvl;
  int nPtr = 0;
  int iPtr = 0;
  int rc = LSM_OK;

  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
    /* If the LEVEL_INCOMPLETE flag is set, then this function is being
    ** called (indirectly) from within a sortedNewToplevel() call to
    ** construct pLvl. In this case ignore pLvl - this cursor is going to
    ** be used to retrieve a freelist entry from the LSM, and the partially
    ** complete level may confuse it.  */
    if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
    nPtr += (1 + pLvl->nRight);
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);


  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){

    if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){










      multiCursorAddOne(pCsr, pLvl, &rc);
    }
  }

  return rc;
}

static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
4115
4116
4117
4118
4119
4120
4121
4122

4123
4124
4125
4126
4127
4128
4129
  int eTree,                      /* One of the TREE_XXX constants */
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */

  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;

  if( eTree!=TREE_NONE ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }








|
>







4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
  int eTree,                      /* One of the TREE_XXX constants */
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pLinked = 0;           /* Delete separators from this segment */
  Level *pDel = 0;                /* Delete this entire level */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;

  if( eTree!=TREE_NONE ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }

4146
4147
4148
4149
4150
4151
4152
4153
4154
4155

4156


4157
4158

4159
4160
4161
4162
4163
4164
4165
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK 
        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
        && (eTree!=TREE_NONE || (pNext->flags & LEVEL_FREELIST_ONLY))

    ){


      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);

    }

    /* If this will be the only segment in the database, discard any delete
    ** markers present in the in-memory tree.  */
    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }







|
<
|
>
|
>
>
|
|
>







4155
4156
4157
4158
4159
4160
4161
4162

4163
4164
4165
4166
4167
4168
4169
4170
4171
4172
4173
4174
4175
4176
4177
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK && pNext && pNext->pMerge==0 ){

      if( (pNext->flags & LEVEL_FREELIST_ONLY) ){
        pDel = pNext;
        pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
        multiCursorAddOne(pCsr, pNext, &rc);
      }else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){
        pLinked = &pNext->lhs;
        rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
      }
    }

    /* If this will be the only segment in the database, discard any delete
    ** markers present in the in-memory tree.  */
    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
4202
4203
4204
4205
4206
4207
4208

4209






4210
4211
4212
4213
4214
4215
4216
  }

  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{

    if( pDel ) pDel->iRoot = 0;







#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;







>
|
>
>
>
>
>
>







4214
4215
4216
4217
4218
4219
4220
4221
4222
4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
4233
4234
4235
  }

  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{
    if( pLinked ){
      pLinked->iRoot = 0;
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
4474
4475
4476
4477
4478
4479
4480
4481
4482
4483
4484
4485
4486
4487
4488
4489
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
      nThis++;
    }else{
      if( nThis>nBest ){
        if( (pLevel->iAge!=pThis->iAge+1)
            || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
          ){
          pBest = pThis;
          nBest = nThis;
        }
      }
      if( pLevel->nRight ){
        if( pLevel->nRight>nBest ){
          nBest = pLevel->nRight;







|
|







4493
4494
4495
4496
4497
4498
4499
4500
4501
4502
4503
4504
4505
4506
4507
4508
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
      nThis++;
    }else{
      if( nThis>nBest ){
        if( (pLevel->iAge!=pThis->iAge+1)
         || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
        ){
          pBest = pThis;
          nBest = nThis;
        }
      }
      if( pLevel->nRight ){
        if( pLevel->nRight>nBest ){
          nBest = pLevel->nRight;
4598
4599
4600
4601
4602
4603
4604
4605
4606
4607
4608
4609
4610
4611
4612

  /* Find the first free block in the database, ignoring block 1. Block
  ** 1 is tricky as it is smaller than the other blocks.  */
  rc = lsmBlockAllocate(pDb, iFrom, &iTo);
  if( rc!=LSM_OK || iTo==0 ) return rc;
  assert( iTo!=1 && iTo<iFrom );

  rc = lsmFsMoveBlock(pDb->pFS, iTo, iFrom);
  if( rc==LSM_OK ){
    if( p->redirect.a==0 ){
      int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
      p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
    }
    if( rc==LSM_OK ){
      memmove(&p->redirect.a[1], &p->redirect.a[0], 







|







4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631

  /* Find the first free block in the database, ignoring block 1. Block
  ** 1 is tricky as it is smaller than the other blocks.  */
  rc = lsmBlockAllocate(pDb, iFrom, &iTo);
  if( rc!=LSM_OK || iTo==0 ) return rc;
  assert( iTo!=1 && iTo<iFrom );

  rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
  if( rc==LSM_OK ){
    if( p->redirect.a==0 ){
      int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
      p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
    }
    if( rc==LSM_OK ){
      memmove(&p->redirect.a[1], &p->redirect.a[0], 
4879
4880
4881
4882
4883
4884
4885


4886
4887
4888
4889
4890
4891
4892
){
  Snapshot *pWorker;              /* Worker snapshot */
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;



  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;
  pWorker = pDb->pWorker;







>
>







4898
4899
4900
4901
4902
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913
){
  Snapshot *pWorker;              /* Worker snapshot */
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;

  assert( nPage>0 );

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;
  pWorker = pDb->pWorker;
4979
4980
4981
4982
4983
4984
4985
4986
4987
4988
4989
4990
4991
4992
4993

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
    *pnWrite = 0;
    *pbCkpt = 0;
  }
  assert( pDb->pWorker==0 );
  return rc;
}

static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  int rc;







<







5000
5001
5002
5003
5004
5005
5006

5007
5008
5009
5010
5011
5012
5013

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
    *pnWrite = 0;

  }
  assert( pDb->pWorker==0 );
  return rc;
}

static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  int rc;