SQLite4
Check-in [97fe6585f1]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Experimentally try embedding b-tree pages in sorted runs instead of creating an external separators run. This slows down the rate at which blocks can be cycled during incremental merging, but also makes database writes more contiguous.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: 97fe6585f179b1830906b995e8c794b43e41669f
User & Date: dan 2012-06-21 20:22:07
Context
2012-06-22
17:57
Add functions to iterate through b-tree keys. Also a complex assert statement to check that b-trees are correct with respect to the main run content. check-in: 29d761c438 user: dan tags: embedded-btree
2012-06-21
20:22
Experimentally try embedding b-tree pages in sorted runs instead of creating an external separators run. This slows down the rate at which blocks can be cycled during incremental merging, but also makes database writes more contiguous. check-in: 97fe6585f1 user: dan tags: embedded-btree
19:13
Get the amalgamation build working. check-in: 2d7bf1c174 user: drh tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
**   is set to a negative value.
*/
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  int nSkip;                      /* Number of separators entries to skip */
  int aiOutputOff[2];             /* Write offsets on run output pages */
  int bHierReadonly;              /* True if b-tree hierarchy is read-only */
};
/* Read position within one input of a merge (an element of Merge.aInput[]). */
struct MergeInput {
  Pgno iPg;                       /* Page on which next input is stored */
  int iCell;                      /* Cell containing next input to merge */
};

/* 







|







249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
**   is set to a negative value.
*/
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  int nSkip;                      /* Number of separators entries to skip */
  int aiOutputOff[2];             /* Write offsets on run output pages */
  int abHierReadonly[2];          /* True if b-tree hierarchies are read-only */
};
/* Read position within one input of a merge (an element of Merge.aInput[]). */
struct MergeInput {
  Pgno iPg;                       /* Page on which next input is stored */
  int iCell;                      /* Cell containing next input to merge */
};

/* 

Changes to src/lsm_ckpt.c.

221
222
223
224
225
226
227

228
229
230
231
232

233

234
235
236
237
238
239
240
...
390
391
392
393
394
395
396

397
398
399
400
401

402
403
404
405
406
407
408
...
419
420
421
422
423
424
425
426

427
428
429
430
431
432
433
...
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
...
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  ckptSetValue(p, iOut++, pSeg->run.iFirst, pRc);
  ckptSetValue(p, iOut++, pSeg->run.iLast, pRc);

  ckptSetValue(p, iOut++, pSeg->run.nSize, pRc);
  if( segmentHasSeparators(pSeg) ){
    ckptSetValue(p, iOut++, pSeg->sep.iFirst, pRc);
    ckptSetValue(p, iOut++, pSeg->sep.iLast, pRc);
    ckptSetValue(p, iOut++, pSeg->sep.iRoot, pRc);

  }else{

    ckptSetValue(p, iOut++, 0, pRc);
    ckptSetValue(p, iOut++, 0, pRc);
    ckptSetValue(p, iOut++, 0, pRc);
  }

  *piOut = iOut;
}
................................................................................
  assert( pSegment->run.iFirst==0 && pSegment->run.iLast==0 );
  assert( pSegment->run.nSize==0 && pSegment->run.iRoot==0 );
  assert( pSegment->sep.iFirst==0 && pSegment->sep.iLast==0 );
  assert( pSegment->sep.nSize==0 && pSegment->sep.iRoot==0 );

  pSegment->run.iFirst = aIn[iIn++];
  pSegment->run.iLast = aIn[iIn++];

  pSegment->run.nSize = aIn[iIn++];
  pSegment->sep.iFirst = aIn[iIn++];
  pSegment->sep.iLast = aIn[iIn++];
  pSegment->sep.iRoot = aIn[iIn++];
  if( pSegment->sep.iFirst ) pSegment->sep.nSize = 1;


  *piIn = iIn;
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  Merge *pMerge;                  /* Allocated Merge object */
  int nInput;                     /* Number of input segments in merge */
................................................................................

  /* Populate the Merge object. */
  pMerge->aInput = (MergeInput *)&pMerge[1];
  pMerge->nInput = nInput;
  pMerge->aiOutputOff[0] = -1;
  pMerge->aiOutputOff[1] = -1;
  pMerge->nSkip = (int)aInt[iIn++];
  pMerge->bHierReadonly = 1;

  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }

  /* Set *piIn and return LSM_OK. */
  *piIn = iIn;
................................................................................
      nLevel = (int)aInt[CKPT_HDR_NLEVEL];
      lsmSnapshotSetNBlock(pSnap, (int)aInt[CKPT_HDR_NBLOCK]);
      lsmDbSetPagesize(pDb,(int)aInt[CKPT_HDR_PGSZ],(int)aInt[CKPT_HDR_BLKSZ]);

      /* Import log offset */
      ckptImportLog(aInt, &iIn, lsmDatabaseLog(pDb));

      /* Import each level. This loop runs once for each db level. */
      *pRc = ckptLoadLevels(pDb, aInt, &iIn, nLevel, &pTopLevel);
      lsmDbSnapshotSetLevel(pSnap, pTopLevel);

      /* Import the freelist delta */
      aDelta = lsmFreelistDeltaPtr(pDb);
      for(i=0; i<LSM_FREELIST_DELTA_SIZE; i++){
        aDelta[i] = aInt[iIn++];
................................................................................
int lsmCheckpointLevels(
  lsm_db *pDb,                    /* Database handle */
  int *pnHdrLevel,                /* OUT: Levels to write to db header */
  void **paVal,                   /* OUT: Pointer to LEVELS blob */
  int *pnVal                      /* OUT: Size of LEVELS blob in bytes */
){
  int rc = LSM_OK;                /* Return code */
  const int SEGMENT_SIZE = 6;     /* Size of a checkpoint segment record */
  Level *p;                       /* Used to iterate through levels */
  int nFree;                      /* Free integers remaining in db header */
  int nHdr = 0;                   /* Number of levels stored in db header */
  int nLevels = 0;                /* Number of levels stored in LEVELS */
 
  /* Number of free integers - 1024 less those used by the checkpoint header,
  ** less the 4 used for the log-pointer, less the 3 used for the 







>





>

>







 







>




|
>







 







|
>







 







|







 







|







221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
...
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
...
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
...
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
...
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  ckptSetValue(p, iOut++, pSeg->run.iFirst, pRc);
  ckptSetValue(p, iOut++, pSeg->run.iLast, pRc);
  ckptSetValue(p, iOut++, pSeg->run.iRoot, pRc);
  ckptSetValue(p, iOut++, pSeg->run.nSize, pRc);
  if( segmentHasSeparators(pSeg) ){
    ckptSetValue(p, iOut++, pSeg->sep.iFirst, pRc);
    ckptSetValue(p, iOut++, pSeg->sep.iLast, pRc);
    ckptSetValue(p, iOut++, pSeg->sep.iRoot, pRc);
    ckptSetValue(p, iOut++, pSeg->sep.nSize, pRc);
  }else{
    ckptSetValue(p, iOut++, 0, pRc);
    ckptSetValue(p, iOut++, 0, pRc);
    ckptSetValue(p, iOut++, 0, pRc);
    ckptSetValue(p, iOut++, 0, pRc);
  }

  *piOut = iOut;
}
................................................................................
  assert( pSegment->run.iFirst==0 && pSegment->run.iLast==0 );
  assert( pSegment->run.nSize==0 && pSegment->run.iRoot==0 );
  assert( pSegment->sep.iFirst==0 && pSegment->sep.iLast==0 );
  assert( pSegment->sep.nSize==0 && pSegment->sep.iRoot==0 );

  pSegment->run.iFirst = aIn[iIn++];
  pSegment->run.iLast = aIn[iIn++];
  pSegment->run.iRoot = aIn[iIn++];
  pSegment->run.nSize = aIn[iIn++];
  pSegment->sep.iFirst = aIn[iIn++];
  pSegment->sep.iLast = aIn[iIn++];
  pSegment->sep.iRoot = aIn[iIn++];
  pSegment->sep.nSize = aIn[iIn++];
  if( pSegment->sep.iFirst && pSegment->sep.nSize==0 ) pSegment->sep.nSize = 1;

  *piIn = iIn;
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  Merge *pMerge;                  /* Allocated Merge object */
  int nInput;                     /* Number of input segments in merge */
................................................................................

  /* Populate the Merge object. */
  pMerge->aInput = (MergeInput *)&pMerge[1];
  pMerge->nInput = nInput;
  pMerge->aiOutputOff[0] = -1;
  pMerge->aiOutputOff[1] = -1;
  pMerge->nSkip = (int)aInt[iIn++];
  pMerge->abHierReadonly[0] = 1;
  pMerge->abHierReadonly[1] = 1;
  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }

  /* Set *piIn and return LSM_OK. */
  *piIn = iIn;
................................................................................
      nLevel = (int)aInt[CKPT_HDR_NLEVEL];
      lsmSnapshotSetNBlock(pSnap, (int)aInt[CKPT_HDR_NBLOCK]);
      lsmDbSetPagesize(pDb,(int)aInt[CKPT_HDR_PGSZ],(int)aInt[CKPT_HDR_BLKSZ]);

      /* Import log offset */
      ckptImportLog(aInt, &iIn, lsmDatabaseLog(pDb));

      /* Import all levels stored in the checkpoint. */
      *pRc = ckptLoadLevels(pDb, aInt, &iIn, nLevel, &pTopLevel);
      lsmDbSnapshotSetLevel(pSnap, pTopLevel);

      /* Import the freelist delta */
      aDelta = lsmFreelistDeltaPtr(pDb);
      for(i=0; i<LSM_FREELIST_DELTA_SIZE; i++){
        aDelta[i] = aInt[iIn++];
................................................................................
int lsmCheckpointLevels(
  lsm_db *pDb,                    /* Database handle */
  int *pnHdrLevel,                /* OUT: Levels to write to db header */
  void **paVal,                   /* OUT: Pointer to LEVELS blob */
  int *pnVal                      /* OUT: Size of LEVELS blob in bytes */
){
  int rc = LSM_OK;                /* Return code */
  const int SEGMENT_SIZE = 8;     /* Size of a checkpoint segment record */
  Level *p;                       /* Used to iterate through levels */
  int nFree;                      /* Free integers remaining in db header */
  int nHdr = 0;                   /* Number of levels stored in db header */
  int nLevels = 0;                /* Number of levels stored in LEVELS */
 
  /* Number of free integers - 1024 less those used by the checkpoint header,
  ** less the 4 used for the log-pointer, less the 3 used for the 

Changes to src/lsm_sorted.c.

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
231
232
233
234
235
236
237






























238
239
240
241
242
243
244
...
269
270
271
272
273
274
275







276
277
278
279
280



281
282


283
284
285
286
287
288
289
....
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124

1125
1126
1127
1128
1129
1130
1131
1132

1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
....
1186
1187
1188
1189
1190
1191
1192

1193
1194
1195


















1196
1197

1198
1199
1200

1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
....
1216
1217
1218
1219
1220
1221
1222






1223
1224
1225
1226
1227
1228
1229
1230
1231


1232
1233
1234
1235
1236
1237
1238
....
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
....
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
....
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
....
2199
2200
2201
2202
2203
2204
2205
2206
2207



2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
....
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272



2273
2274

2275



2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
....
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
....
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374

2375
2376
2377
2378
2379
2380
2381
....
2383
2384
2385
2386
2387
2388
2389



2390
2391


2392
2393
2394



2395
2396
2397
2398
2399

2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
....
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
....
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
....
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
....
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774

2775
2776
2777
2778
2779
2780
2781
....
2854
2855
2856
2857
2858
2859
2860

2861



2862
2863
2864


2865

2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
....
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
....
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
....
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108

3109
3110
3111
3112
3113
3114
3115
....
3439
3440
3441
3442
3443
3444
3445

3446

3447


3448
3449
3450
3451
3452
3453
3454
....
3474
3475
3476
3477
3478
3479
3480

3481
3482
3483
3484
3485
3486
3487
....
4061
4062
4063
4064
4065
4066
4067
4068

4069
4070
4071
4072
4073
4074
4075
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** SORTED FILE FORMAT:
**
**   A sorted file is divided into pages. The page-size is not stored anywhere
**   within the sorted file itself - it must be known in advance in order to
**   read the file. The maximum allowed page-size is 64KB.
**
** PAGE FORMAT:
**
**   The maximum page size is 65536 bytes.
**
**   Since all records are equal to or larger than 2 bytes in size, and 
**   some space within the page is consumed by the page footer, there must
**   be less than 2^15 records on each page.
................................................................................
  int nTree;
  int *aTree;

  int *pnHdrLevel;
  void *pSystemVal;
  Snapshot *pSnap;
};































/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_NEW_SYSTEM
**   If set, then after all user data from the in-memory tree and any other
................................................................................
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_AT_LEVELS        0x00000008
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040

typedef struct MergeWorker MergeWorker;







/*
** Working state used while writing a new segment: the level being merged,
** the cursor that supplies the new segment's contents, and the output and
** b-tree pages currently held in memory.
*/
struct MergeWorker {
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */



  /* apHier[0] is the right-most leaf of the b-tree; apHier[nHier-1] is the
  ** current root page. */
  Page **apHier;                  /* Separators array b-tree internal nodes */
  int nHier;                      /* Number of entries in apHier[] */


  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
................................................................................
    );
    if( res ) iBest = i;
  }

  pCsr->iCurrentPtr = iBest;
}

static int seekInSeparators(
  LevelCursor *pCsr,
  SegmentPtr *pPtr,               /* Segment to seek within */
  void *pKey, int nKey,           /* Key to seek to */
  int *piPtr                      /* OUT: FC pointer */
){
  int rc;
  int iPg;

  Blob blob = {0, 0, 0};
  int iTopic = 0;                 /* TODO: Fix me */
  SortedRun *pSep = &pPtr->pSeg->sep;

  iPg = pSep->iRoot;
  do {
    Page *pPg;
    rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pPg);

    if( rc==LSM_OK ){
      u8 *aData;                  /* Buffer containing page data */
      int nData;                  /* Size of aData[] in bytes */
      int iMin;
      int iMax;
      int nRec;
      int flags;

      aData = lsmFsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( (flags & SEGMENT_BTREE_FLAG)==0 ){
        lsmFsPageRelease(pPg);
        break;
      }

      iPg = pageGetPtr(aData, nData);
      nRec = pageGetNRec(aData, nData);

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
................................................................................
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
        lsmFsPageRelease(pRef);
      }
      lsmFsPageRelease(pPg);

    }
  }while( rc==LSM_OK );



















  if( rc==LSM_OK ){
    assert( pPtr->pRun==&pPtr->pSeg->run );

    pPtr->pRun = pSep;
    rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPg);
    if( rc==LSM_OK ){

      rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, 0, piPtr);
    }
    pPtr->pRun = &pPtr->pSeg->run;
  }

  sortedBlobFree(&blob);
  return rc;
}

static int seekInSegment(
  LevelCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
................................................................................
  int *piPtr                      /* OUT: FC pointer */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  assert( pPtr->pRun==&pPtr->pSeg->run );







  if( segmentHasSeparators(pPtr->pSeg) ){
    rc = seekInSeparators(pCsr, pPtr, pKey, nKey, &iPtr);
  }else if( iPtr==0 ){
    iPtr = pPtr->pSeg->run.iFirst;
  }

  if( rc==LSM_OK ){
    rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr);
  }


  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr);
  }
  return rc;
}

/*
................................................................................
    lsmFree(pDb->pEnv, pCsr);
  }
}

#define MULTICURSOR_ADDLEVEL_ALL 1
#define MULTICURSOR_ADDLEVEL_RHS 2
#define MULTICURSOR_ADDLEVEL_LHS_SEP 3
#define MULTICURSOR_ADDLEVEL_RHS_SEP 4

/*
** Add segments belonging to level pLevel to the multi-cursor pCsr. The
** third argument must be one of the following:
**
**   MULTICURSOR_ADDLEVEL_ALL
**     Add all segments in the level to the cursor.
................................................................................
**   MULTICURSOR_ADDLEVEL_RHS
**     Add only the rhs segments in the level to the cursor.
**
**   MULTICURSOR_ADDLEVEL_LHS_SEP
**     Add only the lhs segment. And iterate through its separators array,
**     not the main run array.
**
**   MULTICURSOR_ADDLEVEL_RHS_SEP
**     Add only the first segment from the rhs. And iterate through its 
**     separators array, not the main run array.
**
** RHS and SEP are only used by cursors created to use as data sources when
** creating new segments (either when flushing the in-memory tree to disk or
** when merging existing runs).
*/
int multiCursorAddLevel(
  MultiCursor *pCsr,              /* Multi-cursor to add segment to */ 
  Level *pLevel,                  /* Level to add to multi-cursor merge */
................................................................................
  }

  assert( (rc==LSM_OK)==(pCsr!=0) );
  *ppCsr = pCsr;
  return rc;
}

#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1
#define CURSOR_DATA_SEGMENT   2

static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
  void **ppKey,                   /* OUT: Pointer to buffer containing key */
  int *pnKey                      /* OUT: Size of *ppKey in bytes */
){
................................................................................
** This function copies all such b-tree pages to new locations, so that
** they can be modified as required.
**
** The complication is that not all database pages are the same size - due
** to the way the file.c module works some (the first and last in each block)
** are 4 bytes smaller than the others.
*/
static int mergeWorkerMoveHierarchy(MergeWorker *pMW){
  SortedRun *pSep;                /* Separators run being modified */



  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;
  int iRight = 0;
  int nHier = pMW->nHier;
  Page **apHier = pMW->apHier;

  assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly );

  pSep = &pMW->pLevel->lhs.sep;

  for(i=0; rc==LSM_OK && i<nHier; i++){
    Page *pNew = 0;
    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSep, &pNew);
    assert( rc==LSM_OK );

    if( rc==LSM_OK ){
      u8 *a1; int n1;
      u8 *a2; int n2;

      a1 = lsmFsPageData(pNew, &n1);
................................................................................
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
  if( rc==LSM_OK ){
    for(i=0; i<nHier; i++) assert( lsmFsPageWritable(pMW->apHier[i]) );
  }
#endif

  if( rc==LSM_OK ){
    pMW->pLevel->pMerge->bHierReadonly = 0;
  }
  return rc;
}

/*
** Allocate and populate the MergeWorker.apHier[] array.
*/
static int mergeWorkerLoadHierarchy(MergeWorker *pMW){



  int rc = LSM_OK;
  SortedRun *pSep = &pMW->pLevel->lhs.sep;





  if( pMW->apHier==0 && pSep->iRoot!=0 ){
    int bHierReadonly = pMW->pLevel->pMerge->bHierReadonly;
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSep->iRoot;

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

................................................................................
      }else{
        lsmFsPageRelease(pPg);
        break;
      }
    }while( 1 );

    if( rc==LSM_OK ){
      pMW->nHier = nHier;
      pMW->apHier = apHier;
    }else{
      int i;
      for(i=0; i<nHier; i++){
        lsmFsPageRelease(apHier[i]);
      }
      lsmFree(pEnv, apHier);
    }
................................................................................
** The reason for having the page footer pointer point to the right-child
** (instead of the left) is that doing things this way makes the 
** mergeWorkerMoveHierarchy() operation less complicated (since the pointers 
** that need to be updated are all stored as fixed-size integers within the 
** page footer, not varints in page records).
**
** Records may not span b-tree pages. If this function is called to add a
** record larger than (page-size / 4) bytes, then a pointer to the separators
** array page that contains the main record is added to the b-tree instead.
** In this case the record format is:
**
**         + 0x00 byte (1 byte) 
**         + Absolute pointer value (varint),
**         + Absolute page number of page containing key (varint).
**
** See function seekInSeparators() for the code that traverses b-tree pages.
*/
static int mergeWorkerPushHierarchy(
  MergeWorker *pMW,               /* Merge worker object */

  Pgno iKeyPg,                    /* Page that will contain pKey/nKey */
  int iTopic,                     /* Topic value for this key */
  void *pKey,                     /* Pointer to key buffer */
  int nKey                        /* Size of pKey buffer in bytes */
){
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc;                         /* Return Code */
................................................................................
  int nData;                      /* Size of aData[] in bytes */
  u8 *aData;                      /* Page data for level iLevel */
  int iOff;                       /* Offset on b-tree page to write record to */
  int nRec;                       /* Initial number of records on b-tree page */
  Pgno iPtr;                      /* Pointer value to accompany pKey/nKey */
  int bIndirect;                  /* True to use an indirect record */




  /* If there exists a b-tree hierarchy and it is not loaded into 
  ** memory, load it now.  */


  rc = mergeWorkerLoadHierarchy(pMW);

  /* TODO: What the heck does this do? */



  if( pMW->nHier ){
    aData = lsmFsPageData(pMW->apHier[0], &nData);
    iPtr = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
  }else{
    iPtr = pMW->pLevel->lhs.sep.iFirst;

  }

  if( pMW->nHier && pMW->pLevel->pMerge->bHierReadonly ){
    rc = mergeWorkerMoveHierarchy(pMW);
    if( rc!=LSM_OK ) goto push_hierarchy_out;
  }

  /* Determine if the indirect format should be used. */
  bIndirect = (nKey*4 > lsmFsPageSize(pMW->pDb->pFS));

  /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
................................................................................
  ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
  ** root page.
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].
  */
  for(iLevel=0; iLevel<=pMW->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */
    int iRight;                   /* Right hand pointer from aData[]/nData */

    if( iLevel==pMW->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, pMW->apHier, sizeof(Page *)*(pMW->nHier+1)
      );
      if( !aNew ){
        rc = LSM_NOMEM_BKPT;
        goto push_hierarchy_out;
      }
      pMW->apHier = aNew;
    }else{
      int nFree;

      /* If the key will fit on this page, break out of the loop. */
      assert( lsmFsPageWritable(pMW->apHier[iLevel]) );
      aData = lsmFsPageData(pMW->apHier[iLevel], &nData);
      iRight = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
      if( bIndirect ){
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, it is full. Release it. */
      iPtr = lsmFsPageNumber(pMW->apHier[iLevel]);
      rc = lsmFsPageRelease(pMW->apHier[iLevel]);
    }

    /* Allocate a new page for apHier[iLevel]. */
    pMW->apHier[iLevel] = 0;
    if( rc==LSM_OK ){
      rc = lsmFsSortedAppend(
          pDb->pFS, pDb->pWorker, &pMW->pLevel->lhs.sep, &pMW->apHier[iLevel]
      );
    }
    if( rc!=LSM_OK ) goto push_hierarchy_out;

    aData = lsmFsPageData(pMW->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    if( iLevel>0 ){
      iRight = lsmFsPageNumber(pMW->apHier[iLevel-1]);
      lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
    }

    if( iLevel==pMW->nHier ){
      pMW->nHier++;
      break;
    }
  }

  /* Write the key into page apHier[iLevel]. */
  aData = lsmFsPageData(pMW->apHier[iLevel], &nData);

  iOff = mergeWorkerPageOffset(aData, nData);

  nRec = pageGetNRec(aData, nData);
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);

................................................................................
    aData[iOff++] = (u8)(iTopic | SORTED_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    int iRight = lsmFsPageNumber(pMW->apHier[iLevel-1]);
    lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
  }

  /* Write the right-hand pointer of the right-most leaf page of the 
  ** b-tree heirarchy. */
  aData = lsmFsPageData(pMW->apHier[0], &nData);
  lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg);

  /* Ensure that the SortedRun.iRoot field is correct. */
  pMW->pLevel->lhs.sep.iRoot = lsmFsPageNumber(pMW->apHier[pMW->nHier-1]);

push_hierarchy_out:
  return rc;
}

/*
** The merge-worker object passed as the first argument to this function
................................................................................
        if( pageGetNRec(aData, nData)>0 ){
          u8 *pKey;
          int nKey;
          int iTopic;
          Pgno iPg = lsmFsPageNumber(pPg);

          pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
          rc = mergeWorkerPushHierarchy(pMW, iPg, iTopic, pKey, nKey);
        }
      }
    }

    if( pMW->nHier>0 ){
      Page *pRoot = pMW->apHier[pMW->nHier-1];
      pRun->iRoot = lsmFsPageNumber(pRoot);
    }else{
      pRun->iRoot = pRun->iFirst;
    }

    lsmFsPageRelease(pPg);
    sortedBlobFree(&blob);
................................................................................
  **
  **   * If currently writing the separators array, push a copy of the key
  **     into the b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && pRun->iFirst!=pRun->iLast ){
    assert( pMerge->nSkip>=0 );

    if( bSep ){
      if( pMW->bFlush==0 ){
        Pgno iPg = lsmFsPageNumber(pPg);
        rc = mergeWorkerPushHierarchy(pMW, iPg, rtTopic(eType), pKey, nKey);
      }
    }else{

      if( pMerge->nSkip ){
        pMerge->nSkip--;
        flags = PGFTR_SKIP_THIS_FLAG;
      }else{
        *piPtrOut = lsmFsPageNumber(pPg);
        pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
      }
................................................................................
      }
    }
  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->apPage[0]);
  lsmFsPageRelease(pMW->apPage[1]);

  for(i=0; i<pMW->nHier; i++){



    lsmFsPageRelease(pMW->apHier[i]);
  }
  lsmFree(pMW->pDb->pEnv, pMW->apHier);




  pMW->pCsr = 0;
  pMW->apHier = 0;
  pMW->nHier = 0;
  pMW->apPage[0] = 0;
  pMW->apPage[1] = 0;
}

static int mergeWorkerFirstPage(MergeWorker *pMW){
  int rc;                         /* Return code */
  SortedRun *pRun;                /* Run containing sep. keys to merge in */
................................................................................
  lsm_db *pDb,                    /* Connection handle */
  int *pnHdrLevel                 /* OUT: Number of levels not stored in LSM */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  SortedRun *pDel = 0;
  int iLeftPtr = 0;

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

  /* Create a cursor to gather the data required by the new segment. The new
................................................................................

            /* This call moves any blocks occupied by separators array pDel 
            ** to the pending list. We do this here, even though pDel will be 
            ** read while building the new level, so that the blocks will be 
            ** included in the "FREELIST" entry visited by the cursor (and 
            ** written into the new top level).  */
            if( rc==LSM_OK ){
              pDel = &pNext->lhs.sep;
              rc = lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 0, pDel);
            }
          }
          iLeftPtr = pNext->lhs.run.iFirst;
        }
      }else{
        /* The new level will be the only level in the LSM. There is no reason
         ** to write out delete keys in this case.  */
................................................................................
  }
  lsmFreelistDeltaEnd(pDb);

  /* Link the new level into the top of the tree. Delete the separators
  ** array (if any) that was merged into the new level. */
  if( rc==LSM_OK ){
    if( pDel ){
      /* lsmFsSortedDelete() has already been called on pDel. So all
      ** that is required here is to zero it (so that it is not used by
      ** future LSM searches). */
      memset(pDel, 0, sizeof(SortedRun));

    }
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
................................................................................
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
      */
      if( rc==LSM_OK ){
        if( mergeWorkerDone(&mergeworker)==0 ){
          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
          if( iGobble<pLevel->nRight ){

            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];

            if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){


              lsmFsGobble(pWorker, pGobble->pRun, pGobble->pPg);
            }
          }

        }else if( pLevel->lhs.run.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
................................................................................
          int i;

          /* Free the separators of the next level, if required. */
          if( pLevel->pMerge->nInput > pLevel->nRight ){
            assert( pLevel->pNext );
            assert( segmentHasSeparators(&pLevel->pNext->lhs) );
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->pNext->lhs.sep);

          }

          /* Free the right-hand-side of pLevel */
          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].run);
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].sep);
          }
................................................................................

  assert( pDb->pWorker );
  for(p=lsmDbSnapshotLevel(pDb->pWorker); p && rc==LSM_OK; p=p->pNext){
    Merge *pMerge = p->pMerge;
    if( pMerge ){
      pMerge->aiOutputOff[0] = -1;
      pMerge->aiOutputOff[1] = -1;
      pMerge->bHierReadonly = 1;

    }
  }

  return LSM_OK;
}

void lsmSortedSaveTreeCursors(lsm_db *pDb){







<
<
<
<
<
<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>





>
>
>


>
>







 







|

|

|



>


<

|

<

>










|
<
<
<







 







>



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


>

<
<
>
|
<



<







 







>
>
>
>
>
>
|
|
|
|
|
<
|
|
|
>
>







 







<







 







<
<
<
<







 







<
<
<
<







 







|
|
>
>
>




|
|

|
<
|



|







 







|




|







|
>
>
>

|
>
|
>
>
>
|
|




|







 







|
|







 







|











>







 







>
>
>


>
>
|

<
>
>
>
|
|


<
>


|
|







 







|



|



|





|




|
|











|
|



|


|




|




|



|
|





|







 







|





|



|







 







|




|
|







 







<
|
|
|
|
<
>







 







>
|
>
>
>
|
|
|
>
>
|
>

<
<







 







|







 







|
|







 







|


|
>







 







>

>
|
>
>







 







>







 







|
>







6
7
8
9
10
11
12






13
14
15
16
17
18
19
...
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
...
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
....
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163

1164
1165
1166

1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179



1180
1181
1182
1183
1184
1185
1186
....
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251


1252
1253

1254
1255
1256

1257
1258
1259
1260
1261
1262
1263
....
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283

1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
....
1489
1490
1491
1492
1493
1494
1495

1496
1497
1498
1499
1500
1501
1502
....
1504
1505
1506
1507
1508
1509
1510




1511
1512
1513
1514
1515
1516
1517
....
1678
1679
1680
1681
1682
1683
1684




1685
1686
1687
1688
1689
1690
1691
....
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266

2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
....
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
....
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
....
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
....
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456

2457
2458
2459
2460
2461
2462
2463

2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
....
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
....
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
....
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
....
2827
2828
2829
2830
2831
2832
2833

2834
2835
2836
2837

2838
2839
2840
2841
2842
2843
2844
2845
....
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937


2938
2939
2940
2941
2942
2943
2944
....
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
....
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
....
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
....
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
....
3548
3549
3550
3551
3552
3553
3554
3555
3556
3557
3558
3559
3560
3561
3562
....
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**






** PAGE FORMAT:
**
**   The maximum page size is 65536 bytes.
**
**   Since all records are equal to or larger than 2 bytes in size, and 
**   some space within the page is consumed by the page footer, there must
**   be less than 2^15 records on each page.
................................................................................
  int nTree;
  int *aTree;

  int *pnHdrLevel;
  void *pSystemVal;
  Snapshot *pSnap;
};

/*
** Identifiers for the data sources of a multi-cursor. The merge code
** subtracts CURSOR_DATA_SEGMENT from an aTree[] entry to obtain an index
** into aSegCsr[], so values at or above CURSOR_DATA_SEGMENT appear to
** identify segment cursors.
**
** NOTE(review): CURSOR_DATA_TREE and CURSOR_DATA_SYSTEM presumably name the
** in-memory tree and the system-data source respectively - confirm.
*/
#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1
#define CURSOR_DATA_SEGMENT   2

/*
** Used to iterate through the keys stored in a b-tree hierarchy from start
** to finish. Only First() and Next() operations are required.
**
** A BtreeCursor holds an array of BtreePg entries (aPg[]) describing the
** pages from the root of the hierarchy down to the current location, plus
** a cached copy of the entry the cursor currently points at.
*/
typedef struct BtreePg BtreePg;
typedef struct BtreeCursor BtreeCursor;
struct BtreePg {
  Page *pPage;                    /* Page reference for this level */
  int iCell;                      /* Presumably current cell on pPage - confirm */
};
struct BtreeCursor {
  int nAlloc;                     /* Allocated size of aPg[] */
  int iPg;                        /* Current entry in aPg[]. 0 -> EOF. */
  BtreePg *aPg;                   /* Pages from root to current location */

  /* Cache of current entry */
  void *pKey;                     /* Pointer to key of the current entry */
  int nKey;                       /* Size of pKey in bytes */
  int iTopic;                     /* Topic value for the current key */
  Pgno iPtr;                      /* Page number stored with the current entry
                                  ** (presumably its child pointer - confirm) */

  /* Storage for key, if not local */
  Blob blob;
};


/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_NEW_SYSTEM
**   If set, then after all user data from the in-memory tree and any other
................................................................................
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_AT_LEVELS        0x00000008
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040

typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

/*
** The in-memory image of one b-tree hierarchy that is under construction.
** apHier[0] is the right-most leaf of the b-tree and apHier[nHier-1] is the
** current root page. An empty hierarchy has apHier==0 and nHier==0.
*/
struct Hierarchy {
  Page **apHier;                  /* B-tree pages, right-most leaf first */
  int nHier;                      /* Number of entries in apHier[] */
};

/*
** State used while writing a new segment, either when flushing the
** in-memory tree or when merging existing runs.
**
** Both aHier[] and apPage[] are indexed by the "bSep" flag used throughout
** the merge-worker functions: entry 0 belongs to the main run and entry 1
** to the separators run.
*/
struct MergeWorker {
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy aHier[2];             /* B-tree hierarchies (0 is main run) */
  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
................................................................................
    );
    if( res ) iBest = i;
  }

  pCsr->iCurrentPtr = iBest;
}

static int seekInBtree(
  LevelCursor *pCsr,
  SortedRun *pRun,
  void *pKey, int nKey,           /* Key to seek to */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};
  int iTopic = 0;                 /* TODO: Fix me */


  iPg = pRun->iRoot;
  do {

    rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pPg);
    assert( rc==LSM_OK || pPg==0 );
    if( rc==LSM_OK ){
      u8 *aData;                  /* Buffer containing page data */
      int nData;                  /* Size of aData[] in bytes */
      int iMin;
      int iMax;
      int nRec;
      int flags;

      aData = lsmFsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;




      iPg = pageGetPtr(aData, nData);
      nRec = pageGetNRec(aData, nData);

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
................................................................................
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
        lsmFsPageRelease(pRef);
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
    }
  }while( rc==LSM_OK );

  sortedBlobFree(&blob);
  assert( (rc==LSM_OK)==(pPg!=0) );
  *ppPg = pPg;
  return rc;
}

/*
** Seek segment pointer pPtr to key (pKey/nKey) using the separators run of
** its segment.
**
** The b-tree rooted in the separators run is first descended with
** seekInBtree() to find a page. That page is then installed in pPtr and
** searched with segmentPtrSeek() (seek mode 0), which writes its pointer
** result to *piPtr. While this search runs, pPtr->pRun temporarily refers
** to the separators run; it is switched back to the main run before
** returning.
**
** Returns LSM_OK on success, or the error code from seekInBtree() or
** segmentPtrSeek() otherwise.
*/
static int seekInSeparators(
  LevelCursor *pCsr,
  SegmentPtr *pPtr,               /* Segment to seek within */
  void *pKey, int nKey,           /* Key to seek to */
  int *piPtr                      /* OUT: FC pointer */
){
  Page *pLeaf;                    /* Page located by the b-tree descent */
  SortedRun *pSepRun = &pPtr->pSeg->sep;
  int rc = seekInBtree(pCsr, pSepRun, pKey, nKey, &pLeaf);

  /* Nothing further to do if the b-tree descent failed. */
  if( rc!=LSM_OK ) return rc;

  assert( pPtr->pRun==&pPtr->pSeg->run );
  assert( pLeaf );

  /* Point the segment-pointer at the separators run for the duration of
  ** the in-page search, then restore it to the main run. */
  pPtr->pRun = pSepRun;
  segmentPtrSetPage(pPtr, pLeaf);
  rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, 0, piPtr);
  pPtr->pRun = &pPtr->pSeg->run;

  return rc;
}

static int seekInSegment(
  LevelCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
................................................................................
  int *piPtr                      /* OUT: FC pointer */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  assert( pPtr->pRun==&pPtr->pSeg->run );

  if( pPtr->pSeg->run.iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->run.iRoot!=0 );
    rc = seekInBtree(pCsr, &pPtr->pSeg->run, pKey, nKey, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( segmentHasSeparators(pPtr->pSeg) ){
      rc = seekInSeparators(pCsr, pPtr, pKey, nKey, &iPtr);
    }else if( iPtr==0 ){
      iPtr = pPtr->pSeg->run.iFirst;
    }

    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr);
  }
  return rc;
}

/*
................................................................................
    lsmFree(pDb->pEnv, pCsr);
  }
}

#define MULTICURSOR_ADDLEVEL_ALL 1
#define MULTICURSOR_ADDLEVEL_RHS 2
#define MULTICURSOR_ADDLEVEL_LHS_SEP 3


/*
** Add segments belonging to level pLevel to the multi-cursor pCsr. The
** third argument must be one of the following:
**
**   MULTICURSOR_ADDLEVEL_ALL
**     Add all segments in the level to the cursor.
................................................................................
**   MULTICURSOR_ADDLEVEL_RHS
**     Add only the rhs segments in the level to the cursor.
**
**   MULTICURSOR_ADDLEVEL_LHS_SEP
**     Add only the lhs segment. And iterate through its separators array,
**     not the main run array.
**




** RHS and SEP are only used by cursors created to use as data sources when
** creating new segments (either when flushing the in-memory tree to disk or
** when merging existing runs).
*/
int multiCursorAddLevel(
  MultiCursor *pCsr,              /* Multi-cursor to add segment to */ 
  Level *pLevel,                  /* Level to add to multi-cursor merge */
................................................................................
  }

  assert( (rc==LSM_OK)==(pCsr!=0) );
  *ppCsr = pCsr;
  return rc;
}





static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
  void **ppKey,                   /* OUT: Pointer to buffer containing key */
  int *pnKey                      /* OUT: Size of *ppKey in bytes */
){
................................................................................
** This function copies all such b-tree pages to new locations, so that
** they can be modified as required.
**
** The complication is that not all database pages are the same size - due
** to the way the file.c module works some (the first and last in each block)
** are 4 bytes smaller than the others.
*/
static int mergeWorkerMoveHierarchy(
  MergeWorker *pMW,               /* Merge worker */
  int bSep                        /* True for separators run */
){
  SortedRun *pRun;                /* Run being modified */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;
  int iRight = 0;
  Page **apHier = pMW->aHier[bSep].apHier;
  int nHier = pMW->aHier[bSep].nHier;

  assert( nHier>0 && pMW->pLevel->pMerge->abHierReadonly[bSep] );

  pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run);

  for(i=0; rc==LSM_OK && i<nHier; i++){
    Page *pNew = 0;
    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pRun, &pNew);
    assert( rc==LSM_OK );

    if( rc==LSM_OK ){
      u8 *a1; int n1;
      u8 *a2; int n2;

      a1 = lsmFsPageData(pNew, &n1);
................................................................................
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
  if( rc==LSM_OK ){
    for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
  }
#endif

  if( rc==LSM_OK ){
    pMW->pLevel->pMerge->abHierReadonly[bSep] = 0;
  }
  return rc;
}

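/*
** Allocate and populate the apHier[] array of the hierarchy selected by
** bSep (MergeWorker.aHier[bSep]). This is a no-op if the array has already
** been loaded or if the corresponding run has no b-tree root page.
*/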
static int mergeWorkerLoadHierarchy(
  MergeWorker *pMW,
  int bSep
){
  int rc = LSM_OK;
  SortedRun *pRun;
  Hierarchy *p;
 
  pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run);
  p = &pMW->aHier[bSep];

  if( p->apHier==0 && pRun->iRoot!=0 ){
    int bHierReadonly = pMW->pLevel->pMerge->abHierReadonly[bSep];
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pRun->iRoot;

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

................................................................................
      }else{
        lsmFsPageRelease(pPg);
        break;
      }
    }while( 1 );

    if( rc==LSM_OK ){
      p->nHier = nHier;
      p->apHier = apHier;
    }else{
      int i;
      for(i=0; i<nHier; i++){
        lsmFsPageRelease(apHier[i]);
      }
      lsmFree(pEnv, apHier);
    }
................................................................................
** The reason for having the page footer pointer point to the right-child
** (instead of the left) is that doing things this way makes the 
** segWriterMoveHierarchy() operation less complicated (since the pointers 
** that need to be updated are all stored as fixed-size integers within the 
** page footer, not varints in page records).
**
** Records may not span b-tree pages. If this function is called to add a
** record larger than (page-size / 4) bytes, then a pointer to the indexed
** array page that contains the main record is added to the b-tree instead.
** In this case the record format is:
**
**         + 0x00 byte (1 byte) 
**         + Absolute pointer value (varint),
**         + Absolute page number of page containing key (varint).
**
** See function seekInBtree() for the code that traverses b-tree pages.
*/
static int mergeWorkerPushHierarchy(
  MergeWorker *pMW,               /* Merge worker object */
  int bSep,                       /* True for separators, false otherwise */
  Pgno iKeyPg,                    /* Page that will contain pKey/nKey */
  int iTopic,                     /* Topic value for this key */
  void *pKey,                     /* Pointer to key buffer */
  int nKey                        /* Size of pKey buffer in bytes */
){
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc;                         /* Return Code */
................................................................................
  int nData;                      /* Size of aData[] in bytes */
  u8 *aData;                      /* Page data for level iLevel */
  int iOff;                       /* Offset on b-tree page to write record to */
  int nRec;                       /* Initial number of records on b-tree page */
  Pgno iPtr;                      /* Pointer value to accompany pKey/nKey */
  int bIndirect;                  /* True to use an indirect record */

  Hierarchy *p;
  SortedRun *pRun;

  /* If there exists a b-tree hierarchy and it is not loaded into 
  ** memory, load it now.  */
  pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run);
  p = &pMW->aHier[bSep];
  rc = mergeWorkerLoadHierarchy(pMW, bSep);


  /* Obtain the absolute pointer value to store along with the key in the
  ** page body. This pointer points to a page that contains keys that are
  ** smaller than pKey/nKey.  */
  if( p->nHier ){
    aData = lsmFsPageData(p->apHier[0], &nData);
    iPtr = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
  }else{

    iPtr = pRun->iFirst;
  }

  if( p->nHier && pMW->pLevel->pMerge->abHierReadonly[bSep] ){
    rc = mergeWorkerMoveHierarchy(pMW, bSep);
    if( rc!=LSM_OK ) goto push_hierarchy_out;
  }

  /* Determine if the indirect format should be used. */
  bIndirect = (nKey*4 > lsmFsPageSize(pMW->pDb->pFS));

  /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
................................................................................
  ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
  ** root page.
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].
  */
  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */
    int iRight;                   /* Right hand pointer from aData[]/nData */

    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
      if( !aNew ){
        rc = LSM_NOMEM_BKPT;
        goto push_hierarchy_out;
      }
      p->apHier = aNew;
    }else{
      int nFree;

      /* If the key will fit on this page, break out of the loop. */
      assert( lsmFsPageWritable(p->apHier[iLevel]) );
      aData = lsmFsPageData(p->apHier[iLevel], &nData);
      iRight = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
      if( bIndirect ){
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, it is full. Release it. */
      iPtr = lsmFsPageNumber(p->apHier[iLevel]);
      rc = lsmFsPageRelease(p->apHier[iLevel]);
    }

    /* Allocate a new page for apHier[iLevel]. */
    p->apHier[iLevel] = 0;
    if( rc==LSM_OK ){
      rc = lsmFsSortedAppend(
          pDb->pFS, pDb->pWorker, pRun, &p->apHier[iLevel]
      );
    }
    if( rc!=LSM_OK ) goto push_hierarchy_out;

    aData = lsmFsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    if( iLevel>0 ){
      iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
      lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
    }

    if( iLevel==p->nHier ){
      p->nHier++;
      break;
    }
  }

  /* Write the key into page apHier[iLevel]. */
  aData = lsmFsPageData(p->apHier[iLevel], &nData);

  iOff = mergeWorkerPageOffset(aData, nData);

  nRec = pageGetNRec(aData, nData);
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);

................................................................................
    aData[iOff++] = (u8)(iTopic | SORTED_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    int iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
    lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
  }

  /* Write the right-hand pointer of the right-most leaf page of the
  ** b-tree hierarchy. */
  aData = lsmFsPageData(p->apHier[0], &nData);
  lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg);

  /* Ensure that the SortedRun.iRoot field is correct. */
  pRun->iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]);

push_hierarchy_out:
  return rc;
}

/*
** The merge-worker object passed as the first argument to this function
................................................................................
        if( pageGetNRec(aData, nData)>0 ){
          u8 *pKey;
          int nKey;
          int iTopic;
          Pgno iPg = lsmFsPageNumber(pPg);

          pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
          rc = mergeWorkerPushHierarchy(pMW, 1, iPg, iTopic, pKey, nKey);
        }
      }
    }

    if( pMW->aHier[1].nHier>0 ){
      Page *pRoot = pMW->aHier[1].apHier[pMW->aHier[1].nHier-1];
      pRun->iRoot = lsmFsPageNumber(pRoot);
    }else{
      pRun->iRoot = pRun->iFirst;
    }

    lsmFsPageRelease(pPg);
    sortedBlobFree(&blob);
................................................................................
  **
  **   * If currently writing the separators array, push a copy of the key
  **     into the b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && pRun->iFirst!=pRun->iLast ){
    assert( pMerge->nSkip>=0 );


    if( pMW->bFlush==0 || bSep==0 ){
      Pgno iPg = lsmFsPageNumber(pPg);
      rc = mergeWorkerPushHierarchy(pMW, bSep, iPg, rtTopic(eType), pKey, nKey);
    }

    if( bSep==0 ){
      if( pMerge->nSkip ){
        pMerge->nSkip--;
        flags = PGFTR_SKIP_THIS_FLAG;
      }else{
        *piPtrOut = lsmFsPageNumber(pPg);
        pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
      }
................................................................................
      }
    }
  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->apPage[0]);
  lsmFsPageRelease(pMW->apPage[1]);

  for(i=0; i<2; i++){
    Hierarchy *p = &pMW->aHier[i];
    int iPg;
    for(iPg=0; iPg<p->nHier; iPg++){
      lsmFsPageRelease(p->apHier[iPg]);
    }
    lsmFree(pMW->pDb->pEnv, p->apHier);
    p->apHier = 0;
    p->nHier = 0;
  }

  pMW->pCsr = 0;


  pMW->apPage[0] = 0;
  pMW->apPage[1] = 0;
}

static int mergeWorkerFirstPage(MergeWorker *pMW){
  int rc;                         /* Return code */
  SortedRun *pRun;                /* Run containing sep. keys to merge in */
................................................................................
  lsm_db *pDb,                    /* Connection handle */
  int *pnHdrLevel                 /* OUT: Number of levels not stored in LSM */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

  /* Create a cursor to gather the data required by the new segment. The new
................................................................................

            /* This call moves any blocks occupied by separators array pDel 
            ** to the pending list. We do this here, even though pDel will be 
            ** read while building the new level, so that the blocks will be 
            ** included in the "FREELIST" entry visited by the cursor (and 
            ** written into the new top level).  */
            if( rc==LSM_OK ){
              pDel = &pNext->lhs;
              rc = lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 0, &pDel->sep);
            }
          }
          iLeftPtr = pNext->lhs.run.iFirst;
        }
      }else{
        /* The new level will be the only level in the LSM. There is no reason
         ** to write out delete keys in this case.  */
................................................................................
  }
  lsmFreelistDeltaEnd(pDb);

  /* Link the new level into the top of the tree. Delete the separators
  ** array (if any) that was merged into the new level. */
  if( rc==LSM_OK ){
    if( pDel ){
      /* lsmFsSortedDelete() has already been called on pDel->sep. So all
      ** that is required here is to zero it (so that it is not used by
      ** future LSM searches). */
      memset(&pDel->sep, 0, sizeof(SortedRun));
      pDel->run.iRoot = 0;
    }
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
................................................................................
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
      */
      if( rc==LSM_OK ){
        if( mergeWorkerDone(&mergeworker)==0 ){
          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
          if( iGobble<pLevel->nRight ){
            Level *pLevel = mergeworker.pCsr->aSegCsr[iGobble].pLevel;
            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];

            if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 
             && pGobble->pRun->iRoot==0
            ){
              lsmFsGobble(pWorker, pGobble->pRun, pGobble->pPg);
            }
          }

        }else if( pLevel->lhs.run.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
................................................................................
          int i;

          /* Free the separators of the next level, if required. */
          if( pLevel->pMerge->nInput > pLevel->nRight ){
            assert( pLevel->pNext );
            assert( segmentHasSeparators(&pLevel->pNext->lhs) );
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->pNext->lhs.sep);
            pLevel->pNext->lhs.run.iRoot = 0;
          }

          /* Free the right-hand-side of pLevel */
          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].run);
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].sep);
          }
................................................................................

  assert( pDb->pWorker );
  for(p=lsmDbSnapshotLevel(pDb->pWorker); p && rc==LSM_OK; p=p->pNext){
    Merge *pMerge = p->pMerge;
    if( pMerge ){
      pMerge->aiOutputOff[0] = -1;
      pMerge->aiOutputOff[1] = -1;
      pMerge->abHierReadonly[0] = 1;
      pMerge->abHierReadonly[1] = 1;
    }
  }

  return LSM_OK;
}

void lsmSortedSaveTreeCursors(lsm_db *pDb){