Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Fix an lsm bug causing it to choose the wrong block to reuse. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: | 2ff461b42204376cbfda41046b94e1d8 |
User & Date: | dan 2013-01-17 19:13:34 |
Context
2013-01-18
| ||
10:46 | Truncate the database file when the number of connections drops from one to zero. check-in: dc4fa92596 user: dan tags: trunk | |
2013-01-17
| ||
19:13 | Fix an lsm bug causing it to choose the wrong block to reuse. check-in: 2ff461b422 user: dan tags: trunk | |
12:22 | Truncate away any free blocks at the end of the database file when the system is shutdown (last connection disconnects). check-in: 2351f01937 user: dan tags: trunk | |
Changes
Changes to lsm-test/lsmtest.h.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
** wrapper for Kyoto Cabinet. Kyoto cabinet has a C API, but ** the primary interface is the C++ API. */ int test_kc_open(const char *zFilename, int bClear, TestDb **ppDb); int test_kc_close(TestDb *); int test_kc_write(TestDb *, void *, int , void *, int); int test_kc_delete(TestDb *, void *, int); int test_kc_fetch(TestDb *, void *, int, void **, int *); int test_kc_scan(TestDb *, void *, int, void *, int, void *, int, void (*)(void *, void *, int , void *, int) ); int test_mdb_open(const char *zFilename, int bClear, TestDb **ppDb); int test_mdb_close(TestDb *); |
> |
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
** wrapper for Kyoto Cabinet. Kyoto cabinet has a C API, but
** the primary interface is the C++ API.
*/
int test_kc_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_kc_close(TestDb *);
int test_kc_write(TestDb *, void *, int , void *, int);
int test_kc_delete(TestDb *, void *, int);
int test_kc_delete_range(TestDb *, void *, int, void *, int);
int test_kc_fetch(TestDb *, void *, int, void **, int *);
int test_kc_scan(TestDb *, void *, int, void *, int, void *, int,
void (*)(void *, void *, int , void *, int)
);
int test_mdb_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_mdb_close(TestDb *);
|
Changes to lsm-test/lsmtest_tdb.c.
237
238
239
240
241
242
243
244
245
246
247
248
249
250
...
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
){
return test_kc_write(pTestDb, pKey, nKey, pVal, nVal);
}
static int kc_delete(TestDb *pTestDb, void *pKey, int nKey){
return test_kc_delete(pTestDb, pKey, nKey);
}
static int kc_fetch(
TestDb *pTestDb,
void *pKey,
int nKey,
void **ppVal,
int *pnVal
................................................................................
}
static int kc_open(const char *zFilename, int bClear, TestDb **ppDb){
static const DatabaseMethods KcdbMethods = {
kc_close,
kc_write,
kc_delete,
kc_fetch,
kc_scan,
error_transaction_function,
error_transaction_function,
error_transaction_function
};
|
>
>
>
>
>
>
>
>
>
|
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
...
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
|
){ return test_kc_write(pTestDb, pKey, nKey, pVal, nVal); } static int kc_delete(TestDb *pTestDb, void *pKey, int nKey){ return test_kc_delete(pTestDb, pKey, nKey); } static int kc_delete_range( TestDb *pTestDb, void *pKey1, int nKey1, void *pKey2, int nKey2 ){ return test_kc_delete_range(pTestDb, pKey1, nKey1, pKey2, nKey2); } static int kc_fetch( TestDb *pTestDb, void *pKey, int nKey, void **ppVal, int *pnVal ................................................................................ } static int kc_open(const char *zFilename, int bClear, TestDb **ppDb){ static const DatabaseMethods KcdbMethods = { kc_close, kc_write, kc_delete, kc_delete_range, kc_fetch, kc_scan, error_transaction_function, error_transaction_function, error_transaction_function }; |
Changes to lsm-test/lsmtest_tdb2.cc.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
int test_kc_delete(TestDb *pDb, void *pKey, int nKey){ KcDb *pKcDb = (KcDb *)pDb; int ok; ok = pKcDb->db->remove((const char *)pKey, nKey); return (ok ? 0 : 1); } int test_kc_fetch( TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal |
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
int test_kc_delete(TestDb *pDb, void *pKey, int nKey){ KcDb *pKcDb = (KcDb *)pDb; int ok; ok = pKcDb->db->remove((const char *)pKey, nKey); return (ok ? 0 : 1); } int test_kc_delete_range( TestDb *pDb, void *pKey1, int nKey1, void *pKey2, int nKey2 ){ int res; KcDb *pKcDb = (KcDb *)pDb; kyotocabinet::DB::Cursor* pCur = pKcDb->db->cursor(); if( pKey1 ){ res = pCur->jump((const char *)pKey1, nKey1); }else{ res = pCur->jump(); } while( 1 ){ const char *pKey; size_t nKey; const char *pVal; size_t nVal; pKey = pCur->get(&nKey, &pVal, &nVal); if( pKey==0 ) break; #ifndef NDEBUG if( pKey1 ){ res = memcmp(pKey, pKey1, MIN((size_t)nKey1, nKey)); assert( res>0 || (res==0 && nKey>nKey1) ); } #endif if( pKey2 ){ res = memcmp(pKey, pKey2, MIN((size_t)nKey2, nKey)); if( res>0 || (res==0 && (size_t)nKey2<nKey) ){ delete [] pKey; break; } } pCur->remove(); delete [] pKey; } delete pCur; return 0; } int test_kc_fetch( TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal |
Changes to src/lsm_main.c.
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
int rc;
/* Obtain the worker snapshot */
rc = infoGetWorker(pDb, &pWorker, &bUnlock);
if( rc!=LSM_OK ) return rc;
lsmStringInit(&s, pDb->pEnv);
rc = lsmWalkFreelist(pDb, 1, infoFreelistCb, &s);
if( rc!=LSM_OK ){
lsmFree(pDb->pEnv, s.z);
}else{
*pzOut = s.z;
}
/* Release the snapshot and return */
|
| |
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
int rc;
/* Obtain the worker snapshot */
rc = infoGetWorker(pDb, &pWorker, &bUnlock);
if( rc!=LSM_OK ) return rc;
lsmStringInit(&s, pDb->pEnv);
rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
if( rc!=LSM_OK ){
lsmFree(pDb->pEnv, s.z);
}else{
*pzOut = s.z;
}
/* Release the snapshot and return */
|
Changes to src/lsm_shared.c.
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
...
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
|
struct WalkFreelistCtx { lsm_db *pDb; int bReverse; Freelist *pFreelist; int iFree; int (*xUsr)(void *, int, i64); /* User callback function */ void *pUsrctx; /* User callback context */ }; /* ** Callback used by lsmWalkFreelist(). */ static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx; const int iDir = (p->bReverse ? -1 : 1); Freelist *pFree = p->pFreelist; if( pFree ){ while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){ FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; if( (p->bReverse==0 && pEntry->iBlk>iBlk) || (p->bReverse!=0 && pEntry->iBlk<iBlk) ){ break; }else{ p->iFree += iDir; if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ return 1; } if( pEntry->iBlk==iBlk ) return 0; } } } return p->xUsr(p->pUsrctx, iBlk, iSnapshot); } /* ** The database handle passed as the first argument must be the worker ** connection. This function iterates through the contents of the current ** free block list, invoking the supplied callback once for each list ** element. ................................................................................ if( ctx[0].pFreelist && bReverse ){ ctx[0].iFree = ctx[0].pFreelist->nEntry-1; }else{ ctx[0].iFree = 0; } ctx[0].xUsr = walkFreelistCb; ctx[0].pUsrctx = (void *)&ctx[1]; ctx[1].pDb = pDb; ctx[1].bReverse = bReverse; ctx[1].pFreelist = pDb->pFreelist; if( ctx[1].pFreelist && bReverse ){ ctx[1].iFree = ctx[1].pFreelist->nEntry-1; }else{ ctx[1].iFree = 0; } ctx[1].xUsr = x; ctx[1].pUsrctx = pCtx; rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]); for(iCtx=0; iCtx<2; iCtx++){ int i; WalkFreelistCtx *p = &ctx[iCtx]; for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0; i += iDir ){ FreelistEntry *pEntry = &p->pFreelist->aEntry[i]; if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ return LSM_OK; } } } return rc; } |
>
>
>
|
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
>
|
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
...
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
|
struct WalkFreelistCtx { lsm_db *pDb; int bReverse; Freelist *pFreelist; int iFree; int (*xUsr)(void *, int, i64); /* User callback function */ void *pUsrctx; /* User callback context */ int bDone; /* Set to true after xUsr() returns true */ }; /* ** Callback used by lsmWalkFreelist(). */ static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx; const int iDir = (p->bReverse ? -1 : 1); Freelist *pFree = p->pFreelist; assert( p->bDone==0 ); if( pFree ){ while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){ FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; if( (p->bReverse==0 && pEntry->iBlk>iBlk) || (p->bReverse!=0 && pEntry->iBlk<iBlk) ){ break; }else{ p->iFree += iDir; if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ p->bDone = 1; return 1; } if( pEntry->iBlk==iBlk ) return 0; } } } if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){ p->bDone = 1; return 1; } return 0; } /* ** The database handle passed as the first argument must be the worker ** connection. This function iterates through the contents of the current ** free block list, invoking the supplied callback once for each list ** element. ................................................................................ 
if( ctx[0].pFreelist && bReverse ){ ctx[0].iFree = ctx[0].pFreelist->nEntry-1; }else{ ctx[0].iFree = 0; } ctx[0].xUsr = walkFreelistCb; ctx[0].pUsrctx = (void *)&ctx[1]; ctx[0].bDone = 0; ctx[1].pDb = pDb; ctx[1].bReverse = bReverse; ctx[1].pFreelist = pDb->pFreelist; if( ctx[1].pFreelist && bReverse ){ ctx[1].iFree = ctx[1].pFreelist->nEntry-1; }else{ ctx[1].iFree = 0; } ctx[1].xUsr = x; ctx[1].pUsrctx = pCtx; ctx[1].bDone = 0; rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]); if( ctx[0].bDone==0 ){ for(iCtx=0; iCtx<2; iCtx++){ int i; WalkFreelistCtx *p = &ctx[iCtx]; for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0; i += iDir ){ FreelistEntry *pEntry = &p->pFreelist->aEntry[i]; if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ return LSM_OK; } } } } return rc; } |
Changes to src/lsm_sorted.c.
1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 .... 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 .... 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 |
return 1; } #endif static int segmentPtrSearchOversized( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ void *pKey, int nKey /* Key to seek to */ ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int rc = LSM_OK; /* If the OVERSIZED flag is set, then there is no pointer in the ** upper level to the next page in the segment that contains at least ................................................................................ pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey ); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ pNext = pPtr->pPg; lsmFsPageRef(pNext); while( 1 ){ ................................................................................ int iMax; Pgno iPtrOut = 0; /* If the current page contains an oversized entry, then there are no ** pointers to one or more of the subsequent pages in the sorted run. ** The following call ensures that the segment-ptr points to the correct ** page in this case. */ rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey); iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all ** the keys it houses. |
> | | |
1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 .... 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 .... 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 |
return 1; } #endif static int segmentPtrSearchOversized( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ int iTopic, /* Topic of key to search for */ void *pKey, int nKey /* Key to seek to */ ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int rc = LSM_OK; /* If the OVERSIZED flag is set, then there is no pointer in the ** upper level to the next page in the segment that contains at least ................................................................................ pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey ); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ pNext = pPtr->pPg; lsmFsPageRef(pNext); while( 1 ){ ................................................................................ int iMax; Pgno iPtrOut = 0; /* If the current page contains an oversized entry, then there are no ** pointers to one or more of the subsequent pages in the sorted run. ** The following call ensures that the segment-ptr points to the correct ** page in this case. */ rc = segmentPtrSearchOversized(pCsr, pPtr, iTopic, pKey, nKey); iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all ** the keys it houses. |