Index: lsm-test/README ================================================================== --- lsm-test/README +++ lsm-test/README @@ -26,9 +26,14 @@ The difference from lsmtest2.c is that this file tests live-recovery (recovery from a failure that occurs while other clients are still running) whereas lsmtest2.c tests recovery from a system or power failure. + + lsmtest9.c: More data tests. These focus on testing that calling + lsm_work(nMerge=1) to compact the database does not corrupt it. + In other words, that databases containing block-redirects + can be read and written. Index: lsm-test/lsmtest.h ================================================================== --- lsm-test/lsmtest.h +++ lsm-test/lsmtest.h @@ -217,10 +217,13 @@ void test_data_3(const char *, const char *, int *pRc); void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *); void testCaseProgress(int, int, int, int *); int testCaseNDot(void); +void testCompareDb(Datasource *, int, int, TestDb *, TestDb *, int *); +int testControlDb(TestDb **ppDb); + typedef struct CksumDb CksumDb; CksumDb *testCksumArrayNew(Datasource *, int, int, int); char *testCksumArrayGet(CksumDb *, int); void testCksumArrayFree(CksumDb *); void testCaseStart(int *pRc, char *zFmt, ...); @@ -232,10 +235,13 @@ int testCksumDatabase(TestDb *pDb, char *zOut); int testCountDatabase(TestDb *pDb); void testCompareInt(int, int, int *); void testCompareStr(const char *z1, const char *z2, int *pRc); +/* lsmtest9.c */ +void test_data_4(const char *, const char *, int *pRc); + /* ** Similar to the Tcl_GetIndexFromObjStruct() Tcl library function. 
*/ #define testArgSelect(w,x,y,z) testArgSelectX(w,x,sizeof(w[0]),y,z) Index: lsm-test/lsmtest1.c ================================================================== --- lsm-test/lsmtest1.c +++ lsm-test/lsmtest1.c @@ -90,11 +90,11 @@ ); testFree(zData); return zRet; } -static int testControlDb(TestDb **ppDb){ +int testControlDb(TestDb **ppDb){ #ifdef HAVE_KYOTOCABINET return tdb_open("kyotocabinet", "tmp.db", 1, ppDb); #else return tdb_open("sqlite3", ":memory:", 1, ppDb); #endif @@ -346,11 +346,11 @@ } testFree(zName); } } -static void testCompareDb( +void testCompareDb( Datasource *pData, int nData, int iSeed, TestDb *pControl, TestDb *pDb, ADDED lsm-test/lsmtest9.c Index: lsm-test/lsmtest9.c ================================================================== --- /dev/null +++ lsm-test/lsmtest9.c @@ -0,0 +1,130 @@ + +#include "lsmtest.h" + +#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE +#define DATA_RANDOM TEST_DATASOURCE_RANDOM + +typedef struct Datatest4 Datatest4; + +/* +** Test overview: +** +** 1. Insert (Datatest4.nRec) records into a database. +** +** 2. Repeat (Datatest4.nRepeat) times: +** +** 2a. Delete 2/3 of the records in the database. +** +** 2b. Run lsm_work(nMerge=1). +** +** 2c. Insert as many records as were deleted in 2a. +** +** 2d. Check database content is as expected. +** +** 2e. If (Datatest4.bReopen) is true, close and reopen the database. +*/ +struct Datatest4 { + /* Datasource definition */ + DatasourceDefn defn; + + int nRec; + int nRepeat; + int bReopen; +}; + +static void doDataTest4( + const char *zSystem, /* Database system to test */ + Datatest4 *p, /* Structure containing test parameters */ + int *pRc /* OUT: Error code */ +){ + lsm_db *db = 0; + TestDb *pDb; + TestDb *pControl; + Datasource *pData; + int i; + int rc = 0; + int iDot = 0; + + int nRecOn3 = (p->nRec / 3); + int iData = 0; + + /* Start the test case, open a database and allocate the datasource. 
*/ + rc = testControlDb(&pControl); + pDb = testOpen(zSystem, 1, &rc); + pData = testDatasourceNew(&p->defn); + if( rc==0 ) db = tdb_lsm(pDb); + + testWriteDatasourceRange(pControl, pData, iData, nRecOn3*3, &rc); + testWriteDatasourceRange(pDb, pData, iData, nRecOn3*3, &rc); + + for(i=0; rc==0 && i<p->nRepeat; i++){ + + testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc); + testDeleteDatasourceRange(pDb, pData, iData, nRecOn3*2, &rc); + + if( db ){ + int nDone; + do { + nDone = 0; + rc = lsm_work(db, 1, 100000, &nDone); + }while( rc==0 && nDone>0 ); + } + + iData += (nRecOn3*2); + testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc); + testWriteDatasourceRange(pDb, pData, iData+nRecOn3, nRecOn3*2, &rc); + + testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc); + + /* If Datatest4.bReopen is true, close and reopen the database */ + if( p->bReopen ){ + testReopen(&pDb, &rc); + if( rc==0 ) db = tdb_lsm(pDb); + } + + /* Update the progress dots... */ + testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot); + } + + testClose(&pDb); + testClose(&pControl); + testDatasourceFree(pData); + testCaseFinish(rc); + *pRc = rc; +} + +static char *getName4(const char *zSystem, Datatest4 *pTest){ + char *zRet; + char *zData; + zData = testDatasourceName(&pTest->defn); + zRet = testMallocPrintf("data4.%s.%s.%d.%d.%d", + zSystem, zData, pTest->nRec, pTest->nRepeat, pTest->bReopen + ); + testFree(zData); + return zRet; +} + +void test_data_4( + const char *zSystem, /* Database system name */ + const char *zPattern, /* Run test cases that match this pattern */ + int *pRc /* IN/OUT: Error code */ +){ + Datatest4 aTest[] = { + /* defn, nRec, nRepeat, bReopen */ + { {DATA_RANDOM, 20,25, 100,200}, 10000, 10, 0 }, + { {DATA_RANDOM, 20,25, 100,200}, 10000, 10, 1 }, + }; + + int i; + + for(i=0; *pRc==LSM_OK && inBlocksize / pFS->nPagesize); int iBlk = fsPageToBlock(pFS, iPg); @@ -1442,10 +1442,15 @@ break; } rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk); 
iBlk = iNext; } + + if( pDel->pRedirect ){ + assert( pDel->pRedirect==&pSnapshot->redirect ); + pSnapshot->redirect.n = 0; + } if( bZero ) memset(pDel, 0, sizeof(Segment)); } return LSM_OK; } @@ -1613,11 +1618,11 @@ }else{ iPg--; } }else{ if( pRun ){ - Pgno iLast = fsRedirectPage(pFS, pRedir, pRun->iLastPg); + Pgno iLast = lsmFsRedirectPage(pFS, pRedir, pRun->iLastPg); if( iPg==iLast ){ *ppNext = 0; return LSM_OK; } } @@ -2612,12 +2617,12 @@ int iLastBlk; /* Last real block of segment */ int bLastIsLastOnBlock; /* True iLast is the last on its block */ iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst)); iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg)); - iLast = fsRedirectPage(pFS, pRedir, pSeg->iLastPg); - iFirst = fsRedirectPage(pFS, pRedir, pSeg->iFirst); + iLast = lsmFsRedirectPage(pFS, pRedir, pSeg->iLastPg); + iFirst = lsmFsRedirectPage(pFS, pRedir, pSeg->iFirst); bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast); assert( iBlk>0 ); /* If the first page of this run is also the first page of its first Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -835,10 +835,11 @@ if( p->iPg ){ lsm_env *pEnv = lsmFsEnv(pCsr->pFS); int iCell; /* Current cell number on leaf page */ Pgno iLeaf; /* Page number of current leaf page */ int nDepth; /* Depth of b-tree structure */ + Segment *pSeg = pCsr->pSeg; /* Decode the MergeInput structure */ iLeaf = p->iPg; nDepth = (p->iCell & 0x00FF); iCell = (p->iCell >> 8) - 1; @@ -851,21 +852,21 @@ if( rc==LSM_OK ){ Page **pp = &pCsr->aPg[nDepth-1].pPage; pCsr->iPg = nDepth-1; pCsr->nDepth = nDepth; pCsr->aPg[pCsr->iPg].iCell = iCell; - rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLeaf, pp); + rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp); } /* Populate any other aPg[] array entries */ if( rc==LSM_OK && nDepth>1 ){ Blob blob = {0,0,0}; void *pSeek; int nSeek; int iTopicSeek; int iPg = 0; - int iLoad = 
pCsr->pSeg->iRoot; + int iLoad = pSeg->iRoot; Page *pPg = pCsr->aPg[nDepth-1].pPage; if( pageObjGetNRec(pPg)==0 ){ /* This can happen when pPg is the right-most leaf in the b-tree. ** In this case, set the iTopicSeek/pSeek/nSeek key to a value @@ -874,18 +875,18 @@ iTopicSeek = 1000; pSeek = 0; nSeek = 0; }else{ Pgno dummy; - rc = pageGetBtreeKey(pCsr->pSeg, pPg, + rc = pageGetBtreeKey(pSeg, pPg, 0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob ); } do { Page *pPg; - rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pPg); + rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg); assert( rc==LSM_OK || pPg==0 ); if( rc==LSM_OK ){ u8 *aData; /* Buffer containing page data */ int nData; /* Size of aData[] in bytes */ int iMin; @@ -906,11 +907,11 @@ int iTopic; /* Topic for key pKeyT/nKeyT */ Pgno iPtr; /* Pointer for cell iTry */ int res; /* (pSeek - pKeyT) */ rc = pageGetBtreeKey( - pCsr->pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob + pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob ); if( rc!=LSM_OK ) break; res = sortedKeyCompare( xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey @@ -927,11 +928,13 @@ } pCsr->aPg[iPg].pPage = pPg; pCsr->aPg[iPg].iCell = iCell; iPg++; - assert( iPg!=nDepth-1 || iLoad==iLeaf ); + assert( iPg!=nDepth-1 + || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf + ); } }while( rc==LSM_OK && iPg<(nDepth-1) ); sortedBlobFree(&blob); } @@ -949,11 +952,11 @@ int i; for(i=pCsr->iPg-1; i>=0; i--){ if( pCsr->aPg[i].iCell>0 ) break; } assert( i>=0 ); - rc = pageGetBtreeKey(pCsr->pSeg, + rc = pageGetBtreeKey(pSeg, pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); pCsr->eType |= LSM_SEPARATOR; @@ -4944,33 +4947,31 @@ } nRem -= nPg; if( nPg ) bDirty = 1; } - if( rc==LSM_OK && bDirty ){ - lsmFinishWork(pDb, 0, &rc); - }else{ - int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, &rcdummy); - } - assert( pDb->pWorker==0 ); - if( rc==LSM_OK ){ *pnWrite = (nMax - nRem); *pbCkpt = 
(bCkpt && nRem<=0); - if( nMerge==1 && pDb->nAutockpt>0 && bDirty + if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0 && pWorker->pLevel && pWorker->pLevel->nRight==0 && pWorker->pLevel->pNext==0 ){ *pbCkpt = 1; } + } + + if( rc==LSM_OK && bDirty ){ + lsmFinishWork(pDb, 0, &rc); }else{ + int rcdummy = LSM_BUSY; + lsmFinishWork(pDb, 0, &rcdummy); *pnWrite = 0; *pbCkpt = 0; } - + assert( pDb->pWorker==0 ); return rc; } static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){ int rc;