SQLite4
Check-in [0fcabb513b]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add LSM_CONFIG_NMERGE parameter. Add code to lsmtest so that lsm configurations can be specified in string form.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: 0fcabb513b0ae808e1ccbbb4dce24f3fcefc02e3
User & Date: dan 2012-06-26 15:43:01
Context
2012-06-26
18:10
Fix another problem with mmap and using a background thread. check-in: ec39163b79 user: dan tags: embedded-btree
15:43
Add LSM_CONFIG_NMERGE parameter. Add code to lsmtest so that lsm configurations can be specified in string form. check-in: 0fcabb513b user: dan tags: embedded-btree
2012-06-25
19:16
Fix another remapping related problem. check-in: 3e50bb0a22 user: dan tags: embedded-btree
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
....
1090
1091
1092
1093
1094
1095
1096

1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126


1127


1128
1129
1130
1131
1132
1133

1134
1135
1136
1137
1138
1139
1140
}


/*
** Return the lsm_db handle that tdb_lsm() reports for test database pDb.
** If that handle is non-NULL, it is first configured via lsm_config() with
** a 2MB write-buffer, safety level 1, mmap enabled and the log enabled.
** The values returned by the lsm_config() calls are not checked.
*/
static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
    int bMmap = 1;                    /* Passed to LSM_CONFIG_MMAP */
    int nLimit = 2 * 1024 * 1024;     /* Passed to LSM_CONFIG_WRITE_BUFFER */
    int eSafety = 1;                  /* Passed to LSM_CONFIG_SAFETY */
    int bUseLog = 1;                  /* Passed to LSM_CONFIG_USE_LOG */

    lsm_config(pLsm, LSM_CONFIG_WRITE_BUFFER, &nLimit);
    lsm_config(pLsm, LSM_CONFIG_SAFETY, &eSafety);
    lsm_config(pLsm, LSM_CONFIG_MMAP, &bMmap);
    lsm_config(pLsm, LSM_CONFIG_USE_LOG, &bUseLog);
  }
  return pLsm;
}

int do_speed_tests(int nArg, char **azArg){

  struct DbSystem {
................................................................................
  fclose(pInput);
  pEnv->xClose(pOut);

  return rc;
}

static int do_insert(int nArg, char **azArg){

  const char *zDb = "lsm";
  TestDb *pDb = 0;
  int i;
  int rc;
  const int nRow = 1 * 1000 * 1000;

  DatasourceDefn defn = { TEST_DATASOURCE_RANDOM, 8, 15, 80, 150 };
  Datasource *pData = 0;

  if( nArg>1 ){
    testPrintError("Usage: insert ?DATABASE?\n");
    return 1;
  }
  if( nArg==1 ){
    zDb = azArg[0];
  }

  testMallocUninstall(tdb_lsm_env());
  rc = tdb_open(zDb, 0, 1, &pDb);
  if( rc!=0 ){
    testPrintError("Error opening db \"%s\": %d\n", zDb, rc);
  }else{

    InsertWriteHook hook;
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);





    for(i=0; i<nRow; i++){
      void *pKey; int nKey;         /* Database key to insert */
      void *pVal; int nVal;         /* Database value to insert */

      testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);
      tdb_write(pDb, pKey, nKey, pVal, nVal);

    }

    testDatasourceFree(pData);
    tdb_close(pDb);
    flushHook(&hook);
    fclose(hook.pOut);
  }







|
<
<
<
<
<
<
<
<







 







>









|
|


|
|
<






<







>
>
|
>
>
|
|
|
<
|
|
>







469
470
471
472
473
474
475
476








477
478
479
480
481
482
483
....
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104

1105
1106
1107
1108
1109
1110

1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125

1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
}


/*
** Fetch the lsm_db handle associated with test database pDb using
** tdb_lsm(). If there is one, apply the configuration string "mmap=1"
** to the database with tdb_lsm_config_str() (the result of that call is
** not examined). The handle, which may be NULL, is returned to the caller.
*/
static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pHandle = tdb_lsm(pDb);
  if( pHandle!=0 ){
    tdb_lsm_config_str(pDb, "mmap=1");
  }
  return pHandle;
}

int do_speed_tests(int nArg, char **azArg){

  struct DbSystem {
................................................................................
  fclose(pInput);
  pEnv->xClose(pOut);

  return rc;
}

static int do_insert(int nArg, char **azArg){
  const char *zConfig = 0;
  const char *zDb = "lsm";
  TestDb *pDb = 0;
  int i;
  int rc;
  const int nRow = 1 * 1000 * 1000;

  DatasourceDefn defn = { TEST_DATASOURCE_RANDOM, 8, 15, 80, 150 };
  Datasource *pData = 0;

  if( nArg>2 ){
    testPrintError("Usage: insert ?DATABASE? ?LSM-CONFIG?\n");
    return 1;
  }
  if( nArg==1 ){ zDb = azArg[0]; }
  if( nArg==2 ){ zConfig = azArg[1]; }


  testMallocUninstall(tdb_lsm_env());
  rc = tdb_open(zDb, 0, 1, &pDb);
  if( rc!=0 ){
    testPrintError("Error opening db \"%s\": %d\n", zDb, rc);
  }else{

    InsertWriteHook hook;
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);
    if( zConfig ){
      rc = test_lsm_config_str(tdb_lsm(pDb), zConfig);
    }

    if( rc==0 ){
      for(i=0; i<nRow; i++){
        void *pKey; int nKey;     /* Database key to insert */
        void *pVal; int nVal;     /* Database value to insert */

        testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);
        tdb_write(pDb, pKey, nKey, pVal, nVal);
      }
    }

    testDatasourceFree(pData);
    tdb_close(pDb);
    flushHook(&hook);
    fclose(hook.pOut);
  }

Changes to lsm-test/lsmtest_tdb3.c.

716
717
718
719
720
721
722
723
724
725
726
727
728
729
730

731

732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751

752
753
754
755
756
757
758
...
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905












































































906
907
908
909
910
911
912
#ifdef LSM_MUTEX_PTHREADS

/*
** Main routine for a worker thread. Argument pArg is a pointer to the
** LsmWorker structure describing the worker. The thread repeatedly calls
** lsm_work() on connection p->pWorker until either lsm_work() fails or
** p->pWorker is found to be NULL. Always returns 0; an lsm_work() error
** code is left in p->worker_rc.
*/
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  /* Take the initial copy of the connection pointer under the mutex. */
  pthread_mutex_lock(&p->worker_mutex);
  pWorker = p->pWorker;
  pthread_mutex_unlock(&p->worker_mutex);

  while( pWorker ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */

    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);

    /* Note that worker_rc is written here without holding worker_mutex. */
    if( rc!=LSM_OK ){
      p->worker_rc = rc;
      break;
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
    ** is being closed.  */
    if( nWrite==0 ){
      pthread_mutex_lock(&p->worker_mutex);
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      /* Clear the work-request flag and re-read the connection pointer
      ** while still holding the mutex. */
      p->bDoWork = 0;
      pWorker = p->pWorker;
      pthread_mutex_unlock(&p->worker_mutex);
    }
  }

  
  return 0;
}


/*
** Signal worker thread iWorker that there may be work to do.
................................................................................

  if( rc==0 ){
    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
    pDb->nWorker = nWorker;

    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_CHECKPOINT, 
        nWorker==1 ? 32 : 0
    );
  }

  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 32);
  }

  return rc;
}

/*
** Wrapper around test_lsm_mt() that passes 1 as the second argument
** (presumably the number of worker threads - confirm against the
** definition of test_lsm_mt()). Returns whatever test_lsm_mt() returns.
*/
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 1, bClear, ppDb);
}

/*
** Wrapper around test_lsm_mt() that passes 2 as the second argument
** (presumably the number of worker threads - confirm against the
** definition of test_lsm_mt()). Returns whatever test_lsm_mt() returns.
*/
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 2, bClear, ppDb);
}













































































#else
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);







|
<
<
<




>

>











<




<
<


>







 







|




|












>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







716
717
718
719
720
721
722
723



724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741

742
743
744
745


746
747
748
749
750
751
752
753
754
755
...
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
#ifdef LSM_MUTEX_PTHREADS

/*
** Main routine for a worker thread. Argument pArg is a pointer to the
** LsmWorker structure describing the worker. The thread repeatedly calls
** lsm_work() on connection p->pWorker until either lsm_work() fails or
** p->pWorker is found to be NULL. Always returns 0; an lsm_work() error
** code is left in p->worker_rc.
**
** p->worker_mutex is held for the whole routine except while lsm_work()
** is running (and while blocked inside pthread_cond_wait()). So every
** access to p->pWorker, p->bDoWork and p->worker_rc made here is made
** with the mutex held.
*/
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  /* Re-read p->pWorker (under the mutex) at the top of each iteration.
  ** The loop ends once it is NULL. */
  while( (pWorker = p->pWorker) ){



    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK ){
      /* The mutex is held here. It is released after the loop. */
      p->worker_rc = rc;
      break;
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
    ** is being closed.  */
    if( nWrite==0 ){

      /* The wait is not retried in a loop. Any wake-up simply causes
      ** another pass through the outer loop and hence another call to
      ** lsm_work(), provided p->pWorker is still set. */
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;


    }
  }
  pthread_mutex_unlock(&p->worker_mutex);
  
  return 0;
}


/*
** Signal worker thread iWorker that there may be work to do.
................................................................................

  if( rc==0 ){
    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
    pDb->nWorker = nWorker;

    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_CHECKPOINT, 
        nWorker==1 ? 512 : 0
    );
  }

  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 512);
  }

  return rc;
}

/*
** Open a database by calling test_lsm_mt() with 1 as its second argument
** (presumably the worker-thread count - confirm against test_lsm_mt()).
** The remaining arguments are forwarded unchanged and the return value of
** test_lsm_mt() is handed back to the caller.
*/
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  int rc = test_lsm_mt(zFilename, 1, bClear, ppDb);
  return rc;
}

/*
** Open a database by calling test_lsm_mt() with 2 as its second argument
** (presumably the worker-thread count - confirm against test_lsm_mt()).
** The remaining arguments are forwarded unchanged and the return value of
** test_lsm_mt() is handed back to the caller.
*/
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  int rc = test_lsm_mt(zFilename, 2, bClear, ppDb);
  return rc;
}

/*
** Apply the configuration described by string zStr to LSM handle pDb.
**
** zStr consists of zero or more "name=value" pairs, each optionally
** preceded by spaces. Each name is resolved by testArgSelect() against
** the aParam[] table below and each value is an unsigned decimal integer
** that fits in a 32-bit signed int. For example:
**
**     "mmap=1 nmerge=8"
**
** Each pair is passed to lsm_config() in the order in which it appears.
** If pDb is NULL this function does nothing and returns 0.
**
** Return 0 if successful. If the string cannot be parsed (missing '=',
** empty or over-long name, missing or out-of-range value), print an error
** message using testPrintError() and return 1. If testArgSelect() or
** lsm_config() returns non-zero, return that value immediately.
*/
int test_lsm_config_str(lsm_db *pDb, const char *zStr){
  struct CfgParam {
    const char *zParam;
    int eParam;
  } aParam[] = {
    { "write_buffer",   LSM_CONFIG_WRITE_BUFFER },
    { "page_size",      LSM_CONFIG_PAGE_SIZE },
    { "safety",         LSM_CONFIG_SAFETY },
    { "autowork",       LSM_CONFIG_AUTOWORK },
    { "log_size",       LSM_CONFIG_LOG_SIZE },
    { "mmap",           LSM_CONFIG_MMAP },
    { "use_log",        LSM_CONFIG_USE_LOG },
    { "nmerge",         LSM_CONFIG_NMERGE },
    { 0, 0 }
  };
  const char *z = zStr;           /* Current parse position */
  const char *zStart = zStr;      /* Start of the token being parsed */

  while( z[0] && pDb ){
    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    /* Advance to the '=' that terminates the parameter name */
    while( *z && *z!='=' ) z++;
    if( *z ){
      int iParam;
      int iVal = 0;
      int rc;
      char zParam[32];
      size_t nParam = (size_t)(z-zStart);
      if( nParam==0 || nParam>=sizeof(zParam) ) goto syntax_error;

      /* Copy the name into a nul-terminated buffer and look it up */
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &iParam);
      if( rc!=0 ) return rc;
      iParam = aParam[iParam].eParam;

      /* Parse the decimal value. Accumulate it by hand so that a value
      ** too large for a 32-bit signed integer is reported as a syntax
      ** error instead of overflowing. */
      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ){
        int iDigit = *z - '0';
        if( iVal>(0x7fffffff - iDigit)/10 ) goto syntax_error;
        iVal = iVal*10 + iDigit;
        z++;
      }
      if( z==zStart ) goto syntax_error;

      rc = lsm_config(pDb, iParam, &iVal);
      if( rc!=0 ) return rc;
    }else if( z!=zStart ){
      /* Trailing text that contains no '=' character */
      goto syntax_error;
    }
  }

  return 0;

 syntax_error:
  /* Report the token that could not be parsed, not the tail following it */
  testPrintError("syntax error at: \"%s\"\n", zStart);
  return 1;
}

/*
** If tdb_lsm() returns a non-NULL handle for pDb, treat pDb as an LsmDb
** and apply configuration string zStr, using test_lsm_config_str(), first
** to the main connection (LsmDb.db) and then to the pWorker connection of
** each of the LsmDb.nWorker entries in LsmDb.aWorker[]. Processing stops
** at the first call that returns non-zero.
**
** Return 0 if tdb_lsm() returns NULL or if every call succeeds. Otherwise
** return the first non-zero value returned by test_lsm_config_str().
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm;
  int iWorker = 0;
  int rc;

  if( !tdb_lsm(pDb) ) return 0;

  pLsm = (LsmDb *)pDb;
  rc = test_lsm_config_str(pLsm->db, zStr);
  while( rc==0 && iWorker<pLsm->nWorker ){
    rc = test_lsm_config_str(pLsm->aWorker[iWorker].pWorker, zStr);
    iWorker++;
  }
  return rc;
}


#else
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);

Changes to src/lsm.h.

159
160
161
162
163
164
165




166
167
168
169
170
171
172
173
174

175
176
177
178
179
180
181
**   LSM_CONFIG_MMAP
**     A read/write integer parameter. True to use mmap() to access the 
**     database file. False otherwise.
**
**   LSM_CONFIG_USE_LOG
**     A read/write boolean parameter. True (the default) to use the log
**     file normally. False otherwise.




*/
#define LSM_CONFIG_WRITE_BUFFER  1
#define LSM_CONFIG_PAGE_SIZE     2
#define LSM_CONFIG_SAFETY        3
#define LSM_CONFIG_BLOCK_SIZE    4
#define LSM_CONFIG_AUTOWORK      5
#define LSM_CONFIG_LOG_SIZE      6
#define LSM_CONFIG_MMAP          7
#define LSM_CONFIG_USE_LOG       8


#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*







>
>
>
>









>







159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
**   LSM_CONFIG_MMAP
**     A read/write integer parameter. True to use mmap() to access the 
**     database file. False otherwise.
**
**   LSM_CONFIG_USE_LOG
**     A read/write boolean parameter. True (the default) to use the log
**     file normally. False otherwise.
**
**   LSM_CONFIG_NMERGE
**     A read/write integer parameter. The minimum number of segments to
**     merge together at a time. Default value 4.
*/
#define LSM_CONFIG_WRITE_BUFFER  1
#define LSM_CONFIG_PAGE_SIZE     2
#define LSM_CONFIG_SAFETY        3
#define LSM_CONFIG_BLOCK_SIZE    4
#define LSM_CONFIG_AUTOWORK      5
#define LSM_CONFIG_LOG_SIZE      6
#define LSM_CONFIG_MMAP          7
#define LSM_CONFIG_USE_LOG       8
#define LSM_CONFIG_NMERGE        9

#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*

Changes to src/lsmInt.h.

42
43
44
45
46
47
48

49
50
51
52
53
54
55
...
168
169
170
171
172
173
174

175
176
177
178
179
180
181
*/
#define LSM_PAGE_SIZE   4096
#define LSM_BLOCK_SIZE  (2 * 1024 * 1024)
#define LSM_TREE_BYTES  (2 * 1024 * 1024)
#define LSM_ECOLA       4

#define LSM_DEFAULT_LOG_SIZE (128*1024)


/* Places where a NULL needs to be changed to a real lsm_env pointer
** are marked with NEED_ENV */
#define NEED_ENV ((lsm_env*)0)

/* Initial values for log file checksums. These are only used if the 
** database file does not contain a valid checkpoint.  */
................................................................................
  /* Database handle configuration */
  lsm_env *pEnv;                            /* runtime environment */
  int (*xCmp)(void *, int, void *, int);    /* Compare function */
  int nTreeLimit;                 /* Maximum size of in-memory tree in bytes */
  int bAutowork;                  /* True to do auto-work after writing */
  int eSafety;                    /* LSM_SAFETY_OFF, NORMAL or FULL */


  int nLogSz;                     /* Configured by LSM_CONFIG_LOG_SIZE */
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */







>







 







>







42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
...
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
*/
#define LSM_PAGE_SIZE   4096
#define LSM_BLOCK_SIZE  (2 * 1024 * 1024)
#define LSM_TREE_BYTES  (2 * 1024 * 1024)
#define LSM_ECOLA       4

#define LSM_DEFAULT_LOG_SIZE (128*1024)
#define LSM_DEFAULT_NMERGE   4

/* Places where a NULL needs to be changed to a real lsm_env pointer
** are marked with NEED_ENV */
#define NEED_ENV ((lsm_env*)0)

/* Initial values for log file checksums. These are only used if the 
** database file does not contain a valid checkpoint.  */
................................................................................
  /* Database handle configuration */
  lsm_env *pEnv;                            /* runtime environment */
  int (*xCmp)(void *, int, void *, int);    /* Compare function */
  int nTreeLimit;                 /* Maximum size of in-memory tree in bytes */
  int bAutowork;                  /* True to do auto-work after writing */
  int eSafety;                    /* LSM_SAFETY_OFF, NORMAL or FULL */

  int nMerge;                     /* Configured by LSM_CONFIG_NMERGE */
  int nLogSz;                     /* Configured by LSM_CONFIG_LOG_SIZE */
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */

Changes to src/lsm_main.c.

78
79
80
81
82
83
84

85
86
87
88
89
90
91
...
393
394
395
396
397
398
399







400
401
402
403
404
405
406
  pDb->nTreeLimit = LSM_TREE_BYTES;
  pDb->bAutowork = 1;
  pDb->eSafety = LSM_SAFETY_NORMAL;
  pDb->xCmp = xCmp;
  pDb->nLogSz = LSM_DEFAULT_LOG_SIZE;
  pDb->nDfltPgsz = LSM_PAGE_SIZE;
  pDb->nDfltBlksz = LSM_BLOCK_SIZE;

  pDb->bUseLog = 1;

  return LSM_OK;
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
................................................................................
      int *piVal = va_arg(ap, int *);
      if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){
        pDb->bUseLog = *piVal;
      }
      *piVal = pDb->bUseLog;
      break;
    }








    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);







>







 







>
>
>
>
>
>
>







78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
...
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
  pDb->nTreeLimit = LSM_TREE_BYTES;
  pDb->bAutowork = 1;
  pDb->eSafety = LSM_SAFETY_NORMAL;
  pDb->xCmp = xCmp;
  pDb->nLogSz = LSM_DEFAULT_LOG_SIZE;
  pDb->nDfltPgsz = LSM_PAGE_SIZE;
  pDb->nDfltBlksz = LSM_BLOCK_SIZE;
  pDb->nMerge = LSM_DEFAULT_NMERGE;
  pDb->bUseLog = 1;

  return LSM_OK;
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
................................................................................
      int *piVal = va_arg(ap, int *);
      if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){
        pDb->bUseLog = *piVal;
      }
      *piVal = pDb->bUseLog;
      break;
    }

    case LSM_CONFIG_NMERGE: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>1 ) pDb->nMerge = *piVal;
      *piVal = pDb->nMerge;
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);

Changes to src/lsm_sorted.c.

3942
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
    Level *pLevel;
    Level *pTopLevel = lsmDbSnapshotLevel(pWorker);

    /* Find the longest contiguous run of levels not currently undergoing a 
    ** merge with the same age in the structure. Or the level being merged
    ** with the largest number of right-hand segments. Work on it.  */
    Level *pBest = 0;
    int nBest = 4;

    Level *pThis = 0;
    int nThis = 0;

    for(pLevel = pTopLevel; pLevel; pLevel=pLevel->pNext){
      if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
        nThis++;







|







3942
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
    Level *pLevel;
    Level *pTopLevel = lsmDbSnapshotLevel(pWorker);

    /* Find the longest contiguous run of levels not currently undergoing a 
    ** merge with the same age in the structure. Or the level being merged
    ** with the largest number of right-hand segments. Work on it.  */
    Level *pBest = 0;
    int nBest = pDb->nMerge;

    Level *pThis = 0;
    int nThis = 0;

    for(pLevel = pTopLevel; pLevel; pLevel=pLevel->pNext){
      if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
        nThis++;