SQLite4
Check-in [a983d57868]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix various glitches to do with iterating through b-tree structures.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | embedded-btree
Files: files | file ages | folders
SHA1: a983d57868ca4d329964c425ada129acf6658f73
User & Date: dan 2012-06-25 10:02:53
Context
2012-06-25
18:40
Fix a problem with stale pointers after remapping a file. check-in: 49cfd9043b user: dan tags: embedded-btree
10:02
Fix various glitches to do with iterating through b-tree structures. check-in: a983d57868 user: dan tags: embedded-btree
2012-06-23
20:22
Scan the embedded b-trees instead of the separator runs when merging in separator keys from a lower level. check-in: 90c0d5d8fe user: dan tags: embedded-btree
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
...
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
...
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
  void *pVal, int nVal
){
  ScanResult *p = (ScanResult *)pCtx;
  u8 *aKey = (u8 *)pKey;
  u8 *aVal = (u8 *)pVal;
  int i;

  if( test_scan_debug ) printf("%s ", (char *)pKey);

#if 0
  /* Check tdb_fetch() matches */
  int rc = 0;
  testFetch(p->pDb, pKey, nKey, pVal, nVal, &rc);
  assert( rc==0 );
#endif
................................................................................
    res1.bReverse = bReverse;
    res2.pDb = pDb2;
    res2.nKey1 = nKey1; res2.pKey1 = pKey1;
    res2.nKey2 = nKey2; res2.pKey2 = pKey2;
    res2.bReverse = bReverse;

    tdb_scan(pDb1, pRes1, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb);
if( test_scan_debug ) printf("\n");
    tdb_scan(pDb2, pRes2, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb);
if( test_scan_debug ) printf("\n");

    if( res1.nRow!=res2.nRow 
     || res1.cksum1!=res2.cksum1 
     || res1.cksum2!=res2.cksum2
    ){
      printf("expected: %d %X %X\n", res1.nRow, res1.cksum1, res1.cksum2);
      printf("got:      %d %X %X\n", res2.nRow, res2.cksum1, res2.cksum2);
................................................................................
      *pRc = rc;
    }
  }
  return pDb;
}

void testReopen(TestDb **ppDb, int *pRc){
  if( 0 && *pRc==0 ){
    const char *zLib;
    zLib = tdb_library_name(*ppDb);
    testClose(ppDb);
    *pRc = tdb_open(zLib, 0, 0, ppDb);
  }
}








|







 







|

|







 







|







181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
...
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
...
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
  void *pVal, int nVal
){
  ScanResult *p = (ScanResult *)pCtx;
  u8 *aKey = (u8 *)pKey;
  u8 *aVal = (u8 *)pVal;
  int i;

  if( test_scan_debug ) printf("%.20s\n", (char *)pKey);

#if 0
  /* Check tdb_fetch() matches */
  int rc = 0;
  testFetch(p->pDb, pKey, nKey, pVal, nVal, &rc);
  assert( rc==0 );
#endif
................................................................................
    res1.bReverse = bReverse;
    res2.pDb = pDb2;
    res2.nKey1 = nKey1; res2.pKey1 = pKey1;
    res2.nKey2 = nKey2; res2.pKey2 = pKey2;
    res2.bReverse = bReverse;

    tdb_scan(pDb1, pRes1, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb);
if( test_scan_debug ) printf("\n\n\n");
    tdb_scan(pDb2, pRes2, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb);
if( test_scan_debug ) printf("\n\n\n");

    if( res1.nRow!=res2.nRow 
     || res1.cksum1!=res2.cksum1 
     || res1.cksum2!=res2.cksum2
    ){
      printf("expected: %d %X %X\n", res1.nRow, res1.cksum1, res1.cksum2);
      printf("got:      %d %X %X\n", res2.nRow, res2.cksum1, res2.cksum2);
................................................................................
      *pRc = rc;
    }
  }
  return pDb;
}

void testReopen(TestDb **ppDb, int *pRc){
  if( *pRc==0 ){
    const char *zLib;
    zLib = tdb_library_name(*ppDb);
    testClose(ppDb);
    *pRc = tdb_open(zLib, 0, 0, ppDb);
  }
}

Changes to src/lsm_sorted.c.

693
694
695
696
697
698
699

700
701
702
703
704
705
706
...
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
...
938
939
940
941
942
943
944

945
946
947
948
949
950

951
952

953
954
955
956
957
958
959
....
1270
1271
1272
1273
1274
1275
1276
1277



1278
1279
1280
1281
1282
1283
1284
....
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
....
3506
3507
3508
3509
3510
3511
3512

3513
3514

3515
3516
3517
3518

3519
3520
3521
3522
3523
3524
3525
....
4450
4451
4452
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
static void btreeCursorFree(BtreeCursor *pCsr){
  if( pCsr ){
    int i;
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    for(i=0; i<=pCsr->iPg; i++){
      lsmFsPageRelease(pCsr->aPg[i].pPage);
    }

    lsmFree(pEnv, pCsr->aPg);
    lsmFree(pEnv, pCsr);
  }
}

static int btreeCursorFirst(BtreeCursor *pCsr){
  int rc;
................................................................................
          int iMax;
          int iCell;

          aData = lsmFsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData)-1; 
          iMax = iCell;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            int iPtr;                     /* Pointer for cell iTry */
................................................................................
      rc = lsmFsDbPageGet(pFS, pRun->iFirst, &pPg);
    }

    while( rc==LSM_OK ){
      Page *pNext;
      u8 *aData;
      int nData;


      rc = lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
      if( pPg==0 ) break;
      aData = lsmFsPageData(pPg, &nData);

      if( rc==LSM_OK 
       && 0==(SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData)) 

       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
................................................................................
  int bLast,                      /* True for last, false for first */
  int *pRc                        /* IN/OUT error code */
){
  if( *pRc==LSM_OK ){
    int rc = LSM_OK;

    segmentPtrEndPage(pCsr->pFS, pPtr, bLast, &rc);
    while( rc==LSM_OK && pPtr->pPg && pPtr->nCell==0 ){



      rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
    }
    if( rc==LSM_OK && pPtr->pPg ){
      rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    }

    if( rc==LSM_OK && pPtr->pPg && (
................................................................................
  **
  **   * If currently writing the separators array, push a copy of the key
  **     into the b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && pRun->iFirst!=pRun->iLast ){
    assert( pMerge->nSkip>=0 );

    if( pMW->bFlush==0 || bSep==0 ){
      Pgno iPg = lsmFsPageNumber(pPg);
      rc = mergeWorkerPushHierarchy(pMW, bSep, iPg, rtTopic(eType), pKey, nKey);
    }
    if( bSep==0 ){
      if( pMerge->nSkip ){
        pMerge->nSkip--;
        flags = PGFTR_SKIP_THIS_FLAG;
................................................................................
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }


    assert( LSM_OK==assertBtreeOk(pDb, &pSeg->run) );
    if( pMW->pCsr->pBtCsr ){

      assertPointersOk(
          pDb, &pMW->pLevel->lhs.run, &pMW->pLevel->pNext->lhs.run, 0, 0
      );
    }


    mergeWorkerShutdown(pMW);
  }
  return rc;
}

static int mergeWorkerDone(MergeWorker *pMW){
................................................................................

  lsmDbSnapshotRelease(pDb->pEnv, pRelease);
  return rc;
}

void sortedDumpSegment(lsm_db *pDb, SortedRun *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun ){
    char *zSeg;
    Page *pPg;

    zSeg = segToString(pDb->pEnv, pRun, 0);
    lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
    lsmFree(pDb->pEnv, zSeg);








>







 







|
|







 







>






>

<
>







 







|
>
>
>







 







|







 







>


>




>







 







|







693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
...
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
...
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954

955
956
957
958
959
960
961
962
....
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
....
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
....
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
....
4459
4460
4461
4462
4463
4464
4465
4466
4467
4468
4469
4470
4471
4472
4473
static void btreeCursorFree(BtreeCursor *pCsr){
  if( pCsr ){
    int i;
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    for(i=0; i<=pCsr->iPg; i++){
      lsmFsPageRelease(pCsr->aPg[i].pPage);
    }
    sortedBlobFree(&pCsr->blob);
    lsmFree(pEnv, pCsr->aPg);
    lsmFree(pEnv, pCsr);
  }
}

static int btreeCursorFirst(BtreeCursor *pCsr){
  int rc;
................................................................................
          int iMax;
          int iCell;

          aData = lsmFsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData); 
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            int iPtr;                     /* Pointer for cell iTry */
................................................................................
      rc = lsmFsDbPageGet(pFS, pRun->iFirst, &pPg);
    }

    while( rc==LSM_OK ){
      Page *pNext;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
      if( pPg==0 ) break;
      aData = lsmFsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( rc==LSM_OK 

       && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
................................................................................
  int bLast,                      /* True for last, false for first */
  int *pRc                        /* IN/OUT error code */
){
  if( *pRc==LSM_OK ){
    int rc = LSM_OK;

    segmentPtrEndPage(pCsr->pFS, pPtr, bLast, &rc);
    while( rc==LSM_OK 
        && pPtr->pPg 
        && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
    ){
      rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
    }
    if( rc==LSM_OK && pPtr->pPg ){
      rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    }

    if( rc==LSM_OK && pPtr->pPg && (
................................................................................
  **
  **   * If currently writing the separators array, push a copy of the key
  **     into the b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && pRun->iFirst!=pRun->iLast ){
    assert( pMerge->nSkip>=0 );

    if( (bSep && pMW->bFlush==0) || (bSep==0 && pMerge->nSkip==0) ){
      Pgno iPg = lsmFsPageNumber(pPg);
      rc = mergeWorkerPushHierarchy(pMW, bSep, iPg, rtTopic(eType), pKey, nKey);
    }
    if( bSep==0 ){
      if( pMerge->nSkip ){
        pMerge->nSkip--;
        flags = PGFTR_SKIP_THIS_FLAG;
................................................................................
    if( rc==LSM_OK && pMW->bFlush ){
      rc = mergeWorkerBuildHierarchy(pMW);
    }
    if( rc==LSM_OK && pSeg->sep.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep);
    }

#if 0
    assert( LSM_OK==assertBtreeOk(pDb, &pSeg->run) );
    if( pMW->pCsr->pBtCsr ){
      assert( LSM_OK==assertBtreeOk(pDb, &pMW->pLevel->pNext->lhs.run) );
      assertPointersOk(
          pDb, &pMW->pLevel->lhs.run, &pMW->pLevel->pNext->lhs.run, 0, 0
      );
    }
#endif

    mergeWorkerShutdown(pMW);
  }
  return rc;
}

static int mergeWorkerDone(MergeWorker *pMW){
................................................................................

  lsmDbSnapshotRelease(pDb->pEnv, pRelease);
  return rc;
}

void sortedDumpSegment(lsm_db *pDb, SortedRun *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun && pRun->iFirst ){
    char *zSeg;
    Page *pPg;

    zSeg = segToString(pDb->pEnv, pRun, 0);
    lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
    lsmFree(pDb->pEnv, zSeg);