SQLite4
Check-in [ec39163b79]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Fix another problem with mmap and using a background thread.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: ec39163b79ae2075de3aaff76f2bddc30330b329
User & Date: dan 2012-06-26 18:10:18
Context
2012-06-26
19:45
Fix a bug causing an infinite loop if there are open cursors when the in-memory tree is flushed to disk. check-in: d409cd8c8a user: dan tags: embedded-btree
18:10
Fix another problem with mmap and using a background thread. check-in: ec39163b79 user: dan tags: embedded-btree
15:43
Add LSM_CONFIG_NMERGE parameter. Add code to lsmtest so that lsm configurations can be specified in string form. check-in: 0fcabb513b user: dan tags: embedded-btree
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
}


/*
** If pDb wraps an LSM database (tdb_lsm() returns non-NULL), apply the
** configuration string "mmap=1" to it via tdb_lsm_config_str(). The
** value returned by tdb_lsm() is handed back to the caller either way.
** The result of tdb_lsm_config_str() is not inspected.
*/
static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pHandle = tdb_lsm(pDb);
  if( pHandle==0 ) return pHandle;
  tdb_lsm_config_str(pDb, "mmap=1");
  return pHandle;
}

int do_speed_tests(int nArg, char **azArg){

  struct DbSystem {







|







469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
}


/*
** If pDb wraps an LSM database (tdb_lsm() returns non-NULL), apply the
** speed-test configuration string to it via tdb_lsm_config_str(). The
** value returned by tdb_lsm() is handed back to the caller either way.
** The result of tdb_lsm_config_str() is not inspected.
*/
static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pHandle = tdb_lsm(pDb);
  if( pHandle==0 ) return pHandle;
  tdb_lsm_config_str(pDb, "mmap=1 autowork=1 nmerge=4 worker_nmerge=4");
  return pHandle;
}

int do_speed_tests(int nArg, char **azArg){

  struct DbSystem {

Changes to lsm-test/lsmtest_tdb3.c.

897
898
899
900
901
902
903
904




905
906
907

908
909
910
911
912
913
914
915
916
917

918
919
920
921
922
923
924
925
926
927
928
929
930

931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952

953

954
955
956
957
958
959
960
...
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
  return test_lsm_mt(zFilename, 1, bClear, ppDb);
}

/*
** Open a test database by forwarding to test_lsm_mt() with 2 as its
** second argument.
** NOTE(review): the second argument is presumably the number of worker
** connections/threads - confirm against test_lsm_mt().
*/
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  int rc;
  rc = test_lsm_mt(zFilename, 2, bClear, ppDb);
  return rc;
}

int test_lsm_config_str(lsm_db *pDb, const char *zStr){




  
  struct CfgParam {
    const char *zParam;

    int eParam;
  } aParam[] = {
    { "write_buffer",   LSM_CONFIG_WRITE_BUFFER },
    { "page_size",      LSM_CONFIG_PAGE_SIZE },
    { "safety",         LSM_CONFIG_SAFETY },
    { "autowork",       LSM_CONFIG_AUTOWORK },
    { "log_size",       LSM_CONFIG_LOG_SIZE },
    { "mmap",           LSM_CONFIG_MMAP },
    { "use_log",        LSM_CONFIG_USE_LOG },
    { "nmerge",         LSM_CONFIG_NMERGE },

    { 0, 0 }
  };
  char *z = zStr;

  while( z[0] && pDb ){
    char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
    if( *z ){

      int iParam;
      int iVal;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &iParam);
      if( rc!=0 ) return rc;
      iParam = aParam[iParam].eParam;

      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam);


      lsm_config(pDb, iParam, &iVal);

    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  return 0;
 syntax_error:
................................................................................

/*
** Apply configuration string zStr to the LSM handle wrapped by pDb and
** then to each of its worker handles in array order. Processing stops
** at the first non-zero return code from test_lsm_config_str(), and
** that code is returned. If pDb is not an LSM database (tdb_lsm()
** returns NULL) nothing is done and 0 is returned.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm;
  int iWorker;
  int rc;

  if( !tdb_lsm(pDb) ) return 0;

  pLsm = (LsmDb *)pDb;
  rc = test_lsm_config_str(pLsm->db, zStr);

  iWorker = 0;
  while( rc==0 && iWorker<pLsm->nWorker ){
    rc = test_lsm_config_str(pLsm->aWorker[iWorker].pWorker, zStr);
    iWorker++;
  }
  return rc;
}


#else







|
>
>
>
>



>


|
|
|
|
|
|
|
|
>













>
|








|

|










>
|
>







 







|

|







897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
...
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
  return test_lsm_mt(zFilename, 1, bClear, ppDb);
}

/*
** Open a test database by forwarding to test_lsm_mt() with 2 as its
** second argument.
** NOTE(review): the second argument is presumably the number of worker
** connections/threads - confirm against test_lsm_mt().
*/
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  int rc;
  rc = test_lsm_mt(zFilename, 2, bClear, ppDb);
  return rc;
}

int test_lsm_config_str(
  lsm_db *pDb, 
  int bWorker,
  const char *zStr
){
  
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "write_buffer",   0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",      0, LSM_CONFIG_PAGE_SIZE },
    { "safety",         0, LSM_CONFIG_SAFETY },
    { "autowork",       0, LSM_CONFIG_AUTOWORK },
    { "log_size",       0, LSM_CONFIG_LOG_SIZE },
    { "mmap",           0, LSM_CONFIG_MMAP },
    { "use_log",        0, LSM_CONFIG_USE_LOG },
    { "nmerge",         0, LSM_CONFIG_NMERGE },
    { "worker_nmerge",  1, LSM_CONFIG_NMERGE },
    { 0, 0 }
  };
  char *z = zStr;

  while( z[0] && pDb ){
    char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
    if( *z ){
      int eParam;
      int i;
      int iVal;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &i);
      if( rc!=0 ) return rc;
      eParam = aParam[i].eParam;

      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam);

      if( bWorker || aParam[i].bWorker==0 ){
        lsm_config(pDb, eParam, &iVal);
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  return 0;
 syntax_error:
................................................................................

/*
** Apply configuration string zStr to the LSM handle wrapped by pDb
** (passing bWorker=0) and then to each of its worker handles in array
** order (passing bWorker=1). Processing stops at the first non-zero
** return code from test_lsm_config_str(), and that code is returned.
** If pDb is not an LSM database (tdb_lsm() returns NULL) nothing is
** done and 0 is returned.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm;
  int iWorker;
  int rc;

  if( !tdb_lsm(pDb) ) return 0;

  pLsm = (LsmDb *)pDb;
  rc = test_lsm_config_str(pLsm->db, 0, zStr);

  iWorker = 0;
  while( rc==0 && iWorker<pLsm->nWorker ){
    rc = test_lsm_config_str(pLsm->aWorker[iWorker].pWorker, 1, zStr);
    iWorker++;
  }
  return rc;
}


#else

Changes to src/lsm_shared.c.

750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    assertSnapshotListOk(p);
    pOld = p->pClient;
    pNew->pSnapshotNext = pOld;
    p->pClient = pNew;
    assertSnapshotListOk(p);
    if( pDb->pClient ){
      assert( pDb->pClient==pOld );
      pDb->pClient = p->pClient;
      p->pClient->nRef++;
    }
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    lsmDbSnapshotRelease(pDb->pEnv, pOld);
    p->bDirty = 0;

    /* Upgrade the user connection to the new client snapshot */







<
|
|







750
751
752
753
754
755
756

757
758
759
760
761
762
763
764
765
    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    assertSnapshotListOk(p);
    pOld = p->pClient;
    pNew->pSnapshotNext = pOld;
    p->pClient = pNew;
    assertSnapshotListOk(p);
    if( pDb->pClient ){

      pDb->pClient = pNew;
      pNew->nRef++;
    }
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    lsmDbSnapshotRelease(pDb->pEnv, pOld);
    p->bDirty = 0;

    /* Upgrade the user connection to the new client snapshot */

Changes to src/lsm_sorted.c.

249
250
251
252
253
254
255

256
257
258
259
260
261
262
...
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
....
1075
1076
1077
1078
1079
1080
1081



1082
1083
1084
1085
1086
1087
1088
....
2218
2219
2220
2221
2222
2223
2224

2225
2226
2227
2228
2229
2230
2231
....
2656
2657
2658
2659
2660
2661
2662




2663
2664

2665











2666
2667
2668
2669
2670
2671
2672
....
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
....
3221
3222
3223
3224
3225
3226
3227


3228
3229
3230
3231
3232
3233
3234
....
3258
3259
3260
3261
3262
3263
3264


3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277

3278
3279
3280
3281
3282
3283
3284
....
3323
3324
3325
3326
3327
3328
3329


3330

3331
3332
3333
3334
3335
3336
3337
....
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
....
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504
....
4195
4196
4197
4198
4199
4200
4201

4202
4203
4204
4205
4206
4207

4208
4209
4210
4211
4212
4213
4214
  lsm_db *pDb;                    /* Connection that owns this cursor */
  MultiCursor *pNext;             /* Next cursor owned by connection pDb */

  int flags;                      /* Mask of CURSOR_XXX flags */
  int (*xCmp)(void *, int, void *, int);         /* Compare function */
  int eType;                      /* Cache of current key type */
  Blob key;                       /* Cache of current key (or NULL) */


  TreeCursor *pTreeCsr;           /* Single tree cursor */
  int nSegCsr;                    /* Size of aSegCsr[] array */
  LevelCursor *aSegCsr;           /* Array of cursors open on sorted files */
  int nTree;
  int *aTree;
  BtreeCursor *pBtCsr;
................................................................................
};

/*
** State used while writing the output of a merge: the database handle,
** the level being merged, the cursor that supplies the input records and
** the output pages currently being filled.
*/
struct MergeWorker {
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */

  /* NOTE(review): presumably one Hierarchy per output run, indexed the
  ** same way as apPage[] below - confirm. */
  Hierarchy aHier[2];
#if 0
  /* Compiled out: these two members are not part of the structure. */
  Page **apHier;                  /* Separators array b-tree internal nodes */
  int nHier;                      /* Number of entries in apHier[] */
#endif

  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
................................................................................
    rc = segmentPtrReadData(
        pPtr, iOff, pPtr->nKey, &pPtr->pKey, &pPtr->blob1
    );
    if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){
      rc = segmentPtrReadData(
          pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
      );



    }
  }

  return rc;
}

void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
................................................................................
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
    segmentCursorValue(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT], ppVal, pnVal);
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

  return rc;
}

int lsmSortedLoadSystem(lsm_db *pDb){
  MultiCursor *pCsr = 0;          /* Cursor used to retreive free-list */
  int rc;                         /* Return Code */

................................................................................
    }
    *pnKey = nKey; 
  }
  return LSM_OK;
}

/*
** Fetch the value associated with the current entry of multi-cursor
** pCsr. The entry is identified by pCsr->aTree[1]; the lookup itself is
** delegated to multiCursorGetVal(), whose return code and output values
** (*ppVal, *pnVal) are passed straight through to the caller.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  int iCurrent;                   /* Value of pCsr->aTree[1] */

  assert( pCsr->aTree );
  assert( !(rtIsDelete(pCsr->eType)!=0 && (pCsr->flags & CURSOR_IGNORE_DELETE)) );

  iCurrent = pCsr->aTree[1];
  return multiCursorGetVal(pCsr, iCurrent, ppVal, pnVal);
}

/*
** Write the type of the current entry of multi-cursor pCsr to *peType.
** The entry is identified by pCsr->aTree[1]; the key pointer and size
** outputs of multiCursorGetKey() are not requested. Always returns
** LSM_OK.
*/
int lsmMCursorType(MultiCursor *pCsr, int *peType){
  int iCurrent;                   /* Value of pCsr->aTree[1] */

  assert( pCsr->aTree );
  iCurrent = pCsr->aTree[1];
  multiCursorGetKey(pCsr, iCurrent, peType, 0, 0);
  return LSM_OK;
}
................................................................................


static int mergeWorkerWrite(
  MergeWorker *pMW,               /* Merge worker object to write into */
  int bSep,                       /* True to write to separators array */
  int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey, int nKey,           /* Key value */
  void *pVal, int nVal,           /* Accompanying value, if any */
  int iPtr,                       /* Absolute value of page pointer, or 0 */
  int *piPtrOut                   /* OUT: Pointer to write to separators */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
................................................................................
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr;                      /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  SortedRun *pRun;                /* Run being written to */
  int flags = 0;                  /* If != 0, flags value for page footer */



  assert( bSep==0 || bSep==1 );
  assert( bSep==0 || rtIsSeparator(eType) );

  pMerge = pMW->pLevel->pMerge;    
  pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run);

................................................................................
  ** The header space is:
  **
  **     1) record type - 1 byte.
  **     2) Page-pointer-offset - 1 varint
  **     3) Key size - 1 varint
  **     4) Value size - 1 varint (SORTED_WRITE only)
  */


  nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
  if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

  /* If the entire header will not fit on page pPg, or if page pPg is 
  ** marked read-only, advance to the next page of the output run. */
  iOff = pMerge->aiOutputOff[bSep];
  if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
    iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0);
    iRPtr = iPtr - iFPtr;
    iOff = 0;
    nRec = 0;
    rc = mergeWorkerNextPage(pMW, bSep, iFPtr);
    pPg = pMW->apPage[bSep];

  }

  /* If this record header will be the first on the page, and the page is 
  ** not the very first in the entire run, special actions may need to be 
  ** taken:
  **
  **   * If currently writing the main run, *piPtrOut should be set to
................................................................................
    if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
    pMerge->aiOutputOff[bSep] = iOff;

    /* Write the key and data into the segment. */
    assert( iFPtr==pageGetPtr(aData, nData) );
    rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pKey, nKey);
    if( rc==LSM_OK && rtIsWrite(eType) ){


      rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pVal, nVal);

    }
  }

  return rc;
}


................................................................................

static int mergeWorkerStep(MergeWorker *pMW){
  lsm_db *pDb = pMW->pDb;       /* Database handle */
  MultiCursor *pCsr;            /* Cursor to read input data from */
  int rc = LSM_OK;              /* Return code */
  int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey; int nKey;         /* Key */
  void *pVal; int nVal;         /* Value */
  Segment *pSeg;                /* Output segment */
  int iPtr = 0;

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
................................................................................
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }


  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */

    if( pMW->apPage[0]==0 ){
      rc = mergeWorkerFirstPage(pMW);
    }

    /* Write the record into the main run. */
    if( rc==LSM_OK ){
      rc = lsmMCursorValue(pCsr, &pVal, &nVal);
    }
    if( rc==LSM_OK ){
      rc = mergeWorkerWrite(pMW, 0, eType, pKey, nKey, pVal, nVal, iPtr,&iSPtr);
    }

    /* If the call to mergeWorkerWrite() above started a new page, then
    ** add a SORTED_SEPARATOR key to the separators run.  */
#if 0
    if( rc==LSM_OK && iSPtr ){

................................................................................

  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    int nWrite = 0;
    pDb->pWorker = lsmDbSnapshotWorker(pDb);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);


    if( nWrite && (flags & LSM_WORK_CHECKPOINT) ){
      int nHdrLevel = 0;
      if( rc==LSM_OK ) rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK ) rc = lsmSortedNewToplevel(pDb, &nHdrLevel);
      if( rc==LSM_OK ) rc = lsmDbUpdateClient(pDb, nHdrLevel);
    }


    lsmDbSnapshotRelease(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
    if( pnWrite ) *pnWrite = nWrite;
  }else if( pnWrite ){
    *pnWrite = 0;
  }







>







 







<

<
<
<
<
<







 







>
>
>







 







>







 







>
>
>
>


>
|
>
>
>
>
>
>
>
>
>
>
>







 







|







 







>
>







 







>
>
|
|

|
|
|
|
|
|
|
|
|
|
>







 







>
>
|
>







 







<







 







<












<
<
<
|







 







>






>







249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
...
317
318
319
320
321
322
323

324





325
326
327
328
329
330
331
....
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
....
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
....
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
....
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
....
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
....
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
....
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
....
3462
3463
3464
3465
3466
3467
3468

3469
3470
3471
3472
3473
3474
3475
....
3496
3497
3498
3499
3500
3501
3502

3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514



3515
3516
3517
3518
3519
3520
3521
3522
....
4213
4214
4215
4216
4217
4218
4219
4220
4221
4222
4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
4233
4234
  lsm_db *pDb;                    /* Connection that owns this cursor */
  MultiCursor *pNext;             /* Next cursor owned by connection pDb */

  int flags;                      /* Mask of CURSOR_XXX flags */
  int (*xCmp)(void *, int, void *, int);         /* Compare function */
  int eType;                      /* Cache of current key type */
  Blob key;                       /* Cache of current key (or NULL) */
  Blob val;                       /* Cache of current value */

  TreeCursor *pTreeCsr;           /* Single tree cursor */
  int nSegCsr;                    /* Size of aSegCsr[] array */
  LevelCursor *aSegCsr;           /* Array of cursors open on sorted files */
  int nTree;
  int *aTree;
  BtreeCursor *pBtCsr;
................................................................................
};

/*
** State used while writing the output of a merge: the database handle,
** the level being merged, the cursor that supplies the input records and
** the output pages currently being filled.
*/
struct MergeWorker {
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */

  /* NOTE(review): presumably one Hierarchy per output run, indexed the
  ** same way as apPage[] below - confirm. */
  Hierarchy aHier[2];





  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
................................................................................
    rc = segmentPtrReadData(
        pPtr, iOff, pPtr->nKey, &pPtr->pKey, &pPtr->blob1
    );
    if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){
      rc = segmentPtrReadData(
          pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
      );
    }else{
      pPtr->nVal = 0;
      pPtr->pVal = 0;
    }
  }

  return rc;
}

void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
................................................................................
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
    segmentCursorValue(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT], ppVal, pnVal);
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }
  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}

int lsmSortedLoadSystem(lsm_db *pDb){
  MultiCursor *pCsr = 0;          /* Cursor used to retreive free-list */
  int rc;                         /* Return Code */

................................................................................
    }
    *pnKey = nKey; 
  }
  return LSM_OK;
}

/*
** Fetch the value associated with the current entry of multi-cursor
** pCsr. The value located by multiCursorGetVal() is copied into the
** cursor's own blob (pCsr->val) using sortedBlobSet(), and the pointer
** handed back in *ppVal refers to that copy rather than to the buffer
** multiCursorGetVal() returned.
**
** On success *ppVal/*pnVal describe the copied value and LSM_OK is
** returned. If either step fails, both outputs are zeroed and the error
** code is returned.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  void *pData = 0;                /* Value as reported by the sub-cursor */
  int nData = 0;                  /* Size of value in bytes */
  int rc;                         /* Return code */

  assert( pCsr->aTree );
  assert( rtIsDelete(pCsr->eType)==0 || !(pCsr->flags & CURSOR_IGNORE_DELETE) );

  rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pData, &nData);
  if( rc==LSM_OK ){
    rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pData, nData);
  }

  if( rc!=LSM_OK ){
    *ppVal = 0;
    *pnVal = 0;
    return rc;
  }

  *ppVal = pCsr->val.pData;
  *pnVal = nData;
  return rc;
}

/*
** Write the type of the current entry of multi-cursor pCsr to *peType.
** The entry is identified by pCsr->aTree[1]; the key pointer and size
** outputs of multiCursorGetKey() are not requested. Always returns
** LSM_OK.
*/
int lsmMCursorType(MultiCursor *pCsr, int *peType){
  int iCurrent;                   /* Value of pCsr->aTree[1] */

  assert( pCsr->aTree );
  iCurrent = pCsr->aTree[1];
  multiCursorGetKey(pCsr, iCurrent, peType, 0, 0);
  return LSM_OK;
}
................................................................................


static int mergeWorkerWrite(
  MergeWorker *pMW,               /* Merge worker object to write into */
  int bSep,                       /* True to write to separators array */
  int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey, int nKey,           /* Key value */
  MultiCursor *pCsr,              /* Read value (if any) from here */
  int iPtr,                       /* Absolute value of page pointer, or 0 */
  int *piPtrOut                   /* OUT: Pointer to write to separators */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
................................................................................
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr;                      /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  SortedRun *pRun;                /* Run being written to */
  int flags = 0;                  /* If != 0, flags value for page footer */
  void *pVal;
  int nVal;

  assert( bSep==0 || bSep==1 );
  assert( bSep==0 || rtIsSeparator(eType) );

  pMerge = pMW->pLevel->pMerge;    
  pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run);

................................................................................
  ** The header space is:
  **
  **     1) record type - 1 byte.
  **     2) Page-pointer-offset - 1 varint
  **     3) Key size - 1 varint
  **     4) Value size - 1 varint (SORTED_WRITE only)
  */
  rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  if( rc==LSM_OK ){
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->aiOutputOff[bSep];
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0);
      iRPtr = iPtr - iFPtr;
      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, bSep, iFPtr);
      pPg = pMW->apPage[bSep];
    }
  }

  /* If this record header will be the first on the page, and the page is 
  ** not the very first in the entire run, special actions may need to be 
  ** taken:
  **
  **   * If currently writing the main run, *piPtrOut should be set to
................................................................................
    if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
    pMerge->aiOutputOff[bSep] = iOff;

    /* Write the key and data into the segment. */
    assert( iFPtr==pageGetPtr(aData, nData) );
    rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pKey, nKey);
    if( rc==LSM_OK && rtIsWrite(eType) ){
      if( rtTopic(eType)==0 ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK ){
        rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pVal, nVal);
      }
    }
  }

  return rc;
}


................................................................................

static int mergeWorkerStep(MergeWorker *pMW){
  lsm_db *pDb = pMW->pDb;       /* Database handle */
  MultiCursor *pCsr;            /* Cursor to read input data from */
  int rc = LSM_OK;              /* Return code */
  int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey; int nKey;         /* Key */

  Segment *pSeg;                /* Output segment */
  int iPtr = 0;

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
................................................................................
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }


  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */

    if( pMW->apPage[0]==0 ){
      rc = mergeWorkerFirstPage(pMW);
    }

    /* Write the record into the main run. */
    if( rc==LSM_OK ){



      rc = mergeWorkerWrite(pMW, 0, eType, pKey, nKey, pCsr, iPtr, &iSPtr);
    }

    /* If the call to mergeWorkerWrite() above started a new page, then
    ** add a SORTED_SEPARATOR key to the separators run.  */
#if 0
    if( rc==LSM_OK && iSPtr ){

................................................................................

  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    int nWrite = 0;
    pDb->pWorker = lsmDbSnapshotWorker(pDb);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);

#if 0
    if( nWrite && (flags & LSM_WORK_CHECKPOINT) ){
      int nHdrLevel = 0;
      if( rc==LSM_OK ) rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK ) rc = lsmSortedNewToplevel(pDb, &nHdrLevel);
      if( rc==LSM_OK ) rc = lsmDbUpdateClient(pDb, nHdrLevel);
    }
#endif

    lsmDbSnapshotRelease(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
    if( pnWrite ) *pnWrite = nWrite;
  }else if( pnWrite ){
    *pnWrite = 0;
  }