SQLite4
Check-in [2be9bac408]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Ensure that "expensive-assert" code only runs when LSM_DEBUG_EXPENSIVE is defined.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: 2be9bac40812771cfc499a011132eb83d9f6fbe2
User & Date: dan 2012-06-27 10:22:58
Context
2012-06-27
16:02
Testing finds no cases where separator runs are faster than embedded btrees. So discard the separator runs related code. check-in: afcbe561ff user: dan tags: embedded-btree
10:22
Ensure that "expensive-assert" code only runs when LSM_DEBUG_EXPENSIVE is defined. check-in: 2be9bac408 user: dan tags: embedded-btree
2012-06-26
20:39
Merge trunk changes. check-in: 209c12ae0b user: dan tags: embedded-btree
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
  if( nArg==1 ){
    zPattern = azArg[0];
  }

  do_crash_test(zPattern, &rc);
  return rc;
}


static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
    tdb_lsm_config_str(pDb, "mmap=1 autowork=1 nmerge=4 worker_nmerge=4");
  }







<







463
464
465
466
467
468
469

470
471
472
473
474
475
476
  if( nArg==1 ){
    zPattern = azArg[0];
  }

  do_crash_test(zPattern, &rc);
  return rc;
}


static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
    tdb_lsm_config_str(pDb, "mmap=1 autowork=1 nmerge=4 worker_nmerge=4");
  }

Changes to src/lsm_sorted.c.

323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
...
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
....
3561
3562
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
....
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
....
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
....
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
....
4677
4678
4679
4680
4681
4682
4683

4684
4685
4686
4687

4688
4689
4690
4691
4692

4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
....
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780



4781
4782
4783


4784
4785

4786
4787
4788
4789
4790
4791


4792
4793
4794
4795
4796
4797
4798
4799
4800
4801
4802

4803

4804
4805

4806
4807

4808
4809
4810
4811
4812
4813
4814
4815
4816
4817
4818
4819
4820
4821
4822
4823
4824
4825
4826
4827
4828
4829
4830
4831
4832
4833
4834
4835
4836
4837
4838
4839
4840
4841
4842
4843
4844
4845
4846
4847
4848
4849
4850
4851
4852
4853
4854
4855
4856
4857

4858
4859
4860
4861
4862
4863
4864
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
4878
4879
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889
4890
4891
4892
4893
4894
4895
4896
4897
4898
4899
4900
4901
4902
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913
4914
4915
4916
4917
4918
4919
4920
4921
4922
4923
4924
4925



4926
4927
4928
4929
4930
4931
4932
4933










4934
4935
4936
4937
4938
4939
4940
4941
4942
4943
4944
4945
4946
4947
4948
4949
4950
4951

4952
4953
4954
4955


4956
4957

4958
4959
4960
4961
4962
4963
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy aHier[2];
  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static void assertAllPointersOk(int rc, lsm_db *pDb);
static void assertAllBtreesOk(int rc, lsm_db *);
#else
# define assertAllPointersOk(y, z)
# define assertAllBtreesOk(y, z)
#endif

struct FilePage { u8 *aData; int nData; };
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
static u8 *fsPageDataPtr(Page *pPg){
  return ((struct FilePage *)(pPg))->aData;
}


static int assertPointersOk(lsm_db *, SortedRun *, SortedRun *, int, char **);
/*
** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU16(u8 *aOut, u16 nVal){
  aOut[0] = (u8)((nVal>>8) & 0xFF);
  aOut[1] = (u8)(nVal & 0xFF);
}
................................................................................
    pCsr->iPg = -1;
  }

  *ppCsr = pCsr;
  return rc;
}

#ifndef NDEBUG
static int assertBtreeOk(
  lsm_db *pDb,
  SortedRun *pRun
){
  int rc = LSM_OK;                /* Return code */
  if( pRun->iRoot ){
    Blob blob = {0, 0, 0};        /* Buffer used to cache overflow keys */
    FileSystem *pFS = pDb->pFS;   /* File system to read from */
    Page *pPg = 0;                /* Main run page */
    BtreeCursor *pCsr = 0;        /* Btree cursor */

    rc = btreeCursorNew(pDb, pRun, &pCsr);
    if( rc==LSM_OK ){
      rc = btreeCursorFirst(pCsr);
    }
    if( rc==LSM_OK ){
      rc = lsmFsDbPageGet(pFS, pRun->iFirst, &pPg);
    }

    while( rc==LSM_OK ){
      Page *pNext;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
      if( pPg==0 ) break;
      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( rc==LSM_OK 
       && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
        assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
        rc = btreeCursorNext(pCsr);
      }
    }
    assert( rc!=LSM_OK || pCsr->pKey==0 );

    if( pPg ) lsmFsPageRelease(pPg);

    btreeCursorFree(pCsr);
    sortedBlobFree(&blob);
  }

  return rc;
}
#endif

static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  lsmFsPageRelease(pPtr->pPg);
  if( pNext ){
    int nData;
    u8 *aData = fsPageData(pNext, &nData);
    pPtr->nCell = pageGetNRec(aData, nData);
    pPtr->flags = pageGetFlags(aData, nData);
................................................................................
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }

#if 1
    assert( LSM_OK==assertBtreeOk(pDb, &pSeg->run) );
    if( pMW->pCsr->pBtCsr ){
      assert( LSM_OK==assertBtreeOk(pDb, &pMW->pLevel->pNext->lhs.run) );
      assertPointersOk(
          pDb, &pMW->pLevel->lhs.run, &pMW->pLevel->pNext->lhs.run, 0, 0
      );
    }
#endif

    mergeWorkerShutdown(pMW);
  }
  return rc;
}
................................................................................
    lsmSortedNewToplevel(pDb, pnHdrLevel);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "tree flush");
#endif

  assertAllBtreesOk(rc, pDb);
  assertAllPointersOk(rc, pDb);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

  lsmFinishFlush(pDb, rc==LSM_OK);
  return rc;
}

/*
................................................................................
int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );
  assertAllPointersOk(rc, pDb);

  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
  lsmDatabaseDirty(pDb);

  while( nRemaining>0 ){
    Level *pLevel;
    Level *pTopLevel = lsmDbSnapshotLevel(pWorker);
................................................................................
    }
  }

  if( pnWrite ){
    *pnWrite = (nWork - nRemaining);
  }

  assertAllBtreesOk(rc, pDb);
  assertAllPointersOk(rc, pDb);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
................................................................................
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->pTreeCsr);
  }
}


/*
** Check that the array pOne contains the required pointers to pTwo.
** Array pTwo must be a main array. pOne may be either a separators array
** or another main array. 

**
** If an error is encountered, *pzErr is set to point to a buffer containing
** a nul-terminated error message and this function returns immediately. The
** caller should eventually call lsmFree(*pzErr) to free the allocated
** error message buffer.

*/
static int assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  SortedRun *pOne,                /* Run containing pointers */
  SortedRun *pTwo,                /* Run containing pointer targets */
  int bRhs,                       /* True if pTwo may have been Gobble()d */
  char **pzErr
){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr ptr1;                /* Iterates through pOne */
  SegmentPtr ptr2;                /* Iterates through pTwo */
  Pgno iPrev;

  assert( pOne && pTwo );
................................................................................
  }

  segmentPtrReset(&ptr1);
  segmentPtrReset(&ptr2);
  return LSM_OK;
}


#ifdef LSM_DEBUG_EXPENSIVE

/*
** Argument iPg is a page number within a separators run. Assert() that for
** each key K on on the page, (pKey1 >= K > pKey2) is true. 



**
** Also, if page iPg is a BTREE page, call this function recursively to
** check that the keys on each child page fall into the expected range.


*/
static void assertBtreeRanges(

  lsm_db *pDb, 
  SortedRun *pRun, 
  Pgno iPg,                       /* Database page to load */
  void *pKey1, int nKey1,         /* All keys must be >= than this */
  void *pKey2, int nKey2          /* And < than this */
){


  Blob blob = {0, 0, 0};
  u8 *aData;
  int nData;
  Page *pPg;
  int rc;
  int i;
  int nRec;
  int flags;

  int iPrevTopic = 0;             /* Previous topic value */
  u8 *aPrev = 0;                  /* Buffer pointing to previous key */

  int nPrev = 0;                  /* Size of aPrev[] in bytes */


  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);

  assert( rc==LSM_OK );
  aData = fsPageData(pPg, &nData);


  nRec = pageGetNRec(aData, nData);
  flags = pageGetFlags(aData, nData);

  for(i=0; i<nRec; i++){
    u8 *aKey;
    int nKey;
    int iTopic;
    int iPtr;

    if( flags & SEGMENT_BTREE_FLAG ){
      aKey = pageGetCell(aData, nData, i);
      aKey += lsmVarintGet32(aKey, &iPtr);
      aKey += lsmVarintGet32(aKey, &nKey);
    }else{
      aKey = pageGetKey(pPg, i, &iTopic, &nKey, &blob);
    }

    assert( pKey1==0 || pDb->xCmp(aKey, nKey, pKey1, nKey1)>=0 );
    assert( pKey2==0 || pDb->xCmp(aKey, nKey, pKey2, nKey2)<0 );

    if( flags&SEGMENT_BTREE_FLAG ){
      assertBtreeRanges(pDb, pRun, iPtr, aPrev, nPrev, aKey, nKey);
    }
    aPrev = aKey;
    nPrev = nKey;
  }

  if( flags&SEGMENT_BTREE_FLAG ){
    int iRight = pageGetPtr(aData, nData);
    assertBtreeRanges(pDb, pRun, iRight, aPrev, nPrev, 0, 0);
  }

  lsmFsPageRelease(pPg);
  sortedBlobFree(&blob);
}

static int countBtreeKeys(FileSystem *pFS, SortedRun *pRun, Pgno iPg){
#if 0
  int rc;
  Page *pPg;
  u8 *aData;
  int nData;
  int flags;
  int nRet;

  rc = lsmFsDbPageGet(pFs, iPg, &pPg);
  assert( rc==LSM_OK );
  aData = lsmFsPageData(pPg, &nData);
  flags = pageGetFlags(aData, nData);


  if( flags & SEGMENT_BTREE_FLAG ){
    Pgno iRight;
    int nRec;
    int i;

    iRight = pageGetPtr(aData, nData);
    nRec = pageGetNRec(aData, nData);

    nRet = nRec;
    nRet += countBtreeKeys(pFS, pRun, iRight);
    for(i=0; i<nRec; i++){
      Pgno iPtr;
      u8 *aCell = pageGetCell(aData, nData, i);
      aCell += lsmVarintGet32(aCell, &iPtr);
      if( iPtr==0 ){
        lsmVarintGet32(aCell, &iPtr);
      }
      nRet += countBtreeKeys(pFS, pRun, iPtr);
    }
  }else{
    nRet = 0;
  }

  lsmFsPageRelease(pPg);
  return nRet;
#endif
  return 0;
}

static void assertBtreeSize(FileSystem *pFS, SortedRun *pRun, Pgno iRoot){
#if 0
  int nRun = 0;
  int nKey = 0;
  int rc;

  Page *pPg;

  rc = lsmFsDbPageGet(pFS, pRun->iFirst, &pPg);
  assert( rc==LSM_OK );
  while( pPg ){
    Page *pNext = 0;
    u8 *aData;
    int nData;
    int flags;
    int nRec;

    aData = lsmFsPageData(pPg, &nData);
    flags = pageGetFlags(aData, nData);
    nRec = pageGetNRec(aData, nData);

    if( (flags & SEGMENT_BTREE_FLAG)==0 && nRec ){
      nRun++;
    }

    rc = lsmFsDbPageNext(pPg, 1, &pNext);
    assert( rc==LSM_OK );
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  nKey = countBtreeKeys(pFS, pRun, iRoot);
  assert( nRun==1+nKey );
#endif
}

static void assertAllBtreesOk(int rc, lsm_db *pDb){
#if 0



  if( rc==LSM_OK ){
    Level *p;
    for(p=pDb->pLevel; p; p=p->pNext){
      SortedRun *pSep = p->lhs.pSep;
      Pgno iRoot = pSep->iRoot;
      if( pSep && iRoot ){
        assertBtreeRanges(pDb, pSep, iRoot, 0, 0, 0, 0);
        assertBtreeSize(pDb->pFS, pSep, iRoot);










      }
    }
  }
#endif
}

/*
** This function is only useful for debugging. 
*/
static void assertAllPointersOk(int rc, lsm_db *pDb){
  assert( rc!=LSM_OK || pDb->pWorker );
  if( rc==LSM_OK ){
    Level *p;
    for(p=lsmDbSnapshotLevel(pDb->pWorker); p; p=p->pNext){
      int i;

      if( segmentHasSeparators(&p->lhs) ){
        assertPointersOk(pDb, &p->lhs.sep, &p->lhs.run, 0, 0);

      }
      for(i=0; i<p->nRight; i++){
        if( segmentHasSeparators(&p->aRhs[i]) ){
          assertPointersOk(pDb, &p->aRhs[i].sep, &p->aRhs[i].run, 1, 0);


        }
      }


    }
  }
}

#endif /* ifdef LSM_DEBUG_EXPENSIVE */







|
|
<
<
<











<
<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|
|
|
|
|
|
|







 







<
<







 







<







 







<
<







 







>

<
|
|
>

<
|
|
|
>





|
<







 







<
<
<

<
<
>
>
>

<
<
>
>

<
>
|
|
<
<
<

>
>
|
<
<
<
<
<
<
<
<
<
<
>
|
>

<
>
|
<
>
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
>
|
<
<
<
<

<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
|
|
|
|
<

<
<
<
<
<
<
<
<
|
<
|
|
<
<
<
<
<
<
<
<
<
>
>
>
|
<
<
<
<
<
<
<
>
>
>
>
>
>
>
>
>
>


<
<
<
<
<
<
<
<
|
<
<
<
<

<
<
>
|
<
<
<
>
>
|
|
>
|
<
<
<
<

323
324
325
326
327
328
329
330
331



332
333
334
335
336
337
338
339
340
341
342


343
344
345
346
347
348
349
...
915
916
917
918
919
920
921

























































922
923
924
925
926
927
928
....
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
....
3685
3686
3687
3688
3689
3690
3691


3692
3693
3694
3695
3696
3697
3698
....
3885
3886
3887
3888
3889
3890
3891

3892
3893
3894
3895
3896
3897
3898
....
4044
4045
4046
4047
4048
4049
4050


4051
4052
4053
4054
4055
4056
4057
....
4610
4611
4612
4613
4614
4615
4616
4617
4618

4619
4620
4621
4622

4623
4624
4625
4626
4627
4628
4629
4630
4631
4632

4633
4634
4635
4636
4637
4638
4639
....
4701
4702
4703
4704
4705
4706
4707



4708


4709
4710
4711
4712


4713
4714
4715

4716
4717
4718



4719
4720
4721
4722










4723
4724
4725
4726

4727
4728

4729
4730














































4731


4732
4733




4734

































4735

4736
4737
4738
4739

4740








4741

4742
4743









4744
4745
4746
4747







4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759








4760




4761


4762
4763



4764
4765
4766
4767
4768
4769




4770
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy aHier[2];
  Page *apPage[2];                /* Current output pages (0 is main run) */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, SortedRun *, SortedRun *, int);
static int assertBtreeOk(lsm_db *, SortedRun *);



#endif

struct FilePage { u8 *aData; int nData; };
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
static u8 *fsPageDataPtr(Page *pPg){
  return ((struct FilePage *)(pPg))->aData;
}



/*
** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU16(u8 *aOut, u16 nVal){
  aOut[0] = (u8)((nVal>>8) & 0xFF);
  aOut[1] = (u8)(nVal & 0xFF);
}
................................................................................
    pCsr->iPg = -1;
  }

  *ppCsr = pCsr;
  return rc;
}


























































static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  lsmFsPageRelease(pPtr->pPg);
  if( pNext ){
    int nData;
    u8 *aData = fsPageData(pNext, &nData);
    pPtr->nCell = pageGetNRec(aData, nData);
    pPtr->flags = pageGetFlags(aData, nData);
................................................................................
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }

#ifdef LSM_DEBUG_EXPENSIVE
    if( rc==LSM_OK ){
      rc = assertBtreeOk(pDb, &pSeg->run);
      if( pMW->pCsr->pBtCsr ){
        SortedRun *pNext = &pMW->pLevel->pNext->lhs.run;
        rc = assertPointersOk(pDb, &pSeg->run, pNext, 0);
      }
    }
#endif

    mergeWorkerShutdown(pMW);
  }
  return rc;
}
................................................................................
    lsmSortedNewToplevel(pDb, pnHdrLevel);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "tree flush");
#endif



  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

  lsmFinishFlush(pDb, rc==LSM_OK);
  return rc;
}

/*
................................................................................
int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );


  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
  lsmDatabaseDirty(pDb);

  while( nRemaining>0 ){
    Level *pLevel;
    Level *pTopLevel = lsmDbSnapshotLevel(pWorker);
................................................................................
    }
  }

  if( pnWrite ){
    *pnWrite = (nWork - nRemaining);
  }



  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
................................................................................
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->pTreeCsr);
  }
}

#ifdef LSM_DEBUG_EXPENSIVE */
/*

** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**

** More specifically, it checks that the array pOne contains the required 
** pointers to pTwo. Array pTwo must be a main array. pOne may be either a 
** separators array or another main array. If pOne does not contain the 
** correct set of pointers, an assert() statement fails.
*/
static int assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  SortedRun *pOne,                /* Run containing pointers */
  SortedRun *pTwo,                /* Run containing pointer targets */
  int bRhs                        /* True if pTwo may have been Gobble()d */

){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr ptr1;                /* Iterates through pOne */
  SegmentPtr ptr2;                /* Iterates through pTwo */
  Pgno iPrev;

  assert( pOne && pTwo );
................................................................................
  }

  segmentPtrReset(&ptr1);
  segmentPtrReset(&ptr2);
  return LSM_OK;
}




/*


** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**


** More specifically, it checks that the b-tree embedded in array pRun
** contains the correct keys. If not, an assert() fails.
*/

static int assertBtreeOk(
  lsm_db *pDb,
  SortedRun *pRun



){
  int rc = LSM_OK;                /* Return code */
  if( pRun->iRoot ){
    Blob blob = {0, 0, 0};        /* Buffer used to cache overflow keys */










    FileSystem *pFS = pDb->pFS;   /* File system to read from */
    Page *pPg = 0;                /* Main run page */
    BtreeCursor *pCsr = 0;        /* Btree cursor */


    rc = btreeCursorNew(pDb, pRun, &pCsr);
    if( rc==LSM_OK ){

      rc = btreeCursorFirst(pCsr);
    }














































    if( rc==LSM_OK ){


      rc = lsmFsDbPageGet(pFS, pRun->iFirst, &pPg);
    }






































    while( rc==LSM_OK ){

      Page *pNext;
      u8 *aData;
      int nData;
      int flags;










      rc = lsmFsDbPageNext(pRun, pPg, 1, &pNext);

      lsmFsPageRelease(pPg);
      pPg = pNext;









      if( pPg==0 ) break;
      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( rc==LSM_OK 







       && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
        assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
        rc = btreeCursorNext(pCsr);
      }
    }








    assert( rc!=LSM_OK || pCsr->pKey==0 );







    if( pPg ) lsmFsPageRelease(pPg);




    btreeCursorFree(pCsr);
    sortedBlobFree(&blob);
  }

  return rc;
}




#endif /* ifdef LSM_DEBUG_EXPENSIVE */