/*
** Test-harness wrapper that exposes an LSM database through the generic
** TestDb interface. It also provides:
**
**   * a wrapper VFS (lsm_env) used to simulate application and system
**     crashes and to log write/sync timings, and
**   * optional background checkpointer/worker threads (pthreads builds).
*/
#include "lsmtest_tdb.h"
#include "lsm.h"
#include "lsmtest.h"

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifndef _WIN32
# include <unistd.h>
#endif
#include <stdio.h>

#ifndef _WIN32
# include <sys/time.h>
#endif

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;

#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)

#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3

/*
** There are several different types of worker threads that run in different
** test configurations, depending on the value of LsmWorker.eType.
**
**   LSMTEST_THREAD_CKPT:       Checkpointer.
**   LSMTEST_THREAD_WORKER:     Worker without auto-checkpoint (the worker
**                              connection is configured "autocheckpoint=0").
**   LSMTEST_THREAD_WORKER_AC:  Worker with auto-checkpoint.
*/
struct LsmWorker {
  LsmDb *pDb;                     /* Main database structure */
  lsm_db *pWorker;                /* Worker database handle */
  pthread_t worker_thread;        /* Worker thread */
  pthread_cond_t worker_cond;     /* Condition var the worker waits on */
  pthread_mutex_t worker_mutex;   /* Mutex used with worker_cond */
  int bDoWork;                    /* Set to true by client when there is work */
  int worker_rc;                  /* Store error code here */
  int eType;                      /* LSMTEST_THREAD_XXX constant */
  int bBlock;
};
#else
struct LsmWorker { int worker_rc; int bBlock; };
#endif

static void mt_shutdown(LsmDb *);

/*
** Return a pointer to a process-wide copy of the default LSM environment.
** The copy is made on the first call. All wrapper VFS methods below forward
** to the methods of this object.
*/
lsm_env *tdb_lsm_env(void){
  static int bInit = 0;
  static lsm_env env;
  if( bInit==0 ){
    memcpy(&env, lsm_default_env(), sizeof(env));
    bInit = 1;
  }
  return &env;
}

typedef struct FileSector FileSector;
typedef struct FileData FileData;

struct FileSector {
  u8 *aOld;                       /* Old data for this sector */
};

struct FileData {
  int nSector;                    /* Allocated size of aSector[] array */
  FileSector *aSector;            /* Array of file sectors */
};

/*
** bPrepareCrash:
**   If non-zero, the file wrappers maintain enough in-memory data to
**   simulate the effect of a power-failure on the file-system (i.e. that
**   unsynced sectors may be written, not written, or overwritten with
**   arbitrary data when the crash occurs).
**
** bCrashed:
**   Set to true after a crash is simulated. Once this variable is true, all
**   VFS methods other than xClose() return LSM_IOERR as soon as they are
**   called (without affecting the contents of the file-system).
**
** env:
**   The environment object used by all lsm_db* handles opened by this
**   object (i.e. LsmDb.db plus any worker connections). Variable env.pVfsCtx
**   always points to the containing LsmDb structure.
*/
struct LsmDb {
  TestDb base;                    /* Base class - methods table */
  lsm_env env;                    /* Environment used by connection db */
  char *zName;                    /* Database file name */
  lsm_db *db;                     /* LSM database handle */

  lsm_cursor *pCsr;               /* Cursor held open during read transaction */
  void *pBuf;                     /* Buffer for tdb_fetch() output */
  int nBuf;                       /* Allocated (not used) size of pBuf */

  /* Crash testing related state */
  int bCrashed;                   /* True once a crash has occurred */
  int nAutoCrash;                 /* Number of syncs until a crash */
  int bPrepareCrash;              /* True to store writes in memory */

  /* Unsynced data (while crash testing) */
  int szSector;                   /* Assumed sector size (256 by default) */
  FileData aFile[2];              /* Database and log file data */

  /* Other test instrumentation */
  int bNoRecovery;                /* If true, assume DMS2 is locked */

  /* Work hook redirection */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;

  /* IO logging hook */
  void (*xWriteHook)(void *, int, lsm_i64, int, int);
  void *pWriteCtx;

  /* Worker threads (for lsm_mt) */
  int nMtMinCkpt;
  int nMtMaxCkpt;
  int eMode;
  int nWorker;
  LsmWorker *aWorker;
};

#define LSMTEST_MODE_SINGLETHREAD    1
#define LSMTEST_MODE_BACKGROUND_CKPT 2
#define LSMTEST_MODE_BACKGROUND_WORK 3
#define LSMTEST_MODE_BACKGROUND_BOTH 4

/*************************************************************************
**************************************************************************
** Begin test VFS code.
*/

struct LsmFile {
  lsm_file *pReal;                /* Real underlying file */
  int bLog;                       /* True for log file. False for db file */
  LsmDb *pDb;                     /* Database handle that uses this file */
};

/*
** xFullpath method. Forwards directly to the real environment.
*/
static int testEnvFullpath(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Relative path name */
  char *zOut,                     /* Output buffer */
  int *pnOut                      /* IN/OUT: Size of output buffer */
){
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xFullpath(pRealEnv, zFile, zOut, pnOut);
}

/*
** xOpen method. Allocates an LsmFile wrapper around the real file handle.
** The file is classified as a log file if its name ends in "-log". On
** failure *ppFile is set to NULL and the real xOpen error code returned.
*/
static int testEnvOpen(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Name of file to open */
  int flags,
  lsm_file **ppFile               /* OUT: New file handle object */
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmDb *pDb = (LsmDb *)pEnv->pVfsCtx;
  int rc;                         /* Return Code */
  LsmFile *pRet;                  /* The new file handle */
  int nFile;                      /* Length of string zFile in bytes */

  nFile = strlen(zFile);
  pRet = (LsmFile *)testMalloc(sizeof(LsmFile));
  pRet->pDb = pDb;
  pRet->bLog = (nFile > 4 && 0==memcmp("-log", &zFile[nFile-4], 4));

  rc = pRealEnv->xOpen(pRealEnv, zFile, flags, &pRet->pReal);
  if( rc!=LSM_OK ){
    testFree(pRet);
    pRet = 0;
  }

  *ppFile = (lsm_file *)pRet;
  return rc;
}

/*
** xRead method. Fails with LSM_IOERR once a crash has been simulated.
*/
static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  if( p->pDb->bCrashed ) return LSM_IOERR;
  return pRealEnv->xRead(p->pReal, iOff, pData, nData);
}

/*
** xWrite method.
**
** If crash preparation is enabled, then before the write is performed the
** current on-disk content of every sector it touches (and that has not
** already been saved since the last sync) is read into memory, so that
** doSystemCrash() can later restore or corrupt it.
**
** If a write-hook is registered, the real write is timed and the hook is
** invoked with the elapsed time in microseconds.
*/
static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  LsmDb *pDb = p->pDb;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->bPrepareCrash ){
    FileData *pData2 = &pDb->aFile[p->bLog];
    int iFirst;
    int iLast;
    int iSector;

    iFirst = (int)(iOff / pDb->szSector);
    iLast =  (int)((iOff + nData - 1) / pDb->szSector);

    /* Grow the sector array (in multiples of 64 entries) if required. */
    if( pData2->nSector<(iLast+1) ){
      int nNew = ( ((iLast + 1) + 63) / 64 ) * 64;
      assert( nNew>iLast );
      pData2->aSector = (FileSector *)testRealloc(
          pData2->aSector, nNew*sizeof(FileSector)
      );
      memset(&pData2->aSector[pData2->nSector],
          0, (nNew - pData2->nSector) * sizeof(FileSector)
      );
      pData2->nSector = nNew;
    }

    /* Save the pre-image of each sector not already saved. The return
    ** value of xRead() is deliberately not checked here. */
    for(iSector=iFirst; iSector<=iLast; iSector++){
      if( pData2->aSector[iSector].aOld==0 ){
        u8 *aOld = (u8 *)testMalloc(pDb->szSector);
        pRealEnv->xRead(
            p->pReal, (lsm_i64)iSector*pDb->szSector, aOld, pDb->szSector
        );
        pData2->aSector[iSector].aOld = aOld;
      }
    }
  }

  if( pDb->xWriteHook ){
    int rc;
    int nUs;
    struct timeval t1;
    struct timeval t2;

    gettimeofday(&t1, 0);
    assert( nData>0 );
    rc = pRealEnv->xWrite(p->pReal, iOff, pData, nData);
    gettimeofday(&t2, 0);

    nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, p->bLog, iOff, nData, nUs);
    return rc;
  }

  return pRealEnv->xWrite(p->pReal, iOff, pData, nData);
}

static void doSystemCrash(LsmDb *pDb);

/*
** xSync method.
**
** If an automatic crash is scheduled (nAutoCrash), the counter is
** decremented and, when it reaches zero, a system crash is simulated and
** LSM_IOERR returned instead of syncing. Otherwise any saved sector
** pre-images for this file are discarded and the real sync performed
** (timed and reported through the write-hook, with offset and size 0, if
** one is registered).
*/
static int testEnvSync(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  LsmDb *pDb = p->pDb;
  FileData *pData = &pDb->aFile[p->bLog];
  int i;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->nAutoCrash ){
    pDb->nAutoCrash--;
    if( pDb->nAutoCrash==0 ){
      doSystemCrash(pDb);
      pDb->bCrashed = 1;
      return LSM_IOERR;
    }
  }

  if( pDb->bPrepareCrash ){
    for(i=0; i<pData->nSector; i++){
      testFree(pData->aSector[i].aOld);
      pData->aSector[i].aOld = 0;
    }
  }

  if( pDb->xWriteHook ){
    int rc;
    int nUs;
    struct timeval t1;
    struct timeval t2;

    gettimeofday(&t1, 0);
    rc = pRealEnv->xSync(p->pReal);
    gettimeofday(&t2, 0);

    nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, p->bLog, 0, 0, nUs);
    return rc;
  }

  return pRealEnv->xSync(p->pReal);
}

/*
** xTruncate method. Fails with LSM_IOERR once a crash has been simulated.
*/
static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  if( p->pDb->bCrashed ) return LSM_IOERR;
  return pRealEnv->xTruncate(p->pReal, iOff);
}

/*
** xSectorSize method. Forwards to the real file.
*/
static int testEnvSectorSize(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xSectorSize(p->pReal);
}

/*
** xRemap method. Forwards to the real file.
*/
static int testEnvRemap(
  lsm_file *pFile,
  lsm_i64 iMin,
  void **ppOut,
  lsm_i64 *pnOut
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xRemap(p->pReal, iMin, ppOut, pnOut);
}

/*
** xFileid method. Forwards to the real file.
*/
static int testEnvFileid(
  lsm_file *pFile,
  void *ppOut,
  int *pnOut
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xFileid(p->pReal, ppOut, pnOut);
}

/*
** xClose method. Closes the real file and frees the wrapper. Always
** returns LSM_OK (the result of the real xClose is not propagated).
*/
static int testEnvClose(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;

  pRealEnv->xClose(p->pReal);
  testFree(p);
  return LSM_OK;
}

/*
** xUnlink method. Forwards to the real environment.
*/
static int testEnvUnlink(lsm_env *pEnv, const char *zFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  unused_parameter(pEnv);
  return pRealEnv->xUnlink(pRealEnv, zFile);
}

/*
** xLock method. If the bNoRecovery flag is set, a request for an
** exclusive lock on lock 2 fails with LSM_BUSY without touching the
** real file. All other requests are forwarded.
*/
static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();

  if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
    return LSM_BUSY;
  }
  return pRealEnv->xLock(p->pReal, iLock, eType);
}

/*
** xTestLock method. Applies the same bNoRecovery rule as testEnvLock().
*/
static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();

  if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
    return LSM_BUSY;
  }
  return pRealEnv->xTestLock(p->pReal, iLock, nLock, eType);
}

/*
** xShmMap method. Forwards to the real file.
*/
static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
}

/*
** xShmBarrier method. This wrapper is a no-op; it does not forward to the
** real environment.
*/
static void testEnvShmBarrier(void){
}

/*
** xShmUnmap method. Forwards to the real file.
*/
static int testEnvShmUnmap(lsm_file *pFile, int bDel){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmUnmap(p->pReal, bDel);
}

/*
** xSleep method. Forwards to the real environment.
*/
static int testEnvSleep(lsm_env *pEnv, int us){
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xSleep(pRealEnv, us);
}

/*
** Simulate the effect of a power failure on the database and log files.
**
** For each sector with a saved pre-image (i.e. written since the last
** sync), one of three things is chosen pseudo-randomly:
**
**   0: leave the sector as it is (the new data was written),
**   1: overwrite the sector with pseudo-random data,
**   2: restore the pre-image (the new data was not written).
**
** All saved pre-images are freed. The files are accessed through the real
** environment, so the bCrashed flag does not block these writes.
*/
static void doSystemCrash(LsmDb *pDb){
  lsm_env *pEnv = tdb_lsm_env();
  int iFile;
  int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector;

  /* Name of the log file. Allocated exactly once and freed below. */
  char *zLog = sqlite3_mprintf("%s-log", pDb->zName);

  for(iFile=0; iFile<2; iFile++){
    const char *zFile = (iFile==0 ? pDb->zName : zLog);
    lsm_file *pFile = 0;
    int i;

    /* If the file cannot be opened pFile stays NULL. The pre-images are
    ** still released, but nothing is written. */
    if( zFile ){
      if( pEnv->xOpen(pEnv, zFile, 0, &pFile)!=LSM_OK ) pFile = 0;
    }

    for(i=0; i<pDb->aFile[iFile].nSector; i++){
      u8 *aOld = pDb->aFile[iFile].aSector[i].aOld;
      if( aOld ){
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
            /* Fall-through */

          case 2:
            if( pFile ){
              pEnv->xWrite(
                  pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
              );
            }
            break;
        }
        testFree(aOld);
        pDb->aFile[iFile].aSector[i].aOld = 0;
      }
    }
    if( pFile ) pEnv->xClose(pFile);
  }

  sqlite3_free(zLog);
}
/*
** End test VFS code.
**************************************************************************
*************************************************************************/

/*************************************************************************
**************************************************************************
** Begin test compression hooks.
*/

#ifdef HAVE_ZLIB
#include <zlib.h>

/* xBound hook: upper bound on the compressed size of nSrc input bytes. */
static int testZipBound(void *pCtx, int nSrc){
  return compressBound(nSrc);
}

/* xCompress hook: wrapper around zlib compress(). */
static int testZipCompress(
  void *pCtx,                     /* Context pointer */
  char *aOut, int *pnOut,         /* OUT: Buffer containing compressed data */
  const char *aIn, int nIn        /* Buffer containing input data */
){
  uLongf n = *pnOut;              /* In/out buffer size for compress() */
  int rc;                         /* compress() return code */

  rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
  *pnOut = n;
  return (rc==Z_OK ? 0 : LSM_ERROR);
}

/* xUncompress hook: wrapper around zlib uncompress(). */
static int testZipUncompress(
  void *pCtx,                     /* Context pointer */
  char *aOut, int *pnOut,         /* OUT: Buffer containing uncompressed data */
  const char *aIn, int nIn        /* Buffer containing input data */
){
  uLongf n = *pnOut;              /* In/out buffer size for uncompress() */
  int rc;                         /* uncompress() return code */

  rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
  *pnOut = n;
  return (rc==Z_OK ? 0 : LSM_ERROR);
}

/* Install the zlib hooks above (compression id 1) on connection pDb. */
static int testConfigureCompression(lsm_db *pDb){
  static lsm_compress zip = {
    0,                            /* Context pointer (unused) */
    1,                            /* Id value */
    testZipBound,                 /* xBound method */
    testZipCompress,              /* xCompress method */
    testZipUncompress             /* xUncompress method */
  };
  return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip);
}
#endif /* ifdef HAVE_ZLIB */

/*
** End test compression hooks.
**************************************************************************
*************************************************************************/

/*
** TestDb xClose method. Closes the read cursor and database handle, stops
** any worker threads and releases all memory owned by the LsmDb object.
** Returns the first non-OK error code recorded by a worker thread, or
** LSM_OK.
*/
static int test_lsm_close(TestDb *pTestDb){
  int i;
  int iFile;
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);

  /* If this is a multi-threaded database, wait on the worker threads. */
  mt_shutdown(pDb);
  for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){
    rc = pDb->aWorker[i].worker_rc;
  }

  /* Release any sector pre-images still held for crash simulation. */
  for(iFile=0; iFile<2; iFile++){
    for(i=0; i<pDb->aFile[iFile].nSector; i++){
      testFree(pDb->aFile[iFile].aSector[i].aOld);
    }
    testFree(pDb->aFile[iFile].aSector);
  }

  /* Free everything the structure points to BEFORE overwriting it. The
  ** 0x11 fill is presumably there to make use-after-close easier to spot. */
  testFree(pDb->aWorker);
  testFree((char *)pDb->pBuf);
  memset(pDb, 0x11, sizeof(LsmDb));
  testFree((char *)pDb);
  return rc;
}

#ifdef LSM_MUTEX_PTHREADS
static void mt_signal_worker(LsmDb*, int);
#endif

/*
** Block (polling every 5ms) until the checkpoint size reported for
** connection db drops below pDb->nMtMaxCkpt, or lsm_info() fails. In
** pthreads builds the checkpointer thread is signalled on each iteration:
** worker 0 in BACKGROUND_CKPT mode, worker 1 otherwise.
*/
static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int nSleep = 0;
  int nKB;
  int rc;

  do {
    nKB = 0;
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
    if( rc!=LSM_OK || nKB<pDb->nMtMaxCkpt ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb,
        (pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ? 0 : 1)
    );
#endif
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

/*
** Block (polling every 5ms) while there is an old in-memory tree and the
** new tree has reached half of the autoflush limit. In pthreads builds
** worker 0 is signalled on each iteration.
*/
static int waitOnWorker(LsmDb *pDb){
  int rc;
  int nLimit = -1;
  int nSleep = 0;

  /* Passing -1 queries the current autoflush setting into nLimit. */
  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
  do {
    int nOld, nNew, rc2;
    rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
    if( rc2!=LSM_OK ) return rc2;
    if( nOld==0 || nNew<(nLimit/2) ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb, 0);
#endif
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

/*
** TestDb xWrite method. In the background modes the caller is first
** throttled until the relevant background thread has caught up.
*/
static int test_lsm_write(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void *pVal,
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;

  if( pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ){
    rc = waitOnCheckpointer(pDb, pDb->db);
  }else if(
      pDb->eMode==LSMTEST_MODE_BACKGROUND_WORK
   || pDb->eMode==LSMTEST_MODE_BACKGROUND_BOTH
  ){
    rc = waitOnWorker(pDb);
  }

  if( rc==LSM_OK ){
    rc = lsm_insert(pDb->db, pKey, nKey, pVal, nVal);
  }
  return rc;
}

/* TestDb xDelete method. */
static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete(pDb->db, pKey, nKey);
}

/* TestDb xDeleteRange method. */
static int test_lsm_delete_range(
  TestDb *pTestDb,
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2);
}

/*
** TestDb xFetch method.
**
** If the key is found, *ppVal is set to point to a copy of the value held
** in pDb->pBuf (valid until the next fetch or close) and *pnVal to its
** size. If the key is not found, *ppVal is set to NULL and *pnVal to -1.
** If an error occurs the outputs are left unmodified. A NULL pKey is a
** no-op that returns LSM_OK.
*/
static int test_lsm_fetch(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void **ppVal,
  int *pnVal
){
  int rc;
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;

  if( pKey==0 ) return LSM_OK;

  rc = lsm_csr_open(pDb->db, &csr);
  if( rc!=LSM_OK ) return rc;

  rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ);
  if( rc==LSM_OK ){
    if( lsm_csr_valid(csr) ){
      const void *pVal = 0;
      int nVal = 0;
      rc = lsm_csr_value(csr, &pVal, &nVal);
      /* Only touch the output buffer if the value was actually read. */
      if( rc==LSM_OK ){
        if( nVal>pDb->nBuf ){
          testFree(pDb->pBuf);
          pDb->pBuf = testMalloc(nVal*2);
          pDb->nBuf = nVal*2;
        }
        if( nVal>0 ) memcpy(pDb->pBuf, pVal, nVal);
        *ppVal = pDb->pBuf;
        *pnVal = nVal;
      }
    }else{
      *ppVal = 0;
      *pnVal = -1;
    }
  }
  lsm_csr_close(csr);
  return rc;
}

/*
** TestDb xScan method. Invokes xCallback once for each entry in the range
** [pFirst, pLast] (either bound may be NULL for "unbounded"), in reverse
** key order if bReverse is true.
**
** NOTE(review): the text between "nLast<nKey" and "pDb->pCsr" was lost
** from the copy of this file under review. The end of this function and
** the start of test_lsm_begin() were reconstructed - confirm against the
** upstream source.
*/
static int test_lsm_scan(
  TestDb *pTestDb,
  void *pCtx,
  int bReverse,
  void *pFirst, int nFirst,
  void *pLast, int nLast,
  void (*xCallback)(void *, void *, int , void *, int)
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;
  int rc;

  rc = lsm_csr_open(pDb->db, &csr);
  if( rc!=LSM_OK ) return rc;

  if( bReverse ){
    if( pLast ){
      rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_LE);
    }else{
      rc = lsm_csr_last(csr);
    }
  }else{
    if( pFirst ){
      rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_GE);
    }else{
      rc = lsm_csr_first(csr);
    }
  }

  while( rc==LSM_OK && lsm_csr_valid(csr) ){
    const void *pKey; int nKey;
    const void *pVal; int nVal;
    int cmp;

    lsm_csr_key(csr, &pKey, &nKey);
    lsm_csr_value(csr, &pVal, &nVal);

    /* Stop once the cursor has moved past the far end of the range. */
    if( bReverse && pFirst ){
      cmp = memcmp(pFirst, pKey, MIN(nKey, nFirst));
      if( cmp>0 || (cmp==0 && nFirst>nKey) ) break;
    }else if( bReverse==0 && pLast ){
      cmp = memcmp(pLast, pKey, MIN(nKey, nLast));
      if( cmp<0 || (cmp==0 && nLast<nKey) ) break;
    }

    xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal);

    if( bReverse ){
      rc = lsm_csr_prev(csr);
    }else{
      rc = lsm_csr_next(csr);
    }
  }

  lsm_csr_close(csr);
  return rc;
}

/*
** TestDb xBegin method. A read transaction is held open by keeping cursor
** pDb->pCsr open; levels greater than 1 additionally open write
** transactions at level (iLevel-1).
*/
static int test_lsm_begin(TestDb *pTestDb, int iLevel){
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* iLevel==0 is a no-op. */
  if( iLevel==0 ) return 0;

  if( pDb->pCsr==0 ) rc = lsm_csr_open(pDb->db, &pDb->pCsr);
  if( rc==LSM_OK && iLevel>1 ){
    rc = lsm_begin(pDb->db, iLevel-1);
  }

  return rc;
}

/* TestDb xCommit method. */
static int test_lsm_commit(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* If iLevel==0, close any open read transaction */
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  /* Commit write transactions down to level MAX(0, iLevel-1). */
  return lsm_commit(pDb->db, MAX(0, iLevel-1));
}

/* TestDb xRollback method. */
static int test_lsm_rollback(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* If iLevel==0, close any open read transaction */
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  return lsm_rollback(pDb->db, MAX(0, iLevel-1));
}

/*
** A log message callback registered with lsm connections. Prints all
** messages to stderr. If pCtx is not NULL it is treated as a
** nul-terminated string and printed as a prefix.
*/
static void xLog(void *pCtx, int rc, const char *z){
  unused_parameter(rc);
  /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
  if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

/*
** Work-hook installed on the client connection in single-threaded mode.
** Forwards to the user-level hook, if one is configured.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
  if( p->xWork ) p->xWork(db, p->pWorkCtx);
}

#define TEST_NO_RECOVERY -1
#define TEST_COMPRESSION -3

#define TEST_MT_MODE     -2
#define TEST_MT_MIN_CKPT -4
#define TEST_MT_MAX_CKPT -5

/*
** Apply the configuration string zStr to connection db.
**
** zStr is a space-separated list of "name=value" pairs. Each value is a
** decimal integer optionally followed by a 'k'/'K' suffix (multiplier 1)
** or an 'm'/'M' suffix (multiplier 1024).
**
** Parameters that map to LSM_CONFIG_XXX values are passed to lsm_config().
** Those marked as worker-only in the table are applied only if bWorker is
** true. The test-specific parameters (negative eParam values) update pLsm,
** if it is not NULL. If pnThread is not NULL, *pnThread is set to the
** "mt_mode" value (default 1).
**
** Returns 0 on success, or non-zero if the string cannot be parsed.
*/
int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db,
  int bWorker,
  const char *zStr,
  int *pnThread
){
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "autoflush",        0, LSM_CONFIG_AUTOFLUSH },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   0, LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "automerge",        0, LSM_CONFIG_AUTOMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    { "bg_min_ckpt",      0, TEST_NO_RECOVERY },

    { "mt_mode",          0, TEST_MT_MODE },
    { "mt_min_ckpt",      0, TEST_MT_MIN_CKPT },
    { "mt_max_ckpt",      0, TEST_MT_MAX_CKPT },

#ifdef HAVE_ZLIB
    { "compression",      0, TEST_COMPRESSION },
#endif
    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  if( zStr==0 ) return 0;

  assert( db );
  while( z[0] ){
    const char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
    if( *z ){
      int eParam;
      int i;
      int iVal;
      int iMul = 1;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &i);
      if( rc!=0 ) return rc;
      eParam = aParam[i].eParam;

      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      if( *z=='k' || *z=='K' ){
        iMul = 1;
        z++;
      }else if( *z=='m' || *z=='M' ){
        iMul = 1024;
        z++;
      }
      nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      /* atoi() stops at the suffix character, if there is one. */
      iVal = atoi(zParam) * iMul;

      if( eParam>0 ){
        if( bWorker || aParam[i].bWorker==0 ){
          lsm_config(db, eParam, &iVal);
        }
      }else{
        switch( eParam ){
          case TEST_NO_RECOVERY:
            if( pLsm ) pLsm->bNoRecovery = iVal;
            break;
          case TEST_MT_MODE:
            if( pLsm ) nThread = iVal;
            break;
          case TEST_MT_MIN_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024;
            break;
          case TEST_MT_MAX_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024;
            break;
#ifdef HAVE_ZLIB
          case TEST_COMPRESSION:
            testConfigureCompression(db);
            break;
#endif
        }
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  if( pnThread ) *pnThread = nThread;
  if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){
    pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt;
  }

  return 0;
 syntax_error:
  testPrintError("syntax error at: \"%s\"\n", z);
  return 1;
}

/*
** If pDb is an LSM database, apply configuration string zStr to the client
** connection and (in pthreads builds) to every worker connection. This is
** a no-op returning 0 for other database types.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  int rc = 0;
  if( tdb_lsm(pDb) ){
#ifdef LSM_MUTEX_PTHREADS
    int i;
#endif
    LsmDb *pLsm = (LsmDb *)pDb;

    rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0);
#ifdef LSM_MUTEX_PTHREADS
    for(i=0; rc==0 && i<pLsm->nWorker; i++){
      rc = test_lsm_config_str(0, pLsm->aWorker[i].pWorker, 1, zStr, 0);
    }
#endif
  }
  return rc;
}

/* Apply configuration string zConfig to a bare lsm_db handle. */
int tdb_lsm_configure(lsm_db *db, const char *zConfig){
  return test_lsm_config_str(0, db, 0, zConfig, 0);
}

#ifdef LSM_MUTEX_PTHREADS
static int testLsmStartWorkers(LsmDb *, int, const char *, const char *);
#endif

/*
** Open an LSM database wrapped in a TestDb object.
**
**   zCfg:      Configuration string (see test_lsm_config_str()), or NULL.
**   zFilename: Database file name.
**   bClear:    If true, delete any existing database first.
**   ppDb:      OUT: new handle, or NULL if an error occurs.
**
** Returns LSM_OK on success or a non-zero error code.
*/
static int testLsmOpen(
  const char *zCfg,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,
    test_lsm_delete_range,
    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

  int rc;
  int nFilename;
  LsmDb *pDb;

  /* If the bClear flag is set, delete any existing database. */
  assert( zFilename);
  if( bClear ) testDeleteLsmdb(zFilename);
  nFilename = strlen(zFilename);

  /* The file name is stored in the same allocation, directly after the
  ** LsmDb structure. */
  pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1);
  memset(pDb, 0, sizeof(LsmDb));
  pDb->base.pMethods = &LsmMethods;
  pDb->zName = (char *)&pDb[1];
  memcpy(pDb->zName, zFilename, nFilename + 1);

  /* Default the sector size used for crash simulation to 256 bytes.
  ** Todo: There should be an OS method to obtain this value - just as
  ** there is in SQLite. For now, LSM assumes that it is smaller than
  ** the page size (default 4KB).
  */
  pDb->szSector = 256;

  /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
  pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
  pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;

  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;
  pDb->env.xTruncate = testEnvTruncate;
  pDb->env.xSync = testEnvSync;
  pDb->env.xSectorSize = testEnvSectorSize;
  pDb->env.xRemap = testEnvRemap;
  pDb->env.xFileid = testEnvFileid;
  pDb->env.xClose = testEnvClose;
  pDb->env.xUnlink = testEnvUnlink;
  pDb->env.xLock = testEnvLock;
  pDb->env.xTestLock = testEnvTestLock;
  pDb->env.xShmBarrier = testEnvShmBarrier;
  pDb->env.xShmMap = testEnvShmMap;
  pDb->env.xShmUnmap = testEnvShmUnmap;
  pDb->env.xSleep = testEnvSleep;

  rc = lsm_new(&pDb->env, &pDb->db);
  if( rc==LSM_OK ){
    int nThread = 1;
    lsm_config_log(pDb->db, xLog, 0);
    lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb);

    rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread);
    if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename);

    pDb->eMode = nThread;
#ifdef LSM_MUTEX_PTHREADS
    if( rc==LSM_OK && nThread>1 ){
      /* A failure to start the background threads is fatal: without them
      ** writers would wait forever in waitOnWorker()/waitOnCheckpointer(). */
      rc = testLsmStartWorkers(pDb, nThread, zFilename, zCfg);
    }
#endif

    if( rc!=LSM_OK ){
      test_lsm_close((TestDb *)pDb);
      pDb = 0;
    }
  }else{
    /* No database handle was created. Just release the wrapper. */
    testFree(pDb);
    pDb = 0;
  }

  *ppDb = (TestDb *)pDb;
  return rc;
}

/* Open an LSM database using configuration string zSpec. */
int test_lsm_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  return testLsmOpen(zSpec, zFilename, bClear, ppDb);
}

/* Open an LSM database with small pages and blocks. zSpec is ignored. */
int test_lsm_small_open(
  const char *zSpec,
  const char *zFile,
  int bClear,
  TestDb **ppDb
){
  const char *zCfg = "page_size=256 block_size=64 mmap=1024";
  return testLsmOpen(zCfg, zFile, bClear, ppDb);
}

/* Open an LSM database with a low-memory configuration. zSpec is ignored. */
int test_lsm_lomem_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
    /* "max_freelist=4 autocheckpoint=32" */
  const char *zCfg =
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

/* Open an LSM database with compression enabled. zSpec is ignored. */
int test_lsm_zip_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  const char *zCfg =
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 compression=1 mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

/*
** If pDb is an LSM database (identified by its xClose method), return the
** underlying lsm_db handle. Otherwise return NULL.
*/
lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }
  return 0;
}

/* Return true if pDb is an LSM database not in single-threaded mode. */
int tdb_lsm_multithread(TestDb *pDb){
  int ret = 0;
  if( tdb_lsm(pDb) ){
    ret = ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD;
  }
  return ret;
}

/* Enable or disable logging to stderr (prefix "client") for pDb. */
void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *db = tdb_lsm(pDb);
  if( db ){
    lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client");
  }
}

/* Simulate an application crash: all further file IO fails. */
void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
  }
}

/* Begin saving sector pre-images so a system crash can be simulated. */
void tdb_lsm_prepare_system_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bPrepareCrash = 1;
  }
}

/* Simulate a system crash (power failure) now. */
void tdb_lsm_system_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
    doSystemCrash(p);
  }
}

/* Set the LSM_CONFIG_SAFETY level of the client connection. */
void tdb_lsm_safety(TestDb *pDb, int eMode){
  assert( eMode==LSM_SAFETY_OFF
       || eMode==LSM_SAFETY_NORMAL
       || eMode==LSM_SAFETY_FULL
  );
  if( tdb_lsm(pDb) ){
    int iParam = eMode;
    LsmDb *p = (LsmDb *)pDb;
    lsm_config(p->db, LSM_CONFIG_SAFETY, &iParam);
  }
}

/* Arrange for a system crash to be simulated on the iSync'th xSync call. */
void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){
  assert( iSync>0 );
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->nAutoCrash = iSync;
    p->bPrepareCrash = 1;
  }
}

/* Register a user-level work hook. */
void tdb_lsm_config_work_hook(
  TestDb *pDb,
  void (*xWork)(lsm_db *, void *),
  void *pWorkCtx
){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->xWork = xWork;
    p->pWorkCtx = pWorkCtx;
  }
}

/*
** Register an IO logging hook. It is invoked after each write and sync
** with arguments (pWriteCtx, bLog, iOff, nData, nUs).
*/
void tdb_lsm_write_hook(
  TestDb *pDb,
  void (*xWrite)(void *, int, lsm_i64, int, int),
  void *pWriteCtx
){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->xWriteHook = xWrite;
    p->pWriteCtx = pWriteCtx;
  }
}

/* Public wrapper around testLsmOpen(). */
int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  return testLsmOpen(zCfg, zDb, bClear, ppDb);
}

#ifdef LSM_MUTEX_PTHREADS

/*
** Signal worker thread iWorker that there may be work to do.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  pthread_mutex_lock(&p->worker_mutex);
  p->bDoWork = 1;
  pthread_cond_signal(&p->worker_cond);
  pthread_mutex_unlock(&p->worker_mutex);
}

/*
** This routine is used as the main() for all worker threads.
**
** The thread loops until LsmWorker.pWorker is cleared (by
** mt_stop_worker()) or an error other than LSM_BUSY occurs, in which case
** the error code is stored in LsmWorker.worker_rc.
*/
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int rc = LSM_OK;

    /* Do some work. If an error occurs, exit. */

    pthread_mutex_unlock(&p->worker_mutex);
    if( p->eType==LSMTEST_THREAD_CKPT ){
      /* Checkpoint only if at least nMtMinCkpt has accumulated. */
      int nKB = 0;
      rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
      if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
        rc = lsm_checkpoint(pWorker, 0);
      }
    }else{
      int nWrite;
      do {

        if( p->eType==LSMTEST_THREAD_WORKER ){
          waitOnCheckpointer(p->pDb, pWorker);
        }

        nWrite = 0;
        rc = lsm_work(pWorker, 0, 256, &nWrite);

        /* Anything written may need checkpointing: wake worker 1. */
        if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){
          mt_signal_worker(p->pDb, 1);
        }
      }while( nWrite && p->pWorker );
    }
    pthread_mutex_lock(&p->worker_mutex);

    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    /* The thread will wake up when it is signaled either because another
    ** thread has created some work for this one or because the connection
    ** is being closed.  */
    if( p->pWorker && p->bDoWork==0 ){
      pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
    }
    p->bDoWork = 0;
  }
  pthread_mutex_unlock(&p->worker_mutex);

  return 0;
}

/*
** Stop worker thread iWorker, if it is running, and release its resources.
*/
static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  if( p->pWorker ){
    void *pDummy;
    lsm_db *pWorker;

    /* Signal the worker to stop */
    pthread_mutex_lock(&p->worker_mutex);
    pWorker = p->pWorker;
    p->pWorker = 0;
    pthread_cond_signal(&p->worker_cond);
    pthread_mutex_unlock(&p->worker_mutex);

    /* Join the worker thread. */
    pthread_join(p->worker_thread, &pDummy);

    /* Free resources allocated in mt_start_worker() */
    pthread_cond_destroy(&p->worker_cond);
    pthread_mutex_destroy(&p->worker_mutex);
    lsm_close(pWorker);
  }
}

/* Stop all worker threads belonging to pDb. */
static void mt_shutdown(LsmDb *pDb){
  int i;
  for(i=0; i<pDb->nWorker; i++){
    mt_stop_worker(pDb, i);
  }
}

/*
** This callback is invoked by LSM when the client database writes to
** the database file (i.e. to flush the contents of the in-memory tree).
** This implies there may be work to do on the database, so signal
** the worker threads.
*/
static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */

  /* Invoke the user level work-hook, if any. */
  if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);

  /* Wake up worker thread 0. */
  mt_signal_worker(pDb, 0);
}

/* Work-hook for worker connections: forwards to the user-level hook. */
static void mt_worker_work_hook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */

  /* Invoke the user level work-hook, if any. */
  if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);
}

/*
** Launch worker thread iWorker for database connection pDb.
**
** Returns 0 on success. If an error occurs, everything allocated here is
** released and LsmWorker.pWorker is left set to NULL, so that
** mt_stop_worker() treats the slot as "not running".
*/
static int mt_start_worker(
  LsmDb *pDb,                     /* Main database structure */
  int iWorker,                    /* Worker number to start */
  const char *zFilename,          /* File name of database to open */
  const char *zCfg,               /* Connection configuration string */
  int eType                       /* Type of worker thread */
){
  int rc = 0;                     /* Return code */
  int bCond = 0;                  /* True once worker_cond is initialized */
  int bMutex = 0;                 /* True once worker_mutex is initialized */
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );
  assert( eType==LSMTEST_THREAD_CKPT
       || eType==LSMTEST_THREAD_WORKER
       || eType==LSMTEST_THREAD_WORKER_AC
  );

  p = &pDb->aWorker[iWorker];
  p->eType = eType;
  p->pDb = pDb;

  /* Open the worker connection. Errors from the configuration string are
  ** ignored here, as they were before. */
  rc = lsm_new(&pDb->env, &p->pWorker);
  if( rc==0 && zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);

  if( rc==0 ){
    lsm_config_log(p->pWorker, xLog, (void *)"worker");

    /* Configure the work-hook */
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);

    if( eType==LSMTEST_THREAD_WORKER ){
      test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0);
    }
  }

  /* Kick off the worker thread. */
  if( rc==0 ){
    rc = pthread_cond_init(&p->worker_cond, 0);
    bCond = (rc==0);
  }
  if( rc==0 ){
    rc = pthread_mutex_init(&p->worker_mutex, 0);
    bMutex = (rc==0);
  }
  if( rc==0 ) rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p);

  if( rc!=0 ){
    /* Unwind. No thread exists, so there is nothing to join. */
    if( bMutex ) pthread_mutex_destroy(&p->worker_mutex);
    if( bCond ) pthread_cond_destroy(&p->worker_cond);
    if( p->pWorker ) lsm_close(p->pWorker);
    p->pWorker = 0;
  }

  return rc;
}

/*
** Start the background threads required by threading model eModel (one
** of the LSMTEST_MODE_XXX values). Returns 0 on success, non-zero if
** eModel is out of range or a worker could not be started.
*/
static int testLsmStartWorkers(
  LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg
){
  int rc = 0;

  if( eModel<1 || eModel>4 ) return 1;
  if( eModel==1 ) return 0;

  /* Configure a work-hook for the client connection. Worker 0 is signalled
  ** every time the users connection writes to the database.  */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  /* Allocate space for two worker connections. They may not both be
  ** used, but both are allocated.  */
  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2);

  switch( eModel ){
    case LSMTEST_MODE_BACKGROUND_CKPT:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autocheckpoint=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      break;

    case LSMTEST_MODE_BACKGROUND_WORK:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER_AC);
      break;

    case LSMTEST_MODE_BACKGROUND_BOTH:
      pDb->nWorker = 2;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER);
      if( rc==0 ){
        rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      }
      break;
  }

  return rc;
}

/* Open an LSM database with a background checkpointer. zSpec is ignored. */
int test_lsm_mt2(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  const char *zCfg = "mt_mode=2";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

/*
** Open an LSM database with a background worker and a background
** checkpointer. zSpec is ignored.
*/
int test_lsm_mt3(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  const char *zCfg = "mt_mode=4";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

#else
/* Without pthreads there are never any worker threads to stop. */
static void mt_shutdown(LsmDb *pDb) {
  unused_parameter(pDb);
}

/* Stub used when the build has no thread support. Always fails. */
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif