SQLite4
Check-in [90c0d5d8fe]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Scan the embedded b-trees instead of the separator runs when merging in separator keys from a lower level.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: 90c0d5d8fe6b73f0e48235c47a0dd261458ae75e
User & Date: dan 2012-06-23 20:22:36
Context
2012-06-25
10:02
Fix various glitches to do with iterating through b-tree structures. check-in: a983d57868 user: dan tags: embedded-btree
2012-06-23
20:22
Scan the embedded b-trees instead of the separator runs when merging in separator keys from a lower level. check-in: 90c0d5d8fe user: dan tags: embedded-btree
10:04
Add untested functions to save and restore b-tree cursor positions. check-in: cc0b45d17e user: dan tags: embedded-btree
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
      *pRc = rc;
    }
  }
  return pDb;
}

/*
** Re-open the handle *ppDb: look up its library name with
** tdb_library_name(), hand ppDb to testClose() and then call tdb_open()
** with that name (and 0 for the two middle arguments). The value returned
** by tdb_open() is stored in *pRc.
**
** Nothing is done if *pRc is already non-zero on entry.
*/
void testReopen(TestDb **ppDb, int *pRc){
  const char *zName;              /* Library name of the current handle */

  if( *pRc!=0 ) return;

  /* The name is fetched before ppDb is passed to testClose() */
  zName = tdb_library_name(*ppDb);
  testClose(ppDb);
  *pRc = tdb_open(zName, 0, 0, ppDb);
}








|







297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
      *pRc = rc;
    }
  }
  return pDb;
}

/*
** Intended to re-open the handle *ppDb: fetch its library name with
** tdb_library_name(), pass ppDb to testClose() and then call tdb_open()
** with that name, storing the result in *pRc.
**
** NOTE(review): the literal "0 &&" at the start of the condition makes it
** always false, so the body below is never executed and this function
** currently does nothing. This looks like a temporary switch-off of the
** reopen step - confirm whether it is meant to stay disabled.
*/
void testReopen(TestDb **ppDb, int *pRc){
  if( 0 && *pRc==0 ){
    const char *zLib;
    zLib = tdb_library_name(*ppDb);
    testClose(ppDb);
    *pRc = tdb_open(zLib, 0, 0, ppDb);
  }
}

Changes to src/lsm_sorted.c.

264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
...
335
336
337
338
339
340
341


342
343
344
345
346
347
348
...
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
...
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
...
754
755
756
757
758
759
760












761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
...
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836


837
838
839
840
841
842
843
844
845
846
847
848
849

850
851
852
853
854
855
856
...
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
...
938
939
940
941
942
943
944

945
946
947

948
949
950
951
952
953
954
....
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
....
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
....
1845
1846
1847
1848
1849
1850
1851



1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864

1865
1866
1867
1868
1869
1870
1871
....
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927









1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960

1961
1962
1963
1964
1965
1966
1967
....
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135




2136
2137
2138
2139
2140
2141
2142
2143
....
2275
2276
2277
2278
2279
2280
2281



2282
2283
2284
2285
2286
2287
2288
2289
2290
....
2310
2311
2312
2313
2314
2315
2316





2317
2318
2319
2320
2321
2322
2323
....
2555
2556
2557
2558
2559
2560
2561



2562
2563
2564
2565
2566
2567
2568
....
3311
3312
3313
3314
3315
3316
3317

3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332




3333
3334
3335
3336
3337
3338
3339
....
3352
3353
3354
3355
3356
3357
3358


3359
3360
3361




3362
3363
3364
3365
3366
3367
3368
3369
3370




3371
3372
3373
3374
3375
3376
3377
....
3395
3396
3397
3398
3399
3400
3401









3402
3403
3404
3405
3406
3407
3408
3409

3410
3411
3412
3413
3414
3415
3416
....
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472







3473
3474
3475
3476
3477
3478
3479
....
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
....
3819
3820
3821
3822
3823
3824
3825





3826
3827
3828
3829
3830
3831
3832
....
4566
4567
4568
4569
4570
4571
4572



























































































4573
4574
4575
4576
4577
4578
4579
....
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
  int *pnHdrLevel;
  void *pSystemVal;
  Snapshot *pSnap;
};

#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1
#define CURSOR_DATA_BTREE     2
#define CURSOR_DATA_SEGMENT   3


/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_NEW_SYSTEM
................................................................................
#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
#else
# define assertAllPointersOk(y, z)
# define assertAllBtreesOk(y, z)
#endif



/*
** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU16(u8 *aOut, u16 nVal){
  aOut[0] = (u8)((nVal>>8) & 0xFF);
  aOut[1] = (u8)(nVal & 0xFF);
................................................................................
** Return the decoded (possibly relative) pointer value stored in cell 
** iCell from page aData/nData.
*/
static int pageGetRecordPtr(u8 *aData, int nData, int iCell){
  u8 *pCell;                      /* Start of cell iCell within the page */
  int iPtr;                       /* Decoded pointer value */

  /* The cell index must be below the number of records on the page */
  assert( iCell<pageGetNRec(aData, nData) );

  /* The varint holding the pointer starts at the second byte of the cell */
  pCell = pageGetCell(aData, nData, iCell);
  lsmVarintGet32(pCell+1, &iPtr);

  return iPtr;
}

static u8 *pageGetKey(
  Page *pPg,                      /* Page to read from */
................................................................................

  }else{
    rc = btreeCursorLoadKey(pCsr);
  }

  if( rc==LSM_OK && pCsr->iPg>=0 ){
    aData = lsmFsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
    pCsr->iPtr = btreeCursorPtr(aData, nData, 0);
  }

  return rc;
}

static void btreeCursorFree(BtreeCursor *pCsr){
  if( pCsr ){
................................................................................
    p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
    p->iCell = ((pCsr->aPg[pCsr->iPg].iCell + 1) << 8) + pCsr->nDepth;
  }else{
    p->iPg = 0;
    p->iCell = 0;
  }
}













static int btreeCursorRestore(
  BtreeCursor *pCsr, 
  int (*xCmp)(void *, int, void *, int),
  MergeInput *p
){
  int rc = LSM_OK;
  if( p->iPg ){
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    int iCell;                    /* Current cell number on leaf page */
    Pgno iPg;                     /* Page number of current leaf page */
    int nDepth;                   /* Depth of b-tree structure */

    /* Decode the MergeInput structure */
    iPg = p->iPg;
    nDepth = (p->iCell & 0x00FF);
    iCell = (p->iCell >> 8) - 1;

    /* Allocate the BtreeCursor.aPg[] array */
    assert( pCsr->aPg==0 );
    pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);

    /* Populate the last entry of the aPg[] array */
    if( rc==LSM_OK ){
      pCsr->iPg = nDepth-1;
      pCsr->nDepth = nDepth;
      pCsr->aPg[pCsr->iPg].iCell = iCell;
      rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pCsr->aPg[nDepth-1].pPage);
    }

    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
................................................................................
          int iCell;

          aData = lsmFsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData)-1; 
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            int iPtr;                     /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKeyT) */

            rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob);
            if( rc!=LSM_OK ) break;

            res = iTopicSeek - iTopic;
            if( res==0 ) res = xCmp(pSeek, nSeek, pKey, nKey);



            if( res<0 ){
              iLoad = iPtr;
              iCell = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

          pCsr->aPg[iPg].pPage = pPg;
          pCsr->aPg[iPg].iCell = iCell;
          iPg++;

        }
      }while( rc==LSM_OK && iPg<(nDepth-1) );
      sortedBlobFree(&blob);
    }

    /* Load the current key and pointer */
    if( rc==LSM_OK ){
................................................................................
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        int dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= SORTED_SEPARATOR;

      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }

  return rc;
}

static int btreeCursorNew(
  lsm_db *pDb,
  SortedRun *pRun,
  BtreeCursor **ppCsr
................................................................................
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );

        rc = btreeCursorNext(pCsr);
      }
    }


    if( pPg ) lsmFsPageRelease(pPg);

    btreeCursorFree(pCsr);
    sortedBlobFree(&blob);
  }

................................................................................
      );
    }
  }

  return rc;
}

/*
** Compare two keys, each described by a topic value plus a key buffer
** and its size. The topics are compared first: if they differ, the
** difference (iLhsTopic - iRhsTopic) is returned and xCmp() is not
** called. Only when the topics are equal is the result of
** xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey) returned.
*/
static int sortedKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsTopic, void *pLhsKey, int nLhsKey,
  int iRhsTopic, void *pRhsKey, int nRhsKey
){
  int iTopicDiff = iLhsTopic - iRhsTopic;
  if( iTopicDiff!=0 ) return iTopicDiff;
  return xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
}

void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
  lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
  int rc = *pRc;
  int i;
  Merge *pMerge = pLevel->pMerge;

  for(i=0; rc==LSM_OK && i<pLevel->nRight; i++){
................................................................................
  iPtrOut = pPtr->iPtr;

  /* Assert that this page is the right page of this segment for the key
  ** that we are searching for. Do this by loading page (iPg-1) and testing
  ** that pKey/nKey is greater than all keys on that page, and then by 
  ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  ** the keys it houses.  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pRun->nSize==1 
       || lsmFsPageNumber(pPtr->pPg)==pPtr->pRun->iLast
  );
................................................................................
  lsmTreeCursorDestroy(pCsr->pTreeCsr);

  /* Close the sorted file cursors */
  for(i=0; i<pCsr->nSegCsr; i++){
    segmentCursorClose(pEnv, &pCsr->aSegCsr[i]);
  }




  /* Free allocations */
  lsmFree(pEnv, pCsr->aSegCsr);
  lsmFree(pEnv, pCsr->aTree);
  lsmFree(pEnv, pCsr->pSystemVal);

  /* Zero fields */
  pCsr->nSegCsr = 0;
  pCsr->aSegCsr = 0;
  pCsr->nTree = 0;
  pCsr->aTree = 0;
  pCsr->pSystemVal = 0;
  pCsr->pSnap = 0;
  pCsr->pTreeCsr = 0;

}

void lsmMCursorClose(MultiCursor *pCsr){
  if( pCsr ){
    lsm_db *pDb = pCsr->pDb;
    MultiCursor **pp;             /* Iterator variable */

................................................................................
*/
int multiCursorAddLevel(
  MultiCursor *pCsr,              /* Multi-cursor to add segment to */ 
  Level *pLevel,                  /* Level to add to multi-cursor merge */
  int eMode                       /* A MULTICURSOR_ADDLEVEL_*** constant */
){
  int rc = LSM_OK;
  int i;

  /* In RHS mode one cursor is added for each of the pLevel->nRight
  ** right-hand-side runs. The other two modes add a single cursor. */
  int nAdd = (eMode==MULTICURSOR_ADDLEVEL_RHS ? pLevel->nRight : 1);

  assert( eMode==MULTICURSOR_ADDLEVEL_ALL
       || eMode==MULTICURSOR_ADDLEVEL_RHS
       || eMode==MULTICURSOR_ADDLEVEL_LHS_SEP
  );

  /* Stop at the first failure. nSegCsr is only incremented on success, so
  ** continuing after an error would re-initialize the same aSegCsr[] slot
  ** and could overwrite the error code in rc with LSM_OK. */
  for(i=0; rc==LSM_OK && i<nAdd; i++){
    LevelCursor *pNew;
    lsm_db *pDb = pCsr->pDb;

    /* Grow the pCsr->aSegCsr array if required. The array is extended 16
    ** entries at a time, and the new entries are zeroed. */
    if( 0==(pCsr->nSegCsr % 16) ){
      int nByte;
      LevelCursor *aNew;
      nByte = sizeof(LevelCursor) * (pCsr->nSegCsr+16);
      aNew = (LevelCursor *)lsmRealloc(pDb->pEnv, pCsr->aSegCsr, nByte);
      if( aNew==0 ) return LSM_NOMEM_BKPT;
      memset(&aNew[pCsr->nSegCsr], 0, sizeof(LevelCursor)*16);
      pCsr->aSegCsr = aNew;
    }
    pNew = &pCsr->aSegCsr[pCsr->nSegCsr];

    /* Initialize the new array entry according to the requested mode */
    switch( eMode ){
      case MULTICURSOR_ADDLEVEL_ALL:
        rc = levelCursorInit(pDb, pLevel, pCsr->xCmp, pNew);
        break;

      case MULTICURSOR_ADDLEVEL_RHS:
        rc = levelCursorInitRun(pDb, &pLevel->aRhs[i].run, pCsr->xCmp, pNew);
        break;

      case MULTICURSOR_ADDLEVEL_LHS_SEP:
        rc = levelCursorInitRun(pDb, &pLevel->lhs.sep, pCsr->xCmp, pNew);
        break;
    }
    if( pCsr->flags & CURSOR_IGNORE_SYSTEM ){
      pNew->bIgnoreSystem = 1;
    }

    /* The entry only becomes part of the multi-cursor on success */
    if( rc==LSM_OK ) pCsr->nSegCsr++;
  }

  return rc;
}


static int multiCursorNew(
................................................................................
      else if( pCsr->flags & CURSOR_AT_LEVELS ){
        pKey = (void *)"LEVELS";
        nKey = 6;
        eType = SORTED_SYSTEM_WRITE;
      }
      break;

    case CURSOR_DATA_BTREE: {
      BtreeCursor *pBtCsr = pCsr->pBtCsr;
      if( pBtCsr==0 ){
        eType = 0;
        pKey = 0;
        nKey = 0;
      }else{
        eType = pBtCsr->eType;
        pKey = pBtCsr->pKey;
        nKey = pBtCsr->nKey;
      }
      break;
    }

    default: {
      int iSeg = iKey - CURSOR_DATA_SEGMENT;




      if( iSeg<pCsr->nSegCsr && segmentCursorValid(&pCsr->aSegCsr[iSeg]) ){
        segmentCursorKey(&pCsr->aSegCsr[iSeg], &pKey, &nKey);
        segmentCursorType(&pCsr->aSegCsr[iSeg], &eType);
      }
      break;
    }
  }

................................................................................
  pCsr->aTree[iOut] = iRes;
}

static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->aTree==0 ){
    int nByte;                    /* Bytes of space to allocate */



    pCsr->nTree = 2;
    while( pCsr->nTree<(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){
      pCsr->nTree = pCsr->nTree*2;
    }

    nByte = sizeof(int)*pCsr->nTree*2;
    pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
  }
  return rc;
................................................................................
  if( pCsr->flags & CURSOR_NEW_SYSTEM ){
    assert( bLast==0 );
    pCsr->flags |= CURSOR_AT_FREELIST;
  }
  for(i=0; rc==LSM_OK && i<pCsr->nSegCsr; i++){
    rc = segmentCursorEnd(&pCsr->aSegCsr[i], bLast);
  }






  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }

  if( rc==LSM_OK ){
    for(i=pCsr->nTree-1; i>0; i--){
................................................................................

        if( pCsr->flags & CURSOR_AT_FREELIST ){
          pCsr->flags &= ~CURSOR_AT_FREELIST;
          pCsr->flags |= CURSOR_AT_LEVELS;
        }else{
          pCsr->flags &= ~CURSOR_AT_LEVELS;
        }



      }else{
        LevelCursor *pLevel = &pCsr->aSegCsr[iKey-CURSOR_DATA_SEGMENT];
        rc = segmentCursorAdvance(pLevel, bReverse);
      }
      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
................................................................................
  MultiCursor *pCsr = pMW->pCsr;

  /* Unless the merge has finished, save the cursor position in the
  ** Merge.aInput[] array. See function mergeWorkerInit() for the 
  ** code to restore a cursor position based on aInput[].  */
  if( pCsr ){
    Merge *pMerge = pMW->pLevel->pMerge;


    /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
    assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
    assert( pMerge->nInput==0 || pMerge->nInput==pCsr->nSegCsr );

    for(i=0; i<pMerge->nInput; i++){
      SegmentPtr *pPtr = &pCsr->aSegCsr[i].aPtr[0];
      if( pPtr->pPg ){
        pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
        pMerge->aInput[i].iCell = pPtr->iCell;
      }else{
        pMerge->aInput[i].iPg = 0;
        pMerge->aInput[i].iCell = 0;
      }
    }




  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->apPage[0]);
  lsmFsPageRelease(pMW->apPage[1]);

  for(i=0; i<2; i++){
................................................................................
  pMW->apPage[1] = 0;
}

/*
** Load the first page of the run read by the last segment cursor of
** pMW->pCsr, read the pointer stored in that page's footer, release the
** page and then call mergeWorkerNextPage(pMW, 0, <footer pointer>).
**
** Returns the error code from lsmFsDbPageGet() if the page cannot be
** loaded, otherwise the value returned by mergeWorkerNextPage(). The
** caller must ensure pMW->apPage[0] is NULL.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  SortedRun *pSepRun;             /* Run containing sep. keys to merge in */
  Page *pFirst = 0;               /* First page of run pSepRun */
  u8 *aPage;                      /* Data buffer of page pFirst */
  int nPage;                      /* Size of aPage[] in bytes */
  int iFooterPtr;                 /* Pointer value from the footer of pFirst */
  int rc;                         /* Return code */

  assert( pMW->apPage[0]==0 );

  pSepRun = pMW->pCsr->aSegCsr[pMW->pCsr->nSegCsr-1].aPtr[0].pRun;
  rc = lsmFsDbPageGet(pMW->pDb->pFS, pSepRun->iFirst, &pFirst);
  if( rc!=LSM_OK ) return rc;

  /* Only the footer pointer is needed, so the page is released at once */
  aPage = lsmFsPageData(pFirst, &nPage);
  iFooterPtr = pageGetPtr(aPage, nPage);
  lsmFsPageRelease(pFirst);

  return mergeWorkerNextPage(pMW, 0, iFooterPtr);
}

static int mergeWorkerStep(MergeWorker *pMW){
................................................................................

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */









  if( pCsr->nSegCsr ){
    LevelCursor *pPtrs = &pCsr->aSegCsr[pCsr->nSegCsr-1];
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }


  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */

................................................................................
    }
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }
    mergeWorkerShutdown(pMW);

    assert( LSM_OK==assertBtreeOk(pDb, &pSeg->run) );







  }
  return rc;
}

/*
** Return 1 if the merge-worker has no cursor (pMW->pCsr is NULL), or if
** lsmMCursorValid() reports that its cursor is not valid. Return 0
** otherwise.
*/
static int mergeWorkerDone(MergeWorker *pMW){
  if( pMW->pCsr==0 ) return 1;
  return !lsmMCursorValid(pMW->pCsr);
}
................................................................................
      Level *pNext = pLevel->pNext;
      rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);
    }
    multiCursorReadSeparators(pCsr);
  }else{
    multiCursorIgnoreDelete(pCsr);
  }
  assert( pMerge->nInput==pCsr->nSegCsr );
  pMW->pCsr = pCsr;

  /* Load each of the output pages into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 0);
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 1);

  /* Position the cursor. */
................................................................................
          pPtr = &pCsr->aSegCsr[i].aPtr[0];
          rc = segmentPtrLoadPage(pDb->pFS, pPtr, pInput->iPg);
          if( rc==LSM_OK && pPtr->nCell>0 ){
            rc = segmentPtrLoadCell(pPtr, pInput->iCell);
          }
        }
      }






      if( rc==LSM_OK ){
        rc = multiCursorSetupTree(pCsr, 0);
      }
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }
................................................................................
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  /* Walk the list of multi-cursors headed at pDb->pCsr (linked through
  ** pNext) and call lsmTreeCursorSave() on the pTreeCsr of each one. */
  MultiCursor *pIter = pDb->pCsr;
  while( pIter ){
    lsmTreeCursorSave(pIter->pTreeCsr);
    pIter = pIter->pNext;
  }
}





























































































#ifdef LSM_DEBUG_EXPENSIVE

/*
** Argument iPg is a page number within a separators run. Assert() that for
** each key K on the page, (pKey1 >= K > pKey2) is true. 
**
................................................................................
    assertBtreeRanges(pDb, pRun, iRight, aPrev, nPrev, 0, 0);
  }

  lsmFsPageRelease(pPg);
  sortedBlobFree(&blob);
}

/*
** Check that the array pOne contains the required pointers to pTwo.
** Array pTwo must be a main array. pOne may be either a separators array
** or another main array. 
**
** If bRhs is true (pTwo may have been Gobble()d), the checks that compare
** a pointer against the previously visited page of pTwo are skipped.
**
** Any inconsistency found is reported by a failing assert(). Parameter
** pzErr is not used by this function: no error message is ever written
** to *pzErr.
*/
static void assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  SortedRun *pOne,                /* Run containing pointers */
  SortedRun *pTwo,                /* Run containing pointer targets */
  int bRhs,                       /* True if pTwo may have been Gobble()d */
  char **pzErr
){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr ptr1;                /* Iterates through pOne */
  SegmentPtr ptr2;                /* Iterates through pTwo */
  Pgno iPrev;                     /* Page of pTwo most recently matched */

  assert( pOne && pTwo );

  memset(&ptr1, 0, sizeof(ptr1));
  memset(&ptr2, 0, sizeof(ptr2));
  ptr1.pRun = pOne;
  ptr2.pRun = pTwo;
  segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc);
  segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc);

  /* Check that the footer pointer of the first page of pOne points to
  ** the first page of pTwo. */
  iPrev = pTwo->iFirst;
  if( ptr1.iPtr!=iPrev && !bRhs ){
    assert( 0 );
  }

  if( rc==LSM_OK && ptr1.nCell>0 ){
    rc = segmentPtrLoadCell(&ptr1, 0);
  }
      
  while( rc==LSM_OK && ptr2.pPg ){
    Pgno iThis;

    /* Advance to the next page of segment pTwo that contains at least
    ** one cell. Break out of the loop if the iterator reaches EOF.  */
    do{
      rc = segmentPtrNextPage(&ptr2, 1);
      assert( rc==LSM_OK );
    }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 );
    if( rc!=LSM_OK || ptr2.pPg==0 ) break;
    iThis = lsmFsPageNumber(ptr2.pPg);

    if( (ptr2.flags & PGFTR_SKIP_THIS_FLAG)==0 ){

      /* Load the first cell in the array pTwo page. */
      rc = segmentPtrLoadCell(&ptr2, 0);

      /* Iterate forwards through pOne, searching for a key that matches the
      ** key ptr2.pKey/nKey. This key should have a pointer to the page that
      ** ptr2 currently points to. */
      while( rc==LSM_OK ){
        int res = rtTopic(ptr1.eType) - rtTopic(ptr2.eType);
        if( res==0 ){
          res = pDb->xCmp(ptr1.pKey, ptr1.nKey, ptr2.pKey, ptr2.nKey);
        }

        if( res<0 ){
          /* Keys smaller than the target must still point at the
          ** previously matched page (not checked when bRhs is set). */
          assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev );
        }else if( res>0 ){
          /* pOne has moved past the target key without matching it */
          assert( 0 );
        }else{
          assert( ptr1.iPtr+ptr1.iPgPtr==iThis );
          iPrev = lsmFsPageNumber(ptr2.pPg);
          break;
        }

        /* Running off the end of pOne before a match is an error */
        rc = segmentPtrAdvance(0, &ptr1, 0);
        if( ptr1.pPg==0 ){
          assert( 0 );
        }
      }
    }
  }

  segmentPtrReset(&ptr1);
  segmentPtrReset(&ptr2);
}

static int countBtreeKeys(FileSystem *pFS, SortedRun *pRun, Pgno iPg){
#if 0
  int rc;
  Page *pPg;
  u8 *aData;
  int nData;
  int flags;







<
|







 







>
>







 







|







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>










|



|












|







 







|












|
|
>
>













>







 







|

|









<







 







>



>







 







<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







>
>
>













>







 







<
<






>
>
>
>
>
>
>
>
>
|
|
|

|
|
|
|
|
|
|
|
|
|
|

|
|
|
|

|
|
|

|
|
|
|
|
|
|
|
>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<


>
>
>
>
|







 







>
>
>

|







 







>
>
>
>
>







 







>
>
>







 







>



|

|









>
>
>
>







 







>
>



>
>
>
>
|
|
|
|
|
<
|
|
|
>
>
>
>







 







>
>
>
>
>
>
>
>
>
|







>







 







<


>
>
>
>
>
>
>







 







|







 







>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







264
265
266
267
268
269
270

271
272
273
274
275
276
277
278
...
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
...
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
...
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
...
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
...
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
...
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897

898
899
900
901
902
903
904
...
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
....
1068
1069
1070
1071
1072
1073
1074












1075
1076
1077
1078
1079
1080
1081
....
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
....
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
....
1922
1923
1924
1925
1926
1927
1928


1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
....
2130
2131
2132
2133
2134
2135
2136














2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
....
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
....
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
....
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
....
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
....
3375
3376
3377
3378
3379
3380
3381
3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395

3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
....
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
3458
....
3505
3506
3507
3508
3509
3510
3511

3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
....
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
....
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
....
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
....
4780
4781
4782
4783
4784
4785
4786


























































































4787
4788
4789
4790
4791
4792
4793
  int *pnHdrLevel;
  void *pSystemVal;
  Snapshot *pSnap;
};

#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1

#define CURSOR_DATA_SEGMENT   2


/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_NEW_SYSTEM
................................................................................
#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
#else
# define assertAllPointersOk(y, z)
# define assertAllBtreesOk(y, z)
#endif

static int assertPointersOk(lsm_db *, SortedRun *, SortedRun *, int, char **);

/*
** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU16(u8 *aOut, u16 nVal){
  aOut[0] = (u8)((nVal>>8) & 0xFF);
  aOut[1] = (u8)(nVal & 0xFF);
................................................................................
** Return the decoded (possibly relative) pointer value stored in cell 
** iCell from page aData/nData.
*/
static int pageGetRecordPtr(u8 *aData, int nData, int iCell){
  u8 *pCell;                      /* Start of cell iCell within the page */
  int iPtr;                       /* Decoded pointer value */

  /* The cell index must be non-negative and below the page's record count */
  assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );

  /* The varint holding the pointer starts at the second byte of the cell */
  pCell = pageGetCell(aData, nData, iCell);
  lsmVarintGet32(pCell+1, &iPtr);

  return iPtr;
}

static u8 *pageGetKey(
  Page *pPg,                      /* Page to read from */
................................................................................

  }else{
    rc = btreeCursorLoadKey(pCsr);
  }

  if( rc==LSM_OK && pCsr->iPg>=0 ){
    aData = lsmFsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
    pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1);
  }

  return rc;
}

static void btreeCursorFree(BtreeCursor *pCsr){
  if( pCsr ){
................................................................................
    p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
    p->iCell = ((pCsr->aPg[pCsr->iPg].iCell + 1) << 8) + pCsr->nDepth;
  }else{
    p->iPg = 0;
    p->iCell = 0;
  }
}

/*
** Order two keys by topic first and by key content second. If the two
** topic values differ, (iLhsTopic - iRhsTopic) is returned without
** calling xCmp(). If they are equal, the return value is whatever
** xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey) returns.
*/
static int sortedKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsTopic, void *pLhsKey, int nLhsKey,
  int iRhsTopic, void *pRhsKey, int nRhsKey
){
  int iTopicDiff = iLhsTopic - iRhsTopic;
  return iTopicDiff ? iTopicDiff : xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
}

static int btreeCursorRestore(
  BtreeCursor *pCsr, 
  int (*xCmp)(void *, int, void *, int),
  MergeInput *p
){
  int rc = LSM_OK;
  if( p->iPg ){
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    int iCell;                    /* Current cell number on leaf page */
    Pgno iLeaf;                   /* Page number of current leaf page */
    int nDepth;                   /* Depth of b-tree structure */

    /* Decode the MergeInput structure */
    iLeaf = p->iPg;
    nDepth = (p->iCell & 0x00FF);
    iCell = (p->iCell >> 8) - 1;

    /* Allocate the BtreeCursor.aPg[] array */
    assert( pCsr->aPg==0 );
    pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);

    /* Populate the last entry of the aPg[] array */
    if( rc==LSM_OK ){
      pCsr->iPg = nDepth-1;
      pCsr->nDepth = nDepth;
      pCsr->aPg[pCsr->iPg].iCell = iCell;
      rc = lsmFsDbPageGet(pCsr->pFS, iLeaf, &pCsr->aPg[nDepth-1].pPage);
    }

    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
................................................................................
          int iCell;

          aData = lsmFsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData)-1; 
          iMax = iCell;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            int iPtr;                     /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKeyT) */

            rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob);
            if( rc!=LSM_OK ) break;

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
            );
            assert( res!=0 );

            if( res<0 ){
              iLoad = iPtr;
              iCell = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

          pCsr->aPg[iPg].pPage = pPg;
          pCsr->aPg[iPg].iCell = iCell;
          iPg++;
          assert( iPg!=nDepth-1 || iLoad==iLeaf );
        }
      }while( rc==LSM_OK && iPg<(nDepth-1) );
      sortedBlobFree(&blob);
    }

    /* Load the current key and pointer */
    if( rc==LSM_OK ){
................................................................................
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        int dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= SORTED_SEPARATOR;

      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }

  return rc;
}

static int btreeCursorNew(
  lsm_db *pDb,
  SortedRun *pRun,
  BtreeCursor **ppCsr
................................................................................
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
        assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
        rc = btreeCursorNext(pCsr);
      }
    }
    assert( rc!=LSM_OK || pCsr->pKey==0 );

    if( pPg ) lsmFsPageRelease(pPg);

    btreeCursorFree(pCsr);
    sortedBlobFree(&blob);
  }

................................................................................
      );
    }
  }

  return rc;
}













void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
  lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
  int rc = *pRc;
  int i;
  Merge *pMerge = pLevel->pMerge;

  for(i=0; rc==LSM_OK && i<pLevel->nRight; i++){
................................................................................
  iPtrOut = pPtr->iPtr;

  /* Assert that this page is the right page of this segment for the key
  ** that we are searching for. Do this by loading page (iPg-1) and testing
  ** that pKey/nKey is greater than all keys on that page, and then by 
  ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  ** the keys it houses.  */
#if 1
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pRun->nSize==1 
       || lsmFsPageNumber(pPtr->pPg)==pPtr->pRun->iLast
  );
................................................................................
  lsmTreeCursorDestroy(pCsr->pTreeCsr);

  /* Close the sorted file cursors */
  for(i=0; i<pCsr->nSegCsr; i++){
    segmentCursorClose(pEnv, &pCsr->aSegCsr[i]);
  }

  /* And the b-tree cursor, if any */
  btreeCursorFree(pCsr->pBtCsr);

  /* Free allocations */
  lsmFree(pEnv, pCsr->aSegCsr);
  lsmFree(pEnv, pCsr->aTree);
  lsmFree(pEnv, pCsr->pSystemVal);

  /* Zero fields */
  pCsr->nSegCsr = 0;
  pCsr->aSegCsr = 0;
  pCsr->nTree = 0;
  pCsr->aTree = 0;
  pCsr->pSystemVal = 0;
  pCsr->pSnap = 0;
  pCsr->pTreeCsr = 0;
  pCsr->pBtCsr = 0;
}

void lsmMCursorClose(MultiCursor *pCsr){
  if( pCsr ){
    lsm_db *pDb = pCsr->pDb;
    MultiCursor **pp;             /* Iterator variable */

................................................................................
*/
int multiCursorAddLevel(
  MultiCursor *pCsr,              /* Multi-cursor to add segment to */ 
  Level *pLevel,                  /* Level to add to multi-cursor merge */
  int eMode                       /* A MULTICURSOR_ADDLEVEL_*** constant */
){
  /* Summary of the three modes:
  **
  **   MULTICURSOR_ADDLEVEL_LHS_SEP: Open a b-tree cursor on pLevel->lhs.run
  **       and store it in pCsr->pBtCsr. No entry is added to aSegCsr[].
  **
  **   MULTICURSOR_ADDLEVEL_ALL: Append a single cursor to pCsr->aSegCsr[],
  **       initialized by levelCursorInit() for the whole level.
  **
  **   MULTICURSOR_ADDLEVEL_RHS: Append pLevel->nRight cursors to
  **       pCsr->aSegCsr[], one for each run pLevel->aRhs[i].run.
  **
  ** Returns LSM_OK if successful, or an error code otherwise. Processing
  ** stops at the first error, so that the error code is never overwritten
  ** by the result of initializing a subsequent cursor.
  */
  int rc = LSM_OK;

  assert( eMode==MULTICURSOR_ADDLEVEL_ALL
       || eMode==MULTICURSOR_ADDLEVEL_RHS
       || eMode==MULTICURSOR_ADDLEVEL_LHS_SEP
  );

  if( eMode==MULTICURSOR_ADDLEVEL_LHS_SEP ){
    assert( pLevel->lhs.run.iRoot );
    assert( pCsr->pBtCsr==0 );
    rc = btreeCursorNew(pCsr->pDb, &pLevel->lhs.run, &pCsr->pBtCsr);
    assert( (rc==LSM_OK)==(pCsr->pBtCsr!=0) );
  }else{
    int i;
    int nAdd = (eMode==MULTICURSOR_ADDLEVEL_RHS ? pLevel->nRight : 1);

    /* The loop terminates as soon as rc is set to something other than
    ** LSM_OK. Otherwise a failure to initialize cursor i could be masked
    ** by the successful initialization of cursor i+1 into the same slot. */
    for(i=0; rc==LSM_OK && i<nAdd; i++){
      LevelCursor *pNew;
      lsm_db *pDb = pCsr->pDb;

      /* Grow the pCsr->aSegCsr array if required. The array is extended
      ** (and the new slots zeroed) in chunks of 16 entries. */
      if( 0==(pCsr->nSegCsr % 16) ){
        int nByte;
        LevelCursor *aNew;
        nByte = sizeof(LevelCursor) * (pCsr->nSegCsr+16);
        aNew = (LevelCursor *)lsmRealloc(pDb->pEnv, pCsr->aSegCsr, nByte);
        if( aNew==0 ) return LSM_NOMEM_BKPT;
        memset(&aNew[pCsr->nSegCsr], 0, sizeof(LevelCursor)*16);
        pCsr->aSegCsr = aNew;
      }
      pNew = &pCsr->aSegCsr[pCsr->nSegCsr];

      /* MULTICURSOR_ADDLEVEL_LHS_SEP is handled by the other branch of the
      ** enclosing if statement, so only two modes can be seen here. */
      switch( eMode ){
        case MULTICURSOR_ADDLEVEL_ALL:
          rc = levelCursorInit(pDb, pLevel, pCsr->xCmp, pNew);
          break;

        case MULTICURSOR_ADDLEVEL_RHS:
          rc = levelCursorInitRun(pDb, &pLevel->aRhs[i].run, pCsr->xCmp, pNew);
          break;
      }
      if( pCsr->flags & CURSOR_IGNORE_SYSTEM ){
        pNew->bIgnoreSystem = 1;
      }

      /* The new slot only becomes part of the array if it was successfully
      ** initialized. */
      if( rc==LSM_OK ) pCsr->nSegCsr++;
    }
  }

  return rc;
}


static int multiCursorNew(
................................................................................
      else if( pCsr->flags & CURSOR_AT_LEVELS ){
        pKey = (void *)"LEVELS";
        nKey = 6;
        eType = SORTED_SYSTEM_WRITE;
      }
      break;















    default: {
      int iSeg = iKey - CURSOR_DATA_SEGMENT;
      if( iSeg==pCsr->nSegCsr && pCsr->pBtCsr ){
        pKey = pCsr->pBtCsr->pKey;
        nKey = pCsr->pBtCsr->nKey;
        eType = pCsr->pBtCsr->eType;
      }if( iSeg<pCsr->nSegCsr && segmentCursorValid(&pCsr->aSegCsr[iSeg]) ){
        segmentCursorKey(&pCsr->aSegCsr[iSeg], &pKey, &nKey);
        segmentCursorType(&pCsr->aSegCsr[iSeg], &eType);
      }
      break;
    }
  }

................................................................................
  pCsr->aTree[iOut] = iRes;
}

static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->aTree==0 ){
    int nByte;                    /* Bytes of space to allocate */
    int bBtree;                   /* True if b-tree cursor is present */

    bBtree = (pCsr->pBtCsr!=0);
    pCsr->nTree = 2;
    while( pCsr->nTree<(CURSOR_DATA_SEGMENT+pCsr->nSegCsr+bBtree) ){
      pCsr->nTree = pCsr->nTree*2;
    }

    nByte = sizeof(int)*pCsr->nTree*2;
    pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
  }
  return rc;
................................................................................
  if( pCsr->flags & CURSOR_NEW_SYSTEM ){
    assert( bLast==0 );
    pCsr->flags |= CURSOR_AT_FREELIST;
  }
  for(i=0; rc==LSM_OK && i<pCsr->nSegCsr; i++){
    rc = segmentCursorEnd(&pCsr->aSegCsr[i], bLast);
  }

  if( rc==LSM_OK && pCsr->pBtCsr ){
    assert( bLast==0 );
    rc = btreeCursorFirst(pCsr->pBtCsr);
  }

  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }

  if( rc==LSM_OK ){
    for(i=pCsr->nTree-1; i>0; i--){
................................................................................

        if( pCsr->flags & CURSOR_AT_FREELIST ){
          pCsr->flags &= ~CURSOR_AT_FREELIST;
          pCsr->flags |= CURSOR_AT_LEVELS;
        }else{
          pCsr->flags &= ~CURSOR_AT_LEVELS;
        }
      }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){
        assert( bReverse==0 && pCsr->pBtCsr );
        rc = btreeCursorNext(pCsr->pBtCsr);
      }else{
        LevelCursor *pLevel = &pCsr->aSegCsr[iKey-CURSOR_DATA_SEGMENT];
        rc = segmentCursorAdvance(pLevel, bReverse);
      }
      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
................................................................................
  MultiCursor *pCsr = pMW->pCsr;

  /* Unless the merge has finished, save the cursor position in the
  ** Merge.aInput[] array. See function mergeWorkerInit() for the 
  ** code to restore a cursor position based on aInput[].  */
  if( pCsr ){
    Merge *pMerge = pMW->pLevel->pMerge;
    int bBtree = (pCsr->pBtCsr!=0);

    /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
    assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
    assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nSegCsr+bBtree) );

    for(i=0; i<(pMerge->nInput-bBtree); i++){
      SegmentPtr *pPtr = &pCsr->aSegCsr[i].aPtr[0];
      if( pPtr->pPg ){
        pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
        pMerge->aInput[i].iCell = pPtr->iCell;
      }else{
        pMerge->aInput[i].iPg = 0;
        pMerge->aInput[i].iCell = 0;
      }
    }
    if( bBtree && pMerge->nInput ){
      assert( i==pCsr->nSegCsr );
      btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]);
    }
  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->apPage[0]);
  lsmFsPageRelease(pMW->apPage[1]);

  for(i=0; i<2; i++){
................................................................................
  pMW->apPage[1] = 0;
}

/*
** Set up the first output page of the merge-worker by calling
** mergeWorkerNextPage() with a footer pointer value determined as follows:
**
**   * If the multi-cursor has a b-tree cursor, the pointer is the first
**     page number (iFirst) of the lhs run of the next level.
**
**   * Otherwise, it is the footer pointer read from the first page of the
**     run used by the last segment cursor in the multi-cursor.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  MultiCursor *pCsr = pMW->pCsr;
  int rc = LSM_OK;                /* Return code */
  int iFPtr = 0;                  /* Footer pointer for first output page */

  assert( pMW->apPage[0]==0 );

  if( pCsr->pBtCsr==0 ){
    SortedRun *pRun;              /* Run containing sep. keys to merge in */
    Page *pPg = 0;                /* First page of run pRun */

    pRun = pCsr->aSegCsr[pCsr->nSegCsr-1].aPtr[0].pRun;
    rc = lsmFsDbPageGet(pMW->pDb->pFS, pRun->iFirst, &pPg);
    if( rc==LSM_OK ){
      int nData;                  /* Size of aData[] in bytes */
      u8 *aData = lsmFsPageData(pPg, &nData);

      iFPtr = pageGetPtr(aData, nData);
      lsmFsPageRelease(pPg);
    }
  }else{
    iFPtr = pMW->pLevel->pNext->lhs.run.iFirst;
  }

  if( rc!=LSM_OK ){
    return rc;
  }
  return mergeWorkerNextPage(pMW, 0, iFPtr);
}

static int mergeWorkerStep(MergeWorker *pMW){
................................................................................

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
  if( pCsr->pBtCsr ){
    BtreeCursor *pBtCsr = pCsr->pBtCsr;
    if( pBtCsr->pKey ){
      int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
      if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
      if( 0==res ) iPtr = pBtCsr->iPtr;

      assert( res>=0 );
    }
  }else if( pCsr->nSegCsr ){
    LevelCursor *pPtrs = &pCsr->aSegCsr[pCsr->nSegCsr-1];
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }


  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */

................................................................................
    }
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }


    assert( LSM_OK==assertBtreeOk(pDb, &pSeg->run) );
    if( pMW->pCsr->pBtCsr ){
      assertPointersOk(
          pDb, &pMW->pLevel->lhs.run, &pMW->pLevel->pNext->lhs.run, 0, 0
      );
    }

    mergeWorkerShutdown(pMW);
  }
  return rc;
}

/*
** Return non-zero if the merge-worker has no multi-cursor, or if its
** multi-cursor is no longer valid according to lsmMCursorValid().
** Return zero otherwise.
*/
static int mergeWorkerDone(MergeWorker *pMW){
  if( pMW->pCsr==0 ){
    return 1;
  }
  return !lsmMCursorValid(pMW->pCsr);
}
................................................................................
      Level *pNext = pLevel->pNext;
      rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);
    }
    multiCursorReadSeparators(pCsr);
  }else{
    multiCursorIgnoreDelete(pCsr);
  }
  assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nSegCsr+(pCsr->pBtCsr!=0)) );
  pMW->pCsr = pCsr;

  /* Load each of the output pages into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 0);
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 1);

  /* Position the cursor. */
................................................................................
          pPtr = &pCsr->aSegCsr[i].aPtr[0];
          rc = segmentPtrLoadPage(pDb->pFS, pPtr, pInput->iPg);
          if( rc==LSM_OK && pPtr->nCell>0 ){
            rc = segmentPtrLoadCell(pPtr, pInput->iCell);
          }
        }
      }

      if( rc==LSM_OK && pCsr->pBtCsr ){
        assert( i==pCsr->nSegCsr );
        rc = btreeCursorRestore(pCsr->pBtCsr, pCsr->xCmp, &pMerge->aInput[i]);
      }

      if( rc==LSM_OK ){
        rc = multiCursorSetupTree(pCsr, 0);
      }
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }
................................................................................
/*
** Call lsmTreeCursorSave() on the tree cursor (pTreeCsr) of every
** multi-cursor in the linked list that starts at pDb->pCsr.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pIter = pDb->pCsr;
  while( pIter ){
    lsmTreeCursorSave(pIter->pTreeCsr);
    pIter = pIter->pNext;
  }
}

/*
** Check that the array pOne contains the required pointers to pTwo.
** Array pTwo must be a main array. pOne may be either a separators array
** or another main array.
**
** Any inconsistency found is reported by way of a failing assert(), so
** the checks have no effect in builds where assert() is compiled out.
**
** NOTE(review): parameter pzErr is not referenced by this implementation,
** so no error message is ever returned through it. The function always
** returns LSM_OK, even if the local variable rc has been set to an error
** code by one of the segment-pointer calls.
*/
static int assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  SortedRun *pOne,                /* Run containing pointers */
  SortedRun *pTwo,                /* Run containing pointer targets */
  int bRhs,                       /* True if pTwo may have been Gobble()d */
  char **pzErr
){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr ptr1;                /* Iterates through pOne */
  SegmentPtr ptr2;                /* Iterates through pTwo */
  Pgno iPrev;                     /* Page of pTwo that keys in pOne smaller
                                  ** than the current pTwo key must point to */

  assert( pOne && pTwo );

  /* Both ptr1 and ptr2 are SegmentPtr objects, so sizeof(ptr1) is the
  ** correct size for clearing ptr2 as well. */
  memset(&ptr1, 0, sizeof(ptr1));
  memset(&ptr2, 0, sizeof(ptr1));
  ptr1.pRun = pOne;
  ptr2.pRun = pTwo;
  segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc);
  segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc);

  /* Check that the footer pointer of the first page of pOne points to
  ** the first page of pTwo. */
  iPrev = pTwo->iFirst;
  if( ptr1.iPtr!=iPrev && !bRhs ){
    assert( 0 );
  }

  if( rc==LSM_OK && ptr1.nCell>0 ){
    rc = segmentPtrLoadCell(&ptr1, 0);
  }
      
  while( rc==LSM_OK && ptr2.pPg ){
    Pgno iThis;                   /* Page number of current pTwo page */

    /* Advance to the next page of segment pTwo that contains at least
    ** one cell. Break out of the loop if the iterator reaches EOF.  */
    do{
      rc = segmentPtrNextPage(&ptr2, 1);
      assert( rc==LSM_OK );
    }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 );
    if( rc!=LSM_OK || ptr2.pPg==0 ) break;
    iThis = lsmFsPageNumber(ptr2.pPg);

    /* Pages with either PGFTR_SKIP_THIS_FLAG or SEGMENT_BTREE_FLAG set are
    ** not checked. */
    if( (ptr2.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG))==0 ){

      /* Load the first cell in the array pTwo page. */
      rc = segmentPtrLoadCell(&ptr2, 0);

      /* Iterate forwards through pOne, searching for a key that matches the
      ** key ptr2.pKey/nKey. This key should have a pointer to the page that
      ** ptr2 currently points to. */
      while( rc==LSM_OK ){
        /* Compare by topic first, then with the database key comparison
        ** function. */
        int res = rtTopic(ptr1.eType) - rtTopic(ptr2.eType);
        if( res==0 ){
          res = pDb->xCmp(ptr1.pKey, ptr1.nKey, ptr2.pKey, ptr2.nKey);
        }

        if( res<0 ){
          /* The pOne key is smaller than the first key on the current pTwo
          ** page. Unless bRhs is set, it must point to the previously
          ** matched page. */
          assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev );
        }else if( res>0 ){
          /* pOne has moved past the pTwo key without matching it. */
          assert( 0 );
        }else{
          /* Matching key. Its pointer must identify the current page. */
          assert( ptr1.iPtr+ptr1.iPgPtr==iThis );
          iPrev = iThis;
          break;
        }

        /* Reaching the end of pOne before a match is found is an error. */
        rc = segmentPtrAdvance(0, &ptr1, 0);
        if( ptr1.pPg==0 ){
          assert( 0 );
        }
      }
    }
  }

  segmentPtrReset(&ptr1);
  segmentPtrReset(&ptr2);
  return LSM_OK;
}


#ifdef LSM_DEBUG_EXPENSIVE

/*
** Argument iPg is a page number within a separators run. Assert() that for
** each key K on the page, (pKey1 >= K > pKey2) is true. 
**
................................................................................
    assertBtreeRanges(pDb, pRun, iRight, aPrev, nPrev, 0, 0);
  }

  lsmFsPageRelease(pPg);
  sortedBlobFree(&blob);
}



























































































static int countBtreeKeys(FileSystem *pFS, SortedRun *pRun, Pgno iPg){
#if 0
  int rc;
  Page *pPg;
  u8 *aData;
  int nData;
  int flags;