Index: lsm-test/lsmtest2.c ================================================================== --- lsm-test/lsmtest2.c +++ lsm-test/lsmtest2.c @@ -289,12 +289,13 @@ /* Call lsm_work() on the db */ tdb_lsm_prepare_sync_crash(pDb, 1 + (i%(nWork*2))); for(iWork=0; testrc==0 && iWork0 ); + if( testrc==0 ) testrc = lsm_checkpoint(db, 0); } tdb_close(pDb); /* Check that the database content is still correct */ testCompareCksumLsmdb(DBNAME, testCksumArrayGet(pCksumDb, nRow), 0, pRc); Index: lsm-test/lsmtest8.c ================================================================== --- lsm-test/lsmtest8.c +++ lsm-test/lsmtest8.c @@ -14,11 +14,11 @@ #include "lsmtest.h" typedef struct SetupStep SetupStep; struct SetupStep { - int workflags; /* Flags to pass to lsm_work() */ + int bFlush; /* Flush to disk and checkpoint */ int iInsStart; /* First key-value from ds to insert */ int nIns; /* Number of rows to insert */ int iDelStart; /* First key from ds to delete */ int nDel; /* Number of rows to delete */ }; @@ -30,12 +30,24 @@ int *pRc ){ testWriteDatasourceRange(pDb, pData, pStep->iInsStart, pStep->nIns, pRc); testDeleteDatasourceRange(pDb, pData, pStep->iDelStart, pStep->nDel, pRc); if( *pRc==0 ){ + int nSave = -1; + int nBuf = 64; lsm_db *db = tdb_lsm(pDb); - *pRc = lsm_work(db, pStep->workflags, 0, 0); + + lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nSave); + lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nBuf); + lsm_begin(db, 1); + lsm_commit(db, 0); + lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nSave); + + *pRc = lsm_work(db, LSM_WORK_FLUSH, 0, 0); + if( *pRc==0 ){ + *pRc = lsm_checkpoint(db, 0); + } } } static void doSetupStepArray( TestDb *pDb, @@ -52,11 +64,11 @@ } static void setupDatabase1(TestDb *pDb, Datasource **ppData){ const SetupStep aStep[] = { { 0, 1, 2000, 0, 0 }, - { LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH, 0, 0, 0, 0 }, + { 1, 0, 0, 0, 0 }, { 0, 10001, 1000, 0, 0 }, }; const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 100, 500}; Datasource *pData; @@ -169,12 
+181,16 @@ rc = 1; } testFree(pHdr); if( rc==0 ){ + int nBuf = 64; db = tdb_lsm(pDb); - rc = lsm_work(db, LSM_WORK_FLUSH|LSM_WORK_CHECKPOINT, 0, 0); + lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nBuf); + lsm_begin(db, 1); + lsm_commit(db, 0); + rc = lsm_work(db, LSM_WORK_FLUSH, 0, 0); } testCksumDatabase(pDb, zCksum2); testCompareStr(zCksum, zCksum2, &rc); } Index: lsm-test/lsmtest_func.c ================================================================== --- lsm-test/lsmtest_func.c +++ lsm-test/lsmtest_func.c @@ -13,11 +13,11 @@ lsm_db *pDb; int rc; int i; const char *zDb; - int flags = LSM_WORK_CHECKPOINT; + int flags = LSM_WORK_FLUSH; int nWork = (1<<30); if( nArg==0 ) goto usage; zDb = azArg[nArg-1]; for(i=0; i<(nArg-1); i++){ @@ -48,10 +48,13 @@ if( rc!=LSM_OK ){ testPrintError("lsm_work(): rc=%d\n", rc); } } } + if( rc==LSM_OK ){ + rc = lsm_checkpoint(pDb, 0); + } lsm_close(pDb); return rc; usage: Index: lsm-test/lsmtest_tdb3.c ================================================================== --- lsm-test/lsmtest_tdb3.c +++ lsm-test/lsmtest_tdb3.c @@ -27,10 +27,11 @@ int bDoWork; /* Set to true by client when there is work */ int worker_rc; /* Store error code here */ int lsm_work_flags; /* Flags to pass to lsm_work() */ int lsm_work_npage; /* nPage parameter to pass to lsm_work() */ + int bCkpt; /* True to call lsm_checkpoint() */ }; #else struct LsmWorker { int worker_rc; }; #endif @@ -596,11 +597,10 @@ unused_parameter(rc); /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */ if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx); fprintf(stderr, "%s\n", z); fflush(stderr); - } static void xWorkHook(lsm_db *db, void *pArg){ LsmDb *p = (LsmDb *)pArg; if( p->xWork ) p->xWork(db, p->pWorkCtx); @@ -624,10 +624,11 @@ { "write_buffer", 0, LSM_CONFIG_WRITE_BUFFER }, { "page_size", 0, LSM_CONFIG_PAGE_SIZE }, { "block_size", 0, LSM_CONFIG_BLOCK_SIZE }, { "safety", 0, LSM_CONFIG_SAFETY }, { "autowork", 0, LSM_CONFIG_AUTOWORK }, + { "autocheckpoint", 0, 
LSM_CONFIG_AUTOCHECKPOINT }, { "log_size", 0, LSM_CONFIG_LOG_SIZE }, { "mmap", 0, LSM_CONFIG_MMAP }, { "use_log", 0, LSM_CONFIG_USE_LOG }, { "nmerge", 0, LSM_CONFIG_NMERGE }, { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST }, @@ -819,11 +820,11 @@ const char *zFilename, int bClear, TestDb **ppDb ){ const char *zCfg = - "page_size=256 block_size=65536 write_buffer=16384 max_freelist=4"; + "page_size=256 block_size=65536 write_buffer=16384 max_freelist=4 autocheckpoint=32768"; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } lsm_db *tdb_lsm(TestDb *pDb){ if( pDb->pMethods->xClose==test_lsm_close ){ @@ -930,37 +931,54 @@ lsm_db *pWorker; /* Connection to access db through */ pthread_mutex_lock(&p->worker_mutex); while( (pWorker = p->pWorker) ){ int nWrite = 0; - int rc; + int rc = LSM_OK; /* Do some work. If an error occurs, exit. */ pthread_mutex_unlock(&p->worker_mutex); - if( (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){ - int nSleep = 0; - while( 1 ){ - int nByte = 0; - lsm_ckpt_size(pWorker, &nByte); - if( nByte<(32*1024*1024) ) break; - mt_signal_worker(p->pDb, 1); - usleep(1000); - nSleep++; - } + if( p->bCkpt==0 ){ + static const int nLimit = 16*1024*1024; + static const int nIncr = 4*1024*1024; + int nMax = 100; + int nByte = 0; + + lsm_ckpt_size(pWorker, &nByte); + if( nByte>nLimit ){ + int nSleep = 0; + while( nByte>nLimit ){ + nMax = nMax<<1; + nByte -= nIncr; + } + while( nSleeppDb, 1); + usleep(1000); + nSleep++; + } #if 0 - if( nSleep ) printf("nSleep=%d (worker)\n", nSleep); + if( nSleep ) printf("nSleep=%d/%d (worker)\n", nSleep, nMax); #endif + } } - rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite); + if( p->lsm_work_npage ){ + rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite); /* printf("# worked %d units\n", nWrite); */ + } + if( rc==LSM_OK && p->bCkpt ){ + rc = lsm_checkpoint(pWorker, 0); + } pthread_mutex_lock(&p->worker_mutex); + if( rc!=LSM_OK && rc!=LSM_BUSY ){ p->worker_rc = rc; break; } - if( 
nWrite && (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){ + if( nWrite && p->bCkpt==0 ){ mt_signal_worker(p->pDb, 1); } /* If the call to lsm_work() indicates that there is nothing more ** to do at this point, wait on the condition variable. The thread will @@ -973,11 +991,10 @@ } p->bDoWork = 0; } } pthread_mutex_unlock(&p->worker_mutex); - /* printf("# worker EXIT\n"); */ return 0; } @@ -1043,27 +1060,35 @@ */ static int mt_start_worker( LsmDb *pDb, /* Main database structure */ int iWorker, /* Worker number to start */ const char *zFilename, /* File name of database to open */ + const char *zCfg, int flags, /* flags parameter to lsm_work() */ - int nPage /* nPage parameter to lsm_work() */ + int nPage, /* nPage parameter to lsm_work() */ + int bCkpt /* True to call lsm_checkpoint() */ ){ int rc = 0; /* Return code */ LsmWorker *p; /* Object to initialize */ assert( iWorkernWorker ); p = &pDb->aWorker[iWorker]; p->lsm_work_flags = flags; p->lsm_work_npage = nPage; + p->bCkpt = bCkpt; p->pDb = pDb; /* Open the worker connection */ if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker); + if( zCfg ){ + test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0); + } if( rc==0 ) rc = lsm_open(p->pWorker, zFilename); +#if 0 lsm_config_log(p->pWorker, xLog, (void *)"worker"); +#endif /* Configure the work-hook */ if( rc==0 ){ lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb); } @@ -1090,16 +1115,17 @@ pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker); memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker); pDb->nWorker = nWorker; if( nWorker==1 ){ - int flags = LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH; - rc = mt_start_worker(pDb, 0, zFilename, flags, 2048); + int flags = LSM_WORK_FLUSH; + rc = mt_start_worker(pDb, 0, zFilename, zCfg, flags, 2048, 1); }else{ - rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_FLUSH, 1024); + int flags = LSM_WORK_FLUSH; + rc = mt_start_worker(pDb, 0, zFilename, zCfg, flags, 1024, 0); if( rc==LSM_OK ){ - rc = 
mt_start_worker(pDb, 1, zFilename, LSM_WORK_CHECKPOINT, 0); + rc = mt_start_worker(pDb, 1, zFilename, zCfg, 0, 0, 1); } } return rc; } @@ -1129,17 +1155,17 @@ if( rc==0 ){ pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker); memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker); pDb->nWorker = nWorker; - rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_CHECKPOINT, - nWorker==1 ? 512 : 0 + rc = mt_start_worker(pDb, 0, zFilename, 0, LSM_WORK_FLUSH, + nWorker==1 ? 512 : 0, 1 ); } if( rc==0 && nWorker==2 ){ - rc = mt_start_worker(pDb, 1, zFilename, 0, 512); + rc = mt_start_worker(pDb, 1, zFilename, 0, 0, 512, 0); } return rc; } Index: src/kvlsm.c ================================================================== --- src/kvlsm.c +++ src/kvlsm.c @@ -387,11 +387,11 @@ *(int*)pArg = nWrite; break; } case SQLITE4_KVCTRL_LSM_CHECKPOINT: { - lsm_work(p->pDb, LSM_WORK_CHECKPOINT, 0, 0); + lsm_checkpoint(p->pDb, 0); break; } default: Index: src/lsm.h ================================================================== --- src/lsm.h +++ src/lsm.h @@ -165,10 +165,13 @@ ** contains all successfully committed transactions. ** ** LSM_CONFIG_AUTOWORK ** A read/write integer parameter. ** +** LSM_CONFIG_AUTOCHECKPOINT +** A read/write integer parameter. +** ** LSM_CONFIG_MMAP ** A read/write integer parameter. True to use mmap() to access the ** database file. False otherwise. ** ** LSM_CONFIG_USE_LOG @@ -199,10 +202,11 @@ #define LSM_CONFIG_MMAP 7 #define LSM_CONFIG_USE_LOG 8 #define LSM_CONFIG_NMERGE 9 #define LSM_CONFIG_MAX_FREELIST 10 #define LSM_CONFIG_MULTIPLE_PROCESSES 11 +#define LSM_CONFIG_AUTOCHECKPOINT 12 #define LSM_SAFETY_OFF 0 #define LSM_SAFETY_NORMAL 1 #define LSM_SAFETY_FULL 2 @@ -418,35 +422,43 @@ ** Describe the race condition this function is subject to. Or remove ** it somehow. */ int lsm_ckpt_size(lsm_db *, int *pnByte); - /* ** This function is called by a thread to work on the database structure. 
** The actual operations performed by this function depend on the value ** passed as the "flags" parameter: ** ** LSM_WORK_FLUSH: ** Attempt to flush the contents of the in-memory tree to disk. ** -** LSM_WORK_CHECKPOINT: -** Write a checkpoint (if one exists in memory) to the database file. -** ** LSM_WORK_OPTIMIZE: ** If nMerge suitable arrays cannot be found, where nMerge is as ** configured by LSM_CONFIG_NMERGE, merge together any arrays that ** can be found. This is usually used to optimize the database by ** merging the whole thing into one big array. */ int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite); #define LSM_WORK_FLUSH 0x00000001 -#define LSM_WORK_CHECKPOINT 0x00000002 -#define LSM_WORK_OPTIMIZE 0x00000004 +#define LSM_WORK_OPTIMIZE 0x00000002 + +/* +** Attempt to checkpoint the current database snapshot. Return an LSM +** error code if an error occurs or LSM_OK otherwise. +** +** If the current snapshot has already been checkpointed, calling this +** function is a no-op. In this case if pnByte is not NULL, *pnByte is +** set to 0. Or, if the current snapshot is successfully checkpointed +** by this function and pbCkpt is not NULL, *pnByte is set to the number +** of bytes written to the database file since the previous checkpoint +** (the same measure as returned by lsm_ckpt_size()). +*/ +int lsm_checkpoint(lsm_db *pDb, int *pnByte); -/* +/* ** Open and close a database cursor. */ int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr); int lsm_csr_close(lsm_cursor *pCsr); Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -40,20 +40,16 @@ /* ** Default values for various data structure parameters. These may be ** overridden by calls to lsm_config(). 
*/ -#define LSM_PAGE_SIZE 4096 -#define LSM_BLOCK_SIZE (2 * 1024 * 1024) -#define LSM_TREE_BYTES (2 * 1024 * 1024) - -#define LSM_DEFAULT_LOG_SIZE (128*1024) -#define LSM_DEFAULT_NMERGE 4 - -/* Places where a NULL needs to be changed to a real lsm_env pointer -** are marked with NEED_ENV */ -#define NEED_ENV ((lsm_env*)0) +#define LSM_DFLT_PAGE_SIZE (4 * 1024) +#define LSM_DFLT_BLOCK_SIZE (2 * 1024 * 1024) +#define LSM_DFLT_WRITE_BUFFER (2 * 1024 * 1024) +#define LSM_DFLT_AUTOCHECKPOINT (4 * 1024 * 1024) +#define LSM_DFLT_LOG_SIZE (128*1024) +#define LSM_DFLT_NMERGE 4 /* Initial values for log file checksums. These are only used if the ** database file does not contain a valid checkpoint. */ #define LSM_CKSUM0_INIT 42 #define LSM_CKSUM1_INIT 42 @@ -292,10 +288,11 @@ int bUseLog; /* Configured by LSM_CONFIG_USE_LOG */ int nDfltPgsz; /* Configured by LSM_CONFIG_PAGE_SIZE */ int nDfltBlksz; /* Configured by LSM_CONFIG_BLOCK_SIZE */ int nMaxFreelist; /* Configured by LSM_CONFIG_MAX_FREELIST */ int bMmap; /* Configured by LSM_CONFIG_MMAP */ + int nAutockpt; /* Configured by LSM_CONFIG_AUTOCHECKPOINT */ int bMultiProc; /* Configured by L_C_MULTIPLE_PROCESSES */ /* Sub-system handles */ FileSystem *pFS; /* On-disk portion of database */ Database *pDatabase; /* Database shared data */ @@ -482,11 +479,11 @@ #define LSM_INITIAL_SNAPSHOT_ID 11 /* ** Functions from file "lsm_ckpt.c". 
*/ -int lsmCheckpointWrite(lsm_db *); +int lsmCheckpointWrite(lsm_db *, u32 *); int lsmCheckpointLevels(lsm_db *, int, void **, int *); int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal); int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *); int lsmCheckpointOverflowRequired(lsm_db *pDb); @@ -500,10 +497,11 @@ int lsmCheckpointLoad(lsm_db *pDb, int *); int lsmCheckpointLoadOk(lsm_db *pDb, int); i64 lsmCheckpointId(u32 *, int); +u32 lsmCheckpointNWrite(u32 *, int); i64 lsmCheckpointLogOffset(u32 *); int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); @@ -520,11 +518,11 @@ void lsmTreeRelease(lsm_env *, Tree *); void lsmTreeClear(lsm_db *); int lsmTreeInit(lsm_db *); int lsmTreeRepair(lsm_db *); -void lsmTreeMakeOld(lsm_db *pDb, int *pnFlush); +void lsmTreeMakeOld(lsm_db *pDb); void lsmTreeDiscardOld(lsm_db *pDb); int lsmTreeHasOld(lsm_db *pDb); int lsmTreeSize(lsm_db *); int lsmTreeEndTransaction(lsm_db *pDb, int bCommit); @@ -674,13 +672,14 @@ /* ** Functions from file "lsm_sorted.c". */ int lsmInfoPageDump(lsm_db *, Pgno, int, char **); -int lsmSortedFlushTree(lsm_db *, int *); void lsmSortedCleanup(lsm_db *); int lsmSortedAutoWork(lsm_db *, int nUnit); + +int lsmFlushTreeToDisk(lsm_db *pDb); void lsmSortedRemap(lsm_db *pDb); void lsmSortedFreeLevel(lsm_env *pEnv, Level *); @@ -730,11 +729,10 @@ /* ** Functions from file "main.c". */ void lsmLogMessage(lsm_db *, int, const char *, ...); -int lsmFlushToDisk(lsm_db *); /* ** Functions from file "lsm_log.c". 
*/ int lsmLogBegin(lsm_db *pDb); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -329,11 +329,11 @@ ){ int iOut = *piOut; assert( iOut==CKPT_HDR_LO_MSW ); - if( bFlush && pDb->treehdr.iOldShmid ){ + if( bFlush ){ i64 iOff = pDb->treehdr.iOldLog; ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc); ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc); ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc); ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc); @@ -384,10 +384,11 @@ nFree = pSnap->freelist.nEntry; if( nOvfl>=0 ){ nFree -= nOvfl; }else{ + assert( 0 ); nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); @@ -428,11 +429,11 @@ ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc); ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc); ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc); ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc); ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc); - ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc); + ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc); ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc); if( bCksum ){ ckptAddChecksum(&ckpt, iOut, &rc); }else{ @@ -440,11 +441,11 @@ ckptSetValue(&ckpt, iOut+1, 0, &rc); } iOut += 2; assert( iOut<=1024 ); -#if 0 +#ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, rc, "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl ); #endif @@ -708,17 +709,18 @@ /* ** The connection must be the worker in order to call this function. ** ** True is returned if there are currently too many free-list entries -** in-memory to store in a checkpoint. Before calling lsmCheckpointSaveWorker() +** in-memory to store in a checkpoint. 
Before calling CheckpointSaveWorker() ** to save the current worker snapshot, a new top-level LSM segment must ** be created so that some of them can be written to the LSM. */ int lsmCheckpointOverflowRequired(lsm_db *pDb){ + Snapshot *p = pDb->pWorker; assert( lsmShmAssertWorker(pDb) ); - return (pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist); + return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0); } /* ** Connection pDb must be the worker to call this function. ** @@ -1124,10 +1126,17 @@ ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; +#if 0 +if( bFlush ){ + printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); + fflush(stdout); +} +#endif + assert( lsmFsIntegrityCheck(pDb) ); rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); @@ -1206,10 +1215,18 @@ }else{ iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW]; } return iId; } + +u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){ + if( bDisk ){ + return lsmGetU32((u8 *)&aCkpt[CKPT_HDR_NWRITE]); + }else{ + return aCkpt[CKPT_HDR_NWRITE]; + } +} i64 lsmCheckpointLogOffset(u32 *aCkpt){ return ((i64)aCkpt[CKPT_HDR_LO_MSW] << 32) + (i64)aCkpt[CKPT_HDR_LO_LSW]; } Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -387,12 +387,12 @@ pFS = (FileSystem *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc); if( pFS ){ LsmFile *pLsmFile; pFS->zDb = (char *)&pFS[1]; pFS->zLog = &pFS->zDb[nDb+1]; - pFS->nPagesize = LSM_PAGE_SIZE; - pFS->nBlocksize = LSM_BLOCK_SIZE; + pFS->nPagesize = LSM_DFLT_PAGE_SIZE; + pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE; pFS->nMetasize = 4 * 1024; pFS->pDb = pDb; pFS->pEnv = pDb->pEnv; pFS->bUseMmap = pDb->bMmap; @@ -698,13 +698,14 @@ if( *pRc==LSM_OK && iSz>pFS->nMap ){ Page *pFix; int rc; u8 *aOld = pFS->pMap; rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, 
&pFS->nMap); - if( rc==LSM_OK ){ + if( rc==LSM_OK && pFS->pMap!=aOld ){ u8 *aData = (u8 *)pFS->pMap; for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){ + assert( &aOld[pFS->nPagesize * (i64)(pFix->iPg-1)]==pFix->aData ); pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)]; } lsmSortedRemap(pFS->pDb); } *pRc = rc; Index: src/lsm_log.c ================================================================== --- src/lsm_log.c +++ src/lsm_log.c @@ -445,13 +445,13 @@ ** in *pLog is updated before returning. */ void lsmLogEnd(lsm_db *pDb, int bCommit){ DbLog *pLog; LogWriter *p; - - if( pDb->bUseLog==0 ) return; p = pDb->pLogWriter; + + if( p==0 ) return; pLog = &pDb->treehdr.log; if( bCommit ){ pLog->aRegion[2].iEnd = p->iOff; pLog->cksum0 = p->cksum0; Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -71,18 +71,19 @@ *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db)); if( pDb==0 ) return LSM_NOMEM_BKPT; /* Initialize the new object */ pDb->pEnv = pEnv; - pDb->nTreeLimit = LSM_TREE_BYTES; + pDb->nTreeLimit = LSM_DFLT_WRITE_BUFFER; + pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT; pDb->bAutowork = 1; pDb->eSafety = LSM_SAFETY_NORMAL; pDb->xCmp = xCmp; - pDb->nLogSz = LSM_DEFAULT_LOG_SIZE; - pDb->nDfltPgsz = LSM_PAGE_SIZE; - pDb->nDfltBlksz = LSM_BLOCK_SIZE; - pDb->nMerge = LSM_DEFAULT_NMERGE; + pDb->nLogSz = LSM_DFLT_LOG_SIZE; + pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE; + pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE; + pDb->nMerge = LSM_DFLT_NMERGE; pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES; pDb->bUseLog = 1; pDb->iReader = -1; pDb->bMultiProc = 1; pDb->bMmap = LSM_IS_64_BIT; @@ -102,33 +103,10 @@ if( pDb->nTransOpen==0 && pDb->pCsr==0 ){ lsmFinishReadTrans(pDb); } } -static int dbAutoWork(lsm_db *pDb, int nUnit){ - int rc = LSM_OK; /* Return code */ - - assert( pDb->pWorker==0 ); - assert( pDb->bAutowork ); - assert( nUnit>0 ); - - /* If one is required, run a checkpoint. 
*/ -#if 0 - rc = lsmCheckpointWrite(pDb); -#endif - - rc = lsmBeginWork(pDb); - if( rc==LSM_OK ) rc = lsmSortedAutoWork(pDb, nUnit); - if( pDb->pWorker && pDb->pWorker->pLevel ){ - lsmFinishWork(pDb, 0, -1, &rc); - }else{ - int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcdummy); - } - return rc; -} - static int getFullpathname( lsm_env *pEnv, const char *zRel, char **pzAbs ){ @@ -192,60 +170,10 @@ } return rc; } -/* -** This function flushes the contents of the in-memory tree to disk. It -** returns LSM_OK if successful, or an error code otherwise. -*/ -int lsmFlushToDisk(lsm_db *pDb){ - int rc = LSM_OK; /* Return code */ - int nOvfl = 0; /* Number of free-list entries in LSM */ - - /* Must not hold the worker snapshot when this is called. */ - assert( pDb->pWorker==0 ); - rc = lsmBeginWork(pDb); - - /* Save the position of each open cursor belonging to pDb. */ - if( rc==LSM_OK ){ - rc = lsmSaveCursors(pDb); - } - - if( rc==LSM_OK && pDb->bAutowork ){ - rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT); - } - while( rc==LSM_OK && lsmDatabaseFull(pDb) ){ - rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT); - } - - /* Write the contents of the in-memory tree into the database file and - ** update the worker snapshot accordingly. Then flush the contents of - ** the db file to disk too. No calls to fsync() are made here - just - ** write(). 
*/ - if( rc==LSM_OK ) rc = lsmSortedFlushTree(pDb, &nOvfl); - lsmFinishWork(pDb, 1, nOvfl, &rc); - - /* Restore the position of any open cursors */ - if( rc==LSM_OK && pDb->pCsr ){ - lsmFreeSnapshot(pDb->pEnv, pDb->pClient); - pDb->pClient = 0; - rc = lsmCheckpointLoad(pDb, 0); - if( rc==LSM_OK ){ - rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); - } - if( rc==LSM_OK ){ - rc = lsmRestoreCursors(pDb); - } - } - -#if 0 - if( rc==LSM_OK ) lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "flush"); -#endif - - return rc; -} int lsm_close(lsm_db *pDb){ int rc = LSM_OK; if( pDb ){ assert_db_state(pDb); @@ -268,11 +196,11 @@ va_start(ap, eParam); switch( eParam ){ case LSM_CONFIG_WRITE_BUFFER: { int *piVal = va_arg(ap, int *); - if( *piVal>0 ){ + if( *piVal>=0 ){ pDb->nTreeLimit = *piVal; } *piVal = pDb->nTreeLimit; break; } @@ -283,10 +211,19 @@ pDb->bAutowork = *piVal; } *piVal = pDb->bAutowork; break; } + + case LSM_CONFIG_AUTOCHECKPOINT: { + int *piVal = va_arg(ap, int *); + if( *piVal>=0 ){ + pDb->nAutockpt = *piVal; + } + *piVal = pDb->nAutockpt; + break; + } case LSM_CONFIG_LOG_SIZE: { int *piVal = va_arg(ap, int *); if( *piVal>0 ){ pDb->nLogSz = *piVal; @@ -576,18 +513,16 @@ nBefore = lsmTreeSize(pDb); rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal); nAfter = lsmTreeSize(pDb); nDiff = (nAfter/nQuant) - (nBefore/nQuant); if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){ - rc = dbAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT); - if( rc==LSM_BUSY ) rc = LSM_OK; + rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT); } } /* If a transaction was opened at the start of this function, commit it. - ** Or, if an error has occurred, roll it back. - */ + ** Or, if an error has occurred, roll it back. 
*/ if( bCommit ){ if( rc==LSM_OK ){ rc = lsm_commit(pDb, 0); }else{ lsm_rollback(pDb, 0); @@ -759,59 +694,31 @@ return rc; } int lsm_commit(lsm_db *pDb, int iLevel){ - int nFlush = 0; /* Number of flushable trees in memory */ int rc = LSM_OK; assert_db_state( pDb ); /* A value less than zero means close the innermost nested transaction. */ if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1); if( iLevelnTransOpen ){ if( iLevel==0 ){ + int bAutowork = 0; /* Commit the transaction to disk. */ if( rc==LSM_OK ) rc = lsmLogCommit(pDb); if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){ rc = lsmFsSyncLog(pDb->pFS); } - - if( lsmTreeSize(pDb)>pDb->nTreeLimit ){ - lsmTreeMakeOld(pDb, &nFlush); - } lsmFinishWriteTrans(pDb, (rc==LSM_OK)); } pDb->nTransOpen = iLevel; - } dbReleaseClientSnapshot(pDb); - - /* If nFlush is not zero and auto-work is enabled, flush the tree to disk. - ** - ** If auto-work is enabled and data was written to disk, also sync the - ** db and checkpoint the latest snapshot. - ** - ** Ignore any LSM_BUSY errors that occur during these operations. If - ** LSM_BUSY does occur, it means some other connection is already working - ** on flushing the in-memory tree or checkpointing the database. - */ - assert( rc!=LSM_BUSY); - if( rc==LSM_OK ){ - if( nFlush && pDb->bAutowork ){ - rc = lsmFlushToDisk(pDb); - if( rc==LSM_OK && pDb->bAutowork ){ - rc = lsmCheckpointWrite(pDb); - } - }else if( nFlush && pDb->xWork ){ - pDb->xWork(pDb, pDb->pWorkCtx); - } - } - if( rc==LSM_BUSY ) rc = LSM_OK; - return rc; } int lsm_rollback(lsm_db *pDb, int iLevel){ int rc = LSM_OK; Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -87,11 +87,13 @@ ** Append an entry to the free-list. 
*/ int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){ /* Assert that this is not an attempt to insert a duplicate block number */ +#if 0 assertNotInFreelist(p, iBlk); +#endif /* Extend the space allocated for the freelist, if required */ assert( p->nAlloc>=p->nEntry ); if( p->nAlloc==p->nEntry ){ int nNew; @@ -173,23 +175,29 @@ /* Flush the in-memory tree, if required. If there is data to flush, ** this will create a new client snapshot in Database.pClient. The ** checkpoint (serialization) of this snapshot may be written to disk ** by the following block. */ rc = lsmTreeLoadHeader(pDb, 0); - if( rc==LSM_OK && lsmTreeSize(pDb)>0 ){ - int nFlush = 0; - lsmTreeMakeOld(pDb, &nFlush); - if( nFlush ) rc = lsmFlushToDisk(pDb); + if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){ + assert( pDb->nTransOpen==0 ); + pDb->nTransOpen = 1; + lsmTreeMakeOld(pDb); + if( pDb->treehdr.iOldShmid ){ + rc = lsmFlushTreeToDisk(pDb); + } + pDb->nTransOpen = 0; } /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ - rc = lsmCheckpointWrite(pDb); + rc = lsmCheckpointWrite(pDb, 0); } /* If the checkpoint was written successfully, delete the log file */ - if( rc==LSM_OK && pDb->pFS ){ + if( rc==LSM_OK && pDb->pFS + && pDb->treehdr.iOldShmid==0 && pDb->treehdr.nByte==0 + ){ Database *p = pDb->pDatabase; lsmFsCloseAndDeleteLog(pDb->pFS); if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } @@ -493,10 +501,13 @@ int lsmBlockFree(lsm_db *pDb, int iBlk){ Snapshot *p = pDb->pWorker; assert( lsmShmAssertWorker(pDb) ); /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */ +#ifdef LSM_LOG_FREELIST + lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk); +#endif return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId); } /* @@ -529,12 +540,13 @@ ** The WORKER lock must not be held when this is called. This is because ** this function may indirectly call fsync(). 
And the WORKER lock should ** not be held that long (in case it is required by a client flushing an ** in-memory tree to disk). */ -int lsmCheckpointWrite(lsm_db *pDb){ +int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){ int rc; /* Return Code */ + u32 nWrite = 0; assert( pDb->pWorker==0 ); assert( 1 || pDb->pClient==0 ); assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) ); @@ -557,10 +569,11 @@ iCkpt = lsmCheckpointId(pDb->aSnapshot, 0); rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg); if( rc==LSM_OK ){ aData = lsmFsMetaPageData(pPg, &nData); iDisk = lsmCheckpointId((u32 *)aData, 1); + nWrite = lsmCheckpointNWrite((u32 *)aData, 1); lsmFsMetaPageRelease(pPg); } bDone = (iDisk>=iCkpt); } @@ -574,20 +587,24 @@ } if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta); if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){ rc = lsmFsSyncDb(pDb->pFS); } - if( rc==LSM_OK ) pShm->iMetaPage = iMeta; -#if 0 + if( rc==LSM_OK ){ + pShm->iMetaPage = iMeta; + nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite; + } +#ifdef LSM_LOG_WORK lsmLogMessage(pDb, 0, "finish checkpoint %d", (int)lsmCheckpointId(pDb->aSnapshot, 0) ); #endif } } lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0); + if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite; return rc; } int lsmBeginWork(lsm_db *pDb){ int rc; @@ -618,10 +635,13 @@ ** file space can be recycled. */ void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){ /* If no error has occurred, serialize the worker snapshot and write ** it to shared memory. */ + + assert( pDb->pWorker ); + assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 ); if( *pRc==LSM_OK ){ *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl); } if( pDb->pWorker ){ @@ -644,11 +664,11 @@ /* ** Begin a read transaction. This function is a no-op if the connection ** passed as the only argument already has an open read transaction. 
*/ int lsmBeginReadTrans(lsm_db *pDb){ - const int MAX_READLOCK_ATTEMPTS = 5; + const int MAX_READLOCK_ATTEMPTS = 10; int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); assert( (pDb->pClient!=0)==(pDb->iReader>=0) ); @@ -671,11 +691,11 @@ ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; u32 iShmMax = pDb->treehdr.iUsedShmid; - u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk; + u32 iShmMin = pDb->treehdr.iNextShmid+1-(1<<10); rc = lsmReadlock( pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax ); if( rc==LSM_OK ){ if( lsmTreeLoadHeaderOk(pDb, iTreehdr) @@ -690,12 +710,25 @@ assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } - if( rc==LSM_BUSY ) rc = LSM_OK; + if( rc==LSM_BUSY ){ + rc = LSM_OK; + } } +#if 0 +if( rc==LSM_OK && pDb->pClient ){ + printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", + (void *)pDb, + (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, + (int)pDb->treehdr.root.iTransId, + (int)pDb->treehdr.iOldShmid + ); + fflush(stdout); +} +#endif } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } @@ -783,14 +816,28 @@ ** merely releases locks and other resources held by the write-transaction. ** ** LSM_OK is returned if successful, or an LSM error code otherwise. 
*/ int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){ + int rc = LSM_OK; + int bFlush = 0; + lsmLogEnd(pDb, bCommit); + if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){ + bFlush = 1; + lsmTreeMakeOld(pDb); + } lsmTreeEndTransaction(pDb, bCommit); + + if( rc==LSM_OK && bFlush && pDb->bAutowork ){ + rc = lsmSortedAutoWork(pDb, 1); + } lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0); - return LSM_OK; + if( bFlush && pDb->bAutowork==0 && pDb->xWork ){ + pDb->xWork(pDb, pDb->pWorkCtx); + } + return rc; } /* ** Return non-zero if the caller is holding the client mutex. @@ -813,13 +860,13 @@ ** Obtain a read-lock on database version identified by the combination ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or ** an LSM error code otherwise. */ int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){ + int rc = LSM_OK; ShmHeader *pShm = db->pShmhdr; int i; - int rc = LSM_OK; assert( db->iReader<0 ); assert( shm_sequence_ge(iShmMax, iShmMin) ); /* Search for an exact match. 
*/ @@ -844,27 +891,31 @@ }else{ ShmReader *p = &pShm->aReader[i]; p->iLsmId = iLsm; p->iTreeId = iShmMax; rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); + assert( rc!=LSM_BUSY ); if( rc==LSM_OK ) db->iReader = i; } } /* Search for any usable slot */ for(i=0; db->iReader<0 && rc==LSM_OK && iaReader[i]; - if( slotIsUsable(p, iLsm, iShmMax, iShmMax) ){ + if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){ rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); - if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMax, iShmMax) ){ + if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){ db->iReader = i; }else if( rc==LSM_BUSY ){ rc = LSM_OK; } } } + if( rc==LSM_OK && db->iReader<0 ){ + rc = LSM_BUSY; + } return rc; } /* ** This is used to check if there exists a read-lock locking a particular @@ -1195,7 +1246,26 @@ void lsmShmBarrier(lsm_db *db){ lsmEnvShmBarrier(db->pEnv); } +int lsm_checkpoint(lsm_db *pDb, int *pnByte){ + int rc; /* Return code */ + u32 nWrite = 0; /* Number of pages checkpointed */ + + /* Attempt the checkpoint. If successful, nWrite is set to the number of + ** pages written between this and the previous checkpoint. */ + rc = lsmCheckpointWrite(pDb, &nWrite); + + /* If required, calculate the output variable (bytes of data checkpointed). + ** Set it to zero if an error occured. 
*/ + if( pnByte ){ + int nByte = 0; + if( rc==LSM_OK && nWrite ){ + nByte = (int)nWrite * lsmFsPageSize(pDb->pFS); + } + *pnByte = nByte; + } + return rc; +} Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -324,11 +324,15 @@ }; #ifdef LSM_DEBUG_EXPENSIVE static int assertPointersOk(lsm_db *, Segment *, Segment *, int); static int assertBtreeOk(lsm_db *, Segment *); +static void assertRunInOrder(lsm_db *pDb, Segment *pSeg); +#else +#define assertRunInOrder(x,y) #endif + struct FilePage { u8 *aData; int nData; }; static u8 *fsPageData(Page *pPg, int *pnData){ *pnData = ((struct FilePage *)(pPg))->nData; return ((struct FilePage *)(pPg))->aData; @@ -605,25 +609,23 @@ if( pCsr->iPg<0 ){ pCsr->pKey = 0; pCsr->nKey = 0; pCsr->eType = 0; }else{ - int iPg; - for(iPg=pCsr->iPg; iPg>=0; iPg--){ - int iCell = pCsr->aPg[pCsr->iPg].iCell; - if( iCell>=0 ){ - int dummy; - rc = pageGetBtreeKey( - pCsr->aPg[pCsr->iPg].pPage, pCsr->aPg[pCsr->iPg].iCell, - &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob - ); - pCsr->eType |= SORTED_SEPARATOR; - break; - } - } - - if( iPg<0 ) rc = LSM_CORRUPT_BKPT; + int dummy; + int iPg = pCsr->iPg; + int iCell = pCsr->aPg[iPg].iCell; + while( iCell<0 && (--iPg)>=0 ){ + iCell = pCsr->aPg[iPg].iCell-1; + } + if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT; + + rc = pageGetBtreeKey( + pCsr->aPg[iPg].pPage, iCell, + &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob + ); + pCsr->eType |= SORTED_SEPARATOR; } return rc; } @@ -648,13 +650,11 @@ assert( pCsr->iPg>=0 ); assert( pCsr->iPg==pCsr->nDepth-1 ); aData = fsPageData(pPg->pPage, &nData); nCell = pageGetNRec(aData, nData); - assert( pPg->iCell<=nCell ); - pPg->iCell++; if( pPg->iCell==nCell ){ Pgno iLoad; /* Up to parent. 
*/ @@ -3509,18 +3509,20 @@ } static int sortedNewToplevel( lsm_db *pDb, /* Connection handle */ int bTree, /* True to store contents of in-memory tree */ - int *pnOvfl /* OUT: Number of free-list entries stored */ + int *pnOvfl, /* OUT: Number of free-list entries stored */ + int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ int iLeftPtr = 0; + int nWrite = 0; /* Number of database pages written */ assert( pnOvfl ); /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); @@ -3584,10 +3586,11 @@ if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr); while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){ rc = mergeWorkerStep(&mergeworker); } + nWrite = mergeworker.nWork; mergeWorkerShutdown(&mergeworker, &rc); pNew->pMerge = 0; } /* Link the new level into the top of the tree. */ @@ -3602,49 +3605,14 @@ if( rc==LSM_OK ){ sortedInvokeWorkHook(pDb); } - return rc; -} - -/* -** Flush the contents of the in-memory tree to a new segment on disk. -** At present, this may occur in two scenarios: -** -** 1. When a transaction has just been committed (by connection pDb), -** and the in-memory tree has exceeded the size threshold, or -** -** 2. If the in-memory tree is not empty and the last connection to -** the database (pDb) is being closed. -** -** In both cases, the connection hold a worker snapshot reference. In -** the first, the connection also holds the in-memory tree write-version. -** In the second, no in-memory tree version reference is held at all. -*/ -int lsmSortedFlushTree( - lsm_db *pDb, /* Connection handle */ - int *pnOvfl /* OUT: Number of free-list entries written */ -){ - int rc; - - assert( pDb->pWorker ); - - /* If there is nothing to do, return early. 
*/ - if( lsmTreeHasOld(pDb)==0 && lsmCheckpointOverflowRequired(pDb)==0 ){ - *pnOvfl = 0; - return LSM_OK; - } - - rc = sortedNewToplevel(pDb, 1, pnOvfl); - assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); - -#if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush"); -#endif -#if 0 - lsmLogMessage(pDb, rc, "flushed tree to disk"); + if( pnWrite ) *pnWrite = nWrite; + pDb->pWorker->nWrite += nWrite; +#if 0 + lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif return rc; } /* @@ -3665,17 +3633,27 @@ int rc = LSM_OK; /* Return Code */ Level *pNew; /* New Level object */ int bUseNext = 0; /* True to link in next separators */ Merge *pMerge; /* New Merge object */ int nByte; /* Bytes of space allocated at pMerge */ + +#ifdef LSM_DEBUG + int iLevel; + Level *pX = pLevel; + for(iLevel=0; iLevelnRight==0 ); + pX = pX->pNext; + } +#endif /* Allocate the new Level object */ pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); if( pNew ){ pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nMerge * sizeof(Segment), &rc); } + /* Populate the new Level object */ if( rc==LSM_OK ){ Level *pNext = 0; /* Level following pNew */ int i; @@ -3683,10 +3661,11 @@ Level *p = pLevel; Level **pp; pNew->nRight = nMerge; pNew->iAge = pLevel->iAge+1; for(i=0; inRight==0 ); pNext = p->pNext; pNew->aRhs[i] = p->lhs; sortedFreeLevel(pDb->pEnv, p); p = pNext; } @@ -3830,89 +3809,131 @@ } return rc; } -static int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){ +/* +** Argument p points to a level of age N. Return the number of levels in +** the linked list starting at p that have age=N (always at least 1). 
+*/ +static int sortedCountLevels(Level *p){ + int iAge = p->iAge; + int nRet = 0; + do { + nRet++; + p = p->pNext; + }while( p && p->iAge==iAge ); + return nRet; +} + +static int sortedSelectLevel(lsm_db *pDb, int bOpt, Level **ppOut){ + Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker); + int rc = LSM_OK; + Level *pLevel = 0; /* Output value */ + Level *pBest = 0; /* Best level to work on found so far */ + int nBest = pDb->nMerge-1; /* Number of segments merged at pBest */ + Level *pThis = 0; /* First in run of levels with age=iAge */ + int nThis = 0; /* Number of levels starting at pThis */ + + /* Find the longest contiguous run of levels not currently undergoing a + ** merge with the same age in the structure. Or the level being merged + ** with the largest number of right-hand segments. Work on it. */ + for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){ + if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){ + nThis++; + }else{ + if( nThis>nBest ){ + if( (pLevel->iAge!=pThis->iAge+1) + || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge) + ){ + pBest = pThis; + nBest = nThis; + } + } + if( pLevel->nRight ){ + if( pLevel->nRight>nBest ){ + nBest = pLevel->nRight; + pBest = pLevel; + } + nThis = 0; + pThis = 0; + }else{ + pThis = pLevel; + nThis = 1; + } + } + } + if( nThis>nBest ){ + assert( pThis ); + pBest = pThis; + nBest = nThis; + } + + if( pBest==0 && bOpt && pTopLevel->pNext ){ + pBest = pTopLevel; + nBest = 2; + } + + if( pBest ){ + if( pBest->nRight==0 ){ + rc = sortedMergeSetup(pDb, pBest, nBest, ppOut); + }else{ + *ppOut = pBest; + } + } + + return rc; +} + +static int sortedDbIsFull(lsm_db *pDb){ + Level *pTop = lsmDbSnapshotLevel(pDb->pWorker); + + if( lsmDatabaseFull(pDb) ) return 1; + if( pTop && pTop->iAge==0 + && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge) + ){ + return 1; + } + return 0; +} + +static int sortedWork( + lsm_db *pDb, /* Database handle. Must be worker. 
*/ + int nWork, /* Number of pages of work to do */ + int bOptimize, /* True to merge less than nMerge levels */ + int bFlush, /* Set if call is to make room for a flush */ + int *pnWrite /* OUT: Actual number of pages written */ +){ int rc = LSM_OK; /* Return Code */ int nRemaining = nWork; /* Units of work to do before returning */ Snapshot *pWorker = pDb->pWorker; - assert( lsmFsIntegrityCheck(pDb) ); assert( pWorker ); - if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK; while( nRemaining>0 ){ - Level *pLevel; - Level *pTopLevel = lsmDbSnapshotLevel(pWorker); - - /* Find the longest contiguous run of levels not currently undergoing a - ** merge with the same age in the structure. Or the level being merged - ** with the largest number of right-hand segments. Work on it. */ - Level *pBest = 0; - int nBest = pDb->nMerge; - - Level *pThis = 0; - int nThis = 0; - - for(pLevel = pTopLevel; pLevel; pLevel=pLevel->pNext){ - if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){ - nThis++; - }else{ - if( nThis>=nBest ){ - pBest = pThis; - nBest = nThis; - } - if( pLevel->nRight ){ - if( pLevel->nRight>=nBest ){ - nBest = pLevel->nRight; - pBest = pLevel; - nThis = 0; - pThis = 0; - } - }else{ - pThis = pLevel; - nThis = 1; - } - } - } - if( nThis>nBest ){ - assert( pThis ); - pBest = pThis; - nBest = nThis; - } - - if( pBest==0 && bOptimize && pTopLevel->pNext ){ - pBest = pTopLevel; - nBest = 2; - } - - if( pBest ){ - if( pBest->nRight==0 ){ - rc = sortedMergeSetup(pDb, pBest, nBest, &pLevel); - }else{ - pLevel = pBest; - } - } + Level *pLevel = 0; + + /* Find a level to work on. */ + rc = sortedSelectLevel(pDb, bOptimize, &pLevel); + assert( rc==LSM_OK || pLevel==0 ); if( pLevel==0 ){ /* Could not find any work to do. Finished. 
*/ break; }else{ MergeWorker mergeworker; /* State used to work on the level merge */ rc = mergeWorkerInit(pDb, pLevel, &mergeworker); - assert( mergeworker.nWork==0 ); while( rc==LSM_OK && 0==mergeWorkerDone(&mergeworker) && mergeworker.nWorklhs); + #if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); + lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work"); #endif - } } - if( pnWrite ){ - *pnWrite = (nWork - nRemaining); - } + if( pnWrite ) *pnWrite = (nWork - nRemaining); pWorker->nWrite += (nWork - nRemaining); - assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); +#ifdef LSM_LOG_WORK + lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining)); +#endif return rc; } typedef struct Metric Metric; struct Metric { @@ -4037,10 +4064,186 @@ nMinSz ); } } #endif + +/* +** The database connection passed as the first argument must be a worker +** connection. This function checks if there exists an "old" in-memory tree +** ready to be flushed to disk. If so, *pbOut is set to true before +** returning. Otherwise false. +** +** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code. +*/ +static int sortedTreeHasOld(lsm_db *pDb, int *pbOut){ + int rc = LSM_OK; + + assert( pDb->pWorker ); + if( pDb->nTransOpen==0 ){ + rc = lsmTreeLoadHeader(pDb, 0); + } + + if( rc==LSM_OK + && pDb->treehdr.iOldShmid + && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff + ){ + *pbOut = 1; + }else{ + *pbOut = 0; + } + return rc; +} + +static int doLsmSingleWork( + lsm_db *pDb, + int bShutdown, + int flags, + int nPage, /* Number of pages to write to disk */ + int *pnWrite, /* OUT: Pages actually written to disk */ + int *pbCkpt /* OUT: True if an auto-checkpoint is req. */ +){ + int rc = LSM_OK; /* Return code */ + int nOvfl = 0; + int bFlush = 0; + int nMax = nPage; /* Maximum pages to write to disk */ + int nRem = nPage; + int bCkpt = 0; + int bToplevel = 0; + + /* Open the worker 'transaction'. It will be closed before this function + ** returns. 
*/ + assert( pDb->pWorker==0 ); + rc = lsmBeginWork(pDb); + if( rc!=LSM_OK ) return rc; + + /* If this connection is doing auto-checkpoints, set nMax (and nRem) so + ** that this call stops writing when the auto-checkpoint is due. */ + if( bShutdown==0 && pDb->nAutockpt ){ + u32 nSync; + u32 nUnsync; + int nPgsz; + int nMax; + + lsmCheckpointSynced(pDb, 0, 0, &nSync); + nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0); + nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1); + + nMax = (pDb->nAutockpt/nPgsz) - (nUnsync-nSync); + if( nMax0 ){ + int nPg = 0; + rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg); + nRem -= nPg; + if( rc==LSM_OK && pDb->nTransOpen>0 ){ + lsmTreeDiscardOld(pDb); + } + bFlush = 1; + bToplevel = 0; + } + } + } + + /* If nPage is still greater than zero, do some merging. */ + if( rc==LSM_OK && nRem>0 && bShutdown==0 ){ + int nPg = 0; + int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0); + rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg); + nRem -= nPg; + if( nPg ){ + bToplevel = 1; + nOvfl = 0; + } + } + + if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){ + while( rc==LSM_OK && sortedDbIsFull(pDb) ){ + int nPg = 0; + rc = sortedWork(pDb, 16, 0, 1, &nPg); + } + if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){ + rc = sortedNewToplevel(pDb, 0, &nOvfl, 0); + } + } + + if( rc==LSM_OK && (nRem!=nMax) ){ + rc = lsmSortedFlushDb(pDb); + lsmFinishWork(pDb, bFlush, nOvfl, &rc); + }else{ + int rcdummy = LSM_BUSY; + assert( rc!=LSM_OK || bFlush==0 ); + lsmFinishWork(pDb, 0, 0, &rcdummy); + } + assert( pDb->pWorker==0 ); + + if( rc==LSM_OK ){ + if( pnWrite ) *pnWrite = (nMax - nRem); + if( pbCkpt ) *pbCkpt = (bCkpt && nRem<=0); + }else{ + if( pnWrite ) *pnWrite = 0; + if( pbCkpt ) *pbCkpt = 0; + } + + return rc; +} + +static int doLsmWork(lsm_db *pDb, int flags, int nPage, int *pnWrite){ + int rc; + int nWrite = 0; + int bCkpt = 0; + + do { + int nThis = 0; + bCkpt = 0; + rc = doLsmSingleWork(pDb, 0, flags, nPage-nWrite, &nThis, 
&bCkpt); + nWrite += nThis; + if( rc==LSM_OK && bCkpt ){ + rc = lsm_checkpoint(pDb, 0); + } + }while( rc==LSM_OK && (nWritenTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT; + + return doLsmWork(pDb, flags, nPage, pnWrite); +} /* ** This function is called in auto-work mode to perform merging work on ** the data structure. It performs enough merging work to prevent the ** height of the tree from growing indefinitely assuming that roughly @@ -4051,116 +4254,66 @@ lsm_db *pDb, /* Database handle */ int nUnit /* Pages of data written to in-memory tree */ ){ int rc; /* Return code */ int nRemaining; /* Units of work to do before returning */ - int nDepth; /* Current height of tree (longest path) */ + int nDepth = 0; /* Current height of tree (longest path) */ int nWrite; /* Pages written */ Level *pLevel; /* Used to iterate through levels */ + int bRestore = 0; - assert( lsmFsIntegrityCheck(pDb) ); - assert( pDb->pWorker ); + assert( pDb->pWorker==0 ); + assert( pDb->nTransOpen>0 ); /* Determine how many units of work to do before returning. One unit of ** work is achieved by writing one page (~4KB) of merged data. 
*/ - nRemaining = nDepth = 0; - for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){ + nRemaining = 0; + for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){ /* nDepth += LSM_MAX(1, pLevel->nRight); */ nDepth += 1; } + if( lsmTreeHasOld(pDb) ){ + nDepth += 1; + bRestore = 1; + rc = lsmSaveCursors(pDb); + if( rc!=LSM_OK ) return rc; + } + nRemaining = nUnit * nDepth; +#ifdef LSM_LOG_WORK + lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", + nUnit, nDepth, nRemaining); +#endif + rc = doLsmWork(pDb, LSM_WORK_FLUSH, nRemaining, 0); + if( rc==LSM_BUSY ) rc = LSM_OK; - rc = sortedWork(pDb, nRemaining, 0, &nWrite); + if( bRestore && pDb->pCsr ){ + lsmFreeSnapshot(pDb->pEnv, pDb->pClient); + pDb->pClient = 0; + rc = lsmCheckpointLoad(pDb, 0); + if( rc==LSM_OK ){ + rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); + } + if( rc==LSM_OK ){ + rc = lsmRestoreCursors(pDb); + } + } + #if 0 lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite); #endif + return rc; } -/* -** Perform work to merge database segments together. -*/ -int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){ - int rc = LSM_OK; /* Return code */ - int nOvfl = 0; - int bFlush = 0; - int bFinishWork = 0; - int nWrite = 0; - - /* This function may not be called if pDb has an open read or write - ** transaction. Return LSM_MISUSE if an application attempts this. */ - if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT; - - if( (flags & LSM_WORK_FLUSH) && (flags & LSM_WORK_OPTIMIZE) ){ - rc = lsmBeginWriteTrans(pDb); - if( rc==LSM_OK ){ - int nDummy; - lsmTreeMakeOld(pDb, &nDummy); - lsmFinishWriteTrans(pDb, 1); - lsmFinishReadTrans(pDb); - } - if( rc==LSM_BUSY ) rc = LSM_OK; - } - - assert( pDb->pWorker==0 ); - if( (flags & LSM_WORK_FLUSH) || nPage>0 ){ - rc = lsmBeginWork(pDb); - bFinishWork = 1; - } - - /* If the FLUSH flag is set, try to flush the contents of the in-memory - ** tree to disk. 
*/ - if( rc==LSM_OK && ((flags & LSM_WORK_FLUSH)) ){ - rc = lsmTreeLoadHeader(pDb, 0); - if( rc==LSM_OK - && pDb->treehdr.iOldShmid - && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff - ){ - rc = lsmSortedFlushTree(pDb, &nOvfl); - bFlush = 1; - } - } - - /* If nPage is greater than zero, do some merging. */ - if( rc==LSM_OK && nPage>0 ){ - int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0); - rc = sortedWork(pDb, nPage, bOptimize, &nWrite); - if( rc==LSM_OK && nWrite ){ -#if 0 - { - char *z = 0; - lsmInfoFreelist(pDb, &z); - lsmLogMessage(pDb, 0, "work: %d pages", nWrite); - lsmLogMessage(pDb, 0, "freelist: %s", z); - lsm_free(lsm_get_env(pDb), z); - } -#endif - rc = lsmSortedFlushDb(pDb); - if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){ - nOvfl = -1; - rc = sortedNewToplevel(pDb, 0, &nOvfl); - } - } - } - - if( pnWrite ) *pnWrite = nWrite; - if( bFinishWork ){ - if( nWrite || bFlush ){ - lsmFinishWork(pDb, bFlush, nOvfl, &rc); - }else{ - int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcdummy); - } - } - assert( pDb->pWorker==0 ); - - /* If the LSM_WORK_CHECKPOINT flag is specified and one is available, - ** write a checkpoint out to disk. */ - if( rc==LSM_OK && (flags & LSM_WORK_CHECKPOINT) ){ - rc = lsmCheckpointWrite(pDb); - } - +int lsmFlushTreeToDisk(lsm_db *pDb){ + int rc; + rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0); + if( rc==LSM_OK ){ + lsmTreeMakeOld(pDb); + rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0); + } return rc; } /* ** Return a string representation of the segment passed as the only argument. 
@@ -4533,11 +4686,13 @@ if( ipEnv, zRight, sizeof(zRight), 28, aRight[i]); } if( i==0 ){ - sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d:", iLevel); + sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d)", + iLevel, pLevel->iAge + ); }else{ zLevel[0] = '\0'; } if( nRight==0 ){ @@ -4595,10 +4750,56 @@ for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){ lsmTreeCursorSave(pCsr->apTreeCsr[0]); lsmTreeCursorSave(pCsr->apTreeCsr[1]); } } + +#ifdef LSM_DEBUG_EXPENSIVE +static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){ + Page *pPg = 0; + Blob blob1 = {0, 0, 0, 0}; + Blob blob2 = {0, 0, 0, 0}; + + lsmFsDbPageGet(pDb->pFS, pSeg->iFirst, &pPg); + while( pPg ){ + u8 *aData; int nData; + Page *pNext; + + aData = lsmFsPageData(pPg, &nData); + if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){ + int i; + int nRec = pageGetNRec(aData, nData); + for(i=0; ipEnv, pPg, i, &iTopic1, &blob1); + + if( i==0 && blob2.nData ){ + assert( sortedKeyCompare( + pDb->xCmp, iTopic2, blob2.pData, blob2.nData, + iTopic1, blob1.pData, blob1.nData + )<0 ); + } + + if( i<(nRec-1) ){ + pageGetKeyCopy(pDb->pEnv, pPg, i+1, &iTopic2, &blob2); + assert( sortedKeyCompare( + pDb->xCmp, iTopic1, blob1.pData, blob1.nData, + iTopic2, blob2.pData, blob2.nData + )<0 ); + } + } + } + + lsmFsDbPageNext(pSeg, pPg, 1, &pNext); + lsmFsPageRelease(pPg); + pPg = pNext; + } + + sortedBlobFree(&blob1); + sortedBlobFree(&blob2); +} +#endif #ifdef LSM_DEBUG_EXPENSIVE /* ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is ** defined. 
Its only purpose is to evaluate various assert() statements to Index: src/lsm_tree.c ================================================================== --- src/lsm_tree.c +++ src/lsm_tree.c @@ -961,15 +961,12 @@ pDb->treehdr.root.nHeight = 0; pDb->treehdr.nByte = 0; pDb->treehdr.iUsedShmid = pDb->treehdr.iNextShmid-1; } -void lsmTreeMakeOld(lsm_db *pDb, int *pnFlush){ - if( pDb->treehdr.iOldShmid ){ - *pnFlush = 2; - }else{ - *pnFlush = 1; +void lsmTreeMakeOld(lsm_db *pDb){ + if( pDb->treehdr.iOldShmid==0 ){ pDb->treehdr.iOldLog = pDb->treehdr.log.aRegion[2].iEnd; pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0; pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1; pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1; memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot)); Index: tool/lsmperf.tcl ================================================================== --- tool/lsmperf.tcl +++ tool/lsmperf.tcl @@ -156,13 +156,17 @@ append script "pause -1\n" exec_gnuplot_script $script $zPng } -do_write_test x.png 40 20000 40000 1000 { - LSM "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0" +do_write_test x.png 60 25000 0 40 { + lsm-mt "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0" LevelDB leveldb } +# lsm-mt "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0" +# lsm-st "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1" +# LevelDB leveldb +# SQLite sqlite3 Index: www/lsm.wiki ================================================================== --- www/lsm.wiki +++ www/lsm.wiki @@ -702,7 +702,53 @@
  • Update the shared-memory variable to indicate the meta-page written in step 5.
  • Drop the CHECKPOINTER lock. + +

    5. Scheduling Policies

    + +

    +When a client writes to a database, the in-memory tree and log file are +updated by the client itself before the lsm_write() call returns. Eventually, +once sufficient writes have accumulated in memory, the client marks the +current tree as "old", and subsequent writes are accumulated in a new tree. + +

    +In order to prevent the in-memory tree and log file from growing indefinitely, +at some point in the future the following must occur: + +

      +
    • The contents of the old tree must be written into the database file + (a WORKER lock operation). Once this is done, the memory used to store the + old tree is available for reuse. + 
    • A checkpoint operation must take place to sync the data into the + database file and update the database header (a CHECKPOINT lock + operation). Once this has been done, the log file space that was used + to store the data may be reclaimed.
    + +

    +In addition to the above, it is necessary to perform a certain amount of +work on the database to merge existing levels together. This is not just +to speed up queries; there is a hard limit of roughly 40 levels to stop +database snapshots from growing overly large. + 

    Explicit calls to lsm_work() and lsm_checkpoint() + 

    Compulsory work + +

      +
    • If a writer tries to mark a tree as "old", but there is already an + old tree in memory, the writer attempts to grab the WORKER lock and + write both the old and new trees to a new database level. + 

      If the WORKER lock cannot be obtained immediately, the writer blocks + until it can be.

    + +

    Auto work + +